Blob


1 #include "threadimpl.h"
3 static Lock chanlock; /* central channel access lock */
5 static void enqueue(Alt*, Channel**);
6 static void dequeue(Alt*);
7 static int altexec(Alt*, int);
9 int _threadhighnentry;
10 int _threadnalt;
12 static void
13 setuserpc(ulong pc)
14 {
15 Thread *t;
17 t = _threadgetproc()->thread;
18 if(t)
19 t->userpc = pc;
20 }
22 static int
23 canexec(Alt *a)
24 {
25 int i, otherop;
26 Channel *c;
28 c = a->c;
29 /* are there senders or receivers blocked? */
30 otherop = (CHANSND+CHANRCV) - a->op;
31 for(i=0; i<c->nentry; i++)
32 if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil){
33 _threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c);
34 return 1;
35 }
37 /* is there room in the channel? */
38 if((a->op==CHANSND && c->n < c->s)
39 || (a->op==CHANRCV && c->n > 0)){
40 _threaddebug(DBGCHAN, "can buffer alt %p chan %p", a, c);
41 return 1;
42 }
44 return 0;
45 }
47 static void
48 _chanfree(Channel *c)
49 {
50 int i, inuse;
52 inuse = 0;
53 for(i = 0; i < c->nentry; i++)
54 if(c->qentry[i])
55 inuse = 1;
56 if(inuse)
57 c->freed = 1;
58 else{
59 if(c->qentry)
60 free(c->qentry);
61 free(c);
62 }
63 }
65 void
66 chanfree(Channel *c)
67 {
68 lock(&chanlock);
69 _chanfree(c);
70 unlock(&chanlock);
71 }
73 int
74 chaninit(Channel *c, int elemsize, int elemcnt)
75 {
76 if(elemcnt < 0 || elemsize <= 0 || c == nil)
77 return -1;
78 memset(c, 0, sizeof *c);
79 c->e = elemsize;
80 c->s = elemcnt;
81 _threaddebug(DBGCHAN, "chaninit %p", c);
82 return 1;
83 }
85 Channel*
86 chancreate(int elemsize, int elemcnt)
87 {
88 Channel *c;
90 if(elemcnt < 0 || elemsize <= 0)
91 return nil;
92 c = _threadmalloc(sizeof(Channel)+elemsize*elemcnt, 1);
93 c->e = elemsize;
94 c->s = elemcnt;
95 _threaddebug(DBGCHAN, "chancreate %p", c);
96 return c;
97 }
99 static int
100 _alt(Alt *alts)
102 Alt *a, *xa;
103 Channel *volatile c;
104 int n, s;
105 ulong r;
106 Thread *t;
108 /*
109 * The point of going splhi here is that note handlers
110 * might reasonably want to use channel operations,
111 * but that will hang if the note comes while we hold the
112 * chanlock. Instead, we delay the note until we've dropped
113 * the lock.
114 */
116 /*
117 * T might be nil here -- the scheduler sends on threadwaitchan
118 * directly (in non-blocking mode, of course!).
119 */
120 t = _threadgetproc()->thread;
121 if((t && t->moribund) || _threadexitsallstatus)
122 yield(); /* won't return */
123 s = _procsplhi();
124 lock(&chanlock);
126 /* test whether any channels can proceed */
127 n = 0;
128 a = nil;
130 for(xa=alts; xa->op!=CHANEND && xa->op!=CHANNOBLK; xa++){
131 xa->entryno = -1;
132 if(xa->op == CHANNOP)
133 continue;
135 c = xa->c;
136 if(c==nil){
137 unlock(&chanlock);
138 _procsplx(s);
139 return -1;
141 if(canexec(xa))
142 if(nrand(++n) == 0)
143 a = xa;
146 if(a==nil){
147 /* nothing can proceed */
148 if(xa->op == CHANNOBLK){
149 unlock(&chanlock);
150 _procsplx(s);
151 _threadnalt++;
152 return xa - alts;
155 /* enqueue on all channels. */
156 c = nil;
157 for(xa=alts; xa->op!=CHANEND; xa++){
158 if(xa->op==CHANNOP)
159 continue;
160 enqueue(xa, (Channel**)&c);
163 /*
164 * wait for successful rendezvous.
165 * we can't just give up if the rendezvous
166 * is interrupted -- someone else might come
167 * along and try to rendezvous with us, so
168 * we need to be here.
169 */
170 Again:
171 t->alt = alts;
172 t->chan = Chanalt;
174 unlock(&chanlock);
175 _procsplx(s);
176 r = _threadrendezvous((ulong)&c, 0);
177 s = _procsplhi();
178 lock(&chanlock);
180 if(r==~0){ /* interrupted */
181 if(c!=nil) /* someone will meet us; go back */
182 goto Again;
183 c = (Channel*)~0; /* so no one tries to meet us */
186 /* dequeue from channels, find selected one */
187 a = nil;
188 for(xa=alts; xa->op!=CHANEND; xa++){
189 if(xa->op==CHANNOP)
190 continue;
191 if(xa->c == c)
192 a = xa;
193 dequeue(xa);
195 unlock(&chanlock);
196 _procsplx(s);
197 if(a == nil){ /* we were interrupted */
198 assert(c==(Channel*)~0);
199 return -1;
201 }else{
202 altexec(a, s); /* unlocks chanlock, does splx */
204 if(t)
205 t->chan = Channone;
206 return a - alts;
209 int
210 alt(Alt *alts)
212 setuserpc(getcallerpc(&alts));
213 return _alt(alts);
216 static int
217 runop(int op, Channel *c, void *v, int nb)
219 int r;
220 Alt a[2];
222 /*
223 * we could do this without calling alt,
224 * but the only reason would be performance,
225 * and i'm not convinced it matters.
226 */
227 a[0].op = op;
228 a[0].c = c;
229 a[0].v = v;
230 a[1].op = CHANEND;
231 if(nb)
232 a[1].op = CHANNOBLK;
233 switch(r=_alt(a)){
234 case -1: /* interrupted */
235 return -1;
236 case 1: /* nonblocking, didn't accomplish anything */
237 assert(nb);
238 return 0;
239 case 0:
240 return 1;
241 default:
242 fprint(2, "ERROR: channel alt returned %d\n", r);
243 abort();
244 return -1;
248 int
249 recv(Channel *c, void *v)
251 setuserpc(getcallerpc(&c));
252 return runop(CHANRCV, c, v, 0);
255 int
256 nbrecv(Channel *c, void *v)
258 setuserpc(getcallerpc(&c));
259 return runop(CHANRCV, c, v, 1);
262 int
263 send(Channel *c, void *v)
265 setuserpc(getcallerpc(&c));
266 return runop(CHANSND, c, v, 0);
269 int
270 nbsend(Channel *c, void *v)
272 setuserpc(getcallerpc(&c));
273 return runop(CHANSND, c, v, 1);
276 static void
277 channelsize(Channel *c, int sz)
279 if(c->e != sz){
280 fprint(2, "expected channel with elements of size %d, got size %d\n",
281 sz, c->e);
282 abort();
286 int
287 sendul(Channel *c, ulong v)
289 setuserpc(getcallerpc(&c));
290 channelsize(c, sizeof(ulong));
291 return send(c, &v);
294 ulong
295 recvul(Channel *c)
297 ulong v;
299 setuserpc(getcallerpc(&c));
300 channelsize(c, sizeof(ulong));
301 if(runop(CHANRCV, c, &v, 0) < 0)
302 return ~0;
303 return v;
306 int
307 sendp(Channel *c, void *v)
309 setuserpc(getcallerpc(&c));
310 channelsize(c, sizeof(void*));
311 return runop(CHANSND, c, &v, 0);
314 void*
315 recvp(Channel *c)
317 void *v;
319 setuserpc(getcallerpc(&c));
320 channelsize(c, sizeof(void*));
321 if(runop(CHANRCV, c, &v, 0) < 0)
322 return nil;
323 return v;
326 int
327 nbsendul(Channel *c, ulong v)
329 setuserpc(getcallerpc(&c));
330 channelsize(c, sizeof(ulong));
331 return runop(CHANSND, c, &v, 1);
334 ulong
335 nbrecvul(Channel *c)
337 ulong v;
339 setuserpc(getcallerpc(&c));
340 channelsize(c, sizeof(ulong));
341 if(runop(CHANRCV, c, &v, 1) == 0)
342 return 0;
343 return v;
346 int
347 nbsendp(Channel *c, void *v)
349 setuserpc(getcallerpc(&c));
350 channelsize(c, sizeof(void*));
351 return runop(CHANSND, c, &v, 1);
354 void*
355 nbrecvp(Channel *c)
357 void *v;
359 setuserpc(getcallerpc(&c));
360 channelsize(c, sizeof(void*));
361 if(runop(CHANRCV, c, &v, 1) == 0)
362 return nil;
363 return v;
366 static int
367 emptyentry(Channel *c)
369 int i, extra;
371 assert((c->nentry==0 && c->qentry==nil) || (c->nentry && c->qentry));
373 for(i=0; i<c->nentry; i++)
374 if(c->qentry[i]==nil)
375 return i;
377 extra = 16;
378 c->nentry += extra;
379 if(c->nentry > _threadhighnentry) _threadhighnentry = c->nentry;
380 c->qentry = realloc((void*)c->qentry, c->nentry*sizeof(c->qentry[0]));
381 if(c->qentry == nil)
382 sysfatal("realloc channel entries: %r");
383 _threadmemset(&c->qentry[i], 0, extra*sizeof(c->qentry[0]));
384 return i;
387 static void
388 enqueue(Alt *a, Channel **c)
390 int i;
392 _threaddebug(DBGCHAN, "Queueing alt %p on channel %p", a, a->c);
393 a->tag = c;
394 i = emptyentry(a->c);
395 a->c->qentry[i] = a;
398 static void
399 dequeue(Alt *a)
401 int i;
402 Channel *c;
404 c = a->c;
405 for(i=0; i<c->nentry; i++)
406 if(c->qentry[i]==a){
407 _threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c);
408 c->qentry[i] = nil;
409 if(c->freed)
410 _chanfree(c);
411 return;
415 static void*
416 altexecbuffered(Alt *a, int willreplace)
418 uchar *v;
419 Channel *c;
421 c = a->c;
422 /* use buffered channel queue */
423 if(a->op==CHANRCV && c->n > 0){
424 _threaddebug(DBGCHAN, "buffer recv alt %p chan %p", a, c);
425 v = c->v + c->e*(c->f%c->s);
426 if(!willreplace)
427 c->n--;
428 c->f++;
429 return v;
431 if(a->op==CHANSND && c->n < c->s){
432 _threaddebug(DBGCHAN, "buffer send alt %p chan %p", a, c);
433 v = c->v + c->e*((c->f+c->n)%c->s);
434 if(!willreplace)
435 c->n++;
436 return v;
438 abort();
439 return nil;
/*
 * Copy one channel element of sz bytes.
 *
 * A nil dst discards the value (nothing is written).  A nil src
 * stands for an all-zero value: dst is cleared instead of copied.
 */
static void
altcopy(void *dst, void *src, int sz)
{
	if(dst){
		if(src)
			memmove(dst, src, sz);
		else
			_threadmemset(dst, 0, sz);
	}
}
453 static int
454 altexec(Alt *a, int spl)
456 volatile Alt *b;
457 int i, n, otherop;
458 Channel *c;
459 void *me, *waiter, *buf;
461 c = a->c;
463 /* rendezvous with others */
464 otherop = (CHANSND+CHANRCV) - a->op;
465 n = 0;
466 b = nil;
467 me = a->v;
468 for(i=0; i<c->nentry; i++)
469 if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil)
470 if(nrand(++n) == 0)
471 b = c->qentry[i];
472 if(b != nil){
473 _threaddebug(DBGCHAN, "rendez %s alt %p chan %p alt %p", a->op==CHANRCV?"recv":"send", a, c, b);
474 waiter = b->v;
475 if(c->s && c->n){
476 /*
477 * if buffer is full and there are waiters
478 * and we're meeting a waiter,
479 * we must be receiving.
481 * we use the value in the channel buffer,
482 * copy the waiter's value into the channel buffer
483 * on behalf of the waiter, and then wake the waiter.
484 */
485 if(a->op!=CHANRCV)
486 abort();
487 buf = altexecbuffered(a, 1);
488 altcopy(me, buf, c->e);
489 altcopy(buf, waiter, c->e);
490 }else{
491 if(a->op==CHANRCV)
492 altcopy(me, waiter, c->e);
493 else
494 altcopy(waiter, me, c->e);
496 *b->tag = c; /* commits us to rendezvous */
497 _threaddebug(DBGCHAN, "unlocking the chanlock");
498 unlock(&chanlock);
499 _procsplx(spl);
500 _threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)&chanlock);
501 while(_threadrendezvous((ulong)b->tag, 0) == ~0)
503 return 1;
506 buf = altexecbuffered(a, 0);
507 if(a->op==CHANRCV)
508 altcopy(me, buf, c->e);
509 else
510 altcopy(buf, me, c->e);
512 unlock(&chanlock);
513 _procsplx(spl);
514 return 1;