开发Kafka_Streams2

熟知kafka提供的高级算子,以及熟练使用我们常用的算子是不可或缺的操作。

模拟数据进行流拓扑的设计

  1. 根据一个模拟的购物数据进行流的设计
  2. 设计规则以及如何将流切分为多个流(再分流)

根据一个模拟的购物数据进行流的设计

IMAGE

  1. 通过屏蔽处理器处理屏蔽卡号问题
  2. 提取购买的物品以及邮编,确定购买模式
  3. 获取会员号,以及金额.根据金额确定奖励
  4. 获取所有完成的数据,以备后续进行特定的分析

操作步骤

1.模拟数据对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class PurchaseRecord {
/**
* 用户姓名
*/
private String firstName;
private String lastName;
//用户信用卡编号
private String creditCardNumber;
//用户购买物品
private String itemPurchased;
//购物物品数量
private int quantity;
//物品单价
private double price;
//购买日期
private Date purchaseDate;
//用户会员卡编号
private String zipCode;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String[] args) {
PurchaseRecord record = new PurchaseRecord();
record.setFirstName("王");
record.setLastName("伟杰");
record.setCreditCardNumber("1000-5000-1987-0215");
record.setItemPurchased("福特钥匙扣");
record.setQuantity(1);
record.setPrice(28.00);
record.setPurchaseDate(new Date());
record.setZipCode("100187");
Gson g = new Gson();
System.out.println(g.toJson(record));
}

数据模拟效果如下:{"firstName":"王","lastName":"伟杰","creditCardNumber":"1000-5000-1987-0215","itemPurchased":"福特钥匙扣","quantity":1,"price":28.0,"purchaseDate":"Nov 19, 2019 4:55:58 PM","zipCode":"100187"}

2.构建通用的序列化器

说明:kafka以字节的方式传输数据,在传输数据的时候需要将对象转换为json,发送到对应的主题时候,需要转换成字节的数组,其次在消费的时候需要将主题中的字节数组转换成json,其次在转为对应的对象类型.当然上一节已经提到过.Kafka默认对一些类型进行了支持,比如String,Long,Integer等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 通用的序列化操作
* @author Yun
*
* @param <T>
*/
public class JsonSerializer<T> implements Serializer<T>{
private Gson g = new Gson();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// TODO Auto-generated method stub

}

@Override
public byte[] serialize(String topic, T data) {
// TODO Auto-generated method stub
return g.toJson(data).getBytes(Charset.forName("UTF-8"));
}

@Override
public void close() {
// TODO Auto-generated method stub

}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 通用的反序列化操作
* @author Yun
*
* @param <T>
*/
public class JsonDeserializer<T> implements Deserializer<T> {
private Gson g = new Gson();
private Class<T> deserializedClass;
public JsonDeserializer(Class<T> deserializedClass) {
super();
this.deserializedClass = deserializedClass;
}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// TODO Auto-generated method stub

}

@Override
public T deserialize(String topic, byte[] data) {
// TODO Auto-generated method stub
return g.fromJson(new String(data), deserializedClass);
}

@Override
public void close() {
// TODO Auto-generated method stub

}

}

3.编写对应的规则

1.处理信用卡的规则

2.提取购买的物品,以及邮编(购买模式的规则)

3.提取会员号以及话费的金额(奖励机制的规则)

构建一个工具类,以及规则所产出的对应的实体

1
2
3
4
5
6
public class PurchasePattern {
private String zipCode;
private String item;
private Date date;
private double amount;
}
1
2
3
4
public class RewardAccumulator {
private String customerId;
private double purchaseTotal;
}

工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class ActionUtil {
private final static String MARK = "xxxx-xxxx-xxxx";

/**
* 屏蔽信用卡号
* @param pr 需要被屏蔽的信息记录
* @return
*/
public static PurchaseRecord mask(PurchaseRecord pr){
String[] parts = pr.getCreditCardNumber().split("-");
pr.setCreditCardNumber(MARK+"-"+parts[3]);
return pr;

}
/**
* 提取部分数据
* @param pr 需要提取的记录信息
* @return
*/
public static PurchasePattern getSomeOne(PurchaseRecord pr){
PurchasePattern pp = new PurchasePattern();
pp.setZipCode(pr.getZipCode());
pp.setDate(pr.getPurchaseDate());
pp.setItem(pr.getItemPurchased());
pp.setAmount(pr.getPrice()*pr.getQuantity());
return pp;
}

public static RewardAccumulator getReward(PurchaseRecord pr){
RewardAccumulator ra = new RewardAccumulator();
ra.setCustomerId(pr.getFirstName()+pr.getLastName());
ra.setPurchaseTotal(pr.getPrice()*pr.getQuantity());
return ra;

}

}

测试结果数据如下:

1
2
3
 {"firstName":"王","lastName":"伟杰","creditCardNumber":"xxxx-xxxx-xxxx-0215","itemPurchased":"福特钥匙扣","quantity":1,"price":28.0,"purchaseDate":"Nov 20, 2019 5:07:13 PM","zipCode":"100187"}
{"zipCode":"100187","item":"福特钥匙扣","date":"Nov 20, 2019 5:07:13 PM","amount":28.0}
{"customerId":"王伟杰","purchaseTotal":28.0}
  1. 根据拓扑创建一个流式应用程式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public class FirstStream {

private static final Logger LOG = LoggerFactory.getLogger(FirstStream.class);

public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "FirstZmart-Kafka-Streams-App");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
// 配置当前时间
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

// 加载配置
StreamsConfig streamsConfig = new StreamsConfig(props);
// 构建序列化器
// 默认的
Serde<String> stringSerde = Serdes.String();
// 自定义的(可优化)
JsonSerializer<PurchaseRecord> ps = new JsonSerializer<>();
JsonDeserializer<PurchaseRecord> pds = new JsonDeserializer<>(PurchaseRecord.class);
JsonSerializer<PurchasePattern> pps = new JsonSerializer<>();
JsonDeserializer<PurchasePattern> ppds = new JsonDeserializer<>(PurchasePattern.class);
JsonSerializer<RewardAccumulator> rs = new JsonSerializer<>();
JsonDeserializer<RewardAccumulator> rds = new JsonDeserializer<>(RewardAccumulator.class);
// 构建自定义序列化器
Serde<PurchaseRecord> PurchaseRecordSerde = Serdes.serdeFrom(ps, pds);
Serde<PurchasePattern> PurchasePatternSerde = Serdes.serdeFrom(pps, ppds);
Serde<RewardAccumulator> RewardAccumulatorSerde = Serdes.serdeFrom(rs, rds);

// 创建流的构造器
StreamsBuilder streamsBuilder = new StreamsBuilder();
// 流拓扑
/**
* mapValues 原始的键不会发生变化 ,可获取到传递进来的Value值
*/
KStream<String, PurchaseRecord> PurchaseRecordStream = streamsBuilder
.stream("transactions", Consumed.with(stringSerde, PurchaseRecordSerde))
.mapValues(pr -> ActionUtil.mask(pr));
/**
* 因为机器缘由,我们可以通过打印来替代 这一步等于处理完数据后发送给对应的主题,后续配置消费者消费即可
* PurchaseRecordStream.to("Purchase", Produced.with(stringSerde,
* PurchaseRecordSerde));
*/
PurchaseRecordStream.print(Printed.<String, PurchaseRecord>toSysOut().withLabel("PurchaseRecord"));

// 同理如下
KStream<String, PurchasePattern> PurchasePatternStream = PurchaseRecordStream
.mapValues(pr -> ActionUtil.getSomeOne(pr));
PurchasePatternStream.print(Printed.<String, PurchasePattern>toSysOut().withLabel("PurchasePattern"));

KStream<String, RewardAccumulator> RewardAccumulatorStream = PurchaseRecordStream
.mapValues(pr -> ActionUtil.getReward(pr));
RewardAccumulatorStream.print(Printed.<String, RewardAccumulator>toSysOut().withLabel("RewardAccumulator"));

// 开启流
final KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), streamsConfig);

final CountDownLatch latch = new CountDownLatch(1);

Runtime.getRuntime().addShutdownHook(new Thread("streams-jvm-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Exception e) {
System.exit(1);
}
System.exit(0);

}
}

5.配置生产者以及模拟数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class TestProducer {
public static void main(String[] args) {
//模拟数据
PurchaseRecord record = new PurchaseRecord();
record.setFirstName("王");
record.setLastName("伟杰");
record.setCreditCardNumber("1000-5000-1987-0215");
record.setItemPurchased("福特钥匙扣");
record.setQuantity(1);
record.setPrice(28.00);
record.setPurchaseDate(new Date());
record.setZipCode("100187");

//配置生产者
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("acks", "1");
properties.put("retries", "3");
properties.put("compression.type", "snappy");
KafkaProducer<String, PurchaseRecord> kp = new KafkaProducer<String,PurchaseRecord>(properties,new StringSerializer(),new JsonSerializer<PurchaseRecord>());
kp.send(new ProducerRecord<String, PurchaseRecord>("transactions", record));
kp.close();
}
}

6.测试

  1. 启动zookeeper和kafka
  2. 创建一个transactions的主题
  3. 启动流程序
  4. 生产者发送消息

7.结果如下,对象要重写toString方法

1
2
3
[PurchaseRecord]: null, PurchaseRecord [firstName=徐, lastName=小二, creditCardNumber=xxxx-xxxx-xxxx-0213, itemPurchased=福特钥匙扣, quantity=1, price=28.0, purchaseDate=Wed Nov 20 18:18:35 CST 2019, zipCode=100187]
[PurchasePattern]: null, PurchasePattern [zipCode=100187, item=福特钥匙扣, date=Wed Nov 20 18:18:35 CST 2019, amount=28.0]
[RewardAccumulator]: null, RewardAccumulator [customerId=徐小二, purchaseTotal=28.0]

如何将流切分为多个流(再分流)

截止到现在,数据流的分配已经实现。接下我们需要细化一些规则

  1. 一定金额下面的信息我们需要进行过滤,小额交易可能对我们起不到任何帮助
  2. 在进入拓扑,默认没有对应的key值进行分类,这是需要我们为数据生成一个key值,以便做好新的归类
  3. 可能有一些的新的信息,接下来我们需要进行对应的分流到新的主题中
  4. 把一些需要的记录写入kafka之外

1.为模型添加一个Department字段

1
2
//用户购买物品的种类
private String departMent;

2.过滤小额交易物品以及选择时间作为key值已方便归类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class SecondStream {
private static final Logger LOG = LoggerFactory.getLogger(SecondStream.class);

public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "FirstZmart-Kafka-Streams-App");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
// 配置当前时间
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

// 加载配置
StreamsConfig streamsConfig = new StreamsConfig(props);
// 构建序列化器
// 默认的
Serde<String> stringSerde = Serdes.String();
// 自定义的(可优化)
JsonSerializer<PurchaseRecord> ps = new JsonSerializer<>();
JsonDeserializer<PurchaseRecord> pds = new JsonDeserializer<>(PurchaseRecord.class);
// 构建自定义序列化器
Serde<PurchaseRecord> PurchaseRecordSerde = Serdes.serdeFrom(ps, pds);
// 创建流的构造器
StreamsBuilder streamsBuilder = new StreamsBuilder();
// 流拓扑
/**
* mapValues 原始的键不会发生变化 ,可获取到传递进来的Value值
*/
KStream<String, PurchaseRecord> PurchaseRecordStream = streamsBuilder
.stream("transactions", Consumed.with(stringSerde, PurchaseRecordSerde))
.mapValues(pr -> ActionUtil.mask(pr));

/**
* 过滤小额操作以及选择特定的字段作为key
*/
//声明keyValueMapper创建谓词匹配对应的key值
KeyValueMapper<String, PurchaseRecord, Long> PurchaseRecordAsDateKey =
(key,PurchaseRecord) -> PurchaseRecord.getPurchaseDate().getTime();
//过滤小额交易的操作进入对应的流并选择特定的条件作为key值 使用filter可以进行条件进行过滤
KStream<Long,PurchaseRecord> filteredKStream =
PurchaseRecordStream
.filter((key,purchaseRecord) -> purchaseRecord.getPrice()>5.00)
.selectKey(PurchaseRecordAsDateKey);
filteredKStream.print(Printed.<Long, PurchaseRecord>toSysOut().withLabel("purchases"));

}
}

分流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 分流的操作 分流可以根据数据特定的条件进行分流 需要使用到特定的谓词条件 Predicate
*/
Predicate<String,PurchaseRecord> isOne =
(key,purchaseRecord) ->purchaseRecord.getDepartMent().equalsIgnoreCase("钥匙套");

Predicate<String,PurchaseRecord> isTwo =
(key,purchaseRecord) ->purchaseRecord.getDepartMent().equalsIgnoreCase("小五金");

//返回分流的数组
KStream<String, PurchaseRecord>[] kstreamByDepart =
PurchaseRecordStream.branch(isOne,isTwo);
kstreamByDepart[0].print(Printed.<String, PurchaseRecord>toSysOut().withLabel("钥匙类"));
kstreamByDepart[1].print(Printed.<String, PurchaseRecord>toSysOut().withLabel("小五金"));

使用foreach操作将记录写入kafka之外

假定会员编码为xxxx-xxxx-xxxx-0000为恶意会员码

1
2
3
4
5
6
//会员编码为恶意编码
ForeachAction<String, PurchaseRecord> purchaseRecordForeachAction =
(key,purchaseRecord) -> System.out.println("做额外的操作");
PurchaseRecordStream
.filter((key,purchaseRecord) ->purchaseRecord.getZipCode().equals("xxxx-xxxx-xxxx-0000") )
.foreach(purchaseRecordForeachAction);

测试代码是否通过

  1. 启动zookeeper和kafka
  2. 启动流程序
  3. 生产者输送信息到transactions的主题

测试结果如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[purchases]: 1575279440000, PurchaseRecord [firstName=徐, lastName=小四, creditCardNumber=xxxx-xxxx-xxxx-0001, itemPurchased=福特钥匙扣, departMent=钥匙套, quantity=1, price=28.0, purchaseDate=Mon Dec 02 17:37:20 CST 2019, zipCode=100187]
[钥匙类]: null, PurchaseRecord [firstName=徐, lastName=小四, creditCardNumber=xxxx-xxxx-xxxx-0001, itemPurchased=福特钥匙扣, departMent=钥匙套, quantity=1, price=28.0, purchaseDate=Mon Dec 02 17:37:20 CST 2019, zipCode=100187]
[purchases]: 1575279951000, PurchaseRecord [firstName=徐, lastName=小五, creditCardNumber=xxxx-xxxx-xxxx-0002, itemPurchased=福特钥匙扣, departMent=钥匙套, quantity=1, price=28.0, purchaseDate=Mon Dec 02 17:45:51 CST 2019, zipCode=100187]
[钥匙类]: null, PurchaseRecord [firstName=徐, lastName=小五, creditCardNumber=xxxx-xxxx-xxxx-0002, itemPurchased=福特钥匙扣, departMent=钥匙套, quantity=1, price=28.0, purchaseDate=Mon Dec 02 17:45:51 CST 2019, zipCode=100187]
[purchases]: 1575280227000, PurchaseRecord [firstName=徐, lastName=小五, creditCardNumber=xxxx-xxxx-xxxx-0002, itemPurchased=福特钥匙扣, departMent=钥匙套, quantity=1, price=28.0, purchaseDate=Mon Dec 02 17:50:27 CST 2019, zipCode=100187]
[钥匙类]: null, PurchaseRecord [firstName=徐, lastName=小五, creditCardNumber=xxxx-xxxx-xxxx-0002, itemPurchased=福特钥匙扣, departMent=钥匙套, quantity=1, price=28.0, purchaseDate=Mon Dec 02 17:50:27 CST 2019, zipCode=100187]
[purchases]: 1575280293000, PurchaseRecord [firstName=徐, lastName=小五, creditCardNumber=xxxx-xxxx-xxxx-0002, itemPurchased=福特钥匙扣, departMent=钥匙套, quantity=1, price=28.0, purchaseDate=Mon Dec 02 17:51:33 CST 2019, zipCode=000000]
[钥匙类]: null, PurchaseRecord [firstName=徐, lastName=小五, creditCardNumber=xxxx-xxxx-xxxx-0002, itemPurchased=福特钥匙扣, departMent=钥匙套, quantity=1, price=28.0, purchaseDate=Mon Dec 02 17:51:33 CST 2019, zipCode=000000]
做额外的操作
[purchases]: 1575280333000, PurchaseRecord [firstName=徐, lastName=小五, creditCardNumber=xxxx-xxxx-xxxx-0002, itemPurchased=汤勺, departMent=小五金, quantity=1, price=28.0, purchaseDate=Mon Dec 02 17:52:13 CST 2019, zipCode=100187]
[小五金]: null, PurchaseRecord [firstName=徐, lastName=小五, creditCardNumber=xxxx-xxxx-xxxx-0002, itemPurchased=汤勺, departMent=小五金, quantity=1, price=28.0, purchaseDate=Mon Dec 02 17:52:13 CST 2019, zipCode=100187]
[purchases]: 1575280351000, PurchaseRecord [firstName=徐, lastName=小五, creditCardNumber=xxxx-xxxx-xxxx-0002, itemPurchased=大锅, departMent=小五金, quantity=1, price=28.0, purchaseDate=Mon Dec 02 17:52:31 CST 2019, zipCode=100187]
[小五金]: null, PurchaseRecord [firstName=徐, lastName=小五, creditCardNumber=xxxx-xxxx-xxxx-0002, itemPurchased=大锅, departMent=小五金, quantity=1, price=28.0, purchaseDate=Mon Dec 02 17:52:31 CST 2019, zipCode=100187]
[小五金]: null, PurchaseRecord [firstName=徐, lastName=小五, creditCardNumber=xxxx-xxxx-xxxx-0002, itemPurchased=大锅, departMent=小五金, quantity=1, price=5.0, purchaseDate=Mon Dec 02 17:52:42 CST 2019, zipCode=100187]

完整代码示例如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package com.wwj.streams;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.wwj.model.PurchaseRecord;
import com.wwj.serde.JsonDeserializer;
import com.wwj.serde.JsonSerializer;
import com.wwj.util.ActionUtil;

public class SecondStream {
private static final Logger LOG = LoggerFactory.getLogger(SecondStream.class);

public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SecondZmart-Kafka-Streams-App");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
// 配置当前时间
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

// 加载配置
StreamsConfig streamsConfig = new StreamsConfig(props);
// 构建序列化器
// 默认的
Serde<String> stringSerde = Serdes.String();
// 自定义的(可优化)
JsonSerializer<PurchaseRecord> ps = new JsonSerializer<>();
JsonDeserializer<PurchaseRecord> pds = new JsonDeserializer<>(PurchaseRecord.class);
// 构建自定义序列化器
Serde<PurchaseRecord> PurchaseRecordSerde = Serdes.serdeFrom(ps, pds);
// 创建流的构造器
StreamsBuilder streamsBuilder = new StreamsBuilder();
// 流拓扑
/**
* mapValues 原始的键不会发生变化 ,可获取到传递进来的Value值
*/
KStream<String, PurchaseRecord> PurchaseRecordStream = streamsBuilder
.stream("transactions", Consumed.with(stringSerde, PurchaseRecordSerde))
.mapValues(pr -> ActionUtil.mask(pr));

/**
* 过滤小额操作以及选择特定的字段作为key
*/
//声明keyValueMapper创建谓词匹配对应的key值
KeyValueMapper<String, PurchaseRecord, Long> PurchaseRecordAsDateKey =
(key,PurchaseRecord) -> PurchaseRecord.getPurchaseDate().getTime();
//过滤小额交易的操作进入对应的流并选择特定的条件作为key值 使用filter可以进行条件进行过滤
KStream<Long,PurchaseRecord> filteredKStream =
PurchaseRecordStream
.filter((key,purchaseRecord) -> purchaseRecord.getPrice()>5.00)
.selectKey(PurchaseRecordAsDateKey);
filteredKStream.print(Printed.<Long, PurchaseRecord>toSysOut().withLabel("purchases"));
/**
* 分流的操作 分流可以根据数据特定的条件进行分流 需要使用到特定的谓词条件 Predicate
*/
Predicate<String,PurchaseRecord> isOne =
(key,purchaseRecord) ->purchaseRecord.getDepartMent().equalsIgnoreCase("钥匙套");

Predicate<String,PurchaseRecord> isTwo =
(key,purchaseRecord) ->purchaseRecord.getDepartMent().equalsIgnoreCase("小五金");

//返回分流的数组
KStream<String, PurchaseRecord>[] kstreamByDepart =
PurchaseRecordStream.branch(isOne,isTwo);
kstreamByDepart[0].print(Printed.<String, PurchaseRecord>toSysOut().withLabel("钥匙类"));
kstreamByDepart[1].print(Printed.<String, PurchaseRecord>toSysOut().withLabel("小五金"));
//会员编码为恶意编码
ForeachAction<String, PurchaseRecord> purchaseRecordForeachAction =
(key,purchaseRecord) -> System.out.println("做额外的操作");
PurchaseRecordStream
.filter((key,purchaseRecord) ->purchaseRecord.getZipCode().equals("xxxx-xxxx-xxxx-0000") )
.foreach(purchaseRecordForeachAction);

// 开启流
final KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), streamsConfig);

final CountDownLatch latch = new CountDownLatch(1);

Runtime.getRuntime().addShutdownHook(new Thread("streams-jvm-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Exception e) {
System.exit(1);
}
System.exit(0);

}
}