ssh 默认的连接数量有限,当大量请求连接 ssh 时会概率性连接失败甚至直接失败,因此需要对连接池化,当然如果不要求实时的话可以用生产者消费者。
<dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.8.1</version> </dependency>
DefaultPooledObject
。BasePooledObjectFactory
。GenericObjectPoolConfig
等等,常用参数如下。
0
。8
。8
。-1
(负值时为无限等待)。false
。false
。false
。-1
(负值时为无限等待)。GenericObjectPool
,它可以配置 PooledObjectFactory
(对象工厂)、 GenericObjectPoolConfig
(对象池的配置)以及 AbandonedConfig
(废弃对象检测,可以不配置)。一个最简单的使用步骤为:
PooledObject
。这里使用默认实现 DefaultPooledObject
。PooledObjectFactory
。新建 CustomPooledObjectFactory
,继承 BasePooledObjectFactory<Object>
,并实现 create()
和 wrap(Object obj)
方法,其中 create()
方法返回的是我们想要放入和借出的对象;wrap(Object obj)
方法返回的是通过 PooledObject
包装的 Object,这里可以使用默认实现 new DefaultPooledObject<Object>(obj)
。GenericObjectPoolConfig<Object>
,参数通过对应的 set
方法传入。GenericObjectPool
,构造函数为 public GenericObjectPool(final PooledObjectFactory<T> factory, final GenericObjectPoolConfig<T> config)
,第一个参数传入第二步创建的 CustomPooledObjectFactory
,第二个参数传入第三步创建的 GenericObjectPoolConfig
。代码如下,其中 main
方法为创建对象池的 demo 和使用对象池的官方 demo:
import org.apache.commons.pool2.BasePooledObjectFactory; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; public class CustomPooledObjectFactory extends BasePooledObjectFactory<Object> { @Override public Object create() throws Exception { return new Object(); } @Override public PooledObject<Object> wrap(Object obj) { return new DefaultPooledObject<>(obj); } public static void main(String[] args) { final GenericObjectPool<Object> objectPool = new GenericObjectPool<>(new CustomPooledObjectFactory(), new GenericObjectPoolConfig<>()); Object obj = null; try { obj = objectPool.borrowObject(); try { // 使用对象 } catch(Exception e) { // 使对象无效 objectPool.invalidateObject(obj); // 不要将对象返回到池中两次 obj = null; } finally { // 确保对象返回到池中 if(null != obj) { objectPool.returnObject(obj); } } } catch(Exception e) { // 从池中借出对象失败 } } }
sftp 是通过 ssh 完成安全的传输的,而 ssh 的连接数是有限的,具体的 ssh 连接参数见 /etc/ssh/sshd_config
的 MaxStartups
参数,默认为 10
或 10:30:100
,含义如下:
10:30:100
这种格式时表示 100% 可以连接成功的连接数。因此为了尽可能少的创建新连接就需要使用连接池。
<dependency> <groupId>com.jcraft</groupId> <artifactId>jsch</artifactId> <version>0.1.55</version> </dependency>
首先创建被池化的对象 SftpClient
,其中 originalDir
属性用于保存登录时的原始目录,validateConnect
方法用于验证连接,disconnect
方法用于销毁连接。
package com.haibara.toys.sftp.core; import com.jcraft.jsch.*; import lombok.Data; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Properties; import java.util.concurrent.atomic.AtomicLong; /** * @author haibara */ @Data public class SftpClient { private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); private static final AtomicLong CLIENT_NUMBER = new AtomicLong(1L); private ChannelSftp channelSftp; private Session session; /** * */ private String clientInfo = "sftpclient"; /** * ssh 根目录。 * 用于判断是否成功返回连接到连接池的条件之一 */ private String originalDir; public SftpClient(SftpProperties sftpProperties) throws SftpException, JSchException { try { JSch jsch = new JSch(); session = jsch.getSession(sftpProperties.getUsername(), sftpProperties.getHost(), sftpProperties.getPort()); session.setPassword(sftpProperties.getPassword()); Properties config = new Properties(); if (sftpProperties.getSession() != null) { sftpProperties.getSession().forEach(config::put); } session.setConfig(config); session.connect(); channelSftp = (ChannelSftp) session.openChannel("sftp"); channelSftp.connect(); clientInfo += CLIENT_NUMBER.getAndIncrement() + ",createTime:" + DATE_TIME_FORMATTER.format(LocalDateTime.now()); originalDir = channelSftp.pwd(); } catch (Exception e) { disconnect(); throw e; } } public void disconnect() { if (channelSftp != null) { try { channelSftp.disconnect(); } catch (Exception ignored) { } } if (session != null) { try { session.disconnect(); } catch (Exception ignored) { } } } public boolean validateConnect() { try { return session.isConnected() && channelSftp.isConnected() && originalDir.equals(channelSftp.pwd()); } catch (Exception e) { return false; } } }
接下来和前面一样分 4 步创建。
创建被包装的对象,同样使用默认的 DefaultPooledObject
。
创建对象工厂 SftpFactory
,重写 validateObject
和 destroyObject
方法为 SftpClient
的验证和销毁方法,SftpFactory
代码见第 4 步 SftpPool
的静态内部类 SftpFactory
。
创建连接池配置(以及 sftp 的配置) SftpProperties
类和 GenericObjectPoolConfig
(连接池配置),GenericObjectPoolConfig
创建代码见第 4 步 SftpPool
的 getPoolConfig
方法 。
package com.haibara.toys.sftp.core; import lombok.Data; import java.util.Map; /** * @author haibara */ @Data public class SftpProperties { /** * 地址 */ private String host = "localhost"; /** * 端口号 */ private int port = 22; /** * 用户名 */ private String username; /** * 密码 */ private String password; /** * Session 参数配置 */ private Map<String, String> session; /** * 连接池配置 */ private Pool pool; /** * 连接池配置类 */ @Data public static class Pool { /** * 池中最小的连接数,只有当 timeBetweenEvictionRuns 为正时才有效 */ private int minIdle = 0; /** * 池中最大的空闲连接数,为负值时表示无限 */ private int maxIdle = 8; /** * 池可以产生的最大对象数,为负值时表示无限 */ private int maxActive = 16; /** * 当池耗尽时,阻塞的最长时间,为负值时无限等待 */ private long maxWait = -1; /** * 从池中取出对象是是否检测可用 */ private boolean testOnBorrow = true; /** * 将对象返还给池时检测是否可用 */ private boolean testOnReturn = false; /** * 检查连接池对象是否可用 */ private boolean testWhileIdle = true; /** * 距离上次空闲线程检测完成多久后再次执行 */ private long timeBetweenEvictionRuns = 300000L; } }
yml 中对应的配置如下:
sftp: host: 127.0.0.1 port: 22 username: root password: 123456 session: StrictHostKeyChecking: no kex: diffie-hellman-group1-sha1,diffie-hellman-group-exchange-sha1,diffie-hellman-group-exchange-sha256 pool: max-idle: 8 min-idle: 1 max-active: 16 max-wait: 150000 test-on-borrow: true test-on-return: false test-while-idle: true time-between-eviction-runs: 120000
SftpPool
。需要注意的一点是返还给连接池的连接要和新连接的状态相同,因此重写 GenericObjectPool
的 returnObject
方法,在原来的 returnObject
前恢复(还原当前目录为初始目录等等),同时在 SftpClient
的 validateConnect
方法中添加判断是否恢复的条件(判断当前目录是否是初始目录等等)。package com.haibara.toys.sftp.core; import com.jcraft.jsch.SftpException; import lombok.SneakyThrows; import org.apache.commons.pool2.BasePooledObjectFactory; import org.apache.commons.pool2.ObjectPool; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import java.util.NoSuchElementException; /** * @author haibara */ public class SftpPool implements ObjectPool<SftpClient> { private final GenericObjectPool<SftpClient> internalPool; public SftpPool(SftpProperties sftpProperties) { this.internalPool = new GenericObjectPool<SftpClient>(new SftpFactory(sftpProperties), getPoolConfig(sftpProperties.getPool())){ @Override public void returnObject(SftpClient sftpClient) { try { sftpClient.getChannelSftp().cd(sftpClient.getOriginalDir()); } catch (Exception ignored) { } super.returnObject(sftpClient); } }; } @Override public void addObject() throws Exception, IllegalStateException, UnsupportedOperationException { internalPool.addObject(); } @Override public SftpClient borrowObject() throws Exception, NoSuchElementException, IllegalStateException { return internalPool.borrowObject(); } @Override public void clear() throws Exception, UnsupportedOperationException { internalPool.clear(); } @Override public void close() { internalPool.close(); } @Override public int getNumActive() { return internalPool.getNumActive(); } @Override public int getNumIdle() { return internalPool.getNumIdle(); } @Override public void invalidateObject(SftpClient obj) throws Exception { internalPool.invalidateObject(obj); } @Override public void returnObject(SftpClient obj) { internalPool.returnObject(obj); } private static class SftpFactory extends BasePooledObjectFactory<SftpClient> { private final SftpProperties sftpProperties; public SftpFactory(SftpProperties sftpProperties) { this.sftpProperties = sftpProperties; } @Override public SftpClient create() throws Exception { return new SftpClient(sftpProperties); } @Override public PooledObject<SftpClient> wrap(SftpClient sftpClient) { return new DefaultPooledObject<>(sftpClient); } @Override public boolean validateObject(PooledObject<SftpClient> p) { return p.getObject().validateConnect(); } @Override public void destroyObject(PooledObject<SftpClient> p) { p.getObject().disconnect(); } } private GenericObjectPoolConfig<SftpClient> getPoolConfig(SftpProperties.Pool properties) { if (properties == null) { properties = new SftpProperties.Pool(); } GenericObjectPoolConfig<SftpClient> config = new GenericObjectPoolConfig<>(); config.setMinIdle(properties.getMinIdle()); config.setMaxIdle(properties.getMaxIdle()); config.setMaxTotal(properties.getMaxActive()); config.setMaxWaitMillis(properties.getMaxWait()); config.setTestOnBorrow(properties.isTestOnBorrow()); config.setTestOnReturn(properties.isTestOnReturn()); config.setTestWhileIdle(properties.isTestWhileIdle()); config.setTimeBetweenEvictionRunsMillis(properties.getTimeBetweenEvictionRuns()); return config; } }
最后只需要创建一个 SftpPool
对象就好了,使用代码如下(SftpTemplate
是对 SftpClient
封装的工具类):
@Test void testSftp() { String localFile = ""; String remotePath = ""; String fileName = ""; SftpClient sftpClient = null; try (FileInputStream fileInputStream = new FileInputStream(localFile)) { try { sftpClient = sftpPool.borrowObject(); SftpTemplate sftpTemplate = new SftpTemplate(sftpClient); sftpTemplate.cd(remotePath); sftpTemplate.upload(fileInputStream, fileName); } catch (Exception e) { sftpPool.invalidateObject(sftpClient); sftpClient = null; } finally { if (null != sftpClient) { sftpPool.returnObject(sftpClient); } } } catch (Exception e) { // 从池中借出对象失败 } }
具体代码见 hligaty/Toys 。
apache common pool2原理与实战 - 海向 - 博客园 (cnblogs.com)