同步器
Java 并发包中的同步器是一些用于协调多个线程执行的工具,用于实现线程之间的同步和互斥操作。这些同步器提供了不同的机制来控制线程的访问和执行顺序,以实现线程安全和并发控制。
1、Semaphore(信号量)
Semaphore 是 Java 并发包中的同步器之一,用于控制对临界区资源的访问数量。它允许多个线程同时访问临界区资源,但限制了同一时间内可以访问资源的线程数量。
Semaphore 维护一个许可证计数,线程可以获取和释放这些许可证。当许可证数量为零时,线程需要等待,直到其他线程释放许可证。
Semaphore 基本用法
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3); // 初始化信号量,允许同时访问的线程数量为3
// 创建多个线程来模拟访问临界区资源
for (int i = 1; i <= 5; i++) {
int threadId = i;
Thread thread = new Thread(() -> {
try {
semaphore.acquire(); // 获取许可证,如果没有许可证则阻塞
System.out.println("Thread " + threadId + " acquired a permit and is accessing the resource.");
Thread.sleep(2000); // 模拟访问临界区资源的耗时操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // 释放许可证
System.out.println("Thread " + threadId + " released the permit.");
}
});
thread.start();
}
}
}
运行结果:
Thread 1 acquired a permit and is accessing the resource.
Thread 3 acquired a permit and is accessing the resource.
Thread 2 acquired a permit and is accessing the resource.
Thread 2 released the permit.
Thread 4 acquired a permit and is accessing the resource.
Thread 5 acquired a permit and is accessing the resource.
Thread 3 released the permit.
Thread 1 released the permit.
Thread 4 released the permit.
Thread 5 released the permit.
在上述示例中,我们创建了一个 Semaphore 实例,并初始化许可证数量为 3。然后创建了多个线程,每个线程在获取许可证后访问临界区资源,模拟耗时操作后释放许可证。由于许可证数量有限,只有一部分线程能够同时访问资源,其他线程需要等待。
Semaphore 适用场景
- 有限资源的并发访问,如数据库连接池、线程池等。
- 控制对某个资源的同时访问数量,以避免资源竞争和过度消耗。
2、CountDownLatch
CountDownLatch 是 Java 并发包中的同步器之一,用于实现一种等待机制,允许一个或多个线程等待其他线程完成一组操作后再继续执行。
它通过维护一个计数器来实现等待和通知的机制。
在创建 CountDownLatch 时,需要指定初始计数值,每次调用 countDown() 方法会减少计数值,当计数值达到零时,等待的线程会被唤醒继续执行。
CountDownLatch 基本用法
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) {
int numberOfTasks = 3;
CountDownLatch latch = new CountDownLatch(numberOfTasks);
// 创建多个线程来模拟完成任务
for (int i = 1; i <= numberOfTasks; i++) {
int taskId = i;
Thread thread = new Thread(() -> {
try {
System.out.println("Task " + taskId + " is executing...");
Thread.sleep(2000); // 模拟任务执行耗时
System.out.println("Task " + taskId + " is completed.");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown(); // 完成任务后减少计数
}
});
thread.start();
}
try {
System.out.println("Main thread is waiting for tasks to complete...");
latch.await(); // 等待所有任务完成
System.out.println("All tasks are completed. Main thread continues.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果:
Task 1 is executing...
Main thread is waiting for tasks to complete...
Task 2 is executing...
Task 3 is executing...
Task 1 is completed.
Task 2 is completed.
Task 3 is completed.
All tasks are completed. Main thread continues.
在上述示例中,我们创建了一个 CountDownLatch 实例,并初始化计数值为 3。然后创建了多个线程来模拟完成任务,每个线程执行完任务后调用 countDown() 方法减少计数。主线程在执行 latch.await() 时等待计数值为零,等待所有任务完成后继续执行。
使用 CountDownLatch 可以实现多个线程之间的协调,确保某些操作在其他操作完成后再继续执行。
CountDownLatch 适用场景
- 主线程等待多个子线程完成任务后再继续执行。
- 等待多个线程完成初始化工作后再开始并行操作。
3、CyclicBarrier
CyclicBarrier 是 Java 并发包中的同步器之一,用于实现一组线程在达到一个共同点之前等待彼此,并在达到共同点后继续执行。它可以被重置并重新使用,适用于需要多个线程协同工作的场景。
CyclicBarrier 维护一个计数器和一个栅栏动作(barrier action)。当线程调用 await() 方法时,计数器减少,当计数器达到零时,所有等待的线程会被唤醒并继续执行,同时会执行栅栏动作。计数器可以被重置,并且可以设置栅栏动作,在达到共同点后执行。
CyclicBarrier 基本用法
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public static void main(String[] args) {
int numberOfThreads = 3;
Runnable barrierAction = () -> System.out.println("All threads reached the barrier!");
CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, barrierAction);
// 创建多个线程来模拟并行执行任务
for (int i = 1; i <= numberOfThreads; i++) {
int threadId = i;
Thread thread = new Thread(() -> {
try {
System.out.println("Thread " + threadId + " is performing its task.");
Thread.sleep(2000); // 模拟任务执行耗时
System.out.println("Thread " + threadId + " has reached the barrier.");
barrier.await(); // 等待其他线程达到栅栏点
System.out.println("Thread " + threadId + " continues after the barrier.");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
thread.start();
}
}
}
运行结果:
Thread 1 is performing its task.
Thread 3 is performing its task.
Thread 2 is performing its task.
Thread 2 has reached the barrier.
Thread 3 has reached the barrier.
Thread 1 has reached the barrier.
All threads reached the barrier!
Thread 1 continues after the barrier.
Thread 2 continues after the barrier.
Thread 3 continues after the barrier.
在上述示例中,我们创建了一个 CyclicBarrier 实例,初始化等待的线程数量为 3,并设置了栅栏动作。
然后创建多个线程,每个线程模拟执行任务后等待其他线程达到栅栏点,当所有线程都达到栅栏点时,栅栏动作会被执行。
使用 CyclicBarrier 可以实现多线程协同工作的场景,确保所有线程在某个共同点之前等待彼此,并在达到共同点后继续执行。
CyclicBarrier 计数器重置用法
package com.lf.java.basic.concurrent;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class MultipleCyclicBarrierExample {
public static void main(String[] args) {
int numberOfThreads = 3;
int numberOfRounds = 3;
Runnable barrierAction = () -> System.out.println("All threads reached the barrier!");
for (int round = 1; round <= numberOfRounds; round++) {
CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, barrierAction);
System.out.println("Round " + round + ": Starting tasks");
// 创建多个线程来模拟并行执行任务
for (int i = 1; i <= numberOfThreads; i++) {
int threadId = i;
int finalRound = round;
Thread thread = new Thread(() -> {
try {
System.out.println("Thread " + threadId + " is performing its task for Round " + finalRound);
Thread.sleep(2000); // 模拟任务执行耗时
System.out.println("Thread " + threadId + " has reached the barrier for Round " + finalRound);
barrier.await(); // 等待其他线程达到栅栏点
System.out.println("Thread " + threadId + " continues after the barrier for Round " + finalRound);
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
thread.start();
}
// 等待所有线程完成当前轮次的任务
try {
Thread.sleep(3000); // 等待一段时间以观察效果
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Round " + round + ": All tasks completed\n");
// 让当前轮次的所有线程都离开栅栏点,以便重新使用
barrier.reset();
}
}
}
运行结果:
Round 1: Starting tasks
Thread 1 is performing its task for Round 1
Thread 2 is performing its task for Round 1
Thread 3 is performing its task for Round 1
Thread 3 has reached the barrier for Round 1
Thread 2 has reached the barrier for Round 1
Thread 1 has reached the barrier for Round 1
All threads reached the barrier!
Thread 2 continues after the barrier for Round 1
Thread 1 continues after the barrier for Round 1
Thread 3 continues after the barrier for Round 1
Round 1: All tasks completed
Round 2: Starting tasks
Thread 1 is performing its task for Round 2
Thread 2 is performing its task for Round 2
Thread 3 is performing its task for Round 2
Thread 3 has reached the barrier for Round 2
Thread 2 has reached the barrier for Round 2
Thread 1 has reached the barrier for Round 2
All threads reached the barrier!
Thread 1 continues after the barrier for Round 2
Thread 3 continues after the barrier for Round 2
Thread 2 continues after the barrier for Round 2
Round 2: All tasks completed
Round 3: Starting tasks
Thread 1 is performing its task for Round 3
Thread 2 is performing its task for Round 3
Thread 3 is performing its task for Round 3
Thread 1 has reached the barrier for Round 3
Thread 2 has reached the barrier for Round 3
Thread 3 has reached the barrier for Round 3
All threads reached the barrier!
Thread 3 continues after the barrier for Round 3
Thread 1 continues after the barrier for Round 3
Thread 2 continues after the barrier for Round 3
Round 3: All tasks completed
在上述示例中,我们模拟了多轮任务协同。每一轮都创建一个新的 CyclicBarrier 实例,用于协调线程的等待和通知。在每一轮的任务完成后,我们使用 barrier.reset() 来重置计数器,以便进行下一轮的任务协同。
运行这个示例可以看到多轮任务协同的效果,每一轮的任务都会等待所有线程完成后再继续,然后重置计数器以准备下一轮。
CyclicBarrier 适用场景
- 将多个线程分成阶段进行,每个阶段需要等待其他线程完成后再继续。
- 并行计算中的分治操作,等待所有线程完成分治任务后进行合并计算。
4、Phaser
Phaser 是 Java 并发包中的同步器之一,它提供了更灵活的多阶段线程协调机制,适用于需要分阶段进行多个任务的并行执行和协调的场景。Phaser 可以用于更复杂的同步需求,例如循环的多阶段任务协同。
Phaser 维护了一个计数器和多个阶段(phase)。在每个阶段,线程可以注册、等待和注销,以及在某个阶段到达时执行特定的操作。
Phaser 基本用法
import java.util.concurrent.Phaser;
public class PhaserExample {
public static void main(String[] args) {
int numberOfThreads = 3;
int numberOfPhases = 3;
Phaser phaser = new Phaser(numberOfThreads) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("Phase " + phase + " completed.");
return phase == numberOfPhases - 1 || registeredParties == 0;
}
};
// 创建多个线程来模拟并行执行任务
for (int i = 0; i < numberOfThreads; i++) {
int threadId = i;
Thread thread = new Thread(() -> {
for (int phase = 0; phase < numberOfPhases; phase++) {
System.out.println("Thread " + threadId + " is in Phase " + phase);
phaser.arriveAndAwaitAdvance(); // 等待其他线程到达当前阶段
}
});
thread.start();
}
}
}
运行结果:
Thread 0 is in Phase 0
Thread 1 is in Phase 0
Thread 2 is in Phase 0
Phase 0 completed.
Thread 2 is in Phase 1
Thread 1 is in Phase 1
Thread 0 is in Phase 1
Phase 1 completed.
Thread 1 is in Phase 2
Thread 2 is in Phase 2
Thread 0 is in Phase 2
Phase 2 completed.
在上述示例中,我们创建了一个 Phaser 实例,设置初始注册线程数量为 3。然后,我们创建多个线程来模拟并行执行任务,每个线程都会在每个阶段调用 phaser.arriveAndAwaitAdvance() 等待其他线程到达当前阶段。当所有线程都到达后,onAdvance() 方法会被调用,用于执行阶段结束后的操作。
Phaser 提供了更灵活的多阶段协同机制,适用于需要多个阶段的任务协同和并行执行的场景。它还支持动态添加或删除等待线程,使其更适用于动态变化的并发需求。
Phaser 适用场景
- 需要分阶段执行的任务,每个阶段可以有不同的线程数。
- 需要动态添加或删除等待线程的场景。
5、ReentrantLock
ReentrantLock 是 Java 并发包中的同步器之一,它是一个可重入的互斥锁,提供了与 synchronized 关键字相似的功能,但更为灵活。与 synchronized 不同,ReentrantLock 具有更多的控制选项和功能,例如公平性、可中断性、超时等待等。
ReentrantLock 基本用法
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockExample {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
// 创建多个线程来模拟使用锁
for (int i = 1; i <= 5; i++) {
int threadId = i;
Thread thread = new Thread(() -> {
try {
lock.lock(); // 获取锁
System.out.println("Thread " + threadId + " acquired the lock.");
Thread.sleep(2000); // 模拟临界区操作耗时
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock(); // 释放锁
System.out.println("Thread " + threadId + " released the lock.");
}
});
thread.start();
}
}
}
运行结果:
Thread 1 acquired the lock.
Thread 1 released the lock.
Thread 2 acquired the lock.
Thread 2 released the lock.
Thread 3 acquired the lock.
Thread 3 released the lock.
Thread 4 acquired the lock.
Thread 4 released the lock.
Thread 5 acquired the lock.
Thread 5 released the lock.
在上述示例中,我们创建了一个 ReentrantLock 实例,并在多个线程中使用它来模拟对共享资源的访问。每个线程在访问资源前调用 lock.lock() 来获取锁,访问资源后调用 lock.unlock() 来释放锁。
需要注意的是,为了避免死锁,应该在 finally 块中释放锁,以确保无论是否发生异常,锁都会被释放。
ReentrantLock 还提供了其他方法,如 tryLock()(尝试获取锁,如果锁可用则获取,否则返回 false)、lockInterruptibly()(可中断的获取锁,可响应线程中断)等,使其更加灵活和强大。
ReentrantLock 中断获取锁用法
package com.lf.java.basic.concurrent;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class InterruptibleLockExample {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
// 创建线程尝试获取锁
Thread thread = new Thread(() -> {
try {
lock.lockInterruptibly(); // 可中断获取锁
System.out.println("Thread acquired the lock.");
Thread.sleep(5000); // 模拟临界区操作耗时
} catch (InterruptedException e) {
//中断唤醒线程
System.out.println("Thread interrupted while waiting for the lock.");
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock(); // 释放锁
System.out.println("Thread released the lock.");
}
}
});
// 启动线程
thread.start();
// 主线程等待一段时间后尝试中断线程
try {
Thread.sleep(2000);
System.out.println("Thread interrupt before");
thread.interrupt(); // 中断线程的等待
System.out.println("Thread interrupt after");
} catch (InterruptedException e) {
System.out.println("InterruptedException catch");
e.printStackTrace();
}
}
}
运行结果:
Thread acquired the lock.
Thread interrupt before
Thread interrupt after
Thread interrupted while waiting for the lock.
Thread released the lock.
在上述示例中,创建了一个线程尝试获取锁,但是主线程在启动线程后等待了一段时间后中断了该线程的等待。
由于我们使用了 lock.lockInterruptibly() 来获取锁,线程在等待锁的过程中可以响应中断,一旦被中断,它会抛出 InterruptedException,从而可以捕获中断事件并做出相应处理。
ReentrantLock 适用场景:
- 需要更精细的同步控制,例如在某些情况下需要手动释放锁。
- 需要可中断或超时等待的线程。
6、ReadWriteLock
ReadWriteLock 是 Java 并发包中的同步器之一,用于实现读写分离的锁机制,提供了更高效的并发访问控制。
它允许多个线程同时读取共享资源,但在写入资源时只允许一个线程进行,从而提高了并发性能。
ReadWriteLock 包含两种锁:读锁和写锁。
- 读锁(ReadLock):多个线程可以同时获取读锁,只要没有线程持有写锁。在没有写锁的情况下,多个线程可以并发读取共享资源,从而提高并发性能。
- 写锁(Write Lock):写锁是独占的,只有一个线程可以持有写锁。在一个线程持有写锁时,其他线程无法获取读锁或写锁,从而确保对共享资源的写操作是互斥的。
ReadWriteLock 基本用法
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockExample {
public static void main(String[] args) {
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// 创建多个读线程
for (int i = 1; i <= 5; i++) {
int threadId = i;
Thread thread = new Thread(() -> {
readWriteLock.readLock().lock(); // 获取读锁
try {
System.out.println("Thread " + threadId + " is reading.");
Thread.sleep(2000); // 模拟读取操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock(); // 释放读锁
}
});
thread.start();
}
// 创建一个写线程
Thread writeThread = new Thread(() -> {
readWriteLock.writeLock().lock(); // 获取写锁
try {
System.out.println("Write thread is writing.");
Thread.sleep(2000); // 模拟写入操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock(); // 释放写锁
}
});
writeThread.start();
}
}
运行结果:
Thread 1 is reading.
Thread 2 is reading.
Thread 4 is reading.
Thread 3 is reading.
Thread 5 is reading.
Write thread is writing
在上述示例中,我们创建了一个 ReadWriteLock 实例,然后创建多个读线程和一个写线程来模拟读写操作。
读线程在执行时调用 readWriteLock.readLock().lock() 来获取读锁,写线程在执行时调用 readWriteLock.writeLock().lock() 来获取写锁。
使用 ReadWriteLock 可以提高对共享资源的并发访问性能,适用于读操作频繁,写操作较少的场景。
ReadWriteLock 适用场景
- 读操作频繁,写操作较少的情况,以提高并发性能。
- 允许多个线程同时读取资源,但在写入资源时确保互斥。
7、 Condition
Condition 是 Java 并发包中的同步器之一,它提供了更灵活的线程等待和通知机制,用于在多线程环境下实现精细的线程协调。
Condition 是与 Lock 结合使用的,它可以替代传统的 wait() 和 notify() 方法,提供更多的控制选项和功能。
通过 Condition,我们可以实现更精确的等待和通知,以及更灵活的线程唤醒机制。
Condition 基本用法
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionExample {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
// 创建一个等待线程
Thread waitingThread = new Thread(() -> {
lock.lock();
try {
System.out.println("Waiting thread is waiting...");
condition.await(); // 等待条件满足
System.out.println("Waiting thread is awake.");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
});
// 创建一个唤醒线程
Thread signalingThread = new Thread(() -> {
lock.lock();
try {
Thread.sleep(2000); // 模拟等待一段时间
System.out.println("Signaling thread is awake.");
condition.signal(); // 唤醒等待线程
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
});
// 启动线程
waitingThread.start();
signalingThread.start();
}
}
运行结果:
Waiting thread is waiting...
Signaling thread is awake.
Waiting thread is awake.
在上述示例中,我们创建了一个 ReentrantLock 实例和一个与之关联的 Condition,然后创建了一个等待线程和一个唤醒线程。
等待线程在调用 condition.await() 后进入等待状态,直到唤醒线程调用 condition.signal() 来唤醒它。
通过使用 Condition,我们可以更加精确地控制线程的等待和通知,使线程协调更加灵活。
Condition 实现阻塞队列
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BlockingQueueWithCondition<T> {
private final Queue<T> queue = new LinkedList<>();
private final int capacity;
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public BlockingQueueWithCondition(int capacity) {
this.capacity = capacity;
}
public void put(T item) throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
notFull.await();
}
queue.offer(item);
notEmpty.signal();
} finally {
lock.unlock();
}
}
public T take() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await();
}
T item = queue.poll();
notFull.signal();
return item;
} finally {
lock.unlock();
}
}
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
BlockingQueueWithCondition<Integer> queue = new BlockingQueueWithCondition<>(5);
Thread producerThread = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
queue.put(i);
System.out.println("Produced: " + i);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread consumerThread = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
int item = queue.take();
System.out.println("Consumed: " + item);
Thread.sleep(1500);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producerThread.start();
consumerThread.start();
}
}
运行结果:
Produced: 1
Consumed: 1
Produced: 2
Consumed: 2
Produced: 3
Consumed: 3
Produced: 4
Produced: 5
Consumed: 4
Produced: 6
Consumed: 5
Produced: 7
Produced: 8
Consumed: 6
Produced: 9
Consumed: 7
Produced: 10
Consumed: 8
Consumed: 9
Consumed: 10
在上述示例中,我们使用 Condition 来实现了一个阻塞队列,其中 put() 方法用于向队列中放入元素,take() 方法用于从队列中取出元素。
当队列满时,生产者线程会等待 notFull 条件,当队列为空时,消费者线程会等待 notEmpty 条件。
这个示例展示了如何使用 Condition 来实现线程之间的协调,以及如何实现一个简单的阻塞队列。注意,这个示例并没有处理所有的边界情况和异常情况,实际使用时需要考虑更多细节。
8、BlockingQueue
BlockingQueue 是 Java 并发包中的一个接口,它提供了一种线程安全的队列实现,用于在多线程环境下进行数据的生产和消费。
BlockingQueue 支持阻塞操作,当队列满或空时,线程会被阻塞,直到条件满足。
BlockingQueue 提供了多种实现,包括:
- ArrayBlockingQueue:基于数组的有界阻塞队列。
- LinkedBlockingQueue:基于链表的可选有界阻塞队列。
- PriorityBlockingQueue:基于优先级的无界阻塞队列。
- DelayQueue:基于延迟时间的无界阻塞队列。
- SynchronousQueue:不存储元素的阻塞队列,用于直接传递数据。
- LinkedTransferQueue:基于链表的无界阻塞队列,结合了 LinkedBlockingQueue 和SynchronousQueue 特性。
- LinkedBlockingDeque:基于链表的双端阻塞队列。
BlockingQueue 基本用法
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueExample {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
Thread producerThread = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
queue.put(i);
System.out.println("Produced: " + i);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread consumerThread = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
int item = queue.take();
System.out.println("Consumed: " + item);
Thread.sleep(1500);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producerThread.start();
consumerThread.start();
}
}
运行结果:
Consumed: 1
Produced: 1
Produced: 2
Consumed: 2
Produced: 3
Consumed: 3
Produced: 4
Produced: 5
Consumed: 4
Produced: 6
Consumed: 5
Produced: 7
Produced: 8
Consumed: 6
Produced: 9
Consumed: 7
Produced: 10
Consumed: 8
Consumed: 9
Consumed: 10
在上述示例中,我们使用了 ArrayBlockingQueue 来实现阻塞队列,其中生产者线程使用 put() 方法向队列中放入元素,消费者线程使用 take() 方法从队列中取出元素。
当队列满或空时,线程会被阻塞,直到条件满足。
BlockingQueue 是实现线程安全的生产者-消费者模式的常用工具,它简化了线程之间的协调和通信。
9、BlockingDeque
BlockingDeque(阻塞双端队列)是 Java 并发包中的一个接口,它是 BlockingQueue 接口的扩展,提供了双端队列的功能,并支持阻塞操作。BlockingDeque 可以在队列的两端插入和删除元素,同时支持阻塞操作,使得在多线程环境下更容易实现数据的生产和消费。
BlockingDeque 接口的实现类包括:
- LinkedBlockingDeque:基于链表的阻塞双端队列,可选有界或无界。
- LinkedBlockingDeque:基于链表的双端阻塞队列,无界。
BlockingDeque基本用法
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
public class BlockingDequeExample {
public static void main(String[] args) {
BlockingDeque<Integer> deque = new LinkedBlockingDeque<>(5);
Thread producerThread = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
deque.put(i);
System.out.println("Produced: " + i);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread consumerThread = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
int item = deque.take();
System.out.println("Consumed: " + item);
Thread.sleep(1500);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producerThread.start();
consumerThread.start();
}
}
运行结果:
Produced: 1
Consumed: 1
Produced: 2
Consumed: 2
Produced: 3
Consumed: 3
Produced: 4
Produced: 5
Consumed: 4
Produced: 6
Consumed: 5
Produced: 7
Produced: 8
Consumed: 6
Produced: 9
Consumed: 7
Produced: 10
Consumed: 8
Consumed: 9
Consumed: 10
在上述示例中,我们使用了 LinkedBlockingDeque 来实现阻塞双端队列,生产者线程使用 put() 方法向队列中放入元素,消费者线程使用 take() 方法从队列中取出元素。
与 BlockingQueue 类似,当队列满或空时,线程会被阻塞,直到条件满足。
BlockingDeque 可以更灵活地实现在队列两端插入和删除元素,适用于更多种类的场景,例如双向数据传输和窗口滑动等。
以下是一些常用的在队列两端插入和删除元素的方法:
- 在队列头部插入元素:
void addFirst(E e): 将元素添加到队列的头部,如果队列已满,则抛出异常。
boolean offerFirst(E e): 将元素添加到队列的头部,如果队列已满,则返回 false。
void putFirst(E e): 将元素添加到队列的头部,如果队列已满,则阻塞等待直到有空间。
- 在队列尾部插入元素:
void addLast(E e):将元素添加到队列的尾部,如果队列已满,则抛出异常。
boolean offerLast(E e):将元素添加到队列的尾部,如果队列已满,则返回 false。
void putLast(E e):将元素添加到队列的尾部,如果队列已满,则阻塞等待直到有空间。
- 从队列头部删除元素:
E removeFirst(): 移除并返回队列头部的元素,如果队列为空,则抛出异常。
E pollFirst(): 移除并返回队列头部的元素,如果队列为空,则返回 null。
E takeFirst(): 移除并返回队列头部的元素,如果队列为空,则阻塞等待直到有元素。
- 从队列尾部删除元素:
E removeLast():移除并返回队列尾部的元素,如果队列为空,则抛出异常。
E pollLast(): 移除并返回队列尾部的元素,如果队列为空,则返回 null。
E takeLast(): 移除并返回队列尾部的元素,如果队列为空,则阻塞等待直到有元素。
这些方法使得你可以在双端队列的头部和尾部执行插入和删除操作,根据具体的需求选择合适的方法来实现线程安全的双端队列操作。
10、LockSupport
LockSupport 是 Java 并发包中提供的工具类,用于线程的阻塞和唤醒操作。
它提供了一种基于许可(permit)的方式来控制线程的阻塞和唤醒,相对于传统的 wait() 和 notify() 方法,LockSupport 更加灵活和可靠。
主要的方法包括:
-
void park():阻塞当前线程,直到获得许可。
-
void park(Object blocker):阻塞当前线程,并将 blocker 关联到当前线程,用于监控和诊断工具。
-
void parkNanos(long nanos):阻塞当前线程,最多等待指定的纳秒数,直到获得许可。
-
void parkNanos(Object blocker, long nanos):阻塞当前线程,并将 blocker
关联到当前线程,最多等待指定的纳秒数。 -
void parkUntil(long deadline):阻塞当前线程,直到指定的时间戳,直到获得许可。
-
void parkUntil(Object blocker, long deadline):阻塞当前线程,并将 blocker
关联到当前线程,直到指定的时间戳。 -
void unpark(Thread thread):唤醒指定的线程,如果线程被阻塞,则解除阻塞状态。
LockSupport基本用法
import java.util.concurrent.locks.LockSupport;
public class LockSupportExample {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
System.out.println("Thread is going to be parked.");
LockSupport.park(); // 阻塞当前线程
System.out.println("Thread is unparked.");
});
thread.start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Main thread is unparking the parked thread.");
LockSupport.unpark(thread); // 唤醒被阻塞的线程
}
}
运行结果:
Thread is going to be parked.
Main thread is unparking the parked thread.
Thread is unparked.
在上述示例中,我们创建了一个新线程,调用了 LockSupport.park() 来阻塞该线程。
然后,主线程等待 2 秒后,调用了 LockSupport.unpark(thread) 来唤醒被阻塞的线程。与传统的 wait() 和 notify() 方法不同,LockSupport 是基于许可的,不需要获取某个特定对象的锁来进行阻塞和唤醒操作。
LockSupport 提供了一种更直接、灵活和可控的线程阻塞和唤醒机制,适用于各种多线程协调的场景。
11、Exchanger
Exchanger 是 Java 并发包中的同步器之一,用于实现两个线程之间交换数据。
它提供了一个同步点,当两个线程都到达这个同步点时,它们可以交换数据。Exchanger 可以用于实现线程间的数据传递和协作
Exchanger 提供了两个线程之间交换数据的功能,但仅限于两个线程。当两个线程都到达 Exchanger 同步点时,它们可以通过 exchange() 方法交换数据,然后各自继续执行。
Exchanger基本用法
import java.util.concurrent.Exchanger;
public class ExchangerExample {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
// 创建一个线程来发送数据
Thread senderThread = new Thread(() -> {
try {
String dataToSend = "Hello from Sender";
System.out.println("Sender is sending: " + dataToSend);
exchanger.exchange(dataToSend); // 发送数据并等待接收数据
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 创建一个线程来接收数据
Thread receiverThread = new Thread(() -> {
try {
String receivedData = exchanger.exchange(null); // 等待接收数据并发送数据
System.out.println("Receiver received: " + receivedData);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 启动线程
senderThread.start();
receiverThread.start();
}
}
运行结果
Sender is sending: Hello from Sender
Receiver received: Hello from Sender
在上述示例中,我们创建了一个 Exchanger 实例,然后创建了一个发送数据的线程和一个接收数据的线程。
当发送数据的线程调用 exchange() 方法时,它会发送数据并等待接收数据;而接收数据的线程调用 exchange() 方法时,它会等待接收数据并发送数据。当两个线程都到达 Exchanger 同步点时,它们会交换数据,并继续执行。
需要注意的是,Exchanger 只适用于两个线程之间的数据交换。如果需要更多线程之间的数据交换,可能需要组合使用多个 Exchanger。