Skip to content

Commit

Permalink
Filter dropped indexes to avoid META NOT FOUND (#185)
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter committed Sep 30, 2024
1 parent 386b603 commit 6b52ce7
Show file tree
Hide file tree
Showing 24 changed files with 3,626 additions and 175 deletions.
8 changes: 8 additions & 0 deletions pkg/ccr/ingest_binlog_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,9 @@ func (j *IngestBinlogJob) preparePartition(srcTableId, destTableId int64, partit
}

for _, indexId := range indexIds {
if j.srcMeta.IsIndexDropped(indexId) {
continue
}
srcIndexMeta, ok := srcIndexIdMap[indexId]
if !ok {
j.setError(xerror.Errorf(xerror.Meta, "index id %v not found in src meta", indexId))
Expand All @@ -400,6 +403,11 @@ func (j *IngestBinlogJob) preparePartition(srcTableId, destTableId int64, partit
destPartitionId: destPartitionId,
}
for _, indexId := range indexIds {
if j.srcMeta.IsIndexDropped(indexId) {
log.Infof("skip the dropped index %d", indexId)
continue
}

srcIndexMeta := srcIndexIdMap[indexId]
destIndexMeta := destIndexNameMap[getSrcIndexName(job, srcIndexMeta)]
prepareIndexArg.srcIndexMeta = srcIndexMeta
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccr/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -1177,3 +1177,7 @@ func (m *Meta) IsPartitionDropped(partitionId int64) bool {
func (m *Meta) IsTableDropped(partitionId int64) bool {
panic("IsTableDropped is not supported, please use ThriftMeta instead")
}

func (m *Meta) IsIndexDropped(indexId int64) bool {
panic("IsIndexDropped is not supported, please use ThriftMeta instead")
}
1 change: 1 addition & 0 deletions pkg/ccr/metaer.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type IngestBinlogMetaer interface {
GetBackendMap() (map[int64]*base.Backend, error)
IsPartitionDropped(partitionId int64) bool
IsTableDropped(tableId int64) bool
IsIndexDropped(indexId int64) bool
}

type Metaer interface {
Expand Down
12 changes: 12 additions & 0 deletions pkg/ccr/thrift_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,24 @@ func NewThriftMeta(spec *base.Spec, rpcFactory rpc.IRpcFactory, tableIds []int64
for _, table := range dbMeta.GetDroppedTables() {
droppedTables[table] = struct{}{}
}
droppedIndexes := make(map[int64]struct{})
for _, index := range dbMeta.GetDroppedIndexes() {
droppedIndexes[index] = struct{}{}
}

return &ThriftMeta{
meta: meta,
droppedPartitions: droppedPartitions,
droppedTables: droppedTables,
droppedIndexes: droppedIndexes,
}, nil
}

type ThriftMeta struct {
meta *Meta
droppedPartitions map[int64]struct{}
droppedTables map[int64]struct{}
droppedIndexes map[int64]struct{}
}

func (tm *ThriftMeta) GetTablets(tableId, partitionId, indexId int64) (*btree.Map[int64, *TabletMeta], error) {
Expand Down Expand Up @@ -246,3 +252,9 @@ func (tm *ThriftMeta) IsTableDropped(tableId int64) bool {
_, ok := tm.droppedTables[tableId]
return ok
}

// Whether the target index are dropped
func (tm *ThriftMeta) IsIndexDropped(tableId int64) bool {
_, ok := tm.droppedIndexes[tableId]
return ok
}
96 changes: 96 additions & 0 deletions pkg/rpc/kitex_gen/datasinks/DataSinks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 75 additions & 0 deletions pkg/rpc/kitex_gen/datasinks/k-DataSinks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 6b52ce7

Please sign in to comment.