Skip to content

Commit

Permalink
Fixes (#12)
Browse files Browse the repository at this point in the history
* Fix messaged

* Fix storage put/cat
  • Loading branch information
reshke authored Dec 15, 2023
1 parent dcd3c2d commit d0304c9
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 30 deletions.
16 changes: 9 additions & 7 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,28 +91,30 @@ var putCmd = &cobra.Command{
if n > 0 {
msg := proc.NewCopyDataMessage()
msg.Sz = uint64(n)
msg.Data = make([]byte, msg.Sz)
copy(msg.Data, chunk[:n])

_, err = con.Write(msg.Encode())
nwr, err := con.Write(msg.Encode())
if err != nil {
return err
}

ylogger.Zero.Debug().Int("len", nwr).Msg("written copy data msg")
}

if err == nil {
continue
}
if err == io.EOF {
break
} else {
return err
}
}

msg = proc.NewCommandCompleteMessage().Encode()
_, err = con.Write(msg)
if err != nil {
return err
}
ylogger.Zero.Debug().Msg("send command complete msg")

msg = proc.NewReadyForQueryMessage().Encode()
msg = proc.NewCommandCompleteMessage().Encode()
_, err = con.Write(msg)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion examples/yproxy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ socket_path: "/tmp/yezzey.sock"

storage:
storage_endpoint: "https://storage.yandexcloud.net"
storage_prefix: "s3://gpyezzey/wal-e/6/"
storage_prefix: "wal-e/6/"
storage_bucket: "gpyezzey"
storage_region: "us-west-2"
7 changes: 3 additions & 4 deletions pkg/proc/cat_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (
)

type CatMessage struct {
ProtoMessage
Decrypt bool
Name string
}

var _ ProtoMessage = &CatMessage{}

func NewCatMessage(name string, decrypt bool) *CatMessage {
return &CatMessage{
Name: name,
Expand Down Expand Up @@ -41,13 +42,11 @@ func (c *CatMessage) Encode() []byte {
return append(bs, bt...)
}

func (c *CatMessage) Decode(body []byte) error {
func (c *CatMessage) Decode(body []byte) {
c.Name = c.GetCatName(body[4:])
if body[1] == byte(DecryptMessage) {
c.Decrypt = true
}

return nil
}

func (c *CatMessage) GetCatName(b []byte) string {
Expand Down
6 changes: 3 additions & 3 deletions pkg/proc/command_complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package proc
import "encoding/binary"

type CommandCompleteMessage struct {
ProtoMessage
}

var _ ProtoMessage = &CommandCompleteMessage{}

func NewCommandCompleteMessage() *CommandCompleteMessage {
return &CommandCompleteMessage{}
}
Expand All @@ -25,6 +26,5 @@ func (cc *CommandCompleteMessage) Encode() []byte {
return append(bs, bt...)
}

func (c *CommandCompleteMessage) Decode(body []byte) error {
return nil
func (c *CommandCompleteMessage) Decode(body []byte) {
}
6 changes: 3 additions & 3 deletions pkg/proc/copy_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@ func NewCopyDataMessage() *CopyDataMessage {
}

func (cc *CopyDataMessage) Encode() []byte {
bt := make([]byte, 4+cc.Sz)
bt := make([]byte, 4+8+cc.Sz)

bt[0] = byte(MessageTypeCopyData)

// sizeof(sz) + data
ln := len(bt) + 8 + 8 + int(cc.Sz)
ln := len(bt) + 8

bs := make([]byte, 8)
binary.BigEndian.PutUint64(bs, uint64(ln))

binary.BigEndian.PutUint64(bt[4:], uint64(cc.Sz))

// check data len more than cc.sz?
copy(bt[4+8:], cc.Data[:cc.Sz])
copy(bt[(4+8):], cc.Data[:cc.Sz])

return append(bs, bt...)
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/proc/interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl *client.YClient
return
}

ylogger.Zero.Debug().Str("msg-type", tp.String()).Msg("recieved client request")

switch tp {
case MessageTypeCopyData:
msg := CopyDataMessage{}
Expand All @@ -86,9 +88,9 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl *client.YClient
case MessageTypeCommandComplete:
msg := CommandCompleteMessage{}
msg.Decode(body)
case MessageTypeReadyForQuery:
msg := ReadyForQueryMessage{}
msg.Decode(body)
w.Close()

ylogger.Zero.Debug().Msg("closing msg writer")
return
}
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/proc/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ func (m MessageType) String() string {
switch m {
case MessageTypeCat:
return "CAT"
case MessageTypePut:
return "PUT"
case MessageTypeCommandComplete:
return "COMMAND COMPLETE"
case MessageTypeReadyForQuery:
return "READY FOR QUERY"
case MessageTypeCopyData:
return "COPY DATA"
}
return "UNKNOWN"
}
42 changes: 38 additions & 4 deletions pkg/proc/message_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package proc_test

import (
"encoding/binary"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -29,9 +30,8 @@ func TestCatMsg(t *testing.T) {

msg2 := proc.CatMessage{}

err := msg2.Decode(body[8:])
msg2.Decode(body[8:])

assert.NoError(err)
assert.Equal(msg.Name, msg2.Name)
assert.Equal(msg.Decrypt, msg2.Decrypt)
}
Expand Down Expand Up @@ -59,10 +59,44 @@ func TestPutMsg(t *testing.T) {

msg2 := proc.CatMessage{}

err := msg2.Decode(body[8:])
msg2.Decode(body[8:])

assert.NoError(err)
assert.Equal(msg.Name, msg2.Name)
assert.Equal(msg.Encrypt, msg2.Decrypt)
}
}

func TestCopyDataMsg(t *testing.T) {
assert := assert.New(t)

type tcase struct {
body []byte
err error
}

for _, tt := range []tcase{
{
[]byte(
"hiuefheiufheuif",
),
nil,
},
} {

msg := proc.NewCopyDataMessage()
msg.Data = tt.body
msg.Sz = uint64(len(tt.body))
body := msg.Encode()

msg2 := proc.CopyDataMessage{}

msg2.Decode(body[8:])

sz := binary.BigEndian.Uint64(body[:8])

assert.Equal(int(sz), len(body))

assert.Equal(msg.Data, msg2.Data)
assert.Equal(msg.Sz, msg2.Sz)
}
}
8 changes: 4 additions & 4 deletions pkg/proc/put_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (
)

type PutMessage struct {
ProtoMessage
Encrypt bool
Name string
}

var _ ProtoMessage = &PutMessage{}

func NewPutMessage(name string, encrypt bool) *PutMessage {
return &PutMessage{
Name: name,
Expand All @@ -20,7 +21,7 @@ func NewPutMessage(name string, encrypt bool) *PutMessage {

func (c *PutMessage) Encode() []byte {
bt := []byte{
byte(MessageTypeCat),
byte(MessageTypePut),
0,
0,
0,
Expand Down Expand Up @@ -54,10 +55,9 @@ func (c *PutMessage) GetPutName(b []byte) string {
return buff.String()
}

func (c *PutMessage) Decode(body []byte) error {
func (c *PutMessage) Decode(body []byte) {
if body[1] == byte(EncryptMessage) {
c.Encrypt = true
}
c.Name = c.GetPutName(body[4:])
return nil
}
2 changes: 1 addition & 1 deletion pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader) error {
objectPath := path.Join(s.cnf.StoragePrefix, name)

up := s3manager.NewUploaderWithClient(sess, func(uploader *s3manager.Uploader) {
uploader.PartSize = int64(1 << 20)
uploader.PartSize = int64(1 << 24)
uploader.Concurrency = 1
})

Expand Down

0 comments on commit d0304c9

Please sign in to comment.