QX项目实战-11.基础架构试验二:反序列化对象、重写数据库
2012-11-25 16:38
711 查看
上篇文章[1]中,实现了实验的前三步。下面开始实验的后两步:重建对象、数据库操作和消息平台的实现与完善。整个系统的架构如图所示,即传递的对象是封装了数据库操作对象和数据库SQL语言的report对象,在客户端解析完成后,重做数据库:
首先修正上篇文章中的发送和接收消息程序,之前的代码估计因为配置问题导致程序发送消息不是太流畅,这里改用如下版本程序,进行发送和接受report对象。QueueSend代码如下:
QueueReceive代码如下:
以上代码可以较为可靠实现消息的传递。注意这里传递的消息也是封装了操作对象user和sql类的report对象。在接收端重新解析之后,生成新的数据库操作对象,使用rebuild方法对report下的sql进行重构。Rebuild代码如下:
以上三段代码实现了数据库操作的重做业务逻辑,保证了消息的传递和数据库的同步。下一步准备采用web架构写一个消息平台,同时准备对异构数据库的同步进行实验。
参考
1. QX项目实战-10.基础架构实验一
首先修正上篇文章中的发送和接收消息程序,之前的代码估计因为配置问题导致程序发送消息不是太流畅,这里改用如下版本程序,进行发送和接受report对象。QueueSend代码如下:
package server.activemq; import java.io.Serializable; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import client.curd.rebuild; import client.curd.report; import client.curd.sql; import client.db.user; public class QueueSend { private static finalint SEND_NUMBER = 5; public static voidmain(String[] args) { //ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactoryconnectionFactory; //Connection :JMS 客户端到JMS Provider 的连接 Connectionconnection = null; // Session:一个发送或接收消息的线程 Sessionsession; //Destination :消息的目的地;消息发送给谁. Destinationdestination; //MessageProducer:消息发送者 MessageProducerproducer; //TextMessage message; // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar connectionFactory= new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://192.168.195.54:61616"); try { // 构造从工厂得到连接对象 connection= connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 session= connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //queue1需要在admin界面创建 destination= session.createQueue("queue1"); // 得到消息生成者 producer= session.createProducer(destination); // 设置不持久化,此处学习,实际根据项目决定 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 构造消息,此处写死,项目就是参数,或者方法获取 //sendMessage(session, producer); useru = new user(); u.setName("gqk"); u.setPassword("123"); sqls = new sql(); s.setQuery("updateusers set password ='1' where id =1;"); reportr = new report(); r.setO(u); r.setSqlQuery(s); sendObject(session,producer, r); session.commit(); } catch(Exception e) { e.printStackTrace(); } finally { try{ if(null != connection) session=null; connection.close(); producer= null; System.exit(0); }catch (Throwable ignore) { } } } public static voidsendMessage(Session session, MessageProducer producer) throwsException { for (int i =1; i <= SEND_NUMBER; i++) { TextMessagemessage = session.createTextMessage("ActiveMq 发送的消息" +i); // 发送消息到目的地方 System.out.println("发送消息:" + i + "成功"); producer.send(message); } } public static voidsendObject(Session session, MessageProducer producer, reportr) throws JMSException { for (int i =1; i <= SEND_NUMBER; i++) { ObjectMessageom = session.createObjectMessage(r); producer.send(om); System.out.println("发送对象:" + om + "成功"); } } }
QueueReceive代码如下:
package client.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import client.curd.rebuild; import client.curd.report; import client.db.user; public class QueueReceive { public static voidmain(String[] args) { //ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactoryconnectionFactory; //Connection :JMS 客户端到JMS Provider 的连接 Connectionconnection = null; // Session:一个发送或接收消息的线程 Sessionsession = null; //Destination :消息的目的地;消息发送给谁. Destinationdestination; // 消费者,消息接收者 MessageConsumerconsumer = null; connectionFactory= new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://192.168.195.54:61616"); try { // 构造从工厂得到连接对象 connection= connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 session= connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取session注意参数是一个服务器的queue,须在在ActiveMq的console配置 destination= session.createQueue("queue1"); consumer= session.createConsumer(destination); rebuildrb = new rebuild(); while(true) { ObjectMessageom = (ObjectMessage) consumer.receive(10); if(om != null) { // useru = (user)om.getObject(); reportr = (report)om.getObject(); // System.out.println("收到对象" + r.toString()); rb.doReport(r); }else { ///////////////我家的 session.close(); /////////////// break; } } /* consumer.setMessageListener( newMessageListener() { publicvoid onMessage(Message message) { ObjectMessageom = (ObjectMessage) message; System.out.println(om); // session.commit(); } } ); Thread.sleep(300);*/ } catch(Exception e) { e.printStackTrace(); } finally { try{ if(null != connection) session.close(); connection.close(); consumer.close(); }catch (Throwable ignore) { } } } }
以上代码可以较为可靠实现消息的传递。注意这里传递的消息也是封装了操作对象user和sql类的report对象。在接收端重新解析之后,生成新的数据库操作对象,使用rebuild方法对report下的sql进行重构。Rebuild代码如下:
package client.curd; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import server.db.DB; public class rebuild { public voiddoReport(report r) { try { Connectionconn = DB.getConn(); Statementstmt = conn.createStatement(); Stringsql = r.getSqlQuery().getQuery(); System.out.println(sql); if(sql.startsWith("select")){ ResultSetrs = stmt.executeQuery(sql); while(rs.next()){ System.out.println(rs.getInt(1)+rs.getString(2)+rs.getString(3)); } } elseif(sql.startsWith("update")){ intresult = stmt.executeUpdate(sql); System.out.println(result); } } catch(ClassNotFoundException e) { e.printStackTrace(); } catch(SQLException e) { e.printStackTrace(); } finally{ } } }
以上三段代码实现了数据库操作的重做业务逻辑,保证了消息的传递和数据库的同步。下一步准备采用web架构写一个消息平台,同时准备对异构数据库的同步进行实验。
参考
1. QX项目实战-10.基础架构实验一
相关文章推荐
- QX项目实战-10.基础架构实验一:传递消息、序列化对象和数据库封装
- QX项目实战-13.基础架构试验四:JavaWeb消息平台
- QX项目实战-12.基础架构试验三:异构数据库同步
- 手把手0基础项目实战 · 微服务架构下的数据库分库分表实践
- 项目实战11—企业级nosql数据库应用与实战-redis的主从和集群
- 【slighttpd】基于lighttpd架构的Server项目实战(11)—C++的Name Mangling
- Flask零基础到项目实战(七)请求方法、g对象和钩子函数
- 项目实战7—Mysql实现企业级数据库主从复制架构实战
- Web基础之Cookie对象和Session对象项目实战和对比
- 安全保护项目: 一种分阶段的数据库基础架构保护方法 (第一阶段)
- 安全保护项目: 一种分阶段的数据库基础架构保护方法 (第二阶段)
- Flask零基础到项目实战(四)SQLAlchemy数据库(一)
- QX项目实战-3.读取数据、数据写入数据库、读出数据生成新文件
- 架构之路--实战项目记录(二) 忘记数据库 开始抽象
- 安全保护项目: 一种分阶段的数据库基础架构保护方法 (第三阶段)
- QX项目实战-6.数据库的建立
- Flask零基础到项目实战(四)SQLAlchemy数据库(二)
- 安全保护项目: 一种分阶段的数据库基础架构保护方法 (第四阶段)
- c语言基础学习11_项目实战:IDE(集成开发环境)
- 项目实战-对象序列化