Fabric 1.4 源码分析 背书节点和链码容器交互


1. 准备


func (cs *ChaincodeSupport) Launch(chainID, chaincodeName, chaincodeVersion string, qe ledger.QueryExecutor) (*Handler, error) {
    cname := chaincodeName + ":" + chaincodeVersion
    if h := cs.HandlerRegistry.Handler(cname); h != nil {
        return h, nil

    ccci, err := cs.Lifecycle.ChaincodeContainerInfo(chaincodeName, qe)
    if err != nil {
        // TODO: There has to be a better way to do this...
        if cs.UserRunsCC {
                "You are attempting to perform an action other than Deploy on Chaincode that is not ready and you are in developer mode. Did you forget to Deploy your chaincode?",

        return nil, errors.Wrapf(err, "[channel %s] failed to get chaincode container info for %s", chainID, cname)

    if err := cs.Launcher.Launch(ccci); err != nil {
        return nil, errors.Wrapf(err, "[channel %s] could not launch chaincode %s", chainID, cname)

    h := cs.HandlerRegistry.Handler(cname)
    if h == nil {
        return nil, errors.Wrapf(err, "[channel %s] claimed to start chaincode container for %s but could not find handler", chainID, cname)

    return h, nil

type ChaincodeContainerInfo struct {
    Name        string
    Version     string
    Path        string
    Type        string
    CodePackage []byte

    // ContainerType is not a great name, but 'DOCKER' and 'SYSTEM' are the valid types
    ContainerType string

Launch()主要实现方法在core/chaincode/runtime_launcher.go Launch()方法。在该方法中,会调用r.Runtime.Start(ccci, codePackage)启动链码,在该方法中,首先会调用c.LaunchConfig(cname, ccci.Type)生成创建链码所需的参数LaunchConfig(链码类型go/java/nodejs,以及TLS配置),然后构造启动链码容器请求StartContainerReq。接着调用c.Processor.Process(ccci.ContainerType, scr)正式启动链码容器。操作完成后,通过Launch()里面的select—case语句阻塞获取结果,并结束程序运行。

func (r *RuntimeLauncher) Launch(ccci *ccprovider.ChaincodeContainerInfo) error {
    if !alreadyStarted {
        go func() {
            if err := r.Runtime.Start(ccci, codePackage); err != nil {
                startFailCh <- errors.WithMessage(err, "error starting container")
            exitCode, err := r.Runtime.Wait(ccci)
            if err != nil {
                launchState.Notify(errors.Wrap(err, "failed to wait on container exit"))
            launchState.Notify(errors.Errorf("container exited with %d", exitCode))

    var err error
    select {
    case <-launchState.Done():
        err = errors.WithMessage(launchState.Err(), "chaincode registration failed")
    case err = <-startFailCh:
        r.Metrics.LaunchFailures.With("chaincode", cname).Add(1)
    case <-timeoutCh:
        err = errors.Errorf("timeout expired while starting chaincode %s for transaction", cname)
        r.Metrics.LaunchTimeouts.With("chaincode", cname).Add(1)

    return err


func (si StartContainerReq) Do(v VM) error {
    return v.Start(si.CCID, si.Args, si.Env, si.FilesToUpload, si.Builder)

2. 启动系统链码

启动系统链码(进程模式)的话,则v.Start(si.CCID, si.Args, si.Env, si.FilesToUpload, si.Builder)的实现是在core/container/inproccontroller/inproccontroller.go start()方法。

func (vm *InprocVM) Start(ccid ccintf.CCID, args []string, env []string, filesToUpload map[string][]byte, builder container.Builder) error {
    path := ccid.GetName() // name=Name-Version
    // 获取已注册的inprocContainer模版
    ipctemplate := vm.registry.getType(path)
    instName := vm.GetVMName(ccid)
    // 构建chaincode实例ipc
    ipc, err := vm.getInstance(ipctemplate, instName, args, env)

    // 判断链码是否运行
    if ipc.running {
        return fmt.Errorf(fmt.Sprintf("chaincode running %s", path))

    ipc.running = true

    go func() {
        defer func() {
            if r := recover(); r != nil {
                inprocLogger.Criticalf("caught panic from chaincode  %s", instName)
        // 进程模式运行链码
        ipc.launchInProc(instName, args, env)

    return nil



func (ipc *inprocContainer) launchInProc(id string, args []string, env []string) error {
    if ipc.ChaincodeSupport == nil {
        inprocLogger.Panicf("Chaincode support is nil, most likely you forgot to set it immediately after calling inproccontroller.NewRegsitry()")
    // 建立peer侧接收链码侧发送通道
    peerRcvCCSend := make(chan *pb.ChaincodeMessage)
    // 建立链码侧接收peer侧发送通道
    ccRcvPeerSend := make(chan *pb.ChaincodeMessage)
    var err error
    // 传递链码侧Handler对象运行状态的通道
    ccchan := make(chan struct{}, 1)
    // 传递peer侧Handler对象运行状态的通道
    ccsupportchan := make(chan struct{}, 1)
    shimStartInProc := _shimStartInProc // shadow to avoid race in test
    go func() {
        defer close(ccchan)
        inprocLogger.Debugf("chaincode started for %s", id)
        if args == nil {
            args = ipc.args
        if env == nil {
            env = ipc.env
        // 启动系统链码
        err := shimStartInProc(env, args, ipc.chaincode, ccRcvPeerSend, peerRcvCCSend)
        if err != nil {
            err = fmt.Errorf("chaincode-support ended with err: %s", err)
            _inprocLoggerErrorf("%s", err)
        inprocLogger.Debugf("chaincode ended for %s with err: %s", id, err)

    // shadow function to avoid data race
    inprocLoggerErrorf := _inprocLoggerErrorf
    go func() {
        defer close(ccsupportchan)
        inprocStream := newInProcStream(peerRcvCCSend, ccRcvPeerSend)
        inprocLogger.Debugf("chaincode-support started for  %s", id)
        // 启动peer侧Handler处理句柄,创建消息循环,处理链码侧发送的消息
        err := ipc.ChaincodeSupport.HandleChaincodeStream(inprocStream)
        if err != nil {
            err = fmt.Errorf("chaincode ended with err: %s", err)
            inprocLoggerErrorf("%s", err)
        inprocLogger.Debugf("chaincode-support ended for %s with err: %s", id, err)
    // 阻塞等待消息处理
    select {
    // 链码侧退出,关闭peer侧接收链码侧发送通道
    case <-ccchan:
        inprocLogger.Debugf("chaincode %s quit", id)
    // peer侧chaincode support退出
    case <-ccsupportchan:
        inprocLogger.Debugf("chaincode support %s quit", id)
    case <-ipc.stopChan:
        inprocLogger.Debugf("chaincode %s stopped", id)
    return err
  • 链码侧:


// Register on the stream
chaincodeLogger.Debugf("Registering.. sending %s", pb.ChaincodeMessage_REGISTER)
if err = handler.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTER, Payload: payload}); err != nil {
    return errors.WithMessage(err, "error sending chaincode REGISTER")
  • peer侧:



3. 启动应用链码

当启动应用链码(docker容器模式)时,Start()接口实现为core/container/dockercontroller/dockercontroller.go Start()方法。



func userChaincodeStreamGetter(name string) (PeerChaincodeStream, error) {
    flag.StringVar(&peerAddress, "peer.address", "", "peer address")
    // Establish connection with validating peer
    // 与peer建立连接
    clientConn, err := newPeerClientConnection()
    // 创建链码支持服务客户端
    chaincodeSupportClient := pb.NewChaincodeSupportClient(clientConn)
    // Establish stream with validating peer
    // 调用Register()接口获取通信流
    stream, err := chaincodeSupportClient.Register(context.Background())
    return stream, nil


func (cs *ChaincodeSupport) Register(stream pb.ChaincodeSupport_RegisterServer) error {
    return cs.HandleChaincodeStream(stream)

4. 背书节点和链码交互

4.1 准备


  • 链码侧
switch handler.state {
case ready:
    err = handler.handleReady(msg, errc)
case established:
    err = handler.handleEstablished(msg, errc)
case created:
    err = handler.handleCreated(msg, errc)
    err = errors.Errorf("[%s] Chaincode handler cannot handle message (%s) with payload size (%d) while in state: %s", msg.Txid, msg.Type, len(msg.Payload), handler.state)
  • peer侧
switch h.state {
case Created:
    return h.handleMessageCreatedState(msg)
case Ready:
    return h.handleMessageReadyState(msg)
    return errors.Errorf("handle message: invalid state %s for transaction %s", h.state, msg.Txid)


  1. 链码侧发送REGISTER消息
    • 首先进行各项基本配置,然后建立起与Peer节点的gRPC连接。
    • 创建Handler,并更改Handler状态为“Created”。
    • 发送REGISTER消息到peer节点。
    • 等待peer节点返回的信息
  2. peer侧接收REGISTER消息
    • 此时peer侧Handler状态为“Created”,调用handleMessageCreatedState()里面的HandleRegister()方法。
    • peer侧注册Handler,并发送REGISTERED消息给链码侧
    • 更新peer侧Handler状态为“Established”
    • 并且会调用notifyRegistry()方法,发送READY消息给链码侧,并更新状态为“Ready”
  3. 链码侧接收消息
    • 当链码侧接收REGISTERED消息,更新状态为Handler状态为“Established”
    • 当链码侧接收READY消息,更新状态为Handler状态为“Ready”


4.2 执行链码


4.2.1 实例化链码/升级链码操作


func (h *Handler) Execute(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, msg *pb.ChaincodeMessage, timeout time.Duration) (*pb.ChaincodeMessage, error) {
    txParams.CollectionStore = h.getCollectionStore(msg.ChannelId)
    txParams.IsInitTransaction = (msg.Type == pb.ChaincodeMessage_INIT)
    // 创建交易上下文
    txctx, err := h.TXContexts.Create(txParams)
    if err != nil {
        return nil, err
    // 删除交易上下文
    defer h.TXContexts.Delete(msg.ChannelId, msg.Txid)
    if err := h.setChaincodeProposal(txParams.SignedProp, txParams.Proposal, msg); err != nil {
        return nil, err
    // 异步发送消息

    var ccresp *pb.ChaincodeMessage
    // 等待链码侧响应
    select {
    case ccresp = <-txctx.ResponseNotifier:
        // response is sent to user or calling chaincode. ChaincodeMessage_ERROR
        // are typically treated as error
    case <-time.After(timeout):
        err = errors.New("timeout expired while executing transaction")
        ccName := cccid.Name + ":" + cccid.Version
            "chaincode", ccName,

    return ccresp, err

当链码侧接收到ChaincodeMessage_INIT类型消息时会调用handler.handleInit(msg, errc)方法。

case pb.ChaincodeMessage_INIT:
        chaincodeLogger.Debugf("[%s] Received %s, initializing chaincode", shorttxid(msg.Txid), msg.Type)
        // Call the chaincode's Run function to initialize
        handler.handleInit(msg, errc)
        return nil
// handleInit handles request to initialize chaincode.
func (handler *Handler) handleInit(msg *pb.ChaincodeMessage, errc chan error) {
    go func() {
        var nextStateMsg *pb.ChaincodeMessage

        defer func() {
            // 协程结束时执行
            handler.triggerNextState(nextStateMsg, errc)
        // Get the function and args from Payload
        // 获取方法和参数
        input := &pb.ChaincodeInput{}
        unmarshalErr := proto.Unmarshal(msg.Payload, input)
        // Call chaincode's Run
        // Create the ChaincodeStub which the chaincode can use to callback
        stub := new(ChaincodeStub)
        err := stub.init(handler, msg.ChannelId, msg.Txid, input, msg.Proposal)
        // 执行链码的Init方法
        res := handler.cc.Init(stub)
        // Send COMPLETED message to chaincode support and change state
        nextStateMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_COMPLETED, Payload: resBytes, Txid: msg.Txid, ChaincodeEvent: stub.chaincodeEvent, ChannelId: stub.ChannelId}
        chaincodeLogger.Debugf("[%s] Init succeeded. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_COMPLETED)

在handleInit(msg, errc)方法中,会反序列化msg.Payload为链码的输入,其中包含Args。然后调用链码的Init()方法,执行链码初始化流程。并将返回结果、链码事件、交易id以及通道id封装成ChaincodeMessage_COMPLETED类型的ChaincodeMessage发送给peer侧(triggerNextState()方法调用serialSendAsync()发送给peer)

当peer侧接收到对应消息。core/chaincode/handler.go handleMessageReadyState()。此时会调用Notify()方法把消息写入ResponseNotifier通道返回response。从而完成链码实例化/升级流程。

switch msg.Type {
case pb.ChaincodeMessage_COMPLETED, pb.ChaincodeMessage_ERROR:

4.2.2 调用链码

peer侧发送的消息类型为ChaincodeMessage_TRANSACTION。同理链码侧获取到ChaincodeMessage_TRANSACTION消息进行处理。会调用handler.handleTransaction(msg, errc)方法处理该类型消息。该类型消息执行流程和上述流程类似,只是此时调用的是链码的Invoke方法。再调用过程中会与状态数据库存在交互,因此会发送消息给peer侧,peer侧与状态数据库交互进行处理,完成后发送消息给链码侧,链码侧处理完成后发送ChaincodeMessage_COMPLETED消息给peer侧。

res := handler.cc.Invoke(stub)
  • 链码侧:
func (stub *ChaincodeStub) GetState(key string) ([]byte, error) {
    // Access public data by setting the collection to empty string
    collection := ""
    return stub.handler.handleGetState(collection, key, stub.ChannelId, stub.TxID)

会在handleGetState()方法中调用callPeerWithChaincodeMsg()方法,再调用handler.sendReceive(msg, respChan)将消息类型ChaincodeMessage_GET_STATE的消息发送给peer侧。等待peer侧的消息返回,然后进行处理。处理完成后发送ChaincodeMessage_COMPLETED消息给peer侧。

  • peer侧:
    当peer侧获取到对应消息时会调用h.HandleTransaction(msg, h.HandleGetState)进行处理。最后将对应的消息封装成ChaincodeMessage_RESPONSE类型消息给链码侧。
return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: res, Txid: msg.Txid, ChannelId: msg.ChannelId}, nil




上述消息交互过程当中,Peer 和链码侧还会进行一项操作,那就是定期相互发送ChaincodeMessage_KEEPALIVE消息给对方,以确保彼此是在线状态。



