[ ABAP ] - 使用异步RFC实现并行处理

使用异步RFC实现并行处理

通过阅读本文,你将了解到以下知识点:
- 什么是异步RFC,即aRFC
- 一个最简单的例子
- 真实项目中的的示例代码
- 使用异步RFC的注意事项

1. 异步RFC的基本原理

在处理大数据量的工作时,ABAP提供了并行处理的机制。实现并行处理的常用方式有两种:background job的方式和asynchronous RFC (aRFC)的方式。

所谓并行处理,也即将要处理的目标数据,按一定的规则划分成多个独立的package,将每个package分发到不同的application server instance (AS instance)上处理。

其原理图如下:
AS instance示意图

2. 查看AS instance

使用SM51可以查看到所有的的AS instance,双击(Tx: SM50)可进入查看该instance上进程的使用情况。
这里写图片描述
这里写图片描述
对于AS instance,可以使用RZ12对其进行分组。在使用aRFC进行并行处理时,可以指定其在此定义的server group.
这里写图片描述
补充知识: SMLG是另一种给server instance分组的方式,这个分组用于LOGON.
这里写图片描述

3.一个最简单的示例代码

使用aRFC做并行处理,其关键字为
1) STARTING NEW TASK task_name, 这句话会开启一个新的dialog process
2) DESTINATION IN GROUP group_name, 这句话指定dialog process运行在哪个server group上,若使用DEFAULT,则会随机分配一个空闲的AS instance上
3) PERFORMING call_back ON END OF TASK, 异步进程结束后的回调函数
4) RECEIVE RESULTS FROM FUNCTION function_name, 用于接收异步进程处理返回的结果

下面这个例子中,会循环10次,开启异步进程查看系统的自身信息。

TYPES: BEGIN OF task_type,
         name TYPE string,
         dest TYPE string,
       END OF task_type.

DATA: snd_jobs  TYPE i,
      rcv_jobs  TYPE i,
      exc_flag  TYPE i,
      info      TYPE rfcsi,
      mess      TYPE c LENGTH 80,
      indx      TYPE c LENGTH 4,
      name      TYPE c LENGTH 8,
      task_list TYPE STANDARD TABLE OF task_type,
      task_wa   TYPE task_type.

DO 10 TIMES.
  indx = sy-index.
  CONCATENATE 'Task' indx INTO name.

  CALL FUNCTION 'RFC_SYSTEM_INFO'
    STARTING NEW TASK name
    DESTINATION IN GROUP DEFAULT  "' 390' "pRFC
    PERFORMING rfc_info ON END OF TASK
    EXCEPTIONS
      system_failure        = 1 MESSAGE mess
      communication_failure = 2 MESSAGE mess
      resource_failure      = 3.

  CASE sy-subrc.
    WHEN 0.
      snd_jobs = snd_jobs + 1.
    WHEN 1 OR 2.
      MESSAGE mess TYPE 'I'.
    WHEN 3.
      IF snd_jobs >= 1 AND
      exc_flag = 0.
        exc_flag = 1.
        WAIT UNTIL rcv_jobs >= snd_jobs
        UP TO 5 SECONDS.
      ENDIF.
      IF sy-subrc = 0.
        exc_flag = 0.
      ELSE.
        MESSAGE 'Resource failure' TYPE 'I'.
      ENDIF.
    WHEN OTHERS.
      MESSAGE 'Other error' TYPE 'I'.
  ENDCASE.
ENDDO.

WAIT UNTIL rcv_jobs >= snd_jobs.

LOOP AT task_list INTO task_wa.
  WRITE: / task_wa-name, ', Server:', task_wa-dest.
ENDLOOP.

FORM rfc_info USING name.
  task_wa-name = name.
  rcv_jobs = rcv_jobs + 1.
  RECEIVE RESULTS FROM FUNCTION 'RFC_SYSTEM_INFO'
  IMPORTING
    rfcsi_export = info
  EXCEPTIONS
    system_failure        = 1 MESSAGE mess
    communication_failure = 2 MESSAGE mess.

  IF sy-subrc = 0.
    task_wa-dest = info-rfcdest.
  ELSE.
    task_wa-dest = mess.
  ENDIF.
  APPEND task_wa TO task_list.
ENDFORM.

运行结果:
可以看出,异步运行的结果返回顺序,并不是完全和task的发送顺序一致的。

这里写图片描述

4.实战代码
在实际业务中使用aRFC做并行处理,可参考以下的设计思路,即首先要确定分包的大小,分好package后,调用aRFC并行处理。几个重要的参数 -
- gt_package 一般用于分包的存储,一般由两部分构成,即task_name和keys;
- gt_result, 用于结果的接收;
- gv_snd_jobs , 用于记录发送的job数量;
- gv_rcv_jobs, 用于记录已经接收的job数量;
- gv_pp_running, 用于记录正在运行的job数量,控制负载。

注意:运行下列代码,需要提前创建structure - zflight_s_key和zflight_t_key,以及函数ztest_flight_getdetail (源代码如下).

这里写图片描述
这里写图片描述

*&---------------------------------------------------------------------*
*& Report ZTEST_ASYNC_RFC
*&---------------------------------------------------------------------*
*&
*&---------------------------------------------------------------------*
REPORT ztest_async_rfc.

**global type, data
*TYPES: BEGIN OF gty_s_key_flight,
*         carrid TYPE sflight-carrid,
*         connid TYPE sflight-connid,
*         fldate TYPE sflight-fldate,
*       END OF gty_s_key_flight.
*TYPES: gty_t_key_flight TYPE TABLE OF gty_s_key_flight.
TYPES: BEGIN OF gty_s_flight_package,
         task_name TYPE char10,
         keys      TYPE zflight_t_key,
       END OF gty_s_flight_package.

DATA: gt_package    TYPE TABLE OF gty_s_flight_package,
      gt_results    TYPE TABLE OF sflight,
      gv_snd_jobs   TYPE i,
      gv_rcv_jobs   TYPE i,
      gv_pp_running TYPE i.

**selection screen
SELECTION-SCREEN BEGIN OF BLOCK svr_group WITH FRAME TITLE title.
PARAMETERS:
  p_group  TYPE rzllitab-classname DEFAULT 'parallel_generators' MODIF ID svr,
  p_parpro TYPE int1               DEFAULT 3 MODIF ID svr,
  p_size   TYPE i                  DEFAULT 5 MODIF ID svr. " normally 100, 1000"
SELECTION-SCREEN END OF BLOCK svr_group.


INITIALIZATION.
  title = 'Settings for Parallel Processing'.

START-OF-SELECTION.

"..build packages"
  PERFORM build_packages.

"..parallel processing"
  PERFORM parallel_processing.

"..display result"
  PERFORM display_result.

FORM build_packages.

  DATA: lv_cursor       TYPE cursor,
        lv_index        TYPE char4,
        lv_task_counter TYPE char8,
        lt_keys         TYPE zflight_t_key,
        ls_package      TYPE gty_s_flight_package.

  OPEN CURSOR WITH HOLD @lv_cursor FOR
    SELECT carrid, connid, fldate
    FROM sflight
    WHERE carrid = 'AA' OR carrid = 'AZ'.

  DO.

    lv_index = sy-index.
    CONCATENATE 'TASK' lv_index INTO lv_task_counter.

    FETCH NEXT CURSOR @lv_cursor
    INTO TABLE @lt_keys
    PACKAGE SIZE @p_size.
    IF sy-subrc <> 0.
      EXIT.
    ENDIF.

    ls_package = VALUE #( task_name = lv_task_counter
                          keys      = lt_keys ).

    APPEND ls_package TO gt_package.

  ENDDO.

ENDFORM.

FORM parallel_processing.

  DATA: lv_wp_total      TYPE i,
        lv_wp_free       TYPE i,
        lv_curr_wp_total TYPE i,
        lv_curr_wp_free  TYPE i,
        lv_allow_wp      TYPE i.

  IF p_group IS NOT INITIAL.
    " Get the list of application server available and those are free (no of WP & free WP)"
    CALL FUNCTION 'SPBT_INITIALIZE'
      EXPORTING
        group_name                     = p_group
      IMPORTING
        max_pbt_wps                    = lv_wp_total
        free_pbt_wps                   = lv_wp_free
      EXCEPTIONS
        invalid_group_name             = 1
        internal_error                 = 2
        pbt_env_already_initialized    = 3
        currently_no_resources_avail   = 4
        no_pbt_resources_found         = 5
        cant_init_different_pbt_groups = 6
        OTHERS                         = 7.
    IF sy-subrc <> 0 AND sy-subrc <> 3.
      MESSAGE 'No resources available.' TYPE 'E'.
      EXIT.
    ENDIF.
  ENDIF.

  LOOP AT gt_package INTO DATA(ls_package).
    " generating jobs"
    gv_snd_jobs = gv_snd_jobs + 1.
    gv_pp_running = gv_pp_running + 1.

    IF p_group IS NOT INITIAL.

      DO.
        " get the current available workprocess(time-dependent)"
        CALL FUNCTION 'SPBT_GET_CURR_RESOURCE_INFO'
          IMPORTING
            max_pbt_wps                 = lv_curr_wp_total
            free_pbt_wps                = lv_curr_wp_free
          EXCEPTIONS
            internal_error              = 1
            pbt_env_not_initialized_yet = 2
            OTHERS                      = 3.

        IF p_parpro > 0.
          lv_allow_wp = p_parpro.
        ELSE.
          "if maximum number of processes is not restricted, take only half of the free processes"
          lv_allow_wp = lv_curr_wp_free / 2.
        ENDIF.

        " if current free wp is available with more than 2 wp"
        " and the current running is less than allowed wp,"
        " then there is enough wp for work of parallelization"
        IF lv_curr_wp_free GT 2 AND gv_pp_running LT lv_allow_wp.
          CALL FUNCTION 'ZTEST_FLIGHT_GETDETAIL'
            STARTING NEW TASK ls_package-task_name
            DESTINATION IN GROUP p_group
            PERFORMING call_back ON END OF TASK
            EXPORTING
              it_keys               = ls_package-keys
            EXCEPTIONS
              communication_failure = 1
              system_failure        = 2
              resource_failure      = 3.
          IF sy-subrc <> 0.
            MESSAGE ID sy-msgid TYPE sy-msgty NUMBER sy-msgno
                    WITH sy-msgv1 sy-msgv2 sy-msgv3 sy-msgv4.
          ELSE.
            EXIT.
          ENDIF.

        ELSE. " otherwise, it needs to wait until the wp are available"
          WAIT UP TO 1 SECONDS.
          CONTINUE.
        ENDIF.
      ENDDO.

    ELSE.
      "if no server group has been selected, then parallel processing use the Default Group"
      CALL FUNCTION 'ZTEST_FLIGHT_GETDETAIL'
        STARTING NEW TASK ls_package-task_name
        DESTINATION IN GROUP DEFAULT
        PERFORMING call_back ON END OF TASK
        EXPORTING
          it_keys               = ls_package-keys
        EXCEPTIONS
          communication_failure = 1
          system_failure        = 2
          resource_failure      = 3.
      IF sy-subrc <> 0.
        MESSAGE ID sy-msgid TYPE sy-msgty NUMBER sy-msgno
                WITH sy-msgv1 sy-msgv2 sy-msgv3 sy-msgv4.
        EXIT.
      ENDIF.

    ENDIF.

  ENDLOOP.

  " wait, until all the jobs were received."
  WAIT UNTIL gv_rcv_jobs >= gv_snd_jobs.

ENDFORM.

FORM call_back USING p_task TYPE clike.

  DATA: lt_results TYPE STANDARD TABLE OF sflight.

  " reveived jobs"
  gv_rcv_jobs = gv_rcv_jobs + 1.

  " retrieve results from parallelization"
  RECEIVE RESULTS FROM FUNCTION 'ZTEST_FLIGHT_GETDETAIL'
    IMPORTING et_results = lt_results.
  IF sy-subrc = 0.
    " running jobs"
    gv_pp_running = gv_pp_running - 1.
  ENDIF.

  APPEND LINES OF lt_results TO gt_results.

ENDFORM.

FORM display_result.

  DATA: lo_alv TYPE REF TO cl_salv_table.

  CALL METHOD cl_salv_table=>factory
    IMPORTING
      r_salv_table = lo_alv
    CHANGING
      t_table      = gt_results.

  lo_alv->display( ).

ENDFORM.

这里调用的函数是标准示例函数的一个封装,其封装的内容如下 -

FUNCTION ztest_flight_getdetail.
*"----------------------------------------------------------------------
*"*"Local Interface:
*"  IMPORTING
*"     VALUE(IT_KEYS) TYPE  ZFLIGHT_T_KEY
*"  EXPORTING
*"     VALUE(ET_RESULTS) TYPE  SFLIGHT_TAB2
*"----------------------------------------------------------------------

  DATA:
    lt_sflight TYPE TABLE OF sflight.

  ASSERT NOT cl_rfc=>is_external_direct( ).

  LOOP AT it_keys INTO DATA(ls_key).

    CALL FUNCTION 'FLIGHT_DETAILS'
      EXPORTING
        carrid  = ls_key-carrid
        connid  = ls_key-connid
        fldate  = ls_key-fldate
      TABLES
        details = lt_sflight.

    APPEND LINES OF lt_sflight TO et_results.

  ENDLOOP.

ENDFUNCTION.

5.使用aRFC的注意事项

  • 循环的退出条件要清晰,不要写成“死循环”
  • 系统dialog process的分配原则,不要全部占满,使用default时,也要至少预留2个dialog process
  • 分包的大小,原则是“对于每个分包,其单独的处理时间不要超过300秒”

猜你喜欢

转载自blog.csdn.net/nkGavinGuo/article/details/80949381