Blob


#include "stdinc.h"
#include "dat.h"
#include "fns.h"
typedef struct LumpQueue LumpQueue;
typedef struct WLump WLump;

enum
{
	/*
	 * Number of slots in each queue's ring buffer.  The index
	 * arithmetic masks with (MaxLumpQ - 1), so this must stay a
	 * power of two.
	 */
	MaxLumpQ	= 1 << 3
};
13 struct WLump
14 {
15 Lump *u;
16 Packet *p;
17 int creator;
18 int gen;
19 uint ms;
20 };
22 struct LumpQueue
23 {
24 QLock lock;
25 Rendez flush;
26 Rendez full;
27 Rendez empty;
28 WLump q[MaxLumpQ];
29 int w;
30 int r;
31 };
33 static LumpQueue *lumpqs;
34 static int nqs;
36 static QLock glk;
37 static int gen;
39 static void queueproc(void *vq);
41 int
42 initlumpqueues(int nq)
43 {
44 LumpQueue *q;
46 int i;
47 nqs = nq;
49 lumpqs = MKNZ(LumpQueue, nq);
51 for(i = 0; i < nq; i++){
52 q = &lumpqs[i];
53 q->full.l = &q->lock;
54 q->empty.l = &q->lock;
55 q->flush.l = &q->lock;
57 if(vtproc(queueproc, q) < 0){
58 seterr(EOk, "can't start write queue slave: %r");
59 return -1;
60 }
61 if(vtproc(queueproc, q) < 0){
62 seterr(EOk, "can't start write queue slave: %r");
63 return -1;
64 }
65 if(vtproc(queueproc, q) < 0){
66 seterr(EOk, "can't start write queue slave: %r");
67 return -1;
68 }
69 if(vtproc(queueproc, q) < 0){
70 seterr(EOk, "can't start write queue slave: %r");
71 return -1;
72 }
73 if(vtproc(queueproc, q) < 0){
74 seterr(EOk, "can't start write queue slave: %r");
75 return -1;
76 }
77 }
79 return 0;
80 }
82 /*
83 * queue a lump & it's packet data for writing
84 */
85 int
86 queuewrite(Lump *u, Packet *p, int creator, uint ms)
87 {
88 LumpQueue *q;
89 int i;
91 trace(TraceProc, "queuewrite");
92 i = indexsect(mainindex, u->score);
93 if(i < 0 || i >= nqs){
94 seterr(EBug, "internal error: illegal index section in queuewrite");
95 return -1;
96 }
98 q = &lumpqs[i];
100 qlock(&q->lock);
101 while(q->r == ((q->w + 1) & (MaxLumpQ - 1))){
102 trace(TraceProc, "queuewrite sleep");
103 rsleep(&q->full);
106 q->q[q->w].u = u;
107 q->q[q->w].p = p;
108 q->q[q->w].creator = creator;
109 q->q[q->w].ms = ms;
110 q->q[q->w].gen = gen;
111 q->w = (q->w + 1) & (MaxLumpQ - 1);
113 trace(TraceProc, "queuewrite wakeup");
114 rwakeup(&q->empty);
116 qunlock(&q->lock);
118 return 0;
121 void
122 flushqueue(void)
124 int i;
125 LumpQueue *q;
127 if(!lumpqs)
128 return;
130 trace(TraceProc, "flushqueue");
132 qlock(&glk);
133 gen++;
134 qunlock(&glk);
136 for(i=0; i<mainindex->nsects; i++){
137 q = &lumpqs[i];
138 qlock(&q->lock);
139 while(q->w != q->r && gen - q->q[q->r].gen > 0){
140 trace(TraceProc, "flushqueue sleep q%d", i);
141 rsleep(&q->flush);
143 qunlock(&q->lock);
147 static void
148 queueproc(void *vq)
150 LumpQueue *q;
151 Lump *u;
152 Packet *p;
153 int creator;
154 uint ms;
156 threadsetname("queueproc");
158 q = vq;
159 for(;;){
160 qlock(&q->lock);
161 while(q->w == q->r){
162 trace(TraceProc, "queueproc sleep empty");
163 rsleep(&q->empty);
166 u = q->q[q->r].u;
167 p = q->q[q->r].p;
168 creator = q->q[q->r].creator;
169 ms = q->q[q->r].ms;
171 q->r = (q->r + 1) & (MaxLumpQ - 1);
172 trace(TraceProc, "queueproc wakeup flush");
173 rwakeupall(&q->flush);
175 trace(TraceProc, "queueproc wakeup full");
176 rwakeup(&q->full);
178 qunlock(&q->lock);
180 trace(TraceProc, "queueproc writelump %V", u->score);
181 if(writeqlump(u, p, creator, ms) < 0)
182 fprint(2, "failed to write lump for %V: %r", u->score);
183 trace(TraceProc, "queueproc wrotelump %V", u->score);
185 putlump(u);