Blob
1 #include <u.h>2 #include <libc.h>3 #include <venti.h>4 #include <thread.h>5 #include "queue.h"7 enum8 {9 STACK = 8192,10 };12 typedef struct VtSconn VtSconn;13 struct VtSconn14 {15 int ctl;16 char dir[NETPATHLEN];17 VtSrv *srv;18 VtConn *c;19 };21 struct VtSrv22 {23 int afd;24 int dead;25 char adir[NETPATHLEN];26 Queue *q; /* Queue(VtReq*) */27 };29 static void listenproc(void*);30 static void connproc(void*);32 VtSrv*33 vtlisten(char *addr)34 {35 VtSrv *s;37 s = vtmallocz(sizeof(VtSrv));38 s->afd = announce(addr, s->adir);39 if(s->afd < 0){40 free(s);41 return nil;42 }43 s->q = _vtqalloc();44 proccreate(listenproc, s, STACK);45 return s;46 }48 static void49 listenproc(void *v)50 {51 int ctl;52 char dir[NETPATHLEN];53 VtSrv *srv;54 VtSconn *sc;56 srv = v;57 for(;;){58 fprint(2, "listen for venti\n");59 ctl = listen(srv->adir, dir);60 if(ctl < 0){61 srv->dead = 1;62 break;63 }64 fprint(2, "got one\n");65 sc = vtmallocz(sizeof(VtSconn));66 sc->ctl = ctl;67 sc->srv = srv;68 strcpy(sc->dir, dir);69 proccreate(connproc, sc, STACK);70 }72 // hangup73 }75 static void76 connproc(void *v)77 {78 VtSconn *sc;79 VtConn *c;80 Packet *p;81 VtReq *r;82 int fd;84 r = nil;85 c = nil;86 sc = v;87 fprint(2, "new call %s on %d\n", sc->dir, sc->ctl);88 fd = accept(sc->ctl, sc->dir);89 close(sc->ctl);90 if(fd < 0){91 fprint(2, "accept %s: %r\n", sc->dir);92 goto out;93 }95 c = vtconn(fd, fd);96 sc->c = c;97 if(vtversion(c) < 0){98 fprint(2, "vtversion %s: %r\n", sc->dir);99 goto out;100 }101 if(vtsrvhello(c) < 0){102 fprint(2, "vtsrvhello %s: %r\n", sc->dir);103 goto out;104 }106 fprint(2, "new proc %s\n", sc->dir);107 proccreate(vtsendproc, c, STACK);108 qlock(&c->lk);109 while(!c->writeq)110 rsleep(&c->rpcfork);111 qunlock(&c->lk);113 while((p = vtrecv(c)) != nil){114 r = vtmallocz(sizeof(VtReq));115 if(vtfcallunpack(&r->tx, p) < 0){116 packetfree(p);117 fprint(2, "bad packet on %s: %r\n", sc->dir);118 continue;119 }120 packetfree(p);121 if(r->tx.type == VtTgoodbye)122 break;123 r->rx.tag = r->tx.tag;124 r->sc = sc;125 if(_vtqsend(sc->srv->q, r) < 0){126 fprint(2, "hungup queue\n");127 break;128 }129 r = nil;130 }132 fprint(2, "eof on %s\n", sc->dir);134 out:135 if(r){136 vtfcallclear(&r->tx);137 vtfree(r);138 }139 if(c)140 vtfreeconn(c);141 fprint(2, "freed %s\n", sc->dir);142 vtfree(sc);143 return;144 }146 VtReq*147 vtgetreq(VtSrv *srv)148 {149 return _vtqrecv(srv->q);150 }152 void153 vtrespond(VtReq *r)154 {155 Packet *p;156 VtSconn *sc;158 sc = r->sc;159 if(r->rx.tag != r->tx.tag)160 abort();161 if(r->rx.type != r->tx.type+1 && r->rx.type != VtRerror)162 abort();163 if((p = vtfcallpack(&r->rx)) == nil){164 fprint(2, "fcallpack on %s: %r\n", sc->dir);165 packetfree(p);166 vtfcallclear(&r->rx);167 return;168 }169 vtsend(sc->c, p);170 vtfcallclear(&r->tx);171 vtfcallclear(&r->rx);172 vtfree(r);173 }