Blob


1 #include "stdinc.h"
3 #include "9.h"
4 #include "dat.h"
5 #include "fns.h"
7 enum {
8 NConInit = 128,
9 NMsgInit = 384,
10 NMsgProcInit = 64,
11 NMsizeInit = 8192+IOHDRSZ,
12 };
14 static struct {
15 QLock alock; /* alloc */
16 Msg* ahead;
17 Rendez arendez;
19 int maxmsg;
20 int nmsg;
21 int nmsgstarve;
23 QLock rlock; /* read */
24 Msg* rhead;
25 Msg* rtail;
26 Rendez rrendez;
28 int maxproc;
29 int nproc;
30 int nprocstarve;
32 u32int msize; /* immutable */
33 } mbox;
35 static struct {
36 QLock alock; /* alloc */
37 Con* ahead;
38 Rendez arendez;
40 RWLock clock;
41 Con* chead;
42 Con* ctail;
44 int maxcon;
45 int ncon;
46 int nconstarve;
48 u32int msize;
49 } cbox;
51 static void
52 conFree(Con* con)
53 {
54 assert(con->version == nil);
55 assert(con->mhead == nil);
56 assert(con->whead == nil);
57 assert(con->nfid == 0);
58 assert(con->state == ConMoribund);
60 if(con->fd >= 0){
61 close(con->fd);
62 con->fd = -1;
63 }
64 con->state = ConDead;
65 con->aok = 0;
66 con->flags = 0;
67 con->isconsole = 0;
69 qlock(&cbox.alock);
70 if(con->cprev != nil)
71 con->cprev->cnext = con->cnext;
72 else
73 cbox.chead = con->cnext;
74 if(con->cnext != nil)
75 con->cnext->cprev = con->cprev;
76 else
77 cbox.ctail = con->cprev;
78 con->cprev = con->cnext = nil;
80 if(cbox.ncon > cbox.maxcon){
81 if(con->name != nil)
82 vtfree(con->name);
83 vtfree(con->data);
84 vtfree(con);
85 cbox.ncon--;
86 qunlock(&cbox.alock);
87 return;
88 }
89 con->anext = cbox.ahead;
90 cbox.ahead = con;
91 if(con->anext == nil)
92 rwakeup(&cbox.arendez);
93 qunlock(&cbox.alock);
94 }
96 static void
97 msgFree(Msg* m)
98 {
99 assert(m->rwnext == nil);
100 assert(m->flush == nil);
102 qlock(&mbox.alock);
103 if(mbox.nmsg > mbox.maxmsg){
104 vtfree(m->data);
105 vtfree(m);
106 mbox.nmsg--;
107 qunlock(&mbox.alock);
108 return;
110 m->anext = mbox.ahead;
111 mbox.ahead = m;
112 if(m->anext == nil)
113 rwakeup(&mbox.arendez);
114 qunlock(&mbox.alock);
117 static Msg*
118 msgAlloc(Con* con)
120 Msg *m;
122 qlock(&mbox.alock);
123 while(mbox.ahead == nil){
124 if(mbox.nmsg >= mbox.maxmsg){
125 mbox.nmsgstarve++;
126 rsleep(&mbox.arendez);
127 continue;
129 m = vtmallocz(sizeof(Msg));
130 m->data = vtmalloc(mbox.msize);
131 m->msize = mbox.msize;
132 mbox.nmsg++;
133 mbox.ahead = m;
134 break;
136 m = mbox.ahead;
137 mbox.ahead = m->anext;
138 m->anext = nil;
139 qunlock(&mbox.alock);
141 m->con = con;
142 m->state = MsgR;
143 m->nowq = 0;
145 return m;
148 static void
149 msgMunlink(Msg* m)
151 Con *con;
153 con = m->con;
155 if(m->mprev != nil)
156 m->mprev->mnext = m->mnext;
157 else
158 con->mhead = m->mnext;
159 if(m->mnext != nil)
160 m->mnext->mprev = m->mprev;
161 else
162 con->mtail = m->mprev;
163 m->mprev = m->mnext = nil;
166 void
167 msgFlush(Msg* m)
169 Con *con;
170 Msg *flush, *old;
172 con = m->con;
174 if(Dflag)
175 fprint(2, "msgFlush %F\n", &m->t);
177 /*
178 * If this Tflush has been flushed, nothing to do.
179 * Look for the message to be flushed in the
180 * queue of all messages still on this connection.
181 * If it's not found must assume Elvis has already
182 * left the building and reply normally.
183 */
184 qlock(&con->mlock);
185 if(m->state == MsgF){
186 qunlock(&con->mlock);
187 return;
189 for(old = con->mhead; old != nil; old = old->mnext)
190 if(old->t.tag == m->t.oldtag)
191 break;
192 if(old == nil){
193 if(Dflag)
194 fprint(2, "msgFlush: cannot find %d\n", m->t.oldtag);
195 qunlock(&con->mlock);
196 return;
199 if(Dflag)
200 fprint(2, "\tmsgFlush found %F\n", &old->t);
202 /*
203 * Found it.
204 * There are two cases where the old message can be
205 * truly flushed and no reply to the original message given.
206 * The first is when the old message is in MsgR state; no
207 * processing has been done yet and it is still on the read
208 * queue. The second is if old is a Tflush, which doesn't
209 * affect the server state. In both cases, put the old
210 * message into MsgF state and let MsgWrite toss it after
211 * pulling it off the queue.
212 */
213 if(old->state == MsgR || old->t.type == Tflush){
214 old->state = MsgF;
215 if(Dflag)
216 fprint(2, "msgFlush: change %d from MsgR to MsgF\n",
217 m->t.oldtag);
220 /*
221 * Link this flush message and the old message
222 * so multiple flushes can be coalesced (if there are
223 * multiple Tflush messages for a particular pending
224 * request, it is only necessary to respond to the last
225 * one, so any previous can be removed) and to be
226 * sure flushes wait for their corresponding old
227 * message to go out first.
228 * Waiting flush messages do not go on the write queue,
229 * they are processed after the old message is dealt
230 * with. There's no real need to protect the setting of
231 * Msg.nowq, the only code to check it runs in this
232 * process after this routine returns.
233 */
234 if((flush = old->flush) != nil){
235 if(Dflag)
236 fprint(2, "msgFlush: remove %d from %d list\n",
237 old->flush->t.tag, old->t.tag);
238 m->flush = flush->flush;
239 flush->flush = nil;
240 msgMunlink(flush);
241 msgFree(flush);
243 old->flush = m;
244 m->nowq = 1;
246 if(Dflag)
247 fprint(2, "msgFlush: add %d to %d queue\n",
248 m->t.tag, old->t.tag);
249 qunlock(&con->mlock);
252 static void
253 msgProc(void* v)
255 Msg *m;
256 char e[ERRMAX];
257 Con *con;
259 USED(v);
260 threadsetname("msgProc");
262 for(;;){
263 /*
264 * If surplus to requirements, exit.
265 * If not, wait for and pull a message off
266 * the read queue.
267 */
268 qlock(&mbox.rlock);
269 if(mbox.nproc > mbox.maxproc){
270 mbox.nproc--;
271 qunlock(&mbox.rlock);
272 break;
274 while(mbox.rhead == nil)
275 rsleep(&mbox.rrendez);
276 m = mbox.rhead;
277 mbox.rhead = m->rwnext;
278 m->rwnext = nil;
279 qunlock(&mbox.rlock);
281 con = m->con;
282 *e = 0;
284 /*
285 * If the message has been flushed before
286 * any 9P processing has started, mark it so
287 * none will be attempted.
288 */
289 qlock(&con->mlock);
290 if(m->state == MsgF)
291 strcpy(e, "flushed");
292 else
293 m->state = Msg9;
294 qunlock(&con->mlock);
296 if(*e == 0){
297 /*
298 * explain this
299 */
300 qlock(&con->lock);
301 if(m->t.type == Tversion){
302 con->version = m;
303 con->state = ConDown;
304 while(con->mhead != m)
305 rsleep(&con->rendez);
306 assert(con->state == ConDown);
307 if(con->version == m){
308 con->version = nil;
309 con->state = ConInit;
311 else
312 strcpy(e, "Tversion aborted");
314 else if(con->state != ConUp)
315 strcpy(e, "connection not ready");
316 qunlock(&con->lock);
319 /*
320 * Dispatch if not error already.
321 */
322 m->r.tag = m->t.tag;
323 if(*e == 0 && !(*rFcall[m->t.type])(m))
324 rerrstr(e, sizeof e);
325 if(*e != 0){
326 m->r.type = Rerror;
327 m->r.ename = e;
329 else
330 m->r.type = m->t.type+1;
332 /*
333 * Put the message (with reply) on the
334 * write queue and wakeup the write process.
335 */
336 if(!m->nowq){
337 qlock(&con->wlock);
338 if(con->whead == nil)
339 con->whead = m;
340 else
341 con->wtail->rwnext = m;
342 con->wtail = m;
343 rwakeup(&con->wrendez);
344 qunlock(&con->wlock);
349 static void
350 msgRead(void* v)
352 Msg *m;
353 Con *con;
354 int eof, fd, n;
356 threadsetname("msgRead");
358 con = v;
359 fd = con->fd;
360 eof = 0;
362 while(!eof){
363 m = msgAlloc(con);
365 while((n = read9pmsg(fd, m->data, con->msize)) == 0)
367 if(n < 0){
368 m->t.type = Tversion;
369 m->t.fid = NOFID;
370 m->t.tag = NOTAG;
371 m->t.msize = con->msize;
372 m->t.version = "9PEoF";
373 eof = 1;
375 else if(convM2S(m->data, n, &m->t) != n){
376 if(Dflag)
377 fprint(2, "msgRead: convM2S error: %s\n",
378 con->name);
379 msgFree(m);
380 continue;
382 if(Dflag)
383 fprint(2, "msgRead %p: t %F\n", con, &m->t);
385 qlock(&con->mlock);
386 if(con->mtail != nil){
387 m->mprev = con->mtail;
388 con->mtail->mnext = m;
390 else{
391 con->mhead = m;
392 m->mprev = nil;
394 con->mtail = m;
395 qunlock(&con->mlock);
397 qlock(&mbox.rlock);
398 if(mbox.rhead == nil){
399 mbox.rhead = m;
400 if(!rwakeup(&mbox.rrendez)){
401 if(mbox.nproc < mbox.maxproc){
402 if(proccreate(msgProc, nil, STACK) > 0)
403 mbox.nproc++;
405 else
406 mbox.nprocstarve++;
408 /*
409 * don't need this surely?
410 rwakeup(&mbox.rrendez);
411 */
413 else
414 mbox.rtail->rwnext = m;
415 mbox.rtail = m;
416 qunlock(&mbox.rlock);
420 static void
421 msgWrite(void* v)
423 Con *con;
424 int eof, n;
425 Msg *flush, *m;
427 threadsetname("msgWrite");
429 con = v;
430 if(proccreate(msgRead, con, STACK) < 0){
431 conFree(con);
432 return;
435 for(;;){
436 /*
437 * Wait for and pull a message off the write queue.
438 */
439 qlock(&con->wlock);
440 while(con->whead == nil)
441 rsleep(&con->wrendez);
442 m = con->whead;
443 con->whead = m->rwnext;
444 m->rwnext = nil;
445 assert(!m->nowq);
446 qunlock(&con->wlock);
448 eof = 0;
450 /*
451 * Write each message (if it hasn't been flushed)
452 * followed by any messages waiting for it to complete.
453 */
454 qlock(&con->mlock);
455 while(m != nil){
456 msgMunlink(m);
458 if(Dflag)
459 fprint(2, "msgWrite %d: r %F\n",
460 m->state, &m->r);
462 if(m->state != MsgF){
463 m->state = MsgW;
464 qunlock(&con->mlock);
466 n = convS2M(&m->r, con->data, con->msize);
467 if(write(con->fd, con->data, n) != n)
468 eof = 1;
470 qlock(&con->mlock);
473 if((flush = m->flush) != nil){
474 assert(flush->nowq);
475 m->flush = nil;
477 msgFree(m);
478 m = flush;
480 qunlock(&con->mlock);
482 qlock(&con->lock);
483 if(eof && con->fd >= 0){
484 close(con->fd);
485 con->fd = -1;
487 if(con->state == ConDown)
488 rwakeup(&con->rendez);
489 if(con->state == ConMoribund && con->mhead == nil){
490 qunlock(&con->lock);
491 conFree(con);
492 break;
494 qunlock(&con->lock);
498 Con*
499 conAlloc(int fd, char* name, int flags)
501 Con *con;
502 char buf[128], *p;
503 int rfd, n;
505 qlock(&cbox.alock);
506 while(cbox.ahead == nil){
507 if(cbox.ncon >= cbox.maxcon){
508 cbox.nconstarve++;
509 rsleep(&cbox.arendez);
510 continue;
512 con = vtmallocz(sizeof(Con));
513 con->rendez.l = &con->lock;
514 con->data = vtmalloc(cbox.msize);
515 con->msize = cbox.msize;
516 con->mrendez.l = &con->mlock;
517 con->wrendez.l = &con->wlock;
519 cbox.ncon++;
520 cbox.ahead = con;
521 break;
523 con = cbox.ahead;
524 cbox.ahead = con->anext;
525 con->anext = nil;
527 if(cbox.ctail != nil){
528 con->cprev = cbox.ctail;
529 cbox.ctail->cnext = con;
531 else{
532 cbox.chead = con;
533 con->cprev = nil;
535 cbox.ctail = con;
537 assert(con->mhead == nil);
538 assert(con->whead == nil);
539 assert(con->fhead == nil);
540 assert(con->nfid == 0);
542 con->state = ConNew;
543 con->fd = fd;
544 if(con->name != nil){
545 vtfree(con->name);
546 con->name = nil;
548 if(name != nil)
549 con->name = vtstrdup(name);
550 else
551 con->name = vtstrdup("unknown");
552 con->remote[0] = 0;
553 snprint(buf, sizeof buf, "%s/remote", con->name);
554 if((rfd = open(buf, OREAD)) >= 0){
555 n = read(rfd, buf, sizeof buf-1);
556 close(rfd);
557 if(n > 0){
558 buf[n] = 0;
559 if((p = strchr(buf, '\n')) != nil)
560 *p = 0;
561 strecpy(con->remote, con->remote+sizeof con->remote, buf);
564 con->flags = flags;
565 con->isconsole = 0;
566 qunlock(&cbox.alock);
568 if(proccreate(msgWrite, con, STACK) < 0){
569 conFree(con);
570 return nil;
573 return con;
576 static int
577 cmdMsg(int argc, char* argv[])
579 char *p;
580 char *usage = "usage: msg [-m nmsg] [-p nproc]";
581 int maxmsg, nmsg, nmsgstarve, maxproc, nproc, nprocstarve;
583 maxmsg = maxproc = 0;
585 ARGBEGIN{
586 default:
587 return cliError(usage);
588 case 'm':
589 p = ARGF();
590 if(p == nil)
591 return cliError(usage);
592 maxmsg = strtol(argv[0], &p, 0);
593 if(maxmsg <= 0 || p == argv[0] || *p != '\0')
594 return cliError(usage);
595 break;
596 case 'p':
597 p = ARGF();
598 if(p == nil)
599 return cliError(usage);
600 maxproc = strtol(argv[0], &p, 0);
601 if(maxproc <= 0 || p == argv[0] || *p != '\0')
602 return cliError(usage);
603 break;
604 }ARGEND
605 if(argc)
606 return cliError(usage);
608 qlock(&mbox.alock);
609 if(maxmsg)
610 mbox.maxmsg = maxmsg;
611 maxmsg = mbox.maxmsg;
612 nmsg = mbox.nmsg;
613 nmsgstarve = mbox.nmsgstarve;
614 qunlock(&mbox.alock);
616 qlock(&mbox.rlock);
617 if(maxproc)
618 mbox.maxproc = maxproc;
619 maxproc = mbox.maxproc;
620 nproc = mbox.nproc;
621 nprocstarve = mbox.nprocstarve;
622 qunlock(&mbox.rlock);
624 consPrint("\tmsg -m %d -p %d\n", maxmsg, maxproc);
625 consPrint("\tnmsg %d nmsgstarve %d nproc %d nprocstarve %d\n",
626 nmsg, nmsgstarve, nproc, nprocstarve);
628 return 1;
631 static int
632 scmp(Fid *a, Fid *b)
634 if(a == 0)
635 return 1;
636 if(b == 0)
637 return -1;
638 return strcmp(a->uname, b->uname);
641 static Fid*
642 fidMerge(Fid *a, Fid *b)
644 Fid *s, **l;
646 l = &s;
647 while(a || b){
648 if(scmp(a, b) < 0){
649 *l = a;
650 l = &a->sort;
651 a = a->sort;
652 }else{
653 *l = b;
654 l = &b->sort;
655 b = b->sort;
658 *l = 0;
659 return s;
662 static Fid*
663 fidMergeSort(Fid *f)
665 int delay;
666 Fid *a, *b;
668 if(f == nil)
669 return nil;
670 if(f->sort == nil)
671 return f;
673 a = b = f;
674 delay = 1;
675 while(a && b){
676 if(delay) /* easy way to handle 2-element list */
677 delay = 0;
678 else
679 a = a->sort;
680 if(b = b->sort)
681 b = b->sort;
684 b = a->sort;
685 a->sort = nil;
687 a = fidMergeSort(f);
688 b = fidMergeSort(b);
690 return fidMerge(a, b);
693 static int
694 cmdWho(int argc, char* argv[])
696 char *usage = "usage: who";
697 int i, l1, l2, l;
698 Con *con;
699 Fid *fid, *last;
701 ARGBEGIN{
702 default:
703 return cliError(usage);
704 }ARGEND
706 if(argc > 0)
707 return cliError(usage);
709 rlock(&cbox.clock);
710 l1 = 0;
711 l2 = 0;
712 for(con=cbox.chead; con; con=con->cnext){
713 if((l = strlen(con->name)) > l1)
714 l1 = l;
715 if((l = strlen(con->remote)) > l2)
716 l2 = l;
718 for(con=cbox.chead; con; con=con->cnext){
719 consPrint("\t%-*s %-*s", l1, con->name, l2, con->remote);
720 qlock(&con->fidlock);
721 last = nil;
722 for(i=0; i<NFidHash; i++)
723 for(fid=con->fidhash[i]; fid; fid=fid->hash)
724 if(fid->fidno != NOFID && fid->uname){
725 fid->sort = last;
726 last = fid;
728 fid = fidMergeSort(last);
729 last = nil;
730 for(; fid; last=fid, fid=fid->sort)
731 if(last==nil || strcmp(fid->uname, last->uname) != 0)
732 consPrint(" %q", fid->uname);
733 qunlock(&con->fidlock);
734 consPrint("\n");
736 runlock(&cbox.clock);
737 return 1;
740 void
741 msgInit(void)
743 mbox.arendez.l = &mbox.alock;
745 mbox.rrendez.l = &mbox.rlock;
747 mbox.maxmsg = NMsgInit;
748 mbox.maxproc = NMsgProcInit;
749 mbox.msize = NMsizeInit;
751 cliAddCmd("msg", cmdMsg);
754 static int
755 cmdCon(int argc, char* argv[])
757 char *p;
758 Con *con;
759 char *usage = "usage: con [-m ncon]";
760 int maxcon, ncon, nconstarve;
762 maxcon = 0;
764 ARGBEGIN{
765 default:
766 return cliError(usage);
767 case 'm':
768 p = ARGF();
769 if(p == nil)
770 return cliError(usage);
771 maxcon = strtol(argv[0], &p, 0);
772 if(maxcon <= 0 || p == argv[0] || *p != '\0')
773 return cliError(usage);
774 break;
775 }ARGEND
776 if(argc)
777 return cliError(usage);
779 wlock(&cbox.clock);
780 if(maxcon)
781 cbox.maxcon = maxcon;
782 maxcon = cbox.maxcon;
783 ncon = cbox.ncon;
784 nconstarve = cbox.nconstarve;
785 wunlock(&cbox.clock);
787 consPrint("\tcon -m %d\n", maxcon);
788 consPrint("\tncon %d nconstarve %d\n", ncon, nconstarve);
790 rlock(&cbox.clock);
791 for(con = cbox.chead; con != nil; con = con->cnext){
792 consPrint("\t%s\n", con->name);
794 runlock(&cbox.clock);
796 return 1;
799 void
800 conInit(void)
802 cbox.arendez.l = &cbox.alock;
804 cbox.maxcon = NConInit;
805 cbox.msize = NMsizeInit;
807 cliAddCmd("con", cmdCon);
808 cliAddCmd("who", cmdWho);