Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: revamp integration tests #93

Merged
merged 12 commits into from
Mar 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 30 additions & 25 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,50 +4,55 @@ orbs:
rust: circleci/[email protected]

jobs:
integration-test:
integration-tests:
parameters:
module:
type: string
test:
type: string
machine:
image: ubuntu-2004:current
docker_layer_caching: true
working_directory: ~/kiwi
steps:
- checkout
- run:
name: Install Docker Compose
environment:
COMPOSE_VERSION: '1.29.2'
command: |
curl -L "https://github.com/docker/compose/releases/download/${COMPOSE_VERSION}/docker-compose-$(uname -s)-$(uname -m)" -o ~/docker-compose
chmod +x ~/docker-compose
sudo mv ~/docker-compose /usr/local/bin/docker-compose
- run:
name: Start all services declared in docker-compose.yml
# TODO: Implement a better way to wait for Kafka to be ready
command: docker-compose up -d && sleep 10
- when:
condition:
equal: [<< parameters.module >>, kafka]
steps:
- run:
name: Install Docker Compose
environment:
COMPOSE_VERSION: 'v2.24.7'
command: |
curl -L "https://github.com/docker/compose/releases/download/${COMPOSE_VERSION}/docker-compose-$(uname -s)-$(uname -m)" -o ~/docker-compose
chmod +x ~/docker-compose
sudo mv ~/docker-compose /usr/local/bin/docker-compose
- when:
condition:
equal: [<< parameters.module >>, kafka]
steps:
- run:
name: Start all services declared in docker-compose.yml
command: docker-compose up --wait
- run:
name: Build Test Docker Image
command: |
docker build -t kiwi:latest -f Dockerfile.ci .
- run:
name: Run Test
command: |
echo 'Running: << parameters.module >>::<< parameters.test >>'
NETWORK_NAME=$(docker network ls --filter name='kiwi' -q)
echo Network: $NETWORK_NAME
docker run --network $NETWORK_NAME --name kiwi-tests kiwi:latest cargo test --test '<< parameters.module >>' -- --nocapture --test-threads 1 --test '<< parameters.test >>'
[[ -n $NETWORK_NAME ]] && NETWORK_ARG="--network $NETWORK_NAME" || NETWORK_ARG=""
docker run $NETWORK_ARG \
--env 'BOOTSTRAP_SERVERS=kafka:19092' \
--name kiwi-tests \
kiwi:latest \
cargo test --test '<< parameters.module >>' -- --nocapture --test-threads=1

workflows:
kafka:
integration-tests:
jobs:
- integration-test:
- integration-tests:
matrix:
parameters:
module: ["kafka"]
test:
- "test_kafka_source"
- "test_closes_subscription_on_changed_metadata"
- "test_prioritizes_source_id"
module:
- kafka
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ services:
image: confluentinc/cp-kafka:latest
hostname: kafka
container_name: kafka
restart: always
healthcheck:
test: kafka-topics --list --bootstrap-server localhost:9092
interval: 10s
timeout: 20s
retries: 5
start_period: 10s
ports:
- "9092:9092"
- "29092:29092"
Expand Down
1 change: 1 addition & 0 deletions src/kiwi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,4 @@ arc-swap = "1.7.0"

[dev-dependencies]
tempfile = "3"
nix = { version = "0.28.0", features = ["signal"] }
167 changes: 167 additions & 0 deletions src/kiwi/tests/common/kafka.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
use std::collections::{BTreeMap, BTreeSet};

use maplit::btreemap;
use rdkafka::admin::{AdminClient as RdKafkaAdminClient, AdminOptions, NewPartitions};
use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};

type KafkaAdminClient = RdKafkaAdminClient<DefaultClientContext>;

/// Test helper wrapping rdkafka's admin client. Tracks every topic it
/// creates so they can be deleted when the helper is dropped.
pub struct AdminClient {
    inner: KafkaAdminClient,
    // Options applied to every admin operation (library defaults).
    options: AdminOptions,
    // Names of topics created through this client; used for drop-time cleanup.
    topics: BTreeSet<String>,
}

impl AdminClient {
    /// Creates an admin client connected to `bootstrap_servers` with an
    /// otherwise-default librdkafka configuration.
    pub fn new(bootstrap_servers: &str) -> anyhow::Result<Self> {
        Self::new_with_config(btreemap! {
            "bootstrap.servers" => bootstrap_servers,
        })
    }

    /// Creates an admin client from arbitrary librdkafka configuration
    /// properties (e.g. `bootstrap.servers`).
    ///
    /// # Errors
    /// Returns an error if the underlying rdkafka client cannot be built
    /// from the supplied configuration.
    pub fn new_with_config<K, V>(config: BTreeMap<K, V>) -> anyhow::Result<Self>
    where
        K: Into<String>,
        V: Into<String>,
    {
        let mut client_config = ClientConfig::new();

        client_config.extend(config.into_iter().map(|(k, v)| (k.into(), v.into())));

        let admin_client = client_config.create::<KafkaAdminClient>()?;

        Ok(Self {
            inner: admin_client,
            options: Default::default(),
            topics: Default::default(),
        })
    }

    /// Creates a topic with a unique, random `test-<nanoid>` name and
    /// `num_partitions` partitions (replication factor 1), returning the
    /// generated name.
    pub async fn create_random_topic(&mut self, num_partitions: i32) -> anyhow::Result<String> {
        let topic = format!("test-{}", nanoid::nanoid!());

        self.create_topic(&topic, num_partitions, 1).await?;

        Ok(topic)
    }

    /// Creates a topic with an explicit name and `num_partitions`
    /// partitions (replication factor 1).
    #[allow(dead_code)]
    pub async fn create_named_topic(
        &mut self,
        topic: &str,
        num_partitions: i32,
    ) -> anyhow::Result<()> {
        self.create_topic(topic, num_partitions, 1).await
    }

    /// Grows `topic_name` to `new_partition_count` total partitions,
    /// letting the broker choose replica assignment.
    ///
    /// # Errors
    /// Returns an error if the admin request fails or the broker rejects
    /// the partition change for this topic.
    pub async fn update_partitions(
        &mut self,
        topic_name: &str,
        new_partition_count: usize,
    ) -> anyhow::Result<()> {
        let result = self
            .inner
            .create_partitions(
                &[NewPartitions {
                    topic_name,
                    new_partition_count,
                    assignment: None,
                }],
                &self.options,
            )
            .await?;

        // One result per requested topic; we requested exactly one, so
        // indexing [0] is safe here. Per-topic failures come back as a
        // (topic, error) pair rather than a request-level error.
        result[0].as_ref().map_err(|(topic, error)| {
            anyhow::anyhow!("Failed to add partitions to topic {}: {}", topic, error)
        })?;

        Ok(())
    }

    // Shared implementation for the public create_* helpers. Records the
    // topic in `self.topics` on success so it is cleaned up on drop.
    async fn create_topic(
        &mut self,
        topic: &str,
        num_partitions: i32,
        replication_factor: i32,
    ) -> anyhow::Result<()> {
        let new_topic = rdkafka::admin::NewTopic::new(
            topic,
            num_partitions,
            rdkafka::admin::TopicReplication::Fixed(replication_factor),
        );

        let result = self
            .inner
            .create_topics(&[new_topic], &self.options)
            .await?;

        // One result per requested topic; we requested exactly one.
        result[0].as_ref().map_err(|(topic, error)| {
            anyhow::anyhow!("Failed to create topic {}: {}", topic, error)
        })?;

        // Invariant: a topic name is never created twice by the same
        // client — `insert` returning false would indicate a test bug.
        assert!(self.topics.insert(topic.to_string()));

        Ok(())
    }

    // Deletes the given topics and un-tracks each one that was deleted
    // successfully; stops and reports the first per-topic failure.
    async fn delete_topics(&mut self, topics: &[&str]) -> anyhow::Result<()> {
        let result = self.inner.delete_topics(topics, &self.options).await?;

        for result in result {
            match result {
                Ok(topic) => {
                    self.topics.remove(&topic);
                }
                Err((topic, error)) => {
                    return Err(anyhow::anyhow!(
                        "Failed to delete topic {}: {}",
                        topic,
                        error
                    ));
                }
            }
        }

        Ok(())
    }
}

impl Drop for AdminClient {
    /// Best-effort cleanup: deletes every topic this client created.
    fn drop(&mut self) {
        // Take ownership of the tracked set so we don't hold a borrow of
        // `self.topics` while `delete_topics` needs `&mut self`.
        // `mem::take` also avoids cloning every topic name.
        let topics = std::mem::take(&mut self.topics);
        let topics: Vec<&str> = topics.iter().map(|s| s.as_ref()).collect();

        if !topics.is_empty() {
            // Never panic inside `drop`: a panic here while the thread is
            // already unwinding (e.g. a failed assertion in a test) would
            // abort the whole process. Log the failure and move on.
            // NOTE(review): `block_on` inside `drop` can deadlock if this
            // runs on an async runtime thread — confirm tests drop this
            // client outside the runtime, or from a blocking context.
            if let Err(e) =
                futures::executor::block_on(async { self.delete_topics(topics.as_slice()).await })
            {
                eprintln!("Failed to delete topics: {}", e);
            }
        }
    }
}

/// Thin test wrapper around rdkafka's async `FutureProducer`.
pub struct Producer {
    inner: FutureProducer,
}

impl Producer {
    /// Builds a producer pointed at `bootstrap_servers` with a 5-second
    /// message timeout.
    ///
    /// # Errors
    /// Returns an error if the rdkafka producer cannot be constructed
    /// from this configuration.
    pub fn new(bootstrap_servers: &str) -> anyhow::Result<Self> {
        let mut config = rdkafka::config::ClientConfig::new();
        config.set("bootstrap.servers", bootstrap_servers);
        config.set("message.timeout.ms", "5000");

        let inner: FutureProducer = config.create()?;

        Ok(Self { inner })
    }

    /// Produces a single keyed record to `topic` and waits for the
    /// delivery report before returning.
    ///
    /// # Errors
    /// Returns an error if the broker does not confirm delivery.
    pub async fn send(&self, topic: &str, key: &str, payload: &str) -> anyhow::Result<()> {
        let record = FutureRecord::to(topic).payload(payload).key(key);

        // A zero queue timeout means "fail immediately if the local send
        // queue is full" — the await still waits for the delivery report.
        let delivery = self
            .inner
            .send(record, std::time::Duration::from_secs(0))
            .await;

        match delivery {
            Ok(_) => Ok(()),
            Err((e, _)) => Err(anyhow::anyhow!("Failed to send message: {}", e)),
        }
    }
}
68 changes: 68 additions & 0 deletions src/kiwi/tests/common/kiwi.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use std::io::Write;

use anyhow::Context;
use nix::sys::signal::{self, Signal};
use nix::unistd::Pid;
use tempfile::{NamedTempFile, TempPath};

/// Handle to a spawned `kiwi` child process used by integration tests;
/// the child is killed when this handle is dropped.
pub struct Process {
    proc: std::process::Child,
}

impl Process {
    /// Spawns the `kiwi` binary (resolved via `PATH`) with the given
    /// command-line arguments.
    ///
    /// # Errors
    /// Returns an error if the process cannot be spawned (e.g. the
    /// binary is not installed or not executable).
    pub fn new_with_args(args: &[&str]) -> anyhow::Result<Self> {
        let proc = std::process::Command::new("kiwi")
            .args(args)
            .spawn()
            .context("failed to spawn kiwi process")?;

        Ok(Self { proc })
    }

    /// Terminates the child and reaps it.
    ///
    /// Errors from `kill` are deliberately ignored: the process may have
    /// already exited on its own, and this is also invoked from `Drop`,
    /// where a panic during unwinding would abort the test run. The
    /// subsequent `wait` reaps the child so no zombie is left behind.
    pub fn kill(&mut self) {
        let _ = self.proc.kill();
        let _ = self.proc.wait();
    }

    /// Sends an arbitrary POSIX signal (e.g. `SIGTERM`) to the child,
    /// for tests exercising graceful-shutdown behavior.
    ///
    /// # Panics
    /// Panics if the pid does not fit in `i32` or the signal cannot be
    /// delivered (e.g. the process no longer exists).
    #[allow(dead_code)]
    pub fn signal(&mut self, signal: Signal) {
        signal::kill(Pid::from_raw(self.proc.id().try_into().unwrap()), signal)
            .expect("failed to send signal to kiwi process");
    }
}

impl Drop for Process {
    // Ensure the child is terminated when the helper goes out of scope so
    // test runs don't leak kiwi processes.
    // NOTE(review): `kill()` uses `expect`, so a failed kill here panics
    // during drop — if that drop happens while unwinding from a failed
    // test assertion, the process aborts. Consider a non-panicking kill.
    fn drop(&mut self) {
        self.kill();
    }
}

/// A kiwi configuration file backed by a named temporary file; the file
/// is removed from disk when this wrapper is dropped.
pub struct ConfigFile {
    inner: NamedTempFile,
}

impl ConfigFile {
    /// Writes `s` verbatim into a fresh named temporary file and wraps it.
    ///
    /// # Errors
    /// Returns an error if the temporary file cannot be created or
    /// written.
    pub fn from_str(s: &str) -> anyhow::Result<Self> {
        let mut file = NamedTempFile::new().context("failed to create temporary file")?;
        file.as_file_mut()
            .write_all(s.as_bytes())
            .context("failed to write config to temporary file")?;

        Ok(Self { inner: file })
    }

    /// Mutable handle to the underlying file, e.g. to rewrite the config
    /// mid-test. Previously this returned `&File`, silently coercing away
    /// the `&mut` from `NamedTempFile::as_file_mut` and contradicting the
    /// `_mut` name; `&mut File` reborrows to `&File`, so existing callers
    /// keep compiling.
    pub fn as_file_mut(&mut self) -> &mut std::fs::File {
        self.inner.as_file_mut()
    }

    /// Path of the temporary file on disk.
    pub fn path(&self) -> &std::path::Path {
        self.inner.path()
    }

    /// Path as UTF-8, for interpolation into CLI arguments.
    ///
    /// # Panics
    /// Panics if the temporary path is not valid UTF-8.
    pub fn path_str(&self) -> &str {
        self.path().to_str().expect("path is not valid utf-8")
    }

    /// Consumes the wrapper; the file stays on disk until the returned
    /// `TempPath` is dropped.
    pub fn into_temp_path(self) -> TempPath {
        self.inner.into_temp_path()
    }
}
3 changes: 3 additions & 0 deletions src/kiwi/tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Shared helpers for the kiwi integration tests.
pub mod kafka; // Kafka admin-client and producer test utilities
pub mod kiwi; // spawning/signaling the kiwi binary; temp config files
pub mod ws; // presumably WebSocket client helpers — module body not shown here
Loading
Loading