Processor Chains and the Pipeline Manager
Processor 就是一个执行一些功能和返回状态码的一个组件,状态码决定chain中下一个是哪一个processor被执行。
PipelineManager 可以使你动态的添加或删除processor.Pipeline要执行processor。需要调用runProcess().他首先会找请求的链,如果is enable,PipelineManager 调用runProcess(),如果isn't enable,就会抛出一个异常。
The following sections describe how to create a processor pipeline:
配置Pipeline Manager:
每一个processor chain 都是有pipeline manager 控制的,它位于/atg/commerce/
PipelineManager. The PipelineManager属性:
$class=atg.commerce.pipeline.CommercePipelineManager # The location of the XML configuration file in the classpath. # Default: /atg/commerce/CommercePipeline.xml definitionFile=/atg/commerce/commercepipeline.xml # The transaction manager that the PipelineManager will use to coordinate its # transactions. transactionManager=/atg/dynamo/transaction/TransactionManager # The amount of time in milliseconds that a thread will wait to execute a pipeline. # Default: 15000 msec chainLockWaitTimeout=15000 # # The schedule to reinitialize the PipelineManager. # scheduler=/atg/dynamo/service/Scheduler schedule=every 6 hours in 6 hours
definitionFile是定义processor chains的。
Creating Processors:
首先要实现atg.service.pipeline.PipelineProcessor interface。
public abstract interface PipelineProcessor { public static final String CLASS_VERSION = "$Id: //product/DAS/version/10.2/Java/atg/service/pipeline/PipelineProcessor.java#2 $$Change: 768621 $"; public static final int STOP_CHAIN_EXECUTION_AND_COMMIT = 0; public static final int STOP_CHAIN_EXECUTION = 0; public static final int STOP_CHAIN_EXECUTION_AND_ROLLBACK = -1; public abstract int runProcess(Object paramObject, PipelineResult paramPipelineResult) throws Exception; public abstract int[] getRetCodes(); }
public abstract class ProcProcessPaymentGroup extends GenericService implements PipelineProcessor { public static String CLASS_VERSION = "$Id: //product/DCS/version/10.2/Java/atg/commerce/payment/processor/ProcProcessPaymentGroup.java#2 $$Change: 768796 $"; public static final int SUCCESS = 1; protected void invokeProcessorAction(PaymentManagerAction pProcessorAction, PaymentManagerPipelineArgs pParams) throws CommerceException { PaymentStatus status = null; if (isLoggingDebug()) { logDebug("Obtained processorAction with: " + pProcessorAction); } if (pProcessorAction == PaymentManagerAction.AUTHORIZE) status = authorizePaymentGroup(pParams); else if (pProcessorAction == PaymentManagerAction.DEBIT) status = debitPaymentGroup(pParams); else if (pProcessorAction == PaymentManagerAction.CREDIT) status = creditPaymentGroup(pParams); else if (pProcessorAction == PaymentManagerAction.DECREASE_AUTH_AMT) status = decreaseAuthorizationForPaymentGroup(pParams); else { throw new CommerceException("Invalid processor action specified: " + pProcessorAction); } pParams.setPaymentStatus(status); } public PaymentStatus decreaseAuthorizationForPaymentGroup(PaymentManagerPipelineArgs pParams) throws CommerceException { return null; } public abstract PaymentStatus authorizePaymentGroup(PaymentManagerPipelineArgs paramPaymentManagerPipelineArgs) throws CommerceException; public abstract PaymentStatus debitPaymentGroup(PaymentManagerPipelineArgs paramPaymentManagerPipelineArgs) throws CommerceException; public abstract PaymentStatus creditPaymentGroup(PaymentManagerPipelineArgs paramPaymentManagerPipelineArgs) throws CommerceException; public int runProcess(Object pParam, PipelineResult pResult) throws Exception { PaymentManagerPipelineArgs params = (PaymentManagerPipelineArgs)pParam; PaymentManagerAction action = params.getAction(); try { invokeProcessorAction(action, params); } catch (CommerceException e) { logError(e); pResult.addError("ProcProcessPaymentGroupFailed", e); return 0; } return 1; } public int[] getRetCodes() { int[] retCodes = { 1 }; return retCodes; } }
Pipeline Definition Files:
PipelineManager:定义文件根节点
pipelinechain: 定义一个procesor chain
- name:名字
- transaction:processor默认使用的事物模式
- headlink:在processor chain中的第一个processor被执行
- classname:将被实例化和用于PipelineChain对象,默认是atg.service.pipeline.PipelineChain.而且一定是他的子类
- resultclassname:用于pipleineResult对象,The default is atg.service.pipeline.PipelineResult. The value must implement PipelineResult.
pipelinelink:在 processor chain中定义processor.
- name:processor的名字
- transaction
processor:The name of the PipelineProcessor object.
- jndi :所引用processor类,这个对象是通过JNDI来解决的e
transition:根绝返回值决定下一个将要被执行的引用
- returnvalue:An integer string that is used to define the next pipeline element.
- 下一个将会被执行的pipelineprocessor,如果当前的返回值匹配return value
<?xml version="1.0"?> <!DOCTYPE PipelineManager SYSTEM "PipelineManager.dtd"> <PipelineManager> <pipelinechain name="AddToCart" transaction="TX_REQUIRED" headlink="proc1"> <pipelinelink name="proc1"> <processor class="atg.commerce.addA"/> <transition returnvalue="1" link="proc2"/> <transition returnvalue="2" link="proc3"/> </pipelinelink> <pipelinelink name="proc2" transaction="TX_REQUIRES_NEW"> <processor class="atg.commerce.addB"/> <transition returnvalue="1" link="proc4"/> <transition returnvalue="2" link="proc5"/> </pipelinelink> <pipelinelink name="proc3"> <processor class="atg.commerce.addE"/> <transition returnvalue="1" link="proc6"/> <transition returnvalue="2" link="proc7"/> <transition returnvalue="3" link="proc2"/> </pipelinelink> <pipelinelink name="proc4"> <processor class="atg.commerce.addC"/> </pipelinelink> <pipelinelink name="proc5" transaction="TX_REQUIRES_NEW"> <processor class="atg.commerce.addD"/> </pipelinelink> <pipelinelink name="proc6" transaction="TX_NOT_SUPPORTED"> <processor class="atg.commerce.addF"/> </pipelinelink> <pipelinelink name="proc7" transaction="TX_SUPPORTS"> <processor jndi="/dynamo/atg/commerce/addG"/> </pipelinelink> </pipelinechain> <pipelinechain name="RemoveFromCart" transaction="TX_REQUIRED" headlink="proc99" classname="atg.service.pipeline.PipelineMonoChain"> <pipelinelink name="proc99"> <processor class="atg.commerce.removeA"/> </pipelinelink> </pipelinechain> </PipelineManager>