diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java index dc6659696..bc5f5e02d 100644 --- a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java +++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java @@ -85,7 +85,7 @@ public Map sync( if (config.getTargetTables() == null || config.getTargetTables().isEmpty()) { throw new IllegalArgumentException("Please provide at-least one format to sync"); } - + config = ConversionUtils.normalizeTargetPaths(config); try (ConversionSource conversionSource = conversionSourceProvider.getConversionSourceInstance(config.getSourceTable())) { ExtractFromSource source = ExtractFromSource.of(conversionSource); diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java new file mode 100644 index 000000000..fdeedc9bb --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.conversion; + +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.xtable.model.storage.TableFormat; + +public class ConversionUtils { + + /** + * Few table formats need the metadata to be located at the root level of the data files. Eg: An + * iceberg table generated through spark will have two directories basePath/data and + * basePath/metadata For synchronising the iceberg metadata to hudi and delta, they need to be + * present in basePath/data/.hoodie and basePath/data/_delta_log. + * + * @param config conversion config for synchronizing source and target tables + * @return updated table config. + */ + public static ConversionConfig normalizeTargetPaths(ConversionConfig config) { + if (!config.getSourceTable().getDataPath().equals(config.getSourceTable().getBasePath()) + && config.getSourceTable().getFormatName().equals(TableFormat.ICEBERG)) { + List updatedTargetTables = + config.getTargetTables().stream() + .filter( + targetTable -> + targetTable.getFormatName().equals(TableFormat.HUDI) + || targetTable.getFormatName().equals(TableFormat.DELTA)) + .map( + targetTable -> + targetTable.toBuilder() + .basePath(config.getSourceTable().getDataPath()) + .build()) + .collect(Collectors.toList()); + return new ConversionConfig( + config.getSourceTable(), updatedTargetTables, config.getSyncMode()); + } + return config; + } +} diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java index 3d539766a..b5ffcdf13 100644 --- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java @@ -1013,8 +1013,7 @@ private static ConversionConfig getTableSyncConfig( TargetTable.builder() .name(tableName) .formatName(formatName) - // set the metadata path to the data path as the default (required by Hudi) - .basePath(table.getDataPath()) + .basePath(table.getBasePath()) .metadataRetention(metadataRetention) .build()) .collect(Collectors.toList()); diff --git a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionUtils.java b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionUtils.java new file mode 100644 index 000000000..b1044039f --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionUtils.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.conversion; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.Arrays; + +import org.junit.jupiter.api.Test; + +import org.apache.xtable.model.storage.TableFormat; +import org.apache.xtable.model.sync.SyncMode; + +class TestConversionUtils { + + @Test + void testNormalizeTargetPaths() { + ConversionConfig config = + ConversionConfig.builder() + .sourceTable( + SourceTable.builder() + .name("table_name") + .formatName(TableFormat.ICEBERG) + .basePath("/tmp/basePath") + .dataPath("/tmp/basePath/data") + .build()) + .syncMode(SyncMode.FULL) + .targetTables( + Arrays.asList( + TargetTable.builder() + .name("table_name") + .basePath("/tmp/basePath") + .formatName(TableFormat.DELTA) + .build(), + TargetTable.builder() + .name("table_name") + .basePath("/tmp/basePath") + .formatName(TableFormat.HUDI) + .build())) + .build(); + ConversionConfig expectedNormalizedConfig = + ConversionConfig.builder() + .sourceTable( + SourceTable.builder() + .name("table_name") + .formatName(TableFormat.ICEBERG) + .basePath("/tmp/basePath") + .dataPath("/tmp/basePath/data") + .build()) + .syncMode(SyncMode.FULL) + .targetTables( + Arrays.asList( + TargetTable.builder() + .name("table_name") + .basePath("/tmp/basePath/data") + .formatName(TableFormat.DELTA) + .build(), + TargetTable.builder() + .name("table_name") + .basePath("/tmp/basePath/data") + .formatName(TableFormat.HUDI) + .build())) + .build(); + ConversionConfig actualConfig = ConversionUtils.normalizeTargetPaths(config); + assertEquals(expectedNormalizedConfig, actualConfig); + } + + @Test + void testNormalizeTargetPathsNoOp() { + ConversionConfig config = + ConversionConfig.builder() + .sourceTable( + SourceTable.builder() + .name("table_name") + .formatName(TableFormat.HUDI) + .basePath("/tmp/basePath") + .build()) + .syncMode(SyncMode.FULL) + .targetTables( + Arrays.asList( + TargetTable.builder() + .name("table_name") + .basePath("/tmp/basePath") + .formatName(TableFormat.ICEBERG) + .build(), + TargetTable.builder() + .name("table_name") + .basePath("/tmp/basePath") + .formatName(TableFormat.DELTA) + .build())) + .build(); + ConversionConfig actualConfig = ConversionUtils.normalizeTargetPaths(config); + assertEquals(config, actualConfig); + } +}