diff --git a/.gitignore b/.gitignore index 54d435eb..a51e35a2 100644 --- a/.gitignore +++ b/.gitignore @@ -47,3 +47,6 @@ data.etcd/ *.log +# vscode +.vscode + diff --git a/authutils/auth_service.go b/authutils/auth_service.go index 63a94f34..9001b078 100755 --- a/authutils/auth_service.go +++ b/authutils/auth_service.go @@ -109,12 +109,12 @@ func CheckIfTokenHasExpired(expiredBlock *big.Int) error { // CurrentBlock Get the current block number from on chain func CurrentBlock() (*big.Int, error) { - if ethClient, err := blockchain.GetEthereumClient(); err != nil { + if ethHttpClient, _, err := blockchain.CreateEthereumClients(); err != nil { return nil, err } else { - defer ethClient.RawClient.Close() + defer ethHttpClient.RawClient.Close() var currentBlockHex string - if err = ethClient.RawClient.CallContext(context.Background(), ¤tBlockHex, "eth_blockNumber"); err != nil { + if err = ethHttpClient.RawClient.CallContext(context.Background(), ¤tBlockHex, "eth_blockNumber"); err != nil { zap.L().Error("error determining current block", zap.Error(err)) return nil, fmt.Errorf("error determining current block: %v", err) } diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index b336f0e8..bfa5b7e9 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -29,8 +29,10 @@ type jobInfo struct { type Processor struct { enabled bool - ethClient *ethclient.Client - rawClient *rpc.Client + ethHttpClient *ethclient.Client + rawHttpClient *rpc.Client + ethWSClient *ethclient.Client + rawWSClient *rpc.Client sigHasher func([]byte) []byte privateKey *ecdsa.PrivateKey address string @@ -55,11 +57,13 @@ func NewProcessor(metadata *ServiceMetadata) (Processor, error) { // Setup ethereum client - if ethclients, err := GetEthereumClient(); err != nil { + if ethHttpClients, ethWSClients, err := CreateEthereumClients(); err != nil { return p, errors.Wrap(err, "error creating RPC client") } else { - p.rawClient = ethclients.RawClient - p.ethClient = ethclients.EthClient + p.rawHttpClient = ethHttpClients.RawClient + p.ethHttpClient = ethHttpClients.EthClient + p.rawWSClient = ethWSClients.RawClient + p.ethWSClient = ethWSClients.EthClient } // TODO: if address is not in config, try to load it using network @@ -68,7 +72,7 @@ func NewProcessor(metadata *ServiceMetadata) (Processor, error) { p.escrowContractAddress = metadata.GetMpeAddress() - if mpe, err := NewMultiPartyEscrow(p.escrowContractAddress, p.ethClient); err != nil { + if mpe, err := NewMultiPartyEscrow(p.escrowContractAddress, p.ethHttpClient); err != nil { return p, errors.Wrap(err, "error instantiating MultiPartyEscrow contract") } else { p.multiPartyEscrow = mpe @@ -82,6 +86,23 @@ func NewProcessor(metadata *ServiceMetadata) (Processor, error) { return p, nil } +func (processor *Processor) ReconnectToWsClient() error { + processor.ethWSClient.Close() + processor.rawHttpClient.Close() + + zap.L().Debug("Try to reconnect to websocket client") + + newEthWSClients, err := CreateWSEthereumClient() + if err != nil { + return err + } + + processor.ethWSClient = newEthWSClients.EthClient + processor.rawWSClient = newEthWSClients.RawClient + + return nil +} + func (processor *Processor) Enabled() (enabled bool) { return processor.enabled } @@ -94,11 +115,19 @@ func (processor *Processor) MultiPartyEscrow() *MultiPartyEscrow { return processor.multiPartyEscrow } +func (processor *Processor) GetEthHttpClient() *ethclient.Client { + return processor.ethHttpClient +} + +func (processor *Processor) GetEthWSClient() *ethclient.Client { + return processor.ethWSClient +} + func (processor *Processor) CurrentBlock() (currentBlock *big.Int, err error) { // We have to do a raw call because the standard method of ethClient.HeaderByNumber(ctx, nil) errors on // unmarshaling the response currently. See https://github.com/ethereum/go-ethereum/issues/3230 var currentBlockHex string - if err = processor.rawClient.CallContext(context.Background(), ¤tBlockHex, "eth_blockNumber"); err != nil { + if err = processor.rawHttpClient.CallContext(context.Background(), ¤tBlockHex, "eth_blockNumber"); err != nil { zap.L().Error("error determining current block", zap.Error(err)) return nil, fmt.Errorf("error determining current block: %v", err) } @@ -114,6 +143,8 @@ func (processor *Processor) HasIdentity() bool { } func (processor *Processor) Close() { - processor.ethClient.Close() - processor.rawClient.Close() + processor.ethHttpClient.Close() + processor.rawHttpClient.Close() + processor.ethWSClient.Close() + processor.rawWSClient.Close() } diff --git a/blockchain/ethereumClient.go b/blockchain/ethereumClient.go index 0cd32b97..d394c874 100644 --- a/blockchain/ethereumClient.go +++ b/blockchain/ethereumClient.go @@ -3,10 +3,12 @@ package blockchain import ( "context" "encoding/base64" + + "github.com/singnet/snet-daemon/config" + "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/rpc" "github.com/pkg/errors" - "github.com/singnet/snet-daemon/config" ) type EthereumClient struct { @@ -19,21 +21,49 @@ func basicAuth(username, password string) string { return base64.StdEncoding.EncodeToString([]byte(auth)) } -func GetEthereumClient() (*EthereumClient, error) { +func CreateEthereumClients() (*EthereumClient, *EthereumClient, error) { + ethereumHttpClient, err := CreateHTTPEthereumClient() + if err != nil { + return nil, nil, err + } + + ethereumWsClient, err := CreateWSEthereumClient() + if err != nil { + return nil, nil, err + } + + return ethereumHttpClient, ethereumWsClient, nil +} - ethereumClient := new(EthereumClient) - if client, err := rpc.DialOptions(context.Background(), - config.GetBlockChainEndPoint(), - rpc.WithHeader("Authorization", "Basic "+basicAuth("", config.GetString(config.BlockchainProviderApiKey)))); err != nil { +func CreateHTTPEthereumClient() (*EthereumClient, error) { + ethereumHttpClient := new(EthereumClient) + httpClient, err := rpc.DialOptions( + context.Background(), + config.GetBlockChainHTTPEndPoint(), + rpc.WithHeader("Authorization", "Basic "+basicAuth("", config.GetString(config.BlockchainProviderApiKey)))) + if err != nil { return nil, errors.Wrap(err, "error creating RPC client") - } else { - ethereumClient.RawClient = client - ethereumClient.EthClient = ethclient.NewClient(client) } - return ethereumClient, nil + ethereumHttpClient.RawClient = httpClient + ethereumHttpClient.EthClient = ethclient.NewClient(httpClient) + return ethereumHttpClient, nil +} +func CreateWSEthereumClient() (*EthereumClient, error) { + ethereumWsClient := new(EthereumClient) + wsClient, err := rpc.DialOptions( + context.Background(), + config.GetBlockChainWSEndPoint(), + rpc.WithHeader("Authorization", "Basic "+basicAuth("", config.GetString(config.BlockchainProviderApiKey)))) + if err != nil { + return nil, errors.Wrap(err, "error creating RPC WebSocket client") + } + ethereumWsClient.RawClient = wsClient + ethereumWsClient.EthClient = ethclient.NewClient(wsClient) + return ethereumWsClient, nil } + func (ethereumClient *EthereumClient) Close() { if ethereumClient != nil { ethereumClient.EthClient.Close() diff --git a/blockchain/orginzationMetadata.go b/blockchain/orginzationMetadata.go index f30cbe78..af97bbaa 100644 --- a/blockchain/orginzationMetadata.go +++ b/blockchain/orginzationMetadata.go @@ -125,10 +125,11 @@ func checkMandatoryFields(metaData *OrganizationMetaData) (err error) { if metaData.daemonGroup.PaymentDetails.PaymentChannelStorageClient.Endpoints == nil { err = fmt.Errorf("Mandatory field : ETCD Client Endpoints are mising for the Group %v ", metaData.daemonGroup.GroupName) } - if &metaData.recipientPaymentAddress == nil { - err = fmt.Errorf("Mandatory field : Recipient Address is missing for the Group %v ", metaData.daemonGroup.GroupName) + + if metaData.recipientPaymentAddress == (common.Address{}) { + err = fmt.Errorf("Mandatory field : Recepient Address is missing for the Group %v ", metaData.daemonGroup.GroupName) } - return + return err } func setDerivedAttributes(metaData *OrganizationMetaData) (err error) { @@ -187,7 +188,7 @@ func getMetaDataURI() []byte { organizationRegistered, err := reg.GetOrganizationById(nil, orgId) if err != nil || !organizationRegistered.Found { - zap.L().Panic("Error Retrieving contract details for the Given Organization", zap.String("OrganizationId", config.GetString(config.OrganizationId))) + zap.L().Panic("Error Retrieving contract details for the Given Organization", zap.String("OrganizationId", config.GetString(config.OrganizationId)), zap.Error(err)) } return organizationRegistered.OrgMetadataURI[:] } diff --git a/blockchain/serviceMetadata.go b/blockchain/serviceMetadata.go index ea18972c..7f22dacf 100644 --- a/blockchain/serviceMetadata.go +++ b/blockchain/serviceMetadata.go @@ -11,6 +11,7 @@ import ( "github.com/bufbuild/protocompile" pproto "github.com/emicklei/proto" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" "github.com/pkg/errors" "github.com/singnet/snet-daemon/config" "github.com/singnet/snet-daemon/ipfsutils" @@ -277,13 +278,22 @@ func ReadServiceMetaDataFromLocalFile(filename string) (*ServiceMetadata, error) } func getRegistryCaller() (reg *RegistryCaller) { - ethClient, err := GetEthereumClient() + ethHttpClient, err := CreateHTTPEthereumClient() if err != nil { zap.L().Panic("Unable to get Blockchain client ", zap.Error(err)) } - defer ethClient.Close() + defer ethHttpClient.Close() registryContractAddress := getRegistryAddressKey() - reg, err = NewRegistryCaller(registryContractAddress, ethClient.EthClient) + reg, err = NewRegistryCaller(registryContractAddress, ethHttpClient.EthClient) + if err != nil { + zap.L().Panic("Error instantiating Registry contract for the given Contract Address", zap.Error(err), zap.Any("registryContractAddress", registryContractAddress)) + } + return reg +} + +func GetRegistryFilterer(ethWsClient *ethclient.Client) *RegistryFilterer { + registryContractAddress := getRegistryAddressKey() + reg, err := NewRegistryFilterer(registryContractAddress, ethWsClient) if err != nil { zap.L().Panic("Error instantiating Registry contract for the given Contract Address", zap.Error(err), zap.Any("registryContractAddress", registryContractAddress)) } diff --git a/blockchain/utils.go b/blockchain/utils.go index 7bca8a61..dea7e943 100644 --- a/blockchain/utils.go +++ b/blockchain/utils.go @@ -81,3 +81,21 @@ func ToChecksumAddress(hexAddress string) string { mixedAddress := common.NewMixedcaseAddress(address) return mixedAddress.Address().String() } + +/* +MakeTopicFilterer is used to generate a filter for querying Ethereum logs or contract events. +Ethereum topics (such as for events) are 32-byte fixed-size values (common for hashing +in Ethereum logs). This function takes a string parameter, converts it into a 32-byte array, +and returns it in a slice. This allows developers to create filters when looking for +specific events or log entries based on the topic. +*/ +func MakeTopicFilterer(param string) [][32]byte { + // Create a 32-byte array + var param32Byte [32]byte + + // Convert the string to a byte slice and copy up to 32 bytes + copy(param32Byte[:], []byte(param)[:min(len(param), 32)]) + + // Return the filter with a single element (the 32-byte array) + return [][32]byte{param32Byte} +} diff --git a/config/blockchain_network_config.go b/config/blockchain_network_config.go index 92c30ea5..238ecf70 100644 --- a/config/blockchain_network_config.go +++ b/config/blockchain_network_config.go @@ -12,17 +12,19 @@ import ( ) type NetworkSelected struct { - NetworkName string - EthereumJSONRPCEndpoint string - NetworkId string - RegistryAddressKey string + NetworkName string + EthereumJSONRPCHTTPEndpoint string + EthereumJSONRPCWSEndpoint string + NetworkId string + RegistryAddressKey string } const ( - BlockChainNetworkFileName = "resources/blockchain_network_config.json" - EthereumJsonRpcEndpointKey = "ethereum_json_rpc_endpoint" - NetworkId = "network_id" - RegistryAddressKey = "registry_address_key" + BlockChainNetworkFileName = "resources/blockchain_network_config.json" + EthereumJsonRpcHTTPEndpointKey = "ethereum_json_rpc_http_endpoint" + EthereumJsonRpcWSEndpointKey = "ethereum_json_rpc_ws_endpoint" + NetworkId = "network_id" + RegistryAddressKey = "registry_address_key" ) var networkSelected = &NetworkSelected{} @@ -38,7 +40,8 @@ func determineNetworkSelected(data []byte) (err error) { //Ethereum End point and Network ID mapped to networkSelected.NetworkName = networkName networkSelected.RegistryAddressKey = getDetailsFromJsonOrConfig(dynamicBinding[networkName].(map[string]any)[RegistryAddressKey], RegistryAddressKey) - networkSelected.EthereumJSONRPCEndpoint = getDetailsFromJsonOrConfig(dynamicBinding[networkName].(map[string]any)[EthereumJsonRpcEndpointKey], EthereumJsonRpcEndpointKey) + networkSelected.EthereumJSONRPCHTTPEndpoint = getDetailsFromJsonOrConfig(dynamicBinding[networkName].(map[string]any)[EthereumJsonRpcHTTPEndpointKey], EthereumJsonRpcHTTPEndpointKey) + networkSelected.EthereumJSONRPCWSEndpoint = getDetailsFromJsonOrConfig(dynamicBinding[networkName].(map[string]any)[EthereumJsonRpcWSEndpointKey], EthereumJsonRpcWSEndpointKey) networkSelected.NetworkId = fmt.Sprintf("%v", dynamicBinding[networkName].(map[string]any)[NetworkId]) return err @@ -61,9 +64,13 @@ func GetNetworkId() string { return networkSelected.NetworkId } -// Get the block chain end point associated with the Network selected -func GetBlockChainEndPoint() string { - return networkSelected.EthereumJSONRPCEndpoint +// Get the blockchain endpoint associated with the Network selected +func GetBlockChainHTTPEndPoint() string { + return networkSelected.EthereumJSONRPCHTTPEndpoint +} + +func GetBlockChainWSEndPoint() string { + return networkSelected.EthereumJSONRPCWSEndpoint } // Get the Registry address of the contract @@ -109,9 +116,11 @@ func deriveDatafromJSON(data []byte) (err error) { networkSelected.RegistryAddressKey = fmt.Sprintf("%v", m[GetNetworkId()].(map[string]any)["address"]) zap.L().Info("Derive data from JSON", zap.String("Network", GetString(BlockChainNetworkSelected)), - zap.String("NetwrokId", GetNetworkId()), - zap.String("RegistryAddress", GetRegistryAddress()), - zap.String("Blockchain endpoint", GetBlockChainEndPoint())) + zap.String("Netwrok id", GetNetworkId()), + zap.String("Registry address", GetRegistryAddress()), + zap.String("Blockchain http endpoint", GetBlockChainHTTPEndPoint()), + zap.String("Blockchain ws endpoint", GetBlockChainWSEndPoint()), + ) return nil } diff --git a/config/blockchain_network_config_test.go b/config/blockchain_network_config_test.go index 9f6771c0..acaeff5e 100644 --- a/config/blockchain_network_config_test.go +++ b/config/blockchain_network_config_test.go @@ -16,31 +16,35 @@ func TestGetNetworkId(t *testing.T) { var defaultBlockChainNetworkConfig = ` { "local": { - "ethereum_json_rpc_endpoint": "http://localhost:8545", - "network_id": "42", + "ethereum_json_rpc_http_endpoint": "http://localhost:8545", + "ethereum_json_rpc_ws_endpoint": "ws://localhost:443", + "network_id": "42", "registry_address_key": "0x4e74fefa82e83e0964f0d9f53c68e03f7298a8b2" }, "main": { - "ethereum_json_rpc_endpoint": "https://mainnet.infura.io/v3", + "ethereum_json_rpc_http_endpoint": "https://mainnet.infura.io/v3", + "ethereum_json_rpc_ws_endpoint": "wss://mainnet.infura.io/v3", "network_id": "1" }, "goerli": { - "ethereum_json_rpc_endpoint": "https://goerli.infura.io/v3", + "ethereum_json_rpc_http_endpoint": "https://goerli.infura.io/v3", + "ethereum_json_rpc_ws_endpoint": "wss://goerli.infura.io/v3", "network_id": "5" }, "sepolia": { - "ethereum_json_rpc_endpoint": "https://sepolia.infura.io/v3", + "ethereum_json_rpc_http_endpoint": "https://sepolia.infura.io/v3", + "ethereum_json_rpc_ws_endpoint": "wss://sepolia.infura.io/v3", "network_id": "11155111" } }` func TestGetBlockChainEndPoint(t *testing.T) { Vip().Set(BlockChainNetworkSelected, "local") - determineNetworkSelected([]byte(defaultBlockChainNetworkConfig)) - - assert.Matches(t, GetBlockChainEndPoint(), GetString(BlockChainNetworkSelected)) + err := determineNetworkSelected([]byte(defaultBlockChainNetworkConfig)) + assert.Equal(t, err, nil) + assert.Matches(t, GetBlockChainHTTPEndPoint(), GetString(BlockChainNetworkSelected)) + assert.Matches(t, GetBlockChainWSEndPoint(), GetString(BlockChainNetworkSelected)) assert2.NotEqual(t, GetNetworkId(), nil) - } func TestGetRegistryAddress(t *testing.T) { @@ -92,7 +96,7 @@ func Test_GetDetailsFromJsonOrConfig(t *testing.T) { want string network string }{ - {EthereumJsonRpcEndpointKey, "https://sepolia.infura.io/v3", "sepolia"}, + {EthereumJsonRpcHTTPEndpointKey, "https://sepolia.infura.io/v3", "sepolia"}, {RegistryAddressKey, "0x4e74fefa82e83e0964f0d9f53c68e03f7298a8b2", "local"}, } for _, tt := range tests { diff --git a/config/config.go b/config/config.go index 802f5caa..5e6c0647 100644 --- a/config/config.go +++ b/config/config.go @@ -13,10 +13,9 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "go.uber.org/zap" - "github.com/spf13/cast" "github.com/spf13/viper" + "go.uber.org/zap" ) const ( @@ -113,7 +112,8 @@ const ( }, "payment_channel_storage_client": { "connection_timeout": "0s", - "request_timeout": "0s" + "request_timeout": "0s", + "hot_reload": true }, "payment_channel_storage_server": { "id": "storage-1", diff --git a/contract_event_listener/contract_event_listener.go b/contract_event_listener/contract_event_listener.go new file mode 100644 index 00000000..cd6323e9 --- /dev/null +++ b/contract_event_listener/contract_event_listener.go @@ -0,0 +1,14 @@ +package contractlistener + +import ( + "github.com/singnet/snet-daemon/blockchain" + "github.com/singnet/snet-daemon/etcddb" +) + +type EventSignature string + +type ContractEventListener struct { + BlockchainProcessor *blockchain.Processor + CurrentOrganizationMetaData *blockchain.OrganizationMetaData + CurrentEtcdClient *etcddb.EtcdClient +} diff --git a/contract_event_listener/listen_organization_metadata_changing.go b/contract_event_listener/listen_organization_metadata_changing.go new file mode 100644 index 00000000..cf253459 --- /dev/null +++ b/contract_event_listener/listen_organization_metadata_changing.go @@ -0,0 +1,75 @@ +package contractlistener + +import ( + "context" + "slices" + + "github.com/singnet/snet-daemon/blockchain" + "github.com/singnet/snet-daemon/etcddb" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/gorilla/websocket" + "go.uber.org/zap" +) + +func (l *ContractEventListener) ListenOrganizationMetadataChanging() { + zap.L().Info("Starting contract event listener for organization metedata changing") + + watchOpts := &bind.WatchOpts{ + Start: nil, + Context: context.Background(), + } + + ethWSClient := l.BlockchainProcessor.GetEthWSClient() + + registryFilterer := blockchain.GetRegistryFilterer(ethWSClient) + orgIdFilter := blockchain.MakeTopicFilterer(l.CurrentOrganizationMetaData.OrgID) + + eventContractChannel := make(chan *blockchain.RegistryOrganizationModified) + sub, err := registryFilterer.WatchOrganizationModified(watchOpts, eventContractChannel, orgIdFilter) + + if err != nil { + zap.L().Fatal("Failed to subscribe to logs", zap.Error(err)) + } + + for { + select { + case err := <-sub.Err(): + if err != nil { + zap.L().Error("Subscription error: ", zap.Error(err)) + if websocket.IsCloseError( + err, + websocket.CloseNormalClosure, + websocket.CloseAbnormalClosure, + websocket.CloseGoingAway, + websocket.CloseServiceRestart, + websocket.CloseTryAgainLater, + websocket.CloseTLSHandshake, + ) { + err = l.BlockchainProcessor.ReconnectToWsClient() + if err != nil { + zap.L().Error("Error in reconnecting to websockets", zap.Error(err)) + } + } + } + case logData := <-eventContractChannel: + zap.L().Debug("Log received", zap.Any("value", logData)) + + // Get metaDataUri from smart contract and organizationMetaData from IPFS + newOrganizationMetaData := blockchain.GetOrganizationMetaData() + zap.L().Info("Get new organization metadata", zap.Any("value", newOrganizationMetaData)) + + if slices.Compare(l.CurrentOrganizationMetaData.GetPaymentStorageEndPoints(), newOrganizationMetaData.GetPaymentStorageEndPoints()) != 0 { + l.CurrentEtcdClient.Close() + newEtcdbClient, err := etcddb.Reconnect(newOrganizationMetaData) + if err != nil { + zap.L().Error("Error in reconnecting to etcd", zap.Error(err)) + } + l.CurrentEtcdClient = newEtcdbClient + } + + l.CurrentOrganizationMetaData = newOrganizationMetaData + zap.L().Info("Update current organization metadata", zap.Any("value", l.CurrentOrganizationMetaData)) + } + } +} diff --git a/etcddb/etcddb_client.go b/etcddb/etcddb_client.go index f36473dc..36730014 100644 --- a/etcddb/etcddb_client.go +++ b/etcddb/etcddb_client.go @@ -37,9 +37,10 @@ func (mutex *EtcdClientMutex) Unlock(ctx context.Context) (err error) { // EtcdClient struct has some useful methods to work with an etcd client type EtcdClient struct { - timeout time.Duration - session *concurrency.Session - etcdv3 *clientv3.Client + hotReaload bool + timeout time.Duration + session *concurrency.Session + etcdv3 *clientv3.Client } // NewEtcdClient create new etcd storage client. @@ -93,13 +94,23 @@ func NewEtcdClientFromVip(vip *viper.Viper, metaData *blockchain.OrganizationMet } client = &EtcdClient{ - timeout: conf.RequestTimeout, - session: session, - etcdv3: etcdv3, + hotReaload: conf.HotReload, + timeout: conf.RequestTimeout, + session: session, + etcdv3: etcdv3, } return } +func Reconnect(metadata *blockchain.OrganizationMetaData) (*EtcdClient, error) { + etcdClient, err := NewEtcdClientFromVip(config.Vip(), metadata) + if err != nil { + return nil, err + } + zap.L().Info("Successful reconnet to new etcd endpoints", zap.Strings("New endpoints", metadata.GetPaymentStorageEndPoints())) + return etcdClient, nil +} + func getTlsConfig() (*tls.Config, error) { zap.L().Debug("enabling SSL support via X509 keypair") cert, err := tls.LoadX509KeyPair(config.GetString(config.PaymentChannelCertPath), config.GetString(config.PaymentChannelKeyPath)) @@ -466,6 +477,10 @@ func (client *EtcdClient) StartTransaction(keys []string) (_transaction storage. return transaction, nil } +func (client *EtcdClient) IsHotReloadEnabled() bool { + return client.hotReaload +} + type keyValueVersion struct { Key string Value string diff --git a/etcddb/etcddb_conf.go b/etcddb/etcddb_conf.go index 653ec338..ca33c890 100644 --- a/etcddb/etcddb_conf.go +++ b/etcddb/etcddb_conf.go @@ -7,6 +7,7 @@ import ( "github.com/singnet/snet-daemon/blockchain" "github.com/singnet/snet-daemon/config" "github.com/spf13/viper" + "go.uber.org/zap" ) // EtcdClientConf config @@ -16,6 +17,7 @@ import ( type EtcdClientConf struct { ConnectionTimeout time.Duration `json:"connection_timeout" mapstructure:"connection_timeout"` RequestTimeout time.Duration `json:"request_timeout" mapstructure:"request_timeout"` + HotReload bool `json:"hot_reload" mapstructure:"hot_reload"` Endpoints []string } @@ -27,6 +29,7 @@ func GetEtcdClientConf(vip *viper.Viper, metaData *blockchain.OrganizationMetaDa conf = &EtcdClientConf{ ConnectionTimeout: metaData.GetConnectionTimeOut(), RequestTimeout: metaData.GetRequestTimeOut(), + HotReload: true, Endpoints: metaData.GetPaymentStorageEndPoints(), } @@ -46,6 +49,8 @@ func GetEtcdClientConf(vip *viper.Viper, metaData *blockchain.OrganizationMetaDa conf.ConnectionTimeout = confFromVip.ConnectionTimeout } + conf.HotReload = confFromVip.HotReload + zap.L().Info("Etcd client hot reload", zap.Bool("enable", conf.HotReload)) return } diff --git a/ipfsutils/ipfsutils_test.go b/ipfsutils/ipfsutils_test.go new file mode 100644 index 00000000..ddd4caef --- /dev/null +++ b/ipfsutils/ipfsutils_test.go @@ -0,0 +1,59 @@ +package ipfsutils + +import ( + "testing" + + "github.com/ipfs/kubo/client/rpc" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" + + _ "github.com/singnet/snet-daemon/config" +) + +type IpfsUtilsTestSuite struct { + suite.Suite + ipfsClient *rpc.HttpApi +} + +func TestIpfsUtilsTestSuite(t *testing.T) { + suite.Run(t, new(IpfsUtilsTestSuite)) +} + +func (suite *IpfsUtilsTestSuite) BeforeTest() { + suite.ipfsClient = GetIPFSClient() + assert.NotNil(suite.T(), suite.ipfsClient) +} + +func (suite *IpfsUtilsTestSuite) TestReadFiles() { + // For testing purposes, a hash is used from the calculator service. + hash := "QmeyrQkEyba8dd4rc3jrLd5pEwsxHutfH2RvsSaeSMqTtQ" + data := GetIpfsFile(hash) + assert.NotNil(suite.T(), data) + + protoFiles, err := ReadFilesCompressed(data) + + assert.Nil(suite.T(), err) + assert.NotNil(suite.T(), protoFiles) + + excpectedProtoFiles := []string{`syntax = "proto3"; + +package example_service; + +message Numbers { + float a = 1; + float b = 2; +} + +message Result { + float value = 1; +} + +service Calculator { + rpc add(Numbers) returns (Result) {} + rpc sub(Numbers) returns (Result) {} + rpc mul(Numbers) returns (Result) {} + rpc div(Numbers) returns (Result) {} +}`} + + assert.Equal(suite.T(), excpectedProtoFiles, protoFiles) +} diff --git a/metrics/request_stats.go b/metrics/request_stats.go index cf24dca7..d9eff875 100644 --- a/metrics/request_stats.go +++ b/metrics/request_stats.go @@ -35,7 +35,7 @@ func createRequestStat(commonStat *CommonStats) *RequestStats { request := &RequestStats{ Type: "request", RegistryAddressKey: config.GetRegistryAddress(), - EthereumJsonRpcEndpointKey: config.GetBlockChainEndPoint(), + EthereumJsonRpcEndpointKey: config.GetBlockChainHTTPEndPoint(), RequestID: commonStat.ID, GroupID: commonStat.GroupID, DaemonEndPoint: commonStat.DaemonEndPoint, diff --git a/metrics/response_stats.go b/metrics/response_stats.go index bf4d132c..72be4b09 100644 --- a/metrics/response_stats.go +++ b/metrics/response_stats.go @@ -1,12 +1,13 @@ package metrics import ( - "github.com/singnet/snet-daemon/config" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "math/big" "strconv" "time" + + "github.com/singnet/snet-daemon/config" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( @@ -100,7 +101,7 @@ func createResponseStats(commonStat *CommonStats, duration time.Duration, err er response := &ResponseStats{ Type: "response", RegistryAddressKey: config.GetRegistryAddress(), - EthereumJsonRpcEndpointKey: config.GetBlockChainEndPoint(), + EthereumJsonRpcEndpointKey: config.GetBlockChainHTTPEndPoint(), RequestID: commonStat.ID, ResponseTime: strconv.FormatFloat(duration.Seconds(), 'f', 4, 64), GroupID: daemonGroupId, diff --git a/resources/blockchain_network_config.json b/resources/blockchain_network_config.json index 3b390c19..6f93c13b 100644 --- a/resources/blockchain_network_config.json +++ b/resources/blockchain_network_config.json @@ -5,15 +5,18 @@ "registry_address_key": "0x4e74fefa82e83e0964f0d9f53c68e03f7298a8b2" }, "main": { - "ethereum_json_rpc_endpoint": "https://mainnet.infura.io/v3/09027f4a13e841d48dbfefc67e7685d5", + "ethereum_json_rpc_http_endpoint": "https://mainnet.infura.io/v3/09027f4a13e841d48dbfefc67e7685d5", + "ethereum_json_rpc_ws_endpoint": "wss://mainnet.infura.io/ws/v3/09027f4a13e841d48dbfefc67e7685d5", "network_id": "1" }, "goerli": { - "ethereum_json_rpc_endpoint": "https://goerli.infura.io/v3/09027f4a13e841d48dbfefc67e7685d5", + "ethereum_json_rpc_http_endpoint": "https://goerli.infura.io/v3/09027f4a13e841d48dbfefc67e7685d5", + "ethereum_json_rpc_ws_endpoint": "wss://goerli.infura.io/ws/v3/09027f4a13e841d48dbfefc67e7685d5", "network_id": "5" }, "sepolia": { - "ethereum_json_rpc_endpoint": "https://sepolia.infura.io/v3/09027f4a13e841d48dbfefc67e7685d5", + "ethereum_json_rpc_http_endpoint": "https://sepolia.infura.io/v3/98064002908248a0b0d837940d2c647b", + "ethereum_json_rpc_ws_endpoint": "wss://sepolia.infura.io/ws/v3/98064002908248a0b0d837940d2c647b", "network_id": "11155111" } } \ No newline at end of file diff --git a/snetd/cmd/serve.go b/snetd/cmd/serve.go index e026c96f..225984d8 100644 --- a/snetd/cmd/serve.go +++ b/snetd/cmd/serve.go @@ -10,19 +10,21 @@ import ( "strings" "syscall" - "github.com/gorilla/handlers" - "github.com/improbable-eng/grpc-web/go/grpcweb" - "github.com/pkg/errors" - "github.com/rs/cors" "github.com/singnet/snet-daemon/blockchain" "github.com/singnet/snet-daemon/config" "github.com/singnet/snet-daemon/configuration_service" + contractListener "github.com/singnet/snet-daemon/contract_event_listener" "github.com/singnet/snet-daemon/escrow" "github.com/singnet/snet-daemon/handler" "github.com/singnet/snet-daemon/handler/httphandler" "github.com/singnet/snet-daemon/logger" "github.com/singnet/snet-daemon/metrics" "github.com/singnet/snet-daemon/training" + + "github.com/gorilla/handlers" + "github.com/improbable-eng/grpc-web/go/grpcweb" + "github.com/pkg/errors" + "github.com/rs/cors" "github.com/soheilhy/cmux" "github.com/spf13/cobra" "go.uber.org/zap" @@ -63,6 +65,18 @@ var ServeCmd = &cobra.Command{ d.start() defer d.stop() + // Check if the payment storage client is etcd by verifying if d.components.etcdClient exists. + // If etcdClient is not nil and hot reload is enabled, initialize a ContractEventListener + // to listen for changes in the organization metadata. + if d.components.etcdClient != nil && d.components.etcdClient.IsHotReloadEnabled() { + contractEventLister := contractListener.ContractEventListener{ + BlockchainProcessor: &d.blockProc, + CurrentOrganizationMetaData: components.OrganizationMetaData(), + CurrentEtcdClient: components.EtcdClient(), + } + go contractEventLister.ListenOrganizationMetadataChanging() + } + sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT) <-sigChan