1 #include "threadimpl.h"
3 static Lock chanlock; /* central channel access lock */
5 static void enqueue(Alt*, Thread*);
6 static void dequeue(Alt*);
7 static int altexec(Alt*);
17 t = _threadgetproc()->thread;
29 /* are there senders or receivers blocked? */
30 otherop = (CHANSND+CHANRCV) - a->op;
31 for(i=0; i<c->nentry; i++)
32 if(c->qentry[i] && c->qentry[i]->op==otherop && c->qentry[i]->thread->altc==nil){
33 _threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c);
37 /* is there room in the channel? */
38 if((a->op==CHANSND && c->n < c->s)
39 || (a->op==CHANRCV && c->n > 0)){
40 _threaddebug(DBGCHAN, "can buffer alt %p chan %p", a, c);
53 for(i = 0; i < c->nentry; i++)
74 chaninit(Channel *c, int elemsize, int elemcnt)
76 if(elemcnt < 0 || elemsize <= 0 || c == nil)
78 memset(c, 0, sizeof *c);
81 _threaddebug(DBGCHAN, "chaninit %p", c);
86 chancreate(int elemsize, int elemcnt)
90 if(elemcnt < 0 || elemsize <= 0)
92 c = _threadmalloc(sizeof(Channel)+elemsize*elemcnt, 1);
95 _threaddebug(DBGCHAN, "chancreate %p", c);
108 * The point of going splhi here is that note handlers
109 * might reasonably want to use channel operations,
110 * but that will hang if the note comes while we hold the
111 * chanlock. Instead, we delay the note until we've dropped
116 * T might be nil here -- the scheduler sends on threadwaitchan
117 * directly (in non-blocking mode, of course!).
119 t = _threadgetproc()->thread;
120 if((t && t->moribund) || _threadexitsallstatus)
121 yield(); /* won't return */
124 /* test whether any channels can proceed */
128 for(xa=alts; xa->op!=CHANEND && xa->op!=CHANNOBLK; xa++){
130 if(xa->op == CHANNOP)
144 /* nothing can proceed */
145 if(xa->op == CHANNOBLK){
151 /* enqueue on all channels. */
153 for(xa=alts; xa->op!=CHANEND; xa++){
160 * wait for successful rendezvous.
161 * we can't just give up if the rendezvous
162 * is interrupted -- someone else might come
163 * along and try to rendezvous with us, so
164 * we need to be here.
166 * actually, now we're assuming no interrupts.
171 t->altrend.l = &chanlock;
172 _threadsleep(&t->altrend);
174 /* dequeue from channels, find selected one */
177 for(xa=alts; xa->op!=CHANEND; xa++){
185 if(a == nil){ /* we were interrupted */
186 assert(c==(Channel*)~0);
190 altexec(a); /* unlocks chanlock, does splx */
200 setuserpc(getcallerpc(&alts));
205 runop(int op, Channel *c, void *v, int nb)
211 * we could do this without calling alt,
212 * but the only reason would be performance,
213 * and i'm not convinced it matters.
222 case -1: /* interrupted */
224 case 1: /* nonblocking, didn't accomplish anything */
230 fprint(2, "ERROR: channel alt returned %d\n", r);
237 recv(Channel *c, void *v)
239 setuserpc(getcallerpc(&c));
240 return runop(CHANRCV, c, v, 0);
244 nbrecv(Channel *c, void *v)
246 setuserpc(getcallerpc(&c));
247 return runop(CHANRCV, c, v, 1);
251 send(Channel *c, void *v)
253 setuserpc(getcallerpc(&c));
254 return runop(CHANSND, c, v, 0);
258 nbsend(Channel *c, void *v)
260 setuserpc(getcallerpc(&c));
261 return runop(CHANSND, c, v, 1);
265 channelsize(Channel *c, int sz)
268 fprint(2, "expected channel with elements of size %d, got size %d\n",
275 sendul(Channel *c, ulong v)
277 setuserpc(getcallerpc(&c));
278 channelsize(c, sizeof(ulong));
287 setuserpc(getcallerpc(&c));
288 channelsize(c, sizeof(ulong));
289 if(runop(CHANRCV, c, &v, 0) < 0)
295 sendp(Channel *c, void *v)
297 setuserpc(getcallerpc(&c));
298 channelsize(c, sizeof(void*));
299 return runop(CHANSND, c, &v, 0);
307 setuserpc(getcallerpc(&c));
308 channelsize(c, sizeof(void*));
309 if(runop(CHANRCV, c, &v, 0) < 0)
315 nbsendul(Channel *c, ulong v)
317 setuserpc(getcallerpc(&c));
318 channelsize(c, sizeof(ulong));
319 return runop(CHANSND, c, &v, 1);
327 setuserpc(getcallerpc(&c));
328 channelsize(c, sizeof(ulong));
329 if(runop(CHANRCV, c, &v, 1) == 0)
335 nbsendp(Channel *c, void *v)
337 setuserpc(getcallerpc(&c));
338 channelsize(c, sizeof(void*));
339 return runop(CHANSND, c, &v, 1);
347 setuserpc(getcallerpc(&c));
348 channelsize(c, sizeof(void*));
349 if(runop(CHANRCV, c, &v, 1) == 0)
355 emptyentry(Channel *c)
359 assert((c->nentry==0 && c->qentry==nil) || (c->nentry && c->qentry));
361 for(i=0; i<c->nentry; i++)
362 if(c->qentry[i]==nil)
367 if(c->nentry > _threadhighnentry) _threadhighnentry = c->nentry;
368 c->qentry = realloc((void*)c->qentry, c->nentry*sizeof(c->qentry[0]));
370 sysfatal("realloc channel entries: %r");
371 _threadmemset(&c->qentry[i], 0, extra*sizeof(c->qentry[0]));
376 enqueue(Alt *a, Thread *t)
380 _threaddebug(DBGCHAN, "Queueing alt %p on channel %p", a, a->c);
382 i = emptyentry(a->c);
393 for(i=0; i<c->nentry; i++)
395 _threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c);
404 altexecbuffered(Alt *a, int willreplace)
410 /* use buffered channel queue */
411 if(a->op==CHANRCV && c->n > 0){
412 _threaddebug(DBGCHAN, "buffer recv alt %p chan %p", a, c);
413 v = c->v + c->e*(c->f%c->s);
419 if(a->op==CHANSND && c->n < c->s){
420 _threaddebug(DBGCHAN, "buffer send alt %p chan %p", a, c);
421 v = c->v + c->e*((c->f+c->n)%c->s);
431 altcopy(void *dst, void *src, int sz)
435 memmove(dst, src, sz);
437 _threadmemset(dst, 0, sz);
447 void *me, *waiter, *buf;
451 /* rendezvous with others */
452 otherop = (CHANSND+CHANRCV) - a->op;
456 for(i=0; i<c->nentry; i++)
457 if(c->qentry[i] && c->qentry[i]->op==otherop && c->qentry[i]->thread->altc==nil)
461 _threaddebug(DBGCHAN, "rendez %s alt %p chan %p alt %p", a->op==CHANRCV?"recv":"send", a, c, b);
465 * if buffer is full and there are waiters
466 * and we're meeting a waiter,
467 * we must be receiving.
469 * we use the value in the channel buffer,
470 * copy the waiter's value into the channel buffer
471 * on behalf of the waiter, and then wake the waiter.
475 buf = altexecbuffered(a, 1);
476 altcopy(me, buf, c->e);
477 altcopy(buf, waiter, c->e);
480 altcopy(me, waiter, c->e);
482 altcopy(waiter, me, c->e);
485 _threadwakeup(&b->thread->altrend);
486 _threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)(void*)&chanlock);
487 _threaddebug(DBGCHAN, "unlocking the chanlock");
492 buf = altexecbuffered(a, 0);
494 altcopy(me, buf, c->e);
496 altcopy(buf, me, c->e);