1 #include "threadimpl.h"
3 static Lock chanlock; /* central channel access lock */
5 static void enqueue(Alt*, Channel**);
6 static void dequeue(Alt*);
7 static int altexec(Alt*, int);
19 /* are there senders or receivers blocked? */
20 otherop = (CHANSND+CHANRCV) - a->op;
21 for(i=0; i<c->nentry; i++)
22 if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil){
23 _threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c);
27 /* is there room in the channel? */
28 if((a->op==CHANSND && c->n < c->s)
29 || (a->op==CHANRCV && c->n > 0)){
30 _threaddebug(DBGCHAN, "can buffer alt %p chan %p", a, c);
43 for(i = 0; i < c->nentry; i++)
64 chaninit(Channel *c, int elemsize, int elemcnt)
66 if(elemcnt < 0 || elemsize <= 0 || c == nil)
73 _threaddebug(DBGCHAN, "chaninit %p", c);
78 chancreate(int elemsize, int elemcnt)
82 if(elemcnt < 0 || elemsize <= 0)
84 c = _threadmalloc(sizeof(Channel)+elemsize*elemcnt, 1);
87 _threaddebug(DBGCHAN, "chancreate %p", c);
101 * The point of going splhi here is that note handlers
102 * might reasonably want to use channel operations,
103 * but that will hang if the note comes while we hold the
104 * chanlock. Instead, we delay the note until we've dropped
107 t = _threadgetproc()->thread;
108 if(t->moribund || _threadexitsallstatus)
109 yield(); /* won't return */
115 /* test whether any channels can proceed */
119 for(xa=alts; xa->op!=CHANEND && xa->op!=CHANNOBLK; xa++){
121 if(xa->op == CHANNOP)
137 /* nothing can proceed */
138 if(xa->op == CHANNOBLK){
146 /* enqueue on all channels. */
148 for(xa=alts; xa->op!=CHANEND; xa++){
151 enqueue(xa, (Channel**)&c);
155 * wait for successful rendezvous.
156 * we can't just give up if the rendezvous
157 * is interrupted -- someone else might come
158 * along and try to rendezvous with us, so
159 * we need to be here.
164 r = _threadrendezvous((ulong)&c, 0);
168 if(r==~0){ /* interrupted */
169 if(c!=nil) /* someone will meet us; go back */
171 c = (Channel*)~0; /* so no one tries to meet us */
174 /* dequeue from channels, find selected one */
176 for(xa=alts; xa->op!=CHANEND; xa++){
185 if(a == nil){ /* we were interrupted */
186 assert(c==(Channel*)~0);
190 altexec(a, s); /* unlocks chanlock, does splx */
199 runop(int op, Channel *c, void *v, int nb)
205 * we could do this without calling alt,
206 * but the only reason would be performance,
207 * and I'm not convinced it matters.
216 case -1: /* interrupted */
218 case 1: /* nonblocking, didn't accomplish anything */
224 fprint(2, "ERROR: channel alt returned %d\n", r);
231 recv(Channel *c, void *v)
233 return runop(CHANRCV, c, v, 0);
237 nbrecv(Channel *c, void *v)
239 return runop(CHANRCV, c, v, 1);
243 send(Channel *c, void *v)
245 return runop(CHANSND, c, v, 0);
249 nbsend(Channel *c, void *v)
251 return runop(CHANSND, c, v, 1);
255 channelsize(Channel *c, int sz)
258 fprint(2, "expected channel with elements of size %d, got size %d",
265 sendul(Channel *c, ulong v)
267 channelsize(c, sizeof(ulong));
276 channelsize(c, sizeof(ulong));
283 sendp(Channel *c, void *v)
285 channelsize(c, sizeof(void*));
294 channelsize(c, sizeof(void*));
301 nbsendul(Channel *c, ulong v)
303 channelsize(c, sizeof(ulong));
304 return nbsend(c, &v);
312 channelsize(c, sizeof(ulong));
313 if(nbrecv(c, &v) == 0)
319 nbsendp(Channel *c, void *v)
321 channelsize(c, sizeof(void*));
322 return nbsend(c, &v);
330 channelsize(c, sizeof(void*));
331 if(nbrecv(c, &v) == 0)
337 emptyentry(Channel *c)
341 assert((c->nentry==0 && c->qentry==nil) || (c->nentry && c->qentry));
343 for(i=0; i<c->nentry; i++)
344 if(c->qentry[i]==nil)
349 if(c->nentry > _threadhighnentry) _threadhighnentry = c->nentry;
350 c->qentry = realloc((void*)c->qentry, c->nentry*sizeof(c->qentry[0]));
352 sysfatal("realloc channel entries: %r");
353 _threadmemset(&c->qentry[i], 0, extra*sizeof(c->qentry[0]));
358 enqueue(Alt *a, Channel **c)
362 _threaddebug(DBGCHAN, "Queuing alt %p on channel %p", a, a->c);
364 i = emptyentry(a->c);
375 for(i=0; i<c->nentry; i++)
377 _threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c);
386 altexecbuffered(Alt *a, int willreplace)
392 /* use buffered channel queue */
393 if(a->op==CHANRCV && c->n > 0){
394 _threaddebug(DBGCHAN, "buffer recv alt %p chan %p", a, c);
395 v = c->v + c->e*(c->f%c->s);
401 if(a->op==CHANSND && c->n < c->s){
402 _threaddebug(DBGCHAN, "buffer send alt %p chan %p", a, c);
403 v = c->v + c->e*((c->f+c->n)%c->s);
413 altcopy(void *dst, void *src, int sz)
417 memmove(dst, src, sz);
419 _threadmemset(dst, 0, sz);
424 altexec(Alt *a, int spl)
429 void *me, *waiter, *buf;
433 /* rendezvous with others */
434 otherop = (CHANSND+CHANRCV) - a->op;
438 for(i=0; i<c->nentry; i++)
439 if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil)
443 _threaddebug(DBGCHAN, "rendez %s alt %p chan %p alt %p", a->op==CHANRCV?"recv":"send", a, c, b);
447 * if buffer is full and there are waiters
448 * and we're meeting a waiter,
449 * we must be receiving.
451 * we use the value in the channel buffer,
452 * copy the waiter's value into the channel buffer
453 * on behalf of the waiter, and then wake the waiter.
457 buf = altexecbuffered(a, 1);
458 altcopy(me, buf, c->e);
459 altcopy(buf, waiter, c->e);
462 altcopy(me, waiter, c->e);
464 altcopy(waiter, me, c->e);
466 *b->tag = c; /* commits us to rendezvous */
467 _threaddebug(DBGCHAN, "unlocking the chanlock");
470 _threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)&chanlock);
471 while(_threadrendezvous((ulong)b->tag, 0) == ~0)
476 buf = altexecbuffered(a, 0);
478 altcopy(me, buf, c->e);
480 altcopy(buf, me, c->e);