Blob


1 #include "threadimpl.h"
/* One lock shared by every channel routine in this file. */
3 static Lock chanlock; /* central channel access lock */
/* Forward declarations for helpers that are defined further down. */
5 static void enqueue(Alt*, Channel**);
6 static void dequeue(Alt*);
7 static int altexec(Alt*, int);
/* Largest qentry array length any channel has reached; raised in emptyentry. */
9 int _threadhighnentry;
/* Incremented by alt on the paths that return an alternative's index. */
10 int _threadnalt;
12 static int
13 canexec(Alt *a)
14 {
15 int i, otherop;
16 Channel *c;
18 c = a->c;
19 /* are there senders or receivers blocked? */
20 otherop = (CHANSND+CHANRCV) - a->op;
21 for(i=0; i<c->nentry; i++)
22 if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil){
23 _threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c);
24 return 1;
25 }
27 /* is there room in the channel? */
28 if((a->op==CHANSND && c->n < c->s)
29 || (a->op==CHANRCV && c->n > 0)){
30 _threaddebug(DBGCHAN, "can buffer alt %p chan %p", a, c);
31 return 1;
32 }
34 return 0;
35 }
37 static void
38 _chanfree(Channel *c)
39 {
40 int i, inuse;
42 inuse = 0;
43 for(i = 0; i < c->nentry; i++)
44 if(c->qentry[i])
45 inuse = 1;
46 if(inuse)
47 c->freed = 1;
48 else{
49 if(c->qentry)
50 free(c->qentry);
51 free(c);
52 }
53 }
55 void
56 chanfree(Channel *c)
57 {
58 lock(&chanlock);
59 _chanfree(c);
60 unlock(&chanlock);
61 }
63 int
64 chaninit(Channel *c, int elemsize, int elemcnt)
65 {
66 if(elemcnt < 0 || elemsize <= 0 || c == nil)
67 return -1;
68 c->f = 0;
69 c->n = 0;
70 c->freed = 0;
71 c->e = elemsize;
72 c->s = elemcnt;
73 _threaddebug(DBGCHAN, "chaninit %p", c);
74 return 1;
75 }
77 Channel*
78 chancreate(int elemsize, int elemcnt)
79 {
80 Channel *c;
82 if(elemcnt < 0 || elemsize <= 0)
83 return nil;
84 c = _threadmalloc(sizeof(Channel)+elemsize*elemcnt, 1);
85 c->e = elemsize;
86 c->s = elemcnt;
87 _threaddebug(DBGCHAN, "chancreate %p", c);
88 return c;
89 }
/*
 * alt: perform one of the channel operations listed in alts.
 *
 * The array is scanned up to the first entry whose op is CHANEND or
 * CHANNOBLK; entries whose op is CHANNOP are skipped.  Every entry for
 * which canexec() succeeds is a candidate, and one candidate is kept
 * using the nrand(++n) == 0 test.  A chosen candidate is carried out by
 * altexec.  If there is no candidate and the terminator is CHANNOBLK,
 * the terminator's index is returned immediately; otherwise every entry
 * is enqueued on its channel and the thread waits in _threadrendezvous
 * on the address of the local c until another thread fills it in.
 *
 * Returns the index (a - alts) of the entry that was carried out, or -1
 * when an entry has a nil channel or when the wait was interrupted and
 * nobody had committed to meeting us.
 *
 * NOTE(review): several lines that held only a closing brace appear to
 * be missing from this copy of the function; check the block structure
 * against the upstream source before relying on it.
 */
91 int
92 alt(Alt *alts)
93 {
94 Alt *a, *xa;
95 Channel *volatile c;
96 int n, s;
97 ulong r;
98 Thread *t;
100 /*
101 * The point of going splhi here is that note handlers
102 * might reasonably want to use channel operations,
103 * but that will hang if the note comes while we hold the
104 * chanlock. Instead, we delay the note until we've dropped
105 * the lock.
106 */
107 t = _threadgetproc()->thread;
108 if(t->moribund || _threadexitsallstatus)
109 yield(); /* won't return */
110 s = _procsplhi();
111 lock(&chanlock);
112 t->alt = alts;
113 t->chan = Chanalt;
115 /* test whether any channels can proceed */
116 n = 0;
117 a = nil;
119 for(xa=alts; xa->op!=CHANEND && xa->op!=CHANNOBLK; xa++){
120 xa->entryno = -1;
121 if(xa->op == CHANNOP)
122 continue;
124 c = xa->c;
/* an entry without a channel is an error: undo the setup and fail */
125 if(c==nil){
126 unlock(&chanlock);
127 _procsplx(s);
128 t->chan = Channone;
129 return -1;
/* keep this ready entry if nrand(++n) comes back 0 */
131 if(canexec(xa))
132 if(nrand(++n) == 0)
133 a = xa;
136 if(a==nil){
137 /* nothing can proceed */
138 if(xa->op == CHANNOBLK){
139 unlock(&chanlock);
140 _procsplx(s);
141 t->chan = Channone;
142 _threadnalt++;
143 return xa - alts;
146 /* enqueue on all channels. */
/* c doubles as the rendezvous tag: it stays nil until someone commits to us */
147 c = nil;
148 for(xa=alts; xa->op!=CHANEND; xa++){
149 if(xa->op==CHANNOP)
150 continue;
151 enqueue(xa, (Channel**)&c);
154 /*
155 * wait for successful rendezvous.
156 * we can't just give up if the rendezvous
157 * is interrupted -- someone else might come
158 * along and try to rendezvous with us, so
159 * we need to be here.
160 */
161 Again:
162 unlock(&chanlock);
163 _procsplx(s);
164 r = _threadrendezvous((ulong)&c, 0);
165 s = _procsplhi();
166 lock(&chanlock);
168 if(r==~0){ /* interrupted */
169 if(c!=nil) /* someone will meet us; go back */
170 goto Again;
171 c = (Channel*)~0; /* so no one tries to meet us */
174 /* dequeue from channels, find selected one */
175 a = nil;
176 for(xa=alts; xa->op!=CHANEND; xa++){
177 if(xa->op==CHANNOP)
178 continue;
179 if(xa->c == c)
180 a = xa;
181 dequeue(xa);
183 unlock(&chanlock);
184 _procsplx(s);
185 if(a == nil){ /* we were interrupted */
186 assert(c==(Channel*)~0);
187 return -1;
189 }else{
190 altexec(a, s); /* unlocks chanlock, does splx */
192 _sched();
193 t->chan = Channone;
194 _threadnalt++;
195 return a - alts;
/*
 * runop: carry out a single send or receive (op) on channel c with
 * value pointer v by building a two-entry Alt array and calling alt.
 * When nb is non-zero the array is terminated with CHANNOBLK instead
 * of CHANEND.
 *
 * Returns 1 if alt chose entry 0, 0 if alt returned the terminator
 * index (asserted to happen only when nb is set), and -1 if alt
 * returned -1.  Any other result from alt is reported on fd 2 and the
 * program aborts.
 *
 * NOTE(review): the opening and closing brace lines are missing from
 * this copy.
 */
198 static int
199 runop(int op, Channel *c, void *v, int nb)
201 int r;
202 Alt a[2];
204 /*
205 * we could do this without calling alt,
206 * but the only reason would be performance,
207 * and i'm not convinced it matters.
208 */
209 a[0].op = op;
210 a[0].c = c;
211 a[0].v = v;
212 a[1].op = CHANEND;
213 if(nb)
214 a[1].op = CHANNOBLK;
215 switch(r=alt(a)){
216 case -1: /* interrupted */
217 return -1;
218 case 1: /* nonblocking, didn't accomplish anything */
219 assert(nb);
220 return 0;
221 case 0:
222 return 1;
223 default:
224 fprint(2, "ERROR: channel alt returned %d\n", r);
225 abort();
226 return -1;
/*
 * recv: receive from c into v using runop with CHANRCV and nb = 0.
 * Returns runop's result (1 on success, -1 if alt returned -1).
 */
230 int
231 recv(Channel *c, void *v)
233 return runop(CHANRCV, c, v, 0);
/*
 * nbrecv: receive from c into v using runop with CHANRCV and nb = 1.
 * Returns runop's result (1 on success, 0 if nothing could proceed,
 * -1 if alt returned -1).
 */
236 int
237 nbrecv(Channel *c, void *v)
239 return runop(CHANRCV, c, v, 1);
/*
 * send: send the value at v on c using runop with CHANSND and nb = 0.
 * Returns runop's result (1 on success, -1 if alt returned -1).
 */
242 int
243 send(Channel *c, void *v)
245 return runop(CHANSND, c, v, 0);
/*
 * nbsend: send the value at v on c using runop with CHANSND and nb = 1.
 * Returns runop's result (1 on success, 0 if nothing could proceed,
 * -1 if alt returned -1).
 */
248 int
249 nbsend(Channel *c, void *v)
251 return runop(CHANSND, c, v, 1);
/*
 * channelsize: check that channel c was set up for elements of sz
 * bytes (c->e).  On a mismatch a message is printed on fd 2 and the
 * program aborts; otherwise nothing happens.
 */
254 static void
255 channelsize(Channel *c, int sz)
257 if(c->e != sz){
258 fprint(2, "expected channel with elements of size %d, got size %d",
259 sz, c->e);
260 abort();
/*
 * sendul: send the ulong v on c.  The channel's element size must be
 * sizeof(ulong) or channelsize aborts.  Returns the result of send.
 */
264 int
265 sendul(Channel *c, ulong v)
267 channelsize(c, sizeof(ulong));
268 return send(c, &v);
/*
 * recvul: receive a ulong from c.  The channel's element size must be
 * sizeof(ulong) or channelsize aborts.  Returns the received value, or
 * ~0 when recv reports a negative result; a genuine value of ~0 cannot
 * be told apart from that failure by the caller.
 */
271 ulong
272 recvul(Channel *c)
274 ulong v;
276 channelsize(c, sizeof(ulong));
277 if(recv(c, &v) < 0)
278 return ~0;
279 return v;
/*
 * sendp: send the pointer v on c.  The channel's element size must be
 * sizeof(void*) or channelsize aborts.  Returns the result of send.
 */
282 int
283 sendp(Channel *c, void *v)
285 channelsize(c, sizeof(void*));
286 return send(c, &v);
/*
 * recvp: receive a pointer from c.  The channel's element size must be
 * sizeof(void*) or channelsize aborts.  Returns the received pointer,
 * or nil when recv reports a negative result.
 */
289 void*
290 recvp(Channel *c)
292 void *v;
294 channelsize(c, sizeof(void*));
295 if(recv(c, &v) < 0)
296 return nil;
297 return v;
/*
 * nbsendul: non-blocking variant of sendul.  The channel's element
 * size must be sizeof(ulong) or channelsize aborts.  Returns the
 * result of nbsend.
 */
300 int
301 nbsendul(Channel *c, ulong v)
303 channelsize(c, sizeof(ulong));
304 return nbsend(c, &v);
/*
 * nbrecvul: non-blocking variant of recvul.  The channel's element
 * size must be sizeof(ulong) or channelsize aborts.  Returns 0 when
 * nbrecv returns 0 (nothing received), otherwise the contents of v.
 *
 * NOTE(review): only a zero result from nbrecv is checked; a negative
 * result would fall through and return v without it having been
 * written here -- confirm whether that path can occur.
 */
307 ulong
308 nbrecvul(Channel *c)
310 ulong v;
312 channelsize(c, sizeof(ulong));
313 if(nbrecv(c, &v) == 0)
314 return 0;
315 return v;
/*
 * nbsendp: non-blocking variant of sendp.  The channel's element size
 * must be sizeof(void*) or channelsize aborts.  Returns the result of
 * nbsend.
 */
318 int
319 nbsendp(Channel *c, void *v)
321 channelsize(c, sizeof(void*));
322 return nbsend(c, &v);
/*
 * nbrecvp: non-blocking variant of recvp.  The channel's element size
 * must be sizeof(void*) or channelsize aborts.  Returns nil when
 * nbrecv returns 0 (nothing received), otherwise the contents of v.
 *
 * NOTE(review): only a zero result from nbrecv is checked; a negative
 * result would fall through and return v without it having been
 * written here -- confirm whether that path can occur.
 */
325 void*
326 nbrecvp(Channel *c)
328 void *v;
330 channelsize(c, sizeof(void*));
331 if(nbrecv(c, &v) == 0)
332 return nil;
333 return v;
/*
 * emptyentry: return the index of an unused (nil) slot in c->qentry.
 *
 * If every existing slot is occupied the array is grown by 16 slots
 * with realloc, the new slots are cleared, _threadhighnentry is raised
 * if this channel now has the largest array, and the index of the
 * first new slot is returned.  A realloc failure is fatal (sysfatal).
 */
336 static int
337 emptyentry(Channel *c)
339 int i, extra;
/* nentry and qentry must be both empty or both set */
341 assert((c->nentry==0 && c->qentry==nil) || (c->nentry && c->qentry));
343 for(i=0; i<c->nentry; i++)
344 if(c->qentry[i]==nil)
345 return i;
/* no free slot: i now equals the old nentry, the first slot of the extension */
347 extra = 16;
348 c->nentry += extra;
349 if(c->nentry > _threadhighnentry) _threadhighnentry = c->nentry;
350 c->qentry = realloc((void*)c->qentry, c->nentry*sizeof(c->qentry[0]));
351 if(c->qentry == nil)
352 sysfatal("realloc channel entries: %r");
353 _threadmemset(&c->qentry[i], 0, extra*sizeof(c->qentry[0]));
354 return i;
/*
 * enqueue: record alt a in the wait queue of its channel a->c.
 * a->tag is set to c, the location that canexec and altexec inspect
 * (nil there means the alt is still available) and that altexec later
 * fills in when it commits to a rendezvous.
 */
357 static void
358 enqueue(Alt *a, Channel **c)
360 int i;
362 _threaddebug(DBGCHAN, "Queuing alt %p on channel %p", a, a->c);
363 a->tag = c;
364 i = emptyentry(a->c);
365 a->c->qentry[i] = a;
/*
 * dequeue: remove alt a from the wait queue of its channel a->c by
 * clearing the slot that holds it.  If the channel has been marked
 * freed, _chanfree is called again so the channel can be released once
 * no entries remain.  Nothing happens if a is not in the queue.
 */
368 static void
369 dequeue(Alt *a)
371 int i;
372 Channel *c;
374 c = a->c;
375 for(i=0; i<c->nentry; i++)
376 if(c->qentry[i]==a){
377 _threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c);
378 c->qentry[i] = nil;
379 if(c->freed)
380 _chanfree(c);
381 return;
/*
 * altexecbuffered: pick the slot in the channel's buffer (c->v) that
 * alt a should use and update the buffer bookkeeping.
 *
 * Receive with data present: returns the slot at index c->f % c->s and
 * advances c->f; c->n is decremented unless willreplace is set.
 * Send with room available: returns the slot at (c->f + c->n) % c->s;
 * c->n is incremented unless willreplace is set.
 * Any other situation aborts.
 *
 * The caller copies the element into or out of the returned slot.
 */
385 static void*
386 altexecbuffered(Alt *a, int willreplace)
388 uchar *v;
389 Channel *c;
391 c = a->c;
392 /* use buffered channel queue */
393 if(a->op==CHANRCV && c->n > 0){
394 _threaddebug(DBGCHAN, "buffer recv alt %p chan %p", a, c);
395 v = c->v + c->e*(c->f%c->s);
396 if(!willreplace)
397 c->n--;
398 c->f++;
399 return v;
401 if(a->op==CHANSND && c->n < c->s){
402 _threaddebug(DBGCHAN, "buffer send alt %p chan %p", a, c);
403 v = c->v + c->e*((c->f+c->n)%c->s);
404 if(!willreplace)
405 c->n++;
406 return v;
408 abort();
409 return nil;
/*
 * altcopy: copy sz bytes from src to dst.  A nil dst makes this a
 * no-op; a nil src with a non-nil dst zeroes dst instead.
 */
412 static void
413 altcopy(void *dst, void *src, int sz)
415 if(dst){
416 if(src)
417 memmove(dst, src, sz);
418 else
419 _threadmemset(dst, 0, sz);
/*
 * altexec: carry out the operation described by alt a.
 *
 * The channel's wait queue is searched for alts doing the opposite
 * operation whose tag slot is still nil; one of them is kept using the
 * nrand(++n) == 0 test.  If such a partner b exists the value is
 * transferred (through the channel buffer when c->s && c->n, directly
 * between the two value pointers otherwise), *b->tag is set to c,
 * chanlock is released, _procsplx(spl) is called, and
 * _threadrendezvous on b->tag is retried until it stops returning ~0.
 * With no partner, the buffer slot chosen by altexecbuffered is used
 * for the copy and then the lock is released and _procsplx(spl) called.
 *
 * Both visible return paths return 1.
 *
 * NOTE(review): lines holding only braces (and the empty body of the
 * rendezvous retry loop) appear to be missing from this copy, and the
 * function's closing brace is not visible.
 */
423 static int
424 altexec(Alt *a, int spl)
426 volatile Alt *b;
427 int i, n, otherop;
428 Channel *c;
429 void *me, *waiter, *buf;
431 c = a->c;
433 /* rendezvous with others */
434 otherop = (CHANSND+CHANRCV) - a->op;
435 n = 0;
436 b = nil;
437 me = a->v;
438 for(i=0; i<c->nentry; i++)
439 if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil)
440 if(nrand(++n) == 0)
441 b = c->qentry[i];
442 if(b != nil){
443 _threaddebug(DBGCHAN, "rendez %s alt %p chan %p alt %p", a->op==CHANRCV?"recv":"send", a, c, b);
444 waiter = b->v;
445 if(c->s && c->n){
446 /*
447 * if buffer is full and there are waiters
448 * and we're meeting a waiter,
449 * we must be receiving.
451 * we use the value in the channel buffer,
452 * copy the waiter's value into the channel buffer
453 * on behalf of the waiter, and then wake the waiter.
454 */
455 if(a->op!=CHANRCV)
456 abort();
457 buf = altexecbuffered(a, 1);
458 altcopy(me, buf, c->e);
459 altcopy(buf, waiter, c->e);
460 }else{
461 if(a->op==CHANRCV)
462 altcopy(me, waiter, c->e);
463 else
464 altcopy(waiter, me, c->e);
466 *b->tag = c; /* commits us to rendezvous */
467 _threaddebug(DBGCHAN, "unlocking the chanlock");
468 unlock(&chanlock);
469 _procsplx(spl);
470 _threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)&chanlock);
471 while(_threadrendezvous((ulong)b->tag, 0) == ~0)
473 return 1;
/* no partner waiting: go through the channel buffer instead */
476 buf = altexecbuffered(a, 0);
477 if(a->op==CHANRCV)
478 altcopy(me, buf, c->e);
479 else
480 altcopy(buf, me, c->e);
482 unlock(&chanlock);
483 _procsplx(spl);
484 return 1;