JUC

J.U.C java.util.concurrent

主要分为几个类簇:

 

ReentrantLock

轻量级锁与重量级锁:“轻量级”是相对于使用操作系统互斥量来实现的传统锁而言的

ReentrantLock 和synchronized 都是 可重入锁

可重入 是同一线程 外层函数获得锁之后 ,内层递归函数仍然有获取该锁的代码,但不受影响

try {    lock.lock();    // do something} catch (Exception e){    e.printStackTrace();}finally {    lock.unlock();}
lock.tryLock() // 可以进行“尝试锁定”tryLock,这样无法锁定,或者在指定时间内无法锁定,线程可以决定是否继续等待
lock.lockInterruptibly(); // 可以通过interrupt()打断

公平锁:每个线程抢占锁的顺序为先后调用lock方法的顺序依次获取锁

非公平锁:每个线程抢占锁的顺序不定,谁运气好,谁就获取到锁,和调用lock方法的先后顺序无关

new ReentrantLock(true); // true为公平锁

为什么需要非公平锁:在竞争激烈的情况下,唤醒等待线程的开销会很高,基本上是一种等待-获取-等待-获取的串行的局面,无法很好地利用并发性

synchronized vs ReentrantLock

应该优先选择synchronized:

类层次结构

classDiagram    Lock <|-- ReentrantLock    ReentrantLock *-- NonfairSync    ReentrantLock *-- Sync    ReentrantLock *-- FairSync    Sync <|-- NonfairSync    Sync <|-- FairSync

同步器

final boolean nonfairTryAcquire(int acquires) {    final Thread current = Thread.currentThread();    int c = getState();    // 代表锁还没被获取    if (c == 0) {        // 设置状态标记获取锁        if (compareAndSetState(0, acquires)) {            // 标记获取锁的线程是当前线程            setExclusiveOwnerThread(current);            return true;        }    }    // 锁已经被获取了,并且获取锁的线程是当前线程    else if (current == getExclusiveOwnerThread()) {        int nextc = c + acquires;        if (nextc < 0) // overflow            throw new Error("Maximum lock count exceeded");        // 设置锁的状态+1        setState(nextc);        return true;    }    // 加入等待队列    return false;}
protected final boolean tryRelease(int releases) {    // 释放锁后线程持有的锁数    int c = getState() - releases;    // 当前的线程没有持有锁    if (Thread.currentThread() != getExclusiveOwnerThread())        throw new IllegalMonitorStateException();    boolean free = false;    // 锁释放完了    if (c == 0) {        // 可以让其他线程获取锁        free = true;        setExclusiveOwnerThread(null);    }    // 如果锁没有释放完,设置state为当前线程持有的锁数    setState(c);    return free;}

FairSync公平锁

protected final boolean tryAcquire(int acquires) {    final Thread current = Thread.currentThread();    int c = getState();    // 锁没有被持有    if (c == 0) {        // 如果当前线程处于同步队列的头节点,则获取锁成功,否则等待        if (!hasQueuedPredecessors() &&            compareAndSetState(0, acquires)) {            setExclusiveOwnerThread(current);            return true;        }    }    // 锁被当前线程持有,重入    else if (current == getExclusiveOwnerThread()) {        int nextc = c + acquires;        if (nextc < 0)            throw new Error("Maximum lock count exceeded");        setState(nextc);        return true;    }    // 锁被其他线程持有,等待    return false;}

NonfairSync非公平锁

这里的非公平锁tryAcquire实现就是上面同步器sync中的实现

ReentrantReadWriteLock

当读写锁是写加锁状态时, 在这个锁被解锁之前, 所有试图对这个锁加锁的线程都会被阻塞

当读写锁在读加锁状态时, 所有试图以读模式对它进行加锁的线程都可以得到访问权, 但是如果线程希望以写模式对此锁进行加锁, 它必须直到所有的线程释放锁

class Cache {    private Map<String, Object> cache = new HashMap<>();    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();    public void put(String key, Object value) {        ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();        writeLock.lock();        cache.put(key, value);        writeLock.unlock();    }    public Object get(String key) {        ReentrantReadWriteLock.ReadLock readLock = lock.readLock();        readLock.lock();        Object value = cache.get(key);        readLock.unlock();        return value;    }}

关于读写锁的一些问题:

CountDownLatch(闭锁)

确保某些活动直到其他活动都完成后才继续执行

202031219448

其最大的作用不是为了加锁,而是通过计数达到等待的功能,主要有两种形式的等待:

CountDownLatch latch = new CountDownLatch(5);for (int i = 0; i < 5; i++) {    int finalI = i;    new Thread(()->{        Random random = new Random();        try {            Thread.sleep(random.nextInt(5000));        } catch (InterruptedException e) {            e.printStackTrace();        }        System.out.println("线程"+ finalI +"完成");        lock.latch();    }).start();}latch.await();System.out.println("all mission complete");

await

// await方法的实现是获取共享锁,如果获得后就返回,否则就等待public void await() throws InterruptedException {    sync.acquireSharedInterruptibly(1);}// 这里sync判断能否获得锁的标志是state是否=0protected int tryAcquireShared(int acquires) {    return (getState() == 0) ? 1 : -1;}

countDown

// countDown的实现就是释放一个锁public void countDown() {    sync.releaseShared(1);}// sync判断能否释放锁的标志是 释放这次锁之后,锁的个数为0protected boolean tryReleaseShared(int releases) {    // Decrement count; signal when transition to zero    for (;;) {        int c = getState();        // 已经没有锁了        if (c == 0)            return false;        int nextc = c - 1;        if (compareAndSetState(c, nextc))            return nextc == 0;    }}

CyclicBarrier(栅栏)

闭锁用于等待事件,而栅栏用于等待其他线程

2020312194816

CyclicBarrier barrier = new CyclicBarrier(5, () -> System.out.println("all thread run"));// 调用await的线程会进行等待,直到第5个线程调用await,所有线程才会继续执行for (int i = 0; i < 5; i++) {    new Thread(() -> {        Random rnd= new Random();        try {            Thread.sleep(rnd.nextInt(3000));            System.out.println(Thread.currentThread()+"run");            barrier.await();        } catch (InterruptedException | BrokenBarrierException e) {            e.printStackTrace();        }    }).start();}

phaser

Semaphore(信号量)

用来控制使用资源的主体数量

Semaphore semaphore = new Semaphore(5);// Semaphore semaphore = new Semaphore(5,true); 公平的信号量// 最多只有5个线程能同时运行for (int i = 0; i < 10; i++) {    new Thread(()->{        Random rnd = new Random();        try {            semaphore.acquire();            System.out.println(Thread.currentThread()+"acquire lock");            Thread.sleep(5000);        } catch (InterruptedException e) {            e.printStackTrace();        }finally {            semaphore.release();        }    }).start();}

Exchanger

两个线程交换数据

Exchanger<String> exchanger = new Exchanger<>();new Thread(()->{    try {        System.out.println("1st:"+exchanger.exchange("1"));    } catch (InterruptedException e) {        e.printStackTrace();    }}).start();new Thread(()->{    try {        System.out.println("2nd:"+exchanger.exchange("2"));    } catch (InterruptedException e) {        e.printStackTrace();    }}).start();

StampedLock

先试着读,然后通过 validate 方法确认是否进入了写模式,如果没有进入,就成功避免了获取锁的开销

public class StampedSample {  private final StampedLock sl = new StampedLock();  void mutate() {      long stamp = sl.writeLock();      try {          write();      } finally {          sl.unlockWrite(stamp);      }  }  Data access() {      long stamp = sl.tryOptimisticRead();      Data data = read();      if (!sl.validate(stamp)) {          stamp = sl.readLock();          try {              data = read();          } finally {              sl.unlockRead(stamp);          }      }      return data;  }  // …}

LockSupport

var t = new Thread(()->{    for (int i = 0; i < 10; i++) {        if (i == 5){            // 暂停线程            LockSupport.park();        }        System.out.println(i);    }});t.start();Thread.sleep(3000);// 继续线程 可以在park之前调用LockSupport.unpark(t);

VarHandle

public class Main {    int x = 8;    public static void main(String[] args) throws Exception{        Main main = new Main();        VarHandle varHandle = MethodHandles.lookup().findVarHandle(Main.class,"x",int.class);        varHandle.compareAndSet(main,8,9);        System.out.println(varHandle.get(main));    }}

ThreadLocal

供了一种方式,让在多线程环境下,每个线程都可以拥有自己独特的数据,并且可以在整个线程执行过程中,从上而下的传递

ThreadLocal<String> tl = new ThreadLocal<>();var t1 = new Thread(()->{    tl.set("cxk");    System.out.println(tl.get()); // "cxk"});var t2 = new Thread(()->{    System.out.println(tl.get()); // null});t1.start();t1.join();t2.start();

使用ThreadLocal包装的对象只能在当前线程使用

截图录屏_选择区域_20200925150807

原理: https://ismy.wang/java/2019/05/10/%E5%88%9D%E6%8E%A2ThreadLocal.html

ThreadLocal使用了弱引用防止内存泄漏

注意:使用时,对象不再使用,必须手动remove,否则在使用线程池的情况下,线程如果没有被销毁,会产生内存泄漏

属性

// 表示当前ThreadLocal在全局map中的存放位置private final int threadLocalHashCode = nextHashCode();// 它的hashCode是通过一个原子整数不断递增的形式给出的,这样可以保证每台机器的每一个ThreadLocal都有唯一的hashCodeprivate static AtomicInteger nextHashCode = new AtomicInteger();// 用来存放各个ThreadLocal对应的数据static class ThreadLocalMap {    ...}

set

public void set(T value) {    Thread t = Thread.currentThread();    // 获取当前线程的一个map    ThreadLocalMap map = getMap(t);    // 向map里面放数据(如果map为空,则创建map)    if (map != null) {        map.set(this, value);    } else {        // 这里创建map的时候,创建后会存入value        createMap(t, value);    }}// 也就是说每个线程都拥有一张map,这张map的可是ThreadLocal,这样当使用ThreadLocal存取数据时,就会通过ThreadLocal来在这张map上set/get数据

get

public T get() {    Thread t = Thread.currentThread();    ThreadLocalMap map = getMap(t);    // 获取map进行get    if (map != null) {        ThreadLocalMap.Entry e = map.getEntry(this);        if (e != null) {            @SuppressWarnings("unchecked")            T result = (T)e.value;            return result;        }    }    return setInitialValue();}

应用

// 获取当前的请求RequestContextHolder.getCurrentRquest();

副作用

  1. 线程池复用线程会导致ThreadLocal 也被重用 从而会导致脏数据的产生
  2. 如果使用static修饰ThreadLocal 这个时候弱引用就无法防止内存泄露了

解决上面这些问题只需要使用的时候注意remove即可

InheritableThreadLocal

使用 InheritableThreadLocal,可以在子线程中获取到父线程中的 ThreadLocal 变量

其原理是子线程在初始化时,会去获取父线程的 ThreadLocalMap 来作为自己的 ThreadLocalMap

TransmittableThreadLocal

在使用 TransmittableThreadLocal 时,它在将值保存到父类 InheritableThreadLocal 中的同时,会将当前的 TransmittableThreadLocal 实际进行存储,这样使用完成后,它自己就会维护一份所有用到的TransmittableThreadLocal 实例,不管它是用户信息的,还是其他信息的实例

有了上面维护的信息,就可以借助Transmitter来对其中的数据进行操作,一般操作步骤如下

sequenceDiagram    participant Biz Code    participant Runnable    participant TtlRunnable    participant ThreadPool    participant TransmittableThreadLocal    participant Transmitter    Biz Code ->> TransmittableThreadLocal: createTtl()    Biz Code ->> TransmittableThreadLocal: setTtlValue()    Biz Code ->> Runnable: createBizTaskRunnable()    Biz Code ->> TtlRunnable: createTtlRunnableWrapper(Runnable)    TtlRunnable ->> Transmitter: captureAllTtlValues()    Transmitter ->> TransmittableThreadLocal: get()    Transmitter ->> TransmittableThreadLocal: copy(value:T)    Biz Code ->> ThreadPool: submitTtlRunnableToThreadPool()    ThreadPool ->> TtlRunnable: run()    TtlRunnable ->> TransmittableThreadLocal: beforeExecute()    TtlRunnable ->> Transmitter: replayCapturedTtlValues()    TtlRunnable ->> Runnable: run()    Runnable ->> TransmittableThreadLocal: useValueInTTL()    TtlRunnable ->> Transmitter: restoreTtlValuesBeforeReplay()    TtlRunnable ->> TransmittableThreadLocal: afterExecute()

锁的原理(AQS AbstractQueuedSynchronizer)

AQS 是一个锁框架,它定义了锁的实现机制,并开放出扩展的地方,让子类去实现,比如我们在 lock 的时候,AQS 开放出 state 字段,让子类可以根据 state 字段来决定是否能够获得锁,对于获取不到锁的线程 AQS 会自动进行管理,无需子类锁关心

方法分层

AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。

CLH:Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。

独占模式下的流程共享模式下的流程

整体架构

20202251401

属性

// 这个状态用来判断是否可以获得锁,每次获得锁时+1,释放锁-1// 当子类继承AQS来实现锁时,要根据这个状态判断能否获得锁(为0才能获得)跟释放锁(为1才能释放)private volatile int state;// 同步队列的头与尾,底层是一个双向链表,用来阻塞获取不到锁的线程,并在适当时机释放这些线程private transient volatile Node head;private transient volatile Node tail;// 条件队列,管理获取不到锁的线程,但条件队列不直接和锁打交道,但常常和锁配合使用public class ConditionObject implements Condition, java.io.Serializable {    // Condition 可以用来代替 Object 中相应的监控方法    // 它提供了一种线程协作方式,并且都有明确语义        /** First node of condition queue. */    private transient Node firstWaiter;    /** Last node of condition queue. */    private transient Node lastWaiter;}static final class Node {    ...    volatile int waitStatus;    // 在共享锁中用来表示下一个等待线程,排它锁则用来表示当前节点是共享还是排它模式    Node nextWaiter;}

获取锁

// 排它模式下,尝试获得锁public final void acquire(int arg) {    // tryAcquire让子类实现的,思路一般是根据state的值决定是否能获取到锁    if (!tryAcquire(arg) &&        // 如果获取不到就调用addWaiter让线程进入同步队列,然后acquireQueued这个方法代表进入之后会阻塞,直到被唤醒获得锁        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        // 获取锁失败,打断自身        selfInterrupt();}// 追加到同步队列的队尾private Node addWaiter(Node mode) {    Node node = new Node(mode);            for (;;) {        Node oldTail = tail;        // 如果队尾不为空,就将node插入到队尾        if (oldTail != null) {            // 将原来队尾的node设置为新加入node的prev            node.setPrevRelaxed(oldTail);            // 原子方式将node设置为队尾            if (compareAndSetTail(oldTail, node)) {                oldTail.next = node;                return node;            }        // 队尾为空,需要初始化同步队列        } else {            initializeSyncQueue();        }    }}// 阻塞当前线程final boolean acquireQueued(final Node node, int arg) {    boolean interrupted = false;    try {        for (;;) {            final Node p = node.predecessor();            // 如果这个当前线程节点的前置节点是头节点,并且自己已经能获得锁了            if (p == head && tryAcquire(arg)) {                // 将自己设置为头节点                setHead(node);                p.next = null; // help GC                // 然后返回                return interrupted;            }            // 前一个节点状态为SIGNAL了,那么就阻塞自己(park)            if (shouldParkAfterFailedAcquire(p, node))                interrupted |= parkAndCheckInterrupt();        }    } catch (Throwable t) {        cancelAcquire(node);        if (interrupted)            selfInterrupt();        throw t;    }}
public final void acquireShared(int arg) {    // 同样由子类实现    if (tryAcquireShared(arg) < 0)        // 里面的这个方法主要做的是不断自旋直到获取到锁,当获取到锁之后,会通知排在它后面的节点        doAcquireShared(arg);}private void doAcquireShared(int arg) {    final Node node = addWaiter(Node.SHARED);    boolean interrupted = false;    try {        for (;;) {            final Node p = node.predecessor();            // 当前节点的前置节点如果是头节点            if (p == head) {                // 尝试获取锁                int r = tryAcquireShared(arg);                // 获取锁成功                if (r >= 0) {                    setHeadAndPropagate(node, r);                    p.next = null; // help GC                    return;                }            }            if (shouldParkAfterFailedAcquire(p, node))                interrupted |= parkAndCheckInterrupt();        }    } catch (Throwable t) {        cancelAcquire(node);        throw t;    } finally {        if (interrupted)            selfInterrupt();    }}

释放锁

public final boolean release(int arg) {    // 同样留给子类实现判断是否能释放锁    if (tryRelease(arg)) {        Node h = head;        // 后面有一些等待唤醒的节点        if (h != null && h.waitStatus != 0)            // 从头开始唤醒等待锁的节点            unparkSuccessor(h);        return true;    }    return false;}
public final boolean releaseShared(int arg) {    // 基本跟上面一样    if (tryReleaseShared(arg)) {        // 唤醒后面的线程        doReleaseShared();        return true;    }    return false;}

条件队列

获得锁的线程,如果在碰到队列满或空的时候,就会阻塞住,这个阻塞就是用条件队列实现的,这个动作我们叫做入条件队列

之前队列满了,有了一些线程因为 take 操作而被阻塞进条件队列中,突然队列中的元素被线程 A 消费了,线程 A 就会调用 signal 方法,唤醒之前阻塞的线程