一 代码解析
var util = require('util');
var path = require('path');
var fs = require('fs');
var Peer = require('fabric-client/lib/Peer.js');
var EventHub = require('fabric-client/lib/EventHub.js');
// Transaction id most recently created by joinChannel (reassigned on every call).
var tx_id = null;
var nonce = null;
// NOTE(review): the pasted text read '…/config.json'; restored to the parent-directory
// path '../config.json' — confirm against the project layout.
var config = require('../config.json');
var helper = require('./helper.js');
var logger = helper.getLogger('Join-Channel');
//helper.hfc.addConfigFile(path.join(__dirname, 'network-config.json'));
var ORGS = helper.ORGS;
// Every event hub opened by joinChannel is appended here so that its
// closeConnections helper can disconnect them.
var allEventhubs = [];
/**
 * Joins the named peers of an organization to a channel.
 *
 * Flow: obtain the organization's admin, fetch the channel's genesis block
 * (Channel#getGenesisBlock, footnote (1) below), build the peer targets and
 * event hubs (helper.newPeers / helper.newEventHubs, footnote (2)), send the
 * join proposal (Channel#joinChannel, footnote (3)) and wait for both the
 * proposal responses and one block event per event hub.
 *
 * @param {string} channelName - Channel id expected in the config block's channel header.
 * @param {string[]} peers - Peer names handed to helper.newPeers / helper.newEventHubs.
 * @param {string} username - Only used in error messages.
 * @param {string} org - Organization whose client, channel and admin are used.
 * @returns {Promise<{success: boolean, message: string}>} Resolves when the first
 *          proposal response has status 200; rejects with an Error otherwise.
 */
var joinChannel = function(channelName, peers, username, org) {
	// Text form of an error for logs/messages; tolerates a missing rejection reason.
	var describeError = function(err) {
		return (err && err.stack) ? err.stack : err;
	};

	// Log the outcome and disconnect every event hub that is still connected.
	var closeConnections = function(isSuccess) {
		if (isSuccess) {
			logger.debug('\n============ Join Channel is SUCCESS \n');
		} else {
			logger.debug('\n!!! ERROR: Join Channel FAILED !!!\n');
		}
		logger.debug('');
		allEventhubs.forEach(function(eventhub) {
			if (eventhub && eventhub.isconnected()) {
				eventhub.disconnect();
			}
		});
	};

	logger.info(util.format(
		'Calling peers in organization "%s" to join the channel', org));
	var client = helper.getClientForOrg(org);
	var channel = helper.getChannelForOrg(org);

	// 1. As the org admin, fetch the genesis block (a peer needs it to join the channel).
	return helper.getOrgAdmin(org).then(() => {
		logger.info(util.format('received member object for admin of the organization "%s": ', org));
		tx_id = client.newTransactionID();
		let request = {
			txId: tx_id
		};
		return channel.getGenesisBlock(request); // footnote (1)
	}).then((genesis_block) => {
		// 2. Build the join-channel request.
		tx_id = client.newTransactionID();
		var request = {
			targets: helper.newPeers(peers, org), // footnote (2)
			txId: tx_id,
			block: genesis_block
		};

		// 3.1 Open one event hub connection per peer and remember it for cleanup.
		var eventhubs = helper.newEventHubs(peers, org); // footnote (2)
		eventhubs.forEach((eh) => {
			eh.connect();
			allEventhubs.push(eh);
		});

		// 3.2 One promise per event hub: settles on the first single-transaction
		// block, or rejects once config.eventWaitTime milliseconds have elapsed.
		var waitMs = parseInt(config.eventWaitTime, 10);
		var eventPromises = eventhubs.map((eh) => {
			return new Promise((resolve, reject) => {
				let handle = setTimeout(() => {
					reject(new Error('Timed out waiting for the block event of channel \'' +
						channelName + '\''));
				}, waitMs);
				eh.registerBlockEvent((block) => {
					// A config block contains exactly one transaction. Other blocks are
					// ignored and the timer stays armed, so the promise cannot hang forever.
					if (block.data.data.length !== 1) {
						return;
					}
					clearTimeout(handle);
					// A peer may serve several channels, so check which channel the
					// block belongs to.
					var channel_header = block.data.data[0].payload.header.channel_header;
					// 3.3 Accept only a block of the requested channel.
					if (channel_header.channel_id === channelName) {
						resolve();
					} else {
						reject(new Error('Received a block event for channel \'' +
							channel_header.channel_id + '\' instead of \'' + channelName + '\''));
					}
				});
			});
		});

		// 4. Send the join request to the peers.
		let sendPromise = channel.joinChannel(request); // footnote (3)
		// 5. All-or-nothing: the proposal and every block-event promise must succeed.
		return Promise.all([sendPromise].concat(eventPromises));
	}, (err) => {
		// Reached when getOrgAdmin or getGenesisBlock failed.
		var message = 'Failed to enroll user \'' + username + '\' due to error: ' +
			describeError(err);
		logger.error(message);
		throw new Error(message);
	}).then((results) => {
		// 7. results[0] is the array of proposal responses from channel.joinChannel.
		logger.debug(util.format('Join Channel R E S P O N S E : %j', results));
		var firstResponse = results[0] && results[0][0];
		if (firstResponse && firstResponse.response && firstResponse.response.status === 200) {
			var successMessage = util.format(
				'Successfully joined peers in organization %s to the channel \'%s\'',
				org, channelName);
			logger.info(successMessage);
			closeConnections(true);
			return {
				success: true,
				message: successMessage
			};
		}
		// The first peer did not answer with status 200.
		logger.error(' Failed to join channel');
		closeConnections();
		throw new Error('Failed to join channel');
	}, (err) => {
		var message = 'Failed to join channel due to error: ' + describeError(err);
		logger.error(message);
		closeConnections();
		throw new Error(message);
	});
};
二 API 深度追踪
(1)
/**
 * @typedef {Object} OrdererRequest
 * @property {TransactionID} txId - Optional. Object with the transaction id and nonce
 * @property {Orderer} orderer - Optional. The orderer instance or string name
 *                     of the orderer to retrieve genesis block from
 */
/*
* A channel’s first block is called the “genesis block”. This block captures the
* initial channel configuration. For a peer node to join the channel, it must be
* provided the genesis block. This method must be called before calling
* [joinChannel()]{@link Channel#joinChannel}.
*
* @param {OrdererRequest} request - Optional - A transaction ID object
* @returns {Promise} A Promise for an encoded protobuf “Block”
*/getGenesisBlock(request) { logger.debug('getGenesisBlock - start'); if (!request) { request = {}; } // verify that we have an orderer configured var orderer = this._clientContext.getTargetOrderer(request.orderer, this._orderers, this._name); var signer = null; var tx_id = request.txId; if (!tx_id) { signer = this._clientContext._getSigningIdentity(true); tx_id = new TransactionID(signer, true); } else { signer = this._clientContext._getSigningIdentity(tx_id.isAdmin()); } // now build the seek info, will be used once the channel is created // to get the genesis block back // build start var seekSpecifiedStart = new _abProto.SeekSpecified(); seekSpecifiedStart.setNumber(0); var seekStart = new _abProto.SeekPosition(); seekStart.setSpecified(seekSpecifiedStart); // build stop var seekSpecifiedStop = new _abProto.SeekSpecified(); seekSpecifiedStop.setNumber(0); var seekStop = new _abProto.SeekPosition(); seekStop.setSpecified(seekSpecifiedStop); // seek info with all parts var seekInfo = new _abProto.SeekInfo(); seekInfo.setStart(seekStart); seekInfo.setStop(seekStop); seekInfo.setBehavior(_abProto.SeekInfo.SeekBehavior.BLOCK_UNTIL_READY); // build the header for use with the seekInfo payload var seekInfoHeader = clientUtils.buildChannelHeader( _commonProto.HeaderType.DELIVER_SEEK_INFO, this._name, tx_id.getTransactionID(), this._initial_epoch, null, clientUtils.buildCurrentTimestamp(), orderer.getClientCertHash() ); var seekHeader = clientUtils.buildHeader(signer, seekInfoHeader, tx_id.getNonce()); var seekPayload = new _commonProto.Payload(); seekPayload.setHeader(seekHeader); seekPayload.setData(seekInfo.toBuffer()); var seekPayloadBytes = seekPayload.toBuffer(); let sig = signer.sign(seekPayloadBytes); let signature = Buffer.from(sig); // building manually or will get protobuf errors on send var envelope = { signature: signature, payload: seekPayloadBytes }; return orderer.sendDeliver(envelope); }
(2)
/**
 * Builds the remote endpoints for the named peers of an organization.
 *
 * For every name that has an entry in ORGS[userOrg].peers, the peer's TLS CA
 * certificate is read from disk and either a Peer (when forPeers is truthy) or
 * an EventHub pointed at the peer's event address is created.
 *
 * @param {string[]} names - Peer names to look up; unknown names are skipped.
 * @param {boolean} forPeers - Truthy to create Peer objects, falsy to create EventHubs.
 * @param {string} userOrg - Key of the organization in ORGS.
 * @returns {Array} The created Peer or EventHub objects; empty (and an error is
 *          logged) when no name matched.
 */
function newRemotes(names, forPeers, userOrg) {
	let client = getClientForOrg(userOrg);
	let targets = [];

	// Own keys only: unlike for...in this ignores enumerable properties
	// inherited from a prototype, and it still tolerates a missing `names`.
	const peerNames = Object.keys(names || {}).map((key) => names[key]);

	// find the peers that match the names
	for (const peerName of peerNames) {
		const peerConfig = ORGS[userOrg].peers[peerName];
		if (!peerConfig) {
			continue;
		}
		// found a peer matching the name
		let data = fs.readFileSync(path.join(__dirname, peerConfig['tls_cacerts']));
		let grpcOpts = {
			pem: Buffer.from(data).toString(),
			'ssl-target-name-override': peerConfig['server-hostname']
		};
		if (forPeers) {
			targets.push(client.newPeer(peerConfig.requests, grpcOpts));
		} else {
			// Event hub bound to the matched peer's event address.
			let eh = client.newEventHub();
			eh.setPeerAddr(peerConfig.events, grpcOpts);
			targets.push(eh);
		}
	}

	if (targets.length === 0) {
		logger.error(util.format('Failed to find peers matching the names %s', names));
	}
	return targets;
}
(3)
* For a peer node to become part of a channel, it must be sent the genesis
* block, as explained [here]{@link Channel#getGenesisBlock}. This method
* sends a join channel proposal to one or more endorsing peers.
*
* @param {JoinChannelRequest} request
* @param {Number} timeout - A number indicating milliseconds to wait on the
* response before rejecting the promise with a
* timeout error. This overrides the default timeout
* of the {@link Peer} instance(s) and the global timeout in the config settings.
* @returns {Promise} A Promise for an array of {@link ProposalResponse} from the target peers
*/
joinChannel(request, timeout) {
logger.debug('joinChannel - start');
var errorMsg = null;
// verify that we have targets (Peers) to join this channel
// defined by the caller
if (!request) {
errorMsg = 'Missing all required input request parameters';
}
// verify that we have transaction id
else if (!request.txId) {
errorMsg = 'Missing txId input parameter with the required transaction identifier';
}
else if (!request.block) {
errorMsg = 'Missing block input parameter with the required genesis block';
}
if (errorMsg) {
logger.error('joinChannel - error ' + errorMsg);
throw new Error(errorMsg);
}
var targets = this._getTargets(request.targets); //no role, will get all peers
var signer = this._clientContext._getSigningIdentity(request.txId.isAdmin());
var chaincodeInput = new _ccProto.ChaincodeInput();
var args = [];
args.push(Buffer.from('JoinChain', 'utf8'));
args.push(request.block.toBuffer());
chaincodeInput.setArgs(args);
var chaincodeID = new _ccProto.ChaincodeID();
chaincodeID.setName(Constants.CSCC);
var chaincodeSpec = new _ccProto.ChaincodeSpec();
chaincodeSpec.setType(_ccProto.ChaincodeSpec.Type.GOLANG);
chaincodeSpec.setChaincodeId(chaincodeID);
chaincodeSpec.setInput(chaincodeInput);
var channelHeader = clientUtils.buildChannelHeader(
_commonProto.HeaderType.ENDORSER_TRANSACTION,
'',
request.txId.getTransactionID(),
null, //no epoch
Constants.CSCC,
clientUtils.buildCurrentTimestamp(),
targets[0].getClientCertHash()
);
var header = clientUtils.buildHeader(signer, channelHeader, request.txId.getNonce());
var proposal = clientUtils.buildProposal(chaincodeSpec, header);
var signed_proposal = clientUtils.signProposal(signer, proposal);
return clientUtils.sendPeersProposal(targets, signed_proposal, timeout)(4)
.then(
function (responses) {
return Promise.resolve(responses);
}
).catch(
function (err) {
logger.error('joinChannel - Failed Proposal. Error: %s', err.stack ? err.stack : err);
return Promise.reject(err);
}
);
}
(4)
/*
 * This function will return one Promise when sending a proposal to many peers
 */
module.exports.sendPeersProposal = function (peers, proposal, timeout) {
let targets = peers;
if (!Array.isArray(peers)) {
targets = [peers];
}
// make function to return an individual promise
const fn = function (peer) {
return new Promise(function (resolve, reject) {
peer.sendProposal(proposal, timeout)(5).then(
function (result) {
resolve(result);
}
).catch(
function (err) {
logger.error(‘sendPeersProposal - Promise is rejected: %s’,
err.stack ? err.stack : err);
return reject(err);
}
);
});
};
(5)
/**
* Send an endorsement proposal to an endorser. This is used to call an
* endorsing peer to execute a chaincode to process a transaction proposal,
* or runs queries.
*
* @param {Proposal} proposal - A protobuf encoded byte array of type
* [Proposal]{@link https://github.com/hyperledger/fabric/blob/v1.0.0/protos/peer/proposal.proto#L134}
* @param {Number} timeout - A number indicating milliseconds to wait on the
* response before rejecting the promise with a
* timeout error. This overrides the default timeout
* of the Peer instance and the global timeout in the config settings.
* @returns {Promise} A Promise for a {@link ProposalResponse}
*/
sendProposal(proposal, timeout) {
logger.debug('Peer.sendProposal - Start');
let self = this;
let rto = self._request_timeout;
if (typeof timeout === 'number')
rto = timeout;
if(!proposal) {
return Promise.reject(new Error('Missing proposal to send to peer'));
}
// Send the transaction to the peer node via grpc
// The rpc specification on the peer side is:
// rpc ProcessProposal(Proposal) returns (ProposalResponse) {}
return new Promise(function(resolve, reject) {
var send_timeout = setTimeout(function(){
logger.error('sendProposal - timed out after:%s', rto);
return reject(new Error('REQUEST_TIMEOUT'));
}, rto);
self._endorserClient.processProposal(proposal, function(err, proposalResponse) {
clearTimeout(send_timeout);
if (err) {
logger.debug('Received proposal response from: %s status: %s',self._url, err);
if(err instanceof Error) {
reject(err);
}
else {
reject(new Error(err));
}
} else {
if (proposalResponse) {
logger.debug('Received proposal response from peer "%s": status - %s', self._url, proposalResponse.response.status);
resolve(proposalResponse);
} else {
logger.error('GRPC client failed to get a proper response from the peer "%s".', self._url);
reject(new Error(util.format('GRPC client failed to get a proper response from the peer "%s".', self._url)));
}
}
});
});
}