您的位置:首页 > 其它

大量数据转录的多线程和同步处理实现

2008-11-15 04:50 99 查看
项目中需要对两个不同格式的存储设备进行数据转录,因为数据量非常大,所以时间非常缓慢;解决方案是使用ReaderWriterSlim类建立一个共享的同步数据,可以支持一个线程读取外部设备,向同步数据写入;多个线程从同步数据中读取,转换格式,然后写入到本地设备。

本例中采用Queue<T>作为存放数据的集合,写入线程向它的尾部写入对象,读取线程从它的头部获取对象。

需要注意的是,由于Queue会抛弃已处理的对象,所以在同步数据队列中无法验证数据对象的唯一性,被写入的数据库需要去掉唯一约束,或在写入时向数据库请求验证。

首先定义一个读写接口:

namespace Common

{

public interface IReaderWriter<T>

{

T Read(int argument);

void Write(int arugment, T instance);

void Delete(int argument);

void Clear();

}

}

然后实现一个队列的读写器:

namespace Common

{

public class QueueReaderWriter<T> : IReaderWriter<T>

{

private Queue<T> queues;

public QueueReaderWriter()

{

queues = new Queue<T>();

}

public QueueReaderWriter(int capacity)

{

queues = new Queue<T>(capacity);

}

#region IReadWrite<T> 成员

public T Read(int argument)

{

return queues.FirstOrDefault();

}

public void Write(int arugment, T instance)

{

queues.Enqueue(instance);

}

public void Delete(int argument)

{

queues.Dequeue();

}

public void Clear()

{

queues.Clear();

queues.TrimExcess();

}

#endregion

}

}

使用ReaderWriterLockSlim实现同步数据类:

namespace Common

{

public class SynchronizedWriteData<T> : IDisposable

{

private ReaderWriterLockSlim _dataLock = new ReaderWriterLockSlim();

private IReaderWriter<T> _innerData;

private SynchronizedWriteData()

{ }

public SynchronizedWriteData(IReaderWriter<T> innerData)

{

_innerData = innerData;

}

public T Read()

{

_dataLock.EnterReadLock();

try

{

return _innerData.Read(0);

}

finally

{

_dataLock.ExitReadLock();

}

}

public T Read(int argument)

{

_dataLock.EnterReadLock();

try

{

return _innerData.Read(argument);

}

finally

{

_dataLock.ExitReadLock();

}

}

public void Add(T instance)

{

_dataLock.EnterWriteLock();

try

{

_innerData.Write(0, instance);

}

finally

{

_dataLock.ExitWriteLock();

}

}

public void Add(int argument, T instance)

{

_dataLock.EnterWriteLock();

try

{

_innerData.Write(argument, instance);

}

finally

{

_dataLock.ExitWriteLock();

}

}

public bool AddWithTimeout(T instance, int timeout)

{

if (_dataLock.TryEnterWriteLock(timeout))

{

try

{

_innerData.Write(0, instance);

}

finally

{

_dataLock.ExitWriteLock();

}

return true;

}

else

{

return false;

}

}

public bool AddWithTimeout(int argument, T instance, int timeout)

{

if (_dataLock.TryEnterWriteLock(timeout))

{

try

{

_innerData.Write(argument, instance);

}

finally

{

_dataLock.ExitWriteLock();

}

return true;

}

else

{

return false;

}

}

public void Delete()

{

_dataLock.EnterWriteLock();

try

{

_innerData.Delete(0);

}

finally

{

_dataLock.ExitWriteLock();

}

}

public void Delete(int argument)

{

_dataLock.EnterWriteLock();

try

{

_innerData.Delete(argument);

}

finally

{

_dataLock.ExitWriteLock();

}

}

#region IDisposable 成员

public void Dispose()

{

try

{

_dataLock.EnterWriteLock();

{

try

{

_innerData.Clear();

}

finally

{

_dataLock.ExitWriteLock();

}

}

}

finally

{

_dataLock.Dispose();

}

}

#endregion

}

}

namespace ExternalDataHandle

{

/// <summary>

/// 从外部数据源获取到内部数据源的适配器抽象类

/// </summary>

/// <typeparam name="T">T 数据对象类型</typeparam>

public abstract class ExternalDataAdapter<T> : IDisposable

{

/// <summary>

/// 外部数据源连接字符串

/// </summary>

protected abstract string ConnectString { get; }

/// <summary>

/// 提供初始化数据适配器的方法

/// </summary>

protected abstract void Initialize();

/// <summary>

/// 提供数据传递的方法

/// </summary>

public abstract void Transmit();

/// <summary>

/// 提供从外部数据设备读取数据的方法

/// </summary>

protected abstract void ReadFromExternalDevice();

/// <summary>

/// 提供保存数据到内部设备的方法

/// </summary>

protected abstract void SaveToInternalDevice();

#region IDisposable 成员

public abstract void Dispose();

#endregion

}

}

多线程数据转录类,本例只使用了一个读取线程:

namespace ExternalDataHandle

{

/// <summary>

/// 提供多线程方式从外部数据源获取到内部数据源的适配器类

/// </summary>

/// <typeparam name="T"></typeparam>

public abstract class MultiThreadAdapter<T> : ExternalDataAdapter<T>

{

protected SynchronizedWriteData<T> _data;

protected Thread _readThread;

protected abstract override string ConnectString { get; }

protected abstract override void Initialize();

public sealed override void Transmit()

{

_readThread = new Thread(new ThreadStart(ReadFromExternalDevice));

_readThread.Start();

Thread.Sleep(10000);

while (_readThread.IsAlive)

{

SaveToInternalDevice();

}

_readThread.Join();

}

protected abstract override void ReadFromExternalDevice();

protected abstract override void SaveToInternalDevice();

public override void Dispose()

{

if (_data != null)

{

_data.Dispose();

}

}

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: