开发Kafka_Streams(3-4)

kafka实时计算出版效果.

springboot结合kafkaStream流推送消息Echart图形展示(3-4)

  1. 实现要求(学生名字,学生年龄,班级) (班级多少人,年龄分布人数,实时)
  2. 构建流式程序2个步骤(一个处理统计班级人数,一个统计年龄分布人数)
  3. 从studentMsg主题源获取数据,通过2个处理器处理输出到classCount和ageCount主题源中
  4. ECharts采用南丁格尔玫瑰图(Nightingale Rose Chart)展示
  5. 存放数据采取使用ConcurrentSkipListMap<>();

构建DAG图

IMAGE

构建一个普通的maven项目

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<dependencies>
<!-- Logging: log4j backend bridged through the slf4j API -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
<!-- Kafka Streams DSL used by ClassCountStream -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.1.0</version>
</dependency>
<!-- Gson: JSON (de)serialization of Student in the custom Serde -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
</dependencies>

数据模拟

在producer中随机录入数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<script type="text/javascript">
function randomData(){
var x = 100;
var y = 0;
var z = 10;
var studentAge = (parseInt(Math.random() * (x - y + 1) + y)).toString();
var studentClassNo = "00"+parseInt(Math.random() * (z - y + 1) + z);
$.ajax({
url:'${pageContext.request.contextPath}/mock',
type:'post',
dataType:'json',
data:{
studentName:'小小王',
studentAge:studentAge,
studentClassNo:studentClassNo
},
success:function(resp){
}
})
}

$(function(){

$("button").click(function(){
t = setInterval("randomData()", 1000);
})
})

构建一个模块编写流式程序

  1. 项目结构
  2. 构建必要的序列化器
  3. 创建流式程序并选择必要的序列化器
  4. 流式程序中设置必要的序列化器

项目结构

IMAGE

必要的序列化器代码

序列化器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * Kafka value serializer that encodes a {@code Student} as UTF-8 JSON bytes.
 */
public class StudentSer implements Closeable, AutoCloseable, Serializer<Student> {

    // StandardCharsets.UTF_8 is guaranteed to exist; avoids the Charset.forName lookup.
    private static final Charset CHARSET = StandardCharsets.UTF_8;
    // Gson instances are thread-safe, so one shared instance is sufficient.
    private static final Gson GSON = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean isKey) {
        // No configuration needed.
    }

    /**
     * Serializes the student to a UTF-8 encoded JSON byte array.
     *
     * @param topic   topic the record is bound for (unused)
     * @param student value to serialize; may be null
     * @return JSON bytes, or null when {@code student} is null (Kafka convention)
     */
    @Override
    public byte[] serialize(String topic, Student student) {
        if (student == null) {
            return null;
        }
        return GSON.toJson(student).getBytes(CHARSET);
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}

反序列化器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
 * Kafka value deserializer that decodes UTF-8 JSON bytes into a {@code Student}.
 */
public class StudentDser implements Closeable, AutoCloseable, Deserializer<Student> {

    // StandardCharsets.UTF_8 is guaranteed to exist; avoids the Charset.forName lookup.
    private static final Charset CHARSET = StandardCharsets.UTF_8;
    // Gson instances are thread-safe, so one shared instance is sufficient.
    private static final Gson GSON = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean isKey) {
        // No configuration needed.
    }

    /**
     * Deserializes UTF-8 JSON bytes into a {@code Student}.
     *
     * @param topic topic the record came from (unused)
     * @param bytes payload; may be null for tombstone records
     * @return the decoded student, or null when {@code bytes} is null
     * @throws IllegalArgumentException if the payload is not valid JSON
     */
    @Override
    public Student deserialize(String topic, byte[] bytes) {
        if (bytes == null) {
            // Null payload (tombstone) maps to a null value, per Kafka convention.
            return null;
        }
        try {
            String json = new String(bytes, CHARSET);
            return GSON.fromJson(json, Student.class);
        } catch (Exception e) {
            // Preserve the cause so the malformed payload can be diagnosed.
            throw new IllegalArgumentException("Error reading bytes", e);
        }
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}

序列化组合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
 * Serde for {@code Student} values: pairs the JSON serializer and
 * deserializer so the streams config can reference one class.
 */
public class StudentSerde implements Serde<Student> {

    private final StudentSer ser = new StudentSer();
    private final StudentDser deser = new StudentDser();

    @Override
    public Serializer<Student> serializer() {
        return ser;
    }

    @Override
    public Deserializer<Student> deserializer() {
        return deser;
    }

    /** Forwards configuration to both delegates. */
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        ser.configure(configs, isKey);
        deser.configure(configs, isKey);
    }

    /** Closes both delegates. */
    @Override
    public void close() {
        ser.close();
        deser.close();
    }
}

流式计算程序代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
 * Kafka Streams application that reads {@code Student} records from the
 * "studentMsg" topic and maintains two continuously updated counts:
 * per-class student counts on topic "classCount" and per-age counts on
 * topic "ageCount". Both output topics carry (String, Long) records.
 */
public class ClassCountStream {

    private static final Logger LOG = LoggerFactory.getLogger(ClassCountStream.class);

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "Count-Kafka-Streams-App");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Exactly-once processing so counts are not inflated by retries.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Values on "studentMsg" are JSON-encoded Student objects.
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StudentSerde.class);
        // Use wall-clock time rather than embedded record timestamps.
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
        StreamsConfig streamsConfig = new StreamsConfig(props);

        // Serdes for the (String key, Long count) output topics.
        Serde<String> stringSerde = Serdes.String();
        Serde<Long> longSerde = Serdes.Long();

        StreamsBuilder builder = new StreamsBuilder();

        // Single source stream shared by both aggregations.
        KStream<String, Student> students = builder.stream("studentMsg");

        // 1) Count students per class: re-key each record by class number,
        //    then count per key.
        KeyValueMapper<String, Student, String> byClassNo = (key, student) -> student.getStudentClassNo();
        KTable<String, Long> classCounts = students.selectKey(byClassNo)
                .groupByKey()
                .count();

        // 2) Count students per age. Records with a non-positive age are dropped
        //    (guards against placeholder values). NOTE(review): parseInt throws on
        //    non-numeric ages and would kill the stream thread -- confirm inputs.
        KeyValueMapper<String, Student, String> byAge = (key, student) -> student.getStudentAge();
        KTable<String, Long> ageCounts = students
                .filter((key, student) -> Integer.parseInt(student.getStudentAge()) > 0)
                .selectKey(byAge)
                .groupByKey()
                .count();

        // Materialize both changelogs to their output topics.
        classCounts.toStream().to("classCount", Produced.with(stringSerde, longSerde));
        ageCounts.toStream().to("ageCount", Produced.with(stringSerde, longSerde));

        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
        final CountDownLatch latch = new CountDownLatch(1);

        // Close the topology cleanly on JVM shutdown (e.g. Ctrl-C).
        Runtime.getRuntime().addShutdownHook(new Thread("streams-jvm-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Exception e) {
            // Don't exit silently: record why the application is going down.
            LOG.error("Streams application failed", e);
            System.exit(1);
        }
        System.exit(0);
    }
}

操作步骤

  1. 启动zookeeper
  2. 启动kafka
  3. 启动计算程序
  4. 启动消费者 (消费者序列化分别为String和Long)
  5. 启动生产者

通过websocket推送至前台

  1. 推送详见前期代码
  2. 消费监听数据源需要更改value值,以及去重判断

消费者代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/**
 * Consumes the aggregated count topics produced by the streams job and pushes
 * the accumulated results to the browser via the WebSocket endpoint.
 *
 * State is held in static ConcurrentSkipListMaps so entries stay sorted by key
 * and every push carries the full current picture, not just the latest delta.
 */
@Component
public class KafkaConsumer {

    private static final Logger LOG = LogManager.getLogger(KafkaConsumer.class);

    // classCount state: class label -> latest count, plus the JSON payload map.
    private static ConcurrentSkipListMap<String, Long> rs2 = new ConcurrentSkipListMap<>();
    private static ConcurrentSkipListMap<String, Object> rs3 = new ConcurrentSkipListMap<>();
    private static List<ClassMsg> rs = new ArrayList<>();

    // ageCount state: age label -> latest count, plus the JSON payload map
    // and the parallel name/value lists ECharts expects.
    private static ConcurrentSkipListMap<String, Object> rs6 = new ConcurrentSkipListMap<>();
    private static ConcurrentSkipListMap<String, Long> rs4 = new ConcurrentSkipListMap<>();
    private static List<String> rs001 = new ArrayList<>();
    private static List<Long> rs002 = new ArrayList<>();

    /**
     * Handles per-class counts from "classCount" and broadcasts the full class
     * distribution. "dataType" = 1 tells the front end which chart to update.
     */
    @KafkaListener(topics = { "classCount" })
    public void receiveClassCount(ConsumerRecord<?, ?> record) {
        String className = "班级" + record.key();
        Long count = (Long) record.value();
        rs2.put(className, count);
        rs3.put("dataType", 1);
        rs3.put("data1", rs2.keySet());
        // Upsert into the ClassMsg list so it stays de-duplicated. Relies on
        // ClassMsg.equals() matching entries by name -- TODO confirm.
        for (Map.Entry<String, Long> entry : rs2.entrySet()) {
            ClassMsg cms = new ClassMsg();
            cms.setName(entry.getKey());
            cms.setValue(entry.getValue());
            if (!rs.contains(cms)) {
                rs.add(cms);
            } else {
                rs.get(rs.indexOf(cms)).setValue(cms.getValue());
            }
        }
        rs3.put("data2", rs);
        try {
            WebSocketServer.sendInfo(JSON.toJSONString(rs3), "all");
        } catch (IOException e) {
            LOG.error("Failed to push classCount update over WebSocket", e);
        }
    }

    /**
     * Handles per-age counts from "ageCount" and broadcasts the full age
     * distribution as parallel label/value lists. "dataType" = 2.
     */
    @KafkaListener(topics = { "ageCount" })
    public void receiveClassCount1(ConsumerRecord<?, ?> record) {
        // Rebuild the parallel lists from scratch on every record.
        // NOTE(review): clearing shared lists is unsafe if listeners run
        // concurrently; the default single-threaded container is assumed --
        // verify the listener concurrency setting.
        rs001.clear();
        rs002.clear();
        String ageLabel = "年龄" + record.key();
        Long count = (Long) record.value();
        rs6.put("dataType", 2);
        rs4.put(ageLabel, count);
        for (Map.Entry<String, Long> entry : rs4.entrySet()) {
            rs001.add(entry.getKey());
            rs002.add(entry.getValue());
        }
        rs6.put("data1", rs001);
        rs6.put("data2", rs002);
        try {
            WebSocketServer.sendInfo(JSON.toJSONString(rs6), "all");
        } catch (IOException e) {
            LOG.error("Failed to push ageCount update over WebSocket", e);
        }
    }
}

最终结果如下:

IMAGE