feat: MaxCompute Sink #55

Open · wants to merge 5 commits into main
4 changes: 2 additions & 2 deletions build.gradle
@@ -33,7 +33,7 @@ lombok {
}

group 'com.gotocompany'
-version '0.10.7'
+version '0.11.1'

def projName = "firehose"

@@ -100,7 +100,7 @@ dependencies {
implementation platform('com.google.cloud:libraries-bom:20.5.0')
implementation 'com.google.cloud:google-cloud-storage:2.20.1'
implementation 'org.apache.logging.log4j:log4j-core:2.20.0'
-implementation group: 'com.gotocompany', name: 'depot', version: '0.9.2'
+implementation group: 'com.gotocompany', name: 'depot', version: '0.10.1'
implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59' exclude group: 'org.slf4j'
implementation 'dev.cel:cel:0.5.2'

54 changes: 54 additions & 0 deletions docs/docs/sinks/maxcompute-sink.md
@@ -0,0 +1,54 @@
# MaxCompute sink

### Datatype Protobuf

MaxCompute sink has several responsibilities, including:

1. Creating the MaxCompute table if it does not exist.
2. Updating the MaxCompute table schema based on the latest protobuf schema.
3. Translating protobuf messages into MaxCompute-compatible records and inserting them into MaxCompute tables.

## MaxCompute Table Schema Update

### Protobuf

MaxCompute Sink updates the MaxCompute table schema through a separate table update operation. It
uses [Stencil](https://github.com/goto/stencil) to parse protobuf messages, generate the corresponding schema, and
update MaxCompute tables with the latest schema. The Stencil client periodically reloads its descriptor cache, and the
table schema is updated after the descriptor cache has been refreshed.
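
Descriptor refresh is driven by Firehose's generic Stencil settings rather than by the sink itself. A minimal sketch, assuming the standard `SCHEMA_REGISTRY_STENCIL_*` properties (the URL and refresh values below are placeholders, not part of this change):

```properties
# Sketch: enable Stencil and point it at a descriptor endpoint (placeholder URL).
SCHEMA_REGISTRY_STENCIL_ENABLE=true
SCHEMA_REGISTRY_STENCIL_URLS=http://localhost:8000/v1beta1/namespaces/sample/schemas/sample-proto
# Periodically reload the descriptor cache so schema updates propagate to the sink.
SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH=true
SCHEMA_REGISTRY_STENCIL_REFRESH_STRATEGY=LONG_POLLING
```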

#### Supported Protobuf - MaxCompute Table Type Mapping

| Protobuf Type | MaxCompute Type |
|------------------------------------------------------------------------------------|-----------------------------|
| bytes | BINARY |
| string | STRING |
| enum | STRING |
| float | FLOAT |
| double | DOUBLE |
| bool | BOOLEAN |
| int64, uint64, int32, uint32, fixed64, fixed32, sfixed64, sfixed32, sint64, sint32 | BIGINT |
| message | STRUCT |
| .google.protobuf.Timestamp | TIMESTAMP_NTZ |
| .google.protobuf.Struct | STRING (Json Serialised) |
| .google.protobuf.Duration | STRUCT |
| map<k,v>                                                                             | ARRAY<STRUCT<key:k, value:v>> |
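
As an illustration, a hypothetical message (names invented for this example) maps to MaxCompute column types per the table above:

```protobuf
syntax = "proto3";

import "google/protobuf/timestamp.proto";

// Hypothetical message, used only to illustrate the type mapping above.
message ClickEvent {
  string event_id = 1;                            // -> STRING
  int64 user_id = 2;                              // -> BIGINT
  double amount = 3;                              // -> DOUBLE
  bool is_valid = 4;                              // -> BOOLEAN
  google.protobuf.Timestamp event_timestamp = 5;  // -> TIMESTAMP_NTZ
  map<string, string> attributes = 6;             // -> ARRAY<STRUCT<key:STRING, value:STRING>>
}
```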

## Partitioning

MaxCompute Sink supports creating tables with a partition configuration. Currently, it supports partitioning on primitive fields (STRING, TINYINT, SMALLINT, BIGINT)
and on timestamp fields. The timestamp-based partitioning strategy introduces a pseudo-partition column whose value is the timestamp field truncated to the start of its day.
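
For example, the timestamp-based strategy can be enabled with the partitioning properties this change adds to `env/local.properties`:

```properties
# Partition on the event_timestamp field; each record lands in a pseudo-partition
# column holding the timestamp truncated to the start of the day.
SINK_MAXCOMPUTE_TABLE_PARTITIONING_ENABLE=true
SINK_MAXCOMPUTE_TABLE_PARTITION_KEY=event_timestamp
SINK_MAXCOMPUTE_TABLE_PARTITION_COLUMN_NAME=__partition_key
```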

## Clustering

MaxCompute Sink currently does not support clustering.

## Metadata

For data quality checking purposes, metadata sometimes needs to be added to the record.
If `SINK_MAXCOMPUTE_ADD_METADATA_ENABLED` is true, the metadata columns are added.
`SINK_MAXCOMPUTE_METADATA_NAMESPACE` places the metadata columns under a separate namespace;
if the namespace is empty, the metadata columns are added at the root level.
`SINK_MAXCOMPUTE_METADATA_COLUMNS_TYPES` lists the Kafka metadata columns and their types.
An example of metadata columns that can be added for Kafka records is shown below.
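
A sample configuration, taken from the defaults this change adds to `env/local.properties`:

```properties
# Kafka record metadata columns and their types, nested under the
# __kafka_metadata namespace column.
SINK_MAXCOMPUTE_ADD_METADATA_ENABLED=true
SINK_MAXCOMPUTE_METADATA_NAMESPACE=__kafka_metadata
SINK_MAXCOMPUTE_METADATA_COLUMNS_TYPES=message_timestamp=timestamp,message_topic=string,message_partition=integer,message_offset=long
```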
17 changes: 17 additions & 0 deletions env/local.properties
@@ -205,4 +205,21 @@ SOURCE_KAFKA_CONSUMER_GROUP_ID=sample-group-id
# SINK_REDIS_TTL_TYPE=DISABLE
# SINK_REDIS_TTL_VALUE=0
# SINK_REDIS_DEPLOYMENT_TYPE=Standalone
#############################################
#
## MaxCompute Sink
#
SINK_MAXCOMPUTE_ODPS_URL=http://service.ap-southeast-5.maxcompute.aliyun.com/api
SINK_MAXCOMPUTE_TUNNEL_URL=http://dt.ap-southeast-5.maxcompute.aliyun.com
SINK_MAXCOMPUTE_ACCESS_ID=
SINK_MAXCOMPUTE_ACCESS_KEY=
SINK_MAXCOMPUTE_PROJECT_ID=your_project_id
SINK_MAXCOMPUTE_SCHEMA=default
SINK_MAXCOMPUTE_METADATA_NAMESPACE=__kafka_metadata
SINK_MAXCOMPUTE_ADD_METADATA_ENABLED=true
SINK_MAXCOMPUTE_METADATA_COLUMNS_TYPES=message_timestamp=timestamp,message_topic=string,message_partition=integer,message_offset=long
SINK_MAXCOMPUTE_TABLE_PARTITIONING_ENABLE=true
SINK_MAXCOMPUTE_TABLE_PARTITION_KEY=event_timestamp
SINK_MAXCOMPUTE_TABLE_PARTITION_COLUMN_NAME=__partition_key
SINK_MAXCOMPUTE_TABLE_NAME=table_name
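
With these properties in place, the sink still has to be selected via Firehose's `SINK_TYPE` setting; the lowercase value below follows the convention of the other sinks (e.g. `SINK_TYPE=log`) and is an assumption, not part of this diff:

```properties
# Assumed activation switch, mirroring other Firehose sinks.
SINK_TYPE=maxcompute
```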

@@ -19,5 +19,6 @@ public enum SinkType {
BLOB,
BIGQUERY,
BIGTABLE,
-MONGODB
+MONGODB,
+MAXCOMPUTE
}
8 changes: 8 additions & 0 deletions src/main/java/com/gotocompany/firehose/sink/SinkFactory.java
@@ -11,6 +11,7 @@
import com.gotocompany.depot.http.HttpSink;
import com.gotocompany.depot.log.LogSink;
import com.gotocompany.depot.log.LogSinkFactory;
+import com.gotocompany.depot.maxcompute.MaxComputeSinkFactory;
import com.gotocompany.depot.metrics.StatsDReporter;
import com.gotocompany.depot.redis.RedisSink;
import com.gotocompany.depot.redis.RedisSinkFactory;
@@ -46,6 +47,7 @@ public class SinkFactory {
private LogSinkFactory logSinkFactory;
private RedisSinkFactory redisSinkFactory;
private com.gotocompany.depot.http.HttpSinkFactory httpv2SinkFactory;
+private MaxComputeSinkFactory maxComputeSinkFactory;

public SinkFactory(KafkaConsumerConfig kafkaConsumerConfig,
StatsDReporter statsDReporter,
@@ -104,6 +106,10 @@ public void init() {
statsDReporter);
httpv2SinkFactory.init();
return;
+case MAXCOMPUTE:
+    maxComputeSinkFactory = new MaxComputeSinkFactory(statsDReporter, stencilClient, config);
+    maxComputeSinkFactory.init();
+    return;
default:
throw new ConfigurationException("Invalid Firehose SINK_TYPE");
}
@@ -139,6 +145,8 @@ public Sink getSink() {
return MongoSinkFactory.create(config, statsDReporter, stencilClient);
case HTTPV2:
return new GenericSink(new FirehoseInstrumentation(statsDReporter, HttpSink.class), sinkType.name(), httpv2SinkFactory.create());
+case MAXCOMPUTE:
+    return new GenericSink(new FirehoseInstrumentation(statsDReporter, MaxComputeSinkFactory.class), sinkType.name(), maxComputeSinkFactory.create());
default:
throw new ConfigurationException("Invalid Firehose SINK_TYPE");
}