diff --git a/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java b/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java index 357450c1..b7262318 100644 --- a/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java +++ b/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java @@ -28,8 +28,7 @@ import io.streamnative.oxia.client.batch.Operation.WriteOperation.DeleteOperation; import io.streamnative.oxia.client.batch.Operation.WriteOperation.DeleteRangeOperation; import io.streamnative.oxia.client.batch.Operation.WriteOperation.PutOperation; -import io.streamnative.oxia.client.grpc.ChannelManager; -import io.streamnative.oxia.client.grpc.ChannelManager.StubFactory; +import io.streamnative.oxia.client.grpc.OxiaStubManager; import io.streamnative.oxia.client.metrics.BatchMetrics; import io.streamnative.oxia.client.metrics.OperationMetrics; import io.streamnative.oxia.client.notify.NotificationManager; @@ -37,7 +36,6 @@ import io.streamnative.oxia.client.shard.ShardManager; import io.streamnative.oxia.proto.ListRequest; import io.streamnative.oxia.proto.ListResponse; -import io.streamnative.oxia.proto.ReactorOxiaClientGrpc.ReactorOxiaClientStub; import java.time.Clock; import java.util.List; import java.util.Objects; @@ -45,7 +43,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.Function; -import java.util.function.Supplier; import lombok.AccessLevel; import lombok.NonNull; import lombok.RequiredArgsConstructor; @@ -55,16 +52,15 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient { static @NonNull CompletableFuture newInstance(@NonNull ClientConfig config) { - var channelManager = new ChannelManager(); - var reactorStubFactory = channelManager.getReactorStubFactory(); - Supplier stubFactory = - () -> reactorStubFactory.apply(config.serviceAddress()); - var shardManager = new ShardManager(stubFactory, config.metrics(), config.namespace()); - var notificationManager = - new NotificationManager(reactorStubFactory, shardManager, config.metrics()); + var stubManager = new OxiaStubManager(); + + var serviceAddrStub = stubManager.getStub(config.serviceAddress()); + var shardManager = new ShardManager(serviceAddrStub, config.metrics(), config.namespace()); + var notificationManager = new NotificationManager(stubManager, shardManager, config.metrics()); Function leaderFn = shardManager::leader; - var stubByShardId = leaderFn.andThen(reactorStubFactory); + var stubByShardId = leaderFn.andThen(stubManager::getStub); + shardManager.addCallback(notificationManager); var batchMetrics = BatchMetrics.create(Clock.systemUTC(), config.metrics()); var readBatchManager = BatchManager.newReadBatchManager(config, stubByShardId, batchMetrics); @@ -76,25 +72,23 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient { var client = new AsyncOxiaClientImpl( - channelManager, + stubManager, shardManager, notificationManager, readBatchManager, writeBatchManager, sessionManager, - reactorStubFactory, operationMetrics); return shardManager.start().thenApply(v -> client); } - private final @NonNull ChannelManager channelManager; + private final @NonNull OxiaStubManager stubManager; private final @NonNull ShardManager shardManager; private final @NonNull NotificationManager notificationManager; private final @NonNull BatchManager readBatchManager; private final @NonNull BatchManager writeBatchManager; private final @NonNull SessionManager sessionManager; - private final @NonNull StubFactory reactorStubFactory; private final @NonNull OperationMetrics metrics; private final AtomicLong sequence = new AtomicLong(); private volatile boolean closed; @@ -222,14 +216,14 @@ public void notifications(@NonNull Consumer notificationCallback) private @NonNull Flux list( long shardId, @NonNull String startKeyInclusive, @NonNull String endKeyExclusive) { var leader = shardManager.leader(shardId); - var stub = reactorStubFactory.apply(leader); + var stub = stubManager.getStub(leader); var request = ListRequest.newBuilder() .setShardId(shardId) .setStartInclusive(startKeyInclusive) .setEndExclusive(endKeyExclusive) .build(); - return stub.list(request).flatMapIterable(ListResponse::getKeysList); + return stub.reactor().list(request).flatMapIterable(ListResponse::getKeysList); } @Override @@ -243,7 +237,7 @@ public void close() throws Exception { sessionManager.close(); notificationManager.close(); shardManager.close(); - channelManager.close(); + stubManager.close(); } private void checkIfClosed() { diff --git a/client/src/main/java/io/streamnative/oxia/client/batch/Batch.java b/client/src/main/java/io/streamnative/oxia/client/batch/Batch.java index a0e8c59b..74ee6fa8 100644 --- a/client/src/main/java/io/streamnative/oxia/client/batch/Batch.java +++ b/client/src/main/java/io/streamnative/oxia/client/batch/Batch.java @@ -27,10 +27,10 @@ import io.streamnative.oxia.client.batch.Operation.WriteOperation.DeleteRangeOperation; import io.streamnative.oxia.client.batch.Operation.WriteOperation.PutOperation; import io.streamnative.oxia.client.batch.Operation.WriteOperation.PutOperation.SessionInfo; +import io.streamnative.oxia.client.grpc.OxiaStub; import io.streamnative.oxia.client.metrics.BatchMetrics; import io.streamnative.oxia.client.session.SessionManager; import io.streamnative.oxia.proto.GetResponse; -import io.streamnative.oxia.proto.ReactorOxiaClientGrpc.ReactorOxiaClientStub; import io.streamnative.oxia.proto.ReadRequest; import io.streamnative.oxia.proto.WriteRequest; import java.time.Clock; @@ -70,7 +70,7 @@ final class WriteBatch extends BatchBase implements Batch { private long bytes; WriteBatch( - @NonNull Function stubByShardId, + @NonNull Function stubByShardId, @NonNull SessionManager sessionManager, @NonNull String clientIdentifier, long shardId, @@ -125,7 +125,7 @@ public void complete() { sample.startExec(); Throwable t = null; try { - var response = getStub().write(toProto()).block(); + var response = getStub().reactor().write(toProto()).block(); for (var i = 0; i < deletes.size(); i++) { deletes.get(i).complete(response.getDeletes(i)); } @@ -180,7 +180,7 @@ public void add(@NonNull Operation operation) { } ReadBatch( - @NonNull Function stubByShardId, + @NonNull Function stubByShardId, long shardId, long createTime, BatchMetrics.Sample sample) { @@ -200,6 +200,7 @@ public void complete() { try { var responses = getStub() + .reactor() .read(toProto()) .flatMapSequential(response -> Flux.fromIterable(response.getGetsList())) .doOnNext(r -> bytes.add(r.getValue().size())); @@ -227,19 +228,19 @@ ReadRequest toProto() { @RequiredArgsConstructor(access = PRIVATE) abstract class BatchBase { - private final @NonNull Function stubByShardId; + private final @NonNull Function stubByShardId; @Getter private final long shardId; @Getter private final long startTime; final BatchMetrics.Sample sample; - protected ReactorOxiaClientStub getStub() { + protected OxiaStub getStub() { return stubByShardId.apply(shardId); } } @RequiredArgsConstructor(access = PACKAGE) abstract class BatchFactory implements Function { - final @NonNull Function stubByShardId; + final @NonNull Function stubByShardId; @Getter(PACKAGE) private final @NonNull ClientConfig config; @@ -254,7 +255,7 @@ class WriteBatchFactory extends BatchFactory { final @NonNull SessionManager sessionManager; public WriteBatchFactory( - @NonNull Function stubByShardId, + @NonNull Function stubByShardId, @NonNull SessionManager sessionManager, @NonNull ClientConfig config, @NonNull Clock clock, @@ -278,7 +279,7 @@ public WriteBatchFactory( class ReadBatchFactory extends BatchFactory { public ReadBatchFactory( - @NonNull Function stubByShardId, + @NonNull Function stubByShardId, @NonNull ClientConfig config, @NonNull Clock clock, @NonNull BatchMetrics metrics) { diff --git a/client/src/main/java/io/streamnative/oxia/client/batch/BatchManager.java b/client/src/main/java/io/streamnative/oxia/client/batch/BatchManager.java index de80cb4c..f526e060 100644 --- a/client/src/main/java/io/streamnative/oxia/client/batch/BatchManager.java +++ b/client/src/main/java/io/streamnative/oxia/client/batch/BatchManager.java @@ -20,9 +20,9 @@ import static lombok.AccessLevel.PACKAGE; import io.streamnative.oxia.client.ClientConfig; +import io.streamnative.oxia.client.grpc.OxiaStub; import io.streamnative.oxia.client.metrics.BatchMetrics; import io.streamnative.oxia.client.session.SessionManager; -import io.streamnative.oxia.proto.ReactorOxiaClientGrpc.ReactorOxiaClientStub; import java.time.Clock; import java.util.List; import java.util.Objects; @@ -99,7 +99,7 @@ public static class ShutdownException extends Exception { public static @NonNull BatchManager newReadBatchManager( @NonNull ClientConfig config, - @NonNull Function stubByShardId, + @NonNull Function stubByShardId, BatchMetrics metrics) { return new BatchManager( Batcher.newReadBatcherFactory(config, stubByShardId, Clock.systemUTC(), metrics)); @@ -107,7 +107,7 @@ public static class ShutdownException extends Exception { public static @NonNull BatchManager newWriteBatchManager( @NonNull ClientConfig config, - @NonNull Function stubByShardId, + @NonNull Function stubByShardId, @NonNull SessionManager sessionManager, BatchMetrics metrics) { return new BatchManager( diff --git a/client/src/main/java/io/streamnative/oxia/client/batch/Batcher.java b/client/src/main/java/io/streamnative/oxia/client/batch/Batcher.java index 082c18f0..f8e75724 100644 --- a/client/src/main/java/io/streamnative/oxia/client/batch/Batcher.java +++ b/client/src/main/java/io/streamnative/oxia/client/batch/Batcher.java @@ -20,9 +20,9 @@ import io.streamnative.oxia.client.ClientConfig; import io.streamnative.oxia.client.batch.Operation.CloseOperation; +import io.streamnative.oxia.client.grpc.OxiaStub; import io.streamnative.oxia.client.metrics.BatchMetrics; import io.streamnative.oxia.client.session.SessionManager; -import io.streamnative.oxia.proto.ReactorOxiaClientGrpc.ReactorOxiaClientStub; import java.time.Clock; import java.util.concurrent.BlockingQueue; import java.util.concurrent.PriorityBlockingQueue; @@ -112,7 +112,7 @@ public void run() { static @NonNull Function newReadBatcherFactory( @NonNull ClientConfig config, - @NonNull Function stubByShardId, + @NonNull Function stubByShardId, Clock clock, BatchMetrics metrics) { return s -> @@ -121,7 +121,7 @@ public void run() { static @NonNull Function newWriteBatcherFactory( @NonNull ClientConfig config, - @NonNull Function stubByShardId, + @NonNull Function stubByShardId, @NonNull SessionManager sessionManager, Clock clock, BatchMetrics metrics) { diff --git a/client/src/main/java/io/streamnative/oxia/client/grpc/ChannelManager.java b/client/src/main/java/io/streamnative/oxia/client/grpc/ChannelManager.java deleted file mode 100644 index 89803dba..00000000 --- a/client/src/main/java/io/streamnative/oxia/client/grpc/ChannelManager.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright © 2022-2024 StreamNative Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.streamnative.oxia.client.grpc; - -import static java.util.concurrent.TimeUnit.MILLISECONDS; - -import io.grpc.Channel; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import io.streamnative.oxia.proto.ReactorOxiaClientGrpc; -import io.streamnative.oxia.proto.ReactorOxiaClientGrpc.ReactorOxiaClientStub; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.function.Function; -import lombok.Getter; -import lombok.NonNull; -import lombok.RequiredArgsConstructor; - -@RequiredArgsConstructor -public class ChannelManager implements Function, AutoCloseable { - private final ConcurrentMap channels = new ConcurrentHashMap<>(); - @Getter private final @NonNull StubFactory reactorStubFactory; - private volatile boolean closed; - - public ChannelManager() { - reactorStubFactory = reactorStubFactory(this); - } - - @Override - public void close() throws Exception { - if (closed) { - return; - } - closed = true; - channels.values().forEach(this::shutdown); - } - - private void shutdown(ManagedChannel channel) { - channel.shutdown(); - try { - if (!channel.awaitTermination(100, MILLISECONDS)) { - channel.shutdownNow(); - } - } catch (InterruptedException e) { - channel.shutdownNow(); - } - } - - @Override - public @NonNull Channel apply(@NonNull String address) { - if (closed) { - throw new IllegalStateException("Channel manager is closed"); - } - var serviceAddress = new ServiceAddress(address); - return channels.computeIfAbsent( - address, - a -> - ManagedChannelBuilder.forAddress(serviceAddress.host(), serviceAddress.port()) - .usePlaintext() - .build()); - } - - static StubFactory reactorStubFactory( - @NonNull ChannelManager channelManager) { - return new StubFactory<>(channelManager, ReactorOxiaClientGrpc::newReactorStub); - } - - public static class StubFactory implements Function { - private final Function addressToStubFn; - - StubFactory( - @NonNull ChannelManager channelManager, @NonNull Function channelToStubFn) { - addressToStubFn = channelManager.andThen(channelToStubFn); - } - - private final ConcurrentMap stubs = new ConcurrentHashMap<>(); - - @Override - public T apply(String address) { - return stubs.computeIfAbsent(address, addressToStubFn); - } - } -} diff --git a/client/src/main/java/io/streamnative/oxia/client/grpc/GrpcResponseStream.java b/client/src/main/java/io/streamnative/oxia/client/grpc/GrpcResponseStream.java index ac72d9b5..1ad38ae5 100644 --- a/client/src/main/java/io/streamnative/oxia/client/grpc/GrpcResponseStream.java +++ b/client/src/main/java/io/streamnative/oxia/client/grpc/GrpcResponseStream.java @@ -17,17 +17,15 @@ import static lombok.AccessLevel.PROTECTED; -import io.streamnative.oxia.proto.ReactorOxiaClientGrpc.ReactorOxiaClientStub; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; -import java.util.function.Supplier; import lombok.NonNull; import lombok.RequiredArgsConstructor; import reactor.core.Disposable; @RequiredArgsConstructor(access = PROTECTED) public abstract class GrpcResponseStream implements AutoCloseable { - private final @NonNull Supplier stubFactory; + private final @NonNull OxiaStub stub; private volatile Disposable disposable; @@ -36,12 +34,12 @@ public abstract class GrpcResponseStream implements AutoCloseable { if (disposable != null) { throw new IllegalStateException("Already started"); } - return start(stubFactory.get(), disposable -> this.disposable = disposable); + return start(stub, disposable -> this.disposable = disposable); } } protected abstract @NonNull CompletableFuture start( - @NonNull ReactorOxiaClientStub stub, @NonNull Consumer consumer); + @NonNull OxiaStub stub, @NonNull Consumer consumer); @Override public void close() { diff --git a/client/src/main/java/io/streamnative/oxia/client/grpc/OxiaStub.java b/client/src/main/java/io/streamnative/oxia/client/grpc/OxiaStub.java new file mode 100644 index 00000000..28cafbc8 --- /dev/null +++ b/client/src/main/java/io/streamnative/oxia/client/grpc/OxiaStub.java @@ -0,0 +1,63 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.grpc; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import io.grpc.Grpc; +import io.grpc.InsecureChannelCredentials; +import io.grpc.ManagedChannel; +import io.streamnative.oxia.proto.OxiaClientGrpc; +import io.streamnative.oxia.proto.ReactorOxiaClientGrpc; +import lombok.NonNull; + +public class OxiaStub implements AutoCloseable { + private final ManagedChannel channel; + + private final @NonNull ReactorOxiaClientGrpc.ReactorOxiaClientStub reactorStub; + + private final @NonNull OxiaClientGrpc.OxiaClientBlockingStub blockingStub; + + public OxiaStub(String address) { + this(Grpc.newChannelBuilder(address, InsecureChannelCredentials.create()).build()); + } + + public OxiaStub(ManagedChannel channel) { + this.channel = channel; + this.reactorStub = ReactorOxiaClientGrpc.newReactorStub(channel); + this.blockingStub = OxiaClientGrpc.newBlockingStub(channel); + } + + public ReactorOxiaClientGrpc.ReactorOxiaClientStub reactor() { + return reactorStub; + } + + public OxiaClientGrpc.OxiaClientBlockingStub blocking() { + return blockingStub; + } + + @Override + public void close() throws Exception { + channel.shutdown(); + try { + if (!channel.awaitTermination(100, MILLISECONDS)) { + channel.shutdownNow(); + } + } catch (InterruptedException e) { + channel.shutdownNow(); + } + } +} diff --git a/client/src/main/java/io/streamnative/oxia/client/grpc/ServiceAddress.java b/client/src/main/java/io/streamnative/oxia/client/grpc/OxiaStubManager.java similarity index 59% rename from client/src/main/java/io/streamnative/oxia/client/grpc/ServiceAddress.java rename to client/src/main/java/io/streamnative/oxia/client/grpc/OxiaStubManager.java index 47bf200e..148073d1 100644 --- a/client/src/main/java/io/streamnative/oxia/client/grpc/ServiceAddress.java +++ b/client/src/main/java/io/streamnative/oxia/client/grpc/OxiaStubManager.java @@ -15,17 +15,20 @@ */ package io.streamnative.oxia.client.grpc; -import lombok.NonNull; -import lombok.SneakyThrows; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; -record ServiceAddress(@NonNull String serviceAddress) { +public class OxiaStubManager implements AutoCloseable { + private final Map stubs = new ConcurrentHashMap<>(); - public @NonNull String host() { - return serviceAddress.split(":")[0]; + public OxiaStub getStub(String address) { + return stubs.computeIfAbsent(address, OxiaStub::new); } - @SneakyThrows - public int port() { - return Integer.parseInt(serviceAddress.split(":")[1]); + @Override + public void close() throws Exception { + for (OxiaStub stub : stubs.values()) { + stub.close(); + } } } diff --git a/client/src/main/java/io/streamnative/oxia/client/notify/NotificationManager.java b/client/src/main/java/io/streamnative/oxia/client/notify/NotificationManager.java index fff5bf94..e0f30448 100644 --- a/client/src/main/java/io/streamnative/oxia/client/notify/NotificationManager.java +++ b/client/src/main/java/io/streamnative/oxia/client/notify/NotificationManager.java @@ -19,14 +19,13 @@ import io.streamnative.oxia.client.CompositeConsumer; import io.streamnative.oxia.client.api.Notification; -import io.streamnative.oxia.client.grpc.ChannelManager.StubFactory; import io.streamnative.oxia.client.grpc.GrpcResponseStream; +import io.streamnative.oxia.client.grpc.OxiaStubManager; import io.streamnative.oxia.client.metrics.NotificationMetrics; import io.streamnative.oxia.client.metrics.api.Metrics; import io.streamnative.oxia.client.shard.ShardManager; import io.streamnative.oxia.client.shard.ShardManager.ShardAssignmentChange.Added; import io.streamnative.oxia.client.shard.ShardManager.ShardAssignmentChanges; -import io.streamnative.oxia.proto.ReactorOxiaClientGrpc.ReactorOxiaClientStub; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -50,12 +49,11 @@ public class NotificationManager implements AutoCloseable, Consumer reactorStubFactory, + @NonNull OxiaStubManager stubManager, @NonNull ShardManager shardManager, @NonNull Metrics metrics) { this.compositeCallback = new CompositeConsumer<>(); - this.recieverFactory = - new ShardNotificationReceiver.Factory(reactorStubFactory, compositeCallback); + this.recieverFactory = new ShardNotificationReceiver.Factory(stubManager, compositeCallback); this.shardManager = shardManager; this.metrics = NotificationMetrics.create(metrics); } diff --git a/client/src/main/java/io/streamnative/oxia/client/notify/ShardNotificationReceiver.java b/client/src/main/java/io/streamnative/oxia/client/notify/ShardNotificationReceiver.java index 364ecc64..e9a04d4e 100644 --- a/client/src/main/java/io/streamnative/oxia/client/notify/ShardNotificationReceiver.java +++ b/client/src/main/java/io/streamnative/oxia/client/notify/ShardNotificationReceiver.java @@ -22,18 +22,17 @@ import io.streamnative.oxia.client.api.Notification; import io.streamnative.oxia.client.api.Notification.KeyCreated; import io.streamnative.oxia.client.api.Notification.KeyDeleted; -import io.streamnative.oxia.client.grpc.ChannelManager.StubFactory; import io.streamnative.oxia.client.grpc.GrpcResponseStream; +import io.streamnative.oxia.client.grpc.OxiaStub; +import io.streamnative.oxia.client.grpc.OxiaStubManager; import io.streamnative.oxia.client.metrics.NotificationMetrics; import io.streamnative.oxia.proto.NotificationBatch; import io.streamnative.oxia.proto.NotificationsRequest; -import io.streamnative.oxia.proto.ReactorOxiaClientGrpc.ReactorOxiaClientStub; import java.time.Duration; import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; -import java.util.function.Supplier; import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; @@ -59,11 +58,11 @@ public class ShardNotificationReceiver extends GrpcResponseStream { private long offset; ShardNotificationReceiver( - @NonNull Supplier stubFactory, + @NonNull OxiaStub stub, long shardId, @NonNull Consumer callback, @NonNull NotificationMetrics metrics) { - super(stubFactory); + super(stub); this.shardId = shardId; this.callback = callback; this.metrics = metrics; @@ -88,7 +87,7 @@ public void close() { @Override protected @NonNull CompletableFuture start( - @NonNull ReactorOxiaClientStub stub, @NonNull Consumer consumer) { + @NonNull OxiaStub stub, @NonNull Consumer consumer) { var request = NotificationsRequest.newBuilder().setShardId(shardId); startingOffset.ifPresent(o -> request.setStartOffsetExclusive(o)); // TODO filter non-retriables? @@ -100,7 +99,7 @@ public void close() { var threadName = String.format("shard-%s-notifications", shardId); scheduler = Schedulers.newSingle(threadName); var disposable = - Flux.defer(() -> stub.getNotifications(request.build())) + Flux.defer(() -> stub.reactor().getNotifications(request.build())) .doOnError(t -> log.warn("Error receiving notifications for shard {}", shardId, t)) .doOnEach(metrics::recordBatch) .retryWhen(retrySpec) @@ -135,14 +134,13 @@ public long getOffset() { @RequiredArgsConstructor(access = PACKAGE) static class Factory { - private final @NonNull StubFactory reactorStubFactory; + private final @NonNull OxiaStubManager stubManager; private final @NonNull Consumer callback; @NonNull ShardNotificationReceiver newReceiver( long shardId, @NonNull String leader, @NonNull NotificationMetrics metrics) { - return new ShardNotificationReceiver( - () -> reactorStubFactory.apply(leader), shardId, callback, metrics); + return new ShardNotificationReceiver(stubManager.getStub(leader), shardId, callback, metrics); } } } diff --git a/client/src/main/java/io/streamnative/oxia/client/session/Session.java b/client/src/main/java/io/streamnative/oxia/client/session/Session.java index 2cc5680c..22e6368c 100644 --- a/client/src/main/java/io/streamnative/oxia/client/session/Session.java +++ b/client/src/main/java/io/streamnative/oxia/client/session/Session.java @@ -20,10 +20,10 @@ import com.google.common.annotations.VisibleForTesting; import io.streamnative.oxia.client.ClientConfig; +import io.streamnative.oxia.client.grpc.OxiaStub; import io.streamnative.oxia.client.metrics.SessionMetrics; import io.streamnative.oxia.proto.CloseSessionRequest; import io.streamnative.oxia.proto.CreateSessionRequest; -import io.streamnative.oxia.proto.ReactorOxiaClientGrpc; import io.streamnative.oxia.proto.SessionHeartbeat; import java.time.Duration; import java.util.function.Function; @@ -42,7 +42,7 @@ @Slf4j public class Session implements AutoCloseable { - private final @NonNull Function stubByShardId; + private final @NonNull Function stubByShardId; private final @NonNull Duration sessionTimeout; private final @NonNull Duration heartbeatInterval; @@ -60,7 +60,7 @@ public class Session implements AutoCloseable { private Disposable keepAliveSubscription; Session( - @NonNull Function stubByShardId, + @NonNull Function stubByShardId, @NonNull ClientConfig config, long shardId, long sessionId, @@ -92,7 +92,7 @@ void start() { Mono.just(heartbeat) .repeat() .delayElements(heartbeatInterval) - .flatMap(hb -> stubByShardId.apply(shardId).keepAlive(hb)) + .flatMap(hb -> stubByShardId.apply(shardId).reactor().keepAlive(hb)) .retryWhen(retrySpec) .timeout(sessionTimeout) .publishOn(scheduler) @@ -108,14 +108,14 @@ public void close() throws Exception { var stub = stubByShardId.apply(shardId); var request = CloseSessionRequest.newBuilder().setShardId(shardId).setSessionId(sessionId).build(); - stub.closeSession(request).block(); + stub.reactor().closeSession(request).block(); scheduler.dispose(); } @RequiredArgsConstructor(access = PACKAGE) static class Factory { @NonNull ClientConfig config; - @NonNull Function stubByShardId; + @NonNull Function stubByShardId; @NonNull SessionMetrics metrics; @NonNull @@ -127,7 +127,7 @@ Session create(long shardId) { .setShardId(shardId) .setClientIdentity(config.clientIdentifier()) .build(); - var response = stub.createSession(request).block(); + var response = stub.reactor().createSession(request).block(); if (response == null) { throw new IllegalStateException("Empty session returned for shardId: " + shardId); } diff --git a/client/src/main/java/io/streamnative/oxia/client/session/SessionManager.java b/client/src/main/java/io/streamnative/oxia/client/session/SessionManager.java index af341054..3a8ac525 100644 --- a/client/src/main/java/io/streamnative/oxia/client/session/SessionManager.java +++ b/client/src/main/java/io/streamnative/oxia/client/session/SessionManager.java @@ -20,9 +20,9 @@ import com.google.common.annotations.VisibleForTesting; import io.streamnative.oxia.client.ClientConfig; +import io.streamnative.oxia.client.grpc.OxiaStub; import io.streamnative.oxia.client.metrics.SessionMetrics; import io.streamnative.oxia.client.shard.ShardManager.ShardAssignmentChanges; -import io.streamnative.oxia.proto.ReactorOxiaClientGrpc; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; @@ -44,8 +44,7 @@ public class SessionManager implements AutoCloseable, Consumer stubByShardId) { + @NonNull ClientConfig config, @NonNull Function stubByShardId) { this(new Session.Factory(config, stubByShardId, SessionMetrics.create(config.metrics()))); } diff --git a/client/src/main/java/io/streamnative/oxia/client/shard/ShardManager.java b/client/src/main/java/io/streamnative/oxia/client/shard/ShardManager.java index 27e86e5d..bbf8ae96 100644 --- a/client/src/main/java/io/streamnative/oxia/client/shard/ShardManager.java +++ b/client/src/main/java/io/streamnative/oxia/client/shard/ShardManager.java @@ -33,9 +33,9 @@ import io.streamnative.oxia.client.CompositeConsumer; import io.streamnative.oxia.client.grpc.CustomStatusCode; import io.streamnative.oxia.client.grpc.GrpcResponseStream; +import io.streamnative.oxia.client.grpc.OxiaStub; import io.streamnative.oxia.client.metrics.ShardAssignmentMetrics; import io.streamnative.oxia.client.metrics.api.Metrics; -import io.streamnative.oxia.proto.ReactorOxiaClientGrpc.ReactorOxiaClientStub; import io.streamnative.oxia.proto.ShardAssignments; import io.streamnative.oxia.proto.ShardAssignmentsRequest; import java.time.Duration; @@ -50,7 +50,6 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; -import java.util.function.Supplier; import java.util.stream.Stream; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -72,23 +71,20 @@ public class ShardManager extends GrpcResponseStream implements AutoCloseable { @VisibleForTesting ShardManager( - @NonNull Supplier stubFactory, + @NonNull OxiaStub stub, @NonNull Assignments assignments, @NonNull CompositeConsumer callbacks, @NonNull ShardAssignmentMetrics metrics) { - super(stubFactory); + super(stub); this.assignments = assignments; this.callbacks = callbacks; this.metrics = metrics; this.scheduler = Schedulers.newSingle("shard-assignments"); } - public ShardManager( - @NonNull Supplier stubFactory, - @NonNull Metrics metrics, - @NonNull String namespace) { + public ShardManager(@NonNull OxiaStub stub, @NonNull Metrics metrics, @NonNull String namespace) { this( - stubFactory, + stub, new Assignments(Xxh332HashRangeShardStrategy, namespace), new CompositeConsumer<>(), ShardAssignmentMetrics.create(metrics)); @@ -101,8 +97,7 @@ public void close() { } @Override - protected CompletableFuture start( - ReactorOxiaClientStub stub, Consumer consumer) { + protected CompletableFuture start(OxiaStub stub, Consumer consumer) { RetryBackoffSpec retrySpec = Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100)) .filter(this::isErrorRetryable) @@ -110,10 +105,11 @@ protected CompletableFuture start( var assignmentsFlux = Flux.defer( () -> - stub.getShardAssignments( - ShardAssignmentsRequest.newBuilder() - .setNamespace(assignments.namespace) - .build())) + stub.reactor() + .getShardAssignments( + ShardAssignmentsRequest.newBuilder() + .setNamespace(assignments.namespace) + .build())) .doOnError(this::processError) .retryWhen(retrySpec) .repeat() diff --git a/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java b/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java index a5aa1a2a..8e78a60c 100644 --- a/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java @@ -38,8 +38,8 @@ import io.streamnative.oxia.client.batch.Operation.WriteOperation.DeleteOperation; import io.streamnative.oxia.client.batch.Operation.WriteOperation.DeleteRangeOperation; import io.streamnative.oxia.client.batch.Operation.WriteOperation.PutOperation; -import io.streamnative.oxia.client.grpc.ChannelManager; -import io.streamnative.oxia.client.grpc.ChannelManager.StubFactory; +import io.streamnative.oxia.client.grpc.OxiaStub; +import io.streamnative.oxia.client.grpc.OxiaStubManager; import io.streamnative.oxia.client.metrics.OperationMetrics; import io.streamnative.oxia.client.metrics.OperationMetrics.Sample; import io.streamnative.oxia.client.notify.NotificationManager; @@ -60,14 +60,13 @@ @ExtendWith(MockitoExtension.class) class AsyncOxiaClientImplTest { - @Mock ChannelManager channelManager; + @Mock OxiaStubManager stubManager; @Mock ShardManager shardManager; @Mock NotificationManager notificationManager; @Mock BatchManager readBatchManager; @Mock BatchManager writeBatchManager; @Mock SessionManager sessionManager; @Mock Batcher batcher; - @Mock StubFactory reactorStubFactory; @Mock OperationMetrics metrics; AsyncOxiaClientImpl client; @@ -76,13 +75,12 @@ class AsyncOxiaClientImplTest { void setUp() { client = new AsyncOxiaClientImpl( - channelManager, + stubManager, shardManager, notificationManager, readBatchManager, writeBatchManager, sessionManager, - reactorStubFactory, metrics); } @@ -411,10 +409,7 @@ void getNullKey(@Mock Sample sample) throws Exception { } @Test - void list( - @Mock ReactorOxiaClientStub stub0, - @Mock ReactorOxiaClientStub stub1, - @Mock Sample> sample) { + void list(@Mock OxiaStub stub0, @Mock OxiaStub stub1, @Mock Sample> sample) { when(metrics.recordList()).thenReturn(sample); when(shardManager.getAll()).thenReturn(List.of(0L, 1L)); setupListStub(0L, "leader0", stub0); @@ -447,10 +442,13 @@ void listNullEnd(@Mock Sample> sample) throws Exception { assertThat(client.list("a", null)).isCompletedExceptionally(); } - private void setupListStub(long shardId, String leader, ReactorOxiaClientStub stub) { + private void setupListStub(long shardId, String leader, OxiaStub stub) { when(shardManager.leader(shardId)).thenReturn(leader); - when(reactorStubFactory.apply(leader)).thenReturn(stub); - when(stub.list(any(ListRequest.class))) + when(stubManager.getStub(leader)).thenReturn(stub); + + var reactor = mock(ReactorOxiaClientStub.class); + when(stub.reactor()).thenReturn(reactor); + when(reactor.list(any(ListRequest.class))) .thenReturn( Flux.just(listResponse(shardId, "a", "b"), listResponse(shardId, "c", "d")) .delayElements(Duration.ofMillis(1))); @@ -467,11 +465,11 @@ void close() throws Exception { client.close(); var inOrder = inOrder( - readBatchManager, writeBatchManager, notificationManager, shardManager, channelManager); + readBatchManager, writeBatchManager, notificationManager, shardManager, stubManager); inOrder.verify(readBatchManager).close(); inOrder.verify(writeBatchManager).close(); inOrder.verify(notificationManager).close(); inOrder.verify(shardManager).close(); - inOrder.verify(channelManager).close(); + inOrder.verify(stubManager).close(); } } diff --git a/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java b/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java index 75bfc06b..5638643e 100644 --- a/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java @@ -17,7 +17,6 @@ import static io.streamnative.oxia.client.OxiaClientBuilder.DefaultNamespace; import static io.streamnative.oxia.proto.OxiaClientGrpc.OxiaClientImplBase; -import static io.streamnative.oxia.proto.ReactorOxiaClientGrpc.newReactorStub; import static io.streamnative.oxia.proto.Status.KEY_NOT_FOUND; import static io.streamnative.oxia.proto.Status.OK; import static io.streamnative.oxia.proto.Status.UNEXPECTED_VERSION_ID; @@ -31,7 +30,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import io.grpc.ManagedChannel; import io.grpc.Server; import io.grpc.StatusRuntimeException; import io.grpc.inprocess.InProcessChannelBuilder; @@ -48,6 +46,7 @@ import io.streamnative.oxia.client.batch.Operation.WriteOperation.DeleteRangeOperation; import io.streamnative.oxia.client.batch.Operation.WriteOperation.PutOperation; import io.streamnative.oxia.client.batch.Operation.WriteOperation.PutOperation.SessionInfo; +import io.streamnative.oxia.client.grpc.OxiaStub; import io.streamnative.oxia.client.metrics.BatchMetrics; import io.streamnative.oxia.client.metrics.api.Metrics; import io.streamnative.oxia.client.session.Session; @@ -57,7 +56,6 @@ import io.streamnative.oxia.proto.DeleteResponse; import io.streamnative.oxia.proto.GetResponse; import io.streamnative.oxia.proto.PutResponse; -import io.streamnative.oxia.proto.ReactorOxiaClientGrpc.ReactorOxiaClientStub; import io.streamnative.oxia.proto.ReadRequest; import io.streamnative.oxia.proto.ReadResponse; import io.streamnative.oxia.proto.WriteRequest; @@ -81,7 +79,7 @@ @ExtendWith(MockitoExtension.class) class BatchTest { - Function clientByShardId; + Function clientByShardId; @Mock SessionManager sessionManager; @Mock Session session; long shardId = 1L; @@ -107,7 +105,7 @@ public void read( })); private Server server; - private ManagedChannel channel; + private OxiaStub stub; private final List>> writeResponses = new ArrayList<>(); private final List>> readResponses = new ArrayList<>(); @@ -121,14 +119,14 @@ public void setUp() throws Exception { .addService(serviceImpl) .build() .start(); - channel = InProcessChannelBuilder.forName(serverName).directExecutor().build(); - clientByShardId = s -> newReactorStub(channel); + stub = new OxiaStub(InProcessChannelBuilder.forName(serverName).directExecutor().build()); + clientByShardId = s -> stub; } @AfterEach - void tearDown() { + void tearDown() throws Exception { server.shutdownNow(); - channel.shutdownNow(); + stub.close(); } @Nested diff --git a/client/src/test/java/io/streamnative/oxia/client/grpc/ServiceAddressTest.java b/client/src/test/java/io/streamnative/oxia/client/grpc/ServiceAddressTest.java deleted file mode 100644 index ef424c5a..00000000 --- a/client/src/test/java/io/streamnative/oxia/client/grpc/ServiceAddressTest.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright © 2022-2024 StreamNative Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.streamnative.oxia.client.grpc; - -import static org.assertj.core.api.Assertions.assertThat; - -import org.junit.jupiter.api.Test; - -class ServiceAddressTest { - - @Test - void address() { - var address = new ServiceAddress("a:10"); - assertThat(address.host()).isEqualTo("a"); - assertThat(address.port()).isEqualTo(10); - } -} diff --git a/client/src/test/java/io/streamnative/oxia/client/notify/NotificationManagerTest.java b/client/src/test/java/io/streamnative/oxia/client/notify/NotificationManagerTest.java index 0e04c897..402589ae 100644 --- a/client/src/test/java/io/streamnative/oxia/client/notify/NotificationManagerTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/notify/NotificationManagerTest.java @@ -38,7 +38,8 @@ import io.streamnative.oxia.client.api.Notification.KeyCreated; import io.streamnative.oxia.client.api.Notification.KeyDeleted; import io.streamnative.oxia.client.api.Notification.KeyModified; -import io.streamnative.oxia.client.grpc.ChannelManager.StubFactory; +import io.streamnative.oxia.client.grpc.OxiaStub; +import io.streamnative.oxia.client.grpc.OxiaStubManager; import io.streamnative.oxia.client.metrics.NotificationMetrics; import io.streamnative.oxia.client.metrics.api.Metrics; import io.streamnative.oxia.client.shard.ShardManager; @@ -49,7 +50,6 @@ import io.streamnative.oxia.proto.NotificationBatch; import io.streamnative.oxia.proto.NotificationsRequest; import io.streamnative.oxia.proto.ReactorOxiaClientGrpc; -import io.streamnative.oxia.proto.ReactorOxiaClientGrpc.ReactorOxiaClientStub; import java.util.List; import java.util.Optional; import java.util.Set; @@ -256,7 +256,7 @@ public Flux getNotifications(Mono reque long shardId1 = 1L; long shardId2 = 2L; - @Mock StubFactory stubByShardId; + @Mock OxiaStubManager stubManager; @Mock ShardManager shardManager; @Mock ShardManager.Assignments assignments; @Mock Consumer notificationCallback; @@ -281,10 +281,10 @@ void beforeEach() throws Exception { .start(); channel1 = InProcessChannelBuilder.forName(serverName1).directExecutor().build(); channel2 = InProcessChannelBuilder.forName(serverName2).directExecutor().build(); - var stub1 = ReactorOxiaClientGrpc.newReactorStub(channel1); - var stub2 = ReactorOxiaClientGrpc.newReactorStub(channel2); - when(stubByShardId.apply("leader1")).thenReturn(stub1); - when(stubByShardId.apply("leader2")).thenReturn(stub2); + var stub1 = new OxiaStub(channel1); + var stub2 = new OxiaStub(channel2); + when(stubManager.getStub("leader1")).thenReturn(stub1); + when(stubManager.getStub("leader2")).thenReturn(stub2); when(metrics.histogram(anyString(), any(Metrics.Unit.class))).thenReturn(histogram); } @@ -302,7 +302,7 @@ void notificationsFromMultipleShards() throws Exception { responses1.offer(Flux.just(notifications1).concatWith(Flux.never())); responses2.offer(Flux.just(notifications2).concatWith(Flux.never())); - try (var manager = new NotificationManager(stubByShardId, shardManager, metrics)) { + try (var manager = new NotificationManager(stubManager, shardManager, metrics)) { manager.registerCallback(notificationCallback); var changes = new ShardAssignmentChanges( diff --git a/client/src/test/java/io/streamnative/oxia/client/notify/ShardNotificationReceiverTest.java b/client/src/test/java/io/streamnative/oxia/client/notify/ShardNotificationReceiverTest.java index 358ae284..a201d610 100644 --- a/client/src/test/java/io/streamnative/oxia/client/notify/ShardNotificationReceiverTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/notify/ShardNotificationReceiverTest.java @@ -22,11 +22,9 @@ import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; -import io.grpc.ManagedChannel; import io.grpc.Server; import io.grpc.Status; import io.grpc.inprocess.InProcessChannelBuilder; @@ -35,17 +33,15 @@ import io.streamnative.oxia.client.api.Notification.KeyCreated; import io.streamnative.oxia.client.api.Notification.KeyDeleted; import io.streamnative.oxia.client.api.Notification.KeyModified; +import io.streamnative.oxia.client.grpc.OxiaStub; import io.streamnative.oxia.client.metrics.NotificationMetrics; import io.streamnative.oxia.proto.NotificationBatch; import io.streamnative.oxia.proto.NotificationsRequest; -import io.streamnative.oxia.proto.ReactorOxiaClientGrpc; import io.streamnative.oxia.proto.ReactorOxiaClientGrpc.OxiaClientImplBase; -import io.streamnative.oxia.proto.ReactorOxiaClientGrpc.ReactorOxiaClientStub; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; -import java.util.function.Supplier; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -76,11 +72,10 @@ public Flux getNotifications(Mono reque String serverName = InProcessServerBuilder.generateName(); Server server; - ManagedChannel channel; long shardId = 1L; String leader = "address"; - @Mock Supplier stubFactory; + @Mock OxiaStub stub; @Mock Consumer notificationCallback; @Mock NotificationMetrics metrics; @@ -94,14 +89,12 @@ void beforeEach() throws Exception { .addService(serviceImpl) .build() .start(); - channel = InProcessChannelBuilder.forName(serverName).directExecutor().build(); - var stub = ReactorOxiaClientGrpc.newReactorStub(channel); - doReturn(stub).when(stubFactory).get(); + stub = new OxiaStub(InProcessChannelBuilder.forName(serverName).directExecutor().build()); } @AfterEach - void afterEach() { - channel.shutdownNow(); + void afterEach() throws Exception { + stub.close(); server.shutdownNow(); } @@ -115,7 +108,7 @@ void start() { .build(); responses.offer(Flux.just(notifications).concatWith(Flux.never())); try (var notificationReceiver = - new ShardNotificationReceiver(stubFactory, shardId, notificationCallback, metrics)) { + new ShardNotificationReceiver(stub, shardId, notificationCallback, metrics)) { assertThat(notificationReceiver.start()).isCompleted(); await() .untilAsserted( @@ -132,7 +125,7 @@ void start() { void neverStarts() { responses.offer(Flux.never()); try (var notificationReceiver = - new ShardNotificationReceiver(stubFactory, shardId, notificationCallback, metrics)) { + new ShardNotificationReceiver(stub, shardId, notificationCallback, metrics)) { assertThat(notificationReceiver.start()).isCompleted(); await() .untilAsserted( @@ -149,7 +142,7 @@ public void recoveryFromError() { NotificationBatch.newBuilder().putNotifications("key1", created(1L)).build(); responses.offer(Flux.just(notifications).concatWith(Flux.never())); try (var notificationReceiver = - new ShardNotificationReceiver(stubFactory, shardId, notificationCallback, metrics)) { + new ShardNotificationReceiver(stub, shardId, notificationCallback, metrics)) { assertThat(notificationReceiver.start()).isCompleted(); await() .untilAsserted( @@ -167,7 +160,7 @@ public void recoveryFromEndOfStream() { NotificationBatch.newBuilder().putNotifications("key1", created(1L)).build(); responses.offer(Flux.just(notifications).concatWith(Flux.never())); try (var notificationReceiver = - new ShardNotificationReceiver(stubFactory, shardId, notificationCallback, metrics)) { + new ShardNotificationReceiver(stub, shardId, notificationCallback, metrics)) { assertThat(notificationReceiver.start()).isCompleted(); await() .untilAsserted( diff --git a/client/src/test/java/io/streamnative/oxia/client/session/SessionTest.java b/client/src/test/java/io/streamnative/oxia/client/session/SessionTest.java index 1727825b..489174e1 100644 --- a/client/src/test/java/io/streamnative/oxia/client/session/SessionTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/session/SessionTest.java @@ -22,11 +22,11 @@ import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.verify; -import io.grpc.ManagedChannel; import io.grpc.Server; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.streamnative.oxia.client.ClientConfig; +import io.streamnative.oxia.client.grpc.OxiaStub; import io.streamnative.oxia.client.metrics.SessionMetrics; import io.streamnative.oxia.client.metrics.api.Metrics; import io.streamnative.oxia.proto.CloseSessionRequest; @@ -53,7 +53,7 @@ @ExtendWith(MockitoExtension.class) class SessionTest { - Function stubByShardId; + Function stubByShardId; ClientConfig config; long shardId = 1L; long sessionId = 2L; @@ -61,7 +61,7 @@ class SessionTest { String clientId = "client"; private Server server; - private ManagedChannel channel; + private OxiaStub stub; private TestService service; @Mock SessionMetrics metrics; @@ -91,18 +91,19 @@ void setup() throws IOException { .addService(service) .build() .start(); - channel = InProcessChannelBuilder.forName(serverName).directExecutor().build(); - stubByShardId = s -> ReactorOxiaClientGrpc.newReactorStub(channel); + stub = new OxiaStub(InProcessChannelBuilder.forName(serverName).directExecutor().build()); + + stubByShardId = s -> stub; } @AfterEach - public void stopServer() throws InterruptedException { + public void stopServer() throws Exception { server.shutdown(); server.awaitTermination(); - channel.shutdown(); + stub.close(); server = null; - channel = null; + stub = null; } @Test diff --git a/client/src/test/java/io/streamnative/oxia/client/shard/ShardManagerGrpcTest.java b/client/src/test/java/io/streamnative/oxia/client/shard/ShardManagerGrpcTest.java index cd62398e..f80acb15 100644 --- a/client/src/test/java/io/streamnative/oxia/client/shard/ShardManagerGrpcTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/shard/ShardManagerGrpcTest.java @@ -22,20 +22,17 @@ import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.when; -import io.grpc.ManagedChannel; import io.grpc.Server; import io.grpc.Status; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; +import io.streamnative.oxia.client.grpc.OxiaStub; import io.streamnative.oxia.client.metrics.api.Metrics; import io.streamnative.oxia.proto.Int32HashRange; import io.streamnative.oxia.proto.NamespaceShardsAssignment; -import io.streamnative.oxia.proto.ReactorOxiaClientGrpc; import io.streamnative.oxia.proto.ReactorOxiaClientGrpc.OxiaClientImplBase; -import io.streamnative.oxia.proto.ReactorOxiaClientGrpc.ReactorOxiaClientStub; import io.streamnative.oxia.proto.ShardAssignment; import io.streamnative.oxia.proto.ShardAssignments; import io.streamnative.oxia.proto.ShardAssignmentsRequest; @@ -44,7 +41,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -74,8 +70,8 @@ public Flux getShardAssignments(Mono String serverName = InProcessServerBuilder.generateName(); Server server; - ManagedChannel channel; - @Mock Supplier stubFactory; + + @Mock OxiaStub stub; @Mock Metrics metrics; @Mock Metrics.Histogram histogram; @@ -89,15 +85,14 @@ void beforeEach() throws Exception { .addService(serviceImpl) .build() .start(); - channel = InProcessChannelBuilder.forName(serverName).directExecutor().build(); - var stub = ReactorOxiaClientGrpc.newReactorStub(channel); - doReturn(stub).when(stubFactory).get(); + + stub = new OxiaStub(InProcessChannelBuilder.forName(serverName).directExecutor().build()); when(metrics.histogram(anyString(), any(Metrics.Unit.class))).thenReturn(histogram); } @AfterEach - void afterEach() { - channel.shutdownNow(); + void afterEach() throws Exception { + stub.close(); server.shutdownNow(); } @@ -110,7 +105,7 @@ void start() { NamespaceShardsAssignment.newBuilder().addAssignments(assignment(0, 0, 3)).build()) .build(); responses.offer(Flux.just(assignments).concatWith(Flux.never())); - try (var shardManager = new ShardManager(stubFactory, metrics, DefaultNamespace)) { + try (var shardManager = new ShardManager(stub, metrics, DefaultNamespace)) { assertThat(shardManager.start()).succeedsWithin(Duration.ofSeconds(1)); assertThat(shardManager.getAll()).containsExactlyInAnyOrder(0L); assertThat(shardManager.leader(0)).isEqualTo("leader0"); @@ -120,7 +115,7 @@ void start() { @Test void neverStarts() { responses.offer(Flux.never()); - try (var shardManager = new ShardManager(stubFactory, metrics, DefaultNamespace)) { + try (var shardManager = new ShardManager(stub, metrics, DefaultNamespace)) { assertThatThrownBy(() -> shardManager.start().get(1, SECONDS)) .isInstanceOf(TimeoutException.class); assertThat(shardManager.getAll()).isEmpty(); @@ -145,7 +140,7 @@ void update() { .build()) .build(); responses.offer(Flux.just(assignments0, assignments1).concatWith(Flux.never())); - try (var shardManager = new ShardManager(stubFactory, metrics, DefaultNamespace)) { + try (var shardManager = new ShardManager(stub, metrics, DefaultNamespace)) { shardManager.start().join(); await() .untilAsserted( @@ -167,7 +162,7 @@ public void recoveryFromError() { NamespaceShardsAssignment.newBuilder().addAssignments(assignment(0, 0, 3)).build()) .build(); responses.offer(Flux.just(assignments).concatWith(Flux.never())); - try (var shardManager = new ShardManager(stubFactory, metrics, DefaultNamespace)) { + try (var shardManager = new ShardManager(stub, metrics, DefaultNamespace)) { assertThat(shardManager.start()).succeedsWithin(Duration.ofSeconds(1)); assertThat(shardManager.getAll()).containsExactlyInAnyOrder(0L); assertThat(shardManager.leader(0)).isEqualTo("leader0"); @@ -185,7 +180,7 @@ public void recoveryFromEndOfStream() { NamespaceShardsAssignment.newBuilder().addAssignments(assignment(0, 0, 3)).build()) .build(); responses.offer(Flux.just(assignments).concatWith(Flux.never())); - try (var shardManager = new ShardManager(stubFactory, metrics, DefaultNamespace)) { + try (var shardManager = new ShardManager(stub, metrics, DefaultNamespace)) { assertThat(shardManager.start()).succeedsWithin(Duration.ofSeconds(1)); assertThat(shardManager.getAll()).containsExactlyInAnyOrder(0L); assertThat(shardManager.leader(0)).isEqualTo("leader0"); diff --git a/client/src/test/java/io/streamnative/oxia/client/shard/ShardManagerTest.java b/client/src/test/java/io/streamnative/oxia/client/shard/ShardManagerTest.java index 074134a6..cf1561ef 100644 --- a/client/src/test/java/io/streamnative/oxia/client/shard/ShardManagerTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/shard/ShardManagerTest.java @@ -25,14 +25,16 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import io.streamnative.oxia.client.CompositeConsumer; +import io.streamnative.oxia.client.grpc.OxiaStub; import io.streamnative.oxia.client.metrics.ShardAssignmentMetrics; import io.streamnative.oxia.client.shard.ShardManager.ShardAssignmentChange.Added; import io.streamnative.oxia.proto.NamespaceShardsAssignment; -import io.streamnative.oxia.proto.ReactorOxiaClientGrpc.ReactorOxiaClientStub; +import io.streamnative.oxia.proto.ReactorOxiaClientGrpc; import io.streamnative.oxia.proto.ShardAssignment; import io.streamnative.oxia.proto.ShardAssignments; import io.streamnative.oxia.proto.ShardAssignmentsRequest; @@ -42,7 +44,6 @@ import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; -import java.util.function.Supplier; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; @@ -175,21 +176,26 @@ class ManagerTests { ShardManager.Assignments assignments = new ShardManager.Assignments(Xxh332HashRangeShardStrategy, DefaultNamespace); - @Mock Supplier stubFactory; + @Mock ReactorOxiaClientGrpc.ReactorOxiaClientStub reactor; + @Mock OxiaStub stub; @Mock ShardAssignmentMetrics metrics; ShardManager manager; @BeforeEach void mocking() { - manager = new ShardManager(stubFactory, assignments, new CompositeConsumer<>(), metrics); + stub = mock(OxiaStub.class); + reactor = mock(ReactorOxiaClientGrpc.ReactorOxiaClientStub.class); + + manager = new ShardManager(stub, assignments, new CompositeConsumer<>(), metrics); } @Test - void start(@Mock ReactorOxiaClientStub stub) { - when(stubFactory.get()).thenReturn(stub); + void start() { var assignment = ShardAssignment.newBuilder().setShardId(0).setLeader("leader0").build(); var nsAssignment = NamespaceShardsAssignment.newBuilder().addAssignments(assignment).build(); - when(stub.getShardAssignments( + + when(stub.reactor()).thenReturn(reactor); + when(reactor.getShardAssignments( ShardAssignmentsRequest.newBuilder().setNamespace(namespace).build())) .thenReturn( Flux.just(