Skip to content

Commit

Permalink
feat: add partitioning strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
ekawinataa committed Oct 24, 2024
1 parent 792eeb8 commit 020da68
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.gotocompany.depot.config.MaxComputeSinkConfig;
import com.gotocompany.depot.maxcompute.converter.ConverterOrchestrator;
import com.gotocompany.depot.maxcompute.model.MaxComputeSchema;
import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategy;
import com.gotocompany.depot.maxcompute.util.MetadataUtil;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -21,12 +22,12 @@ public class MaxComputeSchemaHelper {

private final ConverterOrchestrator converterOrchestrator;
private final MaxComputeSinkConfig maxComputeSinkConfig;
private final PartitioningStrategy<?> partitioningStrategy;

public MaxComputeSchema buildMaxComputeSchema(Descriptors.Descriptor descriptor) {
List<Column> dataColumn = buildDataColumns(descriptor, maxComputeSinkConfig.getTablePartitionKey());
List<Column> dataColumn = buildDataColumns(descriptor, partitioningStrategy);
List<Column> metadataColumns = buildMetadataColumns();
Column partitionColumn = maxComputeSinkConfig.isTablePartitioningEnabled() ?
buildPartitionColumn(descriptor, maxComputeSinkConfig.getTablePartitionKey()) : null;
Column partitionColumn = maxComputeSinkConfig.isTablePartitioningEnabled() ? buildPartitionColumn(partitioningStrategy) : null;
TableSchema.Builder tableSchemaBuilder = com.aliyun.odps.TableSchema.builder();
tableSchemaBuilder.withColumns(dataColumn);
tableSchemaBuilder.withColumns(metadataColumns);
Expand All @@ -46,23 +47,22 @@ public MaxComputeSchema buildMaxComputeSchema(Descriptors.Descriptor descriptor)
}

private List<Column> buildDataColumns(Descriptors.Descriptor descriptor,
String partitionKey) {
PartitioningStrategy<?> partitioningStrategy) {
return descriptor.getFields()
.stream()
.filter(fieldDescriptor -> !fieldDescriptor.getName().equals(partitionKey))
.filter(fieldDescriptor -> {
if (!maxComputeSinkConfig.isTablePartitioningEnabled() || !fieldDescriptor.getName().equals(maxComputeSinkConfig.getTablePartitionKey())) {
return true;
}
return !partitioningStrategy.shouldReplaceOriginalColumn();
})
.map(fieldDescriptor -> Column.newBuilder(fieldDescriptor.getName(),
converterOrchestrator.convert(fieldDescriptor)).build())
.collect(Collectors.toList());
}

private Column buildPartitionColumn(Descriptors.Descriptor descriptor, String partitionKey) {
return descriptor.getFields()
.stream()
.filter(fieldDescriptor -> fieldDescriptor.getName().equals(partitionKey))
.map(fieldDescriptor -> Column.newBuilder(fieldDescriptor.getName(),
converterOrchestrator.convert(fieldDescriptor)).build())
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Partition key not found in descriptor"));
private Column buildPartitionColumn(PartitioningStrategy<?> partitioningStrategy) {
return partitioningStrategy.getPartitionColumn();
}

private List<Column> buildMetadataColumns() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.gotocompany.depot.maxcompute.record;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.type.TypeInfo;
import com.google.protobuf.Descriptors;
import com.gotocompany.depot.config.SinkConfig;
import com.gotocompany.depot.maxcompute.converter.ConverterOrchestrator;
Expand All @@ -13,7 +12,6 @@
import com.gotocompany.depot.message.proto.ProtoMessageParser;

import java.io.IOException;
import java.util.Map;

public class ProtoDataColumnRecordDecorator extends RecordDecorator {

Expand All @@ -40,10 +38,8 @@ public void append(Record record, Message message) throws IOException {
ParsedMessage parsedMessage = protoMessageParser.parse(message, sinkConfig.getSinkConnectorSchemaMessageMode(), schemaClass);
parsedMessage.validate(sinkConfig);
com.google.protobuf.Message protoMessage = (com.google.protobuf.Message) parsedMessage.getRaw();
MaxComputeSchema maxComputeSchema = maxComputeSchemaCache.getMaxComputeSchema();
for (Map.Entry<String, TypeInfo> entry : maxComputeSchema.getDataColumns().entrySet()) {
Descriptors.FieldDescriptor fieldDescriptor = protoMessage.getDescriptorForType().findFieldByName(entry.getKey());
record.set(entry.getKey(), converterOrchestrator.convert(fieldDescriptor, protoMessage.getField(fieldDescriptor)));
for (Descriptors.FieldDescriptor fieldDescriptor : protoMessage.getDescriptorForType().getFields()) {
record.set(fieldDescriptor.getName(), converterOrchestrator.convert(fieldDescriptor, protoMessage.getField(fieldDescriptor)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,26 @@
import com.google.protobuf.Descriptors;
import com.gotocompany.depot.config.MaxComputeSinkConfig;
import com.gotocompany.depot.maxcompute.converter.ConverterOrchestrator;
import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCache;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class PartitioningStrategyFactory {

private final ConverterOrchestrator converterOrchestrator;
private final MaxComputeSinkConfig maxComputeSinkConfig;
private final MaxComputeSchemaCache maxComputeSchemaCache;

public PartitioningStrategy<?> createPartitioningStrategy() {
public PartitioningStrategy<?> createPartitioningStrategy(Descriptors.Descriptor descriptor) {
if (!maxComputeSinkConfig.isTablePartitioningEnabled()) {
return null;
}
String partitionKey = maxComputeSinkConfig.getTablePartitionKey();
Descriptors.FieldDescriptor fieldDescriptor = maxComputeSchemaCache.getMaxComputeSchema()
.getDescriptor()
Descriptors.FieldDescriptor fieldDescriptor = descriptor
.findFieldByName(partitionKey);
if (fieldDescriptor == null) {
throw new IllegalArgumentException("Partition key not found in the descriptor: " + partitionKey);
}
TypeInfo partitionKeyTypeInfo = converterOrchestrator.convert(fieldDescriptor);

if (TypeInfoFactory.TIMESTAMP_NTZ.equals(partitionKeyTypeInfo)) {
if (TypeInfoFactory.TIMESTAMP.equals(partitionKeyTypeInfo)) {
return new TimestampPartitioningStrategy(maxComputeSinkConfig);
} else {
return new DefaultPartitioningStrategy(partitionKeyTypeInfo, maxComputeSinkConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.gotocompany.depot.maxcompute.record.ProtoMetadataColumnRecordDecorator;
import com.gotocompany.depot.maxcompute.record.RecordDecorator;
import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCache;
import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategyFactory;
import com.gotocompany.depot.message.Message;
import com.gotocompany.depot.message.ParsedMessage;
import com.gotocompany.depot.message.SinkConnectorSchemaMessageMode;
Expand Down Expand Up @@ -59,6 +60,7 @@ public void setup() throws IOException {
);
Mockito.when(maxComputeSinkConfig.isTablePartitioningEnabled()).thenReturn(Boolean.TRUE);
Mockito.when(maxComputeSinkConfig.getTablePartitionKey()).thenReturn("timestamp");
Mockito.when(maxComputeSinkConfig.getTablePartitionColumnName()).thenReturn("__partition_column");
converterOrchestrator = new ConverterOrchestrator();
protoMessageParser = Mockito.mock(ProtoMessageParser.class);
ParsedMessage parsedMessage = Mockito.mock(ParsedMessage.class);
Expand All @@ -68,7 +70,10 @@ public void setup() throws IOException {
sinkConfig = Mockito.mock(SinkConfig.class);
Mockito.when(sinkConfig.getSinkConnectorSchemaMessageMode())
.thenReturn(SinkConnectorSchemaMessageMode.LOG_MESSAGE);
maxComputeSchemaHelper = new MaxComputeSchemaHelper(converterOrchestrator, maxComputeSinkConfig);
PartitioningStrategyFactory partitioningStrategyFactory = new PartitioningStrategyFactory(converterOrchestrator, maxComputeSinkConfig);
maxComputeSchemaHelper = new MaxComputeSchemaHelper(converterOrchestrator, maxComputeSinkConfig, partitioningStrategyFactory.createPartitioningStrategy(
descriptor
));
maxComputeSchemaCache = Mockito.mock(MaxComputeSchemaCache.class);
MaxComputeSchema maxComputeSchema = maxComputeSchemaHelper.buildMaxComputeSchema(descriptor);
Mockito.when(maxComputeSchemaCache.getMaxComputeSchema()).thenReturn(maxComputeSchema);
Expand All @@ -88,7 +93,8 @@ public void shouldConvertMessageToRecordWrapper() {
new Tuple<>("__kafka_topic", "topic"),
new Tuple<>("__kafka_offset", 100L)
);

java.sql.Timestamp expectedTimestamp = new java.sql.Timestamp(10002010L * 1000);
expectedTimestamp.setNanos(1000);
List<RecordWrapper> recordWrappers = recordConverter.convert(Collections.singletonList(message));

Assertions.assertThat(recordWrappers).size().isEqualTo(1);
Expand All @@ -114,6 +120,7 @@ public void shouldConvertMessageToRecordWrapper() {
Arrays.asList("name_2", 50f)
)
)),
expectedTimestamp,
new java.sql.Timestamp(123012311L),
"topic",
100L
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.gotocompany.depot.config.MaxComputeSinkConfig;
import com.gotocompany.depot.maxcompute.converter.ConverterOrchestrator;
import com.gotocompany.depot.maxcompute.model.MaxComputeSchema;
import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategyFactory;
import com.sun.tools.javac.util.List;
import org.assertj.core.api.Assertions;
import org.assertj.core.groups.Tuple;
Expand All @@ -30,8 +31,13 @@ public void shouldBuildPartitionedTableSchemaWithRootLevelMetadata() {
);
Mockito.when(maxComputeSinkConfig.isTablePartitioningEnabled()).thenReturn(Boolean.TRUE);
Mockito.when(maxComputeSinkConfig.getTablePartitionKey()).thenReturn("event_timestamp");
MaxComputeSchemaHelper maxComputeSchemaHelper = new MaxComputeSchemaHelper(new ConverterOrchestrator(), maxComputeSinkConfig);
int expectedNonPartitionColumnCount = 6;
Mockito.when(maxComputeSinkConfig.getTablePartitionColumnName()).thenReturn("__partitioning_column");
PartitioningStrategyFactory partitioningStrategyFactory = new PartitioningStrategyFactory(
new ConverterOrchestrator(), maxComputeSinkConfig
);
MaxComputeSchemaHelper maxComputeSchemaHelper = new MaxComputeSchemaHelper(new ConverterOrchestrator(),
maxComputeSinkConfig, partitioningStrategyFactory.createPartitioningStrategy(descriptor));
int expectedNonPartitionColumnCount = 7;
int expectedPartitionColumnCount = 1;

MaxComputeSchema maxComputeSchema = maxComputeSchemaHelper.buildMaxComputeSchema(descriptor);
Expand All @@ -53,13 +59,14 @@ public void shouldBuildPartitionedTableSchemaWithRootLevelMetadata() {
List.of("id", "name"),
List.of(TypeInfoFactory.STRING, TypeInfoFactory.STRING)
))),
Tuple.tuple("event_timestamp", TypeInfoFactory.TIMESTAMP),
Tuple.tuple("__message_timestamp", TypeInfoFactory.TIMESTAMP),
Tuple.tuple("__kafka_topic", TypeInfoFactory.STRING),
Tuple.tuple("__kafka_offset", TypeInfoFactory.BIGINT)
);
Assertions.assertThat(maxComputeSchema.getTableSchema().getPartitionColumns())
.extracting("name", "typeInfo")
.contains(Tuple.tuple("event_timestamp", TypeInfoFactory.TIMESTAMP));
.contains(Tuple.tuple("__partitioning_column", TypeInfoFactory.STRING));
}

@Test
Expand All @@ -75,9 +82,14 @@ public void shouldBuildPartitionedTableSchemaWithNestedMetadata() {
);
Mockito.when(maxComputeSinkConfig.isTablePartitioningEnabled()).thenReturn(Boolean.TRUE);
Mockito.when(maxComputeSinkConfig.getTablePartitionKey()).thenReturn("event_timestamp");
int expectedNonPartitionColumnCount = 4;
Mockito.when(maxComputeSinkConfig.getTablePartitionColumnName()).thenReturn("__partitioning_column");
int expectedNonPartitionColumnCount = 5;
int expectedPartitionColumnCount = 1;
MaxComputeSchemaHelper maxComputeSchemaHelper = new MaxComputeSchemaHelper(new ConverterOrchestrator(), maxComputeSinkConfig);
PartitioningStrategyFactory partitioningStrategyFactory = new PartitioningStrategyFactory(
new ConverterOrchestrator(), maxComputeSinkConfig
);
MaxComputeSchemaHelper maxComputeSchemaHelper = new MaxComputeSchemaHelper(
new ConverterOrchestrator(), maxComputeSinkConfig, partitioningStrategyFactory.createPartitioningStrategy(descriptor));

MaxComputeSchema maxComputeSchema = maxComputeSchemaHelper.buildMaxComputeSchema(descriptor);

Expand All @@ -98,14 +110,15 @@ public void shouldBuildPartitionedTableSchemaWithNestedMetadata() {
List.of("id", "name"),
List.of(TypeInfoFactory.STRING, TypeInfoFactory.STRING)
))),
Tuple.tuple("event_timestamp", TypeInfoFactory.TIMESTAMP),
Tuple.tuple("meta", TypeInfoFactory.getStructTypeInfo(
List.of("__message_timestamp", "__kafka_topic", "__kafka_offset"),
List.of(TypeInfoFactory.TIMESTAMP, TypeInfoFactory.STRING, TypeInfoFactory.BIGINT)
))
);
Assertions.assertThat(maxComputeSchema.getTableSchema().getPartitionColumns())
.extracting("name", "typeInfo")
.contains(Tuple.tuple("event_timestamp", TypeInfoFactory.TIMESTAMP));
.contains(Tuple.tuple("__partitioning_column", TypeInfoFactory.STRING));
}

@Test
Expand All @@ -115,7 +128,11 @@ public void shouldBuildTableSchemaWithoutPartitionAndMeta() {
Mockito.when(maxComputeSinkConfig.isTablePartitioningEnabled()).thenReturn(Boolean.FALSE);
int expectedNonPartitionColumnCount = 4;
int expectedPartitionColumnCount = 0;
MaxComputeSchemaHelper maxComputeSchemaHelper = new MaxComputeSchemaHelper(new ConverterOrchestrator(), maxComputeSinkConfig);
PartitioningStrategyFactory partitioningStrategyFactory = new PartitioningStrategyFactory(
new ConverterOrchestrator(), maxComputeSinkConfig
);
MaxComputeSchemaHelper maxComputeSchemaHelper = new MaxComputeSchemaHelper(new ConverterOrchestrator(),
maxComputeSinkConfig, partitioningStrategyFactory.createPartitioningStrategy(descriptor));

MaxComputeSchema maxComputeSchema = maxComputeSchemaHelper.buildMaxComputeSchema(descriptor);

Expand Down Expand Up @@ -152,7 +169,11 @@ public void shouldThrowIllegalArgumentExceptionWhenPartitionKeyIsNotFound() {
);
Mockito.when(maxComputeSinkConfig.isTablePartitioningEnabled()).thenReturn(Boolean.TRUE);
Mockito.when(maxComputeSinkConfig.getTablePartitionKey()).thenReturn("non_existent_partition_key");
MaxComputeSchemaHelper maxComputeSchemaHelper = new MaxComputeSchemaHelper(new ConverterOrchestrator(), maxComputeSinkConfig);
PartitioningStrategyFactory partitioningStrategyFactory = new PartitioningStrategyFactory(
new ConverterOrchestrator(), maxComputeSinkConfig
);
MaxComputeSchemaHelper maxComputeSchemaHelper = new MaxComputeSchemaHelper(new ConverterOrchestrator(),
maxComputeSinkConfig, partitioningStrategyFactory.createPartitioningStrategy(descriptor));

maxComputeSchemaHelper.buildMaxComputeSchema(descriptor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ private void instantiateProtoDataColumnRecordDecorator(SinkConfig sinkConfig, Ma
ConverterOrchestrator converterOrchestrator = new ConverterOrchestrator();
maxComputeSchemaHelper = new MaxComputeSchemaHelper(
converterOrchestrator,
maxComputeSinkConfig
maxComputeSinkConfig,
null
);

MaxComputeSchema maxComputeSchema = maxComputeSchemaHelper.buildMaxComputeSchema(DESCRIPTOR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void setup() {
private void initializeDecorator(MaxComputeSinkConfig maxComputeSinkConfig) {
this.maxComputeSinkConfig = maxComputeSinkConfig;
ConverterOrchestrator converterOrchestrator = new ConverterOrchestrator();
MaxComputeSchemaHelper maxComputeSchemaHelper = new MaxComputeSchemaHelper(converterOrchestrator, maxComputeSinkConfig);
MaxComputeSchemaHelper maxComputeSchemaHelper = new MaxComputeSchemaHelper(converterOrchestrator, maxComputeSinkConfig, null);
MaxComputeSchema maxComputeSchema = maxComputeSchemaHelper.buildMaxComputeSchema(descriptor);
maxComputeSchemaCache = Mockito.mock(MaxComputeSchemaCache.class);
Mockito.when(maxComputeSchemaCache.getMaxComputeSchema()).thenReturn(maxComputeSchema);
Expand Down

0 comments on commit 020da68

Please sign in to comment.