Allow for "heavy load" migration to suffixed collections (#233)
Migration Tool - support `parallel-by-pid` (heavy mode) for larger event journal migrations

* add 'heavy load' migration feature
* update documentation, and add 'heavy-load' unit tests

Work done by @JeanFrancoisGuena
JeanFrancoisGuena authored and scullxbones committed Jun 10, 2019
1 parent 9b722f8 commit 7eb936f
Showing 7 changed files with 491 additions and 149 deletions.
11 changes: 8 additions & 3 deletions common/src/main/resources/reference.conf
@@ -56,10 +56,15 @@ akka {

  ## used with ScalaDriverMigrateToSuffixedCollections tool (see docs)
  suffix-migration {
-   # for these 3 properties, a value of zero means unlimited retries (not recommanded)
-   max-insert-retry = 1
-   max-delete-retry = 1
+   heavy-load = false
+
+   # for these 3 properties, a value of zero means unlimited retries (not recommended)
+   max-insert-retry = 1 // ignored if heavy-load = true
+   max-delete-retry = 1 // ignored if heavy-load = true
    max-empty-metadata-retry = 1
+
+   # if set to zero or negative value, defaults to 1
+   parallelism = 1 // ignored if heavy-load = false
  }

metrics-builder {
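For illustration, here is a minimal sketch of turning the new mode on from application code. The property path is the one documented below; the override values themselves are examples, not recommendations:

```scala
import com.typesafe.config.ConfigFactory

// Sketch: enable heavy-load migration and raise parallelism (example values only).
val myConfig = ConfigFactory.parseString(
  """akka.contrib.persistence.mongodb.mongo.suffix-migration {
    |  heavy-load = true   # retry settings are ignored in this mode
    |  parallelism = 4     # ignored if heavy-load = false
    |}""".stripMargin
).withFallback(ConfigFactory.load())
```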
@@ -114,9 +114,13 @@ class MongoSettings(val config: Config) {
  val SuffixSeparator: String = config.getString("suffix-builder.separator")
  val SuffixDropEmptyCollections: Boolean = config.getBoolean("suffix-drop-empty-collections")

- val SuffixMigrationMaxInsertRetry: Int = config.getInt("suffix-migration.max-insert-retry")
- val SuffixMigrationMaxDeleteRetry: Int = config.getInt("suffix-migration.max-delete-retry")
- val SuffixMigrationMaxEmptyMetadataRetry: Int = config.getInt("suffix-migration.max-empty-metadata-retry")
+ val SuffixMigrationHeavyLoad: Boolean = Option(config.getBoolean("suffix-migration.heavy-load")).getOrElse(false)
+
+ val SuffixMigrationMaxInsertRetry: Int = Option(config.getInt("suffix-migration.max-insert-retry")).filter(_ >= 0).getOrElse(1)
+ val SuffixMigrationMaxDeleteRetry: Int = Option(config.getInt("suffix-migration.max-delete-retry")).filter(_ >= 0).getOrElse(1)
+ val SuffixMigrationMaxEmptyMetadataRetry: Int = Option(config.getInt("suffix-migration.max-empty-metadata-retry")).filter(_ >= 0).getOrElse(1)
+
+ val SuffixMigrationParallelism: Int = Option(config.getInt("suffix-migration.parallelism")).filter(_ > 0).getOrElse(1)

  val MongoMetricsBuilderClass: String = config.getString("metrics-builder.class")
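The commit message describes heavy mode as "parallel-by-pid". As a hedged sketch (not the tool's actual implementation), this is how a setting like `SuffixMigrationParallelism` typically bounds per-persistence-id work with Akka Streams; `migratePid` is a hypothetical stand-in:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

object ParallelByPidSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  val parallelism = 4 // would come from SuffixMigrationParallelism

  // Hypothetical stand-in for migrating all records of one persistence id.
  def migratePid(pid: String): Future[String] = Future(s"migrated $pid")

  // At most `parallelism` persistence ids are migrated concurrently.
  Source(List("pid-1", "pid-2", "pid-3"))
    .mapAsyncUnordered(parallelism)(migratePid)
    .runWith(Sink.foreach[String](println))
    .onComplete(_ => system.terminate())
}
```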
123 changes: 59 additions & 64 deletions docs/akka24.md
@@ -546,14 +546,14 @@ Keep in mind, while designing `getSuffixfromPersistenceId` and `validateMongoChara
##### Batch writing
Writes remain *atomic at the batch level*, as explained [above](#model), but as events are now persisted in a "per collection" manner, it no longer means that *if the plugin is sent 100 events, these are persisted in mongo as a single document*.

- Events are first *grouped* by collection, then batch-persisted, each group of events in its own correspondant suffixed journal. This means our 100 events may be persisted in mongo as *several* documents, decreasing performances but allowing multiple journals.
+ Events are first *grouped* by collection, then batch-persisted, each group of events in its own corresponding suffixed journal. This means our 100 events may be persisted in mongo as *several* documents, decreasing performance but allowing multiple journals.

If enabled (via the `akka.contrib.persistence.mongodb.mongo.realtime-enable-persistence` configuration property) inserts inside capped collections for live queries are performed the usual way, in one step. No grouping here, our 100 events are still persisted as a single document in "akka_persistence_realtime" collection.
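A toy sketch of the grouping step described above; the suffix rule and collection names here are illustrative stand-ins for your own `getSuffixfromPersistenceId`:

```scala
// Sketch: 100 events fan out into one batch per suffixed journal.
case class Event(persistenceId: String, payload: String)

def journalFor(persistenceId: String): String =
  s"akka_persistence_journal_${persistenceId.takeWhile(_ != '-')}" // illustrative suffix rule

def persistBatch(events: Seq[Event]): Unit =
  events.groupBy(e => journalFor(e.persistenceId)).foreach { case (journal, batch) =>
    // one batch write (one or more documents) per suffixed collection
    println(s"writing ${batch.size} events to $journal")
  }
```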

##### Reading
Instead of reading a single journal, we now collect all journals and, for each of them, perform the appropriate Mongo queries.

- Of course, for reading via the "xxxByPersistenceId" methods, we directly point to the correspondant journal collection.
+ Of course, for reading via the "xxxByPersistenceId" methods, we directly point to the corresponding journal collection.
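In other words, the target collection can be derived directly from the persistence id, along these illustrative lines:

```scala
def getSuffixfromPersistenceId(pid: String): String = pid.takeWhile(_ != '-') // user-supplied in practice

// Queries by persistence id go straight to the matching suffixed journal.
def journalForQuery(pid: String): String =
  s"akka_persistence_journal_${getSuffixfromPersistenceId(pid)}"
```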

<a name="suffixmigration"/>

@@ -565,9 +565,7 @@ We provide a **basic** migration tool from **1.x** unique journal and snapshot t
###### How does it work?
The main idea is to parse the unique journal, pick up every record, insert it into the newly created appropriate suffixed journal, and finally remove it from the unique journal. Additionally, we do the same for snapshots, and remove all records from the "akka_persistence_metadata" capped collection. This capped collection will be rebuilt through the usual event sourcing process...

- Of course, this process would be very long, but thanks to *aggregation*, we actually "gather" records by future suffixed collection, then by *persistence Id*, append (i.e. *INSERT*) them **in one step** (meaning all records of each *persistence Id*) to that new suffixed collection, and remove (i.e. *DELETE*) them **in one step**, from unique original collection.
-
- Additionally, we offer the possibility to try these *INSERT* and *DELETE* operations multiple times, as the process runs such operations in parallel and may lead to Mongo timeouts. We also offer the same possibility for removing all records from "akka_persistence_metadata" capped collection (see configuration below)
+ Of course, this process would be very long, but thanks to *aggregation*, we actually "gather" records by future suffixed collection, append them **one by one** to that new suffixed collection, and remove them **in one step** from the unique original collection. Appending records to the new collection *one by one* may appear a poor choice performance-wise, but trying to append a great number of records in a single bulk operation may lead to an `OutOfMemoryError`. Remember, it's a *basic* tool and there is no need to hurry here, as this is actually a maintenance operation. So, once more, let's keep it simple but efficient.
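To illustrate, here is a rough sketch of that flow in plain Scala; the helper names are hypothetical, not the tool's real API:

```scala
// Sketch only: gather by target collection, append one by one, delete in one step.
case class Record(persistenceId: String, document: String)

def suffixedCollectionOf(persistenceId: String): String =
  s"akka_persistence_journal_${persistenceId.takeWhile(_ != '-')}" // stand-in for getSuffixfromPersistenceId

def insertOne(collection: String, record: Record): Unit =
  println(s"INSERT ${record.persistenceId} into $collection") // one insert per record avoids huge bulks

def deleteAllFrom(collection: String, records: Seq[Record]): Unit =
  println(s"DELETE ${records.size} records from $collection") // one bulk delete per group

def migrate(journal: Seq[Record]): Unit =
  journal.groupBy(r => suffixedCollectionOf(r.persistenceId)).foreach { case (target, records) =>
    records.foreach(insertOne(target, _))              // append one by one
    deleteAllFrom("akka_persistence_journal", records) // then remove them from the unique journal in one step
  }
```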

###### Recommended migration steps:
* **backup your database** (use, for example, the `mongodump` command)
@@ -596,88 +594,85 @@ Of course, once this is done, you should **not** start your application, unless
###### Configuration
Add the following to your `build.sbt` file:
```scala
- libraryDependencies ++= Seq( "com.github.scullxbones" %% "akka-persistence-mongo-tools" % "2.2.5",
-   "org.mongodb.scala" %% "mongo-scala-driver" % "2.4.2" )
+ libraryDependencies ++= Seq( "com.github.scullxbones" %% "akka-persistence-mongo-tools" % "1.4.3",
+   "org.mongodb" %% "casbah" % "3.1.0" )
```
- Notice that even if you currently don't use it, migration process is performed through Official Scala driver.

- Notice that if you use Official Scala driver, `"org.mongodb.scala" %% "mongo-scala-driver" % "2.4.2"` dependency should already be part of your `build.sbt` file.
+ Notice that even if you currently don't use it, the migration process is performed through the Casbah driver.

- Additionally, you may configure your logging system with **INFO** level for `ScalaDriverMigrateToSuffixedCollections` class, otherwise there will be no output to console or log files. With *log4J*, this should be done like that:
- ```xml
- <logger name="akka.contrib.persistence.mongodb.ScalaDriverMigrateToSuffixedCollections" level="INFO" />
- ```
+ Notice that if you use the Casbah driver, the `"org.mongodb" %% "casbah" % "3.1.0"` dependency should already be part of your `build.sbt` file.

- Optionally, you can configure how many times *INSERT* and *DELETE* operations may take place, and how many attempts to empty "akka_persistence_metadata" capped collection may occur, through the following properties:
- ```
- akka.contrib.persistence.mongodb.mongo.suffix-migration.max-insert-retry = 1
- akka.contrib.persistence.mongodb.mongo.suffix-migration.max-delete-retry = 1
- akka.contrib.persistence.mongodb.mongo.suffix-migration.max-empty-metadata-retry = 1
- ```
- Careful, the value `0` means **unlimited** retries (not recommanded)
+ Additionally, you may configure your logging system with **INFO** level for the `MigrateToSuffixedCollections` class, otherwise there will be no output to console or log files. With *log4J*, this should be done like that:
+ ```xml
+ <logger name="akka.contrib.persistence.mongodb.MigrateToSuffixedCollections" level="INFO" />
+ ```

###### Code
- Provide an `ActorSystem`, instantiate a `ScalaDriverMigrateToSuffixedCollections` class and call its `migrateToSuffixCollections` method as shown in the very basic following example:
+ Provide an `ActorSystem`, instantiate a `MigrateToSuffixedCollections` class and call its `migrateToSuffixCollections` method as shown in the following example:
```scala
package com.mycompany.myproject.myapplication.main


- import akka.actor.ActorSystem
- val system: ActorSystem = ActorSystem("my system name", myConfig)
-
- import akka.contrib.persistence.mongodb.ScalaDriverMigrateToSuffixedCollections
- try {
-   Await.result(new ScalaDriverMigrateToSuffixedCollections(system).migrateToSuffixCollections, myDuration)
- } catch {
-   case t: Throwable =>
-     println("Error occurred on migration to suffixed collections")
-     t.printStackTrace()
- }
+ object Migrate extends App {
+   import akka.actor.ActorSystem
+   val system: ActorSystem = ActorSystem("my system name", myConfig)
+
+   import akka.contrib.persistence.mongodb.MigrateToSuffixedCollections
+   val migration = new MigrateToSuffixedCollections(system)
+   try {
+     migration.migrateToSuffixCollections()
+   } catch {
+     case t: Throwable =>
+       println("Error occurred on migration to suffixed collections")
+       t.printStackTrace()
+       System.exit(-1)
+   }
+ }

```
Providing an `ActorSystem` depends on how your application is designed and is beyond the scope of this documentation.

- Running this process, we should see something like this (remember to configure INFO level for `ScalaDriverMigrateToSuffixedCollections` class)
+ As the process **must** be performed offline, it's a good idea to use an object (we call it "Migrate" in our example) extending the `scala.App` trait and run it through the `sbt run` command, which lets us choose which main class to run:

+ ```
+ Multiple main classes detected, select one to run:
+ [1] com.mycompany.myproject.myapplication.main.Main
+ [2] com.mycompany.myproject.myapplication.main.Migrate
+ Enter number:
+ ```
+ If we choose number 2 here, we should see something like this (remember to configure INFO level for the `MigrateToSuffixedCollections` class):
```
- 2019-04-24_15:43:31.823 INFO - Starting automatic migration to collections with suffixed names
+ 2016-09-23_15:43:31.823 INFO - Starting automatic migration to collections with suffixed names
This may take a while...
- 2019-04-24_15:43:36.517 INFO - 1/1 records were handled for suffixed collection 'akka_persistence_journal_foo1'
- 2019-04-24_15:43:36.519 INFO - 1/1 records were successfully transferred to 'akka_persistence_journal_foo1'
- 2019-04-24_15:43:36.536 INFO - 1/1 records, previously copied to 'akka_persistence_journal_foo1', were successfully removed from 'akka_persistence_journal'
- 2019-04-24_15:43:36.647 INFO - 24/24 records were handled for suffixed collection 'akka_persistence_journal_foo2'
- 2019-04-24_15:43:36.649 INFO - 24/24 records were successfully transferred to 'akka_persistence_journal_foo2'
- 2019-04-24_15:43:36.652 INFO - 24/24 records, previously copied to 'akka_persistence_journal_foo2', were successfully removed from 'akka_persistence_journal'
- 2019-04-24_15:44:58.088 INFO - 74013/74013 records were handled for suffixed collection 'akka_persistence_journal_foo3'
- 2019-04-24_15:44:58.090 INFO - 74013/74013 records were successfully transferred to 'akka_persistence_journal_foo3'
- 2019-04-24_15:45:07.559 INFO - 74013/74013 records, previously copied to 'akka_persistence_journal_foo3', were successfully removed from 'akka_persistence_journal'
- 2019-04-24_15:45:20.421 INFO - 54845/54845 records were handled for suffixed collection 'akka_persistence_journal_foo4'
- 2019-04-24_15:45:20.423 INFO - 54845/54845 records were successfully transferred to 'akka_persistence_journal_foo4'
- 2019-04-24_15:45:25.494 INFO - 54845/54845 records, previously copied to 'akka_persistence_journal_foo4', were successfully removed from 'akka_persistence_journal'
- 2019-04-24_15:45:25.500 INFO - JOURNALS: 128959/128959 records were handled
- 2019-04-24_15:45:25.502 INFO - JOURNALS: 128883/128959 records were successfully transferred to suffixed collections
- 2019-04-24_15:45:25.502 INFO - JOURNALS: 128883/128959 records were successfully removed from 'akka_persistence_journal'collection
- 2019-04-24_15:45:25.502 INFO - JOURNALS: 76/128959 records were ignored and remain in 'akka_persistence_journal'
- 2019-04-24_15:45:25.783 INFO - 2/2 records were handled for suffixed collection 'akka_persistence_snaps_foo4'
- 2019-04-24_15:45:25.785 INFO - 2/2 records were successfully transferred to 'akka_persistence_snaps_foo4'
- 2019-04-24_15:45:25.788 INFO - 2/2 records, previously copied to 'akka_persistence_snaps_foo4', were successfully removed from 'akka_persistence_snaps'
- 2019-04-24_15:45:25.912 INFO - 101/101 records were handled for suffixed collection 'akka_persistence_snaps_foo3'
- 2019-04-24_15:45:25.915 INFO - 101/101 records were successfully transferred to 'akka_persistence_snaps_foo3'
- 2019-04-24_15:45:25.931 INFO - 101/101 records, previously copied to 'akka_persistence_snaps_foo3', were successfully removed from 'akka_persistence_snaps'
- 2019-04-24_15:45:25.932 INFO - SNAPSHOTS: 103/103 records were handled
- 2019-04-24_15:45:25.932 INFO - SNAPSHOTS: 103/103 records were successfully transferred to suffixed collections
- 2019-04-24_15:45:25.933 INFO - SNAPSHOTS: 103/103 records were successfully removed from 'akka_persistence_snaps'collection
- 2019-04-24_15:45:25.936 INFO - METADATA: all 106 records were successfully removed from 'akka_persistence_metadata' collection
- 2019-04-24_15:45:25.974 INFO - Automatic migration to collections with suffixed names has completed
- ```
- Notice that records **may** remain in unique collections "akka_persistence_journal" and "akka_persistence_snapshot" in case your `getSuffixfromPersistenceId` and `validateMongoCharacters` methods sometimes return an empty string. In that case, an information regarding these records is printed in the console above, and a warning is also printed if *inserted* records does not equal *removed* records.
+ 2016-09-23_15:43:36.519 INFO - 1/1 records were inserted into 'akka_persistence_journal_foo1'
+ 2016-09-23_15:43:36.536 INFO - 1/1 records, previously copied to 'akka_persistence_journal_foo1', were removed from 'akka_persistence_journal'
+ 2016-09-23_15:43:36.649 INFO - 24/24 records were inserted into 'akka_persistence_journal_foo2'
+ 2016-09-23_15:43:36.652 INFO - 24/24 records, previously copied to 'akka_persistence_journal_foo2', were removed from 'akka_persistence_journal'
+ 2016-09-23_15:44:58.090 INFO - 74013/74013 records were inserted into 'akka_persistence_journal_foo3'
+ 2016-09-23_15:45:07.559 INFO - 74013/74013 records, previously copied to 'akka_persistence_journal_foo3', were removed from 'akka_persistence_journal'
+ 2016-09-23_15:45:20.423 INFO - 54845/54845 records were inserted into 'akka_persistence_journal_foo4'
+ 2016-09-23_15:45:25.494 INFO - 54845/54845 records, previously copied to 'akka_persistence_journal_foo4', were removed from 'akka_persistence_journal'
+ 2016-09-23_15:45:25.502 INFO - 76 records were ignored and remain in 'akka_persistence_journal'
+ 2016-09-23_15:45:25.502 INFO - JOURNALS: 128883/128959 records were successfully transfered to suffixed collections
+ 2016-09-23_15:45:25.502 INFO - JOURNALS: 76/128959 records were ignored and remain in 'akka_persistence_journal'
+ 2016-09-23_15:45:25.502 INFO - JOURNALS: 128883 + 76 = 128959, all records were successfully handled
+ 2016-09-23_15:45:25.785 INFO - 2/2 records were inserted into 'akka_persistence_snaps_foo4'
+ 2016-09-23_15:45:25.788 INFO - 2/2 records, previously copied to 'akka_persistence_snaps_foo4', were removed from 'akka_persistence_snaps'
+ 2016-09-23_15:45:25.915 INFO - 101/101 records were inserted into 'akka_persistence_snaps_foo3'
+ 2016-09-23_15:45:25.931 INFO - 101/101 records, previously copied to 'akka_persistence_snaps_foo3', were removed from 'akka_persistence_snaps'
+ 2016-09-23_15:45:25.932 INFO - SNAPSHOTS: 103/103 records were successfully transfered to suffixed collections
+ 2016-09-23_15:45:25.936 INFO - METADATA: 106/106 records were successfully removed from akka_persistence_metadata collection
+ 2016-09-23_15:45:25.974 INFO - Automatic migration to collections with suffixed names has completed
+ ```
+ Notice that records **may** remain in unique collections "akka_persistence_journal" and "akka_persistence_snapshot" in case your `getSuffixfromPersistenceId` and `validateMongoCharacters` methods sometimes return an empty string. In that case, information regarding these records is printed in the console above, and a warning is also printed if *migrated* + *ignored* records do not equal *total* records.

Notice that unique collections "akka_persistence_journal" and "akka_persistence_snapshot" remain in the database, even if empty. You may remove them if you wish, using the mongo shell...
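If you do choose to clean them up, dropping the leftover empty collections from the mongo shell is straightforward. The database name below is a placeholder, and the collection names are the defaults seen in this document; adjust them if yours differ:

```
> use my_database
> db.akka_persistence_journal.drop()
> db.akka_persistence_snaps.drop()
```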

###### What's next?
**Keep *suffixed collection names* feature enabled** as explained in [*suffixed collection names* usage](#suffixusage), and of course, **do not modify** your `getSuffixfromPersistenceId` and `validateMongoCharacters` methods.

Keep your database safe and **avoid running the migration process again**, so:
* remove migration code
* remove migration code (in our example, we remove our `Migrate` object)
* remove `akka-persistence-mongo-tools` dependency from your `build.sbt` file

That's it, you should **start your application** and enjoy the *suffixed collection names* feature.