Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait for pending commits to finish during shutdown. #1663

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1077,18 +1077,6 @@ class SamzaContainer(
}
}

if (commitThreadPool != null) {
info("Shutting down task commit thread pool")
try {
commitThreadPool.shutdown()
if(!commitThreadPool.awaitTermination(shutdownMs, TimeUnit.MILLISECONDS)) {
commitThreadPool.shutdownNow()
}
} catch {
case e: Exception => error(e.getMessage, e)
}
}

if (timerExecutor != null) {
info("Shutting down timer executor")
try {
Expand All @@ -1102,6 +1090,18 @@ class SamzaContainer(
}

taskInstances.values.foreach(_.shutdownTask)

if (commitThreadPool != null) {
info("Shutting down task commit thread pool")
try {
commitThreadPool.shutdown()
if(!commitThreadPool.awaitTermination(shutdownMs, TimeUnit.MILLISECONDS)) {
commitThreadPool.shutdownNow()
}
} catch {
case e: Exception => error(e.getMessage, e)
}
}
}

def shutdownStores {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,19 +523,28 @@ class TaskInstance(
}

def shutdownTask {
// wait for in-flight commits to complete
val numTasksInContainer = containerContext.getContainerModel.getTasks.size()
val waitTimeMillis = taskConfig.getShutdownMs / numTasksInContainer
val acquired = commitInProgress.tryAcquire(waitTimeMillis, TimeUnit.MILLISECONDS)
if (!acquired) {
info("Pending commit did not complete within %d ms. Proceeding with shutdown." format waitTimeMillis)
}

if (commitManager != null) {
debug("Shutting down commit manager for taskName: %s" format taskName)
commitManager.close()
} else {
debug("Skipping commit manager shutdown for taskName: %s" format taskName)
}

applicationTaskContextOption.foreach(applicationTaskContext => {
debug("Stopping application-defined task context for taskName: %s" format taskName)
applicationTaskContext.stop()
})

if (task.isInstanceOf[ClosableTask]) {
debug("Shutting down stream task for taskName: %s" format taskName)

task.asInstanceOf[ClosableTask].close
} else {
debug("Skipping stream task shutdown for taskName: %s" format taskName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public static Collection<Boolean> data() {

put(TaskConfig.COMMIT_MS, "-1"); // manual commit only
put(TaskConfig.COMMIT_MAX_DELAY_MS, "0"); // Ensure no commits are skipped due to in progress commits
put(TaskConfig.TASK_SHUTDOWN_MS, "10000"); // wait for pending commits to complete.

// override store level state backend for in memory stores to use Kafka changelogs
put(String.format(StorageConfig.STORE_BACKUP_FACTORIES, IN_MEMORY_STORE_NAME),
Expand Down