Skip to content

Commit

Permalink
Added BQ Execute Action job label support
Browse files Browse the repository at this point in the history
  • Loading branch information
psainics committed Dec 5, 2023
1 parent 8e43f42 commit bce9e0f
Show file tree
Hide file tree
Showing 7 changed files with 349 additions and 288 deletions.
8 changes: 8 additions & 0 deletions docs/BigQueryExecute-action.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ write BigQuery data to this project.

**SQL**: SQL command to execute.

**BQ Job Labels:** Key value pairs to be added as labels to the BigQuery job. Keys must be unique. (Macro Enabled)

[job_source, type] are system defined labels used by CDAP for internal purpose and cannot be used as label keys.
Macro format is supported. example `key1:val1,key2:val2`

Keys and values can contain only lowercase letters, numeric characters, underscores, and dashes.
For more information about labels, see [Docs](https://cloud.google.com/bigquery/docs/labels-intro#requirements).

**Dialect**: Dialect of the SQL command. The value must be 'legacy' or 'standard'. If set to 'standard',
the query will use BigQuery's standard SQL: https://cloud.google.com/bigquery/sql-reference/.
If set to 'legacy', BigQuery's legacy SQL dialect will be used for this query.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void run(ActionContext context) throws Exception {
}

// Add labels for the BigQuery Execute job.
builder.setLabels(BigQueryUtil.getJobLabels(BigQueryUtil.BQ_JOB_TYPE_EXECUTE_TAG));
builder.setLabels(BigQueryUtil.getJobLabels(BigQueryUtil.BQ_JOB_TYPE_EXECUTE_TAG, config.getJobLabelKeyValue()));

QueryJobConfiguration queryConfig = builder.build();

Expand Down Expand Up @@ -205,6 +205,7 @@ public static final class Config extends AbstractBigQueryActionConfig {
private static final String DATASET = "dataset";
private static final String TABLE = "table";
private static final String NAME_LOCATION = "location";
public static final String NAME_BQ_JOB_LABELS = "jobLabels";
private static final int ERROR_CODE_NOT_FOUND = 404;
private static final String STORE_RESULTS = "storeResults";

Expand Down Expand Up @@ -272,10 +273,17 @@ public static final class Config extends AbstractBigQueryActionConfig {
@Description("Whether to store results in a BigQuery Table.")
private Boolean storeResults;

@Name(NAME_BQ_JOB_LABELS)
@Macro
@Nullable
@Description("Key value pairs to be added as labels to the BigQuery job. Keys must be unique. [job_source, type] " +
"are reserved keys and cannot be used as label keys.")
protected String jobLabelKeyValue;

private Config(@Nullable String project, @Nullable String serviceAccountType, @Nullable String serviceFilePath,
@Nullable String serviceAccountJson, @Nullable String dataset, @Nullable String table,
@Nullable String location, @Nullable String cmekKey, @Nullable String dialect, @Nullable String sql,
@Nullable String mode, @Nullable Boolean storeResults) {
@Nullable String mode, @Nullable Boolean storeResults, @Nullable String jobLabelKeyValue) {
this.project = project;
this.serviceAccountType = serviceAccountType;
this.serviceFilePath = serviceFilePath;
Expand All @@ -288,6 +296,7 @@ private Config(@Nullable String project, @Nullable String serviceAccountType, @N
this.sql = sql;
this.mode = mode;
this.storeResults = storeResults;
this.jobLabelKeyValue = jobLabelKeyValue;
}

public boolean isLegacySQL() {
Expand Down Expand Up @@ -328,6 +337,11 @@ public String getTable() {
return table;
}

@Nullable
public String getJobLabelKeyValue() {
return jobLabelKeyValue;
}

@Override
public void validate(FailureCollector failureCollector) {
validate(failureCollector, Collections.emptyMap());
Expand Down Expand Up @@ -376,9 +390,17 @@ public void validate(FailureCollector failureCollector, Map<String, String> argu
validateCmekKey(failureCollector, arguments);
}

if (!containsMacro(NAME_BQ_JOB_LABELS)) {
validateJobLabelKeyValue(failureCollector);
}

failureCollector.getOrThrowException();
}

void validateJobLabelKeyValue(FailureCollector failureCollector) {
BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, failureCollector, NAME_BQ_JOB_LABELS);
}

void validateCmekKey(FailureCollector failureCollector, Map<String, String> arguments) {
CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(cmekKey, arguments, failureCollector);
//these fields are needed to check if bucket exists or not and for location validation
Expand Down Expand Up @@ -449,6 +471,7 @@ public static class Builder {
private String sql;
private String mode;
private Boolean storeResults;
private String jobLabelKeyValue;

public Builder setProject(@Nullable String project) {
this.project = project;
Expand Down Expand Up @@ -505,6 +528,11 @@ public Builder setSql(@Nullable String sql) {
return this;
}

public Builder setJobLabelKeyValue(@Nullable String jobLabelKeyValue) {
this.jobLabelKeyValue = jobLabelKeyValue;
return this;
}

public Config build() {
return new Config(
project,
Expand All @@ -518,7 +546,8 @@ public Config build() {
dialect,
sql,
mode,
storeResults
storeResults,
jobLabelKeyValue
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,106 +187,8 @@ void validateCmekKey(FailureCollector failureCollector, Map<String, String> argu
validateCmekKeyLocation(cmekKeyName, null, location, failureCollector);
}

/**
* Validates job label key value pairs, as per the following rules:
* Keys and values can contain only lowercase letters, numeric characters, underscores, and dashes.
* Defined in the following link:
* <a href="https://cloud.google.com/bigquery/docs/labels-intro#requirements">Docs</a>
* @param failureCollector failure collector
*/
void validateJobLabelKeyValue(FailureCollector failureCollector) {
Set<String> reservedKeys = BigQueryUtil.BQ_JOB_LABEL_SYSTEM_KEYS;
int maxLabels = 64 - reservedKeys.size();
int maxKeyLength = 63;
int maxValueLength = 63;

String validLabelKeyRegex = "^[\\p{L}][a-z0-9-_\\p{L}]+$";
String validLabelValueRegex = "^[a-z0-9-_\\p{L}]+$";
String capitalLetterRegex = ".*[A-Z].*";
String labelKeyValue = getJobLabelKeyValue();

if (Strings.isNullOrEmpty(labelKeyValue)) {
return;
}

String[] keyValuePairs = labelKeyValue.split(",");
Set<String> uniqueKeys = new HashSet<>();

for (String keyValuePair : keyValuePairs) {

// Adding a label without a value is valid behavior
// Read more here: https://cloud.google.com/bigquery/docs/adding-labels#adding_a_label_without_a_value
String[] keyValue = keyValuePair.trim().split(":");
boolean isKeyPresent = keyValue.length == 1 || keyValue.length == 2;
boolean isValuePresent = keyValue.length == 2;


if (!isKeyPresent) {
failureCollector.addFailure(String.format("Invalid job label key value pair '%s'.", keyValuePair),
"Job label key value pair should be in the format 'key:value'.")
.withConfigProperty(NAME_BQ_JOB_LABELS);
continue;
}

// Check if key is reserved
if (reservedKeys.contains(keyValue[0])) {
failureCollector.addFailure(String.format("Invalid job label key '%s'.", keyValue[0]),
"A system label already exists with same name.").withConfigProperty(NAME_BQ_JOB_LABELS);
continue;
}

String key = keyValue[0];
String value = isValuePresent ? keyValue[1] : "";
boolean isKeyValid = true;
boolean isValueValid = true;

// Key cannot be empty
if (Strings.isNullOrEmpty(key)) {
failureCollector.addFailure(String.format("Invalid job label key '%s'.", key),
"Job label key cannot be empty.").withConfigProperty(NAME_BQ_JOB_LABELS);
isKeyValid = false;
}

// Key cannot be longer than 63 characters
if (key.length() > maxKeyLength) {
failureCollector.addFailure(String.format("Invalid job label key '%s'.", key),
"Job label key cannot be longer than 63 characters.").withConfigProperty(NAME_BQ_JOB_LABELS);
isKeyValid = false;
}

// Value cannot be longer than 63 characters
if (value.length() > maxValueLength) {
failureCollector.addFailure(String.format("Invalid job label value '%s'.", value),
"Job label value cannot be longer than 63 characters.").withConfigProperty(NAME_BQ_JOB_LABELS);
isValueValid = false;
}

if (isKeyValid && (!key.matches(validLabelKeyRegex) || key.matches(capitalLetterRegex))) {
failureCollector.addFailure(String.format("Invalid job label key '%s'.", key),
"Job label key can only contain lowercase letters, numeric characters, " +
"underscores, and dashes. Check docs for more details.")
.withConfigProperty(NAME_BQ_JOB_LABELS);
isKeyValid = false;
}

if (isValuePresent && isValueValid &&
(!value.matches(validLabelValueRegex) || value.matches(capitalLetterRegex))) {
failureCollector.addFailure(String.format("Invalid job label value '%s'.", value),
"Job label value can only contain lowercase letters, numeric characters, " +
"underscores, and dashes.").withConfigProperty(NAME_BQ_JOB_LABELS);
}

if (isKeyValid && !uniqueKeys.add(key)) {
failureCollector.addFailure(String.format("Duplicate job label key '%s'.", key),
"Job label key should be unique.").withConfigProperty(NAME_BQ_JOB_LABELS);
}
}
// Check if number of labels is greater than 64 - reserved keys
if (uniqueKeys.size() > maxLabels) {
failureCollector.addFailure("Number of job labels exceeds the limit.",
String.format("Number of job labels cannot be greater than %d.", maxLabels))
.withConfigProperty(NAME_BQ_JOB_LABELS);
}
BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, failureCollector, NAME_BQ_JOB_LABELS);
}

public String getDatasetProject() {
Expand Down
104 changes: 104 additions & 0 deletions src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException;
import io.cdap.cdap.etl.api.validation.InvalidStageException;
import io.cdap.cdap.etl.api.validation.ValidationFailure;
import io.cdap.plugin.gcp.bigquery.sink.AbstractBigQuerySinkConfig;
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySink;
import io.cdap.plugin.gcp.bigquery.source.BigQuerySource;
import io.cdap.plugin.gcp.bigquery.source.BigQuerySourceConfig;
Expand All @@ -60,6 +61,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -923,4 +925,106 @@ public static String getStagingBucketName(Map<String, String> arguments, @Nullab
}
return bucket;
}

/**
* Validates job label key value pairs, as per the following rules:
* Keys and values can contain only lowercase letters, numeric characters, underscores, and dashes.
* Defined in the following link:
* <a href="https://cloud.google.com/bigquery/docs/labels-intro#requirements">Docs</a>
* @param failureCollector failure collector
*/
public static void validateJobLabelKeyValue(String labelKeyValue, FailureCollector failureCollector,
String stageConfigProperty) {
Set<String> reservedKeys = BQ_JOB_LABEL_SYSTEM_KEYS;
int maxLabels = 64 - reservedKeys.size();
int maxKeyLength = 63;
int maxValueLength = 63;

String validLabelKeyRegex = "^[\\p{L}][a-z0-9-_\\p{L}]+$";
String validLabelValueRegex = "^[a-z0-9-_\\p{L}]+$";
String capitalLetterRegex = ".*[A-Z].*";

if (com.google.api.client.util.Strings.isNullOrEmpty(labelKeyValue)) {
return;
}

String[] keyValuePairs = labelKeyValue.split(",");
Set<String> uniqueKeys = new HashSet<>();

for (String keyValuePair : keyValuePairs) {

// Adding a label without a value is valid behavior
// Read more here: https://cloud.google.com/bigquery/docs/adding-labels#adding_a_label_without_a_value
String[] keyValue = keyValuePair.trim().split(":");
boolean isKeyPresent = keyValue.length == 1 || keyValue.length == 2;
boolean isValuePresent = keyValue.length == 2;


if (!isKeyPresent) {
failureCollector.addFailure(String.format("Invalid job label key value pair '%s'.", keyValuePair),
"Job label key value pair should be in the format 'key:value'.")
.withConfigProperty(stageConfigProperty);
continue;
}

// Check if key is reserved
if (reservedKeys.contains(keyValue[0])) {
failureCollector.addFailure(String.format("Invalid job label key '%s'.", keyValue[0]),
"A system label already exists with same name.").withConfigProperty(stageConfigProperty);
continue;
}

String key = keyValue[0];
String value = isValuePresent ? keyValue[1] : "";
boolean isKeyValid = true;
boolean isValueValid = true;

// Key cannot be empty
if (com.google.api.client.util.Strings.isNullOrEmpty(key)) {
failureCollector.addFailure(String.format("Invalid job label key '%s'.", key),
"Job label key cannot be empty.").withConfigProperty(stageConfigProperty);
isKeyValid = false;
}

// Key cannot be longer than 63 characters
if (key.length() > maxKeyLength) {
failureCollector.addFailure(String.format("Invalid job label key '%s'.", key),
"Job label key cannot be longer than 63 characters.").withConfigProperty(stageConfigProperty);
isKeyValid = false;
}

// Value cannot be longer than 63 characters
if (value.length() > maxValueLength) {
failureCollector.addFailure(String.format("Invalid job label value '%s'.", value),
"Job label value cannot be longer than 63 characters.").withConfigProperty(stageConfigProperty);
isValueValid = false;
}

if (isKeyValid && (!key.matches(validLabelKeyRegex) || key.matches(capitalLetterRegex))) {
failureCollector.addFailure(String.format("Invalid job label key '%s'.", key),
"Job label key can only contain lowercase letters, numeric characters, " +
"underscores, and dashes. Check docs for more details.")
.withConfigProperty(stageConfigProperty);
isKeyValid = false;
}

if (isValuePresent && isValueValid &&
(!value.matches(validLabelValueRegex) || value.matches(capitalLetterRegex))) {
failureCollector.addFailure(String.format("Invalid job label value '%s'.", value),
"Job label value can only contain lowercase letters, numeric characters, " +
"underscores, and dashes.").withConfigProperty(stageConfigProperty);
}

if (isKeyValid && !uniqueKeys.add(key)) {
failureCollector.addFailure(String.format("Duplicate job label key '%s'.", key),
"Job label key should be unique.").withConfigProperty(stageConfigProperty);
}
}
// Check if number of labels is greater than 64 - reserved keys
if (uniqueKeys.size() > maxLabels) {
failureCollector.addFailure("Number of job labels exceeds the limit.",
String.format("Number of job labels cannot be greater than %d.", maxLabels))
.withConfigProperty(stageConfigProperty);
}
}
}
Loading

0 comments on commit bce9e0f

Please sign in to comment.