
Hikari Connection Pool: A Source Code Walkthrough


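Preface

A few years ago, the first database connection pool I worked with was C3P0, and later Alibaba's Druid. After Spring Boot 2.0 chose HikariCP as its default database connection pool, however, HikariCP stepped into the spotlight as a rising newcomer, and its speed and performance have won over more and more developers.

In day-to-day development, the database has always been a hotspot for alerts, and the component that talks to the database is the Hikari connection pool. Being able to read Hikari's alert logs and pinpoint the cause of an exception is an essential skill in practice!

This article analyzes the source code of Hikari 2.7.9 to help you understand how Hikari works and learn to handle production issues!

Source code: https://gitee.com/mirrors/hikaricp/tree/HikariCP-2.7.9/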

1. Concepts

Before learning a technology, it helps to first see where it sits in the bigger picture. For example, where does HikariCP, today's topic, sit?


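Take a Spring Boot project as an example: the Service layer holds our business code, the component that talks to the database is the ORM framework (e.g. MyBatis), the layer below the ORM framework is the Hikari connection pool, below Hikari is the MySQL driver, and below the driver is the MySQL server. With this big picture in mind, learning Hikari becomes much less confusing.

Next, we need to understand what a database connection pool is for.

Simply put, a database connection pool allocates, manages, and releases database connections. With a pool, connections can be reused, which avoids the overhead of repeatedly opening and closing them and improves system performance. The pool also retires expired connections, avoiding the exceptions that come from using connections that are no longer valid.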
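To make the reuse idea concrete, here is a minimal sketch of a pool built on the JDK's ArrayBlockingQueue. It is not how Hikari is implemented (Hikari's real pool is ConcurrentBag); SimplePool, borrow and giveBack are hypothetical names chosen for illustration. Resources are created once, handed out with a bounded wait, and put back for the next caller instead of being closed.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical minimal pool: resources are created once up front and then reused.
final class SimplePool<R> {
    private final BlockingQueue<R> idle;

    SimplePool(int size, Supplier<R> factory) {
        this.idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.get()); // pay the creation cost only once per resource
        }
    }

    // Borrow a resource, waiting at most timeoutMs; returns null on timeout.
    R borrow(long timeoutMs) {
        try {
            return idle.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
    }

    // Return a resource so the next borrower reuses it instead of creating a new one.
    void giveBack(R resource) {
        idle.offer(resource);
    }

    int idleCount() {
        return idle.size();
    }
}
```

A borrower that finds the pool empty waits up to timeoutMs and then gets null; in Hikari the corresponding situation surfaces as a "Connection is not available, request timed out" exception.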

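As for Hikari, it is a "zero-overhead", production-ready JDBC connection pool. At roughly 130 KB, the library is very light.

2. Configuration

Let's first look at which parameters a production Hikari pool configuration needs.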

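@Bean("dataSource")
public DataSource dataSource() {
    HikariConfig cfg = new HikariConfig();
    // Whether connections borrowed from the pool auto-commit by default (enabled by default)
    cfg.setAutoCommit(false);
    // How long to wait when borrowing a connection from the pool
    cfg.setConnectionTimeout();
    // MySQL connection settings
    cfg.setJdbcUrl();
    cfg.setDriverClassName();
    cfg.setUsername();
    cfg.setPassword();
    // Maximum pool size
    cfg.setMaximumPoolSize();
    // Minimum number of idle connections; the official docs recommend leaving it unset so it equals MaximumPoolSize, for the best performance and responsiveness to demand spikes
    // cfg.setMinimumIdle();
    // Pool name, used in logs and monitoring; give each data source a distinct name
    cfg.setPoolName();
    // Maximum lifetime of a connection in the pool; should be comfortably shorter than the database's wait_timeout
    cfg.setMaxLifetime();
    // Maximum time a connection may sit idle in the pool; only applies when minimumIdle is less than maximumPoolSize (so it has no effect in this configuration)
    cfg.setIdleTimeout();
    // Connection leak detection threshold; default 0 (disabled)
    // cfg.setLeakDetectionThreshold();
    // Timeout for testing whether a connection is alive; default 5 seconds
    // cfg.setValidationTimeout();
    // MySQL driver properties
    // Character encoding
    cfg.addDataSourceProperty("characterEncoding",);
    cfg.addDataSourceProperty("useUnicode",);
    // Newer MySQL versions support server-side prepared statements
    cfg.addDataSourceProperty("useServerPrepStmts",);
    // Enable the prepared statement cache
    cfg.addDataSourceProperty("cachePrepStmts",);
    // Number of prepared statements to cache
    cfg.addDataSourceProperty("prepStmtCacheSize",);
    // Maximum length of a SQL statement that will be cached, default 256
    // prepStmtCacheSqlLimit
    return new HikariDataSource(cfg);
}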

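Official configuration reference: https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby

3. Source Code Analysis
1) Entry points

Getting started is always the hardest part: once you have downloaded the Hikari source, where should you begin? The following two entry points are a good place to start.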

// 1. Initialization entry point
new HikariDataSource(cfg)

// 2. Obtaining a connection
public interface DataSource  extends CommonDataSource, Wrapper {
  Connection getConnection() throws SQLException;
}
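2) Initialization

Initialization does two things: first, it validates the configuration and corrects settings that do not conform; second, it instantiates the Hikari pool.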

public HikariDataSource(HikariConfig configuration)
   {
      // 1. Validate and correct the configuration
      configuration.validate();
      configuration.copyStateTo(this);

      LOGGER.info("{} - Starting...", configuration.getPoolName());
      // 2. Create the pool; note that fastPathPool is set here as well
      pool = fastPathPool = new HikariPool(this);
      LOGGER.info("{} - Start completed.", configuration.getPoolName());

      this.seal();
   }

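Configuration correction
Invalid settings caught by validation throw an exception immediately, so they are easy to spot. Most pitfalls come from the correction step instead: out-of-range values are silently replaced or disabled (Hikari only logs a WARN), so your setting may not take effect.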

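private void validateNumerics() {
      // maxLifetime has a minimum of 30 seconds; a smaller value is replaced by the default (30 minutes)
      if (maxLifetime != 0 && maxLifetime < SECONDS.toMillis(30)) {
         LOGGER.warn("{} - maxLifetime is less than 30000ms, setting to default {}ms.", poolName, MAX_LIFETIME);
         maxLifetime = MAX_LIFETIME;
      }
      // idleTimeout within 1 second of maxLifetime, or above it, is disabled
      if (idleTimeout + SECONDS.toMillis(1) > maxLifetime && maxLifetime > 0) {
         LOGGER.warn("{} - idleTimeout is close to or more than maxLifetime, disabling it.", poolName);
         idleTimeout = 0;
      }
      // idleTimeout has a minimum of 10 seconds; a smaller value is replaced by the default (10 minutes)
      if (idleTimeout != 0 && idleTimeout < SECONDS.toMillis(10)) {
         LOGGER.warn("{} - idleTimeout is less than 10000ms, setting to default {}ms.", poolName, IDLE_TIMEOUT);
         idleTimeout = IDLE_TIMEOUT;
      }
      // Leak detection is disabled if the threshold is below 2 seconds or above maxLifetime
      if (leakDetectionThreshold > 0 && !unitTest) {
         if (leakDetectionThreshold < SECONDS.toMillis(2) || (leakDetectionThreshold > maxLifetime && maxLifetime > 0)) {
            LOGGER.warn("{} - leakDetectionThreshold is less than 2000ms or more than maxLifetime, disabling it.", poolName);
            leakDetectionThreshold = 0;
         }
      }
      // Maximum wait when borrowing from the pool: default 30 seconds; values below 250 ms are replaced by the default
      if (connectionTimeout < 250) {
         LOGGER.warn("{} - connectionTimeout is less than 250ms, setting to {}ms.", poolName, CONNECTION_TIMEOUT);
         connectionTimeout = CONNECTION_TIMEOUT;
      }
      // validationTimeout below 250 ms is replaced by the default (5 seconds)
      if (validationTimeout < 250) {
         LOGGER.warn("{} - validationTimeout is less than 250ms, setting to {}ms.", poolName, VALIDATION_TIMEOUT);
         validationTimeout = VALIDATION_TIMEOUT;
      }
      if (maxPoolSize < 1) {
         maxPoolSize = (minIdle <= 0) ? DEFAULT_POOL_SIZE : minIdle;
      }
      // An unset (negative) minIdle, or one larger than maxPoolSize, becomes maxPoolSize
      if (minIdle < 0 || minIdle > maxPoolSize) {
         minIdle = maxPoolSize;
      }
   }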
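The correction rules can be exercised in isolation. The following sketch re-implements three of them on plain fields; the class CorrectedTimeouts is hypothetical, and its default constants mirror HikariCP's documented defaults (maxLifetime 30 minutes, idleTimeout 10 minutes, connectionTimeout 30 seconds). It shows how a too-small value is silently replaced rather than rejected.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical re-implementation of part of validateNumerics(), for illustration only.
final class CorrectedTimeouts {
    static final long MAX_LIFETIME = TimeUnit.MINUTES.toMillis(30);
    static final long IDLE_TIMEOUT = TimeUnit.MINUTES.toMillis(10);
    static final long CONNECTION_TIMEOUT = TimeUnit.SECONDS.toMillis(30);

    long maxLifetime;
    long idleTimeout;
    long connectionTimeout;

    CorrectedTimeouts(long maxLifetime, long idleTimeout, long connectionTimeout) {
        this.maxLifetime = maxLifetime;
        this.idleTimeout = idleTimeout;
        this.connectionTimeout = connectionTimeout;
        correct();
    }

    private void correct() {
        // maxLifetime below 30 s is replaced by the default (0 means "no limit" and is kept)
        if (maxLifetime != 0 && maxLifetime < TimeUnit.SECONDS.toMillis(30)) {
            maxLifetime = MAX_LIFETIME;
        }
        // idleTimeout within 1 s of maxLifetime (or above it) is disabled
        if (idleTimeout + TimeUnit.SECONDS.toMillis(1) > maxLifetime && maxLifetime > 0) {
            idleTimeout = 0;
        }
        // idleTimeout below 10 s is replaced by the default
        if (idleTimeout != 0 && idleTimeout < TimeUnit.SECONDS.toMillis(10)) {
            idleTimeout = IDLE_TIMEOUT;
        }
        // connectionTimeout below 250 ms is replaced by the default
        if (connectionTimeout < 250) {
            connectionTimeout = CONNECTION_TIMEOUT;
        }
    }
}
```

For example, maxLifetime = 20 s, idleTimeout = 5 s and connectionTimeout = 100 ms end up as 30 minutes, 10 minutes and 30 seconds respectively, while idleTimeout = 60 s next to maxLifetime = 60 s is switched off entirely.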

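Creating the pool
Walking through how the pool is instantiated shows how much Hikari's author likes asynchronous operations: housekeeping, adding connections, closing connections, leak detection, and more all run asynchronously.

This step creates a LinkedBlockingQueue. Note that this queue is not the actual pool of connections; it only holds requests to add connections.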

public HikariPool(final HikariConfig config)
   {
      super(config);
      // Create the ConcurrentBag that manages the pool. It offers four key operations: borrow (get a connection), requite (return a connection), add (add a connection) and remove (remove a connection).
      this.connectionBag = new ConcurrentBag<>(this);
      // Concurrency control for getConnection (pool suspension); disabled by default
      this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;
      // Housekeeping thread pool that runs the scheduled tasks
      this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();
      // Fail-fast check: create one connection up front
      checkFailFast();
      // Metrics collection
      if (config.getMetricsTrackerFactory() != null) {
         setMetricsTrackerFactory(config.getMetricsTrackerFactory());
      }
      else {
         setMetricRegistry(config.getMetricRegistry());
      }
      // Health check registration; none by default
      setHealthCheckRegistry(config.getHealthCheckRegistry());
      // JMX monitoring
      registerMBeans(this);

      ThreadFactory threadFactory = config.getThreadFactory();
      // Create a LinkedBlockingQueue of capacity maxPoolSize, used to build addConnectionExecutor
      LinkedBlockingQueue<Runnable> addConnectionQueue = new LinkedBlockingQueue<>(config.getMaximumPoolSize());
      // Read-only view of the queue
      this.addConnectionQueue = unmodifiableCollection(addConnectionQueue);
      // Executor that adds connections: a single thread; rejected tasks are silently discarded
      this.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardPolicy());
      // Executor that closes connections: a single thread; when rejected, the calling thread runs the task itself
      this.closeConnectionExecutor = createThreadPoolExecutor(config.getMaximumPoolSize(), poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
      // Factory for leak-detection tasks; using it only requires passing in a connection entry
      this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService);
      // After a 100 ms initial delay, run the HouseKeeper every 30 s (HOUSEKEEPING_PERIOD_MS)
      this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, HOUSEKEEPING_PERIOD_MS, MILLISECONDS);
   }
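To see what a single-threaded executor with a bounded queue and DiscardPolicy does, here is a stdlib-only sketch. It mirrors the shape of addConnectionExecutor (one thread, with the queue capacity standing in for maximumPoolSize), but the class DiscardingAdder and its parameters are hypothetical. While the single worker is busy, at most queueCapacity requests wait; any further requests are dropped without an exception.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo of a 1-thread executor with a bounded queue and DiscardPolicy.
final class DiscardingAdder {
    // Submits `requests` tasks while the single worker is blocked; returns how many actually ran.
    static int executedCount(int queueCapacity, int requests) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.DiscardPolicy()); // rejected tasks are dropped silently
        CountDownLatch gate = new CountDownLatch(1);
        AtomicInteger executed = new AtomicInteger();
        Runnable task = () -> {
            try {
                gate.await(); // keep the only worker busy so later tasks must queue
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            executed.incrementAndGet();
        };
        for (int i = 0; i < requests; i++) {
            executor.execute(task); // 1 runs, up to queueCapacity wait, the rest are discarded
        }
        gate.countDown();
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed.get();
    }
}
```

With a queue capacity of 2 and 6 requests, only 3 tasks run (one on the worker, two from the queue). Presumably this is acceptable for Hikari because queued add requests beyond maximumPoolSize could never produce a usable connection anyway.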
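3) Obtaining a connection

Obtaining a connection in Hikari takes two steps. First, it calls connectionBag.borrow() to take a connection from the pool, waiting up to connectionTimeout. Second, once it has a connection that has been idle for more than 500 ms, it checks whether the connection is still usable: an unusable connection is closed, while a usable one gets a scheduled task attached for leak detection.

You will often see a Connection is not available error in the exception log whose request timed out duration is far longer than connectionTimeout. A close reading of the source shows this is expected: the elapsed time also includes validating (and discarding) borrowed connections, and the remaining timeout is only recomputed after each such check.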

HikariDataSource

@Override
   public Connection getConnection() throws SQLException
   {
      if (isClosed()) {
         throw new SQLException("HikariDataSource " + this + " has been closed.");
      }
      // When constructed via HikariDataSource(HikariConfig), fastPathPool is already set, so this returns directly
      if (fastPathPool != null) {
         return fastPathPool.getConnection();
      }

      // See http://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java
      HikariPool result = pool;
      if (result == null) {
         synchronized (this) {
            result = pool;
            if (result == null) {
               validate();
               LOGGER.info("{} - Starting...", getPoolName());
               try {
                  pool = result = new HikariPool(this);
                  this.seal();
               }
               catch (PoolInitializationException pie) {
                  if (pie.getCause() instanceof SQLException) {
                     throw (SQLException) pie.getCause();
                  }
                  else {
                     throw pie;
                  }
               }
               LOGGER.info("{} - Start completed.", getPoolName());
            }
         }
      }

      return result.getConnection();
   }
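The lazy path above is classic double-checked locking. Below is a minimal stdlib-only sketch of the same idiom; LazyHolder and its creation counter are hypothetical. The field must be volatile for the idiom to be safe under the Java memory model, which is why HikariDataSource declares its pool field volatile.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch of double-checked lazy initialization.
final class LazyHolder<T> {
    private final Supplier<T> factory;
    private final AtomicInteger creations = new AtomicInteger();
    private volatile T instance; // volatile makes the published object safely visible

    LazyHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        T result = instance;              // first check, without locking
        if (result == null) {
            synchronized (this) {
                result = instance;        // second check, under the lock
                if (result == null) {
                    creations.incrementAndGet();
                    instance = result = factory.get();
                }
            }
        }
        return result;
    }

    int creations() {
        return creations.get();
    }
}
```

Only the very first callers ever touch the lock; once the instance is published, every call is a single volatile read, which is the same trade-off the fastPathPool shortcut makes even cheaper.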
HikariPool
   public Connection getConnection() throws SQLException
   {
      // Pass in the configured connectionTimeout
      return getConnection(connectionTimeout);
   }

   public Connection getConnection(final long hardTimeout) throws SQLException
   {
      suspendResumeLock.acquire(); // pool suspension control, disabled by default
      final long startTime = currentTime();

      try {
         long timeout = hardTimeout;
         do {
            // Wait up to the remaining timeout (connectionTimeout on the first pass); null means it timed out
            PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
            if (poolEntry == null) {
               break; // We timed out... break and throw exception
            }

            final long now = currentTime();
            // Close connections already marked as evicted, or connections idle for more than 500 ms that fail validation (bounded by validationTimeout, 5 seconds by default)
            if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > ALIVE_BYPASS_WINDOW_MS && !isConnectionAlive(poolEntry.connection))) {
               closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
               timeout = hardTimeout - elapsedMillis(startTime);
            }
            else {
               metricsTracker.recordBorrowStats(poolEntry, startTime);
               // Schedule the leak-detection task first, then create the proxy connection (a Javassist-generated proxy)
               return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
            }
         } while (timeout > 0L);

         metricsTracker.recordBorrowTimeoutStats(startTime);
         // Throws "Connection is not available, request timed out after {}ms."
         throw createTimeoutException(startTime);
      }
      catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new SQLException(poolName + " - Interrupted during connection acquisition", e);
      }
      finally {
         suspendResumeLock.release();
      }
   }
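A simplified arithmetic model of this loop shows why the reported time can exceed connectionTimeout: the remaining budget is only recomputed after a dead connection has been validated and closed, so a slow validation near the end of the window is counted in full. The class TimeoutModel is hypothetical, and it assumes every borrowed connection is dead and takes exactly validationCostMs to detect; the real pool measures wall-clock time instead.

```java
// Hypothetical arithmetic model of the do/while loop in HikariPool.getConnection(hardTimeout).
final class TimeoutModel {
    /**
     * @param hardTimeoutMs    connectionTimeout
     * @param borrowWaitMs     how long each borrow() waits before a (dead) connection shows up
     * @param validationCostMs how long isConnectionAlive() takes to report the connection as dead
     * @return elapsed milliseconds when the pool gives up and throws the timeout exception
     */
    static long elapsedBeforeTimeout(long hardTimeoutMs, long borrowWaitMs, long validationCostMs) {
        if (borrowWaitMs + validationCostMs <= 0) {
            throw new IllegalArgumentException("each iteration must consume some time");
        }
        long elapsed = 0;
        long timeout = hardTimeoutMs;
        do {
            if (borrowWaitMs > timeout) {      // borrow() times out and returns null
                elapsed += timeout;
                break;
            }
            elapsed += borrowWaitMs;           // a connection was borrowed ...
            elapsed += validationCostMs;       // ... but validation fails, so it is closed
            timeout = hardTimeoutMs - elapsed; // remaining budget, recomputed only now
        } while (timeout > 0);
        return elapsed;
    }
}
```

In this model, with connectionTimeout = 2000 ms, a dead connection that shows up after 1791 ms and needs the full 5000 ms validationTimeout to fail yields elapsedBeforeTimeout(2000, 1791, 5000) = 6791 ms, while a pool that simply has nothing to hand out stops at exactly 2000 ms.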
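4) Idle connection reclamation

When Hikari initializes the pool, it has already started an asynchronous scheduled task. This task runs every 30 seconds to retire idle connections and top the pool back up, as follows: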

/**
    * The house keeping task to retire and maintain minimum idle connections.
    */
   private final class HouseKeeper implements Runnable
   {
      private volatile long previous = plusMillis(currentTime(), -HOUSEKEEPING_PERIOD_MS);

      @Override
      public void run()
      {
         try {
            // refresh timeouts in case they changed via MBean
            connectionTimeout = config.getConnectionTimeout();
            validationTimeout = config.getValidationTimeout();
            leakTaskFactory.updateLeakDetectionThreshold(config.getLeakDetectionThreshold());

            final long idleTimeout = config.getIdleTimeout();
            final long now = currentTime();

            // Detect retrograde time, allowing +128ms as per NTP spec.
            // A 128 ms gap tolerates clock adjustments; a normal NTP correction does not move the clock back by more than 128 ms
            // e.g. on the first run, now = plusMillis(previous, HOUSEKEEPING_PERIOD_MS) + 100ms
            if (plusMillis(now, 128) < plusMillis(previous, HOUSEKEEPING_PERIOD_MS)) {
               // The clock moved backwards: soft-evict all connections and skip this run
               LOGGER.warn("{} - Retrograde clock change detected (housekeeper delta={}), soft-evicting connections from pool.",
                           poolName, elapsedDisplayString(previous, now));
               previous = now;
               softEvictConnections();
               return;
            }
            else if (now > plusMillis(previous, (3 * HOUSEKEEPING_PERIOD_MS) / 2)) {
               // No point evicting for forward clock motion, this merely accelerates connection retirement anyway
               LOGGER.warn("{} - Thread starvation or clock leap detected (housekeeper delta={}).", poolName, elapsedDisplayString(previous, now));
            }

            previous = now;

            String afterPrefix = "Pool ";
            // Retire eligible idle connections; nothing is retired when minimumIdle equals maximumPoolSize
            if (idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()) {
               logPoolState("Before cleanup ");
               afterPrefix = "After cleanup  ";

               final List<PoolEntry> notInUse = connectionBag.values(STATE_NOT_IN_USE);
               int toRemove = notInUse.size() - config.getMinimumIdle();
               for (PoolEntry entry : notInUse) {
                  // There are surplus idle connections, this one has been idle long enough, and the CAS to reserve it succeeds
                  if (toRemove > 0 && elapsedMillis(entry.lastAccessed, now) > idleTimeout && connectionBag.reserve(entry)) {
                     // Close the connection
                     closeConnection(entry, "(connection has passed idleTimeout)");
                     toRemove--;
                  }
               }
            }

            logPoolState(afterPrefix);
            // Replenish connections
            fillPool(); // Try to maintain minimum connections
         }
         catch (Exception e) {
            LOGGER.error("Unexpected exception in housekeeping task", e);
         }
      }
   }
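The eviction rule in the loop above can be isolated into a small pure function. In this sketch, IdleEviction and its parameters are hypothetical: it takes the last-access timestamps of the currently idle connections and returns how many would be closed. Only the surplus above minimumIdle is eligible, and only entries idle longer than idleTimeout; the reserve() CAS, which in Hikari additionally skips entries another thread grabbed in the meantime, is left out.

```java
// Hypothetical pure-function version of the HouseKeeper's idle-eviction rule.
final class IdleEviction {
    static int countToClose(long[] lastAccessed, long now, int minimumIdle, int maximumPoolSize, long idleTimeout) {
        // Nothing is retired when idleTimeout is disabled or the pool is fixed-size
        if (idleTimeout <= 0 || minimumIdle >= maximumPoolSize) {
            return 0;
        }
        int toRemove = lastAccessed.length - minimumIdle; // surplus idle connections
        int closed = 0;
        for (long accessed : lastAccessed) {
            if (toRemove > 0 && now - accessed > idleTimeout) {
                closed++;
                toRemove--;
            }
        }
        return closed;
    }
}
```

This also makes the configuration advice concrete: with minimumIdle equal to maximumPoolSize (a fixed-size pool), the function always returns 0, so idleTimeout has no effect.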
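5) Maximum lifetime handling

When Hikari creates a connection, it immediately attaches a scheduled task that will close the connection once its lifetime is up.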

private PoolEntry createPoolEntry()
   {
      try {
         final PoolEntry poolEntry = newPoolEntry();

         final long maxLifetime = config.getMaxLifetime();
         if (maxLifetime > 0) {
            // variance up to 2.5% of the maxlifetime
            // Subtract a random amount so that many connections do not expire at the same moment
            final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0;
            final long lifetime = maxLifetime - variance;
            // maxLifetime must stay below the longest connection lifetime the database allows
            poolEntry.setFutureEol(houseKeepingExecutorService.schedule(
               () -> {
                  if (softEvictConnection(poolEntry, "(connection has passed maxLifetime)", false /* not owner */)) {
                     addBagItem(connectionBag.getWaitingThreadCount());
                  }
               },
               lifetime, MILLISECONDS));
         }

         return poolEntry;
      }
      catch (Exception e) {
         if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently
            LOGGER.debug("{} - Cannot acquire connection from data source", poolName, (e instanceof ConnectionSetupException ? e.getCause() : e));
         }
         return null;
      }
   }
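The randomized lifetime is easy to check on its own. The sketch below uses the same formula as createPoolEntry(); only the class name LifetimeJitter is hypothetical. For maxLifetime above 10 seconds, a random amount below maxLifetime / 40 (2.5%) is subtracted, so connections created together do not all expire in the same instant.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical extraction of the lifetime formula used in createPoolEntry().
final class LifetimeJitter {
    static long lifetime(long maxLifetimeMs) {
        // nextLong(bound) returns a value in [0, bound), so the variance stays below 2.5% of maxLifetime
        final long variance = maxLifetimeMs > 10_000 ? ThreadLocalRandom.current().nextLong(maxLifetimeMs / 40) : 0;
        return maxLifetimeMs - variance;
    }
}
```

With the default maxLifetime of 30 minutes, each connection is retired after more than 29 min 15 s and at most 30 min.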
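Closing a connection starts by marking the entry as evicted. That way, even if the connection cannot be closed right now because it is in use, it will be closed the next time someone tries to borrow it.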
private boolean softEvictConnection(final PoolEntry poolEntry, final String reason, final boolean owner)
   {
      // Mark as evicted first; even if the close below does not happen, getConnection will remove it later
      poolEntry.markEvicted();
      // A connection that is in use is not closed here
      if (owner || connectionBag.reserve(poolEntry)) {
         closeConnection(poolEntry, reason);
         return true;
      }

      return false;
   }
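The mark-then-reserve pattern can be modelled with an AtomicInteger state and a volatile flag. In the sketch below, EvictionDemo, Entry, softEvict and borrowOrDiscard are hypothetical names, while the state values copy ConcurrentBag's constants. Evicting an idle entry closes it immediately; evicting an in-use entry only marks it, so the next borrow attempt after it is returned discards it instead of handing it out.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of softEvictConnection(): mark first, close only if the entry can be reserved.
final class EvictionDemo {
    static final int STATE_NOT_IN_USE = 0;
    static final int STATE_IN_USE = 1;
    static final int STATE_RESERVED = -2;

    static final class Entry {
        final AtomicInteger state = new AtomicInteger(STATE_NOT_IN_USE);
        volatile boolean evicted;
        volatile boolean closed;
    }

    // Returns true if the entry was closed right away.
    static boolean softEvict(Entry entry) {
        entry.evicted = true; // always mark first
        if (entry.state.compareAndSet(STATE_NOT_IN_USE, STATE_RESERVED)) {
            entry.closed = true; // idle: safe to close now
            return true;
        }
        return false; // in use: leave it to the borrow path
    }

    // Borrow path: an evicted entry is closed instead of being handed out (returns null).
    static Entry borrowOrDiscard(Entry entry) {
        if (!entry.state.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            return null;
        }
        if (entry.evicted) {
            entry.closed = true;
            return null;
        }
        return entry;
    }
}
```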
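6) Connection leak handling

Hikari uses the factory pattern for leak detection: you pass a PoolEntry into the factory, and it submits a delayed leak-detection task. The so-called leak detection only prints a single WARN log; it does not reclaim the connection.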

class ProxyLeakTaskFactory
{
   private ScheduledExecutorService executorService;
   private long leakDetectionThreshold;

   ProxyLeakTaskFactory(final long leakDetectionThreshold, final ScheduledExecutorService executorService)
   {
      this.executorService = executorService;
      this.leakDetectionThreshold = leakDetectionThreshold;
   }
   // 1. Pass in the connection entry
   ProxyLeakTask schedule(final PoolEntry poolEntry)
   {
      // A threshold of 0 disables leak detection
      return (leakDetectionThreshold == 0) ? ProxyLeakTask.NO_LEAK : scheduleNewTask(poolEntry);
   }

   void updateLeakDetectionThreshold(final long leakDetectionThreshold)
   {
      this.leakDetectionThreshold = leakDetectionThreshold;
   }

   // 2. Submit the delayed task
   private ProxyLeakTask scheduleNewTask(PoolEntry poolEntry) {
      ProxyLeakTask task = new ProxyLeakTask(poolEntry);
      task.schedule(executorService, leakDetectionThreshold);

      return task;
   }
}
ProxyLeakTask
class ProxyLeakTask implements Runnable
{
   private static final Logger LOGGER = LoggerFactory.getLogger(ProxyLeakTask.class);
   static final ProxyLeakTask NO_LEAK;

   private ScheduledFuture<?> scheduledFuture;
   private String connectionName;
   private Exception exception;
   private String threadName;
   private boolean isLeaked;

   static
   {
      NO_LEAK = new ProxyLeakTask() {
         @Override
         void schedule(ScheduledExecutorService executorService, long leakDetectionThreshold) {}

         @Override
         public void run() {}

         @Override
         public void cancel() {}
      };
   }

   ProxyLeakTask(final PoolEntry poolEntry)
   {
      this.exception = new Exception("Apparent connection leak detected");
      this.threadName = Thread.currentThread().getName();
      this.connectionName = poolEntry.connection.toString();
   }

   private ProxyLeakTask()
   {
   }

   void schedule(ScheduledExecutorService executorService, long leakDetectionThreshold)
   {
      scheduledFuture = executorService.schedule(this, leakDetectionThreshold, TimeUnit.MILLISECONDS);
   }

   /** {@inheritDoc} */
   @Override
   public void run()
   {
      isLeaked = true;

      // Drop the first 5 stack frames, which belong to Hikari itself
      final StackTraceElement[] stackTrace = exception.getStackTrace();
      final StackTraceElement[] trace = new StackTraceElement[stackTrace.length - 5];
      System.arraycopy(stackTrace, 5, trace, 0, trace.length);

      exception.setStackTrace(trace);
      // Print the connection-leak WARN log, exactly once
      LOGGER.warn("Connection leak detection triggered for {} on thread {}, stack trace follows", connectionName, threadName, exception);
   }

   void cancel()
   {
      scheduledFuture.cancel(false);
      if (isLeaked) {
         LOGGER.info("Previously reported leaked connection {} on thread {} was returned to the pool (unleaked)", connectionName, threadName);
      }
   }
}
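The same schedule-then-cancel idea can be reproduced with a plain ScheduledExecutorService. In this sketch, LeakWatch and its methods are hypothetical: borrowing a resource schedules a task that flags a leak after thresholdMs, returning the resource in time cancels the task, and holding it longer only sets the flag (the point where Hikari logs its WARN). Nothing is reclaimed either way.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical analogue of ProxyLeakTask: a delayed "leak" flag that is cancelled on return.
final class LeakWatch {
    private final ScheduledFuture<?> future;
    private volatile boolean leaked;

    private LeakWatch(ScheduledExecutorService scheduler, long thresholdMs) {
        // Fires only if the resource is still out when the threshold elapses
        this.future = scheduler.schedule(() -> {
            leaked = true; // Hikari would log a WARN with the borrower's stack trace here
        }, thresholdMs, TimeUnit.MILLISECONDS);
    }

    // Called when the resource is borrowed.
    static LeakWatch onBorrow(ScheduledExecutorService scheduler, long thresholdMs) {
        return new LeakWatch(scheduler, thresholdMs);
    }

    // Called when the resource is returned: stop the pending check.
    void onReturn() {
        future.cancel(false);
    }

    boolean leaked() {
        return leaked;
    }
}
```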
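7) The pool class

ConcurrentBag is the real connection pool, and it is the secret behind Hikari's "zero overhead".

In short, Hikari avoids locking by combining a CopyOnWriteArrayList, per-entry state, and CAS.

The CopyOnWriteArrayList holds the actual connection objects, and each connection is in one of four states: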

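  • STATE_NOT_IN_USE: idle
  • STATE_IN_USE: in use
  • STATE_REMOVED: removed
  • STATE_RESERVED: reserved (claimed, e.g. for closing, and temporarily unavailable)

For example, when a connection is borrowed, concurrent borrowers are resolved by calling bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE): only the thread whose CAS succeeds gets the connection.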
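Here is a stdlib-only sketch of how a single compareAndSet settles a race between borrowers without a lock. The class CasRace is hypothetical, and its state values copy IConcurrentBagEntry's constants. Several threads try to flip the same entry from STATE_NOT_IN_USE to STATE_IN_USE at once, and exactly one succeeds.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: many threads race to borrow one entry; the CAS lets exactly one win.
final class CasRace {
    static final int STATE_NOT_IN_USE = 0;
    static final int STATE_IN_USE = 1;

    static int winners(int threads) {
        AtomicInteger state = new AtomicInteger(STATE_NOT_IN_USE);
        AtomicInteger wins = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await(); // release all threads together to maximize contention
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (state.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
                    wins.incrementAndGet();
                }
            });
            workers[i].start();
        }
        start.countDown();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return wins.get();
    }
}
```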

public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseable
{
   private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentBag.class);

   // All connections: locking is avoided via CopyOnWriteArrayList + state + CAS
   private final CopyOnWriteArrayList<T> sharedList;
   // Whether threadList holds weak references
   private final boolean weakThreadLocals;
   // Per-thread cache of idle connections, filled by requite() and read first by borrow()
   private final ThreadLocal<List<Object>> threadList;
   private final IBagStateListener listener;
   // Number of threads waiting for a connection: +1 on entering borrow(), -1 on leaving
   private final AtomicInteger waiters;
   // Pool closed flag
   private volatile boolean closed;
   // Zero-capacity blocking queue: producer/consumer hand-off
   private final SynchronousQueue<T> handoffQueue;

   public interface IConcurrentBagEntry
   {
      int STATE_NOT_IN_USE = 0; // idle
      int STATE_IN_USE = 1; // in use
      int STATE_REMOVED = -1; // removed
      int STATE_RESERVED = -2; // reserved

      boolean compareAndSet(int expectState, int newState);

      void setState(int newState);

      int getState();
   }

   public interface IBagStateListener
   {
      void addBagItem(int waiting);
   }

   public ConcurrentBag(final IBagStateListener listener)
   {
      this.listener = listener;
      this.weakThreadLocals = useWeakThreadLocals();

      this.handoffQueue = new SynchronousQueue<>(true);
      this.waiters = new AtomicInteger();
      this.sharedList = new CopyOnWriteArrayList<>();
      if (weakThreadLocals) {
         this.threadList = ThreadLocal.withInitial(() -> new ArrayList<>(16));
      }
      else {
         this.threadList = ThreadLocal.withInitial(() -> new FastList<>(IConcurrentBagEntry.class, 16));
      }
   }

   public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
   {
      // Try the thread-local list first
      final List<Object> list = threadList.get();
      for (int i = list.size() - 1; i >= 0; i--) {
         // Read from the tail: the most recently cached entry is used first (a deliberate detail!)
         final Object entry = list.remove(i);
         @SuppressWarnings("unchecked")
         final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
         if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            return bagEntry;
         }
      }

      // Otherwise, scan the shared list ... then poll the handoff queue
      // The local cache had nothing: scan sharedList, incrementing the waiter count
      final int waiting = waiters.incrementAndGet();
      try {
         for (T bagEntry : sharedList) {
            if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
               // If we may have stolen another waiter's connection, request another bag add.
               // Under contention, this keeps the pool replenished in time
               if (waiting > 1) {
                  listener.addBagItem(waiting - 1);
               }
               return bagEntry;
            }
         }
         // sharedList had nothing either: submit an asynchronous add-connection task, then block on handoffQueue
         listener.addBagItem(waiting);

         timeout = timeUnit.toNanos(timeout);
         do {
            final long start = currentTime();
            final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
            if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
               return bagEntry;
            }

            timeout -= elapsedNanos(start);
         } while (timeout > 10_000);

         return null;
      }
      finally {
         // Decrement the waiter count
         waiters.decrementAndGet();
      }
   }

   public void requite(final T bagEntry)
   {
      bagEntry.setState(STATE_NOT_IN_USE);
      // If other threads are waiting for a connection, hand it to them through handoffQueue first
      for (int i = 0; waiters.get() > 0; i++) {
         if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
            return;
         }
         else if ((i & 0xff) == 0xff) {
            // Once every 256 iterations, park for 10 microseconds
            parkNanos(MICROSECONDS.toNanos(10));
         }
         else {
            // Otherwise yield the thread
            yield();
         }
      }

      // Nobody else needs it: put it into this thread's local cache
      final List<Object> threadLocalList = threadList.get();
      threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
   }

   public void add(final T bagEntry)
   {
      if (closed) {
         LOGGER.info("ConcurrentBag has been closed, ignoring add()");
         throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
      }

      sharedList.add(bagEntry);

      // spin until a thread takes it or none are waiting
      // While threads are waiting, keep offering the connection through handoffQueue
      while (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {
         yield();
      }
   }

   public boolean remove(final T bagEntry)
   {
      // CAS the connection to STATE_REMOVED
      if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
         LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
         return false;
      }
      // Only after the CAS succeeds is the connection removed from the list
      final boolean removed = sharedList.remove(bagEntry);
      if (!removed && !closed) {
         LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
      }

      return removed;
   }

   @Override
   public void close()
   {
      closed = true;
   }

   public boolean reserve(final T bagEntry)
   {
      return bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_RESERVED);
   }
}
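To see how the thread-local cache, the shared list and the CAS state work together, here is a heavily simplified single-file sketch. MiniBag and Item are hypothetical; the sketch keeps ConcurrentBag's borrow order (thread-local list first, newest entry first, then the shared CopyOnWriteArrayList) and the CAS on each item's state, but drops the waiter counter, the SynchronousQueue hand-off and the add-connection callback, so a borrower that finds nothing gets null instead of waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified ConcurrentBag: thread-local cache + shared list + CAS, no hand-off queue.
final class MiniBag {
    static final int STATE_NOT_IN_USE = 0;
    static final int STATE_IN_USE = 1;

    static final class Item {
        final String name;
        final AtomicInteger state = new AtomicInteger(STATE_NOT_IN_USE);

        Item(String name) {
            this.name = name;
        }
    }

    private final CopyOnWriteArrayList<Item> sharedList = new CopyOnWriteArrayList<>();
    private final ThreadLocal<List<Item>> threadList = ThreadLocal.withInitial(ArrayList::new);

    void add(Item item) {
        sharedList.add(item);
    }

    Item borrow() {
        // 1) This thread's cache first, scanning from the tail (most recently returned first)
        List<Item> local = threadList.get();
        for (int i = local.size() - 1; i >= 0; i--) {
            Item item = local.remove(i);
            if (item.state.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
                return item;
            }
        }
        // 2) Then the shared list; the CAS guarantees only one thread wins each item
        for (Item item : sharedList) {
            if (item.state.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
                return item;
            }
        }
        return null; // the real bag would now wait on its SynchronousQueue
    }

    void requite(Item item) {
        item.state.set(STATE_NOT_IN_USE);
        threadList.get().add(item); // remember it for this thread's next borrow
    }
}
```

Because the thread-local list only caches references while the item's state stays in the shared list, another thread can still steal a cached item through the CAS; the cache is a fast path, not ownership.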
4. Alert Case Studies
1) Case 1

Alert log
First, a real production alert:

    Caused by: org.springframework.jdbc.CannotGetJdbcConnectionException: Failed to obtain JDBC Connection; nested exception is java.sql.SQLTransientConnectionException: hikari-pool - Connection is not available, request timed out after 6791ms.
      at org.springframework.jdbc.datasource.DataSourceUtils.getConnection(DataSourceUtils.java:81)
      at org.mybatis.spring.transaction.SpringManagedTransaction.openConnection(SpringManagedTransaction.java:80)
      at org.mybatis.spring.transaction.SpringManagedTransaction.getConnection(SpringManagedTransaction.java:67)
      at org.apache.ibatis.executor.BaseExecutor.getConnection(BaseExecutor.java:337)
      at org.apache.ibatis.executor.SimpleExecutor.prepareStatement(SimpleExecutor.java:86)
      at org.apache.ibatis.executor.SimpleExecutor.doQuery(SimpleExecutor.java:62)
      at org.apache.ibatis.executor.BaseExecutor.queryFromDatabase(BaseExecutor.java:325)
      at org.apache.ibatis.executor.BaseExecutor.query(BaseExecutor.java:156)
      at org.apache.ibatis.executor.CachingExecutor.query(CachingExecutor.java:109)
      at org.apache.ibatis.executor.CachingExecutor.query(CachingExecutor.java:89)
      ... 125 common frames omitted
    Caused by: java.sql.SQLTransientConnectionException: hikari-pool-storecenter - Connection is not available, request timed out after 6791ms.
      at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:669)
      at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:183)
      at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:148)
      at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:100)
      at org.springframework.jdbc.datasource.DataSourceUtils.fetchConnection(DataSourceUtils.java:151)
      at org.springframework.jdbc.datasource.DataSourceUtils.doGetConnection(DataSourceUtils.java:115)
      at org.springframework.jdbc.datasource.DataSourceUtils.getConnection(DataSourceUtils.java:78)
      ... 134 common frames omitted
    Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: [DataSource IP:127.0.0.1:3306] No operations allowed after connection closed.
      at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
      at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
      at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
      at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
      at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
      at com.mysql.jdbc.Util.getInstance(Util.java:408)
      at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:919)
      at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
      at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
      at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
      at com.mysql.jdbc.ConnectionImpl.throwConnectionClosedException(ConnectionImpl.java:1184)
      at com.mysql.jdbc.ConnectionImpl.checkClosed(ConnectionImpl.java:1179)
      at com.mysql.jdbc.ConnectionImpl.setNetworkTimeout(ConnectionImpl.java:5498)
      at com.zaxxer.hikari.pool.PoolBase.setNetworkTimeout(PoolBase.java:541)
      at com.zaxxer.hikari.pool.PoolBase.isConnectionAlive(PoolBase.java:162)
      at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:172)
      ... 139 common frames omitted

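Analysis
No operations allowed after connection closed means the application used a connection that MySQL had already closed.

request timed out after 6791ms is longer than connectionTimeout (configured as 2 seconds here) because the elapsed time covers both waiting for a connection (bounded by connectionTimeout) and testing borrowed connections for validity (each check bounded by validationTimeout, 5 seconds by default).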

boolean isConnectionAlive(final Connection connection)
   {
      try {
         try {
            setNetworkTimeout(connection, validationTimeout);
            // validationTimeout defaults to 5 seconds; the value used here is at least 1 second
            final int validationSeconds = (int) Math.max(1000L, validationTimeout) / 1000;

            // Test whether the connection is valid
            if (isUseJdbc4Validation) {
               return connection.isValid(validationSeconds);
            }

            try (Statement statement = connection.createStatement()) {
               if (isNetworkTimeoutSupported != TRUE) {
                  setQueryTimeout(statement, validationSeconds);
               }

               statement.execute(config.getConnectionTestQuery());
            }
         }
         finally {
            setNetworkTimeout(connection, networkTimeout);

            if (isIsolateInternalQueries && !isAutoCommit) {
               connection.rollback();
            }
         }

         return true;
      }
      catch (Exception e) {
         lastConnectionFailure.set(e);
         // A WARN log is printed here; check console.log to see whether already-closed connections are being handed out
         LOGGER.warn("{} - Failed to validate connection {} ({})", poolName, connection, e.getMessage());
         return false;
      }
   }
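One detail in the code above: validationSeconds uses integer arithmetic, and the (int) cast applies to Math.max(...) before the division. The sketch below copies the expression into a hypothetical helper, ValidationSeconds, so a few values can be checked; sub-second remainders are truncated, so a validationTimeout of 1500 ms becomes 1 second for isValid() and the query timeout.

```java
// Hypothetical helper reproducing the validationSeconds expression from isConnectionAlive().
final class ValidationSeconds {
    static int of(long validationTimeoutMs) {
        // (int) binds to Math.max(...); the integer division that follows truncates
        return (int) Math.max(1000L, validationTimeoutMs) / 1000;
    }
}
```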

console.log showed a large number of cases where already-closed connections had been handed out:

    2022-06-15 01:34:20.445 WARN com.zaxxer.hikari.pool.PoolBase          : hikari-pool - Failed to validate connection com.mysql.jdbc.JDBC4Connection@200203c3 ([DataSource IP:127.0.0.1:3306] No operations allowed after connection closed.)

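So we concluded that the alerts were caused by obtaining connections that the database had already closed.

Fix
The DBA reported that the database's wait_timeout was 600 seconds, while the production maxLifetime was configured as 900 seconds. That was a misconfiguration, so we changed maxLifetime to 450 seconds.

After the change went live, console.log no longer kept printing Failed to validate connection, and the No operations allowed after connection closed alerts stopped.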
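The root cause here reduces to one inequality: a pooled connection must be retired before MySQL's wait_timeout can close it. The hypothetical LifetimeCheck below encodes that check with the numbers from this incident. Strictly speaking, wait_timeout counts idle time rather than total age, so keeping maxLifetime below it is a conservative rule: even a connection that sat idle for its whole life is retired by Hikari before MySQL times it out.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sanity check: maxLifetime must be shorter than MySQL's wait_timeout.
final class LifetimeCheck {
    static boolean isSafe(long maxLifetimeMs, long waitTimeoutSeconds) {
        return maxLifetimeMs < TimeUnit.SECONDS.toMillis(waitTimeoutSeconds);
    }
}
```

With wait_timeout = 600 s, the original maxLifetime of 900 s fails the check and the corrected 450 s passes. Because Hikari's lifetime jitter only ever subtracts time, it cannot push a connection past this limit.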

2) Case 2

Alert log
After the fix went live, we saw a few dozen more alerts, all concentrated on a single machine:

    Caused by: org.springframework.jdbc.CannotGetJdbcConnectionException: Failed to obtain JDBC Connection; nested exception is java.sql.SQLTransientConnectionException: hikari-pool - Connection is not available, request timed out after 2000ms.
      at org.springframework.jdbc.datasource.DataSourceUtils.getConnection(DataSourceUtils.java:81)
      at org.mybatis.spring.transaction.SpringManagedTransaction.openConnection(SpringManagedTransaction.java:80)
      at org.mybatis.spring.transaction.SpringManagedTransaction.getConnection(SpringManagedTransaction.java:67)
      at org.apache.ibatis.executor.BaseExecutor.getConnection(BaseExecutor.java:337)
      at org.apache.ibatis.executor.SimpleExecutor.prepareStatement(SimpleExecutor.java:86)
      at org.apache.ibatis.executor.SimpleExecutor.doQuery(SimpleExecutor.java:62)
      at org.apache.ibatis.executor.BaseExecutor.queryFromDatabase(BaseExecutor.java:325)
      at org.apache.ibatis.executor.BaseExecutor.query(BaseExecutor.java:156)
      at org.apache.ibatis.executor.CachingExecutor.query(CachingExecutor.java:109)
      at org.apache.ibatis.executor.CachingExecutor.query(CachingExecutor.java:89)
      ... 125 common frames omitted
    Caused by: java.sql.SQLTransientConnectionException: hikari-pool - Connection is not available, request timed out after 2000ms.
      at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:669)
      at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:183)
      at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:148)
      at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:100)
      at org.springframework.jdbc.datasource.DataSourceUtils.fetchConnection(DataSourceUtils.java:151)
      at org.springframework.jdbc.datasource.DataSourceUtils.doGetConnection(DataSourceUtils.java:115)
      at org.springframework.jdbc.datasource.DataSourceUtils.getConnection(DataSourceUtils.java:78)
      ... 134 common frames omitted

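Analysis
This time the alert log has no No operations allowed after connection closed, and the elapsed time equals connectionTimeout, so presumably no connection could be obtained at all. Possible causes:

Machine problems: an overloaded machine can cause I/O to stall.

Pool exhaustion: for example, slow SQL, or more traffic than the pool can handle, so there are simply not enough connections. Hikari provides the HikariPoolMXBean interface for retrieving pool monitoring information.

Connection leaks: once leak detection is enabled, leaks show up in the log.

Fix
Machine problems: migrate to another machine and keep watching.

Pool exhaustion: add Hikari pool monitoring logs, observe pool usage, and then decide on next steps. For example, a scheduled task (this requires Spring scheduling to be enabled, e.g. with @EnableScheduling) can print the pool state every second: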

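import cn.hutool.core.collection.CollUtil;
import com.zaxxer.hikari.HikariDataSource;
import com.zaxxer.hikari.HikariPoolMXBean;
import java.util.Map;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class HikariPoolTask {

    @Resource
    private Map<String, HikariDataSource> dataSourceMap;

    /**
     * Initial delay of 1 second, then runs every 1 second
     */
    @Scheduled(initialDelay = 1000, fixedDelay = 1000)
    public void run() {
        if (CollUtil.isNotEmpty(dataSourceMap)) {
            for (HikariDataSource dataSource : dataSourceMap.values()) {
                // Pool name
                String poolName = dataSource.getPoolName();
                HikariPoolMXBean hikariPoolMXBean = dataSource.getHikariPoolMXBean();
                // May be null for a lazily started data source whose pool has not been created yet
                if (hikariPoolMXBean == null) {
                    continue;
                }
                // Active connections
                int activeConnections = hikariPoolMXBean.getActiveConnections();
                // Idle connections
                int idleConnections = hikariPoolMXBean.getIdleConnections();
                // Total connections
                int totalConnections = hikariPoolMXBean.getTotalConnections();
                // Threads waiting for a connection
                int threadsAwaitingConnection = hikariPoolMXBean.getThreadsAwaitingConnection();
                log.info("{} - activeConnections={}, idleConnections={}, totalConnections={}, threadsAwaitingConnection={}",
                        poolName, activeConnections, idleConnections, totalConnections, threadsAwaitingConnection);
            }
        }
    }
}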

Connection leaks: enable leak detection, for example with a 10-second threshold:

    leakDetectionThreshold=10000

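About the author
薛师兄 (Xue Shixiong), a senior R&D engineer at a leading Internet company, is passionate about the Java stack and has a particular drive to understand how things work under the hood.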

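Copyright notice: original article on this site, published by 星锅 on 2024-07-25, 24297 characters in total.
Reprint notice: unless otherwise stated, articles on this site are published under the CC-4.0 license; please credit the source when reprinting.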