Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADH-4648] Implemented postgres partitioning for file access counts #73

Merged
merged 11 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 15 additions & 57 deletions conf/smart-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -367,15 +367,6 @@
</description>
</property>

<property>
<name>smart.metastore.mysql.legacy.enabled</name>
<value>false</value>
<description>
Whether to enable support for versions of MySQL lesser than 5.7.
Warning: it requires admin privileges for ssm user to be able to configure the db.
</description>
</property>

<property>
<name>smart.ignore.path.templates</name>
<value></value>
Expand All @@ -393,45 +384,7 @@
</property>

<property>
<name>smart.access.count.day.tables.num</name>
<value>30</value>
<description>
The max number of access count per day tables in the Metastore.
</description>
</property>

<property>
<name>smart.access.count.hour.tables.num</name>
<value>48</value>
<description>
The max number of access count per hour tables in the Metastore.
It should be at least 24 to cover the full day to be successfully
aggregated into a per day access count table.
</description>
</property>

<property>
<name>smart.access.count.minute.tables.num</name>
<value>120</value>
<description>
The max number of access count per minute tables in the Metastore.
It should be at least 60 to cover the full hour to be successfully
aggregated into a per hour access count table.
</description>
</property>

<property>
<name>smart.access.count.second.tables.num</name>
<value>30</value>
<description>
The max number of access count per second tables in the Metastore.
It should be at least 60000/'smart.access.count.aggregation.interval.ms'
to cover the full minute to be successfully aggregated into a per minute access count table.
</description>
</property>

<property>
<name>smart.access.event.fetch.interval.ms</name>
<name>smart.file.access.event.fetch.interval.ms</name>
<value>1000</value>
<description>
The interval in milliseconds between access event fetches.
Expand Down Expand Up @@ -479,7 +432,7 @@
</property>

<property>
<name>smart.access.count.aggregation.interval.ms</name>
<name>smart.file.access.count.aggregation.interval.ms</name>
<value>5000</value>
<description>
The interval in milliseconds that is covered by single second-granularity access count table.
Expand Down Expand Up @@ -578,17 +531,22 @@
</property>

<property>
<name>smart.access.count.aggregator.failover</name>
<value>SUBMIT_NEW_FAILED_EVENTS_LATER</value>
<name>smart.file.access.count.aggregator.failover.retry.count</name>
<value>60</value>
<description>
Maximum number of attempts to save file access events
</description>
</property>

<property>
<name>smart.file.access.count.aggregator.failover</name>
<value>SUBMIT_FAILED_EVENTS_WITH_RETRY</value>
<description>
Failover strategy for file access events aggregator. Possible values:
DROP_EVENTS - drop file access events that caused exception.
FAIL - throw exception, no failover.
SUBMIT_FAILED_EVENTS_LATER - save all file access events that caused exception
for later submission. Should be used carefully, because in case of repeated
exceptions can potentially cause OOM error.
SUBMIT_NEW_FAILED_EVENTS_LATER - save new file access events that caused exception
for later submission and drop old failing events.
SUBMIT_FAILED_EVENTS_WITH_RETRY - save all file access events that caused exception
for later submission with max attempts less or equals than smart.access.count.aggregator.failover.retry.count
</description>
</property>

</configuration>
32 changes: 15 additions & 17 deletions docs/admin-user-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,23 +78,21 @@ Table-4 Condition Ingredient

Table-5 Object Properties

| Object | Property | Description |
|----------|----------------------------------|---------------------------------------------------------------------------------------------|
| | age | The time span from last modification moment to now |
| | atime | The last access time |
| | blocksize | The block size of the file |
| | inCache | The file is in cache storage |
| | isDir | The file is a directory |
| | length | Length of the file. Currently, only pure digital is supported, which indicates bytes number.|
| file | path | The file path in HDFS |
| | mtime | The last modification time of the file |
| | unsynced | The file is not synced |
| | storagePolicy | Storage policy of file |
| | accessCount(Time Interval) | The access counts during the last time interval |
| | accessCountTop(interval,N ) | The topmost N for access counts during the last time interval |
| | accessCountBottom(interval,N) | The bottommost N for access counts during the last time interval |
| | accessCountTopOnStoragePolicy(interval,N,$StoragePolicy") | The topmost N for access counts with regard to a storage policy.The supported HDFS storage policies are COLD,WARM,HOT,ONE_SSD,ALL_SSD,LAZY_PERSIST |
| | accessCountBottomOnStoragePolicy(interval,N,$StoragePolicy") | The bottommost N for access counts with regard to a storage policy during the last time interval |
| Object | Property | Description |
|----------|--------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------|
| | age | The time span from last modification moment to now |
| | atime | The last access time |
| | blocksize | The block size of the file |
| | inCache | The file is in cache storage |
| | isDir | The file is a directory |
| | length | Length of the file. Currently, only pure digital is supported, which indicates bytes number. |
| file | path | The file path in HDFS |
| | mtime | The last modification time of the file |
| | unsynced | The file is not synced |
| | storagePolicy | Storage policy of file |
| | accessCount(Time Interval) | The access counts during the last time interval |
| | accessCountTop(interval,N ) | The topmost N for access counts during the last time interval |
| | accessCountBottom(interval,N) | The bottommost N for access counts during the last time interval |

Table-6 Commands

Expand Down
38 changes: 12 additions & 26 deletions smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,37 +90,18 @@ public class SmartConfKeys {
"smart.metastore.migration.liquibase.changelog.path";
public static final String SMART_METASTORE_MIGRATION_CHANGELOG_PATH_DEFAULT =
"db/changelog/changelog-root.xml";
public static final String SMART_METASTORE_LEGACY_MYSQL_SUPPORT_KEY =
"smart.metastore.mysql.legacy.enabled";
public static final boolean SMART_METASTORE_LEGACY_MYSQL_SUPPORT_DEFAULT = false;

public static final String SMART_ACCESS_COUNT_AGGREGATION_INTERVAL_MS =
"smart.access.count.aggregation.interval.ms";
public static final int SMART_ACCESS_COUNT_AGGREGATION_INTERVAL_MS_DEFAULT = 5000;

public static final String SMART_ACCESS_COUNT_AGGREGATOR_FAILOVER_KEY =
"smart.access.count.aggregator.failover";

public static final String SMART_NUM_DAY_TABLES_TO_KEEP_KEY =
"smart.access.count.day.tables.num";
public static final int SMART_NUM_DAY_TABLES_TO_KEEP_DEFAULT = 30;

public static final String SMART_NUM_HOUR_TABLES_TO_KEEP_KEY =
"smart.access.count.hour.tables.num";
public static final int SMART_NUM_HOUR_TABLES_TO_KEEP_DEFAULT = 48;
public static final int SMART_NUM_HOUR_TABLES_TO_KEEP_MIN = 24;
"smart.file.access.count.aggregator.failover";
public static final String SMART_ACCESS_COUNT_AGGREGATOR_FAILOVER_MAX_RETRIES_KEY =
"smart.file.access.count.aggregator.failover.retry.count";
public static final int SMART_ACCESS_COUNT_AGGREGATOR_FAILOVER_MAX_RETRIES_DEFAULT = 60;

public static final String SMART_NUM_MINUTE_TABLES_TO_KEEP_KEY =
"smart.access.count.minute.tables.num";
public static final int SMART_NUM_MINUTE_TABLES_TO_KEEP_DEFAULT = 120;
public static final int SMART_NUM_MINUTE_TABLES_TO_KEEP_MIN = 60;

public static final String SMART_NUM_SECOND_TABLES_TO_KEEP_KEY =
"smart.access.count.second.tables.num";
public static final int SMART_NUM_SECOND_TABLES_TO_KEEP_DEFAULT = 30;
public static final String SMART_FILE_ACCESS_PARTITIONS_RETENTION_POLICY_KEY =
"smart.file.access.partition.retention.policy";

public static final String SMART_ACCESS_EVENT_FETCH_INTERVAL_MS_KEY =
"smart.access.event.fetch.interval.ms";
"smart.file.access.event.fetch.interval.ms";
public static final long SMART_ACCESS_EVENT_FETCH_INTERVAL_MS_DEFAULT = 1000L;

public static final String SMART_CACHED_FILE_FETCH_INTERVAL_MS_KEY =
Expand All @@ -131,6 +112,11 @@ public class SmartConfKeys {
"smart.namespace.fetch.interval.ms";
public static final long SMART_NAMESPACE_FETCH_INTERVAL_MS_DEFAULT = 1L;

// File access partitions
public static final String SMART_FILE_ACCESS_PARTITIONS_RETENTION_COUNT_KEY =
"smart.file.access.partition.retention.count";
public static final int SMART_FILE_ACCESS_PARTITIONS_RETENTION_COUNT_DEFAULT = 24;

// StatesManager

// RuleManager
Expand Down
10 changes: 10 additions & 0 deletions smart-engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,16 @@
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>jdbc</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,13 @@
import org.smartdata.conf.ReconfigureException;
import org.smartdata.conf.SmartConfKeys;
import org.smartdata.metastore.MetaStoreException;
import org.smartdata.metastore.dao.accesscount.AccessCountTableManager;
import org.smartdata.metastore.model.AccessCountTable;
import org.smartdata.metastore.accesscount.DbAccessEventAggregator;
import org.smartdata.metastore.accesscount.FileAccessManager;
import org.smartdata.metastore.accesscount.failover.AccessCountFailoverFactory;
import org.smartdata.metastore.partition.FileAccessPartitionManagerImpl;
import org.smartdata.metastore.partition.FileAccessPartitionService;
import org.smartdata.metastore.partition.cleanup.FileAccessPartitionRetentionPolicyExecutorFactory;
import org.smartdata.metastore.transaction.TransactionRunner;
import org.smartdata.metrics.FileAccessEvent;
import org.smartdata.metrics.FileAccessEventSource;
import org.smartdata.metrics.impl.MetricsFactory;
Expand All @@ -48,20 +53,23 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static org.springframework.transaction.annotation.Isolation.SERIALIZABLE;

/**
* Polls metrics and events from NameNode.
*/
public class StatesManager extends AbstractService implements Reconfigurable {
private final ServerContext serverContext;

private ScheduledExecutorService executorService;
private AccessCountTableManager accessCountTableManager;
@Getter
private FileAccessManager fileAccessManager;
private AccessEventFetcher accessEventFetcher;
private FileAccessEventSource fileAccessEventSource;
@Getter
private CachedFilesManager cachedFilesManager;
private AbstractService statesUpdaterService;

private FileAccessPartitionService fileAccessPartitionService;
private PathChecker pathChecker;
private volatile boolean working = false;
public static final Logger LOG = LoggerFactory.getLogger(StatesManager.class);
Expand All @@ -77,18 +85,39 @@ public StatesManager(ServerContext context) {
@Override
public void init() throws IOException {
LOG.info("Initializing ...");
this.executorService = Executors.newScheduledThreadPool(4);
this.accessCountTableManager = new AccessCountTableManager(
serverContext.getMetaStore(), executorService, serverContext.getConf());
this.executorService = Executors.newScheduledThreadPool(5);
TransactionRunner transactionRunner =
new TransactionRunner(serverContext.getMetaStore().transactionManager());
transactionRunner.setIsolationLevel(SERIALIZABLE);
this.fileAccessManager = new FileAccessManager(
transactionRunner,
serverContext.getMetaStore().accessCountEventDao(),
serverContext.getMetaStore().cacheFileDao());
this.fileAccessEventSource = MetricsFactory.createAccessEventSource(serverContext.getConf());
AccessCountFailoverFactory accessCountFailoverFactory =
new AccessCountFailoverFactory(serverContext.getConf());
DbAccessEventAggregator accessEventAggregator = new DbAccessEventAggregator(
serverContext.getMetaStore().fileInfoDao(),
fileAccessManager,
accessCountFailoverFactory.create());
this.accessEventFetcher = new AccessEventFetcher(
serverContext.getConf(),
accessCountTableManager.getAccessEventAggregator(),
accessEventAggregator,
executorService,
fileAccessEventSource.getCollector());
this.pathChecker = new PathChecker(serverContext.getConf());
this.cachedFilesManager =
new CachedFilesManager(serverContext.getMetaStore());
FileAccessPartitionRetentionPolicyExecutorFactory
fileAccessPartitionRetentionPolicyExecutorFactory =
new FileAccessPartitionRetentionPolicyExecutorFactory(
serverContext.getMetaStore());
this.fileAccessPartitionService = new FileAccessPartitionService(serverContext.getConf(),
executorService,
new FileAccessPartitionManagerImpl(serverContext.getMetaStore(),
fileAccessPartitionRetentionPolicyExecutorFactory.createPolicyExecutor(
serverContext.getConf()))
);

initStatesUpdaterService();
if (statesUpdaterService == null) {
Expand All @@ -113,6 +142,7 @@ public boolean inSafeMode() {
@Override
public void start() throws IOException {
LOG.info("Starting ...");
fileAccessPartitionService.start();
accessEventFetcher.start();
if (statesUpdaterService != null) {
statesUpdaterService.start();
Expand All @@ -126,6 +156,9 @@ public void stop() throws IOException {
working = false;
LOG.info("Stopping ...");

if (fileAccessPartitionService != null) {
fileAccessPartitionService.stop();
}
if (accessEventFetcher != null) {
accessEventFetcher.stop();
}
Expand All @@ -142,10 +175,6 @@ public void stop() throws IOException {
LOG.info("Stopped.");
}

public List<AccessCountTable> getTablesForLast(long timeInMills) throws MetaStoreException {
return accessCountTableManager.getTablesForLast(timeInMills);
}

public void reportFileAccessEvent(FileAccessEvent event) {
String path = event.getPath();
path = path + (path.endsWith("/") ? "" : "/");
Expand All @@ -164,16 +193,16 @@ public void reportFileAccessEvent(FileAccessEvent event) {
}

public List<FileAccessInfo> getHotFilesForLast(long timeInMills) {
return accessCountTableManager.search(FileAccessInfoSearchRequest.builder()
.lastAccessedTime(TimeInterval.builder()
.from(Instant.ofEpochMilli(System.currentTimeMillis() - timeInMills))
.build())
return fileAccessManager.search(FileAccessInfoSearchRequest.builder()
.lastAccessedTime(TimeInterval.builder()
.from(Instant.ofEpochMilli(System.currentTimeMillis() - timeInMills))
.build())
.build());
}

// todo remove after zeppelin removal
public List<CachedFileStatus> getCachedFileStatus() {
return cachedFilesManager.search(CachedFileSearchRequest.noFilters());
return cachedFilesManager.search(CachedFileSearchRequest.noFilters());
}

// todo remove after zeppelin removal
Expand Down Expand Up @@ -248,8 +277,4 @@ private synchronized void initStatesUpdaterService() {
LOG.info("", t);
}
}

public AccessCountTableManager getAccessCountTableManager() {
return accessCountTableManager;
}
}
Loading