Callable接口
1、为什么使用Callable接口
Thread和Runnable
都有的缺点:启动子线程的线程 不能获取子线程的执行结果,也不能捕获子线程的异常
从java5开始,提供了Callable接口,是Runable接口的增强版。用Call()方法作为线程的执行体,增强了之前的run()方法。因为call方法可以有返回值,也可以声明抛出异常。
1.Runnable方式创建:在主线程里捕获子线程的异常? 不可以。
try {//Runnable方式:主线程无法捕获子线程执行的异常new Thread(()->{int i = 1/0;String result = "jieguo";}).start(); } catch (Exception e) {System.out.println("主线程获取到了子线程异常:"+ e.getMessage()); }
根据控制台打印,在main线程里并没有捕获到子线程出现的异常
2、使用Callable创建线程并运行
Callable接口中注意点: 可以使用泛型<V>指定返回值类型,并且call方法可以抛出异常
@FunctionalInterface
public interface Callable<V> {V call() throws Exception;
}
使用Callable创建的子线程需要借助FutureTask对象来执行它的call方法
无论哪种方式创建多线程 都必须借助Thread对象的start方法启动线程,Thread只能接受Runnable对象: thread.run()-> runnable.run()
public static void main(String[] args) {Callable<String> callable = ()->{System.out.println("callable的call方法执行了.....");return "hehe....";};//juc包中提供了FutureTask 间接实现了Runnable接口,并实现了run方法new Thread(new FutureTask<String>(callable)).start(); }
1.Thread 调用了start方法后,系统CPU调度执行线程的run方法,run方法中判断 传入的runnable对象如果不为空则调用它的run方法。
2.我们传入的runnable对象是FutureTask的对象,所以调用的是FutureTask的run方法
3.FutureTask的run方法执行时,调用了我们传入的Callable对象的call方法执行 并接受返回的结果总的来说:就是移花接木,FutureTask间接实现了Runnable接口 并实现了run方法:run方法中调用了Callable的call方法
java.util.concurrent.FutureTask类
1.juc包中提供了FutureTask 间接实现了Runnable接口,并实现了run方法
public class FutureTask<V> implements RunnableFuture<V> public interface RunnableFuture<V> extends Runnable, Future<V> {void run(); }
2.FutureTask的run方法执行时的结果 和异常 会通过FutureTask的成员属性接收,并通过一个布尔类型的标记记录执行是否有异常
//结果和异常使用同一个变量接收 private Object outcome;
3.FutureTask中会在run方法执行结束时 将线程执行的状态从0(线程还未执行完毕)改为1(线程执行结束)
private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1;
3、FutureTask的get()方法
在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给Future对象在后台完成,当主线程将来需要时,就可以通过Future对象获得后台作业的计算结果或者执行状态。
callable可以返回方法的执行结果:通过 futureTask的get方法 阻塞获取返回结果
public static void main(String[] args) {//Callable:获取执行结果+捕获异常Callable<String> callable = ()->{System.out.println(Thread.currentThread().getName()+"执行了call方法..");return "haha....";};//FutureTask它的泛型跟Callable的泛型要一样,因为都是代表该子线程的执行结果类型FutureTask<String> futureTask = new FutureTask<String>(callable);new Thread(futureTask,"AA").start();try {//获取子线程执行的结果 调用get方法会阻塞主线程,所以一定要将获取子线程结果的操作写在方法的最后String result = futureTask.get();System.out.println("主线程获取到的子线程结果:"+futureTask.get());} catch (Exception e) {//捕获子线程执行的异常System.out.println(Thread.currentThread().getName()+" 获取到子线程的异常:"+e.getMessage());} }
在使用futureTask的get方法时,要去捕获异常,可以获取子线程的异常
public static void main(String[] args) {//Callable:获取执行结果+捕获异常Callable<String> callable = ()->{int i = 1/0;System.out.println(Thread.currentThread().getName()+"执行了call方法..");return "haha....";};//FutureTask它的泛型跟Callable的泛型要一样,因为都是代表该子线程的执行结果类型FutureTask<String> futureTask = new FutureTask<String>(callable);new Thread(futureTask,"AA").start();try {//获取子线程执行的结果 调用get方法会阻塞主线程,所以一定要将获取子线程结果的操作写在方法的最后String result = futureTask.get();System.out.println("主线程获取到的子线程结果:"+futureTask.get());} catch (Exception e) {//捕获子线程执行的异常System.out.println(Thread.currentThread().getName()+" 获取到子线程的异常:"+e.getMessage());}}
控制台打印:
为什么说get方法会阻塞主线程呢?
//以下为FutureTask的源码
public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);
}
开发时:一定要把获取子线程结果的位置放在方法的最后
4、FutureTask的复用
只计算一次,FutureTask会复用之前计算过得结果
public static void main(String[] args) {//FutureTask复用问题: 执行异步任务时 为了提高效率它会缓存执行结果FutureTask<String> futureTask = new FutureTask<>(() -> {System.out.println(Thread.currentThread().getName()+"...");return "haha..";});new Thread(futureTask,"AA").start();new Thread(futureTask,"BB").start();}
执行异步任务时 为了提高效率它会缓存执行结果。。所以BB线程是从缓存拿的,并没有去走call方法
不想复用之前的计算结果。怎么办?再创建一个FutureTask对象即可
public static void main(String[] args) {//FutureTask复用问题: 执行异步任务时 为了提高效率它会缓存执行结果FutureTask<String> futureTask = new FutureTask<>(() -> {System.out.println(Thread.currentThread().getName()+"...");return "haha..";});FutureTask<String> futureTask2 = new FutureTask<>(() -> {System.out.println(Thread.currentThread().getName()+"...");return "haha..";});new Thread(futureTask,"AA").start();new Thread(futureTask2,"BB").start();
}
5、Callable接口与Runnable接口的区别
老师版:
1、callable有返回值 可以抛出异常
2、runnable可以直接通过Thread启动
3、callable需要通过FutureTask来接收,再由Thread启动
4、callable的任务方法时call(),runnable的是run()
笔记版:
相同点:都是接口,都可以编写多线程程序,都采用Thread.start()启动线程
不同点:
具体方法不同:一个是run,一个是call
Runnable没有返回值;Callable可以返回执行结果,是个泛型
Callable接口的call()方法允许抛出异常;Runnable的run()方法异常只能在内部消化,不能往上继续抛
它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果。
阻塞队列
1、队列 queue 和 栈 stack
queue: 先进先出,后进后出。怎么容易理解呢:左进右出
线程池使用、mq也使用了
---------------------
Stack: 特点 先进后出 后进先出
public class Stack<E> extends Vector<E>{// Stack就是一个集合类 是一个线程安全的集合类//1、入栈方法public E push(E item) {addElement(item);return item;}// 数组:连续的一块内存,按照添加的索引先后顺序有序// Vector中的方法:添加元素到 elementData元素数组中public synchronized void addElement(E obj) {modCount++;ensureCapacityHelper(elementCount + 1);elementData[elementCount++] = obj;//将元素添加到elementData数组的最后一个位置}//2、出栈方法public synchronized E pop() {E obj;int len = size();obj = peek();removeElementAt(len - 1);//将获取到的最后一个位置元素删除return obj;}// 获取数组最后一个索引的元素 返回public synchronized E peek() {int len = size();//获取元素个数if (len == 0)throw new EmptyStackException();return elementAt(len - 1);//数组长度-1 也就是数组最后一个位置的元素}
}
扩展
方法栈:java代码执行时,每个线程jvm会为他创建一个栈来存储线程调用方法的执行过程数据
线程方法栈。每个方法都是一个栈帧
2、阻塞队列 BlockingQueue
线程池用来存不能及时处理的任务的数据结构
BlockingQueue:阻塞队列,看名字就知道可以解决并发问题。
在开发中我们可以不用关心向队列中添加元素的线程 如果队列满了 它如何阻塞等待 获取队列中元素的线程 如果队列空了 它如何阻塞等待 以及阻塞的线程如何被唤醒
BlockingQueue是一个接口,继承Queue接口,Queue接口继承 Collection接口
BlockingQueue接口主要有以下7个实现类:
ArrayBlockingQueue:由数组结构组成的有界阻塞队列。
LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列。
PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
DelayQueue:使用优先级队列实现的延迟无界阻塞队列。
SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列。
LinkedTransferQueue:由链表组成的无界阻塞队列。
LinkedBlockingDeque:由链表组成的双向阻塞队列。
线程池使用阻塞队列
- ArrayBlockingQueue: 数组阻塞队列
- LinkedBlockingQueue:链表阻塞队列
- SynchronousQueue:同步阻塞队列(不存储元素)
ArrayBlockingQueue:创建时手动指定的长度就是该队列的最大的长度
LinkedBlockingQueue:
默认长度:Integer.MAX_VALUE 最多存储21亿左右的元素
阻塞队列:添加元素 获取元素 移除元素的方法有4套,根据方法是否返回结果 是否抛出异常 是否可以阻塞 是否可以阻塞超时划分
抛出异常 | 特殊值 | 阻塞 | 超时 | |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
获取 | element() | peek() | 不可用 | 不可用 |
要是看着懵,是因为没继续看下去,看完再回来看就一目了然了
3、抛出异常的方法
add正常执行返回true,element(不删除)和remove正常执行会返回阻塞队列中的第一个元素
- 当阻塞队列满时,再往队列里add插入元素会抛IllegalStateException:Queue full
- 当阻塞队列空时,再往队列里remove移除元素会抛NoSuchElementException
- 当阻塞队列空时,再调用element检查元素会抛出NoSuchElementException
add():添加元素失败抛出异常
public static void main(String[] args) {//数组阻塞队列初始化时需要传入 初始化数组的长度ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);System.out.println("1:"+queue.add("a"));System.out.println("2:"+queue.add("b"));System.out.println("3:"+queue.add("c"));System.out.println("4:"+queue.add("d")); }
----------------------
remove():移除成功返回移除的数据 移除失败抛出异常
public static void main(String[] args) {//数组阻塞队列初始化时需要传入 初始化数组的长度ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);System.out.println("1:"+queue.add("a"));System.out.println("2:"+queue.add("b"));System.out.println("3:"+queue.add("c"));//移除成功返回移除的数据System.out.println(queue.remove());System.out.println(queue.remove());System.out.println(queue.remove());//移除失败抛出异常System.out.println(queue.remove()); }
--------------------------element():获取最先添加的元素 获取不到抛出异常
public static void main(String[] args) {//数组阻塞队列初始化时需要传入 初始化数组的长度ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);//获取最先添加的元素,即左进右出情况下,获取最右边的元素,获取不到抛出异常System.out.println(queue.element());System.out.println("1:"+queue.add("a"));}
4、返回特殊值的方法
offer()/poll()/peek()
offer() 插入方法,成功ture失败false
public static void main(String[] args) {//数组阻塞队列初始化时需要传入 初始化数组的长度ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);//添加成功ture,添加失败falseSystem.out.println(queue.offer("a"));System.out.println(queue.offer("b"));System.out.println(queue.offer("c"));System.out.println(queue.offer("d"));}
-------------------------------
poll() 移除方法,成功返回出队列的元素,队列里没有就返回null
public static void main(String[] args) {//数组阻塞队列初始化时需要传入 初始化数组的长度ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);//添加成功ture,添加失败falseSystem.out.println(queue.offer("a"));System.out.println(queue.offer("b"));System.out.println(queue.offer("c"));//移除元素成功返回出队列的元素,移除失败返回nullSystem.out.println(queue.poll());System.out.println(queue.poll());System.out.println(queue.poll());System.out.println(queue.poll()); }
-------------------------------
peek() 获取方法,成功返回队列中的元素,没有返回null
public static void main(String[] args) {//数组阻塞队列初始化时需要传入 初始化数组的长度ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);//获取成功返回队列中的最右边的元素,没有返回nullSystem.out.println(queue.peek());//添加成功ture,添加失败falseSystem.out.println(queue.offer("a"));System.out.println(queue.offer("b"));System.out.println(queue.offer("c"));//获取成功返回队列中的最右边的元素,没有返回nullSystem.out.println(queue.peek()); }
5、阻塞等待方法
put() / take()
如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
当阻塞队列满时,再往队列里put元素,队列会一直阻塞生产者线程直到put数据or响应中断退出 当阻塞队列空时,再从队列里take元素,队列会一直阻塞消费者线程直到队列可用
public static void main(String[] args) throws InterruptedException {//数组阻塞队列初始化时需要传入 初始化数组的长度ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);//put给队列中从添加元素,左进右出queue.put("a");queue.put("b");queue.put("c");System.out.println("take前,队列大小:"+queue.size());//take取走队列中最右面的元素System.out.println(queue.take());System.out.println("take后,队列大小:"+queue.size());
}
6、超时等待方法
offer( timeout) /poll(timeout)
- offer( timeout):阻塞等待添加元素,成功返回true 超时失败返回false
- poll(timeout): 超时不能移除元素返回null
public static void main(String[] args) throws InterruptedException {//数组阻塞队列初始化时需要传入 初始化数组的长度ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);//poll(timeout): 超时不能移除元素返回nullSystem.out.println(queue.poll(3, TimeUnit.SECONDS));//offer( timeout):阻塞等待添加元素,成功返回true 超时失败返回falseSystem.out.println(queue.offer("aa", 3, TimeUnit.SECONDS));System.out.println(queue.offer("cc", 3, TimeUnit.SECONDS));System.out.println(queue.offer("bb", 3, TimeUnit.SECONDS));System.out.println(queue.offer("dd", 3, TimeUnit.SECONDS));
}
如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。
7、阻塞等待方法的源码
put() / take()
请欣赏我的画作,以我的理解应该是这么画的。。。
put:
添加元素时使用ReentrantLock加锁
如果队列的长度等于队列中存入元素的个数代表队列已满,当前线程使用notFull.await()进入阻塞等待状态//以下为ArrayBlockingQueue的源码 public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length)notFull.await();enqueue(e);} finally {lock.unlock();} }
如果队列的长度不等于队列中存入元素的个数代表队列未满,当前线程将元素添加到队列数组中,notEmpty.signal()唤醒等待消费队列中元素的线程
//以下为ArrayBlockingQueue的源码 private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;notEmpty.signal(); }
take:
移除元素时使用Lock加锁
如果队列中元素个数为0,代表队列是空的,当前线程使用notEmpty.await()让自己等待//以下为ArrayBlockingQueue的源码 public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();} }
如果队列中元素个数>0,队列中有元素,当前线程获取元素返回 并调用notFull.signal()唤醒向队列添加元素阻塞的线程
//以下为ArrayBlockingQueue的源码 private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();notFull.signal();return x; }