The Road to Advanced Kafka Streams Development (Join Capabilities)

Joins mean we can observe time, and only with time do we gain insight.

Key points

  1. The purpose of joins: gaining insight
  2. The definition and classification of timestamps

The purpose of joins

Earlier, we used predicates (that is, filter conditions) to split a stream into two branches, for example key covers (钥匙套) and small hardware (小五金).

IMAGE

How can we join these two different streams back together, so that observing them side by side supports new requirements?

Requirements for a join

  1. Two or more streams
  2. A common key to use as the join condition

The code so far:

public class JoinStream {
    private static final Logger LOG = LoggerFactory.getLogger(JoinStream.class);

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SecondZmart-Kafka-Streams-App");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        // use wall-clock (processing) time
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
        // load the configuration
        StreamsConfig streamsConfig = new StreamsConfig(props);
        // build the serdes
        // built-in
        Serde<String> stringSerde = Serdes.String();
        // custom (could be refactored)
        JsonSerializer<PurchaseRecord> ps = new JsonSerializer<>();
        JsonDeserializer<PurchaseRecord> pds = new JsonDeserializer<>(PurchaseRecord.class);
        // assemble the custom serde
        Serde<PurchaseRecord> purchaseRecordSerde = Serdes.serdeFrom(ps, pds);
        // create the stream builder
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        // first processor: mask sensitive fields
        KStream<String, PurchaseRecord> purchaseRecordStream = streamsBuilder
                .stream("transactions", Consumed.with(stringSerde, purchaseRecordSerde))
                .mapValues(pr -> ActionUtil.mask(pr));

        // define the predicates, and select the key
        Predicate<String, PurchaseRecord> isOne =
                (key, purchaseRecord) -> purchaseRecord.getDepartMent().equalsIgnoreCase("钥匙套");

        Predicate<String, PurchaseRecord> isTwo =
                (key, purchaseRecord) -> purchaseRecord.getDepartMent().equalsIgnoreCase("小五金");

        // branch the stream, selecting the common key beforehand
        KStream<String, PurchaseRecord>[] branchesStream =
                purchaseRecordStream.selectKey((k, v) -> v.getFirstName() + v.getLastName())
                        .branch(isOne, isTwo);
    }
}

How to build a join

  1. Joining records requires a ValueJoiner&lt;V1, V2, R&gt; object. V1 and V2 are the two incoming values being joined; they must share the same key, although their types may differ. R is the type of the combined object you choose to return.

Designing the merged object

The object is designed as follows:

public class CorrelatedPurchase {

    private Date firstDate;
    private Date seconDate;
    private List<String> purchaseListItem;
    private double totalAmount;
}

The ValueJoiner implementation:

public class PurchaseJoiner implements ValueJoiner<PurchaseRecord, PurchaseRecord, CorrelatedPurchase> {

    @Override
    public CorrelatedPurchase apply(PurchaseRecord value1, PurchaseRecord value2) {
        // the null checks matter for leftJoin/outerJoin, where one side may be absent
        Date date1 = value1 != null ? value1.getPurchaseDate() : null;
        Date date2 = value2 != null ? value2.getPurchaseDate() : null;

        String purchaseName1 = value1 != null ? value1.getItemPurchased() : null;
        String purchaseName2 = value2 != null ? value2.getItemPurchased() : null;

        List<String> purchasedItem = new ArrayList<>();
        if (purchaseName1 != null) {
            purchasedItem.add(purchaseName1);
        }
        if (purchaseName2 != null) {
            purchasedItem.add(purchaseName2);
        }

        double price1 = value1 != null ? value1.getPrice() : 0.0;
        double price2 = value2 != null ? value2.getPrice() : 0.0;

        return ActionUtil.getNewFace(date1, date2, purchasedItem, price1 + price2);
    }
}

Performing the join

public class JoinStream {
    private static final Logger LOG = LoggerFactory.getLogger(JoinStream.class);

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SecondZmart-Kafka-Streams-App");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        // use wall-clock (processing) time
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
        // load the configuration
        StreamsConfig streamsConfig = new StreamsConfig(props);
        // build the serdes
        // built-in
        Serde<String> stringSerde = Serdes.String();
        // custom (could be refactored)
        JsonSerializer<PurchaseRecord> ps = new JsonSerializer<>();
        JsonDeserializer<PurchaseRecord> pds = new JsonDeserializer<>(PurchaseRecord.class);
        // assemble the custom serde
        Serde<PurchaseRecord> purchaseRecordSerde = Serdes.serdeFrom(ps, pds);
        // create the stream builder
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        // first processor: mask sensitive fields
        KStream<String, PurchaseRecord> purchaseRecordStream = streamsBuilder
                .stream("transactions", Consumed.with(stringSerde, purchaseRecordSerde))
                .mapValues(pr -> ActionUtil.mask(pr));

        // define the predicates, and select the key
        Predicate<String, PurchaseRecord> isOne =
                (key, purchaseRecord) -> purchaseRecord.getDepartMent().equalsIgnoreCase("钥匙套");

        Predicate<String, PurchaseRecord> isTwo =
                (key, purchaseRecord) -> purchaseRecord.getDepartMent().equalsIgnoreCase("小五金");

        // branch the stream, selecting the common key beforehand
        KStream<String, PurchaseRecord>[] branchesStream =
                purchaseRecordStream.selectKey((k, v) -> v.getFirstName() + v.getLastName())
                        .branch(isOne, isTwo);

        // join the streams
        KStream<String, PurchaseRecord> stream1 = branchesStream[0];
        KStream<String, PurchaseRecord> stream2 = branchesStream[1];
        /**
         * 1. Define the join window.
         * 2. Build the joiner.
         * 3. Call join to produce the joined stream. Joined carries the key serde
         *    plus the value serdes for the two input streams.
         */
        JoinWindows oneMinuteWindow = JoinWindows.of(60 * 1000);
        ValueJoiner<PurchaseRecord, PurchaseRecord, CorrelatedPurchase> joiner = new PurchaseJoiner();
        KStream<String, CorrelatedPurchase> joinedKStream =
                stream1.join(stream2, joiner, oneMinuteWindow,
                        Joined.with(stringSerde, purchaseRecordSerde, purchaseRecordSerde));

        joinedKStream.print(Printed.<String, CorrelatedPurchase>toSysOut().withLabel("joinedStream"));

        // start the streams application
        final KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), streamsConfig);

        final CountDownLatch latch = new CountDownLatch(1);

        Runtime.getRuntime().addShutdownHook(new Thread("streams-jvm-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        try {
            streams.start();
            latch.await();
        } catch (Exception e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

Test output

[joinedStream]: 王小十四, CorrelatedPurchase [firstDate=Mon Dec 30 11:18:12 CST 2019, seconDate=Mon Dec 30 11:18:21 CST 2019, purchaseListItem=[小小锅, 小小锅], totalAmount=40.0]
[joinedStream]: 王小十四, CorrelatedPurchase [firstDate=Mon Dec 30 11:18:45 CST 2019, seconDate=Mon Dec 30 11:18:21 CST 2019, purchaseListItem=[小小锅1, 小小锅], totalAmount=40.0]
[joinedStream]: 王小十四, CorrelatedPurchase [firstDate=Mon Dec 30 11:19:30 CST 2019, seconDate=Mon Dec 30 11:19:58 CST 2019, purchaseListItem=[榔头, 锤子], totalAmount=40.0]
[joinedStream]: 王小十四, CorrelatedPurchase [firstDate=Mon Dec 30 11:19:46 CST 2019, seconDate=Mon Dec 30 11:19:58 CST 2019, purchaseListItem=[打榔头, 锤子], totalAmount=40.0]
[joinedStream]: 王小十四, CorrelatedPurchase [firstDate=Mon Dec 30 11:21:12 CST 2019, seconDate=Mon Dec 30 11:20:57 CST 2019, purchaseListItem=[锤子1, 锤子], totalAmount=40.0]
[joinedStream]: 王小十四, CorrelatedPurchase [firstDate=Mon Dec 30 11:31:31 CST 2019, seconDate=Mon Dec 30 11:31:09 CST 2019, purchaseListItem=[锤子4, 锤子2], totalAmount=40.0]
[joinedStream]: 王小十四, CorrelatedPurchase [firstDate=Mon Dec 30 11:31:31 CST 2019, seconDate=Mon Dec 30 11:31:17 CST 2019, purchaseListItem=[锤子4, 锤子3], totalAmount=40.0]

Any pair of records whose purchase times fall within the join window triggers the join. Once a join fires, you can attach a foreach action, or publish the result to another topic, to drive the follow-up logic.
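That follow-up logic can be sketched as a plain (key, value) action, the same shape as the lambda you would hand to joinedKStream.foreach(...). Here it is modeled as a function that returns the message it would emit, so the logic is easy to test; the 30.0 alert threshold is an illustrative assumption.

```java
import java.util.function.BiFunction;

// A plain-Java sketch of a follow-up action on joined records:
// flag customers whose combined purchase total exceeds a threshold.
public class JoinAction {

    // key = customer name, value = totalAmount of the correlated purchase
    static final BiFunction<String, Double, String> ACTION = (customer, totalAmount) ->
            totalAmount > 30.0                               // threshold is an assumption
                    ? "ALERT " + customer + " total=" + totalAmount
                    : "OK " + customer;

    public static void main(String[] args) {
        System.out.println(ACTION.apply("王小十四", 40.0));  // a joined record from the output above
        System.out.println(ACTION.apply("王小五", 12.5));    // below threshold
    }
}
```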

Going further with joins

Record ordering

As the output above shows, the join pays no attention to which record arrived first; any pair falling within the one-minute window produces a result.

To constrain the ordering, use JoinWindows.after or JoinWindows.before.

In streamA.join(streamB), after means streamB's record timestamp trails streamA's; before means it precedes it.
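The acceptance rule behind these windows can be sketched in plain Java (this mirrors the JoinWindows semantics rather than calling the Kafka API): JoinWindows.of(d) corresponds to before = after = d, while JoinWindows.of(0).after(d) keeps only right-stream records that arrive later.

```java
// A plain-Java sketch of the join-window acceptance test: a right-stream record
// at tsRight joins a left record at tsLeft when it falls no more than beforeMs
// before and no more than afterMs after the left timestamp.
public class WindowCheck {

    static boolean joinable(long tsLeft, long tsRight, long beforeMs, long afterMs) {
        return tsRight >= tsLeft - beforeMs && tsRight <= tsLeft + afterMs;
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        // symmetric one-minute window, as in the example above: order does not matter
        System.out.println(joinable(100_000L, 140_000L, oneMinute, oneMinute)); // true
        System.out.println(joinable(100_000L, 50_000L, oneMinute, oneMinute));  // true
        // after-only window: the right record must not precede the left one
        System.out.println(joinable(100_000L, 50_000L, 0L, oneMinute));         // false
    }
}
```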

Prerequisite for joining

To perform a join in Kafka Streams, the input streams must have the same number of partitions, be partitioned by key, and use the same key type.
Calling selectKey triggers the required repartitioning, which is handled automatically. This arrangement is known as co-partitioning.

Join flavors

  1. join: equivalent to an inner join; fires only when both sides have a matching record
  2. outerJoin: fires for records from either side, even without a match (the missing side is null)
  3. leftJoin: fires for every record on the left side; the right value may be null
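The three flavors above can be sketched in plain Java over two keyed record sets (this mirrors the KStream semantics rather than calling the Kafka API; the sample data is illustrative):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// join emits only matching keys; leftJoin emits every left record (right value
// may be null); outerJoin emits records from either side.
public class JoinFlavors {

    static List<String> innerJoin(Map<String, String> left, Map<String, String> right) {
        List<String> out = new ArrayList<>();
        for (String k : left.keySet())
            if (right.containsKey(k)) out.add(k + ":" + right.get(k)); // both sides present
        return out;
    }

    static List<String> leftJoin(Map<String, String> left, Map<String, String> right) {
        List<String> out = new ArrayList<>();
        for (String k : left.keySet())
            out.add(k + ":" + right.get(k));                           // right side may be null
        return out;
    }

    static List<String> outerJoin(Map<String, String> left, Map<String, String> right) {
        List<String> out = leftJoin(left, right);
        for (String k : right.keySet())
            if (!left.containsKey(k)) out.add(k + ":null-left");       // unmatched right records too
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> left = new LinkedHashMap<>();
        left.put("alice", "钥匙套");
        left.put("bob", "钥匙套");
        Map<String, String> right = new LinkedHashMap<>();
        right.put("alice", "小五金");
        right.put("carol", "小五金");

        System.out.println("join      -> " + innerJoin(left, right));
        System.out.println("leftJoin  -> " + leftJoin(left, right));
        System.out.println("outerJoin -> " + outerJoin(left, right));
    }
}
```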

Timestamps in Kafka Streams

Timestamps serve three purposes:

  1. Joining streams
  2. Updating changelogs
  3. Deciding when operations (such as windows) are triggered

Kafka Streams distinguishes three categories of timestamps:

Event time

The time at which the event itself occurred, usually embedded in the record object; the producer's creation time can also be treated as event time.

Ingestion time

The timestamp set when the record first enters the data pipeline, that is, when it is appended to the Kafka log; the broker's LogAppendTime can serve as ingestion time.
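To have the broker stamp records at append time, the timestamp type can be set per topic. A sketch using the kafka-configs tool, assuming a local broker and the `transactions` topic from the example; the exact flags vary between Kafka versions (older releases used --zookeeper instead of --bootstrap-server):

```shell
# Switch the topic's timestamp from the producer-supplied CreateTime to the
# broker's LogAppendTime, so every record is stamped when appended to the log.
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --entity-type topics --entity-name transactions \
  --add-config message.timestamp.type=LogAppendTime
```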

Processing time

The timestamp set when a record first starts flowing through the processing topology, that is, the wall-clock time at which the application actually processes it.

Handling the different time semantics

  1. Implement the TimestampExtractor interface
  2. Kafka Streams ships with WallclockTimestampExtractor for processing-time semantics; it simply returns the current system time in milliseconds

A custom timestamp extractor

Example code:

public class TransactionTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        // the value object that was sent to Kafka
        PurchaseRecord ptRecord = (PurchaseRecord) record.value();
        // return the time embedded in the record itself
        return ptRecord.getPurchaseDate().getTime();
    }
}

Configuration

  1. Set it globally in the streams configuration:

props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TransactionTimestampExtractor.class);

  2. Or set it per source via Consumed:

Consumed.with(keySerde, valueSerde).withTimestampExtractor(new TransactionTimestampExtractor()); // keySerde/valueSerde: your actual serdes

Reviewing the current DAG

IMAGE