Skip to content

Commit

Permalink
Merge branch 'dev' of https://github.com/singnet/snet-daemon into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
semyon-dev committed Sep 11, 2024
2 parents 5e091f0 + 543fedd commit e650c12
Show file tree
Hide file tree
Showing 19 changed files with 367 additions and 75 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,6 @@ data.etcd/
*.log


# vscode
.vscode

6 changes: 3 additions & 3 deletions authutils/auth_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,12 @@ func CheckIfTokenHasExpired(expiredBlock *big.Int) error {

// CurrentBlock Get the current block number from on chain
func CurrentBlock() (*big.Int, error) {
if ethClient, err := blockchain.GetEthereumClient(); err != nil {
if ethHttpClient, _, err := blockchain.CreateEthereumClients(); err != nil {
return nil, err
} else {
defer ethClient.RawClient.Close()
defer ethHttpClient.RawClient.Close()
var currentBlockHex string
if err = ethClient.RawClient.CallContext(context.Background(), &currentBlockHex, "eth_blockNumber"); err != nil {
if err = ethHttpClient.RawClient.CallContext(context.Background(), &currentBlockHex, "eth_blockNumber"); err != nil {
zap.L().Error("error determining current block", zap.Error(err))
return nil, fmt.Errorf("error determining current block: %v", err)
}
Expand Down
49 changes: 40 additions & 9 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ type jobInfo struct {

type Processor struct {
enabled bool
ethClient *ethclient.Client
rawClient *rpc.Client
ethHttpClient *ethclient.Client
rawHttpClient *rpc.Client
ethWSClient *ethclient.Client
rawWSClient *rpc.Client
sigHasher func([]byte) []byte
privateKey *ecdsa.PrivateKey
address string
Expand All @@ -55,11 +57,13 @@ func NewProcessor(metadata *ServiceMetadata) (Processor, error) {

// Setup ethereum client

if ethclients, err := GetEthereumClient(); err != nil {
if ethHttpClients, ethWSClients, err := CreateEthereumClients(); err != nil {
return p, errors.Wrap(err, "error creating RPC client")
} else {
p.rawClient = ethclients.RawClient
p.ethClient = ethclients.EthClient
p.rawHttpClient = ethHttpClients.RawClient
p.ethHttpClient = ethHttpClients.EthClient
p.rawWSClient = ethWSClients.RawClient
p.ethWSClient = ethWSClients.EthClient
}

// TODO: if address is not in config, try to load it using network
Expand All @@ -68,7 +72,7 @@ func NewProcessor(metadata *ServiceMetadata) (Processor, error) {

p.escrowContractAddress = metadata.GetMpeAddress()

if mpe, err := NewMultiPartyEscrow(p.escrowContractAddress, p.ethClient); err != nil {
if mpe, err := NewMultiPartyEscrow(p.escrowContractAddress, p.ethHttpClient); err != nil {
return p, errors.Wrap(err, "error instantiating MultiPartyEscrow contract")
} else {
p.multiPartyEscrow = mpe
Expand All @@ -82,6 +86,23 @@ func NewProcessor(metadata *ServiceMetadata) (Processor, error) {
return p, nil
}

func (processor *Processor) ReconnectToWsClient() error {
processor.ethWSClient.Close()
processor.rawHttpClient.Close()

zap.L().Debug("Try to reconnect to websocket client")

newEthWSClients, err := CreateWSEthereumClient()
if err != nil {
return err
}

processor.ethWSClient = newEthWSClients.EthClient
processor.rawWSClient = newEthWSClients.RawClient

return nil
}

func (processor *Processor) Enabled() (enabled bool) {
return processor.enabled
}
Expand All @@ -94,11 +115,19 @@ func (processor *Processor) MultiPartyEscrow() *MultiPartyEscrow {
return processor.multiPartyEscrow
}

func (processor *Processor) GetEthHttpClient() *ethclient.Client {
return processor.ethHttpClient
}

func (processor *Processor) GetEthWSClient() *ethclient.Client {
return processor.ethWSClient
}

func (processor *Processor) CurrentBlock() (currentBlock *big.Int, err error) {
// We have to do a raw call because the standard method of ethClient.HeaderByNumber(ctx, nil) errors on
// unmarshaling the response currently. See https://github.com/ethereum/go-ethereum/issues/3230
var currentBlockHex string
if err = processor.rawClient.CallContext(context.Background(), &currentBlockHex, "eth_blockNumber"); err != nil {
if err = processor.rawHttpClient.CallContext(context.Background(), &currentBlockHex, "eth_blockNumber"); err != nil {
zap.L().Error("error determining current block", zap.Error(err))
return nil, fmt.Errorf("error determining current block: %v", err)
}
Expand All @@ -114,6 +143,8 @@ func (processor *Processor) HasIdentity() bool {
}

func (processor *Processor) Close() {
processor.ethClient.Close()
processor.rawClient.Close()
processor.ethHttpClient.Close()
processor.rawHttpClient.Close()
processor.ethWSClient.Close()
processor.rawWSClient.Close()
}
50 changes: 40 additions & 10 deletions blockchain/ethereumClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package blockchain
import (
"context"
"encoding/base64"

"github.com/singnet/snet-daemon/config"

"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/pkg/errors"
"github.com/singnet/snet-daemon/config"
)

type EthereumClient struct {
Expand All @@ -19,21 +21,49 @@ func basicAuth(username, password string) string {
return base64.StdEncoding.EncodeToString([]byte(auth))
}

func GetEthereumClient() (*EthereumClient, error) {
func CreateEthereumClients() (*EthereumClient, *EthereumClient, error) {
ethereumHttpClient, err := CreateHTTPEthereumClient()
if err != nil {
return nil, nil, err
}

ethereumWsClient, err := CreateWSEthereumClient()
if err != nil {
return nil, nil, err
}

return ethereumHttpClient, ethereumWsClient, nil
}

ethereumClient := new(EthereumClient)
if client, err := rpc.DialOptions(context.Background(),
config.GetBlockChainEndPoint(),
rpc.WithHeader("Authorization", "Basic "+basicAuth("", config.GetString(config.BlockchainProviderApiKey)))); err != nil {
func CreateHTTPEthereumClient() (*EthereumClient, error) {
ethereumHttpClient := new(EthereumClient)
httpClient, err := rpc.DialOptions(
context.Background(),
config.GetBlockChainHTTPEndPoint(),
rpc.WithHeader("Authorization", "Basic "+basicAuth("", config.GetString(config.BlockchainProviderApiKey))))
if err != nil {
return nil, errors.Wrap(err, "error creating RPC client")
} else {
ethereumClient.RawClient = client
ethereumClient.EthClient = ethclient.NewClient(client)
}

return ethereumClient, nil
ethereumHttpClient.RawClient = httpClient
ethereumHttpClient.EthClient = ethclient.NewClient(httpClient)
return ethereumHttpClient, nil
}

func CreateWSEthereumClient() (*EthereumClient, error) {
ethereumWsClient := new(EthereumClient)
wsClient, err := rpc.DialOptions(
context.Background(),
config.GetBlockChainWSEndPoint(),
rpc.WithHeader("Authorization", "Basic "+basicAuth("", config.GetString(config.BlockchainProviderApiKey))))
if err != nil {
return nil, errors.Wrap(err, "error creating RPC WebSocket client")
}
ethereumWsClient.RawClient = wsClient
ethereumWsClient.EthClient = ethclient.NewClient(wsClient)
return ethereumWsClient, nil
}

func (ethereumClient *EthereumClient) Close() {
if ethereumClient != nil {
ethereumClient.EthClient.Close()
Expand Down
9 changes: 5 additions & 4 deletions blockchain/orginzationMetadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,11 @@ func checkMandatoryFields(metaData *OrganizationMetaData) (err error) {
if metaData.daemonGroup.PaymentDetails.PaymentChannelStorageClient.Endpoints == nil {
err = fmt.Errorf("Mandatory field : ETCD Client Endpoints are mising for the Group %v ", metaData.daemonGroup.GroupName)
}
if &metaData.recipientPaymentAddress == nil {
err = fmt.Errorf("Mandatory field : Recipient Address is missing for the Group %v ", metaData.daemonGroup.GroupName)

if metaData.recipientPaymentAddress == (common.Address{}) {
err = fmt.Errorf("Mandatory field : Recepient Address is missing for the Group %v ", metaData.daemonGroup.GroupName)
}
return
return err
}

func setDerivedAttributes(metaData *OrganizationMetaData) (err error) {
Expand Down Expand Up @@ -187,7 +188,7 @@ func getMetaDataURI() []byte {

organizationRegistered, err := reg.GetOrganizationById(nil, orgId)
if err != nil || !organizationRegistered.Found {
zap.L().Panic("Error Retrieving contract details for the Given Organization", zap.String("OrganizationId", config.GetString(config.OrganizationId)))
zap.L().Panic("Error Retrieving contract details for the Given Organization", zap.String("OrganizationId", config.GetString(config.OrganizationId)), zap.Error(err))
}
return organizationRegistered.OrgMetadataURI[:]
}
Expand Down
16 changes: 13 additions & 3 deletions blockchain/serviceMetadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/bufbuild/protocompile"
pproto "github.com/emicklei/proto"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/pkg/errors"
"github.com/singnet/snet-daemon/config"
"github.com/singnet/snet-daemon/ipfsutils"
Expand Down Expand Up @@ -277,13 +278,22 @@ func ReadServiceMetaDataFromLocalFile(filename string) (*ServiceMetadata, error)
}

func getRegistryCaller() (reg *RegistryCaller) {
ethClient, err := GetEthereumClient()
ethHttpClient, err := CreateHTTPEthereumClient()
if err != nil {
zap.L().Panic("Unable to get Blockchain client ", zap.Error(err))
}
defer ethClient.Close()
defer ethHttpClient.Close()
registryContractAddress := getRegistryAddressKey()
reg, err = NewRegistryCaller(registryContractAddress, ethClient.EthClient)
reg, err = NewRegistryCaller(registryContractAddress, ethHttpClient.EthClient)
if err != nil {
zap.L().Panic("Error instantiating Registry contract for the given Contract Address", zap.Error(err), zap.Any("registryContractAddress", registryContractAddress))
}
return reg
}

func GetRegistryFilterer(ethWsClient *ethclient.Client) *RegistryFilterer {
registryContractAddress := getRegistryAddressKey()
reg, err := NewRegistryFilterer(registryContractAddress, ethWsClient)
if err != nil {
zap.L().Panic("Error instantiating Registry contract for the given Contract Address", zap.Error(err), zap.Any("registryContractAddress", registryContractAddress))
}
Expand Down
18 changes: 18 additions & 0 deletions blockchain/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,21 @@ func ToChecksumAddress(hexAddress string) string {
mixedAddress := common.NewMixedcaseAddress(address)
return mixedAddress.Address().String()
}

/*
MakeTopicFilterer is used to generate a filter for querying Ethereum logs or contract events.
Ethereum topics (such as for events) are 32-byte fixed-size values (common for hashing
in Ethereum logs). This function takes a string parameter, converts it into a 32-byte array,
and returns it in a slice. This allows developers to create filters when looking for
specific events or log entries based on the topic.
*/
func MakeTopicFilterer(param string) [][32]byte {
// Create a 32-byte array
var param32Byte [32]byte

// Convert the string to a byte slice and copy up to 32 bytes
copy(param32Byte[:], []byte(param)[:min(len(param), 32)])

// Return the filter with a single element (the 32-byte array)
return [][32]byte{param32Byte}
}
39 changes: 24 additions & 15 deletions config/blockchain_network_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,19 @@ import (
)

type NetworkSelected struct {
NetworkName string
EthereumJSONRPCEndpoint string
NetworkId string
RegistryAddressKey string
NetworkName string
EthereumJSONRPCHTTPEndpoint string
EthereumJSONRPCWSEndpoint string
NetworkId string
RegistryAddressKey string
}

const (
BlockChainNetworkFileName = "resources/blockchain_network_config.json"
EthereumJsonRpcEndpointKey = "ethereum_json_rpc_endpoint"
NetworkId = "network_id"
RegistryAddressKey = "registry_address_key"
BlockChainNetworkFileName = "resources/blockchain_network_config.json"
EthereumJsonRpcHTTPEndpointKey = "ethereum_json_rpc_http_endpoint"
EthereumJsonRpcWSEndpointKey = "ethereum_json_rpc_ws_endpoint"
NetworkId = "network_id"
RegistryAddressKey = "registry_address_key"
)

var networkSelected = &NetworkSelected{}
Expand All @@ -38,7 +40,8 @@ func determineNetworkSelected(data []byte) (err error) {
//Ethereum End point and Network ID mapped to
networkSelected.NetworkName = networkName
networkSelected.RegistryAddressKey = getDetailsFromJsonOrConfig(dynamicBinding[networkName].(map[string]any)[RegistryAddressKey], RegistryAddressKey)
networkSelected.EthereumJSONRPCEndpoint = getDetailsFromJsonOrConfig(dynamicBinding[networkName].(map[string]any)[EthereumJsonRpcEndpointKey], EthereumJsonRpcEndpointKey)
networkSelected.EthereumJSONRPCHTTPEndpoint = getDetailsFromJsonOrConfig(dynamicBinding[networkName].(map[string]any)[EthereumJsonRpcHTTPEndpointKey], EthereumJsonRpcHTTPEndpointKey)
networkSelected.EthereumJSONRPCWSEndpoint = getDetailsFromJsonOrConfig(dynamicBinding[networkName].(map[string]any)[EthereumJsonRpcWSEndpointKey], EthereumJsonRpcWSEndpointKey)
networkSelected.NetworkId = fmt.Sprintf("%v", dynamicBinding[networkName].(map[string]any)[NetworkId])

return err
Expand All @@ -61,9 +64,13 @@ func GetNetworkId() string {
return networkSelected.NetworkId
}

// Get the block chain end point associated with the Network selected
func GetBlockChainEndPoint() string {
return networkSelected.EthereumJSONRPCEndpoint
// Get the blockchain endpoint associated with the Network selected
func GetBlockChainHTTPEndPoint() string {
return networkSelected.EthereumJSONRPCHTTPEndpoint
}

func GetBlockChainWSEndPoint() string {
return networkSelected.EthereumJSONRPCWSEndpoint
}

// Get the Registry address of the contract
Expand Down Expand Up @@ -109,9 +116,11 @@ func deriveDatafromJSON(data []byte) (err error) {
networkSelected.RegistryAddressKey = fmt.Sprintf("%v", m[GetNetworkId()].(map[string]any)["address"])

zap.L().Info("Derive data from JSON", zap.String("Network", GetString(BlockChainNetworkSelected)),
zap.String("NetwrokId", GetNetworkId()),
zap.String("RegistryAddress", GetRegistryAddress()),
zap.String("Blockchain endpoint", GetBlockChainEndPoint()))
zap.String("Netwrok id", GetNetworkId()),
zap.String("Registry address", GetRegistryAddress()),
zap.String("Blockchain http endpoint", GetBlockChainHTTPEndPoint()),
zap.String("Blockchain ws endpoint", GetBlockChainWSEndPoint()),
)
return nil
}

Expand Down
24 changes: 14 additions & 10 deletions config/blockchain_network_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,35 @@ func TestGetNetworkId(t *testing.T) {
var defaultBlockChainNetworkConfig = `
{
"local": {
"ethereum_json_rpc_endpoint": "http://localhost:8545",
"network_id": "42",
"ethereum_json_rpc_http_endpoint": "http://localhost:8545",
"ethereum_json_rpc_ws_endpoint": "ws://localhost:443",
"network_id": "42",
"registry_address_key": "0x4e74fefa82e83e0964f0d9f53c68e03f7298a8b2"
},
"main": {
"ethereum_json_rpc_endpoint": "https://mainnet.infura.io/v3",
"ethereum_json_rpc_http_endpoint": "https://mainnet.infura.io/v3",
"ethereum_json_rpc_ws_endpoint": "wss://mainnet.infura.io/v3",
"network_id": "1"
},
"goerli": {
"ethereum_json_rpc_endpoint": "https://goerli.infura.io/v3",
"ethereum_json_rpc_http_endpoint": "https://goerli.infura.io/v3",
"ethereum_json_rpc_ws_endpoint": "wss://goerli.infura.io/v3",
"network_id": "5"
},
"sepolia": {
"ethereum_json_rpc_endpoint": "https://sepolia.infura.io/v3",
"ethereum_json_rpc_http_endpoint": "https://sepolia.infura.io/v3",
"ethereum_json_rpc_ws_endpoint": "wss://sepolia.infura.io/v3",
"network_id": "11155111"
}
}`

func TestGetBlockChainEndPoint(t *testing.T) {
Vip().Set(BlockChainNetworkSelected, "local")
determineNetworkSelected([]byte(defaultBlockChainNetworkConfig))

assert.Matches(t, GetBlockChainEndPoint(), GetString(BlockChainNetworkSelected))
err := determineNetworkSelected([]byte(defaultBlockChainNetworkConfig))
assert.Equal(t, err, nil)
assert.Matches(t, GetBlockChainHTTPEndPoint(), GetString(BlockChainNetworkSelected))
assert.Matches(t, GetBlockChainWSEndPoint(), GetString(BlockChainNetworkSelected))
assert2.NotEqual(t, GetNetworkId(), nil)

}

func TestGetRegistryAddress(t *testing.T) {
Expand Down Expand Up @@ -92,7 +96,7 @@ func Test_GetDetailsFromJsonOrConfig(t *testing.T) {
want string
network string
}{
{EthereumJsonRpcEndpointKey, "https://sepolia.infura.io/v3", "sepolia"},
{EthereumJsonRpcHTTPEndpointKey, "https://sepolia.infura.io/v3", "sepolia"},
{RegistryAddressKey, "0x4e74fefa82e83e0964f0d9f53c68e03f7298a8b2", "local"},
}
for _, tt := range tests {
Expand Down
Loading

0 comments on commit e650c12

Please sign in to comment.