Skip to content

Commit

Permalink
[ADH-5499] Restart actions that were running at the moment of the clu…
Browse files Browse the repository at this point in the history
…ster shutdown
  • Loading branch information
tigrulya-exe committed Jan 15, 2025
1 parent d55f863 commit c9e2bdc
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.smartdata.model.WhitelistHelper;
import org.smartdata.model.action.ActionScheduler;
import org.smartdata.model.action.ScheduleResult;
import org.smartdata.model.request.CmdletSearchRequest;
import org.smartdata.protocol.message.ActionStatus;
import org.smartdata.protocol.message.ActionStatusFactory;
import org.smartdata.protocol.message.CmdletStatus;
Expand Down Expand Up @@ -249,14 +250,18 @@ public List<ActionScheduler> getSchedulers(String actionName) {
private void loadCmdletsFromDb() throws IOException {
LOG.info("reloading the dispatched and pending cmdlets in DB.");
try {
for (CmdletInfo cmdletInfo : metaStore.getCmdlets(CmdletState.DISPATCHED)) {
CmdletSearchRequest searchRequest = CmdletSearchRequest.builder()
.state(CmdletState.DISPATCHED)
.state(CmdletState.EXECUTING)
.build();

for (CmdletInfo cmdletInfo : cmdletInfoHandler.search(searchRequest)) {
recoverCmdletInfo(cmdletInfo,
actionInfos -> recoverDispatchedActionInfos(cmdletInfo, actionInfos));
}

for (CmdletInfo cmdletInfo : metaStore.getCmdlets(CmdletState.PENDING)) {
recoverCmdletInfo(cmdletInfo, actionInfos -> {
});
recoverCmdletInfo(cmdletInfo, actionInfos -> {});
}
} catch (MetaStoreException e) {
LOG.error("DB connection error occurs when ssm is reloading cmdlets!");
Expand Down Expand Up @@ -304,7 +309,7 @@ private void onActionInfoRecover(ActionInfo actionInfo) {
* Let Scheduler check actioninfo onsubmit and add them to cmdletinfo.
*/
private void checkActionsOnSubmit(CmdletInfo cmdletInfo,
List<ActionInfo> actionInfos) throws IOException {
List<ActionInfo> actionInfos) throws IOException {
for (ActionInfo actionInfo : actionInfos) {
cmdletInfo.addAction(actionInfo.getActionId());
}
Expand Down Expand Up @@ -431,7 +436,7 @@ public long submitCmdlet(CmdletDescriptor cmdletDescriptor) throws IOException {
* Insert cmdletInfo and actions to metastore and cache.
*/
private void syncCmdAction(CmdletInfo cmdletInfo,
List<ActionInfo> actionInfos) {
List<ActionInfo> actionInfos) {
LOG.debug("Cache cmdlet {}", cmdletInfo);
actionInfos.forEach(actionInfoHandler::store);
cmdletInfoHandler.storeUnfinished(cmdletInfo);
Expand All @@ -440,10 +445,12 @@ private void syncCmdAction(CmdletInfo cmdletInfo,
synchronized (pendingCmdlets) {
pendingCmdlets.add(cmdletInfo.getId());
}
} else if (cmdletInfo.getState() == CmdletState.DISPATCHED) {
} else if (cmdletInfo.getState() == CmdletState.DISPATCHED
|| cmdletInfo.getState() == CmdletState.EXECUTING) {
runningCmdlets.add(cmdletInfo.getId());
LaunchCmdlet launchCmdlet = cmdletInfoHandler.createLaunchCmdlet(cmdletInfo);
idToLaunchCmdlets.put(cmdletInfo.getId(), launchCmdlet);
scheduledCmdlets.add(cmdletInfo.getId());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,12 +370,12 @@ public SmartPrincipalManager getPrincipalService() {
@Override
public SearchResult<RuleInfo> search(
RuleSearchRequest searchRequest,
PageRequest<RuleSortField> pageRequest) throws Exception {
PageRequest<RuleSortField> pageRequest) throws IOException {
return ruleInfoHandler.search(searchRequest, pageRequest);
}

@Override
public List<RuleInfo> search(RuleSearchRequest searchRequest) throws Exception {
public List<RuleInfo> search(RuleSearchRequest searchRequest) throws IOException {
return ruleInfoHandler.search(searchRequest);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package org.smartdata.integration;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.smartdata.client.generated.model.ActionDto;
import org.smartdata.client.generated.model.ActionStateDto;
import org.smartdata.client.generated.model.CmdletDto;
import org.smartdata.client.generated.model.CmdletStateDto;
import org.smartdata.conf.SmartConf;
import org.smartdata.conf.SmartConfKeys;
import org.smartdata.integration.api.ActionsApiWrapper;
import org.smartdata.integration.api.CmdletsApiWrapper;
import org.smartdata.integration.cluster.SmartCluster;
import org.smartdata.integration.cluster.SmartMiniCluster;
import org.smartdata.metastore.MetaStore;
import org.smartdata.model.ActionInfo;
import org.smartdata.model.CmdletInfo;
import org.smartdata.model.CmdletState;
import org.smartdata.server.SmartServer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.junit.Assert.assertEquals;
import static org.smartdata.metastore.utils.MetaStoreUtils.getDBAdapter;

public class TestCmdletsRestart {
private SmartCluster cluster;
private SmartConf conf;
private CmdletsApiWrapper cmdletsApiClient;
private ActionsApiWrapper actionsApiWrapper;

@Before
public void setup() throws Exception {
cluster = new SmartMiniCluster();
cluster.setUp();

conf = cluster.getConf();
conf.setLong(SmartConfKeys.SMART_STATUS_REPORT_PERIOD_KEY, 100);

cmdletsApiClient = new CmdletsApiWrapper();
actionsApiWrapper = new ActionsApiWrapper();
}

@After
public void cleanUp() throws Exception {
if (cluster != null) {
cluster.cleanUp();
}
}

@Test
public void testExecutingCmdletRestartAfterFailure() throws Exception {
long cmdletId = 22L;
long actionId = 77L;

testCmdletRestartAfterFailure(
createTestCmdlet(cmdletId, CmdletState.EXECUTING, actionId),
createTestAction(actionId, cmdletId)
);
}

@Test
public void testPendingCmdletRestartAfterFailure() throws Exception {
long cmdletId = 1L;
long actionId = 2L;

testCmdletRestartAfterFailure(
createTestCmdlet(cmdletId, CmdletState.PENDING, actionId),
createTestAction(actionId, cmdletId)
);
}

@Test
public void testDispatchedCmdletRestartAfterFailure() throws Exception {
long cmdletId = 2L;
long actionId = 1L;

testCmdletRestartAfterFailure(
createTestCmdlet(cmdletId, CmdletState.DISPATCHED, actionId),
createTestAction(actionId, cmdletId)
);
}

@Test
public void testMixedCmdletsRestartAfterFailure() throws Exception {
List<CmdletInfo> cmdlets = Arrays.asList(
createTestCmdlet(1L, CmdletState.PENDING, 1L),
createTestCmdlet(2L, CmdletState.DISPATCHED, 2L),
createTestCmdlet(3L, CmdletState.EXECUTING, 3L),
createTestCmdlet(4L, CmdletState.FAILED, 4L),
createTestCmdlet(5L, CmdletState.CANCELLED, 5L)
);

List<ActionInfo> actions = IntStream.range(1, cmdlets.size() + 1)
.mapToObj(id -> createTestAction(id, id))
.collect(Collectors.toList());

actions.get(3).setFinished(true);
actions.get(4).setFinished(true);

MetaStore metaStore = initMetastore(cmdlets, actions);
try (SmartServer smartServer = new SmartServer(conf, metaStore)) {
smartServer.run();

IntStream.range(1, 4)
.peek(this::checkCmdletDone)
.forEach(this::checkActionDone);

assertEquals(CmdletStateDto.FAILED, cmdletsApiClient.getCmdlet(4L).getState());
assertEquals(CmdletStateDto.CANCELLED, cmdletsApiClient.getCmdlet(5L).getState());

assertEquals(ActionStateDto.FAILED, actionsApiWrapper.getAction(4L).getState());
assertEquals(ActionStateDto.FAILED, actionsApiWrapper.getAction(5L).getState());
}
}

private void testCmdletRestartAfterFailure(
CmdletInfo cmdletInfo, ActionInfo... actionInfos) throws Exception {
MetaStore metaStore = initMetastore(
Collections.singletonList(cmdletInfo), Arrays.asList(actionInfos));

try (SmartServer smartServer = new SmartServer(conf, metaStore)) {
smartServer.run();
checkCmdletDone(cmdletInfo.getId());
for (ActionInfo actionInfo : actionInfos) {
checkActionDone(actionInfo.getActionId());
}
}
}

private MetaStore initMetastore(
List<CmdletInfo> cmdletsToStore,
List<ActionInfo> actionsToStore) throws Exception {
MetaStore metaStore = getDBAdapter(new SmartConf());
metaStore.checkTables();

metaStore.cmdletDao().upsert(cmdletsToStore);
metaStore.actionDao().upsert(actionsToStore);

return metaStore;
}

private void checkCmdletDone(long cmdletId) {
CmdletDto cmdletDto = cmdletsApiClient.waitTillCmdletFinished(
cmdletId,
Duration.ofMillis(500),
Duration.ofSeconds(5));
assertEquals(CmdletStateDto.DONE, cmdletDto.getState());
}

private void checkActionDone(long actionId) {
ActionDto actionDto = actionsApiWrapper.waitTillActionFinished(
actionId,
Duration.ofMillis(500),
Duration.ofSeconds(5));
assertEquals(ActionStateDto.SUCCESSFUL, actionDto.getState());
}

private CmdletInfo createTestCmdlet(long id, CmdletState state, Long... actionIds) {
return CmdletInfo.builder()
.setId(id)
.setState(state)
.setParameters("write -file /test" + id)
.setActionIds(Arrays.asList(actionIds))
.setGenerateTime(System.currentTimeMillis())
.build();
}

private ActionInfo createTestAction(long id, long cmdletId) {
Map<String, String> actionArgs = new HashMap<>();
actionArgs.put("-file", "/test" + cmdletId);

return ActionInfo.builder()
.setActionId(id)
.setCmdletId(cmdletId)
.setActionName("write")
.setCreateTime(System.currentTimeMillis())
.setArgs(actionArgs)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
import org.smartdata.metastore.queries.PageRequest;
import org.smartdata.metastore.queries.sort.SortField;

import java.io.IOException;
import java.util.List;

public interface Searchable<RequestT, EntityT, ColumnT extends SortField> {
SearchResult<EntityT> search(
RequestT searchRequest, PageRequest<ColumnT> pageRequest) throws Exception;
RequestT searchRequest, PageRequest<ColumnT> pageRequest) throws IOException;

List<EntityT> search(RequestT searchRequest) throws Exception;
List<EntityT> search(RequestT searchRequest) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.smartdata.metastore.queries.PageRequest;
import org.smartdata.metastore.queries.sort.SortField;

import java.io.IOException;
import java.util.List;

import static org.smartdata.metastore.utils.MetaStoreUtils.logAndBuildMetastoreException;
Expand All @@ -37,7 +38,7 @@ public class SearchableService<RequestT, EntityT, ColumnT extends SortField>

@Override
public SearchResult<EntityT> search(RequestT searchRequest, PageRequest<ColumnT> pageRequest)
throws Exception {
throws IOException {
try {
return dbDelegate.search(searchRequest, pageRequest);
} catch (Exception exception) {
Expand All @@ -46,7 +47,7 @@ public SearchResult<EntityT> search(RequestT searchRequest, PageRequest<ColumnT>
}

@Override
public List<EntityT> search(RequestT searchRequest) throws Exception {
public List<EntityT> search(RequestT searchRequest) throws IOException {
try {
return dbDelegate.search(searchRequest);
} catch (Exception exception) {
Expand Down
19 changes: 12 additions & 7 deletions smart-server/src/main/java/org/smartdata/server/SmartServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -55,7 +56,7 @@
/**
* From this Smart Storage Management begins.
*/
public class SmartServer {
public class SmartServer implements AutoCloseable {
public static final Logger LOG = LoggerFactory.getLogger(SmartServer.class);
public static final Tags SMART_SERVER_BASE_TAGS = Tags.of("service", "smart-server");

Expand All @@ -71,12 +72,13 @@ public class SmartServer {
SLF4JBridgeHandler.install();
}

public SmartServer(SmartConf conf) {
public SmartServer(SmartConf conf, MetaStore metaStore) throws IOException {
this.conf = conf;
this.enabled = false;
initWith(metaStore);
}

public void initWith(MetaStore metaStore) throws Exception {
private void initWith(MetaStore metaStore) throws IOException {
LOG.info("Start Init Smart Server");

HadoopUtil.setSmartConfByHadoop(conf);
Expand Down Expand Up @@ -108,7 +110,6 @@ public ClusterNodesManager getClusterNodesManager() {
return engine.getClusterNodesManager();
}


public MetaStore getMetaStore() {
return this.context.getMetaStore();
}
Expand Down Expand Up @@ -175,9 +176,8 @@ static SmartServer processWith(StartupOption startOption, SmartConf conf) throws
metaStore.checkTables();
}

SmartServer ssm = new SmartServer(conf);
SmartServer ssm = new SmartServer(conf, metaStore);
try {
ssm.initWith(metaStore);
ssm.run();
return ssm;
} catch (Exception e) {
Expand Down Expand Up @@ -225,7 +225,7 @@ private static boolean parseHelpArgument(String[] args,
*
* @throws Exception
*/
private void run() throws Exception {
public void run() throws Exception {
boolean enabled = conf.getBoolean(SmartConfKeys.SMART_DFS_ENABLED,
SmartConfKeys.SMART_DFS_ENABLED_DEFAULT);

Expand Down Expand Up @@ -300,6 +300,11 @@ public void shutdown() {
}
}

@Override
public void close() throws Exception {
shutdown();
}

private enum StartupOption {
FORMAT("-format"),
REGULAR("-regular");
Expand Down

0 comments on commit c9e2bdc

Please sign in to comment.