基于curator的延迟队列
2020-03-31 19:48
1541 查看
这里不介绍关于curator的用法及优劣,旨在探究curator对于延迟队列的使用原理
怎么使用
<!--dependency--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.1</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.1</version> </dependency>
public class Processor { private final static CuratorFramework client; private final static DistributedDelayQueue<String> queue; static{ ZookeeperConfig config = ZookeeperConfig.getConfig(); // create client client = CuratorFrameworkFactory.newClient(config.getRegistryAddress(), new ExponentialBackoffRetry(3000, 2)); // build queue queue = QueueBuilder.builder(client, new AutoSubmitConsumer(), new AutoSubmitQueueSerializer(), DelayQueueEnum.AUTO_SUBMIT.getPath()) .buildDelayQueue(); // 开启执行计划 enable(); } /** * 生产数据 * * @param id * @param endTime * @throws Exception */ public void producer(String id, Date endTime) throws Exception { queue.put(id, endTime.getTime()); } private static void enable(){ try { client.start(); queue.start(); } catch (Exception e) { logger.error("enable queue fail, exception:{}", e); } } } // Serializer class AutoSubmitQueueSerializer implements QueueSerializer<String> { @Override public byte[] serialize(String s) { return s.getBytes("utf-8"); } @Override public String deserialize(byte[] bytes) { return new String(bytes); } } // consumer AutoSubmitConsumer implements QueueConsumer<String> { @Override public void consumeMessage(String id) { logger.info("consumeMessage, :{}", id); // service processor. logger.info("consumeMessage# auto submit end, result:{}, id:{}", result, id); } @Override public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) { } }
是临时节点还是持久化节点,如果基于内存的话客户端或者服务端挂了以后就会存在数据丢失的问题? 是否会重新排序,zk是按照请求的时间先后顺序写入的,那么curator是怎么监听到期时间的呢?
猜想
- 是否持久化
- 是否会在每次请求的时候拿到服务端所有的节点数据进行排序后存入到服务端
验证
-
针对第一点,我们关闭
zookeeper
服务端和客户端后重新启动后之前的节点还存在所以是持久化节点 -
通过客户端工具连接
zookeeper
发现并不会每次请求的时候都会重新排序,也就是说可能在client端进行处理的
以下是在客户端工具上截取的一部分信息,key是由三部分组成的,第一部分固定的queue- , 第二部分暂不确定,第三部分是节点的序号
源码求证
// org.apache.curator.framework.recipes.queue.DistributedQueue#start // 部分片段 client.create().creatingParentContainersIfNeeded().forPath(queuePath); if ( !isProducerOnly ) { service.submit ( new Callable<Object>() { @Override public Object call() { runLoop(); // step1 return null; } } ); } // org.apache.curator.framework.recipes.queue.DistributedQueue#runLoop // step1中的代码片段 while ( state.get() == State.STARTED ) { try { ChildrenCache.Data data = (maxWaitMs > 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion); currentVersion = data.version; // 诸如: //queue-|2E1D86A3BB6|0000000019 //queue-|1712F752AA0|0000000036 //queue-|1712F76FF60|0000000035 // 拿到所有的子节点 List<String> children = Lists.newArrayList(data.children); // 根据过期时间排序 // step6 sortChildren(children); // 排序后 //queue-|1712F752AA0|0000000036 //queue-|1712F76FF60|0000000035 //queue-|2E1D86A3BB6|0000000019 if ( children.size() > 0 ) { //获取到期时间 maxWaitMs = getDelay(children.get(0)); if ( maxWaitMs > 0 ) continue; } else continue; // 死循环不断轮询是否有满足条件的节点; // 只要有满足条件的节点就将整个排序后的集合往下传递 processChildren(children, currentVersion); // step2 } } // org.apache.curator.framework.recipes.queue.DistributedQueue#processChildren // step2对应的代码片段: private void processChildren(List<String> children, long currentVersion) { final Semaphore processedLatch = new Semaphore(0); final boolean isUsingLockSafety = (lockPath != null); int min = minItemsBeforeRefresh; for ( final String itemNode : children ) { if ( Thread.currentThread().isInterrupted() ) { processedLatch.release(children.size()); break; } if ( !itemNode.startsWith(QUEUE_ITEM_NAME) ) { processedLatch.release(); continue; } if ( min-- <= 0 ) { if ( refreshOnWatch && (currentVersion != childrenCache.getData().version) ) { processedLatch.release(children.size()); break; } } // step3 if ( getDelay(itemNode) > 0 ) { processedLatch.release(); continue; } //这里使用了线程池,为了保证每一个节点都执行完毕后才返回方法所以使用了信号灯 executor.execute ( new Runnable() { @Override public void run() { try { //是否采用了分布式锁,因为我们初始化的时候并未使用所以没有用到这里的安全锁,实际上是进入到了else中 if ( isUsingLockSafety ) { processWithLockSafety(itemNode, ProcessType.NORMAL); } else { // 看这里 step4 processNormally(itemNode, ProcessType.NORMAL); } }finally { processedLatch.release(); } } } ); } processedLatch.acquire(children.size()); } // org.apache.curator.framework.recipes.queue.DistributedQueue#getDelay(java.lang.String) // 对应step3处的代码片段 protected long getDelay(String itemNode) { return getDelay(itemNode, System.currentTimeMillis()); } private long getDelay(String itemNode, long sortTime) { // 会从key上获取时间戳 // step5 long epoch = getEpoch(itemNode); return epoch - sortTime; // 计算过期时间 } // 对应step5处的代码 private static long getEpoch(String itemNode) { // itemNode -> queue-|时间戳|序号 int index2 = itemNode.lastIndexOf(SEPARATOR); int index1 = (index2 > 0) ? itemNode.lastIndexOf(SEPARATOR, index2 - 1) : -1; if ( (index1 > 0) && (index2 > (index1 + 1)) ) { try { String epochStr = itemNode.substring(index1 + 1, index2); return Long.parseLong(epochStr, 16); // 从这里可以知道queue-|这里是16进制的时间戳了|序号| 可能是出于key长度的考量吧(更节省内存),用10进制的时间戳会长很多 } } return 0; } // org.apache.curator.framework.recipes.queue.DistributedQueue#sortChildren // 会根据延时时间排序 // step6处的代码片段 protected void sortChildren(List<String> children) { final long sortTime = System.currentTimeMillis(); Collections.sort ( children, new Comparator<String>() { @Override public int compare(String o1, String o2) { long diff = getDelay(o1, sortTime) - getDelay(o2, sortTime); return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0); } } ); } // 对应step4处的代码片段 private boolean processNormally(String itemNode, ProcessType type) throws Exception { try { String itemPath = ZKPaths.makePath(queuePath, itemNode); Stat stat = new Stat(); byte[] bytes = null; if ( type == ProcessType.NORMAL ) { // 获取key对应的value bytes = client.getData().storingStatIn(stat).forPath(itemPath); } if ( client.getState() == CuratorFrameworkState.STARTED ) { // 移除节点 client.delete().withVersion(stat.getVersion()).forPath(itemPath); } if ( type == ProcessType.NORMAL ) { //step7 processMessageBytes(itemNode, bytes); } return true; } return false; } //对应step7处代码,会回调我们的业务代码 private ProcessMessageBytesCode processMessageBytes(String itemNode, byte[] bytes) throws Exception { ProcessMessageBytesCode resultCode = ProcessMessageBytesCode.NORMAL; MultiItem<T> items; try { // 根据我们定义的序列化器序列化 items = ItemSerializer.deserialize(bytes, serializer); } for(;;) { // 省略一部分代码 try { consumer.consumeMessage(item); // 这里就会回调到我们的业务代码 } } return resultCode; }
总结
- org.apache.curator.framework.recipes.queue.DistributedQueue#internalCreateNode这个方法也证实了确实是持久化且有序的节点;
- 如果过期时间太长而数据生产的过于频繁的话,那么势必会造成数据的积压对于性能和内存都是很大的考验;
- 而且是客户端不断的循环获取所有的节点、排序、再处理,由此我们也证明了前面猜想是排序后在服务端重新添加所有节点每次监听第一个节点变化的想法看来是错误的;
相关文章推荐
- 基于Redis实现的延迟消息队列
- SpringBoot | 第三十八章:基于RabbitMQ实现消息延迟队列方案
- 基于redis的延迟消息队列设计
- 基于 rabbitmq 实现延迟队列
- 基于redis的延迟消息队列设计
- 灵感来袭,基于Redis的分布式延迟队列
- 基于Redis实现延迟队列
- 基于redis的延迟消息队列设计
- 基于2PC和延迟更新完成分布式消息队列多条事务Golang版本
- 基于redis的延迟消息队列设计
- 基于Redis实现延迟队列
- 基于redis的延迟消息队列设计
- 基于 rabbitmq 实现延迟队列
- 基于redis的延迟消息队列设计
- 基于Redis实现的延迟队列
- 如何基于RabbitMQ实现优先级队列
- 数据结构学习记录(二)-----栈与队列(基于动态数组的实现)
- 分布式延迟消息队列实现分析与设计
- rabbitmq 延迟队列的实现(PHP)http://blog.yuhai.win
- 【转】基于ZooKeeper的分布式锁和队列