-
See the Kafka Documentation.
You can override Kafka consumer properties for the factory using
/**
* Set the consumer properties that will be merged with the consumer properties
* provided by the consumer factory; properties here will supersede any with the same
* name(s) in the consumer factory.
* {@code group.id} and {@code client.id} are ignored.
* Property keys must be {@link String}s.
* @param kafkaConsumerProperties the properties.
* @see org.apache.kafka.clients.consumer.ConsumerConfig
* @see #setGroupId(String)
* @see #setClientId(String)
*/
public void setKafkaConsumerProperties(Properties kafkaConsumerProperties) {
or at the individual @KafkaListener level
/**
* Kafka consumer properties; they will supersede any properties with the same name
* defined in the consumer factory (if the consumer factory supports property overrides).
* <p>
* <b>Supported Syntax</b>
* <p>The supported syntax for key-value pairs is the same as the
* syntax defined for entries in a Java
* {@linkplain java.util.Properties#load(java.io.Reader) properties file}:
* <ul>
* <li>{@code key=value}</li>
* <li>{@code key:value}</li>
* <li>{@code key value}</li>
* </ul>
* {@code group.id} and {@code client.id} are ignored.
* @return the properties.
* @since 2.2.4
* @see org.apache.kafka.clients.consumer.ConsumerConfig
* @see #groupId()
* @see #clientIdPrefix()
*/
String[] properties() default {};
There is a similar override mechanism in the
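To make the Javadoc above concrete, here is a minimal stdlib-only sketch of the semantics it describes: entries in `java.util.Properties` syntax are parsed, then merged over the factory defaults, with `group.id` and `client.id` skipped. The class `PropertyOverrideSketch` and its methods are hypothetical illustrations, not Spring Kafka's actual implementation, and the property values are made up.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper that mimics the documented override semantics; not Spring Kafka code.
class PropertyOverrideSketch {

    // Parse entries written as key=value, key:value, or "key value" into one Properties object.
    static Properties parse(String... entries) {
        Properties parsed = new Properties();
        try {
            parsed.load(new StringReader(String.join("\n", entries)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return parsed;
    }

    // Overrides supersede factory defaults with the same name;
    // group.id and client.id overrides are ignored, as the Javadoc states.
    static Properties merge(Properties factoryDefaults, Properties overrides) {
        Properties merged = new Properties();
        merged.putAll(factoryDefaults);
        for (String key : overrides.stringPropertyNames()) {
            if (key.equals("group.id") || key.equals("client.id")) {
                continue;
            }
            merged.setProperty(key, overrides.getProperty(key));
        }
        return merged;
    }

    public static void main(String[] args) {
        Properties defaults = new Properties();
        defaults.setProperty("group.id", "factory-group");   // illustrative value
        defaults.setProperty("max.poll.records", "500");     // illustrative value

        Properties overrides = parse(
                "max.poll.records=50",
                "auto.offset.reset:earliest",
                "fetch.min.bytes 1024",
                "group.id=ignored-here");

        Properties merged = merge(defaults, overrides);
        merged.stringPropertyNames().stream().sorted()
                .forEach(k -> System.out.println(k + " = " + merged.getProperty(k)));
    }
}
```

The same three entry forms are what the `properties` attribute accepts, so a value such as `sasl.jaas.config=...` splits at the first separator and keeps the rest of the line as the value.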
-
@vkpuduran Did you manage to solve this issue?
-
Hi, I have multiple KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> instances to consume messages from different topics; I create one factory per topic. My code is as below:
consumerFactoryUsingTopicName(String topicName) -> returns the default config values like,
A few of the KafkaListenerContainerFactories use the same SASL credentials (username and password): jaasConfig -> org.apache.kafka.common.security.scram.ScramLoginModule required username="%s" password="%s";
The other KafkaListenerContainerFactory uses different SASL credentials (username and password). This causes a problem, and I get the error below.
2021-05-07 19:31:52 ERROR o.a.k.c.Metadata:283 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] - - - [Consumer clientId=consumer-ps-as-xxx-consumer-local-1, groupId=ps-as-xxx-consumer-local] Topic authorization failed for topics [ps-mon.0]
2021-05-07 19:31:52 ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer:149 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] - - - Authorization Exception and no authorizationExceptionRetryInterval set
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [ps-mon.0]
2021-05-07 19:31:52 ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer:140 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] - - - Fatal consumer exception; stopping container
I'm not sure what is missing here. Can't we have different factories with different credentials accessing topics in the same JVM? This is a Spring Boot application using spring-kafka. Any pointers would be much appreciated.
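For reference, the setup described above can be sketched as one independent config map per credential set, each with its own `sasl.jaas.config` built from the JAAS template in the post. This is a stdlib-only illustration: the class `PerFactoryCredentialsSketch`, the broker address, group names, and usernames are hypothetical, and the `security.protocol` and `sasl.mechanism` values are assumptions, since the post does not state them. A map like this could then be handed to a consumer factory such as `DefaultKafkaConsumerFactory`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: builds a separate consumer config per credential set.
class PerFactoryCredentialsSketch {

    // JAAS template as quoted in the post.
    static final String JAAS_TEMPLATE =
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
                    + "username=\"%s\" password=\"%s\";";

    // Returns a fresh map on every call so no factory shares mutable state with another.
    static Map<String, Object> consumerConfig(String bootstrapServers, String groupId,
                                              String username, String password) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("group.id", groupId);
        config.put("security.protocol", "SASL_SSL");    // assumption, not stated in the post
        config.put("sasl.mechanism", "SCRAM-SHA-512");  // assumption, not stated in the post
        config.put("sasl.jaas.config", String.format(JAAS_TEMPLATE, username, password));
        return config;
    }

    public static void main(String[] args) {
        // All values below are placeholders.
        Map<String, Object> shared = consumerConfig("localhost:9092", "group-a", "user-a", "pwd-a");
        Map<String, Object> other = consumerConfig("localhost:9092", "group-b", "user-b", "pwd-b");

        boolean distinct = !shared.get("sasl.jaas.config").equals(other.get("sasl.jaas.config"));
        System.out.println("Factories carry distinct JAAS configs: " + distinct);
    }
}
```

Keeping the credentials in the per-consumer `sasl.jaas.config` property, rather than in one JVM-wide setting, is what lets each factory authenticate as a different user. Whether that user is authorized for a given topic is still decided by the broker's ACLs.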