归约比聚合相对容易,但是聚合可以做更多的事情
知识点
- 归约和聚合概念
- 解决实际需求
- 如何归约
- 如何聚合
归约和聚合概念
1.归约即是reduce,代表着累加求和,无论是在现在的python已经jdk新特性中都有体现
2.聚合即是aggregate,代表着聚集合并在一起的操作,一般来说归约和聚合在一起是一种完美的搭配
解决实际需求
- 比如股票在不断的交易。累计计算从开始成交的总和
- 总和有的情况下,获取交易量前5.(按照成交量决定)
如何归约(代码示例)reduce
构建一个普通的用户交易记录
1 | public class StockTransaction { |
构建一个提取交易量的记录类
1 | public class ShareVolume { |
分别构建对应的序列化器
1 | public class ShareVolumeSerde extends WrapperSerde<ShareVolume> { |
构建流程序
先看下reduce接口
1 | public interface Reducer<V> { |
流程序代码示例
1 | public class RG_Stream { |
注意:在流内部设置从最先开始消费,这个配置大于初始配置。
代码测试
- 开起zookeeper
- 开起kafka
- 创建主题”STTOPIC”
- 启动流程序
- 模拟数据发送
1 | public class TestProducer2 { |
结论如下:
即使是断掉程序,在重启程序后,计算依然保持从头开始
如何聚合(代码示例)aggregate
在聚合之前,归约是聚合的一种形式。归约操作是将产生相同类型的对象,聚合也是对结果求和。但是可以返回不同的类型
现在有一个需求,获取股票交易量前5的,降序产生
构建一个降序的优先级队列并构建序列化器
1 | public class FixedSizePriorityQueue<T> { |
改进流式程序
1 | public class RG_Stream { |
额外附属的操作(添加对复杂对象的序列化适配器)
1 | public class FixedSizePriorityQueueAdapter extends TypeAdapter<FixedSizePriorityQueue<ShareVolume>> { |
注册到反序列化器中(修订)
1 | public class JsonDeserializer<T> implements Deserializer<T> { |
FixedSizePriorityQueueSerde修订
1 | public class FixedSizePriorityQueueSerde extends WrapperSerde<FixedSizePriorityQueue>{ |
注意: ShareVolume同时实现compareble
后续代码改进:可以考虑修订comparetor
测试代码通过
关机重启
数据计算保持一致