您的位置:首页 > 编程语言 > C语言/C++

使用STL流(stream)来简化C++“线程安全”日志记录

2008-01-24 10:44 351 查看
使用STL(stream)来简化C++“线程安全日志记录

作者: winter

流是一种强大的数据处理抽象机制,它允许你调用泛型的读/写函数,不必关心数据从什么地方来、到什么地方去。使用流,同样的代码可从控制台、文件、套接字等地方读取数据。STL通常不是线程安全的,本文提出了如何在线程安全的方式下使用流的方案。

C++将流的强大能力与运算符重载合并到一起,为我们提供了>>和<<运算符,以便从流中读取,以及向流中写入,如清单A所示。

向流中写入通常不是“线程安全”(thread-safe)的。事实上,即便是fread和fwrite这样的基元函数,也不要求是线程安全的。不过,使用正确实现的标准模板库(STL)流,不仅能继续发挥流简单易用的特点,还能保证代码的线程安全。另外,你以前使用了<<运算符的代码仍可继续使用,所有线程安全问题都会在幕后解决。

STL流为什么默认不是线程安全的 

C++标准没有规定向流中写入是否线程安全,但它通常都不是。这主要是考虑到效率问题。如果向流中的写入是线程安全的,就要求所有public(以及一些protected)函数也是线程安全的。根据STL文档,basic_ostream<>类(所有输出类的基类)有大量public函数。例如以下代码:

for (int idx = 0; idx < 1000000; ++idx) std::cout << idx << ' ';

假如向std::cout的写入是线程安全的,就需要至少200万个锁,这无疑会造成巨大的性能瓶颈。

另外,假如流使用了流缓冲(stream buffers),则缓冲也必须是线程安全的。对一个STL流类进行扩展,还会造成更大的线程安全性能问题。最后,即使STL流是线程安全的,如清单B所示的代码也不能担保生成你所希望的结果。

从表面看,清单B应该在控制台上打印以下结果:
message from thread 1
message from thread 2

但它实际输出的是:
message from message from thread 2
thread 1

这是因为在第一个线程成功打印message和from之后,第二个线程将接管控制权,打印它的完整的消息。之后,第一个线程继续打印它尚未完成打印的内容。这个简单的例子演示了假如代码不是线程安全的,可能有什么后果。

记录要比写入容易

流允许定位,这类似于文件访问。你可直接访问一个流的中部(如果流允许的话),并从那里开始写入。然而,记录信息时,采用的总是“追加”(append)方式。这时不需要任何定位——肯定会从尾部开始。为了简化问题,我们的日志记录类根本不允许定位。 

刷新和std::endl
我们知道,要用<<运算符向一个流中写入,比如:
std::cout << val1 << val2 << …等等 

在内部,你写入的一切其实都保存在一个流缓冲中。只有当程序员调用flush()成员函数时,或者因缓冲满而自动刷新时,才会实际地写入目的地。

但是,完全可以采用一种简单的方式来要求一个流进行刷新,这就是向它写入std::endl。这等价于向流中写入/n并刷新它。

为简化问题和便于理解,我们约定:当一个流刷新时,当前消息会终止,并开始一条新的消息,如以下代码所示:

// 写100条消息
for ( int idx = 0; idx < 100; ++idx)
    get_log() << "message " << idx << " - just for testing" << std::endl;

为了保证“写入消息”过个操作是线程安全的,需要按以下5个步骤操作:

1.      创建一个底层的流U,你希望以线程安全的方式来访问它。 

2.      创建一个流S,它含有对U的一个引用。 

3.      确定每个线程都有它自己的流S。 

4.      向流S写入时,将数据暂存在一个缓冲的内部,直至刷新(flush)。 

5.      流S被刷新之后(当前消息终止,开始一条新的消息),应该以一种线程安全的方式,将它的缓冲写入底层的流U。然后,流S应该刷新它的缓冲。 

以下代码演示了上述步骤:
thread_safe_log log = safe_cout(); // 返回std::cout的线程安全版本
for (int idx = 0; idx < 1000000; ++idx) log << idx << ' ';
log << std::endl;

对刷新过程进行监视并不像表面上那么容易。你必须覆盖流缓冲的sync()函数。清单C展示了basic_message_handler_log类,它负责处理所有这些细节。要记住,你可创建自己的日志类,并覆盖on_new_message函数;该函数会在一条新消息向它写入时进行调用。

临时变量的技巧

这里稍微总结一下:每个线程都应该有它自己的流S,S容纳了对U的一个引用(要求对U进行线程安全的访问)。为了使每个线程都有它自己的流S,最简单的办法是使用临时变量。临时变量天生便是线程安全的,因为只有创建变量的线程才有权访问它。这里采用的一个技巧是,我们使用一个名为get_log的函数,它内部含有一个静态变量(底层的流U),返回的则是一个临时变量(流S),如清单D所示。

除了要比其他大多数备选方案都简单之外,清单D展示的技术也是最有效的。只有在流刷新时,才会发生锁定。

方案1

thread_safe_log类派生自如清单C所示的basic_message_handler_log,并覆盖了on_new_message,它可采取“线程安全”的方式向底层日志写入。现在的问题是,应该在哪里容纳critical_section对象呢?要将thread_safe_log分离出来,需要创建另一个名为internal_thread_safe_log的类,它专门负责线程处理问题。

下面是一个get_log函数的基本形式:
thread_safe_log get_log()
{
    static underlying_stream_type U( args);
    static internal_thread_safe_log log( U);
    return log;
}

请记住internal_thread_safe_log和thread_safe_log类;后文还会经常提到它们。 

清单E演示了上述技术。注意,清单E的代码要使用由清单F提供的CriticalSection.h文件,否则无法正常工作。

你应该注意以下要点:

无论internal_thread_safe_log还是thread_safe_log,都是它们的basic_*版本的typedef。这遵循了目前通行的编程经验法则:因为std::ostream是std::basic_ostream<char>的一个typedef,而std::streambuf是std::basic_streambuf<char>的一个typedef,以此类推。 

在对get_log()的每个访问之后,都跟有一个.ts()。这是必需的,因为某些流操作要求流是一个左值(lvalue)。简单地说,左值是可以放在operator=左侧的一个操作数。临时变量是右值,所以必须转变成左值。 

临时变量流S在构造时,通过copy_state_to从流U获得它的状态;并在析构时,将那个状态拷贝回U。可将流的状态想象成一些特殊信息,它们规定了特定数据(比如填充字符、locale和其他格式化信息等等)应该如何写入。 

如果忽视这些问题,可能造成严重后果。假定U已被设为German locale(所以5.235要写成5,235)。如果S在打印数字时使用默认locale,对输出的解释就是完全错误的。

清单E也包括一个测试(要运行它,你需要清单C和清单F)。这个测试使用out.txt作为底层流U。它创建200个并发线程,每个线程都写500条消息,消息的形式是:

"writing double 5.23"(第一种类型的消息) 

"message <idx> from thread <thread>"(第二种类型的消息) 

索引10的线程写第11条消息时,U的locale更改为German。后续所有消息都会变成"writing double 5,23",而不是"writing double 5.23"。

方案2

对于清单E的方案来说,它的问题在于,假如100个线程同时试图写入,在任何给定的时间,只有一个会成功,其他线程必须等着获得锁。取决于底层的流,执行m_underlyingLog << str;可能会花很长的时间,这会使其他所有线程都处于停顿状态。这很快就会成为一个严重的瓶颈。 在internal_thread_safe_log::write_message函数中,消息追加到一个队列上。一个专用线程不停地从这个队列中读取,并将消息写到底层的流中。internal_thread_safe_log::write_message函数现在只负责在队列中添加一个指针,这几乎不花任何时间,因此避免了瓶颈。清单G展示了更新过的方案。

下面列出了两个版本的区别: 

internal_thread_safe_log有一个writer对象。 

每个writer对象(thread_safe_log_writer)都有自己的专用线程,后者负责将消息写入底层的流U。 

每个writer对象都需要使用一个线程管理器(threading manager)来创建自己的线程。 

线程处理管理器的角色是将独立于平台的线程处理问题(比如新线程的创建)抽离出来。我们要到后面才会讲解线程管理器,目前,你只需知道现在使用的线程管理器是win32_thread_manager。最终的方案还将支持其他线程管理器。

方案3

取决于应用程序要使用的日志和/或线程数目,方案2也许并不恰当。多个日志可能共享同一个线程,由这个线程分别向每个日志中写入。方案3实现了这一功能,它具有以下关键特性:

一个writer线程 

多个日志,每个日志都向writer线程注册。 

在writer线程中,每个日志都有一个指定的优先级(日志的构造函数新增了一个日志优先级参数)。根据这个优先级,writer线程可能优先在某些日志中写入。 

例如,假定有3个日志:log1的优先级是6,log2的优先级是3,log3的优先级是1(它们的优先级累加起来是10 = 6 + 3 + 1)。对于writer线程,它每进行10次写入,有6次会写入log1,有3次会写入log2,有1次会写入log3。假如试图向一个日志写入,但日志的队列中没有消息,这一次写入就会被忽略。

清单H对测试进行了修改,包括了以下特性:

新测试包括10个日志:out0.txt,out1.txt,…,out9.txt。 

对于索引为<idx>的日志来说,它的优先级公式是10 * (idx + 1)^2。例如,日志out3.txt具有优先级10 * 4 * 4 = 160。 

有20个线程向out4.txt写入。 

运行清单H的代码时,注意由于索引编号较大的日志优先级较高,所以填充速度会比其他日志快一些。

方案4

方案4允许你任意选择方案2和方案3,具体由应用程序的需求来决定。为了从一种方案切换到另一种方案,需要更改get_log()函数中的一、两行代码。对于这个支持多种格式的方案来说,它必须具有以下特性: 

要有两个internal_thread_safe_log类:internal_thread_safe_log_ownthread,它与单独一个专用的日志相匹配;以及internal_thread_safe_log_sharethread,它由多个日志共享。 

internal_thread_safe_log_sharethread需要一个对应的thread_safe_log_writer_sharethread,它是日志需要共享的线程。 

清单I证明了internal_thread_safe_log_ownthread和internal_thread_safe_log_sharethread的相互切换有多么容易。清单J则给出了方案4的完整实现。

清单J包括以下增补特性:

internal_thread_safe_log类现在接受一个额外的模板参数,即thread_manager,它将独立于平台的线程处理问题抽离出来,后文还会具体解释。 

清单J不像清单F那样还需要CriticalSection.h;因为CriticalSection.h封装在thread_manager中。 

每次刷新流,都会开始一条新消息,这意味着最后一条消息永远不会刷新。所以,必须对最后一条消息进行特殊处理。我们修改了message_handler_log.h(如清单K所示),允许处理最后一条消息。 

忘记刷新也许是一个错误,如清单L所示。所以,我们强制在每个临时变量析构之前都进行刷新,如清单M所示。做到这一点很容易,请仔细研究thread_safe_log::on_last_message。

在thread_safe_log的析构函数中,我们防止处理无效的引用。由于要处理临时变量,所以必须解决一系列特殊问题:怎样在它析构之后使用它([temp-destructed])。

测试也作了一些修改:

有10个日志(日志0-9)来自上一次测试(它们共享一个线程)。 

还有10个日志(日志10-19)分别有它自己的线程。 

有10个线程向一个指定的日志写入(线程4,24,44,…,184向第4个日志写入)。 

最后讨论一下线程管理器。线程管理器是一个特殊的类,它规定如何解决一些必要的线程处理问题。它必须提供:

thread_obj_base该类应在创建一个线程时作为参数传递;它的重载的operator()要针对其他线程而执行。 

sleep( nMillisecs)当前线程将休眠nMillisecs毫秒。 

create_thread( thread_obj_base & obj)它创建一个线程,并为其执行obj.operator()。 

critical_sectionauto_lock_unlock classes两者的行为类似于CCriticalSection和CAutoLockUnlock(参见清单F)。 

我提供了两个线程管理器:

win32_thread_manager这是Win32应用程序的线程管理器。 

boost_thread_manager这是在你使用boost线程([boost])时的线程管理器。 

在你的代码中,可以加#define USE_WIN32_THREAD_MANAGER语句,指定第一个管理器是默认管理器;或者添#define USE_BOOST_THREAD_MANAGER语句,将第二个管理器作为默认管理器。除此之外,还可设计自己的线程管理器,然后添加#define DEFAULT_THREAD_MANAGER your_threading_manager_class语句。

运行清单J(记住,它需要由清单K提供的最新的message_handler_log.h)。注意清单J包含数量相当多的类。这正是要把它们分解成多个文件的原因。

把握重点

使用thread_safe_log和internal_thread_safe_log_*类,你可采用自己最熟悉的方式来记录日志,这样做既高效,又能保证线程安全。从此以后,使用了STL流的代码、辅助函数和类都能以线程安全的方式使用。对现有的应用程序进行重构也变得更容易。线程安全虽然是一个难以掌握的主题,但在本文的帮助下,再加上你的少许努力,就能透彻理解它,并真正体验到它的巨大好处。

 

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息