开发Kafka_Streams晋阶之路(KTable)

只有清晰的明白流和表,你才有可能明白聚合以及开窗

知识要点

  1. 流与表的关系
  2. 记录流概念
  3. 更新记录和日志
  4. 2者之间的对比
  5. 工作原理

流与表的关系

在生活中,我们无时无刻都在产生一些事件,每个事件上面都可以看成在历史的记录中不断的添加一笔新的操作.而这些记录与其他的记录无关,都是独立的。

记录流的概念

流被定义为无限的事件序列

例如股票市场中,每只股票的报价都是一个离散时间,它们彼此之间没有任何关联。即使一家公司股票有多次报价。在某个时候我们称作为记录流,如图:

IMAGE

每个时间就是一个插入项,为表中每个插入项建立一个地增量为1的key

更定记录和变更日志

如果讲事件流看成是一个日志,更新流可以看成是一个不断在变更的日志。

如上图所示,如果以股票名字作为主键。那么动作发生将是更新操作。

注意:日志和变更日志都是讲记录追加到文件末尾,在日志中可以看到所有的记录.但是在变更日志中,对任何一个给定键只保留最新记录。

对于变更日志和更新流来说,我们用KTable进行抽象的表现与描述

2者之间的对比

通过代码我们来进行呈现说明

构建股票信息类StockMsg

1
2
3
4
5
6
public class StockMsg {
//股票价格
private double stockPrice;
//股票名字
private String stockName;
}

改进通用化序列器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class WrapperSerde<T>  implements  Serde<T>{

private JsonSerializer<T> serializer;
private JsonDeserializer<T> deserializer;



public WrapperSerde() {
super();
// TODO Auto-generated constructor stub
}

public WrapperSerde(JsonSerializer<T> serializer, JsonDeserializer<T> deserializer) {
this.serializer = serializer;
this.deserializer = deserializer;
}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// TODO Auto-generated method stub

}

@Override
public void close() {
// TODO Auto-generated method stub

}

@Override
public Serializer<T> serializer() {
// TODO Auto-generated method stub
return serializer;
}

@Override
public Deserializer<T> deserializer() {
// TODO Auto-generated method stub
return deserializer;
}

}

类序列器的构建

1
2
3
4
5
6
public class StockSerde extends WrapperSerde<StockMsg>{

public StockSerde() {
super(new JsonSerializer<>(), new JsonDeserializer<>(StockMsg.class));
}
}

构建流式程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class KTVSKS_Stream {
public static void main(String[] args) {
Properties props = new Properties();
// stream流的名字
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "KStreamVSKTable_app");
// 消费者组名字
props.put(ConsumerConfig.GROUP_ID_CONFIG, "KStreamVSKTable_group");
// 消费者名字
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "KStreamVSKTable_client");
// 每次消费最新的数据
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// 自动提交偏移
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "30000");
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "15000");
// 服务器地址
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 并行的线程数
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1");
props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "10000");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new StockSerde().getClass().getName());
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
/**
* 构建流
*/
StreamsConfig streamsConfig = new StreamsConfig(props);
// 创建流的构造器
StreamsBuilder streamsBuilder = new StreamsBuilder();
// 分别创建KStream和KTable实例并打印
KeyValueMapper<String, StockMsg, String> classKey1 = (key, stock) -> stock.getStockName();
KStream<String, StockMsg> stockStream = streamsBuilder.stream("STOCKTOPIC");
KStream<String, StockMsg> ssKStream = stockStream.selectKey(classKey1);
ssKStream.to("STOCKTOPICTABLE");


ssKStream.print(Printed.<String, StockMsg>toSysOut().withLabel("股市交易流"));

KTable<String, StockMsg> stocKTable = streamsBuilder.table("STOCKTOPICTABLE");


stocKTable.toStream().print(Printed.<String, StockMsg>toSysOut().withLabel("股市交易表"));

// 开启流
final KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), streamsConfig);

final CountDownLatch latch = new CountDownLatch(1);

Runtime.getRuntime().addShutdownHook(new Thread("streams-jvm-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Exception e) {
System.exit(1);
}
System.exit(0);

}
}

模拟生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main(String[] args) {
//模拟数据
StockMsg record = new StockMsg();
record.setStockName("好当家");
record.setStockPrice(3.09);
//配置生产者
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("acks", "1");
properties.put("retries", "3");
properties.put("compression.type", "snappy");
KafkaProducer<String, StockMsg> kp = new KafkaProducer<String,StockMsg>(properties,new StringSerializer(),new JsonSerializer<StockMsg>());
kp.send(new ProducerRecord<String, StockMsg>("STOCKTOPIC", record));
kp.send(new ProducerRecord<String, StockMsg>("STOCKTOPICTABLE", record));
kp.close();
}

测试

  1. 启动zookeeper
  2. 启动kafka
  3. 创建主题–STOCKTOPIC 和 STOCKTOPICTABLE
  4. 启动流程序 (需要指定key值)
  5. 模拟数据发送

IMAGE

注意:表始终在更新以最新的标准被基准,当然前提是流有进行过选择对应的key值

工作原理

1.在创建KTable的时候,同时在后台创建了一个追踪流状态的状态存储,从而创建了一个更新流。创建后会有一个内容名称,但是 不能显式的进行交互式访问。但是KTable通过使用Materialized(计量)可以进行显式的查询

2.KTable何时进行更新,并发往下游处理器.

因素

  1. 较高的数据流入速率将增加发送更新记录的频率
  2. 不同键越多
  3. 通过配置cache.max.bytes.buffering以及commit.intrval.ms达到更新的设置

cache.max.bytes.buffering设置缓存缓冲大小

设置该缓存用于删除具有相同键重复的更新记录。使用持久化存储时就可以显著提升性能

commit.intrval.ms设置提交时间间隔

提交间隔参数用来指定保存数据的频率,它会强制刷新,将最新的记录更新,并发送到下游

注意:默认的提交时间是30秒以及默认10M缓存,当然在上线之前,肯定要平衡大小和时间以及处理的线程数。这个是需要进行考量的。