From a58e372e29af28a51176b828797697847b1ac5e9 Mon Sep 17 00:00:00 2001 From: Stoyan Panayotov Date: Thu, 26 Aug 2021 14:45:25 +0300 Subject: [PATCH 1/2] Integration with Hedera Hashgraph Signed-off-by: Stoyan Panayotov --- .gitignore | 2 + blockchain/common.go | 14 +- blockchain/hedera.go | 441 ++++++++++++++++++ blockchain/hedera_test.go | 243 ++++++++++ client/service.go | 3 +- store/database.go | 14 +- store/migrations/migrate.go | 6 + .../migrations/migration1614545435/migrate.go | 58 +++ 8 files changed, 777 insertions(+), 4 deletions(-) create mode 100644 blockchain/hedera.go create mode 100644 blockchain/hedera_test.go create mode 100644 store/migrations/migration1614545435/migrate.go diff --git a/.gitignore b/.gitignore index 96e7dd0e..dc7c9adb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ db/ .env external-initiator +.DS_STORE +.idea diff --git a/blockchain/common.go b/blockchain/common.go index 3be40dd0..8586cbf3 100644 --- a/blockchain/common.go +++ b/blockchain/common.go @@ -46,6 +46,7 @@ var blockchains = []string{ BIRITA, Agoric, Klaytn, + HEDERA, } type Params struct { @@ -53,6 +54,7 @@ type Params struct { Addresses []string `json:"addresses"` Topics []string `json:"topics"` AccountIds []string `json:"accountIds"` + AccountId string `json:"accountId"` Address string `json:"address"` UpkeepID string `json:"upkeepId"` ServiceName string `json:"serviceName"` @@ -98,6 +100,8 @@ func CreateClientManager(sub store.Subscription) (subscriber.ISubscriber, error) return createKeeperSubscriber(sub) case BIRITA: return createBSNIritaSubscriber(sub) + case HEDERA: + return createHederaSubscriber(sub), nil } return nil, errors.New("unknown blockchain type for Client subscription") @@ -106,7 +110,7 @@ func CreateClientManager(sub store.Subscription) (subscriber.ISubscriber, error) func GetConnectionType(endpoint store.Endpoint) (subscriber.Type, error) { switch endpoint.Type { // Add blockchain implementations that encapsulate entire connection here - case XTZ, ONT, IOTX, Keeper, BIRITA: + case XTZ, ONT, IOTX, Keeper, BIRITA, HEDERA: return subscriber.Client, nil default: u, err := url.Parse(endpoint.Url) @@ -177,6 +181,10 @@ func GetValidations(t string, params Params) []int { return []int{ 1, } + case HEDERA: + return []int{ + len(params.AccountId), + } } return nil @@ -228,6 +236,10 @@ func CreateSubscription(sub *store.Subscription, params Params) { } case Agoric: sub.Agoric = store.AgoricSubscription{} + case HEDERA: + sub.Hedera = store.HederaSubscription{ + AccountId: params.AccountId, + } } } diff --git a/blockchain/hedera.go b/blockchain/hedera.go new file mode 100644 index 00000000..cacd47ed --- /dev/null +++ b/blockchain/hedera.go @@ -0,0 +1,441 @@ +package blockchain + +import ( + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "github.com/smartcontractkit/chainlink/core/logger" + "github.com/smartcontractkit/external-initiator/store" + "github.com/smartcontractkit/external-initiator/subscriber" + "github.com/spf13/viper" + "io/ioutil" + "net/http" + "strconv" + "strings" + "time" +) + +const HEDERA = "hedera" +var tokenId string +var minPayment int64 +var hederaSubscribersMap = make(map[string][]hederaSubscription) + +func addHederaSubscriber(key string, value hederaSubscription) { + hederaSubscribersMap[key] = append(hederaSubscribersMap[key], value) +} + +func containsJobId(hederaSubscriptions []hederaSubscription, expected string) bool { + for _, hs := range hederaSubscriptions { + if hs.jobid == expected { + return true + } + } + return false +} + +func createHederaSubscriber(sub store.Subscription) hederaSubscriber { + + return hederaSubscriber{ + Endpoint: sub.Endpoint.Url, + AccountId: sub.Hedera.AccountId, + JobID: sub.Job, + } +} + +type hederaSubscriber struct { + Endpoint string + AccountId string + JobID string +} + +type hederaSubscription struct { + endpoint string + events chan<- subscriber.Event + accountId string + monitorResp *http.Response + isDone bool + jobid string +} + +type HederaConfig struct { + TokenId string `mapstructure:"TOKEN_ID"` + MinPayment int64 `mapstructure:"MIN_PAYMENT"` +} + +type EventInfo struct { + HederaTopicId string `json:"hederaTopicId"` +} + +func LoadConfig(configName string) (hederaConfig HederaConfig, err error) { + viper.AddConfigPath(".") + viper.SetConfigType("env") + viper.SetConfigName(configName) + + viper.AutomaticEnv() + + err = viper.ReadInConfig() + if err != nil { + return + } + + err = viper.Unmarshal(&hederaConfig) + return +} + +func (hSubscr hederaSubscriber) SubscribeToEvents(channel chan<- subscriber.Event, _ store.RuntimeConfig) (subscriber.ISubscription, error) { + + hederaSubscription := hederaSubscription{ + endpoint: hSubscr.Endpoint, + events: channel, + accountId: hSubscr.AccountId, + jobid: hSubscr.JobID, + } + + if len(hederaSubscribersMap[hederaSubscription.accountId]) == 0 { + var client = NewClient(hederaSubscription.endpoint, 5) + go client.WaitForTransaction(hederaSubscription.accountId) + } + + addHederaSubscriber(hederaSubscription.accountId, hederaSubscription) + + return hederaSubscription, nil +} + +func (hSubscr hederaSubscriber) Test() error { + + hederaConfig, err := LoadConfig(".env") + if err != nil { + logger.Error(err) + return err + } else if hederaConfig.TokenId == "" { + return errors.New("LINK Token ID is missing! Please set LINK Token ID to .env configuration file") + } + + tokenId = hederaConfig.TokenId + minPayment = hederaConfig.MinPayment + + var client = NewClient(hSubscr.Endpoint, 0) + response, err := client.GetAccountByAccountId(hSubscr.AccountId) + if err != nil { + return errors.New("Error while getting the Account information") + } + + if response == nil || response.Accounts == nil || len(response.Accounts) != 1 { + return errors.New("Please check that you have recorded the Hedera's AccountId correctly") + } + + account := response.Accounts[0] + if account.Deleted { + errorMessage := fmt.Sprintf("Account with ID: %s is deleted", hSubscr.AccountId) + return errors.New(errorMessage) + } + + if account.Balance.Tokens == nil { + errorMessage := fmt.Sprintf("Account with ID: %s is not assigned to LINK token with ID: %s", hSubscr.AccountId, tokenId) + return errors.New(errorMessage) + } + + isAccountHasAssignedLinkToken := false + for _, token := range account.Balance.Tokens { + if token.TokenId == tokenId { + isAccountHasAssignedLinkToken = true + } + } + + if !isAccountHasAssignedLinkToken { + errorMessage := fmt.Sprintf("Account with ID: %s is not assigned to LINK token with ID: %s", hSubscr.AccountId, tokenId) + return errors.New(errorMessage) + } + + return nil +} + +func (hederaSubscription hederaSubscription) Unsubscribe() { + logger.Info("Unsubscribing from Hedera endpoint", hederaSubscription.endpoint) + hederaSubscription.isDone = true + if hederaSubscription.monitorResp != nil { + hederaSubscription.monitorResp.Body.Close() + } +} + +type Client struct { + mirrorAPIAddress string + httpClient *http.Client + pollingInterval time.Duration +} + +type ( + // Transaction struct used by the Hedera Mirror node REST API + Transaction struct { + ConsensusTimestamp string `json:"consensus_timestamp"` + EntityId string `json:"entity_id"` + TransactionHash string `json:"transaction_hash"` + ValidStartTimestamp string `json:"valid_start_timestamp"` + ChargedTxFee int `json:"charged_tx_fee"` + MemoBase64 string `json:"memo_base64"` + Result string `json:"result"` + Name string `json:"name"` + MaxFee string `json:"max_fee"` + ValidDurationSeconds string `json:"valid_duration_seconds"` + Node string `json:"node"` + Scheduled bool `json:"scheduled"` + TransactionID string `json:"transaction_id"` + Transfers []Transfer `json:"transfers"` + TokenTransfers []Transfer `json:"token_transfers"` + } + Account struct { + Balance Balance `json:"balance"` + Account string `json:"account"` + ExpiryTimestamp string `json:"expiry_timestamp"` + AutoRenewPeriod int `json:"auto_renew_period"` + Key Key `json:"key"` + Deleted bool `json:"deleted"` + } + Balance struct { + Timestamp string `json:"timestamp"` + Balance int64 `json:"balance"` + Tokens []Token `json:"tokens"` + } + Token struct { + TokenId string `json:"token_id"` + Balance int64 `json:"balance"` + } + Key struct { + Type string `json:"_type"` + Key string `json:"key"` + } + // Transfer struct used by the Hedera Mirror node REST API + Transfer struct { + Account string `json:"account"` + Amount int64 `json:"amount"` + // When retrieving ordinary hbar transfers, this field does not get populated + Token string `json:"token_id"` + } + // Response struct used by the Hedera Mirror node REST API and returned once + // account transactions are queried + Response struct { + Transactions []Transaction + Accounts []Account + Status `json:"_status"` + } + ErrorMessage struct { + Message string `json:"message"` + } + Status struct { + Messages []ErrorMessage + } +) + +func NewClient(mirrorNodeAPIAddress string, pollingInterval time.Duration) *Client { + return &Client{ + mirrorAPIAddress: mirrorNodeAPIAddress, + pollingInterval: pollingInterval, + httpClient: &http.Client{}, + } +} + +func (c Client) GetAccountCreditTransactionsAfterTimestamp(accountID string, from int64) (*Response, error) { + transactionsDownloadQuery := fmt.Sprintf("?account.id=%s&type=credit&result=success×tamp=gt:%s&order=asc&transactiontype=cryptotransfer", + accountID, + String(from)) + return c.getTransactionsByQuery(transactionsDownloadQuery) +} + +func (c Client) GetAccountByAccountId(accountID string) (*Response, error) { + accountQuery := fmt.Sprintf("?account.id=%s", accountID) + return c.getAccountByQuery(accountQuery) +} + +// WaitForTransaction Polls the transaction at intervals. +func (c Client) WaitForTransaction(accountId string) { + + logger.Infof("Listening for events on account id: %v", accountId) + lastTransactionTimestamp := time.Now().UnixNano() + for { + response, err := c.GetAccountCreditTransactionsAfterTimestamp(accountId, lastTransactionTimestamp) + + if err != nil { + logger.Errorf("Error while trying to get account. Error: [%s].", err.Error()) + return + } + + if response != nil { + numberOfTransactions := len(response.Transactions) + if numberOfTransactions != 0 { + transactionTimestamp, err := FromString(response.Transactions[numberOfTransactions-1].ConsensusTimestamp) + if err != nil { + logger.Errorf(err.Error()) + return + } + lastTransactionTimestamp = transactionTimestamp + } + for _, transaction := range response.Transactions { + if !checkForValidTokenTransfer(transaction.TokenTransfers, accountId) { + continue + } + + if len(transaction.MemoBase64) == 0 { + continue + } + + decodedMemo, err := DecodeMemo(transaction.MemoBase64) + if err != nil { + logger.Error("Failed decoding base64:", err) + continue + } + + hederaTransactionInfo := strings.Fields(decodedMemo) + if len(hederaTransactionInfo) != 2 { + logger.Error("Invalid transaction info format") + continue + } + extractedTopicId := hederaTransactionInfo[0] + extractedJobId := hederaTransactionInfo[1] + + if !containsJobId(hederaSubscribersMap[accountId], extractedJobId) { + continue + } else { + hederaSubs := hederaSubscribersMap[accountId] + for _, hs := range hederaSubs { + if hs.jobid == extractedJobId { + bytes, err := json.Marshal(EventInfo{HederaTopicId: extractedTopicId}) + if err != nil { + logger.Errorf("error!") + } + hs.events <- bytes + } + } + } + } + } + + time.Sleep(c.pollingInterval * time.Second) + } +} + +const ( + nanosInSecond = 1000000000 +) + +// FromString parses a string in the format `{seconds}.{nanos}` into int64 timestamp +func FromString(timestamp string) (int64, error) { + var err error + stringTimestamp := strings.Split(timestamp, ".") + + seconds, err := strconv.ParseInt(stringTimestamp[0], 10, 64) + if err != nil { + return 0, errors.New("invalid timestamp seconds provided") + } + nano, err := strconv.ParseInt(stringTimestamp[1], 10, 64) + if err != nil { + return 0, errors.New("invalid timestamp nanos provided") + } + + return seconds*nanosInSecond + nano, nil +} + +// String parses int64 timestamp into `{seconds}.{nanos}` string +func String(timestamp int64) string { + seconds := timestamp / nanosInSecond + nano := timestamp % nanosInSecond + return fmt.Sprintf("%d.%d", seconds, nano) +} + +func DecodeMemo(base64Memo string) (string, error) { + decodedMemo, err := base64.StdEncoding.DecodeString(base64Memo) + if err != nil { + return "", err + } + + return string(decodedMemo), nil +} + +func checkForValidTokenTransfer(tokenTransfers []Transfer, accountId string) bool { + isValid := false + if tokenTransfers != nil && len(tokenTransfers) > 0 { + for _, tokenTransfer := range tokenTransfers { + if tokenTransfer.Token == tokenId && tokenTransfer.Account == accountId && tokenTransfer.Amount >= minPayment { + isValid = true + } + } + } + + return isValid +} + +func (c Client) getAccountByQuery(query string) (*Response, error) { + transactionsQuery := fmt.Sprintf("%s%s%s", c.mirrorAPIAddress, "accounts", query) + httpResponse, e := c.get(transactionsQuery) + if e != nil { + return nil, e + } + + bodyBytes, e := readResponseBody(httpResponse) + if e != nil { + return nil, e + } + + var response *Response + e = json.Unmarshal(bodyBytes, &response) + if e != nil { + return nil, e + } + if httpResponse.StatusCode >= 400 { + return response, errors.New(fmt.Sprintf(`Failed to execute query: [%s]. Error: [%s]`, query, response.Status.String())) + } + + return response, nil +} + +func (c Client) getTransactionsByQuery(query string) (*Response, error) { + transactionsQuery := fmt.Sprintf("%s%s%s", c.mirrorAPIAddress, "transactions", query) + httpResponse, e := c.get(transactionsQuery) + if e != nil { + return nil, e + } + + bodyBytes, e := readResponseBody(httpResponse) + if e != nil { + return nil, e + } + + var response *Response + e = json.Unmarshal(bodyBytes, &response) + if e != nil { + return nil, e + } + if httpResponse.StatusCode >= 400 { + return response, errors.New(fmt.Sprintf(`Failed to execute query: [%s]. Error: [%s]`, query, response.Status.String())) + } + + return response, nil +} + +func (c Client) get(query string) (*http.Response, error) { + return c.httpClient.Get(query) +} + +func readResponseBody(response *http.Response) ([]byte, error) { + defer response.Body.Close() + + return ioutil.ReadAll(response.Body) +} + +// String converts the Status struct to human readable string +func (s *Status) String() string { + r := "[" + for i, m := range s.Messages { + r += m.String() + if i != len(s.Messages)-1 { + r += ", " + } + } + r += "]" + return r +} + +// String converts ErrorMessage struct to human readable string +func (m *ErrorMessage) String() string { + return fmt.Sprintf("message: %s", m.Message) +} diff --git a/blockchain/hedera_test.go b/blockchain/hedera_test.go new file mode 100644 index 00000000..b211bbc6 --- /dev/null +++ b/blockchain/hedera_test.go @@ -0,0 +1,243 @@ +package blockchain + +import ( + "errors" + "github.com/smartcontractkit/external-initiator/store" + "github.com/stretchr/testify/assert" + "reflect" + "testing" +) + +func init() { + tokenId = "0.0.2138566" + minPayment = 1000000000 +} + +func TestHedera_CreateHederaSubscriber(t *testing.T) { + tests := []struct { + name string + args store.Subscription + want hederaSubscriber + }{ + { + "empty", + store.Subscription{}, + hederaSubscriber{}, + }, + { + "enpoint only", + store.Subscription{ + Endpoint: store.Endpoint{Url: "http://example.com/api"}, + }, + hederaSubscriber{Endpoint: "http://example.com/api"}, + }, + { + "endpoint and accountId only", + store.Subscription{ + Endpoint: store.Endpoint{Url: "http://example.com/api"}, + Hedera: store.HederaSubscription{AccountId: "0.0.1234"}, + }, + hederaSubscriber{ + Endpoint: "http://example.com/api", + AccountId: "0.0.1234", + }, + }, + { + "endpoint and accountId and jobId", + store.Subscription{ + Endpoint: store.Endpoint{Url: "http://example.com/api"}, + Hedera: store.HederaSubscription{AccountId: "0.0.1234"}, + Job: "8fb39e9048844fe58de078d46d8bc9d0"}, + hederaSubscriber{ + Endpoint: "http://example.com/api", + AccountId: "0.0.1234", + JobID: "8fb39e9048844fe58de078d46d8bc9d0", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sub := createHederaSubscriber(tt.args) + if !reflect.DeepEqual(sub, tt.want) { + t.Errorf("createHederaSubscriber() = %s, want %s", sub, tt.want) + } + }) + } +} + +func TestHedera_DecodeMemo(t *testing.T) { + tests := []struct { + name string + arg string + want string + }{ + { + "empty", + "", + "", + }, + { + "decode topic id and job id", + "MC4wLjIxMTM4MTQtZnR2cGEgNTMyZmFiOTA4ZTJhNDg3MGI3ZDUwZTI4ZWViMGEzMjU=", + "0.0.2113814-ftvpa 532fab908e2a4870b7d50e28eeb0a325", + }, + { + "decode incorrect memo", + "-1", + "", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got, _ := DecodeMemo(tt.arg); !reflect.DeepEqual(got, tt.want) { + t.Errorf("DecodeMemo(string) = %s, want %s", got, tt.want) + } + }) + } +} + +func TestHedera_FromString(t *testing.T) { + tests := []struct { + name string + arg string + want int64 + wantErr error + }{ + { + "empty", + "", + 0, + errors.New("invalid timestamp seconds provided"), + }, + { + "empty seconds", + ".033488000", + 0, + errors.New("invalid timestamp seconds provided"), + }, + { + "empty nanoseconds", + "1626699754.", + 0, + errors.New("invalid timestamp nanos provided"), + }, + { + "correct timestamp", + "1626699754.033488000", + 1626699754033488000, + nil, + }, + { + "incomplete nanoseconds", + "1626699754.0", + 1626699754000000000, + nil, + }, + { + "incomplete seconds", + "0.033488000", + 33488000, + nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := FromString(tt.arg) + + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("FromString(int64) got = %d, want %d", got, tt.want) + } + if err != nil && !assert.EqualError(t, err, tt.wantErr.Error()) { + t.Errorf("FromString(int64) Error should be: %v, got: %v", tt.wantErr, err) + } + }) + } +} + +func TestHedera_String(t *testing.T) { + tests := []struct { + name string + arg int64 + want string + }{ + { + "zero", + 0, + "0.0", + }, + { + "zero seconds", + 33488000, + "0.33488000", + }, + { + "correct timestamp", + 1626699754033488000, + "1626699754.33488000", + }, + { + "short timestamp", + 1626699754, + "1.626699754", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := String(tt.arg); !reflect.DeepEqual(got, tt.want) { + t.Errorf("String(string) = %s, want %s", got, tt.want) + } + }) + } +} + +func TestHedera_CheckForValidTokenTransfer(t *testing.T) { + tests := []struct { + name string + arg1 []Transfer + arg2 string + want bool + }{ + { + "valid transfer", + []Transfer{ + { + Token: "0.0.2138566", + Amount: 2500000000, + Account: "0.0.1967249", + }, + { + Token: "0.0.2138569", + Amount: 2500000000, + Account: "0.0.1967249", + }, + }, + "0.0.1967249", + true, + }, + { + "invalid transfer", + []Transfer{ + { + Token: "0.0.2138569", + Amount: 2500000000, + Account: "0.0.1967249", + }, + { + Token: "0.0.2138569", + Amount: 2500000000, + Account: "0.0.1967249", + }, + }, + "0.0.1967249", + false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + if got := checkForValidTokenTransfer(tt.arg1, tt.arg2); !reflect.DeepEqual(got, tt.want) { + t.Errorf("checkForValidTokenTransfer([]Transfer, string) = %t, want %t", got, tt.want) + } + }) + } +} diff --git a/client/service.go b/client/service.go index f8e1c552..6ef6b36f 100644 --- a/client/service.go +++ b/client/service.go @@ -156,7 +156,8 @@ func (srv *Service) Run() error { return err } - for _, sub := range subs { + for i, _ := range subs { + sub := subs[i] iSubscriber, err := srv.getAndTestSubscription(&sub) if err != nil { logger.Error(err) diff --git a/store/database.go b/store/database.go index 9cd0d249..afb563aa 100644 --- a/store/database.go +++ b/store/database.go @@ -6,14 +6,13 @@ import ( "database/sql/driver" "encoding/csv" "fmt" - "io" - "github.com/ethereum/go-ethereum/common" "github.com/jinzhu/gorm" _ "github.com/jinzhu/gorm/dialects/postgres" "github.com/pkg/errors" "github.com/smartcontractkit/chainlink/core/logger" "github.com/smartcontractkit/external-initiator/store/migrations" + "io" ) const sqlDialect = "postgres" @@ -161,6 +160,10 @@ func (client Client) prepareSubscription(rawSub *Subscription) (*Subscription, e if err := client.db.Model(&sub).Related(&sub.Agoric).Error; err != nil { return nil, err } + case "hedera": + if err := client.db.Model(&sub).Related(&sub.Hedera).Error; err != nil { + return nil, err + } } return &sub, nil @@ -302,6 +305,7 @@ type Subscription struct { Keeper KeeperSubscription BSNIrita BSNIritaSubscription Agoric AgoricSubscription + Hedera HederaSubscription } type EthSubscription struct { @@ -367,3 +371,9 @@ type AgoricSubscription struct { gorm.Model SubscriptionId uint } + +type HederaSubscription struct { + gorm.Model + SubscriptionId uint + AccountId string +} diff --git a/store/migrations/migrate.go b/store/migrations/migrate.go index 60919a9a..ec4127e4 100644 --- a/store/migrations/migrate.go +++ b/store/migrations/migrate.go @@ -17,6 +17,7 @@ import ( "github.com/smartcontractkit/external-initiator/store/migrations/migration1610281978" "github.com/smartcontractkit/external-initiator/store/migrations/migration1611169747" "github.com/smartcontractkit/external-initiator/store/migrations/migration1613356332" + "github.com/smartcontractkit/external-initiator/store/migrations/migration1614545435" "gopkg.in/gormigrate.v1" ) @@ -96,6 +97,11 @@ func Migrate(db *gorm.DB) error { Migrate: migration1613356332.Migrate, Rollback: migration1613356332.Rollback, }, + { + ID: "1614545435", + Migrate: migration1614545435.Migrate, + Rollback: migration1614545435.Rollback, + }, } m := gormigrate.New(db, &options, migrations) diff --git a/store/migrations/migration1614545435/migrate.go b/store/migrations/migration1614545435/migrate.go new file mode 100644 index 00000000..79b4ea6d --- /dev/null +++ b/store/migrations/migration1614545435/migrate.go @@ -0,0 +1,58 @@ +package migration1614545435 + +import ( + "github.com/jinzhu/gorm" + "github.com/pkg/errors" + "github.com/smartcontractkit/external-initiator/store/migrations/migration0" + "github.com/smartcontractkit/external-initiator/store/migrations/migration1576509489" + "github.com/smartcontractkit/external-initiator/store/migrations/migration1576783801" + "github.com/smartcontractkit/external-initiator/store/migrations/migration1587897988" + "github.com/smartcontractkit/external-initiator/store/migrations/migration1592829052" + "github.com/smartcontractkit/external-initiator/store/migrations/migration1594317706" + "github.com/smartcontractkit/external-initiator/store/migrations/migration1599849837" + "github.com/smartcontractkit/external-initiator/store/migrations/migration1608026935" + "github.com/smartcontractkit/external-initiator/store/migrations/migration1610281978" + "github.com/smartcontractkit/external-initiator/store/migrations/migration1613356332" +) + +type HederaSubscription struct { + gorm.Model + SubscriptionId uint + AccountId string +} + +type Subscription struct { + gorm.Model + ReferenceId string `gorm:"unique;not null"` + Job string + EndpointName string + Ethereum migration0.EthSubscription + Tezos migration1576509489.TezosSubscription + Substrate migration1576783801.SubstrateSubscription + Ontology migration1587897988.OntSubscription + BinanceSmartChain migration1592829052.BinanceSmartChainSubscription + NEAR migration1594317706.NEARSubscription + Conflux migration1599849837.CfxSubscription + Keeper migration1608026935.KeeperSubscription + BSNIrita migration1610281978.BSNIritaSubscription + Agoric migration1613356332.AgoricSubscription + Hedera HederaSubscription +} + +func Migrate(tx *gorm.DB) error { + err := tx.AutoMigrate(&Subscription{}).Error + if err != nil { + return errors.Wrap(err, "failed to auto migrate Subscription") + } + + err = tx.AutoMigrate(&HederaSubscription{}).AddForeignKey("subscription_id", "subscriptions(id)", "CASCADE", "CASCADE").Error + if err != nil { + return errors.Wrap(err, "failed to auto migrate HederaSubscription") + } + + return nil +} + +func Rollback(tx *gorm.DB) error { + return tx.DropTable("hedera_subscriptions").Error +} From 3265a6af3bc3b34bf51505671145574df59c3384 Mon Sep 17 00:00:00 2001 From: Rusi Boyadjiev Date: Tue, 14 Sep 2021 10:35:42 +0300 Subject: [PATCH 2/2] * fixed PR comments and removed accountId field from Params struct * fixed hedera_tests * Fixed connection to DB Signed-off-by: Rusi Boyadjiev --- blockchain/common.go | 7 +++-- blockchain/hedera.go | 12 ++++++--- blockchain/hedera_test.go | 27 ++++++++++++------- store/database.go | 2 +- store/migrations/migrate.go | 8 +++--- .../migrate.go | 4 +-- 6 files changed, 36 insertions(+), 24 deletions(-) rename store/migrations/{migration1614545435 => migration1631086126}/migrate.go (97%) diff --git a/blockchain/common.go b/blockchain/common.go index 8586cbf3..63b0d4ae 100644 --- a/blockchain/common.go +++ b/blockchain/common.go @@ -54,7 +54,6 @@ type Params struct { Addresses []string `json:"addresses"` Topics []string `json:"topics"` AccountIds []string `json:"accountIds"` - AccountId string `json:"accountId"` Address string `json:"address"` UpkeepID string `json:"upkeepId"` ServiceName string `json:"serviceName"` @@ -101,7 +100,7 @@ func CreateClientManager(sub store.Subscription) (subscriber.ISubscriber, error) case BIRITA: return createBSNIritaSubscriber(sub) case HEDERA: - return createHederaSubscriber(sub), nil + return createHederaSubscriber(sub) } return nil, errors.New("unknown blockchain type for Client subscription") @@ -183,7 +182,7 @@ func GetValidations(t string, params Params) []int { } case HEDERA: return []int{ - len(params.AccountId), + len(params.AccountIds), } } @@ -238,7 +237,7 @@ func CreateSubscription(sub *store.Subscription, params Params) { sub.Agoric = store.AgoricSubscription{} case HEDERA: sub.Hedera = store.HederaSubscription{ - AccountId: params.AccountId, + AccountIds: params.AccountIds, } } } diff --git a/blockchain/hedera.go b/blockchain/hedera.go index cacd47ed..17659c01 100644 --- a/blockchain/hedera.go +++ b/blockchain/hedera.go @@ -34,13 +34,17 @@ func containsJobId(hederaSubscriptions []hederaSubscription, expected string) bo return false } -func createHederaSubscriber(sub store.Subscription) hederaSubscriber { +func createHederaSubscriber(sub store.Subscription) (*hederaSubscriber, error) { - return hederaSubscriber{ + if len(sub.Hedera.AccountIds) != 1 { + return nil, errors.New("The AccountIds array should contain only one account id") + } + + return &hederaSubscriber{ Endpoint: sub.Endpoint.Url, - AccountId: sub.Hedera.AccountId, + AccountId: sub.Hedera.AccountIds[0], JobID: sub.Job, - } + }, nil } type hederaSubscriber struct { diff --git a/blockchain/hedera_test.go b/blockchain/hedera_test.go index b211bbc6..63ee245c 100644 --- a/blockchain/hedera_test.go +++ b/blockchain/hedera_test.go @@ -17,50 +17,59 @@ func TestHedera_CreateHederaSubscriber(t *testing.T) { tests := []struct { name string args store.Subscription - want hederaSubscriber + want *hederaSubscriber + wantErr error }{ { "empty", store.Subscription{}, - hederaSubscriber{}, + nil, + errors.New("The AccountIds array should contain only one account id"), }, { - "enpoint only", + "endpoint only", store.Subscription{ Endpoint: store.Endpoint{Url: "http://example.com/api"}, }, - hederaSubscriber{Endpoint: "http://example.com/api"}, + nil, + errors.New("The AccountIds array should contain only one account id"), }, { "endpoint and accountId only", store.Subscription{ Endpoint: store.Endpoint{Url: "http://example.com/api"}, - Hedera: store.HederaSubscription{AccountId: "0.0.1234"}, + Hedera: store.HederaSubscription{AccountIds: []string{"0.0.1234"}}, }, - hederaSubscriber{ + &hederaSubscriber{ Endpoint: "http://example.com/api", AccountId: "0.0.1234", }, + nil, }, { "endpoint and accountId and jobId", store.Subscription{ Endpoint: store.Endpoint{Url: "http://example.com/api"}, - Hedera: store.HederaSubscription{AccountId: "0.0.1234"}, + Hedera: store.HederaSubscription{AccountIds: []string{"0.0.1234"}}, Job: "8fb39e9048844fe58de078d46d8bc9d0"}, - hederaSubscriber{ + &hederaSubscriber{ Endpoint: "http://example.com/api", AccountId: "0.0.1234", JobID: "8fb39e9048844fe58de078d46d8bc9d0", }, + nil, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - sub := createHederaSubscriber(tt.args) + sub, err := createHederaSubscriber(tt.args) if !reflect.DeepEqual(sub, tt.want) { t.Errorf("createHederaSubscriber() = %s, want %s", sub, tt.want) } + + if err != nil && !assert.EqualError(t, err, tt.wantErr.Error()) { + t.Errorf("FromString(int64) Error should be: %v, got: %v", tt.wantErr, err) + } }) } } diff --git a/store/database.go b/store/database.go index afb563aa..c2eb230e 100644 --- a/store/database.go +++ b/store/database.go @@ -375,5 +375,5 @@ type AgoricSubscription struct { type HederaSubscription struct { gorm.Model SubscriptionId uint - AccountId string + AccountIds SQLStringArray } diff --git a/store/migrations/migrate.go b/store/migrations/migrate.go index ec4127e4..d0e9d6e2 100644 --- a/store/migrations/migrate.go +++ b/store/migrations/migrate.go @@ -17,7 +17,7 @@ import ( "github.com/smartcontractkit/external-initiator/store/migrations/migration1610281978" "github.com/smartcontractkit/external-initiator/store/migrations/migration1611169747" "github.com/smartcontractkit/external-initiator/store/migrations/migration1613356332" - "github.com/smartcontractkit/external-initiator/store/migrations/migration1614545435" + "github.com/smartcontractkit/external-initiator/store/migrations/migration1631086126" "gopkg.in/gormigrate.v1" ) @@ -98,9 +98,9 @@ func Migrate(db *gorm.DB) error { Rollback: migration1613356332.Rollback, }, { - ID: "1614545435", - Migrate: migration1614545435.Migrate, - Rollback: migration1614545435.Rollback, + ID: "1631086126", + Migrate: migration1631086126.Migrate, + Rollback: migration1631086126.Rollback, }, } diff --git a/store/migrations/migration1614545435/migrate.go b/store/migrations/migration1631086126/migrate.go similarity index 97% rename from store/migrations/migration1614545435/migrate.go rename to store/migrations/migration1631086126/migrate.go index 79b4ea6d..8c81fb49 100644 --- a/store/migrations/migration1614545435/migrate.go +++ b/store/migrations/migration1631086126/migrate.go @@ -1,4 +1,4 @@ -package migration1614545435 +package migration1631086126 import ( "github.com/jinzhu/gorm" @@ -18,7 +18,7 @@ import ( type HederaSubscription struct { gorm.Model SubscriptionId uint - AccountId string + AccountIds string } type Subscription struct {