diff --git a/Cargo.lock b/Cargo.lock index c904497a31..0c5df0b9be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1460,6 +1460,7 @@ dependencies = [ "bytes_ext", "catalog", "common_types", + "compaction_client", "etcd-client", "future_ext", "generic_error", @@ -1559,6 +1560,28 @@ dependencies = [ "uuid", ] +[[package]] +name = "compaction_client" +version = "2.0.0" +dependencies = [ + "async-trait", + "common_types", + "futures 0.3.28", + "generic_error", + "horaedbproto 2.0.0", + "logger", + "macros", + "prost 0.11.8", + "reqwest", + "serde", + "serde_json", + "snafu 0.6.10", + "time_ext", + "tokio", + "tonic 0.8.3", + "url", +] + [[package]] name = "concurrent-queue" version = "2.1.0" @@ -3062,6 +3085,8 @@ dependencies = [ "catalog_impls", "clap", "cluster", + "common_types", + "compaction_client", "datafusion", "df_operator", "etcd-client", @@ -3135,7 +3160,7 @@ dependencies = [ [[package]] name = "horaedbproto" version = "2.0.0" -source = "git+https://github.com/apache/incubator-horaedb-proto.git?rev=19ece8f771fc0b3e8e734072cc3d8040de6c74cb#19ece8f771fc0b3e8e734072cc3d8040de6c74cb" +source = "git+https://github.com/LeslieKid/incubator-horaedb-proto.git?rev=8c9fb72d95476caf867cd62993be6664e34c5ce5#8c9fb72d95476caf867cd62993be6664e34c5ce5" dependencies = [ "prost 0.11.8", "protoc-bin-vendored", @@ -6005,6 +6030,7 @@ dependencies = [ "async-trait", "cluster", "common_types", + "compaction_client", "generic_error", "horaedbproto 2.0.0", "logger", diff --git a/Cargo.toml b/Cargo.toml index 4563acf5c2..6f6efb7f34 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ license = "Apache-2.0" [workspace] resolver = "2" # In alphabetical order -members = [ +members = [ "horaectl", "integration_tests", "integration_tests/sdk/rust", @@ -33,7 +33,8 @@ members = [ "src/catalog", "src/catalog_impls", "src/cluster", - "src/common_types", + "src/common_types", + "src/compaction_client", "src/components/alloc_tracker", "src/components/arena", "src/components/arrow_ext", @@ -101,7 +102,8 @@ thiserror = "1" bytes_ext = { path = "src/components/bytes_ext" } catalog = { path = "src/catalog" } catalog_impls = { path = "src/catalog_impls" } -horaedbproto = { git = "https://github.com/apache/incubator-horaedb-proto.git", rev = "19ece8f771fc0b3e8e734072cc3d8040de6c74cb" } +# TODO(leslie): modify it when the related pr in incubator-horaedb-proto is merged. +horaedbproto = { git = "https://github.com/LeslieKid/incubator-horaedb-proto.git", rev = "8c9fb72d95476caf867cd62993be6664e34c5ce5" } codec = { path = "src/components/codec" } chrono = "0.4" clap = { version = "4.5.1", features = ["derive"] } @@ -110,6 +112,7 @@ cluster = { path = "src/cluster" } criterion = "0.5" horaedb-client = "1.0.2" common_types = { path = "src/common_types" } +compaction_client = { path = "src/compaction_client" } datafusion = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "e21b03154" } datafusion-proto = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "e21b03154" } derive_builder = "0.12" diff --git a/horaemeta/Makefile b/horaemeta/Makefile index b2a8e55903..08d679acb8 100644 --- a/horaemeta/Makefile +++ b/horaemeta/Makefile @@ -30,7 +30,7 @@ install-tools: @mkdir -p $(GO_TOOLS_BIN_PATH) @(which golangci-lint && golangci-lint version | grep '1.54') >/dev/null 2>&1 || curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GO_TOOLS_BIN_PATH) v1.54.2 -META_PKG := github.com/apache/incubator-horaedb-meta +META_PKG := github.com/LeslieKid/incubator-horaedb-meta PACKAGES := $(shell go list ./... | tail -n +2) PACKAGE_DIRECTORIES := $(subst $(META_PKG)/,,$(PACKAGES)) diff --git a/horaemeta/go.mod b/horaemeta/go.mod index d04cd9d166..8610b1d8a1 100644 --- a/horaemeta/go.mod +++ b/horaemeta/go.mod @@ -3,7 +3,7 @@ module github.com/apache/incubator-horaedb-meta go 1.21 require ( - github.com/apache/incubator-horaedb-proto/golang v0.0.0-20231228071726-92152841fc8a + github.com/LeslieKid/incubator-horaedb-proto/golang v0.0.0-20240819012818-8c9fb72d9547 github.com/caarlos0/env/v6 v6.10.1 github.com/julienschmidt/httprouter v1.3.0 github.com/looplab/fsm v0.3.0 diff --git a/horaemeta/go.sum b/horaemeta/go.sum index 9195584dc6..7abb741a82 100644 --- a/horaemeta/go.sum +++ b/horaemeta/go.sum @@ -1,9 +1,9 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/LeslieKid/incubator-horaedb-proto/golang v0.0.0-20240819012818-8c9fb72d9547 h1:X/YU9jswF/FTnBANq+2SprBn4pA9Ca93PuA2kZPeV/s= +github.com/LeslieKid/incubator-horaedb-proto/golang v0.0.0-20240819012818-8c9fb72d9547/go.mod h1:lNDTvwHO70v9gtVzaefpPmBdTs63hs57wFT2WdmJI+Q= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= -github.com/apache/incubator-horaedb-proto/golang v0.0.0-20231228071726-92152841fc8a h1:lQVr4wkixN4N5dOKRUBU1lnqkiHTFg6Im+tqJ8Y7XOE= -github.com/apache/incubator-horaedb-proto/golang v0.0.0-20231228071726-92152841fc8a/go.mod h1:Ch92HPIAoGbrgFCtpSgxcYSRgWdpNsIcPG1lfv24Ufs= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= diff --git a/horaemeta/server/cluster/metadata/cluster_metadata.go b/horaemeta/server/cluster/metadata/cluster_metadata.go index d68b4d9b36..5e110adfdc 100644 --- a/horaemeta/server/cluster/metadata/cluster_metadata.go +++ b/horaemeta/server/cluster/metadata/cluster_metadata.go @@ -28,6 +28,7 @@ import ( "sort" "sync" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/clusterpb" "github.com/apache/incubator-horaedb-meta/server/id" "github.com/apache/incubator-horaedb-meta/server/storage" "github.com/pkg/errors" @@ -53,7 +54,13 @@ type ClusterMetadata struct { topologyManager TopologyManager // Manage the registered nodes from heartbeat. - registeredNodesCache map[string]RegisteredNode // nodeName -> NodeName + // TODO(leslie): Rename it to registeredHoraedbNodesCache? + registeredNodesCache map[string]RegisteredNode // nodeName -> NodeName + registeredCompactionNodesCache map[string]RegisteredNode // nodeName -> NodeName + + // Maintain a list to store the keys of compaction nodes. + // TODO(leslie): Unused now, will be used in compaction nodes scheduling strategy. + compactionNodesKeyList []string storage storage.Storage kv clientv3.KV @@ -67,16 +74,18 @@ func NewClusterMetadata(logger *zap.Logger, meta storage.Cluster, storage storag shardIDAlloc := id.NewReusableAllocatorImpl([]uint64{}, MinShardID) cluster := &ClusterMetadata{ - logger: logger, - clusterID: meta.ID, - lock: sync.RWMutex{}, - metaData: meta, - tableManager: NewTableManagerImpl(logger, storage, meta.ID, schemaIDAlloc, tableIDAlloc), - topologyManager: NewTopologyManagerImpl(logger, storage, meta.ID, shardIDAlloc), - registeredNodesCache: map[string]RegisteredNode{}, - storage: storage, - kv: kv, - shardIDAlloc: shardIDAlloc, + logger: logger, + clusterID: meta.ID, + lock: sync.RWMutex{}, + metaData: meta, + tableManager: NewTableManagerImpl(logger, storage, meta.ID, schemaIDAlloc, tableIDAlloc), + topologyManager: NewTopologyManagerImpl(logger, storage, meta.ID, shardIDAlloc), + registeredNodesCache: map[string]RegisteredNode{}, + registeredCompactionNodesCache: map[string]RegisteredNode{}, + compactionNodesKeyList: []string{}, + storage: storage, + kv: kv, + shardIDAlloc: shardIDAlloc, } return cluster @@ -437,6 +446,20 @@ func (c *ClusterMetadata) GetShardNodeByTableIDs(tableIDs []storage.TableID) (Ge func (c *ClusterMetadata) RegisterNode(ctx context.Context, registeredNode RegisteredNode) error { registeredNode.Node.State = storage.NodeStateOnline + // Register compaction node. + if registeredNode.Node.NodeStats.NodeType == clusterpb.NodeType_CompactionServer { + c.lock.Lock() + defer c.lock.Unlock() + + _, exists := c.registeredCompactionNodesCache[registeredNode.Node.Name] + c.registeredCompactionNodesCache[registeredNode.Node.Name] = registeredNode + + if !exists { + c.compactionNodesKeyList = append(c.compactionNodesKeyList, registeredNode.Node.Name) + } + return nil + } + err := c.storage.CreateOrUpdateNode(ctx, storage.CreateOrUpdateNodeRequest{ ClusterID: c.clusterID, Node: registeredNode.Node, @@ -456,10 +479,11 @@ func (c *ClusterMetadata) RegisterNode(ctx context.Context, registeredNode Regis } } - // Update shard node mapping. - // Check whether to update persistence data. + // Register horaedb node. oldCache, exists := c.registeredNodesCache[registeredNode.Node.Name] c.registeredNodesCache[registeredNode.Node.Name] = registeredNode + + // Check whether to update persistent data. enableUpdateWhenStable := c.metaData.TopologyType == storage.TopologyTypeDynamic if !enableUpdateWhenStable && c.topologyManager.GetClusterState() == storage.ClusterStateStable { return nil @@ -470,6 +494,7 @@ func (c *ClusterMetadata) RegisterNode(ctx context.Context, registeredNode Regis return nil } + // Update shard node mapping. shardNodes := make(map[string][]storage.ShardNode, 1) shardNodes[registeredNode.Node.Name] = make([]storage.ShardNode, 0, len(registeredNode.ShardInfos)) for _, shardInfo := range registeredNode.ShardInfos { @@ -498,6 +523,17 @@ func (c *ClusterMetadata) GetRegisteredNodes() []RegisteredNode { return nodes } +func (c *ClusterMetadata) GetRegisteredCompactionNodes() []RegisteredNode { + c.lock.RLock() + defer c.lock.RUnlock() + + nodes := make([]RegisteredNode, 0, len(c.registeredCompactionNodesCache)) + for _, node := range c.registeredCompactionNodesCache { + nodes = append(nodes, node) + } + return nodes +} + func (c *ClusterMetadata) GetRegisteredNodeByName(nodeName string) (RegisteredNode, bool) { c.lock.RLock() defer c.lock.RUnlock() diff --git a/horaemeta/server/cluster/metadata/types.go b/horaemeta/server/cluster/metadata/types.go index f45117b268..770f9c7a41 100644 --- a/horaemeta/server/cluster/metadata/types.go +++ b/horaemeta/server/cluster/metadata/types.go @@ -23,7 +23,7 @@ import ( "time" "github.com/apache/incubator-horaedb-meta/server/storage" - "github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb" "github.com/pkg/errors" ) diff --git a/horaemeta/server/coordinator/eventdispatch/dispatch_impl.go b/horaemeta/server/coordinator/eventdispatch/dispatch_impl.go index c9dc07f9a2..95ef12f2b7 100644 --- a/horaemeta/server/coordinator/eventdispatch/dispatch_impl.go +++ b/horaemeta/server/coordinator/eventdispatch/dispatch_impl.go @@ -26,7 +26,7 @@ import ( "github.com/apache/incubator-horaedb-meta/pkg/coderr" "github.com/apache/incubator-horaedb-meta/server/cluster/metadata" "github.com/apache/incubator-horaedb-meta/server/service" - "github.com/apache/incubator-horaedb-proto/golang/pkg/metaeventpb" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaeventpb" "github.com/pkg/errors" "google.golang.org/grpc" ) diff --git a/horaemeta/server/coordinator/factory.go b/horaemeta/server/coordinator/factory.go index 9e12c7f109..32697ae2b3 100644 --- a/horaemeta/server/coordinator/factory.go +++ b/horaemeta/server/coordinator/factory.go @@ -33,7 +33,7 @@ import ( "github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/operation/transferleader" "github.com/apache/incubator-horaedb-meta/server/id" "github.com/apache/incubator-horaedb-meta/server/storage" - "github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb" "github.com/pkg/errors" "go.uber.org/zap" ) diff --git a/horaemeta/server/coordinator/factory_test.go b/horaemeta/server/coordinator/factory_test.go index 04c83c67a5..d055401acf 100644 --- a/horaemeta/server/coordinator/factory_test.go +++ b/horaemeta/server/coordinator/factory_test.go @@ -27,7 +27,7 @@ import ( "github.com/apache/incubator-horaedb-meta/server/coordinator" "github.com/apache/incubator-horaedb-meta/server/coordinator/procedure" "github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/test" - "github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb" "github.com/stretchr/testify/require" "go.uber.org/zap" ) diff --git a/horaemeta/server/coordinator/procedure/ddl/common_util.go b/horaemeta/server/coordinator/procedure/ddl/common_util.go index aa55e7570a..4e418f7ada 100644 --- a/horaemeta/server/coordinator/procedure/ddl/common_util.go +++ b/horaemeta/server/coordinator/procedure/ddl/common_util.go @@ -27,7 +27,7 @@ import ( "github.com/apache/incubator-horaedb-meta/server/coordinator/eventdispatch" "github.com/apache/incubator-horaedb-meta/server/coordinator/procedure" "github.com/apache/incubator-horaedb-meta/server/storage" - "github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb" "github.com/pkg/errors" "go.uber.org/zap" ) diff --git a/horaemeta/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table.go b/horaemeta/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table.go index cb7ac55fce..4b995451a8 100644 --- a/horaemeta/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table.go +++ b/horaemeta/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table.go @@ -32,7 +32,7 @@ import ( "github.com/apache/incubator-horaedb-meta/server/coordinator/procedure" "github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/ddl" "github.com/apache/incubator-horaedb-meta/server/storage" - "github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb" "github.com/looplab/fsm" "github.com/pkg/errors" "go.uber.org/zap" diff --git a/horaemeta/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table_test.go b/horaemeta/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table_test.go index 2c9f7922bb..77804ba0c7 100644 --- a/horaemeta/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table_test.go +++ b/horaemeta/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table_test.go @@ -28,7 +28,7 @@ import ( "github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/ddl/createpartitiontable" "github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/test" "github.com/apache/incubator-horaedb-meta/server/storage" - "github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb" "github.com/stretchr/testify/require" ) diff --git a/horaemeta/server/coordinator/procedure/ddl/createtable/create_table.go b/horaemeta/server/coordinator/procedure/ddl/createtable/create_table.go index 292f0900ae..9099ba0b7c 100644 --- a/horaemeta/server/coordinator/procedure/ddl/createtable/create_table.go +++ b/horaemeta/server/coordinator/procedure/ddl/createtable/create_table.go @@ -30,7 +30,7 @@ import ( "github.com/apache/incubator-horaedb-meta/server/coordinator/procedure" "github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/ddl" "github.com/apache/incubator-horaedb-meta/server/storage" - "github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb" "github.com/looplab/fsm" "github.com/pkg/errors" "go.uber.org/zap" diff --git a/horaemeta/server/coordinator/procedure/ddl/createtable/create_table_test.go b/horaemeta/server/coordinator/procedure/ddl/createtable/create_table_test.go index 1dd08932b9..949381153d 100644 --- a/horaemeta/server/coordinator/procedure/ddl/createtable/create_table_test.go +++ b/horaemeta/server/coordinator/procedure/ddl/createtable/create_table_test.go @@ -27,7 +27,7 @@ import ( "github.com/apache/incubator-horaedb-meta/server/cluster/metadata" "github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/ddl/createtable" "github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/test" - "github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb" "github.com/stretchr/testify/require" ) diff --git a/horaemeta/server/coordinator/procedure/ddl/droppartitiontable/create_drop_partition_table_test.go b/horaemeta/server/coordinator/procedure/ddl/droppartitiontable/create_drop_partition_table_test.go index 6ec07bff4e..139e56b0ff 100644 --- a/horaemeta/server/coordinator/procedure/ddl/droppartitiontable/create_drop_partition_table_test.go +++ b/horaemeta/server/coordinator/procedure/ddl/droppartitiontable/create_drop_partition_table_test.go @@ -33,8 +33,8 @@ import ( "github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/ddl/droppartitiontable" "github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/test" "github.com/apache/incubator-horaedb-meta/server/storage" - "github.com/apache/incubator-horaedb-proto/golang/pkg/clusterpb" - "github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/clusterpb" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb" "github.com/stretchr/testify/require" ) diff --git a/horaemeta/server/coordinator/procedure/ddl/droppartitiontable/drop_partition_table.go b/horaemeta/server/coordinator/procedure/ddl/droppartitiontable/drop_partition_table.go index f47ba773ec..162070df9b 100644 --- a/horaemeta/server/coordinator/procedure/ddl/droppartitiontable/drop_partition_table.go +++ b/horaemeta/server/coordinator/procedure/ddl/droppartitiontable/drop_partition_table.go @@ -31,7 +31,7 @@ import ( "github.com/apache/incubator-horaedb-meta/server/coordinator/procedure" "github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/ddl" "github.com/apache/incubator-horaedb-meta/server/storage" - "github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb" "github.com/looplab/fsm" "github.com/pkg/errors" "go.uber.org/zap" diff --git a/horaemeta/server/coordinator/procedure/ddl/droptable/create_drop_table_test.go b/horaemeta/server/coordinator/procedure/ddl/droptable/create_drop_table_test.go index 06f3a2f15f..4dd363a544 100644 --- a/horaemeta/server/coordinator/procedure/ddl/droptable/create_drop_table_test.go +++ b/horaemeta/server/coordinator/procedure/ddl/droptable/create_drop_table_test.go @@ -31,7 +31,7 @@ import ( "github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/ddl/droptable" "github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/test" "github.com/apache/incubator-horaedb-meta/server/storage" - "github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb" "github.com/stretchr/testify/require" ) diff --git a/horaemeta/server/coordinator/procedure/ddl/droptable/drop_table.go b/horaemeta/server/coordinator/procedure/ddl/droptable/drop_table.go index 6f704b7f7c..1f5611b9f6 100644 --- a/horaemeta/server/coordinator/procedure/ddl/droptable/drop_table.go +++ b/horaemeta/server/coordinator/procedure/ddl/droptable/drop_table.go @@ -30,7 +30,7 @@ import ( "github.com/apache/incubator-horaedb-meta/server/coordinator/procedure" "github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/ddl" "github.com/apache/incubator-horaedb-meta/server/storage" - "github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb" "github.com/looplab/fsm" "github.com/pkg/errors" "go.uber.org/zap" diff --git a/horaemeta/server/coordinator/watch/watch.go b/horaemeta/server/coordinator/watch/watch.go index 18b1a0eacd..ab2f9afa7a 100644 --- a/horaemeta/server/coordinator/watch/watch.go +++ b/horaemeta/server/coordinator/watch/watch.go @@ -27,7 +27,7 @@ import ( "sync" "github.com/apache/incubator-horaedb-meta/server/storage" - "github.com/apache/incubator-horaedb-proto/golang/pkg/metaeventpb" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaeventpb" "github.com/pkg/errors" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" diff --git a/horaemeta/server/coordinator/watch/watch_test.go b/horaemeta/server/coordinator/watch/watch_test.go index 41e6628d56..ff0fca60b2 100644 --- a/horaemeta/server/coordinator/watch/watch_test.go +++ b/horaemeta/server/coordinator/watch/watch_test.go @@ -26,7 +26,7 @@ import ( "github.com/apache/incubator-horaedb-meta/server/etcdutil" "github.com/apache/incubator-horaedb-meta/server/storage" - "github.com/apache/incubator-horaedb-proto/golang/pkg/metaeventpb" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaeventpb" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" diff --git a/horaemeta/server/member/member.go b/horaemeta/server/member/member.go index 9239498144..08404cdd5a 100644 --- a/horaemeta/server/member/member.go +++ b/horaemeta/server/member/member.go @@ -27,7 +27,7 @@ import ( "github.com/apache/incubator-horaedb-meta/pkg/log" "github.com/apache/incubator-horaedb-meta/server/etcdutil" - "github.com/apache/incubator-horaedb-proto/golang/pkg/metastoragepb" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metastoragepb" "github.com/pkg/errors" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" diff --git a/horaemeta/server/member/watch_leader.go b/horaemeta/server/member/watch_leader.go index 8ca6ee0ca7..68408ce0aa 100644 --- a/horaemeta/server/member/watch_leader.go +++ b/horaemeta/server/member/watch_leader.go @@ -26,7 +26,7 @@ import ( "github.com/apache/incubator-horaedb-meta/pkg/assert" "github.com/apache/incubator-horaedb-meta/pkg/log" "github.com/apache/incubator-horaedb-meta/server/etcdutil" - "github.com/apache/incubator-horaedb-proto/golang/pkg/metastoragepb" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metastoragepb" "go.uber.org/zap" ) diff --git a/horaemeta/server/server.go b/horaemeta/server/server.go index 4ed6e39542..70006461ac 100644 --- a/horaemeta/server/server.go +++ b/horaemeta/server/server.go @@ -40,7 +40,7 @@ import ( "github.com/apache/incubator-horaedb-meta/server/service/http" "github.com/apache/incubator-horaedb-meta/server/status" "github.com/apache/incubator-horaedb-meta/server/storage" - "github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb" "github.com/pkg/errors" "go.etcd.io/etcd/client/pkg/v3/transport" clientv3 "go.etcd.io/etcd/client/v3" diff --git a/horaemeta/server/service/grpc/forward.go b/horaemeta/server/service/grpc/forward.go index b4507f362d..2c2e433d67 100644 --- a/horaemeta/server/service/grpc/forward.go +++ b/horaemeta/server/service/grpc/forward.go @@ -24,7 +24,7 @@ import ( "github.com/apache/incubator-horaedb-meta/pkg/log" "github.com/apache/incubator-horaedb-meta/server/service" - "github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb" "github.com/pkg/errors" "go.uber.org/zap" "google.golang.org/grpc" diff --git a/horaemeta/server/service/grpc/service.go b/horaemeta/server/service/grpc/service.go index d4693de4b6..d5e2e58d33 100644 --- a/horaemeta/server/service/grpc/service.go +++ b/horaemeta/server/service/grpc/service.go @@ -26,6 +26,9 @@ import ( "sync" "time" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/clusterpb" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/commonpb" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb" "github.com/apache/incubator-horaedb-meta/pkg/coderr" "github.com/apache/incubator-horaedb-meta/pkg/log" "github.com/apache/incubator-horaedb-meta/server/cluster" @@ -34,9 +37,6 @@ import ( "github.com/apache/incubator-horaedb-meta/server/limiter" "github.com/apache/incubator-horaedb-meta/server/member" "github.com/apache/incubator-horaedb-meta/server/storage" - "github.com/apache/incubator-horaedb-proto/golang/pkg/clusterpb" - "github.com/apache/incubator-horaedb-proto/golang/pkg/commonpb" - "github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb" "github.com/pkg/errors" "go.uber.org/zap" ) @@ -92,6 +92,7 @@ func (s *Service) NodeHeartbeat(ctx context.Context, req *metaservicepb.NodeHear Lease: req.GetInfo().Lease, Zone: req.GetInfo().Zone, NodeVersion: req.GetInfo().BinaryVersion, + NodeType: req.GetInfo().NodeType, }, LastTouchTime: uint64(time.Now().UnixMilli()), State: storage.NodeStateOnline, diff --git a/horaemeta/server/storage/storage_impl.go b/horaemeta/server/storage/storage_impl.go index e6b130e88c..de01fc2b7a 100644 --- a/horaemeta/server/storage/storage_impl.go +++ b/horaemeta/server/storage/storage_impl.go @@ -25,9 +25,9 @@ import ( "strconv" "strings" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/clusterpb" "github.com/apache/incubator-horaedb-meta/pkg/log" "github.com/apache/incubator-horaedb-meta/server/etcdutil" - "github.com/apache/incubator-horaedb-proto/golang/pkg/clusterpb" "github.com/pkg/errors" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/clientv3util" @@ -137,7 +137,7 @@ func (s *metaStorageImpl) UpdateCluster(ctx context.Context, req UpdateClusterRe c := convertClusterToPB(req.Cluster) value, err := proto.Marshal(&c) if err != nil { - return ErrEncode.WithCausef("encode cluster,clusterID:%d, err:%v", req.Cluster.ID, err) + return ErrEncode.WithCausef("encode cluster, clusterID:%d, err:%v", req.Cluster.ID, err) } key := makeClusterKey(s.rootPath, c.Id) diff --git a/horaemeta/server/storage/types.go b/horaemeta/server/storage/types.go index 5711283a7c..bc54538f85 100644 --- a/horaemeta/server/storage/types.go +++ b/horaemeta/server/storage/types.go @@ -23,8 +23,8 @@ import ( "fmt" "time" - "github.com/apache/incubator-horaedb-proto/golang/pkg/clusterpb" - "github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/clusterpb" + "github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb" ) type ( @@ -279,6 +279,7 @@ type NodeStats struct { Lease uint32 Zone string NodeVersion string + NodeType clusterpb.NodeType } func NewEmptyNodeStats() NodeStats { @@ -549,6 +550,7 @@ func convertNodeStatsToPB(stats NodeStats) clusterpb.NodeStats { Lease: stats.Lease, Zone: stats.Zone, NodeVersion: stats.NodeVersion, + NodeType: stats.NodeType, } } @@ -557,6 +559,7 @@ func convertNodeStatsPB(stats *clusterpb.NodeStats) NodeStats { Lease: stats.Lease, Zone: stats.Zone, NodeVersion: stats.NodeVersion, + NodeType: stats.NodeType, } } diff --git a/src/analytic_engine/src/compaction/mod.rs b/src/analytic_engine/src/compaction/mod.rs index 34048d6b35..dce46d3e9c 100644 --- a/src/analytic_engine/src/compaction/mod.rs +++ b/src/analytic_engine/src/compaction/mod.rs @@ -20,15 +20,17 @@ use std::{collections::HashMap, fmt, str::FromStr, sync::Arc}; use common_types::COMPACTION_STRATEGY; +use generic_error::{BoxError, GenericError}; +use macros::define_result; use serde::{Deserialize, Serialize}; use size_ext::ReadableSize; -use snafu::{ensure, Backtrace, GenerateBacktrace, ResultExt, Snafu}; +use snafu::{ensure, Backtrace, GenerateBacktrace, OptionExt, ResultExt, Snafu}; use time_ext::TimeUnit; use tokio::sync::oneshot; use crate::{ compaction::picker::{CommonCompactionPicker, CompactionPickerRef}, - sst::file::{FileHandle, Level}, + sst::file::{FileHandle, FileMeta, FilePurgeQueue, Level}, table::data::TableDataRef, }; @@ -72,8 +74,22 @@ pub enum Error { }, #[snafu(display("Invalid compaction option value, err: {}", error))] InvalidOption { error: String, backtrace: Backtrace }, + + #[snafu(display("Empty file meta.\nBacktrace:\n{}", backtrace))] + EmptyFileMeta { backtrace: Backtrace }, + + #[snafu(display("Failed to convert file meta, err:{}", source))] + ConvertFileMeta { source: GenericError }, + + #[snafu(display("Empty purge queue.\nBacktrace:\n{}", backtrace))] + EmptyPurgeQueue { backtrace: Backtrace }, + + #[snafu(display("Failed to convert level, err:{}", source))] + ConvertLevel { source: GenericError }, } +define_result!(Error); + #[derive(Debug, Clone, Copy, Deserialize, Default, PartialEq, Serialize)] pub enum CompactionStrategy { #[default] @@ -145,7 +161,7 @@ impl CompactionStrategy { pub(crate) fn parse_from( value: &str, options: &HashMap, - ) -> Result { + ) -> Result { match value.trim().to_lowercase().as_str() { DEFAULT_STRATEGY => Ok(CompactionStrategy::Default), STC_STRATEGY => Ok(CompactionStrategy::SizeTiered( @@ -182,7 +198,7 @@ impl CompactionStrategy { } impl SizeTieredCompactionOptions { - pub(crate) fn validate(&self) -> Result<(), Error> { + pub(crate) fn validate(&self) -> Result<()> { ensure!( self.bucket_high > self.bucket_low, InvalidOption { @@ -215,7 +231,7 @@ impl SizeTieredCompactionOptions { pub(crate) fn parse_from( options: &HashMap, - ) -> Result { + ) -> Result { let mut opts = SizeTieredCompactionOptions::default(); if let Some(v) = options.get(BUCKET_LOW_KEY) { opts.bucket_low = v.parse().context(ParseFloat { @@ -278,7 +294,7 @@ impl TimeWindowCompactionOptions { ); } - pub(crate) fn validate(&self) -> Result<(), Error> { + pub(crate) fn validate(&self) -> Result<()> { if !Self::valid_timestamp_unit(self.timestamp_resolution) { return InvalidOption { error: format!( @@ -294,7 +310,7 @@ impl TimeWindowCompactionOptions { pub(crate) fn parse_from( options: &HashMap, - ) -> Result { + ) -> Result { let mut opts = TimeWindowCompactionOptions { size_tiered: SizeTieredCompactionOptions::parse_from(options)?, ..Default::default() @@ -326,6 +342,43 @@ pub struct CompactionInputFiles { pub output_level: Level, } +impl TryFrom for CompactionInputFiles { + type Error = Error; + + fn try_from(value: horaedbproto::compaction_service::CompactionInputFiles) -> Result { + let level: Level = value.level.try_into().box_err().context(ConvertLevel)?; + let output_level: Level = value + .output_level + .try_into() + .box_err() + .context(ConvertLevel)?; + + let mut files: Vec = Vec::with_capacity(value.files.len()); + for file in value.files { + let meta: FileMeta = file + .meta + .context(EmptyFileMeta)? + .try_into() + .box_err() + .context(ConvertFileMeta)?; + + let purge_queue: FilePurgeQueue = file.purge_queue.context(EmptyPurgeQueue)?.into(); + + files.push({ + let handle = FileHandle::new(meta, purge_queue); + handle.set_being_compacted(file.being_compacted); + handle + }); + } + + Ok(CompactionInputFiles { + level, + files, + output_level, + }) + } +} + #[derive(Debug, Default, Clone)] pub struct ExpiredFiles { /// Level of the expired files. diff --git a/src/analytic_engine/src/compaction/runner/local_runner.rs b/src/analytic_engine/src/compaction/runner/local_runner.rs index fc34b2bfa6..e379d78544 100644 --- a/src/analytic_engine/src/compaction/runner/local_runner.rs +++ b/src/analytic_engine/src/compaction/runner/local_runner.rs @@ -45,6 +45,7 @@ use crate::{ const MAX_RECORD_BATCHES_IN_FLIGHT_WHEN_COMPACTION_READ: usize = 64; /// Executor carrying for actual compaction work +#[derive(Clone)] pub struct LocalCompactionRunner { runtime: Arc, scan_options: ScanOptions, diff --git a/src/analytic_engine/src/compaction/runner/mod.rs b/src/analytic_engine/src/compaction/runner/mod.rs index 12f333eac3..39ef356f36 100644 --- a/src/analytic_engine/src/compaction/runner/mod.rs +++ b/src/analytic_engine/src/compaction/runner/mod.rs @@ -21,12 +21,15 @@ use std::sync::Arc; use async_trait::async_trait; use common_types::{request_id::RequestId, schema::Schema, SequenceNumber}; +use generic_error::{BoxError, GenericError}; +use macros::define_result; use object_store::Path; +use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::table::TableId; use crate::{ compaction::CompactionInputFiles, - instance::flush_compaction::Result, + instance::flush_compaction, row_iter::IterOptions, space::SpaceId, sst::{ @@ -39,12 +42,50 @@ use crate::{ /// Compaction runner #[async_trait] pub trait CompactionRunner: Send + Sync + 'static { - async fn run(&self, task: CompactionRunnerTask) -> Result; + async fn run( + &self, + task: CompactionRunnerTask, + ) -> flush_compaction::Result; } pub type CompactionRunnerPtr = Box; pub type CompactionRunnerRef = Arc; +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Empty table schema.\nBacktrace:\n{}", backtrace))] + EmptyTableSchema { backtrace: Backtrace }, + + #[snafu(display("Empty input context.\nBacktrace:\n{}", backtrace))] + EmptyInputContext { backtrace: Backtrace }, + + #[snafu(display("Empty ouput context.\nBacktrace:\n{}", backtrace))] + EmptyOuputContext { backtrace: Backtrace }, + + #[snafu(display("Empty compaction input files.\nBacktrace:\n{}", backtrace))] + EmptyCompactionInputFiles { backtrace: Backtrace }, + + #[snafu(display("Empty write options.\nBacktrace:\n{}", backtrace))] + EmptySstWriteOptions { backtrace: Backtrace }, + + #[snafu(display("Failed to convert table schema, err:{}", source))] + ConvertTableSchema { source: GenericError }, + + #[snafu(display("Failed to convert input context, err:{}", source))] + ConvertInputContext { source: GenericError }, + + #[snafu(display("Failed to convert ouput context, err:{}", source))] + ConvertOuputContext { source: GenericError }, + + #[snafu(display("Failed to convert compaction input files, err:{}", source))] + ConvertCompactionInputFiles { source: GenericError }, + + #[snafu(display("Failed to convert write options, err:{}", source))] + ConvertSstWriteOptions { source: GenericError }, +} + +define_result!(Error); + /// Compaction runner task #[derive(Debug, Clone)] pub struct CompactionRunnerTask { @@ -113,6 +154,55 @@ impl CompactionRunnerTask { } } +impl TryFrom + for CompactionRunnerTask +{ + type Error = Error; + + fn try_from( + request: horaedbproto::compaction_service::ExecuteCompactionTaskRequest, + ) -> Result { + let task_key = request.task_key; + let request_id: RequestId = request.request_id.into(); + + let schema: Schema = request + .schema + .context(EmptyTableSchema)? + .try_into() + .box_err() + .context(ConvertTableSchema)?; + + let space_id: SpaceId = request.space_id; + let table_id: TableId = request.table_id.into(); + let sequence: SequenceNumber = request.sequence; + + let input_ctx: InputContext = request + .input_ctx + .context(EmptyInputContext)? + .try_into() + .box_err() + .context(ConvertInputContext)?; + + let output_ctx: OutputContext = request + .output_ctx + .context(EmptyOuputContext)? + .try_into() + .box_err() + .context(ConvertOuputContext)?; + + Ok(Self { + task_key, + request_id, + schema, + space_id, + table_id, + sequence, + input_ctx, + output_ctx, + }) + } +} + pub struct CompactionRunnerResult { pub output_file_path: Path, pub sst_info: SstInfo, @@ -128,6 +218,32 @@ pub struct InputContext { pub need_dedup: bool, } +impl TryFrom for InputContext { + type Error = Error; + + fn try_from(value: horaedbproto::compaction_service::InputContext) -> Result { + let num_rows_per_row_group: usize = value.num_rows_per_row_group as usize; + let merge_iter_options = IterOptions { + batch_size: value.merge_iter_options as usize, + }; + let need_dedup = value.need_dedup; + + let files: CompactionInputFiles = value + .files + .context(EmptyCompactionInputFiles)? + .try_into() + .box_err() + .context(ConvertCompactionInputFiles)?; + + Ok(InputContext { + files, + num_rows_per_row_group, + merge_iter_options, + need_dedup, + }) + } +} + #[derive(Debug, Clone)] pub struct OutputContext { /// Output sst file path @@ -135,3 +251,22 @@ pub struct OutputContext { /// Output sst write context pub write_options: SstWriteOptions, } + +impl TryFrom for OutputContext { + type Error = Error; + + fn try_from(value: horaedbproto::compaction_service::OutputContext) -> Result { + let file_path: Path = value.file_path.into(); + let write_options: SstWriteOptions = value + .write_options + .context(EmptySstWriteOptions)? + .try_into() + .box_err() + .context(ConvertSstWriteOptions)?; + + Ok(OutputContext { + file_path, + write_options, + }) + } +} diff --git a/src/analytic_engine/src/lib.rs b/src/analytic_engine/src/lib.rs index c1308d88a8..f2cc51ad15 100644 --- a/src/analytic_engine/src/lib.rs +++ b/src/analytic_engine/src/lib.rs @@ -19,7 +19,7 @@ #![feature(option_get_or_insert_default)] -mod compaction; +pub mod compaction; mod context; mod engine; pub mod error; diff --git a/src/analytic_engine/src/memtable/mod.rs b/src/analytic_engine/src/memtable/mod.rs index f53bff149f..b21c54da27 100644 --- a/src/analytic_engine/src/memtable/mod.rs +++ b/src/analytic_engine/src/memtable/mod.rs @@ -119,6 +119,7 @@ impl From for manifest::LayeredMemtableOptions { fn from(value: LayeredMemtableOptions) -> Self { Self { mutable_segment_switch_threshold: value.mutable_segment_switch_threshold.0, + disable: value.mutable_segment_switch_threshold.0 == 0, } } } diff --git a/src/analytic_engine/src/sst/factory.rs b/src/analytic_engine/src/sst/factory.rs index 2ddeb24668..51703d771e 100644 --- a/src/analytic_engine/src/sst/factory.rs +++ b/src/analytic_engine/src/sst/factory.rs @@ -21,10 +21,11 @@ use std::{collections::HashMap, fmt::Debug, sync::Arc}; use async_trait::async_trait; use common_types::projected_schema::RowProjectorBuilder; +use generic_error::{BoxError, GenericError}; use macros::define_result; use object_store::{ObjectStoreRef, Path}; use runtime::Runtime; -use snafu::{ResultExt, Snafu}; +use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::predicate::PredicateRef; use trace_metric::MetricsCollector; @@ -50,6 +51,15 @@ use crate::{ pub enum Error { #[snafu(display("Failed to parse sst header, err:{}", source,))] ParseHeader { source: header::Error }, + + #[snafu(display("Empty storage format hint.\nBacktrace:\n{}", backtrace))] + EmptyStorageFormatHint { backtrace: Backtrace }, + + #[snafu(display("Failed to convert storage format hint, err:{}", source))] + ConvertStorageFormatHint { source: GenericError }, + + #[snafu(display("Failed to convert compression, err:{}", source))] + ConvertCompression { source: GenericError }, } define_result!(Error); @@ -164,6 +174,41 @@ pub struct SstWriteOptions { pub column_stats: HashMap, } +impl TryFrom for SstWriteOptions { + type Error = Error; + + fn try_from(value: horaedbproto::compaction_service::SstWriteOptions) -> Result { + let storage_format_hint: StorageFormatHint = value + .storage_format_hint + .context(EmptyStorageFormatHint)? + .try_into() + .box_err() + .context(ConvertStorageFormatHint)?; + + let num_rows_per_row_group = value.num_rows_per_row_group as usize; + let compression: Compression = value + .compression + .try_into() + .box_err() + .context(ConvertCompression)?; + let max_buffer_size = value.max_buffer_size as usize; + + let column_stats: HashMap = value + .column_stats + .into_iter() + .map(|(k, v)| (k, ColumnStats { low_cardinality: v })) + .collect(); + + Ok(SstWriteOptions { + storage_format_hint, + num_rows_per_row_group, + compression, + max_buffer_size, + column_stats, + }) + } +} + impl From<&ColumnStats> for ColumnEncoding { fn from(value: &ColumnStats) -> Self { ColumnEncoding { diff --git a/src/analytic_engine/src/sst/file.rs b/src/analytic_engine/src/sst/file.rs index 39cdc7c7d1..33c4bcb96b 100644 --- a/src/analytic_engine/src/sst/file.rs +++ b/src/analytic_engine/src/sst/file.rs @@ -35,12 +35,13 @@ use common_types::{ SequenceNumber, }; use future_ext::{retry_async, BackoffConfig, RetryConfig}; +use generic_error::{BoxError, GenericError}; use logger::{error, info, trace, warn}; use macros::define_result; use metric_ext::Meter; use object_store::{ObjectStoreRef, Path}; use runtime::{JoinHandle, Runtime}; -use snafu::{ResultExt, Snafu}; +use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::table::TableId; use tokio::sync::{ mpsc::{self, UnboundedReceiver, UnboundedSender}, @@ -54,6 +55,18 @@ use crate::{space::SpaceId, sst::manager::FileId, table::sst_util, table_options pub enum Error { #[snafu(display("Failed to join purger, err:{}", source))] StopPurger { source: runtime::Error }, + + #[snafu(display("Empty time range.\nBacktrace:\n{}", backtrace))] + EmptyTimeRange { backtrace: Backtrace }, + + #[snafu(display("Failed to convert time range, err:{}", source))] + ConvertTimeRange { source: GenericError }, + + #[snafu(display("Failed to convert storage format, err:{}", source))] + ConvertStorageFormat { source: GenericError }, + + #[snafu(display("Converted overflow, err:{}", source))] + ConvertOverflow { source: GenericError }, } define_result!(Error); @@ -95,6 +108,15 @@ impl From for Level { } } +impl TryFrom for Level { + type Error = Error; + + fn try_from(value: u32) -> Result { + let value: u16 = value.try_into().box_err().context(ConvertOverflow)?; + Ok(value.into()) + } +} + impl fmt::Display for Level { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", self.0) @@ -460,6 +482,39 @@ impl FileMeta { } } +impl TryFrom for FileMeta { + type Error = Error; + + fn try_from(value: horaedbproto::compaction_service::FileMeta) -> Result { + let time_range: TimeRange = value + .time_range + .context(EmptyTimeRange)? + .try_into() + .box_err() + .context(ConvertTimeRange)?; + + let storage_format: StorageFormat = value + .storage_format + .try_into() + .box_err() + .context(ConvertStorageFormat)?; + let mut associated_files: Vec = Vec::with_capacity(value.associated_files.len()); + for file in value.associated_files { + associated_files.push(file); + } + + Ok(FileMeta { + id: value.file_id, + size: value.size, + row_num: value.row_num, + time_range, + max_seq: value.max_seq, + storage_format, + associated_files, + }) + } +} + // Queue to store files to be deleted for a table. #[derive(Clone)] pub struct FilePurgeQueue { @@ -510,6 +565,13 @@ impl FilePurgeQueue { } } +impl From for FilePurgeQueue { + fn from(value: horaedbproto::compaction_service::FilePurgeQueue) -> Self { + let (tx, _rx) = mpsc::unbounded_channel(); + FilePurgeQueue::new(value.space_id, value.table_id.into(), tx) + } +} + struct FilePurgeQueueInner { space_id: SpaceId, table_id: TableId, diff --git a/src/analytic_engine/src/sst/writer.rs b/src/analytic_engine/src/sst/writer.rs index e424e8af48..280d279f5b 100644 --- a/src/analytic_engine/src/sst/writer.rs +++ b/src/analytic_engine/src/sst/writer.rs @@ -117,6 +117,18 @@ pub struct SstInfo { pub time_range: TimeRange, } +impl From for horaedbproto::compaction_service::SstInfo { + fn from(value: SstInfo) -> Self { + Self { + file_size: value.file_size as u64, + row_num: value.row_num as u64, + storage_format: value.storage_format.into(), + meta_path: value.meta_path, + time_range: Some(value.time_range.into()), + } + } +} + #[derive(Debug, Clone)] pub struct MetaData { /// Min key of the sst. @@ -131,6 +143,18 @@ pub struct MetaData { pub schema: Schema, } +impl From for horaedbproto::compaction_service::MetaData { + fn from(meta: MetaData) -> Self { + Self { + min_key: meta.min_key.to_vec(), + max_key: meta.max_key.to_vec(), + max_sequence: meta.max_sequence, + time_range: Some(meta.time_range.into()), + schema: Some((&meta.schema).into()), + } + } +} + /// The writer for sst. /// /// The caller provides a stream of [RecordBatch] and the writer takes diff --git a/src/analytic_engine/src/table_options.rs b/src/analytic_engine/src/table_options.rs index c5651618e2..dec42074b3 100644 --- a/src/analytic_engine/src/table_options.rs +++ b/src/analytic_engine/src/table_options.rs @@ -130,6 +130,13 @@ pub enum Error { ))] UnknownStorageFormatHint { value: String, backtrace: Backtrace }, + #[snafu(display( + "Unknown compression type. value:{:?}.\nBacktrace:\n{}", + value, + backtrace + ))] + UnknownCompressionType { value: i32, backtrace: Backtrace }, + #[snafu(display("Storage format hint is missing.\nBacktrace:\n{}", backtrace))] MissingStorageFormatHint { backtrace: Backtrace }, @@ -237,6 +244,22 @@ impl From for Compression { } } +impl TryFrom for Compression { + type Error = Error; + + fn try_from(compression: i32) -> Result { + let compression = match compression { + 0 => Compression::Uncompressed, + 1 => Compression::Lz4, + 2 => Compression::Snappy, + 4 => Compression::Zstd, + _ => return UnknownCompressionType { value: compression }.fail(), + }; + + Ok(compression) + } +} + impl From for ParquetCompression { fn from(compression: Compression) -> Self { match compression { @@ -343,6 +366,14 @@ impl From for manifest_pb::StorageFormat { } } +impl From for i32 { + fn from(value: StorageFormat) -> Self { + match value { + StorageFormat::Columnar => 0, + } + } +} + impl TryFrom for StorageFormat { type Error = Error; @@ -366,6 +397,18 @@ impl TryFrom<&str> for StorageFormat { } } +impl TryFrom for StorageFormat { + type Error = Error; + + fn try_from(value: i32) -> Result { + let format = match value { + 0 => Self::Columnar, + _ => return UnknownStorageFormatType { value }.fail(), + }; + Ok(format) + } +} + impl ToString for StorageFormat { fn to_string(&self) -> String { match self { diff --git a/src/cluster/Cargo.toml b/src/cluster/Cargo.toml index e48fd847c1..bd7025ea3e 100644 --- a/src/cluster/Cargo.toml +++ b/src/cluster/Cargo.toml @@ -35,6 +35,7 @@ async-trait = { workspace = true } bytes_ext = { workspace = true } catalog = { workspace = true } common_types = { workspace = true } +compaction_client = { workspace = true } etcd-client = { workspace = true } future_ext = { workspace = true } generic_error = { workspace = true } diff --git a/src/cluster/src/cluster_impl.rs b/src/cluster/src/cluster_impl.rs index aee54e42b3..2700e7f3ae 100644 --- a/src/cluster/src/cluster_impl.rs +++ b/src/cluster/src/cluster_impl.rs @@ -22,6 +22,11 @@ use std::{ use async_trait::async_trait; use common_types::table::ShardId; +use compaction_client::{ + compaction_impl::{build_compaction_client, CompactionClientConfig}, + types::{ExecuteCompactionTaskRequest, ExecuteCompactionTaskResponse}, + CompactionClientRef, +}; use etcd_client::{Certificate, ConnectOptions, Identity, TlsOptions}; use generic_error::BoxError; use logger::{error, info, warn}; @@ -45,9 +50,10 @@ use crate::{ shard_lock_manager::{self, ShardLockManager, ShardLockManagerRef}, shard_set::{Shard, ShardRef, ShardSet}, topology::ClusterTopology, - Cluster, ClusterNodesNotFound, ClusterNodesResp, EtcdClientFailureWithCause, - InitEtcdClientConfig, InvalidArguments, MetaClientFailure, OpenShard, OpenShardWithCause, - Result, ShardNotFound, TableStatus, + Cluster, ClusterNodesNotFound, ClusterNodesResp, CompactionClientFailure, + CompactionOffloadNotAllowed, EtcdClientFailureWithCause, InitEtcdClientConfig, + InvalidArguments, MetaClientFailure, NodeType, OpenShard, OpenShardWithCause, Result, + ShardNotFound, TableStatus, }; /// ClusterImpl is an implementation of [`Cluster`] based [`MetaClient`]. @@ -341,10 +347,48 @@ impl Inner { shards.iter().map(|shard| shard.shard_info()).collect() } + + /// Get proper remote compaction node for compaction offload with meta + /// client. + async fn get_compaction_node(&self) -> Result { + unimplemented!() + } + + /// Return a new compaction client. + async fn compaction_client(&self) -> CompactionClientRef { + // TODO(leslie): impl better error handling with snafu. + let config = self + .get_compaction_node() + .await + .expect("fail to get remote compaction node"); + + build_compaction_client(config) + .await + .expect("fail to build compaction client") + } + + async fn compact( + &self, + req: &ExecuteCompactionTaskRequest, + ) -> Result { + // TODO(leslie): Execute the compaction task locally when fails to build + // compaction client. + let compact_resp = self + .compaction_client() + .await + .execute_compaction_task(req.clone()) + .await + .context(CompactionClientFailure)?; + + Ok(compact_resp) + } } #[async_trait] impl Cluster for ClusterImpl { + /// Type of the server in cluster mode. + type NodeType = NodeType; + async fn start(&self) -> Result<()> { info!("Cluster is starting with config:{:?}", self.config); @@ -376,6 +420,10 @@ impl Cluster for ClusterImpl { Ok(()) } + fn node_type(&self) -> NodeType { + self.config.node_type.clone() + } + async fn open_shard(&self, shard_info: &ShardInfo) -> Result { self.inner.open_shard(shard_info).await } @@ -409,6 +457,19 @@ impl Cluster for ClusterImpl { fn shard_lock_manager(&self) -> ShardLockManagerRef { self.shard_lock_manager.clone() } + + async fn compact( + &self, + req: &ExecuteCompactionTaskRequest, + ) -> Result { + ensure!( + self.node_type() == NodeType::HoraeDB, + CompactionOffloadNotAllowed { + node_type: self.node_type() + } + ); + self.inner.compact(req).await + } } /// Build the connect options for accessing etcd cluster. diff --git a/src/cluster/src/config.rs b/src/cluster/src/config.rs index 29e0da9719..d0b1c694b9 100644 --- a/src/cluster/src/config.rs +++ b/src/cluster/src/config.rs @@ -23,6 +23,8 @@ use serde::{Deserialize, Serialize}; use table_engine::ANALYTIC_ENGINE_TYPE; use time_ext::ReadableDuration; +use crate::NodeType; + #[derive(Debug, Clone, Deserialize, Serialize)] #[serde(default)] // TODO: move this to table_engine crates @@ -133,6 +135,7 @@ impl Default for TlsConfig { #[serde(default)] pub struct ClusterConfig { pub cmd_channel_buffer_size: usize, + pub node_type: NodeType, pub meta_client: MetaClientConfig, pub etcd_client: EtcdClientConfig, } diff --git a/src/cluster/src/lib.rs b/src/cluster/src/lib.rs index a97c945a0b..978b075696 100644 --- a/src/cluster/src/lib.rs +++ b/src/cluster/src/lib.rs @@ -28,7 +28,8 @@ use std::sync::Arc; use async_trait::async_trait; -use common_types::schema::SchemaName; +use common_types::{cluster::NodeType, schema::SchemaName}; +use compaction_client::types::{ExecuteCompactionTaskRequest, ExecuteCompactionTaskResponse}; use generic_error::GenericError; use macros::define_result; use meta_client::types::{ @@ -67,6 +68,9 @@ pub enum Error { #[snafu(display("Meta client execute failed, err:{source}."))] MetaClientFailure { source: meta_client::Error }, + #[snafu(display("Compaction client execute failed, err:{source}."))] + CompactionClientFailure { source: compaction_client::Error }, + #[snafu(display("Failed to init etcd client config, err:{source}.\nBacktrace:\n{backtrace}"))] InitEtcdClientConfig { source: std::io::Error, @@ -161,6 +165,14 @@ pub enum Error { "Cluster nodes are not found in the topology, version:{version}.\nBacktrace:\n{backtrace}", ))] ClusterNodesNotFound { version: u64, backtrace: Backtrace }, + + #[snafu(display( + "Not allowed to execute compaction offload in node_type:{node_type:?}.\nBacktrace:\n{backtrace:?}" + ))] + CompactionOffloadNotAllowed { + node_type: NodeType, + backtrace: Backtrace, + }, } define_result!(Error); @@ -182,7 +194,7 @@ impl From for TableStatus { } } -pub type ClusterRef = Arc; +pub type ClusterRef = Arc + Send + Sync>; #[derive(Clone, Debug)] pub struct ClusterNodesResp { @@ -190,12 +202,19 @@ pub struct ClusterNodesResp { pub cluster_nodes: ClusterNodesRef, } -/// Cluster manages tables and shard infos in cluster mode. +/// Cluster has the following functions: +/// + Manages tables and shard infos in cluster mode. +/// + (Optional) Executes compaction task remotely. #[async_trait] pub trait Cluster { + type NodeType: Send + Sync; + async fn start(&self) -> Result<()>; async fn stop(&self) -> Result<()>; + /// Get cluster type. + fn node_type(&self) -> NodeType; + /// Fetch related information and open shard. async fn open_shard(&self, shard_info: &ShardInfo) -> Result; @@ -218,4 +237,10 @@ pub trait Cluster { async fn route_tables(&self, req: &RouteTablesRequest) -> Result; async fn fetch_nodes(&self) -> Result; fn shard_lock_manager(&self) -> ShardLockManagerRef; + + /// Execute compaction task in remote compaction node. + async fn compact( + &self, + req: &ExecuteCompactionTaskRequest, + ) -> Result; } diff --git a/src/common_types/src/cluster.rs b/src/common_types/src/cluster.rs new file mode 100644 index 0000000000..ad302023e9 --- /dev/null +++ b/src/common_types/src/cluster.rs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use serde::{Deserialize, Serialize}; + +/// Type to distinguish different node type in cluster mode. +#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub enum NodeType { + #[default] + HoraeDB, + CompactionServer, +} diff --git a/src/common_types/src/lib.rs b/src/common_types/src/lib.rs index a92f48d3f2..449a800e30 100644 --- a/src/common_types/src/lib.rs +++ b/src/common_types/src/lib.rs @@ -18,6 +18,7 @@ //! Contains common types pub mod bitset; +pub mod cluster; pub mod column; pub mod column_block; pub mod column_schema; diff --git a/src/compaction_client/Cargo.toml b/src/compaction_client/Cargo.toml new file mode 100644 index 0000000000..36374130ff --- /dev/null +++ b/src/compaction_client/Cargo.toml @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "compaction_client" + +[package.license] +workspace = true + +[package.version] +workspace = true + +[package.authors] +workspace = true + +[package.edition] +workspace = true + +[dependencies] +async-trait = { workspace = true } +common_types = { workspace = true } +futures = { workspace = true } +generic_error = { workspace = true } +horaedbproto = { workspace = true } +logger = { workspace = true } +macros = { workspace = true } +prost = { workspace = true } +reqwest = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +snafu = { workspace = true } +time_ext = { workspace = true } +tokio = { workspace = true } +tonic = { workspace = true } +url = "2.2" diff --git a/src/compaction_client/src/compaction_impl.rs b/src/compaction_client/src/compaction_impl.rs new file mode 100644 index 0000000000..1e14b4df91 --- /dev/null +++ b/src/compaction_client/src/compaction_impl.rs @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use generic_error::BoxError; +use horaedbproto::{ + common::ResponseHeader, + compaction_service::{self, compaction_service_client::CompactionServiceClient}, +}; +use logger::info; +use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt}; +use time_ext::ReadableDuration; + +use crate::{ + BadResponse, CompactionClient, CompactionClientRef, ExecuteCompactionTaskRequest, + ExecuteCompactionTaskResponse, FailConnect, FailExecuteCompactionTask, MissingHeader, Result, +}; + +type CompactionServiceGrpcClient = CompactionServiceClient; + +#[derive(Debug, Deserialize, Clone, Serialize)] +#[serde(default)] +pub struct CompactionClientConfig { + pub cluster_name: String, + pub compaction_server_addr: String, + pub timeout: ReadableDuration, +} + +impl Default for CompactionClientConfig { + fn default() -> Self { + unimplemented!() + } +} + +/// Default compaction client impl, will interact with the remote compaction +/// node. +pub struct CompactionClientImpl { + client: CompactionServiceGrpcClient, +} + +impl CompactionClientImpl { + pub async fn connect(config: CompactionClientConfig) -> Result { + let client = { + let endpoint = + tonic::transport::Endpoint::from_shared(config.compaction_server_addr.to_string()) + .box_err() + .context(FailConnect { + addr: &config.compaction_server_addr, + })? + .timeout(config.timeout.0); + CompactionServiceGrpcClient::connect(endpoint) + .await + .box_err() + .context(FailConnect { + addr: &config.compaction_server_addr, + })? + }; + + Ok(Self { client }) + } + + #[inline] + fn client(&self) -> CompactionServiceGrpcClient { + self.client.clone() + } +} + +#[async_trait] +impl CompactionClient for CompactionClientImpl { + async fn execute_compaction_task( + &self, + req: ExecuteCompactionTaskRequest, + ) -> Result { + let pb_req = compaction_service::ExecuteCompactionTaskRequest::from(req); + + // TODO(leslie): Add request header for ExecuteCompactionTaskRequest. + + info!( + "Compaction client try to execute compaction task in remote compaction node, req:{:?}", + pb_req + ); + + let pb_resp = self + .client() + .execute_compaction_task(pb_req) + .await + .box_err() + .context(FailExecuteCompactionTask)? + .into_inner(); + + info!( + "Compaction client finish executing compaction task in remote compaction node, req:{:?}", + pb_resp + ); + + check_response_header(&pb_resp.header)?; + ExecuteCompactionTaskResponse::try_from(pb_resp) + } +} + +// TODO(leslie): Consider to refactor and reuse the similar function in +// meta_client. +fn check_response_header(header: &Option) -> Result<()> { + let header = header.as_ref().context(MissingHeader)?; + if header.code == 0 { + Ok(()) + } else { + BadResponse { + code: header.code, + msg: header.error.clone(), + } + .fail() + } +} + +pub async fn build_compaction_client( + config: CompactionClientConfig, +) -> Result { + let compaction_client = CompactionClientImpl::connect(config).await?; + Ok(Arc::new(compaction_client)) +} diff --git a/src/compaction_client/src/lib.rs b/src/compaction_client/src/lib.rs new file mode 100644 index 0000000000..c8ec47f246 --- /dev/null +++ b/src/compaction_client/src/lib.rs @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use generic_error::GenericError; +use macros::define_result; +use snafu::{Backtrace, Snafu}; +use types::{ExecuteCompactionTaskRequest, ExecuteCompactionTaskResponse}; + +pub mod compaction_impl; +pub mod types; + +#[derive(Debug, Snafu)] +#[snafu(visibility = "pub")] +pub enum Error { + #[snafu(display( + "Failed to connect the service endpoint:{}, err:{}\nBacktrace:\n{}", + addr, + source, + backtrace + ))] + FailConnect { + addr: String, + source: GenericError, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to execute compaction task, err:{}", source))] + FailExecuteCompactionTask { source: GenericError }, + + #[snafu(display("Missing header in rpc response.\nBacktrace:\n{}", backtrace))] + MissingHeader { backtrace: Backtrace }, + + #[snafu(display( + "Bad response, resp code:{}, msg:{}.\nBacktrace:\n{}", + code, + msg, + backtrace + ))] + BadResponse { + code: u32, + msg: String, + backtrace: Backtrace, + }, +} + +define_result!(Error); + +/// CompactionClient is the abstraction of client used for HoraeDB to +/// communicate with CompactionServer cluster. +#[async_trait] +pub trait CompactionClient: Send + Sync { + async fn execute_compaction_task( + &self, + req: ExecuteCompactionTaskRequest, + ) -> Result; +} + +pub type CompactionClientRef = Arc; diff --git a/src/compaction_client/src/types.rs b/src/compaction_client/src/types.rs new file mode 100644 index 0000000000..ca72149677 --- /dev/null +++ b/src/compaction_client/src/types.rs @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use horaedbproto::compaction_service; +use macros::define_result; + +use crate::Error; + +define_result!(Error); + +#[derive(Debug, Clone)] +pub struct ExecuteCompactionTaskRequest {} + +impl From for compaction_service::ExecuteCompactionTaskRequest { + fn from(_value: ExecuteCompactionTaskRequest) -> Self { + unimplemented!() + } +} + +#[derive(Debug, Clone)] +pub struct ExecuteCompactionTaskResponse {} + +impl TryFrom for ExecuteCompactionTaskResponse { + type Error = Error; + + fn try_from(_value: compaction_service::ExecuteCompactionTaskResponse) -> Result { + unimplemented!() + } +} diff --git a/src/horaedb/Cargo.toml b/src/horaedb/Cargo.toml index 2abfa49e17..943de17bbe 100644 --- a/src/horaedb/Cargo.toml +++ b/src/horaedb/Cargo.toml @@ -38,32 +38,34 @@ wal-rocksdb = ["wal/wal-rocksdb", "analytic_engine/wal-rocksdb"] wal-local-storage = ["wal/wal-local-storage", "analytic_engine/wal-local-storage"] [dependencies] -analytic_engine = { workspace = true } -catalog = { workspace = true } -catalog_impls = { workspace = true } -clap = { workspace = true } -cluster = { workspace = true } -datafusion = { workspace = true } -df_operator = { workspace = true } -etcd-client = { workspace = true } -interpreters = { workspace = true } -logger = { workspace = true } -meta_client = { workspace = true } -moka = { version = "0.10", features = ["future"] } -panic_ext = { workspace = true } -proxy = { workspace = true } -query_engine = { workspace = true } -router = { workspace = true } -runtime = { workspace = true } -serde = { workspace = true } -server = { workspace = true } -signal-hook = "0.3" -size_ext = { workspace = true } -table_engine = { workspace = true } -toml = { workspace = true } -toml_ext = { workspace = true } -tracing_util = { workspace = true } -wal = { workspace = true } +analytic_engine = { workspace = true } +catalog = { workspace = true } +catalog_impls = { workspace = true } +clap = { workspace = true } +cluster = { workspace = true } +common_types = { workspace = true } +compaction_client = { workspace = true } +datafusion = { workspace = true } +df_operator = { workspace = true } +etcd-client = { workspace = true } +interpreters = { workspace = true } +logger = { workspace = true } +meta_client = { workspace = true } +moka = { version = "0.10", features = ["future"] } +panic_ext = { workspace = true } +proxy = { workspace = true } +query_engine = { workspace = true } +router = { workspace = true } +runtime = { workspace = true } +serde = { workspace = true } +server = { workspace = true } +signal-hook = "0.3" +size_ext = { workspace = true } +table_engine = { workspace = true } +toml = { workspace = true } +toml_ext = { workspace = true } +tracing_util = { workspace = true } +wal = { workspace = true } [build-dependencies] vergen = { version = "8", default-features = false, features = [ diff --git a/src/horaedb/src/config.rs b/src/horaedb/src/config.rs index b9f8932f19..e7f19233f0 100644 --- a/src/horaedb/src/config.rs +++ b/src/horaedb/src/config.rs @@ -26,8 +26,8 @@ use size_ext::ReadableSize; #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(default)] pub struct NodeInfo { - /// The address of the horaedb node. It can be a domain name or an IP - /// address without port followed. + /// The address of the horaedb (or compaction server) node. It can be a + /// domain name or an IP address without port followed. pub addr: String, pub zone: String, pub idc: String, diff --git a/src/horaedb/src/setup.rs b/src/horaedb/src/setup.rs index 9bdb46daf9..ae2ff59624 100644 --- a/src/horaedb/src/setup.rs +++ b/src/horaedb/src/setup.rs @@ -26,6 +26,7 @@ use analytic_engine::{ use catalog::{manager::ManagerRef, schema::OpenOptions, table_operator::TableOperator}; use catalog_impls::{table_based::TableBasedManager, volatile, CatalogManagerImpl}; use cluster::{cluster_impl::ClusterImpl, config::ClusterConfig, shard_set::ShardSet}; +use common_types::cluster::NodeType; use datafusion::execution::runtime_env::RuntimeConfig as DfRuntimeConfig; use df_operator::registry::{FunctionRegistry, FunctionRegistryImpl}; use interpreters::table_manipulator::{catalog_based, meta_based}; @@ -313,6 +314,7 @@ async fn build_with_meta( zone: config.node.zone.clone(), idc: config.node.idc.clone(), binary_version: config.node.binary_version.clone(), + node_type: cluster_config.node_type.clone(), }; info!("Build horaedb with node meta info:{node_meta_info:?}"); @@ -350,7 +352,10 @@ async fn build_with_meta( engine_runtimes: runtimes.clone(), opened_wals: opened_wals.clone(), }; - let TableEngineContext { table_engine, .. } = engine_builder + let TableEngineContext { + table_engine, + local_compaction_runner, + } = engine_builder .build() .await .expect("Failed to setup analytic engine"); @@ -368,14 +373,20 @@ async fn build_with_meta( let table_manipulator = Arc::new(meta_based::TableManipulatorImpl::new(meta_client)); let schema_config_provider = Arc::new(ClusterBasedProvider::new(cluster.clone())); - builder + + let mut builder = builder .table_engine(engine_proxy) .catalog_manager(catalog_manager) .table_manipulator(table_manipulator) .cluster(cluster) .opened_wals(opened_wals) .router(router) - .schema_config_provider(schema_config_provider) + .schema_config_provider(schema_config_provider); + if let NodeType::CompactionServer = cluster_config.node_type { + builder = + builder.compaction_runner(local_compaction_runner.expect("Empty compaction runner.")); + } + builder } async fn build_without_meta( diff --git a/src/meta_client/src/types.rs b/src/meta_client/src/types.rs index 6a6aba6918..e832cfda62 100644 --- a/src/meta_client/src/types.rs +++ b/src/meta_client/src/types.rs @@ -19,6 +19,7 @@ use std::{collections::HashMap, fmt, sync::Arc}; pub use common_types::table::{ShardId, ShardVersion}; use common_types::{ + cluster::NodeType, schema::{SchemaId, SchemaName}, table::{TableId, TableName}, }; @@ -163,6 +164,7 @@ pub struct NodeMetaInfo { pub zone: String, pub idc: String, pub binary_version: String, + pub node_type: NodeType, } impl NodeMetaInfo { @@ -281,6 +283,11 @@ impl From for meta_service_pb::NodeInfo { binary_version: node_info.node_meta_info.binary_version, shard_infos, lease: 0, + node_type: if node_info.node_meta_info.node_type == NodeType::HoraeDB { + cluster_pb::NodeType::HoraeDb + } else { + cluster_pb::NodeType::CompactionServer + } as i32, } } } diff --git a/src/router/Cargo.toml b/src/router/Cargo.toml index 7f3d8c23ac..7183c270e5 100644 --- a/src/router/Cargo.toml +++ b/src/router/Cargo.toml @@ -34,6 +34,7 @@ workspace = true async-trait = { workspace = true } cluster = { workspace = true } common_types = { workspace = true } +compaction_client = { workspace = true } generic_error = { workspace = true } horaedbproto = { workspace = true } logger = { workspace = true } diff --git a/src/router/src/cluster_based.rs b/src/router/src/cluster_based.rs index d929104407..897369bc71 100644 --- a/src/router/src/cluster_based.rs +++ b/src/router/src/cluster_based.rs @@ -205,7 +205,8 @@ mod tests { shard_lock_manager::ShardLockManagerRef, shard_set::ShardRef, Cluster, ClusterNodesResp, TableStatus, }; - use common_types::table::ShardId; + use common_types::{cluster::NodeType, table::ShardId}; + use compaction_client::types::{ExecuteCompactionTaskRequest, ExecuteCompactionTaskResponse}; use horaedbproto::storage::{RequestContext, RouteRequest as RouteRequestPb}; use meta_client::types::{ NodeShard, RouteEntry, RouteTablesResponse, ShardInfo, ShardRole::Leader, TableInfo, @@ -218,6 +219,8 @@ mod tests { #[async_trait] impl Cluster for MockClusterImpl { + type NodeType = NodeType; + async fn start(&self) -> cluster::Result<()> { unimplemented!(); } @@ -226,6 +229,10 @@ mod tests { unimplemented!(); } + fn node_type(&self) -> NodeType { + unimplemented!() + } + async fn open_shard(&self, _: &ShardInfo) -> cluster::Result { unimplemented!(); } @@ -288,6 +295,13 @@ mod tests { fn shard_lock_manager(&self) -> ShardLockManagerRef { unimplemented!(); } + + async fn compact( + &self, + _req: &ExecuteCompactionTaskRequest, + ) -> cluster::Result { + unimplemented!() + } } #[tokio::test] diff --git a/src/server/src/grpc/compaction_service/error.rs b/src/server/src/grpc/compaction_service/error.rs new file mode 100644 index 0000000000..eadb3f2418 --- /dev/null +++ b/src/server/src/grpc/compaction_service/error.rs @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Error definitions for compaction service. + +use generic_error::GenericError; +use horaedbproto::common::ResponseHeader; +use macros::define_result; +use snafu::Snafu; + +use crate::error_util; + +define_result!(Error); + +#[derive(Snafu, Debug)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Server error, code:{:?}, message:{}", code, msg))] + ErrNoCause { code: StatusCode, msg: String }, + + #[snafu(display("Server error, code:{:?}, message:{}, cause:{}", code, msg, source))] + ErrWithCause { + code: StatusCode, + msg: String, + source: GenericError, + }, +} + +impl Error { + pub fn code(&self) -> StatusCode { + match *self { + Error::ErrNoCause { code, .. } => code, + Error::ErrWithCause { code, .. } => code, + } + } + + /// Get the error message returned to the user. + pub fn error_message(&self) -> String { + match self { + Error::ErrNoCause { msg, .. } => msg.clone(), + + Error::ErrWithCause { msg, source, .. } => { + let err_string = source.to_string(); + let first_line = error_util::remove_backtrace_from_err(&err_string); + format!("{msg}. Caused by: {first_line}") + } + } + } +} + +/// A set of codes for compaction service. +/// +/// Note that such a set of codes is different with the codes (alias to http +/// status code) used by storage service. +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub enum StatusCode { + #[default] + Ok = 0, + BadRequest = 401, + Internal = 500, +} + +impl StatusCode { + #[inline] + pub fn as_u32(self) -> u32 { + self as u32 + } +} + +pub fn build_err_header(err: Error) -> ResponseHeader { + ResponseHeader { + code: err.code().as_u32(), + error: err.error_message(), + } +} + +pub fn build_ok_header() -> ResponseHeader { + ResponseHeader { + code: StatusCode::Ok.as_u32(), + ..Default::default() + } +} diff --git a/src/server/src/grpc/compaction_service/mod.rs b/src/server/src/grpc/compaction_service/mod.rs new file mode 100644 index 0000000000..c2635d585c --- /dev/null +++ b/src/server/src/grpc/compaction_service/mod.rs @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Compaction rpc service implementation. + +use std::sync::Arc; + +use analytic_engine::compaction::runner::{CompactionRunnerRef, CompactionRunnerTask}; +use async_trait::async_trait; +use cluster::ClusterRef; +use error::{build_err_header, build_ok_header, ErrWithCause, StatusCode}; +use generic_error::BoxError; +use horaedbproto::compaction_service::{ + compaction_service_server::CompactionService, ExecResult, ExecuteCompactionTaskRequest, + ExecuteCompactionTaskResponse, +}; +use proxy::instance::InstanceRef; +use runtime::Runtime; +use snafu::ResultExt; +use tonic::{Request, Response, Status}; + +mod error; + +/// Builder for [CompactionServiceImpl] +pub struct Builder { + pub cluster: ClusterRef, + pub instance: InstanceRef, + pub runtime: Arc, + pub compaction_runner: CompactionRunnerRef, +} + +impl Builder { + pub fn build(self) -> CompactionServiceImpl { + let Self { + cluster, + instance, + runtime, + compaction_runner, + } = self; + + CompactionServiceImpl { + cluster, + instance, + runtime, + compaction_runner, + } + } +} + +#[derive(Clone)] +pub struct CompactionServiceImpl { + pub cluster: ClusterRef, + pub instance: InstanceRef, + pub runtime: Arc, + pub compaction_runner: CompactionRunnerRef, +} + +#[async_trait] +impl CompactionService for CompactionServiceImpl { + async fn execute_compaction_task( + &self, + request: Request, + ) -> Result, Status> { + let request: Result = request + .into_inner() + .try_into() + .box_err() + .context(ErrWithCause { + code: StatusCode::BadRequest, + msg: "fail to convert the execute compaction task request", + }); + + let mut resp: ExecuteCompactionTaskResponse = ExecuteCompactionTaskResponse::default(); + match request { + Ok(task) => { + let request_id = task.request_id.clone(); + let res = self + .compaction_runner + .run(task) + .await + .box_err() + .with_context(|| ErrWithCause { + code: StatusCode::Internal, + msg: format!("fail to compact task, request:{request_id}"), + }); + + match res { + Ok(res) => { + resp.header = Some(build_ok_header()); + resp.result = Some(ExecResult { + output_file_path: res.output_file_path.into(), + sst_info: Some(res.sst_info.into()), + sst_meta: Some(res.sst_meta.into()), + }); + // TODO(leslie): Add status. + } + Err(e) => { + resp.header = Some(build_err_header(e)); + } + } + } + Err(e) => { + resp.header = Some(build_err_header(e)); + } + } + + Ok(Response::new(resp)) + } +} diff --git a/src/server/src/grpc/mod.rs b/src/server/src/grpc/mod.rs index 7b02a3a2a2..dcee780125 100644 --- a/src/server/src/grpc/mod.rs +++ b/src/server/src/grpc/mod.rs @@ -24,11 +24,14 @@ use std::{ time::Duration, }; +use analytic_engine::compaction::runner::CompactionRunnerRef; use cluster::ClusterRef; -use common_types::column_schema; +use common_types::{cluster::NodeType, column_schema}; +use compaction_service::CompactionServiceImpl; use futures::FutureExt; use generic_error::GenericError; use horaedbproto::{ + compaction_service::compaction_service_server::CompactionServiceServer, meta_event::meta_event_service_server::MetaEventServiceServer, remote_engine::remote_engine_service_server::RemoteEngineServiceServer, storage::storage_service_server::StorageServiceServer, @@ -60,6 +63,7 @@ use crate::{ }, }; +mod compaction_service; mod meta_event_service; mod metrics; mod remote_engine_service; @@ -105,6 +109,9 @@ pub enum Error { #[snafu(display("Missing wals.\nBacktrace:\n{}", backtrace))] MissingWals { backtrace: Backtrace }, + #[snafu(display("Missing compaction runner.\nBacktrace:\n{}", backtrace))] + MissingCompactionRunner { backtrace: Backtrace }, + #[snafu(display("Missing timeout.\nBacktrace:\n{}", backtrace))] MissingTimeout { backtrace: Backtrace }, @@ -163,6 +170,7 @@ define_result!(Error); pub struct RpcServices { serve_addr: SocketAddr, rpc_server: InterceptedService, AuthWithFile>, + compaction_rpc_server: Option>, meta_rpc_server: Option>, remote_engine_server: RemoteEngineServiceServer, runtime: Arc, @@ -173,6 +181,7 @@ pub struct RpcServices { impl RpcServices { pub async fn start(&mut self) -> Result<()> { let rpc_server = self.rpc_server.clone(); + let compaction_rpc_server = self.compaction_rpc_server.clone(); let meta_rpc_server = self.meta_rpc_server.clone(); let remote_engine_server = self.remote_engine_server.clone(); let serve_addr = self.serve_addr; @@ -182,6 +191,11 @@ impl RpcServices { let mut router = Server::builder().add_service(rpc_server); + if let Some(s) = compaction_rpc_server { + info!("Grpc server serves compaction service"); + router = router.add_service(s); + }; + if let Some(s) = meta_rpc_server { info!("Grpc server serves meta rpc service"); router = router.add_service(s); @@ -226,6 +240,7 @@ pub struct Builder { proxy: Option>, query_dedup_config: Option, hotspot_recorder: Option>, + compaction_runner: Option, } impl Builder { @@ -241,6 +256,7 @@ impl Builder { proxy: None, query_dedup_config: None, hotspot_recorder: None, + compaction_runner: None, } } @@ -294,6 +310,12 @@ impl Builder { self.query_dedup_config = Some(config); self } + + // Compaction runner is an optional field for building [RpcServices]. + pub fn compaction_runner(mut self, runner: Option) -> Self { + self.compaction_runner = runner; + self + } } impl Builder { @@ -301,19 +323,46 @@ impl Builder { let auth = self.auth.context(MissingAuth)?; let runtimes = self.runtimes.context(MissingRuntimes)?; let instance = self.instance.context(MissingInstance)?; - let opened_wals = self.opened_wals.context(MissingWals)?; let proxy = self.proxy.context(MissingProxy)?; let hotspot_recorder = self.hotspot_recorder.context(MissingHotspotRecorder)?; - - let meta_rpc_server = self.cluster.map(|v| { - let builder = meta_event_service::Builder { - cluster: v, - instance: instance.clone(), - runtime: runtimes.meta_runtime.clone(), - opened_wals, - }; - MetaEventServiceServer::new(builder.build()) - }); + let mut meta_rpc_server: Option> = None; + let mut compaction_rpc_server: Option> = + None; + + self.cluster + .map(|v| { + let result: Result<()> = (|| { + match v.node_type() { + NodeType::HoraeDB => { + // Support meta rpc service. + let opened_wals = self.opened_wals.context(MissingWals)?; + let builder = meta_event_service::Builder { + cluster: v, + instance: instance.clone(), + runtime: runtimes.meta_runtime.clone(), + opened_wals, + }; + meta_rpc_server = Some(MetaEventServiceServer::new(builder.build())); + } + NodeType::CompactionServer => { + // Support remote rpc service. + let compaction_runner = + self.compaction_runner.context(MissingCompactionRunner)?; + let builder = compaction_service::Builder { + cluster: v, + instance: instance.clone(), + runtime: runtimes.compact_runtime.clone(), + compaction_runner, + }; + compaction_rpc_server = + Some(CompactionServiceServer::new(builder.build())); + } + } + Ok(()) + })(); + result + }) + .transpose()?; let remote_engine_server = { let query_dedup = self @@ -349,6 +398,7 @@ impl Builder { Ok(RpcServices { serve_addr, rpc_server, + compaction_rpc_server, meta_rpc_server, remote_engine_server, runtime, diff --git a/src/server/src/server.rs b/src/server/src/server.rs index f7cd72ec7b..bca6c8d151 100644 --- a/src/server/src/server.rs +++ b/src/server/src/server.rs @@ -19,6 +19,7 @@ use std::sync::Arc; +use analytic_engine::compaction::runner::CompactionRunnerRef; use catalog::manager::ManagerRef; use cluster::ClusterRef; use datafusion::execution::{runtime_env::RuntimeConfig, FunctionRegistry}; @@ -251,6 +252,7 @@ pub struct Builder { opened_wals: Option, remote_engine: Option, datatfusion_context: Option, + compaction_runner: Option, } impl Builder { @@ -274,6 +276,7 @@ impl Builder { opened_wals: None, remote_engine: None, datatfusion_context: None, + compaction_runner: None, } } @@ -368,6 +371,11 @@ impl Builder { self } + pub fn compaction_runner(mut self, runner: CompactionRunnerRef) -> Self { + self.compaction_runner = Some(runner); + self + } + /// Build and run the server pub fn build(self) -> Result { // Build instance @@ -527,6 +535,7 @@ impl Builder { .proxy(proxy) .hotspot_recorder(hotspot_recorder) .query_dedup(self.server_config.query_dedup) + .compaction_runner(self.compaction_runner.clone()) .build() .context(BuildGrpcService)?; diff --git a/src/table_engine/src/predicate.rs b/src/table_engine/src/predicate.rs index b316b99e24..3a3294fcd9 100644 --- a/src/table_engine/src/predicate.rs +++ b/src/table_engine/src/predicate.rs @@ -112,7 +112,7 @@ impl Predicate { impl TryFrom<&Predicate> for horaedbproto::remote_engine::Predicate { type Error = Error; - fn try_from(predicate: &Predicate) -> std::result::Result { + fn try_from(predicate: &Predicate) -> Result { let time_range = predicate.time_range; let mut exprs = Vec::with_capacity(predicate.exprs.len()); for expr in &predicate.exprs { @@ -135,9 +135,7 @@ impl TryFrom<&Predicate> for horaedbproto::remote_engine::Predicate { impl TryFrom for Predicate { type Error = Error; - fn try_from( - pb: horaedbproto::remote_engine::Predicate, - ) -> std::result::Result { + fn try_from(pb: horaedbproto::remote_engine::Predicate) -> Result { let time_range = pb.time_range.context(EmptyTimeRange)?; let mut exprs = Vec::with_capacity(pb.exprs.len()); for pb_expr in pb.exprs {