[Flink] Support flink migrate hive database #2827

Merged (9 commits) on Feb 22, 2024
49 changes: 48 additions & 1 deletion docs/content/migration/migration-from-hive.md
@@ -31,11 +31,13 @@ When migrating data to a paimon table, the origin table will permanently disappear, so back it up first if you
still need the original table. The migrated table will be an [unaware-bucket append table]({{< ref "concepts/append-table/append-scalable-table" >}}).

Now, we can use paimon hive catalog with Migrate Table Procedure and Migrate File Procedure to fully migrate a table from hive to paimon.
Likewise, you can use paimon hive catalog with Migrate Database Procedure to migrate all tables in a hive database to paimon.

* Migrate Table Procedure: if the paimon table does not exist, use this procedure to upgrade the hive table to a paimon table. The hive table will disappear after the action is done.
* Migrate Database Procedure: if the paimon tables do not exist, use this procedure to upgrade all hive tables in the database to paimon tables. All hive tables will disappear after the action is done.
* Migrate File Procedure: if the paimon table already exists, use this procedure to migrate files from a hive table into the paimon table. **Notice that the hive table will also disappear after the action is done.**

These three procedures now support the hive file formats "orc", "parquet", and "avro".

<span style="color: red; "> **We highly recommend backing up hive table data before migrating, because the migration action is not atomic. If it is interrupted, you may lose your data.** </span>

@@ -86,6 +88,51 @@ Example:
--table default.hive_or_paimon \
```

**Migrate Hive Database**

Command: <br>

***CALL <font color="green">sys.migrate_database</font>(&#39;hive&#39;, &#39;&lt;hive_database&gt;&#39;, &#39;&lt;paimon_tableconf&gt;&#39;);***

**Example**

```sql
CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:9083', 'warehouse'='/path/to/warehouse/');

USE CATALOG PAIMON;

CALL sys.migrate_database('hive', 'default', 'file.format=orc');
```
After invocation, all tables in the "default" database are fully converted to paimon format. Writing and reading the tables the old "hive way" will fail.
We can add table properties while importing via sys.migrate_database('hive', '<database>', '<tableproperties>').
Entries in <tableproperties> should be separated by ",". For example:

```sql
CALL sys.migrate_database('hive', 'my_db', 'file.format=orc,read.batch-size=2096,write-only=true')
```
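The table-conf string is a plain comma-separated list of `key=value` pairs, parsed by Paimon's `ParameterUtils.parseCommaSeparatedKeyValues`. As a rough standalone illustration of the expected shape (a hypothetical `TableConfParser`, not Paimon's actual parser):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only: parses "k1=v1,k2=v2" into an ordered map,
// mirroring the shape sys.migrate_database expects for its options string.
public class TableConfParser {
    public static Map<String, String> parse(String conf) {
        Map<String, String> options = new LinkedHashMap<>();
        if (conf == null || conf.trim().isEmpty()) {
            return options;
        }
        for (String pair : conf.split(",")) {
            // Split on the first '=' only, so values may themselves contain '='.
            String[] kv = pair.split("=", 2);
            if (kv.length != 2) {
                throw new IllegalArgumentException("Expected key=value, got: " + pair);
            }
            options.put(kv[0].trim(), kv[1].trim());
        }
        return options;
    }

    public static void main(String[] args) {
        Map<String, String> opts =
                parse("file.format=orc,read.batch-size=2096,write-only=true");
        System.out.println(opts.get("file.format"));     // orc
        System.out.println(opts.get("read.batch-size")); // 2096
    }
}
```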

If your flink version is below 1.17, you can use flink action to achieve this:
```bash
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-{{< version >}}.jar \
migrate_database \
--warehouse <warehouse-path> \
--source_type hive \
--database <database> \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
[--options <paimon-table-conf [,paimon-table-conf ...]> ]
```

Example:
```bash
<FLINK_HOME>/bin/flink run ./paimon-flink-action-{{< version >}}.jar migrate_database \
--warehouse /path/to/warehouse \
--catalog_conf uri=thrift://localhost:9083 \
--catalog_conf metastore=hive \
--source_type hive \
--database default
```

**Migrate Hive File**

Command: <br>
37 changes: 37 additions & 0 deletions paimon-flink/paimon-flink-common/pom.xml
@@ -67,6 +67,43 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>${hive.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.pentaho</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import org.apache.paimon.flink.procedure.MigrateDatabaseProcedure;

import org.apache.flink.table.procedure.DefaultProcedureContext;

import java.util.Map;

/** Migrate all hive tables in an external database to paimon tables. */
public class MigrateDatabaseAction extends ActionBase {
private final String connector;
private final String hiveDatabaseName;
private final String tableProperties;

public MigrateDatabaseAction(
String connector,
String warehouse,
String hiveDatabaseName,
Map<String, String> catalogConfig,
String tableProperties) {
super(warehouse, catalogConfig);
this.connector = connector;
this.hiveDatabaseName = hiveDatabaseName;
this.tableProperties = tableProperties;
}

@Override
public void run() throws Exception {
MigrateDatabaseProcedure migrateDatabaseProcedure = new MigrateDatabaseProcedure();
migrateDatabaseProcedure.withCatalog(catalog);
migrateDatabaseProcedure.call(
new DefaultProcedureContext(env), connector, hiveDatabaseName, tableProperties);
}
}
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import java.util.Map;
import java.util.Optional;

/** Action Factory for {@link MigrateDatabaseAction}. */
public class MigrateDatabaseActionFactory implements ActionFactory {

public static final String IDENTIFIER = "migrate_database";

private static final String SOURCE_TYPE = "source_type";
private static final String OPTIONS = "options";

@Override
public String identifier() {
return IDENTIFIER;
}

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
String warehouse = params.get(WAREHOUSE);
String connector = params.get(SOURCE_TYPE);
String sourceHiveDatabase = params.get(DATABASE);
Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
String tableConf = params.get(OPTIONS);

MigrateDatabaseAction migrateDatabaseAction =
new MigrateDatabaseAction(
connector, warehouse, sourceHiveDatabase, catalogConfig, tableConf);
return Optional.of(migrateDatabaseAction);
}

@Override
public void printHelp() {
System.out.println(
"Action \"migrate_database\" migrates all tables in a database from hive to paimon.");
System.out.println();

System.out.println("Syntax:");
System.out.println(
" migrate_database --warehouse <warehouse_path> --source_type hive "
+ "--database <database_name> "
+ "[--catalog_conf <key>=<value>] "
+ "[--options <key>=<value>,<key>=<value>,...]");
}
}
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.TableMigrationUtils;
import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.utils.ParameterUtils;

import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/** Procedure to migrate all hive tables in a database to paimon tables. */
public class MigrateDatabaseProcedure extends ProcedureBase {

private static final Logger LOG = LoggerFactory.getLogger(MigrateDatabaseProcedure.class);
private static final String PAIMON_SUFFIX = "_paimon_";

@Override
public String identifier() {
return "migrate_database";
}

public String[] call(
ProcedureContext procedureContext, String connector, String sourceDatabasePath)
throws Exception {
return call(procedureContext, connector, sourceDatabasePath, "");
}

public String[] call(
ProcedureContext procedureContext,
String connector,
String sourceDatabasePath,
String properties)
throws Exception {
if (!(catalog instanceof HiveCatalog)) {
throw new IllegalArgumentException("Only support Hive Catalog");
}
HiveCatalog hiveCatalog = (HiveCatalog) this.catalog;
IMetaStoreClient client = hiveCatalog.getHmsClient();
List<String> sourceTables = client.getAllTables(sourceDatabasePath);
for (String sourceTable : sourceTables) {
String sourceTablePath = sourceDatabasePath + "." + sourceTable;
String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;

Identifier sourceTableId = Identifier.fromString(sourceTablePath);
Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);

TableMigrationUtils.getImporter(
connector,
(HiveCatalog) this.catalog,
sourceTableId.getDatabaseName(),
sourceTableId.getObjectName(),
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
ParameterUtils.parseCommaSeparatedKeyValues(properties))
.executeMigrate();

LOG.info("rename {} to {}", targetTableId, sourceTableId);
this.catalog.renameTable(targetTableId, sourceTableId, false);
}
return new String[] {"Success"};
}
}
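The procedure above migrates each source table into a temporary target whose name is the source path plus the `_paimon_` suffix, then renames that target back over the original identifier. A minimal sketch of that naming scheme (plain strings in a hypothetical `MigrationNaming` helper, standing in for Paimon's `Identifier` class):

```java
// Illustrative sketch of MigrateDatabaseProcedure's naming convention:
// "<db>.<table>" is migrated into "<db>.<table>_paimon_", which is then
// renamed back to "<db>.<table>" once the file migration succeeds.
public class MigrationNaming {
    static final String PAIMON_SUFFIX = "_paimon_";

    static String targetPath(String database, String table) {
        // Temporary paimon table path for a given hive source table.
        return database + "." + table + PAIMON_SUFFIX;
    }

    public static void main(String[] args) {
        System.out.println(targetPath("default", "orders")); // default.orders_paimon_
    }
}
```

Because the final rename happens per table inside the loop, an interruption mid-run leaves some tables migrated and others untouched, which is why the docs stress that the action is not atomic.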
@@ -24,6 +24,7 @@
org.apache.paimon.flink.action.CreateTagActionFactory
org.apache.paimon.flink.action.DeleteTagActionFactory
org.apache.paimon.flink.action.ResetConsumerActionFactory
org.apache.paimon.flink.action.MigrateTableActionFactory
org.apache.paimon.flink.action.MigrateDatabaseActionFactory
org.apache.paimon.flink.action.RemoveOrphanFilesActionFactory
org.apache.paimon.flink.action.QueryServiceActionFactory

@@ -39,6 +40,7 @@
org.apache.paimon.flink.procedure.MergeIntoProcedure
org.apache.paimon.flink.procedure.ResetConsumerProcedure
org.apache.paimon.flink.procedure.RollbackToProcedure
org.apache.paimon.flink.procedure.MigrateTableProcedure
org.apache.paimon.flink.procedure.MigrateDatabaseProcedure
org.apache.paimon.flink.procedure.MigrateFileProcedure
org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure
org.apache.paimon.flink.procedure.QueryServiceProcedure