Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): add manual affinity in grpc-gcp #3170

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ List<Duration> execute(
int numClients,
int numOperations,
int waitMillis,
boolean useMultiplexedSession);
boolean useMultiplexedSession,
boolean enableGrpcGcpExtension);
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public List<Duration> execute(
int numClients,
int numOperations,
int waitMillis,
boolean useMultiplexedSession) {
boolean useMultiplexedSession,
boolean enableGrpcGcpExtension) {
// setup open telemetry metrics and traces
// setup open telemetry metrics and traces
SpanExporter traceExporter = TraceExporter.createWithDefaultConfiguration();
Expand Down Expand Up @@ -97,13 +98,18 @@ public List<Duration> execute(
.build();
SpannerOptions.enableOpenTelemetryMetrics();
SpannerOptions.enableOpenTelemetryTraces();
SpannerOptions options =
SpannerOptions.Builder optionsBuilder =
SpannerOptions.newBuilder()
.setOpenTelemetry(openTelemetry)
.setProjectId(databaseId.getInstanceId().getProject())
.setSessionPoolOption(sessionPoolOptions)
.setHost(SERVER_URL)
.build();
.setHost(SERVER_URL);
if(enableGrpcGcpExtension) {
System.out.println("Using gRPC-GCP extension for channel management");
optionsBuilder.enableGrpcGcpExtension();
}

SpannerOptions options = optionsBuilder.build();
// Register query stats metric.
// This should be done once before start recording the data.
Meter meter = openTelemetry.getMeter("cloud.google.com/java");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ private static CommandLine parseCommandLine(String[] args) throws ParseException
options.addOption("m", "multiplexed", true, "Use multiplexed sessions. Defaults to false.");
options.addOption("w", "wait", true, "Wait time in millis. Defaults to zero.");
options.addOption("name", true, "Name of this test run");
options.addOption("g", "grpcgcpextension", true, "enable gRPC-GCP extension. Defaults to false.");
CommandLineParser parser = new DefaultParser();
return parser.parse(options, args);
}
Expand All @@ -102,6 +103,8 @@ public void run(CommandLine commandLine) {
: TransactionType.READ_ONLY_SINGLE_USE;
boolean useMultiplexedSession =
commandLine.hasOption('m') ? Boolean.parseBoolean(commandLine.getOptionValue('m')) : false;
boolean enableGrpcGcpExtension =
commandLine.hasOption('g') ? Boolean.parseBoolean(commandLine.getOptionValue('g')) : false;

System.out.println();
System.out.println("Running benchmark with the following options");
Expand All @@ -111,14 +114,15 @@ public void run(CommandLine commandLine) {
System.out.printf("Transaction type: %s\n", transactionType);
System.out.printf("Use Multiplexed Sessions: %s\n", useMultiplexedSession);
System.out.printf("Wait between queries: %dms\n", waitMillis);
System.out.printf("Using gRPC-GCP extension: %s\n", enableGrpcGcpExtension);
Copy link

@psinghbay1 psinghbay1 Jul 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flushing (auto-flush on \n) with sysout in benchmark apps may add extra IO overhead.


List<Duration> javaClientResults = null;
System.out.println();
System.out.println("Running benchmark for Java Client Library");
JavaClientRunner javaClientRunner = new JavaClientRunner(databaseId);
javaClientResults =
javaClientRunner.execute(
transactionType, clients, operations, waitMillis, useMultiplexedSession);
transactionType, clients, operations, waitMillis, useMultiplexedSession, enableGrpcGcpExtension);

printResults("Java Client Library", javaClientResults);
}
Expand Down
5 changes: 5 additions & 0 deletions google-cloud-spanner-executor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>grpc-gcp</artifactId>
<version>1.6.1</version>

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we centralize the versions or create separate dependencyManagement?
on upgrades, it may be hard to find and update all such version tags

</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
Expand Down
1 change: 1 addition & 0 deletions google-cloud-spanner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>grpc-gcp</artifactId>
<version>1.6.1</version>

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

</dependency>
<dependency>
<groupId>io.grpc</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import com.google.api.pathtemplate.PathTemplate;
import com.google.cloud.RetryHelper;
import com.google.cloud.RetryHelper.RetryHelperException;
import com.google.cloud.grpc.GcpManagedChannel;
import com.google.cloud.grpc.GcpManagedChannelBuilder;
import com.google.cloud.grpc.GcpManagedChannelOptions;
import com.google.cloud.grpc.GcpManagedChannelOptions.GcpMetricsOptions;
Expand Down Expand Up @@ -1950,7 +1951,16 @@ <ReqT, RespT> GrpcCallContext newCallContext(
boolean routeToLeader) {
GrpcCallContext context = GrpcCallContext.createDefault();
if (options != null) {
// Set channel affinity in GAX
context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue());
// Set channel affinity in gRPC-GCP
context =
context.withCallOptions(
context
.getCallOptions()
.withOption(
GcpManagedChannel.AFFINITY_KEY,
Option.CHANNEL_HINT.getLong(options).toString()));
}
if (compressorName != null) {
// This sets the compressor for Client -> Server.
Expand Down
3 changes: 3 additions & 0 deletions samples/snippets/grpc_java_logging_config.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
handlers = java.util.logging.ConsoleHandler
java.util.logging.ConsoleHandler.level = ALL
com.google.cloud.grpc.GcpManagedChannel.level=FINEST
6 changes: 6 additions & 0 deletions samples/snippets/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,15 @@
</dependencyManagement>

<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>grpc-gcp</artifactId>
<version>1.6.1</version>

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you have dependency management above

</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-spanner</artifactId>
<version>6.69.1-SNAPSHOT</version>
</dependency>
<!-- [END spanner_install_with_bom] -->

Expand Down
45 changes: 45 additions & 0 deletions samples/snippets/src/main/java/com/example/spanner/Mux.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.example.spanner;

import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
import java.math.BigDecimal;
import java.util.logging.Level;
import java.util.logging.Logger;

class Mux {
public static void main(String[] args){
Logger.getLogger("com.google.cloud.grpc.GcpManagedChannel").setLevel(Level.FINEST);
queryWithNumericParameter();
}

static void queryWithNumericParameter() {
// TODO(developer): Replace these variables before running the sample.
String projectId = "span-cloud-testing";
String instanceId = "harsha-test-gcloud";
String databaseId = "database1";

try (Spanner spanner =
SpannerOptions.newBuilder().setProjectId(projectId).enableGrpcGcpExtension().build().getService()) {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(projectId, instanceId, databaseId));
queryWithNumericParameter(client);
}
}

static void queryWithNumericParameter(DatabaseClient client) {
Statement statement =
Statement.newBuilder(
"SELECT SingerId, FirstName, LastName FROM Singers")
.build();
try (ResultSet resultSet = client.singleUse().executeQuery(statement)) {
while (resultSet.next()) {
System.out.printf(
"%d %s %s %n", resultSet.getLong("SingerId"), resultSet.getString("FirstName"), resultSet.getString("LastName"));
}
}
}
}
Loading