Skip to content

Commit

Permalink
try to fix pulsar consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Jan 6, 2025
1 parent 1cedebf commit de5956e
Showing 1 changed file with 12 additions and 23 deletions.
35 changes: 12 additions & 23 deletions cmd/pulsar-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,6 @@ func (o *ConsumerOption) Adjust(upstreamURI *url.URL, configFile string) {
if err != nil {
log.Panic("invalid enable-tidb-extension of upstream-uri")
}
if enableTiDBExtension {
if o.protocol != config.ProtocolCanalJSON && o.protocol != config.ProtocolAvro {
log.Panic("enable-tidb-extension only work with canal-json / avro")
}
}
o.enableTiDBExtension = enableTiDBExtension
}

Expand Down Expand Up @@ -180,10 +175,7 @@ func run(cmd *cobra.Command, args []string) {
err := logutil.InitLogger(&logutil.Config{
Level: consumerOption.logLevel,
File: consumerOption.logPath,
},
logutil.WithInitGRPCLogger(),
logutil.WithInitSaramaLogger(),
)
})
if err != nil {
log.Error("init logger failed", zap.Error(err))
return
Expand Down Expand Up @@ -215,7 +207,7 @@ func run(cmd *cobra.Command, args []string) {
defer pulsarConsumer.Close()
msgChan := pulsarConsumer.Chan()

wg := &sync.WaitGroup{}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
Expand All @@ -228,7 +220,7 @@ func run(cmd *cobra.Command, args []string) {
log.Debug(fmt.Sprintf("Received message msgId: %#v -- content: '%s'\n",
consumerMsg.ID(),
string(consumerMsg.Payload())))
err := consumer.HandleMsg(consumerMsg.Message)
err = consumer.HandleMsg(consumerMsg.Message)
if err != nil {
log.Panic("Error consuming message", zap.Error(err))
}
Expand All @@ -243,7 +235,7 @@ func run(cmd *cobra.Command, args []string) {
wg.Add(1)
go func() {
defer wg.Done()
if err := consumer.Run(ctx); err != nil {
if err = consumer.Run(ctx); err != nil {
if err != context.Canceled {
log.Panic("Error running consumer", zap.Error(err))
}
Expand Down Expand Up @@ -327,6 +319,8 @@ func NewPulsarConsumer(option *ConsumerOption) (pulsar.Consumer, pulsar.Client)

// partitionSinks maintained for each partition, it may sync data for multiple tables.
type partitionSinks struct {
decoder codec.RowEventDecoder

tablesCommitTsMap sync.Map
tableSinksMap sync.Map
// resolvedTs record the maximum timestamp of the received event
Expand All @@ -353,8 +347,7 @@ type Consumer struct {

codecConfig *common.Config

option *ConsumerOption
decoder codec.RowEventDecoder
option *ConsumerOption
}

// NewConsumer creates a new cdc pulsar consumer
Expand All @@ -373,21 +366,17 @@ func NewConsumer(ctx context.Context, o *ConsumerOption) (*Consumer, error) {

c.codecConfig = common.NewConfig(o.protocol)
c.codecConfig.EnableTiDBExtension = o.enableTiDBExtension
if c.codecConfig.Protocol != config.ProtocolCanalJSON {
log.Panic("Protocol not supported", zap.Any("Protocol", c.codecConfig.Protocol))
}

decoder, err := canal.NewBatchDecoder(ctx, c.codecConfig, nil)
if err != nil {
return nil, errors.Trace(err)
}
c.decoder = decoder

c.sinks = make([]*partitionSinks, o.partitionNum)
ctx, cancel := context.WithCancel(ctx)
errChan := make(chan error, 1)
for i := 0; i < o.partitionNum; i++ {
c.sinks[i] = &partitionSinks{}
c.sinks[i] = &partitionSinks{
decoder: decoder,
}
}

changefeedID := model.DefaultChangeFeedID("pulsar-consumer")
Expand Down Expand Up @@ -451,7 +440,7 @@ func (c *Consumer) HandleMsg(msg pulsar.Message) error {
panic("sink should initialized")
}

decoder := c.decoder
decoder := sink.decoder
if err := decoder.AddKeyValue([]byte(msg.Key()), msg.Payload()); err != nil {
log.Error("add key value to the decoder failed", zap.Error(err))
return errors.Trace(err)
Expand Down Expand Up @@ -683,7 +672,7 @@ func (c *Consumer) Run(ctx context.Context) error {
}

// 4. flush all the DMLs that commitTs <= globalResolvedTs
if err := c.forEachSink(func(sink *partitionSinks) error {
if err = c.forEachSink(func(sink *partitionSinks) error {
return flushRowChangedEvents(ctx, sink, c.globalResolvedTs)
}); err != nil {
return errors.Trace(err)
Expand Down

0 comments on commit de5956e

Please sign in to comment.