Skip to content

Commit

Permalink
[flink] Introduce zorder/order sort compact for dynamic bucket table (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
leaves12138 authored Oct 10, 2023
1 parent 7e684f9 commit cef2fc1
Show file tree
Hide file tree
Showing 17 changed files with 689 additions and 42 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.index;

import org.apache.paimon.data.BinaryRow;

/** Assigner a bucket for a record, just used in dynamic bucket table. */
public interface BucketAssigner {

int assign(BinaryRow partition, int hash);

void prepareCommit(long commitIdentifier);
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Assign bucket for key hashcode. */
public class HashBucketAssigner {
public class HashBucketAssigner implements BucketAssigner {

private static final Logger LOG = LoggerFactory.getLogger(HashBucketAssigner.class);

Expand Down Expand Up @@ -64,6 +64,7 @@ public HashBucketAssigner(
}

/** Assign a bucket for key hash of a record. */
@Override
public int assign(BinaryRow partition, int hash) {
int recordAssignId = computeAssignId(hash);
checkArgument(
Expand All @@ -88,6 +89,7 @@ public int assign(BinaryRow partition, int hash) {
}

/** Prepare commit to clear outdated partition index. */
@Override
public void prepareCommit(long commitIdentifier) {
long latestCommittedIdentifier;
if (partitionIndex.values().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
package org.apache.paimon.index;

import org.apache.paimon.KeyValue;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.utils.IntHashSet;
import org.apache.paimon.utils.IntIterator;

import javax.annotation.Nullable;

import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
Expand All @@ -40,7 +43,10 @@ public class HashIndexMaintainer implements IndexMaintainer<KeyValue> {
private boolean modified;

private HashIndexMaintainer(
IndexFileHandler fileHandler, Long snapshotId, BinaryRow partition, int bucket) {
IndexFileHandler fileHandler,
@Nullable Long snapshotId,
BinaryRow partition,
int bucket) {
this.fileHandler = fileHandler;
IntHashSet hashcode = new IntHashSet();
if (snapshotId != null) {
Expand Down Expand Up @@ -93,6 +99,11 @@ public List<IndexFileMeta> prepareCommit() {
return Collections.emptyList();
}

@VisibleForTesting
public boolean isEmpty() {
return hashcode.size() == 0;
}

/** Factory to restore {@link HashIndexMaintainer}. */
public static class Factory implements IndexMaintainer.Factory<KeyValue> {

Expand All @@ -104,7 +115,7 @@ public Factory(IndexFileHandler handler) {

@Override
public IndexMaintainer<KeyValue> createOrRestore(
Long snapshotId, BinaryRow partition, int bucket) {
@Nullable Long snapshotId, BinaryRow partition, int bucket) {
return new HashIndexMaintainer(handler, snapshotId, partition, bucket);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.paimon.data.BinaryRow;

import javax.annotation.Nullable;

import java.util.List;

/** Maintainer to maintain index. */
Expand All @@ -31,6 +33,7 @@ public interface IndexMaintainer<T> {

/** Factory to restore {@link IndexMaintainer}. */
interface Factory<T> {
IndexMaintainer<T> createOrRestore(Long snapshotId, BinaryRow partition, int bucket);
IndexMaintainer<T> createOrRestore(
@Nullable Long snapshotId, BinaryRow partition, int bucket);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.index;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.utils.Int2ShortHashMap;

import java.util.HashMap;
import java.util.Map;

/** When we need to overwrite the table, we should use this to avoid loading index. */
public class SimpleHashBucketAssigner implements BucketAssigner {

private final int numAssigners;
private final int assignId;
private final long targetBucketRowNumber;

private final Map<BinaryRow, SimplePartitionIndex> partitionIndex;

public SimpleHashBucketAssigner(int numAssigners, int assignId, long targetBucketRowNumber) {
this.numAssigners = numAssigners;
this.assignId = assignId;
this.targetBucketRowNumber = targetBucketRowNumber;
this.partitionIndex = new HashMap<>();
}

@Override
public int assign(BinaryRow partition, int hash) {
SimplePartitionIndex index =
this.partitionIndex.computeIfAbsent(partition, p -> new SimplePartitionIndex());
return index.assign(hash);
}

@Override
public void prepareCommit(long commitIdentifier) {
// do nothing
}

/** Simple partition bucket hash assigner. */
private class SimplePartitionIndex {

public final Int2ShortHashMap hash2Bucket = new Int2ShortHashMap();
private final Map<Integer, Long> bucketInformation;
private int currentBucket;

private SimplePartitionIndex() {
bucketInformation = new HashMap<>();
loadNewBucket();
}

public int assign(int hash) {
// the same hash should go into the same bucket
if (hash2Bucket.containsKey(hash)) {
return hash2Bucket.get(hash);
}

Long num = bucketInformation.computeIfAbsent(currentBucket, i -> 0L);
if (num >= targetBucketRowNumber) {
loadNewBucket();
}
bucketInformation.compute(currentBucket, (i, l) -> l == null ? 1L : l + 1);
hash2Bucket.put(hash, (short) currentBucket);
return currentBucket;
}

private void loadNewBucket() {
for (int i = 0; i < Short.MAX_VALUE; i++) {
if (i % numAssigners == assignId && !bucketInformation.containsKey(i)) {
currentBucket = i;
return;
}
}
throw new RuntimeException(
"Can't find a suitable bucket to assign, all the bucket are assigned?");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,8 @@ public WriterContainer<T> createWriterContainer(
IndexMaintainer<T> indexMaintainer =
indexFactory == null
? null
: indexFactory.createOrRestore(latestSnapshotId, partition, bucket);
: indexFactory.createOrRestore(
ignorePreviousFiles ? null : latestSnapshotId, partition, bucket);
RecordWriter<T> writer =
createWriter(partition.copy(), bucket, restoreFiles, null, compactExecutor());
notifyNewWriter(writer);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.index;

import org.apache.paimon.data.BinaryRow;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/** Tests for {@link SimpleHashBucketAssigner}. */
public class SimpleHashBucketAssignerTest {

@Test
public void testAssign() {
SimpleHashBucketAssigner simpleHashBucketAssigner = new SimpleHashBucketAssigner(2, 0, 100);

BinaryRow binaryRow = BinaryRow.EMPTY_ROW;
int hash = 0;

for (int i = 0; i < 100; i++) {
int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++);
Assertions.assertThat(bucket).isEqualTo(0);
}

for (int i = 0; i < 100; i++) {
int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++);
Assertions.assertThat(bucket).isEqualTo(2);
}

int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++);
Assertions.assertThat(bucket).isEqualTo(4);
}

@Test
public void testAssignWithSameHash() {
SimpleHashBucketAssigner simpleHashBucketAssigner = new SimpleHashBucketAssigner(2, 0, 100);

BinaryRow binaryRow = BinaryRow.EMPTY_ROW;
int hash = 0;

for (int i = 0; i < 100; i++) {
int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++);
Assertions.assertThat(bucket).isEqualTo(0);
}

// reset hash, the record will go into bucket 0
hash = 0;
for (int i = 0; i < 100; i++) {
int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++);
Assertions.assertThat(bucket).isEqualTo(0);
}
}
}
Loading

0 comments on commit cef2fc1

Please sign in to comment.