11 NMsizeInit = 8192+IOHDRSZ,
15 QLock alock; /* alloc */
23 QLock rlock; /* read */
32 u32int msize; /* immutable */
36 QLock alock; /* alloc */
54 assert(con->version == nil);
55 assert(con->mhead == nil);
56 assert(con->whead == nil);
57 assert(con->nfid == 0);
58 assert(con->state == ConMoribund);
71 con->cprev->cnext = con->cnext;
73 cbox.chead = con->cnext;
75 con->cnext->cprev = con->cprev;
77 cbox.ctail = con->cprev;
78 con->cprev = con->cnext = nil;
80 if(cbox.ncon > cbox.maxcon){
89 con->anext = cbox.ahead;
92 rwakeup(&cbox.arendez);
99 assert(m->rwnext == nil);
100 assert(m->flush == nil);
103 if(mbox.nmsg > mbox.maxmsg){
107 qunlock(&mbox.alock);
110 m->anext = mbox.ahead;
113 rwakeup(&mbox.arendez);
114 qunlock(&mbox.alock);
123 while(mbox.ahead == nil){
124 if(mbox.nmsg >= mbox.maxmsg){
126 rsleep(&mbox.arendez);
129 m = vtmallocz(sizeof(Msg));
130 m->data = vtmalloc(mbox.msize);
131 m->msize = mbox.msize;
137 mbox.ahead = m->anext;
139 qunlock(&mbox.alock);
156 m->mprev->mnext = m->mnext;
158 con->mhead = m->mnext;
160 m->mnext->mprev = m->mprev;
162 con->mtail = m->mprev;
163 m->mprev = m->mnext = nil;
175 fprint(2, "msgFlush %F\n", &m->t);
178 * If this Tflush has been flushed, nothing to do.
179 * Look for the message to be flushed in the
180 * queue of all messages still on this connection.
181 * If it's not found, we must assume Elvis has already
182 * left the building and reply normally.
185 if(m->state == MsgF){
186 qunlock(&con->mlock);
189 for(old = con->mhead; old != nil; old = old->mnext)
190 if(old->t.tag == m->t.oldtag)
194 fprint(2, "msgFlush: cannot find %d\n", m->t.oldtag);
195 qunlock(&con->mlock);
200 fprint(2, "\tmsgFlush found %F\n", &old->t);
204 * There are two cases where the old message can be
205 * truly flushed and no reply to the original message given.
206 * The first is when the old message is in MsgR state; no
207 * processing has been done yet and it is still on the read
208 * queue. The second is if old is a Tflush, which doesn't
209 * affect the server state. In both cases, put the old
210 * message into MsgF state and let MsgWrite toss it after
211 * pulling it off the queue.
213 if(old->state == MsgR || old->t.type == Tflush){
216 fprint(2, "msgFlush: change %d from MsgR to MsgF\n",
221 * Link this flush message and the old message
222 * so multiple flushes can be coalesced (if there are
223 * multiple Tflush messages for a particular pending
224 * request, it is only necessary to respond to the last
225 * one, so any previous can be removed) and to be
226 * sure flushes wait for their corresponding old
227 * message to go out first.
228 * Waiting flush messages do not go on the write queue,
229 * they are processed after the old message is dealt
230 * with. There's no real need to protect the setting of
231 * Msg.nowq; the only code to check it runs in this
232 * process after this routine returns.
234 if((flush = old->flush) != nil){
236 fprint(2, "msgFlush: remove %d from %d list\n",
237 old->flush->t.tag, old->t.tag);
238 m->flush = flush->flush;
247 fprint(2, "msgFlush: add %d to %d queue\n",
248 m->t.tag, old->t.tag);
249 qunlock(&con->mlock);
260 threadsetname("msgProc");
264 * If surplus to requirements, exit.
265 * If not, wait for and pull a message off
269 if(mbox.nproc > mbox.maxproc){
271 qunlock(&mbox.rlock);
274 while(mbox.rhead == nil)
275 rsleep(&mbox.rrendez);
277 mbox.rhead = m->rwnext;
279 qunlock(&mbox.rlock);
285 * If the message has been flushed before
286 * any 9P processing has started, mark it so
287 * none will be attempted.
291 strcpy(e, "flushed");
294 qunlock(&con->mlock);
301 if(m->t.type == Tversion){
303 con->state = ConDown;
304 while(con->mhead != m)
305 rsleep(&con->rendez);
306 assert(con->state == ConDown);
307 if(con->version == m){
309 con->state = ConInit;
312 strcpy(e, "Tversion aborted");
314 else if(con->state != ConUp)
315 strcpy(e, "connection not ready");
320 * Dispatch if not error already.
323 if(*e == 0 && !(*rFcall[m->t.type])(m))
324 rerrstr(e, sizeof e);
330 m->r.type = m->t.type+1;
333 * Put the message (with reply) on the
334 * write queue and wakeup the write process.
338 if(con->whead == nil)
341 con->wtail->rwnext = m;
343 rwakeup(&con->wrendez);
344 qunlock(&con->wlock);
356 threadsetname("msgRead");
365 while((n = read9pmsg(fd, m->data, con->msize)) == 0)
368 m->t.type = Tversion;
371 m->t.msize = con->msize;
372 m->t.version = "9PEoF";
375 else if(convM2S(m->data, n, &m->t) != n){
377 fprint(2, "msgRead: convM2S error: %s\n",
383 fprint(2, "msgRead %p: t %F\n", con, &m->t);
386 if(con->mtail != nil){
387 m->mprev = con->mtail;
388 con->mtail->mnext = m;
395 qunlock(&con->mlock);
398 if(mbox.rhead == nil){
400 if(!rwakeup(&mbox.rrendez)){
401 if(mbox.nproc < mbox.maxproc){
402 if(proccreate(msgProc, nil, STACK) > 0)
409 * don't need this surely?
410 rwakeup(&mbox.rrendez);
414 mbox.rtail->rwnext = m;
416 qunlock(&mbox.rlock);
427 threadsetname("msgWrite");
430 if(proccreate(msgRead, con, STACK) < 0){
437 * Wait for and pull a message off the write queue.
440 while(con->whead == nil)
441 rsleep(&con->wrendez);
443 con->whead = m->rwnext;
446 qunlock(&con->wlock);
451 * Write each message (if it hasn't been flushed)
452 * followed by any messages waiting for it to complete.
459 fprint(2, "msgWrite %d: r %F\n",
462 if(m->state != MsgF){
464 qunlock(&con->mlock);
466 n = convS2M(&m->r, con->data, con->msize);
467 if(write(con->fd, con->data, n) != n)
473 if((flush = m->flush) != nil){
480 qunlock(&con->mlock);
483 if(eof && con->fd >= 0){
487 if(con->state == ConDown)
488 rwakeup(&con->rendez);
489 if(con->state == ConMoribund && con->mhead == nil){
499 conAlloc(int fd, char* name, int flags)
506 while(cbox.ahead == nil){
507 if(cbox.ncon >= cbox.maxcon){
509 rsleep(&cbox.arendez);
512 con = vtmallocz(sizeof(Con));
513 con->rendez.l = &con->lock;
514 con->data = vtmalloc(cbox.msize);
515 con->msize = cbox.msize;
516 con->mrendez.l = &con->mlock;
517 con->wrendez.l = &con->wlock;
524 cbox.ahead = con->anext;
527 if(cbox.ctail != nil){
528 con->cprev = cbox.ctail;
529 cbox.ctail->cnext = con;
537 assert(con->mhead == nil);
538 assert(con->whead == nil);
539 assert(con->fhead == nil);
540 assert(con->nfid == 0);
544 if(con->name != nil){
549 con->name = vtstrdup(name);
551 con->name = vtstrdup("unknown");
553 snprint(buf, sizeof buf, "%s/remote", con->name);
554 if((rfd = open(buf, OREAD)) >= 0){
555 n = read(rfd, buf, sizeof buf-1);
559 if((p = strchr(buf, '\n')) != nil)
561 strecpy(con->remote, con->remote+sizeof con->remote, buf);
566 qunlock(&cbox.alock);
568 if(proccreate(msgWrite, con, STACK) < 0){
577 cmdMsg(int argc, char* argv[])
580 char *usage = "usage: msg [-m nmsg] [-p nproc]";
581 int maxmsg, nmsg, nmsgstarve, maxproc, nproc, nprocstarve;
583 maxmsg = maxproc = 0;
587 return cliError(usage);
591 return cliError(usage);
592 maxmsg = strtol(argv[0], &p, 0);
593 if(maxmsg <= 0 || p == argv[0] || *p != '\0')
594 return cliError(usage);
599 return cliError(usage);
600 maxproc = strtol(argv[0], &p, 0);
601 if(maxproc <= 0 || p == argv[0] || *p != '\0')
602 return cliError(usage);
606 return cliError(usage);
610 mbox.maxmsg = maxmsg;
611 maxmsg = mbox.maxmsg;
613 nmsgstarve = mbox.nmsgstarve;
614 qunlock(&mbox.alock);
618 mbox.maxproc = maxproc;
619 maxproc = mbox.maxproc;
621 nprocstarve = mbox.nprocstarve;
622 qunlock(&mbox.rlock);
624 consPrint("\tmsg -m %d -p %d\n", maxmsg, maxproc);
625 consPrint("\tnmsg %d nmsgstarve %d nproc %d nprocstarve %d\n",
626 nmsg, nmsgstarve, nproc, nprocstarve);
638 return strcmp(a->uname, b->uname);
642 fidMerge(Fid *a, Fid *b)
676 if(delay) /* easy way to handle 2-element list */
690 return fidMerge(a, b);
694 cmdWho(int argc, char* argv[])
696 char *usage = "usage: who";
703 return cliError(usage);
707 return cliError(usage);
712 for(con=cbox.chead; con; con=con->cnext){
713 if((l = strlen(con->name)) > l1)
715 if((l = strlen(con->remote)) > l2)
718 for(con=cbox.chead; con; con=con->cnext){
719 consPrint("\t%-*s %-*s", l1, con->name, l2, con->remote);
720 qlock(&con->fidlock);
722 for(i=0; i<NFidHash; i++)
723 for(fid=con->fidhash[i]; fid; fid=fid->hash)
724 if(fid->fidno != NOFID && fid->uname){
728 fid = fidMergeSort(last);
730 for(; fid; last=fid, fid=fid->sort)
731 if(last==nil || strcmp(fid->uname, last->uname) != 0)
732 consPrint(" %q", fid->uname);
733 qunlock(&con->fidlock);
736 runlock(&cbox.clock);
743 mbox.arendez.l = &mbox.alock;
745 mbox.rrendez.l = &mbox.rlock;
747 mbox.maxmsg = NMsgInit;
748 mbox.maxproc = NMsgProcInit;
749 mbox.msize = NMsizeInit;
751 cliAddCmd("msg", cmdMsg);
755 cmdCon(int argc, char* argv[])
759 char *usage = "usage: con [-m ncon]";
760 int maxcon, ncon, nconstarve;
766 return cliError(usage);
770 return cliError(usage);
771 maxcon = strtol(argv[0], &p, 0);
772 if(maxcon <= 0 || p == argv[0] || *p != '\0')
773 return cliError(usage);
777 return cliError(usage);
781 cbox.maxcon = maxcon;
782 maxcon = cbox.maxcon;
784 nconstarve = cbox.nconstarve;
785 wunlock(&cbox.clock);
787 consPrint("\tcon -m %d\n", maxcon);
788 consPrint("\tncon %d nconstarve %d\n", ncon, nconstarve);
791 for(con = cbox.chead; con != nil; con = con->cnext){
792 consPrint("\t%s\n", con->name);
794 runlock(&cbox.clock);
802 cbox.arendez.l = &cbox.alock;
804 cbox.maxcon = NConInit;
805 cbox.msize = NMsizeInit;
807 cliAddCmd("con", cmdCon);
808 cliAddCmd("who", cmdWho);