Skip to content

Commit

Permalink
Integrate with stream read
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi committed Mar 29, 2024
1 parent 385ac73 commit 29c318f
Show file tree
Hide file tree
Showing 15 changed files with 323 additions and 229 deletions.
217 changes: 48 additions & 169 deletions paimon-core/src/main/java/org/apache/paimon/Changelog.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,15 @@

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.utils.JsonSerdeUtil;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Map;

/**
* The metadata of changelog. It generates from the snapshot file during expiration. So that the
Expand All @@ -42,200 +37,84 @@
* <li>The changelog file. Eg: from the changelog-producer = 'input'
* <li>The delta files in the APPEND commits when the changelog-producer = 'none'
*/
public class Changelog {
public class Changelog extends Snapshot {

private static final int CURRENT_VERSION = 1;

private static final String FIELD_VERSION = "version";
private static final String FIELD_ID = "id";
private static final String FIELD_SCHEMA_ID = "schemaId";
private static final String FIELD_DELTA_MANIFEST_LIST = "deltaManifestList";
private static final String FIELD_CHANGELOG_MANIFEST_LIST = "changelogManifestList";
private static final String FIELD_COMMIT_KIND = "commitKind";
private static final String FIELD_TIME_MILLIS = "timeMillis";
private static final String FIELD_RECORD_COUNT = "recordCount";
private static final String FIELD_WATERMARK = "watermark";

@JsonProperty(FIELD_VERSION)
private final int version;

@JsonProperty(FIELD_ID)
private final long id;

@JsonProperty(FIELD_SCHEMA_ID)
private final long schemaId;

@JsonProperty(FIELD_DELTA_MANIFEST_LIST)
private final String deltaManifestList;

@JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST)
@Nullable
private final String changelogManifestList;

@JsonProperty(FIELD_TIME_MILLIS)
private final long timeMillis;

@JsonProperty(FIELD_RECORD_COUNT)
@Nullable
private final Long recordCount;

@JsonProperty(FIELD_WATERMARK)
@Nullable
private final Long watermark;

@JsonProperty(FIELD_COMMIT_KIND)
private Snapshot.CommitKind commitKind;
private static final int CURRENT_VERSION = 3;

public Changelog(
long id,
long schemaId,
@Nullable String deltaManifestList,
String baseManifestList,
String deltaManifestList,
@Nullable String changelogManifestList,
Snapshot.CommitKind commitKind,
@Nullable String indexManifest,
String commitUser,
long commitIdentifier,
CommitKind commitKind,
long timeMillis,
Long recordCount,
@Nullable Long watermark) {
Map<Integer, Long> logOffsets,
@Nullable Long totalRecordCount,
@Nullable Long deltaRecordCount,
@Nullable Long changelogRecordCount,
@Nullable Long watermark,
@Nullable String statistics) {
this(
CURRENT_VERSION,
id,
schemaId,
baseManifestList,
deltaManifestList,
changelogManifestList,
indexManifest,
commitUser,
commitIdentifier,
commitKind,
timeMillis,
recordCount,
watermark);
logOffsets,
totalRecordCount,
deltaRecordCount,
changelogRecordCount,
watermark,
statistics);
}

@JsonCreator
public Changelog(
@JsonProperty(FIELD_VERSION) @Nullable Integer version,
@JsonProperty(FIELD_ID) long id,
@JsonProperty(FIELD_SCHEMA_ID) long schemaId,
@JsonProperty(FIELD_DELTA_MANIFEST_LIST) @Nullable String deltaManifestList,
@JsonProperty(FIELD_BASE_MANIFEST_LIST) String baseManifestList,
@JsonProperty(FIELD_DELTA_MANIFEST_LIST) String deltaManifestList,
@JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST) @Nullable String changelogManifestList,
@JsonProperty(FIELD_COMMIT_KIND) Snapshot.CommitKind commitKind,
@JsonProperty(FIELD_INDEX_MANIFEST) @Nullable String indexManifest,
@JsonProperty(FIELD_COMMIT_USER) String commitUser,
@JsonProperty(FIELD_COMMIT_IDENTIFIER) long commitIdentifier,
@JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind,
@JsonProperty(FIELD_TIME_MILLIS) long timeMillis,
@JsonProperty(FIELD_RECORD_COUNT) @Nullable Long recordCount,
@JsonProperty(FIELD_WATERMARK) @Nullable Long watermark) {
this.version = version;
this.id = id;
this.schemaId = schemaId;
this.deltaManifestList = deltaManifestList;
this.changelogManifestList = changelogManifestList;
this.recordCount = recordCount;
this.commitKind = commitKind;
this.timeMillis = timeMillis;
this.watermark = watermark;
}

@JsonGetter(FIELD_VERSION)
public int version() {
return version;
}

@JsonGetter(FIELD_ID)
public long id() {
return id;
}

@JsonGetter(FIELD_CHANGELOG_MANIFEST_LIST)
@Nullable
public String changelogManifestList() {
return changelogManifestList;
}

@JsonGetter(FIELD_DELTA_MANIFEST_LIST)
public String deltaManifestList() {
return deltaManifestList;
}

@JsonGetter(FIELD_SCHEMA_ID)
public long schemaId() {
return schemaId;
}

@JsonGetter(FIELD_COMMIT_KIND)
public Snapshot.CommitKind commitKind() {
return commitKind;
}

@JsonGetter(FIELD_TIME_MILLIS)
public long timeMillis() {
return timeMillis;
}

@JsonGetter(FIELD_RECORD_COUNT)
public Long recordCount() {
return recordCount;
}

@JsonGetter(FIELD_WATERMARK)
@Nullable
public Long watermark() {
return watermark;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof Changelog)) {
return false;
}
Changelog changelog = (Changelog) o;
return version == changelog.version
&& id == changelog.id
&& schemaId == changelog.schemaId
&& timeMillis == changelog.timeMillis
&& Objects.equals(deltaManifestList, changelog.deltaManifestList)
&& Objects.equals(changelogManifestList, changelog.changelogManifestList)
&& Objects.equals(recordCount, changelog.recordCount)
&& Objects.equals(watermark, changelog.watermark)
&& commitKind == changelog.commitKind;
}

@Override
public int hashCode() {
return Objects.hash(
@JsonProperty(FIELD_LOG_OFFSETS) Map<Integer, Long> logOffsets,
@JsonProperty(FIELD_TOTAL_RECORD_COUNT) @Nullable Long totalRecordCount,
@JsonProperty(FIELD_DELTA_RECORD_COUNT) @Nullable Long deltaRecordCount,
@JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable Long changelogRecordCount,
@JsonProperty(FIELD_WATERMARK) @Nullable Long watermark,
@JsonProperty(FIELD_STATISTICS) @Nullable String statistics) {
super(
version,
id,
schemaId,
baseManifestList,
deltaManifestList,
changelogManifestList,
indexManifest,
commitUser,
commitIdentifier,
commitKind,
timeMillis,
recordCount,
logOffsets,
totalRecordCount,
deltaRecordCount,
changelogRecordCount,
watermark,
commitKind);
}

/**
* Return a {@link ManifestFileMeta} for each delta manifest in this changelog.
*
* @param manifestList a {@link ManifestList} instance used for reading files at snapshot.
* @return a list of ManifestFileMeta.
*/
public List<ManifestFileMeta> deltaManifests(ManifestList manifestList) {
return deltaManifestList == null
? Collections.emptyList()
: manifestList.read(deltaManifestList);
}

/**
* Return a {@link ManifestFileMeta} for each changelog manifest in this changelog.
*
* @param manifestList a {@link ManifestList} instance used for reading files at snapshot.
* @return a list of ManifestFileMeta.
*/
public List<ManifestFileMeta> changelogManifests(ManifestList manifestList) {
return changelogManifestList == null
? Collections.emptyList()
: manifestList.read(changelogManifestList);
}

public String toJson() {
return JsonSerdeUtil.toJson(this);
statistics);
}

public static Changelog fromJson(String json) {
Expand Down
34 changes: 17 additions & 17 deletions paimon-core/src/main/java/org/apache/paimon/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,23 +71,23 @@ public class Snapshot {
public static final int TABLE_STORE_02_VERSION = 1;
private static final int CURRENT_VERSION = 3;

private static final String FIELD_VERSION = "version";
private static final String FIELD_ID = "id";
private static final String FIELD_SCHEMA_ID = "schemaId";
private static final String FIELD_BASE_MANIFEST_LIST = "baseManifestList";
private static final String FIELD_DELTA_MANIFEST_LIST = "deltaManifestList";
private static final String FIELD_CHANGELOG_MANIFEST_LIST = "changelogManifestList";
private static final String FIELD_INDEX_MANIFEST = "indexManifest";
private static final String FIELD_COMMIT_USER = "commitUser";
private static final String FIELD_COMMIT_IDENTIFIER = "commitIdentifier";
private static final String FIELD_COMMIT_KIND = "commitKind";
private static final String FIELD_TIME_MILLIS = "timeMillis";
private static final String FIELD_LOG_OFFSETS = "logOffsets";
private static final String FIELD_TOTAL_RECORD_COUNT = "totalRecordCount";
private static final String FIELD_DELTA_RECORD_COUNT = "deltaRecordCount";
private static final String FIELD_CHANGELOG_RECORD_COUNT = "changelogRecordCount";
private static final String FIELD_WATERMARK = "watermark";
private static final String FIELD_STATISTICS = "statistics";
protected static final String FIELD_VERSION = "version";
protected static final String FIELD_ID = "id";
protected static final String FIELD_SCHEMA_ID = "schemaId";
protected static final String FIELD_BASE_MANIFEST_LIST = "baseManifestList";
protected static final String FIELD_DELTA_MANIFEST_LIST = "deltaManifestList";
protected static final String FIELD_CHANGELOG_MANIFEST_LIST = "changelogManifestList";
protected static final String FIELD_INDEX_MANIFEST = "indexManifest";
protected static final String FIELD_COMMIT_USER = "commitUser";
protected static final String FIELD_COMMIT_IDENTIFIER = "commitIdentifier";
protected static final String FIELD_COMMIT_KIND = "commitKind";
protected static final String FIELD_TIME_MILLIS = "timeMillis";
protected static final String FIELD_LOG_OFFSETS = "logOffsets";
protected static final String FIELD_TOTAL_RECORD_COUNT = "totalRecordCount";
protected static final String FIELD_DELTA_RECORD_COUNT = "deltaRecordCount";
protected static final String FIELD_CHANGELOG_RECORD_COUNT = "changelogRecordCount";
protected static final String FIELD_WATERMARK = "watermark";
protected static final String FIELD_STATISTICS = "statistics";

// version of snapshot
// null for paimon <= 0.2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,11 +230,19 @@ public int expireUntil(long earliestId, long endExclusiveId) {
id,
snapshot.schemaId(),
null,
null,
snapshot.changelogManifestList(),
null,
snapshot.commitUser(),
snapshot.commitIdentifier(),
snapshot.commitKind(),
snapshot.timeMillis(),
snapshot.logOffsets(),
snapshot.totalRecordCount(),
null,
snapshot.changelogRecordCount(),
snapshot.watermark());
snapshot.watermark(),
null);
snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet, false);
} else {
// no changelog
Expand All @@ -244,10 +252,18 @@ public int expireUntil(long earliestId, long endExclusiveId) {
snapshot.schemaId(),
null,
null,
null,
snapshot.indexManifest(),
snapshot.commitUser(),
snapshot.commitIdentifier(),
snapshot.commitKind(),
snapshot.timeMillis(),
0L,
snapshot.watermark());
snapshot.logOffsets(),
snapshot.changelogRecordCount(),
snapshot.totalRecordCount(),
snapshot.changelogRecordCount(),
snapshot.watermark(),
snapshot.statistics());
snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet, true);
}
try {
Expand Down
Loading

0 comments on commit 29c318f

Please sign in to comment.