Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VStreamer: For larger compressed transaction payloads, stream the internal contents #17239

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions go/mysql/binlog_event_compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ type TransactionPayload struct {
payload []byte
reader io.Reader
iterator func() (BinlogEvent, error)
// StreamingContents tells the consumer that we are streaming the
// decompressed payload and they should also stream the events.
// This ensures that neither the producer nor the consumer are
// holding the entire payload's contents in memory.
StreamingContents bool
}

// IsTransactionPayload returns true if a compressed transaction
Expand Down Expand Up @@ -292,6 +297,8 @@ func (tp *TransactionPayload) decompress() error {
}
compressedTrxPayloadsUsingStream.Add(1)
tp.reader = streamDecoder
// Signal the consumer to also stream the contents.
tp.StreamingContents = true
return nil
}

Expand Down
32 changes: 28 additions & 4 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
}
return fmt.Errorf("unexpected server EOF")
}
vevents, err := vs.parseEvent(ev)
vevents, err := vs.parseEvent(ev, bufferAndTransmit)
if err != nil {
vs.vse.errorCounts.Add("ParseEvent", 1)
return err
Expand Down Expand Up @@ -416,7 +416,11 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
}

// parseEvent parses an event from the binlog and converts it to a list of VEvents.
func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, error) {
// The bufferAndTransmit function must be passed if the event is a TransactionPayloadEvent
// as for larger payloads (> ZstdInMemoryDecompressorMaxSize) the internal events need
// to be streamed directly here in order to avoid holding the entire payload's contents,
// which can be 10s or even 100s of GiBs, all in memory.
func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vevent *binlogdatapb.VEvent) error) ([]*binlogdatapb.VEvent, error) {
if !ev.IsValid() {
return nil, fmt.Errorf("can't parse binlog event: invalid data: %#v", ev)
}
Expand Down Expand Up @@ -672,11 +676,31 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
}
return nil, err
}
tpvevents, err := vs.parseEvent(tpevent)
tpvevents, err := vs.parseEvent(tpevent, nil) // Parse the internal event
if err != nil {
return nil, vterrors.Wrap(err, "failed to parse transaction payload's internal event")
}
vevents = append(vevents, tpvevents...)
if tp.StreamingContents {
// Transmit each internal event individually to avoid buffering
// the large transaction's entire payload of events in memory, as
// the uncompressed size can be 10s or even 100s of GiBs in size.
if bufferAndTransmit == nil {
return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "[bug] cannot stream compressed transaction payload's internal events as no bufferAndTransmit function was provided")
}
for _, tpvevent := range tpvevents {
tpvevent.Timestamp = int64(ev.Timestamp())
tpvevent.CurrentTime = time.Now().UnixNano()
if err := bufferAndTransmit(tpvevent); err != nil {
if err == io.EOF {
return nil, nil
}
vs.vse.errorCounts.Add("TransactionPayloadBufferAndTransmit", 1)
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "error sending compressed transaction payload's internal event: %v", err)
}
}
} else { // Process the payload's internal events all at once
vevents = append(vevents, tpvevents...)
}
}
vs.vse.vstreamerCompressedTransactionsDecoded.Add(1)
}
Expand Down
Loading