Skip to content

Commit

Permalink
Skip decoding of excluded table rows (#831)
Browse files Browse the repository at this point in the history
* refactor: move decompression into DecodeData method

* feat: skip decoding of row events for tables we are not interested in
  • Loading branch information
mcuelenaere authored Oct 17, 2023
1 parent d5dd3d6 commit e598b6e
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 7 deletions.
13 changes: 13 additions & 0 deletions canal/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,19 @@ func (c *Canal) prepareSyncer() error {
Logger: c.cfg.Logger,
Dialer: c.cfg.Dialer,
Localhost: c.cfg.Localhost,
RowsEventDecodeFunc: func(event *replication.RowsEvent, data []byte) error {
pos, err := event.DecodeHeader(data)
if err != nil {
return err
}

key := fmt.Sprintf("%s.%s", string(event.Table.Schema), string(event.Table.Table))
if !c.checkTableMatch(key) {
return nil
}

return event.DecodeData(pos, data)
},
}

if strings.Contains(c.cfg.Addr, "/") {
Expand Down
15 changes: 8 additions & 7 deletions replication/row_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,14 @@ func (e *RowsEvent) decodeExtraData(data []byte) (err2 error) {
}

func (e *RowsEvent) DecodeData(pos int, data []byte) (err2 error) {
if e.compressed {
data, err2 = DecompressMariadbData(data[pos:])
if err2 != nil {
//nolint:nakedret
return
}
}

// Rows_log_event::print_verbose()

var (
Expand Down Expand Up @@ -1073,13 +1081,6 @@ func (e *RowsEvent) Decode(data []byte) error {
if err != nil {
return err
}
if e.compressed {
uncompressedData, err := DecompressMariadbData(data[pos:])
if err != nil {
return err
}
return e.DecodeData(0, uncompressedData)
}
return e.DecodeData(pos, data)
}

Expand Down

0 comments on commit e598b6e

Please sign in to comment.