顺风车运营研发团队 李乐
1.消息队列简介
计算机软件发展的一个重要目标是降低软件耦合性;
网站架构中,系统解耦合的重要手段就是异步,业务之间的消息传递不是同步调用,而是将一个业务操作分为多个阶段,每个阶段之间通过共享数据的方式异步执行;
在分布式系统中,多个服务器集群通过分布式消息队列实现异步;分布式消息队列可以看作是内存队列的分布式部署;
分布式消息队列架构图通常如下所示:
消息队列是典型的生产者消费者模式,两者不存在直接调用,只要保持数据结构不变,彼此功能实现可以随意改变而不互相影响;异步消息队列还有以下特点:
- 提高系统可用性:消费者服务器发生故障时,生产者服务器可以继续处理业务请求,系统整体表现无故障;此时数据会在消息队列服务器堆积,待消费者服务器恢复后,可以继续处理消息队列中的数据;
- 加快网站相应速度:业务处理前端的生产者服务器在处理完业务请求后,将数据写入消息队列,不需要等待消费者服务器处理就可以返回,减少响应延迟;
- 消除并发访问高峰:用户访问是随机的,存在高峰和低谷;可以使用消息队列将突然增加的访问请求数据放入消息队列中,等待消费者服务器依次处理;
消费者消费消息时,通常有两种模式可以选择:拉模型与推模型。
- 拉模型是由消息的消费者发起的,主动权把握在消费者手中,它会根据自己的情况对生产者发起调用;
- 推模式消费者只会被动接受消息,消息队列一旦发现消息进入,就会通知消费者执行对消息的处理;
2.beanstalkd基本知识
2.1beanstalkd简介
beanstalkd是一个轻量级的消息队列;主要有一下特点:
- 拉模式,消费者需要主动从服务器拉取消息数据;
- tube:类似于消息主题topic,一个beanstalkd中可以支持多个tube,每个tube都有自己的producer和consumer;多个生产者可以往同一个tube生产job,多个消费者也能监听同一个tube获取job;
- job:代替了传统的message,与消息最大的区别是,job有多种状态;
- conn:代表一个客户端链接;
- 优先级:job可以有0~2^32个优先级,0代表最高优先级,beanstalkd使用堆处理job的优先级排序,因此reserve命令的时间复杂度是O(logN);
- 延时:生产者发布任务时可以指定延时,到达延迟时间后,job才能被消费者消费;
- 超时机制:消费者从beanstalkd获取一个job后,必须在预设的 TTR (time-to-run) 时间内处理完任务,并发送 delete / release/ bury 命令改变任务状态;否则 Beanstalkd 会认为消息消费失败,重置job状态,使其可以被其他消费者消费。如果消费者预计在 TTR (time-to-run) 时间内无法完成任务, 也可以发送 touch 命令, 它的作用是让 Beanstalkd 从重新计时TTR;
- 暂停:pause命令可以暂停当前tube,暂停时期内所有job都不能够被消费者消费;
job有一下几种状态:
- READY,需要立即处理的任务,当延时 (DELAYED) 任务到期后会自动成为当前任务;
- DELAYED,延迟执行的任务,;
- RESERVED,已经被消费者获取, 正在执行的任务,Beanstalkd 负责检查任务是否在 TTR(time-to-run) 内完成;
- BURIED,保留的任务: 任务不会被执行,也不会消失,除非有人将他修改为其他状态;
- DELETED,消息被彻底删除。Beanstalkd 不再维持这些消息。
状态之间的转移图如下所示:
思考:
- beanstalkd如何维护job的状态?tube有3个集合delay、ready和 buried分别存放对应状态的job,conn的reserved_jobs集合存储状态为reserved的job(消费者获取一个job后,job的状态才会改变为reserved,因此这个集合由conn维护);
- delay状态的job怎么修改为ready?delay集合是一个按照时间排序的最小堆,beanstalkd不定时循环从堆根节点获取job,校验是否需要改变其状态未ready;
- 如何实现优先级?只有ready状态的job才能被消费者获取消费,ready集合是一个按照优先级排序的最小堆,根节点始终是优先级最高得job;
- 拉模式实现?消费者使用reserve命令获取job,beanstalkd检查消费者监听的所有tube,查找到ready的job即返回,否则阻塞消费者知道有ready状态的job产生为止;
2.2beanstalkd命令
beanstalkd支持以下命令:
3.beanstalkd源码分析
3.1数据结构
3.1.1基础结构体
//堆struct Heap { int cap; //堆容量 int len; //堆元素数目 void **data; //元素数组 Less less; //元素比较的函数指针 Record rec; //函数指针,将元素插入堆时,会调用此函数}; //函数指针定义:typedef int(*Less)(void*, void*);typedef void(*Record)(void*, int); //API:元素的插入与删除void * heapremove(Heap *h, int k);int heapinsert(Heap *h, void *x)
//集合struct ms { size_t used, cap, last; //cap为当前集合容量;used集合中元素数目;last上次访问的集合元素的位置 void **items; //存储元素的数组 ms_event_fn oninsert, onremove; //往集合插入元素,删除元素时调用的函数}; //函数指针定义如下typedef void(*ms_event_fn)(ms a, void *item, size_t i); //APIvoid ms_init(ms a, ms_event_fn oninsert, ms_event_fn onremove);//初始化集合int ms_append(ms a, void *item) //往集合追加元素int ms_contains(ms a, void *item)//判断集合是否包含元素void * ms_take(ms a) //获取并删除元素(会从上次访问的位置last开始查找)int ms_remove(ms a, void *item) //删除元素,从头开始查找int ms_clear(ms a) //清空集合
3.1.2 管道tube
struct tube { uint refs; //引用计数 char name[MAX_TUBE_NAME_LEN]; //名称 Heap ready; //存储状态未ready的job,按照优先级排序 Heap delay; //存储状态未delayed的job,按照到期时间排序 struct ms waiting; //等待当前tube有job产生的消费者集合 int64 pause; //执行pause命令后,pause字段记录暂停时间 int64 deadline_at; //deadline_at记录暂停到达时间 struct job buried; //存储状态为buried的job,是一个链表};
创建tube的代码如下:
tube make_tube(const char *name){ tube t; //底层调用malloc分配空间 t = new(struct tube); if (!t) return NULL; t->name[MAX_TUBE_NAME_LEN - 1] = '\0'; strncpy(t->name, name, MAX_TUBE_NAME_LEN - 1); if (t->name[MAX_TUBE_NAME_LEN - 1] != '\0') twarnx("truncating tube name"); //设置ready与delay堆的函数指针 t->ready.less = job_pri_less; t->delay.less = job_delay_less; t->ready.rec = job_setheappos; t->delay.rec = job_setheappos; t->buried = (struct job) { }; t->buried.prev = t->buried.next = &t->buried; ms_init(&t->waiting, NULL, NULL); return t;}
两个堆指针函数如下:
//按照优先级比较int job_pri_less(void *ax, void *bx){ job a = ax, b = bx; if (a->r.pri < b->r.pri) return 1; if (a->r.pri > b->r.pri) return 0; return a->r.id < b->r.id;}//按照过期时间比较int job_delay_less(void *ax, void *bx){ job a = ax, b = bx; if (a->r.deadline_at < b->r.deadline_at) return 1; if (a->r.deadline_at > b->r.deadline_at) return 0; return a->r.id < b->r.id;}//设置每个job在堆的indexvoid job_setheappos(void *j, int pos){ ((job)j)->heap_index = pos;}
3.1.3任务job
注:job创建完成后,先会保存在全局变量all_jobs的hash表中;然后才会插入到tube的各job队列中;
struct job { Jobrec r; // 存储job信息 tube tube; //指向其所属tube job prev, next; //job可以组织为双向链表(buried状态的job就是链表) job ht_next; //所有的job都存储在一个hash表中(拉链法),job的id为hash值;(tube中的job集合存储指针指向各个job) size_t heap_index; /* where is this job in its current heap */ ………… char body[];//job的数据,柔性数组}; // job的描述信息struct Jobrec { uint64 id; uint32 pri; int64 delay; int64 ttr; int32 body_size; int64 created_at; //创建时间 int64 deadline_at; //延迟job的过期时间 //统计计数 uint32 reserve_ct; uint32 timeout_ct; uint32 release_ct; uint32 bury_ct; uint32 kick_ct; byte state;//当前状态};
3.14套接字socket
struct Socket { int fd; Handle f; //socket发生事件时的处理函数 void *x; //服务器监听的socket指向server结构体;客户端对应的socket指向conn结构体 int added; //往epoll注册事件时,计算操作类型};
3.15服务器server
struct Server { char *port; char *addr; Socket sock; //监听的socket Heap conns; //存储即将有事件发生的客户端;按照事件发生的时间排序的最小堆; //例如:当客户端获取job后,若唱过TTR时间没处理完,job会状态应重置为ready状态; //当客户端调用reserve获取job但当前tube没有ready状态的job时,客户端会被阻塞timeout时间;};
3.1.6客户端链接conn
struct Conn { Server *srv; //执行服务器 Socket sock; //客户端socket char state; //客户端状态:等待接收命令,等待接收数据,等待回复命令,等待返回job,关闭,获取job阻塞中 char type; //客户端类型:生产者,消费者,获取job阻塞中 Conn *next; tube use; //当前使用的tube;put命令发布的job会插入到当前tube中 int64 tickat; //客户端处理job的TTR到期时间;或者客户端阻塞的到期时间;用于在server的conns堆比较 int tickpos; // 在srv->conns堆里的位置 job soonest_job; //所有reserve任务里到期时间最近的job int rw; //当前关心的事件: 'r', 'w', or 'h'(读、写、关闭连接) int pending_timeout; //客户端获取job而阻塞的到期时间 char halfclosed; //表示客户端断开连接 char cmd[LINE_BUF_SIZE]; // 输入缓冲区 int cmd_len; int cmd_read; char *reply; //输出缓冲区 int reply_len; int reply_sent; char reply_buf[LINE_BUF_SIZE]; //put命令发布job时,从客户端读入的job int in_job_read; job in_job; //待返回给客户端的job job out_job; int out_job_sent; //当前客户端监听的所有tube集合 struct ms watch; //当前客户端的所有reserved状态的job struct job reserved_jobs;};
3.2 服务器启动过程
3.2.1 epoll简介
epoll结构体:
typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64;} epoll_data_t;//保存触发事件的某个fd相关的数据 struct epoll_event { __uint32_t events; /* epoll event */ epoll_data_t data; /* User data variable */};//其中events表示感兴趣的事件和被触发的事件,可能的取值为://EPOLLIN:表示对应的文件描述符可以读;//EPOLLOUT:表示对应的文件描述符可以写;//EPOLLPRI:表示对应的文件描述符有紧急的数可读;//EPOLLERR:表示对应的文件描述符发生错误;//EPOLLHUP:表示对应的文件描述符被挂断;
epoll API定义如下:
int epoll_create(int size) //生成一个epoll专用的文件描述符,其中的参数是指定生成描述符的最大范围; int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) //用于控制某个文件描述符上的事件,可以注册事件,修改事件,删除事件 int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout) //轮询I/O事件的发生;
3.2.2 beanstalkd使用epoll
//创建epoll:epfd = epoll_create(1); //注册事件int sockwant(Socket *s, int rw){ ………… ev.events |= EPOLLRDHUP | EPOLLPRI; ev.data.ptr = s; //注意:传入的是sokcet指针;(socket的x字段会指向server或者conn结构体,当socket对应的fd发生事件时,可以得到server或conn对象) return epoll_ctl(epfd, op, s->fd, &ev);} //等待事件发生//返回h r w 事件类型int socknext(Socket **s, int64 timeout){ ………… r = epoll_wait(epfd, &ev, 1, (int)(timeout/1000000)); if (r == -1 && errno != EINTR) { twarn("epoll_wait"); exit(1); } if (r > 0) { *s = ev.data.ptr; //返回发生事件的socket if (ev.events & (EPOLLHUP|EPOLLRDHUP)) { return 'h'; } else if (ev.events & EPOLLIN) { return 'r'; } else if (ev.events & EPOLLOUT) { return 'w'; } } return 0;}
3.2.3服务器启动
int main(int argc, char **argv){ optparse(&srv, argv+1);//解析输入参数 r = make_server_socket(srv.addr, srv.port); //创建socket prot_init(); //初始化全局tubes集合,创建名称为default的默认tube srvserve(&srv);//启动服务器 return 0;} struct ms tubes;//全局变量 void prot_init(){ //初始化tube集合 ms_init(&tubes, NULL, NULL); //创建默认tube;tube_find_or_make方法会先从tubes集合查找指定名称为tube,查找到直接返回;否则创建新的tube TUBE_ASSIGN(default_tube, tube_find_or_make("default"));} void srvserve(Server *s){ //s->sock为server监听的socket;设置其处理函数为srvaccept; s->sock.x = s; s->sock.f = (Handle)srvaccept; s->conns.less = (Less)connless; //设置s->conns堆的函数指针 s->conns.rec = (Record)connrec; r = listen(s->sock.fd, 1024); //监听 r = sockwant(&s->sock, 'r'); //注册到epoll //开启循环 for (;;) { //服务器有一些事件需要在特定时间执行,获得最早待执行事件的时间间隔,作为epoll_wait的等待时间;后面详细分析函数内部 period = prottick(s); int rw = socknext(&sock, period); //epoll wait if (rw) { sock->f(sock->x, rw); //调用socket的处理函数 } }}//至此,服务器启动完毕,等待客户端链接
conns堆分析:
上面说过,conns存储即将有事件发生的客户端;按照事件发生的时间排序的最小堆;
例如:当客户端获取job后,若唱过TTR时间没处理完,job会状态应重置为ready状态;当客户端调用reserve获取job但当前tube没有ready状态的job时,客户端会被阻塞timeout时间;//堆节点比较的函数指针:int connless(Conn *a, Conn *b){ return a->tickat < b->tickat;} //将客户端对象插入conns堆时,tickpos记录其插入的index(避免客户端重复插入;插入之前发现其tickpos>-1则先删除再插入)void connrec(Conn *c, int i){ c->tickpos = i;}
处理客户端链接请求:
void srvaccept(Server *s, int ev){ h_accept(s->sock.fd, ev, s);} void h_accept(const int fd, const short which, Server *s){ cfd = accept(fd, (struct sockaddr *)&addr, &addrlen); flags = fcntl(cfd, F_GETFL, 0); //获得fd标识 r = fcntl(cfd, F_SETFL, flags | O_NONBLOCK); //设置fd非阻塞,使用epoll必须设置非阻塞,负责epoll无法同时监听多个fd //创建conn对象;默认监听default_tube(c->watch存储所有监听的tube);默认使用default_tube(c->use) //注意:初始化conn对象时,客户端状态为STATE_WANTCOMMAND,即等待接收客户端命令; c = make_conn(cfd, STATE_WANTCOMMAND, default_tube, default_tube); c->srv = s; c->sock.x = c; c->sock.f = (Handle)prothandle; //设置客户端处理函数 c->sock.fd = cfd; r = sockwant(&c->sock, 'r'); //epoll注册,监听可读事件}
当客户端socket可读或可写时,会执行prothandle函数:
static void prothandle(Conn *c, int ev){ h_conn(c->sock.fd, ev, c);} static void h_conn(const int fd, const short which, Conn *c){ //客户端断开链接,标记 if (which == 'h') { c->halfclosed = 1; } //客户端数据交互(根据客户端状态不同执行不同的读写操作) conn_data(c); //解析完命令时,执行命令 while (cmd_data_ready(c) && (c->cmd_len = cmd_len(c))) do_cmd(c); }
3.3 服务器与客户端的数据交互
beanstalkd将客户端conn分为以下几种状态:等待接受命令,等待接收数据,等待回复数据,等待返回job等;
#define STATE_WANTCOMMAND 0#define STATE_WANTDATA 1#define STATE_SENDJOB 2#define STATE_SENDWORD 3#define STATE_WAIT 4#define STATE_BITBUCKET 5#define STATE_CLOSE 6
当客户端fd可读或者可写时,服务器根据当前客户端的状态执行不同的操作:
注意:TCP是基于流的,因此存在半包、粘包问题;即,服务器一次read的命令请求数据可能不完整,或者一次read多个命令请求的数据;
//有些状态操作已省略static void conn_data(Conn *c){ switch (c->state) { case STATE_WANTCOMMAND: r = read(c->sock.fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read); //读取命令到输入缓冲区cmd c->cmd_read += r; c->cmd_len = cmd_len(c); //定位\r\n,并返回命令请求开始位置到\r\n长度;如果没有\r\b说明命令请求不完全,返回0 if (c->cmd_len) return do_cmd(c); //如果读取完整的命令,则处理;否则意味着命令不完全,需要下次继续接收 break; case STATE_WANTDATA: //只有当使用put命令发布任务时,才会携带数据;客户端状态才会成为STATE_WANTDATA; //而读取命令行时,已经携带了任务的必要参数,那时已经创建了任务,并存储在c->in_job字段 j = c->in_job; r = read(c->sock.fd, j->body + c->in_job_read, j->r.body_size -c->in_job_read); //读取任务数据 c->in_job_read += r; //记录任务读取了多少数据 maybe_enqueue_incoming_job(c); //函数会判断任务数据是否已经读取完全,完全则将任务写入tube的ready或delay队列;后面会将 break; case STATE_SENDWORD: //回复客户端命令请求 r= write(c->sock.fd, c->reply + c->reply_sent, c->reply_len - c->reply_sent); c->reply_sent += r; //已经发送的字节数 if (c->reply_sent == c->reply_len) return reset_conn(c); //如果返回数据已经发完,则重置客户端rw,关心可读事件;否则继续待发送数据状态 break; case STATE_SENDJOB: //待发送job j = c->out_job; //返回数据与job iov[0].iov_base = (void *)(c->reply + c->reply_sent); iov[0].iov_len = c->reply_len - c->reply_sent; /* maybe 0 */ iov[1].iov_base = j->body + c->out_job_sent; iov[1].iov_len = j->r.body_size - c->out_job_sent; r = writev(c->sock.fd, iov, 2); c->reply_sent += r; if (c->reply_sent >= c->reply_len) { c->out_job_sent += c->reply_sent - c->reply_len; c->reply_sent = c->reply_len; } if (c->out_job_sent == j->r.body_size) { //如果job的数据已经发完,则重置客户端rw,关心可读事件;否则继续待发送job return reset_conn(c); } break; }}
3.4 命令的处理过程
3.4.1查找命令
//命令执行的入口函数static void do_cmd(Conn *c){ dispatch_cmd(c); //分发并执行命令 fill_extra_data(c); //put命令时,不仅需要执行命令,还需要接续job数据} static voiddispatch_cmd(Conn *c){ //查找命令类型 type = which_cmd(c); //switch处理各个命令 switch (type) { ………… }}
beanstalkd有以下命令定义:
//命令字符串#define CMD_PUT "put "#define CMD_PEEK_READY "peek-ready"#define CMD_RESERVE "reserve"#define CMD_RELEASE "release "………… //命令编码类型:#define OP_UNKNOWN 0#define OP_PUT 1#define OP_PEEKJOB 2#define OP_RESERVE 3#define OP_DELETE 4#define OP_RELEASE 5…………
查找命令其实就是字符串比较:
static int which_cmd(Conn *c){ //宏定义;比较输入缓冲区命令字符串与命令表中字符串比较,返回命令类型 #define TEST_CMD(s,c,o) if (strncmp((s), (c), CONSTSTRLEN(c)) == 0) return (o); //宏替换后就是一系列if语句 TEST_CMD(c->cmd, CMD_PUT, OP_PUT); TEST_CMD(c->cmd, CMD_PEEKJOB, OP_PEEKJOB); …………}
3.4.2命令1——发布任务
case OP_PUT: r = read_pri(&pri, c->cmd + 4, &delay_buf); //解析优先级pri r = read_delay(&delay, delay_buf, &ttr_buf); //解析delay r = read_ttr(&ttr, ttr_buf, &size_buf); //解析ttr body_size = strtoul(size_buf, &end_buf, 10); //解析job字节数 op_ct[type]++; //统计 if (body_size > job_data_size_limit) { //job长度超过限制;返回 return skip(c, body_size + 2, MSG_JOB_TOO_BIG); } //put,说明是生产者,设置conn类型为生产者 connsetproducer(c); //初始化job结构体,存储在hash表all_jobs中 c->in_job = make_job(pri, delay, ttr, body_size + 2, c->use); //解析客户端发来的任务数据,存储在c->in_job的body数据字段 fill_extra_data(c); //校验job数据是否读取完毕,完了则入tube的队列 maybe_enqueue_incoming_job(c);
任务入队列:
static void maybe_enqueue_incoming_job(Conn *c){ job j = c->in_job; //任务数据已经读取完毕,入队列(ready或者delay队列) if (c->in_job_read == j->r.body_size) return enqueue_incoming_job(c); //任务数据没有读取完毕,则设置客户端conn状态未等待接收数据STATE_WANTDATA c->state = STATE_WANTDATA;} static void enqueue_incoming_job(Conn *c){ int r; job j = c->in_job; c->in_job = NULL; /* the connection no longer owns this job */ c->in_job_read = 0; //入队列 r = enqueue_job(c->srv, j, j->r.delay, 1); //返回数据;并设置conn状态为STATE_SENDWORD reply_line(c, STATE_SENDWORD, MSG_BURIED_FMT, j->r.id);} static int enqueue_job(Server *s, job j, int64 delay, char update_store){ int r; j->reserver = NULL; if (delay) { //入delay队列,设置任务的deadline_at j->r.deadline_at = nanoseconds() + delay; r = heapinsert(&j->tube->delay, j); j->r.state = Delayed; } else { //入ready队列 r = heapinsert(&j->tube->ready, j); if (!r) return 0; j->r.state = Ready; } //检查有没有消费者正在阻塞等待此tube产生job,若有需要返回job; process_queue(); return 1;}
返回命令回复给客户端:
//reply_line函数组装命令回复数据,调用reply函数;只是将数据写入到输出缓冲区,并修改了客户端状态为STATE_SENDWORD,实际发送数据在3.3节已经说过;static void reply(Conn *c, char *line, int len, int state){ if (!c) return; connwant(c, 'w');//修改关心的事件为可写事件 c->next = dirty; //放入dirty链表 dirty = c; c->reply = line; //输出数据缓冲区 c->reply_len = len; c->reply_sent = 0; c->state = state; //设置conn状态}
connwant函数实现如下:
void connwant(Conn *c, int rw){ c->rw = rw; //c->rw记录当前客户端关心的socket事件 connsched(c);} void connsched(Conn *c){ if (c->tickpos > -1) { //c->tickpos记录当前客户端在srv->conns堆的索引;(思考:tickpos在什么时候赋值的?heap的函数指针rec) heapremove(&c->srv->conns, c->tickpos); } c->tickat = conntickat(c); //计算当前客户端待发生的某个事件的时间 if (c->tickat) { heapinsert(&c->srv->conns, c); //插入srv->conns堆 }}
问题1:connwant只是修改了conn的rw字段为‘w’,表示关心客户端的读时间,什么时候调用epoll注册呢?dirty链表又是做什么的呢?
beanstalkd有个函数update_conns负责更新客户端socket的事件到epoll;其在每次循环开始,执行epoll_wait之前都会执行;
static void update_conns(){ int r; Conn *c; while (dirty) { //遍历dirty链表,更新每一个conn关心的socket事件 c = dirty; dirty = dirty->next; c->next = NULL; r = sockwant(&c->sock, c->rw); if (r == -1) { twarn("sockwant"); connclose(c); } }}
问题2:srv->conns存储的客户端都是在某个时间点有事件待处理的,客户端都有哪些事件需要处理呢?
- 1)消费者获取job后,job的状态改为reserved,当TTR时间过后,如果客户端还没有处理完这个job,服务器会将这个job的状态重置为ready,以便让其他消费者可以消费;
- 2)消费者调用reserve获取job时,假如其监听的tube没有ready状态的job,那么客户端将会被阻塞,直到有job产生,或者阻塞超时;
//计算当前客户端待处理事件的deadlinestatic int64 conntickat(Conn *c){ //客户端正在阻塞 if (conn_waiting(c)) { margin = SAFETY_MARGIN; } //如果客户端有reserved状态的任务,则获取到期时间最近的;(当客户端处于阻塞状态时,应该提前SAFETY_MARGIN时间处理此事件) //connsoonestjob:获取到期时间最近的reserved job if (has_reserved_job(c)) { t = connsoonestjob(c)->r.deadline_at - nanoseconds() - margin; should_timeout = 1; } //客户端阻塞超时时间 if (c->pending_timeout >= 0) { t = min(t, ((int64)c->pending_timeout) * 1000000000); should_timeout = 1; } //返回时间发生的时间;后续会将此客户端插入srv->conns堆,且是按照此时间排序的; if (should_timeout) { return nanoseconds() + t; } return 0;}
问题3:当生产者新发布一个job到某个tube时,此时可能有其他消费者监听此tube,且阻塞等待job的产生,此时就需要将此job返回给消费者;处理函数为process_queue
static void process_queue(){ int64 now = nanoseconds(); while ((j = next_eligible_job(now))) { //遍历所有tube,当tube有客户端等待,且有ready状态的job时,返回job heapremove(&j->tube->ready, j->heap_index); //ms_take:将客户端从此job所属tube的waiting集合中删除;并返回客户端conn //remove_waiting_conn:从当前客户端conn监听的所有tube的waiting队列中移除自己 //reserve_job:返回此job给客户端 reserve_job(remove_waiting_conn(ms_take(&j->tube->waiting)), j); }} static job next_eligible_job(int64 now){ tube t; size_t i; job j = NULL, candidate; //循环所有tube for (i = 0; i < tubes.used; i++) { t = tubes.items[i]; if (t->pause) { //假如tube正在暂停,且超时时间未到,则跳过 if (t->deadline_at > now) continue; t->pause = 0; } if (t->waiting.used && t->ready.len) { //tube的waiting集合有元素说明有客户端正在阻塞等待此tube产生任务;有ready状态的任务 candidate = t->ready.data[0]; //从tubes里获取满足条件的优先级最高得job返回 if (!j || job_pri_less(candidate, j)) { j = candidate; } } } return j;} Conn * remove_waiting_conn(Conn *c){ tube t; size_t i; if (!conn_waiting(c)) return NULL; c->type &= ~CONN_TYPE_WAITING; //去除CONN_TYPE_WAITING标志 global_stat.waiting_ct--; for (i = 0; i < c->watch.used; i++) { //遍历客户端监听的所有tube,挨个从tube的waiting队列中删除自己 t = c->watch.items[i]; t->stat.waiting_ct--; ms_remove(&t->waiting, c); } return c;} static void reserve_job(Conn *c, job j){ j->r.deadline_at = nanoseconds() + j->r.ttr; //job的实效时间 j->r.state = Reserved; //状态改为Reserved job_insert(&c->reserved_jobs, j); //插入客户端的reserved_jobs链表 j->reserver = c; //记录job当前消费者 if (c->soonest_job && j->r.deadline_at < c->soonest_job->r.deadline_at) { //soonest_job记录最近要到期的Reserved状态的job,更新; c->soonest_job = j; } return reply_job(c, j, MSG_RESERVED); //返回job}
3.4.3 命令2——获取任务reserve
case OP_RESERVE_TIMEOUT: timeout = strtol(c->cmd + CMD_RESERVE_TIMEOUT_LEN, &end_buf, 10); //reserve可以设置阻塞超时时间,解析 case OP_RESERVE: op_ct[type]++; connsetworker(c); //设置客户端类型为消费者CONN_TYPE_WORKER //当客户端有多个任务正在处理,处于reserved状态,且超时时间即将到达时;如果此时客户端监听的所有tube都没有ready状态的任务,则直接返回MSG_DEADLINE_SOON给客户端 if (conndeadlinesoon(c) && !conn_ready(c)) { return reply_msg(c, MSG_DEADLINE_SOON); } //设置当前客户端正在等待job wait_for_job(c, timeout); //同3.4.2节 process_queue();
上面说过,当客户端有多个任务正在处理,处于reserved状态,且超时时间即将到达时;
如果此时客户端监听的所有tube都没有ready状态的任务,则直接返回MSG_DEADLINE_SOON给客户端;
否则会导致客户端的阻塞,导致这些reserved的任务超时;
static void wait_for_job(Conn *c, int timeout){ c->state = STATE_WAIT; //设置客户端状态为STATE_WAIT enqueue_waiting_conn(c); //将客户端添加到其监听的所有tube的waiting队列中 //设置客户端的超时时间 c->pending_timeout = timeout; //修改关心的事件为可读事件 connwant(c, 'h'); c->next = dirty; //将当前客户端添加到dirty链表中 dirty = c;} static void enqueue_waiting_conn(Conn *c){ tube t; size_t i; global_stat.waiting_ct++; c->type |= CONN_TYPE_WAITING; for (i = 0; i < c->watch.used; i++) { //c->watch为客户端监听的所有tube t = c->watch.items[i]; t->stat.waiting_ct++; ms_append(&t->waiting, c); //t->waiting为等待当前tube有任务产生的所有客户端 }}
3.4.4 循环之始epoll_wait之前
在执行epoll_wait之前,需要计算超时时间;不能被epoll_wait一直阻塞;服务器还有很多事情待处理;
- 1)将状态未delay的且已经到期的job移到ready队列;
- 2)tube暂停时间到达,如果tube存在消费者阻塞等待获取job,需要返回job给客户端;
- 3)消费者消费的状态为reserved的job可能即将超时到期;
- 4)客户端阻塞等待job的超时时间可能即将达到;
服务器需要及时处理这些所有事情,因此epoll_wait等待时间不能过长;
int64 prottick(Server *s){ int64 period = 0x34630B8A000LL; //默认epoll_wait等待时间 now = nanoseconds(); while ((j = delay_q_peek())) { //遍历所有tube的delay队列中过期时间已经到达或者即将的job(即将到达时间最小) d = j->r.deadline_at - now; if (d > 0) { period = min(period, d); //即将到达,更新period break; } j = delay_q_take(); r = enqueue_job(s, j, 0, 0); //job入队到ready队列 if (r < 1) bury_job(s, j, 0); /* out of memory, so bury it */ } for (i = 0; i < tubes.used; i++) { t = tubes.items[i]; d = t->deadline_at - now; if (t->pause && d <= 0) { //tube暂停期限达到,process_queue同3.4.2节 t->pause = 0; process_queue(); } else if (d > 0) { period = min(period, d); //tube暂停即将到期,更新period } } while (s->conns.len) { Conn *c = s->conns.data[0]; //循环获取conn待执行事件发生时间最早的 d = c->tickat - now; if (d > 0) { //发生事件未到,更新period,结束循环 period = min(period, d); break; } heapremove(&s->conns, 0); //否则,移除conn,处理客户端事件 conn_timeout(c); } update_conns(); //更新客户端关心的socke事件,其实就是遍历dirty链表 return period;} static job delay_q_peek(){ int i; tube t; job j = NULL, nj; for (i = 0; i < tubes.used; i++) { //返回状态为delay且到期时间最小的job t = tubes.items[i]; if (t->delay.len == 0) { continue; } nj = t->delay.data[0]; if (!j || nj->r.deadline_at < j->r.deadline_at) j = nj; } return j;} static void conn_timeout(Conn *c){ int r, should_timeout = 0; job j; //客户端正在被阻塞时,如果有reserved状态的job即将到期,则需要解除客户端阻塞 //conndeadlinesoon:查询到期时间最小的reserved job,校验其是否即将到期(1秒内到期) if (conn_waiting(c) && conndeadlinesoon(c)) should_timeout = 1; //connsoonestjob获取到期时间最近的reserved job while ((j = connsoonestjob(c))) { if (j->r.deadline_at >= nanoseconds()) break; timeout_ct++; //已经超时 j->r.timeout_ct++; r = enqueue_job(c->srv, remove_this_reserved_job(c, j), 0, 0); //从客户端的reserved_jobs链表移除job,重新入到tube的相应job队列 if (r < 1) bury_job(c->srv, j, 0); /* out of memory, so bury it */ connsched(c); //重新计算conn待处理事件的时间,入srv->conns堆 } if (should_timeout) { return reply_msg(remove_waiting_conn(c), MSG_DEADLINE_SOON); //reserved即将到期,解除阻塞,返回MSG_DEADLINE_SOON消息 } else if (conn_waiting(c) && c->pending_timeout >= 0) { //客户端阻塞超时,解除阻塞 c->pending_timeout = -1; return reply_msg(remove_waiting_conn(c), MSG_TIMED_OUT); }}
4、总结
本文主要介绍beanstalkd基本设计思路;从源码层次分析主要数据结构,服务器初始化过程,简要介绍了put和reserve两个命令执行过程;
beanstalkd其他的命令就不再介绍了,基本类似,感兴趣的可以自己研究。