Storm 集成 Redis 详解
一、简介
Storm-Redis 提供了 Storm 与 Redis 的集成支持,你只需要引入对应的依赖即可使用:
1 | <dependency> |
Storm-Redis 使用 Jedis 为 Redis 客户端,并提供了如下三个基本的 Bolt 实现:
- RedisLookupBolt:从 Redis 中查询数据;
- RedisStoreBolt:存储数据到 Redis;
- RedisFilterBolt : 查询符合条件的数据;
RedisLookupBolt
、RedisStoreBolt
、RedisFilterBolt
均继承自 AbstractRedisBolt
抽象类。我们可以通过继承该抽象类,实现自定义 RedisBolt,进行功能的拓展。
二、集成案例
2.1 项目结构
这里首先给出一个集成案例:进行词频统计并将最后的结果存储到 Redis。项目结构如下:
用例源码下载地址:storm-redis-integration
2.2 项目依赖
项目主要依赖如下:
1 | <properties> |
2.3 DataSourceSpout
1 | /** |
产生的模拟数据格式如下:
1 | Spark HBase |
2.4 SplitBolt
1 | /** |
2.5 CountBolt
1 | /** |
2.6 WordCountStoreMapper
实现 RedisStoreMapper 接口,定义 tuple 与 Redis 中数据的映射关系:即需要指定 tuple 中的哪个字段为 key,哪个字段为 value,并且存储到 Redis 的何种数据结构中。
1 | /** |
2.7 WordCountToRedisApp
1 | /** |
2.8 启动测试
可以用直接使用本地模式运行,也可以打包后提交到服务器集群运行。本仓库提供的源码默认采用 maven-shade-plugin
进行打包,打包命令如下:
1 | mvn clean package -D maven.test.skip=true |
启动后,查看 Redis 中的数据:
三、storm-redis 实现原理
3.1 AbstractRedisBolt
RedisLookupBolt
、RedisStoreBolt
、RedisFilterBolt
均继承自 AbstractRedisBolt
抽象类,和我们自定义实现 Bolt 一样,AbstractRedisBolt
间接继承自 BaseRichBolt
。
AbstractRedisBolt
中比较重要的是 prepare 方法,在该方法中通过外部传入的 jedis 连接池配置 ( jedisPoolConfig/jedisClusterConfig) 创建用于管理 Jedis 实例的容器 JedisCommandsInstanceContainer
。
1 | public abstract class AbstractRedisBolt extends BaseTickTupleAwareRichBolt { |
JedisCommandsInstanceContainer
的 build()
方法如下,实际上就是创建 JedisPool 或 JedisCluster 并传入容器中。
1 | public static JedisCommandsInstanceContainer build(JedisPoolConfig config) { |
3.2 RedisStoreBolt和RedisLookupBolt
RedisStoreBolt
中比较重要的是 process 方法,该方法主要从 storeMapper 中获取传入 key/value 的值,并按照其存储类型 dataType
调用 jedisCommand 的对应方法进行存储。
RedisLookupBolt 的实现基本类似,从 lookupMapper 中获取传入的 key 值,并进行查询操作。
1 | public class RedisStoreBolt extends AbstractRedisBolt { |
3.3 JedisCommands
JedisCommands 接口中定义了所有的 Redis 客户端命令,它有以下三个实现类,分别是 Jedis、JedisCluster、ShardedJedis。Strom 中主要使用前两种实现类,具体调用哪一个实现类来执行命令,由传入的是 jedisPoolConfig 还是 jedisClusterConfig 来决定。
3.4 RedisMapper 和 TupleMapper
RedisMapper 和 TupleMapper 定义了 tuple 和 Redis 中的数据如何进行映射转换。
1. TupleMapper
TupleMapper 主要定义了两个方法:
getKeyFromTuple(ITuple tuple): 从 tuple 中获取那个字段作为 Key;
getValueFromTuple(ITuple tuple):从 tuple 中获取那个字段作为 Value;
2. RedisMapper
定义了获取数据类型的方法 getDataTypeDescription()
,RedisDataTypeDescription 中 RedisDataType 枚举类定义了所有可用的 Redis 数据类型:
1 | public class RedisDataTypeDescription implements Serializable { |
3. RedisStoreMapper
RedisStoreMapper 继承 TupleMapper 和 RedisMapper 接口,用于数据存储时,没有定义额外方法。
4. RedisLookupMapper
RedisLookupMapper 继承 TupleMapper 和 RedisMapper 接口:
- 定义了 declareOutputFields 方法,声明输出的字段。
- 定义了 toTuple 方法,将查询结果组装为 Storm 的 Values 的集合,并用于发送。
下面的例子表示从输入 Tuple
的获取 word
字段作为 key,使用 RedisLookupBolt
进行查询后,将 key 和查询结果 value 组装为 values 并发送到下一个处理单元。
1 | class WordCountRedisLookupMapper implements RedisLookupMapper { |
5. RedisFilterMapper
RedisFilterMapper 继承 TupleMapper 和 RedisMapper 接口,用于查询数据时,定义了 declareOutputFields 方法,声明输出的字段。如下面的实现:
1 |
|
四、自定义RedisBolt实现词频统计
4.1 实现原理
自定义 RedisBolt:主要利用 Redis 中哈希结构的 hincrby key field
命令进行词频统计。在 Redis 中 hincrby
的执行效果如下。hincrby 可以将字段按照指定的值进行递增,如果该字段不存在的话,还会新建该字段,并赋值为 0。通过这个命令可以非常轻松的实现词频统计功能。
1 | HSET myhash field 5 |
4.2 项目结构
4.3 自定义RedisBolt的代码实现
1 | /** |
4.4 CustomRedisCountApp
1 | /** |