1、scala并发编程介绍
来自学习资料
Scala中的Actor能够实现并行编程的强大功能,它是基于事件模型的并发机制,Scala是运用消息(message)的发送、接收来实现多线程的。使用Scala能够更容易地实现多线程应用的开发。
与java的并发编程比较
对于Java,我们都知道它的多线程实现需要对共享资源(变量、对象等)使用synchronized 关键字进行代码块同步、对象锁互斥等等。而且,常常一大块的try…catch语句块中加上wait方法、notify方法、notifyAll方法是让人很头疼的。原因就在于Java中多数使用的是可变状态的对象资源,对这些资源进行共享来实现多线程编程的话,控制好资源竞争与防止对象状态被意外修改是非常重要的,而对象状态的不变性也是较难以保证的。 而在Scala中,我们可以通过复制不可变状态的资源(即对象,Scala中一切都是对象,连函数、方法也是)的一个副本,再基于Actor的消息发送、接收机制进行并行编程
Actor方法执行顺序
1.首先调用start()方法启动Actor
2.调用start()方法后其act()方法会被执行
3.向Actor发送消息
发送消息的方式
! 发送异步消息,没有返回值
!? 发送同步消息,等待返回值
!! 发送异步消息,返回值是Future[Any]
2、案例
注:我这里用的是Scala Actor是scala 2.10.6版本,也是为了学akka做准备
Scala在2.11.x版本中将Akka加入其中,作为其默认的Actor,老版本的Actor已经废弃
1、案例1
首先回顾使用java来写个多线程的demo,然后使用scala来实现
java版本
public class JActorDemo {
public static void main(String[] args){
Thread t1 = new Thread(new JMyActor1());
Thread t2 = new Thread(new JMyActor2());
t1.start();
t2.start();
}
}
class JMyActor1 implements Runnable{
@Override
public void run() {
for (int i = 1; i <= 10; i++) {
try {
System.out.println("thread-1:" + i);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class JMyActor2 implements Runnable{
@Override
public void run() {
for (int i = 1; i <= 10; i++) {
try {
System.out.println("thread-2:" + i);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
输出
thread-1:1
thread-2:1
thread-2:2
thread-1:2
thread-1:3
thread-2:3
thread-2:4
thread-1:4
thread-1:5
thread-2:5
thread-2:6
thread-1:6
thread-1:7
thread-2:7
thread-2:8
thread-1:8
thread-2:9
thread-1:9
thread-1:10
thread-2:10
scala版本,对比java那么就算继承的类不同,重写的方法不同但是功能类似
/**
* 1.首先调用start()方法启动Actor
* 2.调用start()方法后其act()方法会被执行
* 3.向Actor发送消息
* Created by 12706 on 2017/11/27.
*/
class MyActor1 extends Actor{
//重写Actor的act方法
override def act(): Unit = {
for (i <- 1 to 10) {
println("thread-1:" + i)
Thread.sleep(500)
}
}
}
class MyActor2 extends Actor{
override def act(): Unit = {
for (i <- 1 to 10) {
println("thread-2:" + i)
Thread.sleep(500)
}
}
}
object ActorDemo1 {
def main(args: Array[String]): Unit = {
val ma1 = new MyActor1
ma1.start()
val ma2 = new MyActor2
ma2.start()
}
}
输出统java版本一致
这两个Actor是并行执行的,act()方法中的for循环执行完成后actor程序就退出了
2、案例2
写个actor,可以不停地接收消息
class MessageActor extends Actor{
override def act(): Unit = {
//在act()方法中加入了while (true) 循环,就可以不停的接收消息
while (true) {
//接收消息
receive {
case "start" => {
println("开始接收消息")
Thread.sleep(1000)
}
case "continue" => {
println("接收消息中")
Thread.sleep(1000)
}
case "stop" => {
println("停止接收消息")
Thread.sleep(1000)
}
//其他情况则退出否则会一直阻塞
case _ => exit()
}
}
}
}
object MessageActor {
def main(args: Array[String]): Unit = {
val ma = new MessageActor
ma.start()
//发送异步消息,发送异步消息(向actor发送start串),发送完了继续往下走,但是一个actor对象接收到消息执行的过程是同步的按顺序执行
//!,!?,!!三种发送方式
ma ! "start"
ma ! "continue"
ma ! "stop"
ma ! "break"
}
}
3、案例3
还是不停接收消息,但是改变了方式(不同于案例2)
使用了react而不是recieve,react方式会复用线程,比receive更高效。而react 如果要反复执行消息处理,react外层要用loop,不能用while。
/**
*使用了react而不是recieve,react方式会复用线程,比receive更高效。
* 而react 如果要反复执行消息处理,react外层要用loop,不能用while。
* Created by 12706 on 2017/11/27.
*/
class LoopMessageActor extends Actor{
override def act(): Unit = {
loop {
react {
case "start" => {
println("开始接收消息")
Thread.sleep(1000)
}
case "continue" => {
println("接收消息中")
Thread.sleep(1000)
}
case "stop" => {
println("停止接收消息")
Thread.sleep(1000)
}
//其他情况则退出否则会一直阻塞
case _ => exit()
}
}
}
}
object LoopMessageActor {
def main(args: Array[String]): Unit = {
val lma = new LoopMessageActor
lma.start()
lma ! "start"
lma ! "continue"
lma ! "stop"
lma ! "break"
}
}
4、结合case class发送消息
case class SynMessage(id : Int, name : String)
case class AsyMessage(id : Int, name : String)
case class ResultContainer(id : Int, name : String)
class ApplyActor extends Actor{
override def act(): Unit = {
loop {
react {
case SynMessage(id,name) => {
println(s"同步消息: $id,$name")
Thread.sleep(3000)
//返回结果给发送者
sender ! ResultContainer(id,name)
}
case AsyMessage(id, name) => {
println(s"异步消息: $id,$name")
Thread.sleep(3000)
}
}
}
}
}
object ApplyActor {
def main(args: Array[String]): Unit = {
val actor = new ApplyActor
actor.start()
//发送AsyMessage消息
actor ! AsyMessage(9527,"异步")
//发送SynMessage消息,接收返回结果Future
val result = actor !! SynMessage(7259,"同步")
//判断是否返回了结果,使用Future的isSet方法(java中则是isDone方法,返回true则表示拿到了结果)
println(result.isSet)
//Future的apply()方法会构建一个异步操作且在未来某一个时刻返回一个值
//Future的值不知道什么时候可用,所以需要一种机制来获取异步操作的结果,
// 一种是不停的查看Future的完成状态,另一个采用阻塞的方式,scala提供了第二种方式的支持
//到这进入阻塞,主程序不会往下走了,直到取到Future的值
val message = result.apply()
//再次判断
println(result.isSet)
println(message)
}
}
输出
false
异步消息: 9527,异步//到这停顿3秒
同步消息: 7259,同步//到这停顿3秒
true
ResultContainer(7259,同步)