Skip to content

Commit

Permalink
Add ability to set dataset project id
Browse files Browse the repository at this point in the history
This CL adds the ability to specify a dataset project id as part of
the dataset field in the connection string used to create a
BQConnection instance. Previously, we used the project defined in the
URL's path component for both the billing and default dataset
projects. Recall that the default project and dataset are used to
disambiguate unqualified table names referenced in a query being
processed by a connection.

Now, we parse the potentially fully qualified dataset to check for a
project id. If no project id is found, we revert back to our former
behavior, using the billing project as the default dataset project.
  • Loading branch information
mmotoyama committed Nov 27, 2024
1 parent 3518182 commit 0762d72
Show file tree
Hide file tree
Showing 8 changed files with 680 additions and 25 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ For instance for a _service account_ in a properties file:

```ini
projectid=disco-parsec-659
dataset=publicdata.samples
type=service
user[email protected]
password=bigquery_credentials.p12
Expand Down
90 changes: 85 additions & 5 deletions src/main/java/net/starschema/clouddb/jdbc/BQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.api.client.http.HttpTransport;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.DatasetReference;
import com.google.common.base.Splitter;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
Expand All @@ -42,6 +43,13 @@
* @author Gunics Balázs, Horváth Attila
*/
public class BQConnection implements Connection {

// We permit using either a "." or ":" as the delimiter between the dataset and project ids.
private static final String PROJECT_DELIMITERS = ":.";
// The following regex uses a lookahead to match the last occurrence of a project delimiter.
private static final String LAST_PROJECT_DELIMITER_REGEX =
"[" + PROJECT_DELIMITERS + "](?=[^" + PROJECT_DELIMITERS + "]*$)";

/** Variable to store auto commit mode */
private boolean autoCommitEnabled = false;

Expand All @@ -50,10 +58,19 @@ public class BQConnection implements Connection {
/** The bigquery client to access the service. */
private Bigquery bigquery = null;

/** The default dataset id to configure on queries processed by this connection. */
private String dataset = null;

/** The ProjectId for the connection */
private String projectId = null;
/**
* The default dataset project id to configure on queries processed by this connection.
*
* <p>We follow the same naming convention as the corresponding system variable <a
* href="https://cloud.google.com/bigquery/docs/reference/system-variables">@@dataset_project_id</a>.
*/
private String datasetProjectId = null;

/** The ProjectId to use for billing on queries processed by this connection. */
private final String projectId;

/** Boolean to determine if the Connection is closed */
private boolean isclosed = false;
Expand Down Expand Up @@ -152,7 +169,7 @@ public BQConnection(String url, Properties loginProp, HttpTransport httpTranspor

if (matchData.find()) {
this.projectId = CatalogName.toProjectId(matchData.group(1));
this.dataset = matchData.group(2);
configureDataSet(matchData.group(2));
} else {
this.projectId = CatalogName.toProjectId(pathParams);
}
Expand Down Expand Up @@ -415,6 +432,19 @@ public void close() throws SQLException {
}
}

/**
* Parses the input dataset expression and sets the values for the dataset and datasetProjectId
* instance variables.
*/
private void configureDataSet(String datasetExpr) {
DatasetReference datasetRef = parseDatasetRef(datasetExpr, this.projectId);
this.dataset = datasetRef.getDatasetId();
this.datasetProjectId = datasetRef.getProjectId();
}

/**
* Returns the default dataset that should be configured on queries processed by this connection.
*/
public String getDataSet() {
return this.dataset;
}
Expand Down Expand Up @@ -577,7 +607,7 @@ public Struct createStruct(String typeName, Object[] attributes) throws SQLExcep

@Override
public void setSchema(String schema) {
this.dataset = schema;
configureDataSet(schema);
}

@Override
Expand Down Expand Up @@ -694,11 +724,19 @@ public DatabaseMetaData getMetaData() throws SQLException {
return metadata;
}

/** Getter method for projectId */
/** Getter method for the projectId to use for billing. */
public String getProjectId() {
return projectId;
}

/**
* Returns the default dataset project id that should be configured on queries processed by this
* connection.
*/
public String getDataSetProjectId() {
return this.datasetProjectId;
}

/**
*
*
Expand Down Expand Up @@ -1258,4 +1296,46 @@ public Integer getTimeoutMs() {
public JobCreationMode getJobCreationMode() {
return jobCreationMode;
}

/**
* Returns a DatasetReference extracted from the input dataset expression, which may optionally
* include a project id reference.
*
* <p>This method parses the dataset expression into discrete components, using either a dot ('.')
* or colon (':') as the delimiter between the dataset and project identifiers. We split the
* string on the last occurrence of either delimiter, and we use the length of the split array to
* determine whether the input contains a project id reference. If the {@code datasetExpr} is
* null, we return a DatasetReference with its project id set to the {@code defaultProjectId}
* argument and its dataset id set to null.
*
* <p>We don't perform any validation on the result; while there are well-defined rules regarding
* <a href="https://cloud.google.com/resource-manager/docs/creating-managing-projects">project
* ids</a> and <a href="https://cloud.google.com/bigquery/docs/datasets#dataset-naming">dataset
* names</a>, we must handle references to both that don't adhere to the documented requirements
* (such as having a domain name with a colon as part of the project id). Rather than deal with
* the various corner cases for each type of identifier, we defer to the BigQuery API to validate
* the default dataset configured on queries.
*
* <p>Visible for testing.
*
* @param datasetExpr Dataset expression, generally taken from the BQJDBC connection string. Can
* be null.
* @param defaultProjectId Project id to set on the returned DatasetReference if no project id is
* found in {@code datasetExpr}.
* @return DatasetReference
*/
static DatasetReference parseDatasetRef(String datasetExpr, String defaultProjectId) {
if (datasetExpr == null) {
return new DatasetReference().setDatasetId(null).setProjectId(defaultProjectId);
}
// We split datasetExpr on the last occurrence of a project delimiter. To account for each
// delimiter appearance in the expression, we pass -1 as the limit value to disable discarding
// trailing empty strings.
String[] datasetComponents = datasetExpr.split(LAST_PROJECT_DELIMITER_REGEX, -1);
boolean isDatasetIdOnly = datasetComponents.length == 1;
String datasetId = isDatasetIdOnly ? datasetComponents[0] : datasetComponents[1];
String datasetProjectId =
isDatasetIdOnly ? defaultProjectId : CatalogName.toProjectId(datasetComponents[0]);
return new DatasetReference().setDatasetId(datasetId).setProjectId(datasetProjectId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ public ResultSet executeQuery() throws SQLException {
this.projectId,
this.RunnableStatement,
this.connection.getDataSet(),
this.connection.getDataSetProjectId(),
this.connection.getUseLegacySql(),
this.connection.getMaxBillingBytes());
this.logger.info("Executing Query: " + this.RunnableStatement);
Expand Down
1 change: 1 addition & 0 deletions src/main/java/net/starschema/clouddb/jdbc/BQStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ protected QueryResponse runSyncQuery(String querySql, boolean unlimitedBillingBy
projectId,
querySql,
connection.getDataSet(),
connection.getDataSetProjectId(),
this.connection.getUseLegacySql(),
!unlimitedBillingBytes ? this.connection.getMaxBillingBytes() : null,
getSyncTimeoutMillis(), // we need this to respond fast enough to avoid any
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ private int executeDML(String sql) throws SQLException {
projectId,
sql,
connection.getDataSet(),
connection.getDataSetProjectId(),
this.connection.getUseLegacySql(),
this.connection.getMaxBillingBytes(),
(long) querytimeout * 1000,
Expand Down Expand Up @@ -323,6 +324,7 @@ public ResultSet executeQuery(String querySql, boolean unlimitedBillingBytes)
projectId,
querySql,
connection.getDataSet(),
connection.getDataSetProjectId(),
this.connection.getUseLegacySql(),
billingBytes,
(long) querytimeout * 1000,
Expand Down
53 changes: 47 additions & 6 deletions src/main/java/net/starschema/clouddb/jdbc/BQSupportFuncts.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,13 @@ public class BQSupportFuncts {
*/
public static String constructUrlFromPropertiesFile(
Properties properties, boolean full, String dataset) throws UnsupportedEncodingException {
String projectId = properties.getProperty("projectid");
String projectId = properties.getProperty("projectid"); // Represents the billing project.
logger.debug("projectId is: " + projectId);
String User = properties.getProperty("user");
String Password = properties.getProperty("password");
String path = properties.getProperty("path");
// The dataset property value can optionally include a reference to a project id, which will be
// used in conjunction with the default dataset to handle unqualified table references.
dataset = dataset == null ? properties.getProperty("dataset") : dataset;

String forreturn = "";
Expand Down Expand Up @@ -621,10 +623,12 @@ public static Properties readFromPropFile(String filePath) throws IOException {
* Run a query using the synchronous jobs.query() BigQuery endpoint.
*
* @param bigquery The BigQuery API wrapper
* @param projectId
* @param projectId The ProjectId to use for billing
* @param querySql The SQL to execute
* @param dataSet default dataset, can be null
* @param useLegacySql
* @param dataSetProjectId default dataset project id, only specified when the default dataset is
* non-null
* @param useLegacySql Use the legacy SQL dialect when true
* @param maxBillingBytes Maximum bytes that the API will allow to bill
* @param queryTimeoutMs The timeout at which point the API will return with an incomplete result
* NOTE: this does _not_ mean the query fails, just we have to get the results async
Expand All @@ -640,6 +644,7 @@ static QueryResponse runSyncQuery(
String projectId,
String querySql,
String dataSet,
String dataSetProjectId,
Boolean useLegacySql,
Long maxBillingBytes,
Long queryTimeoutMs,
Expand All @@ -653,6 +658,7 @@ static QueryResponse runSyncQuery(
projectId,
querySql,
dataSet,
dataSetProjectId,
useLegacySql,
maxBillingBytes,
queryTimeoutMs,
Expand All @@ -672,6 +678,7 @@ static Bigquery.Jobs.Query getSyncQuery(
String projectId,
String querySql,
String dataSet,
String dataSetProjectId,
Boolean useLegacySql,
Long maxBillingBytes,
Long queryTimeoutMs,
Expand All @@ -692,7 +699,8 @@ static Bigquery.Jobs.Query getSyncQuery(
qr = qr.setJobCreationMode(jobCreationMode.name());
}
if (dataSet != null) {
qr.setDefaultDataset(new DatasetReference().setDatasetId(dataSet).setProjectId(projectId));
qr.setDefaultDataset(
new DatasetReference().setDatasetId(dataSet).setProjectId(dataSetProjectId));
}
if (maxResults != null) {
qr.setMaxResults(maxResults);
Expand All @@ -701,12 +709,44 @@ static Bigquery.Jobs.Query getSyncQuery(
return bigquery.jobs().query(projectId, qr);
}

/**
* Starts a new query in async mode.
*
* <p>This method exists to maintain backwards compatibility with prior bqjdbc releases.
*
* @param bigquery The bigquery instance, which is authorized
* @param projectId The project ID to use for both the billing and default dataset project ids
* @param querySql The sql query which we want to run
* @param dataSet The default dataset, can be null
* @param useLegacySql Use the legacy SQL dialect when true
* @param maxBillingBytes Maximum bytes that the API will allow to bill
* @return A JobReference which we'll use to poll the bigquery, for its state, then for its mined
* data.
* @throws IOException
* <p>if the request for initializing or executing job fails
*/
public static Job startQuery(
Bigquery bigquery,
String projectId,
String querySql,
String dataSet,
Boolean useLegacySql,
Long maxBillingBytes)
throws IOException {
return startQuery(
bigquery, projectId, querySql, dataSet, projectId, useLegacySql, maxBillingBytes);
}

/**
* Starts a new query in async mode.
*
* @param bigquery The bigquery instance, which is authorized
* @param projectId The project's ID
* @param projectId The project ID to use for billing
* @param querySql The sql query which we want to run
* @param dataSet The default dataset, can be null
* @param dataSetProjectId The default dataset project id, only specified when the default dataset
* is non-null
* @param useLegacySql Use the legacy SQL dialect when true
* @return A JobReference which we'll use to poll the bigquery, for its state, then for its mined
* data.
* @throws IOException
Expand All @@ -717,6 +757,7 @@ public static Job startQuery(
String projectId,
String querySql,
String dataSet,
String dataSetProjectId,
Boolean useLegacySql,
Long maxBillingBytes)
throws IOException {
Expand All @@ -732,7 +773,7 @@ public static Job startQuery(

if (dataSet != null)
queryConfig.setDefaultDataset(
new DatasetReference().setDatasetId(dataSet).setProjectId(projectId));
new DatasetReference().setDatasetId(dataSet).setProjectId(dataSetProjectId));

job.setConfiguration(config);
queryConfig.setQuery(querySql);
Expand Down
Loading

0 comments on commit 0762d72

Please sign in to comment.