知识分享-2022-10-31


一、线程池技术

1.使用线程池技术之前

显式地为任务创建线程,为了提供更好的响应性,可以为每个服务请求创建一个新的线程。

class ThreadPerTaskWebServer {
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            new Thread(task).start();
        }
    }
}

优点:

执行任务的负载已经脱离了主线程,这让主循环能够更迅速地重新开始等待下一个连接。这使得程序可以在完成前面的请求之前接受新的请求,从而提高了响应性。

并行处理任务,这使得多个请求可以同时得到服务。如果有多个处理器,程序的吞吐量会得到提高。

缺点:

线程生命周期的开销。线程的创建与关闭不是“免费”的。那么为每个请求创建一个新线程的做法就会消耗大量的计算资源。

资源消耗。活动线程会消耗系统资源,尤其是内存。如果可运行的线程数多于可用的处理器数,线程将会空闲。大量空闲线程占用更多内存,给垃圾回收器带来压力;

稳定性。无限制的创建线程,可能触发收到一个outofMemoryError,导致系统挂掉;

2.使用线程池技术之后

class TaskExecutionWebServer {
    private static final int NTHREADS = 100;
    private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);

    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            exec.execute(task);
        }
    }
}

在线程池中执行任务线程,这种方法有很多“每任务每线程”无法比拟的优势:

重用存在的线程,而不是创建新的线程,这可以在处理多请求时抵消线程创建、消亡产生的开销

另一项额外的好处就是,在请求到达时,工作者线程通常已经存在,用于创建线程的等待时间并不会延迟任务的执行,因此提高了响应性

通过适当地调整线程池的大小,你可以得到足够多的线程以保持处理器忙碌,同时可以还防止过多的线程相互竞争资源,导致应用程序耗尽内存或者失败。

3.详细介绍线程池原理

总体设计****

image-20221104134104629 image-20221104134459027

ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。

ExecutorService接口增加了一些能力:

(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;

(2)提供了管控线程池的方法,比如停止线程池的运行。

  • void shutdown():关闭线程池,会等待任务执行完。
  • List<Runnable> shutdownNow():立刻关闭线程池,尝试停止所有正在积极执行的任务,停止等待任务的处理,并返回一个正在等待执行的任务列表(还没有执行的)
  • boolean isShutdown():判断线程池是不是已经关闭,但是可能线程还在执行。
  • boolean isTerminated():在执行 shutdown/shutdownNow 之后,所有的任务已经完成,这个状态就是 true。
  • boolean awaitTermination(long timeout, TimeUnit unit):执行 shutdown 之后,阻塞等到 terminated 状态,除非超时或者被打断。
  • <T> Future<T> submit(Callable<T> task): 提交一个有返回值的任务,并且返回该任务尚未有结果的 Future,调用 future.get()方法,可以返回任务完成的时候的结果。
  • <T> Future<T> submit(Runnable task, T result):提交一个任务,传入返回结果,这个 result 没有什么作用,只是指定类型和一个返回的结果。
  • Future<?> submit(Runnable task): 提交任务,返回 Future
  • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks):批量执行 tasks,获取 Future 的 list,可以批量提交任务。
  • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit):批量提交任务,并指定超时时间
  • <T> T invokeAny(Collection<? extends Callable<T>> tasks): 阻塞,获取第一个完成任务的结果值,取消剩余未完成的任务
  • <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit):阻塞,获取第一个完成结果的值,指定超时时间

AbstractExecutorService则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。

最下层的实现类ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。构造函数:

public ThreadPoolExecutor(int corePoolSize,// 核心线程数
                          int maximumPoolSize,// 最大线程数
                          long keepAliveTime,// 非核心线程的存活时间
                          TimeUnit unit,// 时间的单位
                          BlockingQueue<Runnable> workQueue,// 存放任务的队列
                          ThreadFactory threadFactory,// 线程工厂
                          RejectedExecutionHandler handler// 拒绝处理器:如果添加任务失败,将由该处理器处理
                         ) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

运行机制

图2 ThreadPoolExecutor运行流程

生命周期

img

生命周期转换如下

图3 线程池生命周期

任务调度详解

首先,所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:

  1. 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
  2. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
  3. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
  4. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
  5. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

其执行流程如下图所示:

图4 任务调度流程

展开说明

  1. 阻塞队列缓存任务

    工作线程从阻塞队列中获取任务。阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。下图中展示了线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素:

图5 阻塞队列

  1. 工作线程获取任务

    线程需要从任务缓存模块中不断地取任务执行,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信。这部分策略由getTask方法实现,其执行流程如下图所示:

图6 获取任务流程图
  1. 任务拒绝

    任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池用户可以通过实现这个接口去定制拒绝策略,也可以选择JDK提供的四种已有拒绝策略,其特点如下:

    img

4.线程池妙用

Java仅依靠构造函数的不一样就提供了一个灵活的线程池实现和一些有用的预设配置。你可以通过调用Executors 中的某个静态工厂方法就可以创建一个有极具特征的线程池:

newFixedThreadpool 创建一个定长的线程池,等价于new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());每当提交一个任务就创建一个线程,直到达到池的最大长度,这时线程池会保持长度不再变化(如果一个线程由于非预期的Exception而结束,线程池会补充一个新的线程)。

newCachedThreadPool 创建一个可缓存的线程池,等价于new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());,如果当前线程池的长度超过了处理的需要时,它可以灵活地回收空闲的线程,当需求增加时,它可以灵活地添加新的线程,而并不会对池的长度作任何限制。

newSingleThreadExecutor创建一个单线程化的executor,等价于new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())),它只创建唯一的工作者线程来执行任务,如果这个线程异常结束,会有另一个取代它。executor会保证任务依照任务队列所规定的顺序(FIFO,LIFO,优先级)。

阿里建议

image-20221104144234071

实战:

/**
 * 隐式同步车辆最新位置线程ID
 */
public static final AtomicInteger SYNCHRONIZE_LOCATION_THREAD_ID = new AtomicInteger(1);

/**
 * 隐式同步车辆最新位置线程池
 */
public static final ExecutorService SYNCHRONIZE_LOCATION_EXECUTOR = new ThreadPoolExecutor(5, 10, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>(1000),
        task -> new Thread(null, task, "隐式同步车辆最新位置线程-" + SYNCHRONIZE_LOCATION_THREAD_ID.getAndIncrement(), 0),
        new ThreadPoolExecutor.CallerRunsPolicy());

二、线程同步工具

1.Jvm机制内部锁

Java提供了强制原子性的内置锁机制: synchronized块,一个synchronized 块有两部分:锁对象的引用, 以及这个锁保护的代码块。synchronized 方法的锁,就是该方法所在的对象本身。(静态的synchronized方法从Class对象上获取锁。)

synchronized (lock) {
// 访问或修改被锁保护的共享状态
}

每个 Java 对象都可以隐式地扮演一个用于同步的锁的角色;这些内置的锁被称作内部锁(intrinsic locks)或监视器锁(monitor locks)。执行线程进入 synchronized 块之前会自动获得锁; 而无论通过正常控制路径退出, 还是从块中抛出异常,线程都在放弃对synchronized 块的控制时自动释放锁。 获得内部锁的唯一途径是: 进入这个内部锁保护的同步块或方法。

内部锁在Java中扮演了互斥锁的角色,意味着至多只有一个线程可以拥有锁,当线程 A 尝试请求一个被线程B占有的锁时,线程A必须等待或者阻塞,直到B释放它。如果 B永远不释放锁,A将永远等下去。

同一时间, 只能有一个线程可以运行特定锁保护的代码块,因此,由同一个锁保护的synchronized 块会各自原子地执行, 不会相互干扰。在并发的上下文中, 原子性的含义与它在事务性应用中相同一组语句(statements)作为单独的,不可分割的单元运行。

2.AQS框架

java.util.concurrent.locks.AbstractQueuedSynchronizer
image-20221104151107688 image-20221104150931624

3.CAS机制


文章作者: dhslegen
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 dhslegen !
评论
 上一篇
CAS与OAuth2的区别 CAS与OAuth2的区别
CAS与OAuth2的区别CAS的单点登录时保障客户端的用户资源的安全 。 OAuth2则是保障服务端的用户资源的安全 。 CAS客户端要获取的最终信息是,这个用户到底有没有权限访问我(CAS客户端)的资源。 OAuth2获取的最终信息是
下一篇 
Idea 技巧 Idea 技巧
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
2022-01-04
  目录