Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat max compute sink payload mappers #51

Closed
wants to merge 51 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
12a8fe7
feat: Add Type Info Converter Implementation
ekawinataa Oct 11, 2024
27b8a7b
feat: Add Test
ekawinataa Oct 11, 2024
512e0b6
feat: Complete test for MessageTypeInfoConverterTest
ekawinataa Oct 11, 2024
ca51108
feat: Complete test for TimestampTypeInfoConverter
ekawinataa Oct 11, 2024
468a00e
feat: Complete test for StructTypeInfoConverter.java
ekawinataa Oct 11, 2024
d0228dd
feat: Complete test for DurationTypeInfoConverter.java
ekawinataa Oct 11, 2024
14bdd12
feat: Complete test for BaseTypeInfoConverterTest.java
ekawinataa Oct 11, 2024
53def4a
chore: Restructure Package
ekawinataa Oct 15, 2024
0ea432a
chore: wrap type
ekawinataa Oct 15, 2024
3d5ee38
feat: Add TableUtil test and its test
ekawinataa Oct 15, 2024
9ffb56d
feat: Add sink config
ekawinataa Oct 16, 2024
1e39362
test: complete TableUtil test cases
ekawinataa Oct 16, 2024
a624069
chore: remove payload converter from branch
ekawinataa Oct 16, 2024
5f21f01
chore: checkstyle main
ekawinataa Oct 16, 2024
adbb27f
chore: checkstyle test
ekawinataa Oct 16, 2024
74e71b1
chore: add newline between action and assert
ekawinataa Oct 16, 2024
9b9ba32
feat: add Primitive converter and its test
ekawinataa Oct 16, 2024
a407e41
feat: add Message converter and its test
ekawinataa Oct 16, 2024
1fab8df
test: add more test for PrimitivePayloadConverter.java
ekawinataa Oct 17, 2024
25d252f
test: add payload converter test
ekawinataa Oct 17, 2024
7275fa2
test: add convertSingular on typeInfo
ekawinataa Oct 17, 2024
14c2842
test: add message converter test
ekawinataa Oct 17, 2024
719047c
chore: remove unused method
ekawinataa Oct 17, 2024
7103cb9
refactor: Remove Base converter and add it to ConverterOrchestrator
ekawinataa Oct 18, 2024
d3874f7
feat: add caching on ConverterOrchestrator
ekawinataa Oct 21, 2024
cf6c6a1
FEAT: ADD RECORD Converter
ekawinataa Oct 21, 2024
0a6447e
feat: MaxComputeSchemaCache
ekawinataa Oct 21, 2024
38b7daf
refactor: change TableUtil structure
ekawinataa Oct 21, 2024
64fa4d0
refactor: rename tableutil to table helper
ekawinataa Oct 21, 2024
1c227b7
refactor: change TableHelper to MaxComputeSchemaHelper
ekawinataa Oct 21, 2024
cb67c70
refactor: add MaxComputeSchemaCache refresh implementation
ekawinataa Oct 21, 2024
e1cfd2e
test: add test for ProtoDataColumnRecordDecorator
ekawinataa Oct 22, 2024
783d534
test: change timestamp_ntz to timestamp
ekawinataa Oct 22, 2024
205016f
test: refactor ProtoDataColumnRecordDecorator
ekawinataa Oct 22, 2024
dd60be4
test: complete test for ProtoMetadataColumnRecordDecorator
ekawinataa Oct 22, 2024
f78a7b1
test: add test for RecordDecoratorFactory
ekawinataa Oct 22, 2024
3db4ae0
test: complete test for RecordConverter.java
ekawinataa Oct 22, 2024
38f6fd0
chore: remove unused interface method
ekawinataa Oct 22, 2024
0922540
chore: remove implementation of interface
ekawinataa Oct 22, 2024
1b81db0
test: add unit test MetadataUtil
ekawinataa Oct 22, 2024
55596e4
chore: rename default column to metadata
ekawinataa Oct 22, 2024
0ca323b
chore: refactor MaxComputeSchemaCache
ekawinataa Oct 24, 2024
792eeb8
feat: add PartitioningStrategy
ekawinataa Oct 24, 2024
020da68
feat: add partitioning strategy
ekawinataa Oct 24, 2024
1cb812c
feat: add MaxComputeClient
ekawinataa Oct 24, 2024
7eb3404
feat:
ekawinataa Oct 25, 2024
a8f2c6d
feat:
ekawinataa Oct 25, 2024
3411ab0
feat:
ekawinataa Oct 25, 2024
44e0417
feat: add MaxComputeClient.java
ekawinataa Oct 25, 2024
8c7a2cd
feat: add MaxComputeSink and update RecordConverter
ekawinataa Oct 25, 2024
6c35778
test: add PartitioningStrategyFactory test
ekawinataa Oct 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ dependencies {
implementation(group: 'com.google.cloud', name: 'google-cloud-bigtable', version: '2.24.1') {
exclude group: "io.grpc"
}
implementation group: 'com.aliyun.odps', name: 'odps-sdk-core', version: '0.48.8-public'
implementation 'io.grpc:grpc-all:1.55.1'
implementation group: 'org.slf4j', name: 'jul-to-slf4j', version: '1.7.35'
implementation group: 'redis.clients', name: 'jedis', version: '3.10.0'
Expand All @@ -54,6 +55,7 @@ dependencies {
implementation group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.1'
implementation group: 'joda-time', name: 'joda-time', version: '2.10.2'
testImplementation group: 'junit', name: 'junit', version: '4.13.1'
testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.26.3'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
testImplementation 'org.mockito:mockito-core:4.5.1'
testImplementation 'com.github.tomakehurst:wiremock:2.16.0'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.gotocompany.depot.config;

import com.gotocompany.depot.common.TupleString;
import com.gotocompany.depot.config.converter.ConfToListConverter;
import org.aeonbits.owner.Config;

import java.util.List;

public interface MaxComputeSinkConfig extends Config {
@Key("SINK_MAXCOMPUTE_ODPS_URL")
String getMaxComputeOdpsUrl();

@Key("SINK_MAXCOMPUTE_TUNNEL_URL")
String getMaxComputeTunnelUrl();

@Key("SINK_MAXCOMPUTE_ACCESS_ID")
String getMaxComputeAccessId();

@Key("SINK_MAXCOMPUTE_ACCESS_KEY")
String getMaxComputeAccessKey();

@Key("SINK_MAXCOMPUTE_PROJECT_ID")
String getMaxComputeProjectId();

@Key("SINK_MAXCOMPUTE_METADATA_NAMESPACE")
@DefaultValue("")
String getMaxcomputeMetadataNamespace();

@DefaultValue("true")
@Key("SINK_MAXCOMPUTE_ADD_METADATA_ENABLED")
boolean shouldAddMetadata();

@DefaultValue("")
@Key("SINK_MAXCOMPUTE_METADATA_COLUMNS_TYPES")
@ConverterClass(ConfToListConverter.class)
@Separator(ConfToListConverter.ELEMENT_SEPARATOR)
List<TupleString> getMetadataColumnsTypes();

@Key("SINK_MAXCOMPUTE_SCHEMA")
@DefaultValue("default")
String getMaxComputeSchema();

@Key("SINK_MAXCOMPUTE_TABLE_PARTITIONING_ENABLE")
@DefaultValue("false")
Boolean isTablePartitioningEnabled();

@Key("SINK_MAXCOMPUTE_TABLE_PARTITION_KEY")
String getTablePartitionKey();

@Key("SINK_MAXCOMPUTE_TABLE_PARTITION_COLUMN_NAME")
String getTablePartitionColumnName();

@Key("SINK_MAXCOMPUTE_TABLE_PARTITION_BY_TIMESTAMP_TIMEZONE")
@DefaultValue("UTC+7")
String getTablePartitionByTimestampTimezone();

@Key("SINK_MAX_COMPUTE_TABLE_PARTITION_BY_TIMESTAMP_ZONE_OFFSET")
@DefaultValue("+07:00")
String getTablePartitionByTimestampZoneOffset();

@Key("SINK_MAXCOMPUTE_TABLE_NAME")
String getMaxComputeTableName();

@Key("SINK_MAXCOMPUTE_TABLE_LIFECYCLE_DAYS")
Long getMaxComputeTableLifecycleDays();

@Key("SINK_MAXCOMPUTE_RECORD_PACK_FLUSH_TIMEOUT")
@DefaultValue("-1")
Long getMaxComputeRecordPackFlushTimeout();

}
41 changes: 41 additions & 0 deletions src/main/java/com/gotocompany/depot/maxcompute/MaxComputeSink.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.gotocompany.depot.maxcompute;

import com.gotocompany.depot.Sink;
import com.gotocompany.depot.SinkResponse;
import com.gotocompany.depot.error.ErrorInfo;
import com.gotocompany.depot.error.ErrorType;
import com.gotocompany.depot.exception.SinkException;
import com.gotocompany.depot.maxcompute.client.MaxComputeClient;
import com.gotocompany.depot.maxcompute.converter.record.RecordConverter;
import com.gotocompany.depot.maxcompute.model.RecordWrappers;
import com.gotocompany.depot.message.Message;
import lombok.RequiredArgsConstructor;

import java.io.IOException;
import java.util.List;

@RequiredArgsConstructor
public class MaxComputeSink implements Sink {

private final MaxComputeClient maxComputeClient;
private final RecordConverter recordConverter;

@Override
public SinkResponse pushToSink(List<Message> messages) throws SinkException {
SinkResponse sinkResponse = new SinkResponse();
RecordWrappers recordWrappers = recordConverter.convert(messages);
recordWrappers.getInvalidRecords()
.forEach(invalidRecord -> sinkResponse.getErrors().put(invalidRecord.getIndex(), invalidRecord.getErrorInfo()));
try {
maxComputeClient.insert(recordWrappers.getValidRecords());
} catch (Exception e) {
recordWrappers.getValidRecords()
.forEach(validRecord -> sinkResponse.getErrors().put(validRecord.getIndex(), new ErrorInfo(e, ErrorType.DEFAULT_ERROR)));
}
return sinkResponse;
}

@Override
public void close() throws IOException {}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.gotocompany.depot.maxcompute.client;

import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.tunnel.TableTunnel;
import com.gotocompany.depot.config.MaxComputeSinkConfig;
import com.gotocompany.depot.maxcompute.client.insert.InsertManager;
import com.gotocompany.depot.maxcompute.client.insert.NonPartitionedInsertManager;
import com.gotocompany.depot.maxcompute.client.insert.PartitionedInsertManager;
import com.gotocompany.depot.maxcompute.model.RecordWrapper;
import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategy;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MaxComputeClient {

private final Odps odps;
private final MaxComputeSinkConfig maxComputeSinkConfig;
private final TableTunnel tableTunnel;
private final PartitioningStrategy partitioningStrategy;
private final InsertManager insertManager;

public MaxComputeClient(MaxComputeSinkConfig maxComputeSinkConfig,
PartitioningStrategy partitioningStrategy) {
this.maxComputeSinkConfig = maxComputeSinkConfig;
this.partitioningStrategy = partitioningStrategy;
this.odps = initializeOdps();
this.tableTunnel = new TableTunnel(odps);
this.insertManager = initializeInsertManager();
}

public void upsertTable(TableSchema tableSchema) throws OdpsException {
String tableName = maxComputeSinkConfig.getMaxComputeTableName();
if (!this.odps.tables().exists(tableName)) {
this.odps.tables().create(odps.getDefaultProject(), tableName, tableSchema, "",
false, maxComputeSinkConfig.getMaxComputeTableLifecycleDays(),
null, null);
}
}

public void insert(List<RecordWrapper> recordWrappers) {
try {
insertManager.insert(recordWrappers);
} catch (Exception e) {
throw new RuntimeException("Failed to insert records into MaxCompute", e);
}
}

private Odps initializeOdps() {
Account account = new AliyunAccount(maxComputeSinkConfig.getMaxComputeAccessId(), maxComputeSinkConfig.getMaxComputeAccessKey());
Odps odpsClient = new Odps(account);
odpsClient.setDefaultProject(maxComputeSinkConfig.getMaxComputeProjectId());
odpsClient.setEndpoint(maxComputeSinkConfig.getMaxComputeOdpsUrl());
odpsClient.setCurrentSchema(maxComputeSinkConfig.getMaxComputeSchema());
odpsClient.setGlobalSettings(getGlobalSettings());
return odpsClient;
}

private InsertManager initializeInsertManager() {
if (maxComputeSinkConfig.isTablePartitioningEnabled()) {
return new PartitionedInsertManager(partitioningStrategy, tableTunnel, maxComputeSinkConfig);
}
return new NonPartitionedInsertManager(tableTunnel, maxComputeSinkConfig);
}

private Map<String, String> getGlobalSettings() {
Map<String, String> globalSettings = new HashMap<>();
globalSettings.put("setproject odps.schema.evolution.enable", "true");
globalSettings.put("odps.namespace.schema", "true");
return globalSettings;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.gotocompany.depot.maxcompute.client.insert;

import com.aliyun.odps.tunnel.TunnelException;
import com.gotocompany.depot.maxcompute.model.RecordWrapper;

import java.io.IOException;
import java.util.List;

public interface InsertManager {
void insert(List<RecordWrapper> recordWrappers) throws TunnelException, IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.gotocompany.depot.maxcompute.client.insert;

import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.gotocompany.depot.config.MaxComputeSinkConfig;
import com.gotocompany.depot.maxcompute.model.RecordWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.List;

@RequiredArgsConstructor
@Slf4j
public class NonPartitionedInsertManager implements InsertManager {

private final TableTunnel tableTunnel;
private final MaxComputeSinkConfig maxComputeSinkConfig;

@Override
public void insert(List<RecordWrapper> recordWrappers) throws TunnelException, IOException {
TableTunnel.StreamUploadSession streamUploadSession = getStreamUploadSession();
TableTunnel.StreamRecordPack recordPack = streamUploadSession.newRecordPack();
for (RecordWrapper recordWrapper : recordWrappers) {
recordPack.append(recordWrapper.getRecord());
}
TableTunnel.FlushResult flushResult = recordPack.flush(
new TableTunnel.FlushOption()
.timeout(maxComputeSinkConfig.getMaxComputeRecordPackFlushTimeout()));
log.info("Flushed {} records", flushResult.getRecordCount());
}

private TableTunnel.StreamUploadSession getStreamUploadSession() throws TunnelException {
return tableTunnel.buildStreamUploadSession(maxComputeSinkConfig.getMaxComputeProjectId(),
maxComputeSinkConfig.getMaxComputeTableName())
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.gotocompany.depot.maxcompute.client.insert;

import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.gotocompany.depot.config.MaxComputeSinkConfig;
import com.gotocompany.depot.maxcompute.model.RecordWrapper;
import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@RequiredArgsConstructor
@Slf4j
public class PartitionedInsertManager implements InsertManager {

private final PartitioningStrategy partitioningStrategy;
private final TableTunnel tableTunnel;
private final MaxComputeSinkConfig maxComputeSinkConfig;

@Override
public void insert(List<RecordWrapper> recordWrappers) throws TunnelException, IOException {
Map<PartitionSpec, List<RecordWrapper>> partitionSpecRecordWrapperMap = recordWrappers.stream()
.collect(Collectors.groupingBy(partitioningStrategy::getPartitionSpec));
for (Map.Entry<PartitionSpec, List<RecordWrapper>> entry : partitionSpecRecordWrapperMap.entrySet()) {
TableTunnel.StreamUploadSession streamUploadSession = getStreamUploadSession(entry.getKey());
TableTunnel.StreamRecordPack recordPack = streamUploadSession.newRecordPack();
for (RecordWrapper recordWrapper : entry.getValue()) {
recordPack.append(recordWrapper.getRecord());
}
TableTunnel.FlushResult flushResult = recordPack.flush(
new TableTunnel.FlushOption()
.timeout(maxComputeSinkConfig.getMaxComputeRecordPackFlushTimeout()));
log.info("Flushed {} records to partition {}", flushResult.getRecordCount(), entry.getKey());
}
}

private TableTunnel.StreamUploadSession getStreamUploadSession(PartitionSpec partitionSpec) throws TunnelException {
return tableTunnel.buildStreamUploadSession(maxComputeSinkConfig.getMaxComputeProjectId(),
maxComputeSinkConfig.getMaxComputeTableName())
.setPartitionSpec(partitionSpec)
.build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package com.gotocompany.depot.maxcompute.converter;

import com.aliyun.odps.type.TypeInfo;
import com.google.protobuf.Descriptors;
import com.gotocompany.depot.maxcompute.converter.payload.DurationPayloadConverter;
import com.gotocompany.depot.maxcompute.converter.payload.MessagePayloadConverter;
import com.gotocompany.depot.maxcompute.converter.payload.PayloadConverter;
import com.gotocompany.depot.maxcompute.converter.payload.PrimitivePayloadConverter;
import com.gotocompany.depot.maxcompute.converter.payload.StructPayloadConverter;
import com.gotocompany.depot.maxcompute.converter.payload.TimestampPayloadConverter;
import com.gotocompany.depot.maxcompute.converter.type.DurationTypeInfoConverter;
import com.gotocompany.depot.maxcompute.converter.type.MessageTypeInfoConverter;
import com.gotocompany.depot.maxcompute.converter.type.PrimitiveTypeInfoConverter;
import com.gotocompany.depot.maxcompute.converter.type.StructTypeInfoConverter;
import com.gotocompany.depot.maxcompute.converter.type.TimestampTypeInfoConverter;
import com.gotocompany.depot.maxcompute.converter.type.TypeInfoConverter;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ConverterOrchestrator {

private final List<TypeInfoConverter> typeInfoConverters;
private final List<PayloadConverter> payloadConverters;
private final Map<String, TypeInfo> typeInfoCache;

public ConverterOrchestrator() {
typeInfoConverters = new ArrayList<>();
payloadConverters = new ArrayList<>();
typeInfoCache = new ConcurrentHashMap<>();
initializeConverters();
}

public TypeInfo convert(Descriptors.FieldDescriptor fieldDescriptor) {
return typeInfoCache.computeIfAbsent(fieldDescriptor.getFullName(), key -> typeInfoConverters.stream()
.filter(converter -> converter.canConvert(fieldDescriptor))
.findFirst()
.map(converter -> converter.convert(fieldDescriptor))
.orElseThrow(() -> new IllegalArgumentException("Unsupported type: " + fieldDescriptor.getType())));
}

public Object convert(Descriptors.FieldDescriptor fieldDescriptor, Object object) {
return payloadConverters.stream()
.filter(converter -> converter.canConvert(fieldDescriptor))
.findFirst()
.map(converter -> converter.convert(fieldDescriptor, object))
.orElseThrow(() -> new IllegalArgumentException("Unsupported type: " + fieldDescriptor.getType()));
}

private void initializeConverters() {
PrimitiveTypeInfoConverter primitiveTypeInfoConverter = new PrimitiveTypeInfoConverter();
DurationTypeInfoConverter durationTypeInfoConverter = new DurationTypeInfoConverter();
StructTypeInfoConverter structTypeInfoConverter = new StructTypeInfoConverter();
TimestampTypeInfoConverter timestampTypeInfoConverter = new TimestampTypeInfoConverter();
MessageTypeInfoConverter messageTypeInfoConverter = new MessageTypeInfoConverter(typeInfoConverters);

typeInfoConverters.add(primitiveTypeInfoConverter);
typeInfoConverters.add(durationTypeInfoConverter);
typeInfoConverters.add(structTypeInfoConverter);
typeInfoConverters.add(timestampTypeInfoConverter);
typeInfoConverters.add(messageTypeInfoConverter);

payloadConverters.add(new PrimitivePayloadConverter(primitiveTypeInfoConverter));
payloadConverters.add(new DurationPayloadConverter(durationTypeInfoConverter));
payloadConverters.add(new StructPayloadConverter(structTypeInfoConverter));
payloadConverters.add(new TimestampPayloadConverter(timestampTypeInfoConverter));
payloadConverters.add(new MessagePayloadConverter(messageTypeInfoConverter, payloadConverters));
}

}
Loading
Loading