diff --git a/README.md b/README.md
index a9aef490..fc14aca7 100644
--- a/README.md
+++ b/README.md
@@ -77,6 +77,10 @@ TODO
2. [Storm核心概念详解](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Storm核心概念详解.md)
3. [Storm单机版本环境搭建](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Storm%E5%8D%95%E6%9C%BA%E7%89%88%E6%9C%AC%E7%8E%AF%E5%A2%83%E6%90%AD%E5%BB%BA.md)
4. [Storm编程模型详解](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Storm编程模型详解.md)
+5. Storm Integration with Redis
+6. Storm Integration with HDFS/HBase
+7. Storm Integration with Kafka
+8. Two Ways to Package a Storm Topology
## 六、Flume
diff --git a/code/Storm/storm-hbase-integration/pom.xml b/code/Storm/storm-hbase-integration/pom.xml
index c0446a90..8a5c7252 100644
--- a/code/Storm/storm-hbase-integration/pom.xml
+++ b/code/Storm/storm-hbase-integration/pom.xml
@@ -9,10 +9,25 @@
     <version>1.0</version>
 
     <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <storm.version>1.2.2</storm.version>
     </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${storm.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-hbase</artifactId>
+            <version>${storm.version}</version>
+        </dependency>
+    </dependencies>
+
@@ -23,35 +38,54 @@
                     <target>8</target>
                 </configuration>
             </plugin>
+
             <plugin>
-                <artifactId>maven-assembly-plugin</artifactId>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
                 <configuration>
-                    <descriptors>
-                        <descriptor>src/main/resources/assembly.xml</descriptor>
-                    </descriptors>
-                    <archive>
-                        <manifest>
-                            <mainClass>com.heibaiying.wordcount.ClusterWordCountApp</mainClass>
-                        </manifest>
-                    </archive>
+                    <createDependencyReducedPom>true</createDependencyReducedPom>
+                    <filters>
+                        <filter>
+                            <artifact>*:*</artifact>
+                            <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.sf</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.dsa</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                                <exclude>META-INF/*.rsa</exclude>
+                                <exclude>META-INF/*.EC</exclude>
+                                <exclude>META-INF/*.ec</exclude>
+                                <exclude>META-INF/MSFTSIG.SF</exclude>
+                                <exclude>META-INF/MSFTSIG.RSA</exclude>
+                            </excludes>
+                        </filter>
+                    </filters>
+                    <artifactSet>
+                        <excludes>
+                            <exclude>org.apache.storm:storm-core</exclude>
+                        </excludes>
+                    </artifactSet>
                 </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
             </plugin>
         </plugins>
     </build>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
-            <version>${storm.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-hbase</artifactId>
-            <version>${storm.version}</version>
-        </dependency>
-    </dependencies>
-
-</project>
\ No newline at end of file
+</project>
diff --git a/code/Storm/storm-hbase-integration/src/main/java/com/heibaiying/WordCountToHBaseApp.java b/code/Storm/storm-hbase-integration/src/main/java/com/heibaiying/WordCountToHBaseApp.java
index 3844b636..57c4aa90 100644
--- a/code/Storm/storm-hbase-integration/src/main/java/com/heibaiying/WordCountToHBaseApp.java
+++ b/code/Storm/storm-hbase-integration/src/main/java/com/heibaiying/WordCountToHBaseApp.java
@@ -1,5 +1,6 @@
package com.heibaiying;
+import com.heibaiying.component.CountBolt;
import com.heibaiying.component.DataSourceSpout;
import com.heibaiying.component.SplitBolt;
import org.apache.storm.Config;
@@ -18,9 +19,6 @@
/**
* 进行词频统计 并将统计结果存储到HBase中
- *
- * 编译打包: mvn clean assembly:assembly -Dmaven.test.skip=true
- * hdfs://hadoop001:8020/hbase
*/
public class WordCountToHBaseApp {
@@ -45,11 +43,13 @@ public static void main(String[] args) {
// 定义流数据与HBase中数据的映射
SimpleHBaseMapper mapper = new SimpleHBaseMapper()
.withRowKeyField("word")
- .withColumnFields(new Fields("word"))
- .withCounterFields(new Fields("count"))
- .withColumnFamily("cf");
+ .withColumnFields(new Fields("word","count"))
+ .withColumnFamily("info");
- // 给HBaseBolt传入表名、数据映射关系、和HBase的配置信息
+        /*
+         * Pass the table name, the field mapping and the HBase configuration key to the HBaseBolt.
+         * The table has to be created in advance: create 'WordCount','info'
+         */
HBaseBolt hbase = new HBaseBolt("WordCount", mapper)
.withConfigKey("hbase.conf");
@@ -58,12 +58,14 @@ public static void main(String[] args) {
builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout(),1);
// split
builder.setBolt(SPLIT_BOLT, new SplitBolt(), 1).shuffleGrouping(DATA_SOURCE_SPOUT);
+ // count
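+        // note: a single CountBolt task is assumed here; with higher parallelism, fieldsGrouping on "word" would be required so identical words reach the same task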
+ builder.setBolt(COUNT_BOLT, new CountBolt(),1).shuffleGrouping(SPLIT_BOLT);
// save to HBase
- builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(SPLIT_BOLT, new Fields("word"));
+ builder.setBolt(HBASE_BOLT, hbase, 1).shuffleGrouping(COUNT_BOLT);
// 如果外部传参cluster则代表线上环境启动,否则代表本地启动
- if (args.length > 1 && args[1].equals("cluster")) {
+ if (args.length > 0 && args[0].equals("cluster")) {
try {
StormSubmitter.submitTopology("ClusterWordCountToRedisApp", config, builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
diff --git a/code/Storm/storm-hbase-integration/src/main/java/com/heibaiying/component/CountBolt.java b/code/Storm/storm-hbase-integration/src/main/java/com/heibaiying/component/CountBolt.java
new file mode 100644
index 00000000..29974dfe
--- /dev/null
+++ b/code/Storm/storm-hbase-integration/src/main/java/com/heibaiying/component/CountBolt.java
@@ -0,0 +1,47 @@
+package com.heibaiying.component;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Counts the frequency of each word.
+ */
+public class CountBolt extends BaseRichBolt {
+
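+    // word -> count state, held in memory by this bolt task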
+    private Map<String, Integer> counts = new HashMap<>();
+
+ private OutputCollector collector;
+
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector=collector;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ String word = input.getStringByField("word");
+ Integer count = counts.get(word);
+ if (count == null) {
+ count = 0;
+ }
+ count++;
+ counts.put(word, count);
+        // emit the word together with its updated count
+ collector.emit(new Values(word, String.valueOf(count)));
+
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word", "count"));
+ }
+}
diff --git a/code/Storm/storm-hbase-integration/src/main/java/com/heibaiying/component/SplitBolt.java b/code/Storm/storm-hbase-integration/src/main/java/com/heibaiying/component/SplitBolt.java
index b315f112..06bc1d87 100644
--- a/code/Storm/storm-hbase-integration/src/main/java/com/heibaiying/component/SplitBolt.java
+++ b/code/Storm/storm-hbase-integration/src/main/java/com/heibaiying/component/SplitBolt.java
@@ -6,7 +6,6 @@
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
import java.util.Map;
diff --git a/code/Storm/storm-hbase-integration/src/main/resources/assembly.xml b/code/Storm/storm-hbase-integration/src/main/resources/assembly.xml
deleted file mode 100644
index dec0017b..00000000
--- a/code/Storm/storm-hbase-integration/src/main/resources/assembly.xml
+++ /dev/null
@@ -1,25 +0,0 @@
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
-          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3
-          http://maven.apache.org/xsd/assembly-1.1.3.xsd">
-
-    <id>with-dependencies</id>
-
-    <formats>
-        <format>jar</format>
-    </formats>
-
-    <includeBaseDirectory>false</includeBaseDirectory>
-    <dependencySets>
-        <dependencySet>
-            <outputDirectory>/</outputDirectory>
-            <useProjectArtifact>true</useProjectArtifact>
-            <unpack>true</unpack>
-            <scope>runtime</scope>
-            <excludes>
-                <exclude>org.apache.storm:storm-core</exclude>
-            </excludes>
-        </dependencySet>
-    </dependencySets>
-</assembly>
\ No newline at end of file
diff --git a/code/Storm/storm-hdfs-integration/pom.xml b/code/Storm/storm-hdfs-integration/pom.xml
index 5d1ad12a..f65e03ca 100644
--- a/code/Storm/storm-hdfs-integration/pom.xml
+++ b/code/Storm/storm-hdfs-integration/pom.xml
@@ -43,10 +43,24 @@
                             <artifact>*:*</artifact>
                             <excludes>
-                                <exclude>org.apache.storm:storm-core</exclude>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.sf</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.dsa</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                                <exclude>META-INF/*.rsa</exclude>
+                                <exclude>META-INF/*.EC</exclude>
+                                <exclude>META-INF/*.ec</exclude>
+                                <exclude>META-INF/MSFTSIG.SF</exclude>
+                                <exclude>META-INF/MSFTSIG.RSA</exclude>
                             </excludes>
                         </filter>
                     </filters>
+                    <artifactSet>
+                        <excludes>
+                            <exclude>org.apache.storm:storm-core</exclude>
+                        </excludes>
+                    </artifactSet>
@@ -82,6 +96,17 @@
             <artifactId>storm-hdfs</artifactId>
             <version>${storm.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>2.6.0-cdh5.15.2</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client</artifactId>
diff --git a/code/Storm/storm-hdfs-integration/src/main/java/com.heibaiying/WordCountToHdfsApp.java b/code/Storm/storm-hdfs-integration/src/main/java/com.heibaiying/DataToHdfsApp.java
similarity index 83%
rename from code/Storm/storm-hdfs-integration/src/main/java/com.heibaiying/WordCountToHdfsApp.java
rename to code/Storm/storm-hdfs-integration/src/main/java/com.heibaiying/DataToHdfsApp.java
index ff9d1396..cb370142 100644
--- a/code/Storm/storm-hdfs-integration/src/main/java/com.heibaiying/WordCountToHdfsApp.java
+++ b/code/Storm/storm-hdfs-integration/src/main/java/com.heibaiying/DataToHdfsApp.java
@@ -20,18 +20,19 @@
import org.apache.storm.topology.TopologyBuilder;
/**
- * 进行词频统计 并将统计结果存储到HDFS中
- *
- * hdfs://hadoopp001:8020 path
+ * Stores the sample data in HDFS.
*/
-public class WordCountToHdfsApp {
+public class DataToHdfsApp {
private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
private static final String HDFS_BOLT = "hdfsBolt";
public static void main(String[] args) {
- // 定义存储文本的分隔符
+        // Specify the Hadoop user name; without it, creating directories on HDFS may fail with a permission error (RemoteException: Permission denied)
+ System.setProperty("HADOOP_USER_NAME", "root");
+
+        // Delimiter used between the output fields
RecordFormat format = new DelimitedRecordFormat()
.withFieldDelimiter("|");
@@ -41,7 +42,7 @@ public static void main(String[] args) {
// 文件策略: 每个文件大小上限1M,超过限定时,创建新文件并继续写入
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, Units.MB);
- // 定义完整路径
+        // Output path on HDFS
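+        // DefaultFileNameFormat names each file with the component id, task id, rotation number and a timestamp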
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
.withPath("/storm-hdfs/");
@@ -57,20 +58,20 @@ public static void main(String[] args) {
// 构建Topology
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
- // save to HBase
+ // save to HDFS
builder.setBolt(HDFS_BOLT, hdfsBolt, 1).shuffleGrouping(DATA_SOURCE_SPOUT);
// 如果外部传参cluster则代表线上环境启动,否则代表本地启动
if (args.length > 0 && args[0].equals("cluster")) {
try {
- StormSubmitter.submitTopology("ClusterWordCountToHdfsApp", new Config(), builder.createTopology());
+ StormSubmitter.submitTopology("ClusterDataToHdfsApp", new Config(), builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
e.printStackTrace();
}
} else {
LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("LocalWordCountToHdfsApp",
+ cluster.submitTopology("LocalDataToHdfsApp",
new Config(), builder.createTopology());
}
}
diff --git a/code/Storm/storm-kafka-integration/pom.xml b/code/Storm/storm-kafka-integration/pom.xml
new file mode 100644
index 00000000..f1ffe160
--- /dev/null
+++ b/code/Storm/storm-kafka-integration/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.heibaiying</groupId>
+    <artifactId>storm-kafka-integration</artifactId>
+    <version>1.0</version>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>8</source>
+                    <target>8</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <configuration>
+                    <createDependencyReducedPom>true</createDependencyReducedPom>
+                    <filters>
+                        <filter>
+                            <artifact>*:*</artifact>
+                            <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.sf</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.dsa</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                                <exclude>META-INF/*.rsa</exclude>
+                                <exclude>META-INF/*.EC</exclude>
+                                <exclude>META-INF/*.ec</exclude>
+                                <exclude>META-INF/MSFTSIG.SF</exclude>
+                                <exclude>META-INF/MSFTSIG.RSA</exclude>
+                            </excludes>
+                        </filter>
+                    </filters>
+                    <artifactSet>
+                        <excludes>
+                            <exclude>org.apache.storm:storm-core</exclude>
+                        </excludes>
+                    </artifactSet>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <properties>
+        <storm.version>1.2.2</storm.version>
+        <kafka.version>2.2.0</kafka.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${storm.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-kafka-client</artifactId>
+            <version>${storm.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/code/Storm/storm-kafka-integration/src/main/java/com/heibaiying/kafka/read/LogConsoleBolt.java b/code/Storm/storm-kafka-integration/src/main/java/com/heibaiying/kafka/read/LogConsoleBolt.java
new file mode 100644
index 00000000..16b65f2a
--- /dev/null
+++ b/code/Storm/storm-kafka-integration/src/main/java/com/heibaiying/kafka/read/LogConsoleBolt.java
@@ -0,0 +1,40 @@
+package com.heibaiying.kafka.read;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.Map;
+
+/**
+ * Prints the data received from Kafka.
+ */
+public class LogConsoleBolt extends BaseRichBolt {
+
+
+ private OutputCollector collector;
+
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector=collector;
+ }
+
+ public void execute(Tuple input) {
+ try {
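+            // "value" is one of the output fields of the KafkaSpout's default RecordTranslator (topic, partition, offset, key, value)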
+ String value = input.getStringByField("value");
+ System.out.println("received from kafka : "+ value);
+            // The tuple must be acked, otherwise the Kafka messages will be consumed again
+ collector.ack(input);
+ }catch (Exception e){
+ e.printStackTrace();
+ collector.fail(input);
+ }
+
+
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+}
diff --git a/code/Storm/storm-kafka-integration/src/main/java/com/heibaiying/kafka/read/ReadingFromKafkaApp.java b/code/Storm/storm-kafka-integration/src/main/java/com/heibaiying/kafka/read/ReadingFromKafkaApp.java
new file mode 100644
index 00000000..fdddd446
--- /dev/null
+++ b/code/Storm/storm-kafka-integration/src/main/java/com/heibaiying/kafka/read/ReadingFromKafkaApp.java
@@ -0,0 +1,61 @@
+package com.heibaiying.kafka.read;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
+import org.apache.storm.topology.TopologyBuilder;
+
+/**
+ * Reads data from Kafka.
+ */
+public class ReadingFromKafkaApp {
+
+ private static final String BOOTSTRAP_SERVERS = "hadoop001:9092";
+ private static final String TOPIC_NAME = "storm-topic";
+
+ public static void main(String[] args) {
+
+ final TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(BOOTSTRAP_SERVERS, TOPIC_NAME)), 1);
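+        // each spout task acts as a separate consumer in the configured group, so the topic's partitions are shared among the spout tasks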
+ builder.setBolt("bolt", new LogConsoleBolt()).shuffleGrouping("kafka_spout");
+
+        // If "cluster" is passed as an argument the topology is submitted to the cluster, otherwise it runs locally
+ if (args.length > 0 && args[0].equals("cluster")) {
+ try {
+ StormSubmitter.submitTopology("ClusterReadingFromKafkaApp", new Config(), builder.createTopology());
+ } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
+ e.printStackTrace();
+ }
+ } else {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("LocalReadingFromKafkaApp",
+ new Config(), builder.createTopology());
+ }
+ }
+
+ private static KafkaSpoutConfig getKafkaSpoutConfig(String bootstrapServers, String topic) {
+ return KafkaSpoutConfig.builder(bootstrapServers, topic)
+                // Apart from the group id, the settings below are optional. The group id must be set, otherwise an InvalidGroupIdException is thrown
+ .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
+                // retry strategy
+ .setRetry(getRetryService())
+                // interval at which offsets are committed periodically; the default is 15s
+ .setOffsetCommitPeriodMs(10_000)
+ .build();
+ }
+
+    // Defines the retry strategy
+ private static KafkaSpoutRetryService getRetryService() {
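+        // exponential back-off: 500µs initial delay, 2ms delay period, effectively unlimited retries, and at most 10s between retries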
+ return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
+ TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
+ }
+}
diff --git a/code/Storm/storm-kafka-integration/src/main/java/com/heibaiying/kafka/write/DataSourceSpout.java b/code/Storm/storm-kafka-integration/src/main/java/com/heibaiying/kafka/write/DataSourceSpout.java
new file mode 100644
index 00000000..27690469
--- /dev/null
+++ b/code/Storm/storm-kafka-integration/src/main/java/com/heibaiying/kafka/write/DataSourceSpout.java
@@ -0,0 +1,52 @@
+package com.heibaiying.kafka.write;
+
+import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.util.*;
+
+/**
+ * Data source that generates sample sentences.
+ */
+public class DataSourceSpout extends BaseRichSpout {
+
+    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");
+
+ private SpoutOutputCollector spoutOutputCollector;
+
+ @Override
+ public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+ this.spoutOutputCollector = spoutOutputCollector;
+ }
+
+ @Override
+ public void nextTuple() {
+        // generate a line of mock data
+ String lineData = productData();
+ spoutOutputCollector.emit(new Values("key",lineData));
+ Utils.sleep(1000);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare( new Fields("key", "message"));
+ }
+
+
+ /**
+     * Builds a mock data line by joining a random-length prefix of the shuffled word list.
+ */
+ private String productData() {
+ Collections.shuffle(list);
+ Random random = new Random();
+ int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
+ return StringUtils.join(list.toArray(), "\t", 0, endIndex);
+ }
+
+}
\ No newline at end of file
diff --git a/code/Storm/storm-kafka-integration/src/main/java/com/heibaiying/kafka/write/WritingToKafkaApp.java b/code/Storm/storm-kafka-integration/src/main/java/com/heibaiying/kafka/write/WritingToKafkaApp.java
new file mode 100644
index 00000000..281824ba
--- /dev/null
+++ b/code/Storm/storm-kafka-integration/src/main/java/com/heibaiying/kafka/write/WritingToKafkaApp.java
@@ -0,0 +1,67 @@
+package com.heibaiying.kafka.write;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.kafka.bolt.KafkaBolt;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.topology.TopologyBuilder;
+
+import java.util.Properties;
+
+/**
+ * Writes data to a specific Kafka topic.
+ */
+public class WritingToKafkaApp {
+
+ private static final String BOOTSTRAP_SERVERS = "hadoop001:9092";
+ private static final String TOPIC_NAME = "storm-topic";
+
+ public static void main(String[] args) {
+
+
+ TopologyBuilder builder = new TopologyBuilder();
+
+        // Kafka producer properties
+ Properties props = new Properties();
+        /*
+         * List of broker addresses. It does not need to contain every broker: the producer discovers the rest of the
+         * cluster from the ones given. Providing at least two brokers is recommended for fault tolerance.
+         */
+ props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
+        /*
+         * acks specifies how many partition replicas must receive the message before the producer considers the write successful.
+         * acks=0   : the producer does not wait for any response from the server.
+         * acks=1   : the producer gets a success response as soon as the partition leader has received the message.
+         * acks=all : the producer gets a success response only after all replicas involved in replication have received the message.
+         */
+ props.put("acks", "1");
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+ KafkaBolt bolt = new KafkaBolt()
+ .withProducerProperties(props)
+ .withTopicSelector(new DefaultTopicSelector(TOPIC_NAME))
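+                // FieldNameBasedTupleToKafkaMapper uses the tuple fields "key" and "message" by default, matching the fields declared by DataSourceSpout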
+ .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<>());
+
+ builder.setSpout("sourceSpout", new DataSourceSpout(), 1);
+ builder.setBolt("kafkaBolt", bolt, 1).shuffleGrouping("sourceSpout");
+
+
+ if (args.length > 0 && args[0].equals("cluster")) {
+ try {
+ StormSubmitter.submitTopology("ClusterWritingToKafkaApp", new Config(), builder.createTopology());
+ } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
+ e.printStackTrace();
+ }
+ } else {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("LocalWritingToKafkaApp",
+ new Config(), builder.createTopology());
+ }
+ }
+}
diff --git a/code/Storm/storm-redis-integration/pom.xml b/code/Storm/storm-redis-integration/pom.xml
index 04c43b41..52ee5b8a 100644
--- a/code/Storm/storm-redis-integration/pom.xml
+++ b/code/Storm/storm-redis-integration/pom.xml
@@ -23,18 +23,52 @@
                     <target>8</target>
                 </configuration>
             </plugin>
+
             <plugin>
-                <artifactId>maven-assembly-plugin</artifactId>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
                 <configuration>
-                    <descriptors>
-                        <descriptor>src/main/resources/assembly.xml</descriptor>
-                    </descriptors>
-                    <archive>
-                        <manifest>
-                            <mainClass>com.heibaiying.wordcount.ClusterWordCountApp</mainClass>
-                        </manifest>
-                    </archive>
+                    <createDependencyReducedPom>true</createDependencyReducedPom>
+                    <filters>
+                        <filter>
+                            <artifact>*:*</artifact>
+                            <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.sf</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.dsa</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                                <exclude>META-INF/*.rsa</exclude>
+                                <exclude>META-INF/*.EC</exclude>
+                                <exclude>META-INF/*.ec</exclude>
+                                <exclude>META-INF/MSFTSIG.SF</exclude>
+                                <exclude>META-INF/MSFTSIG.RSA</exclude>
+                            </excludes>
+                        </filter>
+                    </filters>
+                    <artifactSet>
+                        <excludes>
+                            <exclude>org.apache.storm:storm-core</exclude>
+                        </excludes>
+                    </artifactSet>
                 </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
             </plugin>
         </plugins>
     </build>
diff --git a/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/WordCountToRedisApp.java b/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/WordCountToRedisApp.java
index c1591d5e..f3b75532 100644
--- a/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/WordCountToRedisApp.java
+++ b/code/Storm/storm-redis-integration/src/main/java/com/heibaiying/WordCountToRedisApp.java
@@ -17,10 +17,6 @@
/**
* 进行词频统计 并将统计结果存储到Redis中
- *
- * 编译打包: mvn clean assembly:assembly -Dmaven.test.skip=true
- * 提交Topology到集群: storm jar /usr/appjar/storm-redis-integration-1.0-with-dependencies.jar com.heibaiying.WordCountToRedisApp cluster
- * 停止Topology: storm kill ClusterWordCountApp -w 3
*/
public class WordCountToRedisApp {
diff --git a/code/Storm/storm-redis-integration/src/main/resources/assembly.xml b/code/Storm/storm-redis-integration/src/main/resources/assembly.xml
deleted file mode 100644
index dec0017b..00000000
--- a/code/Storm/storm-redis-integration/src/main/resources/assembly.xml
+++ /dev/null
@@ -1,25 +0,0 @@
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
-          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3
-          http://maven.apache.org/xsd/assembly-1.1.3.xsd">
-
-    <id>with-dependencies</id>
-
-    <formats>
-        <format>jar</format>
-    </formats>
-
-    <includeBaseDirectory>false</includeBaseDirectory>
-    <dependencySets>
-        <dependencySet>
-            <outputDirectory>/</outputDirectory>
-            <useProjectArtifact>true</useProjectArtifact>
-            <unpack>true</unpack>
-            <scope>runtime</scope>
-            <excludes>
-                <exclude>org.apache.storm:storm-core</exclude>
-            </excludes>
-        </dependencySet>
-    </dependencySets>
-</assembly>
\ No newline at end of file
diff --git a/pictures/storm-hbase-result.png b/pictures/storm-hbase-result.png
new file mode 100644
index 00000000..78e1d897
Binary files /dev/null and b/pictures/storm-hbase-result.png differ
diff --git a/pictures/storm-hdfs-result.png b/pictures/storm-hdfs-result.png
new file mode 100644
index 00000000..79ad0fff
Binary files /dev/null and b/pictures/storm-hdfs-result.png differ
diff --git a/pictures/storm-jar-complie-error.png b/pictures/storm-jar-complie-error.png
new file mode 100644
index 00000000..ef91ae19
Binary files /dev/null and b/pictures/storm-jar-complie-error.png differ
diff --git a/pictures/storm-kafka-producer.png b/pictures/storm-kafka-producer.png
new file mode 100644
index 00000000..0ef8e4b4
Binary files /dev/null and b/pictures/storm-kafka-producer.png differ
diff --git a/pictures/storm-kafka-receiver.png b/pictures/storm-kafka-receiver.png
new file mode 100644
index 00000000..af86f14d
Binary files /dev/null and b/pictures/storm-kafka-receiver.png differ
diff --git a/pictures/strom-kafka-consumer.png b/pictures/strom-kafka-consumer.png
new file mode 100644
index 00000000..d63dc2b7
Binary files /dev/null and b/pictures/strom-kafka-consumer.png differ