kafka实时计算出版效果.
springboot结合kafkaStream流推送消息Echart图形展示(3-4)
- 实现要求(学生名字,学生年龄,班级) (班级多少人,年龄分布人数,实时)
 - 构建流式程序2个步骤(一个处理统计班级人数,一个统计年龄分布人数)
 - 从studentMsg主题源获取数据,通过2个处理器处理输出到classCount和ageCount主题源中
 - Echarts采取使用南丁玫瑰图
 - 存放数据采取使用ConcurrentSkipListMap<>();
 
构建DAG图

构建一个普通的maven项目
1  | <dependencies>  | 
数据模拟
在producer中随机录入数据
1  | <script type="text/javascript">  | 
构建一个模块编写流式程序
- 项目结构
 - 构建必要的序列化器
 - 创建流式程序选择必要的序列器
 - 流式程序设置必要的序列器
 
项目结构

必要的序列化器代码
序列化器
1  | public class StudentSer implements Closeable, AutoCloseable, Serializer<Student>{  | 
反序列化器
1  | public class StudentDser implements Closeable, AutoCloseable, Deserializer<Student> {  | 
序列化组合
1  | public class StudentSerde implements Serde<Student>{  | 
流式计算程序代码
1  | public class ClassCountStream {  | 
操作步骤
- 启动zookeeper
 - 启动kafka
 - 启动计算程序
 - 启动消费者 (消费者序列化分别为String和Long)
 - 启动生产者
 
通过websocket推送至前台
- 推送详见前期代码
 - 消费监听数据源需要更改value值,以及去重判断
 
消费者代码如下
1  | @Component  | 
最终结果如下:
