Skip to content

Commit

Permalink
Add ability to set dataset project id
Browse files Browse the repository at this point in the history
This CL adds the ability to specify a dataset project id as part of
the dataset field in the connection string used to create a
BQConnection instance. Previously, we used the project defined in the
URL's path component for both the billing and default dataset
projects. Recall that the default project and dataset are used to
disambiguate unqualified table names referenced in a query being
processed by a connection.

Now, we parse the potentially fully qualified dataset to check for a
project id. If no project id is found, we revert back to our former
behavior, using the billing project as the default dataset project.
  • Loading branch information
mmotoyama committed Nov 27, 2024
1 parent 3518182 commit a39d3af
Show file tree
Hide file tree
Showing 8 changed files with 519 additions and 25 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ For instance for a _service account_ in a properties file:

```ini
projectid=disco-parsec-659
dataset=publicdata.samples
type=service
user[email protected]
password=bigquery_credentials.p12
Expand Down
94 changes: 89 additions & 5 deletions src/main/java/net/starschema/clouddb/jdbc/BQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.api.client.http.HttpTransport;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.DatasetReference;
import com.google.common.base.Splitter;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
Expand All @@ -42,6 +43,13 @@
* @author Gunics Balázs, Horváth Attila
*/
public class BQConnection implements Connection {

// We permit using either a "." or ":" as the delimiter between the dataset and project ids.
private static final String PROJECT_DELIMITERS = ":.";
// Find the last instance of a project delimiter using a regex lookahead.
private static final String LAST_PROJECT_DELIMITER_REGEX =
"[" + PROJECT_DELIMITERS + "](?=[^" + PROJECT_DELIMITERS + "]*$)";

/** Variable to store auto commit mode */
private boolean autoCommitEnabled = false;

Expand All @@ -50,10 +58,19 @@ public class BQConnection implements Connection {
/** The bigquery client to access the service. */
private Bigquery bigquery = null;

/** The default dataset id to configure on queries processed by this connection. */
private String dataset = null;

/** The ProjectId for the connection */
private String projectId = null;
/**
* The default dataset project id to configure on queries processed by this connection.
*
* <p>We follow the same naming convention as the corresponding <a
* href="https://cloud.google.com/bigquery/docs/reference/system-variables">system variable</a>
*/
private String datasetProjectId = null;

/** The ProjectId to use for billing. */
private final String projectId;

/** Boolean to determine if the Connection is closed */
private boolean isclosed = false;
Expand Down Expand Up @@ -152,7 +169,7 @@ public BQConnection(String url, Properties loginProp, HttpTransport httpTranspor

if (matchData.find()) {
this.projectId = CatalogName.toProjectId(matchData.group(1));
this.dataset = matchData.group(2);
configureDataSet(matchData.group(2));
} else {
this.projectId = CatalogName.toProjectId(pathParams);
}
Expand Down Expand Up @@ -415,6 +432,19 @@ public void close() throws SQLException {
}
}

/**
* Parses the input dataset specification and sets the values for the dataset and datasetProjectId
* instance variables.
*/
private void configureDataSet(String datasetSpec) {
DatasetReference datasetRef = parseDatasetRef(datasetSpec, this.projectId);
this.dataset = datasetRef.getDatasetId();
this.datasetProjectId = datasetRef.getProjectId();
}

/**
* Returns the default dataset that should be configured on queries processed by this connection.
*/
public String getDataSet() {
return this.dataset;
}
Expand Down Expand Up @@ -577,7 +607,7 @@ public Struct createStruct(String typeName, Object[] attributes) throws SQLExcep

@Override
public void setSchema(String schema) {
this.dataset = schema;
configureDataSet(schema);
}

@Override
Expand Down Expand Up @@ -694,11 +724,19 @@ public DatabaseMetaData getMetaData() throws SQLException {
return metadata;
}

/** Getter method for projectId */
/** Getter method for the projectId to use for billing. */
public String getProjectId() {
return projectId;
}

/**
* Returns the dataset project id that should be configured on queries processed by this
* connection.
*/
public String getDataSetProjectId() {
return this.datasetProjectId;
}

/**
*
*
Expand Down Expand Up @@ -1258,4 +1296,50 @@ public Integer getTimeoutMs() {
public JobCreationMode getJobCreationMode() {
return jobCreationMode;
}

/**
* Returns a DatasetReference extracted from the input dataset specification, which may optionally
* include a project id reference.
*
* <p>While there are well-defined rules regarding <a
* href="https://cloud.google.com/resource-manager/docs/creating-managing-projects">project
* ids</a> and <a href="https://cloud.google.com/bigquery/docs/datasets#dataset-naming">dataset
* names</a>, we must support historical references to both that don't adhere to the documented
* rules (such as having a domain name with a colon as part of the project id). The only
* restriction that we impose is that either identifier consists of at least one character. We
* permit using either a "." or ":" as the delimiter between the dataset id and the project id. If
* the {@code datasetSpec} is either null or an empty string, we return a DatasetReference with a
* null dataset id and its project id set to the {@code defaultProjectId} argument.
*
* <p>Visible for testing.
*
* @param datasetSpec Dataset specification, typically from the BQJDBC connection string.
* @param defaultProjectId Project id to set on the resulting DatasetReference if no value found
* in {@code datasetSpec}.
* @return DatasetReference
*/
static DatasetReference parseDatasetRef(@Nullable String datasetSpec, String defaultProjectId) {
if (defaultProjectId == null || defaultProjectId.isEmpty()) {
throw new IllegalArgumentException("defaultProjectId cannot be null or empty");
}
if (datasetSpec == null || datasetSpec.isEmpty()) {
return new DatasetReference().setDatasetId(null).setProjectId(defaultProjectId);
}
// We split datasetSpec on the last occurrence of a project delimiter. To detect invalid empty
// identifiers, we pass -1 as the limit value to disable discarding empty strings.
String[] datasetComponents = datasetSpec.split(LAST_PROJECT_DELIMITER_REGEX, -1);
boolean isDatasetIdOnly = datasetComponents.length == 1;
String datasetId = isDatasetIdOnly ? datasetComponents[0] : datasetComponents[1];
String datasetProjectId =
isDatasetIdOnly ? defaultProjectId : CatalogName.toProjectId(datasetComponents[0]);
if (datasetId.isEmpty()) {
throw new IllegalArgumentException(
String.format("Resulting dataset id is empty after parsing '%s'", datasetSpec));
}
if (datasetProjectId.isEmpty()) {
throw new IllegalArgumentException(
String.format("Resulting dataset project id is empty after parsing '%s'", datasetSpec));
}
return new DatasetReference().setDatasetId(datasetId).setProjectId(datasetProjectId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ public ResultSet executeQuery() throws SQLException {
this.projectId,
this.RunnableStatement,
this.connection.getDataSet(),
this.connection.getDataSetProjectId(),
this.connection.getUseLegacySql(),
this.connection.getMaxBillingBytes());
this.logger.info("Executing Query: " + this.RunnableStatement);
Expand Down
1 change: 1 addition & 0 deletions src/main/java/net/starschema/clouddb/jdbc/BQStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ protected QueryResponse runSyncQuery(String querySql, boolean unlimitedBillingBy
projectId,
querySql,
connection.getDataSet(),
connection.getDataSetProjectId(),
this.connection.getUseLegacySql(),
!unlimitedBillingBytes ? this.connection.getMaxBillingBytes() : null,
getSyncTimeoutMillis(), // we need this to respond fast enough to avoid any
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ private int executeDML(String sql) throws SQLException {
projectId,
sql,
connection.getDataSet(),
connection.getDataSetProjectId(),
this.connection.getUseLegacySql(),
this.connection.getMaxBillingBytes(),
(long) querytimeout * 1000,
Expand Down Expand Up @@ -323,6 +324,7 @@ public ResultSet executeQuery(String querySql, boolean unlimitedBillingBytes)
projectId,
querySql,
connection.getDataSet(),
connection.getDataSetProjectId(),
this.connection.getUseLegacySql(),
billingBytes,
(long) querytimeout * 1000,
Expand Down
53 changes: 47 additions & 6 deletions src/main/java/net/starschema/clouddb/jdbc/BQSupportFuncts.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,13 @@ public class BQSupportFuncts {
*/
public static String constructUrlFromPropertiesFile(
Properties properties, boolean full, String dataset) throws UnsupportedEncodingException {
String projectId = properties.getProperty("projectid");
String projectId = properties.getProperty("projectid"); // Represents the billing project.
logger.debug("projectId is: " + projectId);
String User = properties.getProperty("user");
String Password = properties.getProperty("password");
String path = properties.getProperty("path");
// The dataset property value can optionally include a reference to a project id, which will be
// used in conjunction with the default dataset to handle unqualified table references.
dataset = dataset == null ? properties.getProperty("dataset") : dataset;

String forreturn = "";
Expand Down Expand Up @@ -621,10 +623,12 @@ public static Properties readFromPropFile(String filePath) throws IOException {
* Run a query using the synchronous jobs.query() BigQuery endpoint.
*
* @param bigquery The BigQuery API wrapper
* @param projectId
* @param projectId The ProjectId to use for billing
* @param querySql The SQL to execute
* @param dataSet default dataset, can be null
* @param useLegacySql
* @param dataSetProjectId default dataset project id, only specified when the default dataset is
* non-null
* @param useLegacySql Use the legacy SQL dialect when true
* @param maxBillingBytes Maximum bytes that the API will allow to bill
* @param queryTimeoutMs The timeout at which point the API will return with an incomplete result
* NOTE: this does _not_ mean the query fails, just we have to get the results async
Expand All @@ -640,6 +644,7 @@ static QueryResponse runSyncQuery(
String projectId,
String querySql,
String dataSet,
String dataSetProjectId,
Boolean useLegacySql,
Long maxBillingBytes,
Long queryTimeoutMs,
Expand All @@ -653,6 +658,7 @@ static QueryResponse runSyncQuery(
projectId,
querySql,
dataSet,
dataSetProjectId,
useLegacySql,
maxBillingBytes,
queryTimeoutMs,
Expand All @@ -672,6 +678,7 @@ static Bigquery.Jobs.Query getSyncQuery(
String projectId,
String querySql,
String dataSet,
String dataSetProjectId,
Boolean useLegacySql,
Long maxBillingBytes,
Long queryTimeoutMs,
Expand All @@ -692,7 +699,8 @@ static Bigquery.Jobs.Query getSyncQuery(
qr = qr.setJobCreationMode(jobCreationMode.name());
}
if (dataSet != null) {
qr.setDefaultDataset(new DatasetReference().setDatasetId(dataSet).setProjectId(projectId));
qr.setDefaultDataset(
new DatasetReference().setDatasetId(dataSet).setProjectId(dataSetProjectId));
}
if (maxResults != null) {
qr.setMaxResults(maxResults);
Expand All @@ -701,12 +709,44 @@ static Bigquery.Jobs.Query getSyncQuery(
return bigquery.jobs().query(projectId, qr);
}

/**
* Starts a new query in async mode.
*
* <p>This method exists to maintain backwards compatibility with prior bqjdbc releases.
*
* @param bigquery The bigquery instance, which is authorized
* @param projectId The project ID to use for both the billing and default dataset project ids
* @param querySql The sql query which we want to run
* @param dataSet The default dataset, can be null
* @param useLegacySql Use the legacy SQL dialect when true
* @param maxBillingBytes Maximum bytes that the API will allow to bill
* @return A JobReference which we'll use to poll the bigquery, for its state, then for its mined
* data.
* @throws IOException
* <p>if the request for initializing or executing job fails
*/
public static Job startQuery(
Bigquery bigquery,
String projectId,
String querySql,
String dataSet,
Boolean useLegacySql,
Long maxBillingBytes)
throws IOException {
return startQuery(
bigquery, projectId, querySql, dataSet, projectId, useLegacySql, maxBillingBytes);
}

/**
* Starts a new query in async mode.
*
* @param bigquery The bigquery instance, which is authorized
* @param projectId The project's ID
* @param projectId The project ID to use for billing
* @param querySql The sql query which we want to run
* @param dataSet The default dataset, can be null
* @param dataSetProjectId The default dataset project id, only specified when the default dataset
* is non-null
* @param useLegacySql Use the legacy SQL dialect when true
* @return A JobReference which we'll use to poll the bigquery, for its state, then for its mined
* data.
* @throws IOException
Expand All @@ -717,6 +757,7 @@ public static Job startQuery(
String projectId,
String querySql,
String dataSet,
String dataSetProjectId,
Boolean useLegacySql,
Long maxBillingBytes)
throws IOException {
Expand All @@ -732,7 +773,7 @@ public static Job startQuery(

if (dataSet != null)
queryConfig.setDefaultDataset(
new DatasetReference().setDatasetId(dataSet).setProjectId(projectId));
new DatasetReference().setDatasetId(dataSet).setProjectId(dataSetProjectId));

job.setConfiguration(config);
queryConfig.setQuery(querySql);
Expand Down
Loading

0 comments on commit a39d3af

Please sign in to comment.