From fcb8b86918dc8121a7d240f5e4adbb480f1574d0 Mon Sep 17 00:00:00 2001
From: Mark Goddard
Date: Thu, 31 Aug 2023 14:55:28 +0100
Subject: [PATCH] Add support for resource management

Adds CLI arguments to specify limits for:

* S3 connections
* Memory (bytes)
* Threads used for CPU-bound tasks (when not using Rayon)

This allows us to control the resource usage of the server and helps to
avoid exceeding open file limits, overloading upstream object stores,
and running out of memory.

Currently these limits must be specified manually. The memory limit is
fairly rough: it only accounts for the size of the data returned from
the S3 object store. It does not account for decompressed data, which
may be larger than the compressed size, nor for operations such as
deshuffling, which may require a second buffer of the same size.
---
 benches/s3_client.rs    |   6 +-
 src/app.rs              |  41 +++++++++++--
 src/cli.rs              |  10 ++++
 src/error.rs            |  39 +++++++++++-
 src/lib.rs              |   1 +
 src/resource_manager.rs | 127 ++++++++++++++++++++++++++++++++++++++++
 src/s3_client.rs        |  15 ++++-
 7 files changed, 228 insertions(+), 11 deletions(-)
 create mode 100644 src/resource_manager.rs

diff --git a/benches/s3_client.rs b/benches/s3_client.rs
index f45f71e..9126540 100644
--- a/benches/s3_client.rs
+++ b/benches/s3_client.rs
@@ -5,6 +5,7 @@ use aws_sdk_s3::Client;
 use aws_types::region::Region;
 use axum::body::Bytes;
 use criterion::{black_box, criterion_group, criterion_main, Criterion};
+use reductionist::resource_manager::ResourceManager;
 use reductionist::s3_client::{S3Client, S3ClientMap};
 use url::Url;
 // Bring trait into scope to use as_bytes method.
@@ -42,6 +43,7 @@ fn criterion_benchmark(c: &mut Criterion) {
     let bucket = "s3-client-bench";
     let runtime = tokio::runtime::Runtime::new().unwrap();
     let map = S3ClientMap::new();
+    let resource_manager = ResourceManager::new(None, None, None);
     for size_k in [64, 256, 1024] {
         let size: isize = size_k * 1024;
         let data: Vec<u32> = (0_u32..(size as u32)).collect::<Vec<u32>>();
@@ -53,7 +55,7 @@ fn criterion_benchmark(c: &mut Criterion) {
             b.to_async(&runtime).iter(|| async {
                 let client = S3Client::new(&url, username, password).await;
                 client
-                    .download_object(black_box(bucket), &key, None)
+                    .download_object(black_box(bucket), &key, None, &resource_manager, &mut None)
                     .await
                     .unwrap();
             })
@@ -63,7 +65,7 @@ fn criterion_benchmark(c: &mut Criterion) {
             b.to_async(&runtime).iter(|| async {
                 let client = map.get(&url, username, password).await;
                 client
-                    .download_object(black_box(bucket), &key, None)
+                    .download_object(black_box(bucket), &key, None, &resource_manager, &mut None)
                     .await
                     .unwrap();
             })
diff --git a/src/app.rs b/src/app.rs
index f96fc78..4c54099 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -7,6 +7,7 @@ use crate::metrics::{metrics_handler, track_metrics};
 use crate::models;
 use crate::operation;
 use crate::operations;
+use crate::resource_manager::ResourceManager;
 use crate::s3_client;
 use crate::types::{ByteOrder, NATIVE_BYTE_ORDER};
 use crate::validated_json::ValidatedJson;
@@ -25,6 +26,7 @@ use axum::{
 };

 use std::sync::Arc;
+use tokio::sync::SemaphorePermit;
 use tower::Layer;
 use tower::ServiceBuilder;
 use tower_http::normalize_path::NormalizePathLayer;
@@ -54,14 +56,21 @@ struct AppState {

     /// Map of S3 client objects.
     s3_client_map: s3_client::S3ClientMap,
+
+    /// Resource manager.
+    resource_manager: ResourceManager,
 }

 impl AppState {
     /// Create and return an [AppState].
     fn new(args: &CommandLineArgs) -> Self {
+        let task_limit = args.thread_limit.or_else(|| Some(num_cpus::get() - 1));
+        let resource_manager =
+            ResourceManager::new(args.s3_connection_limit, args.memory_limit, task_limit);
         Self {
             args: args.clone(),
             s3_client_map: s3_client::S3ClientMap::new(),
+            resource_manager,
         }
     }
 }
@@ -176,14 +185,26 @@ async fn schema() -> &'static str {
 ///
 /// * `auth`: Basic authentication credentials
 /// * `request_data`: RequestData object for the request
-#[tracing::instrument(level = "DEBUG", skip(client, request_data))]
-async fn download_object(
+#[tracing::instrument(
+    level = "DEBUG",
+    skip(client, request_data, resource_manager, mem_permits)
+)]
+async fn download_object<'a>(
     client: &s3_client::S3Client,
     request_data: &models::RequestData,
+    resource_manager: &'a ResourceManager,
+    mem_permits: &mut Option<SemaphorePermit<'a>>,
 ) -> Result<Bytes, ActiveStorageError> {
     let range = s3_client::get_range(request_data.offset, request_data.size);
+    let _conn_permits = resource_manager.s3_connection().await?;
     client
-        .download_object(&request_data.bucket, &request_data.object, range)
+        .download_object(
+            &request_data.bucket,
+            &request_data.object,
+            range,
+            resource_manager,
+            mem_permits,
+        )
         .await
 }

@@ -206,19 +227,27 @@ async fn operation_handler<T: operation::Operation>(
     TypedHeader(auth): TypedHeader<Authorization<Basic>>,
     ValidatedJson(request_data): ValidatedJson<models::RequestData>,
 ) -> Result<models::Response, ActiveStorageError> {
+    let memory = request_data.size.unwrap_or(0);
+    let mut _mem_permits = state.resource_manager.memory(memory).await?;
     let s3_client = state
         .s3_client_map
         .get(&request_data.source, auth.username(), auth.password())
         .instrument(tracing::Span::current())
         .await;
-    let data = download_object(&s3_client, &request_data)
-        .instrument(tracing::Span::current())
-        .await?;
+    let data = download_object(
+        &s3_client,
+        &request_data,
+        &state.resource_manager,
+        &mut _mem_permits,
+    )
+    .instrument(tracing::Span::current())
+    .await?;
     // All remaining work is synchronous. If the use_rayon argument was specified, delegate to the
     // Rayon thread pool. Otherwise, execute as normal using Tokio.
     if state.args.use_rayon {
         tokio_rayon::spawn(move || operation::<T>(request_data, data)).await
     } else {
+        let _task_permit = state.resource_manager.task().await?;
         operation::<T>(request_data, data)
     }
 }
diff --git a/src/cli.rs b/src/cli.rs
index 69d9477..ba6f450 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -37,6 +37,16 @@ pub struct CommandLineArgs {
     /// Whether to use Rayon for execution of CPU-bound tasks.
     #[arg(long, default_value_t = false, env = "REDUCTIONIST_USE_RAYON")]
     pub use_rayon: bool,
+    /// Memory limit in bytes. Default is no limit.
+    #[arg(long, env = "REDUCTIONIST_MEMORY_LIMIT")]
+    pub memory_limit: Option<usize>,
+    /// S3 connection limit. Default is no limit.
+    #[arg(long, env = "REDUCTIONIST_S3_CONNECTION_LIMIT")]
+    pub s3_connection_limit: Option<usize>,
+    /// Thread limit for CPU-bound tasks. Default is one less than the number of CPUs. Used only
+    /// when use_rayon is false.
+    #[arg(long, env = "REDUCTIONIST_THREAD_LIMIT")]
+    pub thread_limit: Option<usize>,
 }

 /// Returns parsed command line arguments.
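For illustration only: with the clap arguments above, the limits can be set
as flags or environment variables when launching the server. The flag and
variable names follow from src/cli.rs; the binary name and the values below
are hypothetical examples, not part of the patch.

    reductionist --s3-connection-limit 100 --memory-limit 1073741824 --thread-limit 8

    REDUCTIONIST_S3_CONNECTION_LIMIT=100 REDUCTIONIST_MEMORY_LIMIT=1073741824 reductionist
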
diff --git a/src/error.rs b/src/error.rs
index 8b93c11..3491580 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -14,6 +14,7 @@ use ndarray::ShapeError;
 use serde::{Deserialize, Serialize};
 use std::error::Error;
 use thiserror::Error;
+use tokio::sync::AcquireError;
 use tracing::{event, Level};
 use zune_inflate::errors::InflateDecodeErrors;

@@ -41,9 +42,14 @@ pub enum ActiveStorageError {
     #[error("failed to convert from bytes to {type_name}")]
     FromBytes { type_name: &'static str },

+    /// Incompatible missing data descriptor
     #[error("Incompatible value {0} for missing")]
     IncompatibleMissing(DValue),

+    /// Insufficient memory to process request
+    #[error("Insufficient memory to process request ({requested} > {total})")]
+    InsufficientMemory { requested: usize, total: usize },
+
     /// Error deserialising request data into RequestData
     #[error("request data is not valid")]
     RequestDataJsonRejection(#[from] JsonRejection),
@@ -64,6 +70,10 @@ pub enum ActiveStorageError {
     #[error("error retrieving object from S3 storage")]
     S3GetObject(#[from] SdkError<GetObjectError>),

+    /// Error acquiring a semaphore
+    #[error("error acquiring resources")]
+    SemaphoreAcquireError(#[from] AcquireError),
+
     /// Error creating ndarray ArrayView from Shape
     #[error("failed to create array from shape")]
     ShapeInvalid(#[from] ShapeError),
@@ -196,6 +206,10 @@ impl From<ActiveStorageError> for ErrorResponse {
             | ActiveStorageError::DecompressionZune(_)
             | ActiveStorageError::EmptyArray { operation: _ }
             | ActiveStorageError::IncompatibleMissing(_)
+            | ActiveStorageError::InsufficientMemory {
+                requested: _,
+                total: _,
+            }
             | ActiveStorageError::RequestDataJsonRejection(_)
             | ActiveStorageError::RequestDataValidationSingle(_)
             | ActiveStorageError::RequestDataValidation(_)
@@ -207,7 +221,8 @@ impl From<ActiveStorageError> for ErrorResponse {
             // Internal server error
             ActiveStorageError::FromBytes { type_name: _ }
             | ActiveStorageError::TryFromInt(_)
-            | ActiveStorageError::S3ByteStream(_) => Self::internal_server_error(&error),
+            | ActiveStorageError::S3ByteStream(_)
+            | ActiveStorageError::SemaphoreAcquireError(_) => Self::internal_server_error(&error),

             ActiveStorageError::S3GetObject(sdk_error) => {
                 // Tailor the response based on the specific SdkError variant.
@@ -377,6 +392,17 @@ mod tests {
         test_active_storage_error(error, StatusCode::BAD_REQUEST, message, caused_by).await;
     }

+    #[tokio::test]
+    async fn insufficient_memory() {
+        let error = ActiveStorageError::InsufficientMemory {
+            requested: 2,
+            total: 1,
+        };
+        let message = "Insufficient memory to process request (2 > 1)";
+        let caused_by = None;
+        test_active_storage_error(error, StatusCode::BAD_REQUEST, message, caused_by).await;
+    }
+
     #[tokio::test]
     async fn request_data_validation_single() {
         let validation_error = validator::ValidationError::new("foo");
@@ -504,6 +530,17 @@ mod tests {
             .await;
     }

+    #[tokio::test]
+    async fn semaphore_acquire_error() {
+        let sem = tokio::sync::Semaphore::new(1);
+        sem.close();
+        let error = ActiveStorageError::SemaphoreAcquireError(sem.acquire().await.unwrap_err());
+        let message = "error acquiring resources";
+        let caused_by = Some(vec!["semaphore closed"]);
+        test_active_storage_error(error, StatusCode::INTERNAL_SERVER_ERROR, message, caused_by)
+            .await;
+    }
+
     #[tokio::test]
     async fn shape_error() {
         let error = ActiveStorageError::ShapeInvalid(ShapeError::from_kind(
diff --git a/src/lib.rs b/src/lib.rs
index 8505319..697a197 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -296,6 +296,7 @@ pub mod metrics;
 pub mod models;
 pub mod operation;
 pub mod operations;
+pub mod resource_manager;
 pub mod s3_client;
 pub mod server;
 #[cfg(test)]
diff --git a/src/resource_manager.rs b/src/resource_manager.rs
new file mode 100644
index 0000000..0b4f1e6
--- /dev/null
+++ b/src/resource_manager.rs
@@ -0,0 +1,127 @@
+//! Resource management
+
+use crate::error::ActiveStorageError;
+
+use tokio::sync::{Semaphore, SemaphorePermit};
+
+/// [crate::resource_manager::ResourceManager] provides a simple way to allocate various resources
+/// to tasks. Resource management is performed using a Tokio Semaphore for each type of resource.
+pub struct ResourceManager {
+    /// Optional semaphore for S3 connections.
+    s3_connections: Option<Semaphore>,
+
+    /// Optional semaphore for memory (bytes).
+    memory: Option<Semaphore>,
+
+    /// Optional total memory pool in bytes.
+    total_memory: Option<usize>,
+
+    /// Optional semaphore for tasks.
+    tasks: Option<Semaphore>,
+}
+
+impl ResourceManager {
+    /// Returns a new ResourceManager object.
+    pub fn new(
+        s3_connection_limit: Option<usize>,
+        memory_limit: Option<usize>,
+        task_limit: Option<usize>,
+    ) -> Self {
+        Self {
+            s3_connections: s3_connection_limit.map(Semaphore::new),
+            memory: memory_limit.map(Semaphore::new),
+            total_memory: memory_limit,
+            tasks: task_limit.map(Semaphore::new),
+        }
+    }
+
+    /// Acquire an S3 connection resource.
+    pub async fn s3_connection(&self) -> Result<Option<SemaphorePermit>, ActiveStorageError> {
+        optional_acquire(&self.s3_connections, 1).await
+    }
+
+    /// Acquire memory resource.
+    pub async fn memory(
+        &self,
+        bytes: usize,
+    ) -> Result<Option<SemaphorePermit>, ActiveStorageError> {
+        if let Some(total_memory) = self.total_memory {
+            if bytes > total_memory {
+                return Err(ActiveStorageError::InsufficientMemory {
+                    requested: bytes,
+                    total: total_memory,
+                });
+            };
+        };
+        optional_acquire(&self.memory, bytes).await
+    }
+
+    /// Acquire a task resource.
+    pub async fn task(&self) -> Result<Option<SemaphorePermit>, ActiveStorageError> {
+        optional_acquire(&self.tasks, 1).await
+    }
+}
+
+/// Acquire permits on an optional Semaphore, if present.
+async fn optional_acquire(
+    sem: &Option<Semaphore>,
+    n: usize,
+) -> Result<Option<SemaphorePermit>, ActiveStorageError> {
+    let n = n.try_into()?;
+    if let Some(sem) = sem {
+        sem.acquire_many(n)
+            .await
+            .map(Some)
+            .map_err(|err| err.into())
+    } else {
+        Ok(None)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use tokio::sync::TryAcquireError;
+
+    #[tokio::test]
+    async fn no_resource_management() {
+        let rm = ResourceManager::new(None, None, None);
+        assert!(rm.s3_connections.is_none());
+        assert!(rm.memory.is_none());
+        assert!(rm.tasks.is_none());
+        let _c = rm.s3_connection().await.unwrap();
+        let _m = rm.memory(1).await.unwrap();
+        let _t = rm.task().await.unwrap();
+        assert!(_c.is_none());
+        assert!(_m.is_none());
+        assert!(_t.is_none());
+    }
+
+    #[tokio::test]
+    async fn full_resource_management() {
+        let rm = ResourceManager::new(Some(1), Some(1), Some(1));
+        assert!(rm.s3_connections.is_some());
+        assert!(rm.memory.is_some());
+        assert!(rm.tasks.is_some());
+        let _c = rm.s3_connection().await.unwrap();
+        let _m = rm.memory(1).await.unwrap();
+        let _t = rm.task().await.unwrap();
+        assert!(_c.is_some());
+        assert!(_m.is_some());
+        assert!(_t.is_some());
+        // Check that there are no more resources (without blocking).
+        assert_eq!(
+            rm.s3_connections.as_ref().unwrap().try_acquire().err(),
+            Some(TryAcquireError::NoPermits)
+        );
+        assert_eq!(
+            rm.memory.as_ref().unwrap().try_acquire().err(),
+            Some(TryAcquireError::NoPermits)
+        );
+        assert_eq!(
+            rm.tasks.as_ref().unwrap().try_acquire().err(),
+            Some(TryAcquireError::NoPermits)
+        );
+    }
+}
diff --git a/src/s3_client.rs b/src/s3_client.rs
index 69fa51f..3a80253 100644
--- a/src/s3_client.rs
+++ b/src/s3_client.rs
@@ -2,13 +2,14 @@
 //! It attempts to hide the complexities of working with the AWS SDK for S3.

 use crate::error::ActiveStorageError;
+use crate::resource_manager::ResourceManager;

 use aws_credential_types::Credentials;
 use aws_sdk_s3::Client;
 use aws_types::region::Region;
 use axum::body::Bytes;
 use hashbrown::HashMap;
-use tokio::sync::RwLock;
+use tokio::sync::{RwLock, SemaphorePermit};
 use tokio_stream::StreamExt;
 use tracing::Instrument;
 use url::Url;
@@ -103,11 +104,15 @@ impl S3Client {
     /// * `bucket`: Name of the bucket
     /// * `key`: Name of the object in the bucket
     /// * `range`: Optional byte range
-    pub async fn download_object(
+    /// * `resource_manager`: ResourceManager object
+    /// * `mem_permits`: Optional SemaphorePermit for any memory resources reserved
+    pub async fn download_object<'a>(
         self: &S3Client,
         bucket: &str,
         key: &str,
         range: Option<String>,
+        resource_manager: &'a ResourceManager,
+        mem_permits: &mut Option<SemaphorePermit<'a>>,
     ) -> Result<Bytes, ActiveStorageError> {
         let mut response = self
             .client
@@ -119,6 +124,12 @@ impl S3Client {
             .instrument(tracing::Span::current())
             .await?;
         let content_length = response.content_length();
+
+        // FIXME: how to account for compressed data?
+        if mem_permits.is_none() {
+            let memory = content_length.try_into()?;
+            *mem_permits = resource_manager.memory(memory).await?;
+        };
         // The data returned by the S3 client does not have any alignment guarantees. In order to
         // reinterpret the data as an array of numbers with a higher alignment than 1, we need to
         // return the data in Bytes object in which the underlying data has a higher alignment.
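
A rough usage sketch of the ResourceManager API added above, not part of the
patch itself. The crate and module paths follow the layout in this patch; the
limits and byte counts are arbitrary example values.

    // Sketch: acquire permits from a ResourceManager and release them on drop.
    use reductionist::resource_manager::ResourceManager;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Allow up to 8 S3 connections, 1 GiB of memory and 4 CPU-bound tasks.
        let rm = ResourceManager::new(Some(8), Some(1 << 30), Some(4));

        // Each call returns Ok(Some(permit)) when the corresponding limit is
        // configured, or Ok(None) when it was created with None.
        let _conn = rm.s3_connection().await?;
        let _mem = rm.memory(64 * 1024 * 1024).await?;
        let _task = rm.task().await?;
        Ok(())
    }

The guards are plain tokio SemaphorePermit values, so dropping them returns
the capacity to the pool, which is how the handler and S3 client above hold
resources only for the lifetime of a request.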