Binlog: Improve ZstdInMemoryDecompressorMaxSize management #17220

Merged
1 change: 1 addition & 0 deletions go/flags/endtoend/vtcombo.txt
@@ -21,6 +21,7 @@ Flags:
--backup_storage_compress if set, the backup files will be compressed. (default true)
--backup_storage_number_blocks int if backup_storage_compress is true, backup_storage_number_blocks sets the number of blocks that can be processed, in parallel, before the writer blocks, during compression (default is 2). It should be equal to the number of CPUs available for compression. (default 2)
--bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system.
+ --binlog-in-memory-decompressor-max-size uint This value sets the uncompressed transaction payload size at which we switch from in-memory buffer based decompression to the slower streaming mode. (default 134217728)
--binlog_host string PITR restore parameter: hostname/IP of binlog server.
--binlog_password string PITR restore parameter: password of binlog server.
--binlog_player_protocol string the protocol to download binlogs from a vttablet (default "grpc")
1 change: 1 addition & 0 deletions go/flags/endtoend/vttablet.txt
@@ -56,6 +56,7 @@ Flags:
--backup_storage_implementation string Which backup storage implementation to use for creating and restoring backups.
--backup_storage_number_blocks int if backup_storage_compress is true, backup_storage_number_blocks sets the number of blocks that can be processed, in parallel, before the writer blocks, during compression (default is 2). It should be equal to the number of CPUs available for compression. (default 2)
--bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system.
+ --binlog-in-memory-decompressor-max-size uint This value sets the uncompressed transaction payload size at which we switch from in-memory buffer based decompression to the slower streaming mode. (default 134217728)
--binlog_host string PITR restore parameter: hostname/IP of binlog server.
--binlog_password string PITR restore parameter: password of binlog server.
--binlog_player_grpc_ca string the server ca to use to validate servers when connecting
21 changes: 12 additions & 9 deletions go/mysql/binlog_event_compression.go
@@ -58,11 +58,6 @@ const (
// Length of the binlog event header for internal events within
// the transaction payload.
headerLen = binlogEventLenOffset + eventLenBytes

- // At what size should we switch from the in-memory buffer
- // decoding to streaming mode which is much slower, but does
- // not require everything be done in memory.
- zstdInMemoryDecompressorMaxSize = 128 << (10 * 2) // 128MiB
)

var (
@@ -75,13 +70,18 @@ var (
compressedTrxPayloadsInMem = stats.NewCounter("CompressedTransactionPayloadsInMemory", "The number of compressed binlog transaction payloads that were processed in memory")
compressedTrxPayloadsUsingStream = stats.NewCounter("CompressedTransactionPayloadsViaStream", "The number of compressed binlog transaction payloads that were processed using a stream")

+ // At what size should we switch from the in-memory buffer
+ // decoding to streaming mode which is much slower, but does
+ // not require everything be done in memory all at once.
+ ZstdInMemoryDecompressorMaxSize = uint64(128 << (10 * 2)) // 128MiB

// A concurrent stateless decoder that caches decompressors. This is
// used for smaller payloads that we want to handle entirely using
// in-memory buffers via DecodeAll.
statelessDecoder *zstd.Decoder

// A pool of stateful decoders for larger payloads that we want to
- // stream. The number of large (> zstdInMemoryDecompressorMaxSize)
+ // stream. The number of large (> ZstdInMemoryDecompressorMaxSize)
// payloads should typically be relatively low, but there may be times
// where there are many of them -- and users like vstreamer may have
// N concurrent streams per tablet which could lead to a lot of
@@ -271,7 +271,7 @@ func (tp *TransactionPayload) decode() error {
}

// decompress decompresses the payload. If the payload is larger than
- // zstdInMemoryDecompressorMaxSize then we stream the decompression via
+ // ZstdInMemoryDecompressorMaxSize then we stream the decompression via
// the package's pool of zstd.Decoders, otherwise we use in-memory
// buffers with the package's concurrent statelessDecoder.
// In either case, we setup the reader that can be used within the
@@ -284,7 +284,7 @@ func (tp *TransactionPayload) decompress() error {

// Switch to slower but less memory intensive stream mode for
// larger payloads.
- if tp.uncompressedSize > zstdInMemoryDecompressorMaxSize {
+ if tp.uncompressedSize > ZstdInMemoryDecompressorMaxSize {
in := bytes.NewReader(tp.payload)
streamDecoder, err := statefulDecoderPool.Get(in)
if err != nil {
@@ -366,7 +366,10 @@ func (dp *decoderPool) Get(reader io.Reader) (*zstd.Decoder, error) {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] expected *zstd.Decoder but got %T", pooled)
}
} else {
- d, err := zstd.NewReader(nil, zstd.WithDecoderMaxMemory(zstdInMemoryDecompressorMaxSize))
+ // Use the minimum amount of memory we can in processing the transaction by
+ // setting lowMem to true and limiting the decoder concurrency to 1 so that
+ // there's no async decoding of multiple windows or blocks.
+ d, err := zstd.NewReader(nil, zstd.WithDecoderLowmem(true), zstd.WithDecoderConcurrency(1))
if err != nil { // Should only happen e.g. due to ENOMEM
return nil, vterrors.Wrap(err, "failed to create stateful stream decoder")
}
8 changes: 7 additions & 1 deletion go/mysql/binlog_event_mysql56_test.go
@@ -141,6 +141,12 @@ func TestMysql56DecodeTransactionPayload(t *testing.T) {
},
}

+ // Ensure that we can process events where the *uncompressed* size is
+ // larger than ZstdInMemoryDecompressorMaxSize. The *compressed* size
+ // of the payload in large_compressed_trx_payload.bin is 16KiB so we
+ // set the max to 2KiB to test this.
+ ZstdInMemoryDecompressorMaxSize = 2048

for _, tc := range testCases {
memDecodingCnt := compressedTrxPayloadsInMem.Get()
streamDecodingCnt := compressedTrxPayloadsUsingStream.Get()
@@ -191,7 +197,7 @@ func TestMysql56DecodeTransactionPayload(t *testing.T) {
totalSize += len(eventStr)
require.True(t, strings.HasPrefix(eventStr, want))
}
- require.Greater(t, totalSize, zstdInMemoryDecompressorMaxSize)
+ require.Greater(t, uint64(totalSize), ZstdInMemoryDecompressorMaxSize)
}
}
}
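
The test above lowers the package-level limit and leaves it lowered for the rest of the test binary. If that ever became a problem, one possible pattern is to save and restore the value. This is only a sketch under assumptions: `maxSize` stands in for `ZstdInMemoryDecompressorMaxSize`, and `overrideMaxSize` is a hypothetical helper that does not exist in the PR.

```go
package main

import "fmt"

// maxSize stands in for the exported package variable from the diff.
var maxSize = uint64(128 << (10 * 2))

// overrideMaxSize is a hypothetical helper: it sets the limit to v and
// returns a function that puts the previous value back.
func overrideMaxSize(v uint64) (restore func()) {
	prev := maxSize
	maxSize = v
	return func() { maxSize = prev }
}

func main() {
	restore := overrideMaxSize(2048)
	fmt.Println(maxSize) // limit lowered for the duration of the test
	restore()
	fmt.Println(maxSize) // original limit back in place
}
```

In a real test the returned function could be handed to `t.Cleanup`, for example `t.Cleanup(overrideMaxSize(2048))`.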
3 changes: 3 additions & 0 deletions go/vt/vttablet/common/flags.go
@@ -21,6 +21,7 @@ import (

"github.com/spf13/pflag"

+ "vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/servenv"
)

@@ -94,4 +95,6 @@ func registerFlags(fs *pflag.FlagSet) {
fs.BoolVar(&vreplicationStoreCompressedGTID, "vreplication_store_compressed_gtid", vreplicationStoreCompressedGTID, "Store compressed gtids in the pos column of the sidecar database's vreplication table")

fs.IntVar(&vreplicationParallelInsertWorkers, "vreplication-parallel-insert-workers", vreplicationParallelInsertWorkers, "Number of parallel insertion workers to use during copy phase. Set <= 1 to disable parallelism, or > 1 to enable concurrent insertion during copy phase.")

+ fs.Uint64Var(&mysql.ZstdInMemoryDecompressorMaxSize, "binlog-in-memory-decompressor-max-size", mysql.ZstdInMemoryDecompressorMaxSize, "This value sets the uncompressed transaction payload size at which we switch from in-memory buffer based decompression to the slower streaming mode.")
Contributor

Let's add validation to the entered value - there have to be some bounds I guess? And first read it into a local variable before assigning it into mysql.ZstdInMemoryDecompressorMaxSize.

Contributor Author

The bounds are set by the type. If it’s 0 then streaming mode will always be used. If it’s the max then the in-memory buffers will always be used.

Why would we read it into a local variable first?

Contributor

> Why would we read it into a local variable first?

We'd need to if there were a need for boundary checking. But per your comment, boundary checking is not required, so a local variable is not needed.

}
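
The review thread concluded that no bounds check is needed, so the PR binds the flag directly to the package variable. For comparison, here is a sketch of the local-variable approach the reviewer described, which would only matter if an upper bound were ever wanted. Everything in it is an assumption for illustration: it uses the standard library `flag` package as a stand-in for `pflag`, `inMemoryMaxSize` stands in for `mysql.ZstdInMemoryDecompressorMaxSize`, and `parseMaxSize` and its `upperBound` parameter are hypothetical.

```go
package main

import (
	"flag"
	"fmt"
)

// inMemoryMaxSize stands in for mysql.ZstdInMemoryDecompressorMaxSize.
var inMemoryMaxSize = uint64(128 << (10 * 2))

// parseMaxSize is a hypothetical helper: it parses the flag into a local
// variable, validates it against upperBound, and only then assigns it to
// the package-level variable.
func parseMaxSize(args []string, upperBound uint64) error {
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	local := inMemoryMaxSize
	fs.Uint64Var(&local, "binlog-in-memory-decompressor-max-size", local,
		"uncompressed payload size at which decompression switches to streaming mode")
	if err := fs.Parse(args); err != nil {
		return err
	}
	if local > upperBound {
		return fmt.Errorf("binlog-in-memory-decompressor-max-size %d exceeds limit %d", local, upperBound)
	}
	inMemoryMaxSize = local
	return nil
}

func main() {
	err := parseMaxSize([]string{"--binlog-in-memory-decompressor-max-size=2048"}, 1<<30)
	fmt.Println(inMemoryMaxSize, err)
}
```

Without a bound to enforce, the local variable adds nothing, which matches where the thread ended up.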