以太坊源码分析之二:节点的创建和启动

先看一下创建的代码:

func makeFullNode(ctx *cli.Context)*node.Node {

         stack,cfg := makeConfigNode(ctx)

         utils.RegisterEthService(stack,&cfg.Eth)

         ifctx.GlobalBool(utils.DashboardEnabledFlag.Name) {

                   utils.RegisterDashboardService(stack,&cfg.Dashboard, gitCommit)

         }

         //Whisper must be explicitly enabled by specifying at least 1 whisper flag or indev mode

         shhEnabled:= enableWhisper(ctx)

扫描二维码关注公众号,回复: 1046707 查看本文章

         shhAutoEnabled:= !ctx.GlobalIsSet(utils.WhisperEnabledFlag.Name) &&ctx.GlobalIsSet(utils.DeveloperFlag.Name)

         ifshhEnabled || shhAutoEnabled {

                   ifctx.GlobalIsSet(utils.WhisperMaxMessageSizeFlag.Name) {

                            cfg.Shh.MaxMessageSize= uint32(ctx.Int(utils.WhisperMaxMessageSizeFlag.Name))

                   }

                   ifctx.GlobalIsSet(utils.WhisperMinPOWFlag.Name) {

                            cfg.Shh.MinimumAcceptedPOW= ctx.Float64(utils.WhisperMinPOWFlag.Name)

                   }

                   utils.RegisterShhService(stack,&cfg.Shh)

         }

         //Add the Ethereum Stats daemon if requested.

         ifcfg.Ethstats.URL != "" {

                   utils.RegisterEthStatsService(stack,cfg.Ethstats.URL)

         }

         returnstack

}

这个函数写得还是比较容易看明白,开头就两点抛出来,创建节点,注册服务(后面的Whisper可以暂时忽略):

func makeConfigNode(ctx *cli.Context)(*node.Node, gethConfig) {

         //Load defaults.

         cfg:= gethConfig{

                   Eth:       eth.DefaultConfig,

                   Shh:       whisper.DefaultConfig,

                   Node:      defaultNodeConfig(),

                   Dashboard:dashboard.DefaultConfig,

         }

         //Load config file.

         iffile := ctx.GlobalString(configFileFlag.Name); file != "" {

                   iferr := loadConfig(file, &cfg); err != nil {

                            utils.Fatalf("%v",err)

                   }

         }

         //Apply flags.

         utils.SetNodeConfig(ctx,&cfg.Node)

         stack,err := node.New(&cfg.Node)

         iferr != nil {

                   utils.Fatalf("Failedto create the protocol stack: %v", err)

         }

         utils.SetEthConfig(ctx,stack, &cfg.Eth)

         ifctx.GlobalIsSet(utils.EthStatsURLFlag.Name) {

                   cfg.Ethstats.URL= ctx.GlobalString(utils.EthStatsURLFlag.Name)

         }

         utils.SetShhConfig(ctx,stack, &cfg.Shh)

         utils.SetDashboardConfig(ctx,&cfg.Dashboard)

         returnstack, cfg

}

先得看啥地方创建了节点:

stack, err := node.New(&cfg.Node)

它前面的一些加载配置,设置相关参数啥的都忽略,先进去这个看看:

func New(conf *Config) (*Node, error) {

         //Copy config and resolve the datadir so future changes to the current

         //working directory don't affect the node.

         confCopy:= *conf

         conf= &confCopy

         ifconf.DataDir != "" {

                   absdatadir,err := filepath.Abs(conf.DataDir)

                   iferr != nil {

                            returnnil, err

                   }

                   conf.DataDir= absdatadir

         }

         //Ensure that the instance name doesn't cause weird conflicts with

         //other files in the data directory.

         ifstrings.ContainsAny(conf.Name, `/\`) {

                   returnnil, errors.New(`Config.Name must not contain '/' or '\'`)

         }

         ifconf.Name == datadirDefaultKeyStore {

                   returnnil, errors.New(`Config.Name cannot be "` + datadirDefaultKeyStore +`"`)

         }

         ifstrings.HasSuffix(conf.Name, ".ipc") {

                   returnnil, errors.New(`Config.Name cannot end in ".ipc"`)

         }

         //Ensure that the AccountManager method works before the node has started.

         //We rely on this in cmd/geth.

         am,ephemeralKeystore, err := makeAccountManager(conf)

         iferr != nil {

                   returnnil, err

         }

         ifconf.Logger == nil {

                   conf.Logger= log.New()

         }

         //Note: any interaction with Config that would create/touch files

         //in the data directory or instance directory is delayed until Start.

         return&Node{

                   accman:            am,

                   ephemeralKeystore:ephemeralKeystore,

                   config:            conf,

                   serviceFuncs:      []ServiceConstructor{},

                   ipcEndpoint:       conf.IPCEndpoint(),

                   httpEndpoint:      conf.HTTPEndpoint(),

                   wsEndpoint:        conf.WSEndpoint(),

                   eventmux:          new(event.TypeMux),

                   log:               conf.Logger,

         },nil

}

果然是各种判断配置,然后在最后请出了Node大神。不过,千万不要忘记前边的一行代码:

         // Ensure that the AccountManager method works before the node has started.

         // We rely on this in cmd/geth.

         am,ephemeralKeystore, err := makeAccountManager(conf)

这个是创建帐户的,没它,这事儿就多了。所以还得先看看它,别急着向下走:

func makeAccountManager(conf *Config)(*accounts.Manager, string, error) {

         scryptN,scryptP, keydir, err := conf.AccountConfig()

         varephemeral string

         ifkeydir == "" {

                   //There is no datadir.

                   keydir,err = ioutil.TempDir("", "go-ethereum-keystore")

                   ephemeral= keydir

         }

         iferr != nil {

                   returnnil, "", err

         }

         iferr := os.MkdirAll(keydir, 0700); err != nil {

                   returnnil, "", err

         }

         //Assemble the account manager and supported backends

         backends:= []accounts.Backend{

                   keystore.NewKeyStore(keydir,scryptN, scryptP),

         }

         if!conf.NoUSB {

                   //Start a USB hub for Ledger hardware wallets

                   ifledgerhub, err := usbwallet.NewLedgerHub(); err != nil {

                            log.Warn(fmt.Sprintf("Failedto start Ledger hub, disabling: %v", err))

                   }else {

                            backends= append(backends, ledgerhub)

                   }

                   //Start a USB hub for Trezor hardware wallets

                   iftrezorhub, err := usbwallet.NewTrezorHub(); err != nil {

                            log.Warn(fmt.Sprintf("Failedto start Trezor hub, disabling: %v", err))

                   }else {

                            backends= append(backends, trezorhub)

                   }

         }

         returnaccounts.NewManager(backends...), ephemeral, nil

}

这时候前面忽略掉的各种配置在这里就非常有用了,路径、名字啥的,假设都正确,则看一下:

func NewKeyStore(keydir string, scryptN,scryptP int) *KeyStore {

         keydir,_ = filepath.Abs(keydir)

         ks:= &KeyStore{storage: &keyStorePassphrase{keydir, scryptN, scryptP}}

         ks.init(keydir)

         returnks

}

看一下重点:

func (ks *KeyStore) init(keydir string) {

         //Lock the mutex since the account cache might call back with events

         ks.mu.Lock()

         deferks.mu.Unlock()

         //Initialize the set of unlocked keys and the account cache

         ks.unlocked= make(map[common.Address]*unlocked)

         ks.cache,ks.changes = newAccountCache(keydir)

         //TODO: In order for this finalizer to work, there must be no references

         //to ks. addressCache doesn't keep a reference but unlocked keys do,

         //so the finalizer will not trigger until all timed unlocks have expired.

         runtime.SetFinalizer(ks,func(m *KeyStore) {

                   m.cache.close()

         })

         //Create the initial list of wallets from the cache

         accs:= ks.cache.accounts()

         ks.wallets= make([]accounts.Wallet, len(accs))

         fori := 0; i < len(accs); i++ {

                   ks.wallets[i]= &keystoreWallet{account: accs[i], keystore: ks}

         }

}

看到cache没,缓存,也就是说大众套路,先从缓存找,找不到就从路径里找,路径是哪个,就是前边忽略的那些。有兴趣大家可以看一下,下断点一跟就知道了。值得一提的是在newAccountCache创建时,同时创建了一个newWatcher用来监视它。

后面就知道它有用了。接着看:

func (ac *accountCache) accounts()[]accounts.Account {

         ac.maybeReload()

         ac.mu.Lock()

         deferac.mu.Unlock()

         cpy:= make([]accounts.Account, len(ac.all))

         copy(cpy,ac.all)

         returncpy

}

没有帐户数据就加载一份,然后做个备份。常用打法,不新奇。然后看加载的过程,只是要注意ac.watcher.start中是一个goroutine.

func (ac *accountCache) maybeReload() {

         ac.mu.Lock()

         ifac.watcher.running {

                   ac.mu.Unlock()

                   return// A watcher is running and will keep the cache up-to-date.

         }

         ifac.throttle == nil {

                   ac.throttle= time.NewTimer(0)

         }else {

                   select{

                   case<-ac.throttle.C:

                   default:

                            ac.mu.Unlock()

                            return// The cache was reloaded recently.

                   }

         }

         //No watcher running, start it.

         ac.watcher.start()

         ac.throttle.Reset(minReloadInterval)

         ac.mu.Unlock()

         ac.scanAccounts()

}

最后,ac.scanAccounts(),扫描一下,这个和以前的版本有些细节的变化:

// scanAccounts checks if any changes haveoccurred on the filesystem, and

// updates the account cache accordingly

func (ac *accountCache) scanAccounts()error {

         //Scan the entire folder metadata for file changes

         creates,deletes, updates, err := ac.fileC.scan(ac.keydir)

         iferr != nil {

                   log.Debug("Failedto reload keystore contents", "err", err)

                   returnerr

         }

         ifcreates.Size() == 0 && deletes.Size() == 0 && updates.Size() ==0 {

                   returnnil

         }

         //Create a helper method to scan the contents of the key files

         var(

                   buf= new(bufio.Reader)

                   keystruct {

                            Addressstring `json:"address"`

                   }

         )

         readAccount:= func(path string) *accounts.Account {

                   fd,err := os.Open(path)

                   iferr != nil {

                            log.Trace("Failedto open keystore file", "path", path, "err", err)

                            returnnil

                   }

                   deferfd.Close()

                   buf.Reset(fd)

                   //Parse the address.

                   key.Address= ""

                   err= json.NewDecoder(buf).Decode(&key)

                   addr:= common.HexToAddress(key.Address)

                   switch{

                   caseerr != nil:

                            log.Debug("Failedto decode keystore key", "path", path, "err", err)

                   case(addr == common.Address{}):

                            log.Debug("Failedto decode keystore key", "path", path, "err","missing or zero address")

                   default:

                            return&accounts.Account{Address: addr, URL: accounts.URL{Scheme: KeyStoreScheme,Path: path}}

                   }

                   returnnil

         }

         //Process all the file diffs

         start:= time.Now()

         for_, p := range creates.List() {

                   ifa := readAccount(p.(string)); a != nil {

                            ac.add(*a)

                   }

         }

         for_, p := range deletes.List() {

                   ac.deleteByFile(p.(string))

         }

         for_, p := range updates.List() {

                   path:= p.(string)

                   ac.deleteByFile(path)

                   ifa := readAccount(path); a != nil {

                            ac.add(*a)

                   }

         }

         end:= time.Now()

         select{

         caseac.notify <- struct{}{}:

         default:

         }

         log.Trace("Handledkeystore changes", "time", end.Sub(start))

         returnnil

}

然后再调用:

// scan performs a new scan on the givendirectory, compares against the already

// cached filenames, and returns file sets:creates, deletes, updates.

func (fc *fileCache) scan(keyDir string)(set.Interface, set.Interface, set.Interface, error) {

         t0:= time.Now()

         //List all the failes from the keystore folder

         files,err := ioutil.ReadDir(keyDir)

         iferr != nil {

                   returnnil, nil, nil, err

         }

         t1:= time.Now()

         fc.mu.Lock()

         deferfc.mu.Unlock()

         //Iterate all the files and gather their metadata

         all:= set.NewNonTS()

         mods:= set.NewNonTS()

         varnewLastMod time.Time

         for_, fi := range files {

                   //Skip any non-key files from the folder

                   path:= filepath.Join(keyDir, fi.Name())

                   ifskipKeyFile(fi) {

                            log.Trace("Ignoringfile on account scan", "path", path)

                            continue

                   }

                   //Gather the set of all and fresly modified files

                   all.Add(path)

                   modified:= fi.ModTime()

                   ifmodified.After(fc.lastMod) {

                            mods.Add(path)

                   }

                   ifmodified.After(newLastMod) {

                            newLastMod= modified

                   }

         }

         t2:= time.Now()

         //Update the tracked files and return the three sets

         deletes:= set.Difference(fc.all, all)   //Deletes = previous - current

         creates:= set.Difference(all, fc.all)   //Creates = current - previous

         updates:= set.Difference(mods, creates) // Updates = modified - creates

         fc.all,fc.lastMod = all, newLastMod

         t3:= time.Now()

         //Report on the scanning stats and return

         log.Debug("FSscan times", "list", t1.Sub(t0), "set", t2.Sub(t1),"diff", t3.Sub(t2))

         returncreates, deletes, updates, nil

}

到启动时指定的加载目录上找keystore文件,这里应该就是data0了,json解析一下就可以了。OK,基本到这儿,给个例子:

{"address":"5799bfc65e0166b78618d09c258ae14806b676cd","crypto":{"cipher":"aes-128-ctr","ciphertext":"904ddeb515cc0351da30b35493afdf4ccbf3aa6634439811b2af3084267f69fb","cipherparams":{"iv":"176c50ff24ab7267d0cc0b83de3e2448"},"kdf":"scrypt","kdfparams":{"dklen":32,"n":262144,"p":1,"r":8,"salt":"b42a9e24f0d0a1dea23f964eff3784104cd606bbbc0e477c1bcde1812fd90bae"},"mac":"be3dbba1082d6b0be613f4bec3fe978841807055593545547f82c0b29eaf5449"},"id":"da501940-d401-4d0b-b074-6811907c36a0","version":3}

再看一下启动的代码:

func startNode(ctx *cli.Context, stack*node.Node) {

         //Start up the node itself

         utils.StartNode(stack)

         //Unlock any account specifically requested

         ks:=stack.AccountManager().Backends(keystore.KeyStoreType)[0].(*keystore.KeyStore)

         passwords:= utils.MakePasswordList(ctx)

         unlocks:= strings.Split(ctx.GlobalString(utils.UnlockedAccountFlag.Name),",")

         fori, account := range unlocks {

                   iftrimmed := strings.TrimSpace(account); trimmed != "" {

                            unlockAccount(ctx,ks, trimmed, i, passwords)

                   }

         }

         //Register wallet event handlers to open and auto-derive wallets

         events:= make(chan accounts.WalletEvent, 16)

         stack.AccountManager().Subscribe(events)

         gofunc() {

                   //Create an chain state reader for self-derivation

                   rpcClient,err := stack.Attach()

                   iferr != nil {

                            utils.Fatalf("Failedto attach to self: %v", err)

                   }

                   stateReader:= ethclient.NewClient(rpcClient)

                   //Open any wallets already attached

                   for_, wallet := range stack.AccountManager().Wallets() {

                            iferr := wallet.Open(""); err != nil {

                                     log.Warn("Failedto open wallet", "url", wallet.URL(), "err", err)

                            }

                   }

                   //Listen for wallet event till termination

                   forevent := range events {

                            switchevent.Kind {

                            caseaccounts.WalletArrived:

                                     iferr := event.Wallet.Open(""); err != nil {

                                               log.Warn("Newwallet appeared, failed to open", "url", event.Wallet.URL(),"err", err)

                                     }

                            caseaccounts.WalletOpened:

                                     status,_ := event.Wallet.Status()

                                     log.Info("Newwallet appeared", "url", event.Wallet.URL(), "status",status)

                                     ifevent.Wallet.URL().Scheme == "ledger" {

                                               event.Wallet.SelfDerive(accounts.DefaultLedgerBaseDerivationPath,stateReader)

                                     }else {

                                               event.Wallet.SelfDerive(accounts.DefaultBaseDerivationPath,stateReader)

                                     }

                            caseaccounts.WalletDropped:

                                     log.Info("Oldwallet dropped", "url", event.Wallet.URL())

                                     event.Wallet.Close()

                            }

                   }

         }()

         //Start auxiliary services if enabled

         ifctx.GlobalBool(utils.MiningEnabledFlag.Name) ||ctx.GlobalBool(utils.DeveloperFlag.Name) {

                   //Mining only makes sense if a full Ethereum node is running

                   ifctx.GlobalBool(utils.LightModeFlag.Name) ||ctx.GlobalString(utils.SyncModeFlag.Name) == "light" {

                            utils.Fatalf("Lightclients do not support mining")

                   }

                   varethereum *eth.Ethereum

                   iferr := stack.Service(&ethereum); err != nil {

                            utils.Fatalf("Ethereumservice not running: %v", err)

                   }

                   //Use a reduced number of threads if requested

                   ifthreads := ctx.GlobalInt(utils.MinerThreadsFlag.Name); threads > 0 {

                            typethreaded interface {

                                     SetThreads(threadsint)

                            }

                            ifth, ok := ethereum.Engine().(threaded); ok {

                                     th.SetThreads(threads)

                            }

                   }

                   //Set the gas price to the limits from the CLI and start mining

                   ethereum.TxPool().SetGasPrice(utils.GlobalBig(ctx,utils.GasPriceFlag.Name))

                   iferr := ethereum.StartMining(true); err != nil {

                            utils.Fatalf("Failedto start mining: %v", err)

                   }

         }

}

它又调用:

func StartNode(stack *node.Node) {

         iferr := stack.Start(); err != nil {

                   Fatalf("Errorstarting protocol stack: %v", err)

         }

         gofunc() {

                   sigc:= make(chan os.Signal, 1)

                   signal.Notify(sigc,os.Interrupt)

                   defersignal.Stop(sigc)

                   <-sigc

                   log.Info("Gotinterrupt, shutting down...")

                   gostack.Stop()

                   fori := 10; i > 0; i-- {

                            <-sigc

                            ifi > 1 {

                                     log.Warn("Alreadyshutting down, interrupt more to panic.", "times", i-1)

                            }

                   }

                   debug.Exit()// ensure trace and CPU profile data is flushed.

                   debug.LoudPanic("boom")

         }()

}

最终调用:

// Start create a live P2P node and startsrunning it.

func (n *Node) Start() error {

         n.lock.Lock()

         defern.lock.Unlock()

         //Short circuit if the node's already running

         ifn.server != nil {

                   returnErrNodeRunning

         }

         iferr := n.openDataDir(); err != nil {

                   returnerr

         }

         //Initialize the p2p server. This creates the node key and

         //discovery databases.

         n.serverConfig= n.config.P2P

         n.serverConfig.PrivateKey= n.config.NodeKey()

         n.serverConfig.Name= n.config.NodeName()

         n.serverConfig.Logger= n.log

         ifn.serverConfig.StaticNodes == nil {

                   n.serverConfig.StaticNodes= n.config.StaticNodes()

         }

         ifn.serverConfig.TrustedNodes == nil {

                   n.serverConfig.TrustedNodes= n.config.TrustedNodes()

         }

         ifn.serverConfig.NodeDatabase == "" {

                   n.serverConfig.NodeDatabase= n.config.NodeDB()

         }

         running:= &p2p.Server{Config: n.serverConfig}

         n.log.Info("Startingpeer-to-peer node", "instance", n.serverConfig.Name)

         //Otherwise copy and specialize the P2P configuration

         services:= make(map[reflect.Type]Service)

         for_, constructor := range n.serviceFuncs {

                   //Create a new context for the particular service

                   ctx:= &ServiceContext{

                            config:         n.config,

                            services:       make(map[reflect.Type]Service),

                            EventMux:       n.eventmux,

                            AccountManager:n.accman,

                   }

                   forkind, s := range services { // copy needed for threaded access

                            ctx.services[kind]= s

                   }

                   //Construct and save the service

                   service,err := constructor(ctx)

                   iferr != nil {

                            returnerr

                   }

                   kind:= reflect.TypeOf(service)

                   if_, exists := services[kind]; exists {

                            return&DuplicateServiceError{Kind: kind}

                   }

                   services[kind]= service

         }

         //Gather the protocols and start the freshly assembled P2P server

         for_, service := range services {

                   running.Protocols= append(running.Protocols, service.Protocols()...)

         }

         iferr := running.Start(); err != nil {

                   returnconvertFileLockError(err)

         }

         //Start each of the services

         started:= []reflect.Type{}

         forkind, service := range services {

                   //Start the next service, stopping all previous upon failure

                   iferr := service.Start(running); err != nil {

                            for_, kind := range started {

                                     services[kind].Stop()

                            }

                            running.Stop()

                            returnerr

                   }

                   //Mark the service started for potential cleanup

                   started= append(started, kind)

         }

         //Lastly start the configured RPC interfaces

         iferr := n.startRPC(services); err != nil {

                   for_, service := range services {

                            service.Stop()

                   }

                   running.Stop()

                   returnerr

         }

         //Finish initializing the startup

         n.services= services

         n.server= running

         n.stop= make(chan struct{})

         returnnil

}

节点的作用最主要的是什么,是要能通信,以后所有的功能都是基于这个前提的,不然去中心化也没法搞成。所以要建立P2P通信服务和RPC的通信服务。以太坊维护网络使用的是UDP协议,数据交互等仍然使用的是TCP。RPC当然是HTTP、Websocket等了。

// Start starts running the server.

// Servers can not be re-used afterstopping.

func (srv *Server) Start() (err error) {

         srv.lock.Lock()

         defersrv.lock.Unlock()

         ifsrv.running {

                   returnerrors.New("server already running")

         }

         srv.running= true

         srv.log= srv.Config.Logger

         ifsrv.log == nil {

                   srv.log= log.New()

         }

         srv.log.Info("StartingP2P networking")

         //static fields

         ifsrv.PrivateKey == nil {

                   returnfmt.Errorf("Server.PrivateKey must be set to a non-nil key")

         }

         ifsrv.newTransport == nil {

                   srv.newTransport= newRLPX

         }

         ifsrv.Dialer == nil {

                   srv.Dialer= TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}

         }

         srv.quit= make(chan struct{})

         srv.addpeer= make(chan *conn)

         srv.delpeer= make(chan peerDrop)

         srv.posthandshake= make(chan *conn)

         srv.addstatic= make(chan *discover.Node)

         srv.removestatic= make(chan *discover.Node)

         srv.peerOp= make(chan peerOpFunc)

         srv.peerOpDone= make(chan struct{})

         var(

                   conn      *net.UDPConn

                   sconn     *sharedUDPConn

                   realaddr  *net.UDPAddr

                   unhandledchan discover.ReadPacket

         )

         if!srv.NoDiscovery || srv.DiscoveryV5 {

                   addr,err := net.ResolveUDPAddr("udp", srv.ListenAddr)

                   iferr != nil {

                            returnerr

                   }

                   conn,err = net.ListenUDP("udp", addr)

                   iferr != nil {

                            returnerr

                   }

                   realaddr= conn.LocalAddr().(*net.UDPAddr)

                   ifsrv.NAT != nil {

                            if!realaddr.IP.IsLoopback() {

                                     gonat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, realaddr.Port,"ethereum discovery")

                            }

                            //TODO: react to external IP changes over time.

                            ifext, err := srv.NAT.ExternalIP(); err == nil {

                                     realaddr= &net.UDPAddr{IP: ext, Port: realaddr.Port}

                            }

                   }

         }

         if!srv.NoDiscovery && srv.DiscoveryV5 {

                   unhandled= make(chan discover.ReadPacket, 100)

                   sconn= &sharedUDPConn{conn, unhandled}

         }

         //node table

         if!srv.NoDiscovery {

                   cfg:= discover.Config{

                            PrivateKey:   srv.PrivateKey,

                            AnnounceAddr:realaddr,

                            NodeDBPath:   srv.NodeDatabase,

                            NetRestrict:  srv.NetRestrict,

                            Bootnodes:    srv.BootstrapNodes,

                            Unhandled:    unhandled,

                   }

                   ntab,err := discover.ListenUDP(conn, cfg)

                   iferr != nil {

                            returnerr

                   }

                   srv.ntab= ntab

         }

         ifsrv.DiscoveryV5 {

                   var(

                            ntab*discv5.Network

                            err  error

                   )

                   ifsconn != nil {

                            ntab,err = discv5.ListenUDP(srv.PrivateKey, sconn, realaddr, "",srv.NetRestrict) //srv.NodeDatabase)

                   }else {

                            ntab,err = discv5.ListenUDP(srv.PrivateKey, conn, realaddr, "",srv.NetRestrict) //srv.NodeDatabase)

                   }

                   iferr != nil {

                            returnerr

                   }

                   iferr := ntab.SetFallbackNodes(srv.BootstrapNodesV5); err != nil {

                            returnerr

                   }

                   srv.DiscV5= ntab

         }

         dynPeers:= srv.maxDialedConns()

         dialer:= newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers,srv.NetRestrict)

         //handshake

         srv.ourHandshake= &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID:discover.PubkeyID(&srv.PrivateKey.PublicKey)}

         for_, p := range srv.Protocols {

                   srv.ourHandshake.Caps= append(srv.ourHandshake.Caps, p.cap())

         }

         //listen/dial

         ifsrv.ListenAddr != "" {

                   iferr := srv.startListening(); err != nil {

                            returnerr

                   }

         }

         ifsrv.NoDial && srv.ListenAddr == "" {

                   srv.log.Warn("P2Pserver will be useless, neither dialing nor listening")

         }

         srv.loopWG.Add(1)

         gosrv.run(dialer)

         srv.running= true

         returnnil

}

这里面先是建立一个UDP连接,又开始了TCP的监听;UDP负责管理网络中结点的发现以及维护,模块能够直接和临近结点交换各自已知结点信息,从而不断地更新结点网络。用nat来进行TCP端口映射,而nat主要是利用UPnP和NAT-PMP两个协议,如果没有,那就认为指定的ip就是公网ip了。网络还会同时在udp和tcp的30303端口和P2P网络进行数据交换,前者是用来维护P2P网络,后者则是各种应用协议真正交换数据的地方。

再看RPC的启动:

// startRPC is a helper method to start allthe various RPC endpoint during node

// startup. It's not meant to be called atany time afterwards as it makes certain

// assumptions about the state of the node.

func (n *Node) startRPC(servicesmap[reflect.Type]Service) error {

         //Gather all the possible APIs to surface

         apis:= n.apis()

         for_, service := range services {

                   apis= append(apis, service.APIs()...)

         }

         //Start the various API endpoints, terminating all in case of errors

         iferr := n.startInProc(apis); err != nil {

                   returnerr

         }

         iferr := n.startIPC(apis); err != nil {

                   n.stopInProc()

                   returnerr

         }

         iferr := n.startHTTP(n.httpEndpoint, apis, n.config.HTTPModules,n.config.HTTPCors, n.config.HTTPVirtualHosts); err != nil {

                   n.stopIPC()

                   n.stopInProc()

                   returnerr

         }

         iferr := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins,n.config.WSExposeAll); err != nil {

                   n.stopHTTP()

                   n.stopIPC()

                   n.stopInProc()

                   returnerr

         }

         //All API endpoints started successfully

         n.rpcAPIs= apis

         returnnil

}

到这儿,网络也起来了。如果有通信,各种数据交换就开始了。后面的其它部分,都是辅助的,这里不再赘述。

猜你喜欢

转载自blog.csdn.net/fpcc/article/details/80379836