
[Bug] The partition expire was not correctly triggered during the commit execution. #3434

Open
2 tasks done
otherhy opened this issue May 30, 2024 · 4 comments
Labels
bug Something isn't working

Comments


otherhy commented May 30, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Paimon version

0.8

Compute Engine

JavaAPI

Minimal reproduce step

According to the examples in the documentation, this is how we perform a commit. Taking BatchWrite as an example, after writing the data we have to create a new BatchTableCommit instance each time to execute the commit.

// 3. Collect all CommitMessages to a global node and commit
BatchTableCommit commit = writeBuilder.newCommit();
commit.commit(messages);

Based on the following call chain, we can infer that each time a BatchTableCommit instance is created, a TableCommitImpl instance is ultimately created within it. In its constructor, a PartitionExpire instance is passed as a parameter.

- org.apache.paimon.table.sink.BatchWriteBuilderImpl#newCommit
  - org.apache.paimon.table.AbstractFileStoreTable#newCommit(java.lang.String)
    - org.apache.paimon.table.AbstractFileStoreTable#newCommit(java.lang.String, java.lang.String)

Based on the following code, we can infer that each time a TableCommitImpl instance is created, the org.apache.paimon.AbstractFileStore#newPartitionExpire method is called to create a new PartitionExpire instance.

return new TableCommitImpl(
                store().newCommit(commitUser, branchName),
                createCommitCallbacks(),
                snapshotExpire,

                // creates a new PartitionExpire instance
                options.writeOnly() ? null : store().newPartitionExpire(commitUser),

                options.writeOnly() ? null : store().newTagCreationManager(),
                catalogEnvironment.lockFactory().create(),
                CoreOptions.fromMap(options()).consumerExpireTime(),
                new ConsumerManager(fileIO, path),
                coreOptions().snapshotExpireExecutionMode(),
                name(),
                coreOptions().forceCreatingSnapshot());

Based on the constructor of the PartitionExpire class, we can infer that when the instance is initialized, lastCheck is set to the current time.

public PartitionExpire(
            RowType partitionType,
            Duration expirationTime,
            Duration checkInterval,
            String timePattern,
            String timeFormatter,
            FileStoreScan scan,
            FileStoreCommit commit) {
        this.partitionKeys = partitionType.getFieldNames();
        this.toObjectArrayConverter = new RowDataToObjectArrayConverter(partitionType);
        this.expirationTime = expirationTime;
        this.checkInterval = checkInterval;
        this.timeExtractor = new PartitionTimeExtractor(timePattern, timeFormatter);
        this.scan = scan;
        this.commit = commit;

        // Initialize lastCheck to the current time.
        this.lastCheck = LocalDateTime.now();
    }

After BatchTableCommit is created, based on the example, we immediately start the commit. When the commit is completed, the org.apache.paimon.operation.PartitionExpire#expire(long) method of the PartitionExpire instance is called, as shown in the following code, to check for partition expiration.

public void expire(long commitIdentifier) {
        expire(LocalDateTime.now(), commitIdentifier);
    }

@VisibleForTesting
    void expire(LocalDateTime now, long commitIdentifier) {
        if (now.isAfter(lastCheck.plus(checkInterval))) {
            doExpire(now.minus(expirationTime), commitIdentifier);
            lastCheck = now;
        }
    }

But at this point, lastCheck is essentially equal to now, because the instance was just initialized. Using the default value checkInterval=1h as an example, lastCheck.plus(checkInterval) is one hour in the future, so now.isAfter(lastCheck.plus(checkInterval)) always evaluates to false and partition expiration is skipped.
And because a BatchTableCommit can only perform a single commit, the next commit uses a brand-new PartitionExpire instance. As a result, our commits never trigger the partition expiration check.
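To make the timing concrete, here is a minimal, self-contained sketch of the check in PartitionExpire#expire using plain java.time (no Paimon classes; the variable names just mirror the fields above):

```java
import java.time.Duration;
import java.time.LocalDateTime;

class ExpireCheckDemo {
    public static void main(String[] args) {
        // Mirrors the constructor: lastCheck is set when the instance is created.
        LocalDateTime lastCheck = LocalDateTime.now();
        // The default check interval mentioned above is 1h.
        Duration checkInterval = Duration.ofHours(1);

        // In the batch flow, the commit (and thus expire()) runs moments after construction.
        LocalDateTime now = LocalDateTime.now();
        boolean shouldExpire = now.isAfter(lastCheck.plus(checkInterval));
        System.out.println(shouldExpire); // false: doExpire is never reached
    }
}
```

Since a fresh instance is created for every batch commit, this condition is evaluated at most milliseconds after lastCheck is initialized, so it can never be true.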

Please help me check if my logic is correct or if there is an issue with my usage.

What doesn't meet your expectations?

The partition expiration parameters set on the table did not take effect because they were not correctly triggered during the commit.

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@otherhy otherhy added the bug Something isn't working label May 30, 2024

otherhy commented May 30, 2024

If my logic is correct, then I believe this issue is caused by the PartitionExpire instance not being reused. It could be resolved by storing the instance in the enclosing AbstractFileStoreTable and reusing it each time a TableCommitImpl instance is created.
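A minimal sketch of that caching idea, in plain Java with hypothetical stub names (this is not the actual Paimon code): the table object memoizes one expire object and hands the same instance to every new commit, so lastCheck carries over between commits.

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Illustrative only: ExpireStub stands in for PartitionExpire,
// CachedExpireDemo for AbstractFileStoreTable.
class CachedExpireDemo {
    static class ExpireStub {
        LocalDateTime lastCheck = LocalDateTime.now();

        boolean expire(LocalDateTime now, Duration checkInterval) {
            if (now.isAfter(lastCheck.plus(checkInterval))) {
                lastCheck = now; // the next check is measured from here
                return true;     // doExpire would run at this point
            }
            return false;
        }
    }

    private ExpireStub cachedExpire;

    // Stand-in for newPartitionExpire: lazily create once, then reuse.
    ExpireStub newPartitionExpire() {
        if (cachedExpire == null) {
            cachedExpire = new ExpireStub();
        }
        return cachedExpire;
    }

    public static void main(String[] args) {
        CachedExpireDemo table = new CachedExpireDemo();
        ExpireStub first = table.newPartitionExpire();
        ExpireStub second = table.newPartitionExpire();
        System.out.println(first == second); // true: state survives across "commits"

        // A commit two hours after lastCheck now passes the interval check.
        System.out.println(second.expire(first.lastCheck.plusHours(2), Duration.ofHours(1))); // true
    }
}
```

With reuse, the first commit inside a process still skips the check (as in streaming mode), but later commits past the check interval do trigger it.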

I look forward to your response and feedback.


AkisAya commented Jun 6, 2024

In a streaming environment, StoreCommitter holds a TableCommitImpl, so partition expiration works as expected. But in a batch environment, if you create a TableCommitImpl every time, there will indeed be a new PartitionExpire every time, and PartitionExpire#expire() will never be executed. However, in both streaming and batch environments, Paimon uses TableCommitImpl to commit changes, and there is no way inside TableCommitImpl to tell whether it is running in batch or streaming mode. There is an isStreamingMode flag, but it is used in table.newWrite() and not in table.newCommit(); see withExecutionMode:

// BatchWriteBuilderImpl
    @Override
    public BatchTableWrite newWrite() {
        return table.newWrite(commitUser)
                .withIgnorePreviousFiles(staticPartition != null)
                .withExecutionMode(false);
    }

    @Override
    public BatchTableCommit newCommit() {
        InnerTableCommit commit = table.newCommit(commitUser).withOverwrite(staticPartition);
        commit.ignoreEmptyCommit(true);
        return commit;
    }

Maybe isStreamingMode should also be set when creating a TableCommitImpl, so that it can distinguish streaming from batch environments and behave differently when expiring partitions.

Overall, Paimon is a stream-first computing engine, and some designs do not take batch processing into consideration.
Or maybe the intent is that you run expiration yourself with a separate PartitionExpire, just like PartitionExpireTest does by constructing a new instance to perform the expiration. And maybe there should be a PartitionExpirationProcedure for the Spark engine too.

LinMingQiang (Contributor) commented

We just need to add an isEndInput parameter for the committer to distinguish batch from streaming mode.


otherhy commented Jul 25, 2024

Just need add parameter isEndInput for committer to distinguish batch or streaming mode.

My issue is not that the committer cannot distinguish between batch and streaming modes, but rather that with the BatchWrite I am using, partition expiration cannot be triggered at all. Your commit did not resolve my problem.
