Skip to content

Commit

Permalink
Merge pull request #92 from arenadata/feature/ADH-4853-reuse-dfs-clients
Browse files Browse the repository at this point in the history
[ADH-4853] Add DFS client cache for actions
  • Loading branch information
iamlapa authored Aug 16, 2024
2 parents c35286c + f00b45c commit 893ab82
Show file tree
Hide file tree
Showing 35 changed files with 456 additions and 290 deletions.
10 changes: 10 additions & 0 deletions conf/smart-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -549,4 +549,14 @@
</description>
</property>

<property>
<name>smart.action.client.cache.ttl</name>
<value>10m</value>
<description>
The minimum amount of time after the last access to the DFS client cache entry
that must pass in order for the entry to be evicted.
Should be in the format '[Amount][TimeUnit]', where TimeUnit is one
of 'day' or 'd', 'hour' or 'h', 'min' or 'm', 'sec' or 's'.
</description>
</property>
</configuration>
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
<skip-build-web-ui>true</skip-build-web-ui>
<hamcrest.version>1.3</hamcrest.version>
<rest-assured.version>4.5.1</rest-assured.version>
<cache2k.version>2.6.1.Final</cache2k.version>
<smart.hadoop.artifact>smart-hadoop-3.2</smart.hadoop.artifact>
<smart.hadoop.client.artifact>smart-hadoop-client-3</smart.hadoop.client.artifact>
</properties>
Expand Down
5 changes: 5 additions & 0 deletions smart-action/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@
<version>2.0.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
64 changes: 0 additions & 64 deletions smart-action/src/main/java/org/smartdata/action/ActionType.java

This file was deleted.

106 changes: 23 additions & 83 deletions smart-action/src/main/java/org/smartdata/action/SmartAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.smartdata.action;

import com.google.common.annotations.VisibleForTesting;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.slf4j.Logger;
Expand All @@ -34,23 +36,34 @@
* should be able to run in a cmdlet line or web console. User defined actions
* are also meant to extend this.
*/
@Getter
public abstract class SmartAction {
private static final Logger LOG = LoggerFactory.getLogger(SmartAction.class);
private long cmdletId;
private boolean lastAction;
private long actionId;
private Map<String, String> actionArgs;
private SmartContext context;

private final ByteArrayOutputStream resultOutputStream;
private final PrintStream resultPrintStream;
private final ByteArrayOutputStream logOutputStream;
private final PrintStream logPrintStream;
private volatile boolean successful;
protected String name;

@Setter
private long cmdletId;
@Setter
private boolean lastAction;
@Setter
private long actionId;
@Setter
private SmartContext context;
@Setter
private String name;
@Setter
private Throwable throwable;

private Map<String, String> arguments;
private long startTime;
private long finishTime;
private Throwable throwable;
private boolean finished;

private volatile boolean successful;
private volatile boolean finished;

public SmartAction() {
this.successful = false;
Expand All @@ -61,66 +74,13 @@ public SmartAction() {
this.logPrintStream = new PrintStream(logOutputStream, false);
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public long getCmdletId() {
return cmdletId;
}

public void setCmdletId(long cmdletId) {
this.cmdletId = cmdletId;
}

public boolean isLastAction() {
return lastAction;
}

public void setLastAction(boolean lastAction) {
this.lastAction = lastAction;
}

public SmartContext getContext() {
return context;
}

public void setContext(SmartContext context) {
this.context = context;
}

/**
* Used to initialize the action.
*
* @param args Action specific
*/
public void init(Map<String, String> args) {
this.actionArgs = args;
}

/**
* Get action arguments.
*
* @return
*/
public Map<String, String> getArguments() {
return actionArgs;
}

public void setArguments(Map<String, String> args) {
actionArgs = args;
}

public long getActionId() {
return actionId;
}

public void setActionId(long actionId) {
this.actionId = actionId;
this.arguments = args;
}

protected abstract void execute() throws Exception;
Expand All @@ -145,10 +105,6 @@ private void setStartTime() {
this.startTime = System.currentTimeMillis();
}

private void setThrowable(Throwable t) {
this.throwable = t;
}

private void setFinishTime() {
this.finishTime = System.currentTimeMillis();
}
Expand All @@ -163,14 +119,6 @@ protected void appendLog(String log) {
logPrintStream.println(log);
}

public PrintStream getResultOutputStream() {
return resultPrintStream;
}

public PrintStream getLogPrintStream() {
return logPrintStream;
}

public float getProgress() {
if (successful) {
return 1.0F;
Expand All @@ -197,14 +145,6 @@ private void stop() {
resultPrintStream.close();
}

public boolean isSuccessful() {
return successful;
}

public boolean isFinished() {
return finished;
}

@VisibleForTesting
public boolean getExpectedAfterRun() throws UnsupportedEncodingException {
ActionStatus actionStatus = getActionStatus();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,4 +263,8 @@ public class SmartConfKeys {
"smart.client.active.server.cache.path";
public static final String SMART_CLIENT_ACTIVE_SERVER_CACHE_PATH_DEFAULT =
"/tmp/active_smart_server";

public static final String SMART_ACTION_CLIENT_CACHE_TTL_KEY =
"smart.action.client.cache.ttl";
public static final String SMART_ACTION_CLIENT_CACHE_TTL_DEFAULT = "10m";
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public static Configuration toRemoteClusterConfig(Configuration configuration) {
return remoteConfig;
}

public static InetSocketAddress getSsmMasterRpcAddress(
Configuration configuration) throws IOException {
return getSsmRpcAddresses(configuration).get(0);
}

public static List<InetSocketAddress> getSsmRpcAddresses(
Configuration configuration) throws IOException {
Collection<String> rawRpcAddresses = configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.hazelcast.topic.ITopic;
import com.hazelcast.topic.Message;
import com.hazelcast.topic.MessageListener;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartdata.SmartContext;
Expand All @@ -44,27 +45,26 @@
import java.util.concurrent.TimeUnit;

// Todo: recover and reconnect when master is offline
@Slf4j
public class HazelcastWorker implements StatusReporter {
private static final Logger LOG = LoggerFactory.getLogger(HazelcastWorker.class);
private final HazelcastInstance instance;
private ScheduledExecutorService executorService;
private ITopic<Serializable> masterMessages;
private ITopic<StatusMessage> statusTopic;
private CmdletExecutor cmdletExecutor;
private CmdletFactory factory;
private SmartConf smartConf;
private final ScheduledExecutorService executorService;
private final ITopic<StatusMessage> statusTopic;
private final CmdletExecutor cmdletExecutor;
private final CmdletFactory factory;
private final SmartConf smartConf;

public HazelcastWorker(SmartContext smartContext) {
this.smartConf = smartContext.getConf();
this.factory = new CmdletFactory(smartContext, this);
this.factory = new CmdletFactory(smartContext);
this.cmdletExecutor = new CmdletExecutor(smartContext.getConf());
this.executorService = Executors.newSingleThreadScheduledExecutor();
this.instance = HazelcastInstanceProvider.getInstance();
HazelcastInstance instance = HazelcastInstanceProvider.getInstance();
this.statusTopic = instance.getTopic(HazelcastExecutorService.STATUS_TOPIC);
String instanceId = String.valueOf(instance.getCluster().getLocalMember().getUuid());
this.masterMessages =
ITopic<Serializable> masterMessages =
instance.getTopic(HazelcastExecutorService.WORKER_TOPIC_PREFIX + instanceId);
this.masterMessages.addMessageListener(new MasterMessageListener());
masterMessages.addMessageListener(new MasterMessageListener());
}

public void start() {
Expand All @@ -78,6 +78,7 @@ public void start() {
public void stop() {
executorService.shutdown();
cmdletExecutor.shutdown();
factory.close();
}

@Override
Expand Down
Loading

0 comments on commit 893ab82

Please sign in to comment.