您的位置:首页 > 其它

对象池common-pool源码分析

2015-07-30 17:28 531 查看
前几天在看一个应用的日志时,发现与MySQL连接时发生如下错误:

The last packet successfully received from the server was XXXXXX seconds ago.

The last packet sent successfully to the server was XXXXXX seconds ago,

大概意思是距离上一次连接MySQL的间隔时间,已经超出了MySQL设置的'wait_timeout'时长啦。

查看dbcp的源码

public class GenericObjectPool extends BaseObjectPool implements ObjectPool {

    public GenericObjectPool(){

//从ObjectPool创建开始启动回收定时任务,<span style="font-family: Arial, Helvetica, sans-serif;">timeBetweenEvictionRunsMillis为回收时间间隔</span>
startEvictor(_timeBetweenEvictionRunsMillis);
}

protected synchronized void startEvictor(long delay) {
        if(null != _evictor) {
            EvictionTimer.cancel(_evictor);
            _evictor = null;
        }
        if(delay > 0) {
            _evictor = new Evictor();
            EvictionTimer.schedule(_evictor, delay, delay);
        }
    }

 private class Evictor extends TimerTask {
        /**
         * Run pool maintenance.  Evict objects qualifying for eviction and then
         * invoke {@link GenericObjectPool#ensureMinIdle()}.
         */
        public void run() {
            try {
//1.回收池对象
                evict();
            } catch(Exception e) {
                // ignored
            }
            try {
//2.确保最小的空闲数
                ensureMinIdle();
            } catch(Exception e) {
                // ignored
            }
        }
    }

 public void evict() throws Exception {
        assertOpen();
        synchronized (this) {
            if(_pool.isEmpty()) {
                return;
            }
            if (null == _evictionCursor) {
                _evictionCursor = (_pool.cursor(_lifo ? _pool.size() : 0));
            }
        }

//numTestsPerEvictionRun可以设置
        for (int i=0,m=getNumTests();i<m;i++) {
            final ObjectTimestampPair pair;
            synchronized (this) {
                if ((_lifo && !_evictionCursor.hasPrevious()) ||
                        !_lifo && !_evictionCursor.hasNext()) {
                    _evictionCursor.close();
                    _evictionCursor = _pool.cursor(_lifo ? _pool.size() : 0);
                }

                pair = _lifo ?
                        (ObjectTimestampPair) _evictionCursor.previous() :
                        (ObjectTimestampPair) _evictionCursor.next();

                _evictionCursor.remove();
                _numInternalProcessing++;
            }

            boolean removeObject = false;
            final long idleTimeMilis = System.currentTimeMillis() - pair.tstamp;
            if ((getMinEvictableIdleTimeMillis() > 0) &&
                    (idleTimeMilis > getMinEvictableIdleTimeMillis())) {
                removeObject = true;
            } else if ((getSoftMinEvictableIdleTimeMillis() > 0) &&
                    (idleTimeMilis > getSoftMinEvictableIdleTimeMillis()) &&
                    ((getNumIdle() + 1)> getMinIdle())) { // +1 accounts for object we are processing
                removeObject = true;
            }
            if(getTestWhileIdle() && !removeObject) {
                boolean active = false;
                try {
                    _factory.activateObject(pair.value);
                    active = true;
                } catch(Exception e) {
                    removeObject=true;
                }
                if(active) {
//验证失败,需要回收对象
                    if(!_factory.validateObject(pair.value)) {
                        removeObject=true;
                    } else {
                        try {
                            _factory.passivateObject(pair.value);
                        } catch(Exception e) {
                            removeObject=true;
                        }
                    }
                }
            }

            if (removeObject) {
                try {
//工厂销毁对象
                    _factory.destroyObject(pair.value);
                } catch(Exception e) {
                    // ignored
                }
            }
            synchronized (this) {
                if(!removeObject) {
                    _evictionCursor.add(pair);
                    if (_lifo) {
                        // Skip over the element we just added back
                        _evictionCursor.previous();
                    }
                }
                _numInternalProcessing--;
            }
        }
    }

public Object borrowObject() throws Exception {
//pool会一直borrow直到创建成功
 for(;;) {

if(latch.getPair() == null) {

 try {
                synchronized (latch) {
                // Before we wait, make sure another thread didn't allocate us an object
                // or permit a new object to be created
                if (latch.getPair() == null && !latch.mayCreate()) {
                  if(maxWait <= 0) {
                        latch.wait();
                   } else {
                      // this code may be executed again after a notify then continue cycle
                     // so, need to calculate the amount of time to wait
                     final long elapsed = (System.currentTimeMillis() - starttime);
                     final long waitTime = maxWait - elapsed;
                     if (waitTime > 0)
                      {
                          latch.wait(waitTime);
                       }
                    }
                  } else {
                     break;
                  }
                 }
               } catch(InterruptedException e) {
                      Thread.currentThread().interrupt();
                      throw e;
               }
              if(maxWait > 0 && ((System.currentTimeMillis() - starttime) >= maxWait)) {
                  synchronized(this) {
                 // Make sure allocate hasn't already assigned an object
                // in a different thread or permitted a new object to be created
               if (latch.getPair() == null && !latch.mayCreate()) {
                // Remove latch from the allocation queue
               _allocationQueue.remove(latch);
                } else {
                    break;
                }
             }
                throw new NoSuchElementException("Timeout waiting for idle object");
              } else {
                continue; // keep looping
            }

}

//factory makeObject
boolean newlyCreated = false;
            if(null == latch.getPair()) {
                try {
                    Object obj = _factory.makeObject();
                    latch.setPair(new ObjectTimestampPair(obj));
                    newlyCreated = true;
                } finally {
                    if (!newlyCreated) {
                        // object cannot be created
                        synchronized (this) {
                            _numInternalProcessing--;
                            // No need to reset latch - about to throw exception
                            allocate();
                        }
                    }
                }
            }

 try {
               //factory activateObject
_factory.activateObject(latch.getPair().value);
//testOnBorrow可以自己设置
                if(_testOnBorrow &&
                        !_factory.validateObject(latch.getPair().value)) {
                    throw new Exception("ValidateObject failed");
                }
                synchronized(this) {
                    _numInternalProcessing--;
                    _numActive++;
                }
                return latch.getPair().value;
            }
            catch (Throwable e) {
                // object cannot be activated or is invalid
                try {
                    _factory.destroyObject(latch.getPair().value);
                } catch (Throwable e2) {
                    // cannot destroy broken object
                }
                synchronized (this) {
                    _numInternalProcessing--;
                    if (!newlyCreated) {
                        latch.reset();
                        _allocationQueue.add(0, latch);
                    }
                    allocate();
                }
//创建后的对象,验证失败后会抛异常
                if(newlyCreated) {
                    throw new NoSuchElementException("Could not create a validated object, cause: " + e.getMessage());
                }
                else {
                    continue; // keep looping
                }
            }
        }

}

}

/**
* 归还对象
**/
public void returnObject(Object obj) throws Exception {
            try {
                addObjectToPool(obj, true);
            } catch (Exception e) {
               ....
            }
        }

private void addObjectToPool(Object obj, boolean decrementNumActive) throws Exception {
        boolean success = true;
//testOnBorrow参数可配置,testOnBorrow为true时,归回后验证失败会将对象销毁
        if (_testOnReturn && !(_factory.validateObject(obj))) {
            success = false;
        } else {
            _factory.passivateObject(obj);
        }
        boolean shouldDestroy = !success;
// Add instance to pool if there is room and it has passed validation
// (if testOnreturn is set)
 synchronized (this) {
            if (isClosed()) {
                shouldDestroy = true;
            } else {
                if((_maxIdle >= 0) && (_pool.size() >= _maxIdle)) {
                    shouldDestroy = true;
                } else if(success) {
                    // borrowObject always takes the first element from the queue,
                    // so for LIFO, push on top, FIFO add to end
                    if (_lifo) {
                        _pool.addFirst(new ObjectTimestampPair(obj));
                    } else {
                        _pool.addLast(new ObjectTimestampPair(obj));
                    }
                    if (decrementNumActive) {
                        _numActive--;
                    }
                    allocate();
                }
            }
        }
}


总结

1.超过minEvictableIdleTimeMillis对象会被destroy

2.testWhileIdle为true时,对象的minEvictableIdleTimeMillis内也会validateObject对象是否有效,无效对象将被destroy

下面看看PoolableConnectionFactory

public class PoolableConnectionFactory implements PoolableObjectFactory {

 public void activateObject(Object obj) throws Exception {
 if (conn.getAutoCommit() != _defaultAutoCommit) {
                conn.setAutoCommit(_defaultAutoCommit);
            }

}

 public void destroyObject(Object obj) throws Exception {
        if(obj instanceof PoolableConnection) {
            ((PoolableConnection)obj).reallyClose();
        }
    }

 public boolean validateObject(Object obj) {
  String query = _validationQuery;
if(conn.isClosed()) {
            throw new SQLException("validateConnection: connection closed");
        }

 rset = stmt.executeQuery(query);

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: