
Commit

add some log
pongandnoon committed Dec 26, 2023
1 parent 8a9d9c9 commit 7f0188b
Showing 4 changed files with 34 additions and 15 deletions.
@@ -333,7 +333,7 @@ public WriterContainer<T> createWriterContainer(
if (LOG.isDebugEnabled()) {
LOG.debug("Creating writer for partition {}, bucket {}", partition, bucket);
}

+final long startTs = System.currentTimeMillis();
Long latestSnapshotId = snapshotManager.latestSnapshotId();
List<DataFileMeta> restoreFiles = new ArrayList<>();
if (!ignorePreviousFiles && latestSnapshotId != null) {
@@ -347,6 +347,11 @@ public WriterContainer<T> createWriterContainer(
RecordWriter<T> writer =
createWriter(partition.copy(), bucket, restoreFiles, null, compactExecutor());
notifyNewWriter(writer);
+LOG.info(
+        "Creating writer for partition {}, bucket {}, time {} ms",
+        partition,
+        bucket,
+        System.currentTimeMillis() - startTs);
return new WriterContainer<>(writer, indexMaintainer, latestSnapshotId);
}

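For context, the change above times how long writer creation takes and reports it through an INFO log. A minimal, self-contained sketch of that elapsed-time logging pattern follows; the class and method names are invented for illustration and are not Paimon's actual API.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical illustration of the elapsed-time logging added around writer creation.
public class WriterTimingExample {

    private static final Logger LOG = LoggerFactory.getLogger(WriterTimingExample.class);

    public Object createWriter(String partition, int bucket) {
        final long startTs = System.currentTimeMillis();

        Object writer = doCreateWriter(partition, bucket); // stand-in for the real work

        // SLF4J parameterized logging: formatting is skipped when INFO is disabled.
        LOG.info(
                "Creating writer for partition {}, bucket {}, time {} ms",
                partition,
                bucket,
                System.currentTimeMillis() - startTs);
        return writer;
    }

    private Object doCreateWriter(String partition, int bucket) {
        return new Object();
    }
}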
@@ -164,8 +164,8 @@ public void expireUntil(long earliestId, long endExclusiveId) {

endExclusiveId = Math.min(beginInclusiveId + expireLimit, endExclusiveId);

-if (LOG.isDebugEnabled()) {
-    LOG.debug(
+if (LOG.isInfoEnabled()) {
+    LOG.info(
"Snapshot expire range is [" + beginInclusiveId + ", " + endExclusiveId + ")");
}

@@ -175,8 +175,8 @@ public void expireUntil(long earliestId, long endExclusiveId) {
// deleted merge tree files in a snapshot are not used by the next snapshot, so the range of
// id should be (beginInclusiveId, endExclusiveId]
for (long id = beginInclusiveId + 1; id <= endExclusiveId; id++) {
-if (LOG.isDebugEnabled()) {
-    LOG.debug("Ready to delete merge tree files not used by snapshot #" + id);
+if (LOG.isInfoEnabled()) {
+    LOG.info("Ready to delete merge tree files not used by snapshot #" + id);
}
Snapshot snapshot = snapshotManager.snapshot(id);
// expire merge tree files and collect changed buckets
@@ -197,8 +197,8 @@ public void expireUntil(long earliestId, long endExclusiveId) {

// delete changelog files
for (long id = beginInclusiveId; id < endExclusiveId; id++) {
-if (LOG.isDebugEnabled()) {
-    LOG.debug("Ready to delete changelog files from snapshot #" + id);
+if (LOG.isInfoEnabled()) {
+    LOG.info("Ready to delete changelog files from snapshot #" + id);
}
Snapshot snapshot = snapshotManager.snapshot(id);
if (snapshot.changelogManifestList() != null) {
@@ -217,8 +217,8 @@ public void expireUntil(long earliestId, long endExclusiveId) {
skippingSnapshots.add(snapshotManager.snapshot(endExclusiveId));
Set<String> skippingSet = snapshotDeletion.manifestSkippingSet(skippingSnapshots);
for (long id = beginInclusiveId; id < endExclusiveId; id++) {
-if (LOG.isDebugEnabled()) {
-    LOG.debug("Ready to delete manifests in snapshot #" + id);
+if (LOG.isInfoEnabled()) {
+    LOG.info("Ready to delete manifests in snapshot #" + id);
}

Snapshot snapshot = snapshotManager.snapshot(id);
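The expireUntil changes above only raise the log level from DEBUG to INFO; the isInfoEnabled() guards are kept because the messages are built with string concatenation. A rough, hypothetical comparison of the two SLF4J idioms (not code from this repository):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical comparison of guarded concatenation vs. parameterized logging.
public class LogGuardExample {

    private static final Logger LOG = LoggerFactory.getLogger(LogGuardExample.class);

    void logWithGuard(long beginInclusiveId, long endExclusiveId) {
        // Guarded concatenation: the guard avoids building the string when INFO is disabled.
        if (LOG.isInfoEnabled()) {
            LOG.info("Snapshot expire range is [" + beginInclusiveId + ", " + endExclusiveId + ")");
        }
    }

    void logParameterized(long beginInclusiveId, long endExclusiveId) {
        // Parameterized form: SLF4J skips formatting when INFO is disabled,
        // so no explicit guard is needed for cheap arguments.
        LOG.info("Snapshot expire range is [{}, {})", beginInclusiveId, endExclusiveId);
    }
}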
@@ -54,6 +54,8 @@
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UncheckedIOException;
@@ -73,6 +75,7 @@
public abstract class AbstractFileStoreTable implements FileStoreTable {

private static final long serialVersionUID = 1L;
+protected static final Logger LOG = LoggerFactory.getLogger(AbstractFileStoreTable.class);

protected final FileIO fileIO;
protected final Path path;
@@ -371,12 +374,17 @@ private Optional<TableSchema> tryTimeTravel(Options options) {
}
return Optional.empty();
case FROM_TIMESTAMP:
-Snapshot snapshot =
-        StaticFromTimestampStartingScanner.timeTravelToTimestamp(
-                snapshotManager(), coreOptions.scanTimestampMills());
-if (snapshot != null) {
-    long schemaId = snapshot.schemaId();
-    return Optional.of(schemaManager().schema(schemaId).copy(options.toMap()));
+try {
+    Snapshot snapshot =
+            StaticFromTimestampStartingScanner.timeTravelToTimestamp(
+                    snapshotManager(), coreOptions.scanTimestampMills());
+    if (snapshot != null) {
+        long schemaId = snapshot.schemaId();
+        return Optional.of(schemaManager().schema(schemaId).copy(options.toMap()));
+    }
+} catch (Exception ex) {
+    LOG.warn("Get table schema from snapshot error,snapshot maybe expired");
+    return Optional.empty();
}
return Optional.empty();
default:
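The tryTimeTravel change above wraps the timestamp lookup in a try/catch so that an expired snapshot degrades to an empty result instead of failing the call. A minimal sketch of that fallback shape, using placeholder types rather than Paimon's actual classes:

import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical sketch of "fall back to empty when the snapshot is gone".
public class TimeTravelFallbackExample {

    private static final Logger LOG = LoggerFactory.getLogger(TimeTravelFallbackExample.class);

    interface SchemaLookup {
        String schemaForTimestamp(long timestampMillis) throws Exception;
    }

    Optional<String> tryTimeTravel(SchemaLookup lookup, long timestampMillis) {
        try {
            String schema = lookup.schemaForTimestamp(timestampMillis);
            if (schema != null) {
                return Optional.of(schema);
            }
        } catch (Exception ex) {
            // The snapshot covering the requested timestamp may already have expired.
            LOG.warn("Failed to load table schema for timestamp {}", timestampMillis, ex);
        }
        return Optional.empty();
    }
}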
@@ -25,6 +25,9 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.RecordReader;

+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
@@ -39,6 +42,8 @@ public class FileUtils {

public static final ForkJoinPool COMMON_IO_FORK_JOIN_POOL;

+private static final Logger LOG = LoggerFactory.getLogger(FileUtils.class);

// if we want to name threads in the fork join pool we need all these
// see https://stackoverflow.com/questions/34303094/
static {
@@ -49,6 +54,7 @@ public class FileUtils {
worker.setName("file-store-common-io-" + worker.getPoolIndex());
return worker;
};
+LOG.info("runtime availableProcessors:{}", Runtime.getRuntime().availableProcessors());
COMMON_IO_FORK_JOIN_POOL =
new ForkJoinPool(Runtime.getRuntime().availableProcessors(), factory, null, false);
}
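The FileUtils hunk adds a log of the detected CPU count next to the existing thread-naming setup. For reference, a self-contained sketch of naming ForkJoinPool worker threads with a custom factory, the same JDK mechanism the static block relies on (the thread-name prefix here is made up):

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;

// Hypothetical, standalone example of naming ForkJoinPool worker threads.
public class NamedForkJoinPoolExample {

    public static void main(String[] args) {
        ForkJoinWorkerThreadFactory factory =
                pool -> {
                    ForkJoinWorkerThread worker =
                            ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
                    worker.setName("example-io-" + worker.getPoolIndex());
                    return worker;
                };

        int parallelism = Runtime.getRuntime().availableProcessors();
        // parallelism, custom factory, no uncaught-exception handler, asyncMode = false
        ForkJoinPool pool = new ForkJoinPool(parallelism, factory, null, false);

        pool.submit(() -> System.out.println("Running on " + Thread.currentThread().getName()))
                .join();
        pool.shutdown();
    }
}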
