diff --git a/.gitignore b/.gitignore
index 45cf7a4..6a7917c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,6 @@
target/
nb-configuration.xml
+nbactions.xml
.DS_Store
*.DS_Store
*.iml
diff --git a/README.md b/README.md
index 4d6a90b..4cc4c44 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,8 @@
-# StreamFlow™
+# StreamFlow™
+
+[![Build Status](https://travis-ci.org/lmco/streamflow.svg?branch=develop)](https://travis-ci.org/lmco/streamflow)
+
+[![Join the chat at https://gitter.im/lmco/streamflow](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/lmco/streamflow?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
## Overview
@@ -8,6 +12,8 @@ such as Apache Storm easier, faster, and with "enterprise" like management funct
StreamFlow also provides a mechanism for non-developers such as data scientists, analysts, or
operational users to rapidly build scalable data flows and analytics.
+![Sample topology](https://raw.githubusercontent.com/wiki/lmco/streamflow/sample-topology.png)
+
StreamFlow provides the following capabilities:
1. A **responsive web interface** for building and monitoring Storm topologies.
@@ -95,7 +101,7 @@ See the License for the specific language governing permissions and
limitations under the License.
This product incorporates open source software components covered by the terms
-of third party license agreements contained are contained in the /Licenses
+of third party license agreements contained in the /Licenses
folder of this project.
## Documentation Version
diff --git a/pom.xml b/pom.xml
index 7a24f55..78fee6c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,7 +25,7 @@
streamflow
streamflow
- 0.9.1
+ 0.10.0
pom
StreamFlow
@@ -43,14 +43,16 @@
- 1.6
+ 1.7
+ 1.7
+ 1.7
UTF-8
UTF-8
- 0.9.1-incubating
+ 0.9.3
9.0.0.RC2
- 2.1.5
+ 2.4.1
1.17.1
3.0
0.107
@@ -59,6 +61,7 @@
3.0.2
1.2.3
1.7.7
+ 1.0.13
1.2.17
4.11
1.5.6
@@ -91,10 +94,12 @@
storm-core
${storm.version}
+
ring
ring-jetty-adapter
@@ -307,6 +312,12 @@
org.apache.kafka
kafka_2.10
0.8.1.1
+
+
+ log4j
+ log4j
+
+
@@ -357,6 +368,24 @@
slf4j-api
${slf4j.version}
+
+ org.slf4j
+ log4j-over-slf4j
+ ${slf4j.version}
+
+
+
+ ch.qos.logback
+ logback-core
+ ${logback.version}
+
+
+ ch.qos.logback
+ logback-classic
+ ${logback.version}
+
+
+
@@ -421,11 +449,15 @@
org.slf4j
- slf4j-log4j12
+ log4j-over-slf4j
- log4j
- log4j
+ ch.qos.logback
+ logback-core
+
+
+ ch.qos.logback
+ logback-classic
junit
@@ -459,6 +491,7 @@
+
org.apache.maven.plugins
maven-resources-plugin
2.6
-
-
- UTF-8
-
diff --git a/streamflow-core/pom.xml b/streamflow-core/pom.xml
index e4a4549..3371f7a 100644
--- a/streamflow-core/pom.xml
+++ b/streamflow-core/pom.xml
@@ -22,7 +22,7 @@
streamflow
streamflow
- 0.9.1
+ 0.10.0
streamflow-core
@@ -30,11 +30,12 @@
pom
- streamflow-util
streamflow-model
+ streamflow-util
streamflow-datastore
streamflow-engine
streamflow-service
streamflow-server
+ streamflow-app
diff --git a/streamflow-core/streamflow-app/pom.xml b/streamflow-core/streamflow-app/pom.xml
new file mode 100644
index 0000000..3e2c6b7
--- /dev/null
+++ b/streamflow-core/streamflow-app/pom.xml
@@ -0,0 +1,36 @@
+
+
+
+ 4.0.0
+
+
+ streamflow
+ streamflow-core
+ 0.10.0
+
+
+ streamflow-app
+ StreamFlow Application
+ pom
+
+
+ streamflow-app-jar
+ streamflow-app-war
+
+
diff --git a/streamflow-core/streamflow-app/streamflow-app-jar/pom.xml b/streamflow-core/streamflow-app/streamflow-app-jar/pom.xml
new file mode 100644
index 0000000..0416071
--- /dev/null
+++ b/streamflow-core/streamflow-app/streamflow-app-jar/pom.xml
@@ -0,0 +1,99 @@
+
+
+
+ 4.0.0
+
+
+ streamflow
+ streamflow-app
+ 0.10.0
+
+
+ streamflow-app-jar
+ StreamFlow Application JAR
+ jar
+
+
+
+ streamflow
+ streamflow-server
+ ${project.version}
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 2.3
+
+
+ package
+
+ shade
+
+
+ false
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+
+
+ streamflow.server.StreamflowServer
+
+
+
+
+
+
+
+
+
+ org.codehaus.mojo
+ exec-maven-plugin
+ 1.3.2
+
+
+
+ exec
+
+
+
+
+ java
+
+ -jar
+ ${project.build.directory}/${project.build.finalName}.jar
+
+
+
+
+
+
+
diff --git a/streamflow-core/streamflow-app/streamflow-app-war/pom.xml b/streamflow-core/streamflow-app/streamflow-app-war/pom.xml
new file mode 100644
index 0000000..9e2cfb1
--- /dev/null
+++ b/streamflow-core/streamflow-app/streamflow-app-war/pom.xml
@@ -0,0 +1,103 @@
+
+
+
+ 4.0.0
+
+
+ streamflow
+ streamflow-app
+ 0.10.0
+
+
+ streamflow-app-war
+ StreamFlow Application WAR
+ war
+
+
+
+ streamflow
+ streamflow-server
+ ${project.version}
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 2.3
+
+
+ package
+
+ shade
+
+
+ false
+
+
+ streamflow:streamflow-server
+
+
+
+
+ *:*
+
+ STREAMFLOW-INF/**
+ streamflow/server/**
+
+
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+ 2.8
+
+
+ process-sources
+
+ copy
+
+
+
+
+ streamflow
+ streamflow-engine
+ ${project.version}
+ jar
+
+
+ ${project.build.outputDirectory}/STREAMFLOW-INF/lib/
+ true
+
+
+
+
+
+
+
+
diff --git a/streamflow-core/streamflow-app/streamflow-app-war/src/main/webapp/META-INF/context.xml b/streamflow-core/streamflow-app/streamflow-app-war/src/main/webapp/META-INF/context.xml
new file mode 100644
index 0000000..d73f871
--- /dev/null
+++ b/streamflow-core/streamflow-app/streamflow-app-war/src/main/webapp/META-INF/context.xml
@@ -0,0 +1,19 @@
+
+
+
diff --git a/streamflow-core/streamflow-app/streamflow-app-war/src/main/webapp/WEB-INF/web.xml b/streamflow-core/streamflow-app/streamflow-app-war/src/main/webapp/WEB-INF/web.xml
new file mode 100644
index 0000000..e564ace
--- /dev/null
+++ b/streamflow-core/streamflow-app/streamflow-app-war/src/main/webapp/WEB-INF/web.xml
@@ -0,0 +1,32 @@
+
+
+
+ Streamflow Application
+
+ streamflow.server.config.WebConfig
+
+
+ guiceFilter
+ com.google.inject.servlet.GuiceFilter
+
+
+ guiceFilter
+ /*
+
+
diff --git a/streamflow-core/streamflow-datastore/pom.xml b/streamflow-core/streamflow-datastore/pom.xml
index c42036d..6c5cd0c 100644
--- a/streamflow-core/streamflow-datastore/pom.xml
+++ b/streamflow-core/streamflow-datastore/pom.xml
@@ -22,7 +22,7 @@
streamflow
streamflow-core
- 0.9.1
+ 0.10.0
streamflow-datastore
diff --git a/streamflow-core/streamflow-datastore/streamflow-datastore-core/pom.xml b/streamflow-core/streamflow-datastore/streamflow-datastore-core/pom.xml
index eabc2a3..9459860 100644
--- a/streamflow-core/streamflow-datastore/streamflow-datastore-core/pom.xml
+++ b/streamflow-core/streamflow-datastore/streamflow-datastore-core/pom.xml
@@ -22,7 +22,7 @@
streamflow
streamflow-datastore
- 0.9.1
+ 0.10.0
streamflow-datastore-core
diff --git a/streamflow-core/streamflow-datastore/streamflow-datastore-jdbc/pom.xml b/streamflow-core/streamflow-datastore/streamflow-datastore-jdbc/pom.xml
index 4c43a0c..9b8a797 100644
--- a/streamflow-core/streamflow-datastore/streamflow-datastore-jdbc/pom.xml
+++ b/streamflow-core/streamflow-datastore/streamflow-datastore-jdbc/pom.xml
@@ -22,7 +22,7 @@
streamflow
streamflow-datastore
- 0.9.1
+ 0.10.0
streamflow-datastore-jdbc
@@ -46,6 +46,11 @@
h2
1.4.178
+
+ mysql
+ mysql-connector-java
+ 5.1.34
+
diff --git a/streamflow-core/streamflow-datastore/streamflow-datastore-mongodb/pom.xml b/streamflow-core/streamflow-datastore/streamflow-datastore-mongodb/pom.xml
index 0d8bde6..3339ebb 100644
--- a/streamflow-core/streamflow-datastore/streamflow-datastore-mongodb/pom.xml
+++ b/streamflow-core/streamflow-datastore/streamflow-datastore-mongodb/pom.xml
@@ -22,7 +22,7 @@
streamflow
streamflow-datastore
- 0.9.1
+ 0.10.0
streamflow-datastore-mongodb
diff --git a/streamflow-core/streamflow-engine/pom.xml b/streamflow-core/streamflow-engine/pom.xml
index 0b44eda..ad7adde 100644
--- a/streamflow-core/streamflow-engine/pom.xml
+++ b/streamflow-core/streamflow-engine/pom.xml
@@ -22,7 +22,7 @@
streamflow
streamflow-core
- 0.9.1
+ 0.10.0
streamflow-engine
@@ -30,11 +30,6 @@
jar
-
- org.apache.storm
- storm-core
- provided
-
streamflow
streamflow-model
@@ -45,6 +40,18 @@
streamflow-util
${project.version}
+
+
+ org.apache.storm
+ storm-core
+ provided
+
+
+
+ com.google.guava
+ guava
+ 13.0
+
com.google.inject
guice
@@ -57,21 +64,6 @@
commons-io
commons-io
-
- org.slf4j
- slf4j-api
- provided
-
-
- org.slf4j
- slf4j-log4j12
- provided
-
-
- log4j
- log4j
- provided
-
diff --git a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/StormEngine.java b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/StormEngine.java
index 395175c..f2b9d86 100644
--- a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/StormEngine.java
+++ b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/StormEngine.java
@@ -57,7 +57,9 @@ public class StormEngine {
private final StreamflowConfig streamflowConfig;
- private final HashMap clusters = new HashMap();
+ private final HashMap clusters = new HashMap<>();
+
+ private static final int KILL_BUFFER_SECS = 60;
@Inject
public StormEngine(LocalCluster stormCluster, StreamflowConfig streamflowConfig) {
@@ -95,7 +97,7 @@ public Topology submitTopology(Topology topology, Cluster cluster) {
return topology;
}
- public boolean killTopology(Topology topology, int waitTimeSecs) {
+ public boolean killTopology(Topology topology, int waitTimeSecs, boolean async) {
boolean killed = true;
if (isDeployed(topology)) {
@@ -118,8 +120,10 @@ public boolean killTopology(Topology topology, int waitTimeSecs) {
client.killTopologyWithOpts(topology.getId(), killOptions);
}
- // Check for final removal of topology 60 times (60 seconds)
- killed = waitForTopologyRemoval(topology, 60);
+ if (!async) {
+ // Check for final removal of topology waitTime plus 60 second buffer
+ killed = waitForTopologyRemoval(topology, waitTimeSecs + KILL_BUFFER_SECS);
+ }
} catch (Exception ex) {
LOG.error("Exception occurred while killing the remote topology: ID = " +
@@ -166,7 +170,7 @@ public ClusterSummary getClusterSummary(Cluster cluster) {
clusterSummary.setNimbusUptimeSecs(summary.get_nimbus_uptime_secs());
clusterSummary.setNimbusConf(nimbusConf);
- List supervisors = new ArrayList();
+ List supervisors = new ArrayList<>();
for (backtype.storm.generated.SupervisorSummary ss : summary.get_supervisors()) {
SupervisorSummary supervisor = new SupervisorSummary();
supervisor.setHost(ss.get_host());
@@ -179,7 +183,7 @@ public ClusterSummary getClusterSummary(Cluster cluster) {
}
clusterSummary.setSupervisors(supervisors);
- List topologies = new ArrayList();
+ List topologies = new ArrayList<>();
for (backtype.storm.generated.TopologySummary ts : summary.get_topologies()) {
TopologySummary topology = new TopologySummary();
topology.setId(ts.get_id());
@@ -211,7 +215,7 @@ public TopologyInfo getTopologyInfo(Topology topology) {
return topologyInfo;
}
- // Convert the topology ID of the jetstram topology to the id recognized by Storm
+ // Convert the topology ID of the streamflow topology to the id recognized by Storm
String stormTopologyId = resolveStormTopologyId(topology);
// The topology should be running, but found no matching name. Topology must have been killed
@@ -241,19 +245,21 @@ public TopologyInfo getTopologyInfo(Topology topology) {
topologyConf = client.getTopologyConf(stormTopologyId);
} catch (NotAliveException ex) {
- LOG.error("The requested topology was not found in the cluster: ID = {}",
- stormTopologyId);
-
- ex.printStackTrace();
+ LOG.error("The requested topology was not found in the cluster: ID = " + stormTopologyId);
} catch (TException ex) {
- LOG.error("Exception while retrieving the remote topology info: {}",
- ex.getMessage());
-
- ex.printStackTrace();
+ LOG.error("Exception while retrieving the remote topology info: ", ex);
} finally {
tTransport.close();
}
}
+
+ /*
+ // Make sure the specified topology was found on the storm cluster
+ if (info == null) {
+ LOG.error("Unable to retrieve topology info from the storm cluster");
+ return null;
+ }
+ */
TopologyInfo topologyInfo = new TopologyInfo();
topologyInfo.setId(info.get_id());
@@ -264,11 +270,13 @@ public TopologyInfo getTopologyInfo(Topology topology) {
for (Map.Entry> error
: info.get_errors().entrySet()) {
- List errorInfoList = new ArrayList();
+ List errorInfoList = new ArrayList<>();
for (backtype.storm.generated.ErrorInfo ei : error.getValue()) {
ErrorInfo errorInfo = new ErrorInfo();
errorInfo.setError(ei.get_error());
errorInfo.setErrorTimeSecs(ei.get_error_time_secs());
+ errorInfo.setHost(ei.get_host());
+ errorInfo.setPort(ei.get_port());
errorInfoList.add(errorInfo);
}
@@ -276,7 +284,7 @@ public TopologyInfo getTopologyInfo(Topology topology) {
topologyInfo.getErrors().put(error.getKey(), errorInfoList);
}
- List executorSummaries = new ArrayList();
+ List executorSummaries = new ArrayList<>();
for (backtype.storm.generated.ExecutorSummary es : info.get_executors()) {
ExecutorSummary executor = new ExecutorSummary();
executor.setComponentId(es.get_component_id());
@@ -310,14 +318,13 @@ public TopologyInfo getTopologyInfo(Topology topology) {
for (Map.Entry> ae
: bs.get_acked().entrySet()) {
- Map ackedMap = new HashMap();
+ Map ackedMap = new HashMap<>();
for (Map.Entry aem
: ae.getValue().entrySet()) {
backtype.storm.generated.GlobalStreamId gsi = aem.getKey();
- String globalStreamId = gsi.get_componentId()
- + "(" + gsi.get_streamId() + ")";
+ String globalStreamId = gsi.get_componentId() + ":" + gsi.get_streamId();
ackedMap.put(globalStreamId, aem.getValue());
}
@@ -327,14 +334,13 @@ public TopologyInfo getTopologyInfo(Topology topology) {
for (Map.Entry> fe
: bs.get_failed().entrySet()) {
- Map failedMap = new HashMap();
+ Map failedMap = new HashMap<>();
for (Map.Entry fem
: fe.getValue().entrySet()) {
backtype.storm.generated.GlobalStreamId gsi = fem.getKey();
- String globalStreamId = gsi.get_componentId()
- + "(" + gsi.get_streamId() + ")";
+ String globalStreamId = gsi.get_componentId() + ":" + gsi.get_streamId();
failedMap.put(globalStreamId, fem.getValue());
}
@@ -344,14 +350,13 @@ public TopologyInfo getTopologyInfo(Topology topology) {
for (Map.Entry> ee
: bs.get_executed().entrySet()) {
- Map executedMap = new HashMap();
+ Map executedMap = new HashMap<>();
for (Map.Entry eem
: ee.getValue().entrySet()) {
backtype.storm.generated.GlobalStreamId gsi = eem.getKey();
- String globalStreamId = gsi.get_componentId()
- + "(" + gsi.get_streamId() + ")";
+ String globalStreamId = gsi.get_componentId() + ":" + gsi.get_streamId();
executedMap.put(globalStreamId, eem.getValue());
}
@@ -361,14 +366,13 @@ public TopologyInfo getTopologyInfo(Topology topology) {
for (Map.Entry> ema
: bs.get_execute_ms_avg().entrySet()) {
- Map executedMap = new HashMap();
+ Map executedMap = new HashMap<>();
for (Map.Entry emam
: ema.getValue().entrySet()) {
backtype.storm.generated.GlobalStreamId gsi = emam.getKey();
- String globalStreamId = gsi.get_componentId()
- + "(" + gsi.get_streamId() + ")";
+ String globalStreamId = gsi.get_componentId() + ":" + gsi.get_streamId();
executedMap.put(globalStreamId, emam.getValue());
}
@@ -378,14 +382,13 @@ public TopologyInfo getTopologyInfo(Topology topology) {
for (Map.Entry> pma
: bs.get_process_ms_avg().entrySet()) {
- Map processMap = new HashMap();
+ Map processMap = new HashMap<>();
for (Map.Entry pmam
: pma.getValue().entrySet()) {
backtype.storm.generated.GlobalStreamId gsi = pmam.getKey();
- String globalStreamId = gsi.get_componentId()
- + "(" + gsi.get_streamId() + ")";
+ String globalStreamId = gsi.get_componentId() + ":" + gsi.get_streamId();
processMap.put(globalStreamId, pmam.getValue());
}
diff --git a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/framework/FrameworkFirstClassLoader.java b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/framework/FrameworkFirstClassLoader.java
new file mode 100644
index 0000000..9155097
--- /dev/null
+++ b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/framework/FrameworkFirstClassLoader.java
@@ -0,0 +1,75 @@
+package streamflow.engine.framework;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FrameworkFirstClassLoader extends ClassLoader {
+
+ private Logger LOG = LoggerFactory.getLogger(FrameworkFirstClassLoader.class);
+
+ private FrameworkClassLoader frameworkClassLoader;
+
+ public FrameworkFirstClassLoader(URL[] frameworkUrls) {
+ this(frameworkUrls, Thread.currentThread().getContextClassLoader());
+ }
+
+ public FrameworkFirstClassLoader(URL[] frameworkUrls, ClassLoader parent) {
+ super(parent);
+
+ frameworkClassLoader = new FrameworkClassLoader(frameworkUrls, this.getParent());
+ }
+
+ public void includeURL(URL frameworkUrl) {
+ frameworkClassLoader.includeURL(frameworkUrl);
+ }
+
+ @Override
+ protected synchronized Class> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+ try {
+ return frameworkClassLoader.findClass(name);
+ } catch (ClassNotFoundException ex) {
+ return super.loadClass(name, resolve);
+ }
+ }
+
+ @Override
+ public URL getResource(String name) {
+ URL resource = frameworkClassLoader.getResource(name);
+ if (resource == null) {
+ resource = super.getResource(name);
+ }
+ return resource;
+ }
+
+ private class FrameworkClassLoader extends URLClassLoader {
+
+ private final ClassLoader parentClassLoader;
+
+ public FrameworkClassLoader(URL[] frameworkUrls, ClassLoader parentClassLoader) {
+ super(frameworkUrls, null);
+
+ this.parentClassLoader = parentClassLoader;
+ }
+
+ public void includeURL(URL frameworkUrl) {
+ this.addURL(frameworkUrl);
+ }
+
+ @Override
+ public Class> findClass(String name) throws ClassNotFoundException {
+ try {
+ // Attempt to reuse any classes which have already been loaded
+ Class> loadedClass = super.findLoadedClass(name);
+ if (loadedClass != null) {
+ return loadedClass;
+ }
+
+ return super.findClass(name);
+ } catch (ClassNotFoundException e) {
+ return parentClassLoader.loadClass(name);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/framework/FrameworkKryoFactory.java b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/framework/FrameworkKryoFactory.java
index bae0cfd..b983c41 100644
--- a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/framework/FrameworkKryoFactory.java
+++ b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/framework/FrameworkKryoFactory.java
@@ -83,16 +83,16 @@ public Kryo getKryo(Map conf) {
try {
// Retrieve the required serialization class from the Kryo Realm
- Class typeClass = FrameworkLoader.getInstance().loadFrameworkClass(
- FrameworkKryoFactory.KRYO_REALM + topology.getProjectId(),
- serialization.getFramework(), serialization.getTypeClass());
+ Class typeClass = FrameworkUtils.getInstance().loadFrameworkClass(
+ serialization.getFrameworkHash(), serialization.getTypeClass(),
+ topology.getClassLoaderPolicy());
// Retrieve the optional serializer class
- Class serializerClass = null;
+ Class serializerClass = null;
if (serialization.getSerializerClass() != null) {
- serializerClass = FrameworkLoader.getInstance().loadFrameworkClass(
- FrameworkKryoFactory.KRYO_REALM + topology.getProjectId(),
- serialization.getFramework(), serialization.getSerializerClass());
+ serializerClass = FrameworkUtils.getInstance().loadFrameworkClass(
+ serialization.getFrameworkHash(), serialization.getSerializerClass(),
+ topology.getClassLoaderPolicy());
}
if (serializerClass == null) {
@@ -103,7 +103,7 @@ public Kryo getKryo(Map conf) {
+ serialization.getTypeClass());
} else {
// Register the type class using the custom serializer
- kryo.register(typeClass, (Serializer) serializerClass.newInstance());
+ kryo.register(typeClass, serializerClass.newInstance());
LOG.info("Streamflow Registered serialization: Type Class = "
+ serialization.getTypeClass() + ", Serializer Class = "
diff --git a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/framework/FrameworkLoader.java b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/framework/FrameworkLoader.java
index ecd5648..ad39a0b 100644
--- a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/framework/FrameworkLoader.java
+++ b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/framework/FrameworkLoader.java
@@ -17,13 +17,11 @@
import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
import java.util.HashSet;
import streamflow.util.environment.StreamflowEnvironment;
-import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.codehaus.plexus.classworlds.ClassWorld;
diff --git a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/framework/FrameworkModule.java b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/framework/FrameworkModule.java
index 0dba8f1..e7bd501 100644
--- a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/framework/FrameworkModule.java
+++ b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/framework/FrameworkModule.java
@@ -15,6 +15,7 @@
*/
package streamflow.engine.framework;
+import backtype.storm.task.TopologyContext;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
@@ -26,7 +27,6 @@
import com.google.inject.name.Names;
import java.io.File;
import java.util.Map.Entry;
-import streamflow.model.Cluster;
import streamflow.model.Topology;
import streamflow.model.TopologyComponent;
import streamflow.model.config.StreamflowConfig;
@@ -34,21 +34,23 @@
import org.slf4j.MDC;
public class FrameworkModule extends AbstractModule {
+
+ private final org.slf4j.Logger LOG = LoggerFactory.getLogger(FrameworkModule.class);
private final Topology topology;
private final TopologyComponent component;
- private final boolean isCluster;
-
private final StreamflowConfig streamflowConfig;
+
+ private final TopologyContext context;
public FrameworkModule(Topology topology, TopologyComponent component,
- boolean isCluster, StreamflowConfig streamflowConfig) {
+ StreamflowConfig streamflowConfig, TopologyContext context) {
this.topology = topology;
this.component = component;
- this.isCluster = isCluster;
this.streamflowConfig = streamflowConfig;
+ this.context = context;
}
@Override
@@ -69,40 +71,29 @@ protected void configure() {
streamflowConfig.getProxy().getPort());
}
- if (component.getKey() != null) {
- bindConstant().annotatedWith(
- Names.named("streamflow.component.key")).to(component.getKey());
- }
-
- if (topology.getId() != null) {
- bindConstant().annotatedWith(
- Names.named("streamflow.topology.id")).to(topology.getId());
- }
-
- // Bind configuration values for the cluster if values are needed
- Cluster cluster = streamflowConfig.getSelectedCluster();
-
- if (cluster != null) {
- if (cluster.getId() != null) {
- bindConstant().annotatedWith(
- Names.named("streamflow.cluster.id")).to(cluster.getId());
- }
-
- if (cluster.getDisplayName() != null) {
- bindConstant().annotatedWith(
- Names.named("streamflow.cluster.displayName")).to(cluster.getDisplayName());
- }
-
- if (cluster.getJmsURI() != null) {
- bindConstant().annotatedWith(
- Names.named("streamflow.cluster.jmsUri")).to(cluster.getJmsURI());
- }
- }
+ // Bind streamflow specific properties in case underlying bolts/resources require them
+ bindConstant().annotatedWith(
+ Names.named("streamflow.topology.id")).to(topology.getId());
+ //bindConstant().annotatedWith(
+ // Names.named("streamflow.topology.name")).to(topology.getName());
+ bindConstant().annotatedWith(
+ Names.named("streamflow.component.key")).to(component.getKey());
+ bindConstant().annotatedWith(
+ Names.named("streamflow.component.label")).to(component.getLabel());
+ bindConstant().annotatedWith(
+ Names.named("streamflow.component.name")).to(component.getName());
+ bindConstant().annotatedWith(
+ Names.named("streamflow.component.framework")).to(component.getFramework());
+ //bindConstant().annotatedWith(
+ // Names.named("streamflow.user.id")).to(topology.getUserId());
+ //bindConstant().annotatedWith(
+ // Names.named("streamflow.cluster.id")).to(topology.getClusterId());
+ //bindConstant().annotatedWith(
+ // Names.named("streamflow.cluster.name")).to(topology.getClusterName());
}
@Provides
public org.slf4j.Logger provideLogger() {
-
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
PatternLayout patternLayout = new PatternLayout();
@@ -110,12 +101,10 @@ public org.slf4j.Logger provideLogger() {
patternLayout.setContext(loggerContext);
patternLayout.start();
- // TODO: NEED TO ENSURE THE PROJECT ID IS PASSED TO THE FRAMEWORK MODULE
-
String logPath = streamflowConfig.getLogger().getBaseDir()
- + File.separator + topology.getId() + ".log";
+ + File.separator + "topology-" + topology.getId() + ".log";
- FileAppender fileAppender = new FileAppender();
+ FileAppender fileAppender = new FileAppender<>();
fileAppender.setName("FILE");
fileAppender.setFile(logPath);
fileAppender.setContext(loggerContext);
@@ -127,12 +116,17 @@ public org.slf4j.Logger provideLogger() {
logger.detachAndStopAllAppenders();
logger.addAppender(fileAppender);
logger.setAdditive(false);
- logger.setLevel(Level.DEBUG);
+ logger.setLevel(Level.toLevel(topology.getLogLevel()));
// Set the context for the topology/component when logging
MDC.put("topology", topology.getId());
- MDC.put("component", component.getKey());
MDC.put("project", topology.getProjectId());
+ MDC.put("component", component.getKey());
+ if (context != null) {
+ MDC.put("task", component.getName() + "-" + context.getThisTaskIndex());
+ } else {
+ MDC.put("task", component.getName());
+ }
return logger;
}
diff --git a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/framework/FrameworkUtils.java b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/framework/FrameworkUtils.java
new file mode 100644
index 0000000..fc19fe8
--- /dev/null
+++ b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/framework/FrameworkUtils.java
@@ -0,0 +1,209 @@
+package streamflow.engine.framework;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.concurrent.ExecutionException;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import streamflow.util.environment.StreamflowEnvironment;
+
+public class FrameworkUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FrameworkUtils.class);
+
+ private static final String DEFAULT_CLASS_LOADER_POLICY = "FRAMEWORK_FIRST";
+
+ private static FrameworkUtils singleton;
+
+ private LoadingCache frameworkFirstCache;
+
+ private LoadingCache frameworkLastCache;
+
+ private FrameworkUtils() {
+ }
+
+ public static synchronized FrameworkUtils getInstance() {
+ if (singleton == null) {
+ singleton = new FrameworkUtils();
+ singleton.initialize();
+ }
+ return singleton;
+ }
+
+ @Override
+ public Object clone() throws CloneNotSupportedException {
+ throw new CloneNotSupportedException();
+ }
+
+ private void initialize() {
+ frameworkFirstCache = CacheBuilder.newBuilder().build(new FrameworkFirstCacheLoader());
+ frameworkLastCache = CacheBuilder.newBuilder().build(new FrameworkLastCacheLoader());
+
+ // Initialize the frameworks directory where temporary frameworks will be stored
+ File frameworksDir = new File(StreamflowEnvironment.getFrameworksDir());
+ if (!frameworksDir.exists()) {
+ frameworksDir.mkdirs();
+ }
+ }
+
+ public Class loadFrameworkClass(String frameworkHash, String frameworkClass)
+ throws FrameworkException {
+ // Default policy is to load frameworks first
+ return loadFrameworkClass(frameworkHash, frameworkClass, DEFAULT_CLASS_LOADER_POLICY);
+ }
+
+ public Class loadFrameworkClass(String frameworkHash, String frameworkClass, String classLoaderPolicy)
+ throws FrameworkException {
+ Class frameworkClazz = null;
+
+ try {
+ // Load the framework realm to isolate the class loading for the framework
+ ClassLoader frameworkClassLoader = getFrameworkClassLoader(frameworkHash, classLoaderPolicy);
+
+ // Load the module using the specified module class
+ frameworkClazz = frameworkClassLoader.loadClass(frameworkClass);
+ } catch (ClassNotFoundException ex) {
+ throw new FrameworkException(
+ "Framework class was not found in the framework jar: "
+ + frameworkClass);
+ }
+
+ return frameworkClazz;
+ }
+
+ public T loadFrameworkClassInstance(String frameworkHash, String frameworkClass,
+ Class frameworkClassType) throws FrameworkException {
+ // Defaut policy is to load frameworks first
+ return loadFrameworkClassInstance(
+ frameworkHash, frameworkClass, frameworkClassType, DEFAULT_CLASS_LOADER_POLICY);
+ }
+
+ public T loadFrameworkClassInstance(String frameworkHash, String frameworkClass,
+ Class frameworkClassType, String classLoaderPolicy) throws FrameworkException {
+ try {
+ // Load the framework class from the framework with specified coordinates
+ Class frameworkClazz = loadFrameworkClass(frameworkHash, frameworkClass, classLoaderPolicy);
+
+ // Check to make sure that the library loaded matches the class type
+ if (frameworkClassType.isAssignableFrom(frameworkClazz)) {
+ // Create a new instance of the module and return it
+ return (T) frameworkClazz.newInstance();
+ } else {
+ throw new FrameworkException(
+ "The framework class could not be assigned to the specified class type: "
+ + frameworkClass);
+ }
+ } catch (FrameworkException ex) {
+ // Rethrow framework exceptions as is
+ throw ex;
+ } catch (InstantiationException ex) {
+ throw new FrameworkException(
+ "Component class cound not be instantiated: " + frameworkClass);
+ } catch (IllegalAccessException ex) {
+ throw new FrameworkException(
+ "Component class was illegally accessed: " + frameworkClass);
+ } catch (Exception ex) {
+ LOG.error("Component loading failed due to an unexpected exception: ", ex);
+
+ throw new FrameworkException(
+ "Component loading failed due to an unexpected exception: " + ex.getMessage());
+ }
+ }
+
+ public ClassLoader getFrameworkClassLoader(String frameworkHash)
+ throws FrameworkException {
+ return getFrameworkClassLoader(frameworkHash, DEFAULT_CLASS_LOADER_POLICY);
+ }
+
+ public ClassLoader getFrameworkClassLoader(String frameworkHash, String classLoaderPolicy)
+ throws FrameworkException {
+ try {
+ if (classLoaderPolicy.equalsIgnoreCase("FRAMEWORK_LAST")) {
+ return frameworkLastCache.get(frameworkHash);
+ } else {
+ // Default class loader policy is framework first
+ return frameworkFirstCache.get(frameworkHash);
+ }
+ } catch (ExecutionException ex) {
+ throw new FrameworkException("Framework class loader execution exception: "
+ + ex.getMessage());
+ }
+ }
+
+ public URL getFrameworkJarUrl(String frameworkHash) {
+ File frameworkJar = new File(StreamflowEnvironment.getFrameworksDir(),
+ frameworkHash + ".jar");
+
+ // Check if the framework jar has already been added
+ if (!frameworkJar.exists()) {
+ // URL to the framework jar embedded within the topology jar
+ //URL embeddedFrameworkUrl = Thread.currentThread().getContextClassLoader()
+ URL embeddedFrameworkUrl = this.getClass().getClassLoader()
+ .getResource("STREAMFLOW-INF/lib/" + frameworkHash + ".jar");
+
+ try {
+ // Copy the framework jar out of the topology jar and into the temp directory
+ FileUtils.writeByteArrayToFile(frameworkJar,
+ IOUtils.toByteArray(embeddedFrameworkUrl));
+ } catch (Exception ex) {
+ LOG.error("An exception occurred while copying the inbuilt framework jar to the temp directory", ex);
+ }
+ }
+
+ try {
+ return frameworkJar.toURI().toURL();
+ } catch (Exception ex) {
+ LOG.error("Unabled to load the framework jar URL: ", ex);
+
+ return null;
+ }
+ }
+
+ private class FrameworkFirstCacheLoader extends CacheLoader {
+ @Override
+ public ClassLoader load(String frameworkHash) throws FrameworkException {
+ URL frameworkUrl = getFrameworkJarUrl(frameworkHash);
+
+ if (frameworkUrl != null) {
+ // Add the framework jar URL to the list to be loaded by the classloader
+ URL[] frameworkUrls = new URL[]{frameworkUrl};
+
+ LOG.info("Framework First Class Loader initialized by cache: Framework Hash = "
+ + frameworkHash);
+
+ return new FrameworkFirstClassLoader(
+ frameworkUrls, Thread.currentThread().getContextClassLoader());
+ } else {
+ throw new FrameworkException("Unable to load the framework jar, file was not available: "
+ + frameworkUrl);
+ }
+ }
+ }
+
+ private class FrameworkLastCacheLoader extends CacheLoader {
+ @Override
+ public ClassLoader load(String frameworkHash) throws FrameworkException {
+ URL frameworkUrl = getFrameworkJarUrl(frameworkHash);
+
+ if (frameworkUrl != null) {
+ // Add the framework jar URL to the list to be loaded by the classloader
+ URL[] frameworkUrls = new URL[]{frameworkUrl};
+
+ LOG.info("Framework Last Class Loader initialized by cache: Framework Hash = "
+ + frameworkHash);
+
+ return new URLClassLoader(
+ frameworkUrls, Thread.currentThread().getContextClassLoader());
+ } else {
+ throw new FrameworkException("Unable to load the framework jar, file was not available: "
+ + frameworkUrl);
+ }
+ }
+ }
+}
diff --git a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/resource/ResourceModule.java b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/resource/ResourceModule.java
index 9d7baa7..3cdcfbe 100644
--- a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/resource/ResourceModule.java
+++ b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/resource/ResourceModule.java
@@ -20,11 +20,11 @@
import com.google.inject.name.Names;
import java.util.List;
import java.util.Map.Entry;
-import streamflow.engine.framework.FrameworkLoader;
import streamflow.model.Topology;
import streamflow.model.TopologyResourceEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import streamflow.engine.framework.FrameworkUtils;
public class ResourceModule extends AbstractModule {
@@ -53,8 +53,9 @@ protected void configure() {
}
// Use the FrameworkLoader to load the resource module class from the framework
- Class resourceClass = FrameworkLoader.getInstance().loadFrameworkClass(
- topology.getProjectId(), resourceEntry.getFramework(), resourceEntry.getResourceClass());
+ Class resourceClass = FrameworkUtils.getInstance().loadFrameworkClass(
+ resourceEntry.getFrameworkHash(), resourceEntry.getResourceClass(),
+ topology.getClassLoaderPolicy());
if (resourceClass.isAssignableFrom(Module.class)) {
// Bind the specific resource class to the injector
diff --git a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/topology/TopologySubmitter.java b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/topology/TopologySubmitter.java
index 18195f8..c1e6bfe 100644
--- a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/topology/TopologySubmitter.java
+++ b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/topology/TopologySubmitter.java
@@ -19,12 +19,20 @@
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.ClusterSummary;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.SubmitOptions;
+import backtype.storm.generated.TopologySummary;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
import java.io.File;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.thrift7.TException;
+import org.json.simple.JSONValue;
import streamflow.engine.framework.FrameworkException;
-import streamflow.engine.framework.FrameworkKryoFactory;
import streamflow.model.Cluster;
import streamflow.model.Topology;
import streamflow.model.TopologyConfig;
@@ -75,8 +83,8 @@ public void run() {
+ ", Cluster ID = " + targetCluster.getId());
if (isClusterMode(targetCluster)) {
- String topologyJarPath = StreamflowEnvironment.getTopologiesDir() + File.separator
- + topology.getProjectId() + ".jar";
+ String topologyJarPath = StreamflowEnvironment.getTopologiesDir()
+ + File.separator + topology.getProjectId() + ".jar";
// Set the required config properties which specify the cluster endpoints
stormConfig.put(Config.NIMBUS_HOST, targetCluster.getNimbusHost());
@@ -85,8 +93,11 @@ public void run() {
// StormSubmitter requires that the path to the jar be set as a system property
System.setProperty("storm.jar", topologyJarPath);
- // Submit the topology to the remote cluster using the topology.id for the ID
- StormSubmitter.submitTopology(topology.getId(), stormConfig, stormTopology);
+ // Note: This should work but current version does not allow for reuse. Replace
+ // with native StormSubmitter
+ //StormSubmitter.submitTopology(topology.getId(), stormConfig, stormTopology);
+
+ submitTopology(topology.getId(), stormConfig, stormTopology, topologyJarPath);
} else {
localCluster.submitTopology(topology.getId(), stormConfig, stormTopology);
}
@@ -101,6 +112,65 @@ public void run() {
}
}
+ private void submitTopology(String name, Map stormConf, StormTopology stormTopology, String topologyJarPath)
+ throws AlreadyAliveException, InvalidTopologyException {
+
+ if(!Utils.isValidConf(stormConf)) {
+ throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
+ }
+ stormConf = new HashMap(stormConf);
+ stormConf.putAll(Utils.readCommandLineOpts());
+ Map conf = Utils.readStormConfig();
+ conf.putAll(stormConf);
+
+ try {
+ String serConf = JSONValue.toJSONString(stormConf);
+
+ NimbusClient client = NimbusClient.getConfiguredClient(conf);
+ if(topologyNameExists(conf, name)) {
+ throw new RuntimeException("Topology with name '" + name + "' already exists on cluster");
+ }
+
+ String uploadedJarLocation = StormSubmitter.submitJar(conf, topologyJarPath);
+
+ try {
+ LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
+
+ client.getClient().submitTopology(name, uploadedJarLocation, serConf, stormTopology);
+ } catch(InvalidTopologyException e) {
+ LOG.warn("Topology submission exception: "+ e.get_msg());
+ throw e;
+ } catch(AlreadyAliveException e) {
+ LOG.warn("Topology already alive exception", e);
+ throw e;
+ } finally {
+ client.close();
+ }
+
+ LOG.info("Finished submitting topology: " + name);
+ } catch(TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private boolean topologyNameExists(Map conf, String name) {
+ NimbusClient client = NimbusClient.getConfiguredClient(conf);
+ try {
+ ClusterSummary summary = client.getClient().getClusterInfo();
+ for(TopologySummary topologySummary : summary.get_topologies()) {
+ if(topologySummary.get_name().equals(name)) {
+ return true;
+ }
+ }
+ return false;
+
+ } catch(Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ client.close();
+ }
+ }
+
private Config buildStormConfig(StreamflowConfig streamflowConfig,
ArrayList topologyProperties, boolean isClusterMode) {
Config stormConfig = new Config();
@@ -122,7 +192,7 @@ private Config buildStormConfig(StreamflowConfig streamflowConfig,
// Kryo Factory is only required for clustered topologies
if (isClusterMode) {
// Initialize Storm with the Custom Kryo Factory
- stormConfig.setKryoFactory(FrameworkKryoFactory.class);
+ //stormConfig.setKryoFactory(FrameworkKryoFactory.class);
}
return stormConfig;
diff --git a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/wrapper/BaseWrapper.java b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/wrapper/BaseWrapper.java
index bddc141..b40f85e 100644
--- a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/wrapper/BaseWrapper.java
+++ b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/wrapper/BaseWrapper.java
@@ -1,12 +1,12 @@
package streamflow.engine.wrapper;
+import backtype.storm.task.TopologyContext;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import java.io.Serializable;
import java.util.ArrayList;
import streamflow.engine.framework.FrameworkException;
-import streamflow.engine.framework.FrameworkLoader;
import streamflow.engine.framework.FrameworkModule;
import streamflow.engine.resource.ResourceModule;
import streamflow.model.Topology;
@@ -15,6 +15,7 @@
import streamflow.model.config.StreamflowConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import streamflow.engine.framework.FrameworkUtils;
public abstract class BaseWrapper implements Serializable {
@@ -28,6 +29,8 @@ public abstract class BaseWrapper implements Serializable {
protected TopologyComponent component;
+ protected TopologyContext context;
+
protected boolean isCluster;
protected StreamflowConfig configuration;
@@ -48,20 +51,17 @@ protected T getDelegate() throws FrameworkException {
if (delegate == null) {
try {
// Load the delegate class from the framework jar in an isolated class loader
- delegate = FrameworkLoader.getInstance().loadFrameworkComponent(
- topology.getProjectId(), component.getFramework(),
- component.getMainClass(), typeClass);
-
- ClassLoader loader = FrameworkLoader.getInstance()
- .loadRealm(component.getFramework());
-
- Thread.currentThread().setContextClassLoader(loader);
+ delegate = FrameworkUtils.getInstance().loadFrameworkClassInstance(
+ component.getFrameworkHash(), component.getMainClass(),
+ typeClass, topology.getClassLoaderPolicy());
injectModules();
} catch (Exception ex) {
- throw new FrameworkException("Unable to locate component class: "
- + component.getMainClass());
+ LOG.error("Unable to load component class: Class = " + component.getMainClass(), ex);
+
+ throw new FrameworkException("Unable to load component class: "
+ + component.getMainClass() + ", Exception = " + ex.getMessage());
}
}
return delegate;
@@ -70,7 +70,7 @@ protected T getDelegate() throws FrameworkException {
private void injectModules() throws FrameworkException {
// Create the new FrameworkModule to inject proxy and property information
FrameworkModule frameworkModule = new FrameworkModule(
- topology, component, isCluster, configuration);
+ topology, component, configuration, context);
// Create the resource module which will inject resource properties
ResourceModule resourceModule = new ResourceModule(
@@ -80,13 +80,13 @@ private void injectModules() throws FrameworkException {
Injector injector = Guice.createInjector(
(Module) frameworkModule, (Module) resourceModule);
- ArrayList resourceModules = new ArrayList();
+ ArrayList resourceModules = new ArrayList<>();
for (TopologyResourceEntry resourceEntry : component.getResources()) {
// Load the framework class instance from the framework
- Class resourceClass = FrameworkLoader.getInstance().loadFrameworkClass(
- topology.getProjectId(), resourceEntry.getFramework(),
- resourceEntry.getResourceClass());
+ Class resourceClass = FrameworkUtils.getInstance().loadFrameworkClass(
+ resourceEntry.getFrameworkHash(), resourceEntry.getResourceClass(),
+ topology.getClassLoaderPolicy());
// Create an instance of each resource module save it for injection
resourceModules.add((Module) injector.getInstance(resourceClass));
diff --git a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/wrapper/storm/BasicBoltWrapper.java b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/wrapper/storm/BasicBoltWrapper.java
index 9e2cdf5..b19720d 100644
--- a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/wrapper/storm/BasicBoltWrapper.java
+++ b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/wrapper/storm/BasicBoltWrapper.java
@@ -42,6 +42,8 @@ public BasicBoltWrapper(Topology topology, TopologyComponent component,
@Override
public void prepare(Map conf, TopologyContext context) {
+ this.context = context;
+
try {
// Register the metrics hook for this bolt to track statistics
context.addTaskHook(new BoltMetricsHook());
@@ -49,8 +51,7 @@ public void prepare(Map conf, TopologyContext context) {
getDelegate().prepare(conf, context);
} catch (FrameworkException ex) {
LOG.error("prepare() not delegated due to a Framework exception: ", ex);
- } catch (Exception ex) {
- LOG.error("prepare() threw an uncaught exception: ", ex);
+ throw new RuntimeException(ex);
}
}
@@ -60,8 +61,6 @@ public void execute(Tuple tuple, BasicOutputCollector collector) {
getDelegate().execute(tuple, collector);
} catch (FrameworkException ex) {
LOG.error("execute() not delegated due to a Framework exception: ", ex);
- } catch (Exception ex) {
- LOG.error("execute() threw an uncaught exception: ", ex);
}
}
@@ -71,8 +70,6 @@ public void cleanup() {
getDelegate().cleanup();
} catch (FrameworkException ex) {
LOG.error("cleanup() not delegated due to a Framework exception: ", ex);
- } catch (Exception ex) {
- LOG.error("cleanup() threw an uncaught exception: ", ex);
}
}
@@ -82,8 +79,6 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
getDelegate().declareOutputFields(declarer);
} catch (FrameworkException ex) {
LOG.error("declareOutputFields() not delegated due to a Framework exception: ", ex);
- } catch (Exception ex) {
- LOG.error("declareOutputFields() threw an uncaught exception: ", ex);
}
}
@@ -94,9 +89,6 @@ public Map getComponentConfiguration() {
} catch (FrameworkException ex) {
LOG.error("getComponentConfiguration() not delegated due to a Framework exception: ", ex);
return new HashMap();
- } catch (Exception ex) {
- LOG.error("getComponentConfiguration() threw an uncaught exception: ", ex);
- return new HashMap();
}
}
}
diff --git a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/wrapper/storm/RichBoltWrapper.java b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/wrapper/storm/RichBoltWrapper.java
index e2783e6..b56fd1b 100644
--- a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/wrapper/storm/RichBoltWrapper.java
+++ b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/wrapper/storm/RichBoltWrapper.java
@@ -42,6 +42,8 @@ public RichBoltWrapper(Topology topology, TopologyComponent component,
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+ this.context = context;
+
try {
// Register the metrics hook for this bolt to track statistics
context.addTaskHook(new BoltMetricsHook());
@@ -49,6 +51,7 @@ public void prepare(Map conf, TopologyContext context, OutputCollector collector
getDelegate().prepare(conf, context, collector);
} catch (FrameworkException ex) {
LOG.error("prepare() not delegated due to a Framework exception: ", ex);
+ throw new RuntimeException(ex);
}
}
@@ -58,8 +61,6 @@ public void execute(Tuple tuple) {
getDelegate().execute(tuple);
} catch (FrameworkException ex) {
LOG.error("execute() not delegated due to a Framework exception: ", ex);
- } catch (Exception ex) {
- LOG.error("execute() threw an uncaught exception: ", ex);
}
}
@@ -69,8 +70,6 @@ public void cleanup() {
getDelegate().cleanup();
} catch (FrameworkException ex) {
LOG.error("cleanup() not delegated due to a Framework exception: ", ex);
- } catch (Exception ex) {
- LOG.error("cleanup() threw an uncaught exception: ", ex);
}
}
@@ -80,8 +79,6 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
getDelegate().declareOutputFields(declarer);
} catch (FrameworkException ex) {
LOG.error("declareOutputFields() not delegated due to a Framework exception: ", ex);
- } catch (Exception ex) {
- LOG.error("declareOutputFields() threw an uncaught exception: ", ex);
}
}
@@ -92,9 +89,6 @@ public Map getComponentConfiguration() {
} catch (FrameworkException ex) {
LOG.error("getComponentConfiguration() not delegated due to a Framework exception: ", ex);
return new HashMap();
- } catch (Exception ex) {
- LOG.error("getComponentConfiguration() threw an uncaught exception: ", ex);
- return new HashMap();
}
}
}
diff --git a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/wrapper/storm/RichSpoutWrapper.java b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/wrapper/storm/RichSpoutWrapper.java
index e538ee3..7ad0811 100644
--- a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/wrapper/storm/RichSpoutWrapper.java
+++ b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/wrapper/storm/RichSpoutWrapper.java
@@ -41,6 +41,8 @@ public RichSpoutWrapper(Topology topology, TopologyComponent component,
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ this.context = context;
+
try {
// Register the metrics hook for this bolt to track statistics
context.addTaskHook(new SpoutMetricsHook());
@@ -48,8 +50,7 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect
getDelegate().open(conf, context, collector);
} catch (FrameworkException ex) {
LOG.error("open() not delegated due to a Framework exception: ", ex);
- } catch (Exception ex) {
- LOG.error("open() threw an uncaught exception: ", ex);
+ throw new RuntimeException(ex);
}
}
@@ -59,8 +60,6 @@ public void close() {
getDelegate().close();
} catch (FrameworkException ex) {
LOG.error("close() not delegated due to a Framework exception: ", ex);
- } catch (Exception ex) {
- LOG.error("close() threw an uncaught exception: ", ex);
}
}
@@ -70,8 +69,6 @@ public void activate() {
getDelegate().activate();
} catch (FrameworkException ex) {
LOG.error("activate() not delegated due to a Framework exception: ", ex);
- } catch (Exception ex) {
- LOG.error("activate() threw an uncaught exception: ", ex);
}
}
@@ -81,8 +78,6 @@ public void deactivate() {
getDelegate().deactivate();
} catch (FrameworkException ex) {
LOG.error("deactivate() not delegated due to a Framework exception: ", ex);
- } catch (Exception ex) {
- LOG.error("deactivate() threw an uncaught exception: ", ex);
}
}
@@ -92,8 +87,6 @@ public void nextTuple() {
getDelegate().nextTuple();
} catch (FrameworkException ex) {
LOG.error("nextTuple() not delegated due to a Framework exception: ", ex);
- } catch (Exception ex) {
- LOG.error("nextTuple() threw an uncaught exception: ", ex);
}
}
@@ -103,8 +96,6 @@ public void ack(Object msgId) {
getDelegate().ack(msgId);
} catch (FrameworkException ex) {
LOG.error("ack() not delegated due to a Framework exception: ", ex);
- } catch (Exception ex) {
- LOG.error("ack() threw an uncaught exception: ", ex);
}
}
@@ -114,8 +105,6 @@ public void fail(Object msgId) {
getDelegate().fail(msgId);
} catch (FrameworkException ex) {
LOG.error("fail() not delegated due to a Framework exception: ", ex);
- } catch (Exception ex) {
- LOG.error("fail() threw an uncaught exception: ", ex);
}
}
@@ -125,8 +114,6 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
getDelegate().declareOutputFields(declarer);
} catch (FrameworkException ex) {
LOG.error("declareOutputFields() not delegated due to a Framework exception: ", ex);
- } catch (Exception ex) {
- LOG.error("declareOutputFields() threw an uncaught exception: ", ex);
}
}
@@ -137,9 +124,6 @@ public Map getComponentConfiguration() {
} catch (FrameworkException ex) {
LOG.error("getComponentConfiguration() not delegated due to a Framework exception: ", ex);
return new HashMap();
- } catch (Exception ex) {
- LOG.error("getComponentConfiguration() threw an uncaught exception: ", ex);
- return new HashMap();
}
}
}
diff --git a/streamflow-core/streamflow-model/pom.xml b/streamflow-core/streamflow-model/pom.xml
index 1386a5f..e033007 100644
--- a/streamflow-core/streamflow-model/pom.xml
+++ b/streamflow-core/streamflow-model/pom.xml
@@ -22,7 +22,7 @@
streamflow
streamflow-core
- 0.9.1
+ 0.10.0
streamflow-model
diff --git a/streamflow-core/streamflow-model/src/main/java/streamflow/model/ComponentPropertyOptions.java b/streamflow-core/streamflow-model/src/main/java/streamflow/model/ComponentPropertyOptions.java
index 0a382f3..08c5bc4 100644
--- a/streamflow-core/streamflow-model/src/main/java/streamflow/model/ComponentPropertyOptions.java
+++ b/streamflow-core/streamflow-model/src/main/java/streamflow/model/ComponentPropertyOptions.java
@@ -33,13 +33,15 @@ public class ComponentPropertyOptions implements Serializable {
private Integer minNumber;
- private Float floatStep;
+ private Double floatStep;
private String displayUnits;
private String dateFormat;
private Integer minuteStep;
+
+ private Integer numRows;
private String resourceFramework;
@@ -75,11 +77,11 @@ public void setMinNumber(Integer minNumber) {
this.minNumber = minNumber;
}
- public Float getFloatStep() {
+ public Double getFloatStep() {
return floatStep;
}
- public void setFloatStep(Float floatStep) {
+ public void setFloatStep(Double floatStep) {
this.floatStep = floatStep;
}
@@ -115,6 +117,14 @@ public void setMinuteStep(Integer minuteStep) {
this.minuteStep = minuteStep;
}
+ public Integer getNumRows() {
+ return numRows;
+ }
+
+ public void setNumRows(Integer numRows) {
+ this.numRows = numRows;
+ }
+
public String getResourceFramework() {
return resourceFramework;
}
@@ -141,6 +151,7 @@ public int hashCode() {
hash = 41 * hash + (this.displayUnits != null ? this.displayUnits.hashCode() : 0);
hash = 41 * hash + (this.dateFormat != null ? this.dateFormat.hashCode() : 0);
hash = 41 * hash + (this.minuteStep != null ? this.minuteStep.hashCode() : 0);
+ hash = 41 * hash + (this.numRows != null ? this.numRows.hashCode() : 0);
hash = 41 * hash + (this.resourceFramework != null ? this.resourceFramework.hashCode() : 0);
hash = 41 * hash + (this.resourceName != null ? this.resourceName.hashCode() : 0);
hash = 41 * hash + (this.listItems != null ? this.listItems.hashCode() : 0);
@@ -184,6 +195,10 @@ public boolean equals(Object obj) {
|| !this.minuteStep.equals(other.minuteStep))) {
return false;
}
+ if (this.numRows != other.numRows && (this.numRows == null
+ || !this.numRows.equals(other.numRows))) {
+ return false;
+ }
if ((this.resourceFramework == null) ? (other.resourceFramework != null)
: !this.resourceFramework.equals(other.resourceFramework)) {
return false;
@@ -205,7 +220,7 @@ public String toString() {
+ ", maxNumber=" + maxNumber + ", minNumber=" + minNumber
+ ", floatStep=" + floatStep + ", displayUnits=" + displayUnits
+ ", dateFormat=" + dateFormat + ", minuteStep=" + minuteStep
- + ", resourceFramework=" + resourceFramework
+ + ", numRows=" + numRows + ", resourceFramework=" + resourceFramework
+ ", resourceName=" + resourceName + ", listItems=" + listItems + '}';
}
}
diff --git a/streamflow-core/streamflow-model/src/main/java/streamflow/model/ResourcePropertyOptions.java b/streamflow-core/streamflow-model/src/main/java/streamflow/model/ResourcePropertyOptions.java
index dbbf165..b3b67a4 100644
--- a/streamflow-core/streamflow-model/src/main/java/streamflow/model/ResourcePropertyOptions.java
+++ b/streamflow-core/streamflow-model/src/main/java/streamflow/model/ResourcePropertyOptions.java
@@ -33,13 +33,15 @@ public class ResourcePropertyOptions implements Serializable {
private Integer minNumber;
- private Float floatStep;
+ private Double floatStep;
private String displayUnits;
private String dateFormat;
private Integer minuteStep;
+
+ private Integer numRows;
@Embedded
private ArrayList listItems = new ArrayList();
@@ -72,11 +74,11 @@ public void setMinNumber(Integer minNumber) {
this.minNumber = minNumber;
}
- public Float getFloatStep() {
+ public Double getFloatStep() {
return floatStep;
}
- public void setFloatStep(Float floatStep) {
+ public void setFloatStep(Double floatStep) {
this.floatStep = floatStep;
}
@@ -112,6 +114,14 @@ public void setMinuteStep(Integer minuteStep) {
this.minuteStep = minuteStep;
}
+ public Integer getNumRows() {
+ return numRows;
+ }
+
+ public void setNumRows(Integer numRows) {
+ this.numRows = numRows;
+ }
+
@Override
public int hashCode() {
int hash = 5;
@@ -122,6 +132,7 @@ public int hashCode() {
hash = 59 * hash + (this.displayUnits != null ? this.displayUnits.hashCode() : 0);
hash = 59 * hash + (this.dateFormat != null ? this.dateFormat.hashCode() : 0);
hash = 59 * hash + (this.minuteStep != null ? this.minuteStep.hashCode() : 0);
+ hash = 59 * hash + (this.numRows != null ? this.numRows.hashCode() : 0);
hash = 59 * hash + (this.listItems != null ? this.listItems.hashCode() : 0);
return hash;
}
@@ -163,6 +174,10 @@ public boolean equals(Object obj) {
|| !this.minuteStep.equals(other.minuteStep))) {
return false;
}
+ if (this.numRows != other.numRows && (this.numRows == null
+ || !this.numRows.equals(other.numRows))) {
+ return false;
+ }
if (this.listItems != other.listItems && (this.listItems == null
|| !this.listItems.equals(other.listItems))) {
return false;
@@ -176,6 +191,6 @@ public String toString() {
+ ", maxNumber=" + maxNumber + ", minNumber=" + minNumber
+ ", floatStep=" + floatStep + ", displayUnits=" + displayUnits
+ ", dateFormat=" + dateFormat + ", minuteStep=" + minuteStep
- + ", listItems=" + listItems + '}';
+ + ", numRows=" + numRows + ", listItems=" + listItems + '}';
}
}
diff --git a/streamflow-core/streamflow-model/src/main/java/streamflow/model/Topology.java b/streamflow-core/streamflow-model/src/main/java/streamflow/model/Topology.java
index d87b6f5..b73bc8f 100644
--- a/streamflow-core/streamflow-model/src/main/java/streamflow/model/Topology.java
+++ b/streamflow-core/streamflow-model/src/main/java/streamflow/model/Topology.java
@@ -63,6 +63,10 @@ public class Topology implements streamflow.model.util.Entity, Serializa
private String clusterId;
private String clusterName;
+
+ private String logLevel;
+
+ private String classLoaderPolicy;
private Date submitted;
@@ -199,6 +203,22 @@ public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
+ public String getLogLevel() {
+ return logLevel;
+ }
+
+ public void setLogLevel(String logLevel) {
+ this.logLevel = logLevel;
+ }
+
+ public String getClassLoaderPolicy() {
+ return classLoaderPolicy;
+ }
+
+ public void setClassLoaderPolicy(String classLoaderPolicy) {
+ this.classLoaderPolicy = classLoaderPolicy;
+ }
+
public Date getSubmitted() {
return submitted;
}
@@ -232,6 +252,8 @@ public int hashCode() {
hash = 41 * hash + (this.projectId != null ? this.projectId.hashCode() : 0);
hash = 41 * hash + (this.clusterId != null ? this.clusterId.hashCode() : 0);
hash = 41 * hash + (this.clusterName != null ? this.clusterName.hashCode() : 0);
+ hash = 41 * hash + (this.logLevel != null ? this.logLevel.hashCode() : 0);
+ hash = 41 * hash + (this.classLoaderPolicy != null ? this.classLoaderPolicy.hashCode() : 0);
hash = 41 * hash + (this.submitted != null ? this.submitted.hashCode() : 0);
hash = 41 * hash + (this.killed != null ? this.killed.hashCode() : 0);
return hash;
@@ -301,6 +323,14 @@ public boolean equals(Object obj) {
: !this.clusterName.equals(other.clusterName)) {
return false;
}
+ if ((this.logLevel == null) ? (other.logLevel != null)
+ : !this.logLevel.equals(other.logLevel)) {
+ return false;
+ }
+ if ((this.classLoaderPolicy == null) ? (other.classLoaderPolicy != null)
+ : !this.classLoaderPolicy.equals(other.classLoaderPolicy)) {
+ return false;
+ }
if (this.submitted != other.submitted && (this.submitted == null
|| !this.submitted.equals(other.submitted))) {
return false;
@@ -320,6 +350,7 @@ public String toString() {
+ ", currentConfig=" + currentConfig + ", deployedConfig=" + deployedConfig
+ ", status=" + status + ", projectId=" + projectId
+ ", clusterId=" + clusterId + ", clusterName=" + clusterName
+ + ", logLevel=" + logLevel + ", classLoaderPolicy=" + classLoaderPolicy
+ ", submitted=" + submitted + ", killed=" + killed + '}';
}
}
diff --git a/streamflow-core/streamflow-model/src/main/java/streamflow/model/TopologyComponent.java b/streamflow-core/streamflow-model/src/main/java/streamflow/model/TopologyComponent.java
index 91941c2..d4b38d1 100644
--- a/streamflow-core/streamflow-model/src/main/java/streamflow/model/TopologyComponent.java
+++ b/streamflow-core/streamflow-model/src/main/java/streamflow/model/TopologyComponent.java
@@ -35,6 +35,8 @@ public class TopologyComponent implements Serializable {
private String name;
private String framework;
+
+ private String frameworkHash;
private String version;
@@ -94,6 +96,14 @@ public void setFramework(String framework) {
this.framework = framework;
}
+ public String getFrameworkHash() {
+ return frameworkHash;
+ }
+
+ public void setFrameworkHash(String frameworkHash) {
+ this.frameworkHash = frameworkHash;
+ }
+
public String getVersion() {
return version;
}
@@ -182,6 +192,7 @@ public int hashCode() {
hash = 11 * hash + (this.label != null ? this.label.hashCode() : 0);
hash = 11 * hash + (this.name != null ? this.name.hashCode() : 0);
hash = 11 * hash + (this.framework != null ? this.framework.hashCode() : 0);
+ hash = 11 * hash + (this.frameworkHash != null ? this.frameworkHash.hashCode() : 0);
hash = 11 * hash + (this.version != null ? this.version.hashCode() : 0);
hash = 11 * hash + (this.mainClass != null ? this.mainClass.hashCode() : 0);
hash = 11 * hash + this.parallelism;
@@ -225,6 +236,10 @@ public boolean equals(Object obj) {
: !this.framework.equals(other.framework)) {
return false;
}
+ if ((this.frameworkHash == null) ? (other.frameworkHash != null)
+ : !this.frameworkHash.equals(other.frameworkHash)) {
+ return false;
+ }
if ((this.version == null) ? (other.version != null)
: !this.version.equals(other.version)) {
return false;
@@ -263,8 +278,9 @@ public boolean equals(Object obj) {
@Override
public String toString() {
- return "TopologyComponent{" + "key=" + key + ", type=" + type + ", label=" + label
- + ", name=" + name + ", framework=" + framework + ", version=" + version
+ return "TopologyComponent{" + "key=" + key + ", type=" + type
+ + ", label=" + label + ", name=" + name + ", framework=" + framework
+ + ", frameworkHash=" + frameworkHash + ", version=" + version
+ ", mainClass=" + mainClass + ", parallelism=" + parallelism
+ ", posX=" + posX + ", posY=" + posY + ", properties=" + properties
+ ", propertyTypes=" + propertyTypes + ", resources=" + resources
diff --git a/streamflow-core/streamflow-model/src/main/java/streamflow/model/TopologyConfig.java b/streamflow-core/streamflow-model/src/main/java/streamflow/model/TopologyConfig.java
index 74a3c4f..aea973f 100644
--- a/streamflow-core/streamflow-model/src/main/java/streamflow/model/TopologyConfig.java
+++ b/streamflow-core/streamflow-model/src/main/java/streamflow/model/TopologyConfig.java
@@ -15,6 +15,7 @@
*/
package streamflow.model;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import org.mongodb.morphia.annotations.Embedded;
@@ -24,6 +25,7 @@
@Embedded
@JsonInclude(Include.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
public class TopologyConfig implements Serializable {
@Embedded
diff --git a/streamflow-core/streamflow-model/src/main/java/streamflow/model/TopologyLogCriteria.java b/streamflow-core/streamflow-model/src/main/java/streamflow/model/TopologyLogCriteria.java
index 7a12fb8..1962b96 100644
--- a/streamflow-core/streamflow-model/src/main/java/streamflow/model/TopologyLogCriteria.java
+++ b/streamflow-core/streamflow-model/src/main/java/streamflow/model/TopologyLogCriteria.java
@@ -33,6 +33,8 @@ public class TopologyLogCriteria implements Serializable {
private int pageSize = 100;
+ private boolean showHistoric = false;
+
private SortOrder sortOrder = SortOrder.DESC;
public enum SortOrder {
@@ -100,6 +102,14 @@ public void setPageSize(int pageSize) {
this.pageSize = pageSize;
}
+ public boolean getShowHistoric() {
+ return showHistoric;
+ }
+
+ public void setShowHistoric(boolean showHistoric) {
+ this.showHistoric = showHistoric;
+ }
+
public SortOrder getSortOrder() {
return sortOrder;
}
@@ -118,6 +128,7 @@ public int hashCode() {
hash = 59 * hash + (this.age != null ? this.age.hashCode() : 0);
hash = 59 * hash + this.pageNum;
hash = 59 * hash + this.pageSize;
+ hash = 59 * hash + (this.showHistoric ? 1 : 0);
hash = 59 * hash + (this.sortOrder != null ? this.sortOrder.hashCode() : 0);
return hash;
}
@@ -152,6 +163,9 @@ public boolean equals(Object obj) {
if (this.pageSize != other.pageSize) {
return false;
}
+ if (this.showHistoric != other.showHistoric) {
+ return false;
+ }
if (this.sortOrder != other.sortOrder) {
return false;
}
@@ -163,6 +177,6 @@ public String toString() {
return "TopologyLogCriteria{" + "query=" + query + ", component=" + component
+ ", level=" + level + ", category=" + category + ", age=" + age
+ ", pageNum=" + pageNum + ", pageSize=" + pageSize
- + ", sortOrder=" + sortOrder + '}';
+ + ", showHistoric=" + showHistoric + ", sortOrder=" + sortOrder + '}';
}
}
diff --git a/streamflow-core/streamflow-model/src/main/java/streamflow/model/TopologyLogEntry.java b/streamflow-core/streamflow-model/src/main/java/streamflow/model/TopologyLogEntry.java
index 7f47ff8..a1bfeeb 100644
--- a/streamflow-core/streamflow-model/src/main/java/streamflow/model/TopologyLogEntry.java
+++ b/streamflow-core/streamflow-model/src/main/java/streamflow/model/TopologyLogEntry.java
@@ -23,6 +23,10 @@ public class TopologyLogEntry implements Serializable {
private String level;
+ private String host;
+
+ private String task;
+
private String component;
private String category;
@@ -49,6 +53,22 @@ public void setLevel(String level) {
this.level = level;
}
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public String getTask() {
+ return task;
+ }
+
+ public void setTask(String task) {
+ this.task = task;
+ }
+
public String getComponent() {
return component;
}
@@ -78,6 +98,8 @@ public int hashCode() {
int hash = 3;
hash = 97 * hash + (this.timestamp != null ? this.timestamp.hashCode() : 0);
hash = 97 * hash + (this.level != null ? this.level.hashCode() : 0);
+ hash = 97 * hash + (this.host != null ? this.host.hashCode() : 0);
+ hash = 97 * hash + (this.task != null ? this.task.hashCode() : 0);
hash = 97 * hash + (this.component != null ? this.component.hashCode() : 0);
hash = 97 * hash + (this.category != null ? this.category.hashCode() : 0);
hash = 97 * hash + (this.text != null ? this.text.hashCode() : 0);
@@ -99,6 +121,12 @@ public boolean equals(Object obj) {
if ((this.level == null) ? (other.level != null) : !this.level.equals(other.level)) {
return false;
}
+ if ((this.host == null) ? (other.host != null) : !this.host.equals(other.host)) {
+ return false;
+ }
+ if ((this.task == null) ? (other.task != null) : !this.task.equals(other.task)) {
+ return false;
+ }
if ((this.component == null) ? (other.component != null) : !this.component.equals(other.component)) {
return false;
}
@@ -114,6 +142,7 @@ public boolean equals(Object obj) {
@Override
public String toString() {
return "TopologyLogEntry{" + "timestamp=" + timestamp + ", level=" + level
- + ", component=" + component + ", category=" + category + ", text=" + text + '}';
+ + ", host=" + host + ", task=" + task + ", component=" + component
+ + ", category=" + category + ", text=" + text + '}';
}
}
diff --git a/streamflow-core/streamflow-model/src/main/java/streamflow/model/TopologyResourceEntry.java b/streamflow-core/streamflow-model/src/main/java/streamflow/model/TopologyResourceEntry.java
index a9da83d..75b56aa 100644
--- a/streamflow-core/streamflow-model/src/main/java/streamflow/model/TopologyResourceEntry.java
+++ b/streamflow-core/streamflow-model/src/main/java/streamflow/model/TopologyResourceEntry.java
@@ -34,6 +34,8 @@ public class TopologyResourceEntry implements Serializable {
private String version;
private String framework;
+
+ private String frameworkHash;
private String resource;
@@ -87,6 +89,14 @@ public void setFramework(String framework) {
this.framework = framework;
}
+ public String getFrameworkHash() {
+ return frameworkHash;
+ }
+
+ public void setFrameworkHash(String frameworkHash) {
+ this.frameworkHash = frameworkHash;
+ }
+
public String getResource() {
return resource;
}
@@ -127,6 +137,7 @@ public int hashCode() {
hash = 61 * hash + (this.description != null ? this.description.hashCode() : 0);
hash = 61 * hash + (this.version != null ? this.version.hashCode() : 0);
hash = 61 * hash + (this.framework != null ? this.framework.hashCode() : 0);
+ hash = 61 * hash + (this.frameworkHash != null ? this.frameworkHash.hashCode() : 0);
hash = 61 * hash + (this.resource != null ? this.resource.hashCode() : 0);
hash = 61 * hash + (this.resourceClass != null ? this.resourceClass.hashCode() : 0);
hash = 61 * hash + (this.properties != null ? this.properties.hashCode() : 0);
@@ -163,6 +174,10 @@ public boolean equals(Object obj) {
: !this.framework.equals(other.framework)) {
return false;
}
+ if ((this.frameworkHash == null) ? (other.frameworkHash != null)
+ : !this.frameworkHash.equals(other.frameworkHash)) {
+ return false;
+ }
if ((this.resource == null) ? (other.resource != null)
: !this.resource.equals(other.resource)) {
return false;
@@ -186,8 +201,8 @@ public boolean equals(Object obj) {
public String toString() {
return "TopologyResourceEntry{" + "id=" + id + ", name=" + name
+ ", description=" + description + ", version=" + version
- + ", framework=" + framework + ", resource=" + resource
- + ", resourceClass=" + resourceClass + ", properties=" + properties
- + ", propertyTypes=" + propertyTypes + '}';
+ + ", framework=" + framework + ", frameworkHash=" + frameworkHash
+ + ", resource=" + resource + ", resourceClass=" + resourceClass
+ + ", properties=" + properties + ", propertyTypes=" + propertyTypes + '}';
}
}
diff --git a/streamflow-core/streamflow-model/src/main/java/streamflow/model/TopologySerialization.java b/streamflow-core/streamflow-model/src/main/java/streamflow/model/TopologySerialization.java
index 319bc58..da000c7 100644
--- a/streamflow-core/streamflow-model/src/main/java/streamflow/model/TopologySerialization.java
+++ b/streamflow-core/streamflow-model/src/main/java/streamflow/model/TopologySerialization.java
@@ -32,6 +32,8 @@ public class TopologySerialization implements Serializable {
private String framework;
+ private String frameworkHash;
+
public TopologySerialization() {
}
@@ -68,6 +70,14 @@ public void setFramework(String framework) {
this.framework = framework;
}
+ public String getFrameworkHash() {
+ return frameworkHash;
+ }
+
+ public void setFrameworkHash(String frameworkHash) {
+ this.frameworkHash = frameworkHash;
+ }
+
@Override
public int hashCode() {
int hash = 7;
@@ -75,6 +85,7 @@ public int hashCode() {
hash = 61 * hash + (this.serializerClass != null ? this.serializerClass.hashCode() : 0);
hash = 61 * hash + (this.version != null ? this.version.hashCode() : 0);
hash = 61 * hash + (this.framework != null ? this.framework.hashCode() : 0);
+ hash = 61 * hash + (this.frameworkHash != null ? this.frameworkHash.hashCode() : 0);
return hash;
}
@@ -103,6 +114,10 @@ public boolean equals(Object obj) {
: !this.framework.equals(other.framework)) {
return false;
}
+ if ((this.frameworkHash == null) ? (other.frameworkHash != null)
+ : !this.frameworkHash.equals(other.frameworkHash)) {
+ return false;
+ }
return true;
}
@@ -110,6 +125,6 @@ public boolean equals(Object obj) {
public String toString() {
return "TopologySerialization{" + "typeClass=" + typeClass
+ ", serializerClass=" + serializerClass + ", version=" + version
- + ", framework=" + framework + '}';
+ + ", framework=" + framework + ", frameworkHash=" + frameworkHash + '}';
}
}
diff --git a/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/AuthConfig.java b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/AuthConfig.java
index b315778..2895dd3 100644
--- a/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/AuthConfig.java
+++ b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/AuthConfig.java
@@ -27,11 +27,11 @@ public class AuthConfig implements Serializable {
static Logger LOG = LoggerFactory.getLogger(AuthConfig.class);
- private boolean enabled;
+ private boolean enabled = false;
- private String realmClass;
+ private String realmClass = "streamflow.server.security.DatastoreRealm";
- private String moduleClass;
+ private String moduleClass = "streamflow.server.security.DatastoreRealmModule";
private final Map properties = new HashMap();
@@ -40,7 +40,8 @@ public AuthConfig() {
}
public boolean isEnabled() {
- return enabled;
+ return Boolean.parseBoolean(
+ System.getProperty("auth.enabled", Boolean.toString(enabled)));
}
public void setEnabled(boolean enabled) {
@@ -48,7 +49,7 @@ public void setEnabled(boolean enabled) {
}
public String getRealmClass() {
- return realmClass;
+ return System.getProperty("auth.realmClass", realmClass);
}
public void setRealmClass(String realmClass) {
@@ -56,7 +57,7 @@ public void setRealmClass(String realmClass) {
}
public String getModuleClass() {
- return moduleClass;
+ return System.getProperty("auth.moduleClass", moduleClass);
}
public void setModuleClass(String moduleClass) {
@@ -76,6 +77,11 @@ public void property(String name, Object value) {
public T getProperty(String name, Class clazz) {
T value = null;
+ // If the system property is found, add it to the properties object
+ if (System.getProperties().containsKey("auth." + name)) {
+ properties.put(name, System.getProperties().get("auth." + name));
+ }
+
if (properties.containsKey(name)) {
try {
// Attempt to type cast the Object to the specified Class
diff --git a/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/DatastoreConfig.java b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/DatastoreConfig.java
index 942d3fe..65c7762 100644
--- a/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/DatastoreConfig.java
+++ b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/DatastoreConfig.java
@@ -36,7 +36,7 @@ public DatastoreConfig() {
}
public String getModuleClass() {
- return moduleClass;
+ return System.getProperty("datastore.moduleClass", moduleClass);
}
public void setModuleClass(String moduleClass) {
@@ -56,6 +56,11 @@ public void property(String name, Object value) {
public T getProperty(String name, Class clazz) {
T value = null;
+ // If the system property is found, add it to the properties object
+ if (System.getProperties().containsKey("datastore." + name)) {
+ properties.put(name, System.getProperties().get("datastore." + name));
+ }
+
if (properties.containsKey(name)) {
try {
// Attempt to type cast the Object to the specified Class
diff --git a/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/LoggerConfig.java b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/LoggerConfig.java
index ccd3ba2..a98eaaa 100644
--- a/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/LoggerConfig.java
+++ b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/LoggerConfig.java
@@ -1,4 +1,4 @@
-/**
+ /**
* Copyright 2014 Lockheed Martin Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -27,13 +27,13 @@ public class LoggerConfig implements Serializable {
+ File.separator + ".streamflow" + File.separator + "logs";
private String formatPattern =
- "%d{ISO8601,GMT} %p %X{topology} %X{component} %c - %m%n";
+ "%d{yyyy-MM-dd'T'HH:mm:ss.sss'Z',GMT} %p %X{topology} %X{project} %X{task} %X{component} %c - %m%n";
public LoggerConfig() {
}
public String getLevel() {
- return level;
+ return System.getProperty("logger.level", level);
}
public void setLevel(String level) {
@@ -41,7 +41,7 @@ public void setLevel(String level) {
}
public String getBaseDir() {
- return baseDir;
+ return System.getProperty("logger.baseDir", baseDir);
}
public void setBaseDir(String baseDir) {
@@ -49,7 +49,7 @@ public void setBaseDir(String baseDir) {
}
public String getFormatPattern() {
- return formatPattern;
+ return System.getProperty("logger.formatPattern", formatPattern);
}
public void setFormatPattern(String formatPattern) {
diff --git a/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/ProxyConfig.java b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/ProxyConfig.java
index 692d6f9..61cf1f7 100644
--- a/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/ProxyConfig.java
+++ b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/ProxyConfig.java
@@ -27,7 +27,7 @@ public ProxyConfig() {
}
public String getHost() {
- return host;
+ return System.getProperty("proxy.host", host);
}
public void setHost(String host) {
@@ -35,6 +35,12 @@ public void setHost(String host) {
}
public int getPort() {
+ if (System.getProperty("proxy.port") != null) {
+ try {
+ port = Integer.parseInt(System.getProperty("proxy.port"));
+ } catch (Exception ex) {
+ }
+ }
return port;
}
diff --git a/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/ServerConfig.java b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/ServerConfig.java
index d03fbd1..8e4f954 100644
--- a/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/ServerConfig.java
+++ b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/ServerConfig.java
@@ -25,6 +25,12 @@ public ServerConfig() {
}
public int getPort() {
+ if (System.getProperty("server.port") != null) {
+ try {
+ port = Integer.parseInt(System.getProperty("server.port"));
+ } catch (Exception ex) {
+ }
+ }
return port;
}
diff --git a/streamflow-core/streamflow-model/src/main/java/streamflow/model/storm/ErrorInfo.java b/streamflow-core/streamflow-model/src/main/java/streamflow/model/storm/ErrorInfo.java
index 0bbfd8c..ffdb8cf 100644
--- a/streamflow-core/streamflow-model/src/main/java/streamflow/model/storm/ErrorInfo.java
+++ b/streamflow-core/streamflow-model/src/main/java/streamflow/model/storm/ErrorInfo.java
@@ -16,12 +16,17 @@
package streamflow.model.storm;
import java.io.Serializable;
+import java.util.Objects;
public class ErrorInfo implements Serializable {
private String error;
private int errorTimeSecs;
+
+ private String host;
+
+ private int port;
public ErrorInfo() {
@@ -43,11 +48,29 @@ public void setErrorTimeSecs(int errorTimeSecs) {
this.errorTimeSecs = errorTimeSecs;
}
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
@Override
public int hashCode() {
int hash = 5;
- hash = 17 * hash + (this.error != null ? this.error.hashCode() : 0);
- hash = 17 * hash + this.errorTimeSecs;
+ hash = 37 * hash + Objects.hashCode(this.error);
+ hash = 37 * hash + this.errorTimeSecs;
+ hash = 37 * hash + Objects.hashCode(this.host);
+ hash = 37 * hash + this.port;
return hash;
}
@@ -60,17 +83,24 @@ public boolean equals(Object obj) {
return false;
}
final ErrorInfo other = (ErrorInfo) obj;
- if ((this.error == null) ? (other.error != null) : !this.error.equals(other.error)) {
+ if (!Objects.equals(this.error, other.error)) {
return false;
}
if (this.errorTimeSecs != other.errorTimeSecs) {
return false;
}
+ if (!Objects.equals(this.host, other.host)) {
+ return false;
+ }
+ if (this.port != other.port) {
+ return false;
+ }
return true;
}
@Override
public String toString() {
- return "ErrorInfo{" + "error=" + error + ", errorTimeSecs=" + errorTimeSecs + '}';
+ return "ErrorInfo{" + "error=" + error + ", errorTimeSecs=" + errorTimeSecs
+ + ", host=" + host + ", port=" + port + '}';
}
}
diff --git a/streamflow-core/streamflow-server/pom.xml b/streamflow-core/streamflow-server/pom.xml
index cd22af1..4cfaca3 100644
--- a/streamflow-core/streamflow-server/pom.xml
+++ b/streamflow-core/streamflow-server/pom.xml
@@ -22,7 +22,7 @@
streamflow
streamflow-core
- 0.9.1
+ 0.10.0
streamflow-server
@@ -141,11 +141,23 @@
src/main/resources
-
+
src/main/webapp
- true
+ false
+ **/META-INF/**
+ **/WEB-INF/**
+
+ ${project.build.outputDirectory}
+
+
+
+
+ src/main/webapp
+ false
+
+ **/META-INF/**
**/WEB-INF/**
${project.build.outputDirectory}/assets
@@ -153,41 +165,7 @@
-
-
- org.apache.maven.plugins
- maven-shade-plugin
- 2.3
-
-
- package
-
- shade
-
-
- false
-
-
- *:*
-
- META-INF/*.SF
- META-INF/*.DSA
- META-INF/*.RSA
-
-
-
-
-
-
- streamflow.server.StreamflowServer
-
-
-
-
-
-
-
-
+
org.apache.maven.plugins
maven-dependency-plugin
@@ -214,58 +192,12 @@
-
-
- org.lesscss
- lesscss-maven-plugin
- 1.3.3
-
- ${project.basedir}/src/main/webapp
- ${project.build.directory}/${project.build.finalName}
-
- vendor/**/*.less
-
-
- **/*.less
-
-
-
-
-
- compile
-
-
-
-
-
org.mortbay.jetty
jetty-maven-plugin
8.1.16.v20140903
-
-
-
- org.codehaus.mojo
- exec-maven-plugin
- 1.3.2
-
-
-
- exec
-
-
-
-
- java
-
- -cp
- ${project.build.directory}/${project.build.finalName}.jar
- streamflow.server.StreamFlowServer
-
-
-
diff --git a/streamflow-core/streamflow-server/src/main/java/streamflow/server/config/SecurityModule.java b/streamflow-core/streamflow-server/src/main/java/streamflow/server/config/SecurityModule.java
index 041920b..7540442 100644
--- a/streamflow-core/streamflow-server/src/main/java/streamflow/server/config/SecurityModule.java
+++ b/streamflow-core/streamflow-server/src/main/java/streamflow/server/config/SecurityModule.java
@@ -44,6 +44,9 @@ protected final void configureShiroWeb() {
// Basic auth - change auth scheme to hide native browser basic auth dialog box
bindConstant().annotatedWith(Names.named("shiro.authcScheme")).to("Streamflow");
+ // 12 hour session timeout in milliseconds
+ bindConstant().annotatedWith(Names.named("shiro.globalSessionTimeout")).to(43200000L);
+
// Attempt to load the custom realm class/module and use defaults if necessary
Class realmClass = loadRealmClass(authConfig);
AbstractModule realmModule = loadRealmModule(authConfig);
diff --git a/streamflow-core/streamflow-server/src/main/java/streamflow/server/resource/TopologyResource.java b/streamflow-core/streamflow-server/src/main/java/streamflow/server/resource/TopologyResource.java
index 87b84ba..3899684 100644
--- a/streamflow-core/streamflow-server/src/main/java/streamflow/server/resource/TopologyResource.java
+++ b/streamflow-core/streamflow-server/src/main/java/streamflow/server/resource/TopologyResource.java
@@ -113,6 +113,14 @@ public Response updateTopologyConfig(@PathParam("topologyId") String topologyId,
return Response.ok().build();
}
+ @GET
+ @Path("/{topologyId}/config/deployed")
+ @Produces(MediaType.APPLICATION_JSON)
+ public TopologyConfig getDeployedTopologyConfig(@PathParam("topologyId") String topologyId) {
+ String userId = (String) SecurityUtils.getSubject().getPrincipal();
+ return topologyService.getTopology(topologyId, userId).getDeployedConfig();
+ }
+
@GET
@Path("/{topologyId}/info")
@Produces(MediaType.APPLICATION_JSON)
@@ -125,17 +133,20 @@ public TopologyInfo getTopologyInfo(@PathParam("topologyId") String topologyId)
@Path("/{topologyId}/submit")
@Produces(MediaType.APPLICATION_JSON)
public Topology submitTopology(@PathParam("topologyId") String topologyId,
- @QueryParam("clusterId") String clusterId) {
+ @QueryParam("clusterId") String clusterId,
+ @QueryParam("logLevel") @DefaultValue("INFO") String logLevel,
+ @QueryParam("classLoaderPolicy") @DefaultValue("FRAMEWORK_FIRST") String classLoaderPolicy) {
String userId = (String) SecurityUtils.getSubject().getPrincipal();
- return topologyService.submitTopology(topologyId, userId, clusterId);
+ return topologyService.submitTopology(topologyId, userId, clusterId, logLevel, classLoaderPolicy);
}
@GET
@Path("/{topologyId}/kill")
public Response killTopology(@PathParam("topologyId") String topologyId,
- @QueryParam("waitTimeSecs") @DefaultValue("0") int waitTimeSecs) {
+ @QueryParam("waitTimeSecs") @DefaultValue("0") int waitTimeSecs,
+ @QueryParam("async") @DefaultValue("false") boolean async) {
String userId = (String) SecurityUtils.getSubject().getPrincipal();
- topologyService.killTopology(topologyId, waitTimeSecs, userId);
+ topologyService.killTopology(topologyId, waitTimeSecs, async, userId);
return Response.ok().build();
}
diff --git a/streamflow-core/streamflow-server/src/main/resources/icons/storm-bolt.png b/streamflow-core/streamflow-server/src/main/resources/icons/storm-bolt.png
deleted file mode 100644
index 57e410e..0000000
Binary files a/streamflow-core/streamflow-server/src/main/resources/icons/storm-bolt.png and /dev/null differ
diff --git a/streamflow-core/streamflow-server/src/main/resources/icons/storm-spout.png b/streamflow-core/streamflow-server/src/main/resources/icons/storm-spout.png
deleted file mode 100644
index 11d8d57..0000000
Binary files a/streamflow-core/streamflow-server/src/main/resources/icons/storm-spout.png and /dev/null differ
diff --git a/streamflow-core/streamflow-server/src/main/resources/icons/storm-trident.png b/streamflow-core/streamflow-server/src/main/resources/icons/storm-trident.png
deleted file mode 100644
index b3c96ad..0000000
Binary files a/streamflow-core/streamflow-server/src/main/resources/icons/storm-trident.png and /dev/null differ
diff --git a/streamflow-core/streamflow-server/src/main/webapp/app/app.js b/streamflow-core/streamflow-server/src/main/webapp/app/app.js
index 4ee16f2..65c2d12 100644
--- a/streamflow-core/streamflow-server/src/main/webapp/app/app.js
+++ b/streamflow-core/streamflow-server/src/main/webapp/app/app.js
@@ -29,7 +29,8 @@ var app = angular.module('streamflow', [
'streamflow.fileupload',
'streamflow.notify',
'streamflow.resize',
- 'streamflow.same'
+ 'streamflow.same',
+ 'streamflow.moment'
]);
// Route configuration for the various views
diff --git a/streamflow-core/streamflow-server/src/main/webapp/app/dashboard/dashboard.js b/streamflow-core/streamflow-server/src/main/webapp/app/dashboard/dashboard.js
index c3d5ff8..4313648 100644
--- a/streamflow-core/streamflow-server/src/main/webapp/app/dashboard/dashboard.js
+++ b/streamflow-core/streamflow-server/src/main/webapp/app/dashboard/dashboard.js
@@ -86,12 +86,10 @@ dashboardModule.controller('DashboardController', [
$scope.killTopology = function(topology) {
TopologyService.killTopology(topology,
function() {
- streamflowNotify.success('The topology was killed successfully.');
-
$scope.listTopologies();
},
function() {
- streamflowNotify.error('The topology was not killed due to a server error.');
+ $scope.listTopologies();
}
);
};
diff --git a/streamflow-core/streamflow-server/src/main/webapp/app/dashboard/dashboard.tpl.html b/streamflow-core/streamflow-server/src/main/webapp/app/dashboard/dashboard.tpl.html
index bf8621e..1129e2e 100644
--- a/streamflow-core/streamflow-server/src/main/webapp/app/dashboard/dashboard.tpl.html
+++ b/streamflow-core/streamflow-server/src/main/webapp/app/dashboard/dashboard.tpl.html
@@ -41,7 +41,7 @@