Skip to content

Commit

Permalink
Merge pull request #192 from Fabszn/externalizedConfBuffer
Browse files Browse the repository at this point in the history
Makes buffer size can be setting up from properties
  • Loading branch information
scullxbones authored May 13, 2018
2 parents 4f933cf + 401bc76 commit 1add327
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 8 deletions.
7 changes: 6 additions & 1 deletion common/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@
# ...
#


akka {
contrib {
persistence {
stream-buffer-max-size {
event-by-pid = 1000
all-events = 1000
events-by-tag = 1000
pid = 1000
}
mongodb {
mongo {
// legacy approach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ object MongoReadJournal {

class MongoReadJournal(system: ExtendedActorSystem, config: Config) extends ReadJournalProvider {


private[this] val impl = MongoPersistenceExtension(system)(config).readJournal
private[this] implicit val materializer = ActorMaterializer()(system)

override def scaladslReadJournal(): scaladsl.ReadJournal = new ScalaDslMongoReadJournal(impl)
override def scaladslReadJournal(): scaladsl.ReadJournal = new ScalaDslMongoReadJournal(impl,system.settings.config)

override def javadslReadJournal(): javadsl.ReadJournal = new JavaDslMongoReadJournal(new ScalaDslMongoReadJournal(impl))
override def javadslReadJournal(): javadsl.ReadJournal = new JavaDslMongoReadJournal(new ScalaDslMongoReadJournal(impl, system.settings.config))
}

object ScalaDslMongoReadJournal {
Expand All @@ -49,7 +50,7 @@ object ScalaDslMongoReadJournal {
}
}

class ScalaDslMongoReadJournal(impl: MongoPersistenceReadJournallingApi)(implicit m: Materializer)
class ScalaDslMongoReadJournal(impl: MongoPersistenceReadJournallingApi, config: Config)(implicit m: Materializer)
extends scaladsl.ReadJournal
with CurrentPersistenceIdsQuery
with CurrentEventsByPersistenceIdQuery
Expand All @@ -60,6 +61,10 @@ class ScalaDslMongoReadJournal(impl: MongoPersistenceReadJournallingApi)(implici

import ScalaDslMongoReadJournal._

val streamBufferSizeMaxConfig = config.getConfig("akka.contrib.persistence.stream-buffer-max-size")



def currentAllEvents(): Source[EventEnvelope, NotUsed] = impl.currentAllEvents.toEventEnvelopes

override def currentPersistenceIds(): Source[String, NotUsed] = impl.currentPersistenceIds
Expand All @@ -78,7 +83,7 @@ class ScalaDslMongoReadJournal(impl: MongoPersistenceReadJournallingApi)(implici
def allEvents(): Source[EventEnvelope, NotUsed] = {
val pastSource = impl.currentAllEvents
val realtimeSource =
Source.actorRef[(Event, Offset)](100, OverflowStrategy.dropTail)
Source.actorRef[(Event, Offset)](streamBufferSizeMaxConfig.getInt("all-events"), OverflowStrategy.dropTail)
.mapMaterializedValue(impl.subscribeJournalEvents)
.map{ case(e,_) => e }
(pastSource ++ realtimeSource).via(new RemoveDuplicatedEventsByPersistenceId).toEventEnvelopes
Expand All @@ -91,7 +96,7 @@ class ScalaDslMongoReadJournal(impl: MongoPersistenceReadJournallingApi)(implici
.withAttributes(Attributes.logLevels(Logging.InfoLevel, Logging.InfoLevel))

val realtimeSource =
Source.actorRef[(Event,Offset)](100, OverflowStrategy.dropTail)
Source.actorRef[(Event,Offset)](streamBufferSizeMaxConfig.getInt("event-by-pid"), OverflowStrategy.dropTail)
.mapMaterializedValue{ar => impl.subscribeJournalEvents(ar); NotUsed}
.map{ case(e,_) => e }
.filter(_.pid == persistenceId)
Expand All @@ -116,7 +121,7 @@ class ScalaDslMongoReadJournal(impl: MongoPersistenceReadJournallingApi)(implici
override def persistenceIds(): Source[String, NotUsed] = {

val pastSource = impl.currentPersistenceIds
val realtimeSource = Source.actorRef[(Event, Offset)](100, OverflowStrategy.dropHead)
val realtimeSource = Source.actorRef[(Event, Offset)](streamBufferSizeMaxConfig.getInt("pid"), OverflowStrategy.dropHead)
.map{case (e,_) => e.pid}
.mapMaterializedValue{actor => impl.subscribeJournalEvents(actor); NotUsed}
(pastSource ++ realtimeSource).via(new RemoveDuplicates)
Expand All @@ -130,7 +135,7 @@ class ScalaDslMongoReadJournal(impl: MongoPersistenceReadJournallingApi)(implici
impl.currentEventsByTag(tag, offset)
.toEventEnvelopes
val realtimeSource =
Source.actorRef[(Event, Offset)](100, OverflowStrategy.dropTail)
Source.actorRef[(Event, Offset)](streamBufferSizeMaxConfig.getInt("events-by-tag"), OverflowStrategy.dropTail)
.mapMaterializedValue[NotUsed]{ar => impl.subscribeJournalEvents(ar); NotUsed}
.filter{ case (ev, off) =>
ev.tags.contains(tag) && ordering.gt(off, offset)
Expand Down
12 changes: 12 additions & 0 deletions docs/akka25.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ akka.persistence.snapshot-store.plugin = "akka-contrib-mongodb-persistence-snaps
* [Collection and Index](#mongocollection)
* [Write Concerns](#writeconcern)
* [ReactiveMongo Failover](#rxmfailover)
* [Stream buffer size](#buffer-size)
* [Dispatcher](#dispatcher)
* [Pass-Through BSON](#passthru)
* [Legacy Serialization](#legacyser)
Expand Down Expand Up @@ -157,6 +158,17 @@ akka.contrib.persistence.mongodb.rxmongo.failover {

See [Reactive Mongo documentation](http://reactivemongo.org/releases/0.12/documentation/advanced-topics/failoverstrategy.html) for more information.

<a name="buffer-size"/>
##### Configuring size of buffer for read stream

Akka persitence mongo uses streams to feed the read side with events. By default, the buffer's size is fixed at `1000
To modify the default value for a specific use case, you can add the following configuration lines in your `application.conf`.

akka.contrib.persistence.stream-buffer-max-size.stream-buffer-max-size.event-by-pid = [your value]
akka.contrib.persistence.stream-buffer-max-size.stream-buffer-max-size.all-events = [your value]
akka.contrib.persistence.stream-buffer-max-size.stream-buffer-max-size.events-by-tag = [your value]
akka.contrib.persistence.stream-buffer-max-size.stream-buffer-max-sizepid = [your value]

<a name="dispatcher"/>
##### Configuring the dispatcher used

Expand Down

0 comments on commit 1add327

Please sign in to comment.