diff --git a/driver/src/main/scala/com/outr/arango/queue/OperationQueueSupport.scala b/driver/src/main/scala/com/outr/arango/queue/OperationQueueSupport.scala index e86725b5..7fa79ab8 100644 --- a/driver/src/main/scala/com/outr/arango/queue/OperationQueueSupport.scala +++ b/driver/src/main/scala/com/outr/arango/queue/OperationQueueSupport.scala @@ -15,7 +15,8 @@ import scala.language.implicitConversions * database.collection.op.upsert(docs: _*): IO[Unit] * * This will queue up to `opFlushSize` and then stream the batch in `opChunkSize` into the database. Very useful when - * you are needing to do various operations across potentially multiple collections efficiently. + * you are needing to do various operations across potentially multiple collections efficiently. Make sure to call + * `flushQueue()` when finished so that no queued operations are left un-pushed. */ trait OperationQueueSupport { protected def opFlushSize: Int = 10_000 @@ -28,7 +29,13 @@ trait OperationQueueSupport { q.asInstanceOf[OperationsQueue[D, M]] } - def flushQueue(): IO[Unit] = ops.values.map(_.flush()).toList.sequence.void + /** + * Fully flushes all pending operation queues. + */ + def flushQueue(): IO[Unit] = ops.values.map(_.op.flush()).toList.sequence.void - def clear(): IO[Unit] = IO(ops.clear()) + /** + * Clears the operation queues and removes all pending operations. 
+ */ + def clearQueue(): IO[Unit] = IO(ops.clear()) } \ No newline at end of file diff --git a/driver/src/main/scala/com/outr/arango/queue/OperationsQueue.scala b/driver/src/main/scala/com/outr/arango/queue/OperationsQueue.scala index c29b5a05..a21a9258 100644 --- a/driver/src/main/scala/com/outr/arango/queue/OperationsQueue.scala +++ b/driver/src/main/scala/com/outr/arango/queue/OperationsQueue.scala @@ -10,17 +10,25 @@ import java.util.concurrent.atomic.AtomicInteger case class OperationsQueue[D <: Document[D], M <: DocumentModel[D]](collection: DocumentCollection[D, M], flushSize: Int, - chunkSize: Int) { - oq => + chunkSize: Int) { oq => private var queues = List.empty[OpQueue] + /** + * Provide queue operations on a collection. Call `flush()` at the end to make sure all batched data is pushed. + */ object op { lazy val insert: OpQueue = OpQueue(stream => collection.stream.insert(stream, chunkSize).void) lazy val upsert: OpQueue = OpQueue(stream => collection.stream.upsert(stream, chunkSize).void) lazy val delete: OpQueue = OpQueue(stream => collection.stream.delete(stream.map(_._id), chunkSize).void) - } - def flush(): IO[Unit] = queues.map(_.flush()).sequence.void + /** + * Flushes the queue + * + * @param fullFlush if true, all operations are applied. If false, flushing only occurs until the operation count + * is below the flushSize threshold. + */ + def flush(fullFlush: Boolean = true): IO[Unit] = queues.map(_.flush(fullFlush)).sequence.void + } case class OpQueue(process: fs2.Stream[IO, D] => IO[Unit]) { oq.synchronized { @@ -46,18 +54,28 @@ case class OperationsQueue[D <: Document[D], M <: DocumentModel[D]](collection: } } + /** + * Queue operations for the supplied docs. If this causes the flushSize to overflow, a flush will occur before this + * returns. Otherwise, this is a very fast operation. 
+ */ def apply(docs: D*): IO[Unit] = IO { docs.foreach(queue.add) counter.addAndGet(docs.length) }.flatMap { size => if (size >= flushSize) { - flush() + flush(fullFlush = false) } else { IO.unit } } - def flush(fullFlush: Boolean = false): IO[Unit] = IO(take(chunkSize)).flatMap { list => + /** + * Flushes the queue + * + * @param fullFlush if true, all operations are applied. If false, flushing only occurs until the operation count + * is below the flushSize threshold. + */ + def flush(fullFlush: Boolean = true): IO[Unit] = IO(take(chunkSize)).flatMap { list => if (list.isEmpty) { IO.unit } else { @@ -73,4 +91,4 @@ case class OperationsQueue[D <: Document[D], M <: DocumentModel[D]](collection: } } } -} +} \ No newline at end of file