Blob
1 #include <u.h>2 #include <libc.h>3 #include <venti.h>4 #include "queue.h"6 long ventisendbytes, ventisendpackets;7 long ventirecvbytes, ventirecvpackets;9 static int10 _vtsend(VtConn *z, Packet *p)11 {12 IOchunk ioc;13 int n, tot;14 uchar buf[4];16 if(z->state != VtStateConnected) {17 werrstr("session not connected");18 return -1;19 }21 /* add framing */22 n = packetsize(p);23 if(z->version[1] == '2') {24 if(n >= (1<<16)) {25 werrstr("packet too large");26 packetfree(p);27 return -1;28 }29 buf[0] = n>>8;30 buf[1] = n;31 packetprefix(p, buf, 2);32 ventisendbytes += n+2;33 } else {34 buf[0] = n>>24;35 buf[1] = n>>16;36 buf[2] = n>>8;37 buf[3] = n;38 packetprefix(p, buf, 4);39 ventisendbytes += n+4;40 }41 ventisendpackets++;43 tot = 0;44 for(;;){45 n = packetfragments(p, &ioc, 1, 0);46 if(n == 0)47 break;48 if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){49 vtlog(VtServerLog, "<font size=-1>%T %s:</font> sending packet %p: %r<br>\n", z->addr, p);50 packetfree(p);51 return -1;52 }53 packetconsume(p, nil, ioc.len);54 tot += ioc.len;55 }56 vtlog(VtServerLog, "<font size=-1>%T %s:</font> sent packet %p (%d bytes)<br>\n", z->addr, p, tot);57 packetfree(p);58 return 1;59 }61 static int62 interrupted(void)63 {64 char e[ERRMAX];66 rerrstr(e, sizeof e);67 return strstr(e, "interrupted") != nil;68 }71 static Packet*72 _vtrecv(VtConn *z)73 {74 uchar buf[10], *b;75 int n, need;76 Packet *p;77 int size, len;79 if(z->state != VtStateConnected) {80 werrstr("session not connected");81 return nil;82 }84 p = z->part;85 /* get enough for head size */86 size = packetsize(p);87 need = z->version[1] - '0'; // 2 or 488 while(size < need) {89 b = packettrailer(p, need);90 assert(b != nil);91 if(0) fprint(2, "%d read hdr\n", getpid());92 n = read(z->infd, b, need);93 if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);94 if(n==0 || (n<0 && !interrupted()))95 goto Err;96 size += n;97 packettrim(p, 0, size);98 }100 if(packetconsume(p, buf, need) < 0)101 goto Err;102 if(z->version[1] == '2') {103 len = (buf[0] << 8) | buf[1];104 size -= 2;105 } else {106 len = (buf[0]<<24) | (buf[1]<<16) | (buf[2]<<8) | buf[3];107 size -= 4;108 }110 while(size < len) {111 n = len - size;112 if(n > MaxFragSize)113 n = MaxFragSize;114 b = packettrailer(p, n);115 if(0) fprint(2, "%d read body %d\n", getpid(), n);116 n = read(z->infd, b, n);117 if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);118 if(n > 0)119 size += n;120 packettrim(p, 0, size);121 if(n==0 || (n<0 && !interrupted()))122 goto Err;123 }124 ventirecvbytes += len;125 ventirecvpackets++;126 p = packetsplit(p, len);127 vtlog(VtServerLog, "<font size=-1>%T %s:</font> read packet %p len %d<br>\n", z->addr, p, len);128 return p;129 Err:130 vtlog(VtServerLog, "<font size=-1>%T %s:</font> error reading packet: %r<br>\n", z->addr);131 return nil;132 }134 /*135 * If you fork off two procs running vtrecvproc and vtsendproc,136 * then vtrecv/vtsend (and thus vtrpc) will never block except on137 * rendevouses, which is nice when it's running in one thread of many.138 */139 void140 vtrecvproc(void *v)141 {142 Packet *p;143 VtConn *z;144 Queue *q;146 z = v;147 q = _vtqalloc();149 qlock(&z->lk);150 z->readq = q;151 qlock(&z->inlk);152 rwakeup(&z->rpcfork);153 qunlock(&z->lk);155 while((p = _vtrecv(z)) != nil)156 if(_vtqsend(q, p) < 0){157 packetfree(p);158 break;159 }160 qunlock(&z->inlk);161 qlock(&z->lk);162 _vtqhangup(q);163 while((p = _vtnbqrecv(q)) != nil)164 packetfree(p);165 _vtqdecref(q);166 z->readq = nil;167 rwakeup(&z->rpcfork);168 qunlock(&z->lk);169 vthangup(z);170 }172 void173 vtsendproc(void *v)174 {175 Queue *q;176 Packet *p;177 VtConn *z;179 z = v;180 q = _vtqalloc();182 qlock(&z->lk);183 z->writeq = q;184 qlock(&z->outlk);185 rwakeup(&z->rpcfork);186 qunlock(&z->lk);188 while((p = _vtqrecv(q)) != nil)189 if(_vtsend(z, p) < 0)190 break;191 qunlock(&z->outlk);192 qlock(&z->lk);193 _vtqhangup(q);194 while((p = _vtnbqrecv(q)) != nil)195 packetfree(p);196 _vtqdecref(q);197 z->writeq = nil;198 rwakeup(&z->rpcfork);199 qunlock(&z->lk);200 return;201 }203 Packet*204 vtrecv(VtConn *z)205 {206 Packet *p;207 Queue *q;209 qlock(&z->lk);210 if(z->state != VtStateConnected){211 werrstr("not connected");212 qunlock(&z->lk);213 return nil;214 }215 if(z->readq){216 q = _vtqincref(z->readq);217 qunlock(&z->lk);218 p = _vtqrecv(q);219 _vtqdecref(q);220 return p;221 }223 qlock(&z->inlk);224 qunlock(&z->lk);225 p = _vtrecv(z);226 qunlock(&z->inlk);227 if(!p)228 vthangup(z);229 return p;230 }232 int233 vtsend(VtConn *z, Packet *p)234 {235 Queue *q;237 qlock(&z->lk);238 if(z->state != VtStateConnected){239 packetfree(p);240 werrstr("not connected");241 qunlock(&z->lk);242 return -1;243 }244 if(z->writeq){245 q = _vtqincref(z->writeq);246 qunlock(&z->lk);247 if(_vtqsend(q, p) < 0){248 _vtqdecref(q);249 packetfree(p);250 return -1;251 }252 _vtqdecref(q);253 return 0;254 }256 qlock(&z->outlk);257 qunlock(&z->lk);258 if(_vtsend(z, p) < 0){259 qunlock(&z->outlk);260 vthangup(z);261 return -1;262 }263 qunlock(&z->outlk);264 return 0;265 }