Blob
1 #include <u.h>2 #include <libc.h>3 #include <venti.h>4 #include <thread.h>5 #include "queue.h"7 enum8 {9 STACK = 8192,10 };12 typedef struct VtSconn VtSconn;13 struct VtSconn14 {15 int ctl;16 char dir[NETPATHLEN];17 VtSrv *srv;18 VtConn *c;19 };21 struct VtSrv22 {23 int afd;24 int dead;25 char adir[NETPATHLEN];26 Queue *q; /* Queue(VtReq*) */27 };29 static void listenproc(void*);30 static void connproc(void*);32 VtSrv*33 vtlisten(char *addr)34 {35 VtSrv *s;37 s = vtmallocz(sizeof(VtSrv));38 s->afd = announce(addr, s->adir);39 if(s->afd < 0){40 free(s);41 return nil;42 }43 s->q = _vtqalloc();44 proccreate(listenproc, s, STACK);45 return s;46 }48 static void49 listenproc(void *v)50 {51 int ctl;52 char dir[NETPATHLEN];53 VtSrv *srv;54 VtSconn *sc;56 srv = v;57 for(;;){58 ctl = listen(srv->adir, dir);59 if(ctl < 0){60 srv->dead = 1;61 break;62 }63 sc = vtmallocz(sizeof(VtSconn));64 sc->ctl = ctl;65 sc->srv = srv;66 strcpy(sc->dir, dir);67 proccreate(connproc, sc, STACK);68 }70 // hangup71 }73 static void74 connproc(void *v)75 {76 VtSconn *sc;77 VtConn *c;78 Packet *p;79 VtReq *r;80 int fd;82 r = nil;83 c = nil;84 sc = v;85 fprint(2, "new call %s on %d\n", sc->dir, sc->ctl);86 fd = accept(sc->ctl, sc->dir);87 close(sc->ctl);88 if(fd < 0){89 fprint(2, "accept %s: %r\n", sc->dir);90 goto out;91 }93 c = vtconn(fd, fd);94 sc->c = c;95 if(vtversion(c) < 0){96 fprint(2, "vtversion %s: %r\n", sc->dir);97 goto out;98 }99 if(vtsrvhello(c) < 0){100 fprint(2, "vtsrvhello %s: %r\n", sc->dir);101 goto out;102 }104 fprint(2, "new proc %s\n", sc->dir);105 proccreate(vtsendproc, c, STACK);106 qlock(&c->lk);107 while(!c->writeq)108 rsleep(&c->rpcfork);109 qunlock(&c->lk);111 while((p = vtrecv(c)) != nil){112 r = vtmallocz(sizeof(VtReq));113 if(vtfcallunpack(&r->tx, p) < 0){114 packetfree(p);115 fprint(2, "bad packet on %s: %r\n", sc->dir);116 continue;117 }118 packetfree(p);119 if(r->tx.type == VtTgoodbye)120 break;121 r->rx.tag = r->tx.tag;122 r->sc = sc;123 if(_vtqsend(sc->srv->q, r) < 0){124 fprint(2, "hungup queue\n");125 break;126 }127 r = nil;128 }130 fprint(2, "eof on %s\n", sc->dir);132 out:133 if(r){134 vtfcallclear(&r->tx);135 vtfree(r);136 }137 if(c)138 vtfreeconn(c);139 fprint(2, "freed %s\n", sc->dir);140 vtfree(sc);141 return;142 }144 VtReq*145 vtgetreq(VtSrv *srv)146 {147 return _vtqrecv(srv->q);148 }150 void151 vtrespond(VtReq *r)152 {153 Packet *p;154 VtSconn *sc;156 sc = r->sc;157 if(r->rx.tag != r->tx.tag)158 abort();159 if(r->rx.type != r->tx.type+1 && r->rx.type != VtRerror)160 abort();161 if((p = vtfcallpack(&r->rx)) == nil){162 fprint(2, "fcallpack on %s: %r\n", sc->dir);163 packetfree(p);164 vtfcallclear(&r->rx);165 return;166 }167 vtsend(sc->c, p);168 vtfcallclear(&r->tx);169 vtfcallclear(&r->rx);170 vtfree(r);171 }