Not a very new, and, probably completely useless approach to implement the distributed pessimistic locking on top of MongoDB. Allows to use MongoDB as a key-value storage with the ability to read-write using pessimistic locks. Additionally this library contains the simple queue implementation that uses tailing cursor.
Add the following dependency to your pom.xml:
<dependency>
<groupId>ru.qatools</groupId>
<artifactId>pessimistic-mongodb</artifactId>
<version>1.9</version>
</dependency>
To use locking as a named locks (by key):
final PessimisticLockng locking = new MongoPessimisticLocking(
mongoClient, // an instance of MongoClient
"locks-database", // mongo database name
"locks-collection", // collection to use
30 // a maximum interval of polling the locks (ms)
);
locking.tryLock("some-key", 5000); // trying to lock the key "some-key" with 5s timeout
new Thread(() -> locking.tryLock("some-key", 5000) ).start(); // thread will wait for 5s
locking.unlock("some-key"); // after this the thread will obtain the lock forever
To use a pessimistic repository:
final PessimisticRepository<String> repo = new MongoPessimisticRepository<String>( locking, String.class );
String value = repo.tryLockAndGet("key", 3000); // locking the 'key'
repo.putAndUnlock("key", "new value"); // releasing the 'key' and writing 'new value' to the repository
repo.removeAndUnlock("key"); // removing key from the repository
To obtain a new unique lock (implementing java.util.concurrent.Lock):
final Lock lock = new MongoPessimisticLock( locking )
lock.tryLock(5, SECONDS); // lock the unique lock
lock.unlock();
To use the simple queue polling
final TailableQueue<String> queue = new MongoTailableQueue(
mongoClient, // instance of MongoClient
"databaseName", // mongo db name
"collectionName", // collection name to use (must be capped!)
10000 // max messages count within queue
);
new Thread(() -> queue.poll( m -> System.out.println(m))).start(); // launch polling thread
queue.add( "hello" ); // send some messages to the queue (will be displayed on console)