Blob


1 #ifdef PLAN9PORT
2 #include <u.h>
3 #include <signal.h>
4 #endif
5 #include "stdinc.h"
6 #include "dat.h"
7 #include "fns.h"
9 #include "whack.h"
/* Set to 1 by the -d option in threadmain. */
int debug;
/*
 * Set to 1 by -d and -s.  When set, threadmain skips rfork(RFNOTEG) and
 * calls ventiserver directly instead of starting it with vtproc.
 */
int nofork;
/* NOTE(review): presumably read by the thread library to size the main thread's stack; not referenced in this file - confirm. */
int mainstacksize = 256*1024;
/* Listener returned by vtlisten in threadmain; ventiserver pulls requests from it with vtgetreq. */
VtSrv *ventisrv;

/* Request loop, defined at the end of the file; declared here so threadmain can reference it. */
static void ventiserver(void*);
/*
 * Print the command-line synopsis on file descriptor 2 and terminate
 * all threads with exit status "usage".
 *
 * The synopsis lists exactly the options handled by the ARGBEGIN switch
 * in threadmain: the flags -d, -L and -s, and the argument-taking
 * options -a, -B, -c, -C, -D, -h, -I and -W.
 */
void
usage(void)
{
	fprint(2, "usage: venti [-dLs] [-a ventiaddress] [-B blockcachesize] [-c config] [-C cachesize] [-D trace] [-h httpaddress] [-I icachesize] [-W webroot]\n");
	threadexitsall("usage");
}
24 void
25 threadmain(int argc, char *argv[])
26 {
27 char *configfile, *haddr, *vaddr, *webroot;
28 u32int mem, icmem, bcmem, minbcmem;
29 Config config;
31 traceinit();
32 threadsetname("main");
33 vaddr = nil;
34 haddr = nil;
35 configfile = nil;
36 webroot = nil;
37 mem = 0;
38 icmem = 0;
39 bcmem = 0;
40 ARGBEGIN{
41 case 'a':
42 vaddr = EARGF(usage());
43 break;
44 case 'B':
45 bcmem = unittoull(EARGF(usage()));
46 break;
47 case 'c':
48 configfile = EARGF(usage());
49 break;
50 case 'C':
51 mem = unittoull(EARGF(usage()));
52 break;
53 case 'D':
54 settrace(EARGF(usage()));
55 break;
56 case 'd':
57 debug = 1;
58 nofork = 1;
59 break;
60 case 'h':
61 haddr = EARGF(usage());
62 break;
63 case 'I':
64 icmem = unittoull(EARGF(usage()));
65 break;
66 case 'L':
67 ventilogging = 1;
68 break;
69 case 's':
70 nofork = 1;
71 break;
72 case 'W':
73 webroot = EARGF(usage());
74 break;
75 default:
76 usage();
77 }ARGEND
79 if(argc)
80 usage();
82 if(!nofork)
83 rfork(RFNOTEG);
85 #ifdef PLAN9PORT
86 {
87 /* sigh - needed to avoid signals when writing to hungup networks */
88 struct sigaction sa;
89 memset(&sa, 0, sizeof sa);
90 sa.sa_handler = SIG_IGN;
91 sigaction(SIGPIPE, &sa, nil);
92 }
93 #endif
95 trace(TraceQuiet, "venti started");
96 fprint(2, "venti: ");
98 ventifmtinstall();
99 if(configfile == nil)
100 configfile = "venti.conf";
102 if(initarenasum() < 0)
103 fprint(2, "warning: can't initialize arena summing process: %r");
105 fprint(2, "conf...");
106 if(initventi(configfile, &config) < 0)
107 sysfatal("can't init server: %r");
109 if(mem == 0)
110 mem = config.mem;
111 if(bcmem == 0)
112 bcmem = config.bcmem;
113 if(icmem == 0)
114 icmem = config.icmem;
115 if(haddr == nil)
116 haddr = config.haddr;
117 if(vaddr == nil)
118 vaddr = config.vaddr;
119 if(vaddr == nil)
120 vaddr = "tcp!*!venti";
121 if(webroot == nil)
122 webroot = config.webroot;
123 if(queuewrites == 0)
124 queuewrites = config.queuewrites;
126 if(haddr){
127 fprint(2, "httpd %s...", haddr);
128 if(httpdinit(haddr, webroot) < 0)
129 fprint(2, "warning: can't start http server: %r");
132 fprint(2, "init...");
134 if(mem == 0xffffffffUL)
135 mem = 1 * 1024 * 1024;
136 if(0) fprint(2, "initialize %d bytes of lump cache for %d lumps\n",
137 mem, mem / (8 * 1024));
138 initlumpcache(mem, mem / (8 * 1024));
140 icmem = u64log2(icmem / (sizeof(IEntry)+sizeof(IEntry*)) / ICacheDepth);
141 if(icmem < 4)
142 icmem = 4;
143 if(0) fprint(2, "initialize %d bytes of index cache for %d index entries\n",
144 (sizeof(IEntry)+sizeof(IEntry*)) * (1 << icmem) * ICacheDepth,
145 (1 << icmem) * ICacheDepth);
146 initicache(icmem, ICacheDepth);
147 initicachewrite();
149 /*
150 * need a block for every arena and every process
151 */
152 minbcmem = maxblocksize *
153 (mainindex->narenas + mainindex->nsects*4 + 16);
154 if(bcmem < minbcmem)
155 bcmem = minbcmem;
157 if(0) fprint(2, "initialize %d bytes of disk block cache\n", bcmem);
158 initdcache(bcmem);
160 if(mainindex->bloom)
161 startbloomproc(mainindex->bloom);
163 fprint(2, "sync...");
164 if(syncindex(mainindex, 1, 0, 0) < 0)
165 sysfatal("can't sync server: %r");
167 if(queuewrites){
168 fprint(2, "queue...");
169 if(initlumpqueues(mainindex->nsects) < 0){
170 fprint(2, "can't initialize lump queues,"
171 " disabling write queueing: %r");
172 queuewrites = 0;
176 fprint(2, "announce %s...", vaddr);
177 ventisrv = vtlisten(vaddr);
178 if(ventisrv == nil)
179 sysfatal("can't announce %s: %r", vaddr);
181 fprint(2, "serving.\n");
182 if(nofork)
183 ventiserver(nil);
184 else
185 vtproc(ventiserver, nil);
188 static void
189 vtrerror(VtReq *r, char *error)
191 r->rx.msgtype = VtRerror;
192 r->rx.error = estrdup(error);
195 static void
196 ventiserver(void *v)
198 Packet *p;
199 VtReq *r;
200 char err[ERRMAX];
201 uint ms;
202 int cached, ok;
204 USED(v);
205 threadsetname("ventiserver");
206 trace(TraceWork, "start");
207 while((r = vtgetreq(ventisrv)) != nil){
208 trace(TraceWork, "finish");
209 trace(TraceWork, "start request %F", &r->tx);
210 trace(TraceRpc, "<- %F", &r->tx);
211 r->rx.msgtype = r->tx.msgtype+1;
212 addstat(StatRpcTotal, 1);
213 // print("req (arenas[0]=%p sects[0]=%p) %F\n",
214 // mainindex->arenas[0], mainindex->sects[0], &r->tx);
215 switch(r->tx.msgtype){
216 default:
217 vtrerror(r, "unknown request");
218 break;
219 case VtTread:
220 ms = msec();
221 r->rx.data = readlump(r->tx.score, r->tx.blocktype, r->tx.count, &cached);
222 ms = msec() - ms;
223 addstat2(StatRpcRead, 1, StatRpcReadTime, ms);
224 if(r->rx.data == nil){
225 addstat(StatRpcReadFail, 1);
226 rerrstr(err, sizeof err);
227 vtrerror(r, err);
228 }else{
229 addstat(StatRpcReadBytes, packetsize(r->rx.data));
230 addstat(StatRpcReadOk, 1);
231 if(cached)
232 addstat2(StatRpcReadCached, 1, StatRpcReadCachedTime, ms);
233 else
234 addstat2(StatRpcReadUncached, 1, StatRpcReadUncachedTime, ms);
236 break;
237 case VtTwrite:
238 p = r->tx.data;
239 r->tx.data = nil;
240 addstat(StatRpcWriteBytes, packetsize(p));
241 ms = msec();
242 ok = writelump(p, r->rx.score, r->tx.blocktype, 0, ms);
243 ms = msec() - ms;
244 addstat2(StatRpcWrite, 1, StatRpcWriteTime, ms);
246 if(ok < 0){
247 addstat(StatRpcWriteFail, 1);
248 rerrstr(err, sizeof err);
249 vtrerror(r, err);
251 break;
252 case VtTsync:
253 flushqueue();
254 flushdcache();
255 break;
257 trace(TraceRpc, "-> %F", &r->rx);
258 vtrespond(r);
259 trace(TraceWork, "start");
261 flushdcache();
262 flushicache();
263 threadexitsall(0);