diff --git a/metadata-ingestion/src/datahub/emitter/rest_emitter.py b/metadata-ingestion/src/datahub/emitter/rest_emitter.py
index ef2082b95330b..e2bc14925ad38 100644
--- a/metadata-ingestion/src/datahub/emitter/rest_emitter.py
+++ b/metadata-ingestion/src/datahub/emitter/rest_emitter.py
@@ -46,8 +46,18 @@
     os.getenv("DATAHUB_REST_EMITTER_DEFAULT_RETRY_MAX_TIMES", "4")
 )
 
-# The limit is 16mb. We will use a max of 15mb to have some space for overhead.
-_MAX_BATCH_INGEST_PAYLOAD_SIZE = 15 * 1024 * 1024
+# The limit is 16mb. We will use a max of 15mb to have some space
+# for overhead like request headers.
+# This applies to pretty much all calls to GMS.
+INGEST_MAX_PAYLOAD_BYTES = 15 * 1024 * 1024
+
+# This limit is somewhat arbitrary. All GMS endpoints will timeout
+# and return a 500 if processing takes too long. To avoid sending
+# too much to the backend and hitting a timeout, we try to limit
+# the number of MCPs we send in a batch.
+BATCH_INGEST_MAX_PAYLOAD_LENGTH = int(
+    os.getenv("DATAHUB_REST_EMITTER_BATCH_MAX_PAYLOAD_LENGTH", 200)
+)
 
 
 class DataHubRestEmitter(Closeable, Emitter):
@@ -290,11 +300,14 @@ def emit_mcps(
         # As a safety mechanism, we need to make sure we don't exceed the max payload size for GMS.
         # If we will exceed the limit, we need to break it up into chunks.
         mcp_obj_chunks: List[List[str]] = []
-        current_chunk_size = _MAX_BATCH_INGEST_PAYLOAD_SIZE
+        current_chunk_size = INGEST_MAX_PAYLOAD_BYTES
         for mcp_obj in mcp_objs:
             mcp_obj_size = len(json.dumps(mcp_obj))
 
-            if mcp_obj_size + current_chunk_size > _MAX_BATCH_INGEST_PAYLOAD_SIZE:
+            if (
+                mcp_obj_size + current_chunk_size > INGEST_MAX_PAYLOAD_BYTES
+                or len(mcp_obj_chunks[-1]) >= BATCH_INGEST_MAX_PAYLOAD_LENGTH
+            ):
                 mcp_obj_chunks.append([])
                 current_chunk_size = 0
             mcp_obj_chunks[-1].append(mcp_obj)
diff --git a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
index 1bb07ea846227..209efbbb90feb 100644
--- a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
+++ b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
@@ -18,7 +18,10 @@
 )
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.emitter.mcp_builder import mcps_from_mce
-from datahub.emitter.rest_emitter import DataHubRestEmitter
+from datahub.emitter.rest_emitter import (
+    BATCH_INGEST_MAX_PAYLOAD_LENGTH,
+    DataHubRestEmitter,
+)
 from datahub.ingestion.api.common import RecordEnvelope, WorkUnit
 from datahub.ingestion.api.sink import (
     NoopWriteCallback,
@@ -71,6 +74,14 @@ class DatahubRestSinkConfig(DatahubClientConfig):
     # Only applies in async batch mode.
     max_per_batch: pydantic.PositiveInt = 100
 
+    @pydantic.validator("max_per_batch", always=True)
+    def validate_max_per_batch(cls, v):
+        if v > BATCH_INGEST_MAX_PAYLOAD_LENGTH:
+            raise ValueError(
+                f"max_per_batch must be less than or equal to {BATCH_INGEST_MAX_PAYLOAD_LENGTH}"
+            )
+        return v
+
 
 @dataclasses.dataclass
 class DataHubRestSinkReport(SinkReport):
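
To illustrate the chunking behavior this patch introduces, here is a minimal standalone sketch of the batching logic in `emit_mcps`; only the two constants mirror the patch, while the function name `chunk_mcp_objs` and the demo at the bottom are hypothetical. A chunk is closed when adding the next serialized MCP would push it past `INGEST_MAX_PAYLOAD_BYTES`, or when the current chunk already holds `BATCH_INGEST_MAX_PAYLOAD_LENGTH` items. Initializing `current_chunk_size` to the maximum forces the first iteration to open a chunk, and the short-circuiting `or` keeps the `chunks[-1]` access from ever hitting an empty list.

```python
import json
from typing import Any, Dict, List

# Same defaults as in the patch.
INGEST_MAX_PAYLOAD_BYTES = 15 * 1024 * 1024
BATCH_INGEST_MAX_PAYLOAD_LENGTH = 200


def chunk_mcp_objs(mcp_objs: List[Dict[str, Any]]) -> List[List[Dict[str, Any]]]:
    """Split serialized MCPs into chunks bounded by both byte size and count."""
    chunks: List[List[Dict[str, Any]]] = []
    # Start "full" so the first iteration always opens a new chunk, which also
    # keeps the chunks[-1] access below safe for an empty list.
    current_chunk_size = INGEST_MAX_PAYLOAD_BYTES
    for mcp_obj in mcp_objs:
        mcp_obj_size = len(json.dumps(mcp_obj))
        if (
            mcp_obj_size + current_chunk_size > INGEST_MAX_PAYLOAD_BYTES
            or len(chunks[-1]) >= BATCH_INGEST_MAX_PAYLOAD_LENGTH
        ):
            chunks.append([])
            current_chunk_size = 0
        chunks[-1].append(mcp_obj)
        current_chunk_size += mcp_obj_size
    return chunks


if __name__ == "__main__":
    # 450 small objects -> chunks of 200, 200, 50: the count limit dominates
    # long before the 15 MB byte limit is reached.
    objs = [{"entityUrn": f"urn:li:dataset:{i}"} for i in range(450)]
    print([len(c) for c in chunk_mcp_objs(objs)])
```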
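
The sink-side cap can be exercised in isolation. The sketch below assumes pydantic v1-style validators, as used in the patch, and the class name `SinkConfigSketch` is a hypothetical stand-in for `DatahubRestSinkConfig`: a config that sets `max_per_batch` above `BATCH_INGEST_MAX_PAYLOAD_LENGTH` is rejected at construction time, before any batches are emitted.

```python
import pydantic

BATCH_INGEST_MAX_PAYLOAD_LENGTH = 200  # mirrors the emitter-side default


class SinkConfigSketch(pydantic.BaseModel):
    # Hypothetical stand-in for DatahubRestSinkConfig; only the field and
    # validator touched by the patch are reproduced here.
    max_per_batch: pydantic.PositiveInt = 100

    @pydantic.validator("max_per_batch", always=True)
    def validate_max_per_batch(cls, v):
        if v > BATCH_INGEST_MAX_PAYLOAD_LENGTH:
            raise ValueError(
                f"max_per_batch must be less than or equal to {BATCH_INGEST_MAX_PAYLOAD_LENGTH}"
            )
        return v


print(SinkConfigSketch().max_per_batch)  # default of 100 is accepted
try:
    SinkConfigSketch(max_per_batch=500)
except pydantic.ValidationError as e:
    print(e)  # rejected: exceeds BATCH_INGEST_MAX_PAYLOAD_LENGTH
```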