Filecoin技术架构分析之十五:节点运行逻辑

[复制链接]
12941 |0
发表于 2019-5-9 11:59:57 | 显示全部楼层 |阅读模式
目录15.filecoin源码分析之节点运行逻辑
15.2.1 基本数据结构
15.2.2 创建filecoin节点实例
15.2.3 启动及停止filecoin节点
15.2.4 启动及停止挖矿
15.1 前提
15.2 filecoin节点运行逻辑简析
15.3 阶段性分析结束说明
分析基于的源码版本:go-filecoin master a0598a54(2019年3月9日)
15.1 前提
我们在前面的章节已经经过了三个阶段的梳理分析:
概念阶段,包括概念、通用语言理解、开发网络使用
顶层架构与概念的结合理解
具体源码的简析,包括协议层、支撑包、内部api层、服务层
源码中的command部分比较容易理解,就不单独撰文赘述了,基本都可以与内部api层对应起来。
现在再来看节点的运行逻辑应该会更加清晰了
15.2 filecoin节点运行逻辑简析
15.2.1 基本数据结构
▼package node
▶imports
▼variables
+ErrNoMinerAddress
-filecoinDHTProtocol:dhtprotocol.ID
-log
//创建具体的filecoin节点实例▼+Config:struct
[fields]
//设置区块时间
+BlockTime:time.Duration//配置节点是否转发
+IsRelay:bool
//libp2p选项
+Libp2pOpts:[]libp2p.Option//在离线模式下,会关闭libp2p
+OfflineMode:bool
//配置资源
+Repo:repo.Repo//配置区块奖励方法
+Rewarder:consensus.BlockRewarder//配置节点时空证明校验函数
+Verifier:proofs.Verifier
[methods]//创建node实例
+Build(ctxcontext.Context):*Node,error
-buildHost(ctxcontext.Context,makeDHTfunc(hosthost.Host)routing.IpfsRouting,error):host.Host,error
+ConfigOpt:func(*Config)error
▼+Node:struct
[fields]
//确认最新区块,本地持久化并广播
+AddNewlyMinedBlock:newBlockFunc//订阅主题"/fil/blocks"
+BlockSub:pubsub.Subscription//块服务接口
+Blockstore:bstore.Blockstore//维持相关节点连接
+Bootstrapper:*net.Bootstrapper//读取区块信息
+ChainReader:chain.ReadStore//共识协议
+Consensus:consensus.Protocol//块交换,节点间的数据交换
+Exchange:exchange.Interface//new-head主题
+HeaviestTipSetCh:chaninterface{}//新区块处理请求
+HeaviestTipSetHandled:func()//hello服务
+HelloSvc:*hello.Handler//消息订阅
+MessageSub:pubsub.Subscription//挖矿调度
+MiningScheduler:mining.Scheduler//消息池操作
+MsgPool:*core.MessagePool//离线模式
+OfflineMode:bool
+OnlineStore:*hamt.CborIpldStore//对应libp2p中的host
+PeerHost:host.Host//libp2p中的pingservice
+Ping:*ping.PingService//高层api
+PorcelainAPI:*porcelain.API//功率表
+PowerTable:consensus.PowerTableView//配置资源
+Repo:repo.Repo//检索客户端
+RetrievalClient:*retrieval.Client//检索矿工
+RetrievalMiner:*retrieval.Miner//路由,libp2p
+Router:routing.IpfsRouting//存储矿工
+StorageMiner:*storage.Miner//存储客户
+StorageMinerClient:*storage.Client//链同步
+Syncer:chain.Syncer//钱包管理
+Wallet:*wallet.Wallet
-blockTime:time.Duration
-blockservice:bserv.BlockService
-cancelMining:context.CancelFunc
-cancelSubscriptionsCtx:context.CancelFunc
-cborStore:*hamt.CborIpldStore
-host:host.Host
-lookup:lookup.PeerLookupService
-mining
-miningCtx:context.Context
-miningDoneWg:*sync.WaitGroup
-sectorBuilder:sectorbuilder.SectorBuilder
[methods]
+BlockHeight():*types.BlockHeight,error
+BlockService():bserv.BlockService
+CborStore():*hamt.CborIpldStore
+ChainReadStore():chain.ReadStore//创建矿工方法
+CreateMiner(ctxcontext.Context,accountAddraddress.Address,gasPricetypes.AttoFIL,gasLimittypes.GasUnits,pledgeuint64,pidlibp2ppeer.ID,collateral*types.AttoFIL):*address.Address,error
+GetBlockTime():time.Duration
+Host():host.Host//节点查找方法
+Lookup():lookup.PeerLookupService
+MiningSignerAddress():address.Address
+MiningTimes():time.Duration,time.Duration//创建新的account地址,钱包地址
+NewAddress():address.Address,error
+SectorBuilder():sectorbuilder.SectorBuilder
+SetBlockTime(blockTimetime.Duration)//启动节点
+Start(ctxcontext.Context):error//启动挖矿
+StartMining(ctxcontext.Context):error//停止节点
+Stop(ctxcontext.Context)//停止挖矿
+StopMining(ctxcontext.Context)
-addNewlyMinedBlock(ctxcontext.Context,b*types.Block)
-cancelSubscriptions()
-getLastUsedSectorID(ctxcontext.Context,minerAddraddress.Address):uint64,error
-getMinerActorPubKey():[]byte,error
-handleNewHeaviestTipSet(ctxcontext.Context,headtypes.TipSet)
-handleNewMiningOutput(miningOutChchanmining.Output)
-handleSubscription(ctxcontext.Context,fpubSubProcessorFunc,fnamestring,spubsub.Subscription,snamestring)
-isMining():bool
-miningAddress():address.Address,error
-miningOwnerAddress(ctxcontext.Context,miningAddraddress.Address):address.Address,error
-saveMinerConfig(minerAddraddress.Address,signerAddraddress.Address):error
-setIsMining(isMiningbool)
-setupHeartbeatServices(ctxcontext.Context):error
-setupMining(ctxcontext.Context):error
[functions]//调用Build创建node实例
+New(ctxcontext.Context,opts...ConfigOpt):*Node,error
▼-blankValidator:struct
[methods]
+Select(_string,_[][]byte):int,error
+Validate(_string,_[]byte):error
-newBlockFunc:func(context.Context,*types.Block)
-pubSubProcessorFunc:func(ctxcontext.Context,msgpubsub.Message)error
▼functions
+BlockTime(blockTimetime.Duration):ConfigOpt
+IsRelay():ConfigOpt
+Libp2pOptions(opts...libp2p.Option):ConfigOpt
+OfflineMode(offlineModebool):ConfigOpt
+RewarderConfigOption(rewarderconsensus.BlockRewarder):ConfigOpt
+StartMining(ctxcontext.Context,node*Node):error
+VerifierConfigOption(verifierproofs.Verifier):ConfigOpt
-initSectorBuilderForNode(ctxcontext.Context,node*Node,sectorStoreTypeproofs.SectorStoreType):sectorbuilder.SectorBuilder,error
-initStorageMinerForNode(ctxcontext.Context,node*Node):*storage.Miner,error
-readGenesisCid(dsdatastore.Datastore):cid.Cid,error
15.2.2 创建filecoin节点实例
实例化filecoin节点,简析见如下代码中添加的注释:
//BuildinstantiatesafilecoinNodefromthesettingsspecifiedintheconfig.func(nc*Config)Build(ctxcontext.Context)(*Node,error){//创建内存资源实例
ifnc.Repo==nil{
nc.Repo=repo.NewInMemoryRepo()
}//创建块服务实例
bs:=bstore.NewBlockstore(nc.Repo.Datastore())
validator:=blankValidator{}
varpeerHosthost.Host
varrouterrouting.IpfsRouting//带宽统计实例,加入libp2popts
bandwidthTracker:=p2pmetrics.NewBandwidthCounter()
nc.Libp2pOpts=append(nc.Libp2pOpts,libp2p.BandwidthReporter(bandwidthTracker))//非离线模式才启用libp2p
if!nc.OfflineMode{
makeDHT:=func(hhost.Host)(routing.IpfsRouting,error){
r,err:=dht.New(
ctx,
h,
dhtopts.Datastore(nc.Repo.Datastore()),
dhtopts.NamespacedValidator("v",validator),
dhtopts.Protocols(filecoinDHTProtocol),
)iferr!=nil{returnnil,errors.Wrap(err,"failedtosetuprouting")
}
router=rreturnr,err
}
varerrerror//实例化非离线模式libp2phost
peerHost,err=nc.buildHost(ctx,makeDHT)iferr!=nil{returnnil,err
}
}else{//离线模式处理
router=offroute.NewOfflineRouter(nc.Repo.Datastore(),validator)
peerHost=rhost.Wrap(noopLibP2PHost{},router)
}//ping服务实例
//setuppinger
pinger:=ping.NewPingService(peerHost)//bitswap实例
//setupbitswap
nwork:=bsnet.NewFromIpfsHost(peerHost,router)//nwork:=bsnet.NewFromIpfsHost(innerHost,router)
bswap:=bitswap.New(ctx,nwork,bs)
bservice:=bserv.New(bs,bswap)
cstOnline:=hamt.CborIpldStore{Blocks:bservice}
cstOffline:=hamt.CborIpldStore{Blocks:bserv.New(bs,offline.Exchange(bs))}//获取创世块cid
genCid,err:=readGenesisCid(nc.Repo.Datastore())iferr!=nil{returnnil,err
}//chain.Store实例以及功率表
varchainStorechain.Store=chain.NewDefaultStore(nc.Repo.ChainDatastore(),&cstOffline,genCid)
powerTable:=&consensus.MarketView{}//共识协议processor实例
varprocessorconsensus.Processorifnc.Rewarder==nil{
processor=consensus.NewDefaultProcessor()
}else{
processor=consensus.NewConfiguredProcessor(consensus.NewDefaultMessageValidator(),nc.Rewarder)
}//共识协议实例
varnodeConsensusconsensus.Protocolifnc.Verifier==nil{
nodeConsensus=consensus.NewExpected(&cstOffline,bs,processor,powerTable,genCid,&proofs.RustVerifier{})
}else{
nodeConsensus=consensus.NewExpected(&cstOffline,bs,processor,powerTable,genCid,nc.Verifier)
}//链同步,链读取,消息池实例
//onlythesyncergetsthestoragewhichisonlineconnected
chainSyncer:=chain.NewDefaultSyncer(&cstOnline,&cstOffline,nodeConsensus,chainStore)
chainReader,ok:=chainStore.(chain.ReadStore)if!ok{returnnil,errors.New("failedtocastchain.Storetochain.ReadStore")
}
msgPool:=core.NewMessagePool()//Setuplibp2ppubsub
fsub,err:=libp2pps.NewFloodSub(ctx,peerHost)iferr!=nil{returnnil,errors.Wrap(err,"failedtosetuppubsub")
}//钱包服务实例
backend,err:=wallet.NewDSBackend(nc.Repo.WalletDatastore())iferr!=nil{returnnil,errors.Wrap(err,"failedtosetupwalletbackend")
}
fcWallet:=wallet.New(backend)//实例化高层api
PorcelainAPI:=porcelain.New(plumbing.New(&plumbing.APIDeps{
Chain:chainReader,
Config:cfg.NewConfig(nc.Repo),
Deals:strgdls.New(nc.Repo.DealsDatastore()),
MsgPool:msgPool,
MsgPreviewer:msg.NewPreviewer(fcWallet,chainReader,&cstOffline,bs),
MsgQueryer:msg.NewQueryer(nc.Repo,fcWallet,chainReader,&cstOffline,bs),
MsgSender:msg.NewSender(fcWallet,chainReader,msgPool,consensus.NewOutboundMessageValidator(),fsub.Publish),
MsgWaiter:msg.NewWaiter(chainReader,bs,&cstOffline),
Network:net.New(peerHost,pubsub.NewPublisher(fsub),pubsub.NewSubscriber(fsub),net.NewRouter(router),bandwidthTracker),
SigGetter:mthdsig.NewGetter(chainReader),
Wallet:fcWallet,
}))//实例化node
nd:=&Node{
blockservice:bservice,
Blockstore:bs,
cborStore:&cstOffline,
OnlineStore:&cstOnline,
Consensus:nodeConsensus,
ChainReader:chainReader,
Syncer:chainSyncer,
PowerTable:powerTable,
PorcelainAPI:PorcelainAPI,
Exchange:bswap,
host:peerHost,
MsgPool:msgPool,
OfflineMode:nc.OfflineMode,
PeerHost:peerHost,
Ping:pinger,
Repo:nc.Repo,
Wallet:fcWallet,
blockTime:nc.BlockTime,
Router:router,
}//Bootstrappingnetworkpeers.
periodStr:=nd.Repo.Config().Bootstrap.Period
period,err:=time.ParseDuration(periodStr)iferr!=nil{returnnil,errors.Wrapf(err,"couldn'tparsebootstrapperiod%s",periodStr)
}//实例化Bootstrapper,指定node的该方法
//Bootstrappermaintainsconnectionstosomesubsetofaddresses
ba:=nd.Repo.Config().Bootstrap.Addresses
bpi,err:=net.PeerAddrsToPeerInfos(ba)iferr!=nil{returnnil,errors.Wrapf(err,"couldn'tparsebootstrapaddresses[%s]",ba)
}
minPeerThreshold:=nd.Repo.Config().Bootstrap.MinPeerThreshold
nd.Bootstrapper=net.NewBootstrapper(bpi,nd.Host(),nd.Host().Network(),nd.Router,minPeerThreshold,period)//实例化链查找服务,指定node的该方法
//On-chainlookupservice
defaultAddressGetter:=func()(address.Address,error){returnnd.PorcelainAPI.GetAndMaybeSetDefaultSenderAddress()
}
nd.lookup=lookup.NewChainLookupService(nd.ChainReader,defaultAddressGetter,bs)returnnd,nil}
15.2.3 启动及停止filecoin节点
启动filecoin节点的流程概览:
//Startbootsupthenode.func(node*Node)Start(ctxcontext.Context)error{//加载本地chain信息
iferr:=node.ChainReader.Load(ctx);err!=nil{returnerr
}//如果存在存储矿工,配置挖矿功能
//Onlysettheseupifthereisaminerconfigured.
if_,err:=node.miningAddress();err==nil{iferr:=node.setupMining(ctx);err!=nil{
log.Errorf("setupminingfailed:%v",err)returnerr
}
}//设置链同步回调函数
//Startup'hello'handshakeservice
syncCallBack:=func(pidlibp2ppeer.ID,cids[]cid.Cid,heightuint64){//TODOitispossiblethesyncerinterfaceshouldbemodifiedto
//makeuseoftheadditionalcontextnotusedhere(fromaddr+height).
//Tokeepthingssimplefornowthisinfoisnotused.
err:=node.Syncer.HandleNewBlocks(context.Background(),cids)iferr!=nil{
log.Infof("errorhandlingblocks:%s",types.NewSortedCidSet(cids...).String())
}
}//实例化hello握手协议
node.HelloSvc=hello.New(node.Host(),node.ChainReader.GenesisCid(),syncCallBack,node.ChainReader.Head)//实例化存储矿工协议
cni:=storage.NewClientNodeImpl(dag.NewDAGService(node.BlockService()),node.Host(),node.GetBlockTime())
varerrerror
node.StorageMinerClient,err=storage.NewClient(cni,node.PorcelainAPI)iferr!=nil{returnerrors.Wrap(err,"Couldnotmakenewstorageclient")
}//实例化检索客户及检索矿工协议
node.RetrievalClient=retrieval.NewClient(node)
node.RetrievalMiner=retrieval.NewMiner(node)//订阅区块通知
//subscribetoblocknotifications
blkSub,err:=node.PorcelainAPI.PubSubSubscribe(BlockTopic)iferr!=nil{returnerrors.Wrap(err,"failedtosubscribetoblockstopic")
}
node.BlockSub=blkSub//订阅消息通知
//subscribetomessagenotifications
msgSub,err:=node.PorcelainAPI.PubSubSubscribe(msg.Topic)iferr!=nil{returnerrors.Wrap(err,"failedtosubscribetomessagetopic")
}
node.MessageSub=msgSub
cctx,cancel:=context.WithCancel(context.Background())
node.cancelSubscriptionsCtx=cancel//启用新线程订阅区块及消息主题,设置handle回调
gonode.handleSubscription(cctx,node.processBlock,"processBlock",node.BlockSub,"BlockSub")
gonode.handleSubscription(cctx,node.processMessage,"processMessage",node.MessageSub,"MessageSub")//启用新线程处理新的tipset事件
node.HeaviestTipSetHandled=func(){}
node.HeaviestTipSetCh=node.ChainReader.HeadEvents().Sub(chain.NewHeadTopic)
gonode.handleNewHeaviestTipSet(cctx,node.ChainReader.Head())//非离线模式启动bootstapper服务
if!node.OfflineMode{
node.Bootstrapper.Start(context.Background())
}//启动心跳服务
iferr:=node.setupHeartbeatServices(ctx);err!=nil{returnerrors.Wrap(err,"failedtostartheartbeatservices")
}returnnil}
停止filecoin节点的流程概览
释放资源,停止相关服务
//Stopinitiatestheshutdownofthenode.func(node*Node)Stop(ctxcontext.Context){
node.ChainReader.HeadEvents().Unsub(node.HeaviestTipSetCh)//停止挖矿
node.StopMining(ctx)//取消订阅
node.cancelSubscriptions()//停止链读取服务
node.ChainReader.Stop()//停止密封服务
ifnode.SectorBuilder()!=nil{iferr:=node.SectorBuilder().Close();err!=nil{
fmt.Printf("errorclosingsectorbuilder:%s\n",err)
}
node.sectorBuilder=nil
}//关闭host实例
iferr:=node.Host().Close();err!=nil{
fmt.Printf("errorclosinghost:%s\n",err)
}//关闭资源实例
iferr:=node.Repo.Close();err!=nil{
fmt.Printf("errorclosingrepo:%s\n",err)
}//关闭bootstqpper实例
node.Bootstrapper.Stop()
fmt.Println("stoppingfilecoin:(")
}
15.2.4 启动及停止挖矿
启动挖矿:
//StartMiningcausesthenodetostartfeedingblockstotheminingworkerandinitializes//theSectorBuilderfortheminingaddress.func(node*Node)StartMining(ctxcontext.Context)error{//如果在挖矿中,退出
ifnode.isMining(){returnerrors.New("Nodeisalreadymining")
}//获取矿工地址
minerAddr,err:=node.miningAddress()iferr!=nil{returnerrors.Wrap(err,"failedtogetminingaddress")
}//确保密封服务实例存在
//ensurewehaveasectorbuilder
ifnode.SectorBuilder()==nil{iferr:=node.setupMining(ctx);err!=nil{returnerr
}
}//获取地址
minerOwnerAddr,err:=node.miningOwnerAddress(ctx,minerAddr)
minerSigningAddress:=node.MiningSignerAddress()iferr!=nil{returnerrors.Wrapf(err,"failedtogetminingowneraddressforminer%s",minerAddr)
}
blockTime,mineDelay:=node.MiningTimes()//实例化挖矿调度服务
ifnode.MiningScheduler==nil{
getStateFromKey:=func(ctxcontext.Context,tsKeystring)(state.Tree,error){
tsas,err:=node.ChainReader.GetTipSetAndState(ctx,tsKey)iferr!=nil{returnnil,err
}returnstate.LoadStateTree(ctx,node.CborStore(),tsas.TipSetStateRoot,builtin.Actors)
}
getState:=func(ctxcontext.Context,tstypes.TipSet)(state.Tree,error){returngetStateFromKey(ctx,ts.String())
}
getWeight:=func(ctxcontext.Context,tstypes.TipSet)(uint64,error){
parent,err:=ts.Parents()iferr!=nil{returnuint64(0),err
}//TODOhandlegenesiscidmoregracefully
ifparent.Len()==0{returnnode.Consensus.Weight(ctx,ts,nil)
}
pSt,err:=getStateFromKey(ctx,parent.String())iferr!=nil{returnuint64(0),err
}returnnode.Consensus.Weight(ctx,ts,pSt)
}
getAncestors:=func(ctxcontext.Context,tstypes.TipSet,newBlockHeight*types.BlockHeight)([]types.TipSet,error){returnchain.GetRecentAncestors(ctx,ts,node.ChainReader,newBlockHeight,consensus.AncestorRoundsNeeded,consensus.LookBackParameter)
}
processor:=consensus.NewDefaultProcessor()
worker:=mining.NewDefaultWorker(node.MsgPool,getState,getWeight,getAncestors,processor,node.PowerTable,
node.Blockstore,node.CborStore(),minerAddr,minerOwnerAddr,minerSigningAddress,node.Wallet,blockTime)
node.MiningScheduler=mining.NewScheduler(worker,mineDelay,node.ChainReader.Head)
}//paranoidcheck
//启动挖矿服务
if!node.MiningScheduler.IsStarted(){
node.miningCtx,node.cancelMining=context.WithCancel(context.Background())
outCh,doneWg:=node.MiningScheduler.Start(node.miningCtx)
node.miningDoneWg=doneWg
node.AddNewlyMinedBlock=node.addNewlyMinedBlock
node.miningDoneWg.Add(1)
gonode.handleNewMiningOutput(outCh)
}//initializeastorageminer
//初始化存储矿工
storageMiner,err:=initStorageMinerForNode(ctx,node)iferr!=nil{returnerrors.Wrap(err,"failedtoinitializestorageminer")
}
node.StorageMiner=storageMiner//loop,turningsealing-resultsintocommitSectormessagestobeincluded
//inthechain
//新开线程处理,1密封完成处理;2接受停止挖矿消息
gofunc(){for{
select{//密封完成处理
caseresult:=0{
gofunc(){for{
select{//取消
case
停止挖矿
//StopMiningstopsminingonnewblocks.func(node*Node)StopMining(ctxcontext.Context){
node.setIsMining(false)//取消挖矿
ifnode.cancelMining!=nil{
node.cancelMining()
}//等待执行中的挖矿任务完成后结束
ifnode.miningDoneWg!=nil{
node.miningDoneWg.Wait()
}//TODO:stopnode.StorageMiner}
15.3 阶段性分析结束说明
至此,笔者针对go-filecoin部分的分析即将告一段落。
因为时间的关系,文章只是将关键部分以书面形式表达出来,更像是笔者的一份分析笔记,但相信对于想分析源码的朋友会有一定帮助。
后面会抽空补充一章总结,笔者在第4章中有提到过,薄读->厚读->再薄读,我们还需要一次薄读,来加深我们对go-filecoin的认识。


diwzdxcjupy.png

diwzdxcjupy.png
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

快速回复 返回顶部 返回列表