您的位置:首页 > 编程语言 > Java开发

springboot+kafka+邮件发送(最佳实践)

2019-08-01 17:18 2101 查看
  1. 导读
    集成spring-kafka,生产者生产邮件message,消费者负责发送
  2. 引入线程池,多线程发送消息
  3. 多邮件服务器配置
  4. 定时任务生产消息;计划邮件发送
  • 实现过程
      导入依赖
    <properties>
    <java.version>1.8</java.version>
    <mysql.version>5.1.38</mysql.version>
    <mapper.version>2.1.5</mapper.version>
    <mybatis.version>1.3.2</mybatis.version>
    <gson.version>2.8.2</gson.version>
    <lang3.version>3.4</lang3.version>
    </properties>
    <dependencies>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- Spring Test -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    </dependency>
    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>${mybatis.version}</version>
    </dependency>
    <!--数据库驱动-->
    <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.version}</version>
    </dependency>
    <!-- 通用Mapper启动器 -->
    <dependency>
    <groupId>tk.mybatis</groupId>
    <artifactId>mapper-spring-boot-starter</artifactId>
    <version>${mapper.version}</version>
    </dependency>
    <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- 自定义配置文件需要 -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
    </dependency>
    <!-- 使用SLF4J + Logback 作为日志框架 -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-logging</artifactId>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-mail-->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-mail</artifactId>
    </dependency>
    <dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>${gson.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>${lang3.version}</version>
    </dependency>
    </dependencies>

    2.application.yml配置kafka、邮箱、数据库参数

    本文采用的yml配置方式,若使用的properties的可使用https://www.toyaml.com/index.html这个在线工具转换;

    邮箱配置,若使用的163、或者qq等,自己百度怎么申请授权码(一大堆教程);

    数据库采用的Hikari连接池,号称java平台最快的;

    # 配置Kafka集群IP地址,多个IP以逗号隔开:
    spring:
    kafka:
    bootstrap-servers: 你的kafkaIP:端口号
    producer:
    retries: 2 #发送失败后的重复发送次数
    key-serializer: org.apache.kafka.common.serialization.StringSerializer #key序列化方式
    value-serializer: org.apache.kafka.common.serialization.StringSerializer #value序列化方式
    compression-type: gzip #压缩格式
    batch-size: 16384 #批量发送的消息数量
    buffer-memory: 33554432 #32M的批处理缓冲区
    consumer:
    auto-offset-reset: earliest #最早未被消费的offset
    enable-auto-commit: false #是否开启自动提交
    #auto-commit-interval: 1000 #自动提交的时间间隔
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #key解码方式
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #value解码方式
    group-id: kafka.consumer.group.id.1
    max-poll-records: 50
    properties:
    session-timeout-ms: 20000 #连接超时时间
    max-poll-interval-ms: 15000 #手动提交设置与poll的心跳数,如果消息队列中没有消息,等待毫秒后,调用poll()方法。如果队列中有消息,立即消费消息,每次消费的消息的多少可以通过max.poll.records配置。
    max-partition-fetch-bytes: 15728640 #设置拉取数据的大小 15M
    client-id: kafkacli
    listener:
    ack-mode: manual_immediate

    datasource:
    type: com.zaxxer.hikari.HikariDataSource
    driver-class-name: com.mysql.jdbc.Driver
    url: * #自己的
    username: * #账号
    password: * #密码
    hikari:
    minimum-idle: 5
    # 空闲连接存活最大时间,默认600000(10分钟)
    idle-timeout: 180000
    # 连接池最大连接数,默认是10
    maximum-pool-size: 10
    # 此属性控制从池返回的连接的默认自动提交行为,默认值:true
    auto-commit: true
    # 连接池名称
    pool-name: MyHikariCP
    # 此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
    max-lifetime: 1800000
    # 数据库连接超时时间,默认30秒,即30000
    connection-timeout: 30000
    connection-test-query: SELECT 1

    # 邮箱服务器配置,以163邮箱为例
    mail:
    host: smtp.163.com #邮箱服务器地址
    port: 25 #端口
    username: * #用户名
    password: * #授权密码
    default-encoding: UTF-8
    properties:
    from: * #用户名
    mail:
    smtp:
    connectiontimeout: 5000
    timeout: 3000
    writetimeout: 5000

    # 邮件模板
    thymeleaf:
    cache: false
    prefix: classpath:/views/
    # 邮件附件
    servlet:
    multipart:
    max-file-size: 10MB #限制单个文件大小
    max-request-size: 50MB #限制请求总量


    logging:
    level:
    com.example: debug
    pattern:
    # console: %d{yyyy/MM/dd-HH:mm:ss} [%thread] %-5level %logger- %msg%n
    # file: %d{yyyy/MM/dd-HH:mm} [%thread] %-5level %logger- %msg%n
    path: C:\log

    # 邮件失败重试次数
    com:
    example:
    mail:
    sendNumber: 3 #邮件发送失败重试次数
    threadKillTime: 60 #线程超时杀死

    mybatis:
    type-aliases-package: com.example.mail.entity
    configuration:
    map-underscore-to-camel-case: true
    mapper-locations: mappers/*Mapper.xml


    # 异步线程配置,配置核心线程数
    async:
    executor:
    thread:
    core_pool_size: 15 #核心线程数量,线程池创建时候初始化的线程数
    max_pool_size: 15 #最大线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
    queue_capacity: 99999 #缓冲队列,用来缓冲执行任务的队列
    keep_alive_seconds: 60 #当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
    await_termination_seconds: 30 #设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住。
    name:
    prefix: async-service-
    prefixson: async-service-son

    3.邮件消息格式

    {
    "mailUid": "邮件唯一标识",
    "fromName": "发件人别名",
    "fromMail": "发件人地址",
    "toMail": "收件人地址(多个邮箱则用逗号","隔开)",
    "ccMail": "抄送人地址(多个邮箱则用逗号","隔开)",
    "bccMail": "密送人地址(多个邮箱则用逗号","隔开)",
    "planSendTime": "计划邮件时间",
    "mailSubject": "邮件主题",
    "mailContent": "邮件正文",
    "sendNum": 发送次数,
    "serverFlag": "邮件服务器标识(多邮件服务器用)"
    }

    4.kafka生产者、消费者、mail发送类等主要方法代码

    //生产者
    public void sendToKafkaStandardMessageAsync(MailDTO mailDTO) {

    producer = new KafkaProducer<String, Object>(kafkaConfig.producerConfigs());

    producer.send(new ProducerRecord<String, Object>(topicName, gson.toJson(mailDTO)), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
    if (metadata != null) {
    log.info("生产消息成功{},发送次数{},checksum:{},offset:{},partition:{},topic:{}", mailDTO.getMailUid(),mailDTO.getSendNum(),metadata.checksum(), metadata.offset(), metadata.partition(), metadata.topic());
    }
    if (exception != null) {
    log.info("生产消息失败{}", exception.getMessage());
    }
    }
    });
    producer.close();
    }
    //消费者
    /**
    * 监听一个Kafka 主题
    **/
    @KafkaListener(topics = MQConstants.Topic.ITEM_EXCHANGE_NAME)
    public void receiveMessageFromKafka(ConsumerRecord<?, ?> record, Acknowledgment ack) {
    log.info("监听消息,MailUid:{}", gson.fromJson(String.valueOf(record.value()), MailDTO.class).getMailUid());

    Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    if (kafkaMessage.isPresent()) {
    sendMessageService.sendMessages(gson.fromJson(String.valueOf(record.value()), MailDTO.class));
    }
    ack.acknowledge();//手动提交偏移量
    }
    //构建复杂邮件信息类
    public void sendMimeMail(MailVo mailVo) {
    try {
    MimeMessageHelper messageHelper = new MimeMessageHelper(mailSender.createMimeMessage(), true);//true表示支持复杂类型
    mailVo.setFrom("这里读取配置文件中配的from地址");//邮件发信人从配置项读取
    messageHelper.setFrom(mailVo.getFrom());//邮件发信人
    messageHelper.setSentDate(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2019-07-18 12:45:48"));
    messageHelper.setTo(mailVo.getTo().split(","));//邮件收信人
    messageHelper.setSubject(mailVo.getSubject());//邮件主题
    messageHelper.setText(mailVo.getText());//邮件内容
    if (!StringUtils.isEmpty(mailVo.getCc())) {//抄送
    messageHelper.setCc(mailVo.getCc().split(","));
    }
    if (!StringUtils.isEmpty(mailVo.getBcc())) {//密送
    messageHelper.setCc(mailVo.getBcc().split(","));
    }
    if (mailVo.getMultipartFiles() != null) {//添加邮件附件
    for (MultipartFile multipartFile : mailVo.getMultipartFiles()) {
    messageHelper.addAttachment(multipartFile.getOriginalFilename(), multipartFile);
    }
    }
    if (StringUtils.isEmpty((CharSequence) mailVo.getSentDate())) {//发送时间
    mailVo.setSentDate(new Date());
    messageHelper.setSentDate(mailVo.getSentDate());
    }
    mailSender.send(messageHelper.getMimeMessage());//正式发送邮件
    mailVo.setStatus("ok");
    log.info("发送邮件成功:{}->{}", mailVo.getFrom(), mailVo.getTo());
    } catch (Exception e) {
    throw new RuntimeException(e);//发送失败
    }
    }

     5.思路解析,画图吧,口述太费劲

     

    完整代码: https://github.com/wwt729/mail.git

     

  • 内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: