I am using kafka-python in AWS Lambda to produce messages to a Kafka cluster. My Lambda function runs every 5 minutes, and only for a few milliseconds at a time. The execution environment, including any background threads, is paused between invocations. AWS recommends keeping database connections and similar resources open between invocations so that they don't have to be re-established every time. See https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtime-environment.html#runtimes-lifecycle-shutdown
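The reuse pattern AWS describes (create the client once per execution environment, then reuse it in later warm invocations) might look roughly like this. It is a minimal sketch under assumptions: `make_producer` is a hypothetical factory standing in for something like `kafka.KafkaProducer(bootstrap_servers=...)`, and it returns a plain object here so the snippet runs without a broker.

```python
from typing import Any, Callable, Optional


def make_producer() -> Any:
    # Hypothetical factory: a real function would build a
    # kafka.KafkaProducer(...) here. A plain object keeps the sketch runnable.
    return object()


# Module-level state survives between warm invocations of the same
# execution environment and is only reset on a cold start.
_producer: Optional[Any] = None


def get_producer(factory: Callable[[], Any] = make_producer) -> Any:
    """Create the producer on first use and reuse it afterwards."""
    global _producer
    if _producer is None:
        _producer = factory()
    return _producer


def handler(event: dict, context: Any) -> dict:
    # Every warm invocation gets the same producer object (and its sockets).
    producer = get_producer()
    return {"producer_id": id(producer)}
```

The catch, and the reason for this issue, is that the reused producer's background thread is frozen together with everything else between invocations.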
kafka-python currently doesn't handle this kind of execution environment well. If establishing a connection spans several Lambda invocations, the attempt times out in conn.py's connect() (https://github.com/dpkp/kafka-python/blob/master/kafka/conn.py#L359) unless it reaches CONNECTED within the timeout, even if the connection state has been steadily progressing towards CONNECTED. For all practical purposes, this function is called in a loop until the connection is either established or fails.
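To make the failure mode concrete, here is a simplified model of the timeout check described above. It is not kafka-python's actual code: the state list, the one-step-per-call behaviour and the 300 s spacing between calls are illustrative assumptions. The key point it models is that the deadline is measured from the start of the attempt, so wall-clock time spent frozen between invocations counts against it.

```python
from dataclasses import dataclass

# Simplified model, NOT kafka-python's code. Each connect() call advances
# the connection by at most one state.
STATES = ["CONNECTING", "HANDSHAKE", "AUTHENTICATING", "CONNECTED"]


@dataclass
class ModelConn:
    request_timeout_ms: int
    step: int = 0
    last_attempt: float = 0.0  # seconds; set once, when the attempt starts

    def start(self, now: float) -> None:
        self.step = 0
        self.last_attempt = now

    def connect(self, now: float) -> str:
        if STATES[self.step] == "CONNECTED":
            return "CONNECTED"
        # Deadline measured from the start of the attempt: time the
        # environment spends frozen between invocations counts against it.
        if now > self.last_attempt + self.request_timeout_ms / 1000:
            return "DISCONNECTED"  # timed out
        self.step += 1
        return STATES[self.step]


# One connect() call per invocation, invocations 5 minutes (300 s) apart.
conn = ModelConn(request_timeout_ms=30_000)
conn.start(now=0.0)
print(conn.connect(now=0.0))    # HANDSHAKE: progress in the first invocation
print(conn.connect(now=300.0))  # DISCONNECTED: 300 s > 30 s since the start
```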
The naive fix is to increase request_timeout_ms. However, this setting is shared with other features, such as sending messages. In Lambda, it can be useful to let kafka-python spend a long time connecting while still failing quickly when sending, so that a broken connection is detected early. We flush the kafka-python producer at the end of every Lambda invocation using a decorator, which should force the library to try to establish the connection and then send any pending messages.
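A flush-at-the-end decorator of the kind mentioned above could be sketched as follows. This is an illustration, not the author's code: `flush_after` is a hypothetical name, and the producer is only assumed to expose `flush(timeout=...)`, which `kafka.KafkaProducer` does. `FakeProducer` is a stand-in so the example runs without a broker.

```python
import functools
from typing import Any, Callable, List, Optional, TypeVar

F = TypeVar("F", bound=Callable[..., Any])


def flush_after(producer: Any, timeout: Optional[float] = None) -> Callable[[F], F]:
    """Call producer.flush(timeout=...) after the wrapped handler finishes."""
    def decorator(func: F) -> F:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            try:
                return func(*args, **kwargs)
            finally:
                # Runs on success and on error: gives the background I/O
                # thread a chance to connect and send before the execution
                # environment is frozen again. If flush itself raises, that
                # exception replaces any exception from the handler.
                producer.flush(timeout=timeout)
        return wrapper  # type: ignore[return-value]
    return decorator


class FakeProducer:
    """Stand-in for kafka.KafkaProducer that records flush calls."""

    def __init__(self) -> None:
        self.flush_calls: List[Optional[float]] = []

    def flush(self, timeout: Optional[float] = None) -> None:
        self.flush_calls.append(timeout)


producer = FakeProducer()


@flush_after(producer, timeout=10.0)
def handler(event: dict, context: Any) -> str:
    return "ok"
```

Calling `handler({}, None)` returns `"ok"` and records one flush with a 10 s timeout.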
Suggested fix
I suggest two fixes that together help minimize the problem:
Add a separate configuration option, connection_timeout_ms, that optionally sets the timeout for establishing connections. If it is not set, it defaults to request_timeout_ms to preserve backwards compatibility. This lets us be extra lenient when establishing connections while running in Lambda.
Update self.last_attempt several times during the connection phase, so that a connection that takes a long time to establish is tolerated as long as it keeps making progress. The variable would probably need to be renamed to reflect its new meaning.
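The two proposed fixes can be sketched on a simplified connection model. This is not the author's patch or kafka-python's code: the state list is illustrative, `connection_timeout_ms` is the option proposed above, and `last_progress` is a hypothetical rename of `last_attempt` that is refreshed on every state transition.

```python
from dataclasses import dataclass
from typing import Optional

# Simplified model, NOT kafka-python's code.
STATES = ["CONNECTING", "HANDSHAKE", "AUTHENTICATING", "CONNECTED"]


@dataclass
class ProgressTolerantConn:
    request_timeout_ms: int
    connection_timeout_ms: Optional[int] = None  # fix 1: separate timeout
    step: int = 0
    last_progress: float = 0.0  # hypothetical rename of last_attempt

    def __post_init__(self) -> None:
        # Backwards compatible: fall back to request_timeout_ms when unset.
        if self.connection_timeout_ms is None:
            self.connection_timeout_ms = self.request_timeout_ms

    def start(self, now: float) -> None:
        self.step = 0
        self.last_progress = now

    def connect(self, now: float) -> str:
        if STATES[self.step] == "CONNECTED":
            return "CONNECTED"
        if now > self.last_progress + self.connection_timeout_ms / 1000:
            return "DISCONNECTED"  # no progress within the timeout
        self.step += 1
        # Fix 2: each transition restarts the clock, so slow but steady
        # progress across invocations is tolerated.
        self.last_progress = now
        return STATES[self.step]


# Invocations 5 minutes apart: sends can still fail fast with a 5 s
# request_timeout_ms, while each connection step gets up to 10 minutes.
conn = ProgressTolerantConn(request_timeout_ms=5_000, connection_timeout_ms=600_000)
conn.start(now=0.0)
print([conn.connect(now=300.0 * i) for i in range(4)])
# ['HANDSHAKE', 'AUTHENTICATING', 'CONNECTED', 'CONNECTED']
```

Leaving `connection_timeout_ms` unset keeps the old behaviour: with only a 5 s `request_timeout_ms`, the second invocation would time out.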
These two fixes have been implemented and tested with my lambdas and solve the issue for me. I can create a PR with them, or discuss other solutions.
Thanks!
Liam
In addition, the Kafka side closes the connection after 10 minutes of inactivity (controlled by connections.max.idle.ms). This is not a big deal, because the connection can be re-established, but it creates noise in our logs. The naive solution is to increase that timeout (which increases the chance that one of the Lambda invocations gives the Kafka thread enough time to send some packets), but is there a better option? A wait_for_stable_connections function sounds nice, but would probably be problematic during a Kafka outage. Or should errors for connections closed by the other side be suppressed?
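For the last option, one client-side approach that needs no library change is a standard `logging.Filter` on kafka-python's connection logger. This is a sketch under assumptions: it assumes the messages are emitted by the `kafka.conn` logger (kafka-python names its loggers after its modules), and the regular expression in `IDLE_CLOSE_PATTERNS` is a hypothetical placeholder. It should be narrowed to the exact wording seen in your own logs, because a broad pattern could also hide genuine connection failures.

```python
import logging
import re
from typing import Iterable


class IdleCloseFilter(logging.Filter):
    """Drop log records whose message matches a known idle-disconnect pattern.

    Records that do not match any pattern pass through unchanged.
    """

    def __init__(self, patterns: Iterable[str]) -> None:
        super().__init__()
        self._patterns = [re.compile(p) for p in patterns]

    def filter(self, record: logging.LogRecord) -> bool:
        message = record.getMessage()  # message with %-args applied
        return not any(p.search(message) for p in self._patterns)


# Hypothetical wording: replace with the message your logs actually show.
IDLE_CLOSE_PATTERNS = [r"(?i)connection.*closed"]

# Assumption: kafka-python logs connection events via the "kafka.conn" logger.
logging.getLogger("kafka.conn").addFilter(IdleCloseFilter(IDLE_CLOSE_PATTERNS))
```

A logger-level filter only sees records logged directly on that logger, so other kafka-python modules are unaffected.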