From a669044c291af0263e5d592244fa256f20c46c3c Mon Sep 17 00:00:00 2001 From: Sreekanth Date: Sun, 27 Oct 2024 19:53:40 +0530 Subject: [PATCH] chore: Tests for pipeline forwarder (#2192) Signed-off-by: Sreekanth --- rust/numaflow-core/src/lib.rs | 2 +- rust/numaflow-core/src/pipeline.rs | 329 ++++++++++++++++++++++++++++- 2 files changed, 325 insertions(+), 6 deletions(-) diff --git a/rust/numaflow-core/src/lib.rs b/rust/numaflow-core/src/lib.rs index 6dc2020bf..e324c0ff3 100644 --- a/rust/numaflow-core/src/lib.rs +++ b/rust/numaflow-core/src/lib.rs @@ -74,7 +74,7 @@ pub async fn run() -> Result<()> { } CustomResourceType::Pipeline(config) => { info!("Starting pipeline forwarder with config: {:?}", config); - if let Err(e) = pipeline::start_forwarder(cln_token, &config).await { + if let Err(e) = pipeline::start_forwarder(cln_token, config).await { error!("Application error running pipeline: {:?}", e); // abort the signal handler task since we have an error and we are shutting down diff --git a/rust/numaflow-core/src/pipeline.rs b/rust/numaflow-core/src/pipeline.rs index a949dbe06..ffe1c0694 100644 --- a/rust/numaflow-core/src/pipeline.rs +++ b/rust/numaflow-core/src/pipeline.rs @@ -32,17 +32,17 @@ mod isb; /// Starts the appropriate forwarder based on the pipeline configuration. 
pub(crate) async fn start_forwarder( cln_token: CancellationToken, - config: &PipelineConfig, + config: PipelineConfig, ) -> Result<()> { let js_context = create_js_context(config.js_client_config.clone()).await?; match &config.vertex_config { pipeline::VertexType::Source(source) => { let buffer_writers = - create_buffer_writers(config, js_context.clone(), cln_token.clone()).await?; + create_buffer_writers(&config, js_context.clone(), cln_token.clone()).await?; let (source_type, source_grpc_client) = - create_source_type(source, config, cln_token.clone()).await?; + create_source_type(source, &config, cln_token.clone()).await?; let (transformer, transformer_grpc_client) = create_transformer(source, cln_token.clone()).await?; @@ -68,13 +68,13 @@ pub(crate) async fn start_forwarder( } pipeline::VertexType::Sink(sink) => { // Create buffer readers for each partition - let buffer_readers = create_buffer_readers(config, js_context.clone()).await?; + let buffer_readers = create_buffer_readers(&config, js_context.clone()).await?; // Create sink writers and clients let mut sink_writers = Vec::new(); for _ in &buffer_readers { let (sink_writer, sink_grpc_client, fb_sink_grpc_client) = - create_sink_writer(config, sink, cln_token.clone()).await?; + create_sink_writer(&config, sink, cln_token.clone()).await?; sink_writers.push((sink_writer, sink_grpc_client, fb_sink_grpc_client)); } @@ -302,3 +302,322 @@ async fn create_js_context(config: pipeline::isb::jetstream::ClientConfig) -> Re .map_err(|e| error::Error::Connection(e.to_string()))?; Ok(jetstream::new(js_client)) } + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use async_nats::jetstream; + use async_nats::jetstream::{consumer, stream}; + use futures::StreamExt; + + use super::*; + + use crate::config::components::metrics::MetricsConfig; + use crate::config::components::sink::{BlackholeConfig, SinkConfig, SinkType}; + use crate::config::components::source::GeneratorConfig; + use 
crate::config::components::source::SourceConfig; + use crate::config::components::source::SourceType; + use crate::config::pipeline::PipelineConfig; + use crate::pipeline::pipeline::isb; + use crate::pipeline::pipeline::isb::{BufferReaderConfig, BufferWriterConfig}; + use crate::pipeline::pipeline::VertexType; + use crate::pipeline::pipeline::{FromVertexConfig, ToVertexConfig}; + use crate::pipeline::pipeline::{SinkVtxConfig, SourceVtxConfig}; + use crate::pipeline::tests::isb::BufferFullStrategy::RetryUntilSuccess; + + #[cfg(feature = "nats-tests")] + #[tokio::test] + async fn test_forwarder_for_source_vertex() { + // Unique names for the streams we use in this test + let streams = vec![ + "default-test-forwarder-for-source-vertex-out-0", + "default-test-forwarder-for-source-vertex-out-1", + "default-test-forwarder-for-source-vertex-out-2", + "default-test-forwarder-for-source-vertex-out-3", + "default-test-forwarder-for-source-vertex-out-4", + ]; + + let js_url = "localhost:4222"; + let client = async_nats::connect(js_url).await.unwrap(); + let context = jetstream::new(client); + + let mut consumers = vec![]; + // Create streams to which the generator source vertex we create later will forward + // messages to. The consumers created for the corresponding streams will be used to ensure + // that messages were actually written to the streams.
+ for stream_name in &streams { + let stream_name = *stream_name; + let _stream = context + .get_or_create_stream(stream::Config { + name: stream_name.into(), + subjects: vec![stream_name.into()], + max_message_size: 64 * 1024, + max_messages: 10000, + ..Default::default() + }) + .await + .unwrap(); + + let c: consumer::PullConsumer = context + .create_consumer_on_stream( + consumer::pull::Config { + name: Some(stream_name.to_string()), + ack_policy: consumer::AckPolicy::Explicit, + ..Default::default() + }, + stream_name, + ) + .await + .unwrap(); + consumers.push((stream_name.to_string(), c)); + } + + let pipeline_config = PipelineConfig { + pipeline_name: "simple-pipeline".to_string(), + vertex_name: "in".to_string(), + replica: 0, + batch_size: 1000, + paf_batch_size: 30000, + read_timeout: Duration::from_secs(1), + js_client_config: isb::jetstream::ClientConfig { + url: "localhost:4222".to_string(), + user: None, + password: None, + }, + from_vertex_config: vec![], + to_vertex_config: vec![ToVertexConfig { + name: "out".to_string(), + writer_config: BufferWriterConfig { + streams: streams + .iter() + .enumerate() + .map(|(i, stream_name)| (stream_name.to_string(), i as u16)) + .collect(), + partitions: 5, + max_length: 30000, + refresh_interval: Duration::from_secs(1), + usage_limit: 0.8, + buffer_full_strategy: RetryUntilSuccess, + retry_interval: Duration::from_millis(10), + }, + partitions: 5, + conditions: None, + }], + vertex_config: VertexType::Source(SourceVtxConfig { + source_config: SourceConfig { + source_type: SourceType::Generator(GeneratorConfig { + rpu: 10, + content: bytes::Bytes::new(), + duration: Duration::from_secs(1), + value: None, + key_count: 0, + msg_size_bytes: 300, + jitter: Duration::from_millis(0), + }), + }, + transformer_config: None, + }), + metrics_config: MetricsConfig { + metrics_server_listen_port: 2469, + lag_check_interval_in_secs: 5, + lag_refresh_interval_in_secs: 3, + }, + }; + + let cancellation_token = 
tokio_util::sync::CancellationToken::new(); + let forwarder_task = tokio::spawn({ + let cancellation_token = cancellation_token.clone(); + async move { + start_forwarder(cancellation_token, pipeline_config) + .await + .unwrap(); + } + }); + + // Wait for a few messages to be forwarded + tokio::time::sleep(Duration::from_secs(2)).await; + cancellation_token.cancel(); + forwarder_task.await.unwrap(); + + for (stream_name, stream_consumer) in consumers { + let messages: Vec<jetstream::Message> = stream_consumer + .batch() + .max_messages(10) + .expires(Duration::from_millis(50)) + .messages() + .await + .unwrap() + .map(|msg| msg.unwrap()) + .collect() + .await; + assert!( + !messages.is_empty(), + "Stream {} is expected to have messages", + stream_name + ); + } + + // Delete all streams created in this test + for stream_name in streams { + context.delete_stream(stream_name).await.unwrap(); + } + } + + #[cfg(feature = "nats-tests")] + #[tokio::test] + async fn test_forwarder_for_sink_vertex() { + // Unique names for the streams we use in this test + let streams = vec![ + "default-test-forwarder-for-sink-vertex-out-0", + "default-test-forwarder-for-sink-vertex-out-1", + "default-test-forwarder-for-sink-vertex-out-2", + "default-test-forwarder-for-sink-vertex-out-3", + "default-test-forwarder-for-sink-vertex-out-4", + ]; + + let js_url = "localhost:4222"; + let client = async_nats::connect(js_url).await.unwrap(); + let context = jetstream::new(client); + + const MESSAGE_COUNT: usize = 10; + let mut consumers = vec![]; + // Create streams from which the sink vertex will read messages. We publish the + // messages into these streams ourselves; the consumers created for the corresponding + // streams are used to verify that the sink vertex read and ack'ed the messages.
+ for stream_name in &streams { + let stream_name = *stream_name; + // Delete stream if it exists + let _ = context.delete_stream(stream_name).await; + let _stream = context + .get_or_create_stream(stream::Config { + name: stream_name.into(), + subjects: vec![stream_name.into()], + max_message_size: 64 * 1024, + max_messages: 10000, + ..Default::default() + }) + .await + .unwrap(); + + // Publish some messages into the stream + use crate::message::{Message, MessageID, Offset, StringOffset}; + use chrono::{TimeZone, Utc}; + let message = Message { + keys: vec!["key1".to_string()], + value: vec![1, 2, 3].into(), + offset: Some(Offset::String(StringOffset::new("123".to_string(), 0))), + event_time: Utc.timestamp_opt(1627846261, 0).unwrap(), + id: MessageID { + vertex_name: "vertex".to_string(), + offset: "123".to_string(), + index: 0, + }, + headers: HashMap::new(), + }; + let message: bytes::BytesMut = message.try_into().unwrap(); + + for _ in 0..MESSAGE_COUNT { + context + .publish(stream_name.to_string(), message.clone().into()) + .await + .unwrap() + .await + .unwrap(); + } + + let c: consumer::PullConsumer = context + .create_consumer_on_stream( + consumer::pull::Config { + name: Some(stream_name.to_string()), + ack_policy: consumer::AckPolicy::Explicit, + ..Default::default() + }, + stream_name, + ) + .await + .unwrap(); + consumers.push((stream_name.to_string(), c)); + } + + let pipeline_config = PipelineConfig { + pipeline_name: "simple-pipeline".to_string(), + vertex_name: "in".to_string(), + replica: 0, + batch_size: 1000, + paf_batch_size: 30000, + read_timeout: Duration::from_secs(1), + js_client_config: isb::jetstream::ClientConfig { + url: "localhost:4222".to_string(), + user: None, + password: None, + }, + to_vertex_config: vec![], + from_vertex_config: vec![FromVertexConfig { + name: "in".to_string(), + reader_config: BufferReaderConfig { + partitions: 5, + streams: streams + .iter() + .enumerate() + .map(|(i, key)| (key.to_string(), i as u16)) + 
.collect(), + batch_size: 500, + read_timeout: Duration::from_secs(1), + wip_ack_interval: Duration::from_secs(1), + }, + partitions: 0, + }], + vertex_config: VertexType::Sink(SinkVtxConfig { + sink_config: SinkConfig { + sink_type: SinkType::Blackhole(BlackholeConfig::default()), + retry_config: None, + }, + fb_sink_config: None, + }), + metrics_config: MetricsConfig { + metrics_server_listen_port: 2469, + lag_check_interval_in_secs: 5, + lag_refresh_interval_in_secs: 3, + }, + }; + + let cancellation_token = tokio_util::sync::CancellationToken::new(); + let forwarder_task = tokio::spawn({ + let cancellation_token = cancellation_token.clone(); + async move { + start_forwarder(cancellation_token, pipeline_config) + .await + .unwrap(); + } + }); + + // Wait for a few messages to be forwarded + tokio::time::sleep(Duration::from_secs(3)).await; + cancellation_token.cancel(); + // token cancellation is not aborting the forwarder since we fetch messages from jetstream + // as a stream of messages (not using `consumer.batch()`). + // See `JetstreamReader::start` method in src/pipeline/isb/jetstream/reader.rs + //forwarder_task.await.unwrap(); + forwarder_task.abort(); + + for (stream_name, mut stream_consumer) in consumers { + let stream_info = stream_consumer.info().await.unwrap(); + assert_eq!( + stream_info.delivered.stream_sequence, MESSAGE_COUNT as u64, + "Stream={}, expected delivered stream sequence to be {}, current value is {}", + stream_name, MESSAGE_COUNT, stream_info.delivered.stream_sequence + ); + assert_eq!( + stream_info.ack_floor.stream_sequence, MESSAGE_COUNT as u64, + "Stream={}, expected ack'ed stream sequence to be {}, current value is {}", + stream_name, MESSAGE_COUNT, stream_info.ack_floor.stream_sequence + ); + } + + // Delete all streams created in this test + for stream_name in streams { + context.delete_stream(stream_name).await.unwrap(); + } + } +}