开发Kafka_Streams晋阶之路(数据状态)

数据本来平淡无奇,一旦有了思想,数据变成为了有想法的一种境界状态

知识点

  1. 状态在流中的重要性
  2. 状态存储阅读历史,展望未来
  3. 分区一致.数据正确
  4. 状态存储的选择前置条件
  5. 使用状态存储
  6. 代码示例

状态在流中的重要性

  1. 什么是状态?

状态是人赋予的主观意识判断。本身数据是无状态的。举个例子:股票的买卖交易,每天的交易买入卖出是一件很正常的事情。
比如:9点30分买入1000只股票。10点买入1000只股票。 这个数据流现在无状态的。

但是如果在10点30分的时候如果该只股票出现了重大利好重组交易,数据从无变为了有状态。而这时候我们的主观意识已经在判断,是否前面这个买卖交易是否正常,这也可以说明这个是我们的价值观

  1. 流是否需要状态?

一个事件本身产生的流没有特别的特殊性,但是如果产生一些额外的上下文,那么可能会错过一些关键的机会,甚至于你可能会以全新的视角来看待这个事情.

很多时候流式处理意味着:彼此之间没有关联。源源不断的数据,当发生时就一定要加以时间处理。而状态的概念可能会产生静态的资源映像,比如对应到数据库中。

数据流的变化速度往往比数据表更新的更快更频繁。有些情况下,离散的数据已经携带了足够多的数据。但是通常情况下,数据流需要从某类存储的数据来加以丰富。

将状态操作应用到kafka_Stream中

在前期的奖励节点中.对于每次客户的消费累计的奖励点数未做任何要求,如果假定现在用户的每一次消费达到一点奖励点数,我们会附上额外的动作,也是将本来不带状态的值,变为有状态的意义

1
2
3
4
5
public class RewardAccumulator {

private String customerId;
private double purchaseTotal;
}

转变为

1
2
3
4
5
6
7
8
9
10
11
public class RewardAccumulator {

private String customerId;
private double purchaseTotal;
//添加当前这笔消费的奖励点数
private int currentRewardPoints;
//添加最后一次最后购买的时间
private int daysFromLastPurchase;
//累计的奖励总点数
private long totalRewardPoints;
}

注意:设置奖励点的总数代码为:

1
2
3
public void addRewardPoints(int previousTotalPoints) {
this.totalRewardPoints += previousTotalPoints;
}

状态存储阅读历史,展望未来

如何进行存储

  1. 在kafkaStream中提供了一个基本的有状态的函数 transformvalues()
  2. 提供一个值转换器,转换器接口为ValueTransformer<V,R>,设置状态储存

构建一个值转换器代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class PurchaseRewardTransformer  implements  ValueTransformer<PurchaseRecord, RewardAccumulator>{
//声明一个状态存储变量
private KeyValueStore<String, Integer> stateStore;
//声明一个状态存储的名字
private final String storeName;
//声明上下文容器
private ProcessorContext context;

public PurchaseRewardTransformer(String storeName) {

this.storeName = storeName;
}


@Override
public void init(ProcessorContext context) {
// TODO Auto-generated method stub
this.context = context;
stateStore = (KeyValueStore<String, Integer>) this.context.getStateStore(storeName);

}



/**
* 执行值转换时.会进入到transform方法中
*/
@Override
public RewardAccumulator transform(PurchaseRecord value) {
// TODO Auto-generated method stub
//1. 执行和mapvalues一样的动作
RewardAccumulator reward = ActionUtil.getReward(value);
System.out.println(reward.hashCode());
//2.使用状态存储总的奖励点数
Integer x = stateStore.get(reward.getCustomerId());
if (x !=null) {
System.out.println("进入条件判断");
System.out.println(x);
int total = x+reward.getCurrentRewardPoints();
System.out.println(total);
stateStore.put(reward.getCustomerId(), total);
}else {
stateStore.put(reward.getCustomerId(), (int) reward.getCurrentRewardPoints());
}
System.out.println("当前用户"+reward.getCustomerId()+"的奖励点数为"+stateStore.get(reward.getCustomerId()));
System.out.println("执行相信的逻辑操作");
return reward;
}

@Override
public RewardAccumulator punctuate(long timestamp) {
// TODO Auto-generated method stub
return null;
}

@Override
public void close() {
// TODO Auto-generated method stub

}



}

分区一致,数据正确

在kafkaStreams中在没有指定分区的时候,是按照轮询进行分区。而分区中有着对应的StreamTask.每个Task中有自己不同状态存储.对于上面我们使用存储,有可能客户的交易信息不会在同一个分区中.那么这个时候只有指定到相同的分区进行解决

使用流分区器解决

  1. 通过kafkaStream中的through可以创建一个中间主题,达到无缝分区
  2. 通过自定义分区器,解决数据分配到不同地方的问题

自定义流分区器如下:

1
2
3
4
5
6
7
8
9
10
11
12
public class RewardsStreamPartitioner  implements StreamPartitioner<String, PurchaseRecord>{

/**
* 根据用户的姓氏进行匹配对应的分区
*/
@Override
public Integer partition(String key, PurchaseRecord value, int numPartitions) {
// TODO Auto-generated method stub
return value.getFirstName().hashCode()%numPartitions;
}

}

状态存储的选择前置条件

  1. 数据本地化
  2. 故障恢复和容错

数据本地化

数据本地化对性能至关重要,通过一个流式程序处理百万级以上的数据,即使很小的网络延迟也会产生巨大的影响

流式程序尽管需要状态但是不是绝对的必要,但是应该设计在本地。应用程序的每个服务器和节点都应该有一个单独的数据存储

进程和线程之间不共享,即便一个进程失败,但不会影响其它的进程和线程

故障恢复和容错

在kafkaStreams中每个处理器都它的本地存储和一个用于备份状态存储的变更日志主题

使用状态存储

  1. kafka添加状态存储使用stores类静态工厂创建storesupplier实例.
  2. 用于定制存储的附加类使用Meterialized(计量类)以及StoreBuilder类,高阶推荐用前者,低阶推荐后者。
  3. 除了本身这2个类之外,还分别提供了persistentKeyValueStore和lruMap,persistentWindowStore,persistentSeesionStore

状态存储容错以及改变日志主题

状态存储容错

  1. 所有的stateStoreSupplier默认都启用了日志,日志即是一个主题,该主题的作用是变更日志用来被封存储中的值,提供容错

配置变更日志主题

  1. 用于状态的变更日志采用压缩策略的主题.可以使用withLoggingEnabled进行配置

代码示例

注意:重写奖励类的equals方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
public class StoreStream {
private static final Logger LOG = LoggerFactory.getLogger(StoreStream.class);

public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "Store-Streams-App");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
// 配置当前时间
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

// 加载配置
StreamsConfig streamsConfig = new StreamsConfig(props);
// 构建序列化器
// 默认的
Serde<String> stringSerde = Serdes.String();
// 自定义的(可优化)
JsonSerializer<PurchaseRecord> ps = new JsonSerializer<>();
JsonDeserializer<PurchaseRecord> pds = new JsonDeserializer<>(PurchaseRecord.class);
JsonSerializer<RewardAccumulator> rs = new JsonSerializer<>();
JsonDeserializer<RewardAccumulator> rds = new JsonDeserializer<>(RewardAccumulator.class);
// 构建自定义序列化器
Serde<PurchaseRecord> PurchaseRecordSerde = Serdes.serdeFrom(ps, pds);
Serde<RewardAccumulator> RewardAccumulatorSerde = Serdes.serdeFrom(rs, rds);
// 创建流的构造器
StreamsBuilder streamsBuilder = new StreamsBuilder();
// 流拓扑
/**
* mapValues 原始的键不会发生变化 ,可获取到传递进来的Value值
*/
KStream<String, PurchaseRecord> PurchaseRecordStream = streamsBuilder
.stream("transactions", Consumed.with(stringSerde, PurchaseRecordSerde))
.mapValues(pr -> ActionUtil.mask(pr));

/**
* 加入状态处理器
*/
//状态名字
String rewardsStateStoreName = "rewardsPointsStore";
//指定分区规则,集群环境下
RewardsStreamPartitioner streamPartitioner = new RewardsStreamPartitioner();
//选择存储状态的类型
KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(rewardsStateStoreName);
StoreBuilder<KeyValueStore<String, Integer>> storeBuilder = Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.Integer());
/**
* storeBuilder.withLoggingEnabled(config) 可以使用该方法进行日志主题的配置
*/
//添加到拓扑中
streamsBuilder.addStateStore(storeBuilder);

/**
* 将原本的流设计重新分区的方式,通过产生一个中间主题进行操作
* 因为现在只有一个分区。故不配置分区
*/

//KStream<String, PurchaseRecord> transByCustomerStream = PurchaseRecordStream.through( "customer_transactions", Produced.with(stringSerde, PurchaseRecordSerde, streamPartitioner));
KStream<String, PurchaseRecord> transByCustomerStream = PurchaseRecordStream.through( "customer_transactions", Produced.with(stringSerde, PurchaseRecordSerde));

/**
* 转换为有状态的流
*/
KStream<String, RewardAccumulator> statefulRewardAccumulator = transByCustomerStream.transformValues(() -> new PurchaseRewardTransformer(rewardsStateStoreName),
rewardsStateStoreName);

statefulRewardAccumulator.print(Printed.<String, RewardAccumulator>toSysOut().withLabel("rewards"));

/**
* 开起流
*/
// 开启流
final KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), streamsConfig);

final CountDownLatch latch = new CountDownLatch(1);

Runtime.getRuntime().addShutdownHook(new Thread("streams-jvm-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Exception e) {
System.exit(1);
}
System.exit(0);

}

}

代码测试观察结果

  1. 启动zookeeper
  2. 启动kafka
  3. 启动流
  4. 模拟数据

数据结果如下:(包含服务器断掉重连,依旧保持先前的记录)

IMAGE