Leveling Up with Kafka Streams (Reduce and Aggregate)

Reduce is comparatively simpler than aggregate, but aggregate can do more.

Key points

  1. The concepts of reduce and aggregate
  2. Solving a practical requirement
  3. How to reduce
  4. How to aggregate

Reduce and Aggregate Concepts

1. Reduction is the reduce operation: accumulating values, typically into a running sum. The same idea appears in today's Python as well as in the newer JDK features (the Stream API).

2. Aggregation is the aggregate operation: gathering values and merging them together. In general, reduce and aggregate go hand in hand as a perfect pair.
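As a concrete reference for point 1, here is what reduce looks like in the JDK Stream API; a minimal sketch, plain Java with nothing Kafka-specific:

import java.util.stream.Stream;

public class ReduceDemo {
    public static void main(String[] args) {
        // reduce repeatedly combines two values of one type into one value of the same type
        int totalShares = Stream.of(1000, 2500, 700).reduce(0, Integer::sum);
        System.out.println(totalShares); // 4200
    }
}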


Solving a Practical Requirement

  1. Stocks trade continuously; keep a running total of shares traded since trading began.
  2. Once those totals exist, find the top 5 stocks by trading volume (ranked by shares traded).

How to Reduce (Code Example)

Build an ordinary customer transaction record:

public class StockTransaction {
    // ticker symbol
    private String symbol;
    // sector
    private String sector;
    // industry classification
    private String industry;
    // shares traded
    private int shares;
    // share price
    private double sharePrice;
    // customer id
    private String customerId;
    // transaction time
    private Date transactionTimestamp;
    // whether the trade was a purchase
    private boolean purchase;

    // getters and setters omitted for brevity; ShareVolume.buildInstance
    // and the test producer below rely on them
}

Build a record class that captures the trading volume:

public class ShareVolume {
    // ticker symbol
    private String symbol;
    // shares traded
    private int shares;
    // industry classification
    private String industry;

    public String getSymbol() {
        return symbol;
    }
    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }
    public int getShares() {
        return shares;
    }
    public void setShares(int shares) {
        this.shares = shares;
    }
    public String getIndustry() {
        return industry;
    }
    public void setIndustry(String industry) {
        this.industry = industry;
    }

    public ShareVolume() {
    }

    public ShareVolume(String symbol, int shares, String industry) {
        this.symbol = symbol;
        this.shares = shares;
        this.industry = industry;
    }

    // build a ShareVolume from a StockTransaction
    public static ShareVolume buildInstance(StockTransaction st) {
        return new ShareVolume(st.getSymbol(), st.getShares(), st.getIndustry());
    }

    // combine two ShareVolumes into a running total
    public static ShareVolume sum(ShareVolume s1, ShareVolume s2) {
        ShareVolume sv = new ShareVolume();
        sv.setIndustry(s2.getIndustry());
        sv.setSymbol(s1.getSymbol());
        sv.setShares(s1.getShares() + s2.getShares());
        return sv;
    }
}

Build the corresponding Serde for each class:

public class ShareVolumeSerde extends WrapperSerde<ShareVolume> {

    public ShareVolumeSerde() {
        super(new JsonSerializer<>(), new JsonDeserializer<>(ShareVolume.class));
    }
}
-------------------
public class StockTransactionSerde extends WrapperSerde<StockTransaction> {

    public StockTransactionSerde() {
        super(new JsonSerializer<>(), new JsonDeserializer<>(StockTransaction.class));
    }
}

Build the stream application

First, a look at the Reducer interface:

public interface Reducer<V> {

    /**
     * Merge two values into one.
     * Aggregate the two given values into a single one.
     *
     * @param value1 the first value for the aggregation
     * @param value2 the second value for the aggregation
     * @return the aggregated value
     */
    V apply(final V value1, final V value2);
}
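Note that ShareVolume.sum defined earlier has exactly this shape (two ShareVolumes in, one out), so it can be passed as a method reference wherever a Reducer<ShareVolume> is expected. A quick illustration with made-up values:

import org.apache.kafka.streams.kstream.Reducer;

public class ReducerDemo {
    public static void main(String[] args) {
        // ShareVolume.sum(ShareVolume, ShareVolume) matches V apply(V value1, V value2)
        Reducer<ShareVolume> reducer = ShareVolume::sum;

        ShareVolume first = new ShareVolume("好当家", 1000, "生产类");
        ShareVolume second = new ShareVolume("好当家", 500, "生产类");
        System.out.println(reducer.apply(first, second).getShares()); // 1500
    }
}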

The stream application code:

public class RG_Stream {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "KTable-aggregations");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KTable-aggregations-id");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "KTable-aggregations-client");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "30000");
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1");
        props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "10000");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new StockTransactionSerde().getClass().getName());
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

        // build the stream configuration
        StreamsConfig streamsConfig = new StreamsConfig(props);
        // create the stream builder
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        // re-key each record by its ticker symbol
        KeyValueMapper<String, ShareVolume, String> classKey1 = (key, sharevolume) -> sharevolume.getSymbol();
        KStream<String, ShareVolume> shareStream = streamsBuilder
                .stream("STTOPIC", Consumed.with(Serdes.String(), new StockTransactionSerde()))
                .mapValues(st -> ShareVolume.buildInstance(st));

        shareStream.print(Printed.<String, ShareVolume>toSysOut().withLabel("stock info"));
        KTable<String, ShareVolume> shareKTable = shareStream
                .selectKey(classKey1)
                .groupByKey(Serialized.with(Serdes.String(), new ShareVolumeSerde()))
                .reduce(ShareVolume::sum);
        shareKTable.toStream().print(Printed.<String, ShareVolume>toSysOut().withLabel("share-volume total updates"));

        // start the stream
        final KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), streamsConfig);

        final CountDownLatch latch = new CountDownLatch(1);

        Runtime.getRuntime().addShutdownHook(new Thread("streams-jvm-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        try {
            streams.start();
            latch.await();
        } catch (Exception e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

Note: inside the stream we configure consumption to start from the earliest offset; this setting takes precedence over the initial configuration.

Testing the code

  1. Start ZooKeeper
  2. Start Kafka
  3. Create the topic "STTOPIC" (a programmatic alternative is sketched after this list)
  4. Start the stream application
  5. Send simulated data (the producer below)
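For step 3, the topic is normally created with the kafka-topics script; alternatively, here is a sketch using the Java AdminClient. The topic name and broker address match the stream code above; one partition and replication factor 1 are assumptions suited to a local single-broker setup:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // one partition, replication factor 1: fine for a local single-broker test
            NewTopic topic = new NewTopic("STTOPIC", 1, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}

Step 5 is handled by the producer below.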
public class TestProducer2 {
    public static void main(String[] args) {
        // simulated data
        StockTransaction record = new StockTransaction();
        record.setSymbol("好当家");
        record.setSector("食品");
        record.setIndustry("生产类");
        record.setShares(1000);
        record.setSharePrice(5.00);
        record.setCustomerId("001");
        record.setTransactionTimestamp(new Date());
        record.setPurchase(true);

        // configure the producer
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "1");
        properties.put("retries", "3");
        properties.put("compression.type", "snappy");
        KafkaProducer<String, StockTransaction> kp = new KafkaProducer<String, StockTransaction>(
                properties, new StringSerializer(), new JsonSerializer<StockTransaction>());
        kp.send(new ProducerRecord<String, StockTransaction>("STTOPIC", record));
        kp.close();
    }
}

The result:

(screenshot: console output showing the running share-volume totals)

Even if the program is stopped, after a restart the computation still starts over from the beginning.

How to Aggregate (Code Example)

Before we aggregate: reduce is really just one form of aggregate. A reduce operation must produce a result of the same type as its inputs; an aggregate also combines values, but it may return a completely different type, as the sketch below shows.
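Here, reusing shareStream, classKey1, and ShareVolumeSerde from the reduce example above, the same grouped stream is folded into a plain Long running total per symbol, something reduce's same-type signature cannot express (a sketch, not part of the original program):

// aggregate may change the value type: ShareVolume in, Long out
KTable<String, Long> totalShares = shareStream
        .selectKey(classKey1)
        .groupByKey(Serialized.with(Serdes.String(), new ShareVolumeSerde()))
        .aggregate(
                () -> 0L,                                      // initializer: the starting total
                (symbol, sv, total) -> total + sv.getShares(), // adder: fold each value in
                Materialized.with(Serdes.String(), Serdes.Long()));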

The new requirement: produce the top 5 stocks by trading volume, in descending order.

Build a descending-order, fixed-size priority queue, plus its Serde:

public class FixedSizePriorityQueue<T> {

    private TreeSet<T> inner;

    private int maxSize;

    public FixedSizePriorityQueue(Comparator<T> comparator, int maxSize) {
        this.inner = new TreeSet<>(comparator);
        this.maxSize = maxSize;
    }

    public FixedSizePriorityQueue<T> add(T element) {
        inner.add(element);
        if (inner.size() > maxSize) {
            // evict the element that sorts last (the smallest, given a descending comparator)
            inner.pollLast();
        }
        return this;
    }

    public FixedSizePriorityQueue<T> remove(T element) {
        if (inner.contains(element)) {
            inner.remove(element);
        }
        return this;
    }

    public Iterator<T> iterator() {
        return inner.iterator();
    }
}
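A quick sanity check of the queue's behavior, with hypothetical symbols and volumes for illustration; the descending comparator keeps the five largest entries and pollLast evicts the smallest on overflow:

import java.util.Comparator;
import java.util.Iterator;

public class QueueDemo {
    public static void main(String[] args) {
        Comparator<ShareVolume> byDescendingShares = (s1, s2) -> s2.getShares() - s1.getShares();
        FixedSizePriorityQueue<ShareVolume> top5 = new FixedSizePriorityQueue<>(byDescendingShares, 5);

        int[] volumes = {1000, 400, 2500, 800, 1200, 300};
        for (int i = 0; i < volumes.length; i++) {
            top5.add(new ShareVolume("SYM" + i, volumes[i], "demo"));
        }

        // the sixth add evicts the smallest entry (300); prints 2500, 1200, 1000, 800, 400
        Iterator<ShareVolume> it = top5.iterator();
        while (it.hasNext()) {
            ShareVolume sv = it.next();
            System.out.println(sv.getSymbol() + ":" + sv.getShares());
        }
    }
}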

The improved stream application

public class RG_Stream {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "KTable-aggregation");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KTable-aggregations-i");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "KTable-aggregations-clien");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "30000");
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1");
        props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "10000");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new StockTransactionSerde().getClass().getName());
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

        // build the stream configuration
        StreamsConfig streamsConfig = new StreamsConfig(props);
        // create the stream builder
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        /**
         * reduce, as before
         */
        KeyValueMapper<String, ShareVolume, String> classKey1 = (key, sharevolume) -> sharevolume.getSymbol();
        KStream<String, ShareVolume> shareStream = streamsBuilder
                .stream("ST2", Consumed.with(Serdes.String(), new StockTransactionSerde()))
                .mapValues(st -> ShareVolume.buildInstance(st));

        shareStream.print(Printed.<String, ShareVolume>toSysOut().withLabel("stock info"));
        KTable<String, ShareVolume> shareKTable = shareStream
                .selectKey(classKey1)
                .groupByKey(Serialized.with(Serdes.String(), new ShareVolumeSerde()))
                .reduce(ShareVolume::sum);
        shareKTable.toStream().print(Printed.<String, ShareVolume>toSysOut().withLabel("share-volume total updates"));

        /**
         * aggregate
         */
        Comparator<ShareVolume> comparator = (s1, s2) -> s2.getShares() - s1.getShares();

        // render the queue as a numbered string: "1)SYMBOL:shares 2)SYMBOL:shares ..."
        ValueMapper<FixedSizePriorityQueue, String> valueMapper =
                fpq -> {
                    StringBuilder builder = new StringBuilder();
                    Iterator<ShareVolume> iterator = fpq.iterator();
                    int counter = 1;
                    while (iterator.hasNext()) {
                        ShareVolume stockVolume = iterator.next();
                        if (stockVolume != null) {
                            builder.append(counter++).append(")").append(stockVolume.getSymbol())
                                    .append(":").append(NumberFormat.getInstance().format(stockVolume.getShares())).append(" ");
                        }
                    }
                    return builder.toString();
                };

        // re-key: generate a new key-value pair keyed by industry
        KTable<String, String> fixKTable = shareKTable
                .groupBy((k, v) -> KeyValue.pair(v.getIndustry(), v), Serialized.with(Serdes.String(), new ShareVolumeSerde()))
                .aggregate(
                        // the initializer must build a fresh queue per aggregate;
                        // sharing a single instance would mix all industries together
                        () -> new FixedSizePriorityQueue<ShareVolume>(comparator, 5),
                        (k, v, agg) -> agg.add(v),    // adder
                        (k, v, agg) -> agg.remove(v), // subtractor
                        Materialized.with(Serdes.String(), new FixedSizePriorityQueueSerde()))
                .mapValues(valueMapper);

        // peek is handy for logging: .peek((k, v) -> System.out.println("key=" + k + " value=" + v));
        fixKTable.toStream()
                .print(Printed.<String, String>toSysOut().withLabel("stock info"));

        // start the stream
        final KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), streamsConfig);

        final CountDownLatch latch = new CountDownLatch(1);

        Runtime.getRuntime().addShutdownHook(new Thread("streams-jvm-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        try {
            streams.start();
            latch.await();
        } catch (Exception e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

Extra supporting work: add a serialization adapter for the complex object

public class FixedSizePriorityQueueAdapter extends TypeAdapter<FixedSizePriorityQueue<ShareVolume>> {

    private Gson gson = new Gson();

    /**
     * serialize the object
     */
    @Override
    public void write(JsonWriter writer, FixedSizePriorityQueue<ShareVolume> value) throws IOException {

        if (value == null) {
            writer.nullValue();
            return;
        }

        Iterator<ShareVolume> iterator = value.iterator();
        List<ShareVolume> list = new ArrayList<>();
        while (iterator.hasNext()) {
            ShareVolume stockTransaction = iterator.next();
            if (stockTransaction != null) {
                list.add(stockTransaction);
            }
        }
        writer.beginArray();
        for (ShareVolume transaction : list) {
            writer.value(gson.toJson(transaction));
        }
        writer.endArray();
    }

    /**
     * deserialize an instance
     */
    @Override
    public FixedSizePriorityQueue<ShareVolume> read(JsonReader reader) throws IOException {
        List<ShareVolume> list = new ArrayList<>();
        reader.beginArray();
        while (reader.hasNext()) {
            list.add(gson.fromJson(reader.nextString(), ShareVolume.class));
        }
        reader.endArray();

        Comparator<ShareVolume> c = (c1, c2) -> c2.getShares() - c1.getShares();
        FixedSizePriorityQueue<ShareVolume> fixedSizePriorityQueue = new FixedSizePriorityQueue<>(c, 5);

        for (ShareVolume transaction : list) {
            fixedSizePriorityQueue.add(transaction);
        }

        return fixedSizePriorityQueue;
    }
}

Register it in the deserializer (revised)

public class JsonDeserializer<T> implements Deserializer<T> {

    private Gson gson;
    private Class<T> deserializedClass;
    private Type reflectionTypeToken;

    public JsonDeserializer(Class<T> deserializedClass) {
        this.deserializedClass = deserializedClass;
        init();
    }

    public JsonDeserializer(Type reflectionTypeToken) {
        this.reflectionTypeToken = reflectionTypeToken;
        init();
    }

    private void init() {
        GsonBuilder builder = new GsonBuilder();
        builder.registerTypeAdapter(FixedSizePriorityQueue.class, new FixedSizePriorityQueueAdapter().nullSafe());
        gson = builder.create();
    }

    public JsonDeserializer() {
        // also build the Gson instance here, so a default-constructed
        // deserializer (configured later via configure()) is usable
        init();
    }

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> map, boolean b) {
        if (deserializedClass == null) {
            deserializedClass = (Class<T>) map.get("serializedClass");
        }
    }

    @Override
    public T deserialize(String s, byte[] bytes) {
        if (bytes == null) {
            return null;
        }

        Type deserializeFrom = deserializedClass != null ? deserializedClass : reflectionTypeToken;

        return gson.fromJson(new String(bytes), deserializeFrom);
    }

    @Override
    public void close() {

    }
}
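The JsonSerializer half of the pair is never shown in the post; note that the write side must register the same type adapter, or the queue would be serialized in a layout the adapter's read method cannot parse. A minimal sketch under those assumptions:

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

public class JsonSerializer<T> implements Serializer<T> {

    private Gson gson;

    public JsonSerializer() {
        GsonBuilder builder = new GsonBuilder();
        // mirror the deserializer: both sides must agree on the queue's JSON layout
        builder.registerTypeAdapter(FixedSizePriorityQueue.class,
                new FixedSizePriorityQueueAdapter().nullSafe());
        gson = builder.create();
    }

    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public byte[] serialize(String topic, T t) {
        return gson.toJson(t).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
    }
}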

The revised FixedSizePriorityQueueSerde

public class FixedSizePriorityQueueSerde extends WrapperSerde<FixedSizePriorityQueue> {

    public FixedSizePriorityQueueSerde() {
        super(new JsonSerializer<>(), new JsonDeserializer<>(new TypeToken<FixedSizePriorityQueue<ShareVolume>>() {}.getType()));
    }
}

Note: ShareVolume should also implement Comparable.

A follow-up code improvement: consider revising the comparator; subtraction-based comparison can overflow for large share counts (Integer.compare is safer).

The test passes:

(screenshot: console output of the top-5 rankings per industry)

Shut down and restart:

(screenshot: console output after the restart)

The computed results remain consistent.