Skip to main content
DelayQueue

使用

public class DelayQueueMain {
    public static void main(String[] args) {
        var queue = new DelayQueue<DelayedUser>();
        var executorService = Executors.newFixedThreadPool(2);

        executorService.execute(() -> {
            for (int i = 0; i < 3; i++) {
                var delayedUser = new DelayedUser("name-" + i, new Random().nextInt(2000));
                queue.put(delayedUser);
                System.out.println(Thread.currentThread().getName() + " put user: " + delayedUser.name() + ", avaibleTime: " + delayedUser.avaibleTime());
                sleep(1);
            }
        });

        executorService.execute(() -> {
            for (int i = 0; i < 3; i++) {
                try {
                    var delayedUser = queue.take();
                    System.out.println(Thread.currentThread().getName() + " take user: " + delayedUser.name() + ", avaibleTime: " + delayedUser.avaibleTime());
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        executorService.shutdown();
    }

    static void sleep(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

record DelayedUser(String name, long avaibleTime) implements Delayed {

    DelayedUser(String name, long avaibleTime) {
        this.name = name;
        this.avaibleTime = System.currentTimeMillis() + avaibleTime;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(avaibleTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return (int) (this.avaibleTime - ((DelayedUser) o).avaibleTime);
    }
}

huhxLess than 1 minutejavaConcurrency-CollectionConcurrency
LongAdder

huhxLess than 1 minutejavaConcurrency-ToolkitConcurrency
原子类 Atomic

Atomic翻译成中文是原子的意思,是指一个操作是不可中断的。在多个线程一起执行的时候,能够保证一个操作一旦开始,就不会被其他线程干扰。今天我们就来学习下java中自带的一些原子类。

使用

原子类都存放在java.util.concurrent.atomic下,这里以 AtomicInteger 为例子来介绍

AtomicInteger 类常用方法



huhxAbout 1 minjavaConcurrency-CollectionConcurrency
分而治之Fork/Join

在多核处理器时代,

使用

public class ForkJoinPoolMain {
    public static void main(String[] args) {
        var forkJoinPool = new ForkJoinPool();
        var sumTask = new SumTask(0, 100000000);
        forkJoinPool.submit(sumTask);
        
        System.out.println(sumTask.join());
    }
}

class SumTask extends RecursiveTask<Long> {
    private final static int TASK_THRESHOLD = 10000;
    private final int start;
    private final int end;

    public SumTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start < TASK_THRESHOLD) {
            return LongStream.range(start, end).sum();
        } else {
            int middle = start + (end - start) / 2;
            var sumTask1 = new SumTask(start, middle);
            var sumTask2 = new SumTask(middle, end);

            return sumTask1.fork().join() + sumTask2.fork().join();
        }
    }
}

huhxLess than 1 minutejavaConcurrency-ToolkitConcurrency
StampedLock

我们知道ReadWriteLock可以实现读写分离,但是读写之间仍旧是需要同步的。当有大量的读线程,那么也会造成写线程的长时间阻塞引发饥饿的问题。有没有一种锁可以针对这种场景做些优化呢?今天我们就来看下这个作为读写锁的升级版:StampedLock

使用

StampedLock提供了一种乐观的读策略。这种乐观的锁非常类似无锁的操作,使得乐观锁完全不会阻塞写线程。

分析

StampedLock的内部实现是基于CLH锁的。CLH锁是一种自旋锁,它保证没有饥饿发生,并且可以保证FIFO的服务顺序。


huhxAbout 1 minjavaConcurrency-ToolkitConcurrency