熟知kafka提供的高级算子,以及熟练使用我们常用的算子是不可或缺的操作。
模拟数据进行流拓扑的设计
- 根据一个模拟的购物数据进行流的设计
- 设计规则以及如何将流切分为多个流(再分流)
根据一个模拟的购物数据进行流的设计
- 通过屏蔽处理器处理屏蔽卡号问题
- 提取购买的物品以及邮编,确定购买模式
- 获取会员号,以及金额.根据金额确定奖励
- 获取所有完成的数据,以备后续进行特定的分析
操作步骤
1.模拟数据对象
1 | public class PurchaseRecord { |
1 | public static void main(String[] args) { |
数据模拟效果如下:{"firstName":"王","lastName":"伟杰","creditCardNumber":"1000-5000-1987-0215","itemPurchased":"福特钥匙扣","quantity":1,"price":28.0,"purchaseDate":"Nov 19, 2019 4:55:58 PM","zipCode":"100187"}
2.构建通用的序列化器
说明:kafka以字节的方式传输数据,在传输数据的时候需要将对象转换为json,发送到对应的主题时候,需要转换成字节的数组,其次在消费的时候需要将主题中的字节数组转换成json,其次在转为对应的对象类型.当然上一节已经提到过.Kafka默认对一些类型进行了支持,比如String,Long,Integer等
1 | /** |
1 | /** |
3.编写对应的规则
1.处理信用卡的规则
2.提取购买的物品,以及邮编(购买模式的规则)
3.提取会员号以及话费的金额(奖励机制的规则)
构建一个工具类,以及规则所产出的对应的实体
1 | public class PurchasePattern { |
1 | public class RewardAccumulator { |
工具类
1 | public class ActionUtil { |
测试结果数据如下:
1 | {"firstName":"王","lastName":"伟杰","creditCardNumber":"xxxx-xxxx-xxxx-0215","itemPurchased":"福特钥匙扣","quantity":1,"price":28.0,"purchaseDate":"Nov 20, 2019 5:07:13 PM","zipCode":"100187"} |
- 根据拓扑创建一个流式应用程式
1 | public class FirstStream { |
5.配置生产者以及模拟数据
1 | public class TestProducer { |
6.测试
- 启动zookeeper和kafka
- 创建一个transactions的主题
- 启动流程序
- 生产者发送消息
7.结果如下,对象要重写toString方法
1 | [PurchaseRecord]: null, PurchaseRecord [firstName=徐, lastName=小二, creditCardNumber=xxxx-xxxx-xxxx-0213, itemPurchased=福特钥匙扣, quantity=1, price=28.0, purchaseDate=Wed Nov 20 18:18:35 CST 2019, zipCode=100187] |
如何将流切分为多个流(再分流)
截止到现在,数据流的分配已经实现。接下我们需要细化一些规则
- 一定金额下面的信息我们需要进行过滤,小额交易可能对我们起不到任何帮助
- 在进入拓扑,默认没有对应的key值进行分类,这是需要我们为数据生成一个key值,以便做好新的归类
- 可能有一些的新的信息,接下来我们需要进行对应的分流到新的主题中
- 把一些需要的记录写入kafka之外
1.为模型添加一个Department字段
1 | //用户购买物品的种类 |
2.过滤小额交易物品以及选择时间作为key值已方便归类
1 | public class SecondStream { |
分流
1 | /** |
使用foreach操作将记录写入kafka之外
假定会员编码为xxxx-xxxx-xxxx-0000为恶意会员码
1 | //会员编码为恶意编码 |
测试代码是否通过
- 启动zookeeper和kafka
- 启动流程序
- 生产者输送信息到transactions的主题
测试结果如下
1 | [purchases]: 1575279440000, PurchaseRecord [firstName=徐, lastName=小四, creditCardNumber=xxxx-xxxx-xxxx-0001, itemPurchased=福特钥匙扣, departMent=钥匙套, quantity=1, price=28.0, purchaseDate=Mon Dec 02 17:37:20 CST 2019, zipCode=100187] |
完整代码示例如下
1 | package com.wwj.streams; |