Skip to content

Commit

Permalink
[Kernel] Add API on TxnBuilder to set the table properties (delta-io#…
Browse files Browse the repository at this point in the history
…3269)

## Description
Adds an API to `TransactionBuilder` for setting table properties, which
provides a way to configure the table by committing a transaction. For
example, a user can enable the `inCommitTimestamp` property with this API.

## How was this patch tested?
Adds unit tests for setting valid and invalid properties with this API.

## Does this PR introduce _any_ user-facing changes?
Yes, connectors can use this API to set table properties.
  • Loading branch information
EstherBear authored Jun 18, 2024
1 parent 75c6acb commit 0cd33a1
Show file tree
Hide file tree
Showing 10 changed files with 422 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
package io.delta.kernel;

import java.util.List;
import java.util.Map;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.ConcurrentTransactionException;
import io.delta.kernel.exceptions.InvalidConfigurationValueException;
import io.delta.kernel.exceptions.UnknownConfigurationException;
import io.delta.kernel.types.StructType;
import io.delta.kernel.internal.TableConfig;

/**
* Builder for creating a {@link Transaction} to mutate a Delta table.
Expand Down Expand Up @@ -69,13 +73,29 @@ TransactionBuilder withTransactionId(
String applicationId,
long transactionVersion);

/**
 * Set the table properties for the table. When the table already contains a property with the
 * same key, its value is replaced if the given value is different.
 *
 * <p>Invalid or unknown properties are reported by {@link #build(Engine)}; see the exceptions
 * documented there.
 *
 * @param engine {@link Engine} instance to use.
 * @param properties The table properties to set. These are key-value pairs that can be used to
 *                   configure the table. These properties are stored in the table metadata.
 * @return updated {@link TransactionBuilder} instance.
 *
 * @since 3.3.0
 */
TransactionBuilder withTableProperties(Engine engine, Map<String, String> properties);

/**
 * Build the transaction. Also validates the given info to ensure that a valid transaction can
 * be created.
 *
 * @param engine {@link Engine} instance to use.
 * @return the {@link Transaction} built from the information given to this builder.
 * @throws ConcurrentTransactionException if the table already has a committed transaction with
 *                                        the same given transaction identifier.
 * @throws InvalidConfigurationValueException if the value of a table property is invalid.
 * @throws UnknownConfigurationException if any of the table properties are unknown to
 *                                       {@link TableConfig}.
 */
Transaction build(Engine engine);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.exceptions;

import io.delta.kernel.annotation.Evolving;

/**
 * Signals that a table property was given a value that is not legal for it.
 *
 * @since 3.3.0
 */
@Evolving
public class InvalidConfigurationValueException extends KernelException {
    /**
     * @param key name of the table property
     * @param value the rejected value
     * @param helpMessage text appended to the end of the exception message
     */
    public InvalidConfigurationValueException(String key, String value, String helpMessage) {
        super(describe(key, value, helpMessage));
    }

    /** Builds the exception message from the property key, its value and the help text. */
    private static String describe(String key, String value, String helpMessage) {
        final String template = "Invalid value for table property '%s': '%s'. %s";
        return String.format(template, key, value, helpMessage);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.exceptions;

import io.delta.kernel.annotation.Evolving;

/**
 * Signals that a configuration key was specified that is not known.
 *
 * @since 3.3.0
 */
@Evolving
public class UnknownConfigurationException extends KernelException {
    /**
     * @param confKey the configuration key that was not recognized
     */
    public UnknownConfigurationException(String confKey) {
        super(describe(confKey));
    }

    /** Builds the exception message for the given configuration key. */
    private static String describe(String confKey) {
        final String template = "Unknown configuration was specified: %s";
        return String.format(template, confKey);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,15 @@ public static KernelException voidTypeEncountered() {
"Failed to parse the schema. Encountered unsupported Delta data type: VOID");
}

/**
 * Creates the exception reported for a configuration key that is not recognized.
 *
 * @param confKey the unrecognized configuration key
 * @return a new {@link UnknownConfigurationException} for {@code confKey}
 */
public static KernelException unknownConfigurationException(String confKey) {
    final UnknownConfigurationException unknownKey = new UnknownConfigurationException(confKey);
    return unknownKey;
}

/**
 * Creates the exception reported when a table property is given an invalid value.
 *
 * @param key name of the table property
 * @param value the rejected value
 * @param helpMessage text appended to the exception message
 * @return a new {@link InvalidConfigurationValueException}
 */
public static KernelException invalidConfigurationValueException(
        String key,
        String value,
        String helpMessage) {
    final InvalidConfigurationValueException invalidValue =
        new InvalidConfigurationValueException(key, value, helpMessage);
    return invalidValue;
}

/* ------------------------ HELPER METHODS ----------------------------- */
private static String formatTimestamp(long millisSinceEpochUTC) {
return new Timestamp(millisSinceEpochUTC).toInstant().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@
*/
package io.delta.kernel.internal;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

import io.delta.kernel.exceptions.InvalidConfigurationValueException;
import io.delta.kernel.exceptions.UnknownConfigurationException;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.util.IntervalParserUtils;

Expand Down Expand Up @@ -60,12 +65,23 @@ public class TableConfig<T> {
"needs to be a positive integer."
);

/**
 * All the valid properties that can be set on the table, keyed by the property name
 * lower-cased with {@link Locale#ROOT}. Declared against the {@link Map} interface and with a
 * wildcard type argument instead of the raw {@code TableConfig} type.
 */
private static final Map<String, TableConfig<?>> validProperties = new HashMap<>();
/** Name of the table property; used as the lookup key in the metadata configuration. */
private final String key;
/** Value used when the metadata configuration has no entry for {@link #key}. */
private final String defaultValue;
/** Converts the string form of the property value into its typed form. */
private final Function<String, T> fromString;
/** Decides whether a parsed value is acceptable for this property. */
private final Predicate<T> validator;
/** Text included in the error raised when validation fails. */
private final String helpMessage;

static {
    // Register under lower-cased keys: lookups in validateProperties lower-case the
    // user-supplied key with the same locale before consulting this map.
    validProperties.put(
        TOMBSTONE_RETENTION.getKey().toLowerCase(Locale.ROOT), TOMBSTONE_RETENTION);
    validProperties.put(
        CHECKPOINT_INTERVAL.getKey().toLowerCase(Locale.ROOT), CHECKPOINT_INTERVAL);
}

private TableConfig(
String key,
String defaultValue,
Expand All @@ -86,13 +102,53 @@ private TableConfig(
* @return the value of the table property
*/
public T fromMetadata(Metadata metadata) {
T value = fromString.apply(metadata.getConfiguration().getOrDefault(key, defaultValue));
if (!validator.test(value)) {
throw new IllegalArgumentException(
String.format("Invalid value for table property '%s': '%s'. %s",
key, value, helpMessage));
String value = metadata.getConfiguration().getOrDefault(key, defaultValue);
validate(value);
return fromString.apply(value);
}

/**
 * Gets the key of this table property.
 *
 * @return the property key
 */
public String getKey() {
    return this.key;
}

/**
* Validates that the given properties have the delta prefix in the key name, and that they are
* in the set of valid properties. Keys are matched case-insensitively; the returned map uses the
* key casing defined in TableConfig, so callers should store the returned map.
*
* @param configurations the properties to validate
* @throws InvalidConfigurationValueException if any of the properties are invalid
* @throws UnknownConfigurationException if any of the properties are unknown
*/
public static Map<String, String> validateProperties(Map<String, String> configurations) {
Map<String, String> validatedConfigurations = new HashMap<>();
for (Map.Entry<String, String> kv : configurations.entrySet()) {
String key = kv.getKey().toLowerCase(Locale.ROOT);
String value = kv.getValue();
if (key.startsWith("delta.")) {
TableConfig tableConfig = validProperties.get(key);
if (tableConfig != null) {
tableConfig.validate(value);
validatedConfigurations.put(tableConfig.getKey(), value);
} else {
throw DeltaErrors.unknownConfigurationException(key);
}
} else {
throw DeltaErrors.unknownConfigurationException(key);
}
}
return value;
return validatedConfigurations;
}
}

/**
 * Parses the given string with {@code fromString} and checks the parsed value with
 * {@code validator}. Returns normally when the check passes; otherwise throws the exception
 * created by {@code DeltaErrors.invalidConfigurationValueException}.
 *
 * @param value the string form of the property value to check
 */
private void validate(String value) {
    if (validator.test(fromString.apply(value))) {
        return;
    }
    throw DeltaErrors.invalidConfigurationValueException(key, value, helpMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class TransactionBuilderImpl implements TransactionBuilder {
private Optional<StructType> schema = Optional.empty();
private Optional<List<String>> partitionColumns = Optional.empty();
private Optional<SetTransaction> setTxnOpt = Optional.empty();
private Optional<Map<String, String>> tableProperties = Optional.empty();

public TransactionBuilderImpl(TableImpl table, String engineInfo, Operation operation) {
this.table = table;
Expand Down Expand Up @@ -85,6 +86,12 @@ public TransactionBuilder withTransactionId(
return this;
}

@Override
public TransactionBuilder withTableProperties(Engine engine, Map<String, String> properties) {
    // Keep a private copy so later changes to the caller's map do not affect this builder.
    // The engine argument is not used here.
    final Map<String, String> propertiesCopy = new HashMap<>(properties);
    tableProperties = Optional.of(propertiesCopy);
    return this;
}

@Override
public Transaction build(Engine engine) {
SnapshotImpl snapshot;
Expand All @@ -104,6 +111,19 @@ public Transaction build(Engine engine) {
boolean isNewTable = snapshot.getVersion(engine) < 0;
validate(engine, snapshot, isNewTable);

Metadata metadata = snapshot.getMetadata();
boolean shouldUpdateMetadata = false;
if (tableProperties.isPresent()) {
Map<String, String> validatedProperties =
TableConfig.validateProperties(tableProperties.get());
Map<String, String> newProperties =
metadata.filterOutUnchangedProperties(validatedProperties);
if (!newProperties.isEmpty()) {
shouldUpdateMetadata = true;
metadata = metadata.withNewConfiguration(newProperties);
}
}

return new TransactionImpl(
isNewTable,
table.getDataPath(),
Expand All @@ -112,8 +132,9 @@ public Transaction build(Engine engine) {
engineInfo,
operation,
snapshot.getProtocol(),
snapshot.getMetadata(),
setTxnOpt);
metadata,
setTxnOpt,
shouldUpdateMetadata);
}

/**
Expand Down Expand Up @@ -203,7 +224,7 @@ private Metadata getInitialMetadata() {
schema.get(), /* schema */
stringArrayValue(partitionColumnsCasePreserving), /* partitionColumns */
Optional.of(currentTimeMillis), /* createdTime */
stringStringMapValue(Collections.emptyMap()) /* configuration */
stringStringMapValue(Collections.emptyMap())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class TransactionImpl
private final Path logPath;
private final Protocol protocol;
private final Metadata metadata;
private final boolean shouldUpdateMetadata;
private final SnapshotImpl readSnapshot;
private final Optional<SetTransaction> setTxnOpt;

Expand All @@ -82,7 +83,8 @@ public TransactionImpl(
Operation operation,
Protocol protocol,
Metadata metadata,
Optional<SetTransaction> setTxnOpt) {
Optional<SetTransaction> setTxnOpt,
boolean shouldUpdateMetadata) {
this.isNewTable = isNewTable;
this.dataPath = dataPath;
this.logPath = logPath;
Expand All @@ -92,6 +94,7 @@ public TransactionImpl(
this.protocol = protocol;
this.metadata = metadata;
this.setTxnOpt = setTxnOpt;
this.shouldUpdateMetadata = shouldUpdateMetadata;
}

@Override
Expand Down Expand Up @@ -151,9 +154,11 @@ private TransactionCommitResult doCommit(
throws FileAlreadyExistsException {
List<Row> metadataActions = new ArrayList<>();
metadataActions.add(createCommitInfoSingleAction(generateCommitAction()));
if (shouldUpdateMetadata || isNewTable) {
metadataActions.add(createMetadataSingleAction(metadata.toRow()));
}
if (isNewTable) {
// In the future, we need to add metadata and action when there are any changes to them.
metadataActions.add(createMetadataSingleAction(metadata.toRow()));
metadataActions.add(createProtocolSingleAction(protocol.toRow()));
}
setTxnOpt.ifPresent(setTxn -> metadataActions.add(createTxnSingleAction(setTxn.toRow())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,22 @@ public Metadata(
.collect(Collectors.toList())));
}

/**
 * Creates a copy of this metadata whose configuration is the current configuration overlaid
 * with the given entries. All other fields are carried over as they are.
 *
 * @param configuration entries to add to, or overwrite in, the current configuration
 * @return a new {@link Metadata} holding the merged configuration
 */
public Metadata withNewConfiguration(Map<String, String> configuration) {
    // Start from the existing entries, then let the given entries win on key collisions.
    final Map<String, String> merged = new HashMap<>();
    merged.putAll(getConfiguration());
    merged.putAll(configuration);

    return new Metadata(
        id,
        name,
        description,
        format,
        schemaString,
        schema,
        partitionColumns,
        createdTime,
        VectorUtils.stringStringMapValue(merged));
}

public String getSchemaString() {
return schemaString;
}
Expand Down Expand Up @@ -164,6 +180,21 @@ public Map<String, String> getConfiguration() {
return Collections.unmodifiableMap(configuration.get());
}

/**
 * Drops every given key-value pair that exactly matches an entry of the current configuration,
 * keeping only the pairs whose key is absent or whose value differs.
 *
 * @param newProperties the new properties to be filtered
 *
 * @return the properties that are new or changed relative to the current configuration
 */
public Map<String, String> filterOutUnchangedProperties(Map<String, String> newProperties) {
    final Map<String, String> current = getConfiguration();
    return newProperties.entrySet().stream()
        .filter(candidate -> {
            String name = candidate.getKey();
            boolean unchanged =
                current.containsKey(name) && current.get(name).equals(candidate.getValue());
            return !unchanged;
        })
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

/**
* Encode as a {@link Row} object with the schema {@link Metadata#FULL_SCHEMA}.
*
Expand Down
Loading

0 comments on commit 0cd33a1

Please sign in to comment.