Skip to content

Commit

Permalink
wip: hacky serialization
Browse files Browse the repository at this point in the history
Signed-off-by: Vicent Marti <[email protected]>
  • Loading branch information
vmg committed Nov 6, 2024
1 parent a975be1 commit cc6ea94
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 33 deletions.
2 changes: 1 addition & 1 deletion go/mysql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func (c *Conn) readPacketAsMemBuffer() (mem.Buffer, error) {
return mem.SliceBuffer(data), nil
}

const RawPacketsPos = 8
const RawPacketsPos = 20

func updateProtoHeader(b []byte, v int) {
b[0] = byte(protowire.EncodeTag(RawPacketsPos, protowire.BytesType))
Expand Down
3 changes: 0 additions & 3 deletions go/sqltypes/proto3.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,6 @@ func Proto3ToResult(qr *querypb.QueryResult) *Result {
if qr == nil {
return nil
}
if qr.RawPackets != nil {
panic("Proto3ToResult with raw mysql packets")
}
return &Result{
Fields: qr.Fields,
RowsAffected: qr.RowsAffected,
Expand Down
7 changes: 5 additions & 2 deletions go/vt/proto/query/query.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 4 additions & 7 deletions go/vt/proto/query/query_membuffer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 24 additions & 16 deletions go/vt/servenv/grpc_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ type vtprotoMessage interface {
}

type vtprotoMemBuffer interface {
MarshalToMemBufferVT() (mem.BufferSlice, error)
vtprotoMessage
MarshalToMemBufferVT() (mem.BufferSlice, bool)
}

type Codec struct {
Expand All @@ -46,25 +47,32 @@ func (Codec) Name() string { return Name }

var defaultBufferPool = mem.DefaultBufferPool()

func marshalPooled(m vtprotoMessage) (mem.BufferSlice, error) {
size := m.SizeVT()
if mem.IsBelowBufferPoolingThreshold(size) {
buf := make([]byte, size)
if _, err := m.MarshalToSizedBufferVT(buf[:size]); err != nil {
return nil, err
}
return mem.BufferSlice{mem.SliceBuffer(buf)}, nil
}
buf := defaultBufferPool.Get(size)
if _, err := m.MarshalToSizedBufferVT((*buf)[:size]); err != nil {
defaultBufferPool.Put(buf)
return nil, err
}
return mem.BufferSlice{mem.NewBuffer(buf, defaultBufferPool)}, nil
}

func (c *Codec) Marshal(v any) (mem.BufferSlice, error) {
switch m := v.(type) {
case vtprotoMemBuffer:
return m.MarshalToMemBufferVT()
case vtprotoMessage:
size := m.SizeVT()
if mem.IsBelowBufferPoolingThreshold(size) {
buf := make([]byte, size)
if _, err := m.MarshalToSizedBufferVT(buf[:size]); err != nil {
return nil, err
}
return mem.BufferSlice{mem.SliceBuffer(buf)}, nil
if s, ok := m.MarshalToMemBufferVT(); ok {
return s, nil
}
buf := defaultBufferPool.Get(size)
if _, err := m.MarshalToSizedBufferVT((*buf)[:size]); err != nil {
defaultBufferPool.Put(buf)
return nil, err
}
return mem.BufferSlice{mem.NewBuffer(buf, defaultBufferPool)}, nil
return marshalPooled(m)
case vtprotoMessage:
return marshalPooled(m)
default:
return c.fallback.Marshal(v)
}
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/grpcqueryservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ func (q *query) Execute(ctx context.Context, request *querypb.ExecuteRequest) (r
return nil, vterrors.ToGRPC(err)
}
return &querypb.ExecuteResponse{
Result: sqltypes.ResultToProto3(result),
Result: sqltypes.ResultToProto3(result),
RawPackets_: result.RawPackets,
}, nil
}

Expand Down
8 changes: 5 additions & 3 deletions proto/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -430,9 +430,6 @@ message QueryResult {
repeated Row rows = 4;
string info = 6;
string session_state_changes = 7;

// raw_packets contains raw MySQL packets if raw_mysql_packets was set in ExecuteOptions.
repeated bytes raw_packets = 8;
}

// QueryWarning is used to convey out of band query execution warnings
Expand Down Expand Up @@ -487,6 +484,9 @@ message ExecuteRequest {
// ExecuteResponse is the returned value from Execute
message ExecuteResponse {
QueryResult result = 1;

// raw_packets contains raw MySQL packets if raw_mysql_packets was set in ExecuteOptions.
repeated bytes raw_packets = 2;
}

// ResultWithError represents a query response
Expand Down Expand Up @@ -693,6 +693,8 @@ message BeginExecuteResponse {
// The session_state_changes might be set if the transaction is a snapshot transaction
// and the MySQL implementation supports getting a start gtid on snapshot
string session_state_changes = 5;

repeated bytes raw_packets = 20;
}

// BeginStreamExecuteRequest is the payload to BeginStreamExecute
Expand Down

0 comments on commit cc6ea94

Please sign in to comment.