diff --git a/ams/api/pom.xml b/ams/api/pom.xml
index 19bc0b5db4..15c7e6823d 100644
--- a/ams/api/pom.xml
+++ b/ams/api/pom.xml
@@ -99,6 +99,16 @@
${curator.version}test
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ 2.10.2
+ provided
+
+
+ org.apache.commons
+ commons-lang3
+
diff --git a/ams/api/src/main/java/com/netease/arctic/ams/api/Action.java b/ams/api/src/main/java/com/netease/arctic/ams/api/Action.java
new file mode 100644
index 0000000000..058ea11d0c
--- /dev/null
+++ b/ams/api/src/main/java/com/netease/arctic/ams/api/Action.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.ams.api;
+
+import com.netease.arctic.ams.api.process.TableProcessState;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+import java.util.Collections;
+import java.util.Set;
+
+public enum Action {
+ MINOR_OPTIMIZING("minor-optimizing", 0),
+ MAJOR_OPTIMIZING("major-optimizing", 1),
+ EXTERNAL_OPTIMIZING("external-optimizing", 2),
+ // refresh all metadata including snapshots, watermark, configurations, schema, etc.
+ REFRESH_METADATA("refresh-metadata", 10),
+ // expire all metadata and data files necessarily.
+ EXPIRE_DATA("expire-data", 11),
+ DELETE_ORPHAN_FILES("delete-orphan-files", 12),
+ SYNC_HIVE_COMMIT("sync-hive-commit", 13);
+
+ /**
+ * Arbitrary actions are actions that can be handled by a single optimizer. The processes they
+ * related to like refreshing, expiring, cleaning and syncing all share the same basic
+ * implementations which are {@link com.netease.arctic.ams.api.process.TableProcess} and {@link
+ * TableProcessState} and they won't have any spitted stages like optimizing processes(plan,
+ * execute, commit), so they can be easily triggered and managed. If you want to add a new action
+ * which is handled stand-alone, you should add it to this set, and you would find it's easy to
+ * implement the process and state.
+ */
+ public static final Set ARBITRARY_ACTIONS =
+ Collections.unmodifiableSet(
+ Sets.newHashSet(REFRESH_METADATA, EXPIRE_DATA, DELETE_ORPHAN_FILES, SYNC_HIVE_COMMIT));
+
+ public static boolean isArbitrary(Action action) {
+ return ARBITRARY_ACTIONS.contains(action);
+ }
+
+ private final String description;
+ private final int code;
+
+ Action(String description, int dbValue) {
+ this.description = description;
+ this.code = dbValue;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public static Action of(int code) {
+ for (Action action : Action.values()) {
+ if (action.code == code) {
+ return action;
+ }
+ }
+ throw new IllegalArgumentException("No action with code: " + code);
+ }
+}
diff --git a/ams/api/src/main/java/com/netease/arctic/ams/api/ServerTableIdentifier.java b/ams/api/src/main/java/com/netease/arctic/ams/api/ServerTableIdentifier.java
new file mode 100644
index 0000000000..a8de222c5e
--- /dev/null
+++ b/ams/api/src/main/java/com/netease/arctic/ams/api/ServerTableIdentifier.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.ams.api;
+
+import java.util.Objects;
+
+/** Server-side table identifier containing server-side id and table format. */
+public class ServerTableIdentifier {
+
+ private Long id;
+ private String catalog;
+ private String database;
+ private String tableName;
+ private TableFormat format;
+
+ // used by the MyBatis framework.
+ private ServerTableIdentifier() {}
+
+ private ServerTableIdentifier(TableIdentifier tableIdentifier, TableFormat format) {
+ this.catalog = tableIdentifier.getCatalog();
+ this.database = tableIdentifier.getDatabase();
+ this.tableName = tableIdentifier.getTableName();
+ this.format = format;
+ }
+
+ private ServerTableIdentifier(
+ String catalog, String database, String tableName, TableFormat format) {
+ this.catalog = catalog;
+ this.database = database;
+ this.tableName = tableName;
+ this.format = format;
+ }
+
+ private ServerTableIdentifier(
+ Long id, String catalog, String database, String tableName, TableFormat format) {
+ this.id = id;
+ this.catalog = catalog;
+ this.database = database;
+ this.tableName = tableName;
+ this.format = format;
+ }
+
+ public Long getId() {
+ return id;
+ }
+
+ public String getCatalog() {
+ return catalog;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public TableFormat getFormat() {
+ return this.format;
+ }
+
+ public void setId(Long id) {
+ this.id = id;
+ }
+
+ public void setCatalog(String catalog) {
+ this.catalog = catalog;
+ }
+
+ public void setDatabase(String database) {
+ this.database = database;
+ }
+
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public void setFormat(TableFormat format) {
+ this.format = format;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ServerTableIdentifier that = (ServerTableIdentifier) o;
+ return Objects.equals(id, that.id)
+ && Objects.equals(catalog, that.catalog)
+ && Objects.equals(database, that.database)
+ && Objects.equals(tableName, that.tableName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, catalog, database, tableName);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s.%s.%s(tableId=%d)", catalog, database, tableName, id);
+ }
+
+ public static ServerTableIdentifier of(TableIdentifier tableIdentifier, TableFormat format) {
+ return new ServerTableIdentifier(tableIdentifier, format);
+ }
+
+ public static ServerTableIdentifier of(
+ String catalog, String database, String tableName, TableFormat format) {
+ return new ServerTableIdentifier(catalog, database, tableName, format);
+ }
+
+ public static ServerTableIdentifier of(
+ Long id, String catalog, String database, String tableName, TableFormat format) {
+ return new ServerTableIdentifier(id, catalog, database, tableName, format);
+ }
+
+ public TableIdentifier getIdentifier() {
+ return new TableIdentifier(catalog, database, tableName);
+ }
+}
diff --git a/ams/api/src/main/java/com/netease/arctic/ams/api/StateField.java b/ams/api/src/main/java/com/netease/arctic/ams/api/StateField.java
new file mode 100644
index 0000000000..56d6c12aa2
--- /dev/null
+++ b/ams/api/src/main/java/com/netease/arctic/ams/api/StateField.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.ams.api;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+/**
+ * This annotation is used to mark fields in a Persistent Object that will be kept consistency by
+ * Amoro framework.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+public @interface StateField {}
diff --git a/ams/api/src/main/java/com/netease/arctic/ams/api/TableRuntime.java b/ams/api/src/main/java/com/netease/arctic/ams/api/TableRuntime.java
new file mode 100644
index 0000000000..bcd723ad86
--- /dev/null
+++ b/ams/api/src/main/java/com/netease/arctic/ams/api/TableRuntime.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.ams.api;
+
+import com.netease.arctic.ams.api.config.TableConfiguration;
+import com.netease.arctic.ams.api.process.OptimizingState;
+import com.netease.arctic.ams.api.process.TableProcessState;
+
+import java.util.List;
+
+/**
+ * TableRuntime is the key interface for the AMS framework to interact with the table. Typically, it
+ * is used to get the table's configuration, process states, and table identifier. The key usage is
+ * {@link com.netease.arctic.ams.api.process.ProcessFactory} to create and recover Process.
+ */
+public interface TableRuntime {
+
+ /**
+ * Get the list of optimizing process states. Normally, the list contains one default optimizing
+ * state at least. There could be more than one states if multiple optimizing processes are
+ * running.
+ *
+ * @return the list of optimizing process states
+ */
+ List getOptimizingStates();
+
+ /**
+ * Get the list of arbitrary process states. One arbitrary state belongs to one arbitrary process
+ * related to one {@link com.netease.arctic.ams.api.Action#ARBITRARY_ACTIONS}. There could be more
+ * than one arbitrary states depending on scheduler implementation.
+ *
+ * @return the list of arbitrary process states
+ */
+ List getArbitraryStates();
+
+ /**
+ * Get the table identifier containing server side id and table format.
+ *
+ * @return the table identifier
+ */
+ ServerTableIdentifier getTableIdentifier();
+
+ /**
+ * Get the table configuration.
+ *
+ * @return the table configuration
+ */
+ TableConfiguration getTableConfiguration();
+}
diff --git a/ams/api/src/main/java/com/netease/arctic/ams/api/config/DataExpirationConfig.java b/ams/api/src/main/java/com/netease/arctic/ams/api/config/DataExpirationConfig.java
new file mode 100644
index 0000000000..9f77d42d90
--- /dev/null
+++ b/ams/api/src/main/java/com/netease/arctic/ams/api/config/DataExpirationConfig.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.ams.api.config;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Locale;
+import java.util.Set;
+
+/** Data expiration configuration. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DataExpirationConfig {
+ // data-expire.enabled
+ private boolean enabled;
+ // data-expire.field
+ private String expirationField;
+ // data-expire.level
+ private ExpireLevel expirationLevel;
+ // data-expire.retention-time
+ private long retentionTime;
+ // data-expire.datetime-string-pattern
+ private String dateTimePattern;
+ // data-expire.datetime-number-format
+ private String numberDateFormat;
+ // data-expire.since
+ private Since since;
+
+ @VisibleForTesting
+ public enum ExpireLevel {
+ PARTITION,
+ FILE;
+
+ public static ExpireLevel fromString(String level) {
+ Preconditions.checkArgument(null != level, "Invalid level type: null");
+ try {
+ return ExpireLevel.valueOf(level.toUpperCase(Locale.ENGLISH));
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException(String.format("Invalid level type: %s", level), e);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public enum Since {
+ LATEST_SNAPSHOT,
+ CURRENT_TIMESTAMP;
+
+ public static Since fromString(String since) {
+ Preconditions.checkArgument(null != since, "data-expire.since is invalid: null");
+ try {
+ return Since.valueOf(since.toUpperCase(Locale.ENGLISH));
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException(
+ String.format("Unable to expire data since: %s", since), e);
+ }
+ }
+ }
+
+ public static final Set FIELD_TYPES =
+ Sets.newHashSet(Type.TypeID.TIMESTAMP, Type.TypeID.STRING, Type.TypeID.LONG);
+
+ private static final Logger LOG = LoggerFactory.getLogger(DataExpirationConfig.class);
+
+ public DataExpirationConfig() {}
+
+ public DataExpirationConfig(
+ boolean enabled,
+ String expirationField,
+ ExpireLevel expirationLevel,
+ long retentionTime,
+ String dateTimePattern,
+ String numberDateFormat,
+ Since since) {
+ this.enabled = enabled;
+ this.expirationField = expirationField;
+ this.expirationLevel = expirationLevel;
+ this.retentionTime = retentionTime;
+ this.dateTimePattern = dateTimePattern;
+ this.numberDateFormat = numberDateFormat;
+ this.since = since;
+ }
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public DataExpirationConfig setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ return this;
+ }
+
+ public String getExpirationField() {
+ return expirationField;
+ }
+
+ public DataExpirationConfig setExpirationField(String expirationField) {
+ this.expirationField = expirationField;
+ return this;
+ }
+
+ public ExpireLevel getExpirationLevel() {
+ return expirationLevel;
+ }
+
+ public DataExpirationConfig setExpirationLevel(ExpireLevel expirationLevel) {
+ this.expirationLevel = expirationLevel;
+ return this;
+ }
+
+ public long getRetentionTime() {
+ return retentionTime;
+ }
+
+ public DataExpirationConfig setRetentionTime(long retentionTime) {
+ this.retentionTime = retentionTime;
+ return this;
+ }
+
+ public String getDateTimePattern() {
+ return dateTimePattern;
+ }
+
+ public DataExpirationConfig setDateTimePattern(String dateTimePattern) {
+ this.dateTimePattern = dateTimePattern;
+ return this;
+ }
+
+ public String getNumberDateFormat() {
+ return numberDateFormat;
+ }
+
+ public DataExpirationConfig setNumberDateFormat(String numberDateFormat) {
+ this.numberDateFormat = numberDateFormat;
+ return this;
+ }
+
+ public Since getSince() {
+ return since;
+ }
+
+ public DataExpirationConfig setSince(Since since) {
+ this.since = since;
+ return this;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof DataExpirationConfig)) {
+ return false;
+ }
+ DataExpirationConfig config = (DataExpirationConfig) o;
+ return enabled == config.enabled
+ && retentionTime == config.retentionTime
+ && Objects.equal(expirationField, config.expirationField)
+ && expirationLevel == config.expirationLevel
+ && Objects.equal(dateTimePattern, config.dateTimePattern)
+ && Objects.equal(numberDateFormat, config.numberDateFormat)
+ && since == config.since;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(
+ enabled,
+ expirationField,
+ expirationLevel,
+ retentionTime,
+ dateTimePattern,
+ numberDateFormat,
+ since);
+ }
+
+ public boolean isValid(Types.NestedField field, String name) {
+ return isEnabled()
+ && getRetentionTime() > 0
+ && validateExpirationField(field, name, getExpirationField());
+ }
+
+ private boolean validateExpirationField(
+ Types.NestedField field, String name, String expirationField) {
+ if (StringUtils.isBlank(expirationField) || null == field) {
+ LOG.warn(
+ String.format(
+ "Field(%s) used to determine data expiration is illegal for table(%s)",
+ expirationField, name));
+ return false;
+ }
+ Type.TypeID typeID = field.type().typeId();
+ if (!DataExpirationConfig.FIELD_TYPES.contains(typeID)) {
+ LOG.warn(
+ String.format(
+ "Table(%s) field(%s) type(%s) is not supported for data expiration, please use the "
+ + "following types: %s",
+ name,
+ expirationField,
+ typeID.name(),
+ StringUtils.join(DataExpirationConfig.FIELD_TYPES, ", ")));
+ return false;
+ }
+
+ return true;
+ }
+}
diff --git a/ams/api/src/main/java/com/netease/arctic/ams/api/config/OptimizingConfig.java b/ams/api/src/main/java/com/netease/arctic/ams/api/config/OptimizingConfig.java
new file mode 100644
index 0000000000..c8cb06c890
--- /dev/null
+++ b/ams/api/src/main/java/com/netease/arctic/ams/api/config/OptimizingConfig.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.ams.api.config;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+
+/** Configuration for optimizing process scheduling and executing. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class OptimizingConfig {
+
+ // self-optimizing.enabled
+ private boolean enabled;
+ // self-optimizing.quota
+ private double targetQuota;
+ // self-optimizing.group
+ private String optimizerGroup;
+ // self-optimizing.execute.num-retries
+ private int maxExecuteRetryCount;
+ // self-optimizing.commit.num-retries
+ private int maxCommitRetryCount;
+ // self-optimizing.target-size
+ private long targetSize;
+ // self-optimizing.max-task-size-bytes
+ private long maxTaskSize;
+ // self-optimizing.max-file-count
+ private int maxFileCount;
+ // read.split.open-file-cost
+ private long openFileCost;
+ // self-optimizing.fragment-ratio
+ private int fragmentRatio;
+ // self-optimizing.min-target-size-ratio
+ private double minTargetSizeRatio;
+ // self-optimizing.minor.trigger.file-count
+ private int minorLeastFileCount;
+ // self-optimizing.minor.trigger.interval
+ private int minorLeastInterval;
+ // self-optimizing.major.trigger.duplicate-ratio
+ private double majorDuplicateRatio;
+ // self-optimizing.full.trigger.interval
+ private int fullTriggerInterval;
+ // self-optimizing.full.rewrite-all-files
+ private boolean fullRewriteAllFiles;
+ // base.file-index.hash-bucket
+ private int baseHashBucket;
+ // base.refresh-interval
+ private long baseRefreshInterval;
+ // base.hive.refresh-interval
+ private long hiveRefreshInterval;
+ // self-optimizing.min-plan-interval
+ private long minPlanInterval;
+
+ public OptimizingConfig() {}
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public OptimizingConfig setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ return this;
+ }
+
+ public int getMaxCommitRetryCount() {
+ return maxCommitRetryCount;
+ }
+
+ public OptimizingConfig setMaxCommitRetryCount(int maxCommitRetryCount) {
+ this.maxCommitRetryCount = maxCommitRetryCount;
+ return this;
+ }
+
+ public double getTargetQuota() {
+ return targetQuota;
+ }
+
+ public OptimizingConfig setTargetQuota(double targetQuota) {
+ this.targetQuota = targetQuota;
+ return this;
+ }
+
+ public OptimizingConfig setMinPlanInterval(long minPlanInterval) {
+ this.minPlanInterval = minPlanInterval;
+ return this;
+ }
+
+ public long getMinPlanInterval() {
+ return minPlanInterval;
+ }
+
+ public String getOptimizerGroup() {
+ return optimizerGroup;
+ }
+
+ public OptimizingConfig setOptimizerGroup(String optimizerGroup) {
+ this.optimizerGroup = optimizerGroup;
+ return this;
+ }
+
+ public int getMaxExecuteRetryCount() {
+ return maxExecuteRetryCount;
+ }
+
+ public OptimizingConfig setMaxExecuteRetryCount(int maxExecuteRetryCount) {
+ this.maxExecuteRetryCount = maxExecuteRetryCount;
+ return this;
+ }
+
+ public long getTargetSize() {
+ return targetSize;
+ }
+
+ public OptimizingConfig setTargetSize(long targetSize) {
+ this.targetSize = targetSize;
+ return this;
+ }
+
+ public long getMaxTaskSize() {
+ return maxTaskSize;
+ }
+
+ public OptimizingConfig setMaxTaskSize(long maxTaskSize) {
+ this.maxTaskSize = maxTaskSize;
+ return this;
+ }
+
+ public int getMaxFileCount() {
+ return maxFileCount;
+ }
+
+ public OptimizingConfig setMaxFileCount(int maxFileCount) {
+ this.maxFileCount = maxFileCount;
+ return this;
+ }
+
+ public long getOpenFileCost() {
+ return openFileCost;
+ }
+
+ public OptimizingConfig setOpenFileCost(long openFileCost) {
+ this.openFileCost = openFileCost;
+ return this;
+ }
+
+ public int getFragmentRatio() {
+ return fragmentRatio;
+ }
+
+ public double getMinTargetSizeRatio() {
+ return minTargetSizeRatio;
+ }
+
+ public long maxFragmentSize() {
+ return targetSize / fragmentRatio;
+ }
+
+ public long maxDuplicateSize() {
+ return (long) (maxFragmentSize() * majorDuplicateRatio);
+ }
+
+ public OptimizingConfig setFragmentRatio(int fragmentRatio) {
+ this.fragmentRatio = fragmentRatio;
+ return this;
+ }
+
+ public OptimizingConfig setMinTargetSizeRatio(double minTargetSizeRatio) {
+ this.minTargetSizeRatio = minTargetSizeRatio;
+ return this;
+ }
+
+ public int getMinorLeastFileCount() {
+ return minorLeastFileCount;
+ }
+
+ public OptimizingConfig setMinorLeastFileCount(int minorLeastFileCount) {
+ this.minorLeastFileCount = minorLeastFileCount;
+ return this;
+ }
+
+ public int getMinorLeastInterval() {
+ return minorLeastInterval;
+ }
+
+ public OptimizingConfig setMinorLeastInterval(int minorLeastInterval) {
+ this.minorLeastInterval = minorLeastInterval;
+ return this;
+ }
+
+ public double getMajorDuplicateRatio() {
+ return majorDuplicateRatio;
+ }
+
+ public OptimizingConfig setMajorDuplicateRatio(double majorDuplicateRatio) {
+ this.majorDuplicateRatio = majorDuplicateRatio;
+ return this;
+ }
+
+ public int getFullTriggerInterval() {
+ return fullTriggerInterval;
+ }
+
+ public OptimizingConfig setFullTriggerInterval(int fullTriggerInterval) {
+ this.fullTriggerInterval = fullTriggerInterval;
+ return this;
+ }
+
+ public boolean isFullRewriteAllFiles() {
+ return fullRewriteAllFiles;
+ }
+
+ public OptimizingConfig setFullRewriteAllFiles(boolean fullRewriteAllFiles) {
+ this.fullRewriteAllFiles = fullRewriteAllFiles;
+ return this;
+ }
+
+ public int getBaseHashBucket() {
+ return baseHashBucket;
+ }
+
+ public OptimizingConfig setBaseHashBucket(int baseHashBucket) {
+ this.baseHashBucket = baseHashBucket;
+ return this;
+ }
+
+ public long getBaseRefreshInterval() {
+ return baseRefreshInterval;
+ }
+
+ public OptimizingConfig setBaseRefreshInterval(long baseRefreshInterval) {
+ this.baseRefreshInterval = baseRefreshInterval;
+ return this;
+ }
+
+ public long getHiveRefreshInterval() {
+ return hiveRefreshInterval;
+ }
+
+ public OptimizingConfig setHiveRefreshInterval(long hiveRefreshInterval) {
+ this.hiveRefreshInterval = hiveRefreshInterval;
+ return this;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ OptimizingConfig that = (OptimizingConfig) o;
+ return enabled == that.enabled
+ && Double.compare(that.targetQuota, targetQuota) == 0
+ && maxExecuteRetryCount == that.maxExecuteRetryCount
+ && maxCommitRetryCount == that.maxCommitRetryCount
+ && targetSize == that.targetSize
+ && maxTaskSize == that.maxTaskSize
+ && maxFileCount == that.maxFileCount
+ && openFileCost == that.openFileCost
+ && fragmentRatio == that.fragmentRatio
+ && minorLeastFileCount == that.minorLeastFileCount
+ && minorLeastInterval == that.minorLeastInterval
+ && Double.compare(that.majorDuplicateRatio, majorDuplicateRatio) == 0
+ && fullTriggerInterval == that.fullTriggerInterval
+ && fullRewriteAllFiles == that.fullRewriteAllFiles
+ && baseHashBucket == that.baseHashBucket
+ && baseRefreshInterval == that.baseRefreshInterval
+ && hiveRefreshInterval == that.hiveRefreshInterval
+ && Objects.equal(optimizerGroup, that.optimizerGroup);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(
+ enabled,
+ targetQuota,
+ optimizerGroup,
+ maxExecuteRetryCount,
+ maxCommitRetryCount,
+ targetSize,
+ maxTaskSize,
+ maxFileCount,
+ openFileCost,
+ fragmentRatio,
+ minorLeastFileCount,
+ minorLeastInterval,
+ majorDuplicateRatio,
+ fullTriggerInterval,
+ fullRewriteAllFiles,
+ baseHashBucket,
+ baseRefreshInterval,
+ hiveRefreshInterval);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("enabled", enabled)
+ .add("targetQuota", targetQuota)
+ .add("optimizerGroup", optimizerGroup)
+ .add("maxExecuteRetryCount", maxExecuteRetryCount)
+ .add("maxCommitRetryCount", maxCommitRetryCount)
+ .add("targetSize", targetSize)
+ .add("maxTaskSize", maxTaskSize)
+ .add("maxFileCount", maxFileCount)
+ .add("openFileCost", openFileCost)
+ .add("fragmentRatio", fragmentRatio)
+ .add("minorLeastFileCount", minorLeastFileCount)
+ .add("minorLeastInterval", minorLeastInterval)
+ .add("majorDuplicateRatio", majorDuplicateRatio)
+ .add("fullTriggerInterval", fullTriggerInterval)
+ .add("fullRewriteAllFiles", fullRewriteAllFiles)
+ .add("baseHashBucket", baseHashBucket)
+ .add("baseRefreshInterval", baseRefreshInterval)
+ .add("hiveRefreshInterval", hiveRefreshInterval)
+ .toString();
+ }
+
+ public long getRefreshMinInterval() {
+ return Math.min(baseRefreshInterval, hiveRefreshInterval);
+ }
+}
diff --git a/ams/api/src/main/java/com/netease/arctic/ams/api/config/TableConfiguration.java b/ams/api/src/main/java/com/netease/arctic/ams/api/config/TableConfiguration.java
new file mode 100644
index 0000000000..785ed798c0
--- /dev/null
+++ b/ams/api/src/main/java/com/netease/arctic/ams/api/config/TableConfiguration.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.ams.api.config;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.netease.arctic.ams.api.Action;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Configuration for a table, containing {@link OptimizingConfig}, {@link DataExpirationConfig}, and
+ * {@link TagConfiguration}.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class TableConfiguration {
+
+ // The maximum retry count for executing process.
+ private int maxExecuteRetryCount;
+ // Whether to expire snapshots.
+ private boolean expireOperationEnabled;
+ // The time to live for snapshots.
+ private long snapshotTTLMinutes;
+ // The time to live for change store data.
+ private long changeDataTTLMinutes;
+ // Whether to clean orphaned files.
+ private boolean cleanOrphanEnabled;
+ // The time to live for orphaned files.
+ private long orphanExistingMinutes;
+ // Whether to delete dangling delete files.
+ private boolean deleteDanglingDeleteFilesEnabled;
+ // The optimizing configuration.
+ private OptimizingConfig optimizingConfig;
+ // The data expiration configuration.
+ private DataExpirationConfig expiringDataConfig;
+ // The tag configuration.
+ private TagConfiguration tagConfiguration;
+
+ public TableConfiguration() {}
+
+ public Map getActionMinIntervals(Set actions) {
+ Map minIntervals = new HashMap<>();
+ if (actions.contains(Action.REFRESH_METADATA)) {
+ minIntervals.put(Action.REFRESH_METADATA, optimizingConfig.getRefreshMinInterval());
+ } else if (actions.contains(Action.EXPIRE_DATA) && isExpireOperationEnabled()) {
+ minIntervals.put(Action.EXPIRE_DATA, getSnapshotTTLMinutes() * 60 * 1000);
+ } else if (actions.contains(Action.DELETE_ORPHAN_FILES)) {
+ minIntervals.put(Action.DELETE_ORPHAN_FILES, getOrphanExistingMinutes() * 60 * 1000);
+ }
+ return minIntervals;
+ }
+
+ /**
+ * Get the maximum retry count for executing process.
+ *
+ * @return the maximum retry count
+ */
+ public int getMaxExecuteRetryCount() {
+ return maxExecuteRetryCount;
+ }
+
+ public boolean isExpireOperationEnabled() {
+ return expireOperationEnabled;
+ }
+
+ public long getSnapshotTTLMinutes() {
+ return snapshotTTLMinutes;
+ }
+
+ public long getChangeDataTTLMinutes() {
+ return changeDataTTLMinutes;
+ }
+
+ public boolean isCleanOrphanEnabled() {
+ return cleanOrphanEnabled;
+ }
+
+ public long getOrphanExistingMinutes() {
+ return orphanExistingMinutes;
+ }
+
+ public OptimizingConfig getOptimizingConfig() {
+ return optimizingConfig;
+ }
+
+ public TableConfiguration setMaxExecuteRetryCount(int maxExecuteRetryCount) {
+ this.maxExecuteRetryCount = maxExecuteRetryCount;
+ return this;
+ }
+
+ public TableConfiguration setOptimizingConfig(OptimizingConfig optimizingConfig) {
+ this.optimizingConfig = optimizingConfig;
+ return this;
+ }
+
+ public TableConfiguration setExpireOperationEnabled(boolean expireOperationEnabled) {
+ this.expireOperationEnabled = expireOperationEnabled;
+ return this;
+ }
+
+ public TableConfiguration setSnapshotTTLMinutes(long snapshotTTLMinutes) {
+ this.snapshotTTLMinutes = snapshotTTLMinutes;
+ return this;
+ }
+
+ public TableConfiguration setChangeDataTTLMinutes(long changeDataTTLMinutes) {
+ this.changeDataTTLMinutes = changeDataTTLMinutes;
+ return this;
+ }
+
+ public TableConfiguration setCleanOrphanEnabled(boolean cleanOrphanEnabled) {
+ this.cleanOrphanEnabled = cleanOrphanEnabled;
+ return this;
+ }
+
+ public TableConfiguration setOrphanExistingMinutes(long orphanExistingMinutes) {
+ this.orphanExistingMinutes = orphanExistingMinutes;
+ return this;
+ }
+
+ public boolean isDeleteDanglingDeleteFilesEnabled() {
+ return deleteDanglingDeleteFilesEnabled;
+ }
+
+ public TableConfiguration setDeleteDanglingDeleteFilesEnabled(
+ boolean deleteDanglingDeleteFilesEnabled) {
+ this.deleteDanglingDeleteFilesEnabled = deleteDanglingDeleteFilesEnabled;
+ return this;
+ }
+
+ public DataExpirationConfig getExpiringDataConfig() {
+ return Optional.ofNullable(expiringDataConfig).orElse(new DataExpirationConfig());
+ }
+
+ public TableConfiguration setExpiringDataConfig(DataExpirationConfig expiringDataConfig) {
+ this.expiringDataConfig = expiringDataConfig;
+ return this;
+ }
+
+ public TagConfiguration getTagConfiguration() {
+ return Optional.ofNullable(tagConfiguration).orElse(new TagConfiguration());
+ }
+
+ public TableConfiguration setTagConfiguration(TagConfiguration tagConfiguration) {
+ this.tagConfiguration = tagConfiguration;
+ return this;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ TableConfiguration that = (TableConfiguration) o;
+ return expireOperationEnabled == that.expireOperationEnabled
+ && snapshotTTLMinutes == that.snapshotTTLMinutes
+ && changeDataTTLMinutes == that.changeDataTTLMinutes
+ && cleanOrphanEnabled == that.cleanOrphanEnabled
+ && orphanExistingMinutes == that.orphanExistingMinutes
+ && deleteDanglingDeleteFilesEnabled == that.deleteDanglingDeleteFilesEnabled
+ && Objects.equal(optimizingConfig, that.optimizingConfig)
+ && Objects.equal(expiringDataConfig, that.expiringDataConfig)
+ && Objects.equal(tagConfiguration, that.tagConfiguration);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(
+ expireOperationEnabled,
+ snapshotTTLMinutes,
+ changeDataTTLMinutes,
+ cleanOrphanEnabled,
+ orphanExistingMinutes,
+ deleteDanglingDeleteFilesEnabled,
+ optimizingConfig,
+ expiringDataConfig,
+ tagConfiguration);
+ }
+}
diff --git a/ams/api/src/main/java/com/netease/arctic/ams/api/config/TagConfiguration.java b/ams/api/src/main/java/com/netease/arctic/ams/api/config/TagConfiguration.java
new file mode 100644
index 0000000000..1c53952ea4
--- /dev/null
+++ b/ams/api/src/main/java/com/netease/arctic/ams/api/config/TagConfiguration.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.ams.api.config;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+
+/** Configuration for auto creating tags. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class TagConfiguration {
+ // tag.auto-create.enabled
+ private boolean autoCreateTag = false;
+ // tag.auto-create.daily.tag-format
+ private String tagFormat;
+ // tag.auto-create.trigger.period
+ private Period triggerPeriod;
+ // tag.auto-create.trigger.offset.minutes
+ private int triggerOffsetMinutes;
+ // tag.auto-create.trigger.max-delay.minutes
+ private int maxDelayMinutes;
+
+ /** The interval for periodically triggering creating tags */
+ public enum Period {
+ DAILY("daily") {
+ @Override
+ public long getTagTriggerTime(LocalDateTime checkTime, int triggerOffsetMinutes) {
+ LocalTime offsetTime = LocalTime.ofSecondOfDay(triggerOffsetMinutes * 60L);
+ LocalDateTime triggerTime = LocalDateTime.of(checkTime.toLocalDate(), offsetTime);
+ return triggerTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
+ }
+ };
+
+ private final String propertyName;
+
+ Period(String propertyName) {
+ this.propertyName = propertyName;
+ }
+
+ public String propertyName() {
+ return propertyName;
+ }
+
+ /**
+ * Obtain the trigger time for creating a tag, which is the idea time of the last tag before the
+ * check time.
+ *
+ *
For example, when creating a daily tag, the check time is 2022-08-08 11:00:00 and the
+ * offset is set to be 5 min, the idea trigger time is 2022-08-08 00:05:00.
+ */
+ public abstract long getTagTriggerTime(LocalDateTime checkTime, int triggerOffsetMinutes);
+ }
+
+ public boolean isAutoCreateTag() {
+ return autoCreateTag;
+ }
+
+ public void setAutoCreateTag(boolean autoCreateTag) {
+ this.autoCreateTag = autoCreateTag;
+ }
+
+ public String getTagFormat() {
+ return tagFormat;
+ }
+
+ public void setTagFormat(String tagFormat) {
+ this.tagFormat = tagFormat;
+ }
+
+ public Period getTriggerPeriod() {
+ return triggerPeriod;
+ }
+
+ public void setTriggerPeriod(Period triggerPeriod) {
+ this.triggerPeriod = triggerPeriod;
+ }
+
+ public int getTriggerOffsetMinutes() {
+ return triggerOffsetMinutes;
+ }
+
+ public void setTriggerOffsetMinutes(int triggerOffsetMinutes) {
+ this.triggerOffsetMinutes = triggerOffsetMinutes;
+ }
+
+ public int getMaxDelayMinutes() {
+ return maxDelayMinutes;
+ }
+
+ public void setMaxDelayMinutes(int maxDelayMinutes) {
+ this.maxDelayMinutes = maxDelayMinutes;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ TagConfiguration that = (TagConfiguration) o;
+ return autoCreateTag == that.autoCreateTag
+ && triggerOffsetMinutes == that.triggerOffsetMinutes
+ && maxDelayMinutes == that.maxDelayMinutes
+ && Objects.equal(tagFormat, that.tagFormat)
+ && triggerPeriod == that.triggerPeriod;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(
+ autoCreateTag, tagFormat, triggerPeriod, triggerOffsetMinutes, maxDelayMinutes);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("autoCreateTag", autoCreateTag)
+ .add("tagFormat", tagFormat)
+ .add("triggerPeriod", triggerPeriod)
+ .add("triggerOffsetMinutes", triggerOffsetMinutes)
+ .add("maxDelayMinutes", maxDelayMinutes)
+ .toString();
+ }
+}
diff --git a/ams/api/src/main/java/com/netease/arctic/ams/api/process/AmoroProcess.java b/ams/api/src/main/java/com/netease/arctic/ams/api/process/AmoroProcess.java
new file mode 100644
index 0000000000..ffba73a32c
--- /dev/null
+++ b/ams/api/src/main/java/com/netease/arctic/ams/api/process/AmoroProcess.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.ams.api.process;
+
+import com.netease.arctic.ams.api.Action;
+
+import java.util.Map;
+
+/**
+ * AmoroProcess is a process the whole lifecycle of which is managed by Amoro. AmoroProcess is
+ * submitted by user or system and handled by Amoro. AmoroProcess should be related to one single
+ * {@link Action}, which could be minor optimizing, major optimizing, external optimizing, metadata
+ * refreshing, snapshots expiring, orphaned files cleaning or hive commit sync.
+ *
+ * @param the state type of the process
+ */
+public interface AmoroProcess {
+
+ /**
+ * Submit the process to Amoro. The process will be handled by Amoro after submitted. If the
+ * process is already submitted, this method will do nothing. For external optimizing, the process
+ * will be submitted to external resources like Yarn.
+ */
+ void submit();
+
+ /**
+ * return submit future of the process. This method always returns the same future object even if
+ * submit() has not been called
+ *
+ * @return submit future of the process
+ */
+ SimpleFuture getSubmitFuture();
+
+ /**
+ * return complete future of the process. This method always returns the same future object even
+ * if submit() has not been called
+ *
+ * @return complete future of the process
+ */
+ SimpleFuture getCompleteFuture();
+
+ /**
+ * Cancel and close this process, related resources will be released. This method will block until
+ * getStatus() return CLOSED, but related resource could be released later.
+ */
+ void close();
+
+ /**
+ * Get {@link ProcessState} of the process
+ *
+ * @return the state of the process
+ */
+ T getState();
+
+ /**
+ * Get the string encoded summary of the process, this could be a simple description or a POJO
+ * encoded by JSON
+ *
+ * @return the summary of the process
+ */
+ default Map getSummary() {
+ return getState().getSummary();
+ }
+
+ /**
+ * Get {@link ProcessStatus} of the process
+ *
+ * @return the status of the process
+ */
+ default ProcessStatus getStatus() {
+ return getState().getStatus();
+ }
+
+ /**
+ * Check if the process is closed
+ *
+ * @return true if the process is closed, false otherwise
+ */
+ default boolean isClosed() {
+ return getStatus() == ProcessStatus.CLOSED;
+ }
+
+ /**
+ * Get the id of the process
+ *
+ * @return the id of the process
+ */
+ default long getId() {
+ return getState().getId();
+ }
+
+ /**
+ * Get the {@link Action} of the process
+ *
+ * @return the action of the process
+ */
+ default Action getAction() {
+ return getState().getAction();
+ }
+}
diff --git a/ams/api/src/main/java/com/netease/arctic/ams/api/process/OptimizingStage.java b/ams/api/src/main/java/com/netease/arctic/ams/api/process/OptimizingStage.java
new file mode 100644
index 0000000000..475e318f36
--- /dev/null
+++ b/ams/api/src/main/java/com/netease/arctic/ams/api/process/OptimizingStage.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.ams.api.process;
+
+/** The stage of the optimizing process. */
+public enum OptimizingStage {
+
+ /** Full optimizing executing phase */
+ FULL_OPTIMIZING("full", true),
+
+ /** Major optimizing executing phase */
+ MAJOR_OPTIMIZING("major", true),
+
+ /** Minor optimizing executing phase */
+ MINOR_OPTIMIZING("minor", true),
+
+ /** Committing phase of optimizing */
+ COMMITTING("committing", true),
+
+ /** Planning phase of optimizing */
+ PLANNING("planning", false),
+
+ /** When input data has been collected but waiting for quota available(not scheduled yet) */
+ PENDING("pending", false),
+
+ /** When waiting for input data */
+ IDLE("idle", false),
+
+ /** When the process has been scheduled but being waiting for quota available */
+ SUSPENDING("suspending", false),
+
+ /** Mainly for external process submitting to external resources */
+ SUBMITTING("submitting", false);
+
+ /** The display description of the stage. */
+ private final String displayValue;
+
+ /*
+ * Whether the stage is an optimizing executing stage.
+ */
+ private final boolean isOptimizing;
+
+ OptimizingStage(String displayValue, boolean isProcessing) {
+ this.displayValue = displayValue;
+ this.isOptimizing = isProcessing;
+ }
+
+ public boolean isOptimizing() {
+ return isOptimizing;
+ }
+
+ public String displayValue() {
+ return displayValue;
+ }
+}
diff --git a/ams/api/src/main/java/com/netease/arctic/ams/api/process/OptimizingState.java b/ams/api/src/main/java/com/netease/arctic/ams/api/process/OptimizingState.java
new file mode 100644
index 0000000000..597944748f
--- /dev/null
+++ b/ams/api/src/main/java/com/netease/arctic/ams/api/process/OptimizingState.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.ams.api.process;
+
+import com.netease.arctic.ams.api.Action;
+import com.netease.arctic.ams.api.ServerTableIdentifier;
+import com.netease.arctic.ams.api.StateField;
+
+/** The state of the optimizing process. */
+public abstract class OptimizingState extends TableProcessState {
+
+ @StateField private volatile long targetSnapshotId;
+ @StateField private volatile long watermark;
+ @StateField private volatile OptimizingStage stage;
+ @StateField private volatile long currentStageStartTime;
+
+ public OptimizingState(Action action, ServerTableIdentifier tableIdentifier) {
+ super(action, tableIdentifier);
+ }
+
+ public OptimizingState(long id, Action action, ServerTableIdentifier tableIdentifier) {
+ super(id, action, tableIdentifier);
+ }
+
+ protected void setStage(OptimizingStage stage) {
+ this.stage = stage;
+ this.currentStageStartTime = System.currentTimeMillis();
+ }
+
+ protected void setStage(OptimizingStage stage, long stageStartTime) {
+ this.stage = stage;
+ this.currentStageStartTime = stageStartTime;
+ }
+
+ protected void setTargetSnapshotId(long targetSnapshotId) {
+ this.targetSnapshotId = targetSnapshotId;
+ }
+
+ protected void setWatermark(long watermark) {
+ this.watermark = watermark;
+ }
+
+ public long getWatermark() {
+ return watermark;
+ }
+
+ public OptimizingStage getStage() {
+ return stage;
+ }
+
+ public long getTargetSnapshotId() {
+ return targetSnapshotId;
+ }
+
+ public long getCurrentStageStartTime() {
+ return currentStageStartTime;
+ }
+
+ @Override
+ public String getName() {
+ return stage.displayValue();
+ }
+}
diff --git a/ams/api/src/main/java/com/netease/arctic/ams/api/process/PendingInput.java b/ams/api/src/main/java/com/netease/arctic/ams/api/process/PendingInput.java
new file mode 100644
index 0000000000..20ab33fea9
--- /dev/null
+++ b/ams/api/src/main/java/com/netease/arctic/ams/api/process/PendingInput.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.ams.api.process;
+
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+import java.util.Set;
+
+/** Pending input for optimizing processes. */
+public class PendingInput {
+
+ private final Set partitions = Sets.newHashSet();
+
+ private int dataFileCount;
+ private long dataFileSize;
+ private int equalityDeleteFileCount;
+ private long equalityDeleteFileSize;
+ private int positionalDeleteFileCount;
+ private long positionalDeleteFileSize;
+ private long currentSnapshotId;
+ private long currentChangeSnapshotId;
+ private long currentWatermark;
+ private long currentChangeWatermark;
+ private boolean needMinorOptimizing;
+ private boolean needMajorOptimizing;
+
+ public PendingInput() {}
+
+ public PendingInput(
+ Set partitions,
+ int dataFileCount,
+ long dataFileSize,
+ int equalityDeleteFileCount,
+ int positionalDeleteFileCount,
+ long positionalDeleteFileSize,
+ long equalityDeleteFileSize,
+ long currentSnapshotId,
+ long currentChangeSnapshotId,
+ long currentWatermark,
+ long currentChangeWatermark,
+ boolean needMinorOptimizing,
+ boolean needMajorOptimizing) {
+ this.partitions.addAll(partitions);
+ this.dataFileCount = dataFileCount;
+ this.dataFileSize = dataFileSize;
+ this.equalityDeleteFileCount = equalityDeleteFileCount;
+ this.positionalDeleteFileCount = positionalDeleteFileCount;
+ this.positionalDeleteFileSize = positionalDeleteFileSize;
+ this.equalityDeleteFileSize = equalityDeleteFileSize;
+ this.currentSnapshotId = currentSnapshotId;
+ this.currentChangeSnapshotId = currentChangeSnapshotId;
+ this.currentWatermark = currentWatermark;
+ this.currentChangeWatermark = currentChangeWatermark;
+ this.needMinorOptimizing = needMinorOptimizing;
+ this.needMajorOptimizing = needMajorOptimizing;
+ }
+
+ public int getInputFileCount() {
+ return dataFileCount + equalityDeleteFileCount + positionalDeleteFileCount;
+ }
+
+ public long getInputFileSize() {
+ return dataFileSize + equalityDeleteFileSize + positionalDeleteFileSize;
+ }
+
+ public Set getPartitions() {
+ return partitions;
+ }
+
+ public int getDataFileCount() {
+ return dataFileCount;
+ }
+
+ public long getDataFileSize() {
+ return dataFileSize;
+ }
+
+ public int getEqualityDeleteFileCount() {
+ return equalityDeleteFileCount;
+ }
+
+ public int getPositionalDeleteFileCount() {
+ return positionalDeleteFileCount;
+ }
+
+ public long getPositionalDeleteFileSize() {
+ return positionalDeleteFileSize;
+ }
+
+ public long getEqualityDeleteFileSize() {
+ return equalityDeleteFileSize;
+ }
+
+ public long getCurrentSnapshotId() {
+ return currentSnapshotId;
+ }
+
+ public long getCurrentChangeSnapshotId() {
+ return currentChangeSnapshotId;
+ }
+
+ public boolean needMinorOptimizing() {
+ return needMinorOptimizing;
+ }
+
+ public boolean needMajorOptimizing() {
+ return needMajorOptimizing;
+ }
+
+ public int getFragmentFileCount() {
+ return 0;
+ }
+
+ public long getFragmentFileSize() {
+ return 0;
+ }
+
+ public int getSegmentFileCount() {
+ return 0;
+ }
+
+ public long getSegmentFileSize() {
+ return 0;
+ }
+
+ public long getWatermark() {
+ return currentWatermark;
+ }
+
+ public long getChangeWatermark() {
+ return currentChangeWatermark;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("partitions", partitions)
+ .add("dataFileCount", dataFileCount)
+ .add("dataFileSize", dataFileSize)
+ .add("equalityDeleteFileCount", equalityDeleteFileCount)
+ .add("positionalDeleteFileCount", positionalDeleteFileCount)
+ .add("positionalDeleteBytes", positionalDeleteFileSize)
+ .add("equalityDeleteBytes", equalityDeleteFileSize)
+ .toString();
+ }
+}
diff --git a/ams/api/src/main/java/com/netease/arctic/ams/api/process/ProcessFactory.java b/ams/api/src/main/java/com/netease/arctic/ams/api/process/ProcessFactory.java
new file mode 100644
index 0000000000..52628e255e
--- /dev/null
+++ b/ams/api/src/main/java/com/netease/arctic/ams/api/process/ProcessFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.ams.api.process;
+
+import com.netease.arctic.ams.api.Action;
+import com.netease.arctic.ams.api.TableRuntime;
+
+/**
+ * A factory to create a process. Normally, There will be default ProcessFactories for each action
+ * and used by default scheduler. Meanwhile, user could extend external ProcessFactory to run jobs
+ * on external resources like Yarn.
+ */
+public interface ProcessFactory {
+
+ /**
+ * Create a process for the action.
+ *
+ * @param tableRuntime table runtime
+ * @param action action type
+ * @return target process which has not been submitted yet.
+ */
+ AmoroProcess create(TableRuntime tableRuntime, Action action);
+
+ /**
+ * Recover a process for the action from a state.
+ *
+ * @param tableRuntime table runtime
+ * @param state state of the process
+ * @return target process which has not been submitted yet.
+ */
+ AmoroProcess recover(TableRuntime tableRuntime, T state);
+}
diff --git a/ams/api/src/main/java/com/netease/arctic/ams/api/process/ProcessState.java b/ams/api/src/main/java/com/netease/arctic/ams/api/process/ProcessState.java
new file mode 100644
index 0000000000..e70c09dca5
--- /dev/null
+++ b/ams/api/src/main/java/com/netease/arctic/ams/api/process/ProcessState.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.ams.api.process;
+
+import com.netease.arctic.ams.api.Action;
+
+import java.util.Map;
+
+/**
+ * ProcessState contains information in any {@link AmoroProcess} which must be persistent and {@link
+ * ProcessFactory} will use to recover {@link AmoroProcess}.
+ */
+public interface ProcessState {
+
+ /** @return unique identifier of the process. */
+ long getId();
+
+ /**
+ * @return the name of the state. If multiple stages are involved, it should be the name of the
+ * current stage.
+ */
+ String getName();
+
+ /** @return start time of the process. */
+ long getStartTime();
+
+ /** @return the action of the process. */
+ Action getAction();
+
+ /** @return the status of the process. */
+ ProcessStatus getStatus();
+
+ /**
+ * Get the string encoded summary of the process, this could be a simple description or a POJO
+ * encoded by JSON
+ *
+ * @return the summary of the process
+ */
+ Map getSummary();
+
+ /** @return the reason of process failure, null if the process has not failed yet. */
+ String getFailedReason();
+
+ /**
+ * Total millisecond running time of all tasks in the process.
+ *
+ * @return actual quota runtime of the process.
+ */
+ long getQuotaRuntime();
+
+ /**
+ * Quota value is calculated by the total millisecond running time of all tasks in the process
+ * divided by the total millisecond from the start time to the current time. It is used to
+ * evaluate the actual runtime concurrence of the process.
+ *
+ * @return the quota value of the process.
+ */
+ default double getQuotaValue() {
+ return (double) getQuotaRuntime() / (System.currentTimeMillis() - getStartTime());
+ }
+}
diff --git a/ams/api/src/main/java/com/netease/arctic/ams/api/process/ProcessStatus.java b/ams/api/src/main/java/com/netease/arctic/ams/api/process/ProcessStatus.java
new file mode 100644
index 0000000000..48d38a3e32
--- /dev/null
+++ b/ams/api/src/main/java/com/netease/arctic/ams/api/process/ProcessStatus.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.ams.api.process;
+
+/**
+ * Status of any {@link AmoroProcess}. Only UNKNOWN, RUNNING, FINISHED(SUCCESS, CLOSED, FAILED) are
+ * necessary Stage classes are used to define multiple phases of one process such as OptimizingStage
+ */
+public enum ProcessStatus {
+ UNKNOWN,
+
+ /** This status containing scheduled and running phases */
+ ACTIVE,
+ SUCCESS,
+ CLOSED,
+ FAILED
+}
diff --git a/ams/api/src/main/java/com/netease/arctic/ams/api/process/SimpleFuture.java b/ams/api/src/main/java/com/netease/arctic/ams/api/process/SimpleFuture.java
new file mode 100644
index 0000000000..1e7bfdab15
--- /dev/null
+++ b/ams/api/src/main/java/com/netease/arctic/ams/api/process/SimpleFuture.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.ams.api.process;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+import java.util.concurrent.CompletableFuture;
+
+/** A simple wrapper of CompletableFuture for better code readability. */
+public class SimpleFuture {
+
+ protected CompletableFuture