A Java Gremlin client for Amazon Neptune that allows you to change the endpoints used by the client as it is running. Includes an endpoint refresh agent that can get cluster topology details, and update the client on a periodic basis. You can supply your own custom endpoint selectors to configure the client for a subset of instances in your cluster based on tags, instance types, instance IDs, Availability Zones, etc.
The client also provides support for connecting to Neptune via a proxy such as a network or application load balancer, as an alternative to using an endpoint refresh agent and custom endpoint selectors.
See Migrating from version 1 of the Neptune Gremlin Client if you are migrating an application from version 1.x.x of the Neptune Gremlin Client.
The following example shows how to build a GremlinClient
that connects to and round-robins requests across all available Neptune serverless instances that have been tagged "analytics". The list of endpoints that match this selection criteria is refreshed every 60 seconds. The refresh agent that updates the list of endpoints uses an AWS Lambda proxy function to retrieve details of the Neptune database's cluster topology.
EndpointsSelector selector = (cluster) ->
new EndpointCollection(
cluster.getInstances().stream()
.filter(i -> i.hasTag("workload", "analytics"))
.filter(i -> i.getInstanceType().equals("db.serverless"))
.filter(NeptuneInstanceMetadata::isAvailable)
.collect(Collectors.toList()));
ClusterEndpointsRefreshAgent refreshAgent =
ClusterEndpointsRefreshAgent.lambdaProxy("neptune-endpoints-info-lambda");
GremlinCluster cluster = NeptuneGremlinClusterBuilder.build()
.enableIamAuth(true)
.addContactPoints(refreshAgent.getEndpoints(selector))
.create();
GremlinClient client = cluster.connect();
refreshAgent.startPollingNeptuneAPI(
RefreshTask.refresh(client, selector),
60,
TimeUnit.SECONDS);
DriverRemoteConnection connection = DriverRemoteConnection.using(client);
GraphTraversalSource g = AnonymousTraversalSource.traversal().withRemote(connection);
for (int i = 0; i < 100; i++) {
List<Map<Object, Object>> results = g.V().limit(10).valueMap(true).toList();
for (Map<Object, Object> result : results) {
//Do nothing
}
}
refreshAgent.close();
client.close();
cluster.close();
<dependency>
<groupId>software.amazon.neptune</groupId>
<artifactId>gremlin-client</artifactId>
<version>2.0.1</version>
</dependency>
- Overview
- Creating a GremlinCluster and GremlinClient
- Using a ClusterEndpointsRefreshAgent
- EndpointsSelector
- Connecting to an IAM auth enabled Neptune database
- Connecting via a proxy
- Using a load balancer with host-based routing
- Metrics
- Usage
- Demo
With the Neptune Gremlin Client you create a GremlinCluster
and GremlinClient
much as you would create a Cluster
and Client
with the Tinkerpop Java driver. The Neptune Gremlin Client is designed to be a near-drop-in replacement for the Java driver. Internally, it uses the Java driver to connect to Neptune and issue queries.
You populate a GremlinCluster
with one or more endpoints, or contact points, when you first create a client, but you can also refresh this list of endpoints from your code whenever you want. The GremlinClient
exposes a refreshEndpoints()
method that allows you to supply a new set of endpoints. This allows a running application to adapt to changes in your Neptune database's cluster topology.
The easiest way to automatically refresh the list of endpoints is to use a ClusterEndpointsRefreshAgent
. The agent can be configured to periodically discover the database cluster's current topology, select a set of endpoints, and update the client.
A ClusterEndpointsRefreshAgent
can be configured to get the database cluster's topology directly from the Neptune Management API, or from an AWS Lambda proxy function, which fetches and caches the cluster topology from the Management API on behalf of multiple clients. Unless you have a very small number of client instances (1-5) in your application, we recommend using a Lambda proxy to get the cluster toplogy. This reduces the risk of the Management API throttling requests from many clients.
Your application can then use an EndpointsSelector
to select an appropriate set of endpoints from the current cluster topology.
The following diagram shows how an application can use a GremlinClient
, ClusterEndpointsRefreshAgent
, and AWS Lambda proxy function to access a Neptune database:
The following diagram shows how an application can use a GremlinClient
, and a ClusterEndpointsRefreshAgent
that gets cluster topology information directly from the Neptune Management API, to access a Neptune database:
One of the benefits of the Neptune Gremlin Client is that it helps you distribute requests evenly across multiple read replicas in a Neptune cluster.
If you're building an application that needs to distribute requests across replicas, your first choice will typically be the reader endpoint, which balances connections across replicas. The reader endpoint continues to balance connections across replicas even if you change the cluster topology by adding or removing replicas, or promoting a replica to become the new primary.
However, in some circumstances using the reader endpoint can result in an uneven use of cluster resources. The reader endpoint works by periodically changing the host that the DNS entry points to. If a client opens a lot of connections before the DNS entry changes, all the connection requests are sent to a single Neptune instance. The same thing happens if DNS caching occurs in the application layer: the client ends up using the same replica over and over again. If an application opens a lot of connections to the reader endpoint at the same time, many of those connections can end up being tied to a single replica.
The Neptune Gremlin Client more fairly distributes connections and requests across a set of instances in a Neptune cluster. The client works by creating a connection pool for each instance endpoint in a given list of endpoints, and distributing requests (queries, not connections) in a round-robin fashion across these connection pools, thereby ensuring a more even distribution of work, and higher read throughput.
Note that the Neptune Gremlin Client will only round-robin requests across multiple read replicas if you supply it with a list of replica instance endpoints. If you supply it with the reader endpoint, you may continue to see connections and requests unevenly distributed across the cluster.
You create a GremlinCluster
and GremlinClient
using a NeptuneGremlinClusterBuilder
:
GremlinCluster cluster = NeptuneGremlinClusterBuilder.build()
.addContactPoints("replica-endpoint-1", "replica-endpoint-2", "replica-endpoint-3")
.create();
GremlinClient client = cluster.connect();
DriverRemoteConnection connection = DriverRemoteConnection.using(client);
GraphTraversalSource g = AnonymousTraversalSource.traversal().withRemote(connection);
// Use g throughout the lifetime of your application to submit queries to Neptune
client.close();
cluster.close();
The NeptuneGremlinClusterBuilder
is configured to use port 8182, and enable SSL by default.
Use the GraphTraversalSource
created here throughout the lifetime of your application, and across threads – just as you would with the TinkerPop Java driver client. The GremlinClient
ensures that requests are distributed across the current set of endpoints in a round-robin fashion.
The GremlinClient
has a refreshEndpoints()
method that allows you to submit a fresh list of endpoint addresses. When the list of endpoints changes, subsequent requests will be distributed across the new set of endpoints.
Once you have a reference to a GremlinClient
, you can call this refreshEndpoints()
method whenever you discover the cluster topology has changed. You could subscribe to SNS events, for example, and refresh the list whenever an instance is added or removed, or when you detect a failover.
To update the list of endpoint addresses:
client.refreshEndpoints(new EndpointCollection(Arrays.asList(
new DatabaseEndpoint().withAddress("new-replica-endpoint-1"),
new DatabaseEndpoint().withAddress("new-replica-endpoint-2"),
new DatabaseEndpoint().withAddress("new-replica-endpoint-3")
)));
From version 2.0.5 onwards, you can use:
client.refreshEndpoints(
new DatabaseEndpoint().withAddress("new-replica-endpoint-1"),
new DatabaseEndpoint().withAddress("new-replica-endpoint-2"),
new DatabaseEndpoint().withAddress("new-replica-endpoint-3")
);
You can also use a ClusterEndpointsRefreshAgent
to update the endpoints automatically on a periodic basis.
Because the cluster topology can change at any moment as a result of both planned and unplanned events, you should wrap all queries with an exception handler. Should a query fail because the underlying client connection has been closed, you can attempt a retry.
Most of the best practices for using the TinkerPop Gremlin Java client with Amazon Neptune apply to the Neptune Gremlin Client.
One important point to note is that with the Neptune Gremlin Client, all connection and connection pool settings specified using the NeptuneGremlinClusterBuilder
apply on a per endpoint basis. For example, if you configure the NeptuneGremlinClusterBuilder
with three endpoints, then it will create a client with three connection pools. Each connection pool will be configured separately with the connection pool settings specified using the NeptuneGremlinClusterBuilder
.
Old versions of the TinkerPop Gremlin Java client configured with a minConnectionPoolSize
smaller than the maxConnectionPoolSize
could sometimes appear to hang if they needed to add a new connection to the pool to handle an increase in traffic. If the thread used to schedule the creation of a new connection was already doing other work, it sometimes happened that the new connection would never be created, thereby blocking the client from sending any further requests. To mitigate this, we used to recommend configuring the client with minConnectionPoolSize
equal to maxConnectionPoolSize
, so that all connections in the pool were created eagerly.
This issue has been addressed in newer versions of the TinkerPop Gremlin Java client (on which the Neptune Gremlin Client depends), so the former advice no longer applies. Consider setting minConnectionPoolSize
(per endpoint) to accomodate your steady traffic, and maxConnectionPoolSize
the peak in your traffic. The exact values will depend on your workload, and may require some experimentation. If in doubt, leave the builder to use the default values (2
and 8
respectively).
If you are using the Neptune Gremlin Client in an AWS Lambda function, consider setting both minConnectionPoolSize
and maxConnectionPoolSize
to 1
. Because concurrent client requests to your Lambda functions are handled by different function instances running in separate execution contexts, there's no need to maintain a pool of connections to handle concurrent requests inside each function instance.
The ClusterEndpointsRefreshAgent
allows you to schedule endpoint updates to a GremlinClient
. The agent can be configured to periodically discover the database cluster's current topology, select a set of endpoints using an EndpointsSelector
, and update a client.
A ClusterEndpointsRefreshAgent
can be configured to get the database cluster's topology directly from the Neptune Management API, or from an AWS Lambda proxy function, which fetches and caches the cluster topology from the Management API on behalf of multiple clients. Unless you have a very small number of client instances (1-5) in your application, you should use a Lambda proxy to get the cluster toplogy. This reduces the risk of the Management API throttling requests from many clients.
The following example shows how to create a ClusterEndpointsRefreshAgent
that queries an AWS Lambda proxy function to discover the database cluster's current topology. The proxy function periodically fetches and caches the cluster topology from the Management API on behalf of multiple clients. When the agent gets the cluster topology from the Lambda function, it then updates a GremlinClient
with the current set of read replica endpoints. Notice how the builder's addContactPoints()
method uses refreshAgent.getEndpoints(selector)
to get an initial list of endpoints from the refresh agent using the selector.
EndpointsSelector selector = EndpointsType.ReadReplicas;
ClusterEndpointsRefreshAgent refreshAgent =
ClusterEndpointsRefreshAgent.lambdaProxy("neptune-endpoints-info-lambda");
GremlinCluster cluster = NeptuneGremlinClusterBuilder.build()
.addContactPoints(refreshAgent.getEndpoints(selector))
.create();
GremlinClient client = cluster.connect();
refreshAgent.startPollingNeptuneAPI(
RefreshTask.refresh(client, selector),
60,
TimeUnit.SECONDS);
When you use a ClusterEndpointsRefreshAgent
to query an AWS Lambda proxy function for cluster topology information, the identity under which you're running the agent must be authorized to perform lambda:InvokeFunction
for the proxy Lambda function. See ClusterEndpointsRefreshAgent credentials for details of supplying credentials to the refresh agent.
- Build the AWS Lambda proxy from source, or download the latest release, and put it an Amazon S3 bucket.
- Install the Lambda proxy in your account using this CloudFormation template. The template includes parameters for the current Neptune cluster ID, and the S3 source for the Lambda proxy jar (from step 1).
- Ensure all parts of your application are using the latest Neptune Gremlin Client.
- The Neptune Gremlin Client should be configured to fetch the cluster topology information from the Lambda proxy using the
ClusterEndpointsRefreshAgent.lambdaProxy()
method, as per the example above.
The AWS Lambda proxy has the following environment variables:
clusterId
– The cluster ID of the Amazon Neptune cluster to be polled for endpoint information.pollingIntervalSeconds
– The number of seconds between polls.suspended
– Determines whether specific endpoints will be suspended (see the next section). Valid values are:none
,all
,writer
,reader
.
The Lambda proxy has a suspended
environment variable that accepts a comma-separated list of the following values: all
, writer
, reader
, <endpoint_address>
, <instance_id>
. You can use this environment variable to suspend specific types of endpoint and specific instance endpoints. Suspended endpoints will not be chosen by the client when it applies a selector to the cluster topology.
To suspend a particular endpoint type or specific instance endpoint, change the variable value, save the change, and once it has propagated (this may take up to a minute), all clients that use the Lambda proxy will see the specified endpoints as being suspended. Setting the value to reader
, for example, will ensure that all instances currently in a reader role will be seen as suspended. Setting the value to writer,neptune-db-2-05d7a510
will suspend both the primary instance (the writer) and the instance with the instance id neptune-db-2-05d7a510
.
You can also suspend specific instance endpoints using Amazon Neptune tags. To suspend a specific instance, attach a tag with the key neptune:suspended
, and value true
to the instance. To remove the suspension, delete the tag or set its value to false
.
An endpoint will be considered suspended if it has been tagged as suspended, or if it is included in a group referred to by the suspended
environment variable, or if it is directly referred to using its endpoint address or instance id in the suspended
environment variable. For example, if the suspended
environment variable is set to all
, and an instance has also been tagged as suspended, it will be considered suspended. If the tag is susequently removed, the instance will still be considered suspended, because it belongs to the all
group. If an instance is tagged neptune:suspended
with the value false
, but its id is included in the suspended
environment variable, the instance will be suspended.
You can use this feature to prevent traffic to your cluster while you perform maintenance, upgrade or migration activities. Suspended endpoints apply back pressure in the client, preventing it from sending queries to the database cluster. To manage this back pressure, your application will have to handle an EndpointsUnavailableException
. This exception can occur in two different places:
- When you call
NeptuneGremlinClusterBuilder.create()
. - When you submit a query using an existing
GraphTraversalSource
.
The EndpointsUnavailableException
will appear as the root cause: invariably, it is wrapped in a RemoteConnectionexception
, or similar. The only reason you should see an EndpointsUnavailableException
is because the endpoints have been suspended.
Suspended endpoints are hidden from endpoint selectors. If an endpoint is suspended, it will not be considered for selection, even if it matches the selection criteria.
The following example shows how to create a ClusterEndpointsRefreshAgent
that queries the Neptune Management API to discover the database cluster's current topology. The agent then updates a GremlinClient
with the current set of read replica endpoints. Notice how the builder's addContactPoints()
method uses refreshAgent.getEndpoints(selector)
to get an initial list of endpoints from the refresh agent using the selector.
EndpointsSelector selector = EndpointsType.ReadReplicas;
ClusterEndpointsRefreshAgent refreshAgent =
ClusterEndpointsRefreshAgent.managementApi("cluster-id");
GremlinCluster cluster = NeptuneGremlinClusterBuilder.build()
.addContactPoints(refreshAgent.getEndpoints(selector))
.create();
GremlinClient client = cluster.connect();
refreshAgent.startPollingNeptuneAPI(
RefreshTask.refresh(client, selector),
60,
TimeUnit.SECONDS);
When you use a ClusterEndpointsRefreshAgent
to query the Neptune Management API directly, the identity under which you're running the agent must be authorized to perform rds:DescribeDBClusters
for your Neptune cluster, and rds:DescribeDBInstances
and rds:ListTagsForResource
for db:*
. See ClusterEndpointsRefreshAgent credentials for details of supplying credentials to the refresh agent.
(rds:DescribeDBInstances
and rds:ListTagsForResource
require permissions for db:*
because a db
resource type can't be restricted by cluster name. A db
resource can be restricted by instance name, but this is not particularly useful here because the refresh agent is looking for instances that may have been created after the IAM policy was formulated.)
When the Neptune Management API experiences a high rate of requests, it starts throttling API calls. If you have a lot of clients frequently polling for endpoint information, your application can very quickly experience throttling (in the form of HTTP 400 throttling exceptions).
Because of this throttling behaviour, if your application uses a lot of concurrent GremlinClient
and ClusterEndpointsRefreshAgent
instances, instead of querying the Management API directly, you should proxy endpoint refresh requests through an AWS Lambda function. The Lambda function can periodically query the Management API and then cache the results on behalf of its clients.
When you create a ClusterEndpointsRefreshAgent
using one of the lambaProxy
or managementApi
factory methods, you can supply the credentials necessary to invoke the AWS Lambda proxy, or the Neptune Management API, as appropriate. These can be a separate set of credentials from the credentials used to query your Neptune database.
You can supply the name of a named profile in a local profile configuration file:
String profileName = "my-profile";
// Using a Lambda proxy
ClusterEndpointsRefreshAgent lambdaProxyRefreshAgent =
ClusterEndpointsRefreshAgent.lambdaProxy("neptune-endpoints-info-lambda", "eu-west-1", profileName);
// Querying the Neptune Management API
ClusterEndpointsRefreshAgent managementApiRefreshAgent =
ClusterEndpointsRefreshAgent.managementApi("my-cluster-id", "eu-west-1", profileName);
Or you can supply an implementation of AWSCredentialsProvider
:
AWSCredentialsProvider credentialsProvider =
new ProfileCredentialsProvider("my-profile")
// Using a Lambda proxy
ClusterEndpointsRefreshAgent lambdaProxyRefreshAgent =
ClusterEndpointsRefreshAgent.lambdaProxy("neptune-endpoints-info-lambda", "eu-west-1", credentialsProvider);
// Querying the Neptune Management API
ClusterEndpointsRefreshAgent managementApiRefreshAgent =
ClusterEndpointsRefreshAgent.managementApi("my-cluster-id", "eu-west-1", credentialsProvider);
If you're building a serverless application that uses AWS Lambda functions to query Neptune, and you're using a Neptune Gremlin Client and a refresh agent in those Lambda functions, you must ensure the refresh agent wakes up and refreshes in a timely manner. You do this by calling the awake()
method on the functions's ClusterEndpointsRefreshAgent
instance (available from version 2.0.2 onwards). Do this with each function invocation in the functions's handler. For example:
public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException {
refreshAgent.awake();
// Rest of the handler code
}
The refresh agent schedules its refreshes on a background thread. A Lambda context – the container in which a function executes – survives across multiple invocations of the function, but in between invocations it is effectively asleep. If a refresh is scheduled to occur while the Lambda context is asleep, the refresh will not take place. As a result, changes in the Neptune cluster topology that might be expected to propogate to the Lambda proxy in several seconds (depending on the refresh interval you specify) can take several minutes to appear – appearing only when a refresh coincides with a period when the context is awake.
By calling awake()
at the beginning of every function invocation, you ensure that refreshes occur in a timely manner.
Normally your application, Neptune database and Lambda proxy will be in the same account. If, however, you need to assume a cross-account role to contact the Lambda proxy or Neptune Management API, you can use an STSAssumeRoleSessionCredentialsProvider
to create a temporary session for authentication to a resource in another account.
Before you access the Management API or Lambda proxy across accounts, follow the instructions in this tutorial to delegate access to the resources across AWS accounts using IAM roles. In line with this tutorial, you'll need to:
- Create a managed policy and role in the resource account (the account containing the Neptune database cluster or the AWS Lambda proxy) that allows trusted users to access the resource.
- Grant access to this role to the identity under which you're running the refresh agent.
- Create an
STSAssumeRoleSessionCredentialsProvider
that can assume the role, and which can be passed to the refresh agent.
If you want to access the Neptune Management API across accounts, the managed policy document that you create in the Neptune account in Step 1 of the tutorial should look like this :
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"rds:DescribeDBClusters"
],
"Resource": "<NEPTUNE_CLUSTER_ARN>"
},
{
"Effect": "Allow",
"Action": [
"rds:DescribeDBInstances",
"rds:ListTagsForResource"
],
"Resource": "arn:${Partition}:rds:${Region}:${Account}:db:*"
}
]
}
In the above example, replace <NEPTUNE_CLUSTER_ARN>
with the ARN of your Neptune database cluster, and the ${Partition}
, ${Region}
and ${Account}
placeholders with the relevant values for your account.
If you want to access the Lambda proxy across accounts, the managed policy document that you create in the Neptune account in Step 1 of the tutorial should look like this:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction"
],
"Resource": "<LAMBDA_ARN>"
}
]
}
In the above example, replace <LAMBDA_ARN>
with the ARN of your Lambda proxy function.
Once you've completed Step 2 of the tutorial, you can then create a refresh agent using an STSAssumeRoleSessionCredentialsProvider
.
The following example shows how to access a Lambda proxy across accounts. Replace <CROSS_ACCOUNT_ROLE_ARN>
with the ARN of the role created in Step 1 of the tutorial:
String lambdaName = "neptune-endpoinst-info";
String lambdaRegion = "eu-west-1";
String crossAccountRoleArn = "<CROSS_ACCOUNT_ROLE_ARN>";
STSAssumeRoleSessionCredentialsProvider credentialsProvider =
new STSAssumeRoleSessionCredentialsProvider.Builder(crossAccountRoleArn, "AssumeRoleSession1")
.build();
ClusterEndpointsRefreshAgent refreshAgent =
ClusterEndpointsRefreshAgent.lambdaProxy(lambdaName, lambdaRegion, credentialsProvider);
The following example shows hot to access the Neptune aMnagement API across accounts. Replace <CROSS_ACCOUNT_ROLE_ARN>
with the ARN of the role created in Step 1 of the tutorial:
String clusterId = "my-cluster-id";
String neptuneRegion = "eu-west-1";
String crossAccountRoleArn = "<CROSS_ACCOUNT_ROLE_ARN>";
STSAssumeRoleSessionCredentialsProvider credentialsProvider =
new STSAssumeRoleSessionCredentialsProvider.Builder(crossAccountRoleArn, "AssumeRoleSession1")
.build();
ClusterEndpointsRefreshAgent refreshAgent =
ClusterEndpointsRefreshAgent.managementApi(clusterId, neptuneRegion, credentialsProvider);
If AWS STS Regional endpoints have been enabled in your account, you may want to configure the credentials provider for Regional STS endpoint access:
EndpointConfiguration regionEndpointConfig = new EndpointConfiguration("https://sts.eu-west-1.amazonaws.com", "eu-west-1");
AWSSecurityTokenService stsRegionalClient = AWSSecurityTokenServiceClientBuilder.standard()
.withEndpointConfiguration(regionEndpointConfig)
.build();
STSAssumeRoleSessionCredentialsProvider credentialsProvider =
new STSAssumeRoleSessionCredentialsProvider.Builder(crossAccountRoleArn, "AssumeRoleSession1")
.withStsClient(stsRegionalClient)
.build();
Remember to call close()
on the credentials provider when it is no longer needed. This shuts down the thread that performs asynchronous credential refreshing.
The EndpointsSelector
interface allows you to create objects that encapuslate custom endpoint selection logic. When a selector's getEndpoints()
method is invoked, it is passed a NeptuneClusterMetadata
object that contains details about the database cluster's topology. In your getEndpoints()
implementation, you can then filter instances in the cluster by properties such as role (reader or writer), instance ID, instance type, tags, and Availability Zone.
The following example shows how to create an EndpointsSelector
that returns the endpoints of available Neptune serverless instances in a cluster that have been tagged "analytics":
EndpointsSelector selector = (cluster) ->
new EndpointCollection(
cluster.getInstances().stream()
.filter(i -> i.hasTag("workload", "analytics"))
.filter(i -> i.getInstanceType().equals("db.serverless"))
.filter(NeptuneInstanceMetadata::isAvailable)
.collect(Collectors.toList()));
The isAvailable()
property of a NeptuneInstanceMetadata
object indicates whether an endpoint is likely to be available. An endpoint is considered likely to be available if the instance itself is in one of the following states: available
, backing-up
, modifying
, upgrading
.
The full list of database instances states can be found here. Note that not all of these states are relevant to Amazon Neptune (for example, converting-to-vpc
does not apply to Amazon Neptune database instances).
We say that isAvailable()
indicates that an endpoint is likely available. There is no guarantee that the endpoint is actually available. For example, while an instance is upgrading
, there can be short periods when the endpoint is not available. During the upgrade process, instances are sometimes restarted, and while this is happening the instance endpoint will not be available, even though the state is upgrading
.
If your selection criteria returns an empty list of endpoints, you may want to fall back to using the cluster or reader endpoints. That way, your client will always have at least one endpoint, even if because of some issue outside of its control, it cannot currently connect to the database cluster. The following example falls back to the reader endpoint if there are no instances matching the selection criteria:
EndpointsSelector writerSelector = (cluster) -> {
List<NeptuneInstanceMetadata> endpoints = cluster.getInstances().stream()
.filter(i -> i.hasTag("workload", "analytics"))
.filter(i -> i.getInstanceType().equals("db.serverless"))
.filter(NeptuneInstanceMetadata::isAvailable)
.collect(Collectors.toList());
return endpoints.isEmpty() ?
new EndpointCollection(Collections.singletonList(cluster.getReaderEndpoint())) :
new EndpointCollection(endpoints);
};
The EndpointsType
enum provides implementations of EndpointsSelector
for some common use cases:
EndpointsType.All
– Returns all available instance (writer and read replicas) endpoints, or, if there are no available instance endpoints, the reader endpoint.EndpointsType.Primary
– Returns the primary (writer) instance endpoint if it is available, or the cluster endpoint if the primary instance endpoint is not available.EndpointsType.ReadReplicas
– Returns all available read replica instance endpoints, or, if there are no replica instance endpoints, the reader endpoint.EndpointsType.ClusterEndpoint
– Returns the cluster endpoint.EndpointsType.ReaderEndpoint
– Returns the reader endpoint.
The following example shows how to use the ReadReplicas
enum value to create and refresh a client:
EndpointsSelector selector = EndpointsType.ReadReplicas;
ClusterEndpointsRefreshAgent refreshAgent =
new ClusterEndpointsRefreshAgent("cluster-id");
GremlinCluster cluster = NeptuneGremlinClusterBuilder.build()
.addContactPoints(refreshAgent.getEndpoints(selector))
.create();
GremlinClient client = cluster.connect();
refreshAgent.startPollingNeptuneAPI(
RefreshTask.refresh(client, selector),
60,
TimeUnit.SECONDS);
When the Neptune Gremlin Client connects to an IAM auth enabled database it uses a DefaultAWSCredentialsProviderChain
to supply credentials to the signing process. You can modify this behavior in a couple of different ways.
To customize which profile is sourced from a local credentials file, use the iamProfile()
builder method:
GremlinCluster cluster = NeptuneGremlinClusterBuilder.build()
.enableIamAuth(true)
.addContactPoint("reader-1")
.iamProfile("profile-name")
.create();
Or you can supply your own AwsCredentialsProvider
implementation:
GremlinCluster cluster = NeptuneGremlinClusterBuilder.build()
.enableIamAuth(true)
.addContactPoint("reader-1")
.credentials(new ProfileCredentialsProvider("profile-name"))
.create();
The client includes a load balancer-aware handshake interceptor that will sign requests and adjust HTTP headers as necessary. However, you can replace this interceptor with your own implementation:
GremlinCluster cluster = NeptuneGremlinClusterBuilder.build()
.enableIamAuth(true)
.addContactPoint("reader-1")
.handshakeInterceptor(r -> {
NeptuneNettyHttpSigV4Signer sigV4Signer = new NeptuneNettyHttpSigV4Signer("eu-west-1", new DefaultAWSCredentialsProviderChain());
sigV4Signer.signRequest(r);
return r;
})
.create();
If you have IAM database authentication enabled for your Neptune database, you must specify the Neptune service region when connecting from your client.
By default, the Neptune Gremlin Client will attempt to source this region parameter from several different places:
- The SERVICE_REGION environment variable.
- The SERVICE_REGION system property.
- The AWS_REGION Lambda environment variable (this assumes Neptune is in the same region as the Lambda function).
- Using the
Regions.getCurrentRegion()
method from the AWS SDK for Java (this assumes Neptune is in the current region).
You can also specify the service region when creating a GremlinCluster
using the NeptuneGremlinClusterBuilder.serviceRegion()
builder method:
GremlinCluster cluster = NeptuneGremlinClusterBuilder.build()
.enableIamAuth(true)
.addContactPoint("reader-1")
.serviceRegion("eu-west-1")
.create();
As an alternative to connecting directly to Neptune and using a refresh agent to adjust the pool of connections to match the cluster's current topology, you can connect via a proxy, such as a load balancer. To connect via a proxy, use the proxyAddress()
and proxyPort()
builder methods. If the connection to the proxy does not require SSL, disable SSL:
GremlinCluster cluster = NeptuneGremlinClusterBuilder.build()
.enableSsl(false)
.proxyAddress("http://my-proxy")
.proxyPort(80)
.serviceRegion("eu-west-1")
.create();
If your Neptune database does not have IAM auth enabled, you do not need to add any additional contact points.
If your Neptune database has IAM auth enabled, HTTP requests to the database must be signed using AWS Signature Version 4. Unless your proxy implements the signing process, you will have to sign requests in the client. The client must sign the request using the Neptune endpoint that will recieve the request, and include an HTTP Host
header whose value is <neptune-endpoint-dns:port>
.
Use the enableIamAuth()
and serviceRegion()
builder methods to sign requests.
When using a proxy, besides specifying the proxy address and port, you will also have to specify the ultimate Neptune endpoint to which a request will be directed. You do this using the addContactPoint()
builder method. The endpoint you specify here must match the Neptune endpoint to which the proxy forwards requests. There's no point using a refresh agent to supply a list of endpoints to the client unless the proxy can forward requests to the correct Neptune endpoint based on the value of the Host
header in the request sent to the proxy.
Here's an example of creating a client that connects through a proxy to an IAM auth-enabled Neptune database, where the proxy has been configured to forward requests to the database's cluster endpoint:
GremlinCluster cluster = NeptuneGremlinClusterBuilder.build()
.enableIamAuth(true)
.serviceRegion("eu-west-1")
.addContactPoint("my-db.cluster-cktfjywp6uxn.eu-west-1.neptune.amazonaws.com") // cluster endpoint
.proxyAddress("my-proxy")
.proxyPort(80)
.create();
In some circumstances, you may need to remove the HTTP Host
header after signing the request, but before sending it to the proxy. For example, your proxy may add a Host
header to the request: if that's the case, you don't want the request when it arrives at the Neptune endpoint to contain two Host
headers:
GremlinCluster cluster = NeptuneGremlinClusterBuilder.build()
.enableIamAuth(true)
.serviceRegion("eu-west-1")
.addContactPoint("my-db.cluster-cktfjywp6uxn.eu-west-1.neptune.amazonaws.com") // cluster endpoint
.proxyAddress("my-proxy")
.proxyPort(80)
.proxyRemoveHostHeader(true)
.serviceRegion("eu-west-1")
.create();
As of Neptune Gremlin Client version 2.0.1 you can use the Neptune Gremlin Client with a load balancer that supports host-based routing to balance requests across instances in your database cluster as selected by an EndpointsSelector
. This allows you to use custom endpoint selection logic in the client and still distribute requests across the instances in the selection set.
For this solution to work you must set up host-based routing in your load balancer.
The following steps describe how to create an AWS Application Load Balancer in your Neptune VPC and configure it for host-based routing to individual database endpoints. Note, you must configure the Preserve host headers attribute on your load balancer for this solution to work with an IAM-auth enabled database.
The solution presented here uses a fixed set of target groups, one per database instance endpoint. If your database cluster topology changes, you may need to remove some of those target groups and add others. If you don't keep the target groups up-to-date with the cluster topology, topology changes that trigger a refresh of the connection pools and connection selection logic in your Neptune Gremlin Client instances may lead to connection errors in the client.
Create a separate target group for each instance in your cluster.
Under Basic configuration:
- Select IP addresses as the target type.
- Give the target group a name that clearly indicates which instance it is associated with.
- Select the
HTTPS
protocol and port8182
(or whatever port your database cluster is using). - Select the Neptune VPC.
Under Health checks:
- The health check protocol should already be set to
HTTPS
. - Enter
/status
for the health check path. - If your Neptune database is secured with IAM database authentication, you'll need to update the Advanced health check settings: set the success codes to
200-403
.
Click Next.
Under IP addresses:
- Add the IPv4 address of the instance endpoint to the target group and click the Include as pending below button (the port should already be set to
8182
). You can find the private IP address of the instance endpoint usingdig +short <endpoint>
from your terminal.
Click Create target group.
Repeat this process for each instance in your database cluster.
Under Load balancer types choose Application Load Balancer and click Create.
Under Network mapping:
- Select the Neptune VPC.
- If you've configured the load balancer to be internet facing, you must choose at least two subnets in the Neptune VPC with routes to an internet gateway.
Under Security groups
- Select one or more security groups that allow access to your load balancer from your clients.
Under Listeners and routing
- Select the protocol clients will use to access the load balancer. If you select
HTTPS
you'll also have to configure the Secure listener settings. - At this point, you'll need to select one of your target groups to act as the target for the default action.
Click Create load balancer
Once you've created your load balancer, there's one further change you must make, which is to configure it to preserve host headers:
- On the load balancer's Attributes tab, click Edit.
- Under the Packet handling section, enable Preserve host header.
- Click Save changes.
You can now set up host-based routing rules for all your target groups.
- Open the Rules tab for your load balancer's listener and click the Manage rules button.
- Insert a rule for each of your target groups. Specify a
Host header... is
condition with the host name of the target group's instance endpoint, and aForward to...
action. - Consider modifying the default rule to return
404 - Not Found
for connection requests that cannot be routed to an existing target group.
As pointed out in the limitations section, this solution works with a fixed set of target groups, and therefore a fixed cluster topology. Because of this, there is little value in using a refresh agent to keep the client up-to-date with changes in the cluster topology. (If you automate the process of keeping the target groups and load balancer host-based routing rules up-to-date with changes in the cluster topology, then there certainly is value in using a refresh agent.) You can, however, supply multiple instance endpoints when building a client, so as to ensure that requests are distributed across those endpoints.
Here's an example of creating a client that connects through an ALB to an IAM auth-enabled Neptune database. The ALB accepts traffic on port 80, so we disable SSL (connections from the ALB to Neptune, however, do use SSL). The ALB has been configured to use host-based routing, so we can provide multiple instance endpoints to the builder:
GremlinCluster cluster = NeptuneGremlinClusterBuilder.build()
.enableIamAuth(true)
.enableSsl(false)
.serviceRegion("eu-west-1")
.addContactPoints("replica-1-endpoint", "replica-2-endpoint", "replica-3-endpoint") // replica endpoints
.proxyAddress("alb-endpoint")
.proxyPort(80)
.create();
From version 2.0.3 onwards, the Neptune Gremlin Client can collect metrics about the attempts to connect to each endpoint, and the requests to each endpoint. Metrics are emitted each time the endpoints of a GremlinClient
are refreshed (using a refresh agent, for example).
To enable metrics collection, set enableMetrics()
to true
when building a cluster:
GremlinCluster cluster = NeptuneGremlinClusterBuilder.build()
.enableMetrics(true)
... // other builder methods
.create();
You can also enable or disable metrics using a org.apache.tinkerpop.gremlin.driver.MetricsConfig.enableMetrics
environment variable or system property. If either the environment variable or system property is set to false
, then metrics will be disabled, irrespective of the builder configuration.
When you enable metrics, connection and request metrics will be written to the log at the INFO
logging level. Here's an example:
INFO MetricsLogger - Connection metrics: [duration: 15346ms, totalConnectionAttempts:38368, endpoints: [db-1.abcdefghijklm.eu-west-2.neptune.amazonaws.com [total: 19184, succeeded: 19184, unavailable: 0, closing: 0, dead: 0, npe: 0, nha: 0, minMillis: 0, maxMillis: 2, avgMillis: 0.12], db-2.abcdefghijklm.eu-west-2.neptune.amazonaws.com [total: 19184, succeeded: 19184, unavailable: 0, closing: 0, dead: 0, npe: 0, nha: 0, minMillis: 0, maxMillis: 1, avgMillis: 0.12]]]
INFO MetricsLogger - Request metrics: [duration: 15346ms, totalRequests:38368, endpoints: [db-1.abcdefghijklm.eu-west-2.neptune.amazonaws.com [count: 19184, ratePerSec: 1249.935, minMillis: 0, maxMillis: 16, avgMillis: 0.17], db-2.abcdefghijklm.eu-west-2.neptune.amazonaws.com [count: 19184, ratePerSec: 1249.935, minMillis: 0, maxMillis: 3, avgMillis: 0.17]] (dropped: 0, skipped: 0)]
Connection metrics capture the time taken to attempt to acquire a connection. Using the connection metrics, you can determine whether requests are using connections that are equally distributed across the database endpoints.
Request metrics capture the average latencies for requests to each of the endpoints. Using the request metrics, you can determine whether some endpoints are returning responses more slowly than others.
Metrics are only emitted when the endpoints of a GremlinClient
are refreshed. If you've enabled metrics when building a cluster, and you're using a refresh agent to keep your clients up-to-date with changes in your Neptune database's cluster topology, connection and request metrics will be emitted automatically. If you're not using a refresh agent (perhaps you've supplied a static list of endpoints when building a cluster, and therefore don't need to refresh them periodically), however, the client won't emit any metrics. You have two options to force metrics to be emitted periodically.
The simplest option is to 'monitor' your client using a refresh agent. In the following example, the monitor will cause the client to emit metrics every 15 seconds:
GremlinClient client = cluster.connect();
ClusterEndpointsRefreshAgent refreshAgent =
ClusterEndpointsRefreshAgent.monitor(client, 15, TimeUnit.SECONDS);
// Application code
refreshAgent.close()
Remember to close the refresh agent when your application has finished with it.
If you don't want to use a refresh agent to monitor a client, you can call refresh()
directly on the client instance instead:
GremlinClient client = cluster.connect();
client.refresh(); // Call this periodically to emit metrics
With this second approach, you'll have to schedule the periodic call to refresh()
in your application code.
When you enable metrics, metrics will automatically be written to the log whenever the endpoints of a GremlinClient
are refreshed. Besides logging metrics, you also have the option of supplying a MetricsHandler
:
GremlinCluster cluster = NeptuneGremlinClusterBuilder.build()
.enableMetrics(true)
.addMetricsHandler((connectionMetrics, requestMetrics) -> {
// Your handler code
})
...
.create();
Using a MetricsHandler
you can, for example, publish metrics to Amazon CloudWatch.
Using a custom EndpointsSelector
you can select database instance endpoints based on the presence or absence of specific database instance tags and tag values. You can employ this feature to manage the visibility of instance endpoints to clients while you undertake some out-of-band operations. The examples below describe two operational patterns that use this feature.
(Note that pre version 2.0.1 of the Neptune Gremlin Client, calls from a refresh agent to the Neptune Management API would cache tags for individual instances. This meant that changes to a database instance's tags would not be reflected in subsequent calls to get the cluster topology. This behaviour has changed in 2.0.1: the tags associated with a database instance now remain current with that instance.)
Neptune's auto-scaling feature allows you to automatically adjust the number of Neptune replicas in a database cluster to meet your connectivity and workload requirements. Autoscaling can add replicas to your cluster on a sceduled basis or whenever the CPU utilization on existing instances exceeds a threshhold you specify in the autoscaling configuration.
New replicas, however, will start with a cold buffer cache. For some applications, the additional query latency caused by a cold cache can impact the user experience. In these circumstances, you may want to warm the cache before 'releasing' the replica to clients. In this way, you improve the performance of the first queries directed to the replica, but at the expense of the replica taking a little longer to become available to service queries.
Neptune's neptune_autoscaling_config
parameter allows you to specify one or more tags to be attached to newly provisioned autoscaled readers. You can use this feature in combination with a custom EndpointsSelector
to hide cold replicas from clients.
Here's an example neptune_autoscaling_config
JSON parameter that tags autoscaled readers with a "buffer-cache" tag with the value "cold":
"{
\"tags\": [
{ \"key\" : \"buffer-cache\", \"value\" : \"cold\" }
],
\"maintenanceWindow\" : \"wed:12:03-wed:12:33\",
\"dbInstanceClass\" : \"db.r5.xlarge\"
}"
You can use this in combination with the following custom EndpointsSelector
, which selects reader instances that do not have a "buffer-cache" tag with the value "cold":
EndpointsSelector selector = (cluster) ->
new EndpointCollection(
cluster.getInstances().stream()
.filter(NeptuneInstanceMetadata::isReader)
.filter(i -> !i.hasTag("buffer-cache", "cold")) // ignore readers with cold caches
.collect(Collectors.toList()));
For this solution to work, you now need a process that can identify a newly provisioned reader, warm it with a query workload, and then delete its "buffer-cache" tag. When the tag is deleted, the instance will become visible to the application's Neptune Gremlin Clients.
The process you use for detecting and warming readers is out of scope for this documentation. Note, however, that if you are already using an AWS Lambda proxy in your application to retrieve cluster topology information, then you already have a source you can poll to discover cold readers (i.e. readers that do have a "buffer-cache" tag with the value "cold").
The solution described elsewhere in this documentation for using an AWS Application Load Balancer with the Neptune Gremlin Client is limited insofar as it assumes a static cluster topology. If you want to use this solution with a refresh agent and EndpointsSelector
that adapts to changes in the cluster toplogy, then you will need to introduce a process that can update a load balancer's target groups and host-based routing rules whenever instances are added to or removed from the cluster. Further, you will want to ensure that clients can only select an instance endpoint once it has been registered with the load balancer.
One way to ensure that a Neptune Gremlin Client instance only uses endpoints that have been registered with an ALB, is to tag instances that have been registered with the load balancer, and to use a custom EndpointsSelector
that filters based on this tag. The following custom EndpointsSelector
only selects reader instances that have an "alb-status" tag with the value "registered":
EndpointsSelector selector = (cluster) ->
new EndpointCollection(
cluster.getInstances().stream()
.filter(NeptuneInstanceMetadata::isReader)
.filter(i -> i.hasTag("alb-status", "registered"))
.collect(Collectors.toList()));
For this solution to work, you now need a process that can identify whenever an instance is added to or removed from the cluster, and which updates the target groups and host-based routing rules accordingly. This process should then tag the database instance after it has been registered with the load balancer.
The process you use for detecting cluster changes and updating target groups and routing rules is out of scope for this documentation. Note, however, that if you are already using an AWS Lambda proxy in your application to retrieve cluster topology information, then you already have a source you can poll to discover new, unregistered instances (i.e. instances that do not have an "alb-status" tag with the value "registered").
Connection issues occur: database instances failover or restart during upgrades, and intermittent issues in the network can break connections. Write queries sometimes throw an error, because of a ConcurrentModificationException
or ConstraintViolationException
, or because the primary has failed over, and the instance to which the query is sent can no longer support writes, triggering a ReadOnlyViolationException
.
As a good practice you should consider implementing a backoff and retry strategy to handle these occurences and help your application recover and make forward progress.
- Connection issues – The TinkerPop Java driver, on which the Neptune Gremlin Client depends, automatically attempts to remediate connection issues. For example, if a host is considered unavailable, the driver will start a background task that tries to create a fresh connection. Because the driver handles reconnects automatically, if a connection issue occurs while your application is submitting a query, all your application has to do is backoff and retry the query.
ConcurrentModificationException
– The Neptune transaction semantics mean that concurrent transactions can sometimes fail with aConcurrentModificationException
. In these situations, an exponential backoff-and-retry mechanism can help resolve collisions.ConstraintViolationException
– This can sometimes occur if you attempt to create an edge between vertices that have only recently been committed. If one or other of the vertices is not yet visible to the current transaction, aConstraintViolationException
can occur. You can retry the query in the expectation that the necessary items will become visible.ReadOnlyViolationException
– This occurs if the primary has failed over to another instance. By backing off and retrying the query, you give the Neptune Gremlin Client the opportunity to refresh its endpoint information.
There are two places in your application where you should consider implementing a backoff-and-retry strategy:
- Creating a
GremlinCluster
andGremlinClient
– In many applications you need create aGremlinClient
only once, when the application starts. This client then lasts for the lifetime of the application. The situtaion is slightly different with serverless applications built using AWS Lambda functions. If you use Lambda functions to host your database access, each instance of a function will create its ownGremlinClient
. - Submitting a query – If you do use a backoff-and-retry strategy to handle write request issues, consider implementing idempotent queries for create and update requests.
The Neptune Gremlin Client includes a RetryUtils
utility class with a isRetryableException(Exception e)
method. The method encapsulates what we've learned running the Neptune Gremlin Client in long-running, high-throughput scenarios. The Result
of this method indicates whether an exception represents a connection issue or query exception that would allow for an operation to be retried:
try {
// Your code
} catch (Exception e){
RetryUtils.Result result = RetryUtils.isRetryableException(e);
boolean isRetryable = result.isRetryable();
}
The following example uses Retry4j (which is included with the Neptune Gremlin Client) for retries, and RetryUtils.isRetryableException()
for determining whether an exception represents a connection issue or query exception that would allow for an operation to be retried.
In this example, the GremlinCluster
, GremlinClient
, and GraphTraversalSource
are created inside a Callable
, which is executed by a Retry4j CallExecutor
. The Callable
returns a ClusterContext
– a simple container object provided by the Neptune Gremlin Client to hold the cluster, client and traversal source. The ClusterContext
implements Autocloseable
, and its close()
method closes both the client and the cluster.
The ClusterEndpointsRefreshAgent
is created outside the backoff-and-retry code. This allows a single ClusterEndpointsRefreshAgent
to be used to populate multiple clusters with endpoint information.
String lambdaProxy = "neptune-endpoints-info";
EndpointsSelector selector = EndpointsType.Primary;
ClusterEndpointsRefreshAgent refreshAgent =
ClusterEndpointsRefreshAgent.lambdaProxy(lambdaProxy);
RetryConfig retryConfig = new RetryConfigBuilder()
.retryOnCustomExceptionLogic(
e -> RetryUtils.isRetryableException(e).isRetryable())
.withExponentialBackoff()
.withMaxNumberOfTries(5)
.withDelayBetweenTries(1, ChronoUnit.SECONDS)
.build();
CallExecutor executor = new CallExecutorBuilder()
.config(retryConfig)
.build();
Status<ClusterContext> status = executor.execute((Callable<ClusterContext>) () -> {
// Create cluster, client, and graph traversal source
GremlinCluster cluster = NeptuneGremlinClusterBuilder.build()
.addContactPoints(refreshAgent.getEndpoints(selector))
.create();
GremlinClient client = cluster.connect();
DriverRemoteConnection connection = DriverRemoteConnection.using(client);
GraphTraversalSource g = AnonymousTraversalSource.traversal().withRemote(connection);
return new ClusterContext(cluster, client, g);
});
ClusterContext clusterContext = status.getResult();
refreshAgent.startPollingNeptuneAPI(
RefreshTask.refresh(clusterContext.client(), selector),
15,
TimeUnit.SECONDS
);
// Use clusterContext.graphTraversalSource() for all queries (across threads)
// And then, when the app closes...
refreshAgent.close();
clusterContext.close();
The following example uses Retry4j (which is included with the Neptune Gremlin Client) for retries, and RetryUtils.isRetryableException()
for determing whether an exception represents a connection issue or query exception that would allow for an operation to be retried.
In this example, the query is constructed and submitted inside a Callable
, which is executed by a Retry4j CallExecutor
. The GraphTraversalSource
has been created prior to submitting any query (possibly using the example code shown above), and can be reused across queries and threads. You should use a separate CallExecutor
per thread, however.
Note that with each retry, the code creates a new Traversal
instance spawned from the GraphTraversalSource
. In other words, retry the entire query.
//GraphTraversalSource can be provided by ClusterContext – see previous example
GraphTraversalSource g = ...
RetryConfig retryConfig = new RetryConfigBuilder()
.retryOnCustomExceptionLogic(
e -> RetryUtils.isRetryableException(e).isRetryable())
.withExponentialBackoff()
.withMaxNumberOfTries(5)
.withDelayBetweenTries(1, ChronoUnit.SECONDS)
.build();
// You can reuse GraphTraversalSource and RetryConfig across threads,
// but use a CallExecutor per thread.
CallExecutor<Edge> executor = new CallExecutorBuilder<Edge>()
.config(retryConfig)
.build();
Callable<Edge> query = () -> g.addV().as("v1")
.addV().as("v2")
.addE("edge").from("v1").to("v2")
.next();
try {
Edge result = executor.execute(query).getResult();
// Do something with result
} catch (RetriesExhaustedException e) {
// Attempted backoff and retry, but this has failed
} catch (Exception e) {
// Handle unexpected exceptions
}
Whenever you submit a Gremlin request to a GremlinClient
, the client repeatedly tries to acquire a connection until it either succeeds, a ConnectionException
occurs, or a timeout threshold is exceeded.
The GremlinClient.chooseConnection()
method (which is invoked internally whenever the application submits a request via the client) respects the maxWaitForConnection
value specified when you create a GremlinCluster
. The following example creates a GremlinClient
whose chooseConnection()
method will throw a TimeoutException
after 10 seconds if it can't acquire a connection:
GremlinCluster cluster = GremlinClusterBuilder.build()
.maxWaitForConnection(10000)
...
.create();
GremlinClient client = cluster.connect();
If you don't specify a maxWaitForConnection
value, the GremlinCluster
uses a default value of 16,000
milliseconds.
Whenever a GremlinClient
attempts to acquire a connection, it iterates through the connection pools associated with the endpoints with which it has been configured, looking for the first healthy connection. By default, it waits 5 milliseconds between attempts to get a connection. You can configure this interval using the acquireConnectionBackoffMillis()
builder method.
If you have suspended the database endpoints (via a Lambda proxy), instead of throwing a TimeoutException
, the client will throw an EndpointsUnavailableException
after the maxWaitForConnection
interval.
Sometimes the reason the client is not able to acquire a connection is because it has a stale view of the cluster topology. In these circumstances, you may want the client to immediately refresh its view of the cluster topology, rather than wait for the refresh agent's next scheduled refresh.
The NeptuneGremlinClusterBuilder
provides an eagerRefreshWaitTimeMillis()
builder method that allows you to specify the maximum time the client will wait to acquire a connection before triggering an eager refresh of the endpoints. If you set eagerRefreshWaitTimeMillis
, ensure the value is less than maxWaitForConnection
. If you set eagerRefreshWaitTimeMillis
greater than maxWaitForConnection
, the client will simply throw a TimeoutException
after the maxWaitForConnection
interval.
By default, eagerRefreshWaitTimeMillis
is not configured.
If you do configure eagerRefreshWaitTimeMillis
, you must also supply an event handler using the onEagerRefresh()
builder method. The handler is an implementation of the OnEagerRefresh
interface. Its getEndpoints()
method is passed anEagerRefreshContext
(currently empty, but there to hold context information in future versions of the Neptune Gremlin Client) and must return an EndpointCollection
.
The following example shows how to create a GremlinClient
that will refresh its endpoints after 5 seconds have passed trying to acquire a connection:
EndpointsType selector = EndpointsType.ReadReplicas;
ClusterEndpointsRefreshAgent refreshAgent =
ClusterEndpointsRefreshAgent.lambdaProxy(lambdaProxy);
GremlinCluster cluster = NeptuneGremlinClusterBuilder.build()
.addContactPoints(refreshAgent.getEndpoints(selector))
.eagerRefreshWaitTimeMillis(5000)
.onEagerRefresh(ctx -> refreshAgent.getEndpoints(selector))
.maxWaitForConnection(20000)
.create();
GremlinClient client = cluster.connect();
refreshAgent.startPollingNeptuneAPI(
RefreshTask.refresh(client, selector),
60,
TimeUnit.SECONDS);
The refresh agent here is configured to find the endpoint addresses of all database instances in a Neptune cluster that are currently acting as readers. The NeptuneGremlinClusterBuilder
creates a GremlinCluster
whose contact points (i.e. its endpoint addresses) are initialized via a first invocation of the refresh agent. But the builder also configures the client so that after 5 seconds have passed attempting to acquire a connection from its currently configured endpoint addresses, it refreshes those addresses, again using the agent. The client is also configured to timeout attempts to get a connection after 20 seconds. At the end of the example we also configure the refresh agent to refresh the GremlinClient
every minute, irrespective of any failures or successes.
With this setup, then, the GremlinClient
will refresh its endpoint addresses once every minute. It will also refresh its endpoints after 5 seonds have passed attempting to get a connection. If any attempt to get a connection takes longer than 20 seconds, the client will throw a TimeoutException
.
The eagerRefreshWaitTimeMillis
value is evaluated on a per-request basis. However, a GremlinClient
is capable of concurrently handling many requests. The client ensures that multiple eager refresh events cannot be triggered at the same time. Further, it imposes a backoff period between eager refresh events, so as to prevent the Neptune Management API or a Lambda proxy being overwhelmed with cluster topology requests. By default, this backoff period is 5 seconds. You can configure it using the eagerRefreshBackoffMillis()
builder method.
The Neptune Gremlin Client supports Gremlin transactions, as long as the transactions are issued against a writer endpoint:
EndpointsType selector = EndpointsType.ClusterEndpoint;
ClusterEndpointsRefreshAgent refreshAgent =
new ClusterEndpointsRefreshAgent("my-cluster-id");
GremlinCluster cluster = NeptuneGremlinClusterBuilder.build()
.addContactPoints(refreshAgent.getEndpoints(selector))
.create();
GremlinClient client = cluster.connect();
DriverRemoteConnection connection = DriverRemoteConnection.using(client);
Transaction tx = traversal().withRemote(connection).tx();
GraphTraversalSource g = tx.begin();
try {
String id1 = UUID.randomUUID().toString();
String id2 = UUID.randomUUID().toString();
g.addV("testNode").property(T.id, id1).iterate();
g.addV("testNode").property(T.id, id2).iterate();
g.addE("testEdge").from(__.V(id1)).to(__.V(id2)).iterate();
tx.commit();
} catch (Exception e) {
tx.rollback();
}
refreshAgent.close();
client.close();
cluster.close();
If you attempt to issue transactions against a read replica, the client returns an error:
org.apache.tinkerpop.gremlin.driver.exception.ResponseException: {"detailedMessage":"Gremlin update operation attempted on a read-only replica.","requestId":"05074a8e-c9ef-42b7-9f3e-1c388cd35ae0","code":"ReadOnlyViolationException"}
Neptune Gremlin Client 2.x.x includes the following breaking changes:
- The
EndpointsSelector.getEndpoints()
method now accepts aNeptuneClusterMetadata
object and returns anEndpointCollection
(version 1 returned a collection of String addresses). - The
GremlinClient.refreshEndpoints()
method now accepts anEndpointCollection
(version 1 accepted a collection of String addresses). NeptuneInstanceProperties
has been renamedNeptuneInstanceMetadata
.- You can no longer supply a list of selectors when creating a
ClusterEndpointsRefreshAgent
: selectors are applied lazily whenever the agent is triggered. - To supply an initial list of endpoints to the
NeptuneGremlinClusterBuilder.addContactPoints()
method, userefreshAgent.getEndpoints()
with an appropriate selector (version 1 usedgetAddresses()
). - The
ClusterEndpointsRefreshAgent.startPollingNeptuneAPI()
now accepts a collection ofRefreshTask
objects. Each task encapsulates a client and a selector. This way, you can update multiple clients, each with its own selection logic, using a single refresh agent. - The
NeptuneGremlinClusterBuilder
now usesproxyPort()
,proxyEndpoint()
andproxyRemoveHostHeader()
builder methods to configure connections through a proxy (e.g. a load balancer). These methods replaceloadBalancerPort()
,networkLoadBalancerEndpoint()
andapplicationLoadBalancerEndpoint()
. - The
NeptuneGremlinClusterBuilder.refreshOnErrorThreshold()
andNeptuneGremlinClusterBuilder.refreshOnErrorEventHandler()
builder methods have been replaced witheagerRefreshWaitTimeMillis()
andonEagerRefresh()
. Note thatrefreshOnErrorThreshold()
specified a count of consecutive failure attempts, whereaseagerRefreshWaitTimeMillis
specifies a wait time in milliseconds. When migrating, seteagerRefreshWaitTimeMillis
equal torefreshOnErrorThreshold * 5
.
The following example shows a solution built using Neptune Gremlin Client version 1.0.2:
String clusterId = "my-cluster-id";
EndpointsSelector selector = (clusterEndpoint, readerEndpoint, instances) ->
instances.stream()
.filter(NeptuneInstanceProperties::isReader)
.filter(i -> i.hasTag("workload", "analytics"))
.filter(NeptuneInstanceProperties::isAvailable)
.map(NeptuneInstanceProperties::getEndpoint)
.collect(Collectors.toList());
ClusterEndpointsRefreshAgent refreshAgent = new ClusterEndpointsRefreshAgent(
clusterId,
selector);
GremlinCluster cluster = NeptuneGremlinClusterBuilder.build()
.enableIamAuth(true)
.addContactPoints(refreshAgent.getAddresses().get(selector))
.refreshOnErrorThreshold(1000)
.refreshOnErrorEventHandler(() -> refreshAgent.getAddresses().get(selector))
.create();
GremlinClient client = cluster.connect();
refreshAgent.startPollingNeptuneAPI(
(OnNewAddresses) addresses -> client.refreshEndpoints(addresses.get(selector)),
60,
TimeUnit.SECONDS);
DriverRemoteConnection connection = DriverRemoteConnection.using(client);
GraphTraversalSource g = AnonymousTraversalSource.traversal().withRemote(connection);
for (int i = 0; i < 100; i++) {
List<Map<Object, Object>> results = g.V().limit(10).valueMap(true).toList();
for (Map<Object, Object> result : results) {
//Do nothing
}
}
refreshAgent.close();
client.close();
cluster.close();
The code below shows the solution updated to Neptune Gremlin Client version 2.x.x:
String lambdaProxyName = "neptune-endpoints-info";
// Selector accepts NeptuneClusterMetadata and returns EndpointCollection
// NeptuneInstanceProperties renamed to NeptuneInstanceMetadata
EndpointsSelector selector = (cluster) ->
new EndpointCollection(
cluster.getInstances().stream()
.filter(NeptuneInstanceMetadata::isReader)
.filter(i -> i.hasTag("workload", "analytics"))
.filter(NeptuneInstanceMetadata::isAvailable)
.collect(Collectors.toList())
);
// Prefer AWS Lambda proxy – no selector passed to factory method
ClusterEndpointsRefreshAgent refreshAgent =
ClusterEndpointsRefreshAgent.lambdaProxy(lambdaProxyName);
// Use refreshAgent.getEndpoints(selector) to populate contact points
GremlinCluster cluster = NeptuneGremlinClusterBuilder.build()
.enableIamAuth(true)
.addContactPoints(refreshAgent.getEndpoints(selector))
.eagerRefreshWaitTimeMillis(5000) // 1000 * 5 millis
.onEagerRefresh(() -> refreshAgent.getEndpoints(selector))
.create();
GremlinClient client = cluster.connect();
// Accepts single RefreshTask or collection of RefreshTasks
refreshAgent.startPollingNeptuneAPI(
RefreshTask.refresh(client, selector),
60,
TimeUnit.SECONDS);
DriverRemoteConnection connection = DriverRemoteConnection.using(client);
GraphTraversalSource g = AnonymousTraversalSource.traversal().withRemote(connection);
for (int i = 0; i < 100; i++) {
List<Map<Object, Object>> results = g.V().limit(10).valueMap(true).toList();
for (Map<Object, Object> result : results) {
//Do nothing
}
}
refreshAgent.close();
client.close();
cluster.close();
The revised version above shows using an AWS Lambda proxy to refresh the endpoints. If you prefer to keep using the Neptune Management API directly, you can create a ClusterEndpointsRefreshAgent
like this:
ClusterEndpointsRefreshAgent refreshAgent =
ClusterEndpointsRefreshAgent.managementApi(clusterId);
The demo includes several sample scenarios. To run them, compile the gremlin-client-demo.jar
and then install it on an EC2 instance that allows connections to your Neptune cluster.
All of the demos use a ClusterEndpointsRefreshAgent
to get the database cluster topology and refresh the endpoints in a GremlinClient
. If you supply a --cluster-id
parameter, the refresh agent will query the Neptune Management API directly. If you supply a --lambda-proxy
name instead, the refresh agent will query an AWS Lambda proxy for endpoint information (you will need to install a Lambda proxy first).
If you use a --cluster-id
parameter, the identity under which you're running the demo must be authorized to perform rds:DescribeDBClusters
for your Neptune cluster, and rds:DescribeDBInstances
and rds:ListTagsForResource
for db:*
.
If you use a --lambda-proxy
parameter, the identity under which you're running the demo must be authorized to perform lambda:InvokeFunction
for the proxy Lambda function.
This demo shows how to use custom EndpointSelector
implementations to filter the cluster topology for writer and reader endpoints. It uses a single ClusterTopologyRefreshAgent
to refresh both a writer GremlinClient
and a reader GremlinClient
.
java -jar gremlin-client-demo.jar custom-selectors-demo \
--cluster-id <my-cluster-id>
This demo uses a ClusterTopologyRefreshAgent
to query for the current cluster topology every 15 seconds. The GremlinClient
adapts accordingly.
java -jar gremlin-client-demo.jar refresh-agent-demo \
--cluster-id <my-cluster-id>
With this demo, try triggering a failover in the cluster. After approx 15 seconds you should see a new endpoint added to the client, and the old endpoint removed. While the failover is occurring, you may see some queries fail: I've used a simple exception handler to log these errors.
This demo uses Retry4j and RetryUtils.isRetryableException()
to wrap the creation of a GremlinCluster
and GremlinClient
, and the submission of individual queries, with a backoff-and-retry strategy.
java -jar gremlin-client-demo.jar retry-demo \
--cluster-id <my-cluster-id>
With this demo, try triggering a failover in the cluster. After approx 15 seconds you should start seeing some exceptions and retry attempts in the console. Most of the operations should succeed after one or more retries. Some small number may fail after the maximum number of retries.
This demo demonstrates using the Neptune Gremlin client to issue transactions.
java -jar gremlin-client-demo.jar tx-demo \
--cluster-id <my-cluster-id>
See CONTRIBUTING for more information.
This project is licensed under the Apache-2.0 License.