进程线程协程
1.进程 QQ.exe
2.线程
- 三种启动线程的方式
继承Thread
实现Runnable
线程池启动
3.sleep yield jion
- 调用yield的线程,进入到就绪等待状态,等待再次执行
- B 线程等待A 线程直接结束后执行
- t1 调 t2 的执行流程
线程状态
(1)new
(2)runnable(ready–running 调度器是线程从ready到running)
(3)Teminated
(4)TimedWaiting (sleep wait join LockSupport.parkNanos LockSupport.partUntil 到一定时间回到running)
(5)Waiting(wait join LockSupport.part)(notify noifyall LockSupport.unpart 结束等待回到running)
(6)Blocked(阻塞) sync没有获得锁
- new ready running teminated TimedWaiting(等下又时间限制) Waiting(一直等) Blocked
- runnable(ready->running) TimedWaiting,Waiting,Blocked
- runnable(runnging-ready) yield
怎么关闭线程
- 不要关闭线程,要让线程正常结束)
- 正常结束
- 不建议使用stop
Interrupt
- 在 一个线程中sleep很久, 可以通过Interrupt被打断,通过这个异常进行后续的处理
synchronized 锁sync
给某个对象上锁,线程获得锁以后可以执行对应的代码
锁定当前对象,拿到锁才可以执行对应的代码
静态方法的synchronized
-每一个class文件例如 T,加载到内存中,会生成T的对象,对内存中的代码对应,静态方法锁的对象,就是这个生成的对象,如上
synchronized 是可重入锁
- (面试题)一个同步方法可以调用另外一个同步方法 一个已经拥有的锁的线程 再次申请任然可以得到这把锁
通过父子继承关系 解释 synchronized 是可重入性
程序产生异常 锁会释放
synchronize的底层实现
JDK 早期 重量级实现 找os 申请锁
后来改进
锁升级的概念
jdk 1.5 synchronsize
synchronized(Object)
markword 在objects上 记录这个线程的id (偏向锁)
如果有线程征用,升级为自旋锁
10次以后
升级为重量级锁 - os
偏向锁 -》 自旋锁 -》 重量级锁
执行时间短(极速代码)线程数少 用自旋锁
执行时间长 线程数多、用系统锁
volatile
- 保持线程可见性
堆内存 所有线程共享的内存
每个线程有自己的工作内存
如果要操作公共区域的某个值 需要拷贝到自己的内存中 修改后写会公共区域
值发生改变 如果不加volatile 其他是不知道的 如果加上 其他线程就会知道 值发生改变
** volatile 一个线程改变这个值,另一个线程就能看到 **
volatile 保持线程的可见性
- 禁止指令重排序
现代cpu 执行指令 并发执行指令 b=4 a=3 b赋值为5
如果值发生改变 不对原有的指令执行顺序进行重新排序
单例模式 双重检查后 是否需要加volatile 需要 就是禁止指令重排序
new A() 1.在堆内存中开辟空间 2.成员变量初始化并赋值 3. 将内存内容赋值给
如果指令重排序 就会打乱原来指令执行的步骤 另外一个线程 拿到的值并不是最后赋值后的值 加入 volalite 不会打乱指令执行顺序
- 不能保证原子性 ( volatile 不能取代synchronize)
以上程序只有给m方法加上 synchrinize 才会执行正常
synchronize 优化
锁 粗化 (锁太多 代码块加大)
锁 细化 (精确到某几行)
锁对象的改变
锁定的对象o的属性 如果发生改变不影响锁 如果哦变成另外一个对象,那么一定会产生问题
- 在锁对象上加final
- 不要用String 类型作为锁
CAS(无锁优化 自旋 乐观锁)
AtomicInteger 线程安全
AtomicInteger 原理
内部通过unSafe 中的CompareAndSet (cas 操作实现的)
Compare And Set
cas( 要改的值v,期望值E,新值){
if V==E
V=New
otherwise try again or fail
CPU 原语支持 不能被打断
}
ABA 问题 (解决方案加version)
1 => 2 => 1
从1到2 2到1
cvs(o ,1,2)
解决方式加版本
int 不会与问题 如果是对象 就会有问题
unSafe
- 直接操作jvm中的内存 类似于 C C++ 直接操作内存的能力
- freeMemory
- allocateMenory
synchronize ++
AtomicLong ++(乐观锁 cas)
LongAdd ++ (内部分段锁 分段所里面又是cas操作)
ReentrantLock (cas) 可重入锁
- synchronize 可重入
- ReentrantLock 可以替代synchronize 但是必须手动解锁
ReentrantLock tryLock
ReentrantLock LockInterruptibly
ReentrantLock
- 公平锁 线程到来以后,看看等待队列中是否有其他等待抢锁的,排在后面,不直接去抢
- 不公平 第一个线程一直快速抢lock
- 公平锁
CountDownLatch
-
设置门栓值 使用 countdown 知道为零 latch.await以后的代码才能执行
-
也可使使用join 等待所有线程执行完成 再执行后续
CyclicBarrier
- 应用场景
并发执行 操作网络 文件 数据库
这三个 操作都并发完成了以后 才执行后续操作 可以使用 cyclicBarrier
Phaser 阶段
- 遗传算法可能使用 略
ReadWriteLock 读写锁 (共享锁 排它锁)
- 使用ReentrantLock
– 使用ReentrantReadWriteLock
- 读写锁可以使得所有读的操作同时进行,提高效率
Semaphore 信号灯
- 可以用来处理限流
exchangeer 交换
LockSupport
- 让当前线程阻塞 再唤醒继续执行
unpark() 可以在 park() 前调用
面试题
实现一个容器 提供 add size 方法
写两个线程 线程1 添加10个数据到容器中 线程2 监控元素的个数 当个数到5 线程2给出提示并结束
-
方法一
-
对象.wait() 对象.notify() 客户唤醒 但是notify 并不释放锁
-
怎么解决不释放锁的问题 在 notify 以后 紧接着自己wait 才能释放锁
-
方法二 countdownlatch 双门栓
- 方法三 LockSupport 两个LockSupport
面试题
设计一个固定的同步容器 有put get 方法以及getCount 方法
能够支持2个生产者线程和10个消费者线程
使用wait notify /notifyall
使用 ReentrantLock condition 精确唤醒消费者或者生产者
condition 本质是等待队列
AQS及源码阅读技巧
ReentrantLock Sync(NonFieSync) AQS(abstractQueuesSynchronizer)
lock acquire(1) acquire(1)
tryAcquire(1) 重写 tryAcquire(1)
nonfairTryAcquire 非公平
AQS (结构)核心是Volatile+CAS
1.volatile 修改 state 0 1 代表了 是否加锁
2.queue 是双向链表,存放的是Thread,
3.采用CAS的方式,控制末尾节点的多线程插入链表数据
4.内个节点都判断前一个阶段是否是第一个节点,第一个节点是锁的抢夺位置,如果判断前面的节点释放了锁,自己成为头结点
5.lock 流程 首先尝试抢锁,如果有人占用就进入队列,在队列里尝试获得锁,(判断前一个节点是否是第一个节点,是否拿到了锁,等待头结点释放锁,自己成为头结点)
VarHandle (JDK 1.9)
- VarHandle 也是指向对象的另一个引用,目的就是为了可以直接读写,关键是可以进行原子性的修改值
- 例如CompareAndSet getAndADD
- 除了可以进行不同操作,还可以完成一些原子性的线程安全的操作
ThreadLocal
- ThreadLocal set的值是每个线程独有的,其他线程无法使用
ThreadLocal 源码
- ThreadLocal 调用set 方法的时候,其实是将存入的对象和该线程以键值对的形式,存入一个Map,Map<ThreadLocal,Object>,而这个map是在,currentThread 中维护的,也就是当前调用的线层
- Thread.currentThread.map<ThreadLocal,Object>
为什么要用ThreadLocal
Spring 声明式事务 Spring+Mybits 每个连接从数据库连接池中获取Connection,为了保证同一个Connection,这时候就要将连接对象,存在ThreadLocal的当前线程的Map中,下次直接从这里面拿
强软弱虚
- 强 引用不为空 gc 不回收
-
软 一个对象,只有软引用指向它,内存不够用的时候,gc 才回收
-
设置堆内存20M
-
软引用10M gc 不会被回收
-
强13M 堆内存不够用 就会回收
-
弱 遇到gc 就会被回收 如果还有强引用指向他 只要强引用释放 它就应该被回收
- ThreadLocal tl = new ThreadLocal();
- tl 强引用指向ThreadLocal对象
- tl set() 实际是存在Thread 中threadLocals(Map)
- threadLocals(Map) <Threadlocal,obejct>
- Entry 是Threadlocal
-
ThreadLocal 使用必须完毕remove() 否则会导致内存泄漏
-
虚 (管理堆外内存)
容器
- 容器(Collection Map 数组)
- 物理存储(连续列表存储 链表存储)
jdk 早期设计 vector 和 hashtable 目前基本上不使用了
Vector HashTable 自带锁
Hashtable>Hashmap>synchorbizedHashMap>ConcurrentHashMap
ConcurrentHashMap 插入效率低 读取效率高
public class T01_TestConnHashMap {
static Map<UUID,UUID> m = new ConcurrentHashMap<UUID,UUID>();
static int count = 1000000;
static UUID[] keys = new UUID[count];
static UUID[] values = new UUID[count];
static int THREAD_COUNT = 100;
static {
for (int i = 0; i < count; i++) {
keys[i] = UUID.randomUUID();
values[i] = UUID.randomUUID();
}
}
static class MyThread extends Thread{
int start;
int gap = count/THREAD_COUNT;
public MyThread(int start){
this.start = start;
}
@Override
public void run() {
for (int i = start; i < start+gap; i++) {
m.put(keys[i],values[i]);
}
}
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
Thread[] threads = new Thread[THREAD_COUNT];
for (int i = 0; i <threads.length ; i++) {
threads[i] = new MyThread(i* (count/THREAD_COUNT));
}
for (Thread t :threads){
t.start();
}
for (Thread t:threads) {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long end = System.currentTimeMillis();
System.out.println(end-start);
System.out.println(m.size());
//=======
start = System.currentTimeMillis();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(()->{
for (int j = 0; j <1000000 ; j++) {
m.get(keys[10]);
}
});
}
for (Thread t :threads){
t.start();
}
for (Thread t:threads) {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
end = System.currentTimeMillis();
System.out.println(end-start);
}
}
vector
vector>list>synchronizedList>queue(ConcurrentLinkedDeque)
多线程程序 多考虑queue 不用list
ConcurrentLinkedDeque 通过cas实现
public class TickerSeller4 {
static Queue<String> tickets = new ConcurrentLinkedDeque<>();
static {
for (int i = 0; i < 1000; i++) {
tickets.add("票编号"+i);
}
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(()->{
while (true){
String str = tickets.poll();
if(str==null) break;
else System.out.println("销售了"+str);
}
}).start();
}
}
}
- ConcurrentHashMap(无顺)
- 跳表结构 ConcurrnetSkipListMap (有序)(针对tree Map)
- CopyOnWirteList 写实复制
read 不用加锁
write 加锁
write 复制数据Array+1,加入数据,将原来的引用指向的数组
场景 对特别多 写少
queue (ConcurrentQueue)
-
ConcurrentDuque
-
LinkedBlockQueue 无界阻塞队列
-
put take 自带阻塞
-
BlockQueue 天生带有生产者和消费者模型
-
ArrayBlockQueue 有界阻塞队列
-
DelayQueue
-
take 通过时间规则顺序拿
-
应用: 按时间进行任务调度
-
PriorityQueue
-
是通过二叉树树结构 排序的
-
SynchronusQueue 线程间传递数据
-
容量为零 一个线程put 必须有另一个线程take
-
TransferQueue
-
一个线程 提交了个东西 wait 到这个状态, 必须有个结果,才会结束
list 和queue的区别
List 和 queue 的区别
queue 对线程是友好的
提供了 offer peek poll 等方法
LinkedBlockingQueue 提供了 put take
面试题
要求用线程顺序打印A1B2C3D4....Z26
- wait notify 变种
-
线程池
Executor (接口)
ExecutorService extends Executor
- 继承Executor 实现了一些生命周期方法
Callable
public intreface Callable<V>{
V call() throws Exception;
}
Future
- Future Callable ExectorService.submit Future.get()
ExectorService service = Exectors.newCachedThreadPool();
Future f = service.submit( Callable);
submit 异步的 提交Callable任务 不阻塞当前线程,记性结束后,返回结果Future对象
f.get() 方法是阻塞的
FutureTask (runnable 和Future结合)
- 是一个Future 又是一个task
- 是一个任务 又可以保存运行结果
- 实现 Runnable Future接口
CompletableFuture
线程池
ThreadPoolExecutor
- 线程池的执行器
手动创建线程池
- 线程池 其实是由一个线程的队列 一个任务的队列 两个组成
-
参数一: 核心线程
-
参数二: 最大线程
-
参数三: 线程不运行 多长时间归还操作系统
-
参数四: 时间单位
-
参数五: 任务容器 BlockQueue { ArrayBlockQueue(4)[长度为四的对了] LinkedBlockQueue (最多Integer最大值个任务) SynchroinzerQueue 一个任务执行不玩另一个进不来,TransferQueue 线程处理完成任务后才能离开}
-
参数六: 创建Thread 的方式
-
参数七: 线程池忙 任务队列满 执行拒绝策略
四种拒绝策略
-
newSingleThreadPool
-
Executor 线程池工厂
-
newSingleThreadPool 一个线程 保证任务的执行顺序
-
newCacheThreadPool
来一个任务 必须马上启动一个线程执行 最大线程数 Integer.maxvalue
queue 是synchronizeQueue 手递手的queue -
newFixedThreadPool 指定线程数 queue LinkedBlockingQueue
-cache vs Fixed -
newScheduledThreadPool
-
定时任务线程池
-
DelayedWorkQueue 多长时间后运行
-
SingleThreadExecutor 单线程的线程池 有任务队列 能够管理线程 生命周期
-
ThreadPoolExecutor 继承自 ExecutorService
ThreadPoolExecutor 源码解析
-
执行task 任务
线程池空状态,接受到task 启动核心线程,在来一个task 再启动核心线程,核心线程满后,task 进入队列 队列满了,启动新线程,启动的线程大于最大线程,采取废弃策略 -
addworker 源码解析
- 线程池worker 任务单元
- 核心线程执行逻辑 - runworker
ForkJoinPool
ForkJoinPool 多个线程 都有自己的任务队列 自己线程执行完成以后从别人的队列中拿
- 类似于大数据的 mapreduce 先 fork 后 join
- 分解汇总任务