Skip to content

Commit

Permalink
No commit message
Browse files Browse the repository at this point in the history
  • Loading branch information
gcasale committed Jul 8, 2017
1 parent 867944c commit 553bf1e
Show file tree
Hide file tree
Showing 25 changed files with 1,859 additions and 1 deletion.
2 changes: 1 addition & 1 deletion qt-lib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@
</descriptorRefs>
<archive>
<manifest>
<mainClass>uk.ic.dice.qt.examples.ExclamationTopology</mainClass>
<mainClass>com.github.dice-project.qt.examples.ExclamationTopology</mainClass>
</manifest>
</archive>
</configuration>
Expand Down
23 changes: 23 additions & 0 deletions qt-lib/src/main/com/github/dice-project/qt/QTLoadInjector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.github.dice-project.qt;

import com.github.dice-project.qt.producer.*;
import com.github.dice-project.qt.spout.*;

public class QTLoadInjector {

public QTLoadInjector() {
}

public RateSpout getRateSpout() {
return new RateSpout();
}

public RateSpout getRateSpout(String filePath) {
return new RateSpout(filePath);
}

public RateProducer getRateProducer() {
return new RateProducer();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package com.github.dice-project.qt.examples;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import com.github.dice-project.qt.QTLoadInjector;
import com.github.dice-project.qt.spout.RateSpout;
import com.github.dice-project.qt.spout.RateSpout.ArrivalMode;
import com.github.dice-project.qt.spout.RateSpout.DataMode;
import com.github.dice-project.qt.util.LoadConfigProperties;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.IRichSpout;

public class BasicExclamationTopology {
public static void main(String[] args) throws Exception {

/* Storm topology builder */
TopologyBuilder builder = new TopologyBuilder();

/* Load custom property file */
Integer qtSpoutParallelism = 1;
Integer qtSpoutNumTasks = 1;
String qtSpoutName = "word-spout";
Map<String, Object> confSpout = new HashMap<String, Object>();
confSpout.put("spout", "spout");

/* Create QT-LIB's Spout factory */
QTLoadInjector qt = new QTLoadInjector();

/* Obtain a spout to inject at prescribed rates specified in a CSV file */
RateSpout qtSpout = (RateSpout) qt.getRateSpout();

ArrivalMode am = ArrivalMode.ParseCount;
qtSpout.setArrivalMode(am);
switch (am) {
case ParseIat : {
qtSpout.setArrivalFile("iats.txt");
} break;
case ParseRate : {
qtSpout.setArrivalFile("rates.txt");
} break;
case ParseCount : {
qtSpout.setArrivalFile("counts.txt");
} break;
default:
break;
}

DataMode dm = DataMode.ParseJSON;
qtSpout.setDataMode(dm);
switch (dm) {
case ParseJSON :{
qtSpout.setDataFile("test.json");
}break;
case ParseTimedJSON :{
qtSpout.setDataFile("test.json");
qtSpout.setTimeField("publicationTime");
}break;
default: // for all Rand modes
qtSpout.setDataVolume(1024);
break;
}

/* Install spout */
builder.setSpout(qtSpoutName, (IRichSpout) qtSpout, qtSpoutParallelism).addConfigurations(confSpout).setNumTasks(qtSpoutNumTasks);

/* Install bolts */
builder.setBolt("exclaim1", new ExclamationBolt(), 1).shuffleGrouping(qtSpoutName).setNumTasks(1);
builder.setBolt("exclaim2", new ExclamationBolt(),2).shuffleGrouping("exclaim1").setNumTasks(2);

/* Configure and start topology */
Config conf = new Config();
conf.setDebug(true); // this is to get debugging information in logs
conf.setNumWorkers(1);
conf.registerMetricsConsumer( org.apache.storm.metric.LoggingMetricsConsumer.class, 1);
StormSubmitter.submitTopology("topology-qt", conf, builder.createTopology());

}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.github.dice-project.qt.examples;


import java.util.Map;

import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseRichBolt;


//There are a variety of bolt types. In this case, we use BaseBasicBolt


public class ExclamationBolt extends BaseRichBolt {
/**
*
*/
private static final long serialVersionUID = 1L;
OutputCollector _collector;


public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}


public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}


public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}


}


Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package com.github.dice-project.qt.examples;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import com.github.dice-project.qt.QTLoadInjector;
import com.github.dice-project.qt.spout.RateSpout;
import com.github.dice-project.qt.spout.RateSpout.ArrivalMode;
import com.github.dice-project.qt.spout.RateSpout.DataMode;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.generated.KillOptions;
import org.apache.storm.generated.Nimbus.Client;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Utils;

import com.github.dice-project.qt.util.*;

public class ExclamationTopology {

public static void main(String[] args) throws Exception {
String timeStampDate = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
String timeStampTime = new SimpleDateFormat("HH:mm:ss.000").format(new Date());
String tStart = timeStampDate + "T"+ timeStampTime + "Z";
double maxBoltCapacity = 1.0;
double curMaxBoltCapacity = 0.0;

int numExperiments = 20;
for (int topologyId=1; topologyId<=numExperiments; topologyId++) {

/* Storm topology builder */
TopologyBuilder builder = new TopologyBuilder();

/* Load custom property file */
Integer qtSpoutParallelism = 2;
Integer qtSpoutNumTasks = 2;
String qtSpoutName = "word-spout";
Map<String, Object> confSpout = new HashMap<String, Object>();
confSpout.put("spout", "spout");

/* Create QT-LIB's Spout factory */
QTLoadInjector qt = new QTLoadInjector();

/* Obtain a spout to inject at prescribed rates specified in a CSV file */
RateSpout qtSpout = (RateSpout) qt.getRateSpout();

ArrivalMode am = ArrivalMode.ParseCount;
qtSpout.setArrivalMode(am);
qtSpout.setRateScaler(topologyId*200.0);
switch (am) {
case ParseIat : {
qtSpout.setArrivalFile("iats.txt");
} break;
case ParseRate : {
qtSpout.setArrivalFile("rates.txt");
} break;
case ParseCount : {
qtSpout.setArrivalFile("counts.txt");
} break;
default:
break;
}

DataMode dm = DataMode.ParseJSON;
qtSpout.setDataMode(dm);
qtSpout.setBinaryBDoc(false);
switch (dm) {
case ParseJSON :{
qtSpout.setDataFile("test.json");
}break;
case ParseTimedJSON :{
qtSpout.setDataFile("test.json");
qtSpout.setTimeField("publicationTime");
}break;
default: // for all Rand modes
qtSpout.setDataVolume(1024);
break;
}

String topologyName = "topology-qt"+topologyId;
/* Install spout */
builder.setSpout(qtSpoutName, (IRichSpout) qtSpout, qtSpoutParallelism).addConfigurations(confSpout).setNumTasks(qtSpoutNumTasks);

/* Install bolts */
builder.setBolt("exclaim1", new ExclamationBolt(), 1).shuffleGrouping(qtSpoutName).setNumTasks(1);
builder.setBolt("exclaim2", new ExclamationBolt(),2).shuffleGrouping("exclaim1").setNumTasks(2);

/* Configure and start topology */
Config conf = new Config();
conf.setDebug(true); // this is to get debugging information in logs
conf.setNumWorkers(1);
//conf.registerMetricsConsumer( org.apache.storm.metric.LoggingMetricsConsumer.class, 1);
StormTopology myTopology = builder.createTopology();
StormSubmitter.submitTopology(topologyName, conf, myTopology);

/* Let the topology run for the specified time then kill it*/
int topologyDuration = 60;
try{
Thread.sleep(topologyDuration*1000);
}catch(InterruptedException e){
System.out.println("got interrupted!");
}

/* determine maximal bolt capacity from StormUI */
String StormUIurl = "http://192.168.0.4:8080"; // to configure as needed
String encodedId = StormUICapacityMonitor.getId(StormUIurl, topologyName);
curMaxBoltCapacity = StormUICapacityMonitor.getMaxCapacity(StormUIurl, encodedId);
System.out.println("Max Bolt Capacity (StormUI, Topology: "+ encodedId +"): "+curMaxBoltCapacity);

/* Kill the last topology */
Map stormConf = Utils.readStormConfig();
Client client = NimbusClient.getConfiguredClient(stormConf).getClient();
KillOptions killOpts = new KillOptions();
client.killTopologyWithOpts(topologyName, killOpts); //provide topology name


/* time before running the next topology */
int topologySleep = 30;
try{
Thread.sleep(topologySleep*1000);
}catch(InterruptedException e){
System.out.println("got interrupted!");
}

/* determine maximal bolt capacity from DMON */
timeStampDate = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
timeStampTime = new SimpleDateFormat("HH:mm:ss.000").format(new Date());
String tEnd = timeStampDate + "T"+ timeStampTime + "Z";
int maxDMONRecords = 100;
String DMONurl = "http://0.0.0.0:5001"; // to configure as needed
// uncomment the next lines to enable DMON-based monitoring
/*curMaxBoltCapacity = DMONBoltCapacityMonitor.getMaxCapacity(DMONurl, tStart, tEnd, maxDMONRecords);
System.out.println("Max Bolt Capacity (DMON," + "Topology: " + topologyId + " Start: " + tStart + " End: " + tEnd +"): "+curMaxBoltCapacity);
if (curMaxBoltCapacity>=maxBoltCapacity) {
System.out.println("Current bolt capacity is "+ curMaxBoltCapacity + " and exceeds the maximum bolt capacity: " + maxBoltCapacity + " STOPPING experiment.");
return;
}
*/
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.github.dice-project.qt.examples;

import com.github.dice-project.qt.QTLoadInjector;
import com.github.dice-project.qt.producer.RateProducer;

public class KafkaRateProducer {

public static void main(String[] args) {
QTLoadInjector QT = new QTLoadInjector();
String input_file = "test.json"; // this is assumed to be a resource of the project jar file

RateProducer RP;
RP=QT.getRateProducer();
String topic = "dice"; // topic get created automatically
String bootstrap_server = "localhost:9092";

// random message data - default is to send a single message
RP.run(bootstrap_server, topic);

// random message data
RP.run(bootstrap_server, topic);

RP.setMessageCount(1);
// data from jsonfile
RP.run(bootstrap_server, topic, input_file);

// data from jsonfile -- this is going to read more data than available in the json file, looping
RP.setMessageCount(101);
RP.run(bootstrap_server, topic, input_file);


}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.github.dice-project.qt.loadgen;
// author: yifan

import java.util.Map;

public interface LoadGeneratorInterface {

public void generateLoadForWords(Map<String, String> conf);

public void generateLoadForBinary(Map<String, String> conf);

}
Loading

0 comments on commit 553bf1e

Please sign in to comment.