数据本来平淡无奇,一旦有了思想,数据变成为了有想法的一种境界状态
知识点
- 状态在流中的重要性
- 状态存储阅读历史,展望未来
- 分区一致.数据正确
- 状态存储的选择前置条件
- 使用状态存储
- 代码示例
状态在流中的重要性
- 什么是状态?
状态是人赋予的主观意识判断。本身数据是无状态的。举个例子:股票的买卖交易,每天的交易买入卖出是一件很正常的事情。
比如:9点30分买入1000只股票。10点买入1000只股票。 这个数据流现在无状态的。
但是如果在10点30分的时候如果该只股票出现了重大利好重组交易,数据从无变为了有状态。而这时候我们的主观意识已经在判断,是否前面这个买卖交易是否正常,这也可以说明这个是我们的价值观
- 流是否需要状态?
一个事件本身产生的流没有特别的特殊性,但是如果产生一些额外的上下文,那么可能会错过一些关键的机会,甚至于你可能会以全新的视角来看待这个事情.
很多时候流式处理意味着:彼此之间没有关联。源源不断的数据,当发生时就一定要加以时间处理。而状态的概念可能会产生静态的资源映像,比如对应到数据库中。
数据流的变化速度往往比数据表更新的更快更频繁。有些情况下,离散的数据已经携带了足够多的数据。但是通常情况下,数据流需要从某类存储的数据来加以丰富。
将状态操作应用到kafka_Stream中
在前期的奖励节点中.对于每次客户的消费累计的奖励点数未做任何要求,如果假定现在用户的每一次消费达到一点奖励点数,我们会附上额外的动作,也是将本来不带状态的值,变为有状态的意义
1 | public class RewardAccumulator { |
转变为
1 | public class RewardAccumulator { |
注意:设置奖励点的总数代码为:
1 | public void addRewardPoints(int previousTotalPoints) { |
状态存储阅读历史,展望未来
如何进行存储
- 在kafkaStream中提供了一个基本的有状态的函数 transformvalues()
- 提供一个值转换器,转换器接口为ValueTransformer<V,R>,设置状态储存
构建一个值转换器代码示例
1 | public class PurchaseRewardTransformer implements ValueTransformer<PurchaseRecord, RewardAccumulator>{ |
分区一致,数据正确
在kafkaStreams中在没有指定分区的时候,是按照轮询进行分区。而分区中有着对应的StreamTask.每个Task中有自己不同状态存储.对于上面我们使用存储,有可能客户的交易信息不会在同一个分区中.那么这个时候只有指定到相同的分区进行解决
使用流分区器解决
- 通过kafkaStream中的through可以创建一个中间主题,达到无缝分区
- 通过自定义分区器,解决数据分配到不同地方的问题
自定义流分区器如下:
1 | public class RewardsStreamPartitioner implements StreamPartitioner<String, PurchaseRecord>{ |
状态存储的选择前置条件
- 数据本地化
- 故障恢复和容错
数据本地化
数据本地化对性能至关重要,通过一个流式程序处理百万级以上的数据,即使很小的网络延迟也会产生巨大的影响
流式程序尽管需要状态但是不是绝对的必要,但是应该设计在本地。应用程序的每个服务器和节点都应该有一个单独的数据存储
进程和线程之间不共享,即便一个进程失败,但不会影响其它的进程和线程
故障恢复和容错
在kafkaStreams中每个处理器都它的本地存储和一个用于备份状态存储的变更日志主题
使用状态存储
- kafka添加状态存储使用stores类静态工厂创建storesupplier实例.
- 用于定制存储的附加类使用Meterialized(计量类)以及StoreBuilder类,高阶推荐用前者,低阶推荐后者。
- 除了本身这2个类之外,还分别提供了persistentKeyValueStore和lruMap,persistentWindowStore,persistentSeesionStore
状态存储容错以及改变日志主题
状态存储容错
- 所有的stateStoreSupplier默认都启用了日志,日志即是一个主题,该主题的作用是变更日志用来被封存储中的值,提供容错
配置变更日志主题
- 用于状态的变更日志采用压缩策略的主题.可以使用withLoggingEnabled进行配置
代码示例
注意:重写奖励类的equals方法
1 | public class StoreStream { |
代码测试观察结果
- 启动zookeeper
- 启动kafka
- 启动流
- 模拟数据
数据结果如下:(包含服务器断掉重连,依旧保持先前的记录)