From 020da68f2610067488dee4cccb52ac97ec41be43 Mon Sep 17 00:00:00 2001 From: Eka Winata Date: Thu, 24 Oct 2024 14:48:28 +0700 Subject: [PATCH] feat: add partitioning strategy --- .../helper/MaxComputeSchemaHelper.java | 26 ++++++------- .../ProtoDataColumnRecordDecorator.java | 8 +--- .../PartitioningStrategyFactory.java | 16 ++++---- .../converter/record/RecordConverterTest.java | 11 +++++- .../helper/MaxComputeSchemaHelperTest.java | 37 +++++++++++++++---- .../ProtoDataColumnRecordDecoratorTest.java | 3 +- ...rotoMetadataColumnRecordDecoratorTest.java | 2 +- 7 files changed, 65 insertions(+), 38 deletions(-) diff --git a/src/main/java/com/gotocompany/depot/maxcompute/helper/MaxComputeSchemaHelper.java b/src/main/java/com/gotocompany/depot/maxcompute/helper/MaxComputeSchemaHelper.java index a31743c2..85cb69a9 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/helper/MaxComputeSchemaHelper.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/helper/MaxComputeSchemaHelper.java @@ -6,6 +6,7 @@ import com.gotocompany.depot.config.MaxComputeSinkConfig; import com.gotocompany.depot.maxcompute.converter.ConverterOrchestrator; import com.gotocompany.depot.maxcompute.model.MaxComputeSchema; +import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategy; import com.gotocompany.depot.maxcompute.util.MetadataUtil; import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.StringUtils; @@ -21,12 +22,12 @@ public class MaxComputeSchemaHelper { private final ConverterOrchestrator converterOrchestrator; private final MaxComputeSinkConfig maxComputeSinkConfig; + private final PartitioningStrategy partitioningStrategy; public MaxComputeSchema buildMaxComputeSchema(Descriptors.Descriptor descriptor) { - List dataColumn = buildDataColumns(descriptor, maxComputeSinkConfig.getTablePartitionKey()); + List dataColumn = buildDataColumns(descriptor, partitioningStrategy); List metadataColumns = buildMetadataColumns(); - Column partitionColumn = maxComputeSinkConfig.isTablePartitioningEnabled() ? - buildPartitionColumn(descriptor, maxComputeSinkConfig.getTablePartitionKey()) : null; + Column partitionColumn = maxComputeSinkConfig.isTablePartitioningEnabled() ? buildPartitionColumn(partitioningStrategy) : null; TableSchema.Builder tableSchemaBuilder = com.aliyun.odps.TableSchema.builder(); tableSchemaBuilder.withColumns(dataColumn); tableSchemaBuilder.withColumns(metadataColumns); @@ -46,23 +47,22 @@ public MaxComputeSchema buildMaxComputeSchema(Descriptors.Descriptor descriptor) } private List buildDataColumns(Descriptors.Descriptor descriptor, - String partitionKey) { + PartitioningStrategy partitioningStrategy) { return descriptor.getFields() .stream() - .filter(fieldDescriptor -> !fieldDescriptor.getName().equals(partitionKey)) + .filter(fieldDescriptor -> { + if (!maxComputeSinkConfig.isTablePartitioningEnabled() || !fieldDescriptor.getName().equals(maxComputeSinkConfig.getTablePartitionKey())) { + return true; + } + return !partitioningStrategy.shouldReplaceOriginalColumn(); + }) .map(fieldDescriptor -> Column.newBuilder(fieldDescriptor.getName(), converterOrchestrator.convert(fieldDescriptor)).build()) .collect(Collectors.toList()); } - private Column buildPartitionColumn(Descriptors.Descriptor descriptor, String partitionKey) { - return descriptor.getFields() - .stream() - .filter(fieldDescriptor -> fieldDescriptor.getName().equals(partitionKey)) - .map(fieldDescriptor -> Column.newBuilder(fieldDescriptor.getName(), - converterOrchestrator.convert(fieldDescriptor)).build()) - .findFirst() - .orElseThrow(() -> new IllegalArgumentException("Partition key not found in descriptor")); + private Column buildPartitionColumn(PartitioningStrategy partitioningStrategy) { + return partitioningStrategy.getPartitionColumn(); } private List buildMetadataColumns() { diff --git a/src/main/java/com/gotocompany/depot/maxcompute/record/ProtoDataColumnRecordDecorator.java b/src/main/java/com/gotocompany/depot/maxcompute/record/ProtoDataColumnRecordDecorator.java index 70900c0a..ee418305 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/record/ProtoDataColumnRecordDecorator.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/record/ProtoDataColumnRecordDecorator.java @@ -1,7 +1,6 @@ package com.gotocompany.depot.maxcompute.record; import com.aliyun.odps.data.Record; -import com.aliyun.odps.type.TypeInfo; import com.google.protobuf.Descriptors; import com.gotocompany.depot.config.SinkConfig; import com.gotocompany.depot.maxcompute.converter.ConverterOrchestrator; @@ -13,7 +12,6 @@ import com.gotocompany.depot.message.proto.ProtoMessageParser; import java.io.IOException; -import java.util.Map; public class ProtoDataColumnRecordDecorator extends RecordDecorator { @@ -40,10 +38,8 @@ public void append(Record record, Message message) throws IOException { ParsedMessage parsedMessage = protoMessageParser.parse(message, sinkConfig.getSinkConnectorSchemaMessageMode(), schemaClass); parsedMessage.validate(sinkConfig); com.google.protobuf.Message protoMessage = (com.google.protobuf.Message) parsedMessage.getRaw(); - MaxComputeSchema maxComputeSchema = maxComputeSchemaCache.getMaxComputeSchema(); - for (Map.Entry entry : maxComputeSchema.getDataColumns().entrySet()) { - Descriptors.FieldDescriptor fieldDescriptor = protoMessage.getDescriptorForType().findFieldByName(entry.getKey()); - record.set(entry.getKey(), converterOrchestrator.convert(fieldDescriptor, protoMessage.getField(fieldDescriptor))); + for (Descriptors.FieldDescriptor fieldDescriptor : protoMessage.getDescriptorForType().getFields()) { + record.set(fieldDescriptor.getName(), converterOrchestrator.convert(fieldDescriptor, protoMessage.getField(fieldDescriptor))); } } diff --git a/src/main/java/com/gotocompany/depot/maxcompute/schema/partition/PartitioningStrategyFactory.java b/src/main/java/com/gotocompany/depot/maxcompute/schema/partition/PartitioningStrategyFactory.java index a648ed27..c4609705 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/schema/partition/PartitioningStrategyFactory.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/schema/partition/PartitioningStrategyFactory.java @@ -5,7 +5,6 @@ import com.google.protobuf.Descriptors; import com.gotocompany.depot.config.MaxComputeSinkConfig; import com.gotocompany.depot.maxcompute.converter.ConverterOrchestrator; -import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCache; import lombok.RequiredArgsConstructor; @RequiredArgsConstructor @@ -13,16 +12,19 @@ public class PartitioningStrategyFactory { private final ConverterOrchestrator converterOrchestrator; private final MaxComputeSinkConfig maxComputeSinkConfig; - private final MaxComputeSchemaCache maxComputeSchemaCache; - public PartitioningStrategy createPartitioningStrategy() { + public PartitioningStrategy createPartitioningStrategy(Descriptors.Descriptor descriptor) { + if (!maxComputeSinkConfig.isTablePartitioningEnabled()) { + return null; + } String partitionKey = maxComputeSinkConfig.getTablePartitionKey(); - Descriptors.FieldDescriptor fieldDescriptor = maxComputeSchemaCache.getMaxComputeSchema() - .getDescriptor() + Descriptors.FieldDescriptor fieldDescriptor = descriptor .findFieldByName(partitionKey); + if (fieldDescriptor == null) { + throw new IllegalArgumentException("Partition key not found in the descriptor: " + partitionKey); + } TypeInfo partitionKeyTypeInfo = converterOrchestrator.convert(fieldDescriptor); - - if (TypeInfoFactory.TIMESTAMP_NTZ.equals(partitionKeyTypeInfo)) { + if (TypeInfoFactory.TIMESTAMP.equals(partitionKeyTypeInfo)) { return new TimestampPartitioningStrategy(maxComputeSinkConfig); } else { return new DefaultPartitioningStrategy(partitionKeyTypeInfo, maxComputeSinkConfig); diff --git a/src/test/java/com/gotocompany/depot/maxcompute/converter/record/RecordConverterTest.java b/src/test/java/com/gotocompany/depot/maxcompute/converter/record/RecordConverterTest.java index 3f9dae2c..d538965b 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/converter/record/RecordConverterTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/converter/record/RecordConverterTest.java @@ -20,6 +20,7 @@ import com.gotocompany.depot.maxcompute.record.ProtoMetadataColumnRecordDecorator; import com.gotocompany.depot.maxcompute.record.RecordDecorator; import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCache; +import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategyFactory; import com.gotocompany.depot.message.Message; import com.gotocompany.depot.message.ParsedMessage; import com.gotocompany.depot.message.SinkConnectorSchemaMessageMode; @@ -59,6 +60,7 @@ public void setup() throws IOException { ); Mockito.when(maxComputeSinkConfig.isTablePartitioningEnabled()).thenReturn(Boolean.TRUE); Mockito.when(maxComputeSinkConfig.getTablePartitionKey()).thenReturn("timestamp"); + Mockito.when(maxComputeSinkConfig.getTablePartitionColumnName()).thenReturn("__partition_column"); converterOrchestrator = new ConverterOrchestrator(); protoMessageParser = Mockito.mock(ProtoMessageParser.class); ParsedMessage parsedMessage = Mockito.mock(ParsedMessage.class); @@ -68,7 +70,10 @@ public void setup() throws IOException { sinkConfig = Mockito.mock(SinkConfig.class); Mockito.when(sinkConfig.getSinkConnectorSchemaMessageMode()) .thenReturn(SinkConnectorSchemaMessageMode.LOG_MESSAGE); - maxComputeSchemaHelper = new MaxComputeSchemaHelper(converterOrchestrator, maxComputeSinkConfig); + PartitioningStrategyFactory partitioningStrategyFactory = new PartitioningStrategyFactory(converterOrchestrator, maxComputeSinkConfig); + maxComputeSchemaHelper = new MaxComputeSchemaHelper(converterOrchestrator, maxComputeSinkConfig, partitioningStrategyFactory.createPartitioningStrategy( + descriptor + )); maxComputeSchemaCache = Mockito.mock(MaxComputeSchemaCache.class); MaxComputeSchema maxComputeSchema = maxComputeSchemaHelper.buildMaxComputeSchema(descriptor); Mockito.when(maxComputeSchemaCache.getMaxComputeSchema()).thenReturn(maxComputeSchema); @@ -88,7 +93,8 @@ public void shouldConvertMessageToRecordWrapper() { new Tuple<>("__kafka_topic", "topic"), new Tuple<>("__kafka_offset", 100L) ); - + java.sql.Timestamp expectedTimestamp = new java.sql.Timestamp(10002010L * 1000); + expectedTimestamp.setNanos(1000); List recordWrappers = recordConverter.convert(Collections.singletonList(message)); Assertions.assertThat(recordWrappers).size().isEqualTo(1); @@ -114,6 +120,7 @@ public void shouldConvertMessageToRecordWrapper() { Arrays.asList("name_2", 50f) ) )), + expectedTimestamp, new java.sql.Timestamp(123012311L), "topic", 100L diff --git a/src/test/java/com/gotocompany/depot/maxcompute/helper/MaxComputeSchemaHelperTest.java b/src/test/java/com/gotocompany/depot/maxcompute/helper/MaxComputeSchemaHelperTest.java index fbc43065..48402109 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/helper/MaxComputeSchemaHelperTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/helper/MaxComputeSchemaHelperTest.java @@ -7,6 +7,7 @@ import com.gotocompany.depot.config.MaxComputeSinkConfig; import com.gotocompany.depot.maxcompute.converter.ConverterOrchestrator; import com.gotocompany.depot.maxcompute.model.MaxComputeSchema; +import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategyFactory; import com.sun.tools.javac.util.List; import org.assertj.core.api.Assertions; import org.assertj.core.groups.Tuple; @@ -30,8 +31,13 @@ public void shouldBuildPartitionedTableSchemaWithRootLevelMetadata() { ); Mockito.when(maxComputeSinkConfig.isTablePartitioningEnabled()).thenReturn(Boolean.TRUE); Mockito.when(maxComputeSinkConfig.getTablePartitionKey()).thenReturn("event_timestamp"); - MaxComputeSchemaHelper maxComputeSchemaHelper = new MaxComputeSchemaHelper(new ConverterOrchestrator(), maxComputeSinkConfig); - int expectedNonPartitionColumnCount = 6; + Mockito.when(maxComputeSinkConfig.getTablePartitionColumnName()).thenReturn("__partitioning_column"); + PartitioningStrategyFactory partitioningStrategyFactory = new PartitioningStrategyFactory( + new ConverterOrchestrator(), maxComputeSinkConfig + ); + MaxComputeSchemaHelper maxComputeSchemaHelper = new MaxComputeSchemaHelper(new ConverterOrchestrator(), + maxComputeSinkConfig, partitioningStrategyFactory.createPartitioningStrategy(descriptor)); + int expectedNonPartitionColumnCount = 7; int expectedPartitionColumnCount = 1; MaxComputeSchema maxComputeSchema = maxComputeSchemaHelper.buildMaxComputeSchema(descriptor); @@ -53,13 +59,14 @@ public void shouldBuildPartitionedTableSchemaWithRootLevelMetadata() { List.of("id", "name"), List.of(TypeInfoFactory.STRING, TypeInfoFactory.STRING) ))), + Tuple.tuple("event_timestamp", TypeInfoFactory.TIMESTAMP), Tuple.tuple("__message_timestamp", TypeInfoFactory.TIMESTAMP), Tuple.tuple("__kafka_topic", TypeInfoFactory.STRING), Tuple.tuple("__kafka_offset", TypeInfoFactory.BIGINT) ); Assertions.assertThat(maxComputeSchema.getTableSchema().getPartitionColumns()) .extracting("name", "typeInfo") - .contains(Tuple.tuple("event_timestamp", TypeInfoFactory.TIMESTAMP)); + .contains(Tuple.tuple("__partitioning_column", TypeInfoFactory.STRING)); } @Test @@ -75,9 +82,14 @@ public void shouldBuildPartitionedTableSchemaWithNestedMetadata() { ); Mockito.when(maxComputeSinkConfig.isTablePartitioningEnabled()).thenReturn(Boolean.TRUE); Mockito.when(maxComputeSinkConfig.getTablePartitionKey()).thenReturn("event_timestamp"); - int expectedNonPartitionColumnCount = 4; + Mockito.when(maxComputeSinkConfig.getTablePartitionColumnName()).thenReturn("__partitioning_column"); + int expectedNonPartitionColumnCount = 5; int expectedPartitionColumnCount = 1; - MaxComputeSchemaHelper maxComputeSchemaHelper = new MaxComputeSchemaHelper(new ConverterOrchestrator(), maxComputeSinkConfig); + PartitioningStrategyFactory partitioningStrategyFactory = new PartitioningStrategyFactory( + new ConverterOrchestrator(), maxComputeSinkConfig + ); + MaxComputeSchemaHelper maxComputeSchemaHelper = new MaxComputeSchemaHelper( + new ConverterOrchestrator(), maxComputeSinkConfig, partitioningStrategyFactory.createPartitioningStrategy(descriptor)); MaxComputeSchema maxComputeSchema = maxComputeSchemaHelper.buildMaxComputeSchema(descriptor); @@ -98,6 +110,7 @@ public void shouldBuildPartitionedTableSchemaWithNestedMetadata() { List.of("id", "name"), List.of(TypeInfoFactory.STRING, TypeInfoFactory.STRING) ))), + Tuple.tuple("event_timestamp", TypeInfoFactory.TIMESTAMP), Tuple.tuple("meta", TypeInfoFactory.getStructTypeInfo( List.of("__message_timestamp", "__kafka_topic", "__kafka_offset"), List.of(TypeInfoFactory.TIMESTAMP, TypeInfoFactory.STRING, TypeInfoFactory.BIGINT) @@ -105,7 +118,7 @@ public void shouldBuildPartitionedTableSchemaWithNestedMetadata() { ); Assertions.assertThat(maxComputeSchema.getTableSchema().getPartitionColumns()) .extracting("name", "typeInfo") - .contains(Tuple.tuple("event_timestamp", TypeInfoFactory.TIMESTAMP)); + .contains(Tuple.tuple("__partitioning_column", TypeInfoFactory.STRING)); } @Test @@ -115,7 +128,11 @@ public void shouldBuildTableSchemaWithoutPartitionAndMeta() { Mockito.when(maxComputeSinkConfig.isTablePartitioningEnabled()).thenReturn(Boolean.FALSE); int expectedNonPartitionColumnCount = 4; int expectedPartitionColumnCount = 0; - MaxComputeSchemaHelper maxComputeSchemaHelper = new MaxComputeSchemaHelper(new ConverterOrchestrator(), maxComputeSinkConfig); + PartitioningStrategyFactory partitioningStrategyFactory = new PartitioningStrategyFactory( + new ConverterOrchestrator(), maxComputeSinkConfig + ); + MaxComputeSchemaHelper maxComputeSchemaHelper = new MaxComputeSchemaHelper(new ConverterOrchestrator(), + maxComputeSinkConfig, partitioningStrategyFactory.createPartitioningStrategy(descriptor)); MaxComputeSchema maxComputeSchema = maxComputeSchemaHelper.buildMaxComputeSchema(descriptor); @@ -152,7 +169,11 @@ public void shouldThrowIllegalArgumentExceptionWhenPartitionKeyIsNotFound() { ); Mockito.when(maxComputeSinkConfig.isTablePartitioningEnabled()).thenReturn(Boolean.TRUE); Mockito.when(maxComputeSinkConfig.getTablePartitionKey()).thenReturn("non_existent_partition_key"); - MaxComputeSchemaHelper maxComputeSchemaHelper = new MaxComputeSchemaHelper(new ConverterOrchestrator(), maxComputeSinkConfig); + PartitioningStrategyFactory partitioningStrategyFactory = new PartitioningStrategyFactory( + new ConverterOrchestrator(), maxComputeSinkConfig + ); + MaxComputeSchemaHelper maxComputeSchemaHelper = new MaxComputeSchemaHelper(new ConverterOrchestrator(), + maxComputeSinkConfig, partitioningStrategyFactory.createPartitioningStrategy(descriptor)); maxComputeSchemaHelper.buildMaxComputeSchema(descriptor); } diff --git a/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoDataColumnRecordDecoratorTest.java b/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoDataColumnRecordDecoratorTest.java index 47c300d5..a50dbae1 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoDataColumnRecordDecoratorTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoDataColumnRecordDecoratorTest.java @@ -71,7 +71,8 @@ private void instantiateProtoDataColumnRecordDecorator(SinkConfig sinkConfig, Ma ConverterOrchestrator converterOrchestrator = new ConverterOrchestrator(); maxComputeSchemaHelper = new MaxComputeSchemaHelper( converterOrchestrator, - maxComputeSinkConfig + maxComputeSinkConfig, + null ); MaxComputeSchema maxComputeSchema = maxComputeSchemaHelper.buildMaxComputeSchema(DESCRIPTOR); diff --git a/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoMetadataColumnRecordDecoratorTest.java b/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoMetadataColumnRecordDecoratorTest.java index dd2478b4..e29b53ca 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoMetadataColumnRecordDecoratorTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoMetadataColumnRecordDecoratorTest.java @@ -49,7 +49,7 @@ public void setup() { private void initializeDecorator(MaxComputeSinkConfig maxComputeSinkConfig) { this.maxComputeSinkConfig = maxComputeSinkConfig; ConverterOrchestrator converterOrchestrator = new ConverterOrchestrator(); - MaxComputeSchemaHelper maxComputeSchemaHelper = new MaxComputeSchemaHelper(converterOrchestrator, maxComputeSinkConfig); + MaxComputeSchemaHelper maxComputeSchemaHelper = new MaxComputeSchemaHelper(converterOrchestrator, maxComputeSinkConfig, null); MaxComputeSchema maxComputeSchema = maxComputeSchemaHelper.buildMaxComputeSchema(descriptor); maxComputeSchemaCache = Mockito.mock(MaxComputeSchemaCache.class); Mockito.when(maxComputeSchemaCache.getMaxComputeSchema()).thenReturn(maxComputeSchema);