Java idiomatic client for Cloud Pub/Sub Lite.
Note: This client is a work-in-progress, and may occasionally make backwards-incompatible changes.
If you are using Maven, add this to your pom.xml file:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite</artifactId>
<version>0.1.5</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>1.105.0</version>
</dependency>
<!-- A logging dependency used by the underlying library -->
<dependency>
<groupId>com.google.flogger</groupId>
<artifactId>flogger-system-backend</artifactId>
<version>0.5.1</version>
<scope>runtime</scope>
</dependency>
If you are using Gradle, add this to your dependencies:
compile 'com.google.cloud:google-cloud-pubsublite:0.1.6'
If you are using SBT, add this to your dependencies:
libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "0.1.6"
See the Authentication section in the base directory's README.
You will need a Google Cloud Platform Console project with the Cloud Pub/Sub Lite API enabled.
You will need to enable billing to use Google Cloud Pub/Sub Lite.
Follow these instructions to get your project set up. You will also need to set up the local development environment by installing the Google Cloud SDK and running the following commands on the command line:
gcloud auth login
gcloud config set project [YOUR PROJECT ID]
You'll need to obtain the google-cloud-pubsublite library. See the Quickstart section to add google-cloud-pubsublite as a dependency in your code.
Google Pub/Sub Lite is designed to provide reliable, many-to-many, asynchronous messaging between applications. Publisher applications send messages to a topic, and other applications subscribe to that topic to receive them. By decoupling senders and receivers, Pub/Sub Lite lets independently written applications communicate with each other.
Compared to Google Pub/Sub, Pub/Sub Lite provides partitioned zonal data storage with predefined capacity. Both products present a similar API, but Pub/Sub Lite has more usage caveats.
See the Google Pub/Sub Lite docs for more details on how to activate Pub/Sub Lite for your project, as well as guidance on how to choose between Cloud Pub/Sub and Pub/Sub Lite.
With Pub/Sub Lite you can create topics. A topic is a named resource to which messages are sent by publishers. Add the following imports at the top of your file:
import com.google.cloud.pubsublite.*;
import com.google.cloud.pubsublite.proto.Topic;
import com.google.cloud.pubsublite.proto.Topic.*;
import com.google.protobuf.util.Durations;
Then, to create the topic, use the following code:
CloudRegion cloudRegion = CloudRegion.of(CLOUD_REGION);
CloudZone zone = CloudZone.of(cloudRegion, ZONE_ID);
ProjectNumber projectNum = ProjectNumber.of(PROJECT_NUMBER);
TopicName topicName = TopicName.of(TOPIC_NAME);
TopicPath topicPath =
TopicPaths.newBuilder()
.setZone(zone)
.setProjectNumber(projectNum)
.setTopicName(topicName)
.build();
Topic topic =
Topic.newBuilder()
.setPartitionConfig(
PartitionConfig.newBuilder()
// Set publishing throughput to 1 times the standard partition
// throughput of 4 MiB per sec. This must be in the range [1,4]. A
// topic with `scale` of 2 and count of 10 is charged for 20 partitions.
.setScale(1)
.setCount(PARTITIONS))
.setRetentionConfig(
RetentionConfig.newBuilder()
// How long messages are retained.
.setPeriod(Durations.fromDays(1))
// Set storage per partition to 100 GiB. This must be 30 GiB-10 TiB.
// If the number of bytes stored in any of the topic's partitions grows
// beyond this value, older messages will be dropped to make room for
// newer ones, regardless of the value of `period`.
.setPerPartitionBytes(100 * 1024 * 1024 * 1024L))
.setName(topicPath.value())
.build();
AdminClientSettings adminClientSettings =
AdminClientSettings.newBuilder().setRegion(cloudRegion).build();
try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
Topic response = adminClient.createTopic(topic).get();
System.out.println(response.getAllFields() + " created successfully.");
}
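The comments in the topic configuration above imply some simple arithmetic: billed partitions are `scale` times `count`, publish throughput per partition is `scale` times 4 MiB per second, and per-partition storage has to fall between 30 GiB and 10 TiB. The sketch below works through those numbers using only the JDK. The class and method names are hypothetical and not part of the client library, and treating both storage bounds as inclusive is an assumption.

```java
// Hypothetical helper for illustration only; not part of google-cloud-pubsublite.
final class TopicCapacitySketch {
  static final long MIB = 1024L * 1024L;
  static final long GIB = 1024L * MIB;
  static final long TIB = 1024L * GIB;

  // Standard per-partition publish throughput quoted in the comments above.
  static final long BASE_PUBLISH_BYTES_PER_SEC = 4 * MIB;

  // Number of partitions charged for: scale multiplied by partition count.
  static int billedPartitions(int scale, int count) {
    checkScale(scale);
    if (count < 1) {
      throw new IllegalArgumentException("count must be positive");
    }
    return Math.multiplyExact(scale, count);
  }

  // Publish throughput of a single partition at the given scale, in bytes per second.
  static long publishBytesPerSecPerPartition(int scale) {
    checkScale(scale);
    return scale * BASE_PUBLISH_BYTES_PER_SEC;
  }

  // True if the value lies in the 30 GiB to 10 TiB range (bounds assumed inclusive).
  static boolean isValidPerPartitionBytes(long bytes) {
    return bytes >= 30 * GIB && bytes <= 10 * TIB;
  }

  private static void checkScale(int scale) {
    if (scale < 1 || scale > 4) {
      throw new IllegalArgumentException("scale must be in [1, 4]");
    }
  }

  public static void main(String[] args) {
    System.out.println(billedPartitions(2, 10)); // 20
    System.out.println(publishBytesPerSecPerPartition(1) / MIB); // 4
    System.out.println(isValidPerPartitionBytes(100 * GIB)); // true
  }
}
```

Running a check like this before calling `createTopic` is one way to catch an out-of-range value locally; the service remains the authority on what it accepts.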
With Pub/Sub Lite, you can publish messages to a topic. Add the following imports at the top of your file:
import com.google.api.core.*;
import com.google.cloud.pubsublite.*;
import com.google.cloud.pubsublite.cloudpubsub.*;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.*;
Then, to publish messages asynchronously, use the following code:
public class PublisherExample {
private static final int MESSAGE_COUNT = 10;
// In a real application, load the project number from a command-line flag.
private static final long PROJECT_NUMBER = 123L;
// In a real application, load the zone from a command-line flag.
private static final String ZONE = "us-central1-b";
// In a real application, load the topic name from a command-line flag.
private static final String TOPIC_NAME = "my-new-topic";
public static List<ApiFuture<String>> runPublisher(Publisher publisher) throws Exception {
List<ApiFuture<String>> futures = new ArrayList<>();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "message-" + i;
// Convert the message to a byte string.
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Schedule a message to be published. Messages are automatically batched.
ApiFuture<String> future = publisher.publish(pubsubMessage);
futures.add(future);
}
return futures;
}
// Publish messages to a topic.
public static void main(String[] args) throws Exception {
PublisherSettings settings =
PublisherSettings.newBuilder()
.setTopicPath(
TopicPaths.newBuilder()
.setProjectNumber(ProjectNumber.of(PROJECT_NUMBER))
.setZone(CloudZone.parse(ZONE))
.setTopicName(TopicName.of(TOPIC_NAME))
.build())
.build();
Publisher publisher = Publisher.create(settings);
publisher.startAsync().awaitRunning();
List<ApiFuture<String>> futureAckIds = runPublisher(publisher);
publisher.stopAsync().awaitTerminated();
List<String> ackIds = ApiFutures.allAsList(futureAckIds).get();
ArrayList<PublishMetadata> metadata = new ArrayList<>();
for (String id : ackIds) {
metadata.add(PublishMetadata.decode(id));
}
for (PublishMetadata one : metadata) {
System.out.println(one);
}
}
}
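The publisher example queues every message first and only then waits for the results. That pattern can be tried without a Pub/Sub Lite topic by using the JDK's `CompletableFuture` as a stand-in for `ApiFuture`. In the sketch below, `fakePublish` and the ids it returns are invented for illustration and say nothing about the real ack id format.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Stand-in sketch: CompletableFuture replaces ApiFuture and fakePublish replaces
// Publisher.publish. Nothing here talks to Pub/Sub Lite.
final class PublishPatternSketch {

  // Hypothetical publish call that completes asynchronously with a made-up id.
  static CompletableFuture<String> fakePublish(String data) {
    return CompletableFuture.supplyAsync(() -> "id-for-" + data);
  }

  // Queue all messages first, then wait for every result in submission order.
  static List<String> publishAll(int messageCount) {
    List<CompletableFuture<String>> futures = new ArrayList<>();
    for (int i = 0; i < messageCount; i++) {
      futures.add(fakePublish("message-" + i));
    }
    List<String> ids = new ArrayList<>();
    for (CompletableFuture<String> future : futures) {
      ids.add(future.join()); // Blocks until this particular publish completes.
    }
    return ids;
  }

  public static void main(String[] args) {
    publishAll(3).forEach(System.out::println);
  }
}
```

Queuing before waiting matters because blocking on each future inside the loop would serialize the sends and leave nothing to batch.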
With Pub/Sub Lite you can create subscriptions. A subscription represents the stream of messages from a single, specific topic. Add the following imports at the top of your file:
import com.google.cloud.pubsublite.*;
import com.google.cloud.pubsublite.proto.Subscription;
import com.google.cloud.pubsublite.proto.Subscription.*;
import com.google.cloud.pubsublite.proto.Subscription.DeliveryConfig.*;
Then, to create the subscription, use the following code:
CloudRegion cloudRegion = CloudRegion.of(CLOUD_REGION);
CloudZone zone = CloudZone.of(cloudRegion, ZONE_ID);
ProjectNumber projectNum = ProjectNumber.of(PROJECT_NUMBER);
TopicName topicName = TopicName.of(TOPIC_NAME);
SubscriptionName subscriptionName = SubscriptionName.of(SUBSCRIPTION_NAME);
TopicPath topicPath =
TopicPaths.newBuilder()
.setZone(zone)
.setProjectNumber(projectNum)
.setTopicName(topicName)
.build();
SubscriptionPath subscriptionPath =
SubscriptionPaths.newBuilder()
.setZone(zone)
.setProjectNumber(projectNum)
.setSubscriptionName(subscriptionName)
.build();
Subscription subscription =
Subscription.newBuilder()
.setDeliveryConfig(
// The server does not wait for a published message to be successfully
// written to storage before delivering it to subscribers. As such, a
// subscriber may receive a message for which the write to storage failed.
// If the subscriber re-reads the offset of that message later on, there
// may be a gap at that offset.
DeliveryConfig.newBuilder()
.setDeliveryRequirement(DeliveryRequirement.DELIVER_IMMEDIATELY))
.setName(subscriptionPath.value())
.setTopic(topicPath.value())
.build();
AdminClientSettings adminClientSettings =
AdminClientSettings.newBuilder().setRegion(cloudRegion).build();
try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
Subscription response = adminClient.createSubscription(subscription).get();
System.out.println(response.getAllFields() + " created successfully.");
}
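Both the topic and the subscription are identified by a path built from the project number, the zone, and a name. As a rough illustration of what those paths look like as strings, the sketch below assembles them by hand, assuming the `projects/<number>/locations/<zone>/topics/<name>` and `.../subscriptions/<name>` layouts. The class is hypothetical and is not how the library's path builders are implemented.

```java
// Illustrative only: mirrors the assumed shape of the strings returned by
// topicPath.value() and subscriptionPath.value(). Not the library's implementation.
final class ResourcePathSketch {

  static String topicPath(long projectNumber, String zone, String topicName) {
    return "projects/" + projectNumber + "/locations/" + zone + "/topics/" + topicName;
  }

  static String subscriptionPath(long projectNumber, String zone, String subscriptionName) {
    return "projects/" + projectNumber + "/locations/" + zone
        + "/subscriptions/" + subscriptionName;
  }

  // Derives the region ("us-central1") from a zone such as "us-central1-b"
  // by dropping the final "-<letter>" suffix.
  static String regionOf(String zone) {
    int lastDash = zone.lastIndexOf('-');
    if (lastDash <= 0 || lastDash != zone.length() - 2) {
      throw new IllegalArgumentException("Expected <region>-<zone letter>, got: " + zone);
    }
    return zone.substring(0, lastDash);
  }

  public static void main(String[] args) {
    System.out.println(topicPath(123L, "us-central1-b", "my-new-topic"));
    System.out.println(subscriptionPath(123L, "us-central1-b", "my-subscription"));
    System.out.println(regionOf("us-central1-b"));
  }
}
```

In application code, prefer the builders shown above over hand-built strings, since they validate their inputs.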
With Pub/Sub Lite you can receive messages from a subscription. Add the following imports at the top of your file:
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.*;
import com.google.cloud.pubsublite.cloudpubsub.*;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Then, to pull messages asynchronously, use the following code:
CloudRegion cloudRegion = CloudRegion.of(CLOUD_REGION);
CloudZone zone = CloudZone.of(cloudRegion, ZONE_ID);
ProjectNumber projectNum = ProjectNumber.of(PROJECT_NUMBER);
SubscriptionName subscriptionName = SubscriptionName.of(SUBSCRIPTION_NAME);
SubscriptionPath subscriptionPath =
SubscriptionPaths.newBuilder()
.setZone(zone)
.setProjectNumber(projectNum)
.setSubscriptionName(subscriptionName)
.build();
FlowControlSettings flowControlSettings =
FlowControlSettings.builder()
// Set outstanding bytes to 10 MiB per partition.
.setBytesOutstanding(10 * 1024 * 1024L)
.setMessagesOutstanding(Long.MAX_VALUE)
.build();
List<Partition> partitions = new ArrayList<>();
for (Integer num : PARTITION_NOS) {
partitions.add(Partition.of(num));
}
MessageReceiver receiver =
new MessageReceiver() {
@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
System.out.println("Id : " + message.getMessageId());
System.out.println("Data : " + message.getData().toStringUtf8());
consumer.ack();
}
};
SubscriberSettings subscriberSettings =
SubscriberSettings.newBuilder()
.setSubscriptionPath(subscriptionPath)
.setPerPartitionFlowControlSettings(flowControlSettings)
.setPartitions(partitions)
.setReceiver(receiver)
.build();
Subscriber subscriber = Subscriber.create(subscriberSettings);
// Start the subscriber. Upon successful starting, its state will become RUNNING.
subscriber.startAsync().awaitRunning();
System.out.println("Listening to messages on " + subscriptionPath.value() + " ...");
try {
System.out.println(subscriber.state());
// Wait 30 seconds for the subscriber to reach TERMINATED state. If it encounters
// unrecoverable errors before then, its state will change to FAILED and
// an IllegalStateException will be thrown.
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException t) {
// Shut down the subscriber and wait for it to reach the TERMINATED state.
subscriber.stopAsync().awaitTerminated();
System.out.println(subscriber.state());
}
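The flow control settings in the subscriber example cap the bytes that may be outstanding, that is, delivered but not yet acknowledged, on each partition. As a conceptual model only, the sketch below treats that cap as a byte budget that delivery draws down and acknowledgement refills. The class is hypothetical and does not describe the client's internals.

```java
// Conceptual model of an outstanding-bytes budget for one partition.
// Hypothetical class; not part of google-cloud-pubsublite.
final class FlowControlSketch {
  private final long maxBytesOutstanding;
  private long bytesOutstanding = 0;

  FlowControlSketch(long maxBytesOutstanding) {
    if (maxBytesOutstanding <= 0) {
      throw new IllegalArgumentException("maxBytesOutstanding must be positive");
    }
    this.maxBytesOutstanding = maxBytesOutstanding;
  }

  // Reserves budget for a message; returns false if it would exceed the cap.
  synchronized boolean tryDeliver(long messageBytes) {
    if (bytesOutstanding + messageBytes > maxBytesOutstanding) {
      return false;
    }
    bytesOutstanding += messageBytes;
    return true;
  }

  // Returns budget once the receiver has acknowledged the message.
  synchronized void ack(long messageBytes) {
    bytesOutstanding = Math.max(0, bytesOutstanding - messageBytes);
  }

  synchronized long bytesOutstanding() {
    return bytesOutstanding;
  }

  public static void main(String[] args) {
    long mib = 1024L * 1024L;
    FlowControlSketch partition = new FlowControlSketch(10 * mib);
    System.out.println(partition.tryDeliver(6 * mib)); // true
    System.out.println(partition.tryDeliver(6 * mib)); // false: 12 MiB would exceed 10 MiB
    partition.ack(6 * mib);
    System.out.println(partition.tryDeliver(6 * mib)); // true again after the ack
  }
}
```

Under this model a receiver that never calls `consumer.ack()` eventually stops receiving messages, which is one reason the example acknowledges each message as soon as it has been handled.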
Samples are in the samples/ directory. The samples' README.md has instructions for running the samples.
Sample | Source Code |
---|---|
Create Subscription Example | source code |
Create Topic Example | source code |
Delete Subscription Example | source code |
Delete Topic Example | source code |
Get Subscription Example | source code |
Get Topic Example | source code |
List Subscriptions In Project Example | source code |
List Subscriptions In Topic Example | source code |
List Topics Example | source code |
Publisher Example | source code |
Subscriber Example | source code |
Update Subscription Example | source code |
Update Topic Example | source code |
To get help, follow the instructions in the shared Troubleshooting document.
Cloud Pub/Sub Lite uses gRPC for the transport layer.
Java 8 or above is required for using this client.
This library follows Semantic Versioning.
It is currently in major version zero (0.y.z), which means that anything may change at any time and the public API should not be considered stable.
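For build tooling that needs to flag such dependencies, the major-version-zero rule can be checked mechanically. The following is a minimal sketch with a hypothetical helper name; it only handles plain MAJOR.MINOR.PATCH strings and ignores pre-release or build suffixes.

```java
// Hypothetical helper, not part of this library.
final class VersionStabilitySketch {

  // Returns true for versions of the form 0.y.z, which Semantic Versioning
  // treats as initial development with no stability guarantees.
  static boolean isInitialDevelopment(String version) {
    String[] parts = version.split("\\.");
    if (parts.length < 3) {
      throw new IllegalArgumentException("Expected MAJOR.MINOR.PATCH, got: " + version);
    }
    return Integer.parseInt(parts[0]) == 0;
  }

  public static void main(String[] args) {
    System.out.println(isInitialDevelopment("0.1.6")); // true
    System.out.println(isInitialDevelopment("1.105.0")); // false
  }
}
```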
Contributions to this library are always welcome and highly encouraged.
See CONTRIBUTING for more information on how to get started.
Please note that this project is released with a Contributor Code of Conduct. By participating in this project you agree to abide by its terms. See Code of Conduct for more information.
Apache 2.0 - See LICENSE for more information.
Java Version | Status |
---|---|
Java 8 | |
Java 8 OSX | |
Java 8 Windows | |
Java 11 |