Skip to content

Commit

Permalink
Merge pull request #73 from arenadata/feature/ADH-4648
Browse files Browse the repository at this point in the history
[ADH-4648] Implemented postgres partitioning for file access counts
  • Loading branch information
iamlapa authored Aug 14, 2024
2 parents ab3c7d9 + 8818a27 commit b15fa28
Show file tree
Hide file tree
Showing 94 changed files with 1,673 additions and 4,096 deletions.
72 changes: 15 additions & 57 deletions conf/smart-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -367,15 +367,6 @@
</description>
</property>

<property>
<name>smart.metastore.mysql.legacy.enabled</name>
<value>false</value>
<description>
Whether to enable support for versions of MySQL lesser than 5.7.
Warning: it requires admin privileges for ssm user to be able to configure the db.
</description>
</property>

<property>
<name>smart.ignore.path.templates</name>
<value></value>
Expand All @@ -393,45 +384,7 @@
</property>

<property>
<name>smart.access.count.day.tables.num</name>
<value>30</value>
<description>
The max number of access count per day tables in the Metastore.
</description>
</property>

<property>
<name>smart.access.count.hour.tables.num</name>
<value>48</value>
<description>
The max number of access count per hour tables in the Metastore.
It should be at least 24 to cover the full day to be successfully
aggregated into a per day access count table.
</description>
</property>

<property>
<name>smart.access.count.minute.tables.num</name>
<value>120</value>
<description>
The max number of access count per minute tables in the Metastore.
It should be at least 60 to cover the full hour to be successfully
aggregated into a per hour access count table.
</description>
</property>

<property>
<name>smart.access.count.second.tables.num</name>
<value>30</value>
<description>
The max number of access count per second tables in the Metastore.
It should be at least 60000/'smart.access.count.aggregation.interval.ms'
to cover the full minute to be successfully aggregated into a per minute access count table.
</description>
</property>

<property>
<name>smart.access.event.fetch.interval.ms</name>
<name>smart.file.access.event.fetch.interval.ms</name>
<value>1000</value>
<description>
The interval in milliseconds between access event fetches.
Expand Down Expand Up @@ -479,7 +432,7 @@
</property>

<property>
<name>smart.access.count.aggregation.interval.ms</name>
<name>smart.file.access.count.aggregation.interval.ms</name>
<value>5000</value>
<description>
The interval in milliseconds that is covered by a single second-granularity access count table.
Expand Down Expand Up @@ -578,17 +531,22 @@
</property>

<property>
<name>smart.access.count.aggregator.failover</name>
<value>SUBMIT_NEW_FAILED_EVENTS_LATER</value>
<name>smart.file.access.count.aggregator.failover.retry.count</name>
<value>60</value>
<description>
Maximum number of attempts to save file access events when the
SUBMIT_FAILED_EVENTS_WITH_RETRY failover strategy is used.
</description>
</property>

<property>
<name>smart.file.access.count.aggregator.failover</name>
<value>SUBMIT_FAILED_EVENTS_WITH_RETRY</value>
<description>
Failover strategy for file access events aggregator. Possible values:
DROP_EVENTS - drop file access events that caused exception.
FAIL - throw exception, no failover.
SUBMIT_FAILED_EVENTS_LATER - save all file access events that caused exception
for later submission. Should be used carefully, because in case of repeated
exceptions can potentially cause OOM error.
SUBMIT_NEW_FAILED_EVENTS_LATER - save new file access events that caused exception
for later submission and drop old failing events.
SUBMIT_FAILED_EVENTS_WITH_RETRY - save all file access events that caused exception
for later submission, with the number of attempts limited to
smart.file.access.count.aggregator.failover.retry.count.
</description>
</property>

</configuration>
32 changes: 15 additions & 17 deletions docs/admin-user-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,23 +78,21 @@ Table-4 Condition Ingredient

Table-5 Object Properties

| Object | Property | Description |
|----------|----------------------------------|---------------------------------------------------------------------------------------------|
| | age | The time span from last modification moment to now |
| | atime | The last access time |
| | blocksize | The block size of the file |
| | inCache | The file is in cache storage |
| | isDir | The file is a directory |
| | length | Length of the file. Currently, only pure digital is supported, which indicates bytes number.|
| file | path | The file path in HDFS |
| | mtime | The last modification time of the file |
| | unsynced | The file is not synced |
| | storagePolicy | Storage policy of file |
| | accessCount(Time Interval) | The access counts during the last time interval |
| | accessCountTop(interval,N ) | The topmost N for access counts during the last time interval |
| | accessCountBottom(interval,N) | The bottommost N for access counts during the last time interval |
| | accessCountTopOnStoragePolicy(interval,N,$StoragePolicy") | The topmost N for access counts with regard to a storage policy.The supported HDFS storage policies are COLD,WARM,HOT,ONE_SSD,ALL_SSD,LAZY_PERSIST |
| | accessCountBottomOnStoragePolicy(interval,N,$StoragePolicy") | The bottommost N for access counts with regard to a storage policy during the last time interval |
| Object | Property | Description |
|----------|--------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------|
| | age | The time span from last modification moment to now |
| | atime | The last access time |
| | blocksize | The block size of the file |
| | inCache | The file is in cache storage |
| | isDir | The file is a directory |
| | length | Length of the file. Currently, only a plain number is supported; it is interpreted as a number of bytes. |
| file | path | The file path in HDFS |
| | mtime | The last modification time of the file |
| | unsynced | The file is not synced |
| | storagePolicy | Storage policy of file |
| | accessCount(Time Interval) | The access counts during the last time interval |
| | accessCountTop(interval,N ) | The topmost N for access counts during the last time interval |
| | accessCountBottom(interval,N) | The bottommost N for access counts during the last time interval |

Table-6 Commands

Expand Down
48 changes: 17 additions & 31 deletions smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,41 +79,22 @@ public class SmartConfKeys {
// Password for the metastore connection, obtained from the Hadoop CredentialProvider
public static final String SMART_METASTORE_PASSWORD = "smart.metastore.password";

public static final String SMART_METASTORE_MIGRATION_CHANGELOG_PATH_KEY =
"smart.metastore.migration.liquibase.changelog.path";
public static final String SMART_METASTORE_MIGRATION_CHANGELOG_PATH_DEFAULT =
"db/changelog/changelog-root.xml";
public static final String SMART_METASTORE_LEGACY_MYSQL_SUPPORT_KEY =
"smart.metastore.mysql.legacy.enabled";
public static final boolean SMART_METASTORE_LEGACY_MYSQL_SUPPORT_DEFAULT = false;

public static final String SMART_ACCESS_COUNT_AGGREGATION_INTERVAL_MS =
"smart.access.count.aggregation.interval.ms";
public static final int SMART_ACCESS_COUNT_AGGREGATION_INTERVAL_MS_DEFAULT = 5000;
public static final String SMART_METASTORE_MIGRATION_CHANGELOG_PATH_KEY =
"smart.metastore.migration.liquibase.changelog.path";
public static final String SMART_METASTORE_MIGRATION_CHANGELOG_PATH_DEFAULT =
"db/changelog/changelog-root.xml";

public static final String SMART_ACCESS_COUNT_AGGREGATOR_FAILOVER_KEY =
"smart.access.count.aggregator.failover";
"smart.file.access.count.aggregator.failover";
public static final String SMART_ACCESS_COUNT_AGGREGATOR_FAILOVER_MAX_RETRIES_KEY =
"smart.file.access.count.aggregator.failover.retry.count";
public static final int SMART_ACCESS_COUNT_AGGREGATOR_FAILOVER_MAX_RETRIES_DEFAULT = 60;

public static final String SMART_NUM_DAY_TABLES_TO_KEEP_KEY =
"smart.access.count.day.tables.num";
public static final int SMART_NUM_DAY_TABLES_TO_KEEP_DEFAULT = 30;

public static final String SMART_NUM_HOUR_TABLES_TO_KEEP_KEY =
"smart.access.count.hour.tables.num";
public static final int SMART_NUM_HOUR_TABLES_TO_KEEP_DEFAULT = 48;
public static final int SMART_NUM_HOUR_TABLES_TO_KEEP_MIN = 24;

public static final String SMART_NUM_MINUTE_TABLES_TO_KEEP_KEY =
"smart.access.count.minute.tables.num";
public static final int SMART_NUM_MINUTE_TABLES_TO_KEEP_DEFAULT = 120;
public static final int SMART_NUM_MINUTE_TABLES_TO_KEEP_MIN = 60;

public static final String SMART_NUM_SECOND_TABLES_TO_KEEP_KEY =
"smart.access.count.second.tables.num";
public static final int SMART_NUM_SECOND_TABLES_TO_KEEP_DEFAULT = 30;
public static final String SMART_FILE_ACCESS_PARTITIONS_RETENTION_POLICY_KEY =
"smart.file.access.partition.retention.policy";

public static final String SMART_ACCESS_EVENT_FETCH_INTERVAL_MS_KEY =
"smart.access.event.fetch.interval.ms";
"smart.file.access.event.fetch.interval.ms";
public static final long SMART_ACCESS_EVENT_FETCH_INTERVAL_MS_DEFAULT = 1000L;

public static final String SMART_CACHED_FILE_FETCH_INTERVAL_MS_KEY =
Expand All @@ -124,7 +105,12 @@ public class SmartConfKeys {
"smart.namespace.fetch.interval.ms";
public static final long SMART_NAMESPACE_FETCH_INTERVAL_MS_DEFAULT = 1L;

// StatesManager
// File access partitions
public static final String SMART_FILE_ACCESS_PARTITIONS_RETENTION_COUNT_KEY =
"smart.file.access.partition.retention.count";
public static final int SMART_FILE_ACCESS_PARTITIONS_RETENTION_COUNT_DEFAULT = 24;

// StatesManager

// RuleManager
public static final String SMART_RULE_EXECUTORS_KEY = "smart.rule.executors";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.smartdata.model.TimeInterval;

import java.util.List;
import java.util.Set;

@Data
@Builder
Expand All @@ -34,7 +33,6 @@ public class FileAccessInfoSearchRequest {
private final List<Long> ids;
private final String pathLike;
private final TimeInterval lastAccessedTime;
private final Set<String> accessCountTables;

public static FileAccessInfoSearchRequest noFilters() {
return builder().build();
Expand Down
10 changes: 10 additions & 0 deletions smart-engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,16 @@
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>jdbc</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@
import org.smartdata.conf.ReconfigurableRegistry;
import org.smartdata.conf.ReconfigureException;
import org.smartdata.conf.SmartConfKeys;
import org.smartdata.metastore.MetaStoreException;
import org.smartdata.metastore.dao.accesscount.AccessCountTableManager;
import org.smartdata.metastore.model.AccessCountTable;
import org.smartdata.metastore.accesscount.DbAccessEventAggregator;
import org.smartdata.metastore.accesscount.FileAccessManager;
import org.smartdata.metastore.accesscount.failover.AccessCountFailoverFactory;
import org.smartdata.metastore.partition.FileAccessPartitionManagerImpl;
import org.smartdata.metastore.partition.FileAccessPartitionService;
import org.smartdata.metastore.partition.cleanup.FileAccessPartitionRetentionPolicyExecutorFactory;
import org.smartdata.metastore.transaction.TransactionRunner;
import org.smartdata.metrics.FileAccessEvent;
import org.smartdata.metrics.FileAccessEventSource;
import org.smartdata.metrics.impl.MetricsFactory;
Expand All @@ -40,19 +44,23 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static org.springframework.transaction.annotation.Isolation.SERIALIZABLE;

/**
* Polls metrics and events from NameNode.
*/
public class StatesManager extends AbstractService implements Reconfigurable {
private final ServerContext serverContext;

private ScheduledExecutorService executorService;
private AccessCountTableManager accessCountTableManager;
@Getter
private FileAccessManager fileAccessManager;
private AccessEventFetcher accessEventFetcher;
private FileAccessEventSource fileAccessEventSource;
@Getter
private CachedFilesManager cachedFilesManager;
private AbstractService statesUpdaterService;
private FileAccessPartitionService fileAccessPartitionService;
private PathChecker pathChecker;
private volatile boolean working = false;

Expand All @@ -69,18 +77,39 @@ public StatesManager(ServerContext context) {
@Override
public void init() throws IOException {
LOG.info("Initializing ...");
this.executorService = Executors.newScheduledThreadPool(4);
this.accessCountTableManager = new AccessCountTableManager(
serverContext.getMetaStore(), executorService, serverContext.getConf());
this.executorService = Executors.newScheduledThreadPool(5);
TransactionRunner transactionRunner =
new TransactionRunner(serverContext.getMetaStore().transactionManager());
transactionRunner.setIsolationLevel(SERIALIZABLE);
this.fileAccessManager = new FileAccessManager(
transactionRunner,
serverContext.getMetaStore().accessCountEventDao(),
serverContext.getMetaStore().cacheFileDao());
this.fileAccessEventSource = MetricsFactory.createAccessEventSource(serverContext.getConf());
AccessCountFailoverFactory accessCountFailoverFactory =
new AccessCountFailoverFactory(serverContext.getConf());
DbAccessEventAggregator accessEventAggregator = new DbAccessEventAggregator(
serverContext.getMetaStore().fileInfoDao(),
fileAccessManager,
accessCountFailoverFactory.create());
this.accessEventFetcher = new AccessEventFetcher(
serverContext.getConf(),
accessCountTableManager.getAccessEventAggregator(),
accessEventAggregator,
executorService,
fileAccessEventSource.getCollector());
this.pathChecker = new PathChecker(serverContext.getConf());
this.cachedFilesManager =
new CachedFilesManager(serverContext.getMetaStore().cacheFileDao());
FileAccessPartitionRetentionPolicyExecutorFactory
fileAccessPartitionRetentionPolicyExecutorFactory =
new FileAccessPartitionRetentionPolicyExecutorFactory(
serverContext.getMetaStore());
this.fileAccessPartitionService = new FileAccessPartitionService(
executorService,
new FileAccessPartitionManagerImpl(serverContext.getMetaStore()),
fileAccessPartitionRetentionPolicyExecutorFactory.createPolicyExecutor(
serverContext.getConf())
);

initStatesUpdaterService();
if (statesUpdaterService == null) {
Expand All @@ -105,6 +134,7 @@ public boolean inSafeMode() {
@Override
public void start() throws IOException {
LOG.info("Starting ...");
fileAccessPartitionService.start();
accessEventFetcher.start();
if (statesUpdaterService != null) {
statesUpdaterService.start();
Expand All @@ -118,6 +148,9 @@ public void stop() throws IOException {
working = false;
LOG.info("Stopping ...");

if (fileAccessPartitionService != null) {
fileAccessPartitionService.stop();
}
if (accessEventFetcher != null) {
accessEventFetcher.stop();
}
Expand All @@ -134,10 +167,6 @@ public void stop() throws IOException {
LOG.info("Stopped.");
}

public List<AccessCountTable> getTablesForLast(long timeInMills) throws MetaStoreException {
return accessCountTableManager.getTablesForLast(timeInMills);
}

public void reportFileAccessEvent(FileAccessEvent event) {
String path = event.getPath();
path = path + (path.endsWith("/") ? "" : "/");
Expand Down Expand Up @@ -203,8 +232,4 @@ private synchronized void initStatesUpdaterService() {
LOG.info("", t);
}
}

public AccessCountTableManager getAccessCountTableManager() {
return accessCountTableManager;
}
}
Loading

0 comments on commit b15fa28

Please sign in to comment.