Skip to content

Commit

Permalink
no need sync map
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Dec 26, 2024
1 parent 5234a8a commit adb4d3b
Showing 1 changed file with 7 additions and 9 deletions.
16 changes: 7 additions & 9 deletions cmd/kafka-consumer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ type partitionProgress struct {
partition int32
watermark uint64
watermarkOffset kafka.Offset
// tableSinkMap -> [tableID]tableSink
tableSinkMap sync.Map

tableSinkMap map[model.TableID]tablesink.TableSink

eventGroups map[int64]*eventsGroup
decoder codec.RowEventDecoder
Expand Down Expand Up @@ -432,16 +432,16 @@ func (w *writer) resolveRowChangedEvents(eventGroup map[int64]*eventsGroup, newW
if len(events) == 0 {
continue
}
tableSink, ok := progress.tableSinkMap.Load(tableID)
tableSink, ok := progress.tableSinkMap[tableID]
if !ok {
tableSink = w.sinkFactory.CreateTableSinkForConsumer(
model.DefaultChangeFeedID("kafka-consumer"),
spanz.TableIDToComparableSpan(tableID),
events[0].CommitTs,
)
progress.tableSinkMap.Store(tableID, tableSink)
progress.tableSinkMap[tableID] = tableSink
}
tableSink.(tablesink.TableSink).AppendRowChangedEvents(events...)
tableSink.AppendRowChangedEvents(events...)
}
}

Expand Down Expand Up @@ -553,16 +553,14 @@ func syncFlushRowChangedEvents(ctx context.Context, progress *partitionProgress,
default:
}
flushedResolvedTs := true
progress.tableSinkMap.Range(func(key, value interface{}) bool {
tableSink := value.(tablesink.TableSink)
for _, tableSink := range progress.tableSinkMap {
if err := tableSink.UpdateResolvedTs(resolvedTs); err != nil {
log.Panic("Failed to update resolved ts", zap.Error(err))
}
if tableSink.GetCheckpointTs().Less(resolvedTs) {
flushedResolvedTs = false
}
return true
})
}
if flushedResolvedTs {
return
}
Expand Down

0 comments on commit adb4d3b

Please sign in to comment.