Getting ValueError: Invalid file object: None in check_version when connecting #2450

Open
dgoldenberg-ias opened this issue Oct 31, 2024 · 2 comments

Environment

kafka-python = "^2.0.2"
python = Python 3.10.15

We're using AWS MSK - Kafka version 3.5.1.

Code

import traceback

from kafka import KafkaAdminClient

BOOTSTRAP_SERVERS = (
    "srv1.us-east-1.amazonaws.com:9092"
    ",srv2.us-east-1.amazonaws.com:9092"
    ",srv3.us-east-1.amazonaws.com:9092"
)

def run_util() -> None:
    admin_client = None
    try:
        # Create an admin client
        print(f">> Connecting to: '{BOOTSTRAP_SERVERS}'...")
        # api_version=(3, 5, 1)
        admin_client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS)

        # Retrieve and print the list of topics
        topics = admin_client.list_topics()
        print("Kafka Topics:")
        for topic in topics:
            print(f"- {topic}")

    except Exception as e:
        print(f"An error occurred while creating the admin client: {e}")
        traceback.print_exc()
    finally:
        # Close the admin client if it was created
        if admin_client:
            try:
                admin_client.close()
            except Exception as close_e:
                print(f"Error while closing the admin client: {close_e}")
                traceback.print_exc()

if __name__ == "__main__":
    run_util()

Observations

When I run this code locally, I get the error below. Curiously, I do not get the error when running it from a Databricks notebook, where the Python version is 3.10.12.

Error stack trace

An error occurred while creating the admin client: Invalid file object: None
Traceback (most recent call last):
  File "/msk_proj/with_kafka_python/msk_kp_util_list_topics.py", line 49, in run_util
    admin_client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS)
  File "/.venv310/lib/python3.10/site-packages/kafka/admin/client.py", line 208, in __init__
    self._client = KafkaClient(metrics=self._metrics,
  File "/.venv310/lib/python3.10/site-packages/kafka/client_async.py", line 244, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/.venv310/lib/python3.10/site-packages/kafka/client_async.py", line 909, in check_version
    version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
  File "/.venv310/lib/python3.10/site-packages/kafka/conn.py", line 1254, in check_version
    selector.register(self._sock, selectors.EVENT_READ)
  File "/opt/homebrew/Cellar/python@3.10/3.10.15/Frameworks/Python.framework/Versions/3.10/lib/python3.10/selectors.py", line 518, in register
    key = super().register(fileobj, events, data)
  File "/opt/homebrew/Cellar/python@3.10/3.10.15/Frameworks/Python.framework/Versions/3.10/lib/python3.10/selectors.py", line 239, in register
    key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)
  File "/opt/homebrew/Cellar/python@3.10/3.10.15/Frameworks/Python.framework/Versions/3.10/lib/python3.10/selectors.py", line 226, in _fileobj_lookup
    return _fileobj_to_fd(fileobj)
  File "/opt/homebrew/Cellar/python@3.10/3.10.15/Frameworks/Python.framework/Versions/3.10/lib/python3.10/selectors.py", line 39, in _fileobj_to_fd
    raise ValueError("Invalid file object: "
ValueError: Invalid file object: None

Specifying the Kafka API version explicitly

I also tried passing api_version to the constructor:

admin_client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS, api_version=(3, 5, 1))

Same error.

Network?

Seems OK.

telnet srv1.us-east-1.amazonaws.com 9092
Trying 10.34.11.23...
Connected to srv1.amazonaws.com.
Escape character is '^]'.

Basic socket operations

I used the code below to verify that a basic socket connection to the broker works.

import socket

try:
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(('srv1.us-east-1.amazonaws.com', 9092))
    print("Connection successful!")
except Exception as e:
    print(f"Socket connection failed: {e}")
finally:
    sock.close()
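
A successful connect() only shows that the port is reachable. It does not show whether the peer keeps the connection open afterwards. The sketch below is one way to check that, under the assumption that a Kafka broker sends nothing until it receives a request, so a recv() that returns no bytes means the peer closed the connection. The helper name peer_closed_after_connect and the one-second wait are hypothetical choices for illustration, not part of kafka-python.

```python
import socket


def peer_closed_after_connect(host: str, port: int, wait: float = 1.0) -> bool:
    """Return True if the peer closes the TCP connection within `wait` seconds.

    Hypothetical diagnostic helper. It assumes the peer sends nothing
    unprompted, which is the expected behaviour for a plaintext Kafka listener.
    """
    with socket.create_connection((host, port), timeout=wait) as sock:
        sock.settimeout(wait)
        try:
            data = sock.recv(1)
        except socket.timeout:
            # Nothing arrived and nothing was closed: the connection is still open.
            return False
        except ConnectionResetError:
            # The peer reset the connection.
            return True
        # An empty read means the peer performed an orderly close.
        return data == b""


# Example call (host and port taken from the report above):
# peer_closed_after_connect("srv1.us-east-1.amazonaws.com", 9092)
```

If this returns True for the broker, something between the client and the broker is dropping the connection right after the handshake, which would be consistent with a later write failing.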

The exception occurs here:

selector.register(self._sock, selectors.EVENT_READ)

in the check_version method.

        for version, request in test_cases:
            if not self.connect_blocking(timeout_at - time.time()):
                reset_override_configs()
                raise Errors.NodeNotReadyError()
            f = self.send(request)
            # HACK: sleeping to wait for socket to send bytes
            time.sleep(0.1)
            # when broker receives an unrecognized request API
            # it abruptly closes our socket.
            # so we attempt to send a second request immediately
            # that we believe it will definitely recognize (metadata)
            # the attempt to write to a disconnected socket should
            # immediately fail and allow us to infer that the prior
            # request was unrecognized
            mr = self.send(MetadataRequest[0](topics))

            selector = self.config['selector']()
            selector.register(self._sock, selectors.EVENT_READ)
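
The ValueError can be reproduced without Kafka. Registering None with a selector from the standard library raises the same message as in the stack trace, which suggests that self._sock was already None when selector.register ran (possibly because the connection had been closed after a failed send, though that part is an assumption about kafka-python internals). A minimal sketch, where register_error is a hypothetical helper name:

```python
import selectors


def register_error(fileobj) -> str:
    """Try to register `fileobj` for reads and return the error text, if any."""
    selector = selectors.DefaultSelector()
    try:
        selector.register(fileobj, selectors.EVENT_READ)
    except ValueError as exc:
        return str(exc)
    finally:
        selector.close()
    return ""


# Passing None mimics a connection object whose socket attribute was cleared.
print(register_error(None))  # Invalid file object: None
```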

Any cause or workaround? Thanks.

dgoldenberg-ias commented Oct 31, 2024

Enabled logging and increased ulimit to 1024 locally.
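
For reference, a minimal sketch of how logging can be turned on with the standard logging module. kafka-python creates its loggers under the "kafka" name, so raising that logger to DEBUG shows connection details. The format string is an assumption chosen to resemble the "LEVEL - message" log lines in this thread.

```python
import logging

# Root handler with a simple "LEVEL - message" layout (assumed format).
logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s")

# kafka-python loggers live under the "kafka" namespace (kafka.conn, kafka.client, ...).
logging.getLogger("kafka").setLevel(logging.DEBUG)
```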

@dgoldenberg-ias

What I'm seeing in the log:

INFO - <BrokerConnection node_id=bootstrap-0 host=srv3.us-east-1.amazonaws.com:9092 <connecting> [IPv4 ('XX.XX.X.XXX', 9092)]>: connecting to srv3.us-east-1.amazonaws.com:9092 [('XX.XX.X.XXX', 9092) IPv4]

INFO - Probing node bootstrap-0 broker version
INFO - <BrokerConnection node_id=bootstrap-0 host=srv3.us-east-1.amazonaws.com:9092 <connecting> [IPv4 ('XX.XX.X.XXX', 9092)]>: Connection complete.

ERROR - Error sending request data to <BrokerConnection node_id=bootstrap-0 host=srv3.us-east-1.amazonaws.com:9092 <connected> [IPv4 ('XX.XX.X.XXX', 9092)]>
Traceback (most recent call last):
  File "/msk-proj/.venv310/lib/python3.10/site-packages/kafka/conn.py", line 998, in send_pending_requests
    total_bytes = self._send_bytes_blocking(data)
  File "/msk-proj/.venv310/lib/python3.10/site-packages/kafka/conn.py", line 601, in _send_bytes_blocking
    sent_bytes = self._sock.send(data[total_sent:])
BrokenPipeError: [Errno 32] Broken pipe
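
A BrokenPipeError on send generally means the other end had already closed the connection when the bytes were written. The sketch below reproduces that condition locally with a connected socket pair instead of a broker. It assumes a Unix-like system, where socket.socketpair() returns Unix domain sockets, and send_after_peer_close is a hypothetical helper name.

```python
import socket


def send_after_peer_close() -> str:
    """Write to a socket whose peer is already closed and report the outcome."""
    ours, theirs = socket.socketpair()
    try:
        # Simulate the remote side dropping the connection.
        theirs.close()
        try:
            ours.send(b"request bytes")
        except (BrokenPipeError, ConnectionResetError) as exc:
            return type(exc).__name__
        return "sent"
    finally:
        ours.close()


print(send_after_peer_close())
```

In the log above, the same kind of failure happens on the first write after "Connection complete", which points at the connection being closed by the remote side (or something in between) rather than at the length of the sleep.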

It seems to me the issue comes down to this code in check_version, around the "# HACK: sleeping to wait for socket to send bytes" comment:

            if not self.connect_blocking(timeout_at - time.time()):
                reset_override_configs()
                raise Errors.NodeNotReadyError()
            f = self.send(request)
            # HACK: sleeping to wait for socket to send bytes
            time.sleep(0.1)
            # when broker receives an unrecognized request API
            # it abruptly closes our socket.
            # so we attempt to send a second request immediately
            # that we believe it will definitely recognize (metadata)
            # the attempt to write to a disconnected socket should
            # immediately fail and allow us to infer that the prior
            # request was unrecognized
            mr = self.send(MetadataRequest[0](topics))

Increasing this sleep to 1 or even 10 seconds does not help in my local setup. Any ideas on how to fix this or work around it?

dgoldenberg-ias changed the title from "Getting ValueError: Invalid file object: None" to "Getting ValueError: Invalid file object: None in check_version when connecting" on Nov 1, 2024