1.阻塞队列
组成:
ArrayBlockingQueue: 底层是数组,元素是有限的.
LinkedBlockingDeque: 底层是链表,元素是无限的.(并不是真正的无限,最大是int的最大值)
方法:
public void put(E e); //存入元素.该方法是阻塞的.当容器中满了的时候,该方法就会停止在那里,等待着容器空
public E take(); //获取元素.该方法是阻塞的.当容器中空的时候,该方法会停止在那里,等待着容器中存入元素
import java.util.concurrent.ArrayBlockingQueue;
public class Demo {
public static void main(String[] args) {
// 创建阻塞队列的对象,容量为 1
ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(1);
Foodie foodie = new Foodie(arrayBlockingQueue);
Cooker cooker = new Cooker(arrayBlockingQueue);
foodie.start();
cooker.start();
}
}
import java.util.concurrent.ArrayBlockingQueue;
public class Cooker extends Thread {
private ArrayBlockingQueue<String> bd;
public Cooker(ArrayBlockingQueue<String> bd) {
this.bd = bd;
}
@Override
public void run() {
while (true) {
try {
bd.put("汉堡包");
System.out.println("厨师放入一个汉堡包");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
import java.util.concurrent.ArrayBlockingQueue;
public class Foodie extends Thread{
private ArrayBlockingQueue<String> bd;
public Foodie(ArrayBlockingQueue<String> bd) {
this.bd = bd;
}
@Override
public void run() {
while (true) {
try {
String take = bd.take();
System.out.println("吃货将" + take +"拿出来吃了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
2.线程池:
概述:
线程池,就是存储线程的池子.线程池可以创建和回收程池.
线程:由计算机分配的.
申请线程的过程,是一个非常耗时的操作.
线程本身是需要占用系统资源的. 创建:
ExecutorService es = Executors.newCachedThreadPool(); //创建一个默认线程池.(内部最多有int最大值个线程)
ExecutorService es = Executors.newFixedThreadPool(int num); //创建一个线程池.(内部线程最大数量为num)
使用:
submit(Runnable task); //提交任务.(把要执行的任务交给线程池,由线程池分配线程执行)
shutdown(); //关闭线程池
提高效率,节省资源
例子:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestDemo {
public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool();
es.submit(new MyRunnable());
es.submit(new Thread(){
@Override
public void run() {
super.run();
}
});
es.shutdown();
}
}
class MyRunnable implements Runnable{
@Override
public void run() {
for (int i = 0; i < 50; i++) {
System.out.println(Thread.currentThread().getName()+":"+i);
}
}
}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MyThreadPoolDemo2 {
public static void main(String[] args) {
//创建线程池 最多有10个线程
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.submit(()->{
System.out.println(Thread.currentThread().getName() + "在执行了");
});
executorService.submit(()->{
System.out.println(Thread.currentThread().getName() + "在执行了");
});
//关闭线程池
executorService.shutdown();
}
}
3.ThreadPoolExecutor(了解)
概述:
用于自定义线程池.
构造:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
参数:
corePoolSize: //核心线程的最大值,不能小于0
maximumPoolSize://最大线程数,不能小于等于0,maximumPoolSize >= corePoolSize
keepAliveTime: //空闲线程最大存活时间,不能小于0
unit: //空闲线程最大存活时间的时间单位,一般使用TimeUtil选项
workQueue: //任务队列(排队的数量),不能为null
threadFactory: //创建线程方式,一般为"Executors.defaultThreadFactory()",不能为null
handler: //任务的拒绝策略(多余任务的处理方案),不能为null
拒绝策略:
ThreadPoolExecutor.AbortPolicy: //丢弃任务并抛出RejectedExecutionException异常。是默认的策略。
ThreadPoolExecutor.DiscardPolicy: //丢弃任务,但是不抛出异常 这是不推荐的做法。
ThreadPoolExecutor.DiscardOldestPolicy://抛弃队列中等待最久的任务 然后把当前任务加入队列中。
ThreadPoolExecutor.CallerRunsPolicy: //调用任务的run()方法绕过线程池直接执行。
练习:
public class MyThreadPoolDemo3 {
public static void main(String[] args) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(2,
3,2, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
//调用线程任务
pool.submit(new MyRunnable());
pool.submit(new MyRunnable());
pool.shutdown();
/*
pool-1-thread-1在执行
pool-1-thread-2在执行
*/
}
}
4.可见性问题
概述:
多线程之间共享数据,在A线程修改数据的情况下,B没有看到A修改后的最新数据,这种现象就是"多线程的可见性问题".
解决:
1:使用volatile关键字,修饰该数据.
public class Test02 {
public volatile static int money = 10000;
public static void main(String[] args) {
Girl girl = new Girl();
girl.setName("小路同学");
girl.start();
Boy boy = new Boy();
boy.setName("小张同学");
boy.start();
}
}
class Boy extends Thread{
@Override
public void run() {
//等待100ms
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//修改money
Test02.money = 9000;
}
}
class Girl extends Thread{
@Override
public void run() {
while(Test02.money == 10000){
}
System.out.println("money不是一万了");
}
}
2:可以使用synchronized加锁解决.
public class Test01 {
public static void main(String[] args) {
Boy boy = new Boy();
boy.setName("小张同学");
boy.start();
Girl girl = new Girl();
girl.setName("小路同学");
girl.start();
}
}
class Money{
public static int money = 10000;
//定义一个锁对象
public static Object lock = new Object();
}
class Boy extends Thread{
@Override
public void run() {
//等待100ms
synchronized (Money.lock) {
try {
Thread.sleep(100);
//修改money
Money.money = 9000;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Girl extends Thread{
@Override
public void run() {
while (true){
synchronized (Money.lock){
if(Money.money != 10000){
System.out.println("money不是一万了");
break;
}
}
}
}
}
5.原子性问题
概述:
原子性指的是"多个操作,要么一起成功,要么一起失败".
多线程中的原子性问题,指的是:多个多线操作共同处理数据,并且出现了数据丢失的这种现象,就叫做"多线程的原子性问题"
解决:
使用原子性安全的类.
10000+10000 可能会出现不等于两万
public class Test {
public static AtomicInteger count= new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
MyThread1 myThread1 = new MyThread1();
MyThread2 myThread2 = new MyThread2();
myThread1.start();
myThread2.start();
Thread.sleep(2000);
System.out.println(Test.count);
}
}
class MyThread1 extends Thread {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
Test.count.addAndGet(1);
}
}
}
class MyThread2 extends Thread {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
Test.count.addAndGet(1);
}
}
}
6.AtomicInteger
概述: 是一个原子性的Integer类.内部的操作可以保证多线程的原子性.
构造:
public AtomicInteger(); //默认值为0
public AtomicInteger(int num); //默认值为num
方法:
public int addAndGet(int delta); //添加指定的值(可正可负),并返回添加后的值
public int get(); //添加指定的值(可正可负),并返回添加后的值
public class AtomicIntegerTest {
public static void main(String[] args) {
// public AtomicInteger(); 默认值为0
AtomicInteger ac1 = new AtomicInteger();
System.out.println(ac1.get());//0
// public AtomicInteger(int num); 默认值为num
AtomicInteger ac2 = new AtomicInteger(10);
System.out.println(ac2.get());//10
//getAndIncrement 原子当前值加1 返回自增前的值
AtomicInteger ac3 = new AtomicInteger(10);
int andIncrement = ac3.getAndIncrement();
System.out.println(andIncrement);//10
System.out.println(ac3.get());//11
AtomicInteger ac4 = new AtomicInteger(10);
int i = ac3.incrementAndGet();
System.out.println(i);//自增后的值 11
System.out.println(ac4.get());
// public int addAndGet(int delta); 添加指定的值(可正可负),并返回添加后的值
AtomicInteger ac5 = new AtomicInteger(10);
int a = ac5.addAndGet(20);
System.out.println(a);//30
System.out.println(ac5.get());//30
//getAndSet
AtomicInteger ac6 = new AtomicInteger(10);
int andSet = ac6.getAndSet(20);
System.out.println(andSet);//10
System.out.println(ac6.get());//20
// public int get(); 添加指定的值(可正可负),并返回添加后的值
}
}
7.并发包
概述: 并发包就是一些线程安全的集合类.
分类:
ConcurrentHashMap //线程安全的HsahMap
CopyOnWriteArrayList//线程安全的ArrayList
CopyOnWriteArraySet //线程安全的HsahSet
public class ConcurrentHashMapTest {
public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap<String,String> ch = new ConcurrentHashMap();
Thread t1 = new Thread(()->{
for (int i = 0; i < 25; i++) {
ch.put(i+"",i+"");
}
});
Thread t2 = new Thread(()->{
for (int i = 25; i < 51; i++) {
ch.put(i+"",i+"");
}
});
t1.start();
t2.start();
System.out.println("--------------------");
//为了让t1和t2有足够的时间
Thread.sleep(1000);
for (int i = 0; i < 51; i++) {
System.out.println(ch.get(i+""));
}
}
}
8.CountDownLatch
概述: 在多线程情况下,可以控制线程的执行顺序.确定某个线程在其他线程执行之后再执行.
构造:
public CountDownLatch(int num); //创建一个对象,指定一个计数器
方法:
public void await(); //当前线程等待,当计数器归零的时候,自动唤醒等待的线程
public void countDown();//计数器-1
public class CountDownLatchTest {
public static void main(String[] args) {
//创建CountDownLatch对象
CountDownLatch countDownLatch = new CountDownLatch(3);
//创建4个线程对象并开启
Mother mother = new Mother(countDownLatch);
mother.start();
Child1 c1 = new Child1(countDownLatch);
c1.setName("小美");
Child2 c2 = new Child2(countDownLatch);
c2.setName("小在");
Child3 c3 = new Child3(countDownLatch);
c3.setName("小分");
c1.start();
c2.start();
c3.start();
}
}
class Mother extends Thread{
private CountDownLatch countDownLatch;
public Mother(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
//等待
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("收拾碗筷");
}
}
9.Semaphore
概述: 可以控制"同时运行的线程的数量".
构造:
public Semaphore(int num); //最多允许同时运行num个线程(可以少,但不可以多)
方法:
void acquire(); //获取"许可证",运行线程运行
void release(); //释放"许可证"
public class SemaphoreTest {
public static void main(String[] args) {
//创建对象
MyRunnable myRunnable = new MyRunnable();
for (int i = 0; i < 10; i++) {
new Thread(myRunnable).start();
}
}
}
class MyRunnable implements Runnable {
//获得管理员对象
private Semaphore semaphore = new Semaphore(2);
@Override
public void run() {
//获得通行证
try {
semaphore.acquire();
//开始行驶
System.out.println("获得了通行证开始行驶");
Thread.sleep(2000);
System.out.println("归还通行证");
//归还通行证
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}