以太坊C++源码解析(五)区块链同步(5)

onPeerBlockBodies()

BlockChainSync::requestBlocks()请求区块体后,如果对方有这些区块就会把数据返回回来,本节我们来看看接收区块体数据的处理。

数据包辗转从SessionEthereumPeer,再到EthereumPeerObserver,最后到BlockChainSync::onPeerBlockBodies中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
RecursiveGuard l(x_sync);
DEV_INVARIANT_CHECK;
size_t itemCount = _r.itemCount();
LOG(m_logger) << "BlocksBodies (" << dec << itemCount << " entries) "
<< (itemCount ? "" : ": NoMoreBodies");
clearPeerDownload(_peer);
if (m_state != SyncState::Blocks && m_state != SyncState::Waiting) {
LOG(m_logger) << "Ignoring unexpected blocks";
return;
}
if (m_state == SyncState::Waiting)
{
LOG(m_loggerDetail) << "Ignored blocks while waiting";
return;
}
if (itemCount == 0)
{
LOG(m_loggerDetail) << "Peer does not have the blocks requested";
_peer->addRating(-1);
}

这个函数比onPeerBlockHeaders()简单多了,开头仍然主要是对SyncState做检查。
接着是遍历校验接收到的区块体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
for (unsigned i = 0; i < itemCount; i++)
{
RLP body(_r[i]);

auto txList = body[0];
h256 transactionRoot = trieRootOver(txList.itemCount(), [&](unsigned i){ return rlp(i); }, [&](unsigned i){ return txList[i].data().toBytes(); });
h256 uncles = sha3(body[1].data());
HeaderId id { transactionRoot, uncles };
auto iter = m_headerIdToNumber.find(id);
if (iter == m_headerIdToNumber.end() || !haveItem(m_headers, iter->second))
{
LOG(m_loggerDetail) << "Ignored unknown block body";
continue;
}
unsigned blockNumber = iter->second;
if (haveItem(m_bodies, blockNumber))
{
LOG(m_logger) << "Skipping already downloaded block body " << blockNumber;
continue;
}
m_headerIdToNumber.erase(id);
mergeInto(m_bodies, blockNumber, body.data().toBytes());
}

校验过程也比较简单,主要是从区块体数据中重新计算transactionRootuncles值,和m_headers中对应块头里记录的值做比较,如果一样则mergeIntom_bodies里。
最后仍然是:

1
2
collectBlocks();
continueSync();

但是这次我们需要深入collectBlocks()这个函数里去看看了,因为这次有了区块头和区块体,只要条件允许就可以进行合并操作了。

collectBlocks()

BlockChainSync::collectBlocks()函数开头就提出合并所需要的两组条件。
第一组条件是:

1
2
if (!m_haveCommonHeader || m_headers.empty() || m_bodies.empty())
return;

第一组包含三个条件,缺一不可。
第二组条件是:

1
2
3
4
auto& headers = *m_headers.begin();
auto& bodies = *m_bodies.begin();
if (headers.first != bodies.first || headers.first != m_lastImportedBlock + 1)
return;

这里的headersm_headers中第一个连续区域,bodiesm_bodies中第一个连续区域。那么这里的两个条件是headers中最低区块号必须和bodies中最低区块号相同,并且这个区块号就是所需要同步的下一个区块。
满足这两个条件就可以进入正式的合并流程了,在BlockChainSync里的部分其实并不多:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
for (; i < headers.second.size() && i < bodies.second.size(); i++)
{
RLPStream blockStream(3);
blockStream.appendRaw(headers.second[i].data);
RLP body(bodies.second[i]);
blockStream.appendRaw(body[0].data());
blockStream.appendRaw(body[1].data());
bytes block;
blockStream.swapOut(block);
switch (host().bq().import(&block))
{
// ...
}
}

假如headers里的区块是[区块3,区块4,区块5,区块6],bodies里的区块是[区块3,区块4],那么这里的遍历范围是[区块3,区块4]。将区块头和区块体合并起来以后放到RLPStream中,并调用BlockQueue::import()函数导入二级缓冲区中,BlockQueue::import()函数的实现在以后BlockQueue类里再细说,这里主要看BlockChainSync类里的流程。
调用完BlockQueue::import()后根据返回值做不同处理,这段就不分析了,可以直接去看源码。

1
2
3
4
5
6
7
8
9
10
11
12
auto newHeaders = std::move(headers.second);
newHeaders.erase(newHeaders.begin(), newHeaders.begin() + i);
unsigned newHeaderHead = headers.first + i;
auto newBodies = std::move(bodies.second);
newBodies.erase(newBodies.begin(), newBodies.begin() + i);
unsigned newBodiesHead = bodies.first + i;
m_headers.erase(m_headers.begin());
m_bodies.erase(m_bodies.begin());
if (!newHeaders.empty())
m_headers[newHeaderHead] = newHeaders;
if (!newBodies.empty())
m_bodies[newBodiesHead] = newBodies;

导入二级缓冲区后,将导入成功的区块从headersbodies中删除,并重设m_headersm_bodies中的最低连续区域。

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×