您的位置:首页 > 其它

分布式消息中间件(2):Kafka系统学习—集群搭建与使用、副本机制和实时日志统计流程

2021-04-26 16:50 211 查看

前言

这个系列主要是讲解关于分布式消息中间件的一些心得

关于分布式系统、中间件是什么、消息中间件能做什么、分布式消息中间件长什么样诸如此类基础概念在上一篇文章——
分布式消息中间件(1):Rabbitmq入门到高可用实战!都已经讲过,这里就不赘述了,感兴趣的朋友可以自己去看一下。

这是本系列的第二篇,准备写kafka,

Kakfa 广泛应用于国内外大厂,例如 BAT、字节跳动、美团、Netflix、Airbnb、Twitter 等等。其重要性不言而喻。

kafka与其他三个主流中间件相比,优势有两个:

  • 性能高,每秒百万级别;
  • 分布式,高可用,水平扩展。

今天我们通过这篇文章深入了解一下 Kafka的工作原理。由于篇幅所限,肯定不会完全写到,只能挑比较重要的几个点来跟大家分析一下,面试题的话也不会在这篇文章里解析了,单独整理了一份kafka学习笔记PDF以及定经典高频面试题解析,需要的朋友可以自行领取

好了,话不多说,坐稳扶好,发车喽!

一、Kafka集群搭建与使用

kafka官网图

有中文官网,可以详细看看。

地址:http://kafka.apachecn.org/intro.html

1、软件下载

1.1 kakfa下载

地址:http://kafka.apache.org/downloads

1.2 zookeeper下载

(1)因为kafka要依赖于zookeeper做调度,kafka中实际自带的有kafka,但是一般建议使用独立的zookeeper,方便后续升级及公用。

(2)下载地址:

http://zookeeper.apache.org/

1.3 下载说明

文件都不大,zk是9m多,kafka是50多兆

2、 kafka单机部署及集群部署

说明:北游在本地弄了三台虚拟机,ip分别为:

192.168.85.158
192.168.85.168
192.168.85.178

2.1 单机部署

(1)上传jar包,就不再新建用户了,直接在root账户下执行,将kafka和zookeeper的tar包上传到/root/tools目录下。

(2)解压

[root@ruanjianlaowang158 tools]# tar -zxvf kafka_2.12-2.4.1.tgz
[root@ruanjianlaowang158 tools]# tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz

(3)配置zookeeper及启动

[root@ruanjianlaowang158 apache-zookeeper-3.5.7-bin]# cd /root/tools/apache-zookeeper-3.5.7-bin
#北游,首先创建个空文件夹,在接下来的配置文件中配置
[root@ruanjianlaowang158 apache-zookeeper-3.5.7-bin]# mkdir data
[root@ruanjianlaowang158 conf]# cd /root/tools/apache-zookeeper-3.5.7-bin/conf
[root@ruanjianlaowang158 conf]# cp zoo_sample.cfg  zoo.cfg
[root@ruanjianlaowang158 conf]# vi  zoo.cfg
#单机只改一个值,保存退出。
#dataDir=/tmp/zookeeper
dataDir=/root/tools/apache-zookeeper-3.5.7-bin/data

#启动zookeeper
[root@ruanjianlaowang158 bin]# cd /root/tools/apache-zookeeper-3.5.7-bin/bin
[root@ruanjianlaowang158 bin]# ./zkServer.sh  start

(4)配置kafka及启动

[root@ruanjianlaowang158 kafka_2.12-2.4.1]# cd /root/tools/kafka_2.12-2.4.1

#北游,新建个空文件夹
[root@ruanjianlaowang158 kafka_2.12-2.4.1]# mkdir data

#北游,更改配置文件
[root@ruanjianlaowang158 config]# cd /root/tools/kafka_2.12-2.4.1/config
[root@ruanjianlaowang158 config]# vi server.properties

#需要改3个值
#log.dirs=/tmp/kafka-logs
log.dirs=/root/tools/kafka_2.12-2.4.1/data
#listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.85.158:9092
#zookeeper.connect=localhost:2181
zookeeper.connect=192.168.85.158:2181

#启动kafka
[root@ruanjianlaowang158 bin]# cd /root/tools/kafka_2.12-2.4.1/bin
[root@ruanjianlaowang158 bin]# ./zookeeper-server-start.sh ../config/server.properties &

启动完毕,单机验证就不验证了,直接在集群中进行验证。

2.2 集群部署

(1)集群方式,首先把上面的单机模式,再在192.168.85.168和192.168.85.178服务器上先解压配置一遍。

(2)zookeeper是还是更改zoo.cfg

158,168,178三台服务器一样:

[root@ruanjianlaowang158 conf]# cd /root/tools/apache-zookeeper-3.5.7-bin/conf

[root@ruanjianlaowang158 conf]# vi zoo.cfg
#其他不变,最后面新加,三行,三台服务器配置一样,北游
server.1=192.168.85.158:2888:3888
server.2=192.168.85.168:2888:3888
server.3=192.168.85.178:2888:3888

158服务器执行:
echo "1" > /root/tools/apache-zookeeper-3.5.7-bin/data/myid
168服务器执行:
echo "2" > /root/tools/apache-zookeeper-3.5.7-bin/data/myid
178服务器执行:
echo "3" > /root/tools/apache-zookeeper-3.5.7-bin/data/myid

(3)kafka集群配置

[root@ruanjianlaowang158 config]# cd /root/tools/kafka_2.12-2.4.1/config
[root@ruanjianlaowang158 config]# vi server.properties
#broker.id 三台服务器不一样,158服务器设置为1,168服务器设置为2,178服务器设置为3

broker.id=1
#三个服务器配置一样
zookeeper.connect=192.168.85.158:2181,192.168.85.168:2181,192.168.85.178:2181

Kafka常用Broker配置说明:

配置项 默认值/示例值 说明
broker.id 0 Broker唯一标识
listeners PLAINTEXT://192.168.85.158:9092 监听信息,PLAINTEXT表示明文传输
log.dirs /root/tools/apache-zookeeper-3.5.7-bin/data kafka数据存放地址,可以填写多个。用","间隔
message.max.bytes message.max.bytes 单个消息长度限制,单位是字节
num.partitions 1 默认分区数
log.flush.interval.messages Long.MaxValue 在数据被写入到硬盘和消费者可用前最大累积的消息的数量
log.flush.interval.ms Long.MaxValue 在数据被写入到硬盘前的最大时间
log.flush.scheduler.interval.ms Long.MaxValue 检查数据是否要写入到硬盘的时间间隔。
log.retention.hours 24 控制一个log保留时间,单位:小时
zookeeper.connect 192.168.85.158:2181,

192.168.85.168:2181,
192.168.85.178:2181 | ZooKeeper服务器地址,多台用","间隔 |

(4)集群启动

启动方式跟单机一样:

#启动zookeeper
[root@ruanjianlaowang158 bin]# cd /root/tools/apache-zookeeper-3.5.7-bin/bin
[root@ruanjianlaowang158 bin]# ./zkServer.sh  start

#启动kafka
[root@ruanjianlaowang158 bin]# cd /root/tools/kafka_2.12-2.4.1/bin
[root@ruanjianlaowang158 bin]# ./zookeeper-server-start.sh ../config/server.properties &

(5)注意点

集群启动的时候,单机那台服务器(158)可能会报:Kafka:Configured broker.id 2 doesn't match stored broker.id 0 in meta.properties.
方案:在158服务器data中有个文件:meta.properties,文件中的broker.id也需要修改成与server.properties中的broker.id一样,所以造成了这个问题。

(6)创建个topic,后面springboot项目测试使用。

[root@ruanjianlaowang158 bin]# cd /root/tools/kafka_2.12-2.4.1/bin
[root@ruanjianlaowang158 bin]# ./kafka-topics.sh --create --zookeeper 192.168.85.158:2181,192.168.85.168:2181,192.168.85.178:2181 --replication-factor 3 --partitions 5 --topic aaaa

3、结合springboot项目

3.1 pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.itany</groupId>
<artifactId>kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka</name>
<description>Demo project for Spring Boot</description>

<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>

说明:

主要就两个gav,一个是spring-boot-starter-web,启动web服务使用;一个是spring-kafka,这个是springboot集成额kafka核心包。

3.2 application.yml

spring:
kafka:
# 北游,kafka集群服务器地址
bootstrap-servers: 192.168.85.158:9092,192.168.85.168:9092,192.168.85.178:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: test
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3.3 producer(消息生产者)

@RestController
public class KafkaProducer {
@Autowired
private KafkaTemplate template;
//北游,topic使用上测试创建的aaaa
@RequestMapping("/sendMsg")
public String sendMsg(String topic, String message){
template.send(topic,message);
return "success";
}
}

3.4 consumer(消费者)

@Component
public class KafkaConsumer {
//北游,这里是监控aaaa这个topic,直接打印到idea中,北游
@KafkaListener(topics = {"aaaa"})
public void listen(ConsumerRecord record){
System.out.println(record.topic()+":"+record.value());
}
}

3.5 验证结果

(1)浏览器上输入

http://localhost:8080/sendMsg?topic=aaaa&message=bbbb

(2)北游的idea控制台打印信息

二、Kafka副本机制

1、什么是副本机制:

通常是指分布式系统在多台网络互联的机器上保存有相同的数据拷贝

2、副本机制的好处:

2.1 提供数据冗余

系统部分组件失效,系统依然能够继续运转,因而增加了整体可用性以及数据持久性

2.2 提供高伸缩性

支持横向扩展,能够通过增加机器的方式来提升读性能,进而提高读操作吞吐量

2.3 改善数据局部性

允许将数据放入与用户地理位置相近的地方,从而降低系统延时。

3、kafka的副本

(1)、 本质就是一个只能追加写消息的日志文件

(2)、同一个分区下的所有副本保存有相同的消息序列

(3)、副本分散保存在不同的 Broker 上,从而能够对抗部分 Broker 宕机带来的数据不可用(Kafka 是有若干主题概,每个主题可进一步划分成若干个分区。每个分区配置有若干个副本)

如下:有 3 台 Broker 的 Kafka 集群上的副本分布情况

4、kafka如何保证同一个分区下的所有副本保存有相同的消息序列:

基于领导者(Leader-based)的副本机制

工作原理如图:

(1)、Kafka 中分成两类副本:领导者副本(Leader Replica)和追随者副本(Follower Replica)。每个分区在创建时都要选举一个副本,称为领导者副本,其余的副本自动称为追随者副本。

(2)、Kafka 中,追随者副本是不对外提供服务的。追随者副本不处理客户端请求,它唯一的任务就是从领导者副本,所有的读写请求都必须发往领导者副本所在的 Broker,由该 Broker 负责处理。(因此目前kafka只能享受到副本机制带来的第 1 个好处,也就是提供数据冗余实现高可用性和高持久性)

(3)、领导者副本所在的 Broker 宕机时,Kafka 依托于 ZooKeeper 提供的监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。老 Leader 副本重启回来后,只能作为追随者副本加入到集群中。

5、kafka追随者副本到底在什么条件下才算与 Leader 同步

Kafka 引入了 In-sync Replicas,也就是所谓的 ISR 副本集合。ISR 中的副本都是与 Leader 同步的副本,相反,不在 ISR 中的追随者副本就被认为是与 Leader 不同步的

6、kafka In-sync Replicas(ISR)

(1)、ISR不只是追随者副本集合,它必然包括 Leader 副本。甚至在某些情况下,ISR 只有 Leader 这一个副本

(2)、通过Broker 端replica.lag.time.max.ms 参数(Follower 副本能够落后 Leader 副本的最长时间间隔)值来控制哪个追随者副本与 Leader 同步?只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。

(3)、ISR 是一个动态调整的集合,而非静态不变的。

某个追随者副本从领导者副本中拉取数据的过程持续慢于 Leader 副本的消息写入速度,那么在

replica.lag.time.max.ms
时间后,此 Follower 副本就会被认为是与 Leader 副本不同步的,因此不能再放入 ISR 中。此时,Kafka 会自动收缩 ISR 集合,将该副本“踢出”ISR。

倘若该副本后面慢慢地追上了 Leader 的进度,那么它是能够重新被加回 ISR 的。

(4)、ISR集合为空则leader副本也挂了,这个分区就不可用了,producer也无法向这个分区发送任何消息了。(反之leader副本挂了可以从ISR集合中选举leader副本)

7、kafka leader副本所在broker挂了,leader副本如何选举

(1)、ISR不为空,从ISR中选举

(2)、ISR为空,Kafka也可以从不在 ISR 中的存活副本中选举,这个过程称为Unclean 领导者选举,通过Broker 端参数

unclean.leader.election.enable
控制是否允许 Unclean 领导者选举。

开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。

一个分布式系统通常只能同时满足一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)中的两个。显然,在这个问题上,Kafka 赋予你选择 C 或 A 的权利。

强烈建议不要开启unclean leader election,毕竟我们还可以通过其他的方式来提升高可用性。如果为了这点儿高可用性的改善,牺牲了数据一致性,那就非常不值当了。

ps1:leader副本的选举也可以理解为分区leader的选举

ps2:broker的leader选举与分区leader的选举不同,

Kafka的Leader选举是通过在zookeeper上创建/controller临时节点来实现leader选举,并在该节点中写入当前broker的信息 

{“version”:1,”brokerid”:1,”timestamp”:”1512018424988”} 

利用Zookeeper的强一致性特性,一个节点只能被一个客户端创建成功,创建成功的broker即为leader,即先到先得原则,leader也就是集群中的controller,负责集群中所有大小事务。 

当leader和zookeeper失去连接时,临时节点会删除,而其他broker会监听该节点的变化,当节点删除时,其他broker会收到事件通知,重新发起leader选举

再给你们留个小问题:如果允许 Follower 副本对外提供读服务,你觉得应该如何避免或缓解因 Follower 副本与 Leader 副本不同步而导致的数据不一致的情形?

三、实时日志统计流程

1、项目流程

  在整合这套方案的时候,项目组也是经过一番讨论,在讨论中,观点很多,有人认为直接使用Storm进行实时处理,去掉Kafka环节;也有认为直接使用Kafka的API去消费,去掉Storm的消费环节等等,但是最终组内还是一致决定使用这套方案,原因有如下几点:

  • 业务模块化
  • 功能组件化

  我们认为,Kafka在整个环节中充当的职责应该单一,这项目的整个环节她就是一个中间件,下面用一个图来说明这个原因,如下图所示:

  整个项目流程如上图所示,这样划分使得各个业务模块化,功能更加的清晰明了。

  • Data Collection

  负责从各个节点上实时收集用户上报的日志数据,我们选用的是Apache的Flume NG来实现。

  • Data Access

  由于收集的数据的速度和数据处理的速度不一定是一致的,因此,这里添加了一个中间件来做处理,所使用的是Apache的Kafka,关于Kafka集群部署。另外,有一部分数据是流向HDFS分布式文件系统了的,方便于为离线统计业务提供数据源。

  • Stream Computing

  在收集到数据后,我们需要对这些数据做实时处理,所选用的是Apache的Storm。关于Storm的集群搭建部署博客后面补上,较为简单。

  • Data Output

  在使用Storm对数据做处理后,我们需要将处理后的结果做持久化,由于对响应速度要求较高,这里采用Redis+MySQL来做持久化。整个项目的流程架构图,如下图所示:

2、Flume

  Flume是一个分布式的、高可用的海量日志收集、聚合和传输日志收集系统,支持在日志系统中定制各类数据发送方(如:Kafka,HDFS等),便于收集数据。Flume提供了丰富的日志源收集类型,有:Console、RPC、Text、Tail、Syslog、Exec等数据源的收集,在我们的日志系统中目前我们所使用的是spooldir方式进行日志文件采集,配置内容信息如下所示:

producer.sources.s.type = spooldir
producer.sources.s.spoolDir = /home/hadoop/dir/logdfs

  当然,Flume的数据发送方类型也是多种类型的,有:Console、Text、HDFS、RPC等,这里我们系统所使用的是Kafka中间件来接收,配置内容如下所示:

producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=dn1:9092,dn2:9092,dn3:9092
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.custom.topic.name=test

3、Kafka

  Kafka是一种提供高吞吐量的分布式发布订阅消息系统,她的特性如下所示:

  • 通过磁盘数据结构提供消息的持久化,这种结构对于即使数据达到TB+级别的消息,存储也能够保持长时间的稳定。
  • 搞吞吐特性使得Kafka即使使用普通的机器硬件,也可以支持每秒数10W的消息。
  • 能够通过Kafka Cluster和Consumer Cluster来Partition消息。

  Kafka的目的是提供一个发布订阅解决方案,他可以处理Consumer网站中的所有流动数据,在网页浏览,搜索以及用户的一些行为,这些动作是较为关键的因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于Hadoop这样的日志数据和离线计算系统,这样的方案是一个解决实时处理较好的一种方案。

  关于Kafka集群的搭建部署和使用,上面已经写了,不会的朋友翻上去再看一下,这里就不赘述了。

4、Storm

  Twitter将Storm开源了,这是一个分布式的、容错的实时计算系统,已被贡献到Apache基金会,下载地址如下所示:

http://storm.apache.org/downloads.html

  Storm的主要特点如下:

  • 简单的编程模型。类似于MapReduce降低了并行批处理复杂性,Storm降低了进行实时处理的复杂性。
  • 可以使用各种编程语言。你可以在Storm之上使用各种编程语言。默认支持Clojure、Java、Ruby和Python。要增加对其他语言的支持,只需实现一个简单的Storm通信协议即可。
  • 容错性。Storm会管理工作进程和节点的故障。
  • 水平扩展。计算是在多个线程、进程和服务器之间并行进行的。
  • 可靠的消息处理。Storm保证每个消息至少能得到一次完整处理。任务失败时,它会负责从消息源重试消息。
  • 快速。系统的设计保证了消息能得到快速的处理,使用ØMQ作为其底层消息队列。
  • 本地模式。Storm有一个本地模式,可以在处理过程中完全模拟Storm集群。这让你可以快速进行开发和单元测试。

Storm集群由一个主节点和多个工作节点组成。主节点运行了一个名为“Nimbus”的守护进程,用于分配代码、布置任务及故障检测。每个工作节 点都运行了一个名为“Supervisor”的守护进程,用于监听工作,开始并终止工作进程。

Nimbus和Supervisor都能快速失败,而且是无 状态的,这样一来它们就变得十分健壮,两者的协调工作是由Apache的ZooKeeper来完成的。

Storm的术语包括

Stream
Spout
Bolt
Task
Worker
Stream Grouping
Topology

  • Stream是被处理的数据。
  • Spout是数据源。
  • Bolt处理数据。
  • Task是运行于Spout或Bolt中的 线程。
  • Worker是运行这些线程的进程。
  • Stream Grouping规定了Bolt接收什么东西作为输入数据。数据可以随机分配(术语为Shuffle),或者根据字段值分配(术语为Fields),或者广播(术语为All),或者总是发给一个Task(术语为Global),也可以不关心该数据(术语为None),或者由自定义逻辑来决定(术语为 Direct)。
  • Topology是由Stream Grouping连接起来的Spout和Bolt节点网络。在Storm Concepts页面里对这些术语有更详细的描述。

  关于Storm集群的搭建部署,博客在下一篇中更新,到时候会将更新地址附在这里,这里就先不对Storm集群的搭建部署做过多的赘述了。

5、总结

Kafka 日志消息保存时间总结
Kafka 作为一个高吞吐的消息中间件和传统的消息中间件一个很大的不同点就在于它的日志实际上是以日志的方式默认保存在/kafka-logs文件夹中的。虽然默认有7天清楚的机制,但是在数据量大,而磁盘容量不足的情况下,经常出现无法写入的情况。如何调整Kafka的一些默认参数就显得比较关键了。这里笔者整理了一些常见的配置参数供大家参考:

分段策略属性
属性名
含义 默认值
log.roll.{hours,ms} 日志滚动的周期时间,到达指定周期时间时,强制生成一个新的segment 168(7day)
log.segment.bytes 每个segment的最大容量。到达指定容量时,将强制生成一个新的segment 1G(-1为不限制)
log.retention.check.interval.ms 日志片段文件检查的周期时间 60000

日志刷新策略

Kafka的日志实际上是开始是在缓存中的,然后根据策略定期一批一批写入到日志文件中去,以提高吞吐率。

属性名 含义 默认值
log.flush.interval.messages 消息达到多少条时将数据写入到日志文件 10000
log.flush.interval.ms 当达到该时间时,强制执行一次flush null
log.flush.scheduler.interval.ms 周期性检查,是否需要将信息flush 很大的值

日志保存清理策略

属性名 含义 默认值
log.cleanup.polict 日志清理保存的策略只有delete和compact两种 delete
log.retention.hours 日志保存的时间,可以选择hours,minutes和ms 168(7day)
log.retention.bytes 删除前日志文件允许保存的最大值 -1
log.segment.delete.delay.ms 日志文件被真正删除前的保留时间 60000
log.cleanup.interval.mins 每隔一段时间多久调用一次清理的步骤 10
log.retention.check.interval.ms 周期性检查是否有日志符合删除的条件(新版本使用 ) 300000

这里特别说明一下,日志的真正清楚时间。当删除的条件满足以后,日志将被“删除”,但是这里的删除其实只是将该日志进行了“delete”标注,文件只是无法被索引到了而已。

但是文件本身,仍然是存在的,只有当过了log.segment.delete.delay.ms 这个时间以后,文件才会被真正的从文件系统中删除。

文章写到这里差不多了,比我预计要写得短一些,因为还有一些东西要写出来难免长篇大论,篇幅不允许,想更透彻的掌握kafka的同学可以领取我整理的完整版kafka学习笔记,最近要准备面试的同学可以看看我这份kafka高频面试题整理

后面我会把另外两个中间件也分别写文章分析,可以给我点个关注第一时间接到通知

然后,可以点个赞吗兄弟们!

end

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐