Skip to content
This repository has been archived by the owner on Apr 15, 2018. It is now read-only.

Propagate extension failures to user-space (refs #171) #172

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ member-joined │
└───────────────────┘ └───────────────────┘
```

If something goes finally wrong when interacting with the coordination service, e.g. a permanent timeout after a configurable number of retries, ConstructR terminates its `ActorSystem` in the spirit of "fail fast".
If something goes finally wrong when interacting with the coordination service, e.g. a permanent timeout after a configurable number of retries, ConstructR terminates itself in the spirit of "failing fast".

``` scala
// All releases including intermediate ones are published here,
Expand Down Expand Up @@ -77,6 +77,14 @@ constructr {
}
```

Exceeding the number of max retries will lead to internal failure of the `ConstructrExtension`. You can hook to this event in the following manner:
``` scala
val constructr = ConstructrExtension(system)
constructr.registerOnFailure {
// do something if ConstructR failes
}
```

## Coordination

ConstructR comes with out-of-the-box support for etcd: simply depend on the "constructr-coordination-etcd" module. If you want to use some other coordination backend, e.g. Consul, simply implement the `Coordination` trait from the "constructr-coordination" module and make sure to provide the fully qualified class name via the `constructr.coordination.class-name` configuration setting.
Expand Down
99 changes: 73 additions & 26 deletions core/src/main/scala/de/heikoseeberger/constructr/Constructr.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,57 +17,66 @@
package de.heikoseeberger.constructr

import akka.actor.{ Actor, ActorLogging, ActorRef, Props, SupervisorStrategy, Terminated }
import akka.cluster.{ Cluster, Member }
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{ InitialStateAsEvents, MemberExited, MemberLeft, MemberRemoved }
import akka.cluster.MemberStatus.Up
import de.heikoseeberger.constructr.coordination.Coordination

import scala.concurrent.duration.{ FiniteDuration, NANOSECONDS }
import scala.util.control.NonFatal

object Constructr {
private[constructr] object Constructr {

final val Name = "constructr"

def props: Props =
Props(new Constructr)
def props: Props = Props(new Constructr)

case class RegisterFailureHandler(callback: Runnable)
}

final class Constructr private extends Actor with ActorLogging {
private[constructr] class Constructr private
extends Actor
with ActorLogging
with ConstructrFailureListening {

import Constructr.RegisterFailureHandler

override val supervisorStrategy = SupervisorStrategy.stoppingStrategy
override val supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy

private val cluster = Cluster(context.system)
private[this] val cluster = Cluster(context.system)
private[this] var machineOption: Option[ActorRef] = None
private[this] var failureListeners = Set.empty[ActorRef]

if (cluster.settings.SeedNodes.isEmpty) {
log.info("Creating constructr-machine, because no seed-nodes defined")
log.info("Creating constructr-machine, because no seed-nodes are defined")
cluster.subscribe(self,
InitialStateAsEvents,
classOf[MemberLeft],
classOf[MemberExited],
classOf[MemberRemoved])
context.become(active(context.watch(createConstructrMachine())))
machineOption = Some(context.watch(createConstructrMachine()))
} else {
log.info("Stopping self, because seed-nodes defined")
log.info("Stopping ConstructR, because seed-nodes are defined in configuration.")
context.stop(self)
}

override def receive = Actor.emptyBehavior
override def receive: Receive = {
case Terminated(machine) if machineOption.contains(machine) =>
machineOption = None

private def active(machine: ActorRef): Receive = {
case Terminated(`machine`) =>
val selfAddress = cluster.selfAddress
def isSelfAndUp(member: Member) =
member.address == selfAddress && member.status == Up
if (cluster.state.members.exists(isSelfAndUp)) {
log.error("Leaving, because constructr-machine terminated!")
cluster.leave(selfAddress)
} else {
log.error("Terminating system, because constructr-machine terminated!")
context.system.terminate()
}
case Terminated(failureListener) if failureListeners.contains(failureListener) =>
failureListeners -= failureListener

case RegisterFailureHandler(callback) if machineOption.nonEmpty =>
failureListeners += context.actorOf(
Props(classOf[ConstructrFailureListener], machineOption.get, callback)
)

case RegisterFailureHandler(callback) if machineOption.isEmpty =>
executeFailureHandler(callback)

case MemberRemoved(member, _) if member.address == cluster.selfAddress =>
log.error("Terminating system, because member has been removed!")
context.system.terminate()
log.warning("Stopping ConstructR because cluster member has been removed!")
context.stop(self)
}

private def createConstructrMachine() = {
Expand Down Expand Up @@ -101,3 +110,41 @@ final class Constructr private extends Actor with ActorLogging {
)
}
}

private[constructr] class ConstructrFailureListener(machine: ActorRef, callback: Runnable)
extends Actor
with ActorLogging
with ConstructrFailureListening {

override def preStart(): Unit = {
super.preStart()
context.watch(machine)
}

override def postStop(): Unit = {
context.unwatch(machine)
super.postStop()
}

override def receive: Receive = {
case Terminated(`machine`) =>
try {
executeFailureHandler(callback)
} finally {
context.stop(self)
}
}
}

private[constructr] trait ConstructrFailureListening {
this: Actor with ActorLogging =>

def executeFailureHandler(callback: Runnable): Unit =
try {
callback.run()
} catch {
case NonFatal(e) =>
log.error(e, "ConstructR failure callback failed with [{}]", e.getMessage)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,32 @@

package de.heikoseeberger.constructr

import akka.actor.{ ExtendedActorSystem, Extension, ExtensionKey }
import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }

object ConstructrExtension extends ExtensionKey[ConstructrExtension]
object ConstructrExtension extends ExtensionId[ConstructrExtension] with ExtensionIdProvider {

override def lookup(): ExtensionId[ConstructrExtension] = ConstructrExtension

override def createExtension(system: ExtendedActorSystem): ConstructrExtension =
new ConstructrExtension(system)

/**
* Java API
*/
override def get(system: ActorSystem): ConstructrExtension = super.get(system)
}

final class ConstructrExtension private (system: ExtendedActorSystem) extends Extension {
system.systemActorOf(Constructr.props, Constructr.Name)

private[this] val supervisor = system.systemActorOf(Constructr.props, Constructr.Name)

def registerOnFailure[T](code: => T): Unit = {
val callback = new Runnable {
override def run(): Unit = code
}
registerOnFailure(callback)
}

def registerOnFailure(callback: Runnable): Unit =
supervisor ! Constructr.RegisterFailureHandler(callback)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,26 @@

package de.heikoseeberger.constructr

import akka.actor.ActorDSL.{ actor, Act }
import akka.actor.Address
import akka.cluster.{ Cluster, ClusterEvent }
import akka.actor.{Actor, Address, PoisonPill, Props}
import akka.cluster.{Cluster, ClusterEvent}
import akka.http.scaladsl.Http
import akka.http.scaladsl.client.RequestBuilding
import akka.http.scaladsl.model.StatusCodes.{ NotFound, OK }
import akka.http.scaladsl.model.StatusCodes.{NotFound, OK}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.pattern.ask
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
import akka.remote.testkit.{MultiNodeConfig, MultiNodeSpec}
import akka.stream.ActorMaterializer
import akka.testkit.TestDuration
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import org.scalatest.{ BeforeAndAfterAll, FreeSpecLike, Matchers }
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterAll, FreeSpecLike, Matchers}

import scala.concurrent.Await
import scala.concurrent.duration.DurationInt

object ConstructrMultiNodeConfig {
val coordinationHost = {
val coordinationHost: String = {
val dockerHostPattern = """tcp://(\S+):\d{1,5}""".r
sys.env
.get("DOCKER_HOST")
Expand Down Expand Up @@ -98,14 +99,14 @@ abstract class MultiNodeConstructrSpec(
enterBarrier("coordination-started")

ConstructrExtension(system)
val listener = actor(new Act {
val listener = system.actorOf(Props(new Actor {
import ClusterEvent._
var isMember = false
Cluster(context.system).subscribe(self,
InitialStateAsEvents,
classOf[MemberJoined],
classOf[MemberUp])
become {
override def receive: Receive = {
case "isMember" => sender() ! isMember

case MemberJoined(member) if member.address == Cluster(context.system).selfAddress =>
Expand All @@ -114,7 +115,7 @@ abstract class MultiNodeConstructrSpec(
case MemberUp(member) if member.address == Cluster(context.system).selfAddress =>
isMember = true
}
})
}))
within(20.seconds.dilated) {
awaitAssert {
implicit val timeout = Timeout(1.second.dilated)
Expand All @@ -141,17 +142,42 @@ abstract class MultiNodeConstructrSpec(
}
}

enterBarrier("extension-killed")

val failureHandlers = 1.to(5).map(_ => mock(classOf[Runnable]))
failureHandlers.foreach(ConstructrExtension(system).registerOnFailure(_))
system.actorSelection(s"/system/${Constructr.Name}/${ConstructrMachine.Name}") ! PoisonPill

within(5.seconds.dilated) {
awaitAssert {
for (i <- failureHandlers.indices) {
verify(failureHandlers(i), times(1)).run()
}
}
}

enterBarrier("post-extension-killed")

val postFailureHandler = mock(classOf[Runnable])
ConstructrExtension(system).registerOnFailure(postFailureHandler)

within(5.seconds.dilated) {
awaitAssert {
verify(postFailureHandler, times(1)).run()
}
}

enterBarrier("done")
}

override def initialParticipants = roles.size
override def initialParticipants: Int = roles.size

override protected def beforeAll() = {
override protected def beforeAll(): Unit = {
super.beforeAll()
multiNodeSpecBeforeAll()
}

override protected def afterAll() = {
override protected def afterAll(): Unit = {
multiNodeSpecAfterAll()
super.afterAll()
}
Expand Down