
ingest/pipeline: Create functional producer for BufferedStorageBackend #5412

Closed
sreuland opened this issue Aug 6, 2024 · 4 comments · Fixed by #5462

Comments

@sreuland
Contributor

sreuland commented Aug 6, 2024

What problem does your feature solve?

BufferedStorageBackend provides ledger close meta (LCM) one ledger at a time via GetLedger, but there is no efficient way for it to participate as a streaming producer of LCMs.

What would you like to see?

Follow the design proposal in Functional Processors Lib.
Provide a ‘Producer’ function for BufferedStorageBackend.
The function will be used as the ‘producer’ operator in a pipeline, emitting tx-meta LCMs over a callback fn. It acts as a closure that encapsulates a private instance of BufferedStorageBackend, avoiding any unintended side effects.

// The returned channel will either receive one error, or it will be closed
// when publishing finishes for a bounded range.
// If the range is unbounded, the channel is never closed.

func PublishFromBufferedStorageBackend(ctx context.Context,
                                       ledgerRange ledgerbackend.Range,
                                       bufferedConfig ledgerbackend.BufferedStorageBackendConfig,
                                       dataStoreConfig datastore.DataStoreConfig,
                                       callback func(xdr.LedgerCloseMeta) error) chan error

The function will return immediately, creating an async worker goroutine in the background to continue processing (see the caller sketch after the list below).

  • It uses whatever prepared ledger range was configured for the backend.
  • It loops, invoking the internal LedgerBackend.GetLedger to iterate over each LedgerCloseMeta retrieved for the given prepared ledger range.
  • It invokes the callback func for each LedgerCloseMeta.
  • If the range is unbounded, it runs indefinitely until ctx is canceled.
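
A minimal caller sketch of the intended usage (the range constructor, config values, and consumption pattern shown here are assumptions, not part of the proposal):

// assumes standard imports plus the stellar/go ledgerbackend and xdr packages
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

errCh := PublishFromBufferedStorageBackend(ctx,
    ledgerbackend.BoundedRange(1000, 2000), // hypothetical bounded range to stream
    bufferedConfig,
    dataStoreConfig,
    func(lcm xdr.LedgerCloseMeta) error {
        fmt.Printf("got ledger %d\n", lcm.LedgerSequence())
        return nil
    })

// for a bounded range the channel closes when publishing finishes;
// at most one error is sent before it closes
if err, ok := <-errCh; ok && err != nil {
    log.Fatal(err)
}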

Visualization of where the producer function fits in the larger CDP design for data transformation pipeline:

[image: dual_func_general — diagram of the producer function within the CDP data transformation pipeline]

Relates to:

What alternatives are there?

New streaming ingestion app use cases would have to implement the same functionality locally.

@tamirms
Contributor

tamirms commented Aug 9, 2024

I think there are some subtleties to this interface change that we should consider:

  • If captive-core terminates unexpectedly, this interface does not allow us to communicate that error to the caller
  • What happens if you call Publish() and, while that goroutine is running, you call GetLedger() concurrently?
  • What happens if you call Publish() multiple times before the goroutine finishes running?
  • What happens if you call Publish() and then call PrepareRange() before the goroutine finishes running?

Given these potential issues, I think for the MVP we should avoid changing the LedgerBackend interface. In the future, as we see more uses of the ingestion library we can come up with some helper functions which will reduce boilerplate.

@sreuland
Contributor Author

  • If captive-core terminates unexpectedly, this interface does not allow us to communicate that error to the caller

Publishing can return a channel to propagate completion status to the caller: if an error occurs, it is sent on the channel, which is then closed; if publishing finishes the requested range with no errors, the channel is simply closed.
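
A minimal sketch of that channel contract, assuming a bounded range given as plain start/end sequences (the helper name and parameters here are hypothetical, not the proposed API):

// assumes: context, github.com/stellar/go/ingest/ledgerbackend, github.com/stellar/go/xdr
func publish(ctx context.Context, backend ledgerbackend.LedgerBackend,
    startSeq, endSeq uint32,
    callback func(xdr.LedgerCloseMeta) error) chan error {

    errCh := make(chan error, 1) // buffered so the worker never blocks on send
    go func() {
        defer close(errCh) // always closed when publishing ends
        for seq := startSeq; seq <= endSeq; seq++ {
            lcm, err := backend.GetLedger(ctx, seq)
            if err != nil {
                errCh <- err // at most one error is sent, then the channel closes
                return
            }
            if err := callback(lcm); err != nil {
                errCh <- err
                return
            }
        }
        // bounded range finished: channel closes with no error sent
    }()
    return errCh
}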

  • What happens if you call Publish() and, while that goroutine is running, you call GetLedger() concurrently?
  • What happens if you call Publish() multiple times before the goroutine finishes running?
  • What happens if you call Publish() and then call PrepareRange() before the goroutine finishes running?

Yes. To avoid these re-entrancy problems with the LedgerBackend instance needed to drive publishing, I think we can skip adding the notion of publishing onto LedgerBackend and instead go with functional closures that encapsulate a private instance of LedgerBackend for publishing concerns. Internally the closures can iterate on GetLedger(), avoiding any side effects and edge cases related to the underlying backend. The net change for the new proposal would be:

// this will create a private instance of CaptiveStellarCore using NewCaptive()
func PublishFromCaptiveCore(ctx context.Context,
                            ledgerRange ledgerbackend.Range,
                            captiveCoreConfig ledgerbackend.CaptiveCoreConfig,
                            callback func(xdr.LedgerCloseMeta) error) chan error

// this will create a private instance of BufferedStorageBackend using NewBufferedStorageBackend()
func PublishFromBufferedStorageBackend(ctx context.Context,
                                       ledgerRange ledgerbackend.Range,
                                       bufferedConfig ledgerbackend.BufferedStorageBackendConfig,
                                       dataStoreConfig datastore.DataStoreConfig,
                                       callback func(xdr.LedgerCloseMeta) error) chan error
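
As a hypothetical illustration of how one of these producers could feed a pipeline, the callback can bridge LCMs into a channel consumed by a downstream transform (the channel size and shutdown handling here are assumptions):

lcmCh := make(chan xdr.LedgerCloseMeta, 16) // hand-off buffer to the downstream stage

pubErr := PublishFromBufferedStorageBackend(ctx, ledgerRange, bufferedConfig, dataStoreConfig,
    func(lcm xdr.LedgerCloseMeta) error {
        select {
        case lcmCh <- lcm: // forward tx-meta into the pipeline
            return nil
        case <-ctx.Done():
            return ctx.Err()
        }
    })

go func() {
    err := <-pubErr // nil when the channel closed after a finished bounded range
    close(lcmCh)    // signal downstream that no more LCMs are coming
    if err != nil {
        log.Println("publishing stopped:", err)
    }
}()

for lcm := range lcmCh {
    // downstream processor/transform consumes tx-meta here
    _ = lcm
}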

Given these potential issues, I think for the MVP we should avoid changing the LedgerBackend interface. In the future, as we see more uses of the ingestion library we can come up with some helper functions which will reduce boilerplate.

I think if we can provide this SDK mechanism up front for automating the streaming of ledger tx-meta, it will be worthwhile for demonstrating the DX during the MVP timeframe, as it lowers the resistance for app development to adopt the CDP approach of transforming network data into derived models in a pipeline. Apps avoid investing in boilerplate (ledgerbackend, GetLedger iteration, etc.) and get a Stellar tx-meta 'source of origin' operator (publisher) to use in their pipeline out of the box.

@sreuland sreuland added this to the platform sprint 50 milestone Aug 30, 2024
@sreuland sreuland changed the title ingest/pipeline: Update LedgerBackend to support functional pipeline producer ingest/pipeline: Create functional producer for BufferedStorageBackend Sep 3, 2024
@sreuland sreuland self-assigned this Sep 6, 2024
@sreuland
Contributor Author

sreuland commented Sep 16, 2024

@chowbao , @tamirms @urvisavla

Are there any known BufferedStorage settings, based on benchmarks, that we feel good about providing as default constants in the SDK? I.e., clients could use them as a sanity check/reference to get moving quickly at first, tuning later if they need to:

buffer_size = ?  
num_workers = ?      
retry_limit = ?      
retry_wait = "30s"

This could be encapsulated functionally in the SDK as func DefaultBufferedStorageBackendConfig() *BufferedStorageBackendConfig.

@urvisavla
Contributor

urvisavla commented Sep 16, 2024

@chowbao , @tamirms @urvisavla

Are there any known BufferedStorage settings, based on benchmarks, that we feel good about providing as default constants in the SDK? I.e., clients could use them as a sanity check/reference to get moving quickly at first, tuning later if they need to:

buffer_size = ?  
num_workers = ?      
retry_limit = ?      
retry_wait = "30s"

Here's a summary of the recommended configuration for buffer size and number of workers based on my analysis:

For a small number of ledgers_per_file (1 LedgersPerFile):
buffer_size: 100 to 500 and num_workers: 5

For a medium number of ledgers_per_file (100 LedgersPerFile):
buffer_size: 10 and num_workers: 5

For a large number of ledgers_per_file (1000 LedgersPerFile):
buffer_size: 10 and num_workers: 1 to 2

You can find the detailed numbers and results here. One thing to note is that these tests were run on my local machine, so actual times may vary depending on hardware, but the relative config recommendations should remain the same.

As for retry_limit and retry_wait, these values aren't dependent on the other parameters, so IMO reasonable values of retry_limit=3 to 5 and retry_wait=30s should be good.
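
For illustration, a default constructor reflecting these recommendations could look like the sketch below (taking the 100-ledgers-per-file case; the struct field names are assumed to mirror the TOML keys above):

// assumes: time and the ledgerbackend package's BufferedStorageBackendConfig
func DefaultBufferedStorageBackendConfig() *ledgerbackend.BufferedStorageBackendConfig {
    return &ledgerbackend.BufferedStorageBackendConfig{
        BufferSize: 10,              // medium ledgers_per_file recommendation above
        NumWorkers: 5,
        RetryLimit: 3,
        RetryWait:  30 * time.Second,
    }
}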

Let me know if you need any additional info. Thanks!
