builder) {
setIf(removedDeleteFiles > 0, builder, REMOVED_DELETE_FILES_PROP, removedDeleteFiles);
setIf(addedRecords > 0, builder, ADDED_RECORDS_PROP, addedRecords);
setIf(deletedRecords > 0, builder, DELETED_RECORDS_PROP, deletedRecords);
+ setIf(totalDataManifestFiles > 0, builder, TOTAL_DATA_MANIFEST_FILES, totalDataManifestFiles);
+ setIf(
+ totalDeleteManifestFiles > 0,
+ builder,
+ TOTAL_DELETE_MANIFEST_FILES,
+ totalDeleteManifestFiles);
if (trustSizeAndDeleteCounts) {
setIf(addedSize > 0, builder, ADDED_FILE_SIZE_PROP, addedSize);
@@ -336,6 +352,16 @@ void addedManifest(ManifestFile manifest) {
}
}
+ void addedManifestStats(ManifestFile manifest) {
+ switch (manifest.content()) {
+ case DATA:
+ this.totalDataManifestFiles++;
+ break;
+ case DELETES:
+ this.totalDeleteManifestFiles++;
+ }
+ }
+
void merge(UpdateMetrics other) {
this.addedFiles += other.addedFiles;
this.removedFiles += other.removedFiles;
@@ -353,6 +379,8 @@ void merge(UpdateMetrics other) {
this.removedPosDeletes += other.removedPosDeletes;
this.addedEqDeletes += other.addedEqDeletes;
this.removedEqDeletes += other.removedEqDeletes;
+ this.totalDataManifestFiles += other.totalDataManifestFiles;
+ this.totalDeleteManifestFiles += other.totalDeleteManifestFiles;
this.trustSizeAndDeleteCounts = trustSizeAndDeleteCounts && other.trustSizeAndDeleteCounts;
}
}
diff --git a/core/src/main/java/org/apache/iceberg/StaticDataTask.java b/core/src/main/java/org/apache/iceberg/StaticDataTask.java
index f25ebd49c9d8..1a396f0bfc7e 100644
--- a/core/src/main/java/org/apache/iceberg/StaticDataTask.java
+++ b/core/src/main/java/org/apache/iceberg/StaticDataTask.java
@@ -127,7 +127,11 @@ DataFile metadataFile() {
return metadataFile;
}
- /** @return the table rows before projection */
+ /**
+ * Returns the table rows before projection.
+ *
+ * @return the table rows before projection
+ */
StructLike[] tableRows() {
return rows;
}
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 74b8ad0bbddc..923db6bbd68f 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -51,7 +51,7 @@ public class TableMetadata implements Serializable {
static final long INITIAL_SEQUENCE_NUMBER = 0;
static final long INVALID_SEQUENCE_NUMBER = -1;
static final int DEFAULT_TABLE_FORMAT_VERSION = 2;
- static final int SUPPORTED_TABLE_FORMAT_VERSION = 2;
+ static final int SUPPORTED_TABLE_FORMAT_VERSION = 3;
static final int INITIAL_SPEC_ID = 0;
static final int INITIAL_SORT_ORDER_ID = 1;
static final int INITIAL_SCHEMA_ID = 0;
@@ -564,6 +564,10 @@ public TableMetadata updatePartitionSpec(PartitionSpec newPartitionSpec) {
return new Builder(this).setDefaultPartitionSpec(newPartitionSpec).build();
}
+ public TableMetadata addPartitionSpec(PartitionSpec newPartitionSpec) {
+ return new Builder(this).addPartitionSpec(newPartitionSpec).build();
+ }
+
public TableMetadata replaceSortOrder(SortOrder newOrder) {
return new Builder(this).setDefaultSortOrder(newOrder).build();
}
diff --git a/core/src/main/java/org/apache/iceberg/V3Metadata.java b/core/src/main/java/org/apache/iceberg/V3Metadata.java
new file mode 100644
index 000000000000..94e20ea99858
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/V3Metadata.java
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+
+class V3Metadata {
+ private V3Metadata() {}
+
+ static final Schema MANIFEST_LIST_SCHEMA =
+ new Schema(
+ ManifestFile.PATH,
+ ManifestFile.LENGTH,
+ ManifestFile.SPEC_ID,
+ ManifestFile.MANIFEST_CONTENT.asRequired(),
+ ManifestFile.SEQUENCE_NUMBER.asRequired(),
+ ManifestFile.MIN_SEQUENCE_NUMBER.asRequired(),
+ ManifestFile.SNAPSHOT_ID.asRequired(),
+ ManifestFile.ADDED_FILES_COUNT.asRequired(),
+ ManifestFile.EXISTING_FILES_COUNT.asRequired(),
+ ManifestFile.DELETED_FILES_COUNT.asRequired(),
+ ManifestFile.ADDED_ROWS_COUNT.asRequired(),
+ ManifestFile.EXISTING_ROWS_COUNT.asRequired(),
+ ManifestFile.DELETED_ROWS_COUNT.asRequired(),
+ ManifestFile.PARTITION_SUMMARIES,
+ ManifestFile.KEY_METADATA);
+
+ /**
+ * A wrapper class to write any ManifestFile implementation to Avro using the v3 write schema.
+ *
+ * This is used to maintain compatibility with v3 by writing manifest list files with the old
+ * schema, instead of writing a sequence number into metadata files in v3 tables.
+ */
+ static class IndexedManifestFile implements ManifestFile, IndexedRecord {
+ private static final org.apache.avro.Schema AVRO_SCHEMA =
+ AvroSchemaUtil.convert(MANIFEST_LIST_SCHEMA, "manifest_file");
+
+ private final long commitSnapshotId;
+ private final long sequenceNumber;
+ private ManifestFile wrapped = null;
+
+ IndexedManifestFile(long commitSnapshotId, long sequenceNumber) {
+ this.commitSnapshotId = commitSnapshotId;
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ public ManifestFile wrap(ManifestFile file) {
+ this.wrapped = file;
+ return this;
+ }
+
+ @Override
+ public org.apache.avro.Schema getSchema() {
+ return AVRO_SCHEMA;
+ }
+
+ @Override
+ public void put(int i, Object v) {
+ throw new UnsupportedOperationException("Cannot modify IndexedManifestFile wrapper via put");
+ }
+
+ @Override
+ public Object get(int pos) {
+ switch (pos) {
+ case 0:
+ return wrapped.path();
+ case 1:
+ return wrapped.length();
+ case 2:
+ return wrapped.partitionSpecId();
+ case 3:
+ return wrapped.content().id();
+ case 4:
+ if (wrapped.sequenceNumber() == ManifestWriter.UNASSIGNED_SEQ) {
+ // if the sequence number is being assigned here, then the manifest must be created by
+ // the current
+ // operation. to validate this, check that the snapshot id matches the current commit
+ Preconditions.checkState(
+ commitSnapshotId == wrapped.snapshotId(),
+ "Found unassigned sequence number for a manifest from snapshot: %s",
+ wrapped.snapshotId());
+ return sequenceNumber;
+ } else {
+ return wrapped.sequenceNumber();
+ }
+ case 5:
+ if (wrapped.minSequenceNumber() == ManifestWriter.UNASSIGNED_SEQ) {
+ // same sanity check as above
+ Preconditions.checkState(
+ commitSnapshotId == wrapped.snapshotId(),
+ "Found unassigned sequence number for a manifest from snapshot: %s",
+ wrapped.snapshotId());
+ // if the min sequence number is not determined, then there was no assigned sequence
+ // number for any file
+ // written to the wrapped manifest. replace the unassigned sequence number with the one
+ // for this commit
+ return sequenceNumber;
+ } else {
+ return wrapped.minSequenceNumber();
+ }
+ case 6:
+ return wrapped.snapshotId();
+ case 7:
+ return wrapped.addedFilesCount();
+ case 8:
+ return wrapped.existingFilesCount();
+ case 9:
+ return wrapped.deletedFilesCount();
+ case 10:
+ return wrapped.addedRowsCount();
+ case 11:
+ return wrapped.existingRowsCount();
+ case 12:
+ return wrapped.deletedRowsCount();
+ case 13:
+ return wrapped.partitions();
+ case 14:
+ return wrapped.keyMetadata();
+ default:
+ throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
+ }
+ }
+
+ @Override
+ public String path() {
+ return wrapped.path();
+ }
+
+ @Override
+ public long length() {
+ return wrapped.length();
+ }
+
+ @Override
+ public int partitionSpecId() {
+ return wrapped.partitionSpecId();
+ }
+
+ @Override
+ public ManifestContent content() {
+ return wrapped.content();
+ }
+
+ @Override
+ public long sequenceNumber() {
+ return wrapped.sequenceNumber();
+ }
+
+ @Override
+ public long minSequenceNumber() {
+ return wrapped.minSequenceNumber();
+ }
+
+ @Override
+ public Long snapshotId() {
+ return wrapped.snapshotId();
+ }
+
+ @Override
+ public boolean hasAddedFiles() {
+ return wrapped.hasAddedFiles();
+ }
+
+ @Override
+ public Integer addedFilesCount() {
+ return wrapped.addedFilesCount();
+ }
+
+ @Override
+ public Long addedRowsCount() {
+ return wrapped.addedRowsCount();
+ }
+
+ @Override
+ public boolean hasExistingFiles() {
+ return wrapped.hasExistingFiles();
+ }
+
+ @Override
+ public Integer existingFilesCount() {
+ return wrapped.existingFilesCount();
+ }
+
+ @Override
+ public Long existingRowsCount() {
+ return wrapped.existingRowsCount();
+ }
+
+ @Override
+ public boolean hasDeletedFiles() {
+ return wrapped.hasDeletedFiles();
+ }
+
+ @Override
+ public Integer deletedFilesCount() {
+ return wrapped.deletedFilesCount();
+ }
+
+ @Override
+ public Long deletedRowsCount() {
+ return wrapped.deletedRowsCount();
+ }
+
+ @Override
+ public List partitions() {
+ return wrapped.partitions();
+ }
+
+ @Override
+ public ByteBuffer keyMetadata() {
+ return wrapped.keyMetadata();
+ }
+
+ @Override
+ public ManifestFile copy() {
+ return wrapped.copy();
+ }
+ }
+
+ static Schema entrySchema(Types.StructType partitionType) {
+ return wrapFileSchema(fileType(partitionType));
+ }
+
+ static Schema wrapFileSchema(Types.StructType fileSchema) {
+ // this is used to build projection schemas
+ return new Schema(
+ ManifestEntry.STATUS,
+ ManifestEntry.SNAPSHOT_ID,
+ ManifestEntry.SEQUENCE_NUMBER,
+ ManifestEntry.FILE_SEQUENCE_NUMBER,
+ required(ManifestEntry.DATA_FILE_ID, "data_file", fileSchema));
+ }
+
+ static Types.StructType fileType(Types.StructType partitionType) {
+ return Types.StructType.of(
+ DataFile.CONTENT.asRequired(),
+ DataFile.FILE_PATH,
+ DataFile.FILE_FORMAT,
+ required(
+ DataFile.PARTITION_ID, DataFile.PARTITION_NAME, partitionType, DataFile.PARTITION_DOC),
+ DataFile.RECORD_COUNT,
+ DataFile.FILE_SIZE,
+ DataFile.COLUMN_SIZES,
+ DataFile.VALUE_COUNTS,
+ DataFile.NULL_VALUE_COUNTS,
+ DataFile.NAN_VALUE_COUNTS,
+ DataFile.LOWER_BOUNDS,
+ DataFile.UPPER_BOUNDS,
+ DataFile.KEY_METADATA,
+ DataFile.SPLIT_OFFSETS,
+ DataFile.EQUALITY_IDS,
+ DataFile.SORT_ORDER_ID);
+ }
+
+ static class IndexedManifestEntry>
+ implements ManifestEntry, IndexedRecord {
+ private final org.apache.avro.Schema avroSchema;
+ private final Long commitSnapshotId;
+ private final IndexedDataFile> fileWrapper;
+ private ManifestEntry wrapped = null;
+
+ IndexedManifestEntry(Long commitSnapshotId, Types.StructType partitionType) {
+ this.avroSchema = AvroSchemaUtil.convert(entrySchema(partitionType), "manifest_entry");
+ this.commitSnapshotId = commitSnapshotId;
+ this.fileWrapper = new IndexedDataFile<>(partitionType);
+ }
+
+ public IndexedManifestEntry wrap(ManifestEntry entry) {
+ this.wrapped = entry;
+ return this;
+ }
+
+ @Override
+ public org.apache.avro.Schema getSchema() {
+ return avroSchema;
+ }
+
+ @Override
+ public void put(int i, Object v) {
+ throw new UnsupportedOperationException("Cannot modify IndexedManifestEntry wrapper via put");
+ }
+
+ @Override
+ public Object get(int i) {
+ switch (i) {
+ case 0:
+ return wrapped.status().id();
+ case 1:
+ return wrapped.snapshotId();
+ case 2:
+ if (wrapped.dataSequenceNumber() == null) {
+ // if the entry's data sequence number is null,
+ // then it will inherit the sequence number of the current commit.
+ // to validate that this is correct, check that the snapshot id is either null (will
+ // also be inherited) or that it matches the id of the current commit.
+ Preconditions.checkState(
+ wrapped.snapshotId() == null || wrapped.snapshotId().equals(commitSnapshotId),
+ "Found unassigned sequence number for an entry from snapshot: %s",
+ wrapped.snapshotId());
+
+ // inheritance should work only for ADDED entries
+ Preconditions.checkState(
+ wrapped.status() == Status.ADDED,
+ "Only entries with status ADDED can have null sequence number");
+
+ return null;
+ }
+ return wrapped.dataSequenceNumber();
+ case 3:
+ return wrapped.fileSequenceNumber();
+ case 4:
+ return fileWrapper.wrap(wrapped.file());
+ default:
+ throw new UnsupportedOperationException("Unknown field ordinal: " + i);
+ }
+ }
+
+ @Override
+ public Status status() {
+ return wrapped.status();
+ }
+
+ @Override
+ public Long snapshotId() {
+ return wrapped.snapshotId();
+ }
+
+ @Override
+ public void setSnapshotId(long snapshotId) {
+ wrapped.setSnapshotId(snapshotId);
+ }
+
+ @Override
+ public Long dataSequenceNumber() {
+ return wrapped.dataSequenceNumber();
+ }
+
+ @Override
+ public void setDataSequenceNumber(long dataSequenceNumber) {
+ wrapped.setDataSequenceNumber(dataSequenceNumber);
+ }
+
+ @Override
+ public Long fileSequenceNumber() {
+ return wrapped.fileSequenceNumber();
+ }
+
+ @Override
+ public void setFileSequenceNumber(long fileSequenceNumber) {
+ wrapped.setFileSequenceNumber(fileSequenceNumber);
+ }
+
+ @Override
+ public F file() {
+ return wrapped.file();
+ }
+
+ @Override
+ public ManifestEntry copy() {
+ return wrapped.copy();
+ }
+
+ @Override
+ public ManifestEntry copyWithoutStats() {
+ return wrapped.copyWithoutStats();
+ }
+ }
+
+ /** Wrapper used to write DataFile or DeleteFile to v3 metadata. */
+ static class IndexedDataFile implements ContentFile, IndexedRecord {
+ private final org.apache.avro.Schema avroSchema;
+ private final IndexedStructLike partitionWrapper;
+ private ContentFile wrapped = null;
+
+ IndexedDataFile(Types.StructType partitionType) {
+ this.avroSchema = AvroSchemaUtil.convert(fileType(partitionType), "data_file");
+ this.partitionWrapper = new IndexedStructLike(avroSchema.getField("partition").schema());
+ }
+
+ @SuppressWarnings("unchecked")
+ IndexedDataFile wrap(ContentFile> file) {
+ this.wrapped = (ContentFile) file;
+ return this;
+ }
+
+ @Override
+ public org.apache.avro.Schema getSchema() {
+ return avroSchema;
+ }
+
+ @Override
+ public Object get(int pos) {
+ switch (pos) {
+ case 0:
+ return wrapped.content().id();
+ case 1:
+ return wrapped.path().toString();
+ case 2:
+ return wrapped.format() != null ? wrapped.format().toString() : null;
+ case 3:
+ return partitionWrapper.wrap(wrapped.partition());
+ case 4:
+ return wrapped.recordCount();
+ case 5:
+ return wrapped.fileSizeInBytes();
+ case 6:
+ return wrapped.columnSizes();
+ case 7:
+ return wrapped.valueCounts();
+ case 8:
+ return wrapped.nullValueCounts();
+ case 9:
+ return wrapped.nanValueCounts();
+ case 10:
+ return wrapped.lowerBounds();
+ case 11:
+ return wrapped.upperBounds();
+ case 12:
+ return wrapped.keyMetadata();
+ case 13:
+ return wrapped.splitOffsets();
+ case 14:
+ return wrapped.equalityFieldIds();
+ case 15:
+ return wrapped.sortOrderId();
+ }
+ throw new IllegalArgumentException("Unknown field ordinal: " + pos);
+ }
+
+ @Override
+ public void put(int i, Object v) {
+ throw new UnsupportedOperationException("Cannot modify IndexedDataFile wrapper via put");
+ }
+
+ @Override
+ public Long pos() {
+ return null;
+ }
+
+ @Override
+ public int specId() {
+ return wrapped.specId();
+ }
+
+ @Override
+ public FileContent content() {
+ return wrapped.content();
+ }
+
+ @Override
+ public CharSequence path() {
+ return wrapped.path();
+ }
+
+ @Override
+ public FileFormat format() {
+ return wrapped.format();
+ }
+
+ @Override
+ public StructLike partition() {
+ return wrapped.partition();
+ }
+
+ @Override
+ public long recordCount() {
+ return wrapped.recordCount();
+ }
+
+ @Override
+ public long fileSizeInBytes() {
+ return wrapped.fileSizeInBytes();
+ }
+
+ @Override
+ public Map columnSizes() {
+ return wrapped.columnSizes();
+ }
+
+ @Override
+ public Map valueCounts() {
+ return wrapped.valueCounts();
+ }
+
+ @Override
+ public Map nullValueCounts() {
+ return wrapped.nullValueCounts();
+ }
+
+ @Override
+ public Map nanValueCounts() {
+ return wrapped.nanValueCounts();
+ }
+
+ @Override
+ public Map lowerBounds() {
+ return wrapped.lowerBounds();
+ }
+
+ @Override
+ public Map upperBounds() {
+ return wrapped.upperBounds();
+ }
+
+ @Override
+ public ByteBuffer keyMetadata() {
+ return wrapped.keyMetadata();
+ }
+
+ @Override
+ public List splitOffsets() {
+ return wrapped.splitOffsets();
+ }
+
+ @Override
+ public List equalityFieldIds() {
+ return wrapped.equalityFieldIds();
+ }
+
+ @Override
+ public Integer sortOrderId() {
+ return wrapped.sortOrderId();
+ }
+
+ @Override
+ public Long dataSequenceNumber() {
+ return wrapped.dataSequenceNumber();
+ }
+
+ @Override
+ public Long fileSequenceNumber() {
+ return wrapped.fileSequenceNumber();
+ }
+
+ @Override
+ public F copy() {
+ throw new UnsupportedOperationException("Cannot copy IndexedDataFile wrapper");
+ }
+
+ @Override
+ public F copyWithStats(Set requestedColumnIds) {
+ throw new UnsupportedOperationException("Cannot copy IndexedDataFile wrapper");
+ }
+
+ @Override
+ public F copyWithoutStats() {
+ throw new UnsupportedOperationException("Cannot copy IndexedDataFile wrapper");
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java b/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
index fb3c27220cb2..cea7003c1a38 100644
--- a/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
+++ b/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
@@ -229,7 +229,8 @@ protected long numOutputFiles(long inputSize) {
// the remainder file is of a valid size for this rewrite so keep it
return numFilesWithRemainder;
- } else if (avgFileSizeWithoutRemainder < Math.min(1.1 * targetFileSize, writeMaxFileSize())) {
+ } else if (avgFileSizeWithoutRemainder
+ < Math.min(1.1 * targetFileSize, (double) writeMaxFileSize())) {
// if the reminder is distributed amongst other files,
// the average file size will be no more than 10% bigger than the target file size
// so round down and distribute remainder amongst other files
diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
index ff20ba53ff70..cef57cd16726 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
@@ -36,6 +36,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.CharSequenceMap;
import org.apache.iceberg.util.Filter;
@@ -398,33 +399,9 @@ private static class DataFileFilter extends Filter {
@Override
protected boolean shouldKeep(T posDelete) {
- return charSeqEquals(dataLocation, (CharSequence) FILENAME_ACCESSOR.get(posDelete));
- }
-
- private boolean charSeqEquals(CharSequence s1, CharSequence s2) {
- if (s1 == s2) {
- return true;
- }
-
- int count = s1.length();
- if (count != s2.length()) {
- return false;
- }
-
- if (s1 instanceof String && s2 instanceof String && s1.hashCode() != s2.hashCode()) {
- return false;
- }
-
- // File paths inside a delete file normally have more identical chars at the beginning. For
- // example, a typical
- // path is like "s3:/bucket/db/table/data/partition/00000-0-[uuid]-00001.parquet".
- // The uuid is where the difference starts. So it's faster to find the first diff backward.
- for (int i = count - 1; i >= 0; i--) {
- if (s1.charAt(i) != s2.charAt(i)) {
- return false;
- }
- }
- return true;
+ return Comparators.filePath()
+ .compare(dataLocation, (CharSequence) FILENAME_ACCESSOR.get(posDelete))
+ == 0;
}
}
}
diff --git a/core/src/main/java/org/apache/iceberg/io/ContentCache.java b/core/src/main/java/org/apache/iceberg/io/ContentCache.java
index 7942c69d5d77..ce37cfb08934 100644
--- a/core/src/main/java/org/apache/iceberg/io/ContentCache.java
+++ b/core/src/main/java/org/apache/iceberg/io/ContentCache.java
@@ -28,7 +28,6 @@
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
-import java.util.function.Function;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
@@ -111,24 +110,6 @@ public CacheStats stats() {
return cache.stats();
}
- /** @deprecated will be removed in 1.7; use {@link #tryCache(InputFile)} instead */
- @Deprecated
- public CacheEntry get(String key, Function mappingFunction) {
- return cache.get(key, mappingFunction);
- }
-
- /** @deprecated will be removed in 1.7; use {@link #tryCache(InputFile)} instead */
- @Deprecated
- public CacheEntry getIfPresent(String location) {
- return cache.getIfPresent(location);
- }
-
- /** @deprecated will be removed in 1.7; use {@link #tryCache(InputFile)} instead */
- @Deprecated
- public InputFile tryCache(FileIO io, String location, long length) {
- return tryCache(io.newInputFile(location, length));
- }
-
/**
* Try cache the file-content of file in the given location upon stream reading.
*
@@ -173,11 +154,7 @@ public String toString() {
.toString();
}
- /** @deprecated will be removed in 1.7; use {@link FileContent} instead. */
- @Deprecated
- private static class CacheEntry {}
-
- private static class FileContent extends CacheEntry {
+ private static class FileContent {
private final long length;
private final List buffers;
diff --git a/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResult.java b/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResult.java
index ad66e8d32408..e5e4972603dc 100644
--- a/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResult.java
+++ b/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResult.java
@@ -50,6 +50,8 @@ public interface CommitMetricsResult {
String ADDED_EQ_DELETES = "added-equality-deletes";
String REMOVED_EQ_DELETES = "removed-equality-deletes";
String TOTAL_EQ_DELETES = "total-equality-deletes";
+ String TOTAL_DATA_MANIFEST_FILES = "total-data-manifest-files";
+ String TOTAL_DELETE_MANIFEST_FILES = "total-delete-manifest-files";
@Nullable
TimerResult totalDuration();
@@ -123,6 +125,12 @@ public interface CommitMetricsResult {
@Nullable
CounterResult totalEqualityDeletes();
+ @Nullable
+ CounterResult totalDataManifestFiles();
+
+ @Nullable
+ CounterResult totalDeleteManifestFiles();
+
static CommitMetricsResult from(
CommitMetrics commitMetrics, Map snapshotSummary) {
Preconditions.checkArgument(null != commitMetrics, "Invalid commit metrics: null");
@@ -163,6 +171,10 @@ static CommitMetricsResult from(
.removedEqualityDeletes(
counterFrom(snapshotSummary, SnapshotSummary.REMOVED_EQ_DELETES_PROP))
.totalEqualityDeletes(counterFrom(snapshotSummary, SnapshotSummary.TOTAL_EQ_DELETES_PROP))
+ .totalDataManifestFiles(
+ counterFrom(snapshotSummary, SnapshotSummary.TOTAL_DATA_MANIFEST_FILES))
+ .totalDeleteManifestFiles(
+ counterFrom(snapshotSummary, SnapshotSummary.TOTAL_DELETE_MANIFEST_FILES))
.build();
}
diff --git a/core/src/main/java/org/apache/iceberg/rest/ExponentialHttpRequestRetryStrategy.java b/core/src/main/java/org/apache/iceberg/rest/ExponentialHttpRequestRetryStrategy.java
index aadb97bc7112..263b3c305af0 100644
--- a/core/src/main/java/org/apache/iceberg/rest/ExponentialHttpRequestRetryStrategy.java
+++ b/core/src/main/java/org/apache/iceberg/rest/ExponentialHttpRequestRetryStrategy.java
@@ -149,7 +149,7 @@ public TimeValue getRetryInterval(HttpResponse response, int execCount, HttpCont
}
}
- int delayMillis = 1000 * (int) Math.min(Math.pow(2.0, (long) execCount - 1), 64.0);
+ int delayMillis = 1000 * (int) Math.min(Math.pow(2.0, (long) execCount - 1.0), 64.0);
int jitter = ThreadLocalRandom.current().nextInt(Math.max(1, (int) (delayMillis * 0.1)));
return TimeValue.ofMilliseconds(delayMillis + jitter);
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index a72d3958c140..1c607e3b0220 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -547,7 +547,7 @@ public void createNamespace(
public List listNamespaces(SessionContext context, Namespace namespace) {
Map queryParams = Maps.newHashMap();
if (!namespace.isEmpty()) {
- queryParams.put("parent", RESTUtil.NAMESPACE_JOINER.join(namespace.levels()));
+ queryParams.put("parent", RESTUtil.encodeNamespace(namespace));
}
ImmutableList.Builder namespaces = ImmutableList.builder();
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTUtil.java b/core/src/main/java/org/apache/iceberg/rest/RESTUtil.java
index fab01162cad7..45422b8ae8b5 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTUtil.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTUtil.java
@@ -33,14 +33,24 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
public class RESTUtil {
- private static final char NAMESPACE_SEPARATOR = '\u001f';
- public static final Joiner NAMESPACE_JOINER = Joiner.on(NAMESPACE_SEPARATOR);
- public static final Splitter NAMESPACE_SPLITTER = Splitter.on(NAMESPACE_SEPARATOR);
private static final String NAMESPACE_ESCAPED_SEPARATOR = "%1F";
private static final Joiner NAMESPACE_ESCAPED_JOINER = Joiner.on(NAMESPACE_ESCAPED_SEPARATOR);
private static final Splitter NAMESPACE_ESCAPED_SPLITTER =
Splitter.on(NAMESPACE_ESCAPED_SEPARATOR);
+ /**
+ * @deprecated since 1.7.0, will be made private in 1.8.0; use {@link
+ * RESTUtil#encodeNamespace(Namespace)} instead.
+ */
+ @Deprecated public static final Joiner NAMESPACE_JOINER = Joiner.on(NAMESPACE_ESCAPED_SEPARATOR);
+
+ /**
+ * @deprecated since 1.7.0, will be made private in 1.8.0; use {@link
+ * RESTUtil#decodeNamespace(String)} instead.
+ */
+ @Deprecated
+ public static final Splitter NAMESPACE_SPLITTER = Splitter.on(NAMESPACE_ESCAPED_SEPARATOR);
+
private RESTUtil() {}
public static String stripTrailingSlash(String path) {
diff --git a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java
index 189e5fde2cad..52c89af9d474 100644
--- a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java
+++ b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java
@@ -465,26 +465,6 @@ public AuthSession(Map baseHeaders, AuthConfig config) {
this.config = config;
}
- /** @deprecated since 1.6.0, will be removed in 1.7.0 */
- @Deprecated
- public AuthSession(
- Map baseHeaders,
- String token,
- String tokenType,
- String credential,
- String scope,
- String oauth2ServerUri) {
- this(
- baseHeaders,
- AuthConfig.builder()
- .token(token)
- .tokenType(tokenType)
- .credential(credential)
- .scope(scope)
- .oauth2ServerUri(oauth2ServerUri)
- .build());
- }
-
public Map headers() {
return headers;
}
diff --git a/core/src/main/java/org/apache/iceberg/util/Pair.java b/core/src/main/java/org/apache/iceberg/util/Pair.java
index bd3a934f6f04..e36321c8e2c9 100644
--- a/core/src/main/java/org/apache/iceberg/util/Pair.java
+++ b/core/src/main/java/org/apache/iceberg/util/Pair.java
@@ -58,11 +58,6 @@ public Schema load(Pair, Class>> key) {
private X first;
private Y second;
- /** Constructor used by Avro */
- private Pair(Schema schema) {
- this.schema = schema;
- }
-
private Pair(X first, Y second) {
this.first = first;
this.second = second;
diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
index 6486bd7fd483..27cd96a39733 100644
--- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
+++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
@@ -35,6 +35,7 @@
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.io.Closer;
@@ -77,7 +78,8 @@ public CloseableIterator iterator() {
return iter;
}
- private static class ParallelIterator implements CloseableIterator {
+ @VisibleForTesting
+ static class ParallelIterator implements CloseableIterator {
private final Iterator> tasks;
private final Deque> yieldedTasks = new ArrayDeque<>();
private final ExecutorService workerPool;
@@ -99,6 +101,7 @@ private ParallelIterator(
}
@Override
+ @SuppressWarnings("FutureReturnValueIgnored")
public void close() {
// close first, avoid new task submit
this.closed.set(true);
@@ -229,6 +232,11 @@ public synchronized T next() {
}
return queue.poll();
}
+
+ @VisibleForTesting
+ int queueSize() {
+ return queue.size();
+ }
}
private static class Task implements Supplier>>, Closeable {
diff --git a/core/src/main/java/org/apache/iceberg/util/Tasks.java b/core/src/main/java/org/apache/iceberg/util/Tasks.java
index 02d2b834311f..14804e040755 100644
--- a/core/src/main/java/org/apache/iceberg/util/Tasks.java
+++ b/core/src/main/java/org/apache/iceberg/util/Tasks.java
@@ -450,7 +450,9 @@ private void runTaskWithRetry(Task task, I item) thr
}
int delayMs =
- (int) Math.min(minSleepTimeMs * Math.pow(scaleFactor, attempt - 1), maxSleepTimeMs);
+ (int)
+ Math.min(
+ minSleepTimeMs * Math.pow(scaleFactor, attempt - 1), (double) maxSleepTimeMs);
int jitter = ThreadLocalRandom.current().nextInt(Math.max(1, (int) (delayMs * 0.1)));
LOG.warn("Retrying task after failure: {}", e.getMessage(), e);
diff --git a/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java b/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java
index 18b452f98367..df96b90eb728 100644
--- a/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java
+++ b/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java
@@ -102,6 +102,7 @@ public ViewMetadata refresh() {
}
@Override
+ @SuppressWarnings("ImmutablesReferenceEquality")
public void commit(ViewMetadata base, ViewMetadata metadata) {
// if the metadata is already out of date, reject it
if (base != current()) {
diff --git a/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java b/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java
index 229650566ca8..836a1ddd80f5 100644
--- a/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java
@@ -46,7 +46,7 @@ public abstract class DeleteFileIndexTestBase<
@Parameters(name = "formatVersion = {0}")
public static List