简介:
今天打算写一个关于Redis压测的小程序,想来反正也是花时间写,不如顺便研究一下Akka这个比较火的开源库。
代码是scala写得,建议大家尝试新技术!
hello word 小例子:
pom.xml
<dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor_2.11</artifactId> <version>2.3.9</version> </dependency>
HelloActor.scala
/** * Created by zz on 2016/9/27. */ import akka.actor.Actor import akka.actor.ActorSystem import akka.actor.Props case object Person extends Per{ val name:String="job" val age:Int=18 } class RunRedisActor extends Actor { def receive = { case "hello" => {println("hello word")} case Person => println("ren") case _ => println("您是?") } } object Main extends App { val system = ActorSystem("TestRedisSystem") // 缺省的Actor构造函数 val runRedisActor = system.actorOf(Props[RunRedisActor], name = "helloactor") runRedisActor ! "hello" runRedisActor ! Person runRedisActor ! "喂" }
运行结果:
hello word
ren
您是?
Process finished with exit code -1
ren
您是?
Process finished with exit code -1
注意:akka2.4.X 版本会以后错误,原因是jdk1.7及以下版本不兼容。
报错信息 写道
Exception in thread "main" java.lang.UnsupportedClassVersionError: JVMCFRE003 主要版本错误;类=akka/actor/ActorSystem$,偏移量=6
at java.lang.ClassLoader.defineClassImpl(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:295)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:154)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:711)
at java.net.URLClassLoader.access$400(URLClassLoader.java:92)
at java.net.URLClassLoader$ClassFinder.run(URLClassLoader.java:1159)
at java.security.AccessController.doPrivileged(AccessController.java:314)
at java.net.URLClassLoader.findClass(URLClassLoader.java:594)
at java.lang.ClassLoader.loadClassHelper(ClassLoader.java:743)
at java.lang.ClassLoader.loadClass(ClassLoader.java:711)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:325)
at java.lang.ClassLoader.loadClass(ClassLoader.java:690)
at com.test.lus.akka.Main$.delayedEndpoint$com$test$lus$akka$Main$1(HelloActor.scala:25)
at com.test.lus.akka.Main$delayedInit$body.apply(HelloActor.scala:24)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.test.lus.akka.Main$.main(HelloActor.scala:24)
at com.test.lus.akka.Main.main(HelloActor.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:88)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
at java.lang.reflect.Method.invoke(Method.java:613)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
at java.lang.ClassLoader.defineClassImpl(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:295)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:154)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:711)
at java.net.URLClassLoader.access$400(URLClassLoader.java:92)
at java.net.URLClassLoader$ClassFinder.run(URLClassLoader.java:1159)
at java.security.AccessController.doPrivileged(AccessController.java:314)
at java.net.URLClassLoader.findClass(URLClassLoader.java:594)
at java.lang.ClassLoader.loadClassHelper(ClassLoader.java:743)
at java.lang.ClassLoader.loadClass(ClassLoader.java:711)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:325)
at java.lang.ClassLoader.loadClass(ClassLoader.java:690)
at com.test.lus.akka.Main$.delayedEndpoint$com$test$lus$akka$Main$1(HelloActor.scala:25)
at com.test.lus.akka.Main$delayedInit$body.apply(HelloActor.scala:24)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.test.lus.akka.Main$.main(HelloActor.scala:24)
at com.test.lus.akka.Main.main(HelloActor.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:88)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
at java.lang.reflect.Method.invoke(Method.java:613)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
-------------------------------------------分割线----------------------------------------------------
进阶:我们实现简单分布式,远程调用actor
pom.xml 中添加远程依赖:
<dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-remote_2.11</artifactId> <version>2.3.9</version> </dependency>
application.conf akka默认使用com.typesafe读取配置文件
MyRemoteServerSideActor { akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 2552 } } } } MyRemoteClientSideActor { akka { actor { provider = "akka.remote.RemoteActorRefProvider" } } }
RemoteActor
import akka.actor.{Actor} /** * Created by zz on 2016/9/27. */ class RemoteActor extends Actor{ def receive = { case "remote" => println("remote") sender() ! "ok" } }
AkkaServerApplication
import akka.actor.{ActorSystem, Props} import com.typesafe.config.ConfigFactory /** * Created by zz on 2016/9/27. */ object AkkaServerApplication extends App { val system = ActorSystem("remote-system", ConfigFactory.load().getConfig("MyRemoteServerSideActor")) // 创建名称为remote-system的ActorSystem:从配置文件application.conf中获取该Actor的配置内容 val log = system.log log.info("Remote server actor started: " + system) system.actorOf(Props[RemoteActor], "remoteActor") // 创建一个名称为remoteActor的Actor,返回一个ActorRef,这里我们不需要使用这个返回值 }
ClientActor
import akka.actor.{Actor, ActorLogging} /** * Created by zz on 2016/9/27. */ class ClientActor extends Actor with ActorLogging { // akka.<protocol>://<actor system>@<hostname>:<port>/<actor path> val path = "akka.tcp://[email protected]:2552/user/remoteActor" // 远程Actor的路径,通过该路径能够获取到远程Actor的一个引用 val remoteServerRef = context.actorSelection(path) // 获取到远程Actor的一个引用,通过该引用可以向远程Actor发送消息 def receive = { case "remote" => remoteServerRef ! "remote" println("please go to request client ....") case "ok" =>println("server is ok") } }
AkkaClientApplication
import akka.actor.{ActorSystem, Props} import com.typesafe.config.ConfigFactory /** * Created byzz on 2016/9/27. */ object AkkaClientApplication extends App { val system = ActorSystem("client-system", ConfigFactory.load().getConfig("MyRemoteClientSideActor")) // 通过配置文件application.conf配置创建ActorSystem系统 val clientActor = system.actorOf(Props[ClientActor], "clientActor") // 获取到ClientActor的一个引用 clientActor ! "remote" }
AkkaServerApplication输出 写道
[INFO] [09/27/2016 17:01:52.580] [main] [Remoting] Starting remoting
[INFO] [09/27/2016 17:01:52.917] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:2552]
[INFO] [09/27/2016 17:01:52.919] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:2552]
[INFO] [09/27/2016 17:01:52.931] [main] [ActorSystem(remote-system)] Remote server actor started: akka://remote-system
remote
[INFO] [09/27/2016 17:01:52.917] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:2552]
[INFO] [09/27/2016 17:01:52.919] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:2552]
[INFO] [09/27/2016 17:01:52.931] [main] [ActorSystem(remote-system)] Remote server actor started: akka://remote-system
remote
AkkaClientApplication输出 写道
[INFO] [09/27/2016 17:02:00.080] [main] [Remoting] Starting remoting
[INFO] [09/27/2016 17:02:00.511] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:2552]
[INFO] [09/27/2016 17:02:00.513] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:2552]
please go to request client ....
server is ok
[INFO] [09/27/2016 17:02:00.511] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:2552]
[INFO] [09/27/2016 17:02:00.513] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:2552]
please go to request client ....
server is ok
-------------------------------------------------分割线-----------------------------------------------
再进阶:ping pong demo
PingPong.scala
import akka.actor.{Actor, ActorRef} /** * Created by zz on 2016/9/27. */ case object PingMessage case object PongMessage case object StartMessage case object StopMessage class Ping(pong: ActorRef) extends Actor { var count = 0 def incrementAndPrint { count += 1; println("ping") } def receive = { case StartMessage => incrementAndPrint pong ! PingMessage case PongMessage => if (count > 9) { sender ! StopMessage println("ping stopped") context.stop(self) } else { incrementAndPrint sender ! PingMessage } } } class Pong extends Actor { def receive = { case PingMessage => println(" pong") sender ! PongMessage case StopMessage => println("pong stopped") context.stop(self) context.system.shutdown() } }
PingPongApp
import akka.actor.{ActorSystem, Props} /** * Created by zz on 2016/9/27. */ object PingPongApp extends App { val system = ActorSystem("PingPongSystem") val pong = system.actorOf(Props[Pong], name = "pong") val ping = system.actorOf(Props(new Ping(pong)), name = "ping") // start them going ping ! StartMessage }
输出 写道
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping stopped
pong stopped
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping stopped
pong stopped
后记:例子比较简单,但是骨架基本如此,后续再做完善。。。