Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker, build, client, fn, io, ml, proxy, sql, test] Merge apache/branch-3.1 into 3.1_ds (II) #215

Closed
wants to merge 82 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
82 commits
Select commit Hold shift + click to select a range
efc4bf3
[fix][sec] Add OWASP Dependency Check suppressions (#21281)
lhotari Sep 30, 2023
9cd2f9e
[fix][build] Upgrade Lombok to 1.18.30 to support compiling with JDK2…
lhotari Sep 30, 2023
5900635
[fix][ml] Fix thread safe issue with RangeCache.put and RangeCache.cl…
lhotari Oct 7, 2023
33553cc
[fix][test] Fix flaky test NarUnpackerTest (#21328)
lhotari Oct 9, 2023
8e3f68f
[fix][test] Fix flaky CompactionTest.testDispatcherMaxReadSizeBytes (…
lhotari Oct 9, 2023
031809e
[fix][broker]Check that the super user role is in the MultiRolesToken…
tuteng Aug 11, 2023
25c84dd
[fix][sec] Fix MultiRoles token provider when using anonymous clients…
merlimat Oct 10, 2023
c98a019
[fix] [broker] fix flaky test PatternTopicsConsumerImplTest (#21222)
poorbarcode Sep 23, 2023
b38c563
[improve] [auto-recovery] [branch-3.1] Migrate the replication testin…
horizonzy Oct 11, 2023
50eb846
[fix] [metadata] Fix zookeeper related flacky test (#21310)
horizonzy Oct 8, 2023
a88fe1f
[fix][broker][branch-3.1] Fix inconsistent topic policy (#21255)
mattisonchao Oct 14, 2023
27f13a1
[fix][ci] Fix docker image building by releasing more disk space befo…
lhotari Oct 14, 2023
662130f
[fix] [broker] Make specified producer could override the previous on…
poorbarcode Sep 13, 2023
baca9a9
[fix][test] Fix a resource leak in ClusterMigrationTest (#21366)
lhotari Oct 16, 2023
ebc79f2
[feat][sql] Support UUID for json and avro (#21267)
liangyepianzhou Oct 9, 2023
d5edbbf
[fix][test] Fix some resource leaks in compaction tests (#21374)
lhotari Oct 17, 2023
f5baebf
[fix][test] Fix resource leaks with Pulsar Functions tests (#21378)
lhotari Oct 17, 2023
0454410
[fix][broker][branch-3.1] Fix lookup heartbeat and sla namespace bund…
Demogorgon314 Oct 18, 2023
9c467bb
[improve][ci] Add new CI unit test group "Broker Group 4" with cluste…
lhotari Oct 18, 2023
dfc2084
[fix][broker] rackaware policy is ineffective when delete zk rack inf…
TakaHiR07 Oct 7, 2023
2ed9a76
[fix][broker] Fix inconsistent topic policy (#21231)
mattisonchao Sep 26, 2023
aba4764
[fix][broker] Fix heartbeat namespace create transaction internal top…
TakaHiR07 Oct 19, 2023
7d097f7
[fix][broker] Fix heartbeat namespace create event topic and cannot d…
TakaHiR07 Oct 19, 2023
5fda6ad
[fix] [bk-client] Fix bk client MinNumRacksPerWriteQuorum and Enforc…
horizonzy Oct 11, 2023
a9e58b1
[improve][broker] use ConcurrentHashMap in ServiceUnitStateChannel an…
heesung-sn Oct 11, 2023
2b2b83d
[fix][broker] Fix unload operation stuck when use ExtensibleLoadManag…
Demogorgon314 Oct 19, 2023
8e5f00e
[fix][sec] Upgrade snappy-java to 1.1.10.5 (#21280)
lhotari Oct 3, 2023
1496925
[fix][proxy] Move status endpoint out of auth coverage (#21428)
mattisonchao Oct 24, 2023
a790d7f
[fix][sec] Upgrade Jetty to 9.4.53 to address CVE-2023-44487 (#21395)
lhotari Oct 19, 2023
6d8e17f
[fix][sec] Upgrade Netty to 4.1.100 to address CVE-2023-44487 (#21397)
lhotari Oct 19, 2023
119b832
Bump version to 3.1.2-SNAPSHOT
lhotari Oct 26, 2023
ce322f0
[fix][sec] Upgrade Zookeeper to 3.8.3 to address CVE-2023-44981 (#21398)
lhotari Oct 19, 2023
9919a37
[fix][broker] Fix MultiRoles token provider NPE when using anonymous …
Technoboy- Oct 25, 2023
7a2e3f6
[fix][broker] Ignore individual acknowledgment for CompactorSubscript…
coderzc Oct 25, 2023
bd9e42b
[fix][build] Fix apt download issue in building the docker image (#21…
lhotari Oct 31, 2023
b71fedc
[fix][txn] Ack all message ids when ack chunk messages with transacti…
liangyepianzhou Nov 8, 2023
c46ed2d
[fix][client] Avert extensive time consumption during table view cons…
liangyepianzhou Nov 6, 2023
c5b4af3
[fix][test] Fix LocalBookkeeperEnsemble resource leak in tests (#21407)
lhotari Oct 23, 2023
7bc4956
[fix][txn] OpRequestSend reuse problem cause tbClient commitTxnOnTopi…
TakaHiR07 Nov 3, 2023
47232c7
[fix][broker] Avoid pass null role in MultiRolesTokenAuthorizationPro…
mattisonchao Nov 2, 2023
80f921a
[fix][broker] Fix issue with consumer read uncommitted messages from …
coderzc Nov 3, 2023
e3cd354
[fix][broker] Fix PulsarService/BrokerService shutdown when brokerShu…
lhotari Nov 1, 2023
d39482f
[fix][broker] Fix namespace bundle stuck in unloading status (#21445)
mattisonchao Nov 8, 2023
af65e30
[fix][broker] Fix create topic with different auto creation strategie…
mattisonchao Nov 10, 2023
bf361fc
[fix][broker] Fix the deadlock when using BookieRackAffinityMapping w…
erobot Nov 10, 2023
8b1a90b
[fix][broker] Fix failure while creating non-durable cursor with inac…
rdhabalia Nov 4, 2023
515cf1d
[fix][client] Fix print error log 'Auto getting partitions failed' wh…
hanmz Nov 11, 2023
d386d14
[fix][ml] Fix unfinished callback when deleting managed ledger (#21530)
mattisonchao Nov 10, 2023
467e9c0
[fix] [broker] Fix thousands orphan PersistentTopic caused OOM (#21540)
poorbarcode Nov 11, 2023
cf78b71
[fix] [ml] Fix orphan scheduled task for ledger create timeout check …
poorbarcode Nov 11, 2023
ceafb3b
Revert "[fix][client] Avert extensive time consumption during table v…
liangyepianzhou Nov 13, 2023
c616deb
Revert "[fix][broker] Fix issue with consumer read uncommitted messag…
coderzc Nov 13, 2023
a303bd1
[fix][broker][branch-3.1] Fix issue with consumer read uncommitted me…
coderzc Nov 13, 2023
5f0a160
[fix][broker] Duplicate LedgerOffloader creation when namespace/topic…
shibd Nov 20, 2023
6ddf83d
[improve][broker] Support not retaining null-key message during topic…
coderzc Nov 22, 2023
4f7d084
[fix] [broker] Delete topic timeout due to NPE (#21595)
poorbarcode Nov 21, 2023
d735346
[fix][broker] Fix setReplicatedSubscriptionStatus incorrect behavior …
liudezhi2098 Nov 12, 2023
a6df6b9
[fix][broker] Do not write replicated snapshot marker when the topic …
liangyepianzhou Nov 14, 2023
3657ee1
[fix][broker] Fix resource_quota_zpath (#21461)
AnonHxy Nov 16, 2023
a8b311f
[cleanup][client] Fix inconsistent API annotations of `getTopicName` …
liangyepianzhou Nov 27, 2023
e6bebab
[fix][sec] Upgrade rabbitmq client to address CVE-2023-46120 (#21619)
liangyepianzhou Nov 27, 2023
372b443
[fix] [build] rename schema_example.conf to schema_example.json (#21447)
poorbarcode Oct 27, 2023
9bc415d
[improve] [broker] Let the producer request success at the first time…
poorbarcode Nov 2, 2023
1ab10da
[fix] [log] fix the vague response if topic not found (#20932)
poorbarcode Nov 28, 2023
9f11039
Release 3.1.2
AnonHxy Nov 30, 2023
0970471
[fix][build] Fix Stage Docker images fail on M1 Mac (#21659)
AnonHxy Dec 2, 2023
80ac009
[improve][admin] Add clusters check when set replication clusters (#2…
Technoboy- Dec 2, 2023
8fc56f0
[fix][broker] Fix memory leak during topic compaction (#21647)
coderzc Dec 2, 2023
78a3964
[fix][broker] Fix lookupRequestSemaphore leak when topic not found (#…
pengxiangrui127 Dec 4, 2023
2eb5423
[fix][admin] Fix KeyValue schema compatibility check caused OOM (#21645)
Technoboy- Nov 30, 2023
4aabd42
[fix][broker] Fixed getting incorrect KeyValue schema version (#21632)
Technoboy- Nov 29, 2023
d458b80
[fix][broker] Fix incorrect unack count when using shared subscriptio…
1Jack2 Nov 30, 2023
a614aad
[improve][build] Upgrade Apache ZooKeeper to 3.9.1 (#20933)
eolivelli Nov 29, 2023
16e6c46
[fix][offload] Don't cleanup data when offload met MetaStore exceptio…
zymap Dec 7, 2023
89b56a1
[fix][broker] Fix typo in the config key (#21690)
coderzc Dec 7, 2023
2393ca7
[improve][broker] Print recoverBucketSnapshot log if cursorProperties…
coderzc Dec 4, 2023
c4196fb
[fix][broker] Record GeoPersistentReplicator.msgOut before producer#s…
nodece Dec 8, 2023
23bf51a
[fix][sec] Bump avro version to 1.11.3 for CVE-2023-39410 (#21341)
tisonkun Oct 17, 2023
18acfd1
Merge remote-tracking branch 'apache/branch-3.1' into 3.1_ds_merge-br…
nikhil-ctds Dec 13, 2023
2ac5b96
Update groupID of dependencies to com.datastax.oss
nikhil-ctds Dec 13, 2023
78ec780
Updated groupID of artifact
nikhil-ctds Dec 19, 2023
4f3f8f9
Test SLF4J exclusion
nikhil-ctds Dec 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[fix][broker] Ignore individual acknowledgment for CompactorSubscript…
…ion when an entry has been filtered. (apache#21434)
coderzc authored and Technoboy- committed Oct 30, 2023
commit 7a2e3f67b552cf3436ca7b62bebe2a50f8235d66
Original file line number Diff line number Diff line change
@@ -176,14 +176,16 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i
if (Markers.isTxnMarker(msgMetadata)) {
// because consumer can receive message is smaller than maxReadPosition,
// so this marker is useless for this subscription
individualAcknowledgeMessageIfNeeded(entry.getPosition(), Collections.emptyMap());
individualAcknowledgeMessageIfNeeded(Collections.singletonList(entry.getPosition()),
Collections.emptyMap());
entries.set(i, null);
entry.release();
continue;
} else if (((PersistentTopic) subscription.getTopic())
.isTxnAborted(new TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits()),
(PositionImpl) entry.getPosition())) {
individualAcknowledgeMessageIfNeeded(entry.getPosition(), Collections.emptyMap());
individualAcknowledgeMessageIfNeeded(Collections.singletonList(entry.getPosition()),
Collections.emptyMap());
entries.set(i, null);
entry.release();
continue;
@@ -200,7 +202,8 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i

entries.set(i, null);
entry.release();
individualAcknowledgeMessageIfNeeded(pos, Collections.emptyMap());
individualAcknowledgeMessageIfNeeded(Collections.singletonList(pos),
Collections.emptyMap());
continue;
} else if (trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
// The message is marked for delayed delivery. Ignore for now.
@@ -271,8 +274,7 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i
}
}
if (CollectionUtils.isNotEmpty(entriesToFiltered)) {
subscription.acknowledgeMessage(entriesToFiltered, AckType.Individual,
Collections.emptyMap());
individualAcknowledgeMessageIfNeeded(entriesToFiltered, Collections.emptyMap());

int filtered = entriesToFiltered.size();
Topic topic = subscription.getTopic();
@@ -301,9 +303,9 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i
return totalEntries;
}

private void individualAcknowledgeMessageIfNeeded(Position position, Map<String, Long> properties) {
private void individualAcknowledgeMessageIfNeeded(List<Position> positions, Map<String, Long> properties) {
if (!(subscription instanceof PulsarCompactorSubscription)) {
subscription.acknowledgeMessage(Collections.singletonList(position), AckType.Individual, properties);
subscription.acknowledgeMessage(positions, AckType.Individual, properties);
}
}

Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@
import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
import static org.apache.pulsar.client.api.SubscriptionInitialPosition.Earliest;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@@ -30,16 +31,16 @@
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;

import io.netty.buffer.ByteBuf;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -58,11 +59,15 @@
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
import org.apache.pulsar.compaction.Compactor;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -150,6 +155,58 @@ public void testOverride() throws Exception {
consumer.close();
}

@Test
public void testEntryFilterWithCompactor() throws Exception {
conf.setAllowOverrideEntryFilters(true);
String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();

List<String> messages = new ArrayList<>();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(false).topic(topic).create();
producer.newMessage().key("K1").value("V1").send();
producer.newMessage().key("K2").value("V2").send();
producer.newMessage().key("K3").value("V3").send();
producer.newMessage().key("K4").value("V4").send();
messages.add("V2");
messages.add("V4");

PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();

// set topic level entry filters
EntryFilter mockFilter = mock(EntryFilter.class);
doAnswer(invocationOnMock -> {
FilterContext filterContext = invocationOnMock.getArgument(1);
String partitionKey = filterContext.getMsgMetadata().getPartitionKey();
if (partitionKey.equals("K1") || partitionKey.equals("K3")) {
return EntryFilter.FilterResult.REJECT;
} else {
return EntryFilter.FilterResult.ACCEPT;
}
}).when(mockFilter).filterEntry(any(Entry.class), any(FilterContext.class));
setMockFilterToTopic(topicRef, List.of(mockFilter));

List<String> results = new ArrayList<>();
RawReader rawReader = RawReader.create(pulsarClient, topic, Compactor.COMPACTION_SUBSCRIPTION).get();
while (true) {
boolean hasMsg = rawReader.hasMessageAvailableAsync().get();
if (hasMsg) {
try (RawMessage m = rawReader.readNextAsync().get()) {
ByteBuf headersAndPayload = m.getHeadersAndPayload();
Commands.skipMessageMetadata(headersAndPayload);
byte[] bytes = new byte[headersAndPayload.readableBytes()];
headersAndPayload.readBytes(bytes);

results.add(new String(bytes));
}
} else {
break;
}
}
rawReader.closeAsync().get();

Assert.assertEquals(messages, results);
}

@SneakyThrows
private void setMockFilterToTopic(PersistentTopic topicRef, List<EntryFilter> mockFilter) {
FieldUtils.writeField(topicRef, "entryFilters", Pair.of(null, mockFilter), true);