前言
遥想当年大二,实习面试的时候,面试官一个问题:操作系统最小的调度单元是什么?当时还没学过操作系统,只知道进程的概念,于是乎信心满满的答道,当然是进程啊,然后......就没有然后了。
之后再看这个问题,其实就是一个笑话。操作系统调度的最小单元其实是线程。现在想想当时,自己大二就敢跑出去实习也是服了自己。
zookeeper看得头大,虽然看懂了一点,但是实在不敢写总结,因为也无从写起,paxos算法和ZAB协议想总结清楚,现在火候还不够。所以打算暂时放一放zookeeper,先学习并发编程部分。
何时使用多线程
阻塞
可能这个词经常听,但是真正的要解释清楚还是有难度的。阻塞:程序在运行到某一个函数或者过程需要等待某些事情完成才能进行后面的操作,在等待期间会暂时停止对CPU的占用情况,这个时候CPU会闲置。
多线程的目的就是为了高效率的利用CPU,减少CPU的闲置时间。回到主要的问题——什么情况下该使用多线程,个人觉得有以下几点:
1、通过并行计算提高程序执行性能。
2、等待网络的部分。
3、I/O响应导致耗费大量的执行时间
等等等......
这些概念只是简单了解一下,后面如果涉及到BIO,NIO等概念可以结合这里看看。
如何启动线程
这个问题真的很老了,在进新网实习面试的时候就被问到了这个问题。这个问题很多人会回答两种,但是在Java中不止两种。
通常的有以下几种
1、继承Thread类
这个启动线程不需要包装,直接调用Thread类型的对象的start方法就可以启动线程(不是run),start是一个native方法
public class MyThread extends Thread {
@Override
public void run() {
System.out.println("MyThread "+Thread.currentThread().getId()+" is running");
}
}
2、实现Runnable接口
与继承Thread类差不多,但是再启动线程的时候,需要包装一下
public class MyRunnable implements Runnable {
public void run() {
System.out.println("Thread "+Thread.currentThread().getId() +" is running");
}
}
3、实现Callable接口
这种方式实现的线程,可以有返回值。这种方式在启动线程的时候需要将其交给ExecutorService(这个类常用于构建线程池)去启动。
/**
* author:liman
* createtime:2018/9/17
* mobile:15528212893
* email:[email protected]
* comment:
* 通过FutureTask包装器来创建Thread线程
*/
public class CallableThread implements Callable<String> {
public String call() throws Exception {
int a = 1;
int b = 2;
System.out.println(a+b);
/**
* 如果处理事件过长,客户端会阻塞
* TimeUnit.SECONDS.sleep(20);
*/
return "执行结果:"+(a+b);
}
}
测试代码:
/**
* author:liman
* createtime:2018/9/17
* mobile:15528212893
* email:[email protected]
*/
public class ThreadDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyThread thread01= new MyThread();
MyThread thread02 = new MyThread();
thread01.start();
thread02.start();
MyRunnable runnable01 = new MyRunnable();
MyRunnable runnable02 = new MyRunnable();
//Runnable接口,需要通过Thread包装一下
Thread RunThread01 = new Thread(runnable01);
Thread RunThread02 = new Thread(runnable02);
RunThread01.start();
RunThread02.start();
//获得Callable接口的返回结果
ExecutorService executorService = Executors.newFixedThreadPool(1);
CallableThread callableThread = new CallableThread();
Future<String> future = executorService.submit(callableThread);
//客户端真正阻塞在这里,获取线程返回数据的时候,如果线程没有将数据准备好,这里会阻塞。
System.out.println(future.get());
executorService.shutdown();
}
}
玩玩责任链
合理的利用异步操作,可以提升程序处理性能,通过阻塞队列以及多线程的方式,实现对请求的异步化处理可以很好的提高程序性能,这个在zookeeper源码中是有提现的(奈何zookeeper现在依旧没看懂)
这里先需要了解几种阻塞队列的功能——LinkedBlockingQueue与ArrayBlockingQueue
请求的简单描述实例
package com.learn.ProcessorChain;
/**
* author:liman
* createtime:2018/9/17
* mobile:15528212893
* email:[email protected]
* comment:
* 请求的简单描述
*/
public class Request {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Request{" +
"name='" + name + '\'' +
'}';
}
}
处理请求的标记接口,对请求的不同处理类都需要实现这个接口
package com.learn.ProcessorChain;
/**
* author:liman
* createtime:2018/9/17
* mobile:15528212893
* email:[email protected]
* comment:
* 处理请求的标记接口
*/
public interface RequestProcessor {
void processRequest(Request request);
}
简单的打印处理
这里就用到了LinkedBlockingQueue,这个队列的take方法会阻塞,在没有获取到元素的时候会一直阻塞,直到获取到元素为止。
/**
* author:liman
* createtime:2018/9/17
* mobile:15528212893
* email:[email protected]
*/
public class PrintProcessor extends Thread implements RequestProcessor {
/**
* 业务逻辑链
* 这里用阻塞队列的原因,就是在没有任务的时候,take操作会阻塞,如果有元素了,take操作会被唤醒
*/
LinkedBlockingQueue<Request> requests = new LinkedBlockingQueue<Request>();
//下一个任务
private final RequestProcessor nextProcessor;
public PrintProcessor(RequestProcessor nextProcessor){
this.nextProcessor = nextProcessor;
}
@Override
public void run() {
while(true){
try{
//队列中没有元素会在这里阻塞
Request request = requests.take();
//这里是处理当前任务
System.out.println("begin print request data : "+request.getName());
//处理下一个任务
nextProcessor.processRequest(request);
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
//处理请求
public void processRequest(Request request) {
//先将请求放到任务链中,后面统一处理
requests.add(request);
}
}
简单的保存处理
package com.learn.ProcessorChain;
import java.util.concurrent.LinkedBlockingQueue;
/**
* author:liman
* createtime:2018/9/17
* mobile:15528212893
* email:[email protected]
*/
public class SaveProcessor extends Thread implements RequestProcessor {
LinkedBlockingQueue<Request> requests = new LinkedBlockingQueue<Request>();
public void processRequest(Request request) {
requests.add(request);
}
@Override
public void run() {
while(true){
try{
Request request = requests.take();
System.out.println("begin save request info :"+request);
}catch (Exception e){
e.printStackTrace();
}
}
}
}
测试主类:
package com.learn.ProcessorChain;
/**
* author:liman
* createtime:2018/9/17
* mobile:15528212893
* email:[email protected]
* comment:
* 链式处理任务
*/
public class RequestChainDemo {
PrintProcessor printProcessor;
protected RequestChainDemo(){
SaveProcessor saveProcessor =new SaveProcessor();
saveProcessor.start();
printProcessor = new PrintProcessor(saveProcessor);
printProcessor.start();
}
private void doTest(Request request){
printProcessor.processRequest(request);
}
public static void main(String[] args) {
Request request = new Request();
request.setName("Liman");
new RequestChainDemo().doTest(request);
}
}
说明:在对象初始化的时候,就已经设置的责任链,并且启动了两个线程,但是这两个处理线程目前没有待处理的请求,这个两个线程就一直阻塞,在printProcessor真正接受请求的时候开始,线程才开始真正的处理业务逻辑。
并发编程基础
前面的都是针对之前自己的认识做一个简单的回顾,这里才是正在的正文开始。
线程状态
线程才是操作系统调度的最小单元。线程的调度并没有想象中的灵活,这很容易造成问题。
通过查看Thread源码可以看到常说的线程状态有6种(NEW,RUNNABLE,BLOCKED,WAITING,TIME_WAITING,TERMINATED),简单一张图就可以总结,这个其实网上有很多了。
说明:其中的RUNNABLE为了更好的与实际贴合,拆分成了两种状态,一种是ready,另一种是running
查看行程的示例代码:
package com.learn.ThreadStatus;
import java.util.concurrent.TimeUnit;
/**
* author:liman
* createtime:2018/9/17
* mobile:15528212893
* email:[email protected]
* comment:
* 线程的状态
*/
public class ThreadStatus {
public static void main(String[] args) {
//TIME-WAITING
new Thread(()->{
while(true){
try{
TimeUnit.SECONDS.sleep(1000);
}catch (InterruptedException e){
e.printStackTrace();
}
}
},"time_waiting").start();
//WAITING
new Thread(()->{
while(true){
synchronized (ThreadStatus.class){
try{
ThreadStatus.class.wait();
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
},"waiting").start();
//线程在ThreadStatus加锁后,不会释放锁
new Thread(new BlockedDemo(),"BlockDemo-01").start();
new Thread(new BlockedDemo(),"BlockDemo-02").start();
}
static class BlockedDemo extends Thread{
@Override
public void run() {
synchronized (BlockedDemo.class){
while(true){
try{
TimeUnit.SECONDS.sleep(1000);
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
}
}
}
可以调用jps查看虚拟就中的pid,然后利用jstack根据pid查看各个线程状态。
线程的停止
记得之前《Java多线程核心编程技术》一书中介绍了三种线程停止的方式(算上stop),这里不总结stop方式。
1、通过interrupt标记位停止线程
package com.learn.ThreadStop;
import java.util.concurrent.TimeUnit;
/**
* author:liman
* createtime:2018/9/17
* mobile:15528212893
* email:[email protected]
* comment:
* 通过interrupt方法设置标志位,停止线程
* Thread.interrupt会重置标志位
*/
public class InterruptDemo {
private static int i;
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(()->{
while(!Thread.currentThread().isInterrupted()){
i++;
}
Thread.interrupted();//标志位复位
System.out.println("内部:"+Thread.currentThread().isInterrupted());
System.out.println("Num:"+i);
},"interruptDemo");
thread.start();
TimeUnit.SECONDS.sleep(1);
thread.interrupt();
System.out.println(thread.isInterrupted());
Thread thread02 = new Thread(()->{
while(true){
boolean ii = Thread.currentThread().isInterrupted();
if(ii){
System.out.println("before : "+Thread.currentThread().getName()+" "+ii);
Thread.interrupted();//对线程进行复位,中断标志为false
System.out.println("after : "+Thread.currentThread().getName()+" "+Thread.currentThread().isInterrupted());
}
}
},"Thread02");
thread02.start();
TimeUnit.SECONDS.sleep(1);
thread02.interrupt();
}
}
上述的代码可以做相应的删减,明确thread的interrupt的使用方式
2、利用volatile变量停止线程
volatile修饰的变量会让该变量对所有的线程可见,这个字段也会用在线程通信。但是比较影响性能
package com.learn.ThreadStop;
/**
* author:liman
* createtime:2018/9/17
* mobile:15528212893
* email:[email protected]
* comment:
* 通过volatile字段中止线程
*/
public class VolatileInterruptDemo {
//volatile字段让该变量对所有的线程可见
private volatile static boolean stop = false;
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(()->{
int i = 0;
while(!stop){
i++;
}
System.out.println(i);
});
thread.start();
System.out.println("begin start thread");
Thread.sleep(1000);
stop = true;//主线程中将stop标记设置为true之后,子线程就会停止
}
}
线程的安全问题
线程的安全问题可以总结为可见性、原子性和有序性三个问题。
可见性:当一个对象在多个内存中都存在副本时,如果一个内存修改了共享变量,其它线程也应该能够看到被修改后的值,此为可见性。
一个运算赋值操作并不是一个原子性操作,多个线程执行时,CPU对线程的调度是随机的,我们不知道当前程序被执行到哪步就切换到了下一个线程,一个最经典的例子就是银行汇款问题,一个银行账户存款100,这时一个人从该账户取10元,同时另一个人向该账户汇10元,那么余额应该还是100。那么此时可能发生这种情况,A线程负责取款,B线程负责汇款,A从主内存读到100,B从主内存读到100,A执行减10操作,并将数据刷新到主内存,这时主内存数据100-10=90,而B内存执行加10操作,并将数据刷新到主内存,最后主内存数据100+10=110,显然这是一个严重的问题,我们要保证A线程和B线程有序执行,先取款后汇款或者先汇款后取款,此为有序性。上述描述其实也已经包含了原子性,针对余额的操作需要做成原子性。
线程安全问题,大致可以分为上述三个问题,这三个问题在不同硬件层面有着不同的解决方案,Java为了屏蔽不同硬件的差别,提出了JMM的抽象,针对JMM的抽象会在下一篇博客中进行描述。