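I have recently been learning Akka. After reading a few articles that explain the actor model and working through the examples in the official documentation,
I felt I needed a project to help me get more familiar with Akka and Scala programming. After some thought, I concluded that Akka could be used to
implement a distributed crawler framework.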
Design
1. Dependencies
HTTP requests use async-http-client: https://github.com/AsyncHttpClient/async-http-client
The distributed framework is Akka.
The centralized storage system is Kafka.
The cache is Redis.
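2. Workflow
1. The user submits a task, that is, submits a jar package to the manager.
2. After receiving the jar, the manager scans it and reads the configuration file and the crawler's concrete logic. For now the crawler interface is planned to be modeled on Scrapy's.
3. Based on the crawler logic class and the configuration in the jar, the manager creates the corresponding URL-checking thread and the actors that execute the task, and pushes the initial URLs into Kafka.
4. The URL-checking thread starts, continuously reads URLs from Kafka, wraps them, and delivers them to the actors. (Load balancing across actors has to be considered here. For now the plan is to distribute URLs evenly; configurable load balancing may be supported later.)
5. The actor uses async-http-client to request the URL asynchronously. The callback it registers wraps the response and delivers it to a processing actor, and also asynchronously writes the successfully fetched URL to Redis (to avoid crawling the same URL twice).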
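6. The processing actor calls the user-defined handler, takes the set difference between the URLs obtained from parsing and the already-crawled URLs in Redis, and pushes the URLs that have not been crawled into Kafka.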
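Steps 4 and 6 above can be sketched without any of the real infrastructure. The snippet below is only an illustration under stated assumptions: a `mutable.Queue` stands in for Kafka, a `mutable.Set` stands in for the crawled-URL set in Redis, "even distribution" is interpreted as plain round-robin, and the names `Dispatcher`, `nextWorker`, `markCrawled`, and `enqueueNew` are hypothetical rather than part of any library.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-ins: `frontier` plays the role of Kafka,
// `seen` plays the role of the crawled-URL set kept in Redis.
class Dispatcher(workerCount: Int) {
  require(workerCount > 0, "need at least one worker")

  val frontier: mutable.Queue[String] = mutable.Queue.empty[String]
  val seen: mutable.Set[String] = mutable.Set.empty[String]
  private var next = 0

  // Step 4: even distribution, here plain round-robin over worker indices.
  def nextWorker(): Int = {
    val chosen = next
    next = (next + 1) % workerCount
    chosen
  }

  // Step 5 (simplified): record a URL as successfully crawled.
  def markCrawled(url: String): Unit = seen += url

  // Step 6: keep only the URLs that are not yet crawled, enqueue them,
  // and return the set that was actually enqueued.
  def enqueueNew(extracted: Set[String]): Set[String] = {
    val fresh = extracted.diff(seen)
    fresh.foreach(u => frontier.enqueue(u))
    fresh
  }
}
```

As in the design itself, the difference is taken only against URLs that have already been crawled, so a URL that is queued but not yet fetched could still be queued a second time; tracking queued URLs as well would be one way to close that gap.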
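Core logic test
The user's parsing step is simulated by generating URLs directly. Kafka and Redis are also left out and replaced by a singleton LinkedBlockingQueue.
Implementation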
package awm

import scala.collection.JavaConverters._
import java.util.concurrent.{BlockingQueue, Future, LinkedBlockingQueue}
import java.util.concurrent.atomic.AtomicLong
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import com.ning.http.client.{AsyncCompletionHandler, AsyncHttpClient, Response}

case class UrlMessage(url: String, id: Int)

// Process-wide state shared by all actors; the queue stands in for Kafka and Redis.
object Singleton {
  val queue: BlockingQueue[String] = new LinkedBlockingQueue[String]()
  val c = new AsyncHttpClient
  // Incremented from the HTTP client's I/O threads, so it has to be thread-safe.
  val count = new AtomicLong(0)
}

class CrawlerDemoActor extends Actor with ActorLogging {
  override def receive = {
    case UrlMessage(url, id) => {
      val futureResponse: Future[Response] =
        Singleton.c.prepareGet(url).execute(new AsyncCompletionHandler[Response] {
          // Runs on an I/O worker thread once the response has arrived.
          override def onCompleted(response: Response) = {
            // Simulate parsing: pretend the page yielded two URLs.
            Singleton.queue.addAll(List(url, url).asJava)
            val n = Singleton.count.incrementAndGet()
            log.info(s"$n, ${Thread.activeCount}")
            response
          }
        })
      // Enqueue two more URLs right away, without waiting for the response.
      Singleton.queue.addAll(List(url, url).asJava)
    }
  }
}

object CrawlerDemoActor {
  def props: Props = Props(new CrawlerDemoActor())
}
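The completion handler in this implementation runs on the HTTP client's I/O thread rather than inside the actor. One possible alternative is to turn the callback into a `scala.concurrent.Future`, whose result could then be delivered to an actor as an ordinary message (for example with Akka's `pipeTo`). The sketch below shows only the bridging step, using the standard library; `Callback` and `CallbackBridge.toFuture` are hypothetical names standing in for the client's handler and request call, not async-http-client APIs.

```scala
import scala.concurrent.{Future, Promise}

// Hypothetical callback interface, standing in for the HTTP client's handler.
trait Callback[A] {
  def onCompleted(result: A): Unit
  def onThrowable(t: Throwable): Unit
}

object CallbackBridge {
  // Adapts a callback-registering function into a Future.
  // `register` is expected to start the asynchronous work and to invoke
  // the callback when that work finishes or fails.
  def toFuture[A](register: Callback[A] => Unit): Future[A] = {
    val promise = Promise[A]()
    register(new Callback[A] {
      // trySuccess/tryFailure ignore repeated completions instead of throwing.
      override def onCompleted(result: A): Unit = {
        promise.trySuccess(result)
        ()
      }
      override def onThrowable(t: Throwable): Unit = {
        promise.tryFailure(t)
        ()
      }
    })
    promise.future
  }
}
```

With this shape, the code that enqueues URLs and updates counters could live in the actor that receives the completed result, instead of in the callback.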
Test
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
import akka.actor.{Actor, ActorSystem, Props, ActorRef}
import akka.testkit.{ImplicitSender, TestActorRef, TestKit, TestProbe}
import awm.{CrawlerDemoActor, UrlMessage}
import util.control.Breaks._
import awm.Singleton._

class TestAsyncClientWithAkka(_system: ActorSystem) extends TestKit(_system)
  with Matchers with FlatSpecLike with BeforeAndAfterAll {

  // Running message id used by the dispatch loop.
  object N { var i = 0 }

  def this() = this(ActorSystem("akka-demo"))

  override def afterAll(): Unit = {
    shutdown(system)
  }

  "A Demo" should "be successful" in {
    // Create a pool of 40 crawler actors.
    var c: List[ActorRef] = List.empty[ActorRef]
    for (x <- 1 to 40) {
      val actor = system.actorOf(CrawlerDemoActor.props)
      c = actor :: c
    }
    // Seed URLs.
    queue.add("http://www.csdn.net/")
    queue.add("http://www.oschina.net/")
    val testProbe = TestProbe()
    var i = 0
    // Dispatch loop: hands URLs to the actors in turn. It never terminates;
    // an empty queue only skips the current iteration.
    while (true) {
      val url = queue.poll()
      breakable {
        if (url == null) break()
        if (i >= 40) i = 0
        c(i) ! UrlMessage(url, N.i)
        N.i += 1
        i += 1
      }
    }
  }
}
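The `while (true)` loop in this test busy-polls the queue and never returns, so the test cannot finish on its own. One possible variation is to block on the queue with a timeout and stop once it stays empty. The sketch below uses only `java.util.concurrent`; `BoundedDispatch.drain`, `idleTimeoutMs`, and the `dispatch` callback (which stands in for `c(i) ! UrlMessage(url, N.i)`) are hypothetical names introduced for illustration.

```scala
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}

object BoundedDispatch {
  // Polls `queue` until it stays empty for `idleTimeoutMs` milliseconds,
  // handing each URL to `dispatch` together with a round-robin worker index
  // and a running message id. Returns the number of URLs dispatched.
  def drain(queue: BlockingQueue[String], workerCount: Int, idleTimeoutMs: Long)
           (dispatch: (Int, String, Int) => Unit): Int = {
    require(workerCount > 0, "need at least one worker")
    var id = 0
    // poll(timeout, unit) blocks instead of spinning and returns null on timeout.
    var url = queue.poll(idleTimeoutMs, TimeUnit.MILLISECONDS)
    while (url != null) {
      dispatch(id % workerCount, url, id)
      id += 1
      url = queue.poll(idleTimeoutMs, TimeUnit.MILLISECONDS)
    }
    id
  }
}
```

With the demo actor, which enqueues more URLs for every message it handles, the queue may never stay empty, so a cap on the number of dispatched URLs would probably be needed as well.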
pom.xml
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.11</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-reflect -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-reflect</artifactId>
        <version>2.11.11</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.ning/async-http-client -->
    <dependency>
        <groupId>com.ning</groupId>
        <artifactId>async-http-client</artifactId>
        <version>1.9.40</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/junit/junit -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor_2.11 -->
    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-actor_2.11</artifactId>
        <version>2.5.6</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-testkit_2.11 -->
    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-testkit_2.11</artifactId>
        <version>2.5.6</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.scalatest/scalatest_2.11 -->
    <dependency>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest_2.11</artifactId>
        <version>3.0.4</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/javax.xml.bind/jaxb-api -->
    <dependency>
        <groupId>javax.xml.bind</groupId>
        <artifactId>jaxb-api</artifactId>
        <version>2.3.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <version>2.15.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Output
[INFO] [11/05/2017 14:07:32.728] [New I/O worker #14] [akka://akka-demo/user/$H] 21823, 47
[INFO] [11/05/2017 14:07:32.730] [New I/O worker #5] [akka://akka-demo/user/$z] 21824, 47
[INFO] [11/05/2017 14:07:32.731] [New I/O worker #11] [akka://akka-demo/user/$k] 21825, 47
[INFO] [11/05/2017 14:07:32.734] [New I/O worker #13] [akka://akka-demo/user/$c] 21826, 47
[INFO] [11/05/2017 14:07:32.734] [New I/O worker #13] [akka://akka-demo/user/$n] 21827, 47
[INFO] [11/05/2017 14:07:32.736] [New I/O worker #8] [akka://akka-demo/user/$z] 21828, 47
[INFO] [11/05/2017 14:07:32.746] [New I/O worker #9] [akka://akka-demo/user/$n] 21829, 47
[INFO] [11/05/2017 14:07:32.746] [New I/O worker #6] [akka://akka-demo/user/$A] 21830, 47
[INFO] [11/05/2017 14:07:32.754] [New I/O worker #8] [akka://akka-demo/user/$K] 21831, 47
[INFO] [11/05/2017 14:07:32.754] [New I/O worker #5] [akka://akka-demo/user/$r] 21832, 47
[INFO] [11/05/2017 14:07:32.754] [New I/O worker #8] [akka://akka-demo/user/$o] 21833, 47
[INFO] [11/05/2017 14:07:32.754] [New I/O worker #14] [akka://akka-demo/user/$j] 21834, 47
[INFO] [11/05/2017 14:07:32.754] [New I/O worker #8] [akka://akka-demo/user/$o] 21835, 47
[INFO] [11/05/2017 14:07:32.754] [New I/O worker #10] [akka://akka-demo/user/$r] 21836, 47
[INFO] [11/05/2017 14:07:32.754] [New I/O worker #10] [akka://akka-demo/user/$N] 21837, 47
[INFO] [11/05/2017 14:07:32.754] [New I/O worker #16] [akka://akka-demo/user/$K] 21838, 47
This design is only a rough, immature idea of mine; suggestions are welcome.