From fb1bd0edd6eb3bb236e6a273253de60fd0b3aed7 Mon Sep 17 00:00:00 2001 From: pls-github-dont-suspend-me Date: Mon, 26 Aug 2024 13:18:20 +0300 Subject: [PATCH 1/7] add tests for ipfsutils --- ipfsutils/ipfsutils_test.go | 59 +++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 ipfsutils/ipfsutils_test.go diff --git a/ipfsutils/ipfsutils_test.go b/ipfsutils/ipfsutils_test.go new file mode 100644 index 00000000..ddd4caef --- /dev/null +++ b/ipfsutils/ipfsutils_test.go @@ -0,0 +1,59 @@ +package ipfsutils + +import ( + "testing" + + "github.com/ipfs/kubo/client/rpc" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" + + _ "github.com/singnet/snet-daemon/config" +) + +type IpfsUtilsTestSuite struct { + suite.Suite + ipfsClient *rpc.HttpApi +} + +func TestIpfsUtilsTestSuite(t *testing.T) { + suite.Run(t, new(IpfsUtilsTestSuite)) +} + +func (suite *IpfsUtilsTestSuite) BeforeTest() { + suite.ipfsClient = GetIPFSClient() + assert.NotNil(suite.T(), suite.ipfsClient) +} + +func (suite *IpfsUtilsTestSuite) TestReadFiles() { + // For testing purposes, a hash is used from the calculator service. + hash := "QmeyrQkEyba8dd4rc3jrLd5pEwsxHutfH2RvsSaeSMqTtQ" + data := GetIpfsFile(hash) + assert.NotNil(suite.T(), data) + + protoFiles, err := ReadFilesCompressed(data) + + assert.Nil(suite.T(), err) + assert.NotNil(suite.T(), protoFiles) + + excpectedProtoFiles := []string{`syntax = "proto3"; + +package example_service; + +message Numbers { + float a = 1; + float b = 2; +} + +message Result { + float value = 1; +} + +service Calculator { + rpc add(Numbers) returns (Result) {} + rpc sub(Numbers) returns (Result) {} + rpc mul(Numbers) returns (Result) {} + rpc div(Numbers) returns (Result) {} +}`} + + assert.Equal(suite.T(), excpectedProtoFiles, protoFiles) +} From 8981ae140b30808941e1720991a73495b5405ae4 Mon Sep 17 00:00:00 2001 From: pls-github-dont-suspend-me Date: Wed, 4 Sep 2024 12:02:29 +0300 Subject: [PATCH 2/7] Add contract event listener --- .gitignore | 3 + README.md | 4 +- authutils/auth_service.go | 6 +- blockchain/blockchain.go | 32 +++++++--- blockchain/ethereumClient.go | 34 ++++++---- blockchain/orginzationMetadata.go | 8 +-- blockchain/serviceMetadata.go | 16 ++++- blockchain/utils.go | 17 +++++ config/blockchain_network_config.go | 39 +++++++----- config/blockchain_network_config_test.go | 24 ++++--- .../contract_event_listener.go | 20 ++++++ .../listen_organization_metadata_changing.go | 62 +++++++++++++++++++ etcddb/etcddb_client.go | 9 +++ metrics/request_stats.go | 6 +- metrics/response_stats.go | 59 ++++++++++-------- resources/blockchain_network_config.json | 9 ++- snetd/cmd/serve.go | 9 +++ utils/common.go | 23 +++++++ 18 files changed, 288 insertions(+), 92 deletions(-) create mode 100644 contract_event_listener/contract_event_listener.go create mode 100644 contract_event_listener/listen_organization_metadata_changing.go diff --git a/.gitignore b/.gitignore index 8531d46e..806551b3 100644 --- a/.gitignore +++ b/.gitignore @@ -38,3 +38,6 @@ storage-* *log data.etcd/ +# vscode +.vscode + diff --git a/README.md b/README.md index d895dee9..eff67de3 100644 --- a/README.md +++ b/README.md @@ -182,9 +182,7 @@ time. Based on the network selected blockchain_network_selected the end point is auto determined Example `"https://sepolia.infura.io/v3"` for sepolia testnet. -* **blockchain_provider_api_key** (optional) - basic header authorization key for blockchain providers. Tested with - infura api - key secret. +* **blockchain_provider_api_key** (optional) - basic header authorization key for blockchain providers. Tested with infura api key secret. * **organization_id** (required) - Id of the organization to search for [service configuration diff --git a/authutils/auth_service.go b/authutils/auth_service.go index c794e344..c7db94f1 100755 --- a/authutils/auth_service.go +++ b/authutils/auth_service.go @@ -109,12 +109,12 @@ func CheckIfTokenHasExpired(expiredBlock *big.Int) error { // Get the current block number from on chain func CurrentBlock() (*big.Int, error) { - if ethClient, err := blockchain.GetEthereumClient(); err != nil { + if ethHttpClient, _, err := blockchain.GetEthereumClient(); err != nil { return nil, err } else { - defer ethClient.RawClient.Close() + defer ethHttpClient.RawClient.Close() var currentBlockHex string - if err = ethClient.RawClient.CallContext(context.Background(), ¤tBlockHex, "eth_blockNumber"); err != nil { + if err = ethHttpClient.RawClient.CallContext(context.Background(), ¤tBlockHex, "eth_blockNumber"); err != nil { zap.L().Error("error determining current block", zap.Error(err)) return nil, fmt.Errorf("error determining current block: %v", err) } diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index b336f0e8..1fa55048 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -29,8 +29,10 @@ type jobInfo struct { type Processor struct { enabled bool - ethClient *ethclient.Client - rawClient *rpc.Client + ethHttpClient *ethclient.Client + rawHttpClient *rpc.Client + ethWSClient *ethclient.Client + rawWSClient *rpc.Client sigHasher func([]byte) []byte privateKey *ecdsa.PrivateKey address string @@ -55,11 +57,13 @@ func NewProcessor(metadata *ServiceMetadata) (Processor, error) { // Setup ethereum client - if ethclients, err := GetEthereumClient(); err != nil { + if ethHttpClients, ethWSClients, err := GetEthereumClient(); err != nil { return p, errors.Wrap(err, "error creating RPC client") } else { - p.rawClient = ethclients.RawClient - p.ethClient = ethclients.EthClient + p.rawHttpClient = ethHttpClients.RawClient + p.ethHttpClient = ethHttpClients.EthClient + p.rawWSClient = ethWSClients.RawClient + p.ethWSClient = ethWSClients.EthClient } // TODO: if address is not in config, try to load it using network @@ -68,7 +72,7 @@ func NewProcessor(metadata *ServiceMetadata) (Processor, error) { p.escrowContractAddress = metadata.GetMpeAddress() - if mpe, err := NewMultiPartyEscrow(p.escrowContractAddress, p.ethClient); err != nil { + if mpe, err := NewMultiPartyEscrow(p.escrowContractAddress, p.ethHttpClient); err != nil { return p, errors.Wrap(err, "error instantiating MultiPartyEscrow contract") } else { p.multiPartyEscrow = mpe @@ -94,11 +98,19 @@ func (processor *Processor) MultiPartyEscrow() *MultiPartyEscrow { return processor.multiPartyEscrow } +func (processor *Processor) GetEthHttpClient() *ethclient.Client { + return processor.ethHttpClient +} + +func (processor *Processor) GetEthWSClient() *ethclient.Client { + return processor.ethWSClient +} + func (processor *Processor) CurrentBlock() (currentBlock *big.Int, err error) { // We have to do a raw call because the standard method of ethClient.HeaderByNumber(ctx, nil) errors on // unmarshaling the response currently. See https://github.com/ethereum/go-ethereum/issues/3230 var currentBlockHex string - if err = processor.rawClient.CallContext(context.Background(), ¤tBlockHex, "eth_blockNumber"); err != nil { + if err = processor.rawHttpClient.CallContext(context.Background(), ¤tBlockHex, "eth_blockNumber"); err != nil { zap.L().Error("error determining current block", zap.Error(err)) return nil, fmt.Errorf("error determining current block: %v", err) } @@ -114,6 +126,8 @@ func (processor *Processor) HasIdentity() bool { } func (processor *Processor) Close() { - processor.ethClient.Close() - processor.rawClient.Close() + processor.ethHttpClient.Close() + processor.rawHttpClient.Close() + processor.ethWSClient.Close() + processor.rawWSClient.Close() } diff --git a/blockchain/ethereumClient.go b/blockchain/ethereumClient.go index 0cd32b97..f3ae5b9e 100644 --- a/blockchain/ethereumClient.go +++ b/blockchain/ethereumClient.go @@ -3,10 +3,12 @@ package blockchain import ( "context" "encoding/base64" + + "github.com/singnet/snet-daemon/config" + "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/rpc" "github.com/pkg/errors" - "github.com/singnet/snet-daemon/config" ) type EthereumClient struct { @@ -19,21 +21,31 @@ func basicAuth(username, password string) string { return base64.StdEncoding.EncodeToString([]byte(auth)) } -func GetEthereumClient() (*EthereumClient, error) { +func GetEthereumClient() (*EthereumClient, *EthereumClient, error) { - ethereumClient := new(EthereumClient) - if client, err := rpc.DialOptions(context.Background(), - config.GetBlockChainEndPoint(), - rpc.WithHeader("Authorization", "Basic "+basicAuth("", config.GetString(config.BlockchainProviderApiKey)))); err != nil { - return nil, errors.Wrap(err, "error creating RPC client") - } else { - ethereumClient.RawClient = client - ethereumClient.EthClient = ethclient.NewClient(client) + ethereumHttpClient := new(EthereumClient) + ethereumWsClient := new(EthereumClient) + httpClient, err := rpc.DialOptions(context.Background(), + config.GetBlockChainHTTPEndPoint(), + rpc.WithHeader("Authorization", "Basic "+basicAuth("", config.GetString(config.BlockchainProviderApiKey)))) + if err != nil { + return nil, nil, errors.Wrap(err, "error creating RPC client") } - return ethereumClient, nil + ethereumHttpClient.RawClient = httpClient + ethereumHttpClient.EthClient = ethclient.NewClient(httpClient) + wsClient, err := rpc.DialOptions(context.Background(), config.GetBlockChainWSEndPoint()) + if err != nil { + return nil, nil, errors.Wrap(err, "error creating RPC WebSocket client") + } + + ethereumWsClient.RawClient = wsClient + ethereumWsClient.EthClient = ethclient.NewClient(wsClient) + + return ethereumHttpClient, ethereumWsClient, nil } + func (ethereumClient *EthereumClient) Close() { if ethereumClient != nil { ethereumClient.EthClient.Close() diff --git a/blockchain/orginzationMetadata.go b/blockchain/orginzationMetadata.go index b8a54609..add64c1b 100644 --- a/blockchain/orginzationMetadata.go +++ b/blockchain/orginzationMetadata.go @@ -114,7 +114,7 @@ func InitOrganizationMetaDataFromJson(jsonData string) (metaData *OrganizationMe return nil, err } if err = checkMandatoryFields(metaData); err != nil { - zap.L().Error("Error in check mdandatory fields", zap.Error(err)) + zap.L().Error("Error in check mandatory fields", zap.Error(err)) return nil, err } @@ -125,10 +125,10 @@ func checkMandatoryFields(metaData *OrganizationMetaData) (err error) { if metaData.daemonGroup.PaymentDetails.PaymentChannelStorageClient.Endpoints == nil { err = fmt.Errorf("Mandatory field : ETCD Client Endpoints are mising for the Group %v ", metaData.daemonGroup.GroupName) } - if &metaData.recipientPaymentAddress == nil { + if metaData.recipientPaymentAddress == (common.Address{}) { err = fmt.Errorf("Mandatory field : Recepient Address is missing for the Group %v ", metaData.daemonGroup.GroupName) } - return + return err } func setDerivedAttributes(metaData *OrganizationMetaData) (err error) { @@ -187,7 +187,7 @@ func getMetaDataURI() []byte { organizationRegistered, err := reg.GetOrganizationById(nil, orgId) if err != nil || !organizationRegistered.Found { - zap.L().Panic("Error Retrieving contract details for the Given Organization", zap.String("OrganizationId", config.GetString(config.OrganizationId))) + zap.L().Panic("Error Retrieving contract details for the Given Organization", zap.String("OrganizationId", config.GetString(config.OrganizationId)), zap.Error(err)) } return organizationRegistered.OrgMetadataURI[:] } diff --git a/blockchain/serviceMetadata.go b/blockchain/serviceMetadata.go index 31b5ab74..c0b36886 100644 --- a/blockchain/serviceMetadata.go +++ b/blockchain/serviceMetadata.go @@ -11,6 +11,7 @@ import ( "github.com/bufbuild/protocompile" pproto "github.com/emicklei/proto" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" "github.com/pkg/errors" "github.com/singnet/snet-daemon/config" "github.com/singnet/snet-daemon/ipfsutils" @@ -277,13 +278,22 @@ func ReadServiceMetaDataFromLocalFile(filename string) (*ServiceMetadata, error) } func getRegistryCaller() (reg *RegistryCaller) { - ethClient, err := GetEthereumClient() + ethHttpClient, _, err := GetEthereumClient() if err != nil { zap.L().Panic("Unable to get Blockchain client ", zap.Error(err)) } - defer ethClient.Close() + defer ethHttpClient.Close() registryContractAddress := getRegistryAddressKey() - reg, err = NewRegistryCaller(registryContractAddress, ethClient.EthClient) + reg, err = NewRegistryCaller(registryContractAddress, ethHttpClient.EthClient) + if err != nil { + zap.L().Panic("Error instantiating Registry contract for the given Contract Address", zap.Error(err), zap.Any("registryContractAddress", registryContractAddress)) + } + return reg +} + +func GetRegistryCaller(ethHttpClient *ethclient.Client) *RegistryCaller { + registryContractAddress := getRegistryAddressKey() + reg, err := NewRegistryCaller(registryContractAddress, ethHttpClient) if err != nil { zap.L().Panic("Error instantiating Registry contract for the given Contract Address", zap.Error(err), zap.Any("registryContractAddress", registryContractAddress)) } diff --git a/blockchain/utils.go b/blockchain/utils.go index eba3974b..0600c503 100644 --- a/blockchain/utils.go +++ b/blockchain/utils.go @@ -2,6 +2,7 @@ package blockchain import ( "encoding/base64" + "encoding/hex" "fmt" "regexp" "strings" @@ -82,3 +83,19 @@ func ToChecksumAddress(hexAddress string) string { mixedAddress := common.NewMixedcaseAddress(address) return mixedAddress.Address().String() } + +func StringToHex(str string) string { + hexStr := hex.EncodeToString([]byte(str)) + + // Pad the result to 32 bytes (64 hex characters) + paddedHexStr := hexStr + "0000000000000000000000000000000000000000000000000000000000000000"[len(hexStr):] + + // Add the 0x prefix to indicate that it's a hexadecimal value + result := "0x" + paddedHexStr + return result +} + +func StringToHash(str string) common.Hash { + hexStr := StringToHex(str) + return common.HexToHash(hexStr) +} diff --git a/config/blockchain_network_config.go b/config/blockchain_network_config.go index cd2f37d7..ca9313c5 100644 --- a/config/blockchain_network_config.go +++ b/config/blockchain_network_config.go @@ -12,17 +12,19 @@ import ( ) type NetworkSelected struct { - NetworkName string - EthereumJSONRPCEndpoint string - NetworkId string - RegistryAddressKey string + NetworkName string + EthereumJSONRPCHTTPEndpoint string + EthereumJSONRPCWSEndpoint string + NetworkId string + RegistryAddressKey string } const ( - BlockChainNetworkFileName = "resources/blockchain_network_config.json" - EthereumJsonRpcEndpointKey = "ethereum_json_rpc_endpoint" - NetworkId = "network_id" - RegistryAddressKey = "registry_address_key" + BlockChainNetworkFileName = "resources/blockchain_network_config.json" + EthereumJsonRpcHTTPEndpointKey = "ethereum_json_rpc_http_endpoint" + EthereumJsonRpcWSEndpointKey = "ethereum_json_rpc_ws_endpoint" + NetworkId = "network_id" + RegistryAddressKey = "registry_address_key" ) var networkSelected = &NetworkSelected{} @@ -38,7 +40,8 @@ func determineNetworkSelected(data []byte) (err error) { //Ethereum End point and Network ID mapped to networkSelected.NetworkName = networkName networkSelected.RegistryAddressKey = getDetailsFromJsonOrConfig(dynamicBinding[networkName].(map[string]any)[RegistryAddressKey], RegistryAddressKey) - networkSelected.EthereumJSONRPCEndpoint = getDetailsFromJsonOrConfig(dynamicBinding[networkName].(map[string]any)[EthereumJsonRpcEndpointKey], EthereumJsonRpcEndpointKey) + networkSelected.EthereumJSONRPCHTTPEndpoint = getDetailsFromJsonOrConfig(dynamicBinding[networkName].(map[string]any)[EthereumJsonRpcHTTPEndpointKey], EthereumJsonRpcHTTPEndpointKey) + networkSelected.EthereumJSONRPCWSEndpoint = getDetailsFromJsonOrConfig(dynamicBinding[networkName].(map[string]any)[EthereumJsonRpcWSEndpointKey], EthereumJsonRpcWSEndpointKey) networkSelected.NetworkId = fmt.Sprintf("%v", dynamicBinding[networkName].(map[string]any)[NetworkId]) return err @@ -61,9 +64,13 @@ func GetNetworkId() string { return networkSelected.NetworkId } -// Get the block chain end point associated with the Network selected -func GetBlockChainEndPoint() string { - return networkSelected.EthereumJSONRPCEndpoint +// Get the blockchain endpoint associated with the Network selected +func GetBlockChainHTTPEndPoint() string { + return networkSelected.EthereumJSONRPCHTTPEndpoint +} + +func GetBlockChainWSEndPoint() string { + return networkSelected.EthereumJSONRPCWSEndpoint } // Get the Registry address of the contract @@ -110,9 +117,11 @@ func deriveDatafromJSON(data []byte) (err error) { networkSelected.RegistryAddressKey = fmt.Sprintf("%v", m[GetNetworkId()].(map[string]any)["address"]) zap.L().Info("Derive data from JSON", zap.String("Network", GetString(BlockChainNetworkSelected)), - zap.String("NetwrokId", GetNetworkId()), - zap.String("RegistryAddress", GetRegistryAddress()), - zap.String("Blockchain endpoint", GetBlockChainEndPoint())) + zap.String("Netwrok id", GetNetworkId()), + zap.String("Registry address", GetRegistryAddress()), + zap.String("Blockchain http endpoint", GetBlockChainHTTPEndPoint()), + zap.String("Blockchain ws endpoint", GetBlockChainWSEndPoint()), + ) return nil } diff --git a/config/blockchain_network_config_test.go b/config/blockchain_network_config_test.go index 9f6771c0..acaeff5e 100644 --- a/config/blockchain_network_config_test.go +++ b/config/blockchain_network_config_test.go @@ -16,31 +16,35 @@ func TestGetNetworkId(t *testing.T) { var defaultBlockChainNetworkConfig = ` { "local": { - "ethereum_json_rpc_endpoint": "http://localhost:8545", - "network_id": "42", + "ethereum_json_rpc_http_endpoint": "http://localhost:8545", + "ethereum_json_rpc_ws_endpoint": "ws://localhost:443", + "network_id": "42", "registry_address_key": "0x4e74fefa82e83e0964f0d9f53c68e03f7298a8b2" }, "main": { - "ethereum_json_rpc_endpoint": "https://mainnet.infura.io/v3", + "ethereum_json_rpc_http_endpoint": "https://mainnet.infura.io/v3", + "ethereum_json_rpc_ws_endpoint": "wss://mainnet.infura.io/v3", "network_id": "1" }, "goerli": { - "ethereum_json_rpc_endpoint": "https://goerli.infura.io/v3", + "ethereum_json_rpc_http_endpoint": "https://goerli.infura.io/v3", + "ethereum_json_rpc_ws_endpoint": "wss://goerli.infura.io/v3", "network_id": "5" }, "sepolia": { - "ethereum_json_rpc_endpoint": "https://sepolia.infura.io/v3", + "ethereum_json_rpc_http_endpoint": "https://sepolia.infura.io/v3", + "ethereum_json_rpc_ws_endpoint": "wss://sepolia.infura.io/v3", "network_id": "11155111" } }` func TestGetBlockChainEndPoint(t *testing.T) { Vip().Set(BlockChainNetworkSelected, "local") - determineNetworkSelected([]byte(defaultBlockChainNetworkConfig)) - - assert.Matches(t, GetBlockChainEndPoint(), GetString(BlockChainNetworkSelected)) + err := determineNetworkSelected([]byte(defaultBlockChainNetworkConfig)) + assert.Equal(t, err, nil) + assert.Matches(t, GetBlockChainHTTPEndPoint(), GetString(BlockChainNetworkSelected)) + assert.Matches(t, GetBlockChainWSEndPoint(), GetString(BlockChainNetworkSelected)) assert2.NotEqual(t, GetNetworkId(), nil) - } func TestGetRegistryAddress(t *testing.T) { @@ -92,7 +96,7 @@ func Test_GetDetailsFromJsonOrConfig(t *testing.T) { want string network string }{ - {EthereumJsonRpcEndpointKey, "https://sepolia.infura.io/v3", "sepolia"}, + {EthereumJsonRpcHTTPEndpointKey, "https://sepolia.infura.io/v3", "sepolia"}, {RegistryAddressKey, "0x4e74fefa82e83e0964f0d9f53c68e03f7298a8b2", "local"}, } for _, tt := range tests { diff --git a/contract_event_listener/contract_event_listener.go b/contract_event_listener/contract_event_listener.go new file mode 100644 index 00000000..707b8cc6 --- /dev/null +++ b/contract_event_listener/contract_event_listener.go @@ -0,0 +1,20 @@ +package contract_event_listener + +import ( + "github.com/singnet/snet-daemon/blockchain" + "github.com/singnet/snet-daemon/etcddb" +) + +type EventSignature string + +const ( + contractAddress = "0x4DCc70c6FCE4064803f0ae0cE48497B3f7182e5D" + UpdateMetadataUriEventSignature EventSignature = "0x06ccb920be65231f5c9d04dd4883d3c7648ebe5f5317cc7177ee4f4a7cc2d038" +) + +type ContractEventListener struct { + BlockchainProcessor *blockchain.Processor + EventSignature EventSignature + CurrentOrganizationMetaData *blockchain.OrganizationMetaData + CurrentEtcdClient *etcddb.EtcdClient +} diff --git a/contract_event_listener/listen_organization_metadata_changing.go b/contract_event_listener/listen_organization_metadata_changing.go new file mode 100644 index 00000000..2d619ccb --- /dev/null +++ b/contract_event_listener/listen_organization_metadata_changing.go @@ -0,0 +1,62 @@ +package contract_event_listener + +import ( + "context" + + "github.com/singnet/snet-daemon/blockchain" + "github.com/singnet/snet-daemon/etcddb" + "github.com/singnet/snet-daemon/utils" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "go.uber.org/zap" +) + +func (l *ContractEventListener) ListenOrganizationMetadataChanging() { + query := ethereum.FilterQuery{ + Addresses: []common.Address{ + common.HexToAddress(contractAddress), + }, + Topics: [][]common.Hash{ + { + common.HexToHash(string(UpdateMetadataUriEventSignature)), + blockchain.StringToHash(l.CurrentOrganizationMetaData.OrgID), + }, + }, + } + + ethWSClient := l.BlockchainProcessor.GetEthWSClient() + + logs := make(chan types.Log) + sub, err := ethWSClient.SubscribeFilterLogs(context.Background(), query, logs) + if err != nil { + zap.L().Fatal("Failed to subscribe to logs", zap.Error(err)) + } + + for { + select { + case err := <-sub.Err(): + zap.L().Error("Subscription error: ", zap.Error(err)) + case logData := <-logs: + zap.L().Debug("Log received", zap.Any("value", logData)) + + // Get metaDataUri from smart contract and organizationMetaData from IPFS + newOrganizationMetaData := blockchain.GetOrganizationMetaData() + zap.L().Info("Get new organization metadata", zap.Any("value", newOrganizationMetaData)) + + if !utils.CompareSlices(l.CurrentOrganizationMetaData.GetPaymentStorageEndPoints(), newOrganizationMetaData.GetPaymentStorageEndPoints()) { + // mutex + l.CurrentEtcdClient.Close() + newEtcdbClient, err := etcddb.Reconnect(newOrganizationMetaData) + if err != nil { + zap.L().Error("Error in reconnecting to etcd", zap.Error(err)) + } + l.CurrentEtcdClient = newEtcdbClient + } + + l.CurrentOrganizationMetaData = newOrganizationMetaData + zap.L().Info("Update current organization metadata", zap.Any("value", l.CurrentOrganizationMetaData)) + } + } +} diff --git a/etcddb/etcddb_client.go b/etcddb/etcddb_client.go index 529f9ea4..22891a03 100644 --- a/etcddb/etcddb_client.go +++ b/etcddb/etcddb_client.go @@ -100,6 +100,15 @@ func NewEtcdClientFromVip(vip *viper.Viper, metaData *blockchain.OrganizationMet return } +func Reconnect(metadata *blockchain.OrganizationMetaData) (*EtcdClient, error) { + etcdClient, err := NewEtcdClientFromVip(config.Vip(), metadata) + if err != nil { + return nil, err + } + zap.L().Info("Successful reconnet to new etcd endpoints", zap.Strings("New enpoints", metadata.GetPaymentStorageEndPoints())) + return etcdClient, nil +} + func getTlsConfig() (*tls.Config, error) { zap.L().Debug("enabling SSL support via X509 keypair") diff --git a/metrics/request_stats.go b/metrics/request_stats.go index 83e8cbac..d9eff875 100644 --- a/metrics/request_stats.go +++ b/metrics/request_stats.go @@ -6,7 +6,7 @@ import ( "strconv" ) -//Request stats that will be captured +// Request stats that will be captured type RequestStats struct { Type string `json:"type"` RegistryAddressKey string `json:"registry_address_key"` @@ -26,8 +26,6 @@ type RequestStats struct { ChannelId string `json:"channel_id"` } - - func (request *RequestStats) setDataFromContext(md metadata.MD) { request.InputDataSize = strconv.FormatUint(GetSize(md), 10) @@ -37,7 +35,7 @@ func createRequestStat(commonStat *CommonStats) *RequestStats { request := &RequestStats{ Type: "request", RegistryAddressKey: config.GetRegistryAddress(), - EthereumJsonRpcEndpointKey: config.GetBlockChainEndPoint(), + EthereumJsonRpcEndpointKey: config.GetBlockChainHTTPEndPoint(), RequestID: commonStat.ID, GroupID: commonStat.GroupID, DaemonEndPoint: commonStat.DaemonEndPoint, diff --git a/metrics/response_stats.go b/metrics/response_stats.go index c30c3d43..72be4b09 100644 --- a/metrics/response_stats.go +++ b/metrics/response_stats.go @@ -1,17 +1,19 @@ package metrics import ( - "github.com/singnet/snet-daemon/config" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "math/big" "strconv" "time" + + "github.com/singnet/snet-daemon/config" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( - timeFormat="2006-01-02 15:04:05.999999999" + timeFormat = "2006-01-02 15:04:05.999999999" ) + type CommonStats struct { ID string ServiceMethod string @@ -31,14 +33,15 @@ type CommonStats struct { } type ChannelStats struct { - OrganizationID string - ServiceID string - GroupID string - AuthorizedAmount *big.Int - FullAmount *big.Int - ChannelId *big.Int - Nonce *big.Int + OrganizationID string + ServiceID string + GroupID string + AuthorizedAmount *big.Int + FullAmount *big.Int + ChannelId *big.Int + Nonce *big.Int } + func BuildCommonStats(receivedTime time.Time, methodName string) *CommonStats { commonStats := &CommonStats{ ID: GenXid(), @@ -53,7 +56,7 @@ func BuildCommonStats(receivedTime time.Time, methodName string) *CommonStats { } -//Response stats that will be captured and published +// Response stats that will be captured and published type ResponseStats struct { Type string `json:"type"` RegistryAddressKey string `json:"registry_address_key"` @@ -85,20 +88,20 @@ type ResponseStats struct { UserAddress string `json:"user_address"` } -//Publish response received as a payload for reporting /metrics analysis -//If there is an error in the response received from the service, then send out a notification as well. +// Publish response received as a payload for reporting /metrics analysis +// If there is an error in the response received from the service, then send out a notification as well. func PublishResponseStats(commonStats *CommonStats, duration time.Duration, err error) bool { response := createResponseStats(commonStats, duration, err) - return Publish(response, config.GetString(config.MeteringEndPoint) + "/metering/usage",commonStats) + return Publish(response, config.GetString(config.MeteringEndPoint)+"/metering/usage", commonStats) } func createResponseStats(commonStat *CommonStats, duration time.Duration, err error) *ResponseStats { - currentTime := time.Now().UTC().Format(timeFormat) + currentTime := time.Now().UTC().Format(timeFormat) response := &ResponseStats{ Type: "response", RegistryAddressKey: config.GetRegistryAddress(), - EthereumJsonRpcEndpointKey: config.GetBlockChainEndPoint(), + EthereumJsonRpcEndpointKey: config.GetBlockChainHTTPEndPoint(), RequestID: commonStat.ID, ResponseTime: strconv.FormatFloat(duration.Seconds(), 'f', 4, 64), GroupID: daemonGroupId, @@ -114,21 +117,23 @@ func createResponseStats(commonStat *CommonStats, duration time.Duration, err er UserDetails: commonStat.UserDetails, UserAgent: commonStat.UserAgent, ChannelId: commonStat.ChannelId, - UserName:commonStat.UserName, - StartTime:commonStat.RequestReceivedTime, - EndTime:currentTime, - Status:getStatus(err), - UsageValue:1, - UsageType:"apicall", - Operation:"read", - PaymentMode:commonStat.PaymentMode, - UserAddress:commonStat.UserAddress, + UserName: commonStat.UserName, + StartTime: commonStat.RequestReceivedTime, + EndTime: currentTime, + Status: getStatus(err), + UsageValue: 1, + UsageType: "apicall", + Operation: "read", + PaymentMode: commonStat.PaymentMode, + UserAddress: commonStat.UserAddress, } return response } func getStatus(err error) string { - if err != nil {return "failed"} + if err != nil { + return "failed" + } return "success" } diff --git a/resources/blockchain_network_config.json b/resources/blockchain_network_config.json index 3b390c19..59c9dd5e 100644 --- a/resources/blockchain_network_config.json +++ b/resources/blockchain_network_config.json @@ -5,15 +5,18 @@ "registry_address_key": "0x4e74fefa82e83e0964f0d9f53c68e03f7298a8b2" }, "main": { - "ethereum_json_rpc_endpoint": "https://mainnet.infura.io/v3/09027f4a13e841d48dbfefc67e7685d5", + "ethereum_json_rpc_http_endpoint": "https://mainnet.infura.io/v3/09027f4a13e841d48dbfefc67e7685d5", + "ethereum_json_rpc_ws_endpoint": "wss://mainnet.infura.io/ws/v3/09027f4a13e841d48dbfefc67e7685d5", "network_id": "1" }, "goerli": { - "ethereum_json_rpc_endpoint": "https://goerli.infura.io/v3/09027f4a13e841d48dbfefc67e7685d5", + "ethereum_json_rpc_http_endpoint": "https://goerli.infura.io/v3/09027f4a13e841d48dbfefc67e7685d5", + "ethereum_json_rpc_ws_endpoint": "wss://goerli.infura.io/ws/v3/09027f4a13e841d48dbfefc67e7685d5", "network_id": "5" }, "sepolia": { - "ethereum_json_rpc_endpoint": "https://sepolia.infura.io/v3/09027f4a13e841d48dbfefc67e7685d5", + "ethereum_json_rpc_http_endpoint": "https://sepolia.infura.io/v3/09027f4a13e841d48dbfefc67e7685d5", + "ethereum_json_rpc_ws_endpoint": "wss://sepolia.infura.io/ws/v3/98064002908248a0b0d837940d2c647b", "network_id": "11155111" } } \ No newline at end of file diff --git a/snetd/cmd/serve.go b/snetd/cmd/serve.go index e51ddeb4..59da76b7 100644 --- a/snetd/cmd/serve.go +++ b/snetd/cmd/serve.go @@ -17,6 +17,7 @@ import ( "github.com/singnet/snet-daemon/blockchain" "github.com/singnet/snet-daemon/config" "github.com/singnet/snet-daemon/configuration_service" + "github.com/singnet/snet-daemon/contract_event_listener" "github.com/singnet/snet-daemon/escrow" "github.com/singnet/snet-daemon/handler" "github.com/singnet/snet-daemon/handler/httphandler" @@ -60,6 +61,14 @@ var ServeCmd = &cobra.Command{ zap.L().Fatal("Unable to initialize daemon", zap.Error(err)) } + contractEventLister := contract_event_listener.ContractEventListener{ + BlockchainProcessor: &d.blockProc, + EventSignature: contract_event_listener.UpdateMetadataUriEventSignature, + CurrentOrganizationMetaData: components.OrganizationMetaData(), + CurrentEtcdClient: components.EtcdClient(), + } + go contractEventLister.ListenOrganizationMetadataChanging() + d.start() defer d.stop() diff --git a/utils/common.go b/utils/common.go index a62c6e7d..b68a4b9f 100644 --- a/utils/common.go +++ b/utils/common.go @@ -3,6 +3,7 @@ package utils import ( "bytes" "encoding/gob" + "errors" "strings" "github.com/ethereum/go-ethereum/common" @@ -54,3 +55,25 @@ func CheckIfHttps(endpoints []string) bool { } return false } + +func CompareSlices[T comparable](s1, s2 []T) bool { + if len(s1) != len(s2) { + return false + } + for i := range s1 { + if s1[i] != s2[i] { + return false + } + } + return true +} + +func ConvertHTTPToWS(url string) (string, error) { + if strings.HasPrefix(url, "https://") { + return strings.Replace(url, "https://", "wss://", 1), nil + } else if strings.HasPrefix(url, "http://") { + return strings.Replace(url, "http://", "ws://", 1), nil + } else { + return "", errors.New("invalid URL scheme. URL must start with 'http://' or 'https://'") + } +} From 8c576b49b56aace815a7f4ef7f76a96d5eaa95f0 Mon Sep 17 00:00:00 2001 From: pls-github-dont-suspend-me Date: Mon, 9 Sep 2024 20:45:09 +0300 Subject: [PATCH 3/7] Add hot reloading for etcd client --- authutils/auth_service.go | 2 +- blockchain/blockchain.go | 19 +++++++- blockchain/ethereumClient.go | 38 +++++++++++---- blockchain/serviceMetadata.go | 11 ++++- blockchain/utils.go | 31 ++++++------ config/config.go | 6 +-- .../contract_event_listener.go | 8 +--- .../listen_organization_metadata_changing.go | 47 ++++++++++++------- etcddb/etcddb_client.go | 20 +++++--- etcddb/etcddb_conf.go | 5 ++ resources/blockchain_network_config.json | 2 +- snetd/cmd/serve.go | 31 +++++++----- 12 files changed, 143 insertions(+), 77 deletions(-) diff --git a/authutils/auth_service.go b/authutils/auth_service.go index c7db94f1..87193a37 100755 --- a/authutils/auth_service.go +++ b/authutils/auth_service.go @@ -109,7 +109,7 @@ func CheckIfTokenHasExpired(expiredBlock *big.Int) error { // Get the current block number from on chain func CurrentBlock() (*big.Int, error) { - if ethHttpClient, _, err := blockchain.GetEthereumClient(); err != nil { + if ethHttpClient, _, err := blockchain.CreateEthereumClients(); err != nil { return nil, err } else { defer ethHttpClient.RawClient.Close() diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index 1fa55048..bfa5b7e9 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -57,7 +57,7 @@ func NewProcessor(metadata *ServiceMetadata) (Processor, error) { // Setup ethereum client - if ethHttpClients, ethWSClients, err := GetEthereumClient(); err != nil { + if ethHttpClients, ethWSClients, err := CreateEthereumClients(); err != nil { return p, errors.Wrap(err, "error creating RPC client") } else { p.rawHttpClient = ethHttpClients.RawClient @@ -86,6 +86,23 @@ func NewProcessor(metadata *ServiceMetadata) (Processor, error) { return p, nil } +func (processor *Processor) ReconnectToWsClient() error { + processor.ethWSClient.Close() + processor.rawHttpClient.Close() + + zap.L().Debug("Try to reconnect to websocket client") + + newEthWSClients, err := CreateWSEthereumClient() + if err != nil { + return err + } + + processor.ethWSClient = newEthWSClients.EthClient + processor.rawWSClient = newEthWSClients.RawClient + + return nil +} + func (processor *Processor) Enabled() (enabled bool) { return processor.enabled } diff --git a/blockchain/ethereumClient.go b/blockchain/ethereumClient.go index f3ae5b9e..d394c874 100644 --- a/blockchain/ethereumClient.go +++ b/blockchain/ethereumClient.go @@ -21,29 +21,47 @@ func basicAuth(username, password string) string { return base64.StdEncoding.EncodeToString([]byte(auth)) } -func GetEthereumClient() (*EthereumClient, *EthereumClient, error) { +func CreateEthereumClients() (*EthereumClient, *EthereumClient, error) { + ethereumHttpClient, err := CreateHTTPEthereumClient() + if err != nil { + return nil, nil, err + } + + ethereumWsClient, err := CreateWSEthereumClient() + if err != nil { + return nil, nil, err + } + return ethereumHttpClient, ethereumWsClient, nil +} + +func CreateHTTPEthereumClient() (*EthereumClient, error) { ethereumHttpClient := new(EthereumClient) - ethereumWsClient := new(EthereumClient) - httpClient, err := rpc.DialOptions(context.Background(), + httpClient, err := rpc.DialOptions( + context.Background(), config.GetBlockChainHTTPEndPoint(), rpc.WithHeader("Authorization", "Basic "+basicAuth("", config.GetString(config.BlockchainProviderApiKey)))) if err != nil { - return nil, nil, errors.Wrap(err, "error creating RPC client") + return nil, errors.Wrap(err, "error creating RPC client") } ethereumHttpClient.RawClient = httpClient ethereumHttpClient.EthClient = ethclient.NewClient(httpClient) - wsClient, err := rpc.DialOptions(context.Background(), config.GetBlockChainWSEndPoint()) + return ethereumHttpClient, nil +} + +func CreateWSEthereumClient() (*EthereumClient, error) { + ethereumWsClient := new(EthereumClient) + wsClient, err := rpc.DialOptions( + context.Background(), + config.GetBlockChainWSEndPoint(), + rpc.WithHeader("Authorization", "Basic "+basicAuth("", config.GetString(config.BlockchainProviderApiKey)))) if err != nil { - return nil, nil, errors.Wrap(err, "error creating RPC WebSocket client") + return nil, errors.Wrap(err, "error creating RPC WebSocket client") } - ethereumWsClient.RawClient = wsClient ethereumWsClient.EthClient = ethclient.NewClient(wsClient) - - return ethereumHttpClient, ethereumWsClient, nil - + return ethereumWsClient, nil } func (ethereumClient *EthereumClient) Close() { diff --git a/blockchain/serviceMetadata.go b/blockchain/serviceMetadata.go index c0b36886..8463cca9 100644 --- a/blockchain/serviceMetadata.go +++ b/blockchain/serviceMetadata.go @@ -278,7 +278,7 @@ func ReadServiceMetaDataFromLocalFile(filename string) (*ServiceMetadata, error) } func getRegistryCaller() (reg *RegistryCaller) { - ethHttpClient, _, err := GetEthereumClient() + ethHttpClient, err := CreateHTTPEthereumClient() if err != nil { zap.L().Panic("Unable to get Blockchain client ", zap.Error(err)) } @@ -300,6 +300,15 @@ func GetRegistryCaller(ethHttpClient *ethclient.Client) *RegistryCaller { return reg } +func GetRegistryFilterer(ethWsClient *ethclient.Client) *RegistryFilterer { + registryContractAddress := getRegistryAddressKey() + reg, err := NewRegistryFilterer(registryContractAddress, ethWsClient) + if err != nil { + zap.L().Panic("Error instantiating Registry contract for the given Contract Address", zap.Error(err), zap.Any("registryContractAddress", registryContractAddress)) + } + return reg +} + func getServiceMetaDataUrifromRegistry() []byte { reg := getRegistryCaller() diff --git a/blockchain/utils.go b/blockchain/utils.go index 0600c503..d60314c7 100644 --- a/blockchain/utils.go +++ b/blockchain/utils.go @@ -2,7 +2,6 @@ package blockchain import ( "encoding/base64" - "encoding/hex" "fmt" "regexp" "strings" @@ -84,18 +83,20 @@ func ToChecksumAddress(hexAddress string) string { return mixedAddress.Address().String() } -func StringToHex(str string) string { - hexStr := hex.EncodeToString([]byte(str)) - - // Pad the result to 32 bytes (64 hex characters) - paddedHexStr := hexStr + "0000000000000000000000000000000000000000000000000000000000000000"[len(hexStr):] - - // Add the 0x prefix to indicate that it's a hexadecimal value - result := "0x" + paddedHexStr - return result -} - -func StringToHash(str string) common.Hash { - hexStr := StringToHex(str) - return common.HexToHash(hexStr) +/* +MakeTopicFilterer is used to generate a filter for querying Ethereum logs or contract events. +Ethereum topics (such as for events) are 32-byte fixed-size values (common for hashing +in Ethereum logs). This function takes a string parameter, converts it into a 32-byte array, +and returns it in a slice. This allows developers to create filters when looking for +specific events or log entries based on the topic. +*/ +func MakeTopicFilterer(param string) [][32]byte { + // Create a 32-byte array + var param32Byte [32]byte + + // Convert the string to a byte slice and copy up to 32 bytes + copy(param32Byte[:], []byte(param)[:min(len(param), 32)]) + + // Return the filter with a single element (the 32-byte array) + return [][32]byte{param32Byte} } diff --git a/config/config.go b/config/config.go index c88942dc..4fb87ac5 100644 --- a/config/config.go +++ b/config/config.go @@ -13,10 +13,9 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "go.uber.org/zap" - "github.com/spf13/cast" "github.com/spf13/viper" + "go.uber.org/zap" ) const ( @@ -113,7 +112,8 @@ const ( }, "payment_channel_storage_client": { "connection_timeout": "0s", - "request_timeout": "0s" + "request_timeout": "0s", + "hot_reload": true }, "payment_channel_storage_server": { "id": "storage-1", diff --git a/contract_event_listener/contract_event_listener.go b/contract_event_listener/contract_event_listener.go index 707b8cc6..cd6323e9 100644 --- a/contract_event_listener/contract_event_listener.go +++ b/contract_event_listener/contract_event_listener.go @@ -1,4 +1,4 @@ -package contract_event_listener +package contractlistener import ( "github.com/singnet/snet-daemon/blockchain" @@ -7,14 +7,8 @@ import ( type EventSignature string -const ( - contractAddress = "0x4DCc70c6FCE4064803f0ae0cE48497B3f7182e5D" - UpdateMetadataUriEventSignature EventSignature = "0x06ccb920be65231f5c9d04dd4883d3c7648ebe5f5317cc7177ee4f4a7cc2d038" -) - type ContractEventListener struct { BlockchainProcessor *blockchain.Processor - EventSignature EventSignature CurrentOrganizationMetaData *blockchain.OrganizationMetaData CurrentEtcdClient *etcddb.EtcdClient } diff --git a/contract_event_listener/listen_organization_metadata_changing.go b/contract_event_listener/listen_organization_metadata_changing.go index 2d619ccb..23775fc0 100644 --- a/contract_event_listener/listen_organization_metadata_changing.go +++ b/contract_event_listener/listen_organization_metadata_changing.go @@ -1,4 +1,4 @@ -package contract_event_listener +package contractlistener import ( "context" @@ -7,29 +7,27 @@ import ( "github.com/singnet/snet-daemon/etcddb" "github.com/singnet/snet-daemon/utils" - "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/gorilla/websocket" "go.uber.org/zap" ) func (l *ContractEventListener) ListenOrganizationMetadataChanging() { - query := ethereum.FilterQuery{ - Addresses: []common.Address{ - common.HexToAddress(contractAddress), - }, - Topics: [][]common.Hash{ - { - common.HexToHash(string(UpdateMetadataUriEventSignature)), - blockchain.StringToHash(l.CurrentOrganizationMetaData.OrgID), - }, - }, + zap.L().Info("Starting contract event listener for organization metedata changing") + + watchOpts := &bind.WatchOpts{ + Start: nil, + Context: context.Background(), } ethWSClient := l.BlockchainProcessor.GetEthWSClient() - logs := make(chan types.Log) - sub, err := ethWSClient.SubscribeFilterLogs(context.Background(), query, logs) + registryFilterer := blockchain.GetRegistryFilterer(ethWSClient) + orgIdFilter := blockchain.MakeTopicFilterer(l.CurrentOrganizationMetaData.OrgID) + + eventContractChannel := make(chan *blockchain.RegistryOrganizationModified) + sub, err := registryFilterer.WatchOrganizationModified(watchOpts, eventContractChannel, orgIdFilter) + if err != nil { zap.L().Fatal("Failed to subscribe to logs", zap.Error(err)) } @@ -38,7 +36,21 @@ func (l *ContractEventListener) ListenOrganizationMetadataChanging() { select { case err := <-sub.Err(): zap.L().Error("Subscription error: ", zap.Error(err)) - case logData := <-logs: + if websocket.IsCloseError( + err, + websocket.CloseNormalClosure, + websocket.CloseAbnormalClosure, + websocket.CloseGoingAway, + websocket.CloseServiceRestart, + websocket.CloseTryAgainLater, + websocket.CloseTLSHandshake, + ) { + err = l.BlockchainProcessor.ReconnectToWsClient() + if err != nil { + zap.L().Error("Error in reconnecting to websockets", zap.Error(err)) + } + } + case logData := <-eventContractChannel: zap.L().Debug("Log received", zap.Any("value", logData)) // Get metaDataUri from smart contract and organizationMetaData from IPFS @@ -46,7 +58,6 @@ func (l *ContractEventListener) ListenOrganizationMetadataChanging() { zap.L().Info("Get new organization metadata", zap.Any("value", newOrganizationMetaData)) if !utils.CompareSlices(l.CurrentOrganizationMetaData.GetPaymentStorageEndPoints(), newOrganizationMetaData.GetPaymentStorageEndPoints()) { - // mutex l.CurrentEtcdClient.Close() newEtcdbClient, err := etcddb.Reconnect(newOrganizationMetaData) if err != nil { diff --git a/etcddb/etcddb_client.go b/etcddb/etcddb_client.go index 22891a03..ff3650b6 100644 --- a/etcddb/etcddb_client.go +++ b/etcddb/etcddb_client.go @@ -35,11 +35,12 @@ func (mutex *EtcdClientMutex) Unlock(ctx context.Context) (err error) { return mutex.mutex.Unlock(ctx) } -// EtcdClient struct has some useful methods to wolrk with etcd client +// EtcdClient struct has some useful methods to work with etcd client type EtcdClient struct { - timeout time.Duration - session *concurrency.Session - etcdv3 *clientv3.Client + hotReaload bool + timeout time.Duration + session *concurrency.Session + etcdv3 *clientv3.Client } // NewEtcdClient create new etcd storage client. @@ -93,9 +94,10 @@ func NewEtcdClientFromVip(vip *viper.Viper, metaData *blockchain.OrganizationMet } client = &EtcdClient{ - timeout: conf.RequestTimeout, - session: session, - etcdv3: etcdv3, + hotReaload: conf.HotReload, + timeout: conf.RequestTimeout, + session: session, + etcdv3: etcdv3, } return } @@ -478,6 +480,10 @@ func (client *EtcdClient) StartTransaction(keys []string) (_transaction storage. return transaction, nil } +func (client *EtcdClient) IsHotReloadEnabled() bool { + return client.hotReaload +} + type keyValueVersion struct { Key string Value string diff --git a/etcddb/etcddb_conf.go b/etcddb/etcddb_conf.go index 1414385a..7dc5405e 100644 --- a/etcddb/etcddb_conf.go +++ b/etcddb/etcddb_conf.go @@ -7,6 +7,7 @@ import ( "github.com/singnet/snet-daemon/blockchain" "github.com/singnet/snet-daemon/config" "github.com/spf13/viper" + "go.uber.org/zap" ) // EtcdClientConf config @@ -16,6 +17,7 @@ import ( type EtcdClientConf struct { ConnectionTimeout time.Duration `json:"connection_timeout" mapstructure:"connection_timeout"` RequestTimeout time.Duration `json:"request_timeout" mapstructure:"request_timeout"` + HotReload bool `json:"hot_reload" mapstructure:"hot_reload"` Endpoints []string } @@ -28,6 +30,7 @@ func GetEtcdClientConf(vip *viper.Viper, metaData *blockchain.OrganizationMetaDa conf = &EtcdClientConf{ ConnectionTimeout: metaData.GetConnectionTimeOut(), RequestTimeout: metaData.GetRequestTimeOut(), + HotReload: true, Endpoints: metaData.GetPaymentStorageEndPoints(), } @@ -47,6 +50,8 @@ func GetEtcdClientConf(vip *viper.Viper, metaData *blockchain.OrganizationMetaDa conf.ConnectionTimeout = confFromVip.ConnectionTimeout } + conf.HotReload = confFromVip.HotReload + zap.L().Info("Etcd client hot reload", zap.Bool("enable", conf.HotReload)) return } diff --git a/resources/blockchain_network_config.json b/resources/blockchain_network_config.json index 59c9dd5e..6f93c13b 100644 --- a/resources/blockchain_network_config.json +++ b/resources/blockchain_network_config.json @@ -15,7 +15,7 @@ "network_id": "5" }, "sepolia": { - "ethereum_json_rpc_http_endpoint": "https://sepolia.infura.io/v3/09027f4a13e841d48dbfefc67e7685d5", + "ethereum_json_rpc_http_endpoint": "https://sepolia.infura.io/v3/98064002908248a0b0d837940d2c647b", "ethereum_json_rpc_ws_endpoint": "wss://sepolia.infura.io/ws/v3/98064002908248a0b0d837940d2c647b", "network_id": "11155111" } diff --git a/snetd/cmd/serve.go b/snetd/cmd/serve.go index 59da76b7..05740b49 100644 --- a/snetd/cmd/serve.go +++ b/snetd/cmd/serve.go @@ -10,20 +10,21 @@ import ( "strings" "syscall" - "github.com/gorilla/handlers" - "github.com/improbable-eng/grpc-web/go/grpcweb" - "github.com/pkg/errors" - "github.com/rs/cors" "github.com/singnet/snet-daemon/blockchain" "github.com/singnet/snet-daemon/config" "github.com/singnet/snet-daemon/configuration_service" - "github.com/singnet/snet-daemon/contract_event_listener" + contractListener "github.com/singnet/snet-daemon/contract_event_listener" "github.com/singnet/snet-daemon/escrow" "github.com/singnet/snet-daemon/handler" "github.com/singnet/snet-daemon/handler/httphandler" "github.com/singnet/snet-daemon/logger" "github.com/singnet/snet-daemon/metrics" "github.com/singnet/snet-daemon/training" + + "github.com/gorilla/handlers" + "github.com/improbable-eng/grpc-web/go/grpcweb" + "github.com/pkg/errors" + "github.com/rs/cors" "github.com/soheilhy/cmux" "github.com/spf13/cobra" "go.uber.org/zap" @@ -61,17 +62,21 @@ var ServeCmd = &cobra.Command{ zap.L().Fatal("Unable to initialize daemon", zap.Error(err)) } - contractEventLister := contract_event_listener.ContractEventListener{ - BlockchainProcessor: &d.blockProc, - EventSignature: contract_event_listener.UpdateMetadataUriEventSignature, - CurrentOrganizationMetaData: components.OrganizationMetaData(), - CurrentEtcdClient: components.EtcdClient(), - } - go contractEventLister.ListenOrganizationMetadataChanging() - d.start() defer d.stop() + // Check if the payment storage client is etcd by verifying if d.components.etcdClient exists. + // If etcdClient is not nil and hot reload is enabled, initialize a ContractEventListener + // to listen for changes in the organization metadata. + if d.components.etcdClient != nil && d.components.etcdClient.IsHotReloadEnabled() { + contractEventLister := contractListener.ContractEventListener{ + BlockchainProcessor: &d.blockProc, + CurrentOrganizationMetaData: components.OrganizationMetaData(), + CurrentEtcdClient: components.EtcdClient(), + } + go contractEventLister.ListenOrganizationMetadataChanging() + } + sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT) <-sigChan From ed508f8c07f7917ee9a0e242efc66b8affa19489 Mon Sep 17 00:00:00 2001 From: pls-github-dont-suspend-me Date: Tue, 10 Sep 2024 16:49:18 +0300 Subject: [PATCH 4/7] Delete unused ConvertHTTPToWS --- utils/common.go | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/utils/common.go b/utils/common.go index b68a4b9f..2f0cc2d6 100644 --- a/utils/common.go +++ b/utils/common.go @@ -3,7 +3,6 @@ package utils import ( "bytes" "encoding/gob" - "errors" "strings" "github.com/ethereum/go-ethereum/common" @@ -67,13 +66,3 @@ func CompareSlices[T comparable](s1, s2 []T) bool { } return true } - -func ConvertHTTPToWS(url string) (string, error) { - if strings.HasPrefix(url, "https://") { - return strings.Replace(url, "https://", "wss://", 1), nil - } else if strings.HasPrefix(url, "http://") { - return strings.Replace(url, "http://", "ws://", 1), nil - } else { - return "", errors.New("invalid URL scheme. URL must start with 'http://' or 'https://'") - } -} From ea362edc8580a92abd907366799bbf7eb86032c5 Mon Sep 17 00:00:00 2001 From: pls-github-dont-suspend-me Date: Wed, 11 Sep 2024 15:41:06 +0300 Subject: [PATCH 5/7] Delete unused GetRegistryCaller --- blockchain/serviceMetadata.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/blockchain/serviceMetadata.go b/blockchain/serviceMetadata.go index 8463cca9..9c408b14 100644 --- a/blockchain/serviceMetadata.go +++ b/blockchain/serviceMetadata.go @@ -291,15 +291,6 @@ func getRegistryCaller() (reg *RegistryCaller) { return reg } -func GetRegistryCaller(ethHttpClient *ethclient.Client) *RegistryCaller { - registryContractAddress := getRegistryAddressKey() - reg, err := NewRegistryCaller(registryContractAddress, ethHttpClient) - if err != nil { - zap.L().Panic("Error instantiating Registry contract for the given Contract Address", zap.Error(err), zap.Any("registryContractAddress", registryContractAddress)) - } - return reg -} - func GetRegistryFilterer(ethWsClient *ethclient.Client) *RegistryFilterer { registryContractAddress := getRegistryAddressKey() reg, err := NewRegistryFilterer(registryContractAddress, ethWsClient) From 5b3d65c65cc928cedbf3fb0a59db567d5f8a20a8 Mon Sep 17 00:00:00 2001 From: pls-github-dont-suspend-me Date: Wed, 11 Sep 2024 16:31:10 +0300 Subject: [PATCH 6/7] Fix log, add using slices.Compare --- .../listen_organization_metadata_changing.go | 32 ++++++++++--------- etcddb/etcddb_client.go | 2 +- utils/common.go | 12 ------- 3 files changed, 18 insertions(+), 28 deletions(-) diff --git a/contract_event_listener/listen_organization_metadata_changing.go b/contract_event_listener/listen_organization_metadata_changing.go index 23775fc0..2aa8188f 100644 --- a/contract_event_listener/listen_organization_metadata_changing.go +++ b/contract_event_listener/listen_organization_metadata_changing.go @@ -2,10 +2,10 @@ package contractlistener import ( "context" + "slices" "github.com/singnet/snet-daemon/blockchain" "github.com/singnet/snet-daemon/etcddb" - "github.com/singnet/snet-daemon/utils" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/gorilla/websocket" @@ -35,19 +35,21 @@ func (l *ContractEventListener) ListenOrganizationMetadataChanging() { for { select { case err := <-sub.Err(): - zap.L().Error("Subscription error: ", zap.Error(err)) - if websocket.IsCloseError( - err, - websocket.CloseNormalClosure, - websocket.CloseAbnormalClosure, - websocket.CloseGoingAway, - websocket.CloseServiceRestart, - websocket.CloseTryAgainLater, - websocket.CloseTLSHandshake, - ) { - err = l.BlockchainProcessor.ReconnectToWsClient() - if err != nil { - zap.L().Error("Error in reconnecting to websockets", zap.Error(err)) + if err != nil { + zap.L().Error("Subscription error: ", zap.Error(err)) + if websocket.IsCloseError( + err, + websocket.CloseNormalClosure, + websocket.CloseAbnormalClosure, + websocket.CloseGoingAway, + websocket.CloseServiceRestart, + websocket.CloseTryAgainLater, + websocket.CloseTLSHandshake, + ) { + err = l.BlockchainProcessor.ReconnectToWsClient() + if err != nil { + zap.L().Error("Error in reconnecting to websockets", zap.Error(err)) + } } } case logData := <-eventContractChannel: @@ -57,7 +59,7 @@ func (l *ContractEventListener) ListenOrganizationMetadataChanging() { newOrganizationMetaData := blockchain.GetOrganizationMetaData() zap.L().Info("Get new organization metadata", zap.Any("value", newOrganizationMetaData)) - if !utils.CompareSlices(l.CurrentOrganizationMetaData.GetPaymentStorageEndPoints(), newOrganizationMetaData.GetPaymentStorageEndPoints()) { + if slices.Compare(l.CurrentOrganizationMetaData.GetPaymentStorageEndPoints(), newOrganizationMetaData.GetPaymentStorageEndPoints()) > 0 { l.CurrentEtcdClient.Close() newEtcdbClient, err := etcddb.Reconnect(newOrganizationMetaData) if err != nil { diff --git a/etcddb/etcddb_client.go b/etcddb/etcddb_client.go index ff3650b6..d23009aa 100644 --- a/etcddb/etcddb_client.go +++ b/etcddb/etcddb_client.go @@ -107,7 +107,7 @@ func Reconnect(metadata *blockchain.OrganizationMetaData) (*EtcdClient, error) { if err != nil { return nil, err } - zap.L().Info("Successful reconnet to new etcd endpoints", zap.Strings("New enpoints", metadata.GetPaymentStorageEndPoints())) + zap.L().Info("Successful reconnet to new etcd endpoints", zap.Strings("New endpoints", metadata.GetPaymentStorageEndPoints())) return etcdClient, nil } diff --git a/utils/common.go b/utils/common.go index 2f0cc2d6..a62c6e7d 100644 --- a/utils/common.go +++ b/utils/common.go @@ -54,15 +54,3 @@ func CheckIfHttps(endpoints []string) bool { } return false } - -func CompareSlices[T comparable](s1, s2 []T) bool { - if len(s1) != len(s2) { - return false - } - for i := range s1 { - if s1[i] != s2[i] { - return false - } - } - return true -} From b449b6570193dfaf0ed4e263f3f6c3980628b684 Mon Sep 17 00:00:00 2001 From: pls-github-dont-suspend-me Date: Wed, 11 Sep 2024 16:35:36 +0300 Subject: [PATCH 7/7] Fix compare slices --- .../listen_organization_metadata_changing.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contract_event_listener/listen_organization_metadata_changing.go b/contract_event_listener/listen_organization_metadata_changing.go index 2aa8188f..cf253459 100644 --- a/contract_event_listener/listen_organization_metadata_changing.go +++ b/contract_event_listener/listen_organization_metadata_changing.go @@ -59,7 +59,7 @@ func (l *ContractEventListener) ListenOrganizationMetadataChanging() { newOrganizationMetaData := blockchain.GetOrganizationMetaData() zap.L().Info("Get new organization metadata", zap.Any("value", newOrganizationMetaData)) - if slices.Compare(l.CurrentOrganizationMetaData.GetPaymentStorageEndPoints(), newOrganizationMetaData.GetPaymentStorageEndPoints()) > 0 { + if slices.Compare(l.CurrentOrganizationMetaData.GetPaymentStorageEndPoints(), newOrganizationMetaData.GetPaymentStorageEndPoints()) != 0 { l.CurrentEtcdClient.Close() newEtcdbClient, err := etcddb.Reconnect(newOrganizationMetaData) if err != nil {