Merge remote-tracking branch 'upstream/master' into docs-adjusting-flink-ddl-catalogue
yangjf2019 committed Dec 9, 2024
2 parents 275e430 + e18f6ed commit e97fb26
Showing 24 changed files with 292 additions and 52 deletions.
39 changes: 27 additions & 12 deletions docs/content/spark/sql-write.md
@@ -26,41 +26,54 @@ under the License.

# SQL Write

## Syntax
## Insert Table

The `INSERT` statement inserts new rows into a table or overwrites the existing data in the table. The inserted rows can be specified by value expressions or result from a query.

**Syntax**

```sql
INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { value_expr | query };
```
**Parameters**

- **table_identifier**: Specifies a table name, which may be optionally qualified with a database name.

- **part_spec**: An optional parameter that specifies a comma-separated list of key and value pairs for partitions.

For more information, please check the syntax document:
- **column_list**: An optional parameter that specifies a comma-separated list of columns belonging to the table_identifier table. Spark will reorder the columns of the input query to match the table schema according to the specified column list.

[Spark INSERT Statement](https://spark.apache.org/docs/latest/sql-ref-syntax-dml-insert-table.html)
Note: Since Spark 3.4, INSERT INTO commands with explicit column lists comprising fewer columns than the target table will automatically add the corresponding default values for the remaining columns (or NULL for any column lacking an explicitly-assigned default value). In Spark 3.3 or earlier, the column_list's size must equal the target table's column count, otherwise the command fails.

## INSERT INTO
- **value_expr** ( { value | NULL } [ , … ] ) [ , ( … ) ]: Specifies the values to be inserted. Either an explicitly specified value or a NULL can be inserted. A comma must be used to separate each value in the clause. More than one set of values can be specified to insert multiple rows.

For more information, please check the syntax document: [Spark INSERT Statement](https://spark.apache.org/docs/latest/sql-ref-syntax-dml-insert-table.html)
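
As an illustration, here is a minimal sketch that exercises these parameters (the table `my_table(k STRING, v INT, dt STRING)` partitioned by `dt` and the `source_table` are hypothetical placeholders, not part of this document):

```sql
-- insert explicit rows into one partition, listing only the non-partition columns
INSERT INTO my_table PARTITION (dt = '2024-06-01') (k, v) VALUES ('a', 1), ('b', 2);

-- insert the result of a query instead of literal values
INSERT INTO my_table SELECT k, v, dt FROM source_table;
```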

### Insert Into

Use `INSERT INTO` to apply records and changes to tables.

```sql
INSERT INTO my_table SELECT ...
```

## Overwriting the Whole Table
### Insert Overwrite

Use `INSERT OVERWRITE` to overwrite the whole unpartitioned table.
Use `INSERT OVERWRITE` to overwrite the whole table.

```sql
INSERT OVERWRITE my_table SELECT ...
```

### Overwriting a Partition
#### Insert Overwrite Partition

Use `INSERT OVERWRITE` to overwrite a partition.

```sql
INSERT OVERWRITE my_table PARTITION (key1 = value1, key2 = value2, ...) SELECT ...
```

### Dynamic Overwrite
#### Dynamic Overwrite Partition

Spark's default overwrite mode is `static` partition overwrite. To enable dynamic overwrite, set the Spark session configuration `spark.sql.sources.partitionOverwriteMode` to `dynamic`.
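
A minimal sketch of the setting in use (assuming, as a placeholder, a table `my_table(k STRING, v INT, dt STRING)` partitioned by `dt`):

```sql
-- switch the current session to dynamic partition overwrite
SET spark.sql.sources.partitionOverwriteMode = dynamic;

-- only the dt = '2024-06-02' partition produced by the query is replaced;
-- all other partitions are left untouched
INSERT OVERWRITE my_table SELECT 'b', 2, '2024-06-02';
```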

@@ -97,13 +110,15 @@ SELECT * FROM my_table;
*/
```

## Truncate tables
## Truncate Table

The `TRUNCATE TABLE` statement removes all the rows from a table or partition(s).

```sql
TRUNCATE TABLE my_table;
```
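
For a partitioned table, the statement also accepts a partition spec so that only the rows of that partition are removed; a sketch, assuming a placeholder partition column `dt`:

```sql
TRUNCATE TABLE my_table PARTITION (dt = '2024-06-01');
```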

## Updating tables
## Update Table

Spark supports updating PrimitiveType and StructType columns, for example:

@@ -125,13 +140,13 @@ UPDATE t SET name = 'a_new' WHERE id = 1;
UPDATE t SET s.c2 = 'a_new' WHERE s.c1 = 1;
```

## Deleting from table
## Delete From Table

```sql
DELETE FROM my_table WHERE currency = 'UNKNOWN';
```

## Merging into table
## Merge Into Table

Paimon currently supports Merge Into syntax in Spark 3+, which allows a set of updates, insertions, and deletions based on a source table to be applied in a single commit.
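
A minimal sketch of the statement shape (the `target` and `source` tables and their `id`/`v` columns are hypothetical placeholders):

```sql
MERGE INTO target
USING source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET target.v = source.v
WHEN NOT MATCHED THEN INSERT (id, v) VALUES (source.id, source.v);
```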

6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -593,6 +593,12 @@
<td>Duration</td>
<td>The check interval of partition expiration.</td>
</tr>
<tr>
<td><h5>partition.expiration-max-num</h5></td>
<td style="word-wrap: break-word;">100</td>
<td>Integer</td>
<td>The maximum number of partitions to be deleted per partition expiration.</td>
</tr>
<tr>
<td><h5>partition.expiration-strategy</h5></td>
<td style="word-wrap: break-word;">values-time</td>
10 changes: 10 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -809,6 +809,12 @@ public class CoreOptions implements Serializable {
.defaultValue(Duration.ofHours(1))
.withDescription("The check interval of partition expiration.");

public static final ConfigOption<Integer> PARTITION_EXPIRATION_MAX_NUM =
key("partition.expiration-max-num")
.intType()
.defaultValue(100)
.withDescription("The default deleted num of partition expiration.");

public static final ConfigOption<String> PARTITION_TIMESTAMP_FORMATTER =
key("partition.timestamp-formatter")
.stringType()
@@ -2126,6 +2132,10 @@ public Duration partitionExpireCheckInterval() {
return options.get(PARTITION_EXPIRATION_CHECK_INTERVAL);
}

public int partitionExpireMaxNum() {
return options.get(PARTITION_EXPIRATION_MAX_NUM);
}

public PartitionExpireStrategy partitionExpireStrategy() {
return options.get(PARTITION_EXPIRATION_STRATEGY);
}
@@ -309,7 +309,8 @@ public PartitionExpire newPartitionExpire(String commitUser) {
newScan(),
newCommit(commitUser),
metastoreClient,
options.endInputCheckPartitionExpire());
options.endInputCheckPartitionExpire(),
options.partitionExpireMaxNum());
}

@Override
@@ -54,7 +54,7 @@ public class PartitionExpire {
private LocalDateTime lastCheck;
private final PartitionExpireStrategy strategy;
private final boolean endInputCheckPartitionExpire;
private int maxExpires;
private int maxExpireNum;

public PartitionExpire(
Duration expirationTime,
@@ -63,7 +63,8 @@ public PartitionExpire(
FileStoreScan scan,
FileStoreCommit commit,
@Nullable MetastoreClient metastoreClient,
boolean endInputCheckPartitionExpire) {
boolean endInputCheckPartitionExpire,
int maxExpireNum) {
this.expirationTime = expirationTime;
this.checkInterval = checkInterval;
this.strategy = strategy;
@@ -72,7 +73,7 @@ public PartitionExpire(
this.metastoreClient = metastoreClient;
this.lastCheck = LocalDateTime.now();
this.endInputCheckPartitionExpire = endInputCheckPartitionExpire;
this.maxExpires = Integer.MAX_VALUE;
this.maxExpireNum = maxExpireNum;
}

public PartitionExpire(
@@ -81,17 +82,26 @@ public PartitionExpire(
PartitionExpireStrategy strategy,
FileStoreScan scan,
FileStoreCommit commit,
@Nullable MetastoreClient metastoreClient) {
this(expirationTime, checkInterval, strategy, scan, commit, metastoreClient, false);
@Nullable MetastoreClient metastoreClient,
int maxExpireNum) {
this(
expirationTime,
checkInterval,
strategy,
scan,
commit,
metastoreClient,
false,
maxExpireNum);
}

public PartitionExpire withLock(Lock lock) {
this.commit.withLock(lock);
return this;
}

public PartitionExpire withMaxExpires(int maxExpires) {
this.maxExpires = maxExpires;
public PartitionExpire withMaxExpireNum(int maxExpireNum) {
this.maxExpireNum = maxExpireNum;
return this;
}

@@ -145,6 +155,7 @@ private List<Map<String, String>> doExpire(

List<Map<String, String>> expired = new ArrayList<>();
if (!expiredPartValues.isEmpty()) {
// convert partition value to partition string, and limit the partition num
expired = convertToPartitionString(expiredPartValues);
LOG.info("Expire Partitions: {}", expired);
if (metastoreClient != null) {
@@ -175,7 +186,7 @@ private List<Map<String, String>> convertToPartitionString(
.sorted()
.map(s -> s.split(DELIMITER))
.map(strategy::toPartitionString)
.limit(Math.min(expiredPartValues.size(), maxExpires))
.limit(Math.min(expiredPartValues.size(), maxExpireNum))
.collect(Collectors.toList());
}
}
@@ -93,9 +93,10 @@ public String[] call(
.catalogEnvironment()
.metastoreClientFactory())
.map(MetastoreClient.Factory::create)
.orElse(null));
.orElse(null),
fileStore.options().partitionExpireMaxNum());
if (maxExpires != null) {
partitionExpire.withMaxExpires(maxExpires);
partitionExpire.withMaxExpireNum(maxExpires);
}
List<Map<String, String>> expired = partitionExpire.expire(Long.MAX_VALUE);
return expired == null || expired.isEmpty()
@@ -72,7 +72,8 @@ public ExpirePartitionsAction(
.catalogEnvironment()
.metastoreClientFactory())
.map(MetastoreClient.Factory::create)
.orElse(null));
.orElse(null),
fileStore.options().partitionExpireMaxNum());
}

@Override
@@ -97,9 +97,10 @@ public String identifier() {
.catalogEnvironment()
.metastoreClientFactory())
.map(MetastoreClient.Factory::create)
.orElse(null));
.orElse(null),
fileStore.options().partitionExpireMaxNum());
if (maxExpires != null) {
partitionExpire.withMaxExpires(maxExpires);
partitionExpire.withMaxExpireNum(maxExpires);
}
List<Map<String, String>> expired = partitionExpire.expire(Long.MAX_VALUE);
return expired == null || expired.isEmpty()
@@ -265,6 +265,16 @@ protected DataStreamSink<?> buildDynamicBucketSink(
}

protected DataStreamSink<?> buildForFixedBucket(DataStream<InternalRow> input) {
int bucketNums = table.bucketSpec().getNumBuckets();
if (parallelism == null
&& bucketNums < input.getParallelism()
&& table.partitionKeys().isEmpty()) {
// For a non-partitioned table, if bucketNums is less than the input parallelism,
// cap the writer parallelism at bucketNums.
LOG.warn(
"For a non-partitioned table, if bucketNums is less than the parallelism of the input operator,"
+ " the parallelism of the writer operator will be set to bucketNums.");
parallelism = bucketNums;
}
DataStream<InternalRow> partitioned =
partition(
input,
@@ -29,6 +29,7 @@ public class FileStoreSourceReaderMetrics {
private long lastSplitUpdateTime = UNDEFINED;

public static final long UNDEFINED = -1L;
public static final long ACTIVE = Long.MAX_VALUE;

public FileStoreSourceReaderMetrics(MetricGroup sourceReaderMetricGroup) {
sourceReaderMetricGroup.gauge(
@@ -54,9 +54,11 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
private transient IOManager ioManager;

private transient FileStoreSourceReaderMetrics sourceReaderMetrics;
// we create our own gauge for currentEmitEventTimeLag, because this operator is not a FLIP-27
// we create our own gauge for currentEmitEventTimeLag and sourceIdleTime, because this operator
// is not a FLIP-27
// source and Flink can't automatically calculate these metrics
private transient long emitEventTimeLag = FileStoreSourceReaderMetrics.UNDEFINED;
private transient long idleStartTime = FileStoreSourceReaderMetrics.ACTIVE;
private transient Counter numRecordsIn;

public ReadOperator(ReadBuilder readBuilder) {
@@ -69,6 +71,7 @@ public void open() throws Exception {

this.sourceReaderMetrics = new FileStoreSourceReaderMetrics(getMetricGroup());
getMetricGroup().gauge(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG, () -> emitEventTimeLag);
getMetricGroup().gauge(MetricNames.SOURCE_IDLE_TIME, this::getIdleTime);
this.numRecordsIn =
InternalSourceReaderMetricGroup.wrap(getMetricGroup())
.getIOMetricGroup()
Expand All @@ -83,6 +86,7 @@ public void open() throws Exception {
this.read = readBuilder.newRead().withIOManager(ioManager);
this.reuseRow = new FlinkRowData(null);
this.reuseRecord = new StreamRecord<>(reuseRow);
this.idlingStarted();
}

@Override
@@ -94,6 +98,8 @@ public void processElement(StreamRecord<Split> record) throws Exception {
.earliestFileCreationEpochMillis()
.orElse(FileStoreSourceReaderMetrics.UNDEFINED);
sourceReaderMetrics.recordSnapshotUpdate(eventTime);
// update idleStartTime when reading a new split
idleStartTime = FileStoreSourceReaderMetrics.ACTIVE;

boolean firstRecord = true;
try (CloseableIterator<InternalRow> iterator =
@@ -113,6 +119,8 @@ public void processElement(StreamRecord<Split> record) throws Exception {
output.collect(reuseRecord);
}
}
// start idling once all records of the current split have been emitted
this.idlingStarted();
}

@Override
@@ -122,4 +130,18 @@ public void close() throws Exception {
ioManager.close();
}
}

private void idlingStarted() {
if (!isIdling()) {
idleStartTime = System.currentTimeMillis();
}
}

private boolean isIdling() {
return idleStartTime != FileStoreSourceReaderMetrics.ACTIVE;
}

private long getIdleTime() {
return isIdling() ? System.currentTimeMillis() - idleStartTime : 0;
}
}
@@ -415,6 +415,43 @@ public void testNullPartitionExpire() {
.containsExactly("No expired partitions.");
}

@Test
public void testExpirePartitionsWithDefaultNum() throws Exception {
sql(
"CREATE TABLE T ("
+ " k STRING,"
+ " dt STRING,"
+ " PRIMARY KEY (k, dt) NOT ENFORCED"
+ ") PARTITIONED BY (dt) WITH ("
+ " 'bucket' = '1',"
+ " 'partition.expiration-max-num'='2'"
+ ")");
FileStoreTable table = paimonTable("T");

sql("INSERT INTO T VALUES ('a', '2024-06-01')");
sql("INSERT INTO T VALUES ('b', '2024-06-02')");
sql("INSERT INTO T VALUES ('c', '2024-06-03')");
// This partition never expires.
sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09')");
Function<InternalRow, String> consumerReadResult =
(InternalRow row) -> row.getString(0) + ":" + row.getString(1);

assertThat(read(table, consumerReadResult))
.containsExactlyInAnyOrder(
"a:2024-06-01", "b:2024-06-02", "c:2024-06-03", "Never-expire:9999-09-09");

assertThat(
callExpirePartitions(
"CALL sys.expire_partitions("
+ "`table` => 'default.T'"
+ ", expiration_time => '1 d'"
+ ", timestamp_formatter => 'yyyy-MM-dd')"))
.containsExactlyInAnyOrder("dt=2024-06-01", "dt=2024-06-02");

assertThat(read(table, consumerReadResult))
.containsExactlyInAnyOrder("c:2024-06-03", "Never-expire:9999-09-09");
}

/** Return a list of expired partitions. */
public List<String> callExpirePartitions(String callSql) {
return sql(callSql).stream()