Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-8760 PostgresJdbcChannelMessageStore using DELETE ... RETURNING #8762

Conversation

joshiste
Copy link
Contributor

A first draft of a PostgresJdbcChannelMessageStore implementation using a single statement - designed as proposed in #8760 (comment)

I wonder if other DBMS have similar operations. If yes, we could add a isSingleStatementPoll() to the ChannelMessageStoreQueryProvider interface and if true the JdbcChannelMessageStore omits the delete. Also, that would prevent using the wrong query provider with the wrong store. WDYT?

Fixes #8760

@joshiste
Copy link
Contributor Author

joshiste commented Oct 12, 2023

Test results are in!

averaged for all runs:
image

Test 'no_skip' implementation with 1 poller threads
Polled 20000 messages in 13226 ms. throughput 1512,17 msg/s
Polled 20000 messages in 18784 ms. throughput 1064,74 msg/s
Polled 20000 messages in 15093 ms. throughput 1325,12 msg/s
Polled 20000 messages in 16712 ms. throughput 1196,74 msg/s
Polled 20000 messages in 20011 ms. throughput 999,45 msg/s

Test ‘with_skip' implementation with 1 poller threads
Polled 20000 messages in 17045 ms. throughput 1173,36 msg/s
Polled 20000 messages in 18307 ms. throughput 1092,48 msg/s
Polled 20000 messages in 12968 ms. throughput 1542,26 msg/s
Polled 20000 messages in 19372 ms. throughput 1032,42 msg/s
Polled 20000 messages in 17237 ms. throughput 1160,29 msg/s

Test 'delete_returning' implementation with 1 poller threads
Polled 20000 messages in 6827 ms. throughput 2929,54 msg/s
Polled 20000 messages in 9767 ms. throughput 2047,71 msg/s
Polled 20000 messages in 13346 ms. throughput 1498,58 msg/s
Polled 20000 messages in 10869 ms. throughput 1840,10 msg/s
Polled 20000 messages in 10431 ms. throughput 1917,36 msg/s

Test 'no_skip' implementation with 5 poller threads (~4 times not deleted warning each run)
Polled 20000 messages in 12758 ms. throughput 1567,64 msg/s 
Polled 20000 messages in 14951 ms. throughput 1337,70 msg/s
Polled 20000 messages in 16416 ms. throughput 1218,32 msg/s
Polled 20000 messages in 15664 ms. throughput 1276,81 msg/s
Polled 20000 messages in 17329 ms. throughput 1154,13 msg/s

Test 'with_skip' implementation with 5 poller threads (~4 times not deleted warning each run)
Polled 20000 messages in 20124 ms. throughput 993,84 msg/s
Polled 20000 messages in 15439 ms. throughput 1295,42 msg/s
Polled 20000 messages in 18370 ms. throughput 1088,73 msg/s
Polled 20000 messages in 15791 ms. throughput 1266,54 msg/s
Polled 20000 messages in 16029 ms. throughput 1247,74 msg/s

Test 'delete_returning' implementation with 5 poller threads
Polled 20000 messages in 12512 ms. throughput 1598,47 msg/s
Polled 20000 messages in 9230 ms. throughput 2166,85 msg/s
Polled 20000 messages in 11494 ms. throughput 1740,04 msg/s
Polled 20000 messages in 14085 ms. throughput 1419,95 msg/s
Polled 20000 messages in 10895 ms. throughput 1835,70 msg/s

}

@Override
public void setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to assume that there any other ChannelMessageStoreQueryProvider impl but not just PostgresChannelMessageStoreQueryProvider.

Let's just override those poll methods in the PostgresChannelMessageStoreQueryProvider and allow to set only that one it this PostgresJdbcMessageStore having a new setPostgresChannelMessageStoreQueryProvider (if you think someone may provide their own impl) with an UnsupportedOperationException for this overridden setChannelMessageStoreQueryProvider()!

I mean that we don't need extra DeleteReturning... abstraction. I doubt any other RDBMS have something similar to worry right now about.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean that we don't need extra DeleteReturning... abstraction. I doubt any other RDBMS have something similar to worry right now about.

At least Oracle has it: https://docs.oracle.com/en/database/oracle/oracle-database/19/sqlrf/DELETE.html#GUID-156845A5-B626-412B-9F95-8869B988ABD7

Therefore, I'd favor adding the "single statement poll" as a concept in the JDBCMessageChannelStore over specialization.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just commented about generic nature of the ChannelMessageStoreQueryProvider.
We can look into Oracle feature later on.

…... RETURNING`

Fixes spring-projects#8760

* Add PostgresJdbcChannelMessageStore and tests
@joshiste joshiste force-pushed the feat/optimized-postgres-jdbc-channel-message-store branch from 459f163 to 1755cc6 Compare October 12, 2023 19:43
Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see your point now about:

isSingleStatementPoll() to the ChannelMessageStoreQueryProvider

and no extra DeleteReturning and specific PostgresJdbcChannelMessageStore!

Can you rework that way?
Or do I miss anything else?

Thanks

@artembilan
Copy link
Member

Please, don't do commits squash: Github doesn't notify us about such a push into the PR.
We do squash on merge anyway.
Please, incremental commits make it easier to review: I wouldn't need to look into those classes I have reviewed before and you haven't done any changes for in a fresh commit.

@joshiste joshiste force-pushed the feat/optimized-postgres-jdbc-channel-message-store branch from 8e7ab93 to 3ee09d8 Compare October 13, 2023 21:25
@@ -633,6 +633,10 @@ protected Message<?> doPollForMessage(String groupIdKey) {
}

private boolean doRemoveMessageFromGroup(Object groupId, Message<?> messageToRemove) {
if (this.channelMessageStoreQueryProvider.isSingleStatementForPoll()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't agree with this expression being present in this method.
It really does not reflect the purpose of this method.
Please, consider to implement this if in the pollMessageFromGroup() instead.

and %PREFIX%CHANNEL_MESSAGE.REGION = :region
and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids)
order by CREATED_DATE, MESSAGE_SEQUENCE
limit 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that we don't need FOR UPDATE. But how about SKIP LOCKED?
WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. I don't know. I think I'll run my performance tests if it makes a difference. Keep you posted.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.
Would you mind coming back with results as soon as possible?
We have a release today, so would be great to have this merged.

@joshiste
Copy link
Contributor Author

joshiste commented Oct 17, 2023

OK... running with the skip locked in delete returning gives some additional boost when using multi reader threads...

[noSkip] 1 threads; 20000 messages; duration 19055ms throughput; 1049,593 msg/s
[noSkip] 1 threads; 20000 messages; duration 19130ms throughput; 1045,478 msg/s
[noSkip] 1 threads; 20000 messages; duration 20540ms throughput; 973,710 msg/s
[noSkip] 1 threads; 20000 messages; duration 17644ms throughput; 1133,530 msg/s
[noSkip] 1 threads; 20000 messages; duration 19814ms throughput; 1009,387 msg/s
[noSkip] 5 threads; 20000 messages; duration 18606ms throughput; 1074,922 msg/s
[noSkip] 5 threads; 20000 messages; duration 21429ms throughput; 933,315 msg/s
[noSkip] 5 threads; 20000 messages; duration 19467ms throughput; 1027,380 msg/s
[noSkip] 5 threads; 20000 messages; duration 22763ms throughput; 878,619 msg/s
[noSkip] 5 threads; 20000 messages; duration 21031ms throughput; 950,977 msg/s
[skip] 1 threads; 20000 messages; duration 18306ms throughput; 1092,538 msg/s
[skip] 1 threads; 20000 messages; duration 19745ms throughput; 1012,915 msg/s
[skip] 1 threads; 20000 messages; duration 19698ms throughput; 1015,332 msg/s
[skip] 1 threads; 20000 messages; duration 21687ms throughput; 922,211 msg/s
[skip] 1 threads; 20000 messages; duration 18634ms throughput; 1073,307 msg/s
[skip] 5 threads; 20000 messages; duration 13968ms throughput; 1431,844 msg/s
[skip] 5 threads; 20000 messages; duration 11556ms throughput; 1730,703 msg/s
[skip] 5 threads; 20000 messages; duration 14620ms throughput; 1367,989 msg/s
[skip] 5 threads; 20000 messages; duration 15301ms throughput; 1307,104 msg/s
[skip] 5 threads; 20000 messages; duration 11820ms throughput; 1692,047 msg/s
[deleteReturning] 1 threads; 20000 messages; duration 10552ms throughput; 1895,375 msg/s
[deleteReturning] 1 threads; 20000 messages; duration 14579ms throughput; 1371,836 msg/s
[deleteReturning] 1 threads; 20000 messages; duration 10562ms throughput; 1893,581 msg/s
[deleteReturning] 1 threads; 20000 messages; duration 14027ms throughput; 1425,822 msg/s
[deleteReturning] 1 threads; 20000 messages; duration 17119ms throughput; 1168,293 msg/s
[deleteReturning] 5 threads; 20000 messages; duration 12442ms throughput; 1607,459 msg/s
[deleteReturning] 5 threads; 20000 messages; duration 18685ms throughput; 1070,377 msg/s
[deleteReturning] 5 threads; 20000 messages; duration 13863ms throughput; 1442,689 msg/s
[deleteReturning] 5 threads; 20000 messages; duration 16700ms throughput; 1197,605 msg/s
[deleteReturning] 5 threads; 20000 messages; duration 19823ms throughput; 1008,929 msg/s
[deleteReturningSkip] 1 threads; 20000 messages; duration 58722ms throughput; 340,588 msg/s
[deleteReturningSkip] 1 threads; 20000 messages; duration 11603ms throughput; 1723,692 msg/s
[deleteReturningSkip] 1 threads; 20000 messages; duration 13991ms throughput; 1429,490 msg/s
[deleteReturningSkip] 1 threads; 20000 messages; duration 10805ms throughput; 1850,995 msg/s
[deleteReturningSkip] 1 threads; 20000 messages; duration 11808ms throughput; 1693,767 msg/s
[deleteReturningSkip] 5 threads; 20000 messages; duration 3066ms throughput; 6523,157 msg/s
[deleteReturningSkip] 5 threads; 20000 messages; duration 3469ms throughput; 5765,350 msg/s
[deleteReturningSkip] 5 threads; 20000 messages; duration 3958ms throughput; 5053,057 msg/s
[deleteReturningSkip] 5 threads; 20000 messages; duration 2490ms throughput; 8032,129 msg/s
[deleteReturningSkip] 5 threads; 20000 messages; duration 2852ms throughput; 7012,623 msg/s

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The import order is wrong in the PostgresJdbcChannelMessageStoreTests, but everything else is OK with me, so I'm pulling your PR locally for final review, clean up and merging.

Thank you!

@artembilan
Copy link
Member

Merged as 87a2ac5 after some clean up and docs.

@joshiste ,

thank you for contribution; looking forward for more!

@artembilan artembilan closed this Oct 17, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Using Postgres as ChannelMessageStore: use DELETE ... RETURNING to make polling a single operation
2 participants