自定义数据类型,利用kafka进行数据传递,以及如何去定义流拓扑。什么是流,我们需要有一个初步认识
从零到有的kafka进阶
- 环境的准备
- 利用kafkaStreams实现helloworld-HELLOWORLD
- 利用kafka自定义序列化器以及构建生产者和消费者
环境的准备
- jdk1.8
- maven构建普通项目
- 引入关联的jar包
- 引入log4j
pom.xml如下
1 | <dependencies> |
lo4j配置
1 | log4j.rootLogger=INFO, stdout |
利用kafkaStreams实现helloworld-HELLOWORLD
流式处理API(DSL)
- 高级API的核心是KStream对象(该对象代表流/值记录)
- DSL方法都返回了一个KStream对象的引用
- 返回的KStream对象是一个新的实例。而不是最初的实例
构建一个简单的流向图
类似的步骤
- 定义配置项
- 创建自定义或预定义的Serde实例
- 创建处理器拓扑
- 创建和启动KStream
补充说明:kafka流在进行处理的时候,数据会进行序列化和反序列化操作。在kafka流中,默认提供了一个Serdes类来构建Serde对象。而默认提供以下类型作为支撑1.String 2.byte数组 3.Long 4.Integer 5.Double
代码示例
1 | package com.wwj.kafka; |
操作步骤
- 启动zookeeper和kafka
- 创建2个主题,一个src-topic 和 out-topic
- 建立一个生产者和消费者
- 启动流式就计算程序
1 | bin/zookeeper-server-start.sh config/zookeeper.properties |
最终结果如下:
利用kafka自定义序列化器以及构建生产者和消费者
创建数据对象
1 | /** |
创建序列化器
1.使用序列化器(可选则用GSON)
1 | <dependency> |
2.序列化器和反序列化器的代码如下(kafka在传递数据的时候,会将数据转换为字节,然后消费的时候将字节转换为具体的对象)
1 | //自定义序列化器 |
1 | //自定义反序列化器 |
创建生产者
1 | public class TestProducer { |
创建消费者
1 | public class TestConsumer { |
所有代码均通过测试