Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADH-5499] Restart cmdlets that were running at the moment of the cluster shutdown #145

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.smartdata.model.WhitelistHelper;
import org.smartdata.model.action.ActionScheduler;
import org.smartdata.model.action.ScheduleResult;
import org.smartdata.model.request.CmdletSearchRequest;
import org.smartdata.protocol.message.ActionStatus;
import org.smartdata.protocol.message.ActionStatusFactory;
import org.smartdata.protocol.message.CmdletStatus;
Expand Down Expand Up @@ -249,14 +250,18 @@ public List<ActionScheduler> getSchedulers(String actionName) {
private void loadCmdletsFromDb() throws IOException {
LOG.info("reloading the dispatched and pending cmdlets in DB.");
try {
for (CmdletInfo cmdletInfo : metaStore.getCmdlets(CmdletState.DISPATCHED)) {
CmdletSearchRequest searchRequest = CmdletSearchRequest.builder()
.state(CmdletState.DISPATCHED)
.state(CmdletState.EXECUTING)
.build();

for (CmdletInfo cmdletInfo : cmdletInfoHandler.search(searchRequest)) {
recoverCmdletInfo(cmdletInfo,
actionInfos -> recoverDispatchedActionInfos(cmdletInfo, actionInfos));
}

for (CmdletInfo cmdletInfo : metaStore.getCmdlets(CmdletState.PENDING)) {
recoverCmdletInfo(cmdletInfo, actionInfos -> {
});
recoverCmdletInfo(cmdletInfo, actionInfos -> {});
}
} catch (MetaStoreException e) {
LOG.error("DB connection error occurs when ssm is reloading cmdlets!");
Expand Down Expand Up @@ -304,7 +309,7 @@ private void onActionInfoRecover(ActionInfo actionInfo) {
* Let the scheduler check each ActionInfo on submit and add it to the CmdletInfo.
*/
private void checkActionsOnSubmit(CmdletInfo cmdletInfo,
List<ActionInfo> actionInfos) throws IOException {
List<ActionInfo> actionInfos) throws IOException {
for (ActionInfo actionInfo : actionInfos) {
cmdletInfo.addAction(actionInfo.getActionId());
}
Expand Down Expand Up @@ -431,7 +436,7 @@ public long submitCmdlet(CmdletDescriptor cmdletDescriptor) throws IOException {
* Insert cmdletInfo and actions to metastore and cache.
*/
private void syncCmdAction(CmdletInfo cmdletInfo,
List<ActionInfo> actionInfos) {
List<ActionInfo> actionInfos) {
LOG.debug("Cache cmdlet {}", cmdletInfo);
actionInfos.forEach(actionInfoHandler::store);
cmdletInfoHandler.storeUnfinished(cmdletInfo);
Expand All @@ -440,10 +445,12 @@ private void syncCmdAction(CmdletInfo cmdletInfo,
synchronized (pendingCmdlets) {
pendingCmdlets.add(cmdletInfo.getId());
}
} else if (cmdletInfo.getState() == CmdletState.DISPATCHED) {
} else if (cmdletInfo.getState() == CmdletState.DISPATCHED
|| cmdletInfo.getState() == CmdletState.EXECUTING) {
runningCmdlets.add(cmdletInfo.getId());
LaunchCmdlet launchCmdlet = cmdletInfoHandler.createLaunchCmdlet(cmdletInfo);
idToLaunchCmdlets.put(cmdletInfo.getId(), launchCmdlet);
scheduledCmdlets.add(cmdletInfo.getId());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,12 +370,12 @@ public SmartPrincipalManager getPrincipalService() {
@Override
public SearchResult<RuleInfo> search(
RuleSearchRequest searchRequest,
PageRequest<RuleSortField> pageRequest) throws Exception {
PageRequest<RuleSortField> pageRequest) throws IOException {
return ruleInfoHandler.search(searchRequest, pageRequest);
}

@Override
public List<RuleInfo> search(RuleSearchRequest searchRequest) throws Exception {
public List<RuleInfo> search(RuleSearchRequest searchRequest) throws IOException {
return ruleInfoHandler.search(searchRequest);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.smartdata.integration;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.smartdata.client.generated.model.ActionDto;
import org.smartdata.client.generated.model.ActionStateDto;
import org.smartdata.client.generated.model.CmdletDto;
import org.smartdata.client.generated.model.CmdletStateDto;
import org.smartdata.conf.SmartConf;
import org.smartdata.conf.SmartConfKeys;
import org.smartdata.integration.api.ActionsApiWrapper;
import org.smartdata.integration.api.CmdletsApiWrapper;
import org.smartdata.integration.cluster.SmartCluster;
import org.smartdata.integration.cluster.SmartMiniCluster;
import org.smartdata.metastore.MetaStore;
import org.smartdata.model.ActionInfo;
import org.smartdata.model.CmdletInfo;
import org.smartdata.model.CmdletState;
import org.smartdata.server.SmartServer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.junit.Assert.assertEquals;
import static org.smartdata.metastore.utils.MetaStoreUtils.getDBAdapter;

/**
 * Integration tests checking that cmdlets which are already stored in the
 * metastore are handled correctly when a {@link SmartServer} is started on top
 * of that metastore.
 *
 * <p>Each test pre-populates a metastore with cmdlets and their actions, starts
 * a server and then waits, through the REST API wrappers, for the cmdlets that
 * were stored as PENDING, DISPATCHED or EXECUTING to reach the DONE state.
 */
public class TestCmdletsRestart {
  private SmartCluster cluster;
  private SmartConf conf;
  private CmdletsApiWrapper cmdletsApiClient;
  private ActionsApiWrapper actionsApiWrapper;

  /**
   * Starts a mini cluster, takes its configuration and creates the API clients.
   */
  @Before
  public void setup() throws Exception {
    cluster = new SmartMiniCluster();
    cluster.setUp();

    conf = cluster.getConf();
    // NOTE(review): presumably milliseconds, lowered so that status updates
    // arrive well within the wait timeouts used below - confirm the unit.
    conf.setLong(SmartConfKeys.SMART_STATUS_REPORT_PERIOD_KEY, 100);

    cmdletsApiClient = new CmdletsApiWrapper();
    actionsApiWrapper = new ActionsApiWrapper();
  }

  /**
   * Tears the mini cluster down if {@link #setup()} managed to create it.
   */
  @After
  public void cleanUp() throws Exception {
    if (cluster != null) {
      cluster.cleanUp();
    }
  }

  @Test
  public void testExecutingCmdletRestartAfterFailure() throws Exception {
    long cmdletId = 22L;
    long actionId = 77L;

    testCmdletRestartAfterFailure(
        createTestCmdlet(cmdletId, CmdletState.EXECUTING, actionId),
        createTestAction(actionId, cmdletId)
    );
  }

  @Test
  public void testPendingCmdletRestartAfterFailure() throws Exception {
    long cmdletId = 1L;
    long actionId = 2L;

    testCmdletRestartAfterFailure(
        createTestCmdlet(cmdletId, CmdletState.PENDING, actionId),
        createTestAction(actionId, cmdletId)
    );
  }

  @Test
  public void testDispatchedCmdletRestartAfterFailure() throws Exception {
    long cmdletId = 2L;
    long actionId = 1L;

    testCmdletRestartAfterFailure(
        createTestCmdlet(cmdletId, CmdletState.DISPATCHED, actionId),
        createTestAction(actionId, cmdletId)
    );
  }

  /**
   * Stores cmdlets in every relevant state at once and verifies that only the
   * unfinished ones (ids 1 to 3) are executed, while the FAILED and CANCELLED
   * ones (ids 4 and 5) keep their state.
   */
  @Test
  public void testMixedCmdletsRestartAfterFailure() throws Exception {
    List<CmdletInfo> cmdlets = Arrays.asList(
        createTestCmdlet(1L, CmdletState.PENDING, 1L),
        createTestCmdlet(2L, CmdletState.DISPATCHED, 2L),
        createTestCmdlet(3L, CmdletState.EXECUTING, 3L),
        createTestCmdlet(4L, CmdletState.FAILED, 4L),
        createTestCmdlet(5L, CmdletState.CANCELLED, 5L)
    );

    // One action per cmdlet; the action id equals the id of its cmdlet.
    List<ActionInfo> actions = IntStream.rangeClosed(1, cmdlets.size())
        .mapToObj(id -> createTestAction(id, id))
        .collect(Collectors.toList());

    // Indexes 3 and 4 hold the actions of the FAILED and CANCELLED cmdlets.
    actions.get(3).setFinished(true);
    actions.get(4).setFinished(true);

    MetaStore metaStore = initMetastore(cmdlets, actions);
    try (SmartServer smartServer = new SmartServer(conf, metaStore)) {
      smartServer.run();

      // Explicit loop instead of IntStream.peek(): the checks are assertions,
      // not debugging hooks. The order stays cmdlet first, then its action.
      for (long id = 1; id <= 3; id++) {
        checkCmdletDone(id);
        checkActionDone(id);
      }

      assertEquals(CmdletStateDto.FAILED, cmdletsApiClient.getCmdlet(4L).getState());
      assertEquals(CmdletStateDto.CANCELLED, cmdletsApiClient.getCmdlet(5L).getState());

      // NOTE(review): presumably a finished action that was never marked as
      // successful is reported as FAILED - confirm against the DTO mapping.
      assertEquals(ActionStateDto.FAILED, actionsApiWrapper.getAction(4L).getState());
      assertEquals(ActionStateDto.FAILED, actionsApiWrapper.getAction(5L).getState());
    }
  }

  /**
   * Stores a single cmdlet with its actions, starts a server on that metastore
   * and expects the cmdlet to become DONE and every action SUCCESSFUL.
   *
   * @param cmdletInfo cmdlet to store before the server is started
   * @param actionInfos actions of the cmdlet to store alongside it
   */
  private void testCmdletRestartAfterFailure(
      CmdletInfo cmdletInfo, ActionInfo... actionInfos) throws Exception {
    MetaStore metaStore = initMetastore(
        Collections.singletonList(cmdletInfo), Arrays.asList(actionInfos));

    try (SmartServer smartServer = new SmartServer(conf, metaStore)) {
      smartServer.run();
      checkCmdletDone(cmdletInfo.getId());
      for (ActionInfo actionInfo : actionInfos) {
        checkActionDone(actionInfo.getActionId());
      }
    }
  }

  /**
   * Creates a metastore, makes sure its tables are checked and upserts the
   * given cmdlets and actions into it.
   *
   * @param cmdletsToStore cmdlets to upsert
   * @param actionsToStore actions to upsert
   * @return the populated metastore
   */
  private MetaStore initMetastore(
      List<CmdletInfo> cmdletsToStore,
      List<ActionInfo> actionsToStore) throws Exception {
    // NOTE(review): a fresh SmartConf is used here rather than the cluster's
    // conf - confirm that this is intended.
    MetaStore metaStore = getDBAdapter(new SmartConf());
    metaStore.checkTables();

    metaStore.cmdletDao().upsert(cmdletsToStore);
    metaStore.actionDao().upsert(actionsToStore);

    return metaStore;
  }

  /**
   * Waits for the cmdlet to finish and asserts that it ended in the DONE state.
   */
  private void checkCmdletDone(long cmdletId) {
    CmdletDto finishedCmdlet = cmdletsApiClient.waitTillCmdletFinished(
        cmdletId,
        Duration.ofMillis(500),
        Duration.ofSeconds(5));
    assertEquals(CmdletStateDto.DONE, finishedCmdlet.getState());
  }

  /**
   * Waits for the action to finish and asserts that it ended as SUCCESSFUL.
   */
  private void checkActionDone(long actionId) {
    ActionDto finishedAction = actionsApiWrapper.waitTillActionFinished(
        actionId,
        Duration.ofMillis(500),
        Duration.ofSeconds(5));
    assertEquals(ActionStateDto.SUCCESSFUL, finishedAction.getState());
  }

  /**
   * Builds a cmdlet in the given state whose text is a {@code write} action on
   * the file {@code /test<id>}.
   */
  private CmdletInfo createTestCmdlet(long id, CmdletState state, Long... actionIds) {
    return CmdletInfo.builder()
        .setId(id)
        .setState(state)
        .setParameters("write -file /test" + id)
        .setActionIds(Arrays.asList(actionIds))
        .setGenerateTime(System.currentTimeMillis())
        .build();
  }

  /**
   * Builds a {@code write} action that belongs to the given cmdlet and targets
   * the file {@code /test<cmdletId>}.
   */
  private ActionInfo createTestAction(long id, long cmdletId) {
    Map<String, String> actionArgs = new HashMap<>();
    actionArgs.put("-file", "/test" + cmdletId);

    return ActionInfo.builder()
        .setActionId(id)
        .setCmdletId(cmdletId)
        .setActionName("write")
        .setCreateTime(System.currentTimeMillis())
        .setArgs(actionArgs)
        .build();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
import org.smartdata.metastore.queries.PageRequest;
import org.smartdata.metastore.queries.sort.SortField;

import java.io.IOException;
import java.util.List;

public interface Searchable<RequestT, EntityT, ColumnT extends SortField> {
SearchResult<EntityT> search(
RequestT searchRequest, PageRequest<ColumnT> pageRequest) throws Exception;
RequestT searchRequest, PageRequest<ColumnT> pageRequest) throws IOException;

List<EntityT> search(RequestT searchRequest) throws Exception;
List<EntityT> search(RequestT searchRequest) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.smartdata.metastore.queries.PageRequest;
import org.smartdata.metastore.queries.sort.SortField;

import java.io.IOException;
import java.util.List;

import static org.smartdata.metastore.utils.MetaStoreUtils.logAndBuildMetastoreException;
Expand All @@ -37,7 +38,7 @@ public class SearchableService<RequestT, EntityT, ColumnT extends SortField>

@Override
public SearchResult<EntityT> search(RequestT searchRequest, PageRequest<ColumnT> pageRequest)
throws Exception {
throws IOException {
try {
return dbDelegate.search(searchRequest, pageRequest);
} catch (Exception exception) {
Expand All @@ -46,7 +47,7 @@ public SearchResult<EntityT> search(RequestT searchRequest, PageRequest<ColumnT>
}

@Override
public List<EntityT> search(RequestT searchRequest) throws Exception {
public List<EntityT> search(RequestT searchRequest) throws IOException {
try {
return dbDelegate.search(searchRequest);
} catch (Exception exception) {
Expand Down
18 changes: 6 additions & 12 deletions smart-rule/src/main/antlr4/org/smartdata/rule/parser/SmartRule.g4
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,13 @@ boolvalue

compareexpr
: numricexpr OPCMP numricexpr #cmpIdLong
| stringexpr OPEQCMP stringexpr #cmpIdString
| numricexpr OPEQ numricexpr #cmpEqIdLong
| stringexpr OPEQ stringexpr #cmpIdString
| stringexpr MATCHES stringexpr #cmpIdStringMatches
| timeintvalexpr OPCMP timeintvalexpr #cmpTimeintvalTimeintval
| timeintvalexpr OPEQ timeintvalexpr #cmpEqTimeintvalTimeintval
| timepointexpr OPCMP timepointexpr #cmpTimepointTimePoint
| timepointexpr OPEQ timepointexpr #cmpEqTimepointTimePoint
;

timeintvalexpr
Expand Down Expand Up @@ -128,27 +131,18 @@ id
| OBJECTTYPE '.' ID '(' constexpr (',' constexpr)* ')' #idObjAttPara
;

OPEQCMP
OPEQ
: '=='
| '!='
;

OPCMP
: OPEQCMP
| '>'
: '>'
| '<'
| '>='
| '<='
;

opr
: '*'
| '/'
| '+'
| '-'
| '%'
;

fileEvent
: FILECREATE
| FILECLOSE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public void testValidRule() throws Exception {
rules.add("file : every 1s | storagePolicy == \"ALL_SSD\" | cache");
rules.add("file : accessCount(10min) < 20 | uncache");
rules.add("file : accessCount(10min) == 0 | uncache");
rules.add("file : accessCount(10min) != 0 | uncache");
rules.add("file : accessCount(10min) <= 1 | uncache");
rules.add("file : accessCount(1min) > 5 | cache -replica 2");
rules.add("file : age <= 1 | echo -msg \"crul world\"");
Expand Down
Loading
Loading