commit - 984e353160593b20d1e2944e1f2e9ce2117c8490
commit + 06bb4ed20d855b60e39c1125d8d715ba8892265b
blob - 5ecdf4053be35a841608e69beb68a04cba3a43c9
blob + a427f2a5eecfab874dc30de17b9901c50c729e29
--- bin/9l
+++ bin/9l
extralibs="$extralibs -lutil"
;;
*Linux*) ld=gcc
- extralibs="$extralibs -lutil"
+ extralibs="$extralibs -lutil -lpthread"
;;
*Darwin*) ld=gcc ;;
*SunOS*) ld="${CC9:-cc} -g"
blob - 93a9104a9c29b5316fb4d2e093f8f49302ebbc27
blob + 1db39606a764c3ff0abf9416900bf7eebc68bc7b
--- include/libc.h
+++ include/libc.h
/*
* synchronization
*/
-typedef
-struct Lock {
- int val;
-} Lock;
-
-extern int _tas(int*);
+typedef struct Lock Lock;
+struct Lock
+{
+#ifdef PLAN9_PTHREADS
+ int init;
+ pthread_mutex_t mutex;
+#else
+ int val;
+#endif
+};
extern void lock(Lock*);
extern void unlock(Lock*);
extern int canlock(Lock*);
+/*
+ * Used to implement process sleep and wakeup,
+ * either in terms of pthreads or our own primitives.
+ * This will be more portable than writing our own
+ * per-system implementations, and on some systems
+ * non-pthreads threading implementations break libc
+ * (cough, Linux, cough).
+ */
+typedef struct _Procrend _Procrend;
+struct _Procrend
+{
+ int asleep;
+ Lock *l;
+ void *arg;
+#ifdef PLAN9_PTHREADS
+ pthread_cond_t cond;
+#endif
+};
+
+extern void _procsleep(_Procrend*);
+extern void _procwakeup(_Procrend*);
+
typedef struct QLp QLp;
struct QLp
{
- int inuse;
+ Lock inuse;
QLp *next;
+ _Procrend rend;
char state;
};
extern void qlock(QLock*);
extern void qunlock(QLock*);
extern int canqlock(QLock*);
-extern void _qlockinit(ulong (*)(ulong, ulong)); /* called only by the thread library */
+extern void _qlockinit(void(*)(_Procrend*), void(*)(_Procrend*)); /* called only by the thread library */
typedef
struct RWLock
blob - ce2d73bb7b842d8ce4d1b3d56bd93dafb31d7d13
blob + f2a12599795d4840f965beb3dbde406a9b4e3833
--- include/thread.h
+++ include/thread.h
/* the next variables are used internally to alt
* they need not be initialized
*/
- Channel **tag; /* pointer to rendez-vous tag */
+ struct Thread *thread; /* thread waiting on this alt */
int entryno; /* entry number */
};
blob - a7f97fc69f42fad92a0e46bfa572407a2e847104
blob + 0cd8e39721fc467ae1471ac68083bcd346d4aa31
--- include/u.h
+++ include/u.h
# undef _NEEDUSHORT
# undef _NEEDUINT
# undef _NEEDULONG
+# include <pthread.h>
+# define PLAN9_PTHREADS
# endif
#endif
#if defined(__sun__)
# undef _NEEDUSHORT
# undef _NEEDUINT
# undef _NEEDULONG
+# include <pthread.h>
+# define PLAN9_PTHREADS
#endif
#if defined(__FreeBSD__)
# include <sys/types.h>
# undef _NEEDUSHORT
# undef _NEEDUINT
# define _NEEDLL 1
+# include <pthread.h>
+# define PLAN9_PTHREADS
#endif
+
typedef signed char schar;
typedef unsigned int u32int;
typedef int s32int;
blob - f4704c6086d1e6c98884033f39ad091912cac62b
blob + 4f976359b400ebf3acd25feb5039f9bd500d0b9d
--- src/lib9/ffork-Linux.c
+++ src/lib9/ffork-Linux.c
+#include "ffork-pthread.c"
+
+#ifdef OLD
/*
* Is nothing simple?
*
return getpid();
}
+#endif
blob - 80f65b339862419454cb0e22425892fc9f007562
blob + eb3ea21dd6e8214611860a9216008d31df637364
--- src/lib9/lock.c
+++ src/lib9/lock.c
#include <unistd.h>
#include <sys/time.h>
#include <sched.h>
+#include <errno.h>
#include <libc.h>
-int _ntas;
-static int
-_xtas(void *v)
+static pthread_mutex_t initmutex = PTHREAD_MUTEX_INITIALIZER;
+
+static void
+lockinit(Lock *lk)
{
- int x;
+ pthread_mutexattr_t attr;
- _ntas++;
- x = _tas(v);
- return x;
+ pthread_mutex_lock(&initmutex);
+ if(lk->init == 0){
+ pthread_mutexattr_init(&attr);
+ pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL);
+ pthread_mutex_init(&lk->mutex, &attr);
+ pthread_mutexattr_destroy(&attr);
+ lk->init = 1;
+ }
+ pthread_mutex_unlock(&initmutex);
}
-int
-canlock(Lock *l)
+void
+lock(Lock *lk)
{
- return !_xtas(&l->val);
+ if(!lk->init)
+ lockinit(lk);
+ if(pthread_mutex_lock(&lk->mutex) != 0)
+ abort();
}
-void
-unlock(Lock *l)
+int
+canlock(Lock *lk)
{
- l->val = 0;
+ int r;
+
+ if(!lk->init)
+ lockinit(lk);
+ r = pthread_mutex_trylock(&lk->mutex);
+ if(r == 0)
+ return 1;
+ if(r == EBUSY)
+ return 0;
+ abort();
}
void
-lock(Lock *lk)
+unlock(Lock *lk)
{
- int i;
-
- /* once fast */
- if(!_xtas(&lk->val))
- return;
- /* a thousand times pretty fast */
- for(i=0; i<1000; i++){
- if(!_xtas(&lk->val))
- return;
- sched_yield();
- }
- /* now nice and slow */
- for(i=0; i<1000; i++){
- if(!_xtas(&lk->val))
- return;
- usleep(100*1000);
- }
- /* take your time */
- while(_xtas(&lk->val))
- usleep(1000*1000);
+ if(pthread_mutex_unlock(&lk->mutex) != 0)
+ abort();
}
blob - d83a4b7f7bb583c18ad6f307ffab48c7d7c4f76b
blob + 798b08f385c035316777e4da164e38e4e640da4d
--- src/lib9/qlock.c
+++ src/lib9/qlock.c
Waking,
};
-static ulong (*_rendezvousp)(ulong, ulong) = rendezvous;
+static void (*procsleep)(_Procrend*) = _procsleep;
+static void (*procwakeup)(_Procrend*) = _procwakeup;
/* this gets called by the thread library ONLY to get us to use its rendezvous */
void
-_qlockinit(ulong (*r)(ulong, ulong))
+_qlockinit(void (*sleep)(_Procrend*), void (*wakeup)(_Procrend*))
{
- _rendezvousp = r;
+ procsleep = sleep;
+ procwakeup = wakeup;
}
/* find a free shared memory location to queue ourselves in */
fprint(2, "qlock: out of qlp\n");
abort();
}
- if(_tas(&(p->inuse)) == 0){
+ if(canlock(&p->inuse)){
ql.p = p;
p->next = nil;
break;
p->next = mp;
q->tail = mp;
mp->state = Queuing;
+ mp->rend.l = &q->lock;
+ _procsleep(&mp->rend);
unlock(&q->lock);
-
- /* wait */
- while((*_rendezvousp)((ulong)mp, 1) == ~0)
- ;
assert(mp->state == Waking);
- mp->inuse = 0;
+ unlock(&mp->inuse);
}
void
q->head = p->next;
if(q->head == nil)
q->tail = nil;
- unlock(&q->lock);
p->state = Waking;
- while((*_rendezvousp)((ulong)p, 0x12345) == ~0)
- ;
+ _procwakeup(&p->rend);
+ unlock(&q->lock);
return;
}
q->locked = 0;
q->tail = mp;
mp->next = nil;
mp->state = QueuingR;
+ mp->rend.l = &q->lock;
+ _procsleep(&mp->rend);
unlock(&q->lock);
-
- /* wait in kernel */
- while((*_rendezvousp)((ulong)mp, 1) == ~0)
- ;
assert(mp->state == Waking);
- mp->inuse = 0;
+ unlock(&mp->inuse);
}
int
if(q->head == 0)
q->tail = 0;
q->writer = 1;
- unlock(&q->lock);
/* wakeup waiter */
p->state = Waking;
- while((*_rendezvousp)((ulong)p, 0) == ~0)
- ;
+ _procwakeup(&p->rend);
+ unlock(&q->lock);
}
void
q->tail = mp;
mp->next = nil;
mp->state = QueuingW;
- unlock(&q->lock);
/* wait in kernel */
- while((*_rendezvousp)((ulong)mp, 1) == ~0)
- ;
+ mp->rend.l = &q->lock;
+ _procsleep(&mp->rend);
+ unlock(&q->lock);
assert(mp->state == Waking);
- mp->inuse = 0;
+ unlock(&mp->inuse);
}
int
q->head = p->next;
if(q->head == nil)
q->tail = nil;
- unlock(&q->lock);
p->state = Waking;
- while((*_rendezvousp)((ulong)p, 0) == ~0)
- ;
+ _procwakeup(&p->rend);
+ unlock(&q->lock);
return;
}
q->head = p->next;
q->readers++;
p->state = Waking;
- while((*_rendezvousp)((ulong)p, 0) == ~0)
- ;
+ _procwakeup(&p->rend);
}
if(q->head == nil)
q->tail = nil;
r->l->head = t->next;
if(r->l->head == nil)
r->l->tail = nil;
- unlock(&r->l->lock);
t->state = Waking;
- while((*_rendezvousp)((ulong)t, 0x12345) == ~0)
- ;
- }else{
+ _procwakeup(&t->rend);
+ }else
r->l->locked = 0;
- unlock(&r->l->lock);
- }
/* wait for a wakeup */
- while((*_rendezvousp)((ulong)me, 0x23456) == ~0)
- ;
+ me->rend.l = &r->l->lock;
+ _procsleep(&me->rend);
+
assert(me->state == Waking);
- me->inuse = 0;
+ unlock(&me->inuse);
if(!r->l->locked){
fprint(2, "rsleep: not locked after wakeup\n");
abort();
;
return i;
}
+
+void
+_procsleep(_Procrend *rend)
+{
+//print("sleep %p %d\n", rend, getpid());
+ pthread_cond_init(&rend->cond, 0);
+ rend->asleep = 1;
+ while(rend->asleep)
+ pthread_cond_wait(&rend->cond, &rend->l->mutex);
+ pthread_cond_destroy(&rend->cond);
+}
+
+void
+_procwakeup(_Procrend *rend)
+{
+//print("wakeup %p\n", rend);
+ rend->asleep = 0;
+ pthread_cond_signal(&rend->cond);
+}
+
blob - 68eaa95e2dc9534d66a23631a3a607014d40a33d
blob + cdb8376e54b8d21dd68cbd70065599ccf80faa58
--- src/libthread/channel.c
+++ src/libthread/channel.c
static Lock chanlock; /* central channel access lock */
-static void enqueue(Alt*, Channel**);
+static void enqueue(Alt*, Thread*);
static void dequeue(Alt*);
static int altexec(Alt*, int);
/* are there senders or receivers blocked? */
otherop = (CHANSND+CHANRCV) - a->op;
for(i=0; i<c->nentry; i++)
- if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil){
+ if(c->qentry[i] && c->qentry[i]->op==otherop && c->qentry[i]->thread==nil){
_threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c);
return 1;
}
_alt(Alt *alts)
{
Alt *a, *xa;
- Channel *volatile c;
+ Channel *c;
int n, s;
- ulong r;
Thread *t;
/*
xa->entryno = -1;
if(xa->op == CHANNOP)
continue;
-
+
c = xa->c;
if(c==nil){
unlock(&chanlock);
}
/* enqueue on all channels. */
- c = nil;
+ t->altc = nil;
for(xa=alts; xa->op!=CHANEND; xa++){
if(xa->op==CHANNOP)
continue;
- enqueue(xa, (Channel**)&c);
+ enqueue(xa, t);
}
/*
* is interrupted -- someone else might come
* along and try to rendezvous with us, so
* we need to be here.
+ *
+ * actually, now we're assuming no interrupts.
*/
- Again:
+ /*Again:*/
t->alt = alts;
t->chan = Chanalt;
-
- unlock(&chanlock);
+ t->altrend.l = &chanlock;
_procsplx(s);
- r = _threadrendezvous((ulong)&c, 0);
+ _threadsleep(&t->altrend);
s = _procsplhi();
- lock(&chanlock);
- if(r==~0){ /* interrupted */
- if(c!=nil) /* someone will meet us; go back */
- goto Again;
- c = (Channel*)~0; /* so no one tries to meet us */
- }
-
/* dequeue from channels, find selected one */
a = nil;
+ c = t->altc;
for(xa=alts; xa->op!=CHANEND; xa++){
if(xa->op==CHANNOP)
continue;
}
static void
-enqueue(Alt *a, Channel **c)
+enqueue(Alt *a, Thread *t)
{
int i;
_threaddebug(DBGCHAN, "Queueing alt %p on channel %p", a, a->c);
- a->tag = c;
+ a->thread = t;
i = emptyentry(a->c);
a->c->qentry[i] = a;
}
b = nil;
me = a->v;
for(i=0; i<c->nentry; i++)
- if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil)
+ if(c->qentry[i] && c->qentry[i]->op==otherop && c->qentry[i]->thread==nil)
if(nrand(++n) == 0)
b = c->qentry[i];
if(b != nil){
else
altcopy(waiter, me, c->e);
}
- *b->tag = c; /* commits us to rendezvous */
+ b->thread->altc = c;
+ _procwakeup(&b->thread->altrend);
+ _threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)(void*)&chanlock);
_threaddebug(DBGCHAN, "unlocking the chanlock");
unlock(&chanlock);
_procsplx(spl);
- _threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)(void*)&chanlock);
- while(_threadrendezvous((ulong)b->tag, 0) == ~0)
- ;
return 1;
}
blob - f5f0d6c06f3a678159437c3868a8f6489620153c
blob + 5dee4c481ba86f6068d6c483a8829b84e6ec94a1
--- src/libthread/create.c
+++ src/libthread/create.c
Pqueue _threadpq;
int _threadprocs;
+int __pthread_nonstandard_stacks;
static int nextID(void);
Thread *t;
char *s;
+ __pthread_nonstandard_stacks = 1;
if(stacksize < 32)
sysfatal("bad stacksize %d", stacksize);
t = _threadmalloc(sizeof(Thread), 1);
blob - 0a2fccb70f8c8264a037670bb11ce2e475e1ea3b
blob + 6129f3e0e81caa7bdcdd6c45f5c75c27638cfa8e
--- src/libthread/main.c
+++ src/libthread/main.c
//_threaddebuglevel = (DBGSCHED|DBGCHAN|DBGREND)^~0;
_systhreadinit();
- _qlockinit(_threadrendezvous);
+ _qlockinit(_threadsleep, _threadwakeup);
_sysfatal = _threadsysfatal;
notify(_threadnote);
if(mainstacksize == 0)
a = _threadmalloc(sizeof *a, 1);
a->argc = argc;
a->argv = argv;
-
+malloc(10);
p = _newproc(mainlauncher, a, mainstacksize, "threadmain", 0, 0);
+malloc(10);
_schedinit(p);
abort(); /* not reached */
return 0;
{
Mainarg *a;
+malloc(10);
a = arg;
+malloc(10);
threadmain(a->argc, a->argv);
threadexits("threadmain");
}
blob - 221cb818214b7e7b017e9fe97c42c1f3ee9ede72
blob + d1a095e77874792531a737a5183048a6bfd84afa
--- src/libthread/mkfile
+++ src/libthread/mkfile
ioreadn.$O\
iosleep.$O\
iowrite.$O\
- kill.$O\
lib.$O\
main.$O\
memset.$O\
<$PLAN9/src/mksyslib
tprimes: tprimes.$O $PLAN9/lib/$LIB
- $LD -o tprimes tprimes.$O $LDFLAGS -lthread -l9 -lfmt -lutf
+ $LD -o tprimes tprimes.$O $LDFLAGS -lthread -l9
texec: texec.$O $PLAN9/lib/$LIB
- $LD -o texec texec.$O $LDFLAGS -lthread -l9 -lfmt -lutf
+ $LD -o texec texec.$O $LDFLAGS -lthread -l9
trend: trend.$O $PLAN9/lib/$LIB
- $LD -o trend trend.$O $LDFLAGS -lthread -l9 -lfmt -lutf
+ $LD -o trend trend.$O $LDFLAGS -lthread -l9
CLEANFILES=$CLEANFILES tprimes texec
blob - 70eb0ae8bf1c3b78be0a31c36e36187a58a0a27c
blob + 4451fa4d3c570102bc6f940d011a133f7f06a46c
--- src/libthread/rendez.c
+++ src/libthread/rendez.c
#include "threadimpl.h"
-Rgrp _threadrgrp;
-static int isdirty;
int _threadhighnrendez;
int _threadnrendez;
-static int nrendez;
-static ulong
-finish(Thread *t, ulong val)
+void
+_threadsleep(_Procrend *r)
{
- ulong ret;
+ Thread *t;
- ret = t->rendval;
- t->rendval = val;
- while(t->state == Running)
- sleep(0);
- lock(&t->proc->lock);
- if(t->state == Rendezvous){ /* not always true: might be Dead */
- t->state = Ready;
- _threadready(t);
- }
- unlock(&t->proc->lock);
- return ret;
-}
-
-ulong
-_threadrendezvous(ulong tag, ulong val)
-{
- ulong ret;
- Thread *t, **l;
-
- lock(&_threadrgrp.lock);
-_threadnrendez++;
- l = &_threadrgrp.hash[tag%nelem(_threadrgrp.hash)];
- for(t=*l; t; l=&t->rendhash, t=*l){
- if(t->rendtag==tag){
- _threaddebug(DBGREND, "Rendezvous with thread %d.%d", t->proc->pid, t->id);
- *l = t->rendhash;
- ret = finish(t, val);
- --nrendez;
- unlock(&_threadrgrp.lock);
- return ret;
- }
- }
-
- /* Going to sleep here. */
t = _threadgetproc()->thread;
- t->rendbreak = 0;
- t->inrendez = 1;
- t->rendtag = tag;
- t->rendval = val;
- t->rendhash = *l;
- *l = t;
- ++nrendez;
- if(nrendez > _threadhighnrendez)
- _threadhighnrendez = nrendez;
- _threaddebug(DBGREND, "Rendezvous for tag %lud (m=%d)", t->rendtag, t->moribund);
- unlock(&_threadrgrp.lock);
+ r->arg = t;
t->nextstate = Rendezvous;
+ t->inrendez = 1;
+ unlock(r->l);
_sched();
t->inrendez = 0;
- _threaddebug(DBGREND, "Woke after rendezvous; val is %lud", t->rendval);
- return t->rendval;
+ lock(r->l);
}
-/*
- * This is called while holding _threadpq.lock and p->lock,
- * so we can't lock _threadrgrp.lock. Instead our caller has
- * to call _threadbreakrendez after dropping those locks.
- */
void
-_threadflagrendez(Thread *t)
+_threadwakeup(_Procrend *r)
{
- t->rendbreak = 1;
- isdirty = 1;
-}
+ Thread *t;
-void
-_threadbreakrendez(void)
-{
- int i;
- Thread *t, **l;
-
- if(isdirty == 0)
+ t = r->arg;
+ while(t->state == Running)
+ sleep(0);
+ lock(&t->proc->lock);
+ if(t->state == Dead){
+ unlock(&t->proc->lock);
return;
- lock(&_threadrgrp.lock);
- if(isdirty == 0){
- unlock(&_threadrgrp.lock);
- return;
}
- isdirty = 0;
- for(i=0; i<nelem(_threadrgrp.hash); i++){
- l = &_threadrgrp.hash[i];
- for(t=*l; t; t=*l){
- if(t->rendbreak){
- *l = t->rendhash;
- finish(t, ~0);
- }else
- l=&t->rendhash;
- }
- }
- unlock(&_threadrgrp.lock);
+ assert(t->state == Rendezvous && t->inrendez);
+ t->state = Ready;
+ _threadready(t);
+ unlock(&t->proc->lock);
}
blob - df8014b6b09d3946796e618da04a9517bca0ab0b
blob + f9e680fd5b17d7ce2272ce24fc141a7ea6d13372
--- src/libthread/sched.c
+++ src/libthread/sched.c
unlock(&p->lock);
while(_setlabel(&p->sched))
;
+malloc(10);
_threaddebug(DBGSCHED, "top of schedinit, _threadexitsallstatus=%p", _threadexitsallstatus);
if(_threadexitsallstatus)
_exits(_threadexitsallstatus);
p->threads.tail = t->prevt;
unlock(&p->lock);
if(t->inrendez){
- _threadflagrendez(t);
- _threadbreakrendez();
+ abort();
+ // _threadflagrendez(t);
+ // _threadbreakrendez();
}
_stackfree(t->stk);
free(t->cmdname);
Resched:
p = _threadgetproc();
//fprint(2, "p %p\n", p);
+malloc(10);
if((t = p->thread) != nil){
needstack(512);
// _threaddebug(DBGSCHED, "pausing, state=%s set %p goto %p",
// psstate(t->state), &t->sched, &p->sched);
+print("swap\n");
if(_setlabel(&t->sched)==0)
_gotolabel(&p->sched);
_threadstacklimit(t->stk, t->stk+t->stksize);
return p->nsched++;
}else{
+malloc(10);
t = runthread(p);
if(t == nil){
_threaddebug(DBGSCHED, "all threads gone; exiting");
}
t->state = Running;
t->nextstate = Ready;
+malloc(10);
+print("gotolabel\n");
_gotolabel(&t->sched);
for(;;);
}
blob - 7e44e6464d813607c79c7d364e925ce02729a7d0
blob + 851186e32eff2865d6a6b50d12b422d8de43abaa
--- src/libthread/threadimpl.h
+++ src/libthread/threadimpl.h
char *cmdname; /* ptr to name of thread */
- int inrendez;
- Thread *rendhash; /* Trgrp linked list */
- ulong rendtag; /* rendezvous tag */
- ulong rendval; /* rendezvous value */
- int rendbreak; /* rendezvous has been taken */
+ int inrendez;
+ Channel *altc;
+ _Procrend altrend;
Chanstate chan; /* which channel operation is current */
Alt *alt; /* pointer to current alt structure (debugging) */
void _schedinit(void*);
void _systhreadinit(void);
void _threadassert(char*);
-void _threadbreakrendez(void);
void __threaddebug(ulong, char*, ...);
#define _threaddebug if(!_threaddebuglevel){}else __threaddebug
void _threadexitsall(char*);
-void _threadflagrendez(Thread*);
Proc* _threadgetproc(void);
extern void _threadmultiproc(void);
Proc* _threaddelproc(void);
void _threadnote(void*, char*);
void _threadready(Thread*);
void _threadidle(void);
-ulong _threadrendezvous(ulong, ulong);
+void _threadsleep(_Procrend*);
+void _threadwakeup(_Procrend*);
void _threadsignal(void);
void _threadsysfatal(char*, va_list);
long _xdec(long*);
blob - 89d30c0366bfb446f129c129948df9b8561af677
blob + 0e7194510c10ce34cab931d1ee60537f25429764
--- src/libthread/tprimes.c
+++ src/libthread/tprimes.c
int i;
Channel *c;
+malloc(10);
ARGBEGIN{
case 'D':
_threaddebuglevel = atoi(ARGF());