feat(ingest): additional limits on ingestProposalBatch
hsheth2 committed Dec 13, 2024
1 parent 50a7560 commit 6d70c4b
Showing 2 changed files with 29 additions and 5 deletions.
21 changes: 17 additions & 4 deletions metadata-ingestion/src/datahub/emitter/rest_emitter.py
@@ -46,8 +46,18 @@
     os.getenv("DATAHUB_REST_EMITTER_DEFAULT_RETRY_MAX_TIMES", "4")
 )

-# The limit is 16mb. We will use a max of 15mb to have some space for overhead.
-_MAX_BATCH_INGEST_PAYLOAD_SIZE = 15 * 1024 * 1024
+# The limit is 16mb. We will use a max of 15mb to have some space
+# for overhead like request headers.
+# This applies to pretty much all calls to GMS.
+INGEST_MAX_PAYLOAD_BYTES = 15 * 1024 * 1024
+
+# This limit is somewhat arbitrary. All GMS endpoints will timeout
+# and return a 500 if processing takes too long. To avoid sending
+# too much to the backend and hitting a timeout, we try to limit
+# the number of MCPs we send in a batch.
+BATCH_INGEST_MAX_PAYLOAD_LENGTH = int(
+    os.getenv("DATAHUB_REST_EMITTER_BATCH_MAX_PAYLOAD_LENGTH", 200)
+)


 class DataHubRestEmitter(Closeable, Emitter):
@@ -290,11 +300,14 @@ def emit_mcps(
         # As a safety mechanism, we need to make sure we don't exceed the max payload size for GMS.
         # If we will exceed the limit, we need to break it up into chunks.
         mcp_obj_chunks: List[List[str]] = []
-        current_chunk_size = _MAX_BATCH_INGEST_PAYLOAD_SIZE
+        current_chunk_size = INGEST_MAX_PAYLOAD_BYTES
         for mcp_obj in mcp_objs:
             mcp_obj_size = len(json.dumps(mcp_obj))

-            if mcp_obj_size + current_chunk_size > _MAX_BATCH_INGEST_PAYLOAD_SIZE:
+            if (
+                mcp_obj_size + current_chunk_size > INGEST_MAX_PAYLOAD_BYTES
+                or len(mcp_obj_chunks[-1]) >= BATCH_INGEST_MAX_PAYLOAD_LENGTH
+            ):
                 mcp_obj_chunks.append([])
                 current_chunk_size = 0
             mcp_obj_chunks[-1].append(mcp_obj)
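As a rough illustration of the chunking behavior in the hunk above, here is a minimal standalone sketch (not the emitter's actual code; the helper name chunk_mcp_objs is made up). It groups serialized MCP objects so each chunk stays under the byte budget (INGEST_MAX_PAYLOAD_BYTES) and the new per-batch item cap (BATCH_INGEST_MAX_PAYLOAD_LENGTH); for simplicity it starts with one empty chunk rather than the sentinel initialization used in emit_mcps.

import json
from typing import Any, Dict, List

INGEST_MAX_PAYLOAD_BYTES = 15 * 1024 * 1024
BATCH_INGEST_MAX_PAYLOAD_LENGTH = 200


def chunk_mcp_objs(mcp_objs: List[Dict[str, Any]]) -> List[List[Dict[str, Any]]]:
    # Each chunk stays under the payload byte budget and holds at most
    # BATCH_INGEST_MAX_PAYLOAD_LENGTH items.
    chunks: List[List[Dict[str, Any]]] = [[]]
    current_chunk_size = 0
    for mcp_obj in mcp_objs:
        mcp_obj_size = len(json.dumps(mcp_obj))
        if (
            current_chunk_size + mcp_obj_size > INGEST_MAX_PAYLOAD_BYTES
            or len(chunks[-1]) >= BATCH_INGEST_MAX_PAYLOAD_LENGTH
        ):
            chunks.append([])
            current_chunk_size = 0
        chunks[-1].append(mcp_obj)
        current_chunk_size += mcp_obj_size
    return chunks


# Example: 450 small objects are split by the item cap alone.
print([len(c) for c in chunk_mcp_objs([{"n": i} for i in range(450)])])  # [200, 200, 50]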
13 changes: 12 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
@@ -18,7 +18,10 @@
 )
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.emitter.mcp_builder import mcps_from_mce
-from datahub.emitter.rest_emitter import DataHubRestEmitter
+from datahub.emitter.rest_emitter import (
+    BATCH_INGEST_MAX_PAYLOAD_LENGTH,
+    DataHubRestEmitter,
+)
 from datahub.ingestion.api.common import RecordEnvelope, WorkUnit
 from datahub.ingestion.api.sink import (
     NoopWriteCallback,
@@ -71,6 +74,14 @@ class DatahubRestSinkConfig(DatahubClientConfig):
     # Only applies in async batch mode.
     max_per_batch: pydantic.PositiveInt = 100

+    @pydantic.validator("max_per_batch", always=True)
+    def validate_max_per_batch(cls, v):
+        if v > BATCH_INGEST_MAX_PAYLOAD_LENGTH:
+            raise ValueError(
+                f"max_per_batch must be less than or equal to {BATCH_INGEST_MAX_PAYLOAD_LENGTH}"
+            )
+        return v
+

 @dataclasses.dataclass
 class DataHubRestSinkReport(SinkReport):
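To see what the new validator does at config-parse time, here is a self-contained sketch using pydantic v1-style validators. It mirrors the change above rather than importing DataHub itself; ExampleSinkConfig and the hard-coded limit are stand-ins, not the sink's real class.

import pydantic

# Stand-in for the constant imported from datahub.emitter.rest_emitter.
BATCH_INGEST_MAX_PAYLOAD_LENGTH = 200


class ExampleSinkConfig(pydantic.BaseModel):
    # Mirrors DatahubRestSinkConfig.max_per_batch; only applies in async batch mode.
    max_per_batch: pydantic.PositiveInt = 100

    @pydantic.validator("max_per_batch", always=True)
    def validate_max_per_batch(cls, v):
        if v > BATCH_INGEST_MAX_PAYLOAD_LENGTH:
            raise ValueError(
                f"max_per_batch must be less than or equal to {BATCH_INGEST_MAX_PAYLOAD_LENGTH}"
            )
        return v


print(ExampleSinkConfig(max_per_batch=150).max_per_batch)  # accepted: 150

try:
    ExampleSinkConfig(max_per_batch=500)
except pydantic.ValidationError as e:
    print("rejected:", e.errors()[0]["msg"])  # over the batch length cap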
