本文讲介绍一个与 Boost.Asio 的示例代码中的聊天服务器功能类似的网络服务程序,包括客户端 与服务端的 muduo 实现。这个例子的主要目的是介绍如何处理分包,并初步涉及 Muduo 的多线程功能 。Muduo 的下载地址: http://muduo.googlecode.com/files/muduo-0.1.7-alpha.tar.gz ,SHA1 873567e43b3c2cae592101ea809b30ba730f2ee6,本文的完整代码可在线阅读
http://code.google.com/p/muduo/source/browse/trunk/examples/asio/chat/ 。
TCP 分包
前面一篇《五个简单 TCP 协议》中处理的协议没有涉及分包,在 TCP 这种字节流协议上做应用层 分包是网络编程的基本需求。分包指的是在发生一个消息(message)或一帧(frame)数据时,通过一定的 处理,让接收方能从字节流中识别并截取(还原)出一个个消息。“粘包问题”是个伪问题。
对于短连接的 TCP 服务,分包不是一个问题,只要发送方主动关闭连接,就表示一条消息发送完毕 ,接收方 read() 返回 0,从而知道消息的结尾。例如前一篇文章里的 daytime 和 time 协议。
对于长连接的 TCP 服务,分包有四种方法:
1. 消息长度固定,比如 muduo 的 roundtrip 示例就采用了固定的 16 字节消息;
2. 使用特殊的字符或字符串作为消息的边界,例如 HTTP 协议的 headers 以 "rn" 为字段的分隔 符;
3. 在每条消息的头部加一个长度字段,这恐怕是最常见的做法,本文的聊天协议也采用这一办法;
4. 利用消息本身的格式来分包,例如 XML 格式的消息中 <root>...</root> 的配对 ,或者 JSON 格式中的 { ... } 的配对。解析这种消息格式通常会用到状态机。
在后文的代码讲解中还会仔细讨论用长度字段分包的常见陷阱。
聊天服务
本文实现的聊天服务非常简单,由服务端程序和客户端程序组成,协议如下:
* 服务端程序中某个端口侦听 (listen) 新的连接;
* 客户端向服务端发起连接;
* 连接建立之后,客户端随时准备接收服务端的消息并在屏幕上显示出来;
* 客户端接受键盘输入,以回车为界,把消息发送给服务端;
* 服务端接收到消息之后,依次发送给每个连接到它的客户端;原来发送消息的客户端进程也会收 到这条消息;
* 一个服务端进程可以同时服务多个客户端进程,当有消息到达服务端后,每个客户端进程都会收 到同一条消息,服务端广播发送消息的顺序是任意的,不一定哪个客户端会先收到这条消息。
* (可选)如果消息 A 先于消息 B 到达服务端,那么每个客户端都会先收到 A 再收到 B。
这实际上是一个简单的基于 TCP 的应用层广播协议,由服务端负责把消息发送给每个连接到它的客 户端。参与“聊天”的既可以是人,也可以是程序。在以后的文章中,我将介绍一个稍微复杂的一点的 例子 hub,它有“聊天室”的功能,客户端可以注册特定的 topic(s),并往某个 topic 发送消息,这 样代码更有意思。
消息格式
本聊天服务的消息格式非常简单,“消息”本身是一个字符串,每条消息的有一个 4 字节的头部, 以网络序存放字符串的长度。消息之间没有间隙,字符串也不一定以 '' 结尾。比方说有两条消息 "hello" 和 "chenshuo",那么打包后的字节流是:
0x00, 0x00, 0x00, 0x05, 'h', 'e', 'l', 'l', 'o', 0x00, 0x00, 0x00, 0x08, 'c', 'h', 'e', 'n', 's', 'h', 'u', 'o'
共 21 字节。
打包的代码
这段代码把 const string& message 打包为 muduo::net::Buffer,并通过 conn 发送。
1: void send(muduo::net::TcpConnection* conn, const string& message) 2: { 3: muduo::net::Buffer buf; 4: buf.append(message.data(), message.size()); 5: int32_t len = muduo::net::sockets::hostToNetwork32(static_cast<int32_t>(message.size())); 6: buf.prepend(&len, sizeof len); 7: conn->send(&buf); 8: }
muduo::Buffer 有一个很好的功能,它在头部预留了 8 个字节的空间,这样第 6 行的 prepend() 操作就不需要移动已有的数据,效率较高。
分包的代码
解析数据往往比生成数据复杂,分包打包也不例外。
1: void onMessage(const muduo::net::TcpConnectionPtr& conn, 2: muduo::net::Buffer* buf, 3: muduo::Timestamp receiveTime) 4: { 5: while (buf->readableBytes() >= kHeaderLen) 6: { 7: const void* data = buf->peek(); 8: int32_t tmp = *static_cast<const int32_t*>(data); 9: int32_t len = muduo::net::sockets::networkToHost32(tmp); 10: if (len > 65536 || len < 0) 11: { 12: LOG_ERROR << "Invalid length " << len; 13: conn->shutdown(); 14: } 15: else if (buf->readableBytes() >= len + kHeaderLen) 16: { 17: buf->retrieve(kHeaderLen); 18: muduo::string message(buf->peek(), len); 19: buf->retrieve(len); 20: messageCallback_(conn, message, receiveTime); // 收到完整的消息,通知用户 21: } 22: else 23: { 24: break; 25: } 26: } 27: }
上面这段代码第 7 行用了 while 循环来反复读取数据,直到 Buffer 中的数据不 够一条完整的消息。请读者思考,如果换成 if (buf->readableBytes() >= kHeaderLen) 会有 什么后果。
以前面提到的两条消息的字节流为例:
0x00, 0x00, 0x00, 0x05, 'h', 'e', 'l', 'l', 'o', 0x00, 0x00, 0x00, 0x08, 'c', 'h', 'e', 'n', 's', 'h', 'u', 'o'
假设数据最终都全部到达,onMessage() 至少要能正确处理以下各种数据到达的次序,每种情况下 messageCallback_ 都应该被调用两次:
1. 每次收到一个字节的数据,onMessage() 被调用 21 次;
2. 数据分两次到达,第一次收到 2 个字节,不足消息的长度字段;
3. 数据分两 次到达,第一次收到 4 个字节,刚好够长度字段,但是没有 body;
4. 数据分两次到达,第一 次收到 8 个字节,长度完整,但 body 不完整;
5. 数据分两次到达,第一次收到 9 个字节, 长度完整,body 也完整;
6. 数据分两次到达,第一次收到 10 个字节,第一条消息的长度完 整、body 也完整,第二条消息长度不完整;
7. 请自行移动分割点,验证各种情况;
8. 数据一次就全部到达,这时必须用 while 循环来读出两条消息,否则消息会堆积。
请读者验证 onMessage() 是否做到了以上几点。这个例子充分说明了 non-blocking read 必须和 input buffer 一起使用。
编解码器 LengthHeaderCodec
有人评论 Muduo 的接收缓冲区不能设置回调 函数的触发条件,确实如此。每当 socket 可读,Muduo 的 TcpConnection 会读取数据并存入 Input Buffer,然后回调用户的函数。不过,一个简单的间接层就能解决问题,让用户代码只关心“消 息到达”而不是“数据到达”,如本例中的 LengthHeaderCodec 所展示的那一样。
1: #ifndef MUDUO_EXAMPLES_ASIO_CHAT_CODEC_H 2: #define MUDUO_EXAMPLES_ASIO_CHAT_CODEC_H 3: 4: #include <muduo/base/Logging.h> 5: #include <muduo/net/Buffer.h> 6: #include <muduo/net/SocketsOps.h> 7: #include <muduo/net/TcpConnection.h> 8: 9: #include <boost/function.hpp> 10: #include <boost/noncopyable.hpp> 11: 12: using muduo::Logger; 13: 14: class LengthHeaderCodec : boost::noncopyable 15: { 16: public: 17: typedef boost::function<void (const muduo::net::TcpConnectionPtr&, 18: const muduo::string& message, 19: muduo::Timestamp)> StringMessageCallback; 20: 21: explicit LengthHeaderCodec(const StringMessageCallback& cb) 22: : messageCallback_(cb) 23: { 24: } 25: 26: void onMessage(const muduo::net::TcpConnectionPtr& conn, 27: muduo::net::Buffer* buf, 28: muduo::Timestamp receiveTime) 29: { 同上 } 30: 31: void send(muduo::net::TcpConnection* conn, const muduo::string& message) 32: { 同上 } 33: 34: private: 35: StringMessageCallback messageCallback_; 36: const static size_t kHeaderLen = sizeof(int32_t); 37: }; 38: 39: #endif // MUDUO_EXAMPLES_ASIO_CHAT_CODEC_H
这段代码把以 Buffer* 为参数的 MessageCallback 转换成了以 const string& 为参数的 StringMessageCallback,让用户代码不必关心分包操作。客户端和服务端都能从中受益。
服务 端的实现
聊天服务器的服务端代码小于 100 行,不到 asio 的一半。
请先阅读第 68 行起的数据成员的定义。除了经常见到的 EventLoop 和 TcpServer,ChatServer 还定义了 codec_ 和 std::set<TcpConnectionPtr> connections_ 作为成员,connections_ 是目前已建立的客户连 接,在收到消息之后,服务器会遍历整个容器,把消息广播给其中每一个 TCP 连接。
首先,在 构造函数里注册回调:
1: #include "codec.h" 2: 3: #include <muduo/base/Logging.h> 4: #include <muduo/base/Mutex.h> 5: #include <muduo/net/EventLoop.h> 6: #include <muduo/net/SocketsOps.h> 7: #include <muduo/net/TcpServer.h> 8: 9: #include <boost/bind.hpp> 10: 11: #include <set> 12: #include <stdio.h> 13: 14: using namespace muduo; 15: using namespace muduo::net; 16: 17: class ChatServer : boost::noncopyable 18: { 19: public: 20: ChatServer(EventLoop* loop, 21: const InetAddress& listenAddr) 22: : loop_(loop), 23: server_(loop, listenAddr, "ChatServer"), 24: codec_(boost::bind(&ChatServer::onStringMessage, this, _1, _2, _3)) 25: { 26: server_.setConnectionCallback( 27: boost::bind(&ChatServer::onConnection, this, _1)); 28: server_.setMessageCallback( 29: boost::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3)); 30: } 31: 32: void start() 33: { 34: server_.start(); 35: } 36:
这里有几点值得注意,在以往的代码里是直接把本 class 的 onMessage() 注册给 server_;这里我们把 LengthHeaderCodec::onMessage() 注册给 server_,然后向 codec_ 注册了 ChatServer::onStringMessage(),等于说让 codec_ 负责解析消息,然后把完整的消息回调给 ChatServer。这正是我前面提到的“一个简单的间接层”,在不增加 Muduo 库的复杂度的 前提下,提供了足够的灵活性让我们在用户代码里完成需要的工作。
另外,server_.start() 绝对不能在构造函数里调用,这么做将来会有线程安全的问题,见我在《当析构函数遇到多线程 ── C++ 中线程安全的对象回调》一文中的论述。
以下是处理连接的建立和断开的代码,注意它把 新建的连接加入到 connections_ 容器中,把已断开的连接从容器中删除。这么做是为了避免内存和资 源泄漏,TcpConnectionPtr 是 boost::shared_ptr<TcpConnection>,是 muduo 里唯一一个默 认采用 shared_ptr 来管理生命期的对象。以后我们会谈到这么做的原因。
37: private: 38: void onConnection(const TcpConnectionPtr& conn) 39: { 40: LOG_INFO << conn->localAddress().toHostPort() << " -> " 41: << conn->peerAddress().toHostPort() << " is " 42: << (conn->connected() ? "UP" : "DOWN"); 43: 44: MutexLockGuard lock(mutex_); 45: if (conn->connected()) 46: { 47: connections_.insert(conn); 48: } 49: else 50: { 51: connections_.erase(conn); 52: } 53: } 54:
以下是服务端处理消息的代码,它遍历整个 connections_ 容器,把消息打包发送给 各个客户连接。
55: void onStringMessage(const TcpConnectionPtr&, 56: const string& message, 57: Timestamp) 58: { 59: MutexLockGuard lock(mutex_); 60: for (ConnectionList::iterator it = connections_.begin(); 61: it != connections_.end(); 62: ++it) 63: { 64: codec_.send(get_pointer(*it), message); 65: } 66: } 67:
数据成员:
68: typedef std::set<TcpConnectionPtr> ConnectionList; 69: EventLoop* loop_; 70: TcpServer server_; 71: LengthHeaderCodec codec_; 72: MutexLock mutex_; 73: ConnectionList connections_; 74: }; 75:
main() 函数里边是例行公事的代码:
76: int main(int argc, char* argv[]) 77: { 78: LOG_INFO << "pid = " << getpid(); 79: if (argc > 1) 80: { 81: EventLoop loop; 82: uint16_t port = static_cast<uint16_t>(atoi(argv[1])); 83: InetAddress serverAddr(port); 84: ChatServer server(&loop, serverAddr); 85: server.start(); 86: loop.loop(); 87: } 88: else 89: { 90: printf("Usage: %s portn", argv[0]); 91: } 92: }
如果你读过 asio 的对应代码,会不会觉得 Reactor 往往比 Proactor 容易使用?
客户端的实现
我有时觉得服务端的程序常常比客户端的更容易写,聊天服务器再次验证 了我的看法。客户端的复杂性来自于它要读取键盘输入,而 EventLoop 是独占线程的,所以我用了两 个线程,main() 函数所在的线程负责读键盘,另外用一个 EventLoopThread 来处理网络 IO。我暂时 没有把标准输入输出融入 Reactor 的想法,因为服务器程序的 stdin 和 stdout 往往是重定向了的。
来看代码,首先,在构造函数里注册回调,并使用了跟前面一样的 LengthHeaderCodec 作为中 间层,负责打包分包。
1: #include "codec.h" 2: 3: #include <muduo/base/Logging.h> 4: #include <muduo/base/Mutex.h> 5: #include <muduo/net/EventLoopThread.h> 6: #include <muduo/net/TcpClient.h> 7: 8: #include <boost/bind.hpp> 9: #include <boost/noncopyable.hpp> 10: 11: #include <iostream> 12: #include <stdio.h> 13: 14: using namespace muduo; 15: using namespace muduo::net; 16: 17: class ChatClient : boost::noncopyable 18: { 19: public: 20: ChatClient(EventLoop* loop, const InetAddress& listenAddr) 21: : loop_(loop), 22: client_(loop, listenAddr, "ChatClient"), 23: codec_(boost::bind(&ChatClient::onStringMessage, this, _1, _2, _3)) 24: { 25: client_.setConnectionCallback( 26: boost::bind(&ChatClient::onConnection, this, _1)); 27: client_.setMessageCallback( 28: boost::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3)); 29: client_.enableRetry(); 30: } 31: 32: void connect() 33: { 34: client_.connect(); 35: } 36:
disconnect() 目前为空,客户端的连接由操作系统在进程终止时关闭。
37: void disconnect() 38: { 39: // client_.disconnect(); 40: } 41:
write() 会由 main 线程调用,所以要加锁,这个锁不是为了保护 TcpConnection, 而是保护 shared_ptr。
42: void write(const string& message) 43: { 44: MutexLockGuard lock(mutex_); 45: if (connection_) 46: { 47: codec_.send(get_pointer(connection_), message); 48: } 49: } 50:
onConnection() 会由 EventLoop 线程调用,所以要加锁以保护 shared_ptr。
51: private: 52: void onConnection(const TcpConnectionPtr& conn) 53: { 54: LOG_INFO << conn->localAddress().toHostPort() << " -> " 55: << conn->peerAddress().toHostPort() << " is " 56: << (conn->connected() ? "UP" : "DOWN"); 57: 58: MutexLockGuard lock(mutex_); 59: if (conn->connected()) 60: { 61: connection_ = conn; 62: } 63: else 64: { 65: connection_.reset(); 66: } 67: } 68: