教程

狂神说java官网

B站视频链接

多线程回顾

什么是JUC

java.util工具包。

进程和线程

一个进程可以包含多个线程,至少包含一个。Java默认有2个线程:main、GC。

多线程,对于Java而言:ThreadRunnableCallable

Java真的可以开启线程吗?开不了!Java是调用本地方法,底层是C++。Java无法直接操作硬件。

并发和并行:

  • 并发:多线程操作同一个资源。CPU一核,模拟出来多条线程,快速交替。
  • 并行:多个人一起行走。CPU多核,多个线程可以同时执行。
//获取CPU的核数
System.out.println(Runtime.getRuntime().availableProcessors());

并发编程的本质:充分利用CPU的资源

// 线程的几个状态
public enum State {
// 新生
NEW,

// 运行
RUNNABLE,

// 阻塞
BLOCKED,

// 等待,死等
WAITING,

// 超时等待
TIMED_WAITING,

// 终止
TERMINATED;
}

wait和sleep的区别

  1. 来自不同的类:wait=>Object,sleep=>Thread。

  2. 关于锁的释放:wait会释放锁,sleep不会。

  3. 使用的范围不同:wait必须在同步代码块中,sleep可以在任何地方睡。

Lock锁

公平锁:十分公平,先来后到

非公平锁:不公平,可以插队(默认)

Synchronized和Lock的区别

/**
* 真正的多线程开发,降低耦合性
* 线程就是一个单独的资源类,没有任何附属的操作
*/
public class SaleTicketDemo1 {
public static void main(String[] args) {
// 并发:多线程操作同一个资源类,把资源类丢入线程
Ticket ticket = new Ticket();

// lambda表达式 ()->{}
new Thread(() -> {
for (int i = 0; i < 60; i++) ticket.sale();
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 60; i++) ticket.sale();
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 60; i++) ticket.sale();
}, "C").start();
}
}

// 资源类OOP
class Ticket {
private int number = 30;

public synchronized void sale() {
if (number > 0) {
System.out.println(Thread.currentThread().getName() + " sales NO." + (number--) + ",剩余" + number);
}
}
}
public class SaleTicketDemo2 {
public static void main(String[] args) {
// 并发:多线程操作同一个资源类,把资源类丢入线程
Ticket2 ticket = new Ticket2();

// lambda表达式 ()->{}
new Thread(() -> {
for (int i = 0; i < 60; i++) ticket.sale();
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 60; i++) ticket.sale();
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 60; i++) ticket.sale();
}, "C").start();
}
}

// 资源类OOP
class Ticket2 {
private int number = 30;

Lock lock = new ReentrantLock();

public void sale() {
lock.lock();
try {
// 业务代码
if (number > 0) {
System.out.println(Thread.currentThread().getName() + " sales NO." + (number--) + ",剩余" + number);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
  1. Synchronized是内置Java关键字,Lock是一个Java类。
  2. Synchronized无法判断获取锁的状态,Lock可以判断是否获取到了锁。
  3. Synchronized会自动释放锁,Lock必须手动释放锁,如果不释放,死锁
  4. Synchronized中,线程A获取锁并阻塞,线程B就只能死等。Lock锁就不一定会等待下去。tryLock()
  5. Synchronized可重入锁,不可中断的,非公平。Lock,可重入,可判断锁,可设置公平/非公平锁。
  6. Synchronized适合锁少量的代码同步问题,Lock适合锁大量的同步代码。

生产者和消费者问题

面试常见:单例模式、排序算法、生产者和消费者、死锁

synchronized版

/**
* 线程之间的通信问题 等待唤醒和通知唤醒
* 口诀:判断等待,业务,通知。
*/
public class TestNum {
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}

//数字 资源类
class Data {
private int num = 0;

public synchronized void increment() throws InterruptedException {
//用while而不是if,防止虚假唤醒问题。
while (num != 0) {
this.wait();//wait
}
num++;
System.out.println(Thread.currentThread().getName() + "=>" + num);
this.notifyAll();//通知其他线程

}

public synchronized void decrement() throws InterruptedException {
while (num == 0) {
this.wait();//wait
}
num--;
System.out.println(Thread.currentThread().getName() + "=>" + num);
this.notifyAll();//通知其他线程
}
}

JUC版

await():使当前线程等待,同时释放当前锁,当其他线程中使用signal()时或者signalAll()方法时,线程会重新获得锁并继续执行。或者当线程被中断时,也能跳出等待。该方法和Object的wait()类似。

awaitUninterruptibly():该方法和await()相同,但是不响应中断。

await(long time, TimeUnit unit):当前线程等待,直到线程被唤醒或者中断,或者等待时间耗尽,这里第二个参数就是等待时间单位。

awaitNanos(long nanosTimeout:当前线程等待,直到线程被唤醒或者中断,或者等待时间耗尽,等待的时间单位是固定的为纳秒(十亿分之一秒)。

awaitUntil(Date deadline):前线程等待,直到线程被唤醒或者中断,或者到达具体的截至日期。

signal():唤醒一个等待的线程。

signalAll():唤醒所有等待的线程。

public class TestNum2 {
public static void main(String[] args) {
Data2 data = new Data2();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.increment();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.decrement();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.increment();
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.decrement();
}
}, "D").start();
}
}


//数字 资源类
class Data2 {
private int num = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

public void increment() {
lock.lock();
try {
//业务代码
while (num != 0) {
condition.await();//等待
}
num++;
System.out.println(Thread.currentThread().getName() + "=>" + num);
condition.signalAll();//唤醒全部
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void decrement() {
lock.lock();
try {
while (num == 0) {
condition.await();//等待
}
num--;
System.out.println(Thread.currentThread().getName() + "=>" + num);
condition.signalAll();//唤醒全部
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

问题:线程仍然是随机顺序运行。若想有序执行ABCD,就要使用Condition精准通知和唤醒线程。

public class TestNum3 {
public static void main(String[] args) {
Data3 data = new Data3();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
data.printA();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
data.printB();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
data.printC();
}
}, "C").start();
}
}

//A->B->C->A循环唤醒
class Data3 {
Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
Condition condition3 = lock.newCondition();
private int num = 1;

public void printA() {
lock.lock();
try {
while (num != 1) {
condition1.await();//等待应该总是出现在循环中,防止虚假唤醒
}
System.out.println(Thread.currentThread().getName() + "=>" + num);
num++;
condition2.signal();//唤醒指定的线程
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void printB() {
lock.lock();
try {
while (num != 2) {
condition2.await();
}
System.out.println(Thread.currentThread().getName() + "=>" + num);
num++;
condition3.signal();//唤醒指定的线程
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void printC() {
lock.lock();
try {
while (num != 3) {
condition3.await();
}
System.out.println(Thread.currentThread().getName() + "=>" + num);
num = 1;
condition1.signal();//唤醒指定的线程
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

八锁问题

八锁问题,就是关于锁的八个问题

参考链接:经典8锁问题–助你彻底搞懂锁的概念

场景一

/**
* 标准情况下 是先sendMsg() 还是先call()?
* 答案:sendMsg
* 解释:被 synchronized 修饰的方式,锁的对象是方法的调用者
* 两个方法调用的对象是同一个,先调用的先执行!
*/
public class Test1 {
public static void main(String[] args) throws InterruptedException {
Phone1 phone = new Phone1();
new Thread(phone::sendMsg, "A").start();
TimeUnit.SECONDS.sleep(3);
new Thread(phone::call, "B").start();
}
}

class Phone1 {
public synchronized void sendMsg() {
System.out.println("sendMsg");
}

public synchronized void call() {
System.out.println("call");
}
}

场景二

/**
* sendMsg内休眠两秒的情况下 是先sendMsg() 还是先call()?
* 答案:sendMsg
* 解释:被 synchronized 修饰的方式,锁的对象是方法的调用者
* 两个方法调用的对象是同一个,先调用的先执行!
*/
public class Test2 {
public static void main(String[] args) throws InterruptedException {
Phone2 phone = new Phone2();
new Thread(() -> {
try {
phone.sendMsg();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();
TimeUnit.SECONDS.sleep(2);
new Thread(phone::call, "B").start();
}
}

class Phone2 {
public synchronized void sendMsg() throws InterruptedException {
TimeUnit.SECONDS.sleep(3);//休眠两秒
System.out.println("sendMsg");
}

public synchronized void call() {
System.out.println("call");
}
}

场景三

/**
* 被synchronized修饰的方法和普通方法 是先sendMsg() 还是先call()?
* 答案:call(普通方法)
* 解释:普通方法没有 synchronized 修饰,不是同步方法,不受锁的影响!
*/
public class Test3 {
public static void main(String[] args) throws InterruptedException {
Phone3 phone = new Phone3();
new Thread(() -> {
try {
phone.sendMsg();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();
TimeUnit.SECONDS.sleep(2);
new Thread(phone::call, "B").start();
}
}

class Phone3 {
public synchronized void sendMsg() throws InterruptedException {
TimeUnit.SECONDS.sleep(4);//休眠两秒
System.out.println("sendMsg");
}

public void call() {
System.out.println("call");
}
}

场景四

/**
* 两个不同的phone对象 先执行sendEmail() 还是callPhone()?
* 答案:call
* 解释:被synchronized 修饰的不同方法 锁的对象是调用者
* 这里锁的是两个对象(两个不同的调用者),所以互不影响,根据延迟得出结果
*/
public class Test4 {
public static void main(String[] args) throws InterruptedException {
//两个对象
Phone4 phone1 = new Phone4();
Phone4 phone2 = new Phone4();
new Thread(() -> {
try {
phone1.sendMsg();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(phone2::call, "B").start();
}
}

class Phone4 {
public synchronized void sendMsg() throws InterruptedException {
TimeUnit.SECONDS.sleep(3);//休眠两秒
System.out.println("sendMsg");
}

public synchronized void call() {
System.out.println("call");
}
}

场景五

/**
* 两个静态同步方法 都被synchronized 修饰 是先sendEmail() 还是callPhone()?
* 答案:sendMsg
* 解释:只要方法被 static 修饰,锁的就是 Class,这个全局唯一!
* 所以说这里是同一个锁,并不是因为synchronized 这里程序会从上往下依次执行
*/
public class Test5 {
public static void main(String[] args) throws InterruptedException {
Phone5 phone = new Phone5();
new Thread(() -> {
try {
phone.sendMsg();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
phone.call();
}, "B").start();
}
}

class Phone5 {
public static synchronized void sendMsg() throws InterruptedException {
TimeUnit.SECONDS.sleep(3);//休眠两秒
System.out.println("sendMsg");
}

public static synchronized void call() {
System.out.println("call");
}
}

场景六

/**
* 两个静态同步方法 两个不同的phone对象 是先sendEmail() 还是callPhone()?
* 答案:sendMsg
* 解释:只要方法被 static 修饰,锁的就是 Class,这个全局唯一!
* 所以说这里还是同一个锁,不因为是两个不同的phone对象而发生变化
*/
public class Test6 {
public static void main(String[] args) throws InterruptedException {
Phone6 phone1 = new Phone6();
Phone6 phone2 = new Phone6();
new Thread(() -> {
try {
phone1.sendMsg();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
phone2.call();
}, "B").start();
}
}

class Phone6 {
public static synchronized void sendMsg() throws InterruptedException {
TimeUnit.SECONDS.sleep(3);//休眠两秒
System.out.println("sendMsg");
}

public static synchronized void call() {
System.out.println("call");
}
}

场景七

/**
* 一个静态同步方法 一个静态普通方法 是先sendEmail() 还是callPhone()?
* 答案:call
* 解释:被static修饰锁的是class模板, 而synchronized锁的是调用的对象
* 这里两个锁互不影响,按时间先后执行
*/
public class Test7 {
public static void main(String[] args) throws InterruptedException {
Phone7 phone = new Phone7();
new Thread(() -> {
try {
phone.sendMsg();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
phone.call();
}, "B").start();
}
}

class Phone7 {
public static synchronized void sendMsg() throws InterruptedException {
TimeUnit.SECONDS.sleep(3);//休眠两秒
System.out.println("sendMsg");
}

public static void call() {
System.out.println("call");
}
}

场景八

/**
* 一个静态同步方法 一个静态普通方法 两个对象 是先sendEmail() 还是callPhone()?
* 答案:call
* 解释:被static修饰的锁的就是整个class模板
* 这里一个锁的是class模板 一个锁的是调用者 所以两个锁互不影响
*/
public class Test8 {
public static void main(String[] args) throws InterruptedException {
Phone8 phone1 = new Phone8();
Phone8 phone2 = new Phone8();
new Thread(() -> {
try {
phone1.sendMsg();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
phone2.call();
}, "B").start();
}
}

class Phone8 {
public static synchronized void sendMsg() throws InterruptedException {
TimeUnit.SECONDS.sleep(3);//休眠两秒
System.out.println("sendMsg");
}

public static void call() {
System.out.println("call");
}
}

小结

非静态的,锁的是一个对象(一个手机);静态的,锁的是一个模板。

集合类不安全

CopyOnWriteArrayList

/**
* 并发下,ArrayList不安全的
* 解决方案:
* 1、List<String> list = new Vector<>();
* 2、List<String> list = Collections.synchronizedList(new ArrayList<>());
* 3、List<String> list = new CopyOnWriteArrayList<>();
* CopyOnWrite 写入时复制 COW 计算机程序设计领域的一种优化策略
* 读写分离 在写入的时候 避免覆盖 造成数据问题
* CopyOnWriteArrayList的add是用lock锁
*/
public class ListTest {
public static void main(String[] args) {
//List<String> list = new ArrayList<>();
//List<String> list = new Vector<>();
//List<String> list = Collections.synchronizedList(new ArrayList<>());
List<String> list = new CopyOnWriteArrayList<>();
for (int i = 1; i <= 100; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 5));
System.out.println(list);
}, String.valueOf(i)).start();
}
}
}

CopyOnWriteArraySet

/**
* 普通set会ConcurrentModificationException
* 解决方案:
* 1、Set<String> set = Collections.synchronizedSet(new HashSet<>());
* 2、Set<String> set = new CopyOnWriteArraySet<>();
*/
public class SetTest {
public static void main(String[] args) {
//Set<String> set = new HashSet<>();
//Set<String> set = Collections.synchronizedSet(new HashSet<>());
Set<String> set = new CopyOnWriteArraySet<>();
for (int i = 1; i <= 100; i++) {
new Thread(() -> {
set.add(UUID.randomUUID().toString().substring(0, 5));
System.out.println(set);
}, String.valueOf(i)).start();
}
}
}

HashSet的底层就是HashMap。看源码可看出来。

public HashSet() {
map = new HashMap<>();
}

//本质就是map key是无法重复的!
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}

//PRESENT是不变的值
private static final Object PRESENT = new Object();

ConcurrentHashMap

ConcurrentHashMap的实现原理:ConcurrentHashMap实现原理及源码分析

/**
* new HashMap<>()默认等价于new HashMap<>(16,0.75);
*/
public class MapTest {
public static void main(String[] args) {
//Map<String, String> map = new HashMap<>();
Map<String, String> map = new ConcurrentHashMap<>();
for (int i = 1; i <= 100; i++) {
new Thread(()->{
map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 5));
System.out.println(map);
},String.valueOf(i)).start();
}
}
}

Callable

  1. 可以有返回值
  2. 可以抛出异常
  3. 方法不同,run()/call()
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask futureTask = new FutureTask(new MyThread());
new Thread(futureTask).start();
String S = (String) futureTask.get();
System.out.println(S);
}
}

class MyThread implements Callable<String> {
@Override
public String call() {
System.out.println("call");
return "fsdfsdfs";
}
}

futureTask.get()方法可能产生阻塞,或者使用异步通信来处理。

细节:

  1. 第二次再调用FutureTask对象所持有的线程会直接结束对应线程,直接调用结果。
  2. 结果可能需要等待,会阻塞。

常用的辅助类(必会)

CountDownLatch

countDownLatch.countDown();//数量减一

countDownLatch.await();//等待计数器归零,然后再想下执行

每次有线程调用countDown(),计数器就会减一,假设计数器为0,countDownLatch.await()就会被唤醒。

public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);//总数是6
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName());
countDownLatch.countDown();//数量减一
}, String.valueOf(i)).start();
}
countDownLatch.await();//等待计数器归零,然后再想下执行
System.out.println("结束");
}
}

CyclicBarrier

public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
System.out.println("结束");
});
for (int i = 1; i <=7; i++) {
int temp = i;
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"-->"+temp);
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
}
}

加法计数器,一个线程组的线程需要等待所有线程完成任务后再继续执行下一次任务。所有线程会等待全部线程到达栅栏之后才会继续执行,并且最后到达的线程会完成 Runnable 的任务。

CyclicBarrier 与 CountDownLatch 区别

  • CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的
  • CountDownLatch 参与的线程的职责是不一样的,有的在倒计时,有的在等待倒计时结束。CyclicBarrier 参与的线程职责是一样的。

Semaphore

Semaphore:信号量

public class SemaphoreDemo {
public static void main(String[] args) {
//允许的线程数量
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
semaphore.acquire();//acquire 得到
System.out.println(Thread.currentThread().getName() + "in");
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "out");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();//release 释放
}

}, String.valueOf(i)).start();
}
}
}

原理:

semaphore.acquire();获得,如果已经满了,就等待有空位被释放出来为止。

semaphore.release();释放,将当前的信号量释放+1,然后唤醒等待的线程。

作用:多个共享资源互斥的使用;并发限流,控制最大线程数。

读写锁ReadWriteLock

读可以被多线程同时读,写的时候只能有一个线程写。

public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
//写入
for (int i = 1; i <= 5; i++) {
int finalI = i;
new Thread(() -> myCache.put(finalI + "", finalI + ""), String.valueOf(i)).start();
}
//读取
for (int i = 1; i <= 5; i++) {
int finalI = i;
new Thread(() -> myCache.get(finalI + ""), String.valueOf(i)).start();
}
}
}


class MyCache {
private final Map<String, Object> map = new HashMap<>();
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

//存,写入的时候希望同时只有一个线程
public void put(String key, Object value) {
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "写入" + key);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "写入" + key + "完毕");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
}

//取,所有人都可以读
public void get(String key) {
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "读取" + key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName() + "读取" + key + "完毕");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}

读读可以共存,读写不能共存,写写不能共存。

独占锁(写锁),一次只能被一个线程占有;共享锁(读锁),多个线程可以同时占有。

阻塞队列

写入:如果队列满了,就必须阻塞等待

取:如果队列是空的,就必须阻塞等待生产

Blocking Queue

Blocking Queue不是新东西,在什么情况下会使用阻塞队列:多线程并发处理、线程池。

四组API

  1. 抛出异常
  2. 不会抛出异常
  3. 阻塞等待
  4. 超时等待
方式 抛出异常 不抛出异常 阻塞等待 超时等待
添加 add offer() put() offer(e, timeout, unit)
移除 remove poll() take() poll(timeout, unit)
检测队首元素 element peek
public class Test {
public static void main(String[] args) throws InterruptedException {
test1();
test2();
test3();
test4();
}

public static void test1() {
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
//System.out.println(blockingQueue.add("d"));//抛出异常

System.out.println(blockingQueue.element());//查看队首元素

System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
//System.out.println(blockingQueue.remove());//抛出异常
System.out.println("=====================");
}

public static void test2() {
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("d"));//false 不抛出异常

System.out.println(blockingQueue.peek());

System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());//null 不抛出异常
System.out.println("=====================");
}

public static void test3() throws InterruptedException {
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put("a");//无返回值
blockingQueue.put("b");
blockingQueue.put("c");
//blockingQueue.put("d");//队列没有位置了 一直阻塞
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
//System.out.println(blockingQueue.take());//没有元素,一直卡着等待
System.out.println("=====================");
}

public static void test4() throws InterruptedException {
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("d", 2, TimeUnit.SECONDS));//等待超时2s退出
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));//等待超时2s退出
}
}

同步队列SynchronousQueue

和其他的BlockingQueue不一样,SynchronousQueue进去(put)一个元素后,必须先从里面take出来,否则不能继续put。

public class Test {
public static void main(String[] args) {
SynchronousQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName() + " puts 1");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName() + " puts 2");
blockingQueue.put("3");
System.out.println(Thread.currentThread().getName() + " puts 3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T1").start();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " takes " + blockingQueue.take());
System.out.println(Thread.currentThread().getName() + " takes " + blockingQueue.take());
System.out.println(Thread.currentThread().getName() + " takes " + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T2").start();
}
}

线程池

程序的运行本质:占用系统的资源!池化技术可以优化资源的使用。如线程池、连接池、内存池、对象池……

池化技术即事先准备好一些资源,有人要用就来拿,用完归还。

好处:

  1. 降低资源的消耗;
  2. 提高响应的速度;
  3. 方便管理

线程复用,可以控制最大并发数,管理线程。

public class Demo01 {
public static void main(String[] args) {
//ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程
ExecutorService threadPool =Executors.newFixedThreadPool(5);//一个固定大小的线程池
//Executors.newCachedThreadPool();//可伸缩的线程池
try {
for (int i = 0; i < 10; i++) {
//使用线程池来创建线程
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " OK");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//程序结束前关闭线程池
threadPool.shutdown();
}
}
}

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,//最大约为21亿 OOM
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

看源码,本质都是ThreadPoolExecutor,七大参数。

public ThreadPoolExecutor(int corePoolSize,//核心线程池大小
int maximumPoolSize,//最大线程池大小
long keepAliveTime,//空闲资源的存活时间,对等待区有效,对核心线程无效
TimeUnit unit,//超时单位
BlockingQueue<Runnable> workQueue,//阻塞队列
ThreadFactory threadFactory,//线程工厂,创建线程的,一般不动
RejectedExecutionHandler handler//拒绝策略) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

阿里的手册中对此有所体现。

通过ThreadPoolExecutor自定义线程池,有四种不用的处理策略。

/**
* new ThreadPoolExecutor.AbortPolicy()); //线程池满了 还有线程进来,就不处理并抛出异常。
* new ThreadPoolExecutor.CallerRunsPolicy()); //哪来的去哪里 即可能让main线程完成工作
* new ThreadPoolExecutor.DiscardOldestPolicy(); //队列满了就丢掉任务,不会抛出异常
* new ThreadPoolExecutor.DiscardPolicy(); //队列满了,去尝试和最早的竞争,不会抛出异常
*/
public class Demo01 {
public static void main(String[] args) {
//ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程
//ExecutorService threadPool = Executors.newFixedThreadPool(5);//一个固定大小的线程池
//ExecutorService threadPool = Executors.newCachedThreadPool();//可伸缩的线程池
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());//哪来的去哪里
try {
//最大承载:即等待区Queue的capacity + maximumPoolSize
//超出最大承载就会抛出异常
for (int i = 0; i < 10; i++) {
//使用线程池来创建线程
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " OK");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//程序结束前关闭线程池
threadPool.shutdown();
}
}
}

池的最大线程数如何定义

  1. CPU密集型,CPU几核就是几,可以保持CPU的效率最高。
  2. IO密集型,判断程序中十分耗IO的线程,大于这个数更好。

四大函数式接口(必会)

函数式接口:只有一个方法的接口

@FunctionalInterface
public interface Runnable {
public abstract void run();
}

Java有很多函数式接口,可以简化编程。

四大函数式接口:

  1. Consumer<T>:消费型接口void accept(T t)
  2. Supplier<T>:供给型接口T get()
  3. Function<T, R>:函数型接口R apply(T t)
  4. Predicate<T>:断言型接口boolean test(T t)

其中T是传入参数,R是返回类型。

Function

/**
* Function 函数型接口,有一个输入参数,有一个输出。
* 只要是 函数式接口,就可以用lambda表达式简化。
*/
public class Demo01 {
public static void main(String[] args) {
//工具类 输出了输入的值
Function<String, String> function = new Function<>() {
@Override
public String apply(String str) {
return str;
}
};
System.out.println(function.apply("fjdiosfijs"));

Function<String, String> function1 = str -> str;
System.out.println(function1.apply("fsdffsdf"));
}
}

Predicate

/**
* 断定型接口 有一个输入参数 返回值为boolean
*/
public class Demo02 {
public static void main(String[] args) {
//判断字符串是否为空
Predicate<String> predicate = new Predicate<>() {
@Override
public boolean test(String str) {
return str.isEmpty();
}
};
System.out.println(predicate.test("abc"));

Predicate<String> predicate1 = String::isEmpty;
System.out.println(predicate.test("aaaaabc"));
}

}

Consumer

/**
* 消费型接口 只有输入 没有返回值
*/
public class Demo03 {
public static void main(String[] args) {
Consumer<String> consumer = new Consumer<>() {
@Override
public void accept(String str) {
System.out.println(str);
}
};
consumer.accept("abcd");

Consumer<String> consumer1 = System.out::println;
consumer1.accept("aaaaaaaaa");
}
}

Supplier

/**
* 供给型接口 没有参数 只有返回值
*/
public class Demo04 {
public static void main(String[] args) {
Supplier<Integer> supplier = new Supplier<>() {
@Override
public Integer get() {
return 1024;
}
};
System.out.println(supplier.get());

Supplier<Integer> supplier1 = () -> 1024;
System.out.println(supplier1.get());
}
}

流式计算

暂时停更