Blob


1 #define NOPLAN9DEFINES
2 #include <u.h>
3 #include <libc.h>
4 #include <thread.h>
5 #include "threadimpl.h"
6 #include <errno.h>
7 #include <unistd.h>
8 #include <fcntl.h>
10 #define debugpoll 0
12 #ifdef __APPLE__
13 #include <sys/time.h>
14 enum { POLLIN=1, POLLOUT=2, POLLERR=4 };
15 struct pollfd
16 {
17 int fd;
18 int events;
19 int revents;
20 };
22 int
23 poll(struct pollfd *p, int np, int ms)
24 {
25 int i, maxfd, n;
26 struct timeval tv, *tvp;
27 fd_set rfd, wfd, efd;
29 maxfd = -1;
30 FD_ZERO(&rfd);
31 FD_ZERO(&wfd);
32 FD_ZERO(&efd);
33 for(i=0; i<np; i++){
34 p[i].revents = 0;
35 if(p[i].fd == -1)
36 continue;
37 if(p[i].fd > maxfd)
38 maxfd = p[i].fd;
39 if(p[i].events & POLLIN)
40 FD_SET(p[i].fd, &rfd);
41 if(p[i].events & POLLOUT)
42 FD_SET(p[i].fd, &wfd);
43 FD_SET(p[i].fd, &efd);
44 }
46 if(ms != -1){
47 tv.tv_usec = (ms%1000)*1000;
48 tv.tv_sec = ms/1000;
49 tvp = &tv;
50 }else
51 tvp = nil;
53 if(debugpoll){
54 fprint(2, "select %d:", maxfd+1);
55 for(i=0; i<=maxfd; i++){
56 if(FD_ISSET(i, &rfd))
57 fprint(2, " r%d", i);
58 if(FD_ISSET(i, &wfd))
59 fprint(2, " w%d", i);
60 if(FD_ISSET(i, &efd))
61 fprint(2, " e%d", i);
62 }
63 fprint(2, "; tp=%p, t=%d.%d\n", tvp, tv.tv_sec, tv.tv_usec);
64 }
66 n = select(maxfd+1, &rfd, &wfd, &efd, tvp);
68 if(n <= 0)
69 return n;
71 for(i=0; i<np; i++){
72 if(p[i].fd == -1)
73 continue;
74 if(FD_ISSET(p[i].fd, &rfd))
75 p[i].revents |= POLLIN;
76 if(FD_ISSET(p[i].fd, &wfd))
77 p[i].revents |= POLLOUT;
78 if(FD_ISSET(p[i].fd, &efd))
79 p[i].revents |= POLLERR;
80 }
81 return n;
82 }
84 #else
85 #include <poll.h>
86 #endif
88 /*
89 * Poll file descriptors in an idle loop.
90 */
92 typedef struct Poll Poll;
94 struct Poll
95 {
96 Channel *c; /* for sending back */
97 };
99 static Channel *sleepchan[64];
100 static int sleeptime[64];
101 static int nsleep;
103 static struct pollfd pfd[64];
104 static struct Poll polls[64];
105 static int npoll;
107 static void
108 pollidle(void *v)
110 int i, n, t;
111 uint now;
113 for(;; yield()){
114 if(debugpoll) fprint(2, "poll %d:", npoll);
115 for(i=0; i<npoll; i++){
116 if(debugpoll) fprint(2, " %d%c", pfd[i].fd, pfd[i].events==POLLIN ? 'r' : 'w');
117 pfd[i].revents = 0;
119 t = -1;
120 now = p9nsec()/1000000;
121 for(i=0; i<nsleep; i++){
122 n = sleeptime[i] - now;
123 if(debugpoll) fprint(2, " s%d", n);
124 if(n < 0)
125 n = 0;
126 if(t == -1 || n < t)
127 t = n;
129 if(debugpoll) fprint(2, "; t=%d\n", t);
131 n = poll(pfd, npoll, t);
132 //fprint(2, "poll ret %d:", n);
133 now = p9nsec()/1000000;
134 for(i=0; i<nsleep; i++){
135 if((int)(sleeptime[i] - now) < 0){
136 nbsendul(sleepchan[i], 0);
137 nsleep--;
138 sleepchan[i] = sleepchan[nsleep];
139 sleeptime[i] = sleeptime[nsleep];
140 i--;
144 if(n <= 0)
145 continue;
146 for(i=0; i<npoll; i++)
147 if(pfd[i].fd != -1 && pfd[i].revents){
148 //fprint(2, " %d", pfd[i].fd);
149 pfd[i].fd = -1;
150 pfd[i].events = 0;
151 pfd[i].revents = 0;
152 nbsendul(polls[i].c, 1);
153 //fprint(2, " x%d", pfd[i].fd);
155 //fprint(2, "\n");
159 void
160 threadfdwaitsetup(void)
162 static int setup = 0;
164 if(!setup){
165 setup = 1;
166 threadcreateidle(pollidle, nil, 16384);
170 void
171 _threadfdwait(int fd, int rw, ulong pc)
173 int i;
175 struct {
176 Channel c;
177 ulong x;
178 Alt *qentry[2];
179 } s;
181 threadfdwaitsetup();
182 chaninit(&s.c, sizeof(ulong), 1);
183 s.c.qentry = (volatile Alt**)s.qentry;
184 s.c.nentry = 2;
185 memset(s.qentry, 0, sizeof s.qentry);
186 for(i=0; i<npoll; i++)
187 if(pfd[i].fd == -1)
188 break;
189 if(i==npoll){
190 if(npoll >= nelem(polls)){
191 fprint(2, "Too many polled fds.\n");
192 abort();
194 npoll++;
197 pfd[i].fd = fd;
198 pfd[i].events = rw=='r' ? POLLIN : POLLOUT;
199 polls[i].c = &s.c;
200 if(0) fprint(2, "%s [%3d] fdwait %d %c list *0x%lux\n",
201 argv0, threadid(), fd, rw, pc);
202 recvul(&s.c);
205 void
206 threadfdwait(int fd, int rw)
208 _threadfdwait(fd, rw, getcallerpc(&fd));
211 void
212 threadsleep(int ms)
214 struct {
215 Channel c;
216 ulong x;
217 Alt *qentry[2];
218 } s;
220 threadfdwaitsetup();
221 chaninit(&s.c, sizeof(ulong), 1);
222 s.c.qentry = (volatile Alt**)s.qentry;
223 s.c.nentry = 2;
224 memset(s.qentry, 0, sizeof s.qentry);
226 sleepchan[nsleep] = &s.c;
227 sleeptime[nsleep++] = p9nsec()/1000000+ms;
228 recvul(&s.c);
231 void
232 threadfdnoblock(int fd)
234 fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0)|O_NONBLOCK);
237 long
238 threadread(int fd, void *a, long n)
240 int nn;
242 threadfdnoblock(fd);
243 again:
244 errno = 0;
245 nn = read(fd, a, n);
246 if(nn <= 0){
247 if(errno == EINTR)
248 goto again;
249 if(errno == EAGAIN || errno == EWOULDBLOCK){
250 _threadfdwait(fd, 'r', getcallerpc(&fd));
251 goto again;
254 return nn;
257 int
258 threadrecvfd(int fd)
260 int nn;
262 threadfdnoblock(fd);
263 again:
264 nn = recvfd(fd);
265 if(nn < 0){
266 if(errno == EINTR)
267 goto again;
268 if(errno == EAGAIN || errno == EWOULDBLOCK){
269 _threadfdwait(fd, 'r', getcallerpc(&fd));
270 goto again;
273 return nn;
276 int
277 threadsendfd(int fd, int sfd)
279 int nn;
281 threadfdnoblock(fd);
282 again:
283 nn = sendfd(fd, sfd);
284 if(nn < 0){
285 if(errno == EINTR)
286 goto again;
287 if(errno == EAGAIN || errno == EWOULDBLOCK){
288 _threadfdwait(fd, 'w', getcallerpc(&fd));
289 goto again;
292 return nn;
295 long
296 threadreadn(int fd, void *a, long n)
298 int tot, nn;
300 for(tot = 0; tot<n; tot+=nn){
301 nn = threadread(fd, (char*)a+tot, n-tot);
302 if(nn <= 0){
303 if(tot == 0)
304 return nn;
305 return tot;
308 return tot;
311 long
312 _threadwrite(int fd, const void *a, long n)
314 int nn;
316 threadfdnoblock(fd);
317 again:
318 nn = write(fd, a, n);
319 if(nn < 0){
320 if(errno == EINTR)
321 goto again;
322 if(errno == EAGAIN || errno == EWOULDBLOCK){
323 _threadfdwait(fd, 'w', getcallerpc(&fd));
324 goto again;
327 return nn;
330 long
331 threadwrite(int fd, const void *a, long n)
333 int tot, nn;
335 for(tot = 0; tot<n; tot+=nn){
336 nn = _threadwrite(fd, (char*)a+tot, n-tot);
337 if(nn <= 0){
338 if(tot == 0)
339 return nn;
340 return tot;
343 return tot;