Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(assetlibrary): change connection life cycle between Lambda and Neptune #168

Merged
merged 1 commit into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@awssolutions/cdf-assetlibrary",
"comment": "Change connection life cycle between Lambda and Neptune",
"type": "none"
}
],
"packageName": "@awssolutions/cdf-assetlibrary"
}
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,10 @@ declare module 'gremlin' {
export class RemoteStrategy extends process.TraversalStrategy {}

export class DriverRemoteConnection extends RemoteConnection {
open(): Promise<void>;
close(): Promise<void>;
addListener(event: string | symbol, handler: (...args: any[]) => void): void;
removeListener(event: string | symbol, handler: (...args: any[]) => void): void;
}
}
}
63 changes: 29 additions & 34 deletions source/packages/services/assetlibrary/src/authz/authz.full.dao.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,44 +48,39 @@ export class AuthzDaoFull extends BaseDaoFull {
const ids: string[] = deviceIds.map((d) => `device___${d}`);
ids.push(...groupPaths.map((g) => `group___${g}`));

let results;
const conn = super.getConnection();
try {
const traverser = conn.traversal
.V(ids)
.as('entity')
.union(
// return an item if the entity exists
__.project('entity', 'exists')
.by(
__.select('entity').coalesce(
__.values('deviceId'),
__.values('groupPath')
)
const conn = await super.getConnection();
const traverser = conn.traversal
.V(ids)
.as('entity')
.union(
// return an item if the entity exists
__.project('entity', 'exists')
.by(
__.select('entity').coalesce(
__.values('deviceId'),
__.values('groupPath')
)
.by(__.constant(true)),
// return an item if the entity is authorized
__.local(
__.until(__.has('groupPath', process.P.within(hierarchies)))
.repeat(
__.outE().has('isAuthCheck', true).otherV().simplePath().dedup()
)
.as('authorizedPath')
)
.project('entity', 'authorizedPath')
.by(
__.select('entity').coalesce(
__.values('deviceId'),
__.values('groupPath')
)
.by(__.constant(true)),
// return an item if the entity is authorized
__.local(
__.until(__.has('groupPath', process.P.within(hierarchies)))
.repeat(
__.outE().has('isAuthCheck', true).otherV().simplePath().dedup()
)
.by(__.select('authorizedPath').values('groupPath'))
);
.as('authorizedPath')
)
.project('entity', 'authorizedPath')
.by(
__.select('entity').coalesce(
__.values('deviceId'),
__.values('groupPath')
)
)
.by(__.select('authorizedPath').values('groupPath'))
);

results = await traverser.toList();
} finally {
await conn.close();
}
const results = await traverser.toList();

logger.debug(
`authz.full.dao listAuthorizedHierarchies: results:${JSON.stringify(results)}`
Expand Down
34 changes: 23 additions & 11 deletions source/packages/services/assetlibrary/src/data/base.full.dao.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,40 @@ import { TYPES } from '../di/types';
@injectable()
export class BaseDaoFull {
private _graph: structure.Graph;
private _conn: driver.DriverRemoteConnection | null;

public constructor(
@inject('neptuneUrl') private neptuneUrl: string,
@inject(TYPES.GraphSourceFactory) graphSourceFactory: () => structure.Graph
) {
this._graph = graphSourceFactory();
this._conn = null;
}

protected getConnection(): NeptuneConnection {
protected async getConnection(): Promise<NeptuneConnection> {
logger.debug(`base.full.dao getConnection: in:`);
const conn = new driver.DriverRemoteConnection(this.neptuneUrl, {
mimeType: 'application/vnd.gremlin-v2.0+json',
pingEnabled: false,
});

if (this._conn == null) {
logger.debug(`base.full.dao getConnection: create new connection:`);
this._conn = new driver.DriverRemoteConnection(this.neptuneUrl, {
mimeType: 'application/vnd.gremlin-v2.0+json',
pingEnabled: false,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am trying to find further documentation on this. Though, we should enable ping/pong messages if this is possible. This is a common practice for validating that a connection is still active.

Copy link
Contributor

@canavandl canavandl Oct 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the AssetLibrary service is targeting Lambda and not a long-running instance, I'd argue that we want to maintain the existing pingEnabled: false behavior. Note that the the ping interval appears to be an hour (https://github.com/apache/tinkerpop/blob/master/gremlin-javascript/src/main/javascript/gremlin-javascript/lib/driver/connection.js#L46C27-L46C36)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I would usually err on the side of caution, since the timeouts are configurable and could conceivably be set to 15 minutes, but that's probably not necessary here.

connectOnStartup: false,
});
this._conn.addListener('close', (code: number, message: string) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I notice there's only a "close" event here. As far as I can tell from the docs is that it's using WebSockets (though they don't link to it, so I'm not 100%). That lib also emits an "error" event that we should account for. Unless, of course, GremlinJS is handling that event somehow and doesn't emit it. Are you able to confirm/deny whether or not this is the case?

logger.info(`base.full.dao connection close: code: ${code}, message: ${message}`);
this._conn = null;
if (code === 1006) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious why we call out this specific error and none of the others? https://www.rfc-editor.org/rfc/rfc6455#section-7.4.1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(good reference btw) my reading of the error codes is that 1006 is the only error that seems unrecoverable (i.e. the Gremlin server shut down for abnormal reasons).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other close codes indicate results like the user intentionally closing the connection, but 1006 is a code that occurs due to an unintended disconnection. Even when this happens, it's not handled by gremlin, and it ends up waiting until the Lambda times out. Therefore, an error has to be thrown in this case.

Copy link
Contributor

@aaronatbissell aaronatbissell Jan 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry to comment on a closed pull request, but after upgrading our non-prod environment, I'm seeing a lot of these 1006 error codes. It's causing 500 errors in our rest-api:

image

Any thoughts at to why? Is this something we can add better recovery from?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aaronatbissell sorry that this is happening. Can you share more info about what you're seeing? Error 1006 is an underlying websocket issue that cannot be recovered from. As @yuma124 mentions, an error is thrown to fail this faster, but ideally, this should not be a common issue.

  • Does it fail 100% of the time or is it inconsistent?
  • Can you add logging to discover which calls cause the error?
  • It may be possible to get more info on where it's stuck by commenting out the throw so it hangs.
  • Are you seeing any errors from Neptune?
  • On the rest API side, you should catch this and retry it, which also might help identifying which call is triggering it.
    Also, it might be helpful to get any information off the socket connection that might tell you the state of it (e.g. you should be able to get the current number of connections https://stackoverflow.com/questions/10275667/socket-io-connected-user-count ).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it appears to be the same error as described in #178.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aaronatbissell have you used the fix in #178 and does it solve this connection issue?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just getting back to this. We haven't tried to fix this problem but would really like to see this fixed so we can upgrade asset library.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aaronatbissell have you tried the update in #178 and validated whether or not it fixes the issues you are seeing? If not, are you able to test it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't tested #178 but, as I commented in the PR, I think a better solution would be to open the neptune connection outside of the lambda handler.

As far as testing goes, I don't plan on getting around to test this any time soon....sorry!

throw new Error('Connection closed prematurely');
}
});
await this._conn.open();
}

logger.debug(`base.full.dao getConnection: withRemote:`);
const res = new NeptuneConnection(this._graph.traversal().withRemote(conn), conn);
const res = new NeptuneConnection(
this._graph.traversal().withRemote(this._conn),
);

logger.debug(`base.full.dao getConnection: exit:`);
return res;
Expand All @@ -44,14 +61,9 @@ export class BaseDaoFull {
export class NeptuneConnection {
constructor(
private _traversal: process.GraphTraversalSource,
private _connection: driver.DriverRemoteConnection
) {}

public get traversal(): process.GraphTraversalSource {
return this._traversal;
}

public async close(): Promise<void> {
await this._connection.close();
}
}
56 changes: 23 additions & 33 deletions source/packages/services/assetlibrary/src/data/common.full.dao.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,26 +146,21 @@ export class CommonDaoFull extends BaseDaoFull {
relatedUnion.range(offsetAsInt, offsetAsInt + countAsInt);

// build the main part of the query, unioning the related traversers with the main entity we want to return
let results;
const conn = super.getConnection();
try {
const traverser = conn.traversal
.V(entityDbId)
.as('main')
.union(
relatedUnion,
__.select('main').valueMap().with_(process.withOptions.tokens)
);

// execute and retrieve the results
logger.debug(
`common.full.dao listRelated: traverser: ${JSON.stringify(traverser.toString())}`
const conn = await super.getConnection();
const traverser = conn.traversal
.V(entityDbId)
.as('main')
.union(
relatedUnion,
__.select('main').valueMap().with_(process.withOptions.tokens)
);
results = await traverser.toList();
logger.debug(`common.full.dao listRelated: results: ${JSON.stringify(results)}`);
} finally {
await conn.close();
}

// execute and retrieve the results
logger.debug(
`common.full.dao listRelated: traverser: ${JSON.stringify(traverser.toString())}`
);
const results = await traverser.toList();
logger.debug(`common.full.dao listRelated: results: ${JSON.stringify(results)}`);

if (results === undefined || results.length === 0) {
logger.debug(`common.full.dao listRelated: exit: node: undefined`);
Expand Down Expand Up @@ -193,20 +188,15 @@ export class CommonDaoFull extends BaseDaoFull {
return {};
}

let results;
const conn = super.getConnection();
try {
const query = conn.traversal
.V(entityDbIds)
.project('id', 'labels')
.by(__.coalesce(__.values('deviceId'), __.values('groupPath')))
.by(__.label().fold());

logger.silly(`common.full.dao getLabels: query: ${JSON.stringify(query)}`);
results = await query.toList();
} finally {
await conn.close();
}
const conn = await super.getConnection();
const query = conn.traversal
.V(entityDbIds)
.project('id', 'labels')
.by(__.coalesce(__.values('deviceId'), __.values('groupPath')))
.by(__.label().fold());

logger.silly(`common.full.dao getLabels: query: ${JSON.stringify(query)}`);
const results = await query.toList();
logger.silly(`common.full.dao getLabels: results: ${JSON.stringify(results)}`);

if ((results?.length ?? 0) === 0) {
Expand Down
Loading