Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable horaemeta to monitor compaction nodes. #1555

Closed
wants to merge 30 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
c1729f4
feat: add compaction server supporting remote compaction service
LeslieKid Jul 18, 2024
41f166b
fix style.
LeslieKid Jul 21, 2024
51666ca
fix style.
LeslieKid Jul 22, 2024
3d007e1
define error for compaction service.
LeslieKid Jul 23, 2024
453b22c
enable conversation from request to task.
LeslieKid Jul 23, 2024
b549119
update remote compact execution.
LeslieKid Jul 23, 2024
ec60fdc
enable conversation from task result to response.
LeslieKid Jul 25, 2024
d3b4db2
introduce CompactionCluster for compaction server in distribute mode.
LeslieKid Aug 5, 2024
798dd41
enable compaction cluster deployment.
LeslieKid Aug 6, 2024
cd7c9ae
refactor: replace CompactionCluster with ClusterType.
LeslieKid Aug 7, 2024
818d61f
remove compaction cluster,
LeslieKid Aug 7, 2024
31a306c
fix style and comment.
LeslieKid Aug 7, 2024
29894f8
provide cluster type for communication between horaedb/cs (as client)…
LeslieKid Aug 7, 2024
12f135a
introduce compaction client for horaedb to access remote compaction n…
LeslieKid Aug 12, 2024
a999f41
impl compact in Cluster trait.
LeslieKid Aug 14, 2024
165c18d
fix style and add comment.
LeslieKid Aug 14, 2024
f306c14
impl type conversation.
LeslieKid Aug 16, 2024
c01ef00
remove dead code.
LeslieKid Aug 16, 2024
e170f65
Merge branch 'main' into remote-compaction-service
LeslieKid Aug 16, 2024
f2c0e38
remove cluster type in meta client.
LeslieKid Aug 16, 2024
421d2c0
Merge branch 'main' into remote-compaction-service
LeslieKid Aug 16, 2024
c6521a4
fix bug
LeslieKid Aug 16, 2024
723a533
update Cargo.lock
LeslieKid Aug 16, 2024
6eabac1
update dependencies.
LeslieKid Aug 18, 2024
9795cc0
update dependencies.
LeslieKid Aug 18, 2024
faeec3e
enable horaemeta to register compaction nodes.
LeslieKid Aug 20, 2024
3da8fe5
fix style.
LeslieKid Aug 20, 2024
debabab
rename ClusterType to NodeType.
LeslieKid Aug 20, 2024
be6d196
Merge branch 'remote-compaction-service' into horaemeta-monitor
LeslieKid Aug 20, 2024
a4286e5
introduce node type in meta client.
LeslieKid Aug 21, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ license = "Apache-2.0"
[workspace]
resolver = "2"
# In alphabetical order
members = [
members = [
"horaectl",
"integration_tests",
"integration_tests/sdk/rust",
Expand All @@ -33,7 +33,8 @@ members = [
"src/catalog",
"src/catalog_impls",
"src/cluster",
"src/common_types",
"src/common_types",
"src/compaction_client",
"src/components/alloc_tracker",
"src/components/arena",
"src/components/arrow_ext",
Expand Down Expand Up @@ -101,7 +102,8 @@ thiserror = "1"
bytes_ext = { path = "src/components/bytes_ext" }
catalog = { path = "src/catalog" }
catalog_impls = { path = "src/catalog_impls" }
horaedbproto = { git = "https://github.com/apache/incubator-horaedb-proto.git", rev = "19ece8f771fc0b3e8e734072cc3d8040de6c74cb" }
# TODO(leslie): modify it when the related pr in incubator-horaedb-proto is merged.
horaedbproto = { git = "https://github.com/LeslieKid/incubator-horaedb-proto.git", rev = "8c9fb72d95476caf867cd62993be6664e34c5ce5" }
codec = { path = "src/components/codec" }
chrono = "0.4"
clap = { version = "4.5.1", features = ["derive"] }
Expand All @@ -110,6 +112,7 @@ cluster = { path = "src/cluster" }
criterion = "0.5"
horaedb-client = "1.0.2"
common_types = { path = "src/common_types" }
compaction_client = { path = "src/compaction_client" }
datafusion = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "e21b03154" }
datafusion-proto = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "e21b03154" }
derive_builder = "0.12"
Expand Down
2 changes: 1 addition & 1 deletion horaemeta/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ install-tools:
@mkdir -p $(GO_TOOLS_BIN_PATH)
@(which golangci-lint && golangci-lint version | grep '1.54') >/dev/null 2>&1 || curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GO_TOOLS_BIN_PATH) v1.54.2

META_PKG := github.com/apache/incubator-horaedb-meta
META_PKG := github.com/LeslieKid/incubator-horaedb-meta
PACKAGES := $(shell go list ./... | tail -n +2)
PACKAGE_DIRECTORIES := $(subst $(META_PKG)/,,$(PACKAGES))

Expand Down
2 changes: 1 addition & 1 deletion horaemeta/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/apache/incubator-horaedb-meta
go 1.21

require (
github.com/apache/incubator-horaedb-proto/golang v0.0.0-20231228071726-92152841fc8a
github.com/LeslieKid/incubator-horaedb-proto/golang v0.0.0-20240819012818-8c9fb72d9547
github.com/caarlos0/env/v6 v6.10.1
github.com/julienschmidt/httprouter v1.3.0
github.com/looplab/fsm v0.3.0
Expand Down
4 changes: 2 additions & 2 deletions horaemeta/go.sum
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/LeslieKid/incubator-horaedb-proto/golang v0.0.0-20240819012818-8c9fb72d9547 h1:X/YU9jswF/FTnBANq+2SprBn4pA9Ca93PuA2kZPeV/s=
github.com/LeslieKid/incubator-horaedb-proto/golang v0.0.0-20240819012818-8c9fb72d9547/go.mod h1:lNDTvwHO70v9gtVzaefpPmBdTs63hs57wFT2WdmJI+Q=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/apache/incubator-horaedb-proto/golang v0.0.0-20231228071726-92152841fc8a h1:lQVr4wkixN4N5dOKRUBU1lnqkiHTFg6Im+tqJ8Y7XOE=
github.com/apache/incubator-horaedb-proto/golang v0.0.0-20231228071726-92152841fc8a/go.mod h1:Ch92HPIAoGbrgFCtpSgxcYSRgWdpNsIcPG1lfv24Ufs=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
Expand Down
62 changes: 49 additions & 13 deletions horaemeta/server/cluster/metadata/cluster_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"sort"
"sync"

"github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/clusterpb"
"github.com/apache/incubator-horaedb-meta/server/id"
"github.com/apache/incubator-horaedb-meta/server/storage"
"github.com/pkg/errors"
Expand All @@ -53,7 +54,13 @@ type ClusterMetadata struct {
topologyManager TopologyManager

// Manage the registered nodes from heartbeat.
registeredNodesCache map[string]RegisteredNode // nodeName -> NodeName
// TODO(leslie): Rename it to registeredHoraedbNodesCache?
registeredNodesCache map[string]RegisteredNode // nodeName -> NodeName
registeredCompactionNodesCache map[string]RegisteredNode // nodeName -> NodeName

// Maintain a list to store the keys of compaction nodes.
// TODO(leslie): Unused now, will be used in compaction nodes scheduling strategy.
compactionNodesKeyList []string

storage storage.Storage
kv clientv3.KV
Expand All @@ -67,16 +74,18 @@ func NewClusterMetadata(logger *zap.Logger, meta storage.Cluster, storage storag
shardIDAlloc := id.NewReusableAllocatorImpl([]uint64{}, MinShardID)

cluster := &ClusterMetadata{
logger: logger,
clusterID: meta.ID,
lock: sync.RWMutex{},
metaData: meta,
tableManager: NewTableManagerImpl(logger, storage, meta.ID, schemaIDAlloc, tableIDAlloc),
topologyManager: NewTopologyManagerImpl(logger, storage, meta.ID, shardIDAlloc),
registeredNodesCache: map[string]RegisteredNode{},
storage: storage,
kv: kv,
shardIDAlloc: shardIDAlloc,
logger: logger,
clusterID: meta.ID,
lock: sync.RWMutex{},
metaData: meta,
tableManager: NewTableManagerImpl(logger, storage, meta.ID, schemaIDAlloc, tableIDAlloc),
topologyManager: NewTopologyManagerImpl(logger, storage, meta.ID, shardIDAlloc),
registeredNodesCache: map[string]RegisteredNode{},
registeredCompactionNodesCache: map[string]RegisteredNode{},
compactionNodesKeyList: []string{},
storage: storage,
kv: kv,
shardIDAlloc: shardIDAlloc,
}

return cluster
Expand Down Expand Up @@ -437,6 +446,20 @@ func (c *ClusterMetadata) GetShardNodeByTableIDs(tableIDs []storage.TableID) (Ge

func (c *ClusterMetadata) RegisterNode(ctx context.Context, registeredNode RegisteredNode) error {
registeredNode.Node.State = storage.NodeStateOnline
// Register compaction node.
if registeredNode.Node.NodeStats.NodeType == clusterpb.NodeType_CompactionServer {
c.lock.Lock()
defer c.lock.Unlock()

_, exists := c.registeredCompactionNodesCache[registeredNode.Node.Name]
c.registeredCompactionNodesCache[registeredNode.Node.Name] = registeredNode

if !exists {
c.compactionNodesKeyList = append(c.compactionNodesKeyList, registeredNode.Node.Name)
}
return nil
}

err := c.storage.CreateOrUpdateNode(ctx, storage.CreateOrUpdateNodeRequest{
ClusterID: c.clusterID,
Node: registeredNode.Node,
Expand All @@ -456,10 +479,11 @@ func (c *ClusterMetadata) RegisterNode(ctx context.Context, registeredNode Regis
}
}

// Update shard node mapping.
// Check whether to update persistence data.
// Register horaedb node.
oldCache, exists := c.registeredNodesCache[registeredNode.Node.Name]
c.registeredNodesCache[registeredNode.Node.Name] = registeredNode

// Check whether to update persistent data.
enableUpdateWhenStable := c.metaData.TopologyType == storage.TopologyTypeDynamic
if !enableUpdateWhenStable && c.topologyManager.GetClusterState() == storage.ClusterStateStable {
return nil
Expand All @@ -470,6 +494,7 @@ func (c *ClusterMetadata) RegisterNode(ctx context.Context, registeredNode Regis
return nil
}

// Update shard node mapping.
shardNodes := make(map[string][]storage.ShardNode, 1)
shardNodes[registeredNode.Node.Name] = make([]storage.ShardNode, 0, len(registeredNode.ShardInfos))
for _, shardInfo := range registeredNode.ShardInfos {
Expand Down Expand Up @@ -498,6 +523,17 @@ func (c *ClusterMetadata) GetRegisteredNodes() []RegisteredNode {
return nodes
}

func (c *ClusterMetadata) GetRegisteredCompactionNodes() []RegisteredNode {
c.lock.RLock()
defer c.lock.RUnlock()

nodes := make([]RegisteredNode, 0, len(c.registeredCompactionNodesCache))
for _, node := range c.registeredCompactionNodesCache {
nodes = append(nodes, node)
}
return nodes
}

func (c *ClusterMetadata) GetRegisteredNodeByName(nodeName string) (RegisteredNode, bool) {
c.lock.RLock()
defer c.lock.RUnlock()
Expand Down
2 changes: 1 addition & 1 deletion horaemeta/server/cluster/metadata/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"time"

"github.com/apache/incubator-horaedb-meta/server/storage"
"github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb"
"github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb"
"github.com/pkg/errors"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/apache/incubator-horaedb-meta/pkg/coderr"
"github.com/apache/incubator-horaedb-meta/server/cluster/metadata"
"github.com/apache/incubator-horaedb-meta/server/service"
"github.com/apache/incubator-horaedb-proto/golang/pkg/metaeventpb"
"github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaeventpb"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
Expand Down
2 changes: 1 addition & 1 deletion horaemeta/server/coordinator/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/operation/transferleader"
"github.com/apache/incubator-horaedb-meta/server/id"
"github.com/apache/incubator-horaedb-meta/server/storage"
"github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb"
"github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb"
"github.com/pkg/errors"
"go.uber.org/zap"
)
Expand Down
2 changes: 1 addition & 1 deletion horaemeta/server/coordinator/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/apache/incubator-horaedb-meta/server/coordinator"
"github.com/apache/incubator-horaedb-meta/server/coordinator/procedure"
"github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/test"
"github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb"
"github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
Expand Down
2 changes: 1 addition & 1 deletion horaemeta/server/coordinator/procedure/ddl/common_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/apache/incubator-horaedb-meta/server/coordinator/eventdispatch"
"github.com/apache/incubator-horaedb-meta/server/coordinator/procedure"
"github.com/apache/incubator-horaedb-meta/server/storage"
"github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb"
"github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb"
"github.com/pkg/errors"
"go.uber.org/zap"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/apache/incubator-horaedb-meta/server/coordinator/procedure"
"github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/ddl"
"github.com/apache/incubator-horaedb-meta/server/storage"
"github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb"
"github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb"
"github.com/looplab/fsm"
"github.com/pkg/errors"
"go.uber.org/zap"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/ddl/createpartitiontable"
"github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/test"
"github.com/apache/incubator-horaedb-meta/server/storage"
"github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb"
"github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb"
"github.com/stretchr/testify/require"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/apache/incubator-horaedb-meta/server/coordinator/procedure"
"github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/ddl"
"github.com/apache/incubator-horaedb-meta/server/storage"
"github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb"
"github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb"
"github.com/looplab/fsm"
"github.com/pkg/errors"
"go.uber.org/zap"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/apache/incubator-horaedb-meta/server/cluster/metadata"
"github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/ddl/createtable"
"github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/test"
"github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb"
"github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb"
"github.com/stretchr/testify/require"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
"github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/ddl/droppartitiontable"
"github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/test"
"github.com/apache/incubator-horaedb-meta/server/storage"
"github.com/apache/incubator-horaedb-proto/golang/pkg/clusterpb"
"github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb"
"github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/clusterpb"
"github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb"
"github.com/stretchr/testify/require"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/apache/incubator-horaedb-meta/server/coordinator/procedure"
"github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/ddl"
"github.com/apache/incubator-horaedb-meta/server/storage"
"github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb"
"github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb"
"github.com/looplab/fsm"
"github.com/pkg/errors"
"go.uber.org/zap"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/ddl/droptable"
"github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/test"
"github.com/apache/incubator-horaedb-meta/server/storage"
"github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb"
"github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb"
"github.com/stretchr/testify/require"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/apache/incubator-horaedb-meta/server/coordinator/procedure"
"github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/ddl"
"github.com/apache/incubator-horaedb-meta/server/storage"
"github.com/apache/incubator-horaedb-proto/golang/pkg/metaservicepb"
"github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaservicepb"
"github.com/looplab/fsm"
"github.com/pkg/errors"
"go.uber.org/zap"
Expand Down
2 changes: 1 addition & 1 deletion horaemeta/server/coordinator/watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"sync"

"github.com/apache/incubator-horaedb-meta/server/storage"
"github.com/apache/incubator-horaedb-proto/golang/pkg/metaeventpb"
"github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaeventpb"
"github.com/pkg/errors"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
Expand Down
2 changes: 1 addition & 1 deletion horaemeta/server/coordinator/watch/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

"github.com/apache/incubator-horaedb-meta/server/etcdutil"
"github.com/apache/incubator-horaedb-meta/server/storage"
"github.com/apache/incubator-horaedb-proto/golang/pkg/metaeventpb"
"github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metaeventpb"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
Expand Down
2 changes: 1 addition & 1 deletion horaemeta/server/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

"github.com/apache/incubator-horaedb-meta/pkg/log"
"github.com/apache/incubator-horaedb-meta/server/etcdutil"
"github.com/apache/incubator-horaedb-proto/golang/pkg/metastoragepb"
"github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metastoragepb"
"github.com/pkg/errors"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
Expand Down
2 changes: 1 addition & 1 deletion horaemeta/server/member/watch_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/apache/incubator-horaedb-meta/pkg/assert"
"github.com/apache/incubator-horaedb-meta/pkg/log"
"github.com/apache/incubator-horaedb-meta/server/etcdutil"
"github.com/apache/incubator-horaedb-proto/golang/pkg/metastoragepb"
"github.com/LeslieKid/incubator-horaedb-proto/golang/pkg/metastoragepb"
"go.uber.org/zap"
)

Expand Down
Loading
Loading