
WCF 4.0 - RESTful WCF Services (Example) (Concurrency Synchronization Service: SyncService)

2013-07-19 17:21
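While writing automated tests recently I ran into a problem: during a concurrency test, all of the client test code needs to synchronize at a certain point. There are many ways to synchronize threads, and processes on the same PC can synchronize through a Mutex, but what about multiple PCs? Since I have been learning REST WCF lately, it naturally came to mind. A synchronization service built with it handles multi-thread, multi-process, and multi-PC synchronization, and it also works across languages, so it solves several problems at once. (A similar solution is PNUnit, which is implemented with .NET Remoting; it requires writing configuration and starting a Launcher/Agent, which is a bit tedious.)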

1. The main feature of SyncService: Barrier

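The Barrier concept is borrowed from PNUnit: a barrier is a synchronization point in an asynchronous process, and every participant that enters the Barrier waits until all the others have entered. The participants can be different threads or processes (clients on different PCs, implemented in different languages). The process works like this: three clients start, some faster and some slower, but they synchronize once at the Barrier, with the early arrivals waiting for the late ones.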
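The waiting behavior described above can be tried inside a single process before any service is involved. The following is a minimal sketch, not the author's code: the class name `LocalBarrierSketch`, the three client names, and the staggered delays are assumptions made for illustration. It uses one `ManualResetEvent` per participant and `WaitHandle.WaitAll`, so each participant announces its own arrival and then blocks until every other participant has arrived.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Hypothetical in-process illustration of the barrier idea.
public static class LocalBarrierSketch
{
    // A participant marks itself as arrived, then waits for all the others.
    // Returns false if not everyone arrived within timeoutMs.
    public static bool Enter(IDictionary<string, ManualResetEvent> events, string name, int timeoutMs)
    {
        events[name].Set();
        return WaitHandle.WaitAll(events.Values.ToArray(), timeoutMs);
    }

    // Starts three "clients" that reach the barrier at different times.
    public static bool[] RunDemo()
    {
        var names = new[] { "Client1", "Client2", "Client3" };
        var events = names.ToDictionary(n => n, n => new ManualResetEvent(false));
        var results = new bool[names.Length];

        var threads = names.Select((n, i) => new Thread(() =>
        {
            Thread.Sleep(100 * i);              // simulate fast and slow clients
            results[i] = Enter(events, n, 5000);
        })).ToArray();

        foreach (var t in threads) t.Start();
        foreach (var t in threads) t.Join();
        return results;
    }
}
```

Every entry of the array returned by `RunDemo()` should be `true`, because the slowest client arrives well inside the five-second timeout. Because the events are never reset, a barrier built this way is single-use until it is created again.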



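A practical example: suppose we want to test communication between two clients, which requires both clients to be online at the same time. We can place a barrier in the code so that the communication test starts only after both sides have confirmed that they are online.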

(1) Prepare the Barrier


    var init = SyncService.Init("Barrier_Logon", "Client1", "Client2");

    // Start Client1
    Process.Start("Client1.exe");

    // Start Client2
    Process.Start("Client2.exe");

(2) Client1

    // Client1 logs in
    var client1 = Login("Client1");

    // Synchronize: wait for Client2 to log in
    var enter = SyncService.Enter("Barrier_Logon", "Client1");

    // client1 and client2 communicate with each other ...



(3) Client2 is similar to Client1


    // Client2 logs in
    var client2 = Login("Client2");

    // Synchronize: wait for Client1 to log in
    var enter = SyncService.Enter("Barrier_Logon", "Client2");

    // client1 and client2 communicate with each other ...

Client1's code:

    // Set a message for client2
    var set = SyncService.SetMessage("Barrier", "key", "hello client2");

    // Enter the Barrier and wait for client2
    var enter = SyncService.Enter("Barrier", "Client1");



Client2's code:


    // Enter the Barrier and wait for client1
    var enter = SyncService.Enter("Barrier", "Client2");

    // Fetch the message
    var get = SyncService.GetMessage("Barrier", "key");

    // Verify that the message received is "hello client2" (expected value first)
    Assert.AreEqual("hello client2", get);
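The order of the calls in the two snippets above is what makes the handoff reliable: Client1 stores the message before it enters the Barrier, and Client2 reads it only after leaving the Barrier, so the message is always there by the time it is read. A minimal in-process sketch of that ordering follows. It is an illustration under assumptions, not the service itself: `MessageHandoffSketch` is a hypothetical name, and it uses the framework's `System.Threading.Barrier` and a `ConcurrentDictionary` in place of HTTP calls.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical illustration of "set, then enter" versus "enter, then get".
public static class MessageHandoffSketch
{
    public static string Run()
    {
        var messages = new ConcurrentDictionary<string, string>();
        string received = null;

        using (var barrier = new Barrier(2))
        {
            var client1 = new Thread(() =>
            {
                messages["key"] = "hello client2";  // set the message first
                barrier.SignalAndWait();            // then enter the barrier
            });

            var client2 = new Thread(() =>
            {
                barrier.SignalAndWait();            // enter the barrier first
                string value;
                messages.TryGetValue("key", out value);
                received = value;                   // read only after both have arrived
            });

            client2.Start();
            client1.Start();
            client1.Join();
            client2.Join();
        }

        return received;
    }
}
```

`Run()` returns "hello client2" regardless of which thread is scheduled first, because the read can only happen after both participants have passed the barrier.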


    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.ServiceModel;
    using System.ServiceModel.Activation;
    using System.ServiceModel.Web;
    using System.Text;
    using System.Threading;
    using System.Runtime.Serialization;

    namespace SyncService
    {
        [ServiceContract]
        [AspNetCompatibilityRequirements(RequirementsMode = AspNetCompatibilityRequirementsMode.Allowed)]
        [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
        public class SyncService
        {
            // Shared by every request: barrier name -> its sync group.
            private static readonly Dictionary<string, SyncGroup> _syncPool = new Dictionary<string, SyncGroup>();

            // Dictionary is not safe for concurrent reads and writes, so lookups take the same lock as Init.
            private static SyncGroup GetGroup(string barrier)
            {
                lock (_syncPool)
                {
                    return _syncPool[barrier];
                }
            }

            // Creates (or replaces) a barrier with one unsignaled event per comma-separated target name.
            [WebGet(UriTemplate = "Init/{barrier}/{targetnames}")]
            public string Init(string barrier, string targetnames)
            {
                try
                {
                    lock (_syncPool)
                    {
                        _syncPool[barrier] = new SyncGroup();
                        var syncGroup = _syncPool[barrier];
                        var targets = targetnames.Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries);
                        Array.ForEach(targets, t => syncGroup.ResetEventDict.Add(t, new ManualResetEvent(false)));
                    }
                    return "ok";
                }
                catch (Exception ex)
                {
                    return ex.Message;
                }
            }

            // Marks the caller as arrived, then blocks until all targets have arrived or the timeout (ms) expires.
            [WebGet(UriTemplate = "Enter/{barrier}/{targetname}/{timeout=60000}")]
            public string Enter(string barrier, string targetname, string timeout)
            {
                try
                {
                    // Parse first so that a malformed timeout does not signal the event.
                    var intTimeout = int.Parse(timeout);
                    var syncObj = GetGroup(barrier);
                    var target = syncObj.ResetEventDict[targetname];
                    target.Set();
                    var success = WaitHandle.WaitAll(syncObj.ResetEventDict.Values.ToArray(), intTimeout);
                    return success ? "ok" : "timeout";
                }
                catch (Exception ex)
                {
                    return ex.Message;
                }
            }

            // Stores a message under a key, replacing any earlier message with the same key.
            [WebGet(UriTemplate = "SetMessage/{barrier}/{key}/{message=null}")]
            public string SetMessage(string barrier, string key, string message)
            {
                try
                {
                    var syncObj = GetGroup(barrier);
                    lock (syncObj)
                    {
                        syncObj.Messages.RemoveAll(m => m.Key == key);
                        var messageInfo = new MessageInfo
                        {
                            BarrierName = barrier,
                            Key = key,
                            Message = message,
                            UpdateDateTime = DateTime.Now
                        };
                        syncObj.Messages.Add(messageInfo);
                    }
                    return "ok";
                }
                catch (Exception ex)
                {
                    return ex.Message;
                }
            }

            // Returns the message stored under the key, or null if no such message exists.
            [WebGet(UriTemplate = "GetMessage/{barrier}/{key}")]
            public string GetMessage(string barrier, string key)
            {
                try
                {
                    var syncObj = GetGroup(barrier);
                    lock (syncObj)
                    {
                        var query = syncObj.Messages.FirstOrDefault(m => m.Key == key);
                        return query == null ? null : query.Message;
                    }
                }
                catch (Exception ex)
                {
                    return ex.Message;
                }
            }

            // Lists the messages of one barrier, or of all barriers when no name is given.
            [WebGet(UriTemplate = "ListMessages/{barrier=all}", ResponseFormat = WebMessageFormat.Xml)]
            public List<MessageInfo> ListMessages(string barrier)
            {
                try
                {
                    List<SyncGroup> groups;
                    lock (_syncPool)
                    {
                        groups = barrier == "all"
                            ? _syncPool.Values.ToList()
                            : new List<SyncGroup> { _syncPool[barrier] };
                    }

                    // Return a copy so that serialization never races with SetMessage.
                    var messages = new List<MessageInfo>();
                    foreach (var group in groups)
                    {
                        lock (group)
                        {
                            messages.AddRange(group.Messages);
                        }
                    }
                    return messages;
                }
                catch
                {
                    return null;
                }
            }

            // Simple liveness check.
            [WebGet(UriTemplate = "Check", ResponseFormat = WebMessageFormat.Xml)]
            public string Check()
            {
                return "Welcome to the SyncService! " +
                    DateTime.Now.ToLongDateString() + " " + DateTime.Now.ToLongTimeString();
            }
        }

        [DataContract]
        [KnownType(typeof(MessageInfo))]
        public class SyncGroup
        {
            // One event per participant; not serialized.
            internal Dictionary<string, ManualResetEvent> ResetEventDict { get; set; }

            [DataMember]
            public string Name { get; set; }

            [DataMember]
            public List<MessageInfo> Messages { get; set; }

            [DataMember]
            public Dictionary<string, string> States { get; set; }

            public SyncGroup()
            {
                Messages = new List<MessageInfo>();
                ResetEventDict = new Dictionary<string, ManualResetEvent>();
            }
        }

        [DataContract]
        public class MessageInfo
        {
            [DataMember]
            public string BarrierName { get; set; }

            [DataMember]
            public string Key { get; set; }

            [DataMember]
            public string Message { get; set; }

            [DataMember]
            public string Identity { get; set; }

            [DataMember]
            public DateTime UpdateDateTime { get; set; }
        }
    }


JSON is the default format. In addition, to inspect the current synchronization state and messages, use ListStates/ListMessages.

(1) To initialize a Barrier, send: http://server/SyncService/Init/MyBarrier/Client1,Client2
(2) For a client to enter the Barrier, send: http://server/SyncService/Enter/MyBarrier/Client1/10000 (the last segment is the timeout, in milliseconds)
(3) To set a message, send: http://server/SyncService/SetMessage/MyBarrier/Key/MessageContent
(4) To get a message, send: http://server/SyncService/GetMessage/MyBarrier/Key
(5) To view all "locks", send: http://server/SyncService/ListStates (or specify one Barrier: /MyBarrier)
(6) To view all messages, send: http://server/SyncService/ListMessages (or specify one Barrier: /MyBarrier)
(7) To clear all SyncGroups, send: http://server/SyncService/Restart
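The client snippets earlier call helpers such as `SyncService.Init(...)` and `SyncService.Enter(...)` without showing them. One way such a helper might be written is sketched below, purely as an assumption: `SyncClientSketch`, its method names, and the use of `HttpClient` are not from the original article. It only builds the URLs listed above and issues a plain GET.

```csharp
using System;
using System.Linq;
using System.Net.Http;

// Hypothetical client-side helper for the URLs listed above.
public static class SyncClientSketch
{
    private static readonly HttpClient Http = new HttpClient();

    // e.g. {baseUrl}/Init/MyBarrier/Client1,Client2
    public static string InitUrl(string baseUrl, string barrier, params string[] targets)
    {
        // Escape each name but keep the comma separator literal.
        var names = string.Join(",", targets.Select(t => Uri.EscapeDataString(t)));
        return baseUrl.TrimEnd('/') + "/Init/" + Uri.EscapeDataString(barrier) + "/" + names;
    }

    // e.g. {baseUrl}/Enter/MyBarrier/Client1/10000
    public static string EnterUrl(string baseUrl, string barrier, string target, int timeoutMs)
    {
        return baseUrl.TrimEnd('/') + "/Enter/" + Uri.EscapeDataString(barrier) + "/"
            + Uri.EscapeDataString(target) + "/" + timeoutMs;
    }

    // Sends the GET and returns the raw response body. Depending on the response
    // format, "ok" may arrive wrapped in JSON quotes or in an XML <string> element.
    public static string Get(string url)
    {
        return Http.GetStringAsync(url).GetAwaiter().GetResult();
    }
}
```

A test would then call, for example, `SyncClientSketch.Get(SyncClientSketch.EnterUrl("http://server/SyncService", "MyBarrier", "Client1", 10000))`. Because the Enter request blocks on the server until everyone has arrived, the HTTP client's own timeout needs to be longer than the barrier timeout.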

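Yes, every operation is an HTTP GET request, so clients of all kinds can call the service easily. (Creating such a service with WCF is also very simple: the whole thing is little more than a hundred lines of code. As the saying goes, in martial arts only speed is unbeatable :)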

[REST WCF series]

RESTful WCF Services (1) (Getting Started)

RESTful WCF Services (2) (Implementing Create, Delete, Update, and Query)

RESTful WCF Services (3) (Raw Stream)

RESTful WCF Services (4) (Basic Security)

RESTful WCF Services (Example) (Concurrency Synchronization Service: SyncService)