微服务中的Kafka与Micronaut
2019-11-27 17:08
1606 查看
今天,我们将通过
Apache Kafka主题构建一些彼此异步通信的微服务。我们使用
Micronaut框架,它为与
Kafka集成提供专门的库。让我们简要介绍一下示例系统的体系结构。我们有四个微型服务:
订单服务,
行程服务,
司机服务和
乘客服务。这些应用程序的实现非常简单。它们都有内存存储,并连接到同一个
Kafka实例。
我们系统的主要目标是为客户安排行程。订单服务应用程序还充当网关。它接收来自客户的请求,保存历史记录并将事件发送到
orders主题。所有其他微服务都在监听
orders这个主题,并处理
order-service发送的订单。每个微服务都有自己的专用主题,其中发送包含更改信息的事件。此类事件由其他一些微服务接收。架构如下图所示。
在阅读本文之前,有必要熟悉一下
Micronaut框架。您可以阅读之前的一篇文章,该文章描述了通过
REST API构建微服务通信的过程:使用microaut框架构建微服务的快速指南。
1 运行Kafka
要在本地机器上运行
Apache Kafka,我们可以使用它的Docker映像。最新的镜像是由https://hub.docker.com/u/wurstmeister共享的。在启动
Kafka容器之前,我们必须启动
kafka所用使用的
ZooKeeper服务器。如果在
Windows上运行
Docker,其虚拟机的默认地址是
192.168.99.100。它还必须设置为
Kafka容器的环境。
Zookeeper和
Kafka容器都将在同一个网络中启动。在docker中运行Zookeeper以
zookeeper的名称提供服务,并在暴露
2181端口。
Kafka容器需要在环境变量使用
KAFKA_ZOOKEEPER_CONNECT的地址。
$ docker network create kafka $ docker run -d --name zookeeper --network kafka -p 2181:2181 wurstmeister/zookeeper $ docker run -d --name kafka -p 9092:9092 --network kafka --env KAFKA_ADVERTISED_HOST_NAME=192.168.99.100 --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 wurstmeister/kafka
2 添加micronaut-kafka依赖
使用
Kafka构建的
microaut应用程序可以在HTTP服务器存在的情况下启动,也可以在不存在HTTP服务器的情况下启动。要启用
Micronaut Kafka,需要添加
micronaut-kafka库到依赖项。如果您想暴露
HTTP API,您还应该添加
micronaut-http-server-netty:
<dependency> <groupId>io.micronaut.configuration</groupId> <artifactId>micronaut-kafka</artifactId> </dependency> <dependency> <groupId>io.micronaut</groupId> <artifactId>micronaut-http-server-netty</artifactId> </dependency>
3 构建订单微服务
订单微服务是唯一一个启动嵌入式HTTP服务器并暴露
REST API的应用程序。这就是为什么我们可以为
Kafka提供内置
Micronaut健康检查。要做到这一点,我们首先应该添加
micronaut-management依赖:
<dependency> <groupId>io.micronaut</groupId> <artifactId>micronaut-management</artifactId> </dependency>
为了方便起见,我们将通过在
application.yml中定义以下配置来启用所有管理端点并禁用它们的HTTP身份验证。
endpoints: all: enabled: true sensitive: false
现在,可以在地址栏http://localhost:8080/health下使用
health check。我们的示例应用程序还将暴露
添加新订单和
列出所有以前创建的订单的简单
REST API。下面是暴露这些端点的
Micronaut控制器实现:
@Controller("orders") public class OrderController { @Inject OrderInMemoryRepository repository; @Inject OrderClient client; @Post public Order add(@Body Order order) { order = repository.add(order); client.send(order); return order; } @Get public Set<Order> findAll() { return repository.findAll(); } }
每个微服务都使用内存存储库实现。以下是
订单微服务(Order-Service)中的存储库实现:
@Singleton public class OrderInMemoryRepository { private Set<Order> orders = new HashSet<>(); public Order add(Order order) { order.setId((long) (orders.size() + 1)); orders.add(order); return order; } public void update(Order order) { orders.remove(order); orders.add(order); } public Optional<Order> findByTripIdAndType(Long tripId, OrderType type) { return orders.stream().filter(order -> order.getTripId().equals(tripId) && order.getType() == type).findAny(); } public Optional<Order> findNewestByUserIdAndType(Long userId, OrderType type) { return orders.stream().filter(order -> order.getUserId().equals(userId) && order.getType() == type) .max(Comparator.comparing(Order::getId)); } public Set<Order> findAll() { return orders; } }
内存存储库存储
Order对象实例。
Order对象还被发送到名为
orders的Kafka主题。下面是
Order类的实现:
public class Order { private Long id; private LocalDateTime createdAt; private OrderType type; private Long userId; private Long tripId; private float currentLocationX; private float currentLocationY; private OrderStatus status; // ... GETTERS AND SETTERS }
4 使用Kafka异步通信
现在,让我们想一个可以通过示例系统实现的用例——
添加新的行程。
我们创建了
OrderType.NEW_TRIP类型的新订单。在此之后,(1)
订单服务创建一个订单并将其发送到
orders主题。订单由三个微服务接收:
司机服务、
乘客服务和
行程服务。
(2)所有这些应用程序都处理这个新订单。
乘客服务应用程序检查乘客帐户上是否有足够的资金。如果没有,它就取消了行程,否则它什么也做不了。
司机服务正在寻找最近可用的司机,(3)
行程服务创建和存储新的行程。
司机服务和
行程服务都将事件发送到它们的主题(
drivers,
trips),其中包含相关更改的信息。
每一个事件可以被其他
microservices访问,例如,(4)
行程服务侦听来自
司机服务的事件,以便为行程分配一个新的司机
下图说明了在添加新的行程时,我们的微服务之间的通信过程。
现在,让我们继续讨论实现细节。
4.1 发送订单
首先,我们需要创建Kafka 客户端,负责向主题发送消息。我们创建的一个接口,命名为
OrderClient,为它添加
@KafkaClient并声明用于发送消息的一个或多个方法。每个方法都应该通过
@Topic注解设置目标主题名称。对于方法参数,我们可以使用三个注解
@KafkaKey、
@Body或
@Header。
@KafkaKey用于分区,这是我们的示例应用程序所需要的。在下面可用的客户端实现中,我们只使用
@Body注解。
@KafkaClient public interface OrderClient { @Topic("orders") void send(@Body Order order); }
4.2 接收订单
一旦客户端发送了一个订单,它就会被监听
orders主题的所有其他微服务接收。下面是
司机服务中的监听器实现。监听器类
OrderListener应该添加
@KafkaListener注解。我们可以声明
groupId作为一个注解参数,以防止单个应用程序的多个实例接收相同的消息。然后,我们声明用于处理传入消息的方法。与客户端方法相同,应该通过
@Topic注解设置目标主题名称,因为我们正在监听
Order对象,所以应该使用
@Body注解——与对应的客户端方法相同。
@KafkaListener(groupId = "driver") public class OrderListener { private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class); private DriverService service; public OrderListener(DriverService service) { this.service = service; } @Topic("orders") public void receive(@Body Order order) { LOGGER.info("Received: {}", order); switch (order.getType()) { case NEW_TRIP -> service.processNewTripOrder(order); } } }
4.3 发送到其他主题
现在,让我们看一下
司机服务中的
processNewTripOrder方法。
DriverService注入两个不同的
Kafka Client
bean:
OrderClient和
DriverClient。当处理新订单时,它将试图寻找与发送订单的乘客最近的司机。找到他之后,将该司机的状态更改为
UNAVAILABLE,并将带有
Driver对象的事件发送到
drivers主题。
@Singleton public class DriverService { private static final Logger LOGGER = LoggerFactory.getLogger(DriverService.class); private DriverClient client; private OrderClient orderClient; private DriverInMemoryRepository repository; public DriverService(DriverClient client, OrderClient orderClient, DriverInMemoryRepository repository) { this.client = client; this.orderClient = orderClient; this.repository = repository; } public void processNewTripOrder(Order order) { LOGGER.info("Processing: {}", order); Optional<Driver> driver = repository.findNearestDriver(order.getCurrentLocationX(), order.getCurrentLocationY()); driver.ifPresent(driverLocal -> { driverLocal.setStatus(DriverStatus.UNAVAILABLE); repository.updateDriver(driverLocal); client.send(driverLocal, String.valueOf(order.getId())); LOGGER.info("Message sent: {}", driverLocal); }); } // ... }
这是
Kafka Client在
司机服务中的实现,用于向
driver主题发送消息。因为我们需要将
Driver与
Order关联起来,所以我们使用
@Header注解 的
orderId参数。没有必要把它包括到
Driver类中,将其分配给监听器端的正确行程。
@KafkaClient public interface DriverClient { @Topic("drivers") void send(@Body Driver driver, @Header("Order-Id") String orderId); }
4.4 服务间通信
由
DriverListener收到
@KafkaListener在
行程服务中声明。它监听传入到
trip主题。接收方法的参数和客户端发送方法的类似,如下所示:
@KafkaListener(groupId = "trip") public class DriverListener { private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class); private TripService service; public DriverListener(TripService service) { this.service = service; } @Topic("drivers") public void receive(@Body Driver driver, @Header("Order-Id") String orderId) { LOGGER.info("Received: driver->{}, header->{}", driver, orderId); service.processNewDriver(driver); } }
最后一步,将
orderId查询到的行程
Trip与
driverId关联,这样整个流程就结束。
@Singleton public class TripService { private static final Logger LOGGER = LoggerFactory.getLogger(TripService.class); private TripInMemoryRepository repository; private TripClient client; public TripService(TripInMemoryRepository repository, TripClient client) { this.repository = repository; this.client = client; } public void processNewDriver(Driver driver, String orderId) { LOGGER.info("Processing: {}", driver); Optional<Trip> trip = repository.findByOrderId(Long.valueOf(orderId)); trip.ifPresent(tripLocal -> { tripLocal.setDriverId(driver.getId()); repository.update(tripLocal); }); } // ... OTHER METHODS }
5 跟踪
我们可以使用Micronaut Kafka轻松地启用分布式跟踪。首先,我们需要启用和配置Micronaut跟踪。要做到这一点,首先应该添加一些依赖项:
<dependency> <groupId>io.micronaut</groupId> <artifactId>micronaut-tracing</artifactId> </dependency> <dependency> <groupId>io.zipkin.brave</groupId> <artifactId>brave-instrumentation-http</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>io.zipkin.reporter2</groupId> <artifactId>zipkin-reporter</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>io.opentracing.brave</groupId> <artifactId>brave-opentracing</artifactId> </dependency> <dependency> <groupId>io.opentracing.contrib</groupId> <artifactId>opentracing-kafka-client</artifactId> <version>0.0.16</version> <scope>runtime</scope> </dependency>
我们还需要在
application.yml配置文件中,配置Zipkin 的追踪的地址等
tracing: zipkin: enabled: true http: url: http://192.168.99.100:9411 sampler: probability: 1
在启动应用程序之前,我们必须运行
Zipkin容器:
$ docker run -d --name zipkin -p 9411:9411 openzipkin/zipkin
6 总结
在本文中,您将了解通过
Apache Kafka使用异步通信构建微服务架构的过程。我已经向大家展示了
Microaut Kafka库最重要的特性,它允许您轻松地声明
Kafka主题的生产者和消费者,为您的微服务启用
健康检查和
分布式跟踪。我已经为我们的系统描述了一个简单的场景的实现,包括根据客户的请求添加一个新的行程。本示例系统的整体实现,请查看GitHub上的源代码
相关文章推荐
- Ambari引入kafka服务并进行基本的测试
- Kafka服务不可用(宕机)问题踩坑记
- SpringBoot整合消息服务(SpringBoot 整合 ActiveMQ、SpringBoot 整合 RabbitMQ、SpringBoot 整合 Kafka)
- apache kafka消息服务
- windows下kafka服务搭建
- Spring Cloud构建微服务架构(七)消息总线(续:Kafka)
- Java消息服务____ActiveMQ_RabbitMQ_Kafka介绍
- kafka消息服务的producer、broker、consumer的配置
- 使用kafka消息队列解决分布式事务(可靠消息最终一致性方案-本地消息服务)
- sparkstreaming接受kafka数据实时存入hbse并集成rest服务
- jeesz分布式架构 Dubbo、zookeeper、KafKa、redis、fastdfs、单点登录sso、springmvc+mybatis+shiro、Restful服务
- Kafka源码系列之Broker的IO服务及业务处理
- CM添加kafka服务
- kafka消息服务使用前体会
- sparkstreaming接受kafka数据实时存入hbse并集成rest服务
- jeesz分布式企业框架 javaWeb分布式架构 springmvc+mybatis+shiro dubbo zookeeper redis kafka app服务
- CM添加kafka服务
- 如何使用Docker内的kafka服务
- Spring Cloud构建微服务架构(七)消息总线(续:Kafka)
- 【推荐】微服务分布式企业框架 Springmvc+mybatis+shiro+Dubbo+ZooKeeper+Redis+KafKa