文章目录
1. 学习方法
任何一种东西的学习都可以遵循一个公式:
场景 --> 需求 --> 解决方案 --> 应用 --> 原理
例如: 学习Dubbo,分布式 --> 服务治理 --> dubbo --> dubbo应用 --> 了解dubbo原理
2. 并发编程的发展历史
DougLea是并发编程的创始人
- 真空管/穿孔打卡
- 晶体管/批处理操作系统
2.1 真空管/穿孔打卡
程序员编写程序 --> 把代码转化成在卡片上打孔 --> 输入室将卡片录入机器 --> 机器将运行结果打印到纸上
存在问题: 机器在大部分时间内处于空闲状态
2.2 晶体管/批处理操作系统
解决问题: 每个环节都是批量操作,不需要等待上一个流程的数据而一直处于工作的的状态
存在问题: 存在IO问题,每一个处理环节都是串行处理,如果该环节出现io阻塞的情况,该环节一直会一直处于阻塞状态,cpu资源浪费
[外链图片转存失败(img-xXxY11Aa-1567594417235)(evernotecid://DE37D598-33A4-4708-800A-6D4CF841DDE2/wwwevernotecom/149352153/ENResource/p290)]
2.3 集成电路/多道程序设计
引入CPU切换进程处理时间篇,如果进程A阻塞,由CPU切换处理时间到进程B,这样CPU资源不存在浪费
[外链图片转存失败(img-9m5kMbBj-1567594417238)(evernotecid://DE37D598-33A4-4708-800A-6D4CF841DDE2/wwwevernotecom/149352153/ENResource/p292)]
3. 线程在java中的应用
3.1 创建线程的方式
- 集成Thread类(Thread类是对Runnable接口的实现)
- 实现Runnable接口
- Callable/Future带返回值得线程
- 线程池ThreadPool
4. 多线程的实际应用场景
4.1 多线程优势
- 吞吐量:你做WEB,容器帮你做了多线程,但是他只能帮你做请求层面的.简单的说,可能就是一个请求一个线程,或多个请求一个线程,如果是单线程,那同时只能处理一个用户的请求
- 伸缩性:也就是说,你可以通过增加CPU核数来提升性能,如果是单线程,那程序执行到死也就利用了单核,肯定没办法通过增加CPU核数来提升性能
假设有个请求,这个请求服务端的处理需要执行3个很缓慢的IO操作(比如数据库查询或文件查询),那么正常的顺序可能是(括号里面代表执行时间)
a、读取文件1 (10ms)
b、处理1的数据(1ms)
c、读取文件2 (10ms)
d、处理2的数据(1ms)
e、读取文件3 (10ms)
f、处理3的数据(1ms)
g、整合1、2、3的数据结果 (1ms)
单线程总共就需要34ms,如果你在这个请求内,把ab、cd、ef分别分给3个线程去做,就只需要12ms了
4.2 应用场景
- web服务器本身
- 各种专用服务器(如游戏服务器)
- 后台任务,例如:定时向大量(100w以上)的用户发送邮件
- 异步处理,例如:发微博,记录日志等
- 分布式计算
5. 线程的生命周期
5.1 生命周期
Java的线程状态被定义在公共枚举类java.lang.Thread.state中,一种有六种状态
- 新建(NEW): 表示线程新建出来还没有被启动的状态
- 运行(RUNNABLE): 表示线程正在运行,也可能是线程在就绪队列里等待运行.对这两种情况Java没有提供特别的语义
- 阻塞(BLOCKED): 线程在等待获得锁.比如线程尝试通过synchronized获得某个锁,但是该锁已经被其他线程占用了.这时线程就处于阻塞状态
- 等待(WAITING): 线程在等待某种资源就绪.举个例子,对于生产者消费者模式,消费者准备获取某个数据,但是生产者还没有准备好,这时线程就处于等待状态.wait()方法可以使线程处于等待状态,notify()可以解除这种等待.Thread.join()也可以使线程处于等待状态
- 计时等待(TIMED_WAIT): 线程进入条件和等待类似,但是它调用的是带有超时时间的方法
- 终结(TERMINATED): 线程正常退出或异常退出后,就处于终结状态.也可以叫线程的死亡
[外链图片转存失败(img-bcBoOYW5-1567594417239)(evernotecid://DE37D598-33A4-4708-800A-6D4CF841DDE2/wwwevernotecom/149352153/ENResource/p294)]
5.2 代码实现
package com.zhunongyun.toalibaba.distributed.part1.threadstatus;
import java.util.concurrent.TimeUnit;
/**
* 线程六种状态实现
*/
public class ThreadStatusTest {
public static void main(String[] args) {
new Thread(() -> {
while (true) {
try {
TimeUnit.SECONDS.sleep(100);
} catch (Exception e) {
}
}
}, "Time_Waiting_Thread").start();
new Thread(() -> {
while (true) {
synchronized (ThreadStatusTest.class) {
try {
ThreadStatusTest.class.wait();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}, "Waiting_Thread").start();
new Thread(new BlockedDemo(), "Blocked_Thread_01").start();
new Thread(new BlockedDemo(), "Blocked_Thread_02").start();
}
static class BlockedDemo implements Runnable {
@Override
public void run() {
synchronized (BlockedDemo.class) {
while (true) {
try {
TimeUnit.SECONDS.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
}
运行测试类ThreadStatusTest,在IDEA中找到ThreadStatusTest对应的class目录,右键点击Open In Terminal,输入命令jps查看ThreadStatusTest的线程ID,通过jstack查看该线程的信息
- Time_Waiting_Thread 线程执行
TimeUnit.SECONDS.sleep(100)
,sleep 100秒,线程状态是TIMED_WAITING(sleeping) - Waiting_Thread 线程执行
ThreadStatusTest.class.wait()
, 线程状态是WAITING (on object monitor) - Blocked_Thread_01 与 Blocked_Thread_02 两个线程随机抢到BlockedDemo类的锁,本次是Blocked_Thread_01线程抢到了锁,执行
TimeUnit.SECONDS.sleep(100)
, sleep 100秒,线程状态是TIMED_WAITING(sleeping) - Blocked_Thread_02 线程没有抢到BlockedDemo类的锁,线程状态是BLOCKED (on object monitor)
[外链图片转存失败(img-2JULra2f-1567594417240)(evernotecid://DE37D598-33A4-4708-800A-6D4CF841DDE2/wwwevernotecom/149352153/ENResource/p295)]
从上面的测试类运行的结果看,在写代码的时候要设置线程名称,这样在定位问题的时候可以定位到你的线程的状态以及问题,能快速排查和定位问题
6. 线程的基本操作-启动/停止
6.1 线程的启动
6.1. start()与run()
-
start(): 方法来启动线程,真正实现了多线程运行,这时无需等待run方法体代码执行完毕,可以直接继续执行下面的代码;通过调用Thread类的start()方法来启动一个线程, 这时此线程是处于就绪状态, 并没有运行, 然后通过此Thread类调用方法run()来完成其运行操作的, 这里方法run()称为线程体,它包含了要执行的这个线程的内容, Run方法运行结束, 此线程终止,然后CPU再调度其它线程
-
run(): 方法当作普通方法的方式调用,程序还是要顺序执行,要等待run方法体执行完毕后,才可继续执行下面的代码,程序中只有主线程——这一个线程,其程序执行路径还是只有一条,这样就没有达到多线程的目的
6.1.2 多线程运行原理
线程调用start()后,线程会被放到等待队列,状态为READY,等待CPU调度,并不一定要马上开始执行,只是将这个线程置于可动行状态,然后通过JVM,线程Thread会调用run()方法,执行本线程的线程体
先调用start后调用run,这么麻烦,为了不直接调用run?就是为了实现多线程的优点,没这个start不行
http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/687fd7c7986d/src/share/native/java/lang/Thread.c
6.1.3 线程Thread启动为什么是调用start()
查看Thread类的源码start(),查看返现start()调用的是start0(),start0()是native方法属于jdk底层方法,可以通过openjdk查看源码,在JVM中对应的是JVM_StartThread
public synchronized void start() {
if (threadStatus != 0)
throw new IllegalThreadStateException();
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
}
}
}
private native void start0();
Thread JDK源码地址:
http://hg.openjdk.java.net/jdk8u/jdk8u40/jdk/file/c7bbaa04eaa8/src/share/native/java/lang/Thread.c
static JNINativeMethod methods[] = {
{"start0", "()V", (void *)&JVM_StartThread},
{"stop0", "(" OBJ ")V", (void *)&JVM_StopThread},
{"isAlive", "()Z", (void *)&JVM_IsThreadAlive},
{"suspend0", "()V", (void *)&JVM_SuspendThread},
{"resume0", "()V", (void *)&JVM_ResumeThread},
{"setPriority0", "(I)V", (void *)&JVM_SetThreadPriority},
{"yield", "()V", (void *)&JVM_Yield},
{"sleep", "(J)V", (void *)&JVM_Sleep},
{"currentThread", "()" THD, (void *)&JVM_CurrentThread},
{"countStackFrames", "()I", (void *)&JVM_CountStackFrames},
{"interrupt0", "()V", (void *)&JVM_Interrupt},
{"isInterrupted", "(Z)Z", (void *)&JVM_IsInterrupted},
{"holdsLock", "(" OBJ ")Z", (void *)&JVM_HoldsLock},
{"getThreads", "()[" THD, (void *)&JVM_GetAllThreads},
{"dumpThreads", "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
{"setNativeName", "(" STR ")V", (void *)&JVM_SetNativeThreadName},
};
在jvm.cpp文件中可以找到JVM_StartThread
方法对应的源码,在这个方法中执行native_thread = new JavaThread(&thread_entry, sz);
调用底层系统创建线程,然后执行Thread::start(native_thread);
,调用start方法启动线程
代码地址:http://hg.openjdk.java.net/jdk8u/jdk8u40/hotspot/file/68577993c7db/src/share/vm/prims/jvm.cpp
JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_StartThread");
JavaThread *native_thread = NULL;
// We cannot hold the Threads_lock when we throw an exception,
// due to rank ordering issues. Example: we might need to grab the
// Heap_lock while we construct the exception.
bool throw_illegal_thread_state = false;
// We must release the Threads_lock before we can post a jvmti event
// in Thread::start.
{
// Ensure that the C++ Thread and OSThread structures aren't freed before
// we operate.
MutexLocker mu(Threads_lock);
// Since JDK 5 the java.lang.Thread threadStatus is used to prevent
// re-starting an already started thread, so we should usually find
// that the JavaThread is null. However for a JNI attached thread
// there is a small window between the Thread object being created
// (with its JavaThread set) and the update to its threadStatus, so we
// have to check for this
if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) {
throw_illegal_thread_state = true;
} else {
// We could also check the stillborn flag to see if this thread was already stopped, but
// for historical reasons we let the thread detect that itself when it starts running
jlong size =
java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
// Allocate the C++ Thread structure and create the native thread. The
// stack size retrieved from java is signed, but the constructor takes
// size_t (an unsigned type), so avoid passing negative values which would
// result in really large stacks.
size_t sz = size > 0 ? (size_t) size : 0;
native_thread = new JavaThread(&thread_entry, sz);
// At this point it may be possible that no osthread was created for the
// JavaThread due to lack of memory. Check for this situation and throw
// an exception if necessary. Eventually we may want to change this so
// that we only grab the lock if the thread was created successfully -
// then we can also do this check and throw the exception in the
// JavaThread constructor.
if (native_thread->osthread() != NULL) {
// Note: the current thread is not being used within "prepare".
native_thread->prepare(jthread);
}
}
}
if (throw_illegal_thread_state) {
THROW(vmSymbols::java_lang_IllegalThreadStateException());
}
assert(native_thread != NULL, "Starting null thread?");
if (native_thread->osthread() == NULL) {
// No one should hold a reference to the 'native_thread'.
delete native_thread;
if (JvmtiExport::should_post_resource_exhausted()) {
JvmtiExport::post_resource_exhausted(
JVMTI_RESOURCE_EXHAUSTED_OOM_ERROR | JVMTI_RESOURCE_EXHAUSTED_THREADS,
"unable to create new native thread");
}
THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(),
"unable to create new native thread");
}
Thread::start(native_thread);
JVM_END
在thread.cpp中可以查看到Thread::start方法的代码,线程会固定去调用start方法,所以启动线程只能是start方法
代码地址: http://hg.openjdk.java.net/jdk8u/jdk8u40/hotspot/file/68577993c7db/src/share/vm/runtime/thread.cpp
void Thread::start(Thread* thread) {
trace("start", thread);
// Start is different from resume in that its safety is guaranteed by context or
// being called from a Java method synchronized on the Thread object.
if (!DisableStartThread) {
if (thread->is_Java_thread()) {
// Initialize the thread state to RUNNABLE before starting this thread.
// Can not set it after the thread started because we do not know the
// exact thread state at that time. It could be in MONITOR_WAIT or
// in SLEEPING or some other state.
java_lang_Thread::set_thread_status(((JavaThread*)thread)->threadObj(),
java_lang_Thread::RUNNABLE);
}
os::start_thread(thread);
}
}
6.1.4 线程终止
- Thread.interrupt()
- Thread.interrupted()
- 抛出InterruptedException
interrupt()只是给线程设置一个中断标志,线程仍会继续运行,需要结合isInterrupted()使用停止线程
Thread类提供的interrupt()方法,会将isInterrupted设置成true,这样在while (!Thread.currentThread().isInterrupted())
将会因为isInterrupted的值改变而终止循环
package com.zhunongyun.toalibaba.distributed.part1.threadstatus;
import java.util.concurrent.TimeUnit;
public class InterruptTest {
private static long i = 0;
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
i++;
}
System.out.println("i = " + i);
});
thread.start();
TimeUnit.SECONDS.sleep(1);
// 把线程的IsInterrupted设置成true
thread.interrupt();
}
}
interrupted()作用是测试当前线程是否被中断(检查中断标志),返回一个boolean并清除中断状态,第二次再调用时中断状态已经被清除,将返回一个false
package com.zhunongyun.toalibaba.distributed.part1.threadstatus;
public class InterruptedTest {
public static void main(String[] args) throws InterruptedException {
Thread.currentThread().interrupt();
System.out.println("第一次调用Thread.currentThread().interrupt():"
+ Thread.currentThread().isInterrupted());
System.out.println("第一次调用thread.interrupted():"
+ Thread.currentThread().interrupted());
System.out.println("第二次调用thread.interrupted():"
+ Thread.currentThread().interrupted());
}
}
抛出InterruptedException异常能让线程停止工作
package com.zhunongyun.toalibaba.distributed.part1.threadstatus;
public class InterruptedExceptionTest {
private static Long i = 0L;
public static void main(String[] args) {
Thread thread = new Thread(() -> {
try {
while (true) {
if (i > 20000) {
throw new InterruptedException();
}
i++;
}
} catch (InterruptedException e) {
System.out.println("线程终止");
}
System.out.println("i = " + i);
});
thread.start();
}
}