Revert "Ignore void partitions in metacat (#621)" #624

Merged 1 commit on Dec 9, 2024
@@ -188,7 +188,7 @@ public TableInfo fromIcebergTableToTableInfo(final QualifiedName name,
final TableInfo tableInfo) {
final org.apache.iceberg.Table table = tableWrapper.getTable();
final List<FieldInfo> allFields =
- this.hiveTypeConverter.icebergSchemaTofieldDtos(table.schema(), table.spec().fields());
+ this.hiveTypeConverter.icebergeSchemaTofieldDtos(table.schema(), table.spec().fields());
final Map<String, String> tableParameters = new HashMap<>();
tableParameters.put(DirectSqlTable.PARAM_TABLE_TYPE, DirectSqlTable.ICEBERG_TABLE_TYPE);
tableParameters.put(DirectSqlTable.PARAM_METADATA_LOCATION, tableLoc);
@@ -108,13 +108,12 @@ public Type toMetacatType(final String type) {
* @param partitionFields partitioned fields
* @return list of field Info
*/
- public List<FieldInfo> icebergSchemaTofieldDtos(final Schema schema,
+ public List<FieldInfo> icebergeSchemaTofieldDtos(final Schema schema,
final List<PartitionField> partitionFields) {
final List<FieldInfo> fields = Lists.newArrayList();
- final List<String> partitionNames = partitionFields.stream()
- .filter(f -> f.transform() != null && !f.transform().toString().equalsIgnoreCase("void"))
- .map(f -> schema.findField(f.sourceId()).name())
- .collect(Collectors.toList());
+ final List<String> partitionNames =
+ partitionFields.stream()
+ .map(f -> schema.findField(f.sourceId()).name()).collect(Collectors.toList());

for (Types.NestedField field : schema.columns()) {
final FieldInfo fieldInfo = new FieldInfo();
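For context on the hunk above, here is a minimal standalone sketch (not part of this PR; the class and method names are hypothetical) contrasting the two ways of deriving partition-key names. Both stream pipelines are copied from the old and new method bodies shown in the diff.

import java.util.List;
import java.util.stream.Collectors;

import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Schema;

// Hypothetical helper class, used only to illustrate the behavioral difference.
final class PartitionNameSketch {

    // Behavior restored by this revert: every partition field's source column
    // is reported as a partition key, including fields with a void transform.
    static List<String> allPartitionNames(final Schema schema, final List<PartitionField> partitionFields) {
        return partitionFields.stream()
                .map(f -> schema.findField(f.sourceId()).name())
                .collect(Collectors.toList());
    }

    // Behavior removed by this revert (introduced in #621): partition fields
    // whose transform is "void" (i.e. dropped partition columns) are skipped
    // and therefore no longer reported as partition keys.
    static List<String> nonVoidPartitionNames(final Schema schema, final List<PartitionField> partitionFields) {
        return partitionFields.stream()
                .filter(f -> f.transform() != null && !f.transform().toString().equalsIgnoreCase("void"))
                .map(f -> schema.findField(f.sourceId()).name())
                .collect(Collectors.toList());
    }

    private PartitionNameSketch() {
    }
}

In effect, the revert restores the pre-#621 behavior in which a source column stays a partition key even after its transform is changed to void; the tests removed further down were written to assert the opposite.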
@@ -19,7 +19,6 @@ package com.netflix.metacat.connector.hive.converters
import org.apache.iceberg.PartitionField
import org.apache.iceberg.PartitionSpec
import org.apache.iceberg.Schema
- import org.apache.iceberg.transforms.Identity
import org.apache.iceberg.types.Type
import org.apache.iceberg.types.Types
import com.netflix.metacat.common.QualifiedName
@@ -59,7 +58,7 @@ class HiveConnectorInfoConvertorSpec extends Specification{
def setup() {
// Stub this to always return true
config.isEpochInSeconds() >> true
- converter = new HiveConnectorInfoConverter(new HiveTypeConverter())
+ converter = new HiveConnectorInfoConverter( new HiveTypeConverter())
}

def 'test date to epoch seconds'() {
@@ -513,7 +512,6 @@ class HiveConnectorInfoConvertorSpec extends Specification{
def tableInfo = converter.fromIcebergTableToTableInfo(QualifiedName.ofTable('c', 'd', 't'),
icebergTableWrapper, "/tmp/test", TableInfo.builder().build() )
then:
- 2 * field.transform() >> Mock(Identity)
1 * icebergTable.properties() >> ["test":"abd"]
2 * icebergTable.spec() >> partSpec
1 * partSpec.fields() >> [ field]
@@ -13,11 +13,6 @@

package com.netflix.metacat.connector.hive.converters

- import org.apache.iceberg.PartitionField
- import org.apache.iceberg.transforms.Identity
- import org.apache.iceberg.transforms.VoidTransform
- import org.apache.iceberg.Schema
- import org.apache.iceberg.types.Types
import spock.lang.Shared
import spock.lang.Specification
import spock.lang.Unroll
@@ -260,36 +255,4 @@ class HiveTypeConverterSpec extends Specification {
"array<struct<date:string,countryCodes:array<string>,source:string>>" | """{"type":"array","elementType":{"type":"row","fields":[{"name":"date","type":"string"},{"name":"countryCodes","type":{"type":"array","elementType":"string"}},{"name":"source","type":"string"}]}}"""
"array<struct<Date:string,nestedArray:array<struct<date:string,countryCodes:array<string>,source:string>>>>" | """{"type":"array","elementType":{"type":"row","fields":[{"name":"Date","type":"string"},{"name":"nestedArray","type":{"type":"array","elementType":{"type":"row","fields":[{"name":"date","type":"string"},{"name":"countryCodes","type":{"type":"array","elementType":"string"}},{"name":"source","type":"string"}]}}}]}}"""
}

def "Test treat void transforms partitions as non-partition field"() {
given:
// Initial schema with three fields
def initialSchema = new Schema(
Types.NestedField.optional(1, "field1", Types.BooleanType.get(), "added 1st - partition key"),
Types.NestedField.optional(2, "field2", Types.StringType.get(), "added 2nd"),
Types.NestedField.optional(3, "field3", Types.IntegerType.get(), "added 3rd")
)
// Initial partition fields
def initialPartitionFields = [
new PartitionField(1, 1, "field1", new Identity()),
new PartitionField(2, 2, "field2", new VoidTransform<String>()),
]
when:
def fieldDtos = this.converter.icebergSchemaTofieldDtos(initialSchema, initialPartitionFields)
then:
fieldDtos.size() == 3
// Validate the first field
def field1 = fieldDtos.find { it.name == "field1" }
field1 != null
field1.partitionKey == true
// Validate the second field
def field2 = fieldDtos.find { it.name == "field2" }
field2 != null
field2.partitionKey == false
// Validate the third field
def field3 = fieldDtos.find { it.name == "field3" }
field3 != null
field3.partitionKey == false
noExceptionThrown()
}
}

This file was deleted.

@@ -41,7 +41,6 @@ import feign.RetryableException
import feign.Retryer
import groovy.sql.Sql
import org.apache.commons.io.FileUtils
- import org.apache.iceberg.PartitionField
import org.joda.time.Instant
import org.skyscreamer.jsonassert.JSONAssert
import spock.lang.Ignore
@@ -832,47 +831,6 @@ class MetacatSmokeSpec extends Specification {
api.deleteTable(catalogName, databaseName, tableName)
}

- @Unroll
- def "Test ignore void transform as partition fields"() {
- given:
- def catalogName = 'embedded-fast-hive-metastore'
- def databaseName = 'iceberg_db'
- def tableName = 'iceberg_table_6'
- def uri = isLocalEnv ? String.format('file:/tmp/data/') : null
- def tableDto = new TableDto(
- name: QualifiedName.ofTable(catalogName, databaseName, tableName),
- serde: new StorageDto(
- owner: 'metacat-test',
- inputFormat: 'org.apache.hadoop.mapred.TextInputFormat',
- outputFormat: 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
- serializationLib: 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe',
- parameters: [
- 'serialization.format': '1'
- ],
- uri: uri
- ),
- definitionMetadata: null,
- dataMetadata: null,
- fields: null
- )
- def metadataLocation = String.format('/tmp/data/metadata/00001-ba4b775c-7b1a-4a6f-aec0-03d3c851088c.metadata.json')
- def metadata = [table_type: 'ICEBERG', metadata_location: metadataLocation]
- tableDto.setMetadata(metadata)
- when:
- try {api.createDatabase(catalogName, databaseName, new DatabaseCreateRequestDto())
- } catch (Exception ignored) {
- }
- api.createTable(catalogName, databaseName, tableName, tableDto)
- def tableDTO = api.getTable(catalogName, databaseName, tableName, true, true, true)
- then:
- tableDTO.getFields().size() == 4
- tableDTO.getPartition_keys().size() == 1
- tableDTO.getPartition_keys()[0] == "field1"
- cleanup:
- api.deleteTable(catalogName, databaseName, tableName)
- }


@Unroll
def "Test get partitions from iceberg table using #filter"() {
def catalogName = 'embedded-fast-hive-metastore'