diff --git a/README.md b/README.md index fc14aca7..468e780c 100644 --- a/README.md +++ b/README.md @@ -77,10 +77,10 @@ TODO 2. [Storm核心概念详解](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Storm核心概念详解.md) 3. [Storm单机版本环境搭建](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Storm%E5%8D%95%E6%9C%BA%E7%89%88%E6%9C%AC%E7%8E%AF%E5%A2%83%E6%90%AD%E5%BB%BA.md) 4. [Storm编程模型详解](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Storm编程模型详解.md) -5. Storm整合Redis -6. Storm整合HDFS/HBase -7. Storm整合Kafka -8. Storm Topology的两种打包方式 +5. [Storm项目三种打包方式对比分析](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Storm三种打包方式对比分析.md) +6. [Storm集成Redis详解](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Storm集成Redis详解.md) +7. [Storm集成HDFS/HBase](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Storm集成HBase和HDFS.md) +8. [Storm集成Kafka](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Storm集成Kakfa.md) ## 六、Flume @@ -115,15 +115,18 @@ TODO 10. [Spring/Spring Boot 整合 Mybatis + Phoenix](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spring%2BMybtais%2BPhoenix%E6%95%B4%E5%90%88.md) ## 十、Kafka -1. Kafka 简介及消息处理过程分析 +1. [Kafka 核心概念介绍](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Kafka核心概念介绍.md) 2. 基于Zookeeper搭建Kafka高可用集群 -3. Kafka 副本机制以及选举原理剖析 +3. Kafka生产者详解 +4. Kafka消费者详解 +5. Kafka 副本机制以及选举原理剖析 +6. Kafka的数据可靠性 ## 十一、Zookeeper -1. Zookeeper 简介及原理介绍 -2. Zookeeper 集群搭建Zookeeper -3. 分布式锁实现方案Zookeeper +1. Zookeeper 简介及核心概念 +2. Zookeeper集群搭建Zookeeper +3. Zookeeper分布式锁实现方案 4. 集群升级、迁移深入分析 Zookeeper 5. Zab协议及选举机制 diff --git "a/notes/Kafka \345\211\257\346\234\254\346\234\272\345\210\266\344\273\245\345\217\212\351\200\211\344\270\276\345\216\237\347\220\206\345\211\226\346\236\220.md" "b/notes/Kafka \345\211\257\346\234\254\346\234\272\345\210\266\344\273\245\345\217\212\351\200\211\344\270\276\345\216\237\347\220\206\345\211\226\346\236\220.md" new file mode 100644 index 00000000..e69de29b diff --git "a/notes/Kafka\346\240\270\345\277\203\346\246\202\345\277\265\344\273\213\347\273\215.md" "b/notes/Kafka\346\240\270\345\277\203\346\246\202\345\277\265\344\273\213\347\273\215.md" new file mode 100644 index 00000000..ae7e06de --- /dev/null +++ "b/notes/Kafka\346\240\270\345\277\203\346\246\202\345\277\265\344\273\213\347\273\215.md" @@ -0,0 +1,65 @@ +# Kafka核心概念介绍 + + + + +## 一、Kafka简介 + +ApacheKafka是一个分布式的流处理平台。它具有以下特点: + ++ 支持消息的发布和订阅,类似于RabbtMQ、ActiveMQ等消息队列; ++ 支持消息的持久化存储,并通过多副本分布式的存储方案来保证消息的容错性; ++ 支持数据在线实时处理; ++ 高吞吐率,单broker可以轻松处理数千个分区以及每秒百万级的消息量; ++ 能保证消息的可靠性投递。 + +## 二、Kafka核心概念 + +### 2.1 Messages And Batches + +Kafka的基本数据单元被称为message(消息),为减少网络开销,提高效率,多个消息会被放入同一批次(Batch)中后再写入。 + +### 2.2 Topics And Partitions + +kafka 的消息通过Topics(主题)进行分类,可以把Topics理解为关系型数据库中的表。一个主题可以被分为若干个Partitions(分区),一个分区就是一个提交日志(commit log)。 + +消息以追加的方式写入分区,然后以先入先出的顺序读取。kafka通过分区来实现数据的冗余和伸缩性,分区可以分布在不同的服务器上,这意味着一个Topic可以横跨多个服务器,以提供比单个服务器更强大的性能。 + +由于一个Topic包含多个分区,因此无法在整个Topic范围内保证消息的顺序性,但可以保证消息在单个分区内的顺序性。 + +
+ +### 2.3 Producers And Consumers + +#### 1. 生产者 + +生产者负责创建消息。一般情况下,生产者在把消息均衡地分布到在主题的所有分区上,而并不关心消息会被写到哪个分区。如果我们想要把消息写到指定的分区,通过分区器对消息键进行散列来实现。 + +#### 2. 消费者 + +消费者是消费者群组的一部分,消费者负责消费消息。消费者可以订阅一个或者多个主题,并按照消息生成的顺序来读取它们。消费者通过检查消息的偏移量(offset)来区分读取过的消息。 + +偏移量是一个不断递增的数值,在创建消息时,Kafka会把它添加到其中,在给定的分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的偏移量保存在Zookeeper或Kafka上,如果消费者关闭或者重启,它还可以重新获取该偏移量,以保证读取状态不会丢失。 + +
+ +一个分区只能被同一个消费者群组里面的一个消费者读取,但可以被不同消费者群组中所组成的多个消费者共同读取。多个消费者群组中消费者共同读取同一个主题时,彼此之间互不影响。 + +
+ +### 2.4 Brokers And Clusters + +一个独立的kafka服务器被称为broker。broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker为消费者提供服务,对读取分区的请求做出响应,返回已经提交到磁盘的消息。 + +broker是集群(Cluster)的组成部分。每一个集群都有一个broker同时充当了集群控制器(controller)的角色(自动从集群的活跃成员中选举出来),控制器负责管理工作,包括将分区分配给broker和监控broker。 + +在集群中,一个分区(Partition)从属一个broker,该broker被称为分区的首领(leader)。一个分区可以分配给多个broker,这个时候会发生分区复制。这种复制机制为分区提供了消息冗余,如果有一个broker失效,其他broker可以接管领导权。 + +
\ No newline at end of file diff --git "a/notes/Kafka\346\266\210\350\264\271\350\200\205\350\257\246\350\247\243.md" "b/notes/Kafka\346\266\210\350\264\271\350\200\205\350\257\246\350\247\243.md" new file mode 100644 index 00000000..e69de29b diff --git "a/notes/Kafka\347\224\237\344\272\247\350\200\205\350\257\246\350\247\243.md" "b/notes/Kafka\347\224\237\344\272\247\350\200\205\350\257\246\350\247\243.md" new file mode 100644 index 00000000..e69de29b diff --git "a/notes/Kafka\347\232\204\346\225\260\346\215\256\345\217\257\351\235\240\346\200\247.md" "b/notes/Kafka\347\232\204\346\225\260\346\215\256\345\217\257\351\235\240\346\200\247.md" new file mode 100644 index 00000000..e69de29b diff --git "a/notes/Storm\345\244\232\347\247\215\346\211\223\345\214\205\346\226\271\345\274\217\345\257\271\346\257\224\345\210\206\346\236\220.md" "b/notes/Storm\344\270\211\347\247\215\346\211\223\345\214\205\346\226\271\345\274\217\345\257\271\346\257\224\345\210\206\346\236\220.md" similarity index 83% rename from "notes/Storm\345\244\232\347\247\215\346\211\223\345\214\205\346\226\271\345\274\217\345\257\271\346\257\224\345\210\206\346\236\220.md" rename to "notes/Storm\344\270\211\347\247\215\346\211\223\345\214\205\346\226\271\345\274\217\345\257\271\346\257\224\345\210\206\346\236\220.md" index c74e8ed4..05a20a9c 100644 --- "a/notes/Storm\345\244\232\347\247\215\346\211\223\345\214\205\346\226\271\345\274\217\345\257\271\346\257\224\345\210\206\346\236\220.md" +++ "b/notes/Storm\344\270\211\347\247\215\346\211\223\345\214\205\346\226\271\345\274\217\345\257\271\346\257\224\345\210\206\346\236\220.md" @@ -1,8 +1,18 @@ -# Storm多种打包方式对比分析 +# Storm三种打包方式对比分析 + + + ## 一、简介 -在将Storm Topology提交到服务器集群进行运行时,需要先将项目进行打包,本文主要对比分析各种打包方式,并将打包过程中需要注意的事项进行说明。主要打包方式有以下三种: +在将Storm Topology提交到服务器集群进行运行时,需要先将项目进行打包。本文主要对比分析各种打包方式,并将打包过程中需要注意的事项进行说明。主要打包方式有以下三种: + 第一种:不加任何插件,直接使用mvn package打包; + 第二种:使用maven-assembly-plugin插件进行打包; @@ -30,7 +40,7 @@ + 如果第三方JAR包在远程中央仓库,可以使用`--artifacts` 指定,此时如果想要排除某些依赖,可以使用 `^` 符号; + 如果第三方JAR包在其他仓库,还需要使用 `--artifactRepositories`指明仓库地址,库名和地址使用 `^` 符号分隔。 -以下是包含上面三种情况的一个例子: +以下是包含上面三种情况的一个样例命令: ```shell ./bin/storm jar example/storm-starter/storm-starter-topologies-*.jar org.apache.storm.starter.RollingTopWords blobstore-remote2 remote --jars "./external/storm-redis/storm-redis-1.1.0.jar,./external/storm-kafka/storm-kafka-1.1.0.jar" --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12" --artifactRepositories "jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/" @@ -42,7 +52,7 @@ ### 3.1 官方文档说明 -maven-assembly-plugin是官方文档中介绍的打包方法,以下表述来源于官方文档:[Running Topologies on a Production Cluster](http://storm.apache.org/releases/2.0.0-SNAPSHOT/Running-topologies-on-a-production-cluster.html) +maven-assembly-plugin是官方文档中介绍的打包方法,来源于官方文档:[Running Topologies on a Production Cluster](http://storm.apache.org/releases/2.0.0-SNAPSHOT/Running-topologies-on-a-production-cluster.html) > If you're using Maven, the [Maven Assembly Plugin](http://maven.apache.org/plugins/maven-assembly-plugin/) can do the packaging for you. Just add this to your pom.xml: > @@ -73,9 +83,9 @@ maven-assembly-plugin的使用非常简单,只需要在POM.xml中引入即可 ### 3.2 排除Storm jars -这里说明一下,`jar-with-dependencies`是Maven官方内置的一种打包格式,Maven官方文档[Pre-defined Descriptor Files](http://maven.apache.org/plugins/maven-assembly-plugin/descriptor-refs.html)中有所说明: +`jar-with-dependencies`是Maven官方内置的一种打包格式,Maven官方文档[Pre-defined Descriptor Files](http://maven.apache.org/plugins/maven-assembly-plugin/descriptor-refs.html)中有所说明: -![jar-with-dependencies](D:\BigData-Notes\pictures\jar-with-dependencies.png) +
如果你想排除某个依赖,这里以排除`storm-core`为例,你可以在`jar-with-dependencies`的XML上进行修改。 @@ -167,7 +177,7 @@ assembly.xml文件内容如下: >在配置文件中不仅可以排除依赖,还可以排除指定的文件,更多的配置规则可以参考官方文档:[Descriptor Format](http://maven.apache.org/plugins/maven-assembly-plugin/assembly.html#) -#### 2. 打包命令 +#### 2. 打包命令 采用maven-assembly-plugin进行打包时命令如下: @@ -177,7 +187,7 @@ assembly.xml文件内容如下: 打包后会同时生成两个JAR包,其中后缀为`jar-with-dependencies`是含有第三方依赖的JAR包,后缀是由`assembly.xml`中``标签指定的,可以自定义修改。提交该JAR到集群环境即可直接使用。 -![storm-jar](D:\BigData-Notes\pictures\storm-jar.png) +
@@ -205,11 +215,11 @@ assembly.xml文件内容如下: RuntimeException异常。 -采用maven-shade-plugin有很多好处,比如你的工程依赖很多的JAR包,而被依赖的JAR又会依赖其他的JAR包,这样,当工程中依赖到不同的版本的 JAR时,并且JAR中具有相同名称的资源文件时,shade插件会尝试将所有资源文件打包在一起时,而不是和assembly一样执行覆盖操作。 +采用maven-shade-plugin打包有很多好处,比如你的工程依赖很多的JAR包,而被依赖的JAR又会依赖其他的JAR包,这样,当工程中依赖到不同的版本的 JAR时,并且JAR中具有相同名称的资源文件时,shade插件会尝试将所有资源文件打包在一起时,而不是和assembly一样执行覆盖操作。 ### 4.2 配置 -配置示例如下: +采用`maven-shade-plugin`进行打包时候,配置示例如下: ```xml @@ -260,9 +270,9 @@ RuntimeException异常。 ``` -配置说明: +以上配置示例来源于Storm在Github上的examples,这里做一下说明: -有些jar包生成时,会使用jarsigner生成文件签名(完成性校验),分为两个文件存放在META-INF目录下。 +在上面的配置中,排除了部分文件,这是因为有些JAR包生成时,会使用jarsigner生成文件签名(完成性校验),分为两个文件存放在META-INF目录下: + a signature file, with a .SF extension; + a signature block file, with a .DSA, .RSA, or .EC extension; @@ -279,7 +289,7 @@ RuntimeException异常。 打包后会生成两个JAR包,提交到服务器集群时使用非original开头的JAR. -![storm-jar2](D:\BigData-Notes\pictures\storm-jar2.png) +
## 五、结论 @@ -291,7 +301,7 @@ RuntimeException异常。 无论采用任何打包方式,都必须排除集群环境中已经提供的storm jars。这里比较典型的是storm-core,其在安装目录的lib目录下已经存在。 -![storm-lib](D:\BigData-Notes\pictures\storm-lib.png) +
@@ -306,4 +316,5 @@ Caused by: java.lang.RuntimeException: java.io.IOException: Found multiple defau ... 39 more ``` -![storm-jar-complie-error](D:\BigData-Notes\pictures\storm-jar-complie-error.png) \ No newline at end of file +
+ diff --git "a/notes/Storm\347\274\226\347\250\213\346\250\241\345\236\213\350\257\246\350\247\243.md" "b/notes/Storm\347\274\226\347\250\213\346\250\241\345\236\213\350\257\246\350\247\243.md" index 3c014013..89766b9e 100644 --- "a/notes/Storm\347\274\226\347\250\213\346\250\241\345\236\213\350\257\246\350\247\243.md" +++ "b/notes/Storm\347\274\226\347\250\213\346\250\241\345\236\213\350\257\246\350\247\243.md" @@ -11,12 +11,13 @@     4.2 BaseRichBolt抽象类
五、词频统计案例
六、提交到服务器集群运行
-七、通用打包方法
+七、关于项目打包的扩展说明
+ ## 一、简介 下图为Strom的运行流程图,也是storm的编程模型图,在storm 进行流处理时,我们需要自定义实现自己的spout(数据源)和bolt(处理单元),并通过`TopologyBuilder`将它们之间进行关联,定义好数据处理的流程。 @@ -219,6 +220,8 @@ public interface IRichBolt extends IBolt, IComponent {
+> 案例源码下载地址:[storm-word-count](https://github.com/heibaiying/BigData-Notes/tree/master/code/Storm/storm-word-count) + ### 5.2 代码实现 #### 1. 项目依赖 @@ -231,8 +234,6 @@ public interface IRichBolt extends IBolt, IComponent { ``` - - #### 2. DataSourceSpout ```java @@ -467,11 +468,11 @@ storm kill ClusterWordCountApp -w 3 -## 七、通用打包方法 +## 七、关于项目打包的扩展说明 -### 1. mvn package的局限性 +### mvn package的局限性 -上面我们没有在POM中配置任何插件,直接使用`mvn package`进行项目打包,这对于没有使用外部依赖包的项目是可行的。但如果项目中使用了第三方JAR包,就会出现问题,因为`package`打包后的JAR中是不含有依赖包的,如果此时你提交到服务器上运行,就会出现找不到第三方依赖的异常。 +在上面的步骤中,我们没有在POM中配置任何插件,直接使用`mvn package`进行项目打包,这对于没有使用外部依赖包的项目是可行的。但如果项目中使用了第三方JAR包,就会出现问题,因为`package`打包后的JAR中是不含有依赖包的,如果此时你提交到服务器上运行,就会出现找不到第三方依赖的异常。 这时候可能大家会有疑惑,在我们的项目中不是使用了`storm-core`这个依赖吗?其实上面之所以我们能运行成功,是因为在Storm的集群环境中提供了这个JAR包,在安装目录的lib目录下: @@ -500,104 +501,15 @@ private String productData() { } ``` -此时直接使用`mvn clean package`打包上传到服务器运行,就会抛出下图异常。 - -其实官方文档里面并没有推荐使用这种打包方法,而是网上很多词频统计的Demo使用了。所以在此说明一下:这种打包方式并不适用于实际的开发,因为实际开发中通常都是需要第三方的JAR包的。 +此时直接使用`mvn clean package`打包上传到服务器运行,就会抛出下图异常。所以在此说明一下:这种直接打包的方式并不适用于实际的开发,因为实际开发中通常都是需要第三方的JAR包的。
-### 2. 官方推荐的的打包方法 - ->If you're using Maven, the [Maven Assembly Plugin](http://maven.apache.org/plugins/maven-assembly-plugin/) can do the packaging for you. Just add this to your pom.xml: -> ->```xml -> -> maven-assembly-plugin -> -> -> jar-with-dependencies -> -> -> -> com.path.to.main.Class -> -> -> -> ->``` -> ->Then run mvn assembly:assembly to get an appropriately packaged jar. Make sure you [exclude](http://maven.apache.org/plugins/maven-assembly-plugin/examples/single/including-and-excluding-artifacts.html) the Storm jars since the cluster already has Storm on the classpath. - -其实就是两点: - -+ 使用maven-assembly-plugin进行打包,因为maven-assembly-plugin会把所有的依赖一并打包到最后的JAR中; -+ 排除掉Storm集群环境中已经提供的Storm jars。 - -按照官方文档的说明,修改我们的POM文件,如下: - -```xml - - - - maven-assembly-plugin - - - src/main/resources/assembly.xml - - - - com.heibaiying.wordcount.ClusterWordCountApp - - - - - - -``` - -其中`assembly.xml`的文件内容如下: - -```xml - - - jar-with-dependencies - - - - jar - - - false - - - / - true - true - runtime - - - org.apache.storm:storm-core - - - - -``` - -打包命令为: - -```shell -# mvn clean assembly:assembly -Dmaven.test.skip=true -``` - -打包后会同时生成两个JAR包,其中后缀为`jar-with-dependencies`是含有第三方依赖的JAR包,通过压缩工具可以看到内部已经打入了依赖包。另外后缀是由`assembly.xml`中``标签指定的,你可以自定义修改。提交该JAR到集群环境即可。 - -
- +想把依赖包一并打入最后的JAR中,maven提供了两个插件来实现,分别是`maven-assembly-plugin`和`maven-shade-plugin`。鉴于本篇文章篇幅已经比较长,且关于Storm打包还有很多需要说明的地方,所以关于Storm的打包方式单独整理至下一篇文章: +[Storm三种打包方式对比分析](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Storm三种打包方式对比分析.md) ## 参考资料 diff --git "a/notes/Storm\351\233\206\346\210\220HBase\345\222\214HDFS.md" "b/notes/Storm\351\233\206\346\210\220HBase\345\222\214HDFS.md" index d4247402..421ef0fe 100644 --- "a/notes/Storm\351\233\206\346\210\220HBase\345\222\214HDFS.md" +++ "b/notes/Storm\351\233\206\346\210\220HBase\345\222\214HDFS.md" @@ -1,9 +1,16 @@ # Storm集成HDFS和HBase + + ## 一、Storm集成HDFS ### 1.1 项目结构 +
+ > 本用例源码下载地址:[storm-hdfs-integration](https://github.com/heibaiying/BigData-Notes/tree/master/code/Storm/storm-hdfs-integration) ### 1.2 项目主要依赖 @@ -211,7 +218,7 @@ hadoop fs -tail -f /strom-hdfs/文件名 -![storm-hdfs-result](D:\BigData-Notes\pictures\storm-hdfs-result.png) +
@@ -219,7 +226,11 @@ hadoop fs -tail -f /strom-hdfs/文件名 ### 2.1 项目结构 -本用例源码下载地址:[storm-hbase-integration](https://github.com/heibaiying/BigData-Notes/tree/master/code/Storm/storm-hbase-integration) +集成用例: 进行词频统计并将最后的结果存储到HBase,项目主要结构如下: + +
+ +> 本用例源码下载地址:[storm-hbase-integration](https://github.com/heibaiying/BigData-Notes/tree/master/code/Storm/storm-hbase-integration) ### 2.2 项目主要依赖 @@ -454,7 +465,7 @@ public class WordCountToHBaseApp { hbase > scan 'WordCount' ``` -![storm-hbase-result](D:\BigData-Notes\pictures\storm-hbase-result.png) +
@@ -470,3 +481,9 @@ SimpleHBaseMapper mapper = new SimpleHBaseMapper() .withColumnFamily("cf"); ``` + + +## 参考资料 + +1. [Apache HDFS Integration](http://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-hdfs.html) +2. [Apache HBase Integration](http://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-hbase.html) \ No newline at end of file diff --git "a/notes/Storm\351\233\206\346\210\220Kakfa.md" "b/notes/Storm\351\233\206\346\210\220Kakfa.md" index 20d7ed03..679b902e 100644 --- "a/notes/Storm\351\233\206\346\210\220Kakfa.md" +++ "b/notes/Storm\351\233\206\346\210\220Kakfa.md" @@ -1,5 +1,12 @@ # Storm集成Kafka + + + ## 一、整合说明 Storm官方对Kafka的整合分为两个版本,官方说明文档分别如下: @@ -13,7 +20,7 @@ Storm官方对Kafka的整合分为两个版本,官方说明文档分别如下 ### 2.1 项目结构 - +
### 2.2 项目主要依赖 @@ -159,7 +166,7 @@ public class WritingToKafkaApp { ### 2.5 测试准备工作 -进行测试前需要启动Kakfa。 +进行测试前需要启动Kakfa: #### 1. 启动Kakfa @@ -173,7 +180,7 @@ bin/zkServer.sh start bin/zookeeper-server-start.sh config/zookeeper.properties ``` -启动单节点kafka: +启动单节点kafka用于测试: ```shell # bin/kafka-server-start.sh config/server.properties @@ -189,7 +196,9 @@ bin/kafka-topics.sh --create --bootstrap-server hadoop001:9092 --replication-fac bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092 ``` -#### 3. 启动一个消费者用于观察写入情况 +#### 3. 启动消费者 + + 启动一个消费者用于观察写入情况,启动命令如下: ```shell # bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic storm-topic --from-beginning @@ -205,7 +214,7 @@ bin/kafka-topics.sh --create --bootstrap-server hadoop001:9092 --replication-fac 启动后,消费者监听情况如下: -![strom-kafka-consumer](D:\BigData-Notes\pictures\strom-kafka-consumer.png) +
@@ -213,7 +222,7 @@ bin/kafka-topics.sh --create --bootstrap-server hadoop001:9092 --replication-fac ### 3.1 项目结构 - +
### 3.2 ReadingFromKafkaApp @@ -302,7 +311,9 @@ public class LogConsoleBolt extends BaseRichBolt { 这里从`value`字段中获取kafka输出的值数据。 -默认情况下,我们可以通过继承`RecordTranslator`接口定义了Kafka中Record与输出流之间的转换关系,可以在构建`KafkaSpoutConfig`的时候通过构造器或者`setRecordTranslator()`传入,并最后传递给具体的`KafkaSpout`。如果不指定的情况下,则默认使用内置的`DefaultRecordTranslator`,其源码如下,`FIELDS`中 定义了tuple中所有可用的字段: +在开发中,我们可以通过继承`RecordTranslator`接口定义了Kafka中Record与输出流之间的映射关系,可以在构建`KafkaSpoutConfig`的时候通过构造器或者`setRecordTranslator()`方法传入,并最后传递给具体的`KafkaSpout`。 + +默认情况下使用内置的`DefaultRecordTranslator`,其源码如下,`FIELDS`中 定义了tuple中所有可用的字段:主题,分区,偏移量,消息键,值。 ```java public class DefaultRecordTranslator implements RecordTranslator { @@ -337,8 +348,20 @@ public class DefaultRecordTranslator implements RecordTranslator { # bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic storm-topic ``` -![storm-kafka-producer](D:\BigData-Notes\pictures\storm-kafka-producer.png) +
本地运行的项目接收到从Kafka发送过来的数据: -![storm-kafka-receiver](D:\BigData-Notes\pictures\storm-kafka-receiver.png) \ No newline at end of file +
+ + + +
+ +> 用例源码下载地址:[storm-kafka-integration](https://github.com/heibaiying/BigData-Notes/tree/master/code/Storm/storm-kafka-integration) + + + +## 参考资料 + +1. [Storm Kafka Integration (0.10.x+)](http://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-kafka-client.html) \ No newline at end of file diff --git "a/notes/Storm\351\233\206\346\210\220Redis\350\257\246\350\247\243.md" "b/notes/Storm\351\233\206\346\210\220Redis\350\257\246\350\247\243.md" index 9a9516c7..d13c477e 100644 --- "a/notes/Storm\351\233\206\346\210\220Redis\350\257\246\350\247\243.md" +++ "b/notes/Storm\351\233\206\346\210\220Redis\350\257\246\350\247\243.md" @@ -1,14 +1,16 @@ # Storm 集成 Redis 详解 -## 一、简介 + -storm-redis提供了Storm与Redis的集成支持,你只需要引入对应的依赖即可使用。Storm-redis使用Jedis为Redis客户端,提供了基本的Bolt实现, `RedisLookupBolt` and `RedisStoreBolt`。 -+ RedisLookupBolt:从Redis中查询数据; -+ RedisStoreBolt:存储数据到Redis; -+ RedisFilterBolt : 查询符合条件的数据; +## 一、简介 -`RedisLookupBolt`和`RedisStoreBolt`都继承自`AbstractRedisBolt`抽象类,我们也可以继承自该抽象类,然后按照我们自己的的业务逻辑进行功能的拓展。 +storm-redis提供了Storm与Redis的集成支持,你只需要引入对应的依赖即可使用。 ```xml @@ -19,10 +21,26 @@ storm-redis提供了Storm与Redis的集成支持,你只需要引入对应的 ``` +Storm-redis使用Jedis为Redis客户端,并提供了如下三个基本的Bolt实现: + ++ RedisLookupBolt:从Redis中查询数据; ++ RedisStoreBolt:存储数据到Redis; ++ RedisFilterBolt : 查询符合条件的数据; + +`RedisLookupBolt`、`RedisStoreBolt`、`RedisFilterBolt `均继承自`AbstractRedisBolt`抽象类。我们可以通过继承该抽象类,进行功能的拓展,实现自定义RedisBolt。 + + + ## 二、集成案例 ### 2.1 项目结构 +这里首先给出一个集成案例:进行词频统计并将最后的结果存储到Redis。项目结构如下: + +
+ +> 用例源码下载地址:[storm-redis-integration](https://github.com/heibaiying/BigData-Notes/tree/master/code/Storm/storm-redis-integration) + ### 2.2 项目依赖 项目主要依赖如下: @@ -176,7 +194,7 @@ public class CountBolt extends BaseRichBolt { ### 2.6 WordCountStoreMapper -实现RedisStoreMapper,并定义定义tuple与Redis中数据的映射关系,Redis存储的是Key/Value键值对,并且支持多种数据结构,你需要指定tuple中的那个字段为key,那个字段为value,并且存储为什么数据结构。 +实现RedisStoreMapper接口,并定义tuple与Redis中数据的映射关系,Redis中存储的是Key/Value键值对,并且支持多种数据结构,你需要指定tuple中的哪个字段为key,哪个字段为value,并且存储到什么数据结构中。 ```java /** @@ -265,7 +283,7 @@ public class WordCountToRedisApp { 启动后,查看Redis中的数据: -![store-redis-manager](D:\BigData-Notes\pictures\store-redis-manager.png) +
@@ -273,11 +291,11 @@ public class WordCountToRedisApp { ### 3.1 AbstractRedisBolt -`RedisLookupBolt`和`RedisStoreBolt`都继承自`AbstractRedisBolt`抽象类,和我们自定义实现Bolt一样,`AbstractRedisBolt`间接继承自`BaseRichBolt`。 +`RedisLookupBolt`、`RedisStoreBolt`、`RedisFilterBolt `均继承自`AbstractRedisBolt`抽象类,和我们自定义实现Bolt一样,`AbstractRedisBolt`间接继承自`BaseRichBolt`。 -![storm-abstractRedisBolt](D:\BigData-Notes\pictures\storm-abstractRedisBolt.png) +
`AbstractRedisBolt`中比较重要的是prepare方法,在该方法中通过外部传入的jedis连接池配置( jedisPoolConfig/jedisClusterConfig) 创建用于管理Jedis实例的容器`JedisCommandsInstanceContainer`。 @@ -422,13 +440,13 @@ public class RedisStoreBolt extends AbstractRedisBolt { JedisCommands接口中定义了所有的 Redis 客户端命令,它有以下三个实现类,分别是Jedis、JedisCluster、ShardedJedis。Strom中主要使用前两种实现类,具体调用哪一个实现类来执行命令,由传入的是jedisPoolConfig还是jedisClusterConfig来决定。 -![storm-jedicCommands](D:\BigData-Notes\pictures\storm-jedicCommands.png) +
### 3.4 RedisMapper 和 TupleMapper RedisMapper 和 TupleMapper 定义了 tuple 和 Redis 中的数据如何进行映射转换。 -![storm-Redis-Mapper](D:\BigData-Notes\pictures\storm-Redis-Mapper.png) +
#### 1. TupleMapper @@ -517,7 +535,9 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { ## 四、自定义RedisBolt实现词频统计 -自定义RedisBolt:利用Redis中哈希结构的hincrby key field命令进行词频统计。在Redis中`hincrby`的执行效果如下,如果在执行时字段不存在,则在执行操作之前将值设置为0。通过这个命令可以非常轻松的实现词频统计功能。 +### 4.1 实现原理 + +自定义RedisBolt:主要利用Redis中哈希结构的hincrby key field命令进行词频统计。在Redis中`hincrby`的执行效果如下,如果在执行时字段不存在,则在执行操作之前将值设置为0。通过这个命令可以非常轻松的实现词频统计功能。 ```shell redis> HSET myhash field 5 @@ -531,7 +551,11 @@ redis> HINCRBY myhash field -10 redis> ``` -### 4.1 自定义RedisBolt的代码实现 +### 4.2 项目结构 + +
+ +### 4.3 自定义RedisBolt的代码实现 ```java /** @@ -581,7 +605,7 @@ public class RedisCountStoreBolt extends AbstractRedisBolt { } ``` -### 4.2 CustomRedisCountApp +### 4.4 CustomRedisCountApp ```java /** @@ -622,4 +646,10 @@ public class CustomRedisCountApp { } } } -``` \ No newline at end of file +``` + + + +## 参考资料 + +1. [Storm Redis Integration](http://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-redis.html) \ No newline at end of file diff --git "a/notes/Zab\345\215\217\350\256\256\345\217\212\351\200\211\344\270\276\346\234\272\345\210\266.md" "b/notes/Zab\345\215\217\350\256\256\345\217\212\351\200\211\344\270\276\346\234\272\345\210\266.md" new file mode 100644 index 00000000..e69de29b diff --git "a/notes/Zookeeper\345\210\206\345\270\203\345\274\217\351\224\201\345\256\236\347\216\260\346\226\271\346\241\210.md" "b/notes/Zookeeper\345\210\206\345\270\203\345\274\217\351\224\201\345\256\236\347\216\260\346\226\271\346\241\210.md" new file mode 100644 index 00000000..e69de29b diff --git "a/notes/Zookeeper\347\256\200\344\273\213\345\217\212\346\240\270\345\277\203\346\246\202\345\277\265.md" "b/notes/Zookeeper\347\256\200\344\273\213\345\217\212\346\240\270\345\277\203\346\246\202\345\277\265.md" new file mode 100644 index 00000000..e69de29b diff --git a/pictures/CustomRedisCountApp.png b/pictures/CustomRedisCountApp.png new file mode 100644 index 00000000..e93e41c4 Binary files /dev/null and b/pictures/CustomRedisCountApp.png differ diff --git a/pictures/WordCountToHBaseApp.png b/pictures/WordCountToHBaseApp.png new file mode 100644 index 00000000..d75f8b4a Binary files /dev/null and b/pictures/WordCountToHBaseApp.png differ diff --git a/pictures/datasourcetohdfs.png b/pictures/datasourcetohdfs.png new file mode 100644 index 00000000..53860ae1 Binary files /dev/null and b/pictures/datasourcetohdfs.png differ diff --git a/pictures/kafka-cluster.png b/pictures/kafka-cluster.png new file mode 100644 index 00000000..19470311 Binary files /dev/null and b/pictures/kafka-cluster.png differ diff --git a/pictures/kafka-producer-consumer.png b/pictures/kafka-producer-consumer.png new file mode 100644 index 00000000..8a3e5ca9 Binary files /dev/null and b/pictures/kafka-producer-consumer.png differ diff --git a/pictures/kafka-send-messgaes.png b/pictures/kafka-send-messgaes.png new file mode 100644 index 00000000..ce2ec94d Binary files /dev/null and b/pictures/kafka-send-messgaes.png differ diff --git a/pictures/kafka-topic.png b/pictures/kafka-topic.png new file mode 100644 index 00000000..0873987f Binary files /dev/null and b/pictures/kafka-topic.png differ diff --git "a/pictures/kafka\346\266\210\350\264\271\350\200\205.png" "b/pictures/kafka\346\266\210\350\264\271\350\200\205.png" new file mode 100644 index 00000000..16fe2936 Binary files /dev/null and "b/pictures/kafka\346\266\210\350\264\271\350\200\205.png" differ diff --git a/pictures/readfromkafka.png b/pictures/readfromkafka.png new file mode 100644 index 00000000..3654bb4f Binary files /dev/null and b/pictures/readfromkafka.png differ diff --git a/pictures/storm-abstractRedisBolt.png b/pictures/storm-abstractRedisBolt.png index 62c15df4..3d33577b 100644 Binary files a/pictures/storm-abstractRedisBolt.png and b/pictures/storm-abstractRedisBolt.png differ diff --git a/pictures/storm-wordcounttoredis.png b/pictures/storm-wordcounttoredis.png new file mode 100644 index 00000000..0fc44a7a Binary files /dev/null and b/pictures/storm-wordcounttoredis.png differ diff --git a/pictures/writetokafka.png b/pictures/writetokafka.png new file mode 100644 index 00000000..1ab1d018 Binary files /dev/null and b/pictures/writetokafka.png differ