Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flink] Support updating row type nested in array/map in Flink #4528

Merged
merged 2 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.casting;

import org.apache.paimon.data.InternalArray;

/** Get element from array and cast it according to specific {@link CastExecutor}. */
public class CastElementGetter {

private final InternalArray.ElementGetter elementGetter;
private final CastExecutor<Object, Object> castExecutor;

@SuppressWarnings("unchecked")
public CastElementGetter(
InternalArray.ElementGetter elementGetter, CastExecutor<?, ?> castExecutor) {
this.elementGetter = elementGetter;
this.castExecutor = (CastExecutor<Object, Object>) castExecutor;
}

@SuppressWarnings("unchecked")
public <V> V getElementOrNull(InternalArray array, int pos) {
Object value = elementGetter.getElementOrNull(array, pos);
return value == null ? null : (V) castExecutor.cast(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@
* Get field value from row with given pos and cast it according to specific {@link CastExecutor}.
*/
public class CastFieldGetter {

private final InternalRow.FieldGetter fieldGetter;
private final CastExecutor<Object, Object> castExecutor;

@SuppressWarnings("unchecked")
public CastFieldGetter(InternalRow.FieldGetter fieldGetter, CastExecutor<?, ?> castExecutor) {
this.fieldGetter = fieldGetter;
this.castExecutor = (CastExecutor<Object, Object>) castExecutor;
}

@SuppressWarnings("unchecked")
public <V> V getFieldOrNull(InternalRow row) {
Object value = fieldGetter.getFieldOrNull(row);
return value == null ? null : (V) castExecutor.cast(value);
Expand Down
201 changes: 201 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/casting/CastedArray.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.casting;

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;

/**
* An implementation of {@link InternalArray} which provides a casted view of the underlying {@link
* InternalArray}.
*
* <p>It reads data from underlying {@link InternalArray} according to source logical type and casts
* it with specific {@link CastExecutor}.
*/
public class CastedArray implements InternalArray {

private final CastElementGetter castElementGetter;
private InternalArray array;

protected CastedArray(CastElementGetter castElementGetter) {
this.castElementGetter = castElementGetter;
}

/**
* Replaces the underlying {@link InternalArray} backing this {@link CastedArray}.
*
* <p>This method replaces the array in place and does not return a new object. This is done for
* performance reasons.
*/
public static CastedArray from(CastElementGetter castElementGetter) {
return new CastedArray(castElementGetter);
}

public CastedArray replaceArray(InternalArray array) {
this.array = array;
return this;
}

@Override
public int size() {
return array.size();
}

@Override
public boolean[] toBooleanArray() {
boolean[] result = new boolean[size()];
for (int i = 0; i < result.length; i++) {
result[i] = castElementGetter.getElementOrNull(array, i);
}
return result;
}

@Override
public byte[] toByteArray() {
byte[] result = new byte[size()];
for (int i = 0; i < result.length; i++) {
result[i] = castElementGetter.getElementOrNull(array, i);
}
return result;
}

@Override
public short[] toShortArray() {
short[] result = new short[size()];
for (int i = 0; i < result.length; i++) {
result[i] = castElementGetter.getElementOrNull(array, i);
}
return result;
}

@Override
public int[] toIntArray() {
int[] result = new int[size()];
for (int i = 0; i < result.length; i++) {
result[i] = castElementGetter.getElementOrNull(array, i);
}
return result;
}

@Override
public long[] toLongArray() {
long[] result = new long[size()];
for (int i = 0; i < result.length; i++) {
result[i] = castElementGetter.getElementOrNull(array, i);
}
return result;
}

@Override
public float[] toFloatArray() {
float[] result = new float[size()];
for (int i = 0; i < result.length; i++) {
result[i] = castElementGetter.getElementOrNull(array, i);
}
return result;
}

@Override
public double[] toDoubleArray() {
double[] result = new double[size()];
for (int i = 0; i < result.length; i++) {
result[i] = castElementGetter.getElementOrNull(array, i);
}
return result;
}

@Override
public boolean isNullAt(int pos) {
return castElementGetter.getElementOrNull(array, pos) == null;
}

@Override
public boolean getBoolean(int pos) {
return castElementGetter.getElementOrNull(array, pos);
}

@Override
public byte getByte(int pos) {
return castElementGetter.getElementOrNull(array, pos);
}

@Override
public short getShort(int pos) {
return castElementGetter.getElementOrNull(array, pos);
}

@Override
public int getInt(int pos) {
return castElementGetter.getElementOrNull(array, pos);
}

@Override
public long getLong(int pos) {
return castElementGetter.getElementOrNull(array, pos);
}

@Override
public float getFloat(int pos) {
return castElementGetter.getElementOrNull(array, pos);
}

@Override
public double getDouble(int pos) {
return castElementGetter.getElementOrNull(array, pos);
}

@Override
public BinaryString getString(int pos) {
return castElementGetter.getElementOrNull(array, pos);
}

@Override
public Decimal getDecimal(int pos, int precision, int scale) {
return castElementGetter.getElementOrNull(array, pos);
}

@Override
public Timestamp getTimestamp(int pos, int precision) {
return castElementGetter.getElementOrNull(array, pos);
}

@Override
public byte[] getBinary(int pos) {
return castElementGetter.getElementOrNull(array, pos);
}

@Override
public InternalArray getArray(int pos) {
return castElementGetter.getElementOrNull(array, pos);
}

@Override
public InternalMap getMap(int pos) {
return castElementGetter.getElementOrNull(array, pos);
}

@Override
public InternalRow getRow(int pos, int numFields) {
return castElementGetter.getElementOrNull(array, pos);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.casting;

import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;

/**
* An implementation of {@link InternalMap} which provides a casted view of the underlying {@link
* InternalMap}.
*
* <p>It reads data from underlying {@link InternalMap} according to source logical type and casts
* it with specific {@link CastExecutor}.
*/
public class CastedMap implements InternalMap {

private final CastedArray castedValueArray;
private InternalMap map;

protected CastedMap(CastElementGetter castValueGetter) {
this.castedValueArray = CastedArray.from(castValueGetter);
}

/**
* Replaces the underlying {@link InternalMap} backing this {@link CastedMap}.
*
* <p>This method replaces the map in place and does not return a new object. This is done for
* performance reasons.
*/
public static CastedMap from(CastElementGetter castValueGetter) {
return new CastedMap(castValueGetter);
}

public CastedMap replaceMap(InternalMap map) {
this.castedValueArray.replaceArray(map.valueArray());
this.map = map;
return this;
}

@Override
public int size() {
return map.size();
}

@Override
public InternalArray keyArray() {
return map.keyArray();
}

@Override
public InternalArray valueArray() {
return castedValueArray;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
*
* <p>It reads data from underlying {@link InternalRow} according to source logical type and casts
* it with specific {@link CastExecutor}.
*
* <p>Note: This class supports only top-level castings, not nested castings.
*/
public class CastedRow implements InternalRow {

Expand Down
Loading
Loading