From c394d25ccc3a30e0174f7853072ee31887b1c610 Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Tue, 4 Aug 2020 11:04:50 +1200 Subject: [PATCH] Update failure handling in java support --- .../javasupport/CloudStateRunner.scala | 2 +- .../javasupport/impl/Contexts.scala | 3 + .../AnnotationBasedEventSourcedSupport.scala | 8 +- .../impl/eventsourced/EventSourcedImpl.scala | 109 +++++-- .../eventsourced/EventSourcedImplSpec.scala | 305 ++++++++++++++++++ .../impl/eventsourced/TestEventSourced.scala | 58 ++++ 6 files changed, 449 insertions(+), 36 deletions(-) create mode 100644 java-support/src/test/scala/io/cloudstate/javasupport/impl/eventsourced/EventSourcedImplSpec.scala create mode 100644 java-support/src/test/scala/io/cloudstate/javasupport/impl/eventsourced/TestEventSourced.scala diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/CloudStateRunner.scala b/java-support/src/main/scala/io/cloudstate/javasupport/CloudStateRunner.scala index eb0e2bae7..7705e2d73 100644 --- a/java-support/src/main/scala/io/cloudstate/javasupport/CloudStateRunner.scala +++ b/java-support/src/main/scala/io/cloudstate/javasupport/CloudStateRunner.scala @@ -62,7 +62,7 @@ object CloudStateRunner { * CloudStateRunner can be seen as a low-level API for cases where [[io.cloudstate.javasupport.CloudState.start()]] isn't enough. */ final class CloudStateRunner private[this] (_system: ActorSystem, services: Map[String, StatefulService]) { - private[this] implicit final val system = _system + private[javasupport] implicit final val system = _system private[this] implicit final val materializer: Materializer = ActorMaterializer() private[this] final val configuration = diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/impl/Contexts.scala b/java-support/src/main/scala/io/cloudstate/javasupport/impl/Contexts.scala index b1ed1f8d6..b23962304 100644 --- a/java-support/src/main/scala/io/cloudstate/javasupport/impl/Contexts.scala +++ b/java-support/src/main/scala/io/cloudstate/javasupport/impl/Contexts.scala @@ -61,6 +61,7 @@ private[impl] trait AbstractClientActionContext extends ClientActionContext { checkActive() if (error.isEmpty) { error = Some(errorMessage) + logError(errorMessage) throw FailInvoked } else throw new IllegalStateException("fail(…) already previously invoked!") } @@ -81,6 +82,8 @@ private[impl] trait AbstractClientActionContext extends ClientActionContext { final def hasError: Boolean = error.isDefined + protected def logError(message: String): Unit = () + final def createClientAction(reply: Optional[JavaPbAny], allowNoReply: Boolean): Option[ClientAction] = error match { case Some(msg) => Some(ClientAction(ClientAction.Action.Failure(Failure(commandId, msg)))) diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/impl/eventsourced/AnnotationBasedEventSourcedSupport.scala b/java-support/src/main/scala/io/cloudstate/javasupport/impl/eventsourced/AnnotationBasedEventSourcedSupport.scala index 8f4f05dd5..37bab1682 100644 --- a/java-support/src/main/scala/io/cloudstate/javasupport/impl/eventsourced/AnnotationBasedEventSourcedSupport.scala +++ b/java-support/src/main/scala/io/cloudstate/javasupport/impl/eventsourced/AnnotationBasedEventSourcedSupport.scala @@ -25,6 +25,7 @@ import io.cloudstate.javasupport.impl.{AnySupport, ReflectionHelper, ResolvedEnt import scala.collection.concurrent.TrieMap import com.google.protobuf.{Descriptors, Any => JavaPbAny} +import io.cloudstate.javasupport.impl.eventsourced.EventSourcedImpl.EntityException import io.cloudstate.javasupport.{EntityFactory, ServiceCallFactory} /** @@ -78,7 +79,7 @@ private[impl] class AnnotationBasedEventSourcedSupport( } handler.invoke(entity, event, ctx) case None => - throw new RuntimeException( + throw EntityException( s"No event handler found for event ${event.getClass} on $behaviorsString" ) } @@ -88,7 +89,8 @@ private[impl] class AnnotationBasedEventSourcedSupport( behavior.commandHandlers.get(context.commandName()).map { handler => handler.invoke(entity, command, context) } getOrElse { - throw new RuntimeException( + throw EntityException( + context, s"No command handler found for command [${context.commandName()}] on $behaviorsString" ) } @@ -104,7 +106,7 @@ private[impl] class AnnotationBasedEventSourcedSupport( } handler.invoke(entity, snapshot, ctx) case None => - throw new RuntimeException( + throw EntityException( s"No snapshot handler found for snapshot ${snapshot.getClass} on $behaviorsString" ) } diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/impl/eventsourced/EventSourcedImpl.scala b/java-support/src/main/scala/io/cloudstate/javasupport/impl/eventsourced/EventSourcedImpl.scala index bbabd0151..5187cb9be 100644 --- a/java-support/src/main/scala/io/cloudstate/javasupport/impl/eventsourced/EventSourcedImpl.scala +++ b/java-support/src/main/scala/io/cloudstate/javasupport/impl/eventsourced/EventSourcedImpl.scala @@ -20,6 +20,7 @@ import java.util.Optional import akka.NotUsed import akka.actor.ActorSystem +import akka.event.{Logging, LoggingAdapter} import akka.stream.scaladsl.Flow import com.google.protobuf.{Descriptors, Any => JavaPbAny} import com.google.protobuf.any.{Any => ScalaPbAny} @@ -35,14 +36,16 @@ import io.cloudstate.javasupport.impl.{ ResolvedEntityFactory, ResolvedServiceMethod } +import io.cloudstate.protocol.entity.{Command, Failure} import io.cloudstate.protocol.event_sourced.EventSourcedStreamIn.Message.{ Command => InCommand, Empty => InEmpty, Event => InEvent, Init => InInit } -import io.cloudstate.protocol.event_sourced.EventSourcedStreamOut.Message.{Reply => OutReply} +import io.cloudstate.protocol.event_sourced.EventSourcedStreamOut.Message.{Failure => OutFailure, Reply => OutReply} import io.cloudstate.protocol.event_sourced._ +import scala.util.control.NonFatal final class EventSourcedStatefulService(val factory: EventSourcedEntityFactory, override val descriptor: Descriptors.ServiceDescriptor, @@ -65,11 +68,53 @@ final class EventSourcedStatefulService(val factory: EventSourcedEntityFactory, this } +object EventSourcedImpl { + final case class EntityException(entityId: String, commandId: Long, commandName: String, message: String) + extends RuntimeException(message) + + object EntityException { + def apply(message: String): EntityException = + EntityException(entityId = "", commandId = 0, commandName = "", message) + + def apply(command: Command, message: String): EntityException = + EntityException(command.entityId, command.id, command.name, message) + + def apply(context: CommandContext, message: String): EntityException = + EntityException(context.entityId, context.commandId, context.commandName, message) + } + + object ProtocolException { + def apply(message: String): EntityException = + EntityException(entityId = "", commandId = 0, commandName = "", "Protocol error: " + message) + + def apply(init: EventSourcedInit, message: String): EntityException = + EntityException(init.entityId, commandId = 0, commandName = "", "Protocol error: " + message) + + def apply(command: Command, message: String): EntityException = + EntityException(command.entityId, command.id, command.name, "Protocol error: " + message) + } + + def failure(cause: Throwable): Failure = cause match { + case e: EntityException => Failure(e.commandId, e.message) + case e => Failure(description = "Unexpected failure: " + e.getMessage) + } + + def failureMessage(cause: Throwable): String = cause match { + case EntityException(entityId, commandId, commandName, _) => + val commandDescription = if (commandId != 0) s" for command [$commandName]" else "" + val entityDescription = if (entityId.nonEmpty) s"entity [$entityId]" else "entity" + s"Terminating $entityDescription due to unexpected failure$commandDescription" + case _ => "Terminating entity due to unexpected failure" + } +} + final class EventSourcedImpl(_system: ActorSystem, _services: Map[String, EventSourcedStatefulService], rootContext: Context, configuration: Configuration) extends EventSourced { + import EventSourcedImpl._ + private final val system = _system private final val services = _services.iterator .map({ @@ -79,6 +124,8 @@ final class EventSourcedImpl(_system: ActorSystem, }) .toMap + private val log = Logging(system.eventStream, this.getClass) + /** * The stream. One stream will be established per active entity. * Once established, the first message sent will be Init, which contains the entity ID, and, @@ -99,18 +146,17 @@ final class EventSourcedImpl(_system: ActorSystem, case (Seq(EventSourcedStreamIn(InInit(init), _)), source) => source.via(runEntity(init)) case _ => - // todo better error - throw new RuntimeException("Expected Init message") + throw ProtocolException("Expected Init message") } .recover { - case e => - // FIXME translate to failure message - throw e + case error => + log.error(error, failureMessage(error)) + EventSourcedStreamOut(OutFailure(failure(error))) } private def runEntity(init: EventSourcedInit): Flow[EventSourcedStreamIn, EventSourcedStreamOut, NotUsed] = { val service = - services.getOrElse(init.serviceName, throw new RuntimeException(s"Service not found: ${init.serviceName}")) + services.getOrElse(init.serviceName, throw ProtocolException(init, s"Service not found: ${init.serviceName}")) val handler = service.factory.create(new EventSourcedContextImpl(init.entityId)) val thisEntityId = init.entityId @@ -137,22 +183,18 @@ final class EventSourcedImpl(_system: ActorSystem, (event.sequence, None) case ((sequence, _), InCommand(command)) => if (thisEntityId != command.entityId) - throw new IllegalStateException("Receiving entity is not the intended recipient of command") - val cmd = ScalaPbAny.toJavaProto(command.payload.get) - val context = new CommandContextImpl(thisEntityId, - sequence, - command.name, - command.id, - service.anySupport, - handler, - service.snapshotEvery) + throw ProtocolException(command, "Receiving entity is not the intended recipient of command") + val cmd = + ScalaPbAny.toJavaProto(command.payload.getOrElse(throw ProtocolException(command, "No command payload"))) + val context = + new CommandContextImpl(thisEntityId, sequence, command.name, command.id, service.anySupport, log) val reply = try { - handler.handleCommand(cmd, context) // FIXME is this allowed to throw + handler.handleCommand(cmd, context) } catch { - case FailInvoked => - Optional.empty[JavaPbAny]() - // Ignore, error already captured + case FailInvoked => Optional.empty[JavaPbAny]() // Ignore, error already captured + case e: EntityException => throw e + case NonFatal(error) => throw EntityException(command, "Unexpected failure: " + error.getMessage) } finally { context.deactivate() // Very important! } @@ -160,10 +202,16 @@ final class EventSourcedImpl(_system: ActorSystem, val clientAction = context.createClientAction(reply, false) if (!context.hasError) { - val endSequenceNumber = sequence + context.events.size + // apply events from successful command to local entity state + context.events.zipWithIndex.foreach { + case (event, i) => + handler.handleEvent(ScalaPbAny.toJavaProto(event), new EventContextImpl(thisEntityId, sequence + i + 1)) + } + val endSequenceNumber = sequence + context.events.size + val performSnapshot = (endSequenceNumber / service.snapshotEvery) > (sequence / service.snapshotEvery) val snapshot = - if (context.performSnapshot) { + if (performSnapshot) { val s = handler.snapshot(new SnapshotContext with AbstractContext { override def entityId: String = entityId override def sequenceNumber: Long = endSequenceNumber @@ -195,9 +243,9 @@ final class EventSourcedImpl(_system: ActorSystem, )) } case (_, InInit(i)) => - throw new IllegalStateException("Entity already inited") + throw ProtocolException(init, "Entity already inited") case (_, InEmpty) => - throw new IllegalStateException("Received empty/unknown message") + throw ProtocolException(init, "Received empty/unknown message") } .collect { case (_, Some(message)) => EventSourcedStreamOut(message) @@ -213,8 +261,7 @@ final class EventSourcedImpl(_system: ActorSystem, override val commandName: String, override val commandId: Long, val anySupport: AnySupport, - val handler: EventSourcedEntityHandler, - val snapshotEvery: Int) + val log: LoggingAdapter) extends CommandContext with AbstractContext with AbstractClientActionContext @@ -222,16 +269,14 @@ final class EventSourcedImpl(_system: ActorSystem, with ActivatableContext { final var events: Vector[ScalaPbAny] = Vector.empty - final var performSnapshot: Boolean = false override def emit(event: AnyRef): Unit = { checkActive() - val encoded = anySupport.encodeScala(event) - val nextSequenceNumber = sequenceNumber + events.size + 1 - handler.handleEvent(ScalaPbAny.toJavaProto(encoded), new EventContextImpl(entityId, nextSequenceNumber)) - events :+= encoded - performSnapshot = (snapshotEvery > 0) && (performSnapshot || (nextSequenceNumber % snapshotEvery == 0)) + events :+= anySupport.encodeScala(event) } + + override protected def logError(message: String): Unit = + log.error("Fail invoked for command [{}] for entity [{}]: {}", commandName, entityId, message) } class EventSourcedContextImpl(override final val entityId: String) extends EventSourcedContext with AbstractContext diff --git a/java-support/src/test/scala/io/cloudstate/javasupport/impl/eventsourced/EventSourcedImplSpec.scala b/java-support/src/test/scala/io/cloudstate/javasupport/impl/eventsourced/EventSourcedImplSpec.scala new file mode 100644 index 000000000..3ec2d3807 --- /dev/null +++ b/java-support/src/test/scala/io/cloudstate/javasupport/impl/eventsourced/EventSourcedImplSpec.scala @@ -0,0 +1,305 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudstate.javasupport.impl.eventsourced + +import com.google.protobuf.Empty +import io.cloudstate.javasupport.EntityId +import io.cloudstate.javasupport.eventsourced._ +import io.cloudstate.testkit.eventsourced.{EventSourcedMessages, TestEventSourcedClient} +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} +import scala.collection.mutable +import scala.reflect.ClassTag + +class EventSourcedImplSpec extends WordSpec with Matchers with BeforeAndAfterAll { + import EventSourcedImplSpec._ + import EventSourcedMessages._ + import ShoppingCart.Item + import ShoppingCart.Protocol._ + + val service: TestEventSourcedService = ShoppingCart.testService + val client: TestEventSourcedClient = TestEventSourcedClient(service.port) + + override def afterAll(): Unit = { + client.terminate() + service.terminate() + } + + "EventSourcedImpl" should { + + "manage entities with expected commands and events" in { + val entity = client.connect + entity.send(init(ShoppingCart.Name, "cart")) + entity.send(command(1, "cart", "GetCart")) + entity.expect(reply(1, EmptyCart)) + entity.send(command(2, "cart", "AddItem", addItem("abc", "apple", 1))) + entity.expect(reply(2, EmptyPayload, itemAdded("abc", "apple", 1))) + entity.send(command(3, "cart", "AddItem", addItem("abc", "apple", 2))) + entity.expect(reply(3, EmptyPayload, Seq(itemAdded("abc", "apple", 2)), cartSnapshot(Item("abc", "apple", 3)))) + entity.send(command(4, "cart", "GetCart")) + entity.expect(reply(4, cart(Item("abc", "apple", 3)))) + entity.send(command(5, "cart", "AddItem", addItem("123", "banana", 4))) + entity.expect(reply(5, EmptyPayload, itemAdded("123", "banana", 4))) + entity.passivate() + val reactivated = client.connect + reactivated.send(init(ShoppingCart.Name, "cart", snapshot(3, cartSnapshot(Item("abc", "apple", 3))))) + reactivated.send(event(4, itemAdded("123", "banana", 4))) + reactivated.send(command(1, "cart", "GetCart")) + reactivated.expect(reply(1, cart(Item("abc", "apple", 3), Item("123", "banana", 4)))) + reactivated.passivate() + } + + "fail when first message is not init" in { + service.expectLogError("Terminating entity due to unexpected failure") { + val entity = client.connect + entity.send(command(1, "cart", "command")) + entity.expect(failure("Protocol error: Expected Init message")) + entity.expectClosed() + } + } + + "fail when service doesn't exist" in { + service.expectLogError("Terminating entity [foo] due to unexpected failure") { + val entity = client.connect + entity.send(init(serviceName = "DoesNotExist", entityId = "foo")) + entity.expect(failure("Protocol error: Service not found: DoesNotExist")) + entity.expectClosed() + } + } + + "fail when command payload is missing" in { + service.expectLogError("Terminating entity [cart] due to unexpected failure for command [foo]") { + val entity = client.connect + entity.send(init(ShoppingCart.Name, "cart")) + entity.send(command(1, "cart", "foo", payload = None)) + entity.expect(failure(1, "Protocol error: No command payload")) + entity.expectClosed() + } + } + + "fail when command entity id is incorrect" in { + service.expectLogError("Terminating entity [cart2] due to unexpected failure for command [foo]") { + val entity = client.connect + entity.send(init(ShoppingCart.Name, "cart1")) + entity.send(command(1, "cart2", "foo")) + entity.expect(failure(1, "Protocol error: Receiving entity is not the intended recipient of command")) + entity.expectClosed() + } + } + + "fail when entity is sent multiple init" in { + service.expectLogError("Terminating entity [cart] due to unexpected failure") { + val entity = client.connect + entity.send(init(ShoppingCart.Name, "cart")) + entity.send(init(ShoppingCart.Name, "cart")) + entity.expect(failure("Protocol error: Entity already inited")) + entity.expectClosed() + } + } + + "fail when entity is sent empty message" in { + service.expectLogError("Terminating entity [cart] due to unexpected failure") { + val entity = client.connect + entity.send(init(ShoppingCart.Name, "cart")) + entity.send(EmptyMessage) + entity.expect(failure("Protocol error: Received empty/unknown message")) + entity.expectClosed() + } + } + + "fail when snapshot handler does not exist" in { + service.expectLogError("Terminating entity due to unexpected failure") { + val entity = client.connect + val notSnapshot = domainLineItem("?", "not a cart snapshot", 1) + val snapshotClass = notSnapshot.getClass + entity.send(init(ShoppingCart.Name, "cart", snapshot(42, notSnapshot))) + entity.expect( + failure(s"No snapshot handler found for snapshot $snapshotClass on ${ShoppingCart.TestCartClass}") + ) + entity.expectClosed() + } + } + + "fail when snapshot handler throws exception" in { + service.expectLogError("Terminating entity due to unexpected failure") { + val entity = client.connect + entity.send(init(ShoppingCart.Name, "cart", snapshot(42, cartSnapshot()))) + entity.expect(failure("Unexpected failure: Boom: no items")) + entity.expectClosed() + } + } + + "fail when event handler does not exist" in { + service.expectLogError("Terminating entity due to unexpected failure") { + val entity = client.connect + val notEvent = domainLineItem("?", "not an event", 1) + val eventClass = notEvent.getClass + entity.send(init(ShoppingCart.Name, "cart")) + entity.send(event(1, notEvent)) + entity.expect(failure(s"No event handler found for event $eventClass on ${ShoppingCart.TestCartClass}")) + entity.expectClosed() + } + } + + "fail when event handler throws exception" in { + service.expectLogError("Terminating entity due to unexpected failure") { + val entity = client.connect + entity.send(init(ShoppingCart.Name, "cart")) + entity.send(event(1, itemAdded("123", "FAIL", 42))) + entity.expect(failure("Unexpected failure: Boom: name is FAIL")) + entity.expectClosed() + } + } + + "fail when command handler does not exist" in { + service.expectLogError("Terminating entity [cart] due to unexpected failure for command [foo]") { + val entity = client.connect + entity.send(init(ShoppingCart.Name, "cart")) + entity.send(command(1, "cart", "foo")) + entity.expect(failure(1, s"No command handler found for command [foo] on ${ShoppingCart.TestCartClass}")) + entity.expectClosed() + } + } + + "fail action when command handler uses context fail" in { + service.expectLogError( + "Fail invoked for command [AddItem] for entity [cart]: Cannot add negative quantity of item [foo]" + ) { + val entity = client.connect + entity.send(init(ShoppingCart.Name, "cart")) + entity.send(command(1, "cart", "AddItem", addItem("foo", "bar", -1))) + entity.expect(actionFailure(1, "Cannot add negative quantity of item [foo]")) + entity.send(command(2, "cart", "GetCart")) + entity.expect(reply(2, EmptyCart)) // check emit-then-fail doesn't change entity state + entity.passivate() + val reactivated = client.connect + reactivated.send(init(ShoppingCart.Name, "cart")) + reactivated.send(command(1, "cart", "GetCart")) + reactivated.expect(reply(1, EmptyCart)) + reactivated.passivate() + } + } + + "fail when command handler throws exception" in { + service.expectLogError("Terminating entity [cart] due to unexpected failure for command [RemoveItem]") { + val entity = client.connect + entity.send(init(ShoppingCart.Name, "cart")) + entity.send(command(1, "cart", "RemoveItem", removeItem("foo"))) + entity.expect(failure(1, "Unexpected failure: Boom: foo")) + entity.expectClosed() + } + } + } +} + +object EventSourcedImplSpec { + object ShoppingCart { + import com.example.shoppingcart.Shoppingcart + import com.example.shoppingcart.persistence.Domain + + val Name: String = Shoppingcart.getDescriptor.findServiceByName("ShoppingCart").getFullName + + def testService: TestEventSourcedService = service[TestCart] + + def service[T: ClassTag]: TestEventSourcedService = + TestEventSourced.service[T]( + Shoppingcart.getDescriptor.findServiceByName("ShoppingCart"), + Domain.getDescriptor + ) + + case class Item(id: String, name: String, quantity: Int) + + object Protocol { + import scala.jdk.CollectionConverters._ + + val EmptyCart: Shoppingcart.Cart = Shoppingcart.Cart.newBuilder.build + + def cart(items: Item*): Shoppingcart.Cart = + Shoppingcart.Cart.newBuilder.addAllItems(lineItems(items)).build + + def lineItems(items: Seq[Item]): java.lang.Iterable[Shoppingcart.LineItem] = + items.sortBy(_.id).map(item => lineItem(item.id, item.name, item.quantity)).asJava + + def lineItem(id: String, name: String, quantity: Int): Shoppingcart.LineItem = + Shoppingcart.LineItem.newBuilder.setProductId(id).setName(name).setQuantity(quantity).build + + def addItem(id: String, name: String, quantity: Int): Shoppingcart.AddLineItem = + Shoppingcart.AddLineItem.newBuilder.setProductId(id).setName(name).setQuantity(quantity).build + + def removeItem(id: String): Shoppingcart.RemoveLineItem = + Shoppingcart.RemoveLineItem.newBuilder.setProductId(id).build + + def itemAdded(id: String, name: String, quantity: Int): Domain.ItemAdded = + Domain.ItemAdded.newBuilder.setItem(domainLineItem(id, name, quantity)).build + + def domainLineItems(items: Seq[Item]): java.lang.Iterable[Domain.LineItem] = + items.sortBy(_.id).map(item => domainLineItem(item.id, item.name, item.quantity)).asJava + + def domainLineItem(id: String, name: String, quantity: Int): Domain.LineItem = + Domain.LineItem.newBuilder.setProductId(id).setName(name).setQuantity(quantity).build + + def cartSnapshot(items: Item*): Domain.Cart = + Domain.Cart.newBuilder.addAllItems(domainLineItems(items)).build + } + + val TestCartClass: Class[_] = classOf[TestCart] + + @EventSourcedEntity(persistenceId = "shopping-cart", snapshotEvery = 2) + class TestCart(@EntityId val entityId: String) { + val cart = mutable.Map.empty[String, Item] + + @CommandHandler + def getCart: Shoppingcart.Cart = Protocol.cart(cart.values.toSeq: _*) + + @CommandHandler + def addItem(item: Shoppingcart.AddLineItem, ctx: CommandContext): Empty = { + // emit and then fail on negative quantities, for testing atomicity + ctx.emit(Protocol.itemAdded(item.getProductId, item.getName, item.getQuantity)) + if (item.getQuantity <= 0) ctx.fail(s"Cannot add negative quantity of item [${item.getProductId}]") + Empty.getDefaultInstance + } + + @EventHandler + def itemAdded(itemAdded: Domain.ItemAdded): Unit = { + if (itemAdded.getItem.getName == "FAIL") throw new RuntimeException("Boom: name is FAIL") // fail for testing + val currentQuantity = cart.get(itemAdded.getItem.getProductId).map(_.quantity).getOrElse(0) + cart.update(itemAdded.getItem.getProductId, + Item(itemAdded.getItem.getProductId, + itemAdded.getItem.getName, + currentQuantity + itemAdded.getItem.getQuantity)) + } + + @CommandHandler + def removeItem(item: Shoppingcart.RemoveLineItem): Empty = { + if (true) throw new RuntimeException("Boom: " + item.getProductId) // always fail for testing + Empty.getDefaultInstance + } + + @Snapshot + def snapshot: Domain.Cart = Protocol.cartSnapshot(cart.values.toSeq: _*) + + @SnapshotHandler + def handleSnapshot(cartSnapshot: Domain.Cart): Unit = { + import scala.jdk.CollectionConverters._ + if (cartSnapshot.getItemsList.isEmpty) throw new RuntimeException("Boom: no items") // fail for testing + cart.clear() + cartSnapshot.getItemsList.asScala.foreach { item => + cart.update(item.getProductId, Item(item.getProductId, item.getName, item.getQuantity)) + } + } + } + } +} diff --git a/java-support/src/test/scala/io/cloudstate/javasupport/impl/eventsourced/TestEventSourced.scala b/java-support/src/test/scala/io/cloudstate/javasupport/impl/eventsourced/TestEventSourced.scala new file mode 100644 index 000000000..7e8d9bf39 --- /dev/null +++ b/java-support/src/test/scala/io/cloudstate/javasupport/impl/eventsourced/TestEventSourced.scala @@ -0,0 +1,58 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudstate.javasupport.impl.eventsourced + +import akka.actor.ActorSystem +import akka.testkit.{EventFilter, SocketUtil} +import com.google.protobuf.Descriptors.{FileDescriptor, ServiceDescriptor} +import com.typesafe.config.{Config, ConfigFactory} +import io.cloudstate.javasupport.{CloudState, CloudStateRunner} +import scala.reflect.ClassTag + +object TestEventSourced { + def service[T: ClassTag](descriptor: ServiceDescriptor, fileDescriptors: FileDescriptor*): TestEventSourcedService = + new TestEventSourcedService(implicitly[ClassTag[T]].runtimeClass, descriptor, fileDescriptors) +} + +class TestEventSourcedService(entityClass: Class[_], + descriptor: ServiceDescriptor, + fileDescriptors: Seq[FileDescriptor]) { + val port: Int = SocketUtil.temporaryLocalPort() + + val config: Config = ConfigFactory.load(ConfigFactory.parseString(s""" + cloudstate.user-function-port = $port + akka { + loglevel = ERROR + loggers = ["akka.testkit.TestEventListener"] + http.server { + preview.enable-http2 = on + idle-timeout = infinite + } + } + """)) + + val runner: CloudStateRunner = new CloudState() + .registerEventSourcedEntity(entityClass, descriptor, fileDescriptors: _*) + .createRunner(config) + + runner.run() + + def expectLogError[T](message: String)(block: => T): T = + EventFilter.error(message, occurrences = 1).intercept(block)(runner.system) + + def terminate(): Unit = runner.terminate() +}