Skip to content

Commit

Permalink
Add files via upload
Browse files Browse the repository at this point in the history
  • Loading branch information
S1mpleBug authored Mar 1, 2022
1 parent 9a99be0 commit b1cc2ae
Show file tree
Hide file tree
Showing 86 changed files with 6,902 additions and 0 deletions.
72 changes: 72 additions & 0 deletions Acceptor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include <unistd.h>

#include "Acceptor.h"
#include "Logger.h"
#include "InetAddress.h"

static int createNonblocking()
{
int sockfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_TCP);
if (sockfd < 0)
{
LOG_FATAL("%s:%s:%d listen socket create err:%d\n", __FILE__, __FUNCTION__, __LINE__, errno);
}
return sockfd;
}

Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport)
: loop_(loop)
, acceptSocket_(createNonblocking())
, acceptChannel_(loop, acceptSocket_.fd())
, listenning_(false)
{
acceptSocket_.setReuseAddr(true);
acceptSocket_.setReusePort(true);
acceptSocket_.bindAddress(listenAddr);

// TcpServer::start() => Acceptor.listen() 如果有新用户连接 要执行一个回调(accept => connfd => 打包成Channel => 唤醒subloop)
// baseloop监听到有事件发生 => acceptChannel_(listenfd) => 执行该回调函数
acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}

Acceptor::~Acceptor()
{
acceptChannel_.disableAll(); // 把从Poller中感兴趣的事件删除掉
acceptChannel_.remove(); // 调用EventLoop->removeChannel => Poller->removeChannel 把Poller的ChannelMap对应的部分删除
}

void Acceptor::listen()
{
listenning_ = true;
acceptSocket_.listen(); // listen
acceptChannel_.enableReading(); // acceptChannel_注册至Poller !重要
}

// listenfd有事件发生了,就是有新用户连接了
void Acceptor::handleRead()
{
InetAddress peerAddr;
int connfd = acceptSocket_.accept(&peerAddr);
if (connfd >= 0)
{
if (NewConnectionCallback_)
{
NewConnectionCallback_(connfd, peerAddr); // 轮询找到subLoop 唤醒并分发当前的新客户端的Channel
}
else
{
::close(connfd);
}
}
else
{
LOG_ERROR("%s:%s:%d accept err:%d\n", __FILE__, __FUNCTION__, __LINE__, errno);
if (errno == EMFILE)
{
LOG_ERROR("%s:%s:%d sockfd reached limit\n", __FILE__, __FUNCTION__, __LINE__);
}
}
}
32 changes: 32 additions & 0 deletions Acceptor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#pragma once

#include <functional>

#include "noncopyable.h"
#include "Socket.h"
#include "Channel.h"

class EventLoop;
class InetAddress;

class Acceptor : noncopyable
{
public:
using NewConnectionCallback = std::function<void(int sockfd, const InetAddress &)>;

Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport);
~Acceptor();

void setNewConnectionCallback(const NewConnectionCallback &cb) { NewConnectionCallback_ = cb; }

bool listenning() const { return listenning_; }
void listen();
private:
void handleRead();

EventLoop *loop_; // Acceptor用的就是用户定义的那个baseLoop 也称作mainLoop
Socket acceptSocket_;
Channel acceptChannel_;
NewConnectionCallback NewConnectionCallback_;
bool listenning_;
};
70 changes: 70 additions & 0 deletions Buffer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#include <errno.h>
#include <sys/uio.h>
#include <unistd.h>

#include "Buffer.h"

/**
* 从fd上读取数据 Poller工作在LT模式
* Buffer缓冲区是有大小的! 但是从fd上读取数据的时候 却不知道tcp数据的最终大小
*
* @description: 从socket读到缓冲区的方法是使用readv先读至buffer_,
* Buffer_空间如果不够会读入到栈上65536个字节大小的空间,然后以append的
* 方式追加入buffer_。既考虑了避免系统调用带来开销,又不影响数据的接收。
**/
ssize_t Buffer::readFd(int fd, int *saveErrno)
{
// 栈额外空间,用于从套接字往出读时,当buffer_暂时不够用时暂存数据,待buffer_重新分配足够空间后,在把数据交换给buffer_。
char extrabuf[65536] = {0}; // 栈上内存空间 65536/1024 = 64KB

/*
struct iovec {
ptr_t iov_base; // iov_base指向的缓冲区存放的是readv所接收的数据或是writev将要发送的数据
size_t iov_len; // iov_len在各种情况下分别确定了接收的最大长度以及实际写入的长度
};
*/

// 使用iovec分配两个连续的缓冲区
struct iovec vec[2];
const size_t writable = writableBytes(); // 这是Buffer底层缓冲区剩余的可写空间大小 不一定能完全存储从fd读出的数据

// 第一块缓冲区,指向可写空间
vec[0].iov_base = begin() + writerIndex_;
vec[0].iov_len = writable;
// 第二块缓冲区,指向栈空间
vec[1].iov_base = extrabuf;
vec[1].iov_len = sizeof extrabuf;

// when there is enough space in this buffer, don't read into extrabuf.
// when extrabuf is used, we read 128k-1 bytes at most.
// 这里之所以说最多128k-1字节,是因为若writable为64k-1,那么需要两个缓冲区 第一个64k-1 第二个64k 所以做多128k-1
// 如果第一个缓冲区>=64k 那就只采用一个缓冲区 而不使用栈空间extrabuf[65536]的内容
const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;
const ssize_t n = ::readv(fd, vec, iovcnt);

if (n < 0)
{
*saveErrno = errno;
}
else if (n <= writable) // Buffer的可写缓冲区已经够存储读出来的数据了
{
writerIndex_ += n;
}
else // extrabuf里面也写入了n-writable长度的数据
{
writerIndex_ = buffer_.size();
append(extrabuf, n - writable); // 对buffer_扩容 并将extrabuf存储的另一部分数据追加至buffer_
}
}

// inputBuffer_.readFd表示将对端数据读到inputBuffer_中,移动writerIndex_指针
// outputBuffer_.writeFd标示将数据写入到outputBuffer_中,从readerIndex_开始,可以写readableBytes()个字节
ssize_t Buffer::writeFd(int fd, int *saveErrno)
{
ssize_t n = ::write(fd, peek(), readableBytes());
if (n < 0)
{
*saveErrno = errno;
}
return n;
}
106 changes: 106 additions & 0 deletions Buffer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#pragma once

#include <vector>
#include <string>
#include <algorithm>
#include <stddef.h>

// 网络库底层的缓冲区类型定义
class Buffer
{
public:
static const size_t kCheapPrepend = 8;
static const size_t kInitialSize = 1024;

explicit Buffer(size_t initalSize = kInitialSize)
: buffer_(kCheapPrepend + initalSize)
, readerIndex_(kCheapPrepend)
, writerIndex_(kCheapPrepend)
{
}

size_t readableBytes() const { return writerIndex_ - readerIndex_; }
size_t writableBytes() const { return buffer_.size() - writerIndex_; }
size_t prependableBytes() const { return readerIndex_; }

// 返回缓冲区中可读数据的起始地址
const char *peek() const { return begin() + readerIndex_; }
void retrieve(size_t len)
{
if (len < readableBytes())
{
readerIndex_ += len; // 说明应用只读取了可读缓冲区数据的一部分,就是len长度 还剩下readerIndex+=len到writerIndex_的数据未读
}
else // len == readableBytes()
{
retrieveAll();
}
}
void retrieveAll()
{
readerIndex_ = kCheapPrepend;
writerIndex_ = kCheapPrepend;
}

// 把onMessage函数上报的Buffer数据 转成string类型的数据返回
std::string retrieveAllAsString() { return retrieveAsString(readableBytes()); }
std::string retrieveAsString(size_t len)
{
std::string result(peek(), len);
retrieve(len); // 上面一句把缓冲区中可读的数据已经读取出来 这里肯定要对缓冲区进行复位操作
return result;
}

// buffer_.size - writerIndex_
void ensureWritableBytes(size_t len)
{
if (writableBytes() < len)
{
makeSpace(len); // 扩容
}
}

// 把[data, data+len]内存上的数据添加到writable缓冲区当中
void append(const char *data, size_t len)
{
ensureWritableBytes(len);
std::copy(data, data+len, beginWrite());
writerIndex_ += len;
}
char *beginWrite() { return begin() + writerIndex_; }
const char *beginWrite() const { return begin() + writerIndex_; }

// 从fd上读取数据
ssize_t readFd(int fd, int *saveErrno);
// 通过fd发送数据
ssize_t writeFd(int fd, int *saveErrno);

private:
// vector底层数组首元素的地址 也就是数组的起始地址
char *begin() { return &*buffer_.begin(); }
const char *begin() const { return &*buffer_.begin(); }

void makeSpace(size_t len)
{
/**
* | kCheapPrepend |xxx|reader | writer | // xxx标示reader中已读的部分
* | kCheapPrepend | len |
**/
if (writableBytes() + prependableBytes() < len + kCheapPrepend) // 也就是说 len > xxx + writer的部分
{
buffer_.resize(writerIndex_ + len);
}
else // 这里说明 len <= xxx + writer 把reader搬到从xxx开始 使得xxx后面是一段连续空间
{
size_t readable = readableBytes(); // readable = reader的长度
std::copy(begin()+readerIndex_, begin()+writerIndex_, // 把这一部分数据拷贝到begin+kCheapPrepend起始处
begin()+kCheapPrepend);
readerIndex_ = kCheapPrepend;
writerIndex_ = readerIndex_+readable;
}
}

std::vector<char> buffer_;
size_t readerIndex_;
size_t writerIndex_;
};
14 changes: 14 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
cmake_minimum_required(VERSION 2.5)
project(mymuduo)

# mymuduo最终编译成so动态库 设置动态库的路径 放置项目根目录的lib文件夹下面
set(LIBRARY_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/lib)

# 设置调试信息 以及启动C++11语言标准
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -std=c++11")

# 定义参与编译的源代码文件
aux_source_directory(. SRC_LIST)

# 编译动态库
add_library(mymuduo SHARED ${SRC_LIST})
18 changes: 18 additions & 0 deletions Callbacks.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#pragma once

#include <memory>
#include <functional>

class Buffer;
class TcpConnection;
class Timestamp;

using TcpConnectionPtr = std::shared_ptr<TcpConnection>;
using ConnectionCallback = std::function<void(const TcpConnectionPtr &)>;
using CloseCallback = std::function<void(const TcpConnectionPtr &)>;
using WriteCompleteCallback = std::function<void(const TcpConnectionPtr &)>;
using HighWaterMarkCallback = std::function<void(const TcpConnectionPtr &, size_t)>;

using MessageCallback = std::function<void(const TcpConnectionPtr &,
Buffer *,
Timestamp)>;
Loading

0 comments on commit b1cc2ae

Please sign in to comment.