Skip to content

Commit

Permalink
Refactored the handling of gRPC stubs (#128)
Browse files Browse the repository at this point in the history
* Refactored the handling of gRPC stubs

* Fixed formatting
  • Loading branch information
merlimat authored Feb 28, 2024
1 parent b0877ad commit d29883f
Show file tree
Hide file tree
Showing 21 changed files with 208 additions and 293 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,21 @@
import io.streamnative.oxia.client.batch.Operation.WriteOperation.DeleteOperation;
import io.streamnative.oxia.client.batch.Operation.WriteOperation.DeleteRangeOperation;
import io.streamnative.oxia.client.batch.Operation.WriteOperation.PutOperation;
import io.streamnative.oxia.client.grpc.ChannelManager;
import io.streamnative.oxia.client.grpc.ChannelManager.StubFactory;
import io.streamnative.oxia.client.grpc.OxiaStubManager;
import io.streamnative.oxia.client.metrics.BatchMetrics;
import io.streamnative.oxia.client.metrics.OperationMetrics;
import io.streamnative.oxia.client.notify.NotificationManager;
import io.streamnative.oxia.client.session.SessionManager;
import io.streamnative.oxia.client.shard.ShardManager;
import io.streamnative.oxia.proto.ListRequest;
import io.streamnative.oxia.proto.ListResponse;
import io.streamnative.oxia.proto.ReactorOxiaClientGrpc.ReactorOxiaClientStub;
import java.time.Clock;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import lombok.AccessLevel;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
Expand All @@ -55,16 +52,15 @@
class AsyncOxiaClientImpl implements AsyncOxiaClient {

static @NonNull CompletableFuture<AsyncOxiaClient> newInstance(@NonNull ClientConfig config) {
var channelManager = new ChannelManager();
var reactorStubFactory = channelManager.getReactorStubFactory();
Supplier<ReactorOxiaClientStub> stubFactory =
() -> reactorStubFactory.apply(config.serviceAddress());
var shardManager = new ShardManager(stubFactory, config.metrics(), config.namespace());
var notificationManager =
new NotificationManager(reactorStubFactory, shardManager, config.metrics());
var stubManager = new OxiaStubManager();

var serviceAddrStub = stubManager.getStub(config.serviceAddress());
var shardManager = new ShardManager(serviceAddrStub, config.metrics(), config.namespace());
var notificationManager = new NotificationManager(stubManager, shardManager, config.metrics());

Function<Long, String> leaderFn = shardManager::leader;
var stubByShardId = leaderFn.andThen(reactorStubFactory);
var stubByShardId = leaderFn.andThen(stubManager::getStub);

shardManager.addCallback(notificationManager);
var batchMetrics = BatchMetrics.create(Clock.systemUTC(), config.metrics());
var readBatchManager = BatchManager.newReadBatchManager(config, stubByShardId, batchMetrics);
Expand All @@ -76,25 +72,23 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient {

var client =
new AsyncOxiaClientImpl(
channelManager,
stubManager,
shardManager,
notificationManager,
readBatchManager,
writeBatchManager,
sessionManager,
reactorStubFactory,
operationMetrics);

return shardManager.start().thenApply(v -> client);
}

private final @NonNull ChannelManager channelManager;
private final @NonNull OxiaStubManager stubManager;
private final @NonNull ShardManager shardManager;
private final @NonNull NotificationManager notificationManager;
private final @NonNull BatchManager readBatchManager;
private final @NonNull BatchManager writeBatchManager;
private final @NonNull SessionManager sessionManager;
private final @NonNull StubFactory<ReactorOxiaClientStub> reactorStubFactory;
private final @NonNull OperationMetrics metrics;
private final AtomicLong sequence = new AtomicLong();
private volatile boolean closed;
Expand Down Expand Up @@ -222,14 +216,14 @@ public void notifications(@NonNull Consumer<Notification> notificationCallback)
private @NonNull Flux<String> list(
long shardId, @NonNull String startKeyInclusive, @NonNull String endKeyExclusive) {
var leader = shardManager.leader(shardId);
var stub = reactorStubFactory.apply(leader);
var stub = stubManager.getStub(leader);
var request =
ListRequest.newBuilder()
.setShardId(shardId)
.setStartInclusive(startKeyInclusive)
.setEndExclusive(endKeyExclusive)
.build();
return stub.list(request).flatMapIterable(ListResponse::getKeysList);
return stub.reactor().list(request).flatMapIterable(ListResponse::getKeysList);
}

@Override
Expand All @@ -243,7 +237,7 @@ public void close() throws Exception {
sessionManager.close();
notificationManager.close();
shardManager.close();
channelManager.close();
stubManager.close();
}

private void checkIfClosed() {
Expand Down
19 changes: 10 additions & 9 deletions client/src/main/java/io/streamnative/oxia/client/batch/Batch.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
import io.streamnative.oxia.client.batch.Operation.WriteOperation.DeleteRangeOperation;
import io.streamnative.oxia.client.batch.Operation.WriteOperation.PutOperation;
import io.streamnative.oxia.client.batch.Operation.WriteOperation.PutOperation.SessionInfo;
import io.streamnative.oxia.client.grpc.OxiaStub;
import io.streamnative.oxia.client.metrics.BatchMetrics;
import io.streamnative.oxia.client.session.SessionManager;
import io.streamnative.oxia.proto.GetResponse;
import io.streamnative.oxia.proto.ReactorOxiaClientGrpc.ReactorOxiaClientStub;
import io.streamnative.oxia.proto.ReadRequest;
import io.streamnative.oxia.proto.WriteRequest;
import java.time.Clock;
Expand Down Expand Up @@ -70,7 +70,7 @@ final class WriteBatch extends BatchBase implements Batch {
private long bytes;

WriteBatch(
@NonNull Function<Long, ReactorOxiaClientStub> stubByShardId,
@NonNull Function<Long, OxiaStub> stubByShardId,
@NonNull SessionManager sessionManager,
@NonNull String clientIdentifier,
long shardId,
Expand Down Expand Up @@ -125,7 +125,7 @@ public void complete() {
sample.startExec();
Throwable t = null;
try {
var response = getStub().write(toProto()).block();
var response = getStub().reactor().write(toProto()).block();
for (var i = 0; i < deletes.size(); i++) {
deletes.get(i).complete(response.getDeletes(i));
}
Expand Down Expand Up @@ -180,7 +180,7 @@ public void add(@NonNull Operation<?> operation) {
}

ReadBatch(
@NonNull Function<Long, ReactorOxiaClientStub> stubByShardId,
@NonNull Function<Long, OxiaStub> stubByShardId,
long shardId,
long createTime,
BatchMetrics.Sample sample) {
Expand All @@ -200,6 +200,7 @@ public void complete() {
try {
var responses =
getStub()
.reactor()
.read(toProto())
.flatMapSequential(response -> Flux.fromIterable(response.getGetsList()))
.doOnNext(r -> bytes.add(r.getValue().size()));
Expand Down Expand Up @@ -227,19 +228,19 @@ ReadRequest toProto() {

@RequiredArgsConstructor(access = PRIVATE)
abstract class BatchBase {
private final @NonNull Function<Long, ReactorOxiaClientStub> stubByShardId;
private final @NonNull Function<Long, OxiaStub> stubByShardId;
@Getter private final long shardId;
@Getter private final long startTime;
final BatchMetrics.Sample sample;

protected ReactorOxiaClientStub getStub() {
protected OxiaStub getStub() {
return stubByShardId.apply(shardId);
}
}

@RequiredArgsConstructor(access = PACKAGE)
abstract class BatchFactory implements Function<Long, Batch> {
final @NonNull Function<Long, ReactorOxiaClientStub> stubByShardId;
final @NonNull Function<Long, OxiaStub> stubByShardId;

@Getter(PACKAGE)
private final @NonNull ClientConfig config;
Expand All @@ -254,7 +255,7 @@ class WriteBatchFactory extends BatchFactory {
final @NonNull SessionManager sessionManager;

public WriteBatchFactory(
@NonNull Function<Long, ReactorOxiaClientStub> stubByShardId,
@NonNull Function<Long, OxiaStub> stubByShardId,
@NonNull SessionManager sessionManager,
@NonNull ClientConfig config,
@NonNull Clock clock,
Expand All @@ -278,7 +279,7 @@ public WriteBatchFactory(

class ReadBatchFactory extends BatchFactory {
public ReadBatchFactory(
@NonNull Function<Long, ReactorOxiaClientStub> stubByShardId,
@NonNull Function<Long, OxiaStub> stubByShardId,
@NonNull ClientConfig config,
@NonNull Clock clock,
@NonNull BatchMetrics metrics) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import static lombok.AccessLevel.PACKAGE;

import io.streamnative.oxia.client.ClientConfig;
import io.streamnative.oxia.client.grpc.OxiaStub;
import io.streamnative.oxia.client.metrics.BatchMetrics;
import io.streamnative.oxia.client.session.SessionManager;
import io.streamnative.oxia.proto.ReactorOxiaClientGrpc.ReactorOxiaClientStub;
import java.time.Clock;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -99,15 +99,15 @@ public static class ShutdownException extends Exception {

public static @NonNull BatchManager newReadBatchManager(
@NonNull ClientConfig config,
@NonNull Function<Long, ReactorOxiaClientStub> stubByShardId,
@NonNull Function<Long, OxiaStub> stubByShardId,
BatchMetrics metrics) {
return new BatchManager(
Batcher.newReadBatcherFactory(config, stubByShardId, Clock.systemUTC(), metrics));
}

public static @NonNull BatchManager newWriteBatchManager(
@NonNull ClientConfig config,
@NonNull Function<Long, ReactorOxiaClientStub> stubByShardId,
@NonNull Function<Long, OxiaStub> stubByShardId,
@NonNull SessionManager sessionManager,
BatchMetrics metrics) {
return new BatchManager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

import io.streamnative.oxia.client.ClientConfig;
import io.streamnative.oxia.client.batch.Operation.CloseOperation;
import io.streamnative.oxia.client.grpc.OxiaStub;
import io.streamnative.oxia.client.metrics.BatchMetrics;
import io.streamnative.oxia.client.session.SessionManager;
import io.streamnative.oxia.proto.ReactorOxiaClientGrpc.ReactorOxiaClientStub;
import java.time.Clock;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
Expand Down Expand Up @@ -112,7 +112,7 @@ public void run() {

static @NonNull Function<Long, Batcher> newReadBatcherFactory(
@NonNull ClientConfig config,
@NonNull Function<Long, ReactorOxiaClientStub> stubByShardId,
@NonNull Function<Long, OxiaStub> stubByShardId,
Clock clock,
BatchMetrics metrics) {
return s ->
Expand All @@ -121,7 +121,7 @@ public void run() {

static @NonNull Function<Long, Batcher> newWriteBatcherFactory(
@NonNull ClientConfig config,
@NonNull Function<Long, ReactorOxiaClientStub> stubByShardId,
@NonNull Function<Long, OxiaStub> stubByShardId,
@NonNull SessionManager sessionManager,
Clock clock,
BatchMetrics metrics) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,15 @@

import static lombok.AccessLevel.PROTECTED;

import io.streamnative.oxia.proto.ReactorOxiaClientGrpc.ReactorOxiaClientStub;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import reactor.core.Disposable;

@RequiredArgsConstructor(access = PROTECTED)
public abstract class GrpcResponseStream implements AutoCloseable {
private final @NonNull Supplier<ReactorOxiaClientStub> stubFactory;
private final @NonNull OxiaStub stub;

private volatile Disposable disposable;

Expand All @@ -36,12 +34,12 @@ public abstract class GrpcResponseStream implements AutoCloseable {
if (disposable != null) {
throw new IllegalStateException("Already started");
}
return start(stubFactory.get(), disposable -> this.disposable = disposable);
return start(stub, disposable -> this.disposable = disposable);
}
}

protected abstract @NonNull CompletableFuture<Void> start(
@NonNull ReactorOxiaClientStub stub, @NonNull Consumer<Disposable> consumer);
@NonNull OxiaStub stub, @NonNull Consumer<Disposable> consumer);

@Override
public void close() {
Expand Down
Loading

0 comments on commit d29883f

Please sign in to comment.