Skip to content

Commit

Permalink
3.11 fixed #17
Browse files Browse the repository at this point in the history
  • Loading branch information
RuedigerMoeller committed Oct 14, 2015
1 parent 33c0404 commit 360ea84
Show file tree
Hide file tree
Showing 23 changed files with 73 additions and 67 deletions.
4 changes: 2 additions & 2 deletions examples/http-ws-javascript/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>kontraktor</artifactId>
<version>3.10</version>
<version>3.11</version>
</dependency>

<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>kontraktor-http</artifactId>
<version>3.10</version>
<version>3.11</version>
</dependency>

</dependencies>
Expand Down
2 changes: 1 addition & 1 deletion examples/misc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>kontraktor-http</artifactId>
<version>3.10</version>
<version>3.11</version>
</dependency>

</dependencies>
Expand Down
2 changes: 1 addition & 1 deletion examples/node2java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>kontraktor-http</artifactId>
<version>3.10</version>
<version>3.11</version>
</dependency>

</dependencies>
Expand Down
2 changes: 1 addition & 1 deletion examples/non-shared-classpath/client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>kontraktor</artifactId>
<version>3.10</version>
<version>3.11</version>
</dependency>

<dependency>
Expand Down
2 changes: 1 addition & 1 deletion examples/non-shared-classpath/common-interface/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>kontraktor</artifactId>
<version>3.10</version>
<version>3.11</version>
</dependency>

</dependencies>
Expand Down
2 changes: 1 addition & 1 deletion examples/non-shared-classpath/server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>kontraktor</artifactId>
<version>3.10</version>
<version>3.11</version>
</dependency>

<dependency>
Expand Down
2 changes: 1 addition & 1 deletion examples/pub-sub/tcp-based/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>kontraktor-http</artifactId>
<version>3.10</version>
<version>3.11</version>
</dependency>

</dependencies>
Expand Down
2 changes: 1 addition & 1 deletion modules/kontraktor-bare/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>de.ruedigermoeller</groupId>
<artifactId>kontraktor-bare</artifactId>
<version>3.10</version>
<version>3.11</version>

<description>minimalistic http client to kontraktor apps</description>
<url>https://github.com/RuedigerMoeller/kontraktor</url>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public boolean alwaysCopy() {
@Override
public Object instantiate(Class objectClass, FSTObjectInput in, FSTClazzInfo serializationInfo, FSTClazzInfo.FSTFieldInfo referencee, int streamPositioin) throws Exception {
// fixme: detect local actors returned from foreign
int id = in.readInt();
long id = in.readLong();
// AtomicReference<ObjectSocket> chan = reg.getWriteObjectSocket();
// Callback cb = (Object result, Object error) -> {
// try {
Expand All @@ -62,8 +62,8 @@ public Object instantiate(Class objectClass, FSTObjectInput in, FSTClazzInfo ser

@Override
public void writeObject(FSTObjectOutput out, Object toWrite, FSTClazzInfo clzInfo, FSTClazzInfo.FSTFieldInfo referencedBy, int streamPosition) throws IOException {
int id = reg.registerCallback((Callback) toWrite); // register published host side
out.writeInt(id);
long id = reg.registerCallback((Callback) toWrite); // register published host side
out.writeLong(id);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand Down Expand Up @@ -108,11 +109,11 @@ public HeaderElement[] getElements() throws ParseException {
/**
* callback id => promise or callback
*/
protected ConcurrentHashMap<Integer,Callback> callbackMap = new ConcurrentHashMap<>();
protected ConcurrentHashMap<Long,Callback> callbackMap = new ConcurrentHashMap<>();
/**
* used to generate unique ids for callbacks/promises/actors
*/
protected AtomicInteger idCount = new AtomicInteger(0);
protected AtomicLong idCount = new AtomicLong(0);
protected boolean requestUnderway = false; // avoid opening a second http connection
/**
* buffered requests to be sent, will be batched
Expand Down Expand Up @@ -358,7 +359,7 @@ public void run() {

protected void addRequest(RemoteCallEntry remoteCallEntry, Promise res) {
if ( res != null ) {
int key = registerCallback(res);
long key = registerCallback(res);
remoteCallEntry.futureKey = key;
openFutureRequests.incrementAndGet();
}
Expand Down Expand Up @@ -595,8 +596,8 @@ public void run() {
}
}

protected int registerCallback(Callback res) {
int key = idCount.incrementAndGet();
protected long registerCallback(Callback res) {
long key = idCount.incrementAndGet();
callbackMap.put(key, res );
return key;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
*/
public class RemoteCallEntry implements Serializable {

int receiverKey; // id of published actor in host, contains cbId in case of callbacks
int futureKey; // id of future if any
long receiverKey; // id of published actor in host, contains cbId in case of callbacks
long futureKey; // id of future if any
String method;
Object args[];
int queue;
Expand All @@ -39,11 +39,11 @@ public RemoteCallEntry(int receiverKey, int futureKey, String method, Object[] a
this.queue = queue;
}

public int getReceiverKey() {
public long getReceiverKey() {
return receiverKey;
}

public int getFutureKey() {
public long getFutureKey() {
return futureKey;
}

Expand Down
2 changes: 1 addition & 1 deletion modules/kontraktor-db/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>kontraktor</artifactId>
<version>3.10</version>
<version>3.11</version>
</dependency>

<dependency>
Expand Down
4 changes: 2 additions & 2 deletions modules/kontraktor-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>de.ruedigermoeller</groupId>
<artifactId>kontraktor-http</artifactId>
<version>3.10.01</version>
<version>3.11</version>

<description>http connectivity for kontraktor</description>
<url>https://github.com/RuedigerMoeller/kontraktor</url>
Expand Down Expand Up @@ -99,7 +99,7 @@
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>kontraktor</artifactId>
<version>3.10</version>
<version>3.11</version>
</dependency>

<dependency>
Expand Down
2 changes: 1 addition & 1 deletion modules/kontraktor-http/src/main/javascript/js4k/js4k.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

// version 3.0.9
// version 3.11.0
// JavaScript to Kontraktor bridge
// matches kontraktor 3.0 json-no-ref encoded remoting
// as I am kind of a JS beginner, hints are welcome :)
Expand Down
2 changes: 1 addition & 1 deletion modules/kontraktor-reallive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>kontraktor</artifactId>
<version>3.10</version>
<version>3.11</version>
</dependency>

<dependency>
Expand Down
6 changes: 3 additions & 3 deletions modules/reactive-streams/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>de.ruedigermoeller</groupId>
<artifactId>kontraktor-reactive-streams</artifactId>
<version>3.10</version>
<version>3.11</version>

<description>http connectivity for kontraktor</description>
<url>https://github.com/RuedigerMoeller/kontraktor</url>
Expand Down Expand Up @@ -99,7 +99,7 @@
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>kontraktor</artifactId>
<version>3.10</version>
<version>3.11</version>
</dependency>

<dependency>
Expand All @@ -115,7 +115,7 @@
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>kontraktor-http</artifactId>
<version>3.10</version>
<version>3.11</version>
<scope>test</scope>
</dependency>

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>de.ruedigermoeller</groupId>
<artifactId>kontraktor</artifactId>
<version>3.10</version>
<version>3.11</version>

<description>a light weight, efficient actor lib</description>
<url>https://github.com/RuedigerMoeller/kontraktor</url>
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/nustaq/kontraktor/Actor.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public static boolean inside() {
public Scheduler __scheduler;
public volatile boolean __stopped;
public Actor __self; // the proxy object
public int __remoteId; // id in case this actor is published via network
public long __remoteId; // id in case this actor is published via network
public boolean __throwExAtBlock; // if true, trying to send a message to full queue will throw an exception instead of blocking
public volatile ConcurrentLinkedQueue<RemoteConnection> __connections; // a list of connections required to be notified on close (publisher/server side))
public RemoteConnection __clientConnection; // remoteconnection this in case of remote ref
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/nustaq/kontraktor/RemoteConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public interface RemoteConnection {
*/
void close();
void setClassLoader( ClassLoader l );
int getRemoteId( Actor act );
long getRemoteId(Actor act);

/**
* unpublishes this actor by removing mappings and stuff. Does not actively close the underlying connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ Kontraktor Copyright (c) Ruediger Moeller, All rights reserved.
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand Down Expand Up @@ -63,12 +63,12 @@ public static void registerDefaultClassMappings(FSTConfiguration conf) {
protected FSTConfiguration conf;
protected RemoteScheduler scheduler = new RemoteScheduler(); // unstarted thread dummy
// holds published actors, futures and callbacks of this process
protected AtomicInteger actorIdCount = new AtomicInteger(0);
protected ConcurrentHashMap<Integer, Object> publishedActorMapping = new ConcurrentHashMap<>();
protected ConcurrentHashMap<Object, Integer> publishedActorMappingReverse = new ConcurrentHashMap<>();
protected AtomicLong actorIdCount = new AtomicLong(0);
protected ConcurrentHashMap<Long, Object> publishedActorMapping = new ConcurrentHashMap<>();
protected ConcurrentHashMap<Object, Long> publishedActorMappingReverse = new ConcurrentHashMap<>();
// have disabled dispacther thread
protected ConcurrentLinkedQueue<Actor> remoteActors = new ConcurrentLinkedQueue<>();
protected ConcurrentHashMap<Integer,Actor> remoteActorSet = new ConcurrentHashMap<>();
protected ConcurrentHashMap<Long,Actor> remoteActorSet = new ConcurrentHashMap<>();
protected volatile boolean terminated = false;
protected BiFunction<Actor,String,Boolean> remoteCallInterceptor =
(actor,methodName) -> {
Expand Down Expand Up @@ -114,11 +114,11 @@ protected void configureSerialization(Coding code) {
conf.registerSerializer(Timeout.class, new TimeoutSerializer(), false);
}

public Actor getPublishedActor(int id) {
public Actor getPublishedActor(long id) {
return (Actor) publishedActorMapping.get(id);
}

public Callback getPublishedCallback(int id) {
public Callback getPublishedCallback(long id) {
return (Callback) publishedActorMapping.get(id);
}

Expand All @@ -138,16 +138,21 @@ public void setTerminated(boolean terminated) {
this.terminated = terminated;
}

public int publishActor(Actor act) {
Integer integer = publishedActorMappingReverse.get(act.getActorRef());
public long publishActor(Actor act) {
Long integer = publishedActorMappingReverse.get(act.getActorRef());
if ( integer == null ) {
integer = actorIdCount.incrementAndGet();
integer = newActId();
publishActorDirect(integer, act);
}
return integer;
}

private void publishActorDirect(Integer integer, Actor act) {
private long newActId() {
long id = actorIdCount.incrementAndGet();
return id;
}

private void publishActorDirect(Long integer, Actor act) {
publishedActorMapping.put(integer, act.getActorRef());
publishedActorMappingReverse.put(act.getActorRef(), integer);
act.__addRemoteConnection(this);
Expand All @@ -160,7 +165,7 @@ private void publishActorDirect(Integer integer, Actor act) {
*
*/
public void unpublishActor(Actor act) {
Integer integer = publishedActorMappingReverse.get(act.getActorRef());
Long integer = publishedActorMappingReverse.get(act.getActorRef());
if ( integer != null ) {
Log.Debug(this, ""+act.getClass().getSimpleName()+" unpublished");
publishedActorMapping.remove(integer);
Expand All @@ -172,17 +177,17 @@ public void unpublishActor(Actor act) {
}
}

public int registerPublishedCallback(Callback cb) {
Integer integer = publishedActorMappingReverse.get(cb);
public long registerPublishedCallback(Callback cb) {
Long integer = publishedActorMappingReverse.get(cb);
if ( integer == null ) {
integer = actorIdCount.incrementAndGet();
integer = newActId();
publishedActorMapping.put(integer, cb);
publishedActorMappingReverse.put(cb, integer);
}
return integer;
}

public void removePublishedObject(int receiverKey) {
public void removePublishedObject(long receiverKey) {
Object remove = publishedActorMapping.remove(receiverKey);
if ( remove != null ) {
publishedActorMappingReverse.remove(remove);
Expand All @@ -199,7 +204,7 @@ public void registerRemoteRefDirect(Actor act) {
});
}

public Actor registerRemoteActorRef(Class actorClazz, int remoteId, Object client) {
public Actor registerRemoteActorRef(Class actorClazz, long remoteId, Object client) {
Actor actorRef = remoteActorSet.get(remoteId);
if ( actorRef == null ) {
Actor res = Actors.AsActor(actorClazz, getScheduler());
Expand Down Expand Up @@ -393,7 +398,7 @@ protected void writeObject(ObjectSocket chan, RemoteCallEntry rce) throws Except
}
}

public void receiveCBResult(ObjectSocket chan, int id, Object result, Object error) throws Exception {
public void receiveCBResult(ObjectSocket chan, long id, Object result, Object error) throws Exception {
if (facadeActor!=null) {
Thread debug = facadeActor.getCurrentDispatcher();
if ( Thread.currentThread() != facadeActor.getCurrentDispatcher() ) {
Expand Down Expand Up @@ -441,8 +446,8 @@ public void setClassLoader(ClassLoader l) {
conf.setClassLoader(l);
}

public int getRemoteId(Actor act) {
Integer integer = publishedActorMappingReverse.get(act.getActorRef());
public long getRemoteId(Actor act) {
Long integer = publishedActorMappingReverse.get(act.getActorRef());
return integer == null ? -1 : integer;
}

Expand Down Expand Up @@ -475,7 +480,7 @@ public boolean pollAndSend2Remote(AtomicReference<ObjectSocket> chanHolder) thro
if ( ce.getMethod().getName().equals("asyncstop") ) {
Log.Lg.error(this, null, "cannot stop remote actors" );
} else {
int futId = 0;
long futId = 0;
if (ce.hasFutureResult()) {
futId = registerPublishedCallback(ce.getFutureCB());
}
Expand Down
Loading

0 comments on commit 360ea84

Please sign in to comment.