workerPool) {
- this.workerPool = workerPool;
- }
-
/**
* Handle downstream event.
*
@@ -99,10 +93,6 @@ public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e)
}
}
- AbstractNioWorker nextWorker() {
- return workerPool.nextWorker();
- }
-
private static final class ParentMessageEvent implements MessageEvent {
private final MessageEvent delegate;
diff --git a/mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelFactory.java b/mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioClientDatagramChannelFactory.java
similarity index 63%
rename from mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelFactory.java
rename to mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioClientDatagramChannelFactory.java
index ed1c4c7ae3..3ddac8b50c 100644
--- a/mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelFactory.java
+++ b/mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioClientDatagramChannelFactory.java
@@ -35,13 +35,11 @@
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
-import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.socket.DatagramChannel;
import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.channel.socket.InternetProtocolFamily;
-import org.jboss.netty.channel.socket.Worker;
import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
import org.jboss.netty.util.ExternalResourceReleasable;
@@ -53,19 +51,19 @@
*
* How threads work
*
- * There is only one thread type in a {@link NioDatagramChannelFactory};
+ * There is only one thread type in a {@link NioClientDatagramChannelFactory};
* worker threads.
*
*
Worker threads
*
- * One {@link NioDatagramChannelFactory} can have one or more worker
+ * One {@link NioClientDatagramChannelFactory} can have one or more worker
* threads. A worker thread performs non-blocking read and write for one or
* more {@link DatagramChannel}s in a non-blocking mode.
*
*
Life cycle of threads and graceful shutdown
*
* All worker threads are acquired from the {@link Executor} which was specified
- * when a {@link NioDatagramChannelFactory} was created. Therefore, you should
+ * when a {@link NioClientDatagramChannelFactory} was created. Therefore, you should
* make sure the specified {@link Executor} is able to lend the sufficient
* number of threads. It is the best bet to specify
* {@linkplain Executors#newCachedThreadPool() a cached thread pool}.
@@ -89,56 +87,27 @@
*
* Multicast is not supported. Please use {@link OioDatagramChannelFactory}
* instead.
- *
- * @apiviz.landmark
*/
-public class NioDatagramChannelFactory implements DatagramChannelFactory {
+public class NioClientDatagramChannelFactory implements DatagramChannelFactory {
private final NioDatagramPipelineSink sink;
- private final NioChildDatagramPipelineSink childSink;
- private final WorkerPool workerPool;
- private final WorkerPool childPool;
+ private final WorkerPool workerPool;
private final InternetProtocolFamily family;
private boolean releasePool;
- /**
- * Create a new {@link NioDatagramChannelFactory} with a {@link Executors#newCachedThreadPool()}
- * and without preferred {@link InternetProtocolFamily}. Please note that the {@link InternetProtocolFamily}
- * of the channel will be platform (and possibly configuration) dependent and therefore
- * unspecified. Use {@link #NioDatagramChannelFactory(InternetProtocolFamily)} if unsure.
- *
- * See {@link #NioDatagramChannelFactory(Executor)}
- */
- public NioDatagramChannelFactory() {
- this((InternetProtocolFamily) null);
- }
-
- /**
- * Create a new {@link NioDatagramChannelFactory} with a {@link Executors#newCachedThreadPool()}.
- *
- * See {@link #NioDatagramChannelFactory(Executor)}
- */
- public NioDatagramChannelFactory(InternetProtocolFamily family) {
- workerPool = new NioDatagramWorkerPool(Executors.newCachedThreadPool(), SelectorUtil.DEFAULT_IO_THREADS);
- childPool = new NioWorkerPool(Executors.newCachedThreadPool(), SelectorUtil.DEFAULT_IO_THREADS);
- this.family = family;
- sink = new NioDatagramPipelineSink(workerPool);
- childSink = new NioChildDatagramPipelineSink(childPool);
+ public NioClientDatagramChannelFactory(WorkerPool workerPool) {
+ this.workerPool = workerPool;
+ this.family = null;
+ sink = new NioDatagramPipelineSink();
releasePool = true;
}
public DatagramChannel newChannel(final ChannelPipeline pipeline) {
- return new NioDatagramChannel(this, pipeline, sink, sink.nextWorker(), family);
- }
-
- // mina.netty change - adding this to create child datagram channels
- public NioChildDatagramChannel newChildChannel(Channel parent, final ChannelPipeline pipeline) {
- return new NioChildDatagramChannel(parent, this, pipeline, childSink, childSink.nextWorker(), family);
+ return new NioDatagramChannel(this, pipeline, sink, workerPool.nextWorker(), family);
}
public void shutdown() {
workerPool.shutdown();
- childPool.shutdown();
if (releasePool) {
releasePool();
}
@@ -146,7 +115,6 @@ public void shutdown() {
public void releaseExternalResources() {
workerPool.shutdown();
- childPool.shutdown();
releasePool();
}
@@ -154,8 +122,5 @@ private void releasePool() {
if (workerPool instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) workerPool).releaseExternalResources();
}
- if (childPool instanceof ExternalResourceReleasable) {
- ((ExternalResourceReleasable) childPool).releaseExternalResources();
- }
}
-}
\ No newline at end of file
+}
diff --git a/mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramBossPool.java b/mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramBossPool.java
new file mode 100644
index 0000000000..3578b5f21c
--- /dev/null
+++ b/mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramBossPool.java
@@ -0,0 +1,57 @@
+/**
+ * Copyright 2007-2016, Kaazing Corporation. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jboss.netty.channel.socket.nio;
+
+import org.jboss.netty.util.ThreadNameDeterminer;
+
+import java.util.concurrent.Executor;
+
+
+/**
+ * Holds {@link NioServerDatagramBoss} instances to use
+ */
+public class NioDatagramBossPool extends AbstractNioBossPool {
+ private final ThreadNameDeterminer determiner;
+
+ /**
+ * Create a new instance
+ *
+ * @param bossExecutor the {@link Executor} to use for server the {@link NioServerDatagramBoss}
+ * @param bossCount the number of {@link NioServerDatagramBoss} instances this {@link NioDatagramBossPool} will hold
+ * @param determiner the {@link ThreadNameDeterminer} to use for name the threads. Use {@code null}
+ * if you not want to set one explicit.
+ */
+ public NioDatagramBossPool(Executor bossExecutor, int bossCount, ThreadNameDeterminer determiner) {
+ super(bossExecutor, bossCount, false);
+ this.determiner = determiner;
+ init();
+ }
+
+ /**
+ * Create a new instance using no {@link ThreadNameDeterminer}
+ *
+ * @param bossExecutor the {@link Executor} to use for server the {@link NioServerDatagramBoss}
+ * @param bossCount the number of {@link NioServerDatagramBoss} instances this {@link NioDatagramBossPool} will hold
+ */
+ public NioDatagramBossPool(Executor bossExecutor, int bossCount) {
+ this(bossExecutor, bossCount, null);
+ }
+
+ @Override
+ protected NioServerDatagramBoss newBoss(Executor executor) {
+ return new NioServerDatagramBoss(executor);
+ }
+}
diff --git a/mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java b/mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java
index 7d0d15a245..8961b8d6e9 100644
--- a/mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java
+++ b/mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java
@@ -70,7 +70,7 @@ public class NioDatagramChannel extends AbstractNioChannel
NioDatagramChannel(final ChannelFactory factory,
final ChannelPipeline pipeline, final ChannelSink sink,
- final NioDatagramWorker worker, InternetProtocolFamily family) {
+ final AbstractNioWorker worker, InternetProtocolFamily family) {
super(null, factory, pipeline, sink, worker, openNonBlockingChannel(family), true);
config = new DefaultNioDatagramChannelConfig(channel);
@@ -111,11 +111,6 @@ private static DatagramChannel openNonBlockingChannel(InternetProtocolFamily fam
}
}
- @Override
- public NioDatagramWorker getWorker() {
- return (NioDatagramWorker) super.getWorker();
- }
-
public boolean isBound() {
return isOpen() && channel.socket().isBound();
}
diff --git a/mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java b/mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java
index da33bb211b..ab8e972a25 100644
--- a/mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java
+++ b/mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java
@@ -33,7 +33,6 @@
import static org.jboss.netty.channel.Channels.*;
import java.net.InetSocketAddress;
-import java.util.concurrent.Executor;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
@@ -49,23 +48,6 @@
*/
class NioDatagramPipelineSink extends AbstractNioChannelSink {
- private final WorkerPool workerPool;
-
- /**
- * Creates a new {@link NioDatagramPipelineSink} with a the number of {@link NioDatagramWorker}s
- * specified in workerCount. The {@link NioDatagramWorker}s take care of reading and writing
- * for the {@link NioDatagramChannel}.
- *
- * @param workerExecutor
- * the {@link Executor} that will run the {@link NioDatagramWorker}s
- * for this sink
- * @param workerCount
- * the number of {@link NioDatagramWorker}s for this sink
- */
- NioDatagramPipelineSink(final WorkerPool workerPool) {
- this.workerPool = workerPool;
- }
-
/**
* Handle downstream event.
*
@@ -98,7 +80,7 @@ public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e)
if (value != null) {
connect(channel, future, (InetSocketAddress) value);
} else {
- NioDatagramWorker.disconnect(channel, future);
+ NioServerDatagramBoss.disconnect(channel, future);
}
break;
case INTEREST_OPS:
@@ -201,8 +183,4 @@ private static void connect(
}
}
- NioDatagramWorker nextWorker() {
- return workerPool.nextWorker();
- }
-
}
diff --git a/mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java b/mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioServerDatagramBoss.java
similarity index 99%
rename from mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java
rename to mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioServerDatagramBoss.java
index f880fa90de..5c27adb672 100644
--- a/mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java
+++ b/mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioServerDatagramBoss.java
@@ -55,7 +55,7 @@
* A class responsible for registering channels with {@link Selector}.
* It also implements the {@link Selector} loop.
*/
-public class NioDatagramWorker extends AbstractNioWorker {
+public class NioServerDatagramBoss extends AbstractNioWorker implements Boss {
/**
* Sole constructor.
@@ -63,7 +63,7 @@ public class NioDatagramWorker extends AbstractNioWorker {
* @param executor the {@link Executor} used to execute {@link Runnable}s
* such as {@link ChannelRegistionTask}
*/
- NioDatagramWorker(final Executor executor) {
+ NioServerDatagramBoss(final Executor executor) {
super(executor);
}
diff --git a/mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioServerDatagramChannelFactory.java b/mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioServerDatagramChannelFactory.java
new file mode 100644
index 0000000000..0d699818eb
--- /dev/null
+++ b/mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioServerDatagramChannelFactory.java
@@ -0,0 +1,117 @@
+/**
+ * Copyright 2007-2016, Kaazing Corporation. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jboss.netty.channel.socket.nio;
+
+import java.util.concurrent.Executor;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.socket.DatagramChannel;
+import org.jboss.netty.channel.socket.DatagramChannelFactory;
+import org.jboss.netty.channel.socket.InternetProtocolFamily;
+import org.jboss.netty.util.ExternalResourceReleasable;
+
+/**
+ * A {@link DatagramChannelFactory} that creates a NIO-based connectionless
+ * {@link DatagramChannel}. It utilizes the non-blocking I/O mode which
+ * was introduced with NIO to serve many number of concurrent connections
+ * efficiently.
+ *
+ * How threads work
+ *
+ * There are two types of threads in a {@link NioServerDatagramChannelFactory};
+ * one is boss thread and the other is worker thread.
+ *
+ *
Boss threads
+ *
+ * Each bound {@link NioDatagramChannel} has its own boss thread.
+ * For example, if you opened two server ports such as 80 and 443, you will
+ * have two boss threads. A boss thread creates child sessions based on
+ * the remote address of client. Once a child connection is created
+ * successfully, the boss thread passes the child {@link Channel} to one of
+ * the worker threads that the {@link NioServerDatagramChannelFactory} manages.
+ *
+ *
Worker threads
+ *
+ * One {@link NioServerDatagramChannelFactory} can have one or more worker
+ * threads. A worker thread performs non-blocking read and write for one or
+ * more {@link Channel}s in a non-blocking mode.
+ */
+public class NioServerDatagramChannelFactory implements DatagramChannelFactory {
+
+ private final NioDatagramPipelineSink sink;
+ private final NioChildDatagramPipelineSink childSink;
+ private final BossPool bossPool;
+ private final WorkerPool workerPool;
+ private final InternetProtocolFamily family;
+ private boolean releasePool;
+
+ public NioServerDatagramChannelFactory(Executor bossExecutor, int bossCount, WorkerPool workerPool) {
+ bossPool = new NioDatagramBossPool(bossExecutor, bossCount, null);
+ this.workerPool = workerPool;
+ this.family = null;
+ sink = new NioDatagramPipelineSink();
+ childSink = new NioChildDatagramPipelineSink();
+ releasePool = true;
+ }
+
+ public DatagramChannel newChannel(final ChannelPipeline pipeline) {
+ return new NioDatagramChannel(this, pipeline, sink, bossPool.nextBoss(), family);
+ }
+
+ // mina.netty change - adding this to create child datagram channels
+ public NioChildDatagramChannel newChildChannel(Channel parent, final ChannelPipeline pipeline) {
+ return new NioChildDatagramChannel(parent, this, pipeline, childSink, workerPool.nextWorker(), family);
+ }
+
+ public void shutdown() {
+ workerPool.shutdown();
+ bossPool.shutdown();
+ if (releasePool) {
+ releasePool();
+ }
+ }
+
+ public void releaseExternalResources() {
+ workerPool.shutdown();
+ bossPool.shutdown();
+ releasePool();
+ }
+
+ private void releasePool() {
+ if (workerPool instanceof ExternalResourceReleasable) {
+ ((ExternalResourceReleasable) workerPool).releaseExternalResources();
+ }
+ if (bossPool instanceof ExternalResourceReleasable) {
+ ((ExternalResourceReleasable) bossPool).releaseExternalResources();
+ }
+ }
+}
diff --git a/mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java
index ae9723c9b1..14de230c81 100644
--- a/mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java
+++ b/mina.netty/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java
@@ -30,6 +30,7 @@
*/
package org.jboss.netty.channel.socket.nio;
+import static org.jboss.netty.channel.Channels.fireWriteComplete;
import static org.kaazing.mina.netty.config.InternalSystemProperty.MAXIMUM_PROCESS_TASKS_TIME;
import static java.lang.String.format;
import static org.jboss.netty.channel.Channels.fireChannelBound;
@@ -41,10 +42,13 @@
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedChannelException;
+import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
+import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -53,6 +57,7 @@
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.ReceiveBufferSizePredictor;
import org.jboss.netty.util.ThreadNameDeterminer;
@@ -100,9 +105,12 @@ protected final long getMaximumProcessTaskQueueTimeNanos() {
@Override
protected boolean read(SelectionKey k) {
- final SocketChannel ch = (SocketChannel) k.channel();
- final NioSocketChannel channel = (NioSocketChannel) k.attachment();
+ ReadDispatcher dispatcher = (ReadDispatcher) k.attachment();
+ return dispatcher.dispatch(this, k);
+ }
+ private boolean readTcp(SelectionKey k, NioSocketChannel channel) {
+ final SocketChannel ch = (SocketChannel) k.channel();
final ReceiveBufferSizePredictor predictor =
channel.getConfig().getReceiveBufferSizePredictor();
final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
@@ -176,16 +184,20 @@ protected int select(Selector selector, boolean quickSelect) throws IOException
@Override
protected Runnable createRegisterTask(Channel channel, ChannelFuture future) {
- boolean server = !(channel instanceof NioClientSocketChannel);
- return new RegisterTask((NioSocketChannel) channel, future, server);
+ if (channel instanceof NioSocketChannel) {
+ boolean server = !(channel instanceof NioClientSocketChannel);
+ return new TcpChannelRegisterTask((NioSocketChannel) channel, future, server);
+ } else {
+ return new UdpChannelRegistionTask((NioDatagramChannel) channel, future);
+ }
}
- private final class RegisterTask implements Runnable {
+ private final class TcpChannelRegisterTask implements Runnable {
private final NioSocketChannel channel;
private final ChannelFuture future;
private final boolean server;
- RegisterTask(
+ TcpChannelRegisterTask(
NioSocketChannel channel, ChannelFuture future, boolean server) {
this.channel = channel;
@@ -212,7 +224,7 @@ public void run() {
}
channel.channel.register(
- selector, channel.getRawInterestOps(), channel);
+ selector, channel.getRawInterestOps(), new TcpReadDispatcher(channel));
if (future != null) {
channel.setConnected();
@@ -241,4 +253,330 @@ public void run() {
super.run();
recvBufferPool.releaseExternalResources();
}
+
+ @Override
+ protected void write0(final AbstractNioChannel> channel) {
+ if (channel instanceof NioSocketChannel || channel instanceof NioChildDatagramChannel) {
+ super.write0(channel);
+ } else {
+ write0Udp(channel);
+ }
+ }
+
+ @Override
+ void writeFromUserCode(final AbstractNioChannel> channel) {
+ if (channel instanceof NioDatagramChannel) {
+ writeFromUserCodeUdp(channel);
+ } else {
+ super.writeFromUserCode(channel);
+ }
+ }
+
+ @Override
+ protected void close(SelectionKey k) {
+ ReadDispatcher dispatcher = (ReadDispatcher) k.attachment();
+ AbstractNioChannel> ch = dispatcher.channel();
+ close(ch, succeededFuture(ch));
+ }
+
+ @Override
+ void writeFromSelectorLoop(final SelectionKey k) {
+ ReadDispatcher dispatcher = (ReadDispatcher) k.attachment();
+ AbstractNioChannel> ch = dispatcher.channel();
+ ch.writeSuspended = false;
+ write0(ch);
+ }
+
+ //
+ // ---------------- UDP worker -----------------------
+ //
+ private boolean readUdp(final SelectionKey key, NioDatagramChannel channel) {
+ ReceiveBufferSizePredictor predictor =
+ channel.getConfig().getReceiveBufferSizePredictor();
+ final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
+ final DatagramChannel nioChannel = (DatagramChannel) key.channel();
+ final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
+
+ final ByteBuffer byteBuffer = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
+
+ boolean failure = true;
+ SocketAddress remoteAddress = null;
+ try {
+ // Receive from the channel in a non blocking mode. We have already been notified that
+ // the channel is ready to receive.
+ remoteAddress = nioChannel.receive(byteBuffer);
+ failure = false;
+ } catch (ClosedChannelException e) {
+ // Can happen, and does not need a user attention.
+ } catch (Throwable t) {
+ fireExceptionCaught(channel, t);
+ }
+
+ if (remoteAddress != null) {
+ // Flip the buffer so that we can wrap it.
+ byteBuffer.flip();
+
+ int readBytes = byteBuffer.remaining();
+ if (readBytes > 0) {
+ // Update the predictor.
+ predictor.previousReceiveBufferSize(readBytes);
+
+ final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
+ buffer.setBytes(0, byteBuffer);
+ buffer.writerIndex(readBytes);
+
+ // Update the predictor.
+ predictor.previousReceiveBufferSize(readBytes);
+
+ // Notify the interested parties about the newly arrived message.
+ fireMessageReceived(
+ channel, buffer, remoteAddress);
+ }
+ }
+
+ if (failure) {
+ key.cancel(); // Some JDK implementations run into an infinite loop without this.
+ close(channel, succeededFuture(channel));
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * RegisterTask is a task responsible for registering a channel with a
+ * selector.
+ */
+ private final class UdpChannelRegistionTask implements Runnable {
+ private final NioDatagramChannel channel;
+
+ private final ChannelFuture future;
+
+ UdpChannelRegistionTask(final NioDatagramChannel channel,
+ final ChannelFuture future) {
+ this.channel = channel;
+ this.future = future;
+ }
+
+ /**
+ * This runnable's task. Does the actual registering by calling the
+ * underlying DatagramChannels peer DatagramSocket register method.
+ */
+ public void run() {
+ final SocketAddress localAddress = channel.getLocalAddress();
+ final SocketAddress remoteAddress = channel.getRemoteAddress();
+ if (localAddress == null) {
+ if (future != null) {
+ future.setFailure(new ClosedChannelException());
+ }
+ close(channel, succeededFuture(channel));
+ return;
+ }
+
+ try {
+ channel.getDatagramChannel().register(
+ selector, channel.getInternalInterestOps(), new UdpReadDispatcher(channel));
+
+ if (future != null) {
+ future.setSuccess();
+ }
+ // mina.netty change - similar to tcp, connected event is fired here instead
+ // in NioDatagramPipelineSink. This means NioDatagramChannelIoSession is
+ // created in the correct i/o thread
+ fireChannelConnected(channel, remoteAddress);
+ } catch (final IOException e) {
+ if (future != null) {
+ future.setFailure(e);
+ }
+ close(channel, succeededFuture(channel));
+
+ if (!(e instanceof ClosedChannelException)) {
+ throw new ChannelException(
+ "Failed to register a socket to the selector.", e);
+ }
+ }
+ }
+ }
+
+ void writeFromUserCodeUdp(final AbstractNioChannel> channel) {
+ assert channel instanceof NioDatagramChannel;
+
+ /*
+ * Note that we are not checking if the channel is connected. Connected
+ * has a different meaning in UDP and means that the channels socket is
+ * configured to only send and receive from a given remote peer.
+ */
+ if (!channel.isBound()) {
+ cleanUpWriteBuffer(channel);
+ return;
+ }
+
+ if (scheduleWriteIfNecessary(channel)) {
+ return;
+ }
+
+ // From here, we are sure Thread.currentThread() == workerThread.
+
+ if (channel.writeSuspended) {
+ return;
+ }
+
+ if (channel.inWriteNowLoop) {
+ return;
+ }
+
+ write0(channel);
+ }
+
+ private void write0Udp(final AbstractNioChannel> channel) {
+ assert channel instanceof NioDatagramChannel;
+
+ boolean addOpWrite = false;
+ boolean removeOpWrite = false;
+
+ long writtenBytes = 0;
+
+ final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
+ final DatagramChannel ch = ((NioDatagramChannel) channel).getDatagramChannel();
+ final Queue writeBuffer = channel.writeBufferQueue;
+ final int writeSpinCount = channel.getConfig().getWriteSpinCount();
+ synchronized (channel.writeLock) {
+ // inform the channel that write is in-progress
+ channel.inWriteNowLoop = true;
+
+ // loop forever...
+ for (;;) {
+ MessageEvent evt = channel.currentWriteEvent;
+ SocketSendBufferPool.SendBuffer buf;
+ if (evt == null) {
+ if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
+ removeOpWrite = true;
+ channel.writeSuspended = false;
+ break;
+ }
+ // mina.netty change - similar to mina.netty's AbstractNioWorker, passing channel as parameter
+ channel.currentWriteBuffer = buf = sendBufferPool.acquire(channel, evt.getMessage());
+ } else {
+ buf = channel.currentWriteBuffer;
+ }
+
+ try {
+ long localWrittenBytes = 0;
+ SocketAddress raddr = evt.getRemoteAddress();
+ if (raddr == null) {
+ for (int i = writeSpinCount; i > 0; i --) {
+ localWrittenBytes = buf.transferTo(ch);
+ if (localWrittenBytes != 0) {
+ writtenBytes += localWrittenBytes;
+ break;
+ }
+ if (buf.finished()) {
+ break;
+ }
+ }
+ } else {
+ for (int i = writeSpinCount; i > 0; i --) {
+ localWrittenBytes = buf.transferTo(ch, raddr);
+ if (localWrittenBytes != 0) {
+ writtenBytes += localWrittenBytes;
+ break;
+ }
+ if (buf.finished()) {
+ break;
+ }
+ }
+ }
+
+ if (localWrittenBytes > 0 || buf.finished()) {
+ // Successful write - proceed to the next message.
+ buf.release();
+ ChannelFuture future = evt.getFuture();
+ channel.currentWriteEvent = null;
+ channel.currentWriteBuffer = null;
+ evt = null;
+ buf = null;
+ future.setSuccess();
+ } else {
+ // Not written at all - perhaps the kernel buffer is full.
+ addOpWrite = true;
+ channel.writeSuspended = true;
+ break;
+ }
+ } catch (final AsynchronousCloseException e) {
+ // Doesn't need a user attention - ignore.
+ } catch (final Throwable t) {
+ buf.release();
+ ChannelFuture future = evt.getFuture();
+ channel.currentWriteEvent = null;
+ channel.currentWriteBuffer = null;
+ // Mark the event object for garbage collection.
+ //noinspection UnusedAssignment
+ buf = null;
+ //noinspection UnusedAssignment
+ evt = null;
+ future.setFailure(t);
+ fireExceptionCaught(channel, t);
+ }
+ }
+ channel.inWriteNowLoop = false;
+
+ // Initially, the following block was executed after releasing
+ // the writeLock, but there was a race condition, and it has to be
+ // executed before releasing the writeLock:
+ //
+ // https://issues.jboss.org/browse/NETTY-410
+ //
+ if (addOpWrite) {
+ setOpWrite(channel);
+ } else if (removeOpWrite) {
+ clearOpWrite(channel);
+ }
+ }
+
+ fireWriteComplete(channel, writtenBytes);
+ }
+
+ interface ReadDispatcher {
+ AbstractNioChannel channel();
+ boolean dispatch(NioWorker worker, SelectionKey key);
+ }
+
+ static final class TcpReadDispatcher implements ReadDispatcher {
+
+ private final NioSocketChannel channel;
+
+ TcpReadDispatcher(NioSocketChannel channel) {
+ this.channel = channel;
+ }
+
+ @Override
+ public AbstractNioChannel channel() {
+ return channel;
+ }
+
+ @Override
+ public boolean dispatch(NioWorker worker, SelectionKey key) {
+ return worker.readTcp(key, channel);
+ }
+ }
+
+ static final class UdpReadDispatcher implements ReadDispatcher {
+
+ private final NioDatagramChannel channel;
+
+ UdpReadDispatcher(NioDatagramChannel channel) {
+ this.channel = channel;
+ }
+
+ @Override
+ public AbstractNioChannel channel() {
+ return channel;
+ }
+
+ @Override
+ public boolean dispatch(NioWorker worker, SelectionKey key) {
+ return worker.readUdp(key, channel);
+ }
+ }
+
}
diff --git a/mina.netty/src/main/java/org/kaazing/mina/netty/bootstrap/ConnectionlessServerBootstrap.java b/mina.netty/src/main/java/org/kaazing/mina/netty/bootstrap/ConnectionlessServerBootstrap.java
index 39df45d4b8..f574cb4004 100644
--- a/mina.netty/src/main/java/org/kaazing/mina/netty/bootstrap/ConnectionlessServerBootstrap.java
+++ b/mina.netty/src/main/java/org/kaazing/mina/netty/bootstrap/ConnectionlessServerBootstrap.java
@@ -29,19 +29,16 @@
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.AbstractNioWorker;
import org.jboss.netty.channel.socket.nio.NioChildDatagramChannel;
-import org.jboss.netty.channel.socket.nio.NioDatagramChannel;
-import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioServerDatagramChannelFactory;
import org.kaazing.mina.netty.IoAcceptorChannelHandler;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Function;
import static org.jboss.netty.channel.Channels.fireChannelConnected;
import static org.jboss.netty.channel.Channels.fireChannelOpen;
-import static org.jboss.netty.channel.Channels.fireMessageReceived;
import static org.jboss.netty.channel.Channels.pipeline;
class ConnectionlessServerBootstrap extends ConnectionlessBootstrap implements ServerBootstrap {
@@ -153,7 +150,7 @@ private NioChildDatagramChannel getChildChannel(Channel channel, SocketAddress r
}
ChannelFactory channelFactory = channel.getFactory();
- NioChildDatagramChannel childChannel = ((NioDatagramChannelFactory)channelFactory).newChildChannel(channel, childPipeline);
+ NioChildDatagramChannel childChannel = ((NioServerDatagramChannelFactory)channelFactory).newChildChannel(channel, childPipeline);
childChannel.setLocalAddress((InetSocketAddress) channel.getLocalAddress());
childChannel.setRemoteAddress((InetSocketAddress) remoteAddress);
fireChannelOpen(childChannel);
diff --git a/mina.netty/src/main/java/org/kaazing/mina/netty/socket/nio/NioDatagramChannelIoAcceptor.java b/mina.netty/src/main/java/org/kaazing/mina/netty/socket/nio/NioDatagramChannelIoAcceptor.java
index 85493f2305..6a242c032c 100644
--- a/mina.netty/src/main/java/org/kaazing/mina/netty/socket/nio/NioDatagramChannelIoAcceptor.java
+++ b/mina.netty/src/main/java/org/kaazing/mina/netty/socket/nio/NioDatagramChannelIoAcceptor.java
@@ -21,9 +21,8 @@
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelConfig;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.channel.socket.nio.NioChildDatagramChannel;
-import org.jboss.netty.channel.socket.nio.NioDatagramChannel;
-import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
import org.kaazing.mina.core.service.IoProcessorEx;
import org.kaazing.mina.netty.ChannelIoSession;
import org.kaazing.mina.netty.socket.DatagramChannelIoAcceptor;
@@ -37,11 +36,10 @@ public class NioDatagramChannelIoAcceptor extends DatagramChannelIoAcceptor {
"Kaazing", "NioDatagramChannel", true, true, InetSocketAddress.class,
DatagramSessionConfig.class, Object.class);
- public NioDatagramChannelIoAcceptor(DatagramChannelIoSessionConfig sessionConfig) {
- super(sessionConfig, new NioDatagramChannelFactory(), new SimpleChannelUpstreamHandler());
+ public NioDatagramChannelIoAcceptor(DatagramChannelIoSessionConfig sessionConfig, DatagramChannelFactory channelFactory) {
+ super(sessionConfig, channelFactory, new SimpleChannelUpstreamHandler());
}
-
@Override
public TransportMetadata getTransportMetadata() {
return NIO_DATAGRAM_TRANSPORT_METADATA;
diff --git a/mina.netty/src/main/java/org/kaazing/mina/netty/socket/nio/NioDatagramChannelIoConnector.java b/mina.netty/src/main/java/org/kaazing/mina/netty/socket/nio/NioDatagramChannelIoConnector.java
index 0c144d3f84..391144a061 100644
--- a/mina.netty/src/main/java/org/kaazing/mina/netty/socket/nio/NioDatagramChannelIoConnector.java
+++ b/mina.netty/src/main/java/org/kaazing/mina/netty/socket/nio/NioDatagramChannelIoConnector.java
@@ -21,6 +21,7 @@
import org.apache.mina.core.service.TransportMetadata;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelConfig;
+import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.channel.socket.nio.NioDatagramChannel;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
@@ -35,8 +36,8 @@ public class NioDatagramChannelIoConnector extends DatagramChannelIoConnector {
"Kaazing", "NioDatagramChannel", true, true, InetSocketAddress.class,
DatagramChannelIoSessionConfig.class, Object.class);
- public NioDatagramChannelIoConnector(DatagramChannelIoSessionConfig sessionConfig) {
- super(sessionConfig, new NioDatagramChannelFactory());
+ public NioDatagramChannelIoConnector(DatagramChannelIoSessionConfig sessionConfig, DatagramChannelFactory channelFactory) {
+ super(sessionConfig, channelFactory);
}
@Override
diff --git a/mina.netty/src/main/java/org/kaazing/mina/netty/socket/nio/NioDatagramChannelIoSession.java b/mina.netty/src/main/java/org/kaazing/mina/netty/socket/nio/NioDatagramChannelIoSession.java
index f23bc94b88..b31a17a16b 100644
--- a/mina.netty/src/main/java/org/kaazing/mina/netty/socket/nio/NioDatagramChannelIoSession.java
+++ b/mina.netty/src/main/java/org/kaazing/mina/netty/socket/nio/NioDatagramChannelIoSession.java
@@ -73,9 +73,7 @@ else if (isClosedReceived()) {
}
else {
AbstractNioWorker newWorker = ((WorkerExecutor) ioExecutor).worker;
- channel.getWorker().deregister(channel);
channel.setWorker(newWorker);
- newWorker.register(channel);
}
}
diff --git a/mina.netty/src/test/java/org/kaazing/mina/netty/NioDatagramChannelIoAcceptorIT.java b/mina.netty/src/test/java/org/kaazing/mina/netty/NioDatagramChannelIoAcceptorIT.java
index 94d6deedae..0fd920f9a3 100644
--- a/mina.netty/src/test/java/org/kaazing/mina/netty/NioDatagramChannelIoAcceptorIT.java
+++ b/mina.netty/src/test/java/org/kaazing/mina/netty/NioDatagramChannelIoAcceptorIT.java
@@ -15,6 +15,7 @@
*/
package org.kaazing.mina.netty;
+import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.kaazing.mina.netty.PortUtil.nextPort;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -32,6 +33,8 @@
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.logging.LoggingFilter;
+import org.jboss.netty.channel.socket.nio.NioServerDatagramChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioWorkerPool;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
@@ -48,7 +51,6 @@
/**
* Integration test for mina.netty layer. Similar to IT, but for datagram transport.
*/
-@Ignore // Not yet working. gateway.server is still using Mina for UDP.
public class NioDatagramChannelIoAcceptorIT {
@Rule
@@ -61,7 +63,9 @@ public class NioDatagramChannelIoAcceptorIT {
public void initResources() throws Exception {
DatagramChannelIoSessionConfig sessionConfig = new DefaultDatagramChannelIoSessionConfig();
sessionConfig.setReuseAddress(true);
- acceptor = new NioDatagramChannelIoAcceptor(sessionConfig);
+ NioWorkerPool workerPool = new NioWorkerPool(newCachedThreadPool(), 4);
+ NioServerDatagramChannelFactory channelFactory = new NioServerDatagramChannelFactory(newCachedThreadPool(), 1, workerPool);
+ acceptor = new NioDatagramChannelIoAcceptor(sessionConfig, channelFactory);
acceptor.getFilterChain().addLast("logger", new LoggingFilter());
socket = new DatagramSocket();
socket.setReuseAddress(true);
diff --git a/mina.netty/src/test/java/org/kaazing/mina/netty/NioDatagramChannelIoConnectorIT.java b/mina.netty/src/test/java/org/kaazing/mina/netty/NioDatagramChannelIoConnectorIT.java
index f4544d3e37..f8d01985d5 100644
--- a/mina.netty/src/test/java/org/kaazing/mina/netty/NioDatagramChannelIoConnectorIT.java
+++ b/mina.netty/src/test/java/org/kaazing/mina/netty/NioDatagramChannelIoConnectorIT.java
@@ -15,12 +15,15 @@
*/
package org.kaazing.mina.netty;
+import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.kaazing.mina.netty.PortUtil.nextPort;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.InputStream;
import java.io.OutputStream;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
@@ -39,6 +42,9 @@
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.logging.LoggingFilter;
+import org.jboss.netty.channel.socket.nio.NioClientDatagramChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioServerDatagramChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioWorkerPool;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
@@ -63,7 +69,9 @@ public void initAcceptor() throws Exception {
executor = Executors.newFixedThreadPool(1);
acceptor = new ServerSocket();
acceptor.setReuseAddress(true);
- connector = new NioDatagramChannelIoConnector(new DefaultDatagramChannelIoSessionConfig());
+ NioWorkerPool workerPool = new NioWorkerPool(newCachedThreadPool(), 4);
+ NioClientDatagramChannelFactory channelFactory = new NioClientDatagramChannelFactory(workerPool);
+ connector = new NioDatagramChannelIoConnector(new DefaultDatagramChannelIoSessionConfig(), channelFactory);
connector.getFilterChain().addLast("logger", new LoggingFilter());
}
diff --git a/pom.xml b/pom.xml
index b8ad43ab1b..ea4a736f8f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -28,7 +28,7 @@
1.8
1.8
1.9.4.8
- 3.0.0-alpha-41
+ 3.0.0-alpha-45
1.1
2.6.0
1.7.21
@@ -75,7 +75,6 @@
service/http.proxy
service/proxy
service/turn.rest
- service/turn.proxy
service/update.check
service/update.check.management
transport/spi
@@ -227,6 +226,12 @@
${k3po.version}
test