Skip to content

Commit

Permalink
fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
S1mpleBug committed Mar 2, 2022
1 parent 9f3ae3c commit ad83bc3
Show file tree
Hide file tree
Showing 15 changed files with 118 additions and 38 deletions.
1 change: 0 additions & 1 deletion Acceptor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reusepor
acceptSocket_.setReuseAddr(true);
acceptSocket_.setReusePort(true);
acceptSocket_.bindAddress(listenAddr);

// TcpServer::start() => Acceptor.listen() 如果有新用户连接 要执行一个回调(accept => connfd => 打包成Channel => 唤醒subloop)
// baseloop监听到有事件发生 => acceptChannel_(listenfd) => 执行该回调函数
acceptChannel_.setReadCallback(
Expand Down
5 changes: 3 additions & 2 deletions Buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ ssize_t Buffer::readFd(int fd, int *saveErrno)
vec[0].iov_len = writable;
// 第二块缓冲区,指向栈空间
vec[1].iov_base = extrabuf;
vec[1].iov_len = sizeof extrabuf;
vec[1].iov_len = sizeof(extrabuf);

// when there is enough space in this buffer, don't read into extrabuf.
// when extrabuf is used, we read 128k-1 bytes at most.
// 这里之所以说最多128k-1字节,是因为若writable为64k-1,那么需要两个缓冲区 第一个64k-1 第二个64k 所以做多128k-1
// 如果第一个缓冲区>=64k 那就只采用一个缓冲区 而不使用栈空间extrabuf[65536]的内容
const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;
const int iovcnt = (writable < sizeof(extrabuf)) ? 2 : 1;
const ssize_t n = ::readv(fd, vec, iovcnt);

if (n < 0)
Expand All @@ -55,6 +55,7 @@ ssize_t Buffer::readFd(int fd, int *saveErrno)
writerIndex_ = buffer_.size();
append(extrabuf, n - writable); // 对buffer_扩容 并将extrabuf存储的另一部分数据追加至buffer_
}
return n;
}

// inputBuffer_.readFd表示将对端数据读到inputBuffer_中,移动writerIndex_指针
Expand Down
9 changes: 5 additions & 4 deletions Buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class Buffer
void append(const char *data, size_t len)
{
ensureWritableBytes(len);
std::copy(data, data + len, beginWrite());
std::copy(data, data+len, beginWrite());
writerIndex_ += len;
}
char *beginWrite() { return begin() + writerIndex_; }
Expand All @@ -83,7 +83,7 @@ class Buffer
void makeSpace(size_t len)
{
/**
* | kCheapPrepend |xxx|reader | writer | // xxx标示reader中已读的部分
* | kCheapPrepend |xxx|reader | writer | // xxx标示reader中已读的部分
* | kCheapPrepend | len |
**/
if (writableBytes() + prependableBytes() < len + kCheapPrepend) // 也就是说 len > xxx + writer的部分
Expand All @@ -92,8 +92,9 @@ class Buffer
}
else // 这里说明 len <= xxx + writer 把reader搬到从xxx开始 使得xxx后面是一段连续空间
{
size_t readable = readableBytes(); // readable = reader的长度
std::copy(begin() + readerIndex_, begin() + writerIndex_, // 把这一部分数据拷贝到begin+kCheapPrepend起始处
size_t readable = readableBytes(); // readable = reader的长度
std::copy(begin() + readerIndex_,
begin() + writerIndex_, // 把这一部分数据拷贝到begin+kCheapPrepend起始处
begin() + kCheapPrepend);
readerIndex_ = kCheapPrepend;
writerIndex_ = readerIndex_ + readable;
Expand Down
9 changes: 7 additions & 2 deletions Channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ Channel::Channel(EventLoop *loop, int fd)
{
}

Channel::~Channel() {}
Channel::~Channel()
{
}

// channel的tie方法什么时候调用过? TcpConnection => channel
/**
Expand Down Expand Up @@ -80,7 +82,10 @@ void Channel::handleEventWithGuard(Timestamp receiveTime)
// 错误
if (revents_ & EPOLLERR)
{
errorCallback_();
if (errorCallback_)
{
errorCallback_();
}
}
//
if (revents_ & (EPOLLIN | EPOLLPRI))
Expand Down
8 changes: 4 additions & 4 deletions EPollPoller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const int kDeleted = 2; // 某个channel已经从Poller删除

EPollPoller::EPollPoller(EventLoop *loop)
: Poller(loop)
, epollfd_(epoll_create1(EPOLL_CLOEXEC))
, epollfd_(::epoll_create1(EPOLL_CLOEXEC))
, events_(kInitEventListSize) // vector<epoll_event>(16)
{
if (epollfd_ < 0)
Expand Down Expand Up @@ -72,7 +72,7 @@ void EPollPoller::updateChannel(Channel *channel)
int fd = channel->fd();
channels_[fd] = channel;
}
else // index == kDeleted
else // index == kAdd
{
}
channel->set_index(kAdded);
Expand Down Expand Up @@ -124,13 +124,13 @@ void EPollPoller::fillActiveChannels(int numEvents, ChannelList *activeChannels)
void EPollPoller::update(int operation, Channel *channel)
{
epoll_event event;
::memset(&event, 0, sizeof event);
::memset(&event, 0, sizeof(event));

int fd = channel->fd();

event.events = channel->events();
event.data.ptr = channel;
event.data.fd = fd;
event.data.ptr = channel;

if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)
{
Expand Down
14 changes: 8 additions & 6 deletions EventLoop.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ int createEventfd()
}

EventLoop::EventLoop()
: looping_(false), quit_(false)
: looping_(false)
, quit_(false)
, callingPendingFunctors_(false)
, threadId_(CurrentThread::tid())
, poller_(Poller::newDefaultPoller(this))
Expand Down Expand Up @@ -98,6 +99,7 @@ void EventLoop::loop()
doPendingFunctors();
}
LOG_INFO("EventLoop %p stop looping.\n", this);
looping_ = false;
}

/**
Expand Down Expand Up @@ -155,19 +157,19 @@ void EventLoop::queueInLoop(Functor cb)
void EventLoop::handleRead()
{
uint64_t one = 1;
ssize_t n = read(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
ssize_t n = read(wakeupFd_, &one, sizeof(one));
if (n != sizeof(one))
{
LOG_ERROR("EventLoop::handleRead() reads %lu bytes instead of 8", n);
LOG_ERROR("EventLoop::handleRead() reads %lu bytes instead of 8\n", n);
}
}

// 用来唤醒loop所在线程 向wakeupFd_写一个数据 wakeupChannel就发生读事件 当前loop线程就会被唤醒
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = write(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
ssize_t n = write(wakeupFd_, &one, sizeof(one));
if (n != sizeof(one))
{
LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8\n", n);
}
Expand Down
1 change: 1 addition & 0 deletions EventLoop.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class EventLoop : noncopyable

// 判断EventLoop对象是否在自己的线程里
bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); } // threadId_为EventLoop创建时的线程id CurrentThread::tid()为当前线程id

private:
void handleRead(); // 给eventfd返回的文件描述符wakeupFd_绑定的事件回调 当wakeup()时 即有事件发生时 调用handleRead()读wakeupFd_的8字节 同时唤醒阻塞的epoll_wait
void doPendingFunctors(); // 执行上层回调
Expand Down
14 changes: 7 additions & 7 deletions InetAddress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@

InetAddress::InetAddress(uint16_t port, std::string ip)
{
bzero(&addr_, sizeof addr_);
::memset(&addr_, 0, sizeof(addr_));
addr_.sin_family = AF_INET;
addr_.sin_port = htons(port); // 本地字节序转为网络字节序
addr_.sin_addr.s_addr = inet_addr(ip.c_str());
addr_.sin_port = ::htons(port); // 本地字节序转为网络字节序
addr_.sin_addr.s_addr = ::inet_addr(ip.c_str());
}

std::string InetAddress::toIp() const
Expand All @@ -24,15 +24,15 @@ std::string InetAddress::toIpPort() const
// ip:port
char buf[64] = {0};
::inet_ntop(AF_INET, &addr_.sin_addr, buf, sizeof buf);
size_t end = strlen(buf);
uint16_t port = ntohs(addr_.sin_port);
sprintf(buf + end, ":%u", port);
size_t end = ::strlen(buf);
uint16_t port = ::ntohs(addr_.sin_port);
sprintf(buf+end, ":%u", port);
return buf;
}

uint16_t InetAddress::toPort() const
{
return ntohs(addr_.sin_port);
return ::ntohs(addr_.sin_port);
}

#if 0
Expand Down
2 changes: 1 addition & 1 deletion Logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ void Logger::log(std::string msg)
std::cout << "[INFO]";
break;
case ERROR:
std::cout << "[ERROR]";
std::cout << "[ERRIR]";
break;
case FATAL:
std::cout << "[FATAL]";
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* gcc version 5.4.0
* cmake version 3.5.1

项目编译执行`./build.sh`即可
项目编译执行`./build.sh`即可,测试用例进入`example/`文件夹,`make`即可生成服务器测试用例

## 功能介绍

Expand Down
11 changes: 6 additions & 5 deletions Socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <sys/socket.h>
#include <string.h>
#include <netinet/tcp.h>
#include <sys/socket.h>

#include "Socket.h"
#include "Logger.h"
Expand Down Expand Up @@ -33,7 +34,7 @@ int Socket::accept(InetAddress *peeraddr)
{
sockaddr_in addr;
socklen_t len;
::memset(&addr, 0, sizeof addr);
::memset(&addr, 0, sizeof(addr));
int connfd = ::accept(sockfd_, (sockaddr *)&addr, &len);
if (connfd >= 0)
{
Expand All @@ -53,20 +54,20 @@ void Socket::shutdownWrite()
void Socket::setTcpNoDelay(bool on)
{
int optval = on ? 1 : 0;
::setsockopt(sockfd_, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof optval); // TCP_NODELAY包含头文件 <netinet/tcp.h>
::setsockopt(sockfd_, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval)); // TCP_NODELAY包含头文件 <netinet/tcp.h>
}
void Socket::setReuseAddr(bool on)
{
int optval = on ? 1 : 0;
::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval); // TCP_NODELAY包含头文件 <netinet/tcp.h>
::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); // TCP_NODELAY包含头文件 <netinet/tcp.h>
}
void Socket::setReusePort(bool on)
{
int optval = on ? 1 : 0;
::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof optval); // TCP_NODELAY包含头文件 <netinet/tcp.h>
::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval)); // TCP_NODELAY包含头文件 <netinet/tcp.h>
}
void Socket::setKeepAlive(bool on)
{
int optval = on ? 1 : 0;
::setsockopt(sockfd_, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof optval); // TCP_NODELAY包含头文件 <netinet/tcp.h>
::setsockopt(sockfd_, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval)); // TCP_NODELAY包含头文件 <netinet/tcp.h>
}
12 changes: 8 additions & 4 deletions TcpServer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ TcpServer::TcpServer(EventLoop *loop,
, connectionCallback_()
, messageCallback_()
, nextConnId_(1)
, started_(0)
{
// 当有新用户连接时,Acceptor类中绑定的acceptChannel_会有读事件发生,执行handleRead()调用TcpServer::newConnection回调
acceptor_->setNewConnectionCallback(
Expand All @@ -38,6 +39,7 @@ TcpServer::~TcpServer()
{
TcpConnectionPtr conn(item.second);
item.second.reset(); // 把原始的智能指针复位 让栈空间的TcpConnectionPtr conn指向该对象 当conn出了其作用域 即可释放智能指针指向的对象
// 销毁连接
conn->getLoop()->runInLoop(
std::bind(&TcpConnection::connectDestroyed, conn));
}
Expand Down Expand Up @@ -74,8 +76,8 @@ void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)

// 通过sockfd获取其绑定的本机的ip地址和端口信息
sockaddr_in local;
::memset(&local, 0, sizeof local);
socklen_t addrlen = sizeof local;
::memset(&local, 0, sizeof(local));
socklen_t addrlen = sizeof(local);
if(::getsockname(sockfd, (sockaddr *)&local, &addrlen) < 0)
{
LOG_ERROR("sockets::getLocalAddr");
Expand All @@ -97,12 +99,14 @@ void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
conn->setCloseCallback(
std::bind(&TcpServer::removeConnection, this, std::placeholders::_1));

ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
ioLoop->runInLoop(
std::bind(&TcpConnection::connectEstablished, conn));
}

void TcpServer::removeConnection(const TcpConnectionPtr &conn)
{
loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
loop_->runInLoop(
std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}

void TcpServer::removeConnectionInLoop(const TcpConnectionPtr &conn)
Expand Down
2 changes: 1 addition & 1 deletion Thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ void Thread::start() //
sem_t sem;
sem_init(&sem, false, 0); // false指的是 不设置进程间共享
// 开启线程
thread_ = std::shared_ptr<std::thread>(new std::thread([&]()->void {
thread_ = std::shared_ptr<std::thread>(new std::thread([&]() {
tid_ = CurrentThread::tid(); // 获取线程的tid值
sem_post(&sem);
func_(); // 开启一个新线程 专门执行该线程函数
Expand Down
5 changes: 5 additions & 0 deletions example/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
testserver :
g++ -g -o testserver testserver.cc -lmymuduo -lpthread -std=c++11

clean :
rm -f testserver
61 changes: 61 additions & 0 deletions example/testserver.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#include <string>

#include <mymuduo/TcpServer.h>
#include <mymuduo/Logger.h>

class EchoServer
{
public:
EchoServer(EventLoop *loop, const InetAddress &addr, const std::string &name)
: server_(loop, addr, name)
, loop_(loop)
{
// 注册回调函数
server_.setConnectionCallback(
std::bind(&EchoServer::onConnection, this, std::placeholders::_1));

server_.setMessageCallback(
std::bind(&EchoServer::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

// 设置合适的subloop线程数量
server_.setThreadNum(3);
}
void start()
{
server_.start();
}

private:
// 连接建立或断开的回调函数
void onConnection(const TcpConnectionPtr &conn)
{
if (conn->connected())
{
LOG_INFO("Connection UP : %s", conn->peerAddress().toIpPort().c_str());
}
else
{
LOG_INFO("Connection DOWN : %s", conn->peerAddress().toIpPort().c_str());
}
}

// 可读写事件回调
void onMessage(const TcpConnectionPtr &conn, Buffer *buf, Timestamp time)
{
std::string msg = buf->retrieveAllAsString();
conn->send(msg);
// conn->shutdown(); // 关闭写端 底层响应EPOLLHUP => 执行closeCallback_
}

EventLoop *loop_;
TcpServer server_;
};

int main() {
EventLoop loop;
InetAddress addr(8002);
EchoServer server(&loop, addr, "EchoServer");
server.start();
loop.loop();
return 0;
}

0 comments on commit ad83bc3

Please sign in to comment.