您的位置:首页 > 编程语言 > Python开发

python channels笔记--Getting started with channels

2016-06-15 12:51 363 查看
1.Channels概念
channel是一个队列,每个task最多一个consumer接收
用name字符串辨别channels,可由不同的机器向同一个channel通信
 
def
my_consumer(message):

    pass
channel_routing
= {

    "some-channel":
"myapp.consumers.my_consumer",

}
      
对此channel的每个message,Django会用一个message对象调用consumer函数。message对象含一个content属性,是数据的字典,一个channel属性,是channel。
      
区别于Django的传统request-response模式,channel将Django改编为worker模式。传统Django只工作在与WSGI服务器关联的单进程,而Channel中的Django在三个不同的层运行:1.服务层:包括WSGI适配器和WebSocket服务器。2.channelbackend。3.workers。
 
# Listens onhttp.request
def my_consumer(message):

    # Decode the request from message format to aRequest object

    django_request =
AsgiRequest(message)

    # Run view

    django_response =
view(django_request)

    # Encode the response into message format

    for chunk
in AsgiHandler.encode_response(django_response):

        message.reply_channel.send(chunk)
 

   
事实上channels有两种作用。一种是将work给consumers。另一种用于responses,只有interface
server在监听它,每个response channel命名不同,并且客户端终止时会被退回interface
server。两种channel差别不大,不同在于message可以由任意channel传递,但response
channel只会将message传到它所监听的channel
server。所以将它们视为两种不同的channel,并用!标记response
channel,normal channel没有标记,但它只能包含字母和-_

 
因为channels只分配给一个监听器,所以他们不能广播。若希望给一组client发送消息,需要记录这些response
channels。
redis_conn
= redis.Redis("localhost",
6379)
@receiver(post_save,
sender=BlogUpdate)
def send_update(sender,
instance,
**kwargs):

    # Loop through all response channels and sendthe update

    for reply_channel
in redis_conn.smembers("readers"):

        Channel(reply_channel).send({

            "id":
instance.id,

            "content":
instance.content,

        })
# Connected towebsocket.connect
def ws_connect(message):

    # Add to reader set

    redis_conn.sadd("readers",
message.reply_channel.name)
 

     
     可以通过监听websocket.disconnect将不需要的从readers中移出,对于在调用该方法之前就终止的特殊情况,用Group。Group通常只用于response
channel

 
@receiver(post_save,
sender=BlogUpdate)
def send_update(sender,
instance,
**kwargs):

    Group("liveblog").send({

        "id":
instance.id,

        "content":
instance.content,

    })
# Connected towebsocket.connect
def ws_connect(message):

    # Add to reader group

    Group("liveblog").add(message.reply_channel)
# Connected towebsocket.disconnect
def ws_disconnect(message):

    # Remove from reader group on clean disconnect

    Group("liveblog").discard(message.reply_channel)
 

        
但是channel不保证发送task。如果需要确保可以用专门的系统如celery。
 
2.安装
pip install -U channels
add channels to your INSTALLED_APPS setting
 
3.开始使用Channels
       
下面的例子可能不常用,但是可以很好地说明channels如何在Django的下层发挥作用。
       
创建过程,将下面代码放入consumer.py
from
django.http import
HttpResponse
from channels.handler
import AsgiHandler
def
http_consumer(message):

    # Make standard HTTP response -access ASGI path attribute directly

    response =
HttpResponse("Hello world! You asked for %s"
% message.content['path'])

    # Encode that response into messageformat (ASGI)

    for chunk
in AsgiHandler.encode_response(response):

        message.reply_channel.send(chunk)

 
       
最重要的是,message都是可JSON序列化的,所以request和response都是键值对形式。对于ASGI,你只需要知道有一个AsgiRequest类完成从ASGI到Django的request对象翻译,
并且AsgiHandler类负责HttpResponse到ASGI消息的翻译.
通常传统Django的内建代码会帮你完成这些工作.
       还有一件事,
需要告诉Django,
这个consumer应该与http.request channel关联.
需要在settings文件中默认的channel layer以及它的路由规则.
# In settings.py
CHANNEL_LAYERS = {

    "default": {

        "BACKEND":
"asgiref.inmemory.ChannelLayer",

        "ROUTING":
"myproject.routing.channel_routing",

    },

}
 
# In routing.py
from channels.routing
import route
channel_routing = [

    route("http.request",
"myapp.consumers.http_consumer"),

]
 
       
特别注意,
此例及之后大部分样例中都用"in memory"channel layer.
它适合初学者,
但是没有跨进程的channel沟通,
也只能用于"runserver".
实际产品中需要选择其他的backend.
建议将这个文件命名为routing.py,
并且放在urls.py的同一目录下.

 
       
再来一个简单的聊天服务器,如果不为指定http.request指定consumer,则它会由Django的views处理。
# Inconsumers.py
defws_message(message):

    # ASGI WebSocket packet-received andsend-packet message types

    # both have a "text" keyfor their textual data.

   message.reply_channel.send({

        "text":message.content['text'],

    })
# Inrouting.py

from channels.routing import route

from myapp.consumers import ws_message
channel_routing= [

    route("websocket.receive",ws_message),

]
 
// Note thatthe path doesn't matter for routing; any WebSocket

// connection gets bumped over to WebSocket consumers

socket = new WebSocket("ws://" + window.location.host +"/chat/");

socket.onmessage = function(e) {

   alert(e.data);

}

socket.onopen = function() {

    socket.send("helloworld");

}
 
Groups
       
接下来是一个真正实现聊天的服务器。注意,channels 的设计默认允许有一小部分消息不会被正常发送,这样保证当错误出现时不会影响整个系统。
 
# Inconsumers.py

from channels import Group
# Connectedto websocket.connect

def ws_add(message):

   Group("chat").add(message.reply_channel)
# Connectedto websocket.receive

def ws_message(message):

   Group("chat").send({

        "text": "[user]%s" % message.content['text'],

    })
# Connectedto websocket.disconnect

def ws_disconnect(message):

   Group("chat").discard(message.reply_channel)
 
fromchannels.routing import route

from myapp.consumers import ws_add, ws_message, ws_disconnect
channel_routing= [

    route("websocket.connect",ws_add),

    route("websocket.receive",ws_message),

   route("websocket.disconnect", ws_disconnect),

]
 
// Note thatthe path doesn't matter right now; any WebSocket

// connection gets bumped over to WebSocket consumers

socket = new WebSocket("ws://" + window.location.host +"/chat/");

socket.onmessage = function(e) {

   alert(e.data);

}

socket.onopen = function() {

    socket.send("helloworld");

}
 
Runningwith Channels
       
因为Channels将Django置入多进程模型,所以不再依靠一个WSGI服务器工作在单进程。而是运行通过channel
layer关联起来的多个interface server和多个worker
servers。
       
有许多种interface server,每个都负责一种request。它们与worker
server解耦,而由channellayer传输channel的内容。从产品的角度看,你通常会将worker
server当作与interfaceservers不同的集群运行,虽然你也可以将他们作为不同的进程运行在同一台机器上。
        Django默认没有channel layer。上个例子我们用的in-memory
channel layer,它将channel数据存储在内存中的一个字典里,所以不能跨进程。当部署时还是要换成Redis后端asgi_redis等。
       
第二件事,当我们设置好了channel后端,要确保我们的interface
layer可以用于WebSockets。Channels用daphne解决这个问题。它是个可以同时处理HTTP和WebSockets的interfaceserver,并且当你runserver时运行。(其实就是,runserver运行Daphne在一个线程,一个worker在另一个线程,但都在同一个进程)
       
我们来试试Redis后端。先安装asgi_redis包。然后设置channel
layer:
# In settings.py
CHANNEL_LAYERS = {

    "default": {

        "BACKEND":
"asgi_redis.RedisChannelLayer",

        "CONFIG": {

            "hosts": [("localhost",
6379)],

        },

        "ROUTING":
"myproject.routing.channel_routing",

    },

}
       
运行runserver,它会和前面的一样运行,你也可以试试跨进程类型。将下面两个命令在两个终端运行:
manage.py runserver --noworker
manage.py runworker
       你可能猜到了,它在runserver中禁用了worker并且在另一个进程中处理。如果你想看运行consumers的logging,也可以向runworker传递-v
2参数。
       如果Django运行在debug模式,runworker像runserver一样会用于静态文件。就像通常的Django设置,你需要设置Debug模式关掉后你自己的静态文件。
 
Persisting Data
       我们考虑一个基本的聊天网站,基于初始连接。
       记住,Channels是网络透明的,并且可以在多个worker运行,所以你不能仅把东西存储在本地的全局变量。正如Django的session框架用cookie作为key,Channels提供channel_session装饰器。它提供message.channel_session属性,就像Django
session。
# In consumers.py
from channels
import Group
from channels.sessions
import channel_session
# Connected towebsocket.connect

@channel_session
def ws_connect(message):

    # Work out room name from path (ignore slashes)

    room =
message.content['path'].strip("/")

    # Save room in session and add us to the group

    message.channel_session['room']
= room

    Group("chat-%s"
% room).add(message.reply_channel)
# Connected towebsocket.receive

@channel_session
def ws_message(message):

    Group("chat-%s"
% message.channel_session['room']).send({

        "text":
message['text'],

    })
# Connected towebsocket.disconnect

@channel_session
def ws_disconnect(message):

    Group("chat-%s"
% message.channel_session['room']).discard(message.reply_channel)
 
# in routing.py
from channels.routing
import route
from myapp.consumers
import ws_connect,
ws_message,
ws_disconnect
channel_routing
= [

    route("websocket.connect",
ws_connect),

    route("websocket.receive",
ws_message),

    route("websocket.disconnect",
ws_disconnect),

]
 
Authenticantion
       目前的WebSocket无法与网站的其余成员通信。幸运的是,由于Channels在WebSocket和ASGI的底层,它自带了认证和Django
session的装饰器。Channels可以通过cookies或session_key属性得到Django
sessions。你用http_session装饰器获得Django
session(message.http_session属性,就像request.session)。你也可以用http_session_user装饰器获得message.user属性。
       注意这些只是WebSocket的HTTP消息的详细信息,并没有消耗多余的带宽。
       也意味着我们需要从connection handler中获取user并将它保存在session中,。Channels自带了channel_session_user装饰器,就像http_session_user装饰器,不同的是前者从channel
session获取user。还有一个函数transfer_user从一个session复制user到另一个。更棒的是,它把上面两个功能都组合进channnel_session_user_from_http装饰器。
     我们实现一个服务器,只能与名字第一个字母相同的人对话。
# In consumers.py
from channels
import Channel,
Group
from channels.sessions
import channel_session
from channels.auth
import http_session_user,
channel_session_user,
channel_session_user_from_http
# Connected towebsocket.connect

@channel_session_user_from_http
def ws_add(message):

    # Add them to the right group

    Group("chat-%s"
% message.user.username[0]).add(message.reply_channel)
# Connected towebsocket.receive

@channel_session_user
def ws_message(message):

    Group("chat-%s"
% message.user.username[0]).send({

        "text":
message['text'],

    })
# Connected towebsocket.disconnect

@channel_session_user
def ws_disconnect(message):

    Group("chat-%s"
% message.user.username[0]).discard(message.reply_channel)
 
       如果你只是runserver(Daphne),你就可以正常连接并且你的cookies也应该传递你的auth。如果你将WebSockets运行在不同的端口,你必须在URL中提供Django
session ID。
socket = newWebSocket("ws://127.0.0.1:9000/?session_key=abcdefg");
你可以在模板中得到session key{{request.session.session_key
}}。注意它对于signed cookie sessions无效。

 

Routing
        routing.py文件很像Django的urls.py。也可以用正则。
http_routing = [

    route("http.request",poll_consumer, path=r"^/poll/$", method=r"^POST$"),

]
chat_routing = [

    route("websocket.connect",chat_connect, path=r"^/(?P<room>[a-zA-Z0-9_]+)/$),

   route("websocket.disconnect", chat_disconnect),

]
routing = [

    # You can use a string import path asthe first argument as well.

    include(chat_routing,path=r"^/chat"),

    include(http_routing),

]
routing按顺序执行,有短路的可能。也可以不以^起始,而用python的re.match函数,但这需要经验。

 

Models
     
可以用Django的ORM处理信息的持久化。考虑性能:我们可以为聊天消息定制channel,并且加入保存和发送的步骤,这样发送进程和consumer就可以快速结束而不用等待。
# In consumers.py
from channels
import Channel
from channels.sessions
import channel_session
from .models
import ChatMessage
# Connected tochat-messages
def msg_consumer(message):

    # Save to model

    room =
message.content['room']

    ChatMessage.objects.create(

        room=room,

        message=message.content['message'],

    )

    # Broadcast to listening sockets

    Group("chat-%s"
% room).send({

        "text":
message.content['message'],

    })
# Connected towebsocket.connect

@channel_session
def ws_connect(message):

    # Work out room name from path (ignore slashes)

    room =
message.content['path'].strip("/")

    # Save room in session and add us to the group

    message.channel_session['room']
= room

    Group("chat-%s"
% room).add(message.reply_channel)
# Connected towebsocket.receive

@channel_session
def ws_message(message):

    # Stick the message onto the processing queue

    Channel("chat-messages").send({

        "room":
channel_session['room'],

        "message":
message['text'],

    })
# Connected towebsocket.disconnect

@channel_session
def ws_disconnect(message):

    Group("chat-%s"
% message.channel_session['room']).discard(message.reply_channel)
       
注意我们可以从任何地方将message加入chat-messages
channel。

 

Enforcing Ording
       
因为Channles是分布式系统,默认它按workers从队列中获得的顺序处理消息。很可能interface
server发出非常近的connect和receive消息,connect还未被处理完,receive就被另一个worker处理。
        Channels的解决方法是enforce_ordering装饰器。所有websocket消息都包含一个order键,这个装饰器用这个键确保message按顺序处理。有两个模式:
        Slightordering:connect先处理,其他无序。
        Strictordering:所有都按序。
# In consumers.py
from channels
import Channel,
Group
from channels.sessions
import channel_session,
enforce_ordering
from channels.auth
import http_session_user,
channel_session_user,
channel_session_user_from_http
# Connected towebsocket.connect

@enforce_ordering(slight=True)

@channel_session_user_from_http
def ws_add(message):

    # Add them to the right group

    Group("chat-%s"
% message.user.username[0]).add(message.reply_channel)
# Connected towebsocket.receive

@enforce_ordering(slight=True)

@channel_session_user
def ws_message(message):

    Group("chat-%s"
% message.user.username[0]).send({

        "text":
message['text'],

    })
# Connected towebsocket.disconnect

@enforce_ordering(slight=True)

@channel_session_user
def ws_disconnect(message):

    Group("chat-%s"
% message.user.username[0]).discard(message.reply_channel)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: