您的位置:首页 > 编程语言 > Lua

协程Coroutine——用同步的方式编写异步的逻辑

2015-10-01 16:29 351 查看
摘要:Lua中的协程是用户级线程,任何时候只有一个协程在真正运行,程序员能够控制协程的切换和运行,可以用同步的方式编写异步的逻辑。


进程、线程、协程

在操作系统中,进程拥有自己独立的内存空间,多个进程同时运行不会相互干扰,但是进程之间的通信比较麻烦;线程拥有独立的栈但共享内存,因此数据共享比较容易,但是多线程中需要利用加锁来进行访问控制:这是个非常头痛的问题,不加锁非常容易导致数据的错误,加锁容易出现死锁。多线程在多个核上同时运行,程序员根本无法控制程序具体的运行过程,难以调试。而且线程的切换经常需要深入到内核,因此线程的切换代价还是比较大的。

协程coroutine拥有自己的栈和局部变量,相互之间共享全局变量。任何时候只有一个协程在真正运行,程序员能够控制协程的切换和运行,因此协程的程序相比多线程编程来说轻松很多。由于协程是用户级线程,因为协程的切换代价很小。

协程的挂起

程序员能够控制协程的切换,这句话需要认真理解下。程序员通过yield让协程在空闲(比如等待io,网络数据未到达)时放弃执行权,通过resume调度协程运行。协程一旦开始运行就不会结束,直到遇到yield交出执行权。Yield和resume这一对控制可以比较方便地实现程序之间的“等待”需求,即“异步逻辑”。总结起来,就是协程可以比较方便地实现用同步的方式编写异步的逻辑。

“生产者-消费者”

异步逻辑最常见的例子便是“生产者-消费者”案例,消费者consumer需要等待生产者producer,只有生产了数据才能消费,这便是一个“等待的异步需求”。

Lua中协程常用接口:

coroutine接口说明:
coroutine.create(func)创建一个协程
coroutine.resume(coroutine, [arg1, arg2..])执行协程,第一次从头开始运行,之后每次从上次yield处开始运行,每次运行到遇到yield或者协程结束
coroutine.yield(…)挂起当前协程,交出执行权

利用协程的yield和resume实现的生产者-消费者代码:

--生产者
function producer()
return coroutine.create(
function()
while true do
local a = io.read()
--挂起协程,交出执行权
coroutine.yield(a)
end
end
)
end

--消费者
function consumer(pro)
while true do
--执行生产者协程
local s, v = coroutine.resume(pro)
print ('s='..tostring(s)..', v='..v)
end
end
p = producer()
consumer(p)


coroutine实现server

接下来再看一个用协程处理客户端请求的服务器:server主线程接收client请求,接受连接上来后为每个client创建一个coroutine,这个coroutine监听client发来的数据如果有数据发来那么进行处理,如果没有数据那么yield挂起交出执行权。

-- server.lua
--listen connection from client, and make a coroutine for each connection
--each coroutine recv data from client and send data back to client

local socket = require("socket")
local host = "127.0.0.1"
local port = "8888"
local connections = {}
local threads = {}

function receive_data(sock_id)
local conn = connections[sock_id]
if conn ~= nil then
local recvt, t, status = socket.select({conn}, nil, 1)
if #recvt > 0 then
local receive, receive_status = conn:receive()

if receive_status ~= "closed" then
if receive then
assert(conn:send(receive .. "\n"))
print("Receive Client " .. sock_id.. " : " ..receive)
end
--disconnect
else
print("Client " .. sock_id .. " disconnect!")
connections[sock_id].close()
connections[sock_id]
4000
= nil
threads[sock_id] = nil
end
end
--yield, stop execution of this coroutine
coroutine.yield()
end
end

--handle data from client: send data back to client
function connection_handler(sock_id)
while true do
--print ('connection_handler.. id=' .. sock_id)
local conn = connections[sock_id]
if conn == nil then
break
end
local data, status = receive_data(sock_id)
end
end

--create coroutine to handle data from client
function create_handler(sock_id)
--print 'create_handler..'
local handler = coroutine.create(function ()
connection_handler(sock_id)
end)
return handler
end

function accept_connection(sock_id, conn)
print("accepted new socket ,id = " .. sock_id)
connections[sock_id] = conn
threads[sock_id] = create_handler(sock_id)
end

--schedule all clients
function dispatch()
for _sock_id, _thread in ipairs(threads) do
--print ('dispatch, _sock_id = '.. _sock_id)
coroutine.resume(threads[_sock_id])
end
end

function start_server()
local server = assert(socket.bind(host, port, 1024))
print("Server Start " .. host .. ":" .. port)
server:settimeout(0)

local conn_count = 0
while true do
--accept new connection
local conn = server:accept()
if conn then
conn_count = conn_count + 1
accept_connection(conn_count, conn)
end
--deal data from connection
dispatch()
end
end

function main()
start_server()
end
main()

--client.lua
-- send user input data to server and print data from server
local socket = require("socket")
local host = "127.0.0.1"
local port = 8888
local sock = assert(socket.connect(host, port))
sock:settimeout(0)

function main()
print("Press enter after input something:")
local input, recvt, sendt, status
while true do
input = io.read()
if #input > 0 then
assert(sock:send(input .. "\n"))
end
recvt, sendt, status = socket.select({sock}, nil, 1)
while #recvt > 0 do
local response, receive_status = sock:receive()
if receive_status ~= "closed" then
if response then
print ('recv from server:'..response)
recvt, sendt, status = socket.select({sock}, nil, 1)
end
else
break
end
end
end
end
main()


coroutine_server 运行

首先启动server,然后启动两个client去连接并进行输入,server利用coroutine对client的输入进行了回写,执行情况如下图所示:







github源码

https://github.com/harryzeng/coroutine_server

参考

http://my.oschina.net/wangxuanyihaha/blog/186401

http://www.kuqin.com/system-analysis/20110910/264592.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息