04storm 源码阅读 storm的进程间消息通信实现clojure端 生成带netty能力的线程
2015-07-12 19:11
453 查看
;; Licensed to the Apache Software Foundation (ASF) under one ;; or more contributor license agreements. See the NOTICE file ;; distributed with this work for additional information ;; regarding copyright ownership. The ASF licenses this file ;; to you under the Apache License, Version 2.0 (the ;; "License"); you may not use this file except in compliance ;; with the License. You may obtain a copy of the License at ;; ;; http://www.apache.org/licenses/LICENSE-2.0 ;; ;; Unless required by applicable law or agreed to in writing, software ;; distributed under the License is distributed on an "AS IS" BASIS, ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ;; See the License for the specific language governing permissions and ;; limitations under the License. ;;主要负责加载持有netty功能的线程,对外暴露两个函数,mk-local-context,获取netty的Context对象。launch-receive-thread!,初始化持有netty能力的处理线程 (ns backtype.storm.messaging.loader (:use [backtype.storm util log]) (:import [java.util ArrayList Iterator]) (:import [backtype.storm.messaging IContext IConnection TaskMessage]) (:import [backtype.storm.utils DisruptorQueue MutableObject]) (:require [backtype.storm.messaging [local :as local]]) (:require [backtype.storm [disruptor :as disruptor]])) ;;这里拿到了local.clj对外暴露的接口IContext,相对于获取了netty的消息传递能力,这个是在外面单独被调用的,回头会被放入lauch方法 (defn mk-local-context [] (local/mk-context)) ;;开启消息接受线程,持有netty,context(参数列表的socket),用于做流失计算。 (defn- mk-receive-thread [context storm-id port transfer-local-fn daemon kill-fn priority socket max-buffer-size thread-id] (async-loop ;;定义匿名fn [],函数。 (fn [] (log-message "Starting receive-thread: [stormId: " storm-id ", port: " port ", thread-id: " thread-id " ]") (fn [] (let [batched (ArrayList.) ^Iterator iter (.recv ^IConnection socket 0 thread-id) closed (atom false)] (when iter (while (and (not @closed) (.hasNext iter)) (let [packet (.next iter) task (if packet (.task ^TaskMessage packet)) message (if packet (.message ^TaskMessage packet))] (if (= task -1) (do (log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice") (.close socket) (reset! closed true)) (when packet (.add batched [task message])))))) (when (not @closed) (do (if (> (.size batched) 0) (transfer-local-fn batched)) 0))))) ;;定义匿名fn [],函数。 ;;序列map结构,给mk-receive-thread的参数列表传进来的几个参数,放在返回值内。key类型的以后好用 :factory? true :daemon daemon :kill-fn kill-fn :priority priority :thread-name (str "worker-receiver-thread-" thread-id))) ;;调用mk-receive-thread。根据thread-count参数,启动了对应的现场数量 (defn- mk-receive-threads [context storm-id port transfer-local-fn daemon kill-fn priority socket max-buffer-size thread-count] (into [] (for [thread-id (range thread-count)] (mk-receive-thread context storm-id port transfer-local-fn daemon kill-fn priority socket max-buffer-size thread-id)))) ;;参数列表的几个key类型给下面的提供引用,以后好用 (defnk launch-receive-thread! [context storm-id receiver-thread-count port transfer-local-fn max-buffer-size :daemon true :kill-fn (fn [t] (System/exit 1)) :priority Thread/NORM_PRIORITY] (let [max-buffer-size (int max-buffer-size) socket (.bind ^IContext context storm-id port) ;;线程数默认值给1,根据这个receiver-thread-count定。 thread-count (if receiver-thread-count receiver-thread-count 1) vthreads (mk-receive-threads context storm-id port transfer-local-fn daemon kill-fn priority socket max-buffer-size thread-count)] ;;匿名函数fn [],执行加载逻辑 (fn [] (let [kill-socket (.connect ^IContext context storm-id "localhost" port)] (log-message "Shutting down receiving-thread: [" storm-id ", " port "]") (.send ^IConnection kill-socket -1 (byte-array [])) (.close ^IConnection kill-socket) (log-message "Waiting for receiving-thread:[" storm-id ", " port "] to die") (for [thread-id (range thread-count)] (.join (vthreads thread-id))) (log-message "Shutdown receiving-thread: [" storm-id ", " port "]") ))))
相关文章推荐
- git 学习笔记2-linux上安装git
- 物质、能量、信息
- WooCommerce 微信支付插件
- 11个超棒的iOS开发学习网站
- apache hide index.php
- XUtils数据库模块在多线程中的打开与关闭问题
- Java 代码特殊注释详解
- pandas的read_csv()函数读取的文件路径问题
- Isomorphic Strings(leetcode 205)
- 03 storm 源码阅读 storm的进程间消息通信实现clojure端 加载java端netty能力
- javascript格式化table标签内容
- db dw dd 与equ的区别
- cf554题意的理解和组合问题
- 关于byte 进制 float String 编码 16进制字符串转16进制byte的问题
- 图解Javascript上下文与作用域
- 02 storm 源码阅读 storm的进程间消息通信实现netty client实现
- 循环-18. 龟兔赛跑
- 前端开发工具vue.js开发实践总结
- STM32W108无线射频模块串行通信接口编程实例
- Android得到控件在屏幕中的坐标