转眼两个月就过去了,《深入理解计算机系统》这本书大致翻完一遍,硬件部分知识我选择跳过了,与一些比较硬核的东西,如MMU的虚拟地址翻译,翻译成对应的vpn、vp0、tlbt、tlbi这些只是大概了解,并没有去记住翻译的每一个过程,了解其方式以后想了解就再去看。
在书中网络编程大片章介绍了socket,简单的介绍了一些应用层的协议http,对传输层很少描写或者说没有,所以我就打算把网络编程和并发写在一起,至于系统级I/O就懒得去写了,因为后面也都用到,系统级I/O在书中介绍了unix I/O的系统级函数与标准I/O库、描述符的概念,描述符有必要去了解,和windows下的句柄,内核中存维护着描述符表,而每个进程都维护着一个描述符表,子进程也拥有父进程的描述符表,从而我们知道如何去I/O重定向和文件共享。
下面代码是封装系统I/O的一组I/O,其中有带缓存的readnb和readlineb,与不带缓存的readn与writen,因为用的不是函数库,errno没有EINTR,所以需要自己定义一个,封装的原因是为了能够尽量写入数据与读出数据,无缓冲的I/O方便与网络传输,而着带缓冲的是线程安全的版本,线程不安全的一般缓冲区都是在全局或者是静态全局,容易被改写。
#define BUF_SIZE 8192
#define EINTR 4
struct rio_t{
int fd;
int cnt;
char *bufptr;
char buf[BUF_SIZE];
void rio_init(int fd){
this->fd=fd;
this->cnt=0;
this->bufptr=this->buf;
}
rio_t(int fd){
rio_init(fd);
}
ssize_t read(char *userbuf,size_t n){
int cnt=n;
int &m_cnt=this->cnt;
while(m_cnt<=0){
m_cnt=::read(this->fd,this->buf,n);
if(m_cnt<0){
if(errno==EINTR){
return -1;
}
}
else if(m_cnt==0){
return 0;
}
else{
this->bufptr=this->buf;
}
}
if(m_cnt<cnt){
cnt=m_cnt;
}
memcpy(userbuf,this->bufptr,cnt);
this->bufptr+=cnt;
m_cnt-=cnt;
return cnt;
}
ssize_t readlineb(void *userbuf,size_t n){
int cnt=0,tmp;
char ch,*bufp=(char*)userbuf;
for(int i=0;i<n;i++){
tmp=read(&ch,1);
if(tmp==1){
*bufp++=ch;
if(ch=='\n'){
break;
}
cnt++;
}
else if(cnt==0){
if(i==0){
return 0;
}
else{
break;
}
}
else{
return -1;
}
}
*bufp=0;
return cnt;
}
ssize_t readnb(void *userbuf,size_t n){
int nread,cnt=n;
char *bufptr=(char*)userbuf;
while(cnt>0){
nread=read(bufptr,cnt);
if(nread<0){
return -1;
}
else if(nread==0){
break;
}
cnt-=nread;
bufptr+=nread;
}
return n-cnt;
}
static ssize_t writen(int fd,void *userbuf,size_t n){
int nwrite,cnt=n;
char *bufptr=(char*)userbuf;
while(cnt>0){
nwrite=::write(fd,bufptr,cnt);
if(nwrite<=0){
if(errno==EINTR){
return 0;
}
return -1;
}
cnt-=nwrite;
bufptr+=nwrite;
}
return n;
}
static ssize_t readn(int fd,void *userbuf,size_t n){
int nread,cnt=n;
char *bufptr=(char*)userbuf;
while(cnt>0){
nread=::read(fd,bufptr,cnt);
if(nread<0){
return -1;
}
else if(nread==0){
break;
}
cnt-=nread;
bufptr+=nread;
}
return n-cnt;
}
};
socket编程-简单的C/S模型
一般来说在一个简单服务器端构造过程大概如下,我们要创建一个套接字,设置ipv_4或者ipv6协议与传输类型以及协议,然后设置sockaddr_in中的地址协议簇与ip地址和端口等等,然后bind告诉内核这个套接字以及绑定的地址是用于监听的,再然后listen,再然后就accept,相当于阻塞了,等待客户端connect,然后生成一个连接套接字。
而在客户端,只需要创建一个clientsocket,然后设置服务器的sockaddr_in中的ip地址与端口,然后就直接connect。
大致过程如上,但是为了更方便,因为服务器可能有多个ip地址,在服务器端,我们要开一个能使用的地址供连接,客户端也需要找服务器端可用的ip地址链接,这时候就要使用一些函数获取所有的地址以及端口+数据。
linux提供了更为强大的函数,getaddrinfo和getnameinfo,分别是通过域名或者服务器获得套接字地址与通过套接字地址获取相关信息。由于这两个函数需要设置很多参数来定义其行为是作为服务器端或者客户端,在使用的时候最好百度一下,了解其作用。
下面是定义一个创建监听套接字过程与客户端连接套接字,方便使用:
#define LISTENQ 1024
int open_listenfd(char *port)
{
addrinfo hints, *listp, *p;
int listenfd, rc, optval=1;
/* Get a list of potential server addresses */
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_socktype = SOCK_STREAM; /* Accept connections */
hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG; /* ... on any IP address */
hints.ai_flags |= AI_NUMERICSERV; /* ... using port number */
if ((rc = getaddrinfo(NULL, port, &hints, &listp)) != 0) {
fprintf(stderr, "getaddrinfo failed (port %s): %s\n", port, gai_strerror(rc));
return -2;
}
/* Walk the list for one that we can bind to */
for (p = listp; p; p = p->ai_next) {
/* Create a socket descriptor */
if ((listenfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) < 0)
continue; /* Socket failed, try the next */
/* Eliminates "Address already in use" error from bind */
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, //line:netp:csapp:setsockopt
(const void *)&optval , sizeof(int));
/* Bind the descriptor to the address */
if (bind(listenfd, p->ai_addr, p->ai_addrlen) == 0)
break; /* Success */
if (close(listenfd) < 0) { /* Bind failed, try the next */
fprintf(stderr, "open_listenfd close failed: %s\n", strerror(errno));
return -1;
}
}
/* Clean up */
freeaddrinfo(listp);
if (!p) /* No address worked */
return -1;
/* Make it a listening socket ready to accept connection requests */
if (listen(listenfd, LISTENQ) < 0) {
close(listenfd);
return -1;
}
return listenfd;
}
int open_listenfd(char *port)
{
addrinfo hints, *listp, *p;
int listenfd, rc, optval=1;
/* Get a list of potential server addresses */
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_socktype = SOCK_STREAM; /* Accept connections */
hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG; /* ... on any IP address */
hints.ai_flags |= AI_NUMERICSERV; /* ... using port number */
if ((rc = getaddrinfo(NULL, port, &hints, &listp)) != 0) {
fprintf(stderr, "getaddrinfo failed (port %s): %s\n", port, gai_strerror(rc));
return -2;
}
/* Walk the list for one that we can bind to */
for (p = listp; p; p = p->ai_next) {
/* Create a socket descriptor */
if ((listenfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) < 0)
continue; /* Socket failed, try the next */
/* Eliminates "Address already in use" error from bind */
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, //line:netp:csapp:setsockopt
(const void *)&optval , sizeof(int));
/* Bind the descriptor to the address */
if (bind(listenfd, p->ai_addr, p->ai_addrlen) == 0)
break; /* Success */
if (close(listenfd) < 0) { /* Bind failed, try the next */
fprintf(stderr, "open_listenfd close failed: %s\n", strerror(errno));
return -1;
}
}
/* Clean up */
freeaddrinfo(listp);
if (!p) /* No address worked */
return -1;
/* Make it a listening socket ready to accept connection requests */
if (listen(listenfd, LISTENQ) < 0) {
close(listenfd);
return -1;
}
return listenfd;
}
我们可以自己写一个最最最简单的C/S模型检测上面接口是否错误:
//server
void echo(int fd){
rio_t con(fd);
char buf[]="hello clinet!";
con.writen(fd,buf,sizeof(buf));
memset(buf,0,sizeof(buf));
con.readlineb(buf,MSGLEN);
printf("%s\n",buf);
}
int main(int argc, char const *argv[])
{
/* code */
int listen=open_listenfd("8080");
sockaddr_storage clientaddr;
int confd;
socklen_t clientlen=sizeof(sockaddr_storage);
while(1){
confd=accept(listen,(sockaddr*)&clientaddr,&clientlen);
echo(confd);
close(confd);
}
close(listen);
return 0;
}
//client
int main(int argc, char const *argv[])
{
/* code */
char buf[1000];
int client=open_clientfd("127.0.0.1","8080");
int len=recv(client,buf,1000,0);
printf("%s",buf);
char msg[]="hello server";
write(client,msg,strlen(msg));
close(client);
return 0;
}
//结果能够成功接收服务器端信息,服务器也能接受到客户端信息
[root@yanghao lerning]# ./server
hello server //这条信息是再另一个终端执行一下命令才出现
[root@yanghao lerning]# ./client
hello clinet!
上面就是简单的C/S模型,也可以不用,当然也可以自己去写一些套接字的连接,感觉可能更容易懂一些。
基于I/O多路复用的并发编程
I/O多路复用,个人理解的话,有点像一个字典,而key对应的是描述符,这组key用一个位向量组成,每个key对应着一个value,当集合中的其中一个描述符准备好可以读以后,就解除select的阻塞,也正如其英文名,选择。
我们称这个集合位准备好集合,注意的是select函数会把传入的集合更改,如集合中存放着两个描述符,stdin与socket,当系统有输入,按下回车后输入的信息就变成可读数据,此时select就会解除阻塞,设置传入的集合除了stdin在(设为1),其他的描述符清空(设为0),我们再根据判断哪个描述符在集合然后给出相应的状态(callback),再拿socket来举例,也就是服务器端监听的套接字,如果有客户端connect,相当于客户端传输一组连接信息给服务器端内核,这时候监听套接字就属于准备好读取状态,select中的集合就会设置其描述符为1,其他为0,如果没有connect,就一直处于阻塞状态。
我们可以看到其大概的作用,在C/S中,当服务器端与多个客户端相连接的时候,因为不使用多线程或者多进程,服务器端是一个一个处理,如果服务器因为等待一个客户端发送请求信息才关进行下一个客户端信息处理,而这个客户端迟迟不发送信息,那么服务器就卡在这,浪费很多资源,而如果我们使用select监听每个连接的套接字,如果有信息可读,我们就选择连接那个客户端套接字进行处理。
IO多路复用的优点在于因为是单进程处理并发,共享资源变得很容易,而且也容易去调试,而且事件驱动设计比基于进程去处理高效得很多,不需要像多进程那样通过上下文切换来调度新的流,而其缺点就是复杂,当我们要处理的并发事件过多,就使得编码复杂化,且如今现在几乎所有处理器都是多核处理器,基于事件驱动设计不能够充分利用多核处理器的特性。
函数原型与处理集合的相关宏:
int select (int maxfd + 1,fd_set *readset,fd_set *writeset,
fd_set *exceptset,const struct timeval * timeout);
FD_ZERO (fd_set *tmp);
FD_SET (int fd,fd_set *tmp);
FD_ISSET (int fd,fd_set *tmp);
FD_CLR (int fd,fd_set *tmp);
下面是简单的测试代码,写的是一个服务器处理多个客户端的服务器端简单模型,这个代码服务器端中使用的自己编写的I/O,推荐使用recv和send,在gdb调试的过程中,发现服务器端的readn或者readlineb被阻塞,因为服务器端设置的接收缓冲区太大,并不是客户端发送的信息大小,接收数据大小不匹配,所以服务器端会一直在等客户端发送信息直到客户端结束连接。
如果非要在客户端或者服务端使用封装的RIO进行互相传输数据,那么需要设置客户端和服务端发送与接收的缓冲区大小都一样,这样就不会产生阻塞了。
代码:
//服务器端代码
struct pool{
int clientCon[LISTENQ];
rio_t clientio[LISTENQ];
fd_set read_set;
fd_set ready_set;
int maxfd;
int max_connum;
void init_pool(int &fd){
maxfd=fd;
max_connum=-1;
for(int i=0;i<LISTENQ;i++){
clientCon[i]=-1;
}
FD_ZERO(&ready_set);
FD_SET(fd,&ready_set);
}
void addclientcon(int &fd){
for(int i=0;i<LISTENQ;i++){
if(clientCon[i]==-1){
clientCon[i]=fd;
clientio[i].rio_init(fd);
FD_SET(fd,&ready_set);
maxfd=maxfd>fd?maxfd:fd;
max_connum=max_connum>i?max_connum:i;
break;
}
if(i==LISTENQ-1){
fprintf(stdout,"TOo many client!");
return;
}
}
}
};
void echo(pool &e){
int cnt=0;
int tmpfd;
char ch[1024];
for(int i=0;i<=e.max_connum;i++){
tmpfd=e.clientCon[i];
if(tmpfd>0&&FD_ISSET(tmpfd,&(e.read_set))){
cnt=e.clientio[i].readlineb(ch,sizeof(ch));
if(cnt==0){
fprintf(stdout,"complete connection fd : %d\n",tmpfd);
FD_CLR(tmpfd,&e.ready_set);
close(tmpfd);
e.clientCon[i]=-1;
}
else{
printf("%s\n",ch);
if(strcmp(ch,"See you")==0){
fprintf(stdout,"By See You complete connection fd : %d\n",tmpfd);
FD_CLR(tmpfd,&e.ready_set);
close(tmpfd);
e.clientCon[i]=-1;
}
else{
sprintf(ch,"Stop connect plase answe [See you],your fd is: %d \n",tmpfd);
e.clientio[i].writen(tmpfd,ch,sizeof(ch));
}
}
}
}
}
int m_listen;
void handler(int sig){
close(m_listen);
exit(0);
}
int main()
{
signal(SIGINT,handler);
char port[]="8080";
m_listen=open_listenfd(port);
int confd;
int a=10;
sockaddr_storage clientcon;
socklen_t len=sizeof(sockaddr_storage);
pool e;
e.init_pool(m_listen);
while(1){
e.read_set=e.ready_set;
select(e.maxfd+1,&e.read_set,NULL,NULL,NULL);
if(FD_ISSET(m_listen,&e.read_set)){
char ch[]="Hello client,i'm server~\n";
confd=accept(m_listen,(sockaddr*)&clientcon,&len);
e.addclientcon(confd);
rio_t::writen(confd,ch,sizeof(ch));
fprintf(stdout,"connection fd: %d\n",confd);
}
echo(e);
}
close(m_listen);
return 0;
}
//客户端测试代码
int main(int argc, char const *argv[])
{
int client;
char buf[1000];
client=open_clientfd("127.0.0.1","8080");
int nread;
char msg[]="hi,i'm client of other version";
int len;
len=recv(client,buf,sizeof(buf),0);
printf("Server: %s",buf);
len=send(client,msg,sizeof(msg),0);
close(client);
return 0;
}
多线程
线程,是运行在进程上下文中的逻辑流,程序都是由每个进程中一个线程组成的,线程由内核自动调度,每个进程都有自己的私有空间(如:进程id,与进程相关的数据结构页表、内核栈之类的,还有堆栈、代码区、全局变量区、代码段、常量),每个线程有自己的线程ID,栈、栈指针、pc、通用寄存器等等,所有运行在一个进程的所有线程都共享该进程的整个虚拟空间,如我们在堆中申请的虚拟内存,或者打开的文件描述符等等。
线程与进程的执行区别在于,进程有父子进程之分,父进程可以回收子进程,而线程不分父子,和一个进程相关的线程组成一个线程池,独立于其他线程创建的线程,对等线程池中,一个线程可以杀死它的任何对等线程,或者等待其终止;多进程在切换进程上下文比线程切换要慢,多线程容易去共享数据,而多进程则需要管道、共享内存、信号、消息队列之类的方式去共享数据,比较麻烦。
在书中使用的是posix线程的接口,而我打算使用c++的thread类。
竞争
如以下代码,创建了两个线程去执行run,其结果不一定是2000,从汇编上来分析原因,在两个线程中执行的该循环都对c进行++操作,也就是会从c的地址获取值到寄存器进行add操作,然后再把寄存器的结果存放到c的地址中,只要其中一个线程在++过程中慢了一步,那么快了一步的线程所++的值将会无效,c的值还是会取决于慢的那个线程的++。
所以就产生了竞争资源的问题。
volatile int c=0;
void run(){
for(int i=0;i<=1000;i++){
c++;
}
}
int main(int argc, char const *argv[])
{
vector<thread>threads;
for(int i=0;i<2;i++){
threads.push_back(thread(run));
}
for(int i=0;i<2;i++){
threads[i].join();
}
cout<<c<<endl;
return 0;
}
信号量
为了解决竞争问题引入了信号量。
之前使用C++的thread的时候,学习过互斥量,对信号量的了解仅仅是知道这一名词,在看书的时候,有一种感觉,信号量和互斥量差不多,因为书中写着信号量提供了一种很方便的方法,能够确保对共享变量的互斥访问,这么一看觉得信号量和互斥量应该是一个东西吧,其实互斥量是一种特殊的信号量,先看看其函数吧:
int sem_init(sem_t *sem,0,int value);
int sem_wait(sem_t *sem);//P(s) 测试 对s--
int sem_post(sem_t *sem);//V(s) 增加 对s++
信号量用来实现互斥的话,信号量的值总是1或者0,使用PV操作对其进行增加或者删减。
值得注意的是,在P操作中,如果s为0,那么就会挂机该线程,直到s非0,一个V操作会重启这个线程,当多个线程在P操作中被阻塞后,被调度的执行线程在执行V后会重启这些线程中的其中一个线程,也就是我们不可得知是重启哪个线程,如果一个信号量在多个函数中使用PV,那么函数的调用顺序可能会有问题,所以使用信号量的时候要注意到这点。
而在上面这点中,我们可以发现,信号量与互斥量的区别,信号量可以设置非负整数值,而互斥量只有0、1,也就是说互斥量是一种特殊的信号量,信号量可以用来做同步或者互斥,而互斥量只能用来做互斥。