diff --git a/smart-engine/src/main/java/org/smartdata/server/engine/CmdletManager.java b/smart-engine/src/main/java/org/smartdata/server/engine/CmdletManager.java index 5990368752..3e4296a2f6 100644 --- a/smart-engine/src/main/java/org/smartdata/server/engine/CmdletManager.java +++ b/smart-engine/src/main/java/org/smartdata/server/engine/CmdletManager.java @@ -54,6 +54,7 @@ import org.smartdata.model.WhitelistHelper; import org.smartdata.model.action.ActionScheduler; import org.smartdata.model.action.ScheduleResult; +import org.smartdata.model.request.CmdletSearchRequest; import org.smartdata.protocol.message.ActionStatus; import org.smartdata.protocol.message.ActionStatusFactory; import org.smartdata.protocol.message.CmdletStatus; @@ -249,14 +250,18 @@ public List getSchedulers(String actionName) { private void loadCmdletsFromDb() throws IOException { LOG.info("reloading the dispatched and pending cmdlets in DB."); try { - for (CmdletInfo cmdletInfo : metaStore.getCmdlets(CmdletState.DISPATCHED)) { + CmdletSearchRequest searchRequest = CmdletSearchRequest.builder() + .state(CmdletState.DISPATCHED) + .state(CmdletState.EXECUTING) + .build(); + + for (CmdletInfo cmdletInfo : cmdletInfoHandler.search(searchRequest)) { recoverCmdletInfo(cmdletInfo, actionInfos -> recoverDispatchedActionInfos(cmdletInfo, actionInfos)); } for (CmdletInfo cmdletInfo : metaStore.getCmdlets(CmdletState.PENDING)) { - recoverCmdletInfo(cmdletInfo, actionInfos -> { - }); + recoverCmdletInfo(cmdletInfo, actionInfos -> {}); } } catch (MetaStoreException e) { LOG.error("DB connection error occurs when ssm is reloading cmdlets!"); @@ -304,7 +309,7 @@ private void onActionInfoRecover(ActionInfo actionInfo) { * Let Scheduler check actioninfo onsubmit and add them to cmdletinfo. */ private void checkActionsOnSubmit(CmdletInfo cmdletInfo, - List actionInfos) throws IOException { + List actionInfos) throws IOException { for (ActionInfo actionInfo : actionInfos) { cmdletInfo.addAction(actionInfo.getActionId()); } @@ -431,7 +436,7 @@ public long submitCmdlet(CmdletDescriptor cmdletDescriptor) throws IOException { * Insert cmdletInfo and actions to metastore and cache. */ private void syncCmdAction(CmdletInfo cmdletInfo, - List actionInfos) { + List actionInfos) { LOG.debug("Cache cmdlet {}", cmdletInfo); actionInfos.forEach(actionInfoHandler::store); cmdletInfoHandler.storeUnfinished(cmdletInfo); @@ -440,10 +445,12 @@ private void syncCmdAction(CmdletInfo cmdletInfo, synchronized (pendingCmdlets) { pendingCmdlets.add(cmdletInfo.getId()); } - } else if (cmdletInfo.getState() == CmdletState.DISPATCHED) { + } else if (cmdletInfo.getState() == CmdletState.DISPATCHED + || cmdletInfo.getState() == CmdletState.EXECUTING) { runningCmdlets.add(cmdletInfo.getId()); LaunchCmdlet launchCmdlet = cmdletInfoHandler.createLaunchCmdlet(cmdletInfo); idToLaunchCmdlets.put(cmdletInfo.getId(), launchCmdlet); + scheduledCmdlets.add(cmdletInfo.getId()); } } diff --git a/smart-engine/src/main/java/org/smartdata/server/engine/RuleManager.java b/smart-engine/src/main/java/org/smartdata/server/engine/RuleManager.java index 97e43e3028..10fedbb239 100644 --- a/smart-engine/src/main/java/org/smartdata/server/engine/RuleManager.java +++ b/smart-engine/src/main/java/org/smartdata/server/engine/RuleManager.java @@ -370,12 +370,12 @@ public SmartPrincipalManager getPrincipalService() { @Override public SearchResult search( RuleSearchRequest searchRequest, - PageRequest pageRequest) throws Exception { + PageRequest pageRequest) throws IOException { return ruleInfoHandler.search(searchRequest, pageRequest); } @Override - public List search(RuleSearchRequest searchRequest) throws Exception { + public List search(RuleSearchRequest searchRequest) throws IOException { return ruleInfoHandler.search(searchRequest); } } diff --git a/smart-integration/src/test/java/org/smartdata/integration/TestCmdletsRestart.java b/smart-integration/src/test/java/org/smartdata/integration/TestCmdletsRestart.java new file mode 100644 index 0000000000..a1ac803117 --- /dev/null +++ b/smart-integration/src/test/java/org/smartdata/integration/TestCmdletsRestart.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.smartdata.integration; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.smartdata.client.generated.model.ActionDto; +import org.smartdata.client.generated.model.ActionStateDto; +import org.smartdata.client.generated.model.CmdletDto; +import org.smartdata.client.generated.model.CmdletStateDto; +import org.smartdata.conf.SmartConf; +import org.smartdata.conf.SmartConfKeys; +import org.smartdata.integration.api.ActionsApiWrapper; +import org.smartdata.integration.api.CmdletsApiWrapper; +import org.smartdata.integration.cluster.SmartCluster; +import org.smartdata.integration.cluster.SmartMiniCluster; +import org.smartdata.metastore.MetaStore; +import org.smartdata.model.ActionInfo; +import org.smartdata.model.CmdletInfo; +import org.smartdata.model.CmdletState; +import org.smartdata.server.SmartServer; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.Assert.assertEquals; +import static org.smartdata.metastore.utils.MetaStoreUtils.getDBAdapter; + +public class TestCmdletsRestart { + private SmartCluster cluster; + private SmartConf conf; + private CmdletsApiWrapper cmdletsApiClient; + private ActionsApiWrapper actionsApiWrapper; + + @Before + public void setup() throws Exception { + cluster = new SmartMiniCluster(); + cluster.setUp(); + + conf = cluster.getConf(); + conf.setLong(SmartConfKeys.SMART_STATUS_REPORT_PERIOD_KEY, 100); + + cmdletsApiClient = new CmdletsApiWrapper(); + actionsApiWrapper = new ActionsApiWrapper(); + } + + @After + public void cleanUp() throws Exception { + if (cluster != null) { + cluster.cleanUp(); + } + } + + @Test + public void testExecutingCmdletRestartAfterFailure() throws Exception { + long cmdletId = 22L; + long actionId = 77L; + + testCmdletRestartAfterFailure( + createTestCmdlet(cmdletId, CmdletState.EXECUTING, actionId), + createTestAction(actionId, cmdletId) + ); + } + + @Test + public void testPendingCmdletRestartAfterFailure() throws Exception { + long cmdletId = 1L; + long actionId = 2L; + + testCmdletRestartAfterFailure( + createTestCmdlet(cmdletId, CmdletState.PENDING, actionId), + createTestAction(actionId, cmdletId) + ); + } + + @Test + public void testDispatchedCmdletRestartAfterFailure() throws Exception { + long cmdletId = 2L; + long actionId = 1L; + + testCmdletRestartAfterFailure( + createTestCmdlet(cmdletId, CmdletState.DISPATCHED, actionId), + createTestAction(actionId, cmdletId) + ); + } + + @Test + public void testMixedCmdletsRestartAfterFailure() throws Exception { + List cmdlets = Arrays.asList( + createTestCmdlet(1L, CmdletState.PENDING, 1L), + createTestCmdlet(2L, CmdletState.DISPATCHED, 2L), + createTestCmdlet(3L, CmdletState.EXECUTING, 3L), + createTestCmdlet(4L, CmdletState.FAILED, 4L), + createTestCmdlet(5L, CmdletState.CANCELLED, 5L) + ); + + List actions = IntStream.range(1, cmdlets.size() + 1) + .mapToObj(id -> createTestAction(id, id)) + .collect(Collectors.toList()); + + actions.get(3).setFinished(true); + actions.get(4).setFinished(true); + + MetaStore metaStore = initMetastore(cmdlets, actions); + try (SmartServer smartServer = new SmartServer(conf, metaStore)) { + smartServer.run(); + + IntStream.range(1, 4) + .peek(this::checkCmdletDone) + .forEach(this::checkActionDone); + + assertEquals(CmdletStateDto.FAILED, cmdletsApiClient.getCmdlet(4L).getState()); + assertEquals(CmdletStateDto.CANCELLED, cmdletsApiClient.getCmdlet(5L).getState()); + + assertEquals(ActionStateDto.FAILED, actionsApiWrapper.getAction(4L).getState()); + assertEquals(ActionStateDto.FAILED, actionsApiWrapper.getAction(5L).getState()); + } + } + + private void testCmdletRestartAfterFailure( + CmdletInfo cmdletInfo, ActionInfo... actionInfos) throws Exception { + MetaStore metaStore = initMetastore( + Collections.singletonList(cmdletInfo), Arrays.asList(actionInfos)); + + try (SmartServer smartServer = new SmartServer(conf, metaStore)) { + smartServer.run(); + checkCmdletDone(cmdletInfo.getId()); + for (ActionInfo actionInfo : actionInfos) { + checkActionDone(actionInfo.getActionId()); + } + } + } + + private MetaStore initMetastore( + List cmdletsToStore, + List actionsToStore) throws Exception { + MetaStore metaStore = getDBAdapter(new SmartConf()); + metaStore.checkTables(); + + metaStore.cmdletDao().upsert(cmdletsToStore); + metaStore.actionDao().upsert(actionsToStore); + + return metaStore; + } + + private void checkCmdletDone(long cmdletId) { + CmdletDto cmdletDto = cmdletsApiClient.waitTillCmdletFinished( + cmdletId, + Duration.ofMillis(500), + Duration.ofSeconds(5)); + assertEquals(CmdletStateDto.DONE, cmdletDto.getState()); + } + + private void checkActionDone(long actionId) { + ActionDto actionDto = actionsApiWrapper.waitTillActionFinished( + actionId, + Duration.ofMillis(500), + Duration.ofSeconds(5)); + assertEquals(ActionStateDto.SUCCESSFUL, actionDto.getState()); + } + + private CmdletInfo createTestCmdlet(long id, CmdletState state, Long... actionIds) { + return CmdletInfo.builder() + .setId(id) + .setState(state) + .setParameters("write -file /test" + id) + .setActionIds(Arrays.asList(actionIds)) + .setGenerateTime(System.currentTimeMillis()) + .build(); + } + + private ActionInfo createTestAction(long id, long cmdletId) { + Map actionArgs = new HashMap<>(); + actionArgs.put("-file", "/test" + cmdletId); + + return ActionInfo.builder() + .setActionId(id) + .setCmdletId(cmdletId) + .setActionName("write") + .setCreateTime(System.currentTimeMillis()) + .setArgs(actionArgs) + .build(); + } +} diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/dao/Searchable.java b/smart-metastore/src/main/java/org/smartdata/metastore/dao/Searchable.java index a4b60a5374..6a3e5c1a0d 100644 --- a/smart-metastore/src/main/java/org/smartdata/metastore/dao/Searchable.java +++ b/smart-metastore/src/main/java/org/smartdata/metastore/dao/Searchable.java @@ -21,11 +21,12 @@ import org.smartdata.metastore.queries.PageRequest; import org.smartdata.metastore.queries.sort.SortField; +import java.io.IOException; import java.util.List; public interface Searchable { SearchResult search( - RequestT searchRequest, PageRequest pageRequest) throws Exception; + RequestT searchRequest, PageRequest pageRequest) throws IOException; - List search(RequestT searchRequest) throws Exception; + List search(RequestT searchRequest) throws IOException; } diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/dao/SearchableService.java b/smart-metastore/src/main/java/org/smartdata/metastore/dao/SearchableService.java index 3fa9c9fb0b..f16276ef12 100644 --- a/smart-metastore/src/main/java/org/smartdata/metastore/dao/SearchableService.java +++ b/smart-metastore/src/main/java/org/smartdata/metastore/dao/SearchableService.java @@ -23,6 +23,7 @@ import org.smartdata.metastore.queries.PageRequest; import org.smartdata.metastore.queries.sort.SortField; +import java.io.IOException; import java.util.List; import static org.smartdata.metastore.utils.MetaStoreUtils.logAndBuildMetastoreException; @@ -37,7 +38,7 @@ public class SearchableService @Override public SearchResult search(RequestT searchRequest, PageRequest pageRequest) - throws Exception { + throws IOException { try { return dbDelegate.search(searchRequest, pageRequest); } catch (Exception exception) { @@ -46,7 +47,7 @@ public SearchResult search(RequestT searchRequest, PageRequest } @Override - public List search(RequestT searchRequest) throws Exception { + public List search(RequestT searchRequest) throws IOException { try { return dbDelegate.search(searchRequest); } catch (Exception exception) { diff --git a/smart-rule/src/main/antlr4/org/smartdata/rule/parser/SmartRule.g4 b/smart-rule/src/main/antlr4/org/smartdata/rule/parser/SmartRule.g4 index 985d530cf6..fa0d105a33 100644 --- a/smart-rule/src/main/antlr4/org/smartdata/rule/parser/SmartRule.g4 +++ b/smart-rule/src/main/antlr4/org/smartdata/rule/parser/SmartRule.g4 @@ -56,10 +56,13 @@ boolvalue compareexpr : numricexpr OPCMP numricexpr #cmpIdLong - | stringexpr OPEQCMP stringexpr #cmpIdString + | numricexpr OPEQ numricexpr #cmpEqIdLong + | stringexpr OPEQ stringexpr #cmpIdString | stringexpr MATCHES stringexpr #cmpIdStringMatches | timeintvalexpr OPCMP timeintvalexpr #cmpTimeintvalTimeintval + | timeintvalexpr OPEQ timeintvalexpr #cmpEqTimeintvalTimeintval | timepointexpr OPCMP timepointexpr #cmpTimepointTimePoint + | timepointexpr OPEQ timepointexpr #cmpEqTimepointTimePoint ; timeintvalexpr @@ -128,27 +131,18 @@ id | OBJECTTYPE '.' ID '(' constexpr (',' constexpr)* ')' #idObjAttPara ; -OPEQCMP +OPEQ : '==' | '!=' ; OPCMP - : OPEQCMP - | '>' + : '>' | '<' | '>=' | '<=' ; -opr - : '*' - | '/' - | '+' - | '-' - | '%' - ; - fileEvent : FILECREATE | FILECLOSE diff --git a/smart-rule/src/test/java/org/smartdata/rule/TestSmartRuleParser.java b/smart-rule/src/test/java/org/smartdata/rule/TestSmartRuleParser.java index 8acc3f5cef..548abab932 100644 --- a/smart-rule/src/test/java/org/smartdata/rule/TestSmartRuleParser.java +++ b/smart-rule/src/test/java/org/smartdata/rule/TestSmartRuleParser.java @@ -79,6 +79,7 @@ public void testValidRule() throws Exception { rules.add("file : every 1s | storagePolicy == \"ALL_SSD\" | cache"); rules.add("file : accessCount(10min) < 20 | uncache"); rules.add("file : accessCount(10min) == 0 | uncache"); + rules.add("file : accessCount(10min) != 0 | uncache"); rules.add("file : accessCount(10min) <= 1 | uncache"); rules.add("file : accessCount(1min) > 5 | cache -replica 2"); rules.add("file : age <= 1 | echo -msg \"crul world\""); diff --git a/smart-server/src/main/java/org/smartdata/server/SmartServer.java b/smart-server/src/main/java/org/smartdata/server/SmartServer.java index a00e17bab6..4811ed82a1 100644 --- a/smart-server/src/main/java/org/smartdata/server/SmartServer.java +++ b/smart-server/src/main/java/org/smartdata/server/SmartServer.java @@ -44,6 +44,7 @@ import java.io.File; import java.io.FileNotFoundException; +import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; import java.util.List; @@ -55,7 +56,7 @@ /** * From this Smart Storage Management begins. */ -public class SmartServer { +public class SmartServer implements AutoCloseable { public static final Logger LOG = LoggerFactory.getLogger(SmartServer.class); public static final Tags SMART_SERVER_BASE_TAGS = Tags.of("service", "smart-server"); @@ -71,12 +72,13 @@ public class SmartServer { SLF4JBridgeHandler.install(); } - public SmartServer(SmartConf conf) { + public SmartServer(SmartConf conf, MetaStore metaStore) throws IOException { this.conf = conf; this.enabled = false; + initWith(metaStore); } - public void initWith(MetaStore metaStore) throws Exception { + private void initWith(MetaStore metaStore) throws IOException { LOG.info("Start Init Smart Server"); HadoopUtil.setSmartConfByHadoop(conf); @@ -108,7 +110,6 @@ public ClusterNodesManager getClusterNodesManager() { return engine.getClusterNodesManager(); } - public MetaStore getMetaStore() { return this.context.getMetaStore(); } @@ -175,9 +176,8 @@ static SmartServer processWith(StartupOption startOption, SmartConf conf) throws metaStore.checkTables(); } - SmartServer ssm = new SmartServer(conf); + SmartServer ssm = new SmartServer(conf, metaStore); try { - ssm.initWith(metaStore); ssm.run(); return ssm; } catch (Exception e) { @@ -225,7 +225,7 @@ private static boolean parseHelpArgument(String[] args, * * @throws Exception */ - private void run() throws Exception { + public void run() throws Exception { boolean enabled = conf.getBoolean(SmartConfKeys.SMART_DFS_ENABLED, SmartConfKeys.SMART_DFS_ENABLED_DEFAULT); @@ -300,6 +300,11 @@ public void shutdown() { } } + @Override + public void close() throws Exception { + shutdown(); + } + private enum StartupOption { FORMAT("-format"), REGULAR("-regular");