Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add error handling to WebSocket handleMessage function, resolves #175 #210

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions rpc/ws/accountSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ type AccountResult struct {

// AccountSubscribe subscribes to an account to receive notifications
// when the lamports or data for a given account public key changes.
func (cl *Client) AccountSubscribe(
func (c *Client) AccountSubscribe(
account solana.PublicKey,
commitment rpc.CommitmentType,
) (*AccountSubscription, error) {
return cl.AccountSubscribeWithOpts(
return c.AccountSubscribeWithOpts(
account,
commitment,
"",
Expand All @@ -43,7 +43,7 @@ func (cl *Client) AccountSubscribe(

// AccountSubscribe subscribes to an account to receive notifications
// when the lamports or data for a given account public key changes.
func (cl *Client) AccountSubscribeWithOpts(
func (c *Client) AccountSubscribeWithOpts(
account solana.PublicKey,
commitment rpc.CommitmentType,
encoding solana.EncodingType,
Expand All @@ -60,7 +60,7 @@ func (cl *Client) AccountSubscribeWithOpts(
conf["encoding"] = encoding
}

genSub, err := cl.subscribe(
genSub, err := c.subscribe(
params,
conf,
"accountSubscribe",
Expand Down
4 changes: 2 additions & 2 deletions rpc/ws/blockSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type BlockSubscribeOpts struct {
// **This subscription is unstable and only available if the validator was started
// with the `--rpc-pubsub-enable-block-subscription` flag. The format of this
// subscription may change in the future**
func (cl *Client) BlockSubscribe(
func (c *Client) BlockSubscribe(
filter BlockSubscribeFilter,
opts *BlockSubscribeOpts,
) (*BlockSubscription, error) {
Expand Down Expand Up @@ -125,7 +125,7 @@ func (cl *Client) BlockSubscribe(
params = append(params, obj)
}
}
genSub, err := cl.subscribe(
genSub, err := c.subscribe(
params,
nil,
"blockSubscribe",
Expand Down
20 changes: 20 additions & 0 deletions rpc/ws/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,17 +172,37 @@ func (c *Client) handleMessage(message []byte) {
// when receiving message with id. the result will be a subscription number.
// that number will be associated to all future message destine to this request

// Check for an error in the message.
if errorCode, errMsg, ok := getJsonRpcError(message); ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fmt.Printf("Error received in websocket message: Code: %d, Message: %s\n", errorCode, errMsg)
return
}

// Handle message with ID: this is a subscription response.
requestID, ok := getUint64WithOk(message, "id")
if ok {
subID, _ := getUint64WithOk(message, "result")
c.handleNewSubscriptionMessage(requestID, subID)
return
}

// Handle message associated with a subscription ID.
subID, _ := getUint64WithOk(message, "params", "subscription")
c.handleSubscriptionMessage(subID, message)
}

// getJsonRpcError checks if the message contains a JSON-RPC error.
// Returns the error code, error message, and a boolean indicating if an error was present.
func getJsonRpcError(message []byte) (errorCode int64, errMsg string, ok bool) {
if val, dataType, _, err := jsonparser.Get(message, "error"); err == nil && dataType == jsonparser.Object {
code, _ := jsonparser.GetInt(val, "code")
msg, _ := jsonparser.GetString(val, "message")
return code, msg, true
}

return 0, "", false
}

func (c *Client) handleNewSubscriptionMessage(requestID, subID uint64) {
c.lock.Lock()
defer c.lock.Unlock()
Expand Down
12 changes: 6 additions & 6 deletions rpc/ws/logsSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,25 @@ const (
)

// LogsSubscribe subscribes to transaction logging.
func (cl *Client) LogsSubscribe(
func (c *Client) LogsSubscribe(
// Filter criteria for the logs to receive results by account type.
filter LogsSubscribeFilterType,
commitment rpc.CommitmentType, // (optional)
) (*LogSubscription, error) {
return cl.logsSubscribe(
return c.logsSubscribe(
filter,
commitment,
)
}

// LogsSubscribe subscribes to all transactions that mention the provided Pubkey.
func (cl *Client) LogsSubscribeMentions(
func (c *Client) LogsSubscribeMentions(
// Subscribe to all transactions that mention the provided Pubkey.
mentions solana.PublicKey,
// (optional)
commitment rpc.CommitmentType,
) (*LogSubscription, error) {
return cl.logsSubscribe(
return c.logsSubscribe(
rpc.M{
"mentions": []string{mentions.String()},
},
Expand All @@ -73,7 +73,7 @@ func (cl *Client) LogsSubscribeMentions(
}

// LogsSubscribe subscribes to transaction logging.
func (cl *Client) logsSubscribe(
func (c *Client) logsSubscribe(
filter interface{},
commitment rpc.CommitmentType,
) (*LogSubscription, error) {
Expand All @@ -84,7 +84,7 @@ func (cl *Client) logsSubscribe(
conf["commitment"] = commitment
}

genSub, err := cl.subscribe(
genSub, err := c.subscribe(
params,
conf,
"logsSubscribe",
Expand Down
8 changes: 4 additions & 4 deletions rpc/ws/programSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ type ProgramResult struct {

// ProgramSubscribe subscribes to a program to receive notifications
// when the lamports or data for a given account owned by the program changes.
func (cl *Client) ProgramSubscribe(
func (c *Client) ProgramSubscribe(
programID solana.PublicKey,
commitment rpc.CommitmentType,
) (*ProgramSubscription, error) {
return cl.ProgramSubscribeWithOpts(
return c.ProgramSubscribeWithOpts(
programID,
commitment,
"",
Expand All @@ -42,7 +42,7 @@ func (cl *Client) ProgramSubscribe(

// ProgramSubscribe subscribes to a program to receive notifications
// when the lamports or data for a given account owned by the program changes.
func (cl *Client) ProgramSubscribeWithOpts(
func (c *Client) ProgramSubscribeWithOpts(
programID solana.PublicKey,
commitment rpc.CommitmentType,
encoding solana.EncodingType,
Expand All @@ -63,7 +63,7 @@ func (cl *Client) ProgramSubscribeWithOpts(
conf["filters"] = filters
}

genSub, err := cl.subscribe(
genSub, err := c.subscribe(
params,
conf,
"programSubscribe",
Expand Down
4 changes: 2 additions & 2 deletions rpc/ws/rootSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ type RootResult uint64

// SignatureSubscribe subscribes to receive notification
// anytime a new root is set by the validator.
func (cl *Client) RootSubscribe() (*RootSubscription, error) {
genSub, err := cl.subscribe(
func (c *Client) RootSubscribe() (*RootSubscription, error) {
genSub, err := c.subscribe(
nil,
nil,
"rootSubscribe",
Expand Down
4 changes: 2 additions & 2 deletions rpc/ws/signatureSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type SignatureResult struct {
// SignatureSubscribe subscribes to a transaction signature to receive
// notification when the transaction is confirmed On signatureNotification,
// the subscription is automatically cancelled
func (cl *Client) SignatureSubscribe(
func (c *Client) SignatureSubscribe(
signature solana.Signature, // Transaction Signature.
commitment rpc.CommitmentType, // (optional)
) (*SignatureSubscription, error) {
Expand All @@ -44,7 +44,7 @@ func (cl *Client) SignatureSubscribe(
conf["commitment"] = commitment
}

genSub, err := cl.subscribe(
genSub, err := c.subscribe(
params,
conf,
"signatureSubscribe",
Expand Down
4 changes: 2 additions & 2 deletions rpc/ws/slotSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ type SlotResult struct {
}

// SlotSubscribe subscribes to receive notification anytime a slot is processed by the validator.
func (cl *Client) SlotSubscribe() (*SlotSubscription, error) {
genSub, err := cl.subscribe(
func (c *Client) SlotSubscribe() (*SlotSubscription, error) {
genSub, err := c.subscribe(
nil,
nil,
"slotSubscribe",
Expand Down
4 changes: 2 additions & 2 deletions rpc/ws/slotsUpdatesSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ const (
//
// This subscription is unstable; the format of this subscription
// may change in the future and it may not always be supported.
func (cl *Client) SlotsUpdatesSubscribe() (*SlotsUpdatesSubscription, error) {
genSub, err := cl.subscribe(
func (c *Client) SlotsUpdatesSubscribe() (*SlotsUpdatesSubscription, error) {
genSub, err := c.subscribe(
nil,
nil,
"slotsUpdatesSubscribe",
Expand Down
4 changes: 2 additions & 2 deletions rpc/ws/voteSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ type VoteResult struct {
// This subscription is unstable and only available if the validator
// was started with the --rpc-pubsub-enable-vote-subscription flag.
// The format of this subscription may change in the future.
func (cl *Client) VoteSubscribe() (*VoteSubscription, error) {
genSub, err := cl.subscribe(
func (c *Client) VoteSubscribe() (*VoteSubscription, error) {
genSub, err := c.subscribe(
nil,
nil,
"voteSubscribe",
Expand Down