You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Properties consumerProps = new Properties(); consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, ONS_ADDR); consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, CONSUMER_GROUP); consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, CONSUMER_TOPIC); consumerProps.setProperty(RocketMQConfig.ACCESS_KEY, ONS_KEY); consumerProps.setProperty(RocketMQConfig.SECRET_KEY, SECRET_KEY); RocketMQSourceFunction<Map<Object,Object>> source = new RocketMQSourceFunction( new SimpleKeyValueDeserializationSchema("id", "data"), consumerProps); source.setStartFromGroupOffsets(OffsetResetStrategy.LATEST); env.addSource(source).setParallelism(1).print(" =======> ");
ErrorLog
Caused by: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, message queue: MessageQueue [topic=MQ_INST_1956905707885195_BauvlVUQ%dev-po-parcel-route-add, brokerName=qd-internet-pull-01, queueId=0]
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
at org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.seek(DefaultLitePullConsumerImpl.java:543)
at org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.seekToEnd(DefaultLitePullConsumerImpl.java:565)
at org.apache.rocketmq.client.consumer.DefaultLitePullConsumer.seekToEnd(DefaultLitePullConsumer.java:297)
at org.apache.rocketmq.flink.legacy.RocketMQSourceFunction.initOffsets(RocketMQSourceFunction.java:396)
at org.apache.rocketmq.flink.legacy.RocketMQSourceFunction.open(RocketMQSourceFunction.java:254)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)
The text was updated successfully, but these errors were encountered:
please reset the branch of master to 90b00be.The pr of #46 has some bugs and author has not fixed it yet.
You can look the detail of this bug in issue #69 (comment)
When I tested connecting to Alibaba Cloud RocketMQ in Local Idea, I encountered some problems. How can I solve this problem
env
<rocketmq.version>4.7.1</rocketmq.version>
<flink.version>1.15.0</flink.version>
Demo Code
Properties consumerProps = new Properties(); consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, ONS_ADDR); consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, CONSUMER_GROUP); consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, CONSUMER_TOPIC); consumerProps.setProperty(RocketMQConfig.ACCESS_KEY, ONS_KEY); consumerProps.setProperty(RocketMQConfig.SECRET_KEY, SECRET_KEY); RocketMQSourceFunction<Map<Object,Object>> source = new RocketMQSourceFunction( new SimpleKeyValueDeserializationSchema("id", "data"), consumerProps); source.setStartFromGroupOffsets(OffsetResetStrategy.LATEST); env.addSource(source).setParallelism(1).print(" =======> ");
ErrorLog
Caused by: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, message queue: MessageQueue [topic=MQ_INST_1956905707885195_BauvlVUQ%dev-po-parcel-route-add, brokerName=qd-internet-pull-01, queueId=0]
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
at org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.seek(DefaultLitePullConsumerImpl.java:543)
at org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.seekToEnd(DefaultLitePullConsumerImpl.java:565)
at org.apache.rocketmq.client.consumer.DefaultLitePullConsumer.seekToEnd(DefaultLitePullConsumer.java:297)
at org.apache.rocketmq.flink.legacy.RocketMQSourceFunction.initOffsets(RocketMQSourceFunction.java:396)
at org.apache.rocketmq.flink.legacy.RocketMQSourceFunction.open(RocketMQSourceFunction.java:254)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)
The text was updated successfully, but these errors were encountered: