Blob


1 #include "threadimpl.h"
3 /*
4 * One can go through a lot of effort to avoid this global lock.
5 * You have to put locks in all the channels and all the Alt
6 * structures. At the beginning of an alt you have to lock all
7 * the channels, but then to try to actually exec an op you
8 * have to lock the other guy's alt structure, so that other
9 * people aren't trying to use him in some other op at the
10 * same time.
11 *
12 * For Plan 9 apps, it's just not worth the extra effort.
13 */
14 static QLock chanlock;
16 Channel*
17 chancreate(int elemsize, int bufsize)
18 {
19 Channel *c;
21 c = malloc(sizeof *c+bufsize*elemsize);
22 if(c == nil)
23 sysfatal("chancreate malloc: %r");
24 memset(c, 0, sizeof *c);
25 c->elemsize = elemsize;
26 c->bufsize = bufsize;
27 c->nbuf = 0;
28 c->buf = (uchar*)(c+1);
29 return c;
30 }
32 void
33 chansetname(Channel *c, char *fmt, ...)
34 {
35 char *name;
36 va_list arg;
38 va_start(arg, fmt);
39 name = vsmprint(fmt, arg);
40 va_end(arg);
41 free(c->name);
42 c->name = name;
43 }
45 /* bug - work out races */
46 void
47 chanfree(Channel *c)
48 {
49 if(c == nil)
50 return;
51 free(c->name);
52 free(c->arecv.a);
53 free(c->asend.a);
54 free(c);
55 }
57 static void
58 addarray(_Altarray *a, Alt *alt)
59 {
60 if(a->n == a->m){
61 a->m += 16;
62 a->a = realloc(a->a, a->m*sizeof a->a[0]);
63 }
64 a->a[a->n++] = alt;
65 }
67 static void
68 delarray(_Altarray *a, int i)
69 {
70 --a->n;
71 a->a[i] = a->a[a->n];
72 }
/*
 * Map CHANSND to CHANRCV and CHANRCV to CHANSND.
 * The result is not meaningful for any other op, but the macro
 * is only used as an argument to chanarray, which can handle it.
 */
#define otherop(op) ((CHANSND+CHANRCV)-(op))
80 static _Altarray*
81 chanarray(Channel *c, uint op)
82 {
83 switch(op){
84 default:
85 return nil;
86 case CHANSND:
87 return &c->asend;
88 case CHANRCV:
89 return &c->arecv;
90 }
91 }
93 static int
94 altcanexec(Alt *a)
95 {
96 _Altarray *ar;
97 Channel *c;
99 if(a->op == CHANNOP || (c=a->c) == nil)
100 return 0;
101 if(c->bufsize == 0){
102 ar = chanarray(c, otherop(a->op));
103 return ar && ar->n;
104 }else{
105 switch(a->op){
106 default:
107 return 0;
108 case CHANSND:
109 return c->nbuf < c->bufsize;
110 case CHANRCV:
111 return c->nbuf > 0;
116 static void
117 altqueue(Alt *a)
119 _Altarray *ar;
121 if(a->c == nil)
122 return;
123 ar = chanarray(a->c, a->op);
124 addarray(ar, a);
127 static void
128 altdequeue(Alt *a)
130 int i;
131 _Altarray *ar;
133 ar = chanarray(a->c, a->op);
134 if(ar == nil){
135 fprint(2, "bad use of altdequeue op=%d\n", a->op);
136 abort();
139 for(i=0; i<ar->n; i++)
140 if(ar->a[i] == a){
141 delarray(ar, i);
142 return;
144 fprint(2, "cannot find self in altdequeue\n");
145 abort();
148 static void
149 altalldequeue(Alt *a)
151 int i;
153 for(i=0; a[i].op!=CHANEND && a[i].op!=CHANNOBLK; i++)
154 if(a[i].op != CHANNOP)
155 altdequeue(&a[i]);
158 static void
159 amove(void *dst, void *src, uint n)
161 if(dst){
162 if(src == nil)
163 memset(dst, 0, n);
164 else
165 memmove(dst, src, n);
169 /*
170 * Actually move the data around. There are up to three
171 * players: the sender, the receiver, and the channel itself.
172 * If the channel is unbuffered or the buffer is empty,
173 * data goes from sender to receiver. If the channel is full,
174 * the receiver removes some from the channel and the sender
175 * gets to put some in.
176 */
177 static void
178 altcopy(Alt *s, Alt *r)
180 Alt *t;
181 Channel *c;
182 uchar *cp;
184 /*
185 * Work out who is sender and who is receiver
186 */
187 if(s == nil && r == nil)
188 return;
189 assert(s != nil);
190 c = s->c;
191 if(s->op == CHANRCV){
192 t = s;
193 s = r;
194 r = t;
196 assert(s==nil || s->op == CHANSND);
197 assert(r==nil || r->op == CHANRCV);
199 /*
200 * Channel is empty (or unbuffered) - copy directly.
201 */
202 if(s && r && c->nbuf == 0){
203 amove(r->v, s->v, c->elemsize);
204 return;
207 /*
208 * Otherwise it's always okay to receive and then send.
209 */
210 if(r){
211 cp = c->buf + c->off*c->elemsize;
212 amove(r->v, cp, c->elemsize);
213 --c->nbuf;
214 if(++c->off == c->bufsize)
215 c->off = 0;
217 if(s){
218 cp = c->buf + (c->off+c->nbuf)%c->bufsize*c->elemsize;
219 amove(cp, s->v, c->elemsize);
220 ++c->nbuf;
224 static void
225 altexec(Alt *a)
227 int i;
228 _Altarray *ar;
229 Alt *other;
230 Channel *c;
232 c = a->c;
233 ar = chanarray(c, otherop(a->op));
234 if(ar && ar->n){
235 i = rand()%ar->n;
236 other = ar->a[i];
237 altcopy(a, other);
238 altalldequeue(other->thread->alt);
239 other->thread->alt = other;
240 _threadready(other->thread);
241 }else
242 altcopy(a, nil);
245 #define dbgalt 0
246 int
247 chanalt(Alt *a)
249 int i, j, ncan, n, canblock;
250 Channel *c;
251 _Thread *t;
253 needstack(512);
254 for(i=0; a[i].op != CHANEND && a[i].op != CHANNOBLK; i++)
256 n = i;
257 canblock = a[i].op == CHANEND;
259 t = proc()->thread;
260 for(i=0; i<n; i++)
261 a[i].thread = t;
262 t->alt = a;
263 qlock(&chanlock);
264 if(dbgalt) print("alt ");
265 ncan = 0;
266 for(i=0; i<n; i++){
267 c = a[i].c;
268 if(dbgalt) print(" %c:", "esrnb"[a[i].op]);
269 if(dbgalt) if(c->name) print("%s", c->name); else print("%p", c);
270 if(altcanexec(&a[i])){
271 if(dbgalt) print("*");
272 ncan++;
275 if(ncan){
276 j = rand()%ncan;
277 for(i=0; i<n; i++){
278 if(altcanexec(&a[i])){
279 if(j-- == 0){
280 if(dbgalt){
281 c = a[i].c;
282 print(" => %c:", "esrnb"[a[i].op]);
283 if(c->name) print("%s", c->name); else print("%p", c);
284 print("\n");
286 altexec(&a[i]);
287 qunlock(&chanlock);
288 return i;
293 if(dbgalt)print("\n");
295 if(!canblock){
296 qunlock(&chanlock);
297 return -1;
300 for(i=0; i<n; i++){
301 if(a[i].op != CHANNOP)
302 altqueue(&a[i]);
304 qunlock(&chanlock);
306 _threadswitch();
308 /*
309 * the guy who ran the op took care of dequeueing us
310 * and then set t->alt to the one that was executed.
311 */
312 if(t->alt < a || t->alt >= a+n)
313 sysfatal("channel bad alt");
314 return t->alt - a;
317 static int
318 _chanop(Channel *c, int op, void *p, int canblock)
320 Alt a[2];
322 a[0].c = c;
323 a[0].op = op;
324 a[0].v = p;
325 a[1].op = canblock ? CHANEND : CHANNOBLK;
326 if(chanalt(a) < 0)
327 return -1;
328 return 1;
331 int
332 chansend(Channel *c, void *v)
334 return _chanop(c, CHANSND, v, 1);
337 int
338 channbsend(Channel *c, void *v)
340 return _chanop(c, CHANSND, v, 0);
343 int
344 chanrecv(Channel *c, void *v)
346 return _chanop(c, CHANRCV, v, 1);
349 int
350 channbrecv(Channel *c, void *v)
352 return _chanop(c, CHANRCV, v, 0);
355 int
356 chansendp(Channel *c, void *v)
358 return _chanop(c, CHANSND, (void*)&v, 1);
361 void*
362 chanrecvp(Channel *c)
364 void *v;
366 if(_chanop(c, CHANRCV, (void*)&v, 1) > 0)
367 return v;
368 return nil;
371 int
372 channbsendp(Channel *c, void *v)
374 return _chanop(c, CHANSND, (void*)&v, 0);
377 void*
378 channbrecvp(Channel *c)
380 void *v;
382 if(_chanop(c, CHANRCV, (void*)&v, 0) > 0)
383 return v;
384 return nil;
387 int
388 chansendul(Channel *c, ulong val)
390 return _chanop(c, CHANSND, &val, 1);
393 ulong
394 chanrecvul(Channel *c)
396 ulong val;
398 if(_chanop(c, CHANRCV, &val, 1) > 0)
399 return val;
400 return 0;
403 int
404 channbsendul(Channel *c, ulong val)
406 return _chanop(c, CHANSND, &val, 0);
409 ulong
410 channbrecvul(Channel *c)
412 ulong val;
414 if(_chanop(c, CHANRCV, &val, 0) > 0)
415 return val;
416 return 0;