-
Notifications
You must be signed in to change notification settings - Fork 1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[flink][hive] Introduce procedure to migrate table from hive to paimon (
- Loading branch information
1 parent
34a619f
commit c7a17b2
Showing
26 changed files
with
1,476 additions
and
53 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
--- | ||
title: "Migration From Hive" | ||
weight: 1 | ||
type: docs | ||
aliases: | ||
- /migration/migration-from-hive.html | ||
--- | ||
<!-- | ||
Licensed to the Apache Software Foundation (ASF) under one | ||
or more contributor license agreements. See the NOTICE file | ||
distributed with this work for additional information | ||
regarding copyright ownership. The ASF licenses this file | ||
to you under the Apache License, Version 2.0 (the | ||
"License"); you may not use this file except in compliance | ||
with the License. You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, | ||
software distributed under the License is distributed on an | ||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
KIND, either express or implied. See the License for the | ||
specific language governing permissions and limitations | ||
under the License. | ||
--> | ||
|
||
# Hive Table Migration | ||
|
||
Apache Hive supports ORC, Parquet file formats that could be migrated to Paimon. | ||
When migrating data to a paimon table, the origin table will be permanently disappeared. So please back up your data if you | ||
still need the original table. The migrated table will be [unaware-bucket append-only table]({{< ref "concepts/append-only-table#append-for-scalable-table" >}}). | ||
|
||
Now, we can use paimon hive catalog with Migrate Table Procedure and Migrate File Procedure to totally migrate a table from hive to paimon. | ||
|
||
* Migrate Table Procedure: Paimon table does not exist, use the procedure upgrade hive table to paimon table. Hive table will disappear after action done. | ||
* Migrate File Procedure: Paimon table already exists, use the procedure to migrate files from hive table to paimon table. **Notice that, Hive table will also disappear after action done.** | ||
|
||
These two actions now only support file format of hive "orc" and "parquet", if your table partition formatted by other format like avro, these procedures will fail. | ||
But we will support avro format in the future. Please make sure your table partition format is in "orc" and "parquet" now. | ||
|
||
<span style="color: red; "> **We highly recommend to back up hive table data before migrating, because migrating action is not atomic. If been interrupted while migrating, you may lose your data.** </span> | ||
|
||
## Example for Migration | ||
|
||
**Migrate Hive Table** | ||
|
||
Command: <br> | ||
|
||
***CALL <font color="green">sys.migrate_table</font>('hive', '<hive_database>.<hive_tablename>', '<paimon_tableconf>');*** | ||
|
||
**Example** | ||
|
||
```sql | ||
CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:9083', 'warehouse'='/path/to/warehouse/'); | ||
|
||
USE CATALOG PAIMON; | ||
|
||
CALL sys.migrate_table('hive', 'default.hivetable', 'file.format=orc'); | ||
``` | ||
After invoke, "hivetable" will totally convert to paimon format. Writing and reading the table by old "hive way" will fail. | ||
We can add our table properties while importing by sys.migrate_table('<database>.<tablename>', '<tableproperties>'). | ||
<tableproperties> here should be separated by ",". For example: | ||
|
||
```sql | ||
CALL sys.migrate_table('hive', 'my_db.wait_to_upgrate', 'file.format=orc,read.batch-size=2096,write-only=true') | ||
``` | ||
|
||
If your flink version is below 1.17, you can use flink action to achieve this: | ||
```bash | ||
<FLINK_HOME>/bin/flink run \ | ||
/path/to/paimon-flink-action-{{< version >}}.jar \ | ||
migrate_table | ||
--warehouse <warehouse-path> \ | ||
--source-table-type hive \ | ||
--source-table-id <database.table-name> \ | ||
[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \ | ||
[--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]] | ||
``` | ||
Example: | ||
```bash | ||
<FLINK_HOME>/flink run ./paimon-flink-action-0.7-SNAPSHOT.jar migrate_table \ | ||
--warehouse /path/to/warehouse \ | ||
--catalog-conf uri=thrift://localhost:9083 \ | ||
--catalog-conf metastore=hive \ | ||
--source-table-type hive \ | ||
--source-table-id default.hive_or_paimon \ | ||
``` | ||
**Migrate Hive File** | ||
Command: <br> | ||
***CALL <font color="green">sys.migrate_file</font>('hive', '<hive_database>.<hive_table_name>', '<paimon_database>.<paimon_tablename>');*** | ||
**Example** | ||
```sql | ||
CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:9083', 'warehouse'='/path/to/warehouse/'); | ||
USE CATALOG PAIMON; | ||
CALL sys.migrate_file('hive', 'default.hivetable', 'default.paimontable'); | ||
``` | ||
After invoke, "hivetable" will disappear. And all files will be moved and renamed to paimon directory. "paimontable" here must have the same | ||
partition keys with "hivetable", and "paimontable" should be in unaware-bucket mode. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
186 changes: 186 additions & 0 deletions
186
paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,186 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.paimon.migrate; | ||
|
||
import org.apache.paimon.data.BinaryRow; | ||
import org.apache.paimon.data.BinaryRowWriter; | ||
import org.apache.paimon.data.BinaryWriter; | ||
import org.apache.paimon.format.FieldStats; | ||
import org.apache.paimon.format.FileFormat; | ||
import org.apache.paimon.format.TableStatsExtractor; | ||
import org.apache.paimon.fs.FileIO; | ||
import org.apache.paimon.fs.FileStatus; | ||
import org.apache.paimon.fs.Path; | ||
import org.apache.paimon.io.CompactIncrement; | ||
import org.apache.paimon.io.DataFileMeta; | ||
import org.apache.paimon.io.NewFilesIncrement; | ||
import org.apache.paimon.statistics.FieldStatsCollector; | ||
import org.apache.paimon.stats.BinaryTableStats; | ||
import org.apache.paimon.stats.FieldStatsArraySerializer; | ||
import org.apache.paimon.table.AbstractFileStoreTable; | ||
import org.apache.paimon.table.Table; | ||
import org.apache.paimon.table.sink.CommitMessage; | ||
import org.apache.paimon.table.sink.CommitMessageImpl; | ||
import org.apache.paimon.types.DataField; | ||
import org.apache.paimon.types.RowType; | ||
import org.apache.paimon.utils.Pair; | ||
import org.apache.paimon.utils.StatsCollectorFactories; | ||
import org.apache.paimon.utils.TypeUtils; | ||
|
||
import java.io.IOException; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.function.Predicate; | ||
import java.util.stream.Collectors; | ||
|
||
/** To construct file meta data for external files. */ | ||
public class FileMetaUtils { | ||
|
||
public static List<DataFileMeta> construct( | ||
FileIO fileIO, | ||
String format, | ||
String location, | ||
Table paimonTable, | ||
Predicate<FileStatus> filter, | ||
Path dir, | ||
Map<Path, Path> rollback) | ||
throws IOException { | ||
List<FileStatus> fileStatuses = | ||
Arrays.stream(fileIO.listStatus(new Path(location))) | ||
.filter(s -> !s.isDir()) | ||
.filter(filter) | ||
.collect(Collectors.toList()); | ||
|
||
return fileStatuses.stream() | ||
.map( | ||
status -> | ||
constructFileMeta( | ||
format, status, fileIO, paimonTable, dir, rollback)) | ||
.collect(Collectors.toList()); | ||
} | ||
|
||
public static CommitMessage commitFile(BinaryRow partition, List<DataFileMeta> dataFileMetas) { | ||
return new CommitMessageImpl( | ||
partition, | ||
0, | ||
new NewFilesIncrement(dataFileMetas, Collections.emptyList()), | ||
new CompactIncrement( | ||
Collections.emptyList(), Collections.emptyList(), Collections.emptyList())); | ||
} | ||
|
||
// -----------------------------private method--------------------------------------------- | ||
|
||
private static DataFileMeta constructFileMeta( | ||
String format, | ||
FileStatus fileStatus, | ||
FileIO fileIO, | ||
Table table, | ||
Path dir, | ||
Map<Path, Path> rollback) { | ||
|
||
try { | ||
FieldStatsCollector.Factory[] factories = | ||
StatsCollectorFactories.createStatsFactories( | ||
((AbstractFileStoreTable) table).coreOptions(), | ||
table.rowType().getFieldNames()); | ||
|
||
TableStatsExtractor tableStatsExtractor = | ||
FileFormat.getFileFormat( | ||
((AbstractFileStoreTable) table) | ||
.coreOptions() | ||
.toConfiguration(), | ||
format) | ||
.createStatsExtractor(table.rowType(), factories) | ||
.orElseThrow( | ||
() -> | ||
new RuntimeException( | ||
"Can't get table stats extractor for format " | ||
+ format)); | ||
Path newPath = renameFile(fileIO, fileStatus.getPath(), dir, format, rollback); | ||
return constructFileMeta( | ||
newPath.getName(), | ||
fileStatus.getLen(), | ||
newPath, | ||
tableStatsExtractor, | ||
fileIO, | ||
table); | ||
} catch (IOException e) { | ||
throw new RuntimeException("error when construct file meta", e); | ||
} | ||
} | ||
|
||
private static Path renameFile( | ||
FileIO fileIO, Path originPath, Path newDir, String format, Map<Path, Path> rollback) | ||
throws IOException { | ||
String subfix = "." + format; | ||
String fileName = originPath.getName(); | ||
String newFileName = fileName.endsWith(subfix) ? fileName : fileName + "." + format; | ||
Path newPath = new Path(newDir, newFileName); | ||
rollback.put(newPath, originPath); | ||
fileIO.rename(originPath, newPath); | ||
return newPath; | ||
} | ||
|
||
private static DataFileMeta constructFileMeta( | ||
String fileName, | ||
long fileSize, | ||
Path path, | ||
TableStatsExtractor tableStatsExtractor, | ||
FileIO fileIO, | ||
Table table) | ||
throws IOException { | ||
FieldStatsArraySerializer statsArraySerializer = | ||
new FieldStatsArraySerializer(table.rowType()); | ||
|
||
Pair<FieldStats[], TableStatsExtractor.FileInfo> fileInfo = | ||
tableStatsExtractor.extractWithFileInfo(fileIO, path); | ||
BinaryTableStats stats = statsArraySerializer.toBinary(fileInfo.getLeft()); | ||
|
||
return DataFileMeta.forAppend( | ||
fileName, | ||
fileSize, | ||
fileInfo.getRight().getRowCount(), | ||
stats, | ||
0, | ||
0, | ||
((AbstractFileStoreTable) table).schema().id()); | ||
} | ||
|
||
public static BinaryRow writePartitionValue( | ||
RowType partitionRowType, | ||
Map<String, String> partitionValues, | ||
List<BinaryWriter.ValueSetter> valueSetters) { | ||
|
||
BinaryRow binaryRow = new BinaryRow(partitionRowType.getFieldCount()); | ||
BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow); | ||
|
||
List<DataField> fields = partitionRowType.getFields(); | ||
|
||
for (int i = 0; i < fields.size(); i++) { | ||
Object value = | ||
TypeUtils.castFromString( | ||
partitionValues.get(fields.get(i).name()), fields.get(i).type()); | ||
valueSetters.get(i).setValue(binaryRowWriter, i, value); | ||
} | ||
binaryRowWriter.complete(); | ||
return binaryRow; | ||
} | ||
} |
25 changes: 25 additions & 0 deletions
25
paimon-core/src/main/java/org/apache/paimon/migrate/Migrator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.paimon.migrate; | ||
|
||
/** Migrator interface for migrating table from other data-lake like hive, iceberg, hudi and etc. */ | ||
public interface Migrator { | ||
|
||
void executeMigrate() throws Exception; | ||
} |
Oops, something went wrong.