Skip to content

Commit

Permalink
file transfer fully functional
Browse files Browse the repository at this point in the history
Signed-off-by: Azanul <[email protected]>
  • Loading branch information
Azanul committed Jul 25, 2023
1 parent 02b3f5c commit c71d73c
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 13 deletions.
15 changes: 11 additions & 4 deletions oldNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,22 @@ func receiveFile(ctx context.Context, nodeName string) {
// Create a buffer stream for non blocking read and write.
rw := bufio.NewReader(stream)

f, err := os.Create("nodes/saveFilePath.txt")
generatedName := fmt.Sprintf("nodes/%s.part", util.RandString(10))
f, err := os.Create(generatedName)
if err != nil {
log.Println(err)
return
}

go util.StreamToFile(rw, f)

// 'stream' will stay open until you close it (or the other side closes it).
receivedName := util.StreamToFile(rw, f)
log.Println(receivedName)
if receivedName != "" {
receivedName = filepath.Base(receivedName)
err = os.Rename(generatedName, "nodes/"+receivedName)
if err != nil {
log.Println(err)
}
}
})

f, err := os.Open(nodeDir)
Expand Down
87 changes: 78 additions & 9 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package util

import (
"bufio"
"encoding/binary"
"fmt"
"io"
"log"
"math/rand"
"os"
"time"
"unsafe"

"github.com/Azanul/peer-pressure/pkg/pressure/pb"
"google.golang.org/protobuf/proto"
Expand All @@ -32,28 +36,50 @@ func AppendStringToFile(path string, content string) {
}
}

func StreamToFile(rw *bufio.Reader, file *os.File) {
func StreamToFile(rw *bufio.Reader, file *os.File) (filename string) {
writer := bufio.NewWriter(file)
lenBytes := make([]byte, 4)
for {
str, err := io.ReadAll(rw)
if err == io.EOF || len(str) == 0 {
n, err := rw.Read(lenBytes)
if err == io.EOF || n == 0 {
log.Printf("%s done writing", file.Name())
break
} else if err != nil {
fmt.Println("Error reading from buffer")
panic(err)
}
messageSize := binary.BigEndian.Uint32(lenBytes)

str := make([]byte, messageSize)
_, err = io.ReadFull(rw, str)
if err != nil {
fmt.Println("Error reading from buffer")
panic(err)
}
log.Println(n, len(str), str)

chunk := pb.Chunk{}
err = proto.Unmarshal(str, &chunk)
if err != nil {
log.Println("Error unmarshaling proto chunk")
panic(err)
}

log.Println(str)
log.Println(chunk.Data)
log.Println(*chunk.Len)
log.Println(*chunk.Filename)
log.Println()
_, err = writer.Write(chunk.Data)
if chunk.Filename != nil {
filename = *chunk.Filename
}

var data []byte
if chunk.Len == nil {
data = chunk.Data
} else {
data = chunk.Data[:*chunk.Len]
}
_, err = writer.Write(data)
if err != nil {
log.Println(err)
}
Expand All @@ -62,14 +88,17 @@ func StreamToFile(rw *bufio.Reader, file *os.File) {
if err != nil {
log.Panic(err)
}
return
}

func FileToStream(rw *bufio.Writer, file *os.File) {
data := make([]byte, chunkSize)
filename := file.Name()

var partNum int32 = 0
for {
_, err := file.Read(data)
n, err := file.Read(data)
lenData := int32(n)
if err == io.EOF {
break
} else if err != nil {
Expand All @@ -79,24 +108,64 @@ func FileToStream(rw *bufio.Writer, file *os.File) {

partNum++
chunk := &pb.Chunk{
Index: partNum,
Data: data,
Index: partNum,
Data: data,
Filename: &filename,
Len: &lenData,
}
sendData, err := proto.Marshal(chunk)
if err != nil {
log.Println("Error marshaling proto chunk")
panic(err)
}

messageSize := uint32(len(sendData))
messageSizeBytes := make([]byte, 4)
binary.BigEndian.PutUint32(messageSizeBytes, messageSize)

_, err = rw.Write(messageSizeBytes)
if err != nil {
log.Println("Error writing size prefix to buffer")
panic(err)
}

_, err = rw.Write(sendData)
if err != nil {
log.Println("Error writing to buffer")
panic(err)
}
log.Println(rw.Available())
log.Println(lenData, len(sendData), sendData)
err = rw.Flush()
if err != nil {
log.Println("Error flushing buffer")
panic(err)
}
}
}

const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
const (
letterIdxBits = 6 // 6 bits to represent a letter index
letterIdxMask = 1<<letterIdxBits - 1 // All 1-bits, as many as letterIdxBits
letterIdxMax = 63 / letterIdxBits // # of letter indices fitting in 63 bits
)

var src = rand.NewSource(time.Now().UnixNano())

func RandString(n int) string {
b := make([]byte, n)
// A src.Int63() generates 63 random bits, enough for letterIdxMax characters!
for i, cache, remain := n-1, src.Int63(), letterIdxMax; i >= 0; {
if remain == 0 {
cache, remain = src.Int63(), letterIdxMax
}
if idx := int(cache & letterIdxMask); idx < len(letterBytes) {
b[i] = letterBytes[idx]
i--
}
cache >>= letterIdxBits
remain--
}

return *(*string)(unsafe.Pointer(&b))
}

0 comments on commit c71d73c

Please sign in to comment.