Blob
1 #include <u.h>2 #include <libc.h>3 #include <venti.h>4 #include "queue.h"6 static int7 _vtsend(VtConn *z, Packet *p)8 {9 IOchunk ioc;10 int n;11 uchar buf[2];13 if(z->state != VtStateConnected) {14 werrstr("session not connected");15 return -1;16 }18 /* add framing */19 n = packetsize(p);20 if(n >= (1<<16)) {21 werrstr("packet too large");22 packetfree(p);23 return -1;24 }25 buf[0] = n>>8;26 buf[1] = n;27 packetprefix(p, buf, 2);29 for(;;){30 n = packetfragments(p, &ioc, 1, 0);31 if(n == 0)32 break;33 if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){34 packetfree(p);35 return 0;36 }37 packetconsume(p, nil, ioc.len);38 }39 packetfree(p);40 return 1;41 }43 static Packet*44 _vtrecv(VtConn *z)45 {46 uchar buf[10], *b;47 int n;48 Packet *p;49 int size, len;51 if(z->state != VtStateConnected) {52 werrstr("session not connected");53 return nil;54 }56 p = z->part;57 /* get enough for head size */58 size = packetsize(p);59 while(size < 2) {60 b = packettrailer(p, MaxFragSize);61 assert(b != nil);62 n = read(z->infd, b, MaxFragSize);63 if(n <= 0)64 goto Err;65 size += n;66 packettrim(p, 0, size);67 }69 if(packetconsume(p, buf, 2) < 0)70 goto Err;71 len = (buf[0] << 8) | buf[1];72 size -= 2;74 while(size < len) {75 n = len - size;76 if(n > MaxFragSize)77 n = MaxFragSize;78 b = packettrailer(p, n);79 if(readn(z->infd, b, n) != n)80 goto Err;81 size += n;82 }83 p = packetsplit(p, len);84 return p;85 Err:86 return nil;87 }89 /*90 * If you fork off two procs running vtrecvproc and vtsendproc,91 * then vtrecv/vtsend (and thus vtrpc) will never block except on92 * rendevouses, which is nice when it's running in one thread of many.93 */94 void95 vtrecvproc(void *v)96 {97 Packet *p;98 VtConn *z;99 Queue *q;101 z = v;102 q = _vtqalloc();104 qlock(&z->lk);105 z->readq = q;106 qlock(&z->inlk);107 rwakeup(&z->rpcfork);108 qunlock(&z->lk);110 while((p = _vtrecv(z)) != nil)111 if(_vtqsend(q, p) < 0){112 packetfree(p);113 break;114 }115 qunlock(&z->inlk);116 qlock(&z->lk);117 _vtqhangup(q);118 while((p = _vtnbqrecv(q)) != nil)119 packetfree(p);120 vtfree(q);121 z->readq = nil;122 rwakeup(&z->rpcfork);123 qunlock(&z->lk);124 vthangup(z);125 }127 void128 vtsendproc(void *v)129 {130 Queue *q;131 Packet *p;132 VtConn *z;134 z = v;135 q = _vtqalloc();137 qlock(&z->lk);138 z->writeq = q;139 qlock(&z->outlk);140 rwakeup(&z->rpcfork);141 qunlock(&z->lk);143 while((p = _vtqrecv(q)) != nil)144 if(_vtsend(z, p) < 0)145 break;146 qunlock(&z->outlk);147 qlock(&z->lk);148 _vtqhangup(q);149 while((p = _vtnbqrecv(q)) != nil)150 packetfree(p);151 vtfree(q);152 z->writeq = nil;153 rwakeup(&z->rpcfork);154 qunlock(&z->lk);155 return;156 }158 Packet*159 vtrecv(VtConn *z)160 {161 Packet *p;163 qlock(&z->lk);164 if(z->state != VtStateConnected){165 werrstr("not connected");166 qunlock(&z->lk);167 return nil;168 }169 if(z->readq){170 qunlock(&z->lk);171 return _vtqrecv(z->readq);172 }174 qlock(&z->inlk);175 qunlock(&z->lk);176 p = _vtrecv(z);177 qunlock(&z->inlk);178 if(!p)179 vthangup(z);180 return p;181 }183 int184 vtsend(VtConn *z, Packet *p)185 {186 qlock(&z->lk);187 if(z->state != VtStateConnected){188 packetfree(p);189 werrstr("not connected");190 qunlock(&z->lk);191 return -1;192 }193 if(z->writeq){194 qunlock(&z->lk);195 if(_vtqsend(z->writeq, p) < 0){196 packetfree(p);197 return -1;198 }199 return 0;200 }202 qlock(&z->outlk);203 qunlock(&z->lk);204 if(_vtsend(z, p) < 0){205 qunlock(&z->outlk);206 vthangup(z);207 return -1;208 }209 qunlock(&z->outlk);210 return 0;211 }