commit e9feffee9feb21519f324d13b201c31acf79b31d from: Omar Polo date: Tue Dec 19 10:26:21 2023 UTC ev: use two queues. This avoids issues with adding/removing elements from the 'working' queue. ev::add and ev::del only touch the "wip" queue, while poll only touches the "working" queue. This technically has the consequence that if, from event A, we remove event B, which is pending and not yet processed, we may still fire B once. commit - d4bd39d8927c67a76bb2e9d330d4ae51e3d3a6b9 commit + e9feffee9feb21519f324d13b201c31acf79b31d blob - b7c351794b0f73b64535f1547e9151c47bffa02f blob + 930e60252b18395030ddb21f7822694e4255242d --- ev/ev.ha +++ ev/ev.ha @@ -45,9 +45,14 @@ type cb = struct { data: any, }; -type base = struct { +type queue = struct { pfds: []poll::pollfd, cbs: []cb, +}; + +type base = struct { + working: queue, + wip: queue, sigpipe: ((io::file, io::file) | void), sigcb: (cb | void), stop: bool, // signal to stop @@ -73,40 +78,58 @@ fn poll2ev(ev: i16) event = { }; export fn add(fd: io::file, ev: event, f: *fn(io::file, event, any) void, data: any) void = { - append(b.pfds, poll::pollfd { + append(b.wip.pfds, poll::pollfd { fd = fd, events = ev2poll(ev), ... 
}); - append(b.cbs, cb { + append(b.wip.cbs, cb { cb = f, data = data, }); }; export fn del(fd: io::file) void = { - for (let i = 0z; i < len(b.pfds); i += 1) { - if (b.pfds[i].fd != fd) { + for (let i = 0z; i < len(b.wip.pfds); i += 1) { + if (b.wip.pfds[i].fd != fd) { continue; }; - delete(b.pfds[i]); - delete(b.cbs[i]); + delete(b.wip.pfds[i]); + delete(b.wip.cbs[i]); return; }; }; +fn prepare_queue() void = { + let wip = b.wip; + + if (len(b.working.pfds) > 0) { + delete(b.working.pfds[..]); + delete(b.working.cbs[..]); + }; + + for (let i = 0z; i < len(wip.pfds); i += 1) { + append(b.working.pfds, wip.pfds[i]); + append(b.working.cbs, wip.cbs[i]); + }; +}; + export fn loop() (void | poll::error) = { const mask = poll::event::POLLIN | poll::event::POLLOUT | poll::event::POLLHUP; for (!b.stop) { - let n = poll::poll(b.pfds, poll::INDEF)?; - for (let i = 0z; i < len(b.pfds); i += 1) { - if ((b.pfds[i].revents & mask) == 0) { + prepare_queue(); + + let q = b.working; + + let n = poll::poll(q.pfds, poll::INDEF)?; + for (let i = 0z; i < len(q.pfds); i += 1) { + if ((q.pfds[i].revents & mask) == 0) { continue; }; - b.cbs[i].cb(b.pfds[i].fd, poll2ev(b.pfds[i].revents), - b.cbs[i].data); + q.cbs[i].cb(q.pfds[i].fd, poll2ev(q.pfds[i].revents), + q.cbs[i].data); }; }; };