前言
当前,并发编程很常见,而且通常比较困难、容易出错。Rust则希望简化并发编程,并消除各种并发问题。
最初,Rust团队认为消除并发问题和内存安全是两个不同的挑战,需要用不同方法的解决。但最终,团队发现所有权和类型系统可以同时解决这两大难题。利用所有权和类型检查,可以在编译期而不是运行期发现很多并发错误,Rust
称之为放心(safety)并发。
一、线程
多线程的问题
- 竞争,多个线程乱序访问数据
- 死锁
- 特定条件(时序等)下的bug,难于复现、定位
线程实现分类:
- OS线程,直接使用操作系统提供的线程
- 绿色线程(协程,用户态线程),协程与操作系统线程M:N的对应关系,M、N大小关系不定
M:N模型需要较大的运行时来管理线程,Rust标准采用了1:1模型,当然也有线程池相关的库(crate)采用其它模型。
示例:
use std::thread;
use std::time::Duration;
fn main() {
thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1));
}
}
spawn创建新线程,但是主线程没有join,因此可能提前结束。(这个性质是通用的,没特殊之处)。
线程间共享
use std::thread;
fn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(|| {
println!("Here's a vector: {:?}", v);
});
handle.join().unwrap();
}
上述代码会报错
error[E0373]: closure may outlive the current function, but it borrows
v
, which is owned by the current function
也就是说Rust编译器会自动检查线程间是否有数据共享,如果有则直接报错。变量v
的生命期不长于main线程,而spawn
出来的线程生命期可能长于main
函数的生命期。
严格讲,因为handle
被join
了,人工可以判别v
的使用没有问题,但编译器保险起见,还是报错了,仍然有局限性。
那么C++编译器或者所有编译器,严格讲也可以做到,但是都没有。
此外,v
从main线程转移到了spawn线程,但是每个线程都有自己的栈空间,那么main线程gg后,如何析构v
这个本地变量?因为v是一个vector
,会在堆上分配空间,所以v应该被浅拷贝到handle线程的栈空间了。
二、消息传递
1、试用
业界著名的安全并发原则:不要通过共享内存来通信,而是通过通信来共享内存。
Do not communicate by sharing memory; instead, share memory by communicating.
https://go.dev/doc/effective_go#concurrency
示例:
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
let received = rx.recv().unwrap();
println!("Got: {}", received);
}
可以看到,通道的概念、使用和go类似,这样做的优点是消除了data race。
2、通道与所有权转移
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("val is {}", val);
});
let received = rx.recv().unwrap();
println!("Got: {}", received);
}
错误信息
$ cargo run
Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value:val
–> src/main.rs:10:31
|
8 | let val = String::from(“hi”);
| — move occurs becauseval
has typeString
, which does not implement theCopy
trait
9 | tx.send(val).unwrap();
| — value moved here
10 | println!(“val is {}”, val);
| ^^^ value borrowed here after move
For more information about this error, tryrustc --explain E0382
.
error: could not compilemessage-passing
due to previous error
可以发现,val
的所有权在通道中被转移了(从子线程到主线程)。因为没有复制存在,所以效率很高(浅拷贝,go中如何实现呢?)。
3、多值发送
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {}", received);
}
}
通过for循环,发送方不断发送,接收方不停接收,直到发送方析构。
4、多生产者
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// --snip--
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {}", received);
}
// --snip--
}
tx
发送之前,调用clone
,就可以将多个发送方与单个接收方连接。
三、状态共享
很多语言中的通道类似单一所有权,而内存共享并发则类似多所有权。
1、互斥量(Mutex)来实现一次一线程
Mutex
是mutual exclusive(相互排斥、独占,可以类比电视剧、综艺的独家播出)。
使用难点、规则:
- 使用前必须获取锁
- 使用结束后必须释放锁
因为程序执行的复杂性,上面两个规则很可能被打破,从而导致资源泄露、死锁之类的问题,这也让很多人更倾向使用通道channel
。但是,Rust
的类型系统和所有权规则让程序员无法错误的操作锁获取、锁释放。相比较,C++
虽然有RAII的机制,但不是强制的,一旦忘记、错误使用,编译器并不会发现错误。
2、Mutex
use std::sync::Mutex;
fn main() {
let m = Mutex::new(5);
{
let mut num = m.lock().unwrap();
*num = 6;
}
println!("m = {:?}", m);
}
互斥量mutex的创建方式特殊,如果不调用m.lock()
,是无法修改m
保护的数据。
出了作用域(大括号内的),直接释放锁(类似C++的析构函数,区别在于强制调用lock
才能修改,而C++不强制)。
3、多线程间共享Mutex<T>
use std::sync::Mutex;
use std::thread;
fn main() {
let counter = Mutex::new(0);
let mut handles = vec![];
for _ in 0..10 {
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
编译报错,
| let counter = Mutex::new(0);
| ------- move occurs becausecounter
has typeMutex<i32>
, which does not implement theCopy
trait
…
| let handle = thread::spawn(move || {
| ^^^^^^^ value moved into closure here, in previous iteration of loop
| let mut num = counter.lock().unwrap();
| ------- use occurs due to use in closure
因为Mutex
类型counter无法共享,只能被borrow。
个人看法:Rust的这种机制有点不理解,因为Mutex肯定用来共享,但使用时还需要加入其它机制,显得冗长了?
4、多线程和多所有权
let counter = Rc::new(Mutex::new(0));
Rc非并发安全,因此无法在多线程间move,也不行。
5、Arc<T>
原子引用计数
线程间安全共享
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0)); // Rc::new compile err
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
只有类似Arc::new(Mutex::new(0))
的变量才能在线程间共享,猜测是为了正确析构对象,不加Arc
,Mutex
也可以安全共享了(有lock在嘛),但何时析构呢?
四、Sync & Send特征(Trait)
1、通过Send
来允许所有权的转移
Send
标签特征(tag trait)表明可以在线程间安全的转移所有权,也就是可以跨线程移动。Rc<T>
没有实现该特征,因此无法在线程间安全的转移。
2、通过Sync
允许多线程访问
Sync
标签特征(tag trait)表明可以安全的在线程间传递不可变借用。
类似Send
,内置类型默认都实现了Sync
。
同样原因,Rc<T>
不是Sync
。
3、手工实现Send
和Sync
是不安全的
可以手工实现,但是可能需要unsafe rust,编译器不能保证安全,需要程序员自己保证。
以Arc为例
unsafe impl<T: ?sized + Sync + Send> Send for Arc<T> {}
unsafe impl<T: ?sized + Sync + Send> Sync for Arc<T> {}
如果Arc
的实现有问题,那编译器是无法检测的。