1 76193d7c 2003-09-30 devnull #include "threadimpl.h"
3 76193d7c 2003-09-30 devnull static Lock chanlock; /* central channel access lock */
5 76193d7c 2003-09-30 devnull static void enqueue(Alt*, Channel**);
6 76193d7c 2003-09-30 devnull static void dequeue(Alt*);
7 76193d7c 2003-09-30 devnull static int altexec(Alt*, int);
9 76193d7c 2003-09-30 devnull int _threadhighnentry;
10 76193d7c 2003-09-30 devnull int _threadnalt;
/*
 * canexec: report whether the operation described by a could be carried
 * out immediately on its channel a->c.
 *
 * Returns 1 when either
 *   - some entry queued on the channel has the opposite operation and a
 *     nil *tag (altexec stores the channel into *tag when it commits to a
 *     rendezvous, so nil means not yet claimed), or
 *   - the buffer can serve the request: a send with c->n < c->s, or a
 *     receive with c->n > 0.
 * Returns 0 otherwise.
 *
 * The caller visible in this file (alt) invokes it while holding chanlock.
 */
12 76193d7c 2003-09-30 devnull static int
13 76193d7c 2003-09-30 devnull canexec(Alt *a)
15 76193d7c 2003-09-30 devnull int i, otherop;
16 76193d7c 2003-09-30 devnull Channel *c;
18 76193d7c 2003-09-30 devnull c = a->c;
19 76193d7c 2003-09-30 devnull /* are there senders or receivers blocked? */
/* maps CHANSND to CHANRCV and CHANRCV to CHANSND */
20 76193d7c 2003-09-30 devnull otherop = (CHANSND+CHANRCV) - a->op;
21 76193d7c 2003-09-30 devnull for(i=0; i<c->nentry; i++)
22 76193d7c 2003-09-30 devnull if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil){
23 76193d7c 2003-09-30 devnull _threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c);
24 76193d7c 2003-09-30 devnull return 1;
27 76193d7c 2003-09-30 devnull /* is there room in the channel? */
28 76193d7c 2003-09-30 devnull if((a->op==CHANSND && c->n < c->s)
29 76193d7c 2003-09-30 devnull || (a->op==CHANRCV && c->n > 0)){
30 76193d7c 2003-09-30 devnull _threaddebug(DBGCHAN, "can buffer alt %p chan %p", a, c);
31 76193d7c 2003-09-30 devnull return 1;
34 76193d7c 2003-09-30 devnull return 0;
/*
 * _chanfree: release a channel, or defer the release while alts are
 * still queued on it.
 *
 * Scans c->qentry for any non-nil entry.  If one exists the channel is
 * only marked with c->freed = 1; dequeue calls _chanfree again once it
 * has cleared an entry of a channel marked this way.  The visible lines
 * also free c->qentry when it is non-nil.
 *
 * Does not take chanlock itself; chanfree wraps the call in
 * lock/unlock of chanlock.
 *
 * NOTE(review): the embedded source line numbers skip 48 and 51-55, so
 * the branch joining the freed mark to the free calls (presumably an
 * else) and any release of c itself are not visible in this listing.
 * Confirm against the full file.
 */
37 76193d7c 2003-09-30 devnull static void
38 76193d7c 2003-09-30 devnull _chanfree(Channel *c)
40 76193d7c 2003-09-30 devnull int i, inuse;
42 76193d7c 2003-09-30 devnull inuse = 0;
43 76193d7c 2003-09-30 devnull for(i = 0; i < c->nentry; i++)
44 76193d7c 2003-09-30 devnull if(c->qentry[i])
45 76193d7c 2003-09-30 devnull inuse = 1;
46 76193d7c 2003-09-30 devnull if(inuse)
47 76193d7c 2003-09-30 devnull c->freed = 1;
49 76193d7c 2003-09-30 devnull if(c->qentry)
50 76193d7c 2003-09-30 devnull free(c->qentry);
/*
 * chanfree: locked entry point for releasing channel c.
 * Takes chanlock, calls _chanfree(c), and drops the lock again.
 * NOTE(review): the return type line is not visible in this listing.
 */
56 76193d7c 2003-09-30 devnull chanfree(Channel *c)
58 76193d7c 2003-09-30 devnull lock(&chanlock);
59 76193d7c 2003-09-30 devnull _chanfree(c);
60 76193d7c 2003-09-30 devnull unlock(&chanlock);
/*
 * chaninit: initialise a caller-supplied Channel.
 *
 *   c        - channel to set up; nil is rejected
 *   elemsize - size of one element in bytes; must be > 0
 *   elemcnt  - number of buffered elements; must be >= 0
 *
 * Returns -1 when an argument is rejected, 1 on success.
 * On success f, n and freed are reset to 0 and e and s record the
 * element size and count.  The visible lines do not touch qentry,
 * nentry or the element buffer.
 */
64 76193d7c 2003-09-30 devnull chaninit(Channel *c, int elemsize, int elemcnt)
66 76193d7c 2003-09-30 devnull if(elemcnt < 0 || elemsize <= 0 || c == nil)
67 76193d7c 2003-09-30 devnull return -1;
68 76193d7c 2003-09-30 devnull c->f = 0;
69 76193d7c 2003-09-30 devnull c->n = 0;
70 76193d7c 2003-09-30 devnull c->freed = 0;
71 76193d7c 2003-09-30 devnull c->e = elemsize;
72 76193d7c 2003-09-30 devnull c->s = elemcnt;
73 76193d7c 2003-09-30 devnull _threaddebug(DBGCHAN, "chaninit %p", c);
74 76193d7c 2003-09-30 devnull return 1;
/*
 * chancreate: allocate a new channel.
 *
 *   elemsize - size of one element in bytes; must be > 0
 *   elemcnt  - number of buffered elements; must be >= 0
 *
 * Returns nil when an argument is rejected, otherwise the new channel
 * with e and s set.  Header and element buffer come from a single
 * _threadmalloc request of sizeof(Channel)+elemsize*elemcnt bytes.
 *
 * NOTE(review): elemsize*elemcnt is evaluated in int with no overflow
 * check, so very large arguments could request too small a buffer.
 * NOTE(review): the result of _threadmalloc is used without a nil
 * check, and the remaining fields are not assigned here; presumably the
 * second argument (1) asks for zeroed memory and the allocator does not
 * return on failure - confirm in _threadmalloc.
 */
78 76193d7c 2003-09-30 devnull chancreate(int elemsize, int elemcnt)
80 76193d7c 2003-09-30 devnull Channel *c;
82 76193d7c 2003-09-30 devnull if(elemcnt < 0 || elemsize <= 0)
83 76193d7c 2003-09-30 devnull return nil;
84 76193d7c 2003-09-30 devnull c = _threadmalloc(sizeof(Channel)+elemsize*elemcnt, 1);
85 76193d7c 2003-09-30 devnull c->e = elemsize;
86 76193d7c 2003-09-30 devnull c->s = elemcnt;
87 76193d7c 2003-09-30 devnull _threaddebug(DBGCHAN, "chancreate %p", c);
88 76193d7c 2003-09-30 devnull return c;
/*
 * alt: perform one of the channel operations listed in alts.
 *
 * alts is an array of Alt entries ended by an entry whose op is CHANEND
 * (blocking form) or CHANNOBLK (non-blocking form).  Entries whose op is
 * CHANNOP are skipped.
 *
 * Returns the index of the entry that was executed (a - alts), the
 * index of the CHANNOBLK terminator when nothing could proceed in the
 * non-blocking form, or -1 when an entry has a nil channel or the wait
 * was interrupted with no peer committed.
 *
 * Visible sequence:
 *   1. If the current thread is moribund or _threadexitsallstatus is
 *      set, yield() is called.
 *   2. With _procsplhi() in effect and chanlock held, t->alt and
 *      t->chan = Chanalt are recorded and every entry gets
 *      entryno = -1.  Entries accepted by canexec are candidates; the
 *      nrand(++n) == 0 test picks among them.
 *   3. If none was picked and the terminator is CHANNOBLK, the lock is
 *      dropped and the terminator index is returned.
 *   4. Otherwise every entry is enqueued with the address of the local
 *      c as its tag, the lock is dropped, and the thread waits in
 *      _threadrendezvous on that address.  A result of ~0 means
 *      interrupted: if c is already non-nil a peer has committed, so the
 *      code jumps back to wait again; if not, c is set to ~0 so that no
 *      peer matches the nil-tag test.  All entries are then dequeued and
 *      the one whose channel equals c is looked up.
 *   5. When a candidate was picked in step 2, altexec(a, s) performs the
 *      operation and releases chanlock and the spl level.
 *   6. _sched() is called, t->chan is reset to Channone and
 *      _threadnalt is incremented before the index is returned.
 *
 * NOTE(review): the embedded source line numbers skip several lines, so
 * the following are not visible in this listing and must be confirmed
 * against the full file: the declaration of r (used at source line
 * 164), the initialisation of n, the statements guarded by the tests at
 * source lines 132 and 179 (presumably a = xa), the Again label, and
 * the braces and else that separate steps 3-4 from step 5.
 */
92 76193d7c 2003-09-30 devnull alt(Alt *alts)
94 76193d7c 2003-09-30 devnull Alt *a, *xa;
95 76193d7c 2003-09-30 devnull Channel *volatile c;
96 76193d7c 2003-09-30 devnull int n, s;
98 76193d7c 2003-09-30 devnull Thread *t;
101 76193d7c 2003-09-30 devnull * The point of going splhi here is that note handlers
102 76193d7c 2003-09-30 devnull * might reasonably want to use channel operations,
103 76193d7c 2003-09-30 devnull * but that will hang if the note comes while we hold the
104 76193d7c 2003-09-30 devnull * chanlock. Instead, we delay the note until we've dropped
105 76193d7c 2003-09-30 devnull * the lock.
107 76193d7c 2003-09-30 devnull t = _threadgetproc()->thread;
108 76193d7c 2003-09-30 devnull if(t->moribund || _threadexitsallstatus)
109 76193d7c 2003-09-30 devnull yield(); /* won't return */
110 76193d7c 2003-09-30 devnull s = _procsplhi();
111 76193d7c 2003-09-30 devnull lock(&chanlock);
112 76193d7c 2003-09-30 devnull t->alt = alts;
113 76193d7c 2003-09-30 devnull t->chan = Chanalt;
115 76193d7c 2003-09-30 devnull /* test whether any channels can proceed */
117 76193d7c 2003-09-30 devnull a = nil;
119 76193d7c 2003-09-30 devnull for(xa=alts; xa->op!=CHANEND && xa->op!=CHANNOBLK; xa++){
120 76193d7c 2003-09-30 devnull xa->entryno = -1;
121 76193d7c 2003-09-30 devnull if(xa->op == CHANNOP)
122 76193d7c 2003-09-30 devnull continue;
124 76193d7c 2003-09-30 devnull c = xa->c;
125 76193d7c 2003-09-30 devnull if(c==nil){
126 76193d7c 2003-09-30 devnull unlock(&chanlock);
127 76193d7c 2003-09-30 devnull _procsplx(s);
128 76193d7c 2003-09-30 devnull t->chan = Channone;
129 76193d7c 2003-09-30 devnull return -1;
131 76193d7c 2003-09-30 devnull if(canexec(xa))
132 76193d7c 2003-09-30 devnull if(nrand(++n) == 0)
136 76193d7c 2003-09-30 devnull if(a==nil){
137 76193d7c 2003-09-30 devnull /* nothing can proceed */
138 76193d7c 2003-09-30 devnull if(xa->op == CHANNOBLK){
139 76193d7c 2003-09-30 devnull unlock(&chanlock);
140 76193d7c 2003-09-30 devnull _procsplx(s);
141 76193d7c 2003-09-30 devnull t->chan = Channone;
142 76193d7c 2003-09-30 devnull _threadnalt++;
143 76193d7c 2003-09-30 devnull return xa - alts;
146 76193d7c 2003-09-30 devnull /* enqueue on all channels. */
147 76193d7c 2003-09-30 devnull c = nil;
148 76193d7c 2003-09-30 devnull for(xa=alts; xa->op!=CHANEND; xa++){
149 76193d7c 2003-09-30 devnull if(xa->op==CHANNOP)
150 76193d7c 2003-09-30 devnull continue;
151 76193d7c 2003-09-30 devnull enqueue(xa, (Channel**)&c);
155 76193d7c 2003-09-30 devnull * wait for successful rendezvous.
156 76193d7c 2003-09-30 devnull * we can't just give up if the rendezvous
157 76193d7c 2003-09-30 devnull * is interrupted -- someone else might come
158 76193d7c 2003-09-30 devnull * along and try to rendezvous with us, so
159 76193d7c 2003-09-30 devnull * we need to be here.
162 76193d7c 2003-09-30 devnull unlock(&chanlock);
163 76193d7c 2003-09-30 devnull _procsplx(s);
164 76193d7c 2003-09-30 devnull r = _threadrendezvous((ulong)&c, 0);
165 76193d7c 2003-09-30 devnull s = _procsplhi();
166 76193d7c 2003-09-30 devnull lock(&chanlock);
168 76193d7c 2003-09-30 devnull if(r==~0){ /* interrupted */
169 76193d7c 2003-09-30 devnull if(c!=nil) /* someone will meet us; go back */
170 76193d7c 2003-09-30 devnull goto Again;
171 76193d7c 2003-09-30 devnull c = (Channel*)~0; /* so no one tries to meet us */
174 76193d7c 2003-09-30 devnull /* dequeue from channels, find selected one */
175 76193d7c 2003-09-30 devnull a = nil;
176 76193d7c 2003-09-30 devnull for(xa=alts; xa->op!=CHANEND; xa++){
177 76193d7c 2003-09-30 devnull if(xa->op==CHANNOP)
178 76193d7c 2003-09-30 devnull continue;
179 76193d7c 2003-09-30 devnull if(xa->c == c)
181 76193d7c 2003-09-30 devnull dequeue(xa);
183 76193d7c 2003-09-30 devnull unlock(&chanlock);
184 76193d7c 2003-09-30 devnull _procsplx(s);
185 76193d7c 2003-09-30 devnull if(a == nil){ /* we were interrupted */
186 76193d7c 2003-09-30 devnull assert(c==(Channel*)~0);
187 76193d7c 2003-09-30 devnull return -1;
190 76193d7c 2003-09-30 devnull altexec(a, s); /* unlocks chanlock, does splx */
192 76193d7c 2003-09-30 devnull _sched();
193 76193d7c 2003-09-30 devnull t->chan = Channone;
194 76193d7c 2003-09-30 devnull _threadnalt++;
195 76193d7c 2003-09-30 devnull return a - alts;
/*
 * runop: run a single send or receive by building a two-entry Alt array
 * and handing it to alt.
 *
 *   op - CHANSND or CHANRCV (as passed by send/recv and the nb variants)
 *   c  - channel to operate on
 *   v  - address of the value to send or of the receive destination
 *   nb - non-zero for the non-blocking variants
 *
 * Returns 1 when the operation was performed, 0 when alt returned the
 * terminator index 1 (asserted to happen only when nb is set), and -1
 * when alt returned -1.  Any other result from alt is reported on fd 2
 * and followed by abort().
 *
 * NOTE(review): a[1].op is assigned CHANEND and then CHANNOBLK on
 * consecutive visible lines; source line 213 between them is not shown
 * and is presumably a test of nb.  The case label in front of the
 * return 1 (source line 221) and the declaration of r are likewise not
 * visible.  Confirm against the full file.
 */
198 76193d7c 2003-09-30 devnull static int
199 76193d7c 2003-09-30 devnull runop(int op, Channel *c, void *v, int nb)
202 76193d7c 2003-09-30 devnull Alt a[2];
205 76193d7c 2003-09-30 devnull * we could do this without calling alt,
206 76193d7c 2003-09-30 devnull * but the only reason would be performance,
207 76193d7c 2003-09-30 devnull * and i'm not convinced it matters.
209 76193d7c 2003-09-30 devnull a[0].op = op;
210 76193d7c 2003-09-30 devnull a[0].c = c;
211 76193d7c 2003-09-30 devnull a[0].v = v;
212 76193d7c 2003-09-30 devnull a[1].op = CHANEND;
214 76193d7c 2003-09-30 devnull a[1].op = CHANNOBLK;
215 76193d7c 2003-09-30 devnull switch(r=alt(a)){
216 76193d7c 2003-09-30 devnull case -1: /* interrupted */
217 76193d7c 2003-09-30 devnull return -1;
218 76193d7c 2003-09-30 devnull case 1: /* nonblocking, didn't accomplish anything */
219 76193d7c 2003-09-30 devnull assert(nb);
220 76193d7c 2003-09-30 devnull return 0;
222 76193d7c 2003-09-30 devnull return 1;
223 76193d7c 2003-09-30 devnull default:
224 76193d7c 2003-09-30 devnull fprint(2, "ERROR: channel alt returned %d\n", r);
225 76193d7c 2003-09-30 devnull abort();
226 76193d7c 2003-09-30 devnull return -1;
/*
 * recv: blocking receive from c into the storage at v.
 * Returns the result of runop(CHANRCV, c, v, 0): 1 on success, -1 when
 * alt reported -1.
 */
231 76193d7c 2003-09-30 devnull recv(Channel *c, void *v)
233 76193d7c 2003-09-30 devnull return runop(CHANRCV, c, v, 0);
/*
 * nbrecv: non-blocking receive from c into the storage at v.
 * Returns the result of runop(CHANRCV, c, v, 1): 1 on success, 0 when
 * nothing could be received immediately, -1 when alt reported -1.
 */
237 76193d7c 2003-09-30 devnull nbrecv(Channel *c, void *v)
239 76193d7c 2003-09-30 devnull return runop(CHANRCV, c, v, 1);
/*
 * send: blocking send of the value stored at v on c.
 * Returns the result of runop(CHANSND, c, v, 0): 1 on success, -1 when
 * alt reported -1.
 */
243 76193d7c 2003-09-30 devnull send(Channel *c, void *v)
245 76193d7c 2003-09-30 devnull return runop(CHANSND, c, v, 0);
/*
 * nbsend: non-blocking send of the value stored at v on c.
 * Returns the result of runop(CHANSND, c, v, 1): 1 on success, 0 when
 * the send could not proceed immediately, -1 when alt reported -1.
 */
249 76193d7c 2003-09-30 devnull nbsend(Channel *c, void *v)
251 76193d7c 2003-09-30 devnull return runop(CHANSND, c, v, 1);
/*
 * channelsize: check that channel c carries elements of exactly sz
 * bytes.  On a mismatch a message naming both sizes is written to fd 2
 * and abort() is called.  Used by the typed ulong and pointer helpers
 * before they touch the channel.
 */
254 76193d7c 2003-09-30 devnull static void
255 76193d7c 2003-09-30 devnull channelsize(Channel *c, int sz)
257 76193d7c 2003-09-30 devnull if(c->e != sz){
258 76193d7c 2003-09-30 devnull fprint(2, "expected channel with elements of size %d, got size %d",
259 76193d7c 2003-09-30 devnull sz, c->e);
260 76193d7c 2003-09-30 devnull abort();
/*
 * sendul: blocking send of the ulong v on c.
 * Checks the element size with channelsize, then sends the address of
 * the by-value parameter.  Returns what send returns.
 */
265 76193d7c 2003-09-30 devnull sendul(Channel *c, ulong v)
267 76193d7c 2003-09-30 devnull channelsize(c, sizeof(ulong));
268 76193d7c 2003-09-30 devnull return send(c, &v);
/*
 * recvul: blocking receive of a ulong from c.
 * Returns the received value, or ~0 when recv reports a negative
 * result.  The failure value is in-band: a received all-ones value
 * looks the same to the caller.
 */
272 76193d7c 2003-09-30 devnull recvul(Channel *c)
274 76193d7c 2003-09-30 devnull ulong v;
276 76193d7c 2003-09-30 devnull channelsize(c, sizeof(ulong));
277 76193d7c 2003-09-30 devnull if(recv(c, &v) < 0)
278 76193d7c 2003-09-30 devnull return ~0;
279 76193d7c 2003-09-30 devnull return v;
/*
 * sendp: blocking send of the pointer v on c.
 * Checks the element size with channelsize, then sends the address of
 * the by-value parameter.  Returns what send returns.
 */
283 76193d7c 2003-09-30 devnull sendp(Channel *c, void *v)
285 76193d7c 2003-09-30 devnull channelsize(c, sizeof(void*));
286 76193d7c 2003-09-30 devnull return send(c, &v);
/*
 * recvp: blocking receive of a pointer from c.
 * Returns the received pointer, or nil when recv reports a negative
 * result.  A received nil pointer looks the same to the caller.
 */
290 76193d7c 2003-09-30 devnull recvp(Channel *c)
292 76193d7c 2003-09-30 devnull void *v;
294 76193d7c 2003-09-30 devnull channelsize(c, sizeof(void*));
295 76193d7c 2003-09-30 devnull if(recv(c, &v) < 0)
296 76193d7c 2003-09-30 devnull return nil;
297 76193d7c 2003-09-30 devnull return v;
/*
 * nbsendul: non-blocking send of the ulong v on c.
 * Checks the element size with channelsize, then returns what nbsend
 * returns.
 */
301 76193d7c 2003-09-30 devnull nbsendul(Channel *c, ulong v)
303 76193d7c 2003-09-30 devnull channelsize(c, sizeof(ulong));
304 76193d7c 2003-09-30 devnull return nbsend(c, &v);
/*
 * nbrecvul: non-blocking receive of a ulong from c.
 * Returns 0 when nbrecv returns 0, otherwise the contents of v.  A
 * received value of 0 looks the same to the caller as nothing received.
 *
 * NOTE(review): only the result 0 is tested.  runop can also return -1,
 * in which case v would be returned without having been written;
 * confirm whether that result is reachable for a non-blocking receive.
 */
308 76193d7c 2003-09-30 devnull nbrecvul(Channel *c)
310 76193d7c 2003-09-30 devnull ulong v;
312 76193d7c 2003-09-30 devnull channelsize(c, sizeof(ulong));
313 76193d7c 2003-09-30 devnull if(nbrecv(c, &v) == 0)
314 76193d7c 2003-09-30 devnull return 0;
315 76193d7c 2003-09-30 devnull return v;
/*
 * nbsendp: non-blocking send of the pointer v on c.
 * Checks the element size with channelsize, then returns what nbsend
 * returns.
 */
319 76193d7c 2003-09-30 devnull nbsendp(Channel *c, void *v)
321 76193d7c 2003-09-30 devnull channelsize(c, sizeof(void*));
322 76193d7c 2003-09-30 devnull return nbsend(c, &v);
/*
 * nbrecvp: non-blocking receive of a pointer from c.
 * Returns nil when nbrecv returns 0, otherwise the contents of v.  A
 * received nil pointer looks the same to the caller as nothing received.
 *
 * NOTE(review): only the result 0 is tested.  runop can also return -1,
 * in which case v would be returned without having been written;
 * confirm whether that result is reachable for a non-blocking receive.
 */
326 76193d7c 2003-09-30 devnull nbrecvp(Channel *c)
328 76193d7c 2003-09-30 devnull void *v;
330 76193d7c 2003-09-30 devnull channelsize(c, sizeof(void*));
331 76193d7c 2003-09-30 devnull if(nbrecv(c, &v) == 0)
332 76193d7c 2003-09-30 devnull return nil;
333 76193d7c 2003-09-30 devnull return v;
/*
 * emptyentry: return the index of a free slot in c->qentry, growing the
 * array when every slot is occupied.
 *
 * The first nil slot is reused when one exists.  Otherwise the array is
 * enlarged by 16 slots with realloc, the new slots are zeroed, and the
 * index of the first new slot is returned.  _threadhighnentry keeps the
 * largest nentry seen so far.  A failed realloc is reported through
 * sysfatal.
 */
336 76193d7c 2003-09-30 devnull static int
337 76193d7c 2003-09-30 devnull emptyentry(Channel *c)
339 76193d7c 2003-09-30 devnull int i, extra;
/* nentry and qentry must be both empty or both set */
341 76193d7c 2003-09-30 devnull assert((c->nentry==0 && c->qentry==nil) || (c->nentry && c->qentry));
343 76193d7c 2003-09-30 devnull for(i=0; i<c->nentry; i++)
344 76193d7c 2003-09-30 devnull if(c->qentry[i]==nil)
345 76193d7c 2003-09-30 devnull return i;
/* no free slot: i now equals the old nentry, the first slot of the new region */
347 76193d7c 2003-09-30 devnull extra = 16;
348 76193d7c 2003-09-30 devnull c->nentry += extra;
349 76193d7c 2003-09-30 devnull if(c->nentry > _threadhighnentry) _threadhighnentry = c->nentry;
350 76193d7c 2003-09-30 devnull c->qentry = realloc((void*)c->qentry, c->nentry*sizeof(c->qentry[0]));
351 76193d7c 2003-09-30 devnull if(c->qentry == nil)
352 76193d7c 2003-09-30 devnull sysfatal("realloc channel entries: %r");
353 76193d7c 2003-09-30 devnull _threadmemset(&c->qentry[i], 0, extra*sizeof(c->qentry[0]));
354 76193d7c 2003-09-30 devnull return i;
/*
 * enqueue: register alt a as waiting on its channel a->c.
 *
 *   a - the alt entry to queue
 *   c - address of the rendezvous tag; stored in a->tag so that a peer
 *       can test *tag for nil and later write the chosen channel to it
 *
 * The entry is placed in the slot returned by emptyentry.  The caller
 * visible in this file (alt) holds chanlock around the call.
 * NOTE(review): the declaration of i is not visible in this listing.
 */
357 76193d7c 2003-09-30 devnull static void
358 76193d7c 2003-09-30 devnull enqueue(Alt *a, Channel **c)
362 76193d7c 2003-09-30 devnull _threaddebug(DBGCHAN, "Queuing alt %p on channel %p", a, a->c);
363 76193d7c 2003-09-30 devnull a->tag = c;
364 76193d7c 2003-09-30 devnull i = emptyentry(a->c);
365 76193d7c 2003-09-30 devnull a->c->qentry[i] = a;
/*
 * dequeue: remove alt a from the wait queue of its channel a->c.
 *
 * Searches c->qentry for the slot holding a and clears it.  If the
 * channel has been marked freed, _chanfree(c) is called again so that a
 * deferred release can be retried.
 *
 * NOTE(review): the declaration of i and the lines after the
 * _chanfree call (source lines 381 onward) are not visible in this
 * listing, so what happens after the match, or when a is not found,
 * cannot be confirmed from here.
 */
368 76193d7c 2003-09-30 devnull static void
369 76193d7c 2003-09-30 devnull dequeue(Alt *a)
372 76193d7c 2003-09-30 devnull Channel *c;
374 76193d7c 2003-09-30 devnull c = a->c;
375 76193d7c 2003-09-30 devnull for(i=0; i<c->nentry; i++)
376 76193d7c 2003-09-30 devnull if(c->qentry[i]==a){
377 76193d7c 2003-09-30 devnull _threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c);
378 76193d7c 2003-09-30 devnull c->qentry[i] = nil;
379 76193d7c 2003-09-30 devnull if(c->freed)
380 76193d7c 2003-09-30 devnull _chanfree(c);
/*
 * altexecbuffered: locate the buffer slot that the operation in a
 * should use and return its address.
 *
 *   a           - alt entry; a->op selects receive or send
 *   willreplace - non-zero when the caller will refill the slot itself
 *                 (altexec passes 1 when it hands a waiting value
 *                 straight into the slot it just read)
 *
 * Receive with c->n > 0: the slot is at index c->f % c->s.
 * Send with c->n < c->s: the slot is at index (c->f + c->n) % c->s.
 * Any other combination calls abort().
 *
 * NOTE(review): the statements guarded by the two !willreplace tests
 * (source lines 397-398 and 405) are not visible in this listing;
 * presumably they update c->f and c->n.  Confirm against the full file.
 */
385 76193d7c 2003-09-30 devnull static void*
386 76193d7c 2003-09-30 devnull altexecbuffered(Alt *a, int willreplace)
388 76193d7c 2003-09-30 devnull uchar *v;
389 76193d7c 2003-09-30 devnull Channel *c;
391 76193d7c 2003-09-30 devnull c = a->c;
392 76193d7c 2003-09-30 devnull /* use buffered channel queue */
393 76193d7c 2003-09-30 devnull if(a->op==CHANRCV && c->n > 0){
394 76193d7c 2003-09-30 devnull _threaddebug(DBGCHAN, "buffer recv alt %p chan %p", a, c);
395 76193d7c 2003-09-30 devnull v = c->v + c->e*(c->f%c->s);
396 76193d7c 2003-09-30 devnull if(!willreplace)
399 76193d7c 2003-09-30 devnull return v;
401 76193d7c 2003-09-30 devnull if(a->op==CHANSND && c->n < c->s){
402 76193d7c 2003-09-30 devnull _threaddebug(DBGCHAN, "buffer send alt %p chan %p", a, c);
403 76193d7c 2003-09-30 devnull v = c->v + c->e*((c->f+c->n)%c->s);
404 76193d7c 2003-09-30 devnull if(!willreplace)
406 76193d7c 2003-09-30 devnull return v;
408 76193d7c 2003-09-30 devnull abort();
409 76193d7c 2003-09-30 devnull return nil;
/*
 * altcopy: move one channel element of sz bytes from src to dst.
 * Nothing is done when dst is nil.  The visible lines contain both a
 * memmove from src and a zero fill of dst.
 *
 * NOTE(review): source lines 416 and 418 are not visible in this
 * listing; presumably they select the memmove when src is non-nil and
 * the zero fill otherwise.  Confirm against the full file.
 */
412 76193d7c 2003-09-30 devnull static void
413 76193d7c 2003-09-30 devnull altcopy(void *dst, void *src, int sz)
415 76193d7c 2003-09-30 devnull if(dst){
417 76193d7c 2003-09-30 devnull memmove(dst, src, sz);
419 76193d7c 2003-09-30 devnull _threadmemset(dst, 0, sz);
/*
 * altexec: carry out the operation in a, which the caller has already
 * found to be executable.
 *
 *   a   - alt entry to execute
 *   spl - level returned by _procsplhi in the caller; restored here
 *         with _procsplx
 *
 * Always returns 1 on the visible paths.  Called with chanlock held;
 * both visible paths unlock chanlock and call _procsplx(spl) before
 * returning.
 *
 * Rendezvous path: the queue of a->c is scanned for entries with the
 * opposite operation and a nil *tag, and nrand(++n) == 0 picks one as b.
 * If the channel is buffered and non-empty (c->s && c->n), a must be a
 * receive or abort() is called: the head of the buffer goes to the
 * receiver and the value of the waiter is copied into that slot.
 * Otherwise the value is copied directly between a->v and b->v.
 * Storing c into *b->tag commits the rendezvous; after the lock is
 * dropped _threadrendezvous on b->tag is retried while it returns ~0.
 *
 * Buffered path (no partner found): altexecbuffered(a, 0) yields the
 * slot and the element is copied out of it for a receive or into it
 * for a send.
 *
 * NOTE(review): the initialisation of n, the else lines between the
 * paired altcopy calls (source lines 460, 463, 479), the body of the
 * retry loop (source line 472) and the braces separating the two paths
 * are not visible in this listing.  Confirm against the full file.
 * NOTE(review): the inner comment speaks of a full buffer while the
 * test is c->s && c->n, which only requires a non-empty buffer.
 */
423 76193d7c 2003-09-30 devnull static int
424 76193d7c 2003-09-30 devnull altexec(Alt *a, int spl)
426 76193d7c 2003-09-30 devnull volatile Alt *b;
427 76193d7c 2003-09-30 devnull int i, n, otherop;
428 76193d7c 2003-09-30 devnull Channel *c;
429 76193d7c 2003-09-30 devnull void *me, *waiter, *buf;
431 76193d7c 2003-09-30 devnull c = a->c;
433 76193d7c 2003-09-30 devnull /* rendezvous with others */
434 76193d7c 2003-09-30 devnull otherop = (CHANSND+CHANRCV) - a->op;
436 76193d7c 2003-09-30 devnull b = nil;
437 76193d7c 2003-09-30 devnull me = a->v;
438 76193d7c 2003-09-30 devnull for(i=0; i<c->nentry; i++)
439 76193d7c 2003-09-30 devnull if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil)
440 76193d7c 2003-09-30 devnull if(nrand(++n) == 0)
441 76193d7c 2003-09-30 devnull b = c->qentry[i];
442 76193d7c 2003-09-30 devnull if(b != nil){
443 76193d7c 2003-09-30 devnull _threaddebug(DBGCHAN, "rendez %s alt %p chan %p alt %p", a->op==CHANRCV?"recv":"send", a, c, b);
444 76193d7c 2003-09-30 devnull waiter = b->v;
445 76193d7c 2003-09-30 devnull if(c->s && c->n){
447 76193d7c 2003-09-30 devnull * if buffer is full and there are waiters
448 76193d7c 2003-09-30 devnull * and we're meeting a waiter,
449 76193d7c 2003-09-30 devnull * we must be receiving.
451 76193d7c 2003-09-30 devnull * we use the value in the channel buffer,
452 76193d7c 2003-09-30 devnull * copy the waiter's value into the channel buffer
453 76193d7c 2003-09-30 devnull * on behalf of the waiter, and then wake the waiter.
455 76193d7c 2003-09-30 devnull if(a->op!=CHANRCV)
456 76193d7c 2003-09-30 devnull abort();
457 76193d7c 2003-09-30 devnull buf = altexecbuffered(a, 1);
458 76193d7c 2003-09-30 devnull altcopy(me, buf, c->e);
459 76193d7c 2003-09-30 devnull altcopy(buf, waiter, c->e);
461 76193d7c 2003-09-30 devnull if(a->op==CHANRCV)
462 76193d7c 2003-09-30 devnull altcopy(me, waiter, c->e);
464 76193d7c 2003-09-30 devnull altcopy(waiter, me, c->e);
466 76193d7c 2003-09-30 devnull *b->tag = c; /* commits us to rendezvous */
467 76193d7c 2003-09-30 devnull _threaddebug(DBGCHAN, "unlocking the chanlock");
468 76193d7c 2003-09-30 devnull unlock(&chanlock);
469 76193d7c 2003-09-30 devnull _procsplx(spl);
470 76193d7c 2003-09-30 devnull _threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)&chanlock);
471 76193d7c 2003-09-30 devnull while(_threadrendezvous((ulong)b->tag, 0) == ~0)
473 76193d7c 2003-09-30 devnull return 1;
476 76193d7c 2003-09-30 devnull buf = altexecbuffered(a, 0);
477 76193d7c 2003-09-30 devnull if(a->op==CHANRCV)
478 76193d7c 2003-09-30 devnull altcopy(me, buf, c->e);
480 76193d7c 2003-09-30 devnull altcopy(buf, me, c->e);
482 76193d7c 2003-09-30 devnull unlock(&chanlock);
483 76193d7c 2003-09-30 devnull _procsplx(spl);
484 76193d7c 2003-09-30 devnull return 1;