Skip to content

Commit

Permalink
[improve][broker][PIP-195] Add topicName and cursorName for ledger me…
Browse files Browse the repository at this point in the history
…tadata of bucket snapshot (apache#19802)
  • Loading branch information
coderzc authored Mar 15, 2023
1 parent 0e96ded commit 160a864
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ public final class LedgerMetadataUtils {
private static final String METADATA_PROPERTY_COMPACTEDTO = "pulsar/compactedTo";
private static final String METADATA_PROPERTY_SCHEMAID = "pulsar/schemaId";

private static final String METADATA_PROPERTY_DELAYED_INDEX_BUCKETID = "pulsar/delayedIndexBucketId";
private static final String METADATA_PROPERTY_DELAYED_INDEX_BUCKET_KEY = "pulsar/delayedIndexBucketKey";
private static final String METADATA_PROPERTY_DELAYED_INDEX_TOPIC = "pulsar/delayedIndexTopic";
private static final String METADATA_PROPERTY_DELAYED_INDEX_CURSOR = "pulsar/delayedIndexCursor";

/**
* Build base metadata for every ManagedLedger.
Expand Down Expand Up @@ -108,14 +110,19 @@ public static Map<String, byte[]> buildMetadataForSchema(String schemaId) {
/**
* Build additional metadata for a delayed message index bucket.
*
* @param bucketKey key of the delayed message bucket
* @param bucketKey key of the delayed message bucket
* @param topicName name of the topic
* @param cursorName name of the cursor
* @return an immutable map which describes the schema
*/
public static Map<String, byte[]> buildMetadataForDelayedIndexBucket(String bucketKey) {
public static Map<String, byte[]> buildMetadataForDelayedIndexBucket(String bucketKey,
String topicName, String cursorName) {
return Map.of(
METADATA_PROPERTY_APPLICATION, METADATA_PROPERTY_APPLICATION_PULSAR,
METADATA_PROPERTY_COMPONENT, METADATA_PROPERTY_COMPONENT_DELAYED_INDEX_BUCKET,
METADATA_PROPERTY_DELAYED_INDEX_BUCKETID, bucketKey.getBytes(StandardCharsets.UTF_8)
METADATA_PROPERTY_DELAYED_INDEX_BUCKET_KEY, bucketKey.getBytes(StandardCharsets.UTF_8),
METADATA_PROPERTY_DELAYED_INDEX_TOPIC, topicName.getBytes(StandardCharsets.UTF_8),
METADATA_PROPERTY_DELAYED_INDEX_CURSOR, cursorName.getBytes(StandardCharsets.UTF_8)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
@Override
public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
List<SnapshotSegment> bucketSnapshotSegments,
String bucketKey) {
return createLedger(bucketKey)
String bucketKey, String topicName, String cursorName) {
return createLedger(bucketKey, topicName, cursorName)
.thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
.thenCompose(__ -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments))
.thenCompose(__ -> closeLedger(ledgerHandle))
Expand Down Expand Up @@ -143,9 +143,10 @@ private List<SnapshotSegment> parseSnapshotSegmentEntries(Enumeration<LedgerEntr
}

@NotNull
private CompletableFuture<LedgerHandle> createLedger(String bucketKey) {
private CompletableFuture<LedgerHandle> createLedger(String bucketKey, String topicName, String cursorName) {
CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
Map<String, byte[]> metadata = LedgerMetadataUtils.buildMetadataForDelayedIndexBucket(bucketKey);
Map<String, byte[]> metadata = LedgerMetadataUtils.buildMetadataForDelayedIndexBucket(bucketKey,
topicName, cursorName);
bookKeeper.asyncCreateLedger(
config.getManagedLedgerDefaultEnsembleSize(),
config.getManagedLedgerDefaultWriteQuorum(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
import org.apache.pulsar.common.util.Codec;
import org.roaringbitmap.RoaringBitmap;

@Slf4j
Expand Down Expand Up @@ -130,8 +131,11 @@ CompletableFuture<Long> asyncSaveBucketSnapshot(
ImmutableBucket bucket, DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata,
List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments) {
final String bucketKey = bucket.bucketKey();
final String cursorName = Codec.decode(cursor.getName());
final String topicName = dispatcherName.substring(0, dispatcherName.lastIndexOf(" / " + cursorName));
return executeWithRetry(
() -> bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments, bucketKey)
() -> bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments, bucketKey,
topicName, cursorName)
.whenComplete((__, ex) -> {
if (ex != null) {
log.warn("[{}] Failed to create bucket snapshot, bucketKey: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ public interface BucketSnapshotStorage {
/**
* Create a delayed message index bucket snapshot with metadata and bucketSnapshotSegments.
*
* @param snapshotMetadata the metadata of snapshot
* @param snapshotMetadata the metadata of snapshot
* @param bucketSnapshotSegments the list of snapshot segments
* @param bucketKey the key of bucket is used to generate custom storage metadata
* @param bucketKey the key of bucket is used to generate custom storage metadata
* @param topicName the name of topic is used to generate custom storage metadata
* @param cursorName the name of cursor is used to generate custom storage metadata
* @return the future with bucketId(ledgerId).
*/
CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
List<SnapshotSegment> bucketSnapshotSegments,
String bucketKey);
String bucketKey, String topicName, String cursorName);

/**
* Get delayed message index bucket snapshot metadata.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ public class BrokerService implements Closeable {
private final ConcurrentOpenHashSet<PersistentDispatcherMultipleConsumers> blockedDispatchers;
private final ReadWriteLock lock = new ReentrantReadWriteLock();

@Getter
@VisibleForTesting
private final DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory;
private final ServerBootstrap defaultServerBootstrap;
private final List<EventLoopGroup> protocolHandlersWorkerGroups = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,17 @@ protected void cleanup() throws Exception {
bucketSnapshotStorage.close();
}

private static final String TOPIC_NAME = "topicName";
private static final String CURSOR_NAME = "sub";

@Test
public void testCreateSnapshot() throws ExecutionException, InterruptedException {
DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata =
DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder().build();
List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
CompletableFuture<Long> future =
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
bucketSnapshotSegments, UUID.randomUUID().toString());
bucketSnapshotSegments, UUID.randomUUID().toString(), TOPIC_NAME, CURSOR_NAME);
Long bucketId = future.get();
Assert.assertNotNull(bucketId);
}
Expand Down Expand Up @@ -90,7 +93,7 @@ public void testGetSnapshot() throws ExecutionException, InterruptedException {

CompletableFuture<Long> future =
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
bucketSnapshotSegments, UUID.randomUUID().toString());
bucketSnapshotSegments, UUID.randomUUID().toString(), TOPIC_NAME, CURSOR_NAME);
Long bucketId = future.get();
Assert.assertNotNull(bucketId);

Expand Down Expand Up @@ -129,7 +132,7 @@ public void testGetSnapshotMetadata() throws ExecutionException, InterruptedExce

CompletableFuture<Long> future =
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
bucketSnapshotSegments, UUID.randomUUID().toString());
bucketSnapshotSegments, UUID.randomUUID().toString(), TOPIC_NAME, CURSOR_NAME);
Long bucketId = future.get();
Assert.assertNotNull(bucketId);

Expand All @@ -151,7 +154,7 @@ public void testDeleteSnapshot() throws ExecutionException, InterruptedException
List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
CompletableFuture<Long> future =
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
bucketSnapshotSegments, UUID.randomUUID().toString());
bucketSnapshotSegments, UUID.randomUUID().toString(), TOPIC_NAME, CURSOR_NAME);
Long bucketId = future.get();
Assert.assertNotNull(bucketId);

Expand Down Expand Up @@ -189,7 +192,7 @@ public void testGetBucketSnapshotLength() throws ExecutionException, Interrupted

CompletableFuture<Long> future =
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
bucketSnapshotSegments, UUID.randomUUID().toString());
bucketSnapshotSegments, UUID.randomUUID().toString(), TOPIC_NAME, CURSOR_NAME);
Long bucketId = future.get();
Assert.assertNotNull(bucketId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public void injectDeleteException(Throwable throwable) {

@Override
public CompletableFuture<Long> createBucketSnapshot(
SnapshotMetadata snapshotMetadata, List<SnapshotSegment> bucketSnapshotSegments, String bucketKey) {
SnapshotMetadata snapshotMetadata, List<SnapshotSegment> bucketSnapshotSegments, String bucketKey,
String topicName, String cursorName) {
Throwable throwable = createExceptionQueue.poll();
if (throwable != null) {
return FutureUtil.failedFuture(throwable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public Object[][] provider(Method method) throws Exception {
bucketSnapshotStorage.start();
ManagedCursor cursor = new MockManagedCursor("my_test_cursor");
doReturn(cursor).when(dispatcher).getCursor();
doReturn(cursor.getName()).when(dispatcher).getName();
doReturn("persistent://public/default/testDelay" + " / " + cursor.getName()).when(dispatcher).getName();

final String methodName = method.getName();
return switch (methodName) {
Expand Down

0 comments on commit 160a864

Please sign in to comment.