101-并发编程详解(上篇)

news/2024/4/29 6:54:37/文章来源:https://blog.csdn.net/qq_59609098/article/details/128949843

并发编程详解

在学习之前,如果多线程的理解足够,可以往下学习,否则的话,建议先看看26章博客(只是建议),注意:可能有些字的字体不对,那么一般是复制粘贴来的,但并不影响阅读,忽略这个问题即可
并发编程简介:
这里都只是大致的说明(理论的说明),在后面基本也是这样,你可以选择粗略的看一下即可
java是一个支持多线程的开发语言,多线程可以在包含多个CPU核心的机器上同时处理多个不同的任务,优化资源的使用率,提升程序的效率,在一些对性能要求比较高场合,多线程是java程序调优的重要方面
Java并发编程主要涉及以下几个部分:
1:并发编程三要素
原子性:即一个不可再被分割的颗粒,在Java中原子性指的是一个或多个操作要么全部执行成功要么全部 执行失败
有序性:程序执行的顺序按照代码的先后顺序执行(处理器可能会对指令进行重排序)
可见性:当多个线程访问同一个变量时,如果其中一个线程对其作了修改,其他线程能立即获取到最新的 值
2:线程的五大状态
创建状态:当用 new 操作符创建一个线程的时候
就绪状态:调用 start 方法,处于就绪状态的线程并不一定马上就会执行 run 方法,还需要等待CPU的调度
运行状态:CPU 开始调度线程,并开始执行 run 方法
阻塞状态:线程的执行过程中由于一些原因进入阻塞状态,比如:调用sleep 方法、尝试去得到一个锁等等
死亡状态:run 方法执行完 或者 执行过程中遇到了一个异常
3:悲观锁与乐观锁
悲观锁:每次操作都会加锁,会造成线程阻塞,因为很悲观,即我们通常会认为会使用同一个资源,使得一个线程改变该资源值后,另外一个线程的该资源值没有改变
乐观锁:每次操作不加锁,而是假设没有冲突,而去完成某项操作,如果因为冲突失败就重试,直到成功为止,不会造成线程阻塞
即我们通常认为他们获得的资源是可以得到改变的,但是在特别高的并发下,我们最好还是使用悲观锁(除非操作的对象是唯一的,在77章博客有过说明)),因为这个时候,基本会出现一个线程使得资源值改变,另外一个线程得到的资源值没有改变
4:线程之间的协作
线程间的协作有:wait/notify/notifyAll等
5:synchronized 关键字(一般来说他只需要写在返回类型的前面即可,你可以自己测试,否则会报错)
synchronized是Java中的关键字,是一种同步锁,它修饰的对象有以下几种(具体可以百度查看):
修饰一个代码块:被修饰的代码块称为同步语句块,其作用的范围是大括号{}括起来的代码, 作用的对象是调用这个代码块的对象(new 类())
synchronized (new()) {}
修饰一个方法:被修饰的方法称为同步方法,其作用的范围是整个方法,作用的对象是调用这个方法的对象(即该关键字的参数,一般认为是this)
public synchronized void 方法名称(){}//也可以是这个:
synchronized (this) { //也就是对象(相当于修饰一个代码块)}
修饰一个静态的方法:其作用的范围是整个静态方法,作用的对象是这个类的所有对象(因为没有this)
public static synchronized void syncFunction(){}//相当于是这个
synchronized (当前类.class) { //也就是类(相当于修饰一个类)}
//通常是当前类,即静态方法所在的类
修饰一个类:其作用的范围是synchronized后面括号括起来的部分,作用的对象是这个类的所有对象(也就是代码块中的参数是类而已,比如类.class)
synchronized (.class) {}
6:CAS
CAS全称是Compare And Swap,即比较替换,是实现并发应用到的一种技术
操作包含三个操作数:内存位置(V)、预期原值(A)和新值(B)
如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值,否则,处理器不做任何操作
CAS存在三大问题:ABA问题,循环时间长开销大,以及只能保证一个共享变量的原子操作,一般乐观锁除了使用版本操作外(77章博客有使用),一般也会使用CAS技术来实现(可以认为是CAS算法)
7:线程池
如果我们使用线程的时候就去创建一个线程,虽然简单,但是存在很大的问题,如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率, 因为频繁创建线程和销毁线程需要时间(也就需要资源,任何事情基本都需要资源,就如单纯的赋值,也需要开辟栈的内存空间,几内存资源,只是很少而已),线程池通过复用可以大大减少线程频繁创建与销毁带来的性能上的损耗
第一部分:多线程&并发设计原理
多线程回顾 :
Thread和Runnable:
Java中创建执行线程有两种方法:
扩展Thread 类
实现Runnable 接口
扩展Thread类的方式创建新线程:
package com;/****/
public class MyThread extends Thread {@Overridepublic void run() {//static Thread currentThread(),获取当前正在执行线程的引用,也就相当于Thread的引用,比如下面的m//他本身也是this来调用的,使得操作获取自己//String getName(),获取调用对象所表示线程的名称System.out.println(Thread.currentThread().getName() + "运行了");//System.out.println(Thread.currentThread());与System.out.println(this);是一样的,因为都代表对应引用,比如下面的m对应的对象,换言之必定是当前类对象,虽然引用可能是当前类或者其父类,但最终的操作是当前类的对象而已,所以实际上this,必然是当前类的对象的,所以在自身方法里面操作时,默认的this必然就是当前类的对象//System.out.println(this);,如果是22,那么22抢到,那么首先是22在前面,打印就知道了System.out.println(getName()); //当前到run的引用就是Thread.currentThread(),所以可以直接的输出try {Thread.sleep(800);} catch (InterruptedException e) {e.printStackTrace();}}public MyThread() {}public MyThread(String name) {super(name);}public static void main(String[] args) {MyThread m = new MyThread();m.start();//都是操作对象MyThread,所以子类当引用也好,还是父类当引用也好,都行Thread mm = new MyThread("22"); //设置名称mm.start();//System.out.println(getName());System.out.println(Thread.currentThread().getName()); //main//为什么这里不能操作getName(),因为这里不能有this,那么我们需要使用Thread.currentThread()来拿取其线程对象(排在前面就是当前线程),在这里,那么自然就是main,看前面打印就知道了,即System.out.println(Thread.currentThread());,这里也可以进行打印,一般在这里代表Thread[main,5,main],其中第二个5是优先级,第三个main是主线程(一般代表线程所属线程组),第一个main是当前线程(即该线程进行操作,通常代码线程名称,即getName的结果)//即是这样的:[线程名称, 线程优先级, 线程所属线程组(一般是main,即主线程)],具体如何操作线程组,可以百度查看,这里就不多说了}
}
实现Runnable接口的方式创建线程:
package com;/****/
public class MyRunnable implements Runnable {@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + "运行了");try {Thread.sleep(800);} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) {Thread thread = new Thread(new MyRunnable());thread.start();}
}
Java中的线程:特征和状态 :
1:所有的Java 程序,不论并发与否,都有一个名为主线程的Thread 对象,执行该程序时, Java 虚拟机( JVM )将创建一个新Thread 并在该线程中执行main()方法,这是非并发应用程序中唯一的线程,也是并发应用程序中的第一个线程
2:Java中的线程共享应用程序中的所有资源,包括内存和打开的文件,快速而简单地共享信息,但是基本必须使用同步避免数据竞争
3:Java中的所有线程都有一个优先级,这个整数值介于Thread.MIN_PRIORITY(1)和Thread.MAX_PRIORITY(10)之间,默认优先级是Thread.NORM_PRIORITY(5),线程的执行顺序并没有保证,通常,较高优先级的线程将在较低优先级的钱程之前执行(即优先级越高的线程不一定先执行,但该线程获取到时间片的机会会更多一些(一般总体时间更长))
4:在Java 中,可以创建两种线程
守护线程和非守护线程
区别在于它们如何影响程序的结束
Java程序结束执行过程的情形:
(1):程序执行Runtime类的exit()方法, 而且用户有权执行该方法(执行Runtime.getRuntime().exit(0);即可,他与System.exit(0)基本一样
System.exit(0)和Runtime.getRuntime().exit(0)的主要区别是前者会结束当前Java虚拟机,而后者仅仅会结束当前Java应用程序,即System.exit(0)会关闭打开的资源,而Runtime.getRuntime().exit(0)则不会,他只是关闭Java应用程序,即关闭Java程序,实际上我们手动的关闭程序是也是他,一般必然是先关闭程序才关闭虚拟机的,但是他也只是操作后,没有进行关闭资源,而他的后续操作还是关闭了,只是分开了而已,所以说,最后还是会关闭资源,所以手动的关闭Java程序,资源还是关闭的,相当于我们打印两个数,一个是1,一个是2,其中System.exit(0)一起打印,而Runtime.getRuntime().exit(0)只打印1,然后判断,如果没有打印2,那么会打印2,所以他们最终都打印了2,即都关闭了资源,只是对于他们的本身的操作来说,一个是关闭资源的,另外一个没有关闭资源)
(2):应用程序的所有非守护线程均已结束执行,那么对应的守护线程都会结束,无论是否有正在运行的守护线程,即他们都会直接结束执行,一般main方法是非守护线程,因为我们默认是非守护线程的(默认不是守护线程,所以守护线程基本必然是非守护线程里面创建的,虽然他们互不影响,但是守护线程里面不能创建非守护线程,若有对应的代码操作,如start,那么在操作后,当前的守护线程直接关闭)
这里说明一下为什么所有非守护线程结束后,无论是否有正在运行的守护线程,他们都会结束执行:
可以这样的理解(第一种):守护线程是守护所有非守护线程的,那么当非守护线程关闭后,对应的守护线程没有守护目标了,自然就会关闭
也可以这样理解(第二种):因为main是主线程,那么所有的非守护线程与他有关,使得有联系,那么对应的非守护线程创建的守护线程必然守护所有非守护线程(因为联系)
或者也能这样理解(第三种):在main主线程下,若有非守护线程,那么他并不会真正的关闭(比如设置标志,只有自己创建的线程都执行完毕,才会操作最后一步的关闭,而这个不关闭,会导致守护线程会执行),且守护线程只操作创建他的非守护线程(会追溯到最顶端),也就是若main没有运行完,那么不会结束,但是由于其他非守护线程没有结束,那么main就不会真正的结束,使得守护线程必须让所有非守护线程关闭才会进行关闭
这里我们认为是第三种理解
守护线程通常用在作为垃圾收集器或缓存管理器的应用程序中,执行辅助任务,在线程start之前调用
isDaemon()方法检查线程是否为守护线程,也可以使用setDaemon()方法将某个线程确立为守护线程
最后,我们统称线程默认为非守护线程
5:Thread.States类中定义线程的状态如下:
NEW(new):Thread对象已经创建,但是还没有开始执行
RUNNABLE:Thread对象正在Java虚拟机中运行(一般没有等待锁定的情况就是这个,比如BLOCKED之后一般就是这个)
BLOCKED:Thread对象正在等待锁定(比如他在等待synchronized 释放锁,即等待该锁定释放,因为我需要得到该锁,然后操作)
WAITING(类似于wait):Thread 对象正在等待另一个线程的动作
TIME_WAITING(类似于wait操作时间):Thread对象正在等待另一个线程的操作,但是有时间限制
TERMINATED:Thread对象已经完成了执行(此时它可能处于等待状态或者可以被垃圾回收,即等待垃圾回收)
getState()方法获取Thread对象的状态(上面的状态,通常是上面的6种状态),通常可以直接更改线程的状态
在给定时间内, 线程只能处于一个状态,这些状态是JVM使用的状态,不能映射到操作系统的线程状态
线程状态的源码位置(在Thread类里面,是一个枚举类):

在这里插入图片描述

Thread类和Runnable 接口:
Runnable接口只定义了一种方法:run()方法,这是每个线程的主方法,当执行start()方法启动新线程时,它的操作会将调用run()方法
Thread类其他常用方法:
1:获取和设置Thread对象信息的方法:
getId():该方法返回Thread对象的标识符,该标识符是在钱程创建时分配的一个正整数,在线 程的整个生命周期中是唯一且无法改变的
getName()/setName():这两种方法允许你获取或设置Thread对象的名称,这个名称是一个String对象,也可以在Thread类的构造函数中建立
getPriority()/setPriority():你可以使用这两种方法来获取或设置Thread对象的优先级
isDaemon()/setDaemon():这两种方法允许你获取或建立Thread对象的守护条件
getState():该方法返回Thread对象的状态
2:interrupt():中断目标线程,给目标线程发送一个中断信号,线程被打上中断标记(在后面会进行学习的,即对应变成true,而不是真的中断线程,只是使得某个值由false变成true而已,虽然基本该变量基本由native类型的方法返回或者设置)
3:interrupted():判断目标线程是否被中断(中断了返回true,否则返回false),但是将会清除线程的中断标记(使得设置为false),之后线程会从上次被中断的地方开始执行
4:isinterrupted():判断目标线程是否被中断(如果是中断,即被中断了,那么该方法返回true,否则返回false),不会清除中断标记
注意:线程是默认没有中断的,也就是false
5:sleep(long ms):该方法将线程的执行暂停ms时间
join():暂停线程的执行,直到调用该方法的线程执行结束为止,可以使用该方法等待另一个Thread对象结束
6:setUncaughtExceptionHandler():当线程执行出现未校验异常时,会调用该方法定义的方法,该方法用于建立未校验异常的控制器,比如:
Thread.setDefaultUncaughtExceptionHandler((t, e) -> { //处理代码 
});
//这个了解即可,你可以随便在一个方法里操作,比如:
package com;/****/
public class a extends Thread {public void run() {System.out.println(1);int i = 1 / 0; //这里可以选中删除和不删除来进行测试}public static void main(String[] args) {Thread.setDefaultUncaughtExceptionHandler((t, e) -> {System.out.println(2);});Thread tt = new a();tt.start();Thread.setDefaultUncaughtExceptionHandler((t, e) -> {System.out.println(3); //后设置的进行覆盖});}//导致,不会出现打印对应的异常信息,而是打印(操作)里面的处理信息
}
7:currentThread():Thread类的静态方法,返回实际执行该代码的Thread对象
join示例程序:
package com;/****/
public class MyThread1 extends Thread {public void run() {for (int i = 0; i < 10; i++) {System.out.println("MyThread线程:" + i);}}public static void main(String[] args) throws InterruptedException {MyThread1 myThread = new MyThread1();myThread.start();myThread.join();System.out.println("main线程 - 执行完成");}}
Callable:
Callable 接口是一个与Runnable 接口非常相似的接口,Callable 接口的主要特征如下:
接口,有简单类型参数,与call()方法的返回类型相对应
声明了call()方法,执行器运行任务时,该方法会被执行器执行,它必须返回声明中指定类型的对象,即主要的与Runnable 的区别就是有返回值
call()方法可以抛出任何一种校验异常,可以实现自己的执行器并重载afterExecute()方法来处理这些异常
示例程序:
package com;import java.util.concurrent.Callable;/****/
public class MyCallable implements Callable<String> {@Overridepublic String call() throws Exception {Thread.sleep(5000);return "hello world call() invoked!";}
}
package com;import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;/****/
public class Main {public static void main(String[] args) throws ExecutionException, InterruptedException {MyCallable myCallable = new MyCallable();// 设置Callable对象,泛型表示Callable的返回类型(因为参数就是类的泛型,所以要一致)FutureTask<String> futureTask = new FutureTask<String>(myCallable);// 启动处理线程,当然,因为Callable与run基本一样,所以也会操作对应的方法,即最终也会执行,即也会看是否有对应的方法而已,实际上还是run方法,只是该run方法在FutureTask里面被重写了,使得(最终)会导致执行call方法new Thread(futureTask).start();// 同步等待线程运行的结果,相当于join方法,但这里可以得到一个返回值String result = futureTask.get();// 5s后得到结果System.out.println(result);}}
再来一个示例程序(使用了前面的MyCallable类):
package com;import java.util.concurrent.*;/****/
public class Main2 {public static void main(String[] args) throws ExecutionException, InterruptedException {//第一个5代表:5个线程,他操作线程池,即corePoolSize(核心线程池大小),他是不会被回收的,即可以认为最少要有5个线程可以操作//第二个5代表:maximumPoolSize(最大线程池大小),大于该数量的线程会进行排队等待//也就是说,线程可以放入多个,因为除了上面的5个外(核心,一般也是手动添加的),我们可以继续的手动的添加线程(注意:我们只是给出任务,是线程池自己创建操作线程的)//第三个1代表:keepAliveTime(存活时间),如果超过了上面的最多线程池大小,那么超过的,如果没有在1秒内进行操作,那么操作回收,除非到达了最小线程//第四个TimeUnit.SECONDS代表:时间单位,这里是秒,结合前面的存活时间,那么就是1秒钟//第五个代表:工作队列,ArrayBlockingQueue是一个有界阻塞队列(后面也会说明的),它使用一个固定大小的数组来实现先进先出(FIFO)的存取策略//当该队列满时,新加入的元素将会阻塞,直到有空间可用,这里定义10个长度的数组ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) {protected void afterExecute(Runnable r, Throwable t) {//call方法运行完毕后,无论是否有错误,基本都会执行这个地方,即则可以在此处进行处理
//                super.afterExecute(r, t);System.out.println(r); //这个r就是错误信息(如果没有错误,自然没有错误信息打印出现,只有基本的固定信息打印)System.out.println("任务执行完毕" + t); //这个t代表一般代表null}};//也就是说,如果出现异常,那么对应的线程会进行操作,然后操作队列再执行(前提是存活),当然,由于可以手动加上线程,自然可以超过10长度的队列//executor.submit给线程池提交任务,因为一般线程池只是定义界限,他本身需要我们自己添加任务的Future<String> future = executor.submit(new MyCallable());//因为线程池自己进行操作线程,所以我们直接的等待即可,不用我们创建线程执行了,那么有个问题,如果我们手动的操作线程,会执行吗,答:不会执行,即相同对象的call方法不会与相同对象的run一样(比如多次的执行start())的报错,而是之后的start()不执行,自己测试就知道了String s = future.get();System.out.println(s);executor.shutdown();}}//注意:线程池与连接池(比如数据库连接池,只要是连接的池,我们都称为连接池,比如数据库连接池,redis的资源(连接)池等等)不同
//线程池需要我们给任务,本身并没有线程在操作
//而连接池可以自己本身存在连接来使用
//他们都可以操作添加线程(连接)或者移除线程(连接)
synchronized关键字 :
锁的对象 :
synchronized关键字"给某个对象加锁",示例代码:
public Class MyClass {public void synchronized method1() {// ...}public static void synchronized method2() {// ...}
}
等价于:
public class MyClass {public void method1() {synchronized(this) {// ...,这个内容是前面的内容,即将整个内容包括起来}}public static void method2() {synchronized(MyClass.class) {// ...}}
}
实例方法的锁加在对象myClass(假设this是MyClass对象的引用变量myClass)上,静态方法的锁加在MyClass.class上
即我们一般将synchronized的参数称为锁,操作相同该值时,需要给该锁释放才可以操作,如果是不同的值,那么互不影响,可以直接操作,因为对应的不同的值是没有加锁的
锁的本质:
如果一份资源需要多个线程同时访问,需要给该资源加锁,加锁之后,可以保证同一时间只能有一个线程访问该资源,资源可以是一个变量、一个对象或一个文件等等数据

在这里插入图片描述

锁是一个"对象",作用如下:
1:这个对象内部得有一个标志位(state变量),记录自己有没有被某个线程占用,最简单的情况是这个state有0、1两个取值
0表示没有线程占用这个锁,1表示有某个线程占用了这个锁
2:如果这个对象被某个线程占用,记录这个线程的thread ID(简称线程ID)
3:这个对象维护一个thread id list(线程列表),记录其他所有阻塞的、等待获取拿这个锁的线程,在当前线程释放锁之后从这个thread id list里面取一个线程唤醒
要访问的共享资源本身也是一个对象,例如前面的类MyClass里也创建一个对象(将该对象设置为锁),对于this(上面说明的引用)来说,这两个对象可以合成(认为是)一个对象(即this可以代表this和创建的对象,因为this包括他这个对象,但他们也不是同一个对象哦,这里只是说明结合而已,所以使用时,他们是互不影响的,即其中一个操作,另外一个不会等待,直接操作,相同的对象则会等待)
即代码就变成synchronized(this) {…}(因为他里面的资源可能也是锁),即要访问的共享资源是对象a,锁加在对象a上
当然,也可以另外新建一个对象,代码变成synchronized(obj1) {…},这个时候,访问的共享资源是对象a,而锁加在新建的对象obj1上(可以也是a)
即资源和锁可以合二为一,使得在Java里面,synchronized关键字可以加在任何对象的成员上面,这意味着,这个对象既是共享资源,同时也具备"锁"的功能,当然他既然是锁(在synchronized操作),那么他也就相当于只能由一个线程来操作,所以他本身也是操作了锁内部的作用,也具备唯一性,即相当于synchronized对他对应的参数,加上了锁(虽然他的参数就是锁),使得看起来后面的所有内容也加上了锁(实际上并没有,只是进入需要得到锁而已,即占用他,因为该参数我们也认为是锁),而该锁只能由一个线程占用,所以导致synchronized后面只能进入一个线程,其他线程等待,使得内容只能是一个线程访问,即synchronized就实现了对应锁的功能
简单来说:我们就是对一个数据加上锁的限制,我们称这个数据为锁(但他也是数据),得到锁,就是占用他或者说线程给他加上锁,即相当于state变成1,然后可以访问里面的内容,其他线程等待,这就是锁的本质,即我们判断一个资源(锁)的状态而已(在80章博客也有过说明)
最后要注意:当synchronized对对象操作锁后,其标志始终存在,所以可以导致相同对象可以放在另外的一个类里面进行操作,比如26章博客中"为了确保两个线程共用同一个仓库",这个地方就进行了说明
实现原理:
锁如何实现?
在对象头里,有一块数据叫Mark Word,在64位机器上,Mark Word是8字节(64位)的
这64位中有2个重要字段(是重要的字段,自然可能包括其他,即不那么重要的字段,比如上面的(线程列表):锁标志位和占用该锁的thread ID,因为不同版本的JVM实现,对象头的数据结构会有各种差异
wait与notify:
生产者−消费者模型:
生产者-消费者模型是一个常见的多线程编程模型,如下图所示:

在这里插入图片描述

一个内存队列,多个生产者线程往内存队列中放数据,多个消费者线程从内存队列中取数据,要实现这样一个 编程模型,需要做下面几件事情:
1:内存队列本身要加锁,才能实现线程安全
2:阻塞,当内存队列满了,生产者放不进去时,会被阻塞,当内存队列是空的时候,消费者无事可做,会被阻塞
3:双向通知,消费者被阻塞之后,生产者放入新数据,要notify()消费者,反之,生产者被阻塞之后,消费者消费了数据,要notify()生产者
第1件事情必须要做,第2件和第3件事情不一定要做,例如,可以采取一个简单的办法,生产者放不进去之 后,睡眠几百毫秒再重试,消费者取不到数据之后,睡眠几百毫秒再重试,因为在这些时间下,对方必然会进行操作完毕,因为程序是很快的,但这个办法效率低下,也不实时操作,因为如果操作了,那么对应的,可能已经生产或者消费完毕了,你还在睡眠,并且等待时间长效率更低,等待时间短,可能还没有开始消费多少就生产等待了,使得有些数据一直存在(没有被消费)
所以,我们只讨论如何阻塞,如何通知的问题
如何阻塞:
办法1:线程自己阻塞自己,也就是生产者线程和消费者线程各自调用wait()或者notify()
办法2:用一个阻塞队列,当取不到或者放不进去数据的时候,入队/出队函数本身就是阻塞的(比如在RabbitMQ中的队列就可以操作,比如我们在81章博客中操作过限流处理)
如何双向通知:
办法1:wait()与notify()机制
办法2:Condition机制
接下来我们来操作一个简单的生产者和消费者模型:
具体代码如下:
先定义共同资源:
package com.My;/****/
public class MyQueue {private String[] data = new String[10];private int getIndex = 0;private int putIndex = 0;private int size = 0;//操作生产的方法public synchronized void put(String element) { //加上了锁if (size == data.length) {//只要满了就进行阻塞try {wait();} catch (InterruptedException e) {e.printStackTrace();}}//没有满或者消费了,那么操作如下:data[putIndex] = element;++putIndex; //定义下标位置if (putIndex == data.length) putIndex = 0; //只要是最后一个,那么我们重新开始,使得我们生产的基本不会是同一个,即循环消费++size; //添加当前的数量,因为我们生产一个,自然需要增加1notify(); //让消费者进行消费}public synchronized String get() {//如果没有,那么阻塞if (size == 0) {try {wait();} catch (InterruptedException e) {e.printStackTrace();}}//只要有数据或者生产了,那么操作这里String result = data[getIndex];++getIndex; //与前面的生产是同步的,导致可以跟上进行消费,且互不干扰if (getIndex == data.length) getIndex = 0;--size;notify(); //我们一般会把notify();放在wait前面,使得并没有必要先消费才提醒生产,当然,放在这里也是可以的,并没有什么要求,因为无论放在那里,他们都能进行解决该需求,所以位置基本没有什么要求,因为这里是操作单个消费者的,因为先后的原因,你操作消费者时,下一个基本是生产者,而不会是消费者//你可能会有疑问:如果cpu一开始操作消费者会多次的执行吗,实际上基本不会,因为阻塞的原因他会使得该线程卡住在这里,这时,只能是其他的线程进入,而不是自己再次的进入(只能让其他线程唤醒自己),而其他的线程,在这里那么只能是生产者了return result;}}
定义生产者(注意:里面的注释可以跳过,因为并不需要了解):
package com.My;import java.util.Random;/****/
//生产者
public class ProducerThread extends Thread {private final MyQueue myQueue;private final Random random = new Random();private int index = 0;public ProducerThread(MyQueue myQueue) {this.myQueue = myQueue;}@Override //该注解通常用于重写父类的方法(没有父类也会报错),以确保子类的实现与父类的方法一致(如果不是他会报错出现异常,在运行时出现的,即编译的异常,而不是运行的异常,只是需要我们运行他才会出现而已),它可以帮助开发者准确地确定重写的方法,并帮助校验源代码的正确性public void run() {while (true) {String tmp = "ele-" + index; //因为index是当前对象的,所以在后面的多个线程操作时,是会出现多个相同的值的,但是对应存放的数组位置是不同的哦(不要以为是相同的)myQueue.put(tmp); //进行生产//因为打印或者其他操作需要时间,所以导致生产并没有先生产很多//但是生产还是最先的(这里是解释26章博客中生产会操作很多次的"哪个地方")//你可以试着将这里的代码删除(System.out.println("添加元素:" + tmp);即这个),在对应的put方法里面打印1,就会发现1会出现很多次,而加回来,又变成少次数了,即的确也解释了"哪个地方"//但他之后基本趋于平衡,因为虽然wait需要时间(解除阻塞,在26章有过说明),也包含消费者外面的时间,但是其中其他的操作也需要时间(很多个赋值等等),所以趋于平衡,只是一开始有先抢占cpu的时间而已,且由于消费者外面的时间没有初始化,基本(远)大于其他操作时间,所以可以打印多次,而这里的打印与消费者那里的时间差不多(因为他并不是存在与锁里面的,即会影响抢占调用方法),所以原来一开始就是趋于平衡的,里面的时间和外面的时间可以来与解除阻塞平衡以及一开始的cpu时间进行联系,当然,只是删除这里,那么由于消费者那里也有打印,所以一开始是不会趋于平衡的,因为他需要更多时间,更何况我们加上了cpu多出时间了,之后,对方初始化了,那么由于内部的时间,导致平衡,当消费者那里删除后,那么由于本身的时间(内部的时间),过大,所以一开始就会趋于平衡,因为比解除阻塞要大(也包含cpu时间)//即这里出现的时间有,解除阻塞,内部时间(有内部操作),cpu时间(一开始先执行的线程,多出的时间),外部时间,导致打印的信息千变万化//现在我们来进行说明://单纯的解除阻塞(内部基本没有操作,无内部操作或者少内部操作我们通常认为是0或者说远远小于1,比如0.1,而获取锁,我们也认为是远远小于1,比如0.1,所以可以打印多次),导致双方打印多次,然后加上cpu,任然一开始就打印多次(就如3>2,4还是大于2),若单纯的加上了内部,那么阻塞有时间操作,趋于平衡(一开始),加上cpu,由于内部时间还是多的,所以趋于平衡(一开始),然后对方(消费者)外面加上时间(对应的打印可以说是比普通的打印需要更多的时间,当然,只是一开始需要初始化而已,即会有缓存的,因为有对应的变量),然后我加上cpu时间,一上一下,我出现的多很多了,但一开始是这里(本身不是对方)操作打印的,然后我加上cpu时间,由于抵消了我多余的时间,那么我操作时一开始就趋于平衡(前提是有变量的打印,否则还是与上面的一样的出现多很多),之所以说趋于平衡,因为在极端情况下,可能其中一个线程始终浮动范围很大,比如快了点,那么累积下来,就可能存在执行多一次(或者多次,一般只是多一次)//用数字表示就是,我们将解除阻塞称为1,内部时间称为5,cpu时间称为3,外部时间称为100(即有变量的打印),初始化后或者普通的打印,称为1//即单纯的阻塞,由于是1,那么若双方没有内部,那么这个1可以实现打印多次,若有cpu存在,那么自然就是4,也会打印多次,当然,就算是1,也会到达顶端//有内部,当外面进行操作时,由于需要5,那么解除阻塞的1自然会解除,使得准备占领,所以我操作完,对方就会操作,趋于平衡,而由于cpu是3,3+1<5,所以还是趋于平衡//若对方加上外面的时间,由于对方的时间过于大(加上cpu也是一样也是大的,反正很大),所以我会执行很多次,他才会进行抢占,当然,他会先操作可以进入(即第一次的抢占),因为还没有执行外面的打印,然后由于初始化完毕,那么他的这个100变成了1,自然是小于内部的,所以外面内部执行时,他自然操作了抢占,所以趋于平衡//当然,对于这样的说明,只要了解即可,因为打印前打印后,与实际业务并没有什么联系,所以可以不用管就行,只需要完成具体需求即可,所以从这里开始,在后面我就不做这样的打印说明了System.out.println("添加元素:" + tmp);index++;try {Thread.sleep(random.nextInt(1000)); //停止一定的时间} catch (InterruptedException e) {e.printStackTrace();}}}
}
定义消费者:
package com.My;import java.util.Random;/****/
public class ConsumerThread extends Thread{private final MyQueue myQueue;private final Random random = new Random();public ConsumerThread(MyQueue myQueue) {this.myQueue = myQueue;}@Overridepublic void run() {while (true) {String s = myQueue.get();System.out.println("\t\t消费元素:" + s);try {Thread.sleep(random.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}
进行操作:
package com.My;/****/
public class Main {public static void main(String[] args) {MyQueue myQueue = new MyQueue();ProducerThread producerThread = new ProducerThread(myQueue);ConsumerThread consumerThread = new ConsumerThread(myQueue);//操作同一个new MyQueue(),那么就是同一个锁了(在这里是,因为是this)producerThread.start();consumerThread.start();}
}//最后注意:synchronized相关之间的获取锁,并不是CPU的资源获取,他们只是谁先到谁先操作,即有顺序的,即通常采用 "先到先得法"来决定谁会获得锁,也就是说最先进入代码同步块的线程最有可能抢占锁
//但他们可能会受某些实际的影响导致,比如wait的原因,使得notify让可以多个线程同时操作该加锁的资源,其中一个线程只能操作wait后面的(他是被唤醒的那个线程),那么可以因为在这个时间上(也就是wait的时间,实际上就是唤醒时间,这里补充以下:一般我们说的唤醒时间是,他在"可以进入"到"完全唤醒"之间的总时间,主要是"可以进入"解除阻塞的时间,在26章博客有说明"可以进入"到"完全唤醒"之间的范围),使得另外一个线程操作多次(在"可以进入"到"完全唤醒"之间进行操作的多次,完全唤醒后,该线程要开始操作了),前面已经说明过了,在"你可以试着将这里的代码删除"这里,当然也说明了"在后面我就不做这样的打印说明了"
多个生产者多个消费者的情形:
package com.My;/****/
public class MyQueue2 extends MyQueue {private String[] data = new String[10];private int getIndex = 0;private int putIndex = 0;private int size = 0;@Overridepublic synchronized void put(String element) {if (size == data.length) {try {wait();} catch (InterruptedException e) {e.printStackTrace();}put(element);//重新调用自身来使得可以操作,当然也可以不这样操作,因为这里是加了锁的,但我们也不建议,因为他并没有操作判断(虽然因为锁的存在不会出现问题),但是若操作了其他的方法(我们也能称为其他部分的消费者,因为并不是只有该一个业务,且加了锁的,因为只有锁,wait基本才会操作)那么如果他直接的往后面运行,那么会出现数据不对的问题,比如假设,生产者生产后,其他的线程进入消费掉,然后唤醒,若这个唤醒是wait(其他消费者进行将我们唤醒)的,那么由于之前的已经操作了一次消费,那么这里还会进行消费(没有重新抢夺锁的情况下),若这是最后一个,那么可能会出现数据不对的情况(对应的size从0变成-1,或者得到的值是不对的,因为对应消费后,并没有设置为空置,比如null,但是我们通常会解决这样的情况,比如这里就有建议,看后面的说明,所以一般这里不操作null,使得代码少点,那么效率高点),所以这里建议(上面的"看后面的说明")他继续去进行抢夺锁或者重新等待(主要是重新等待,因为加上了锁,自然可以不这样操作,也就不用重新抢夺锁的,而这里之所以加上是保证进行判断,虽然可以不用),所以在多个消费者中或者业务中,需要防止其他操作进行唤醒的wait,这里在后面会进行说明} else {put0(element);notify();}}private void put0(String element) {data[putIndex] = element;++putIndex;if (putIndex == data.length) putIndex = 0;++size;}@Overridepublic synchronized String get() {if (size == 0) {try {wait();} catch (InterruptedException e) {e.printStackTrace();}return get();} else {String result = get0();notify();return result;}}private String get0() {String result = data[getIndex];++getIndex;if (getIndex == data.length) getIndex = 0;--size;return result;}}
package com.My;/****/
public class Main2 {public static void main(String[] args) {MyQueue2 myQueue = new MyQueue2();for (int i = 0; i < 3; i++) {new ConsumerThread(myQueue).start();}for (int i = 0; i < 5; i++) {new ProducerThread(myQueue).start();}try {//等待10秒Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}System.exit(0); //自动退出,就不用你自己去手动的关闭了}}
为什么必须和synchronized一起使用 :
在Java里面,wait()和notify()是Object的成员函数,是基础中的基础,为什么Java要把wait()和notify()放在如此基础的类里面,而不是作为像Thread一类的成员函数,或者其他类的成员函数呢:
先看为什么wait()和notify()必须和synchronized一起使用,在这之前,请看下面的代码:
public class MyClass1 {private Object obj1 = new Object();public void method1() {synchronized(obj1) {//...obj1.wait();//...}}public void method2() {synchronized(obj1) {//...obj1.notify();//...}}
}
或者下面的代码:
public class MyClass1 {public void synchronized method1() {//...this.wait();//...}public void synchronized method2() {//...this.notify();//...}
}
然后,开两个线程,线程A调用method1(),线程B调用method2()
答案已经很明显:两个线程之间要通信, 对于同一个对象来说,一个线程调用该对象的wait(),另一个线程调用该对象的notify(),该对象本身就需要同步,所以,在调用wait()、notify()之前,要先通过synchronized关键字同步给对象,也就是给该对象加锁,很明显,对应的wait和notify也是根据锁对象来互通的,如果是不同的锁,自然不会互相操作,以后的Lock也是如此,如果说还是互通的,那么是错误的(这里要注意)
简单来说,wait和notify需要synchronized对应参数来调用,也能够说明他们只能是锁用来调用的,经过测试,如果没有设置成锁,那么是不能调用他们的,如果调用也就会报错,并会使得线程直接退出而不执行,但是也会看错误的捕获,比如InterruptedException会使得他的线程进行直接退出(notify不用捕获,他使用InterruptedException是检查报错的,因为他没有抛出),而Exception会使得他们进行继续操作,就单纯的报错而已(那么自然对应的如wait不会进行阻塞,但不会起作用,因为报错了)
synchronized关键字可以加在任何对象的实例方法上面,任何对象都可能成为锁,因此,为了进行统一,所以wait()和notify()只能放在Object里面了,这样,无论你的参数是什么类型,都可以直接的调用wait或者notify(),而不用考虑是否存在他们两个方法了
所以基本只有synchronized设置的锁,对应的wait和notify才可进行操作,即他们基本只能操作锁(一般只能是synchronized锁,其他锁可能很难操作,如Lock锁,具体实现方式可以百度),否则会报错的
为什么wait()的时候必须释放锁:
当线程A进入synchronized(obj1)中之后,也就是对obj1上了锁,此时,调用wait()进入阻塞状态,一直不能退出synchronized代码块
那么,线程B永远无法进入synchronized(obj1)同步块里,永远没有机会调用notify(),发生死锁,这就涉及一个关键的问题:在wait()的内部,会先释放锁obj1,然后进入阻塞状态,之后,它被另外一个线程 用notify()唤醒,重新获取锁(这里就会使得两个线程操作,我是这样的认为,即都进行格外的获得),其次,wait()调用完成后,执行后面的业务逻辑代码,然后退出synchronized同步块,再次释放锁(他的格外)
wait()内部的伪代码如下:
wait() {// 释放锁// 阻塞,等待被其他线程notify// 重新获取锁(在对应的wait的位置来进行获取操作),在重新获得锁之前,我们需要解除阻塞,也就是我们之前说明的解除阻塞的地方(即这里需要时间),在后面的多线程设计模式中,我们也会操作一个加锁的案例,即读写锁的案例(先放在这里说明,在后面会知道的),注意:wait最终是调用native修饰的方法的哦,这里了解即可//当然,虽然他不是在synchronized那里进行获取,但是他们是相通的,因为该wait就是对应锁来调用的,所以可以进行对应的相通操作,而获得对应的锁,即也的确合理,即并不是非要在synchronized这里进行获得锁,他只是来定义锁的,而不是操作获得锁的操作而已,只是他的参数是锁,所以他也顺便进行了操作锁的获得的操作,并不妨碍其他获得锁的操作,就如Lock类似}
如此则可以避免死锁
wait()与notify()的问题:
以上述的生产者-消费者模型来看,其伪代码大致如下:
public void enqueue() {synchronized(queue) {while (queue.full()) {  //满了才进行阻塞queue.wait();}//... 数据入列(中间代码认为省略)queue.notify(); // 通知消费者,队列中有数据了}
}public void dequeue() {synchronized(queue) {while (queue.empty()) { //没有才进行阻塞queue.wait();}// 数据出队列(中间代码认为省略)queue.notify(); // 通知生产者,队列中有空间了,可以继续放数据了}
}
在前面的多个消费者和多个生产者中,我们给出了疑问:“这里在后面会进行说明”
即生产者在通知消费者的同时,也可以通知其他的生产者,消费者在通知生产者的同时,也可以通知其他消费者
原因在于wait()和notify()所作用的对象和synchronized所作用的对象是同一个,只能有一个对象,无法区分队列空和队列满两个条件是唤醒消费者还是生产者(在多个消费者那里也说明过,那里就建议继续抢夺),因为我们都可以进行唤醒,这也正是Condition要解决的问题,在后面会说明,这里先进行了解
InterruptedException与interrupt()方法 :
Interrupted异常:
什么情况下会抛出Interrupted异常:
假设while循环中没有调用任何的阻塞函数,比如就是通常的算术运算,或者打印一行⽇志信息,如下所示
package com.my1;public class MyThread extends Thread {@Overridepublic void run() {System.out.println(this);while (true) {//isInterrupted():判断目标线程是否被中断(如果是中断,即被中断了,那么该方法返回true,否则返回false),不会清除中断标记boolean interrupted = isInterrupted();//默认的this就是Thread对应的对象(因为我们就是使用当前类来操作的)//因为我们调用的start和run都是他//除非对应的run是另外一个操作接口的类,而不是当前继承Thread的类对象//当然打印自然是使用Thread的打印,因为我们值重写了run方法(当然这里是继承的重写,也可以不重写,重写的话,使用当前子类的版本,如果是接口,那么必须重写了),所以对应的结果this是一样的System.out.println("中断标记:" + interrupted);}}public static void main(String[] args) {Thread a = new MyThread();System.out.println(a);a.start();}
}
这个时候,在主线程中调用一句thread.interrupt(),请问该线程是否会抛出异常,答:不会
 public static void main(String[] args) throws InterruptedException {Thread a = new MyThread();System.out.println(a);//如果放在start前面,那么不会进行设置,默认false,因为中断是中断运行的a.start();Thread.sleep(10);a.interrupt();Thread.sleep(100);System.exit(0); //0为正常退出,非0为异常退出,前面的Runtime.getRuntime()的exit也差不多}//并且对应的打印信息由false变成了true了,但是我们可以发现,他的中断并不会使得线程停止运行,即只是一个标记而已
只有那些声明了会抛出InterruptedException的函数才会抛出异常(在上面的run方法里进行操作就知道了,但是这些异常并不会结束其运行,只会使得对应的阻塞不进行阻塞了(也就是往后执行),比如设置了wait,那么就进行唤醒,并且会自动设置为false的标记,当然如果他本来是false,那么不会出现异常,否则会出现异常,实际上是因为当他们进行阻塞时,如果判断是true的类型中断(会持续判断的,也就是说,在阻塞时,也会判断),那么会出现报错(这个报错可能慢一点出现,导致打印时,可能在对应的false后面,比如将run里面设置sleep,然后后面不延时的直接interrupt,那么观察打印即可),表示因为他在进行特殊的操作,不要暴力中断哦),比如也就是下面这些常用的函数:
public static native void sleep(long millis) throws InterruptedException {...}public final void wait() throws InterruptedException {...}public final void join() throws InterruptedException {...}//注意:他的异常在他内部进行了处理,由于是抛出,而不是try的处理,所以外面的catch(try的异常处理)一般会操作
//最后说明一下:当一般只有try的主体时(即都不写),catch基本只能是Exception可以操作,其他的基本都会报错,可能有其他的(具体可以百度,比如NullPointerException空指针异常就可以)
//当抛出异常后,自然他就会解除阻塞了
轻量级阻塞与重量级阻塞 :
通常也有轻量级锁和重量级锁,一般CAS实现的锁是轻量级的锁,而synchronized实现的锁是重量级的锁,这个轻量和重量代表执行效率,具体可以看这个博客:https://blog.csdn.net/Smartbbbb/article/details/120540878
能够被中断的阻塞称为轻量级阻塞,对应的线程状态是WAITING或者TIMED_WAITING
而像 synchronized 这种不能被中断的阻塞称为重量级阻塞(即没有办法进行操作他不阻塞的,而像wait这种,可以进行操作唤醒,而synchronized无论如何都需要等待锁释放,才可不被阻塞,所以你怎么操作,只要他没有锁释放,那么他就不会唤醒,即将我们的操作,使得不会进行唤醒的,就称为重量级的阻塞,也就是中断,中断会使得唤醒,或者说,解除阻塞),对应的状态是 BLOCKED
如图所示:调用不同的方法后,一个线程的状 态迁移过程

在这里插入图片描述

初始线程处于NEW状态,调用start()开始执行后,进入RUNNING或者READY状态,如果没有调用任何的阻塞 函数,线程只会在RUNNING和READY之间切换,也就是系统的时间⽚调度,这两种状态的切换是操作系统完成 的,除非⼿动调用yield()函数,放弃对CPU的占用(但并不一定会成功,可能还会回来,即又抢占的,因为这时还没有得出第一个,可以认为他的操作,相当于原本有多个start,你这个线程可以先运行操作的,你进行让步,使得其他线程先运行操作,这里就要知道一个问题,假设,有超多的start,那么在一定的程度上,对应的start可能会后执行,可能是必然的,所以在没有到达这个程度时,就算你让步,可能还会出现你会先运行的情况,但也只是可能,在这个程度后,自然,会有线程先一步运行,而不是可能,在26章博客中也说过"得到最开始的线程是需要时间的")
一旦调用了图中的任何阻塞函数,线程就会进入WAITING或者TIMED_WAITING状态,两者的区别只是前者为 无限期阻塞,后者则传入了一个时间参数,阻塞一个有限的时间,如果使用了synchronized关键字或者synchronized块,则会进入BLOCKED状态
不太常见的阻塞/唤醒函数,LockSupport.park()/unpark(),这对函数非常关键,Concurrent(一般是小写的concurrent)包中Lock(Callable也是该包下面的)的实现即依赖这一对操作原语(本质操作,一般来说原语的操作是不可分割的,即通常是原子性的,你可以认为是汇编的操作,那么汇编自然不能再次的分开,因为就如电流一样,必然是有顺序的,而必须的按照顺序执行,所以他的操作就是原子的,即只能一个线程进入),后面会说明的
因此interrupted()的精确含义是"唤醒轻量级阻塞"(而不用单独的操作对应的方法,而是一系列的轻量级阻塞,当然,这里说成唤醒还不够完善,最好是称为解除轻量级阻塞,因为唤醒基本都只是针对于wait来说的,而解除则都能说明),而不是字面意思"中断一个线程"
isInterrupted()与interrupted()的区别:
因为thread.interrupted()(将thread认为是Thread的引用)相当于给线程发送了一个唤醒的信号,所以如果线程此时恰好处于WAITING或者TIMED_WAITING状态,就会抛出一个InterruptedException,并且线程被唤醒,而如果线程此时并没有被阻塞, 则线程什么都不会做(只认为标记改变了,该执行的执行),但在后续,线程可以判断自己是否收到过其他线程发来的中断信号,然后做一些对应的处 理,当然,发生报错了,我们也能够在catch里进行操作,即这里可以操作中断的处理
这两个方法都是线程用来判断自己是否收到过中断信号的,前者是实例方法,后者是静态方法(当静态方法是自身拥有的时候,可以不用加上类名,继承的也可以不用加类名),二者的区别在 于,前者只是读取中断状态,不修改状态,后者不仅读取中断状态,还会重置中断标志位变成false
给出例子:
package com.my2;import com.my1.MyThread;/****/
public class Main extends Thread {public void run() {int i = 0;while (true) {boolean interrupted = isInterrupted();System.out.println("中断标记:" + interrupted);++i;if (i > 200) {// 检查并重置中断标志boolean interrupted1 = Thread.interrupted();System.out.println("重置中断状态:" + interrupted1);interrupted1 = Thread.interrupted();System.out.println("重置中断状态:" + interrupted1);interrupted = isInterrupted();System.out.println("中断标记:" + interrupted);break;}}}public static void main(String[] args) throws InterruptedException {Main myThread = new Main();myThread.start();Thread.sleep(10);myThread.interrupt();Thread.sleep(7);System.out.println("main中断状态检查-1:" + myThread.isInterrupted());System.out.println("main中断状态检查-2:" + myThread.isInterrupted());//打印也是需要时间的,所以最后两个可能是分开的,当然,打印的间隔没有时间,给先打印,自然在前面}}//这里需要考虑一件时,对中断的操作,必须是"认为阻塞"的,那么认为阻塞是什么意思呢,也就是说,如果对应的线程执行完了,那么设置中断是没有用的,也就是不会变成false,就如在start前面操作中断一样,不会进行设置,所以我们最好使得对应的run方法执行一定的时间(也就是"认为阻塞"),也就是上面的循环,你可以试着将循环去除,那么可以发现,主线程最后的两个打印是false
最后这里要注意的是,我们的中断只是给出具体的标志改变,具体的操作需要我们自己进行
比如说可以实现执行完毕,看如下代码:
package com.my2;/****/
public class Main1 extends Thread {public void run() {int i = 0;while (true) {//实际上之所以使用本类的方法不需要有引用是因为对应的this必然是这个类相关的,所以可以直接的调用(当在方法中使用本类的属性时,都会隐含地使用this关键字)boolean interrupted = isInterrupted();System.out.println("中断状态" + interrupted);if (interrupted == true) {System.out.println("退出了");break;}}}public static void main(String[] args) throws InterruptedException {Main1 myThread = new Main1();myThread.start();myThread.interrupt();//很明显,如果没有操作这个,那么线程不会结束,所以中断在一定的程度上,可以实现中断线程,虽然他只是标志而已}
}
而我们也可以这样的使用,由于synchronized对应的参数即是锁,也是资源,所以可以这样操作:
package com.my2;/****/
public class a extends Thread {static a a = new a();@Overridepublic void run() {while (true) {boolean interrupted = isInterrupted();System.out.println("中断状态" + interrupted);interrupt();System.out.println(66);break;}}
}
package com.my2;/****/
public class b extends Thread{@Overridepublic void run() {synchronized (a.a) { //是一个锁while (true) {if(a.a.isInterrupted()){ //也是资源System.out.println("退出了");break;}}}}
}
package com.my2;/****/
public class c {public static void main(String[] args) {b b = new b();a.a.start();b.start();}
}
即这里我们也进行了即是锁也是资源的验证,即可以说synchronized只是给资源加上标志而已,相当于前面的state,可以认为synchronized给资源加上标志,或者说任何资源都有标志,但我们基本理解为synchronized使得加上标志的
我们也可以操作catch来处理中断:
package com.my1;/****/
public class kk extends Thread {@Overridepublic synchronized void run() {int i = 0;while (i == 0) {System.out.println(1);try {wait();} catch (InterruptedException e) {e.printStackTrace();i = 1; //这里也可以使得进行退出}}}public static void main(String[] args) {kk kk = new kk();kk.start();kk.interrupt();}
}
所以中断的标志是非常有用的,因为他可以使得阻塞解除(或者说唤醒),而不用必须操作对应的方法进行解除阻塞(或者说唤醒)
线程的优雅关闭 :
stop与destroy函数 :
线程可以认为是"一段运行中的代码",比如一个运行中的方法,运行到一半的线程能否强制杀死?
答:基本最好不要,特别是在Java中,Thread类有stop()、destroy()等方法(destroy方法可能已经舍弃了,可能在老版本的JDK中能够找到),但这些方法官方明确不建议使用,原因很简单,如果强制杀死线程,则线程中所使用的资源,例如文件描述符、网络连接等无法正常关闭,那么可能会影响其他线程对其操作,相当于一直被占用,所以我们说基本最好不要(当然整个程序执行完毕会自动的清理的,即会进行完全释放,因为都清除了对应信息的,前提是程序都进行关闭)
因此,一个线程一旦运行起来,不要强行关闭,合理的做法是让其运行完(也就是方法执行完毕),干净地释 放掉所有资源,然后退出,如果是一个不断循环运行的线程,就需要用到线程间的通信机制,让主线程通知其退出即可
最后要注意:他们的只是可以说是直接的关闭,无论是在start之前执行,还是在之后执行,都会使得关闭,如果在start之前执行,那么不会启动线程操作,若在之后执行,则强制关闭
守护线程 :
daemon(我们也称为"守护")线程和非daemon线程的对比:
package com.my3;/****/
public class main extends Thread {@Overridepublic void run() {while (true) {System.out.println(1);System.out.println(Thread.currentThread().getName());try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {main myDaemonThread = new main();// 设置为daemon线程myDaemonThread.setDaemon(true); //设置为守护线程,本质上是设置值,即true代表守护线程//false代表非守护线程,所以如果这里是设置了false,那么就是设置非守护线程myDaemonThread.start();// 启动非daemon线程,当所有非daemon线程结束,不管daemon线程是否结束,都结束JVM进程new MyThread().start(); //删除该行代码来验证守护线程是否关闭,很明显,删除会进行直接的关闭,而没有删除则需要执行完才会关闭//最后注意:我们非守护线程完全关闭是需要时间的,而在这个时间下,守护线程可能会多执行了一会,可以选择删除上面的new MyThread().start();代码//然后将上面的main的run的定时操作删除,可以发现,对应打印很多次才进行关闭的}public static class MyThread extends Thread {public void run() {for (int i = 0; i < 10; i++) {System.out.println("非Daemon线程");try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}}}}
}
对于上面的程序,在thread.start()前面加一行代码thread.setDaemon(true),当main函数退出后(没有其他的线程启动,即删除上面说明的那行代码的结果),线程thread就会退出,整个进程也会退出,当在一个JVM进程里面开多个线程时,这些线程被分成两类:守护线程和非守护线程,默认都是非守护线程,在Java中有一个规定:当所有的非守护线程退出后,整个JVM进程就会退出,意思就是守护线程"不算作数", 守护线程不影响整个 JVM 进程的退出,例如,垃圾回收线程就是守护线程,它们在后台默默工作,当开发者的所有前台线程(非守护线程)都退出之 后,整个JVM进程就退出了
但要知道,只有非守护进程都关闭了,那么无论守护进程是否执行完都会关闭,且基本是强制的,所以在一定的程度上,守护线程,最好不要操作资源,使得资源没有正常关闭(相当于一直被占用),即守护进程我们一般不操作需要关闭的代码操作,否则可能会出现资源没有正常关闭的情况(相当于一直被占用)
设置关闭的标志位 :
开发中一般通过设置标志位的方式,停⽌循环运行的线程
比如如下代码:
package com.my3;/****/
public class MyThread extends Thread {private boolean running = true;public void run() {while (running) {System.out.println("线程正在运行。。。");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}public void stopRunning() {this.running = false;}public static void main(String[] args) throws InterruptedException {MyThread myThread = new MyThread();myThread.start();Thread.sleep(5000);myThread.stopRunning();myThread.join();}}//与前面通过中断标志基本类似,但是这里是我们设置的,而中断是自带的
但上面的代码有一个问题:如果对应run方中while循环中阻塞在某个地方,例如里面调用了 object.wait()函 数,那它可能永远没有机会再执行判断的代码,也就一直无法退出循环,此时,就要用到InterruptedException()(异常,自然会使用的)与interrupt()(主要是这个的执行)函数,使得他不阻塞了,即再次的进行判断使得退出,可以发现,这样的关闭我们并没有进行强制关闭线程,因为是程序自已进行关闭的,所以我们称这个关闭是优雅的关闭
具体使用中断的操作如下:
package com.my3;/****/
public class MyThread extends Thread {private boolean running = true;public synchronized void run() {while (running) {System.out.println("线程正在运行。。。");try {wait();Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}public void stopRunning() {this.running = false;}public static void main(String[] args) throws InterruptedException {MyThread myThread = new MyThread();myThread.start();Thread.sleep(5000);myThread.stopRunning();myThread.interrupt(); //将这行代码进行删除或者不删除测试即可myThread.join();}}
并发核心概念:
并发与并行:
在单个处理器上采用单核执行多个任务即为并发,在这种情况下,操作系统的任务调度程序会很快从一个任务 切换到另一个任务,因此看起来所有的任务都是同时运行的
同一时间内在不同计算机、处理器或处理器核心上同时运行(总的)多个任务(可以一个核心一个或者多个,虽然有多个核心),就是所谓的"并行"(可以认为是多核,即多核心,如8核(心),即真正的一起操作了)
另一个关于并发的定义是,在系统上同时运行多个任务(不同的任务)就是并发
而另一个关于并行的定义 是:同时在某个数据集的不同部分上运行同一任务的不同实例就是并行
关于并行的最后一个定义是,系统中同时运行了多个任务(注意是同时)
关于并发的最后一个定义是,一种解释程序员将任 务和它们对共享资源的访问同步的不同技术和机制的方法,也基本是同时
这两个概念⾮常相似,而且这种相似性随着多核处理器的发展也在不断增强,因为核心越多,单个核心的线程就少,那么并发就容易变成并行(当然,这里只是对核心来说的,因为定义是多种的,若是程序来说,那么核心越多,执行速度越快,那么并发和并行就越相似了,但也只能是相似,而不能相等),即可以认为并行是并发的极致(即并行也可以称为并发,只是极致而已),但基本不会相同,因为核心不是无限多的
当然,上面是对资源的利用或者任务来说明的,在一般生活操作中或者某些业务需求 ,我们会认为进行分开操作说明,即同时进行为并行,不同时进行为并发
同步:
在并发中,我们可以将同步定义为一种协调两个或更多任务以获得预期结果的机制,为什么是并发中呢,实际上并发就代表任务(我们也称为并发任务),任何情况都可以称为并发,只是我们一般认为并发是同时操作的,而不是依次操作,所以在26章博客中"异步操作"和"同步操作"其实都属于并发,而其他直接说明并发的地方,我们一般都认为同时操作的,比如26章博客中"多线程并发(一起进发)的操作"
同步的方式有两种:
1:控制同步:例如,当一个任务的开始依赖于另一个任务的结束时,第二个任务不能再第一个任务 完成之前开始
2:数据访问同步:当两个或更多任务访问共享变量时(针对抢夺锁),在任意时间里,只有一个任务可以访问该变量(如synchronized的对应的参数,这是针对多个线程抢夺锁来说的,因为其他没有抢夺锁的线程能直接的使用并调用他的内容,或者操作他的内容)
可以发现,上面都相当于是锁的概念,因为对应的锁基本就是来满足同步的,而他们都是同步的一种方式,第一个决定"开始"使得同步,第二个决定"访问"使得同步
还有与同步密切相关的一个概念时临界段,临界段是一段代码,由于它可以访问共享资源,因此再任何给定时间内,只能被一个任务执行,互斥是用来保证这一要求的机制,而且可以采用不同的方式来实现
public synchronized void set(Object a) {//临界段   
}//或者:
public void set(Object a) {
synchronized(this){//临界段  
}
}
同步可以帮助你在完成并发任务的同时避免一些错误,但是它也为你的操作引入了一些开销(因为不能很快的同时进行了),你必须⾮常仔细地计算任务的数量,这些任务可以独立执行,他们在独立运行时,基本不需要并行(发)算法中的互通信(并发的互通,类似于对应的wait或者notify,因为这里说明的是单独的)
若要操作多个,那么这就涉及并发算法的粒度,如果算法有 着粗粒度(低互通信的大型任务,也就是不那么需要线程互通wait或者notify,既然互通少,自然线程就少,因为wait或者notify基本只会操作一个线程,既然他们少了自然线程少),那么同步方面的开销就会较低,因为低互通信自然线程就少,那么对于同步来说,开销就少,然而,也许你不会用到系统所有的核心,因为线程是比较少的,即粗粒度代表:不需要很细的互通
如果算法 有细粒度(高互通信的小型任务,这是相对的),同步方面的开销就会很高,而且该算法的吞吐量可能不会很好,因为越细代表操作越复杂(互通多了,那么就需要考虑通谁呢),即线程多,即细粒度代表:需要很细的互通
并发系统中有不同的同步机制,从理论角度看,最流行的机制如下:
信号量(semaphore):一种用于控制对一个或多个单位资源进行访问的机制,它有一个用于存放可用 资源数量的变量,而且可以采用两种原子操作来管理该变量,一般通过互斥来进行底层操作,互斥(mutex,mutual exclusion的简写形 式)是一种特殊类型的信号量,它只能取两个值(即资源空闲和资源忙),而且只有将互斥设置为忙的那 个进程才可以释放它资源,互斥可以通过保护临界段来帮助你避免出现竞争条件,一般我们会可以大致的(虽然并不是)认为wait或者notify是信号量的一种操作,其他具体解释可以百度查看
监视器:一种在共享资源上实现互斥的机制(一般我们也会认为是信号量,即利用了信号量,因为他有其他条件),它有一个互斥、一个条件变量、两种操作(等待条件和通报 条件),一旦你通报了该条件,在等待它的任务中只有一个会继续执行,一般我们会将synchronized对应的参数称为同步监视器,所以监视器我们是使用的最多的
如果共享数据的所有用户都受到同步机制的保护,那么代码(或方法、对象等等)就是线程安全的,数据的⾮阻塞 的CAS(compare-and-swap,比较和交换)原语是不可变的,这样就可以在并发应用程序中使用该代码而基本不会出现任何问题
不可变对象 :
不可变对象是一种⾮常特殊的对象,在其初始化后,不能修改其可视状态(如其属性值),如果想修改一个不可变对象,那么你就必须创建一个新的对象,不可变对象的主要优点在于它是线程安全的,你可以在并发应用程序中使用它而不会出现任何问题,不可变对象的一个例子就是java中的String类,当你给一个String对象赋新值时,会创建一个新的String对象(常量池),更具体的例子就是单例模式的饿汉式(前提是只读但不修改,否则也不是线程安全,因为可以多个线程得到,并修改的)
原子操作和原子变量 :
与应用程序的其他任务相比,原子操作是一种发⽣在瞬间的操作,在并发应用程序中,可以通过一个临界段来 实现原子操作,以便对整个操作采用同步机制
原子变量是一种通过原子操作来设置和获取其值的变量(即在原子操作里面),即可以认为只能一个来操作(其他人不可操作,即不可分割),我们可以使用某种同步机制(锁)来实现一个原子变量,或者也 可以使用CAS以无锁方式来实现一个原子变量(CAS可以认为是乐观锁里面的,实际上与版本号的操作思想类似,具体若要了解的话,可以看这个博客:https://blog.csdn.net/zhangjingao/article/details/86516038,这里就不多说了,在后面可能也会进行说明),而这种方式并不需要任何同步机制
一般我们认为:乐观锁是一种不使用锁的操作,而悲观锁是一种使用锁的操作
共享内存与消息传递 :
任务可以通过两种不同的方式来相互通信
第一种方法是共享内存,通常用于在同一台计算机上运行多任务的 情况,任务在读取和写入值的时候使用相同的内存区域,为了避免出现问题,对该共享内存的访问必须在一个由同 步机制保护的临界段内完成,可以认为这个共享内存是前面的生产者和消费者共同操作的数组或者队列或者数据等等
另一种同步机制是消息传递,通常用于在不同计算机上运行多任务的情形,当一个任务需要与另一个任务通信 时,它会发送一个遵循预定义协议的消息,如果发送方保持阻塞并等待响应,那么该通信就是同步的,如果发送方 在发送消息后继续执行自己的流程,那么该通信就是异步的,具体可以认为是分布式的集群操作,比如kafka的对应的同步发送(提交)或者异步发送(提交)等等,在对应的92章博客可以知道(当然,对应的kafka实际上可以是单个节点,具体可以看这个博客:http://t.zoukankan.com/aishanyishi-p-10325675.html,对应的92章博客是以三个节点为例子的,但他们基本都是差不多的操作,即我们可以选择就操作一个zk和ka,然后慢慢加即可,虽然我这里是3对3的)
这里概念只需要了解即可
并发的问题(这里概念也只需要了解即可) :
数据竞争:
如果有两个或者多个任务在临界段之外对一个共享变量进行写入操作,也就是说没有使用任何同步机制,那么 应用程序可能存在数据竞争(也叫做竞争条件)
在这些情况下,应用程序的最终结果可能取决于任务的执行顺序:
具体案例如下:
package com.my4;/****/
public class Data {private float myFloat;public void modify(float diff) {float value = myFloat;System.out.println(Thread.currentThread().getName() + "before-" + value); //我们将这个进行删除,然后执行,可以发现,他们的after打印并不都相同(因为上面的一开始的赋值是大于cpu时间,5>3,前面说明过时间的解释,而后续使得再次赋值时,时间大大减少,也是因为缓存,基本同时进行后面的赋值(他们是共同的,即若他们同时操作,只要有一个初始化完毕,那么另外一个同步也立即回使得初始化完毕,并会去除原来准备操作的缓存或者对应操作的信息或结果),这个时间可以认为是0.001,即可以忽略不计)//而如果加上了这个打印,甚至就是1的打印,那么会导致结果有相同,即其他的线程操作使得并没有影响了另外一个线程,即得到相同的数据了//那么上面的具体原因如下:而之所以删除会使得不相同,是因为单纯的读操作是线程安全的,所以有一定的间隙(认为是0.1,所以上面的0.001是可以忽略的),而后面正好是直接的进行操作,那么使得而不是都正好都通过,虽然不大,但是正好使得满足赋值的间隙(当然,通常都是可以满足的,但是可能因为浮动的原因,使得还是有相同了,因为可能操作时间长,那么还没有赋值,就已经得到原来的值了,即操作时间长了,与间隙小是一样的结果的意思),使得不相同,而加上了打印,使得在多次操作中,可能因为打印使得间隙变小使得操作相同的值了,因为里面的操作很多(打印的时间是相对来说是长的,即前面说明的100,所以自然会有变动),导致可能会变慢,而有些变快,这是不确定的,因为有浮动,而因为有浮动,所以可能操作相同的值,或者说缩短间隙了myFloat = value + diff;System.out.println(Thread.currentThread().getName() + "after-" + myFloat);}//所以这里需要提一下,单纯的读操作是线程安全的(一般来说他只是操作全局变量或者静态变量,当然,既然操作多线程,那么变量自然是全局的,所以这里一般没有限制的说明,即单纯的读操作是线程安全的)
}
package com.my4;/****/
public class MyThread extends Thread {private final Data data;public MyThread(Data data) {this.data = data;}public void run() {data.modify(1);}public static void main(String[] args) {Data data = new Data();new MyThread(data).start();new MyThread(data).start();new MyThread(data).start();}
}
若不操作前面的删除,该改变对应方法即可,即:
   public void run() {for (int i = 0; i < 10; i++) {data.modify(i);}}//因为是从1加上10的,那么如果正常的话,每个线程都会加上45,若结果不是135,那么代表有重合的数据,即其他的线程操作使得并没有影响了另外一个线程,即得到相同的数据了
假设有两个不同的任务执行了同一个modify方法,由于任务中语句的执行顺序不同,最终结果也会不同,也可能相同,而出现相同的结果自然是我们不希望看到的,因为有相同的数据了
这是因为modify方法不是原子的, 或者对应的Data的操作也不是线程安全导致的
死锁 :
当两个(或多个)任务正在等待必须由另一线程释放的某个共享资源,而该线程⼜正在等待必须由前述任务之一释放的另一共享资源时,并发应用程序就出现了死锁,当系统中同时出现如下四种条件时,就会导致这种情形,我们将其称为Coffman 条件:
互斥: 死锁中涉及的资源、必须是不可共享的,一次只有一个任务可以使用该资源
占有并等待条件: 一个任务在占有某一互斥的资源时⼜请求另一互斥的资源(比如说在内部加上锁,但是该内部的锁其他人在使用,而其他人的内部又是使用当前的锁,即持续等待了),当它在等待时,不会释放任何资源,这是死锁的主要条件
不可剥夺:资源只能被那些持有它们的任务释放
循环等待:任务1正等待任务2 所占有的资源, 而任务2 ⼜正在等待任务3 所占有的资源,以此类推,最 终任务n⼜在等待由任务1所占有的资源,这样就出现了循环等待
具体的例子,在26章博客有说明
有一些机制可以用来避免死锁:
忽略它们:这是最常用的机制,你可以假设自己的系统绝不会出现死锁,而如果发⽣死锁,结果就是你可 以停⽌应用程序并且重新执行它
检测:系统中有一项专⻔分析系统状态的任务,可以检测是否发⽣了死锁,如果它检测到了死锁,可以采取一些措施来修复该问题,例如,结束某个任务或者强制释放某一资源
预防:如果你想防⽌系统出现死锁,就必须预防Coffman 条件中的一条或多条出现
规避:如果你可以在某一任务执行之前得到该任务所使用资源的相关信息,那么死锁是可以规避的,当一 个任务要开始执行时,你可以对系统中空闲的资源和任务所需的资源进行分析,这样就可以判断任务是否 能够开始执行
活锁:
如果系统中有两个任务,它们总是因对方的行为而改变自己的状态, 那么就出现了活锁,最终结果是它们陷入 了状态变更的循环而无法继续向下执行
例如,有两个任务:任务1和任务2 ,它们都需要用到一个共同操作的资源(如int类型的变量)
假设他们内部都有一个循环,其中假设int类型的变量是a,初始值是10,任务1对他进行减减,任务2对他进行加加,任务1的结束条件是当该变量是0时结束,而任务二的结束条件是当该变量是20结束,很明显,由于又加加,又减减,这种 情况可以无限地持续下去,而这两个任务都不会结束自己的执行过程,当然,对应的减减的效率,或者加加的效率可能有细微的变化,使得可能会进行结束,在越小的范围就越容易结束,若结束了你可以选择将20变成200000,将10变成100000来进行测试
package com.my4;/****/
public class a extends Thread {static int i = 100000; //将原来的10变成了100000@Overridepublic void run() {while (i > 0) {i--;System.out.println("a:" + i);}}public static class b extends Thread {@Overridepublic void run() {while (i < 200000) { //将原来的20变成200000i++;System.out.println("b:" + i);}}}public static void main(String[] args) {a a = new a();b b = new b();a.start();b.start();}
}
当然,上面的例子是有结束的可能,所以我们再次的举例:
例如,仍然有两个任务:任务1和任务2 ,它们都需要用到两个资源:资源1和资源2,假设任务1对资源1加了一个 锁,而任务2 对资源2 加了一个锁,当它们无法访问所需的资源时(因为锁,所以要等待),就会重新开始循环操作(假设可以这样),只有资源都访问了,才不会使得继续循环操作,这种 情况可以无限地持续下去(因为必然是得不到对方的资源的,因为有锁了,必须等待,只是这里假设可以操作了循环,所以并不认为是死锁),所以这两个任务都不会结束自己的执行过程,当然,这只是一个例子
我们可以总结一下:死锁由于一直等待,那么cpu是没有重新获取的,只是一直等待的操作,可能会占用内存,因为任何操作基本都需要内存
而活锁:由于会进行重新获取,所以他又要获取cpu也会占用内存,通常情况下,活锁出现的多,因为死锁出现了,他是直接卡住的,自然容易被解决,而不会明面上的出现,比较具体的例子就是生产者和消费者,只是他们的条件是等待而不是退出线程,但中间的操作也就是活锁的概念
我们以操作共同资源的例子为主
资源不⾜:
当某个任务在系统中无法获取维持其继续执行所需的资源时,就会出现资源不⾜,当有多个任务在等待某一资 源且该资源被释放时,系统需要选择下一个可以使用该资源的任务,如果你的系统中没有设计良好的算法,那么系 统中有些线程很可能要为获取该资源而等待很长时间
要解决这一问题就要确保公平原则,所有等待某一资源的任务必须在某一给定时间之内占有该资源,可选方案 之一就是实现一个算法,在选择下一个将占有某一资源的任务时,对任务已等待该资源的时间因素加以考虑,然 而,实现锁的公平需要增加额外的开销,这可能会降低程序的吞吐量
优先权反转:
当一个低优先权的任务持有了一个高优先级任务所需的资源时(或者就是一个资源),就会发⽣优先权反转,这样的话,低优先权的 任务就会在高优先权的任务之前执行,实际上也就是说,谁先获得资源,谁先操作,即优先级越高的线程不一定先执行(前面也说明过了)
JMM内存模型:
JMM与happen-before:
为什么会存在"内存可见性"问题:
下图为x86架构下CPU缓存的布局,即在一个CPU的4核下,L1、L2、L3三级缓存与主内存的布局,每个核上面 有L1、L2缓存,L3缓存为所有核共用(可以看成JMM内存模型,我们认为是java虚拟机jvm的操作规范,后面会再次的说明的)

在这里插入图片描述

因为存在CPU缓存一致性协议(缓存同步,这是因为L3的存在导致的),例如MESI,多个CPU核心之间缓存不会出现不同步的问题,不会有"内存可见性"问题
缓存一致性协议对性能有很大损耗,为了解决这个问题,⼜进行了各种优化,例如,在计算单元和L1之间加了Store Buffer、Load Buffer(还有其他各种Buffer),如下图:

在这里插入图片描述

L1、L2、L3和主内存之间是同步的,有缓存一致性协议的保证,但是Store Buffer(写)、Load Buffer(读)和L1之间却是异步的,向内存中写入一个变量,这个变量会保存在Store Buffer里⾯,稍后才异步地写入L1中,然后同时同步写入 主内存中(他们是同时的,当然,缓存会先保留,然后将缓存内容写入内存,即缓存是在写入内存之前的),我们将他们统称为Buffer,以后也这样的说明
操作系统内核视角下的CPU缓存模型(注意是多个cpu,一般来说,我们的电脑是一个cpu,而我们之前说的核心是一个cpu的核心,即一个cpu可以有多个核心,如8核,只是之前我们统称为cpu了,即之前的说明都是说明核心的,而这里是多个cpu,真正的多个,而不是cpu里面的核心):

在这里插入图片描述

多CPU,每个CPU多核,每个核上⾯可能还有多个硬件线程,对于操作系统来讲,就相当于一个个的逻辑CPU,每个逻辑CPU都有自己的缓存,这些缓存和主内存之间不是完全同步的(即会有不一致的情况,即内存可见性的问题),所以一般来说,一个电脑我们基本只会有一个cpu,虽然可以增加cpu(具体可以去网上找教程)
对应到Java里,就是JVM抽象内存模型,如下图所示(多线程):

在这里插入图片描述

因为线程的就是获取cpu资源的(真正的来说就是获取对应核心提供的资源操作),那么对于这里来说,共享内存类似于多个cpu中的主内存,所以这里也会出现内存可见性的问题
更具体来说,就是上面的多个cpu或者多个线程(cpu核心),都没有像L3一样的操作同步,所以导致出现内存可见性的问题,那么为什么不使用呢,因为性能损耗较大,特别是多个cpu或者线程(核心),那么需要更多(因为他们是没有上限的,即线程可以无限增加,cpu也是如此,虽然cpu增加并不实际,但还是存在可以的),所以没有使用
上面的"内存可见性"的问题的概念了解即可,即内存可见性就是他进行操作改变数据了,但是我们看到的却是没有改变数据的结果,即改变的数据在当时是不可见的,即内存可见性问题,即内存可见性就是对应的当时对数据的操作的改变是否可见的意思,而对应的问题就是对改变不可见,那么就是只能看到原来的数据,而单纯的内存可见性,我们会认为是依次的,即可见改变的,所以若在后面说明了对某某可见,那么就是说明改变直接的可见的,即没有出现这个内存可见性的问题,比如在后面说明的"意味着A的执行结果必须对B可见"就是这样的例子
重排序与内存可见性的关系(接下来就是比较重要的说明了):
Store Buffer的延迟写入是重排序的一种,我们称这个重排序为内存重排序(Memory Ordering),即以前面的core0和core1为例,其中假设core0操作内存时,比如他是进行放入或者加加,由于对应的Buffer是异步的,那么可能core1操作内存后(比如读取或者减减),他才会操作缓存放入,使得看起来缓存中是core1先执行了,即比如先减减,然后加加(可能打印的信息是这样的),除此之外,还有编译器和CPU的指令重排序
重排序类型:
1:编译器重排序
对于没有先后依赖关系的语句,编译器可以重新调整语句的执行顺序
2:CPU指令重排序
在指令级别,让没有依赖关系的多条指令并行
3:CPU内存重排序(也可以称为内存重排序)
CPU有自己的缓存,指令的执行顺序和写入主内存的顺序不完全一致(相当于上面的core1先执行操作内存了,原本指令是core0开始的,但是写入或者操作主内存可能是core1来先操作的)
很明显,我们之前也说,cpu开始的抢占,实际上可以认为就是这里,即如start的主要抢占cpu是对应的Buffer(需要操作必然需要cpu来操作,只不过他是操作自身的cpu资源)
在三种重排序中,第三类或者第二类就是造成"内存可见性"问题的主因,我们以第三类为主,我们看如下案例:
线程1:X=1,a=Y
线程2:Y=1,b=X
假设X、Y是两个全局变量,初始的时候,X=0,Y=0,请问,这两个线程执行完毕之后,a、b的正确结果应该 是什么?
很显然,线程1和线程2的执行先后顺序是不确定的,可能顺序执行,也可能交叉执行,最终正确的结果可能 是:
a=0,b=1(线程1先操作完或者线程1操作快点,线程2再操作)
a=1,b=0(线程2先操作完或者线程2操作快点,线程1再操作)
a=1,b=1(线程1操作X=1,线程2操作Y=1,即一起操作,然后继续一起操作,虽然前面也是一起,即一起操作的,无论是单核还是多核,对应中间的间隔可以忽略,就算是单核,但对应的时间片切换的时间是非常小的,所以可以忽略,即时间片切换的时间,我们可以认为是0.0001,超级小,比单纯的普通操作或者没有操作还要小,所以就算你多执行0.0001秒,可能并不会有什么影响,当然抢占的时间是一开始是包含cpu资源分配的,所以时间多点,然后再该资源下进行时间片切换)
也就是不管谁先谁后,执行结果应该是这三种场景中的一种,但实际可能是a=0,b=0(极少情况出现(通常无论是少的线程还是多的线程,都是如此,因为对应的操作有先后的,即X=1在b=X之前操作,否则怎么出现对应的问题呢,因为如果你都先得到值了,那么对应的修改自然与你没有关系了),或者不会出现,所以在后面的测试中,基本上都是时间的问题导致的,所以了解即可,我们认为后面的是理论情况,但是如果出现,那么通常Buffer的cpu操作资源太慢了,因为是异步,所以对应的两个操作是不同的获取资源的操作,这是一般来说都很快,即再0.01的情况下,赋值完毕(比上面的0.0001秒多很多,所以时间片的切换一般可以使得认为同时运行),并操作内存,但是既然是两个操作,如果前面一个是0.1才操作完毕,那么就会出现这种两个都是0的情况,一般当电脑卡顿时,可能会出现,否则基本不会出现,但还是可能的)
两个线程的指令都没有重排序,执行顺序就是代码的顺序(因为X=1后面基本必然是a=Y),但仍然可能出现a=0,b=0,原因是线程1先执行X=1,后执行a=Y,但此时X=1还在自己的Store Buffer里⾯,没有及时写入主内存中(或者写入一半或者部分,写入一半或者部分在后面的构造方法溢出那里会体现,也有具体的例子),所以,线程2看到的X还是0,线程2的道理与此相同
虽然线程1觉得自己是按代码顺序正常执行的,但在线程2看来,a=Y和X=1顺序却是颠倒的,指令没有重排 序,但写入内存的操作被延迟了,也就是内存被重排序了,这就造成内存可见性问题
内存屏障 :
由于内存重排序基本不好处理,所以这里我们主要操作其他两种说明
为了禁⽌编译器重排序和 CPU 重排序(可能包括内存重排序,这里可以认为没有),在编译器和 CPU 层⾯都有对应的指令,也就是内存屏障(Memory Barrier),这也正是JMM和happen-before规则的底层实现原理
编译器的内存屏障,只是为了告诉编译器不要对指令进行重排序,当编译完成之后,这种内存屏障就消失了,CPU并不会感知到编译器中内存屏障的存在
而CPU的内存屏障是CPU提供的指令,可以由开发者显示调用
内存屏障是很底层的概念,对于 Java 开发者来说,一般用 volatile 关键字就⾜够了,但从JDK 8开始,Java在Unsafe类中提供了三个内存屏障函数,如下所示:
public final class Unsafe { //一般是这个包里面:jdk.internal.misc// ...public native void loadFence();public native void storeFence();public native void fullFence();// ...}
在理论层⾯,可以把基本的CPU内存屏障分成四种:
1:LoadLoad:禁⽌读和读的重排序
2:StoreStore:禁⽌写和写的重排序
3: LoadStore:禁⽌读和写的重排序
4:StoreLoad:禁⽌写和读的重排序
上面代表先后顺序,在后面的volatile实现原理中,会知道为什么的,这里先了解知道即可
Unsafe中的方法:
1:loadFence=LoadLoad+LoadStore
2:storeFence=StoreStore+LoadStore
3:fullFence=LoadLoad+LoadStore+StoreStore+StoreLoad(也就是loadFence+storeFence+StoreLoad,相同的这里认为是覆盖)
对于其他两个重排序,我们再程序里一般认为是执行的操作,由于执行是有顺序的,但是可能某些操作可能必须等待之前的加载,但是该操作也不会影响该之前的操作,那么我们可以选择将该操作放在前面来不用等待,你可能会有点疑惑,那么这里给出一个代码:
 public void fa() {int i = 0;int d = 0;while (d < 1) {d++;i = 9;break;}}
 public void fa() {int i = 0;i = 9;int d = 0;while (d < 1) {d++;break;}}
我们可以发现,第一个代码中,i=9,必须需要进入循环才可以操作,即i=9他需要等待,假设,while是异步的,且后面需要使用i=9才可以进行操作,那么如果我们将i=9放在前面,那么必然是能直接的提高效率,这里我只是大致的说明一下,具体可以看这个博客:https://blog.csdn.net/qq_40820563/article/details/118142192
特别的,我们最好定义好之后,直接的使用,使得他能当场使用,而不会去继续找,即可以认为操作会有缓存,如果进行了其他操作那么缓存覆盖
至此,很明显,重排序只是对应的顺序发生改变的说明(通常指令没有改变,只是对应的写入或者操作内存变慢了,在内存里面认为是重排序了,即重排序是相对于正常流程顺序发生改变的说明),或者操作的说明(手动改变顺序,我们一般认为是正常流程顺序发生改变的说明,而不是这里的手动改变顺序)
as-if-serial(英文意思:如同串行)语义 :
重排序的原则是什么?什么场景下可以重排序,什么场景下不能重排序呢?
1:单线程程序的重排序规则
无论什么语言,站在编译器和CPU的角度来说,不管怎么重排序,单线程程序的执行结果不能改变,这就是单 线程程序的重排序规则,即只要操作之间没有数据依赖性(注意是没有依赖的,因为对应的依赖若有,那么相当于前面的X=1,a=X,而不是a=Y了,自然也会影响对应情况,即也出现了内存可见性的问题,所以依赖关系就是对应的重排序的结果被对方使用的关系,即与对方是依赖关系),编译器和CPU都可以任意重排序,因为执行结果不会改变,代码看起来就像是完全串行地一行行从头执行到尾,这也就是as-if-serial语义
对于单线程程序来说,编译器和CPU可能做了重排序,但开发者感知不到,也不存在内存可见性问题
2:多线程程序的重排序规则
编译器和CPU的这一行为对于单线程程序没有影响(特别的对于内存重排序来说,也是一样,因为他不需要操作同步),但对多线程程序却有影响(比如前面的两个0的操作),对于多线程程序来说,线程之间的数据依赖性太复杂,编译器和CPU没有办法完全理解这种依赖性并据此做出最合理的优化,编译器和CPU只能保证每个线程的as-if-serial语义,线程之间的数据依赖和相互影响,需要编译器和CPU的上层来确定,上层要告知编译器和CPU在多线程场景下什么时候可以重排序,什么时候不能重排序
happen-before是什么:
使用happen-before是来描述两个操作之间的内存可见性的
java内存模型(JMM)是一套规范,在多线程中,一方⾯,要让编译器和CPU可以灵活地重排序,另一方⾯, 要对开发者做一些承诺,明确告知开发者不需要感知什么样的重排序,需要感知什么样的重排序,然后,根据需要 决定这种重排序对程序是否有影响,如果有影响,就需要开发者显示地通过volatile、synchronized等线程同步机 制来禁⽌重排序
关于happen-before(英文意思:发生在之前):
如果A happen-before B(A发生在B之前),意味着A的执行结果必须对B可见,也就是保证跨线程的内存可见性,但A happen before B不代表A一定在B之前执行,因为,对于多线程程序而言,两个操作的执行顺序是不确定的(前面说明的两个都是0的情况)
happen-before只确保如果A在B之前执行了,则A的执行结果必须对B可见,定义了内存可见性的约束,也就定义了一系列重 排序的约束
基于happen-before的这种描述方法,JMM对开发者做出了一系列承诺:
1:单线程中的每个操作,happen-before 对应该线程中任意后续操作(也就是 as-if-serial语义保证)
2:对volatile变量的写入,happen-before对应后续是对这个变量的读取,即写入要在读取的前面,相反,读取的后续也就是写入,即读取在写入后面,即相当于对应的Buffer如果操作的是该变量,那么必须等待其先写入或者读取,然后才可以读取或者写入,即等待操作放入缓存了,即变得有顺序了,而不是在对应的前面的
3:对synchronized的解锁,happen-before对应后续是对这个锁的加锁(即也会导致对应的操作有先后,因为我们必须等待该锁释放)
你可能会认为这个后续是什么意思,你可以先不考虑,因为在后面举出synchronized的该例子时,会有解释
可能还有其他的承诺,这里就不说明了
JMM对编译器和CPU 来说,volatile 变量不能重排序,⾮ volatile 变量可以任意重排序
happen-before的传递性 :
除了这些基本的happen-before规则,happen-before还具有传递性,即若A happen-before B,B happenbefore C,则A happen-before C
如果一个变量不是volatile变量,当一个线程读取、一个线程写入时可能有问题(前面的两个0的例子),那岂不是说,在多线程程序 中,我们要么加锁,要么必须把所有变量都声明为volatile变量,但这显然不可能,而这就得归功于happen-before的传递性:
这里进行具体说明该概念,先看代码:
class A {private volatile int a = 0;private volatile int c = 0;public void set() {a = 5; // 操作1c = 1; // 操作2}public int get() {int d = c; // 操作3return a; // 操作4}
}
假设线程A==先调用(或者执行快点)==了set,设置了a=5,之后线程B调用了get,返回值一定是a=5,为什么呢?
操作1和操作2是在同一个线程内存中执行的,操作1 happen-before 操作2,同理,操作3 happen-before操 作4,⼜因为c是volatile变量,对c的写入happen-before对c的读取(因为写在读前面),所以因为volatile的存在,使得操作2 happen-before操作3,利用happen-before的传递性,就得到:操作1 happen-before 操作2 happen-before 操作3 happen-before操作4,所以,操作1的结果,一定对操作4可见,即这也是volatile关键字的主要作用(上面有承诺说明),虽然这里主要是因为时间
volatile关键字使得读取等待写入进行操作后,读取才会操作(你可以认为是阻塞),即相当于对应的Buffer操作该关键字的资源时,会等待写入操作完毕(上一层阻塞,而使得认为Buffer阻塞了),而不会直接的操作了,即就算是你操作的慢,但是我也要等等,这样就解决了对应的重排序的问题,但是要注意,只要你操作写入完毕后,他的阻塞立马解除,那么如果对应的写入操作的方法有多个写入,可能只会是其中一个获取,但一般是最前面的几个,之所以是最前面的几个,是因为他的阻塞立马解除也是需要时间的,也就是说,如果a=5后面有a=6,那么返回的可能就是6,且基本是6,因为阻塞解除需要时间,然后若你在a=5和a=6中加上延时,比如设置为1000秒钟的延时阻塞,那么对应的结果是5了,因为他当前读取的也就是5,因为6还没有在读取之前进行设置,但是他是一开始就等待的吗,答:不是,而是操作了写入后,才会进行等待,即写入开始(等待)到写入结束(解除等待),所以如果先让读取执行,那么他还是会先读取的(并且对应写入阻塞),在这里好像并不会出现读取先进行操作,这是因为对应的对象初始化需要时间,而该时间导致后来的操作了缓存,使得先操作的与后操作的时间基本类似(可以认为是0.001的时间,前面也说明过),在这个情况下,由于return也需要时间(少于普通打印的1,认为是0.5),所以如果要进行测试该情况出现,可以在a=5前面加上打印"System.out.println(1);"来进行延时,即可进行测试,因为对于0.5的时间来说,对应打印1的时间是非常大的时间的,自然使得后操作
这里再次的给出例子:
class A {private int a = 0;private int c = 0;public synchronized void set() {a = 5; // 操作1c = 1; // 操作2}public synchronized int get() {return a;}
}
假设线程A先调用了set(注意是假设他先调用的或者执行快点),设置了a=5,之后线程B调用了get,注意他们是相同的锁(因为可以操作相同的A对象,这里就是这样的认为),那么返回值也一定是a=5
因为与volatile一样,synchronized同样具有happen-before语义,因为他本身就是依次操作的,展开上⾯的代码可得到类似于下⾯的伪代 码(伪代码就是简略的代码):
线程A:加锁; // 操作1a = 5; // 操作2c = 1; // 操作3解锁; // 操作4
线程B:加锁; // 操作5,因为在线程A对应调用完之前,不会进入,而是等待,即前面说明的"对应后续是对这个锁的加锁",后续这就是这个意思,即这个加上就是后续了,即操作4 happen-before 操作5,即对应后续就是操作5读取a; // 操作6解锁; // 操作7
根据synchronized的happen-before语义,操作4 happen-before 操作5,再结合传递性,最终就会得到:
操作1 happen-before 操作2……happen-before 操作7,所以,a、c都不是volatile变量,但仍然有内存可见性
那么有个问题,对应的Buffer在解锁后他还会没有操作完吗,那么这样会出现内存可见性的问题吗,答:不会没有操作完,因为解锁的操作他隐含的表示,需要执行完,而这个执行完并不是代码执行完,代表必须都放入缓存,然后缓存到内存(按照前面的图片模型),才算执行完毕,所以不会出现内存可见性的问题
既然上面说到了volatile关键字 ,那么接下来我们来进行学习他这个volatile关键字
volatile关键字 :
64位写入的原子性(Half Write) :
如,对于一个long型变量的赋值和取值操作而言,在多线程场景下,线程A调用set(100),线程B调用get(),在 某些场景下,返回值可能不是100
public class MyClass {private long a = 0;// 线程A调用set(100)public void set(long a) {this.a = a;}// 线程B调用get(),返回值一定是100吗?public long get() {return this.a;}
}
//实际上根据经验,我们也可以知道,线程B可能会先执行,但是就算线程A先执行,可能也会出现返回值不是100,前面已经说明了两个0的那个地方,只是这种情况基本不会出现而已,主要是因为时间
但这里还有个问题,由于JVM的规范并没有要求64位的long或者double的写入是原子的,在32位的机器上(是32位的机器,现在一般都是64位了),一个64位变量的写入 可能被拆分成两个32位的写操作来执行,这样一来,读取的线程就可能读到"一半的值",解决办法也很简单,在long前⾯加上volatile关键字即可,使得写入时,读取认为是阻塞的
重排序:DCL问题 :
单例模式的线程安全的写法不⽌一种,常用写法为DCL(Double Checking Locking),如下所示:
public class Singleton {private static Singleton instance;public static Singleton getInstance() {if (instance == null) {synchronized(Singleton.class) {if (instance == null) {// 此处代码有问题instance = new Singleton();}}}return instance;}
}
上述的 instance = new Singleton(); 代码有问题:其底层会分为三个操作:
1:分配一块内存
2:在内存上初始化成员变量
3:把instance引用指向内存
在这三个操作中,操作2和操作3可能重排序(通过前面的两个0,应该明白了,所以这里就不多说了),即先把instance指向内存,再初始化成员变量,虽然⼆者并没有 先后的依赖关系,但是使用者却有依赖关系,因为我要使用的,那么此时,另外一个线程可能拿到一个未完全初始化的对象,这时,直接访问里⾯的成员变量,就可能出错,这就是典型的"构造方法溢出"问题
解决办法也很简单,就是为instance变量加上volatile修饰
volatile的三重功效:64位写入的原子性、内存可⻅性和禁⽌重排序(用来解决内存可⻅性的)
所以虽然volatile他并不是内存重排序的解决方案,但是其实也间接的解决他了,因为他操作上一层的阻塞的原因(所以在前面我也说明"相当于对应的Buffer操作该关键字的资源"但也只是相当于)
volatile实现原理:
由于不同的CPU架构的缓存体系不一样,重排序的策略不一样,所提供的内存屏障指令也就有差异
这里只探讨为了实现volatile关键字的语义的一种参考做法:
1:在volatile写操作的前⾯插入一个StoreStore屏障(禁止写和写重排序),保证volatile写操作不会和之前的写操作重排序(因为他是禁止写和写的重排序的)
2:在volatile写操作的后⾯插入一个StoreLoad屏障(禁止写和读重排序),保证volatile写操作不会和之后的读操作重排序
也就是说,若放在前面,那么后面的写就是我们当前的,若放在后面那么前面的写就是我们当前的,同理对应的读也是如此
3:在volatile读操作的后⾯插入一个LoadLoad屏障(禁止读和读重排序)+LoadStore屏障(禁止读和写重排序),保证volatile读操作不会和之后的读操 作、写操作重排序,也就是说,我们前面虽然操作了写入时,读操作认为阻塞,实际上反过来读操作也会阻塞写操作
具体到x86平台上,其实不会有LoadLoad、LoadStore和StoreStore重排序,只有StoreLoad一种重排序(内 存屏障),也就是只需要在volatile写操作后⾯加上StoreLoad屏障,即写后面只有读,所以在前面说明volatile时,一开始就是写入的后续是读的
JSR-133对volatile语义的增强 :
在JSR -133之前的旧内存模型中,一个64位long/ double型变量的读/ 写操作可以被拆分为两个32位的读/写操 作来执行,从JSR -133内存模型开始 (即从JDK5开始),仅仅只允许把一个64位long/ double型变量的写操作拆分
为了两个32位的写操作来执行,任意的读操作在JSR -133中都必须具有原子性(即 任意读操作必须要在单个读事务中 执行,或者可以说基本在写操作后面)
这也正体现了Java对happen-before规则的严格遵守
final关键字 (final是可以被反射改变的,具体可以看这个博客:https://www.bbsmax.com/A/E35paeaEdv/):
构造方法溢出问题 :
考虑下⾯的代码:
public class MyClass {private int num1;private int num2;private static MyClass myClass;public MyClass() {num1 = 1;num2 = 2;}/*** 线程A先执行write()或者执行快点*/public static void write() {myClass = new MyClass();}/*** 线程B接着执行read()*/public static void read() {if (myClass != null) {int num3 = myClass.num1;int num4 = myClass.num2;}}
}
num3和num4的值是否一定是1和2?
num3、num4不⻅得一定等于1,2,和DCL的例子类似,也就是构造方法溢出问题,你可能在测试中,测试不出来,试着将start改变顺序,就会测试出来了,因为时间的问题(可能在前面说明过,他的开始线程需要时间,但是实际上,在前面的start基本是最先开始的,即可以认为最先开始执行start的线程容易先操作)
当然,对应的重排序我们基本是测试不出来的,大多数情况下是时间的问题导致的,所以前面的说明都只是理论而已
myClass = new MyClass()这行代码,分解成三个操作:
1:分配一块内存
2:在内存上初始化i=1,j=2(初始化时,可能初始化一半就操作了,因为其中的初始化是多个操作,而不是一个写里面的,所以可能导致volatile仍然出现一半)
3:把myClass指向这块内存
操作2和操作3可能重排序,因此线程B可能看到未正确初始化的值,对于构造方法溢出,就是一个对象的构造并不是"原子的",当一个线程正在构造对象时,另外一个线程却可以读到未构造好的"一半对象或者部分"
final的happen-before语义 :
要解决这个问题,不⽌有一种办法
办法1:给num1,num2加上volatile关键字(volatile只需要在类型的前面即可,其他任何的关键字随便放位置),你可能加上了这个也不会有什么作用,这是因为他们并没有出现特殊的情况,而只是由于时间的问题而导致的
办法2:为read/write方法都加上synchronized关键字, 如果num1,num2只需要初始化一次,还可以使用final关键字, 之所以能解决问题,是因为同volatile一样,final关键字也有相应的happen-before语义:
1:对final域的写(构造方法内部),happen-before与后续对final域所在对象的读
2:对final域所在对象的读,happen-before于后续对final域的读
这是因为他们因为不能改变了,所以final实际上也内部操作了该语义(即内存屏障)
通过这种happen-before语义的限定,保证了final域的赋值,一定在构造方法之前完成,不会出现另外一个线 程读取到了对象,但对象里⾯的变量却还没有初始化的情形,避免出现构造方法溢出的问题
happen-before规则总结:
1:单线程中的每个操作,happen-before与该线程中任意后续操作
2: 对volatile变量的写,happen-before与后续对这个变量的读
3:对synchronized的解锁,happen-before与后续对这个锁的加锁
4:对final变量的写,happen-before与final域对象的读,happen-before于后续对final变量的读
四个基本规则再加上happen-before的传递性,就构成JMM对开发者的整个承诺,在这个承诺以外的部分,程 序都可能被重排序,都需要开发者小心地处理内存可⻅性问题

在这里插入图片描述

至此,对应的理论说明完毕,虽然他们基本不会出现
第⼆部分:JUC (JUC是java.util.concurrent包的简称,所以在后面我们主要说明JUC):
从这里开始,对应的操作只是大致的说明一下,即这里了解即可(可以大致的过一遍)
并发容器 :
在学习他们之前,我们首先了解Lock和他的Condition操作,这样,在后面我们才能进行更加深入的了解:
首先是Lock,我们直接的看代码:
package com;import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;/****/
public class my5 extends Thread {//创建锁private ReentrantLock lock = new ReentrantLock();//得到Lock的直接操作的类似于wait以及notify的操作,而正是这个创建,所以与synchronized的操作阻塞的一样,他也基本需要(通常是只能)在该Lock里(注意是"该Lock",在后面的"LinkedBlockingQueue和ArrayBlockingQueue的差异"也会提到的)进行使用,否则也会报错Condition cd = lock.newCondition(); //在idea可能是没有提示的,因为这里是成员,idea为了防止某些先后操作(如静态的),他就不会提示了,这里来创建Condition,通常需要lock来创建才行//也可以这样,因为对象是相同的private Lock lockk = new ReentrantLock(); //Lock是ReentrantLock的父类public void run() {lock.lock(); //这样代表我们加上了锁,注意,这个锁有时候可以与synchronized不同,他可能允许多个锁可以同时被获得(比如后面的读写锁的读锁,后面会说明的,这个是互斥锁,当然这里只是对于获得锁来说的,而对于Condition来说,对应的读锁是得不到的,后面也会说明),一般默认多个,这是因为创建锁的缘故,即类的缘故,虽然这里的ReentrantLock这个类不会这样,换言之当lock在其他方法里操作时,可能需要等待释放或者不需要,即这是lock对象的缘故try {cd.await(); //代表阻塞,相当于wait} catch (InterruptedException e) {e.printStackTrace();}cd.signal(); //代表唤醒阻塞,相当于notifySystem.out.println(1);lock.unlock(); //这样代表释放锁}public static void main(String[] args) {my5 m = new my5();m.start();}
}//那么await和wait以及signal和notify他们的作用是完全一样的吗,答:作用可以说是完全一样,那么我们以这样的说明往后面进行学习//至此,我们说明了Condition了,即完成了对Condition的前面的"在后面会说明,这里先进行了解"//最后说明一下:多线程执行相同的方法或者变量时(无论是静态的还是不是静态的),都是使用副本,只是可能变量的获取可能不是变化的,所以会出现不一致的问题
//并且也要注意:lock操作加锁时,只能操作对应的引用,所以其他也操作这个lock,那么与这个共同阻塞操作,如果是其他的lock引用,那么不会阻塞,因为他阻塞的只是对应的lock引用而已,就如synchronized只是对其操作的参数进行阻塞操作而已一样
//当然,还有其他的细节,在后面会说明的
在说明容器之前我要说明一下:可能不同版本的jdk对应的类的部分代码与对应我给出的类的部分代码可能会有所不同,即发生了改变,但变化不大,基本是不会出现问题的,如果不同,一般也能根据作用来理解意思,具体还是要看自己的能力了
BlockingQueue (Block英文意思:阻塞):
在所有的并发容器中,BlockingQueue是最常⻅的一种,BlockingQueue是一个带阻塞功能的队列,当入队列 时,若队列已满,则阻塞调用者,当出队列时,若队列为空,则也阻塞调用者
在Concurrent包中,BlockingQueue是一个接口,有许多个不同的实现类,如图所示(我们一般只会说明基本的,可能不会都进行说明,看后面吧):

在这里插入图片描述

该接口的定义如下:
public interface BlockingQueue<E> extends Queue<E> {//...boolean add(E e);boolean offer(E e);void put(E e) throws InterruptedException;boolean remove(Object o);E take() throws InterruptedException;E poll(long timeout, TimeUnit unit) throws InterruptedException;//...}
该接口和JDK集合包中的Queue接口是兼容的,同时在其基础上增加了阻塞功能
在这里,入队提供了add(…)、offer(…)、put(…)3个方法,那么有什么区别呢:
从上⾯的定义可以看到,add(…)和offer(…)的返回值是布尔类 型,而put无返回值,还会抛出中断异常,所以add(…)和offer(…)是无阻塞的,也是Queue本身定义的接口,而put(…)是阻塞的,是该BlockingQueue接口自己定义的
add(…)和offer(…)的区别不大,当队列为满的时候,前者会抛出异常,后者则直接返回false
出队列与之类似,提供了remove()、poll()、take()等方法,remove()是⾮阻塞式的,take()和poll()是阻塞式 的(take他BlockingQueue自己写的,pull是重写的,因为接口也是类哦,无论什么类型,我们创建一个该类型说明的java文件时(比如接口,注解,枚举等等),虽然没有直接叫类,但我们称他为特殊类,他们都是类,所以重写也符合他们)
ArrayBlockingQueue :
ArrayBlockingQueue是一个用数组实现的环形队列,在构造方法中,会要求传入数组的容量
public ArrayBlockingQueue(int capacity) { //capacity英文翻译:容量this(capacity, false);
}public ArrayBlockingQueue(int capacity, boolean fair) { //fair英文翻译:公平的// ...}public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {this(capacity, fair);// ...}
其核心数据结构如下:
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {//...
//数据存放的数组final Object[] items;// 队头指针int takeIndex;// 队尾指针int putIndex;int count;// 核心为1个锁外加两个条件final ReentrantLock lock; //用来操作lock锁的,比如可以这样:lock.lock();,在26章博客有操作该ReentrantLock,并且使用了lock.lock();private final Condition notEmpty; //使用了Conditionprivate final Condition notFull;//...}
其put/take方法也很简单,如下所示
put方法:

在这里插入图片描述

 public void put(E e) throws InterruptedException { //他是可以中断的,因为他抛出这个异常,后面的方法中,基本都操作了这个Objects.requireNonNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly(); //可中断的Lock,因为Lock也就是锁,一般来说,自然可以操作中断,使得不进行阻塞,虽然这里没有操作,这里是加锁try {while (count == items.length)notFull.await(); //若队列满,则阻塞,这里相当于waitenqueue(e);} finally {lock.unlock(); //解锁,如果不操作他,那么说明他是一直阻塞的,相当于lock的加锁之后的代码都是加锁的,而有unlock说明到头了,并且释放}}

在这里插入图片描述

private void enqueue(E e) { //如果满了自然阻塞,但是如果没有满,那么到这里进行添加// assert lock.isHeldByCurrentThread();// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items; //得到对应的数组,在这里我们也称为队列items[putIndex] = e; //该添加的位置进行赋值添加的数据,因为对应是加锁的,所以这里不会出现不一致的问题if (++putIndex == items.length) putIndex = 0; //如果当前的最后,那么变成0开始进行赋值count++;notEmpty.signal(); //当将数据put到queue队列之后,通知非空条件,即没有空,可以得到数据了,认为使得take不阻塞,这里相当于定向的notify}
take方法:

在这里插入图片描述

public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)notEmpty.await(); //take的时候,如果队列为空,则阻塞,这里相当于waitreturn dequeue();} finally {lock.unlock();}}

在这里插入图片描述

 private E dequeue() {// assert lock.isHeldByCurrentThread();// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E e = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length) takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();notFull.signal(); //take结束,通知非满条件,即没有满,可以添加数据了,认为使得put不阻塞,这里相当于定向notifyreturn e;}
我们可以发现,使用图片加代码的形式,比较麻烦且占博客空间,所以在后面,我只给出代码了
并且你还可以发现,我们在前面操作的生产者和消费者模型的代码与他有点类似,的确是有点类似,因为他也是使用锁,只不过是lock锁,所以他的思想就与之前的生产者和消费者的代码思想是差不多的,之所以是差不多是因为他使用了Condition的操作,来导致消费者通知生产者或者生产者通知消费者,而不会消费者通知消费者或者生产者通知生产者,虽然因为锁的存在,可以不用这样,但我们需要严谨的,这里我们来具体说明一下,为什么Condition可以不会自己通知自己,我们知道notify是随机的,所以他不用说明,那么我们来具体说明一下Condition的说明(补充说明):实际上他之所以可以定向的唤醒,是因为他只能唤醒相同的Condition,什么意思呢,也就是说,如果你创建了三个Condition,其中分别是A,B,C,那么A的唤醒只能唤醒A的阻塞(因为在lock中Condition是用来阻塞和唤醒的),所以Condition只能操作对应的唤醒,也就实现了定向的唤醒操作,所以可以使得消费者只通知生产者,因为A阻塞后面的代码可以设置为B的唤醒就是这样,但是他也可以消费者唤醒消费者,因为A阻塞的代码后面也可以是A的唤醒,至此说明完毕
LinkedBlockingQueue :
LinkedBlockingQueue是一种基于单向链表的阻塞队列,因为队头和队尾是2个指针分开操作的,所以用了2把 锁+2个条件,同时有1个AtomicInteger的原子变量记录count数
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {// ...private final int capacity;// 原子变量,一般用来存放数据个数的private final AtomicInteger count = new AtomicInteger(0);// 单向链表的头部private transient Node<E> head;
// 单向链表的尾部private transient Node<E> last;// 两把锁,两个条件private final ReentrantLock takeLock = new ReentrantLock();private final Condition notEmpty = takeLock.newCondition();private final ReentrantLock putLock = new ReentrantLock();private final Condition notFUll = putLock.newCondition();//一个锁可以创建多个Condition,但是该多个Condition是不相同的,因为底层是:return new ConditionObject();,所以他们之间的唤醒也是定向的,即一个Condition只能唤醒自身,除非调用其他的Condition// ...
}
在其构造方法中,也可以指定队列的总容量,如果不指定,默认为Integer.MAX_VALUE
public LinkedBlockingQueue() {this(Integer.MAX_VALUE); //也就是后面的LinkedBlockingQueue(int capacity)}@Native public static final int MAX_VALUE = 0x7fffffff; 
//0x7fffffff表示int的最大值,0x表示是16进制,7表示二进制0111,F表示二进制1111
//那么由于他是int类型,所以结果是01111111 11111111 11111111 11111111即2^31-1
//进行分开就是0111 1111 1111 1111 1111 1111 1111 1111
 public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null); //可以这样连接的赋值,因为赋值本身返回的就是赋值的变量,所以可以看成返回head,然后赋值给last}
put/take实现(put英文意思:放,take英文意思:拿):
public E take() throws InterruptedException {final E x;final int c;final AtomicInteger count = this.count; //得到变量(原子的,自然只能一个线程操作,前面说明过了原子操作就是,相当于加了锁的)final ReentrantLock takeLock = this.takeLock; //得到锁takeLock.lockInterruptibly(); //加锁try {while (count.get() == 0) {notEmpty.await(); //没有元素就等待}x = dequeue();c = count.getAndDecrement(); //得到当前元素,他里面进行减减了,所以c如果是1,说明没有元素了,因为减减变成0if (c > 1)notEmpty.signal(); //如果还有元素,则通知其他take线程,你可能会有疑惑,上面的阻塞不是与这里冲突吗,在后面会解释的} finally {takeLock.unlock(); //到这里,上面是一个整体锁了}if (c == capacity) //capacity代表当前的阻塞队列的容量,很明显,如果c与他相等,说明他将满的消费了,自然直接的唤醒signalNotFull(); //通知put,你可以看成是这样//那么有个问题,为什么这里要加判断呢,好像是可以不加的,答:的确可以不加,因为你到这里,必然是消费的,但是如果你不加,就需要超多的锁,因为需要加锁(锁是有两个的),所以为了节省次数,我们就进行最后的判断加锁即可,因为对方是始终生产的,只有满了才会阻塞,那么这里只需要使得满了唤醒即可,其他时刻不需要唤醒了,因为是始终生产的(两个锁造成的,所以这里是这样)return x;}private void signalNotFull() {final ReentrantLock putLock = this.putLock; //得到对应的锁putLock.lock();try {notFull.signal(); //直接的通知,那么对应的notFull就会进行唤醒} finally {putLock.unlock();}}
   public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();final int c;final Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly(); //这里就不多说了,仍然是加锁try {/** Note that count is used in wait guard even though it is* not protected by lock. This works because count can* only decrease at this point (all other puts are shut* out by lock), and we (or some other waiting put) are* signalled if it ever changes from capacity. Similarly* for all other uses of count in other wait guards.*/while (count.get() == capacity) {notFull.await(); //如果满了则阻塞,而消费者如果满了自然会唤醒,其他情况不用操作唤醒的,因为也不会阻塞}enqueue(node); //进行添加c = count.getAndIncrement(); //返回加加(也可以说成加1)之前的数if (c + 1 < capacity) //如果是倒数第一个,那么实际上已经满了,也就没有剩余空间了,那么返回的c+1必然就是容量,所以这里要小于才算有剩余空间notFull.signal(); //如果队列还有剩余空间,则通知其他put线程} finally {putLock.unlock();}if (c == 0) //与消费者同理(前面的对应的解释说明),如果刚好加上了数据(即有数据了),自然进行通知signalNotEmpty(); //通知take}private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {notEmpty.signal();} finally {takeLock.unlock();}}
从上面你可以发现,与我们之前的生产者和消费者有个不同的是,他并不是生产一个或者消费一个就进行唤醒,而是生产第一个或者消费最后一个才会唤醒,这样能减少唤醒的次数,因为对应的中间情况我们是不需要进行唤醒的(实际上是因为其他情况,消费者和生产者是自动操作的,并不需要通知,而不会像单个锁一个需要通知,所以这里只需要边界通知即可),即因为他也并没有阻塞,即节省了时间,我们一般将这种优化称为边界通知,那么还有一个重要问题,为什么他的中间会操作唤醒自身呢,但是前面已经是阻塞的,为什么还要进行呢,这是为了当多个线程一起操作时来唤醒另外一个线程的,因为他是定向的操作,所以他也只会唤醒对应自己的消费者或者生产者的线程(因为其他多个线程可以都被阻塞,因为阻塞是会释放锁的,前提是该阻塞会释放锁,比如sleep虽然阻塞了,但并没有释放锁,而await和wait都会释放锁),而不会操作对立的线程,但他们直接的唤醒也仍然是随机的(如多个await的唤醒,但是是同一个Condition),至此我们说明完毕
LinkedBlockingQueue和ArrayBlockingQueue的差异:
1:为了提高并发度,LinkedBlockingQueue用2把锁,分别控制队头、队尾的操作(上面说的生产第一个或者消费最后一个才会唤醒),且操作通知自身消费者或者生产者,即两个通知,这就是使用两把锁的原因(使得他们的通知有两个且互不影响,并且可以生产和消费都进行操作,即不互斥),所以并且也意味着在put(…)和put(…)之间、take()与take()之间是互斥的(需要其他线程,单个线程的put不会操作他的再次的put,同样take也是如此),put(…)和take()之间并不互斥(各自的线程调用),但顶端可以认为互斥(虽然单个锁必然都互斥),互斥代表只能一个线程进入,但对于count变量,双方都需要操作,所以必须是原子类 型(他是在锁里面的,所以是原子变量),而ArrayBlockingQueue只是一个锁,即只会通知对方,这是主要的差异
2:LinkedBlockingQueue因为各自拿了一把锁,所以当需要调用对方的condition的signal时,还必须再加上对方的锁(这里补充一下,对应的condition只能在创建他的锁里进行操作,否则报错),这样才能操作对方,使得定向,就是signalNotEmpty()和signalNotFull()方法,示例如下所示
  private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock(); //必须先获取对方的锁,才可调用try {notEmpty.signal(); //通知消费take} finally {takeLock.unlock();}}private void signalNotFull() {final ReentrantLock putLock = this.putLock;putLock.lock(); //必须先获取对方的锁,才可调用,该锁如果被其他线程获取,我们需要等待,try {notFull.signal(); //通知生产put} finally {putLock.unlock();}}
总之:不仅put会通知 take,take 也会通知 put,当put 发现⾮满的时候,也会通知其他 put线程,当take发现 ⾮空的时候,也会通知其他take线程,主要的差异
PriorityBlockingQueue:
队列通常是先进先出的,而PriorityQueue是按照元素的优先级从小到大出队列的,正因为如此,PriorityQueue中的2个元素之间需要可以比较大小,并且他们需要实现了Comparable接口
其核⼼数据结构如下:
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {//...// 用数组实现的⼆插小根堆(或者说最小推或者说堆),因为从小到大,那么数组第一个或者说堆的顶部就是最小的private transient Object[] queue;private transient int size;private transient Comparator<? super E> comparator; //定义比较,如果定义了,那么认为比较方式是这个,否则就是元素的,一般以比较者为主,必然,4要和3比较大小,那么使用4的比较器// 1个锁+一个条件,没有⾮满条件private final ReentrantLock lock;private final Condition notEmpty;//...}
其构造方法如下所示,如果不指定初始大小,内部会设定一个默认值11,当元素个数超过这个大小之后,会自 动扩容,这就是没有非满条件,因为是没有满的(会扩容嘛)
public PriorityBlockingQueue() {this(DEFAULT_INITIAL_CAPACITY, null); //也就是下面的方法了,因为他调用另外一个构造函数,这是this或者说this()或者说this(参数列表)的操作private static final int DEFAULT_INITIAL_CAPACITY = 11;}public PriorityBlockingQueue(int initialCapacity,Comparator<? super E> comparator) { //这里就定义了比较if (initialCapacity < 1)throw new IllegalArgumentException();this.comparator = comparator;this.queue = new Object[Math.max(1, initialCapacity)];}
下⾯是对应的put/take方法的实现:
put方法的实现:
public void put(E e) {offer(e); // never need to block:永远不需要阻挡,即不会阻塞,因为会扩容}public boolean offer(E e) {if (e == null)throw new NullPointerException(); //直接空指针异常final ReentrantLock lock = this.lock;lock.lock();int n, cap;Object[] es;while ((n = size) >= (cap = (es = queue).length))tryGrow(es, cap); //元素数超过了数据的长度,则扩容try {final Comparator<? super E> cmp;if ((cmp = comparator) == null) //如果没有定义比较条件,则使用元素自带的比较功能siftUpComparable(n, e, es); //通过源码可以发现他是key.compareTo((T) e),这个key也就是e,所以的确是使用比较者的比较器,然后元素入堆,底层的意思是比较数组从末尾依次比较到开始,只要比对应的大或者相等,就赋值(他是往后面赋值,看代码就知道了,是k,只要数量都满了自然扩容),或者都小,那么就是第一个赋值了(k已经赋值了或者说需要k=0跳出),即数组[0]就是最小的值,且他们底层操作向右移动,并且有扩容的判断,所以不用考虑覆盖或者数据遗失问题,下面的基本也是如此,只是比较器的操作需要他自身来了else//使用我们设置的比较器,然后元素入堆,即这里我们都执行siftUp(简写了)操作siftUpUsingComparator(n, e, es, cmp);size = n + 1;notEmpty.signal(); //唤醒可以消费了,即不为空条件} finally {lock.unlock();}return true;}
take的实现:
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();E result;try {//出队列while ( (result = dequeue()) == null) //如果队列是空的,那么阻塞,否则会直接的返回notEmpty.await();} finally {lock.unlock();}return result; //直接的返回}private E dequeue() {// assert lock.isHeldByCurrentThread();final Object[] es;final E result;//因为是最小二叉堆,堆顶就是要出队的元素,即我们将最小的进行出队if ((result = (E) ((es = queue)[0])) != null) { //得到堆顶元素,即第一个,在前面的元素入堆中已经说明过了final int n;final E x = (E) es[(n = --size)];es[n] = null;if (n > 0) {final Comparator<? super E> cmp;if ((cmp = comparator) == null)//调整堆,执行siftDown操作siftDownComparable(0, x, es, n);elsesiftDownUsingComparator(0, x, es, n, cmp);}}return result;}
从上⾯可以看到,在阻塞的实现方⾯,和ArrayBlockingQueue的机制相似,主要区别是用数组实现了一个⼆ 叉堆,从而实现按优先级从小到大出队列,另一个区别是没有notFull条件,当元素个数超出数组长度时,执行扩容 操作
DelayQueue:
DelayQueue即延迟队列,也就是一个按延迟时间从小到大出队的PriorityQueue(优先队列,一般代表元素通过比较器来操作的,如这里就是延迟时间),所谓延迟时间,就是"未来 将要执行的时间"减去"当前时间",为此,放入DelayQueue中的元素,必须实现Delayed接口(因为接口可以指向对象,而元素自然就是对象),如下所示:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> //因为泛型的原因,基本上只能是传递接口指向对象的那个变量(即接口的变量,因为父类可以指向子类)implements BlockingQueue<E> {public interface Delayed extends Comparable<Delayed> {/*** Returns the remaining delay associated with this object, in the* given time unit.** @param unit the time unit* @return the remaining delay; zero or negative values indicate* that the delay has already elapsed*/long getDelay(TimeUnit unit);
}
关于该接口:
1:如果getDelay的返回值小于或等于0,则说明该元素到期,需要从队列中拿出来执行
2:该接口⾸先继承了 Comparable 接口,所以要实现该接口,必须也会实现 Comparable 接口,具体来说,就是基于getDelay()的返回值(需要)比较两个元素的大小
下⾯看一下DelayQueue的核⼼数据结构:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {// ...// 一把锁和一个⾮空条件private final transient ReentrantLock lock = new ReentrantLock();private final Condition available = lock.newCondition();// 优先级队列(或者说最小推或者说堆),这个是保存数据的,因为从小到大,那么第一个或者说堆的顶部就是最小的private final PriorityQueue<E> q = new PriorityQueue<E>();// ...}
下⾯介绍put/take的实现,先从take说起,因为这样更能看出DelayQueue的特性:
 public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek(); //取出二叉堆的堆顶元素但不清除(删除,移除)他,即延迟时间最小的,因为是以延迟时间作为比较的if (first == null)available.await(); //若队列为空,take线程阻塞else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0L) //若堆顶的元素值,即延迟时间小于等于0,出队列然后返回return q.poll(); //取出并清除(删除,移除)他first = null; // don't retain ref while waiting:等待时不保留ref,即不需要使用该变量了if (leader != null) //如果有其他线程也在等待该元素,那么我自然也会无限期等待(也就是没有设置时间)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay); //否则阻塞有限的时间(也就是有设置时间)} finally {if (leader == thisThread)leader = null;}}}}} finally {//一般通过中断会到这里(因为上面是无限循环)if (leader == null && q.peek() != null)available.signal(); //当前线程是leader,已经获取了堆顶元素,唤醒其他线程lock.unlock();}}
关于take()方法:
1:不同于一般的阻塞队列,只在队列为空的时候,才阻塞,如果堆顶元素的延迟时间没到,也会阻塞
2:在上⾯的代码中使用了一个优化技术,用一个Thread leader变量记录了等待堆顶元素的第1个线程或者说,在等待的线程(一般来说多个线程操作该同一个DelayQueue类,所以对应的变量是共同的),为 什么这样做呢?
由于通过 getDelay(…)可以知道堆顶元素何时到期,不必无限期等待,可以使用condition.awaitNanos()等待一个有限的时间,只有当发现还有其他线程也在等待堆顶元素(leader!=NULL)时,才需要无限期等待,因为你如果等待了,那么自然我也需要等待,因为你必然是等待他过期的,而我后进来的,自然也要与你一样等待他过期,但是你必然是等待有限的时间,会使得消费,所以我自然不能也等待有限的时间使得也会消费(使得消费同一个),所以我需要无限的时间
put的实现:
public void put(E e) {offer(e);}public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e); //元素放入二叉堆/*如果放进去的元素刚好在堆顶,说明放入的元素延迟时间最小,需要通知等待的线程否则放入的元素若不在堆顶,没有必要通知等待的线程,因为他必然会消费掉,不需要持续通知的,即在边界通知(前面说明过了)*/if (q.peek() == e) { leader = null;available.signal();}return true; //一般来说,return后,如果没有执行unlock,那么锁还是没有释放的,这样会影响后面的操作,所以需要finally使得在退出或者跳出后执行,来进行释放锁} finally {lock.unlock();}}
注意:不是每放入一个元素,都需要通知等待的线程,放入的元素,如果其延迟时间大于当前堆顶的元素延迟 时间
就没必要通知等待的线程,只有当延迟时间是最小的,在堆顶时,才有必要通知等待的线程
也就是上⾯代 码中的 部分if (q.peek() == e) { ,即边界通知
SynchronousQueue:
SynchronousQueue是一种特殊的BlockingQueue,之所以特殊,是它本身没有容量(因为是链表(实现),可以基本可以无限的增加或者改变容量),一般没有容量的基本都是特殊的阻塞队列,先调put(…),线程会阻塞(不是锁,而是CAS方式来操作的,看代码就知道了),直到另外一 个线程调用了take(),两个线程才同时解锁,反之亦然
对于多个线程而言,例如3个线程,调用3次put(…),3个线 程都会阻塞,直到另外的线程调用3次take(),6个线程才同时解锁,反之亦然
接下来看SynchronousQueue的实现
构造方法:
public class SynchronousQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable { public SynchronousQueue(boolean fair) {transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();//三元运算在=之前,==之后,这里是=,所以我们赋值了transferer}public SynchronousQueue() {this(false);}
和锁一样,也有公平和⾮公平模式,如果是公平模式,则用TransferQueue实现,如果是⾮公平模式,则用TransferStack实现
没有指定的话,默认是false,而false代表非公平的模式
这两个类分别是什么呢?先看一下put/take的实现
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();if (transferer.transfer(e, false, 0) == null) {Thread.interrupted();throw new InterruptedException();}}public E take() throws InterruptedException {E e = transferer.transfer(null, false, 0);if (e != null)return e;Thread.interrupted();throw new InterruptedException();}
//很明显,具体的主要代码就是其对应的transfer方法,在后面会说明的
可以看到,put/take都调用了transfer(…)方法,而TransferQueue和TransferStack分别实现了这个方法
该 接口方法在SynchronousQueue内部,如下所示,如果是put(…),则第1个参数就是对应的元素,如果是take(),则第1个参数为null,否则(put)代表传递的值
而后2个参数他们两个分别为是否设置超时(false代表没有)和对应的超时时间(0代表没有超时时间,一般是纳秒的单位,1 纳秒=0.000000001 秒=0.000001毫秒=0.001微秒)
abstract static class Transferer<E> { //被TransferQueue和TransferStack继承了,并实现了对应的方法/*** Performs a put or take.** @param e if non-null, the item to be handed to a consumer;*          if null, requests that transfer return an item*          offered by producer.* @param timed if this operation should timeout* @param nanos the timeout, in nanoseconds* @return if non-null, the item provided or received; if null,*         the operation failed due to timeout or interrupt --*         the caller can distinguish which of these occurred*         by checking Thread.interrupted.*/abstract E transfer(E e, boolean timed, long nanos);}
接下来看一下什么是公平模式和⾮公平模式,假设3个线程分别调用了put(…),并且3个线程会进入阻塞状态,直到 其他线程调用3次take(),他们会和和3个put(…) 一 一 (依次,都配对的意思,虽然意思不同,但这里我们认为相同)配对
如果是公平模式(队列模式),则第1个调用put(…)的线程1会在队列头部,第1个到来的take()线程和它进行配 对,遵循先到先配对的原则,所以是公平的,如果是⾮公平模式(栈模式),则第3个调用put(…)的线程3会在栈 顶,第1个到来的take()线程和它进行配对,遵循的是后到先配对的原则,所以是⾮公平的,我们可以认为他们都与第一个take先配对,即put利用主要的模式的,而take固定,但无论是公平的还是非公平的最终的结果基本是一样的,所以是否设置与实际操作没有什么影响,以后的该说明只是提一下,就不进行具体说明了
至于他们配对后的结果是如何操作的,主要看他们自身的transfer方法了

在这里插入图片描述

下⾯分别看一下TransferQueue和TransferStack的实现:
TransferQueue(公平的):
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {// ...static final class TransferQueue<E> extends Transferer<E> {static final class QNode { //链表形式的volatile QNode next;volatile Object item;volatile Thread waiter;final boolean isData;//...}transient volatile QNode head;transient volatile QNode tail;// ...}
}
从上⾯的代码可以看出,TransferQueue是一个基于单向链表而实现的队列,通过head和tail 2个指针记录头 部和尾部,初始的时候,head和tail会指向一个空节点,构造方法如下所示
TransferQueue() {QNode h = new QNode(null, false); // initialize to dummy node.head = h;tail = h;}
以三个线程为例,流程是这样:
阶段(a):队列中是一个空的节点,head/tail都指向这个空节点
阶段(b):3个线程分别调用put,⽣成3个QNode,进入队列
阶段(c):来了一个线程调用take,会和队列头部的第1个QNode进行配对
阶段(d):第1个QNode出队列

在这里插入图片描述

这里有一个关键点:put节点和take节点一旦相遇,就会配对出队列,所以在队列中不可能同时存在put节点和take节点,要么所有节点都是put节点,要么所有节点都是take节点(虽然我们认为是put来配对的,实际上只是同一个状态而已,在后面的源码中可以知道,就是true和false的匹配,即isData,所以无论如果是消费还是生产,都只是对应的数据状态而已,当都匹配成功后,才会移除,所以你也可以先消费来确定状态,或者先生产来确定状态)
接下来看一下TransferQueue的代码实现:
 E transfer(E e, boolean timed, long nanos) {QNode s = null; // constructed/reused as neededboolean isData = (e != null);for (;;) {QNode t = tail;QNode h = head; //队列还未初始化,自旋等待if (t == null || h == null)         // saw uninitialized valuecontinue;                       // spin//队列为空或者当前线程和队列中的元素为同一形式,那么进入(一般Data代表是谁,通常true代表生产者,false代表消费者,即他们进行匹配)if (h == t || t.isData == isData) { // empty or same-modeQNode tn = t.next;//不一致读,重写执行for循环,需要保证赋值成功哦if (t != tail)                  // inconsistent readcontinue;if (tn != null) {               // lagging tailadvanceTail(t, tn);continue;}if (timed && nanos <= 0L)       // can't waitreturn null;if (s == null)//新建一个节点s = new QNode(e, isData);//加入尾部if (!t.casNext(null, s))        // failed to link incontinue;//后移tail指针advanceTail(t, s);              // swing tail and wait//进行阻塞状态,等待匹配Object x = awaitFulfill(s, e, timed, nanos);if (x == s) {                   // wait was cancelledclean(t, s);return null;}//从阻塞中唤醒,确定已经处于队列中的第一个元素if (!s.isOffList()) {           // not already unlinkedadvanceHead(t, s);          // unlink if headif (x != null)              // and forget fieldss.item = s;s.waiter = null;}return (x != null) ? (E)x : e;//当前线程可以和队列中的第一个元素进行配对} else {                            // complementary-mode//取队列中的第一个元素QNode m = h.next;               // node to fulfill//不一致读,重写执行for循环if (t != tail || m == null || h != head)continue;                   // inconsistent readObject x = m.item;//已经配对还是没有if (isData == (x != null) ||    // m already fulfilledx == m ||                   // m cancelled//尝试配对!m.casItem(x, e)) {         // lost CAS//已经配对直接出队列advanceHead(h, m);          // dequeue and retrycontinue;}//配对成功,出队列advanceHead(h, m);              // successfully fulfilled//唤醒队列中的第一个元素对应的线程LockSupport.unpark(m.waiter);return (x != null) ? (E)x : e;}}}
整个 for 循环有两个大的 if-else 分⽀,如果当前线程和队列中的元素是同一种模式(都是put节点或者take节 点),则与当前线程对应的节点被加入队列尾部并且阻塞,如果不是同一种模式,则选取队列头部的第1个元素进 行配对
这里的配对就是m.casItem(x,e),把自己的item x换成对方的item e,如果CAS操作成功,则配对成功
如果是put节点,则isData=true,item!=null,如果是take节点,则isData=false,item=null,如果CAS操作不成 功,则isData和item之间将不一致,也就是isData!=(x!=null),使得为false了,通过这个条件可以判断节点是否已经被匹配 过了
TransferStack:
TransferStack的定义如下所示,⾸先,它也是一个单向链表,不同于队列,只需要head指针就能实现入栈和 出栈操作
static final class TransferStack extends Transferer { //好像只有空的构造,且什么都没有操作static final int REQUEST = 0;static final int DATA = 1;static final int FULFILLING = 2;static final class SNode {volatile SNode next; // 单向链表volatile SNode match; // 配对的节点volatile Thread waiter; // 对应的阻塞线程Object item;int mode; // 三种模式//...}volatile SNode head;
}
链表中的节点有三种状态,REQUEST对应take节点,DATA对应put节点,⼆者配对之后,会⽣成一个FULFILLING节点,入栈,然后FULLING节点和被配对的节点一起出栈
阶段(a):head指向NULL,不同于TransferQueue,这里没有空的头节点
阶段(b):3个线程调用3次put,依次入栈
阶段(c):线程4调用take,和栈顶的第1个元素配对,⽣成FULLFILLING节点,入栈
阶段(d):栈顶的2个元素同时出栈

在这里插入图片描述

下⾯看一下具体的代码实现:
 E transfer(E e, boolean timed, long nanos) {SNode s = null; // constructed/reused as neededint mode = (e == null) ? REQUEST : DATA;for (;;) {SNode h = head;//同一种模式,与前面的true和false一样的来表示是消费者还是生产者DATA是生产者,REQUEST是消费者if (h == null || h.mode == mode) {  // empty or same-modeif (timed && nanos <= 0L) {     // can't waitif (h != null && h.isCancelled())casHead(h, h.next);     // pop cancelled nodeelsereturn null;//入栈} else if (casHead(h, s = snode(s, e, h, mode))) {//阻塞等待SNode m = awaitFulfill(s, timed, nanos);if (m == s) {               // wait was cancelledclean(s);return null;}if ((h = head) != null && h.next == s)casHead(h, s.next);     // help s's fulfillerreturn (E) ((mode == REQUEST) ? m.item : s.item);}//非同一种模式,待匹配} else if (!isFulfilling(h.mode)) { // try to fulfillif (h.isCancelled())            // already cancelledcasHead(h, h.next);         // pop and retry//生产一个FUIFILLING节点,入栈else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {for (;;) { // loop until matched or waiters disappearSNode m = s.next;       // m is s's matchif (m == null) {        // all waiters are gonecasHead(s, null);   // pop fulfill nodes = null;           // use new node next timebreak;              // restart main loop}SNode mn = m.next;if (m.tryMatch(s)) {//两个节点一起出栈,或者说匹配,与之前的!m.casItem(x, e)) { 类似casHead(s, mn);     // pop both s and mreturn (E) ((mode == REQUEST) ? m.item : s.item);} else                  // lost matchs.casNext(m, mn);   // help unlink}}//已经匹配过了(或者说创建FUIFILLING节点过了),出栈,一般用来完成或者解决对应的生产一个FUIFILLING节点后的break,使得不用再创建了} else {                            // help a fulfillerSNode m = h.next;               // m is h's matchif (m == null)                  // waiter is gonecasHead(h, null);           // pop fulfilling nodeelse {SNode mn = m.next;if (m.tryMatch(h))          // help match//配对,一起出栈casHead(h, mn);         // pop both h and melse                        // lost matchh.casNext(m, mn);       // help unlink}}}}
当然,他们的匹配都是各自的作用,一个先放先匹配,一个后放先匹配
很明显,一个线程只有配对后,才可做自己的事情,虽然他比较安全,但效率还是低的,因为不能做其他的事情(虽然其他的事情本来就不需要做)
BlockingDeque :
BlockingDeque定义了一个阻塞的双端队列接口,如下所示
public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {void putFirst(E e) throws InterruptedException;void putLast(E e) throws InterruptedException;E takeFirst() throws InterruptedException;E takeLast() throws InterruptedException;// ...}
该接口继承了BlockingQueue接口,同时增加了对应的双端队列操作接口,该接口只有一个实现类(而不像BlockingQueue一样有多个,虽然前面只是大致的说明,并没有都给出来),就是LinkedBlockingDeque
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { //可以点击接口或者类的地方,按下ctrl+alt+B,可以知道实现或者继承他的所有接口(继承他,因为接口不能实现接口)或者类(实现或者继承他)static final class Node<E> {E item;Node<E> prev; // 双向链表的NodeNode<E> next;Node(E x) {item = x;}}transient Node<E> first; // 队列的头和尾transient Node<E> last;private transient int count; // 元素个数(一般是小于等于容量的)private final int capacity; // 容量// 一把锁+两个条件final ReentrantLock lock = new ReentrantLock();private final Condition notEmpty = lock.netCondition();private final Condition notFull = lock.newCondition();// ...}
对应的实现原理,和LinkedBlockingQueue基本一样,只是LinkedBlockingQueue是单向链表,而LinkedBlockingDeque是双向链表
 public E takeFirst() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lock();try {E x;while ( (x = unlinkFirst()) == null)notEmpty.await();return x;} finally {lock.unlock();}}public E takeLast() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lock();try {E x;while ( (x = unlinkLast()) == null)notEmpty.await();return x;} finally {lock.unlock();}}public void putFirst(E e) throws InterruptedException {if (e == null) throw new NullPointerException();Node<E> node = new Node<E>(e);final ReentrantLock lock = this.lock;lock.lock();try {while (!linkFirst(node))notFull.await();} finally {lock.unlock();}}public void putLast(E e) throws InterruptedException {if (e == null) throw new NullPointerException();Node<E> node = new Node<E>(e);final ReentrantLock lock = this.lock;lock.lock();try {while (!linkLast(node))notFull.await();} finally {lock.unlock();}}//上面就是操作两个方向的消费和生产,所以就不多说了
CopyOnWrite:
CopyOnWrite指在"写"的时候,不是直接"写"源数据,而是把数据拷⻉一份进行修改,再通过悲观锁或者乐观 锁的方式写回
那为什么不直接修改,而是要拷⻉一份修改呢? 这是为了在"读"的时候不加锁(这样,就不会使得你在写操作时,出现改变原来的值,使得读不准确了,而是直接的给出结果,所以读可以不加锁了,因为该结果必然是原来的或者修改好的,所以不用加锁,而不会是中间操作的部分)
CopyOnWriteArrayList:
和ArrayList一样,CopyOnWriteArrayList的核⼼数据结构也是一个数组,代码如下:
public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable {// ...private volatile transient Object[] array;
}
下⾯是CopyOnArrayList的⼏个"读"方法:
final Object[] getArray() {return array;}// public E get(int index) {return elementAt(getArray(), index);}public boolean isEmpty() {return size() == 0;}public boolean contains(Object o) {return indexOf(o) >= 0;}public int indexOf(Object o) {Object[] es = getArray();return indexOfRange(o, es, 0, es.length);}private static int indexOfRange(Object o, Object[] es, int from, int to) {if (o == null) {for (int i = from; i < to; i++)if (es[i] == null)return i;} else {for (int i = from; i < to; i++)if (o.equals(es[i]))return i;}return -1;}
既然这些"读"方法都没有加锁,那么是如何保证"线程安全"呢?,答案在"写"方法里⾯
public class CopyOnWriteArrayList<E>implements List<E>, RandomAccess, Cloneable, java.io.Serializable {// 锁对象final transient Object lock = new Object();public boolean add(E e) {synchronized (lock) { // 同步锁对象Object[] es = getArray();int len = es.length;es = Arrays.copyOf(es, len + 1); // CopyOnWrite,写的时候,先拷⻉一份之前的数组es[len] = e;setArray(es);return true;}}public void add(int index, E element) {synchronized (lock) { // 同步锁对象Object[] es = getArray();int len = es.length;if (index > len || index < 0)throw new IndexOutOfBoundsException(outOfBounds(index, len));Object[] newElements;int numMoved = len - index;if (numMoved == 0)newElements = Arrays.copyOf(es, len + 1);else {newElements = new Object[len + 1];System.arraycopy(es, 0, newElements, 0, index); // CopyOnWrite,写的时候,先拷⻉一份之前的数组System.arraycopy(es, index, newElements, index + 1,numMoved);}newElements[index] = element;setArray(newElements); // 把新数组赋值给⽼数组}}
其他"写"方法(即写操作,如增删改),例如remove和add类似,此处不再详述
CopyOnWriteArraySet :
CopyOnWriteArraySet 就是用 Array 实现的一个 Set,保证所有元素都不重复(也就是加上了判断,使得若是重复的那么覆盖或者不添加等等,一般是覆盖,而不是不添加),其内部是封装的一个CopyOnWriteArrayList
public class CopyOnWriteArraySet<E> extends AbstractSet<E> implements java.io.Serializable {// 新封装的CopyOnWriteArrayListprivate final CopyOnWriteArrayList<E> al;public CopyOnWriteArraySet() {al = new CopyOnWriteArrayList<E>();}public boolean add(E e) {return al.addIfAbsent(e); // 不重复的加进去}
}
ConcurrentLinkedQueue/Deque (Deque:双队列,一般代表双向的操作,由于基本相同,所以这里就只给出Queue的案例了,因为他们大致相同),这个了解即可
AQS内部的阻塞队列实现原理:一般(之所以是一般,因为有ConcurrentLinkedQueue,而他是单向列表 )基于双向链表,通过对head/tail进行CAS操作,实现入队和出队
ConcurrentLinkedQueue 的实现原理和AQS 内部的阻塞队列类似:同样是基于 CAS,同样是通过head/tail指 针记录队列头部和尾部,但还是有稍许差别
⾸先,它是一个单向链表,定义如下:
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable {private static class Node<E> {volatile E item;volatile Node<E> next;//...}private transient volatile Node<E> head;private transient volatile Node<E> tail;//...}
其次,在AQS的阻塞队列中,每次入队后,tail一定后移一个位置,每次出队,head一定后移一个位置,以保 证head指向队列头部,tail指向链表尾部,因为出队是头出队,入队是尾入队
但在ConcurrentLinkedQueue中,head/tail的更新可能落后于节点的入队和出队,因为它不是直接对 head/tail指针进行 CAS操作的,而是对 Node中的 item进行操作,下⾯进行详细分析:
初始化:
初始的时候, head 和 tail 都指向一个 null 节点,对应的代码如下:
public ConcurrentLinkedQueue() {head = tail = new Node<E>(null); //可能你的没有null
}

在这里插入图片描述

入队列:
代码如下所示:
 public boolean offer(E e) {final Node<E> newNode = new Node<E>(Objects.requireNonNull(e));for (Node<E> t = tail, p = t;;) { //可以这样的后面得到前面的这样的操作的赋值,也能单纯的定义Node<E> q = p.next;if (q == null) {// p is last node//对tail的next指针执行CAS操作,而不是对tail指针执行CAS操作if (NEXT.compareAndSet(p, null, newNode)) {// Successful CAS is the linearization point// for e to become an element of this queue,// and for newNode to become "live".//每入队两个节点,后移一次tail指针,失败也无所谓的,因为CAS并不会一定会移动到最后的,我们高并发下,可能操作了同一操作,使得只会移动一次,而不是两次,即这样就是失败,但是我们继续移动即可if (p != t) // hop two nodes at a time; failure is OKTAIL.weakCompareAndSet(this, t, newNode);return true;}// Lost CAS race to another thread; re-read next}else if (p == q)// We have fallen off list.  If tail is unchanged, it// will also be off-list, in which case we need to// jump to head, from which all live nodes are always// reachable.  Else the new tail is a better bet.//已经到达队列的尾部p = (t != (t = tail)) ? t : head;else// Check for tail updates after two hops.//后移p指针p = (p != t && t != (t = tail)) ? t : q;}}
上⾯的入队其实是每次在队尾追加2个节点时,才移动一次tail节点,如下图所示:
初始的时候,队列中有1个节点item1,tail指向该节点,假设线程1要入队item2节点:
step1:p=tail,q=p.next=NULL
step2:对p的next执行CAS操作,追加item2,成功之后,由于p=tail,所以上⾯的if (p != t) 里面的代码不会执行,直接返 回,此时tail指针没有变化

在这里插入图片描述

之后,假设线程2要入队item3节点,如下图所示:
step3:p=tail,q=p.next
step4:q!=NULL,因此不会入队新节点,p,q都后移1位
step5:q=NULL,对p的next执行CAS操作,入队item3节点
step6:p!=t,满⾜条件,执行上⾯的对应操作,tail后移2个位置,到达队列尾部

在这里插入图片描述

最后总结一下入队列的两个关键点:
1:即使tail指针没有移动,只要对p的next指针成功进行CAS操作,就算成功入队列
2:只有当 p != tail的时候,才会后移tail指针,也就是说,每连续追加2个节点,才后移1次tail指针(他的这次是直接的到尾部,也就是移动两个位置),实际上即使CAS失败也没关系,因为可以由下1个线程来移动tail指针,因为p!=t
出队列:
上⾯说了入队列之后,tail指针不变化,那是否会出现入队列之后,要出队列却没有元素可出的情况呢?
public E poll() {restartFromHead: for (;;) {for (Node<E> h = head, p = h, q;; p = q) { //这个q是单纯的定义的final E item;//注意:在出队列的时候,并没有移动head指针,而是把item设置为nullif ((item = p.item) != null && p.casItem(item, null)) {// Successful CAS is the linearization point// for item to be removed from this queue.if (p != h) // hop two nodes at a time//每产生2个null节点,才把head指针后移2位updateHead(h, ((q = p.next) != null) ? q : p);return item;}else if ((q = p.next) == null) {updateHead(h, p);return null;}else if (p == q)continue restartFromHead; //结束该循环,但是,他是结束对应的循环的,也就是说,无视层级的,与break outer是基本一样的,具体在第9章博客有说明}}}
出队列的代码和入队列类似,也有p、q2个指针,整个变化过程如下图所示,假设初始的时候head指向空节 点,队列中有item1、item2、item3 三个节点
step1:p=head,q=p.next,p!=q
step2:后移p指针,使得p=q
step3:出队列,关键点:此处并没有直接删除item1节点,只是把该节点的item通过CAS操作置为了NULL
step4:p!=head,此时队列中有了2个 NULL 节点,再前移1次head指针,对其执行updateHead操作

在这里插入图片描述

最后总结一下出队列的关键点:
1:出队列的判断并⾮观察 tail 指针的位置,而是依赖于 head 指针后续的节点是否为NULL这一条件
2:只要对节点的item执行CAS操作,置为NULL成功,则出队列成功,即使head指针没有成功移动,也可以 由下1个线程继续完成
队列判空:
因为head/tail 并不是精确地指向队列头部和尾部,所以不能简单地通过比较 head/tail 指针来判断队列是否为 空,而是需要从head指针开始遍历,找第1个不为NULL的节点,如果找到,则队列不为空;如果找不到,则队列 为空,代码如下所示:
 public boolean isEmpty() {//寻找第一个不是null的节点return first() == null;}Node<E> first() {restartFromHead: for (;;) {//从head指针开始遍历,查找第一个不是null的节点的情况for (Node<E> h = head, p = h, q;; p = q) {boolean hasItem = (p.item != null);if (hasItem || (q = p.next) == null) {updateHead(h, p);return hasItem ? p : null;}else if (p == q)continue restartFromHead;}}}
ConcurrentHashMap :
HashMap通常的实现方式是"数组+链表",这种方式被称为"拉链法"
ConcurrentHashMap在这个基本原理之 上进行了各种优化
⾸先是所有数据都放在一个大的HashMap中,其次是引入了红⿊树
其原理如下图所示:

在这里插入图片描述

如果头节点是Node类型,则尾随它的就是一个普通的链表,如果头节点是TreeNode类型,它的后⾯就是一颗 红⿊树,TreeNode是Node的子类
链表和红⿊树之间可以相互转换:初始的时候是链表,当链表中的元素超过某个阈值时,把链表转换成红⿊ 树,反之,当红⿊树中的元素个数小于某个阈值时,再转换为链表
那为什么要做这种设计呢?
1:使用红⿊树,当一个槽里有很多元素时,其查询和更新速度会比链表快很多,Hash冲突导致链表变多使得不好查找的问题由此得到 较好的解决
2:加锁的粒度,并⾮整个ConcurrentHashMap,而是对每个头节点分别加锁,即并发度,就是Node数组 的长度,初始长度一般为16(HashMap默认也是16)
3:并发扩容,这是难度最大的,当一个线程要扩容Node数组的时候,其他线程还要读写,因此处理过程很 复杂,后⾯会详细分析
由上述对比可以总结出来:这种设计一方⾯降低了Hash冲突查找等等问题,另一方⾯也提升了并发度
下⾯从构造方法开始,一步步深入分析其实现过程
构造方法分析:
public ConcurrentHashMap() {}public ConcurrentHashMap(int initialCapacity) {this(initialCapacity, LOAD_FACTOR, 1);}public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();if (initialCapacity < concurrencyLevel)   // Use at least as many binsinitialCapacity = concurrencyLevel;   // as estimated threadslong size = (long)(1.0 + (long)initialCapacity / loadFactor);int cap = (size >= (long)MAXIMUM_CAPACITY) ?MAXIMUM_CAPACITY : tableSizeFor((int)size);this.sizeCtl = cap;}
在上⾯的代码中,变量cap就是Node数组的长度,保持为2的整数次方,tableSizeFor(…)方法是根据传入的初 始容量,计算出一个合适的数组长度,具体而言:1.5倍的初始容量+1,再往上取最接近的2的整数次方,作为数组 长度cap的初始值
这里的 sizeCtl,其含义是用于控制在初始化或者并发扩容时候的线程数,只不过其初始值设置成cap
初始化:
在上⾯的构造方法里只计算了数组的初始大小,并没有对数组进行初始化,当多个线程都往里⾯放入元素的时 候,再进行初始化,这就存在一个问题:多个线程重复初始化,下⾯看一下是如何处理的
private final Node<K,V>[] initTable() { //后面put中分支1就操作了这个地方Node<K,V>[] tab; int sc;while ((tab = table) == null || tab.length == 0) {if ((sc = sizeCtl) < 0)//自旋等待Thread.yield(); // lost initialization race; just spinelse if (U.compareAndSetInt(this, SIZECTL, sc, -1)) { //重点:将sizectl设置为-1,CAS操作//代表准备初始化,后面还会继续说明sizectl的//注意:CAS操作并非一定是不能多个线程的,只是在没有高的并发下基本不会出现问题而已,但是一般来说CAS操作的中间是互斥的,所以一般情况下,CAS就算在高并发下也不会出现问题,除非对应的操作没有互斥或者存在没有互斥的可能性,在77章博客说明过乐观锁的唯一性,那么这个唯一性就是是否互斥的问题,若唯一,基本不会出现问题,否则就会try {if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; //初始化table = tab = nt;//sizectl不是数组长度,因此初始化成功后,就不再等于数组长度//而是n- ( n >>>2 )=0.75n,表示下一次的扩容的阈值:n-n/4sc = n - (n >>> 2);}} finally {sizeCtl = sc; //设置sizectl的值为sc}break;}}return tab;}
通过上⾯的代码可以看到,多个线程的竞争是通过对sizeCtl进行CAS操作实现的,如果某个线程成功地把 sizeCtl 设置为-1,它就拥有了初始化的权利,进入初始化的代码模块,等到初始化完成,再把sizeCtl设置回去,其 他线程则一直执行while循环,自旋等待,直到数组不为null,即当初始化结束时,退出整个方法
因为初始化的工作量很小,所以此处选择的策略是让其他线程一直等待,而没有帮助其初始化
put(…)实现分析:
public V put(K key, V value) {return putVal(key, value, false);}final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();int hash = spread(key.hashCode());int binCount = 0;for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh; K fk; V fv;//分支1:整个数组初始化if (tab == null || (n = tab.length) == 0)tab = initTable(); //上面的操作//分支2:第i个元素初始化,槽为空else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))break;                   // no lock when adding to empty bin}//分支3:扩容else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f);else if (onlyIfAbsent // check first node without acquiring lock&& fh == hash&& ((fk = f.key) == key || (fk != null && key.equals(fk)))&& (fv = f.val) != null)return fv;//分支4:放入元素else {V oldVal = null;//重点:加锁synchronized (f) {//链表if (tabAt(tab, i) == f) {if (fh >= 0) {binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key, value);break;}}}//红黑树else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}else if (f instanceof ReservationNode)throw new IllegalStateException("Recursive update");}}//如果是链表,上面的binCount会一直累加if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD) //这个默认是8,因为是从0开始的,所以这个>=相当于大于8,因为0,1,2,3,4,5,6,7就是8个,所以我们才会说,大于8扩容,同理,HashMap基本也是这样的,但他却是binCount >= TREEIFY_THRESHOLD - 1,只是他是先加元素的,而不是后加元素,所以还是大于8变红黑树的意思//虽然他们的判断不是直接的大于8(没有等于)treeifyBin(tab, i); //超出阈值,转换为红黑树if (oldVal != null)return oldVal;break;}}}addCount(1L, binCount); //总元素个数累加1return null;}static final int TREEIFY_THRESHOLD = 8; //上面的默认是8
上⾯的for循环有4个大的分⽀:
第1个分⽀,是整个数组的初始化,前⾯已讲
第2个分⽀,是所在的槽为空,说明该元素是该槽的第一个元素,直接新建一个头节点,然后返回
第3个分⽀,说明该槽正在进行扩容,帮助其扩容
第4个分⽀,就是把元素放入槽内,槽内可能是一个链表,也可能是一棵红⿊树,通过头节点的类型可以判断 是哪一种,第4个分⽀是包裹在synchronized (f)里⾯的,f对应的数组下标位置的头节点,意味着每个数组元素 有一把锁,并发度等于数组的长度
上⾯的binCount表示链表的元素个数,当这个数⽬超过TREEIFY_THRESHOLD=8时,把链表转换成红⿊树,也 就是 treeifyBin(tab,i)方法,但在这个方法内部,不一定需要进行红⿊树转换,可能只做扩容操作,所以接下 来从扩容讲起
扩容:
扩容的实现是最复杂的,下⾯从treeifyBin(Node[] tab, int index)讲起:
 private final void treeifyBin(Node<K,V>[] tab, int index) {Node<K,V> b; int n;if (tab != null) {if ((n = tab.length) < MIN_TREEIFY_CAPACITY)//数组长度小于阈值64,不做红黑树转换,直接扩容tryPresize(n << 1);else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {//链表转换为红黑树synchronized (b) {if (tabAt(tab, index) == b) {TreeNode<K,V> hd = null, tl = null;//遍历链表,初始化红黑树for (Node<K,V> e = b; e != null; e = e.next) {TreeNode<K,V> p =new TreeNode<K,V>(e.hash, e.key, e.val,null, null);if ((p.prev = tl) == null)hd = p;elsetl.next = p;tl = p;}setTabAt(tab, index, new TreeBin<K,V>(hd));}}}}}static final int MIN_TREEIFY_CAPACITY = 64;
在上⾯的代码中,MIN_TREEIFY_CAPACITY=64,意味着当数组的长度没有超过64的时候,数组的每个节点里 都是链表,只会扩容,不会转换成红⿊树,只有当数组长度大于或等于64时,才考虑把链表转换成红⿊树(实际上HashMap也是这样,这里进行补充),那么为什么这样呢,这是因为总数据太少了,单纯的操作可以进行处理(扩容处理),而只有总数据变大时,变成红黑树效率才会明显提高,而64就是对应的操作系统的一个阈值(一般操作系统都是64位,所以是64),刚好需要多操作了,所以这时才会变成红黑树来进行提高效率
static final int MIN_TREEIFY_CAPACITY = 64;
在 tryPresize(int size)内部调用了一个核⼼方法 transfer(Node<K,V>[] tab,Node<K,V>[] nextTab),先从这个方法的分析说起:
//我们来看看他的扩容吧private final void tryPresize(int size) {int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :tableSizeFor(size + (size >>> 1) + 1);int sc;while ((sc = sizeCtl) >= 0) {Node<K,V>[] tab = table; int n;if (tab == null || (n = tab.length) == 0) {n = (sc > c) ? sc : c;if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {try {if (table == tab) {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = nt;sc = n - (n >>> 2);}} finally {sizeCtl = sc;}}}else if (c <= sc || n >= MAXIMUM_CAPACITY)break;else if (tab == table) {int rs = resizeStamp(n);if (U.compareAndSetInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null); //我们主要看这个}}}private static final int MIN_TRANSFER_STRIDE = 16;
//在持久化对象时,对于一些特殊的数据成员(如用户的密码,银行卡号等),我们不想用序列化机制来保存它,为了在一个特定对象的一个成员变量上关闭序列化,可以在这个成员变量前加上关键字transient
private transient volatile int transferIndex;private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {int n = tab.length, stride;if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)//计算步长,默认是16stride = MIN_TRANSFER_STRIDE; // subdivide range//初始化新的HashMapif (nextTab == null) {            // initiatingtry {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; //扩容两倍nextTab = nt;} catch (Throwable ex) {      // try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}nextTable = nextTab;//初始的transferIndex为旧HashMap的数组长度//private transient volatile int transferIndex;transferIndex = n;}int nextn = nextTab.length;ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);boolean advance = true;boolean finishing = false; // to ensure sweep before committing nextTab// 此处,i为遍历下标,bound为边界// 如果成功获取一个任务,则i=nextIndex-1// bound=nextIndex-stride// 如果获取不到,则i=0,bound=0for (int i = 0, bound = 0;;) {Node<K,V> f; int fh;// advance表示在从i=transferIndex-1遍历到bound位置的过程中,是否一直继续while (advance) {int nextIndex, nextBound;// 以下是哪个分⽀中的advance都是false,表示如果三个分⽀都不执行,才可以一直while循环// ⽬的在于当对transferIndex执行CAS操作不成功的时候,需要自旋,以期获取一个stride的迁移任务if (--i >= bound || finishing)//对数组遍历,通过这里的--i进行,如果成功执行了--i,就不需要继续while循环了,因为advance只能进一步advance = false;else if ((nextIndex = transferIndex) <= 0) {// transferIndex <= 0,整个HashMap完成i = -1;advance = false;}// 对transferIndex执行CAS操作,即为当前线程分配1个stride// CAS操作成功,线程成功获取到一个stride的迁移任务// CAS操作不成功,线程没有抢到任务,会继续执行while循环,自旋else if (U.compareAndSetInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {bound = nextBound;i = nextIndex - 1;advance = false;}}// i越界,整个HashMap遍历完成if (i < 0 || i >= n || i + n >= nextn) {int sc;// finishing表示整个HashMap扩容完成if (finishing) {nextTable = null;// 将nextTab赋值给当前tabletable = nextTab;sizeCtl = (n << 1) - (n >>> 1);return;}if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;finishing = advance = true;i = n; // recheck before commit}}// tab[i]迁移完毕,赋值一个ForwardingNodeelse if ((f = tabAt(tab, i)) == null)advance = casTabAt(tab, i, null, fwd);// tab[i]的位置已经在迁移过程中else if ((fh = f.hash) == MOVED)advance = true; // already processedelse {// 对tab[i]进行迁移操作,tab[i]可能是一个链表或者红⿊树synchronized (f) {if (tabAt(tab, i) == f) {Node<K,V> ln, hn;// 链表if (fh >= 0) {int runBit = fh & n;Node<K,V> lastRun = f;for (Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n;if (b != runBit) {runBit = b;// 表示lastRun之后的所有元素,hash值都是一样的// 记录下这个最后的位置lastRun = p;}}if (runBit == 0) {// 链表迁移的优化做法ln = lastRun;hn = null;}else {hn = lastRun;ln = null;}for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}// 红⿊树,迁移做法和链表类似else if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> lo = null, loTail = null;TreeNode<K,V> hi = null, hiTail = null;int lc = 0, hc = 0;for (Node<K,V> e = t.first; e != null; e = e.next) {int h = e.hash;TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);if ((h & n) == 0) {if ((p.prev = loTail) == null)lo = p;elseloTail.next = p;loTail = p;++lc;}else {if ((p.prev = hiTail) == null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}}}}}}
上面的方法⾮常复杂,下⾯一步步分析:
1:扩容的基本原理如下图,⾸先建一个新的HashMap,其数组长度是旧数组长度的2倍,然后把旧的元素 逐个迁移过来,所以,上⾯的方法参数有2个,第1个参数tab是扩容之前的HashMap,第2个参数nextTab是扩容之后的HashMap,当nextTab=null的时候,方法最初会对nextTab进行初始化,这里有 一个关键点要说明:该方法会被多个线程调用,所以每个线程只是扩容旧的HashMap部分,这就涉及如 何划分任务的问题

在这里插入图片描述

2:上图为多个线程并行扩容-任务划分示意图,旧数组的长度是N,每个线程扩容一段,一段的长度用变量stride(步长)来表示,transferIndex表示了整个数组扩容的进度
stride的计算公式如上⾯的代码所示,即:在单核模式下直接等于n,因为在单核模式下没有办法多个线 程并行扩容,只需要1个线程来扩容整个数组,在多核模式下为 (n>>>3)/NCPU,并且保证步长的 最小值是 16,显然,需要的线程个数约为n/stride
//我们继续看transfer方法里面的如下: 
int n = tab.length, stride;if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)//计算步长stride = MIN_TRANSFER_STRIDE; // subdivide range//NCPU:核心数量
static final int NCPU = Runtime.getRuntime().availableProcessors(); //我这里代表8个核心
transferIndex是ConcurrentHashMap的一个成员变量,记录了扩容的进度,初始值为n,从大到小扩容,每 次减stride个位置,最终减⾄n<=0,表示整个扩容完成,因此,从[0,transferIndex-1]的位置表示还没有分配到 线程扩容的部分,从[transfexIndex,n-1]的位置表示已经分配给某个线程进行扩容,当前正在扩容中,或者已经 扩容成功
因为transferIndex会被多个线程并发修改,每次减stride,所以需要通过CAS进行操作,如下⾯的代码所示:
  //我们继续看transfer方法里面的如下: 
else if (U.compareAndSetInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {bound = nextBound;i = nextIndex - 1;advance = false;}

在这里插入图片描述

待扩容的部分就是准备迁移到新的已经扩容的地方的部分,为了简单的说,所以我们称为待扩容的部分
3:在扩容未完成之前,有的数组下标对应的槽已经迁移到了新的HashMap里⾯,有的还在旧的 HashMap 里⾯,这个时候,所有调用 get(k,v)的线程还是会访问旧 HashMap,怎么处理呢?
下图(后面的图)为扩容过程中的转发示意图:当Node[0]已经迁移成功,而其他Node还在迁移过程中时,如果有线 程要读取Node[0]的数据,就会访问失败或者得到旧数据(一般是失败),为此,新建一个ForwardingNode,即转发节点,在这个节点 里⾯记录的是新的 ConcurrentHashMap 的引用,这样,当线程访问到ForwardingNode之后,会去查 询新的ConcurrentHashMap
4:因为数组的长度 tab.length 是2的整数次方,每次扩容⼜是2倍,而 Hash 函数是hashCode%tab.length,等价于hashCode&(tab.length-1)
这意味着:处于第i个位置的元素,在新 的Hash表的数组中一定处于第i个或者第i+n个位置,如下图所示
举个简单的例子:假设数组长度是8, 扩容之后是16:
若hashCode=5,5%8=0,扩容后,5%16=0,位置保持不变
若hashCode=24,24%8=0,扩容后,24%16=8,后移8个位置
若hashCode=25,25%8=1,扩容后,25%16=9,后移8个位置
若hashCode=39,39%8=7,扩容后,39%8=7,位置保持不变
等等,后面就不说明了

在这里插入图片描述

正因为有这样的规律,所以如下有代码:
//我们继续看transfer方法里面的如下:  
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);
也就是把tab[i]位置的链表或红⿊树重新组装成两部分,一部分链接到nextTab[i]的位置,一部分链接到nextTab[i+n]的位置,如上面所示,然后把tab[i]的位置指向一个ForwardingNode节点
同时,当tab[i]后⾯是链表时,使用类似于JDK 7中在扩容时的优化方法,从lastRun往后的所有节点,不需依 次拷⻉,而是直接链接到新的链表头部,从lastRun往前的所有节点,需要依次拷⻉
了解了核⼼的迁移函数transfer(tab,nextTab),再回头看tryPresize(int size)函数,这个函数的输入是 整个Hash表的元素个数,在函数里⾯,根据需要对整个Hash表进行扩容,想要看明⽩这个函数,需要透彻地理解sizeCtl变量,下⾯这段注释摘自源码

在这里插入图片描述

当sizeCtl=-1时,表示整个HashMap正在初始化
当sizeCtl=某个其他负数时,表示多个线程在对HashMap做并发扩容
当sizeCtl=cap时,tab=null,表示未初始之前的初始容量(如上⾯的构造函数所示)
扩容成功之后,sizeCtl存储的是下一次要扩容的阈值,即上⾯初始化代码中的n-(n>>>2)=0.75n
所以,sizeCtl变量在Hash表处于不同状态时,表达不同的含义,明⽩了这个道理,再来看上⾯的tryPresize(int size)函数
private final void tryPresize(int size) {int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :tableSizeFor(size + (size >>> 1) + 1);int sc;while ((sc = sizeCtl) >= 0) {Node<K,V>[] tab = table; int n;if (tab == null || (n = tab.length) == 0) {n = (sc > c) ? sc : c;if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {try {if (table == tab) {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = nt;sc = n - (n >>> 2);}} finally {sizeCtl = sc;}}}else if (c <= sc || n >= MAXIMUM_CAPACITY)break;else if (tab == table) {int rs = resizeStamp(n);if (U.compareAndSetInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);}}}
tryPresize(int size)是根据期望的元素个数对整个Hash表进行扩容,核⼼是调用transfer函数,在第一次扩 容的时候,sizeCtl会被设置成一个很大的负数U.compareAndSwapInt(this,SIZECTL,sc,(rs << RESIZE_STAMP_SHIFT)+2),之后每一个线程扩容的时候,sizeCtl 就加 1,相当于U.compareAndSwapInt(this,SIZECTL,sc,sc+1),待扩容完成之后,sizeCtl减1
ConcurrentSkipListMap/Set:
ConcurrentHashMap 是一种 key 无序的 HashMap,ConcurrentSkipListMap则是 key 有序的,实现了NavigableMap接口,此接口⼜继承了SortedMap接口
ConcurrentSkipListMap :
为什么要使用SkipList实现Map?
在Java的util包中,有一个⾮线程安全的HashMap,也就是TreeMap,是key有序的,基于红⿊树实现,而在Concurrent包中,提供的key有序的HashMap,也就是ConcurrentSkipListMap,是基于SkipList(跳查 表)来实现的,这里为什么不用红⿊树,而用跳查表来实现呢?
借用Doug Lea(开发了ConcurrentSkipListMap的人)的原话:
The reason is that there are no known efficient lock0free insertion and deletion 
algorithms for search trees./*翻译:原因是没有已知的有效锁(认为可以无锁),使得自由插入和删除搜索树的算法*/
也就是⽬前计算机领域还未找到一种高效的、作用在树上的、无锁的、增加和删除节点的办法,那为什么SkipList可以无锁地实现节点的增加、删除呢?这要从无锁链表的实现说起
无锁链表:
在前⾯讲解类似使用了AQS的类时,曾反复用到无锁队列(也就是ConcurrentLinkedQueue,因为没有使用具体的锁,而是使用CAS的无锁方式,所以简称为无锁,虽然实际上CAS也可能操作了锁,但他通常只是操作原子变量,就算他操作了锁,但是锁的实现比其他的直接的锁要效率大,所以无锁的真正说明是,没有直接的使用锁,在后面会具体说明的),其实现也是链表,究竟⼆者的区别在哪呢?
前⾯讲的无锁队列、栈(TransferStack,栈的说明,因为操作栈了,所以认为是栈,当然说成队列也行,反正他们基本都是通过链表或者数组实现的),都是只在队头或者队尾进行CAS操作,通常不会有问题,如果在链表的中间进行插入或 删除操作,按照通常的CAS做法,就会出现问题(因为插入和删除并不能只使用一个CAS来进行搞定,后面会说明为什么不只使用一个CAS)
关于这个问题,Doug Lea的论文中有清晰的论述,此处引用如下:
操作1:在节点10后⾯插入节点20,如下图所示,⾸先把节点20的next指针指向节点30,然后对节点10的next指针执行CAS操作(因为其他人不能操作哦,因为这是本来保存的数据,特别是查询,所以之前的操作两端也是这样),使其指向节点20即可

在这里插入图片描述

操作2:删除节点10,如下图所示,只需把头节点的next指针,进行CAS操作到节点30即可

在这里插入图片描述

但是,如果两个线程同时操作,一个删除节点10,一个要在节点10后⾯插入节点20,并且这两个操作都各自 是CAS的(不是同一个,因为不同的操作自然不会是同一个CAS,虽然也行,但效率会大大减低的,因为其中的操作竟然受其他操作影响),此时可能就会出现问题,如下图所示,删除节点10,会同时把新插入的节点20也删除掉(因为这个时候,可能头节点指向30了),这个问题超出了CAS的解决范围

在这里插入图片描述

为什么会出现这个问题呢?
究其原因:在删除节点10的时候,实际受到操作的是节点10的前驱,也就是头节点,他认为没有20添加进来(这个时候还没有因为插入而改变10的指向),那么就是指向30,即认为节点10本身没有任何变 化,故而,再往节点10后插入节点20的线程(认为后操作),并不知道节点10已经被删除了(他可以指向,是因为我们只是移动头节点), 针对这个问题,很明显,主要是插入操作还没有完全操作完,删除已经操作完了,导致插入出现问题,因为若插入操作完毕了,那么删除是不会出错的,所以这里是主要的错误(因为删除是直接操作完毕的,即直接的改变头指向,指向到当前的下一个,所以基本只有这个主要的问题,即插入没有操作完,即10还没有改变指向,变成你从指向30到指向20),在论文中提出了如下的解决办法,如下图所示,把节点 10 的删除分为两2步:
第一步,把节点10的next指针,mark成删除(mark在后面会说明),即软删除
第⼆步,找机会,物理删除
做标记之后,当线程再往节点10后⾯插入节点20的时候,便可以先进行判断,节点10是否已经被删除,从而 避免在一个删除的节点10后⾯插入节点20,这个解决方法有一个关键点:"把节点10的next指针指向节点20(插入 操作)“和"判断节点10本身是否已经删除(判断操作)”,必须是原子的,必须在1个CAS操作里⾯完成(因为总不能已经插入了,然后判断吧),然后因为是不同的CAS所以我们需要考虑找机会真正的删除(即找机会移动头节点,来进行保证删除操作,当然,这个机会可以认为是最后的判断删除,或者移动,这里就不多说了)

在这里插入图片描述

具体的实现有两个办法:
办法一:AtomicMarkableReference
保证每个 next 是 AtomicMarkableReference 类型,但这个办法不够高效,Doug Lea 在ConcurrentSkipListMap的实现中用了另一种办法
办法2:Mark节点
我们的⽬的是标记节点10已经删除,也就是标记它的next字段,那么可以新造一个marker节点,使节点10的next指针指向该Marker节点,这样,当向节点10的后⾯插入节点20的时候,就可以在插入的同时判断节点10的next指针是否指向了一个Marker节点,这两个操作可以在一个CAS操作里⾯完成
跳查表(也就是跳跃表,在99章博客也说明过了):
解决了无锁链表的插入或删除问题,也就解决了跳查表的一个关键问题,因为跳查表就是多层链表叠起来的
下⾯先看一下跳查表的数据结构(下⾯所用代码都引用自JDK 7,JDK 8中的代码略有差异,但不影响下⾯的原 理分析)
public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>implements ConcurrentNavigableMap<K,V>, Cloneable, Serializable {//..
static final class Node<K,V> {final K key; // currently, never detachedV val;Node<K,V> next;Node(K key, V value, Node<K,V> next) {this.key = key;this.val = value;this.next = next;}}
上面中的Node就是跳查表底层节点类型,所有的对都是由这个单向链表串起来的,然后是Index层的节点:
static final class Index<K,V> {final Node<K,V> node;  // currently, never detachedfinal Index<K,V> down;Index<K,V> right;Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {this.node = node;this.down = down;this.right = right;}}
上面中的node属性不存储实际数据,一般是指向Node节点
down属性:每个Index节点,必须有一个指针,指向其下一个Level对应的节点
right属性:Index也组成单向链表
整个ConcurrentSkipListMap就只需要记录顶层的head节点即可:
public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>implements ConcurrentNavigableMap<K,V>, Cloneable, Serializable {// ...private transient Index<K,V> head;// ...}

在这里插入图片描述

下⾯详细分析如何从跳查表上查找、插入和删除元素
put实现分析:
  public V put(K key, V value) {if (value == null)throw new NullPointerException();return doPut(key, value, false);}private V doPut(K key, V value, boolean onlyIfAbsent) {if (key == null)throw new NullPointerException();Comparator<? super K> cmp = comparator;for (;;) {Index<K,V> h; Node<K,V> b;VarHandle.acquireFence();int levels = 0;                    // number of levels descended//初始化if ((h = head) == null) {          // try to initializeNode<K,V> base = new Node<K,V>(null, null, null);h = new Index<K,V>(base, null, null);b = (HEAD.compareAndSet(this, null, h)) ? base : null;}else {for (Index<K,V> q = h, r, d;;) { // count while descendingwhile ((r = q.right) != null) {Node<K,V> p; K k;if ((p = r.node) == null || (k = p.key) == null ||p.val == null)RIGHT.compareAndSet(q, r, r.right);else if (cpr(cmp, key, k) > 0)q = r;elsebreak;}if ((d = q.down) != null) {++levels;q = d;}else {b = q.node;break;}}}if (b != null) {Node<K,V> z = null;              // new node, if insertedfor (;;) {                       // find insertion pointNode<K,V> n, p; K k; V v; int c;if ((n = b.next) == null) {if (b.key == null)       // if empty, type check key nowcpr(cmp, key, key);c = -1;}else if ((k = n.key) == null)break;                   // can't append; restartelse if ((v = n.val) == null) {unlinkNode(b, n);c = 1;}else if ((c = cpr(cmp, key, k)) > 0)b = n;else if (c == 0 &&(onlyIfAbsent || VAL.compareAndSet(n, v, value)))return v;if (c < 0 &&NEXT.compareAndSet(b, n,p = new Node<K,V>(key, value, n))) {z = p;break;}}if (z != null) {int lr = ThreadLocalRandom.nextSecondarySeed();if ((lr & 0x3) == 0) {       // add indices with 1/4 probint hr = ThreadLocalRandom.nextSecondarySeed();long rnd = ((long)hr << 32) | ((long)lr & 0xffffffffL);int skips = levels;      // levels to descend before addIndex<K,V> x = null;for (;;) {               // create at most 62 indicesx = new Index<K,V>(z, x, null);if (rnd >= 0L || --skips < 0)break;elsernd <<= 1;}if (addIndices(h, skips, x, cmp) && skips < 0 &&head == h) {         // try to add new levelIndex<K,V> hx = new Index<K,V>(z, x, null);Index<K,V> nh = new Index<K,V>(h.node, h, hx);HEAD.compareAndSet(this, h, nh);}if (z.val == null)       // deleted while adding indicesfindPredecessor(key, cmp); // clean}addCount(1L);return null;}}}}
在底层,节点按照从小到大的顺序排列,上⾯的index层间隔地串在一起,因为从小到大排列,查找的时候, 从顶层index开始,自左往右、自上往下,形成图示的遍历曲线,假设要查找的元素是32,遍历过程如下:
先遍历第2层Index,发现在21的后⾯
从21下降到第1层Index,从21往后遍历,发现在21和35之间
从21下降到底层,从21往后遍历,最终发现在29和35之间
在整个的查找过程中,范围不断缩小,最终定位到底层的两个元素之间,这样类似于二分法

在这里插入图片描述

关于上⾯的put(…)方法,有一个关键点需要说明:在通过findPredecessor找到了待插入的元素在[b,n]之间 之后,并不能⻢上插入,因为其他线程也在操作这个链表,b、n都有可能被删除,所以在插入之前执行了一系列的 检查逻辑,而这也正是无锁链表的复杂之处(前面说明的检查是否删除的意思,通常也包括了找机会删除),这里了解即可
remove(…)分析:
public V remove(Object key) {return doRemove(key, null);}// 若找到了(key, value)就删除,并返回value,找不到就返回nullfinal V doRemove(Object key, Object value) {if (key == null)throw new NullPointerException();Comparator<? super K> cmp = comparator;V result = null;Node<K,V> b;outer: while ((b = findPredecessor(key, cmp)) != null &&result == null) {for (;;) {Node<K,V> n; K k; V v; int c;if ((n = b.next) == null)break outer;else if ((k = n.key) == null)break;else if ((v = n.val) == null)unlinkNode(b, n);else if ((c = cpr(cmp, key, k)) > 0)b = n;else if (c < 0)break outer;else if (value != null && !value.equals(v))break outer;else if (VAL.compareAndSet(n, v, null)) {result = v;unlinkNode(b, n);break; // loop to clean up}}}if (result != null) {tryReduceLevel();addCount(-1L);}return result;}
上⾯的删除方法和插入方法的逻辑⾮常类似,因为无论是插入,还是删除,都要先找到元素的前驱,也就是定 位到元素所在的区间[b,n],在定位之后,执行下⾯⼏个步骤:
1:如果发现b、n已经被删除了,则执行对应的删除清理逻辑
2:否则,如果没有找到待删除的(k, v),返回null
3:如果找到了待删除的元素,也就是节点n,则把n的value置为null,同时在n的后⾯加上Marker节点,同 时检查是否需要降低Index的层次
get分析:
public V get(Object key) {return doGet(key);}private V doGet(Object key) {Index<K,V> q;VarHandle.acquireFence();if (key == null)throw new NullPointerException();Comparator<? super K> cmp = comparator;V result = null;if ((q = head) != null) {outer: for (Index<K,V> r, d;;) {while ((r = q.right) != null) {Node<K,V> p; K k; V v; int c;if ((p = r.node) == null || (k = p.key) == null ||(v = p.val) == null)RIGHT.compareAndSet(q, r, r.right);else if ((c = cpr(cmp, key, k)) > 0)q = r;else if (c == 0) {result = v;break outer;}elsebreak;}if ((d = q.down) != null)q = d;else {Node<K,V> b, n;if ((b = q.node) != null) {while ((n = b.next) != null) {V v; int c;K k = n.key;if ((v = n.val) == null || k == null ||(c = cpr(cmp, key, k)) > 0)b = n;else {if (c == 0)result = v;break;}}}break;}}}return result;}
无论是插入、删除,还是查找,都有相似的逻辑,都需要先定位到元素位置[b,n],然后判断b、n是否已经被 删除,如果是,则需要执行相应的删除清理逻辑,这也正是无锁链表复杂的地方
ConcurrentSkipListSet :
如下⾯代码所示,ConcurrentSkipListSet只是对ConcurrentSkipListMap的简单封装,此处不再进一步展开叙 述
public class ConcurrentSkipListSet<E>extends AbstractSet<E>implements NavigableSet<E>, Cloneable, java.io.Serializable {
// 封装了一个ConcurrentSkipListMapprivate final ConcurrentNavigableMap<E,Object> m;public ConcurrentSkipListSet() {m = new ConcurrentSkipListMap<E,Object>();}public boolean add(E e) {return m.putIfAbsent(e, Boolean.TRUE) == null;}// ...}
至此,我们大致说明完毕,当然,前面的并发容器中,我们只需要了解即可
我们说明完并发容器后,现在我们说明一些具体使用的类(同步工具类)
同步工具类 (通常也是JUC里面的说明):
Semaphore(中文意思:信号标,我们有时候称为信号量):
Semaphore也就是信号量,提供了资源数量的并发访问控制,其使用代码很简单,如下所示:
// 一开始有5份共享资源,第⼆个参数表示是否是公平
//true代表公平的,false代表不公平的,默认若不设置的话,就是不公平的
Semaphore myResources = new Semaphore(5, true);// ⼯作线程每获取一份资源,就在该对象上记下来
// 在获取的时候是按照公平的⽅式还是⾮公平的⽅式,就要看上一⾏代码的第⼆个参数了
// 一般⾮公平抢占效率较⾼
myResources.acquire(); //假设有10个线程,那么他们只能有5个线程获取资源(我们一般称为信标)
//上限为总资源// ⼯作线程每归还一份资源,就在该对象上记下来
// 此时资源可以被其他线程使⽤
myResources.release(); //只要有一个线程释放了信标,那么其他线程可以继续抢占该资源了,因为有记录(记下来)//对应线程到这里就会释放/*
释放指定数⽬的许可,并将它们归还给信标(释放)
可⽤许可数加上该指定数⽬
如果线程需要获取N个许可,在有N个许可可⽤之前,该线程阻塞
如果线程获取了N个许可,还有可⽤的许可,则依次将这些许可赋予等待获取许可的其他线程
*/
semaphore.release(2); //统一释放2个线程/*
从信标获取指定数⽬的许可,如果可⽤许可数⽬不够,则线程阻塞,直到被中断
该⽅法效果与循环相同
for (int i = 0; i < permits; i++) acquire(); //即对应许可数的线程
只不过该⽅法是原子操作
如果可⽤许可数不够,则当前线程阻塞,直到:(⼆选一)
1. 如果其他线程释放了许可,并且可⽤的许可数满⾜当前线程的请求数字(因为可以中途设置)
2. 其他线程中断了当前线程
permits:要获取的许可数
*/semaphore.acquire(3); //也就是上限
可能上面说明的有点,模糊,那么这里给出案例:
大学⽣到自习室抢座,写作业:
package main;import java.util.Random;
import java.util.concurrent.Semaphore;/****/
public class MyThread extends Thread {private final Semaphore semaphore;private final Random random = new Random();public MyThread(String name, Semaphore semaphore) {super(name); //设置名称,自然是需要父类的操作,因为我们继承只是操作重写的方法而已this.semaphore = semaphore;}@Overridepublic void run() {try {semaphore.acquire(); //抢占,没有设置上限,所以上限就是总资源,会操作加System.out.println(Thread.currentThread().getName() + " - 抢座成功,开始写作业");Thread.sleep(random.nextInt(1000));System.out.println(Thread.currentThread().getName() + " - 作业完成,腾出座位");} catch (InterruptedException e) {e.printStackTrace();}semaphore.release(); //释放,有就释放,会操作减//又加又减导致知道操作对应的上限}
}
package main;import java.util.concurrent.Semaphore;/****/
public class Demo {public static void main(String[] args) {Semaphore semaphore = new Semaphore(2);for (int i = 0; i < 5; i++) {new MyThread("学⽣-" + (i + 1), semaphore).start();}}
}
如下图所示,假设有n个线程来获取Semaphore里⾯的10份资源(假设n > 10),n个线程中只有10个线程能获取 到,其他线程都会阻塞,直到有线程释放了资源,其他线程才能获取到

在这里插入图片描述

与同步有点类似,只不过这里是直接的抢资源,且可以多个人一起抢,而不是同步的一个人,所以在前面我们说明信号量时,说过:一种用于控制对一个或多个单位资源进行访问的机制(有多个的存在,而不是监视器的一个共享资源)
这就是信号量与监视器的主要区别
当初始的资源个数为1的时候,Semaphore退化为排他锁(排它锁是任意时刻只能有且只有一个线程持有,其它获取不到排它锁的线程要么自旋等待要么阻塞等待被唤醒,synchronized就是典型的排它锁,我们有时候也会称为互斥锁,因为互斥就是该思想,共享锁是一种可以同时被多个线程持有的锁,持有共享锁的线程之间不会相互竞争和阻塞,资源个数大于1时,就可以认为是共享锁),正因为如此,Semaphore的实现原理和锁⼗分类似,也可以认为是基于AQS,有公平和⾮公平之分,Semaphore相关类的继承体系如下图所示:

在这里插入图片描述

有颜色(黑色)的箭头代表是其(指向的)内部类,没有颜色的(白色)的箭头代表是其(指向的)子类
public class Semaphore implements java.io.Serializable {//..
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}public void release() {sync.releaseShared(1);
}//除了AbstractQueuedSynchronizer是一个我们继承的类外,其他的都是Semaphore的内部类,看源码就知道了
由于Semaphore和锁的实现原理基本相同,上⾯的代码不再展开解释,资源总数即state的初始值,在acquire里对state变量进行CAS减操作,减到0之后,线程阻塞,在release里对state变量进行CAS加操作,一般来说synchronized是使用JVM来实现的,而其他的锁基本都是使用CAS来实现的(通常包括Lock,即一般是ReentrantLock),CAS一般在JVM的基础上实现或者CPU基础上(本质上都是这个)实现的,即CAS是利用系统的阻塞来实现原子操作,而synchronized是利用JVM的阻塞来实现原子操作(最终操作系统的阻塞),很明显,CAS比较底层,那么自然效率大,因为他并没有像synchronized一样的阻塞导致系统阻塞(因为这里中间的"导致"自然是需要时间的),因为最终的阻塞是系统的阻塞也就是线程的阻塞,而synchronized比较上层,那么需要中间的操作多,而不是我CAS直接的阻塞,这里解释之前的"没有直接的使用锁,在后面会具体说明的"
public abstract class AbstractQueuedSynchronizerextends AbstractOwnableSynchronizer implements java.io.Serializable {// ...//前面的 sync.acquireSharedInterruptibly(1);会到这里(抢占):
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg); //到这里考虑是否阻塞,通常需要考虑抢占或者阻塞,一般是直接的阻塞,当被唤醒时,他会使得继续操作当前方法,那么可能也会被阻塞或者不阻塞了(不阻塞自然直接的操作对应run后面的代码,一般就是后面的代码,只是通常是run对应后面的而已,使得认为抢占成功了,虽然是的确抢占成功的)}//前面的 sync.releaseShared(1);会到这里(释放):public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared(); //到这里考虑释放,通常直接的释放,一般释放会使得都进行唤醒,因为都进行抢占的,虽然对应可能抢占会失败(即没有抢到)return true;}return false;}// ...}public class Semaphore implements java.io.Serializable {//..abstract static class Sync extends AbstractQueuedSynchronizer {//tryReleaseShared(arg),我们会使用最子类的版本,释放相关protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState(); //一般都是AbstractQueuedSynchronizer的方法,并且初始化时,也是操作这个变量,自己看源码就知道了,后面基本都是如此int next = current + releases; //加if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next)) //CAS操作的操作,在后面有关于该操作,基本都是CAS的,实际上是同一个操作,通常是将对应的current(实际上会是state,即这里不是主要的,一般是对应方法的注解的作用,后面的VarHandle的对应给出的方法上面的注解,使得可以找到被赋值的东西(如变量),或者通过自身来的某些操作找到对应变量的修改记录,那么因为这样使得可以修改具体变量,比如state,或者说对应的VarHandle保存了记录(一般之后需要直接操作,因为记录是有限的,且需要确定是哪个记录位置,但是,一般我们也说,他可能是自动的与对应固定的变量进行比较,比如state,具体固定变量可能与类本身的一些操作有关,因为注解的原因,所以可能只是对该类固定,其他类中可能是改变的,我们一般是倾向于后面的固定变量的说明(因为这样效率高,不用保留对应的记录了),具体原理可以百度查看,这里只是我的理解,可以忽略),而后面的Unsafe没有,所以需要进行查找,然后找到后比较,那么他一般需要偏移量(也可以认为是地址,只是我们一般说成偏移量而已)这里Unsafe在后面才会学习,即现在了解即可)设置成next,这样就认为加了,最好使得先将阻塞放在前面,因为由于是设置state,所以如果先操作释放,那么必然是可以都进行抢占的而没有阻塞的情况出现,使得他们都操作了打印,然后一起操作释放,可以在semaphore.acquire();前面加上semaphore.release();就知道了return true;//实际上无论是前面的学习还是后面的学习,都会使用到他的底层,也就是如下:/*private static final VarHandle STATE;protected final boolean compareAndSetState(int expect, int update) {return STATE.compareAndSet(this, expect, update);}大多数情况下,具体的操作他执行的,所以主要是操作expect, update变量进行设置一般来说,只要设置成功,或者达到参数的减少数量(或者到0),那么一般就会返回true(所以上面通常操作释放,前提是该线程没有阻塞了),否则可能返回false,当然,具体是由于注解的存在(或者说是因为VarHandle类),以及他本身类的作用,可能会使得不同(因为对应注解可能使用本身类存在的某些方法或者变量,即东西,但大多数都是上面的说法),并且可能会改变固定的操作变量的对比(比如上面的state,这只是我的猜测,因为对应的注解并不知道是干什么的,并且其对应最终操作的方法也是native的,所以你可以选择忽略)*/}}
}//..static final class FairSync extends Sync { //父类指向子类,所以对应的sync.acquireSharedInterruptibly(1);或者sync.releaseShared(1);的sync可能是这个类的对象(这里一般是其(对象)父类,因为操作公平类和不公平类的,看Semaphore的构造方法就知道了,这里一般是AbstractQueuedSynchronizer的操作)来调用的,或者说最子类来操作,那么操作的tryAcquireShared(arg)或者tryReleaseShared(arg)可能也是他里面的操作(这里特别:是从sync类型开始说明的最子类,否则一般是对象最大的父类开始说明)或者其(sync)父类操作(这两个一般不是sync的父类AbstractQueuedSynchronizer的操作,后面会给出代码的),当然也可以说对应的最子类(最子类:找到最小的子类或者可能就是本身类型,从对象的最大的父类开始找(说明)的,即持续找子类,直到找到没有其子类有对应的方法的地方,那么该子类就是最子类,比如a继承b,b继承c,c继承d,假设我们调用a的对象,如果c有对应的方法,且b没有,那么就直接使用c的,即c就是最子类,从最大父类d开始找(说明)的)的版本,当然,对象调用方法时,对应的引用类型的类前提是有(通常是父类或者本身,一般父类才是这样的说明),否则编译期会报错(虽然这里也的确有,一般来说,上面的后面两个在AbstractQueuedSynchronizer的操作是抛出异常)
// ...FairSync(int permits) {super(permits);}//tryAcquireShared(arg),抢占相关protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors()) //主要的CAS操作,是操作阻塞的前提return -1; //没有抢到int available = getState();int remaining = available - acquires; //减if (remaining < 0 || compareAndSetState(available, remaining)) //CAS操作的操作,因为remaining < 0发存在,所以使得没有设置负数的情况(因为||有一个true时,后面的不操作)return remaining;}}}//..}//他们都操作了compareAndSetState方法,而该方法内部操作在如下:
package java.lang.invoke;public abstract class VarHandle {// ...// CAS,原子操作,其他语言的操作(native),一般主要是用来在Java程序中调用c/c++的代码,也有可能是其他语言的代码public final native@MethodHandle.PolymorphicSignature@HotSpotIntrinsicCandidateboolean compareAndSet(Object... args);// ...}
CountDownLatch :
CountDownLatch使用场景 :
假设一个主线程要等待5个 Worker 线程(工人线程,一般代表我们自己操作的线程,认为就是对应的线程,必然下面代码中的new MyThread(“线程1”, latch).start();就是一个Worker线程)执行完才能退出,可以使用CountDownLatch来实现:
具体线程代码:
package main1;import java.util.Random;
import java.util.concurrent.CountDownLatch;/****/
public class MyThread extends Thread {private final CountDownLatch latch;private final Random random = new Random();public MyThread(String name, CountDownLatch latch) {super(name);this.latch = latch;}@Overridepublic void run() {try {Thread.sleep(random.nextInt(2000));} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + "运行结束");latch.countDown(); //认为对应的线程结束了,但是只是相对来说减少1}
}
package main1;import java.util.concurrent.CountDownLatch;/****/
public class Main {public static void main(String[] args) throws InterruptedException {CountDownLatch latch = new CountDownLatch(5); //代表要操作对应的5个线程,上限5个new MyThread("线程1", latch).start();new MyThread("线程2", latch).start();new MyThread("线程3", latch).start();new MyThread("线程4", latch).start();
// new MyThread("线程5", latch).start();// 当前线程等待latch.await(); //等待被减少5个,对应的由于每次只是减少1,所以我们需要操作5个线程,否则即上面的注释若不解除,那么这里就不会打印"程序运行结束"System.out.println("程序运行结束");}
}
下图为CountDownLatch相关类的继承层次,CountDownLatch原理和Semaphore原理类似,同样是基于AQS,不过没有公平和⾮公平之分

在这里插入图片描述

await()实现分析:
如下所示,await()调用的是AQS 的模板方法,这个方法在前⾯已经介绍过(看下面代码就知道了),CountDownLatch.Sync重新实现 了tryAccuqireShared方法:
public void await() throws InterruptedException {// AQS的模板方法,与前面的Semaphore的对应的操作基本类似sync.acquireSharedInterruptibly(1);
}//前面的public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 被CountDownLatch.Sync实现if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}//对应的CountDownLatch的Sync的该方法,虽然前面的Semaphore也有Sync,但他们是不同的protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}
从tryAcquireShared(…)方法的实现来看,只要state != 0,调用await()方法的线程便会被放入AQS的阻塞队列,进入阻塞状态,因为对应的还没有释放掉,就会始终阻塞
countDown()实现分析:
public void countDown() {sync.releaseShared(1); //与前面的Semaphore也是一样的
}// AQS的模板方法
public final boolean releaseShared(int arg) {// 由CountDownLatch.Sync实现if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}//同样也是自身的对应的CountDownLatch的Sync的该方法protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0) //到0就不会继续减了,而是直接的返回false,那么结果就不进行任何的操作(因为false是不操作 doReleaseShared();的,很明显,他到0的过程必然会导致经过doReleaseShared方法,即经过true或者有true的出现),所以在解除阻塞后,如果继续操作对应的方法是什么都不做事情的return false;int nextc = c - 1;if (compareAndSetState(c, nextc))return nextc == 0;}}}
countDown()调用的AQS的模板方法releaseShared(),里⾯的tryReleaseShared(…)由CountDownLatch.Sync实现,从上⾯的代码可以看出,只有state=0(nextc),tryReleaseShared(…)才会返回true,然后执行doReleaseShared(…),一次性唤醒队列中所有阻塞的线程(对应的Semaphore也是如此),因为阻塞只有一个,那么唤醒一次就行,而不是唤醒很多次,前面的Semaphore因为抢占的原因,所以会唤醒很多次(但是每一次的唤醒也是唤醒全部,只是会继续判断抢占而已,而这里没有)
总结:由于是基于AQS阻塞队列来实现的,所以可以让多个线程都阻塞在state=0条件上,通过countDown()一直减state,减到0后一次性唤醒所有线程(注意如果多次的执行该方法,那么可以一个线程就能唤醒,你可以试着在一个线程里面执行很多次该方法就知道了),如下图所示,假设初始总数为M,N个线程await(),M个线程countDown(),减到0之后,N个线程被唤醒(一般只是一个主线程,而之所以说是全部,是因为与Semaphore一样,Semaphore是唤醒多个线程来进行抢占,虽然可能抢占不到,使得什么都没有做或者又使得阻塞中,而这里也同样如此,可以自己另外操作一个线程来进行设置阻塞就知道了,对应的也会进行唤醒,所以是全部唤醒)

在这里插入图片描述

注意:这里的对应的唤醒,可以认为是相关的操作被判断重新执行(认为循环判断的),使得重新执行后没有阻塞了,导致认为唤醒的(一般我们认为他是直接的往后面执行,而没有重新执行对应的方法,对应的方法只是第一次的操作而已,之后基本认为是继续操作相同的操作了,所以我们也会认为是重新执行,一般的唤醒基本都这样的认为,前提是没有操作锁的,因为锁可以直接的使用唤醒方法,而这里却需要再次的重新执行或者说判断,因为没有锁这样的直接的唤醒方法,所以需要我们自己进行操作,实际上唤醒方法与重新执行或者说判断是基本类似的,他们唤醒后,自然会操作对应阻塞代码后面的了,通常是run方法对应的后面的代码,或者其他对应后面的代码,反正是操作后面的代码),一般认为是唤醒所有线程的阻塞,只有少数的只是唤醒一个的,比如notify就是随机唤醒一个,即可以认为他们基本都是notifyAll
前面的Semaphore释放是操作加(使得会不阻塞),CountDownLatch释放是操作减(使得会不阻塞),当然,他们是不同的类,自然操作不同
CyclicBarrier :
CyclicBarrier使用场景 :
CyclicBarrier使用方式比较简单:
CyclicBarrier barrier = new CyclicBarrier(5);barrier.await();
该类用于协调多个线程同步执行操作的场合
使用场景:10个工程师一起来公司应聘,招聘方式分为笔试和⾯试,⾸先,要等人到⻬后,开始笔试,笔试结 束之后,再一起参加⾯试,把10个人看作10个线程,10个线程之间的同步过程如下图所示:

在这里插入图片描述

具体案例代码:
package main2;import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;/****/
public class MyThread extends Thread {private final CyclicBarrier barrier;private final Random random = new Random();public MyThread(String name, CyclicBarrier barrier) {super(name);this.barrier = barrier;}@Overridepublic void run() {try {Thread.sleep(random.nextInt(2000));System.out.println(Thread.currentThread().getName() + " - 已经到达公司");barrier.await(); //只有对应上限的线程到这里,这个地方才会操作释放,否则一直阻塞,你可以试着将后面的循环中的5变成4就知道了Thread.sleep(random.nextInt(2000));System.out.println(Thread.currentThread().getName() + " - 已经笔试结束");barrier.await(); //可以操作继续阻塞,这里与前面的Semaphore是主要的区别,因为Semaphore解除阻塞后,他对应的值始终为0了,即不会再次的阻塞了,而对应的释放并没有操作加,只是判断而已,所以一般我们只会使用Semaphore来操作主线程的阻塞,即前面说过的"假设一个主线程要等待5个 Worker 线程"Thread.sleep(random.nextInt(2000));System.out.println(Thread.currentThread().getName() + " - 已经⾯试结束");} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}
//        super.run();}
}
package main2;import java.util.concurrent.CyclicBarrier;/****/
public class Main {public static void main(String[] args) {CyclicBarrier barrier = new CyclicBarrier(5);  //这里就操作5个了(不操作10个了)for (int i = 0; i < 5; i++) { //这里就操作5个了new MyThread("线程-" + (i + 1), barrier).start();}}
}
在整个过程中,有2个同步点:第1个同步点,要等所有应聘者都到达公司,再一起开始笔试,第2个同步点, 要等所有应聘者都结束笔试,之后一起进入⾯试环节
CyclicBarrier实现原理 :
CyclicBarrier基于ReentrantLock+Condition实现
public class CyclicBarrier {private final ReentrantLock lock = new ReentrantLock();// 用于线程之间相互唤醒private final Condition trip = lock.newCondition();// 线程总数private final int parties;private int count;private Generation generation = new Generation();// ...}
下⾯详细介绍 CyclicBarrier 的实现原理,先看构造方法:
public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();// 参与方数量this.parties = parties; //我们设置的5就是这个this.count = parties;// 当所有线程被唤醒时,执行barrierCommand表示的Runnablethis.barrierCommand = barrierAction;/*这里的测试,只需要将前面的main方法的对应修改如下即可:CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {@Overridepublic void run() {System.out.println("该阶段结束");}});//也就是说,对应的唤醒后,他就会执行,并且要知道,只有他执行完毕,对应的才能真正的进行解除阻塞(唤醒),在后面的源码中可以知道的*/
}public CyclicBarrier(int parties) {this(parties, null);}
接下来看一下await()方法的实现过程:
public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}
}private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock;lock.lock();try {final Generation g = generation;if (g.broken)throw new BrokenBarrierException();//响应中断if (Thread.interrupted()) {//唤醒所有阻塞的线程breakBarrier(); //后面会给出这个代码的内容throw new InterruptedException();}// 每个线程调用一次await(),count都要减1,其中在构造方法中,我们就操作了this.count = parties;int index = --count;// 当count减到0的时候,此线程唤醒其他所有线程,因为阻塞会释放锁,导致可以使得单个线程操作到if (index == 0) {  // trippedboolean ranAction = false;try {final Runnable command = barrierCommand;if (command != null)command.run(); //这里就操作了执行,很明显,在唤醒方法之前操作的,所以我们也说在解除阻塞之前操作ranAction = true;nextGeneration(); //后面会给出这个代码的内容return 0;} finally {if (!ranAction)breakBarrier();}}// loop until tripped, broken, interrupted, or timed outfor (;;) {try {if (!timed)trip.await(); //其他情况操作真正的阻塞,在对应的类里面是:private final Condition trip = lock.newCondition();else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {// We're about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// "belong" to subsequent execution.Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();if (g != generation) //一般来说上面的trip.await();或者nanos = trip.awaitNanos(nanos);都会使得generation发生改变,所以一般直接的返回了return index;if (timed && nanos <= 0L) { //一般代表是否计时,以及计时是否完成,通常这里代表错误的出现,一般我们不会操作到这个(一般计时我们也不会操作到,因为上面基本直接的返回,所以会发现,设置了10000,并没有什么作用,可能在某些情况下,会操作到的,但好像并没有什么计时的操作判断,而只有错误的判断,你可以试着将5变成4,也会发现是始终阻塞的,当然错误的出现一般是时间太短导致可能会出现问题,比如设置1,那么一般会报错,即就会到这里,时间太少了,导致出现问题了,一般有界限的,防止出现负数),所以不考虑,具体实现可以考虑barrier.await(10000, TimeUnit.SECONDS);,第二个参数代表单位,这里是秒breakBarrier();throw new TimeoutException();}//一般情况下,被唤醒后,直接什么都不操作,使得操作对应后面里面的内容,即往后面执行}} finally {lock.unlock();}}//我们来看看唤醒所有线程的代码:
private void breakBarrier() {generation.broken = true;count = parties; //在构造方法中,我们操作了this.parties = parties;,所以保存了对应的上限,所以导致可以使得继续进行操作,即barrier.await(); 可以操作继续阻塞trip.signalAll(); //我们说lock的signal是唤醒对应的lock的阻塞,那么signalAll就是唤醒所有该lock的阻塞//注意是该lock,因为我们也说过,当同一个lock被多个线程操作时,单纯的signal与notify一样是随机的唤醒,而signalAll就与notifyAll一样是都唤醒了,因为阻塞是会导致释放锁的(前提是可以释放,这里就可以),自然可以使得多个线程来阻塞,所以我们需要signalAll
}//也的确操作都唤醒
private void nextGeneration() {// signal completion of last generationtrip.signalAll();// set up next generationcount = parties;generation = new Generation();
}
关于上⾯的方法,有⼏点说明:
1:CyclicBarrier是可以被重用的,以前面的应聘场景为例,来了10个线程,这10个线程互相等待,到⻬ 后一起被唤醒,各自执行接下来的逻辑,然后,这10个线程继续互相等待,到⻬后再一起被唤醒,每一 轮被称为一个Generation(没有中断或者计时错误,虽然计时基本不会操作到),就是一次同步点,所以正常没有中断使得唤醒时,是操作generation = new Generation();的,否则设置对应的为true(中断,后面会说明),那么会使得抛出异常,而抛出异常后,自然不会操作后面的内容,所以对应的线程就不会继续执行了,即都基本打印后程序结束,但是他会使得原来已经阻塞的先唤醒,当再次的操作到阻塞时,才会使得打印异常,然后结束,即他会使得先做好当前的事情,然后结束
2:CyclicBarrier 会响应中断,10 个线程没有到⻬,如果有线程收到了中断信号,所有阻塞的线程也会被唤 醒,就是上⾯的breakBarrier()方法,然后count被重置为初始值(parties),重新开始
3:上⾯的回调方法,barrierAction只会被第10个线程执行1次(在唤醒其他9个线程之前),而不是10个线 程每个都执行1次,因为对应的中断只是指定一个线程来进行操作的,所以只有对应的线程会使得操作,即执行一次
当然,中断后可能还有线程在阻塞中,因为该线程可能在"判断中断之后"和"准备阻塞之前"的这个时间段,导致他在唤醒后才阻塞,所以也会有线程在阻塞的,你可以多次的执行几次就知道了
实际上我们也可以手动的编写一个类似的,只需要也自己定义上限即可,我们自然也能编写出来,当然,其他的说明的类我们也能编写(因为他们也是编写出来的),但这里相对简单点,但是可能你并不会CAS操作,实际上你可以参照77章博客中的乐观锁操作即可或者百度找案例,这里就不多说了,因为一般CAS操作通常可能是C或者C++编写的,所以不做说明
这里我需要提一下可重入锁,代表如果多次的操作加锁,那么他只是重新获得锁而已(或者说不进行改变),当然,这个获得在其他线程来看,仍然是被阻塞的,这里要注意,也就是说他的获得(不进行改变)期间也是锁(可以认为他在获得的时候,还是锁住的,更加简单的说明就是,对应的锁的锁住标志(类似于前面说明的"标志位(state变量)“)没有变成0,或者说加了1或者加了几个数,即还是锁住的,即可以这样的理解,若对应的线程ID与记录的线程ID(类似于前面说明的"thread ID(简称线程ID)”)一样,那么锁的标志加1(一次重入)或者加几次(前提是多次的重入),因为线程ID是创建此线程时生成的正整数,且线程ID是唯一的,那么基本不能改变),所以对应的释放锁,虽然多次的获得锁,但是仍然还是一样的只需要释放即可(因为还是只有一人得到,因为标志加几次),但是这个释放需要多次的进行,也就是说,需要将对应的次数标志变成0,也就可以说,每次的释放只是减1而已(当然,这个释放由于只是减1,那么自然可以放在一起,一般来说当变成0了,他就不会继续减了,或者如果继续减,那么会报错,通常来说对于lock或者synchronized是报错,而其他的,比如CountDownLatch就不会继续减,一般直接的返回,即返回0,这里的介绍,在后面的Lock与Condition会再次具体说明其他细节的),你可以试着将对应的lock多次的加锁,或者递归当前方法就知道了,所以很明显,经过我的测试,对应的lock(一般是ReentrantLock)以及synchronized是可重入锁
Exchanger :
使用场景:
Exchanger用于线程之间交换数据,其使用代码很简单,是一个exchange(…)方法,使用示例如下:
package main3;import java.util.Random;
import java.util.concurrent.Exchanger;/****/
public class Main {private static final Random random = new Random();public static void main(String[] args) {// 建一个多线程共用的exchange对象// 把exchange对象传给3个线程对象,每个线程在自己的run方法中调用exchange,把自己的数据作为参数// 传递进去,返回值是另外一个线程调用exchange传进去的参数,所以对应的线程可以得到其他线程传递的数据,所以我们说Exchanger用于线程之间交换数据Exchanger<String> exchanger = new Exchanger<>();new Thread("线程1") {@Overridepublic void run() {while (true) {try {// 如果没有其他线程调用exchange,线程阻塞,直到有其他线程调用exchange为⽌,并且得到对应第一个线程调用的他传递的参数,这样说你可能不是很明白,自己运行就知道了String otherData = exchanger.exchange("交换数据1");System.out.println(Thread.currentThread().getName() + "得到 <==" + otherData);Thread.sleep(random.nextInt(2000));} catch (InterruptedException e) {e.printStackTrace();}}}}.start();new Thread("线程2") {@Overridepublic void run() {while (true) {try {String otherData = exchanger.exchange("交换数据2");System.out.println(Thread.currentThread().getName() + "得到 <==" + otherData);Thread.sleep(random.nextInt(2000));} catch (InterruptedException e) {e.printStackTrace();}}}}.start();new Thread("线程3") {@Overridepublic void run() {while (true) {try {String otherData = exchanger.exchange("交换数据3");System.out.println(Thread.currentThread().getName() + "得到 <==" + otherData);Thread.sleep(random.nextInt(2000));} catch (InterruptedException e) {e.printStackTrace();}}}}.start();}
}
在上⾯的例子中,3个线程并发地调用exchange(…),会两两交互数据,如:
线程2得到:交换数据1
线程1得到:交换数据2
线程1得到:交换数据3
实现原理 :
Exchanger的核⼼机制和Lock一样,也是CAS+park/unpark
⾸先,在Exchanger内部,有两个内部类:Participant和Node,代码如下:
public class Exchanger<V> {// ...public Exchanger() {participant = new Participant();}//..// 添加了Contended注解,表示伪共享与缓存行填充,可以认为设置对应的参数值@jdk.internal.vm.annotation.Contended static final class Node {int index; // Arena indexint bound; // Last recorded value of Exchanger.boundint collides; // 本次绑定中,CAS操作失败次数int hash; // 自旋伪随机//下面三个重要的字段(变量)Object item; // 本线程要交换的数据volatile Object match; // 对方线程交换来的数据// 当前线程volatile Thread parked; // 当前线程阻塞的时候设置该属性,不阻塞为null}static final class Participant extends ThreadLocal<Node> {public Node initialValue() { return new Node(); } //对应的初始化方法}// ...
private final Participant participant;//..
}
每个线程在调用exchange(…)方法交换数据的时候,会先创建一个Node对象
这个Node对象就是对该线程的包装,里⾯包含了3个重要字段:第一个是该线程要交互的数据,第⼆个是对方 线程交换来的数据,最后一个是该线程自身
一个Node只能⽀持2个线程之间交换数据,要实现多个线程并行地交换数据,需要多个Node,因此在Exchanger里⾯定义了Node数组,所以可以认为Node就是线程
private volatile Node[] arena;
exchange(V x)实现分析 :
明⽩了大致思路,下⾯来看exchange(V x)方法的详细实现:
 public V exchange(V x) throws InterruptedException {Object v;Node[] a;Object item = (x == null) ? NULL_ITEM : x; // translate null args//private volatile Node[] arena;if (((a = arena) != null ||(v = slotExchange(item, false, 0L)) == null) &&((Thread.interrupted() || // disambiguates null return(v = arenaExchange(item, false, 0L)) == null)))throw new InterruptedException();return (v == NULL_ITEM) ? null : (V)v;}
//如果||和&&没有括号的话,那么相同的就是从左到右判断,不同的那么一般&&优先(前提是对应先得到结果,得到结果是从左到右,然后&&优先操作结果),如果有括号,那么先操作括号里面的
/*
案例:if(true||true&&false){System.out.println(1);}由于是&&先操作,所以打印1,如果是||先操作,那么应该不会打印1,但是经过测试,是打印1的,所以&&先操作*/
上⾯方法中,如果arena不是null,表示准备启用了arena方式交换数据(一般代表可以操作arenaExchange方法,有时候也可以认为是后面的return,虽然一般都是v本身,因为返回对应交换的数据的),如果arena不是null,并且线程被中断, 则抛异常(因为双方都是true,那么对于&&来说是true),而线程中断就从false变成true了,即满足条件了,即抛出异常
如果arena不是null,并且arenaExchange的返回值为null(一般是false的,那么如果这样就也变成true了,与中断一样的),则抛异常,一般来说对方线程交换来的null值是封装为NULL_ITEM对象的,而不是null(Object item = (x == null) ? NULL_ITEM : x;),所以对应得到的返回值是该对象的打印信息,虽然可能判断单纯的NULL_ITEM打印出来的也是null(private static final Object NULL_ITEM = new Object();),之所以说判断,是因为直接的打印是地址的
如果slotExchange的返回值是null(与arena不是null一样,因为代表true),并且线程被中断,那么自然,则抛异常
如果slotExchange的返回值是null,并且areaExchange的返回值是null,则抛异常
无论怎么看,只有两边都有true,自然抛出异常,而一个不会,因为&&的存在
slotExchange的实现:
/*** 如果不启用arenas(即对应为false,即arenas是null,那么他就会进行执行,这是对于||来说的),则使用该方法进行线程间数据交换** @param item 需要交换的数据* @param timed 是否是计时等待,true表示是计时等待,而false表示不(是)计时等待* 比如前面的slotExchange(item, false, 0L)中就是false* @param ns 如果是计时等待,该值表示最大等待的时长,这个值要操作需要是true,否则无论你怎么设置他不会操作,对于相关的该类似的说明,一般也是如此,比如前面的dowait(false, 0L);,若是false,那么后面无论怎么设置也基本不会操作,只是为了方便或者合理,我们都会设置位0L* @return 对方线程交换来的数据;如果等待超时或线程中断,或者准备启用了arena,则返回null*/private final Object slotExchange(Object item, boolean timed, long ns) {// participant在初始化的时候设置初始值为new Node()// 获取本线程要交换的数据节点Node p = participant.get(); //看后面方法就知道了// 获取当前线程Thread t = Thread.currentThread();// 如果线程被中断,则返回null,也就使得在判断中变成true,自然也会使得对应的&&后面也是true(因为中断),即会抛出异常if (t.isInterrupted()) // preserve interrupt status so caller can recheckreturn null;for (Node q;;) {// 如果slot⾮空,表明有其他线程在等待该线程交换数据//private volatile Node slot;,一般代表先交换的线程,或者说先操作或者抢占的线程if ((q = slot) != null) {// CAS操作,将当前线程的slot由slot设置为null// 如果操作成功,则执行if中的语句if (SLOT.compareAndSet(this, q, null)) {// 获取对方线程交换来的数据Object v = q.item; //注解导致item值是设置的参数// 设置要交换的数据q.match = item;// 获取q中阻塞的线程对象Thread w = q.parked;if (w != null)// 如果对方阻塞的线程⾮空,则唤醒阻塞的线程LockSupport.unpark(w);return v;}// create arena on contention, but continue until slot null// 创建arena用于处理多个线程需要交换数据的场合,防⽌slot冲突if (NCPU > 1 && bound == 0 &&BOUND.compareAndSet(this, 0, SEQ))arena = new Node[(FULL + 2) << ASHIFT];}// 如果arena不是null,需要调用者调用arenaExchange方法接着获取对方线程交换来的数据else if (arena != null)return null; // caller must reroute to arenaExchangeelse {// 如果slot为null,表示对方没有线程等待该线程交换数据// 设置要交换的本方数据p.item = item;// 设置当前线程要交换的数据到slot// CAS操作,如果设置失败,则进入下一轮for循环if (SLOT.compareAndSet(this, null, p))break;p.item = null;}}// await release// 没有对方线程等待交换数据,将当前线程要交换的数据放到slot中,是一个Node对象// 然后阻塞,等待唤醒int h = p.hash;// 如果是计时等待交换,则计算超时时间;否则设置为0long end = timed ? System.nanoTime() + ns : 0L;// 如果CPU核⼼数大于1,则使用SPINS数,自旋;否则为1,没必要自旋int spins = (NCPU > 1) ? SPINS : 1;// 记录对方线程交换来的数据Object v;// 如果p.match==null,表示还没有线程交换来数据while ((v = p.match) == null) {// 如果自旋次数大于0,计算hash随机数if (spins > 0) {// ⽣成随机数,用于自旋次数控制h ^= h << 1; h ^= h >>> 3; h ^= h << 10;if (h == 0)h = SPINS | (int)t.getId();else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)Thread.yield();}// p是ThreadLocal记录的当前线程的Node// 如果slot不是p表示slot是别的线程放进去的else if (slot != p)spins = SPINS;else if (!t.isInterrupted() && arena == null &&(!timed || (ns = end - System.nanoTime()) > 0L)) {p.parked = t;if (slot == p) {if (ns == 0L)// 阻塞当前线程LockSupport.park(this);else// 如果是计时等待,则阻塞当前线程指定时间LockSupport.parkNanos(this, ns);}p.parked = null;}else if (SLOT.compareAndSet(this, p, null)) {// 没有被中断但是超时了,返回TIMED_OUT,否则返回nullv = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;break;}}// match设置为null值 CASMATCH.setRelease(p, null);p.item = null;p.hash = h;// 返回获取的对方线程交换来的数据return v;}//对应的初始化public T get() {Thread t = Thread.currentThread();ThreadLocalMap map = getMap(t);if (map != null) {ThreadLocalMap.Entry e = map.getEntry(this);if (e != null) {@SuppressWarnings("unchecked")T result = (T)e.value;return result;}}return setInitialValue();}private T setInitialValue() {T value = initialValue(); //调用对应调用者的initialValue方法,由于对应的调用者是Participant对象,所以执行对应的方法,也就使得操作这个:public Node initialValue() { return new Node(); },即初始化Thread t = Thread.currentThread();ThreadLocalMap map = getMap(t);if (map != null) {map.set(this, value);} else {createMap(t, value);}if (this instanceof TerminatingThreadLocal) {TerminatingThreadLocal.register((TerminatingThreadLocal<?>) this);}return value;}
arenaExchange的实现:
/*** 当启用arenas的时候(对应不是false,而是true,自然对于&&来说,自然会考虑这里),使用该方法进行线程间的数据交换** @param item 本线程要交换的⾮null数据* @param timed 如果需要计时等待,则设置为true* @param ns 表示计时等待的最大时长* @return 对方线程交换来的数据。如果线程被中断,或者等待超时,则返回null*/
private final Object arenaExchange(Object item, boolean timed, long ns) {Node[] a = arena;int alen = a.length;Node p = participant.get();// 访问下标为i处的slot数据,进行选择,而不是操作先操作的for (int i = p.index;;) {                      // access slot at iint b, m, c;int j = (i << ASHIFT) + ((1 << ASHIFT) - 1);if (j < 0 || j >= alen)j = alen - 1;// 取出arena数组的第j个Node元素Node q = (Node)AA.getAcquire(a, j);// 如果q不是null,则将数组的第j个元素由q设置为nullif (q != null && AA.compareAndSet(a, j, q, null)) {// 获取对方线程交换来的数据Object v = q.item;                     // release// 设置本方线程交换的数据q.match = item;// 获取对方线程对象Thread w = q.parked;if (w != null)// 如果对方线程⾮空,则唤醒对方线程LockSupport.unpark(w);return v;}// 如果自旋次数没达到边界,且q为nullelse if (i <= (m = (b = bound) & MMASK) && q == null) {// 提供本方数据p.item = item;                         // offer// 将arena的第j个元素由null设置为pif (AA.compareAndSet(a, j, null, p)) {long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;Thread t = Thread.currentThread(); // wait// 自旋等待for (int h = p.hash, spins = SPINS;;) {// 获取对方交换来的数据Object v = p.match;// 如果对方交换来的数据⾮空if (v != null) {// 将p设置为null,CAS操作MATCH.setRelease(p, null);// 清空p.item = null;             // clear for next usep.hash = h;// 返回交换来的数据return v;}// 产⽣随机数,用于限制自旋次数else if (spins > 0) {h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshiftif (h == 0)                // initialize hashh = SPINS | (int)t.getId();else if (h < 0 &&          // approx 50% true(--spins & ((SPINS >>> 1) - 1)) == 0)Thread.yield();        // two yields per wait}// 如果arena的第j个元素不是pelse if (AA.getAcquire(a, j) != p)spins = SPINS;       // releaser hasn't set match yetelse if (!t.isInterrupted() && m == 0 &&(!timed ||(ns = end - System.nanoTime()) > 0L)) {p.parked = t;              // minimize windowif (AA.getAcquire(a, j) == p) {if (ns == 0L)// 当前线程阻塞,等待交换数据LockSupport.park(this);elseLockSupport.parkNanos(this, ns);}p.parked = null;}// arena的第j个元素是p并且CAS设置arena的第j个元素由p设置为null成功else if (AA.getAcquire(a, j) == p &&AA.compareAndSet(a, j, p, null)) {if (m != 0)                // try to shrinkBOUND.compareAndSet(this, b, b + SEQ - 1);p.item = null;p.hash = h;i = p.index >>>= 1;        // descend// 如果线程被中断,则返回null值if (Thread.interrupted())return null;if (timed && m == 0 && ns <= 0L)// 如果超时,返回TIMED_OUTreturn TIMED_OUT;break;                     // expired; restart}}}elsep.item = null;                     // clear offer}else {if (p.bound != b) {                    // stale; resetp.bound = b;p.collides = 0;i = (i != m || m == 0) ? m : m - 1;}else if ((c = p.collides) < m || m == FULL ||!BOUND.compareAndSet(this, b, b + SEQ + 1)) {p.collides = c + 1;i = (i == 0) ? m : i - 1;          // cyclically traverse}elsei = m + 1;                         // growp.index = i;}}}
//他一般返回的不是null,自然会使得对应为false,所以通常不会抛出异常
这里提一下:对应的源码只需要了解即可,因为我们并不需要深入的操作,特别是对应的类是非常多的,终其一生也可能学不完的,只需要大致了解即可,无论是前面的说明还是从这里开始后面的说明都是如此
Phaser :
用Phaser替代CyclicBarrier(到齐解除阻塞(一般需要他自动,我们通常不能干扰),可以继续操作)和CountDownLatch(等待执行完(实际上是对应的操作数量,所以他还有其他可能性的操作,可以干扰),才解除阻塞,通常不能继续操作)
从JDK7开始,新增了一个同步工具类Phaser,其功能比CyclicBarrier和CountDownLatch更加强大
用Phaser替代CountDownLatch:
考虑讲CountDownLatch时的例子,1个主线程要等10个worker(Worker)线程完成之后,才能做接下来的事情,也可以 用Phaser来实现此功能,在CountDownLatch中,主要是2个方法:await()和countDown(),在Phaser中,与之相 对应的方法是awaitAdance(int n)和arrive()
具体案例如下:
package main4;import java.util.Random;
import java.util.concurrent.Phaser;/****/
public class Main {public static void main(String[] args) {Phaser phaser = new Phaser(5);for (int i = 0; i < 5; i++) {new Thread("线程-" + (i + 1)) {private final Random random = new Random();@Overridepublic void run() {System.out.println(getName() + " - 开始运行");try {Thread.sleep(random.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}System.out.println(getName() + " - 运行结束");phaser.arrive();}}.start();}System.out.println("线程启动完毕");phaser.awaitAdvance(phaser.getPhase());System.out.println("线程运行结束");}
}
自己运行就知道了
用Phaser替代CyclicBarrier:
考虑前⾯讲CyclicBarrier时,10个工程师去公司应聘的例子,也可以用Phaser实现,代码基本类似
具体案例如下:
package main5;import java.util.Random;
import java.util.concurrent.Phaser;/****/
public class MyThread extends Thread {private final Phaser phaser;private final Random random = new Random();public MyThread(String name, Phaser phaser) {super(name);this.phaser = phaser;}@Overridepublic void run() {System.out.println(getName() + " - 开始向公司出发");slowly();System.out.println(getName() + " - 已经到达公司");// 到达同步点,等待其他线程phaser.arriveAndAwaitAdvance();System.out.println(getName() + " - 开始笔试");slowly();System.out.println(getName() + " - 笔试结束");// 到达同步点,等待其他线程phaser.arriveAndAwaitAdvance();System.out.println(getName() + " - 开始⾯试");slowly();System.out.println(getName() + " - ⾯试结束");}private void slowly() {try {Thread.sleep(random.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}
package main5;import java.util.concurrent.Phaser;/****/
public class Main {public static void main(String[] args) {Phaser phaser = new Phaser(5);for (int i = 0; i < 5; i++) {new MyThread("线程-" + (i + 1), phaser).start();}//由于对应的功能是一起的,所以在唤醒上通常会相通,那么当对应的线程操作唤醒时,那么这里也会解除phaser.awaitAdvance(phaser.getPhase()); //一般我们固定这样写,写成0也行,默认0就是该对应的值(一般该值代表对于以及操作完成的对应值,通常需要判断使得解除阻塞)System.out.println(1);}
}
arriveAndAwaitAdance()相当于就是 arrive()与 awaitAdvance(int)的组合说明,表示"我自己已到达这个同步点,同时要 等待所有人都到达这个同步点,然后再一起前行",在后面说明源码时,可以只需要了解,因为只是大致的说明
由于博文(客)字数限制的原因,请到下一篇(章)博文(客)学习

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_255730.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

开关电源-一种方便快捷计算开关电源环路参数的方法及实例

一种方便快捷计算开关电源环路参数的方法及实例 接上文《技术实例 | 开关电源环路测量时&#xff0c;注入信号的幅值对测量结果的影响》&#xff0c;得到电流环功率级的开环传递函数后&#xff0c;我们通过matlab的sisotool工具箱自动计算出了电流环路补偿器的传递函数C&#…

三层交换机【实验】

目录 1、要求&#xff1a; 2、拓扑&#xff1a; 3、创建vlan和端口定义并划入vlan&#xff1a; 4、创建以太网中继Eth-Trunk使sw1和sw2的相互冗余并且不浪费链路&#xff1a; 5、使用mstp定义组和对应的根&#xff1a; 6、配置网关冗余&#xff1a; 7、核心层的路由的IP配…

云仓仓储的运行模式是什么?

仓库能够简单地定义为一个规划空间&#xff0c;通常是一个用于处置和贮存货物的大型商业建筑。因而&#xff0c;仓储是指在这样一个规划空间中存储和处置货物所触及的一切过程。仓库中常见的货物包括&#xff1a;;机械零配件、建筑资料、废品农产品、家具和电子产品。仓库中的一…

Fluid-数据缓存亲和性调度原理解析

前言在Fluid中&#xff0c;Dataset资源对象中所定义的远程文件是可被调度的&#xff0c;这意味着你能够像管理你的Pod一样管理远程文件缓存在Kubernetes集群上的存放位置。另外&#xff0c;Fluid同样支持对于应用的数据缓存亲和性调度&#xff0c;这种调度方式将应用(e.g. 数据…

二进制部署K8S集群

目录 一、架构图 二、部署步骤 1、实验环境 2、操作系统初始化配置 3、部署 docker引擎 4、部署 etcd 集群 5、部署 Master 组件 一、架构图 二、部署步骤 1、实验环境 服务器类型IP地址master192.168.80.5node01192.168.80.8node02192.168.80.9 2、操作系统初始化配置…

SpringBoot整合Mybatis的核心原理

0. 前言&#xff1a;1. 自动配置类MybatisAutoConfiguration&#xff1a;1.1. SqlSessionFactory的生成&#xff1a;1.2. Mapper的扫描和代理生成&#xff1a;1.2.1. MapperScannerConfigurer1.2.2. MapperFactoryBean1.2.3. getMapper生成代理对象2. 小结&#xff1a;0. 前言&…

3D模型深度生成网络【ShapeAssembly】

推荐&#xff1a;使用 NSDT场景设计器 快速搭建 3D场景。 我们提出了一个深度生成模型&#xff0c;该模型学习在ShapeAssembly中编写新颖的程序&#xff0c;ShapeAssembly是一种用于建模3D形状结构的特定领域语言。 执行 ShapeAssembly 程序会生成一个由部件代理长方体的分层连…

2023,考个软考中级证书稳妥深圳入户,5月考试8月办入户

最新消息&#xff01;最新消息&#xff01;最新消息&#xff01; 2023年2月8日&#xff0c;深圳市发展和改革委员会深圳市公安局深圳市人力资源和社会保障局关于印发《深圳市积分入户办法》的最新通知↓ 来源《深圳市发展和改革委员会》 该积分入户将于2023年2月15日正式实施&…

Prometheus监控Java-JMX

一、什么是 JMX Exporter ? JMX Exporter 利用 Java 的 JMX 机制来读取 JVM 运行时的一些监控数据&#xff0c;然后将其转换为 Prometheus 所认知的 metrics 格式&#xff0c;以便让 Prometheus 对其进行监控采集。 那么&#xff0c;JMX 又是什么呢&#xff1f;它的全称是&a…

ChatGPT 支持的搜索引擎 Bing 究竟什么样?

微软于2月8日北京时间凌晨在 Redmond 线下举办一场媒体活动&#xff0c;围绕微软的产品以及 AI&#xff0c;公布最新消息。这里我们先回顾一下微软在 AI 上的布局。 2019年&#xff0c;微软向 OpenAI 投资10亿美元&#xff0c;成为了 OpenAI 紧密的合作伙伴&#xff0c;而微软…

Java中动态调用setter以及getter

0x00 前言 对于非专业程序员的安全人员来说&#xff0c;因为没有代码项目的积累&#xff0c;很多知识体系都不完善&#xff0c;所以有必要在一些常用的内容进行学习的总结。 在很多的调用链中都会用到**“动态调用setter以及getter”**这个知识点&#xff0c;比如经典的CB链&a…

Python语言零基础入门教程(九)

Python pass 语句 Python pass 是空语句&#xff0c;是为了保持程序结构的完整性。 pass 不做任何事情&#xff0c;一般用做占位语句。 Python 语言 pass 语句语法格式如下&#xff1a; pass测试实例&#xff1a; #!/usr/bin/python # -*- coding: UTF-8 -*- # 输出 Pytho…

线程池小结

什么是线程池 线程池其实就是一种多线程处理形式&#xff0c;处理过程中可以将任务添加到队列中&#xff0c;然后在创建线程后自动启动这些任务。这里的线程就是我们前面学过的线程,这里的任务就是我们前面学过的实现了Runnable或Callable接口的实例对象; 为什么使用线程池 …

leetcode刷题之背包问题(01背包)

01 背包 概念&#xff1a;有n件物品和一个最多能背重量为w 的背包。第i件物品的重量是weight[i]weight[i]weight[i]&#xff0c;得到的价值是value[i]value[i]value[i]。每件物品只能用一次&#xff0c;求解将哪些物品装入背包里物品价值总和最大。 方法1&#xff1a;暴力回溯…

FPGA时序约束与分析 --- 实例教程(1)

注意&#xff1a; 时序约束辅助工具或者相关的TCL命令&#xff0c;都必须在 open synthesis design / open implemention design 后才能有效运行。 1、时序约束辅助工具 2、查看相关时序信息 3、一般的时序约束顺序 1、 时序约束辅助工具&#xff08;1&#xff09;时序约束编辑…

微服务负载均衡器Ribbon

目录 什么是Ribbon 客户端的负载均衡 服务端的负载均衡 常见负载均衡算法 Nacos使用Ribbon 添加LoadBalanced注解 修改controller Ribbon负载均衡策略 IRule AbstractLoadBalancerRule 修改默认负载均衡策略 自定义负载均衡策略 配置自定义的策略 饥饿加载 Ribbo…

【golang】1797. 设计一个验证系统

你需要设计一个包含验证码的验证系统。每一次验证中&#xff0c;用户会收到一个新的验证码&#xff0c;这个验证码在 currentTime 时刻之后 timeToLive 秒过期。如果验证码被更新了&#xff0c;那么它会在 currentTime &#xff08;可能与之前的 currentTime 不同&#xff09;时…

C语言经典编程题100例(1-20)

1、练习2-1 Programming in C is fun!本题要求编写程序&#xff0c;输出一个短句“Programming in C is fun!”。输入格式:本题目没有输入。输出格式:在一行中输出短句“Programming in C is fun!”。代码&#xff1a;#include<stdio.h> int main() {printf("Progra…

Java中类是什么

类(class)是构造对象的模板或蓝图。 我们可以将类想象成制作小甜饼的模具&#xff0c;将对象想象为小甜饼。由类构造(construct)对象的过程称为创建类的实例(instance)。 正如前面所看到的&#xff0c;用Java 编写的所有代码都位于某个类里面。 标准 Java 库提供了几千个类&a…

bcript 算法

一、简介 今天要给大家介绍的一种“加密”算法叫做 bcrypt&#xff0c;bcrypt 是由 Niels Provos 和 David Mazires 设计的密码哈希函数&#xff0c;他是基于 Blowfish 密码而来的&#xff0c;并于 1999 年在 USENIX 上提出。 除了加盐来抵御 rainbow table 攻击之外&#xf…