Blob


1 #include <u.h>
2 #include <libc.h>
3 #include <venti.h>
4 #include <thread.h>
5 #include "queue.h"
/* Stack size handed to proccreate for every proc started in this file. */
enum { STACK = 8192 };
12 typedef struct VtSconn VtSconn;
13 struct VtSconn
14 {
15 int ctl;
16 char dir[NETPATHLEN];
17 VtSrv *srv;
18 VtConn *c;
19 };
21 struct VtSrv
22 {
23 int afd;
24 int dead;
25 char adir[NETPATHLEN];
26 Queue *q; /* Queue(VtReq*) */
27 };
/* Proc entry points; each receives its state through the void* argument. */
static void	listenproc(void *v);
static void	connproc(void *v);
32 VtSrv*
33 vtlisten(char *addr)
34 {
35 VtSrv *s;
37 s = vtmallocz(sizeof(VtSrv));
38 s->afd = announce(addr, s->adir);
39 if(s->afd < 0){
40 free(s);
41 return nil;
42 }
43 s->q = _vtqalloc();
44 proccreate(listenproc, s, STACK);
45 return s;
46 }
48 static void
49 listenproc(void *v)
50 {
51 int ctl;
52 char dir[NETPATHLEN];
53 VtSrv *srv;
54 VtSconn *sc;
56 srv = v;
57 for(;;){
58 ctl = listen(srv->adir, dir);
59 if(ctl < 0){
60 srv->dead = 1;
61 break;
62 }
63 sc = vtmallocz(sizeof(VtSconn));
64 sc->ctl = ctl;
65 sc->srv = srv;
66 strcpy(sc->dir, dir);
67 proccreate(connproc, sc, STACK);
68 }
70 // hangup
71 }
73 static void
74 connproc(void *v)
75 {
76 VtSconn *sc;
77 VtConn *c;
78 Packet *p;
79 VtReq *r;
80 int fd;
82 r = nil;
83 c = nil;
84 sc = v;
85 fprint(2, "new call %s on %d\n", sc->dir, sc->ctl);
86 fd = accept(sc->ctl, sc->dir);
87 close(sc->ctl);
88 if(fd < 0){
89 fprint(2, "accept %s: %r\n", sc->dir);
90 goto out;
91 }
93 c = vtconn(fd, fd);
94 sc->c = c;
95 if(vtversion(c) < 0){
96 fprint(2, "vtversion %s: %r\n", sc->dir);
97 goto out;
98 }
99 if(vtsrvhello(c) < 0){
100 fprint(2, "vtsrvhello %s: %r\n", sc->dir);
101 goto out;
104 fprint(2, "new proc %s\n", sc->dir);
105 proccreate(vtsendproc, c, STACK);
106 qlock(&c->lk);
107 while(!c->writeq)
108 rsleep(&c->rpcfork);
109 qunlock(&c->lk);
111 while((p = vtrecv(c)) != nil){
112 r = vtmallocz(sizeof(VtReq));
113 if(vtfcallunpack(&r->tx, p) < 0){
114 packetfree(p);
115 fprint(2, "bad packet on %s: %r\n", sc->dir);
116 continue;
118 packetfree(p);
119 if(r->tx.type == VtTgoodbye)
120 break;
121 r->rx.tag = r->tx.tag;
122 r->sc = sc;
123 if(_vtqsend(sc->srv->q, r) < 0){
124 fprint(2, "hungup queue\n");
125 break;
127 r = nil;
130 fprint(2, "eof on %s\n", sc->dir);
132 out:
133 if(r){
134 vtfcallclear(&r->tx);
135 vtfree(r);
137 if(c)
138 vtfreeconn(c);
139 fprint(2, "freed %s\n", sc->dir);
140 vtfree(sc);
141 return;
144 VtReq*
145 vtgetreq(VtSrv *srv)
147 return _vtqrecv(srv->q);
150 void
151 vtrespond(VtReq *r)
153 Packet *p;
154 VtSconn *sc;
156 sc = r->sc;
157 if(r->rx.tag != r->tx.tag)
158 abort();
159 if(r->rx.type != r->tx.type+1 && r->rx.type != VtRerror)
160 abort();
161 if((p = vtfcallpack(&r->rx)) == nil){
162 fprint(2, "fcallpack on %s: %r\n", sc->dir);
163 packetfree(p);
164 vtfcallclear(&r->rx);
165 return;
167 vtsend(sc->c, p);
168 vtfcallclear(&r->tx);
169 vtfcallclear(&r->rx);
170 vtfree(r);