您的位置:首页 > 编程语言 > Java开发

springboot2.x+kafka使用和源码分析六(自定义序列化器)

2020-01-11 18:26 609 查看

 

Apache Kafka提供了一个高级API,用于对值及其键进行序列化和反序列化。

  1. org.apache.kafka.common.serialization.Serializer<T> 序列化器
  2. org.apache.kafka.common.serialization.Deserializer<T> 反序列化器

我们可以使用
Producer
Consumer
配置属性来指定序列化器和反序列化器类。如下图所示:

[code]        //生产者
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
//key 序列化
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
//value 序列化
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

//消费者
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
//key 反序列化
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
//value 反序列化
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

1:kafka也默认提供一些内置实现类:

如ByteArraySerializer,IntegerSerializer,LongSerializer,StringSerializer等。源码在kafka-clients包中如下图所示:

2:spring提供序列化器

2.1 JsonSerializer JsonDeserializer

         JsonSerializer<T>:可以将任意的Java对象序列化为byte[]数组

          核心代码:

[code]
@Override
@Nullable
public byte[] serialize(String topic, @Nullable T data) {
if (data == null) {
return null;
}
try {
return this.objectMapper.writeValueAsBytes(data);
}
catch (IOException ex) {
throw new SerializationException("Can't serialize data [" + data + "] for topic [" + topic + "]", ex);
}
}

        JsonDeserializer<T>:需要添加的

Class<?> targetType
参数将byte[]反序列化为指定bean对象

         核心源码为:

[code]	@Override
public T deserialize(String topic, @Nullable byte[] data) {
if (data == null) {
return null;
}
Assert.state(this.reader != null, "No headers available and no default type provided");
try {
return this.reader.readValue(data);
}
catch (IOException e) {
throw new SerializationException("Can't deserialize data [" + Arrays.toString(data) +
"] from topic [" + topic + "]", e);
}
}

2.2:其它类型

3:自定义序列化器

当上诉序列化器满足不了我们的需求时我们肯定自定义序列化器,例如我们发送的消息是一个javabean对象(当然我们可以将数据变为json字符串格式通过StringSeializer来进行序列化。而且我们一般都会这么做)。

定义messageBean

[code]import lombok.*;

import javax.validation.constraints.Max;
import javax.validation.constraints.NotNull;
import java.io.Serializable;

/**
* @author fangyuan
*/
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PersonInfo implements Serializable {

private static final long serialVersionUID = -5666930682610937456L;

@NotNull
private String name;

@Max(100)
private Integer age;

@NotNull
private String sex;
}

实现Serializer和Deserializer

[code]package com.demo.kafkaDemo.demo.Serializer;

import com.demo.kafkaDemo.demo.bean.PersonInfo;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/**
* 自定义序列化器
* @author fangyaun
*/
public class PersonInfoSerializer implements Serializer<PersonInfo> {

@Override
public byte[] serialize(String topic, PersonInfo personInfo) {

ByteArrayOutputStream outputStream = new ByteArrayOutputStream(1024);

try {
if (!(personInfo instanceof Serializable)) {
throw new IllegalArgumentException(this.getClass().getSimpleName() + " requires a Serializable payload but received an object of type [" + personInfo.getClass().getName() + "]");
} else {
ObjectOutputStream objectOutputStream = null;
try {
objectOutputStream = new ObjectOutputStream(outputStream);
} catch (IOException e) {
e.printStackTrace();
}
objectOutputStream.writeObject(personInfo);
objectOutputStream.flush();
}
return outputStream.toByteArray();
} catch (Throwable ex) {
throw new SerializationException("Can't serialize data [" + personInfo + "] for topic [" + topic + "]", ex);
}
}

}
[code]package com.demo.kafkaDemo.demo.Serializer;

import com.demo.kafkaDemo.demo.bean.PersonInfo;
import org.apache.kafka.common.serialization.Deserializer;
import org.springframework.core.ConfigurableObjectInputStream;
import org.springframework.core.serializer.support.SerializationFailedException;

import java.io.ByteArrayInputStream;

/**
* 自定义反序列化器
* @author fangyaun
*/
public class PersonInfoDeserializer implements Deserializer<PersonInfo> {

@Override
public PersonInfo deserialize(String topic, byte[] bytes) {

ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);

try {
ConfigurableObjectInputStream objectInputStream = new ConfigurableObjectInputStream(inputStream,null);
return (PersonInfo) objectInputStream.readObject();

} catch (Throwable var4) {
throw new SerializationFailedException("Failed to deserialize payload for topic [" + topic + "]" , var4);
}

}

}

Demo项目github地址:https://github.com/fangyuan94/kafkaDemo

  • 点赞 1
  • 收藏
  • 分享
  • 文章举报
F_Hello_World 发布了38 篇原创文章 · 获赞 47 · 访问量 1062 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: