使用异步RFC实现并行处理
通过阅读本文,你将了解到以下知识点:
- 什么是异步RFC,即aRFC
- 一个最简单的例子
- 真实项目中的的示例代码
- 使用异步RFC的注意事项
1. 异步RFC的基本原理
在处理大数据量的工作时,ABAP提供了并行处理的机制。实现并行处理的常用方式有两种:background job的方式和asynchronous RFC (aRFC)的方式。
所谓并行处理,也即将要处理的目标数据,按一定的规则划分成多个独立的package,将每个package分发到不同的application server instance (AS instance)上处理。
其原理图如下:
2. 查看AS instance
使用SM51可以查看到所有的的AS instance,双击(Tx: SM50)可进入查看该instance上进程的使用情况。
对于AS instance,可以使用RZ12对其进行分组。在使用aRFC进行并行处理时,可以指定其在此定义的server group.
补充知识: SMLG是另一种给server instance分组的方式,这个分组用于LOGON.
3.一个最简单的示例代码
使用aRFC做并行处理,其关键字为
1) STARTING NEW TASK task_name, 这句话会开启一个新的dialog process
2) DESTINATION IN GROUP group_name, 这句话指定dialog process运行在哪个server group上,若使用DEFAULT,则会随机分配一个空闲的AS instance上
3) PERFORMING call_back ON END OF TASK, 异步进程结束后的回调函数
4) RECEIVE RESULTS FROM FUNCTION function_name, 用于接收异步进程处理返回的结果
下面这个例子中,会循环10次,开启异步进程查看系统的自身信息。
TYPES: BEGIN OF task_type,
name TYPE string,
dest TYPE string,
END OF task_type.
DATA: snd_jobs TYPE i,
rcv_jobs TYPE i,
exc_flag TYPE i,
info TYPE rfcsi,
mess TYPE c LENGTH 80,
indx TYPE c LENGTH 4,
name TYPE c LENGTH 8,
task_list TYPE STANDARD TABLE OF task_type,
task_wa TYPE task_type.
DO 10 TIMES.
indx = sy-index.
CONCATENATE 'Task' indx INTO name.
CALL FUNCTION 'RFC_SYSTEM_INFO'
STARTING NEW TASK name
DESTINATION IN GROUP DEFAULT "' 390' "pRFC
PERFORMING rfc_info ON END OF TASK
EXCEPTIONS
system_failure = 1 MESSAGE mess
communication_failure = 2 MESSAGE mess
resource_failure = 3.
CASE sy-subrc.
WHEN 0.
snd_jobs = snd_jobs + 1.
WHEN 1 OR 2.
MESSAGE mess TYPE 'I'.
WHEN 3.
IF snd_jobs >= 1 AND
exc_flag = 0.
exc_flag = 1.
WAIT UNTIL rcv_jobs >= snd_jobs
UP TO 5 SECONDS.
ENDIF.
IF sy-subrc = 0.
exc_flag = 0.
ELSE.
MESSAGE 'Resource failure' TYPE 'I'.
ENDIF.
WHEN OTHERS.
MESSAGE 'Other error' TYPE 'I'.
ENDCASE.
ENDDO.
WAIT UNTIL rcv_jobs >= snd_jobs.
LOOP AT task_list INTO task_wa.
WRITE: / task_wa-name, ', Server:', task_wa-dest.
ENDLOOP.
FORM rfc_info USING name.
task_wa-name = name.
rcv_jobs = rcv_jobs + 1.
RECEIVE RESULTS FROM FUNCTION 'RFC_SYSTEM_INFO'
IMPORTING
rfcsi_export = info
EXCEPTIONS
system_failure = 1 MESSAGE mess
communication_failure = 2 MESSAGE mess.
IF sy-subrc = 0.
task_wa-dest = info-rfcdest.
ELSE.
task_wa-dest = mess.
ENDIF.
APPEND task_wa TO task_list.
ENDFORM.
运行结果:
可以看出,异步运行的结果返回顺序,并不是完全和task的发送顺序一致的。
4.实战代码
在实际业务中使用aRFC做并行处理,可参考以下的设计思路,即首先要确定分包的大小,分好package后,调用aRFC并行处理。几个重要的参数 -
- gt_package 一般用于分包的存储,一般由两部分构成,即task_name和keys;
- gt_result, 用于结果的接收;
- gv_snd_jobs , 用于记录发送的job数量;
- gv_rcv_jobs, 用于记录已经接收的job数量;
- gv_pp_running, 用于记录正在运行的job数量,控制负载。
注意:运行下列代码,需要提前创建structure - zflight_s_key和zflight_t_key,以及函数ztest_flight_getdetail (源代码如下).
*&---------------------------------------------------------------------*
*& Report ZTEST_ASYNC_RFC
*&---------------------------------------------------------------------*
*&
*&---------------------------------------------------------------------*
REPORT ztest_async_rfc.
**global type, data
*TYPES: BEGIN OF gty_s_key_flight,
* carrid TYPE sflight-carrid,
* connid TYPE sflight-connid,
* fldate TYPE sflight-fldate,
* END OF gty_s_key_flight.
*TYPES: gty_t_key_flight TYPE TABLE OF gty_s_key_flight.
TYPES: BEGIN OF gty_s_flight_package,
task_name TYPE char10,
keys TYPE zflight_t_key,
END OF gty_s_flight_package.
DATA: gt_package TYPE TABLE OF gty_s_flight_package,
gt_results TYPE TABLE OF sflight,
gv_snd_jobs TYPE i,
gv_rcv_jobs TYPE i,
gv_pp_running TYPE i.
**selection screen
SELECTION-SCREEN BEGIN OF BLOCK svr_group WITH FRAME TITLE title.
PARAMETERS:
p_group TYPE rzllitab-classname DEFAULT 'parallel_generators' MODIF ID svr,
p_parpro TYPE int1 DEFAULT 3 MODIF ID svr,
p_size TYPE i DEFAULT 5 MODIF ID svr. " normally 100, 1000"
SELECTION-SCREEN END OF BLOCK svr_group.
INITIALIZATION.
title = 'Settings for Parallel Processing'.
START-OF-SELECTION.
"..build packages"
PERFORM build_packages.
"..parallel processing"
PERFORM parallel_processing.
"..display result"
PERFORM display_result.
FORM build_packages.
DATA: lv_cursor TYPE cursor,
lv_index TYPE char4,
lv_task_counter TYPE char8,
lt_keys TYPE zflight_t_key,
ls_package TYPE gty_s_flight_package.
OPEN CURSOR WITH HOLD @lv_cursor FOR
SELECT carrid, connid, fldate
FROM sflight
WHERE carrid = 'AA' OR carrid = 'AZ'.
DO.
lv_index = sy-index.
CONCATENATE 'TASK' lv_index INTO lv_task_counter.
FETCH NEXT CURSOR @lv_cursor
INTO TABLE @lt_keys
PACKAGE SIZE @p_size.
IF sy-subrc <> 0.
EXIT.
ENDIF.
ls_package = VALUE #( task_name = lv_task_counter
keys = lt_keys ).
APPEND ls_package TO gt_package.
ENDDO.
ENDFORM.
FORM parallel_processing.
DATA: lv_wp_total TYPE i,
lv_wp_free TYPE i,
lv_curr_wp_total TYPE i,
lv_curr_wp_free TYPE i,
lv_allow_wp TYPE i.
IF p_group IS NOT INITIAL.
" Get the list of application server available and those are free (no of WP & free WP)"
CALL FUNCTION 'SPBT_INITIALIZE'
EXPORTING
group_name = p_group
IMPORTING
max_pbt_wps = lv_wp_total
free_pbt_wps = lv_wp_free
EXCEPTIONS
invalid_group_name = 1
internal_error = 2
pbt_env_already_initialized = 3
currently_no_resources_avail = 4
no_pbt_resources_found = 5
cant_init_different_pbt_groups = 6
OTHERS = 7.
IF sy-subrc <> 0 AND sy-subrc <> 3.
MESSAGE 'No resources available.' TYPE 'E'.
EXIT.
ENDIF.
ENDIF.
LOOP AT gt_package INTO DATA(ls_package).
" generating jobs"
gv_snd_jobs = gv_snd_jobs + 1.
gv_pp_running = gv_pp_running + 1.
IF p_group IS NOT INITIAL.
DO.
" get the current available workprocess(time-dependent)"
CALL FUNCTION 'SPBT_GET_CURR_RESOURCE_INFO'
IMPORTING
max_pbt_wps = lv_curr_wp_total
free_pbt_wps = lv_curr_wp_free
EXCEPTIONS
internal_error = 1
pbt_env_not_initialized_yet = 2
OTHERS = 3.
IF p_parpro > 0.
lv_allow_wp = p_parpro.
ELSE.
"if maximum number of processes is not restricted, take only half of the free processes"
lv_allow_wp = lv_curr_wp_free / 2.
ENDIF.
" if current free wp is available with more than 2 wp"
" and the current running is less than allowed wp,"
" then there is enough wp for work of parallelization"
IF lv_curr_wp_free GT 2 AND gv_pp_running LT lv_allow_wp.
CALL FUNCTION 'ZTEST_FLIGHT_GETDETAIL'
STARTING NEW TASK ls_package-task_name
DESTINATION IN GROUP p_group
PERFORMING call_back ON END OF TASK
EXPORTING
it_keys = ls_package-keys
EXCEPTIONS
communication_failure = 1
system_failure = 2
resource_failure = 3.
IF sy-subrc <> 0.
MESSAGE ID sy-msgid TYPE sy-msgty NUMBER sy-msgno
WITH sy-msgv1 sy-msgv2 sy-msgv3 sy-msgv4.
ELSE.
EXIT.
ENDIF.
ELSE. " otherwise, it needs to wait until the wp are available"
WAIT UP TO 1 SECONDS.
CONTINUE.
ENDIF.
ENDDO.
ELSE.
"if no server group has been selected, then parallel processing use the Default Group"
CALL FUNCTION 'ZTEST_FLIGHT_GETDETAIL'
STARTING NEW TASK ls_package-task_name
DESTINATION IN GROUP DEFAULT
PERFORMING call_back ON END OF TASK
EXPORTING
it_keys = ls_package-keys
EXCEPTIONS
communication_failure = 1
system_failure = 2
resource_failure = 3.
IF sy-subrc <> 0.
MESSAGE ID sy-msgid TYPE sy-msgty NUMBER sy-msgno
WITH sy-msgv1 sy-msgv2 sy-msgv3 sy-msgv4.
EXIT.
ENDIF.
ENDIF.
ENDLOOP.
" wait, until all the jobs were received."
WAIT UNTIL gv_rcv_jobs >= gv_snd_jobs.
ENDFORM.
FORM call_back USING p_task TYPE clike.
DATA: lt_results TYPE STANDARD TABLE OF sflight.
" reveived jobs"
gv_rcv_jobs = gv_rcv_jobs + 1.
" retrieve results from parallelization"
RECEIVE RESULTS FROM FUNCTION 'ZTEST_FLIGHT_GETDETAIL'
IMPORTING et_results = lt_results.
IF sy-subrc = 0.
" running jobs"
gv_pp_running = gv_pp_running - 1.
ENDIF.
APPEND LINES OF lt_results TO gt_results.
ENDFORM.
FORM display_result.
DATA: lo_alv TYPE REF TO cl_salv_table.
CALL METHOD cl_salv_table=>factory
IMPORTING
r_salv_table = lo_alv
CHANGING
t_table = gt_results.
lo_alv->display( ).
ENDFORM.
这里调用的函数是标准示例函数的一个封装,其封装的内容如下 -
FUNCTION ztest_flight_getdetail.
*"----------------------------------------------------------------------
*"*"Local Interface:
*" IMPORTING
*" VALUE(IT_KEYS) TYPE ZFLIGHT_T_KEY
*" EXPORTING
*" VALUE(ET_RESULTS) TYPE SFLIGHT_TAB2
*"----------------------------------------------------------------------
DATA:
lt_sflight TYPE TABLE OF sflight.
ASSERT NOT cl_rfc=>is_external_direct( ).
LOOP AT it_keys INTO DATA(ls_key).
CALL FUNCTION 'FLIGHT_DETAILS'
EXPORTING
carrid = ls_key-carrid
connid = ls_key-connid
fldate = ls_key-fldate
TABLES
details = lt_sflight.
APPEND LINES OF lt_sflight TO et_results.
ENDLOOP.
ENDFUNCTION.
5.使用aRFC的注意事项
- 循环的退出条件要清晰,不要写成“死循环”
- 系统dialog process的分配原则,不要全部占满,使用default时,也要至少预留2个dialog process
- 分包的大小,原则是“对于每个分包,其单独的处理时间不要超过300秒”