Blob


1 #include <u.h>
2 #include <libc.h>
3 #include <fcall.h>
4 #include <thread.h>
5 #include <errno.h>
/*
 * Compile-time limits.
 */
enum
{
	STACK = 32768,	/* stack size handed to threadcreate/proccreate */
	NHASH = 31,	/* buckets in each per-connection tag/fid hash table */
	MAXMSG = 64,	/* per connection */
};
/* Forward typedefs for the structures defined in this file. */
typedef struct Hash Hash;
typedef struct Fid Fid;
typedef struct Msg Msg;
typedef struct Conn Conn;
typedef struct Queue Queue;
20 struct Hash
21 {
22 Hash *next;
23 uint n;
24 void *v;
25 };
27 struct Fid
28 {
29 int fid;
30 int ref;
31 int cfid;
32 int openfd;
33 Fid *next;
34 };
36 struct Msg
37 {
38 Conn *c;
39 int internal;
40 int ref;
41 int ctag;
42 int tag;
43 int isopenfd;
44 Fcall tx;
45 Fcall rx;
46 Fid *fid;
47 Fid *newfid;
48 Fid *afid;
49 Msg *oldm;
50 Msg *next;
51 uchar *tpkt;
52 uchar *rpkt;
53 };
55 struct Conn
56 {
57 int fd;
58 int fdmode;
59 Fid *fdfid;
60 int nmsg;
61 int nfid;
62 Channel *inc;
63 Channel *internal;
64 int inputstalled;
65 char dir[40];
66 Hash *tag[NHASH];
67 Hash *fid[NHASH];
68 Queue *outq;
69 Queue *inq;
70 };
72 char *xaname;
73 char *addr;
74 int afd;
75 char adir[40];
76 int isunix;
77 Queue *outq;
78 Queue *inq;
79 int verbose = 0;
80 int logging = 0;
81 int msize = 8192;
82 int xafid = NOFID;
83 int attached;
84 int versioned;
86 void *gethash(Hash**, uint);
87 int puthash(Hash**, uint, void*);
88 int delhash(Hash**, uint, void*);
89 Msg *mread9p(Ioproc*, int);
90 int mwrite9p(Ioproc*, int, uchar*);
91 uchar *read9ppkt(Ioproc*, int);
92 int write9ppkt(int, uchar*);
93 Msg *msgnew(int);
94 void msgput(Msg*);
95 void msgclear(Msg*);
96 Msg *msgget(int);
97 void msgincref(Msg*);
98 Fid *fidnew(int);
99 void fidput(Fid*);
100 void *emalloc(int);
101 void *erealloc(void*, int);
102 Queue *qalloc(void);
103 int sendq(Queue*, void*);
104 void *recvq(Queue*);
105 void connthread(void*);
106 void connoutthread(void*);
107 void listenthread(void*);
108 void outputthread(void*);
109 void inputthread(void*);
110 void rewritehdr(Fcall*, uchar*);
111 void repack(Fcall*, uchar**);
112 int tlisten(char*, char*);
113 int taccept(int, char*);
114 int iolisten(Ioproc*, char*, char*);
115 int ioaccept(Ioproc*, int, char*);
116 int iorecvfd(Ioproc*, int);
117 int iosendfd(Ioproc*, int, int);
118 void mainproc(void*);
119 int ignorepipe(void*, char*);
120 int timefmt(Fmt*);
121 void dorootstat(void);
/*
 * Print the command-line synopsis on fd 2 and exit every thread.
 */
void
usage(void)
{
	fprint(2, "usage: 9pserve [-lv] [-A aname afid] [-M msize] address\n");
	fprint(2, "\treads/writes 9P messages on stdin/stdout\n");
	threadexitsall("usage");
}
131 uchar vbuf[128];
132 extern int _threaddebuglevel;
133 void
134 threadmain(int argc, char **argv)
136 char *file, *x;
137 int fd;
139 x = getenv("verbose9pserve");
140 if(x){
141 verbose = atoi(x);
142 fprint(2, "verbose9pserve %s => %d\n", x, verbose);
144 ARGBEGIN{
145 default:
146 usage();
147 case 'A':
148 attached = 1;
149 xaname = EARGF(usage());
150 xafid = atoi(EARGF(usage()));
151 break;
152 case 'M':
153 versioned = 1;
154 msize = atoi(EARGF(usage()));
155 break;
156 case 'v':
157 verbose++;
158 break;
159 case 'u':
160 isunix++;
161 break;
162 case 'l':
163 logging++;
164 break;
165 }ARGEND
167 if(attached && !versioned){
168 fprint(2, "-A must be used with -M\n");
169 usage();
172 if(argc != 1)
173 usage();
174 addr = argv[0];
176 fmtinstall('T', timefmt);
178 if((afd = announce(addr, adir)) < 0)
179 sysfatal("announce %s: %r", addr);
180 if(logging){
181 if(strncmp(addr, "unix!", 5) == 0)
182 addr += 5;
183 file = smprint("%s.log", addr);
184 if(file == nil)
185 sysfatal("smprint log: %r");
186 if((fd = create(file, OWRITE, 0666)) < 0)
187 sysfatal("create %s: %r", file);
188 dup(fd, 2);
189 if(fd > 2)
190 close(fd);
192 if(verbose) fprint(2, "%T 9pserve running\n");
193 proccreate(mainproc, nil, STACK);
196 void
197 mainproc(void *v)
199 int n, nn;
200 Fcall f;
201 USED(v);
203 atnotify(ignorepipe, 1);
204 fmtinstall('D', dirfmt);
205 fmtinstall('M', dirmodefmt);
206 fmtinstall('F', fcallfmt);
207 fmtinstall('H', encodefmt);
209 outq = qalloc();
210 inq = qalloc();
212 if(!versioned){
213 f.type = Tversion;
214 f.version = "9P2000";
215 f.msize = msize;
216 f.tag = NOTAG;
217 n = convS2M(&f, vbuf, sizeof vbuf);
218 if(verbose > 1) fprint(2, "%T * <- %F\n", &f);
219 nn = write(1, vbuf, n);
220 if(n != nn)
221 sysfatal("error writing Tversion: %r\n");
222 n = read9pmsg(0, vbuf, sizeof vbuf);
223 if(convM2S(vbuf, n, &f) != n)
224 sysfatal("convM2S failure");
225 if(f.msize < msize)
226 msize = f.msize;
227 if(verbose > 1) fprint(2, "%T * -> %F\n", &f);
230 threadcreate(inputthread, nil, STACK);
231 threadcreate(outputthread, nil, STACK);
233 // if(rootfid)
234 // dorootstat();
236 threadcreate(listenthread, nil, STACK);
237 threadexits(0);
240 int
241 ignorepipe(void *v, char *s)
243 USED(v);
244 if(strcmp(s, "sys: write on closed pipe") == 0)
245 return 1;
246 if(strcmp(s, "sys: tstp") == 0)
247 return 1;
248 fprint(2, "9pserve %s: %T note: %s\n", addr, s);
249 return 0;
252 void
253 listenthread(void *arg)
255 Conn *c;
256 Ioproc *io;
258 io = ioproc();
259 USED(arg);
260 threadsetname("listen %s", adir);
261 for(;;){
262 c = emalloc(sizeof(Conn));
263 c->fd = iolisten(io, adir, c->dir);
264 if(c->fd < 0){
265 if(verbose) fprint(2, "%T listen: %r\n");
266 close(afd);
267 free(c);
268 return;
270 c->inc = chancreate(sizeof(void*), 0);
271 c->internal = chancreate(sizeof(void*), 0);
272 c->inq = qalloc();
273 c->outq = qalloc();
274 if(verbose) fprint(2, "%T incoming call on %s\n", c->dir);
275 threadcreate(connthread, c, STACK);
279 void
280 send9pmsg(Msg *m)
282 int n, nn;
284 n = sizeS2M(&m->rx);
285 m->rpkt = emalloc(n);
286 nn = convS2M(&m->rx, m->rpkt, n);
287 if(nn != n)
288 sysfatal("sizeS2M + convS2M disagree");
289 sendq(m->c->outq, m);
292 void
293 sendomsg(Msg *m)
295 int n, nn;
297 n = sizeS2M(&m->tx);
298 m->tpkt = emalloc(n);
299 nn = convS2M(&m->tx, m->tpkt, n);
300 if(nn != n)
301 sysfatal("sizeS2M + convS2M disagree");
302 sendq(outq, m);
305 void
306 err(Msg *m, char *ename)
308 m->rx.type = Rerror;
309 m->rx.ename = ename;
310 m->rx.tag = m->tx.tag;
311 send9pmsg(m);
/*
 * Return a copy of s allocated with emalloc.
 */
char*
estrdup(char *s)
{
	char *t;

	t = emalloc(strlen(s)+1);
	strcpy(t, s);
	return t;
}
324 void
325 connthread(void *arg)
327 int i, fd;
328 Conn *c;
329 Hash *h, *hnext;
330 Msg *m, *om, *mm;
331 Fid *f;
332 Ioproc *io;
334 c = arg;
335 threadsetname("conn %s", c->dir);
336 io = ioproc();
337 fd = ioaccept(io, c->fd, c->dir);
338 if(fd < 0){
339 if(verbose) fprint(2, "%T accept %s: %r\n", c->dir);
340 goto out;
342 close(c->fd);
343 c->fd = fd;
344 threadcreate(connoutthread, c, STACK);
345 while((m = mread9p(io, c->fd)) != nil){
346 if(verbose > 1) fprint(2, "%T fd#%d -> %F\n", c->fd, &m->tx);
347 m->c = c;
348 m->ctag = m->tx.tag;
349 c->nmsg++;
350 if(verbose > 1) fprint(2, "%T fd#%d: new msg %p\n", c->fd, m);
351 if(puthash(c->tag, m->tx.tag, m) < 0){
352 err(m, "duplicate tag");
353 continue;
355 msgincref(m);
356 switch(m->tx.type){
357 case Tversion:
358 m->rx.tag = m->tx.tag;
359 m->rx.msize = m->tx.msize;
360 if(m->rx.msize > msize)
361 m->rx.msize = msize;
362 m->rx.version = "9P2000";
363 m->rx.type = Rversion;
364 send9pmsg(m);
365 continue;
366 case Tflush:
367 if((m->oldm = gethash(c->tag, m->tx.oldtag)) == nil){
368 m->rx.tag = m->tx.tag;
369 m->rx.type = Rflush;
370 send9pmsg(m);
371 continue;
373 msgincref(m->oldm);
374 break;
375 case Tattach:
376 m->afid = nil;
377 if(m->tx.afid != NOFID
378 && (m->afid = gethash(c->fid, m->tx.afid)) == nil){
379 err(m, "unknown fid");
380 continue;
382 if(m->afid)
383 m->afid->ref++;
384 m->fid = fidnew(m->tx.fid);
385 if(puthash(c->fid, m->tx.fid, m->fid) < 0){
386 err(m, "duplicate fid");
387 continue;
389 m->fid->ref++;
390 if(attached && m->afid==nil){
391 if(m->tx.aname[0] && strcmp(xaname, m->tx.aname) != 0){
392 err(m, "invalid attach name");
393 continue;
395 m->tx.afid = xafid;
396 m->tx.aname = xaname;
397 m->tx.uname = estrdup(m->tx.uname);
398 repack(&m->tx, &m->tpkt);
399 free(m->tx.uname);
400 m->tx.uname = "XXX";
402 break;
403 case Twalk:
404 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
405 err(m, "unknown fid");
406 continue;
408 m->fid->ref++;
409 if(m->tx.newfid == m->tx.fid){
410 m->fid->ref++;
411 m->newfid = m->fid;
412 }else{
413 m->newfid = fidnew(m->tx.newfid);
414 if(puthash(c->fid, m->tx.newfid, m->newfid) < 0){
415 err(m, "duplicate fid");
416 continue;
418 m->newfid->ref++;
420 break;
421 case Tauth:
422 if(attached){
423 err(m, "authentication not required");
424 continue;
426 m->afid = fidnew(m->tx.afid);
427 if(puthash(c->fid, m->tx.afid, m->afid) < 0){
428 err(m, "duplicate fid");
429 continue;
431 m->afid->ref++;
432 break;
433 case Topenfd:
434 if(m->tx.mode&~(OTRUNC|3)){
435 err(m, "bad openfd mode");
436 continue;
438 m->isopenfd = 1;
439 m->tx.type = Topen;
440 m->tpkt[4] = Topen;
441 /* fall through */
442 case Tcreate:
443 case Topen:
444 case Tclunk:
445 case Tread:
446 case Twrite:
447 case Tremove:
448 case Tstat:
449 case Twstat:
450 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
451 err(m, "unknown fid");
452 continue;
454 m->fid->ref++;
455 break;
458 /* have everything - translate and send */
459 m->c = c;
460 m->ctag = m->tx.tag;
461 m->tx.tag = m->tag;
462 if(m->fid)
463 m->tx.fid = m->fid->fid;
464 if(m->newfid)
465 m->tx.newfid = m->newfid->fid;
466 if(m->afid)
467 m->tx.afid = m->afid->fid;
468 if(m->oldm)
469 m->tx.oldtag = m->oldm->tag;
470 /* reference passes to outq */
471 sendq(outq, m);
472 while(c->nmsg >= MAXMSG){
473 c->inputstalled = 1;
474 recvp(c->inc);
478 if(verbose) fprint(2, "%T fd#%d eof; flushing conn\n", c->fd);
480 /* flush the output queue */
481 sendq(c->outq, nil);
482 while(c->outq != nil)
483 yield();
485 /* flush all outstanding messages */
486 for(i=0; i<NHASH; i++){
487 for(h=c->tag[i]; h; h=hnext){
488 om = h->v;
489 m = msgnew(0);
490 m->internal = 1;
491 m->c = c;
492 c->nmsg++;
493 m->tx.type = Tflush;
494 m->tx.tag = m->tag;
495 m->tx.oldtag = om->tag;
496 m->oldm = om;
497 msgincref(om);
498 msgincref(m); /* for outq */
499 sendomsg(m);
500 mm = recvp(c->internal);
501 assert(mm == m);
502 msgput(m); /* got from recvp */
503 msgput(m); /* got from msgnew */
504 msgput(om); /* got from hash table */
505 hnext = h->next;
506 free(h);
510 /* clunk all outstanding fids */
511 for(i=0; i<NHASH; i++){
512 for(h=c->fid[i]; h; h=hnext){
513 f = h->v;
514 m = msgnew(0);
515 m->internal = 1;
516 m->c = c;
517 c->nmsg++;
518 m->tx.type = Tclunk;
519 m->tx.tag = m->tag;
520 m->tx.fid = f->fid;
521 m->fid = f;
522 f->ref++;
523 msgincref(m);
524 sendomsg(m);
525 mm = recvp(c->internal);
526 assert(mm == m);
527 msgclear(m);
528 msgput(m); /* got from recvp */
529 msgput(m); /* got from msgnew */
530 fidput(f); /* got from hash table */
531 hnext = h->next;
532 free(h);
536 out:
537 closeioproc(io);
538 assert(c->nmsg == 0);
539 assert(c->nfid == 0);
540 close(c->fd);
541 chanfree(c->internal);
542 c->internal = 0;
543 chanfree(c->inc);
544 c->inc = 0;
545 free(c->inq);
546 c->inq = 0;
547 free(c);
550 static void
551 openfdthread(void *v)
553 Conn *c;
554 Fid *fid;
555 Msg *m;
556 int n;
557 vlong tot;
558 Ioproc *io;
559 char buf[1024];
561 c = v;
562 fid = c->fdfid;
563 io = ioproc();
564 threadsetname("openfd %s", c->fdfid);
565 tot = 0;
566 m = nil;
567 if(c->fdmode == OREAD){
568 for(;;){
569 if(verbose) fprint(2, "%T tread...");
570 m = msgnew(0);
571 m->internal = 1;
572 m->c = c;
573 m->tx.type = Tread;
574 m->tx.count = msize - IOHDRSZ;
575 m->tx.fid = fid->fid;
576 m->tx.tag = m->tag;
577 m->tx.offset = tot;
578 m->fid = fid;
579 fid->ref++;
580 msgincref(m);
581 sendomsg(m);
582 recvp(c->internal);
583 if(m->rx.type == Rerror){
584 // fprint(2, "%T read error: %s\n", m->rx.ename);
585 break;
587 if(m->rx.count == 0)
588 break;
589 tot += m->rx.count;
590 if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count){
591 // fprint(2, "%T pipe write error: %r\n");
592 break;
594 msgput(m);
595 msgput(m);
596 m = nil;
598 }else{
599 for(;;){
600 if(verbose) fprint(2, "%T twrite...");
601 n = sizeof buf;
602 if(n > msize)
603 n = msize;
604 if((n=ioread(io, c->fd, buf, n)) <= 0){
605 if(n < 0)
606 fprint(2, "%T pipe read error: %r\n");
607 break;
609 m = msgnew(0);
610 m->internal = 1;
611 m->c = c;
612 m->tx.type = Twrite;
613 m->tx.fid = fid->fid;
614 m->tx.data = buf;
615 m->tx.count = n;
616 m->tx.tag = m->tag;
617 m->tx.offset = tot;
618 m->fid = fid;
619 fid->ref++;
620 msgincref(m);
621 sendomsg(m);
622 recvp(c->internal);
623 if(m->rx.type == Rerror){
624 // fprint(2, "%T write error: %s\n", m->rx.ename);
626 tot += n;
627 msgput(m);
628 msgput(m);
629 m = nil;
632 if(verbose) fprint(2, "%T eof on %d fid %d\n", c->fd, fid->fid);
633 close(c->fd);
634 closeioproc(io);
635 if(m){
636 msgput(m);
637 msgput(m);
639 if(verbose) fprint(2, "%T eof on %d fid %d ref %d\n", c->fd, fid->fid, fid->ref);
640 if(--fid->openfd == 0){
641 m = msgnew(0);
642 m->internal = 1;
643 m->c = c;
644 m->tx.type = Tclunk;
645 m->tx.tag = m->tag;
646 m->tx.fid = fid->fid;
647 m->fid = fid;
648 fid->ref++;
649 msgincref(m);
650 sendomsg(m);
651 recvp(c->internal);
652 msgput(m);
653 msgput(m);
655 fidput(fid);
656 c->fdfid = nil;
657 chanfree(c->internal);
658 c->internal = 0;
659 free(c);
662 int
663 xopenfd(Msg *m)
665 char errs[ERRMAX];
666 int n, p[2];
667 Conn *nc;
669 if(pipe(p) < 0){
670 rerrstr(errs, sizeof errs);
671 err(m, errs);
672 /* XXX return here? */
674 if(verbose) fprint(2, "%T xopen pipe %d %d...", p[0], p[1]);
676 /* now we're committed. */
678 /* a new connection for this fid */
679 nc = emalloc(sizeof(Conn));
680 nc->internal = chancreate(sizeof(void*), 0);
682 /* a ref for us */
683 nc->fdfid = m->fid;
684 m->fid->ref++;
685 nc->fdfid->openfd++;
686 nc->fdmode = m->tx.mode;
687 nc->fd = p[0];
689 /* a thread to tend the pipe */
690 threadcreate(openfdthread, nc, STACK);
692 /* if mode is ORDWR, that openfdthread will write; start a reader */
693 if((m->tx.mode&3) == ORDWR){
694 nc = emalloc(sizeof(Conn));
695 nc->internal = chancreate(sizeof(void*), 0);
696 nc->fdfid = m->fid;
697 m->fid->ref++;
698 nc->fdfid->openfd++;
699 nc->fdmode = OREAD;
700 nc->fd = dup(p[0], -1);
701 threadcreate(openfdthread, nc, STACK);
704 /* steal fid from other connection */
705 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
706 fidput(m->fid);
708 /* rewrite as Ropenfd */
709 m->rx.type = Ropenfd;
710 n = GBIT32(m->rpkt);
711 m->rpkt = erealloc(m->rpkt, n+4);
712 PBIT32(m->rpkt+n, p[1]);
713 n += 4;
714 PBIT32(m->rpkt, n);
715 m->rpkt[4] = Ropenfd;
716 m->rx.unixfd = p[1];
717 return 0;
720 void
721 connoutthread(void *arg)
723 int err;
724 Conn *c;
725 Queue *outq;
726 Msg *m, *om;
727 Ioproc *io;
729 c = arg;
730 outq = c->outq;
731 io = ioproc();
732 threadsetname("connout %s", c->dir);
733 while((m = recvq(outq)) != nil){
734 err = m->tx.type+1 != m->rx.type;
735 if(!err && m->isopenfd)
736 if(xopenfd(m) < 0)
737 continue;
738 switch(m->tx.type){
739 case Tflush:
740 om = m->oldm;
741 if(om)
742 if(delhash(om->c->tag, om->ctag, om) == 0)
743 msgput(om);
744 break;
745 case Tclunk:
746 case Tremove:
747 if(m->fid)
748 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
749 fidput(m->fid);
750 break;
751 case Tauth:
752 if(err && m->afid){
753 if(verbose) fprint(2, "%T auth error\n");
754 if(delhash(m->c->fid, m->afid->cfid, m->afid) == 0)
755 fidput(m->afid);
757 break;
758 case Tattach:
759 if(err && m->fid)
760 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
761 fidput(m->fid);
762 break;
763 case Twalk:
764 if(err || m->rx.nwqid < m->tx.nwname)
765 if(m->tx.fid != m->tx.newfid && m->newfid)
766 if(delhash(m->c->fid, m->newfid->cfid, m->newfid) == 0)
767 fidput(m->newfid);
768 break;
770 if(delhash(m->c->tag, m->ctag, m) == 0)
771 msgput(m);
772 if(verbose > 1) fprint(2, "%T fd#%d <- %F\n", c->fd, &m->rx);
773 rewritehdr(&m->rx, m->rpkt);
774 if(mwrite9p(io, c->fd, m->rpkt) < 0)
775 if(verbose) fprint(2, "%T write error: %r\n");
776 msgput(m);
777 if(c->inputstalled && c->nmsg < MAXMSG)
778 nbsendp(c->inc, 0);
780 closeioproc(io);
781 free(outq);
782 c->outq = nil;
785 void
786 outputthread(void *arg)
788 Msg *m;
789 Ioproc *io;
791 USED(arg);
792 io = ioproc();
793 threadsetname("output");
794 while((m = recvq(outq)) != nil){
795 if(verbose > 1) fprint(2, "%T * <- %F\n", &m->tx);
796 rewritehdr(&m->tx, m->tpkt);
797 if(mwrite9p(io, 1, m->tpkt) < 0)
798 sysfatal("output error: %r");
799 msgput(m);
801 closeioproc(io);
802 fprint(2, "%T output eof\n");
803 threadexitsall(0);
806 void
807 inputthread(void *arg)
809 uchar *pkt;
810 int n, nn, tag;
811 Msg *m;
812 Ioproc *io;
814 threadsetname("input");
815 if(verbose) fprint(2, "%T input thread\n");
816 io = ioproc();
817 USED(arg);
818 while((pkt = read9ppkt(io, 0)) != nil){
819 n = GBIT32(pkt);
820 if(n < 7){
821 fprint(2, "%T short 9P packet from server\n");
822 free(pkt);
823 continue;
825 if(verbose > 2) fprint(2, "%T read %.*H\n", n, pkt);
826 tag = GBIT16(pkt+5);
827 if((m = msgget(tag)) == nil){
828 fprint(2, "%T unexpected 9P response tag %d\n", tag);
829 free(pkt);
830 continue;
832 if((nn = convM2S(pkt, n, &m->rx)) != n){
833 fprint(2, "%T bad packet - convM2S %d but %d\n", nn, n);
834 free(pkt);
835 msgput(m);
836 continue;
838 if(verbose > 1) fprint(2, "%T * -> %F%s\n", &m->rx,
839 m->internal ? " (internal)" : "");
840 m->rpkt = pkt;
841 m->rx.tag = m->ctag;
842 if(m->internal)
843 sendp(m->c->internal, m);
844 else if(m->c->outq)
845 sendq(m->c->outq, m);
846 else
847 msgput(m);
849 closeioproc(io);
850 //fprint(2, "%T input eof\n");
851 threadexitsall(0);
854 void*
855 gethash(Hash **ht, uint n)
857 Hash *h;
859 for(h=ht[n%NHASH]; h; h=h->next)
860 if(h->n == n)
861 return h->v;
862 return nil;
865 int
866 delhash(Hash **ht, uint n, void *v)
868 Hash *h, **l;
870 for(l=&ht[n%NHASH]; h=*l; l=&h->next)
871 if(h->n == n){
872 if(h->v != v){
873 if(verbose) fprint(2, "%T delhash %d got %p want %p\n", n, h->v, v);
874 return -1;
876 *l = h->next;
877 free(h);
878 return 0;
880 return -1;
883 int
884 puthash(Hash **ht, uint n, void *v)
886 Hash *h;
888 if(gethash(ht, n))
889 return -1;
890 h = emalloc(sizeof(Hash));
891 h->next = ht[n%NHASH];
892 h->n = n;
893 h->v = v;
894 ht[n%NHASH] = h;
895 return 0;
898 Fid **fidtab;
899 int nfidtab;
900 Fid *freefid;
902 Fid*
903 fidnew(int cfid)
905 Fid *f;
907 if(freefid == nil){
908 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
909 if(nfidtab == xafid){
910 fidtab[nfidtab++] = nil;
911 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
913 fidtab[nfidtab] = emalloc(sizeof(Fid));
914 freefid = fidtab[nfidtab];
915 freefid->fid = nfidtab++;
917 f = freefid;
918 freefid = f->next;
919 f->cfid = cfid;
920 f->ref = 1;
921 return f;
924 void
925 fidput(Fid *f)
927 if(f == nil)
928 return;
929 assert(f->ref > 0);
930 if(--f->ref > 0)
931 return;
932 f->next = freefid;
933 f->cfid = -1;
934 freefid = f;
937 Msg **msgtab;
938 int nmsgtab;
939 Msg *freemsg;
941 void
942 msgincref(Msg *m)
944 if(verbose > 1) fprint(2, "%T msgincref @0x%lux %p tag %d/%d ref %d=>%d\n",
945 getcallerpc(&m), m, m->tag, m->ctag, m->ref, m->ref+1);
946 m->ref++;
949 Msg*
950 msgnew(int x)
952 Msg *m;
954 if(freemsg == nil){
955 msgtab = erealloc(msgtab, (nmsgtab+1)*sizeof(msgtab[0]));
956 msgtab[nmsgtab] = emalloc(sizeof(Msg));
957 freemsg = msgtab[nmsgtab];
958 freemsg->tag = nmsgtab++;
960 m = freemsg;
961 freemsg = m->next;
962 m->ref = 1;
963 if(verbose > 1) fprint(2, "%T msgnew @0x%lux %p tag %d ref %d\n",
964 getcallerpc(&x), m, m->tag, m->ref);
965 return m;
968 /*
969 * Clear data associated with connections, so that
970 * if all msgs have been msgcleared, the connection
971 * can be freed. Note that this does *not* free the tpkt
972 * and rpkt; they are freed in msgput with the msg itself.
973 * The io write thread might still be holding a ref to msg
974 * even once the connection has finished with it.
975 */
976 void
977 msgclear(Msg *m)
979 if(m->c){
980 m->c->nmsg--;
981 m->c = nil;
983 if(m->oldm){
984 msgput(m->oldm);
985 m->oldm = nil;
987 if(m->fid){
988 fidput(m->fid);
989 m->fid = nil;
991 if(m->afid){
992 fidput(m->afid);
993 m->afid = nil;
995 if(m->newfid){
996 fidput(m->newfid);
997 m->newfid = nil;
999 if(m->rx.type == Ropenfd && m->rx.unixfd >= 0){
1000 close(m->rx.unixfd);
1001 m->rx.unixfd = -1;
1005 void
1006 msgput(Msg *m)
1008 if(m == nil)
1009 return;
1011 if(verbose > 1) fprint(2, "%T msgput 0x%lux %p tag %d/%d ref %d\n",
1012 getcallerpc(&m), m, m->tag, m->ctag, m->ref);
1013 assert(m->ref > 0);
1014 if(--m->ref > 0)
1015 return;
1016 msgclear(m);
1017 if(m->tpkt){
1018 free(m->tpkt);
1019 m->tpkt = nil;
1021 if(m->rpkt){
1022 free(m->rpkt);
1023 m->rpkt = nil;
1025 m->isopenfd = 0;
1026 m->internal = 0;
1027 m->next = freemsg;
1028 freemsg = m;
1031 Msg*
1032 msgget(int n)
1034 Msg *m;
1036 if(n < 0 || n >= nmsgtab)
1037 return nil;
1038 m = msgtab[n];
1039 if(m->ref == 0)
1040 return nil;
1041 if(verbose) fprint(2, "%T msgget %d = %p\n", n, m);
1042 msgincref(m);
1043 return m;
1047 void*
1048 emalloc(int n)
1050 void *v;
1052 v = mallocz(n, 1);
1053 if(v == nil){
1054 abort();
1055 sysfatal("out of memory allocating %d", n);
1057 return v;
1060 void*
1061 erealloc(void *v, int n)
1063 v = realloc(v, n);
1064 if(v == nil){
1065 abort();
1066 sysfatal("out of memory reallocating %d", n);
1068 return v;
/* One element of a Queue. */
typedef struct Qel Qel;
struct Qel
{
	Qel *next;	/* next element toward the tail */
	void *p;	/* queued pointer */
};
1078 struct Queue
1080 int hungup;
1081 QLock lk;
1082 Rendez r;
1083 Qel *head;
1084 Qel *tail;
1087 Queue*
1088 qalloc(void)
1090 Queue *q;
1092 q = mallocz(sizeof(Queue), 1);
1093 if(q == nil)
1094 return nil;
1095 q->r.l = &q->lk;
1096 return q;
1099 int
1100 sendq(Queue *q, void *p)
1102 Qel *e;
1104 e = emalloc(sizeof(Qel));
1105 qlock(&q->lk);
1106 if(q->hungup){
1107 werrstr("hungup queue");
1108 qunlock(&q->lk);
1109 return -1;
1111 e->p = p;
1112 e->next = nil;
1113 if(q->head == nil)
1114 q->head = e;
1115 else
1116 q->tail->next = e;
1117 q->tail = e;
1118 rwakeup(&q->r);
1119 qunlock(&q->lk);
1120 return 0;
1123 void*
1124 recvq(Queue *q)
1126 void *p;
1127 Qel *e;
1129 qlock(&q->lk);
1130 while(q->head == nil && !q->hungup)
1131 rsleep(&q->r);
1132 if(q->hungup){
1133 qunlock(&q->lk);
1134 return nil;
1136 e = q->head;
1137 q->head = e->next;
1138 qunlock(&q->lk);
1139 p = e->p;
1140 free(e);
1141 return p;
1144 uchar*
1145 read9ppkt(Ioproc *io, int fd)
1147 uchar buf[4], *pkt;
1148 int n, nn;
1150 n = ioreadn(io, fd, buf, 4);
1151 if(n != 4)
1152 return nil;
1153 n = GBIT32(buf);
1154 pkt = emalloc(n);
1155 PBIT32(pkt, n);
1156 nn = ioreadn(io, fd, pkt+4, n-4);
1157 if(nn != n-4){
1158 free(pkt);
1159 return nil;
1161 /* would do this if we ever got one of these, but we only generate them
1162 if(pkt[4] == Ropenfd){
1163 newfd = iorecvfd(io, fd);
1164 PBIT32(pkt+n-4, newfd);
1167 return pkt;
1170 Msg*
1171 mread9p(Ioproc *io, int fd)
1173 int n, nn;
1174 uchar *pkt;
1175 Msg *m;
1177 if((pkt = read9ppkt(io, fd)) == nil)
1178 return nil;
1180 m = msgnew(0);
1181 m->tpkt = pkt;
1182 n = GBIT32(pkt);
1183 nn = convM2S(pkt, n, &m->tx);
1184 if(nn != n){
1185 fprint(2, "%T read bad packet from %d\n", fd);
1186 return nil;
1188 return m;
1191 int
1192 mwrite9p(Ioproc *io, int fd, uchar *pkt)
1194 int n, nfd;
1196 n = GBIT32(pkt);
1197 if(verbose > 2) fprint(2, "%T write %d %d %.*H\n", fd, n, n, pkt);
1198 if(verbose > 1) fprint(2, "%T before iowrite\n");
1199 if(iowrite(io, fd, pkt, n) != n){
1200 fprint(2, "%T write error: %r\n");
1201 return -1;
1203 if(verbose > 1) fprint(2, "%T after iowrite\n");
1204 if(pkt[4] == Ropenfd){
1205 nfd = GBIT32(pkt+n-4);
1206 if(iosendfd(io, fd, nfd) < 0){
1207 fprint(2, "%T send fd error: %r\n");
1208 return -1;
1211 return 0;
1214 void
1215 restring(uchar *pkt, int pn, char *s)
1217 int n;
1219 if(s < (char*)pkt || s >= (char*)pkt+pn)
1220 return;
1222 n = strlen(s);
1223 memmove(s+1, s, n);
1224 PBIT16((uchar*)s-1, n);
1227 void
1228 repack(Fcall *f, uchar **ppkt)
1230 uint n, nn;
1231 uchar *pkt;
1233 pkt = *ppkt;
1234 n = GBIT32(pkt);
1235 nn = sizeS2M(f);
1236 if(nn > n){
1237 free(pkt);
1238 pkt = emalloc(nn);
1239 *ppkt = pkt;
1241 convS2M(f, pkt, nn);
1244 void
1245 rewritehdr(Fcall *f, uchar *pkt)
1247 int i, n;
1249 n = GBIT32(pkt);
1250 PBIT16(pkt+5, f->tag);
1251 switch(f->type){
1252 case Tversion:
1253 case Rversion:
1254 restring(pkt, n, f->version);
1255 break;
1256 case Tauth:
1257 PBIT32(pkt+7, f->afid);
1258 restring(pkt, n, f->uname);
1259 restring(pkt, n, f->aname);
1260 break;
1261 case Tflush:
1262 PBIT16(pkt+7, f->oldtag);
1263 break;
1264 case Tattach:
1265 restring(pkt, n, f->uname);
1266 restring(pkt, n, f->aname);
1267 PBIT32(pkt+7, f->fid);
1268 PBIT32(pkt+11, f->afid);
1269 break;
1270 case Twalk:
1271 PBIT32(pkt+7, f->fid);
1272 PBIT32(pkt+11, f->newfid);
1273 for(i=0; i<f->nwname; i++)
1274 restring(pkt, n, f->wname[i]);
1275 break;
1276 case Tcreate:
1277 restring(pkt, n, f->name);
1278 /* fall through */
1279 case Topen:
1280 case Tread:
1281 case Twrite:
1282 case Tclunk:
1283 case Tremove:
1284 case Tstat:
1285 case Twstat:
1286 PBIT32(pkt+7, f->fid);
1287 break;
1288 case Rerror:
1289 restring(pkt, n, f->ename);
1290 break;
/*
 * iocall worker for iolisten: unpack the two string
 * arguments and call listen.
 */
static long
_iolisten(va_list *arg)
{
	char *a, *b;

	a = va_arg(*arg, char*);
	b = va_arg(*arg, char*);
	return listen(a, b);
}
1304 int
1305 iolisten(Ioproc *io, char *a, char *b)
1307 return iocall(io, _iolisten, a, b);
/*
 * iocall worker for ioaccept: unpack the descriptor and
 * directory arguments and call accept.
 */
static long
_ioaccept(va_list *arg)
{
	int fd;
	char *dir;

	fd = va_arg(*arg, int);
	dir = va_arg(*arg, char*);
	return accept(fd, dir);
}
1321 int
1322 ioaccept(Ioproc *io, int fd, char *dir)
1324 return iocall(io, _ioaccept, fd, dir);
1327 int
1328 timefmt(Fmt *fmt)
1330 static char *mon[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
1331 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
1332 vlong ns;
1333 Tm tm;
1334 ns = nsec();
1335 tm = *localtime(time(0));
1336 return fmtprint(fmt, "%s %2d %02d:%02d:%02d.%03d",
1337 mon[tm.mon], tm.mday, tm.hour, tm.min, tm.sec,
1338 (int)(ns%1000000000)/1000000);