Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Add implementation for transaction statuses data providers #6818

Merged
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
38e7151
Implemented pending transaction trigger
Guitarheroua Nov 19, 2024
2ac2742
fixed tests
Guitarheroua Nov 19, 2024
8eee89a
Added documentation
Guitarheroua Nov 19, 2024
6766a38
revert unnecessary changes
Guitarheroua Nov 19, 2024
6456d40
Merge branch 'master' into AndriiSlisarchuk/6573-pending-should-retur…
Guitarheroua Dec 2, 2024
378afa8
Merge branch 'master' into AndriiSlisarchuk/6573-pending-should-retur…
Guitarheroua Dec 4, 2024
8ced75a
Implemented subsscribe tx statuses. Make refactoring.
Guitarheroua Dec 9, 2024
ff1a95f
linted
Guitarheroua Dec 9, 2024
c817414
Merge branch 'master' into AndriiSlisarchuk/6573-pending-should-retur…
Guitarheroua Dec 9, 2024
815419b
Clean up code. Added comments
Guitarheroua Dec 10, 2024
a9e7bc3
Merge branch 'AndriiDiachuk/6588-events-data-provider' of github.com:…
AndriiDiachuk Dec 11, 2024
2bbd399
Added skeleton for tx account data provider
AndriiDiachuk Dec 11, 2024
683f026
Implemented tx statuses provider
AndriiDiachuk Dec 11, 2024
3b19cd0
Separate subscribe tx statuses to 3 function
Guitarheroua Dec 12, 2024
699b243
Added invalid params test
AndriiDiachuk Dec 13, 2024
71c7f6c
Merge branch 'AndriiSlisarchuk/6573-pending-should-return-immediately…
AndriiDiachuk Dec 13, 2024
39f7a48
Merged
AndriiDiachuk Dec 13, 2024
9ed9207
Changed parse args function, added invalid args testcases
AndriiDiachuk Dec 13, 2024
d660c7f
Linted
AndriiDiachuk Dec 13, 2024
3126cd9
Changed api in test to mock api
AndriiDiachuk Dec 13, 2024
213a5ed
Merge branch 'onflow:master' into AndriiDiachuk/6586-tx-statuses-data…
UlyanaAndrukhiv Dec 17, 2024
e028162
Merge branch 'AndriiDiachuk/6586-tx-statuses-data-providers-impl' of …
AndriiDiachuk Dec 17, 2024
2ec3817
Linted
AndriiDiachuk Dec 17, 2024
a7a6f76
Fixed type in the log msg
AndriiDiachuk Dec 17, 2024
e34d787
Added new topic for send and subcribe tx statuses
AndriiDiachuk Dec 18, 2024
3492dc1
Merge branch 'AndriiDiachuk/6588-events-data-provider' of github.com:…
AndriiDiachuk Dec 19, 2024
08f4b29
Refactored tx statuses parse function
AndriiDiachuk Dec 19, 2024
80c97a6
Added test for tx statuses data provider for msgIndex increment
AndriiDiachuk Dec 19, 2024
317eb0e
Implemented parse function for send and susubdcribe tx statuses provider
AndriiDiachuk Dec 19, 2024
679ef2f
Fixed typos and added missing invalid cases to godoc
AndriiDiachuk Dec 23, 2024
3e828b6
fixed small remarks
AndriiDiachuk Dec 23, 2024
71bac49
Added tests for send and subscribe data provider
AndriiDiachuk Dec 23, 2024
f71edd5
Removed handleResponse func and using generic HandleResponse instead
AndriiDiachuk Dec 24, 2024
7874b11
Merge branch 'master' into AndriiDiachuk/6586-tx-statuses-data-provid…
AndriiDiachuk Dec 24, 2024
3630b39
Merge branch 'master' into AndriiDiachuk/6586-tx-statuses-data-provid…
Guitarheroua Dec 26, 2024
74c8c6d
Renamed topic, deleted redundant godoc
AndriiDiachuk Dec 26, 2024
4bd61bd
Merge branch 'AndriiDiachuk/6586-tx-statuses-data-providers-impl' of …
AndriiDiachuk Dec 26, 2024
c4b1d0a
Fixed worng impl for tx statuses data privoders, fixed tests< renamed…
AndriiDiachuk Dec 27, 2024
bd8e32f
Merge branch 'master' into AndriiDiachuk/6586-tx-statuses-data-provid…
peterargue Dec 27, 2024
aa60fbe
Merge branch 'master' into AndriiDiachuk/6586-tx-statuses-data-provid…
AndriiDiachuk Jan 2, 2025
2a8e15e
Added defer for closing provider
AndriiDiachuk Jan 2, 2025
79c5655
Moved defer after error check
AndriiDiachuk Jan 2, 2025
ea7e457
Removed commented code
AndriiDiachuk Jan 2, 2025
66a79b5
Merge branch 'master' into AndriiDiachuk/6586-tx-statuses-data-provid…
AndriiDiachuk Jan 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 44 additions & 4 deletions access/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,50 @@ type API interface {
//
// If invalid parameters will be supplied SubscribeBlockDigestsFromLatest will return a failed subscription.
SubscribeBlockDigestsFromLatest(ctx context.Context, blockStatus flow.BlockStatus) subscription.Subscription
// SubscribeTransactionStatuses streams transaction statuses starting from the reference block saved in the
// transaction itself until the block containing the transaction becomes sealed or expired. When the transaction
// status becomes TransactionStatusSealed or TransactionStatusExpired, the subscription will automatically shut down.
SubscribeTransactionStatuses(ctx context.Context, tx *flow.TransactionBody, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
// SubscribeTransactionStatusesFromStartBlockID subscribes to transaction status updates for a given transaction ID.
// Monitoring begins from the specified block ID. The subscription streams status updates until the transaction
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of
// these final statuses, the subscription will automatically terminate.
//
// Parameters:
// - ctx: The context to manage the subscription's lifecycle, including cancellation.
// - txID: The identifier of the transaction to monitor.
// - startBlockID: The block ID from which to start monitoring.
// - requiredEventEncodingVersion: The version of event encoding required for the subscription.
SubscribeTransactionStatusesFromStartBlockID(ctx context.Context, txID flow.Identifier, startBlockID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
// SubscribeTransactionStatusesFromStartHeight subscribes to transaction status updates for a given transaction ID.
// Monitoring begins from the specified block height. The subscription streams status updates until the transaction
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of
// these final statuses, the subscription will automatically terminate.
//
// Parameters:
// - ctx: The context to manage the subscription's lifecycle, including cancellation.
// - txID: The unique identifier of the transaction to monitor.
// - startHeight: The block height from which to start monitoring.
// - requiredEventEncodingVersion: The version of event encoding required for the subscription.
SubscribeTransactionStatusesFromStartHeight(ctx context.Context, txID flow.Identifier, startHeight uint64, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
// SubscribeTransactionStatusesFromLatest subscribes to transaction status updates for a given transaction ID.
// Monitoring begins from the latest block. The subscription streams status updates until the transaction
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of
// these final statuses, the subscription will automatically terminate.
//
// Parameters:
// - ctx: The context to manage the subscription's lifecycle, including cancellation.
// - txID: The unique identifier of the transaction to monitor.
// - requiredEventEncodingVersion: The version of event encoding required for the subscription.
SubscribeTransactionStatusesFromLatest(ctx context.Context, txID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
// SendAndSubscribeTransactionStatuses sends a transaction to the network and subscribes to its status updates.
// Monitoring begins from the reference block saved in the transaction itself and streams status updates until the transaction
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). Once a final status is reached, the subscription
// automatically terminates.
//
// Parameters:
// - ctx: The context to manage the transaction sending and subscription lifecycle, including cancellation.
// - tx: The transaction body to be sent and monitored.
// - requiredEventEncodingVersion: The version of event encoding required for the subscription.
//
// If the transaction cannot be sent, the subscription will fail and return a failed subscription.
SendAndSubscribeTransactionStatuses(ctx context.Context, tx *flow.TransactionBody, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
}

// TODO: Combine this with flow.TransactionResult?
Expand Down
7 changes: 1 addition & 6 deletions access/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1425,12 +1425,7 @@ func (h *Handler) SendAndSubscribeTransactionStatuses(
return status.Error(codes.InvalidArgument, err.Error())
}

err = h.api.SendTransaction(ctx, &tx)
if err != nil {
return err
}

sub := h.api.SubscribeTransactionStatuses(ctx, &tx, request.GetEventEncodingVersion())
sub := h.api.SendAndSubscribeTransactionStatuses(ctx, &tx, request.GetEventEncodingVersion())

messageIndex := counters.NewMonotonousCounter(0)
return subscription.HandleRPCSubscription(sub, func(txResults []*TransactionResult) error {
Expand Down
72 changes: 66 additions & 6 deletions access/mock/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 26 additions & 1 deletion cmd/util/cmd/run-script/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,32 @@ func (*api) SubscribeBlockDigestsFromLatest(
return nil
}

func (*api) SubscribeTransactionStatuses(
func (a *api) SubscribeTransactionStatusesFromStartBlockID(
_ context.Context,
_ flow.Identifier,
_ flow.Identifier,
_ entities.EventEncodingVersion,
) subscription.Subscription {
return nil
}

func (a *api) SubscribeTransactionStatusesFromStartHeight(
_ context.Context,
_ flow.Identifier,
_ uint64,
_ entities.EventEncodingVersion,
) subscription.Subscription {
return nil
}

func (a *api) SubscribeTransactionStatusesFromLatest(
_ context.Context,
_ flow.Identifier,
_ entities.EventEncodingVersion,
) subscription.Subscription {
return nil
}
func (a *api) SendAndSubscribeTransactionStatuses(
_ context.Context,
_ *flow.TransactionBody,
_ entities.EventEncodingVersion,
Expand Down
3 changes: 2 additions & 1 deletion engine/access/rest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ func NewServer(serverAPI access.API,
serverAPI,
chain,
stateStreamConfig.EventFilterConfig,
stateStreamConfig.HeartbeatInterval)
stateStreamConfig.HeartbeatInterval,
)
builder.AddWebsocketsRoute(chain, wsConfig, config.MaxRequestSize, dataProviderFactory)

c := cors.New(cors.Options{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ func (s *AccountStatusesProviderSuite) TestMessageIndexAccountStatusesProviderRe
s.Require().NotNil(provider)
s.Require().NoError(err)

// Ensure the provider is properly closed after the test
defer provider.Close()

// Run the provider in a separate goroutine to simulate subscription processing
go func() {
err = provider.Run()
Expand Down Expand Up @@ -263,7 +266,4 @@ func (s *AccountStatusesProviderSuite) TestMessageIndexAccountStatusesProviderRe
currentIndex := responses[i].MessageIndex
s.Require().Equal(prevIndex+1, currentIndex, "Expected MessageIndex to increment by 1")
}

// Ensure the provider is properly closed after the test
provider.Close()
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func (s *EventsProviderSuite) TestEventsDataProvider_HappyPath() {
Events: expectedEvents,
BlockTimestamp: s.rootBlock.Header.Timestamp,
})

}

testHappyPath(
Expand Down Expand Up @@ -252,9 +251,13 @@ func (s *EventsProviderSuite) TestMessageIndexEventProviderResponse_HappyPath()
s.chain,
state_stream.DefaultEventFilterConfig,
subscription.DefaultHeartbeatInterval)

s.Require().NotNil(provider)
s.Require().NoError(err)

// Ensure the provider is properly closed after the test
defer provider.Close()

// Run the provider in a separate goroutine to simulate subscription processing
go func() {
err = provider.Run()
Expand Down Expand Up @@ -291,6 +294,6 @@ func (s *EventsProviderSuite) TestMessageIndexEventProviderResponse_HappyPath()
s.Require().Equal(prevIndex+1, currentIndex, "Expected MessageIndex to increment by 1")
}

// Ensure the provider is properly closed after the test
provider.Close()
//// Ensure the provider is properly closed after the test
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
//provider.Close()
}
18 changes: 10 additions & 8 deletions engine/access/rest/websockets/data_providers/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ import (
// Constants defining various topic names used to specify different types of
// data providers.
const (
EventsTopic = "events"
AccountStatusesTopic = "account_statuses"
BlocksTopic = "blocks"
BlockHeadersTopic = "block_headers"
BlockDigestsTopic = "block_digests"
TransactionStatusesTopic = "transaction_statuses"
EventsTopic = "events"
AccountStatusesTopic = "account_statuses"
BlocksTopic = "blocks"
BlockHeadersTopic = "block_headers"
BlockDigestsTopic = "block_digests"
TransactionStatusesTopic = "transaction_statuses"
SendAndGetTransactionStatusesTopic = "send_and_get_transaction_statuses"
)

// DataProviderFactory defines an interface for creating data providers
Expand Down Expand Up @@ -103,8 +104,9 @@ func (s *DataProviderFactoryImpl) NewDataProvider(
case AccountStatusesTopic:
return NewAccountStatusesDataProvider(ctx, s.logger, s.stateStreamApi, topic, arguments, ch, s.chain, s.eventFilterConfig, s.heartbeatInterval)
case TransactionStatusesTopic:
// TODO: Implemented handlers for each topic should be added in respective case
return nil, fmt.Errorf(`topic "%s" not implemented yet`, topic)
return NewTransactionStatusesDataProvider(ctx, s.logger, s.accessApi, topic, arguments, ch)
case SendAndGetTransactionStatusesTopic:
return NewSendAndGetTransactionStatusesDataProvider(ctx, s.logger, s.accessApi, topic, arguments, ch)
default:
return nil, fmt.Errorf("unsupported topic \"%s\"", topic)
}
Expand Down
22 changes: 22 additions & 0 deletions engine/access/rest/websockets/data_providers/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,28 @@ func (s *DataProviderFactorySuite) TestSupportedTopics() {
s.stateStreamApi.AssertExpectations(s.T())
},
},
{
name: "transaction statuses topic",
topic: TransactionStatusesTopic,
arguments: models.Arguments{},
setupSubscription: func() {
s.setupSubscription(s.accessApi.On("SubscribeTransactionStatusesFromLatest", mock.Anything, mock.Anything, mock.Anything))
},
assertExpectations: func() {
s.stateStreamApi.AssertExpectations(s.T())
},
},
{
name: "send transaction statuses topic",
topic: SendAndGetTransactionStatusesTopic,
arguments: models.Arguments{},
setupSubscription: func() {
s.setupSubscription(s.accessApi.On("SendAndSubscribeTransactionStatuses", mock.Anything, mock.Anything, mock.Anything))
},
assertExpectations: func() {
s.stateStreamApi.AssertExpectations(s.T())
},
},
}

for _, test := range testCases {
Expand Down
Loading
Loading