1 #include "threadimpl.h"
3 static Lock chanlock; /* central channel access lock */
5 static void enqueue(Alt*, Channel**);
6 static void dequeue(Alt*);
7 static int altexec(Alt*, int);
17 t = _threadgetproc()->thread;
29 /* are there senders or receivers blocked? */
30 otherop = (CHANSND+CHANRCV) - a->op;
31 for(i=0; i<c->nentry; i++)
32 if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil){
33 _threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c);
37 /* is there room in the channel? */
38 if((a->op==CHANSND && c->n < c->s)
39 || (a->op==CHANRCV && c->n > 0)){
40 _threaddebug(DBGCHAN, "can buffer alt %p chan %p", a, c);
53 for(i = 0; i < c->nentry; i++)
74 chaninit(Channel *c, int elemsize, int elemcnt)
76 if(elemcnt < 0 || elemsize <= 0 || c == nil)
78 memset(c, 0, sizeof *c);
81 _threaddebug(DBGCHAN, "chaninit %p", c);
86 chancreate(int elemsize, int elemcnt)
90 if(elemcnt < 0 || elemsize <= 0)
92 c = _threadmalloc(sizeof(Channel)+elemsize*elemcnt, 1);
95 _threaddebug(DBGCHAN, "chancreate %p", c);
109 * The point of going splhi here is that note handlers
110 * might reasonably want to use channel operations,
111 * but that will hang if the note comes while we hold the
112 * chanlock. Instead, we delay the note until we've dropped
117 * T might be nil here -- the scheduler sends on threadwaitchan
118 * directly (in non-blocking mode, of course!).
120 t = _threadgetproc()->thread;
121 if((t && t->moribund) || _threadexitsallstatus)
122 yield(); /* won't return */
126 /* test whether any channels can proceed */
130 for(xa=alts; xa->op!=CHANEND && xa->op!=CHANNOBLK; xa++){
132 if(xa->op == CHANNOP)
147 /* nothing can proceed */
148 if(xa->op == CHANNOBLK){
155 /* enqueue on all channels. */
157 for(xa=alts; xa->op!=CHANEND; xa++){
160 enqueue(xa, (Channel**)&c);
164 * wait for successful rendezvous.
165 * we can't just give up if the rendezvous
166 * is interrupted -- someone else might come
167 * along and try to rendezvous with us, so
168 * we need to be here.
176 r = _threadrendezvous((ulong)&c, 0);
180 if(r==~0){ /* interrupted */
181 if(c!=nil) /* someone will meet us; go back */
183 c = (Channel*)~0; /* so no one tries to meet us */
186 /* dequeue from channels, find selected one */
188 for(xa=alts; xa->op!=CHANEND; xa++){
197 if(a == nil){ /* we were interrupted */
198 assert(c==(Channel*)~0);
202 altexec(a, s); /* unlocks chanlock, does splx */
212 setuserpc(getcallerpc(&alts));
217 runop(int op, Channel *c, void *v, int nb)
223 * we could do this without calling alt,
224 * but the only reason would be performance,
225 * and i'm not convinced it matters.
234 case -1: /* interrupted */
236 case 1: /* nonblocking, didn't accomplish anything */
242 fprint(2, "ERROR: channel alt returned %d\n", r);
249 recv(Channel *c, void *v)
251 setuserpc(getcallerpc(&c));
252 return runop(CHANRCV, c, v, 0);
256 nbrecv(Channel *c, void *v)
258 setuserpc(getcallerpc(&c));
259 return runop(CHANRCV, c, v, 1);
263 send(Channel *c, void *v)
265 setuserpc(getcallerpc(&c));
266 return runop(CHANSND, c, v, 0);
270 nbsend(Channel *c, void *v)
272 setuserpc(getcallerpc(&c));
273 return runop(CHANSND, c, v, 1);
277 channelsize(Channel *c, int sz)
280 fprint(2, "expected channel with elements of size %d, got size %d\n",
287 sendul(Channel *c, ulong v)
289 setuserpc(getcallerpc(&c));
290 channelsize(c, sizeof(ulong));
299 setuserpc(getcallerpc(&c));
300 channelsize(c, sizeof(ulong));
301 if(runop(CHANRCV, c, &v, 0) < 0)
307 sendp(Channel *c, void *v)
309 setuserpc(getcallerpc(&c));
310 channelsize(c, sizeof(void*));
311 return runop(CHANSND, c, &v, 0);
319 setuserpc(getcallerpc(&c));
320 channelsize(c, sizeof(void*));
321 if(runop(CHANRCV, c, &v, 0) < 0)
327 nbsendul(Channel *c, ulong v)
329 setuserpc(getcallerpc(&c));
330 channelsize(c, sizeof(ulong));
331 return runop(CHANSND, c, &v, 1);
339 setuserpc(getcallerpc(&c));
340 channelsize(c, sizeof(ulong));
341 if(runop(CHANRCV, c, &v, 1) == 0)
347 nbsendp(Channel *c, void *v)
349 setuserpc(getcallerpc(&c));
350 channelsize(c, sizeof(void*));
351 return runop(CHANSND, c, &v, 1);
359 setuserpc(getcallerpc(&c));
360 channelsize(c, sizeof(void*));
361 if(runop(CHANRCV, c, &v, 1) == 0)
367 emptyentry(Channel *c)
371 assert((c->nentry==0 && c->qentry==nil) || (c->nentry && c->qentry));
373 for(i=0; i<c->nentry; i++)
374 if(c->qentry[i]==nil)
379 if(c->nentry > _threadhighnentry) _threadhighnentry = c->nentry;
380 c->qentry = realloc((void*)c->qentry, c->nentry*sizeof(c->qentry[0]));
382 sysfatal("realloc channel entries: %r");
383 _threadmemset(&c->qentry[i], 0, extra*sizeof(c->qentry[0]));
388 enqueue(Alt *a, Channel **c)
392 _threaddebug(DBGCHAN, "Queueing alt %p on channel %p", a, a->c);
394 i = emptyentry(a->c);
405 for(i=0; i<c->nentry; i++)
407 _threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c);
416 altexecbuffered(Alt *a, int willreplace)
422 /* use buffered channel queue */
423 if(a->op==CHANRCV && c->n > 0){
424 _threaddebug(DBGCHAN, "buffer recv alt %p chan %p", a, c);
425 v = c->v + c->e*(c->f%c->s);
431 if(a->op==CHANSND && c->n < c->s){
432 _threaddebug(DBGCHAN, "buffer send alt %p chan %p", a, c);
433 v = c->v + c->e*((c->f+c->n)%c->s);
443 altcopy(void *dst, void *src, int sz)
447 memmove(dst, src, sz);
449 _threadmemset(dst, 0, sz);
454 altexec(Alt *a, int spl)
459 void *me, *waiter, *buf;
463 /* rendezvous with others */
464 otherop = (CHANSND+CHANRCV) - a->op;
468 for(i=0; i<c->nentry; i++)
469 if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil)
473 _threaddebug(DBGCHAN, "rendez %s alt %p chan %p alt %p", a->op==CHANRCV?"recv":"send", a, c, b);
477 * if buffer is full and there are waiters
478 * and we're meeting a waiter,
479 * we must be receiving.
481 * we use the value in the channel buffer,
482 * copy the waiter's value into the channel buffer
483 * on behalf of the waiter, and then wake the waiter.
487 buf = altexecbuffered(a, 1);
488 altcopy(me, buf, c->e);
489 altcopy(buf, waiter, c->e);
492 altcopy(me, waiter, c->e);
494 altcopy(waiter, me, c->e);
496 *b->tag = c; /* commits us to rendezvous */
497 _threaddebug(DBGCHAN, "unlocking the chanlock");
500 _threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)&chanlock);
501 while(_threadrendezvous((ulong)b->tag, 0) == ~0)
506 buf = altexecbuffered(a, 0);
508 altcopy(me, buf, c->e);
510 altcopy(buf, me, c->e);