我们再来深入了解一下Host类里节点和本节点是怎么交互的,在上一节可以看到节点到了Host类后,会调用Host::connect
来连接对方,我们可以看下connect()
函数实现代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| void Host::connect(std::shared_ptr<Peer> const& _p) { bi::tcp::endpoint ep(_p->endpoint); cnetdetails << "Attempting connection to node " << _p->id << "@" << ep << " from " << id(); auto socket = make_shared<RLPXSocket>(m_ioService); socket->ref().async_connect(ep, [=](boost::system::error_code const& ec) { if (ec) { cnetdetails << "Connection refused to node " << _p->id << "@" << ep << " (" << ec.message() << ")"; _p->m_lastDisconnect = TCPError; } else { cnetdetails << "Connecting to " << _p->id << "@" << ep; auto handshake = make_shared<RLPXHandshake>(this, socket, _p->id); { Guard l(x_connecting); m_connecting.push_back(handshake); }
handshake->start(); } m_pendingPeerConns.erase(nptr); }); }
|
可以看到先是创建了一个socket,然后用async_connect()
异步去连接这个节点,连接成功后生成了一个RLPXHandshake
类,并调用了RLPXHandshake::start()
来开启握手流程,这里并没有连接成功后就传输数据,因为对方可能并不是一个ethereum节点,或者是运行协议不匹配的节点,握手流程就用来过滤掉不合格的节点,只有通过了握手流程才能进行数据交互。
注:在cpp-ethereum项目中底层数据传输用的是boost::asio库,作为准标准库中一员,boost::asio广泛应用在c++跨平台网络开发中,不熟悉的读者建议先去网络上阅读相关文档,后续文档假定读者已经了解了boost::asio库。
#RLPXHandshake类
RLPXHandshake::start()
函数实际调用了RLPXHandshake::transition()
函数,这个函数是RLPXHandshake
类的核心,从中可以看到握手的流程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| void RLPXHandshake::transition(boost::system::error_code _ech) { if (m_nextState == New) { m_nextState = AckAuth; if (m_originated) writeAuth(); else readAuth(); } else if (m_nextState == AckAuth) { m_nextState = WriteHello; if (m_originated) readAck(); else writeAck(); } else if (m_nextState == AckAuthEIP8) { m_nextState = WriteHello; if (m_originated) readAck(); else writeAckEIP8(); } else if (m_nextState == WriteHello) { m_nextState = ReadHello; } else if (m_nextState == ReadHello) { m_nextState = StartSession; } }
|
精简后的流程还是比较清楚的,初始时候m_nextState
值为New
,那么正常的握手状态是New -> AckAuth -> WriteHello -> ReadHello -> StartSession
。如果这些环节中某一步出错了,那么该节点不会走到最后,否则最后的状态会变成StartSession
,那么到了StartSession
状态后会发生什么事呢?我们再看看看这部分代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| else if (m_nextState == ReadHello) { m_nextState = StartSession;
unsigned const handshakeSize = 32; m_handshakeInBuffer.resize(handshakeSize); ba::async_read(m_socket->ref(), boost::asio::buffer(m_handshakeInBuffer, handshakeSize), [this, self](boost::system::error_code ec, std::size_t) { if (ec) transition(ec); else { bytes headerRLP(header.size() - 3 - h128::size); bytesConstRef(&header).cropped(3).copyTo(&headerRLP); m_handshakeInBuffer.resize(frameSize + ((16 - (frameSize % 16)) % 16) + h128::size); ba::async_read(m_socket->ref(), boost::asio::buffer(m_handshakeInBuffer, m_handshakeInBuffer.size()), [this, self, headerRLP](boost::system::error_code ec, std::size_t) { if (ec) transition(ec); else { try { RLP rlp(frame.cropped(1), RLP::ThrowOnFail | RLP::FailIfTooSmall); m_host->startPeerSession(m_remote, rlp, move(m_io), m_socket); } catch (std::exception const& _e) { cnetlog << "Handshake causing an exception: " << _e.what(); m_nextState = Error; transition(); } } }); } }); }
|
当状态从ReadHello
向StartSession
转变时,连续收了两个包,然后调用了Host::startPeerSession()
,节点在RLPXHandshake
类转了一圈以后,如果合格的话又回到了Host
类中,从此开始新的征程。
#Host类
我们之前看到Host
类通过requirePeer()
函数推动了P2P发现模块的运转,但同时它又是整个P2P传输模块中的发动机,因此要研究ethereum网络部分需要从这里开始。
我们在libp2p\Host.h
文件中找到Host
类定义,其中有两个成员变量,熟悉boost::asio库的读者一定不陌生:
1 2
| ba::io_service m_ioService; bi::tcp::acceptor m_tcp4Acceptor;
|
其中m_ioService
就是Host类的核心了,它负责处理异步任务,当异步任务完成后调用完成句柄。
m_tcp4Acceptor
是负责接收连接的对象,它内部封装了一个socket对象。我们都知道服务端的socket需要经过创建,绑定IP端口,侦听,Accept这几个阶段,对于m_tcp4Acceptor
而言也是这样:
直接在Host
类初始化列表中进行创建
这部分是在Network::tcp4Listen()
函数中完成的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| for (unsigned i = 0; i < 2; ++i) { bi::tcp::endpoint endpoint(listenIP, requirePort ? _netPrefs.listenPort : (i ? 0 : c_defaultListenPort)); try { _acceptor.open(endpoint.protocol()); _acceptor.set_option(ba::socket_base::reuse_address(reuse)); _acceptor.bind(endpoint); _acceptor.listen(); return _acceptor.local_endpoint().port(); } catch (...) { if (i || requirePort) { cwarn << "Couldn't start accepting connections on host. Failed to accept socket on " << listenIP << ":" << _netPrefs.listenPort << ".\n" << boost::current_exception_diagnostic_information(); _acceptor.close(); return -1; } _acceptor.close(); continue; } }
|
注意到这里有一个循环,是用来防止端口被占用的。如果第一次端口被占用,则第二次使用0端口,也就是随机端口。
在这个函数里,_acceptor
依次完成了设置协议,设置端口重用,绑定端口和侦听。
又回到了Host
类,在Host::runAcceptor()
函数中,我们能找到以下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| auto socket = make_shared<RLPXSocket>(m_ioService); m_tcp4Acceptor.async_accept(socket->ref(), [=](boost::system::error_code ec) { try { auto handshake = make_shared<RLPXHandshake>(this, socket); m_connecting.push_back(handshake); handshake->start(); success = true; } catch (Exception const& _e) { cwarn << "ERROR: " << diagnostic_information(_e); } catch (std::exception const& _e) { cwarn << "ERROR: " << _e.what(); }
if (!success) socket->ref().close(); runAcceptor(); });
|
m_tcp4Acceptor
通过async_accept()
异步接收连接,当一个连接到来的时候发生了什么?我们又看到了熟悉的代码,是的!创建了一个RLPXHandshake类,又开始了握手流程。ethereum对于接收到的连接也是谨慎的,同样需要先进行校验,这里的握手流程与前面connect
时的流程稍有不同,区别就在RLPXHandshake::m_originated
上,connect
时的m_originated
值为true,也就是先向对方发送自己的Auth包,而被动接收时m_originated
为false,会等待对方发过来Auth包。
最后别忘了启动Host::m_ioService
,这部分被放在doWork()
函数里,还记得doWork()
函数吗?因为Host
类是从Worker
类继承而来,doWork()
会在一个循环中被调用。
1 2 3 4 5 6 7 8 9 10 11 12
| void Host::doWork() { try { if (m_run) m_ioService.run(); } catch (std::exception const& _e) { } }
|
但是doWork()
不是会被循环调用的吗?难道m_ioService.run()
也会重复调用吗?答案是不会,因为m_ioService.run()
会阻塞在这里,所以只会执行一次。
至此m_tcp4Acceptor
能够愉快地接收到TCP连接,并把连接交给RLPXHandshake
类去处理了。