diff --git a/third_party/nwaku b/third_party/nwaku index 507b1fc4d9..47a6235414 160000 --- a/third_party/nwaku +++ b/third_party/nwaku @@ -1 +1 @@ -Subproject commit 507b1fc4d97a01ee5695a205f7f981bd4accc694 +Subproject commit 47a6235414c2910ad9f540882bc5193ece84c552 diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go index 5a13966b68..b8b5bbb217 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go @@ -76,6 +76,7 @@ func NewStorenodeCycle(logger *zap.Logger, pinger common.Pinger) *StorenodeCycle StorenodeNotWorkingEmitter: NewEmitter[struct{}](), StorenodeAvailableEmitter: NewEmitter[peer.ID](), logger: logger.Named("storenode-cycle"), + pinger: pinger, } } diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index bf70e0101d..c0f9a298a9 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -17,10 +17,13 @@ package wakuv2 int ret; char* msg; size_t len; + void* wg; } Resp; - static void* allocResp() { - return calloc(1, sizeof(Resp)); + static void* allocResp(void* wg) { + Resp* r = calloc(1, sizeof(Resp)); + r->wg = wg; + return r; } static void freeResp(void* resp) { @@ -52,54 +55,46 @@ package wakuv2 Resp* m = (Resp*) resp; return m->ret; } - // resp must be set != NULL in case interest on retrieving data from the callback - static void callback(int ret, char* msg, size_t len, void* resp) { - if (resp != NULL) { - Resp* m = (Resp*) resp; - m->ret = ret; - m->msg = msg; - m->len = len; - } - } + void GoCallback(int ret, char* msg, size_t len, void* resp); #define WAKU_CALL(call) \ do { \ - int ret = call; \ - if (ret != 0) { \ - printf("Failed the call to: %s. Returned code: %d\n", #call, ret); \ - exit(1); \ - } \ + int ret = call; \ + if (ret != 0) { \ + printf("Failed the call to: %s. Returned code: %d\n", #call, ret); \ + exit(1); \ + } \ } while (0) static void* cGoWakuNew(const char* configJson, void* resp) { // We pass NULL because we are not interested in retrieving data from this callback - void* ret = waku_new(configJson, (WakuCallBack) callback, resp); + void* ret = waku_new(configJson, (WakuCallBack) GoCallback, resp); return ret; } static void cGoWakuStart(void* wakuCtx, void* resp) { - WAKU_CALL(waku_start(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL(waku_start(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuStop(void* wakuCtx, void* resp) { - WAKU_CALL(waku_stop(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL(waku_stop(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuDestroy(void* wakuCtx, void* resp) { - WAKU_CALL(waku_destroy(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL(waku_destroy(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuStartDiscV5(void* wakuCtx, void* resp) { - WAKU_CALL(waku_start_discv5(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL(waku_start_discv5(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuStopDiscV5(void* wakuCtx, void* resp) { - WAKU_CALL(waku_stop_discv5(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL(waku_stop_discv5(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuVersion(void* wakuCtx, void* resp) { - WAKU_CALL(waku_version(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL(waku_version(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuSetEventCallback(void* wakuCtx) { @@ -130,16 +125,16 @@ package wakuv2 appVersion, contentTopicName, encoding, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } static void cGoWakuPubsubTopic(void* wakuCtx, char* topicName, void* resp) { - WAKU_CALL( waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) callback, resp) ); + WAKU_CALL( waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuDefaultPubsubTopic(void* wakuCtx, void* resp) { - WAKU_CALL (waku_default_pubsub_topic(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL (waku_default_pubsub_topic(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuRelayPublish(void* wakuCtx, @@ -152,14 +147,14 @@ package wakuv2 pubSubTopic, jsonWakuMessage, timeoutMs, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp)); } static void cGoWakuRelaySubscribe(void* wakuCtx, char* pubSubTopic, void* resp) { WAKU_CALL ( waku_relay_subscribe(wakuCtx, pubSubTopic, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } @@ -167,7 +162,7 @@ package wakuv2 WAKU_CALL ( waku_relay_unsubscribe(wakuCtx, pubSubTopic, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } @@ -175,7 +170,7 @@ package wakuv2 WAKU_CALL( waku_connect(wakuCtx, peerMultiAddr, timeoutMs, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } @@ -189,7 +184,7 @@ package wakuv2 peerMultiAddr, protocol, timeoutMs, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } @@ -203,47 +198,47 @@ package wakuv2 peerId, protocol, timeoutMs, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } static void cGoWakuDisconnectPeerById(void* wakuCtx, char* peerId, void* resp) { WAKU_CALL( waku_disconnect_peer_by_id(wakuCtx, peerId, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } static void cGoWakuListenAddresses(void* wakuCtx, void* resp) { - WAKU_CALL (waku_listen_addresses(wakuCtx, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_listen_addresses(wakuCtx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetMyENR(void* ctx, void* resp) { - WAKU_CALL (waku_get_my_enr(ctx, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_get_my_enr(ctx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetMyPeerId(void* ctx, void* resp) { - WAKU_CALL (waku_get_my_peerid(ctx, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_get_my_peerid(ctx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuPingPeer(void* ctx, char* peerAddr, int timeoutMs, void* resp) { - WAKU_CALL (waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuListPeersInMesh(void* ctx, char* pubSubTopic, void* resp) { - WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetNumConnectedRelayPeers(void* ctx, char* pubSubTopic, void* resp) { - WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetConnectedPeers(void* wakuCtx, void* resp) { - WAKU_CALL (waku_get_connected_peers(wakuCtx, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_get_connected_peers(wakuCtx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetPeerIdsFromPeerStore(void* wakuCtx, void* resp) { - WAKU_CALL (waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuLightpushPublish(void* wakuCtx, @@ -254,7 +249,7 @@ package wakuv2 WAKU_CALL (waku_lightpush_publish(wakuCtx, pubSubTopic, jsonWakuMessage, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp)); } @@ -268,7 +263,7 @@ package wakuv2 jsonQuery, peerAddr, timeoutMs, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp)); } @@ -278,7 +273,7 @@ package wakuv2 WAKU_CALL (waku_peer_exchange_request(wakuCtx, numPeers, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp)); } @@ -288,7 +283,7 @@ package wakuv2 WAKU_CALL (waku_get_peerids_by_protocol(wakuCtx, protocol, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp)); } @@ -302,7 +297,7 @@ package wakuv2 entTreeUrl, nameDnsServer, timeoutMs, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp)); } @@ -2386,172 +2381,74 @@ type response struct { value any } +//export GoCallback +func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { + if resp != nil { + m := (*C.Resp)(resp) + m.ret = ret + m.msg = msg + m.len = len + wg := (*sync.WaitGroup)(m.wg) + wg.Done() + } +} + // WakuNode represents an instance of an nwaku node type WakuNode struct { - wakuCtx unsafe.Pointer - cancel context.CancelFunc - requestCh chan *request -} - -type requestType int - -const ( - requestTypeNew requestType = iota + 1 - requestTypePing - requestTypeStart - requestTypeRelayPublish - requestTypeStoreQuery - requestTypePeerID - requestTypeStop - requestTypeDestroy - requestTypeStartDiscV5 - requestTypeStopDiscV5 - requestTypeVersion - requestTypeRelaySubscribe - requestTypeRelayUnsubscribe - requestTypePeerExchangeRequest - requestTypeConnect - requestTypeDialPeerByID - requestTypeListenAddresses - requestTypeENR - requestTypeListPeersInMesh - requestTypeGetConnectedPeers - requestTypeGetPeerIDsFromPeerStore - requestTypeGetPeerIDsByProtocol - requestTypeDisconnectPeerByID - requestTypeDnsDiscovery - requestTypeDialPeer - requestTypeGetNumConnectedRelayPeers -) - -type request struct { - id string - reqType requestType - input any - responseCh chan response + wakuCtx unsafe.Pointer + cancel context.CancelFunc } func newWakuNode(ctx context.Context, config *WakuConfig) (*WakuNode, error) { ctx, cancel := context.WithCancel(ctx) n := &WakuNode{ - requestCh: make(chan *request), - cancel: cancel, + cancel: cancel, } - // Notice this runs insto a separate goroutine. This is because we can't be sure - // from which OS thread will go call nwaku operations (They need to be done from - // the same thread that started nwaku). Communication with the goroutine to send - // operations to nwaku will be done via channels + wg := sync.WaitGroup{} + wg.Add(1) go func() { defer gocommon.LogOnPanic() runtime.LockOSThread() defer runtime.UnlockOSThread() - C.waku_setup() + wg.Done() - n.processLoop(ctx) + <-ctx.Done() }() - _, err := n.postTask(requestTypeNew, config) + wg.Wait() + + jsonConfig, err := json.Marshal(config) if err != nil { - cancel() return nil, err } - return n, nil -} -func (n *WakuNode) postTask(reqType requestType, input any) (any, error) { - responseCh := make(chan response) - n.requestCh <- &request{ - reqType: reqType, - input: input, - responseCh: responseCh, - } - response := <-responseCh - if response.err != nil { - return nil, response.err - } - return response.value, nil -} - -func (n *WakuNode) processLoop(ctx context.Context) { - for req := range n.requestCh { - switch req.reqType { - case requestTypeNew: - req.responseCh <- response{err: n.newNode(req.input.(*WakuConfig))} - case requestTypePing: - duration, err := n.pingPeer(req.input.(pingRequest)) - req.responseCh <- response{value: duration, err: err} - case requestTypeStart: - req.responseCh <- response{err: n.start()} - case requestTypeRelayPublish: - hash, err := n.relayPublish(req.input.(relayPublishRequest)) - req.responseCh <- response{value: hash, err: err} - case requestTypeStoreQuery: - results, err := n.storeQuery(req.input.(storeQueryRequest)) - req.responseCh <- response{value: results, err: err} - case requestTypeDestroy: - req.responseCh <- response{err: n.destroy()} - case requestTypePeerID: - peerID, err := n.peerID() - req.responseCh <- response{value: peerID, err: err} - case requestTypeStop: - req.responseCh <- response{err: n.stop()} - case requestTypeStartDiscV5: - req.responseCh <- response{err: n.startDiscV5()} - case requestTypeStopDiscV5: - req.responseCh <- response{err: n.stopDiscV5()} - case requestTypeVersion: - version, err := n.version() - req.responseCh <- response{value: version, err: err} - case requestTypePeerExchangeRequest: - numPeers, err := n.peerExchangeRequest(req.input.(uint64)) - req.responseCh <- response{value: numPeers, err: err} - case requestTypeRelaySubscribe: - req.responseCh <- response{err: n.relaySubscribe(req.input.(string))} - case requestTypeRelayUnsubscribe: - req.responseCh <- response{err: n.relayUnsubscribe(req.input.(string))} - case requestTypeConnect: - req.responseCh <- response{err: n.connect(req.input.(connectRequest))} - case requestTypeDialPeerByID: - req.responseCh <- response{err: n.dialPeerById(req.input.(dialPeerByIDRequest))} - case requestTypeListenAddresses: - addrs, err := n.listenAddresses() - req.responseCh <- response{value: addrs, err: err} - case requestTypeENR: - enr, err := n.enr() - req.responseCh <- response{value: enr, err: err} - case requestTypeListPeersInMesh: - numPeers, err := n.listPeersInMesh(req.input.(string)) - req.responseCh <- response{value: numPeers, err: err} - case requestTypeGetConnectedPeers: - peers, err := n.getConnectedPeers() - req.responseCh <- response{value: peers, err: err} - case requestTypeGetPeerIDsFromPeerStore: - peers, err := n.getPeerIDsFromPeerStore() - req.responseCh <- response{value: peers, err: err} - case requestTypeGetPeerIDsByProtocol: - peers, err := n.getPeerIDsByProtocol(req.input.(libp2pproto.ID)) - req.responseCh <- response{value: peers, err: err} - case requestTypeDisconnectPeerByID: - req.responseCh <- response{err: n.disconnectPeerByID(req.input.(peer.ID))} - case requestTypeDnsDiscovery: - addrs, err := n.dnsDiscovery(req.input.(dnsDiscoveryRequest)) - req.responseCh <- response{value: addrs, err: err} - case requestTypeDialPeer: - req.responseCh <- response{err: n.dialPeer(req.input.(dialPeerRequest))} - case requestTypeGetNumConnectedRelayPeers: - numPeers, err := n.getNumConnectedRelayPeers(req.input.([]string)...) - req.responseCh <- response{value: numPeers, err: err} - default: - req.responseCh <- response{err: errors.New("invalid operation")} - } + var cJsonConfig = C.CString(string(jsonConfig)) + + var resp = C.allocResp(unsafe.Pointer(&wg)) + + defer C.free(unsafe.Pointer(cJsonConfig)) + defer C.freeResp(resp) + + if C.getRet(resp) != C.RET_OK { + errMsg := "error wakuNew: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return nil, errors.New(errMsg) } + + wg.Add(1) + n.wakuCtx = C.cGoWakuNew(cJsonConfig, resp) + wg.Wait() + + // Notice that the events for self node are handled by the 'MyEventCallback' method + C.cGoWakuSetEventCallback(n.wakuCtx) + + return n, nil } -func (n *WakuNode) getNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) { +func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) { var pubsubTopic string if len(optPubsubTopic) == 0 { pubsubTopic = "" @@ -2559,9 +2456,12 @@ func (n *WakuNode) getNumConnectedRelayPeers(optPubsubTopic ...string) (int, err pubsubTopic = optPubsubTopic[0] } - var resp = C.allocResp() - var cPubsubTopic = C.CString(pubsubTopic) + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + + var cPubsubTopic = C.CString(pubsubTopic) defer C.free(unsafe.Pointer(cPubsubTopic)) C.cGoWakuGetNumConnectedRelayPeers(n.wakuCtx, cPubsubTopic, resp) @@ -2580,13 +2480,17 @@ func (n *WakuNode) getNumConnectedRelayPeers(optPubsubTopic ...string) (int, err return 0, errors.New(errMsg) } -func (n *WakuNode) disconnectPeerByID(peerID peer.ID) error { - var resp = C.allocResp() +func (n *WakuNode) DisconnectPeerByID(peerID peer.ID) error { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) var cPeerId = C.CString(peerID.String()) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPeerId)) + wg.Add(1) C.cGoWakuDisconnectPeerById(n.wakuCtx, cPeerId, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil @@ -2595,10 +2499,16 @@ func (n *WakuNode) disconnectPeerByID(peerID peer.ID) error { return errors.New(errMsg) } -func (n *WakuNode) getConnectedPeers() (peer.IDSlice, error) { - var resp = C.allocResp() +func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + + wg.Add(1) C.cGoWakuGetConnectedPeers(n.wakuCtx, resp) + wg.Wait() + if C.getRet(resp) == C.RET_OK { peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) if peersStr == "" { @@ -2621,12 +2531,14 @@ func (n *WakuNode) getConnectedPeers() (peer.IDSlice, error) { } -func (n *WakuNode) relaySubscribe(pubsubTopic string) error { +func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { if pubsubTopic == "" { return errors.New("pubsub topic is empty") } - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) var cPubsubTopic = C.CString(pubsubTopic) defer C.freeResp(resp) @@ -2636,7 +2548,9 @@ func (n *WakuNode) relaySubscribe(pubsubTopic string) error { return errors.New("wakuCtx is nil") } + wg.Add(1) C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil @@ -2646,12 +2560,14 @@ func (n *WakuNode) relaySubscribe(pubsubTopic string) error { return errors.New(errMsg) } -func (n *WakuNode) relayUnsubscribe(pubsubTopic string) error { +func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error { if pubsubTopic == "" { return errors.New("pubsub topic is empty") } - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) var cPubsubTopic = C.CString(pubsubTopic) defer C.freeResp(resp) @@ -2661,7 +2577,9 @@ func (n *WakuNode) relayUnsubscribe(pubsubTopic string) error { return errors.New("wakuCtx is nil") } + wg.Add(1) C.cGoWakuRelayUnsubscribe(n.wakuCtx, cPubsubTopic, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil @@ -2671,11 +2589,15 @@ func (n *WakuNode) relayUnsubscribe(pubsubTopic string) error { return errors.New(errMsg) } -func (n *WakuNode) peerExchangeRequest(numPeers uint64) (uint64, error) { - var resp = C.allocResp() +func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuPeerExchangeQuery(n.wakuCtx, C.uint64_t(numPeers), resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { numRecvPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) numRecvPeers, err := strconv.ParseUint(numRecvPeersStr, 10, 64) @@ -2689,11 +2611,15 @@ func (n *WakuNode) peerExchangeRequest(numPeers uint64) (uint64, error) { return 0, errors.New(errMsg) } -func (n *WakuNode) startDiscV5() error { - var resp = C.allocResp() +func (n *WakuNode) StartDiscV5() error { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) - C.cGoWakuStartDiscV5(n.wakuCtx, resp) + wg.Add(1) + C.cGoWakuStartDiscV5(n.wakuCtx, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } @@ -2701,10 +2627,15 @@ func (n *WakuNode) startDiscV5() error { return errors.New(errMsg) } -func (n *WakuNode) stopDiscV5() error { - var resp = C.allocResp() +func (n *WakuNode) StopDiscV5() error { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + + wg.Add(1) C.cGoWakuStopDiscV5(n.wakuCtx, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil @@ -2713,11 +2644,15 @@ func (n *WakuNode) stopDiscV5() error { return errors.New(errMsg) } -func (n *WakuNode) version() (string, error) { - var resp = C.allocResp() +func (n *WakuNode) Version() (string, error) { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuVersion(n.wakuCtx, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { var version = C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) @@ -2729,29 +2664,34 @@ func (n *WakuNode) version() (string, error) { return "", errors.New(errMsg) } -func (n *WakuNode) storeQuery(storeQueryRequest storeQueryRequest) (any, error) { +func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQueryRequest, peerInfo peer.AddrInfo) (*storepb.StoreQueryResponse, error) { // TODO: extract timeout from context timeoutMs := time.Minute.Milliseconds() - b, err := json.Marshal(storeQueryRequest.storeRequest) + b, err := json.Marshal(storeRequest) if err != nil { return nil, err } - addrs := make([]string, len(storeQueryRequest.peerInfo.Addrs)) - for i, addr := range utils.EncapsulatePeerID(storeQueryRequest.peerInfo.ID, storeQueryRequest.peerInfo.Addrs...) { + addrs := make([]string, len(peerInfo.Addrs)) + for i, addr := range utils.EncapsulatePeerID(peerInfo.ID, peerInfo.Addrs...) { addrs[i] = addr.String() } var cJsonQuery = C.CString(string(b)) var cPeerAddr = C.CString(strings.Join(addrs, ",")) - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.free(unsafe.Pointer(cJsonQuery)) defer C.free(unsafe.Pointer(cPeerAddr)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuStoreQuery(n.wakuCtx, cJsonQuery, cPeerAddr, C.int(timeoutMs), resp) + wg.Wait() + if C.getRet(resp) == C.RET_OK { jsonResponseStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) storeQueryResponse := &storepb.StoreQueryResponse{} @@ -2766,24 +2706,27 @@ func (n *WakuNode) storeQuery(storeQueryRequest storeQueryRequest) (any, error) return nil, errors.New(errMsg) } -func (n *WakuNode) relayPublish(relayPublishRequest relayPublishRequest) (pb.MessageHash, error) { +func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) { // TODO: extract timeout from context timeoutMs := time.Minute.Milliseconds() - jsonMsg, err := json.Marshal(relayPublishRequest.message) + jsonMsg, err := json.Marshal(message) if err != nil { return pb.MessageHash{}, err } - var cPubsubTopic = C.CString(relayPublishRequest.pubsubTopic) - var msg = C.CString(string(jsonMsg)) - var resp = C.allocResp() + wg := sync.WaitGroup{} + var resp = C.allocResp(unsafe.Pointer(&wg)) + var cPubsubTopic = C.CString(pubsubTopic) + var msg = C.CString(string(jsonMsg)) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPubsubTopic)) defer C.free(unsafe.Pointer(msg)) + wg.Add(1) C.cGoWakuRelayPublish(n.wakuCtx, cPubsubTopic, msg, C.int(timeoutMs), resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { msgHash := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) msgHashBytes, err := hexutil.Decode(msgHash) @@ -2796,15 +2739,20 @@ func (n *WakuNode) relayPublish(relayPublishRequest relayPublishRequest) (pb.Mes return pb.MessageHash{}, errors.New(errMsg) } -func (n *WakuNode) dnsDiscovery(dnsDiscRequest dnsDiscoveryRequest) ([]multiaddr.Multiaddr, error) { - var resp = C.allocResp() - var cEnrTree = C.CString(dnsDiscRequest.enrTreeUrl) - var cDnsServer = C.CString(dnsDiscRequest.nameDnsServer) +func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) + var cEnrTree = C.CString(enrTreeUrl) + var cDnsServer = C.CString(nameDnsServer) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cEnrTree)) defer C.free(unsafe.Pointer(cDnsServer)) + // TODO: extract timeout from context + wg.Add(1) C.cGoWakuDnsDiscovery(n.wakuCtx, cEnrTree, cDnsServer, C.int(time.Minute.Milliseconds()), resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { var addrsRet []multiaddr.Multiaddr nodeAddresses := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) @@ -2822,21 +2770,24 @@ func (n *WakuNode) dnsDiscovery(dnsDiscRequest dnsDiscoveryRequest) ([]multiaddr return nil, errors.New(errMsg) } -func (n *WakuNode) pingPeer(request pingRequest) (time.Duration, error) { - peerInfo := request.peerInfo - +func (n *WakuNode) PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.Duration, error) { addrs := make([]string, len(peerInfo.Addrs)) for i, addr := range utils.EncapsulatePeerID(peerInfo.ID, peerInfo.Addrs...) { addrs[i] = addr.String() } - var resp = C.allocResp() - var cPeerId = C.CString(strings.Join(addrs, ",")) + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + + var cPeerId = C.CString(strings.Join(addrs, ",")) defer C.free(unsafe.Pointer(cPeerId)) // TODO: extract timeout from ctx + wg.Add(1) C.cGoWakuPingPeer(n.wakuCtx, cPeerId, C.int(time.Minute.Milliseconds()), resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { rttStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) rttInt, err := strconv.ParseInt(rttStr, 10, 64) @@ -2850,12 +2801,15 @@ func (n *WakuNode) pingPeer(request pingRequest) (time.Duration, error) { return 0, fmt.Errorf("PingPeer: %s", errMsg) } -func (n *WakuNode) start() error { - var resp = C.allocResp() +func (n *WakuNode) Start() error { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuStart(n.wakuCtx, resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } @@ -2864,12 +2818,15 @@ func (n *WakuNode) start() error { return errors.New(errMsg) } -func (n *WakuNode) stop() error { - var resp = C.allocResp() +func (n *WakuNode) Stop() error { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuStop(n.wakuCtx, resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } @@ -2878,12 +2835,15 @@ func (n *WakuNode) stop() error { return errors.New(errMsg) } -func (n *WakuNode) destroy() error { - var resp = C.allocResp() +func (n *WakuNode) Destroy() error { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuDestroy(n.wakuCtx, resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } @@ -2892,12 +2852,15 @@ func (n *WakuNode) destroy() error { return errors.New(errMsg) } -func (n *WakuNode) peerID() (peer.ID, error) { - var resp = C.allocResp() +func (n *WakuNode) PeerID() (peer.ID, error) { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuGetMyPeerId(n.wakuCtx, resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { peerIdStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) id, err := peer.Decode(peerIdStr) @@ -2911,15 +2874,18 @@ func (n *WakuNode) peerID() (peer.ID, error) { return "", errors.New(errMsg) } -func (n *WakuNode) connect(connReq connectRequest) error { - var resp = C.allocResp() - var cPeerMultiAddr = C.CString(connReq.addr.String()) +func (n *WakuNode) Connect(ctx context.Context, addr multiaddr.Multiaddr) error { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) + var cPeerMultiAddr = C.CString(addr.String()) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPeerMultiAddr)) // TODO: extract timeout from ctx + wg.Add(1) C.cGoWakuConnect(n.wakuCtx, cPeerMultiAddr, C.int(time.Minute.Milliseconds()), resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } @@ -2928,17 +2894,20 @@ func (n *WakuNode) connect(connReq connectRequest) error { return errors.New(errMsg) } -func (n *WakuNode) dialPeerById(dialPeerByIDReq dialPeerByIDRequest) error { - var resp = C.allocResp() - var cPeerId = C.CString(dialPeerByIDReq.peerID.String()) - var cProtocol = C.CString(string(dialPeerByIDReq.protocol)) +func (n *WakuNode) DialPeerByID(ctx context.Context, peerID peer.ID, protocol libp2pproto.ID) error { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) + var cPeerId = C.CString(peerID.String()) + var cProtocol = C.CString(string(protocol)) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPeerId)) defer C.free(unsafe.Pointer(cProtocol)) // TODO: extract timeout from ctx + wg.Add(1) C.cGoWakuDialPeerById(n.wakuCtx, cPeerId, cProtocol, C.int(time.Minute.Milliseconds()), resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } @@ -2947,11 +2916,15 @@ func (n *WakuNode) dialPeerById(dialPeerByIDReq dialPeerByIDRequest) error { return errors.New(errMsg) } -func (n *WakuNode) listenAddresses() ([]multiaddr.Multiaddr, error) { - var resp = C.allocResp() +func (n *WakuNode) ListenAddresses() ([]multiaddr.Multiaddr, error) { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) - C.cGoWakuListenAddresses(n.wakuCtx, resp) + wg.Add(1) + C.cGoWakuListenAddresses(n.wakuCtx, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { var addrsRet []multiaddr.Multiaddr listenAddresses := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) @@ -2969,11 +2942,15 @@ func (n *WakuNode) listenAddresses() ([]multiaddr.Multiaddr, error) { return nil, errors.New(errMsg) } -func (n *WakuNode) enr() (*enode.Node, error) { - var resp = C.allocResp() +func (n *WakuNode) ENR() (*enode.Node, error) { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) - C.cGoWakuGetMyENR(n.wakuCtx, resp) + wg.Add(1) + C.cGoWakuGetMyENR(n.wakuCtx, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { enrStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) n, err := enode.Parse(enode.ValidSchemes, enrStr) @@ -2987,14 +2964,17 @@ func (n *WakuNode) enr() (*enode.Node, error) { return nil, errors.New(errMsg) } -func (n *WakuNode) listPeersInMesh(pubsubTopic string) (int, error) { - var resp = C.allocResp() +func (n *WakuNode) ListPeersInMesh(pubsubTopic string) (int, error) { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) var cPubsubTopic = C.CString(pubsubTopic) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPubsubTopic)) + wg.Add(1) C.cGoWakuListPeersInMesh(n.wakuCtx, cPubsubTopic, resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) numPeers, err := strconv.Atoi(numPeersStr) @@ -3009,11 +2989,15 @@ func (n *WakuNode) listPeersInMesh(pubsubTopic string) (int, error) { return 0, errors.New(errMsg) } -func (n *WakuNode) getPeerIDsFromPeerStore() (peer.IDSlice, error) { - var resp = C.allocResp() +func (n *WakuNode) GetPeerIDsFromPeerStore() (peer.IDSlice, error) { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) - C.cGoWakuGetPeerIdsFromPeerStore(n.wakuCtx, resp) + wg.Add(1) + C.cGoWakuGetPeerIdsFromPeerStore(n.wakuCtx, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) if peersStr == "" { @@ -3036,14 +3020,17 @@ func (n *WakuNode) getPeerIDsFromPeerStore() (peer.IDSlice, error) { return nil, fmt.Errorf("GetPeerIdsFromPeerStore: %s", errMsg) } -func (n *WakuNode) getPeerIDsByProtocol(protocolID libp2pproto.ID) (peer.IDSlice, error) { - var resp = C.allocResp() - var cProtocol = C.CString(string(protocolID)) +func (n *WakuNode) GetPeerIDsByProtocol(protocol libp2pproto.ID) (peer.IDSlice, error) { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) + var cProtocol = C.CString(string(protocol)) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cProtocol)) + wg.Add(1) C.cGoWakuGetPeerIdsByProtocol(n.wakuCtx, cProtocol, resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) if peersStr == "" { @@ -3067,41 +3054,19 @@ func (n *WakuNode) getPeerIDsByProtocol(protocolID libp2pproto.ID) (peer.IDSlice return nil, fmt.Errorf("GetPeerIdsByProtocol: %s", errMsg) } -func (n *WakuNode) newNode(config *WakuConfig) error { - jsonConfig, err := json.Marshal(config) - if err != nil { - return err - } - - var cJsonConfig = C.CString(string(jsonConfig)) - var resp = C.allocResp() - - defer C.free(unsafe.Pointer(cJsonConfig)) - defer C.freeResp(resp) - - if C.getRet(resp) != C.RET_OK { - errMsg := "error wakuNew: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) - } - - wakuCtx := C.cGoWakuNew(cJsonConfig, resp) - n.wakuCtx = unsafe.Pointer(wakuCtx) - - // Notice that the events for self node are handled by the 'MyEventCallback' method - C.cGoWakuSetEventCallback(n.wakuCtx) - - return nil -} +func (n *WakuNode) DialPeer(ctx context.Context, peerAddr multiaddr.Multiaddr, protocol libp2pproto.ID) error { + wg := sync.WaitGroup{} -func (n *WakuNode) dialPeer(dialPeerReq dialPeerRequest) error { - var resp = C.allocResp() - var cPeerMultiAddr = C.CString(dialPeerReq.peerAddr.String()) - var cProtocol = C.CString(string(dialPeerReq.protocol)) + var resp = C.allocResp(unsafe.Pointer(&wg)) + var cPeerMultiAddr = C.CString(peerAddr.String()) + var cProtocol = C.CString(string(protocol)) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPeerMultiAddr)) defer C.free(unsafe.Pointer(cProtocol)) // TODO: extract timeout from context + wg.Add(1) C.cGoWakuDialPeer(n.wakuCtx, cPeerMultiAddr, cProtocol, C.int(requestTimeout.Milliseconds()), resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } @@ -3109,177 +3074,6 @@ func (n *WakuNode) dialPeer(dialPeerReq dialPeerRequest) error { return errors.New(errMsg) } -type pingRequest struct { - ctx context.Context - peerInfo peer.AddrInfo -} - -func (n *WakuNode) PingPeer(ctx context.Context, info peer.AddrInfo) (time.Duration, error) { - response, err := n.postTask(requestTypePing, pingRequest{ - ctx: ctx, - peerInfo: info, - }) - if err != nil { - return 0, err - } - return response.(time.Duration), nil -} - -func (n *WakuNode) Start() error { - _, err := n.postTask(requestTypeStart, nil) - return err -} - -type relayPublishRequest struct { - ctx context.Context - pubsubTopic string - message *pb.WakuMessage -} - -func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) { - response, err := n.postTask(requestTypeRelayPublish, relayPublishRequest{ - ctx: ctx, - pubsubTopic: pubsubTopic, - message: message, - }) - if err != nil { - return pb.MessageHash{}, err - } - return response.(pb.MessageHash), nil -} - -type storeQueryRequest struct { - ctx context.Context - storeRequest *storepb.StoreQueryRequest - peerInfo peer.AddrInfo -} - -func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQueryRequest, peerInfo peer.AddrInfo) (*storepb.StoreQueryResponse, error) { - response, err := n.postTask(requestTypeStoreQuery, storeQueryRequest{ - ctx: ctx, - peerInfo: peerInfo, - storeRequest: storeRequest, - }) - if err != nil { - return nil, err - } - return response.(*storepb.StoreQueryResponse), nil -} - -func (n *WakuNode) PeerID() (peer.ID, error) { - response, err := n.postTask(requestTypePeerID, nil) - if err != nil { - return "", err - } - return response.(peer.ID), nil -} - -func (n *WakuNode) Stop() error { - _, err := n.postTask(requestTypeStop, nil) - return err -} - -func (n *WakuNode) Destroy() error { - _, err := n.postTask(requestTypeDestroy, nil) - return err -} - -func (n *WakuNode) StartDiscV5() error { - _, err := n.postTask(requestTypeStartDiscV5, nil) - return err -} - -func (n *WakuNode) StopDiscV5() error { - _, err := n.postTask(requestTypeStopDiscV5, nil) - return err -} - -func (n *WakuNode) Version() (string, error) { - response, err := n.postTask(requestTypeVersion, nil) - if err != nil { - return "", err - } - return response.(string), nil -} - -func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { - _, err := n.postTask(requestTypeRelaySubscribe, pubsubTopic) - return err -} - -func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error { - _, err := n.postTask(requestTypeRelayUnsubscribe, pubsubTopic) - return err -} - -func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) { - response, err := n.postTask(requestTypePeerExchangeRequest, numPeers) - if err != nil { - return 0, err - } - return response.(uint64), nil -} - -type connectRequest struct { - ctx context.Context - addr multiaddr.Multiaddr -} - -func (n *WakuNode) Connect(ctx context.Context, addr multiaddr.Multiaddr) error { - _, err := n.postTask(requestTypeConnect, connectRequest{ - ctx: ctx, - addr: addr, - }) - return err -} - -type dialPeerByIDRequest struct { - ctx context.Context - peerID peer.ID - protocol libp2pproto.ID -} - -func (n *WakuNode) DialPeerByID(ctx context.Context, peerID peer.ID, protocol libp2pproto.ID) error { - _, err := n.postTask(requestTypeDialPeerByID, dialPeerByIDRequest{ - ctx: ctx, - peerID: peerID, - protocol: protocol, - }) - return err -} - -func (n *WakuNode) ListenAddresses() ([]multiaddr.Multiaddr, error) { - response, err := n.postTask(requestTypeListenAddresses, nil) - if err != nil { - return nil, err - } - return response.([]multiaddr.Multiaddr), nil -} - -func (n *WakuNode) ENR() (*enode.Node, error) { - response, err := n.postTask(requestTypeENR, nil) - if err != nil { - return nil, err - } - return response.(*enode.Node), nil -} - -func (n *WakuNode) ListPeersInMesh(pubsubTopic string) (int, error) { - response, err := n.postTask(requestTypeListPeersInMesh, pubsubTopic) - if err != nil { - return 0, err - } - return response.(int), nil -} - -func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { - response, err := n.postTask(requestTypeGetConnectedPeers, nil) - if err != nil { - return nil, err - } - return response.(peer.IDSlice), nil -} - func (n *WakuNode) GetNumConnectedPeers() (int, error) { peers, err := n.GetConnectedPeers() if err != nil { @@ -3287,65 +3081,3 @@ func (n *WakuNode) GetNumConnectedPeers() (int, error) { } return len(peers), nil } - -func (n *WakuNode) GetPeerIDsFromPeerStore() (peer.IDSlice, error) { - response, err := n.postTask(requestTypeGetPeerIDsFromPeerStore, nil) - if err != nil { - return nil, err - } - return response.(peer.IDSlice), nil -} - -func (n *WakuNode) GetPeerIDsByProtocol(protocol libp2pproto.ID) (peer.IDSlice, error) { - response, err := n.postTask(requestTypeGetPeerIDsByProtocol, protocol) - if err != nil { - return nil, err - } - return response.(peer.IDSlice), nil -} - -func (n *WakuNode) DisconnectPeerByID(peerID peer.ID) error { - _, err := n.postTask(requestTypeDisconnectPeerByID, peerID) - return err -} - -type dnsDiscoveryRequest struct { - ctx context.Context - enrTreeUrl string - nameDnsServer string -} - -func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) { - response, err := n.postTask(requestTypeDnsDiscovery, dnsDiscoveryRequest{ - ctx: ctx, - enrTreeUrl: enrTreeUrl, - nameDnsServer: nameDnsServer, - }) - if err != nil { - return nil, err - } - return response.([]multiaddr.Multiaddr), nil -} - -type dialPeerRequest struct { - ctx context.Context - peerAddr multiaddr.Multiaddr - protocol libp2pproto.ID -} - -func (n *WakuNode) DialPeer(ctx context.Context, peerAddr multiaddr.Multiaddr, protocol libp2pproto.ID) error { - _, err := n.postTask(requestTypeDialPeer, dialPeerRequest{ - ctx: ctx, - peerAddr: peerAddr, - protocol: protocol, - }) - return err -} - -func (n *WakuNode) GetNumConnectedRelayPeers(paramPubsubTopic ...string) (int, error) { - response, err := n.postTask(requestTypeGetNumConnectedRelayPeers, paramPubsubTopic) - if err != nil { - return 0, err - } - return response.(int), nil -} diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index b57c8fa2c1..24b07fe958 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -192,6 +192,8 @@ func TestBasicWakuV2(t *testing.T) { storeNodeInfo, err := GetNwakuInfo(nil, &extNodeRestPort) require.NoError(t, err) + ctx := context.Background() + wakuConfig := Config{ UseThrottledPublish: true, ClusterID: 16, @@ -240,9 +242,7 @@ func TestBasicWakuV2(t *testing.T) { storeNode, err := peer.AddrInfoFromString(storeNodeInfo.ListenAddresses[0]) require.NoError(t, err) - for i := 0; i <= 100; i++ { - time.Sleep(2 * time.Second) - } + w.node.DialPeer(ctx, storeNode.Addrs[0], "") w.StorenodeCycle.SetStorenodeConfigProvider(newTestStorenodeConfigProvider(*storeNode))