Skip to content

Commit

Permalink
[eclipse-hono#3595] migrate to Vert.x SQL Client API
Browse files Browse the repository at this point in the history
  • Loading branch information
harism committed Apr 14, 2024
1 parent dc78911 commit 9e31c6a
Show file tree
Hide file tree
Showing 11 changed files with 255 additions and 256 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2020, 2023 Contributors to the Eclipse Foundation
* Copyright (c) 2020, 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand All @@ -16,7 +16,9 @@
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;

import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,7 +32,7 @@
import io.agroal.api.security.SimplePassword;
import io.agroal.pool.DataSource;
import io.vertx.core.Vertx;
import io.vertx.ext.jdbc.JDBCClient;
import io.vertx.jdbcclient.JDBCPool;

/**
* Configuration properties for a JDBC service.
Expand Down Expand Up @@ -175,14 +177,14 @@ public void setTableName(final String tableName) {
}

/**
* Creates a JDBC client for configuration properties.
* Creates a JDBC pool for configuration properties.
*
* @param vertx The vertx instance to use.
* @param dataSourceProperties The properties.
* @return The client.
* @return The JDBC pool.
* @throws IllegalArgumentException if any of the properties are invalid.
*/
public static JDBCClient dataSource(final Vertx vertx, final JdbcProperties dataSourceProperties) {
public static JDBCPool dataSource(final Vertx vertx, final JdbcProperties dataSourceProperties) {

log.info("Creating new SQL client for table: {}", dataSourceProperties.getTableName());

Expand Down Expand Up @@ -213,6 +215,12 @@ public static JDBCClient dataSource(final Vertx vertx, final JdbcProperties data
.principal(username)
.credential(password)));

return JDBCClient.create(vertx, new DataSource(configuration.get()));
return JDBCPool.pool(vertx, new DataSource(configuration.get()),
new JsonObject()
.put("jdbcUrl", dataSourceProperties.getUrl())
.put("username", dataSourceProperties.getUsername())
.put("database", "")
.put("datasourceName", UUID.randomUUID().toString())
.put("maxPoolSize", dataSourceProperties.getMaximumPoolSize()));
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2020, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2020, 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -31,9 +31,9 @@
import io.vertx.core.Future;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.ext.healthchecks.Status;
import io.vertx.ext.jdbc.JDBCClient;
import io.vertx.ext.sql.ResultSet;
import io.vertx.ext.sql.UpdateResult;
import io.vertx.jdbcclient.JDBCPool;

/**
* An abstract JDBC based data store.
Expand All @@ -45,7 +45,7 @@ public abstract class AbstractStore implements HealthCheckProvider, AutoCloseabl
*/
public static final String DEFAULT_CHECK_SQL = "SELECT 1";

private final JDBCClient client;
private final JDBCPool client;
private final Tracer tracer;

private final ExpandedStatement checkSql;
Expand All @@ -58,7 +58,7 @@ public abstract class AbstractStore implements HealthCheckProvider, AutoCloseabl
* @param checkSql An optional SQL statement, which will be used to check if the connection to the
* database is OK. It this value is empty, the default statement {@value #DEFAULT_CHECK_SQL} will be used.
*/
public AbstractStore(final JDBCClient client, final Tracer tracer, final Optional<Statement> checkSql) {
public AbstractStore(final JDBCPool client, final Tracer tracer, final Optional<Statement> checkSql) {
this.client = Objects.requireNonNull(client);
this.tracer = Objects.requireNonNull(tracer);
this.checkSql = checkSql.orElseGet(() -> Statement.statement(DEFAULT_CHECK_SQL)).expand();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2020, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2020, 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -37,9 +37,8 @@
import io.opentracing.log.Fields;
import io.opentracing.tag.Tags;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.ext.sql.SQLClient;
import io.vertx.ext.sql.SQLConnection;
import io.vertx.jdbcclient.JDBCPool;
import io.vertx.sqlclient.SqlConnection;

/**
* SQL helper methods.
Expand Down Expand Up @@ -84,54 +83,6 @@ public static <T> Future<T> translateException(final Throwable e) {
}
}

/**
* Enable auto-commit.
*
* @param tracer The tracer.
* @param context The span.
* @param connection The database connection to change.
* @param state The auto-commit state.
* @return A future for tracking the outcome.
*/
public static Future<SQLConnection> setAutoCommit(final Tracer tracer, final SpanContext context, final SQLConnection connection, final boolean state) {
final Span span = startSqlSpan(tracer, context, "set autocommit", builder -> {
builder.withTag("db.autocommit", state);
});
final Promise<Void> promise = Promise.promise();
connection.setAutoCommit(state, promise);
return finishSpan(promise.future().map(connection), span, null);
}

/**
* Perform commit operation.
*
* @param tracer The tracer.
* @param context The span.
* @param connection The database connection to work on.
* @return A future for tracking the outcome.
*/
public static Future<SQLConnection> commit(final Tracer tracer, final SpanContext context, final SQLConnection connection) {
final Span span = startSqlSpan(tracer, context, "commit", null);
final Promise<Void> promise = Promise.promise();
connection.commit(promise);
return finishSpan(promise.future().map(connection), span, null);
}

/**
* Perform rollback operation.
*
* @param tracer The tracer.
* @param context The span.
* @param connection The database connection to work on.
* @return A future for tracking the outcome.
*/
public static Future<SQLConnection> rollback(final Tracer tracer, final SpanContext context, final SQLConnection connection) {
final Span span = startSqlSpan(tracer, context, "rollback", null);
final Promise<Void> promise = Promise.promise();
connection.rollback(promise);
return finishSpan(promise.future().map(connection), span, null);
}

/**
* Start a new span for an SQL operation.
*
Expand Down Expand Up @@ -286,40 +237,22 @@ public static <T extends Throwable> boolean hasCauseOf(final Throwable e, final
* @param <T> The type of the result.
* @return A future, tracking the outcome of the operation.
*/
public static <T> Future<T> runTransactionally(final SQLClient client, final Tracer tracer, final SpanContext context, final BiFunction<SQLConnection, SpanContext, Future<T>> function) {
public static <T> Future<T> runTransactionally(final JDBCPool client, final Tracer tracer, final SpanContext context, final BiFunction<SqlConnection, SpanContext, Future<T>> function) {

final Span span = startSqlSpan(tracer, context, "run transactionally", builder -> {
});

final Promise<SQLConnection> promise = Promise.promise();
client.getConnection(promise);

return promise.future()

return client.withTransaction((connection) -> {
// log open connection
.onSuccess(x -> {
final Map<String, Object> log = new HashMap<>();
log.put(Fields.EVENT, "success");
log.put(Fields.MESSAGE, "connection opened");
span.log(log);
})

// disable autocommit, which is enabled by default
.flatMap(connection -> SQL.setAutoCommit(tracer, span.context(), connection, false)

// run code
.flatMap(y -> function.apply(connection, span.context())

// commit or rollback ... return original result
.compose(
v -> SQL.commit(tracer, span.context(), connection).map(v),
x -> SQL.rollback(tracer, span.context(), connection).flatMap(unused -> Future.failedFuture(x))))

// close the connection
.onComplete(x -> connection.close()))

.onComplete(x -> span.finish());

final Map<String, Object> log = new HashMap<>();
log.put(Fields.EVENT, "success");
log.put(Fields.MESSAGE, "connection opened");
span.log(log);

// execute function within a transaction
return function.apply(connection, span.context());
})
.onComplete(x -> span.finish());
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2020 Contributors to the Eclipse Foundation
* Copyright (c) 2020, 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -36,14 +36,16 @@
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonArray;
import io.vertx.ext.sql.ResultSet;
import io.vertx.ext.sql.SQLOperations;
import io.vertx.ext.sql.UpdateResult;
import io.vertx.jdbcclient.JDBCPool;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.SqlResult;
import io.vertx.sqlclient.Tuple;

/**
* An SQL statement, which can map named parameters to positional parameters.
Expand Down Expand Up @@ -232,10 +234,6 @@ public Object[] getParameters() {
return this.parameters;
}

public JsonArray getParametersAsJson() {
return new JsonArray(Arrays.asList(this.parameters));
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
Expand All @@ -255,17 +253,6 @@ public ExpandedStatement trace(final Tracer tracer, final SpanContext spanContex
return new ExpandedStatement(this.sql, this.parameters, tracer, spanContext);
}

@FunctionalInterface
private interface Operation<T> {
void run(String sql, JsonArray params, Handler<AsyncResult<T>> handler);
}

private <T> Future<T> run(final Operation<T> operation) {
final Promise<T> promise = Promise.promise();
operation.run(this.sql, getParametersAsJson(), promise);
return promise.future();
}

/**
* Start a new span for this SQL statement.
* @return The newly created span.
Expand All @@ -280,30 +267,85 @@ public Span startSqlSpan() {
});
}

/**
* Execute this statement as a query.
* @param pool The connection pool to work on.
* @return A future tracking the query result.
*/
public Future<ResultSet> query(final JDBCPool pool) {
final Span sqlSpan = startSqlSpan();
return SQL.finishSpan(pool
.preparedQuery(this.sql)
.execute(Tuple.from(getParameters()))
.map(this::convertRowSetToResultSet), sqlSpan, (r, log) -> {
log.put("rows", r.getNumRows());
});
}

/**
* Execute this statement as a query.
* @param connection The connection to work on.
* @return A future tracking the query result.
*/
public Future<ResultSet> query(final SQLOperations connection) {
public Future<ResultSet> query(final SqlConnection connection) {
final Span sqlSpan = startSqlSpan();
return SQL.finishSpan(run(connection::queryWithParams), sqlSpan, (r, log) -> {
return SQL.finishSpan(connection
.preparedQuery(this.sql)
.execute(Tuple.from(getParameters()))
.map(this::convertRowSetToResultSet), sqlSpan, (r, log) -> {
log.put("rows", r.getNumRows());
});
}

/**
* Execute this statement as a update.
* @param pool The connection pool to work on.
* @return A future tracking the update result.
*/
public Future<UpdateResult> update(final JDBCPool pool) {
final Span sqlSpan = startSqlSpan();
return SQL.finishSpan(pool
.preparedQuery(this.sql)
.execute(Tuple.from(getParameters()))
.map(this::convertRowSetToUpdateResult), sqlSpan, (r, log) -> {
log.put("rows", r);
});
}

/**
* Execute this statement as a update.
* @param connection The connection to work on.
* @return A future tracking the update result.
*/
public Future<UpdateResult> update(final SQLOperations connection) {
public Future<UpdateResult> update(final SqlConnection connection) {
final Span sqlSpan = startSqlSpan();
return SQL.finishSpan(run(connection::updateWithParams), sqlSpan, (r, log) -> {
log.put("rows", r.getUpdated());
return SQL.finishSpan(connection
.preparedQuery(this.sql)
.execute(Tuple.from(getParameters()))
.map(this::convertRowSetToUpdateResult), sqlSpan, (r, log) -> {
log.put("rows", r);
});
}

private ResultSet convertRowSetToResultSet(final RowSet<Row> rows) {
final List<JsonArray> results = new ArrayList<>();
rows.forEach(row -> {
final JsonArray values = new JsonArray();
for (int index = 0; index < row.size(); ++index) {
values.add(row.getValue(index));
}
results.add(values);
});
return new ResultSet()
.setColumnNames(rows.columnsNames())
.setResults(results);
}

private UpdateResult convertRowSetToUpdateResult(final SqlResult<RowSet<Row>> sqlResult) {
return new UpdateResult()
.setUpdated(sqlResult.rowCount());
}

}

}
Loading

0 comments on commit 9e31c6a

Please sign in to comment.