springboot2.x+kafka使用和源码分析六(自定义序列化器)
2020-01-11 18:26
609 查看
Apache Kafka提供了一个高级API,用于对值及其键进行序列化和反序列化。
org.apache.kafka.common.serialization.Serializer<T> 序列化器
org.apache.kafka.common.serialization.Deserializer<T> 反序列化器
我们可以使用Producer
或Consumer
配置属性来指定序列化器和反序列化器类。如下图所示:
[code] //生产者 Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094"); //key 序列化 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); //value 序列化 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //消费者 Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094"); //key 反序列化 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); //value 反序列化 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
1:kafka也默认提供一些内置实现类:
如ByteArraySerializer,IntegerSerializer,LongSerializer,StringSerializer等。源码在kafka-clients包中如下图所示:
2:spring提供序列化器
2.1 JsonSerializer JsonDeserializer
JsonSerializer<T>:可以将任意的Java对象序列化为byte[]数组
核心代码:
[code] @Override @Nullable public byte[] serialize(String topic, @Nullable T data) { if (data == null) { return null; } try { return this.objectMapper.writeValueAsBytes(data); } catch (IOException ex) { throw new SerializationException("Can't serialize data [" + data + "] for topic [" + topic + "]", ex); } }
JsonDeserializer<T>:需要添加的
Class<?> targetType参数将byte[]反序列化为指定bean对象
核心源码为:
[code] @Override public T deserialize(String topic, @Nullable byte[] data) { if (data == null) { return null; } Assert.state(this.reader != null, "No headers available and no default type provided"); try { return this.reader.readValue(data); } catch (IOException e) { throw new SerializationException("Can't deserialize data [" + Arrays.toString(data) + "] from topic [" + topic + "]", e); } }
2.2:其它类型
3:自定义序列化器
当上诉序列化器满足不了我们的需求时我们肯定自定义序列化器,例如我们发送的消息是一个javabean对象(当然我们可以将数据变为json字符串格式通过StringSeializer来进行序列化。而且我们一般都会这么做)。
定义messageBean
[code]import lombok.*; import javax.validation.constraints.Max; import javax.validation.constraints.NotNull; import java.io.Serializable; /** * @author fangyuan */ @Getter @Setter @Builder @NoArgsConstructor @AllArgsConstructor public class PersonInfo implements Serializable { private static final long serialVersionUID = -5666930682610937456L; @NotNull private String name; @Max(100) private Integer age; @NotNull private String sex; }
实现Serializer和Deserializer
[code]package com.demo.kafkaDemo.demo.Serializer; import com.demo.kafkaDemo.demo.bean.PersonInfo; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serializer; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; import java.io.Serializable; /** * 自定义序列化器 * @author fangyaun */ public class PersonInfoSerializer implements Serializer<PersonInfo> { @Override public byte[] serialize(String topic, PersonInfo personInfo) { ByteArrayOutputStream outputStream = new ByteArrayOutputStream(1024); try { if (!(personInfo instanceof Serializable)) { throw new IllegalArgumentException(this.getClass().getSimpleName() + " requires a Serializable payload but received an object of type [" + personInfo.getClass().getName() + "]"); } else { ObjectOutputStream objectOutputStream = null; try { objectOutputStream = new ObjectOutputStream(outputStream); } catch (IOException e) { e.printStackTrace(); } objectOutputStream.writeObject(personInfo); objectOutputStream.flush(); } return outputStream.toByteArray(); } catch (Throwable ex) { throw new SerializationException("Can't serialize data [" + personInfo + "] for topic [" + topic + "]", ex); } } }
[code]package com.demo.kafkaDemo.demo.Serializer; import com.demo.kafkaDemo.demo.bean.PersonInfo; import org.apache.kafka.common.serialization.Deserializer; import org.springframework.core.ConfigurableObjectInputStream; import org.springframework.core.serializer.support.SerializationFailedException; import java.io.ByteArrayInputStream; /** * 自定义反序列化器 * @author fangyaun */ public class PersonInfoDeserializer implements Deserializer<PersonInfo> { @Override public PersonInfo deserialize(String topic, byte[] bytes) { ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes); try { ConfigurableObjectInputStream objectInputStream = new ConfigurableObjectInputStream(inputStream,null); return (PersonInfo) objectInputStream.readObject(); } catch (Throwable var4) { throw new SerializationFailedException("Failed to deserialize payload for topic [" + topic + "]" , var4); } } }
Demo项目github地址:https://github.com/fangyuan94/kafkaDemo
- 点赞 1
- 收藏
- 分享
- 文章举报
相关文章推荐
- springboot2.x+kafka使用和源码分析七(消费者和生产者使用拦截器)
- springboot2.x +kafka使用和源码分析五(消费者配置使用)
- springboot2.x +kafka使用和源码分析二(topic配置)
- springboot2.x +kafka使用和源码分析一(自动装配)
- springboot2.x +redis使用和源码分析三(序列化器)
- springboot2.x +kafka使用和源码分析八(自定义分区器)
- springboot2.x +kafka使用和源码分析四(kafka事务)
- springboot2.x +kafka使用和源码分析三(生产者配置)
- springboot2.x +redis使用和源码分析二(RedisTemplate)
- springboot2.x +redis使用和源码分析一(springboot自动装配源码分析)
- springboot源码分析10-ApplicationContextInitializer使用
- 自定义spring boot starter三部曲之三:源码分析spring.factories加载过程
- springboot源码分析4-springboot之SpringFactoriesLoader使用
- Springboot 2使用外部Tomcat源码分析
- springboot源码分析16-spring boot监听器使用
- springboot使用自定义配置文件
- Spring Boot下Druid连接池的使用配置分析
- spring boot + kafka 使用详细步骤
- Spring之WebContext不使用web.xml启动 初始化重要的类源码分析(Servlet3.0以上的)
- SpringBoot源码分析:spring的基本架构