Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support graceful shutdown #487

Merged
merged 5 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ All notable changes to this project will be documented in this file.
- Default stackableVersion to operator version ([#467]).
- Document we don't create PodDisruptionBudgets ([#480]).
- Added support for 0.57.0 ([#482]).
- Support graceful shutdown ([#487]).
- Disable OPA telemetry ([#487]).

### Changed

Expand All @@ -23,6 +25,7 @@ All notable changes to this project will be documented in this file.
[#467]: https://github.com/stackabletech/opa-operator/pull/467
[#480]: https://github.com/stackabletech/opa-operator/pull/480
[#482]: https://github.com/stackabletech/opa-operator/pull/482
[#487]: https://github.com/stackabletech/opa-operator/pull/487

## [23.7.0] - 2023-07-14

Expand Down
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@ repository = "https://github.com/stackabletech/opa-operator"
[workspace.dependencies]
built = { version = "0.6", features = ["chrono", "git2"] }
clap = "4.3"
futures = { version = "0.3", features = ["compat"] }
fnv = "1.0"
futures = { version = "0.3", features = ["compat"] }
indoc = "2.0"
pin-project = "1.1"
product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.6.0" }
semver = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
snafu = "0.7"
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "0.56.0" }
product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.6.0" }
strum = { version = "0.25", features = ["derive"] }
tokio = { version = "1.29", features = ["full"] }
tracing = "0.1"
Expand Down
8 changes: 8 additions & 0 deletions deploy/helm/opa-operator/crds/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ spec:
config:
default: {}
properties:
gracefulShutdownTimeout:
description: Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details.
nullable: true
type: string
logging:
default:
enableVectorAgent: null
Expand Down Expand Up @@ -3066,6 +3070,10 @@ spec:
config:
default: {}
properties:
gracefulShutdownTimeout:
description: Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details.
nullable: true
type: string
logging:
default:
enableVectorAgent: null
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,26 @@
= Graceful shutdown

Graceful shutdown of OPA nodes is either not supported by the product itself
or we have not implemented it yet.
You can configure the graceful shutdown as described in xref:concepts:operations/graceful_shutdown.adoc[].

Outstanding implementation work for the graceful shutdowns of all products where this functionality is relevant is tracked in
https://github.com/stackabletech/issues/issues/357
== Servers

As a default, OPA servers have `2 minutes` to shut down gracefully.

The OPA server process will receive a `SIGTERM` signal when Kubernetes wants to terminate the Pod.
It will acknowledge the shutdown as shown in the log below and initiate a graceful shutdown.
After the graceful shutdown timeout runs out, and the process still didn't exit, Kubernetes will issue a `SIGKILL` signal.

[source,text]
----
{"level":"info","msg":"Shutting down...","time":"2023-11-06T15:16:08Z"}
{"level":"info","msg":"Server shutdown.","time":"2023-11-06T15:16:08Z"}
{"level":"info","msg":"Stopping bundle loader.","name":"stackable","plugin":"bundle","time":"2023-11-06T15:16:08Z"}
----

== Implementation

Once a server Pod is asked to terminate the following timeline occurs:

1. The server stops accepting any new queries.
2. The server waits until all running queries have finished.
3. If the graceful shutdown doesn't complete quick enough (e.g. a query runs longer than the graceful shutdown period), after `<graceful shutdown period> + 5s safety overhead` the Pod gets killed, regardless if it has shut down gracefully or not. This is achieved by setting `terminationGracePeriodSeconds` on the server Pods. Running queries on the sever will fail.
14 changes: 14 additions & 0 deletions rust/crd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use stackable_operator::{
role_utils::{EmptyRoleConfig, RoleGroup, RoleGroupRef},
schemars::{self, JsonSchema},
status::condition::{ClusterCondition, HasStatusCondition},
time::Duration,
};
use strum::{Display, EnumIter, EnumString};

Expand All @@ -28,17 +29,24 @@ pub const OPERATOR_NAME: &str = "opa.stackable.tech";

pub const CONFIG_FILE: &str = "config.yaml";

pub const DEFAULT_SERVER_GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_minutes_unchecked(2);
/// Safety puffer to guarantee the graceful shutdown works every time.
pub const SERVER_GRACEFUL_SHUTDOWN_SAFETY_OVERHEAD: Duration = Duration::from_secs(5);

#[derive(Snafu, Debug)]
pub enum Error {
#[snafu(display("the role group {role_group} is not defined"))]
CannotRetrieveOpaRoleGroup { role_group: String },

#[snafu(display("unknown role {role}"))]
UnknownOpaRole {
source: strum::ParseError,
role: String,
},

#[snafu(display("the role group [{role_group}] is missing"))]
MissingRoleGroup { role_group: String },

#[snafu(display("fragment validation failure"))]
FragmentValidationFailure { source: ValidationError },
}
Expand Down Expand Up @@ -173,8 +181,13 @@ pub enum Container {
pub struct OpaConfig {
#[fragment_attrs(serde(default))]
pub logging: Logging<Container>,

#[fragment_attrs(serde(default))]
pub resources: Resources<OpaStorageConfig, NoRuntimeLimits>,

/// Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details.
#[fragment_attrs(serde(default))]
pub graceful_shutdown_timeout: Option<Duration>,
}

impl OpaConfig {
Expand All @@ -192,6 +205,7 @@ impl OpaConfig {
},
storage: OpaStorageConfigFragment {},
},
graceful_shutdown_timeout: Some(DEFAULT_SERVER_GRACEFUL_SHUTDOWN_TIMEOUT),
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions rust/operator-binary/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ stackable-opa-crd = { path = "../crd" }
clap.workspace = true
fnv.workspace = true
futures.workspace = true
indoc.workspace = true
pin-project.workspace = true
product-config.workspace = true
semver.workspace = true
serde.workspace = true
serde_json.workspace = true
serde.workspace = true
snafu.workspace = true
stackable-operator.workspace = true
product-config.workspace = true
strum.workspace = true
tokio.workspace = true
tracing.workspace = true
Expand Down
87 changes: 62 additions & 25 deletions rust/operator-binary/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ use std::{
sync::Arc,
};

use indoc::formatdoc;
use product_config::{types::PropertyNameKind, ProductConfigManager};
use serde_json::json;
use snafu::{OptionExt, ResultExt, Snafu};
use stackable_opa_crd::{
Container, OpaCluster, OpaClusterStatus, OpaConfig, OpaRole, APP_NAME, OPERATOR_NAME,
Container, OpaCluster, OpaClusterStatus, OpaConfig, OpaRole, APP_NAME,
DEFAULT_SERVER_GRACEFUL_SHUTDOWN_TIMEOUT, OPERATOR_NAME,
};
use stackable_operator::{
builder::{
Expand Down Expand Up @@ -39,6 +41,7 @@ use stackable_operator::{
product_config_utils::{transform_all_roles_to_config, validate_all_roles_and_groups_config},
product_logging::{
self,
framework::{create_vector_shutdown_file_command, remove_vector_shutdown_file_command},
spec::{
AppenderConfig, AutomaticContainerLogConfig, ContainerLogConfig,
ContainerLogConfigChoice, LogLevel,
Expand All @@ -50,11 +53,13 @@ use stackable_operator::{
operations::ClusterOperationsConditionBuilder,
},
time::Duration,
utils::COMMON_BASH_TRAP_FUNCTIONS,
};
use strum::{EnumDiscriminants, IntoStaticStr};

use crate::{
discovery::{self, build_discovery_configmaps},
operations::graceful_shutdown::add_graceful_shutdown_config,
product_logging::{
extend_role_group_config_map, resolve_vector_aggregator_address, BundleBuilderLogLevel,
OpaLogLevel,
Expand All @@ -75,7 +80,7 @@ pub const BUNDLE_BUILDER_PORT: i32 = 3030;
const CONFIG_VOLUME_NAME: &str = "config";
const CONFIG_DIR: &str = "/stackable/config";
const LOG_VOLUME_NAME: &str = "log";
const LOG_DIR: &str = "/stackable/log";
const STACKABLE_LOG_DIR: &str = "/stackable/log";
const BUNDLES_VOLUME_NAME: &str = "bundles";
const BUNDLES_DIR: &str = "/bundles";

Expand Down Expand Up @@ -227,6 +232,11 @@ pub enum Error {
BuildRbacResources {
source: stackable_operator::error::Error,
},

#[snafu(display("failed to configure graceful shutdown"))]
GracefulShutdown {
source: crate::operations::graceful_shutdown::Error,
},
}
type Result<T, E = Error> = std::result::Result<T, E>;

Expand Down Expand Up @@ -590,7 +600,8 @@ fn build_server_rolegroup_daemonset(
cb_prepare
.image_from_product_image(resolved_product_image)
.command(vec![
"bash".to_string(),
"/bin/bash".to_string(),
"-x".to_string(),
"-euo".to_string(),
"pipefail".to_string(),
"-c".to_string(),
Expand All @@ -601,13 +612,14 @@ fn build_server_rolegroup_daemonset(
)
.join(" && ")])
.add_volume_mount(BUNDLES_VOLUME_NAME, BUNDLES_DIR)
.add_volume_mount(LOG_VOLUME_NAME, LOG_DIR)
.add_volume_mount(LOG_VOLUME_NAME, STACKABLE_LOG_DIR)
.resources(merged_config.resources.to_owned().into());

cb_bundle_builder
.image_from_product_image(resolved_product_image)
.command(vec![
"bash".to_string(),
"/bin/bash".to_string(),
"-x".to_string(),
"-euo".to_string(),
"pipefail".to_string(),
"-c".to_string(),
Expand All @@ -622,7 +634,7 @@ fn build_server_rolegroup_daemonset(
bundle_builder_log_level(merged_config).to_string(),
)
.add_volume_mount(BUNDLES_VOLUME_NAME, BUNDLES_DIR)
.add_volume_mount(LOG_VOLUME_NAME, LOG_DIR)
.add_volume_mount(LOG_VOLUME_NAME, STACKABLE_LOG_DIR)
.resources(
ResourceRequirementsBuilder::new()
.with_cpu_request("100m")
Expand Down Expand Up @@ -656,7 +668,8 @@ fn build_server_rolegroup_daemonset(
cb_opa
.image_from_product_image(resolved_product_image)
.command(vec![
"bash".to_string(),
"/bin/bash".to_string(),
"-x".to_string(),
"-euo".to_string(),
"pipefail".to_string(),
"-c".to_string(),
Expand All @@ -668,7 +681,7 @@ fn build_server_rolegroup_daemonset(
.add_env_vars(env)
.add_container_port(APP_PORT_NAME, APP_PORT.into())
.add_volume_mount(CONFIG_VOLUME_NAME, CONFIG_DIR)
.add_volume_mount(LOG_VOLUME_NAME, LOG_DIR)
.add_volume_mount(LOG_VOLUME_NAME, STACKABLE_LOG_DIR)
.resources(merged_config.resources.to_owned().into())
.readiness_probe(Probe {
initial_delay_seconds: Some(5),
Expand Down Expand Up @@ -753,6 +766,8 @@ fn build_server_rolegroup_daemonset(
));
}

add_graceful_shutdown_config(merged_config, &mut pb).context(GracefulShutdownSnafu)?;

let mut pod_template = pb.build_template();
pod_template.merge_from(role.config.pod_overrides.clone());
pod_template.merge_from(role_group.config.pod_overrides.clone());
Expand Down Expand Up @@ -841,15 +856,30 @@ fn build_opa_start_command(merged_config: &OpaConfig, container_name: &str) -> S
}
}

let mut start_command = format!("/stackable/opa/opa run -s -a 0.0.0.0:{APP_PORT} -c {CONFIG_DIR}/config.yaml -l {opa_log_level}");

if console_logging_off {
start_command.push_str(&format!(" |& /stackable/multilog s{OPA_ROLLING_LOG_FILE_SIZE_BYTES} n{OPA_ROLLING_LOG_FILES} {LOG_DIR}/{container_name}"));
} else {
start_command.push_str(&format!(" |& tee >(/stackable/multilog s{OPA_ROLLING_LOG_FILE_SIZE_BYTES} n{OPA_ROLLING_LOG_FILES} {LOG_DIR}/{container_name})"));
// TODO: Think about adding --shutdown-wait-period, as suggested by https://github.com/open-policy-agent/opa/issues/2764
formatdoc! {"
{COMMON_BASH_TRAP_FUNCTIONS}
{remove_vector_shutdown_file_command}
prepare_signal_handlers
/stackable/opa/opa run -s -a 0.0.0.0:{APP_PORT} -c {CONFIG_DIR}/config.yaml -l {opa_log_level} --shutdown-grace-period {shutdown_grace_period_s} --disable-telemetry{logging_redirects} &
wait_for_termination $!
{create_vector_shutdown_file_command}
",
remove_vector_shutdown_file_command =
remove_vector_shutdown_file_command(STACKABLE_LOG_DIR),
create_vector_shutdown_file_command =
create_vector_shutdown_file_command(STACKABLE_LOG_DIR),
shutdown_grace_period_s = merged_config.graceful_shutdown_timeout.unwrap_or(DEFAULT_SERVER_GRACEFUL_SHUTDOWN_TIMEOUT).as_secs(),
// Redirects matter!
// We need to watch out, that the following "$!" call returns the PID of the main (opa-bundle-builder) process,
// and not some utility (e.g. multilog or tee) process.
// See https://stackoverflow.com/a/8048493
logging_redirects = if console_logging_off {
format!(" &> >(/stackable/multilog s{OPA_ROLLING_LOG_FILE_SIZE_BYTES} n{OPA_ROLLING_LOG_FILES} {STACKABLE_LOG_DIR}/{container_name})")
} else {
format!(" &> >(tee >(/stackable/multilog s{OPA_ROLLING_LOG_FILE_SIZE_BYTES} n{OPA_ROLLING_LOG_FILES} {STACKABLE_LOG_DIR}/{container_name}))")
},
}

start_command
}

fn build_bundle_builder_start_command(merged_config: &OpaConfig, container_name: &str) -> String {
Expand All @@ -872,15 +902,22 @@ fn build_bundle_builder_start_command(merged_config: &OpaConfig, container_name:
}
};

let mut start_command = "/stackable/opa-bundle-builder".to_string();

if console_logging_off {
start_command.push_str(&format!(" |& /stackable/multilog s{OPA_ROLLING_BUNDLE_BUILDER_LOG_FILE_SIZE_BYTES} n{OPA_ROLLING_BUNDLE_BUILDER_LOG_FILES} {LOG_DIR}/{container_name}"))
} else {
start_command.push_str(&format!(" |& tee >(/stackable/multilog s{OPA_ROLLING_BUNDLE_BUILDER_LOG_FILE_SIZE_BYTES} n{OPA_ROLLING_BUNDLE_BUILDER_LOG_FILES} {LOG_DIR}/{container_name})"));
formatdoc! {"
{COMMON_BASH_TRAP_FUNCTIONS}
prepare_signal_handlers
/stackable/opa-bundle-builder{logging_redirects} &
wait_for_termination $!
",
// Redirects matter!
// We need to watch out, that the following "$!" call returns the PID of the main (opa-bundle-builder) process,
// and not some utility (e.g. multilog or tee) process.
// See https://stackoverflow.com/a/8048493
logging_redirects = if console_logging_off {
format!(" &> >(/stackable/multilog s{OPA_ROLLING_BUNDLE_BUILDER_LOG_FILE_SIZE_BYTES} n{OPA_ROLLING_BUNDLE_BUILDER_LOG_FILES} {STACKABLE_LOG_DIR}/{container_name})")
} else {
format!(" &> >(tee >(/stackable/multilog s{OPA_ROLLING_BUNDLE_BUILDER_LOG_FILE_SIZE_BYTES} n{OPA_ROLLING_BUNDLE_BUILDER_LOG_FILES} {STACKABLE_LOG_DIR}/{container_name}))")
},
}

start_command
}

fn bundle_builder_log_level(merged_config: &OpaConfig) -> BundleBuilderLogLevel {
Expand Down Expand Up @@ -909,7 +946,7 @@ fn build_prepare_start_command(merged_config: &OpaConfig, container_name: &str)
}) = merged_config.logging.containers.get(&Container::Prepare)
{
prepare_container_args.push(product_logging::framework::capture_shell_output(
LOG_DIR,
STACKABLE_LOG_DIR,
container_name,
log_config,
));
Expand Down
1 change: 1 addition & 0 deletions rust/operator-binary/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::controller::OPA_CONTROLLER_NAME;

mod controller;
mod discovery;
mod operations;
mod product_logging;

pub mod built_info {
Expand Down
Loading