您的位置:首页 > 运维架构 > Apache

Apache CMS学习笔记2 - Session

2009-12-09 09:20 302 查看
 

by viki 2009/12/08
 
自己根据需要实现生产者和消费者类
AdvisoryProducer advisoryProducer( session.get() );
AdvisoryConsumer advisoryConsumer( session.get() );
 
内部:
scoped_ptr<cms::MessageProducer> producer;
shared_ptr<cms::Topic> destination( session->createTopic(
        "HEART-BEAT-CHANNEL" ) );
producer.reset( session->createProducer( destination.get() ) );
 
scoped_ptr<cms::MessageConsumer> consumer;
shared_ptr<cms::Topic> destination( session->createTopic(
        "HEART-BEAT-CHANNEL" ) );
consumer.reset( session->createConsumer( destination.get() ) );
 
生产者内部的run利用Session来创建Message等。
shared_ptr<cms::TextMessage> message( session->createTextMessage( "XXX" ) );
producer->send( message.get() );
 
消费者内部的run或者onMessage接受Message。
 



 
 
会话确认模式
每次收到消息之后,需要确认,才算消费成功。
enum AcknowledgeMode
 {
            /**
             * With this acknowledgment mode, the session automatically
             * acknowledges a client's receipt of a message either when
             * the session has successfully returned from a call to receive
             * or when the message listener the session has called to
             * process the message successfully returns.
             */
//自动确认模式。当会话从接收函数成功返回,或者当会话调用的消息监听者成功地处理了消息并返回的时候,会话自动确认客户端接收到的消息。
            AUTO_ACKNOWLEDGE,
 
            /**
             * With this acknowledgment mode, the session automatically
             * acknowledges a client's receipt of a message either when
             * the session has successfully returned from a call to receive
             * or when the message listener the session has called to
             * process the message successfully returns.
4000
  Acknowledgments
             * may be delayed in this mode to increase performance at
             * the cost of the message being redelivered this client fails.
             */
//自动确认模式。当会话从接收函数成功返回,或者当会话调用的消息监听者成功地处理了消息并返回的时候,会话自动确认客户端接收到的消息。确认的时候可能有延迟,这是提高性能的需要(客户端失败时消息被重新投递)。
            DUPS_OK_ACKNOWLEDGE,
 
            /**
             * With this acknowledgment mode, the client acknowledges a
             * consumed message by calling the message's acknowledge method.
             */

 
 

//客户端(手动)调用message的确认方法(acknowledge)确认一个被消费的消息。以下内容来自JMS,需确认:客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有10个消息都被确认。
 

            CLIENT_ACKNOWLEDGE,
 
            /**
             * Messages will be consumed when the transaction commits.
             */
//当事务提交的时候消息会被消费
            SESSION_TRANSACTED,
 
            /**
             * Message will be acknowledged individually.  Normally the acks sent
             * acknowledge the given message and all messages received before it, this
             * mode only acknowledges one message.
             */
//消息会被独立确认。一般地,acks发送指定消息和所有此前接收到的消息的确认信息。该模式只确认一个message
            INDIVIDUAL_ACKNOWLEDGE
 
        };
 
 
创建Destination
对应四种Destination模式
 
virtual Queue* createQueue( const std::string& queueName ) throw ( CMSException ) = 0;
 
virtual Topic* createTopic( const std::string& topicName ) throw ( CMSException ) = 0;
 
virtual TemporaryQueue* createTemporaryQueue() throw ( CMSException ) = 0;
 
virtual TemporaryTopic* createTemporaryTopic() throw ( CMSException ) = 0;
 
 
以Destination指针去创建生产者消费者
生产者
virtual MessageProducer* createProducer( const Destination* destination ) throw ( CMSException ) = 0;
 
消费者
virtual MessageConsumer* createConsumer( const Destination* destination ) throw ( CMSException ) = 0;
 
提供选择器
virtual MessageConsumer* createConsumer( const Destination* destination, const std::string& selector ) throw ( CMSException ) = 0;
 
noLocal如果为true, destination为topic, 则阻止它自己连接发布的消息投递。destination为queue时上述行为无效
virtual MessageConsumer* createConsumer( const Destination* destination, const std::string& selector, bool noLocal ) throw ( CMSException ) = 0;
 
用一个消息选择器创建一个特定话题的持久订阅者。创建持久消费者的会话必须用和订阅最后使用的ID相同的客户端ID,以接收客户端离线前投递的所有消息。
virtual MessageConsumer* createDurableConsumer( const Topic* destination, const std::string& name, const std::string& selector, bool noLocal = false ) throw ( CMSException ) = 0;
退订
virtual void unsubscribe( const std::string& name ) throw ( CMSException ) = 0;
 
观察者
Creates a new QueueBrowser to peek at Messages on the given Queue.
 
virtual QueueBrowser* createBrowser( const cms::Queue* queue ) throw( CMSException ) = 0;
 
virtual QueueBrowser* createBrowser( const cms::Queue* queue, const std::string& selector ) throw( CMSException ) = 0;
 
 
 
创建Message
virtual Message* createMessage() throw ( CMSException ) = 0;
 
virtual BytesMessage* createBytesMessage() throw ( CMSException) = 0;
 
virtual BytesMessage* createBytesMessage( const unsigned char* bytes, std::size_t bytesSize ) throw ( CMSException) = 0;
 
virtual StreamMessage* createStreamMessage() throw ( CMSException ) = 0;
 
virtual TextMessage* createTextMessage() throw ( CMSException ) = 0;
 
virtual TextMessage* createTextMessage( const std::string& text ) throw ( CMSException ) = 0;
 
virtual MapMessage* createMapMessage() throw ( CMSException ) = 0;
 
 
其他
会话对象是单线程的上下文对象,用来生产和消费Message
用途:
     *  - It is a factory for its message producers and consumers.
     *  - It supplies provider-optimized message factories.
     *  - It is a factory for TemporaryTopics and TemporaryQueues.
     *  - It provides a way to create Queue or Topic objects for those clients
     *    that need to dynamically manipulate provider-specific destination
     *    names.
     *  - It supports a single series of transactions that combine work spanning
     *    its producers and consumers into atomic units.
     *  - It defines a serial order for the messages it consumes and the messages
     *    it produces.
     *  - It retains messages it consumes until they have been acknowledged.
     *  - It serializes execution of message listeners registered with its message
     *    consumers.
 
一个会话可以创建并服务于多个生产者和消费者
一个典型的用法是用一个线程阻塞在同步的MessageConsumer上,直到消息到达。
线程可以使用一个或者多个会话的MessageProducer
如果客户端要求有一个线程生产Message,同时其他线程消费Message,那么客户端应该为它的生产开辟一个独立的线程
 
close方法
     *  - There is no need to close the producers and consumers of a closed session.
     *  - The close call will block until a receive call or message listener in progress
     *    has completed. A blocked message consumer receive call returns null when this
     *    session is closed.
     *  - Closing a transacted session must roll back the transaction in progress.
     *  - The close method is the only Session method that can be called concurrently.
     *  - Invoking any other Session method on a closed session must throw an
     *    IllegalStateException. Closing a closed session must not throw  any exceptions.
 
 
事务化会话
     * When a Session is created it can be set to operate in a Transaction based mode.  Each
     * Session then operates in a single transaction for all Producers and Consumers of that
     * Session.  Messages sent and received within a Transaction are grouped into an atomic
     * unit that is committed or rolled back together.
 
     * For a MessageProducer this implies that all messages sent by the producer are not sent
     * to the Provider unit the commit call is made.  Rolling back the Transaction results in
     * all produced Messages being dropped.
 
     * For a MessageConsumer this implies that all received messages are not Acknowledged until
     * the Commit call is made.  Rolling back the Transaction results in all Consumed Message
     * being redelivered to the client, the Provider may allow configuration that limits the
     * Maximum number of redeliveries for a Message.
 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息