commit 741b69be96397e0ec6db0c84b4ead4f41363ea98 from: Omar Polo date: Sun Sep 26 17:00:07 2021 UTC fastcgi completely asynchronous This changes the fastcgi implementation from a blocking I/O to an async implementation on top of libevent' bufferevents. Should improve the responsiveness of gmid especially when using remote fastcgi applications. commit - 83fe545a2b8c892e70ecf6b48180c27e6bc6b414 commit + 741b69be96397e0ec6db0c84b4ead4f41363ea98 blob - 75c72b253ff38288b44f689a109f4cbf5c4bd69c blob + 9678bec7aebc21727d68e7f630e3247ddab9d9c7 --- fcgi.c +++ fcgi.c @@ -149,7 +149,7 @@ prepare_header(struct fcgi_header *h, int type, int id } static int -fcgi_begin_request(int sock, int id) +fcgi_begin_request(struct bufferevent *bev, int id) { struct fcgi_begin_req_record r; @@ -165,13 +165,14 @@ fcgi_begin_request(int sock, int id) r.body.role0 = FCGI_RESPONDER; r.body.flags = FCGI_KEEP_CONN; - if (write(sock, &r, sizeof(r)) != sizeof(r)) + if (bufferevent_write(bev, &r, sizeof(r)) == -1) return -1; return 0; } static int -fcgi_send_param(int sock, int id, const char *name, const char *value) +fcgi_send_param(struct bufferevent *bev, int id, const char *name, + const char *value) { struct fcgi_header h; uint32_t namlen, vallen, padlen; @@ -196,92 +197,41 @@ fcgi_send_param(int sock, int id, const char *name, co prepare_header(&h, FCGI_PARAMS, id, size, padlen); - if (write(sock, &h, sizeof(h)) != sizeof(h) || - write(sock, s, sizeof(s)) != sizeof(s) || - write(sock, name, namlen) != namlen || - write(sock, value, vallen) != vallen || - write(sock, padding, padlen) != padlen) + if (bufferevent_write(bev, &h, sizeof(h)) == -1 || + bufferevent_write(bev, s, sizeof(s)) == -1 || + bufferevent_write(bev, name, namlen) == -1 || + bufferevent_write(bev, value, vallen) == -1 || + bufferevent_write(bev, padding, padlen) == -1) return -1; return 0; } static int -fcgi_end_param(int sock, int id) +fcgi_end_param(struct bufferevent *bev, int id) { struct fcgi_header h; prepare_header(&h, FCGI_PARAMS, id, 0, 0); - if (write(sock, &h, sizeof(h)) != sizeof(h)) + if (bufferevent_write(bev, &h, sizeof(h)) == -1) return -1; prepare_header(&h, FCGI_STDIN, id, 0, 0); - if (write(sock, &h, sizeof(h)) != sizeof(h)) + if (bufferevent_write(bev, &h, sizeof(h)) == -1) return -1; return 0; } static int -fcgi_abort_request(int sock, int id) +fcgi_abort_request(struct bufferevent *bev, int id) { struct fcgi_header h; prepare_header(&h, FCGI_ABORT_REQUEST, id, 0, 0); - if (write(sock, &h, sizeof(h)) != sizeof(h)) + if (bufferevent_write(bev, &h, sizeof(h)) == -1) return -1; - - return 0; -} - -static int -must_read(int sock, char *d, size_t len) -{ - ssize_t r; - -#if DEBUG_FCGI - if (debug_socket == -1) { - struct sockaddr_un addr; - - if ((debug_socket = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) - err(1, "socket"); - - memset(&addr, 0, sizeof(addr)); - addr.sun_family = AF_UNIX; - strlcpy(addr.sun_path, "./debug.sock", sizeof(addr.sun_path)); - if (connect(debug_socket, (struct sockaddr*)&addr, sizeof(addr)) - == -1) - err(1, "connect"); - } -#endif - for (;;) { - switch (r = read(sock, d, len)) { - case -1: - case 0: - return -1; - default: -#if DEBUG_FCGI - write(debug_socket, d, r); -#endif - - if (r == (ssize_t)len) - return 0; - len -= r; - d += r; - } - } -} - -static int -fcgi_read_header(int sock, struct fcgi_header *h) -{ - if (must_read(sock, (char*)h, sizeof(*h)) == -1) - return -1; - if (h->version != FCGI_VERSION_1) { - errno = EINVAL; - return -1; - } return 0; } @@ -346,128 +296,160 @@ end: c->next(0, 0, c); } -static int -consume(int fd, size_t len) -{ - size_t l; - char buf[64]; - - while (len != 0) { - if ((l = len) > sizeof(buf)) - l = sizeof(buf); - if (must_read(fd, buf, l) == -1) - return 0; - len -= l; - } - - return 1; -} - -static void -close_all(struct fcgi *f) -{ - size_t i; - struct client *c; - - for (i = 0; i < MAX_USERS; i++) { - c = &clients[i]; - - if (c->fcgi != f->id) - continue; - - if (c->code != 0) - close_conn(0, 0, c); - else - start_reply(c, CGI_ERROR, "CGI error"); - } - - fcgi_close_backend(f); -} - void fcgi_close_backend(struct fcgi *f) { - event_del(&f->e); - close(f->fd); - f->fd = -1; - f->pending = 0; - f->s = FCGI_OFF; + bufferevent_free(f->bev); + f->bev = NULL; + close(fcgi->fd); + fcgi->fd = -1; + fcgi->pending = 0; + fcgi->s = FCGI_OFF; } void -handle_fcgi(int sock, short event, void *d) +fcgi_read(struct bufferevent *bev, void *d) { - struct fcgi *f = d; - struct fcgi_header h; + struct fcgi *fcgi = d; + struct evbuffer *src = EVBUFFER_INPUT(bev); + struct fcgi_header hdr; struct fcgi_end_req_body end; struct client *c; struct mbuf *mbuf; size_t len; - if (fcgi_read_header(sock, &h) == -1) - goto err; +#if DEBUG_FCGI + if (debug_socket == -1) { + struct sockaddr_un addr; - c = try_client_by_id(recid(&h)); - if (c == NULL || c->fcgi != f->id) - goto err; + if ((debug_socket = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) + err(1, "socket"); - len = reclen(&h); + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strlcpy(addr.sun_path, "./debug.sock", sizeof(addr.sun_path)); + if (connect(debug_socket, (struct sockaddr*)&addr, sizeof(addr)) + == -1) + err(1, "connect"); + } +#endif - switch (h.type) { - case FCGI_END_REQUEST: - if (len != sizeof(end)) - goto err; - if (must_read(sock, (char*)&end, sizeof(end)) == -1) - goto err; - /* TODO: do something with the status? */ + for (;;) { + if (EVBUFFER_LENGTH(src) < sizeof(hdr)) + return; - f->pending--; - c->fcgi = -1; - c->next = close_conn; - event_once(c->fd, EV_WRITE, ©_mbuf, c, NULL); - break; + memcpy(&hdr, EVBUFFER_DATA(src), sizeof(hdr)); - case FCGI_STDERR: - /* discard stderr (for now) */ - if (!consume(sock, len)) + c = try_client_by_id(recid(&hdr)); + if (c == NULL) { + log_err(NULL, + "got invalid client id from fcgi backend %d", + recid(&hdr)); goto err; - break; + } + + len = reclen(&hdr); - case FCGI_STDOUT: - if ((mbuf = calloc(1, sizeof(*mbuf) + len)) == NULL) - fatal("calloc"); - mbuf->len = len; - if (must_read(sock, mbuf->data, len) == -1) { - free(mbuf); - goto err; - } + if (EVBUFFER_LENGTH(src) < sizeof(hdr) + len + hdr.padding) + return; - if (TAILQ_EMPTY(&c->mbufhead)) { - TAILQ_INSERT_HEAD(&c->mbufhead, mbuf, mbufs); +#if DEBUG_FCGI + write(debug_soocket, EVBUFFER_DATA(src), + sizeof(hdr) + len + hdr.padding); +#endif + + evbuffer_drain(src, sizeof(hdr)); + + switch (hdr.type) { + case FCGI_END_REQUEST: + if (len != sizeof(end)) { + log_err(NULL, + "got invalid end request record size"); + goto err; + } + bufferevent_read(bev, &end, sizeof(end)); + + /* TODO: do something with the status? */ + fcgi->pending--; + c->fcgi = -1; + c->next = close_conn; event_once(c->fd, EV_WRITE, ©_mbuf, c, NULL); - } else + break; + + case FCGI_STDERR: + /* discard stderr (for now) */ + evbuffer_drain(src, len); + break; + + case FCGI_STDOUT: + if ((mbuf = calloc(1, sizeof(*mbuf) + len)) == NULL) + fatal("calloc"); + mbuf->len = len; + bufferevent_read(bev, mbuf->data, len); + + if (TAILQ_EMPTY(&c->mbufhead)) + event_once(c->fd, EV_WRITE, ©_mbuf, + c, NULL); + TAILQ_INSERT_TAIL(&c->mbufhead, mbuf, mbufs); - break; + break; - default: - log_err(NULL, "got invalid fcgi record (type=%d)", h.type); - goto err; - } + default: + log_err(NULL, "got invalid fcgi record (type=%d)", + hdr.type); + goto err; + } - if (!consume(sock, h.padding)) - goto err; + evbuffer_drain(src, hdr.padding); - if (f->pending == 0 && shutting_down) - fcgi_close_backend(f); + if (fcgi->pending == 0 && shutting_down) { + fcgi_error(bev, EVBUFFER_EOF, fcgi); + return; + } + } +err: + fcgi_error(bev, EVBUFFER_ERROR, fcgi); +} + +void +fcgi_write(struct bufferevent *bev, void *d) +{ + /* + * There's no much use for the write callback. + */ return; +} -err: - close_all(f); +void +fcgi_error(struct bufferevent *bev, short err, void *d) +{ + struct fcgi *fcgi = d; + struct client *c; + size_t i; + + if (!(err & EVBUFFER_ERROR) || + !(err & EVBUFFER_EOF)) + log_warn(NULL, "unknown event error (%x)", + err); + + for (i = 0; i < MAX_USERS; ++i) { + c = &clients[i]; + + if (c->fcgi != fcgi->id) + continue; + + if (c->code != 0) + close_conn(0, 0, 0); + else + start_reply(c, CGI_ERROR, "CGI error"); + } + + fcgi_close_backend(fcgi); } void -send_fcgi_req(struct fcgi *f, struct client *c) +fcgi_req(struct fcgi *f, struct client *c) { char addr[NI_MAXHOST], buf[22]; int e; @@ -486,50 +468,50 @@ send_fcgi_req(struct fcgi *f, struct client *c) c->next = NULL; - fcgi_begin_request(f->fd, c->id); - fcgi_send_param(f->fd, c->id, "GATEWAY_INTERFACE", "CGI/1.1"); - fcgi_send_param(f->fd, c->id, "GEMINI_URL_PATH", c->iri.path); - fcgi_send_param(f->fd, c->id, "QUERY_STRING", c->iri.query); - fcgi_send_param(f->fd, c->id, "REMOTE_ADDR", addr); - fcgi_send_param(f->fd, c->id, "REMOTE_HOST", addr); - fcgi_send_param(f->fd, c->id, "REQUEST_METHOD", ""); - fcgi_send_param(f->fd, c->id, "SERVER_NAME", c->iri.host); - fcgi_send_param(f->fd, c->id, "SERVER_PROTOCOL", "GEMINI"); - fcgi_send_param(f->fd, c->id, "SERVER_SOFTWARE", GMID_VERSION); + fcgi_begin_request(f->bev, c->id); + fcgi_send_param(f->bev, c->id, "GATEWAY_INTERFACE", "CGI/1.1"); + fcgi_send_param(f->bev, c->id, "GEMINI_URL_PATH", c->iri.path); + fcgi_send_param(f->bev, c->id, "QUERY_STRING", c->iri.query); + fcgi_send_param(f->bev, c->id, "REMOTE_ADDR", addr); + fcgi_send_param(f->bev, c->id, "REMOTE_HOST", addr); + fcgi_send_param(f->bev, c->id, "REQUEST_METHOD", ""); + fcgi_send_param(f->bev, c->id, "SERVER_NAME", c->iri.host); + fcgi_send_param(f->bev, c->id, "SERVER_PROTOCOL", "GEMINI"); + fcgi_send_param(f->bev, c->id, "SERVER_SOFTWARE", GMID_VERSION); if (tls_peer_cert_provided(c->ctx)) { - fcgi_send_param(f->fd, c->id, "AUTH_TYPE", "CERTIFICATE"); - fcgi_send_param(f->fd, c->id, "REMOTE_USER", + fcgi_send_param(f->bev, c->id, "AUTH_TYPE", "CERTIFICATE"); + fcgi_send_param(f->bev, c->id, "REMOTE_USER", tls_peer_cert_subject(c->ctx)); - fcgi_send_param(f->fd, c->id, "TLS_CLIENT_ISSUER", + fcgi_send_param(f->bev, c->id, "TLS_CLIENT_ISSUER", tls_peer_cert_issuer(c->ctx)); - fcgi_send_param(f->fd, c->id, "TLS_CLIENT_HASH", + fcgi_send_param(f->bev, c->id, "TLS_CLIENT_HASH", tls_peer_cert_hash(c->ctx)); - fcgi_send_param(f->fd, c->id, "TLS_VERSION", + fcgi_send_param(f->bev, c->id, "TLS_VERSION", tls_conn_version(c->ctx)); - fcgi_send_param(f->fd, c->id, "TLS_CIPHER", + fcgi_send_param(f->bev, c->id, "TLS_CIPHER", tls_conn_cipher(c->ctx)); snprintf(buf, sizeof(buf), "%d", tls_conn_cipher_strength(c->ctx)); - fcgi_send_param(f->fd, c->id, "TLS_CIPHER_STRENGTH", buf); + fcgi_send_param(f->bev, c->id, "TLS_CIPHER_STRENGTH", buf); tim = tls_peer_cert_notbefore(c->ctx); strftime(buf, sizeof(buf), "%FT%TZ", gmtime_r(&tim, &tminfo)); - fcgi_send_param(f->fd, c->id, "TLS_CLIENT_NOT_BEFORE", buf); + fcgi_send_param(f->bev, c->id, "TLS_CLIENT_NOT_BEFORE", buf); tim = tls_peer_cert_notafter(c->ctx); strftime(buf, sizeof(buf), "%FT%TZ", gmtime_r(&tim, &tminfo)); - fcgi_send_param(f->fd, c->id, "TLS_CLIENT_NOT_AFTER", buf); + fcgi_send_param(f->bev, c->id, "TLS_CLIENT_NOT_AFTER", buf); TAILQ_FOREACH(p, &c->host->params, envs) { - fcgi_send_param(f->fd, c->id, p->name, p->value); + fcgi_send_param(f->bev, c->id, p->name, p->value); } } else - fcgi_send_param(f->fd, c->id, "AUTH_TYPE", ""); + fcgi_send_param(f->bev, c->id, "AUTH_TYPE", ""); - if (fcgi_end_param(f->fd, c->id) == -1) - close_all(f); + if (fcgi_end_param(f->bev, c->id) == -1) + fcgi_error(f->bev, EVBUFFER_ERROR, f); } blob - 34da70cdbb43806d2837ebecb321afaf6422a591 blob + d0b17bd334af3970950d093bbd0be97d7286b221 --- gmid.h +++ gmid.h @@ -68,7 +68,8 @@ struct fcgi { char *port; char *prog; int fd; - struct event e; + + struct bufferevent *bev; /* number of pending clients */ int pending; @@ -382,8 +383,10 @@ int executor_main(struct imsgbuf*); /* fcgi.c */ void fcgi_close_backend(struct fcgi *); -void handle_fcgi(int, short, void*); -void send_fcgi_req(struct fcgi*, struct client*); +void fcgi_read(struct bufferevent *, void *); +void fcgi_write(struct bufferevent *, void *); +void fcgi_error(struct bufferevent *, short, void *); +void fcgi_req(struct fcgi *, struct client *); /* sandbox.c */ void sandbox_server_process(void); blob - d4b2d11710ee95cc58f4a26b30310b6ebf3dd09b blob + 605ec6c139731b617f49551fc100762e1fdb24a7 --- server.c +++ server.c @@ -615,7 +615,7 @@ apply_fastcgi(struct client *c) break; case FCGI_READY: c->fcgi = id; - send_fcgi_req(f, c); + fcgi_req(f, c); break; } @@ -1229,13 +1229,23 @@ handle_imsg_fcgi_fd(struct imsgbuf *ibuf, struct imsg id = imsg->hdr.peerid; f = &fcgi[id]; - if ((f->fd = imsg->fd) != -1) { - f->s = FCGI_READY; - event_set(&f->e, imsg->fd, EV_READ | EV_PERSIST, &handle_fcgi, - &fcgi[id]); - event_add(&f->e, NULL); - } else { + if ((f->fd = imsg->fd) == -1) f->s = FCGI_OFF; + else { + mark_nonblock(f->fd); + + f->s = FCGI_READY; + + f->bev = bufferevent_new(f->fd, fcgi_read, fcgi_write, + fcgi_error, f); + if (f->bev == NULL) { + close(f->fd); + log_err(NULL, "%s: failed to allocate client buffer", + __func__); + f->s = FCGI_OFF; + } + + bufferevent_enable(f->bev, EV_READ|EV_WRITE); } for (i = 0; i < MAX_USERS; ++i) { @@ -1245,11 +1255,11 @@ handle_imsg_fcgi_fd(struct imsgbuf *ibuf, struct imsg if (c->fcgi != id) continue; - if (f->fd == -1) { + if (f->s == FCGI_OFF) { c->fcgi = -1; start_reply(c, TEMP_FAILURE, "internal server error"); } else - send_fcgi_req(f, c); + fcgi_req(f, c); } } @@ -1280,8 +1290,7 @@ handle_imsg_quit(struct imsgbuf *ibuf, struct imsg *im if (fcgi[i].path == NULL && fcgi[i].prog == NULL) break; - if (!event_pending(&fcgi[i].e, EV_READ, NULL) || - fcgi[i].pending != 0) + if (fcgi[i].bev == NULL || fcgi[i].pending != 0) continue; fcgi_close_backend(&fcgi[i]);