Skip to content

Commit

Permalink
fixed doc
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Mar 4, 2024
1 parent 3df2995 commit 8c2e10d
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 118 deletions.
7 changes: 2 additions & 5 deletions docs/content/how-to/creating-catalogs.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,18 +198,15 @@ CREATE CATALOG my_jdbc WITH (
'uri' = 'jdbc:mysql://<host>:<port>/<databaseName>',
'jdbc.user' = '...',
'jdbc.password' = '...',
'catalog-name'='jdbc'
'initialize-catalog-tables'='true'
'store-key'='jdbc',
'warehouse' = 'hdfs:///path/to/warehouse'
);
USE CATALOG my_jdbc;
```
You can define any connection parameters for a database with the prefix "jdbc.".
You can define the "initialize-catalog-tables" configuration to automatically create tables required for the initial JdbcCatalog. If it is true, tables will be automatically created when initializing JdbcCatalog. If it is false, tables will not be automatically created and you must manually create them.
You can also customize the name of the created jdbc catalog through 'catalog-name'.
You can also perform logical isolation for databases under multiple catalogs by specifying "store-key".
You can define any default table options with the prefix `table-default.` for tables created in the catalog.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,10 @@
</thead>
<tbody>
<tr>
<td><h5>catalog-name</h5></td>
<td><h5>store-key</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Custom jdbc catalog name.</td>
</tr>
<tr>
<td><h5>initialize-catalog-tables</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Enable automatic table creation.</td>
<td>Custom jdbc catalog store key.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.paimon;
package org.apache.paimon.client;

/** Source: [core/src/main/java/org/apache/iceberg/ClientPool.java]. */
public interface ClientPool<C, E extends Exception> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.paimon;
package org.apache.paimon.client;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
103 changes: 48 additions & 55 deletions paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@

import static org.apache.paimon.jdbc.JdbcCatalogLock.acquireTimeout;
import static org.apache.paimon.jdbc.JdbcCatalogLock.checkMaxSleep;
import static org.apache.paimon.jdbc.JdbcCatalogOptions.INITIALIZE_CATALOG_TABLES;
import static org.apache.paimon.jdbc.JdbcUtils.execute;
import static org.apache.paimon.jdbc.JdbcUtils.insertProperties;
import static org.apache.paimon.jdbc.JdbcUtils.updateTable;
Expand All @@ -66,15 +65,15 @@ public class JdbcCatalog extends AbstractCatalog {
public static final String PROPERTY_PREFIX = "jdbc.";
private static final String DATABASE_EXISTS_PROPERTY = "exists";
private JdbcClientPool connections;
private String catalogName = "jdbc";
private String storeKey = "jdbc";
private Map<String, String> configuration;
private final String warehouse;

protected JdbcCatalog(
FileIO fileIO, String catalogName, Map<String, String> config, String warehouse) {
FileIO fileIO, String storeKey, Map<String, String> config, String warehouse) {
super(fileIO);
if (!StringUtils.isBlank(catalogName)) {
this.catalogName = catalogName;
if (!StringUtils.isBlank(storeKey)) {
this.storeKey = storeKey;
}
this.configuration = config;
this.warehouse = warehouse;
Expand Down Expand Up @@ -103,42 +102,37 @@ public JdbcClientPool getConnections() {

/** Initialize catalog tables. */
private void initializeCatalogTablesIfNeed() throws SQLException, InterruptedException {
boolean initializeCatalogTables =
Boolean.parseBoolean(
configuration.getOrDefault(INITIALIZE_CATALOG_TABLES.key(), "false"));
String uri = configuration.get(CatalogOptions.URI.key());
Preconditions.checkNotNull(uri, "JDBC connection URI is required");
if (initializeCatalogTables) {
// Check and create catalog table.
connections.run(
conn -> {
DatabaseMetaData dbMeta = conn.getMetaData();
ResultSet tableExists =
dbMeta.getTables(null, null, JdbcUtils.CATALOG_TABLE_NAME, null);
if (tableExists.next()) {
return true;
}
return conn.prepareStatement(JdbcUtils.CREATE_CATALOG_TABLE).execute();
});

// Check and create database properties table.
connections.run(
conn -> {
DatabaseMetaData dbMeta = conn.getMetaData();
ResultSet tableExists =
dbMeta.getTables(
null, null, JdbcUtils.DATABASE_PROPERTIES_TABLE_NAME, null);
if (tableExists.next()) {
return true;
}
return conn.prepareStatement(JdbcUtils.CREATE_DATABASE_PROPERTIES_TABLE)
.execute();
});

// if lock enabled, Check and create distributed lock table.
if (lockEnabled()) {
JdbcUtils.createDistributedLockTable(connections);
}
// Check and create catalog table.
connections.run(
conn -> {
DatabaseMetaData dbMeta = conn.getMetaData();
ResultSet tableExists =
dbMeta.getTables(null, null, JdbcUtils.CATALOG_TABLE_NAME, null);
if (tableExists.next()) {
return true;
}
return conn.prepareStatement(JdbcUtils.CREATE_CATALOG_TABLE).execute();
});

// Check and create database properties table.
connections.run(
conn -> {
DatabaseMetaData dbMeta = conn.getMetaData();
ResultSet tableExists =
dbMeta.getTables(
null, null, JdbcUtils.DATABASE_PROPERTIES_TABLE_NAME, null);
if (tableExists.next()) {
return true;
}
return conn.prepareStatement(JdbcUtils.CREATE_DATABASE_PROPERTIES_TABLE)
.execute();
});

// if lock enabled, Check and create distributed lock table.
if (lockEnabled()) {
JdbcUtils.createDistributedLockTable(connections);
}
}

Expand All @@ -154,19 +148,19 @@ public List<String> listDatabases() {
fetch(
row -> row.getString(JdbcUtils.TABLE_DATABASE),
JdbcUtils.LIST_ALL_TABLE_DATABASES_SQL,
catalogName));
storeKey));

databases.addAll(
fetch(
row -> row.getString(JdbcUtils.DATABASE_NAME),
JdbcUtils.LIST_ALL_PROPERTY_DATABASES_SQL,
catalogName));
storeKey));
return databases;
}

@Override
protected boolean databaseExistsImpl(String databaseName) {
return JdbcUtils.databaseExists(connections, catalogName, databaseName);
return JdbcUtils.databaseExists(connections, storeKey, databaseName);
}

@Override
Expand Down Expand Up @@ -199,15 +193,15 @@ protected void createDatabaseImpl(String name, Map<String, String> properties) {
Path databasePath = newDatabasePath(name);
createProps.put(DB_LOCATION_PROP, databasePath.toString());
}
insertProperties(connections, catalogName, name, createProps);
insertProperties(connections, storeKey, name, createProps);
}

@Override
protected void dropDatabaseImpl(String name) {
// Delete table from paimon_tables
execute(connections, JdbcUtils.DELETE_TABLES_SQL, catalogName, name);
execute(connections, JdbcUtils.DELETE_TABLES_SQL, storeKey, name);
// Delete properties from paimon_database_properties
execute(connections, JdbcUtils.DELETE_ALL_DATABASE_PROPERTIES_SQL, catalogName, name);
execute(connections, JdbcUtils.DELETE_ALL_DATABASE_PROPERTIES_SQL, storeKey, name);
}

@Override
Expand All @@ -218,7 +212,7 @@ protected List<String> listTablesImpl(String databaseName) {
return fetch(
row -> row.getString(JdbcUtils.TABLE_NAME),
JdbcUtils.LIST_TABLES_SQL,
catalogName,
storeKey,
databaseName);
}

Expand All @@ -229,7 +223,7 @@ protected void dropTableImpl(Identifier identifier) {
execute(
connections,
JdbcUtils.DROP_TABLE_SQL,
catalogName,
storeKey,
identifier.getDatabaseName(),
identifier.getObjectName());

Expand Down Expand Up @@ -263,7 +257,7 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
try (PreparedStatement sql =
conn.prepareStatement(
JdbcUtils.DO_COMMIT_CREATE_TABLE_SQL)) {
sql.setString(1, catalogName);
sql.setString(1, storeKey);
sql.setString(2, identifier.getDatabaseName());
sql.setString(3, identifier.getObjectName());
return sql.executeUpdate();
Expand All @@ -280,7 +274,7 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
throw new RuntimeException(
String.format(
"Failed to create table %s in catalog %s",
identifier.getFullName(), catalogName));
identifier.getFullName(), storeKey));
}
} catch (Exception e) {
throw new RuntimeException("Failed to create table " + identifier.getFullName(), e);
Expand All @@ -291,7 +285,7 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
try {
// update table metadata info
updateTable(connections, catalogName, fromTable, toTable);
updateTable(connections, storeKey, fromTable, toTable);

Path fromPath = getDataTableLocation(fromTable);
if (new SchemaManager(fileIO, fromPath).listAllIds().size() > 0) {
Expand Down Expand Up @@ -342,7 +336,7 @@ public boolean tableExists(Identifier identifier) {
return super.tableExists(identifier);
}
return JdbcUtils.tableExists(
connections, catalogName, identifier.getDatabaseName(), identifier.getObjectName());
connections, storeKey, identifier.getDatabaseName(), identifier.getObjectName());
}

@Override
Expand All @@ -353,8 +347,7 @@ public boolean caseSensitive() {
@Override
public Optional<CatalogLock.LockFactory> lockFactory() {
return lockEnabled()
? Optional.of(
JdbcCatalogLock.createFactory(connections, catalogName, configuration))
? Optional.of(JdbcCatalogLock.createFactory(connections, storeKey, configuration))
: Optional.empty();
}

Expand All @@ -371,7 +364,7 @@ private Lock lock(Identifier identifier) {
JdbcCatalogLock lock =
new JdbcCatalogLock(
connections,
catalogName,
storeKey,
checkMaxSleep(configuration),
acquireTimeout(configuration));
return Lock.fromCatalog(lock, identifier);
Expand Down Expand Up @@ -400,7 +393,7 @@ private Map<String, String> fetchProperties(String databaseName) {
row.getString(JdbcUtils.DATABASE_PROPERTY_KEY),
row.getString(JdbcUtils.DATABASE_PROPERTY_VALUE)),
JdbcUtils.GET_ALL_DATABASE_PROPERTIES_SQL,
catalogName,
storeKey,
databaseName);
return ImmutableMap.<String, String>builder().putAll(entries).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public String identifier() {

@Override
public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) {
String catalogName = context.options().get(JdbcCatalogOptions.CATALOG_NAME);
return new JdbcCatalog(fileIO, catalogName, context.options().toMap(), warehouse.getName());
String storeKey = context.options().get(JdbcCatalogOptions.STORE_KEY);
return new JdbcCatalog(fileIO, storeKey, context.options().toMap(), warehouse.getName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,12 @@

/** Options for jdbc catalog. */
public final class JdbcCatalogOptions {
public static final ConfigOption<Boolean> INITIALIZE_CATALOG_TABLES =
ConfigOptions.key("initialize-catalog-tables")
.booleanType()
.defaultValue(false)
.withDescription("Enable automatic table creation.");

public static final ConfigOption<String> CATALOG_NAME =
ConfigOptions.key("catalog-name")
public static final ConfigOption<String> STORE_KEY =
ConfigOptions.key("store-key")
.stringType()
.defaultValue(null)
.withDescription("Custom jdbc catalog name.");
.withDescription("Custom jdbc catalog store key.");

private JdbcCatalogOptions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.paimon.jdbc;

import org.apache.paimon.ClientPoolImpl;
import org.apache.paimon.client.ClientPoolImpl;

import java.sql.Connection;
import java.sql.DriverManager;
Expand Down
Loading

0 comments on commit 8c2e10d

Please sign in to comment.