ZGGSONG WIKI

Unix网络编程复习

LNP作业1

2020年2月17号 星期一 08:55

GNU是什么词组的缩写?

  • 答:
    • GNU's Not Unix
    • ^-----^---^---
    • 箭头指向的那个那个字母:G N U

LNP作业2

2020年2月20号 星期四 11:01

Linux中目录/proc存储的是什么信息?

  • 答:
    • Linux 内核提供了一种通过 /proc 文件系统,在运行时访问内核内部数据结构、改变内核设置的机制。proc文件系统是一个伪文件系统,它只存在内存当中,而不占用外存空间。它以文件系统的方式为访问系统内核数据的操作提供接口。
    用户和应用程序可以通过proc得到系统的信息,并可以改变内核的某些参数。由于系统的信息,如进程,是动态改变的,所以用户或应用程序读取proc文件时,proc文件系统是动态从系统内核读出所需信息并提交的。下面列出的这些文件或子文件夹,并不是都是在你的系统中存在,这取决于你的内核配置和装载的模块。另外,在/proc下还有三个很重要的目录:net,scsi和sys。 Sys目录是可写的,可以通过它来访问或修改内核的参数,而net和scsi则依赖于内核配置。例如,如果系统不支持scsi,则scsi 目录不存在。 除了以上介绍的这些,还有的是一些以数字命名的目录,它们是进程目录。系统中当前运行的每一个进程都有对应的一个目录在/proc下,以进程的 PID号为目录名,它们是读取进程信息的接口。而self目录则是读取进程本身的信息接口,是一个link。

LNP作业3

2020年3月2号 星期一 08:56

写一个C程序: 生成10个子进程并利用waitpid回收,父进程回收子进程时输出子进程的PID.

代码

#include<unistd.h>
#include<sys/types.h>
#include<sys/wait.h>
#include<stdlib.h>
#include<stdio.h>
const int WNOHNAG = 1;
const int threadCount = 10;
int main(int argc,char *argv[]) {
  // 生成10个子进程并利用waitpid回收,父进程回收子进程时输出子进程的PID.
  pid_t *pid = (pid_t*)malloc(sizeof(int)*threadCount);
  int i = 0;
  do{
    *(pid + i) = fork();
    if (*(pid + i) == 0) {//子进程
      printf("child[%d] created...(sleep 1s)\n",i);
      sleep(5-0.5*i);//测试waitpid是否生效
      break;
    }else if (*(pid + i)>0) {//父进程
      //
    } else {
      printf("Error\n");
      break;
    }
    i ++ ;
  } while (i<threadCount);
  //父进程
  if (i == threadCount) {
    int threadCountTemp = threadCount;
    printf("Parent waiting child...\n");
    while (threadCountTemp>0) {
      for(int j = 0;j<threadCount;j ++ ) {
        if (waitpid(*(pid + j),NULL,WNOHNAG)>0) {
          printf(">>child[%d] done,pid = %d\n",j,*(pid + j));
          threadCountTemp--;
        }
        sleep(0.001);
      }
    }
  }
  return 0;
}

编译脚本

#!/bin/sh
echo "\n--------- project $1 start ---------\n"
g++ -o $1 $1.c `mysql_config --cflags --libs`;./$1;echo "\n--------- project $1 end ---------\n"
rm $1;

运行结果

LNP作业4

统计已经安装好虚拟机的人数,安装好的提交作业,没装好不要提交,本周末之前提交

  • 答:
    • 已安装

LNP作业5

写一对基于UDP的服务器客户端程序。

客户端可以用命令./udpclient <IP> <number>执行 并在标准输出上输出number + 1 服务器在约定的本地套接字上等待客户端消息,加1操作后发送给客户端 先来先服务 直接在作业中粘贴两端代码,include部分只粘贴一次

代码

服务端

#include <stdio.h>
  #include <unistd.h>
  #include <sys/types.h>
  #include <sys/socket.h>
  #include <string.h>
  #include <arpa/inet.h>
  #include <stdlib.h>

  typedef struct sockaddr* saddrp;

  const char* BIND_SERVER_IP = "127.0.0.1";
  const int BIND_PORT = 30303;

  int main(int argc, char *argv[])
  {
    // 创建fd
    int sockfd = socket(AF_INET,SOCK_DGRAM,0);
    if (sockfd < 0){
        perror("创建文件描述符失败");
        return -1;
    }
    // 配置
    struct sockaddr_in addr = {};
    addr.sin_family = AF_INET; // IPV4
    addr.sin_addr.s_addr = inet_addr(BIND_SERVER_IP);// IP
    addr.sin_port = htons(BIND_PORT); // PORT
    // 绑定
    int res = bind(sockfd,(saddrp)&addr,sizeof(addr));
    if (res < 0){
        perror("监听失败");
        return -1;
    }
    struct sockaddr_in src_addr ={};
    socklen_t addr_len = sizeof(struct sockaddr_in);
    while(1){
      char buf[255] = {};
      //接收数据和来源的ip地址
      recvfrom(sockfd,buf,sizeof(buf),0,(saddrp)&src_addr,&addr_len);
      printf("Recv:%s\n", buf);
      // char*转int
      int recv = atoi(buf);
      // +1操作
      recv++;
      // int转char*
      sprintf(buf, "%d", recv);
      // 发送数据给目标地址
      sendto(sockfd,buf,strlen(buf)+1,0,(saddrp)&src_addr,addr_len);
    }  
    //关闭socket对象
    close(sockfd);
    return 0;
  }

服务端编译脚本

#!/bin/sh
  echo "\n--------- project $1 start ---------\n"
  g++ -o $1 $1.c `mysql_config --cflags --libs`;./$1;echo "\n--------- project $1 end ---------\n"
  rm $1;

客户端

#include <stdio.h>
  #include <sys/types.h>
  #include <sys/socket.h>
  #include <string.h>
  #include <arpa/inet.h>
  #include <unistd.h>
  typedef struct sockaddr* saddrp;

  int main(int argc, char *argv[])
  {
    if(argc < 3) {
      perror("参数不足");
      return -1;
    }
    char* ip = argv[1];
    char* buf = argv[2];
    int sockfd = socket(AF_INET,SOCK_DGRAM,0);
    if (sockfd < 0){
        perror("创建文件描述符失败");
        return -1;
    }
    struct sockaddr_in addr = {};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(30303);
    addr.sin_addr.s_addr = inet_addr(ip);

    socklen_t addr_len = sizeof(struct sockaddr_in);
    while(1)
    {
        char recv_buf[255] = {};
        sendto(sockfd,buf,strlen(buf)+1,0,(saddrp)&addr,sizeof(addr));
        recvfrom(sockfd,recv_buf,sizeof(recv_buf),0,(saddrp)&addr,&addr_len);
        printf("Recv:%s\n",recv_buf);
        // 一次就结束
        break;
    }
    close(sockfd);
    return 0;
  }

客户端编译脚本

#!/bin/sh
  echo "\n--------- project $1 start ---------\n"
  g++ -o $1 $1.c `mysql_config --cflags --libs`;./$1 $2 $3;echo "\n--------- project $1 end ---------\n"
  rm $1;

运行结果

3.10 并发服务器实验

2020年3月10号 星期二 15:16

在3月8日课程内容的基础上修改并发服务器和对应客户端的代码,使得服务端可以返回客户端发送的两个整数之和。服务器提供并发服务。这次作业的代码也是实验二的代码。

服务端代码

#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <arpa/inet.h>
#include <stdlib.h>

typedef struct sockaddr* saddrp;

const char* BIND_SERVER_IP = "127.0.0.1";
const int BIND_PORT = 30303;

int main(int argc, char *argv[])
{
  // 创建fd
  int sockfd = socket(AF_INET,SOCK_DGRAM,0);
  if (sockfd < 0){
      perror("创建文件描述符失败");
      return -1;
  }
  // 配置
  struct sockaddr_in addr = {};
  addr.sin_family = AF_INET; // IPV4
  addr.sin_addr.s_addr = inet_addr(BIND_SERVER_IP);// IP
  addr.sin_port = htons(BIND_PORT); // PORT
  // 绑定
  int res = bind(sockfd,(saddrp)&addr,sizeof(addr));
  if (res < 0){
      perror("监听失败");
      return -1;
  }
  struct sockaddr_in src_addr ={};
  socklen_t addr_len = sizeof(struct sockaddr_in);
  int counter = 0;
  int a[2] = {0,0};
  while(1){
    counter++;
    char buf[255] = {};
    //接收数据和来源的ip地址
    recvfrom(sockfd,buf,sizeof(buf),0,(saddrp)&src_addr,&addr_len);
    printf("Recv:%s\n", buf);
    // char*转int
    int recv = atoi(buf);
    a[counter-1] = recv;
    if (counter == 2) {
      int ret_res = a[0] + a[1];
      printf("Return:%d + %d = %d\n", a[0], a[1], ret_res);
      counter = 0;
      // int转char*
      sprintf(buf, "%d", ret_res);
      // 发送数据给目标地址
      sendto(sockfd,buf,strlen(buf)+1,0,(saddrp)&src_addr,addr_len);
    }
  }
  //关闭socket对象
  close(sockfd);
  return 0;
}

客户端代码

#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
typedef struct sockaddr* saddrp;

const char* BIND_SERVER_IP = "127.0.0.1";
const int BIND_PORT = 30303;

int main(int argc, char *argv[])
{
  char buf[] = "1";

  int sockfd = socket(AF_INET,SOCK_DGRAM,0);
  if (sockfd < 0){
      perror("创建文件描述符失败");
      return -1;
  }
  struct sockaddr_in addr = {};
  addr.sin_family = AF_INET;
  addr.sin_port = htons(BIND_PORT);
  addr.sin_addr.s_addr = inet_addr(BIND_SERVER_IP);

  socklen_t addr_len = sizeof(struct sockaddr_in);
  while(1)
  {
      char recv_buf[255] = {};
      sendto(sockfd,buf,strlen(buf)+1,0,(saddrp)&addr,sizeof(addr));
      sendto(sockfd,buf,strlen(buf)+1,0,(saddrp)&addr,sizeof(addr));
      recvfrom(sockfd,recv_buf,sizeof(recv_buf),0,(saddrp)&addr,&addr_len);
      printf("Recv:%s\n",recv_buf);
      // 一次就结束
      break;
  }
  close(sockfd);
  return 0;
}

运行结果

3.10 实现popen

2020年3月10号 星期二 15:21

实验一. 实现程序mypopen

以此shell命令为例 #./mypopen ls -l 父进程首先创建pipe管道; 父进程创建子进程; 子进程执行ls -l; 子进程的输出经管道传给父进程; 父进程,将子进程的信息输出在标准输出上

代码

#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/wait.h>  
#include <limits.h>
#include <errno.h>  
#include <malloc.h>

/* 模拟popen */
FILE *mypopen(char* cmd[], char type);

/* 调试 */
int main(int argc, char* argv[]){
  if (argc < 2) {
    printf("%s", "请输入指令");
    exit(1);
  }
  // 最多支持10个参数
  // 构造新数组,提取第二个及后面的参数赋给新数组(cmd),并在令新数组最后一个数组值为NULL
  printf("You Exe Cmd: ");
  char** cmd = (char**)malloc(sizeof(char*)*10);
  for (int i = 1; i < argc; i++) {
    cmd[i - 1] = argv[i];
    printf("%s ", argv[i]);
  }
  printf("\n");
  cmd[argc] = NULL;
  // 非阻塞执行指令
  FILE* fp = mypopen(cmd, 'r');
  // 输出结果
  if (fp == NULL){
    printf("%s", "无效的指令");
    exit(1);
  }
  char c;
  c = fgetc(fp);
  while (c != EOF) {
    putchar(c);
    c = fgetc(fp);
  }
  return 0;
}

/* 模拟popen */
FILE *mypopen(char* cmd[], char type) {  
  int fd[2]; // 0读 1写
  int pid_t; // 进程描述符                 

  // 其他参数为非法
  if(type !='r' && type != 'w'){  
    printf("myopen() illegal param type, must be 'r' or 'w'/n");  
    return NULL;  
  }  

  // 建立管道  
  if(pipe(fd) < 0) {
    printf("myopen() pipe create error/n");  
    return NULL;  
  }

  // 建立子进程  
  pid_t = fork();             

  if(pid_t < 0) {
    return NULL;  
  }else if(pid_t == 0) {
    // 子进程执行命令
    if(type == 'r'){  
      close(fd[0]); // 关闭读管道
      // dup2(fd[1],STDOUT_FILENO);
      dup(fd[1]);  // 将标准输出重定向到管道写端
      close(fd[1]); // 使用完毕,关闭管道
    }else{  
      close(fd[1]); // 关闭写管道,父进程来以子写
      // dup2(fd[0],STDIN_FILENO);
      dup(fd[0]); // 将标准输出重定向到管道读端
      close(fd[0]); // 使用完毕,关闭管道
    }
    if(execvp(cmd[0], cmd) < 0)          //用exec族函数执行命令  
      return NULL;      
  } else {
    wait(0); // 父进程等待子进程返回  
    if(type=='r'){
      close(fd[1]);  
      return fdopen(fd[0],"r"); // 程序需要返回的参数是文件指针,因此需要用 fdopen 函数将描述符打开,其返回值为相应的文件指针   
    }else{  
      close(fd[0]);  
      return fdopen(fd[1],"w");  
    }
  }
}

编译脚本

#!/bin/sh
echo "\n--------- project $1 start ---------\n"
g++ -o $1 $1.c `mysql_config --cflags --libs`;./$@;echo "\n--------- project $1 end ---------\n"
rm $1;

运行结果

  • mypopen源程序的设计中,规定了参数不得超过10个

指令(包含指令名称)参数为1时

指令(包含指令名称)参数为2时

指令(包含指令名称)参数为n(n<=10)时

dup与dup2

  • APUE和man文档都用一句话简明的说出了这两个函数的作用:复制一个现存的文件描述符。
#include <unistd.h>
int dup(int oldfd);
int dup2(int oldfd, int newfd);
  • 当调用dup函数时,内核在进程中创建一个新的文件描述符,此描述符是当前可用文件描述符的最小数值,这个文件描述符指向oldfd所拥有的文件表项。
  • dup2和dup的区别就是可以用newfd参数指定新描述符的数值,如果newfd已经打开,则先将其关闭。如果newfd等于oldfd,则dup2返回newfd, 而不关闭它。dup2函数返回的新文件描述符同样与参数oldfd共享同一文件表项。
  • APUE用另外一个种方法说明了这个问题:
    • 实际上,调用dup(oldfd)等效于,fcntl(oldfd, F_DUPFD, 0);
    • 而调用dup2(oldfd, newfd)等效于,close(oldfd);fcntl(oldfd, F_DUPFD, newfd);

参考资料

3.19 select

2020年3月19号 星期四 11:04

请实现一个基于select函数的程序:mycp,其使用方式为:

./mycp filename1 filename2 运行结果是filename2是被复制的filename1,两者内容相同。 文件操作相关调用open, read, write,close可查阅Linux课程教材 注意因为要使用select,所以不要用文件流指针处理文件。

代码

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/select.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

int main(int argc, char* argv[]) {

  if (argc < 3) {
    printf("mycp require two params\n");
    return 0;
  }

  // const char * pathname = "./test";
  // const char * pathname_cp = "./testcp";
  const char * pathname = argv[1];
  const char * pathname_cp = argv[2];

  if (access(pathname, R_OK) < 0){
    printf("no access to read / no such file\n");
    return 0;
  }

  int fd, fd_cp;
  char buf[1]; // 一个一个的读,只要读到\0那么就说明结束了
  int ret, ret_cp, sret;

  fd = open(pathname, O_RDONLY);
  fd_cp = open(pathname_cp, O_WRONLY|O_CREAT);

  fd_set readfds;

  FD_ZERO(&readfds);
  FD_SET(fd, &readfds); // 加入到轮询列表

  while(1) {

    // 阻塞
    sret = select(fd+1, &readfds, NULL, NULL, 0); // &timeout

    if (sret == 0 || sret == -1) {
      printf("sret = %d\n", sret);
      printf("    error\n");
      break;
    } else {
      // printf("sret = %d\n", sret);
      memset((void *) buf, 0, 1);
      ret = read(fd, (void *) buf, 1);
      if (ret != -1) {
        if (buf[0] == '\0') { // 停止轮询条件
          break;
        } else {
          ret_cp = write(fd_cp, (void *) buf, 1);
          // printf("ret_cp = %d\n", ret_cp);
        }
        if (ret_cp == -1) {
          break;
        }
      } else {
        printf("ret = %d\n", ret);
        printf("    error\n");
        break;
      }
    }
  }
  return 0;
}

调试

  • 输入参数错误时

  • 输入的文件不存在/当前用户无读权限

  • 测试复制test文件

参考资料

测试

Test

epoll client

2020年3月23号 星期一 09:07

请利用epoll函数改写基于TCP的回音客户端,注意使用边缘触发模式,管理标准输入和套接字描述符。回音客户端和服务端的代码在教材源代码包中,本次作业仅需修改客户端,基于epoll的服务端将作为第三次实验的内容。

客户端代码(使用epoll监听文件描述符)

#include <stdio.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h> //close()
#include <string.h> //strcmp()等字符串操作函数
#include <stdlib.h> //atoi() 字符串转int
#include <sys/epoll.h>

const char* BIND_SERVER_IP = "127.0.0.1";
const int BIND_PORT = 30303;

int main(int argc, char *argv[]) {

  // 1 创建tcp通信socket
  int socket_fd = socket(AF_INET, SOCK_STREAM, 0);
  if(socket_fd == -1){
    perror("socket failed!\n");
  }

  // 2 连接服务器
  struct sockaddr_in server_addr = {0};//服务器的地址信息
  server_addr.sin_family = AF_INET;//IPv4协议
  server_addr.sin_port = htons(BIND_PORT);//服务器端口号
  server_addr.sin_addr.s_addr = inet_addr(BIND_SERVER_IP);//设置服务器IP
  int ret = connect(socket_fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
  if(ret == -1){
    perror("connect failed!\n");
  } else {
    printf("connect server successful!\n");
  }

  // 创建epoll监听连接TCP的文件描述符是否可读
  int epfd, nfds;
  struct epoll_event event, events[3]; // 最多存三个触发,本例中实际上只会占用一个
  epfd = epoll_create(1);
  event.data.fd = socket_fd; // 监听连接服务器的TCP文件描述符
  event.events = EPOLLIN | EPOLLET; // 是否可都 + 边缘触发(描述符由不可读变为可读,只通知一次)
  epoll_ctl(epfd, EPOLL_CTL_ADD, socket_fd, &event); // 注册

    //3 循环发送消息、接收消息
  while(1){
    char buf[1024] = {0};
    printf("say something:");
    // scanf("%s",buf); // 不识别空格
    // gets(buf); // 不安全
    scanf("%[^\n]%*c", buf); // 识别空格
    write(socket_fd, buf, strlen(buf)); //发送消息
    memset(buf, 0, sizeof(buf));
    // read(socket_fd, buf, sizeof(buf)); //阻塞,等待客户端发来消息
    nfds = epoll_wait(epfd, events, 3, -1); // 非阻塞监听回信
    int i;
    for (i = 0; i < nfds; ++i) {
      if (events[i].data.fd == socket_fd) {
        // printf("hello world\n");
        read(socket_fd, buf, sizeof(buf));
        printf("Receive: %s\n", buf);
      }
    }
    if(strcmp(buf, "exit") == 0){
      break;//退出循环
    }
  }

  //4 关闭socket
  close(socket_fd);

  return 0;
}

附:服务端代码(单线程服务端)

  • 服务端的作用是把客户端发来的消息再发回去,实现“回音”效果
#include <stdio.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h> //close()
#include <string.h> //strcmp()等字符串操作函数
#include <stdlib.h> //atoi() 字符串转int

// const char* BIND_SERVER_IP = "127.0.0.1";
const int BIND_PORT = 30303;

int main(int argc, char *argv[]){

  //1 创建tcp通信socket
  int socket_fd = socket(AF_INET, SOCK_STREAM, 0);
  if(socket_fd == -1){
    perror("创建tcp通信socket失败!\n");
    return -1;
  }

  //2 绑定socket地址
  struct sockaddr_in server_addr = {0};//存放地址信息
  server_addr.sin_family = AF_INET;//AF_INET->IPv4  
  server_addr.sin_port = htons(BIND_PORT);//端口号
  server_addr.sin_addr.s_addr = INADDR_ANY; //让系统检测本地网卡,自动绑定本地IP
  int ret = bind(socket_fd, (struct sockaddr *) &server_addr, sizeof(server_addr));
  if(ret == -1){
    perror("bind failed!\n");
    return -1;
  }

  //3 设置监听队列,设置为可以接受5个客户端连接
  ret = listen(socket_fd, 5);
  if(ret == -1){
    perror("listen falied!\n");
  }

  printf("server is running!\n");

  struct sockaddr_in client_addr = {0};//用来存放客户端的地址信息
  socklen_t len = sizeof(client_addr);
  int new_socket_fd = -1;//存放与客户端的通信socket

  //4 等待客户端连接
  new_socket_fd = accept( socket_fd, (struct sockaddr *)&client_addr, &len);
  if(new_socket_fd == -1){
    perror("accpet error!\n");
  } else {
    printf("IP:%s, PORT:%d [connected]\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));
  }

    //循环接收信息
  while(1){
    char buf[1024] = {0};
    read(new_socket_fd, buf, sizeof(buf)); //阻塞,,等待客户端发来消息
    write(new_socket_fd, buf, strlen(buf)); //发送消息
    printf("receive msg:%s\n", buf); //打印消息
    if(strcmp(buf, "exit") == 0){
      break; //退出循环
    }
  }

  //5 关闭socket
  close(new_socket_fd);
  close(socket_fd);

  return 0;
}

编译脚本

#!/bin/sh
echo "\n--------- project $1 start ---------\n"
g++ -o $1 $1.c;./$@;echo "\n--------- project $1 end ---------\n"
rm $1;

运行

  • 客户端

  • 服务端

相关知识点

epoll机制: epoll_create、epoll_ctl、epoll_wait

  • 引入sys/epoll.h依赖
#include <sys/epoll.h>

1. 创建epoll句柄

int epfd = epoll_create(int size);

size用来告诉内核这个监听的数目一共有多大

2. 将被监听的描述符添加到epoll句柄或从epool句柄中删除或者对监听事件进行修改

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

该函数用于控制某个epoll文件描述符上的事件,可以注册事件,修改事件,删除事件。

  • 参数
    • epfd:由 epoll_create 生成的epoll专用的文件描述符;
    • op:要进行的操作例如注册事件,可能的取值EPOLL_CTL_ADD 注册、EPOLL_CTL_MOD 修 改、EPOLL_CTL_DEL 删除
    • fd:关联的文件描述符;
    • event:指向epoll_event的指针;

第一个参数是epoll_create()的返回值

第二个参数表示动作,用三个宏来表示: ||| |-|-| |EPOLL_CTL_ADD|注册新的fd到epfd中| |EPOLL_CTL_MOD|修改已经注册的fd的监听事件| |EPOLL_CTL_DEL|从epfd中删除一个fd|

第三个参数是需要监听的fd

第四个参数是告诉内核需要监听什么事件,structepoll_event结构如下:

typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;

struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};

events可以是以下几个宏的集合: ||| |-|-| |EPOLLIN|触发该事件,表示对应的文件描述符上有可读数据。(包括对端SOCKET正常关闭);| |EPOLLOUT|触发该事件,表示对应的文件描述符上可以写数据;| |EPOLLPRI|表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);| |EPOLLERR|表示对应的文件描述符发生错误;| |EPOLLHUP|表示对应的文件描述符被挂断;| |EPOLLET|将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的;| |EPOLLONESHOT|只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。|

3.等待事件触发,当超过timeout还没有事件触发时,就超时。

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

该函数用于轮询I/O事件的发生,即等待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大(数组成员的个数),这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1为阻塞)。

该函数返回需要处理的事件数目,如返回0表示已超时。

返回的事件集合在events数组中,数组中实际存放的成员个数是函数的返回值。返回0表示已经超时。

  • 参数:

    • epfd:由epoll_create 生成的epoll专用的文件描述符;
    • epoll_event:用于回传代处理事件的数组;
    • maxevents:每次能处理的事件数;
    • timeout: 等待I/O事件发生的超时值。-1相当于阻塞,0相当于非阻塞。一般用-1即可
  • 返回发生事件数。

4. ET与LT

EPOLL事件有两种模型:

  • ET:边缘触发模式
  • LT:水平触发模式
Edge Triggered(ET) 高速工作方式,错误率比较大,只支持no_block socket (非阻塞socket)
LevelTriggered(LT) 缺省工作方式,即默认的工作方式,支持blocksocket和no_blocksocket,错误率比较小。

水平触发

1. 对于读操作

只要缓冲内容不为空,LT模式返回读就绪。

2. 对于写操作

只要缓冲区还不满,LT模式会返回写就绪。

边缘触发

1. 对于读操作

(1)当缓冲区由不可读变为可读的时候,即缓冲区由空变为不空的时候。

(2)当有新数据到达时,即缓冲区中的待读数据变多的时候。

(3)当缓冲区有数据可读,且应用进程对相应的描述符进行EPOLL_CTL_MOD修改EPOLLIN事件时。

2. 对于写操作
  • (1)当缓冲区由不可写变为可写时。
  • (2)当有旧数据被发送走,即缓冲区中的内容变少的时候。
  • (3)当缓冲区有空间可写,且应用进程对相应的描述符进行EPOLL_CTL_MOD修改EPOLLOUT事件时。

参考资料

Producer vs. Consumer --Circular Queue

题量: 1 满分: 100 分 创建者:李沁 截止时间:2020-04-09 10:00 作业改写成环形缓存队列

分析

  • [x] 所给的代码截图采用生产者-缓存Quene-消费者模式在多线程并发场景下处理数据。
  • [x] 生产者一个线程,消费者一个线程,共用缓存队列。
  • [x] 生产者负责制造(接收)待处理数据,向缓存中添加数据。消费者负责处理待处理数据,从缓存中取出数据并销毁或标记为可用状态。两个线程相互独立,仅在操作缓存队列时需要相互避让。
  • 线程之间通过信号量进行避让操作,避免同时操作缓存引起脏数据。
  • [x] 生产者线程、消费者线程、缓存队列三者相辅相成,构成有机整体,可以异步处理多线程并发问题。

改用环形队列作为队列缓存的数据结构

  • [x] 代码参考了老师给的代码思路,自己重写模拟生产者消费者模型。
  • [x] 改用环形队列作为缓存(RingBuffer)。
  • [x] RingBuffer采用数组实现,最大可提供得并发访问由MAX_COUNT决定。
  • [x] 模型通过异步向数据库插入学生信息(姓名、学号、班级)。向数据库插入数据通过线程延迟10ms模拟
  • [x] 公共变量采用pthread的互斥锁保证数据完整。
  • [x] 实现环形缓存的关键函数:getRingBufferIndex(requestID)
  • [x] 如果ringBuffer容量已满,会直接丢弃请求
  • [x] thread3, thread4相关代码默认注释状态,用来测试连续两次有较长时间间隔的并发请求数 > 缓存最大容量时的情况。

代码1-定义环形缓存ringBuffer

#include <stdio.h>
#include <stdlib.h>

#ifndef ringBuffer_h
#define ringBuffer_h

// insert into db
// 待处理的数据格式
typedef struct {
  char *stu_name;  // 姓名
  char *stu_stuID; // 学号
  int age;     // 年龄
} StuInfo;

// ringBuffer item
// 环形队列存储单元
typedef struct RingBufferItem {
  StuInfo stuInfo; // 学生数据
  int enable;      // 缓存状态(对生产者来说),0可用,1有未处理数据
} *RingBuffer;

/* 创建环形队列 */
RingBuffer init_ringBuffer(int size) {
  RingBuffer ringBuffer;
  if (size > 0) {
    // printf("struct RingBufferItem size is : %ld\n", sizeof(struct RingBufferItem));
    ringBuffer = (RingBuffer)malloc(sizeof(RingBufferItem) * size);
    for (int i = 0; i < size; i++) {
      (ringBuffer + i)->enable = 1;
    }
    return ringBuffer;
  } else {
    return NULL;
  }
}

#endif

代码2-生产者-消费者运行模型

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include "ringBuffer.h"

/* 缓存容量*/
#define MAX_COUNT 5
/* 测试并发量 */
#define TEST_COUNT 10;
/* 定义环形数组 */
RingBuffer ringBuffer;
/* 定义requestID(自增ID) */
int requestID; // 生产者请求序号
/* 定义互斥锁 */
pthread_mutex_t mutex_for_requestID;
pthread_mutex_t mutex_for_ringBuffer;
/* 为消费者定义已处理请求数,作为消费者循环处理数据的判断 */
int requestDone; // 消费者处理序号

/* --- methods --- */

/* 计算环形队列的索引号 */
int getRingBufferIndex(int request_index);
/* 生产者 */
void *eventFactory_start(void *myvar);
/* 消费者 */
void *eventHandler_start(void *myvar);
/* 加锁 */
void my_pthread_mutex_lock(pthread_mutex_t *mutex);


int main(int argc, char *argv[]){

  // printf("%d\n", getRingBufferIndex(requestID++));
  // exit(0);

  // 初始化锁
  pthread_mutex_init(&mutex_for_requestID, NULL);
  pthread_mutex_init(&mutex_for_ringBuffer, NULL);
  // 初始化环形队列
  // 浅拷贝返回值
  ringBuffer = init_ringBuffer(MAX_COUNT);
  requestID = -1;
  requestDone = -1;
  // for(int i = 0; i < MAX_COUNT; i++) {
  //   printf("buffer[%d].enable = %d\n", i, (ringBuffer + i)->enable);
  // }

  pthread_t thread1, thread2;
  // pthread_t	thread3, thread4;

  int ret1, ret2;

  ret1 = pthread_create(&thread1, NULL, eventFactory_start, NULL);
  ret2 = pthread_create(&thread2, NULL, eventHandler_start, NULL);

  pthread_join(thread1, NULL);
  pthread_join(thread2, NULL);

  // sleep(1);
  // ret1 = pthread_create(&thread3, NULL, eventFactory_start, NULL);
  // ret2 = pthread_create(&thread4, NULL, eventHandler_start, NULL);
  // pthread_join(thread3, NULL);
  // pthread_join(thread4, NULL);

  printf("producer thread ret1 = %d\n", ret1);
  printf("consumer thread ret2 = %d\n", ret2);

  return 0;
}

/* 计算环形队列的索引号 */
int getRingBufferIndex(int request_index) {
  return request_index % MAX_COUNT;
}

/* 加锁 */
void my_pthread_mutex_lock(pthread_mutex_t *mutex) {
  if (pthread_mutex_lock(mutex) != 0){
    printf("lock error!\n");
    exit(0);
  }
}

/* 生产者 */
void *eventFactory_start(void *myvar) {
  // 并发放入1000个测试数据
  for(int i = 0; i < 15; i++) {
    // --- mock数据生成开始 ---
    char *stu_name = (char*)malloc(sizeof(char) * 64);
    char *stu_stuID = (char*)malloc(sizeof(char) * 64);
    sprintf(stu_name, "姓名[%d]", i);
    sprintf(stu_stuID, "ID[%d]", i);
    // printf("姓名 = %s\n", stu_name);
    // printf("ID = %s\n", stu_stuID);
    int age = i;
    // --- mock数据生成结束 ---
    // requestID 加锁
    my_pthread_mutex_lock(&mutex_for_requestID);
    requestID++;
    int current_requestID = requestID;
    // ringBuffer 加锁
    my_pthread_mutex_lock(&mutex_for_ringBuffer);
    // printf("current_requestID = %d\n", current_requestID);
    int ringBufferIndex = getRingBufferIndex(current_requestID);
    if((ringBuffer + ringBufferIndex)->enable == 0) {
      printf("☒ 并发数达到最大,已丢弃请求学生(%s)创建的请求\n", stu_name);
      requestID--;
      // requestID 解锁
      pthread_mutex_unlock(&mutex_for_requestID);
    } else {
      // requestID 解锁
      pthread_mutex_unlock(&mutex_for_requestID);
      // 向缓存添加模拟数据
      (ringBuffer + ringBufferIndex)->stuInfo.stu_name = stu_name;
      (ringBuffer + ringBufferIndex)->stuInfo.stu_stuID = stu_stuID;
      (ringBuffer + ringBufferIndex)->stuInfo.age = age;
      (ringBuffer + ringBufferIndex)->enable = 0;
      printf("→ 学生(%s)已创建任务\n", (ringBuffer + ringBufferIndex)->stuInfo.stu_name);
    }
    // ringBuffer 解锁
    pthread_mutex_unlock(&mutex_for_ringBuffer);
  }
}

/* 消费者 */
void *eventHandler_start(void *myvar) {
  int timeout_count = 10;
  int current_timeout_count = 0;
  while(1) {
    // requestID 加锁
    my_pthread_mutex_lock(&mutex_for_requestID);
    if (requestDone < requestID) { // 读requestID暂不加锁
      // requestID 解锁
      pthread_mutex_unlock(&mutex_for_requestID);
      // 有新请求处理
      requestDone++;
      int ringBufferIndex = getRingBufferIndex(requestDone);
      // ringBuffer 加锁
      my_pthread_mutex_lock(&mutex_for_ringBuffer);
      // 数据处理,假装 insert into table...
      sleep(0.001);
      printf("☑ 学生(%s)已插入数据表!\n", (ringBuffer + ringBufferIndex)->stuInfo.stu_name);
      (ringBuffer + ringBufferIndex)->enable = 1;
      // ringBuffer 解锁
      pthread_mutex_unlock(&mutex_for_ringBuffer);
    } else {
      // requestID 解锁
      pthread_mutex_unlock(&mutex_for_requestID);
      // 消费者休息50ms
      sleep(0.05);
      // current_timeout_count++;
      // if (timeout_count == current_timeout_count) {
      //   break;
      // }
    }
  }
}

运行结果

  • 当并发请求数 <= 缓存最大容量时(模拟某一时间点小股并发)

  • 当并发请求数 > 缓存最大容量时(模拟某一时间点大量并发)

  • 当连续两次有较长时间间隔(相隔1s)的并发请求数 > 缓存最大容量,时(模拟两个较长时间间隔的大量并发,用来测试环形缓存是否为环状、同一块缓存是否复用)

参考资料

转载

转载自https://blog.webpro.ltd

Copyright © 2020 ZGGSONG