
StreamConsumer not waking #665

Open
trtt opened this issue Mar 30, 2024 · 5 comments
Comments

@trtt
Contributor

trtt commented Mar 30, 2024

This racy StreamConsumer behavior can be observed in the recent release, probably introduced by the move to the Event-based API.
Some events (stats, logs, etc.) are processed internally and are consumed by a poll that returns None. Since librdkafka only wakes the queue when it transitions from empty to non-empty, the waker might never be called after that point.

Event order example:

  1. new message event
  2. new stats event
  3. poll processes stats and returns None
  4. StreamConsumer sets up waker and returns Poll::Pending
  5. waker is never called by librdkafka since the queue never went empty
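The lost wakeup above can be modeled without librdkafka at all. Below is a minimal sketch using a toy queue whose wake flag is edge-triggered on the empty → non-empty transition; all names here are hypothetical stand-ins, not rdkafka API:

```rust
use std::collections::VecDeque;

// Toy model of librdkafka's edge-triggered wakeup: the wake flag is
// set only when the queue transitions from empty to non-empty.
struct EventQueue {
    events: VecDeque<&'static str>,
    woken: bool,
}

impl EventQueue {
    fn new() -> Self {
        Self { events: VecDeque::new(), woken: false }
    }

    fn push(&mut self, ev: &'static str) {
        if self.events.is_empty() {
            self.woken = true; // edge-triggered: only fires on empty -> non-empty
        }
        self.events.push_back(ev);
    }

    // Pop one event; "stats" is handled internally and surfaces as
    // None, mirroring the Event-based API.
    fn poll(&mut self) -> Option<&'static str> {
        match self.events.pop_front() {
            Some("stats") => None,
            other => other,
        }
    }
}

fn main() {
    let mut q = EventQueue::new();
    q.push("stats");
    q.push("message");
    q.woken = false; // the consumer already reacted to the first wakeup

    // Buggy strategy: a single poll returns None (stats was consumed
    // internally) and the consumer parks. The queue never drained, so
    // no further wakeup will ever arrive: "message" is stranded.
    assert_eq!(q.poll(), None);
    assert!(!q.events.is_empty() && !q.woken);

    // Fixed strategy: after a None, keep polling until the queue is
    // observably empty before registering the waker and parking.
    let msg = loop {
        match q.poll() {
            Some(m) => break Some(m),
            None if q.events.is_empty() => break None,
            None => continue,
        }
    };
    assert_eq!(msg, Some("message"));
    println!("recovered: {:?}", msg);
}
```

The buggy branch reproduces steps 3–5 of the event order above; the loop shows the general shape of a fix (drain before parking), not the actual change proposed for rdkafka.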

Probably connected issue: #638

@trtt
Contributor Author

trtt commented Mar 30, 2024

I sketched out a potential solution to the problem: #666

@sukhmel

sukhmel commented Nov 27, 2024

I was experiencing what seems to be an instance of this issue when I have statistics.interval.ms set and sleep for a longer time before consuming messages with StreamConsumer<LatestStatisicsContext>.

This problem is gone with [email protected]; thank you for the effort of fixing it. Maybe this issue can be closed?

@1990heidou

Using version 0.35 results in the same issue as #638.
Upgrading to version 0.37 still hits the problem of being unable to continue consuming, but without this warning being logged:
WARN librdkafka: librdkafka: MAXPOLL [thrd:main]: Application maximum poll interval (300000ms) exceeded by ??? ms

fn consumer_config(config_overrides: Option<HashMap<&str, &str>>) -> ClientConfig {
    let mut config = ClientConfig::new();

    config.set("group.id", "group_id");
    config.set("bootstrap.servers", "localhost")
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "60000")
        .set("enable.auto.commit", "true")
        .set("max.poll.interval.ms", "60000")
        .set("fetch.wait.max.ms", "50")
        .set("log.thread.name", "false")
        .set("auto.offset.reset", "earliest");

    if let Some(overrides) = config_overrides {
        for (key, value) in overrides {
            config.set(key, value);
        }
    }

    config
}
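The config_overrides hook above is what makes it easy to reproduce scenarios like sukhmel's (e.g. enabling statistics events). The base-then-overrides merge can be exercised standalone with a plain map; merged_config and the override values below are hypothetical stand-ins for the ClientConfig-based helper:

```rust
use std::collections::HashMap;

// Stand-in for ClientConfig: the same base-then-overrides merge as in
// consumer_config above, but over a plain map so it runs standalone.
fn merged_config(overrides: Option<HashMap<&str, &str>>) -> HashMap<String, String> {
    let mut config = HashMap::new();
    // A couple of the base settings from consumer_config.
    config.insert("fetch.wait.max.ms".to_string(), "50".to_string());
    config.insert("auto.offset.reset".to_string(), "earliest".to_string());

    // Overrides win over base settings, since insert replaces.
    if let Some(overrides) = overrides {
        for (key, value) in overrides {
            config.insert(key.to_string(), value.to_string());
        }
    }
    config
}

fn main() {
    // e.g. enable statistics events, which is what triggered the race
    // in the original report.
    let overrides = HashMap::from([
        ("statistics.interval.ms", "1000"),
        ("fetch.wait.max.ms", "100"),
    ]);
    let config = merged_config(Some(overrides));
    assert_eq!(config["statistics.interval.ms"], "1000"); // added key
    assert_eq!(config["fetch.wait.max.ms"], "100");       // override wins
    assert_eq!(config["auto.offset.reset"], "earliest");  // base kept
}
```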

pub async fn consume_otel_spans(config: &Config, kafka_name: &str, tx: &Sender<Vec<u8>>, mut cancel_signal: watch::Receiver<String>) {
    let kafka = config.get_kafka_config_with_name(kafka_name);
    let topic = &kafka.topic;
    let mut consumer = init_kafka_consumer(&kafka);
    consumer
        .subscribe(&[topic])
        .expect("Can't subscribe to specified topics");

    loop {
        tokio::select! {
            biased;
            _ = cancel_signal.changed() => {
                info!("Stopping Kafka consumption after cancel signal");
                break;
            },
            result = consumer.recv() => {
                match result {
                    Ok(msg) => {
                        if let Some(payload) = msg.payload() {
                            tx.send(payload.to_vec()).unwrap_or_else(|e| error!("Send kafka msg to channel failed, err: {}", e));
                        }
                    },
                    Err(e) => {
                        error!("Failed to receive message from Kafka: {}", e);
                        if should_recreate_consumer(&e) {
                            info!("Recreating Kafka consumer...");
                            consumer = init_kafka_consumer(&kafka);
                            consumer
                                .subscribe(&[topic])
                                .expect("Can't resubscribe to specified topics");
                        } else {
                            // Handle other errors as needed
                        }
                    }
                }
            }
        }
    }
}
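should_recreate_consumer is not shown in the snippet. A minimal sketch of one possible classification, using a stand-in error enum rather than the real rdkafka::error::KafkaError (whose variants differ), on the assumption that a max-poll eviction or a fatal error warrants rebuilding the consumer:

```rust
// Stand-in for rdkafka's error type; the real KafkaError has
// different variants, this only sketches the classification.
#[derive(Debug, PartialEq)]
enum ConsumerError {
    MaxPollExceeded, // evicted from the group: rejoin via a new consumer
    Fatal,           // client marked unusable
    Transient,       // e.g. a broker transport hiccup: just keep polling
}

// Recreate only when the existing consumer cannot recover by itself.
fn should_recreate_consumer(e: &ConsumerError) -> bool {
    matches!(e, ConsumerError::MaxPollExceeded | ConsumerError::Fatal)
}

fn main() {
    assert!(should_recreate_consumer(&ConsumerError::MaxPollExceeded));
    assert!(should_recreate_consumer(&ConsumerError::Fatal));
    assert!(!should_recreate_consumer(&ConsumerError::Transient));
    println!("classification ok");
}
```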

@sukhmel

sukhmel commented Dec 2, 2024

@1990heidou I'm not sure this is the same issue: I usually encounter the max poll interval exceeded error when I don't poll the consumer at all, and the issue we're commenting on doesn't produce errors in the log.

@1990heidou

> @1990heidou I'm not sure if this is the same issue, I usually encounter max poll timeout exceeded when I don't poll the consumer at all, and the issue we're commenting on doesn't produce errors in the log.

I found that after a pod hits the problem above, the partitions it was originally consuming are taken over by other pods, as if the pod had disappeared. That would be consistent with the consumer being evicted from the group after exceeding max.poll.interval.ms.
