feat: add compaction server supporting remote compaction service #1547

Closed. LeslieKid wants to merge 28 commits into main from remote-compaction-service.
28 commits
c1729f4
feat: add compaction server supporting remote compaction service
LeslieKid Jul 18, 2024
41f166b
fix style.
LeslieKid Jul 21, 2024
51666ca
fix style.
LeslieKid Jul 22, 2024
3d007e1
define error for compaction service.
LeslieKid Jul 23, 2024
453b22c
enable conversion from request to task.
LeslieKid Jul 23, 2024
b549119
update remote compact execution.
LeslieKid Jul 23, 2024
ec60fdc
enable conversion from task result to response.
LeslieKid Jul 25, 2024
d3b4db2
introduce CompactionCluster for compaction server in distributed mode.
LeslieKid Aug 5, 2024
798dd41
enable compaction cluster deployment.
LeslieKid Aug 6, 2024
cd7c9ae
refactor: replace CompactionCluster with ClusterType.
LeslieKid Aug 7, 2024
818d61f
remove compaction cluster.
LeslieKid Aug 7, 2024
31a306c
fix style and comment.
LeslieKid Aug 7, 2024
29894f8
provide cluster type for communication between horaedb/cs (as client)…
LeslieKid Aug 7, 2024
12f135a
introduce compaction client for horaedb to access remote compaction n…
LeslieKid Aug 12, 2024
a999f41
impl compact in Cluster trait.
LeslieKid Aug 14, 2024
165c18d
fix style and add comment.
LeslieKid Aug 14, 2024
f306c14
impl type conversion.
LeslieKid Aug 16, 2024
c01ef00
remove dead code.
LeslieKid Aug 16, 2024
e170f65
Merge branch 'main' into remote-compaction-service
LeslieKid Aug 16, 2024
f2c0e38
remove cluster type in meta client.
LeslieKid Aug 16, 2024
421d2c0
Merge branch 'main' into remote-compaction-service
LeslieKid Aug 16, 2024
c6521a4
fix bug
LeslieKid Aug 16, 2024
723a533
update Cargo.lock
LeslieKid Aug 16, 2024
3da8fe5
fix style.
LeslieKid Aug 20, 2024
debabab
rename ClusterType to NodeType.
LeslieKid Aug 20, 2024
03af564
fix style.
LeslieKid Aug 21, 2024
16d1f3d
impl default.
LeslieKid Aug 26, 2024
3f1c562
support conversion from compaction runner task to execute compactio…
LeslieKid Aug 31, 2024
28 changes: 27 additions & 1 deletion Cargo.lock


9 changes: 6 additions & 3 deletions Cargo.toml
@@ -24,7 +24,7 @@ license = "Apache-2.0"
 [workspace]
 resolver = "2"
 # In alphabetical order
-members = [
+members = [
     "horaectl",
     "integration_tests",
     "integration_tests/sdk/rust",
@@ -33,7 +33,8 @@ members = [
     "src/catalog",
     "src/catalog_impls",
     "src/cluster",
-    "src/common_types",
+    "src/common_types",
+    "src/compaction_client",
     "src/components/alloc_tracker",
     "src/components/arena",
     "src/components/arrow_ext",
@@ -101,7 +102,8 @@ thiserror = "1"
 bytes_ext = { path = "src/components/bytes_ext" }
 catalog = { path = "src/catalog" }
 catalog_impls = { path = "src/catalog_impls" }
-horaedbproto = { git = "https://github.com/apache/incubator-horaedb-proto.git", rev = "19ece8f771fc0b3e8e734072cc3d8040de6c74cb" }
+# TODO(leslie): modify it when the related pr in incubator-horaedb-proto is merged.
+horaedbproto = { git = "https://github.com/LeslieKid/incubator-horaedb-proto.git", rev = "c8a073af23a7e5d29ec88011dd3c5abeffd37d23" }
 codec = { path = "src/components/codec" }
 chrono = "0.4"
 clap = { version = "4.5.1", features = ["derive"] }
@@ -110,6 +112,7 @@ cluster = { path = "src/cluster" }
 criterion = "0.5"
 horaedb-client = "1.0.2"
 common_types = { path = "src/common_types" }
+compaction_client = { path = "src/compaction_client" }
 datafusion = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "e21b03154" }
 datafusion-proto = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "e21b03154" }
 derive_builder = "0.12"
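With `compaction_client` registered both as a workspace member and as a workspace dependency, other crates in the repo can pull it in through the usual mechanism. A minimal sketch of a consumer manifest (the consuming crate is hypothetical, not part of this diff):

```toml
# Hypothetical consumer crate inside this workspace.
[dependencies]
compaction_client = { workspace = true }
```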
91 changes: 84 additions & 7 deletions src/analytic_engine/src/compaction/mod.rs
@@ -20,15 +20,17 @@
 use std::{collections::HashMap, fmt, str::FromStr, sync::Arc};

 use common_types::COMPACTION_STRATEGY;
+use generic_error::{BoxError, GenericError};
+use macros::define_result;
 use serde::{Deserialize, Serialize};
 use size_ext::ReadableSize;
-use snafu::{ensure, Backtrace, GenerateBacktrace, ResultExt, Snafu};
+use snafu::{ensure, Backtrace, GenerateBacktrace, OptionExt, ResultExt, Snafu};
 use time_ext::TimeUnit;
 use tokio::sync::oneshot;

 use crate::{
     compaction::picker::{CommonCompactionPicker, CompactionPickerRef},
-    sst::file::{FileHandle, Level},
+    sst::file::{FileHandle, FileMeta, FilePurgeQueue, Level},
     table::data::TableDataRef,
 };

@@ -72,8 +74,22 @@ pub enum Error {
     },
     #[snafu(display("Invalid compaction option value, err: {}", error))]
     InvalidOption { error: String, backtrace: Backtrace },
+
+    #[snafu(display("Empty file meta.\nBacktrace:\n{}", backtrace))]
+    EmptyFileMeta { backtrace: Backtrace },
+
+    #[snafu(display("Failed to convert file meta, err:{}", source))]
+    ConvertFileMeta { source: GenericError },
+
+    #[snafu(display("Empty purge queue.\nBacktrace:\n{}", backtrace))]
+    EmptyPurgeQueue { backtrace: Backtrace },
+
+    #[snafu(display("Failed to convert level, err:{}", source))]
+    ConvertLevel { source: GenericError },
 }
+
+define_result!(Error);

 #[derive(Debug, Clone, Copy, Deserialize, Default, PartialEq, Serialize)]
 pub enum CompactionStrategy {
     #[default]
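The new `define_result!(Error);` is what lets the signatures in the hunks below drop their explicit error type. Assuming the macro follows the usual pattern of this codebase's `macros` crate, it expands to roughly:

```rust
pub type Result<T> = std::result::Result<T, Error>;
```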
@@ -145,7 +161,7 @@ impl CompactionStrategy {
     pub(crate) fn parse_from(
         value: &str,
         options: &HashMap<String, String>,
-    ) -> Result<CompactionStrategy, Error> {
+    ) -> Result<CompactionStrategy> {
         match value.trim().to_lowercase().as_str() {
             DEFAULT_STRATEGY => Ok(CompactionStrategy::Default),
             STC_STRATEGY => Ok(CompactionStrategy::SizeTiered(
@@ -182,7 +198,7 @@
 }

 impl SizeTieredCompactionOptions {
-    pub(crate) fn validate(&self) -> Result<(), Error> {
+    pub(crate) fn validate(&self) -> Result<()> {
         ensure!(
             self.bucket_high > self.bucket_low,
             InvalidOption {
@@ -215,7 +231,7 @@

     pub(crate) fn parse_from(
         options: &HashMap<String, String>,
-    ) -> Result<SizeTieredCompactionOptions, Error> {
+    ) -> Result<SizeTieredCompactionOptions> {
         let mut opts = SizeTieredCompactionOptions::default();
         if let Some(v) = options.get(BUCKET_LOW_KEY) {
             opts.bucket_low = v.parse().context(ParseFloat {
@@ -278,7 +294,7 @@
         );
     }

-    pub(crate) fn validate(&self) -> Result<(), Error> {
+    pub(crate) fn validate(&self) -> Result<()> {
         if !Self::valid_timestamp_unit(self.timestamp_resolution) {
             return InvalidOption {
                 error: format!(
@@ -294,7 +310,7 @@

     pub(crate) fn parse_from(
         options: &HashMap<String, String>,
-    ) -> Result<TimeWindowCompactionOptions, Error> {
+    ) -> Result<TimeWindowCompactionOptions> {
         let mut opts = TimeWindowCompactionOptions {
             size_tiered: SizeTieredCompactionOptions::parse_from(options)?,
             ..Default::default()
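These signature changes are mechanical: each helper keeps its behavior and merely switches to the aliased `Result`. For orientation, a hedged sketch of how such a parser is driven (the strategy name string and empty option map are illustrative, not the crate's real constants):

```rust
use std::collections::HashMap;

// Hypothetical call site: parse the default strategy with no extra options.
let options: HashMap<String, String> = HashMap::new();
let strategy = CompactionStrategy::parse_from("default", &options)
    .expect("default strategy should parse");
```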
@@ -326,6 +342,67 @@ pub struct CompactionInputFiles {
     pub output_level: Level,
 }

+impl TryFrom<horaedbproto::compaction_service::CompactionInputFiles> for CompactionInputFiles {
+    type Error = Error;
+
+    fn try_from(value: horaedbproto::compaction_service::CompactionInputFiles) -> Result<Self> {
+        let level: Level = value.level.try_into().box_err().context(ConvertLevel)?;
+        let output_level: Level = value
+            .output_level
+            .try_into()
+            .box_err()
+            .context(ConvertLevel)?;
+
+        let mut files: Vec<FileHandle> = Vec::with_capacity(value.files.len());
+        for file in value.files {
+            let meta: FileMeta = file
+                .meta
+                .context(EmptyFileMeta)?
+                .try_into()
+                .box_err()
+                .context(ConvertFileMeta)?;
+
+            let purge_queue: FilePurgeQueue = file.purge_queue.context(EmptyPurgeQueue)?.into();
+
+            files.push({
+                let handle = FileHandle::new(meta, purge_queue);
+                handle.set_being_compacted(file.being_compacted);
+                handle
+            });
+        }
+
+        Ok(CompactionInputFiles {
+            level,
+            files,
+            output_level,
+        })
+    }
+}
+
+impl From<CompactionInputFiles> for horaedbproto::compaction_service::CompactionInputFiles {
+    fn from(value: CompactionInputFiles) -> Self {
+        let mut files = Vec::with_capacity(value.files.len());
+        for file in value.files {
+            let handle = horaedbproto::compaction_service::FileHandle {
+                meta: Some(file.meta().into()),
+                purge_queue: Some(horaedbproto::compaction_service::FilePurgeQueue {
+                    space_id: file.space_id(),
+                    table_id: file.table_id().into(),
+                }),
+                being_compacted: file.being_compacted(),
+                metrics: Some(horaedbproto::compaction_service::SstMetrics {}),
+            };
+            files.push(handle);
+        }
+
+        Self {
+            level: value.level.as_u32(),
+            files,
+            output_level: value.output_level.as_u32(),
+        }
+    }
+}
+
 #[derive(Debug, Default, Clone)]
 pub struct ExpiredFiles {
     /// Level of the expired files.
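Together, the `TryFrom`/`From` pair above gives a round trip between the wire type and the engine's in-memory `CompactionInputFiles`. A minimal sketch of the intended flow, assuming the proto value arrives via the new compaction RPC:

```rust
use horaedbproto::compaction_service as pb;

// Engine side: describe a compaction job's input files for the remote service.
fn to_wire(input: CompactionInputFiles) -> pb::CompactionInputFiles {
    input.into()
}

// Compaction-node side: rebuild FileHandles, surfacing missing file meta,
// a missing purge queue, or a bad level as the new Error variants.
fn from_wire(input: pb::CompactionInputFiles) -> Result<CompactionInputFiles> {
    input.try_into()
}
```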
1 change: 1 addition & 0 deletions src/analytic_engine/src/compaction/runner/local_runner.rs
@@ -45,6 +45,7 @@ use crate::{
 const MAX_RECORD_BATCHES_IN_FLIGHT_WHEN_COMPACTION_READ: usize = 64;

 /// Executor carrying for actual compaction work
+#[derive(Clone)]
 pub struct LocalCompactionRunner {
     runtime: Arc<Runtime>,
     scan_options: ScanOptions,
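The `Clone` derive is the only change here; it presumably lets the new compaction server hand every incoming task its own handle to a shared runner (the runner's fields are `Arc`s and small option structs, so clones are cheap). A hypothetical sketch of that pattern, with the task type, channel, and `run` method assumed rather than taken from this PR:

```rust
use tokio::sync::mpsc::Receiver;

// Hypothetical server loop: each spawned task gets its own runner handle.
async fn serve(runner: LocalCompactionRunner, mut tasks: Receiver<CompactionTask>) {
    while let Some(task) = tasks.recv().await {
        let runner = runner.clone();
        tokio::spawn(async move {
            // Execute the compaction task on this node.
            let _ = runner.run(task).await;
        });
    }
}
```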