
NATS - Java Client

A Java client for the NATS messaging system.

Current Release: 2.17.1   Current Snapshot: 2.17.2-SNAPSHOT


Check out NATS by example - An evolving collection of runnable, cross-client reference examples for NATS.

Simplification

There is a new simplified API that makes working with streams and consumers, well, simpler! Simplification is released as of 2.16.14.

Check out the examples:

Service Framework

The service API allows you to easily build NATS services. The Service Framework is released as of 2.16.14.

The Service Framework introduces a higher-level API for implementing services with NATS. NATS has always been a strong technology on which to build services, as they are easy to write, are location and DNS independent, and can be scaled up or down by simply adding or removing instances of the service.

The Service Framework further streamlines their development by providing observability and standardization. It allows your services to be discovered and queried for status and schema information without additional work.

Check out the ServiceExample

Version Specific Notes

This is version 2.x of the java-nats library. This version is a ground-up rewrite of the original library. Part of the goal of this rewrite was to address the excessive use of threads, so we created a Dispatcher construct to allow applications to control thread creation more intentionally. This version also removes all non-JDK runtime dependencies.

The API is simple to use and highly performant.

Version 2+ uses a simplified versioning scheme. Any issues will be fixed in the incremental version number. As a major release, the major version has been updated to 2 to allow clients to limit their use of this new API. With the addition of drain() we updated to 2.1, and NKey support moved us to 2.2.

The NATS server renamed itself from gnatsd to nats-server around 2.4.4. This and other files try to use the new names, but some underlying code may change over several versions. If you are building yourself, please keep an eye out for issues and report them.

Version 2.5.0 adds some back pressure to publish calls to alleviate issues when there is a slow network. This may alter performance characteristics of publishing apps, although the total performance is equivalent.

Previous versions are still available in the repo.

Version 2.17.1: Support for TLS First

There is a new connection option, tlsFirst, for "TLSHandshakeFirst".

In Server 2.10.3 and later, there is the ability to have TLS Handshake First. The server config will add this:

tls {
  ...
  handshake_first: 300ms
}

TLS Handshake First instructs the library to perform the TLS handshake right after the connect and before receiving the INFO protocol from the server. If this option is enabled but the server is not configured to perform the TLS handshake first, the connection will fail.

Version 2.17.0: Server 2.10 support. Subject and Queue Name Validation

With the release of the 2.10 server, the client has been updated to support new features. The most important new feature is the ability to have multiple filter subjects for any single JetStream consumer.

ConsumerConfiguration cc = ConsumerConfiguration.builder()
    ...
    .filterSubjects("subject1", "subject2")
    .build();

For subjects, up until now, the client has been very strict when validating subject names for consumer subject filters and subscriptions. It only allowed printable ASCII characters except for *, >, ., \\ and /. This restriction has been changed to the following:

  • cannot contain spaces \r \n \t
  • cannot start or end with subject token delimiter .
  • cannot have empty segments

This means that UTF characters are now allowed in this client.
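The three rules above can be sketched as a small standalone check. This is only an illustration of the rules as listed here, not the client's actual validation code; the class and method names are hypothetical, and rejecting null or empty subjects is an added assumption.

```java
/** Illustrative check of the relaxed subject rules; not the client's real validator. */
final class SubjectRuleSketch {

    /** Returns true when the subject satisfies the three rules listed above. */
    static boolean isValid(String subject) {
        // Assumption: null and empty subjects are rejected outright.
        if (subject == null || subject.isEmpty()) {
            return false;
        }
        // Rule 1: no space, carriage return, line feed, or tab anywhere.
        for (int i = 0; i < subject.length(); i++) {
            char c = subject.charAt(i);
            if (c == ' ' || c == '\r' || c == '\n' || c == '\t') {
                return false;
            }
        }
        // Rule 2: must not start or end with the token delimiter.
        if (subject.startsWith(".") || subject.endsWith(".")) {
            return false;
        }
        // Rule 3: no empty segments, which would appear as two adjacent delimiters.
        return !subject.contains("..");
    }

    public static void main(String[] args) {
        System.out.println(isValid("orders.eu.created"));
        System.out.println(isValid("orders..created"));
    }
}
```

Because the check only looks for whitespace and delimiter placement, non-ASCII tokens pass, which matches the statement that UTF characters are now allowed.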

For queue names, there has been inconsistent validation, if any. Queue names now require the same validation as subjects. Important: We realize this may affect existing applications, but we need to require consistency across clients.

Version 2.16.14: Options properties improvements

In this release, support was added to:

  • use property keys with or without the prefix 'io.nats.client.'
  • specify the credentials file in a properties file for connections that require an AuthHandler for JWT, instead of needing to provide an instance of AuthHandler in code.
  • specify key and trust store information in a properties file for connections that require an SSL context, so an SSLContext can be created automatically instead of needing to provide an instance of an SSLContext in code.

For details on the other features, see the "Options" sections.

Version 2.16.8: Websocket Support

As of version 2.16.8 Websocket (ws and wss) protocols are supported for connecting to the server. For instance, your server bootstrap url might be ws://my-nats-host:80 or wss://my-nats-host:443.

Your server must be properly configured for websocket, see the NATS.IO docs WebSocket Configuration Example for more information.

If you use secure websockets (wss), your connection must be securely configured in the same way you would configure a tls connection.

Version 2.16.0: Consumer Create

This release by default will use a new JetStream consumer create API when interacting with nats-server version 2.9.0 or higher. This changes the subjects used by the client to create consumers, which might in some cases require changes in access and import/export configuration. The developer can opt out of using this feature by using a custom JetStreamOptions and using it when creating JetStream, Key Value and Object Store regular and management contexts.

JetStreamOptions jso = JetStreamOptions.builder().optOut290ConsumerCreate(true).build();

JetStream js = connection.jetStream(jso);
JetStreamManagement jsm = connection.jetStreamManagement(jso);
KeyValue kv = connection.keyValue("bucket", KeyValueOptions.builder(jso).build());
KeyValueManagement kvm = connection.keyValueManagement(KeyValueOptions.builder(jso).build());
ObjectStore os = connection.objectStore("bucket", ObjectStoreOptions.builder(jso).build());
ObjectStoreManagement osm = connection.objectStoreManagement(ObjectStoreOptions.builder(jso).build());

Options

Properties with or without prefix...

The io.nats.client. prefix is not required in the properties file anymore. These are now equivalent:

io.nats.client.servers=nats://localhost:4222
servers=nats://localhost:4222
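One way to picture the equivalence is a lookup that tries the key both with and without the prefix. The sketch below is a minimal illustration using only java.util.Properties; the class name, the method name, and the choice to check the prefixed form first are assumptions, not the library's implementation.

```java
import java.util.Properties;

/** Hypothetical helper showing how prefixed and bare keys can resolve to one value. */
final class PrefixLookupSketch {
    static final String PREFIX = "io.nats.client.";

    /** Accepts the key in either form; checks the prefixed key first (an assumption). */
    static String lookup(Properties props, String key) {
        // Normalize to the bare key so callers may pass either form.
        String bare = key.startsWith(PREFIX) ? key.substring(PREFIX.length()) : key;
        String value = props.getProperty(PREFIX + bare);
        return value != null ? value : props.getProperty(bare);
    }

    public static void main(String[] args) {
        Properties withPrefix = new Properties();
        withPrefix.setProperty("io.nats.client.servers", "nats://localhost:4222");

        Properties withoutPrefix = new Properties();
        withoutPrefix.setProperty("servers", "nats://localhost:4222");

        // Both property sets resolve to the same server URL.
        System.out.println(lookup(withPrefix, "servers"));
        System.out.println(lookup(withoutPrefix, "servers"));
    }
}
```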

Last one wins

The Options builder allows you to use both properties and code. When it comes to the builder, the last one called wins. This applies to each individual property.

props.setProperty(Options.PROP_MAX_MESSAGES_IN_OUTGOING_QUEUE, "7000");

o = new Options.Builder()
   .properties(props)
   .maxMessagesInOutgoingQueue(6000)
   .build();
assertEquals(6000, o.getMaxMessagesInOutgoingQueue());

o = new Options.Builder()
   .maxMessagesInOutgoingQueue(6000)
   .properties(props)
   .build();
assertEquals(7000, o.getMaxMessagesInOutgoingQueue());

o = new Options.Builder()
    .maxMessagesInOutgoingQueue(6000)
    .maxMessagesInOutgoingQueue(8000)
    .build();
assertEquals(8000, o.getMaxMessagesInOutgoingQueue());

AuthHandler / JWT

In previous versions the user would have to manually create the AuthHandler and set it in the options:

AuthHandler ah = Nats.credentials("path/to/my.creds");
Options options = new Options.Builder()
    .authHandler(ah)
    .build();

The developer can now set the file path directly and an AuthHandler will be created:

Options options = new Options.Builder()
    .credentialPath("path/to/my.creds")
    .build();

The developer can also set the credential path in a properties file:

io.nats.client.credential.path=path/to/my.creds

Options - SSLContext

The Options builder has several methods that set, use, or affect creation of an SSLContext:

// Provide the SSLContext
public Builder sslContext(SSLContext ctx)

// Generic SSLContext Creation
public Builder secure()
public Builder opentls()

// Custom SSLContext Creation Properties
public Builder keystore(String keystore)
public Builder keystorePassword(char[] keystorePassword)
public Builder truststore(String truststore)
public Builder truststorePassword(char[] truststorePassword)
public Builder tlsAlgorithm(String tlsAlgorithm)

There are equivalent properties for these builder methods (except sslContext):

# Generic SSLContext Creation
io.nats.client.secure=true
io.nats.client.opentls=true

# Custom SSLContext Creation Properties
io.nats.client.keyStore=path/to/keystore.jks
io.nats.client.keyStorePassword=kspassword
io.nats.client.trustStore=path/to/truststore.jks
io.nats.client.trustStorePassword=tspassword
io.nats.client.tls.algorithm=SunX509

When options are built, the SSLContext will be accepted or created in the following order.

  1. If it's directly provided via the builder sslContext(SSLContext ctx) method.
  2. If keyStore is provided, an SSLContext will be created using all custom properties. If not supplied, the tls algorithm is SunX509
  3. If opentls is true or any of the bootstrap servers has opentls as their scheme, a generic SSLContext will be created that "trusts all certs".
  4. If secure is true or any of the bootstrap servers has tls or wss as their scheme, the javax.net.ssl.SSLContext.getDefault() will be used.
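The precedence above can be read as a chain of checks where the first match wins. The following sketch models only that decision order with plain JDK types; it does not build an SSLContext, the enum and method names are hypothetical, and the final NONE case (no TLS configured at all) is an assumption about what happens when none of the four conditions apply.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of the SSLContext selection order described above. */
final class SslContextOrderSketch {
    enum Source { PROVIDED, CUSTOM_KEYSTORE, OPEN_TLS_TRUST_ALL, JVM_DEFAULT, NONE }

    static Source resolve(boolean contextProvided, String keyStorePath,
                          boolean openTls, boolean secure, List<String> serverUrls) {
        if (contextProvided) {
            return Source.PROVIDED;              // 1. builder sslContext(...) was called
        }
        if (keyStorePath != null && !keyStorePath.isEmpty()) {
            return Source.CUSTOM_KEYSTORE;       // 2. keyStore property or builder value set
        }
        if (openTls || anyScheme(serverUrls, "opentls")) {
            return Source.OPEN_TLS_TRUST_ALL;    // 3. generic context that trusts all certs
        }
        if (secure || anyScheme(serverUrls, "tls", "wss")) {
            return Source.JVM_DEFAULT;           // 4. SSLContext.getDefault()
        }
        return Source.NONE;                      // assumption: no TLS requested
    }

    /** True when any URL uses one of the given schemes (case-insensitive). */
    private static boolean anyScheme(List<String> urls, String... schemes) {
        for (String url : urls) {
            String scheme = URI.create(url).getScheme();
            for (String s : schemes) {
                if (s.equalsIgnoreCase(scheme)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> urls = Arrays.asList("nats://a:4222", "wss://b:443");
        System.out.println(resolve(false, null, false, false, urls));
    }
}
```

Note that a keystore takes priority over opentls in this ordering, so setting both yields a context built from the custom properties.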

Options - Properties

Name Description
io.nats.client.callback.connection Property used to configure a connectionListener.
io.nats.client.dataport.type Property used to configure a dataPortType.
io.nats.client.callback.error Property used to configure an errorListener.
io.nats.client.statisticscollector Property used to configure the statisticsCollector.
io.nats.client.maxpings Property used to configure maxPingsOut.
io.nats.client.pinginterval Property used to configure pingInterval.
io.nats.client.cleanupinterval Property used to configure requestCleanupInterval.
io.nats.client.timeout Property used to configure connectionTimeout.
io.nats.client.reconnect.buffer.size Property used to configure reconnectBufferSize.
io.nats.client.reconnect.wait Property used to configure reconnectWait.
io.nats.client.reconnect.max Property used to configure maxReconnects.
io.nats.client.reconnect.jitter Property used to configure reconnectJitter.
io.nats.client.reconnect.jitter.tls Property used to configure reconnectJitterTls.
io.nats.client.pedantic Property used to configure pedantic.
io.nats.client.verbose Property used to configure verbose.
io.nats.client.noecho Property used to configure noEcho.
io.nats.client.noheaders Property used to configure noHeaders.
io.nats.client.name Property used to configure connectionName.
io.nats.client.nonoresponders Property used to configure noNoResponders.
io.nats.client.norandomize Property used to configure noRandomize.
io.nats.client.noResolveHostnames Property used to configure noResolveHostnames.
io.nats.client.reportNoResponders Property used to configure reportNoResponders.
io.nats.clientsidelimitchecks Property used to configure clientsidelimitchecks.
io.nats.client.url Property used to configure server. The value can be a comma-separated list of server URLs.
io.nats.client.servers Property used to configure servers. The value can be a comma-separated list of server URLs.
io.nats.client.password Property used to configure userinfo password.
io.nats.client.username Property used to configure userinfo username.
io.nats.client.token Property used to configure token.
io.nats.client.secure See notes above on SSL configuration.
io.nats.client.opentls See notes above on SSL configuration.
io.nats.client.outgoingqueue.maxmessages Property used to configure maxMessagesInOutgoingQueue.
io.nats.client.outgoingqueue.discardwhenfull Property used to configure discardMessagesWhenOutgoingQueueFull.
use.old.request.style Property used to configure oldRequestStyle.
max.control.line Property used to configure maxControlLine.
inbox.prefix Property used to set the inbox prefix
ignore.discovered.servers Preferred property used to set whether to ignore discovered servers when connecting.
servers.pool.implementation.class Preferred property used to set class name for ServerPool implementation.
io.nats.client.keyStore Property for the keystore path used to create an SSLContext
io.nats.client.keyStorePassword Property for the keystore password used to create an SSLContext
io.nats.client.trustStore Property for the truststore path used to create an SSLContext
io.nats.client.trustStorePassword Property for the truststore password used to create an SSLContext
io.nats.client.tls.algorithm Property for the algorithm used to create an SSLContext
io.nats.client.credential.path Property used to set the path to a credentials file to be used in a FileAuthHandler

SSL/TLS Performance

After recent tests we realized that TLS performance is lower than we would like. After researching the problem and possible solutions we came to a few conclusions:

  • TLS performance for the native JDK has not been historically great
  • TLS performance is better in JDK12 than JDK8
  • A small fix to the library in 2.5.1 allows the use of https://github.com/google/conscrypt and https://github.com/wildfly/wildfly-openssl; conscrypt provides the best performance in our tests
  • TLS still comes at a price (1gb/s vs 4gb/s in some tests), but using the JNI libraries can result in a 10x boost in our testing
  • If TLS performance is reasonable for your application we recommend using the j2se implementation for simplicity

To use conscrypt or wildfly, you will need to add the appropriate jars to your class path and create an SSL context manually. This context can be passed to the Options used when creating a connection. The NATSAutoBench example provides a conscrypt flag which can be used to try out the library, but you must still include the jar manually.

OCSP Stapling

The server supports OCSP stapling. To enable Java to automatically check the stapling when making TLS connections, you must set system properties. Please be aware that this affects the entire JVM, and therefore all connections.

These properties can be set from your command line or from your Java code:

System.setProperty("jdk.tls.client.enableStatusRequestExtension", "true");
System.setProperty("com.sun.net.ssl.checkRevocation", "true");

For more information, see the Oracle Java documentation page on Client-Driven OCSP and OCSP Stapling

Also, there is a detailed OCSP Example that shows how to create SSL contexts enabling OCSP stapling.

Subject Validation

The current version of this client supports subjects with ASCII printable characters and wildcards when subscribing.

JetStream Subscribe Subject

With the introduction of simplification, the various original subscribe methods available will eventually be deprecated. But since they are available, we need to address the concept of the subscribe subject.

All the subscribe methods take a "subject" as the first parameter. We call this the subscribe subject. The subject could be something like my.subject or my.star.* or my.gt.> or even >. This parameter is used and validated in different ways depending on the context of the call, including looking up the stream, if stream is not provided via subscribe options.

The subscribe subject could be used to make a simple subscription. In this case it is required. An ephemeral consumer will be created for that subject, assuming that subject can be looked up in a stream.

JetStream js = nc.jetStream();
JetStreamSubscription sub = js.subscribe(subject);

If the subscribe call has either a PushSubscribeOptions or PullSubscribeOptions that has a ConsumerConfiguration with one or more filter subjects, the subscribe subject is optional since we can use the first filter subject as the subscribe subject.

PushSubscribeOptions pso = ConsumerConfiguration.builder().filterSubject("my.subject").buildPushSubscribeOptions();
js.subscribe(null, pso);

The other time you can skip the subject parameter is when you "bind". Since the stream name and consumer name are part of the bind, the subject will be retrieved from the consumer looked up via the bind stream and consumer name information.

On-the-fly Subscribes

On the fly subscribes rely on

NKey-based Challenge Response Authentication

The NATS server is adding support for a challenge response authentication scheme based on NKeys. Version 2.2.0 of the Java client supports this scheme via an AuthHandler interface. Version 2.3.0 replaced several NKey methods that used strings with methods using char[] to improve security.

Installation

The java-nats client is provided in a single jar file, with a single external dependency for the encryption in NKey support. See Building From Source for details on building the library. Replace {major.minor.patch} with the correct version in the examples.

Downloading the Jar

You can download the latest jar at https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.17.1/jnats-2.17.1.jar.

The examples are available at https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.17.1/jnats-2.17.1-examples.jar.

To use NKeys, you will need the ed25519 library, which can be downloaded at https://repo1.maven.org/maven2/net/i2p/crypto/eddsa/0.3.0/eddsa-0.3.0.jar.

Using Gradle

The NATS client is available in the Maven central repository, and can be imported as a standard dependency in your build.gradle file:

dependencies {
    implementation 'io.nats:jnats:{major.minor.patch}'
}

If you need the latest and greatest before Maven central updates, you can use:

repositories {
    mavenCentral()
    maven {
        url "https://oss.sonatype.org/content/repositories/releases"
    }
}

If you need a snapshot version, you must add the url for the snapshots and change your dependency.

repositories {
    mavenCentral()
    maven {
        url "https://oss.sonatype.org/content/repositories/snapshots"
    }
}

dependencies {
   implementation 'io.nats:jnats:{major.minor.patch}-SNAPSHOT'
}

Using Maven

The NATS client is available on the Maven central repository, and can be imported as a normal dependency in your pom.xml file:

<dependency>
    <groupId>io.nats</groupId>
    <artifactId>jnats</artifactId>
    <version>{major.minor.patch}</version>
</dependency>

If you need the absolute latest, before it propagates to maven central, you can use the repository:

<repositories>
    <repository>
        <id>sonatype releases</id>
        <url>https://oss.sonatype.org/content/repositories/releases</url>
        <releases>
           <enabled>true</enabled>
        </releases>
    </repository>
</repositories>

If you need a snapshot version, you must enable snapshots and change your dependency.

<repositories>
    <repository>
        <id>sonatype snapshots</id>
        <url>https://oss.sonatype.org/content/repositories/snapshots</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>

<dependency>
    <groupId>io.nats</groupId>
    <artifactId>jnats</artifactId>
    <version>{major.minor.patch}-SNAPSHOT</version>
</dependency>

If you are using the 1.x version of java-nats and don't want to upgrade to 2.0.0, please use ranges in your POM file. For example, java-nats-streaming 1.x uses [1.1, 1.9.9) for this.

Basic Usage

Sending and receiving with NATS is as simple as connecting to the nats-server and publishing or subscribing for messages.

Please see the examples in this project. The Examples Readme is a good place to start.

There are also examples in the java-nats-examples repo.

Connecting

There are five different ways to connect using the Java library, each with a parallel method that will allow doing reconnect logic if the initial connect fails. The ability to reconnect on the initial connection failure is NOT an Options setting.

  1. Connect to a local server on the default url. From the Options class: DEFAULT_URL = "nats://localhost:4222";

    // default options 
    Connection nc = Nats.connect();
    
    // default options, reconnect on connect 
    Connection nc = Nats.connectReconnectOnConnect();
  2. Connect to one or more servers using a URL:

    // single URL, all other default options
    Connection nc = Nats.connect("nats://myhost:4222");
    
    // comma-separated list of URLs, all other default options
    Connection nc = Nats.connect("nats://myhost:4222,nats://myhost:4223");
    
    // single URL, all other default options, reconnect on connect
    Connection nc = Nats.connectReconnectOnConnect("nats://myhost:4222");
    
    // comma-separated list of URLs, all other default options, reconnect on connect
    Connection nc = Nats.connectReconnectOnConnect("nats://myhost:4222,nats://myhost:4223");
  3. Connect to one or more servers with a custom configuration:

    Options o = new Options.Builder().server("nats://serverone:4222").server("nats://servertwo:4222").maxReconnects(-1).build();
    
    // custom options
    Connection nc = Nats.connect(o);
    
    // custom options, reconnect on connect
    Connection nc = Nats.connectReconnectOnConnect(o);
  4. Connect asynchronously, this requires a callback to tell the application when the client is connected:

    Options options = new Options.Builder().server(Options.DEFAULT_URL).connectionListener(handler).build();
    Nats.connectAsynchronously(options, true);
  5. Connect with authentication handler:

    AuthHandler authHandler = Nats.credentials(System.getenv("NATS_CREDS"));
    
    // single URL, all other default options
    Connection nc = Nats.connect("nats://myhost:4222", authHandler);
    
    // comma-separated list of URLs, all other default options
    Connection nc = Nats.connect("nats://myhost:4222,nats://myhost:4223", authHandler);
    
    // single URL, all other default options, reconnect on connect
    Connection nc = Nats.connectReconnectOnConnect("nats://myhost:4222", authHandler);
    
    // comma-separated list of URLs, all other default options, reconnect on connect
    Connection nc = Nats.connectReconnectOnConnect("nats://myhost:4222,nats://myhost:4223", authHandler);

Publishing

Once connected, publishing is accomplished via one of three methods:

  1. With a subject and message body:

    nc.publish("subject", "hello world".getBytes(StandardCharsets.UTF_8));
  2. With a subject and message body, as well as a subject for the receiver to reply to:

    nc.publish("subject", "replyto", "hello world".getBytes(StandardCharsets.UTF_8));
  3. As a request that expects a reply. This method uses a Future to allow the application code to wait for the response. Under the covers, a request/reply pair is the same as a publish/subscribe, except that the library manages the subscription for you.

    Future<Message> incoming = nc.request("subject", "hello world".getBytes(StandardCharsets.UTF_8));
    Message msg = incoming.get(500, TimeUnit.MILLISECONDS);
    String response = new String(msg.getData(), StandardCharsets.UTF_8);

All of these methods, as well as the incoming message code use byte arrays for maximum flexibility. Applications can send JSON, Strings, YAML, Protocol Buffers, or any other format through NATS to applications written in a wide range of languages.

ReplyTo When Making A Request

The Message object allows you to set a replyTo, but in requests, the replyTo is reserved for internal use as the address for the server to respond to the client with the consumer's reply.

Listening for Incoming Messages

The Java NATS library provides two mechanisms to listen for messages, three if you include the request/reply discussed above.

  1. Synchronous subscriptions where the application code manually asks for messages and blocks until they arrive. Each subscription is associated with a single subject, although that subject can be a wildcard.

    Subscription sub = nc.subscribe("subject");
    Message msg = sub.nextMessage(Duration.ofMillis(500));
    
    String response = new String(msg.getData(), StandardCharsets.UTF_8);
  2. A Dispatcher that will call application code in a background thread. Dispatchers can manage multiple subjects with a single thread and shared callback.

    Dispatcher d = nc.createDispatcher((msg) -> {
        String response = new String(msg.getData(), StandardCharsets.UTF_8);
        ...
    });
    
    d.subscribe("subject");

    A dispatcher can also accept individual callbacks for any given subscription.

    Dispatcher d = nc.createDispatcher((msg) -> {});
    
    Subscription s = d.subscribe("some.subject", (msg) -> {
        String response = new String(msg.getData(), StandardCharsets.UTF_8);
        System.out.println("Message received (up to 100 times): " + response);
    });
    d.unsubscribe(s, 100);

JetStream

Publishing and subscribing to JetStream-enabled servers is straightforward. A JetStream-enabled application will connect to a server, establish a JetStream context, and then publish or subscribe. This can be mixed and matched with standard NATS subjects, and JetStream subscribers, depending on configuration, receive messages both from streams and directly from other NATS producers.

The JetStream Context

After establishing a connection as described above, create a JetStream Context.

JetStream js = nc.jetStream();

You can pass options to configure the JetStream client, although the defaults should suffice for most users. See the JetStreamOptions class.

There is no limit to the number of contexts used, although normally one would only require a single context. Contexts may be prefixed to be used in conjunction with NATS authorization.

Publishing

To publish messages, use the JetStream.publish(...) API. A stream must be established before publishing. You can publish in either a synchronous or asynchronous manner.

Synchronous:

// create a typical NATS message
Message msg = NatsMessage.builder()
   .subject("foo")
   .data("hello", StandardCharsets.UTF_8)
   .build();

PublishAck pa = js.publish(msg);

See NatsJsPub.java in the JetStream examples for a detailed and runnable example.

If there is a problem, an exception will be thrown, and the message may not have been persisted. Otherwise, the stream name and sequence number are returned in the publish acknowledgement.

There are a variety of publish options that can be set when publishing. When duplicate checking has been enabled on the stream, a message ID should be set. One group of options is expectations. You can set a publish expectation such as a particular stream name, previous message ID, or previous sequence number. These are hints to the server that it should reject messages where these are not met, primarily for enforcing your ordering or ensuring messages are not stored on the wrong stream.

The PublishOptions are immutable, but the builder can be reused for expectations by clearing the expected values.

For example:

PublishOptions.Builder pubOptsBuilder = PublishOptions.builder()
        .expectedStream("TEST")
        .messageId("mid1");
PublishAck pa = js.publish("foo", null, pubOptsBuilder.build());

pubOptsBuilder.clearExpected()
        .expectedLastMsgId("mid1")
        .expectedLastSequence(1)
        .messageId("mid2");
pa = js.publish("foo", null, pubOptsBuilder.build());

See NatsJsPubWithOptionsUseCases.java in the JetStream examples for a detailed and runnable example.

Asynchronous:

List<CompletableFuture<PublishAck>> futures = new ArrayList<>();
for (int x = 1; x < roundCount; x++) {
    // create a typical NATS message
    Message msg = NatsMessage.builder()
        .subject("foo")
        .data("hello", StandardCharsets.UTF_8)
        .build();

    // Publish a message
    futures.add(js.publishAsync(msg));
}

for (CompletableFuture<PublishAck> future : futures) {
    // ... process the futures
}

See the NatsJsPubAsync.java in the JetStream examples for a detailed and runnable example.

ReplyTo When Publishing

The Message object allows you to set a replyTo, but in publish requests, the replyTo is reserved for internal use as the address for the server to respond to the client with the PublishAck.

Subscribing

There are two methods of subscribing, Push and Pull, each with its own set of options and abilities.

Push Subscribing

Push subscriptions can be synchronous or asynchronous. The server pushes messages to the client.

Asynchronous:

Dispatcher disp = ...;

MessageHandler handler = (msg) -> {
    // Process the message.
    // Ack the message depending on the ack model
};

PushSubscribeOptions so = PushSubscribeOptions.builder()
   .durable("optional-durable-name")
   .build();

boolean autoAck = ...

// Pass the options so the durable name configured above is actually used.
js.subscribe("my-subject", disp, handler, autoAck, so);

See the NatsJsPushSubWithHandler.java in the JetStream examples for a detailed and runnable example.

Synchronous:

See NatsJsPushSub.java in the JetStream examples for a detailed and runnable example.

PushSubscribeOptions so = PushSubscribeOptions.builder()
        .durable("optional-durable-name")
        .build();

// Subscribe synchronously, then just wait for messages.
JetStreamSubscription sub = js.subscribe("subject", so);
nc.flush(Duration.ofSeconds(5));

Message msg = sub.nextMessage(Duration.ofSeconds(1));

Pull Subscriptions

Pull subscriptions are always synchronous. The server organizes messages into a batch which it sends when requested.

PullSubscribeOptions pullOptions = PullSubscribeOptions.builder()
   .durable("durable-name-is-optional")
   .build();
JetStreamSubscription sub = js.subscribe("subject", pullOptions);

Bind:

Pull subscriptions allow for binding to existing consumers. The best practice is to provide null for the subscribe subject, but if you do provide it, it must match the consumer subject filter, or you will receive an IllegalArgumentException. See client errors below and JsSubSubjectDoesNotMatchFilter 90011.

  1. Short Form

    PullSubscribeOptions pullOptions = PullSubscribeOptions.bind("stream", "durable-name");
    JetStreamSubscription sub = js.subscribe(null, pullOptions);
  2. Long Form

    PullSubscribeOptions pullOptions = PullSubscribeOptions.builder()
        .stream("stream")
        .durable("durable-name")
        .bind(true)
        .build();

Fetch:

List<Message> messages = sub.fetch(100, Duration.ofSeconds(1));
for (Message m : messages) {
   // process message
   m.ack();
}

The fetch method is a macro pull that uses advanced pulls under the covers to return a list of messages. The list may be empty or contain at most the batch size. All status messages are handled for you except terminal status messages. See Pull Exception Handling below. The client can provide a timeout to wait for the first message in a batch. The fetch call returns when the batch is ready. If the timeout is exceeded while messages are in flight, but before they reach the client, those messages will be available via nextMessage or will be used to fulfill the next fetch.

One important thing to consider when using this is ack wait. Once the server sends a message, its specific ack wait timer is started. If you ask for too many messages, you may fail to ack all messages in time and can get redeliveries.
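As a rough way to reason about this, you can estimate how many messages fit inside one ack wait window. The sketch below is back-of-the-envelope arithmetic only, under the assumptions that messages are processed one at a time, that each takes about the same time, and that the whole batch is delivered at roughly the same moment. The helper name and the example durations are hypothetical.

```java
import java.time.Duration;

/** Hypothetical estimate of a fetch batch size that can be acked within ack wait. */
final class FetchBatchSizingSketch {

    /** Number of sequentially processed messages that fit in one ack wait window. */
    static int maxSafeBatch(Duration ackWait, Duration perMessage) {
        if (perMessage.isZero() || perMessage.isNegative()) {
            throw new IllegalArgumentException("perMessage must be positive");
        }
        // Integer division rounds down, which is the conservative direction here.
        long fit = ackWait.toNanos() / perMessage.toNanos();
        return (int) Math.min(Integer.MAX_VALUE, Math.max(0L, fit));
    }

    public static void main(String[] args) {
        // Example values only: a 30 second ack wait and 250 ms of work per message.
        System.out.println(maxSafeBatch(Duration.ofSeconds(30), Duration.ofMillis(250)));
    }
}
```

In practice you would likely choose a batch size well below this estimate, since network latency and uneven processing times eat into the window.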

See NatsJsPullSubFetch.java and NatsJsPullSubFetchUseCases.java in the JetStream examples for a detailed and runnable example.

Iterate:

Iterator<Message> iter = sub.iterate(100, Duration.ofSeconds(1));
while (iter.hasNext()) {
    Message m = iter.next();
    // process message
    m.ack();
}

The iterate method is a macro pull that uses advanced pulls under the covers to return an iterator. The iterator may yield anywhere from zero messages up to the batch size. All status messages are handled for you except terminal status messages. See Pull Exception Handling below. The client can provide a timeout to wait for the first message in a batch. The iterate call returns the iterator immediately, but under the covers it will wait for the first message based on the timeout.

The iterate method is usually preferred to the fetch method as it allows you to start processing messages right away instead of waiting until the entire batch is filled. This reduces problems with ack wait and generally is more efficient.

See NatsJsPullSubIterate.java and NatsJsPullSubIterateUseCases.java in the JetStream examples for a detailed and runnable example.

Batch Size:

sub.pull(100);
...
Message m = sub.nextMessage(Duration.ofSeconds(1));

This is an advanced / raw pull that specifies a batch size. When asked, the server will send whatever messages it has up to the batch size. If it has no messages it will wait until it has some to send. The pull request only completes on the server once the entire batch has been sent. It's up to you to track this and only send pulls when the batch is complete, or you risk having pulls stack up and possibly receiving a status 409 Exceeded MaxWaiting warning. The nextMessage request may time out but that does not indicate that there are no more messages in the pull. Instead, it indicates that there is no message available at that time. Once the entire batch size has been filled, you must make another pull request.
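The bookkeeping described above can be sketched with a small counter. This is only an illustration: PullBatchTracker is a hypothetical helper, not a class in the client, and it assumes a single outstanding pull at a time.

```java
// Hypothetical helper, not part of the NATS client. It counts messages
// received for one outstanding raw pull so the caller knows when the
// batch is complete and another pull can safely be sent.
final class PullBatchTracker {
    private final int batchSize;
    private int received;

    PullBatchTracker(int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        this.batchSize = batchSize;
    }

    /** Call once for every non-null message returned by nextMessage. */
    void onMessage() {
        if (received < batchSize) {
            received++;
        }
    }

    /** True once the whole batch has arrived, so a new pull may be sent. */
    boolean isComplete() {
        return received >= batchSize;
    }

    /** Number of messages still expected from the current pull. */
    int remaining() {
        return batchSize - received;
    }

    /** Call right after sending the next pull request. */
    void reset() {
        received = 0;
    }
}
```

A caller would create the tracker alongside sub.pull(100), call onMessage() whenever nextMessage returns a message, ignore null returns, and only send the next pull (followed by reset()) once isComplete() is true.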

See NatsJsPullSubBatchSize.java and NatsJsPullSubBatchSizeUseCases.java in the JetStream examples for a detailed and runnable example.

No Wait and Batch Size:

sub.pullNoWait(100);
...
Message m = sub.nextMessage(Duration.ofSeconds(1));

This is an advanced / raw pull that also specifies a batch size. When asked, the server will send whatever messages it has at the moment the pull request is processed by the server, up to the batch size. If fewer messages than the batch size are available, you will get what is available. You must make a pull request every time.

See the NatsJsPullSubNoWaitUseCases.java in the JetStream examples for a detailed and runnable example.

Expires In and Batch Size:

sub.pullExpiresIn(100, Duration.ofSeconds(3));
...
Message m = sub.nextMessage(Duration.ofSeconds(4));

Another advanced version of pull specifies a maximum time to wait for the batch to fill. The server sends messages up until the batch is filled or the time expires. It's important to set your client's nextMessage timeout to be longer than the time you've asked the server to expire in. Once nextMessage returns null, you know your pull is done, and you can make another one.
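The read loop this implies can be sketched as follows. The message source is abstracted as a Function so the example stays self-contained; in real code it would wrap a call such as sub.nextMessage(timeout) and handle its checked InterruptedException. The class ExpiringPullDrain and the one-second margin are assumptions for illustration only.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper, not part of the NATS client.
final class ExpiringPullDrain {
    /** Assumed extra wait on the client beyond the server-side expiry. */
    static final Duration MARGIN = Duration.ofSeconds(1);

    private ExpiringPullDrain() {}

    /** Client-side wait that is longer than the server expiry. */
    static Duration clientTimeout(Duration serverExpires) {
        return serverExpires.plus(MARGIN);
    }

    /**
     * Reads until the batch is full or the source returns null, which
     * signals that the pull is done.
     *
     * @param nextMessage stands in for a call like sub.nextMessage(timeout)
     */
    static <T> List<T> drain(Function<Duration, T> nextMessage,
                             int batchSize, Duration serverExpires) {
        Duration timeout = clientTimeout(serverExpires);
        List<T> out = new ArrayList<>();
        while (out.size() < batchSize) {
            T m = nextMessage.apply(timeout);
            if (m == null) {
                break; // the pull has expired, a new one can be made
            }
            out.add(m);
        }
        return out;
    }
}
```

With a server expiry of 3 seconds, clientTimeout returns 4 seconds, matching the values used in the snippet above.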

See NatsJsPullSubExpire.java and NatsJsPullSubExpireUseCases.java in the JetStream examples for detailed and runnable examples.

Pull Exception Handling:

Ordered Push Subscription Option

You can now set a Push Subscription option called "Ordered". When you set this flag, the library will take over creation of the consumer and create a subscription that guarantees the order of messages. This consumer will use flow control with a default heartbeat of 5 seconds. Messages will not require acks as the Ack Policy will be set to No Ack. When creating the subscription, there are some restrictions for the consumer configuration settings.

  • Ack policy must be AckPolicy.None (or left un-set). maxAckPending will be ignored.
  • Deliver Group (aka Queue) cannot be used
  • You cannot set a durable consumer name
  • You cannot set the deliver subject
  • max deliver can only be set to 1 (or left un-set)
  • The idle heartbeat cannot be less than 5 seconds. Flow control will automatically be used.

You can however set the deliver policy which will be used to start the subscription.

Client Error Messages

In addition to some generic validation messages for values in builders, there are also additional grouped and numbered client error messages:

  • Subscription building and creation
  • Consumer creation
  • Object Store operations

Name | Group | Code | Description
--- | --- | --- | ---
JsSoDurableMismatch | SO | 90101 | Builder durable must match the consumer configuration durable if both are provided.
JsSoDeliverGroupMismatch | SO | 90102 | Builder deliver group must match the consumer configuration deliver group if both are provided.
JsSoDeliverSubjectMismatch | SO | 90103 | Builder deliver subject must match the consumer configuration deliver subject if both are provided.
JsSoOrderedNotAllowedWithBind | SO | 90104 | Bind is not allowed with an ordered consumer.
JsSoOrderedNotAllowedWithDeliverGroup | SO | 90105 | Deliver group is not allowed with an ordered consumer.
JsSoOrderedNotAllowedWithDurable | SO | 90106 | Durable is not allowed with an ordered consumer.
JsSoOrderedNotAllowedWithDeliverSubject | SO | 90107 | Deliver subject is not allowed with an ordered consumer.
JsSoOrderedRequiresAckPolicyNone | SO | 90108 | Ordered consumer requires Ack Policy None.
JsSoOrderedRequiresMaxDeliverOfOne | SO | 90109 | Max deliver is limited to 1 with an ordered consumer.
JsSoNameMismatch | SO | 90110 | Builder name must match the consumer configuration name if both are provided.
JsSoOrderedMemStorageNotSuppliedOrTrue | SO | 90111 | Mem Storage must be true if supplied.
JsSoOrderedReplicasNotSuppliedOrOne | SO | 90112 | Replicas must be 1 if supplied.
JsSoNameOrDurableRequiredForBind | SO | 90113 | Name or Durable required for Bind.
JsSubPullCantHaveDeliverGroup | SUB | 90001 | Pull subscriptions can't have a deliver group.
JsSubPullCantHaveDeliverSubject | SUB | 90002 | Pull subscriptions can't have a deliver subject.
JsSubPushCantHaveMaxPullWaiting | SUB | 90003 | Push subscriptions cannot supply max pull waiting.
JsSubQueueDeliverGroupMismatch | SUB | 90004 | Queue / deliver group mismatch.
JsSubFcHbNotValidPull | SUB | 90005 | Flow Control and/or heartbeat is not valid with a pull subscription.
JsSubFcHbNotValidQueue | SUB | 90006 | Flow Control and/or heartbeat is not valid in queue mode.
JsSubNoMatchingStreamForSubject | SUB | 90007 | No matching streams for subject.
JsSubConsumerAlreadyConfiguredAsPush | SUB | 90008 | Consumer is already configured as a push consumer.
JsSubConsumerAlreadyConfiguredAsPull | SUB | 90009 | Consumer is already configured as a pull consumer.
removed | SUB | 90010 |
JsSubSubjectDoesNotMatchFilter | SUB | 90011 | Subject does not match consumer configuration filter.
JsSubConsumerAlreadyBound | SUB | 90012 | Consumer is already bound to a subscription.
JsSubExistingConsumerNotQueue | SUB | 90013 | Existing consumer is not configured as a queue / deliver group.
JsSubExistingConsumerIsQueue | SUB | 90014 | Existing consumer is configured as a queue / deliver group.
JsSubExistingQueueDoesNotMatchRequestedQueue | SUB | 90015 | Existing consumer deliver group does not match requested queue / deliver group.
JsSubExistingConsumerCannotBeModified | SUB | 90016 | Existing consumer cannot be modified.
JsSubConsumerNotFoundRequiredInBind | SUB | 90017 | Consumer not found, required in bind mode.
JsSubOrderedNotAllowOnQueues | SUB | 90018 | Ordered consumer not allowed on queues.
JsSubPushCantHaveMaxBatch | SUB | 90019 | Push subscriptions cannot supply max batch.
JsSubPushCantHaveMaxBytes | SUB | 90020 | Push subscriptions cannot supply max bytes.
JsSubPushAsyncCantSetPending | SUB | 90021 | Pending limits must be set directly on the dispatcher.
JsSubSubjectNeededToLookupStream | SUB | 90022 | Subject needed to lookup stream. Provide either a subscribe subject or a ConsumerConfiguration filter subject.
JsConsumerCreate290NotAvailable | CON | 90301 | Name field not valid when v2.9.0 consumer create api is not available.
JsConsumerNameDurableMismatch | CON | 90302 | Name must match durable if both are supplied.
JsMultipleFilterSubjects210NotAvailable | CON | 90303 | Multiple filter subjects not available until server version 2.10.0.
OsObjectNotFound | OS | 90201 | The object was not found.
OsObjectIsDeleted | OS | 90202 | The object is deleted.
OsObjectAlreadyExists | OS | 90203 | An object with that name already exists.
OsCantLinkToLink | OS | 90204 | A link cannot link to another link.
OsGetDigestMismatch | OS | 90205 | Digest does not match meta data.
OsGetChunksMismatch | OS | 90206 | Number of chunks does not match meta data.
OsGetSizeMismatch | OS | 90207 | Total size does not match meta data.
OsGetLinkToBucket | OS | 90208 | Cannot get object, it is a link to a bucket.
OsLinkNotAllowOnPut | OS | 90209 | Link not allowed in metadata when putting an object.

Message Acknowledgements

There are multiple types of acknowledgements in JetStream:

  • Message.ack(): Acknowledges a message.
  • Message.ackSync(Duration): Acknowledges a message and waits for a confirmation. When used with deduplication this creates exactly once delivery guarantees (within the deduplication window). This may significantly impact performance of the system.
  • Message.nak(): A negative acknowledgment indicating processing failed and the message should be resent later.
  • Message.term(): Never send this message again, regardless of configuration.
  • Message.inProgress(): Indicates that the message is still being processed and resets the redelivery timer in the server. The message must be acknowledged later when processing is complete.

Note that an exactly once delivery guarantee can be achieved by using a consumer with explicit ack mode attached to a stream set up with a deduplication window and using ackSync to acknowledge messages. The guarantee is only valid for the duration of the deduplication window.

Advanced Usage

TLS

NATS supports TLS 1.2. The server can be configured to verify client certificates or not. Depending on this setting the client has several options.

  1. The Java library allows the use of the tls:// protocol in its urls. This setting expects a default SSLContext to be set. You can set this default context using System properties, or in code. For example, you could run the publish example using:

    java -Djavax.net.ssl.keyStore=src/test/resources/keystore.jks -Djavax.net.ssl.keyStorePassword=password -Djavax.net.ssl.trustStore=src/test/resources/truststore.jks -Djavax.net.ssl.trustStorePassword=password io.nats.examples.NatsPub tls://localhost:4443 test "hello world"

    where the following properties are being set:

    -Djavax.net.ssl.keyStore=src/test/resources/keystore.jks
    -Djavax.net.ssl.keyStorePassword=password
    -Djavax.net.ssl.trustStore=src/test/resources/truststore.jks
    -Djavax.net.ssl.trustStorePassword=password

    This method can be used with or without client verification.

  2. During development, or behind a firewall where the client can trust the server, the library supports the opentls:// protocol which will use a special SSLContext that trusts all server certificates, but provides no client certificates.

    java io.nats.examples.NatsSub opentls://localhost:4443 test 3

    This method requires that client verification is off.

  3. Your code can build an SSLContext to work with or without client verification.

    SSLContext ctx = createContext();
    Options options = new Options.Builder().server(ts.getURI()).sslContext(ctx).build();
    Connection nc = Nats.connect(options);
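The createContext() call in option 3 is left to the application. A minimal sketch using only the standard JSSE classes might look like the following, assuming a keystore and truststore like the ones used in option 1. The class TlsContextSketch and its method signatures are hypothetical; the library's own helpers in SSLUtils may differ.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

// Hypothetical helper, not part of the NATS client.
final class TlsContextSketch {
    private TlsContextSketch() {}

    /** Loads a keystore or truststore file using the JVM's default store type. */
    static KeyStore loadStore(String path, char[] password) throws Exception {
        KeyStore store = KeyStore.getInstance(KeyStore.getDefaultType());
        try (InputStream in = Files.newInputStream(Paths.get(path))) {
            store.load(in, password);
        }
        return store;
    }

    /**
     * Builds a TLS 1.2 context. The key managers supply the client
     * certificate (needed when the server verifies clients) and the
     * trust managers decide which server certificates are accepted.
     */
    static SSLContext createContext(KeyStore keyStore, char[] keyPassword,
                                    KeyStore trustStore) throws Exception {
        KeyManagerFactory kmf =
            KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(keyStore, keyPassword);

        TrustManagerFactory tmf =
            TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(trustStore);

        SSLContext ctx = SSLContext.getInstance("TLSv1.2");
        ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
        return ctx;
    }
}
```

The resulting context can be passed to sslContext(ctx) on the options builder as shown above.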

If you want to try out these techniques, take a look at the README.md for instructions.

Also, here are some places in the code that may help:

https://github.com/nats-io/nats.java/blob/main/src/main/java/io/nats/client/support/SSLUtils.java
https://github.com/nats-io/nats.java/blob/main/src/test/java/io/nats/client/TestSSLUtils.java

Clusters & Reconnecting

The Java client will automatically reconnect if it loses its connection to the nats-server. If given a single server, the client will keep trying that one. If given a list of servers, the client will rotate between them. When the nats servers are in a cluster, they will tell the client about the other servers, so that in the simplest case a client could connect to one server, learn about the cluster and reconnect to another server if its initial one goes down.

To tell the connection about multiple servers for the initial connection, use the servers() method on the options builder, or call server() multiple times.

String[] serverUrls = {"nats://serverOne:4222", "nats://serverTwo:4222"};
Options o = new Options.Builder().servers(serverUrls).build();

Reconnection behavior is controlled via a few options, see the javadoc for the Options.Builder class for specifics on reconnect limits, delays and buffers.

Benchmarking

The io.nats.examples package contains two benchmarking tools, modeled after tools in other NATS clients. Both examples run against an existing nats-server. The first, io.nats.examples.benchmark.NatsBench, runs two simple tests: one only publishes messages, the other also receives them. Tests are run with 1 thread/connection per publisher or subscriber. Running on an iMac (2017) with a 4.2 GHz Intel Core i7 and 64GB of memory produced results like:

Starting benchmark(s) [msgs=5000000, msgsize=256, pubs=2, subs=2]
Current memory usage is 966.14 mb / 981.50 mb / 14.22 gb free/total/max
Use ctrl-C to cancel.
Pub Only stats: 9,584,263 msgs/sec ~ 2.29 gb/sec
 [ 1] 4,831,495 msgs/sec ~ 1.15 gb/sec (2500000 msgs)
 [ 2] 4,792,145 msgs/sec ~ 1.14 gb/sec (2500000 msgs)
  min 4,792,145 | avg 4,811,820 | max 4,831,495 | stddev 19,675.00 msgs
Pub/Sub stats: 3,735,744 msgs/sec ~ 912.05 mb/sec
 Pub stats: 1,245,680 msgs/sec ~ 304.12 mb/sec
  [ 1] 624,385 msgs/sec ~ 152.44 mb/sec (2500000 msgs)
  [ 2] 622,840 msgs/sec ~ 152.06 mb/sec (2500000 msgs)
   min 622,840 | avg 623,612 | max 624,385 | stddev 772.50 msgs
 Sub stats: 2,490,461 msgs/sec ~ 608.02 mb/sec
  [ 1] 1,245,230 msgs/sec ~ 304.01 mb/sec (5000000 msgs)
  [ 2] 1,245,231 msgs/sec ~ 304.01 mb/sec (5000000 msgs)
   min 1,245,230 | avg 1,245,230 | max 1,245,231 | stddev .71 msgs
Final memory usage is 2.02 gb / 2.94 gb / 14.22 gb free/total/max
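To read these numbers, the data rate on each line is the message rate multiplied by the payload size. The sketch below reproduces two of the reported rates from the output above. It assumes 1024-based units and payload bytes only, which is inferred from the figures lining up rather than taken from the benchmark source; ThroughputCheck is a hypothetical class name.

```java
import java.util.Locale;

// Hypothetical helper for checking the arithmetic in the benchmark output.
final class ThroughputCheck {
    private ThroughputCheck() {}

    /** Payload bytes per second for a given message rate and message size. */
    static double bytesPerSecond(long msgsPerSecond, int msgSizeBytes) {
        return (double) msgsPerSecond * msgSizeBytes;
    }

    /** Formats a byte rate using 1024-based units (assumed). */
    static String humanRate(double bytesPerSecond) {
        String[] units = {"b", "kb", "mb", "gb"};
        int i = 0;
        double v = bytesPerSecond;
        while (v >= 1024 && i < units.length - 1) {
            v /= 1024;
            i++;
        }
        return String.format(Locale.ROOT, "%.2f %s/sec", v, units[i]);
    }

    public static void main(String[] args) {
        // Pub Only: 9,584,263 msgs/sec at 256 bytes each
        System.out.println(humanRate(bytesPerSecond(9_584_263L, 256))); // 2.29 gb/sec
        // Pub/Sub: 3,735,744 msgs/sec at 256 bytes each
        System.out.println(humanRate(bytesPerSecond(3_735_744L, 256))); // 912.05 mb/sec
    }
}
```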

The second, called io.nats.examples.autobench.NatsAutoBench runs a series of tests with various message sizes. Running this test on the same iMac, resulted in:

PubOnly 0b           10,000,000          8,464,850 msg/s       0.00 b/s
PubOnly 8b           10,000,000         10,065,263 msg/s     76.79 mb/s
PubOnly 32b          10,000,000         12,534,612 msg/s    382.53 mb/s
PubOnly 256b         10,000,000          7,996,057 msg/s      1.91 gb/s
PubOnly 512b         10,000,000          5,942,165 msg/s      2.83 gb/s
PubOnly 1k            1,000,000          4,043,937 msg/s      3.86 gb/s
PubOnly 4k              500,000          1,114,947 msg/s      4.25 gb/s
PubOnly 8k              100,000            460,630 msg/s      3.51 gb/s
PubSub 0b            10,000,000          3,155,673 msg/s       0.00 b/s
PubSub 8b            10,000,000          3,218,427 msg/s     24.55 mb/s
PubSub 32b           10,000,000          2,681,550 msg/s     81.83 mb/s
PubSub 256b          10,000,000          2,020,481 msg/s    493.28 mb/s
PubSub 512b           5,000,000          2,000,918 msg/s    977.01 mb/s
PubSub 1k             1,000,000          1,170,448 msg/s      1.12 gb/s
PubSub 4k               100,000            382,964 msg/s      1.46 gb/s
PubSub 8k               100,000            196,474 msg/s      1.50 gb/s
PubDispatch 0b       10,000,000          4,645,438 msg/s       0.00 b/s
PubDispatch 8b       10,000,000          4,500,006 msg/s     34.33 mb/s
PubDispatch 32b      10,000,000          4,458,481 msg/s    136.06 mb/s
PubDispatch 256b     10,000,000          2,586,563 msg/s    631.49 mb/s
PubDispatch 512b      5,000,000          2,187,592 msg/s      1.04 gb/s
PubDispatch 1k        1,000,000          1,369,985 msg/s      1.31 gb/s
PubDispatch 4k          100,000            403,314 msg/s      1.54 gb/s
PubDispatch 8k          100,000            203,320 msg/s      1.55 gb/s
ReqReply 0b              20,000              9,548 msg/s       0.00 b/s
ReqReply 8b              20,000              9,491 msg/s     74.15 kb/s
ReqReply 32b             10,000              9,778 msg/s    305.59 kb/s
ReqReply 256b            10,000              8,394 msg/s      2.05 mb/s
ReqReply 512b            10,000              8,259 msg/s      4.03 mb/s
ReqReply 1k              10,000              8,193 msg/s      8.00 mb/s
ReqReply 4k              10,000              7,915 msg/s     30.92 mb/s
ReqReply 8k              10,000              7,454 msg/s     58.24 mb/s
Latency 0b    5,000     35 /  49.20 / 134    +/- 0.77  (microseconds)
Latency 8b    5,000     35 /  49.54 / 361    +/- 0.80  (microseconds)
Latency 32b   5,000     35 /  49.27 / 135    +/- 0.79  (microseconds)
Latency 256b  5,000     41 /  56.41 / 142    +/- 0.90  (microseconds)
Latency 512b  5,000     40 /  56.41 / 174    +/- 0.91  (microseconds)
Latency 1k    5,000     35 /  49.76 / 160    +/- 0.80  (microseconds)
Latency 4k    5,000     36 /  50.64 / 193    +/- 0.83  (microseconds)
Latency 8k    5,000     38 /  55.45 / 206    +/- 0.88  (microseconds)

It is worth noting that in both cases memory was not a factor; the processor and OS were more of a consideration. To test this, take a look at the NatsBench results again. Those are run without any constraint on the Java heap and end up doubling the used memory. However, if we run the same test again with a constraint of 1 GB using -Xmx1g, the performance is comparable, differentiated primarily by "noise" that we can see between test runs with the same settings.

Starting benchmark(s) [msgs=5000000, msgsize=256, pubs=2, subs=2]
Current memory usage is 976.38 mb / 981.50 mb / 981.50 mb free/total/max
Use ctrl-C to cancel.

Pub Only stats: 10,123,382 msgs/sec ~ 2.41 gb/sec
 [ 1] 5,068,256 msgs/sec ~ 1.21 gb/sec (2500000 msgs)
 [ 2] 5,061,691 msgs/sec ~ 1.21 gb/sec (2500000 msgs)
  min 5,061,691 | avg 5,064,973 | max 5,068,256 | stddev 3,282.50 msgs

Pub/Sub stats: 3,563,770 msgs/sec ~ 870.06 mb/sec
 Pub stats: 1,188,261 msgs/sec ~ 290.10 mb/sec
  [ 1] 594,701 msgs/sec ~ 145.19 mb/sec (2500000 msgs)
  [ 2] 594,130 msgs/sec ~ 145.05 mb/sec (2500000 msgs)
   min 594,130 | avg 594,415 | max 594,701 | stddev 285.50 msgs
 Sub stats: 2,375,839 msgs/sec ~ 580.04 mb/sec
  [ 1] 1,187,919 msgs/sec ~ 290.02 mb/sec (5000000 msgs)
  [ 2] 1,187,920 msgs/sec ~ 290.02 mb/sec (5000000 msgs)
   min 1,187,919 | avg 1,187,919 | max 1,187,920 | stddev .71 msgs


Final memory usage is 317.62 mb / 960.50 mb / 960.50 mb free/total/max

Building From Source

The build depends on Gradle, and contains gradlew to simplify the process. After cloning, you can build the repository and run the tests with a single command:

> git clone https://github.com/nats-io/nats.java
> cd nats.java
> ./gradlew clean build

Or to build without tests

> ./gradlew clean build -x test

This will place the class files in a new build folder. To just build the jar:

> ./gradlew jar

The jar will be placed in build/libs.

You can also build the java doc, and the samples jar using:

> ./gradlew javadoc
> ./gradlew exampleJar

The java doc is located in build/docs and the example jar is in build/libs. Finally, to run the tests with the coverage report:

> ./gradlew test jacocoTestReport

which will create a folder called build/reports/jacoco containing the file index.html you can open and use to browse the coverage. Keep in mind we have focused on library test coverage, not coverage for the examples.

Many of the tests run nats-server on a custom port. If nats-server is in your path they should just work, but in cases where it is not, or an IDE running tests has issues with the path, you can specify the nats-server location with the environment variable nats_server_path.

TLS Certs

The raw TLS test certs are in src/test/resources/certs and come from the nats.go repository. However, the java client also needs keystore.jks and truststore.jks files for creating a context. These can be created using:

> cd src/test/resources
> keytool -keystore truststore.jks -alias CARoot -import -file certs/ca.pem -storepass password -noprompt -storetype pkcs12
> cat certs/client-key.pem certs/client-cert.pem > combined.pem
> openssl pkcs12 -export -in combined.pem -out cert.p12
> keytool -importkeystore -srckeystore cert.p12 -srcstoretype pkcs12 -deststoretype pkcs12 -destkeystore keystore.jks
> keytool -keystore keystore.jks -alias CARoot -import -file certs/ca.pem -storepass password -noprompt
> rm cert.p12 combined.pem

License

Unless otherwise noted, the NATS source files are distributed under the Apache Version 2.0 license found in the LICENSE file.
