Commit Diff


commit - 1544f90960275dc9211bde30329c3258e0e1bf38
commit + 619085f0b4a85104ef6c7496f9ce1f46e9b17c82
blob - /dev/null
blob + 3c9614e60c8b6059218f42caf6d7a785be17ca64 (mode 644)
--- /dev/null
+++ src/libthread/channel.c
@@ -0,0 +1,412 @@
+#include "u.h"
+#include "libc.h"
+#include "thread.h"
+#include "threadimpl.h"
+
+/*
+ * One can go through a lot of effort to avoid this global lock.
+ * You have to put locks in all the channels and all the Alt
+ * structures.  At the beginning of an alt you have to lock all
+ * the channels, but then to try to actually exec an op you
+ * have to lock the other guy's alt structure, so that other 
+ * people aren't trying to use him in some other op at the
+ * same time.
+ * 
+ * For Plan 9 apps, it's just not worth the extra effort.
+ */
+static QLock chanlock;
+
+/*
+ * Allocate a channel whose elements are elemsize bytes each and
+ * whose buffer holds bufsize elements (0 means unbuffered).
+ * The buffer is carved out of the same allocation, immediately
+ * after the Channel header, so freeing the returned pointer
+ * releases both.
+ * Exits via sysfatal if memory cannot be allocated.
+ */
+Channel*
+chancreate(int elemsize, int bufsize)
+{
+	Channel *c;
+
+	c = malloc(sizeof *c+bufsize*elemsize);
+	if(c == nil)
+		sysfatal("chancreate malloc: %r");
+	memset(c, 0, sizeof *c);
+	c->elemsize = elemsize;
+	c->bufsize = bufsize;
+	c->nbuf = 0;
+	c->buf = (uchar*)(c+1);	/* buffer starts right after the header */
+	return c;
+}
+
+/*
+ * Give channel c a print-style formatted name, freeing any
+ * name it already had.
+ */
+void
+chansetname(Channel *c, char *fmt, ...)
+{
+	va_list ap;
+	char *s;
+
+	va_start(ap, fmt);
+	s = vsmprint(fmt, ap);
+	va_end(ap);
+
+	free(c->name);
+	c->name = s;
+}
+
+/*
+ * Release a channel: its name, both arrays of queued Alts,
+ * and the channel itself.  A nil channel is ignored.
+ * bug - work out races
+ */
+void
+chanfree(Channel *c)
+{
+	if(c != nil){
+		free(c->name);
+		free(c->arecv.a);
+		free(c->asend.a);
+		free(c);
+	}
+}
+
+/*
+ * Append alt to the growable array a, extending the storage
+ * by 16 slots whenever it is full.
+ * Exits via sysfatal if the array cannot be grown; the old
+ * pointer and capacity are only replaced once realloc is
+ * known to have succeeded.
+ */
+static void
+addarray(_Altarray *a, Alt *alt)
+{
+	void *p;
+
+	if(a->n == a->m){
+		p = realloc(a->a, (a->m+16)*sizeof a->a[0]);
+		if(p == nil)
+			sysfatal("addarray realloc: %r");
+		a->a = p;
+		a->m += 16;
+	}
+	a->a[a->n++] = alt;
+}
+
+/*
+ * Remove entry i from a by overwriting it with the last
+ * entry.  Order is not preserved.
+ */
+static void
+delarray(_Altarray *a, int i)
+{
+	int last;
+
+	last = --a->n;
+	a->a[i] = a->a[last];
+}
+
+/*
+ * doesn't really work for things other than CHANSND and CHANRCV
+ * but is only used as arg to chanarray, which can handle it
+ */
+#define otherop(op)	(CHANSND+CHANRCV-(op))
+
+/*
+ * Return the array of Alts queued on c for operation op:
+ * asend for CHANSND, arecv for CHANRCV, nil for anything else.
+ */
+static _Altarray*
+chanarray(Channel *c, uint op)
+{
+	if(op == CHANSND)
+		return &c->asend;
+	if(op == CHANRCV)
+		return &c->arecv;
+	return nil;
+}
+
+/*
+ * Report whether the operation described by a could be
+ * carried out right now.
+ */
+static int
+altcanexec(Alt *a)
+{
+	Channel *ch;
+	_Altarray *peers;
+
+	if(a->op == CHANNOP)
+		return 0;
+	ch = a->c;
+
+	/* unbuffered: need someone queued for the opposite op */
+	if(ch->bufsize == 0){
+		peers = chanarray(ch, otherop(a->op));
+		return peers && peers->n;
+	}
+
+	/* buffered: senders need room, receivers need data */
+	if(a->op == CHANSND)
+		return ch->nbuf < ch->bufsize;
+	if(a->op == CHANRCV)
+		return ch->nbuf > 0;
+	return 0;
+}
+
+/*
+ * Add a to the queue of Alts waiting on its channel for its
+ * operation.  Only CHANSND and CHANRCV have a queue; any other
+ * op is a caller bug and aborts, as altdequeue does.
+ */
+static void
+altqueue(Alt *a)
+{
+	_Altarray *ar;
+
+	ar = chanarray(a->c, a->op);
+	if(ar == nil){
+		fprint(2, "bad use of altqueue op=%d\n", a->op);
+		abort();
+	}
+	addarray(ar, a);
+}
+
+/*
+ * Remove a from the queue it was placed on by altqueue.
+ * Aborts if a's op has no queue or a is not on it.
+ */
+static void
+altdequeue(Alt *a)
+{
+	int i;
+	_Altarray *q;
+
+	q = chanarray(a->c, a->op);
+	if(q == nil){
+		fprint(2, "bad use of altdequeue op=%d\n", a->op);
+		abort();
+	}
+
+	i = 0;
+	while(i < q->n && q->a[i] != a)
+		i++;
+	if(i == q->n){
+		fprint(2, "cannot find self in altdq\n");
+		abort();
+	}
+	delarray(q, i);
+}
+
+/*
+ * Dequeue every entry of the alt list a, which is terminated
+ * by CHANEND or CHANNOBLK.  CHANNOP entries were never queued
+ * and are skipped.
+ */
+static void
+altalldequeue(Alt *a)
+{
+	Alt *p;
+
+	for(p=a; p->op!=CHANEND && p->op!=CHANNOBLK; p++){
+		if(p->op == CHANNOP)
+			continue;
+		altdequeue(p);
+	}
+}
+
+/*
+ * Copy n bytes from src to dst.  A nil dst discards the data;
+ * a nil src stores n zero bytes instead.
+ */
+static void
+amove(void *dst, void *src, uint n)
+{
+	if(dst == nil)
+		return;
+	if(src != nil)
+		memmove(dst, src, n);
+	else
+		memset(dst, 0, n);
+}
+
+/*
+ * Actually move the data around.  There are up to three
+ * players: the sender, the receiver, and the channel itself.
+ * If the channel is unbuffered or the buffer is empty,
+ * data goes from sender to receiver.  If the channel is full,
+ * the receiver removes some from the channel and the sender
+ * gets to put some in.
+ *
+ * s and r are the two parties, in either order; they are
+ * swapped below if s turns out to be the receiver.  r may be
+ * nil when there is no counterpart, in which case the transfer
+ * is between the one party and the channel buffer.  s may be
+ * nil only if r is nil too, which is a no-op.
+ */
+static void
+altcopy(Alt *s, Alt *r)
+{
+	Alt *t;
+	Channel *c;
+	uchar *cp;
+
+	/*
+	 * Work out who is sender and who is receiver
+	 */
+	if(s == nil && r == nil)
+		return;
+	assert(s != nil);
+	/* take the channel now: after the swap s itself may be nil */
+	c = s->c;
+	if(s->op == CHANRCV){
+		t = s;
+		s = r;
+		r = t;
+	}
+	assert(s==nil || s->op == CHANSND);
+	assert(r==nil || r->op == CHANRCV);
+
+	/*
+	 * Channel is empty (or unbuffered) - copy directly.
+	 */
+	if(s && r && c->nbuf == 0){
+		amove(r->v, s->v, c->elemsize);
+		return;
+	}
+
+	/*
+	 * Otherwise it's always okay to receive and then send.
+	 */
+	if(r){
+		/* the oldest element lives at index off; consume it and wrap */
+		cp = c->buf + c->off*c->elemsize;
+		amove(r->v, cp, c->elemsize);
+		--c->nbuf;
+		if(++c->off == c->bufsize)
+			c->off = 0;
+	}
+	if(s){
+		/* the next free slot is nbuf elements past off, modulo bufsize */
+		cp = c->buf + (c->off+c->nbuf)%c->bufsize*c->elemsize;
+		amove(cp, s->v, c->elemsize);
+		++c->nbuf;
+	}
+}
+
+/*
+ * Carry out the operation described by a.  If some Alt is
+ * queued on the channel for the opposite operation, one is
+ * picked at random as the counterpart: the data is moved, all
+ * of the counterpart's alt entries are dequeued, and its thread
+ * is made ready.  Otherwise the transfer is between a and the
+ * channel buffer alone.
+ */
+static void
+altexec(Alt *a)
+{
+	int i;
+	_Altarray *ar;
+	Alt *other;
+	Channel *c;
+
+	c = a->c;
+	ar = chanarray(c, otherop(a->op));
+	if(ar && ar->n){
+		i = rand()%ar->n;
+		other = ar->a[i];
+		altcopy(a, other);
+		/* other->xalt is the start of the alt list other belongs to */
+		altalldequeue(other->xalt);
+		/* record which entry ran; chanalt reads it back as a[0].xalt */
+		other->xalt[0].xalt = other;
+		_threadready(other->thread);
+	}else
+		altcopy(a, nil);
+}
+
+/* set dbgalt nonzero to trace every chanalt call with print */
+#define dbgalt 0
+/*
+ * Perform one of the channel operations listed in a, an array
+ * of Alts terminated by an entry whose op is CHANEND (block if
+ * nothing can proceed) or CHANNOBLK (do not block).
+ *
+ * If one or more entries can execute immediately, one is chosen
+ * at random and executed.  Otherwise, with CHANNOBLK, return -1;
+ * with CHANEND, queue every non-CHANNOP entry on its channel and
+ * switch away until another thread executes one of them.
+ *
+ * Returns the index in a of the entry that was executed, or -1.
+ * The thread and xalt fields of every entry are overwritten.
+ */
+int
+chanalt(Alt *a)
+{
+	int i, j, ncan, n, canblock;
+	Channel *c;
+	_Thread *t;
+
+	/* count the entries and find out which terminator was used */
+	for(i=0; a[i].op != CHANEND && a[i].op != CHANNOBLK; i++)
+		;
+	n = i;
+	canblock = a[i].op == CHANEND;
+
+	/* every entry points at its owning thread and at the start of the list */
+	t = proc()->thread;
+	for(i=0; i<n; i++){
+		a[i].thread = t;
+		a[i].xalt = a;
+	}
+	qlock(&chanlock);
+if(dbgalt) print("alt ");
+	/* first pass: count how many entries could execute now */
+	ncan = 0;
+	for(i=0; i<n; i++){
+		c = a[i].c;
+		/* "esrnb" is indexed by op - presumably one letter per CHAN* value; confirm against thread.h */
+if(dbgalt) print(" %c:", "esrnb"[a[i].op]);
+if(dbgalt) if(c->name) print("%s", c->name); else print("%p", c);
+		if(altcanexec(&a[i])){
+if(dbgalt) print("*");
+			ncan++;
+		}
+	}
+	if(ncan){
+		/* second pass: execute the j'th executable entry */
+		j = rand()%ncan;
+		for(i=0; i<n; i++){
+			if(altcanexec(&a[i])){
+				if(j-- == 0){
+if(dbgalt){
+c = a[i].c;
+print(" => %c:", "esrnb"[a[i].op]);
+if(c->name) print("%s", c->name); else print("%p", c);
+print("\n");
+}
+					altexec(&a[i]);
+					qunlock(&chanlock);
+					return i;
+				}
+			}
+		}
+	}
+if(dbgalt)print("\n");
+
+	if(!canblock){
+		qunlock(&chanlock);
+		return -1;
+	}
+
+	/* nothing can run yet: queue ourselves on every channel and wait */
+	for(i=0; i<n; i++){
+		if(a[i].op != CHANNOP)
+			altqueue(&a[i]);
+	}
+	qunlock(&chanlock);
+
+	_threadswitch();
+
+	/*
+	 * the guy who ran the op took care of dequeueing us
+	 * and then set a[0].xalt to the one that was executed.
+	 */
+	return a[0].xalt - a;
+}
+
+/*
+ * Run a single operation op on channel c with value pointer p
+ * by building a one-entry alt list.  canblock selects the
+ * terminator: CHANEND if nonzero, CHANNOBLK otherwise.
+ * Returns 1 if the operation was executed, -1 if not.
+ */
+static int
+_chanop(Channel *c, int op, void *p, int canblock)
+{
+	Alt a[2];
+
+	a[0].c = c;
+	a[0].op = op;
+	a[0].v = p;
+	if(canblock)
+		a[1].op = CHANEND;
+	else
+		a[1].op = CHANNOBLK;
+	return chanalt(a) < 0 ? -1 : 1;
+}
+
+/*
+ * Send the element pointed to by v on c, blocking until the
+ * send can be executed.  Returns 1.
+ */
+int
+chansend(Channel *c, void *v)
+{
+	int r;
+
+	r = _chanop(c, CHANSND, v, 1);
+	return r;
+}
+
+/*
+ * Send the element pointed to by v on c without blocking.
+ * Returns 1 if the send was executed, -1 if it could not be.
+ */
+int
+channbsend(Channel *c, void *v)
+{
+	int r;
+
+	r = _chanop(c, CHANSND, v, 0);
+	return r;
+}
+
+/*
+ * Receive one element from c into the storage pointed to by v,
+ * blocking until the receive can be executed.  Returns 1.
+ */
+int
+chanrecv(Channel *c, void *v)
+{
+	int r;
+
+	r = _chanop(c, CHANRCV, v, 1);
+	return r;
+}
+
+/*
+ * Receive one element from c into the storage pointed to by v
+ * without blocking.  Returns 1 if an element was received,
+ * -1 if the receive could not be executed.
+ */
+int
+channbrecv(Channel *c, void *v)
+{
+	int r;
+
+	r = _chanop(c, CHANRCV, v, 0);
+	return r;
+}
+
+/*
+ * Blocking send of the pointer value v itself; the element
+ * copied is the parameter v, not what it points to.
+ * Assumes c's elemsize is sizeof(void*) - confirm at call sites.
+ */
+int
+chansendp(Channel *c, void *v)
+{
+	return _chanop(c, CHANSND, &v, 1);
+}
+
+/*
+ * Blocking receive of a pointer-sized element from c.
+ * Returns the pointer received.
+ * Assumes c's elemsize is sizeof(void*) - confirm at call sites.
+ */
+void*
+chanrecvp(Channel *c)
+{
+	void *p;
+
+	_chanop(c, CHANRCV, &p, 1);
+	return p;
+}
+
+/*
+ * Non-blocking send of the pointer value v itself.
+ * Returns 1 if the send was executed, -1 if it could not be.
+ * Assumes c's elemsize is sizeof(void*) - confirm at call sites.
+ */
+int
+channbsendp(Channel *c, void *v)
+{
+	return _chanop(c, CHANSND, &v, 0);
+}
+
+/*
+ * Non-blocking receive of a pointer-sized element from c.
+ * Returns the pointer received, or nil if the receive could
+ * not be executed (previously an uninitialized value was
+ * returned in that case).
+ * Assumes c's elemsize is sizeof(void*) - confirm at call sites.
+ */
+void*
+channbrecvp(Channel *c)
+{
+	void *v;
+
+	if(_chanop(c, CHANRCV, (void*)&v, 0) > 0)
+		return v;
+	return nil;
+}
+
+/*
+ * Blocking send of the ulong val; the element copied is the
+ * by-value parameter.
+ * Assumes c's elemsize is sizeof(ulong) - confirm at call sites.
+ */
+int
+chansendul(Channel *c, ulong val)
+{
+	int r;
+
+	r = _chanop(c, CHANSND, &val, 1);
+	return r;
+}
+
+/*
+ * Blocking receive of a ulong from c.  Returns the value
+ * received.
+ * Assumes c's elemsize is sizeof(ulong) - confirm at call sites.
+ */
+ulong
+chanrecvul(Channel *c)
+{
+	ulong u;
+
+	_chanop(c, CHANRCV, &u, 1);
+	return u;
+}
+
+/*
+ * Non-blocking send of the ulong val.
+ * Returns 1 if the send was executed, -1 if it could not be.
+ * Assumes c's elemsize is sizeof(ulong) - confirm at call sites.
+ */
+int
+channbsendul(Channel *c, ulong val)
+{
+	int r;
+
+	r = _chanop(c, CHANSND, &val, 0);
+	return r;
+}
+
+/*
+ * Non-blocking receive of a ulong from c.
+ * Returns the value received, or 0 if the receive could not
+ * be executed (previously an uninitialized value was returned
+ * in that case).  A received 0 is indistinguishable from
+ * failure; use channbrecv when that matters.
+ * Assumes c's elemsize is sizeof(ulong) - confirm at call sites.
+ */
+ulong
+channbrecvul(Channel *c)
+{
+	ulong val;
+
+	if(_chanop(c, CHANRCV, &val, 0) > 0)
+		return val;
+	return 0;
+}
+
blob - /dev/null
blob + 2296690f4a624694d84e7ca9a118608a876b37cd (mode 644)
--- /dev/null
+++ src/libthread/ioproc.c
@@ -0,0 +1,130 @@
+#include <u.h>
+#include <libc.h>
+#include <thread.h>
+#include "ioproc.h"
+
+/* stack size handed to proccreate when starting the I/O proc */
+enum
+{
+	STACK = 32768,
+};
+
+/*
+ * Interrupt the call in progress on io.  Not implemented: if a
+ * call is in progress this only prints a diagnostic; if none
+ * is, it does nothing.
+ */
+void
+iointerrupt(Ioproc *io)
+{
+	if(io->inuse)
+		fprint(2, "bug: cannot iointerrupt yet\n");
+}
+
+/*
+ * Body of the I/O proc started by ioproc(); a is the Ioproc.
+ * Loops serving one iocall at a time until a zero message
+ * arrives on io->c (sent by closeioproc).
+ */
+static void
+xioproc(void *a)
+{
+	Ioproc *io, *x;
+	io = a;
+	/*
+	 * first recv acquires the ioproc.
+	 * second tells us that the data is ready.
+	 */
+	for(;;){
+		while(recv(io->c, &x) == -1)
+			;
+		if(x == 0)	/* our cue to leave */
+			break;
+		assert(x == io);
+
+		/* caller is now committed -- even if interrupted he'll return */
+		while(recv(io->creply, &x) == -1)
+			;
+		if(x == 0)	/* caller backed out */
+			continue;
+		assert(x == io);
+
+		/* run the operation; capture the error string on failure */
+		io->ret = io->op(&io->arg);
+		if(io->ret < 0)
+			rerrstr(io->err, sizeof io->err);
+		/* tell the caller the result is ready ... */
+		while(send(io->creply, &io) == -1)
+			;
+		/* ... and wait until it has copied it out and released us */
+		while(recv(io->creply, &x) == -1)
+			;
+	}
+}
+
+/*
+ * Create an Ioproc: allocate it zeroed, create its two
+ * unbuffered pointer-sized channels, and start xioproc in a new
+ * proc to serve it.  Both channel names embed the address of
+ * io->c.  Exits via sysfatal if the Ioproc cannot be allocated.
+ */
+Ioproc*
+ioproc(void)
+{
+	Ioproc *p;
+
+	p = mallocz(sizeof *p, 1);
+	if(p == nil)
+		sysfatal("ioproc malloc: %r");
+
+	p->c = chancreate(sizeof(void*), 0);
+	p->creply = chancreate(sizeof(void*), 0);
+	chansetname(p->c, "ioc%p", p->c);
+	chansetname(p->creply, "ior%p", p->c);
+
+	p->tid = proccreate(xioproc, p, STACK);
+	return p;
+}
+
+/*
+ * Shut down and free io: interrupt any call in progress, tell
+ * the I/O proc to exit, then free both channels and the Ioproc.
+ * A nil io is ignored.
+ */
+void
+closeioproc(Ioproc *io)
+{
+	if(io != nil){
+		iointerrupt(io);
+		/* nil value pointer; xioproc treats a zero message as its cue to leave */
+		while(send(io->c, 0) == -1)
+			;
+		chanfree(io->c);
+		chanfree(io->creply);
+		free(io);
+	}
+}
+
+/*
+ * Run op in the I/O proc io, passing it the variable arguments
+ * that follow op, and wait for it to finish.
+ *
+ * Returns op's result.  If that is negative, the error string
+ * captured in the I/O proc is installed as the caller's error
+ * string.  Returns -1 with error string "interrupted" if the
+ * call was interrupted before the I/O proc was told to start.
+ */
+long
+iocall(Ioproc *io, long (*op)(va_list*), ...)
+{
+	char e[ERRMAX];
+	long ret;	/* same width as io->ret and the return type */
+	int inted;
+	Ioproc *msg;
+
+	/* first send acquires the ioproc */
+	if(send(io->c, &io) == -1){
+		werrstr("interrupted");
+		return -1;
+	}
+	assert(!io->inuse);
+	io->inuse = 1;
+	io->op = op;
+	va_start(io->arg, op);
+
+	/*
+	 * Second send tells the ioproc to go ahead.  If it is
+	 * interrupted we still have to complete the send, but
+	 * we send nil so the ioproc knows we backed out.
+	 */
+	msg = io;
+	inted = 0;
+	while(send(io->creply, &msg) == -1){
+		if(!inted){
+			/*
+			 * Backing out: release the argument list and mark
+			 * the ioproc free before it hears about it, so
+			 * the next iocall does not trip over inuse.
+			 */
+			va_end(io->arg);
+			io->inuse = 0;
+		}
+		msg = nil;
+		inted = 1;
+	}
+	if(inted){
+		werrstr("interrupted");
+		return -1;
+	}
+
+	/*
+	 * If we get interrupted, we have to stick around so that
+	 * the IO proc has someone to talk to.  Send it an interrupt
+	 * and try again.
+	 */
+	inted = 0;
+	while(recv(io->creply, nil) == -1){
+		inted = 1;
+		iointerrupt(io);
+	}
+	USED(inted);
+	va_end(io->arg);
+
+	/* copy the results out before letting go of the ioproc */
+	ret = io->ret;
+	if(ret < 0)
+		strecpy(e, e+sizeof e, io->err);
+	io->inuse = 0;
+
+	/* release resources */
+	while(send(io->creply, &io) == -1)
+		;
+	if(ret < 0)
+		errstr(e, sizeof e);
+	return ret;
+}
blob - /dev/null
blob + f3a488d3ce493b2f4177ef1cc794b8c715a279b8 (mode 644)
--- /dev/null
+++ src/libthread/ioproc.h
@@ -0,0 +1,14 @@
+/* fetch the next argument of the given type from io's saved argument list */
+#define ioproc_arg(io, type)	(va_arg((io)->arg, type))
+
+/*
+ * State shared between callers of iocall and the I/O proc
+ * that runs xioproc on their behalf.
+ */
+struct Ioproc
+{
+	int tid;	/* value returned by proccreate for the I/O proc */
+	Channel *c, *creply;	/* c: acquire/exit requests; creply: go-ahead, completion and release */
+	int inuse;	/* nonzero while an iocall is in progress */
+	long (*op)(va_list*);	/* operation to run; called with &arg */
+	va_list arg;	/* the caller's variable arguments, started in iocall */
+	long ret;	/* result of op */
+	char err[ERRMAX];	/* error string saved when ret < 0 */
+	Ioproc *next;	/* NOTE(review): not used by ioproc.c - purpose not visible here */
+};
+
blob - /dev/null
blob + 980937888859d4873f0c23dbc5e969494df14c50 (mode 644)
--- /dev/null
+++ src/libthread/mkfile
@@ -0,0 +1,39 @@
+# mkfile for libthread: builds libthread.a from the objects listed
+# in OFILES, plus a few test programs.
+<$PLAN9/src/mkhdr
+
+LIB=libthread.a
+OFILES=\
+	channel.$O\
+	exec.$O\
+	ioproc.$O\
+	iorw.$O\
+	pthread.$O\
+	qlock.$O\
+	ref.$O\
+	thread.$O\
+
+<$PLAN9/src/mksyslib
+
+# NOTE(review): ioproc.h is not listed in HFILES - confirm whether it should be.
+HFILES=thread.h threadimpl.h
+
+# Test programs, each linked against the installed library and pthreads.
+tprimes: tprimes.$O
+	9l -o $target $target.$O $PLAN9/lib/$LIB -l9 -lpthread
+tspawn: tspawn.$O
+	9l -o $target $target.$O $PLAN9/lib/$LIB -l9 -lpthread
+tspawnloop: tspawnloop.$O
+	9l -o $target $target.$O $PLAN9/lib/$LIB -l9 -lpthread
+
+%.$O: %.c
+	9c -I. $stem.c
+
+# Compare tprimes output against primes for two ranges, then time
+# tspawn.  tspawnloop can be built but is not part of this target.
+test:V: tprimes tspawn
+	primes 1 10007 >p1.txt
+	$PLAN9/bin/time ./tprimes 10000 >tp1.txt
+	cmp p1.txt tp1.txt
+	primes 1 1009 >p2.txt
+	$PLAN9/bin/time ./tprimes 1000 >tp2.txt
+	cmp p2.txt tp2.txt
+	echo tspawn should take 3 seconds, not 6
+	$PLAN9/bin/time ./tspawn sleep 3 >/dev/null
+
+# files produced by the test target
+CLEANFILES=p1.txt p2.txt tp1.txt tp2.txt
+
blob - /dev/null
blob + 77f97a4fcfc3872fb22b3eb203c6269ff5ef265f (mode 644)
--- /dev/null
+++ src/libthread/pthread.c
@@ -0,0 +1,108 @@
+#include "u.h"
+#include <errno.h>
+#include "libc.h"
+#include "thread.h"
+#include "threadimpl.h"
+
+static pthread_mutex_t initmutex = PTHREAD_MUTEX_INITIALIZER;
+
+/*
+ * Initialize lk's pthread mutex (type PTHREAD_MUTEX_NORMAL) if
+ * that has not been done yet.  initmutex is held throughout so
+ * that concurrent callers initialize the mutex only once.
+ */
+static void
+lockinit(Lock *lk)
+{
+	pthread_mutexattr_t ma;
+
+	pthread_mutex_lock(&initmutex);
+	if(lk->init){
+		pthread_mutex_unlock(&initmutex);
+		return;
+	}
+	pthread_mutexattr_init(&ma);
+	pthread_mutexattr_settype(&ma, PTHREAD_MUTEX_NORMAL);
+	pthread_mutex_init(&lk->mutex, &ma);
+	pthread_mutexattr_destroy(&ma);
+	lk->init = 1;
+	pthread_mutex_unlock(&initmutex);
+}
+
+/*
+ * Acquire lk, initializing it on first use.  If block is
+ * nonzero, wait for the lock and return 1.  Otherwise try once:
+ * return 1 if the lock was taken, 0 if it was busy.  Any other
+ * pthread error aborts.  pc is unused here.
+ */
+int
+_threadlock(Lock *lk, int block, ulong pc)
+{
+	int err;
+
+	if(!lk->init)
+		lockinit(lk);
+
+	if(!block){
+		err = pthread_mutex_trylock(&lk->mutex);
+		switch(err){
+		case 0:
+			return 1;
+		case EBUSY:
+			return 0;
+		}
+		abort();
+		return 0;
+	}
+
+	if(pthread_mutex_lock(&lk->mutex) != 0)
+		abort();
+	return 1;
+}
+
+/*
+ * Release lk.  Aborts if the underlying mutex cannot be
+ * unlocked.  pc is unused here.
+ */
+void
+_threadunlock(Lock *lk, ulong pc)
+{
+	int err;
+
+	err = pthread_mutex_unlock(&lk->mutex);
+	if(err != 0)
+		abort();
+}
+
+/*
+ * Put the calling proc to sleep on r until _procwakeup(r) is
+ * called.  The caller must hold r->l; it is released while
+ * waiting and held again on return.
+ *
+ * pthread_cond_wait may return without a matching signal, so
+ * the wait is repeated until _procwakeup has cleared r->asleep.
+ */
+void
+_procsleep(_Procrendez *r)
+{
+	/* r is protected by r->l, which we hold */
+	pthread_cond_init(&r->cond, 0);
+	r->asleep = 1;
+	while(r->asleep)
+		pthread_cond_wait(&r->cond, &r->l->mutex);
+	pthread_cond_destroy(&r->cond);
+}
+
+/*
+ * Wake the proc sleeping on r, if there is one: clear the
+ * asleep flag and signal the condition variable.
+ */
+void
+_procwakeup(_Procrendez *r)
+{
+	if(!r->asleep)
+		return;
+	r->asleep = 0;
+	pthread_cond_signal(&r->cond);
+}
+
+/*
+ * Start a new pthread running fn(p) and record its id in
+ * p->tid.  Aborts if the thread cannot be created.
+ *
+ * pthread_create reports failure by returning a nonzero error
+ * number rather than a negative value, so test against 0 and
+ * print the number it returned.
+ */
+void
+_procstart(Proc *p, void (*fn)(void*))
+{
+	int err;
+
+	err = pthread_create(&p->tid, nil, (void*(*)(void*))fn, p);
+	if(err != 0){
+		fprint(2, "pthread_create: error %d\n", err);
+		abort();
+	}
+}
+
+static pthread_key_t prockey;
+
+/*
+ * Return the Proc stored for the calling pthread by
+ * _threadsetproc.
+ */
+Proc*
+_threadproc(void)
+{
+	return pthread_getspecific(prockey);
+}
+
+/*
+ * Store p as the Proc of the calling pthread, for later
+ * retrieval with _threadproc.
+ */
+void
+_threadsetproc(Proc *p)
+{
+	Proc *self;
+
+	self = p;
+	pthread_setspecific(prockey, self);
+}
+
+/*
+ * Create the thread-specific-data key that _threadproc and
+ * _threadsetproc use.  No destructor is registered.
+ */
+void
+pthreadinit(void)
+{
+	pthread_key_create(&prockey, nil);
+}
+
blob - /dev/null
blob + 30c932edb13e24b88cd3163bd20b7ebe55820dd1 (mode 644)
--- /dev/null
+++ src/libthread/ref.c
@@ -0,0 +1,27 @@
+#include "u.h"
+#include "libc.h"
+#include "thread.h"
+
+/*
+ * Add a to r's count while holding r's lock and return the
+ * new count.
+ */
+static long
+refadd(Ref *r, long a)
+{
+	long n;
+
+	lock(&r->lock);
+	n = (r->ref += a);
+	unlock(&r->lock);
+	return n;
+}
+
+/*
+ * Add one to r's count and return the new count.
+ */
+long
+incref(Ref *r)
+{
+	long n;
+
+	n = refadd(r, 1);
+	return n;
+}
+
+/*
+ * Subtract one from r's count and return the new count.
+ */
+long
+decref(Ref *r)
+{
+	long n;
+
+	n = refadd(r, -1);
+	return n;
+}
blob - /dev/null
blob + 9f70b0e06a56c10cc72502a4812a1800c7330e61 (mode 644)
--- /dev/null
+++ src/libthread/threadimpl.h
@@ -0,0 +1,70 @@
+#include <ucontext.h>
+
+typedef struct Context Context;
+typedef struct Proc Proc;
+typedef struct _Procrendez _Procrendez;
+
+/*
+ * NOTE(review): presumably a default stack size in bytes - no use
+ * is visible here.  ioproc.c defines its own STACK (32768) and
+ * does not include this header.
+ */
+enum
+{
+	STACK = 8192
+};
+
+/* a saved execution context; a thin wrapper around ucontext_t */
+struct Context
+{
+	ucontext_t	uc;
+};
+
+/*
+ * Per-thread state.  None of these fields is touched by the code
+ * in channel.c, ioproc.c, pthread.c or ref.c, so the notes below
+ * are read off the names and types only - confirm against thread.c.
+ */
+struct _Thread
+{
+	_Thread	*next;	/* presumably links for a _Threadlist such as Proc.runqueue */
+	_Thread	*prev;
+	_Thread	*allnext;	/* presumably links for Proc.allthreads */
+	_Thread	*allprev;
+	Context	context;	/* saved execution context */
+	uint	id;
+	uchar	*stk;	/* presumably the thread's stack, stksize bytes long */
+	uint	stksize;
+	int		exiting;
+	void	(*startfn)(void*);	/* presumably the entry point and its argument */
+	void	*startarg;
+	Proc	*proc;	/* presumably the Proc this thread belongs to */
+	char	name[256];
+	char	state[256];
+};
+
+/*
+ * A place for one proc to sleep until another wakes it;
+ * see _procsleep and _procwakeup.
+ */
+struct _Procrendez
+{
+	Lock		*l;	/* protects this structure; held by the caller of _procsleep */
+	int		asleep;	/* set by _procsleep while waiting, cleared by _procwakeup */
+	pthread_cond_t	cond;	/* initialized and destroyed around each sleep */
+};
+
+extern	void	_procsleep(_Procrendez*);
+extern	void	_procwakeup(_Procrendez*);
+
+/*
+ * Per-proc state; each Proc is backed by one pthread (see
+ * _procstart).  Fields without a note are not used by the code
+ * visible alongside this header.
+ */
+struct Proc
+{
+	pthread_t	tid;	/* filled in by pthread_create in _procstart */
+	Lock		lock;
+	_Thread		*thread;	/* read by chanalt as the calling thread */
+	_Threadlist	runqueue;
+	_Threadlist	allthreads;
+	uint		nthread;
+	uint		sysproc;
+	_Procrendez	runrend;
+	Context	schedcontext;
+	void		*udata;
+};
+
+/* NOTE(review): xxx is not referenced by any code visible here - confirm it is still needed */
+extern Proc *xxx;
+/* shorthands for getting and setting the calling pthread's Proc */
+#define proc() _threadproc()
+#define setproc(p) _threadsetproc(p)
+
+/* implemented in pthread.c, except _threadcreate and _threadexit */
+extern void _procstart(Proc*, void (*fn)(void*));
+extern _Thread *_threadcreate(Proc*, void(*fn)(void*), void*, uint);
+extern void _threadexit(void);
+extern Proc *_threadproc(void);
+extern void _threadsetproc(Proc*);
+extern int _threadlock(Lock*, int, ulong);
+extern void _threadunlock(Lock*, ulong);
+