Blob


1 #include <u.h>
2 #include <libc.h>
3 #include <venti.h>
4 #include <thread.h>
5 #include "queue.h"
7 enum
8 {
9 STACK = 8192,
10 };
12 typedef struct VtSconn VtSconn;
13 struct VtSconn
14 {
15 int ctl;
16 char dir[NETPATHLEN];
17 VtSrv *srv;
18 VtConn *c;
19 };
21 struct VtSrv
22 {
23 int afd;
24 int dead;
25 char adir[NETPATHLEN];
26 Queue *q; /* Queue(VtReq*) */
27 };
29 static void listenproc(void*);
30 static void connproc(void*);
32 VtSrv*
33 vtlisten(char *addr)
34 {
35 VtSrv *s;
37 s = vtmallocz(sizeof(VtSrv));
38 s->afd = announce(addr, s->adir);
39 if(s->afd < 0){
40 free(s);
41 return nil;
42 }
43 s->q = _vtqalloc();
44 proccreate(listenproc, s, STACK);
45 return s;
46 }
48 static void
49 listenproc(void *v)
50 {
51 int ctl;
52 char dir[NETPATHLEN];
53 VtSrv *srv;
54 VtSconn *sc;
56 srv = v;
57 for(;;){
58 fprint(2, "listen for venti\n");
59 ctl = listen(srv->adir, dir);
60 if(ctl < 0){
61 srv->dead = 1;
62 break;
63 }
64 fprint(2, "got one\n");
65 sc = vtmallocz(sizeof(VtSconn));
66 sc->ctl = ctl;
67 sc->srv = srv;
68 strcpy(sc->dir, dir);
69 proccreate(connproc, sc, STACK);
70 }
72 // hangup
73 }
75 static void
76 connproc(void *v)
77 {
78 VtSconn *sc;
79 VtConn *c;
80 Packet *p;
81 VtReq *r;
82 int fd;
84 r = nil;
85 c = nil;
86 sc = v;
87 fprint(2, "new call %s on %d\n", sc->dir, sc->ctl);
88 fd = accept(sc->ctl, sc->dir);
89 close(sc->ctl);
90 if(fd < 0){
91 fprint(2, "accept %s: %r\n", sc->dir);
92 goto out;
93 }
95 c = vtconn(fd, fd);
96 sc->c = c;
97 if(vtversion(c) < 0){
98 fprint(2, "vtversion %s: %r\n", sc->dir);
99 goto out;
101 if(vtsrvhello(c) < 0){
102 fprint(2, "vtsrvhello %s: %r\n", sc->dir);
103 goto out;
106 fprint(2, "new proc %s\n", sc->dir);
107 proccreate(vtsendproc, c, STACK);
108 qlock(&c->lk);
109 while(!c->writeq)
110 rsleep(&c->rpcfork);
111 qunlock(&c->lk);
113 while((p = vtrecv(c)) != nil){
114 r = vtmallocz(sizeof(VtReq));
115 if(vtfcallunpack(&r->tx, p) < 0){
116 packetfree(p);
117 fprint(2, "bad packet on %s: %r\n", sc->dir);
118 continue;
120 packetfree(p);
121 if(r->tx.type == VtTgoodbye)
122 break;
123 r->rx.tag = r->tx.tag;
124 r->sc = sc;
125 if(_vtqsend(sc->srv->q, r) < 0){
126 fprint(2, "hungup queue\n");
127 break;
129 r = nil;
132 fprint(2, "eof on %s\n", sc->dir);
134 out:
135 if(r){
136 vtfcallclear(&r->tx);
137 vtfree(r);
139 if(c)
140 vtfreeconn(c);
141 fprint(2, "freed %s\n", sc->dir);
142 vtfree(sc);
143 return;
146 VtReq*
147 vtgetreq(VtSrv *srv)
149 return _vtqrecv(srv->q);
152 void
153 vtrespond(VtReq *r)
155 Packet *p;
156 VtSconn *sc;
158 sc = r->sc;
159 if(r->rx.tag != r->tx.tag)
160 abort();
161 if(r->rx.type != r->tx.type+1 && r->rx.type != VtRerror)
162 abort();
163 if((p = vtfcallpack(&r->rx)) == nil){
164 fprint(2, "fcallpack on %s: %r\n", sc->dir);
165 packetfree(p);
166 vtfcallclear(&r->rx);
167 return;
169 vtsend(sc->c, p);
170 vtfcallclear(&r->tx);
171 vtfcallclear(&r->rx);
172 vtfree(r);