Replies: 4 comments
-
Off topic: The |
Beta Was this translation helpful? Give feedback.
0 replies
-
Hey @eusonlito 👋
Fixed, thanks! Can I see your code please? I don't have this many threads here |
Beta Was this translation helpful? Give feedback.
0 replies
-
My code is (similar to): $builder = ConsumerBuilder::create(
brokers: $config['brokers'],
groupId: $config['consumer_group_id'],
);
$builder->subscribe($topic);
$builder->withHandler(fn ($message) => $handler($message));
$builder->withSasl(new Sasl(
securityProtocol: $config['security_protocol'],
mechanisms: $config['sasl_mechanism'],
username: $config['sasl_username'],
password: $config['sasl_password'],
));
$builder->build()->consume(); And the config: <?php
return [
/*
| Your kafka brokers url.
*/
'brokers' => env('KAFKA_BROKERS', 'localhost:9092'),
/*
| Kafka consumers belonging to the same consumer group share a group id.
| The consumers in a group then divides the topic partitions as fairly amongst themselves as possible by
| establishing that each partition is only consumed by a single consumer from the group.
| This config defines the consumer group id you want to use for your project.
*/
'consumer_group_id' => env('KAFKA_CONSUMER_GROUP_ID', 'group'),
/*
| After the consumer receives its assignment from the coordinator,
| it must determine the initial position for each assigned partition.
| When the group is first created, before any messages have been consumed, the position is set according to a configurable
| offset reset policy (auto.offset.reset). Typically, consumption starts either at the earliest offset or the latest offset.
| You can choose between "latest", "earliest" or "none".
*/
'offset_reset' => env('KAFKA_OFFSET_RESET', 'earliest'),
/*
| If you set enable.auto.commit (which is the default), then the consumer will automatically commit offsets periodically at the
| interval set by auto.commit.interval.ms.
*/
'auto_commit' => env('KAFKA_AUTO_COMMIT', true),
'sleep_on_error' => env('KAFKA_ERROR_SLEEP', 5),
'partition' => env('KAFKA_PARTITION', -1),
/*
| Kafka supports 4 compression codecs: none , gzip , lz4 and snappy
*/
'compression' => env('KAFKA_COMPRESSION_TYPE', 'snappy'),
/*
| Choose if debug is enabled or not.
*/
'debug' => env('KAFKA_DEBUG', false),
/*
| Repository for batching messages together
| Implement BatchRepositoryInterface to save batches in different storage
*/
'batch_repository' => env('KAFKA_BATCH_REPOSITORY', \Junges\Kafka\BatchRepositories\InMemoryBatchRepository::class),
'security_protocol' => env('KAFKA_SECURITY_PROTOCOL'),
'sasl_mechanism' => env('KAFKA_SASL_MECHANISM'),
'sasl_username' => env('KAFKA_SASL_USERNAME'),
'sasl_password' => env('KAFKA_SASL_PASSWORD'),
]; |
Beta Was this translation helpful? Give feedback.
0 replies
-
Thanks! I'll take a look tonight |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
My project has a consumer defined as:
And this generate more than 100 threads:
I would like to understand what is the reason, if they are necessary, if it is configurable, or if it is advisable to leave it like that.
Thanks a lot for this great package.
Beta Was this translation helpful? Give feedback.
All reactions