Skip to content

Commit

Permalink
fix(assetlibrary): change connection life cycle between Lambda and Ne…
Browse files Browse the repository at this point in the history
…ptune
  • Loading branch information
yuma124 committed Oct 6, 2023
1 parent 556bfd2 commit 2b32125
Show file tree
Hide file tree
Showing 12 changed files with 1,070 additions and 1,258 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@awssolutions/cdf-assetlibrary",
"comment": "Change connection life cycle between Lambda and Neptune",
"type": "none"
}
],
"packageName": "@awssolutions/cdf-assetlibrary"
}
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,10 @@ declare module 'gremlin' {
export class RemoteStrategy extends process.TraversalStrategy {}

export class DriverRemoteConnection extends RemoteConnection {
open(): Promise<void>;
close(): Promise<void>;
addListener(event: string | symbol, handler: (...args: any[]) => void): void;
removeListener(event: string | symbol, handler: (...args: any[]) => void): void;
}
}
}
63 changes: 29 additions & 34 deletions source/packages/services/assetlibrary/src/authz/authz.full.dao.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,44 +48,39 @@ export class AuthzDaoFull extends BaseDaoFull {
const ids: string[] = deviceIds.map((d) => `device___${d}`);
ids.push(...groupPaths.map((g) => `group___${g}`));

let results;
const conn = super.getConnection();
try {
const traverser = conn.traversal
.V(ids)
.as('entity')
.union(
// return an item if the entity exists
__.project('entity', 'exists')
.by(
__.select('entity').coalesce(
__.values('deviceId'),
__.values('groupPath')
)
const conn = await super.getConnection();
const traverser = conn.traversal
.V(ids)
.as('entity')
.union(
// return an item if the entity exists
__.project('entity', 'exists')
.by(
__.select('entity').coalesce(
__.values('deviceId'),
__.values('groupPath')
)
.by(__.constant(true)),
// return an item if the entity is authorized
__.local(
__.until(__.has('groupPath', process.P.within(hierarchies)))
.repeat(
__.outE().has('isAuthCheck', true).otherV().simplePath().dedup()
)
.as('authorizedPath')
)
.project('entity', 'authorizedPath')
.by(
__.select('entity').coalesce(
__.values('deviceId'),
__.values('groupPath')
)
.by(__.constant(true)),
// return an item if the entity is authorized
__.local(
__.until(__.has('groupPath', process.P.within(hierarchies)))
.repeat(
__.outE().has('isAuthCheck', true).otherV().simplePath().dedup()
)
.by(__.select('authorizedPath').values('groupPath'))
);
.as('authorizedPath')
)
.project('entity', 'authorizedPath')
.by(
__.select('entity').coalesce(
__.values('deviceId'),
__.values('groupPath')
)
)
.by(__.select('authorizedPath').values('groupPath'))
);

results = await traverser.toList();
} finally {
await conn.close();
}
const results = await traverser.toList();

logger.debug(
`authz.full.dao listAuthorizedHierarchies: results:${JSON.stringify(results)}`
Expand Down
34 changes: 23 additions & 11 deletions source/packages/services/assetlibrary/src/data/base.full.dao.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,40 @@ import { TYPES } from '../di/types';
@injectable()
export class BaseDaoFull {
private _graph: structure.Graph;
private _conn: driver.DriverRemoteConnection | null;

public constructor(
@inject('neptuneUrl') private neptuneUrl: string,
@inject(TYPES.GraphSourceFactory) graphSourceFactory: () => structure.Graph
) {
this._graph = graphSourceFactory();
this._conn = null;
}

protected getConnection(): NeptuneConnection {
protected async getConnection(): Promise<NeptuneConnection> {
logger.debug(`base.full.dao getConnection: in:`);
const conn = new driver.DriverRemoteConnection(this.neptuneUrl, {
mimeType: 'application/vnd.gremlin-v2.0+json',
pingEnabled: false,
});

if (this._conn == null) {
logger.debug(`base.full.dao getConnection: create new connection:`);
this._conn = new driver.DriverRemoteConnection(this.neptuneUrl, {
mimeType: 'application/vnd.gremlin-v2.0+json',
pingEnabled: false,
connectOnStartup: false,
});
this._conn.addListener('close', (code: number, message: string) => {
logger.info(`base.full.dao connection close: code: ${code}, message: ${message}`);
this._conn = null;
if (code === 1006) {
throw new Error('Connection closed prematurely');
}
});
await this._conn.open();
}

logger.debug(`base.full.dao getConnection: withRemote:`);
const res = new NeptuneConnection(this._graph.traversal().withRemote(conn), conn);
const res = new NeptuneConnection(
this._graph.traversal().withRemote(this._conn),
);

logger.debug(`base.full.dao getConnection: exit:`);
return res;
Expand All @@ -44,14 +61,9 @@ export class BaseDaoFull {
export class NeptuneConnection {
constructor(
private _traversal: process.GraphTraversalSource,
private _connection: driver.DriverRemoteConnection
) {}

public get traversal(): process.GraphTraversalSource {
return this._traversal;
}

public async close(): Promise<void> {
await this._connection.close();
}
}
56 changes: 23 additions & 33 deletions source/packages/services/assetlibrary/src/data/common.full.dao.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,26 +146,21 @@ export class CommonDaoFull extends BaseDaoFull {
relatedUnion.range(offsetAsInt, offsetAsInt + countAsInt);

// build the main part of the query, unioning the related traversers with the main entity we want to return
let results;
const conn = super.getConnection();
try {
const traverser = conn.traversal
.V(entityDbId)
.as('main')
.union(
relatedUnion,
__.select('main').valueMap().with_(process.withOptions.tokens)
);

// execute and retrieve the results
logger.debug(
`common.full.dao listRelated: traverser: ${JSON.stringify(traverser.toString())}`
const conn = await super.getConnection();
const traverser = conn.traversal
.V(entityDbId)
.as('main')
.union(
relatedUnion,
__.select('main').valueMap().with_(process.withOptions.tokens)
);
results = await traverser.toList();
logger.debug(`common.full.dao listRelated: results: ${JSON.stringify(results)}`);
} finally {
await conn.close();
}

// execute and retrieve the results
logger.debug(
`common.full.dao listRelated: traverser: ${JSON.stringify(traverser.toString())}`
);
const results = await traverser.toList();
logger.debug(`common.full.dao listRelated: results: ${JSON.stringify(results)}`);

if (results === undefined || results.length === 0) {
logger.debug(`common.full.dao listRelated: exit: node: undefined`);
Expand Down Expand Up @@ -193,20 +188,15 @@ export class CommonDaoFull extends BaseDaoFull {
return {};
}

let results;
const conn = super.getConnection();
try {
const query = conn.traversal
.V(entityDbIds)
.project('id', 'labels')
.by(__.coalesce(__.values('deviceId'), __.values('groupPath')))
.by(__.label().fold());

logger.silly(`common.full.dao getLabels: query: ${JSON.stringify(query)}`);
results = await query.toList();
} finally {
await conn.close();
}
const conn = await super.getConnection();
const query = conn.traversal
.V(entityDbIds)
.project('id', 'labels')
.by(__.coalesce(__.values('deviceId'), __.values('groupPath')))
.by(__.label().fold());

logger.silly(`common.full.dao getLabels: query: ${JSON.stringify(query)}`);
const results = await query.toList();
logger.silly(`common.full.dao getLabels: results: ${JSON.stringify(results)}`);

if ((results?.length ?? 0) === 0) {
Expand Down
Loading

0 comments on commit 2b32125

Please sign in to comment.