
04 Storm source code reading: Storm's inter-process messaging on the Clojure side, creating the threads with Netty capability

2015-07-12 19:11
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements.  See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership.  The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License.  You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.

;; Loads the threads that receive messages from the messaging transport (Netty-backed when the Netty transport is configured).
;; Two functions are public: mk-local-context, which returns an IContext, and launch-receive-thread!, which starts the receiver threads.
(ns backtype.storm.messaging.loader
  (:use [backtype.storm util log])
  (:import [java.util ArrayList Iterator])
  (:import [backtype.storm.messaging IContext IConnection TaskMessage])
  (:import [backtype.storm.utils DisruptorQueue MutableObject])
  (:require [backtype.storm.messaging [local :as local]])
  (:require [backtype.storm [disruptor :as disruptor]]))

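;; Returns the IContext implementation exposed by local.clj, which provides the message-passing ability.
;; Note that local.clj is the in-process transport; a Netty-backed IContext is chosen by configuration elsewhere.
;; This function is called separately by outside code, and the resulting context is later passed into the launch function.
(defn mk-local-context []
  (local/mk-context))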

;; Builds one message-receiving thread. It reads from socket (the bound IConnection in the parameter list,
;; Netty-backed when the Netty transport is in use) and hands the messages on for stream computation.
(defn- mk-receive-thread [context storm-id port transfer-local-fn  daemon kill-fn priority socket max-buffer-size thread-id]
  (async-loop
    ;; Outer anonymous fn []: because of :factory? true, async-loop calls it once to obtain the loop body.
    (fn []
      (log-message "Starting receive-thread: [stormId: " storm-id ", port: " port ", thread-id: " thread-id  " ]")
      ;; Inner anonymous fn []: one pass of the receive loop. Returning 0 means "run again without sleeping";
      ;; returning nil (after the shutdown notice) ends the loop.
      (fn []
        (let [batched (ArrayList.)
              ^Iterator iter (.recv ^IConnection socket 0 thread-id)
              closed (atom false)]
          (when iter
            (while (and (not @closed) (.hasNext iter))
              (let [packet (.next iter)
                    task (if packet (.task ^TaskMessage packet))
                    message (if packet (.message ^TaskMessage packet))]
                ;; task id -1 is the shutdown notice sent by the shutdown function
                (if (= task -1)
                  (do (log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice")
                      (.close socket)
                      (reset! closed  true))
                  (when packet (.add batched [task message]))))))

          (when (not @closed)
            (do
              (if (> (.size batched) 0)
                (transfer-local-fn batched))
              0)))))
    ;; Keyword options for async-loop, filled in from the arguments passed to mk-receive-thread.
    :factory? true
    :daemon daemon
    :kill-fn kill-fn
    :priority priority
    :thread-name (str "worker-receiver-thread-" thread-id)))
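
The receive loop above combines two ideas: a factory function that returns the loop body, and a shutdown notice (task id -1) that makes the body return nil so the loop stops. The following is a minimal sketch of that pattern in plain Clojure, not Storm's implementation. The names run-loop and receive-all are hypothetical, run-loop is only an assumed stand-in for async-loop, and a LinkedBlockingQueue stands in for the socket.

```clojure
(import '[java.util.concurrent LinkedBlockingQueue])

;; Hypothetical stand-in for async-loop: call the factory once to get
;; the loop body, then repeat the body until it returns nil.
(defn run-loop [factory]
  (let [body (factory)]
    (loop []
      (when-not (nil? (body))
        (recur)))))

(defn receive-all
  "Takes [task message] pairs from queue until a pair with task -1
  arrives; returns the pairs received before that shutdown notice."
  [^LinkedBlockingQueue queue]
  (let [received (atom [])]
    (run-loop
      (fn []                               ; factory, called once
        (fn []                             ; loop body, called repeatedly
          (let [[task message] (.take queue)]
            (if (= task -1)
              nil                          ; shutdown notice: stop looping
              (do (swap! received conj [task message])
                  0))))))                  ; non-nil: run the body again
    @received))

(def q (LinkedBlockingQueue.))
(.put q [1 "a"])
(.put q [2 "b"])
(.put q [-1 nil])      ; shutdown notice
(.put q [3 "late"])    ; arrives after shutdown, never read
(receive-all q)
```

In this sketch the pair queued after the shutdown notice stays in the queue, which mirrors how the real loop stops reading once it has seen task -1.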

;; Calls mk-receive-thread once per thread id, so thread-count receiver threads are started.
(defn- mk-receive-threads [context storm-id port transfer-local-fn  daemon kill-fn priority socket max-buffer-size thread-count]
  (into [] (for [thread-id (range thread-count)]
             (mk-receive-thread context storm-id port transfer-local-fn  daemon kill-fn priority socket max-buffer-size thread-id))))

;; defnk: the keyword parameters in the argument list (:daemon, :kill-fn, :priority) carry default values
;; and are bound as locals that the body below refers to.
(defnk launch-receive-thread!
  [context storm-id receiver-thread-count port transfer-local-fn max-buffer-size
   :daemon true
   :kill-fn (fn [t] (System/exit 1))
   :priority Thread/NORM_PRIORITY]
  (let [max-buffer-size (int max-buffer-size)
        socket (.bind ^IContext context storm-id port)
        ;; The thread count comes from receiver-thread-count and defaults to 1 when it is nil.
        thread-count (if receiver-thread-count receiver-thread-count 1)
        vthreads (mk-receive-threads context storm-id port transfer-local-fn daemon kill-fn priority socket max-buffer-size thread-count)]
    ;; The returned anonymous fn [] is the shutdown function: it connects to the local port,
    ;; sends task id -1 as the shutdown notice, and then waits for the receiver threads.
    (fn []
      (let [kill-socket (.connect ^IContext context storm-id "localhost" port)]
        (log-message "Shutting down receiving-thread: [" storm-id ", " port "]")
        (.send ^IConnection kill-socket
               -1 (byte-array []))

        (.close ^IConnection kill-socket)

        (log-message "Waiting for receiving-thread:[" storm-id ", " port "] to die")

        ;; Note: for is lazy and its result is discarded here, so these .join calls are never evaluated.
        ;; doseq would be needed to actually wait for the threads.
        (for [thread-id (range thread-count)]
          (.join (vthreads thread-id)))

        (log-message "Shutdown receiving-thread: [" storm-id ", " port "]")
        ))))
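
The shutdown function above puts its .join calls inside a for form whose result is thrown away. Because for builds a lazy sequence, a body used only for side effects does not run unless something consumes that sequence. The sketch below shows the difference between for and doseq using a hypothetical fake-join function in place of a real thread join. All names in it are illustrative and not part of Storm.

```clojure
;; Records which "joins" actually ran.
(def joined (atom []))

;; Hypothetical stand-in for (.join thread): just records the id.
(defn fake-join [thread-id]
  (swap! joined conj thread-id))

(defn shutdown-with-for [thread-count]
  ;; Lazy: builds a sequence that nobody consumes, so fake-join never runs.
  (for [thread-id (range thread-count)]
    (fake-join thread-id))
  :done)

(defn shutdown-with-doseq [thread-count]
  ;; Eager: runs the body once per element, for its side effects.
  (doseq [thread-id (range thread-count)]
    (fake-join thread-id))
  :done)

(shutdown-with-for 3)
(def after-for @joined)      ; still [] because nothing was joined

(shutdown-with-doseq 3)
(def after-doseq @joined)    ; [0 1 2]
```

Wrapping the for in dorun would also force evaluation, but doseq states the side-effect intent more directly.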