Problem handling Kafka exceptions #54

Open
areeda opened this issue Mar 17, 2022 · 16 comments

Comments

@areeda
Contributor

areeda commented Mar 17, 2022

I am still unsure of the underlying cause, and whether this problem is endemic to all installations, but it is repeatable on my development workstation, an Intel iMac at home.

The symptom is a slew of errors like:

%5|1647498302.003|REQTMOUT|rdkafka#consumer-2| [thrd:sasl_ssl://kb-2.prod.hop.scimma.org:9092/2]: sasl_ssl://kb-2.prod.hop.scimma.org:9092/2: Timed out FetchRequest in flight (after 1799295ms, timeout #0)
<snip>
%4|1647498302.005|REQTMOUT|rdkafka#consumer-2| [thrd:sasl_ssl://kb-0.prod.hop.scimma.org:9092/0]: sasl_ssl://kb-0.prod.hop.scimma.org:9092/0: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1647498302.005|FAIL|rdkafka#consumer-2| [thrd:sasl_ssl://kb-0.prod.hop.scimma.org:9092/0]: sasl_ssl://kb-0.prod.hop.scimma.org:9092/0: 1 request(s) timed out: disconnect (after 115074987ms in state UP)

The exception is raised in /Users/areeda/mambaforge/envs/igwn-py39/lib/python3.9/site-packages/adc/errors.py, line 12.

My problem is that this kills the main thread of our DQR alert listener without any way to catch the exception.
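
For reference, a minimal sketch of the kind of listener loop involved, in hop-client style; the topic URL and the handle_alert() callback are placeholders, and per the report above the failure escapes a handler like this rather than being caught by it:

# Minimal sketch of a hop-client listener loop; topic URL and handle_alert() are placeholders.
# Per the report above, the REQTMOUT/FAIL condition ends up raising from adc/errors.py and
# killing the listener without the application being able to catch it here.
from hop import Stream

stream = Stream(auth=True)
try:
    with stream.open("kafka://kafka.scimma.org/some-alert-topic", "r") as src:
        for message in src:
            handle_alert(message)          # hypothetical application callback
except Exception as exc:
    # This handler is not reached for the errors shown above.
    print(f"listener stopped: {exc}")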

I have a merge request going in next.

@cnweaver
Collaborator

Can you describe a bit further the circumstances under which this happens? I don't think I've come across anyone having quite this problem listening to the Hopskotch brokers before, and we have at least one listener (hopbeat-monitor) that we leave running for quite extended periods. I also do my development on Intel Macs, and while I haven't seen this myself I'd be interested in trying to recreate it to better understand the context of your PR.

@areeda
Contributor Author

areeda commented Mar 18, 2022

Thank you for looking into this. I wonder if my use of multiprocessing has anything to do with it?

The listener is fairly simple, following the outline in the documentation. When an alert is received it is filtered by alert type and FAR. If it passes, a JSON file with the payload is written to disk and a dict is added to a Queue. Multiple worker processes consume that Queue and construct a command line to generate and submit a Condor DAG that creates a Data Quality Report.
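
In outline (all names, paths, and the DAG-submission command below are placeholders, not the real dqralert code):

# Outline of the listener/worker layout described above. Names, the spool path,
# and the DAG-submission command are placeholders, not the real dqralert code.
import json
import multiprocessing
import subprocess

def passes_filter(alert):
    # Placeholder for the alert-type / FAR cut described above.
    return True

def worker(queue):
    while True:
        job = queue.get()
        if job is None:                      # sentinel: shut this worker down
            break
        # Build and submit a Condor DAG for the alert (placeholder command).
        subprocess.run(["dqr-create-dag", job["payload_path"]], check=False)

def on_alert(alert, queue):
    if passes_filter(alert):
        path = f"/data/alerts/{alert['superevent_id']}.json"   # placeholder path
        with open(path, "w") as f:
            json.dump(alert, f)
        queue.put({"payload_path": path})

if __name__ == "__main__":
    queue = multiprocessing.Queue()
    workers = [multiprocessing.Process(target=worker, args=(queue,)) for _ in range(4)]
    for w in workers:
        w.start()
    # ... listener loop calls on_alert(alert, queue) for each received message ...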

What I've been seeing is the Kafka error after somewhere between 5 and 30 hours of running. During that time we process an mdc_superevent every hour.

The setup has been running on ldas-pcdev4 at CIT for 52 hrs without a problem.

Another possibility I just thought of: I have DHCP set up on the Mac with both WiFi and wired connections. The DHCP server assigns the IP for both based on MAC address, so it doesn't change, but I wonder if lease renewal is somehow messing with hop/SASL. I will force a renewal to check.

The code is pre-release but available with LIGO credentials at [email protected]:joseph-areeda/dqralert.git

@areeda
Contributor Author

areeda commented Mar 18, 2022

One more detail: it has been running in the igwn-py39 environment.

@cnweaver
Collaborator

cnweaver commented Mar 18, 2022

Given that librdkafka uses multithreading internally, using multiprocessing does sound a bit dangerous; it might be worthwhile to test with multiprocessing.set_start_method('forkserver'), which should be safer, to try to rule that out. EDIT: However, I would generally expect only the child processes to be in danger of starting in broken states, not the parent, so this probably isn't the cause.
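
For concreteness, a sketch of that suggestion, setting the start method once at startup before any consumers or worker processes are created (main() here is a placeholder for the program's entry point):

# Sketch: choose the 'forkserver' start method once, at program startup,
# before any Kafka consumers or worker processes are created.
import multiprocessing

if __name__ == "__main__":
    multiprocessing.set_start_method("forkserver")
    main()   # placeholder for the entry point that opens the stream and starts workers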

I'm not a member of LIGO, so I can't access any of its internal resources.

@areeda
Contributor Author

areeda commented Mar 18, 2022

Thank you, I'll try it.
I was going to wait for the release, but the code can be put up on GitHub if you want to look at it.

@areeda
Contributor Author

areeda commented Mar 18, 2022

Note to self: changing the start method from spawn to fork interferes with running under the debugger in PyCharm, but not with running on the command line.

Running with the "fork" method (on the command line) produced many more of those Kafka errors. Not exactly proof, but it sure smells like a thread-safety problem.

The good news is that the PR seems to allow the program to recover.

I will redesign my program to remove multiprocessing.

@areeda
Contributor Author

areeda commented Mar 18, 2022

I don't know if this is a good place to ask, but how are people responding to alerts that come in at random times with strict latency requirements?
With multiprocessing off the table, and multithreading locking up when using os.system(), I can think of a couple of ideas:

  • Eat the latency for the processing job and hope it will be quick. Generating and submitting a Condor DAG takes only a few seconds, but if users add something that waits for data to appear or does preprocessing, this could change.
  • We could open a named pipe and do all the multiprocessing in another program.
  • We could write the alert payload to a file and have another program watch for it (see the sketch after this list).
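
A sketch of the last idea, using a spool directory rather than a named pipe; the directory, polling interval, and process() callback are arbitrary placeholders:

# Sketch of the file hand-off idea above: the listener writes each accepted alert
# payload into a spool directory, and a separate program polls that directory and
# processes new files. Directory, interval, and the process() callback are arbitrary.
import json
import pathlib
import time

SPOOL = pathlib.Path("/data/dqr-spool")

def watch_spool(process):
    seen = set()
    while True:
        for path in sorted(SPOOL.glob("*.json")):
            if path not in seen:
                seen.add(path)
                with open(path) as f:
                    process(json.load(f))    # hypothetical processing callback
        time.sleep(5)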

I'm curious how others are addressing this.

@areeda
Contributor Author

areeda commented Mar 21, 2022

We removed the multiprocessing option from the program and now use subprocess.run to execute the external program that processes the alert. I can report 75 hours of processing alerts on my development Mac; on the LIGO clusters we have seen 6 days without the error.
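
In outline (the command name and argument handling are placeholders):

# Outline of the single-process approach: run the external processing program
# synchronously for each accepted alert. Command name and argument are placeholders.
import subprocess

def handle_alert(payload_path):
    subprocess.run(["dqr-process-alert", payload_path], check=True)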

Taken in aggregate, this is enough circumstantial evidence to conclude that multiprocessing is incompatible with these libraries.

I will leave this pull request open for the maintainers to decide if Kafka runtime exceptions should be passed to the calling programs.

I do appreciate the help from cnweaver. The change in the frequency of the error when switching from spawn to fork was the key piece of evidence for me.

@cnweaver
Collaborator

I'm surprised that the 'forkserver' multiprocessing method isn't a safely viable option. Unfortunately, I don't think I have any other particularly good ideas for doing non-trivial processing from the same process without introducing delays in receiving further messages. I'll discuss with the rest of the SCiMMA team and see if we can think of anything.

I will still review the associated PR, hopefully tomorrow.

@areeda
Contributor Author

areeda commented Mar 22, 2022

It is interesting that the problem seems to only occur on my Mac but not on the Scientific Linux machines at Caltech.

I'm assuming that it is explained by the random nature of thread safety problems. The exact same [source] code is used in both places.

@areeda
Contributor Author

areeda commented Mar 24, 2022

We received a suggestion for how to handle simultaneous alerts.

Kafka provides the concept of a consumer group, which distributes events across a group of listeners. My application has to wait for the package that sits between us and adc/Kafka to expose it, but it will allow a very clean way to process multiple simultaneous events by simply running multiple listeners.
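
For illustration, a sketch of a consumer group at the underlying confluent-kafka level (broker, topic, and group names are placeholders); running several copies of such a consumer with the same group.id spreads the topic's messages across them:

# Illustration of a Kafka consumer group using confluent-kafka directly.
# Broker, topic, and group names are placeholders. Running several processes
# with the same 'group.id' distributes the topic's messages across them.
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "kafka.example.org:9092",
    "group.id": "dqr-listeners",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["example-alert-topic"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"consumer error: {msg.error()}")
            continue
        handle_alert(msg.value())            # hypothetical application callback
finally:
    consumer.close()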

@cnweaver
Collaborator

Okay, that does sound like a reasonable approach, and adc-streaming should already allow normal use of consumer groups.

@areeda
Contributor Author

areeda commented Mar 30, 2022

After running for 20 days we got one of these errors on my Mac; it is still running under Scientific Linux without seeing it.
I've been using the merge request version and our program recovered nicely.

%3|1648594740.001|FAIL|rdkafka#consumer-2| [thrd:sasl_ssl://kb-0.prod.hop.scimma.org:9092/0]: sasl_ssl://kb-0.prod.hop.scimma.org:9092/0: Receive failed: SSL transport error: Operation timed out (after 280131137ms in state UP, 1 identical error(s) suppressed)
03-29 15:59:00 - ERROR: internal kafka error: KafkaError{code=_TRANSPORT,val=-195,str="sasl_ssl://kb-0.prod.hop.scimma.org:9092/0: Receive failed: SSL transport error: Operation timed out (after 280131137ms in state UP, 1 identical error(s) suppressed)"}
%3|1648594742.030|FAIL|rdkafka#consumer-2| [thrd:sasl_ssl://kb-1.prod.hop.scimma.org:9092/1]: sasl_ssl://kb-1.prod.hop.scimma.org:9092/1: Receive failed: SSL transport error: Operation timed out (after 69725729ms in state UP, 1 identical error(s) suppressed)
03-29 15:59:02 - ERROR: internal kafka error: KafkaError{code=_TRANSPORT,val=-195,str="sasl_ssl://kb-1.prod.hop.scimma.org:9092/1: Receive failed: SSL transport error: Operation timed out (after 69725729ms in state UP, 1 identical error(s) suppressed)"}

I am not sure what this means.

@cnweaver
Collaborator

cnweaver commented Apr 1, 2022

I have merged the corresponding PR (sorry it took so long!). I'm not quite sure when we'll get this into a release, as the 2.1.0 release ended up in a slightly odd state due to upstream issues; I will leave this issue open at least until we get that out.

@deepchatterjeeligo

Hi @cnweaver, I am wondering about the status of this issue, since we are still seeing this -195 error occasionally (with v2.3.0):

KafkaException: KafkaError{code=_TRANSPORT,val=-195,str="Failed to get metadata: Local: Broker transport failure"}

@cnweaver
Collaborator

Unfortunately, librdkafka produces this error very readily, and with very little transparency to debug its root cause. (In fairness to librdkafka, one of the major causes of these errors is lack of network connectivity, which it can't really do anything about, and which may be a transient problem.) As such, it's very hard to say whether you're seeing something related to the multiprocessing issue mostly discussed in this thread, or not. Any contextual information you can give which might help us narrow the cause down would be valuable; unfortunately I do not myself have any very good ideas for how to obtain such.
