阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Java 线程池是如何工作的

85次阅读
没有评论

共计 14497 个字符,预计需要花费 37 分钟才能阅读完成。

导读 在我们的开发中“池”的概念并不罕见,有数据库连接池、线程池、对象池、常量池等等。下面我们主要针对线程池来一步一步揭开线程池的面纱。
使用线程池的好处

1、降低资源消耗

可以重复利用已创建的线程降低线程创建和销毁造成的消耗。

2、提高响应速度

当任务到达时,任务可以不需要等到线程创建就能立即执行。

3、提高线程的可管理性

线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控

线程池的工作原理

首先我们看下当一个新的任务提交到线程池之后,线程池是如何处理的

1、线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则执行第二步。

2、线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里进行等待。如果工作队列满了,则执行第三步

3、线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务

线程池饱和策略

这里提到了线程池的饱和策略,那我们就简单介绍下有哪些饱和策略:

AbortPolicy

为 Java 线程池默认的阻塞策略,不执行此任务,而且直接抛出一个运行时异常,切记 ThreadPoolExecutor.execute 需要 try catch,否则程序会直接退出。

DiscardPolicy

直接抛弃,任务不执行,空方法

DiscardOldestPolicy

从队列里面抛弃 head 的一个任务,并再次 execute 此 task。

CallerRunsPolicy

在调用 execute 的线程里面执行此 command,会阻塞入口

用户自定义拒绝策略(最常用)

实现 RejectedExecutionHandler,并自己定义策略模式

下我们以 ThreadPoolExecutor 为例展示下线程池的工作流程图

Java 线程池是如何工作的

Java 线程池是如何工作的

1、如果当前运行的线程少于 corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。

2、如果运行的线程等于或多于 corePoolSize,则将任务加入 BlockingQueue。

3、如果无法将任务加入 BlockingQueue(队列已满),则在非 corePool 中创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。

4、如果创建新线程将使当前运行的线程超出 maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution()方法。

ThreadPoolExecutor 采取上述步骤的总体设计思路,是为了在执行 execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在 ThreadPoolExecutor 完成预热之后(当前运行的线程数大于等于 corePoolSize),几乎所有的 execute()方法调用都是执行步骤 2,而步骤 2 不需要获取全局锁。

关键方法源码分析

我们看看核心方法添加到线程池方法 execute 的源码如下:

//
     //Executes the given task sometime in the future.  The task
     //may execute in a new thread or in an existing pooled thread.
     //
     // If the task cannot be submitted for execution, either because this
     // executor has been shutdown or because its capacity has been reached,
     // the task is handled by the current {@code RejectedExecutionHandler}.
     //
     // @param command the task to execute
     // @throws RejectedExecutionException at discretion of
     //         {@code RejectedExecutionHandler}, if the task
     //         cannot be accepted for execution
     // @throws NullPointerException if {@code command} is null
     //
    public void execute(Runnable command) {if (command == null)
            throw new NullPointerException();
        //
         // Proceed in 3 steps:
         //
         // 1. If fewer than corePoolSize threads are running, try to
         // start a new thread with the given command as its first
         // task.  The call to addWorker atomically checks runState and
         // workerCount, and so prevents false alarms that would add
         // threads when it shouldn't, by returning false.
         // 翻译如下:// 判断当前的线程数是否小于 corePoolSize 如果是,使用入参任务通过 addWord 方法创建一个新的线程,// 如果能完成新线程创建 exexute 方法结束,成功提交任务
         // 2. If a task can be successfully queued, then we still need
         // to double-check whether we should have added a thread
         // (because existing ones died since last checking) or that
         // the pool shut down since entry into this method. So we
         // recheck state and if necessary roll back the enqueuing if
         // stopped, or start a new thread if there are none.
         // 翻译如下:// 在第一步没有完成任务提交;状态为运行并且能否成功加入任务到工作队列后,再进行一次 check,如果状态
         // 在任务加入队列后变为了非运行(有可能是在执行到这里线程池 shutdown 了),非运行状态下当然是需要
         // reject;然后再判断当前线程数是否为 0(有可能这个时候线程数变为了 0),如是,新增一个线程;// 3. If we cannot queue task, then we try to add a new
         // thread.  If it fails, we know we are shut down or saturated
         // and so reject the task.
         // 翻译如下:// 如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,则是线程池已经 shutdown 或者线程池
         // 已经达到饱和状态,所以 reject 这个他任务
         //
        int c = ctl.get();
        // 工作线程数小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            // 直接启动新线程,true 表示会再次检查 workerCount 是否小于 corePoolSize
            if (addWorker(command, true))
                return;
            c = ctl.get();}
        // 如果工作线程数大于等于核心线程数
        // 线程的的状态未 RUNNING 并且队列 notfull
        if (isRunning(c) && workQueue.offer(command)) {
            // 再次检查线程的运行状态,如果不是 RUNNING 直接从队列中移除
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                // 移除成功,拒绝该非运行的任务
                reject(command);
            else if (workerCountOf(recheck) == 0)
                // 防止了 SHUTDOWN 状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。// 添加一个 null 任务是因为 SHUTDOWN 状态下,线程池不再接受新任务
                addWorker(null, false);
        }
        // 如果队列满了或者是非运行的任务都拒绝执行
        else if (!addWorker(command, false))
            reject(command);
    }

下面我们继续看看 addWorker 是如何实现的:

private boolean addWorker(Runnable firstTask, boolean core) {
        // java 标签
        retry:
        // 死循环
        for (;;) {int c = ctl.get();
            // 获取当前线程状态
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            // 这个逻辑判断有点绕可以改成 
            // rs >= shutdown && (rs != shutdown || firstTask != null || workQueue.isEmpty())
            // 逻辑判断成立可以分为以下几种情况均不接受新任务
            // 1、rs > shutdown:-- 不接受新任务
            // 2、rs >= shutdown && firstTask != null:-- 不接受新任务
            // 3、rs >= shutdown && workQueue.isEmppty:-- 不接受新任务
            // 逻辑判断不成立
            // 1、rs==shutdown&&firstTask != null: 此时不接受新任务,但是仍会执行队列中的任务
            // 2、rs==shotdown&&firstTask == null: 会执行 addWork(null,false)
            //  防止了 SHUTDOWN 状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。//  添加一个 null 任务是因为 SHUTDOWN 状态下,线程池不再接受新任务
            if (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null &&! workQueue.isEmpty()))
                return false;
            // 死循环
            // 如果线程池状态为 RUNNING 并且队列中还有需要执行的任务
            for (;;) {
                // 获取线程池中线程数量
                int wc = workerCountOf(c);
                // 如果超出容量或者最大线程池容量不在接受新任务
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 线程安全增加工作线程数
                if (compareAndIncrementWorkerCount(c))
                    // 跳出 retry
                    break retry;
                c = ctl.get();  // Re-read ctl
                // 如果线程池状态发生变化,重新循环
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        // 走到这里说明工作线程数增加成功
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 加锁
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);
                    // RUNNING 状态 || SHUTDONW 状态下清理队列中剩余的任务
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 检查线程状态
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 将新启动的线程添加到线程池中
                        workers.add(w);
                        // 更新线程池线程数且不超过最大值
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {mainLock.unlock();
                }
                // 启动新添加的线程,这个线程首先执行 firstTask,然后不停的从队列中取任务执行
                if (workerAdded) {
                    // 执行 ThreadPoolExecutor 的 runWoker 方法
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 线程启动失败,则从 wokers 中移除 w 并递减 wokerCount
            if (! workerStarted)
                // 递减 wokerCount 会触发 tryTerminate 方法
                addWorkerFailed(w);
        }
        return workerStarted;
    }

addWorker 之后是 runWorker, 第一次启动会执行初始化传进来的任务 firstTask;然后会从 workQueue 中取任务执行,如果队列为空则等待 keepAliveTime 这么长时间

final void runWorker(Worker w) {Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 允许中断
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 如果 getTask 返回 null 那么 getTask 中会将 workerCount 递减,如果异常了这个递减操作会在 processWorkerExit 中处理
            while (task != null || (task = getTask()) != null) {w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {task.run();
                    } catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
                    } finally {afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();}
            }
            completedAbruptly = false;
        } finally {processWorkerExit(w, completedAbruptly);
        }
    }

我们看下 getTask 是如何执行的

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?
        // 死循环
        retry: for (;;) {
            // 获取线程池状态
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            // 1.rs > SHUTDOWN 所以 rs 至少等于 STOP, 这时不再处理队列中的任务
            // 2.rs = SHUTDOWN 所以 rs>=STOP 肯定不成立,这时还需要处理队列中的任务除非队列为空
            // 这两种情况都会返回 null 让 runWoker 退出 while 循环也就是当前线程结束了,所以必须要 decrement
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // 递减 workerCount 值
                decrementWorkerCount();
                return null;
            }
            // 标记从队列中取任务时是否设置超时时间
            boolean timed; // Are workers subject to culling?
            // 1.RUNING 状态
            // 2.SHUTDOWN 状态,但队列中还有任务需要执行
            for (;;) {int wc = workerCountOf(c);
                // 1.core thread 允许被超时,那么超过 corePoolSize 的的线程必定有超时
                // 2.allowCoreThreadTimeOut == false && wc >
                // corePoolSize 时,一般都是这种情况,core thread 即使空闲也不会被回收,只要超过的线程才会
                timed = allowCoreThreadTimeOut || wc > corePoolSize;
                // 从 addWorker 可以看到一般 wc 不会大于 maximumPoolSize,所以更关心后面半句的情形:// 1. timedOut == false 第一次执行循环,从队列中取出任务不为 null 方法返回 或者
                // poll 出异常了重试
                // 2.timeOut == true && timed ==
                // false: 看后面的代码 workerQueue.poll 超时时 timeOut 才为 true,// 并且 timed 要为 false,这两个条件相悖不可能同时成立(既然有超时那么 timed 肯定为 true)// 所以超时不会继续执行而是 return null 结束线程。if (wc <= maximumPoolSize && !(timedOut && timed))
                    break;
                // workerCount 递减,结束当前 thread
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get(); // Re-read ctl
                // 需要重新检查线程池状态,因为上述操作过程中线程池可能被 SHUTDOWN
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
            try {
                // 1. 以指定的超时时间从队列中取任务
                // 2.core thread 没有超时
                Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;// 超时
            } catch (InterruptedException retry) {timedOut = false;// 线程被中断重试}
        }
    }

下面我们看下 processWorkerExit 是如何工作的

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 正常的话再 runWorker 的 getTask 方法 workerCount 已经被减一了
        if (completedAbruptly)
            decrementWorkerCount();
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 累加线程的 completedTasks
            completedTaskCount += w.completedTasks;
            // 从线程池中移除超时或者出现异常的线程
            workers.remove(w);
        } finally {mainLock.unlock();
        }
        // 尝试停止线程池
        tryTerminate();
        int c = ctl.get();
        // runState 为 RUNNING 或 SHUTDOWN
        if (runStateLessThan(c, STOP)) {
            // 线程不是异常结束
            if (!completedAbruptly) {
                // 线程池最小空闲数,允许 core thread 超时就是 0,否则就是 corePoolSize
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 如果 min == 0 但是队列不为空要保证有 1 个线程来执行队列中的任务
                if (min == 0 && !workQueue.isEmpty())
                    min = 1;
                // 线程池还不为空那就不用担心了
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 1. 线程异常退出
            // 2. 线程池为空,但是队列中还有任务没执行,看 addWoker 方法对这种情况的处理
            addWorker(null, false);
        }
    }

tryTerminate

processWorkerExit 方法中会尝试调用 tryTerminate 来终止线程池。这个方法在任何可能导致线程池终止的动作后执行:比如减少 wokerCount 或 SHUTDOWN 状态下从队列中移除任务。

final void tryTerminate() {for (;;) {int c = ctl.get();
            // 以下状态直接返回:// 1. 线程池还处于 RUNNING 状态
            // 2.SHUTDOWN 状态但是任务队列非空
            // 3.runState >= TIDYING 线程池已经停止了或在停止了
            if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
                return;
            // 只能是以下情形会继续下面的逻辑:结束线程池。// 1.SHUTDOWN 状态,这时不再接受新任务而且任务队列也空了
            // 2.STOP 状态,当调用了 shutdownNow 方法
            // workerCount 不为 0 则还不能停止线程池, 而且这时线程都处于空闲等待的状态
            // 需要中断让线程“醒”过来,醒过来的线程才能继续处理 shutdown 的信号。if (workerCountOf(c) != 0) { // Eligible to terminate
                // runWoker 方法中 w.unlock 就是为了可以被中断,getTask 方法也处理了中断。// ONLY_ONE: 这里只需要中断 1 个线程去处理 shutdown 信号就可以了。interruptIdleWorkers(ONLY_ONE);
                return;
            }
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 进入 TIDYING 状态
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 子类重载:一些资源清理工作
                        terminated();} finally {
                        // TERMINATED 状态
                        ctl.set(ctlOf(TERMINATED, 0));
                        // 继续 awaitTermination
                        termination.signalAll();}
                    return;
                }
            } finally {mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

shutdown 这个方法会将 runState 置为 SHUTDOWN,会终止所有空闲的线程。shutdownNow 方法将 runState 置为 STOP。和 shutdown 方法的区别,这个方法会终止所有的线程。主要区别在于 shutdown 调用的是 interruptIdleWorkers 这个方法,而 shutdownNow 实际调用的是 Worker 类的 interruptIfStarted 方法:

他们的实现如下:

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {checkShutdownAccess();
            // 线程池状态设为 SHUTDOWN,如果已经至少是这个状态那么则直接返回
            advanceRunState(SHUTDOWN);
            // 注意这里是中断所有空闲的线程:runWorker 中等待的线程被中断 → 进入 processWorkerExit →
            // tryTerminate 方法中会保证队列中剩余的任务得到执行。interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();
        }
        tryTerminate();}
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {checkShutdownAccess();
        // STOP 状态:不再接受新任务且不再执行队列中的任务。advanceRunState(STOP);
        // 中断所有线程
        interruptWorkers();
        // 返回队列中还没有被执行的任务。tasks = drainQueue();}
    finally {mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {for (Worker w : workers) {
            Thread t = w.thread;
            // w.tryLock 能获取到锁,说明该线程没有在运行,因为 runWorker 中执行任务会先 lock,// 因此保证了中断的肯定是空闲的线程。if (!t.isInterrupted() && w.tryLock()) {
                try {t.interrupt();
                } catch (SecurityException ignore) { } finally {w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    }
    finally {mainLock.unlock();
    }
}
void interruptIfStarted() {
    Thread t;
    // 初始化时 state == -1
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {t.interrupt();
        } catch (SecurityException ignore) {}}
}
线程池的使用
线程池的创建

我们可以通过 ThreadPoolExecutor 来创建一个线程池

/**
     * @param corePoolSize 线程池基本大小,核心线程池大小,活动线程小于 corePoolSize 则直接创建,大于等于则先加到 workQueue 中,* 队列满了才创建新的线程。当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,* 等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的 prestartAllCoreThreads()方法,* 线程池会提前创建并启动所有基本线程。* @param maximumPoolSize 最大线程数,超过就 reject;线程池允许创建的最大线程数。如果队列满了,* 并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务
     * @param keepAliveTime
     * 线程池的工作线程空闲后,保持存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率
     * @param unit  线程活动保持时间的单位):可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、* 毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)* @param workQueue 工作队列,线程池中的工作线程都是从这个工作队列源源不断的获取任务进行执行
     */
    public ThreadPoolExecutor(int corePoolSize,
               int maximumPoolSize,
               long keepAliveTime,
               TimeUnit unit,
               BlockingQueue<Runnable> workQueue) {
        // threadFactory 用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                Executors.defaultThreadFactory(), defaultHandler);
    }
向线程池提交任务

可以使用两个方法向线程池提交任务,分别为 execute()和 submit()方法。execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。通过以下代码可知 execute()方法输入的任务是一个 Runnable 类的实例。

threadsPool.execute(new Runnable() {
        @Override
        public void run() {}
    });

submit()方法用于提交需要返回值的任务。线程池会返回一个 future 类型的对象,通过这个 future 对象可以判断任务是否执行成功,并且可以通过 future 的 get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

Future<Object> future = executor.submit(harReturnValuetask);
  try
    {Object s = future.get();
    }catch(InterruptedException e)
    {// 处理中断异常}catch(ExecutionException e)
    {// 处理无法执行任务异常}finally
    {
        // 关闭线程池
        executor.shutdown();}
关闭线程池

可以通过调用线程池的 shutdown 或 shutdownNow 方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别,shutdownNow 首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而 shutdown 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。

只要调用了这两个关闭方法中的任意一个,isShutdown 方法就会返回 true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用 isTerminaed 方法会返回 true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用 shutdown 方法来关闭线程池,如果任务不一定要执行完,则可以调用 shutdownNow 方法。

合理的配置线程池

要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。

1、任务的性质:CPU 密集型任务、IO 密集型任务和混合型任务。

2、任务的优先级:高、中和低。

3、任务的执行时间:长、中和短。

4、任务的依赖性:是否依赖其他系统资源,如数据库连接。

性质不同的任务可以用不同规模的线程池分开处理。CPU 密集型任务应配置尽可能小的线程,如配置 Ncpu+ 1 个线程的线程池。由于 IO 密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如 2 *Ncpu。混合型的任务,如果可以拆分,将其拆分成一个 CPU 密集型任务和一个 IO 密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过 Runtime.getRuntime().availableProcessors()方法获得当前设备的 CPU 个数。优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。它可以让优先级高的任务先执行

如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。依赖数据库连接池的任务,因为线程提交 SQL 后需要等待数据库返回结果,等待的时间越长,则 CPU 空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用 CPU。

建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千。有时候我们系统里后台任务线程池的队列和线程池全满了,不断抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行 SQL 变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞,任务积压在线程池里。如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然,我们的系统所有的任务是用单独的服务器部署的,我们使用不同规模的线程池完成不同类型的任务,但是出现这样问题时也会影响到其他任务。

线程池的监控

如果在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,可以根据线程池的使用状况快速定位问题。可以通过线程池提供的参数进行监控,在监控线程池的时候可以使用以下属性

  • taskCount:线程池需要执行的任务数量。
  • completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于 taskCount。
  • largestPoolSize:线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。
  • getPoolSize:线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销毁,所以这个大小只增不减。
  • getActiveCount:获取活动的线程数。

通过扩展线程池进行监控。可以通过继承线程池来自定义线程池,重写线程池的 beforeExecute、afterExecute 和 terminated 方法,也可以在任务执行前、执行后和线程池关闭前执行一些代码来进行监控。例如,监控任务的平均执行时间、最大执行时间和最小执行时间等。

阿里云 2 核 2G 服务器 3M 带宽 61 元 1 年,有高配

腾讯云新客低至 82 元 / 年,老客户 99 元 / 年

代金券:在阿里云专用满减优惠券

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2024-07-24发表,共计14497字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中