diff --git a/rpc/ws/accountSubscribe.go b/rpc/ws/accountSubscribe.go index 8c54229a..0bd51168 100644 --- a/rpc/ws/accountSubscribe.go +++ b/rpc/ws/accountSubscribe.go @@ -30,11 +30,11 @@ type AccountResult struct { // AccountSubscribe subscribes to an account to receive notifications // when the lamports or data for a given account public key changes. -func (cl *Client) AccountSubscribe( +func (c *Client) AccountSubscribe( account solana.PublicKey, commitment rpc.CommitmentType, ) (*AccountSubscription, error) { - return cl.AccountSubscribeWithOpts( + return c.AccountSubscribeWithOpts( account, commitment, "", @@ -43,7 +43,7 @@ func (cl *Client) AccountSubscribe( // AccountSubscribe subscribes to an account to receive notifications // when the lamports or data for a given account public key changes. -func (cl *Client) AccountSubscribeWithOpts( +func (c *Client) AccountSubscribeWithOpts( account solana.PublicKey, commitment rpc.CommitmentType, encoding solana.EncodingType, @@ -60,7 +60,7 @@ func (cl *Client) AccountSubscribeWithOpts( conf["encoding"] = encoding } - genSub, err := cl.subscribe( + genSub, err := c.subscribe( params, conf, "accountSubscribe", diff --git a/rpc/ws/blockSubscribe.go b/rpc/ws/blockSubscribe.go index e1530f11..9009fcbc 100644 --- a/rpc/ws/blockSubscribe.go +++ b/rpc/ws/blockSubscribe.go @@ -80,7 +80,7 @@ type BlockSubscribeOpts struct { // **This subscription is unstable and only available if the validator was started // with the `--rpc-pubsub-enable-block-subscription` flag. The format of this // subscription may change in the future** -func (cl *Client) BlockSubscribe( +func (c *Client) BlockSubscribe( filter BlockSubscribeFilter, opts *BlockSubscribeOpts, ) (*BlockSubscription, error) { @@ -125,7 +125,7 @@ func (cl *Client) BlockSubscribe( params = append(params, obj) } } - genSub, err := cl.subscribe( + genSub, err := c.subscribe( params, nil, "blockSubscribe", diff --git a/rpc/ws/client.go b/rpc/ws/client.go index 261b4532..c72ef73a 100644 --- a/rpc/ws/client.go +++ b/rpc/ws/client.go @@ -172,6 +172,13 @@ func (c *Client) handleMessage(message []byte) { // when receiving message with id. the result will be a subscription number. // that number will be associated to all future message destine to this request + // Check for an error in the message. + if errorCode, errMsg, ok := getJsonRpcError(message); ok { + fmt.Printf("Error received in websocket message: Code: %d, Message: %s\n", errorCode, errMsg) + return + } + + // Handle message with ID: this is a subscription response. requestID, ok := getUint64WithOk(message, "id") if ok { subID, _ := getUint64WithOk(message, "result") @@ -179,10 +186,23 @@ func (c *Client) handleMessage(message []byte) { return } + // Handle message associated with a subscription ID. subID, _ := getUint64WithOk(message, "params", "subscription") c.handleSubscriptionMessage(subID, message) } +// getJsonRpcError checks if the message contains a JSON-RPC error. +// Returns the error code, error message, and a boolean indicating if an error was present. +func getJsonRpcError(message []byte) (errorCode int64, errMsg string, ok bool) { + if val, dataType, _, err := jsonparser.Get(message, "error"); err == nil && dataType == jsonparser.Object { + code, _ := jsonparser.GetInt(val, "code") + msg, _ := jsonparser.GetString(val, "message") + return code, msg, true + } + + return 0, "", false +} + func (c *Client) handleNewSubscriptionMessage(requestID, subID uint64) { c.lock.Lock() defer c.lock.Unlock() diff --git a/rpc/ws/logsSubscribe.go b/rpc/ws/logsSubscribe.go index 1a9b5e03..2d3448fd 100644 --- a/rpc/ws/logsSubscribe.go +++ b/rpc/ws/logsSubscribe.go @@ -46,25 +46,25 @@ const ( ) // LogsSubscribe subscribes to transaction logging. -func (cl *Client) LogsSubscribe( +func (c *Client) LogsSubscribe( // Filter criteria for the logs to receive results by account type. filter LogsSubscribeFilterType, commitment rpc.CommitmentType, // (optional) ) (*LogSubscription, error) { - return cl.logsSubscribe( + return c.logsSubscribe( filter, commitment, ) } // LogsSubscribe subscribes to all transactions that mention the provided Pubkey. -func (cl *Client) LogsSubscribeMentions( +func (c *Client) LogsSubscribeMentions( // Subscribe to all transactions that mention the provided Pubkey. mentions solana.PublicKey, // (optional) commitment rpc.CommitmentType, ) (*LogSubscription, error) { - return cl.logsSubscribe( + return c.logsSubscribe( rpc.M{ "mentions": []string{mentions.String()}, }, @@ -73,7 +73,7 @@ func (cl *Client) LogsSubscribeMentions( } // LogsSubscribe subscribes to transaction logging. -func (cl *Client) logsSubscribe( +func (c *Client) logsSubscribe( filter interface{}, commitment rpc.CommitmentType, ) (*LogSubscription, error) { @@ -84,7 +84,7 @@ func (cl *Client) logsSubscribe( conf["commitment"] = commitment } - genSub, err := cl.subscribe( + genSub, err := c.subscribe( params, conf, "logsSubscribe", diff --git a/rpc/ws/programSubscribe.go b/rpc/ws/programSubscribe.go index fb94491a..512aa216 100644 --- a/rpc/ws/programSubscribe.go +++ b/rpc/ws/programSubscribe.go @@ -28,11 +28,11 @@ type ProgramResult struct { // ProgramSubscribe subscribes to a program to receive notifications // when the lamports or data for a given account owned by the program changes. -func (cl *Client) ProgramSubscribe( +func (c *Client) ProgramSubscribe( programID solana.PublicKey, commitment rpc.CommitmentType, ) (*ProgramSubscription, error) { - return cl.ProgramSubscribeWithOpts( + return c.ProgramSubscribeWithOpts( programID, commitment, "", @@ -42,7 +42,7 @@ func (cl *Client) ProgramSubscribe( // ProgramSubscribe subscribes to a program to receive notifications // when the lamports or data for a given account owned by the program changes. -func (cl *Client) ProgramSubscribeWithOpts( +func (c *Client) ProgramSubscribeWithOpts( programID solana.PublicKey, commitment rpc.CommitmentType, encoding solana.EncodingType, @@ -63,7 +63,7 @@ func (cl *Client) ProgramSubscribeWithOpts( conf["filters"] = filters } - genSub, err := cl.subscribe( + genSub, err := c.subscribe( params, conf, "programSubscribe", diff --git a/rpc/ws/rootSubscribe.go b/rpc/ws/rootSubscribe.go index f54f30f5..8fef48c3 100644 --- a/rpc/ws/rootSubscribe.go +++ b/rpc/ws/rootSubscribe.go @@ -18,8 +18,8 @@ type RootResult uint64 // SignatureSubscribe subscribes to receive notification // anytime a new root is set by the validator. -func (cl *Client) RootSubscribe() (*RootSubscription, error) { - genSub, err := cl.subscribe( +func (c *Client) RootSubscribe() (*RootSubscription, error) { + genSub, err := c.subscribe( nil, nil, "rootSubscribe", diff --git a/rpc/ws/signatureSubscribe.go b/rpc/ws/signatureSubscribe.go index 134d244f..efd3c513 100644 --- a/rpc/ws/signatureSubscribe.go +++ b/rpc/ws/signatureSubscribe.go @@ -34,7 +34,7 @@ type SignatureResult struct { // SignatureSubscribe subscribes to a transaction signature to receive // notification when the transaction is confirmed On signatureNotification, // the subscription is automatically cancelled -func (cl *Client) SignatureSubscribe( +func (c *Client) SignatureSubscribe( signature solana.Signature, // Transaction Signature. commitment rpc.CommitmentType, // (optional) ) (*SignatureSubscription, error) { @@ -44,7 +44,7 @@ func (cl *Client) SignatureSubscribe( conf["commitment"] = commitment } - genSub, err := cl.subscribe( + genSub, err := c.subscribe( params, conf, "signatureSubscribe", diff --git a/rpc/ws/slotSubscribe.go b/rpc/ws/slotSubscribe.go index 05096bad..a369c18b 100644 --- a/rpc/ws/slotSubscribe.go +++ b/rpc/ws/slotSubscribe.go @@ -21,8 +21,8 @@ type SlotResult struct { } // SlotSubscribe subscribes to receive notification anytime a slot is processed by the validator. -func (cl *Client) SlotSubscribe() (*SlotSubscription, error) { - genSub, err := cl.subscribe( +func (c *Client) SlotSubscribe() (*SlotSubscription, error) { + genSub, err := c.subscribe( nil, nil, "slotSubscribe", diff --git a/rpc/ws/slotsUpdatesSubscribe.go b/rpc/ws/slotsUpdatesSubscribe.go index 45712434..19ac8e04 100644 --- a/rpc/ws/slotsUpdatesSubscribe.go +++ b/rpc/ws/slotsUpdatesSubscribe.go @@ -53,8 +53,8 @@ const ( // // This subscription is unstable; the format of this subscription // may change in the future and it may not always be supported. -func (cl *Client) SlotsUpdatesSubscribe() (*SlotsUpdatesSubscription, error) { - genSub, err := cl.subscribe( +func (c *Client) SlotsUpdatesSubscribe() (*SlotsUpdatesSubscription, error) { + genSub, err := c.subscribe( nil, nil, "slotsUpdatesSubscribe", diff --git a/rpc/ws/voteSubscribe.go b/rpc/ws/voteSubscribe.go index 1d4aebc6..29adcf8e 100644 --- a/rpc/ws/voteSubscribe.go +++ b/rpc/ws/voteSubscribe.go @@ -35,8 +35,8 @@ type VoteResult struct { // This subscription is unstable and only available if the validator // was started with the --rpc-pubsub-enable-vote-subscription flag. // The format of this subscription may change in the future. -func (cl *Client) VoteSubscribe() (*VoteSubscription, error) { - genSub, err := cl.subscribe( +func (c *Client) VoteSubscribe() (*VoteSubscription, error) { + genSub, err := c.subscribe( nil, nil, "voteSubscribe",