Skip to content

Commit

Permalink
Add missed out group_instance_id as subscriber and router parameter (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
kumaranvpl authored Aug 20, 2024
1 parent 9f22ff6 commit 10d589d
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 1 deletion.
53 changes: 53 additions & 0 deletions faststream/confluent/broker/registrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,19 @@ def subscriber(
"""
),
] = None,
group_instance_id: Annotated[
Optional[str],
Doc(
"""
A unique string that identifies the consumer instance.
If set, the consumer is treated as a static member of the group
and does not participate in consumer group management (e.g.
partition assignment, rebalances). This can be used to assign
partitions to specific consumers, rather than letting the group
assign partitions based on consumer metadata.
"""
),
] = None,
fetch_max_wait_ms: Annotated[
int,
Doc(
Expand Down Expand Up @@ -346,6 +359,19 @@ def subscriber(
"""
),
] = None,
group_instance_id: Annotated[
Optional[str],
Doc(
"""
A unique string that identifies the consumer instance.
If set, the consumer is treated as a static member of the group
and does not participate in consumer group management (e.g.
partition assignment, rebalances). This can be used to assign
partitions to specific consumers, rather than letting the group
assign partitions based on consumer metadata.
"""
),
] = None,
fetch_max_wait_ms: Annotated[
int,
Doc(
Expand Down Expand Up @@ -615,6 +641,19 @@ def subscriber(
"""
),
] = None,
group_instance_id: Annotated[
Optional[str],
Doc(
"""
A unique string that identifies the consumer instance.
If set, the consumer is treated as a static member of the group
and does not participate in consumer group management (e.g.
partition assignment, rebalances). This can be used to assign
partitions to specific consumers, rather than letting the group
assign partitions based on consumer metadata.
"""
),
] = None,
fetch_max_wait_ms: Annotated[
int,
Doc(
Expand Down Expand Up @@ -887,6 +926,19 @@ def subscriber(
"""
),
] = None,
group_instance_id: Annotated[
Optional[str],
Doc(
"""
A unique string that identifies the consumer instance.
If set, the consumer is treated as a static member of the group
and does not participate in consumer group management (e.g.
partition assignment, rebalances). This can be used to assign
partitions to specific consumers, rather than letting the group
assign partitions based on consumer metadata.
"""
),
] = None,
fetch_max_wait_ms: Annotated[
int,
Doc(
Expand Down Expand Up @@ -1150,6 +1202,7 @@ def subscriber(
max_records=max_records,
group_id=group_id,
connection_data={
"group_instance_id": group_instance_id,
"fetch_max_wait_ms": fetch_max_wait_ms,
"fetch_max_bytes": fetch_max_bytes,
"fetch_min_bytes": fetch_min_bytes,
Expand Down
5 changes: 4 additions & 1 deletion faststream/confluent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,10 @@ def __init__(
self.config: Dict[str, Any] = {} if config is None else dict(config)

if group_id is None:
group_id = "faststream-consumer-group"
group_id = self.config.get("group.id", "faststream-consumer-group")

if group_instance_id is None:
group_instance_id = self.config.get("group.instance.id", None)

if isinstance(bootstrap_servers, Iterable) and not isinstance(
bootstrap_servers, str
Expand Down
53 changes: 53 additions & 0 deletions faststream/confluent/fastapi/fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,19 @@ def subscriber(
"""
),
] = None,
group_instance_id: Annotated[
Optional[str],
Doc(
"""
A unique string that identifies the consumer instance.
If set, the consumer is treated as a static member of the group
and does not participate in consumer group management (e.g.
partition assignment, rebalances). This can be used to assign
partitions to specific consumers, rather than letting the group
assign partitions based on consumer metadata.
"""
),
] = None,
fetch_max_wait_ms: Annotated[
int,
Doc(
Expand Down Expand Up @@ -1010,6 +1023,19 @@ def subscriber(
"""
),
] = None,
group_instance_id: Annotated[
Optional[str],
Doc(
"""
A unique string that identifies the consumer instance.
If set, the consumer is treated as a static member of the group
and does not participate in consumer group management (e.g.
partition assignment, rebalances). This can be used to assign
partitions to specific consumers, rather than letting the group
assign partitions based on consumer metadata.
"""
),
] = None,
fetch_max_wait_ms: Annotated[
int,
Doc(
Expand Down Expand Up @@ -1388,6 +1414,19 @@ def subscriber(
"""
),
] = None,
group_instance_id: Annotated[
Optional[str],
Doc(
"""
A unique string that identifies the consumer instance.
If set, the consumer is treated as a static member of the group
and does not participate in consumer group management (e.g.
partition assignment, rebalances). This can be used to assign
partitions to specific consumers, rather than letting the group
assign partitions based on consumer metadata.
"""
),
] = None,
fetch_max_wait_ms: Annotated[
int,
Doc(
Expand Down Expand Up @@ -1783,6 +1822,19 @@ def subscriber(
"""
),
] = None,
group_instance_id: Annotated[
Optional[str],
Doc(
"""
A unique string that identifies the consumer instance.
If set, the consumer is treated as a static member of the group
and does not participate in consumer group management (e.g.
partition assignment, rebalances). This can be used to assign
partitions to specific consumers, rather than letting the group
assign partitions based on consumer metadata.
"""
),
] = None,
fetch_max_wait_ms: Annotated[
int,
Doc(
Expand Down Expand Up @@ -2166,6 +2218,7 @@ def subscriber(
polling_interval=polling_interval,
partitions=partitions,
group_id=group_id,
group_instance_id=group_instance_id,
fetch_max_wait_ms=fetch_max_wait_ms,
fetch_max_bytes=fetch_max_bytes,
fetch_min_bytes=fetch_min_bytes,
Expand Down
14 changes: 14 additions & 0 deletions faststream/confluent/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,19 @@ def __init__(
"""
),
] = None,
group_instance_id: Annotated[
Optional[str],
Doc(
"""
A unique string that identifies the consumer instance.
If set, the consumer is treated as a static member of the group
and does not participate in consumer group management (e.g.
partition assignment, rebalances). This can be used to assign
partitions to specific consumers, rather than letting the group
assign partitions based on consumer metadata.
"""
),
] = None,
fetch_max_wait_ms: Annotated[
int,
Doc(
Expand Down Expand Up @@ -417,6 +430,7 @@ def __init__(
partitions=partitions,
polling_interval=polling_interval,
group_id=group_id,
group_instance_id=group_instance_id,
fetch_max_wait_ms=fetch_max_wait_ms,
fetch_max_bytes=fetch_max_bytes,
fetch_min_bytes=fetch_min_bytes,
Expand Down

0 comments on commit 10d589d

Please sign in to comment.