Blob


/*
 * Sun RPC client.
 */
4 #include <u.h>
5 #include <libc.h>
6 #include <thread.h>
7 #include <sunrpc.h>
9 typedef struct Out Out;
10 struct Out
11 {
12 char err[ERRMAX]; /* error string */
13 Channel *creply; /* send to finish rpc */
14 uchar *p; /* pending request packet */
15 int n; /* size of request */
16 ulong tag; /* flush tag of pending request */
17 ulong xid; /* xid of pending request */
18 ulong st; /* first send time */
19 ulong t; /* resend time */
20 int nresend; /* number of resends */
21 SunRpc rpc; /* response rpc */
22 };
24 static void
25 udpThread(void *v)
26 {
27 uchar *p, *buf;
28 Ioproc *io;
29 int n;
30 SunClient *cli;
31 enum { BufSize = 65536 };
33 cli = v;
34 buf = emalloc(BufSize);
35 io = ioproc();
36 p = nil;
37 for(;;){
38 n = ioread(io, cli->fd, buf, BufSize);
39 if(n <= 0)
40 break;
41 p = emalloc(4+n);
42 memmove(p+4, buf, n);
43 p[0] = n>>24;
44 p[1] = n>>16;
45 p[2] = n>>8;
46 p[3] = n;
47 if(sendp(cli->readchan, p) == 0)
48 break;
49 p = nil;
50 }
51 free(p);
52 closeioproc(io);
53 while(send(cli->dying, nil) == -1)
54 ;
55 }
57 static void
58 netThread(void *v)
59 {
60 uchar *p, buf[4];
61 Ioproc *io;
62 uint n, tot;
63 int done;
64 SunClient *cli;
66 cli = v;
67 io = ioproc();
68 tot = 0;
69 p = nil;
70 for(;;){
71 n = ioreadn(io, cli->fd, buf, 4);
72 if(n != 4)
73 break;
74 n = (buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|buf[3];
75 if(cli->chatty)
76 fprint(2, "%.8ux...", n);
77 done = n&0x80000000;
78 n &= ~0x80000000;
79 if(tot == 0){
80 p = emalloc(4+n);
81 tot = 4;
82 }else
83 p = erealloc(p, tot+n);
84 if(ioreadn(io, cli->fd, p+tot, n) != n)
85 break;
86 tot += n;
87 if(done){
88 p[0] = tot>>24;
89 p[1] = tot>>16;
90 p[2] = tot>>8;
91 p[3] = tot;
92 if(sendp(cli->readchan, p) == 0)
93 break;
94 p = nil;
95 tot = 0;
96 }
97 }
98 free(p);
99 closeioproc(io);
100 while(send(cli->dying, 0) == -1)
104 static void
105 timerThread(void *v)
107 Ioproc *io;
108 SunClient *cli;
110 cli = v;
111 io = ioproc();
112 for(;;){
113 if(iosleep(io, 200) < 0)
114 break;
115 if(sendul(cli->timerchan, 0) == 0)
116 break;
118 closeioproc(io);
119 while(send(cli->dying, 0) == -1)
123 static ulong
124 msec(void)
126 return nsec()/1000000;
/*
 * How long to wait, in milliseconds, before the next resend.
 *
 * rtt is the base interval and nresend the number of resends
 * already made.  The interval is rtt for the first tries, 2*rtt
 * for resends 2 and 3, then doubles with every further resend
 * (rtt << (nresend-2)) up to resend 18.  The result never
 * exceeds 60 seconds.
 *
 * The clamp is applied before shifting so that a large rtt
 * cannot wrap around and yield a tiny (or zero) wait.
 */
static ulong
twait(ulong rtt, int nresend)
{
	enum { MaxWait = 60*1000 };
	int shift;

	if(nresend <= 1)
		shift = 0;
	else if(nresend <= 3)
		shift = 1;
	else if(nresend <= 18)
		shift = nresend-2;
	else
		return MaxWait;

	/* rtt<<shift would exceed MaxWait exactly when rtt > MaxWait>>shift */
	if(rtt > (ulong)(MaxWait>>shift))
		return MaxWait;
	return rtt<<shift;
}
149 static void
150 rpcMuxThread(void *v)
152 uchar *buf, *p, *ep;
153 int i, n, nout, mout;
154 ulong t, xidgen, tag;
155 Alt a[5];
156 Out *o, **out;
157 SunRpc rpc;
158 SunClient *cli;
159 SunStatus ok;
161 cli = v;
162 mout = 16;
163 nout = 0;
164 out = emalloc(mout*sizeof(out[0]));
165 xidgen = truerand();
167 a[0].op = CHANRCV;
168 a[0].c = cli->rpcchan;
169 a[0].v = &o;
170 a[1].op = CHANNOP;
171 a[1].c = cli->timerchan;
172 a[1].v = nil;
173 a[2].op = CHANRCV;
174 a[2].c = cli->flushchan;
175 a[2].v = &tag;
176 a[3].op = CHANRCV;
177 a[3].c = cli->readchan;
178 a[3].v = &buf;
179 a[4].op = CHANEND;
181 for(;;){
182 switch(alt(a)){
183 case 0: /* o = <-rpcchan */
184 if(o == nil)
185 goto Done;
186 cli->nsend++;
187 /* set xid */
188 o->xid = ++xidgen;
189 if(cli->needcount)
190 p = o->p+4;
191 else
192 p = o->p;
193 p[0] = xidgen>>24;
194 p[1] = xidgen>>16;
195 p[2] = xidgen>>8;
196 p[3] = xidgen;
197 if(write(cli->fd, o->p, o->n) != o->n){
198 free(o->p);
199 o->p = nil;
200 snprint(o->err, sizeof o->err, "write: %r");
201 sendp(o->creply, 0);
202 break;
204 if(nout >= mout){
205 mout *= 2;
206 out = erealloc(out, mout*sizeof(out[0]));
208 o->st = msec();
209 o->nresend = 0;
210 o->t = o->st + twait(cli->rtt.avg, 0);
211 if(cli->chatty) fprint(2, "send %lux %lud %lud\n", o->xid, o->st, o->t);
212 out[nout++] = o;
213 a[1].op = CHANRCV;
214 break;
216 case 1: /* <-timerchan */
217 t = msec();
218 for(i=0; i<nout; i++){
219 o = out[i];
220 if((int)(t - o->t) > 0){
221 if(cli->chatty) fprint(2, "resend %lux %lud %lud\n", o->xid, t, o->t);
222 if(cli->maxwait && t - o->st >= cli->maxwait){
223 free(o->p);
224 o->p = nil;
225 strcpy(o->err, "timeout");
226 sendp(o->creply, 0);
227 out[i--] = out[--nout];
228 continue;
230 cli->nresend++;
231 o->nresend++;
232 o->t = t + twait(cli->rtt.avg, o->nresend);
233 if(write(cli->fd, o->p, o->n) != o->n){
234 free(o->p);
235 o->p = nil;
236 snprint(o->err, sizeof o->err, "rewrite: %r");
237 sendp(o->creply, 0);
238 out[i--] = out[--nout];
239 continue;
243 /* stop ticking if no work; rpcchan will turn it back on */
244 if(nout == 0)
245 a[1].op = CHANNOP;
246 break;
248 case 2: /* tag = <-flushchan */
249 for(i=0; i<nout; i++){
250 o = out[i];
251 if(o->tag == tag){
252 out[i--] = out[--nout];
253 strcpy(o->err, "flushed");
254 free(o->p);
255 o->p = nil;
256 sendp(o->creply, 0);
259 break;
261 case 3: /* buf = <-readchan */
262 p = buf;
263 n = (p[0]<<24)|(p[1]<<16)|(p[2]<<8)|p[3];
264 p += 4;
265 ep = p+n;
266 if((ok = sunrpcunpack(p, ep, &p, &rpc)) != SunSuccess){
267 fprint(2, "%s: in: %.*H unpack failed\n", argv0, n, buf+4);
268 free(buf);
269 break;
271 if(cli->chatty)
272 fprint(2, "in: %B\n", &rpc);
273 if(rpc.iscall){
274 fprint(2, "did not get reply\n");
275 free(buf);
276 break;
278 o = nil;
279 for(i=0; i<nout; i++){
280 o = out[i];
281 if(o->xid == rpc.xid)
282 break;
284 if(i==nout){
285 if(cli->chatty) fprint(2, "did not find waiting request\n");
286 free(buf);
287 break;
289 out[i] = out[--nout];
290 free(o->p);
291 o->p = nil;
292 o->rpc = rpc;
293 if(rpc.status == SunSuccess){
294 o->p = buf;
295 }else{
296 o->p = nil;
297 free(buf);
298 sunerrstr(rpc.status);
299 rerrstr(o->err, sizeof o->err);
301 sendp(o->creply, 0);
302 break;
305 Done:
306 free(out);
307 sendp(cli->dying, 0);
310 SunClient*
311 sundial(char *address)
313 int fd;
314 SunClient *cli;
316 if((fd = dial(address, 0, 0, 0)) < 0)
317 return nil;
319 cli = emalloc(sizeof(SunClient));
320 cli->fd = fd;
321 cli->maxwait = 15000;
322 cli->rtt.avg = 1000;
323 cli->dying = chancreate(sizeof(void*), 0);
324 cli->rpcchan = chancreate(sizeof(Out*), 0);
325 cli->timerchan = chancreate(sizeof(ulong), 0);
326 cli->flushchan = chancreate(sizeof(ulong), 0);
327 cli->readchan = chancreate(sizeof(uchar*), 0);
328 if(strstr(address, "udp!")){
329 cli->needcount = 0;
330 cli->nettid = threadcreate(udpThread, cli, SunStackSize);
331 cli->timertid = threadcreate(timerThread, cli, SunStackSize);
332 }else{
333 cli->needcount = 1;
334 cli->nettid = threadcreate(netThread, cli, SunStackSize);
335 /* assume reliable: don't need timer */
336 /* BUG: netThread should know how to redial */
338 threadcreate(rpcMuxThread, cli, SunStackSize);
340 return cli;
343 void
344 sunclientclose(SunClient *cli)
346 int n;
348 /*
349 * Threadints get you out of any stuck system calls
350 * or thread rendezvouses, but do nothing if the thread
351 * is in the ready state. Keep interrupting until it takes.
352 */
353 n = 0;
354 if(!cli->timertid)
355 n++;
356 while(n < 2){
357 /*
358 threadint(cli->nettid);
359 if(cli->timertid)
360 threadint(cli->timertid);
361 */
363 yield();
364 while(nbrecv(cli->dying, nil) == 1)
365 n++;
368 sendp(cli->rpcchan, 0);
369 recvp(cli->dying);
371 /* everyone's gone: clean up */
372 close(cli->fd);
373 chanfree(cli->flushchan);
374 chanfree(cli->readchan);
375 chanfree(cli->timerchan);
376 free(cli);
379 void
380 sunclientflushrpc(SunClient *cli, ulong tag)
382 sendul(cli->flushchan, tag);
385 void
386 sunclientprog(SunClient *cli, SunProg *p)
388 if(cli->nprog%16 == 0)
389 cli->prog = erealloc(cli->prog, (cli->nprog+16)*sizeof(cli->prog[0]));
390 cli->prog[cli->nprog++] = p;
393 int
394 sunclientrpc(SunClient *cli, ulong tag, SunCall *tx, SunCall *rx, uchar **tofree)
396 uchar *bp, *p, *ep;
397 int i, n1, n2, n, nn;
398 Out o;
399 SunProg *prog;
400 SunStatus ok;
402 for(i=0; i<cli->nprog; i++)
403 if(cli->prog[i]->prog == tx->rpc.prog && cli->prog[i]->vers == tx->rpc.vers)
404 break;
405 if(i==cli->nprog){
406 werrstr("unknown sun rpc program %d version %d", tx->rpc.prog, tx->rpc.vers);
407 return -1;
409 prog = cli->prog[i];
411 if(cli->chatty){
412 fprint(2, "out: %B\n", &tx->rpc);
413 fprint(2, "\t%C\n", tx);
416 n1 = sunrpcsize(&tx->rpc);
417 n2 = suncallsize(prog, tx);
419 n = n1+n2;
420 if(cli->needcount)
421 n += 4;
423 /*
424 * The dance with 100 is to leave some padding in case
425 * suncallsize is slightly underestimating. If this happens,
426 * the pack will succeed and then we can give a good size
427 * mismatch error below. Otherwise the pack fails with
428 * garbage args, which is less helpful.
429 */
430 bp = emalloc(n+100);
431 ep = bp+n+100;
432 p = bp;
433 if(cli->needcount){
434 nn = n-4;
435 p[0] = (nn>>24)|0x80;
436 p[1] = nn>>16;
437 p[2] = nn>>8;
438 p[3] = nn;
439 p += 4;
441 if((ok = sunrpcpack(p, ep, &p, &tx->rpc)) != SunSuccess
442 || (ok = suncallpack(prog, p, ep, &p, tx)) != SunSuccess){
443 sunerrstr(ok);
444 free(bp);
445 return -1;
447 ep -= 100;
448 if(p != ep){
449 werrstr("rpc: packet size mismatch %d %ld %ld", n, ep-bp, p-bp);
450 free(bp);
451 return -1;
454 memset(&o, 0, sizeof o);
455 o.creply = chancreate(sizeof(void*), 0);
456 o.tag = tag;
457 o.p = bp;
458 o.n = n;
460 sendp(cli->rpcchan, &o);
461 recvp(o.creply);
462 chanfree(o.creply);
464 if(o.p == nil){
465 werrstr("%s", o.err);
466 return -1;
469 p = o.rpc.data;
470 ep = p+o.rpc.ndata;
471 rx->rpc = o.rpc;
472 rx->rpc.proc = tx->rpc.proc;
473 rx->rpc.prog = tx->rpc.prog;
474 rx->rpc.vers = tx->rpc.vers;
475 rx->type = (rx->rpc.proc<<1)|1;
476 if(rx->rpc.status != SunSuccess){
477 sunerrstr(rx->rpc.status);
478 werrstr("unpack: %r");
479 free(o.p);
480 return -1;
483 if((ok = suncallunpack(prog, p, ep, &p, rx)) != SunSuccess){
484 sunerrstr(ok);
485 werrstr("unpack: %r");
486 free(o.p);
487 return -1;
490 if(cli->chatty){
491 fprint(2, "in: %B\n", &rx->rpc);
492 fprint(2, "in:\t%C\n", rx);
495 if(tofree)
496 *tofree = o.p;
497 else
498 free(o.p);
500 return 0;