Developing Kafka Streams, Part 1

This post covers custom data types, using Kafka to pass data, and how to define a stream topology. First, we need a basic understanding of what a stream is.

Kafka from zero to advanced:

  1. Preparing the environment
  2. Implementing helloworld-HELLOWORLD with Kafka Streams
  3. Building a producer and consumer with a custom Kafka serializer

Preparing the environment

  1. JDK 1.8
  2. A plain Maven project
  3. The required jar dependencies
  4. log4j

For details on using log4j, see the separate reference.

The pom.xml is as follows:

<dependencies>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>1.1.0</version>
    </dependency>
</dependencies>

log4j configuration (log4j.properties):

log4j.rootLogger=INFO, stdout
# stdout Appender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

Implementing helloworld-HELLOWORLD with Kafka Streams

The stream processing API (DSL)

  1. The core of the high-level API is the KStream object, which represents a stream of key/value records
  2. Each DSL method returns a reference to a KStream object
  3. The returned KStream is a new instance, not the original one, as the sketch after this list illustrates
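A minimal sketch of point 3 (the topic name and lambdas here are illustrative, not from the original post): every DSL call returns a new KStream, so transformations chain without mutating the source stream.

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
// stream() creates the source KStream for a topic
KStream<String, String> source = builder.stream("some-topic");
// mapValues returns a brand-new KStream; `source` itself is unchanged
KStream<String, String> upper = source.mapValues(v -> v.toUpperCase());
// the original stream can still be used independently, e.g. to print each record
source.foreach((k, v) -> System.out.println(k + " -> " + v));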

Building a simple stream topology

[Image: diagram of the simple stream topology]

The general steps:

  1. Define the configuration
  2. Create custom or predefined Serde instances
  3. Create the processor topology
  4. Create and start the KafkaStreams instance

Note: as Kafka Streams processes data, records are serialized and deserialized. Kafka Streams provides a Serdes class for building Serde objects, and the following types are supported out of the box: 1. String 2. byte array 3. Long 4. Integer 5. Double (see the sketch below).
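For instance, the built-in Serde instances come from static factory methods on the Serdes class (a quick sketch; these factory methods are part of the Kafka serialization API):

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

Serde<String>  stringSerde = Serdes.String();     // String
Serde<byte[]>  bytesSerde  = Serdes.ByteArray();  // byte array
Serde<Long>    longSerde   = Serdes.Long();       // Long
Serde<Integer> intSerde    = Serdes.Integer();    // Integer
Serde<Double>  doubleSerde = Serdes.Double();     // Double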

For the principles behind serialization, see the separate reference.

Code example:

package com.wwj.kafka;


import java.util.Properties;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaStreamsHelloWorld {

    public final static Logger LOG = LoggerFactory.getLogger(KafkaStreamsHelloWorld.class);

    public static void main(String[] args) throws InterruptedException {
        // Configure the basic properties
        Properties props = new Properties();
        // Every streams application needs its own application id and the broker list (required)
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "helloworld_app_id");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        StreamsConfig streamsConfig = new StreamsConfig(props);
        Serde<String> stringSerde = Serdes.String();
        // 1. Create the stream builder
        StreamsBuilder builder = new StreamsBuilder();
        // 1.1 Create a source stream that reads from the input topic
        KStream<String, String> simpleFirstStream = builder.stream("src-topic", Consumed.with(stringSerde, stringSerde));
        // 1.2 Map every value to upper case; in ValueMapper<V, VR>, V is the input value type and VR is the returned value type
        KStream<String, String> upperCaseStream = simpleFirstStream.mapValues(new ValueMapper<String, String>() {

            @Override
            public String apply(String value) {
                return value.toUpperCase();
            }
        });
        // 1.3 Write the transformed values to the output topic
        upperCaseStream.to("out-topic", Produced.with(stringSerde, stringSerde));

        // 2. Build the streams container from the topology and the config
        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);

        // 3. Start the streams application, let it run for 60 seconds, then shut down
        kafkaStreams.start();
        Thread.sleep(60000);
        LOG.info("Streams application finished");
        kafkaStreams.close();
    }

}

Operating steps

  1. Start ZooKeeper and Kafka
  2. Create two topics, src-topic and out-topic
  3. Start a console producer and a console consumer
  4. Start the stream processing application

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --topic src-topic --replication-factor 1 --partitions 1 --zookeeper localhost:2181
bin/kafka-topics.sh --create --topic out-topic --replication-factor 1 --partitions 1 --zookeeper localhost:2181
bin/kafka-console-producer.sh --topic src-topic --broker-list localhost:9092
bin/kafka-console-consumer.sh --topic out-topic --bootstrap-server localhost:9092 --from-beginning

The final result:

[Image: console producer sending input messages]

[Image: console consumer printing the upper-cased output]

Building a producer and consumer with a custom Kafka serializer

Creating the data object

/**
 * Test data model
 * @author Yun
 *
 */
public class OrderMsg {
    // card number used for the purchase
    private String cardNumber;
    // name of the customer
    private String personName;
    // purchase amount
    private Double money;

    public String getCardNumber() {
        return cardNumber;
    }
    public void setCardNumber(String cardNumber) {
        this.cardNumber = cardNumber;
    }
    public String getPersonName() {
        return personName;
    }
    public void setPersonName(String personName) {
        this.personName = personName;
    }
    public Double getMoney() {
        return money;
    }
    public void setMoney(Double money) {
        this.money = money;
    }
    public OrderMsg(String cardNumber, String personName, Double money) {
        this.cardNumber = cardNumber;
        this.personName = personName;
        this.money = money;
    }
    public OrderMsg() {
    }
    @Override
    public String toString() {
        return "OrderMsg [cardNumber=" + cardNumber + ", personName=" + personName + ", money=" + money + "]";
    }

}

Creating the serializer and deserializer

1. Pick a JSON library for serialization (Gson is used here):

<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.5</version>
</dependency>

2. The serializer and deserializer code follows. (When Kafka transmits data, it converts it to bytes; when consuming, the bytes are converted back into concrete objects.)

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

import com.google.gson.Gson;

// Custom serializer: converts any object to UTF-8 JSON bytes
public class JsonSer<T> implements Serializer<T> {

    private Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }

    @Override
    public byte[] serialize(String topic, T data) {
        return gson.toJson(data).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
        // nothing to close
    }

}

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

import com.google.gson.Gson;

// Custom deserializer: converts UTF-8 JSON bytes back into an object of the target class
public class JsonDser<T> implements Deserializer<T> {

    private Gson gson = new Gson();
    private Class<T> deserClass;

    public JsonDser(Class<T> deserClass) {
        this.deserClass = deserClass;
    }

    public JsonDser() {
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        return gson.fromJson(new String(data, StandardCharsets.UTF_8), deserClass);
    }

    @Override
    public void close() {
        // nothing to close
    }
}
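This pair can also be assembled into a full Serde for use in a Kafka Streams topology (step 2 of the general steps earlier) via Serdes.serdeFrom(serializer, deserializer). A minimal sketch; the topic name order-topic is illustrative:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

// Combine the custom serializer and deserializer into a single Serde
Serde<OrderMsg> orderSerde = Serdes.serdeFrom(
        new JsonSer<OrderMsg>(), new JsonDser<OrderMsg>(OrderMsg.class));

// It could then stand in for the string Serde from the HelloWorld topology, e.g.:
// KStream<String, OrderMsg> orders =
//         builder.stream("order-topic", Consumed.with(Serdes.String(), orderSerde));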

Creating the producer

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // The custom serializer class is passed for the values
        properties.put("value.serializer", new JsonSer<OrderMsg>().getClass());
        properties.put("acks", "1");
        properties.put("retries", "3");
        properties.put("compression.type", "snappy");
        KafkaProducer<String, OrderMsg> kp = new KafkaProducer<String, OrderMsg>(properties);
        kp.send(new ProducerRecord<String, OrderMsg>("test-topic", new OrderMsg("005", "zzz", 100.1)));
        kp.close();
    }
}

Creating the consumer

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TestConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "simple-consumer-example1");
        properties.put("auto.offset.reset", "earliest");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "3000");
        // Deserializer instances can be passed directly to the constructor
        KafkaConsumer<String, OrderMsg> ks = new KafkaConsumer<String, OrderMsg>(
                properties, new StringDeserializer(), new JsonDser<>(OrderMsg.class));
        ks.subscribe(Collections.singletonList("test-topic"));
        while (true) {
            ConsumerRecords<String, OrderMsg> records = ks.poll(1000);
            for (ConsumerRecord<String, OrderMsg> record : records) {
                String message = String.format("Consumed: key = %s value = %s with offset = %d",
                        record.key(), record.value().toString(), record.offset());
                System.out.println(message);
            }
        }
    }
}

All of the code above passed testing.

A follow-up will cover integrating Kafka with Spring Boot and using the custom serializer there.