Skip to content

Commit

Permalink
Cassandra Schema and Value Mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
VardhanThigle committed Dec 11, 2024
1 parent 6a50180 commit fb0b666
Show file tree
Hide file tree
Showing 21 changed files with 1,118 additions and 15 deletions.
10 changes: 10 additions & 0 deletions v2/sourcedb-to-spanner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -142,5 +142,15 @@
<version>5.0.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-cassandra</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.1</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.source.reader.io.cassandra.mappings;

import com.google.auto.value.AutoValue;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.FieldMapper;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.RowValueExtractor;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.RowValueMapper;
import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.UnifiedTypeMapping;
import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.unified.UnifiedMappingProvider;
import com.google.common.collect.ImmutableMap;

/** Represent Unified type mapping, value extractor and value mappings for Cassandra. */
@AutoValue
public abstract class CassandraMappings {
public abstract ImmutableMap<String, UnifiedTypeMapping> typeMapping();

public abstract ImmutableMap<String, FieldMapper<?>> fieldMapping();

public static Builder builder() {
return new AutoValue_CassandraMappings.Builder();
}

@AutoValue.Builder
public abstract static class Builder {
abstract ImmutableMap.Builder<String, UnifiedTypeMapping> typeMappingBuilder();

abstract ImmutableMap.Builder<String, FieldMapper<?>> fieldMappingBuilder();

public <T> Builder put(
String cassandraType,
UnifiedMappingProvider.Type type,
RowValueExtractor<T> rowValueExtractor,
RowValueMapper<T> rowValueMapper) {
this.typeMappingBuilder()
.put(cassandraType.toUpperCase(), UnifiedMappingProvider.getMapping(type));
this.fieldMappingBuilder()
.put(cassandraType.toUpperCase(), FieldMapper.create(rowValueExtractor, rowValueMapper));
return this;
}

public abstract CassandraMappings build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.source.reader.io.cassandra.mappings;

import com.datastax.driver.core.Duration;
import com.datastax.driver.core.LocalDate;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.TypeCodec;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.FieldMapper;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.RowValueExtractor;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.RowValueMapper;
import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.UnifiedTypeMapping;
import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.unified.CustomSchema.IntervalNano;
import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.unified.UnifiedMappingProvider;
import com.google.common.collect.ImmutableMap;
import java.nio.ByteBuffer;
import java.util.Date;
import org.apache.avro.LogicalTypes;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.commons.codec.binary.Hex;

public class CassandraMappingsProvider {
/** Pass the value as is to avro. */
private static final RowValueMapper valuePassThrough = (value, schema) -> value;

/** Pass the value as a string to avro. */
private static final RowValueMapper toString = (value, schema) -> value.toString();

/** Pass the value as an integer to avro. */
private static final RowValueMapper<Number> toInt = (value, schema) -> value.intValue();

/** Map {@link ByteBuffer} to a Hex encoded String. */
private static final RowValueMapper<ByteBuffer> ByteBufferToHexString =
(value, schema) -> new String(Hex.encodeHex(value.array()));

/**
* Map {@link LocalDate} to {@link LogicalTypes.Date}. Cassandra Date type encodes number of days
* since epoch, without any time or time zone component.
*
* <p>See: <a href="https://cassandra.apache.org/doc/stable/cassandra/cql/types.html">types</a>
* for additional information on date type.
*/
private static final RowValueMapper<LocalDate> localDateToAvroLogicalDate =
(value, schema) -> value.getDaysSinceEpoch();

private static final RowValueExtractor<Duration> getDuration =
(row, name) -> row.get(name, TypeCodec.duration());

private static final RowValueMapper<Duration> durationToAvro =
(value, schema) ->
new GenericRecordBuilder(IntervalNano.SCHEMA)
.set(IntervalNano.MONTHS_FIELD_NAME, value.getMonths())
.set(IntervalNano.DAYS_FIELD_NAME, value.getDays())
.set(IntervalNano.NANOS_FIELD_NAME, value.getNanoseconds())
.build();

/**
* Cassandra represents `Time` field as 64 bit singed integer representing number of nanoseconds
* since midnight. See <a
* href=https://cassandra.apache.org/doc/stable/cassandra/cql/types.html>types documentation</a>
* for further details.
*/
private static final RowValueMapper<Long> cassandraTimeToIntervalNano =
(value, schema) ->
new GenericRecordBuilder(IntervalNano.SCHEMA)
.set(IntervalNano.NANOS_FIELD_NAME, value)
.build();

private static final RowValueMapper<Date> dateToAvro = (value, schema) -> value.getTime() * 1000L;

private static final CassandraMappings CASSANDRA_MAPPINGS =
CassandraMappings.builder()
.put("ASCII", UnifiedMappingProvider.Type.STRING, Row::getString, valuePassThrough)
.put("BIGINT", UnifiedMappingProvider.Type.LONG, Row::getLong, valuePassThrough)
.put("BLOB", UnifiedMappingProvider.Type.STRING, Row::getBytes, ByteBufferToHexString)
.put("BOOLEAN", UnifiedMappingProvider.Type.BOOLEAN, Row::getBool, valuePassThrough)
.put("COUNTER", UnifiedMappingProvider.Type.LONG, Row::getLong, valuePassThrough)
.put("DATE", UnifiedMappingProvider.Type.DATE, Row::getDate, localDateToAvroLogicalDate)
// Unlike MySql, the Cassandra decimal does not have precision and scale fixed in the
// schema.
// Hence, we can't map it to avro decimal.
.put("DECIMAL", UnifiedMappingProvider.Type.STRING, Row::getDecimal, toString)
.put("DOUBLE", UnifiedMappingProvider.Type.DOUBLE, Row::getDouble, valuePassThrough)
.put("DURATION", UnifiedMappingProvider.Type.INTERVAL_NANO, getDuration, durationToAvro)
.put("FLOAT", UnifiedMappingProvider.Type.FLOAT, Row::getFloat, valuePassThrough)
.put("INET", UnifiedMappingProvider.Type.STRING, Row::getInet, toString)
.put("INT", UnifiedMappingProvider.Type.INTEGER, Row::getInt, valuePassThrough)
.put("SMALLINT", UnifiedMappingProvider.Type.INTEGER, Row::getShort, toInt)
.put("TEXT", UnifiedMappingProvider.Type.STRING, Row::getString, valuePassThrough)
.put(
"TIME",
UnifiedMappingProvider.Type.INTERVAL_NANO,
Row::getTime,
cassandraTimeToIntervalNano)
.put("TIMESTAMP", UnifiedMappingProvider.Type.TIMESTAMP, Row::getTimestamp, dateToAvro)
.put("TIMEUUID", UnifiedMappingProvider.Type.STRING, Row::getUUID, toString)
.put("TINYINT", UnifiedMappingProvider.Type.INTEGER, Row::getByte, toInt)
.put("UUID", UnifiedMappingProvider.Type.STRING, Row::getUUID, toString)
.put("VARCHAR", UnifiedMappingProvider.Type.STRING, Row::getString, valuePassThrough)
.put("VARINT", UnifiedMappingProvider.Type.NUMBER, Row::getVarint, toString)
.put(
"UNSUPPORTED",
UnifiedMappingProvider.Type.UNSUPPORTED,
(row, name) -> null,
(value, schema) -> null)
.build();

private CassandraMappingsProvider() {}

/** Mappings for unified type interface. */
public static ImmutableMap<String, UnifiedTypeMapping> getMapping() {
return CASSANDRA_MAPPINGS.typeMapping();
}

/**
* Field Mappers for {@link
* com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.CassandraSourceRowMapper}.
*/
public static ImmutableMap<String, FieldMapper<?>> getFieldMapping() {
return CASSANDRA_MAPPINGS.fieldMapping();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright (C) 2024 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

/** Schema and Value mapping for Cassandra. */
package com.google.cloud.teleport.v2.source.reader.io.cassandra.mappings;
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper;

import com.datastax.driver.core.ResultSet;
import com.google.auto.value.AutoValue;
import com.google.cloud.teleport.v2.source.reader.io.row.SourceRow;
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceSchemaReference;
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceTableSchema;
import java.io.Serializable;
import java.util.Iterator;
import java.util.concurrent.Future;
import org.apache.beam.sdk.io.cassandra.Mapper;
import org.apache.commons.collections4.iterators.TransformIterator;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

@AutoValue
public abstract class CassandraSourceRowMapper implements Mapper<SourceRow>, Serializable {
abstract SourceSchemaReference sourceSchemaReference();

abstract SourceTableSchema sourceTableSchema();

@Override
public @UnknownKeyFor @NonNull @Initialized Iterator<SourceRow> map(
@UnknownKeyFor @NonNull @Initialized ResultSet resultSet) {
var ret = new TransformIterator();
ret.setIterator(resultSet.iterator());
ret.setTransformer(RowMapper.create(sourceSchemaReference(), sourceTableSchema()));
return ret;
}

@Override
public @UnknownKeyFor @NonNull @Initialized Future<@UnknownKeyFor @Nullable @Initialized Void>
deleteAsync(SourceRow entity) {
throw new UnsupportedOperationException("Only Read from Cassandra is supported");
}

@Override
public @UnknownKeyFor @NonNull @Initialized Future<@UnknownKeyFor @Nullable @Initialized Void>
saveAsync(SourceRow entity) {
throw new UnsupportedOperationException("Only Read from Cassandra is supported");
}

public static Builder builder() {
return new AutoValue_CassandraSourceRowMapper.Builder();
}

@AutoValue.Builder
public abstract static class Builder {

public abstract Builder setSourceSchemaReference(SourceSchemaReference value);

public abstract Builder setSourceTableSchema(SourceTableSchema value);

public abstract CassandraSourceRowMapper build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper;

import com.datastax.driver.core.Row;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import org.apache.avro.Schema;

@AutoValue
public abstract class FieldMapper<T> implements Serializable {

public static FieldMapper<?> create(
RowValueExtractor<?> rowValueExtractor, RowValueMapper<?> rowValueMapper) {
return new AutoValue_FieldMapper(rowValueExtractor, rowValueMapper);
}

public Object mapValue(Row row, String fieldName, Schema fieldSchema) {
T extractedValue = rowValueExtractor().extract(row, fieldName);
if (extractedValue == null) {
return null;
}
Object avroValue = rowValueMapper().map(extractedValue, fieldSchema);
return avroValue;
}

abstract RowValueExtractor<T> rowValueExtractor();

abstract RowValueMapper<T> rowValueMapper();
}
Loading

0 comments on commit fb0b666

Please sign in to comment.