
Commit

Merge branch 'master' into rest-catalog
jerry-024 authored Dec 17, 2024
2 parents 6b77e13 + cdd5bb7 commit dbe6591
Showing 197 changed files with 4,210 additions and 1,670 deletions.
2 changes: 2 additions & 0 deletions docs/content/cdc-ingestion/kafka-cdc.md
@@ -199,7 +199,9 @@ To use this feature through `flink run`, run the following shell command.
--warehouse <warehouse-path> \
--database <database-name> \
[--table_mapping <table-name>=<paimon-table-name>] \
[--table_prefix_db <paimon-table-prefix-by-db>] \
[--table_prefix <paimon-table-prefix>] \
[--table_suffix_db <paimon-table-suffix-by-db>] \
[--table_suffix <paimon-table-suffix>] \
[--including_tables <table-name|name-regular-expr>] \
[--excluding_tables <table-name|name-regular-expr>] \
40 changes: 37 additions & 3 deletions docs/content/concepts/spec/datafile.md
@@ -83,11 +83,45 @@ relationship between various table types and buckets in Paimon:
The name of a data file is `data-${uuid}-${id}.${format}`. For the append table, the file stores the data of the table
without adding any new columns. But for the primary key table, each row of data stores additional system columns:

1. `_VALUE_KIND`: row is deleted or added. Similar to RocksDB, each row of data can be deleted or added, which will be
## Table with Primary key Data File

1. Primary key columns, written with a `_KEY_` prefix to avoid conflicts with the columns of the table. They are optional:
Paimon version 1.0 and above will retrieve the primary key fields from the value columns.
2. `_VALUE_KIND`: TINYINT, row is deleted or added. Similar to RocksDB, each row of data can be deleted or added, which will be
used for updating the primary key table.
2. `_SEQUENCE_NUMBER`: this number is used for comparison during updates, determining which data came first and which
3. `_SEQUENCE_NUMBER`: BIGINT, this number is used for comparison during updates, determining which data came first and which
data came later.
3. `_KEY_` prefix to key columns, this is to avoid conflicts with columns of the table.
4. Value columns. All columns declared in the table.

For example, data file for table:

```sql
CREATE TABLE T (
a INT PRIMARY KEY NOT ENFORCED,
b INT,
c INT
);
```

Its file has 6 columns: `_KEY_a`, `_VALUE_KIND`, `_SEQUENCE_NUMBER`, `a`, `b`, `c`.

When `data-file.thin-mode` is enabled, its file has 5 columns: `_VALUE_KIND`, `_SEQUENCE_NUMBER`, `a`, `b`, `c`.
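
A minimal Flink SQL sketch of turning this on, assuming the option is set as a table property in the `WITH` clause (illustrative only):

```sql
-- Sketch: enable thin mode so key columns are not duplicated as _KEY_ columns
-- in the data files; only the value columns plus the two system columns remain.
CREATE TABLE T (
    a INT PRIMARY KEY NOT ENFORCED,
    b INT,
    c INT
) WITH (
    'data-file.thin-mode' = 'true'
);
```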

## Table w/o Primary key Data File

- Value columns. All columns declared in the table.

For example, data file for table:

```sql
CREATE TABLE T (
a INT,
b INT,
c INT
);
```

Its file has 3 columns: `a`, `b`, `c`.

## Changelog File

97 changes: 58 additions & 39 deletions docs/content/concepts/spec/manifest.md
@@ -35,13 +35,13 @@ under the License.

Manifest List includes meta of several manifest files. Its name contains a UUID; it is an avro file, and the schema is:

1. fileName: manifest file name.
2. fileSize: manifest file size.
3. numAddedFiles: number added files in manifest.
4. numDeletedFiles: number deleted files in manifest.
5. partitionStats: partition stats, the minimum and maximum values of partition fields in this manifest are beneficial
1. _FILE_NAME: STRING, manifest file name.
2. _FILE_SIZE: BIGINT, manifest file size.
3. _NUM_ADDED_FILES: BIGINT, number of added files in the manifest.
4. _NUM_DELETED_FILES: BIGINT, number of deleted files in the manifest.
5. _PARTITION_STATS: SimpleStats, partition stats, the minimum and maximum values of partition fields in this manifest are beneficial
for skipping certain manifest files during queries, it is a SimpleStats.
6. schemaId: schema id when writing this manifest file.
6. _SCHEMA_ID: BIGINT, schema id when writing this manifest file.
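
For illustration, these fields can typically be inspected through Paimon's `manifests` system table. A hedged SQL sketch follows; the column names mirror the list above but may differ slightly across versions:

```sql
-- Sketch: list manifest-list entries of table T through the manifests system table.
SELECT file_name, file_size, num_added_files, num_deleted_files, schema_id
FROM T$manifests;
```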

## Manifest

@@ -63,31 +63,31 @@ Data Manifest includes meta of several data files or changelog files.

The schema is:

1. kind: ADD or DELETE,
2. partition: partition spec, a BinaryRow.
3. bucket: bucket of this file.
4. totalBuckets: total buckets when write this file, it is used for verification after bucket changes.
5. file: data file meta.
1. _KIND: TINYINT, ADD or DELETE,
2. _PARTITION: BYTES, partition spec, a BinaryRow.
3. _BUCKET: INT, bucket of this file.
4. _TOTAL_BUCKETS: INT, total buckets when writing this file; it is used for verification after bucket changes.
5. _FILE: data file meta.

The data file meta is:

1. fileName: file name.
2. fileSize: file size.
3. rowCount: total number of rows (including add & delete) in this file.
4. minKey: the minimum key of this file.
5. maxKey: the maximum key of this file.
6. keyStats: the statistics of the key.
7. valueStats: the statistics of the value.
8. minSequenceNumber: the minimum sequence number.
9. maxSequenceNumber: the maximum sequence number.
10. schemaId: schema id when write this file.
11. level: level of this file, in LSM.
12. extraFiles: extra files for this file, for example, data file index file.
13. creationTime: creation time of this file.
14. deleteRowCount: rowCount = addRowCount + deleteRowCount.
15. embeddedIndex: if data file index is too small, store the index in manifest.
16. fileSource: indicate whether this file is generated as an APPEND or COMPACT file
17. valueStatsCols: statistical column in metadata
1. _FILE_NAME: STRING, file name.
2. _FILE_SIZE: BIGINT, file size.
3. _ROW_COUNT: BIGINT, total number of rows (including add & delete) in this file.
4. _MIN_KEY: STRING, the minimum key of this file.
5. _MAX_KEY: STRING, the maximum key of this file.
6. _KEY_STATS: SimpleStats, the statistics of the key.
7. _VALUE_STATS: SimpleStats, the statistics of the value.
8. _MIN_SEQUENCE_NUMBER: BIGINT, the minimum sequence number.
9. _MAX_SEQUENCE_NUMBER: BIGINT, the maximum sequence number.
10. _SCHEMA_ID: BIGINT, schema id when writing this file.
11. _LEVEL: INT, level of this file, in LSM.
12. _EXTRA_FILES: ARRAY<STRING>, extra files for this file, for example, data file index file.
13. _CREATION_TIME: TIMESTAMP_MILLIS, creation time of this file.
14. _DELETE_ROW_COUNT: BIGINT, rowCount = addRowCount + deleteRowCount.
15. _EMBEDDED_FILE_INDEX: BYTES, if the data file index is too small, store the index in the manifest.
16. _FILE_SOURCE: TINYINT, indicates whether this file is generated as an APPEND or COMPACT file.
17. _VALUE_STATS_COLS: ARRAY<STRING>, statistical columns in metadata.
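
For illustration, much of this per-file metadata is also exposed through the `files` system table. A rough SQL sketch is below; the selected column names are an assumption and may not map one-to-one to the manifest fields above:

```sql
-- Sketch: inspect data file metadata of table T through the files system table.
SELECT file_path, record_count, file_size_in_bytes, level, creation_time
FROM T$files;
```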

### Index Manifest

@@ -100,16 +100,35 @@ Index Manifest includes meta of several [table-index]({{< ref "concepts/spec/tab

The schema is:

1. kind: ADD or DELETE,
2. partition: partition spec, a BinaryRow.
3. bucket: bucket of this file.
4. indexFile: index file meta.
1. _KIND: TINYINT, ADD or DELETE,
2. _PARTITION: BYTES, partition spec, a BinaryRow.
3. _BUCKET: INT, bucket of this file.
4. _INDEX_TYPE: STRING, "HASH" or "DELETION_VECTORS".
5. _FILE_NAME: STRING, file name.
6. _FILE_SIZE: BIGINT, file size.
7. _ROW_COUNT: BIGINT, total number of rows.
8. _DELETIONS_VECTORS_RANGES: metadata only used by "DELETION_VECTORS"; it is an array of deletion vector metas, and the schema of each deletion vector meta is:
1. f0: the data file name corresponding to this deletion vector.
2. f1: the starting offset of this deletion vector in the index file.
3. f2: the length of this deletion vector in the index file.
4. _CARDINALITY: the number of deleted rows.

The index file meta is:
## Appendix

1. indexType: string, "HASH" or "DELETION_VECTORS".
2. fileName: file name.
3. fileSize: file size.
4. rowCount: total number of rows.
5. deletionVectorsRanges: Metadata only used by "DELETION_VECTORS", Stores offset and length of each data file,
The schema is `ARRAY<ROW<f0: STRING, f1: INT, f2: INT>>`.
### SimpleStats

SimpleStats is a nested row; the schema is:

1. _MIN_VALUES: BYTES, BinaryRow, the minimum values of the columns.
2. _MAX_VALUES: BYTES, BinaryRow, the maximum values of the columns.
3. _NULL_COUNTS: ARRAY<BIGINT>, the number of nulls of the columns.

### BinaryRow

BinaryRow is backed by bytes instead of Object. It can significantly reduce the serialization/deserialization of Java
objects.

A Row has two parts: a fixed-length part and a variable-length part. The fixed-length part contains a 1-byte header, the null bit
set and the field values. The null bit set is used for null tracking and is aligned to 8-byte word boundaries. `Field values`
holds fixed-length primitive types, as well as variable-length values that can be stored inside 8 bytes. If a variable-length
value does not fit in 8 bytes, its field value stores the length and offset of the data in the variable-length part.
21 changes: 21 additions & 0 deletions docs/content/flink/procedures.md
@@ -434,6 +434,27 @@ All available procedures are listed below.
CALL sys.rollback_to_watermark(`table` => 'default.T', watermark => 1730292023000)
</td>
</tr>
<tr>
<td>purge_files</td>
<td>
-- for Flink 1.18<br/>
-- clear table with purge files directly.<br/>
CALL sys.purge_files('identifier')<br/><br/>
-- for Flink 1.19 and later<br/>
-- clear table with purge files directly.<br/>
CALL sys.purge_files(`table` => 'default.T')<br/><br/>
</td>
<td>
To clear a table by purging its files directly. Argument:
<li>identifier: the target table identifier. Cannot be empty.</li>
</td>
<td>
-- for Flink 1.18<br/>
CALL sys.purge_files('default.T')<br/><br/>
-- for Flink 1.19 and later<br/>
CALL sys.purge_files(`table` => 'default.T')
</td>
</tr>
<tr>
<td>expire_snapshots</td>
<td>
6 changes: 0 additions & 6 deletions docs/content/maintenance/configurations.md
@@ -38,12 +38,6 @@ Options for paimon catalog.

{{< generated/catalog_configuration >}}

### FilesystemCatalogOptions

Options for Filesystem catalog.

{{< generated/file_system_catalog_configuration >}}

### HiveCatalogOptions

Options for Hive catalog.
2 changes: 1 addition & 1 deletion docs/content/program-api/flink-api.md
@@ -221,7 +221,7 @@ public class WriteCdcToTable {
Identifier identifier = Identifier.create("my_db", "T");
Options catalogOptions = new Options();
catalogOptions.set("warehouse", "/path/to/warehouse");
Catalog.Loader catalogLoader =
CatalogLoader catalogLoader =
() -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
Table table = catalogLoader.load().getTable(identifier);

10 changes: 10 additions & 0 deletions docs/content/spark/procedures.md
@@ -190,6 +190,16 @@ This section introduce all available spark procedures about paimon.
CALL sys.rollback_to_watermark(table => 'default.T', watermark => 1730292023000)<br/><br/>
</td>
</tr>
<tr>
<td>purge_files</td>
<td>
To clear a table by purging its files directly. Argument:
<li>table: the target table identifier. Cannot be empty.</li>
</td>
<td>
CALL sys.purge_files(table => 'default.T')<br/><br/>
</td>
</tr>
<tr>
<td>migrate_database</td>
<td>
12 changes: 6 additions & 6 deletions docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -26,12 +26,6 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>allow-upper-case</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Boolean</td>
<td>Indicates whether this catalog allow upper case, its default value depends on the implementation of the specific catalog.</td>
</tr>
<tr>
<td><h5>cache-enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
@@ -74,6 +68,12 @@
<td>Integer</td>
<td>Controls the max number for snapshots per table in the catalog are cached.</td>
</tr>
<tr>
<td><h5>case-sensitive</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Boolean</td>
<td>Indicates whether this catalog is case-sensitive.</td>
</tr>
<tr>
<td><h5>client-pool-size</h5></td>
<td style="word-wrap: break-word;">2</td>
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -230,6 +230,12 @@
<td>String</td>
<td>Specify the file name prefix of data files.</td>
</tr>
<tr>
<td><h5>data-file.thin-mode</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Enable data file thin mode to avoid duplicate columns storage.</td>
</tr>
<tr>
<td><h5>delete-file.thread-num</h5></td>
<td style="word-wrap: break-word;">(none)</td>

This file was deleted.

12 changes: 10 additions & 2 deletions docs/layouts/shortcodes/generated/kafka_sync_database.html
@@ -41,13 +41,21 @@
<td><h5>--table_mapping</h5></td>
<td>The table name mapping between source database and Paimon. For example, if you want to synchronize a source table named "test" to a Paimon table named "paimon_test", you can specify "--table_mapping test=paimon_test". Multiple mappings could be specified with multiple "--table_mapping" options. "--table_mapping" has higher priority than "--table_prefix" and "--table_suffix".</td>
</tr>
<tr>
<td><h5>--table_prefix_db</h5></td>
<td>The prefix of the Paimon tables to be synchronized from the specified db. For example, if you want to prefix the tables from db1 with "ods_db1_", you can specify "--table_prefix_db db1=ods_db1_". "--table_prefix_db" has higher priority than "--table_prefix".</td>
</tr>
<tr>
<td><h5>--table_prefix</h5></td>
<td>The prefix of all Paimon tables to be synchronized. For example, if you want all synchronized tables to have "ods_" as prefix, you can specify "--table_prefix ods_".</td>
<td>The prefix of all Paimon tables to be synchronized except those specified by "--table_mapping" or "--table_prefix_db". For example, if you want all synchronized tables to have "ods_" as prefix, you can specify "--table_prefix ods_".</td>
</tr>
<tr>
<td><h5>--table_suffix_db</h5></td>
<td>The suffix of the Paimon tables to be synchronized from the specified db. The usage is same as "--table_prefix_db".</td>
</tr>
<tr>
<td><h5>--table_suffix</h5></td>
<td>The suffix of all Paimon tables to be synchronized. The usage is same as "--table_prefix".</td>
<td>The suffix of all Paimon tables to be synchronized except those specified by "--table_mapping" or "--table_suffix_db". The usage is same as "--table_prefix".</td>
</tr>
<tr>
<td><h5>--including_tables</h5></td>
@@ -32,13 +32,13 @@ public class ArrowBundleRecords implements BundleRecords {

private final VectorSchemaRoot vectorSchemaRoot;
private final RowType rowType;
private final boolean allowUpperCase;
private final boolean caseSensitive;

public ArrowBundleRecords(
VectorSchemaRoot vectorSchemaRoot, RowType rowType, boolean allowUpperCase) {
VectorSchemaRoot vectorSchemaRoot, RowType rowType, boolean caseSensitive) {
this.vectorSchemaRoot = vectorSchemaRoot;
this.rowType = rowType;
this.allowUpperCase = allowUpperCase;
this.caseSensitive = caseSensitive;
}

public VectorSchemaRoot getVectorSchemaRoot() {
@@ -52,7 +52,7 @@ public long rowCount() {

@Override
public Iterator<InternalRow> iterator() {
ArrowBatchReader arrowBatchReader = new ArrowBatchReader(rowType, allowUpperCase);
ArrowBatchReader arrowBatchReader = new ArrowBatchReader(rowType, caseSensitive);
return arrowBatchReader.readBatch(vectorSchemaRoot).iterator();
}
}