kafka实时计算出版效果.
springboot结合kafkaStream流推送消息Echart图形展示(3-4)
- 实现要求(学生名字,学生年龄,班级) (班级多少人,年龄分布人数,实时)
- 构建流式程序2个步骤(一个处理统计班级人数,一个统计年龄分布人数)
- 从studentMsg主题源获取数据,通过2个处理器处理输出到classCount和ageCount主题源中
- Echarts采取使用南丁玫瑰图
- 存放数据采取使用ConcurrentSkipListMap<>();
构建DAG图
构建一个普通的maven项目
1 | <dependencies> |
数据模拟
在producer中随机录入数据
1 | <script type="text/javascript"> |
构建一个模块编写流式程序
- 项目结构
- 构建必要的序列化器
- 创建流式程序选择必要的序列器
- 流式程序设置必要的序列器
项目结构
必要的序列化器代码
序列化器
1 | public class StudentSer implements Closeable, AutoCloseable, Serializer<Student>{ |
反序列化器
1 | public class StudentDser implements Closeable, AutoCloseable, Deserializer<Student> { |
序列化组合
1 | public class StudentSerde implements Serde<Student>{ |
流式计算程序代码
1 | public class ClassCountStream { |
操作步骤
- 启动zookeeper
- 启动kafka
- 启动计算程序
- 启动消费者 (消费者序列化分别为String和Long)
- 启动生产者
通过websocket推送至前台
- 推送详见前期代码
- 消费监听数据源需要更改value值,以及去重判断
消费者代码如下
1 | @Component |
最终结果如下: