Getting Started with Microsoft Sync Framework, Part 8: Synchronizing Remote Metadata with WCF

2009-01-04 17:03
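The framework ships a set of utility classes for storing and maintaining metadata, known as the Metadata Storage Service. It helps store synchronization metadata for a provider that represents a replica with no other way of storing its metadata. The Metadata Storage Service uses a lightweight database with a small memory and disk footprint; the database can be redistributed with the provider and is secure and reliable.
Its API clearly separates the metadata store from the interfaces and methods used to access it, so an alternative store can be implemented, and a provider needs only minor changes to use that alternative store.
Of course, that is how the SDK puts it :)
A few classes in the Metadata Storage Service API deserve special mention:
The SqlMetadataStore class represents a metadata store that is implemented by using a lightweight database to store metadata. The Metadata Storage Service provides a complete implementation of the Metadata Storage Service API; this implementation uses a lightweight database to store all the metadata needed for synchronization in a single database file at a specified location in the file system. The entry point to this implementation is SqlMetadataStore.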

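The ReplicaMetadata class provides access to the replica metadata and item metadata in the metadata store. ReplicaMetadata also provides delete detection and helpers for implementing sync provider methods.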

These two classes are the stars of today's demo; most of what they offer is put to use in MySimpleSyncProvider.cs in the source code.
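The separation the SDK describes, between the metadata store itself and the interfaces used to reach it, can be pictured with a small sketch. Every name below (`IItemMetadataStore`, `InMemoryItemMetadataStore`, `SketchProvider`) is hypothetical and is not part of the Metadata Storage Service API; the only point is that a provider written against an interface can switch stores with a one-line change.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical interface: NOT the Metadata Storage Service API, only an
// illustration of keeping the store behind an access interface.
public interface IItemMetadataStore
{
    bool TryGetVersion(Guid itemId, out ulong tickCount);
    void SaveVersion(Guid itemId, ulong tickCount);
}

// One possible backing store: a plain in-memory dictionary.
public sealed class InMemoryItemMetadataStore : IItemMetadataStore
{
    private readonly Dictionary<Guid, ulong> versions = new Dictionary<Guid, ulong>();

    public bool TryGetVersion(Guid itemId, out ulong tickCount)
    {
        return versions.TryGetValue(itemId, out tickCount);
    }

    public void SaveVersion(Guid itemId, ulong tickCount)
    {
        versions[itemId] = tickCount;
    }
}

// The "provider" depends only on the interface, so swapping the store
// means changing a single constructor argument.
public sealed class SketchProvider
{
    private readonly IItemMetadataStore store;
    private ulong tick;

    public SketchProvider(IItemMetadataStore store)
    {
        this.store = store;
    }

    // Records a local change and returns the tick count assigned to it.
    public ulong RecordChange(Guid itemId)
    {
        tick++;
        store.SaveVersion(itemId, tick);
        return tick;
    }
}

public static class StoreSeparationDemo
{
    public static void Main()
    {
        IItemMetadataStore store = new InMemoryItemMetadataStore();
        SketchProvider provider = new SketchProvider(store);
        Guid id = Guid.NewGuid();
        provider.RecordChange(id);
        provider.RecordChange(id);

        ulong version;
        store.TryGetVersion(id, out version);
        Console.WriteLine("Latest tick count: " + version);
    }
}
```

A file-backed or database-backed class implementing the same hypothetical interface could be passed to `SketchProvider` without touching the provider code.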

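Today's example mainly demonstrates how to use WCF to synchronize remote metadata. It is one of the demos published by the MSF team. From it we can get a rough idea of how to use the Metadata Storage Service to maintain synchronization metadata, and how to use WCF to synchronize the changed metadata.
First, note that the synchronization approach used here is the proxy participant, one of the four participant types that MSF describes; for details, see the participant types link at the end of this post :)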



The proxy provider is shown below:




Code
public class RemoteProviderProxy : KnowledgeSyncProvider
{
private SyncIdFormatGroup idFormats;
private Sync101WebServiceClient client;
private SyncSessionContext syncSessionContext;
private string folderPath;
private string storeName;
private string endpointConfigurationName;

public RemoteProviderProxy(
string folderPath,
string storeName,
string endpointConfigurationName)
{
this.folderPath = folderPath;
this.storeName = storeName;
this.endpointConfigurationName = endpointConfigurationName;

// Create a client
this.client = new Sync101WebServiceClient(
endpointConfigurationName);

this.client.CreateProviderForSyncSession(folderPath, this.storeName);
this.idFormats = this.client.GetIdFormats();
}

public override SyncIdFormatGroup IdFormats
{
get
{
return this.idFormats;
}
}

public override void BeginSession(
SyncProviderPosition position,
SyncSessionContext syncSessionContext)
{
if (this.client == null)
{
// Allow the same proxy to be used in several unidirectional sessions
this.client = new Sync101WebServiceClient(
endpointConfigurationName);

this.client.CreateProviderForSyncSession(folderPath, this.storeName);
}

this.syncSessionContext = syncSessionContext;
this.client.BeginSession();
}

public override void EndSession(
SyncSessionContext syncSessionContext)
{
this.syncSessionContext = null;
this.client.EndSession();
this.client = null;
}

public override ChangeBatch GetChangeBatch(
uint batchSize,
SyncKnowledge destinationKnowledge,
out object changeDataRetriever)
{
CachedChangeDataRetriever cachedChangeDataRetriever;

ChangeBatch changeBatch = this.client.GetChangeBatch(
batchSize,
destinationKnowledge,
out cachedChangeDataRetriever);

changeDataRetriever = cachedChangeDataRetriever;

return changeBatch;
}

public override FullEnumerationChangeBatch GetFullEnumerationChangeBatch(
uint batchSize,
SyncId lowerEnumerationBound,
SyncKnowledge knowledgeForDataRetrieval,
out object changeDataRetriever)
{
CachedChangeDataRetriever cachedChangeDataRetriever;

FullEnumerationChangeBatch fullEnumerationChangeBatch = this.client.GetFullEnumerationChangeBatch(
batchSize,
lowerEnumerationBound,
knowledgeForDataRetrieval,
out cachedChangeDataRetriever);

changeDataRetriever = cachedChangeDataRetriever;

return fullEnumerationChangeBatch;
}

public override void GetSyncBatchParameters(
out uint batchSize,
out SyncKnowledge knowledge)
{
this.client.GetSyncBatchParameters(
out batchSize,
out knowledge);
}

public override void ProcessChangeBatch(
ConflictResolutionPolicy resolutionPolicy,
ChangeBatch sourceChanges,
object changeDataRetriever,
SyncCallbacks syncCallback,
SyncSessionStatistics sessionStatistics)
{
CachedChangeDataRetriever cachedChangeDataRetriever = new CachedChangeDataRetriever(
changeDataRetriever as IChangeDataRetriever,
sourceChanges);

byte[] newChangeApplierInfo = this.client.ProcessChangeBatch(
resolutionPolicy,
sourceChanges,
cachedChangeDataRetriever,
this.syncSessionContext.ChangeApplierInfo);

this.syncSessionContext.ChangeApplierInfo = newChangeApplierInfo;
}

public override void ProcessFullEnumerationChangeBatch(
ConflictResolutionPolicy resolutionPolicy,
FullEnumerationChangeBatch sourceChanges,
object changeDataRetriever,
SyncCallbacks syncCallback,
SyncSessionStatistics sessionStatistics)
{
CachedChangeDataRetriever cachedChangeDataRetriever = new CachedChangeDataRetriever(
changeDataRetriever as IChangeDataRetriever,
sourceChanges);

byte[] newChangeApplierInfo = this.client.ProcessFullEnumerationChangeBatch(
resolutionPolicy,
sourceChanges,
cachedChangeDataRetriever,
this.syncSessionContext.ChangeApplierInfo);

this.syncSessionContext.ChangeApplierInfo = newChangeApplierInfo;
}

#region For demo purpose, not required for RCA pattern
public void CleanupTombstones(TimeSpan timespan)
{
this.client.CleanupTombstones(timespan);
}
#endregion
}

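An important member of RemoteProviderProxy above is Sync101WebServiceClient, the WCF client implementation.
When the sync providers are bound, a RemoteProviderProxy is assigned to the RemoteProvider property. As the code above shows, though, the remote metadata synchronization is ultimately carried out through the Sync101WebServiceClient instance. Below is the WCF interface code (ISync101WebService.cs):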




Code
[ServiceContract(Name = "Sync101WebService",Namespace = "http://microsoft.synchronization/",
SessionMode = SessionMode.Required)]
[ServiceKnownType(typeof(SyncIdFormatGroup))]
[ServiceKnownType(typeof(SyncId))]
public interface ISync101WebService
{
[OperationContract(
IsInitiating = true,
IsTerminating = false)]
void CreateProviderForSyncSession(
string dataAndMetadataFolderPath,
string storeName);

[OperationContract(
IsInitiating = false,
IsTerminating = false)]
SyncIdFormatGroup GetIdFormats();

[OperationContract(
IsInitiating = false,
IsTerminating = false)]
void BeginSession();

[OperationContract(
IsInitiating = false,
IsTerminating = true)]
void EndSession();

[OperationContract(
IsInitiating = false,
IsTerminating = false)]
ChangeBatch GetChangeBatch(
uint batchSize,
SyncKnowledge destinationKnowledge,
out Sync101.CachedChangeDataRetriever changeDataRetriever);

[OperationContract(
IsInitiating = false,
IsTerminating = false)]
FullEnumerationChangeBatch GetFullEnumerationChangeBatch(
uint batchSize,
SyncId lowerEnumerationBound,
SyncKnowledge knowledgeForDataRetrieval,
out Sync101.CachedChangeDataRetriever changeDataRetriever);

[OperationContract(
IsInitiating = false,
IsTerminating = false)]
void GetSyncBatchParameters(
out uint batchSize,
out SyncKnowledge knowledge);

[OperationContract(
IsInitiating = false,
IsTerminating = false)]
byte[] ProcessChangeBatch(
ConflictResolutionPolicy resolutionPolicy,
ChangeBatch sourceChanges,
Sync101.CachedChangeDataRetriever changeDataRetriever,
byte[] changeApplierInfo);

[OperationContract(
IsInitiating = false,
IsTerminating = false)]
byte[] ProcessFullEnumerationChangeBatch(
ConflictResolutionPolicy resolutionPolicy,
FullEnumerationChangeBatch sourceChanges,
Sync101.CachedChangeDataRetriever changeDataRetriever,
byte[] changeApplierInfo);

#region For demo purpose, not required for RCA pattern
[OperationContract(
IsInitiating = false,
IsTerminating = false)]
void CleanupTombstones(TimeSpan timespan);
#endregion
}
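The `IsInitiating` and `IsTerminating` settings above fix the order in which a client may call the contract: `CreateProviderForSyncSession` is the only operation that can open a session, and `EndSession` is the only one that closes it. What follows is a rough model of that ordering in plain C#, with no WCF involved; `SessionOrderModel` and its `Call` method are made up for illustration, and real WCF enforces these rules inside the channel layer rather than in user code.

```csharp
using System;

// Hypothetical model of a sessionful contract's call ordering; it does not use WCF.
public sealed class SessionOrderModel
{
    private enum State { NotStarted, Open, Closed }

    private State state = State.NotStarted;

    // Stand-in for invoking one contract operation with the given flags.
    public void Call(string operation, bool isInitiating, bool isTerminating)
    {
        if (state == State.Closed)
        {
            throw new InvalidOperationException(operation + ": the session has already been terminated.");
        }

        if (state == State.NotStarted && !isInitiating)
        {
            throw new InvalidOperationException(operation + ": this operation cannot start a session.");
        }

        // A terminating operation closes the session; anything else keeps it open.
        state = isTerminating ? State.Closed : State.Open;
    }
}

public static class SessionOrderDemo
{
    public static void Main()
    {
        SessionOrderModel session = new SessionOrderModel();
        session.Call("CreateProviderForSyncSession", true, false);
        session.Call("GetIdFormats", false, false);
        session.Call("BeginSession", false, false);
        session.Call("EndSession", false, true);

        try
        {
            // The session is closed, so a further call is rejected.
            session.Call("GetIdFormats", false, false);
        }
        catch (InvalidOperationException e)
        {
            Console.WriteLine("Rejected: " + e.Message);
        }
    }
}
```

This ordering appears to be why `RemoteProviderProxy.EndSession` sets `client` to null and `BeginSession` creates a new client when it finds none: once `EndSession` has terminated the session, the old channel is not reused.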

And the class that finally implements it, Sync101WebService:




Code
[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Single,InstanceContextMode = InstanceContextMode.PerSession,
IncludeExceptionDetailInFaults = true)]
public class Sync101WebService : ISync101WebService
{
MySyncProvider provider;

public Sync101WebService()
{
}

public SyncIdFormatGroup GetIdFormats()
{
return provider.IdFormats;
}

public void CreateProviderForSyncSession(string dataAndMetadataFolderPath, string name)
{
this.provider = new MySyncProvider(dataAndMetadataFolderPath, name);
}

public void BeginSession()
{
provider.BeginSession();
}

public void EndSession()
{
provider.EndSession();
}

public ChangeBatch GetChangeBatch(
uint batchSize,
SyncKnowledge destinationKnowledge,
out CachedChangeDataRetriever changeDataRetriever)
{
object dataRetriever;

ChangeBatch changeBatch = provider.GetChangeBatch(
batchSize,
destinationKnowledge,
out dataRetriever);

changeDataRetriever = new CachedChangeDataRetriever(
dataRetriever as IChangeDataRetriever,
changeBatch);

return changeBatch;
}

public FullEnumerationChangeBatch GetFullEnumerationChangeBatch(
uint batchSize,
SyncId lowerEnumerationBound,
SyncKnowledge knowledgeForDataRetrieval,
out CachedChangeDataRetriever changeDataRetriever)
{
object dataRetriever;

FullEnumerationChangeBatch changeBatch = provider.GetFullEnumerationChangeBatch(
batchSize,
lowerEnumerationBound,
knowledgeForDataRetrieval,
out dataRetriever);

changeDataRetriever = new CachedChangeDataRetriever(
dataRetriever as IChangeDataRetriever,
changeBatch);

return changeBatch;
}

public void GetSyncBatchParameters(
out uint batchSize,
out SyncKnowledge knowledge)
{
provider.GetSyncBatchParameters(
out batchSize,
out knowledge);
}

public byte[] ProcessChangeBatch(
ConflictResolutionPolicy resolutionPolicy,
ChangeBatch sourceChanges,
CachedChangeDataRetriever changeDataRetriever,
byte[] changeApplierInfo)
{
return provider.ProcessRemoteChangeBatch(
resolutionPolicy,
sourceChanges,
changeDataRetriever,
changeApplierInfo);
}

public byte[] ProcessFullEnumerationChangeBatch(
ConflictResolutionPolicy resolutionPolicy,
FullEnumerationChangeBatch sourceChanges,
CachedChangeDataRetriever changeDataRetriever,
byte[] changeApplierInfo)
{
return provider.ProcessRemoteFullEnumerationChangeBatch(
resolutionPolicy,
sourceChanges,
changeDataRetriever,
changeApplierInfo);
}

#region For demo purpose, not required for RCA pattern
public void CleanupTombstones(TimeSpan timespan)
{
provider.CleanupTombstones(timespan);
}
#endregion
}
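Taken together, the proxy, the contract, and the service class form a simple forwarding chain: the proxy hands each call to the client, and for change application it also sends the current `ChangeApplierInfo` bytes and stores whatever comes back. A stripped-down sketch of that round trip follows, assuming nothing about WCF or the Sync Framework; `IRemoteSync`, `FakeRemoteService`, and `ForwardingProxy` are hypothetical stand-ins, and the way the fake service "updates" the byte array is invented purely so the effect is visible.

```csharp
using System;

// Hypothetical stand-in for the service contract; only one operation is modelled.
public interface IRemoteSync
{
    byte[] ProcessChangeBatch(string[] changes, byte[] changeApplierInfo);
}

// Plays the role of the service-side class: applies changes and returns updated state.
public sealed class FakeRemoteService : IRemoteSync
{
    public int AppliedCount;

    public byte[] ProcessChangeBatch(string[] changes, byte[] changeApplierInfo)
    {
        AppliedCount += changes.Length;

        // Invented state update: copy the incoming bytes and append the batch size.
        byte[] updated = new byte[changeApplierInfo.Length + 1];
        Array.Copy(changeApplierInfo, updated, changeApplierInfo.Length);
        updated[updated.Length - 1] = (byte)changes.Length;
        return updated;
    }
}

// Plays the role of the proxy: keeps the session-side state, forwards the call,
// then stores the state returned by the remote side.
public sealed class ForwardingProxy
{
    private readonly IRemoteSync client;
    private byte[] changeApplierInfo = new byte[0];

    public ForwardingProxy(IRemoteSync client)
    {
        this.client = client;
    }

    public byte[] ChangeApplierInfo
    {
        get { return changeApplierInfo; }
    }

    public void ProcessChangeBatch(string[] changes)
    {
        changeApplierInfo = client.ProcessChangeBatch(changes, changeApplierInfo);
    }
}

public static class ForwardingDemo
{
    public static void Main()
    {
        FakeRemoteService service = new FakeRemoteService();
        ForwardingProxy proxy = new ForwardingProxy(service);

        proxy.ProcessChangeBatch(new string[] { "a", "b" });
        proxy.ProcessChangeBatch(new string[] { "c" });

        Console.WriteLine("Changes applied remotely: " + service.AppliedCount);
        Console.WriteLine("State bytes held by proxy: " + proxy.ChangeApplierInfo.Length);
    }
}
```

In the real sample the state lives in `SyncSessionContext.ChangeApplierInfo` on the caller's side, which is presumably why the service can apply changes without a session context of its own.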

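That covers all of the server-side code. This article is about metadata synchronization, and the corresponding SyncProvider implementation lives in MySimpleSyncProvider.cs. So that my own misunderstandings do not get in the way of reading the source, the original English comments are kept in the listing as well.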




Code
public class MySyncProvider : KnowledgeSyncProvider, IChangeDataRetriever, INotifyingChangeApplierTarget
{
// The name of the metadata store custom column that is used to save a timestamp of last change on an
// item in the metadata store so we can do change detection.
const string TIMESTAMP_COLUMNNAME = "timestamp";

// This is our sample in-memory data store which, for simplicity, stores sets of string name-value pairs
// referenced by identifiers of the type 'Guid'
MySimpleDataStore _store;

// Use the Sync Framework's optional metadata store to track version information
SqlMetadataStore _metadataStore = null; // Metadata store backed by a lightweight database
ReplicaMetadata _metadata = null; // Access to replica and item metadata in the store; also provides delete detection and helpers for implementing sync provider methods
private string _name = null;
private string _folderPath = null;
private string _replicaMetadataFile = null;
private string _replicaIdFile = null;

// The provider's unique identifier
SyncId _replicaId = null;

SyncIdFormatGroup _idFormats = null;
SyncSessionContext _currentSessionContext = null;

// Construct a data store by providing a name for the endpoint (replica) and
// a file to which we'll persist the sync metadata (file)
public MySyncProvider(string folderPath, string name)
{
_name = name;
_folderPath = folderPath;

_replicaMetadataFile = _folderPath.ToString() + "\\" + _name.ToString() + ".Metadata";
_replicaIdFile = _folderPath.ToString() + "\\" + _name.ToString() + ".Replicaid";

// Set ItemIdFormat and ReplicaIdFormat for using Guid ids.
_idFormats = new SyncIdFormatGroup();
_idFormats.ItemIdFormat.IsVariableLength = false;
_idFormats.ItemIdFormat.Length = 16;
_idFormats.ReplicaIdFormat.IsVariableLength = false;
_idFormats.ReplicaIdFormat.Length = 16;
}

public SyncId ReplicaId
{
get
{
if (_replicaId == null)
{
_replicaId = GetReplicaIdFromFile( _replicaIdFile);
}

return _replicaId;
}
}

#region Metadata Store Related Methods
private void InitializeMetadataStore()
{
// Values for adding a custom field to the metadata store
List<FieldSchema> fields = new List<FieldSchema>();
SyncId id = ReplicaId;

// Create or open the metadata store, initializing it with the id formats we'll use to reference our items and endpoints
if (!File.Exists(_replicaMetadataFile))
{
fields.Add(new FieldSchema(TIMESTAMP_COLUMNNAME, typeof(System.UInt64)));
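// Create a metadata store file with the given name and location, and return the metadata store object that represents it.
_metadataStore = SqlMetadataStore.CreateStore(_replicaMetadataFile);
// Create and initialize the replica's metadata in the store, and return a ReplicaMetadata object for accessing it.
_metadata = _metadataStore.InitializeReplicaMetadata(_idFormats, // ID format schema of the provider
_replicaId, // Replica ID associated with this metadata
fields, // Schema of the custom metadata fields for each item; may be null if there are none
null/*No indexes to create*/); // Index schemas for faster item lookup in the store; may be null if there are no custom indexes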
}
else
{
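_metadataStore = SqlMetadataStore.OpenStore(_replicaMetadataFile); // Open the existing metadata store file and return the object that represents it
_metadata = _metadataStore.GetReplicaMetadata(_idFormats, _replicaId); // Get the ReplicaMetadata object used to access the replica's metadata in the store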
}
}

private void CloseMetadataStore()
{
_metadataStore.Dispose();
_metadataStore = null;
}

//Update the metadata store with changes that have occurred on the data store since the last time it was updated.
public void UpdateMetadataStoreWithLocalChanges()
{
SyncVersion newVersion = new SyncVersion(0, _metadata.GetNextTickCount());

_metadata.DeleteDetector.MarkAllItemsUnreported();
foreach (Guid id in _store.Ids)
{
ItemData data = _store.Get(id);
ItemMetadata item = null;

//Look up an item's metadata by its ID


item = _metadata.FindItemMetadataById(new SyncId(id));
if (null == item)
{
//New item, must have been created since the last time the metadata was updated.
//Create the item metadata required for sync (giving it a SyncId and a version, defined to be a DWORD and a ULONGLONG).
//For creates, simply provide the relative replica ID (0) and the tick count for the provider (ever increasing)
item = _metadata.CreateItemMetadata(new SyncId(id), newVersion);
item.ChangeVersion = newVersion;
SaveItemMetadata(item, data.TimeStamp);
}
else
{
if (data.TimeStamp > item.GetUInt64Field(TIMESTAMP_COLUMNNAME)) // the item has changed since the last sync operation.
{
//Changed Item, this item has changed since the last time the metadata was updated.
//Assign a new version by simply stating "who" modified this item (0 = local/me) and "when" (tick count for the store)
item.ChangeVersion = newVersion;
SaveItemMetadata(item, data.TimeStamp);
}
else
{
//Unchanged item, nothing has changed so just mark it as live so that the metadata knows it has not been deleted.
_metadata.DeleteDetector.ReportLiveItemById(new SyncId(id));
}
}
}

//Now go back through the items that are no longer in the store and mark them as deleted in the metadata.
//This sets the item as a tombstone.
foreach (ItemMetadata item in _metadata.DeleteDetector.FindUnreportedItems())
{
item.MarkAsDeleted(newVersion);
SaveItemMetadata(item, 0); // set timestamp to 0 for tombstones
}
}

private void SaveItemMetadata(ItemMetadata item, ulong timeStamp)
{
item.SetCustomField(TIMESTAMP_COLUMNNAME, timeStamp);
SaveItemMetadata(item);
}

private void SaveItemMetadata(ItemMetadata item)
{
_metadata.SaveItemMetadata(item);
}

// Method for cleaning up tombstones older than a certain TimeSpan
public void CleanupTombstones(TimeSpan timespan)
{
InitializeMetadataStore();
_metadataStore.BeginTransaction(); // Start an explicit transaction on the metadata store
_metadata.CleanupDeletedItems(timespan);
_metadataStore.CommitTransaction(); // Commit the explicit transaction started on the metadata store
CloseMetadataStore();
}
#endregion Metadata Store Related Methods

#region KnowledgeSyncProvider Overrides
//BeginSession is called at the beginning of each sync operation. Do initialization here. For example update
//metadata if it was not updated as the actual data was changed.
public override void BeginSession(SyncProviderPosition position, SyncSessionContext syncSessionContext)
{
BeginSession();

_currentSessionContext = syncSessionContext;
}

//EndSession is called after the sync operation is completed. Cleanup happens here.
public override void EndSession(SyncSessionContext syncSessionContext)
{
EndSession();
}

//Simply ask the metadata store to compute my change batch for me, providing the batch size and the knowledge of the other endpoint!
//The engine is asking for the list of changes that the destination provider does not know about.
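//SyncKnowledge: represents the knowledge a replica has about its own item store.
//All public static (Shared in Visual Basic) members of this type are thread safe. Instance members are not guaranteed to be thread safe.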
public override ChangeBatch GetChangeBatch(uint batchSize, SyncKnowledge destinationKnowledge, out object changeDataRetriever)
{
ChangeBatch batch = _metadata.GetChangeBatch(batchSize, destinationKnowledge);
changeDataRetriever = this; //this is where the transfer mechanism/protocol would go. For an in memory provider, this is sufficient
return batch;
}

//This is only called when the engine has detected that the destination is out of date due to Tombstone cleanup.
public override FullEnumerationChangeBatch GetFullEnumerationChangeBatch(uint batchSize, SyncId lowerEnumerationBound, SyncKnowledge knowledgeForDataRetrieval, out object changeDataRetriever)
{
FullEnumerationChangeBatch batch = _metadata.GetFullEnumerationChangeBatch(batchSize, lowerEnumerationBound, knowledgeForDataRetrieval);
changeDataRetriever = this; //this is where the transfer mechanism/protocol would go. For an in memory provider, this is sufficient
return batch;
}

//Use a batch size of 10 and return this replica's current knowledge
public override void GetSyncBatchParameters(out uint batchSize, out SyncKnowledge knowledge)
{
batchSize = 10;
knowledge = _metadata.GetKnowledge();
}

//Apply the changes
public override void ProcessChangeBatch(ConflictResolutionPolicy resolutionPolicy, ChangeBatch sourceChanges,
object changeDataRetriever, SyncCallbacks syncCallback, SyncSessionStatistics sessionStatistics)
{
_metadataStore.BeginTransaction();

//Get all my local change versions from the metadata store
IEnumerable<ItemChange> localChanges = _metadata.GetLocalVersions(sourceChanges);

//Create a changeapplier object to make change application easier (make the engine call me
//when it needs data and when I should save data)
NotifyingChangeApplier changeApplier = new NotifyingChangeApplier(_idFormats);

changeApplier.ApplyChanges(resolutionPolicy, sourceChanges, changeDataRetriever as IChangeDataRetriever, localChanges, _metadata.GetKnowledge(),
_metadata.GetForgottenKnowledge(), this, _currentSessionContext, syncCallback);

_metadataStore.CommitTransaction();
}

//If full enumeration is needed because this provider is out of date due to tombstone cleanup, then this method will be called by the engine.
public override void ProcessFullEnumerationChangeBatch(ConflictResolutionPolicy resolutionPolicy, FullEnumerationChangeBatch sourceChanges, object changeDataRetriever, SyncCallbacks syncCallback, SyncSessionStatistics sessionStatistics)
{
_metadataStore.BeginTransaction();

//Get all my local change versions from the metadata store
IEnumerable<ItemChange> localChanges = _metadata.GetFullEnumerationLocalVersions(sourceChanges);

//Create a changeapplier object to make change application easier (make the engine call me
//when it needs data and when I should save data)
NotifyingChangeApplier changeApplier = new NotifyingChangeApplier(_idFormats);
changeApplier.ApplyFullEnumerationChanges(resolutionPolicy, sourceChanges, changeDataRetriever as IChangeDataRetriever, localChanges, _metadata.GetKnowledge(),
_metadata.GetForgottenKnowledge(), this, _currentSessionContext, syncCallback);

_metadataStore.CommitTransaction();
}
#endregion KnowledgeSyncProvider Overrides

#region INotifyingChangeApplierTarget Implementation
public IChangeDataRetriever GetDataRetriever()
{
return this;
}

//Simply up my tick count every time I'm asked for it to ensure conflict resolution and detection work!
public ulong GetNextTickCount()
{
return _metadata.GetNextTickCount();
}

public void SaveChangeWithChangeUnits(ItemChange change, SaveChangeWithChangeUnitsContext context)
{
//Change units are not supported by this sample provider.
throw new NotImplementedException();
}

public void SaveConflict(ItemChange conflictingChange, object conflictingChangeData, SyncKnowledge conflictingChangeKnowledge)
{
//The resolution policy is set to DestinationWins so there is never a conflict to save.
throw new NotImplementedException();
}

//Save the item, taking the appropriate action for the 'change' and the data from the item (in 'context')
public void SaveItemChange(SaveChangeAction saveChangeAction, ItemChange change, SaveChangeContext context)
{
ulong timeStamp = 0;
ItemMetadata item = null;
ItemData data = null;
switch (saveChangeAction)
{
case SaveChangeAction.Create:
//Do duplicate detection here
item = _metadata.FindItemMetadataById(change.ItemId);
if (null != item)
{
throw new Exception("SaveItemChange must not have Create action for existing items.");
}
item = _metadata.CreateItemMetadata(change.ItemId, change.CreationVersion);
item.ChangeVersion = change.ChangeVersion;
data = new ItemData( (ItemData)context.ChangeData);
//We are using the same id for both the local and global item id.
_store.CreateItem( data, change.ItemId.GetGuidId());
SaveItemMetadata(item, _store.Get(change.ItemId.GetGuidId()).TimeStamp);
break;

case SaveChangeAction.UpdateVersionAndData:
case SaveChangeAction.UpdateVersionOnly:
item = _metadata.FindItemMetadataById(change.ItemId);
if (null == item)
{
throw new Exception("Item Not Found in Store!?");
}

item.ChangeVersion = change.ChangeVersion;
if (saveChangeAction == SaveChangeAction.UpdateVersionOnly)
{
SaveItemMetadata(item);
}
else //Also update the data and the timestamp.
{
data = new ItemData((ItemData)context.ChangeData);
timeStamp = _store.UpdateItem(item.GlobalId.GetGuidId(), data);
SaveItemMetadata(item, timeStamp);
}
break;

case SaveChangeAction.DeleteAndStoreTombstone:
item = _metadata.FindItemMetadataById(change.ItemId);
if (null == item)
{
item = _metadata.CreateItemMetadata(change.ItemId, change.CreationVersion);
}
//ChangeKind indicates the type of change made to an item.
// // Deleted: the change is a deletion.
// // UnknownItem: the item to be changed is unknown to the replica.
// // Update: the change is an update.
if (change.ChangeKind == ChangeKind.Deleted)
{
item.MarkAsDeleted(change.ChangeVersion);
}
else
{
// This should never happen in Sync Framework V1.0
throw new Exception("Invalid ChangeType");
}

item.ChangeVersion = change.ChangeVersion;
SaveItemMetadata(item, 0); // set timestamp to 0 for tombstones
_store.DeleteItem(item.GlobalId.GetGuidId());
break;

//Merge the changes! (Take the data from the local item + the remote item), and remember to update the tick count to propagate the resolution!
case SaveChangeAction.UpdateVersionAndMergeData:
item = _metadata.FindItemMetadataById(change.ItemId);

if (null == item)
{
throw new Exception("Item Not Found in Store!?");
}
if (item.IsDeleted != true)
{
//Note - you must update the change version to propagate the resolution!
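// //SyncVersion represents the version of an item or change unit.
// // ReplicaKey: gets the replica key associated with the version.
// // TickCount: gets the tick count associated with the version.
// // UnknownVersion: returns a SyncVersion object whose replica key and tick count are set to 0.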
item.ChangeVersion = new SyncVersion(0, _metadata.GetNextTickCount());

//Combine the conflicting data


ItemData mergedData = (_store.Get(item.GlobalId.GetGuidId())).Merge( (ItemData)context.ChangeData);
timeStamp = _store.UpdateItem(item.GlobalId.GetGuidId(), mergedData);
SaveItemMetadata(item, timeStamp);
}
break;

case SaveChangeAction.DeleteAndRemoveTombstone:
item = _metadata.FindItemMetadataById(change.ItemId);
if (item != null)
{
List<SyncId> ids = new List<SyncId>();
ids.Add(item.GlobalId);
_metadata.RemoveItemMetadata(ids);
}
_store.DeleteItem(change.ItemId.GetGuidId());
break;
}
}

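//Simply store the knowledge given to us
// //ForgottenKnowledge: represents knowledge that has been forgotten because tombstones were cleaned up.
//Forgotten knowledge tracks the maximum version of the tombstones that have been cleaned up. When an item is deleted from the item store, its metadata is kept but marked as deleted; the metadata of a deleted item is called a tombstone. Tombstones must be cleaned up periodically, or they will eventually take up a great deal of space in the item store. When a tombstone is removed from the metadata, the forgotten knowledge must be updated to contain the version of the removed tombstone. Note that forgotten knowledge is an overestimate of the items whose metadata has been removed, so it may also contain items that still have live entries in the metadata.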

public void StoreKnowledgeForScope(SyncKnowledge knowledge, ForgottenKnowledge forgottenKnowledge)
{
_metadata.SetKnowledge(knowledge);
_metadata.SetForgottenKnowledge(forgottenKnowledge);

_metadata.SaveReplicaMetadata();
}

public bool TryGetDestinationVersion(ItemChange sourceChange, out ItemChange destinationVersion)
{
ItemMetadata metadata = _metadata.FindItemMetadataById(sourceChange.ItemId);

if (metadata == null)
{
destinationVersion = null;
return false;
}
else
{
destinationVersion = new ItemChange(_idFormats, _replicaId, sourceChange.ItemId,
metadata.IsDeleted ? ChangeKind.Deleted : ChangeKind.Update,
metadata.CreationVersion, metadata.ChangeVersion);
return true;
}
}
#endregion INotifyingChangeApplierTarget Implementation

#region IChangeDataRetriever Implementation
public override SyncIdFormatGroup IdFormats
{
get { return _idFormats; }
}

//This is where we load data to provide to the other endpoint


public object LoadChangeData(LoadChangeContext loadChangeContext)
{
//return a copy of the data.
return new ItemData(_store.Get(loadChangeContext.ItemChange.ItemId.GetGuidId()));
}

#endregion IChangeDataRetriever Implementation

#region ReplicaId Initialization Methods
private static SyncId GetReplicaIdFromFile(string replicaIdFile)
{
SyncId replicaId;

if (System.IO.File.Exists(replicaIdFile))
{
replicaId = ReadReplicaIdFromFile(replicaIdFile);
}
else
{
// Create the replica id and save it.
replicaId = new SyncId(Guid.NewGuid());
WriteReplicaIdToFile(replicaIdFile, replicaId);
}

return replicaId;
}

private static void WriteReplicaIdToFile(string file, SyncId replicaId)
{
FileStream fs = new FileStream(file, FileMode.Create);

// Construct a BinaryFormatter and use it to serialize the data to the stream.
BinaryFormatter formatter = new BinaryFormatter();
try
{
formatter.Serialize(fs, replicaId);
}
catch (SerializationException e)
{
Console.WriteLine("Failed to serialize replica id to file. Reason: " + e.Message);
throw;
}
finally
{
fs.Close();
}
}

private static SyncId ReadReplicaIdFromFile(string file)
{
FileStream fs = new FileStream(file, FileMode.Open);
SyncId replicaId;

// Construct a BinaryFormatter and use it to deserialize the data from the stream.
BinaryFormatter formatter = new BinaryFormatter();
try
{
replicaId = (SyncId)formatter.Deserialize(fs);
}
catch (SerializationException e)
{
Console.WriteLine("Failed to deserialize replica id from file. Reason: " + e.Message);
throw;
}
finally
{
fs.Close();
}

return replicaId;
}
#endregion ReplicaId Initialization Methods

#region Methods necessary for Remote Change Application
public void BeginSession()
{
InitializeMetadataStore();
_store = MySimpleDataStore.ReadStoreFromFile(_folderPath, _name);

//Make sure the metadata store is updated to reflect the state of the data before each sync operation.
_metadataStore.BeginTransaction();
UpdateMetadataStoreWithLocalChanges();
_metadataStore.CommitTransaction();
}

public void EndSession()
{
_store.WriteStoreToFile(_folderPath, _name);
CloseMetadataStore();
}

//Change application!
public byte[] ProcessRemoteChangeBatch(
ConflictResolutionPolicy resolutionPolicy,
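//ChangeBatch represents the metadata for a set of changes.
//Sync providers use a change batch to carry item-change metadata from the source provider to the destination provider. The source provider enumerates changes and adds a specified number of them to the batch, which is then sent to the destination provider. The destination provider enumerates the changes in the batch and applies them to its item store.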
ChangeBatch sourceChanges,
CachedChangeDataRetriever changeDataRetriever,
byte[] changeApplierInfo)
{
_metadataStore.BeginTransaction();

//Get all my local change versions from the metadata store
IEnumerable<ItemChange> localChanges = _metadata.GetLocalVersions(sourceChanges);

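////NotifyingChangeApplier represents a change applier that examines a set of changes from the source provider, detects conflicts with items in the destination replica, and calls the registered change applier target as needed to save changes or conflicts.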
NotifyingChangeApplier changeApplier = new NotifyingChangeApplier(_idFormats);

// The following step is required because we are remote change application
changeApplier.LoadChangeApplierInfo(changeApplierInfo);

////Perform conflict detection, conflict handling, and change application for the change batch.
changeApplier.ApplyChanges(
resolutionPolicy,
sourceChanges,
changeDataRetriever,
localChanges,
_metadata.GetKnowledge(),
_metadata.GetForgottenKnowledge(),
this,
null, // Note that we do not pass a sync session context
new SyncCallbacks());

_metadataStore.CommitTransaction();

// Return the ChangeApplierInfo
return changeApplier.GetChangeApplierInfo();
}

//If full enumeration is needed because this provider is out of date due to tombstone cleanup, then this method will be called by the engine.
public byte[] ProcessRemoteFullEnumerationChangeBatch(
ConflictResolutionPolicy resolutionPolicy,
FullEnumerationChangeBatch sourceChanges,
CachedChangeDataRetriever changeDataRetriever,
byte[] changeApplierInfo)
{
_metadataStore.BeginTransaction();

//Get all my local change versions from the metadata store
IEnumerable<ItemChange> localChanges = _metadata.GetFullEnumerationLocalVersions(sourceChanges);

NotifyingChangeApplier changeApplier = new NotifyingChangeApplier(_idFormats);

// The following step is required because we are remote change application
changeApplier.LoadChangeApplierInfo(changeApplierInfo);

changeApplier.ApplyFullEnumerationChanges(
resolutionPolicy,
sourceChanges,
changeDataRetriever as IChangeDataRetriever,
localChanges,
_metadata.GetKnowledge(),
_metadata.GetForgottenKnowledge(),
this,
null, // Note that we do not pass a sync session context
new SyncCallbacks());

_metadataStore.CommitTransaction();

// Return the ChangeApplierInfo
return changeApplier.GetChangeApplierInfo();
}
#endregion

//Quick debug mechanism to show what the store "knows" and what it contains


public override string ToString()
{
StringBuilder buffer = new StringBuilder();
if (_metadataStore == null)
{
InitializeMetadataStore();
buffer.AppendFormat("{2}Provider {0} has the following for Knowledge: {1}{2}", _replicaId.GetGuidId(), _metadata.GetKnowledge().ToString(), Environment.NewLine);
CloseMetadataStore();
}
else
{
buffer.AppendFormat("{2}Provider {0} has the following for Knowledge: {1}{2}", _replicaId.GetGuidId(), _metadata.GetKnowledge().ToString(), Environment.NewLine);
}
return buffer.ToString();
}
}
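The change detection in `UpdateMetadataStoreWithLocalChanges` boils down to three cases: an item with no metadata is new, an item whose store timestamp is newer than the recorded one has changed, and metadata with no matching item becomes a tombstone. Here is a minimal sketch of that logic using plain dictionaries in place of `MySimpleDataStore` and `ReplicaMetadata`; `ItemRecord` and `ChangeDetectionSketch` are hypothetical names, and string ids replace the Guids used in the sample.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical per-item metadata, standing in for ItemMetadata plus the custom timestamp field.
public sealed class ItemRecord
{
    public ulong ChangeTick;  // tick count assigned at the last detected change
    public ulong TimeStamp;   // data-store timestamp recorded at that change (0 for tombstones)
    public bool IsDeleted;    // true once the item has been turned into a tombstone
}

public static class ChangeDetectionSketch
{
    // Updates 'metadata' to match 'storeTimestamps' (item id -> last-modified stamp)
    // and returns the ids whose metadata was touched in this pass.
    public static List<string> Detect(
        IDictionary<string, ulong> storeTimestamps,
        IDictionary<string, ItemRecord> metadata,
        ulong newTick)
    {
        List<string> touched = new List<string>();

        foreach (KeyValuePair<string, ulong> entry in storeTimestamps)
        {
            ItemRecord record;
            if (!metadata.TryGetValue(entry.Key, out record))
            {
                // Case 1: no metadata yet, so the item was created since the last pass.
                metadata[entry.Key] = new ItemRecord { ChangeTick = newTick, TimeStamp = entry.Value };
                touched.Add(entry.Key);
            }
            else if (entry.Value > record.TimeStamp)
            {
                // Case 2: the store holds a newer timestamp, so the item was modified.
                record.ChangeTick = newTick;
                record.TimeStamp = entry.Value;
                touched.Add(entry.Key);
            }
            // Otherwise the item is unchanged and nothing is recorded.
        }

        foreach (KeyValuePair<string, ItemRecord> entry in metadata)
        {
            // Case 3: live metadata without a matching item means the item was deleted.
            if (!entry.Value.IsDeleted && !storeTimestamps.ContainsKey(entry.Key))
            {
                entry.Value.IsDeleted = true;
                entry.Value.ChangeTick = newTick;
                entry.Value.TimeStamp = 0;
                touched.Add(entry.Key);
            }
        }

        return touched;
    }

    public static void Main()
    {
        Dictionary<string, ulong> store = new Dictionary<string, ulong>();
        store["a"] = 5;
        store["b"] = 7;
        Dictionary<string, ItemRecord> metadata = new Dictionary<string, ItemRecord>();

        Detect(store, metadata, 1);   // first pass: "a" and "b" are both new

        store["a"] = 9;               // modify "a"
        store.Remove("b");            // delete "b"
        store["c"] = 1;               // create "c"
        List<string> touched = Detect(store, metadata, 2);

        Console.WriteLine("Items touched in second pass: " + touched.Count);
        Console.WriteLine("b is now a tombstone: " + metadata["b"].IsDeleted);
    }
}
```

Like the sample, the sketch does not treat re-creation of a previously deleted id as a special case; a production provider would need to decide how to handle that.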

Finally, the following code binds the classes together and runs the synchronization (see MyTestProgram.cs):




Code
SyncOrchestrator agent = new SyncOrchestrator();
agent.Direction = SyncDirectionOrder.DownloadAndUpload;
// Local provider
agent.LocalProvider = new MySyncProvider(folderPathForDataAndMetadata, providerNameA);
// Remote provider, reached through the WCF proxy
agent.RemoteProvider = new RemoteProviderProxy(remotefolderPathForDataAndMetadata, name,
    endpointConfigurationName);
stats = agent.Synchronize();

OK, that's all for today. For more, see this link :)

Original link: http://www.cnblogs.com/daizhj/archive/2008/11/25/1340735.html
Author: daizhj (代震军)
Tags: Microsoft Sync Framework, metadata, metadata synchronization, WCF
URL: http://daizhj.cnblogs.com/
To download the demo, click here :)

To finish, here are some links about MSF:

MSDN: http://forums.microsoft.com/sync/showforum.aspx?forumid=1225&siteid=75
http://blogs.msdn.com/sync
http://code.msdn.microsoft.com/sync

Some videos from PDC2008:
Sync Framework: Enterprise Data in the Cloud and on Devices
Microsoft Sync Framework Advances
SQL Server: Database to Data Platform - Road from Server to Devices to the Cloud

SyncToy (a tool built by Microsoft on top of MSF): download link :)
Sync guru: http://www.syncguru.com/default.aspx
Sync Framework core participant types: http://msdn.microsoft.com/zh-cn/library/bb902817.aspx