您的位置:首页 > 编程语言 > Lua

Lua学习笔记-9.4章-非抢占式的多线程

2016-04-07 17:38 357 查看
1、coroutine运行一系列的协作多线程。每个coroutine相当于一个thread。通过yield-resume实现在不同thread之间切换控制权。但是,跟常规的多线程不同,coroutine是非抢占式的。一个coroutine在运行的时候,不可能被其他的coroutine从外部将其挂起,只有由其本身显式地调用yield才会挂起,并交出控制权。对一些程序来说,这没有任何问题,相反,因为非抢占式的缘故,程序变得更加简单。我们不需要担心同步问题的bug,因为在threads之间的同步都是显式的。我们只需要保证在对的时刻调用yield就可以了。

使用非抢占式multithreading,不管哪个thread调用了一个阻塞的操作,那么整个程序都会被阻塞,这是不能容忍的。由于这个原因,很多程序员并不认为coroutine可以替代传统的multithreading。但是,下面我们可以看到一个有趣的解决办法。

2、一个很典型的multithreading场景:通过http下载多个remote files。先来看下如何下载一个文件,这需要使用LuaSocket库,如果你的开发环境没有这个库的话,可以 Lua基础 安装LuaSocket,了解下如何在Linux上安装LuaSocket。需要先创建一个到该网站的连接,发送对该文件的一个请求,从而获取文件,最后关闭连接。下载一个file的lua代码如下:

local socket = require "socket"
--定义主机和指明待下载的文件
host = "www.w3.org"
file = "/TR/REC-html32.html"
--打开TCP连接,连接到80端口(这是HTTP连接的标准端口号)
c = assert(socket.connect(host, 80))--该连接操作返回一个连接对象,我们利用该连接对象发送文件的请求信息
c:send("GET " .. file .. " HTTP/1.0\r\n\r\n") -- 注意GET后和HTTP前面的空格
--读取文件块(以1kB为最小单元)
while true do
local s, status, partial = c:receive(2^10)--receive函数返回接收到的数据加上一个表示操作状态的字符串
io.write(s or partial)
if status == "closed" then--当主机断开连接时,退出循环
break
end
end

c:close()
现在回到前面说的下载多个remote files的问题。当我们接收一个remote file的时候,程序花费了大多数时间去等待数据的到来,也就是在receive函数的调用大部分是阻塞。因此,如果能够同时下载所有的files,当连接点并没有进行数据下载后,该程序可以从其他的连接点进行数据的下载,那么程序的运行速度会快很多。下面我们看一下如何用coroutine来模拟这个实现。我们为每一个下载任务创建一个thread,当一个thread没有数据可用的时候,就调用yield 将程序控制权交给一个简单的dispatcher,由dispatcher来唤醒另一个thread。下面先把之前的代码写成一个函数,但是有少许改动,不再将file的内容输出到stdout了,而只是间的的输出file
size。

function download(host, file)--并不关心file中的内容
local c = assert(socket.connect(host, 80))
local count = 0  --  counts number of bytes read
c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
while true do
local s, status, partial = receive(c)
count = count + #(s or partial)
if status == "closed" then
break
end
end
c:close()
print(file, count)
end

--上面代码中有个函数receive ,相当于下载单个文件中的实现如下,无阻塞:
function receive (connection)
return connection:receive(2^10)
end

--但是,如果要同时下载多文件的话,这个函数必须非阻塞地接收数据。在没有数据接收的时候,就调用yield挂起,交出控制权。实现应该如下:
function receive(connection)
connection:settimeout(0)  -- do not block
local s, status, partial = connection:receive(2^10)
if status == "timeout" then
coroutine.yield(connection)--当没有数据的时候,阻塞挂起
end
return s or partial, status
end
settimeout(0)将这个连接设为非阻塞模式。当status变为“timeout”时,意味着该操作还没完成就返回了,这种情况下,该thread就yield。传递给yield的non-false参数,
告诉dispatcher该线程仍然在运行。注意,即使timeout了,该连接还是会返回它已经收到的东西,存在partial变量中。

下面的代码展示了一个简单的dispatcher。表threads保存了一系列的运行中的thread。函数get 确保每个下载任务都单独一个thread。dispatcher本身是一个循环,
不断的遍历所有的thread,一个一个的去resume。
如果一个下载任务已经完成,一定要将该thread从表thread中删除。当没有thread在运行的时候,循环就停止了。最后,程序创建它需要的threads,并调用dispatcher。
例如,从w3c网站下载四个文档,程序如下所示:
require "socket"

function receive(connection)
connection:settimeout(0)  -- do not block
local s, status, partial = connection:receive(2^10)
if status == "timeout" then
coroutine.yield(connection)
end
return s or partial, status
end

function download(host, file)
local c = assert(socket.connect(host, 80))
local count = 0  --  counts number of bytes read
c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
while true do
local s, status, partial = receive(c)
count = count + #(s or partial)
if status == "closed" then
break
end
end
c:close()
print(file, count)
end

threads = {}  -- list of all live threads

function get(host, file)
-- create coroutine
local co = coroutine.create(function ()
download(host, file)
end)
-- intert it in the list
table.insert(threads, co)
end

function dispatch()
local i = 1
while true do
if threads[i] == nil then  -- no more threads?
if threads[1] == nil then -- list is empty?
break
end
i = 1  -- restart the loop
end
local status, res = coroutine.resume(threads[i])
if not res then   -- thread finished its task?
table.remove(threads, i)
else
i = i + 1
end
end
end

host = "www.w3.org"
get(host, "/TR/html401/html40.txt")
get(host, "/TR/2002/REC-xhtml1-20020801/xhtml1.pdf")
get(host, "/TR/REC-html32.html")
get(host, "/TR/2000/REC-DOM-Level-2-Core-20001113/DOM2-Core.txt")
dispatch() -- main loop
耗时大约7s左右



重新用阻塞式的顺序下载重试了一下,需要时间7s多一点,可能文件比较小,也不够多,优势并不明显。阻塞的多文件下载代码如下

function receive (connection)
return connection:receive(2^10)
end

function download(host, file)
local c = assert(socket.connect(host, 80))
local count = 0  --  counts number of bytes read
c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
while true do
local s, status, partial = receive(c)
count = count + #(s or partial)
if status == "closed" then
break
end
end
c:close()
print(file, count)
end

require "socket"

host = "www.w3.org"

download(host, "/TR/html401/html40.txt")
download(host, "/TR/2002/REC-xhtml1-20020801/xhtml1.pdf")
download(host, "/TR/REC-html32.html")
download(host, "/TR/2000/REC-DOM-Level-2-Core-20001113/DOM2-Core.txt")
下载截图:下载完成的顺序,就是代码中写的顺序:



那对于非阻塞式的这种如何优化呢?

当没有thread有数据接收时,dispatcher遍历了每一个thread去看它有没有数据过来,结果这个过程比阻塞式的版本多耗费了30倍的cpu。

为了避免这个情况,我们使用LuaSocket提供的select函数。它运行程序在等待一组sockets状态改变时阻塞。代码改动比较少,在循环中,收集timeout的连接到表connections 中,当所有的连接都timeout了,dispatcher调用select 来等待这些连接改变状态。该版本的程序,在博主开发环境测试,只需7s不到,就下载完成4个文件,除此之外,对cpu的消耗也小了很多,只比阻塞版本多一点点而已。新的dispatch代码如下:

function dispatch()
local i = 1
local connections = {}
while true do
if threads[i] == nil then  -- no more threads?
if threads[1] == nil then -- list is empty?
break
end
i = 1  -- restart the loop
connections = {}
end
local status, res = coroutine.resume(threads[i])
if not res then   -- thread finished its task?
table.remove(threads, i)
else
i = i + 1
connections[#connections + 1] = res
if #connections == #threads then   -- all threads blocked?
socket.select(connections)
end
end
end
end
耗时大约5s,提升相对明显,运行结果如下:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: