forked from libp2p/go-libp2p
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
129 lines (116 loc) · 2.96 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package main
import (
"bufio"
"context"
"flag"
"fmt"
"os"
"sync"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
drouting "github.com/libp2p/go-libp2p/p2p/discovery/routing"
dutil "github.com/libp2p/go-libp2p/p2p/discovery/util"
)
// topicNameFlag is the -topicName command-line flag: the name of the pubsub
// topic to join. It defaults to "applesauce" and is only meaningful after
// flag.Parse has run.
var topicNameFlag = flag.String("topicName", "applesauce", "name of topic to join")
// main wires the chat node together: it parses flags, creates a libp2p host
// listening on "/ip4/0.0.0.0/tcp/0", starts peer discovery in the background,
// joins the GossipSub topic named by -topicName, forwards console input to
// that topic, and prints every message received on it. Any setup failure
// panics.
func main() {
	flag.Parse()
	ctx := context.Background()

	node, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
	if err != nil {
		panic(err)
	}
	// Discovery runs concurrently so pubsub setup is not held up by it.
	go discoverPeers(ctx, node)

	gossip, err := pubsub.NewGossipSub(ctx, node)
	if err != nil {
		panic(err)
	}

	chatTopic, err := gossip.Join(*topicNameFlag)
	if err != nil {
		panic(err)
	}
	// Publishing side: lines typed on stdin go out on the topic.
	go streamConsoleTo(ctx, chatTopic)

	subscription, err := chatTopic.Subscribe()
	if err != nil {
		panic(err)
	}
	// Receiving side: runs on the main goroutine and keeps the process alive.
	printMessagesFrom(ctx, subscription)
}
// initDHT creates a Kademlia DHT on host h, bootstraps it, and then dials
// every entry of dht.DefaultBootstrapPeers concurrently, waiting for all dial
// attempts to finish before returning the DHT.
//
// It panics if the DHT cannot be created or bootstrapped. Problems with an
// individual bootstrap peer (an unparsable address or a failed connection)
// are only reported on stdout as a "Bootstrap warning".
func initDHT(ctx context.Context, h host.Host) *dht.IpfsDHT {
	// Start a DHT, for use in peer discovery. We can't just make a new DHT
	// client because we want each peer to maintain its own local copy of the
	// DHT, so that the bootstrapping node of the DHT can go down without
	// inhibiting future peer discovery.
	kademliaDHT, err := dht.New(ctx, h)
	if err != nil {
		panic(err)
	}
	if err = kademliaDHT.Bootstrap(ctx); err != nil {
		panic(err)
	}

	var wg sync.WaitGroup
	for _, peerAddr := range dht.DefaultBootstrapPeers {
		peerinfo, err := peer.AddrInfoFromP2pAddr(peerAddr)
		if err != nil {
			// The result must not be dereferenced when parsing failed;
			// skip this address instead of crashing in the goroutine below.
			fmt.Println("Bootstrap warning:", err)
			continue
		}
		wg.Add(1)
		// Pass the address info by value so each goroutine owns its copy.
		go func(info peer.AddrInfo) {
			defer wg.Done()
			if err := h.Connect(ctx, info); err != nil {
				fmt.Println("Bootstrap warning:", err)
			}
		}(*peerinfo)
	}
	wg.Wait()

	return kademliaDHT
}
// discoverPeers advertises this node under the topic name on the DHT and then
// repeatedly searches for other peers advertising the same name, attempting
// to connect to each one found. It returns once at least one connection has
// succeeded, or when ctx is cancelled.
//
// It panics if FindPeers returns an error. Individual connection failures are
// printed and do not stop the search.
func discoverPeers(ctx context.Context, h host.Host) {
	kademliaDHT := initDHT(ctx, h)
	routingDiscovery := drouting.NewRoutingDiscovery(kademliaDHT)
	dutil.Advertise(ctx, routingDiscovery, *topicNameFlag)

	// Look for others who have announced and attempt to connect to them
	anyConnected := false
	for !anyConnected {
		// Stop retrying once the caller has cancelled the context rather
		// than issuing further lookups against it.
		if ctx.Err() != nil {
			return
		}
		fmt.Println("Searching for peers...")
		peerChan, err := routingDiscovery.FindPeers(ctx, *topicNameFlag)
		if err != nil {
			panic(err)
		}
		// The loop variable is deliberately not called "peer", which would
		// shadow the imported peer package.
		for found := range peerChan {
			if found.ID == h.ID() {
				continue // No self connection
			}
			if err := h.Connect(ctx, found); err != nil {
				fmt.Printf("Failed connecting to %s, error: %s\n", found.ID, err)
				continue
			}
			fmt.Println("Connected to:", found.ID)
			anyConnected = true
		}
	}
	fmt.Println("Peer discovery complete")
}
// streamConsoleTo reads newline-terminated lines from standard input and
// publishes each one (including its trailing newline) to topic.
//
// Publish failures are printed and reading continues. When reading stdin
// fails — including the ordinary end-of-input case when stdin is closed — any
// text already read is still published, the error is printed, and the
// function returns instead of panicking, so the rest of the program keeps
// running.
func streamConsoleTo(ctx context.Context, topic *pubsub.Topic) {
	reader := bufio.NewReader(os.Stdin)
	for {
		s, err := reader.ReadString('\n')
		// ReadString can return data together with an error (for example a
		// final line with no trailing newline), so publish before checking err.
		if len(s) > 0 {
			if pubErr := topic.Publish(ctx, []byte(s)); pubErr != nil {
				fmt.Println("### Publish error:", pubErr)
			}
		}
		if err != nil {
			fmt.Println("### Console input closed:", err)
			return
		}
	}
}
// printMessagesFrom loops forever, taking the next message from sub and
// printing its ReceivedFrom value followed by the message data as text.
// It panics as soon as sub.Next returns an error.
func printMessagesFrom(ctx context.Context, sub *pubsub.Subscription) {
	for {
		msg, err := sub.Next(ctx)
		if err == nil {
			payload := string(msg.Message.Data)
			fmt.Println(msg.ReceivedFrom, ": ", payload)
			continue
		}
		panic(err)
	}
}