Skip to content
This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Commit

Permalink
Upgrading KPL/KCL libraries, replacing deprecated shutdown calls (#26)
Browse files Browse the repository at this point in the history
* Upgrading KPL/KCL libraries, replacing deprecated shutdown calls

* auto format from compile
  • Loading branch information
etspaceman authored and agaro1121 committed Sep 7, 2017
1 parent b0b587d commit fc7fee2
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 10 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ lazy val library =
"com.fasterxml.uuid" % "java-uuid-generator" % "3.1.4" % Compile)

val amazon = Seq(
"com.amazonaws" % "amazon-kinesis-client" % "1.7.5" % Compile
"com.amazonaws" % "amazon-kinesis-client" % "1.8.1" % Compile
excludeAll(
ExclusionRule(organization = "com.fasterxml.jackson.core"),
ExclusionRule(organization = "com.fasterxml.jackson.dataformat")),
"com.amazonaws" % "amazon-kinesis-producer" % "0.12.3" % Compile
"com.amazonaws" % "amazon-kinesis-producer" % "0.12.5" % Compile
excludeAll(
ExclusionRule(organization = "com.fasterxml.jackson.core"),
ExclusionRule(organization = "com.fasterxml.jackson.dataformat"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ private[consumer] class ConsumerProcessingManager(
private[consumer] def closeManager(): Unit = {
val canCloseManager = shuttingDown.compareAndSet(false, true)
if (canCloseManager) {
Future(kclWorker.requestShutdown()) //Needs to be async otherwise we hog the processRecords thread
Future(kclWorker.startGracefulShutdown()) //Needs to be async otherwise we hog the processRecords thread
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ class KinesisConsumer(consumerConf: ConsumerConf,
val shutdownTimeoutUnit = consumerConf.workerConf.shutdownTimeout.duration.unit

Try {
kclWorker.requestShutdown
kclWorker
.startGracefulShutdown()
.get(shutdownTimeoutLength, shutdownTimeoutUnit)
} match {
case Success(_) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ class ConsumerProcessingManagerSpec

Mockito
.verify(kcl, Mockito.times(1))
.requestShutdown() //we failed so shutdown should have been called
.startGracefulShutdown() //we failed so shutdown should have been called
}
}

Expand All @@ -175,7 +175,7 @@ class ConsumerProcessingManagerSpec

Mockito
.verify(kcl, Mockito.times(1))
.requestShutdown() //we failed so shutdown should have been called
.startGracefulShutdown() //we failed so shutdown should have been called
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,9 @@ class KinesisConsumerSpec

"Should call requestShutdown on worker, when stop is called for consumer" in {
val worker = mock[Worker]
Mockito.when(worker.requestShutdown()).thenReturn(mock[java.util.concurrent.Future[Void]])
Mockito
.when(worker.startGracefulShutdown())
.thenReturn(mock[java.util.concurrent.Future[java.lang.Boolean]])

Given("A running consumer")

Expand All @@ -286,15 +288,17 @@ class KinesisConsumerSpec
consumer.stop()

Then("It should call request shutdown on worker once")
Mockito.verify(worker, Mockito.times(1)).requestShutdown()
Mockito.verify(worker, Mockito.times(1)).startGracefulShutdown()
}
}

"Should only call shutdown once, even on multiple invocations of stop" in {
val worker = mock[Worker]

Given("A consumer with our mocked worker")
Mockito.when(worker.requestShutdown()).thenReturn(mock[java.util.concurrent.Future[Void]])
Mockito
.when(worker.startGracefulShutdown())
.thenReturn(mock[java.util.concurrent.Future[java.lang.Boolean]])

val consumer = new KinesisConsumer(ConsumerConf(kinesisConfig, "testConsumer-1"),
null, // scalastyle:ignore
Expand All @@ -313,7 +317,7 @@ class KinesisConsumerSpec
consumer.stop()

Then("It should only call request shutdown on worker once")
Mockito.verify(worker, Mockito.times(1)).requestShutdown()
Mockito.verify(worker, Mockito.times(1)).startGracefulShutdown()
}
}
}
Expand Down

0 comments on commit fc7fee2

Please sign in to comment.