Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#3595] migrate to Vert.x SQL Client API #3626

Merged
merged 1 commit into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2020, 2023 Contributors to the Eclipse Foundation
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -30,7 +30,8 @@
import io.agroal.api.security.SimplePassword;
import io.agroal.pool.DataSource;
import io.vertx.core.Vertx;
import io.vertx.ext.jdbc.JDBCClient;
import io.vertx.core.json.JsonObject;
import io.vertx.jdbcclient.JDBCPool;

/**
* Configuration properties for a JDBC service.
Expand Down Expand Up @@ -175,14 +176,14 @@ public void setTableName(final String tableName) {
}

/**
* Creates a JDBC client for configuration properties.
* Creates a JDBC pool for configuration properties.
*
* @param vertx The vertx instance to use.
* @param dataSourceProperties The properties.
* @return The client.
* @return The JDBC pool.
* @throws IllegalArgumentException if any of the properties are invalid.
*/
public static JDBCClient dataSource(final Vertx vertx, final JdbcProperties dataSourceProperties) {
public static JDBCPool dataSource(final Vertx vertx, final JdbcProperties dataSourceProperties) {

log.info("Creating new SQL client for table: {}", dataSourceProperties.getTableName());

Expand Down Expand Up @@ -213,6 +214,11 @@ public static JDBCClient dataSource(final Vertx vertx, final JdbcProperties data
.principal(username)
.credential(password)));

return JDBCClient.create(vertx, new DataSource(configuration.get()));
return JDBCPool.pool(vertx, new DataSource(configuration.get()),
new JsonObject()
mattkaem marked this conversation as resolved.
Show resolved Hide resolved
.put("jdbcUrl", dataSourceProperties.getUrl())
.put("username", Optional.ofNullable(dataSourceProperties.getUsername()).orElse(""))
.put("database", "")
.put("maxPoolSize", dataSourceProperties.getMaximumPoolSize()));
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2020, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -31,9 +31,9 @@
import io.vertx.core.Future;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.ext.healthchecks.Status;
import io.vertx.ext.jdbc.JDBCClient;
import io.vertx.ext.sql.ResultSet;
import io.vertx.ext.sql.UpdateResult;
import io.vertx.jdbcclient.JDBCPool;

/**
* An abstract JDBC based data store.
Expand All @@ -45,7 +45,7 @@ public abstract class AbstractStore implements HealthCheckProvider, AutoCloseabl
*/
public static final String DEFAULT_CHECK_SQL = "SELECT 1";

private final JDBCClient client;
private final JDBCPool client;
private final Tracer tracer;

private final ExpandedStatement checkSql;
Expand All @@ -58,7 +58,7 @@ public abstract class AbstractStore implements HealthCheckProvider, AutoCloseabl
* @param checkSql An optional SQL statement, which will be used to check if the connection to the
* database is OK. It this value is empty, the default statement {@value #DEFAULT_CHECK_SQL} will be used.
*/
public AbstractStore(final JDBCClient client, final Tracer tracer, final Optional<Statement> checkSql) {
public AbstractStore(final JDBCPool client, final Tracer tracer, final Optional<Statement> checkSql) {
this.client = Objects.requireNonNull(client);
this.tracer = Objects.requireNonNull(tracer);
this.checkSql = checkSql.orElseGet(() -> Statement.statement(DEFAULT_CHECK_SQL)).expand();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2020, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -37,9 +37,8 @@
import io.opentracing.log.Fields;
import io.opentracing.tag.Tags;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.ext.sql.SQLClient;
import io.vertx.ext.sql.SQLConnection;
import io.vertx.jdbcclient.JDBCPool;
import io.vertx.sqlclient.SqlConnection;

/**
* SQL helper methods.
Expand Down Expand Up @@ -84,54 +83,6 @@ public static <T> Future<T> translateException(final Throwable e) {
}
}

/**
* Enable auto-commit.
*
* @param tracer The tracer.
* @param context The span.
* @param connection The database connection to change.
* @param state The auto-commit state.
* @return A future for tracking the outcome.
*/
public static Future<SQLConnection> setAutoCommit(final Tracer tracer, final SpanContext context, final SQLConnection connection, final boolean state) {
final Span span = startSqlSpan(tracer, context, "set autocommit", builder -> {
builder.withTag("db.autocommit", state);
});
final Promise<Void> promise = Promise.promise();
connection.setAutoCommit(state, promise);
return finishSpan(promise.future().map(connection), span, null);
}

/**
* Perform commit operation.
*
* @param tracer The tracer.
* @param context The span.
* @param connection The database connection to work on.
* @return A future for tracking the outcome.
*/
public static Future<SQLConnection> commit(final Tracer tracer, final SpanContext context, final SQLConnection connection) {
final Span span = startSqlSpan(tracer, context, "commit", null);
final Promise<Void> promise = Promise.promise();
connection.commit(promise);
return finishSpan(promise.future().map(connection), span, null);
}

/**
* Perform rollback operation.
*
* @param tracer The tracer.
* @param context The span.
* @param connection The database connection to work on.
* @return A future for tracking the outcome.
*/
public static Future<SQLConnection> rollback(final Tracer tracer, final SpanContext context, final SQLConnection connection) {
final Span span = startSqlSpan(tracer, context, "rollback", null);
final Promise<Void> promise = Promise.promise();
connection.rollback(promise);
return finishSpan(promise.future().map(connection), span, null);
}

/**
* Start a new span for an SQL operation.
*
Expand Down Expand Up @@ -286,40 +237,22 @@ public static <T extends Throwable> boolean hasCauseOf(final Throwable e, final
* @param <T> The type of the result.
* @return A future, tracking the outcome of the operation.
*/
public static <T> Future<T> runTransactionally(final SQLClient client, final Tracer tracer, final SpanContext context, final BiFunction<SQLConnection, SpanContext, Future<T>> function) {
public static <T> Future<T> runTransactionally(final JDBCPool client, final Tracer tracer, final SpanContext context, final BiFunction<SqlConnection, SpanContext, Future<T>> function) {

final Span span = startSqlSpan(tracer, context, "run transactionally", builder -> {
});

final Promise<SQLConnection> promise = Promise.promise();
client.getConnection(promise);

return promise.future()

return client.withTransaction((connection) -> {
// log open connection
.onSuccess(x -> {
final Map<String, Object> log = new HashMap<>();
log.put(Fields.EVENT, "success");
log.put(Fields.MESSAGE, "connection opened");
span.log(log);
})

// disable autocommit, which is enabled by default
.flatMap(connection -> SQL.setAutoCommit(tracer, span.context(), connection, false)

// run code
.flatMap(y -> function.apply(connection, span.context())

// commit or rollback ... return original result
.compose(
v -> SQL.commit(tracer, span.context(), connection).map(v),
x -> SQL.rollback(tracer, span.context(), connection).flatMap(unused -> Future.failedFuture(x))))

// close the connection
.onComplete(x -> connection.close()))

.onComplete(x -> span.finish());

final Map<String, Object> log = new HashMap<>();
log.put(Fields.EVENT, "success");
log.put(Fields.MESSAGE, "connection opened");
span.log(log);

// execute function within a transaction
return function.apply(connection, span.context());
})
.onComplete(x -> span.finish());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,16 @@
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonArray;
import io.vertx.ext.sql.ResultSet;
import io.vertx.ext.sql.SQLOperations;
import io.vertx.ext.sql.UpdateResult;
import io.vertx.jdbcclient.JDBCPool;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.SqlResult;
import io.vertx.sqlclient.Tuple;

/**
* An SQL statement, which can map named parameters to positional parameters.
Expand Down Expand Up @@ -232,10 +234,6 @@ public Object[] getParameters() {
return this.parameters;
}

public JsonArray getParametersAsJson() {
return new JsonArray(Arrays.asList(this.parameters));
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
Expand All @@ -255,17 +253,6 @@ public ExpandedStatement trace(final Tracer tracer, final SpanContext spanContex
return new ExpandedStatement(this.sql, this.parameters, tracer, spanContext);
}

@FunctionalInterface
private interface Operation<T> {
void run(String sql, JsonArray params, Handler<AsyncResult<T>> handler);
}

private <T> Future<T> run(final Operation<T> operation) {
final Promise<T> promise = Promise.promise();
operation.run(this.sql, getParametersAsJson(), promise);
return promise.future();
}

/**
* Start a new span for this SQL statement.
* @return The newly created span.
Expand All @@ -280,30 +267,85 @@ public Span startSqlSpan() {
});
}

/**
* Execute this statement as a query.
* @param pool The connection pool to work on.
* @return A future tracking the query result.
*/
public Future<ResultSet> query(final JDBCPool pool) {
final Span sqlSpan = startSqlSpan();
return SQL.finishSpan(pool
.preparedQuery(this.sql)
.execute(Tuple.from(getParameters()))
.map(this::convertRowSetToResultSet), sqlSpan, (r, log) -> {
log.put("rows", r.getNumRows());
});
}

/**
* Execute this statement as a query.
* @param connection The connection to work on.
* @return A future tracking the query result.
*/
public Future<ResultSet> query(final SQLOperations connection) {
public Future<ResultSet> query(final SqlConnection connection) {
final Span sqlSpan = startSqlSpan();
return SQL.finishSpan(run(connection::queryWithParams), sqlSpan, (r, log) -> {
return SQL.finishSpan(connection
.preparedQuery(this.sql)
.execute(Tuple.from(getParameters()))
.map(this::convertRowSetToResultSet), sqlSpan, (r, log) -> {
log.put("rows", r.getNumRows());
});
}

/**
* Execute this statement as a update.
* @param pool The connection pool to work on.
* @return A future tracking the update result.
*/
public Future<UpdateResult> update(final JDBCPool pool) {
final Span sqlSpan = startSqlSpan();
return SQL.finishSpan(pool
.preparedQuery(this.sql)
.execute(Tuple.from(getParameters()))
.map(this::convertRowSetToUpdateResult), sqlSpan, (r, log) -> {
log.put("rows", r);
});
}

/**
* Execute this statement as a update.
* @param connection The connection to work on.
* @return A future tracking the update result.
*/
public Future<UpdateResult> update(final SQLOperations connection) {
public Future<UpdateResult> update(final SqlConnection connection) {
final Span sqlSpan = startSqlSpan();
return SQL.finishSpan(run(connection::updateWithParams), sqlSpan, (r, log) -> {
log.put("rows", r.getUpdated());
return SQL.finishSpan(connection
.preparedQuery(this.sql)
.execute(Tuple.from(getParameters()))
.map(this::convertRowSetToUpdateResult), sqlSpan, (r, log) -> {
log.put("rows", r);
});
}

private ResultSet convertRowSetToResultSet(final RowSet<Row> rows) {
final List<JsonArray> results = new ArrayList<>();
rows.forEach(row -> {
final JsonArray values = new JsonArray();
for (int index = 0; index < row.size(); ++index) {
values.add(row.getValue(index));
}
results.add(values);
});
return new ResultSet()
.setColumnNames(rows.columnsNames())
.setResults(results);
}

private UpdateResult convertRowSetToUpdateResult(final SqlResult<RowSet<Row>> sqlResult) {
return new UpdateResult()
.setUpdated(sqlResult.rowCount());
}

}

}
Loading
Loading