您的位置:首页 > 其它

Mycat的JDBC后端框架

2016-07-02 00:00 309 查看
摘要: Mycat对JDBC支持部分的代码比较简单,主要实现了下面三个类:
1. JDBCDatasource JDBC物理数据源
2. JDBCConnection JDBC连接类
3. JDBCHeartbeat JDBC心跳类

JDBC方式访问后端数据库

Mycat对JDBC支持部分的代码比较简单,主要实现了下面三个类:

1. JDBCDatasource JDBC物理数据源

2. JDBCConnection JDBC连接类

3. JDBCHeartbeat JDBC心跳类

JDBC相关类图



JDBCDatasource

JDBCDatasource继承PhysicalDatasource

初始化的时候加载支持数据库的驱动

static { // 加载可能的驱动

List<String> drivers = Lists.newArrayList("com.mysql.jdbc.Driver", "org.opencloudb.jdbc.mongodb.MongoDriver", "oracle.jdbc.OracleDriver", "com.microsoft.sqlserver.jdbc.SQLServerDriver","org.apache.hive.jdbc.HiveDriver","com.ibm.db2.jcc.DB2Driver","org.postgresql.Driver");

for (String driver : drivers) {

try{

Class.forName(driver);

} catch (ClassNotFoundException ignored) {

}

}

}

创建连接的时候,从配置文件中获取host,port,dbtype,还有连接数据库的url,User,Password

public void createNewConnection(ResponseHandler handler,

String schema) throws IOException {

DBHostConfig cfg = getConfig();

JDBCConnection c = new JDBCConnection();

c.setHost(cfg.getIp());

c.setPort(cfg.getPort());

c.setPool(this);

c.setSchema(schema);

c.setDbType(cfg.getDbType());

try {

// TODO 这里应该有个连接池

Connection con = getConnection();

// c.setIdleTimeout(pool.getConfig().getIdleTimeout());

c.setCon(con); // notify handler

handler.connectionAcquired(c);

} catch (Exception e) {

handler.connectionError(e, c);

}

}

获取连接的时候,判断是否配置的初始化语句,如果存在,就执行初始化语句,此功能可用于设置日期格式,字符集等

Connection getConnection() throws SQLException {

DBHostConfig cfg = getConfig();

Connection connection = DriverManager.getConnection(cfg.getUrl(),

cfg.getUser(), cfg.getPassword());

String initSql=getHostConfig().getConnectionInitSql();

if(initSql!=null&&!"".equals(initSql)) //初始化语句是否存在

{

Statement statement =null;

try{

statement = connection.createStatement();

statement.execute(initSql);

}finally{

if(statement!=null){

statement.close();

}

}

}

return connection;

}

mycat 又从哪里创建JDBCDatasource的呢?

请看org.opencloudb.ConfigInitializer.

判断是否dbType是mysql并且dbDriver是native,使用MySQLDataSource连接后台数据库,如果dbDriver是jdbc就使用JDBCDatasource连接后台数据库,否则抛出异常。

private PhysicalDatasource[] createDataSource(DataHostConfig conf, String hostName, String dbType, String dbDriver, DBHostConfig[] nodes, boolean isRead) { PhysicalDatasource[] dataSources = new PhysicalDatasource[nodes.length]; if (dbType.equals("mysql") && dbDriver.equals("native")) { for (int i = 0; i < nodes.length; i++) { nodes[i].setIdleTimeout(system.getIdleTimeout()); MySQLDataSource ds = new MySQLDataSource(nodes[i], conf, isRead); dataSources[i] = ds; } } else if(dbDriver.equals("jdbc"))//是jdbc方式 { for (int i = 0; i < nodes.length; i++) { nodes[i].setIdleTimeout(system.getIdleTimeout()); JDBCDatasource ds = new JDBCDatasource(nodes[i], conf, isRead); dataSources[i] = ds; } } else { throw new ConfigException("not supported yet !" + hostName); } return dataSources; }

JDBCConnection

JDBCConnection主要做两件事情,就是执行SQL语句,然后把执行结果发回给mpp(SQL合并引擎,mycat处理多节点结果集排序,分组,分页),需要实现ResponseHandler的接口。

下面来分析下执行SQL语句的代码:

创建线程Runnable,在线程中执行executeSQL的方法,并把线程放入MycatServer的线程池中执行,据测试,比不用线程方式执行SQL语句效率提高20%-30%。

public void execute(final RouteResultsetNode node, final ServerConnection source, final boolean autocommit) throws IOException { Runnable runnable=new Runnable() { @Override public void run() { try { executeSQL(node, source, autocommit); } catch (IOException e) { throw new RuntimeException(e); } } } ; MycatServer.getInstance().getBusinessExecutor().execute(runnable); }

执行SQL语句的过程,先判断是select,或show语句还是ddl语句

1:如果是show指令,并且不是mysql数据库,执行ShowVariables.execute,构造mysql的固定信息包

2:如果是SELECT CONNECTION_ID()语句,执行ShowVariables.justReturnValue,也是构造mysql的固定信息包

3:如果是SELECT语句,执行并且有返回结果数据集

4:如果是DDL语句,执行并且返回OkPacket

private void executeSQL(RouteResultsetNode rrn, ServerConnection sc, boolean autocommit) throws IOException {

String orgin = rrn.getStatement();

if (!modifiedSQLExecuted && rrn.isModifySQL()) {

modifiedSQLExecuted = true;

}

try {

if (!this.schema.equals(this.oldSchema)) {//判断

con.setCatalog(schema);

this.oldSchema = schema;

}

if (!this.isSpark){//spark sql ,hive 不支持事务

con.setAutoCommit(autocommit);

}

int sqlType = rrn.getSqlType(); //判断是否是查询或者mysql的show指令

if (sqlType == ServerParse.SELECT || sqlType == ServerParse.SHOW ) {

if ((sqlType ==ServerParse.SHOW) && (!dbType.equals("MYSQL")) ){ ShowVariables.execute(sc, orgin,this);//show指令的返回结果 } else if("SELECT CONNECTION_ID()".equalsIgnoreCase(orgin)) { ShowVariables.justReturnValue(sc, String.valueOf(sc.getId()),this); }else {

ouputResultSet(sc, orgin);//执行select语句,并处理结果集

}

} else {//sql ddl 执行

executeddl(sc, orgin);

}

} catch (SQLException e) {//异常处理

String msg = e.getMessage();

ErrorPacket error = new ErrorPacket();

error.packetId = ++packetId;

error.errno = e.getErrorCode();

error.message = msg.getBytes(); //触发错误数据包的响应事件 this.respHandler.errorResponse(error.writeToBytes(sc), this);

} finally {

this.running = false;

} }

ouputResultSet(sc, orgin);//执行select语句,并处理结果集

stmt = con.createStatement();

rs = stmt.executeQuery(sql); 执行sql语句

List<FieldPacket> fieldPks = new LinkedList<FieldPacket>();//创建字段列表 //把字段的元数据转换为mysql的元数据并放入fieldPks中,主要是数据类型 ResultSetUtil.resultSetToFieldPacket(sc.getCharset(), fieldPks, rs, this.isSpark);

把字段信息封装成mysql的网络封包

int colunmCount =fieldPks.size();

ByteBuffer byteBuf = sc.allocate();

ResultSetHeaderPacket headerPkg = new ResultSetHeaderPacket();

headerPkg.fieldCount = fieldPks.size();

headerPkg.packetId = ++packetId;

byteBuf = headerPkg.write(byteBuf, sc, true);

byteBuf.flip();

byte[] header = new byte[byteBuf.limit()];

byteBuf.get(header);

byteBuf.clear();

List<byte[]> fields = new ArrayList<byte[]>(fieldPks.size());

Iterator<FieldPacket> itor = fieldPks.iterator();

while (itor.hasNext()) {

FieldPacket curField = itor.next();

curField.packetId = ++packetId;

byteBuf = curField.write(byteBuf, sc, false);

byteBuf.flip();

byte[] field = new byte[byteBuf.limit()];

byteBuf.get(field);

byteBuf.clear();

fields.add(field);

itor.remove();

}

EOFPacket eofPckg = new EOFPacket();

eofPckg.packetId = ++packetId;

byteBuf = eofPckg.write(byteBuf, sc, false);

byteBuf.flip();

byte[] eof = new byte[byteBuf.limit()];

byteBuf.get(eof);

byteBuf.clear();

//触发收到字段数据包结束的响应事件

this.respHandler.fieldEofResponse(header, fields, eof, this);

遍历结果数据集ResultSet,并把每一条记录封装成一个数据包,数据发送完成,还需要在封装一个行结束的数据包

// output row

while (rs.next()) {

RowDataPacket curRow = new RowDataPacket(colunmCount);

for (int i = 0; i < colunmCount; i++) {

int j = i + 1;

curRow.add(StringUtil.encode(rs.getString(j), sc.getCharset()));

}

curRow.packetId = ++packetId;

byteBuf = curRow.write(byteBuf, sc, false);

byteBuf.flip();

byte[] row = new byte[byteBuf.limit()];

byteBuf.get(row);

byteBuf.clear();

//触发收到行数据包的响应事件

this.respHandler.rowResponse(row, this);

} // end row

eofPckg = new EOFPacket();

eofPckg.packetId = ++packetId;

byteBuf = eofPckg.write(byteBuf, sc, false);

byteBuf.flip();

eof = new byte[byteBuf.limit()];

byteBuf.get(eof);

//收到行数据包结束的响应处理

this.respHandler.rowEofResponse(eof, this);

JDBCHeartbeat

JDBCHeartbeat就是定时执行schema.xml中dataHost的heartbeat语句。

在启动的时候判断心跳语句是否为空,如果为空则执行stop(),后面再执行heartbeat()方法时,直接返回。

public class JDBCHeartbeat extends DBHeartbeat{

private final ReentrantLock lock;

private final JDBCDatasource source;

private final boolean heartbeatnull;

public JDBCHeartbeat(JDBCDatasource source) {

this.source = source;

lock = new ReentrantLock(false);

this.status = INIT_STATUS;

this.heartbeatSQL = source.getHostConfig().getHearbeatSQL().trim(); this.heartbeatnull= heartbeatSQL.length()==0;//判断心跳语句是否为空 }

@Override

public void start()//启动

{

if (this.heartbeatnull){

stop();

return;

}

lock.lock();

try {

isStop.compareAndSet(true, false);

this.status = DBHeartbeat.OK_STATUS;

} finally{

lock.unlock();

}

}

@Override

public void stop()//停止

{

lock.lock();

try{

if (isStop.compareAndSet(false, true)) {

isChecking.set(false);

}

} finally {

lock.unlock();

}

}

....

@Override

public void heartbeat()//执行心跳语句

{

if (isStop.get()) return;

lock.lock();

try {

isChecking.set(true);

try (Connection c = source.getConnection()){

try (Statement s = c.createStatement()){

s.execute(heartbeatSQL);

}

}

status = OK_STATUS;

} catch (SQLException ex) {

status = ERROR_STATUS;

} finally {

lock.unlock();

this.isChecking.set(false);

}

}

更多内容请关注微信公众号:it_haha
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: