以太坊C++源码解析(八)交易队列(二)

交易队列的输入

交易队列的输入有两个,分别是接收到其他节点的广播交易和自身节点提交的交易。

分别来看这两种输入方式:

  • 接收广播交易
    在前面区块链同步章节中提到过,接收到交易后会通过调用TransactionQueue::enqueue()来将新交易放入交易队列中,这个函数代码非常简单:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    void TransactionQueue::enqueue(RLP const& _data, h512 const& _nodeId)
    {
    bool queued = false;
    {
    Guard l(x_queue);
    unsigned itemCount = _data.itemCount();
    for (unsigned i = 0; i < itemCount; ++i)
    {
    if (m_unverified.size() >= c_maxVerificationQueueSize)
    {
    LOG(m_logger) << "Transaction verification queue is full. Dropping "
    << itemCount - i << " transactions";
    break;
    }
    m_unverified.emplace_back(UnverifiedTransaction(_data[i].data(), _nodeId));
    queued = true;
    }
    }
    if (queued)
    m_queueReady.notify_all();
    }

只是将交易放入m_unverified中,然后通知校验线程来校验。
再来看校验线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void TransactionQueue::verifierBody()
{
while (!m_aborting)
{
UnverifiedTransaction work;

{
unique_lock<Mutex> l(x_queue);
m_queueReady.wait(l, [&](){ return !m_unverified.empty() || m_aborting; });
if (m_aborting)
return;
work = move(m_unverified.front());
m_unverified.pop_front();
}

try
{
Transaction t(work.transaction, CheckTransaction::Cheap); //Signature will be checked later
ImportResult ir = import(t);
m_onImport(ir, t.sha3(), work.nodeId);
}
catch (...)
{
// should not happen as exceptions are handled in import.
cwarn << "Bad transaction:" << boost::current_exception_diagnostic_information();
}
}
}

这里是一个简单的生产消费者队列,先将UnverifiedTransaction取出,然后调用import()函数进行校验。由于使用了线程,校验过程是异步的。

  • 自身提交交易
    节点自身提交的交易与上面的交易不同,是同步的,也就是直接会调用import()函数来进行校验。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    ImportResult TransactionQueue::import(bytesConstRef _transactionRLP, IfDropped _ik)
    {
    try
    {
    Transaction t = Transaction(_transactionRLP, CheckTransaction::Everything);
    return import(t, _ik);
    }
    catch (Exception const&)
    {
    return ImportResult::Malformed;
    }
    }

交易的校验

我们来看这个校验的过程,也就是TransactionQueue::import()函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
ImportResult TransactionQueue::import(Transaction const& _transaction, IfDropped _ik)
{
if (_transaction.hasZeroSignature())
return ImportResult::ZeroSignature;
// Check if we already know this transaction.
h256 h = _transaction.sha3(WithSignature);

ImportResult ret;
{
UpgradableGuard l(m_lock);
auto ir = check_WITH_LOCK(h, _ik);
if (ir != ImportResult::Success)
return ir;

{
_transaction.safeSender(); // Perform EC recovery outside of the write lock
UpgradeGuard ul(l);
ret = manageImport_WITH_LOCK(h, _transaction);
}
}
return ret;
}

可以看到先是调用TransactionQueue::check_WITH_LOCK()函数来做一个简单检查。检查的过程是看这个交易是否是已经校验过的,是否是之前被删除的。
接着调用_transaction.safeSender(),这个函数是通过签名反推sender,在交易那一节已经说过。最后调用manageImport_WITH_LOCK()函数来处理交易。
manageImport_WITH_LOCK()函数过程稍稍复杂,我们可以一步一步来分析。
第一步:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
auto cs = m_currentByAddressAndNonce.find(_transaction.from());
if (cs != m_currentByAddressAndNonce.end())
{
auto t = cs->second.find(_transaction.nonce());
if (t != cs->second.end())
{
if (_transaction.gasPrice() < (*t->second).transaction.gasPrice())
return ImportResult::OverbidGasPrice;
else
{
h256 dropped = (*t->second).transaction.sha3();
remove_WITH_LOCK(dropped);
m_onReplaced(dropped);
}
}
}

这一步是检查m_currentByAddressAndNonce中是否有重复项,判断标准是sendernonce,如果存在重复的则检查gasPrice,如果新的交易gasPrice更低,则校验失败,否则将现有的交易删除,替换为gasPrice更高的交易。
第二步是检查m_future,检查过程与第一步类似。
第三步:

1
2
3
4
5
6
7
8
9
10
11
// If valid, append to transactions.
insertCurrent_WITH_LOCK(make_pair(_h, _transaction));
LOG(m_loggerDetail) << "Queued vaguely legit-looking transaction " << _h;

while (m_current.size() > m_limit)
{
LOG(m_loggerDetail) << "Dropping out of bounds transaction " << _h;
remove_WITH_LOCK(m_current.rbegin()->transaction.sha3());
}

m_onReady();

这里调用insertCurrent_WITH_LOCK()插入队列,然后将超出容量m_limit的部分删除,并调用m_onReady()表示目前队列有数据了,可以来取了。
我们再来看insertCurrent_WITH_LOCK()函数是怎么将交易插入队列的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void TransactionQueue::insertCurrent_WITH_LOCK(std::pair<h256, Transaction> const& _p)
{
if (m_currentByHash.count(_p.first))
{
cwarn << "Transaction hash" << _p.first << "already in current?!";
return;
}

Transaction const& t = _p.second;
// Insert into current
auto inserted = m_currentByAddressAndNonce[t.from()].insert(std::make_pair(t.nonce(), PriorityQueue::iterator()));
PriorityQueue::iterator handle = m_current.emplace(VerifiedTransaction(t));
inserted.first->second = handle;
m_currentByHash[_p.first] = handle;

// Move following transactions from future to current
makeCurrent_WITH_LOCK(t);
m_known.insert(_p.first);
}

先还是检查是否有重复项,然后把交易插入m_current里,并记下插入的位置(迭代器),再分别加入m_currentByAddressAndNoncem_currentByHash中。
注意到末尾还有个makeCurrent_WITH_LOCK()的调用,这个是看情况将m_future里的交易移到m_current中。

交易队列的输出

交易队列输出只有一个,那就是区块block。在Block::sync()中会调用TransactionQueue::topTransactions()来取队列的前N项数据,再加入block中。

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×