Blob


1 #include "threadimpl.h"
3 int _threaddebuglevel = 0;
5 static uint threadnproc;
6 static uint threadnsysproc;
7 static Lock threadnproclock;
8 static Ref threadidref;
9 static Proc *threadmainproc;
11 static void addproc(Proc*);
12 static void delproc(Proc*);
13 static void addthread(_Threadlist*, _Thread*);
14 static void delthread(_Threadlist*, _Thread*);
15 static int onlist(_Threadlist*, _Thread*);
16 static void addthreadinproc(Proc*, _Thread*);
17 static void delthreadinproc(Proc*, _Thread*);
18 static void procmain(Proc*);
19 static int threadinfo(void*, char*);
20 static void pthreadscheduler(Proc *p);
21 static void pthreadsleepschedlocked(Proc *p, _Thread *t);
22 static void pthreadwakeupschedlocked(Proc *p, _Thread *self, _Thread *t);
23 static _Thread* procnext(Proc*, _Thread*);
25 static void
26 _threaddebug(_Thread *t, char *fmt, ...)
27 {
28 va_list arg;
29 char buf[128];
30 char *p;
31 static int fd = -1;
33 if(_threaddebuglevel == 0)
34 return;
36 if(fd < 0){
37 p = strrchr(argv0, '/');
38 if(p)
39 p++;
40 else
41 p = argv0;
42 snprint(buf, sizeof buf, "/tmp/%s.tlog", p);
43 if((fd = create(buf, OWRITE, 0666)) < 0)
44 fd = open("/dev/null", OWRITE);
45 if(fd >= 0 && fd != 2){
46 dup(fd, 2);
47 close(fd);
48 fd = 2;
49 }
50 }
52 va_start(arg, fmt);
53 vsnprint(buf, sizeof buf, fmt, arg);
54 va_end(arg);
55 if(t == nil)
56 t = proc()->thread;
57 if(t)
58 fprint(fd, "%p %d.%d: %s\n", proc(), getpid(), t->id, buf);
59 else
60 fprint(fd, "%p %d._: %s\n", proc(), getpid(), buf);
61 }
63 static _Thread*
64 getthreadnow(void)
65 {
66 return proc()->thread;
67 }
68 _Thread *(*threadnow)(void) = getthreadnow;
70 static Proc*
71 procalloc(void)
72 {
73 Proc *p;
75 p = malloc(sizeof *p);
76 if(p == nil)
77 sysfatal("procalloc malloc: %r");
78 memset(p, 0, sizeof *p);
79 addproc(p);
80 lock(&threadnproclock);
81 threadnproc++;
82 unlock(&threadnproclock);
83 return p;
84 }
86 _Thread*
87 _threadcreate(Proc *p, void (*fn)(void*), void *arg, uint stack)
88 {
89 _Thread *t;
91 USED(stack);
92 t = malloc(sizeof *t);
93 if(t == nil)
94 sysfatal("threadcreate malloc: %r");
95 memset(t, 0, sizeof *t);
96 t->id = incref(&threadidref);
97 t->startfn = fn;
98 t->startarg = arg;
99 t->proc = p;
100 if(p->nthread != 0)
101 _threadpthreadstart(p, t);
102 else
103 t->mainthread = 1;
104 p->nthread++;
105 addthreadinproc(p, t);
106 _threadready(t);
107 return t;
110 int
111 threadcreate(void (*fn)(void*), void *arg, uint stack)
113 _Thread *t;
115 t = _threadcreate(proc(), fn, arg, stack);
116 _threaddebug(nil, "threadcreate %d", t->id);
117 return t->id;
120 int
121 proccreate(void (*fn)(void*), void *arg, uint stack)
123 int id;
124 _Thread *t;
125 Proc *p;
127 p = procalloc();
128 t = _threadcreate(p, fn, arg, stack);
129 id = t->id; /* t might be freed after _procstart */
130 _threaddebug(t, "proccreate %p", p);
131 _procstart(p, procmain);
132 return id;
/*
 * Give up the processor: check the stack with needstack, then run
 * the scheduler for the calling proc.
 */
void
_threadswitch(void)
{
	needstack(0);
	pthreadscheduler(proc());
}
146 void
147 _threadready(_Thread *t)
149 Proc *p;
151 p = t->proc;
152 lock(&p->lock);
153 p->runrend.l = &p->lock;
154 addthread(&p->runqueue, t);
155 /*print("%d wake for job %d->%d\n", time(0), getpid(), p->osprocid); */
156 if(p != proc())
157 _procwakeupandunlock(&p->runrend);
158 else
159 unlock(&p->lock);
162 int
163 threadidle(void)
165 int n;
166 Proc *p;
168 p = proc();
169 n = p->nswitch;
170 lock(&p->lock);
171 p->runrend.l = &p->lock;
172 addthread(&p->idlequeue, p->thread);
173 unlock(&p->lock);
174 _threadswitch();
175 return p->nswitch - n;
178 int
179 threadyield(void)
181 int n;
182 Proc *p;
184 p = proc();
185 n = p->nswitch;
186 _threadready(p->thread);
187 _threadswitch();
188 return p->nswitch - n;
191 void
192 threadexits(char *msg)
194 Proc *p;
196 p = proc();
197 if(msg == nil)
198 msg = "";
199 utfecpy(p->msg, p->msg+sizeof p->msg, msg);
200 proc()->thread->exiting = 1;
201 _threadswitch();
204 void
205 threadpin(void)
207 Proc *p;
209 p = proc();
210 if(p->pinthread){
211 fprint(2, "already pinning a thread - %p %p\n", p->pinthread, p->thread);
212 assert(0);
214 p->pinthread = p->thread;
217 void
218 threadunpin(void)
220 Proc *p;
222 p = proc();
223 if(p->pinthread != p->thread){
224 fprint(2, "wrong pinthread - %p %p\n", p->pinthread, p->thread);
225 assert(0);
227 p->pinthread = nil;
230 void
231 threadsysfatal(char *fmt, va_list arg)
233 char buf[256];
235 vseprint(buf, buf+sizeof(buf), fmt, arg);
236 __fixargv0();
237 fprint(2, "%s: %s\n", argv0 ? argv0 : "<prog>", buf);
238 threadexitsall(buf);
241 static void
242 procmain(Proc *p)
244 _Thread *t;
246 _threadsetproc(p);
248 /* take out first thread to run on system stack */
249 t = p->runqueue.head;
250 delthread(&p->runqueue, t);
252 /* run it */
253 p->thread = t;
254 t->startfn(t->startarg);
255 if(p->nthread != 0)
256 threadexits(nil);
259 void
260 _threadpthreadmain(Proc *p, _Thread *t)
262 _threadsetproc(p);
263 lock(&p->lock);
264 pthreadsleepschedlocked(p, t);
265 unlock(&p->lock);
266 _threaddebug(nil, "startfn");
267 t->startfn(t->startarg);
268 threadexits(nil);
271 static void
272 pthreadsleepschedlocked(Proc *p, _Thread *t)
274 _threaddebug(t, "pthreadsleepsched %p %d", p, t->id);;
275 t->schedrend.l = &p->lock;
276 while(p->schedthread != t)
277 _procsleep(&t->schedrend);
280 static void
281 pthreadwakeupschedlocked(Proc *p, _Thread *self, _Thread *t)
283 _threaddebug(self, "pthreadwakeupschedlocked %p %d", p, t->id);;
284 t->schedrend.l = &p->lock;
285 p->schedthread = t;
286 _procwakeup(&t->schedrend);
/*
 * Scheduler for proc p, run by the thread that is giving up the
 * processor (p->thread on entry).  It takes p->lock, retires the
 * caller if it is exiting, picks the next thread with procnext and
 * wakes it, and then either sleeps until the caller is selected
 * again or tears the caller (and possibly the proc) down.
 *
 * NOTE(review): this copy of the file carries a line-number token at
 * the start of every line and has lost its brace-only lines, so the
 * nesting described in these comments is inferred from the gaps in
 * that numbering -- confirm against a clean copy.
 */
289 static void
290 pthreadscheduler(Proc *p)
292 _Thread *self, *t;
294 _threaddebug(nil, "scheduler");
295 lock(&p->lock);
/* detach the caller from the proc while the scheduler runs */
296 self = p->thread;
297 p->thread = nil;
298 _threaddebug(self, "pausing");
/* an exiting thread leaves the proc's thread list and thread count */
300 if(self->exiting) {
301 _threaddebug(self, "exiting");
302 delthreadinproc(p, self);
303 p->nthread--;
/* procnext returns nil only when the proc has no threads left */
306 t = procnext(p, self);
307 if(t != nil) {
308 pthreadwakeupschedlocked(p, self, t);
/* a live caller waits here until it is selected again, then returns */
309 if(!self->exiting) {
310 pthreadsleepschedlocked(p, self);
311 _threaddebug(nil, "resume %d", self->id);
312 unlock(&p->lock);
313 return;
/* no thread left: unlink the proc and update the global proc counts */
317 if(t == nil) {
318 /* Tear down proc bookkeeping. Wait to free p. */
319 delproc(p);
320 lock(&threadnproclock);
321 if(p->sysproc)
322 --threadnsysproc;
/* when only system procs would remain, the whole program exits */
323 if(--threadnproc == threadnsysproc)
324 threadexitsall(p->msg);
325 unlock(&threadnproclock);
328 /* Tear down pthread. */
/* the main thread of the main proc is parked forever instead of exiting */
329 if(self->mainthread && p->mainproc) {
330 _threaddaemonize();
331 _threaddebug(self, "sleeper");
332 unlock(&p->lock);
333 /*
334 * Avoid bugs with main pthread exiting.
335 * When all procs are gone, threadexitsall above will happen.
336 */
337 for(;;)
338 sleep(60*60*1000);
/* ordinary exit: release the thread, and the proc too if it was the last */
340 _threadsetproc(nil);
341 free(self);
342 unlock(&p->lock);
343 if(t == nil)
344 free(p);
345 _threadpexit();
/*
 * Choose the thread that runs next in proc p, sleeping on p->runrend
 * until one is available.  Returns nil when the run queue is empty
 * and p has no threads left.  Otherwise the chosen thread is removed
 * from the run queue, recorded in p->thread, and p->nswitch is
 * incremented.  self is used only to label the debug output.
 * The one caller in this file, pthreadscheduler, holds p->lock.
 *
 * NOTE(review): brace-only lines are missing from this copy and each
 * line starts with a stray line-number token; the nesting described
 * here is inferred from the numbering gaps -- confirm against a
 * clean copy.
 */
348 static _Thread*
349 procnext(Proc *p, _Thread *self)
351 _Thread *t;
/* a pinned thread is the only candidate: wait until it is runnable */
353 if((t = p->pinthread) != nil){
354 while(!onlist(&p->runqueue, t)){
355 p->runrend.l = &p->lock;
356 _threaddebug(self, "scheduler sleep (pin)");
357 _procsleep(&p->runrend);
358 _threaddebug(self, "scheduler wake (pin)");
/* no pin: take the head of the run queue, waiting while it is empty */
360 } else
361 while((t = p->runqueue.head) == nil){
362 if(p->nthread == 0)
363 return nil;
364 if((t = p->idlequeue.head) != nil){
365 /*
366 * Run all the idling threads once.
367 */
368 while((t = p->idlequeue.head) != nil){
369 delthread(&p->idlequeue, t);
370 addthread(&p->runqueue, t);
/* re-test the run queue now that the idle threads have been moved */
372 continue;
374 p->runrend.l = &p->lock;
375 _threaddebug(self, "scheduler sleep");
376 _procsleep(&p->runrend);
377 _threaddebug(self, "scheduler wake");
/* sanity check: a pinned thread, if any, must be the one chosen */
380 if(p->pinthread && p->pinthread != t)
381 fprint(2, "p->pinthread %p t %p\n", p->pinthread, t);
382 assert(p->pinthread == nil || p->pinthread == t);
383 delthread(&p->runqueue, t);
385 p->thread = t;
386 p->nswitch++;
387 return t;
390 void
391 _threadsetsysproc(void)
393 lock(&threadnproclock);
394 if(++threadnsysproc == threadnproc)
395 threadexitsall(nil);
396 unlock(&threadnproclock);
397 proc()->sysproc = 1;
400 void**
401 procdata(void)
403 return &proc()->udata;
406 void**
407 threaddata(void)
409 return &proc()->thread->udata;
412 extern Jmp *(*_notejmpbuf)(void);
413 static Jmp*
414 threadnotejmp(void)
416 return &proc()->sigjmp;
419 /*
420 * debugging
421 */
422 void
423 threadsetname(char *fmt, ...)
425 va_list arg;
426 _Thread *t;
428 t = proc()->thread;
429 va_start(arg, fmt);
430 vsnprint(t->name, sizeof t->name, fmt, arg);
431 va_end(arg);
434 char*
435 threadgetname(void)
437 return proc()->thread->name;
440 void
441 threadsetstate(char *fmt, ...)
443 va_list arg;
444 _Thread *t;
446 t = proc()->thread;
447 va_start(arg, fmt);
448 vsnprint(t->state, sizeof t->name, fmt, arg);
449 va_end(arg);
452 int
453 threadid(void)
455 _Thread *t;
457 t = proc()->thread;
458 return t->id;
461 void
462 needstack(int n)
464 _Thread *t;
466 t = proc()->thread;
467 if(t->stk == nil)
468 return;
470 if((char*)&t <= (char*)t->stk
471 || (char*)&t - (char*)t->stk < 256+n){
472 fprint(2, "thread stack overflow: &t=%p tstk=%p n=%d\n", &t, t->stk, 256+n);
473 abort();
477 static int
478 singlethreaded(void)
480 return threadnproc == 1 && _threadprocs->nthread == 1;
483 /*
484 * locking
485 */
486 static int
487 threadqlock(QLock *l, int block, ulong pc)
489 /*print("threadqlock %p\n", l); */
490 lock(&l->l);
491 if(l->owner == nil){
492 l->owner = (*threadnow)();
493 /*print("qlock %p @%#x by %p\n", l, pc, l->owner); */
494 if(l->owner == nil) {
495 fprint(2, "%s: qlock uncontended owner=nil oops\n", argv0);
496 abort();
498 unlock(&l->l);
499 return 1;
501 if(!block){
502 unlock(&l->l);
503 return 0;
506 if(singlethreaded()){
507 fprint(2, "qlock deadlock\n");
508 abort();
511 /*print("qsleep %p @%#x by %p\n", l, pc, (*threadnow)()); */
512 addthread(&l->waiting, (*threadnow)());
513 unlock(&l->l);
515 _threadswitch();
517 if(l->owner != (*threadnow)()){
518 fprint(2, "%s: qlock pc=0x%lux owner=%p self=%p oops\n",
519 argv0, pc, l->owner, (*threadnow)());
520 abort();
522 if(l->owner == nil) {
523 fprint(2, "%s: qlock threadswitch owner=nil oops\n", argv0);
524 abort();
527 /*print("qlock wakeup %p @%#x by %p\n", l, pc, (*threadnow)()); */
528 return 1;
531 static void
532 threadqunlock(QLock *l, ulong pc)
534 _Thread *ready;
536 lock(&l->l);
537 /*print("qlock unlock %p @%#x by %p (owner %p)\n", l, pc, (*threadnow)(), l->owner); */
538 if(l->owner == 0){
539 fprint(2, "%s: qunlock pc=0x%lux owner=%p self=%p oops\n",
540 argv0, pc, l->owner, (*threadnow)());
541 abort();
543 if((l->owner = ready = l->waiting.head) != nil)
544 delthread(&l->waiting, l->owner);
545 /*
546 * N.B. Cannot call _threadready() before unlocking l->l,
547 * because the thread we are readying might:
548 * - be in another proc
549 * - start running immediately
550 * - and free l before we get a chance to run again
551 */
552 unlock(&l->l);
553 if(ready)
554 _threadready(l->owner);
557 static int
558 threadrlock(RWLock *l, int block, ulong pc)
560 USED(pc);
562 lock(&l->l);
563 if(l->writer == nil && l->wwaiting.head == nil){
564 l->readers++;
565 unlock(&l->l);
566 return 1;
568 if(!block){
569 unlock(&l->l);
570 return 0;
572 if(singlethreaded()){
573 fprint(2, "rlock deadlock\n");
574 abort();
576 addthread(&l->rwaiting, (*threadnow)());
577 unlock(&l->l);
578 _threadswitch();
579 return 1;
582 static int
583 threadwlock(RWLock *l, int block, ulong pc)
585 USED(pc);
587 lock(&l->l);
588 if(l->writer == nil && l->readers == 0){
589 l->writer = (*threadnow)();
590 unlock(&l->l);
591 return 1;
593 if(!block){
594 unlock(&l->l);
595 return 0;
597 if(singlethreaded()){
598 fprint(2, "wlock deadlock\n");
599 abort();
601 addthread(&l->wwaiting, (*threadnow)());
602 unlock(&l->l);
603 _threadswitch();
604 return 1;
607 static void
608 threadrunlock(RWLock *l, ulong pc)
610 _Thread *t;
612 USED(pc);
613 t = nil;
614 lock(&l->l);
615 --l->readers;
616 if(l->readers == 0 && (t = l->wwaiting.head) != nil){
617 delthread(&l->wwaiting, t);
618 l->writer = t;
620 unlock(&l->l);
621 if(t)
622 _threadready(t);
626 static void
627 threadwunlock(RWLock *l, ulong pc)
629 _Thread *t;
631 USED(pc);
632 lock(&l->l);
633 l->writer = nil;
634 assert(l->readers == 0);
635 while((t = l->rwaiting.head) != nil){
636 delthread(&l->rwaiting, t);
637 l->readers++;
638 _threadready(t);
640 t = nil;
641 if(l->readers == 0 && (t = l->wwaiting.head) != nil){
642 delthread(&l->wwaiting, t);
643 l->writer = t;
645 unlock(&l->l);
646 if(t)
647 _threadready(t);
650 /*
651 * sleep and wakeup
652 */
653 static void
654 threadrsleep(Rendez *r, ulong pc)
656 if(singlethreaded()){
657 fprint(2, "rsleep deadlock\n");
658 abort();
660 addthread(&r->waiting, proc()->thread);
661 qunlock(r->l);
662 _threadswitch();
663 qlock(r->l);
666 static int
667 threadrwakeup(Rendez *r, int all, ulong pc)
669 int i;
670 _Thread *t;
672 _threaddebug(nil, "rwakeup %p %d", r, all);
673 for(i=0;; i++){
674 if(i==1 && !all)
675 break;
676 if((t = r->waiting.head) == nil)
677 break;
678 _threaddebug(nil, "rwakeup %p %d -> wake %d", r, all, t->id);
679 delthread(&r->waiting, t);
680 _threadready(t);
681 _threaddebug(nil, "rwakeup %p %d -> loop", r, all);
683 _threaddebug(nil, "rwakeup %p %d -> total %d", r, all, i);
684 return i;
687 /*
688 * startup
689 */
691 static int threadargc;
692 static char **threadargv;
693 int mainstacksize;
694 extern int _p9usepwlibrary; /* getgrgid etc. smash the stack - tell _p9dir just say no */
695 static void
696 threadmainstart(void *v)
698 USED(v);
700 /*
701 * N.B. This call to proc() is a program's first call (indirectly) to a
702 * pthreads function while executing on a non-pthreads-allocated
703 * stack. If the pthreads implementation is using the stack pointer
704 * to locate the per-thread data, then this call will blow up.
705 * This means the pthread implementation is not suitable for
706 * running under libthread. Time to write your own. Sorry.
707 */
708 _p9usepwlibrary = 0;
709 threadmainproc = proc();
710 threadmain(threadargc, threadargv);
713 extern void (*_sysfatal)(char*, va_list);
/*
 * Program entry point supplied by the thread library.  It saves the
 * arguments, optionally sets up daemonizing, installs this file's
 * locking/rendezvous/notify hooks into the C library, creates the
 * main proc with a first thread running threadmainstart, and enters
 * procmain.  sysfatal is called if procmain ever returns.
 *
 * NOTE(review): each line of this copy begins with a stray
 * line-number token and the brace-only lines are missing; the code
 * is left untouched here.
 */
715 int
716 main(int argc, char **argv)
718 Proc *p;
719 _Thread *t;
720 char *opts;
722 argv0 = argv[0];
/* LIBTHREAD holds option words; unset is treated as empty */
724 opts = getenv("LIBTHREAD");
725 if(opts == nil)
726 opts = "";
/* "nodaemon" in LIBTHREAD or a set NOLIBTHREADDAEMONIZE skips the daemonize setup */
728 if(threadmaybackground() && strstr(opts, "nodaemon") == nil && getenv("NOLIBTHREADDAEMONIZE") == nil)
729 _threadsetupdaemonize();
/* saved for threadmainstart, which passes them to threadmain */
731 threadargc = argc;
732 threadargv = argv;
734 /*
735 * Install locking routines into C library.
736 */
737 _lock = _threadlock;
738 _unlock = _threadunlock;
739 _qlock = threadqlock;
740 _qunlock = threadqunlock;
741 _rlock = threadrlock;
742 _runlock = threadrunlock;
743 _wlock = threadwlock;
744 _wunlock = threadwunlock;
745 _rsleep = threadrsleep;
746 _rwakeup = threadrwakeup;
747 _notejmpbuf = threadnotejmp;
748 _pin = threadpin;
749 _unpin = threadunpin;
750 _sysfatal = threadsysfatal;
752 _pthreadinit();
/* create the main proc and bind it to the caller */
753 p = procalloc();
754 p->mainproc = 1;
755 _threadsetproc(p);
/* default main stack size when the program did not set one */
756 if(mainstacksize == 0)
757 mainstacksize = 256*1024;
758 atnotify(threadinfo, 1);
/* the first thread runs threadmainstart; procmain runs it directly */
759 t = _threadcreate(p, threadmainstart, nil, mainstacksize);
760 t->mainthread = 1;
761 procmain(p);
762 sysfatal("procmain returned in libthread");
763 /* does not return */
764 return 0;
767 /*
768 * hooray for linked lists
769 */
770 static void
771 addthread(_Threadlist *l, _Thread *t)
773 if(l->tail){
774 l->tail->next = t;
775 t->prev = l->tail;
776 }else{
777 l->head = t;
778 t->prev = nil;
780 l->tail = t;
781 t->next = nil;
784 static void
785 delthread(_Threadlist *l, _Thread *t)
787 if(t->prev)
788 t->prev->next = t->next;
789 else
790 l->head = t->next;
791 if(t->next)
792 t->next->prev = t->prev;
793 else
794 l->tail = t->prev;
797 /* inefficient but rarely used */
798 static int
799 onlist(_Threadlist *l, _Thread *t)
801 _Thread *tt;
803 for(tt = l->head; tt; tt=tt->next)
804 if(tt == t)
805 return 1;
806 return 0;
809 static void
810 addthreadinproc(Proc *p, _Thread *t)
812 _Threadlist *l;
814 l = &p->allthreads;
815 if(l->tail){
816 l->tail->allnext = t;
817 t->allprev = l->tail;
818 }else{
819 l->head = t;
820 t->allprev = nil;
822 l->tail = t;
823 t->allnext = nil;
826 static void
827 delthreadinproc(Proc *p, _Thread *t)
829 _Threadlist *l;
831 l = &p->allthreads;
832 if(t->allprev)
833 t->allprev->allnext = t->allnext;
834 else
835 l->head = t->allnext;
836 if(t->allnext)
837 t->allnext->allprev = t->allprev;
838 else
839 l->tail = t->allprev;
842 Proc *_threadprocs;
843 Lock _threadprocslock;
844 static Proc *_threadprocstail;
846 static void
847 addproc(Proc *p)
849 lock(&_threadprocslock);
850 if(_threadprocstail){
851 _threadprocstail->next = p;
852 p->prev = _threadprocstail;
853 }else{
854 _threadprocs = p;
855 p->prev = nil;
857 _threadprocstail = p;
858 p->next = nil;
859 unlock(&_threadprocslock);
862 static void
863 delproc(Proc *p)
865 lock(&_threadprocslock);
866 if(p->prev)
867 p->prev->next = p->next;
868 else
869 _threadprocs = p->next;
870 if(p->next)
871 p->next->prev = p->prev;
872 else
873 _threadprocstail = p->prev;
874 unlock(&_threadprocslock);
877 /*
878 * notify - for now just use the usual mechanisms
879 */
/*
 * Register or unregister note handler f.  Both arguments are passed
 * straight through to atnotify.
 */
void
threadnotify(int (*f)(void*, char*), int in)
{
	atnotify(f, in);
}
886 static int
887 onrunqueue(Proc *p, _Thread *t)
889 _Thread *tt;
891 for(tt=p->runqueue.head; tt; tt=tt->next)
892 if(tt == t)
893 return 1;
894 return 0;
897 /*
898 * print state - called from SIGINFO
899 */
900 static int
901 threadinfo(void *v, char *s)
903 Proc *p;
904 _Thread *t;
906 if(strcmp(s, "quit") != 0 && strcmp(s, "sys: status request") != 0)
907 return 0;
909 for(p=_threadprocs; p; p=p->next){
910 fprint(2, "proc %p %s%s\n", (void*)p->osprocid, p->msg,
911 p->sysproc ? " (sysproc)": "");
912 for(t=p->allthreads.head; t; t=t->allnext){
913 fprint(2, "\tthread %d %s: %s %s\n",
914 t->id,
915 t == p->thread ? "Running" :
916 onrunqueue(p, t) ? "Ready" : "Sleeping",
917 t->state, t->name);
920 return 1;