一、select函数
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
功能:用于同时监控多个文件描述符的 IO 状态,通过 select,程序可以在单个线程中处理多个 IO 事件,避免为每个 IO 操作创建单独的线程,从而提高资源利用率和并发性能。但是select只负责等待,拷贝需要read、recv、write、send等负责。
参数:
- nfds:需要检查的最大文件描述符值 + 1
- readfds:输入输出型参数,fd_set本质上是一张位图
- 输入时:
- 比特位的位置:对应文件描述符的值
- 比特位的内容:用户告诉内核,是否监控对应文件描述符的读事件
- 输出时:
- 比特位的位置:对应文件描述符的值
- 比特位的内容:内核告诉用户,对应文件描述符的读事件 是否就绪
- 输入时:
- writefds:输入输出型参数,fd_set本质上是一张位图
- 输入时:
- 比特位的位置:对应文件描述符的值
- 比特位的内容:用户告诉内核,是否监控对应文件描述符的写事件
- 输出时:
- 比特位的位置:对应文件描述符的值
- 比特位的内容:内核告诉用户,对应文件描述符的写事件 是否就绪
- 输入时:
- exceptfds:输入输出型参数,fd_set本质上是一张位图
- 输入时:
- 比特位的位置:对应文件描述符的值
- 比特位的内容:用户告诉内核,是否监控对应文件描述符的异常事件
- 输出时:
- 比特位的位置:对应文件描述符的值
- 比特位的内容:内核告诉用户,对应文件描述符的异常事件 是否就绪
- 输入时:
- timeout:超时时间,控制 select 的阻塞行为:
- NULL:永久阻塞,直到有文件描述符就绪
- 0:立即返回(非阻塞模式)
- >0:指定超时时间(秒 + 微秒),超时后返回。
返回值:
- 正数:表示就绪的文件描述符总数
- 0:表示超时(无文件描述符就绪)
- -1:表示错误,并设置 errno(如 EINTR 表示被信号中断)
操作系统不建议用户直接修改fd_set位图,所以操作系统提供以下宏操作,来操作fd_set位图。
FD_ZERO(fd_set *set); // 清空集合
FD_SET(int fd, fd_set *set); // 将 fd 添加到集合
FD_CLR(int fd, fd_set *set); // 从集合中移除 fd
FD_ISSET(int fd, fd_set *set); // 检查 fd 是否在集合中(就绪时返回非零)
二、select的优缺点
- 优点
- select只负责等待,可以等待多个文件描述符,在IO的时候效率会比较高
- 缺点
- 使用select的时候,用户每次都需要对select的参数进行重置
- 在编写代码的时候,select需要用到第三方数组,会充满遍历,可能会影响select的效率
- 用户到内核,内核到用户,每次select的调用和返回,都需要对位图进行重置操作;用户和内核之间,需要一直进行数据拷贝
- select会让操作系统在底层遍历要关心的所有文件描述符,会导致效率降低
- fd_set是操作系统提供的一个类型,本身是一个位图,fd_set的大小是固定的,也就是fd_set的比特位位数是有上线的,所以select能够检测文件描述符的个数也是有限的
三、实现select服务器(只关心读事件)
3.1 Log.hpp(日志)
#pragma once
#include "LockGuard.hpp"
#include <iostream>
#include <string>
#include <stdarg.h>
#include <stdio.h>
#include <pthread.h>
#include <time.h>
#include <unistd.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/stat.h>
using namespace std;
// 日志等级
enum
{
Debug = 0, // 调试
Info, // 正常
Warning, // 警告
Error, // 错误,但程序并未直接退出
Fatal // 程序直接挂掉
};
enum
{
Screen = 10, // 打印到显示器上
OneFile, // 打印到一个文件中
ClassFile // 按照日志等级打印到不同的文件中
};
string LevelToString(int level)
{
switch (level)
{
case Debug:
return "Debug";
case Info:
return "Info";
case Warning:
return "Warning";
case Error:
return "Error";
case Fatal:
return "Fatal";
default:
return "Unknow";
}
}
const char *default_filename = "log.";
const int default_style = Screen;
const char *defaultdir = "log";
class Log
{
public:
Log()
: style(default_style), filename(default_filename)
{
// mkdir(defaultdir,0775);
pthread_mutex_init(&_log_mutex, nullptr);
}
void SwitchStyle(int sty)
{
style = sty;
}
void WriteLogToOneFile(const string &logname, const string &logmessage)
{
int fd = open(logname.c_str(), O_CREAT | O_WRONLY | O_APPEND, 0666);
if (fd == -1)
return;
{
LockGuard lockguard(&_log_mutex);
write(fd, logmessage.c_str(), logmessage.size());
}
close(fd);
}
void WriteLogToClassFile(const string &levelstr, const string &logmessage)
{
mkdir(defaultdir, 0775);
string name = defaultdir;
name += "/";
name += filename;
name += levelstr;
WriteLogToOneFile(name, logmessage);
}
void WriteLog(int level, const string &logmessage)
{
switch (style)
{
case Screen:
{
LockGuard lockguard(&_log_mutex);
cout << logmessage;
}
break;
case OneFile:
WriteLogToClassFile("All", logmessage);
break;
case ClassFile:
WriteLogToClassFile(LevelToString(level), logmessage);
break;
default:
break;
}
}
string GetTime()
{
time_t CurrentTime = time(nullptr);
struct tm *curtime = localtime(&CurrentTime);
char time[128];
// localtime 的年是从1900开始的,所以要加1900, 月是从0开始的所以加1
snprintf(time, sizeof(time), "%d-%d-%d %d:%d:%d",
curtime->tm_year + 1900, curtime->tm_mon + 1, curtime->tm_mday,
curtime->tm_hour, curtime->tm_min, curtime->tm_sec);
return time;
return "";
}
void LogMessage(int level, const char *format, ...)
{
char left[1024];
string Levelstr = LevelToString(level).c_str();
string Timestr = GetTime().c_str();
string Idstr = to_string(getpid());
snprintf(left, sizeof(left), "[%s][%s][%s] ",
Levelstr.c_str(), Timestr.c_str(), Idstr.c_str());
va_list args;
va_start(args, format);
char right[1024];
vsnprintf(right, sizeof(right), format, args);
string logmessage = left;
logmessage += right;
WriteLog(level, logmessage);
va_end(args);
}
~Log()
{
pthread_mutex_destroy(&_log_mutex);
};
private:
int style;
string filename;
pthread_mutex_t _log_mutex;
};
Log lg;
class Conf
{
public:
Conf()
{
lg.SwitchStyle(Screen);
}
~Conf()
{
}
};
Conf conf;
3.2 Lockguard.hpp(自动管理锁)
#pragma once
#include <iostream>
class Mutex
{
public:
Mutex(pthread_mutex_t* lock)
:pmutex(lock)
{}
void Lock()
{
pthread_mutex_lock(pmutex);
}
void Unlock()
{
pthread_mutex_unlock(pmutex);
}
~Mutex()
{}
public:
pthread_mutex_t* pmutex;
};
class LockGuard
{
public:
LockGuard(pthread_mutex_t* lock)
:mutex(lock)
{
mutex.Lock();
}
~LockGuard()
{
mutex.Unlock();
}
public:
Mutex mutex;
};
3.3 Socket.hpp(封装套接字)
#pragma once
#include <iostream>
#include <string>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#define CONV(addrptr) (struct sockaddr*)addrptr
enum{
Socket_err = 1,
Bind_err,
Listen_err
};
const static int defalutsockfd = -1;
const int defalutbacklog = 5;
class Socket
{
public:
virtual ~Socket(){};
virtual void CreateSocketOrDie() = 0;
virtual void BindSocketOrDie(uint16_t port) = 0;
virtual void ListenSocketOrDie(int backlog) = 0;
virtual Socket* AcceptConnection(std::string* ip , uint16_t* port) = 0;
virtual bool ConnectServer(const std::string& serverip , uint16_t serverport) = 0;
virtual int GetSockFd() = 0;
virtual void SetSockFd(int sockfd) = 0;
virtual void CloseSockFd() = 0;
virtual bool Recv(std::string& buffer,int size) = 0;
virtual void Send(const std::string& send_string) = 0;
public:
void BuildListenSocketMethod(uint16_t port,int backlog = defalutbacklog)
{
CreateSocketOrDie();
BindSocketOrDie(port);
ListenSocketOrDie(backlog);
}
bool BuildConnectSocketMethod(const std::string& serverip , uint16_t serverport)
{
CreateSocketOrDie();
return ConnectServer(serverip,serverport);
}
void BuildNormalSocketMethod(int sockfd)
{
SetSockFd(sockfd);
}
};
class TcpSocket : public Socket
{
public:
TcpSocket(int sockfd = defalutsockfd)
:_sockfd(sockfd)
{}
~TcpSocket(){};
void CreateSocketOrDie() override
{
_sockfd = ::socket(AF_INET,SOCK_STREAM,0);
if(_sockfd < 0) exit(Socket_err);
}
void BindSocketOrDie(uint16_t port) override
{
struct sockaddr_in addr;
memset(&addr,0,sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
addr.sin_port = htons(port);
socklen_t len = sizeof(addr);
int n = ::bind(_sockfd,CONV(&addr),len);
if(n < 0) exit(Bind_err);
}
void ListenSocketOrDie(int backlog) override
{
int n = ::listen(_sockfd,backlog);
if(n < 0) exit(Listen_err);
}
Socket* AcceptConnection(std::string* clientip , uint16_t* clientport) override
{
struct sockaddr_in client;
memset(&client,0,sizeof(client));
socklen_t len = sizeof(client);
int fd = ::accept(_sockfd,CONV(&client),&len);
if(fd < 0) return nullptr;
char buffer[64];
inet_ntop(AF_INET,&client.sin_addr,buffer,len);
*clientip = buffer;
*clientport = ntohs(client.sin_port);
Socket* s = new TcpSocket(fd);
return s;
}
bool ConnectServer(const std::string& serverip , uint16_t serverport) override
{
struct sockaddr_in server;
memset(&server,0,sizeof(server));
server.sin_family = AF_INET;
// server.sin_addr.s_addr = inet_addr(serverip.c_str());
inet_pton(AF_INET,serverip.c_str(),&server.sin_addr);
server.sin_port = htons(serverport);
socklen_t len = sizeof(server);
int n = connect(_sockfd,CONV(&server),len);
if(n < 0) return false;
else return true;
}
int GetSockFd() override
{
return _sockfd;
}
void SetSockFd(int sockfd) override
{
_sockfd = sockfd;
}
void CloseSockFd() override
{
if(_sockfd > defalutsockfd)
{
close(_sockfd);
}
}
bool Recv(std::string& buffer , int size)override
{
char inbuffer[size];
int n = recv(_sockfd,inbuffer,sizeof(inbuffer)-1,0);
if(n > 0)
{
inbuffer[n] = 0;
}
else
{
return false;
}
buffer += inbuffer;
return true;
}
void Send(const std::string& send_string)
{
send(_sockfd,send_string.c_str(),send_string.size(),0);
}
private:
int _sockfd;
};
3.4 SelectServer.hpp(服务端封装)
#pragma once
#include <iostream>
#include <string>
#include <sys/select.h>
#include "Socket.hpp"
#include "Log.hpp"
#include <memory>
using namespace std;
const static uint16_t defalutport = 8888;
const static int gbacklog = 8;
const static int num = sizeof(fd_set) * 8;
class SelectServer
{
private:
void HandlerEvent(fd_set &rfds)
{
for (int i = 0; i < num; i++)
{
// 是否监控
if (!_rfds_array[i])
continue;
// 是否就绪
int fd = _rfds_array[i]->GetSockFd();
if (FD_ISSET(fd, &rfds))
{
// 是新连接到来,还是新数据到来
// 新连接到来
if (FD_ISSET(_listensock->GetSockFd(), &rfds))
{
lg.LogMessage(Info, "get a new link\n");
string clientip;
uint16_t cilentport;
// 由于select已经检测到listensock已经就绪了,这里不会阻塞
Socket *sock = _listensock->AcceptConnection(&clientip, &cilentport);
if (!sock)
{
lg.LogMessage(Error, "accept error\n");
continue;
}
lg.LogMessage(Info, "get a client , client info# %s %d , fd:%d\n", clientip.c_str(), cilentport, sock->GetSockFd());
// 这里已经获取连接成功,由于底层数据不一定就绪
// 所以这里需要将新连接的文件描述符交给select托管
// 只需将文件描述符加入到_rfds_array即可
int pos = 0;
for (; pos < num; pos++)
{
if (_rfds_array[pos] == nullptr)
{
_rfds_array[pos] = sock;
break;
}
}
// fd_set能够存储的文件描述符是有上限的
if(pos == num)
{
sock->CloseSockFd();
delete sock;
lg.LogMessage(Warning, "server is full...!\n");
}
}
else
{ // 是新数据来了
// 这里读是有问题的
string buffer;
bool flag = _rfds_array[i]->Recv(buffer,1024);
if(flag) // 读取成功
{
lg.LogMessage(Info,"client say# %s\n",buffer.c_str());
}
else // 读取失败
{
lg.LogMessage(Warning,"cilent quit !! close fd : %d\n",fd);
_rfds_array[i]->CloseSockFd();
delete _rfds_array[i];
_rfds_array[i] = nullptr;
}
}
}
}
}
public:
SelectServer(uint16_t port = defalutport)
: _port(port), _listensock(new TcpSocket()), _isrunning(false)
{
}
void Init()
{
_listensock->BuildListenSocketMethod(_port, gbacklog);
for (int i = 0; i < num; i++)
{
_rfds_array[i] = nullptr;
}
_rfds_array[0] = _listensock.get();
}
void Loop()
{
_isrunning = true;
while (_isrunning)
{
fd_set rfds;
FD_ZERO(&rfds);
PrintDebug();
// 设置需要监控的读事件文件描述符集,并找到最大的文件描述符
int max_fd = _listensock->GetSockFd();
for (int i = 0; i < num; i++)
{
if (_rfds_array[i] == nullptr)
continue;
else
{
int fd = _rfds_array[i]->GetSockFd();
// rfds本质是一个输入输出型参数,rfds是在select调用返回的时候,不断被修改,所以,每次都要重置
FD_SET(fd, &rfds);
if (max_fd < fd)
{
max_fd = fd;
}
}
}
struct timeval timeout = {0, 0};
ssize_t n = select(max_fd + 1, &rfds, nullptr, nullptr, &timeout);
switch (n)
{
case -1:
{
lg.LogMessage(Fatal, "select Error , last time : %u.%u\n", timeout.tv_sec, timeout.tv_usec);
break;
}
case 0:
{
lg.LogMessage(Info, "select timeout... , last time : %u.%u\n", timeout.tv_sec, timeout.tv_usec);
break;
}
default:
{
lg.LogMessage(Info, "select success , begin handler event , last time : %u.%u\n", timeout.tv_sec, timeout.tv_usec);
HandlerEvent(rfds);
break;
}
}
}
_isrunning = false;
}
void Stop()
{
_isrunning = false;
}
// 查看当前哪些文件描述符需要被监控
void PrintDebug()
{
std::cout << "current select rfds list is : ";
for (int i = 0; i < num; i++)
{
if (_rfds_array[i] == nullptr)
continue;
else
std::cout << _rfds_array[i]->GetSockFd() << " ";
}
std::cout << std::endl;
}
~SelectServer() {}
private:
unique_ptr<Socket> _listensock;
uint16_t _port;
bool _isrunning;
// Select服务器需要所有的fd以数据结构的方式组织起来
Socket *_rfds_array[num];
};
3.5 Main.cpp(服务端)
#include <iostream>
#include <memory>
#include "SelectServer.hpp"
using namespace std;
// ./selectserver port
int main(int argc , char* argv[])
{
if(argc != 2)
{
cout << "Usage : " << argv[0] << " port" << endl;
exit(0);
}
uint16_t localport = stoi(argv[1]);
unique_ptr<SelectServer> svr = make_unique<SelectServer>(localport);
svr->Init();
svr->Loop();
return 0;
}
结尾
如果有什么建议和疑问,或是有什么错误,大家可以在评论区中提出。
希望大家以后也能和我一起进步!!🌹🌹
如果这篇文章对你有用的话,希望大家给一个三连支持一下!!🌹🌹