initial binary client based on Netty
mlozbin-cybervisiontech committed Nov 27, 2015
1 parent 8250803 commit a953e07
Showing 18 changed files with 316 additions and 177 deletions.
1 change: 1 addition & 0 deletions build.gradle
@@ -17,6 +17,7 @@ repositories {
}

dependencies {
compile 'io.netty:netty-all:4.0.31.Final'
compile 'org.slf4j:slf4j-api:1.7.13'
compile 'com.google.guava:guava:18.0'
compile 'org.projectlombok:lombok:1.16.6', optional
58 changes: 9 additions & 49 deletions src/main/java/com/github/nginate/kafka/KafkaClusterClient.java
@@ -1,14 +1,10 @@
package com.github.nginate.kafka;

import com.github.nginate.kafka.core.ClusterMetadata;
import com.github.nginate.kafka.core.KafkaConnection;
import com.github.nginate.kafka.core.KafkaConnectionPool;
import com.github.nginate.kafka.core.KafkaBrokerClient;
import com.github.nginate.kafka.dto.Partition;
import com.github.nginate.kafka.exceptions.CommunicationException;
import com.github.nginate.kafka.exceptions.ConnectionException;
import com.github.nginate.kafka.exceptions.KafkaTimeoutException;
import com.github.nginate.kafka.exceptions.SerializationException;
import com.github.nginate.kafka.protocol.Serializer;
import com.github.nginate.kafka.network.BinaryMessageSerializer;
import com.github.nginate.kafka.protocol.messages.Request;
import com.github.nginate.kafka.protocol.messages.Response;
import lombok.RequiredArgsConstructor;
@@ -17,21 +13,15 @@
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static com.github.nginate.kafka.util.StringUtils.format;

@Slf4j
@RequiredArgsConstructor
public class KafkaClusterClient implements Closeable {

private final KafkaConnectionPool connectionPool;
private final Serializer serializer;
private final String topic;
private final BinaryMessageSerializer binaryMessageSerializer;

public InetAddress getLeader(Partition partition) {
return null;
Expand All @@ -49,47 +39,17 @@ public ClusterMetadata getTopicMetadata(String topic) {
return null;
}

public <RS extends Response, RQ extends Request> RS request(RQ request, long timeout) throws KafkaTimeoutException {
try {
CompletableFuture<RS> future = new CompletableFuture<>();
callInConnection(getAddress(), request, future);
return future.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new KafkaTimeoutException(format("Request '{}' execution failed", request), e);
}
public <T extends Response> CompletableFuture<T> sendAndReceive(Request request, Class<T> responseClass)
throws CommunicationException {
return getClientForCurrentTopicLeader().sendAndReceive(request, responseClass);
}

@Override
public void close() throws IOException {

}

private InetAddress getAddress() throws CommunicationException {
try {
return InetAddress.getByName("127.0.0.1");
} catch (UnknownHostException e) {
throw new CommunicationException("Unknown host", e);
}
}

private <RS extends Response, RQ extends Request> void callInConnection(InetAddress address, RQ request,
CompletableFuture<RS> future) {
KafkaConnection connection = null;
try {
connection = connectionPool.connect(address);
byte[] rawResponse = connection.sendAndReceive(serializer.serialize(request));
future.complete(serializer.deserialize(rawResponse));
} catch (ConnectionException e) {
log.error("Could not connect to {}", address, e);
future.completeExceptionally(e);
} catch (CommunicationException e) {
log.error("Communication error with {}", address, e);
future.completeExceptionally(e);
} catch (SerializationException e) {
log.error("Failed on serialization", e);
future.completeExceptionally(e);
} finally {
connectionPool.releaseConnection(connection);
}
private KafkaBrokerClient getClientForCurrentTopicLeader() {
return null;
}
}

This file was deleted.

40 changes: 40 additions & 0 deletions src/main/java/com/github/nginate/kafka/core/KafkaBrokerClient.java
@@ -0,0 +1,40 @@
package com.github.nginate.kafka.core;

import com.github.nginate.kafka.exceptions.CommunicationException;
import com.github.nginate.kafka.network.client.BinaryTcpClient;
import com.github.nginate.kafka.network.client.BinaryTcpClientConfig;
import com.github.nginate.kafka.protocol.messages.Request;
import com.github.nginate.kafka.protocol.messages.Response;
import lombok.Builder;

import java.io.Closeable;
import java.util.concurrent.CompletableFuture;

@Builder(toBuilder = true)
public class KafkaBrokerClient implements Closeable {
private final BinaryTcpClient binaryTcpClient;

public KafkaBrokerClient(String host, int port) {
BinaryTcpClientConfig config = BinaryTcpClientConfig.custom().host(host).port(port).build();
binaryTcpClient = new BinaryTcpClient(config);
}

public void connect() {
binaryTcpClient.connect();
}

public void send(Request request) {
Response response = (Response) binaryTcpClient.request(request).join();
// TODO: check ack from broker
}

public <T extends Response> CompletableFuture<T> sendAndReceive(Request request, Class<T> responseClass)
throws CommunicationException {
return binaryTcpClient.request(request).thenApply(responseClass::cast);
}

@Override
public void close() {
binaryTcpClient.close();
}
}
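
For orientation, here is a minimal usage sketch of the new broker client: connect, send a request, block on the reply, and close. This is not part of the commit; the wrapper class name, the localhost:9092 address, and the generic Class<? extends Response> plumbing are illustrative assumptions, since the commit does not yet ship concrete request/response types.

package com.github.nginate.kafka.core;

import com.github.nginate.kafka.exceptions.CommunicationException;
import com.github.nginate.kafka.protocol.messages.Request;
import com.github.nginate.kafka.protocol.messages.Response;

import java.util.concurrent.CompletableFuture;

// Hedged usage sketch, not part of this commit.
public class BrokerClientUsageSketch {
    static Response exchange(Request request, Class<? extends Response> responseClass) throws CommunicationException {
        KafkaBrokerClient client = new KafkaBrokerClient("localhost", 9092); // assumed broker address
        try {
            client.connect();
            CompletableFuture<? extends Response> reply = client.sendAndReceive(request, responseClass);
            return reply.join(); // waits for the broker reply or propagates the failure
        } finally {
            client.close();
        }
    }
}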
28 changes: 0 additions & 28 deletions src/main/java/com/github/nginate/kafka/core/KafkaConnection.java

This file was deleted.

This file was deleted.

5 changes: 5 additions & 0 deletions src/main/java/com/github/nginate/kafka/network/AnswerableMessage.java
@@ -0,0 +1,5 @@
package com.github.nginate.kafka.network;

public interface AnswerableMessage<T> {
T getCorrelationId();
}
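
As a hypothetical illustration (not in this commit), a protocol message would implement this interface so the client can match an incoming response to the request that produced it by correlation id:

package com.github.nginate.kafka.network;

// Hypothetical example: a message carrying an Integer correlation id.
public class PingMessage implements AnswerableMessage<Integer> {
    private final Integer correlationId;

    public PingMessage(Integer correlationId) {
        this.correlationId = correlationId;
    }

    @Override
    public Integer getCorrelationId() {
        return correlationId;
    }
}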
18 changes: 18 additions & 0 deletions src/main/java/com/github/nginate/kafka/network/BinaryMessageDecoder.java
@@ -0,0 +1,18 @@
package com.github.nginate.kafka.network;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import lombok.RequiredArgsConstructor;

import java.util.List;

@RequiredArgsConstructor
public class BinaryMessageDecoder extends ReplayingDecoder<Void> {
private final BinaryMessageSerializer serializer;

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
out.add(serializer.deserialize(in));
}
}
17 changes: 17 additions & 0 deletions src/main/java/com/github/nginate/kafka/network/BinaryMessageEncoder.java
@@ -0,0 +1,17 @@
package com.github.nginate.kafka.network;


import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class BinaryMessageEncoder extends MessageToByteEncoder<AnswerableMessage> {
private final BinaryMessageSerializer serializer;

@Override
protected void encode(ChannelHandlerContext ctx, AnswerableMessage msg, ByteBuf out) throws Exception {
serializer.serialize(out, msg);
}
}
10 changes: 10 additions & 0 deletions src/main/java/com/github/nginate/kafka/network/BinaryMessageSerializer.java
@@ -0,0 +1,10 @@
package com.github.nginate.kafka.network;

import com.github.nginate.kafka.exceptions.SerializationException;
import io.netty.buffer.ByteBuf;

public interface BinaryMessageSerializer {
void serialize(ByteBuf buf, Object message) throws SerializationException;

Object deserialize(ByteBuf buf) throws SerializationException;
}
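
A minimal sketch (not part of this commit) of how these pieces would typically be wired into a Netty client pipeline: the decoder turns inbound ByteBufs into messages, the encoder turns outbound AnswerableMessages into bytes, and both delegate the actual framing to a BinaryMessageSerializer implementation, which this commit only declares as an interface.

package com.github.nginate.kafka.network;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

// Hedged sketch: the serializer parameter stands in for a concrete
// BinaryMessageSerializer implementation, which does not exist yet in this commit.
public class PipelineSketch {
    static Bootstrap clientBootstrap(BinaryMessageSerializer serializer) {
        return new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(
                                new BinaryMessageDecoder(serializer), // inbound: ByteBuf -> message
                                new BinaryMessageEncoder(serializer)); // outbound: AnswerableMessage -> ByteBuf
                    }
                });
    }
}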