分布式计算小程序

项目说明:

                  如何在网络中确定一个机子: ip + 端口号
                  ip : 表示网络中唯一标识
                  端口号: 计算机程序的唯一标识
                  协议: tcp /udp

项目难点:

在开始打算学习这个项目的时候,最开始的目的是学习Scala这门语言,我自身感觉这门语言比Python还要简单,但这也是自身感觉,首先去官网下载Scala配置文件(https://www.scala-lang.org/),安装之后就可以cmd命令行去执行代码,在做分布式计算项目中,在IDEA中配置Scala环境浪费啦大量时间,因为IDEA必须要与Scala插件版本相去对应才可以在IDEA打代码,还有就是导入maven中依赖信息配置不上,最后就是Scala的学习,总体项目划算简单,就是总是出现小毛病(小毛病才最烦人)

IDEA配置Scala插件:

    Scala是一个多范式的编程语言(支持多种方式的编程)

    ***   使用面向对象编程:封装、继承、多态<.li>

扫描二维码关注公众号,回复: 9556017 查看本文章

              ***   使用函数式编程:最大的特定(优点:代码非常简洁;缺点:可读性太差)

1、 首先要配置Java环境变量,设置好JDK

2、IDEA版本低:  按照如图所示配置插件

3、IDEA版本高:跟上图操作一样,去官网选择对应的Scala版本。

mavan依赖导入:

1、自动导包打开

2、把pom.xml中对应的依赖先删除,然后刷新右侧,之后再把依赖粘贴到pom.xml中,再次刷新右侧就好了并且从本地仓库将对应的包删除掉,然后让maven重新下载

3、不断刷新,或者退出从新进入

Akka并发编程框架

Akka介绍

​   Akka是一个用于构建高并发、分布式和可扩展的基于事件(消息)驱动的应用的工具包。Akka是使用scala开发的库,同时可以使用scala和Java语言来开发基于Akka的应用程序。

什么是分布式?

​ 分布式指的是将一个业务或者一个程序也或者说一个模块, 拆分成多个子业务, 子程序或者子模块的过程, 各个子业务, 子程序或者子模块分别运行在不同的节点或者进程中, 数据在不同的进程间进行传递,共同实现某一个目标的过程, 我们将这样的方式成为分布式

Akka的特性

- 提供基于异步非阻塞、高性能的事件(消息)驱动**编程模型**
- 内置容错机制,允许Actor在出错时进行恢复或者重置操作
- 超级轻量级的事件处理(每GB堆内存几百万Actor)
- 使用Akka可以在单机上构建高并发程序,也可以在网络中构建分布式程序。

Akka通信过程

Akka Actor的并发编程模型的基本流程:

1. 学生创建一个ActorSystem
2. 通过ActorSystem来创建一个ActorRef(老师的引用),并将消息发送给ActorRef
3. ActorRef将消息发送给Message Dispatcher(消息分发器)
4. Message Dispatcher将消息按照顺序保存到目标Actor的MailBox中
5. Message Dispatcher将MailBox放到一个线程中
6. MailBox按照顺序取出消息,最终将它递给TeacherActor接受的方法中

akka的通信地址

**Actor Path**

每一个Actor都有一个Path,这个路径可以被外部引用。路径的格式如下:

| Actor类型 | 路径 | 示例 |
| 本地Actor | akka://actorSystem名称/user/Actor名称 | akka://SimpleAkkaDemo/user/senderActor |
| 远程Actor | akka.tcp://my-sys@ip地址:port/**user**/Actor名称 | akka.tcp://192.168.10.17:5678/user/service-b |

 Scala学习笔记就不赘述啦,有java的基础,总的来说Scala是算入门啦!!!

代码:

package cn.itcast.akka

import akka.actor.Actor

import scala.io.Source

object WorkerActor extends Actor{
  override def receive: Receive = {
    // 当接收到setup消息后, 想master进行注册即可
    case "setup" => {
        println("接收到setup消息, 当前actor已经启动了,准备开始注册....")
        // 获取到master的地址:
        val masterActor = context.actorSelection("akka.tcp://[email protected]:8888/user/masterActor")

          // 发送注册消息
      masterActor ! "connect"
    }

    // 接收master发送过来的任务
    case filePaths:String =>{ // 类型匹配操作
      println("接收到任务:" + filePaths)
     
      val filePathArr: Array[String] =  filePaths.split(" ")
      // Source.fromFile(filePath).getLines().toList  :   根据传递进去的文件的绝对路径, 一行一行读取, 将每一行都
      //放置在一个List集合中
      /*
          allFileLineList放置的内容是所有的文件的每一行内容
            hadoop spark
            hadoop spark
            hadoop spark
            spark
            spark
            spark
       */
      val allFileLineList: Array[String] = filePathArr.flatMap(filePath => Source.fromFile(filePath).getLines().toList )

      // 对每一行的数据进行切割操作:
      /*
          allWords结果:
              hadoop
              spark
              hadoop
              spark
              hadoop
              spark
              spark
              spark
              spark
       */
      val allWords: Array[String] = allFileLineList.flatMap(line => line.split(" "))


      // 进行统计操作: 先分组, 将相同的单词放置在一起 然后个数

      val  allWordsgRroups: Map[String, Array[String]] = allWords.groupBy(item => item)
      val res: Map[String, Int] = allWordsgRroups.map(group =>(group._1,group._2.size))  // 得到一个局部的结果了
      println(res)

      sender() ! res

    }
  }
}
package cn.itcast.akka

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory
 // worker端地址:  akka.tcp://[email protected]:7001/user/workerActor
object WorkerMain {
  def main(args: Array[String]): Unit = {

    //1. 创建actorSystem对象
    val actorSystem = ActorSystem("actorSystem",ConfigFactory.load())

    //2. 加载程序类, 启动
    val workerActor = actorSystem.actorOf(Props(WorkerActor),"workerActor")

    //3. 发送 setup 消息, 让其进行向master实现注册操作
    workerActor ! "setup"
  }

}
package cn.itcast.akka

import java.io.File

import akka.actor.{Actor, ActorRef}

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

object  MasterActor extends  Actor{

  // 定义一个存储worker端注册地址的列表
  val regAddressList =  ListBuffer[ActorRef]()  // 可变的list

  var taskPart  = mutable.Map[ActorRef,String]()  // 可变的map

  // 定义一个用来接收局部结果的操作
  val  aggResList = ListBuffer[Map[String,Int]]() // 可变的list

  override def receive: Receive = {
    // 接收所有worker的注册消息
    case "connect" => {

      // 将注册地址存储到regAddressList中, 方便后续进行回复消息
      regAddressList += sender()

      println("当前是第" + regAddressList.size +"个 注册成功了, 注册地址为:"+ sender().toString())

      //2. 当所有的worker全部都注册完成后, 进行文件的读取操作
      if(regAddressList.size ==3){
        // 表示所有的worker全部注册完成
        //2.1 读取文件夹中所有文件 : File对象
        val fileDir = new File("E:\\传智工作\\上课\\4天使用scala实现分布式计算程序\\scala分布式案例_day04\\资料\\data")
        //2.2 读取这个目录下所有的文件
        val allFileArr: Array[File] =  fileDir.listFiles()
        //2.3 将数组中每一个file对象 都转换为 文件绝对路径  file.getAbsolutePath
        val allFilePath: Array[String] =  allFileArr.map(file => file.getAbsolutePath )
        // array数组中存储类似于:  E:\传智工作\上课\4天使用scala实现分布式计算程序\scala分布式案例_day04\资料\data\1.txt
        //allFilePath.foreach(filePath => println(filePath))

        //3. 进行任务的分配工作 : 保证每个worker分配的任务都是平均  (基于轮询方案)

        var num = 0
        while(num < allFilePath.size){
          if(taskPart.isEmpty){ // 如果这个map中是空的, 直接分配即可
            for(regAddress <- regAddressList){ // 循环一个个worker, 给每一个worker分配任务
              if(num < allFilePath.size){
                taskPart = taskPart + (regAddress -> allFilePath(num)  )
              num = num+1

              }
            }

          }else{
            for(regAddress <- regAddressList){ // 循环一个个worker, 给每一个worker分配任务
              if(num < allFilePath.size){
                taskPart = taskPart + (regAddress -> (taskPart(regAddress) +" " + allFilePath(num) ) )
                num = num+1
              }
            }
          }

        }

        taskPart.foreach(item => println(item._1 +"分配任务为:" + item._2))

        //4. 任务的分发: 将分配好的任务, 发送给对应worker即可

        taskPart.foreach(item => {
          item._1 ! item._2
        })
      }

    }

    case aggRes:Map[String, Int] => {

      println(aggRes)
      //等待所有的局部结果全部返回后, 才应该聚合操作

      aggResList += aggRes


      if(aggResList.size ==3){
        // 扁平化处理:
        val aggRes: Seq[(String, Int)] =  aggResList.toList.flatten
        val aggResGroups: Map[String, Seq[(String, Int)]] =  aggRes.groupBy(item =>item._1 )

        val res = aggResGroups.map(item => {
          item._2.reduce((agg,curr) => (agg._1,agg._2+curr._2))
        }  )

        println(res)
      }
    }


  }
}
package cn.itcast.akka

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory
// master的地址: akka.tcp://[email protected]:8888/user/masterActor
object MasterMain {

  def main(args: Array[String]): Unit = {
    //1. 创建actorSystem对象
    val actorSystem = ActorSystem("actorSystem",ConfigFactory.load())

    //2. 加载程序类, 启动 : 导包:  alt +回车 选择 import
    val masterActor =  actorSystem.actorOf(Props(MasterActor),"masterActor")


  }

}

 pom.xml配置信息

 <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_2.11</artifactId>
            <version>2.3.14</version>
        </dependency>

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_2.11</artifactId>
            <version>2.3.14</version>
        </dependency>

    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution> 
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

猜你喜欢

转载自www.cnblogs.com/sunyuhai/p/12403586.html