Skip to content

Commit

Permalink
rewrite distributed lock & fixed doc
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Mar 1, 2024
1 parent 5c1fa42 commit 9d41884
Show file tree
Hide file tree
Showing 15 changed files with 397 additions and 437 deletions.
147 changes: 0 additions & 147 deletions docs/content/how-to/creating-catalogs.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,151 +176,6 @@ Using the table option facilitates the convenient definition of Hive table param
Parameters prefixed with `hive.` will be automatically defined in the `TBLPROPERTIES` of the Hive table.
For instance, using the option `hive.table.owner=Jon` will automatically add the parameter `table.owner=Jon` to the table properties during the creation process.
## Creating a Catalog with Filesystem Metastore
{{< tabs "filesystem-metastore-example" >}}
{{< tab "Flink" >}}
The following Flink SQL registers and uses a Paimon catalog named `my_catalog`. Metadata and table files are stored under `hdfs:///path/to/warehouse`.
```sql
CREATE CATALOG my_catalog WITH (
'type' = 'paimon',
'warehouse' = 'hdfs:///path/to/warehouse'
);
USE CATALOG my_catalog;
```
You can define any default table options with the prefix `table-default.` for tables created in the catalog.
{{< /tab >}}
{{< tab "Spark3" >}}
The following shell command registers a paimon catalog named `paimon`. Metadata and table files are stored under `hdfs:///path/to/warehouse`.
```bash
spark-sql ... \
--conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon.warehouse=hdfs:///path/to/warehouse
```
You can define any default table options with the prefix `spark.sql.catalog.paimon.table-default.` for tables created in the catalog.
After `spark-sql` is started, you can switch to the `default` database of the `paimon` catalog with the following SQL.
```sql
USE paimon.default;
```
{{< /tab >}}
{{< /tabs >}}
## Creating a Catalog with Hive Metastore
By using Paimon Hive catalog, changes to the catalog will directly affect the corresponding Hive metastore. Tables created in such catalog can also be accessed directly from Hive.
To use Hive catalog, Database name, Table name and Field names should be **lower** case.
{{< tabs "hive-metastore-example" >}}
{{< tab "Flink" >}}
Paimon Hive catalog in Flink relies on Flink Hive connector bundled jar. You should first download Hive connector bundled jar and add it to classpath.
| Metastore version | Bundle Name | SQL Client JAR |
|:------------------|:--------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.3.0 - 3.1.3 | Flink Bundle | [Download](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/hive/overview/#using-bundled-hive-jar) |
| 1.2.0 - x.x.x | Presto Bundle | [Download](https://repo.maven.apache.org/maven2/com/facebook/presto/hive/hive-apache/1.2.2-2/hive-apache-1.2.2-2.jar) |
The following Flink SQL registers and uses a Paimon Hive catalog named `my_hive`. Metadata and table files are stored under `hdfs:///path/to/warehouse`. In addition, metadata is also stored in Hive metastore.
If your Hive requires security authentication such as Kerberos, LDAP, Ranger or you want the paimon table to be managed
by Apache Atlas(Setting 'hive.metastore.event.listeners' in hive-site.xml). You can specify the hive-conf-dir and
hadoop-conf-dir parameter to the hive-site.xml file path.
```sql
CREATE CATALOG my_hive WITH (
'type' = 'paimon',
'metastore' = 'hive',
-- 'uri' = 'thrift://<hive-metastore-host-name>:<port>', default use 'hive.metastore.uris' in HiveConf
-- 'hive-conf-dir' = '...', this is recommended in the kerberos environment
-- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment
-- 'warehouse' = 'hdfs:///path/to/warehouse', default use 'hive.metastore.warehouse.dir' in HiveConf
);
USE CATALOG my_hive;
```
You can define any default table options with the prefix `table-default.` for tables created in the catalog.
Also, you can create [FlinkGenericCatalog]({{< ref "engines/flink" >}}).
{{< /tab >}}
{{< tab "Spark3" >}}
Your Spark installation should be able to detect, or already contains Hive dependencies. See [here](https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html) for more information.
The following shell command registers a Paimon Hive catalog named `paimon`. Metadata and table files are stored under `hdfs:///path/to/warehouse`. In addition, metadata is also stored in Hive metastore.
```bash
spark-sql ... \
--conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon.warehouse=hdfs:///path/to/warehouse \
--conf spark.sql.catalog.paimon.metastore=hive \
--conf spark.sql.catalog.paimon.uri=thrift://<hive-metastore-host-name>:<port>
```
You can define any default table options with the prefix `spark.sql.catalog.paimon.table-default.` for tables created in the catalog.
After `spark-sql` is started, you can switch to the `default` database of the `paimon` catalog with the following SQL.
```sql
USE paimon.default;
```
Also, you can create [SparkGenericCatalog]({{< ref "engines/spark" >}}).
{{< /tab >}}
{{< /tabs >}}
> When using hive catalog to change incompatible column types through alter table, you need to configure `hive.metastore.disallow.incompatible.col.type.changes=false`. see [HIVE-17832](https://issues.apache.org/jira/browse/HIVE-17832).
> If you are using Hive3, please disable Hive ACID:
>
> ```shell
> hive.strict.managed.tables=false
> hive.create.as.insert.only=false
> metastore.create.as.acid=false
> ```
### Setting Location in Properties
If you are using an object storage , and you don't want that the location of paimon table/database is accessed by the filesystem of hive,
which may lead to the error such as "No FileSystem for scheme: s3a".
You can set location in the properties of table/database by the config of `location-in-properties`. See
[setting the location of table/database in properties ]({{< ref "maintenance/configurations#HiveCatalogOptions" >}})
### Synchronizing Partitions into Hive Metastore
By default, Paimon does not synchronize newly created partitions into Hive metastore. Users will see an unpartitioned table in Hive. Partition push-down will be carried out by filter push-down instead.
If you want to see a partitioned table in Hive and also synchronize newly created partitions into Hive metastore, please set the table property `metastore.partitioned-table` to true. Also see [CoreOptions]({{< ref "maintenance/configurations#CoreOptions" >}}).
### Adding Parameters to a Hive Table
Using the table option facilitates the convenient definition of Hive table parameters.
Parameters prefixed with `hive.` will be automatically defined in the `TBLPROPERTIES` of the Hive table.
For instance, using the option `hive.table.owner=Jon` will automatically add the parameter `table.owner=Jon` to the table properties during the creation process.
## Creating a Catalog with JDBC Metastore
By using the Paimon JDBC catalog, changes to the catalog will be directly stored in relational databases such as MySQL, postgres, etc.
Expand Down Expand Up @@ -353,8 +208,6 @@ You can define any connection parameters for a database with the prefix "jdbc.".
You can define any default table options with the prefix `table-default.` for tables created in the catalog.
Also, you can create [FlinkGenericCatalog]({{< ref "engines/flink" >}}).

{{< /tab >}}
{{< /tabs >}}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>catalog-name</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Custom jdbc catalog name.</td>
</tr>
<tr>
<td><h5>initialize-catalog-tables</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.paimon;

/** Define client connection pool. */
/** Source: [core/src/main/java/org/apache/iceberg/ClientPool.java]. */
public interface ClientPool<C, E extends Exception> {
/** Action interface for client. */
interface Action<R, C, E extends Exception> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

import static org.apache.paimon.utils.Preconditions.checkState;

/** A universal implementation for defining client connection pools. */
/** Source: [core/src/main/java/org/apache/iceberg/ClientPoolImpl.java]. */
public abstract class ClientPoolImpl<C, E extends Exception>
implements Closeable, ClientPool<C, E> {
private static final Logger LOG = LoggerFactory.getLogger(ClientPoolImpl.class);
Expand Down Expand Up @@ -59,7 +59,6 @@ public <R> R run(Action<R, C, E> action, boolean retry) throws E, InterruptedExc
C client = get();
try {
return action.run(client);

} catch (Exception exc) {
if (retry && isConnectionException(exc)) {
try {
Expand Down Expand Up @@ -104,13 +103,12 @@ public void close() {
}
}
if (clients.isEmpty() && currentSize > 0) {
// wake every second in case this missed the signal
synchronized (signal) {
// wake every second in case this missed the signal
signal.wait(1000);
}
}
}

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Interrupted while shutting down pool. Some clients may not be closed.", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.jdbc;

import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/** Jdbc distributed lock interface. */
public abstract class AbstractDistributedLockDialect implements JdbcDistributedLockDialect {

@Override
public void createTable(JdbcClientPool connections) throws SQLException, InterruptedException {
connections.run(
conn -> {
DatabaseMetaData dbMeta = conn.getMetaData();
ResultSet tableExists =
dbMeta.getTables(
null, null, JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME, null);
if (tableExists.next()) {
return true;
}
return conn.prepareStatement(getCreateTableSql()).execute();
});
}

public abstract String getCreateTableSql();

@Override
public boolean lockAcquire(JdbcClientPool connections, String lockId, long timeoutMillSeconds)
throws SQLException, InterruptedException {
return connections.run(
connection -> {
try (PreparedStatement preparedStatement =
connection.prepareStatement(getLockAcquireSql())) {
preparedStatement.setString(1, lockId);
preparedStatement.setLong(2, timeoutMillSeconds / 1000);
return preparedStatement.executeUpdate() > 0;
} catch (SQLException ex) {
return false;
}
});
}

public abstract String getLockAcquireSql();

@Override
public boolean releaseLock(JdbcClientPool connections, String lockId)
throws SQLException, InterruptedException {
return connections.run(
connection -> {
try (PreparedStatement preparedStatement =
connection.prepareStatement(getReleaseLockSql())) {
preparedStatement.setString(1, lockId);
return preparedStatement.executeUpdate() > 0;
}
});
}

public abstract String getReleaseLockSql();

@Override
public int tryReleaseTimedOutLock(JdbcClientPool connections, String lockId)
throws SQLException, InterruptedException {
return connections.run(
connection -> {
try (PreparedStatement preparedStatement =
connection.prepareStatement(getTryReleaseTimedOutLock())) {
preparedStatement.setString(1, lockId);
return preparedStatement.executeUpdate();
}
});
}

public abstract String getTryReleaseTimedOutLock();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.jdbc;

class DistributedLockDialectFactory {
static JdbcDistributedLockDialect create(String protocol) {
JdbcProtocol type = JdbcProtocol.valueOf(protocol.toUpperCase());
switch (type) {
case SQLITE:
return new SqlLiteDistributedLockDialect();
case MYSQL:
return new MysqlDistributedLockDialect();
default:
throw new UnsupportedOperationException(
String.format("Distributed locks based on %s are not supported", protocol));
}
}

/** Supported jdbc protocol. */
enum JdbcProtocol {
SQLITE,
// for mysql.
MARIADB,
MYSQL;
}
}
Loading

0 comments on commit 9d41884

Please sign in to comment.