Skip to content

Commit

Permalink
Initial version of durable state plugin docs (#574)
Browse files Browse the repository at this point in the history
* Initial version of durable state plugin docs

* Changed Optional#isEmpty since it's not there in JDK 8
  • Loading branch information
debasishg authored Jul 29, 2021
1 parent 60b649b commit f3942ec
Show file tree
Hide file tree
Showing 5 changed files with 409 additions and 49 deletions.
95 changes: 48 additions & 47 deletions core/src/test/java/akka/persistence/jdbc/JavadslSnippets.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,68 +36,69 @@
import java.util.concurrent.CompletionStage;

final class JavadslSnippets {
void create() {
// #create
void create() {
// #create

ActorSystem actorSystem = ActorSystem.create("example");
CompletionStage<Done> done = SchemaUtils.createIfNotExists(actorSystem);
// #create
}
ActorSystem actorSystem = ActorSystem.create("example");
CompletionStage<Done> done = SchemaUtils.createIfNotExists(actorSystem);
// #create
}

void readJournal() {
ActorSystem system = ActorSystem.create("example");
// #read-journal
void readJournal() {
ActorSystem system = ActorSystem.create("example");
// #read-journal

final JdbcReadJournal readJournal = PersistenceQuery.get(system)
.getReadJournalFor(JdbcReadJournal.class, JdbcReadJournal.Identifier());
// #read-journal
final JdbcReadJournal readJournal =
PersistenceQuery.get(system)
.getReadJournalFor(JdbcReadJournal.class, JdbcReadJournal.Identifier());
// #read-journal

}
}

void persistenceIds() {
ActorSystem system = ActorSystem.create();
// #persistence-ids
void persistenceIds() {
ActorSystem system = ActorSystem.create();
// #persistence-ids

JdbcReadJournal readJournal = PersistenceQuery.get(system)
.getReadJournalFor(JdbcReadJournal.class, JdbcReadJournal.Identifier());
JdbcReadJournal readJournal =
PersistenceQuery.get(system)
.getReadJournalFor(JdbcReadJournal.class, JdbcReadJournal.Identifier());

Source<String, NotUsed> willNotCompleteTheStream = readJournal.persistenceIds();
Source<String, NotUsed> willNotCompleteTheStream = readJournal.persistenceIds();

Source<String, NotUsed> willCompleteTheStream = readJournal.currentPersistenceIds();
// #persistence-ids
}
Source<String, NotUsed> willCompleteTheStream = readJournal.currentPersistenceIds();
// #persistence-ids
}

void eventsByPersistenceIds() {
ActorSystem system = ActorSystem.create();
void eventsByPersistenceIds() {
ActorSystem system = ActorSystem.create();

// #events-by-persistence-id
// #events-by-persistence-id

JdbcReadJournal readJournal = PersistenceQuery.get(system)
.getReadJournalFor(JdbcReadJournal.class, JdbcReadJournal.Identifier());
JdbcReadJournal readJournal =
PersistenceQuery.get(system)
.getReadJournalFor(JdbcReadJournal.class, JdbcReadJournal.Identifier());

Source<EventEnvelope, NotUsed> willNotCompleteTheStream = readJournal
.eventsByPersistenceId("some-persistence-id", 0L, Long.MAX_VALUE);
Source<EventEnvelope, NotUsed> willNotCompleteTheStream =
readJournal.eventsByPersistenceId("some-persistence-id", 0L, Long.MAX_VALUE);

Source<EventEnvelope, NotUsed> willCompleteTheStream = readJournal
.currentEventsByPersistenceId("some-persistence-id", 0L, Long.MAX_VALUE);
// #events-by-persistence-id
}
Source<EventEnvelope, NotUsed> willCompleteTheStream =
readJournal.currentEventsByPersistenceId("some-persistence-id", 0L, Long.MAX_VALUE);
// #events-by-persistence-id
}

void eventsByTag() {
ActorSystem system = ActorSystem.create();
// #events-by-tag

JdbcReadJournal readJournal = PersistenceQuery.get(system)
.getReadJournalFor(JdbcReadJournal.class, JdbcReadJournal.Identifier());

Source<EventEnvelope, NotUsed> willNotCompleteTheStream = readJournal
.eventsByTag("apple", Offset.sequence(0L));

Source<EventEnvelope, NotUsed> willCompleteTheStream = readJournal
.currentEventsByTag("apple", Offset.sequence(0L));
// #events-by-tag
}
void eventsByTag() {
ActorSystem system = ActorSystem.create();
// #events-by-tag

JdbcReadJournal readJournal =
PersistenceQuery.get(system)
.getReadJournalFor(JdbcReadJournal.class, JdbcReadJournal.Identifier());

Source<EventEnvelope, NotUsed> willNotCompleteTheStream =
readJournal.eventsByTag("apple", Offset.sequence(0L));

Source<EventEnvelope, NotUsed> willCompleteTheStream =
readJournal.currentEventsByTag("apple", Offset.sequence(0L));
// #events-by-tag
}
}
159 changes: 159 additions & 0 deletions core/src/test/java/akka/persistence/jdbc/state/JavadslSnippets.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package akka.persistence.jdbc.state;

import java.util.concurrent.CompletionStage;
import akka.actor.ActorSystem;
import akka.Done;
import akka.NotUsed;
// #create
import akka.persistence.jdbc.testkit.javadsl.SchemaUtils;
// #create
// #jdbc-durable-state-store
import akka.persistence.state.DurableStateStoreRegistry;
import akka.persistence.jdbc.state.javadsl.JdbcDurableStateStore;
// #jdbc-durable-state-store
// #get-object
import akka.persistence.state.DurableStateStoreRegistry;
import akka.persistence.jdbc.state.javadsl.JdbcDurableStateStore;
import akka.persistence.state.javadsl.GetObjectResult;
// #get-object
// #upsert-get-object
import akka.persistence.state.DurableStateStoreRegistry;
import akka.persistence.jdbc.state.javadsl.JdbcDurableStateStore;
import akka.persistence.state.javadsl.GetObjectResult;
// #upsert-get-object
// #delete-object
import akka.persistence.state.DurableStateStoreRegistry;
import akka.persistence.jdbc.state.javadsl.JdbcDurableStateStore;
// #delete-object
// #current-changes
import akka.NotUsed;
import akka.stream.javadsl.Source;
import akka.persistence.state.DurableStateStoreRegistry;
import akka.persistence.jdbc.state.javadsl.JdbcDurableStateStore;
import akka.persistence.query.DurableStateChange;
import akka.persistence.query.NoOffset;
// #current-changes
// #changes
import akka.NotUsed;
import akka.stream.javadsl.Source;
import akka.persistence.state.DurableStateStoreRegistry;
import akka.persistence.jdbc.state.javadsl.JdbcDurableStateStore;
import akka.persistence.query.DurableStateChange;
import akka.persistence.query.NoOffset;
// #changes

final class JavadslSnippets {
void create() {
// #create

ActorSystem system = ActorSystem.create("example");
CompletionStage<Done> done = SchemaUtils.createIfNotExists(system);
// #create
}

void durableStatePlugin() {
ActorSystem system = ActorSystem.create("example");

// #jdbc-durable-state-store

@SuppressWarnings("unchecked")
JdbcDurableStateStore<String> store =
DurableStateStoreRegistry.get(system)
.getDurableStateStoreFor(JdbcDurableStateStore.class, "akka.persistence.state.jdbc");
// #jdbc-durable-state-store
}

void getObject() {
ActorSystem system = ActorSystem.create("example");

// #get-object

@SuppressWarnings("unchecked")
JdbcDurableStateStore<String> store =
DurableStateStoreRegistry.get(system)
.getDurableStateStoreFor(JdbcDurableStateStore.class, "akka.persistence.state.jdbc");

CompletionStage<GetObjectResult<String>> futureResult = store.getObject("InvalidPersistenceId");
try {
GetObjectResult<String> result = futureResult.toCompletableFuture().get();
assert !result.value().isPresent();
} catch (Exception e) {
// handle exceptions
}
// #get-object
}

void upsertAndGetObject() {
ActorSystem system = ActorSystem.create("example");

// #upsert-get-object

@SuppressWarnings("unchecked")
JdbcDurableStateStore<String> store =
DurableStateStoreRegistry.get(system)
.getDurableStateStoreFor(JdbcDurableStateStore.class, "akka.persistence.state.jdbc");

CompletionStage<GetObjectResult<String>> r =
store
.upsertObject("p234", 1, "a valid string", "t123")
.thenCompose(d -> store.getObject("p234"))
.thenCompose(o -> store.upsertObject("p234", 2, "updated valid string", "t123"))
.thenCompose(d -> store.getObject("p234"));

try {
assert r.toCompletableFuture().get().value().get().equals("updated valid string");
} catch (Exception e) {
// handle exceptions
}
// #upsert-get-object
}

void deleteObject() {
ActorSystem system = ActorSystem.create("example");

// #delete-object

@SuppressWarnings("unchecked")
JdbcDurableStateStore<String> store =
DurableStateStoreRegistry.get(system)
.getDurableStateStoreFor(JdbcDurableStateStore.class, "akka.persistence.state.jdbc");

CompletionStage<Done> futureResult = store.deleteObject("p123");
try {
assert futureResult.toCompletableFuture().get().equals(Done.getInstance());
} catch (Exception e) {
// handle exceptions
}
// #delete-object
}

void currentChanges() {
ActorSystem system = ActorSystem.create("example");

// #current-changes

@SuppressWarnings("unchecked")
JdbcDurableStateStore<String> store =
DurableStateStoreRegistry.get(system)
.getDurableStateStoreFor(JdbcDurableStateStore.class, "akka.persistence.state.jdbc");

Source<DurableStateChange<String>, NotUsed> willCompleteTheStream =
store.currentChanges("tag-1", NoOffset.getInstance());
// #current-changes
}

void changes() {
ActorSystem system = ActorSystem.create("example");

// #changes

@SuppressWarnings("unchecked")
JdbcDurableStateStore<String> store =
DurableStateStoreRegistry.get(system)
.getDurableStateStoreFor(JdbcDurableStateStore.class, "akka.persistence.state.jdbc");

Source<DurableStateChange<String>, NotUsed> willNotCompleteTheStream =
store.changes("tag-1", NoOffset.getInstance());
// #changes
}
}
Loading

0 comments on commit f3942ec

Please sign in to comment.