Skip to content

Commit

Permalink
Reverting to previous code where child future is fired after actual w…
Browse files Browse the repository at this point in the history
…rite on parent (#820)

Reverting to previous change where child future is fired after actual
write happens on parent channel. Otherwise, DownstreamMessageEventEx
gets reused by ChannelIoProcessor and corrupts the buffers that are
not yet written to parent channel.

* Reverting to previous change where child future is fired after actual
write happens on parent channel. Otherwise, DownstreamMessageEventEx
gets reused by ChannelIoProcessor and corrupts the buffers that are
not yet written to parent channel.

* Change k3po dependency to alpha-58
  • Loading branch information
jitsni authored and Chris Barrow committed Dec 14, 2016
1 parent d6e9e3f commit 0dc3bda
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,25 @@ public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e)
break;
}
} else if (e instanceof MessageEvent) {
// Making sure that child channel WriteFuture is fired on child channel's worker thread
final MessageEvent childMessageEvent = (MessageEvent) e;
ParentMessageEvent parentMessageEvent = new ParentMessageEvent(childMessageEvent);
ChannelFuture parentFuture = parentMessageEvent.getFuture();
parentFuture.addListener(f -> {
childChannel.getWorker().executeInIoThread(() -> {
if (f.isSuccess()) {
childFuture.setSuccess();
} else {
childFuture.setFailure(f.getCause());
}
});
});

// Write to parent channel
NioDatagramChannel parentChannel = (NioDatagramChannel) childChannel.getParent();
boolean offered = parentChannel.writeBufferQueue.offer(parentMessageEvent);
assert offered;
parentChannel.worker.writeFromUserCode(parentChannel);

// No need to propagate parentMessageEvent.getFuture() to childFuture
// as UDP is unreliable.
childFuture.setSuccess();
}
}

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
<maven.compiler.testSource>1.8</maven.compiler.testSource>
<maven.compiler.testTarget>1.8</maven.compiler.testTarget>
<hazelcast.version>1.9.4.8</hazelcast.version>
<k3po.version>3.0.0-alpha-56</k3po.version>
<k3po.version>3.0.0-alpha-58</k3po.version>
<jdom.version>1.1</jdom.version>
<jmock.version>2.6.0</jmock.version>
<slf4j.log4j.version>1.7.21</slf4j.log4j.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import static org.kaazing.test.util.ITUtil.createRuleChain;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.future.CloseFuture;
Expand Down Expand Up @@ -243,6 +246,33 @@ protected void doMessageReceived(IoSessionEx session, Object message) {
k3po.finish();
}

@Test
@Specification("concurrent.writes.together/client")
public void concurrentWritesTogether() throws Exception {
class ConcurrentHandler extends IoHandlerAdapter<IoSessionEx> {

@Override
protected void doMessageReceived(IoSessionEx session, Object message) {
AtomicInteger counter = (AtomicInteger) session.getAttribute("test-counter");
List<Object> messages = (List<Object>) session.getAttribute("test-messages");
if (counter == null) {
counter = new AtomicInteger();
messages = new ArrayList<>();
session.setAttribute("test-counter", counter);
session.setAttribute("test-messages", messages);
}
int noreads = counter.incrementAndGet();
messages.add(message);
if (noreads == 3) {
messages.forEach(session::write);
}
}
};

bindTo8080(new ConcurrentHandler());
k3po.finish();
}

@Test
@Specification("idle.concurrent.connections/client")
public void idleConcurrentConnections() throws Exception {
Expand Down

0 comments on commit 0dc3bda

Please sign in to comment.