第一部分 GeoGeo脚本基础 第6章 分布式计算与进程控制

6章分布式计算与进程控制

分布式计算是GeoGeo设计的重要的动机之一。其它还有如GeoGeo 大数据集用于解决如何让用户用一个简单的数组管理成GT的分布式或集中存储的大量数据等。对于大数据集等内容有专门的章节讨论。

分布式计算的意图是显而易见的,当任务较大单独一台计算机很难完成时,自然会希望有一个由多个计算机组成的计算机群来协同解决这个任务,除非你有一台非常强大的超级计算机。为此人们设计了多种多机并行计算的方法和系统。

GeoGeo的分布式计算与进程控制也是为了实现这一目标而设计的。这些功能有GeoGeo自己独立的语言/脚本支持。

GeoGeo不是设计一个应用于某用途的多机超级计算系统,其根本目的是使仅经过简单培训的领域专业人员能够使用GeoGeo设计一个自己的超级计算系统。事实上只有领域专家才真正理解本领域超级并行计算任务的分解和调度,多年的实践也证明了这一点。

在网络上安装了GeoGeo的计算机直接可以使用其它也安装了GeoGeo的计算机作为节点机实现并行计算,不需要其它特别的配置,仅需要这些计算机能够建立有效的网络连接即可。

GeoGeo将节点机按mn列排列成2维阵列,以对应地理空间数据的二维分布特征。通过下述代码即可在m*n个节点机上启动线程/进程。

for( i=0; i<m; i=i+1){

for(j=0; j<n;j=j+1){

              func(i,j,…);

}

}

函数func声明为

process func(int i, int j, …){

       … //函数语句

}

函数的定义使用了一个process关键字,GeoGeo解释程序遇到该关键字定义的函数时,在节点机列表文件中查找已经安装GeoGeo服务的节点机。根据节点机的空闲状态,在适当的的节点机上启动执行该函数。如果同时启动多个该函数的运行实例或者多个不同的进程函数,就可以在多个节点机上运行这些进程函数。可见这些工作只要求函数是用process关键字定义的就可以。

看起来是不是非常简单,实际上这样设计的背后隐藏了非常复杂的控制过程。大体控制流程是,搜寻恰当的节点机、传送函数指令代码、同步变量、在节点机上执行指令代码等过程。

虽然程序写起来简单,但在哪个节点机上执行,节点机上是否需要启动更多的线程,节点机与主机如何同步,数据锁定机制等都可以由用户任意控制。GeoGeo提供了一些控制函数,对底层细节进行管理,使其用起来简单,实际控制能力并不简单。

在单机上虽然也可以使用process关键字通过Socket通讯启动另外的进程,但单机多核时使用多线程更为方便合理。将上述函数定义时的process关键字换成thread即可实现线程函数的启动,函数的代码将在另一个新启动的线程中运行。

6.1创建进程

多进程的目标是多机并行计算,将一个较大的任务分解成多个子任务,再将这些子任务分配到各个节点计算机上各自进行计算,然后收集各个计算机的计算结果合成最终的计算结论。

GeoGeo设计成将源代码以函数为单元在节点机上分配执行,而不是将一个函数的部分语句分配到各个节点机上。一个节点机可以同时处理一个或多个函数,从而完成更为复杂的任务。这样做可以对任务成块划分,做到什么任务分配到那个节点机上对用户透明,减少主机和节点机的通讯次数,从而提高执行效率。

上一章讲述的线程处理部分是在主进程的基础上另外开辟新的执行线程。而这里的多进程则不同,它是在本机或者其它节点机上单独开辟一个程序的运行实例,在这个运行实例上再开辟执行线程。这里要强调的是节点机上的进程中的执行单元依然是多个线程,而不是在节点机上每启动一个进程函数就增加一个进程。

6.1.1启动和结束多进程

启动多进程计算需要在每台节点机(Slave)上运行一个名为GGServer.exe的应用程序。

启动多进程需要为主机准备一张IP地址表,该表用文本格式存储,放在一个命名为IPAddress.ips的文件中。该表第一行为节点机的行数,第二行为节点机的列数,以后逐行存放每个节点机的IP地址。下面是一个IPAddress.ips的文件的例子。

2

2

192.168.3.1

192.168.3.2

192.168.3.3

192.168.3.4

表明节点机由22列共4台计算机组成,每台计算机的IP地址为192.168.3.1~4。节点机IP地址的条数要等于或大于行数和列数的乘积。

启动多进程时搜索IPAddress.ips文件,如果没有找到,会提示用户输入一个同样扩展名为ips的文本格式的文件。

    6.1.1.1 开始多进程

在开始执行多进程函数之前要先执行一个ProcessBegin()的函数。这个函数做一些启动多进程函数的准备工作。注意到上一章讲述多线程时不需要任何准备动作,只要声明thrtead关键字所有工作都自动完成。这里的ProcessBegin函数是比多线程麻烦的地方之一。

ProcessBegin函数的原型如下:

int ProcessBegin( [T var[, T var …]]);

函数的参数为数目不定的变量名表,列出需要向节点机同步的全局变量的变量名。如果这个表为空,则主机所有全局变量与节点机同步。如果不希望ProcessBegin函数向节点机进程同步任何全局变量,可以使用 NULL 描述符。

ProcessBegin(NULL);

此时无论主程序是否声明过全局变量,都不能在节点机上使用。

函数的返回值为一个节点机状态表,每一个元素指示对应的节点机状态。这个状态值大于0时表示节点机准备就绪,小于等于0时表示节点机由于连接等方面原因无法工作。

    6.1.1.2 结束多进程

结束多进程函数ProcessEnd通知所有节点机结束与该主机关联的所有线程,切断与该主机的网络连接。注意结束多进程的ProcessEnd函数不强行杀死节点机上的工作线程,而是等待这些线程自然死亡。尽管如此,主程序在启动节点机进程后,还是应该循环检测各节点机状态,确认各节点机上所有线程结束后,再执行ProcessEnd函数进行清理工作,否则可能会引起异常。

如果确实希望强行结束杀死工作线程,GeoGeo提供了这种函数,可以在ProcessEnd之前执行。强烈不建议使用这些杀死节点机上工作线程的函数,因为节点机上的执行的代码是不确定的,何时被杀死也是随机的,强制杀死线程可能会引起节点上的异常。节点机异常是很麻烦的事情,试想一大群节点机频繁异常那将会多么令人抓狂!

ProcessEnd函数通知结束线程后还会唤醒挂起的线程(如果有)和解锁锁定变量(如果有)。同时需要时会同步全局变量。ProcessEnd函数不结束节点机上的服务进程,因为某一个节点机可能同时服务于不止一个的主机,这一问题在本章晚些时候讨论。

结束多进程使用ProcessEnd()函数。函数原型如下:

int ProcessEnd( [T var[, T var …]]);

该函数的参数同样为需要同步的全局变量表,这个参数表为空时表示没有需要同步的变量,这与ProcessBegin函数是不同的。有专门用于同步主机与节点机间变量的函数,需要时可以随时进行同步。函数的返回值为一个大于0的整型值,可以忽略。

6.1.2进程函数

进程函数是能够在节点机上自动创建程序进程/线程的GeoGeo函数。当程序的控制调用进程函数时,根据主进程搜索的当前可用节点机,将执行代码传送到节点机,同步程序运行需要的变量,创建一个进程/线程实例,然后在该进程中再创建线程开始执行程序代码。这里虽然是创建了一个独立进程,但是具体的函数仍然是单独执行的线程。

新进程运行的同时,主机上原来的程序继续执行。与线程函数不同,这些新启动的进程必须由GeoGeo来确定在哪台节点机上运行,而不是线程中由操作系统来决定线程函数的分配。通常的原则是查找空闲的节点机,然后在其上启动进程。但是这不保证进程启动和分配的最优。GeoGeo提供了一个ProcessTo()函数来设置启动当前进程的节点机,可以使用户干预哪个进程函数在哪台节点机上运行。

在一台节点机上重复启动进程会造成较多的资源占用。这样的浪费主要表现在代码和一些需要的同步的资源等方面。因此,并不是每次启动都真正的启动一个独立的进程。在同一进程内同时启动较多线程时,这一问题要比进程的情况缓和的多。

GeoGeo启动进程函数的过程和调用普通函数一样(启动线程函数也是如此)。启动后马上返回该进程中线程的线程号以及该线程所在的节点机编号,存放在一个有3个元素的1维整型数组中,该数组的第12两个元素是运行该进程函数的节点机所在的行列,第3个元素是线程号。

一个进程中可以有多个线程,或者虽然1个进程函数中看起来只有一个线程(函数),但多次重复调用该进程函数并不是每次都启动一个进程,而是增加对应的线程,这样会有多个线程存在。所以启动进程函数返回的是线程号而不是给进程的编号。

事实上,进程函数的节点机不需要额外的ID或者编号,检索和查询使用主机对其进行的编号。如第6.1.1节的4个节点机分为22列,(00)表示第1行第1列,也就是所列节点机中的第1个。(10)表示第2行第1列,对应的是第3个节点机(192.168.3.3)。节点机的行列编号是以0起记的(Zero Based)。

例如:如果希望在第1行第2列的节点机上运行一个进程函数,可以在调用该函数之前写

ProcessTo(0,1);

则随后的线程函数在第1行第2列的节点机上启动。

进程函数在节点机上也可以同时创建多个线程,对主机的线程管理同样适用于对节点机的线程管理。这部分内容在下面的过程进程中有进一步的讨论。

下面是一个简单的调用进程函数的实例,这个函数在不同的节点机上发出音调和长短不同的滴声。

程序清单 6.1  6-1-并行例1.c

1 process func(int i){

2    int n = (i+1)*200;

3    Beep(n,n*10);

4 }

5 main(){

6    int i;

7    ProcessBegin();

8    for(i=0;i<4; i=i+1){

9        func(i);

10       Sleep(1000);

11   }

12   ProcessEnd();

13 }

代码的第1~4行是一个以Process开头的进程函数,是在不同的节点机上发出不同音调和长短的滴声的函数。调用函数时向下传递一个参数i,取值0~3,在第二行根据i值计算出一个频率值,第3行根据这个频率,并用这个频率值的10倍作为发声的长度的毫秒数,使用Beep函数发出一个滴声。

5~13行是主程序,第7ProcessBegin()函数指示开始进程函数调用,此时与各节点机建立连接。在第12ProcessEnd()函数结束进程函数并断开与节点机的连接。

在第9行循环调用进程函数4次。使用6.1.1ips文件,运行上述代码。正常情况下应该在每台节点机上产生一个不同音调和长短的滴声。如果有节点机异常等情况,也可能多个进程函数跑到同一台节点机上运行。

如果ips文件中包含有本机的IP地址,进程函数也同样可以在本机运行。通常调试GeoGeo多进程程序时可以在本机上进行,待运行无误后再部署到节点机上。否则多机联机调试加多线程也是一件比较麻烦的工作。

在第10行休眠1秒钟时间,使各节点机发声的时间错开,或者在同一个节点机上发声时有充足的时间使每段声音断开。

注意每启动一次进程函数都有1秒的休眠时间,这个时间足够使每个节点机上的进程完成自己的任务,因此随后的ProcessEnd()函数可以正常清理。实际上不检查节点机状态就直接清理是存在很大风险的,在编程实践中应该尽量避免这种情况。

6.2进程状态

用户面对的可能是一群节点机,每个节点机又有数量不等的线程,查询也是一件比较繁杂的工作。通常需要查询的状态包括节点机的连接状态、进程工作状态和线程工作状态。连接状态是查询一共有多少台节点机,有多少由于关机、网络或者服务异常等不能正常工作。进程工作状态是在这些能够工作的节点机中已经分配的任务的完成情况。线程工作状态是某个线程的任务完成情况

6.2.1查询节点机

节点机以2维空间排列,主要目的是为了更适合地理空间数据的二维分布特征,这里的排列并非指物理排列,而是节点机在主机观点看来的二维排列次序。如果不需要二维排列的节点机或者节点机数据不能正好分成mn列,建议使用1行多列排列节点机,而不要使用1列多行的排列方式。第6.1.1节中ips文件的第12行就是对应的节点机的行列。

函数ProcessNodes()获取节点机的数目。

int ProcessNodes(int nd[]);

函数的返回值是节点机的总数m*n,函数的参数是1个有2个元素的1维数组。函数返回时,第1个元素是节点机的行数,第2个元素返回节点机的列数。如:

int nd[2];

int num = ProcessNodes(nd);

nd[0]返回节点机的行,nd[1]中是节点机的列,num的值为nd[0]nd[1]的乘积。

6.2.2节点机连接布局

当存在多个节点机时可能会有个别节点机由于机器状态和网络等原因无法工作,或者在进程正在工作时被认为关闭等。因此需要时应该及时刷新节点机状态,以便主程序分配任务和收集结果。

使用ProcessMap()函数获取节点机的状态布局

int ProcessMap(int map[]);

函数的参数为一个2维数组,第1维的大小为节点机的行数,第2维大小为节点机的列数。数组的每一个元素对应一个节点机,函数结束时。参数的元素为1时表示节点机可用,小于等于0的值表示该节点机不可用。该函数成功时返回1

int map[m][n];

ProcessMap(map);

逐个检测map数组元的值,只使用其值为1的元素对应的节点机。

6.1.1ips文件中的节点机关闭2台,应用下述代码检测连接状态。

程序清单 6.2  6-2-并行例2.c

1 main(){

2    int i,j;

3    int nd[2];

4    ProcessBegin();

5    ProcessNodes(nd);

6    int map[nd[0]][nd[1]];

7    ProcessMap(map);

8    ProcessEnd();

9    Print("节点机行数%d,节点机列数%d",nd[0],nd[1]);

10   for(i=0; i<nd[0];i=i+1) {

11       for(j=0; j<nd[1]; j=j+1){

12            Print("节点机%d,%d 状态:%d",i,j,map[i][j]);

13       }

14   }

15   return;

16 }

运行上述代码,程序输出如下:

节点机行数 2,节点机列数2

节点机 00状态:1

节点机 01状态:1

节点机 10状态:0

节点机 11状态:0

代码第4行进程开始。第5行取得节点机个数,这个值是规划值,是在ips文件中指定的,返回值在nd中,第1和第2个元素均为2,共4个节点机。第7行取得节点机的状态,如输出结果所示,第1行两台计算机可用(返回值为1),第2行两台计算机不可用。

6.2.3进程的工作状态

上面说到进程工作状态是在这些能够工作的节点机中已经分配的任务的完成情况。状态值大于0表示忙,等于0表示空闲,小于0表示任务已经完成或者任务已经完成可以重置。节点机中只要还有没完成的线程在工作,就表示忙。

需要查询某个特定节点机的进程工作状态时可以使用ProcessNodeStatus函数:

int ProcessNodeStatus(int i, int j)

参数int iint j为待查询节点机所在的行列,返回值为节点机的状态。

6.2.4进程中线程的状态

已经在5.3.1节讨论了使用ThreadStatus函数查询本机的线程工作状态,节点机的某特定线程状态用函数查询,其余同5.3.1节。

int ProcessThreadStatus(int i, int j, int thrNo);

函数的第12个参数是节点机的行列,第三个参数是线程号,每个进程在自己的节点机上排列自己的线程号,而不是所有节点机上所有线程统一编号,这三个参数是启动进程函数的时的返回值。

通常,主程序在启动进程函数以后要检查节点机的工作状态,在确认节点机上的所有线程运行结束后再关闭进程。

6.2.5进程状态查询示例

应用224个节点机(6.11节的节点机地址文件),关闭第2行的两台计算机。仅留2台能够正常工作的计算机,每台机器上启动一个进程函数的4个运行实例。进程函数中放一些仅为了耗时计算用的无意义代码。程序代码如下:

程序清单 6.3  6-3-并行例3.c

double gX; //全局变量

main(){

     gX = 3.14;

     int nd[2];

     int numProc = 4;

     int i,j,k,susp;

     int thrs;

     int st[3]=-1;

     ProcessBegin();//开始多进程

    ProcessNodes(nd);//查询节点机数

     Print("节点机行数%d节点机列数%d",nd[0],nd[1]);

 

     int map[nd[0]][nd[1]];

    ProcessMap(map);//查询节点机连接状态(未连接节点机有等时)

 

     //逐行逐列启动各节点机上的线程,每个节点机上启动个线程

     for(i=0;i<nd[0]; i=i+1) {

         for(j=0; j<nd[1]; j=j+1){

              Print("节点机第%d行、第%d 列的连接状态为%d",i+1,j+1,map[i][j]);

              if(map[i][j] == 1){

                   for(k=0; k<numProc; k=k+1){

                       ProcessTo(i,j);//定向到指定的节点机

                       st = func();//启动进程函数,这里返回节点机行列号和线程号

                       Print("节点机第%d行、第%d 列的线程%d 启动!",i+1,j+1,st[2]);

                       Sleep(500);

                       thrs = ProcessThreadStatus( st[0],st[1],st[2]);

Print("节点机第%d行、第%d 列线程%d 状态:%d",st[0]+1,st[1]+1, st[2],thrs);

                   }

              }

              else {

                   Print("节点机第%d行、第%d 列不可用!",i+1,j+1);

              }

         }

     }

 

     //查询进程结束的代码

     int running = 1;

     int proSta[nd[0]*nd[1]];

     while(running) {//只要还有进程在工作,就循环等待

         CheckProcStatus(nd[0],nd[1],proSta,map);//取来所有节点机工作状态

         running = 0;//假设所有节点机都已经完成任务了

         for(i=0; i<nd[0]*nd[1]; i=i+1){

              if(proSta[i] == 1){

                   running = 1;//其实还有节点机在忙

                   Print("忙!!!");

                   break;

              }

         }

         Sleep(500);//休眠等时

     }

 

     Print("全部进程结束!");

     ProcessEnd(gX);

     Print("计算结果gX = %f",gX);

     return;

}

////////////////////////////////////////////////////////////////

//   查询节点机工作状态的函数结果在数组proSta中。

////////////////////////////////////////////////////////////////

int CheckProcStatus(int h,int w,int proSta,int map){

     int i,j;

     for(i=0;i<h; i=i+1){

         for(j=0; j<w; j=j+1){

              if(map[i][j] == 1){

                   proSta[i*w+j] = ProcessNodeStatus(i,j);

              }

              else{

                   proSta[i*w+j] = -1;

              }

         }

     }

}

 

//////////////////////////////////////////////////////////////

//   进程函数

//////////////////////////////////////////////////////////////

process func(){

     //无意义耗时

     return ;

}

运行上述代码,输出如下:

节点机行数 2节点机列数 2

节点机第1行、第1列的连接状态为1

节点机第1行、第1列的线程1启动!

节点机第1行、第1列线程1状态:1

节点机第1行、第1列的线程4启动!

节点机第1行、第1列线程4状态:1

节点机第1行、第2列的连接状态为1

节点机第1行、第2列的线程1启动!

节点机第1行、第2列线程1状态:1

节点机第1行、第2列的线程4启动!

节点机第1行、第2列线程4状态:1

节点机第2行、第1列的连接状态为0

节点机第2行、第1列不可用!

节点机第2行、第2列的连接状态为0

节点机第2行、第2列不可用!

忙!!!

忙!!!

全部进程结束!

计算结果 gX = 97.211712

注意在查询进程结束的部分代码中有一行

Sleep(500);//休眠等时

这一行是非常必要的。如果去掉这一行,这部分等待进程结束的代码将占据本线程的所有CPU时间。线程释放的CPU时间和循环内代码执行时间的长短及休眠时长有关。通常休眠时间越短,循环占用的CPU时间就越长,释放的时间就越少。

6.3进程中的变量同步

进程函数仅返回进程中线程号,函数的参数也不能以引用的方式从节点机回传。变量同步全部通过共享全局变量来实现。

GeoGeo的主要设计思想是以消息传递模型为主,对于规模较小的数据集以消息驱动方式在主机和节点机之间传递,并进行必要的访问同步控制。大规模数据集则使用内存共享混合模型,将分布式存储或者集中存储的数据同时映射到主机和节点机,实现模拟的内存共享。这部分内容在后面讨论。

6.3.1进程函数的全局变量

多机多进程的共有变量与多核多线程不同。在多核多线程情况下,所有线程真正从物理上共享内存,同时使用一台计算机的存储空间。在这类以通用网络连接的多机系统则无法实现物理内存的共享,除非专门设计专用的硬件设备或者用软件办法进行模拟。

GeoGeo的多机并行计算的全局变量以消息传递方式进行。多机并行开始时,将需要的全局变量从主机复制一份副本到节点机,有更新时进行必要的同步刷新。这是这一节着重要讨论的问题。

细心的读者可能会注意到,GeoGeo宣称处理大数据集,对于成GT的大数据集进行这样的频繁的网络传输是不可能实现的。的确如此,GeoGeo的消息传递方式仅适合规模较小的数据集的同步。对于规模庞大的数据集,单机情况时GeoGeo也将其存储在外部存储区,然后将局部映射到内存空间。多机情况下,其外部存储区可为分布式或集中存储的共享存储空间,每台节点机像单机时一样将其映射到各自的内存空间,刷新与外部存储对应的内存映射空间,就能实现各节点机的变量同步。这一问题后面有专门的章节讨论,本节后面讨论的仅为消息传递的同步方式。

前面讨论ProcessBegin函数时曾经提及全局变量的同步问题,ProcessBegin函数不指定参数时,主程序全部全局变量在节点机进程函数中可见,ProcessBegin函数指定全局变量为函数参数时,仅指定的全局变量在进程函数中可见,其余全局变量不可见。

double gX,gY;

main(){

ProcessBegin(); // 全部全局变量同步到节点机

Func();

ProcessEnd(); //没有全局变量到主机的同步操作

}

process Func(){

… //gX,gY在这里均可以使用

}

如果将这一行ProcessBegin();改为ProcessBegin(gX);,则只有gXFunc函数中可见。如果不希望在进程函数中使用任何全局变量可以使用ProcessBegin(NULL);,这时所有全局变量在进程函数中均不可见。

ProcessEnd();一行除了结束所有进程之外,还可以指定将哪些全局变量与主进程同步。如主程序需要使用进程函数计算得到的计算结果,可将该全局变量作为该函数的参数。如:ProcessEnd(gX);。这个函数与ProcessBegin();函数不同,ProcessBegin不指定时将所有全局变量同步,ProcessEnd不指定时没有变量同步,只同步指定的全局变量。

下面通过一个例子说明使用全局变量获得进程函数的计算结果。

程序清单 6.4  6-4-并行例4.c

double PI,dbVar;

process func(){

     dbVar = dbVar*dbVar*PI;

     return;

}

main(){

     PI = 3.14;

     dbVar = 3.0;

     int thr[3];

     ProcessBegin();

     thr = func();

     while(ProcessNodeStatus(thr[0],thr[1]) > 0){

         Sleep(100);

     }

     ProcessEnd(dbVar);

     Print("面积= %f",dbVar);

     return;

}

这几行代码在一个节点机(通常是第1行第1列)上计算一个圆面积。运行后的输出结果为:

面积 = 28.260000

用几行简单的代码即可实现多机并行环境的任务调度分配。当然实际应用中不会将这样一个简单的计算面积的问题提交到多机并行环境进行处理。

除了使用全局变量,也可以使用函数的参数进行变量传递,将上述代码改造一下:

程序清单 6.5  6-5并行例5.c

double dbVar;

process func(double PI){

}

main(){

     dbVar = 3.0;

     int thr[3];

     ProcessBegin();

     thr = func(3.14);

}

3.14作为参数传给进程函数,会得到同样的计算结果。

6.3.2进程函数中的变量同步

    6.3.2.1 Flush函数

ProcessBeginProcessEnd函数可以实现主机和节点机之间全局变量的部分同步工作,但这两个函数是面向所有节点机的,在主机端完成同步。ProcessBegin函数在开始时向所有Slave机同步全局变量,在ProcessEnd函数所有Slave向主机刷新全局变量。

所有Slave向主机刷新全局变量时有一个问题。如果全局变量在所有节点机的值相同,则没有必要每个Slave机逐一刷新。如果全局变量在不同节点机有不同的值,这样刷新有仅保留了最后一个完成同步的值。这个过程实际上是黑瞎子掰苞米,是说黑瞎子(熊)右手掰来一个苞米放在左腋下夹住,再伸左胳膊掰下一个苞米夹在右腋下,如此反复,最后只剩下1个苞米,也就是最后掰的那个苞米。

函数Flush在进程函数在执行过程中进行变量同步,写在进程函数中。可以实现在任意的节点机和任意的线程实现与Master的变量同步。

Flush函数将指定的全局变量刷新到主机,也就是用节点机上的全局变量的值刷新主机的全局变量的值。该函数变量名表中的变量名一定要是Master机中定义过的全局变量,或者是全局变量的子变量,并且一定要是通过ProcessBegin函数或者FlushIn函数同步到该节点机上的变量。

int Flush (变量名表);

下面是一个使用Flush函数的简单例子:

程序清单 6.6  6-6-并行例6.c

double dbX = 3.14;

process func(){

    dbX = dbX*2.0;

    Sleep(1);

    Flush(dbX);

}

main(){

    int i;

    ProcessBegin();

    func();

    ProcessEnd();

    Print("%f",dbX);

}

运行输出:

dbX = 6.280000

    6.3.2.2节点机号与线程号

进程函数在执行过程中需要知道自己是谁,才能正确地将自己的数据同步到主机上。前面讲过节点机在主机上按行按列编号,这样主机很容易找到需要的节点机。节点机如果希望知道自己在主机中的排列位置,可以使用GetNode函数。

int GetNode(int nd[]);

该函数在进程函数中使用,参数是1个有2个元素的1维数组。函数返回时,在第1个元素中返回本机(节点机)所在的列,第2个元素是所在的行。函数的返回值返回1个大于0的整型数。

线程号在每个节点机上独立编号。程序运行都是在线程上执行的,其实用户对线程执行在哪个节点机上并不是很关心,关心的主要是线程执行的程序。这就像单机情况时,多核的系统执行着许多线程,大多数情况下领域应用人员并不在意哪个线程运行在哪个CPU核心上。因此,理想情况下应该是线程打破节点机限制统一编号,这样用户使用起来会更加方便。但是多机和多核心系统不同,单机不会因为某个CUP核心挂掉而影响线程的编号。多机情况就不同了,经常会因为网络、宕机以及人为关闭等因素使多机并行运算环境发生改变。这样线程号就会发生编号的动态变化。因此GeoGeo还是沿用了单机的线程编号方式。

下面是一个线程识别所在节点机输出的例子:

程序清单 6.7    6-7-并行例7.c

double gX = 0.0;

process func(){

     int node[2];

     GetNode(node);

     if(node[0] == 0 && node[1] == 0){

         gX = 3.14;

     }

     else{

         gX = 6.28;

     }

Flush(gX);

     return ;

}

main(){

     ProcessBegin();

     func(); //缺省状况启动第1行第1列的节点机(编号0,0)

     func(); //在下一个节点机上启动

     ProcessEnd();

     Print(" gX = %f ",gX);

     return;

}

这段代码中,主程序main在开始进程后,在两台Slave机上各自启动1个进程函数func。进程函数用GetNode函数判断自己运行在哪个Slave上(还可以用GetThread()函数判断自己是该节点机上的哪个线程),如果在第1行第1列的节点机上,就把gX赋值3.14,否则gX被赋值6.28。然后通过Flush函数将gX值同步到主机。

运行结束后显示结果

gX = 6.28

说明第1个进程函数结束后,将主机的gX同步为3.14,第2个进程结束后,将主机的gX3.14覆盖为6.28。需要注意的是,上例并非总是输出6.28,这取决于两个进程哪个结束的晚,谁结束的晚最后就是谁的值。

如添加下述代码后,输出就变为3.14。因为在第1个节点机上增加了运算代码,使其结束的比第二个线程要晚。

   if(node[0] == 0 && node[1] == 0){

      gX = 3.14;

     int l;

     double x;

     for(l=0; l<100000;l=l+1){

        x = gX*gX;

    

   }

或者写成

   if(node[0] == 0 && node[1] == 0)

   {

      gX = 3.14;

    Flush(gX);

   }

可以由代码控制全局变量的同步结果。此时程序输出第1个节点机上的gX值。此外,通过GeoGeo的子变量(Sub Variable)技术可以更好的控制变量同步,在下面一节要进一步讨论。

    6.3.2.3用户程序编号

还有一种方法对线程编号,就是通过用户代码给每个线程进行统一编号。这个编号通过进程函数的参数传递到每个线程中,如:

main(){

       int n=10;

       int i;

       for(i=0;i<n; i=i+1){

              ProcessBegin();

              func(i);

              ProcessEnd();

       }

}

process func(int i){

       if( i == 0){

              ...

       }

       ...

}

这样每个线程都知道自己在所有线程中的编号,通过这些编号可以进一步确定自己的任务。

注意到代码中没有涉及节点机的任何信息。实际上节点机在多机进程启动时(ProcessBegin函数)都已经处于活跃状态,执行到func(i)函数时GeoGeo顺序向节点机摊派任务。这与操作系统向不同的CPU核心指派任务类似,不同的是操作系统自动平衡不同核心的负载,并且每个核心能力基本一样,而这里向不同机器指派任务时,GeoGeo无法知道Slave除了本任务还在做什么,各个Slave机的能力状态也不一样,所以只能顺序摊派。GeoGeo提供了一个ProcessTo函数将马上就要启动的进程函数加载到指定的节点机上。

int ProcessTo(int i, int j)

函数的参数是节点机的行列,函数成功时返回1个大于0的值,失败时返回0

下面通过一个计算数组平均值的例子,进一步介绍一下进程函数的稍完整一些的应用。

例子生成一个2000*1999个随机数的二维数组,以行为粒度分成8组,分配在2个节点机上对数据求和,主程序收集各线程代码计算结果,总加在一起再除以数据样本数获得平均值。

程序清单 6.8  6-8-并行例-计算均值.c

1 int iRows = 1999;

2 int iCols = 2000;

3 int iNumThreads = 8;

4 double data[iRows][iCols];

5 double sum[iNumThreads];

6 process ProcFunc1(int thr,int startRow,int startCol,int height,int width){

7    int i,j;

8    for( i=startRow; i< startRow+height; i=i+1){

9        for( j=startCol; j< startCol+width; j=j+1){

10            sum[thr] = sum[thr]+data[i][j];

11       }

12   }

13  Flush(sum[thr]);

14   return ;

15 }

16 main(){

17   double mean = 0;

18   Rand(0.0,1.0,data);

19   int hi = iRows/iNumThreads + (iRows%iNumThreads > 0);

20   int i,lines;

21   sum = 0.0;

22   int st[3];

23   ProcessBegin();

24   for(i=0; i<iNumThreads; i=i+1){

25       lines = hi;

26       if( iNumThreads -1 == i && (iRows%iNumThreads > 0)){

27            lines = iRows%hi;

28       }

29       st = ProcFunc1(i,i*hi,0,lines,iCols);

30       Print("节点机第%d行、第%d 列的线程%d 启动,分配处理行数%d 行。", st[0]+1,st[1]+1,st[2],lines);

31   }

32   ProcessEnd();

33   Print("全部线程结束!");

34   for( i=0; i<iNumThreads; i=i+1){

35       mean = mean+sum[i];

36       Print(" sum[%d] = %f ",i,sum[i]);

37   }

38   mean = mean/iRows/iCols;

39   Print(" 平均值= %f ",mean);

40   return;

41 }

图 6.1 程序清单6.7输出结果

主程序开始后,在第18行生成2000*1999个取值在0~1之间的双精度浮点数据。

18  Rand(0.0,1.0,data);

数据存放在data数组中。第19

19  int hi = iRows/iNumThreads + (iRows%iNumThreads > 0);

计算1999行数据分在8个线程中除最后一个线程外每个线程处理的行数。在第29行循环8次,交替在2个节点机上启动8个线程。其中在第26~28行计算最后一个线程的处理数据的行数,如果与其它线程不同的话。

26      if( iNumThreads -1 == i && (iRows%iNumThreads > 0)){

27          lines = iRows%hi;

28      }

然后主程序结束等待回收结果。

实际上在启动了所有线程之后,随之在第32行执行ProcessEnd函数,此时主线程处于一种休眠和间歇检测状态,一直等到所有子线程结束才真正结束进程并断开与节点机的连接。如果此时需要干预各子线程的运行,可以在执行ProcessEnd函数之前通过检测子线程的状态来实现。

6~15行为进程函数,线程的参数中,第1个是循环启动线程的序号,这里作为对线程函数的编号。第23个参数是本线程待处理数据的起始行列,第45个参数是待处理数据的行数和列数。循环总加数据之和后,在第13行将计算结果同步到主程序中

13  FlushOut(sum[thr]);

注意这里使用了自己的线程号作为结果数组的数组元素。对sum数组的部分(数组的一部分或者单个元素)更新。这是使用了GeoGeo的子变量技术,后面有进一步的讨论。

所有进程结束后,主程序在第34~37行汇总计算出最后的结果。

上述程序有一些缺点,首先是需要同步全部数据集本身(data数组)。实际上每个线程处理的数据只有整个数据集的一部分,几十M的数据在主机和节点机间频繁交换会增大网络负载,尤其是节点机较多时。解决办法是可以在主机端预先对数据切分,或者使用GeoGeo提供的创建Dummy变量的技术,下一节随即对此进行讨论。还有就是计算速度很慢,这在前面已经注意到,循环对数组元素存取需要大量的检索时间,这1是解释程序本身就慢,2GeoGeo还没有进行优化。建议使用GeoGeo内部统计函数完成类似这样的工作。这里仅作演示之用。

    6.3.2.2 子变量同步

进程函数在进行变量同步时可以对该变量的子变量同步,可以简化计算过程的管理。上一节的第13行就使用了子变量同步技术。下面再用一个例子说明Dummy变量与子变量同步。

(暂无内容)。

6.3.3Master机同步变量

Master机端全局变量数据变更时,可以使用FlushVar函数对Slave机进行变量同步。

int FushVar(T varname, …);

FlushVar函数的应用例子见程序清单9-41

6.4过程进程

同前述过程线程一样,在一个文件中的独立执行单元称之为一个过程。一个过程可以作为一个单独的进程在节点机上启动,这个被启动的过程相对与启动进程就是一个独立的进程,称之为过程进程。

6.4.1启动过程线程

启动过程线程使用ProcessCall函数,函数原型如下:

int ProcessCall( STRING pathName);

函数的参数为过程代码文件的路径名。。

下面的示例代码保存在一个名为“6-9-系统时间.c”的文件中。

程序清单 6.9  6-9-系统时间.c

main(){

     func();

}

thread func(){

     int year,month,day,hour,minute,second;

     GetCurrentTime(year,month,day,hour,minute,second);

     STRING string;

     Format(string,"系统时间:%d年   %d月   %d日   %d时   %d分   %d秒",year,month,day,hour,minute,second);

     MessageBox(string);

}

这段代码运行时弹出一个对话框,里面显示系统时间。主程序调用一个线程函数,在线程中弹出一个对话框提示系统时间。单独运行输出为如下图

再看下面的示例代码:

程序清单 6.10  6-10-过程进程.c

main(){

     STRING cmd = "D:\\系统时间.c";

     int i,nThread[5];

     ProcessBegin();

     nThread[i] = ProcessCall(cmd);

     ProcessEnd();

}

将程序清单6.8的代码作为本段程序的过程进程,使用ProcessCall函数在节点机上运行。会在节点机上得到同样的输出结果。

可见,在过程进程中仍然可以启动线程函数,但是不要在过程进程中启动进程函数或者启动新的过程进程。

5.4.2过程进程的阻塞与非阻塞方式

过程进程运行的阻塞和非阻塞方式与过程线程基本相同。

6.5节点机的多主机服务

(暂无内容)

6.6系统进程

系统进程这里指启动系统的独立进程,也就是系统可执行文件的启动。系统进程使用Exec函数启动,函数原型:

int Exec(STRING commd int flag);

函数的第1个参数是命令行参数,第2个参数是一个状态指示,如果指定为1,程序等待子进程结束后返回。0时启动进程后直接返回,不等待。缺省时为0。进程启动成功时返回1,失败时返回0

下面是一个启动记事本的例子:

程序清单 6.11  6-11-外部进程.c

main(){

     STRING cmd = "C:\\windows\\system32\\notepad.exe D:\\文本数据.txt";

     int ret = Exec(cmd,1);

     MessageBox("主进程挂起等待子进程结束后继续执行!");

     ret = Exec(cmd);

     MessageBox("主进程启动子进程后直接继续执行!");

     return 1;

}

6.7本章小结

1. 多机并行计算可以在多台计算机上完成较大的计算任务。这些计算机有一台称之为主机(Master),也就是运行GeoGeo解释程序的计算机。其它计算机称之为节点机(Slave),在Slave机上运行GGServer应用程序。这些计算机具有有效的网络连接,能够实现基本的SOCKET通信。

2. Slave机逻辑上按行列排列形成Slave机阵列,以适应地学数据的空间分布特征。每台Slave机由所在阵列的行列位置标识。

3. 在每台Slave机上启动一个GeoGeo解释程序(由GGServer完成)称之为一个GeoGeo进程。GeoGeo进程是独立与Master机的GeoGeo主进程的独立程序运行单元。GeoGeo进程可以在Slave端启动更多的子线程,也可以由Master机在Slave上启动子线程。这些子线程的运行管理与前述所讨论的多线程相同。

4. GeoGeo进程有两种启动方式,即函数进程和过程进程。函数进程使用由process关键字定义的函数,GeeoGeo解释程序遇到这类函数就将其分配到Slave机上继续解释执行。过程进程使用ProcessCall函数将一个过程分配到Slave机上解释执行。

5. 不同于多线程,多进程开始和结束需要使用ProcessBeginProcessEnd函数,这两个函数同时承担全局变量同步的任务。

6. 函数进程可以通过函数参数向Slave机传送数据,或者可以通过全局变量实现Master-Slave间数据同步。过程进程只使用全局变量进行数据同步。同时可以在Master端使用FlushSlave函数、Slave端使用Flush函数更新全局变量。

7. 多进程不能像多线程一样由操作系统将各线程平衡分配到各个CPU内核上。GeoGeo只是轮流在各个Slave机上启动各进程,除非用户使用ProcessTo函数将指定的进程赋给指定的Slave机。

下载地址:http://download.csdn.net/detail/gordon3000/7922555 

猜你喜欢

转载自blog.csdn.net/gordon3000/article/details/39544057