Blob
1 #include <u.h>2 #include <libc.h>3 #include <venti.h>4 #include <thread.h>5 #include "queue.h"7 enum8 {9 STACK = 8192,10 };12 typedef struct VtSconn VtSconn;13 struct VtSconn14 {15 int ctl;16 int ref;17 QLock lk;18 char dir[NETPATHLEN];19 VtSrv *srv;20 VtConn *c;21 };23 struct VtSrv24 {25 int afd;26 int dead;27 char adir[NETPATHLEN];28 Queue *q; /* Queue(VtReq*) */29 };31 static void listenproc(void*);32 static void connproc(void*);34 static void35 scincref(VtSconn *sc)36 {37 qlock(&sc->lk);38 sc->ref++;39 qunlock(&sc->lk);40 }42 static void43 scdecref(VtSconn *sc)44 {45 qlock(&sc->lk);46 if(--sc->ref > 0){47 qunlock(&sc->lk);48 return;49 }50 if(sc->c)51 vtfreeconn(sc->c);52 vtfree(sc);53 }55 VtSrv*56 vtlisten(char *addr)57 {58 VtSrv *s;60 s = vtmallocz(sizeof(VtSrv));61 s->afd = announce(addr, s->adir);62 if(s->afd < 0){63 free(s);64 return nil;65 }66 s->q = _vtqalloc();67 proccreate(listenproc, s, STACK);68 return s;69 }71 static void72 listenproc(void *v)73 {74 int ctl;75 char dir[NETPATHLEN];76 VtSrv *srv;77 VtSconn *sc;79 srv = v;80 for(;;){81 ctl = listen(srv->adir, dir);82 if(ctl < 0){83 srv->dead = 1;84 break;85 }86 sc = vtmallocz(sizeof(VtSconn));87 sc->ref = 1;88 sc->ctl = ctl;89 sc->srv = srv;90 strcpy(sc->dir, dir);91 proccreate(connproc, sc, STACK);92 }94 // hangup95 }97 static void98 connproc(void *v)99 {100 VtSconn *sc;101 VtConn *c;102 Packet *p;103 VtReq *r;104 int fd;105 static int first=1;107 if(first && chattyventi){108 first=0;109 fmtinstall('F', vtfcallfmt);110 }111 r = nil;112 sc = v;113 sc->c = nil;114 if(0) fprint(2, "new call %s on %d\n", sc->dir, sc->ctl);115 fd = accept(sc->ctl, sc->dir);116 close(sc->ctl);117 if(fd < 0){118 fprint(2, "accept %s: %r\n", sc->dir);119 goto out;120 }122 c = vtconn(fd, fd);123 sc->c = c;124 if(vtversion(c) < 0){125 fprint(2, "vtversion %s: %r\n", sc->dir);126 goto out;127 }128 if(vtsrvhello(c) < 0){129 fprint(2, "vtsrvhello %s: %r\n", sc->dir);130 goto out;131 }133 if(0) fprint(2, "new proc %s\n", sc->dir);134 proccreate(vtsendproc, c, STACK);135 qlock(&c->lk);136 while(!c->writeq)137 rsleep(&c->rpcfork);138 qunlock(&c->lk);140 while((p = vtrecv(c)) != nil){141 r = vtmallocz(sizeof(VtReq));142 if(vtfcallunpack(&r->tx, p) < 0){143 packetfree(p);144 fprint(2, "bad packet on %s: %r\n", sc->dir);145 continue;146 }147 if(chattyventi)148 fprint(2, "%s <- %F\n", argv0, &r->tx);149 packetfree(p);150 if(r->tx.type == VtTgoodbye)151 break;152 r->rx.tag = r->tx.tag;153 r->sc = sc;154 scincref(sc);155 if(_vtqsend(sc->srv->q, r) < 0){156 scdecref(sc);157 fprint(2, "hungup queue\n");158 break;159 }160 r = nil;161 }163 if(0) fprint(2, "eof on %s\n", sc->dir);165 out:166 if(r){167 vtfcallclear(&r->tx);168 vtfree(r);169 }170 if(0) fprint(2, "freed %s\n", sc->dir);171 scdecref(sc);172 return;173 }175 VtReq*176 vtgetreq(VtSrv *srv)177 {178 return _vtqrecv(srv->q);179 }181 void182 vtrespond(VtReq *r)183 {184 Packet *p;185 VtSconn *sc;187 sc = r->sc;188 if(r->rx.tag != r->tx.tag)189 abort();190 if(r->rx.type != r->tx.type+1 && r->rx.type != VtRerror)191 abort();192 if(chattyventi)193 fprint(2, "%s -> %F\n", argv0, &r->rx);194 if((p = vtfcallpack(&r->rx)) == nil){195 fprint(2, "fcallpack on %s: %r\n", sc->dir);196 packetfree(p);197 vtfcallclear(&r->rx);198 return;199 }200 vtsend(sc->c, p);201 scdecref(sc);202 vtfcallclear(&r->tx);203 vtfcallclear(&r->rx);204 vtfree(r);205 }