Blob
1 #include <u.h>2 #include <libc.h>3 #include <venti.h>4 #include "queue.h"6 long ventisendbytes, ventisendpackets;7 long ventirecvbytes, ventirecvpackets;9 static int10 _vtsend(VtConn *z, Packet *p)11 {12 IOchunk ioc;13 int n, tot;14 uchar buf[2];16 if(z->state != VtStateConnected) {17 werrstr("session not connected");18 return -1;19 }21 /* add framing */22 n = packetsize(p);23 if(n >= (1<<16)) {24 werrstr("packet too large");25 packetfree(p);26 return -1;27 }28 buf[0] = n>>8;29 buf[1] = n;30 packetprefix(p, buf, 2);31 ventisendbytes += n+2;32 ventisendpackets++;34 tot = 0;35 for(;;){36 n = packetfragments(p, &ioc, 1, 0);37 if(n == 0)38 break;39 if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){40 vtlog(VtServerLog, "<font size=-1>%T %s:</font> sending packet %p: %r<br>\n", z->addr, p);41 packetfree(p);42 return 0;43 }44 packetconsume(p, nil, ioc.len);45 tot += ioc.len;46 }47 vtlog(VtServerLog, "<font size=-1>%T %s:</font> sent packet %p (%d bytes)<br>\n", z->addr, p, tot);48 packetfree(p);49 return 1;50 }52 static int53 interrupted(void)54 {55 char e[ERRMAX];57 rerrstr(e, sizeof e);58 return strstr(e, "interrupted") != nil;59 }62 static Packet*63 _vtrecv(VtConn *z)64 {65 uchar buf[10], *b;66 int n;67 Packet *p;68 int size, len;70 if(z->state != VtStateConnected) {71 werrstr("session not connected");72 return nil;73 }75 p = z->part;76 /* get enough for head size */77 size = packetsize(p);78 while(size < 2) {79 b = packettrailer(p, 2);80 assert(b != nil);81 if(0) fprint(2, "%d read hdr\n", getpid());82 n = read(z->infd, b, 2);83 if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);84 if(n==0 || (n<0 && !interrupted()))85 goto Err;86 size += n;87 packettrim(p, 0, size);88 }90 if(packetconsume(p, buf, 2) < 0)91 goto Err;92 len = (buf[0] << 8) | buf[1];93 size -= 2;95 while(size < len) {96 n = len - size;97 if(n > MaxFragSize)98 n = MaxFragSize;99 b = packettrailer(p, n);100 if(0) fprint(2, "%d read body %d\n", getpid(), n);101 n = read(z->infd, b, n);102 if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);103 if(n > 0)104 size += n;105 packettrim(p, 0, size);106 if(n==0 || (n<0 && !interrupted()))107 goto Err;108 }109 ventirecvbytes += len;110 ventirecvpackets++;111 p = packetsplit(p, len);112 vtlog(VtServerLog, "<font size=-1>%T %s:</font> read packet %p len %d<br>\n", z->addr, p, len);113 return p;114 Err:115 vtlog(VtServerLog, "<font size=-1>%T %s:</font> error reading packet: %r<br>\n", z->addr);116 return nil;117 }119 /*120 * If you fork off two procs running vtrecvproc and vtsendproc,121 * then vtrecv/vtsend (and thus vtrpc) will never block except on122 * rendevouses, which is nice when it's running in one thread of many.123 */124 void125 vtrecvproc(void *v)126 {127 Packet *p;128 VtConn *z;129 Queue *q;131 z = v;132 q = _vtqalloc();134 qlock(&z->lk);135 z->readq = q;136 qlock(&z->inlk);137 rwakeup(&z->rpcfork);138 qunlock(&z->lk);140 while((p = _vtrecv(z)) != nil)141 if(_vtqsend(q, p) < 0){142 packetfree(p);143 break;144 }145 qunlock(&z->inlk);146 qlock(&z->lk);147 _vtqhangup(q);148 while((p = _vtnbqrecv(q)) != nil)149 packetfree(p);150 _vtqdecref(q);151 z->readq = nil;152 rwakeup(&z->rpcfork);153 qunlock(&z->lk);154 vthangup(z);155 }157 void158 vtsendproc(void *v)159 {160 Queue *q;161 Packet *p;162 VtConn *z;164 z = v;165 q = _vtqalloc();167 qlock(&z->lk);168 z->writeq = q;169 qlock(&z->outlk);170 rwakeup(&z->rpcfork);171 qunlock(&z->lk);173 while((p = _vtqrecv(q)) != nil)174 if(_vtsend(z, p) < 0)175 break;176 qunlock(&z->outlk);177 qlock(&z->lk);178 _vtqhangup(q);179 while((p = _vtnbqrecv(q)) != nil)180 packetfree(p);181 _vtqdecref(q);182 z->writeq = nil;183 rwakeup(&z->rpcfork);184 qunlock(&z->lk);185 return;186 }188 Packet*189 vtrecv(VtConn *z)190 {191 Packet *p;192 Queue *q;194 qlock(&z->lk);195 if(z->state != VtStateConnected){196 werrstr("not connected");197 qunlock(&z->lk);198 return nil;199 }200 if(z->readq){201 q = _vtqincref(z->readq);202 qunlock(&z->lk);203 p = _vtqrecv(q);204 _vtqdecref(q);205 return p;206 }208 qlock(&z->inlk);209 qunlock(&z->lk);210 p = _vtrecv(z);211 qunlock(&z->inlk);212 if(!p)213 vthangup(z);214 return p;215 }217 int218 vtsend(VtConn *z, Packet *p)219 {220 Queue *q;222 qlock(&z->lk);223 if(z->state != VtStateConnected){224 packetfree(p);225 werrstr("not connected");226 qunlock(&z->lk);227 return -1;228 }229 if(z->writeq){230 q = _vtqincref(z->writeq);231 qunlock(&z->lk);232 if(_vtqsend(q, p) < 0){233 _vtqdecref(q);234 packetfree(p);235 return -1;236 }237 _vtqdecref(q);238 return 0;239 }241 qlock(&z->outlk);242 qunlock(&z->lk);243 if(_vtsend(z, p) < 0){244 qunlock(&z->outlk);245 vthangup(z);246 return -1;247 }248 qunlock(&z->outlk);249 return 0;250 }