使用 erlang OTP 模式编写非阻塞的 tcp 服务器(来自erlang wiki)
2014-09-17 00:12
639 查看
参考资料:http://erlangcentral.org/wiki/index.php/Building_a_Non-blocking_TCP_server_using_OTP_principles
服务器设计
tcp_server_app下的根监控树使用one_for_one重启策略。两个子树应用,第一个是一个tcp套接字监听服务器,使用gen_server模式来实现,采用异步监听的客户端连接的模式。第二个是一个客户端应用,使用gen_fsm模式实现,使用标准SASL错误报告接口,记录客户端消息处理的日志以及非正常与服务器断开连接日志。
整体应用架构:
+----------------+
| tcp_server_app |
+--------+-------+
| (one_for_one)
+----------------+---------+
| |
+-------+------+ +-------+--------+
| tcp_listener | + tcp_client_sup |
+--------------+ +-------+--------+
| (simple_one_for_one)
+-----|---------+
+-------|--------+|
+--------+-------+|+
| tcp_echo_fsm |+
+----------------+
tcp_server代码如下:
下面是服务端socket监听程序,这里使用了一个不具有官方文档的 api
prim_inet:async_accept/2 来实现一个异步监听套接字的服务器程序,代码如下:
下面是客户端处理输出的状态机:
最后是app文件了:
以上基本上都是个人查找资料过程的笔记,有理解错误的地方请评论指出,谢谢!
服务器设计
tcp_server_app下的根监控树使用one_for_one重启策略。两个子树应用,第一个是一个tcp套接字监听服务器,使用gen_server模式来实现,采用异步监听的客户端连接的模式。第二个是一个客户端应用,使用gen_fsm模式实现,使用标准SASL错误报告接口,记录客户端消息处理的日志以及非正常与服务器断开连接日志。
整体应用架构:
+----------------+
| tcp_server_app |
+--------+-------+
| (one_for_one)
+----------------+---------+
| |
+-------+------+ +-------+--------+
| tcp_listener | + tcp_client_sup |
+--------------+ +-------+--------+
| (simple_one_for_one)
+-----|---------+
+-------|--------+|
+--------+-------+|+
| tcp_echo_fsm |+
+----------------+
tcp_server代码如下:
%% TCP Server Application (tcp_server_app.erl) -module(tcp_server_app). -author('saleyn@gmail.com'). %% 实现application模式 -behaviour(application). -export([start_client/0]). %% 应用程序启动以及监控树回调函数 -export([start/2, stop/1, init/1]). %% 宏变量定义 -define(MAX_RESTART, 5). -define(MAX_TIME, 60). -define(DEF_PORT, 2222). %% 启动客户端进程的接口 %% 在监听程序建立连接时调用 start_client() -> %% 回调第二个init函数,因为第二个是动态添加监控树子节点 %% 也就是说这里是两颗不同的监控树,使用了一个模块两个 init 函数来实现 supervisor:start_child(tcp_client_sup, []). %%---------------------------------------------------------------------- %% Application behaviour callbacks %%---------------------------------------------------------------------- start(_Type, _Args) -> %% 获取端口配置参数,找不到时返回默认端口 ?DEF_PORT ListenPort = get_app_env(listen_port, ?DEF_PORT), %% 启动应用程序,回调函数为 第一个 init 函数,根据参数匹配,参数为 [端口,客户端回调模块] %% 第一个 init 函数仅仅是启动了两个监控树 supervisor:start_link({local, ?MODULE}, ?MODULE, [ListenPort, tcp_echo_fsm]). stop(_S) -> ok. %%---------------------------------------------------------------------- %% Supervisor behaviour callbacks %%---------------------------------------------------------------------- init([Port, Module]) -> {ok, %% 监控树策略参数,ono_for_one策略,设置MAX_TIME最多重启的MAX_RESTART次 {_SupFlags = {one_for_one, ?MAX_RESTART, ?MAX_TIME}, [ % TCP Listener { tcp_server_sup, % Id = internal id {tcp_listener,start_link,[Port,Module]}, % StartFun = {M, F, A} permanent, % Restart = permanent | transient | temporary 2000, % Shutdown = brutal_kill | int() >= 0 | infinity worker, % Type = worker | supervisor [tcp_listener] % Modules = [Module] | dynamic }, % Client instance supervisor { %% Module参数初始化了tcp_client_sup监控树的 init 函数, init 函数在下面 tcp_client_sup, %% 子节点启动策略 {supervisor,start_link,[{local, tcp_client_sup}, ?MODULE, [Module]]}, permanent, % Restart = permanent | transient | temporary infinity, % Shutdown = brutal_kill | int() >= 0 | infinity supervisor, % Type = worker | supervisor [] % Modules = [Module] | dynamic } ] } }; %% 在服务器接收连接时,创建客户端进程时会回调到这个函数,使用simple_one_for_one启动策略 %% 参数 Module 在第一个 init([Module]) -> {ok, %% 另外一种根监督树模式,simple_one_for_one策略子节点只能动态添加 {_SupFlags = {simple_one_for_one, ?MAX_RESTART, ?MAX_TIME}, [ % TCP Client { undefined, % Id = internal id {Module,start_link,[]}, % StartFun = {M, F, A} temporary, % Restart = permanent | transient | temporary 2000, % Shutdown = brutal_kill | int() >= 0 | infinity worker, % Type = worker | supervisor [] % Modules = [Module] | dynamic } ] } }. %%---------------------------------------------------------------------- %% Internal functions %%---------------------------------------------------------------------- %% 获取配置文件xxx.app文件中的配置变量 get_app_env(Opt, Default) -> case application:get_env(application:get_application(), Opt) of {ok, Val} -> Val; _ -> case init:get_argument(Opt) of [[Val | _]] -> Val; error -> Default end end.
下面是服务端socket监听程序,这里使用了一个不具有官方文档的 api
prim_inet:async_accept/2 来实现一个异步监听套接字的服务器程序,代码如下:
% TCP Listener Process (tcp_listener.erl) -module(tcp_listener). -author('saleyn@gmail.com'). %% 实现 gen_server 模式 -behaviour(gen_server). %% 内部接口 -export([start_link/2]). %% gen_server 回调函数 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). %% 定义了一个 record 记录 gen_server 进程的状态 -record(state, { listener, % Listening socket acceptor, % Asynchronous acceptor's internal reference module % FSM handling module }). %%-------------------------------------------------------------------- %% @spec (Port::integer(), Module) -> {ok, Pid} | {error, Reason} %% @doc 监控树调用并开始进行tcp套接字监听 %% @end %%---------------------------------------------------------------------- start_link(Port, Module) when is_integer(Port), is_atom(Module) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [Port, Module], []). %%%------------------------------------------------------------------------ %%% Callback functions from gen_server %%%------------------------------------------------------------------------ %%---------------------------------------------------------------------- %% @spec (Port::integer()) -> {ok, State} | %% {ok, State, Timeout} | %% ignore | %% {stop, Reason} %% %% @doc gen_server启动时回调,并创建 tcp 监听 %% @end %%---------------------------------------------------------------------- init([Port, Module]) -> process_flag(trap_exit, true), Opts = [binary, {packet, 2}, {reuseaddr, true}, {keepalive, true}, {backlog, 30}, {active, false}], %% 使用 gen_tcp 模块启动套接字监听,这是一个阻塞动作 case gen_tcp:listen(Port, Opts) of {ok, Listen_socket} -> %% 创建监听成功返回监听socket %% 创建第一个接受连接的进程 %% prim_inet:async_accept/2开启异步监听 %% 之后有客户端连接时会向此进程发送一个异步消息inet_async到进程消息队列 %% Ref 存储接受进程的引用 {ok, Ref} = prim_inet:async_accept(Listen_socket, -1), {ok, #state{listener = Listen_socket, acceptor = Ref, module = Module}}; {error, Reason} -> {stop, Reason} end. %%------------------------------------------------------------------------- %% @spec (Request, From, State) -> {reply, Reply, State} | %% {reply, Reply, State, Timeout} | %% {noreply, State} | %% {noreply, State, Timeout} | %% {stop, Reason, Reply, State} | %% {stop, Reason, State} %% @doc 服务进程被同步调用时的回调函数 %% @end %% @private %%------------------------------------------------------------------------- handle_call(Request, _From, State) -> {stop, {unknown_call, Request}, State}. %%------------------------------------------------------------------------- %% @spec (Msg, State) ->{noreply, State} | %% {noreply, State, Timeout} | %% {stop, Reason, State} %% @doc 服务进程被异步调用时的回调函数 %% @end %% @private %%------------------------------------------------------------------------- handle_cast(_Msg, State) -> {noreply, State}. %%------------------------------------------------------------------------- %% @spec (Msg, State) ->{noreply, State} | %% {noreply, State, Timeout} | %% {stop, Reason, State} %% @doc 回调函数,处理那些直接发消息到进程邮箱的事件 %% 这里处理的是 {inet_async, ListSock, Ref, {ok, CliSocket}}事件, %% inet_async 表示是一个异步事件,服务器端接收连接采用异步的方式, %% 客户端连接最终会被转化成一个 inet_async 消息发送到进程邮箱等待处理 %% {{ok, CliSocket}} 里的CliSocket表示客户端建立的连接套接口 %% @end %% @private %%------------------------------------------------------------------------- %% 注意这里 ListSock 以及 Ref 做了匹配,只有匹配了才是该监听口接收的连接 handle_info({inet_async, ListSock, Ref, {ok, CliSocket}}, #state{listener=ListSock, acceptor=Ref, module=Module} = State) -> try case set_sockopt(ListSock, CliSocket) of ok -> ok; {error, Reason} -> exit({set_sockopt, Reason}) end, %% 接收新的客户端连接,启动一个新的客户端状态机进程,动态添加到 tcp_client_sup 客户端监控树 {ok, Pid} = tcp_server_app:start_client(), %% 绑定 CliSocet 到客户端进程 Pid, 这样CliSocket接收数据都会被转化成Pid代表进程的邮箱消息 gen_tcp:controlling_process(CliSocket, Pid), %% Instruct the new FSM that it owns the socket. Module:set_socket(Pid, CliSocket), %% Signal the network driver that we are ready to accept another connection %% 重新设置异步监听下一个客户端连接的消息,设置新的监听引用 %% 必须重新设置才能监听到 {inet_async,S,Ref,Status} 消息 case prim_inet:async_accept(ListSock, -1) of {ok, NewRef} -> ok; {error, NewRef} -> exit({async_accept, inet:format_error(NewRef)}) end, %% 更新新的监听引用 {noreply, State#state{acceptor=NewRef}} catch exit:Why -> error_logger:error_msg("Error in async accept: ~p.\n", [Why]), {stop, Why, State} end; %%客户端建立连接的容错处理 handle_info({inet_async, ListSock, Ref, Error}, #state{listener=ListSock, acceptor=Ref} = State) -> error_logger:error_msg("Error in socket acceptor: ~p.\n", [Error]), {stop, Error, State}; handle_info(_Info, State) -> {noreply, State}. %%------------------------------------------------------------------------- %% @spec (Reason, State) -> any %% @doc Callback executed on server shutdown. It is only invoked if %% `process_flag(trap_exit, true)' is set by the server process. %% The return value is ignored. %% @end %% @private %%------------------------------------------------------------------------- terminate(_Reason, State) -> gen_tcp:close(State#state.listener), ok. %%------------------------------------------------------------------------- %% @spec (OldVsn, State, Extra) -> {ok, NewState} %% @doc Convert process state when code is changed. %% @end %% @private %%------------------------------------------------------------------------- code_change(_OldVsn, State, _Extra) -> {ok, State}. %%%------------------------------------------------------------------------ %%% Internal functions %%%------------------------------------------------------------------------ %% 设置客户端socket的参数选项,只是简单的复制了监听服务器的配置选项 set_sockopt(ListSock, CliSocket) -> true = inet_db:register_socket(CliSocket, inet_tcp), case prim_inet:getopts(ListSock, [active, nodelay, keepalive, delay_send, priority, tos]) of {ok, Opts} -> case prim_inet:setopts(CliSocket, Opts) of ok -> ok; Error -> gen_tcp:close(CliSocket), Error end; Error -> gen_tcp:close(CliSocket), Error end.
下面是客户端处理输出的状态机:
%% TCP Client Socket Handling FSM (tcp_echo_fsm.erl) %% 客户端输出处理状态机,这里其实就是一个 echo_server 的客户端版本 -module(tcp_echo_fsm). -author('saleyn@gmail.com'). %% 实现 gen_fsm 模式,事实上状态机应用场景没有 gen_server 多 %% 不过能用的场景都比较特殊,比如游戏客户端,服务端战斗模块 -behaviour(gen_fsm). -export([start_link/0, set_socket/2]). %% gen_fsm 回调函数 -export([init/1, handle_event/3, handle_sync_event/4, handle_info/3, terminate/3, code_change/4]). %% FSM States FSM 状态机的状态 -export([ 'WAIT_FOR_SOCKET'/2, %% 等待socket 'WAIT_FOR_DATA'/2 %% 等待socket数据 ]). -record(state, { socket, % client socket addr % client address }). -define(TIMEOUT, 120000). %%%------------------------------------------------------------------------ %%% API %%%------------------------------------------------------------------------ %%------------------------------------------------------------------------- %% @spec (Socket) -> {ok,Pid} | ignore | {error,Error} %% @doc To be called by the supervisor in order to start the server. %% If init/1 fails with Reason, the function returns {error,Reason}. %% If init/1 returns {stop,Reason} or ignore, the process is %% terminated and the function returns {error,Reason} or ignore, %% respectively. %% @end %%------------------------------------------------------------------------- start_link() -> gen_fsm:start_link(?MODULE, [], []). set_socket(Pid, Socket) when is_pid(Pid), is_port(Socket) -> gen_fsm:send_event(Pid, {socket_ready, Socket}). %%%------------------------------------------------------------------------ %%% Callback functions from gen_server %%%------------------------------------------------------------------------ %%------------------------------------------------------------------------- %% Func: init/1 %% Returns: {ok, StateName, StateData} | %% {ok, StateName, StateData, Timeout} | %% ignore | %% {stop, StopReason} %% @private %%------------------------------------------------------------------------- init([]) -> process_flag(trap_exit, true), %% 状态机启动之后的初始化状态 {ok, 'WAIT_FOR_SOCKET', #state{}}. %%------------------------------------------------------------------------- %% Func: StateName/2 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %% @private %%------------------------------------------------------------------------- %% 创建客户端之后 set_socket 函数发送消息之后在这里被处理了 %% 大致逻辑是:收到通知,客户端连接socket到手,可以设置套接字选项并开始接收数据 'WAIT_FOR_SOCKET'({socket_ready, Socket}, State) when is_port(Socket) -> % Now we own the socket inet:setopts(Socket, [{active, once}, {packet, 2}, binary]), {ok, {IP, _Port}} = inet:peername(Socket), %% 确定了socket之后,状态机的下一个状态就是等着接收数据了 {next_state, 'WAIT_FOR_DATA', State#state{socket=Socket, addr=IP}, ?TIMEOUT}; 'WAIT_FOR_SOCKET'(Other, State) -> error_logger:error_msg("State: 'WAIT_FOR_SOCKET'. Unexpected message: ~p\n", [Other]), %% Allow to receive async messages {next_state, 'WAIT_FOR_SOCKET', State}. %% 显示来自客户端的事件 'WAIT_FOR_DATA'({data, Data}, #state{socket=S} = State) -> ok = gen_tcp:send(S, Data), {next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT}; 'WAIT_FOR_DATA'(timeout, State) -> error_logger:error_msg("~p Client connection timeout - closing.\n", [self()]), {stop, normal, State}; 'WAIT_FOR_DATA'(Data, State) -> io:format("~p Ignoring data: ~p\n", [self(), Data]), {next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT}. %%------------------------------------------------------------------------- %% Func: handle_event/3 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %% @private %%------------------------------------------------------------------------- handle_event(Event, StateName, StateData) -> {stop, {StateName, undefined_event, Event}, StateData}. %%------------------------------------------------------------------------- %% Func: handle_sync_event/4 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | %% {reply, Reply, NextStateName, NextStateData} | %% {reply, Reply, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} | %% {stop, Reason, Reply, NewStateData} %% @private %%------------------------------------------------------------------------- handle_sync_event(Event, _From, StateName, StateData) -> {stop, {StateName, undefined_event, Event}, StateData}. %%------------------------------------------------------------------------- %% Func: handle_info/3 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %% @private %%------------------------------------------------------------------------- handle_info({tcp, Socket, Bin}, StateName, #state{socket=Socket} = StateData) -> % Flow control: enable forwarding of next TCP message inet:setopts(Socket, [{active, once}]), ?MODULE:StateName({data, Bin}, StateData); handle_info({tcp_closed, Socket}, _StateName, #state{socket=Socket, addr=Addr} = StateData) -> error_logger:info_msg("~p Client ~p disconnected.\n", [self(), Addr]), {stop, normal, StateData}; handle_info(_Info, StateName, StateData) -> {noreply, StateName, StateData}. %%------------------------------------------------------------------------- %% Func: terminate/3 %% Purpose: Shutdown the fsm %% Returns: any %% @private %%------------------------------------------------------------------------- terminate(_Reason, _StateName, #state{socket=Socket}) -> (catch gen_tcp:close(Socket)), ok. %%------------------------------------------------------------------------- %% Func: code_change/4 %% Purpose: Convert process state when code is changed %% Returns: {ok, NewState, NewStateData} %% @private %%------------------------------------------------------------------------- code_change(_OldVsn, StateName, StateData, _Extra) -> {ok, StateName, StateData}.
最后是app文件了:
%% tcp_server.app 文件 {application, tcp_server, [ {description, "Demo TCP server"}, {vsn, "1.0"}, {id, "tcp_server"}, {modules, [tcp_listener, tcp_echo_fsm]}, {registered, [tcp_server_sup, tcp_listener]}, {applications, [kernel, stdlib]}, %% %% mod: 指定应用启动初始化的模块 %% {mod, {tcp_server_app, []}}, {env, []} ] }.
以上基本上都是个人查找资料过程的笔记,有理解错误的地方请评论指出,谢谢!
相关文章推荐
- 使用OTP原理构建一个非阻塞的TCP服务器(转)
- 使用OTP原则构建一个非阻塞的TCP服务器
- 使用OTP原理构建一个非阻塞的TCP服务器
- 使用netty4.x 编写TCP服务器关于握手问题
- 使用libevent和boost编写一个简单的tcp服务器
- erlang OTP 通用服务器行为模式理解
- 计算机网络教程-应用层(七)客户-服务器模式、套接字、使用传输层的服务(tcp、udp)
- 使用libevent和boost编写一个简单的tcp服务器
- java中TCP与UDP协议的使用以及多线程服务器编写
- 基于Erlang OTP构建一个TCP服务器
- WCF学习问题之“net.tcp://localhost/service/”不支持正在使用的 .Net 组帧模式。有关详细信息,请参见服务器日志。
- WCF:“net.tcp://localhost:8001/MemberShips”不支持正在使用的 .Net 组帧模式。有关详细信息,请参见服务器日志。
- 使用libevent和boost编写一个简单的tcp服务器
- C#的TCP服务器编写
- 使用Java NIO编写高性能的服务器 文件下载(转)
- 使用Java NIO编写高性能的服务器
- 使用winrar来自动备份domino服务器数据
- 【转帖】使用 C++ 编写内核模式驱动程序的优点与缺点
- (已测试)在本地处理模式下将数据库数据源与 ReportViewer Web 服务器控件一起使用
- C#实现SMTP服务器,使用TCP命令发送Email