commit - 1f61c0914d5f42054b075b6dc5296894de2d25ab
commit + 841d71b5c6be4851572a60c5a9f9dd239ea69e56
blob - 04e9806137c5fed5d33aaebaaa0f96ecd79f8aaf
blob + f7b76bf9735ef7a2a66bf574ad524af540c73a38
--- src/cmd/9pserve.c
+++ src/cmd/9pserve.c
{
Conn *c;
int internal;
+ int sync;
int ref;
int ctag;
int tag;
Hash *fid[NHASH];
Queue *outq;
Queue *inq;
+ Channel *outqdead;
int dotu;
};
c->internal = chancreate(sizeof(void*), 0);
c->inq = qalloc();
c->outq = qalloc();
+ c->outqdead = chancreate(sizeof(void*), 0);
if(verbose) fprint(2, "%T incoming call on %s\n", c->dir);
threadcreate(connthread, c, STACK);
}
int i, fd;
Conn *c;
Hash *h, *hnext;
- Msg *m, *om, *mm;
+ Msg *m, *om, *mm, sync;
Fid *f;
Ioproc *io;
}
if(verbose) fprint(2, "%T fd#%d eof; flushing conn\n", c->fd);
-
- /* flush the output queue */
- sendq(c->outq, nil);
- while(c->outq != nil)
- yield();
/* flush all outstanding messages */
for(i=0; i<NHASH; i++){
- for(h=c->tag[i]; h; h=hnext){
+ while((h = c->tag[i]) != nil){
om = h->v;
+ msgincref(om); /* for us */
m = msgnew(0);
m->internal = 1;
m->c = c;
assert(mm == m);
msgput(m); /* got from recvp */
msgput(m); /* got from msgnew */
- msgput(om); /* got from hash table */
- hnext = h->next;
- free(h);
+ if(delhash(c->tag, om->tag, om) == 0)
+ msgput(om); /* got from hash table */
+ msgput(om); /* got from msgincref */
}
}
+
+ /*
+ * outputthread has written all its messages
+ * to the remote connection (because we've gotten all the replies!),
+ * but it might not have gotten a chance to msgput
+ * the very last one. sync up to make sure.
+ */
+ memset(&sync, 0, sizeof sync);
+ sync.sync = 1;
+ sync.c = c;
+ sendq(outq, &sync);
+ recvp(c->outqdead);
+
+ /* should be no messages left anywhere. */
+ assert(c->nmsg == 0);
+ /* everything is quiet; can close the local output queue. */
+ sendq(c->outq, nil);
+ recvp(c->outqdead);
+
/* clunk all outstanding fids */
for(i=0; i<NHASH; i++){
for(h=c->fid[i]; h; h=hnext){
char *ename;
int err;
Conn *c;
- Queue *outq;
Msg *m, *om;
Ioproc *io;
c = arg;
- outq = c->outq;
io = ioproc();
threadsetname("connout %s", c->dir);
- while((m = recvq(outq)) != nil){
+ while((m = recvq(c->outq)) != nil){
err = m->tx.type+1 != m->rx.type;
if(!err && m->isopenfd)
if(xopenfd(m) < 0)
nbsendp(c->inc, 0);
}
closeioproc(io);
- free(outq);
+ free(c->outq);
c->outq = nil;
+ sendp(c->outqdead, nil);
}
void
io = ioproc();
threadsetname("output");
while((m = recvq(outq)) != nil){
+ if(m->sync){
+ sendp(m->c->outqdead, nil);
+ continue;
+ }
if(verbose > 1) fprint(2, "%T * <- %F\n", &m->tx);
rewritehdr(&m->tx, m->tpkt);
if(mwrite9p(io, 1, m->tpkt) < 0)
struct Queue
{
- int hungup;
QLock lk;
Rendez r;
Qel *head;
e = emalloc(sizeof(Qel));
qlock(&q->lk);
- if(q->hungup){
- free(e);
- werrstr("hungup queue");
- qunlock(&q->lk);
- return -1;
- }
e->p = p;
e->next = nil;
if(q->head == nil)
Qel *e;
qlock(&q->lk);
- while(q->head == nil && !q->hungup)
+ while(q->head == nil)
rsleep(&q->r);
- if(q->hungup){
- qunlock(&q->lk);
- return nil;
- }
e = q->head;
q->head = e->next;
qunlock(&q->lk);