From 57c3505d17fb5ff6dacb126564123198976dc8a3 Mon Sep 17 00:00:00 2001 From: leandro-driguez Date: Sun, 11 Jun 2023 17:12:29 -0400 Subject: [PATCH] fix(net): fix joinNetwork and bootstrap to avoid that different subnets collapse --- cmd/main.go | 10 +-- core/dht.go | 13 ++-- core/fullNode.go | 155 +++++++++++++++++++++++++--------------- main.go | 8 +-- structs/node.go | 6 +- structs/routingTable.go | 14 +++- structs/shortList.go | 4 +- structs/storage.go | 9 ++- utils/cli.go | 2 +- 9 files changed, 132 insertions(+), 89 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 02f4eb0..d413a80 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -62,16 +62,16 @@ func main() { defer cancel() sender, err := client.Store(ctx) if err != nil { - fmt.Println(err.Error()) + //fmt.Println(err.Error()) } keyHash := utils.GetSha1Hash(data) dataBytes := []byte(data) - //fmt.Println("data bytes", dataBytes) + ////fmt.Println("data bytes", dataBytes) err = sender.Send(&pb.StoreData{Key: keyHash, Value: &pb.Data{Init: 0, End: int64(len(dataBytes)), Buffer: dataBytes}}) if err != nil { - fmt.Println(err.Error()) + //fmt.Println(err.Error()) } - fmt.Println("Stored ID: ", keyHash, "Stored Data:", string(dataBytes)) + //fmt.Println("Stored ID: ", keyHash, "Stored Data:", string(dataBytes)) case "ping": if len(input) != 5 { @@ -204,7 +204,7 @@ func main() { idSender, _ := utils.NewID(bootIp, bootPort) pbNode, err := client.Ping(ctx, &pb.Node{ID: idSender, IP: bootIp, Port: int32(bootPort)}) if err != nil { - fmt.Println(err) + //fmt.Println(err) } fmt.Println("Made Ping from ", bootIp, ":", bootPort, "to", pbNode.IP, ":", pbNode.Port) nearestNodes, _ := fullNodeServer.LookUp(target) diff --git a/core/dht.go b/core/dht.go index 0df18a2..c312489 100644 --- a/core/dht.go +++ b/core/dht.go @@ -2,7 +2,6 @@ package core import ( "bytes" - "fmt" "github.com/science-engineering-art/kademlia-grpc/interfaces" "github.com/science-engineering-art/kademlia-grpc/structs" @@ -15,14 +14,14 @@ type DHT struct { } func (fn *DHT) Store(key []byte, data *[]byte) error { - fmt.Printf("INIT DHT.Store(%v) len(*data)=%d\n", key, len(*data)) - defer fmt.Printf("END DHT.Store(%v)\n", key) + //fmt.Printf("INIT DHT.Store(%v) len(*data)=%d\n", key, len(*data)) + // defer //fmt.Printf("END DHT.Store(%v)\n", key) - fmt.Println("Before Storage.Create()") + //fmt.Println("Before Storage.Create()") err := fn.Storage.Create(key, data) - fmt.Println("After Storage.Create()") + //fmt.Println("After Storage.Create()") if err != nil { - fmt.Println("ERROR line:23 DHT.Storage.Create()") + //fmt.Println("ERROR line:23 DHT.Storage.Create()") return err } return nil @@ -31,7 +30,7 @@ func (fn *DHT) Store(key []byte, data *[]byte) error { func (fn *DHT) FindValue(infoHash *[]byte, start int64, end int64) (value *[]byte, neighbors *[]structs.Node) { value, err := fn.Storage.Read(*infoHash, start, end) if err != nil { - //fmt.Println("Find Value error: ", err) + ////fmt.Println("Find Value error: ", err) neighbors = fn.RoutingTable.GetClosestContacts(structs.Alpha, *infoHash, []*structs.Node{&fn.Node}).Nodes return nil, neighbors } diff --git a/core/fullNode.go b/core/fullNode.go index 0bdb7b3..fcaae73 100644 --- a/core/fullNode.go +++ b/core/fullNode.go @@ -34,14 +34,15 @@ func NewFullNode(nodeIP string, nodePort, bootstrapPort int, storage interfaces. dht := DHT{Node: node, RoutingTable: routingTable, Storage: storage} fullNode := FullNode{dht: &dht} - // go func() { - // for { - // <-time.After(10 * time.Second) - // fmt.Println("\nROUTING TABLE:") - // fullNode.PrintRoutingTable() - // fmt.Printf("\n") - // } - // }() + go func() { + for { + <-time.After(10 * time.Second) + fmt.Println("\nROUTING TABLE:") + fmt.Printf("ME: %v\n\n", fullNode.dht.Node) + fullNode.PrintRoutingTable() + fmt.Printf("\n") + } + }() fullNode.joinNetwork(bootstrapPort) @@ -99,8 +100,8 @@ func (fn *FullNode) Ping(ctx context.Context, sender *pb.Node) (*pb.Node, error) } func (fn *FullNode) Store(stream pb.FullNode_StoreServer) error { - fmt.Printf("INIT FullNode.Store()\n\n") - defer fmt.Printf("END FullNode.Store()\n\n") + //fmt.Printf("INIT FullNode.Store()\n\n") + // defer //fmt.Printf("END FullNode.Store()\n\n") key := []byte{} buffer := []byte{} @@ -109,16 +110,16 @@ func (fn *FullNode) Store(stream pb.FullNode_StoreServer) error { for { data, err := stream.Recv() if data == nil { - fmt.Printf("END Streaming\n\n") + //fmt.Printf("END Streaming\n\n") break } if err != nil { - fmt.Printf("EXIT line:133 Store() method\n\n") + //fmt.Printf("EXIT line:133 Store() method\n\n") return errors.New("missing chunck") } if init == 0 { - fmt.Printf("INIT Streaming\n\n") + //fmt.Printf("INIT Streaming\n\n") // add the sender to the Routing Table sender := structs.Node{ ID: data.Sender.ID, @@ -133,16 +134,16 @@ func (fn *FullNode) Store(stream pb.FullNode_StoreServer) error { buffer = append(buffer, data.Value.Buffer...) init = data.Value.End } else { - fmt.Printf("ERROR missing chunck\n\n") + //fmt.Printf("ERROR missing chunck\n\n") return err } - fmt.Printf("OKKKK ===> FullNode(%s).Recv(%d, %d)\n", fn.dht.IP, data.Value.Init, data.Value.End) + //fmt.Printf("OKKKK ===> FullNode(%s).Recv(%d, %d)\n", fn.dht.IP, data.Value.Init, data.Value.End) } - // fmt.Println("Received Data:", buffer) + // //fmt.Println("Received Data:", buffer) err := fn.dht.Store(key, &buffer) if err != nil { - fmt.Printf("ERROR line:140 DHT.Store()\n\n") + //fmt.Printf("ERROR line:140 DHT.Store()\n\n") return err } return nil @@ -184,7 +185,7 @@ func (fn *FullNode) FindValue(target *pb.Target, stream pb.FullNode_FindValueSer }, } } else if value != nil && neighbors == nil { - fmt.Println("Value from FindValue:", value) + //fmt.Println("Value from FindValue:", value) response = pb.FindValueResponse{ KNeartestBuckets: &pb.KBucket{Bucket: []*pb.Node{}}, Value: &pb.Data{ @@ -250,9 +251,9 @@ func (fn *FullNode) LookUp(target []byte) ([]structs.Node, error) { } sl.Append(kBucket) } - // fmt.Println("Before timeout") + // //fmt.Println("Before timeout") // <-time.After(10 * time.Second) - // fmt.Println("After timeout") + // //fmt.Println("After timeout") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -268,7 +269,7 @@ func (fn *FullNode) LookUp(target []byte) ([]structs.Node, error) { }, ) if err != nil && err.Error() == "rpc error: code = DeadlineExceeded desc = context deadline exceeded" { - fmt.Println("Crash connection") + //fmt.Println("Crash connection") sl.RemoveNode(&node) continue } @@ -282,7 +283,7 @@ func (fn *FullNode) LookUp(target []byte) ([]structs.Node, error) { sort.Sort(sl) if addedNodes == 0 { - fmt.Println("0 added nodes") + //fmt.Println("0 added nodes") break } } @@ -293,7 +294,7 @@ func (fn *FullNode) LookUp(target []byte) ([]structs.Node, error) { if i == structs.K { break } - fmt.Println("append node", node.IP) + //fmt.Println("append node", node.IP) kBucket = append(kBucket, structs.Node{ ID: node.ID, IP: node.IP, @@ -304,13 +305,13 @@ func (fn *FullNode) LookUp(target []byte) ([]structs.Node, error) { } func (fn *FullNode) StoreValue(key string, data *[]byte) (string, error) { - fmt.Printf("INIT FullNode.StoreValue(%s) method\n\n", key) - defer fmt.Printf("EXIT FullNode.StoreValue(%s) method\n\n", key) + //fmt.Printf("INIT FullNode.StoreValue(%s) method\n\n", key) + // defer //fmt.Printf("EXIT FullNode.StoreValue(%s) method\n\n", key) keyHash := base58.Decode(key) nearestNeighbors, err := fn.LookUp(keyHash) if err != nil { - fmt.Printf("ERROR LookUP() method\n\n") + //fmt.Printf("ERROR LookUP() method\n\n") return "", err } @@ -337,15 +338,15 @@ func (fn *FullNode) StoreValue(key string, data *[]byte) (string, error) { sender, err := client.Store(ctx) if err != nil { - fmt.Printf("ERROR Store(%v, %d) method", node.IP, node.Port) + //fmt.Printf("ERROR Store(%v, %d) method", node.IP, node.Port) if ctx.Err() == context.DeadlineExceeded { // Handle timeout error - fmt.Println("Timeout exceeded") + //fmt.Println("Timeout exceeded") continue } - fmt.Println(err.Error()) + //fmt.Println(err.Error()) } - // fmt.Println("data bytes", dataBytes) + // //fmt.Println("data bytes", dataBytes) for i := 0; i < len(*data); i += 1024 { j := int(math.Min(float64(i+1024), float64(len(*data)))) @@ -366,17 +367,17 @@ func (fn *FullNode) StoreValue(key string, data *[]byte) (string, error) { }, ) if err != nil { - fmt.Printf("ERROR SendChunck(0, %d) method\n\n", len(*data)) + //fmt.Printf("ERROR SendChunck(0, %d) method\n\n", len(*data)) break // return "", err } - fmt.Printf("OKKKK ===> FullNode(%s).Send(%d, %d)\n", fn.dht.IP, i, j) + //fmt.Printf("OKKKK ===> FullNode(%s).Send(%d, %d)\n", fn.dht.IP, i, j) } } - // fmt.Println("Stored ID: ", key, "Stored Data:", data) - fmt.Println("===> OKKKK") + // //fmt.Println("Stored ID: ", key, "Stored Data:", data) + //fmt.Println("===> OKKKK") return key, nil } @@ -392,12 +393,12 @@ func (fn *FullNode) GetValue(target string, start int64, end int64) ([]byte, err if err != nil { return nil, nil } - //fmt.Println(nearestNeighbors) + ////fmt.Println(nearestNeighbors) buffer := []byte{} for _, node := range nearestNeighbors { if len(target) == 0 { - fmt.Println("Invalid target decoding.") + //fmt.Println("Invalid target decoding.") continue } @@ -409,13 +410,13 @@ func (fn *FullNode) GetValue(target string, start int64, end int64) ([]byte, err return } clientChnn <- client.FullNodeClient - fmt.Println("Channel value is: ", clientChnn) + //fmt.Println("Channel value is: ", clientChnn) }() - fmt.Println("Init Select-Case") + //fmt.Println("Init Select-Case") select { case <-time.After(5 * time.Second): - fmt.Println("Timeout") + //fmt.Println("Timeout") continue case client := <-clientChnn: ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) @@ -425,7 +426,7 @@ func (fn *FullNode) GetValue(target string, start int64, end int64) ([]byte, err continue } - fmt.Println("Init FindValue") + //fmt.Println("Init FindValue") receiver, err := client.FindValue(ctx, &pb.Target{ ID: keyHash, @@ -441,14 +442,14 @@ func (fn *FullNode) GetValue(target string, start int64, end int64) ([]byte, err if err != nil || receiver == nil { continue } - fmt.Println("End FindValue") + //fmt.Println("End FindValue") if err != nil { if ctx.Err() == context.DeadlineExceeded { // Handle timeout error - // fmt.Println("Timeout exceeded") + // //fmt.Println("Timeout exceeded") continue } - fmt.Println(err.Error()) + //fmt.Println(err.Error()) continue } var init int64 = 0 @@ -456,7 +457,7 @@ func (fn *FullNode) GetValue(target string, start int64, end int64) ([]byte, err for { data, err := receiver.Recv() if err != nil { - fmt.Println(err.Error()) + //fmt.Println(err.Error()) break } if data == nil { @@ -466,7 +467,7 @@ func (fn *FullNode) GetValue(target string, start int64, end int64) ([]byte, err init = data.Value.End } } - //fmt.Println("Received value from STREAMING in GetValue():", buffer) + ////fmt.Println("Received value from STREAMING in GetValue():", buffer) // Received data if len(buffer) > 0 { goto RETURN @@ -490,25 +491,35 @@ func (fn *FullNode) bootstrap(port int) { addr, err := net.ResolveUDPAddr("udp4", strAddr) if err != nil { + fmt.Println("line:494") log.Fatal(err) } conn, err := net.ListenUDP("udp4", addr) if err != nil { + fmt.Println("line:500") log.Fatal(err) } + defer conn.Close() buffer := make([]byte, 1024) - defer conn.Close() for { _, rAddr, err := conn.ReadFrom(buffer) if err != nil { + fmt.Println("line:510") log.Fatal(err) } - // fmt.Printf("Received %d bytes from %v\n", n, rAddr) + // Check if node are in the same port + portInt := binary.LittleEndian.Uint32(buffer[20:24]) + if int(portInt) != fn.dht.Port { + fmt.Println("UDP Message with != Ports") + continue + } + host, _, _ := net.SplitHostPort(rAddr.String()) + rAddr = &net.TCPAddr{IP: net.ParseIP(host), Port: port + 1} - go func(rAddr net.Addr) { + go func(rAddr net.Addr, buffer []byte) { connChan := make(chan net.Conn) go func() { @@ -521,6 +532,7 @@ func (fn *FullNode) bootstrap(port int) { kBucket, err := fn.LookUp(buffer[:20]) if err != nil { + fmt.Println("line:529") log.Fatal(err) } kBucket = append(kBucket, fn.dht.Node) @@ -528,6 +540,9 @@ func (fn *FullNode) bootstrap(port int) { //Convert port from byte to int portInt := binary.LittleEndian.Uint32(buffer[20:24]) intVal := int(portInt) + if intVal != fn.dht.Port { + return + } host, _, _ := net.SplitHostPort(rAddr.String()) id, _ := utils.NewID(host, intVal) @@ -535,29 +550,32 @@ func (fn *FullNode) bootstrap(port int) { bytesKBucket, err := utils.SerializeMessage(&kBucket) if err != nil { + fmt.Println("line:544") log.Fatal(err) } respConn := <-connChan respConn.Write(*bytesKBucket) - }(rAddr) + }(rAddr, buffer) } } func (fn *FullNode) joinNetwork(boostrapPort int) { - raddr := net.UDPAddr{ + broadcastAddr := net.UDPAddr{ IP: net.IPv4(255, 255, 255, 255), Port: boostrapPort, } fmt.Println("In Dial UDP") - conn, err := net.DialUDP("udp4", nil, &raddr) + conn, err := net.DialUDP("udp4", nil, &broadcastAddr) if err != nil { + fmt.Println("line:558") log.Fatal(err) } - host, port, err := net.SplitHostPort(conn.LocalAddr().String()) + host, _, err := net.SplitHostPort(conn.LocalAddr().String()) if err != nil { + fmt.Println("line:564") log.Fatal(err) } @@ -571,20 +589,24 @@ func (fn *FullNode) joinNetwork(boostrapPort int) { } conn.Close() - address := fmt.Sprintf("%s:%s", host, port) + address := fmt.Sprintf("%s:%d", host, boostrapPort+1) tcpAddr, err := net.ResolveTCPAddr("tcp", address) - for err != nil { + if err != nil { + fmt.Println("line:579") log.Fatal(err) } - fmt.Println("In Listen TCP") + fmt.Println("In Listen TCP", tcpAddr) lis, err := net.ListenTCP("tcp", tcpAddr) if err != nil { + fmt.Println("line:586") log.Fatal(err) } + // ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond) // defer cancel() +RECONNECT: connChannel := make(chan net.Conn) go func() { @@ -594,25 +616,37 @@ func (fn *FullNode) joinNetwork(boostrapPort int) { select { case <-time.After(5 * time.Second): - fmt.Println("timeout reached") - return + lis.Close() + break + // go func() { + // time.After(5 * time.Minute) + // go fn.joinNetwork(boostrapPort) + // }() case tcpConn := <-connChannel: - kBucket, err := utils.DeserializeMessage(tcpConn) if err != nil { + fmt.Println("line:604") log.Fatal(err) } - fmt.Println("In Deserialize Messages") + fmt.Println("line:610 In Deserialize Messages") for i := 0; i < len(*kBucket); i++ { node := (*kBucket)[i] + if node.Port != fn.dht.Port { + goto RECONNECT + } + + // fmt.Println("Before NewClient") client := NewClientNode(node.IP, node.Port) + // fmt.Println("After NewClient") if client == nil { continue } + // fmt.Println("Before Ping") recvNode, err := client.Ping(fn.dht.Node) + // fmt.Println("After Ping") if err != nil { log.Fatal(err) } @@ -620,6 +654,9 @@ func (fn *FullNode) joinNetwork(boostrapPort int) { if recvNode.Equal(node) { fn.dht.RoutingTable.AddNode(node) } else { + fmt.Println("Recv Node", *recvNode) + fmt.Println("Node", node) + fmt.Println("Me", fn.dht.Node) log.Fatal(errors.New("bad ping")) } } diff --git a/main.go b/main.go index f825c01..263af3e 100644 --- a/main.go +++ b/main.go @@ -48,7 +48,7 @@ func main() { fullNode = *core.NewFullNode(ip, port, bPort, storage, isB) go fullNode.CreateGRPCServer(grpcServerAddress) - fmt.Println("Node running at:", ip, ":", port) + //fmt.Println("Node running at:", ip, ":", port) case "store": if len(input) != 3 { @@ -73,12 +73,12 @@ func main() { value, err := fullNode.GetValue(key, int64(start), int64(end)) if err != nil { - fmt.Println(err.Error()) + //fmt.Println(err.Error()) } if value == nil || (value != nil && len(value) == 0) { - fmt.Println("There is no value in the network for that key") + //fmt.Println("There is no value in the network for that key") } else { - fmt.Println("The retrived value is:", string(value)) + //fmt.Println("The retrived value is:", string(value)) } case "dht": fullNode.PrintRoutingTable() diff --git a/structs/node.go b/structs/node.go index 02cc490..17a3a05 100644 --- a/structs/node.go +++ b/structs/node.go @@ -1,7 +1,6 @@ package structs import ( - "bytes" "strings" ) @@ -11,8 +10,7 @@ type Node struct { Port int `json:"port,omitempty"` } -func (b Node) Equal(other Node) bool { - return bytes.Equal(b.ID, other.ID) && - strings.EqualFold(b.IP, other.IP) && +func (b *Node) Equal(other Node) bool { + return strings.EqualFold(b.IP, other.IP) && b.Port == other.Port } diff --git a/structs/routingTable.go b/structs/routingTable.go index a2b880d..cc19afa 100644 --- a/structs/routingTable.go +++ b/structs/routingTable.go @@ -3,6 +3,7 @@ package structs import ( "bytes" "context" + "errors" "fmt" "sort" "sync" @@ -72,6 +73,15 @@ func (rt *RoutingTable) AddNode(b Node) error { rt.mutex.Lock() defer rt.mutex.Unlock() + if b.Equal(rt.NodeInfo) { + return errors.New("you cannot add yourself into the Routing Table") + } + + if b.Port != rt.NodeInfo.Port { + err := fmt.Sprintf("you cannot add a node with port %d", b.Port) + return errors.New(err) + } + // get the correspondient bucket bIndex := getBucketIndex(rt.NodeInfo.ID, b.ID) bucket := rt.KBuckets[bIndex] @@ -94,7 +104,7 @@ func (rt *RoutingTable) AddNode(b Node) error { RETURN: rt.KBuckets[bIndex] = bucket - //fmt.Println(rt.KBuckets) + ////fmt.Println(rt.KBuckets) return nil } @@ -137,7 +147,7 @@ func (rt *RoutingTable) GetClosestContacts(num int, target []byte, ignoredNodes defer rt.mutex.Unlock() // First we need to build the list of adjacent indices to our target // in order - //fmt.Println(rt.NodeInfo.ID, target) + ////fmt.Println(rt.NodeInfo.ID, target) index := getBucketIndex(rt.NodeInfo.ID, target) indexList := []int{index} i := index - 1 diff --git a/structs/shortList.go b/structs/shortList.go index 0fed248..84be3fb 100644 --- a/structs/shortList.go +++ b/structs/shortList.go @@ -46,10 +46,10 @@ func (n *ShortList) RemoveNode(node *Node) { } func (n *ShortList) Append(nodes []*Node) { - //fmt.Println(nodes) + ////fmt.Println(nodes) for _, vv := range nodes { exists := false - //fmt.Println(*n.Nodes) + ////fmt.Println(*n.Nodes) for _, v := range *n.Nodes { if bytes.Equal(v.ID, vv.ID) { exists = true diff --git a/structs/storage.go b/structs/storage.go index f42634b..1315da6 100644 --- a/structs/storage.go +++ b/structs/storage.go @@ -2,7 +2,6 @@ package structs import ( "errors" - "fmt" "github.com/jbenet/go-base58" ) @@ -18,16 +17,16 @@ func NewStorage() *Storage { } func (s *Storage) Create(key []byte, data *[]byte) error { - fmt.Println("INTO Create Method:", key) + //fmt.Println("INTO Create Method:", key) id := string(key) - fmt.Println("The id is:", id) + //fmt.Println("The id is:", id) _, exists := s.KV[id] if exists { return errors.New("the key already exists") } s.KV[id] = data - fmt.Println("The stored value in KV is: ", s.KV[id]) + //fmt.Println("The stored value in KV is: ", s.KV[id]) return nil } @@ -40,7 +39,7 @@ func (s *Storage) Read(key []byte, start int64, end int64) (*[]byte, error) { end = int64(len(*v)) } result := (*v)[start:end] - fmt.Println("Result is: ", result) + //fmt.Println("Result is: ", result) return &result, nil } diff --git a/utils/cli.go b/utils/cli.go index 4d6f8f3..a79cc50 100644 --- a/utils/cli.go +++ b/utils/cli.go @@ -12,7 +12,7 @@ func GetIpFromHost() string { cmd.Stdout = &out err := cmd.Run() if err != nil { - fmt.Println("Error running docker inspect:", err) + //fmt.Println("Error running docker inspect:", err) return "" } ip := strings.TrimSpace(out.String())