diff --git a/Acceptor.cc b/Acceptor.cc index c5114ee..f5b5b34 100644 --- a/Acceptor.cc +++ b/Acceptor.cc @@ -26,7 +26,6 @@ Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reusepor acceptSocket_.setReuseAddr(true); acceptSocket_.setReusePort(true); acceptSocket_.bindAddress(listenAddr); - // TcpServer::start() => Acceptor.listen() 如果有新用户连接 要执行一个回调(accept => connfd => 打包成Channel => 唤醒subloop) // baseloop监听到有事件发生 => acceptChannel_(listenfd) => 执行该回调函数 acceptChannel_.setReadCallback( diff --git a/Buffer.cc b/Buffer.cc index 890ca21..55f487d 100644 --- a/Buffer.cc +++ b/Buffer.cc @@ -33,13 +33,13 @@ ssize_t Buffer::readFd(int fd, int *saveErrno) vec[0].iov_len = writable; // 第二块缓冲区,指向栈空间 vec[1].iov_base = extrabuf; - vec[1].iov_len = sizeof extrabuf; + vec[1].iov_len = sizeof(extrabuf); // when there is enough space in this buffer, don't read into extrabuf. // when extrabuf is used, we read 128k-1 bytes at most. // 这里之所以说最多128k-1字节,是因为若writable为64k-1,那么需要两个缓冲区 第一个64k-1 第二个64k 所以做多128k-1 // 如果第一个缓冲区>=64k 那就只采用一个缓冲区 而不使用栈空间extrabuf[65536]的内容 - const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1; + const int iovcnt = (writable < sizeof(extrabuf)) ? 2 : 1; const ssize_t n = ::readv(fd, vec, iovcnt); if (n < 0) @@ -55,6 +55,7 @@ ssize_t Buffer::readFd(int fd, int *saveErrno) writerIndex_ = buffer_.size(); append(extrabuf, n - writable); // 对buffer_扩容 并将extrabuf存储的另一部分数据追加至buffer_ } + return n; } // inputBuffer_.readFd表示将对端数据读到inputBuffer_中,移动writerIndex_指针 diff --git a/Buffer.h b/Buffer.h index 88fb571..367dd16 100644 --- a/Buffer.h +++ b/Buffer.h @@ -64,7 +64,7 @@ class Buffer void append(const char *data, size_t len) { ensureWritableBytes(len); - std::copy(data, data + len, beginWrite()); + std::copy(data, data+len, beginWrite()); writerIndex_ += len; } char *beginWrite() { return begin() + writerIndex_; } @@ -83,7 +83,7 @@ class Buffer void makeSpace(size_t len) { /** - * | kCheapPrepend |xxx|reader | writer | // xxx标示reader中已读的部分 + * | kCheapPrepend |xxx|reader | writer | // xxx标示reader中已读的部分 * | kCheapPrepend | len | **/ if (writableBytes() + prependableBytes() < len + kCheapPrepend) // 也就是说 len > xxx + writer的部分 @@ -92,8 +92,9 @@ class Buffer } else // 这里说明 len <= xxx + writer 把reader搬到从xxx开始 使得xxx后面是一段连续空间 { - size_t readable = readableBytes(); // readable = reader的长度 - std::copy(begin() + readerIndex_, begin() + writerIndex_, // 把这一部分数据拷贝到begin+kCheapPrepend起始处 + size_t readable = readableBytes(); // readable = reader的长度 + std::copy(begin() + readerIndex_, + begin() + writerIndex_, // 把这一部分数据拷贝到begin+kCheapPrepend起始处 begin() + kCheapPrepend); readerIndex_ = kCheapPrepend; writerIndex_ = readerIndex_ + readable; diff --git a/Channel.cc b/Channel.cc index 08ff156..d64a5cd 100644 --- a/Channel.cc +++ b/Channel.cc @@ -19,7 +19,9 @@ Channel::Channel(EventLoop *loop, int fd) { } -Channel::~Channel() {} +Channel::~Channel() +{ +} // channel的tie方法什么时候调用过? TcpConnection => channel /** @@ -80,7 +82,10 @@ void Channel::handleEventWithGuard(Timestamp receiveTime) // 错误 if (revents_ & EPOLLERR) { - errorCallback_(); + if (errorCallback_) + { + errorCallback_(); + } } // 读 if (revents_ & (EPOLLIN | EPOLLPRI)) diff --git a/EPollPoller.cc b/EPollPoller.cc index 92bbc1f..26217fe 100644 --- a/EPollPoller.cc +++ b/EPollPoller.cc @@ -12,7 +12,7 @@ const int kDeleted = 2; // 某个channel已经从Poller删除 EPollPoller::EPollPoller(EventLoop *loop) : Poller(loop) - , epollfd_(epoll_create1(EPOLL_CLOEXEC)) + , epollfd_(::epoll_create1(EPOLL_CLOEXEC)) , events_(kInitEventListSize) // vector(16) { if (epollfd_ < 0) @@ -72,7 +72,7 @@ void EPollPoller::updateChannel(Channel *channel) int fd = channel->fd(); channels_[fd] = channel; } - else // index == kDeleted + else // index == kAdd { } channel->set_index(kAdded); @@ -124,13 +124,13 @@ void EPollPoller::fillActiveChannels(int numEvents, ChannelList *activeChannels) void EPollPoller::update(int operation, Channel *channel) { epoll_event event; - ::memset(&event, 0, sizeof event); + ::memset(&event, 0, sizeof(event)); int fd = channel->fd(); event.events = channel->events(); - event.data.ptr = channel; event.data.fd = fd; + event.data.ptr = channel; if (::epoll_ctl(epollfd_, operation, fd, &event) < 0) { diff --git a/EventLoop.cc b/EventLoop.cc index ff9cc26..9ef9c29 100644 --- a/EventLoop.cc +++ b/EventLoop.cc @@ -42,7 +42,8 @@ int createEventfd() } EventLoop::EventLoop() - : looping_(false), quit_(false) + : looping_(false) + , quit_(false) , callingPendingFunctors_(false) , threadId_(CurrentThread::tid()) , poller_(Poller::newDefaultPoller(this)) @@ -98,6 +99,7 @@ void EventLoop::loop() doPendingFunctors(); } LOG_INFO("EventLoop %p stop looping.\n", this); + looping_ = false; } /** @@ -155,10 +157,10 @@ void EventLoop::queueInLoop(Functor cb) void EventLoop::handleRead() { uint64_t one = 1; - ssize_t n = read(wakeupFd_, &one, sizeof one); - if (n != sizeof one) + ssize_t n = read(wakeupFd_, &one, sizeof(one)); + if (n != sizeof(one)) { - LOG_ERROR("EventLoop::handleRead() reads %lu bytes instead of 8", n); + LOG_ERROR("EventLoop::handleRead() reads %lu bytes instead of 8\n", n); } } @@ -166,8 +168,8 @@ void EventLoop::handleRead() void EventLoop::wakeup() { uint64_t one = 1; - ssize_t n = write(wakeupFd_, &one, sizeof one); - if (n != sizeof one) + ssize_t n = write(wakeupFd_, &one, sizeof(one)); + if (n != sizeof(one)) { LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8\n", n); } diff --git a/EventLoop.h b/EventLoop.h index a955111..e700d0d 100644 --- a/EventLoop.h +++ b/EventLoop.h @@ -44,6 +44,7 @@ class EventLoop : noncopyable // 判断EventLoop对象是否在自己的线程里 bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); } // threadId_为EventLoop创建时的线程id CurrentThread::tid()为当前线程id + private: void handleRead(); // 给eventfd返回的文件描述符wakeupFd_绑定的事件回调 当wakeup()时 即有事件发生时 调用handleRead()读wakeupFd_的8字节 同时唤醒阻塞的epoll_wait void doPendingFunctors(); // 执行上层回调 diff --git a/InetAddress.cc b/InetAddress.cc index 48efb82..1e2b860 100644 --- a/InetAddress.cc +++ b/InetAddress.cc @@ -5,10 +5,10 @@ InetAddress::InetAddress(uint16_t port, std::string ip) { - bzero(&addr_, sizeof addr_); + ::memset(&addr_, 0, sizeof(addr_)); addr_.sin_family = AF_INET; - addr_.sin_port = htons(port); // 本地字节序转为网络字节序 - addr_.sin_addr.s_addr = inet_addr(ip.c_str()); + addr_.sin_port = ::htons(port); // 本地字节序转为网络字节序 + addr_.sin_addr.s_addr = ::inet_addr(ip.c_str()); } std::string InetAddress::toIp() const @@ -24,15 +24,15 @@ std::string InetAddress::toIpPort() const // ip:port char buf[64] = {0}; ::inet_ntop(AF_INET, &addr_.sin_addr, buf, sizeof buf); - size_t end = strlen(buf); - uint16_t port = ntohs(addr_.sin_port); - sprintf(buf + end, ":%u", port); + size_t end = ::strlen(buf); + uint16_t port = ::ntohs(addr_.sin_port); + sprintf(buf+end, ":%u", port); return buf; } uint16_t InetAddress::toPort() const { - return ntohs(addr_.sin_port); + return ::ntohs(addr_.sin_port); } #if 0 diff --git a/Logger.cc b/Logger.cc index 32e642c..abf2cea 100644 --- a/Logger.cc +++ b/Logger.cc @@ -25,7 +25,7 @@ void Logger::log(std::string msg) std::cout << "[INFO]"; break; case ERROR: - std::cout << "[ERROR]"; + std::cout << "[ERRIR]"; break; case FATAL: std::cout << "[FATAL]"; diff --git a/README.md b/README.md index 6f20fb1..00ce040 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ * gcc version 5.4.0 * cmake version 3.5.1 -项目编译执行`./build.sh`即可 +项目编译执行`./build.sh`即可,测试用例进入`example/`文件夹,`make`即可生成服务器测试用例 ## 功能介绍 diff --git a/Socket.cc b/Socket.cc index bfd859e..cee6df3 100644 --- a/Socket.cc +++ b/Socket.cc @@ -3,6 +3,7 @@ #include #include #include +#include #include "Socket.h" #include "Logger.h" @@ -33,7 +34,7 @@ int Socket::accept(InetAddress *peeraddr) { sockaddr_in addr; socklen_t len; - ::memset(&addr, 0, sizeof addr); + ::memset(&addr, 0, sizeof(addr)); int connfd = ::accept(sockfd_, (sockaddr *)&addr, &len); if (connfd >= 0) { @@ -53,20 +54,20 @@ void Socket::shutdownWrite() void Socket::setTcpNoDelay(bool on) { int optval = on ? 1 : 0; - ::setsockopt(sockfd_, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof optval); // TCP_NODELAY包含头文件 + ::setsockopt(sockfd_, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval)); // TCP_NODELAY包含头文件 } void Socket::setReuseAddr(bool on) { int optval = on ? 1 : 0; - ::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval); // TCP_NODELAY包含头文件 + ::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); // TCP_NODELAY包含头文件 } void Socket::setReusePort(bool on) { int optval = on ? 1 : 0; - ::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof optval); // TCP_NODELAY包含头文件 + ::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval)); // TCP_NODELAY包含头文件 } void Socket::setKeepAlive(bool on) { int optval = on ? 1 : 0; - ::setsockopt(sockfd_, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof optval); // TCP_NODELAY包含头文件 + ::setsockopt(sockfd_, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval)); // TCP_NODELAY包含头文件 } \ No newline at end of file diff --git a/TcpServer.cc b/TcpServer.cc index 3d7fbc8..32e7ba5 100644 --- a/TcpServer.cc +++ b/TcpServer.cc @@ -26,6 +26,7 @@ TcpServer::TcpServer(EventLoop *loop, , connectionCallback_() , messageCallback_() , nextConnId_(1) + , started_(0) { // 当有新用户连接时,Acceptor类中绑定的acceptChannel_会有读事件发生,执行handleRead()调用TcpServer::newConnection回调 acceptor_->setNewConnectionCallback( @@ -38,6 +39,7 @@ TcpServer::~TcpServer() { TcpConnectionPtr conn(item.second); item.second.reset(); // 把原始的智能指针复位 让栈空间的TcpConnectionPtr conn指向该对象 当conn出了其作用域 即可释放智能指针指向的对象 + // 销毁连接 conn->getLoop()->runInLoop( std::bind(&TcpConnection::connectDestroyed, conn)); } @@ -74,8 +76,8 @@ void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr) // 通过sockfd获取其绑定的本机的ip地址和端口信息 sockaddr_in local; - ::memset(&local, 0, sizeof local); - socklen_t addrlen = sizeof local; + ::memset(&local, 0, sizeof(local)); + socklen_t addrlen = sizeof(local); if(::getsockname(sockfd, (sockaddr *)&local, &addrlen) < 0) { LOG_ERROR("sockets::getLocalAddr"); @@ -97,12 +99,14 @@ void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr) conn->setCloseCallback( std::bind(&TcpServer::removeConnection, this, std::placeholders::_1)); - ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn)); + ioLoop->runInLoop( + std::bind(&TcpConnection::connectEstablished, conn)); } void TcpServer::removeConnection(const TcpConnectionPtr &conn) { - loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn)); + loop_->runInLoop( + std::bind(&TcpServer::removeConnectionInLoop, this, conn)); } void TcpServer::removeConnectionInLoop(const TcpConnectionPtr &conn) diff --git a/Thread.cc b/Thread.cc index 8a972ad..bd4c325 100644 --- a/Thread.cc +++ b/Thread.cc @@ -29,7 +29,7 @@ void Thread::start() // sem_t sem; sem_init(&sem, false, 0); // false指的是 不设置进程间共享 // 开启线程 - thread_ = std::shared_ptr(new std::thread([&]()->void { + thread_ = std::shared_ptr(new std::thread([&]() { tid_ = CurrentThread::tid(); // 获取线程的tid值 sem_post(&sem); func_(); // 开启一个新线程 专门执行该线程函数 diff --git a/example/Makefile b/example/Makefile new file mode 100644 index 0000000..ec8fa75 --- /dev/null +++ b/example/Makefile @@ -0,0 +1,5 @@ +testserver : + g++ -g -o testserver testserver.cc -lmymuduo -lpthread -std=c++11 + +clean : + rm -f testserver \ No newline at end of file diff --git a/example/testserver.cc b/example/testserver.cc new file mode 100644 index 0000000..b0d6b1e --- /dev/null +++ b/example/testserver.cc @@ -0,0 +1,61 @@ +#include + +#include +#include + +class EchoServer +{ +public: + EchoServer(EventLoop *loop, const InetAddress &addr, const std::string &name) + : server_(loop, addr, name) + , loop_(loop) + { + // 注册回调函数 + server_.setConnectionCallback( + std::bind(&EchoServer::onConnection, this, std::placeholders::_1)); + + server_.setMessageCallback( + std::bind(&EchoServer::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); + + // 设置合适的subloop线程数量 + server_.setThreadNum(3); + } + void start() + { + server_.start(); + } + +private: + // 连接建立或断开的回调函数 + void onConnection(const TcpConnectionPtr &conn) + { + if (conn->connected()) + { + LOG_INFO("Connection UP : %s", conn->peerAddress().toIpPort().c_str()); + } + else + { + LOG_INFO("Connection DOWN : %s", conn->peerAddress().toIpPort().c_str()); + } + } + + // 可读写事件回调 + void onMessage(const TcpConnectionPtr &conn, Buffer *buf, Timestamp time) + { + std::string msg = buf->retrieveAllAsString(); + conn->send(msg); + // conn->shutdown(); // 关闭写端 底层响应EPOLLHUP => 执行closeCallback_ + } + + EventLoop *loop_; + TcpServer server_; +}; + +int main() { + EventLoop loop; + InetAddress addr(8002); + EchoServer server(&loop, addr, "EchoServer"); + server.start(); + loop.loop(); + return 0; +} \ No newline at end of file