-
Notifications
You must be signed in to change notification settings - Fork 993
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[flink] Support updating row type nested in array/map in Flink
- Loading branch information
Showing
10 changed files
with
561 additions
and
54 deletions.
There are no files selected for viewing
41 changes: 41 additions & 0 deletions
41
paimon-common/src/main/java/org/apache/paimon/casting/CastElementGetter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.paimon.casting; | ||
|
||
import org.apache.paimon.data.InternalArray; | ||
|
||
/** Get element from array and cast it according to specific {@link CastExecutor}. */ | ||
public class CastElementGetter { | ||
|
||
private final InternalArray.ElementGetter elementGetter; | ||
private final CastExecutor<Object, Object> castExecutor; | ||
|
||
@SuppressWarnings("unchecked") | ||
public CastElementGetter( | ||
InternalArray.ElementGetter elementGetter, CastExecutor<?, ?> castExecutor) { | ||
this.elementGetter = elementGetter; | ||
this.castExecutor = (CastExecutor<Object, Object>) castExecutor; | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
public <V> V getElementOrNull(InternalArray array, int pos) { | ||
Object value = elementGetter.getElementOrNull(array, pos); | ||
return value == null ? null : (V) castExecutor.cast(value); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
201 changes: 201 additions & 0 deletions
201
paimon-common/src/main/java/org/apache/paimon/casting/CastedArray.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,201 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.paimon.casting; | ||
|
||
import org.apache.paimon.data.BinaryString; | ||
import org.apache.paimon.data.Decimal; | ||
import org.apache.paimon.data.InternalArray; | ||
import org.apache.paimon.data.InternalMap; | ||
import org.apache.paimon.data.InternalRow; | ||
import org.apache.paimon.data.Timestamp; | ||
|
||
/** | ||
* An implementation of {@link InternalArray} which provides a casted view of the underlying {@link | ||
* InternalArray}. | ||
* | ||
* <p>It reads data from underlying {@link InternalArray} according to source logical type and casts | ||
* it with specific {@link CastExecutor}. | ||
*/ | ||
public class CastedArray implements InternalArray { | ||
|
||
private final CastElementGetter castElementGetter; | ||
private InternalArray array; | ||
|
||
protected CastedArray(CastElementGetter castElementGetter) { | ||
this.castElementGetter = castElementGetter; | ||
} | ||
|
||
/** | ||
* Replaces the underlying {@link InternalArray} backing this {@link CastedArray}. | ||
* | ||
* <p>This method replaces the array in place and does not return a new object. This is done for | ||
* performance reasons. | ||
*/ | ||
public static CastedArray from(CastElementGetter castElementGetter) { | ||
return new CastedArray(castElementGetter); | ||
} | ||
|
||
public CastedArray replaceArray(InternalArray array) { | ||
this.array = array; | ||
return this; | ||
} | ||
|
||
@Override | ||
public int size() { | ||
return array.size(); | ||
} | ||
|
||
@Override | ||
public boolean[] toBooleanArray() { | ||
boolean[] result = new boolean[size()]; | ||
for (int i = 0; i < result.length; i++) { | ||
result[i] = castElementGetter.getElementOrNull(array, i); | ||
} | ||
return result; | ||
} | ||
|
||
@Override | ||
public byte[] toByteArray() { | ||
byte[] result = new byte[size()]; | ||
for (int i = 0; i < result.length; i++) { | ||
result[i] = castElementGetter.getElementOrNull(array, i); | ||
} | ||
return result; | ||
} | ||
|
||
@Override | ||
public short[] toShortArray() { | ||
short[] result = new short[size()]; | ||
for (int i = 0; i < result.length; i++) { | ||
result[i] = castElementGetter.getElementOrNull(array, i); | ||
} | ||
return result; | ||
} | ||
|
||
@Override | ||
public int[] toIntArray() { | ||
int[] result = new int[size()]; | ||
for (int i = 0; i < result.length; i++) { | ||
result[i] = castElementGetter.getElementOrNull(array, i); | ||
} | ||
return result; | ||
} | ||
|
||
@Override | ||
public long[] toLongArray() { | ||
long[] result = new long[size()]; | ||
for (int i = 0; i < result.length; i++) { | ||
result[i] = castElementGetter.getElementOrNull(array, i); | ||
} | ||
return result; | ||
} | ||
|
||
@Override | ||
public float[] toFloatArray() { | ||
float[] result = new float[size()]; | ||
for (int i = 0; i < result.length; i++) { | ||
result[i] = castElementGetter.getElementOrNull(array, i); | ||
} | ||
return result; | ||
} | ||
|
||
@Override | ||
public double[] toDoubleArray() { | ||
double[] result = new double[size()]; | ||
for (int i = 0; i < result.length; i++) { | ||
result[i] = castElementGetter.getElementOrNull(array, i); | ||
} | ||
return result; | ||
} | ||
|
||
@Override | ||
public boolean isNullAt(int pos) { | ||
return castElementGetter.getElementOrNull(array, pos) == null; | ||
} | ||
|
||
@Override | ||
public boolean getBoolean(int pos) { | ||
return castElementGetter.getElementOrNull(array, pos); | ||
} | ||
|
||
@Override | ||
public byte getByte(int pos) { | ||
return castElementGetter.getElementOrNull(array, pos); | ||
} | ||
|
||
@Override | ||
public short getShort(int pos) { | ||
return castElementGetter.getElementOrNull(array, pos); | ||
} | ||
|
||
@Override | ||
public int getInt(int pos) { | ||
return castElementGetter.getElementOrNull(array, pos); | ||
} | ||
|
||
@Override | ||
public long getLong(int pos) { | ||
return castElementGetter.getElementOrNull(array, pos); | ||
} | ||
|
||
@Override | ||
public float getFloat(int pos) { | ||
return castElementGetter.getElementOrNull(array, pos); | ||
} | ||
|
||
@Override | ||
public double getDouble(int pos) { | ||
return castElementGetter.getElementOrNull(array, pos); | ||
} | ||
|
||
@Override | ||
public BinaryString getString(int pos) { | ||
return castElementGetter.getElementOrNull(array, pos); | ||
} | ||
|
||
@Override | ||
public Decimal getDecimal(int pos, int precision, int scale) { | ||
return castElementGetter.getElementOrNull(array, pos); | ||
} | ||
|
||
@Override | ||
public Timestamp getTimestamp(int pos, int precision) { | ||
return castElementGetter.getElementOrNull(array, pos); | ||
} | ||
|
||
@Override | ||
public byte[] getBinary(int pos) { | ||
return castElementGetter.getElementOrNull(array, pos); | ||
} | ||
|
||
@Override | ||
public InternalArray getArray(int pos) { | ||
return castElementGetter.getElementOrNull(array, pos); | ||
} | ||
|
||
@Override | ||
public InternalMap getMap(int pos) { | ||
return castElementGetter.getElementOrNull(array, pos); | ||
} | ||
|
||
@Override | ||
public InternalRow getRow(int pos, int numFields) { | ||
return castElementGetter.getElementOrNull(array, pos); | ||
} | ||
} |
70 changes: 70 additions & 0 deletions
70
paimon-common/src/main/java/org/apache/paimon/casting/CastedMap.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.paimon.casting; | ||
|
||
import org.apache.paimon.data.InternalArray; | ||
import org.apache.paimon.data.InternalMap; | ||
|
||
/** | ||
* An implementation of {@link InternalMap} which provides a casted view of the underlying {@link | ||
* InternalMap}. | ||
* | ||
* <p>It reads data from underlying {@link InternalMap} according to source logical type and casts | ||
* it with specific {@link CastExecutor}. | ||
*/ | ||
public class CastedMap implements InternalMap { | ||
|
||
private final CastedArray castedValueArray; | ||
private InternalMap map; | ||
|
||
protected CastedMap(CastElementGetter castValueGetter) { | ||
this.castedValueArray = CastedArray.from(castValueGetter); | ||
} | ||
|
||
/** | ||
* Replaces the underlying {@link InternalMap} backing this {@link CastedMap}. | ||
* | ||
* <p>This method replaces the map in place and does not return a new object. This is done for | ||
* performance reasons. | ||
*/ | ||
public static CastedMap from(CastElementGetter castValueGetter) { | ||
return new CastedMap(castValueGetter); | ||
} | ||
|
||
public CastedMap replaceMap(InternalMap map) { | ||
this.castedValueArray.replaceArray(map.valueArray()); | ||
this.map = map; | ||
return this; | ||
} | ||
|
||
@Override | ||
public int size() { | ||
return map.size(); | ||
} | ||
|
||
@Override | ||
public InternalArray keyArray() { | ||
return map.keyArray(); | ||
} | ||
|
||
@Override | ||
public InternalArray valueArray() { | ||
return castedValueArray; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.