diff --git a/go/mysql/conn.go b/go/mysql/conn.go index dd63f6ebd42..8a8bc684280 100644 --- a/go/mysql/conn.go +++ b/go/mysql/conn.go @@ -469,7 +469,7 @@ func (c *Conn) readPacketAsMemBuffer() (mem.Buffer, error) { return mem.SliceBuffer(data), nil } -const RawPacketsPos = 8 +const RawPacketsPos = 20 func updateProtoHeader(b []byte, v int) { b[0] = byte(protowire.EncodeTag(RawPacketsPos, protowire.BytesType)) diff --git a/go/sqltypes/proto3.go b/go/sqltypes/proto3.go index 8758b2305a2..6eaefcf4b0c 100644 --- a/go/sqltypes/proto3.go +++ b/go/sqltypes/proto3.go @@ -115,9 +115,6 @@ func Proto3ToResult(qr *querypb.QueryResult) *Result { if qr == nil { return nil } - if qr.RawPackets != nil { - panic("Proto3ToResult with raw mysql packets") - } return &Result{ Fields: qr.Fields, RowsAffected: qr.RowsAffected, diff --git a/go/vt/proto/query/query.pb.go b/go/vt/proto/query/query.pb.go index 00198053dfd..50107d5a9e0 100644 --- a/go/vt/proto/query/query.pb.go +++ b/go/vt/proto/query/query.pb.go @@ -25,6 +25,7 @@ package query import ( + "google.golang.org/grpc/mem" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" @@ -1730,8 +1731,6 @@ type QueryResult struct { Rows []*Row `protobuf:"bytes,4,rep,name=rows,proto3" json:"rows,omitempty"` Info string `protobuf:"bytes,6,opt,name=info,proto3" json:"info,omitempty"` SessionStateChanges string `protobuf:"bytes,7,opt,name=session_state_changes,json=sessionStateChanges,proto3" json:"session_state_changes,omitempty"` - // raw_packets contains raw MySQL packets if raw_mysql_packets was set in ExecuteOptions. - RawPackets [][]byte `protobuf:"bytes,8,rep,name=raw_packets,json=rawPackets,proto3" json:"raw_packets,omitempty"` } func (x *QueryResult) Reset() { @@ -2035,6 +2034,10 @@ type ExecuteResponse struct { unknownFields protoimpl.UnknownFields Result *QueryResult `protobuf:"bytes,1,opt,name=result,proto3" json:"result,omitempty"` + + // raw_packets contains raw MySQL packets if raw_mysql_packets was set in ExecuteOptions. + RawPackets [][]byte `protobuf:"bytes,2,rep,name=raw_packets,json=rawPackets,proto3" json:"raw_packets,omitempty"` + RawPackets_ []mem.Buffer } func (x *ExecuteResponse) Reset() { diff --git a/go/vt/proto/query/query_membuffer.go b/go/vt/proto/query/query_membuffer.go index d3e481596cf..9d878d2798a 100644 --- a/go/vt/proto/query/query_membuffer.go +++ b/go/vt/proto/query/query_membuffer.go @@ -18,12 +18,9 @@ package query import "google.golang.org/grpc/mem" -func (m *ExecuteResponse) MarshalToMemBufferVT() (mem.BufferSlice, error) { - // m.unknownFields = [][]byte{} - - var x mem.BufferSlice - for _, packet := range m.Result.RawPackets { - x = append(x, mem.SliceBuffer(packet)) +func (m *ExecuteResponse) MarshalToMemBufferVT() (mem.BufferSlice, bool) { + if m.Result != nil { + return nil, false } - return x, nil + return m.RawPackets_, true } diff --git a/go/vt/servenv/grpc_codec.go b/go/vt/servenv/grpc_codec.go index a06058b07c6..e41c5bd7ca9 100644 --- a/go/vt/servenv/grpc_codec.go +++ b/go/vt/servenv/grpc_codec.go @@ -35,7 +35,8 @@ type vtprotoMessage interface { } type vtprotoMemBuffer interface { - MarshalToMemBufferVT() (mem.BufferSlice, error) + vtprotoMessage + MarshalToMemBufferVT() (mem.BufferSlice, bool) } type Codec struct { @@ -46,25 +47,32 @@ func (Codec) Name() string { return Name } var defaultBufferPool = mem.DefaultBufferPool() +func marshalPooled(m vtprotoMessage) (mem.BufferSlice, error) { + size := m.SizeVT() + if mem.IsBelowBufferPoolingThreshold(size) { + buf := make([]byte, size) + if _, err := m.MarshalToSizedBufferVT(buf[:size]); err != nil { + return nil, err + } + return mem.BufferSlice{mem.SliceBuffer(buf)}, nil + } + buf := defaultBufferPool.Get(size) + if _, err := m.MarshalToSizedBufferVT((*buf)[:size]); err != nil { + defaultBufferPool.Put(buf) + return nil, err + } + return mem.BufferSlice{mem.NewBuffer(buf, defaultBufferPool)}, nil +} + func (c *Codec) Marshal(v any) (mem.BufferSlice, error) { switch m := v.(type) { case vtprotoMemBuffer: - return m.MarshalToMemBufferVT() - case vtprotoMessage: - size := m.SizeVT() - if mem.IsBelowBufferPoolingThreshold(size) { - buf := make([]byte, size) - if _, err := m.MarshalToSizedBufferVT(buf[:size]); err != nil { - return nil, err - } - return mem.BufferSlice{mem.SliceBuffer(buf)}, nil + if s, ok := m.MarshalToMemBufferVT(); ok { + return s, nil } - buf := defaultBufferPool.Get(size) - if _, err := m.MarshalToSizedBufferVT((*buf)[:size]); err != nil { - defaultBufferPool.Put(buf) - return nil, err - } - return mem.BufferSlice{mem.NewBuffer(buf, defaultBufferPool)}, nil + return marshalPooled(m) + case vtprotoMessage: + return marshalPooled(m) default: return c.fallback.Marshal(v) } diff --git a/go/vt/vttablet/grpcqueryservice/server.go b/go/vt/vttablet/grpcqueryservice/server.go index a1a1283b4ad..931cbfdb055 100644 --- a/go/vt/vttablet/grpcqueryservice/server.go +++ b/go/vt/vttablet/grpcqueryservice/server.go @@ -53,7 +53,8 @@ func (q *query) Execute(ctx context.Context, request *querypb.ExecuteRequest) (r return nil, vterrors.ToGRPC(err) } return &querypb.ExecuteResponse{ - Result: sqltypes.ResultToProto3(result), + Result: sqltypes.ResultToProto3(result), + RawPackets_: result.RawPackets, }, nil } diff --git a/proto/query.proto b/proto/query.proto index e3bf3eaeb3e..d4d390075f5 100644 --- a/proto/query.proto +++ b/proto/query.proto @@ -430,9 +430,6 @@ message QueryResult { repeated Row rows = 4; string info = 6; string session_state_changes = 7; - - // raw_packets contains raw MySQL packets if raw_mysql_packets was set in ExecuteOptions. - repeated bytes raw_packets = 8; } // QueryWarning is used to convey out of band query execution warnings @@ -487,6 +484,9 @@ message ExecuteRequest { // ExecuteResponse is the returned value from Execute message ExecuteResponse { QueryResult result = 1; + + // raw_packets contains raw MySQL packets if raw_mysql_packets was set in ExecuteOptions. + repeated bytes raw_packets = 2; } // ResultWithError represents a query response @@ -693,6 +693,8 @@ message BeginExecuteResponse { // The session_state_changes might be set if the transaction is a snapshot transaction // and the MySQL implementation supports getting a start gtid on snapshot string session_state_changes = 5; + + repeated bytes raw_packets = 20; } // BeginStreamExecuteRequest is the payload to BeginStreamExecute