Skip to content

Commit

Permalink
[hive] Introduce metastore.tag-to-partition for Hive metastore
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Oct 16, 2023
1 parent d2ddaa9 commit 32f0842
Show file tree
Hide file tree
Showing 20 changed files with 560 additions and 42 deletions.
25 changes: 25 additions & 0 deletions docs/content/migration/_index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
title: Migration
icon: <i class="fa fa-briefcase title maindish" aria-hidden="true"></i>
bold: true
bookCollapseSection: true
weight: 8
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
99 changes: 99 additions & 0 deletions docs/content/migration/upsert-to-partitioned.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
---
title: "Upsert To Partitioned"
weight: 1
type: docs
aliases:
- /migration/upsert-to-partitioned.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Upsert To Partitioned

The [Tag Management]({{< ref "maintenance/manage-tags" >}}) will maintain the manifests and data files of the snapshot.
A typical usage is creating tags daily, then you can maintain the historical data of each day for batch reading.

When using primary key tables, a non-partitioned approach is often used to maintain updates, in order to mirror and
synchronize tables from upstream database tables. This allows users to query the latest data. The tradition of Hive
data warehouses is not like this. Offline data warehouses require an immutable view every day to ensure the idempotence
of calculations. So we created a Tag mechanism to output these views.

However, the traditional use of Hive data warehouses is more accustomed to using partitions to specify the query's Tag,
and is more accustomed to using Hive computing engines.

So, we introduce `'metastore.tag-to-partition'` to mapping a non-partitioned primary key table to the partition table
in Hive metastore, and mapping the partition field to the name of the Tag to be fully compatible with Hive.

## Example

**Step 1: Create table and tag in Flink SQL**

{{< tabs "Create table and tag in Flink SQL" >}}
{{< tab "Flink" >}}
```sql
CREATE CATALOG my_hive WITH (
'type' = 'paimon',
'metastore' = 'hive',
'uri' = 'thrift://<hive-metastore-host-name>:<port>',
-- 'hive-conf-dir' = '...', this is recommended in the kerberos environment
-- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment
'warehouse' = 'hdfs:///path/to/warehouse'
);

USE CATALOG my_hive;

CREATE TABLE mydb.T (
pk INT,
col1 STRING,
col2 STRING
) WITH (
'bucket' = '-1',
'metastore.tag-to-partition' = 'dt'
);

INSERT INTO t VALUES (1, '10', '100'), (2, '20', '200');

-- create tag '2023-10-16' for snapshot 1
CALL my_hive.system.create_tag('mydb.T', '2023-10-16', 1);
```

{{< /tab >}}
{{< /tabs >}}

**Step 2: Query table in Hive with Partition Pruning**

{{< tabs "Query table in Hive with Partition Pruning" >}}
{{< tab "Hive" >}}
```sql
SHOW PARTITIONS T;
/*
OK
dt=2023-10-16
*/

SELECT * FROM T WHERE dt='2023-10-16';
/*
OK
1 10 100 2023-10-16
2 20 200 2023-10-16
*/
```

{{< /tab >}}
{{< /tabs >}}
2 changes: 1 addition & 1 deletion docs/content/project/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ icon: <i class="fa fa-sitemap title maindish" aria-hidden="true"></i>
bold: true
bookCollapseSection: true
sectionBreak: true
weight: 7
weight: 9
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
Expand Down
18 changes: 18 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,12 @@
For example, if you want to list all partitions of a Paimon table in Hive, you need to create this table as a partitioned table in Hive metastore.
This config option does not affect the default filesystem metastore.</td>
</tr>
<tr>
<td><h5>metastore.tag-to-partition</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Whether to create this table as a partitioned table for mapping non-partitioned table tags in metastore. This allows the Hive engine to view this table in a partitioned table view and use partitioning field to read specific partitions (specific tags).</td>
</tr>
<tr>
<td><h5>num-levels</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down Expand Up @@ -557,6 +563,18 @@
<td><p>Enum</p></td>
<td>Whether to create tag automatically. And how to generate tags.<br /><br />Possible values:<ul><li>"none": No automatically created tags.</li><li>"process-time": Based on the time of the machine, create TAG once the processing time passes period time plus delay.</li><li>"watermark": Based on the watermark of the input, create TAG once the watermark passes period time plus delay.</li></ul></td>
</tr>
<tr>
<td><h5>tag.callback.#.param</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Parameter string for the constructor of class #. Callback class should parse the parameter by itself.</td>
</tr>
<tr>
<td><h5>tag.callbacks</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>A list of commit callback classes to be called after a successful tag. Class names are connected with comma (example: com.test.CallbackA,com.sample.CallbackB).</td>
</tr>
<tr>
<td><h5>tag.creation-delay</h5></td>
<td style="word-wrap: break-word;">0 ms</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>read.changelog</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to read row in the form of changelog (add rowkind column in row to represent its change type).</td>
</tr>
<tr>
<td><h5>read.stream.maxBytesPerTrigger</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down Expand Up @@ -56,12 +62,6 @@
<td>Long</td>
<td>The minimum number of rows returned in a single batch, which used to create MinRowsReadLimit with read.stream.maxTriggerDelayMs together.</td>
</tr>
<tr>
<td><h5>read.changelog</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to read row in the form of changelog (add rowkind column in row to represent its change type).</td>
</tr>
<tr>
<td><h5>write.merge-schema</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
46 changes: 44 additions & 2 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.StringUtils;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.lang.reflect.Field;
import java.time.Duration;
Expand Down Expand Up @@ -797,6 +799,23 @@ public class CoreOptions implements Serializable {
"Parameter string for the constructor of class #. "
+ "Callback class should parse the parameter by itself.");

public static final ConfigOption<String> TAG_CALLBACKS =
key("tag.callbacks")
.stringType()
.defaultValue("")
.withDescription(
"A list of commit callback classes to be called after a successful tag. "
+ "Class names are connected with comma "
+ "(example: com.test.CallbackA,com.sample.CallbackB).");

public static final ConfigOption<String> TAG_CALLBACK_PARAM =
key("tag.callback.#.param")
.stringType()
.noDefaultValue()
.withDescription(
"Parameter string for the constructor of class #. "
+ "Callback class should parse the parameter by itself.");

public static final ConfigOption<Boolean> METASTORE_PARTITIONED_TABLE =
key("metastore.partitioned-table")
.booleanType()
Expand All @@ -807,6 +826,15 @@ public class CoreOptions implements Serializable {
+ "you need to create this table as a partitioned table in Hive metastore.\n"
+ "This config option does not affect the default filesystem metastore.");

public static final ConfigOption<String> METASTORE_TAG_TO_PARTITION =
key("metastore.tag-to-partition")
.stringType()
.noDefaultValue()
.withDescription(
"Whether to create this table as a partitioned table for mapping non-partitioned table tags in metastore. "
+ "This allows the Hive engine to view this table in a partitioned table view and "
+ "use partitioning field to read specific partitions (specific tags).");

public static final ConfigOption<TagCreationMode> TAG_AUTOMATIC_CREATION =
key("tag.automatic-creation")
.enumType(TagCreationMode.class)
Expand Down Expand Up @@ -1284,6 +1312,11 @@ public boolean partitionedTableInMetastore() {
return options.get(METASTORE_PARTITIONED_TABLE);
}

@Nullable
public String tagToPartitionField() {
return options.get(METASTORE_TAG_TO_PARTITION);
}

public TagCreationMode tagCreationMode() {
return options.get(TAG_AUTOMATIC_CREATION);
}
Expand Down Expand Up @@ -1319,14 +1352,23 @@ public Map<String, String> getFieldDefaultValues() {
}

public Map<String, String> commitCallbacks() {
return callbacks(COMMIT_CALLBACKS, COMMIT_CALLBACK_PARAM);
}

public Map<String, String> tagCallbacks() {
return callbacks(TAG_CALLBACKS, TAG_CALLBACK_PARAM);
}

private Map<String, String> callbacks(
ConfigOption<String> callbacks, ConfigOption<String> callbackParam) {
Map<String, String> result = new HashMap<>();
for (String className : options.get(COMMIT_CALLBACKS).split(",")) {
for (String className : options.get(callbacks).split(",")) {
className = className.trim();
if (className.length() == 0) {
continue;
}

String param = options.get(COMMIT_CALLBACK_PARAM.key().replace("#", className));
String param = options.get(callbackParam.key().replace("#", className));
result.put(className, param);
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ public JoinedRow replace(InternalRow row1, InternalRow row2) {
return this;
}

public InternalRow row1() {
return row1;
}

public InternalRow row2() {
return row2;
}

// ---------------------------------------------------------------------------------------------

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.metastore;

import org.apache.paimon.table.sink.TagCallback;

import java.util.LinkedHashMap;

/** A {@link TagCallback} to add newly created partitions to metastore. */
public class AddPartitionTagCallback implements TagCallback {

private final MetastoreClient client;
private final String partitionField;

public AddPartitionTagCallback(MetastoreClient client, String partitionField) {
this.client = client;
this.partitionField = partitionField;
}

@Override
public void notifyCreation(String tagName) {
LinkedHashMap<String, String> partitionSpec = new LinkedHashMap<>();
partitionSpec.put(partitionField, tagName);
try {
client.addPartition(partitionSpec);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public void close() throws Exception {
client.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.data.BinaryRow;

import java.io.Serializable;
import java.util.LinkedHashMap;

/**
* A metastore client related to a table. All methods of this interface operate on the same specific
Expand All @@ -30,6 +31,8 @@ public interface MetastoreClient extends AutoCloseable {

void addPartition(BinaryRow partition) throws Exception;

void addPartition(LinkedHashMap<String, String> partitionSpec) throws Exception;

/** Factory to create {@link MetastoreClient}. */
interface Factory extends Serializable {

Expand Down
Loading

0 comments on commit 32f0842

Please sign in to comment.