-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Core] Support jdbc catalog #2866
Conversation
7cc4f4c
to
442d343
Compare
@JingsongLi PTAL |
4fd21ac
to
443dd36
Compare
4ab8d2c
to
5c1fa42
Compare
@FangYongs @yuzelin @JingsongLi If you have time, could you please help review it ? Thx |
Thanks @sunxiaojian for your contribution. This PR looks good! I will take a look this week~ |
@@ -175,3 +176,185 @@ Using the table option facilitates the convenient definition of Hive table param | |||
Parameters prefixed with `hive.` will be automatically defined in the `TBLPROPERTIES` of the Hive table. | |||
For instance, using the option `hive.table.owner=Jon` will automatically add the parameter `table.owner=Jon` to the table properties during the creation process. | |||
|
|||
|
|||
|
|||
## Creating a Catalog with Filesystem Metastore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why copy this?
-- 'jdbc.user' = '...', | ||
-- 'jdbc.password' = '...', | ||
-- 'initialize-catalog-tables'='true' | ||
-- 'warehouse' = 'hdfs:///path/to/warehouse', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warehouse should be required.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should remove --
-- 'uri' = 'jdbc:mysql://<host>:<port>/<databaseName>', | ||
-- 'jdbc.user' = '...', | ||
-- 'jdbc.password' = '...', | ||
-- 'initialize-catalog-tables'='true' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this one? What is the scene?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this one? What is the scene?
@JingsongLi When creating a JdbcCatalog, it will be determined based on this parameter whether to automatically create a table or not
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When does it not need to be initialized? You added a public API. Please explain why, how to use.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or you can remove this option.
|
||
You can define any default table options with the prefix `table-default.` for tables created in the catalog. | ||
|
||
Also, you can create [FlinkGenericCatalog]({{< ref "engines/flink" >}}). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, FlinkGenericCatalog
must use HiveCatalog.
+ ")" | ||
+ ")"; | ||
|
||
static final String DISTRIBUTED_LOCK_ACQUIRE_SQL = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you explain this? How to lock?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you explain this? How to lock?
@JingsongLi First, create a table for distributed locks and write a data entry to the database table using database.table as the lockId. If successful, the lock acquisition is considered successful. If the primary key conflicts, it is considered a failure
@sunxiaojian Thanks for your work. This is really what we want! |
/** Util for jdbc catalog. */ | ||
public class JdbcUtils { | ||
private static final Logger LOG = LoggerFactory.getLogger(JdbcUtils.class); | ||
public static final String METADATA_LOCATION_PROP = "metadata_location"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand correctly, Paimon considers the commit successful once the snapshot file is successfully committed to the file system. Storing the current metadata location in the catalog metastore will cause consistency problems because the two operations of committing the snapshot file to the file system and committing it to the metastore are not atomic. So there seems to be no point in storing this metadata location? @JingsongLi Can you please review this? Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zhongyujiang You understanding is very correct! Yes, Paimon only consider Snapshot
files as source of truth.
metadata_location
is come from Iceberg, we don't need this.
|
||
private static int tryClearExpireLock(JdbcClientPool connections, String lockName, long timeout) | ||
throws SQLException, InterruptedException { | ||
long expirationTimeMillis = System.currentTimeMillis() - timeout * 1000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I understand this, could you please explain?
connection.prepareStatement(DISTRIBUTED_LOCK_ACQUIRE_SQL)) { | ||
preparedStatement.setString(1, lockName); | ||
preparedStatement.setTimestamp( | ||
2, new Timestamp(System.currentTimeMillis())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we are using timestamp of clients as acquired_at
? I think this makes no sense since clients can be distributed on different machines, which have no consistent system time. Since acquired_at
is the timestamp of acquiring the lock, I think we should use the system time of the metastore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we are using timestamp of clients as
acquired_at
? I think this makes no sense since clients can be distributed on different machines, which have no consistent system time. Sinceacquired_at
is the timestamp of acquiring the lock, I think we should use the system time of the metastore.
@zhongyujiang There may indeed be the issue you mentioned, but the database used for the meta storage system is different, and the generation system time function used is also different. May require separate compatibility
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The initial time setting was for metastore, but it was not compatible with all databases, so this version of the modification was made. In fact, specific database specific time functions can be set for each database
public static boolean acquire(JdbcClientPool connections, String lockName, long timeout) | ||
throws SQLException, InterruptedException { | ||
// Check and clear expire lock | ||
int affectedRows = tryClearExpireLock(connections, lockName, timeout); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think timeout
is a property that should be set when acquiring the lock, not when the lock expires. Because different clients may configure different timeouts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think
timeout
is a property that should be set when acquiring the lock, not when the lock expires. Because different clients may configure different timeouts.
In fact, none of them are the most suitable. They all belong to the timeout set by the client, but your solution should be better
a4fd960
to
516b5cf
Compare
@JingsongLi @zhongyujiang Thank you for the review, I have fixed the issues raised above. |
558025f
to
9d41884
Compare
9d41884
to
6a70411
Compare
Hi @sunxiaojian , you should make sure that every comment is resolved. |
@JingsongLi All have been resolved |
``` | ||
You can define any connection parameters for a database with the prefix "jdbc.". | ||
|
||
You can define the "initialize-catalog-tables" configuration to automatically create tables required for the initial JdbcCatalog. If it is true, tables will be automatically created when initializing JdbcCatalog. If it is false, tables will not be automatically created and you must manually create them. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it is false, tables will not be automatically created and you must manually create them.
Why is there such a need? I don't understand.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is there such a need? I don't understand.
Some online business databases do not allow table creation through jdbc and must be submitted for approval in advance
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The final decision is not to keep this configuration. It is recommended that users enable automatic table creation permissions for independent databases, which would be better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some online business databases do not allow table creation through jdbc and must be submitted for approval in advance
You can document this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some online business databases do not allow table creation through jdbc and must be submitted for approval in advance
You can document this.
Most users may not need it, I will remove this parameter and add it back if there is a strong requirement.
'uri' = 'jdbc:mysql://<host>:<port>/<databaseName>', | ||
'jdbc.user' = '...', | ||
'jdbc.password' = '...', | ||
'catalog-name'='jdbc' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here we already have the concept of a catalog name, which is my_jdbc
.
Maybe this option can be renamed to store-key
or something else?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
at least, jdbc.catalog-name
, you need documentation to explain that it is designed to store multiple catalogs, and it is different from the catalog name you use in Flink SQL or Spark SQL.
'jdbc.user' = '...', | ||
'jdbc.password' = '...', | ||
'catalog-name'='jdbc' | ||
'initialize-catalog-tables'='true' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you forgot ,
import static org.apache.paimon.utils.Preconditions.checkState; | ||
|
||
/** Source: [core/src/main/java/org/apache/iceberg/ClientPoolImpl.java]. */ | ||
public abstract class ClientPoolImpl<C, E extends Exception> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move them to paimon-common
.
332fea0
to
8c2e10d
Compare
@JingsongLi All have been resolved |
String lockUniqueName = String.format("%s.%s.%s", catalogName, database, table); | ||
lock(lockUniqueName); | ||
try { | ||
return callable.call(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if the lock expires before the commit is completed? Will there be conflicts? Does this break the atomicity of commits?
For example, let's consider the first column as logical timestamp, and the second column is the events that occurred at that time:
1 client-1 successfully aquired a lock of table a
2 client-1 starts committing but gets stuck for some reason
5 the lock of table a
timeouts
6 client-2 cleaned the expired lock of table a
and acquired a new lock of table a
7 client-2 starts committing
8 client-2 made a successful committment
9 client-1 can start committing
In this case, client-1's commit should fail, but here client-1 will continue to commit. If the underlying file system allows overwriting of files with the same name, then client-1's commit will overwrite client-2's commit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can start a new thread renewal timeout to solve the problem, but you can also set a reasonable timeout to solve it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the current strategy, it is difficult for this situation to occur because currently using lock is only renaming the file, and this operation should be completed quickly.
@zhongyujiang Hi, do you have other comments or concerns? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left two minor comments, I have no other worries except the above one, Thanks @sunxiaojian
paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java
Outdated
Show resolved
Hide resolved
@@ -175,3 +176,40 @@ Using the table option facilitates the convenient definition of Hive table param | |||
Parameters prefixed with `hive.` will be automatically defined in the `TBLPROPERTIES` of the Hive table. | |||
For instance, using the option `hive.table.owner=Jon` will automatically add the parameter `table.owner=Jon` to the table properties during the creation process. | |||
|
|||
## Creating a Catalog with JDBC Metastore | |||
|
|||
By using the Paimon JDBC catalog, changes to the catalog will be directly stored in relational databases such as MySQL, postgres, etc. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
postgres
is not supported for now, should we remove it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
postgres
is not supported for now, should we remove it?
it's just that locking is not supported
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you document this? only SQLITE and MYSQL supports catalog lock
736b0e2
to
0b7b739
Compare
'uri' = 'jdbc:mysql://<host>:<port>/<databaseName>', | ||
'jdbc.user' = '...', | ||
'jdbc.password' = '...', | ||
'store-key'='jdbc', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you modify this to jdbc.catalog-key
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you modify this to
jdbc.catalog-key
?
The prefix "jdbc." is used to connect to the database configuration and can be directly parsed and placed in the connection configuration. If it is necessary to replace it, it can be replaced with "catalog-key"?
8809531
to
41fb584
Compare
d6b8d88
to
bd81c01
Compare
@JingsongLi The above issues have been resolved, please review again |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 Thanks @sunxiaojian and @zhongyujiang
Support jdbc catalog
Linked issue : #841