1. 先来看构建MongoClient使用的参数
MongoClientOptions.Builder optionsBuilder = builder()
.applicationName(applicationName)
.maxConnectionIdleTime(config.getMaxConnectionIdleTime() == 0 ?
MAX_CONNECTION_IDLE_TIME : config.getMaxConnectionIdleTime())
.connectionsPerHost(config.getConnectionsPerHost() == 0 ?
MAX_MONGO_CONNECTION_PER_HOST : config.getConnectionsPerHost())
.threadsAllowedToBlockForConnectionMultiplier(config.getThreadsAllowedToBlockForConnectionMultiplier() == 0 ?
CONNECTION_MULTIPLIER : config.getThreadsAllowedToBlockForConnectionMultiplier())
// 鎻愰珮闃熷垪闀垮害
.maxWaitTime(config.getMaxWaitTime() == 0 ?
TIME_OUT : config.getMaxWaitTime())
.connectTimeout(config.getConnectTimeout() == 0 ?
TIME_OUT : config.getConnectTimeout())
.socketTimeout(config.getSocketTimeout() == 0 ?
TIME_OUT : config.getSocketTimeout())
.serverSelectionTimeout(config.getServerSelectionTimeout() == 0 ?
TIME_OUT : config.getServerSelectionTimeout())
.socketKeepAlive(true)
.addServerListener(new MongoServerListener())
.addCommandListener(new MongoClientListener());
List<MongoCredential> credentials = MongoCredentialPropertyUtils.getCredentials(config.getCredentials());
if (!config.isStandalone()) {
optionsBuilder.readPreference(readPreferenceEnum.getReadPreference());
}
List<String> servers = config.getServers();
MongoClient client;
if (!config.isStandalone()) {
List<ServerAddress> addresses = asList(
servers.stream()
.map(server -> {
if (server.contains(":")) {
String[] hosts = server.split(":");
return new ServerAddress(hosts[0], Integer.parseInt(hosts[1]));
}
return new ServerAddress(server);
}).toArray(ServerAddress[]::new));
if (CollectionUtils.isNotEmpty(credentials)) {
client = new MongoClient(addresses, credentials, optionsBuilder.build());
} else {
client = new MongoClient(addresses, optionsBuilder.build());
}
} else {
if (CollectionUtils.isNotEmpty(credentials)) {
client = new MongoClient(new ServerAddress(servers.get(0)), credentials, optionsBuilder.build());
} else {
client = new MongoClient(new ServerAddress(servers.get(0)), optionsBuilder.build());
}
}
client.listDatabaseNames();
2.MongoClientOptions类
我们重点看一下MongoClientOptions及MongoClientOptions.Builder中与连接池相关的部分
如果我们要对连接池做一些监控,需要在外层注册JMXConnectionPoolListener到connectionPoolListeners里
ConnectionPoolSettings.Builder connectionPoolSettingsBuilder = ConnectionPoolSettings.builder()
.minSize(getMinConnectionsPerHost())
.maxSize(getConnectionsPerHost())
.maxWaitQueueSize(getThreadsAllowedToBlockForConnectionMultiplier() * getConnectionsPerHost())
.maxWaitTime(getMaxWaitTime(), MILLISECONDS)
.maxConnectionIdleTime(getMaxConnectionIdleTime(), MILLISECONDS)
.maxConnectionLifeTime(getMaxConnectionLifeTime(), MILLISECONDS);
for (ConnectionPoolListener connectionPoolListener : builder.connectionPoolListeners) {
connectionPoolSettingsBuilder.addConnectionPoolListener(connectionPoolListener);
}
3.JMXConnectionPoolListener
我们来看看JMXConnectionPoolListener这个类做了什么
public class JMXConnectionPoolListener implements ConnectionPoolListener {
private final ConcurrentMap<ServerId, ConnectionPoolStatistics> map =
new ConcurrentHashMap<ServerId, ConnectionPoolStatistics>();
@Override
public void connectionPoolOpened(final ConnectionPoolOpenedEvent event) {
ConnectionPoolStatistics statistics = new ConnectionPoolStatistics(event);
map.put(event.getServerId(), statistics);
MBeanServerFactory.getMBeanServer().registerMBean(statistics, getMBeanObjectName(event.getServerId()));
}
@Override
public void connectionPoolClosed(final ConnectionPoolClosedEvent event) {
map.remove(event.getServerId());
MBeanServerFactory.getMBeanServer().unregisterMBean(getMBeanObjectName(event.getServerId()));
}
@Override
public void connectionCheckedOut(final ConnectionCheckedOutEvent event) {
ConnectionPoolStatistics statistics = getStatistics(event.getConnectionId());
if (statistics != null) {
statistics.connectionCheckedOut(event);
}
}
@Override
public void connectionCheckedIn(final ConnectionCheckedInEvent event) {
ConnectionPoolStatistics statistics = getStatistics(event.getConnectionId());
if (statistics != null) {
statistics.connectionCheckedIn(event);
}
}
@Override
public void waitQueueEntered(final ConnectionPoolWaitQueueEnteredEvent event) {
ConnectionPoolListener statistics = getStatistics(event.getServerId());
if (statistics != null) {
statistics.waitQueueEntered(event);
}
}
@Override
public void waitQueueExited(final ConnectionPoolWaitQueueExitedEvent event) {
ConnectionPoolListener statistics = getStatistics(event.getServerId());
if (statistics != null) {
statistics.waitQueueExited(event);
}
}
@Override
public void connectionAdded(final ConnectionAddedEvent event) {
ConnectionPoolStatistics statistics = getStatistics(event.getConnectionId());
if (statistics != null) {
statistics.connectionAdded(event);
}
}
@Override
public void connectionRemoved(final ConnectionRemovedEvent event) {
ConnectionPoolStatistics statistics = getStatistics(event.getConnectionId());
if (statistics != null) {
statistics.connectionRemoved(event);
}
}
String getMBeanObjectName(final ServerId serverId) {
String name = format("org.mongodb.driver:type=ConnectionPool,clusterId=%s,host=%s,port=%s",
ensureValidValue(serverId.getClusterId().getValue()),
ensureValidValue(serverId.getAddress().getHost()),
serverId.getAddress().getPort());
if (serverId.getClusterId().getDescription() != null) {
name = format("%s,description=%s", name, ensureValidValue(serverId.getClusterId().getDescription()));
}
return name;
}
// for unit test
ConnectionPoolStatisticsMBean getMBean(final ServerId serverId) {
return getStatistics(serverId);
}
private ConnectionPoolStatistics getStatistics(final ConnectionId connectionId) {
return getStatistics(connectionId.getServerId());
}
private ConnectionPoolStatistics getStatistics(final ServerId serverId) {
return map.get(serverId);
}
private String ensureValidValue(final String value) {
if (containsQuotableCharacter(value)) {
return ObjectName.quote(value);
} else {
return value;
}
}
private boolean containsQuotableCharacter(final String value) {
if (value == null || value.length() == 0) {
return false;
}
List<String> quoteableCharacters = asList(",", ":", "?", "*", "=", "\"", "\\", "\n");
for (String quotable : quoteableCharacters) {
if (value.contains(quotable)) {
return true;
}
}
return false;
}
}
通过事件监听的方式,向jmsServer中包括ConnectionPoolStatistics对象,这个对象包含了连接池的一些基本信息,可以通过此对象获取到你想要的一些基础信息。
4.现在我们来看一下连接池
MultiServerCluster.addNewHosts方法表明,集群模式下每台host都对应一个单独的DefaultConnectionPool
private void addNewHosts(final Set<String> hosts) {
for (final String cur : hosts) {
addServer(new ServerAddress(cur));
}
}
5.DefaultConnectionPool底层使用的真正的连接池是ConcurrentPool,DefaultConnectionPool对外暴露get等方法。
这个类中的maintenanceTask任务定期清理过期对象并维护池内对象不小于最少连接数
class DefaultConnectionPool implements ConnectionPool {
private static final Logger LOGGER = Loggers.getLogger("connection");
private final ConcurrentPool<UsageTrackingInternalConnection> pool;
private final ConnectionPoolSettings settings;
private final AtomicInteger waitQueueSize = new AtomicInteger(0);
private final AtomicInteger generation = new AtomicInteger(0);
private final ExecutorService sizeMaintenanceTimer;
private ExecutorService asyncGetter;
private final Runnable maintenanceTask;
private final ConnectionPoolListener connectionPoolListener;
private final ServerId serverId;
private volatile boolean closed;
DefaultConnectionPool(final ServerId serverId, final InternalConnectionFactory internalConnectionFactory,
final ConnectionPoolSettings settings) {
this.serverId = notNull("serverId", serverId);
this.settings = notNull("settings", settings);
UsageTrackingInternalConnectionItemFactory connectionItemFactory
= new UsageTrackingInternalConnectionItemFactory(internalConnectionFactory);
pool = new ConcurrentPool<UsageTrackingInternalConnection>(settings.getMaxSize(), connectionItemFactory);
maintenanceTask = createMaintenanceTask();
sizeMaintenanceTimer = createMaintenanceTimer();
this.connectionPoolListener = getConnectionPoolListener(settings);
connectionPoolListener.connectionPoolOpened(new ConnectionPoolOpenedEvent(serverId, settings));
}
@Override
public InternalConnection get() {
return get(settings.getMaxWaitTime(MILLISECONDS), MILLISECONDS);
}
@Override
public InternalConnection get(final long timeout, final TimeUnit timeUnit) {
try {
if (waitQueueSize.incrementAndGet() > settings.getMaxWaitQueueSize()) {
throw createWaitQueueFullException();
}
try {
connectionPoolListener.waitQueueEntered(new ConnectionPoolWaitQueueEnteredEvent(serverId));
PooledConnection pooledConnection = getPooledConnection(timeout, timeUnit);
if (!pooledConnection.opened()) {
try {
pooledConnection.open();
} catch (Throwable t) {
pool.release(pooledConnection.wrapped, true);
if (t instanceof MongoException) {
throw (MongoException) t;
} else {
throw new MongoInternalException(t.toString(), t);
}
}
}
return pooledConnection;
} finally {
connectionPoolListener.waitQueueExited(new ConnectionPoolWaitQueueExitedEvent(serverId));
}
} finally {
waitQueueSize.decrementAndGet();
}
}
@Override
public void getAsync(final SingleResultCallback<InternalConnection> callback) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace(format("Asynchronously getting a connection from the pool for server %s", serverId));
}
final SingleResultCallback<InternalConnection> errHandlingCallback = errorHandlingCallback(callback, LOGGER);
PooledConnection connection = null;
try {
connection = getPooledConnection(0, MILLISECONDS);
} catch (MongoTimeoutException e) {
// fall through
} catch (Throwable t) {
callback.onResult(null, t);
return;
}
if (connection != null) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace(format("Asynchronously opening pooled connection %s to server %s",
connection.getDescription().getConnectionId(), serverId));
}
openAsync(connection, errHandlingCallback);
} else if (waitQueueSize.incrementAndGet() > settings.getMaxWaitQueueSize()) {
waitQueueSize.decrementAndGet();
if (LOGGER.isTraceEnabled()) {
LOGGER.trace(format("Asynchronously failing to get a pooled connection to %s because the wait queue is full",
serverId));
}
callback.onResult(null, createWaitQueueFullException());
} else {
final long startTimeMillis = System.currentTimeMillis();
connectionPoolListener.waitQueueEntered(new ConnectionPoolWaitQueueEnteredEvent(serverId));
getAsyncGetter().submit(new Runnable() {
@Override
public void run() {
try {
if (getRemainingWaitTime() <= 0) {
errHandlingCallback.onResult(null, createTimeoutException());
} else {
PooledConnection connection = getPooledConnection(getRemainingWaitTime(), MILLISECONDS);
openAsync(connection, errHandlingCallback);
}
} catch (Throwable t) {
errHandlingCallback.onResult(null, t);
} finally {
waitQueueSize.decrementAndGet();
connectionPoolListener.waitQueueExited(new ConnectionPoolWaitQueueExitedEvent(serverId));
}
}
private long getRemainingWaitTime() {
return startTimeMillis + settings.getMaxWaitTime(MILLISECONDS) - System.currentTimeMillis();
}
});
}
}
private void openAsync(final PooledConnection pooledConnection,
final SingleResultCallback<InternalConnection> callback) {
if (pooledConnection.opened()) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace(format("Pooled connection %s to server %s is already open",
pooledConnection.getDescription().getConnectionId(), serverId));
}
callback.onResult(pooledConnection, null);
} else {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace(format("Pooled connection %s to server %s is not yet open",
pooledConnection.getDescription().getConnectionId(), serverId));
}
pooledConnection.openAsync(new SingleResultCallback<Void>() {
@Override
public void onResult(final Void result, final Throwable t) {
if (t != null) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace(format("Pooled connection %s to server %s failed to open",
pooledConnection.getDescription().getConnectionId(), serverId));
}
callback.onResult(null, t);
pool.release(pooledConnection.wrapped, true);
} else {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace(format("Pooled connection %s to server %s is now open",
pooledConnection.getDescription().getConnectionId(), serverId));
}
callback.onResult(pooledConnection, null);
}
}
});
}
}
private synchronized ExecutorService getAsyncGetter() {
if (asyncGetter == null) {
asyncGetter = Executors.newSingleThreadExecutor(new DaemonThreadFactory("AsyncGetter"));
}
return asyncGetter;
}
private synchronized void shutdownAsyncGetter() {
if (asyncGetter != null) {
asyncGetter.shutdownNow();
}
}
@Override
public void invalidate() {
LOGGER.debug("Invalidating the connection pool");
generation.incrementAndGet();
}
@Override
public void close() {
if (!closed) {
pool.close();
if (sizeMaintenanceTimer != null) {
sizeMaintenanceTimer.shutdownNow();
}
shutdownAsyncGetter();
closed = true;
connectionPoolListener.connectionPoolClosed(new ConnectionPoolClosedEvent(serverId));
}
}
/**
* Synchronously prune idle connections and ensure the minimum pool size.
*/
public void doMaintenance() {
if (maintenanceTask != null) {
maintenanceTask.run();
}
}
private PooledConnection getPooledConnection(final long timeout, final TimeUnit timeUnit) {
UsageTrackingInternalConnection internalConnection = pool.get(timeout, timeUnit);
while (shouldPrune(internalConnection)) {
pool.release(internalConnection, true);
internalConnection = pool.get(timeout, timeUnit);
}
connectionPoolListener.connectionCheckedOut(new ConnectionCheckedOutEvent(internalConnection.getDescription().getConnectionId()));
if (LOGGER.isTraceEnabled()) {
LOGGER.trace(format("Checked out connection [%s] to server %s", getId(internalConnection), serverId.getAddress()));
}
return new PooledConnection(internalConnection);
}
private MongoTimeoutException createTimeoutException() {
return new MongoTimeoutException(format("Timed out after %d ms while waiting for a connection to server %s.",
settings.getMaxWaitTime(MILLISECONDS), serverId.getAddress()));
}
private MongoWaitQueueFullException createWaitQueueFullException() {
return new MongoWaitQueueFullException(format("Too many threads are already waiting for a connection. "
+ "Max number of threads (maxWaitQueueSize) of %d has been exceeded.",
settings.getMaxWaitQueueSize()));
}
ConcurrentPool<UsageTrackingInternalConnection> getPool() {
return pool;
}
private Runnable createMaintenanceTask() {
Runnable newMaintenanceTask = null;
if (shouldPrune() || shouldEnsureMinSize()) {
newMaintenanceTask = new Runnable() {
@Override
public synchronized void run() {
try {
if (shouldPrune()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(format("Pruning pooled connections to %s", serverId.getAddress()));
}
pool.prune();
}
if (shouldEnsureMinSize()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(format("Ensuring minimum pooled connections to %s", serverId.getAddress()));
}
pool.ensureMinSize(settings.getMinSize(), true);
}
} catch (MongoInterruptedException e) {
// don't log interruptions due to the shutdownNow call on the ExecutorService
} catch (Exception e) {
LOGGER.warn("Exception thrown during connection pool background maintenance task", e);
}
}
};
}
return newMaintenanceTask;
}
private ExecutorService createMaintenanceTimer() {
if (maintenanceTask == null) {
return null;
} else {
ScheduledExecutorService newTimer = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("MaintenanceTimer"));
newTimer.scheduleAtFixedRate(maintenanceTask, settings.getMaintenanceInitialDelay(MILLISECONDS),
settings.getMaintenanceFrequency(MILLISECONDS), MILLISECONDS);
return newTimer;
}
}
private boolean shouldEnsureMinSize() {
return settings.getMinSize() > 0;
}
private boolean shouldPrune() {
return settings.getMaxConnectionIdleTime(MILLISECONDS) > 0 || settings.getMaxConnectionLifeTime(MILLISECONDS) > 0;
}
private boolean shouldPrune(final UsageTrackingInternalConnection connection) {
return fromPreviousGeneration(connection) || pastMaxLifeTime(connection) || pastMaxIdleTime(connection);
}
private boolean pastMaxIdleTime(final UsageTrackingInternalConnection connection) {
return expired(connection.getLastUsedAt(), System.currentTimeMillis(), settings.getMaxConnectionIdleTime(MILLISECONDS));
}
private boolean pastMaxLifeTime(final UsageTrackingInternalConnection connection) {
return expired(connection.getOpenedAt(), System.currentTimeMillis(), settings.getMaxConnectionLifeTime(MILLISECONDS));
}
private boolean fromPreviousGeneration(final UsageTrackingInternalConnection connection) {
return generation.get() > connection.getGeneration();
}
private boolean expired(final long startTime, final long curTime, final long maxTime) {
return maxTime != 0 && curTime - startTime > maxTime;
}
/**
* If there was a socket exception that wasn't some form of interrupted read, increment the generation count so that any connections
* created prior will be discarded.
*
* @param connection the connection that generated the exception
* @param t the exception
*/
private void incrementGenerationOnSocketException(final InternalConnection connection, final Throwable t) {
if (t instanceof MongoSocketException && !(t instanceof MongoSocketReadTimeoutException)) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn(format("Got socket exception on connection [%s] to %s. All connections to %s will be closed.",
getId(connection), serverId.getAddress(), serverId.getAddress()));
}
invalidate();
}
}
private ConnectionId getId(final InternalConnection internalConnection) {
return internalConnection.getDescription().getConnectionId();
}
private class PooledConnection implements InternalConnection {
private final UsageTrackingInternalConnection wrapped;
private final AtomicBoolean isClosed = new AtomicBoolean();
PooledConnection(final UsageTrackingInternalConnection wrapped) {
this.wrapped = notNull("wrapped", wrapped);
}
@Override
public void open() {
isTrue("open", !isClosed.get());
wrapped.open();
}
@Override
public void openAsync(final SingleResultCallback<Void> callback) {
isTrue("open", !isClosed.get());
wrapped.openAsync(callback);
}
@Override
public void close() {
// All but the first call is a no-op
if (!isClosed.getAndSet(true)) {
if (!DefaultConnectionPool.this.closed) {
connectionPoolListener.connectionCheckedIn(new ConnectionCheckedInEvent(getId(wrapped)));
if (LOGGER.isTraceEnabled()) {
LOGGER.trace(format("Checked in connection [%s] to server %s", getId(wrapped), serverId.getAddress()));
}
}
pool.release(wrapped, wrapped.isClosed() || shouldPrune(wrapped));
}
}
@Override
public boolean opened() {
isTrue("open", !isClosed.get());
return wrapped.opened();
}
@Override
public boolean isClosed() {
return isClosed.get() || wrapped.isClosed();
}
@Override
public ByteBuf getBuffer(final int capacity) {
return wrapped.getBuffer(capacity);
}
@Override
public void sendMessage(final List<ByteBuf> byteBuffers, final int lastRequestId) {
isTrue("open", !isClosed.get());
try {
wrapped.sendMessage(byteBuffers, lastRequestId);
} catch (MongoException e) {
incrementGenerationOnSocketException(this, e);
throw e;
}
}
@Override
public <T> T sendAndReceive(final CommandMessage message, final Decoder<T> decoder, final SessionContext sessionContext) {
isTrue("open", !isClosed.get());
try {
return wrapped.sendAndReceive(message, decoder, sessionContext);
} catch (MongoException e) {
incrementGenerationOnSocketException(this, e);
throw e;
}
}
@Override
public <T> void sendAndReceiveAsync(final CommandMessage message, final Decoder<T> decoder,
final SessionContext sessionContext, final SingleResultCallback<T> callback) {
isTrue("open", !isClosed.get());
wrapped.sendAndReceiveAsync(message, decoder, sessionContext, new SingleResultCallback<T>() {
@Override
public void onResult(final T result, final Throwable t) {
if (t != null) {
incrementGenerationOnSocketException(PooledConnection.this, t);
}
callback.onResult(result, t);
}
});
}
@Override
public ResponseBuffers receiveMessage(final int responseTo) {
isTrue("open", !isClosed.get());
try {
return wrapped.receiveMessage(responseTo);
} catch (MongoException e) {
incrementGenerationOnSocketException(this, e);
throw e;
}
}
@Override
public void sendMessageAsync(final List<ByteBuf> byteBuffers, final int lastRequestId, final SingleResultCallback<Void> callback) {
isTrue("open", !isClosed.get());
wrapped.sendMessageAsync(byteBuffers, lastRequestId, new SingleResultCallback<Void>() {
@Override
public void onResult(final Void result, final Throwable t) {
if (t != null) {
incrementGenerationOnSocketException(PooledConnection.this, t);
}
callback.onResult(null, t);
}
});
}
@Override
public void receiveMessageAsync(final int responseTo, final SingleResultCallback<ResponseBuffers> callback) {
isTrue("open", !isClosed.get());
wrapped.receiveMessageAsync(responseTo, new SingleResultCallback<ResponseBuffers>() {
@Override
public void onResult(final ResponseBuffers result, final Throwable t) {
if (t != null) {
incrementGenerationOnSocketException(PooledConnection.this, t);
}
callback.onResult(result, t);
}
});
}
@Override
public ConnectionDescription getDescription() {
isTrue("open", !isClosed.get());
return wrapped.getDescription();
}
}
private class UsageTrackingInternalConnectionItemFactory implements ConcurrentPool.ItemFactory<UsageTrackingInternalConnection> {
private final InternalConnectionFactory internalConnectionFactory;
UsageTrackingInternalConnectionItemFactory(final InternalConnectionFactory internalConnectionFactory) {
this.internalConnectionFactory = internalConnectionFactory;
}
@Override
public UsageTrackingInternalConnection create(final boolean initialize) {
UsageTrackingInternalConnection internalConnection =
new UsageTrackingInternalConnection(internalConnectionFactory.create(serverId), generation.get());
if (initialize) {
internalConnection.open();
}
connectionPoolListener.connectionAdded(new ConnectionAddedEvent(getId(internalConnection)));
return internalConnection;
}
@Override
public void close(final UsageTrackingInternalConnection connection) {
if (!closed) {
connectionPoolListener.connectionRemoved(new ConnectionRemovedEvent(getId(connection)));
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info(format("Closed connection [%s] to %s because %s.", getId(connection), serverId.getAddress(),
getReasonForClosing(connection)));
}
connection.close();
}
private String getReasonForClosing(final UsageTrackingInternalConnection connection) {
String reason;
if (connection.isClosed()) {
reason = "there was a socket exception raised by this connection";
} else if (fromPreviousGeneration(connection)) {
reason = "there was a socket exception raised on another connection from this pool";
} else if (pastMaxLifeTime(connection)) {
reason = "it is past its maximum allowed life time";
} else if (pastMaxIdleTime(connection)) {
reason = "it is past its maximum allowed idle time";
} else {
reason = "the pool has been closed";
}
return reason;
}
@Override
public Prune shouldPrune(final UsageTrackingInternalConnection usageTrackingConnection) {
return DefaultConnectionPool.this.shouldPrune(usageTrackingConnection) ? Prune.YES : Prune.NO;
}
}
}
6.最后来看一下,真正的底层连接池ConcurrentPool
ConcurrentPool使用ConcurrentLinkedDeque来保存连接,通过Semaphore来控制最大连接数,当需要获取连接时,首先获取Semaphore,双向队列的队尾有元素,则poll出,无则新建连接,使用完成后将元素放回队尾,并释放Semaphore.
public class ConcurrentPool<T> implements Pool<T> {
private final int maxSize;
private final ItemFactory<T> itemFactory;
private final ConcurrentLinkedDeque<T> available = new ConcurrentLinkedDeque<T>();
private final Semaphore permits;
private volatile boolean closed;
public enum Prune {
/**
* Prune this element
*/
YES,
/**
* Don't prone this element
*/
NO,
/**
* Don't prune this element and stop attempting to prune additional elements
*/
STOP
}
/**
* Factory for creating and closing pooled items.
*
* @param <T>
*/
public interface ItemFactory<T> {
T create(boolean initialize);
void close(T t);
Prune shouldPrune(T t);
}
/**
* Initializes a new pool of objects.
*
* @param maxSize max to hold to at any given time. if < 0 then no limit
* @param itemFactory factory used to create and close items in the pool
*/
public ConcurrentPool(final int maxSize, final ItemFactory<T> itemFactory) {
this.maxSize = maxSize;
this.itemFactory = itemFactory;
permits = new Semaphore(maxSize, true);
}
/**
* Return an instance of T to the pool. This method simply calls {@code release(t, false)}
*
* @param t item to return to the pool
*/
@Override
public void release(final T t) {
release(t, false);
}
/**
* call done when you are done with an object from the pool if there is room and the object is ok will get added
*
* @param t item to return to the pool
* @param prune true if the item should be closed, false if it should be put back in the pool
*/
@Override
public void release(final T t, final boolean prune) {
if (t == null) {
throw new IllegalArgumentException("Can not return a null item to the pool");
}
if (closed) {
close(t);
return;
}
if (prune) {
close(t);
} else {
available.addLast(t);
}
releasePermit();
}
/**
* Gets an object from the pool. This method will block until a permit is available.
*
* @return An object from the pool.
*/
@Override
public T get() {
return get(-1, TimeUnit.MILLISECONDS);
}
/**
* Gets an object from the pool - will block if none are available
*
* @param timeout negative - forever 0 - return immediately no matter what positive ms to wait
* @param timeUnit the time unit of the timeout
* @return An object from the pool, or null if can't get one in the given waitTime
* @throws MongoTimeoutException if the timeout has been exceeded
*/
@Override
public T get(final long timeout, final TimeUnit timeUnit) {
if (closed) {
throw new IllegalStateException("The pool is closed");
}
if (!acquirePermit(timeout, timeUnit)) {
throw new MongoTimeoutException(String.format("Timeout waiting for a pooled item after %d %s", timeout, timeUnit));
}
T t = available.pollLast();
if (t == null) {
t = createNewAndReleasePermitIfFailure(false);
}
return t;
}
public void prune() {
for (RemovalReportingIterator<T> iter = available.iterator(); iter.hasNext();) {
T cur = iter.next();
Prune shouldPrune = itemFactory.shouldPrune(cur);
if (shouldPrune == Prune.STOP) {
break;
}
if (shouldPrune == Prune.YES) {
boolean removed = iter.reportingRemove();
if (removed) {
close(cur);
}
}
}
}
public void ensureMinSize(final int minSize, final boolean initialize) {
while (getCount() < minSize) {
if (!acquirePermit(10, TimeUnit.MILLISECONDS)) {
break;
}
release(createNewAndReleasePermitIfFailure(initialize));
}
}
private T createNewAndReleasePermitIfFailure(final boolean initialize) {
try {
T newMember = itemFactory.create(initialize);
if (newMember == null) {
throw new MongoInternalException("The factory for the pool created a null item");
}
return newMember;
} catch (RuntimeException e) {
permits.release();
throw e;
}
}
protected boolean acquirePermit(final long timeout, final TimeUnit timeUnit) {
try {
if (closed) {
return false;
} else if (timeout >= 0) {
return permits.tryAcquire(timeout, timeUnit);
} else {
permits.acquire();
return true;
}
} catch (InterruptedException e) {
throw new MongoInterruptedException("Interrupted acquiring a permit to retrieve an item from the pool ", e);
}
}
protected void releasePermit() {
permits.release();
}
/**
* Clears the pool of all objects.
*/
@Override
public void close() {
closed = true;
Iterator<T> iter = available.iterator();
while (iter.hasNext()) {
T t = iter.next();
close(t);
iter.remove();
}
}
public int getMaxSize() {
return maxSize;
}
public int getInUseCount() {
return maxSize - permits.availablePermits();
}
public int getAvailableCount() {
return available.size();
}
public int getCount() {
return getInUseCount() + getAvailableCount();
}
public String toString() {
StringBuilder buf = new StringBuilder();
buf.append("pool: ")
.append(" maxSize: ").append(maxSize)
.append(" availableCount ").append(getAvailableCount())
.append(" inUseCount ").append(getInUseCount());
return buf.toString();
}
// swallow exceptions from ItemFactory.close()
private void close(final T t) {
try {
itemFactory.close(t);
} catch (RuntimeException e) {
// ItemFactory.close() really should not throw
}
}
}
7.最后我们来看一下connectionsPerHost和threadsAllowedToBlockForConnectionMultiplier
connectionsPerHost是指允许本地连接到集群中一台主机的最大连接数,也就是上文中连接池的最大值(Semaphore的值)。
threadsAllowedToBlockForConnectionMultiplier代表一个系数,举个例子connectionsPerHost=10,threadsAllowedToBlockForConnectionMultiplier=5,则表示最多允许有5*10=50个线程去连接池申请连接(当然最大仅能有10个获取到连接,其余的全在阻塞等待),当第51个线程想去获取连接时,直接拒绝。
最后来看一下几个重要的日志点:
InternalStreamConnection LOGGER.info(format("Opened connection [%s] to %s", getId(), serverId.getAddress()));
DefaultConnectionPool LOGGER.warn(format("Got socket exception on connection [%s] to %s. All connections to %s will be closed.",getId(connection), serverId.getAddress(), serverId.getAddress()));