Skip to content

Commit

Permalink
[flink] Fix compatibility for ManifestCommittableSerializer
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed May 27, 2024
1 parent 4c003fb commit 8c64fd2
Show file tree
Hide file tree
Showing 6 changed files with 325 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
Expand Down Expand Up @@ -93,6 +94,13 @@ public static void copyBytes(final InputStream in, final OutputStream out) throw
// Stream input skipping
// ------------------------------------------------------------------------

/** Reads all into a bytes. */
public static byte[] readFully(InputStream in, boolean close) throws IOException {
ByteArrayOutputStream output = new ByteArrayOutputStream();
copyBytes(in, output, BLOCKSIZE, close);
return output.toByteArray();
}

/**
* Reads len bytes in a loop.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.io;

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.stats.SimpleStatsConverter;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ObjectSerializer;

import java.util.ArrayList;
import java.util.List;

import static org.apache.paimon.utils.InternalRowUtils.fromStringArrayData;
import static org.apache.paimon.utils.InternalRowUtils.toStringArrayData;
import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
import static org.apache.paimon.utils.SerializationUtils.newBytesType;
import static org.apache.paimon.utils.SerializationUtils.newStringType;
import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;

/** Serializer for {@link DataFileMeta}. */
public class DataFileMeta08Serializer extends ObjectSerializer<DataFileMeta> {

private static final long serialVersionUID = 1L;

public DataFileMeta08Serializer() {
super(schemaFor08());
}

private static RowType schemaFor08() {
List<DataField> fields = new ArrayList<>();
fields.add(new DataField(0, "_FILE_NAME", newStringType(false)));
fields.add(new DataField(1, "_FILE_SIZE", new BigIntType(false)));
fields.add(new DataField(2, "_ROW_COUNT", new BigIntType(false)));
fields.add(new DataField(3, "_MIN_KEY", newBytesType(false)));
fields.add(new DataField(4, "_MAX_KEY", newBytesType(false)));
fields.add(new DataField(5, "_KEY_STATS", SimpleStatsConverter.schema()));
fields.add(new DataField(6, "_VALUE_STATS", SimpleStatsConverter.schema()));
fields.add(new DataField(7, "_MIN_SEQUENCE_NUMBER", new BigIntType(false)));
fields.add(new DataField(8, "_MAX_SEQUENCE_NUMBER", new BigIntType(false)));
fields.add(new DataField(9, "_SCHEMA_ID", new BigIntType(false)));
fields.add(new DataField(10, "_LEVEL", new IntType(false)));
fields.add(new DataField(11, "_EXTRA_FILES", new ArrayType(false, newStringType(false))));
fields.add(new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS()));
fields.add(new DataField(13, "_DELETE_ROW_COUNT", new BigIntType(true)));
fields.add(new DataField(14, "_EMBEDDED_FILE_INDEX", newBytesType(true)));
return new RowType(fields);
}

@Override
public InternalRow toRow(DataFileMeta meta) {
return GenericRow.of(
BinaryString.fromString(meta.fileName()),
meta.fileSize(),
meta.rowCount(),
serializeBinaryRow(meta.minKey()),
serializeBinaryRow(meta.maxKey()),
meta.keyStats().toRow(),
meta.valueStats().toRow(),
meta.minSequenceNumber(),
meta.maxSequenceNumber(),
meta.schemaId(),
meta.level(),
toStringArrayData(meta.extraFiles()),
meta.creationTime(),
meta.deleteRowCount().orElse(null),
meta.embeddedIndex());
}

@Override
public DataFileMeta fromRow(InternalRow row) {
return new DataFileMeta(
row.getString(0).toString(),
row.getLong(1),
row.getLong(2),
deserializeBinaryRow(row.getBinary(3)),
deserializeBinaryRow(row.getBinary(4)),
SimpleStats.fromRow(row.getRow(5, 3)),
SimpleStats.fromRow(row.getRow(6, 3)),
row.getLong(7),
row.getLong(8),
row.getLong(9),
row.getInt(10),
fromStringArrayData(row.getArray(11)),
row.getTimestamp(12, 3),
row.isNullAt(13) ? null : row.getLong(13),
row.isNullAt(14) ? null : row.getBinary(14),
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
/** {@link VersionedSerializer} for {@link ManifestCommittable}. */
public class ManifestCommittableSerializer implements VersionedSerializer<ManifestCommittable> {

private static final int CURRENT_VERSION = 2;
private static final int CURRENT_VERSION = 3;

private final CommitMessageSerializer commitMessageSerializer;

Expand Down Expand Up @@ -75,14 +75,13 @@ private void serializeOffsets(DataOutputViewStreamWrapper view, Map<Integer, Lon

@Override
public ManifestCommittable deserialize(int version, byte[] serialized) throws IOException {
if (version != CURRENT_VERSION) {
if (version > CURRENT_VERSION) {
throw new UnsupportedOperationException(
"Expecting ManifestCommittable version to be "
"Expecting ManifestCommittableSerializer version to be smaller or equal than "
+ CURRENT_VERSION
+ ", but found "
+ version
+ ".\nManifestCommittable is not a compatible data structure. "
+ "Please restart the job afresh (do not recover from savepoint).");
+ ".");
}

DataInputDeserializer view = new DataInputDeserializer(serialized);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,21 @@
import org.apache.paimon.data.serializer.VersionedSerializer;
import org.apache.paimon.index.IndexFileMetaSerializer;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMeta08Serializer;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.DataInputDeserializer;
import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.DataOutputView;
import org.apache.paimon.io.DataOutputViewStreamWrapper;
import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.utils.ObjectSerializer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
Expand All @@ -40,11 +44,13 @@
/** {@link VersionedSerializer} for {@link CommitMessage}. */
public class CommitMessageSerializer implements VersionedSerializer<CommitMessage> {

private static final int CURRENT_VERSION = 2;
private static final int CURRENT_VERSION = 3;

private final DataFileMetaSerializer dataFileSerializer;
private final IndexFileMetaSerializer indexEntrySerializer;

private DataFileMeta08Serializer dataFile08Serializer;

public CommitMessageSerializer() {
this.dataFileSerializer = new DataFileMetaSerializer();
this.indexEntrySerializer = new IndexFileMetaSerializer();
Expand Down Expand Up @@ -86,34 +92,36 @@ private void serialize(CommitMessage obj, DataOutputView view) throws IOExceptio

@Override
public CommitMessage deserialize(int version, byte[] serialized) throws IOException {
checkVersion(version);
DataInputDeserializer view = new DataInputDeserializer(serialized);
return deserialize(view);
return deserialize(version, view);
}

public List<CommitMessage> deserializeList(int version, DataInputView view) throws IOException {
checkVersion(version);
int length = view.readInt();
List<CommitMessage> list = new ArrayList<>(length);
for (int i = 0; i < length; i++) {
list.add(deserialize(view));
list.add(deserialize(version, view));
}
return list;
}

private void checkVersion(int version) {
if (version != CURRENT_VERSION) {
private CommitMessage deserialize(int version, DataInputView view) throws IOException {
ObjectSerializer<DataFileMeta> dataFileSerializer;
if (version == CURRENT_VERSION) {
dataFileSerializer = this.dataFileSerializer;
} else if (version <= 2) {
if (dataFile08Serializer == null) {
dataFile08Serializer = new DataFileMeta08Serializer();
}
dataFileSerializer = dataFile08Serializer;
} else {
throw new UnsupportedOperationException(
"Expecting FileCommittable version to be "
"Expecting CommitMessageSerializer version to be smaller or equal than "
+ CURRENT_VERSION
+ ", but found "
+ version
+ ".\nFileCommittable is not a compatible data structure. "
+ "Please restart the job afresh (do not recover from savepoint).");
+ ".");
}
}

private CommitMessage deserialize(DataInputView view) throws IOException {
return new CommitMessageImpl(
deserializeBinaryRow(view),
view.readInt(),
Expand All @@ -127,6 +135,8 @@ private CommitMessage deserialize(DataInputView view) throws IOException {
dataFileSerializer.deserializeList(view)),
new IndexIncrement(
indexEntrySerializer.deserializeList(view),
indexEntrySerializer.deserializeList(view)));
version <= 2
? Collections.emptyList()
: indexEntrySerializer.deserializeList(view)));
}
}
Loading

0 comments on commit 8c64fd2

Please sign in to comment.