

func (this *fastSync) processFastSyncContent(latest *types.Header) error {

​    stateSync := this.syncState(latest.Root)

​    defer stateSync.Cancel()

​    go func() {

​        if err := stateSync.Wait(); err != nil {

​            this.syncer.sch.Close() // wake up WaitResults

​        }

​    }()

​    pivot := this.syncer.sch.FastSyncPivot()

​    for {

​        results := this.syncer.sch.WaitResults()

​        if len(results) == 0 {

​            return stateSync.Cancel()

​        }

​        if this.chainInsertHook != nil {

​            this.chainInsertHook(results)

​        }

​        P, beforeP, afterP := splitAroundPivot(pivot, results)

​        if err := this.commitFastSyncData(beforeP, stateSync); err != nil {

​            return err

​        }

​        if P != nil {

​            stateSync.Cancel()

​            if err := this.commitPivotBlock(P); err != nil {

​                return err

​            }

​        }

​        if err := this.importBlockResults(afterP); err != nil {

​            return err

​        }

​    }



func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) {
	for _, result := range results {
		num := result.Header.Number.Uint64()
		switch {
		case num < pivot:
			before = append(before, result)
		case num == pivot:
			p = result
			after = append(after, result)
	return p, before, after


func (this *fastSync) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error {
	for len(results) != 0 {
		// Check for any termination requests.
		select {
		case <-this.syncer.quitCh:
			return errCancelContentProcessing
		case <-stateSync.done:
			if err := stateSync.Wait(); err != nil {
				return err
		// Retrieve the a batch of results to import
		items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
		first, last := results[0].Header, results[items-1].Header
		log.Debug("Inserting fast-sync blocks", "items", len(results),
			"firstnum", first.Number, "firsthash", first.Hash(),
			"lastnumn", last.Number, "lasthash", last.Hash(),
		blocks := make([]*types.Block, items)
		receipts := make([]types.Receipts, items)
		for i, result := range results[:items] {
			blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
			receipts[i] = result.Receipts
		if index, err := bc.InstanceBlockChain().InsertReceiptChain(blocks, receipts); err != nil {
			log.Debug("fast synced item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
			return errInvalidChain
		// Shift the results to the next batch
		results = results[items:]
	return nil


func (this *fastSync) commitPivotBlock(result *fetchResult) error {
	b := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
	// Sync the pivot block state. This should complete reasonably quickly because
	// we've already synced up to the reported head block state earlier.
	if err := this.syncState(b.Root()).Wait(); err != nil {
		return err
	log.Debug("Committing fast sync pivot as new head", "number", b.Number(), "hash", b.Hash())
	if _, err := bc.InstanceBlockChain().InsertReceiptChain([]*types.Block{b}, []types.Receipts{result.Receipts}); err != nil {
		return err
	return bc.InstanceBlockChain().FastSyncCommitHead(b.Hash())


func (this *fastSync) importBlockResults(results []*fetchResult) error {
	for len(results) != 0 {
		// Check for any termination requests. This makes clean shutdown faster.
		select {
		case <-this.syncer.quitCh:
			return errCancelContentProcessing
		// Retrieve the a batch of results to import
		items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
		first, last := results[0].Header, results[items-1].Header
		log.Debug("Inserting fast synced chain", "items", len(results),
			"firstnum", first.Number, "firsthash", first.Hash(),
			"lastnum", last.Number, "lasthash", last.Hash(),
		blocks := make([]*types.Block, items)
		for i, result := range results[:items] {
			blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
		if index, err := bc.InstanceBlockChain().InsertChain(blocks); err != nil {
			log.Debug("fast synced item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
			if err == consensus.ErrInvalidblockbutnodrop {
				return consensus.ErrInvalidblockbutnodrop
			return errInvalidChain
		// Shift the results to the next batch
		results = results[items:]
	return nil


func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*types.Log, error) {
	// Do a sanity check that the provided chain is actually ordered and linked
	for i := 1; i < len(chain); i++ {
		if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() {
			// Chain broke ancestry, log a messge (programming error) and skip insertion
			log.Error("Non contiguous block insert", "number", chain[i].Number(), "hash", chain[i].Hash(),
				"parent", chain[i].ParentHash(), "prevnumber", chain[i-1].Number(), "prevhash", chain[i-1].Hash())
		return 0, nil, nil, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(),
			chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4])
// Pre-checks passed, start the full block imports
defer bc.wg.Done()

defer bc.chainmu.Unlock()

// A queued approach to delivering events. This is generally
// faster than direct delivery and requires much less mutex
// acquiring.
var (
	stats         = insertStats{startTime: mclock.Now()}
	events        = make([]interface{}, 0, len(chain))
	lastCanon     *types.Block
	coalescedLogs []*types.Log
// Start the parallel header verifier
headers := make([]*types.Header, len(chain))
seals := make([]bool, len(chain))

for i, block := range chain {
	headers[i] = block.Header()
	seals[i] = true

var mode config.SyncMode
if len(headers) == 1 {
	mode = config.FullSync
} else {
	mode = config.FastSync
abort, results := bc.engine.VerifyHeaders(bc, headers, seals, mode)

defer close(abort)

// Iterate over the blocks and insert when the verifier permits
for i, block := range chain {
	// If the chain is terminating, stop processing blocks
	if atomic.LoadInt32(&bc.procInterrupt) == 1 {
		log.Debug("Premature abort during blocks processing")

	// Wait for the block's verification to complete
	bstart := time.Now()

	err := <-results
	if err == nil {
		err = bc.Validator().ValidateBody(block)
	// Create a new statedb using the parent block and report an
	// error if it fails.
	var parent *types.Block
	if i == 0 {
		parent = bc.GetBlock(block.ParentHash(), block.NumberU64()-1)
	} else {
		parent = chain[i-1]
	state, err := state.New(parent.Root(), bc.stateCache)
	if err != nil {
		return i, events, coalescedLogs, err
	// Process block using the parent state as reference point.
	receipts, logs, usedGas, err := bc.processor.Process(block, state)
	if err != nil {
		bc.reportBlock(block, receipts, err)
		return i, events, coalescedLogs, err
	// Validate the state using the default validator
	err = bc.Validator().ValidateState(block, parent, state, receipts, usedGas)
	if err != nil {
		//for i, tx := range block.Transactions() {
		bc.reportBlock(block, receipts, err)
		return i, events, coalescedLogs, err
	// Write the block to the chain and get the status.
	log.Info("----> Write Block and State From Outside", "number", block.Number(), "hash", block.Hash(), "difficulty", block.Difficulty())
	status, err := bc.WriteBlockAndState(block, receipts, state)
	if err != nil {
		return i, events, coalescedLogs, err
	switch status {
	case CanonStatTy:
		log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()),
			"txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart)))

		log.Info("Inserted new block", "number", block.Number(), "hash", block.Hash())

		coalescedLogs = append(coalescedLogs, logs...)
		events = append(events, ChainEvent{block, block.Hash(), logs})
		lastCanon = block

	case SideStatTy:
		log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed",
			common.PrettyDuration(time.Since(bstart)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()))

		events = append(events, ChainSideEvent{block})

	stats.usedGas += usedGas.Uint64()
	stats.report(chain, i)

// Append a single chain head event if we've progressed the chain
if lastCanon != nil && bc.LastBlockHash() == lastCanon.Hash() {
	events = append(events, ChainHeadEvent{lastCanon})
return 0, events, coalescedLogs, nil



func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB) (types.Receipts, []*types.Log, *big.Int, error) {
	var (
		receipts     types.Receipts
		receipt      *types.Receipt
		errs         error
		totalUsedGas = big.NewInt(0)
		header       = block.Header()
		allLogs      []*types.Log
		gp           = new(GasPool).AddGas(block.GasLimit())
	// Iterate over and process the individual transactions
	for i, tx := range block.Transactions() {
		statedb.Prepare(tx.Hash(), block.Hash(), i)
		msg, err := tx.AsMessage(types.MakeSigner(p.config))
		if err != nil {
			return nil, nil, nil, err
		//the tx without contract
		if len(msg.Data()) != 0 {
			receipt, _, errs = ApplyTransaction(p.config, p.bc, nil, gp, statedb, header, tx, totalUsedGas)
			if errs != nil {
				types.Deletesynsinger(synsigner, tx)
				return nil, nil, nil, errs
		} else {
			receipt, _, errs = ApplyTransactionNonContract(p.config, p.bc, nil, gp, statedb, header, tx, totalUsedGas)
			if errs != nil {
				types.Deletesynsinger(synsigner, tx)
				return nil, nil, nil, errs

		receipts = append(receipts, receipt)
		allLogs = append(allLogs, receipt.Logs...)
	go func(txs []*types.Transaction) {
		//types.ASynSender(synsigner, nil)
		for _, tx := range txs {
			types.Deletesynsinger(synsigner, tx)
// Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
if _, errfinalize := p.engine.Finalize(p.bc, header, statedb, block.Transactions(), block.Uncles(), receipts); nil != errfinalize {
	return nil, nil, nil, errfinalize

return receipts, allLogs, totalUsedGas, nil




1 是state。快速同步的before块是没有state的。而全量同步需要每一个块的state

2 是交易处理。快速同步不校验交易,全量同步同步需要校验每一个交易,而校验交易的基础就是state。


