Kontraktor 4 Core
Kontraktor provides an abstraction layer to build distributed applications (SOA, microservices, client server, webapps, p2p) with little effort and high flexibility in Java and JavaScript.
The smallest distributable entity is an Actor (or event loop, isolate, async service ..), defined by an asynchronous API. These actors can be distributed physically as needed using various networking "connectors" (TCP, WebSockets, HTTP). Of course they can also run inside a single process, providing a message passing concurrency paradigm instead of traditional multithreading.
As Kontraktor frees applications from networking and marshalling details, it provides a powerful abstraction with many use cases. In addition, message passing system designs also ease polyglot applications. Though Java is the primary target platform, there are modules to support interoperation with JavaScript (both browser and Node.js).
(This is a very non-academic explanation.) An actor can be seen as a class whose methods are not executed directly, but are queued into a "mailbox" queue instead. An event loop then reads messages from the mailbox and runs the actual code. It's very similar to Node.js' main event loop. Note that I'll use "messages" and "methods" as synonyms here.
As there is plenty of information on the web regarding the Actor Model and Node's event loop, I won't dig deeper here. The following insights are essential:
- As a message is executed delayed (it is queued), the result of a computation is only available "some time later", so it can only be made available to the caller asynchronously.
- Promises and Callbacks are used to map and handle asynchronous calls/results in a simple way. The code of a Promise.then handler or a callback is always executed in the thread of the calling actor (at least in Kontraktor; some frameworks get this wrong).
Other approaches to implementing actors require the definition of "message classes" which are then sent to an actor. As this requires some boilerplate (at least in Java), Kontraktor takes a "regular" Java class and generates a proxy object, which basically redirects all method calls to a queue (mailbox). The event loop (or dispatcher thread) then takes the messages (in exact order) and calls the "real" implementation of the original actor class.
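To make the proxy idea concrete, here is a minimal sketch using only the JDK. It is not how Kontraktor generates its proxies: `java.lang.reflect.Proxy` only works for interfaces, whereas Kontraktor proxies the actor class itself. The interface `Counter`, the class `CounterImpl` and the helper `asActor` are hypothetical names, and `CompletableFuture` stands in for `IPromise`. Every call on the proxy is queued onto a single-thread executor (the "mailbox"), so the unsynchronized state inside `CounterImpl` is only ever touched by one thread.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical actor API; CompletableFuture stands in for Kontraktor's IPromise.
interface Counter {
    CompletableFuture<Integer> increment(int by);
}

// Plain implementation with unsynchronized state, only ever run on the mailbox thread.
class CounterImpl implements Counter {
    private int value;

    public CompletableFuture<Integer> increment(int by) {
        value += by;
        return CompletableFuture.completedFuture(value);
    }
}

final class ProxySketch {
    // Wraps 'impl' so that every interface call becomes a task queued on 'mailbox'.
    @SuppressWarnings("unchecked")
    static <T> T asActor(Class<T> iface, T impl, ExecutorService mailbox) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(impl, args); // toString/equals/hashCode: call directly
            }
            CompletableFuture<Object> result = new CompletableFuture<>();
            mailbox.execute(() -> {
                try {
                    // runs later, on the mailbox thread, in the order the calls were made
                    CompletableFuture<Object> inner = (CompletableFuture<Object>) method.invoke(impl, args);
                    inner.whenComplete((r, e) -> {
                        if (e != null) result.completeExceptionally(e);
                        else result.complete(r);
                    });
                } catch (ReflectiveOperationException e) {
                    result.completeExceptionally(e);
                }
            });
            return result; // handed back to the caller immediately
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }

    // Sends 1000 increments from the calling thread and returns the final counter value.
    static int demo() {
        ExecutorService mailbox = Executors.newSingleThreadExecutor();
        try {
            Counter counter = asActor(Counter.class, new CounterImpl(), mailbox);
            CompletableFuture<Integer> last = null;
            for (int i = 0; i < 1000; i++) {
                last = counter.increment(1);
            }
            return last.join();
        } finally {
            mailbox.shutdown();
        }
    }
}
```

Because the single mailbox thread processes calls strictly in submission order, the future returned by the last call completes with the fully incremented value, even though no locking is used.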
The proxy object ( 'self()' ) is thread safe and can be passed to any outer multithreaded code or other actors. Even more: the proxy object can be remoted transparently, either by explicitly "publishing" it on a network or implicitly by returning it from a remoted actor method. As Kontraktor makes heavy use of (fast-)serialization, application code does not have to deal with remoting details such as network protocol or encoding/decoding.
Convention: all public methods of an 'Actor' class are made async and make up the actor's message interface. Protected and private methods are executed synchronously. Because of this, public methods can only return void or IPromise, as they are executed asynchronously inside the actor's thread. If you think about it: as the actor paradigm binds state to an executing thread, it makes perfect sense to decouple public methods from the caller's execution thread.
Remoting: instead of putting a message on the local queue, it gets serialized, sent over the network and put on the mailbox queue of the remote target actor. This way it makes no difference whether one talks to a local or a remoted actor instance.
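A rough sketch of what "the message gets serialized" means, using plain Java serialization rather than Kontraktor's fast-serialization. `MethodCall`, `GreeterImpl` and `RemotingSketch` are hypothetical names, and the byte array stands in for whatever a real transport (TCP, WebSocket, HTTP) would carry. Because public actor methods cannot be overloaded, the receiver in this sketch can look the target method up by name alone. In a real system the decoded call would be put on the target actor's mailbox instead of being invoked directly.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.lang.reflect.Method;

// Hypothetical wire message: which method to call, with which arguments.
final class MethodCall implements Serializable {
    private static final long serialVersionUID = 1L;
    final String method;
    final Object[] args;

    MethodCall(String method, Object... args) {
        this.method = method;
        this.args = args;
    }
}

// Hypothetical remote actor implementation.
class GreeterImpl {
    public String greet(String name) {
        return "Hello " + name;
    }
}

final class RemotingSketch {
    // Sender side: turn the call into bytes (Java serialization here, not fast-serialization).
    static byte[] encode(MethodCall call) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(call);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray(); // closing 'out' flushed everything into 'bytes'
    }

    // Receiver side: rebuild the call; both sides need the same classes on the classpath.
    static MethodCall decode(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (MethodCall) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Receiver side: find the method by name (no overloading) and invoke it on the target.
    static Object dispatch(Object target, MethodCall call) {
        for (Method m : target.getClass().getMethods()) {
            if (m.getName().equals(call.method)) {
                try {
                    return m.invoke(target, call.args);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        throw new IllegalArgumentException("no such method: " + call.method);
    }
}
```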
The following code defines and publishes a Greeting Service using WebSockets and fast-serialization (FSTSer) encoded messages:
public class Greeter extends Actor<Greeter> {
public IPromise<String> greet(String name, long duration ) {
Promise<String> res = new Promise<>();
delayed(duration,() -> res.resolve("Hello "+name));
return res;
}
public static void main(String[] args) {
new WebSocketPublisher()
.facade(AsActor(Greeter.class))
.serType(SerializerType.FSTSer)
.urlPath("/")
.port(3999)
.publish();
}
}
A client:
public class PlainGreeterClient {
public static void main(String[] args) {
Greeter remote = (Greeter) new WebSocketConnectable()
.actorClass(Greeter.class)
.url("ws://localhost:3999")
.serType(SerializerType.FSTSer)
.connect( (x,y) -> System.out.println("disconnect " + x+" "+y)).await();
remote.greet("World", 1000)
.then( r -> System.out.println(r) );
}
}
This covers the basic idea of kontraktor: An Actor is defined by a class fulfilling some conventions. This actor then can be remoted (nonblocking, asynchronous) using various network transports and encodings (fast-serialization, json, ..).
Even without remoting/networking: by encapsulating threads, data and behaviour into a single unit, the complexity of concurrency is reduced ("shared nothing, message passing" instead of "shared state, synchronizing").
1. Define an Actor
As the "real" actor implementation must not be exposed to the outer world (read "other threads"), all public methods are automatically transformed into asynchronous "actor messages". As asynchronous messages cannot return a result directly (that would require the caller to wait for it), the only allowed return types are void and IPromise.
public class MyActor<T extends MyActor> extends Actor<T> {
    // async request-response pattern using Promises
    public IPromise<Long> whatTimeIsIt() {
        return new Promise<>(System.currentTimeMillis());
    }
    // these are ordinary synchronous methods accessible from within the actor only
    private String normalHelperMethod() { return "nothing special with me"; }
    protected String normalHelperMethod1() { return "nothing special with me"; }
}
(The generics trickery in the class definition is just used to get the proper return type of the inherited self().)
- All public methods become async actor messages.
- Don't define a constructor (use an init() method instead).
- Only void or IPromise-returning methods may be public.
- Overloading is not supported for public actor methods: there cannot be two public methods with the same name (regardless of argument types and argument list length).
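These rules can be checked mechanically. The following is a hypothetical reflection-based checker, not part of Kontraktor: `CompletableFuture` stands in for `IPromise`, static methods (such as a `main`) are skipped because they are not actor messages, and the classes `GoodActorSketch` and `BadActorSketch` are made-up examples. It reports public methods with other return types, overloaded public methods, and constructors that take arguments.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

final class ConventionCheck {
    // Returns human-readable violations of the actor conventions for 'actorClass'.
    static List<String> check(Class<?> actorClass) {
        List<String> problems = new ArrayList<>();
        Set<String> publicNames = new HashSet<>();
        for (Method m : actorClass.getDeclaredMethods()) {
            int mod = m.getModifiers();
            if (!Modifier.isPublic(mod) || Modifier.isStatic(mod) || m.isSynthetic() || m.isBridge()) {
                continue; // private/protected helpers and static methods are not messages
            }
            Class<?> rt = m.getReturnType();
            if (rt != void.class && !CompletableFuture.class.isAssignableFrom(rt)) {
                problems.add(m.getName() + ": return type " + rt.getSimpleName() + " not allowed");
            }
            if (!publicNames.add(m.getName())) {
                problems.add(m.getName() + ": overloaded");
            }
        }
        for (Constructor<?> c : actorClass.getDeclaredConstructors()) {
            if (c.getParameterCount() > 0) {
                problems.add("constructor with arguments, use init() instead");
            }
        }
        return problems;
    }
}

// Follows the conventions.
class GoodActorSketch {
    public CompletableFuture<Long> whatTimeIsIt() { return CompletableFuture.completedFuture(System.currentTimeMillis()); }
    public void init() { }
    private String helper() { return "not a message"; }
}

// Breaks three of them.
class BadActorSketch {
    public BadActorSketch(int size) { }
    public String greet(String name) { return "Hello " + name; }
    public void log(String msg) { }
    public void log(String msg, int level) { }
}
```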
2. Run your actor
MyActor act = Actors.AsActor(MyActor.class); // act == act.self() - a proxy
act.init();
AsActor constructs and returns a thread-safe ActorProxy (== self()) and starts the event loop on a thread dedicated to running this particular actor. It's possible to explicitly pass a scheduler as a second argument in order to run several actors on a single thread.
3. Keep this secret, use self() instead
childActor.init( self() );
When passing references to the outer world (other actors or multithreaded code) always pass self() instead of this.
4. publish it on a network
new TCPNIOPublisher(act, 4001).publish();
There are more "connectors" in addition to TCP available in the kontraktor-http package (Http Longpoll, WebSockets).
5. connect it from another machine/process
ConnectableActor con = new TCPConnectable(MyActor.class,"localhost",4001);
MyActor act = (MyActor) con.connect( disconnectCallback );
6. stop it
act.stop();
This will stop an Actor. If the Thread running this actor is not assigned to other Actors, it will terminate.
Actors cannot be stopped by a remote call (for security reasons).
Callbacks and Promises are used to organize asynchronous data transfer, namely request-response style results and multiple results (streaming). The differences are:
- a Promise is created and returned by the callee, a Callback is created by the caller and passed as (last!) argument to a method.
- a Promise can only receive exactly one result or error
- a Callback can receive multiple result objects (streaming)
Don't let the convenience API and the interfacing required for remoting confuse you: the Callback interface has only one essential method (all other methods are convenience sugar):
void complete( result, error )
Example Actor Server:
public class CBSampleServer<T extends CBSampleServer> extends Actor<T> {
// Actor method using a Callback
public void withCallbackRaw(String arg, Callback callback) {
callback.complete("Hello", "CNT");
callback.complete(arg,"CNT");
callback.complete(null,null);
}
// run and publish
public static void main(String[] args) {
CBSampleServer actor = Actors.AsActor(CBSampleServer.class);
new TCPNIOPublisher().port(9090).serType(SerializerType.FSTSer)
.facade(actor)
.publish( discact -> System.out.println("a client disconnected "+discact));
}
}
If error has the special value 'CNT', this signals to the receiver (and the remoting machinery) that more results are to come.
Using the convenience methods, this looks somewhat more readable:
public void withCallback(String arg, Callback callback) {
callback.pipe("Hello");
callback.pipe(arg);
callback.finish();
}
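The whole contract fits into one interface. The sketch below is hypothetical (`SketchCallback`, `CallbackDemo` and `streamWords` are made-up names, not Kontraktor's `Callback`), but it shows how `pipe()` and `finish()` can reduce to `complete(result, error)` with the 'CNT' marker, and how a receiver tells intermediate results apart from the final signal.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical re-creation of the callback contract described above (not Kontraktor's Callback).
@FunctionalInterface
interface SketchCallback<T> {
    String CONT = "CNT"; // error value meaning "more results will follow"

    void complete(T result, Object error); // the only abstract method

    default void pipe(T result) { complete(result, CONT); }

    default void finish() { complete(null, null); }

    default void reject(Object error) { complete(null, error); }

    static boolean isFinal(Object error) { return !CONT.equals(error); }
}

final class CallbackDemo {
    // Callee side: streams each word, then signals the end.
    static void streamWords(String text, SketchCallback<String> cb) {
        for (String word : text.split(" ")) {
            cb.pipe(word);
        }
        cb.finish();
    }

    // Caller side: collects the streamed values until the final signal arrives.
    static List<String> collect(String text) {
        List<String> received = new ArrayList<>();
        boolean[] finished = {false};
        streamWords(text, (res, err) -> {
            if (!SketchCallback.isFinal(err)) {
                received.add(res);
            } else {
                finished[0] = true;
            }
        });
        if (!finished[0]) {
            throw new IllegalStateException("stream was not finished");
        }
        return received;
    }
}
```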
Client code:
public static void main(String[] args) {
CBSampleServer server = (CBSampleServer) new TCPConnectable()
.actorClass(CBSampleServer.class)
.serType(SerializerType.FSTSer).host("localhost").port(9090)
.connect((connector,actor) -> System.out.println("disconnected from server"))
.await();
server.withCallback("beep", (res,err) -> System.out.println("res:"+res+" err:"+err));
}
Callbacks can be seen as a kind of unidirectional channel or stream between two actors (whether they are run by different threads or on different machines).
Example implementation inside an actor (server):
public void streamMyName( Callback<Character> channel ) {
myName.chars().forEach( ch -> channel.pipe( (char) ch) );
channel.finish();
}
client usage:
actor.streamMyName( (ch,err) -> {
if ( ! Actors.isFinal(err) ) // same as "CNT".equals(err)
System.out.print(ch);
});
Remoting, Lifecycle
If callbacks are used inside a single process, GC takes care of cleanup. However, in the case of remoted actors, some bookkeeping is required in order to map network messages to their target actor/callback instance.
Bookkeeping for a callback is cleaned up as soon as the 'error' value of 'complete(result,error)' does not equal 'CNT' (for continue). If values are emitted to a callback after that, the remote instance won't receive them, as the internal maps have been cleaned up. As a callback is not bidirectional, only the sending side (the server above) can close ('finish') a callback.
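The bookkeeping can be pictured as a map from callback ids (as sent over the wire) to local handlers. This is a hypothetical sketch (`CallbackRegistrySketch` and its methods are made-up names), not Kontraktor's internal data structure: intermediate ('CNT') deliveries keep the entry, any other error value removes it, and a late delivery after that finds nothing and is dropped.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

// Hypothetical id -> callback table kept by the side that created the callback.
final class CallbackRegistrySketch {
    static final String CONT = "CNT";
    private final Map<Long, BiConsumer<Object, Object>> callbacks = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();

    // Caller side: remember the callback; only its id travels with the method call.
    long register(BiConsumer<Object, Object> callback) {
        long id = nextId.incrementAndGet();
        callbacks.put(id, callback);
        return id;
    }

    // Invoked for each (result, error) message arriving for 'id'; returns false if it was dropped.
    boolean deliver(long id, Object result, Object error) {
        BiConsumer<Object, Object> cb = CONT.equals(error) ? callbacks.get(id) : callbacks.remove(id);
        if (cb == null) {
            return false; // already finished (or never existed): nobody is listening anymore
        }
        cb.accept(result, error);
        return true;
    }

    int pending() {
        return callbacks.size();
    }
}
```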
The JavaScript implementation does not support the 'convenience' methods, so only the basic methods complete(r,e), reject(.) and resolve(.) are available.
Example server implementation in JavaScript:
const kontraktor = require("kontraktor-server");
const KPromise = kontraktor.KPromise;
const decodingHelper = new kontraktor.DecodingHelper();
class KServer {
withCallback(astring,callback) {
callback.complete("hello",'CNT');
callback.complete(astring,'CNT');
callback.complete(null,null);
}
}
const server = new kontraktor.KontraktorServer(new KServer(),{port:3998},decodingHelper);
and the client ..
const kontraktorClient = require("kontraktor-client");
const KClient = kontraktorClient.KClient;
const kclient = new KClient();
kclient.connect("ws://localhost:3998","WS").then( (remote,e) => {
remote.withCallback('its me', (r, e) => {
console.log("withCallback received ", r, e);
});
});
Note: kontraktor-js has some limitations, as its primary goal was to connect to a Java server from the browser (for details on JavaScript see the kontraktor-http docs, chapter javascript).
There can only be one callback in an actor method signature, and it has to be the last parameter.
A Promise is a handle to a future result. It's similar to a "callback created at callee side".
Promises are used for asynchronous request-response communication patterns, so a Promise can receive exactly one result or error.
Note that Kontraktor's Promises work differently from ES6 promises.
- A promise is created using new Promise(). It's 'unresolved', but can be returned immediately to a remote caller. It is used as a handle for a result which will be available in the future. The caller can register a result handler using the then( (result,error) -> {..} ) method of the Promise.
- Later on, the emitter (callee) can 'resolve' the promise by calling complete(result,error). This emits a message which then triggers the then() handler at the caller side (which might be remote or inside another actor's thread).
IPromise must be used to denote the return type of a remotable method. This is required as various redirectors and thread-switching wrappers are needed internally for remoting and proper thread isolation.
Promise is the actual implementation. Though its API is not separated properly (proper typing would actually make things more complicated), one set of methods is intended for use at the caller side (then(), onResult(), ..) while others are typically used on the callee side (the method creating and emitting a Promise).
Similar to Callback, there is just one base method for use by the Promise-emitting side (callee) to resolve a promise:
void complete( result, error )
There are some convenience methods (resolve(), reject(), etc.) which internally just call complete().
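A stripped-down, single-threaded sketch of this design: `complete()` is the only primitive, `resolve()`/`reject()` delegate to it, and a handler registered via `then()` fires whether it was added before or after completion. `MiniPromise` is a hypothetical name; it deliberately leaves out the thread switching that Kontraktor adds, and throwing on a second completion is a choice made for this sketch only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical, single-threaded illustration of a one-shot promise (not Kontraktor's Promise).
final class MiniPromise<T> {
    private boolean done;
    private T result;
    private Object error;
    private final List<BiConsumer<T, Object>> handlers = new ArrayList<>();

    // The single base method; resolve() and reject() below just delegate to it.
    void complete(T result, Object error) {
        if (done) {
            throw new IllegalStateException("promise already completed"); // exactly one result or error
        }
        this.done = true;
        this.result = result;
        this.error = error;
        for (BiConsumer<T, Object> handler : handlers) {
            handler.accept(result, error);
        }
        handlers.clear();
    }

    void resolve(T result) { complete(result, null); }

    void reject(Object error) { complete(null, error); }

    // Caller side: runs the handler now if already completed, otherwise on completion.
    MiniPromise<T> then(BiConsumer<T, Object> handler) {
        if (done) {
            handler.accept(result, error);
        } else {
            handlers.add(handler);
        }
        return this;
    }
}
```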
At the caller side, the "mother method" is
then( Callback<T> )
e.g.:
remoteActor.getData().then( (result,error) -> {
// process result/response/error
});
There are some convenience methods, including chaining like onResult( .. ).onError(..), or then( () -> {..}) if the result can be ignored (pure trigger), etc.
It's also possible to define a timeout and register a timeout handler.
Example (service side):
This method creates an unresolved promise. A non-blocking network request is started, then the promise is returned (still unresolved). Once the response to the network request comes in, the promise is finally resolved.
public IPromise<Boolean> findInUrl( URL url, String toFind ) {
    Promise<Boolean> result = new Promise<>();
    // someAsyncApi stands for any non-blocking API delivering the URL's content
    someAsyncApi.getURLData( url ).then(
        data -> result.resolve( data.indexOf(toFind) >= 0 )
    ).catchError( err -> result.reject(err) );
    return result;
}
Some examples of client-side code:
actor.findInUrl( url, "Java" ).then( (res, err) -> ... );
or separate handlers for result and error
actor.findInUrl( url, "Java" ).onResult( res -> ... ).onError( err -> ... );
or return another promise and execute the chained one thereafter:
actor.findInUrl( url, "Java" ).thenAnd( (res, err) -> {
...; return promise;
})
.onResult( resOfPromise -> .. )
.catchError( catchedErr -> .. );
or use a timeout
actor.findInUrl( url, "Java" ).timeoutIn(5000)
.then( res -> ... )
.onTimeout( to -> ... )
.catchError( err -> ... );
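For comparison only: the JDK's `CompletableFuture` (Java 9+) offers a similar timeout mechanism via `orTimeout`, which completes the future exceptionally with a `TimeoutException`. This is a swapped-in JDK technique, not Kontraktor's `timeoutIn`; in particular, the JDK runs the timeout handling on an internal scheduler thread, whereas Kontraktor runs handlers in the calling actor's thread. `TimeoutSketch` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutSketch {
    // Waits for 'work' at most 'millis' ms; maps a timeout to the string "timeout".
    static String resultOrTimeout(CompletableFuture<String> work, long millis) {
        return work.orTimeout(millis, TimeUnit.MILLISECONDS) // Java 9+
                .handle((result, error) -> {
                    if (error == null) {
                        return result;
                    }
                    Throwable cause = error instanceof CompletionException ? error.getCause() : error;
                    return cause instanceof TimeoutException ? "timeout" : "error: " + cause;
                })
                .join();
    }
}
```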
Promise.await enters the event-poll loop and returns once the promise is resolved. This means other messages, possibly modifying your actor's state, are processed in the meantime. Note that await does not block the actor's thread this way.
If await is called from a non-actor thread, the calling thread is blocked until the Promise is resolved.
// non-blocking await (inside an actor), errors will be transformed to exceptions
Boolean found = actor.findInUrl( url, "Java" ).await(5000);
This reduces the amount of nesting when using asynchronous API.
As await can only be implemented stack based (a JVM limitation: each await runs the event loop nested on the current thread's stack), DO NOT USE AWAIT ON THE CRITICAL PATH (e.g. where you expect high load). You will run into stack overflows quickly.
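The "pseudo-blocking" behaviour, and the reason for the stack warning, can be sketched with a toy single-threaded event loop. `EventLoopSketch` and its methods are hypothetical and much simpler than Kontraktor's dispatcher: `await()` keeps running queued messages on the current stack until the awaited future is done, so other messages may change state meanwhile, and every nested `await` adds stack depth. A real event loop would wait for new messages instead of failing when the queue runs empty.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical toy event loop; everything runs on the single thread that calls await().
final class EventLoopSketch {
    private final Deque<Runnable> mailbox = new ArrayDeque<>();
    private int depth;
    int maxDepth; // deepest nesting of await() observed so far

    void send(Runnable message) {
        mailbox.addLast(message);
    }

    // "Pseudo-blocks": processes other messages until 'future' completes.
    <T> T await(CompletableFuture<T> future) {
        depth++;
        maxDepth = Math.max(maxDepth, depth);
        try {
            while (!future.isDone()) {
                Runnable next = mailbox.pollFirst();
                if (next == null) {
                    // single-threaded toy: nothing could ever complete the future now
                    throw new IllegalStateException("mailbox empty, future can never complete");
                }
                next.run(); // runs nested inside this await() call, i.e. deeper on the stack
            }
            return future.join();
        } finally {
            depth--;
        }
    }
}
```

In the nested case from the test, a message that itself calls `await()` pushes the depth to 2; under load, such nesting keeps growing, which is exactly the stack overflow risk described above.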
As any actor communication takes place by message passing, transparent remoting can be implemented by forwarding messages to a remote actor's mailbox. This can be done using an arbitrary transport and encoding, so your application code does not depend on a specific transport, protocol or encoding. On a high level, a distributed system's protocol behaviour is separated from low-level details like physical transmission and message encoding.
This is modelled by
- Publisher. A Publisher is capable of 'publishing' an actor on a network. Only one 'root' actor (called the facade actor) is published by a Publisher. More actors can be started and passed to remote clients. A Publisher detects references to actors being passed and automatically manages creation of proxy objects and mapping of callbacks, promises and remoted actors. E.g. in a web app the facade actor might create and hand out a new 'SessionActor' for each client calling the "login" method with valid credentials.
- ConnectableActor. A Connectable is a complete description of a published actor (e.g. transport: http, url: localhost:8887, actorClass: me.MyActor). E.g. a service in a microservice cluster might ask a service registry for a connectable of a particular service and then connect to it. This way a client can connect to services without prior knowledge of the encoding and network transport used at deploy time.
- Connector. A connector manages the client side of an actor connection; it is usually created implicitly by calling connect on a ConnectableActor instance.
The following transport options for publishing (=server) are available
- TCP (TCPPublisher, TCPNIOPublisher)
- WebSockets (WebSocketPublisher via embedded undertow)
- Http (HttpPublisher, long poll for server push/bidirectional communication)
Encodings
The following encodings are available (they all directly support Java's Serializable and Externalizable interfaces, so little to no extra work is required in application code):
- fast serialization binary (java to java only). Best performance. Can encode arbitrary complex object graphs.
- JsonNoRef (java to java, java to javascript). Note: as this encoding does not restore references (no cyclic structures), care must be taken with the arguments of remoted methods.
- [deprecated] minbin binary json (java to java, java to javascript).
Code example of common Publishers:
HelloActor myService = AsActor(HelloActor.class);
// as websocket service, fast serialized
new WebSocketPublisher()
.facade(myService)
.hostName("localhost")
.urlPath("/hello")
.port(8080)
.serType(SerializerType.FSTSer)
.publish();
// as http (long poll) service, json encoding
new HttpPublisher()
.facade(myService)
.hostName("localhost")
.urlPath("/hellohttp")
.port(8080)
.serType(SerializerType.JsonNoRefPretty)
.publish();
// as tcp nio service, fast serialized
new TCPNIOPublisher()
.facade(myService)
.port(6789)
.serType(SerializerType.FSTSer)
.publish();
fixme: client examples
All implementors of ConnectableActor can be used to connect to a remote actor ("facade"):
ConnectableActor (org.nustaq.kontraktor.remoting.base)
TCPConnectable (org.nustaq.kontraktor.remoting.tcp)
LocalConnectable (org.nustaq.kontraktor.remoting.base)
HttpConnectable (org.nustaq.kontraktor.remoting.http)
WebSocketConnectable (org.nustaq.kontraktor.remoting.websockets)
todo: benchmarks encoding
Kontraktor is connection oriented: there is no central registry tracking location and availability of services/actors (one can build one on top, of course).
It's important to understand the lifecycle and mechanics of passing references to dynamically created actors to remote clients. Otherwise one risks memory leaks and security holes.
- The 'facade' actor is the initially published actor. It's the single (singleton) connectable all clients connect to.
- A facade actor might schedule subactors and pass references to them to remote clients. It's possible to create a server-side actor for each connecting client, e.g. in order to cache data related to that client.
- A facade might also create other singleton actors (e.g. AdminAPIActor). This way it's possible to provide services depending on client role/credentials.
- The actor-to-remote mapping is bound to the physical connection; it's impossible to gain access to actor instances published to other clients, so e.g. conditionally granting access to an AdminAPIActor is safe.
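The isolation property in the last point can be illustrated with a per-connection lookup table. This is a hypothetical sketch (`ClientConnectionSketch`, `export` and `resolve` are made-up names), not Kontraktor's internals: ids handed to one client are only meaningful on that client's connection, so guessing an id on another connection finds nothing.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-connection export table (one instance per socket/websocket/http session).
final class ClientConnectionSketch {
    private final Map<Integer, Object> exported = new HashMap<>();
    private int nextId = 1;

    // Called when an actor reference is handed to this connection's client.
    int export(Object actorProxy) {
        int id = nextId++;
        exported.put(id, actorProxy);
        return id;
    }

    // Called for every incoming message: ids are resolved against this connection's table only.
    Object resolve(int id) {
        Object target = exported.get(id);
        if (target == null) {
            throw new SecurityException("unknown actor id " + id + " on this connection");
        }
        return target;
    }

    // On disconnect the table is dropped; the exported actors themselves keep running.
    void close() {
        exported.clear();
    }
}
```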
Let's take a Login-Based WebApp. The facade consists of a public API with only 2 methods:
- login(user,pwd) returning a "WebSessionActor" if successful
- register() managing the creation of valid client credentials
(simplified code based on a real app)
public class MyWebApp extends Actor<MyWebApp> {
    Scheduler[] sessionThreads; // fixed set of schedulers, initialized elsewhere
    public IPromise<WebSessionActor> login(String user, String pw, String jwt) {
        Promise<WebSessionActor> res = new Promise<>();
        getCredentials(user,pw,jwt).then( (authres, err) -> {
            if ( err != null ) {
                res.reject(err);
            } else {
                WebSessionActor sess = AsActor(
                    WebSessionActor.class,
                    sessionThreads[(int) (Math.random()*sessionThreads.length)]
                );
                // distinct lambda parameter names: reusing 'res'/'err' here would not compile
                sess.init(self(),authres).then( (initRes, initErr) -> {
                    if ( initErr == null ) {
                        res.resolve(sess);
                    } else
                        res.reject(initErr);
                });
            }
        });
        return res;
    }
    @Local // not callable by remote clients
    public IPromise<AdminAPIActor> getAdminAPI() {
        ...
    }
}
If user and password are valid, a newly started WebSessionActor is returned (via the promise) to the remote client. Note that there is a fixed set of schedulers (~threads), so hundreds or even thousands of session actors share one thread.
The remote reference passed is available only to the client who called the login method; it's impossible to break this, as the mapping of ID => ActorInstance is bound to the physical connection (socket, websocket or http session).
We can hold client-specific state in this WebSessionActor or (better) cache frequently required client-related data.
The WebSessionActor implements the API used by a logged-in client. Again: it's impossible for any outside intruder to obtain access without having passed the login() authentication method.
public class WebSessionActor extends Actor<WebSessionActor> {
MyWebApp app; // initialized in init() above
// [....]
public IPromise<AdminAPIActor> getAdminAPI() {
if ( isAdmin() ) {
// return global adminapi singleton actor
return app.getAdminAPI(); // forward the facade's promise (already an IPromise<AdminAPIActor>)
}
return reject("access denied"); // == return new Promise(null,"access denied")
}
}
public class AdminAPIActor extends Actor<AdminAPIActor> {
// [.. administrative functionality ...]
}
Once the connection of a client closes, dynamically scheduled actors are not stopped or GC'ed. An application explicitly has to call stop() on connection/client-private actors.
Merely exposed actors like AdminAPIActor in the example above don't need any special cleanup, as the mapping id => actorproxy is cleaned up automatically once the connection object is discarded.
todo:thread, encoding time, batching
It's very important to understand that both scheduling and remoting only scan and transform the return value and arguments of a public actor method. E.g. if a Callback or Promise is passed as an argument from one actor to another, a wrapper will be created such that callback invocations done in a foreign thread result in an event on the callback queue of the sender (thread-safe callbacks).
Similarly, if the same call is done on a remote reference, a remote forwarding wrapper will be generated such that invocation of a callback (or promise resolve) will be transmitted over the network to the receiver.
These transformations are not applied reliably to embedded objects; only objects in a message method's signature are managed by Kontraktor (for performance and memory management reasons).
Example:
public class OtherActor extends Actor<OtherActor> {
    public IPromise<String> doSomething( Callback<String> cb ) {
        cb.pipe("A"); cb.pipe("B"); cb.finish();
        return new Promise<>("done");
    }
}
public class SendingActor extends Actor<SendingActor> {
    OtherActor otherActor; // reference to the other actor, obtained elsewhere
    HashMap<String,String> plainMap = new HashMap<>(); // <= This
    public void run() {
        Callback<String> sampleCallback = (res,err) -> {
            // some code <= This
        };
        otherActor.doSomething( sampleCallback ).then( result -> {
            plainMap.put( "Yes", "you can" ); // and this all run in the sender Actor's thread!
        });
    }
}
If OtherActor and SendingActor run on different machines, this will be transparent.
Note that thread-safe callbacks/promises are a unique feature of Kontraktor. Though it might seem a mere detail, in practice this has big implications, as accidental multithreading does not fail immediately. In most cases, related errors and data corruption show up under load/scale-out conditions. Expensive production failures and long-winded debugging sessions put a big price tag on this one.
So now for the pitfall (embedded callback objects instead of passing them as method arguments):
public class PitFall implements Serializable {
    Callback cb; // WRONG: this callback will be wrongly scheduled and not correctly remoted
    public PitFall( Callback cb ) { this.cb = cb; }
}
public class OtherActor extends Actor<OtherActor> {
    public void doSomething( PitFall pitfall ) {
        pitfall.cb.pipe("I'll come in a foreign thread"); // wrong
        pitfall.cb.pipe("I'll come in a foreign thread"); // wrong
        pitfall.cb.finish();
    }
}
Workaround using @CallerSideMethod
It's possible to break down parameters containing embedded async primitives in order to provide a clean and simple interface; however, this is advanced stuff and requires some understanding of Kontraktor's internal mechanics.
Outline: mark doSomething( PitFall ) as @CallerSideMethod, so it is executed synchronously at the caller's side. Then decompose the PitFall instance into plain Kontraktor primitives and pass them to a helper method, e.g.
@CallerSideMethod public void doSomething( PitFall pitfall ) {
    // executed synchronously at the caller (sender) side: decompose to primitives
    // and pass them on as regular message arguments
    __doSomething( pitfall.cb );
}
public void __doSomething( Callback pitfallCB ) {
    // executed inside the receiving actor; pitfallCB is properly forwarded back to the sender
}
It's possible to connect to Java actors from JavaScript in the browser as well as from Node.js. It's also possible to write services in JavaScript and connect to them remotely.
see Kontraktor Http
If you have used or read about other actor systems, notice that Kontraktor has a different approach.
- Kontraktor favours a coarse-grained actor design. Its primary goal is to partition applications into distributable/separable async components. Think of a Kontraktor actor more like a Node.js instance or something like Dart's isolates. You'll be astonished what can be done on a single thread with non-blocking programming/app design.
- Replace threads by actors (not classes).
- Frequently an app will consist of <10 Actors, often only 1.
- There are patterns where creation of many actors (sharing a handful of threads) makes sense:
- 'Sessions' in a server role actor having many clients (see HttpApp examples), each represented by an Actor instance.
- Simulation-like applications.
Reasoning:
- Asynchronous programs can be hard to debug and understand, so limit async programming to the most rewarding use cases (remote messaging, bulkheading, simplify complex multithreading).
- Fine grained Actor design creates significant performance overhead as many method calls are replaced by messages then.
- Kontraktor aims for a deterministic threading model (messages are not processed on a thread pool or similar machinery). This is less ambitious, in favour of simplicity and easier reasoning. It has advantages, e.g. promise callbacks are executed on the sending actor's thread (low risk of accidental concurrency). One can safely "close over actor state". Execution is mostly single-threaded, resulting in excellent serial performance without drawbacks for scalability/concurrent processing. Scale-out is then done by intent, not automatically.
A Spore is a snippet of code sent to and processed by the receiver (remotely or inside the thread of another actor). This makes it possible to "send code" to a remote actor, let it work on remote data and stream results back to the caller (=sender).
See the examples on how to use Spores.
Actor.current()
delivers the currently active actor. As the Actor class implements the Executor interface, this can be used to execute code on the actor's thread. It throws an exception if called from outside actor code. Use Actor.inside() to find out whether code is running inside an actor thread.
Actor.delayed(timeMs,Runnable)
executes the given runnable on the actor's thread after the given delay.
The example below does not produce a stack overflow (the runnable is put on the actor's mailbox like a message). Also, the actor is not blocked. One can schedule thousands of parallel tasks on a single thread this way:
public void parallelLoop() {
if ( ! isStopped() ) {
// do stuff each 500 ms
delayed( 500, () -> parallelLoop() );
}
}
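The same pattern can be reproduced with the JDK's `ScheduledExecutorService`, which here plays the role of the actor's thread. `DelayedLoopSketch`, its `delayed()` helper and the counters are hypothetical; the point is that each iteration is a new queued task rather than a nested call, so many independent loops interleave on one thread and the stack does not grow. The delay of 0 ms just keeps the demo fast.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class DelayedLoopSketch {
    // Hypothetical stand-in for the actor's thread.
    private final ScheduledExecutorService actorThread = Executors.newSingleThreadScheduledExecutor();

    // Analogue of Actor.delayed(): queue 'task' to run on the actor thread after 'ms'.
    void delayed(long ms, Runnable task) {
        actorThread.schedule(task, ms, TimeUnit.MILLISECONDS);
    }

    // One logical loop: each step schedules the next step instead of calling it directly.
    void loop(int[] counter, int limit, CountDownLatch done) {
        if (++counter[0] >= limit) {
            done.countDown();
            return;
        }
        delayed(0, () -> loop(counter, limit, done));
    }

    // Starts 'loops' independent loops on the single thread; returns the total number of steps.
    int run(int loops, int stepsEach) {
        CountDownLatch done = new CountDownLatch(loops);
        int[][] counters = new int[loops][1];
        for (int[] counter : counters) {
            actorThread.execute(() -> loop(counter, stepsEach, done));
        }
        try {
            if (!done.await(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("loops did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            actorThread.shutdownNow();
        }
        int total = 0;
        for (int[] counter : counters) {
            total += counter[0];
        }
        return total;
    }
}
```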
DispatcherThread
Each actor is assigned a thread ("DispatcherThread"); however, many actors can share one. The dispatcher thread polls events from the actor's mailbox and callback queue and executes them. So inside one actor, everything is single-threaded.
Mailbox, Callback queue
Kontraktor internally manages two queues per actor: the mailbox and the callback queue. The mailbox queues regular method calls on the public actor interface; the callback queue (higher priority) queues callbacks and promise.then handlers coming back from the network or other actors. This way, typical concurrency issues caused by asynchronous replies scheduled on foreign threads are avoided.
(implementation inside an actor)
HashMap plainMap = new HashMap(); // <= This
otherActor.doSomething().then( result -> {
plainMap.put( "Yes", "you can" ); // and this run in the same thread !
});
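A single dispatcher step with the two queues might look like the following sketch. `TwoQueueDispatcher` is hypothetical and single-threaded for clarity (a real dispatcher would need thread-safe queues, since replies arrive from other threads); it shows only the priority rule described above: pending callbacks are drained before the next mailbox message is taken.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, single-threaded model of one actor's two queues.
final class TwoQueueDispatcher {
    final Queue<Runnable> mailbox = new ArrayDeque<>();   // regular method calls
    final Queue<Runnable> callbacks = new ArrayDeque<>(); // replies: callbacks, promise handlers

    // Runs one event; callbacks always win over the next mailbox message.
    boolean pollOnce() {
        Runnable next = callbacks.poll();
        if (next == null) {
            next = mailbox.poll();
        }
        if (next == null) {
            return false; // both queues empty
        }
        next.run();
        return true;
    }

    void runAll() {
        while (pollOnce()) {
            // keep dispatching until there is nothing left
        }
    }
}
```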
Kontraktor manages the thread switching for the built-in Promise and Callback classes. With the help of inThread(), it's possible to wrap any interface on the fly into a thread-switching callback.
This avoids the need to deal with tricky and error-prone details of concurrent state modification and the rules of memory visibility as defined in the JMM.
Actor.exec(Callable)
enables executing blocking operations (e.g. database queries) in a thread pool. The result of the callable will be delivered thread safe inside the actor's thread to the promise returned by 'exec'. This way the actor thread is not blocked.
Actor.stop()
stops execution of current actor. If there are no other actors scheduled on it, the associated thread will terminate.
Actor.tell( "messageName", ... args ), IPromise Actor.ask("messageName", ... args )
untyped messaging interface for actors.
Actor.serialOn( key, promise )
enables guaranteed order of processing by key. Useful to implement non-blocking transactional processing.
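A minimal sketch of ordering-by-key with `CompletableFuture`: each job for a key is chained onto the previous job's future, while jobs for other keys are unaffected. `SerialByKeySketch` is a hypothetical name and this is not Kontraktor's `serialOn` implementation; it assumes, like code inside an actor, that `serialOn` is only called from one thread, and it never prunes finished keys from its map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class SerialByKeySketch {
    // Last scheduled job per key; only touched from the owning (actor) thread.
    private final Map<Object, CompletableFuture<?>> tails = new HashMap<>();

    // Starts 'job' once every job previously submitted for 'key' has finished (successfully or not).
    <T> CompletableFuture<T> serialOn(Object key, Supplier<CompletableFuture<T>> job) {
        CompletableFuture<?> previous = tails.getOrDefault(key, CompletableFuture.completedFuture(null));
        CompletableFuture<T> next = previous
                .handle((ignoredResult, ignoredError) -> null) // a failed predecessor must not block the key
                .thenCompose(ignored -> job.get());
        tails.put(key, next);
        return next;
    }
}
```

In the test, the second job for "account-1" only starts after the first one's future completes, while the job for "account-2" runs immediately.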
One can use actors (message passing/shared nothing) instead of multithreading/shared state by using actors to replace threads.
However, asynchronous programming can be hard to get used to; in addition, most current Java APIs and frameworks contain lots of blocking calls.
Kontraktor's async primitives (Promise, Callback) automatically detect whether they are running inside an actor DispatcherThread or not.
E.g. Promise.await() invoked inside an actor does "pseudo-block" instead of blocking execution: the current (dispatcher) thread enters the actor's event loop, processing incoming messages while waiting (after each processed message, the promise state is checked).
If Promise.await() is called from within an 'ordinary' Java thread (no actor code), await will block the current thread until the promise result is available.
So in order to avoid Promise/Callback style in ordinary multithreaded Java code, just use await() to get de facto synchronous calls.
Deal with blocking API (e.g. JDBC)
As you CANNOT block the actor's DispatcherThread (even if done rarely, it always fails ..), blocking calls need to be 'outsourced' to a thread pool. Use Promises/Callbacks to stream results back 'into' the actor's thread. Actor.exec( Callable ) is a utility to safely execute blocking APIs without blocking the actor's DispatcherThread. Note that Actors.MAX_EXTERNAL_THREADS_POOL_SIZE (default 1000) defines the max. number of threads available to run blocking calls.
Example: use exec() to fetch a URL's content using a thread-blocking API
// requires java.net.URL and java.util.Scanner
public IPromise<String> get( final String url ) {
    Promise<String> content = new Promise<>();
    exec( () -> {
        // here goes the blocking code snippet (runs in a thread pool thread)
        try ( Scanner sc = new Scanner(new URL(url).openStream(), "UTF-8") ) {
            return sc.useDelimiter("\\A").next(); // read the whole stream, then close it
        }
    }).then( (result, error) -> {
        content.complete(result, error); // runs in the actor thread
    });
    return content;
}
Note that this can be shortened:
public IPromise<String> get( final String url ) {
    Promise<String> content = new Promise<>();
    exec( () -> {
        // here goes the blocking code snippet (read URL content)
        try ( Scanner sc = new Scanner(new URL(url).openStream(), "UTF-8") ) {
            return sc.useDelimiter("\\A").next();
        }
    }).then( content ); // the promise itself is passed as callback, no manual forwarding code needed
    return content;
}
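The mechanics behind `exec()` can be approximated with two JDK executors: a pool for blocking work and a single thread standing in for the actor. `ExecSketch` and its thread names are hypothetical, and this is not Kontraktor's implementation; it only shows the hand-off: the blocking call runs on a pool thread, and the returned future is completed by a task queued on the actor thread, so continuations registered from the actor thread run there.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ExecSketch {
    final ExecutorService actorThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "actor"));
    final ExecutorService blockingPool = Executors.newCachedThreadPool();

    // Runs 'blocking' on the pool; the returned future is completed on the actor thread.
    <T> CompletableFuture<T> exec(Callable<T> blocking) {
        CompletableFuture<T> result = new CompletableFuture<>();
        blockingPool.execute(() -> {
            try {
                T value = blocking.call();                         // pool thread, may block
                actorThread.execute(() -> result.complete(value)); // hop back to the actor
            } catch (Exception e) {
                actorThread.execute(() -> result.completeExceptionally(e));
            }
        });
        return result;
    }

    // Reports "<thread of blocking call> -> <thread of continuation>".
    String demo() {
        CompletableFuture<String> report = new CompletableFuture<>();
        try {
            actorThread.execute(() ->
                    exec(() -> Thread.currentThread().getName())
                            .thenAccept(poolThread -> report.complete(
                                    poolThread + " -> " + Thread.currentThread().getName())));
            return report.get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            actorThread.shutdown();
            blockingPool.shutdown();
        }
    }
}
```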
As Kontraktor uses (fast-)serialization for marshalling/unmarshalling, distributed actor systems are version coupled: each node requires all classes involved in communication (sent between remoted actors) in its classpath.
In order to update a single node only, care has to be taken not to change classes which are part of the remotable method signatures.
A simple solution is to always update all nodes; this is even required if remote lambda execution (Spores) is used. As Spores are an expert feature, it's usually safe to update single nodes as long as all classes that are part of public method arguments are unchanged.
A safe way to enable single-node updates is to have a dedicated jar defining actor stubs and related classes, which can be used by clients and subclassed for the real implementation.
Examples can be found here and here.
When connecting to a remote actor, the connect method should be given a callback which is fired on disconnect.
There is no catch-all solution to recover from a disconnect. Because it's undefined which messages actually reached the remote actor, most of the time stopping the actor + reset + reconnect will be the only available option.
Advanced mechanisms such as acknowledged message transfer have to be added at application level (e.g. sequence important messages or let them be acknowledged by the receiver).
SubActors can listen to disconnects by implementing the RemotedActor interface, e.g.:
public class MyHttpAppSession extends Actor<MyHttpAppSession> implements RemotedActor {
[...]
@Override
public void hasBeenUnpublished() {
app.clientClosed(self());
self().stop();
}
}
Creation of an actor immediately starts it. Actor.stop() stops the actor. If there are no other actors run by the associated DispatcherThread, the DispatcherThread will terminate.
todo: unpublish, no stop remote
One can assign a scheduler to an actor at creation time, e.g. by creating a scheduler explicitly:
SimpleScheduler sched = new SimpleScheduler(DefaultQSize);
MyActor act = Actors.AsActor(MyActor.class, sched);
MyActor act1 = Actors.AsActor(MyActor.class, sched);
act and act1 will be run on the same thread.
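In JDK terms, sharing a scheduler roughly corresponds to several actor objects submitting their work to the same single-thread executor. The sketch below is hypothetical (`SharedSchedulerSketch` and `CounterActor` are made-up names and do not use Kontraktor's `SimpleScheduler`); it records which thread processed the messages of two actors.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SharedSchedulerSketch {
    // Hypothetical actor: all state changes happen on the scheduler it was created with.
    static final class CounterActor {
        private final Executor scheduler;
        private int count; // only touched on the scheduler thread

        CounterActor(Executor scheduler) { this.scheduler = scheduler; }

        // Queues an increment; completes with the name of the thread that ran it.
        CompletableFuture<String> increment() {
            return CompletableFuture.supplyAsync(() -> {
                count++;
                return Thread.currentThread().getName();
            }, scheduler);
        }
    }

    // Creates two actors on one single-thread "scheduler" and returns the thread names observed.
    static Set<String> threadsUsed() {
        ExecutorService sched = Executors.newSingleThreadExecutor(r -> new Thread(r, "shared-dispatcher"));
        try {
            CounterActor act = new CounterActor(sched);
            CounterActor act1 = new CounterActor(sched);
            Set<String> names = ConcurrentHashMap.newKeySet();
            for (int i = 0; i < 100; i++) {
                names.add(act.increment().join());
                names.add(act1.increment().join());
            }
            return names;
        } finally {
            sched.shutdown();
        }
    }
}
```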
Example for simple load balancing (random assignment) from a web app server:
public class MyServer extends Actor<MyServer> {
[...]
Scheduler clientThreads[];
public void init() {
clientThreads = new Scheduler[]{
new SimpleScheduler(CLIENT_QSIZE),
new SimpleScheduler(CLIENT_QSIZE),
new SimpleScheduler(CLIENT_QSIZE),
new SimpleScheduler(CLIENT_QSIZE),
};
Thread.currentThread().setName("MyServer Dispatcher");
}
public IPromise<MyHttpAppSession> login( String user, String pwd ) {
// create new session and assign it a random scheduler (~thread).
// Note that with async nonblocking style
// one thread will be sufficient most of the time.
// For computing intensive apps increase clientThreads to like 2-4.
// This means you need to delegate any blocking operation outside of an actors thread
int random = (int) (Math.random() * clientThreads.length);
MyHttpAppSession sess = AsActor(MyHttpAppSession.class,clientThreads[random]);
return new Promise<>(sess); // credential check and session init omitted here
}
[...]
}
By default Kontraktor logs asynchronously to System.out. You can set a global logging delegate using Log.Lg.setLogWrapper(). As the Kontraktor logger is asynchronous (it's also an actor), it's sometimes useful to turn this off if your preferred logging system is already asynchronous. Call Log.SetSynchronous() in order to avoid double queueing.