Skip to content

Commit

Permalink
Add command for copy files (#36)
Browse files Browse the repository at this point in the history
  • Loading branch information
debebantur authored May 7, 2024
1 parent 4bf374b commit 182829f
Show file tree
Hide file tree
Showing 7 changed files with 294 additions and 34 deletions.
42 changes: 39 additions & 3 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
)

var cfgPath string
var oldCfgPath string
var logLevel string
var decrypt bool
var encrypt bool
Expand Down Expand Up @@ -51,7 +52,7 @@ var catCmd = &cobra.Command{
return err
}

ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed message")
ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed cat message")

_, err = io.Copy(os.Stdout, con)
if err != nil {
Expand All @@ -62,6 +63,36 @@ var catCmd = &cobra.Command{
},
}

var copyCmd = &cobra.Command{
Use: "copy",
Short: "copy",
RunE: func(cmd *cobra.Command, args []string) error {
ylogger.Zero.Info().Msg("Execute copy command")
err := config.LoadInstanceConfig(cfgPath)
if err != nil {
return err
}
instanceCnf := config.InstanceConfig()

con, err := net.Dial("unix", instanceCnf.SocketPath)
if err != nil {
return err
}

defer con.Close()
ylogger.Zero.Info().Str("name", args[0]).Msg("copy")
msg := message.NewCopyMessage(args[0], oldCfgPath, encrypt, decrypt).Encode()
_, err = con.Write(msg)
if err != nil {
return err
}

ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed copy msg")

return nil
},
}

var putCmd = &cobra.Command{
Use: "put",
Short: "put",
Expand Down Expand Up @@ -90,7 +121,7 @@ var putCmd = &cobra.Command{
return err
}

ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed message")
ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed put message")

const SZ = 65536
chunk := make([]byte, SZ)
Expand Down Expand Up @@ -170,7 +201,7 @@ var listCmd = &cobra.Command{
return err
}

ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed message")
ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed list message")

ycl := client.NewYClient(con)
r := proc.NewProtoReader(ycl)
Expand Down Expand Up @@ -216,6 +247,11 @@ func init() {
catCmd.PersistentFlags().BoolVarP(&decrypt, "decrypt", "d", false, "decrypt external object or not")
rootCmd.AddCommand(catCmd)

copyCmd.PersistentFlags().BoolVarP(&decrypt, "decrypt", "d", false, "decrypt external object or not")
copyCmd.PersistentFlags().BoolVarP(&encrypt, "encrypt", "e", false, "encrypt external object before put")
copyCmd.PersistentFlags().StringVarP(&oldCfgPath, "old-config", "", "/etc/yproxy/yproxy.yaml", "path to old yproxy config file")
rootCmd.AddCommand(copyCmd)

putCmd.PersistentFlags().BoolVarP(&encrypt, "encrypt", "e", false, "encrypt external object before put")
rootCmd.AddCommand(putCmd)

Expand Down
39 changes: 22 additions & 17 deletions config/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,29 @@ func EmbedDefaults(cfgInstance *Instance) {
}
}

func LoadInstanceConfig(cfgPath string) error {
func LoadInstanceConfig(cfgPath string) (err error) {
cfgInstance, err = ReadInstanceConfig(cfgPath)
if err != nil {
return
}

cfgInstance.ReadSystemdSocketPath()
EmbedDefaults(&cfgInstance)

configBytes, err := json.MarshalIndent(cfgInstance, "", " ")
if err != nil {
return
}

log.Println("Running config:", string(configBytes))
return
}

func ReadInstanceConfig(cfgPath string) (Instance, error) {
var cfg Instance
file, err := os.Open(cfgPath)
if err != nil {
cfgInstance = cfg
return err
return cfg, err
}
defer func(file *os.File) {
err := file.Close()
Expand All @@ -81,20 +98,8 @@ func LoadInstanceConfig(cfgPath string) error {
}(file)

if err := initInstanceConfig(file, &cfg); err != nil {
cfgInstance = cfg
return err
return cfg, err
}

configBytes, err := json.MarshalIndent(cfg, "", " ")
if err != nil {
cfgInstance = cfg
return err
}

cfg.ReadSystemdSocketPath()
EmbedDefaults(&cfg)

log.Println("Running config:", string(configBytes))
cfgInstance = cfg
return nil
return cfg, nil
}
73 changes: 73 additions & 0 deletions pkg/message/copy_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package message

import (
"encoding/binary"
"fmt"

"github.com/yezzey-gp/yproxy/pkg/ylogger"
)

type CopyMessage struct {
Decrypt bool
Encrypt bool
Name string
OldCfgPath string
}

var _ ProtoMessage = &CopyMessage{}

func NewCopyMessage(name, oldCfgPath string, encrypt, decrypt bool) *CopyMessage {
return &CopyMessage{
Name: name,
Encrypt: encrypt,
Decrypt: decrypt,
OldCfgPath: oldCfgPath,
}
}

func (message *CopyMessage) Encode() []byte {
encodedMessage := []byte{
byte(MessageTypeCopy),
byte(NoDecryptMessage),
byte(NoEncryptMessage),
0,
}

if message.Decrypt {
encodedMessage[1] = byte(DecryptMessage)
}

if message.Encrypt {
encodedMessage[2] = byte(EncryptMessage)
}

byteName := []byte(message.Name)
byteLen := make([]byte, 8)
binary.BigEndian.PutUint64(byteLen, uint64(len(byteName)))
encodedMessage = append(encodedMessage, byteLen...)
encodedMessage = append(encodedMessage, byteName...)

byteOldCfg := []byte(message.OldCfgPath)
binary.BigEndian.PutUint64(byteLen, uint64(len(byteOldCfg)))
encodedMessage = append(encodedMessage, byteLen...)
encodedMessage = append(encodedMessage, byteOldCfg...)

binary.BigEndian.PutUint64(byteLen, uint64(len(encodedMessage)+8))
fmt.Printf("send: %v\n", MessageType(encodedMessage[0]))
ylogger.Zero.Debug().Str("object-path", MessageType(encodedMessage[0]).String()).Msg("decrypt object")
return append(byteLen, encodedMessage...)
}

func (encodedMessage *CopyMessage) Decode(data []byte) {
if data[1] == byte(DecryptMessage) {
encodedMessage.Decrypt = true
}
if data[2] == byte(EncryptMessage) {
encodedMessage.Encrypt = true
}

nameLen := binary.BigEndian.Uint64(data[4:12])
encodedMessage.Name = string(data[12 : 12+nameLen])
oldConfLen := binary.BigEndian.Uint64(data[12+nameLen : 12+nameLen+8])
encodedMessage.OldCfgPath = string(data[12+nameLen+8 : 12+nameLen+8+oldConfLen])
}
3 changes: 3 additions & 0 deletions pkg/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
MessageTypeList = MessageType(48)
MessageTypeObjectMeta = MessageType(49)
MessageTypePatch = MessageType(50)
MessageTypeCopy = MessageType(51)

DecryptMessage = RequestEncryption(1)
NoDecryptMessage = RequestEncryption(0)
Expand All @@ -45,6 +46,8 @@ func (m MessageType) String() string {
return "LIST"
case MessageTypeObjectMeta:
return "OBJECT META"
case MessageTypeCopy:
return "COPY"
}
return "UNKNOWN"
}
17 changes: 17 additions & 0 deletions pkg/message/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,20 @@ func TestListMsg(t *testing.T) {
assert.Equal(msg.Prefix, msg2.Prefix)
}
}

func TestCopyMsg(t *testing.T) {
assert := assert.New(t)

msg := message.NewCopyMessage("myname/mynextname", "myoldcfg/path", true, true)
body := msg.Encode()

assert.Equal(body[8], byte(message.MessageTypeCopy))

msg2 := message.CopyMessage{}
msg2.Decode(body[8:])

assert.Equal("myname/mynextname", msg2.Name)
assert.Equal("myoldcfg/path", msg2.OldCfgPath)
assert.True(msg2.Decrypt)
assert.True(msg2.Encrypt)
}
115 changes: 114 additions & 1 deletion pkg/proc/interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package proc
import (
"fmt"
"io"
"strings"
"sync"

"github.com/yezzey-gp/yproxy/config"
"github.com/yezzey-gp/yproxy/pkg/client"
"github.com/yezzey-gp/yproxy/pkg/crypt"
"github.com/yezzey-gp/yproxy/pkg/message"
Expand Down Expand Up @@ -48,10 +50,11 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl *client.YClient
return err
}
}
_, err = io.Copy(ycl.Conn, contentReader)
n, err := io.Copy(ycl.Conn, contentReader)
if err != nil {
_ = ycl.ReplyError(err, "copy failed to complete")
}
ylogger.Zero.Debug().Int64("copied bytes", n).Msg("decrypt object")

case message.MessageTypePut:

Expand Down Expand Up @@ -169,7 +172,117 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl *client.YClient
return nil
}

case message.MessageTypeCopy:
msg := message.CopyMessage{}
msg.Decode(body)

//get config for old bucket
instanceCnf, err := config.ReadInstanceConfig(msg.OldCfgPath)
if err != nil {
_ = ycl.ReplyError(fmt.Errorf("could not read old config: %s", err), "failed to compelete request")
return nil
}
config.EmbedDefaults(&instanceCnf)
oldStorage := storage.NewStorage(&instanceCnf.StorageCnf)
fmt.Printf("ok new conf: %v\n", instanceCnf)

//list objects
objectMetas, err := oldStorage.ListPath(msg.Name)
if err != nil {
_ = ycl.ReplyError(fmt.Errorf("could not list objects: %s", err), "failed to compelete request")
return nil
}

var failed []*storage.S3ObjectMeta
retryCount := 0
for len(objectMetas) > 0 && retryCount < 10 {
retryCount++
for i := 0; i < len(objectMetas); i++ {
path := strings.TrimPrefix(objectMetas[i].Path, instanceCnf.StorageCnf.StoragePrefix)

//get reader
yr := NewYRetryReader(NewRestartReader(oldStorage, path))
var fromReader io.Reader
fromReader = yr
defer yr.Close()

if msg.Decrypt {
fromReader, err = cr.Decrypt(yr)
if err != nil {
ylogger.Zero.Error().Err(err).Msg("failed to decrypt object")
failed = append(failed, objectMetas[i])
continue
}
}

//reencrypt
r, w := io.Pipe()

go func() {
defer func() {
if err := w.Close(); err != nil {
ylogger.Zero.Warn().Err(err).Msg("failed to close writer")
}
}()

var ww io.WriteCloser = w

if msg.Encrypt {
var err error
ww, err = cr.Encrypt(w)
if err != nil {
ylogger.Zero.Error().Err(err).Msg("failed to encrypt object")
failed = append(failed, objectMetas[i])
return
}
}

if _, err := io.Copy(ww, fromReader); err != nil {
ylogger.Zero.Error().Err(err).Msg("failed to copy data")
failed = append(failed, objectMetas[i])
return
}

if err := ww.Close(); err != nil {
ylogger.Zero.Error().Err(err).Msg("failed to close writer")
failed = append(failed, objectMetas[i])
return
}
}()

//write file
err = s.PutFileToDest(path, r)
if err != nil {
ylogger.Zero.Error().Err(err).Msg("failed to upload file")
failed = append(failed, objectMetas[i])
continue
}
}
objectMetas = failed
fmt.Printf("failed files count: %d\n", len(objectMetas))
failed = make([]*storage.S3ObjectMeta, 0)
}

if _, err = ycl.Conn.Write(message.NewReadyForQueryMessage().Encode()); err != nil {
_ = ycl.ReplyError(err, "failed to upload")
return nil
}

if len(objectMetas) > 0 {
fmt.Printf("failed files count: %d\n", len(objectMetas))
fmt.Printf("failed files: %v\n", objectMetas)
ylogger.Zero.Error().Int("failed files count", len(objectMetas)).Msg("failed to upload some files")
ylogger.Zero.Error().Any("failed files", objectMetas).Msg("failed to upload some files")

_ = ycl.ReplyError(err, "failed to copy some files")
return nil
} else {
fmt.Println("Copy finished successfully")
ylogger.Zero.Info().Msg("Copy finished successfully")
}

default:
ylogger.Zero.Error().Any("type", tp).Msg("what tip is it")
_ = ycl.ReplyError(nil, "wrong request type")

return nil
Expand Down
Loading

0 comments on commit 182829f

Please sign in to comment.