Commit

Storm integration with other frameworks
heibaiying committed Apr 18, 2019
1 parent 85f2539 commit 756d0eb
Showing 22 changed files with 514 additions and 108 deletions.
4 changes: 4 additions & 0 deletions README.md
@@ -77,6 +77,10 @@ TODO
2. [Storm Core Concepts Explained](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Storm核心概念详解.md)
3. [Storm Standalone Environment Setup](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Storm%E5%8D%95%E6%9C%BA%E7%89%88%E6%9C%AC%E7%8E%AF%E5%A2%83%E6%90%AD%E5%BB%BA.md)
4. [Storm Programming Model Explained](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Storm编程模型详解.md)
5. Storm integration with Redis
6. Storm integration with HDFS/HBase
7. Storm integration with Kafka
8. Two ways to package a Storm Topology

## 6. Flume

84 changes: 59 additions & 25 deletions code/Storm/storm-hbase-integration/pom.xml
@@ -9,10 +9,25 @@
<version>1.0</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<storm.version>1.2.2</storm.version>
</properties>


<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
</dependency>
<!-- Storm-HBase integration dependency -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hbase</artifactId>
<version>${storm.version}</version>
</dependency>
</dependencies>


<build>
<plugins>
<plugin>
@@ -23,35 +38,54 @@
<target>8</target>
</configuration>
</plugin>
<!-- Package with the maven-shade-plugin -->
<plugin>
- <artifactId>maven-assembly-plugin</artifactId>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
<configuration>
- <descriptors>
- <descriptor>src/main/resources/assembly.xml</descriptor>
- </descriptors>
- <archive>
- <manifest>
- <mainClass>com.heibaiying.wordcount.ClusterWordCountApp</mainClass>
- </manifest>
- </archive>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.sf</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.dsa</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>META-INF/*.rsa</exclude>
<exclude>META-INF/*.EC</exclude>
<exclude>META-INF/*.ec</exclude>
<exclude>META-INF/MSFTSIG.SF</exclude>
<exclude>META-INF/MSFTSIG.RSA</exclude>
</excludes>
</filter>
</filters>
<artifactSet>
<excludes>
<exclude>org.apache.storm:storm-core</exclude>
</excludes>
</artifactSet>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>


<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
</dependency>
<!-- Storm-HBase integration dependency -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hbase</artifactId>
<version>${storm.version}</version>
</dependency>
</dependencies>

</project>
code/Storm/storm-hbase-integration/src/main/java/com/heibaiying/WordCountToHBaseApp.java
@@ -1,5 +1,6 @@
package com.heibaiying;

import com.heibaiying.component.CountBolt;
import com.heibaiying.component.DataSourceSpout;
import com.heibaiying.component.SplitBolt;
import org.apache.storm.Config;
@@ -18,9 +19,6 @@

/**
* Performs a word count and stores the results in HBase
- * <p>
- * Build and package: mvn clean assembly:assembly -Dmaven.test.skip=true
- * hdfs://hadoop001:8020/hbase
*/
public class WordCountToHBaseApp {

@@ -45,11 +43,13 @@ public static void main(String[] args) {
// Define the mapping between the stream data and HBase columns
SimpleHBaseMapper mapper = new SimpleHBaseMapper()
.withRowKeyField("word")
- .withColumnFields(new Fields("word"))
- .withCounterFields(new Fields("count"))
- .withColumnFamily("cf");
+ .withColumnFields(new Fields("word","count"))
+ .withColumnFamily("info");

- // Pass the table name, the data mapping, and the HBase configuration to the HBaseBolt
+ /*
+ * Pass the table name, the data mapping, and the HBase configuration to the HBaseBolt.
+ * The table must be created in advance: create 'WordCount','info'
+ */
HBaseBolt hbase = new HBaseBolt("WordCount", mapper)
.withConfigKey("hbase.conf");

@@ -58,12 +58,14 @@ public static void main(String[] args) {
builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout(),1);
// split
builder.setBolt(SPLIT_BOLT, new SplitBolt(), 1).shuffleGrouping(DATA_SOURCE_SPOUT);
+ // count
+ builder.setBolt(COUNT_BOLT, new CountBolt(),1).shuffleGrouping(SPLIT_BOLT);
// save to HBase
- builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(SPLIT_BOLT, new Fields("word"));
+ builder.setBolt(HBASE_BOLT, hbase, 1).shuffleGrouping(COUNT_BOLT);


// If "cluster" is passed as a program argument, submit to the cluster; otherwise start a local cluster
- if (args.length > 1 && args[1].equals("cluster")) {
+ if (args.length > 0 && args[0].equals("cluster")) {
try {
StormSubmitter.submitTopology("ClusterWordCountToRedisApp", config, builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
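In the hunk above, `withConfigKey("hbase.conf")` only names the `Config` entry from which `HBaseBolt` reads its HBase client settings; the topology still has to put a map under that key. Below is a minimal sketch of that wiring, not part of this commit: the class name is illustrative, and the connection values are assumptions modeled on the `hadoop001` host used elsewhere in this repository.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.Config;

public class HBaseConfSketch {

    // Builds the Storm Config that HBaseBolt reads via withConfigKey("hbase.conf").
    public static Config buildConfig() {
        // HBase client settings; the values below are assumptions, adjust to your cluster.
        Map<String, Object> hbConf = new HashMap<>();
        hbConf.put("hbase.rootdir", "hdfs://hadoop001:8020/hbase");
        hbConf.put("hbase.zookeeper.quorum", "hadoop001:2181");

        Config config = new Config();
        config.put("hbase.conf", hbConf);
        return config;
    }
}
```

Because the map is resolved when the bolt prepares, the same shaded jar can target a different cluster by changing only this entry.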
code/Storm/storm-hbase-integration/src/main/java/com/heibaiying/component/CountBolt.java (new file)
@@ -0,0 +1,47 @@
package com.heibaiying.component;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

/**
* Counts word frequencies
*/
public class CountBolt extends BaseRichBolt {

private Map<String, Integer> counts = new HashMap<>();

private OutputCollector collector;


@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector=collector;
}

@Override
public void execute(Tuple input) {
String word = input.getStringByField("word");
Integer count = counts.get(word);
if (count == null) {
count = 0;
}
count++;
counts.put(word, count);
// Emit the word together with its updated count
collector.emit(new Values(word, String.valueOf(count)));

}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
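One caveat about `CountBolt`: its `counts` map is task-local, so the `shuffleGrouping(SPLIT_BOLT)` wiring above is only correct while the bolt runs with parallelism 1. A hedged sketch of the adjustment for several executors, routing by the `word` field so that every occurrence of a word reaches the same task (the class and component ids here are illustrative, not from the commit):

```java
import com.heibaiying.component.CountBolt;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class GroupingSketch {

    // With more than one CountBolt executor, tuples for the same word must all
    // land on the same task, so use fieldsGrouping on "word" instead of shuffle.
    public static void wire(TopologyBuilder builder) {
        builder.setBolt("countBolt", new CountBolt(), 2)
                .fieldsGrouping("splitBolt", new Fields("word"));
    }
}
```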
code/Storm/storm-hbase-integration/src/main/java/com/heibaiying/component/SplitBolt.java
@@ -6,7 +6,6 @@
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
- import org.apache.storm.tuple.Values;

import java.util.Map;

25 changes: 0 additions & 25 deletions code/Storm/storm-hbase-integration/src/main/resources/assembly.xml

This file was deleted.

27 changes: 26 additions & 1 deletion code/Storm/storm-hdfs-integration/pom.xml
@@ -43,10 +43,24 @@
<filter>
<artifact>*:*</artifact>
<excludes>
- <exclude>org.apache.storm:storm-core</exclude>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.sf</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.dsa</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ <exclude>META-INF/*.rsa</exclude>
+ <exclude>META-INF/*.EC</exclude>
+ <exclude>META-INF/*.ec</exclude>
+ <exclude>META-INF/MSFTSIG.SF</exclude>
+ <exclude>META-INF/MSFTSIG.RSA</exclude>
</excludes>
</filter>
</filters>
+ <artifactSet>
+ <excludes>
+ <exclude>org.apache.storm:storm-core</exclude>
+ </excludes>
+ </artifactSet>
</configuration>
<executions>
<execution>
@@ -82,6 +96,17 @@
<artifactId>storm-hdfs</artifactId>
<version>${storm.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0-cdh5.15.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
code/Storm/storm-hdfs-integration/src/main/java/com/heibaiying/WordCountToHdfsApp.java → DataToHdfsApp.java (renamed)
@@ -20,18 +20,19 @@
import org.apache.storm.topology.TopologyBuilder;

/**
- * Performs a word count and stores the results in HDFS
- * <p>
- * hdfs://hadoopp001:8020 path
+ * Stores the sample data in HDFS
*/
- public class WordCountToHdfsApp {
+ public class DataToHdfsApp {

private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
private static final String HDFS_BOLT = "hdfsBolt";

public static void main(String[] args) {

- // Define the delimiter for the stored text
+ // Set the Hadoop user name; without it, creating directories on HDFS may fail
+ // with a permission error (RemoteException: Permission denied)
+ System.setProperty("HADOOP_USER_NAME", "root");
+
+ // Define the delimiter between output fields
RecordFormat format = new DelimitedRecordFormat()
.withFieldDelimiter("|");

@@ -41,7 +42,7 @@ public static void main(String[] args) {
// Rotation policy: cap each file at 1 MB; once the limit is exceeded, roll over to a new file and keep writing
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, Units.MB);

- // Define the full path
+ // Define the storage path
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
.withPath("/storm-hdfs/");

Expand All @@ -57,20 +58,20 @@ public static void main(String[] args) {
// Build the topology
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
- // save to HBase
+ // save to HDFS
builder.setBolt(HDFS_BOLT, hdfsBolt, 1).shuffleGrouping(DATA_SOURCE_SPOUT);


// If "cluster" is passed as a program argument, submit to the cluster; otherwise start a local cluster
if (args.length > 0 && args[0].equals("cluster")) {
try {
StormSubmitter.submitTopology("ClusterWordCountToHdfsApp", new Config(), builder.createTopology());
StormSubmitter.submitTopology("ClusterDataToHdfsApp", new Config(), builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
e.printStackTrace();
}
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalWordCountToHdfsApp",
cluster.submitTopology("LocalDataToHdfsApp",
new Config(), builder.createTopology());
}
}
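The hunk above collapses the lines that actually assemble the `HdfsBolt`. A sketch of what that assembly presumably looks like, given the `format`, `rotationPolicy`, and `fileNameFormat` objects visible in the diff; the fs URL and the sync threshold below are assumptions, not taken from the commit:

```java
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

public class HdfsBoltSketch {

    // Assembles the HdfsBolt roughly as the collapsed lines of the hunk do.
    // The fs URL and the CountSyncPolicy threshold are assumptions.
    public static HdfsBolt build(RecordFormat format,
                                 FileRotationPolicy rotationPolicy,
                                 FileNameFormat fileNameFormat) {
        SyncPolicy syncPolicy = new CountSyncPolicy(100); // flush to HDFS every 100 tuples
        return new HdfsBolt()
                .withFsUrl("hdfs://hadoop001:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);
    }
}
```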