Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Implement subscribe transaction statuses by transaction ID #6737

Draft
wants to merge 28 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ed80a0c
Merge branch 'master' into AndriiSlisarchuk/6573-pending-should-retur…
Guitarheroua Dec 17, 2024
103d15d
Merge branch 'master' into AndriiSlisarchuk/6573-pending-should-retur…
peterargue Dec 17, 2024
4b7275b
Apply suggestions from code review
Guitarheroua Dec 20, 2024
5ede502
fixed remarks
Guitarheroua Dec 20, 2024
80b63eb
Merge branch 'master' into AndriiSlisarchuk/6573-pending-should-retur…
Guitarheroua Dec 20, 2024
c1dc45a
Apply suggestions from code review
Guitarheroua Dec 27, 2024
0f1ad0d
Merge branch 'master' into AndriiSlisarchuk/6573-pending-should-retur…
Guitarheroua Dec 27, 2024
e53c356
Merge branch 'master' into AndriiSlisarchuk/6573-pending-should-retur…
peterargue Dec 27, 2024
93c69c1
change to irrecoverable throw
Guitarheroua Dec 30, 2024
a162ba2
Merge branch 'master' into AndriiSlisarchuk/6573-pending-should-retur…
peterargue Jan 2, 2025
383b7e8
Implement remarks. Added txRefID for subscribe calls. Added test.
Guitarheroua Jan 8, 2025
7184c8e
Started to implement state detection on transaction suscription
Guitarheroua Jan 9, 2025
725ccda
Fixed issues with test failint
Guitarheroua Jan 13, 2025
25493da
Get rid of unnecessary api calls
Guitarheroua Jan 16, 2025
733a485
Merge branch 'master' of github.com:The-K-R-O-K/flow-go into AndriiSl…
Guitarheroua Jan 16, 2025
1170a55
Fixed crash
Guitarheroua Jan 16, 2025
b427e33
Fixed issue with statuses sending
Guitarheroua Jan 16, 2025
03c5be5
Merge branch 'master' into AndriiSlisarchuk/6573-pending-should-retur…
Guitarheroua Jan 17, 2025
d1d0466
Added more uinit tests for subscribe tx statuses
Guitarheroua Jan 20, 2025
4aa5d14
Merge branch 'master' into AndriiSlisarchuk/6573-pending-should-retur…
Guitarheroua Jan 20, 2025
52bc521
linted
Guitarheroua Jan 20, 2025
65a0757
extract tx_sub_metadata structure
Guitarheroua Jan 22, 2025
fd4afd1
Merge branch 'master' into AndriiSlisarchuk/6573-pending-should-retur…
Guitarheroua Jan 22, 2025
b291085
fixed test
Guitarheroua Jan 22, 2025
a3f45e1
Merge branch 'AndriiSlisarchuk/6573-pending-should-return-immediately…
Guitarheroua Jan 22, 2025
fa5f03f
simplify logic
Guitarheroua Jan 22, 2025
a51b1fb
finished refactoring
Guitarheroua Jan 22, 2025
02e1d83
linted
Guitarheroua Jan 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 44 additions & 4 deletions access/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,50 @@ type API interface {
//
// If invalid parameters will be supplied SubscribeBlockDigestsFromLatest will return a failed subscription.
SubscribeBlockDigestsFromLatest(ctx context.Context, blockStatus flow.BlockStatus) subscription.Subscription
// SubscribeTransactionStatuses streams transaction statuses starting from the reference block saved in the
// transaction itself until the block containing the transaction becomes sealed or expired. When the transaction
// status becomes TransactionStatusSealed or TransactionStatusExpired, the subscription will automatically shut down.
SubscribeTransactionStatuses(ctx context.Context, tx *flow.TransactionBody, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
// SubscribeTransactionStatusesFromStartBlockID subscribes to transaction status updates for a given transaction ID.
// Monitoring begins from the specified block ID. The subscription streams status updates until the transaction
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of
// these final statuses, the subscription will automatically terminate.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// SubscribeTransactionStatusesFromStartBlockID subscribes to transaction status updates for a given transaction ID.
// Monitoring begins from the specified block ID. The subscription streams status updates until the transaction
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of
// these final statuses, the subscription will automatically terminate.
// SubscribeTransactionStatusesFromStartBlockID subscribes to transaction status updates for a given transaction ID.
// Monitoring begins from the specified block ID. The subscription streams status updates until the transaction
// reaches the final state ([flow.TransactionStatusSealed] or [flow.TransactionStatusExpired]). When the transaction reaches one of
// these final states, the subscription will automatically terminate.

//
// Parameters:
// - ctx: The context to manage the subscription's lifecycle, including cancellation.
// - txID: The identifier of the transaction to monitor.
// - startBlockID: The block ID from which to start monitoring.
// - requiredEventEncodingVersion: The version of event encoding required for the subscription.
SubscribeTransactionStatusesFromStartBlockID(ctx context.Context, txID flow.Identifier, startBlockID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
SubscribeTransactionStatusesFromStartBlockID(ctx context.Context, txID flow.Identifier, startBlockID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
SubscribeTransactionStatusesFromBlockID(ctx context.Context, txID flow.Identifier, startBlockID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription

To me it seems that "Start" can be omitted but that is just a preference for shorter function names.

// SubscribeTransactionStatusesFromStartHeight subscribes to transaction status updates for a given transaction ID.
// Monitoring begins from the specified block height. The subscription streams status updates until the transaction
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of
// these final statuses, the subscription will automatically terminate.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// SubscribeTransactionStatusesFromStartHeight subscribes to transaction status updates for a given transaction ID.
// Monitoring begins from the specified block height. The subscription streams status updates until the transaction
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of
// these final statuses, the subscription will automatically terminate.
// SubscribeTransactionStatusesFromStartHeight subscribes to transaction status updates for a given transaction ID.
// Monitoring begins from the specified block height. The subscription streams status updates until the transaction
// reaches the final state ([flow.TransactionStatusSealed] or [flow.TransactionStatusExpired]). When the transaction reaches one of
// these final states, the subscription will automatically terminate.

//
// Parameters:
// - ctx: The context to manage the subscription's lifecycle, including cancellation.
// - txID: The unique identifier of the transaction to monitor.
// - startHeight: The block height from which to start monitoring.
// - requiredEventEncodingVersion: The version of event encoding required for the subscription.
SubscribeTransactionStatusesFromStartHeight(ctx context.Context, txID flow.Identifier, startHeight uint64, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
SubscribeTransactionStatusesFromStartHeight(ctx context.Context, txID flow.Identifier, startHeight uint64, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
SubscribeTransactionStatusesFromHeight(ctx context.Context, txID flow.Identifier, startHeight uint64, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription

// SubscribeTransactionStatusesFromLatest subscribes to transaction status updates for a given transaction ID.
// Monitoring begins from the latest block. The subscription streams status updates until the transaction
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of
// these final statuses, the subscription will automatically terminate.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// SubscribeTransactionStatusesFromLatest subscribes to transaction status updates for a given transaction ID.
// Monitoring begins from the latest block. The subscription streams status updates until the transaction
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of
// these final statuses, the subscription will automatically terminate.
// SubscribeTransactionStatusesFromLatest subscribes to transaction status updates for a given transaction ID.
// Monitoring begins from the latest block. The subscription streams status updates until the transaction
// reaches the final state ([flow.TransactionStatusSealed] or [flow.TransactionStatusExpired]). When the transaction reaches one of
// these final states, the subscription will automatically terminate.

//
// Parameters:
// - ctx: The context to manage the subscription's lifecycle, including cancellation.
// - txID: The unique identifier of the transaction to monitor.
// - requiredEventEncodingVersion: The version of event encoding required for the subscription.
SubscribeTransactionStatusesFromLatest(ctx context.Context, txID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
// SendAndSubscribeTransactionStatuses sends a transaction to the execution node and subscribes to its status updates.
// Monitoring begins from the reference block saved in the transaction itself and streams status updates until the transaction
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). Once a final status is reached, the subscription
// automatically terminates.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// SendAndSubscribeTransactionStatuses sends a transaction to the execution node and subscribes to its status updates.
// Monitoring begins from the reference block saved in the transaction itself and streams status updates until the transaction
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). Once a final status is reached, the subscription
// automatically terminates.
// SendAndSubscribeTransactionStatuses sends a transaction to the execution node and subscribes to its status updates.
// Monitoring begins from the reference block saved in the transaction itself and streams status updates until the transaction
// reaches the final state ([flow.TransactionStatusSealed] or [flow.TransactionStatusExpired]). Once the final status has been reached, the subscription
// automatically terminates.

//
// Parameters:
// - ctx: The context to manage the transaction sending and subscription lifecycle, including cancellation.
// - tx: The transaction body to be sent and monitored.
// - requiredEventEncodingVersion: The version of event encoding required for the subscription.
//
// If the transaction cannot be sent, the subscription will fail and return a failed subscription.
SendAndSubscribeTransactionStatuses(ctx context.Context, tx *flow.TransactionBody, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
}

// TODO: Combine this with flow.TransactionResult?
Expand Down
7 changes: 1 addition & 6 deletions access/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1425,12 +1425,7 @@ func (h *Handler) SendAndSubscribeTransactionStatuses(
return status.Error(codes.InvalidArgument, err.Error())
}

err = h.api.SendTransaction(ctx, &tx)
if err != nil {
return err
}

sub := h.api.SubscribeTransactionStatuses(ctx, &tx, request.GetEventEncodingVersion())
sub := h.api.SendAndSubscribeTransactionStatuses(ctx, &tx, request.GetEventEncodingVersion())

messageIndex := counters.NewMonotonousCounter(0)
return subscription.HandleRPCSubscription(sub, func(txResults []*TransactionResult) error {
Expand Down
72 changes: 66 additions & 6 deletions access/mock/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 30 additions & 2 deletions cmd/util/cmd/run-script/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/onflow/flow-go/module/metrics"
)

var ErrNotImplemented = errors.New("not implemented")

var (
flagPayloads string
flagState string
Expand Down Expand Up @@ -532,10 +534,36 @@ func (*api) SubscribeBlockDigestsFromLatest(
return nil
}

func (*api) SubscribeTransactionStatuses(
func (a *api) SubscribeTransactionStatusesFromStartBlockID(
_ context.Context,
_ flow.Identifier,
_ flow.Identifier,
_ entities.EventEncodingVersion,
) subscription.Subscription {
return subscription.NewFailedSubscription(ErrNotImplemented, "failed to call SubscribeTransactionStatusesFromStartBlockID")
}

func (a *api) SubscribeTransactionStatusesFromStartHeight(
_ context.Context,
_ flow.Identifier,
_ uint64,
_ entities.EventEncodingVersion,
) subscription.Subscription {
return subscription.NewFailedSubscription(ErrNotImplemented, "failed to call SubscribeTransactionStatusesFromStartHeight")
}

func (a *api) SubscribeTransactionStatusesFromLatest(
_ context.Context,
_ flow.Identifier,
_ entities.EventEncodingVersion,
) subscription.Subscription {
return subscription.NewFailedSubscription(ErrNotImplemented, "failed to call SubscribeTransactionStatusesFromLatest")
}

func (a *api) SendAndSubscribeTransactionStatuses(
_ context.Context,
_ *flow.TransactionBody,
_ entities.EventEncodingVersion,
) subscription.Subscription {
return nil
return subscription.NewFailedSubscription(ErrNotImplemented, "failed to call SendAndSubscribeTransactionStatuses")
}
1 change: 1 addition & 0 deletions engine/access/rpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ func New(params Params) (*Backend, error) {
executionResults: params.ExecutionResults,
subscriptionHandler: params.SubscriptionHandler,
blockTracker: params.BlockTracker,
sendTransaction: b.SendTransaction,
}

retry.SetBackend(b)
Expand Down
Loading
Loading