diff --git a/conf/smart-default.xml b/conf/smart-default.xml index a70401a960f..f4686eaef24 100644 --- a/conf/smart-default.xml +++ b/conf/smart-default.xml @@ -471,4 +471,12 @@ The max time in milliseconds to wait an answer from the SmartAgent master actor during action submission. + + + smart.access.count.aggregation.interval.ms + 5000 + + The interval in milliseconds that is covered by single second-granularity access count table. + + diff --git a/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java b/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java index 9caac0aa7c6..e1c80aceab3 100644 --- a/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java +++ b/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java @@ -90,6 +90,10 @@ public class SmartConfKeys { "smart.metastore.mysql.legacy.enabled"; public static final boolean SMART_METASTORE_LEGACY_MYSQL_SUPPORT_DEFAULT = false; + public static final String SMART_ACCESS_COUNT_AGGREGATION_INTERVAL_MS = + "smart.access.count.aggregation.interval.ms"; + public static final int SMART_ACCESS_COUNT_AGGREGATION_INTERVAL_MS_DEFAULT = 5000; + public static final String SMART_NUM_DAY_TABLES_TO_KEEP_KEY = "smart.access.count.day.tables.num"; public static final int SMART_NUM_DAY_TABLES_TO_KEEP_DEFAULT = 30; @@ -97,10 +101,12 @@ public class SmartConfKeys { public static final String SMART_NUM_HOUR_TABLES_TO_KEEP_KEY = "smart.access.count.hour.tables.num"; public static final int SMART_NUM_HOUR_TABLES_TO_KEEP_DEFAULT = 48; + public static final int SMART_NUM_HOUR_TABLES_TO_KEEP_MIN = 24; public static final String SMART_NUM_MINUTE_TABLES_TO_KEEP_KEY = "smart.access.count.minute.tables.num"; public static final int SMART_NUM_MINUTE_TABLES_TO_KEEP_DEFAULT = 120; + public static final int SMART_NUM_MINUTE_TABLES_TO_KEEP_MIN = 60; public static final String SMART_NUM_SECOND_TABLES_TO_KEEP_KEY = "smart.access.count.second.tables.num"; diff --git a/smart-engine/src/main/java/org/smartdata/server/engine/data/AccessEventFetcher.java b/smart-engine/src/main/java/org/smartdata/server/engine/data/AccessEventFetcher.java index 1e56e3fc651..cf2533f31a5 100644 --- a/smart-engine/src/main/java/org/smartdata/server/engine/data/AccessEventFetcher.java +++ b/smart-engine/src/main/java/org/smartdata/server/engine/data/AccessEventFetcher.java @@ -38,8 +38,8 @@ public class AccessEventFetcher { private final ScheduledExecutorService scheduledExecutorService; private final Long fetchInterval; - private ScheduledFuture scheduledFuture; - private FetchTask fetchTask; + private final FetchTask fetchTask; + private ScheduledFuture scheduledFuture; public AccessEventFetcher( Configuration conf, @@ -50,31 +50,29 @@ public AccessEventFetcher( SMART_ACCESS_EVENT_FETCH_INTERVAL_MS_KEY, SMART_ACCESS_EVENT_FETCH_INTERVAL_MS_DEFAULT ); - this.fetchTask = new FetchTask(conf, manager, collector); + this.fetchTask = new FetchTask(manager, collector); this.scheduledExecutorService = service; } public void start() { - Long current = System.currentTimeMillis(); - Long toWait = fetchInterval - (current % fetchInterval); + long current = System.currentTimeMillis(); + long toWait = fetchInterval - (current % fetchInterval); this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate( fetchTask, toWait, fetchInterval, TimeUnit.MILLISECONDS); } public void stop() { if (scheduledFuture != null) { - this.scheduledFuture.cancel(true); + scheduledFuture.cancel(true); } } private static class FetchTask implements Runnable { - private final Configuration conf; private final AccessCountTableManager manager; private final FileAccessEventCollector collector; public FetchTask( - Configuration conf, AccessCountTableManager manager, FileAccessEventCollector collector) { - this.conf = conf; + AccessCountTableManager manager, FileAccessEventCollector collector) { this.manager = manager; this.collector = collector; } @@ -83,7 +81,7 @@ public FetchTask( public void run() { try { List events = this.collector.collect(); - if (events.size() > 0) { + if (!events.isEmpty()) { this.manager.onAccessEventsArrived(events); } } catch (IOException e) { diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/MetaStore.java b/smart-metastore/src/main/java/org/smartdata/metastore/MetaStore.java index dba3efa1835..c09103e7188 100644 --- a/smart-metastore/src/main/java/org/smartdata/metastore/MetaStore.java +++ b/smart-metastore/src/main/java/org/smartdata/metastore/MetaStore.java @@ -339,7 +339,7 @@ public List getHotFiles( try { Map accessCounts = accessCountDao.getHotFiles(tables, topNum); - if (accessCounts.size() == 0) { + if (accessCounts.isEmpty()) { return new ArrayList<>(); } Map idToPath = getFilePaths(accessCounts.keySet()); diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/dao/AccessCountTableAggregator.java b/smart-metastore/src/main/java/org/smartdata/metastore/dao/AccessCountTableAggregator.java index 2d50014cf3d..349b0369b8e 100644 --- a/smart-metastore/src/main/java/org/smartdata/metastore/dao/AccessCountTableAggregator.java +++ b/smart-metastore/src/main/java/org/smartdata/metastore/dao/AccessCountTableAggregator.java @@ -36,18 +36,20 @@ public AccessCountTableAggregator(MetaStore metaStore) { public void aggregate(AccessCountTable destinationTable, List tablesToAggregate) throws MetaStoreException { - if (!tablesToAggregate.isEmpty()) { - ReentrantLock accessCountLock = metaStore.getAccessCountLock(); + if (tablesToAggregate.isEmpty()) { + return; + } + + ReentrantLock accessCountLock = metaStore.getAccessCountLock(); + if (accessCountLock != null) { + accessCountLock.lock(); + } + try { + metaStore.aggregateTables(destinationTable, tablesToAggregate); + metaStore.insertAccessCountTable(destinationTable); + } finally { if (accessCountLock != null) { - accessCountLock.lock(); - } - try { - metaStore.aggregateTables(destinationTable, tablesToAggregate); - metaStore.insertAccessCountTable(destinationTable); - } finally { - if (accessCountLock != null) { - accessCountLock.unlock(); - } + accessCountLock.unlock(); } } } diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/dao/AccessCountTableDeque.java b/smart-metastore/src/main/java/org/smartdata/metastore/dao/AccessCountTableDeque.java index 3ad4f5dda88..7fe7f6adec7 100644 --- a/smart-metastore/src/main/java/org/smartdata/metastore/dao/AccessCountTableDeque.java +++ b/smart-metastore/src/main/java/org/smartdata/metastore/dao/AccessCountTableDeque.java @@ -17,38 +17,52 @@ */ package org.smartdata.metastore.dao; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedDeque; + +import static com.google.common.base.Preconditions.checkNotNull; /** * Use deque to accelerate remove operation. */ -public class AccessCountTableDeque extends ArrayDeque { +public class AccessCountTableDeque extends ConcurrentLinkedDeque { private final TableAddOpListener listener; private final TableEvictor tableEvictor; public AccessCountTableDeque(TableEvictor tableEvictor) { - this(tableEvictor, null); + this(tableEvictor, TableAddOpListener.noOp()); } public AccessCountTableDeque(TableEvictor tableEvictor, TableAddOpListener listener) { super(); - this.listener = listener; - this.tableEvictor = tableEvictor; + this.listener = checkNotNull( + listener, "listener should not be null"); + this.tableEvictor = checkNotNull( + tableEvictor, "tableEvictor should not be null"); } - public boolean addAndNotifyListener(AccessCountTable table) { - if (!this.isEmpty()) { - assert table.getEndTime() > this.peekLast().getEndTime(); + public CompletableFuture addAndNotifyListener(AccessCountTable table) { + if (!isEmpty() && table.getEndTime() <= peekLast().getEndTime()) { + throw new IllegalArgumentException("Overlapping access count table: " + table); } - super.add(table); - if (this.listener != null) { - this.listener.tableAdded(this, table); + add(table); + return notifyListener(table); + } + + public CompletableFuture notifyListener(AccessCountTable table) { + return listener.tableAdded(this, table) + .thenAccept(this::evictTablesIfHigherGrainedCreated); + } + + private void evictTablesIfHigherGrainedCreated(AccessCountTable higherGrainedTable) { + if (higherGrainedTable == null) { + return; } - tableEvictor.evictTables(this, this.size()); - return true; + + tableEvictor.evictTables(this, higherGrainedTable.getEndTime()); } public List getTables(Long start, Long end) { diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/dao/AccessCountTableManager.java b/smart-metastore/src/main/java/org/smartdata/metastore/dao/AccessCountTableManager.java index 21556b464b7..f8439de31b7 100644 --- a/smart-metastore/src/main/java/org/smartdata/metastore/dao/AccessCountTableManager.java +++ b/smart-metastore/src/main/java/org/smartdata/metastore/dao/AccessCountTableManager.java @@ -31,23 +31,30 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static org.smartdata.conf.SmartConfKeys.SMART_ACCESS_COUNT_AGGREGATION_INTERVAL_MS; +import static org.smartdata.conf.SmartConfKeys.SMART_ACCESS_COUNT_AGGREGATION_INTERVAL_MS_DEFAULT; import static org.smartdata.conf.SmartConfKeys.SMART_NUM_DAY_TABLES_TO_KEEP_DEFAULT; import static org.smartdata.conf.SmartConfKeys.SMART_NUM_DAY_TABLES_TO_KEEP_KEY; import static org.smartdata.conf.SmartConfKeys.SMART_NUM_HOUR_TABLES_TO_KEEP_DEFAULT; import static org.smartdata.conf.SmartConfKeys.SMART_NUM_HOUR_TABLES_TO_KEEP_KEY; +import static org.smartdata.conf.SmartConfKeys.SMART_NUM_HOUR_TABLES_TO_KEEP_MIN; import static org.smartdata.conf.SmartConfKeys.SMART_NUM_MINUTE_TABLES_TO_KEEP_DEFAULT; import static org.smartdata.conf.SmartConfKeys.SMART_NUM_MINUTE_TABLES_TO_KEEP_KEY; +import static org.smartdata.conf.SmartConfKeys.SMART_NUM_MINUTE_TABLES_TO_KEEP_MIN; import static org.smartdata.conf.SmartConfKeys.SMART_NUM_SECOND_TABLES_TO_KEEP_DEFAULT; import static org.smartdata.conf.SmartConfKeys.SMART_NUM_SECOND_TABLES_TO_KEEP_KEY; +import static org.smartdata.metastore.utils.Constants.ONE_MINUTE_IN_MILLIS; public class AccessCountTableManager { private final MetaStore metaStore; private final Map tableDeques; private final AccessEventAggregator accessEventAggregator; private final ExecutorService executorService; + private final Configuration configuration; private AccessCountTableDeque secondTableDeque; public static final Logger LOG = @@ -62,11 +69,29 @@ public AccessCountTableManager(MetaStore adapter, this.metaStore = adapter; this.tableDeques = new HashMap<>(); this.executorService = service; - this.accessEventAggregator = new AccessEventAggregator(adapter, this); - this.initTables(configuration); + this.configuration = configuration; + + int aggregationIntervalMs = configuration.getInt( + SMART_ACCESS_COUNT_AGGREGATION_INTERVAL_MS, + SMART_ACCESS_COUNT_AGGREGATION_INTERVAL_MS_DEFAULT); + this.accessEventAggregator = new AccessEventAggregator(adapter, this, aggregationIntervalMs); + + initTables(); + } + + private int getAccessTablesCount(String configKey, int defaultValue, int minimalCount) { + int tableCount = configuration.getInt(configKey, defaultValue); + + if (tableCount < minimalCount) { + String errorMessage = String.format( + "Wrong value for option %s. It should be at least %d", configKey, minimalCount); + LOG.error(errorMessage); + throw new IllegalArgumentException(errorMessage); + } + return tableCount; } - private void initTables(Configuration configuration) { + private void initTables() { AccessCountTableAggregator aggregator = new AccessCountTableAggregator(metaStore); int perDayAccessTablesCount = configuration.getInt(SMART_NUM_DAY_TABLES_TO_KEEP_KEY, @@ -74,47 +99,67 @@ private void initTables(Configuration configuration) { AccessCountTableDeque dayTableDeque = new AccessCountTableDeque( new CountEvictor(metaStore, perDayAccessTablesCount)); TableAddOpListener dayTableListener = - new TableAddOpListener.DayTableListener(dayTableDeque, aggregator, executorService); + TableAddOpListener.perDay(dayTableDeque, aggregator, executorService); - int perHourAccessTablesCount = configuration.getInt(SMART_NUM_HOUR_TABLES_TO_KEEP_KEY, - SMART_NUM_HOUR_TABLES_TO_KEEP_DEFAULT); + int perHourAccessTablesCount = getAccessTablesCount( + SMART_NUM_HOUR_TABLES_TO_KEEP_KEY, + SMART_NUM_HOUR_TABLES_TO_KEEP_DEFAULT, + SMART_NUM_HOUR_TABLES_TO_KEEP_MIN); AccessCountTableDeque hourTableDeque = new AccessCountTableDeque( new CountEvictor(metaStore, perHourAccessTablesCount), dayTableListener); TableAddOpListener hourTableListener = - new TableAddOpListener.HourTableListener(hourTableDeque, aggregator, executorService); + TableAddOpListener.perHour(hourTableDeque, aggregator, executorService); - int perMinuteAccessTablesCount = configuration.getInt(SMART_NUM_MINUTE_TABLES_TO_KEEP_KEY, - SMART_NUM_MINUTE_TABLES_TO_KEEP_DEFAULT); + int perMinuteAccessTablesCount = getAccessTablesCount( + SMART_NUM_MINUTE_TABLES_TO_KEEP_KEY, + SMART_NUM_MINUTE_TABLES_TO_KEEP_DEFAULT, + SMART_NUM_MINUTE_TABLES_TO_KEEP_MIN); AccessCountTableDeque minuteTableDeque = new AccessCountTableDeque( new CountEvictor(metaStore, perMinuteAccessTablesCount), hourTableListener); TableAddOpListener minuteTableListener = - new TableAddOpListener.MinuteTableListener(minuteTableDeque, aggregator, + TableAddOpListener.perMinute(minuteTableDeque, aggregator, executorService); - int perSecondAccessTablesCount = configuration.getInt(SMART_NUM_SECOND_TABLES_TO_KEEP_KEY, - SMART_NUM_SECOND_TABLES_TO_KEEP_DEFAULT); + int minimalSecondAccessTablesCount = + (int) (ONE_MINUTE_IN_MILLIS / accessEventAggregator.getAggregationGranularity()); + + int perSecondAccessTablesCount = getAccessTablesCount( + SMART_NUM_SECOND_TABLES_TO_KEEP_KEY, + SMART_NUM_SECOND_TABLES_TO_KEEP_DEFAULT, + minimalSecondAccessTablesCount); this.secondTableDeque = new AccessCountTableDeque( new CountEvictor(metaStore, perSecondAccessTablesCount), minuteTableListener); - this.tableDeques.put(TimeGranularity.SECOND, this.secondTableDeque); - this.tableDeques.put(TimeGranularity.MINUTE, minuteTableDeque); - this.tableDeques.put(TimeGranularity.HOUR, hourTableDeque); - this.tableDeques.put(TimeGranularity.DAY, dayTableDeque); - this.recoverTables(); + tableDeques.put(TimeGranularity.SECOND, secondTableDeque); + tableDeques.put(TimeGranularity.MINUTE, minuteTableDeque); + tableDeques.put(TimeGranularity.HOUR, hourTableDeque); + tableDeques.put(TimeGranularity.DAY, dayTableDeque); + recoverTables(); } private void recoverTables() { try { List tables = metaStore.getAllSortedTables(); + + if (tables.isEmpty()) { + return; + } + + LOG.info("Loading existing access count tables: {}", tables); for (AccessCountTable table : tables) { - TimeGranularity timeGranularity = - TimeUtils.getGranularity(table.getEndTime() - table.getStartTime()); - if (tableDeques.containsKey(timeGranularity)) { - tableDeques.get(timeGranularity).add(table); + if (tableDeques.containsKey(table.getGranularity())) { + tableDeques.get(table.getGranularity()).add(table); } } + + // aggregate old tables if needed + AccessCountTable lastOldTable = tables.get(tables.size() - 1); + Optional.ofNullable(tableDeques.get(lastOldTable.getGranularity())) + .ifPresent(deque -> deque.notifyListener(lastOldTable)); + + LOG.info("Existing access count tables were successfully loaded"); } catch (MetaStoreException e) { - LOG.error(e.toString()); + LOG.error("Error during recovering access count tables", e); } } @@ -122,15 +167,15 @@ public void addTable(AccessCountTable accessCountTable) { if (LOG.isDebugEnabled()) { LOG.debug(accessCountTable.toString()); } - this.secondTableDeque.addAndNotifyListener(accessCountTable); + secondTableDeque.addAndNotifyListener(accessCountTable); } public void onAccessEventsArrived(List accessEvents) { - this.accessEventAggregator.addAccessEvents(accessEvents); + accessEventAggregator.addAccessEvents(accessEvents); } public List getTables(long lengthInMillis) throws MetaStoreException { - return AccessCountTableManager.getTables(this.tableDeques, this.metaStore, lengthInMillis); + return AccessCountTableManager.getTables(tableDeques, metaStore, lengthInMillis); } public static List getTables( diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/dao/AccessEventAggregator.java b/smart-metastore/src/main/java/org/smartdata/metastore/dao/AccessEventAggregator.java index 5c125838b51..f4667774f48 100644 --- a/smart-metastore/src/main/java/org/smartdata/metastore/dao/AccessEventAggregator.java +++ b/smart-metastore/src/main/java/org/smartdata/metastore/dao/AccessEventAggregator.java @@ -30,8 +30,11 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.smartdata.conf.SmartConfKeys.SMART_ACCESS_COUNT_AGGREGATION_INTERVAL_MS_DEFAULT; public class AccessEventAggregator { private final MetaStore adapter; @@ -39,12 +42,12 @@ public class AccessEventAggregator { private final AccessCountTableManager accessCountTableManager; private final List eventBuffer; private Window currentWindow; - private Map lastAccessCount = new HashMap<>(); + private Map unmergedAccessCounts = new HashMap<>(); public static final Logger LOG = LoggerFactory.getLogger(AccessEventAggregator.class); public AccessEventAggregator(MetaStore adapter, AccessCountTableManager manager) { - this(adapter, manager, 5 * 1000L); + this(adapter, manager, SMART_ACCESS_COUNT_AGGREGATION_INTERVAL_MS_DEFAULT); } public AccessEventAggregator(MetaStore adapter, @@ -56,23 +59,27 @@ public AccessEventAggregator(MetaStore adapter, } public void addAccessEvents(List eventList) { - if (this.currentWindow == null && !eventList.isEmpty()) { - this.currentWindow = assignWindow(eventList.get(0).getTimestamp()); + if (currentWindow == null && !eventList.isEmpty()) { + currentWindow = assignWindow(eventList.get(0).getTimestamp()); } for (FileAccessEvent event : eventList) { - if (!this.currentWindow.contains(event.getTimestamp())) { + if (!currentWindow.contains(event.getTimestamp())) { // New Window occurs - this.createTable(); - this.currentWindow = assignWindow(event.getTimestamp()); - this.eventBuffer.clear(); + createTable(); + currentWindow = assignWindow(event.getTimestamp()); + eventBuffer.clear(); } // Exclude watermark event if (!event.getPath().isEmpty()) { - this.eventBuffer.add(event); + eventBuffer.add(event); } } } + public long getAggregationGranularity() { + return aggregationGranularity; + } + private void createTable() { AccessCountTable table = new AccessCountTable(currentWindow.start, currentWindow.end); String createTable = AccessCountDao.createAccessCountTableSQL(table.getTableName()); @@ -86,57 +93,57 @@ private void createTable() { LOG.error("Create table error: " + table, e); return; } - if (!eventBuffer.isEmpty() || !lastAccessCount.isEmpty()) { - Map accessCount = this.getAccessCountMap(eventBuffer); - Set now = new HashSet<>(accessCount.keySet()); - accessCount = mergeMap(accessCount, lastAccessCount); + + if (!eventBuffer.isEmpty() || !unmergedAccessCounts.isEmpty()) { + Map accessCounts = getAccessCountMap(eventBuffer); + Set accessedFiles = new HashSet<>(accessCounts.keySet()); + mergeMapsInPlace(accessCounts, unmergedAccessCounts); final Map pathToIDs; try { - pathToIDs = adapter.getFileIDs(accessCount.keySet()); + pathToIDs = adapter.getFileIDs(accessCounts.keySet()); } catch (MetaStoreException e) { // TODO: dirty handle here LOG.error("Create Table " + table.getTableName(), e); return; } - now.removeAll(pathToIDs.keySet()); - Map tmpLast = new HashMap<>(); - for (String key : now) { - tmpLast.put(key, accessCount.get(key)); - } + Map accessCountsNotHandledBySSM = accessedFiles.stream() + .filter(file -> !pathToIDs.containsKey(file)) + .collect(Collectors.toMap( + Function.identity(), + accessCounts::get + )); - List values = new ArrayList<>(); + List sqlInsertValues = new ArrayList<>(); for (String key : pathToIDs.keySet()) { - values.add(String.format("(%d, %d)", pathToIDs.get(key), - accessCount.get(key))); + sqlInsertValues.add(String.format("(%d, %d)", pathToIDs.get(key), + accessCounts.get(key))); } - if (LOG.isDebugEnabled()) { - if (!lastAccessCount.isEmpty()) { - Set non = lastAccessCount.keySet(); - non.removeAll(pathToIDs.keySet()); - if (!non.isEmpty()) { - StringBuilder result = new StringBuilder("Access events ignored for file:\n"); - for (String p : non) { - result.append(p).append(" --> ").append(lastAccessCount.get(p)).append("\n"); - } - LOG.debug(result.toString()); + if (LOG.isDebugEnabled() && !unmergedAccessCounts.isEmpty()) { + Set non = unmergedAccessCounts.keySet(); + non.removeAll(pathToIDs.keySet()); + if (!non.isEmpty()) { + StringBuilder result = new StringBuilder("Access events ignored for file:\n"); + for (String p : non) { + result.append(p).append(" --> ").append(unmergedAccessCounts.get(p)).append("\n"); } + LOG.debug(result.toString()); } } - lastAccessCount = tmpLast; + unmergedAccessCounts = accessCountsNotHandledBySSM; - if (!values.isEmpty()) { + if (!sqlInsertValues.isEmpty()) { String insertValue = String.format( "INSERT INTO %s (%s, %s) VALUES %s", table.getTableName(), DefaultAccessCountDao.FILE_FIELD, DefaultAccessCountDao.ACCESSCOUNT_FIELD, - StringUtils.join(values, ", ")); + StringUtils.join(sqlInsertValues, ", ")); try { - this.adapter.execute(insertValue); - this.adapter.updateCachedFiles(pathToIDs, eventBuffer); + adapter.execute(insertValue); + adapter.updateCachedFiles(pathToIDs, eventBuffer); if (LOG.isDebugEnabled()) { LOG.debug("Table created: " + table); } @@ -145,32 +152,20 @@ private void createTable() { } } } - this.accessCountTableManager.addTable(table); + accessCountTableManager.addTable(table); } - private Map mergeMap(Map map1, Map map2) { - for (Entry entry : map2.entrySet()) { - String key = entry.getKey(); - if (map1.containsKey(key)) { - map1.put(key, map1.get(key) + entry.getValue()); - } else { - map1.put(key, map2.get(key)); - } - } - return map1; + private void mergeMapsInPlace(Map resultMap, Map mapToMerge) { + mapToMerge.forEach((key, value) -> resultMap.merge(key, value, Integer::sum)); } private Map getAccessCountMap(List events) { - Map map = new HashMap<>(); - for (FileAccessEvent event : events) { - String path = event.getPath(); - if (map.containsKey(path)) { - map.put(path, map.get(path) + 1); - } else { - map.put(path, 1); - } - } - return map; + return events.stream() + .collect(Collectors.toMap( + FileAccessEvent::getPath, + event -> 1, + Integer::sum + )); } private Window assignWindow(long time) { diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/dao/AggregatingTableAddOpListener.java b/smart-metastore/src/main/java/org/smartdata/metastore/dao/AggregatingTableAddOpListener.java new file mode 100644 index 00000000000..d367c339268 --- /dev/null +++ b/smart-metastore/src/main/java/org/smartdata/metastore/dao/AggregatingTableAddOpListener.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.smartdata.metastore.dao; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; + +public class AggregatingTableAddOpListener implements TableAddOpListener { + static final Logger LOG = LoggerFactory.getLogger(AggregatingTableAddOpListener.class); + + private final AccessCountTableDeque coarseGrainedTableDeque; + private final AccessCountTableAggregator tableAggregator; + private final ExecutorService executorService; + private final Set tablesUnderConstruction; + private final long millisPerGranularity; + + AggregatingTableAddOpListener( + AccessCountTableDeque deque, + AccessCountTableAggregator aggregator, + ExecutorService executorService, + long millisPerGranularity) { + this.coarseGrainedTableDeque = deque; + this.tableAggregator = aggregator; + this.executorService = executorService; + this.millisPerGranularity = millisPerGranularity; + tablesUnderConstruction = ConcurrentHashMap.newKeySet(); + } + + @Override + public CompletableFuture tableAdded( + AccessCountTableDeque fineGrainedTableDeque, AccessCountTable table) { + final AccessCountTable lastCoarseGrainedTable = lastCoarseGrainedTableFor(table.getEndTime()); + + // Todo: optimize contains + if (coarseGrainedTableDeque.contains(lastCoarseGrainedTable)) { + return CompletableFuture.completedFuture(lastCoarseGrainedTable); + } + + final List tablesToAggregate = + fineGrainedTableDeque.getTables( + lastCoarseGrainedTable.getStartTime(), lastCoarseGrainedTable.getEndTime()); + + if (!tablesToAggregate.isEmpty() + && !tablesUnderConstruction.contains(lastCoarseGrainedTable)) { + tablesUnderConstruction.add(lastCoarseGrainedTable); + return aggregateTableAsync(lastCoarseGrainedTable, tablesToAggregate); + } + + return TableAddOpListener.EMPTY_RESULT; + } + + private CompletableFuture aggregateTableAsync( + AccessCountTable lastCoarseGrainedTable, List tablesToAggregate) { + return CompletableFuture.supplyAsync( + () -> aggregateTable(lastCoarseGrainedTable, tablesToAggregate), executorService); + } + + private AccessCountTable aggregateTable( + AccessCountTable lastCoarseGrainedTable, List tablesToAggregate) { + try { + tableAggregator.aggregate(lastCoarseGrainedTable, tablesToAggregate); + coarseGrainedTableDeque.addAndNotifyListener(lastCoarseGrainedTable); + } catch (Exception e) { + LOG.error( + "Add AccessCount Table {} error", + lastCoarseGrainedTable.getTableName(), e); + } + tablesUnderConstruction.remove(lastCoarseGrainedTable); + return lastCoarseGrainedTable; + } + + protected AccessCountTable lastCoarseGrainedTableFor(long endTime) { + long lastEnd = endTime - (endTime % millisPerGranularity); + long lastStart = lastEnd - millisPerGranularity; + return new AccessCountTable(lastStart, lastEnd); + } + // Todo: WeekTableListener, MonthTableListener, YearTableListener +} diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/dao/CountEvictor.java b/smart-metastore/src/main/java/org/smartdata/metastore/dao/CountEvictor.java index b3e1a070496..d3bb2bb420e 100644 --- a/smart-metastore/src/main/java/org/smartdata/metastore/dao/CountEvictor.java +++ b/smart-metastore/src/main/java/org/smartdata/metastore/dao/CountEvictor.java @@ -30,19 +30,21 @@ public CountEvictor(MetaStore adapter, int count) { } @Override - public void evictTables(AccessCountTableDeque tables, int size) { - if (size > maxCount) { - int evictedCount = 0; - for (Iterator iterator = tables.iterator(); iterator.hasNext();) { - AccessCountTable table = iterator.next(); - evictedCount++; - if (evictedCount > size - maxCount) { - break; - } else { - this.dropTable(table); - iterator.remove(); - } + public void evictTables(AccessCountTableDeque tables, long lastAggregatedIntervalEndTimestamp) { + int elementsToRemove = tables.size() - maxCount; + + int evictedCount = 0; + for (Iterator iterator = tables.iterator(); + iterator.hasNext() && evictedCount++ < elementsToRemove;) { + AccessCountTable table = iterator.next(); + + if (table.getEndTime() > lastAggregatedIntervalEndTimestamp) { + // table belongs to not yet aggregated higher granularity interval + return; } + + iterator.remove(); + dropTable(table); } } } diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/dao/DurationEvictor.java b/smart-metastore/src/main/java/org/smartdata/metastore/dao/DurationEvictor.java deleted file mode 100644 index 07631c177a0..00000000000 --- a/smart-metastore/src/main/java/org/smartdata/metastore/dao/DurationEvictor.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.smartdata.metastore.dao; - -import org.smartdata.metastore.MetaStore; - -import java.util.Iterator; - -public class DurationEvictor extends TableEvictor { - private final long duration; - - public DurationEvictor(MetaStore adapter, long duration) { - super(adapter); - this.duration = duration; - } - - @Override - public void evictTables(AccessCountTableDeque tables, int size) { - if (tables.peek() != null){ - AccessCountTable latestTable = tables.peekLast(); - Long threshHold = latestTable.getEndTime() - duration; - for (Iterator iterator = tables.iterator(); iterator.hasNext();) { - AccessCountTable table = iterator.next(); - if (table.getStartTime() < threshHold) { - this.dropTable(table); - iterator.remove(); - } else { - break; - } - } - } - } -} diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/dao/TableAddOpListener.java b/smart-metastore/src/main/java/org/smartdata/metastore/dao/TableAddOpListener.java index db9c7fc2641..d7157c63036 100644 --- a/smart-metastore/src/main/java/org/smartdata/metastore/dao/TableAddOpListener.java +++ b/smart-metastore/src/main/java/org/smartdata/metastore/dao/TableAddOpListener.java @@ -17,112 +17,36 @@ */ package org.smartdata.metastore.dao; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.smartdata.metastore.MetaStoreException; import org.smartdata.metastore.utils.Constants; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -public abstract class TableAddOpListener { - static final Logger LOG = LoggerFactory.getLogger(TableAddOpListener.class); - private final Set tablesUnderAggregating; +public interface TableAddOpListener { + CompletableFuture EMPTY_RESULT = CompletableFuture.completedFuture(null); - AccessCountTableDeque coarseGrainedTableDeque; - AccessCountTableAggregator tableAggregator; - ExecutorService executorService; + CompletableFuture tableAdded( + AccessCountTableDeque fineGrainedTableDeque, AccessCountTable table); - TableAddOpListener( - AccessCountTableDeque deque, - AccessCountTableAggregator aggregator, - ExecutorService executorService) { - this.coarseGrainedTableDeque = deque; - this.tableAggregator = aggregator; - this.executorService = executorService; - this.tablesUnderAggregating = new HashSet<>(); + static TableAddOpListener perDay(AccessCountTableDeque deque, + AccessCountTableAggregator aggregator, ExecutorService service) { + return new AggregatingTableAddOpListener( + deque, aggregator, service, Constants.ONE_DAY_IN_MILLIS); } - public void tableAdded(AccessCountTableDeque fineGrainedTableDeque, AccessCountTable table) { - final AccessCountTable lastCoarseGrainedTable = lastCoarseGrainedTableFor(table.getEndTime()); - // Todo: optimize contains - if (!coarseGrainedTableDeque.contains(lastCoarseGrainedTable)) { - final List tablesToAggregate = - fineGrainedTableDeque.getTables( - lastCoarseGrainedTable.getStartTime(), lastCoarseGrainedTable.getEndTime()); - if (!tablesToAggregate.isEmpty() - && !tablesUnderAggregating.contains(lastCoarseGrainedTable)) { - tablesUnderAggregating.add(lastCoarseGrainedTable); - executorService.submit( - new Runnable() { - @Override - public void run() { - try { - tableAggregator.aggregate(lastCoarseGrainedTable, tablesToAggregate); - coarseGrainedTableDeque.addAndNotifyListener(lastCoarseGrainedTable); - tablesUnderAggregating.remove(lastCoarseGrainedTable); - } catch (MetaStoreException e) { - LOG.error( - "Add AccessCount Table {} error", - lastCoarseGrainedTable.getTableName(), e); - } - } - }); - } - } + static TableAddOpListener perHour(AccessCountTableDeque deque, + AccessCountTableAggregator aggregator, ExecutorService service) { + return new AggregatingTableAddOpListener( + deque, aggregator, service, Constants.ONE_HOUR_IN_MILLIS); } - public abstract AccessCountTable lastCoarseGrainedTableFor(Long startTime); - - public static class MinuteTableListener extends TableAddOpListener { - public MinuteTableListener( - AccessCountTableDeque deque, - AccessCountTableAggregator aggregator, - ExecutorService service) { - super(deque, aggregator, service); - } - - @Override - public AccessCountTable lastCoarseGrainedTableFor(Long endTime) { - long lastEnd = endTime - (endTime % Constants.ONE_MINUTE_IN_MILLIS); - long lastStart = lastEnd - Constants.ONE_MINUTE_IN_MILLIS; - return new AccessCountTable(lastStart, lastEnd); - } + static TableAddOpListener perMinute(AccessCountTableDeque deque, + AccessCountTableAggregator aggregator, ExecutorService service) { + return new AggregatingTableAddOpListener( + deque, aggregator, service, Constants.ONE_MINUTE_IN_MILLIS); } - public static class HourTableListener extends TableAddOpListener { - public HourTableListener( - AccessCountTableDeque deque, - AccessCountTableAggregator aggregator, - ExecutorService service) { - super(deque, aggregator, service); - } - - @Override - public AccessCountTable lastCoarseGrainedTableFor(Long endTime) { - long lastEnd = endTime - (endTime % Constants.ONE_HOUR_IN_MILLIS); - long lastStart = lastEnd - Constants.ONE_HOUR_IN_MILLIS; - return new AccessCountTable(lastStart, lastEnd); - } + static TableAddOpListener noOp() { + return (deque, table) -> EMPTY_RESULT; } - - public static class DayTableListener extends TableAddOpListener { - public DayTableListener( - AccessCountTableDeque deque, - AccessCountTableAggregator aggregator, - ExecutorService service) { - super(deque, aggregator, service); - } - - @Override - public AccessCountTable lastCoarseGrainedTableFor(Long endTime) { - long lastEnd = endTime - (endTime % Constants.ONE_DAY_IN_MILLIS); - long lastStart = lastEnd - Constants.ONE_DAY_IN_MILLIS; - return new AccessCountTable(lastStart, lastEnd); - } - } - - // Todo: WeekTableListener, MonthTableListener, YearTableListener } diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/dao/TableEvictor.java b/smart-metastore/src/main/java/org/smartdata/metastore/dao/TableEvictor.java index 092c9107603..97fde54b076 100644 --- a/smart-metastore/src/main/java/org/smartdata/metastore/dao/TableEvictor.java +++ b/smart-metastore/src/main/java/org/smartdata/metastore/dao/TableEvictor.java @@ -32,13 +32,13 @@ public TableEvictor(MetaStore metaStore) { public void dropTable(AccessCountTable accessCountTable) { try { - this.metaStore.dropTable(accessCountTable.getTableName()); - this.metaStore.deleteAccessCountTable(accessCountTable); + metaStore.dropTable(accessCountTable.getTableName()); + metaStore.deleteAccessCountTable(accessCountTable); LOG.debug("Dropped access count table " + accessCountTable.getTableName()); } catch (MetaStoreException e) { LOG.error("Drop access count table {} failed", accessCountTable.getTableName(), e); } } - abstract void evictTables(AccessCountTableDeque tables, int size); + abstract void evictTables(AccessCountTableDeque tables, long lastAggregatedIntervalEndTimestamp); } diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/dao/impl/DefaultAccessCountDao.java b/smart-metastore/src/main/java/org/smartdata/metastore/dao/impl/DefaultAccessCountDao.java index 806e8d348d3..2081cccdfa4 100644 --- a/smart-metastore/src/main/java/org/smartdata/metastore/dao/impl/DefaultAccessCountDao.java +++ b/smart-metastore/src/main/java/org/smartdata/metastore/dao/impl/DefaultAccessCountDao.java @@ -17,6 +17,8 @@ */ package org.smartdata.metastore.dao.impl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.smartdata.metastore.dao.AbstractDao; import org.smartdata.metastore.dao.AccessCountDao; import org.smartdata.metastore.dao.AccessCountTable; @@ -33,6 +35,8 @@ import java.util.Map; public class DefaultAccessCountDao extends AbstractDao implements AccessCountDao { + static final Logger LOG = LoggerFactory.getLogger(DefaultAccessCountDao.class); + private static final String TABLE_NAME = "access_count_table"; public DefaultAccessCountDao(DataSource dataSource) { @@ -95,6 +99,7 @@ public void aggregateTables( DefaultAccessCountDao.ACCESSCOUNT_FIELD, getUnionStatement(tablesToAggregate), DefaultAccessCountDao.FILE_FIELD); + LOG.debug("Executing access count tables aggregation: {}", insert); jdbcTemplate.execute(insert); } diff --git a/smart-metastore/src/test/java/org/smartdata/metastore/TestDBUtil.java b/smart-metastore/src/test/java/org/smartdata/metastore/TestDBUtil.java index 107debd15b9..c4fe80f2784 100644 --- a/smart-metastore/src/test/java/org/smartdata/metastore/TestDBUtil.java +++ b/smart-metastore/src/test/java/org/smartdata/metastore/TestDBUtil.java @@ -18,6 +18,8 @@ package org.smartdata.metastore; import org.apache.hadoop.conf.Configuration; +import org.smartdata.metastore.dao.AccessCountTable; +import org.smartdata.metastore.dao.AccessCountTableDeque; import org.smartdata.metastore.db.DBHandlersFactory; import org.smartdata.metastore.db.DbSchemaManager; @@ -34,6 +36,7 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.util.UUID; +import java.util.concurrent.TimeUnit; import static org.smartdata.metastore.utils.MetaStoreUtils.SQLITE_URL_PREFIX; @@ -61,6 +64,12 @@ public static String getUniqueSqliteUrl() { return SQLITE_URL_PREFIX + getUniqueDBFilePath(); } + public static void addAccessCountTableToDeque( + AccessCountTableDeque deque, AccessCountTable table) throws Exception { + deque.addAndNotifyListener(table) + .get(1, TimeUnit.SECONDS); + } + /** * Get an initialized empty Sqlite database file path. * diff --git a/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestAddTableOpListener.java b/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestAddTableOpListener.java index 2ca0b09df4f..216d1c06eda 100644 --- a/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestAddTableOpListener.java +++ b/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestAddTableOpListener.java @@ -25,6 +25,7 @@ import java.util.concurrent.Executors; import static org.mockito.Mockito.mock; +import static org.smartdata.metastore.TestDBUtil.addAccessCountTableToDeque; public class TestAddTableOpListener { MetaStore adapter = mock(MetaStore.class); @@ -33,13 +34,12 @@ public class TestAddTableOpListener { mock(MetaStore.class)); @Test - public void testMinuteTableListener() throws InterruptedException { - Long oneSec = 1000L; + public void testMinuteTableListener() throws Exception { + long oneSec = 1000L; TableEvictor tableEvictor = new CountEvictor(adapter, 10); AccessCountTableDeque minuteTableDeque = new AccessCountTableDeque(tableEvictor); TableAddOpListener minuteTableListener = - new TableAddOpListener.MinuteTableListener(minuteTableDeque, aggregator, - executorService); + TableAddOpListener.perMinute(minuteTableDeque, aggregator, executorService); AccessCountTableDeque secondTableDeque = new AccessCountTableDeque(tableEvictor, minuteTableListener); @@ -50,26 +50,26 @@ public void testMinuteTableListener() throws InterruptedException { AccessCountTable table3 = new AccessCountTable(55 * oneSec, 60 * oneSec); - secondTableDeque.addAndNotifyListener(table1); - Assert.assertTrue(minuteTableDeque.size() == 0); - secondTableDeque.addAndNotifyListener(table2); - Assert.assertTrue(minuteTableDeque.size() == 0); + addAccessCountTableToDeque(secondTableDeque, table1); + Assert.assertTrue(minuteTableDeque.isEmpty()); - secondTableDeque.addAndNotifyListener(table3); - Thread.sleep(1000); + addAccessCountTableToDeque(secondTableDeque, table2); + Assert.assertTrue(minuteTableDeque.isEmpty()); + + addAccessCountTableToDeque(secondTableDeque, table3); + Assert.assertEquals(1, minuteTableDeque.size()); - Assert.assertTrue(minuteTableDeque.size() == 1); AccessCountTable expected = new AccessCountTable(0L, 60 * oneSec); Assert.assertEquals(minuteTableDeque.poll(), expected); } @Test - public void testHourTableListener() throws InterruptedException { - Long oneMin = 60 * 1000L; + public void testHourTableListener() throws Exception { + long oneMin = 60 * 1000L; TableEvictor tableEvictor = new CountEvictor(adapter, 10); AccessCountTableDeque hourTableDeque = new AccessCountTableDeque(tableEvictor); TableAddOpListener hourTableListener = - new TableAddOpListener.HourTableListener(hourTableDeque, aggregator, executorService); + TableAddOpListener.perHour(hourTableDeque, aggregator, executorService); AccessCountTableDeque minuteTableDeque = new AccessCountTableDeque(tableEvictor, hourTableListener); @@ -80,27 +80,26 @@ public void testHourTableListener() throws InterruptedException { AccessCountTable table3 = new AccessCountTable(59 * oneMin, 60 * oneMin); - minuteTableDeque.addAndNotifyListener(table1); - Assert.assertTrue(hourTableDeque.size() == 0); + addAccessCountTableToDeque(minuteTableDeque, table1); + Assert.assertTrue(hourTableDeque.isEmpty()); - minuteTableDeque.addAndNotifyListener(table2); - Assert.assertTrue(hourTableDeque.size() == 0); + addAccessCountTableToDeque(minuteTableDeque, table2); + Assert.assertTrue(hourTableDeque.isEmpty()); - minuteTableDeque.addAndNotifyListener(table3); - Thread.sleep(1000); + addAccessCountTableToDeque(minuteTableDeque, table3); + Assert.assertEquals(1, hourTableDeque.size()); - Assert.assertTrue(hourTableDeque.size() == 1); AccessCountTable expected = new AccessCountTable(0L, 60 * oneMin); Assert.assertEquals(hourTableDeque.poll(), expected); } @Test - public void testDayTableListener() throws InterruptedException { - Long oneHour = 60 * 60 * 1000L; + public void testDayTableListener() throws Exception { + long oneHour = 60 * 60 * 1000L; TableEvictor tableEvictor = new CountEvictor(adapter, 10); AccessCountTableDeque dayTableDeque = new AccessCountTableDeque(tableEvictor); TableAddOpListener dayTableListener = - new TableAddOpListener.DayTableListener(dayTableDeque, aggregator, executorService); + TableAddOpListener.perDay(dayTableDeque, aggregator, executorService); AccessCountTableDeque hourTableDeque = new AccessCountTableDeque(tableEvictor, dayTableListener); @@ -111,16 +110,15 @@ public void testDayTableListener() throws InterruptedException { AccessCountTable table3 = new AccessCountTable(23 * oneHour, 24 * oneHour); - hourTableDeque.addAndNotifyListener(table1); - Assert.assertTrue(dayTableDeque.size() == 0); + addAccessCountTableToDeque(hourTableDeque, table1); + Assert.assertTrue(dayTableDeque.isEmpty()); - hourTableDeque.addAndNotifyListener(table2); - Assert.assertTrue(dayTableDeque.size() == 0); + addAccessCountTableToDeque(hourTableDeque, table2); + Assert.assertTrue(dayTableDeque.isEmpty()); - hourTableDeque.addAndNotifyListener(table3); - Thread.sleep(1000); + addAccessCountTableToDeque(hourTableDeque, table3); + Assert.assertEquals(1, dayTableDeque.size()); - Assert.assertTrue(dayTableDeque.size() == 1); AccessCountTable today = new AccessCountTable(0L, 24 * oneHour); Assert.assertEquals(dayTableDeque.poll(), today); } diff --git a/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestTableEvictor.java b/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestTableEvictor.java index cc0e8ffda9e..edb34edd246 100644 --- a/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestTableEvictor.java +++ b/smart-metastore/src/test/java/org/smartdata/metastore/dao/TestTableEvictor.java @@ -17,55 +17,117 @@ */ package org.smartdata.metastore.dao; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.smartdata.metastore.MetaStore; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + import static org.mockito.Mockito.mock; +import static org.smartdata.metastore.TestDBUtil.addAccessCountTableToDeque; +import static org.smartdata.metastore.utils.Constants.ONE_SECOND_IN_MILLIS; public class TestTableEvictor { - MetaStore adapter = mock(MetaStore.class); + private MetaStore adapter; + private AccessCountTableAggregator aggregator; + + private ExecutorService executorService; + + @Before + public void setUp() { + adapter = mock(MetaStore.class); + aggregator = new AccessCountTableAggregator(adapter); + executorService = Executors.newSingleThreadExecutor(); + } + + @After + public void shutdown() { + executorService.shutdown(); + } @Test public void testCountEvictor() { - TableEvictor evictor = new CountEvictor(adapter, 3); - AccessCountTableDeque deque = new AccessCountTableDeque(evictor); - AccessCountTable first = new AccessCountTable(0L, 1L); - deque.addAndNotifyListener(first); - Assert.assertTrue(deque.size() == 1); - AccessCountTable second = new AccessCountTable(1L, 2L); - deque.addAndNotifyListener(second); - Assert.assertTrue(deque.size() == 2); - deque.addAndNotifyListener(new AccessCountTable(2L, 3L)); - Assert.assertTrue(deque.size() == 3); - deque.addAndNotifyListener(new AccessCountTable(3L, 4L)); - Assert.assertTrue(deque.size() == 3); - Assert.assertTrue(!deque.contains(first)); - deque.addAndNotifyListener(new AccessCountTable(4L, 5L)); - Assert.assertTrue(deque.size() == 3); - Assert.assertTrue(!deque.contains(second)); + CountEvictor countEvictor = new CountEvictor(adapter, 2); + AccessCountTableDeque tableDeque = new AccessCountTableDeque(countEvictor); + + tableDeque.add(new AccessCountTable(0L, 1L)); + countEvictor.evictTables(tableDeque, 0L); + Assert.assertEquals(1, tableDeque.size()); + + tableDeque.add(new AccessCountTable(1L, 2L)); + countEvictor.evictTables(tableDeque, 0L); + Assert.assertEquals(2, tableDeque.size()); + + tableDeque.add(new AccessCountTable(3L, 4L)); + countEvictor.evictTables(tableDeque, 0L); + Assert.assertEquals(3, tableDeque.size()); + + AccessCountTable firstExpectedTable = new AccessCountTable(4L, 59 * ONE_SECOND_IN_MILLIS); + tableDeque.add(firstExpectedTable); + countEvictor.evictTables(tableDeque, 0L); + Assert.assertEquals(4, tableDeque.size()); + + AccessCountTable secondExpectedTable = new AccessCountTable( + 59 * ONE_SECOND_IN_MILLIS, 60 * ONE_SECOND_IN_MILLIS); + tableDeque.add(secondExpectedTable); + countEvictor.evictTables(tableDeque, ONE_SECOND_IN_MILLIS); + Assert.assertEquals(2, tableDeque.size()); + + Assert.assertTrue(tableDeque.contains(firstExpectedTable)); + Assert.assertTrue(tableDeque.contains(secondExpectedTable)); + } + + @Test + public void testDontEvictIfNotAggregatedYet() throws Exception { + AccessCountTableDeque secondDeque = buildSecondDeque(1); + + addAccessCountTableToDeque(secondDeque, new AccessCountTable(0L, 1L)); + Assert.assertEquals(1, secondDeque.size()); + + addAccessCountTableToDeque(secondDeque, new AccessCountTable(1L, 2L)); + Assert.assertEquals(2, secondDeque.size()); + + addAccessCountTableToDeque(secondDeque, new AccessCountTable(2L, 3L)); + Assert.assertEquals(3, secondDeque.size()); + + AccessCountTable lastFirstMinuteTable = new AccessCountTable( + 59 * ONE_SECOND_IN_MILLIS, 60 * ONE_SECOND_IN_MILLIS); + addAccessCountTableToDeque(secondDeque, lastFirstMinuteTable); + + Assert.assertEquals(1, secondDeque.size()); + Assert.assertTrue(secondDeque.contains(lastFirstMinuteTable)); } @Test - public void testDurationEvictor() { - TableEvictor evictor = new DurationEvictor(adapter, 10); - AccessCountTableDeque deque = new AccessCountTableDeque(evictor); - AccessCountTable first = new AccessCountTable(0L, 3L); - deque.addAndNotifyListener(first); - Assert.assertTrue(deque.size() == 1); - AccessCountTable second = new AccessCountTable(3L, 7L); - deque.addAndNotifyListener(second); - Assert.assertTrue(deque.size() == 2); - deque.addAndNotifyListener(new AccessCountTable(7L, 10L)); - Assert.assertTrue(deque.size() == 3); - deque.addAndNotifyListener(new AccessCountTable(11L, 12L)); - Assert.assertTrue(deque.size() == 3); - Assert.assertTrue(!deque.contains(first)); - deque.addAndNotifyListener(new AccessCountTable(12L, 13L)); - Assert.assertTrue(deque.size() == 4); - Assert.assertTrue(deque.contains(second)); - deque.addAndNotifyListener(new AccessCountTable(13L, 22L)); - Assert.assertTrue(deque.size() == 2); - Assert.assertTrue(!deque.contains(second)); + public void testDontEvictDuringAggregationIfThresholdNotMet() throws Exception { + AccessCountTableDeque secondDeque = buildSecondDeque(10); + + addAccessCountTableToDeque(secondDeque, new AccessCountTable(0L, 1L)); + Assert.assertEquals(1, secondDeque.size()); + + addAccessCountTableToDeque(secondDeque, new AccessCountTable(1L, 2L)); + Assert.assertEquals(2, secondDeque.size()); + + addAccessCountTableToDeque(secondDeque, new AccessCountTable(2L, 3L)); + Assert.assertEquals(3, secondDeque.size()); + + AccessCountTable lastFirstMinuteTable = new AccessCountTable( + 59 * ONE_SECOND_IN_MILLIS, 60 * ONE_SECOND_IN_MILLIS); + addAccessCountTableToDeque(secondDeque, lastFirstMinuteTable); + + Assert.assertEquals(4, secondDeque.size()); + } + + private AccessCountTableDeque buildSecondDeque(int evictThreshold) { + AccessCountTableDeque minuteDeque = new AccessCountTableDeque( + new CountEvictor(adapter, 999)); + TableAddOpListener minuteTableListener = + TableAddOpListener.perMinute(minuteDeque, aggregator, executorService); + + TableEvictor secondEvictor = new CountEvictor(adapter, evictThreshold); + return new AccessCountTableDeque(secondEvictor, minuteTableListener); } }