Skip to content

Commit

Permalink
Add close() method to Coordination for resource cleanup. Some cod…
Browse files Browse the repository at this point in the history
…e cleanups. Closes hseeberger#167.
  • Loading branch information
Francisco José Bermejo Herrera committed Nov 15, 2017
1 parent 21aabdd commit 5e4972c
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ final class EtcdCoordination(clusterName: String, system: ActorSystem) extends C
override def refresh(self: Address, ttl: FiniteDuration) =
addSelfOrRefresh(self, ttl)

override def close(): Future[Done] =
Future.successful(Done)

private def addSelfOrRefresh(self: Address, ttl: FiniteDuration) = {
val node = getUrlEncoder.encodeToString(self.toString.getBytes(UTF_8))
val query = Uri.Query("ttl" -> toSeconds(ttl), "value" -> self.toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ trait Coordination {
def getNodes(): Future[Set[Address]]

/**
* Akquire a lock for bootstrapping the cluster (first node).
* Acquire a lock for bootstrapping the cluster (first node).
*
* @param self self node
* @param ttl TTL for the lock
* @return true, if lock could be akquired, else false
* @return true, if lock could be acquired, else false
*/
def lock(self: Address, ttl: FiniteDuration): Future[Boolean]

Expand All @@ -80,4 +80,11 @@ trait Coordination {
* @return future signaling done
*/
def refresh(self: Address, ttl: FiniteDuration): Future[Done]

/**
* Performs resource cleanup on termination
*
* @return future signaling done
*/
def close(): Future[Done]
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import akka.actor.{ Address, FSM, Props, Status }
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{ InitialStateAsEvents, MemberJoined, MemberUp }
import akka.pattern.pipe
import akka.stream.ActorMaterializer
import de.heikoseeberger.constructr.coordination.Coordination
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.util.{ Success, Failure => SFailure }

object ConstructrMachine {

Expand Down Expand Up @@ -105,8 +105,7 @@ final class ConstructrMachine(
s"ttl-factor must be greater or equal 1 + ((coordination-timeout * (1 + nr-of-retries) + retry-delay * nr-of-retries)/ refresh-interval), i.e. $minTtlFactor, but was $ttlFactor!"
)

private implicit val mat = ActorMaterializer()
private val cluster = Cluster(context.system)
private val cluster = Cluster(context.system)

startWith(State.GettingNodes, Data(Set.empty, State.GettingNodes, nrOfRetries))

Expand Down Expand Up @@ -295,6 +294,18 @@ final class ConstructrMachine(

initialize()

// Performs resource cleanup on termination

onTermination {
case se: StopEvent =>
log.warning("StopEvent received, reason: {}. Closing Coordinator for resource cleanup",
se.reason)
coordination.close().onComplete {
case Success(_) => log.info("Coordinator closed successfully")
case SFailure(e) => log.error(e, "Coordinator failed to close")
}
}

// Helpers

private def retry(retryState: ConstructrMachine.State) =
Expand Down

0 comments on commit 5e4972c

Please sign in to comment.