Kafka系列(21)java消费者是如何管理Tcp连接的
Kafka的世界中,无论是ServerSocket,还是SocketChannel,它们实现的都是TCP协议
。Kafka的网络传输是基于Tcp协议的,而不是基于UDP协议。
何时创建Tcp连接?
消费者端主要的程序入口是KafkaConsumer类。和生产者不同的是,构建KafkaConsumer实例时是不会创建任何
Tcp连接的,也就是说,当你执行完new KafkaConsumer语句时,你会发现,没有Socket连接被创建出来。
这一点和Java生产者是由区别的,主要原因就是生产者入口类KafkaProducer在构建实例的时候,会在后台默默启动一个Sender
线程,这个Sender线程负责Socket连接的创建。
Tcp连接是在调用KafkaConsumer.poll方法时创建的。再细粒度地说,在 poll方法内部有3个时机可以创建Tcp连接。
1.发起FindCoordinator请求时
消费者端有个组件叫协调者,驻留在Broker端的内存中,负责消费者组的组成员管理和各个消费者的位移提交管理。
当消费者程序首次启动调用poll方法时,它需要向Kafka集群发送一个名为FindCoordinator请求。希望Kafka集群告诉它哪个Broker是管理它的协调者。
消费者程序会向集群中当前负载最小的那台Broker发送请求。负载如何评估?就是看消费者连接的所有Broker中,
谁的待发送请求最少。这种评估显然是消费者端的单向评估,并非是站在全局角度。
2连接协调者时。
Broker处理完上一步发送的FindCoordinator请求之后,会返还的响应结果(Response),消费者知晓了真正的协调者,会创建连向该Broker的Scoket连接。只有成功连入协调者,协调者才能开启正常得组协调操作,比如加入组,等待组分配方案,心跳请求处理,位移获取,位移提交。
3消费数据时
消费者会为每个要消费的分区创建与该分区领导者副本所在的Broker连接的Tcp
假设消费者要消费5个分区的数据,该5个分区各自的领导者副本分布在4台Broker上,那么该消费者在消费时会创建与这4台Broker的socket连接
消费者程序会创建3类Tcp连接
1确定协调者和获取集群元数据
2连接协调者,令其执行组成员管理操作
3执行实际的消息获取。
何时关闭Tcp连接
手动关闭
调用 KafkaConsumer.close()方法 执行kill 命令 kill -2/ kill -9
自动关闭
消费者端参数connection.max.idle.ms 默认是9分钟,如果某个Socket连接上持续9分钟都没有任何请求,消费者会强行杀掉这个Socket连接。
和生产者有些不同,如果编写消费者程序,使用循环的方式来调用poll方法消费消息,上面提到的所有请求都会被定期发送到
Broker,因此这些Socket连接上总是能保证有请求在发送,从而也就实现了“长连接”的效果。
当第三类Tcp连接成功创建后,消费者程序就会废弃第一类tcp连接,之后在定期请求元数据时,它会改为使用第三类Tcp
连接。最终发现,第一类tcp连接会在后台被默默关闭掉,对于运行了一段时间的程序来说,只会有后面两类Tcp连接存在。
- java程序员菜鸟进阶(七)《HTTP权威指南》之HTTP连接管理及对TCP性能的考虑
- TCP系列02—连接管理—1、三次握手与四次挥手
- java程序员菜鸟进阶(七) HTTP权威指南 之HTTP连接管理及对TCP性能的考虑
- 到主机 的 TCP/IP 连接失败。 java.net.ConnectException: Connection refused: connect
- webservice系列教学(4)-如何调用webservice(pb,java)
- JN系列(3):如何得到JavaVM,JNIEnv接口
- 如何连接SQL Server数据库(Java版)
- 如何连接oracle,mysql, SQL Server数据库(Java版)
- 如何在OPhone 1.0和1.5中打开和管理数据连接
- 服务器端如何管理很多个客户端的socket连接?
- JAVA用TCP 实现反向连接屏幕监视
- 如何连接SQL Server数据库(Java版)
- 如何连接SQL Server数据库(Java版)
- TCP连接断线后,如何检测?
- 一步一步学Silverlight 2系列(21):如何在Silverlight中调用JavaScript (转)
- 在Domino中使用Java系列1--如何在Domino上导入jar包
- Java连接sqlserver2005到主机的TCP/IP 连接失败问题
- TCP长连接服务的Java实现
- java如何连接access数据库
- [MySQL FAQ]系列 -- 如何设定连接服务器时的字符集