Skip to content

Commit

Permalink
Add offset to cat message
Browse files Browse the repository at this point in the history
  • Loading branch information
reshke committed Jun 7, 2024
1 parent 0130970 commit 536eaa2
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 21 deletions.
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ build:
####################### TESTS #######################

unittest:
go test -race ./pkg/message/...

go test -race ./pkg/message/... ./pkg/proc/...

mockgen:
mockgen -source=pkg/proc/yrreader.go -destination=pkg/mock/proc/yrreader.go -package=mock
Expand Down
4 changes: 3 additions & 1 deletion cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var oldCfgPath string
var logLevel string
var decrypt bool
var encrypt bool
var offset uint64

var rootCmd = &cobra.Command{
Use: "",
Expand All @@ -46,7 +47,7 @@ var catCmd = &cobra.Command{
}

defer con.Close()
msg := message.NewCatMessage(args[0], decrypt).Encode()
msg := message.NewCatMessage(args[0], decrypt, offset).Encode()
_, err = con.Write(msg)
if err != nil {
return err
Expand Down Expand Up @@ -258,6 +259,7 @@ func init() {
rootCmd.PersistentFlags().StringVarP(&logLevel, "log-level", "l", "", "log level")

catCmd.PersistentFlags().BoolVarP(&decrypt, "decrypt", "d", false, "decrypt external object or not")
catCmd.PersistentFlags().Uint64VarP(&offset, "offset", "o", 0, "start offset for read")
rootCmd.AddCommand(catCmd)

copyCmd.PersistentFlags().BoolVarP(&decrypt, "decrypt", "d", false, "decrypt external object or not")
Expand Down
22 changes: 17 additions & 5 deletions pkg/message/cat_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@ import (
)

type CatMessage struct {
Decrypt bool
Name string
Decrypt bool
Name string
StartOffset uint64
}

var _ ProtoMessage = &CatMessage{}

func NewCatMessage(name string, decrypt bool) *CatMessage {
func NewCatMessage(name string, decrypt bool, StartOffset uint64) *CatMessage {
return &CatMessage{
Name: name,
Decrypt: decrypt,
Name: name,
Decrypt: decrypt,
StartOffset: StartOffset,
}
}

Expand All @@ -33,8 +35,15 @@ func (c *CatMessage) Encode() []byte {
bt[1] = byte(NoDecryptMessage)
}

if c.StartOffset != 0 {
bt[2] = byte(ExtendedMesssage)
}

bt = append(bt, []byte(c.Name)...)
bt = append(bt, 0)
if c.StartOffset != 0 {
bt = binary.BigEndian.AppendUint64(bt, c.StartOffset)
}
ln := len(bt) + 8

bs := make([]byte, 8)
Expand All @@ -47,6 +56,9 @@ func (c *CatMessage) Decode(body []byte) {
if body[1] == byte(DecryptMessage) {
c.Decrypt = true
}
if body[2] == byte(ExtendedMesssage) {
c.StartOffset = binary.BigEndian.Uint64(body[4+len(c.Name)+1:])
}
}

func (c *CatMessage) GetCatName(b []byte) string {
Expand Down
2 changes: 2 additions & 0 deletions pkg/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (

EncryptMessage = RequestEncryption(1)
NoEncryptMessage = RequestEncryption(0)

ExtendedMesssage = byte(1)
)

func (m MessageType) String() string {
Expand Down
10 changes: 9 additions & 1 deletion pkg/message/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,26 @@ func TestCatMsg(t *testing.T) {
type tcase struct {
name string
decrypt bool
off uint64
err error
}

for _, tt := range []tcase{
{
"nam1",
true,
0,
nil,
},
{
"nam1",
true,
10,
nil,
},
} {

msg := message.NewCatMessage(tt.name, tt.decrypt)
msg := message.NewCatMessage(tt.name, tt.decrypt, tt.off)
body := msg.Encode()

msg2 := message.CatMessage{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/proc/interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl
msg := message.CatMessage{}
msg.Decode(body)

yr := NewYRetryReader(NewRestartReader(s, msg.Name))
yr := NewYRetryReader(NewRestartReader(s, msg.Name), msg.StartOffset)

var contentReader io.Reader
contentReader = yr
Expand Down
16 changes: 8 additions & 8 deletions pkg/proc/yrreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type YproxyRetryReader struct {
io.ReadCloser
underlying RestartReader

bytesWrite int64
offsetReached int64
retryLimit int
needReacquire bool
}
Expand All @@ -79,12 +79,12 @@ func (y *YproxyRetryReader) Read(p []byte) (int, error) {

if y.needReacquire {

err := y.underlying.Restart(y.bytesWrite)
err := y.underlying.Restart(y.offsetReached)

if err != nil {
// log error and continue.
// Try to mitigate overload problems with random sleep
ylogger.Zero.Error().Err(err).Int("offset reached", int(y.bytesWrite)).Int("retry count", int(retry)).Msg("failed to reacquire external storage connection, wait and retry")
ylogger.Zero.Error().Err(err).Int("offset reached", int(y.offsetReached)).Int("retry count", int(retry)).Msg("failed to reacquire external storage connection, wait and retry")

time.Sleep(time.Second)
continue
Expand All @@ -98,7 +98,7 @@ func (y *YproxyRetryReader) Read(p []byte) (int, error) {
return n, err
}
if err != nil || n < 0 {
ylogger.Zero.Error().Err(err).Int("offset reached", int(y.bytesWrite)).Int("retry count", int(retry)).Msg("encounter read error")
ylogger.Zero.Error().Err(err).Int("offset reached", int(y.offsetReached)).Int("retry count", int(retry)).Msg("encounter read error")

// what if close failed?
_ = y.underlying.Close()
Expand All @@ -109,7 +109,7 @@ func (y *YproxyRetryReader) Read(p []byte) (int, error) {
y.needReacquire = true
continue
} else {
y.bytesWrite += int64(n)
y.offsetReached += int64(n)

return n, err
}
Expand All @@ -121,12 +121,12 @@ const (
defaultRetryLimit = 100
)

func NewYRetryReader(r RestartReader) io.ReadCloser {
func NewYRetryReader(r RestartReader, initOffset uint64) io.ReadCloser {
return &YproxyRetryReader{
underlying: r,
retryLimit: defaultRetryLimit,
bytesWrite: 0,
needReacquire: true,
offsetReached: int64(initOffset),
needReacquire: true, /* do initial storage request */
}
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/proc/yrreader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestYproxyRetryReaderEmpty(t *testing.T) {

rr := mock.NewMockRestartReader(ctrl)

yr := proc.NewYRetryReader(rr)
yr := proc.NewYRetryReader(rr, 0)

buf := []byte{1, 233, 45}

Expand All @@ -38,7 +38,7 @@ func TestYproxyRetryReaderSimpleRead(t *testing.T) {

rr := mock.NewMockRestartReader(ctrl)

yr := proc.NewYRetryReader(rr)
yr := proc.NewYRetryReader(rr, 0)

buf := []byte{0, 0, 0}

Expand Down Expand Up @@ -74,7 +74,7 @@ func TestYproxyRetryReaderSimpleReadRetry(t *testing.T) {

rr := mock.NewMockRestartReader(ctrl)

yr := proc.NewYRetryReader(rr)
yr := proc.NewYRetryReader(rr, 0)

buf := []byte{0, 0, 0}

Expand Down

0 comments on commit 536eaa2

Please sign in to comment.