您的位置:首页 > 编程语言 > Lua

Lua学习笔记15:非抢先式的多线程

2020-06-05 05:18 183 查看

一、 概念
协同程序提供一种协作式的多线程。每个协同程序都等于是一个线程。一对yield-resume可以将执行权在不同线程之间切换。然而,协同程序与常规的多线程的不同之处在于,协同程序是非抢先式的。就是说,当一个协同程序在运行时,是无法从外部停止它的。只有当协同程序显示地要求挂起时(调用yield),它才会停止。对于有些应用而言,这没有问题,而对于另外一些应用则可能无法接受这种情况。当不存在抢先时,编程会简单许多。无需为同步的bug而抓狂,在程序中所有线程间的同步都是显式的,只需确保一个协同程序在它的临界区域之外调用yield即可。对于这样非抢先式的多线程来说,只要有一个线程调用了一个阻塞操作,整个程序在该操作完成前,都会停止下来。对于大多数应用程序来说,这种行为是无法接受的,这也就导致了许多程序员放弃协同程序,转而使用传统的多线程
下面用一个有趣的方法来解决这个问题:
通过HTTP下载几个远程文件。(该例子中会使用Lua的socket库)
为了下载一个文件,必须先打开一个到该站点的连接,然后发送下载文件的请求并且接收文件,最后关闭连接
1、 require “socket”--加载LuaSocket库
2、local host = "www.w3.org" --定义主机
     local file1 = "/TR/REC-html132.html --下载的文件
3、c = assert(socket.connect(host,80))--连接到该站点的80端口(打开一个TCP连接,连接到该站点的80端口)
4、c:send("GET " .. file1 .. " HTTP/1.0\r\n\r\n) --(连接完成后将返回一个连接对象,可以用这个连接对象来发送文件请求)
5、while true do
         local s,status,partial=c:receive(2^10)
         io.write(s or partial)
         if status=="closed" then 
             break 
         end
      end
--在正常情况下receive函数会返回一个字符串。若发生错误,则会返回nil,并且附加错误代码(status)及出错前读取到的内容partial。当主机关闭连接时,就将其余接收到的内容打印出来,然后退出接收循环。
6、c:close() -- 关闭连接

二、接下来下载几个文件,最笨的办法就是逐个下载,但是太慢。程序大部分时间花费在等待数据的接收上。更明确地说,是将时间花在了receive阻塞调用上。因此如果一个程序可以同时下载所有文件的话,其效率就会快很多了。
当一个连接没有可用数据时,程序便可以从其他的连接处读取数据。
很明显协同程序提供了一种简便的方式来构建这种并发下载。
为每个下载任务创建一个新的线程,只要一个线程无可用数据,它就把控制权转让给一个简单的调度程序。
而这个调度程序则会调用其他下载线程。
在以协同程序来重写程序,先将前面的下载代码重新写:
function receive(connection)
    local s,status,partial = connection:receive(2^10)
    return s or partial,status
end 

function download(host,file)
    local c = assert(socket.connect(host,80))
    local count = 0            --记录接收到的字节数
    c:send("GET " .. file1 .. " HTTP/1.0\r\n\r\n)
    while true do
         local s,status,partial = receive(c)
         count = count + #(s or partial)  --计算并打印文件的大小
         if status=="closed" then 
             break 
         end
      end
end

download(host,file1)
这是下载一个文件的函数封装,只需调用download就可以。单独下载一个文件需要18秒左右。
但是在并发的情况中,receive代码不能阻塞,因此在它没有可用数据时应该挂起:
function receive(connection)
    connection:settimeout(0)    --使receive不会阻塞
    local s,status,partial = connection:receive(2^10)
       if status == "timeout" then
           coroutine.yield(connection)
       end
    return s or partial,status
end 
settimeout的调用,使得对此链接的操作不会阻塞。
即使在超时的情况下,连接也是会返回已经读取到的内容,即记录在partial变量中。

以下代码用table threads为调度程序保存所有正在运行中的线程。
get函数保证每个下载任务都在一个独立的线程中执行。
调度程序本身主要就是一个循环,遍历所有的线程,逐个唤醒它们的执行。
当有线程完成时,就将该线程从列表中删除:
threads = {}             --保存活跃线程的表
function get(host,file)
    local co = coroutine.create(function() --创建协同程序
        download(host,file)
    end)
    table.insert(threads,co)      --将协同程序插入到列表中
end 

function dispatch()
    local i = 1
    while true do 
        if threads[i] == nil then        --没有线程了
            if threads[1] == nil then 
                break 
            end --表是空表吗
           i = 1                        --重新开始循环
        end
        local status,res = coroutine.resume(threads[i]) --唤醒第i个协同程序改线程继续下载文件
        if not res then             --线程是否已经完成了任务
            table.remove(threads,i) --移除list中第i个线程
        else
            i = i + 1               --检查下一个线程
        end
    end
end

最后,主程序需要创建所有的线程,并调用调度程序(假设要下载w3g站点上的四个文件):
host="www.w3g.org"
get(host,"/TR/html401/html40.txt")
get(host,"/TR/2002/REC-xhtml1-20020801/xhtml1.pdf")
get(host,"/TR/REC-HTML32.html")
get(host,"/TR/2000/REC-DOM-Level-2-Core-20001113/DOM2-Core.txt")

dispatch()  --主循环
运行这段代码,计算机可以在很短的时间内就可以下载完成这四个文件,但如果使用顺序下载的话,就要多耗费2倍的时间,除了速度有所提高以外,这个程序还有不够完美的地方,用于调度的函数dispatch(),只要有一个线程在读取数据,那这样程序就没有什么问题,但如果所有数据都没有线程可读的话,调度程序就会执行一个忙碌等待,不断的从一个线程切换到了另一个线程,仅仅是为了检测是否还有数据可读。这样会导致这个协同程序运行时会比顺序下载多耗费30倍的CPU时间。
为了避免这样的情况,可以使用LuaSocket中的select函数(socket.select(recvt, sendt [, timeout]))。
这个函数可以用于等待一组socket状态的改变,在等待时程序是陷入阻塞状态的,若要在当前实现中应用这个函数,只需要修该调度即可:
function dispatch()
    local i = 1
    local connections = {}                  --将所有的超时连接都放到connections的table变量中
    while true do 
        if threads[i] == nil then        --没有线程了
            if threads[1] == nil then break end --表是空表
            i = 1                        --重新开始循环
            timedout = {}                --遍历完所有线程,开始新一轮的遍历
        end
        local status,res = coroutine.resume(threads[i]) --唤醒该线程继续下载文件
        if not res then               --若完成了res就为nil,只有status一个返回值true。否则res为yield传入的参数connection。
            table.remove(threads,i)   --移除list中第i个线程
        else
            i = i + 1                 --检查下一个线程
            timedout[#timedout +1] = res
            if #connections == #threads then  --所有线程都阻塞了吗?
                socket.select(connections)    --如果线程有数据,就返回
            end
        end
    end
end

-------------------------------------------------------------------------------
threads = {}             --保存活跃线程的表
function get(host,file)
    local co = coroutine.create(function() --创建协同程序
        download(host,file)
    end)
    table.insert(threads,co)      --将协同程序插入到列表中
end 

function dispatch()
    local i = 1
    local connections = {}                  --将所有的超时连接都放到connections的table变量中
    while true do 
        if threads[i] == nil then        --没有线程了
            if threads[1] == nil then break end --表是空表
            i = 1                        --重新开始循环
            timedout = {}                --遍历完所有线程,开始新一轮的遍历
        end
        local status,res = coroutine.resume(threads[i]) --唤醒该线程继续下载文件
        if not res then               --若完成了res就为nil,只有status一个返回值true。否则res为yield传入的参数connection。
            table.remove(threads,i)   --移除list中第i个线程
        else
            i = i + 1                 --检查下一个线程
            timedout[#timedout +1] = res
            if #connections == #threads then  --所有线程都阻塞了吗?
                socket.select(connections)    --如果线程有数据,就返回
            end
        end
    end
end

host="www.w3g.org"
get(host,"/TR/html401/html40.txt")
get(host,"/TR/2002/REC-xhtml1-20020801/xhtml1.pdf")
get(host,"/TR/REC-HTML32.html")
get(host,"/TR/2000/REC-DOM-Level-2-Core-20001113/DOM2-Core.txt")

dispatch()  --主循环
-------------------------------------------------------------------------------

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: