响应式的锁

由于响应式技术栈基本都是基于eventloop的,而对于当前(2022年5月18日)java的并发模型都是基于内核线程进行建模的,多个请求并发跑在同一个线程上导致了synchronized 和J.U.C包的锁不再适用

而并发中锁又是必须的,那么我们该怎么办?

kotlin 协程锁

以当前eventloop作为调度器驱动多个无栈协程,为每一个请求分配一个协程即可

本质上这种并不会阻塞住当前的内核线程,而是”阻塞“当前协程,实际上就是当前协程通过cas抢到锁之后来处理自己的业务,此时其他抢不到的阻塞等待调度,锁持有者协程放锁时恢复等待的协程执行

实际原理很简单:

cas看看能不能拿到 不能拿到把当前的continuation封装到waiter里面 等待resume

而unlock方法则是取出第一个等待的waiter节点,然后恢复其中的continuation执行就行了

vertx的异步锁

实际上就是通过 ConcurrentMap#compute的原子特性直接加入等待队列

若发现当前队列没有竞争者则直接获取到锁,触发future回调

而释放锁的release则是这样的

    @Override
    public void release() {
      if (invoked.compareAndSet(false, true)) {
        nextWaiter(lockName);
      }
    }
private void nextWaiter(String lockName) {
    List<LockWaiter> waiters = waitersMap.compute(lockName, (s, list) -> {
      return list == null || list.size() == 1 ? null : new ArrayList<>(list.subList(1, list.size()));
    });
    if (waiters != null) {
      waiters.get(0).acquireLock();
    }
  }

直接让下一个等待者获取到锁即可

java的loom

如果我还是想用juc这种lock怎么办?

那么我们就可以使用project loom(jdk19将preview),将当前的eventloop或者其他的线程池作为有栈协程的调度器,由于loom带来的virtualthread(协程)是Thread的子类,且LockSupport类也对其有适配,阻塞虚拟线程而不是内核线程

val router = Router.router(vertx)
val lock = ReentrantLock()
  router.get()
    .handler{
      Thread.startVirtualThread {
        lock.lock()
        try {
          //做你要的处理
        }finally {
            lock.unlock()
        }
      }
    }

然后我们来看看LockSupport#park方法 相当于从当前的内核线程上卸载这个协程 然后换一个协程来执行

 public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        try {
            if (t.isVirtual()) {
                VirtualThreads.park();
            } else {
                U.park(false, 0L);
            }
        } finally {
            setBlocker(t, null);
        }
    }
private void doPark() {
        // complete immediately if parking permit available or interrupted
        if (getAndSetParkPermit(false) || interrupted)
            return;

        // park the thread
        setState(PARKING);
        try {
            if (!yieldContinuation()) {
                // park on the carrier thread when pinned
                parkOnCarrierThread(false, 0);
            }
        } finally {
            assert (Thread.currentThread() == this) && (state() == RUNNING);
        }
    }
private boolean yieldContinuation() {
        boolean notifyJvmti = notifyJvmtiEvents;

        // unmount
        if (notifyJvmti) notifyJvmtiUnmountBegin(false);
        unmount();
        try {
            return Continuation.yield(VTHREAD_SCOPE);
        } finally {
            // re-mount
            mount();
            if (notifyJvmti) notifyJvmtiMountEnd(false);
        }
    }

Last updated