diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index 8c6eafe9c1c..01a391d0cad 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -21,6 +21,7 @@ Flags: --backup_storage_compress if set, the backup files will be compressed. (default true) --backup_storage_number_blocks int if backup_storage_compress is true, backup_storage_number_blocks sets the number of blocks that can be processed, in parallel, before the writer blocks, during compression (default is 2). It should be equal to the number of CPUs available for compression. (default 2) --bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system. + --binlog-in-memory-decompressor-max-size uint This value sets the uncompressed transaction payload size at which we switch from in-memory buffer based decompression to the slower streaming mode. (default 134217728) --binlog_host string PITR restore parameter: hostname/IP of binlog server. --binlog_password string PITR restore parameter: password of binlog server. --binlog_player_protocol string the protocol to download binlogs from a vttablet (default "grpc") diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index 35a07b265bc..8be7b620469 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -56,6 +56,7 @@ Flags: --backup_storage_implementation string Which backup storage implementation to use for creating and restoring backups. --backup_storage_number_blocks int if backup_storage_compress is true, backup_storage_number_blocks sets the number of blocks that can be processed, in parallel, before the writer blocks, during compression (default is 2). It should be equal to the number of CPUs available for compression. (default 2) --bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system. + --binlog-in-memory-decompressor-max-size uint This value sets the uncompressed transaction payload size at which we switch from in-memory buffer based decompression to the slower streaming mode. (default 134217728) --binlog_host string PITR restore parameter: hostname/IP of binlog server. --binlog_password string PITR restore parameter: password of binlog server. --binlog_player_grpc_ca string the server ca to use to validate servers when connecting diff --git a/go/mysql/binlog_event_compression.go b/go/mysql/binlog_event_compression.go index 1cb38d5cb16..7455218d4b5 100644 --- a/go/mysql/binlog_event_compression.go +++ b/go/mysql/binlog_event_compression.go @@ -58,11 +58,6 @@ const ( // Length of the binlog event header for internal events within // the transaction payload. headerLen = binlogEventLenOffset + eventLenBytes - - // At what size should we switch from the in-memory buffer - // decoding to streaming mode which is much slower, but does - // not require everything be done in memory. - zstdInMemoryDecompressorMaxSize = 128 << (10 * 2) // 128MiB ) var ( @@ -75,13 +70,18 @@ var ( compressedTrxPayloadsInMem = stats.NewCounter("CompressedTransactionPayloadsInMemory", "The number of compressed binlog transaction payloads that were processed in memory") compressedTrxPayloadsUsingStream = stats.NewCounter("CompressedTransactionPayloadsViaStream", "The number of compressed binlog transaction payloads that were processed using a stream") + // At what size should we switch from the in-memory buffer + // decoding to streaming mode which is much slower, but does + // not require everything be done in memory all at once. + ZstdInMemoryDecompressorMaxSize = uint64(128 << (10 * 2)) // 128MiB + // A concurrent stateless decoder that caches decompressors. This is // used for smaller payloads that we want to handle entirely using // in-memory buffers via DecodeAll. statelessDecoder *zstd.Decoder // A pool of stateful decoders for larger payloads that we want to - // stream. The number of large (> zstdInMemoryDecompressorMaxSize) + // stream. The number of large (> ZstdInMemoryDecompressorMaxSize) // payloads should typically be relatively low, but there may be times // where there are many of them -- and users like vstreamer may have // N concurrent streams per tablet which could lead to a lot of @@ -271,7 +271,7 @@ func (tp *TransactionPayload) decode() error { } // decompress decompresses the payload. If the payload is larger than -// zstdInMemoryDecompressorMaxSize then we stream the decompression via +// ZstdInMemoryDecompressorMaxSize then we stream the decompression via // the package's pool of zstd.Decoders, otherwise we use in-memory // buffers with the package's concurrent statelessDecoder. // In either case, we setup the reader that can be used within the @@ -284,7 +284,7 @@ func (tp *TransactionPayload) decompress() error { // Switch to slower but less memory intensive stream mode for // larger payloads. - if tp.uncompressedSize > zstdInMemoryDecompressorMaxSize { + if tp.uncompressedSize > ZstdInMemoryDecompressorMaxSize { in := bytes.NewReader(tp.payload) streamDecoder, err := statefulDecoderPool.Get(in) if err != nil { @@ -366,7 +366,10 @@ func (dp *decoderPool) Get(reader io.Reader) (*zstd.Decoder, error) { return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] expected *zstd.Decoder but got %T", pooled) } } else { - d, err := zstd.NewReader(nil, zstd.WithDecoderMaxMemory(zstdInMemoryDecompressorMaxSize)) + // Use the minimum amount of memory we can in processing the transaction by + // setting lowMem to true and limiting the decoder concurrency to 1 so that + // there's no async decoding of multiple windows or blocks. + d, err := zstd.NewReader(nil, zstd.WithDecoderLowmem(true), zstd.WithDecoderConcurrency(1)) if err != nil { // Should only happen e.g. due to ENOMEM return nil, vterrors.Wrap(err, "failed to create stateful stream decoder") } diff --git a/go/mysql/binlog_event_mysql56_test.go b/go/mysql/binlog_event_mysql56_test.go index f173e27e4af..861d98c6e4f 100644 --- a/go/mysql/binlog_event_mysql56_test.go +++ b/go/mysql/binlog_event_mysql56_test.go @@ -141,6 +141,12 @@ func TestMysql56DecodeTransactionPayload(t *testing.T) { }, } + // Ensure that we can process events where the *uncompressed* size is + // larger than ZstdInMemoryDecompressorMaxSize. The *compressed* size + // of the payload in large_compressed_trx_payload.bin is 16KiB so we + // set the max to 2KiB to test this. + ZstdInMemoryDecompressorMaxSize = 2048 + for _, tc := range testCases { memDecodingCnt := compressedTrxPayloadsInMem.Get() streamDecodingCnt := compressedTrxPayloadsUsingStream.Get() @@ -191,7 +197,7 @@ func TestMysql56DecodeTransactionPayload(t *testing.T) { totalSize += len(eventStr) require.True(t, strings.HasPrefix(eventStr, want)) } - require.Greater(t, totalSize, zstdInMemoryDecompressorMaxSize) + require.Greater(t, uint64(totalSize), ZstdInMemoryDecompressorMaxSize) } } } diff --git a/go/vt/vttablet/common/flags.go b/go/vt/vttablet/common/flags.go index f9775b8af3e..3c6141d62eb 100644 --- a/go/vt/vttablet/common/flags.go +++ b/go/vt/vttablet/common/flags.go @@ -21,6 +21,7 @@ import ( "github.com/spf13/pflag" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/servenv" ) @@ -94,4 +95,6 @@ func registerFlags(fs *pflag.FlagSet) { fs.BoolVar(&vreplicationStoreCompressedGTID, "vreplication_store_compressed_gtid", vreplicationStoreCompressedGTID, "Store compressed gtids in the pos column of the sidecar database's vreplication table") fs.IntVar(&vreplicationParallelInsertWorkers, "vreplication-parallel-insert-workers", vreplicationParallelInsertWorkers, "Number of parallel insertion workers to use during copy phase. Set <= 1 to disable parallelism, or > 1 to enable concurrent insertion during copy phase.") + + fs.Uint64Var(&mysql.ZstdInMemoryDecompressorMaxSize, "binlog-in-memory-decompressor-max-size", mysql.ZstdInMemoryDecompressorMaxSize, "This value sets the uncompressed transaction payload size at which we switch from in-memory buffer based decompression to the slower streaming mode.") }