fix(ingestion): fix stateful ingestion for GCS source #11879
base: master
Conversation
@@ -88,7 +89,10 @@ def __init__(self, config: GCSSourceConfig, ctx: PipelineContext):
        super().__init__(config, ctx)
        self.config = config
        self.report = GCSSourceReport()
        self.s3_source = self.create_equivalent_s3_source(ctx)
+       self.platform: str = "gcs"
I think this constant is available:

```suggestion
-        self.platform: str = "gcs"
+        self.platform: str = PLATFORM_GCS
```
Changed, thank you!
        s3_ctx = copy.deepcopy(ctx)
        s3_ctx.pipeline_name = None
You may move these two lines inside `create_equivalent_s3_source`. Also, why is `s3_ctx.pipeline_name = None` required? Note that `pipeline_name` is required when `stateful_ingestion` is enabled.
Thanks a lot for your feedback!

Without this, running stateful ingestion leads to `IndexError: Checkpointing provider DatahubIngestionCheckpointingProvider already registered.` This is because `StatefulIngestionSourceBase` is initialized twice: first for `GCSSource`, and then in `create_equivalent_s3_source` with `S3Source`. This can also be verified by the change I made in the unit test.

I think stateful ingestion was always meant to be switched off for the equivalent S3 source object, because in `create_equivalent_s3_config`, `DataLakeSourceConfig` is initialized without `stateful_ingestion`, which implicitly means `stateful_ingestion=None`. Since `pipeline_name` is set, the logic in `StateProviderWrapper` leads to stateful ingestion being switched on, which causes the error.

I have moved the lines into the function, as suggested.
Remove pipeline name before passing context to equivalent s3 source to avoid the error "Checkpointing provider DatahubIngestionCheckpointingProvider already registered." Fixes this issue
Checklist