Skip to content

Commit

Permalink
feat: get implementation diff from feat branch
Browse files Browse the repository at this point in the history
  • Loading branch information
ekawinataa committed Oct 29, 2024
1 parent 14bdd12 commit 31357f1
Show file tree
Hide file tree
Showing 67 changed files with 3,431 additions and 156 deletions.
5 changes: 4 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ dependencies {
implementation(group: 'com.google.cloud', name: 'google-cloud-bigtable', version: '2.24.1') {
exclude group: "io.grpc"
}
implementation group: 'com.aliyun.odps', name: 'odps-sdk-core', version: '0.48.8-public'
implementation group: 'com.aliyun.odps', name: 'odps-sdk-core', version: '0.50.3-public'
implementation 'io.grpc:grpc-all:1.55.1'
implementation group: 'org.slf4j', name: 'jul-to-slf4j', version: '1.7.35'
implementation group: 'redis.clients', name: 'jedis', version: '3.10.0'
Expand All @@ -54,7 +54,10 @@ dependencies {
implementation group: 'com.jayway.jsonpath', name: 'json-path', version: '2.8.0'
implementation group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.1'
implementation group: 'joda-time', name: 'joda-time', version: '2.10.2'
// need to take this out as well
implementation 'ch.qos.logback:logback-classic:1.4.11'
testImplementation group: 'junit', name: 'junit', version: '4.13.1'
testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.26.3'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
testImplementation 'org.mockito:mockito-core:4.5.1'
testImplementation 'com.github.tomakehurst:wiremock:2.16.0'
Expand Down
76 changes: 76 additions & 0 deletions src/main/java/com/gotocompany/depot/Main.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.gotocompany.depot;

import com.google.protobuf.Timestamp;
import com.gotocompany.depot.common.Tuple;
import com.gotocompany.depot.exception.SinkException;
import com.gotocompany.depot.maxcompute.MaxComputeSinkFactory;
import com.gotocompany.depot.message.Message;
import com.gotocompany.depot.metrics.StatsDReporter;
import com.timgroup.statsd.NoOpStatsDClient;
import deduction.HttpRequest;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@Slf4j
public class Main {
public static void main(String[] args) throws SinkException {
StatsDReporter statsDReporter = new StatsDReporter(new NoOpStatsDClient());
MaxComputeSinkFactory maxComputeSinkFactory = new MaxComputeSinkFactory(statsDReporter, getMockEnv());
maxComputeSinkFactory.init();
Sink sink = maxComputeSinkFactory.create();

while (true) {
int batchCount = Math.max(1, new Random().nextInt() % 20);

List<Message> messageList = IntStream.range(0, batchCount)
.mapToObj(index -> {
byte[] messageBytes = HttpRequest.newBuilder()
.setField1(UUID.randomUUID().toString())
.setField2(UUID.randomUUID().toString())
.setEventTimestamp(Timestamp.newBuilder()
.setSeconds(System.currentTimeMillis() / 1000)
.setNanos(0)
.build())
.build().toByteArray();
return new Message(null, messageBytes, new Tuple<>("topic", "test"), new Tuple<>("partition", 0), new Tuple<>("offset", index), new Tuple<>("timestamp", System.currentTimeMillis()));
})
.collect(Collectors.toList());
sink.pushToSink(messageList);
log.info("Pushed {} messages to sink", batchCount);
}
}

private static Map<String, String> getMockEnv() {
Map<String, String> env = new HashMap<>();
env.put("SINK_CONNECTOR_SCHEMA_MESSAGE_MODE", "LOG_MESSAGE");
env.put("INPUT_SCHEMA_PROTO_CLASS", "deduction.HttpRequest");
env.put("SINK_CONNECTOR_SCHEMA_PROTO_MESSAGE_CLASS", "deduction.HttpRequest");
env.put("SCHEMA_REGISTRY_STENCIL_ENABLE", "true");
env.put("SCHEMA_REGISTRY_STENCIL_URLS", "http://stencil.integration.gtfdata.io/v1beta1/namespaces/gtfn/schemas/depot");
env.put("SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS", "10000");
env.put("SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH", "true");
env.put("SCHEMA_REGISTRY_STENCIL_REFRESH_STRATEGY", "LONG_POLLING");
env.put("SINK_MAXCOMPUTE_ODPS_URL", "http://service.ap-southeast-5.maxcompute.aliyun.com/api");
env.put("SINK_MAXCOMPUTE_TUNNEL_URL", "http://dt.ap-southeast-5.maxcompute.aliyun.com");
env.put("SINK_MAXCOMPUTE_ACCESS_ID", "");
env.put("SINK_MAXCOMPUTE_ACCESS_KEY", "");
env.put("SINK_MAXCOMPUTE_PROJECT_ID", "goto_test");
env.put("SINK_MAXCOMPUTE_SCHEMA", "default");
env.put("SINK_MAXCOMPUTE_METADATA_NAMESPACE", "__kafka_metadata");
env.put("SINK_MAXCOMPUTE_ADD_METADATA_ENABLED", "true");
env.put("SINK_MAXCOMPUTE_METADATA_COLUMNS_TYPES", "timestamp=timestamp,topic=string,partition=integer,offset=long");
env.put("SINK_MAXCOMPUTE_TABLE_PARTITIONING_ENABLE", "true");
env.put("SINK_MAXCOMPUTE_TABLE_PARTITION_KEY", "event_timestamp");
env.put("SINK_MAXCOMPUTE_TABLE_PARTITION_COLUMN_NAME", "__partition_key");
env.put("SINK_MAXCOMPUTE_TABLE_NAME", "depot_test_partitioned_1");
env.put("SINK_MAXCOMPUTE_TABLE_LIFECYCLE_DAYS", "100");
return env;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.gotocompany.depot.config;

import com.gotocompany.depot.common.TupleString;
import com.gotocompany.depot.config.converter.ConfToListConverter;
import org.aeonbits.owner.Config;

import java.util.List;

public interface MaxComputeSinkConfig extends Config {
@Key("SINK_MAXCOMPUTE_ODPS_URL")
String getMaxComputeOdpsUrl();

@Key("SINK_MAXCOMPUTE_TUNNEL_URL")
String getMaxComputeTunnelUrl();

@Key("SINK_MAXCOMPUTE_ACCESS_ID")
String getMaxComputeAccessId();

@Key("SINK_MAXCOMPUTE_ACCESS_KEY")
String getMaxComputeAccessKey();

@Key("SINK_MAXCOMPUTE_PROJECT_ID")
String getMaxComputeProjectId();

@Key("SINK_MAXCOMPUTE_METADATA_NAMESPACE")
@DefaultValue("")
String getMaxcomputeMetadataNamespace();

@DefaultValue("true")
@Key("SINK_MAXCOMPUTE_ADD_METADATA_ENABLED")
boolean shouldAddMetadata();

@DefaultValue("")
@Key("SINK_MAXCOMPUTE_METADATA_COLUMNS_TYPES")
@ConverterClass(ConfToListConverter.class)
@Separator(ConfToListConverter.ELEMENT_SEPARATOR)
List<TupleString> getMetadataColumnsTypes();

@Key("SINK_MAXCOMPUTE_SCHEMA")
@DefaultValue("default")
String getMaxComputeSchema();

@Key("SINK_MAXCOMPUTE_TABLE_PARTITIONING_ENABLE")
@DefaultValue("false")
Boolean isTablePartitioningEnabled();

@Key("SINK_MAXCOMPUTE_TABLE_PARTITION_KEY")
String getTablePartitionKey();

@Key("SINK_MAXCOMPUTE_TABLE_PARTITION_COLUMN_NAME")
String getTablePartitionColumnName();

@Key("SINK_MAXCOMPUTE_TABLE_PARTITION_BY_TIMESTAMP_TIMEZONE")
@DefaultValue("UTC+7")
String getTablePartitionByTimestampTimezone();

@Key("SINK_MAX_COMPUTE_TABLE_PARTITION_BY_TIMESTAMP_ZONE_OFFSET")
@DefaultValue("+07:00")
String getTablePartitionByTimestampZoneOffset();

@Key("SINK_MAXCOMPUTE_TABLE_NAME")
String getMaxComputeTableName();

@Key("SINK_MAXCOMPUTE_TABLE_LIFECYCLE_DAYS")
Long getMaxComputeTableLifecycleDays();

@Key("SINK_MAXCOMPUTE_RECORD_PACK_FLUSH_TIMEOUT")
@DefaultValue("-1")
Long getMaxComputeRecordPackFlushTimeout();

}
41 changes: 41 additions & 0 deletions src/main/java/com/gotocompany/depot/maxcompute/MaxComputeSink.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.gotocompany.depot.maxcompute;

import com.gotocompany.depot.Sink;
import com.gotocompany.depot.SinkResponse;
import com.gotocompany.depot.error.ErrorInfo;
import com.gotocompany.depot.error.ErrorType;
import com.gotocompany.depot.exception.SinkException;
import com.gotocompany.depot.maxcompute.client.MaxComputeClient;
import com.gotocompany.depot.maxcompute.converter.record.RecordConverter;
import com.gotocompany.depot.maxcompute.model.RecordWrappers;
import com.gotocompany.depot.message.Message;
import lombok.RequiredArgsConstructor;

import java.io.IOException;
import java.util.List;

@RequiredArgsConstructor
public class MaxComputeSink implements Sink {

private final MaxComputeClient maxComputeClient;
private final RecordConverter recordConverter;

@Override
public SinkResponse pushToSink(List<Message> messages) throws SinkException {
SinkResponse sinkResponse = new SinkResponse();
RecordWrappers recordWrappers = recordConverter.convert(messages);
recordWrappers.getInvalidRecords()
.forEach(invalidRecord -> sinkResponse.getErrors().put(invalidRecord.getIndex(), invalidRecord.getErrorInfo()));
try {
maxComputeClient.insert(recordWrappers.getValidRecords());
} catch (Exception e) {
recordWrappers.getValidRecords()
.forEach(validRecord -> sinkResponse.getErrors().put(validRecord.getIndex(), new ErrorInfo(e, ErrorType.DEFAULT_ERROR)));
}
return sinkResponse;
}

@Override
public void close() throws IOException {}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.gotocompany.depot.maxcompute;

import com.google.protobuf.Descriptors;
import com.gotocompany.depot.Sink;
import com.gotocompany.depot.config.MaxComputeSinkConfig;
import com.gotocompany.depot.config.SinkConfig;
import com.gotocompany.depot.maxcompute.client.MaxComputeClient;
import com.gotocompany.depot.maxcompute.converter.ConverterOrchestrator;
import com.gotocompany.depot.maxcompute.converter.record.RecordConverter;
import com.gotocompany.depot.maxcompute.helper.MaxComputeSchemaHelper;
import com.gotocompany.depot.maxcompute.record.RecordDecorator;
import com.gotocompany.depot.maxcompute.record.RecordDecoratorFactory;
import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCache;
import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategy;
import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategyFactory;
import com.gotocompany.depot.message.MessageParser;
import com.gotocompany.depot.message.MessageParserFactory;
import com.gotocompany.depot.message.SinkConnectorSchemaMessageMode;
import com.gotocompany.depot.metrics.StatsDReporter;
import com.gotocompany.depot.utils.StencilUtils;
import com.gotocompany.stencil.StencilClientFactory;
import com.gotocompany.stencil.client.StencilClient;
import com.gotocompany.stencil.config.StencilConfig;
import org.aeonbits.owner.ConfigFactory;

import java.util.Map;

public class MaxComputeSinkFactory {

private final PartitioningStrategyFactory partitioningStrategyFactory;
private final MaxComputeSinkConfig maxComputeSinkConfig;
private final SinkConfig sinkConfig;
private final StatsDReporter statsDReporter;
private final ConverterOrchestrator converterOrchestrator;
private MaxComputeClient maxComputeClient;
private MaxComputeSchemaCache maxComputeSchemaCache;
private PartitioningStrategy partitioningStrategy;

public MaxComputeSinkFactory(StatsDReporter statsDReporter, Map<String, String> env) {
this.statsDReporter = statsDReporter;
this.maxComputeSinkConfig = ConfigFactory.create(MaxComputeSinkConfig.class, env);
this.sinkConfig = ConfigFactory.create(SinkConfig.class, env);
this.converterOrchestrator = new ConverterOrchestrator();
this.partitioningStrategyFactory = new PartitioningStrategyFactory(converterOrchestrator, maxComputeSinkConfig);
}

public void init() {
String schemaClass = SinkConnectorSchemaMessageMode.LOG_MESSAGE == sinkConfig.getSinkConnectorSchemaMessageMode() ? sinkConfig.getSinkConnectorSchemaProtoMessageClass() : sinkConfig.getSinkConnectorSchemaProtoKeyClass();
StencilClient stencilClient = getStencilClient(statsDReporter);
Descriptors.Descriptor descriptor = stencilClient.get(schemaClass);
this.partitioningStrategy = partitioningStrategyFactory.createPartitioningStrategy(descriptor);
this.maxComputeClient = new MaxComputeClient(maxComputeSinkConfig);
MaxComputeSchemaHelper maxComputeSchemaHelper = new MaxComputeSchemaHelper(converterOrchestrator, maxComputeSinkConfig, partitioningStrategy);
this.maxComputeSchemaCache = new MaxComputeSchemaCache(maxComputeSchemaHelper, sinkConfig, converterOrchestrator, maxComputeClient);
MessageParser messageParser = MessageParserFactory.getParser(sinkConfig, statsDReporter, maxComputeSchemaCache);
this.maxComputeSchemaCache.setMessageParser(messageParser);
this.maxComputeSchemaCache.updateSchema();
}

public Sink create() {
RecordConverter recordConverter = new RecordConverter(buildRecordDecorator(), maxComputeSchemaCache);
return new MaxComputeSink(maxComputeClient, recordConverter);
}

private RecordDecorator buildRecordDecorator() {
MessageParser messageParser = MessageParserFactory.getParser(sinkConfig, statsDReporter, maxComputeSchemaCache);
return RecordDecoratorFactory.createRecordDecorator(
converterOrchestrator,
maxComputeSchemaCache,
messageParser,
partitioningStrategy,
maxComputeSinkConfig,
sinkConfig
);
}

private StencilClient getStencilClient(StatsDReporter reporter) {
StencilConfig stencilConfig = StencilUtils.getStencilConfig(sinkConfig, reporter.getClient(), null);
if (sinkConfig.isSchemaRegistryStencilEnable()) {
return StencilClientFactory.getClient(sinkConfig.getSchemaRegistryStencilUrls(), stencilConfig);
} else {
return StencilClientFactory.getClient();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.gotocompany.depot.maxcompute.client;

import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.tunnel.TableTunnel;
import com.gotocompany.depot.config.MaxComputeSinkConfig;
import com.gotocompany.depot.maxcompute.client.insert.InsertManager;
import com.gotocompany.depot.maxcompute.client.insert.NonPartitionedInsertManager;
import com.gotocompany.depot.maxcompute.client.insert.PartitionedInsertManager;
import com.gotocompany.depot.maxcompute.model.RecordWrapper;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MaxComputeClient {

private final Odps odps;
private final MaxComputeSinkConfig maxComputeSinkConfig;
private final TableTunnel tableTunnel;
private final InsertManager insertManager;

public MaxComputeClient(MaxComputeSinkConfig maxComputeSinkConfig) {
this.maxComputeSinkConfig = maxComputeSinkConfig;
this.odps = initializeOdps();
this.tableTunnel = new TableTunnel(odps);
this.tableTunnel.setEndpoint(maxComputeSinkConfig.getMaxComputeTunnelUrl());
this.insertManager = initializeInsertManager();
}

public void upsertTable(TableSchema tableSchema) throws OdpsException {
String tableName = maxComputeSinkConfig.getMaxComputeTableName();
if (!this.odps.tables().exists(tableName)) {
this.odps.tables().create(odps.getDefaultProject(), tableName, tableSchema, "",
false, maxComputeSinkConfig.getMaxComputeTableLifecycleDays(),
null, null);
}
}

public void insert(List<RecordWrapper> recordWrappers) {
try {
insertManager.insert(recordWrappers);
} catch (Exception e) {
throw new RuntimeException("Failed to insert records into MaxCompute", e);
}
}

private Odps initializeOdps() {
Account account = new AliyunAccount(maxComputeSinkConfig.getMaxComputeAccessId(), maxComputeSinkConfig.getMaxComputeAccessKey());
Odps odpsClient = new Odps(account);
odpsClient.setDefaultProject(maxComputeSinkConfig.getMaxComputeProjectId());
odpsClient.setEndpoint(maxComputeSinkConfig.getMaxComputeOdpsUrl());
odpsClient.setCurrentSchema(maxComputeSinkConfig.getMaxComputeSchema());
odpsClient.setGlobalSettings(getGlobalSettings());
return odpsClient;
}

private InsertManager initializeInsertManager() {
if (maxComputeSinkConfig.isTablePartitioningEnabled()) {
return new PartitionedInsertManager(tableTunnel, maxComputeSinkConfig);
}
return new NonPartitionedInsertManager(tableTunnel, maxComputeSinkConfig);
}

private Map<String, String> getGlobalSettings() {
Map<String, String> globalSettings = new HashMap<>();
globalSettings.put("setproject odps.schema.evolution.enable", "true");
globalSettings.put("odps.namespace.schema", "true");
return globalSettings;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.gotocompany.depot.maxcompute.client.insert;

import com.aliyun.odps.tunnel.TunnelException;
import com.gotocompany.depot.maxcompute.model.RecordWrapper;

import java.io.IOException;
import java.util.List;

public interface InsertManager {
void insert(List<RecordWrapper> recordWrappers) throws TunnelException, IOException;
}
Loading

0 comments on commit 31357f1

Please sign in to comment.