Skip to content

Commit

Permalink
Lots of work on UpsertBuilder and switched schema.rule to be JSON
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Sep 29, 2023
1 parent 8f88ca3 commit 204f678
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 6 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ val scala3 = "3.3.1"

name := "scarango"
ThisBuild / organization := "com.outr"
ThisBuild / version := "3.15.0"
ThisBuild / version := "3.15.1-SNAPSHOT"
ThisBuild / scalaVersion := scala213
ThisBuild / crossScalaVersions := List(scala3, scala213)
ThisBuild / scalacOptions ++= Seq("-unchecked", "-deprecation")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
package com.outr.arango.core

case class CollectionSchema(rule: Option[String] = None, level: Option[Level] = None, message: Option[String] = None)
import fabric.Json

case class CollectionSchema(rule: Option[Json] = None, level: Option[Level] = None, message: Option[String] = None)
1 change: 0 additions & 1 deletion core/src/main/scala/com/outr/arango/query/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import scala.language.implicitConversions

package object query {
implicit def sc2AQL(sc: StringContext): AQLInterpolator = new AQLInterpolator(sc)

implicit def rw2QueryPart[T: RW](t: T): QueryPart = value2QueryPart(t.json)
implicit def value2QueryPart(value: Json): QueryPart = QueryPart.Variable(value)
implicit def tuple2QueryPart(t: (String, Json)): QueryPart = QueryPart.NamedVariable(t._1, t._2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ class DocumentCollection[D <: Document[D], M <: DocumentModel[D]](protected[aran
def ref: DocumentRef[D, M] = DocumentRef[D, M](model, None)

lazy val update: UpdateBuilder[D, M] = UpdateBuilder(this)
lazy val upsert: UpsertBuilder[D, M] = UpsertBuilder(this)
}
111 changes: 111 additions & 0 deletions driver/src/main/scala/com/outr/arango/collection/UpsertBuilder.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package com.outr.arango.collection

import cats.effect.IO
import com.outr.arango.query._
import com.outr.arango.query.dsl._
import com.outr.arango.{Document, DocumentModel, DocumentRef, FieldAndValue}
import fabric.{Json, Str}
import fabric.io.JsonFormatter
import fabric.rw._

case class UpsertBuilder[D <: Document[D], M <: DocumentModel[D]](collection: DocumentCollection[D, M],
search: List[(String, QueryPart)] = Nil,
insert: Option[Json] = None,
upsert: Option[Upsert[D]] = None,
ignoreErrors: Boolean = false,
keepNull: Boolean = true,
mergeObjects: Boolean = true,
waitForSync: Boolean = false,
ignoreRevs: Boolean = true,
exclusive: Boolean = false,
indexHint: Option[String] = None,
forceIndexHint: Boolean = false) {
private implicit def rw: RW[D] = collection.model.rw

def withSearch(f: FieldAndValue[_]): UpsertBuilder[D, M] =
withSearch(f.field.fieldName, QueryPart.Variable(f.value))
def withSearch(entry: (String, QueryPart)): UpsertBuilder[D, M] = copy(
search = entry :: search
)
def withListSearch[T <: Document[T], TM <: DocumentModel[T]](list: List[T])
(f: DocumentRef[T, TM] => List[SearchEntry]): UpsertBuilder[D, M] = {
???
}
def withInsert(doc: D): UpsertBuilder[D, M] = withInsert(doc.json(collection.model.rw))
def withInsert(json: Json): UpsertBuilder[D, M] = withInsert(JsonFormatter.Compact(json))
def withInsert(insert: String): UpsertBuilder[D, M] = copy(insert = Some(insert))

def withUpdate(doc: D): UpsertBuilder[D, M] = withUpdate(doc.json(collection.model.rw))
def withUpdate(json: Json): UpsertBuilder[D, M] = withUpdate(JsonFormatter.Compact(json))
def withUpdate(update: String): UpsertBuilder[D, M] = copy(upsert = Some(Upsert.Update(update)))

def withReplace(doc: D): UpsertBuilder[D, M] = copy(upsert = Some(Upsert.Replace(doc)))

def withOptions(ignoreErrors: Boolean = false,
keepNull: Boolean = true,
mergeObjects: Boolean = true,
waitForSync: Boolean = false,
ignoreRevs: Boolean = true,
exclusive: Boolean = false,
indexHint: Option[String] = None,
forceIndexHint: Boolean = false): UpsertBuilder[D, M] = copy(
ignoreErrors = ignoreErrors,
keepNull = keepNull,
mergeObjects = mergeObjects,
waitForSync = waitForSync,
ignoreRevs = ignoreRevs,
exclusive = exclusive,
indexHint = indexHint,
forceIndexHint = forceIndexHint
)

def toQuery(includeReturn: Boolean): Query = {
assert(search.nonEmpty, "At least one search criteria must be defined")
assert(insert.nonEmpty, "Insert must be defined")
assert(upsert.nonEmpty, "Update or Replace must be defined")

val commaPart = QueryPart.Static(", ")
val searchQuery = Query(search.flatMap {
case (key, value) => List(
commaPart,
QueryPart.Static(s"$key: "),
value
)
}.tail)

val upsertQuery = aql"""UPSERT { $searchQuery }"""
val insertQuery = aql"""INSERT ${insert.get}"""
val updateReplaceQuery = upsert.get match {
case Upsert.Update(value) => aql"""UPDATE ${QueryPart.Static(value)} IN $collection"""
case Upsert.Replace(replacement) => aql"""REPLACE $replacement IN $collection"""
}
var queries = List(upsertQuery, insertQuery, updateReplaceQuery)
if (includeReturn) {
val returnQuery = aql"""RETURN { original: OLD, newValue: NEW }"""
queries = queries ::: List(returnQuery)
}
Query.merge(queries)
}

def toStream: fs2.Stream[IO, UpsertResult[D]] = {
val query = toQuery(includeReturn = true)
collection.graph.query[UpsertResult[D]](query).stream()
}

def toList: IO[List[UpsertResult[D]]] = toStream.compile.toList
}

sealed trait Upsert[D <: Document[D]]

object Upsert {
case class Update[D <: Document[D]](value: String) extends Upsert[D]
case class Replace[D <: Document[D]](replacement: D) extends Upsert[D]
}

case class UpsertResult[D](original: Option[D], newValue: D)

object UpsertResult {
implicit def rw[D: RW]: RW[UpsertResult[D]] = RW.gen
}

sealed trait SearchEntry
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.outr.arango.mutation.DataMutation
import com.outr.arango.util.Helpers._
import com.outr.arango.{Field, Index, IndexInfo, IndexType}
import fabric.Json
import fabric.io.JsonFormatter
import fabric.rw.RW

import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -49,7 +50,7 @@ class ArangoDBCollection(val _collection: arangodb.ArangoCollection) extends Ara
val o = new CollectionPropertiesOptions
val arangoSchema = new model.CollectionSchema
schema.foreach { s =>
s.rule.foreach(arangoSchema.setRule)
s.rule.foreach(json => arangoSchema.setRule(JsonFormatter.Default(json)))
s.level.foreach { l =>
arangoSchema.setLevel(l match {
case Level.Moderate => model.CollectionSchema.Level.MODERATE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.outr.arango.core
import com.arangodb.model
import com.arangodb.model.OptionsBuilder
import com.outr.arango.util.Helpers._
import fabric.io.JsonFormatter

class ArangoDBCollectionCreateOptions(collectionName: String, o: CreateCollectionOptions) {
private[arango] lazy val arango: model.CollectionCreateOptions = {
Expand Down Expand Up @@ -34,7 +35,7 @@ class ArangoDBCollectionCreateOptions(collectionName: String, o: CreateCollectio
o.shardingStrategy.foreach(c.shardingStrategy)
o.smartJoinAttribute.foreach(c.smartJoinAttribute)
val schema = new model.CollectionSchema
o.collectionSchema.rule.foreach(schema.setRule)
o.collectionSchema.rule.foreach(json => schema.setRule(JsonFormatter.Default(json)))
o.collectionSchema.level.foreach { l =>
schema.setLevel(l match {
case Level.None => model.CollectionSchema.Level.NONE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ trait OperationQueueSupport {
private val ops = TrieMap.empty[String, OperationsQueue[_, _]]

implicit def collectionToOps[D <: Document[D], M <: DocumentModel[D]](collection: DocumentCollection[D, M]): OperationsQueue[D, M] = {
val q = ops.getOrElseUpdate(collection.dbName, OperationsQueue(collection, opFlushSize, opChunkSize))
val q = ops.getOrElseUpdate(collection.name, OperationsQueue(collection, opFlushSize, opChunkSize))
q.asInstanceOf[OperationsQueue[D, M]]
}

Expand Down
19 changes: 19 additions & 0 deletions driver/src/main/scala/com/outr/arango/queue/OperationsQueue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package com.outr.arango.queue
import cats.effect.IO
import cats.implicits.toTraverseOps
import com.outr.arango.collection.DocumentCollection
import com.outr.arango.query._
import com.outr.arango.{Document, DocumentModel}
import fabric.rw.RW

import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
Expand All @@ -20,6 +22,23 @@ case class OperationsQueue[D <: Document[D], M <: DocumentModel[D]](collection:
lazy val insert: OpQueue = OpQueue(stream => collection.stream.insert(stream, chunkSize).void)
lazy val upsert: OpQueue = OpQueue(stream => collection.stream.upsert(stream, chunkSize).void)
lazy val delete: OpQueue = OpQueue(stream => collection.stream.delete(stream.map(_._id), chunkSize).void)
def createUpsertReplace(searchFields: String*): OpQueue = OpQueue { stream =>
val searchQuery = QueryPart.Static(searchFields.map { field =>
s"$field: doc.$field"
}.mkString("{", ", ", "}"))
implicit def rw: RW[D] = collection.model.rw
stream.compile.toList.flatMap { list =>
val query =
aql"""
FOR doc IN $list
UPSERT $searchQuery
INSERT doc
REPLACE doc
IN $collection
"""
collection.graph.execute(query)
}
}

/**
* Flushes the queue
Expand Down
21 changes: 21 additions & 0 deletions driver/src/test/scala/spec/AdvancedSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,27 @@ class AdvancedSpec extends AsyncWordSpec with AsyncIOSpec with Matchers with Ope
person.modified should be > bethanyLastModified
}
}
"upsert Donna's bio" in {
database.people.upsert
.withSearch(Person.name("Donna"))
.withInsert(Person(name = "Donna", age = 41, bio = "New Record"))
.withReplace(Person(name = "Donna", age = 41, bio = "Replaced!"))
.toList
.map { results =>
results.length should be(1)
val result = results.head
result.original should not be None
result.original.get.name should be("Donna")
result.newValue.name should be("Donna")
result.newValue.age should be(41)
result.newValue.bio should be("Replaced!")
}
}
// "upsert multiple bios" in {
// database.people.upsert
// .withListSearch()
// }
// TODO: Upsert from a list
"batch delete" in {
database.people.query.byFilter(_.age > 10).toList.flatMap { list =>
list.map(_.name).toSet should be(Set("Bethany", "Donna", "Adam"))
Expand Down

0 comments on commit 204f678

Please sign in to comment.