From 017c527a043ed7ed13bd1bd09ab4a546af3339d3 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 28 Nov 2024 13:55:30 -0400 Subject: [PATCH 1/2] feat_: async nwaku --- third_party/nwaku | 2 +- wakuv2/nwaku.go | 426 +++++++++++++++++++++++++++---------------- wakuv2/nwaku_test.go | 6 +- 3 files changed, 273 insertions(+), 161 deletions(-) diff --git a/third_party/nwaku b/third_party/nwaku index 507b1fc4d9..03431a9f30 160000 --- a/third_party/nwaku +++ b/third_party/nwaku @@ -1 +1 @@ -Subproject commit 507b1fc4d97a01ee5695a205f7f981bd4accc694 +Subproject commit 03431a9f3083971bffd266f455431760e0b70e3a diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index bf70e0101d..fe1592e8f4 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -17,10 +17,13 @@ package wakuv2 int ret; char* msg; size_t len; + void* wg; } Resp; - static void* allocResp() { - return calloc(1, sizeof(Resp)); + static void* allocResp(void* wg) { + Resp* r = calloc(1, sizeof(Resp)); + r->wg = wg; + return r; } static void freeResp(void* resp) { @@ -52,54 +55,46 @@ package wakuv2 Resp* m = (Resp*) resp; return m->ret; } - // resp must be set != NULL in case interest on retrieving data from the callback - static void callback(int ret, char* msg, size_t len, void* resp) { - if (resp != NULL) { - Resp* m = (Resp*) resp; - m->ret = ret; - m->msg = msg; - m->len = len; - } - } + void GoCallback(int ret, char* msg, size_t len, void* resp); #define WAKU_CALL(call) \ do { \ - int ret = call; \ - if (ret != 0) { \ - printf("Failed the call to: %s. Returned code: %d\n", #call, ret); \ - exit(1); \ - } \ + int ret = call; \ + if (ret != 0) { \ + printf("Failed the call to: %s. Returned code: %d\n", #call, ret); \ + exit(1); \ + } \ } while (0) static void* cGoWakuNew(const char* configJson, void* resp) { // We pass NULL because we are not interested in retrieving data from this callback - void* ret = waku_new(configJson, (WakuCallBack) callback, resp); + void* ret = waku_new(configJson, (WakuCallBack) GoCallback, resp); return ret; } static void cGoWakuStart(void* wakuCtx, void* resp) { - WAKU_CALL(waku_start(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL(waku_start(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuStop(void* wakuCtx, void* resp) { - WAKU_CALL(waku_stop(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL(waku_stop(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuDestroy(void* wakuCtx, void* resp) { - WAKU_CALL(waku_destroy(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL(waku_destroy(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuStartDiscV5(void* wakuCtx, void* resp) { - WAKU_CALL(waku_start_discv5(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL(waku_start_discv5(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuStopDiscV5(void* wakuCtx, void* resp) { - WAKU_CALL(waku_stop_discv5(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL(waku_stop_discv5(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuVersion(void* wakuCtx, void* resp) { - WAKU_CALL(waku_version(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL(waku_version(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuSetEventCallback(void* wakuCtx) { @@ -130,16 +125,16 @@ package wakuv2 appVersion, contentTopicName, encoding, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } static void cGoWakuPubsubTopic(void* wakuCtx, char* topicName, void* resp) { - WAKU_CALL( waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) callback, resp) ); + WAKU_CALL( waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuDefaultPubsubTopic(void* wakuCtx, void* resp) { - WAKU_CALL (waku_default_pubsub_topic(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL (waku_default_pubsub_topic(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuRelayPublish(void* wakuCtx, @@ -152,14 +147,14 @@ package wakuv2 pubSubTopic, jsonWakuMessage, timeoutMs, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp)); } static void cGoWakuRelaySubscribe(void* wakuCtx, char* pubSubTopic, void* resp) { WAKU_CALL ( waku_relay_subscribe(wakuCtx, pubSubTopic, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } @@ -167,7 +162,7 @@ package wakuv2 WAKU_CALL ( waku_relay_unsubscribe(wakuCtx, pubSubTopic, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } @@ -175,7 +170,7 @@ package wakuv2 WAKU_CALL( waku_connect(wakuCtx, peerMultiAddr, timeoutMs, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } @@ -189,7 +184,7 @@ package wakuv2 peerMultiAddr, protocol, timeoutMs, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } @@ -203,47 +198,47 @@ package wakuv2 peerId, protocol, timeoutMs, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } static void cGoWakuDisconnectPeerById(void* wakuCtx, char* peerId, void* resp) { WAKU_CALL( waku_disconnect_peer_by_id(wakuCtx, peerId, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } static void cGoWakuListenAddresses(void* wakuCtx, void* resp) { - WAKU_CALL (waku_listen_addresses(wakuCtx, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_listen_addresses(wakuCtx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetMyENR(void* ctx, void* resp) { - WAKU_CALL (waku_get_my_enr(ctx, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_get_my_enr(ctx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetMyPeerId(void* ctx, void* resp) { - WAKU_CALL (waku_get_my_peerid(ctx, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_get_my_peerid(ctx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuPingPeer(void* ctx, char* peerAddr, int timeoutMs, void* resp) { - WAKU_CALL (waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuListPeersInMesh(void* ctx, char* pubSubTopic, void* resp) { - WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetNumConnectedRelayPeers(void* ctx, char* pubSubTopic, void* resp) { - WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetConnectedPeers(void* wakuCtx, void* resp) { - WAKU_CALL (waku_get_connected_peers(wakuCtx, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_get_connected_peers(wakuCtx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetPeerIdsFromPeerStore(void* wakuCtx, void* resp) { - WAKU_CALL (waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuLightpushPublish(void* wakuCtx, @@ -254,7 +249,7 @@ package wakuv2 WAKU_CALL (waku_lightpush_publish(wakuCtx, pubSubTopic, jsonWakuMessage, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp)); } @@ -268,7 +263,7 @@ package wakuv2 jsonQuery, peerAddr, timeoutMs, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp)); } @@ -278,7 +273,7 @@ package wakuv2 WAKU_CALL (waku_peer_exchange_request(wakuCtx, numPeers, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp)); } @@ -288,7 +283,7 @@ package wakuv2 WAKU_CALL (waku_get_peerids_by_protocol(wakuCtx, protocol, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp)); } @@ -302,7 +297,7 @@ package wakuv2 entTreeUrl, nameDnsServer, timeoutMs, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp)); } @@ -2386,6 +2381,18 @@ type response struct { value any } +//export GoCallback +func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { + if resp != nil { + m := (*C.Resp)(resp) + m.ret = ret + m.msg = msg + m.len = len + wg := (*sync.WaitGroup)(m.wg) + wg.Done() + } +} + // WakuNode represents an instance of an nwaku node type WakuNode struct { wakuCtx unsafe.Pointer @@ -2477,76 +2484,81 @@ func (n *WakuNode) postTask(reqType requestType, input any) (any, error) { } func (n *WakuNode) processLoop(ctx context.Context) { - for req := range n.requestCh { - switch req.reqType { - case requestTypeNew: - req.responseCh <- response{err: n.newNode(req.input.(*WakuConfig))} - case requestTypePing: - duration, err := n.pingPeer(req.input.(pingRequest)) - req.responseCh <- response{value: duration, err: err} - case requestTypeStart: - req.responseCh <- response{err: n.start()} - case requestTypeRelayPublish: - hash, err := n.relayPublish(req.input.(relayPublishRequest)) - req.responseCh <- response{value: hash, err: err} - case requestTypeStoreQuery: - results, err := n.storeQuery(req.input.(storeQueryRequest)) - req.responseCh <- response{value: results, err: err} - case requestTypeDestroy: - req.responseCh <- response{err: n.destroy()} - case requestTypePeerID: - peerID, err := n.peerID() - req.responseCh <- response{value: peerID, err: err} - case requestTypeStop: - req.responseCh <- response{err: n.stop()} - case requestTypeStartDiscV5: - req.responseCh <- response{err: n.startDiscV5()} - case requestTypeStopDiscV5: - req.responseCh <- response{err: n.stopDiscV5()} - case requestTypeVersion: - version, err := n.version() - req.responseCh <- response{value: version, err: err} - case requestTypePeerExchangeRequest: - numPeers, err := n.peerExchangeRequest(req.input.(uint64)) - req.responseCh <- response{value: numPeers, err: err} - case requestTypeRelaySubscribe: - req.responseCh <- response{err: n.relaySubscribe(req.input.(string))} - case requestTypeRelayUnsubscribe: - req.responseCh <- response{err: n.relayUnsubscribe(req.input.(string))} - case requestTypeConnect: - req.responseCh <- response{err: n.connect(req.input.(connectRequest))} - case requestTypeDialPeerByID: - req.responseCh <- response{err: n.dialPeerById(req.input.(dialPeerByIDRequest))} - case requestTypeListenAddresses: - addrs, err := n.listenAddresses() - req.responseCh <- response{value: addrs, err: err} - case requestTypeENR: - enr, err := n.enr() - req.responseCh <- response{value: enr, err: err} - case requestTypeListPeersInMesh: - numPeers, err := n.listPeersInMesh(req.input.(string)) - req.responseCh <- response{value: numPeers, err: err} - case requestTypeGetConnectedPeers: - peers, err := n.getConnectedPeers() - req.responseCh <- response{value: peers, err: err} - case requestTypeGetPeerIDsFromPeerStore: - peers, err := n.getPeerIDsFromPeerStore() - req.responseCh <- response{value: peers, err: err} - case requestTypeGetPeerIDsByProtocol: - peers, err := n.getPeerIDsByProtocol(req.input.(libp2pproto.ID)) - req.responseCh <- response{value: peers, err: err} - case requestTypeDisconnectPeerByID: - req.responseCh <- response{err: n.disconnectPeerByID(req.input.(peer.ID))} - case requestTypeDnsDiscovery: - addrs, err := n.dnsDiscovery(req.input.(dnsDiscoveryRequest)) - req.responseCh <- response{value: addrs, err: err} - case requestTypeDialPeer: - req.responseCh <- response{err: n.dialPeer(req.input.(dialPeerRequest))} - case requestTypeGetNumConnectedRelayPeers: - numPeers, err := n.getNumConnectedRelayPeers(req.input.([]string)...) - req.responseCh <- response{value: numPeers, err: err} - default: - req.responseCh <- response{err: errors.New("invalid operation")} + for { + select { + case <-ctx.Done(): + return + case req := <-n.requestCh: + switch req.reqType { + case requestTypeNew: + req.responseCh <- response{err: n.newNode(req.input.(*WakuConfig))} + case requestTypePing: + duration, err := n.pingPeer(req.input.(pingRequest)) + req.responseCh <- response{value: duration, err: err} + case requestTypeStart: + req.responseCh <- response{err: n.start()} + case requestTypeRelayPublish: + hash, err := n.relayPublish(req.input.(relayPublishRequest)) + req.responseCh <- response{value: hash, err: err} + case requestTypeStoreQuery: + results, err := n.storeQuery(req.input.(storeQueryRequest)) + req.responseCh <- response{value: results, err: err} + case requestTypeDestroy: + req.responseCh <- response{err: n.destroy()} + case requestTypePeerID: + peerID, err := n.peerID() + req.responseCh <- response{value: peerID, err: err} + case requestTypeStop: + req.responseCh <- response{err: n.stop()} + case requestTypeStartDiscV5: + req.responseCh <- response{err: n.startDiscV5()} + case requestTypeStopDiscV5: + req.responseCh <- response{err: n.stopDiscV5()} + case requestTypeVersion: + version, err := n.version() + req.responseCh <- response{value: version, err: err} + case requestTypePeerExchangeRequest: + numPeers, err := n.peerExchangeRequest(req.input.(uint64)) + req.responseCh <- response{value: numPeers, err: err} + case requestTypeRelaySubscribe: + req.responseCh <- response{err: n.relaySubscribe(req.input.(string))} + case requestTypeRelayUnsubscribe: + req.responseCh <- response{err: n.relayUnsubscribe(req.input.(string))} + case requestTypeConnect: + req.responseCh <- response{err: n.connect(req.input.(connectRequest))} + case requestTypeDialPeerByID: + req.responseCh <- response{err: n.dialPeerById(req.input.(dialPeerByIDRequest))} + case requestTypeListenAddresses: + addrs, err := n.listenAddresses() + req.responseCh <- response{value: addrs, err: err} + case requestTypeENR: + enr, err := n.enr() + req.responseCh <- response{value: enr, err: err} + case requestTypeListPeersInMesh: + numPeers, err := n.listPeersInMesh(req.input.(string)) + req.responseCh <- response{value: numPeers, err: err} + case requestTypeGetConnectedPeers: + peers, err := n.getConnectedPeers() + req.responseCh <- response{value: peers, err: err} + case requestTypeGetPeerIDsFromPeerStore: + peers, err := n.getPeerIDsFromPeerStore() + req.responseCh <- response{value: peers, err: err} + case requestTypeGetPeerIDsByProtocol: + peers, err := n.getPeerIDsByProtocol(req.input.(libp2pproto.ID)) + req.responseCh <- response{value: peers, err: err} + case requestTypeDisconnectPeerByID: + req.responseCh <- response{err: n.disconnectPeerByID(req.input.(peer.ID))} + case requestTypeDnsDiscovery: + addrs, err := n.dnsDiscovery(req.input.(dnsDiscoveryRequest)) + req.responseCh <- response{value: addrs, err: err} + case requestTypeDialPeer: + req.responseCh <- response{err: n.dialPeer(req.input.(dialPeerRequest))} + case requestTypeGetNumConnectedRelayPeers: + numPeers, err := n.getNumConnectedRelayPeers(req.input.([]string)...) + req.responseCh <- response{value: numPeers, err: err} + default: + req.responseCh <- response{err: errors.New("invalid operation")} + } } } } @@ -2559,9 +2571,12 @@ func (n *WakuNode) getNumConnectedRelayPeers(optPubsubTopic ...string) (int, err pubsubTopic = optPubsubTopic[0] } - var resp = C.allocResp() - var cPubsubTopic = C.CString(pubsubTopic) + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + + var cPubsubTopic = C.CString(pubsubTopic) defer C.free(unsafe.Pointer(cPubsubTopic)) C.cGoWakuGetNumConnectedRelayPeers(n.wakuCtx, cPubsubTopic, resp) @@ -2581,12 +2596,16 @@ func (n *WakuNode) getNumConnectedRelayPeers(optPubsubTopic ...string) (int, err } func (n *WakuNode) disconnectPeerByID(peerID peer.ID) error { - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) var cPeerId = C.CString(peerID.String()) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPeerId)) + wg.Add(1) C.cGoWakuDisconnectPeerById(n.wakuCtx, cPeerId, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil @@ -2596,9 +2615,15 @@ func (n *WakuNode) disconnectPeerByID(peerID peer.ID) error { } func (n *WakuNode) getConnectedPeers() (peer.IDSlice, error) { - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + + wg.Add(1) C.cGoWakuGetConnectedPeers(n.wakuCtx, resp) + wg.Wait() + if C.getRet(resp) == C.RET_OK { peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) if peersStr == "" { @@ -2626,7 +2651,9 @@ func (n *WakuNode) relaySubscribe(pubsubTopic string) error { return errors.New("pubsub topic is empty") } - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) var cPubsubTopic = C.CString(pubsubTopic) defer C.freeResp(resp) @@ -2636,7 +2663,9 @@ func (n *WakuNode) relaySubscribe(pubsubTopic string) error { return errors.New("wakuCtx is nil") } + wg.Add(1) C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil @@ -2651,7 +2680,9 @@ func (n *WakuNode) relayUnsubscribe(pubsubTopic string) error { return errors.New("pubsub topic is empty") } - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) var cPubsubTopic = C.CString(pubsubTopic) defer C.freeResp(resp) @@ -2661,7 +2692,9 @@ func (n *WakuNode) relayUnsubscribe(pubsubTopic string) error { return errors.New("wakuCtx is nil") } + wg.Add(1) C.cGoWakuRelayUnsubscribe(n.wakuCtx, cPubsubTopic, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil @@ -2672,10 +2705,14 @@ func (n *WakuNode) relayUnsubscribe(pubsubTopic string) error { } func (n *WakuNode) peerExchangeRequest(numPeers uint64) (uint64, error) { - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuPeerExchangeQuery(n.wakuCtx, C.uint64_t(numPeers), resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { numRecvPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) numRecvPeers, err := strconv.ParseUint(numRecvPeersStr, 10, 64) @@ -2690,10 +2727,14 @@ func (n *WakuNode) peerExchangeRequest(numPeers uint64) (uint64, error) { } func (n *WakuNode) startDiscV5() error { - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) - C.cGoWakuStartDiscV5(n.wakuCtx, resp) + wg.Add(1) + C.cGoWakuStartDiscV5(n.wakuCtx, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } @@ -2702,9 +2743,14 @@ func (n *WakuNode) startDiscV5() error { } func (n *WakuNode) stopDiscV5() error { - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + + wg.Add(1) C.cGoWakuStopDiscV5(n.wakuCtx, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil @@ -2714,10 +2760,14 @@ func (n *WakuNode) stopDiscV5() error { } func (n *WakuNode) version() (string, error) { - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuVersion(n.wakuCtx, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { var version = C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) @@ -2745,13 +2795,18 @@ func (n *WakuNode) storeQuery(storeQueryRequest storeQueryRequest) (any, error) var cJsonQuery = C.CString(string(b)) var cPeerAddr = C.CString(strings.Join(addrs, ",")) - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.free(unsafe.Pointer(cJsonQuery)) defer C.free(unsafe.Pointer(cPeerAddr)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuStoreQuery(n.wakuCtx, cJsonQuery, cPeerAddr, C.int(timeoutMs), resp) + wg.Wait() + if C.getRet(resp) == C.RET_OK { jsonResponseStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) storeQueryResponse := &storepb.StoreQueryResponse{} @@ -2775,15 +2830,18 @@ func (n *WakuNode) relayPublish(relayPublishRequest relayPublishRequest) (pb.Mes return pb.MessageHash{}, err } + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) var cPubsubTopic = C.CString(relayPublishRequest.pubsubTopic) var msg = C.CString(string(jsonMsg)) - var resp = C.allocResp() - defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPubsubTopic)) defer C.free(unsafe.Pointer(msg)) + wg.Add(1) C.cGoWakuRelayPublish(n.wakuCtx, cPubsubTopic, msg, C.int(timeoutMs), resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { msgHash := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) msgHashBytes, err := hexutil.Decode(msgHash) @@ -2797,14 +2855,19 @@ func (n *WakuNode) relayPublish(relayPublishRequest relayPublishRequest) (pb.Mes } func (n *WakuNode) dnsDiscovery(dnsDiscRequest dnsDiscoveryRequest) ([]multiaddr.Multiaddr, error) { - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) var cEnrTree = C.CString(dnsDiscRequest.enrTreeUrl) var cDnsServer = C.CString(dnsDiscRequest.nameDnsServer) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cEnrTree)) defer C.free(unsafe.Pointer(cDnsServer)) + // TODO: extract timeout from context + wg.Add(1) C.cGoWakuDnsDiscovery(n.wakuCtx, cEnrTree, cDnsServer, C.int(time.Minute.Milliseconds()), resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { var addrsRet []multiaddr.Multiaddr nodeAddresses := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) @@ -2830,13 +2893,18 @@ func (n *WakuNode) pingPeer(request pingRequest) (time.Duration, error) { addrs[i] = addr.String() } - var resp = C.allocResp() - var cPeerId = C.CString(strings.Join(addrs, ",")) + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + + var cPeerId = C.CString(strings.Join(addrs, ",")) defer C.free(unsafe.Pointer(cPeerId)) // TODO: extract timeout from ctx + wg.Add(1) C.cGoWakuPingPeer(n.wakuCtx, cPeerId, C.int(time.Minute.Milliseconds()), resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { rttStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) rttInt, err := strconv.ParseInt(rttStr, 10, 64) @@ -2851,11 +2919,14 @@ func (n *WakuNode) pingPeer(request pingRequest) (time.Duration, error) { } func (n *WakuNode) start() error { - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuStart(n.wakuCtx, resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } @@ -2865,11 +2936,14 @@ func (n *WakuNode) start() error { } func (n *WakuNode) stop() error { - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuStop(n.wakuCtx, resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } @@ -2879,11 +2953,14 @@ func (n *WakuNode) stop() error { } func (n *WakuNode) destroy() error { - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuDestroy(n.wakuCtx, resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } @@ -2893,11 +2970,14 @@ func (n *WakuNode) destroy() error { } func (n *WakuNode) peerID() (peer.ID, error) { - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuGetMyPeerId(n.wakuCtx, resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { peerIdStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) id, err := peer.Decode(peerIdStr) @@ -2912,14 +2992,17 @@ func (n *WakuNode) peerID() (peer.ID, error) { } func (n *WakuNode) connect(connReq connectRequest) error { - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) var cPeerMultiAddr = C.CString(connReq.addr.String()) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPeerMultiAddr)) // TODO: extract timeout from ctx + wg.Add(1) C.cGoWakuConnect(n.wakuCtx, cPeerMultiAddr, C.int(time.Minute.Milliseconds()), resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } @@ -2929,7 +3012,9 @@ func (n *WakuNode) connect(connReq connectRequest) error { } func (n *WakuNode) dialPeerById(dialPeerByIDReq dialPeerByIDRequest) error { - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) var cPeerId = C.CString(dialPeerByIDReq.peerID.String()) var cProtocol = C.CString(string(dialPeerByIDReq.protocol)) defer C.freeResp(resp) @@ -2937,8 +3022,9 @@ func (n *WakuNode) dialPeerById(dialPeerByIDReq dialPeerByIDRequest) error { defer C.free(unsafe.Pointer(cProtocol)) // TODO: extract timeout from ctx + wg.Add(1) C.cGoWakuDialPeerById(n.wakuCtx, cPeerId, cProtocol, C.int(time.Minute.Milliseconds()), resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } @@ -2948,10 +3034,14 @@ func (n *WakuNode) dialPeerById(dialPeerByIDReq dialPeerByIDRequest) error { } func (n *WakuNode) listenAddresses() ([]multiaddr.Multiaddr, error) { - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) - C.cGoWakuListenAddresses(n.wakuCtx, resp) + wg.Add(1) + C.cGoWakuListenAddresses(n.wakuCtx, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { var addrsRet []multiaddr.Multiaddr listenAddresses := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) @@ -2970,10 +3060,14 @@ func (n *WakuNode) listenAddresses() ([]multiaddr.Multiaddr, error) { } func (n *WakuNode) enr() (*enode.Node, error) { - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) - C.cGoWakuGetMyENR(n.wakuCtx, resp) + wg.Add(1) + C.cGoWakuGetMyENR(n.wakuCtx, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { enrStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) n, err := enode.Parse(enode.ValidSchemes, enrStr) @@ -2988,13 +3082,16 @@ func (n *WakuNode) enr() (*enode.Node, error) { } func (n *WakuNode) listPeersInMesh(pubsubTopic string) (int, error) { - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) var cPubsubTopic = C.CString(pubsubTopic) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPubsubTopic)) + wg.Add(1) C.cGoWakuListPeersInMesh(n.wakuCtx, cPubsubTopic, resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) numPeers, err := strconv.Atoi(numPeersStr) @@ -3010,10 +3107,14 @@ func (n *WakuNode) listPeersInMesh(pubsubTopic string) (int, error) { } func (n *WakuNode) getPeerIDsFromPeerStore() (peer.IDSlice, error) { - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) - C.cGoWakuGetPeerIdsFromPeerStore(n.wakuCtx, resp) + wg.Add(1) + C.cGoWakuGetPeerIdsFromPeerStore(n.wakuCtx, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) if peersStr == "" { @@ -3037,13 +3138,16 @@ func (n *WakuNode) getPeerIDsFromPeerStore() (peer.IDSlice, error) { } func (n *WakuNode) getPeerIDsByProtocol(protocolID libp2pproto.ID) (peer.IDSlice, error) { - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) var cProtocol = C.CString(string(protocolID)) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cProtocol)) + wg.Add(1) C.cGoWakuGetPeerIdsByProtocol(n.wakuCtx, cProtocol, resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) if peersStr == "" { @@ -3074,7 +3178,9 @@ func (n *WakuNode) newNode(config *WakuConfig) error { } var cJsonConfig = C.CString(string(jsonConfig)) - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.free(unsafe.Pointer(cJsonConfig)) defer C.freeResp(resp) @@ -3084,7 +3190,9 @@ func (n *WakuNode) newNode(config *WakuConfig) error { return errors.New(errMsg) } + wg.Add(1) wakuCtx := C.cGoWakuNew(cJsonConfig, resp) + wg.Wait() n.wakuCtx = unsafe.Pointer(wakuCtx) // Notice that the events for self node are handled by the 'MyEventCallback' method @@ -3094,14 +3202,18 @@ func (n *WakuNode) newNode(config *WakuConfig) error { } func (n *WakuNode) dialPeer(dialPeerReq dialPeerRequest) error { - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) var cPeerMultiAddr = C.CString(dialPeerReq.peerAddr.String()) var cProtocol = C.CString(string(dialPeerReq.protocol)) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPeerMultiAddr)) defer C.free(unsafe.Pointer(cProtocol)) // TODO: extract timeout from context + wg.Add(1) C.cGoWakuDialPeer(n.wakuCtx, cPeerMultiAddr, cProtocol, C.int(requestTimeout.Milliseconds()), resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index b57c8fa2c1..24b07fe958 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -192,6 +192,8 @@ func TestBasicWakuV2(t *testing.T) { storeNodeInfo, err := GetNwakuInfo(nil, &extNodeRestPort) require.NoError(t, err) + ctx := context.Background() + wakuConfig := Config{ UseThrottledPublish: true, ClusterID: 16, @@ -240,9 +242,7 @@ func TestBasicWakuV2(t *testing.T) { storeNode, err := peer.AddrInfoFromString(storeNodeInfo.ListenAddresses[0]) require.NoError(t, err) - for i := 0; i <= 100; i++ { - time.Sleep(2 * time.Second) - } + w.node.DialPeer(ctx, storeNode.Addrs[0], "") w.StorenodeCycle.SetStorenodeConfigProvider(newTestStorenodeConfigProvider(*storeNode)) From 4855b80974cf021119dc51417871c6ca5b1ba0df Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Fri, 29 Nov 2024 20:45:00 -0400 Subject: [PATCH 2/2] fix_: remove nwaku process loop --- third_party/nwaku | 2 +- .../go-waku/waku/v2/api/history/cycle.go | 1 + wakuv2/nwaku.go | 512 +++--------------- 3 files changed, 68 insertions(+), 447 deletions(-) diff --git a/third_party/nwaku b/third_party/nwaku index 03431a9f30..47a6235414 160000 --- a/third_party/nwaku +++ b/third_party/nwaku @@ -1 +1 @@ -Subproject commit 03431a9f3083971bffd266f455431760e0b70e3a +Subproject commit 47a6235414c2910ad9f540882bc5193ece84c552 diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go index 5a13966b68..b8b5bbb217 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go @@ -76,6 +76,7 @@ func NewStorenodeCycle(logger *zap.Logger, pinger common.Pinger) *StorenodeCycle StorenodeNotWorkingEmitter: NewEmitter[struct{}](), StorenodeAvailableEmitter: NewEmitter[peer.ID](), logger: logger.Named("storenode-cycle"), + pinger: pinger, } } diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index fe1592e8f4..c0f9a298a9 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -2395,175 +2395,60 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { // WakuNode represents an instance of an nwaku node type WakuNode struct { - wakuCtx unsafe.Pointer - cancel context.CancelFunc - requestCh chan *request -} - -type requestType int - -const ( - requestTypeNew requestType = iota + 1 - requestTypePing - requestTypeStart - requestTypeRelayPublish - requestTypeStoreQuery - requestTypePeerID - requestTypeStop - requestTypeDestroy - requestTypeStartDiscV5 - requestTypeStopDiscV5 - requestTypeVersion - requestTypeRelaySubscribe - requestTypeRelayUnsubscribe - requestTypePeerExchangeRequest - requestTypeConnect - requestTypeDialPeerByID - requestTypeListenAddresses - requestTypeENR - requestTypeListPeersInMesh - requestTypeGetConnectedPeers - requestTypeGetPeerIDsFromPeerStore - requestTypeGetPeerIDsByProtocol - requestTypeDisconnectPeerByID - requestTypeDnsDiscovery - requestTypeDialPeer - requestTypeGetNumConnectedRelayPeers -) - -type request struct { - id string - reqType requestType - input any - responseCh chan response + wakuCtx unsafe.Pointer + cancel context.CancelFunc } func newWakuNode(ctx context.Context, config *WakuConfig) (*WakuNode, error) { ctx, cancel := context.WithCancel(ctx) n := &WakuNode{ - requestCh: make(chan *request), - cancel: cancel, + cancel: cancel, } - // Notice this runs insto a separate goroutine. This is because we can't be sure - // from which OS thread will go call nwaku operations (They need to be done from - // the same thread that started nwaku). Communication with the goroutine to send - // operations to nwaku will be done via channels + wg := sync.WaitGroup{} + wg.Add(1) go func() { defer gocommon.LogOnPanic() runtime.LockOSThread() defer runtime.UnlockOSThread() - C.waku_setup() + wg.Done() - n.processLoop(ctx) + <-ctx.Done() }() - _, err := n.postTask(requestTypeNew, config) + wg.Wait() + + jsonConfig, err := json.Marshal(config) if err != nil { - cancel() return nil, err } - return n, nil -} -func (n *WakuNode) postTask(reqType requestType, input any) (any, error) { - responseCh := make(chan response) - n.requestCh <- &request{ - reqType: reqType, - input: input, - responseCh: responseCh, - } - response := <-responseCh - if response.err != nil { - return nil, response.err - } - return response.value, nil -} + var cJsonConfig = C.CString(string(jsonConfig)) -func (n *WakuNode) processLoop(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case req := <-n.requestCh: - switch req.reqType { - case requestTypeNew: - req.responseCh <- response{err: n.newNode(req.input.(*WakuConfig))} - case requestTypePing: - duration, err := n.pingPeer(req.input.(pingRequest)) - req.responseCh <- response{value: duration, err: err} - case requestTypeStart: - req.responseCh <- response{err: n.start()} - case requestTypeRelayPublish: - hash, err := n.relayPublish(req.input.(relayPublishRequest)) - req.responseCh <- response{value: hash, err: err} - case requestTypeStoreQuery: - results, err := n.storeQuery(req.input.(storeQueryRequest)) - req.responseCh <- response{value: results, err: err} - case requestTypeDestroy: - req.responseCh <- response{err: n.destroy()} - case requestTypePeerID: - peerID, err := n.peerID() - req.responseCh <- response{value: peerID, err: err} - case requestTypeStop: - req.responseCh <- response{err: n.stop()} - case requestTypeStartDiscV5: - req.responseCh <- response{err: n.startDiscV5()} - case requestTypeStopDiscV5: - req.responseCh <- response{err: n.stopDiscV5()} - case requestTypeVersion: - version, err := n.version() - req.responseCh <- response{value: version, err: err} - case requestTypePeerExchangeRequest: - numPeers, err := n.peerExchangeRequest(req.input.(uint64)) - req.responseCh <- response{value: numPeers, err: err} - case requestTypeRelaySubscribe: - req.responseCh <- response{err: n.relaySubscribe(req.input.(string))} - case requestTypeRelayUnsubscribe: - req.responseCh <- response{err: n.relayUnsubscribe(req.input.(string))} - case requestTypeConnect: - req.responseCh <- response{err: n.connect(req.input.(connectRequest))} - case requestTypeDialPeerByID: - req.responseCh <- response{err: n.dialPeerById(req.input.(dialPeerByIDRequest))} - case requestTypeListenAddresses: - addrs, err := n.listenAddresses() - req.responseCh <- response{value: addrs, err: err} - case requestTypeENR: - enr, err := n.enr() - req.responseCh <- response{value: enr, err: err} - case requestTypeListPeersInMesh: - numPeers, err := n.listPeersInMesh(req.input.(string)) - req.responseCh <- response{value: numPeers, err: err} - case requestTypeGetConnectedPeers: - peers, err := n.getConnectedPeers() - req.responseCh <- response{value: peers, err: err} - case requestTypeGetPeerIDsFromPeerStore: - peers, err := n.getPeerIDsFromPeerStore() - req.responseCh <- response{value: peers, err: err} - case requestTypeGetPeerIDsByProtocol: - peers, err := n.getPeerIDsByProtocol(req.input.(libp2pproto.ID)) - req.responseCh <- response{value: peers, err: err} - case requestTypeDisconnectPeerByID: - req.responseCh <- response{err: n.disconnectPeerByID(req.input.(peer.ID))} - case requestTypeDnsDiscovery: - addrs, err := n.dnsDiscovery(req.input.(dnsDiscoveryRequest)) - req.responseCh <- response{value: addrs, err: err} - case requestTypeDialPeer: - req.responseCh <- response{err: n.dialPeer(req.input.(dialPeerRequest))} - case requestTypeGetNumConnectedRelayPeers: - numPeers, err := n.getNumConnectedRelayPeers(req.input.([]string)...) - req.responseCh <- response{value: numPeers, err: err} - default: - req.responseCh <- response{err: errors.New("invalid operation")} - } - } + var resp = C.allocResp(unsafe.Pointer(&wg)) + + defer C.free(unsafe.Pointer(cJsonConfig)) + defer C.freeResp(resp) + + if C.getRet(resp) != C.RET_OK { + errMsg := "error wakuNew: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return nil, errors.New(errMsg) } + + wg.Add(1) + n.wakuCtx = C.cGoWakuNew(cJsonConfig, resp) + wg.Wait() + + // Notice that the events for self node are handled by the 'MyEventCallback' method + C.cGoWakuSetEventCallback(n.wakuCtx) + + return n, nil } -func (n *WakuNode) getNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) { +func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) { var pubsubTopic string if len(optPubsubTopic) == 0 { pubsubTopic = "" @@ -2595,7 +2480,7 @@ func (n *WakuNode) getNumConnectedRelayPeers(optPubsubTopic ...string) (int, err return 0, errors.New(errMsg) } -func (n *WakuNode) disconnectPeerByID(peerID peer.ID) error { +func (n *WakuNode) DisconnectPeerByID(peerID peer.ID) error { wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) @@ -2614,7 +2499,7 @@ func (n *WakuNode) disconnectPeerByID(peerID peer.ID) error { return errors.New(errMsg) } -func (n *WakuNode) getConnectedPeers() (peer.IDSlice, error) { +func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) @@ -2646,7 +2531,7 @@ func (n *WakuNode) getConnectedPeers() (peer.IDSlice, error) { } -func (n *WakuNode) relaySubscribe(pubsubTopic string) error { +func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { if pubsubTopic == "" { return errors.New("pubsub topic is empty") } @@ -2675,7 +2560,7 @@ func (n *WakuNode) relaySubscribe(pubsubTopic string) error { return errors.New(errMsg) } -func (n *WakuNode) relayUnsubscribe(pubsubTopic string) error { +func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error { if pubsubTopic == "" { return errors.New("pubsub topic is empty") } @@ -2704,7 +2589,7 @@ func (n *WakuNode) relayUnsubscribe(pubsubTopic string) error { return errors.New(errMsg) } -func (n *WakuNode) peerExchangeRequest(numPeers uint64) (uint64, error) { +func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) { wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) @@ -2726,7 +2611,7 @@ func (n *WakuNode) peerExchangeRequest(numPeers uint64) (uint64, error) { return 0, errors.New(errMsg) } -func (n *WakuNode) startDiscV5() error { +func (n *WakuNode) StartDiscV5() error { wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) @@ -2742,7 +2627,7 @@ func (n *WakuNode) startDiscV5() error { return errors.New(errMsg) } -func (n *WakuNode) stopDiscV5() error { +func (n *WakuNode) StopDiscV5() error { wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) @@ -2759,7 +2644,7 @@ func (n *WakuNode) stopDiscV5() error { return errors.New(errMsg) } -func (n *WakuNode) version() (string, error) { +func (n *WakuNode) Version() (string, error) { wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) @@ -2779,17 +2664,17 @@ func (n *WakuNode) version() (string, error) { return "", errors.New(errMsg) } -func (n *WakuNode) storeQuery(storeQueryRequest storeQueryRequest) (any, error) { +func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQueryRequest, peerInfo peer.AddrInfo) (*storepb.StoreQueryResponse, error) { // TODO: extract timeout from context timeoutMs := time.Minute.Milliseconds() - b, err := json.Marshal(storeQueryRequest.storeRequest) + b, err := json.Marshal(storeRequest) if err != nil { return nil, err } - addrs := make([]string, len(storeQueryRequest.peerInfo.Addrs)) - for i, addr := range utils.EncapsulatePeerID(storeQueryRequest.peerInfo.ID, storeQueryRequest.peerInfo.Addrs...) { + addrs := make([]string, len(peerInfo.Addrs)) + for i, addr := range utils.EncapsulatePeerID(peerInfo.ID, peerInfo.Addrs...) { addrs[i] = addr.String() } @@ -2821,11 +2706,11 @@ func (n *WakuNode) storeQuery(storeQueryRequest storeQueryRequest) (any, error) return nil, errors.New(errMsg) } -func (n *WakuNode) relayPublish(relayPublishRequest relayPublishRequest) (pb.MessageHash, error) { +func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) { // TODO: extract timeout from context timeoutMs := time.Minute.Milliseconds() - jsonMsg, err := json.Marshal(relayPublishRequest.message) + jsonMsg, err := json.Marshal(message) if err != nil { return pb.MessageHash{}, err } @@ -2833,7 +2718,7 @@ func (n *WakuNode) relayPublish(relayPublishRequest relayPublishRequest) (pb.Mes wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) - var cPubsubTopic = C.CString(relayPublishRequest.pubsubTopic) + var cPubsubTopic = C.CString(pubsubTopic) var msg = C.CString(string(jsonMsg)) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPubsubTopic)) @@ -2854,12 +2739,12 @@ func (n *WakuNode) relayPublish(relayPublishRequest relayPublishRequest) (pb.Mes return pb.MessageHash{}, errors.New(errMsg) } -func (n *WakuNode) dnsDiscovery(dnsDiscRequest dnsDiscoveryRequest) ([]multiaddr.Multiaddr, error) { +func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) { wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) - var cEnrTree = C.CString(dnsDiscRequest.enrTreeUrl) - var cDnsServer = C.CString(dnsDiscRequest.nameDnsServer) + var cEnrTree = C.CString(enrTreeUrl) + var cDnsServer = C.CString(nameDnsServer) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cEnrTree)) defer C.free(unsafe.Pointer(cDnsServer)) @@ -2885,9 +2770,7 @@ func (n *WakuNode) dnsDiscovery(dnsDiscRequest dnsDiscoveryRequest) ([]multiaddr return nil, errors.New(errMsg) } -func (n *WakuNode) pingPeer(request pingRequest) (time.Duration, error) { - peerInfo := request.peerInfo - +func (n *WakuNode) PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.Duration, error) { addrs := make([]string, len(peerInfo.Addrs)) for i, addr := range utils.EncapsulatePeerID(peerInfo.ID, peerInfo.Addrs...) { addrs[i] = addr.String() @@ -2918,7 +2801,7 @@ func (n *WakuNode) pingPeer(request pingRequest) (time.Duration, error) { return 0, fmt.Errorf("PingPeer: %s", errMsg) } -func (n *WakuNode) start() error { +func (n *WakuNode) Start() error { wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) @@ -2935,7 +2818,7 @@ func (n *WakuNode) start() error { return errors.New(errMsg) } -func (n *WakuNode) stop() error { +func (n *WakuNode) Stop() error { wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) @@ -2952,7 +2835,7 @@ func (n *WakuNode) stop() error { return errors.New(errMsg) } -func (n *WakuNode) destroy() error { +func (n *WakuNode) Destroy() error { wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) @@ -2969,7 +2852,7 @@ func (n *WakuNode) destroy() error { return errors.New(errMsg) } -func (n *WakuNode) peerID() (peer.ID, error) { +func (n *WakuNode) PeerID() (peer.ID, error) { wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) @@ -2991,11 +2874,11 @@ func (n *WakuNode) peerID() (peer.ID, error) { return "", errors.New(errMsg) } -func (n *WakuNode) connect(connReq connectRequest) error { +func (n *WakuNode) Connect(ctx context.Context, addr multiaddr.Multiaddr) error { wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) - var cPeerMultiAddr = C.CString(connReq.addr.String()) + var cPeerMultiAddr = C.CString(addr.String()) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPeerMultiAddr)) @@ -3011,12 +2894,12 @@ func (n *WakuNode) connect(connReq connectRequest) error { return errors.New(errMsg) } -func (n *WakuNode) dialPeerById(dialPeerByIDReq dialPeerByIDRequest) error { +func (n *WakuNode) DialPeerByID(ctx context.Context, peerID peer.ID, protocol libp2pproto.ID) error { wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) - var cPeerId = C.CString(dialPeerByIDReq.peerID.String()) - var cProtocol = C.CString(string(dialPeerByIDReq.protocol)) + var cPeerId = C.CString(peerID.String()) + var cProtocol = C.CString(string(protocol)) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPeerId)) defer C.free(unsafe.Pointer(cProtocol)) @@ -3033,7 +2916,7 @@ func (n *WakuNode) dialPeerById(dialPeerByIDReq dialPeerByIDRequest) error { return errors.New(errMsg) } -func (n *WakuNode) listenAddresses() ([]multiaddr.Multiaddr, error) { +func (n *WakuNode) ListenAddresses() ([]multiaddr.Multiaddr, error) { wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) @@ -3059,7 +2942,7 @@ func (n *WakuNode) listenAddresses() ([]multiaddr.Multiaddr, error) { return nil, errors.New(errMsg) } -func (n *WakuNode) enr() (*enode.Node, error) { +func (n *WakuNode) ENR() (*enode.Node, error) { wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) @@ -3081,7 +2964,7 @@ func (n *WakuNode) enr() (*enode.Node, error) { return nil, errors.New(errMsg) } -func (n *WakuNode) listPeersInMesh(pubsubTopic string) (int, error) { +func (n *WakuNode) ListPeersInMesh(pubsubTopic string) (int, error) { wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) @@ -3106,7 +2989,7 @@ func (n *WakuNode) listPeersInMesh(pubsubTopic string) (int, error) { return 0, errors.New(errMsg) } -func (n *WakuNode) getPeerIDsFromPeerStore() (peer.IDSlice, error) { +func (n *WakuNode) GetPeerIDsFromPeerStore() (peer.IDSlice, error) { wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) @@ -3137,11 +3020,11 @@ func (n *WakuNode) getPeerIDsFromPeerStore() (peer.IDSlice, error) { return nil, fmt.Errorf("GetPeerIdsFromPeerStore: %s", errMsg) } -func (n *WakuNode) getPeerIDsByProtocol(protocolID libp2pproto.ID) (peer.IDSlice, error) { +func (n *WakuNode) GetPeerIDsByProtocol(protocol libp2pproto.ID) (peer.IDSlice, error) { wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) - var cProtocol = C.CString(string(protocolID)) + var cProtocol = C.CString(string(protocol)) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cProtocol)) @@ -3171,42 +3054,12 @@ func (n *WakuNode) getPeerIDsByProtocol(protocolID libp2pproto.ID) (peer.IDSlice return nil, fmt.Errorf("GetPeerIdsByProtocol: %s", errMsg) } -func (n *WakuNode) newNode(config *WakuConfig) error { - jsonConfig, err := json.Marshal(config) - if err != nil { - return err - } - - var cJsonConfig = C.CString(string(jsonConfig)) - wg := sync.WaitGroup{} - - var resp = C.allocResp(unsafe.Pointer(&wg)) - - defer C.free(unsafe.Pointer(cJsonConfig)) - defer C.freeResp(resp) - - if C.getRet(resp) != C.RET_OK { - errMsg := "error wakuNew: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) - } - - wg.Add(1) - wakuCtx := C.cGoWakuNew(cJsonConfig, resp) - wg.Wait() - n.wakuCtx = unsafe.Pointer(wakuCtx) - - // Notice that the events for self node are handled by the 'MyEventCallback' method - C.cGoWakuSetEventCallback(n.wakuCtx) - - return nil -} - -func (n *WakuNode) dialPeer(dialPeerReq dialPeerRequest) error { +func (n *WakuNode) DialPeer(ctx context.Context, peerAddr multiaddr.Multiaddr, protocol libp2pproto.ID) error { wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) - var cPeerMultiAddr = C.CString(dialPeerReq.peerAddr.String()) - var cProtocol = C.CString(string(dialPeerReq.protocol)) + var cPeerMultiAddr = C.CString(peerAddr.String()) + var cProtocol = C.CString(string(protocol)) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPeerMultiAddr)) defer C.free(unsafe.Pointer(cProtocol)) @@ -3221,177 +3074,6 @@ func (n *WakuNode) dialPeer(dialPeerReq dialPeerRequest) error { return errors.New(errMsg) } -type pingRequest struct { - ctx context.Context - peerInfo peer.AddrInfo -} - -func (n *WakuNode) PingPeer(ctx context.Context, info peer.AddrInfo) (time.Duration, error) { - response, err := n.postTask(requestTypePing, pingRequest{ - ctx: ctx, - peerInfo: info, - }) - if err != nil { - return 0, err - } - return response.(time.Duration), nil -} - -func (n *WakuNode) Start() error { - _, err := n.postTask(requestTypeStart, nil) - return err -} - -type relayPublishRequest struct { - ctx context.Context - pubsubTopic string - message *pb.WakuMessage -} - -func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) { - response, err := n.postTask(requestTypeRelayPublish, relayPublishRequest{ - ctx: ctx, - pubsubTopic: pubsubTopic, - message: message, - }) - if err != nil { - return pb.MessageHash{}, err - } - return response.(pb.MessageHash), nil -} - -type storeQueryRequest struct { - ctx context.Context - storeRequest *storepb.StoreQueryRequest - peerInfo peer.AddrInfo -} - -func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQueryRequest, peerInfo peer.AddrInfo) (*storepb.StoreQueryResponse, error) { - response, err := n.postTask(requestTypeStoreQuery, storeQueryRequest{ - ctx: ctx, - peerInfo: peerInfo, - storeRequest: storeRequest, - }) - if err != nil { - return nil, err - } - return response.(*storepb.StoreQueryResponse), nil -} - -func (n *WakuNode) PeerID() (peer.ID, error) { - response, err := n.postTask(requestTypePeerID, nil) - if err != nil { - return "", err - } - return response.(peer.ID), nil -} - -func (n *WakuNode) Stop() error { - _, err := n.postTask(requestTypeStop, nil) - return err -} - -func (n *WakuNode) Destroy() error { - _, err := n.postTask(requestTypeDestroy, nil) - return err -} - -func (n *WakuNode) StartDiscV5() error { - _, err := n.postTask(requestTypeStartDiscV5, nil) - return err -} - -func (n *WakuNode) StopDiscV5() error { - _, err := n.postTask(requestTypeStopDiscV5, nil) - return err -} - -func (n *WakuNode) Version() (string, error) { - response, err := n.postTask(requestTypeVersion, nil) - if err != nil { - return "", err - } - return response.(string), nil -} - -func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { - _, err := n.postTask(requestTypeRelaySubscribe, pubsubTopic) - return err -} - -func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error { - _, err := n.postTask(requestTypeRelayUnsubscribe, pubsubTopic) - return err -} - -func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) { - response, err := n.postTask(requestTypePeerExchangeRequest, numPeers) - if err != nil { - return 0, err - } - return response.(uint64), nil -} - -type connectRequest struct { - ctx context.Context - addr multiaddr.Multiaddr -} - -func (n *WakuNode) Connect(ctx context.Context, addr multiaddr.Multiaddr) error { - _, err := n.postTask(requestTypeConnect, connectRequest{ - ctx: ctx, - addr: addr, - }) - return err -} - -type dialPeerByIDRequest struct { - ctx context.Context - peerID peer.ID - protocol libp2pproto.ID -} - -func (n *WakuNode) DialPeerByID(ctx context.Context, peerID peer.ID, protocol libp2pproto.ID) error { - _, err := n.postTask(requestTypeDialPeerByID, dialPeerByIDRequest{ - ctx: ctx, - peerID: peerID, - protocol: protocol, - }) - return err -} - -func (n *WakuNode) ListenAddresses() ([]multiaddr.Multiaddr, error) { - response, err := n.postTask(requestTypeListenAddresses, nil) - if err != nil { - return nil, err - } - return response.([]multiaddr.Multiaddr), nil -} - -func (n *WakuNode) ENR() (*enode.Node, error) { - response, err := n.postTask(requestTypeENR, nil) - if err != nil { - return nil, err - } - return response.(*enode.Node), nil -} - -func (n *WakuNode) ListPeersInMesh(pubsubTopic string) (int, error) { - response, err := n.postTask(requestTypeListPeersInMesh, pubsubTopic) - if err != nil { - return 0, err - } - return response.(int), nil -} - -func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { - response, err := n.postTask(requestTypeGetConnectedPeers, nil) - if err != nil { - return nil, err - } - return response.(peer.IDSlice), nil -} - func (n *WakuNode) GetNumConnectedPeers() (int, error) { peers, err := n.GetConnectedPeers() if err != nil { @@ -3399,65 +3081,3 @@ func (n *WakuNode) GetNumConnectedPeers() (int, error) { } return len(peers), nil } - -func (n *WakuNode) GetPeerIDsFromPeerStore() (peer.IDSlice, error) { - response, err := n.postTask(requestTypeGetPeerIDsFromPeerStore, nil) - if err != nil { - return nil, err - } - return response.(peer.IDSlice), nil -} - -func (n *WakuNode) GetPeerIDsByProtocol(protocol libp2pproto.ID) (peer.IDSlice, error) { - response, err := n.postTask(requestTypeGetPeerIDsByProtocol, protocol) - if err != nil { - return nil, err - } - return response.(peer.IDSlice), nil -} - -func (n *WakuNode) DisconnectPeerByID(peerID peer.ID) error { - _, err := n.postTask(requestTypeDisconnectPeerByID, peerID) - return err -} - -type dnsDiscoveryRequest struct { - ctx context.Context - enrTreeUrl string - nameDnsServer string -} - -func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) { - response, err := n.postTask(requestTypeDnsDiscovery, dnsDiscoveryRequest{ - ctx: ctx, - enrTreeUrl: enrTreeUrl, - nameDnsServer: nameDnsServer, - }) - if err != nil { - return nil, err - } - return response.([]multiaddr.Multiaddr), nil -} - -type dialPeerRequest struct { - ctx context.Context - peerAddr multiaddr.Multiaddr - protocol libp2pproto.ID -} - -func (n *WakuNode) DialPeer(ctx context.Context, peerAddr multiaddr.Multiaddr, protocol libp2pproto.ID) error { - _, err := n.postTask(requestTypeDialPeer, dialPeerRequest{ - ctx: ctx, - peerAddr: peerAddr, - protocol: protocol, - }) - return err -} - -func (n *WakuNode) GetNumConnectedRelayPeers(paramPubsubTopic ...string) (int, error) { - response, err := n.postTask(requestTypeGetNumConnectedRelayPeers, paramPubsubTopic) - if err != nil { - return 0, err - } - return response.(int), nil -}