Blob


1 #include "threadimpl.h"
3 static Lock chanlock; /* central channel access lock */
5 static void enqueue(Alt*, Thread*);
6 static void dequeue(Alt*);
7 static int altexec(Alt*);
9 int _threadhighnentry;
10 int _threadnalt;
12 static void
13 setuserpc(ulong pc)
14 {
15 Thread *t;
17 t = _threadgetproc()->thread;
18 if(t)
19 t->userpc = pc;
20 }
22 static int
23 canexec(Alt *a)
24 {
25 int i, otherop;
26 Channel *c;
28 c = a->c;
29 /* are there senders or receivers blocked? */
30 otherop = (CHANSND+CHANRCV) - a->op;
31 for(i=0; i<c->nentry; i++)
32 if(c->qentry[i] && c->qentry[i]->op==otherop && c->qentry[i]->thread->altc==nil){
33 _threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c);
34 return 1;
35 }
37 /* is there room in the channel? */
38 if((a->op==CHANSND && c->n < c->s)
39 || (a->op==CHANRCV && c->n > 0)){
40 _threaddebug(DBGCHAN, "can buffer alt %p chan %p", a, c);
41 return 1;
42 }
44 return 0;
45 }
47 static void
48 _chanfree(Channel *c)
49 {
50 int i, inuse;
52 inuse = 0;
53 for(i = 0; i < c->nentry; i++)
54 if(c->qentry[i])
55 inuse = 1;
56 if(inuse)
57 c->freed = 1;
58 else{
59 if(c->qentry)
60 free(c->qentry);
61 free(c);
62 }
63 }
65 void
66 chanfree(Channel *c)
67 {
68 lock(&chanlock);
69 _chanfree(c);
70 unlock(&chanlock);
71 }
73 int
74 chaninit(Channel *c, int elemsize, int elemcnt)
75 {
76 if(elemcnt < 0 || elemsize <= 0 || c == nil)
77 return -1;
78 memset(c, 0, sizeof *c);
79 c->e = elemsize;
80 c->s = elemcnt;
81 _threaddebug(DBGCHAN, "chaninit %p", c);
82 return 1;
83 }
85 Channel*
86 chancreate(int elemsize, int elemcnt)
87 {
88 Channel *c;
90 if(elemcnt < 0 || elemsize <= 0)
91 return nil;
92 c = _threadmalloc(sizeof(Channel)+elemsize*elemcnt, 1);
93 c->e = elemsize;
94 c->s = elemcnt;
95 _threaddebug(DBGCHAN, "chancreate %p", c);
96 return c;
97 }
99 static int
100 _alt(Alt *alts)
102 Alt *a, *xa;
103 Channel *c;
104 int n;
105 Thread *t;
107 /*
108 * The point of going splhi here is that note handlers
109 * might reasonably want to use channel operations,
110 * but that will hang if the note comes while we hold the
111 * chanlock. Instead, we delay the note until we've dropped
112 * the lock.
113 */
115 /*
116 * T might be nil here -- the scheduler sends on threadwaitchan
117 * directly (in non-blocking mode, of course!).
118 */
119 t = _threadgetproc()->thread;
120 if((t && t->moribund) || _threadexitsallstatus)
121 yield(); /* won't return */
122 lock(&chanlock);
124 /* test whether any channels can proceed */
125 n = 0;
126 a = nil;
128 for(xa=alts; xa->op!=CHANEND && xa->op!=CHANNOBLK; xa++){
129 xa->entryno = -1;
130 if(xa->op == CHANNOP)
131 continue;
133 c = xa->c;
134 if(c==nil){
135 unlock(&chanlock);
136 return -1;
138 if(canexec(xa))
139 if(nrand(++n) == 0)
140 a = xa;
143 if(a==nil){
144 /* nothing can proceed */
145 if(xa->op == CHANNOBLK){
146 unlock(&chanlock);
147 _threadnalt++;
148 return xa - alts;
151 /* enqueue on all channels. */
152 t->altc = nil;
153 for(xa=alts; xa->op!=CHANEND; xa++){
154 if(xa->op==CHANNOP)
155 continue;
156 enqueue(xa, t);
159 /*
160 * wait for successful rendezvous.
161 * we can't just give up if the rendezvous
162 * is interrupted -- someone else might come
163 * along and try to rendezvous with us, so
164 * we need to be here.
166 * actually, now we're assuming no interrupts.
167 */
168 /*Again:*/
169 t->alt = alts;
170 t->chan = Chanalt;
171 t->altrend.l = &chanlock;
172 _threadsleep(&t->altrend);
174 /* dequeue from channels, find selected one */
175 a = nil;
176 c = t->altc;
177 for(xa=alts; xa->op!=CHANEND; xa++){
178 if(xa->op==CHANNOP)
179 continue;
180 if(xa->c == c)
181 a = xa;
182 dequeue(xa);
184 unlock(&chanlock);
185 if(a == nil){ /* we were interrupted */
186 assert(c==(Channel*)~0);
187 return -1;
189 }else{
190 altexec(a); /* unlocks chanlock, does splx */
192 if(t)
193 t->chan = Channone;
194 return a - alts;
197 int
198 alt(Alt *alts)
200 setuserpc(getcallerpc(&alts));
201 return _alt(alts);
204 static int
205 runop(int op, Channel *c, void *v, int nb)
207 int r;
208 Alt a[2];
210 /*
211 * we could do this without calling alt,
212 * but the only reason would be performance,
213 * and i'm not convinced it matters.
214 */
215 a[0].op = op;
216 a[0].c = c;
217 a[0].v = v;
218 a[1].op = CHANEND;
219 if(nb)
220 a[1].op = CHANNOBLK;
221 switch(r=_alt(a)){
222 case -1: /* interrupted */
223 return -1;
224 case 1: /* nonblocking, didn't accomplish anything */
225 assert(nb);
226 return 0;
227 case 0:
228 return 1;
229 default:
230 fprint(2, "ERROR: channel alt returned %d\n", r);
231 abort();
232 return -1;
236 int
237 recv(Channel *c, void *v)
239 setuserpc(getcallerpc(&c));
240 return runop(CHANRCV, c, v, 0);
243 int
244 nbrecv(Channel *c, void *v)
246 setuserpc(getcallerpc(&c));
247 return runop(CHANRCV, c, v, 1);
250 int
251 send(Channel *c, void *v)
253 setuserpc(getcallerpc(&c));
254 return runop(CHANSND, c, v, 0);
257 int
258 nbsend(Channel *c, void *v)
260 setuserpc(getcallerpc(&c));
261 return runop(CHANSND, c, v, 1);
264 static void
265 channelsize(Channel *c, int sz)
267 if(c->e != sz){
268 fprint(2, "expected channel with elements of size %d, got size %d\n",
269 sz, c->e);
270 abort();
274 int
275 sendul(Channel *c, ulong v)
277 setuserpc(getcallerpc(&c));
278 channelsize(c, sizeof(ulong));
279 return send(c, &v);
282 ulong
283 recvul(Channel *c)
285 ulong v;
287 setuserpc(getcallerpc(&c));
288 channelsize(c, sizeof(ulong));
289 if(runop(CHANRCV, c, &v, 0) < 0)
290 return ~0;
291 return v;
294 int
295 sendp(Channel *c, void *v)
297 setuserpc(getcallerpc(&c));
298 channelsize(c, sizeof(void*));
299 return runop(CHANSND, c, &v, 0);
302 void*
303 recvp(Channel *c)
305 void *v;
307 setuserpc(getcallerpc(&c));
308 channelsize(c, sizeof(void*));
309 if(runop(CHANRCV, c, &v, 0) < 0)
310 return nil;
311 return v;
314 int
315 nbsendul(Channel *c, ulong v)
317 setuserpc(getcallerpc(&c));
318 channelsize(c, sizeof(ulong));
319 return runop(CHANSND, c, &v, 1);
322 ulong
323 nbrecvul(Channel *c)
325 ulong v;
327 setuserpc(getcallerpc(&c));
328 channelsize(c, sizeof(ulong));
329 if(runop(CHANRCV, c, &v, 1) == 0)
330 return 0;
331 return v;
334 int
335 nbsendp(Channel *c, void *v)
337 setuserpc(getcallerpc(&c));
338 channelsize(c, sizeof(void*));
339 return runop(CHANSND, c, &v, 1);
342 void*
343 nbrecvp(Channel *c)
345 void *v;
347 setuserpc(getcallerpc(&c));
348 channelsize(c, sizeof(void*));
349 if(runop(CHANRCV, c, &v, 1) == 0)
350 return nil;
351 return v;
354 static int
355 emptyentry(Channel *c)
357 int i, extra;
359 assert((c->nentry==0 && c->qentry==nil) || (c->nentry && c->qentry));
361 for(i=0; i<c->nentry; i++)
362 if(c->qentry[i]==nil)
363 return i;
365 extra = 16;
366 c->nentry += extra;
367 if(c->nentry > _threadhighnentry) _threadhighnentry = c->nentry;
368 c->qentry = realloc((void*)c->qentry, c->nentry*sizeof(c->qentry[0]));
369 if(c->qentry == nil)
370 sysfatal("realloc channel entries: %r");
371 _threadmemset(&c->qentry[i], 0, extra*sizeof(c->qentry[0]));
372 return i;
375 static void
376 enqueue(Alt *a, Thread *t)
378 int i;
380 _threaddebug(DBGCHAN, "Queueing alt %p on channel %p", a, a->c);
381 a->thread = t;
382 i = emptyentry(a->c);
383 a->c->qentry[i] = a;
386 static void
387 dequeue(Alt *a)
389 int i;
390 Channel *c;
392 c = a->c;
393 for(i=0; i<c->nentry; i++)
394 if(c->qentry[i]==a){
395 _threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c);
396 c->qentry[i] = nil;
397 if(c->freed)
398 _chanfree(c);
399 return;
403 static void*
404 altexecbuffered(Alt *a, int willreplace)
406 uchar *v;
407 Channel *c;
409 c = a->c;
410 /* use buffered channel queue */
411 if(a->op==CHANRCV && c->n > 0){
412 _threaddebug(DBGCHAN, "buffer recv alt %p chan %p", a, c);
413 v = c->v + c->e*(c->f%c->s);
414 if(!willreplace)
415 c->n--;
416 c->f++;
417 return v;
419 if(a->op==CHANSND && c->n < c->s){
420 _threaddebug(DBGCHAN, "buffer send alt %p chan %p", a, c);
421 v = c->v + c->e*((c->f+c->n)%c->s);
422 if(!willreplace)
423 c->n++;
424 return v;
426 abort();
427 return nil;
430 static void
431 altcopy(void *dst, void *src, int sz)
433 if(dst){
434 if(src)
435 memmove(dst, src, sz);
436 else
437 _threadmemset(dst, 0, sz);
441 static int
442 altexec(Alt *a)
444 volatile Alt *b;
445 int i, n, otherop;
446 Channel *c;
447 void *me, *waiter, *buf;
449 c = a->c;
451 /* rendezvous with others */
452 otherop = (CHANSND+CHANRCV) - a->op;
453 n = 0;
454 b = nil;
455 me = a->v;
456 for(i=0; i<c->nentry; i++)
457 if(c->qentry[i] && c->qentry[i]->op==otherop && c->qentry[i]->thread->altc==nil)
458 if(nrand(++n) == 0)
459 b = c->qentry[i];
460 if(b != nil){
461 _threaddebug(DBGCHAN, "rendez %s alt %p chan %p alt %p", a->op==CHANRCV?"recv":"send", a, c, b);
462 waiter = b->v;
463 if(c->s && c->n){
464 /*
465 * if buffer is full and there are waiters
466 * and we're meeting a waiter,
467 * we must be receiving.
469 * we use the value in the channel buffer,
470 * copy the waiter's value into the channel buffer
471 * on behalf of the waiter, and then wake the waiter.
472 */
473 if(a->op!=CHANRCV)
474 abort();
475 buf = altexecbuffered(a, 1);
476 altcopy(me, buf, c->e);
477 altcopy(buf, waiter, c->e);
478 }else{
479 if(a->op==CHANRCV)
480 altcopy(me, waiter, c->e);
481 else
482 altcopy(waiter, me, c->e);
484 b->thread->altc = c;
485 _threadwakeup(&b->thread->altrend);
486 _threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)(void*)&chanlock);
487 _threaddebug(DBGCHAN, "unlocking the chanlock");
488 unlock(&chanlock);
489 return 1;
492 buf = altexecbuffered(a, 0);
493 if(a->op==CHANRCV)
494 altcopy(me, buf, c->e);
495 else
496 altcopy(buf, me, c->e);
498 unlock(&chanlock);
499 return 1;