Blob


1 #include <u.h>
2 #include <libc.h>
3 #include <venti.h>
4 #include <thread.h>
5 #include "queue.h"
/* Stack size passed to proccreate for every proc started in this file. */
enum { STACK = 8192 };
12 typedef struct VtSconn VtSconn;
13 struct VtSconn
14 {
15 int ctl;
16 int ref;
17 QLock lk;
18 char dir[NETPATHLEN];
19 VtSrv *srv;
20 VtConn *c;
21 };
23 struct VtSrv
24 {
25 int afd;
26 int dead;
27 char adir[NETPATHLEN];
28 Queue *q; /* Queue(VtReq*) */
29 };
/* Proc entry points defined below; each receives its state through v. */
static void listenproc(void *v);
static void connproc(void *v);
34 static void
35 scincref(VtSconn *sc)
36 {
37 qlock(&sc->lk);
38 sc->ref++;
39 qunlock(&sc->lk);
40 }
42 static void
43 scdecref(VtSconn *sc)
44 {
45 qlock(&sc->lk);
46 if(--sc->ref > 0){
47 qunlock(&sc->lk);
48 return;
49 }
50 if(sc->c)
51 vtfreeconn(sc->c);
52 vtfree(sc);
53 }
55 VtSrv*
56 vtlisten(char *addr)
57 {
58 VtSrv *s;
60 s = vtmallocz(sizeof(VtSrv));
61 s->afd = announce(addr, s->adir);
62 if(s->afd < 0){
63 free(s);
64 return nil;
65 }
66 s->q = _vtqalloc();
67 proccreate(listenproc, s, STACK);
68 return s;
69 }
71 static void
72 listenproc(void *v)
73 {
74 int ctl;
75 char dir[NETPATHLEN];
76 VtSrv *srv;
77 VtSconn *sc;
79 srv = v;
80 for(;;){
81 ctl = listen(srv->adir, dir);
82 if(ctl < 0){
83 srv->dead = 1;
84 break;
85 }
86 sc = vtmallocz(sizeof(VtSconn));
87 sc->ref = 1;
88 sc->ctl = ctl;
89 sc->srv = srv;
90 strcpy(sc->dir, dir);
91 proccreate(connproc, sc, STACK);
92 }
94 // hangup
95 }
97 static void
98 connproc(void *v)
99 {
100 VtSconn *sc;
101 VtConn *c;
102 Packet *p;
103 VtReq *r;
104 int fd;
105 static int first=1;
107 if(first){
108 first=0;
109 fmtinstall('F', vtfcallfmt);
111 r = nil;
112 sc = v;
113 sc->c = nil;
114 fprint(2, "new call %s on %d\n", sc->dir, sc->ctl);
115 fd = accept(sc->ctl, sc->dir);
116 close(sc->ctl);
117 if(fd < 0){
118 fprint(2, "accept %s: %r\n", sc->dir);
119 goto out;
122 c = vtconn(fd, fd);
123 sc->c = c;
124 if(vtversion(c) < 0){
125 fprint(2, "vtversion %s: %r\n", sc->dir);
126 goto out;
128 if(vtsrvhello(c) < 0){
129 fprint(2, "vtsrvhello %s: %r\n", sc->dir);
130 goto out;
133 fprint(2, "new proc %s\n", sc->dir);
134 proccreate(vtsendproc, c, STACK);
135 qlock(&c->lk);
136 while(!c->writeq)
137 rsleep(&c->rpcfork);
138 qunlock(&c->lk);
140 while((p = vtrecv(c)) != nil){
141 r = vtmallocz(sizeof(VtReq));
142 if(vtfcallunpack(&r->tx, p) < 0){
143 packetfree(p);
144 fprint(2, "bad packet on %s: %r\n", sc->dir);
145 continue;
147 if(chattyventi)
148 fprint(2, "%s <- %F\n", argv0, &r->tx);
149 packetfree(p);
150 if(r->tx.type == VtTgoodbye)
151 break;
152 r->rx.tag = r->tx.tag;
153 r->sc = sc;
154 scincref(sc);
155 if(_vtqsend(sc->srv->q, r) < 0){
156 scdecref(sc);
157 fprint(2, "hungup queue\n");
158 break;
160 r = nil;
163 fprint(2, "eof on %s\n", sc->dir);
165 out:
166 if(r){
167 vtfcallclear(&r->tx);
168 vtfree(r);
170 fprint(2, "freed %s\n", sc->dir);
171 scdecref(sc);
172 return;
175 VtReq*
176 vtgetreq(VtSrv *srv)
178 return _vtqrecv(srv->q);
181 void
182 vtrespond(VtReq *r)
184 Packet *p;
185 VtSconn *sc;
187 sc = r->sc;
188 if(r->rx.tag != r->tx.tag)
189 abort();
190 if(r->rx.type != r->tx.type+1 && r->rx.type != VtRerror)
191 abort();
192 if(chattyventi)
193 fprint(2, "%s -> %F\n", argv0, &r->rx);
194 if((p = vtfcallpack(&r->rx)) == nil){
195 fprint(2, "fcallpack on %s: %r\n", sc->dir);
196 packetfree(p);
197 vtfcallclear(&r->rx);
198 return;
200 vtsend(sc->c, p);
201 scdecref(sc);
202 vtfcallclear(&r->tx);
203 vtfcallclear(&r->rx);
204 vtfree(r);