
Merge upstream
AshishKhatkar committed Dec 18, 2024
2 parents fed801a + 2db0281 commit 348a250
Showing 265 changed files with 5,231 additions and 1,639 deletions.
5 changes: 5 additions & 0 deletions LICENSE
@@ -270,6 +270,11 @@ paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
from https://parquet.apache.org/ version 1.14.0

paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java
paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariantBuilder.java
paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariantUtil.java
from https://spark.apache.org/ version 4.0.0-preview2

MIT License
-----------

6 changes: 4 additions & 2 deletions docs/content/cdc-ingestion/kafka-cdc.md
@@ -199,12 +199,14 @@ To use this feature through `flink run`, run the following shell command.
--warehouse <warehouse-path> \
--database <database-name> \
[--table_mapping <table-name>=<paimon-table-name>] \
[--table_prefix_db <paimon-table-prefix-by-db>] \
[--table_prefix <paimon-table-prefix>] \
[--table_suffix_db <paimon-table-suffix-by-db>] \
[--table_suffix <paimon-table-suffix>] \
[--table_prefix_db <paimon-table-prefix-by-db>] \
[--table_suffix_db <paimon-table-suffix-by-db>] \
[--including_tables <table-name|name-regular-expr>] \
[--excluding_tables <table-name|name-regular-expr>] \
[--including_dbs <database-name|name-regular-expr>] \
[--excluding_dbs <database-name|name-regular-expr>] \
[--type_mapping to-string] \
[--partition_keys <partition_keys>] \
[--primary_keys <primary-keys>] \
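For example, a hypothetical invocation combining the per-database and global naming options (the warehouse path, database names, jar path, and Kafka settings are placeholders):

```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action.jar \
    kafka_sync_database \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table_prefix_db db1=ods_db1_ \
    --table_prefix ods_ \
    --including_dbs 'db.*' \
    --kafka_conf properties.bootstrap.servers=127.0.0.1:9092 \
    --kafka_conf topic=order \
    --kafka_conf value.format=canal-json
```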
16 changes: 16 additions & 0 deletions docs/content/flink/sql-alter.md
@@ -227,3 +227,19 @@ The following SQL modifies the watermark strategy to `ts - INTERVAL '2' HOUR`.
```sql
ALTER TABLE my_table MODIFY WATERMARK FOR ts AS ts - INTERVAL '2' HOUR
```

# ALTER DATABASE

The following SQL sets one or more properties in the specified database. If a particular property is already set in the database, its old value is overridden with the new one.

```sql
ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)
```
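
For example, to set two properties at once (the property names and values below are placeholders):

```sql
ALTER DATABASE my_database SET ('prop1' = 'value1', 'prop2' = 'value2')
```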

## Altering Database Location

The following SQL changes the location of database `my_database` to `file:/temp/my_database`.

```sql
ALTER DATABASE my_database SET ('location' = 'file:/temp/my_database')
```
6 changes: 0 additions & 6 deletions docs/content/maintenance/configurations.md
@@ -38,12 +38,6 @@ Options for paimon catalog.

{{< generated/catalog_configuration >}}

### FilesystemCatalogOptions

Options for Filesystem catalog.

{{< generated/file_system_catalog_configuration >}}

### HiveCatalogOptions

Options for Hive catalog.
26 changes: 25 additions & 1 deletion docs/content/program-api/catalog-api.md
@@ -82,7 +82,7 @@ public class ListDatabases {

## Drop Database

You can use the catalog to drop databases.
You can use the catalog to drop database.

```java
import org.apache.paimon.catalog.Catalog;
@@ -102,6 +102,30 @@ public class DropDatabase {
}
```

## Alter Database

You can use the catalog to alter a database's properties. (Note: only the Hive and JDBC catalogs support this.)

```java
import org.apache.paimon.catalog.Catalog;

import java.util.ArrayList;
import java.util.List;

public class AlterDatabase {

    public static void main(String[] args) {
        try {
            Catalog catalog = CreateCatalog.createHiveCatalog();
            // Collect the property changes and apply them in a single call.
            List<DatabaseChange> changes = new ArrayList<>();
            changes.add(DatabaseChange.setProperty("k1", "v1"));
            changes.add(DatabaseChange.removeProperty("k2"));
            catalog.alterDatabase("my_db", changes, true);
        } catch (Catalog.DatabaseNotExistException e) {
            // do something
        }
    }
}
```

## Determine Whether Table Exists

You can use the catalog to determine whether a table exists.
2 changes: 1 addition & 1 deletion docs/content/program-api/flink-api.md
@@ -221,7 +221,7 @@ public class WriteCdcToTable {
Identifier identifier = Identifier.create("my_db", "T");
Options catalogOptions = new Options();
catalogOptions.set("warehouse", "/path/to/warehouse");
Catalog.Loader catalogLoader =
CatalogLoader catalogLoader =
() -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
Table table = catalogLoader.load().getTable(identifier);

18 changes: 18 additions & 0 deletions docs/content/spark/sql-alter.md
@@ -240,3 +240,21 @@ The following SQL changes the type of a nested column `f2` to `BIGINT` in a struct
-- column v previously has type MAP<INT, STRUCT<f1: STRING, f2: INT>>
ALTER TABLE my_table ALTER COLUMN v.value.f2 TYPE BIGINT;
```


# ALTER DATABASE

The following SQL sets one or more properties in the specified database. If a particular property is already set in the database, its old value is overridden with the new one.

```sql
ALTER { DATABASE | SCHEMA | NAMESPACE } my_database
SET { DBPROPERTIES | PROPERTIES } ( property_name = property_value [ , ... ] )
```
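
For example, using the `DBPROPERTIES` form (the property name and value below are placeholders):

```sql
ALTER DATABASE my_database SET DBPROPERTIES ('prop1' = 'value1')
```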

## Altering Database Location

The following SQL sets the location of the specified database to `file:/temp/my_database.db`.

```sql
ALTER DATABASE my_database SET LOCATION 'file:/temp/my_database.db'
```
14 changes: 7 additions & 7 deletions docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -26,17 +26,11 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>allow-upper-case</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Boolean</td>
<td>Indicates whether this catalog allows upper case; its default value depends on the implementation of the specific catalog.</td>
</tr>
<tr>
<td><h5>cache-enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Controls whether the catalog will cache databases, tables and manifests.</td>
<td>Controls whether the catalog will cache databases, tables, manifests and partitions.</td>
</tr>
<tr>
<td><h5>cache.expiration-interval</h5></td>
@@ -74,6 +68,12 @@
<td>Integer</td>
<td>Controls the max number for snapshots per table in the catalog are cached.</td>
</tr>
<tr>
<td><h5>case-sensitive</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Boolean</td>
<td>Indicates whether this catalog is case-sensitive.</td>
</tr>
<tr>
<td><h5>client-pool-size</h5></td>
<td style="word-wrap: break-word;">2</td>
12 changes: 6 additions & 6 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -230,6 +230,12 @@
<td>String</td>
<td>Specify the file name prefix of data files.</td>
</tr>
<tr>
<td><h5>data-file.thin-mode</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Enable data file thin mode to avoid duplicate columns storage.</td>
</tr>
<tr>
<td><h5>delete-file.thread-num</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -864,12 +870,6 @@
<td>Integer</td>
<td>Default spill compression zstd level. For higher compression rates, it can be configured to 9, but the read and write speed will significantly decrease.</td>
</tr>
<tr>
<td><h5>data-file.thin-mode</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Enable data file thin mode to avoid duplicate columns storage.</td>
</tr>
<tr>
<td><h5>streaming-read-mode</h5></td>
<td style="word-wrap: break-word;">(none)</td>
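As a usage sketch for the `data-file.thin-mode` option documented above: like any other core option, it can be set in a table's `WITH` clause. The table definition below is made up for illustration (Flink SQL):

```sql
CREATE TABLE orders (
    order_id BIGINT,
    order_status STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'data-file.thin-mode' = 'true'
);
```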

This file was deleted.

26 changes: 17 additions & 9 deletions docs/layouts/shortcodes/generated/kafka_sync_database.html
@@ -41,22 +41,22 @@
<td><h5>--table_mapping</h5></td>
<td>The table name mapping between source database and Paimon. For example, if you want to synchronize a source table named "test" to a Paimon table named "paimon_test", you can specify "--table_mapping test=paimon_test". Multiple mappings could be specified with multiple "--table_mapping" options. "--table_mapping" has higher priority than "--table_prefix" and "--table_suffix".</td>
</tr>
<tr>
<td><h5>--table_prefix_db</h5></td>
<td>The prefix of the Paimon tables to be synchronized from the specified db. For example, if you want to prefix the tables from db1 with "ods_db1_", you can specify "--table_prefix_db db1=ods_db1_". "--table_prefix_db" has higher priority than "--table_prefix".</td>
</tr>
<tr>
<td><h5>--table_prefix</h5></td>
<td>The prefix of all Paimon tables to be synchronized except those specified by "--table_mapping" or "--table_prefix_db". For example, if you want all synchronized tables to have "ods_" as prefix, you can specify "--table_prefix ods_".</td>
</tr>
<tr>
<td><h5>--table_suffix_db</h5></td>
<td>The suffix of the Paimon tables to be synchronized from the specified db. The usage is the same as "--table_prefix_db".</td>
</tr>
<tr>
<td><h5>--table_suffix</h5></td>
<td>The suffix of all Paimon tables to be synchronized except those specified by "--table_mapping" or "--table_suffix_db". The usage is the same as "--table_prefix".</td>
</tr>
<tr>
<td><h5>--table_prefix_db</h5></td>
<td>The prefix of the Paimon tables to be synchronized from the specified db. For example, if you want to prefix the tables from db1 with "ods_db1_", you can specify "--table_prefix_db db1=ods_db1_". "--table_prefix_db" has higher priority than "--table_prefix".</td>
</tr>
<tr>
<td><h5>--table_suffix_db</h5></td>
<td>The suffix of the Paimon tables to be synchronized from the specified db. The usage is the same as "--table_prefix_db".</td>
</tr>
<tr>
<td><h5>--including_tables</h5></td>
<td>It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables. Because '|' is a special character, a comma is required, for example: 'a|b|c'. Regular expressions are supported; for example, specifying "--including_tables test|paimon.*" means to synchronize table 'test' and all tables starting with 'paimon'.</td>
@@ -65,6 +65,14 @@
<td><h5>--excluding_tables</h5></td>
<td>It is used to specify which source tables are not to be synchronized. The usage is the same as "--including_tables". "--excluding_tables" has higher priority than "--including_tables" if both are specified.</td>
</tr>
<tr>
<td><h5>--including_dbs</h5></td>
<td>It is used to specify the databases within which the tables are to be synchronized. The usage is the same as "--including_tables".</td>
</tr>
<tr>
<td><h5>--excluding_dbs</h5></td>
<td>It is used to specify the databases within which the tables are not to be synchronized. The usage is the same as "--excluding_tables". "--excluding_dbs" has higher priority than "--including_dbs" if both are specified.</td>
</tr>
<tr>
<td><h5>--type_mapping</h5></td>
<td>It is used to specify how to map MySQL data type to Paimon type.<br />
@@ -114,4 +122,4 @@
<td>The configuration for Paimon table sink. Each configuration should be specified in the format "key=value". See <a href="{{ $ref }}">here</a> for a complete list of table configurations.</td>
</tr>
</tbody>
</table>
</table>
@@ -32,13 +32,13 @@ public class ArrowBundleRecords implements BundleRecords {

private final VectorSchemaRoot vectorSchemaRoot;
private final RowType rowType;
private final boolean allowUpperCase;
private final boolean caseSensitive;

public ArrowBundleRecords(
VectorSchemaRoot vectorSchemaRoot, RowType rowType, boolean allowUpperCase) {
VectorSchemaRoot vectorSchemaRoot, RowType rowType, boolean caseSensitive) {
this.vectorSchemaRoot = vectorSchemaRoot;
this.rowType = rowType;
this.allowUpperCase = allowUpperCase;
this.caseSensitive = caseSensitive;
}

public VectorSchemaRoot getVectorSchemaRoot() {
@@ -52,7 +52,7 @@ public long rowCount() {

@Override
public Iterator<InternalRow> iterator() {
ArrowBatchReader arrowBatchReader = new ArrowBatchReader(rowType, allowUpperCase);
ArrowBatchReader arrowBatchReader = new ArrowBatchReader(rowType, caseSensitive);
return arrowBatchReader.readBatch(vectorSchemaRoot).iterator();
}
}
@@ -55,6 +55,8 @@
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.StringUtils.toLowerCaseIfNeed;

/** Utilities for creating Arrow objects. */
public class ArrowUtils {

@@ -66,13 +68,13 @@ public static VectorSchemaRoot createVectorSchemaRoot(
}

public static VectorSchemaRoot createVectorSchemaRoot(
RowType rowType, BufferAllocator allocator, boolean allowUpperCase) {
RowType rowType, BufferAllocator allocator, boolean caseSensitive) {
List<Field> fields =
rowType.getFields().stream()
.map(
f ->
toArrowField(
allowUpperCase ? f.name() : f.name().toLowerCase(),
toLowerCaseIfNeed(f.name(), caseSensitive),
f.id(),
f.type(),
0))
@@ -81,9 +83,9 @@ public static VectorSchemaRoot createVectorSchemaRoot(
}

public static FieldVector createVector(
DataField dataField, BufferAllocator allocator, boolean allowUpperCase) {
DataField dataField, BufferAllocator allocator, boolean caseSensitive) {
return toArrowField(
allowUpperCase ? dataField.name() : dataField.name().toLowerCase(),
toLowerCaseIfNeed(dataField.name(), caseSensitive),
dataField.id(),
dataField.type(),
0)
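The call sites above now delegate to `StringUtils.toLowerCaseIfNeed`. A minimal sketch of what such a helper presumably does, inferred from the expressions it replaces (`caseSensitive ? name : name.toLowerCase()`):

```java
// Hypothetical reconstruction of org.apache.paimon.utils.StringUtils#toLowerCaseIfNeed.
public static String toLowerCaseIfNeed(String name, boolean caseSensitive) {
    // Case-sensitive catalogs keep field names as-is; otherwise normalize to lower case.
    return caseSensitive ? name : name.toLowerCase();
}
```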
@@ -34,16 +34,18 @@
import java.util.Iterator;
import java.util.List;

import static org.apache.paimon.utils.StringUtils.toLowerCaseIfNeed;

/** Reader from a {@link VectorSchemaRoot} to paimon rows. */
public class ArrowBatchReader {

private final InternalRowSerializer internalRowSerializer;
private final VectorizedColumnBatch batch;
private final Arrow2PaimonVectorConverter[] convertors;
private final RowType projectedRowType;
private final boolean allowUpperCase;
private final boolean caseSensitive;

public ArrowBatchReader(RowType rowType, boolean allowUpperCase) {
public ArrowBatchReader(RowType rowType, boolean caseSensitive) {
this.internalRowSerializer = new InternalRowSerializer(rowType);
ColumnVector[] columnVectors = new ColumnVector[rowType.getFieldCount()];
this.convertors = new Arrow2PaimonVectorConverter[rowType.getFieldCount()];
@@ -53,7 +55,7 @@ public ArrowBatchReader(RowType rowType, boolean allowUpperCase) {
for (int i = 0; i < columnVectors.length; i++) {
this.convertors[i] = Arrow2PaimonVectorConverter.construct(rowType.getTypeAt(i));
}
this.allowUpperCase = allowUpperCase;
this.caseSensitive = caseSensitive;
}

public Iterable<InternalRow> readBatch(VectorSchemaRoot vsr) {
@@ -63,8 +65,7 @@ public Iterable<InternalRow> readBatch(VectorSchemaRoot vsr) {
for (int i = 0; i < dataFields.size(); ++i) {
try {
String fieldName = dataFields.get(i).name();
Field field =
arrowSchema.findField(allowUpperCase ? fieldName : fieldName.toLowerCase());
Field field = arrowSchema.findField(toLowerCaseIfNeed(fieldName, caseSensitive));
int idx = arrowSchema.getFields().indexOf(field);
mapping[i] = idx;
} catch (IllegalArgumentException e) {
@@ -37,8 +37,8 @@ public class ArrowFormatCWriter implements AutoCloseable {
private final ArrowSchema schema;
private final ArrowFormatWriter realWriter;

public ArrowFormatCWriter(RowType rowType, int writeBatchSize, boolean allowUpperCase) {
this.realWriter = new ArrowFormatWriter(rowType, writeBatchSize, allowUpperCase);
public ArrowFormatCWriter(RowType rowType, int writeBatchSize, boolean caseSensitive) {
this.realWriter = new ArrowFormatWriter(rowType, writeBatchSize, caseSensitive);
RootAllocator allocator = realWriter.getAllocator();
array = ArrowArray.allocateNew(allocator);
schema = ArrowSchema.allocateNew(allocator);