
Commit

merge master
zhuangchong committed Dec 3, 2024
2 parents 757f96a + d33b871 commit f18047e
Showing 54 changed files with 1,907 additions and 419 deletions.
11 changes: 11 additions & 0 deletions docs/content/migration/iceberg-compatibility.md
@@ -383,9 +383,20 @@ you also need to set some (or all) of the following table options when creating
<td>Boolean</td>
<td>Should use the legacy manifest version to generate Iceberg's 1.4 manifest files.</td>
</tr>
<tr>
<td><h5>metadata.iceberg.hive-client-class</h5></td>
<td style="word-wrap: break-word;">org.apache.hadoop.hive.metastore.HiveMetaStoreClient</td>
<td>String</td>
<td>Hive client class name for Iceberg Hive Catalog.</td>
</tr>
</tbody>
</table>

## AWS Glue Catalog

You can use Hive Catalog to connect to the AWS Glue metastore by setting `'metadata.iceberg.hive-client-class'` to
`'com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient'`.
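
For example, a minimal sketch of the table options (Flink SQL is assumed; the schema is illustrative, and the `'metadata.iceberg.storage' = 'hive-catalog'` value plus a Glue client jar on the classpath are assumptions, not shown in this commit):

```sql
CREATE TABLE paimon_glue_table (
    k INT,
    v STRING,
    PRIMARY KEY (k) NOT ENFORCED
) WITH (
    -- assumed: Iceberg metadata is written through the Hive catalog integration
    'metadata.iceberg.storage' = 'hive-catalog',
    -- route Hive metastore calls to AWS Glue
    'metadata.iceberg.hive-client-class' = 'com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient'
);
```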

## AWS Athena

AWS Athena may use an old manifest reader to read Iceberg manifests by name, so you should let Paimon produce legacy Iceberg
11 changes: 11 additions & 0 deletions docs/content/spark/auxiliary.md
@@ -96,6 +96,17 @@ SHOW PARTITIONS my_table;
SHOW PARTITIONS my_table PARTITION (dt=20230817);
```

## Show Table Extended

The `SHOW TABLE EXTENDED` statement lists table or partition information.

```sql
-- Lists tables whose names match the regular expression
SHOW TABLE EXTENDED IN db_name LIKE 'test*';

-- Lists the specified partition information for the table
SHOW TABLE EXTENDED IN db_name LIKE 'table_name' PARTITION(pt = '2024');
```

## Analyze table

The ANALYZE TABLE statement collects statistics about the table, that are to be used by the query optimizer to find a better query execution plan.
2 changes: 1 addition & 1 deletion docs/layouts/shortcodes/generated/core_configuration.html
@@ -443,7 +443,7 @@
</tr>
<tr>
<td><h5>lookup.local-file-type</h5></td>
<td style="word-wrap: break-word;">hash</td>
<td style="word-wrap: break-word;">sort</td>
<td><p>Enum</p></td>
<td>The local file type for lookup.<br /><br />Possible values:<ul><li>"sort": Construct a sorted file for lookup.</li><li>"hash": Construct a hash file for lookup.</li></ul></td>
</tr>
@@ -102,7 +102,10 @@ protected Pair<String, LookupStoreFactory.Context> writeData(
new CacheManager(MemorySize.ofMebiBytes(10)),
keySerializer.createSliceComparator());

File file = new File(tempDir.toFile(), UUID.randomUUID().toString());
String name =
String.format(
"%s-%s-%s", options.lookupLocalFileType(), valueLength, bloomFilterEnabled);
File file = new File(tempDir.toFile(), UUID.randomUUID() + "-" + name);
LookupStoreWriter writer = factory.createWriter(file, createBloomFiler(bloomFilterEnabled));
int i = 0;
for (byte[] input : inputs) {
@@ -128,7 +128,7 @@ private void readData(
LookupStoreFactory factory =
LookupStoreFactory.create(
options,
new CacheManager(MemorySize.ofMebiBytes(10)),
new CacheManager(MemorySize.ofMebiBytes(20), 0.5),
new RowCompactedSerializer(RowType.of(new IntType()))
.createSliceComparator());

@@ -891,7 +891,7 @@ public class CoreOptions implements Serializable {
public static final ConfigOption<LookupLocalFileType> LOOKUP_LOCAL_FILE_TYPE =
key("lookup.local-file-type")
.enumType(LookupLocalFileType.class)
.defaultValue(LookupLocalFileType.HASH)
.defaultValue(LookupLocalFileType.SORT)
.withDescription("The local file type for lookup.");

public static final ConfigOption<Float> LOOKUP_HASH_LOAD_FACTOR =
@@ -72,6 +72,10 @@ public Cache build() {
org.apache.paimon.shade.guava30.com.google.common.cache.CacheBuilder
.newBuilder()
.weigher(CacheBuilder::weigh)
// The concurrency level determines the number of segment caches in
// Guava, limiting the maximum number of block entries held in the
// cache. Since we do not access this cache concurrently, it is set to 1.
.concurrencyLevel(1)
.maximumWeight(memorySize.getBytes())
.removalListener(this::onRemoval)
.build());
@@ -106,7 +106,7 @@ public byte[] lookup(byte[] key) throws IOException {
return null;
}

private BlockIterator getNextBlock() throws IOException {
private BlockIterator getNextBlock() {
// index block handle, point to the key, value position.
MemorySlice blockHandle = indexBlockIterator.next().getValue();
BlockReader dataBlock =
@@ -134,42 +134,41 @@ private BlockReader readBlock(BlockHandle blockHandle, boolean index) {
blockCache.getBlock(
blockHandle.offset(),
blockHandle.size(),
bytes -> {
MemorySegment block = MemorySegment.wrap(bytes);
int crc32cCode = crc32c(block, blockTrailer.getCompressionType());
checkArgument(
blockTrailer.getCrc32c() == crc32cCode,
String.format(
"Expected CRC32C(%d) but found CRC32C(%d) for file(%s)",
blockTrailer.getCrc32c(), crc32cCode, filePath));

// decompress data
BlockCompressionFactory compressionFactory =
BlockCompressionFactory.create(
blockTrailer.getCompressionType());
if (compressionFactory == null) {
return bytes;
} else {
MemorySliceInput compressedInput =
MemorySlice.wrap(block).toInput();
byte[] uncompressed = new byte[compressedInput.readVarLenInt()];
BlockDecompressor decompressor =
compressionFactory.getDecompressor();
int uncompressedLength =
decompressor.decompress(
block.getHeapMemory(),
compressedInput.position(),
compressedInput.available(),
uncompressed,
0);
checkArgument(uncompressedLength == uncompressed.length);
return uncompressed;
}
},
bytes -> decompressBlock(bytes, blockTrailer),
index);
return new BlockReader(MemorySlice.wrap(unCompressedBlock), comparator);
}

private byte[] decompressBlock(byte[] compressedBytes, BlockTrailer blockTrailer) {
MemorySegment compressed = MemorySegment.wrap(compressedBytes);
int crc32cCode = crc32c(compressed, blockTrailer.getCompressionType());
checkArgument(
blockTrailer.getCrc32c() == crc32cCode,
String.format(
"Expected CRC32C(%d) but found CRC32C(%d) for file(%s)",
blockTrailer.getCrc32c(), crc32cCode, filePath));

// decompress data
BlockCompressionFactory compressionFactory =
BlockCompressionFactory.create(blockTrailer.getCompressionType());
if (compressionFactory == null) {
return compressedBytes;
} else {
MemorySliceInput compressedInput = MemorySlice.wrap(compressed).toInput();
byte[] uncompressed = new byte[compressedInput.readVarLenInt()];
BlockDecompressor decompressor = compressionFactory.getDecompressor();
int uncompressedLength =
decompressor.decompress(
compressed.getHeapMemory(),
compressedInput.position(),
compressedInput.available(),
uncompressed,
0);
checkArgument(uncompressedLength == uncompressed.length);
return uncompressed;
}
}

@Override
public void close() throws IOException {
if (bloomFilter != null) {
@@ -112,22 +112,7 @@ public AbstractIcebergCommitCallback(FileStoreTable table, String commitUser) {
break;
case HADOOP_CATALOG:
case HIVE_CATALOG:
Path dbPath = table.location().getParent();
final String dbSuffix = ".db";
if (dbPath.getName().endsWith(dbSuffix)) {
String dbName =
dbPath.getName()
.substring(0, dbPath.getName().length() - dbSuffix.length());
String tableName = table.location().getName();
Path separatePath =
new Path(
dbPath.getParent(),
String.format("iceberg/%s/%s/metadata", dbName, tableName));
this.pathFactory = new IcebergPathFactory(separatePath);
} else {
throw new UnsupportedOperationException(
"Storage type ICEBERG_WAREHOUSE can only be used on Paimon tables in a Paimon warehouse.");
}
this.pathFactory = new IcebergPathFactory(catalogTableMetadataPath(table));
break;
default:
throw new UnsupportedOperationException(
@@ -152,6 +137,24 @@ public AbstractIcebergCommitCallback(FileStoreTable table, String commitUser) {
this.manifestList = IcebergManifestList.create(table, pathFactory);
}

public static Path catalogTableMetadataPath(FileStoreTable table) {
Path icebergDBPath = catalogDatabasePath(table);
return new Path(icebergDBPath, String.format("%s/metadata", table.location().getName()));
}

public static Path catalogDatabasePath(FileStoreTable table) {
Path dbPath = table.location().getParent();
final String dbSuffix = ".db";
if (dbPath.getName().endsWith(dbSuffix)) {
String dbName =
dbPath.getName().substring(0, dbPath.getName().length() - dbSuffix.length());
return new Path(dbPath.getParent(), String.format("iceberg/%s/", dbName));
} else {
throw new UnsupportedOperationException(
"Storage type ICEBERG_WAREHOUSE can only be used on Paimon tables in a Paimon warehouse.");
}
}

@Override
public void close() throws Exception {}

@@ -84,6 +84,12 @@ public class IcebergOptions {
.withDescription(
"Should use the legacy manifest version to generate Iceberg's 1.4 manifest files.");

public static final ConfigOption<String> HIVE_CLIENT_CLASS =
key("metadata.iceberg.hive-client-class")
.stringType()
.defaultValue("org.apache.hadoop.hive.metastore.HiveMetaStoreClient")
.withDescription("Hive client class name for Iceberg Hive Catalog.");

/** Where to store Iceberg metadata. */
public enum StorageType implements DescribedEnum {
DISABLED("disabled", "Disable Iceberg compatibility support."),
@@ -84,6 +84,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
@@ -809,6 +810,68 @@ public void testDeletionVectorsWithFileIndexInFile() throws Exception {
"1|4|500|binary|varbinary|mapKey:mapVal|multiset"));
}

@Test
public void testDeletionVectorsWithParquetFilter() throws Exception {
FileStoreTable table =
createFileStoreTable(
conf -> {
conf.set(BUCKET, 1);
conf.set(DELETION_VECTORS_ENABLED, true);
conf.set(FILE_FORMAT, "parquet");
conf.set("parquet.block.size", "1048576");
conf.set("parquet.page.size", "1024");
});

BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();

BatchTableWrite write =
(BatchTableWrite)
writeBuilder
.newWrite()
.withIOManager(new IOManagerImpl(tempDir.toString()));

for (int i = 0; i < 200000; i++) {
write.write(rowData(1, i, i * 100L));
}

List<CommitMessage> messages = write.prepareCommit();
BatchTableCommit commit = writeBuilder.newCommit();
commit.commit(messages);
write =
(BatchTableWrite)
writeBuilder
.newWrite()
.withIOManager(new IOManagerImpl(tempDir.toString()));
for (int i = 180000; i < 200000; i++) {
write.write(rowDataWithKind(RowKind.DELETE, 1, i, i * 100L));
}

messages = write.prepareCommit();
commit = writeBuilder.newCommit();
commit.commit(messages);

PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
Random random = new Random();

for (int i = 0; i < 10; i++) {
int value = random.nextInt(180000);
TableRead read = table.newRead().withFilter(builder.equal(1, value)).executeFilter();
assertThat(getResult(read, splits, BATCH_ROW_TO_STRING))
.isEqualTo(
Arrays.asList(
String.format(
"%d|%d|%d|binary|varbinary|mapKey:mapVal|multiset",
1, value, value * 100L)));
}

for (int i = 0; i < 10; i++) {
int value = 180000 + random.nextInt(20000);
TableRead read = table.newRead().withFilter(builder.equal(1, value)).executeFilter();
assertThat(getResult(read, splits, BATCH_ROW_TO_STRING)).isEmpty();
}
}

@Test
public void testDeletionVectorsWithFileIndexInMeta() throws Exception {
FileStoreTable table =