Skip to content
This repository has been archived by the owner on Sep 27, 2023. It is now read-only.

Migrated to Netty 4.0.10.final #1

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 3 additions & 13 deletions handshake/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,6 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<repositories>
<repository>
<id>repository.jboss.org</id>
<url>http://repository.jboss.org/nexus/content/groups/public/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>junit</groupId>
Expand All @@ -50,9 +40,9 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.2.3.Final</version>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.10.Final</version>
<scope>compile</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,18 @@

import com.biasedbit.nettytutorials.handshake.common.ByteCounter;
import com.biasedbit.nettytutorials.handshake.common.MessageCounter;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.Delimiters;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
* @author <a href="mailto:[email protected]">Bruno de Carvalho</a>
Expand All @@ -29,7 +25,7 @@ public class Client {
private final String id;
private final String serverId;
private final ClientListener listener;
private ClientBootstrap bootstrap;
private Bootstrap bootstrap;
private Channel connector;

// constructors -----------------------------------------------------------
Expand All @@ -44,26 +40,27 @@ public Client(String id, String serverId, ClientListener listener) {

public boolean start() {
// Standard netty bootstrapping stuff.
Executor bossPool = Executors.newCachedThreadPool();
Executor workerPool = Executors.newCachedThreadPool();
ChannelFactory factory =
new NioClientSocketChannelFactory(bossPool, workerPool);
this.bootstrap = new ClientBootstrap(factory);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
this.bootstrap = new Bootstrap()
.channel(NioSocketChannel.class)
.group(workerGroup);

// Declared outside to fit under 80 char limit
final DelimiterBasedFrameDecoder frameDecoder =
new DelimiterBasedFrameDecoder(Integer.MAX_VALUE,
Delimiters.lineDelimiter());
this.bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
this.bootstrap.handler(new ChannelInitializer() {

@Override
protected void initChannel(Channel ch) throws Exception {
ByteCounter byteCounter =
new ByteCounter("--- CLIENT-COUNTER :: ");
MessageCounter messageCounter =
new MessageCounter("--- CLIENT-MSGCOUNTER :: ");
ClientHandshakeHandler handshakeHandler =
new ClientHandshakeHandler(id, serverId, 5000);

return Channels.pipeline(byteCounter,
ch.pipeline().addLast(byteCounter,
frameDecoder,
new StringDecoder(),
new StringEncoder(),
Expand All @@ -78,29 +75,29 @@ public ChannelPipeline getPipeline() throws Exception {
if (!future.awaitUninterruptibly().isSuccess()) {
System.out.println("--- CLIENT - Failed to connect to server at " +
"localhost:12345.");
this.bootstrap.releaseExternalResources();
workerGroup.shutdownGracefully();
return false;
}

this.connector = future.getChannel();
return this.connector.isConnected();
this.connector = future.channel();
return this.connector.isActive();
}

public void stop() {
if (this.connector != null) {
this.connector.close().awaitUninterruptibly();
}
this.bootstrap.releaseExternalResources();
this.bootstrap.group().shutdownGracefully();
System.out.println("--- CLIENT - Stopped.");
}

public boolean sendMessage(String message) {
if (this.connector.isConnected()) {
if (this.connector.isActive()) {
// Append \n if it's not present, because of the frame delimiter
if (!message.endsWith("\n")) {
this.connector.write(message + '\n');
this.connector.writeAndFlush(message + '\n');
} else {
this.connector.write(message);
this.connector.writeAndFlush(message);
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
package com.biasedbit.nettytutorials.handshake.client;

import com.biasedbit.nettytutorials.handshake.common.HandshakeEvent;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.atomic.AtomicInteger;

/**
* @author <a href="mailto:[email protected]">Bruno de Carvalho</a>
*/
public class ClientHandler extends SimpleChannelUpstreamHandler {
public class ClientHandler extends ChannelInboundHandlerAdapter {

// internal vars ----------------------------------------------------------

Expand All @@ -29,7 +26,7 @@ public ClientHandler(ClientListener listener) {
// SimpleChannelUpstreamHandler -------------------------------------------

@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
public void userEventTriggered(ChannelHandlerContext ctx, Object e)
throws Exception {
if (e instanceof HandshakeEvent) {
if (((HandshakeEvent) e).isSuccessful()) {
Expand All @@ -41,22 +38,21 @@ public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
return;
}

super.handleUpstream(ctx, e);
super.userEventTriggered(ctx, e);
}

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
this.counter.incrementAndGet();
this.listener.messageReceived(e.getMessage().toString());
this.listener.messageReceived(msg.toString());
}

@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
super.channelClosed(ctx, e);
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
out("--- CLIENT-HANDLER :: Channel closed, received " +
this.counter.get() + " messages: " + e.getChannel());
this.counter.get() + " messages: " + ctx.channel());
}

// private static helpers -------------------------------------------------
Expand Down
Loading