Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chose producer_cls globally and for each producer #38

Open
Gerleff opened this issue May 22, 2023 · 4 comments
Open

Chose producer_cls globally and for each producer #38

Gerleff opened this issue May 22, 2023 · 4 comments

Comments

@Gerleff
Copy link

Gerleff commented May 22, 2023

I'd like to use something like this:

def make_func_push(func: Callable) -> Callable:
    """Decores func or method, making it push metrics into Prometeus."""
    return prometheus_pusher(metrics=[metrics.metric_factory.get_metrics()], job=pod_name)(metrics.calculate(func))


class PushingProducer(AIOKafkaProducer):
    """Pushing to Prometeus Kafka producer."""

    send = make_func_push(AIOKafkaProducer.send)
    send_batch = make_func_push(AIOKafkaProducer.send_batch)

as producer for whole application by default or with chosen producers.

For now, monkeypatch is an option, but I'd thank much for more flexible solution.

@sternakt
Copy link
Contributor

Hi,

This is how i understood the issue, please tell if I misinterpreted.
So, you want to pass a function to metrics.calculate() which will call it, I guess, when the metrics are calculated to push them to Kafka.

Wouldn't this solve your problem?

app = FastKafka(...)

@app.produces()
async def to_prometheus_metrics(metrics: Metrics) -> MetricsDataFormat:
    return MetricsDataFormat(metrics=metrics)

prometheus_pusher(metrics=[metrics.metric_factory.get_metrics()], job=pod_name)(
    metrics.calculate(to_prometheus_metrics)
)

@Gerleff
Copy link
Author

Gerleff commented May 23, 2023

My goal is to decorate AIOKafkaProducer.send & AIOKafkaProducer.send_batch methods, so

class MyProd(AIOKafkaProducer):
    @prometheus_pusher(metrics=[metrics.metric_factory.get_metrics()], job=pod_name)
    @metrics.calculate
    async def send(...):
        ...
        
    @prometheus_pusher(metrics=[metrics.metric_factory.get_metrics()], job=pod_name)
    @metrics.calculate
    async def send_batch(...):
        ...

With that producer all "send" calls will push metrics.

Can you please explain a little deeper, how provided above solution will solve my problem?

@sternakt
Copy link
Contributor

My mistake, I think I understand now,

So, the prometheus_pusher and metrics decorators collect and push metrics from the producing calls.

Is there a particular reason why you don't decorate the producing functions instead of send and send_batch? Something like this:

@app.produces()
@prometheus_pusher(metrics=[metrics.metric_factory.get_metrics()], job=pod_name)
@metrics.calculate
async def to_some_topic(...) -> ...:
    ...

And about the global producer:

I guess that you want to have a global producer to collect the metrics for all the messages being sent, no matter the topic?
I'm afraid this is not easily done at the moment, the app internally creates a producer for each topic individually.

When we started with the implementation, we had an option to pass the producer object to the produces decorator, so you can pass your own custom producer (or a global one if that would be the case), we could plan for the reintroduction of that feature in the next sprints if that would solve your problem more elegantly

@Gerleff
Copy link
Author

Gerleff commented May 24, 2023

Sorry for late answer(

We are collecting metrics, including Summary. Summary measures time, required to execute function.

@prometheus_pusher(metrics=[metrics.metric_factory.get_metrics()], job=pod_name)
@metrics.calculate
@app.produces()
async def to_some_topic(x):
    return x

mb if decorators are placed that way, we'll summarize time to execute producer's functionality instead of lambda x: x? =)
If so, it's gonna be solution, which can be developed into overriding produces method in FastKafka's child class.

@kumaranvpl kumaranvpl transferred this issue from airtai/faststream Sep 25, 2023
@sternakt sternakt removed their assignment Nov 6, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants