Skip to content

Commit

Permalink
Normalize basePath in targetTables in ConversionController
Browse files Browse the repository at this point in the history
  • Loading branch information
vinishjail97 committed Dec 14, 2024
1 parent a360aff commit af55915
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public <COMMIT> Map<String, SyncResult> sync(
if (config.getTargetTables() == null || config.getTargetTables().isEmpty()) {
throw new IllegalArgumentException("Please provide at-least one format to sync");
}

config = ConversionUtils.normalizeTargetPaths(config);
try (ConversionSource<COMMIT> conversionSource =
conversionSourceProvider.getConversionSourceInstance(config.getSourceTable())) {
ExtractFromSource<COMMIT> source = ExtractFromSource.of(conversionSource);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.conversion;

import java.util.List;
import java.util.stream.Collectors;

import org.apache.xtable.model.storage.TableFormat;

public class ConversionUtils {

/**
* Adjusts the location of metadata for few table formats need to be at the root level of the data
* files. Eg: An iceberg table generated through spark will have two directories basePath/data and
* basePath/metadata For synchronising the iceberg metadata to hudi and delta, they need to be
* present in basePath/data/.hoodie and basePath/data/_delta_log.
*
* @param config conversion config for synchronizing source and target tables
* @return updated table config.
*/
public static ConversionConfig normalizeTargetPaths(ConversionConfig config) {
if (!config.getSourceTable().getDataPath().equals(config.getSourceTable().getBasePath())
&& config.getSourceTable().getFormatName().equals(TableFormat.ICEBERG)) {
List<TargetTable> updatedTargetTables =
config.getTargetTables().stream()
.filter(
targetTable ->
targetTable.getFormatName().equals(TableFormat.HUDI)
|| targetTable.getFormatName().equals(TableFormat.DELTA))
.map(
targetTable ->
targetTable.toBuilder()
.basePath(config.getSourceTable().getDataPath())
.build())
.collect(Collectors.toList());
return new ConversionConfig(
config.getSourceTable(), updatedTargetTables, config.getSyncMode());
}
return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1014,7 +1014,7 @@ private static ConversionConfig getTableSyncConfig(
.name(tableName)
.formatName(formatName)
// set the metadata path to the data path as the default (required by Hudi)
.basePath(table.getDataPath())
.basePath(table.getBasePath())
.metadataRetention(metadataRetention)
.build())
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.conversion;

import static org.junit.jupiter.api.Assertions.*;

import java.util.Arrays;

import org.junit.jupiter.api.Test;

import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.model.sync.SyncMode;

class TestConversionUtils {

@Test
void testNormalizeTargetPaths() {
ConversionConfig config =
ConversionConfig.builder()
.sourceTable(
SourceTable.builder()
.name("table_name")
.formatName(TableFormat.ICEBERG)
.basePath("/tmp/basePath")
.dataPath("/tmp/basePath/data")
.build())
.syncMode(SyncMode.FULL)
.targetTables(
Arrays.asList(
TargetTable.builder()
.name("table_name")
.basePath("/tmp/basePath")
.formatName(TableFormat.DELTA)
.build(),
TargetTable.builder()
.name("table_name")
.basePath("/tmp/basePath")
.formatName(TableFormat.HUDI)
.build()))
.build();
ConversionConfig expectedNormalizedConfig =
ConversionConfig.builder()
.sourceTable(
SourceTable.builder()
.name("table_name")
.formatName(TableFormat.ICEBERG)
.basePath("/tmp/basePath")
.dataPath("/tmp/basePath/data")
.build())
.syncMode(SyncMode.FULL)
.targetTables(
Arrays.asList(
TargetTable.builder()
.name("table_name")
.basePath("/tmp/basePath/data")
.formatName(TableFormat.DELTA)
.build(),
TargetTable.builder()
.name("table_name")
.basePath("/tmp/basePath/data")
.formatName(TableFormat.HUDI)
.build()))
.build();
ConversionConfig actualConfig = ConversionUtils.normalizeTargetPaths(config);
assertEquals(expectedNormalizedConfig, actualConfig);
}

@Test
void testNormalizeTargetPathsNoOp() {
ConversionConfig config =
ConversionConfig.builder()
.sourceTable(
SourceTable.builder()
.name("table_name")
.formatName(TableFormat.HUDI)
.basePath("/tmp/basePath")
.build())
.syncMode(SyncMode.FULL)
.targetTables(
Arrays.asList(
TargetTable.builder()
.name("table_name")
.basePath("/tmp/basePath")
.formatName(TableFormat.ICEBERG)
.build(),
TargetTable.builder()
.name("table_name")
.basePath("/tmp/basePath")
.formatName(TableFormat.DELTA)
.build()))
.build();
ConversionConfig actualConfig = ConversionUtils.normalizeTargetPaths(config);
assertEquals(config, actualConfig);
}
}

0 comments on commit af55915

Please sign in to comment.