Skip to content

Commit

Permalink
[ISSUE apache#103] Fix seek offset and deserialize bug in newSource (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
844028312 authored and tao.chen committed Mar 28, 2024
1 parent f48eacf commit 7d0ce72
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 14 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
<junit.version>4.13.2</junit.version>
<junit-jupiter.version>5.9.2</junit-jupiter.version>
<powermock.version>1.7.4</powermock.version>
<jaxb-api.version>2.3.1</jaxb-api.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.connector.rocketmq.source;

import com.alibaba.fastjson.JSON;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.rocketmq.source.enumerator.offset.OffsetsSelector;
Expand All @@ -25,8 +26,6 @@
import org.apache.flink.connector.rocketmq.source.util.UtilAll;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.StringUtils;

import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
Expand Down Expand Up @@ -240,7 +239,15 @@ public CompletableFuture<Long> seekCommittedOffset(MessageQueue messageQueue) {
long offset =
consumer.getOffsetStore()
.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE);
LOG.error(

if (offset == -1) {
offset = adminExt.minOffset(messageQueue);
LOG.info(
"Consumer seek committed offset from remote, offset=-1,mq={},use minOffset={}",
UtilAll.getQueueDescription(messageQueue),
offset);
}
LOG.info(
"Consumer seek committed offset from remote, mq={}, offset={}",
UtilAll.getQueueDescription(messageQueue),
offset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@

package org.apache.flink.connector.rocketmq.source.enumerator;

import org.apache.flink.core.io.SimpleVersionedSerializer;

import com.alibaba.fastjson.JSON;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -93,7 +92,7 @@ private Set<MessageQueue> deserializeMessageQueue(byte[] serialized) throws IOEx
String topic = in.readUTF();
int queueId = in.readInt();

MessageQueue queue = new MessageQueue(brokerName, topic, queueId);
MessageQueue queue = new MessageQueue(topic, brokerName, queueId);
result.add(queue);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.connector.rocketmq.source.enumerator;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Sets;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.Boundedness;
Expand All @@ -34,15 +36,11 @@
import org.apache.flink.connector.rocketmq.source.enumerator.offset.OffsetsSelector;
import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit;
import org.apache.flink.util.FlinkRuntimeException;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Sets;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -166,6 +164,9 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
*/
@Override
public void addSplitsBack(List<RocketMQSourceSplit> splits, int subtaskId) {
SourceSplitChangeResult sourceSplitChangeResult =
new SourceSplitChangeResult(new HashSet<>(splits));
this.calculateSplitAssignment(sourceSplitChangeResult);
// If the failed subtask has already restarted, we need to assign splits to it
if (context.registeredReaders().containsKey(subtaskId)) {
sendSplitChangesToRemote(Collections.singleton(subtaskId));
Expand Down Expand Up @@ -321,7 +322,7 @@ private void sendSplitChangesToRemote(Set<Integer> pendingReaders) {
}

final Set<RocketMQSourceSplit> pendingAssignmentForReader =
this.pendingSplitAssignmentMap.get(pendingReader);
this.pendingSplitAssignmentMap.remove(pendingReader);

// Put pending assignment into incremental assignment
if (pendingAssignmentForReader != null && !pendingAssignmentForReader.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,11 @@
import org.apache.flink.connector.rocketmq.source.util.UtilAll;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;

import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -190,7 +188,6 @@ public void handleSplitsChanges(SplitsChange<RocketMQSourceSplit> splitsChange)
/**
 * Wakes up the split reader so a fetcher thread blocked in {@code fetch()} can return early.
 *
 * <p>Sets the {@code wakeup} flag (presumably polled by the fetch loop — not visible here;
 * confirm against the fetch implementation) and additionally calls {@code wakeup()} on the
 * underlying consumer to interrupt any in-progress blocking poll.
 *
 * <p>NOTE(review): this text is taken from a diff view; the hunk header ({@code -190,7 +188,6})
 * indicates one line in this region was deleted by the commit — likely the
 * {@code this.consumer.wakeup()} call — verify against the post-commit source.
 */
public void wakeUp() {
LOG.debug("Wake up the split reader in case the fetcher thread is blocking in fetch().");
// Flag-based wakeup: cooperative signal for the fetch loop.
wakeup = true;
// Direct consumer wakeup: interrupts a blocking call inside the client library.
this.consumer.wakeup();
}

@Override
Expand Down

0 comments on commit 7d0ce72

Please sign in to comment.