上一节中留了一个悬念,那就是在Session
类中我们没有找到有意义的处理代码,这部分代码隐藏在哪里呢?
答案其实在上上节的末尾中
那就是EthereumHost::newPeerCapability()
实现中! 我们来看下这个函数的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 shared_ptr <Capability> EthereumHost::newPeerCapability(shared_ptr <SessionFace> const & _s, unsigned _idOffset, p2p::CapDesc const & _cap){ auto ret = HostCapability<EthereumPeer>::newPeerCapability(_s, _idOffset, _cap); auto cap = capabilityFromSession<EthereumPeer>(*_s, _cap.second); assert(cap); cap->init( protocolVersion(), m_networkId, m_chain.details().totalDifficulty, m_chain.currentHash(), m_chain.genesisHash(), m_hostData, m_peerObserver ); return ret; }
这个函数上来先调用了父类的实现,然后调用capabilityFromSession<EthereumPeer>()
函数返回EthereumPeer
对象的指针,最后调用了EthereumPeer::init()
。看来有必要深入这个函数里看看。
1 2 3 4 5 6 7 void EthereumPeer::init(unsigned _hostProtocolVersion, u256 _hostNetworkId, u256 _chainTotalDifficulty, h256 _chainCurrentHash, h256 _chainGenesisHash, shared_ptr <EthereumHostDataFace> _hostData, shared_ptr <EthereumPeerObserverFace> _observer){ m_hostData = _hostData; m_observer = _observer; m_hostProtocolVersion = _hostProtocolVersion; requestStatus(_hostNetworkId, _chainTotalDifficulty, _chainCurrentHash, _chainGenesisHash); }
可以看到这里有我们感兴趣的requestStatus()
这个函数了!EthereumPeer::requestStatus()
函数实现也比较简单,就是向对方发送一个StatusPacket
包:
1 2 3 4 5 6 7 8 9 10 void EthereumPeer::requestStatus(u256 _hostNetworkId, u256 _chainTotalDifficulty, h256 _chainCurrentHash, h256 _chainGenesisHash){ assert(m_asking == Asking::Nothing); setAsking(Asking::State); m_requireTransactions = true ; RLPStream s; prep(s, StatusPacket, 5 ) << m_hostProtocolVersion << _hostNetworkId << _chainTotalDifficulty << _chainCurrentHash << _chainGenesisHash; sealAndSend(s); }
这个数据包真正开始了与peer有意义的通讯!
可以看到发送这个包的同时告知了对方我自己的版本号 ,网络id ,区块链难度 ,当前区块链最新块hash值 ,当前区块链创世区块的hash值 ,这些信息非常重要! 对端peer接收到这个StatusPacket
包以后怎么应答呢?前面讲过EthereumPeer
类是Session
类的消息处理器,那么我们到EthereumPeer::interpret()
中去寻找:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 case StatusPacket: { cout << "StatusPacket" << endl ; m_protocolVersion = _r[0 ].toInt<unsigned >(); m_networkId = _r[1 ].toInt<u256>(); m_totalDifficulty = _r[2 ].toInt<u256>(); m_latestHash = _r[3 ].toHash<h256>(); m_genesisHash = _r[4 ].toHash<h256>(); if (m_peerCapabilityVersion == m_hostProtocolVersion) m_protocolVersion = m_hostProtocolVersion; LOG(m_logger) << "Status: " << m_protocolVersion << " / " << m_networkId << " / " << m_genesisHash.hex().c_str() << ", TD: " << m_totalDifficulty << " = " << m_latestHash.hex().c_str(); setIdle(); observer->onPeerStatus(dynamic_pointer_cast<EthereumPeer>(dynamic_pointer_cast<EthereumPeer>(shared_from_this()))); break ; }
可以看到这里将对方的协议版本号 ,网络id 等信息取出来,然后交给observer
去处理。observer
也是从EthereumPeer::init()
函数中由EthereumHost
类传过来的,所以还是要去EthereumHost
那找,在EthereumHost
类的构造函数中有这么一句话:
1 m_peerObserver = make_shared<EthereumPeerObserver>(m_sync, m_tq);
EthereumPeerObserver
这个类其实是一个过渡类,我们可以看看它的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 void onPeerStatus (std ::shared_ptr <EthereumPeer> _peer) override { try { m_sync->onPeerStatus(_peer); } catch (FailedInvariant const &) { cwarn << "Failed invariant during sync, restarting sync" ; m_sync->restartSync(); } }
这个类里有许多这样的函数,都是过渡用的,其实真正调用的是m_sync
里对应的函数。而m_sync
从上面代码中可以看到其实是EthereumHost
类里的m_sync
,这个是BlockChainSync
类对象。
因此我们能够得出结论,所有数据包都是由BlockChainSync
类对象来处理的!
我们再来看看BlockChainSync::onPeerStatus()
函数的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 void BlockChainSync::onPeerStatus(std ::shared_ptr <EthereumPeer> _peer){ char const * disconnectReason = nullptr ; if (_peer->m_genesisHash != host().chain().genesisHash()) disconnectReason = "Invalid genesis hash." ; else if (_peer->m_protocolVersion != host().protocolVersion()) disconnectReason = "Invalid protocol version." ; else if (_peer->m_networkId != host().networkId()) disconnectReason = "Invalid network identifier." ; else if (session->info().clientVersion.find("/v0.7.0/" ) != string ::npos) disconnectReason = "Blacklisted client version." ; else if (host().isBanned(session->id())) disconnectReason = "Peer banned for previous bad behaviour." ; else if (_peer->m_asking != Asking::State && _peer->m_asking != Asking::Nothing) disconnectReason = "Peer banned for unexpected status message." ; if (disconnectReason) { LOG(m_logger) << "Peer not suitable for sync: " << disconnectReason; cout << "Peer not suitable for sync: " << disconnectReason << endl ; _peer->disconnect(); return ; } if (!requestDaoForkBlockHeader(_peer)) { cout << "onPeerStatus syncPeer" << endl ; syncPeer(_peer, false ); } }
这个函数先是做了一些检测,判断peer的一些参数和自己参数不一致就关闭连接。 最后调用了syncPeer()
来对peer进行区块链同步!