Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POC][Feature] Support common jdbc catalog lock for filesystem catalog #3116

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 54 additions & 46 deletions docs/content/flink/sql-ddl.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ title: "SQL DDL"
weight: 2
type: docs
aliases:
- /flink/sql-ddl.html
- /flink/sql-ddl.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
Expand Down Expand Up @@ -51,6 +51,14 @@ USE CATALOG my_catalog;

You can define any default table options with the prefix `table-default.` for tables created in the catalog.

The FileSystem catalog supports jdbc lock and can take effect through the following configuration:

> ```shell
> 'lock.uri' = 'jdbc:mysql://<host>:<port>/<databaseName>'
> 'lock.jdbc.user' = '...',
> 'lock.jdbc.password' = '...',
> ```

### Creating Hive Catalog

By using Paimon Hive catalog, changes to the catalog will directly affect the corresponding Hive metastore. Tables created in such catalog can also be accessed directly from Hive.
Expand All @@ -68,7 +76,7 @@ The following Flink SQL registers and uses a Paimon Hive catalog named `my_hive`

If your Hive requires security authentication such as Kerberos, LDAP, Ranger or you want the paimon table to be managed
by Apache Atlas(Setting 'hive.metastore.event.listeners' in hive-site.xml). You can specify the hive-conf-dir and
hadoop-conf-dir parameter to the hive-site.xml file path.
hadoop-conf-dir parameter to the hive-site.xml file path.

```sql
CREATE CATALOG my_hive WITH (
Expand Down Expand Up @@ -105,8 +113,8 @@ If you want to see a partitioned table in Hive and also synchronize newly create

#### Adding Parameters to a Hive Table

Using the table option facilitates the convenient definition of Hive table parameters.
Parameters prefixed with `hive.` will be automatically defined in the `TBLPROPERTIES` of the Hive table.
Using the table option facilitates the convenient definition of Hive table parameters.
Parameters prefixed with `hive.` will be automatically defined in the `TBLPROPERTIES` of the Hive table.
For instance, using the option `hive.table.owner=Jon` will automatically add the parameter `table.owner=Jon` to the table properties during the creation process.

#### Setting Location in Properties
Expand Down Expand Up @@ -155,30 +163,30 @@ You can define any default table options with the prefix `table-default.` for ta
After use Paimon catalog, you can create and drop tables. Tables created in Paimon Catalogs are managed by the catalog.
When the table is dropped from catalog, its table files will also be deleted.

The following SQL assumes that you have registered and are using a Paimon catalog. It creates a managed table named
The following SQL assumes that you have registered and are using a Paimon catalog. It creates a managed table named
`my_table` with five columns in the catalog's `default` database, where `dt`, `hh` and `user_id` are the primary keys.

```sql
CREATE TABLE my_table (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);
```

You can create partitioned table:

```sql
CREATE TABLE my_table (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);
```

Expand Down Expand Up @@ -220,48 +228,48 @@ We can specify the primary key or partition when use `CREATE TABLE AS SELECT`, f
/* For streaming mode, you need to enable the checkpoint. */

CREATE TABLE my_table (
user_id BIGINT,
item_id BIGINT
user_id BIGINT,
item_id BIGINT
);
CREATE TABLE my_table_as AS SELECT * FROM my_table;

/* partitioned table */
CREATE TABLE my_table_partition (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) PARTITIONED BY (dt, hh);
CREATE TABLE my_table_partition_as WITH ('partition' = 'dt') AS SELECT * FROM my_table_partition;

/* change options */
CREATE TABLE my_table_options (
user_id BIGINT,
item_id BIGINT
user_id BIGINT,
item_id BIGINT
) WITH ('file.format' = 'orc');
CREATE TABLE my_table_options_as WITH ('file.format' = 'parquet') AS SELECT * FROM my_table_options;

/* primary key */
CREATE TABLE my_table_pk (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);
CREATE TABLE my_table_pk_as WITH ('primary-key' = 'dt,hh') AS SELECT * FROM my_table_pk;


/* primary key + partition */
CREATE TABLE my_table_all (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);
CREATE TABLE my_table_all_as WITH ('primary-key' = 'dt,hh', 'partition' = 'dt') AS SELECT * FROM my_table_all;
```
Expand All @@ -272,12 +280,12 @@ To create a table with the same schema, partition, and table properties as anoth

```sql
CREATE TABLE my_table (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);

CREATE TABLE my_table_like LIKE my_table;
Expand Down Expand Up @@ -315,4 +323,4 @@ CREATE TEMPORARY TABLE temp_table (
);

SELECT my_table.k, my_table.v, temp_table.v FROM my_table JOIN temp_table ON my_table.k = temp_table.k;
```
```
8 changes: 7 additions & 1 deletion docs/content/spark/sql-ddl.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,15 @@ spark-sql ... \
```

You can define any default table options with the prefix `spark.sql.catalog.paimon.table-default.` for tables created in the catalog.
The FileSystem catalog supports jdbc lock and can take effect through the following configuration:

After `spark-sql` is started, you can switch to the `default` database of the `paimon` catalog with the following SQL.
> ```shell
> 'lock.uri' = 'jdbc:mysql://<host>:<port>/<databaseName>'
> 'lock.jdbc.user' = '...',
> 'lock.jdbc.password' = '...',
> ```

After `spark-sql` is started, you can switch to the `default` database of the `paimon` catalog with the following SQL.
```sql
USE paimon.default;
```
Expand Down
8 changes: 7 additions & 1 deletion docs/layouts/shortcodes/generated/catalog_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>client-pool-cache-eviction-interval-ms</h5></td>
<td style="word-wrap: break-word;">300000</td>
<td>Long</td>
<td>Client pool cache eviction interval ms.</td>
</tr>
<tr>
<td><h5>client-pool-size</h5></td>
<td style="word-wrap: break-word;">2</td>
Expand Down Expand Up @@ -66,7 +72,7 @@
<td><h5>lock.type</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The Lock Type for Catalog, such as 'hive', 'zookeeper'.</td>
<td>The Lock Type for Catalog, such as 'hive', 'zookeeper', 'jdbc'.</td>
</tr>
<tr>
<td><h5>metastore</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public class CatalogOptions {
ConfigOptions.key("lock.type")
.stringType()
.noDefaultValue()
.withDescription("The Lock Type for Catalog, such as 'hive', 'zookeeper'.");
.withDescription(
"The Lock Type for Catalog, such as 'hive', 'zookeeper', 'jdbc'.");

public static final ConfigOption<Duration> LOCK_CHECK_MAX_SLEEP =
key("lock-check-max-sleep")
Expand Down Expand Up @@ -110,4 +111,10 @@ public class CatalogOptions {
TextElement.text(
"\"custom\": You can implement LineageMetaFactory and LineageMeta to store lineage information in customized storage."))
.build());

public static final ConfigOption<Long> CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS =
ConfigOptions.key("client-pool-cache-eviction-interval-ms")
.longType()
.defaultValue(5 * 60 * 1000L)
.withDescription("Client pool cache eviction interval ms.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.options.CatalogOptions.METASTORE;
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.utils.Preconditions.checkArgument;

Expand All @@ -62,6 +63,7 @@ public abstract class AbstractCatalog implements Catalog {
public static final String DB_SUFFIX = ".db";
protected static final String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
protected static final String DB_LOCATION_PROP = "location";
protected static final String LOCK_PROP_PREFIX = "lock.";

protected final FileIO fileIO;
protected final Map<String, String> tableDefaultOptions;
Expand Down Expand Up @@ -115,6 +117,31 @@ public Optional<CatalogLockFactory> defaultLockFactory() {
return Optional.empty();
}

@Override
public Optional<CatalogLockContextFactory> lockContextFactory() {
String lock = catalogOptions.get(LOCK_TYPE);
if (lock == null) {
return Optional.empty();
}
return Optional.of(
FactoryUtil.discoverFactory(
AbstractCatalog.class.getClassLoader(),
CatalogLockContextFactory.class,
lock));
}

public Options extractLockConfiguration(Map<String, String> properties) {
Map<String, String> result = new HashMap<>();
result.put(METASTORE.key(), properties.get(METASTORE.key()));
properties.forEach(
(key, value) -> {
if (key.startsWith(LOCK_PROP_PREFIX)) {
result.put(key.substring(LOCK_PROP_PREFIX.length()), value);
}
});
return Options.fromMap(result);
}

@Override
public Optional<CatalogLockContext> lockContext() {
return Optional.of(CatalogLockContext.fromOptions(catalogOptions));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public interface Catalog extends AutoCloseable {
*/
Optional<CatalogLockFactory> lockFactory();

/** Get lock context factory for lock factory to create a lock. */
default Optional<CatalogLockContextFactory> lockContextFactory() {
return Optional.empty();
}

/** Get lock context for lock factory to create a lock. */
default Optional<CatalogLockContext> lockContext() {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.catalog;

import org.apache.paimon.factories.Factory;
import org.apache.paimon.options.Options;

import java.io.Serializable;

/** Context for lock context factory to create lock. */
public interface CatalogLockContextFactory extends Factory, Serializable {
CatalogLockContext createLockContext(Options lockOptions);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;

import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
Expand Down Expand Up @@ -199,4 +200,16 @@ public String warehouse() {
public boolean caseSensitive() {
return catalogOptions.get(CASE_SENSITIVE);
}

@Override
public Optional<CatalogLockContext> lockContext() {
Optional<CatalogLockContextFactory> catalogLockContextFactory = lockContextFactory();
if (!catalogLockContextFactory.isPresent()) {
return super.lockContext();
}
return catalogLockContextFactory.map(
factory ->
factory.createLockContext(
extractLockConfiguration(catalogOptions.toMap())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.paimon.privilege.PrivilegedCatalog;
import org.apache.paimon.table.TableType;

import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;

/** Factory to create {@link FileSystemCatalog}. */
Expand All @@ -44,6 +46,10 @@ public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) {
"Only managed table is supported in File system catalog.");
}

if (context.options().get(LOCK_ENABLED) && context.options().get(LOCK_TYPE) == null) {
throw new IllegalArgumentException("Please configure the lock type correctly.");
}

Catalog catalog = new FileSystemCatalog(fileIO, warehouse, context.options());

PrivilegeManager privilegeManager =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ public Optional<CatalogLockFactory> defaultLockFactory() {

@Override
public Optional<CatalogLockContext> lockContext() {
return Optional.of(new JdbcCatalogLockContext(connections, catalogKey, options));
return Optional.of(new JdbcCatalogLockContext(connections, options));
}

private Lock lock(Identifier identifier) {
Expand Down
Loading
Loading