博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【队列源码研究】消息队列beanstalkd源码详解
阅读量:6569 次
发布时间:2019-06-24

本文共 23340 字,大约阅读时间需要 77 分钟。

顺风车运营研发团队 李乐

1.消息队列简介

计算机软件发展的一个重要目标是降低软件耦合性;

网站架构中,系统解耦合的重要手段就是异步,业务之间的消息传递不是同步调用,而是将一个业务操作分为多个阶段,每个阶段之间通过共享数据的方式异步执行;

在分布式系统中,多个服务器集群通过分布式消息队列实现异步;分布式消息队列可以看作是内存队列的分布式部署;

分布式消息队列架构图通常如下所示:

clipboard.png

消息队列是典型的生产者消费者模式,两者不存在直接调用,只要保持数据结构不变,彼此功能实现可以随意改变而不互相影响;异步消息队列还有以下特点:

  • 提高系统可用性:消费者服务器发生故障时,生产者服务器可以继续处理业务请求,系统整体表现无故障;此时数据会在消息队列服务器堆积,待消费者服务器恢复后,可以继续处理消息队列中的数据;
  • 加快网站相应速度:业务处理前端的生产者服务器在处理完业务请求后,将数据写入消息队列,不需要等待消费者服务器处理就可以返回,减少响应延迟;
  • 消除并发访问高峰:用户访问是随机的,存在高峰和低谷;可以使用消息队列将突然增加的访问请求数据放入消息队列中,等待消费者服务器依次处理;

消费者消费消息时,通常有两种模式可以选择:拉模型与推模型。

  • 拉模型是由消息的消费者发起的,主动权把握在消费者手中,它会根据自己的情况对生产者发起调用;
  • 推模式消费者只会被动接受消息,消息队列一旦发现消息进入,就会通知消费者执行对消息的处理;

2.beanstalkd基本知识

2.1beanstalkd简介

beanstalkd是一个轻量级的消息队列;主要有一下特点:

  • 拉模式,消费者需要主动从服务器拉取消息数据;
  • tube:类似于消息主题topic,一个beanstalkd中可以支持多个tube,每个tube都有自己的producer和consumer;多个生产者可以往同一个tube生产job,多个消费者也能监听同一个tube获取job;
  • job:代替了传统的message,与消息最大的区别是,job有多种状态;
  • conn:代表一个客户端链接;
  • 优先级:job可以有0~2^32个优先级,0代表最高优先级,beanstalkd使用堆处理job的优先级排序,因此reserve命令的时间复杂度是O(logN);
  • 延时:生产者发布任务时可以指定延时,到达延迟时间后,job才能被消费者消费;
  • 超时机制:消费者从beanstalkd获取一个job后,必须在预设的 TTR (time-to-run) 时间内处理完任务,并发送 delete / release/ bury 命令改变任务状态;否则 Beanstalkd 会认为消息消费失败,重置job状态,使其可以被其他消费者消费。如果消费者预计在 TTR (time-to-run) 时间内无法完成任务, 也可以发送 touch 命令, 它的作用是让 Beanstalkd 从重新计时TTR;
  • 暂停:pause命令可以暂停当前tube,暂停时期内所有job都不能够被消费者消费;

job有一下几种状态:

  • READY,需要立即处理的任务,当延时 (DELAYED) 任务到期后会自动成为当前任务;
  • DELAYED,延迟执行的任务,;
  • RESERVED,已经被消费者获取, 正在执行的任务,Beanstalkd 负责检查任务是否在 TTR(time-to-run) 内完成;
  • BURIED,保留的任务: 任务不会被执行,也不会消失,除非有人将他修改为其他状态;
  • DELETED,消息被彻底删除。Beanstalkd 不再维持这些消息。

状态之间的转移图如下所示:

clipboard.png

思考:

  • beanstalkd如何维护job的状态?tube有3个集合delay、ready和 buried分别存放对应状态的job,conn的reserved_jobs集合存储状态为reserved的job(消费者获取一个job后,job的状态才会改变为reserved,因此这个集合由conn维护);
  • delay状态的job怎么修改为ready?delay集合是一个按照时间排序的最小堆,beanstalkd不定时循环从堆根节点获取job,校验是否需要改变其状态未ready;
  • 如何实现优先级?只有ready状态的job才能被消费者获取消费,ready集合是一个按照优先级排序的最小堆,根节点始终是优先级最高得job;
  • 拉模式实现?消费者使用reserve命令获取job,beanstalkd检查消费者监听的所有tube,查找到ready的job即返回,否则阻塞消费者知道有ready状态的job产生为止;

2.2beanstalkd命令

beanstalkd支持以下命令:

clipboard.png

3.beanstalkd源码分析

3.1数据结构

3.1.1基础结构体

//堆struct Heap {    int     cap; //堆容量    int     len; //堆元素数目    void    **data; //元素数组    Less    less;   //元素比较的函数指针    Record  rec;   //函数指针,将元素插入堆时,会调用此函数}; //函数指针定义:typedef int(*Less)(void*, void*);typedef void(*Record)(void*, int); //API:元素的插入与删除void * heapremove(Heap *h, int k);int heapinsert(Heap *h, void *x)
//集合struct ms {    size_t used, cap, last; //cap为当前集合容量;used集合中元素数目;last上次访问的集合元素的位置    void **items; //存储元素的数组    ms_event_fn oninsert, onremove; //往集合插入元素,删除元素时调用的函数}; //函数指针定义如下typedef void(*ms_event_fn)(ms a, void *item, size_t i); //APIvoid ms_init(ms a, ms_event_fn oninsert, ms_event_fn onremove);//初始化集合int ms_append(ms a, void *item) //往集合追加元素int ms_contains(ms a, void *item)//判断集合是否包含元素void * ms_take(ms a)  //获取并删除元素(会从上次访问的位置last开始查找)int ms_remove(ms a, void *item) //删除元素,从头开始查找int ms_clear(ms a) //清空集合

3.1.2 管道tube

struct tube {    uint refs; //引用计数    char name[MAX_TUBE_NAME_LEN]; //名称    Heap ready; //存储状态未ready的job,按照优先级排序    Heap delay; //存储状态未delayed的job,按照到期时间排序    struct ms waiting; //等待当前tube有job产生的消费者集合        int64 pause; //执行pause命令后,pause字段记录暂停时间    int64 deadline_at; //deadline_at记录暂停到达时间    struct job buried; //存储状态为buried的job,是一个链表};

创建tube的代码如下:

tube make_tube(const char *name){    tube t;     //底层调用malloc分配空间    t = new(struct tube);    if (!t) return NULL;     t->name[MAX_TUBE_NAME_LEN - 1] = '\0';    strncpy(t->name, name, MAX_TUBE_NAME_LEN - 1);    if (t->name[MAX_TUBE_NAME_LEN - 1] != '\0') twarnx("truncating tube name");     //设置ready与delay堆的函数指针    t->ready.less = job_pri_less;    t->delay.less = job_delay_less;    t->ready.rec = job_setheappos;    t->delay.rec = job_setheappos;    t->buried = (struct job) { };    t->buried.prev = t->buried.next = &t->buried;    ms_init(&t->waiting, NULL, NULL);     return t;}

两个堆指针函数如下:

//按照优先级比较int job_pri_less(void *ax, void *bx){    job a = ax, b = bx;    if (a->r.pri < b->r.pri) return 1;    if (a->r.pri > b->r.pri) return 0;    return a->r.id < b->r.id;}//按照过期时间比较int job_delay_less(void *ax, void *bx){    job a = ax, b = bx;    if (a->r.deadline_at < b->r.deadline_at) return 1;    if (a->r.deadline_at > b->r.deadline_at) return 0;    return a->r.id < b->r.id;}//设置每个job在堆的indexvoid job_setheappos(void *j, int pos){    ((job)j)->heap_index = pos;}

3.1.3任务job

注:job创建完成后,先会保存在全局变量all_jobs的hash表中;然后才会插入到tube的各job队列中;

struct job {    Jobrec r; // 存储job信息     tube tube; //指向其所属tube    job prev, next; //job可以组织为双向链表(buried状态的job就是链表)    job ht_next; //所有的job都存储在一个hash表中(拉链法),job的id为hash值;(tube中的job集合存储指针指向各个job)    size_t heap_index; /* where is this job in its current heap */    …………     char body[];//job的数据,柔性数组}; // job的描述信息struct Jobrec {    uint64 id;    uint32 pri;    int64  delay;    int64  ttr;    int32  body_size;    int64  created_at; //创建时间    int64  deadline_at; //延迟job的过期时间     //统计计数    uint32 reserve_ct;    uint32 timeout_ct;    uint32 release_ct;    uint32 bury_ct;    uint32 kick_ct;         byte   state;//当前状态};

3.14套接字socket

struct Socket {    int    fd;    Handle f; //socket发生事件时的处理函数    void   *x; //服务器监听的socket指向server结构体;客户端对应的socket指向conn结构体    int    added; //往epoll注册事件时,计算操作类型};

3.15服务器server

struct Server {    char *port;    char *addr;         Socket sock; //监听的socket    Heap   conns; //存储即将有事件发生的客户端;按照事件发生的时间排序的最小堆;                  //例如:当客户端获取job后,若唱过TTR时间没处理完,job会状态应重置为ready状态;                  //当客户端调用reserve获取job但当前tube没有ready状态的job时,客户端会被阻塞timeout时间;};

3.1.6客户端链接conn

struct Conn {    Server *srv; //执行服务器    Socket sock; //客户端socket    char   state; //客户端状态:等待接收命令,等待接收数据,等待回复命令,等待返回job,关闭,获取job阻塞中    char   type;  //客户端类型:生产者,消费者,获取job阻塞中    Conn   *next;    tube   use;   //当前使用的tube;put命令发布的job会插入到当前tube中    int64  tickat;      //客户端处理job的TTR到期时间;或者客户端阻塞的到期时间;用于在server的conns堆比较    int    tickpos;     // 在srv->conns堆里的位置    job    soonest_job; //所有reserve任务里到期时间最近的job    int    rw;          //当前关心的事件: 'r', 'w', or 'h'(读、写、关闭连接)    int    pending_timeout; //客户端获取job而阻塞的到期时间    char   halfclosed; //表示客户端断开连接     char cmd[LINE_BUF_SIZE]; // 输入缓冲区    int  cmd_len;    int  cmd_read;     char *reply;   //输出缓冲区    int  reply_len;    int  reply_sent;    char reply_buf[LINE_BUF_SIZE];     //put命令发布job时,从客户端读入的job    int in_job_read;    job in_job;     //待返回给客户端的job    job out_job;    int out_job_sent;     //当前客户端监听的所有tube集合    struct ms  watch;    //当前客户端的所有reserved状态的job    struct job reserved_jobs;};

3.2 服务器启动过程

3.2.1 epoll简介

epoll结构体:

typedef union epoll_data {    void *ptr;    int fd;    __uint32_t u32;    __uint64_t u64;} epoll_data_t;//保存触发事件的某个fd相关的数据 struct epoll_event {    __uint32_t events;      /* epoll event */    epoll_data_t data;      /* User data variable */};//其中events表示感兴趣的事件和被触发的事件,可能的取值为://EPOLLIN:表示对应的文件描述符可以读;//EPOLLOUT:表示对应的文件描述符可以写;//EPOLLPRI:表示对应的文件描述符有紧急的数可读;//EPOLLERR:表示对应的文件描述符发生错误;//EPOLLHUP:表示对应的文件描述符被挂断;

epoll API定义如下:

int epoll_create(int size) //生成一个epoll专用的文件描述符,其中的参数是指定生成描述符的最大范围; int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) //用于控制某个文件描述符上的事件,可以注册事件,修改事件,删除事件 int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout)  //轮询I/O事件的发生;

3.2.2 beanstalkd使用epoll

//创建epoll:epfd = epoll_create(1); //注册事件int sockwant(Socket *s, int rw){   …………   ev.events |= EPOLLRDHUP | EPOLLPRI;   ev.data.ptr = s; //注意:传入的是sokcet指针;(socket的x字段会指向server或者conn结构体,当socket对应的fd发生事件时,可以得到server或conn对象)    return epoll_ctl(epfd, op, s->fd, &ev);} //等待事件发生//返回h r w 事件类型int socknext(Socket **s, int64 timeout){    …………    r = epoll_wait(epfd, &ev, 1, (int)(timeout/1000000));    if (r == -1 && errno != EINTR) {        twarn("epoll_wait");        exit(1);    }     if (r > 0) {        *s = ev.data.ptr; //返回发生事件的socket        if (ev.events & (EPOLLHUP|EPOLLRDHUP)) {            return 'h';        } else if (ev.events & EPOLLIN) {            return 'r';        } else if (ev.events & EPOLLOUT) {            return 'w';        }    }    return 0;}

3.2.3服务器启动

int main(int argc, char **argv){         optparse(&srv, argv+1);//解析输入参数         r = make_server_socket(srv.addr, srv.port); //创建socket        prot_init(); //初始化全局tubes集合,创建名称为default的默认tube     srvserve(&srv);//启动服务器    return 0;} struct ms tubes;//全局变量 void prot_init(){    //初始化tube集合    ms_init(&tubes, NULL, NULL);     //创建默认tube;tube_find_or_make方法会先从tubes集合查找指定名称为tube,查找到直接返回;否则创建新的tube    TUBE_ASSIGN(default_tube, tube_find_or_make("default"));}  void srvserve(Server *s){    //s->sock为server监听的socket;设置其处理函数为srvaccept;    s->sock.x = s;    s->sock.f = (Handle)srvaccept;    s->conns.less = (Less)connless; //设置s->conns堆的函数指针    s->conns.rec = (Record)connrec;     r = listen(s->sock.fd, 1024); //监听         r = sockwant(&s->sock, 'r'); //注册到epoll         //开启循环    for (;;) {        //服务器有一些事件需要在特定时间执行,获得最早待执行事件的时间间隔,作为epoll_wait的等待时间;后面详细分析函数内部        period = prottick(s);         int rw = socknext(&sock, period); //epoll wait                 if (rw) {            sock->f(sock->x, rw); //调用socket的处理函数        }    }}//至此,服务器启动完毕,等待客户端链接

conns堆分析:

上面说过,conns存储即将有事件发生的客户端;按照事件发生的时间排序的最小堆;

例如:当客户端获取job后,若唱过TTR时间没处理完,job会状态应重置为ready状态;
当客户端调用reserve获取job但当前tube没有ready状态的job时,客户端会被阻塞timeout时间;

//堆节点比较的函数指针:int connless(Conn *a, Conn *b){    return a->tickat < b->tickat;}  //将客户端对象插入conns堆时,tickpos记录其插入的index(避免客户端重复插入;插入之前发现其tickpos>-1则先删除再插入)void connrec(Conn *c, int i){    c->tickpos = i;}

处理客户端链接请求:

void srvaccept(Server *s, int ev){    h_accept(s->sock.fd, ev, s);} void h_accept(const int fd, const short which, Server *s){         cfd = accept(fd, (struct sockaddr *)&addr, &addrlen);         flags = fcntl(cfd, F_GETFL, 0); //获得fd标识         r = fcntl(cfd, F_SETFL, flags | O_NONBLOCK); //设置fd非阻塞,使用epoll必须设置非阻塞,负责epoll无法同时监听多个fd         //创建conn对象;默认监听default_tube(c->watch存储所有监听的tube);默认使用default_tube(c->use)    //注意:初始化conn对象时,客户端状态为STATE_WANTCOMMAND,即等待接收客户端命令;    c = make_conn(cfd, STATE_WANTCOMMAND, default_tube, default_tube);         c->srv = s;    c->sock.x = c;    c->sock.f = (Handle)prothandle; //设置客户端处理函数    c->sock.fd = cfd;     r = sockwant(&c->sock, 'r'); //epoll注册,监听可读事件}

当客户端socket可读或可写时,会执行prothandle函数:

static void prothandle(Conn *c, int ev){    h_conn(c->sock.fd, ev, c);} static void h_conn(const int fd, const short which, Conn *c){    //客户端断开链接,标记    if (which == 'h') {        c->halfclosed = 1;    }     //客户端数据交互(根据客户端状态不同执行不同的读写操作)    conn_data(c);    //解析完命令时,执行命令    while (cmd_data_ready(c) && (c->cmd_len = cmd_len(c))) do_cmd(c);    }

3.3 服务器与客户端的数据交互

beanstalkd将客户端conn分为以下几种状态:等待接受命令,等待接收数据,等待回复数据,等待返回job等;

#define STATE_WANTCOMMAND 0#define STATE_WANTDATA 1#define STATE_SENDJOB 2#define STATE_SENDWORD 3#define STATE_WAIT 4#define STATE_BITBUCKET 5#define STATE_CLOSE 6

当客户端fd可读或者可写时,服务器根据当前客户端的状态执行不同的操作:

注意:TCP是基于流的,因此存在半包、粘包问题;即,服务器一次read的命令请求数据可能不完整,或者一次read多个命令请求的数据;

//有些状态操作已省略static void conn_data(Conn *c){         switch (c->state) {    case STATE_WANTCOMMAND:        r = read(c->sock.fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read); //读取命令到输入缓冲区cmd        c->cmd_read += r;         c->cmd_len = cmd_len(c); //定位\r\n,并返回命令请求开始位置到\r\n长度;如果没有\r\b说明命令请求不完全,返回0         if (c->cmd_len) return do_cmd(c); //如果读取完整的命令,则处理;否则意味着命令不完全,需要下次继续接收          break;         case STATE_WANTDATA: //只有当使用put命令发布任务时,才会携带数据;客户端状态才会成为STATE_WANTDATA;                         //而读取命令行时,已经携带了任务的必要参数,那时已经创建了任务,并存储在c->in_job字段        j = c->in_job;         r = read(c->sock.fd, j->body + c->in_job_read, j->r.body_size -c->in_job_read); //读取任务数据                 c->in_job_read += r; //记录任务读取了多少数据         maybe_enqueue_incoming_job(c); //函数会判断任务数据是否已经读取完全,完全则将任务写入tube的ready或delay队列;后面会将        break;    case STATE_SENDWORD: //回复客户端命令请求        r= write(c->sock.fd, c->reply + c->reply_sent, c->reply_len - c->reply_sent);                 c->reply_sent += r; //已经发送的字节数         if (c->reply_sent == c->reply_len) return reset_conn(c); //如果返回数据已经发完,则重置客户端rw,关心可读事件;否则继续待发送数据状态         break;    case STATE_SENDJOB: //待发送job        j = c->out_job;         //返回数据与job        iov[0].iov_base = (void *)(c->reply + c->reply_sent);        iov[0].iov_len = c->reply_len - c->reply_sent; /* maybe 0 */        iov[1].iov_base = j->body + c->out_job_sent;        iov[1].iov_len = j->r.body_size - c->out_job_sent;         r = writev(c->sock.fd, iov, 2);                c->reply_sent += r;        if (c->reply_sent >= c->reply_len) {            c->out_job_sent += c->reply_sent - c->reply_len;            c->reply_sent = c->reply_len;        }         if (c->out_job_sent == j->r.body_size) { //如果job的数据已经发完,则重置客户端rw,关心可读事件;否则继续待发送job            return reset_conn(c);        }        break;        }}

3.4 命令的处理过程

3.4.1查找命令

//命令执行的入口函数static void do_cmd(Conn *c){    dispatch_cmd(c);  //分发并执行命令    fill_extra_data(c); //put命令时,不仅需要执行命令,还需要接续job数据} static voiddispatch_cmd(Conn *c){    //查找命令类型    type = which_cmd(c);     //switch处理各个命令    switch (type) {        …………    }}

beanstalkd有以下命令定义:

//命令字符串#define CMD_PUT "put "#define CMD_PEEK_READY "peek-ready"#define CMD_RESERVE "reserve"#define CMD_RELEASE "release "………… //命令编码类型:#define OP_UNKNOWN 0#define OP_PUT 1#define OP_PEEKJOB 2#define OP_RESERVE 3#define OP_DELETE 4#define OP_RELEASE 5…………

查找命令其实就是字符串比较:

static int which_cmd(Conn *c){    //宏定义;比较输入缓冲区命令字符串与命令表中字符串比较,返回命令类型    #define TEST_CMD(s,c,o) if (strncmp((s), (c), CONSTSTRLEN(c)) == 0) return (o);     //宏替换后就是一系列if语句    TEST_CMD(c->cmd, CMD_PUT, OP_PUT);    TEST_CMD(c->cmd, CMD_PEEKJOB, OP_PEEKJOB);    …………}

3.4.2命令1——发布任务

case OP_PUT:    r = read_pri(&pri, c->cmd + 4, &delay_buf); //解析优先级pri     r = read_delay(&delay, delay_buf, &ttr_buf); //解析delay      r = read_ttr(&ttr, ttr_buf, &size_buf); //解析ttr     body_size = strtoul(size_buf, &end_buf, 10); //解析job字节数       op_ct[type]++; //统计     if (body_size > job_data_size_limit) { //job长度超过限制;返回        return skip(c, body_size + 2, MSG_JOB_TOO_BIG);    }    //put,说明是生产者,设置conn类型为生产者    connsetproducer(c);     //初始化job结构体,存储在hash表all_jobs中    c->in_job = make_job(pri, delay, ttr, body_size + 2, c->use);     //解析客户端发来的任务数据,存储在c->in_job的body数据字段    fill_extra_data(c);     //校验job数据是否读取完毕,完了则入tube的队列    maybe_enqueue_incoming_job(c);

任务入队列:

static void maybe_enqueue_incoming_job(Conn *c){    job j = c->in_job;     //任务数据已经读取完毕,入队列(ready或者delay队列)    if (c->in_job_read == j->r.body_size) return enqueue_incoming_job(c);     //任务数据没有读取完毕,则设置客户端conn状态未等待接收数据STATE_WANTDATA    c->state = STATE_WANTDATA;} static void enqueue_incoming_job(Conn *c){    int r;    job j = c->in_job;     c->in_job = NULL; /* the connection no longer owns this job */    c->in_job_read = 0;         //入队列    r = enqueue_job(c->srv, j, j->r.delay, 1);         //返回数据;并设置conn状态为STATE_SENDWORD    reply_line(c, STATE_SENDWORD, MSG_BURIED_FMT, j->r.id);} static int enqueue_job(Server *s, job j, int64 delay, char update_store){    int r;     j->reserver = NULL;    if (delay) {   //入delay队列,设置任务的deadline_at        j->r.deadline_at = nanoseconds() + delay;        r = heapinsert(&j->tube->delay, j);                j->r.state = Delayed;    } else {      //入ready队列        r = heapinsert(&j->tube->ready, j);        if (!r) return 0;        j->r.state = Ready;      }     //检查有没有消费者正在阻塞等待此tube产生job,若有需要返回job;    process_queue();    return 1;}

返回命令回复给客户端:

//reply_line函数组装命令回复数据,调用reply函数;只是将数据写入到输出缓冲区,并修改了客户端状态为STATE_SENDWORD,实际发送数据在3.3节已经说过;static void reply(Conn *c, char *line, int len, int state){    if (!c) return;     connwant(c, 'w');//修改关心的事件为可写事件    c->next = dirty; //放入dirty链表    dirty = c;    c->reply = line; //输出数据缓冲区    c->reply_len = len;    c->reply_sent = 0;    c->state = state; //设置conn状态}

connwant函数实现如下:

void connwant(Conn *c, int rw){    c->rw = rw; //c->rw记录当前客户端关心的socket事件    connsched(c);}  void connsched(Conn *c){    if (c->tickpos > -1) { //c->tickpos记录当前客户端在srv->conns堆的索引;(思考:tickpos在什么时候赋值的?heap的函数指针rec)        heapremove(&c->srv->conns, c->tickpos);    }    c->tickat = conntickat(c); //计算当前客户端待发生的某个事件的时间    if (c->tickat) {        heapinsert(&c->srv->conns, c); //插入srv->conns堆    }}

问题1:connwant只是修改了conn的rw字段为‘w’,表示关心客户端的读时间,什么时候调用epoll注册呢?dirty链表又是做什么的呢?

beanstalkd有个函数update_conns负责更新客户端socket的事件到epoll;其在每次循环开始,执行epoll_wait之前都会执行;

static void update_conns(){    int r;    Conn *c;     while (dirty) { //遍历dirty链表,更新每一个conn关心的socket事件        c = dirty;        dirty = dirty->next;        c->next = NULL;        r = sockwant(&c->sock, c->rw);        if (r == -1) {            twarn("sockwant");            connclose(c);        }    }}

问题2:srv->conns存储的客户端都是在某个时间点有事件待处理的,客户端都有哪些事件需要处理呢?

  • 1)消费者获取job后,job的状态改为reserved,当TTR时间过后,如果客户端还没有处理完这个job,服务器会将这个job的状态重置为ready,以便让其他消费者可以消费;
  • 2)消费者调用reserve获取job时,假如其监听的tube没有ready状态的job,那么客户端将会被阻塞,直到有job产生,或者阻塞超时;
//计算当前客户端待处理事件的deadlinestatic int64 conntickat(Conn *c){    //客户端正在阻塞    if (conn_waiting(c)) {        margin = SAFETY_MARGIN;    }     //如果客户端有reserved状态的任务,则获取到期时间最近的;(当客户端处于阻塞状态时,应该提前SAFETY_MARGIN时间处理此事件)    //connsoonestjob:获取到期时间最近的reserved job    if (has_reserved_job(c)) {        t = connsoonestjob(c)->r.deadline_at - nanoseconds() - margin;        should_timeout = 1;    }    //客户端阻塞超时时间    if (c->pending_timeout >= 0) {        t = min(t, ((int64)c->pending_timeout) * 1000000000);        should_timeout = 1;    }     //返回时间发生的时间;后续会将此客户端插入srv->conns堆,且是按照此时间排序的;    if (should_timeout) {        return nanoseconds() + t;    }    return 0;}

问题3:当生产者新发布一个job到某个tube时,此时可能有其他消费者监听此tube,且阻塞等待job的产生,此时就需要将此job返回给消费者;处理函数为process_queue

static void process_queue(){    int64 now = nanoseconds();     while ((j = next_eligible_job(now))) { //遍历所有tube,当tube有客户端等待,且有ready状态的job时,返回job        heapremove(&j->tube->ready, j->heap_index);                 //ms_take:将客户端从此job所属tube的waiting集合中删除;并返回客户端conn        //remove_waiting_conn:从当前客户端conn监听的所有tube的waiting队列中移除自己        //reserve_job:返回此job给客户端        reserve_job(remove_waiting_conn(ms_take(&j->tube->waiting)), j);    }} static job next_eligible_job(int64 now){    tube t;    size_t i;    job j = NULL, candidate;     //循环所有tube    for (i = 0; i < tubes.used; i++) {        t = tubes.items[i];        if (t->pause) { //假如tube正在暂停,且超时时间未到,则跳过            if (t->deadline_at > now) continue;            t->pause = 0;        }        if (t->waiting.used && t->ready.len) { //tube的waiting集合有元素说明有客户端正在阻塞等待此tube产生任务;有ready状态的任务            candidate = t->ready.data[0];   //从tubes里获取满足条件的优先级最高得job返回            if (!j || job_pri_less(candidate, j)) {                j = candidate;            }        }    }     return j;}  Conn * remove_waiting_conn(Conn *c){    tube t;    size_t i;     if (!conn_waiting(c)) return NULL;     c->type &= ~CONN_TYPE_WAITING; //去除CONN_TYPE_WAITING标志    global_stat.waiting_ct--;    for (i = 0; i < c->watch.used; i++) {  //遍历客户端监听的所有tube,挨个从tube的waiting队列中删除自己        t = c->watch.items[i];        t->stat.waiting_ct--;        ms_remove(&t->waiting, c);    }    return c;}  static void reserve_job(Conn *c, job j){    j->r.deadline_at = nanoseconds() + j->r.ttr; //job的实效时间        j->r.state = Reserved; //状态改为Reserved    job_insert(&c->reserved_jobs, j); //插入客户端的reserved_jobs链表    j->reserver = c; //记录job当前消费者        if (c->soonest_job && j->r.deadline_at < c->soonest_job->r.deadline_at) { //soonest_job记录最近要到期的Reserved状态的job,更新;        c->soonest_job = j;    }    return reply_job(c, j, MSG_RESERVED); //返回job}

3.4.3 命令2——获取任务reserve

case OP_RESERVE_TIMEOUT:         timeout = strtol(c->cmd + CMD_RESERVE_TIMEOUT_LEN, &end_buf, 10); //reserve可以设置阻塞超时时间,解析     case OP_RESERVE:     op_ct[type]++;    connsetworker(c); //设置客户端类型为消费者CONN_TYPE_WORKER     //当客户端有多个任务正在处理,处于reserved状态,且超时时间即将到达时;如果此时客户端监听的所有tube都没有ready状态的任务,则直接返回MSG_DEADLINE_SOON给客户端    if (conndeadlinesoon(c) && !conn_ready(c)) {        return reply_msg(c, MSG_DEADLINE_SOON);    }     //设置当前客户端正在等待job    wait_for_job(c, timeout);     //同3.4.2节    process_queue();

上面说过,当客户端有多个任务正在处理,处于reserved状态,且超时时间即将到达时;

如果此时客户端监听的所有tube都没有ready状态的任务,则直接返回MSG_DEADLINE_SOON给客户端;

否则会导致客户端的阻塞,导致这些reserved的任务超时;

static void wait_for_job(Conn *c, int timeout){    c->state = STATE_WAIT; //设置客户端状态为STATE_WAIT    enqueue_waiting_conn(c); //将客户端添加到其监听的所有tube的waiting队列中     //设置客户端的超时时间    c->pending_timeout = timeout;     //修改关心的事件为可读事件    connwant(c, 'h');    c->next = dirty; //将当前客户端添加到dirty链表中    dirty = c;} static void enqueue_waiting_conn(Conn *c){    tube t;    size_t i;     global_stat.waiting_ct++;    c->type |= CONN_TYPE_WAITING;    for (i = 0; i < c->watch.used; i++) {   //c->watch为客户端监听的所有tube        t = c->watch.items[i];        t->stat.waiting_ct++;        ms_append(&t->waiting, c);   //t->waiting为等待当前tube有任务产生的所有客户端    }}

3.4.4 循环之始epoll_wait之前

在执行epoll_wait之前,需要计算超时时间;不能被epoll_wait一直阻塞;服务器还有很多事情待处理;

  • 1)将状态未delay的且已经到期的job移到ready队列;
  • 2)tube暂停时间到达,如果tube存在消费者阻塞等待获取job,需要返回job给客户端;
  • 3)消费者消费的状态为reserved的job可能即将超时到期;
  • 4)客户端阻塞等待job的超时时间可能即将达到;

服务器需要及时处理这些所有事情,因此epoll_wait等待时间不能过长;

int64 prottick(Server *s){         int64 period = 0x34630B8A000LL; //默认epoll_wait等待时间       now = nanoseconds();    while ((j = delay_q_peek())) {  //遍历所有tube的delay队列中过期时间已经到达或者即将的job(即将到达时间最小)        d = j->r.deadline_at - now;        if (d > 0) {            period = min(period, d); //即将到达,更新period            break;        }        j = delay_q_take();        r = enqueue_job(s, j, 0, 0); //job入队到ready队列        if (r < 1) bury_job(s, j, 0); /* out of memory, so bury it */    }     for (i = 0; i < tubes.used; i++) {        t = tubes.items[i];        d = t->deadline_at - now;        if (t->pause && d <= 0) { //tube暂停期限达到,process_queue同3.4.2节            t->pause = 0;            process_queue();        }        else if (d > 0) {            period = min(period, d); //tube暂停即将到期,更新period        }    }     while (s->conns.len) {        Conn *c = s->conns.data[0]; //循环获取conn待执行事件发生时间最早的        d = c->tickat - now;        if (d > 0) {  //发生事件未到,更新period,结束循环            period = min(period, d);            break;        }         heapremove(&s->conns, 0); //否则,移除conn,处理客户端事件        conn_timeout(c);    }     update_conns(); //更新客户端关心的socke事件,其实就是遍历dirty链表     return period;} static job delay_q_peek(){    int i;    tube t;    job j = NULL, nj;     for (i = 0; i < tubes.used; i++) {  //返回状态为delay且到期时间最小的job        t = tubes.items[i];        if (t->delay.len == 0) {            continue;        }        nj = t->delay.data[0];        if (!j || nj->r.deadline_at < j->r.deadline_at) j = nj;    }     return j;} static void conn_timeout(Conn *c){    int r, should_timeout = 0;    job j;     //客户端正在被阻塞时,如果有reserved状态的job即将到期,则需要解除客户端阻塞    //conndeadlinesoon:查询到期时间最小的reserved job,校验其是否即将到期(1秒内到期)    if (conn_waiting(c) && conndeadlinesoon(c)) should_timeout = 1;    //connsoonestjob获取到期时间最近的reserved job    while ((j = connsoonestjob(c))) {        if (j->r.deadline_at >= nanoseconds()) break;         timeout_ct++; //已经超时        j->r.timeout_ct++;        r = enqueue_job(c->srv, remove_this_reserved_job(c, j), 0, 0); //从客户端的reserved_jobs链表移除job,重新入到tube的相应job队列        if (r < 1) bury_job(c->srv, j, 0); /* out of memory, so bury it */        connsched(c); //重新计算conn待处理事件的时间,入srv->conns堆    }     if (should_timeout) {        return reply_msg(remove_waiting_conn(c), MSG_DEADLINE_SOON); //reserved即将到期,解除阻塞,返回MSG_DEADLINE_SOON消息    } else if (conn_waiting(c) && c->pending_timeout >= 0) { //客户端阻塞超时,解除阻塞        c->pending_timeout = -1;        return reply_msg(remove_waiting_conn(c), MSG_TIMED_OUT);    }}

4、总结

本文主要介绍beanstalkd基本设计思路;从源码层次分析主要数据结构,服务器初始化过程,简要介绍了put和reserve两个命令执行过程;

beanstalkd其他的命令就不再介绍了,基本类似,感兴趣的可以自己研究。

转载地址:http://qfvjo.baihongyu.com/

你可能感兴趣的文章
ubuntu获取root权限
查看>>
C#获得枚举类型的长度
查看>>
Unity中使用射线查询MeshCollider背面的方法
查看>>
2016/05/03(接口 买票 临界资源同步锁)
查看>>
点击按钮显示隐藏的表格
查看>>
hibernate中inverse作用
查看>>
迷宫问题//bfs
查看>>
2018-2019-1 20165206 《信息安全系统设计基础》第九周学习总结
查看>>
ASP.NET MVC 防止 CSRF 的方法
查看>>
Qt笔记——入门
查看>>
bootstrap-table对前台页面表格的支持
查看>>
泛型委托Func<string,string>()
查看>>
入门讲解HTML5的画布功能(1)
查看>>
Java enum的用法详解
查看>>
(38)Spring Boot分布式Session状态保存Redis【从零开始学Spring Boot】
查看>>
使用PHP Excel类读取和生成excel文件
查看>>
【转】python包导入细节
查看>>
数据结构 i_love(我喜欢)
查看>>
LIN通讯
查看>>
Cocos Creator 为Button添加事件的两种方法
查看>>