Skip to content

Commit

Permalink
[ADH-4100] Delete access count tables only after aggregating it to mo…
Browse files Browse the repository at this point in the history
…re high-grained table
  • Loading branch information
tigrulya-exe committed Feb 19, 2024
1 parent 8aac6a0 commit cb0dfac
Show file tree
Hide file tree
Showing 17 changed files with 464 additions and 345 deletions.
8 changes: 8 additions & 0 deletions conf/smart-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -471,4 +471,12 @@
The max time in milliseconds to wait an answer from the SmartAgent master actor during action submission.
</description>
</property>

<property>
<name>smart.access.count.aggregation.interval.ms</name>
<value>5000</value>
<description>
The interval in milliseconds that is covered by single second-granularity access count table.
</description>
</property>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,23 @@ public class SmartConfKeys {
"smart.metastore.mysql.legacy.enabled";
public static final boolean SMART_METASTORE_LEGACY_MYSQL_SUPPORT_DEFAULT = false;

public static final String SMART_ACCESS_COUNT_AGGREGATION_INTERVAL_MS =
"smart.access.count.aggregation.interval.ms";
public static final int SMART_ACCESS_COUNT_AGGREGATION_INTERVAL_MS_DEFAULT = 5000;

public static final String SMART_NUM_DAY_TABLES_TO_KEEP_KEY =
"smart.access.count.day.tables.num";
public static final int SMART_NUM_DAY_TABLES_TO_KEEP_DEFAULT = 30;

public static final String SMART_NUM_HOUR_TABLES_TO_KEEP_KEY =
"smart.access.count.hour.tables.num";
public static final int SMART_NUM_HOUR_TABLES_TO_KEEP_DEFAULT = 48;
public static final int SMART_NUM_HOUR_TABLES_TO_KEEP_MIN = 24;

public static final String SMART_NUM_MINUTE_TABLES_TO_KEEP_KEY =
"smart.access.count.minute.tables.num";
public static final int SMART_NUM_MINUTE_TABLES_TO_KEEP_DEFAULT = 120;
public static final int SMART_NUM_MINUTE_TABLES_TO_KEEP_MIN = 60;

public static final String SMART_NUM_SECOND_TABLES_TO_KEEP_KEY =
"smart.access.count.second.tables.num";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public class AccessEventFetcher {

private final ScheduledExecutorService scheduledExecutorService;
private final Long fetchInterval;
private ScheduledFuture scheduledFuture;
private FetchTask fetchTask;
private final FetchTask fetchTask;
private ScheduledFuture<?> scheduledFuture;

public AccessEventFetcher(
Configuration conf,
Expand All @@ -50,31 +50,29 @@ public AccessEventFetcher(
SMART_ACCESS_EVENT_FETCH_INTERVAL_MS_KEY,
SMART_ACCESS_EVENT_FETCH_INTERVAL_MS_DEFAULT
);
this.fetchTask = new FetchTask(conf, manager, collector);
this.fetchTask = new FetchTask(manager, collector);
this.scheduledExecutorService = service;
}

public void start() {
Long current = System.currentTimeMillis();
Long toWait = fetchInterval - (current % fetchInterval);
long current = System.currentTimeMillis();
long toWait = fetchInterval - (current % fetchInterval);
this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(
fetchTask, toWait, fetchInterval, TimeUnit.MILLISECONDS);
}

public void stop() {
if (scheduledFuture != null) {
this.scheduledFuture.cancel(true);
scheduledFuture.cancel(true);
}
}

private static class FetchTask implements Runnable {
private final Configuration conf;
private final AccessCountTableManager manager;
private final FileAccessEventCollector collector;

public FetchTask(
Configuration conf, AccessCountTableManager manager, FileAccessEventCollector collector) {
this.conf = conf;
AccessCountTableManager manager, FileAccessEventCollector collector) {
this.manager = manager;
this.collector = collector;
}
Expand All @@ -83,7 +81,7 @@ public FetchTask(
public void run() {
try {
List<FileAccessEvent> events = this.collector.collect();
if (events.size() > 0) {
if (!events.isEmpty()) {
this.manager.onAccessEventsArrived(events);
}
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ public List<FileAccessInfo> getHotFiles(
try {
Map<Long, Integer> accessCounts =
accessCountDao.getHotFiles(tables, topNum);
if (accessCounts.size() == 0) {
if (accessCounts.isEmpty()) {
return new ArrayList<>();
}
Map<Long, String> idToPath = getFilePaths(accessCounts.keySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,20 @@ public AccessCountTableAggregator(MetaStore metaStore) {

public void aggregate(AccessCountTable destinationTable,
List<AccessCountTable> tablesToAggregate) throws MetaStoreException {
if (!tablesToAggregate.isEmpty()) {
ReentrantLock accessCountLock = metaStore.getAccessCountLock();
if (tablesToAggregate.isEmpty()) {
return;
}

ReentrantLock accessCountLock = metaStore.getAccessCountLock();
if (accessCountLock != null) {
accessCountLock.lock();
}
try {
metaStore.aggregateTables(destinationTable, tablesToAggregate);
metaStore.insertAccessCountTable(destinationTable);
} finally {
if (accessCountLock != null) {
accessCountLock.lock();
}
try {
metaStore.aggregateTables(destinationTable, tablesToAggregate);
metaStore.insertAccessCountTable(destinationTable);
} finally {
if (accessCountLock != null) {
accessCountLock.unlock();
}
accessCountLock.unlock();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,52 @@
*/
package org.smartdata.metastore.dao;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;

import static com.google.common.base.Preconditions.checkNotNull;

/**
* Use deque to accelerate remove operation.
*/
public class AccessCountTableDeque extends ArrayDeque<AccessCountTable> {
public class AccessCountTableDeque extends ConcurrentLinkedDeque<AccessCountTable> {
private final TableAddOpListener listener;
private final TableEvictor tableEvictor;

public AccessCountTableDeque(TableEvictor tableEvictor) {
this(tableEvictor, null);
this(tableEvictor, TableAddOpListener.noOp());
}

public AccessCountTableDeque(TableEvictor tableEvictor, TableAddOpListener listener) {
super();
this.listener = listener;
this.tableEvictor = tableEvictor;
this.listener = checkNotNull(
listener, "listener should not be null");
this.tableEvictor = checkNotNull(
tableEvictor, "tableEvictor should not be null");
}

public boolean addAndNotifyListener(AccessCountTable table) {
if (!this.isEmpty()) {
assert table.getEndTime() > this.peekLast().getEndTime();
public CompletableFuture<Void> addAndNotifyListener(AccessCountTable table) {
if (!isEmpty() && table.getEndTime() <= peekLast().getEndTime()) {
throw new IllegalArgumentException("Overlapping access count table: " + table);
}

super.add(table);
if (this.listener != null) {
this.listener.tableAdded(this, table);
add(table);
return notifyListener(table);
}

public CompletableFuture<Void> notifyListener(AccessCountTable table) {
return listener.tableAdded(this, table)
.thenAccept(this::evictTablesIfHigherGrainedCreated);
}

private void evictTablesIfHigherGrainedCreated(AccessCountTable higherGrainedTable) {
if (higherGrainedTable == null) {
return;
}
tableEvictor.evictTables(this, this.size());
return true;

tableEvictor.evictTables(this, higherGrainedTable.getEndTime());
}

public List<AccessCountTable> getTables(Long start, Long end) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,30 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.smartdata.conf.SmartConfKeys.SMART_ACCESS_COUNT_AGGREGATION_INTERVAL_MS;
import static org.smartdata.conf.SmartConfKeys.SMART_ACCESS_COUNT_AGGREGATION_INTERVAL_MS_DEFAULT;
import static org.smartdata.conf.SmartConfKeys.SMART_NUM_DAY_TABLES_TO_KEEP_DEFAULT;
import static org.smartdata.conf.SmartConfKeys.SMART_NUM_DAY_TABLES_TO_KEEP_KEY;
import static org.smartdata.conf.SmartConfKeys.SMART_NUM_HOUR_TABLES_TO_KEEP_DEFAULT;
import static org.smartdata.conf.SmartConfKeys.SMART_NUM_HOUR_TABLES_TO_KEEP_KEY;
import static org.smartdata.conf.SmartConfKeys.SMART_NUM_HOUR_TABLES_TO_KEEP_MIN;
import static org.smartdata.conf.SmartConfKeys.SMART_NUM_MINUTE_TABLES_TO_KEEP_DEFAULT;
import static org.smartdata.conf.SmartConfKeys.SMART_NUM_MINUTE_TABLES_TO_KEEP_KEY;
import static org.smartdata.conf.SmartConfKeys.SMART_NUM_MINUTE_TABLES_TO_KEEP_MIN;
import static org.smartdata.conf.SmartConfKeys.SMART_NUM_SECOND_TABLES_TO_KEEP_DEFAULT;
import static org.smartdata.conf.SmartConfKeys.SMART_NUM_SECOND_TABLES_TO_KEEP_KEY;
import static org.smartdata.metastore.utils.Constants.ONE_MINUTE_IN_MILLIS;

public class AccessCountTableManager {
private final MetaStore metaStore;
private final Map<TimeGranularity, AccessCountTableDeque> tableDeques;
private final AccessEventAggregator accessEventAggregator;
private final ExecutorService executorService;
private final Configuration configuration;
private AccessCountTableDeque secondTableDeque;

public static final Logger LOG =
Expand All @@ -62,75 +69,113 @@ public AccessCountTableManager(MetaStore adapter,
this.metaStore = adapter;
this.tableDeques = new HashMap<>();
this.executorService = service;
this.accessEventAggregator = new AccessEventAggregator(adapter, this);
this.initTables(configuration);
this.configuration = configuration;

int aggregationIntervalMs = configuration.getInt(
SMART_ACCESS_COUNT_AGGREGATION_INTERVAL_MS,
SMART_ACCESS_COUNT_AGGREGATION_INTERVAL_MS_DEFAULT);
this.accessEventAggregator = new AccessEventAggregator(adapter, this, aggregationIntervalMs);

initTables();
}

private int getAccessTablesCount(String configKey, int defaultValue, int minimalCount) {
int tableCount = configuration.getInt(configKey, defaultValue);

if (tableCount < minimalCount) {
String errorMessage = String.format(
"Wrong value for option %s. It should be at least %d", configKey, minimalCount);
LOG.error(errorMessage);
throw new IllegalArgumentException(errorMessage);
}
return tableCount;
}

private void initTables(Configuration configuration) {
private void initTables() {
AccessCountTableAggregator aggregator = new AccessCountTableAggregator(metaStore);

int perDayAccessTablesCount = configuration.getInt(SMART_NUM_DAY_TABLES_TO_KEEP_KEY,
SMART_NUM_DAY_TABLES_TO_KEEP_DEFAULT);
AccessCountTableDeque dayTableDeque = new AccessCountTableDeque(
new CountEvictor(metaStore, perDayAccessTablesCount));
TableAddOpListener dayTableListener =
new TableAddOpListener.DayTableListener(dayTableDeque, aggregator, executorService);
TableAddOpListener.perDay(dayTableDeque, aggregator, executorService);

int perHourAccessTablesCount = configuration.getInt(SMART_NUM_HOUR_TABLES_TO_KEEP_KEY,
SMART_NUM_HOUR_TABLES_TO_KEEP_DEFAULT);
int perHourAccessTablesCount = getAccessTablesCount(
SMART_NUM_HOUR_TABLES_TO_KEEP_KEY,
SMART_NUM_HOUR_TABLES_TO_KEEP_DEFAULT,
SMART_NUM_HOUR_TABLES_TO_KEEP_MIN);
AccessCountTableDeque hourTableDeque = new AccessCountTableDeque(
new CountEvictor(metaStore, perHourAccessTablesCount), dayTableListener);
TableAddOpListener hourTableListener =
new TableAddOpListener.HourTableListener(hourTableDeque, aggregator, executorService);
TableAddOpListener.perHour(hourTableDeque, aggregator, executorService);

int perMinuteAccessTablesCount = configuration.getInt(SMART_NUM_MINUTE_TABLES_TO_KEEP_KEY,
SMART_NUM_MINUTE_TABLES_TO_KEEP_DEFAULT);
int perMinuteAccessTablesCount = getAccessTablesCount(
SMART_NUM_MINUTE_TABLES_TO_KEEP_KEY,
SMART_NUM_MINUTE_TABLES_TO_KEEP_DEFAULT,
SMART_NUM_MINUTE_TABLES_TO_KEEP_MIN);
AccessCountTableDeque minuteTableDeque = new AccessCountTableDeque(
new CountEvictor(metaStore, perMinuteAccessTablesCount), hourTableListener);
TableAddOpListener minuteTableListener =
new TableAddOpListener.MinuteTableListener(minuteTableDeque, aggregator,
TableAddOpListener.perMinute(minuteTableDeque, aggregator,
executorService);

int perSecondAccessTablesCount = configuration.getInt(SMART_NUM_SECOND_TABLES_TO_KEEP_KEY,
SMART_NUM_SECOND_TABLES_TO_KEEP_DEFAULT);
int minimalSecondAccessTablesCount =
(int) (ONE_MINUTE_IN_MILLIS / accessEventAggregator.getAggregationGranularity());

int perSecondAccessTablesCount = getAccessTablesCount(
SMART_NUM_SECOND_TABLES_TO_KEEP_KEY,
SMART_NUM_SECOND_TABLES_TO_KEEP_DEFAULT,
minimalSecondAccessTablesCount);
this.secondTableDeque = new AccessCountTableDeque(
new CountEvictor(metaStore, perSecondAccessTablesCount), minuteTableListener);

this.tableDeques.put(TimeGranularity.SECOND, this.secondTableDeque);
this.tableDeques.put(TimeGranularity.MINUTE, minuteTableDeque);
this.tableDeques.put(TimeGranularity.HOUR, hourTableDeque);
this.tableDeques.put(TimeGranularity.DAY, dayTableDeque);
this.recoverTables();
tableDeques.put(TimeGranularity.SECOND, secondTableDeque);
tableDeques.put(TimeGranularity.MINUTE, minuteTableDeque);
tableDeques.put(TimeGranularity.HOUR, hourTableDeque);
tableDeques.put(TimeGranularity.DAY, dayTableDeque);
recoverTables();
}

private void recoverTables() {
try {
List<AccessCountTable> tables = metaStore.getAllSortedTables();

if (tables.isEmpty()) {
return;
}

LOG.info("Loading existing access count tables: {}", tables);
for (AccessCountTable table : tables) {
TimeGranularity timeGranularity =
TimeUtils.getGranularity(table.getEndTime() - table.getStartTime());
if (tableDeques.containsKey(timeGranularity)) {
tableDeques.get(timeGranularity).add(table);
if (tableDeques.containsKey(table.getGranularity())) {
tableDeques.get(table.getGranularity()).add(table);
}
}

// aggregate old tables if needed
AccessCountTable lastOldTable = tables.get(tables.size() - 1);
Optional.ofNullable(tableDeques.get(lastOldTable.getGranularity()))
.ifPresent(deque -> deque.notifyListener(lastOldTable));

LOG.info("Existing access count tables were successfully loaded");
} catch (MetaStoreException e) {
LOG.error(e.toString());
LOG.error("Error during recovering access count tables", e);
}
}

public void addTable(AccessCountTable accessCountTable) {
if (LOG.isDebugEnabled()) {
LOG.debug(accessCountTable.toString());
}
this.secondTableDeque.addAndNotifyListener(accessCountTable);
secondTableDeque.addAndNotifyListener(accessCountTable);
}

public void onAccessEventsArrived(List<FileAccessEvent> accessEvents) {
this.accessEventAggregator.addAccessEvents(accessEvents);
accessEventAggregator.addAccessEvents(accessEvents);
}

public List<AccessCountTable> getTables(long lengthInMillis) throws MetaStoreException {
return AccessCountTableManager.getTables(this.tableDeques, this.metaStore, lengthInMillis);
return AccessCountTableManager.getTables(tableDeques, metaStore, lengthInMillis);
}

public static List<AccessCountTable> getTables(
Expand Down
Loading

0 comments on commit cb0dfac

Please sign in to comment.