onPeerBlockBodies()
BlockChainSync::requestBlocks()
请求区块体后,如果对方有这些区块就会把数据返回回来,本节我们来看看接收区块体数据的处理。
数据包辗转从Session
到EthereumPeer
,再到EthereumPeerObserver
,最后到BlockChainSync::onPeerBlockBodies
中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| RecursiveGuard l(x_sync); DEV_INVARIANT_CHECK; size_t itemCount = _r.itemCount(); LOG(m_logger) << "BlocksBodies (" << dec << itemCount << " entries) " << (itemCount ? "" : ": NoMoreBodies"); clearPeerDownload(_peer); if (m_state != SyncState::Blocks && m_state != SyncState::Waiting) { LOG(m_logger) << "Ignoring unexpected blocks"; return; } if (m_state == SyncState::Waiting) { LOG(m_loggerDetail) << "Ignored blocks while waiting"; return; } if (itemCount == 0) { LOG(m_loggerDetail) << "Peer does not have the blocks requested"; _peer->addRating(-1); }
|
这个函数比onPeerBlockHeaders()
简单多了,开头仍然主要是对SyncState
做检查。
接着是遍历校验接收到的区块体:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| for (unsigned i = 0; i < itemCount; i++) { RLP body(_r[i]);
auto txList = body[0]; h256 transactionRoot = trieRootOver(txList.itemCount(), [&](unsigned i){ return rlp(i); }, [&](unsigned i){ return txList[i].data().toBytes(); }); h256 uncles = sha3(body[1].data()); HeaderId id { transactionRoot, uncles }; auto iter = m_headerIdToNumber.find(id); if (iter == m_headerIdToNumber.end() || !haveItem(m_headers, iter->second)) { LOG(m_loggerDetail) << "Ignored unknown block body"; continue; } unsigned blockNumber = iter->second; if (haveItem(m_bodies, blockNumber)) { LOG(m_logger) << "Skipping already downloaded block body " << blockNumber; continue; } m_headerIdToNumber.erase(id); mergeInto(m_bodies, blockNumber, body.data().toBytes()); }
|
校验过程也比较简单,主要是从区块体数据中重新计算transactionRoot
和uncles
值,和m_headers
中对应块头里记录的值做比较,如果一样则mergeInto
到m_bodies
里。
最后仍然是:
1 2
| collectBlocks(); continueSync();
|
但是这次我们需要深入collectBlocks()
这个函数里去看看了,因为这次有了区块头和区块体,只要条件允许就可以进行合并操作了。
collectBlocks()
BlockChainSync::collectBlocks()
函数开头就提出合并所需要的两组条件。
第一组条件是:
1 2
| if (!m_haveCommonHeader || m_headers.empty() || m_bodies.empty()) return;
|
第一组包含三个条件,缺一不可。
第二组条件是:
1 2 3 4
| auto& headers = *m_headers.begin(); auto& bodies = *m_bodies.begin(); if (headers.first != bodies.first || headers.first != m_lastImportedBlock + 1) return;
|
这里的headers
是m_headers
中第一个连续区域,bodies
是m_bodies
中第一个连续区域。那么这里的两个条件是headers
中最低区块号必须和bodies
中最低区块号相同,并且这个区块号就是所需要同步的下一个区块。
满足这两个条件就可以进入正式的合并流程了,在BlockChainSync
里的部分其实并不多:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| for (; i < headers.second.size() && i < bodies.second.size(); i++) { RLPStream blockStream(3); blockStream.appendRaw(headers.second[i].data); RLP body(bodies.second[i]); blockStream.appendRaw(body[0].data()); blockStream.appendRaw(body[1].data()); bytes block; blockStream.swapOut(block); switch (host().bq().import(&block)) { } }
|
假如headers
里的区块是[区块3,区块4,区块5,区块6],bodies
里的区块是[区块3,区块4],那么这里的遍历范围是[区块3,区块4]。将区块头和区块体合并起来以后放到RLPStream
中,并调用BlockQueue::import()
函数导入二级缓冲区中,BlockQueue::import()
函数的实现在以后BlockQueue
类里再细说,这里主要看BlockChainSync
类里的流程。
调用完BlockQueue::import()
后根据返回值做不同处理,这段就不分析了,可以直接去看源码。
1 2 3 4 5 6 7 8 9 10 11 12
| auto newHeaders = std::move(headers.second); newHeaders.erase(newHeaders.begin(), newHeaders.begin() + i); unsigned newHeaderHead = headers.first + i; auto newBodies = std::move(bodies.second); newBodies.erase(newBodies.begin(), newBodies.begin() + i); unsigned newBodiesHead = bodies.first + i; m_headers.erase(m_headers.begin()); m_bodies.erase(m_bodies.begin()); if (!newHeaders.empty()) m_headers[newHeaderHead] = newHeaders; if (!newBodies.empty()) m_bodies[newBodiesHead] = newBodies;
|
导入二级缓冲区后,将导入成功的区块从headers
和bodies
中删除,并重设m_headers
和m_bodies
中的最低连续区域。