Commit Diff


commit - f3666390156e312941e5ab87cae79dcfb738e315
commit + 8a7ec69711074b12b52a63f9eb61ce8cc82425bb
blob - /dev/null
blob + 1e7044f40483d25674fb40acc8b2a71b900925c7 (mode 644)
--- /dev/null
+++ client.go
@@ -0,0 +1,216 @@
+package p9pnew
+
+import (
+	"bufio"
+	"fmt"
+	"log"
+	"time"
+
+	"golang.org/x/net/context"
+
+	"net"
+)
+
+type client struct {
+	conn     net.Conn
+	tags     *tagPool
+	requests chan *fcallRequest
+}
+
+// NewSession returns a session using the connection.
+func NewSession(conn net.Conn) (Session, error) {
+	return &client{
+		conn: conn,
+	}
+}
+
+var _ Session = &client{}
+
+func (c *client) Auth(ctx context.Context, afid Fid, uname, aname string) (Qid, error) {
+	panic("not implemented")
+}
+
+func (c *client) Attach(ctx context.Context, fid, afid Fid, uname, aname string) (Qid, error) {
+	panic("not implemented")
+}
+
+func (c *client) Clunk(ctx context.Context, fid Fid) error {
+	panic("not implemented")
+}
+
+func (c *client) Remove(ctx context.Context, fid Fid) error {
+	panic("not implemented")
+}
+
+func (c *client) Walk(ctx context.Context, fid Fid, newfid Fid, names ...string) ([]Qid, error) {
+	panic("not implemented")
+}
+
+func (c *client) Read(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error) {
+	panic("not implemented")
+}
+
+func (c *client) Write(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error) {
+	panic("not implemented")
+}
+
+func (c *client) Open(ctx context.Context, fid Fid, mode int32) (Qid, error) {
+	panic("not implemented")
+}
+
+func (c *client) Create(ctx context.Context, parent Fid, name string, perm uint32, mode uint32) (Qid, error) {
+	panic("not implemented")
+}
+
+func (c *client) Stat(context.Context, Fid) (Dir, error) {
+	panic("not implemented")
+}
+
+func (c *client) WStat(context.Context, Fid, Dir) error {
+	panic("not implemented")
+}
+
+func (c *client) Version(ctx context.Context, msize int32, version string) (int32, string, error) {
+	fcall := &Fcall{
+		Type: TVersion,
+		Tag:  tag,
+		Message: MessageVersion{
+			MSize:   msize,
+			Version: Version,
+		},
+	}
+
+	resp, err := c.send(ctx, fcall)
+	if err != nil {
+		return 0, "", err
+	}
+
+	mv, ok := resp.Message.(*MessageVersion)
+	if !ok {
+		return fmt.Errorf("invalid rpc response for version message: %v", resp)
+	}
+
+	return mv.MSize, mv.Version, nil
+}
+
+// send dispatches the fcall.
+func (c *client) send(ctx context.Context, fc *Fcall) (*Fcall, error) {
+	fc.Tag = c.tags.Get()
+	defer c.tags.Put(fc.Tag)
+
+	fcreq := newFcallRequest(ctx, fc)
+
+	// dispatch the request.
+	select {
+	case <-c.closed:
+		return nil, ErrClosed
+	case c.requests <- fcreq:
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	}
+
+	// wait for the response.
+	select {
+	case <-closed:
+		return nil, ErrClosed
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case resp := <-fcreq.response:
+		return resp, nil
+	}
+}
+
+type fcallRequest struct {
+	ctx      context.Context
+	fcall    *Fcall
+	response chan *Fcall
+	err      chan error
+}
+
+func newFcallRequest(ctx context.Context, fc *Fcall) fcallRequest {
+	return fcallRequest{
+		ctx:      ctx,
+		fcall:    fc,
+		response: make(chan *Fcall, 1),
+		err:      make(chan err, 1),
+	}
+}
+
+// handle takes messages off the wire and wakes up the waiting tag call.
+func (c *client) handle() {
+
+	var (
+		responses = make(chan *Fcall)
+		// outstanding provides a map of tags to outstanding requests.
+		outstanding = map[Tag]*fcallRequest{}
+	)
+
+	// loop to read messages off of the connection
+	go func() {
+		r := bufio.NewReader(c.conn)
+
+	loop:
+		for {
+			// Continuously set the read dead line pump the loop below. We can
+			// probably set a connection dead threshold that can count these.
+			// Usually, this would only matter when there are actually
+			// outstanding requests.
+			if err := c.conn.SetReadDeadline(time.Now().Add(time.Second)); err != nil {
+				panic(fmt.Sprintf("error setting read deadline: %v", err))
+			}
+
+			fc := new(Fcall)
+			if err := read9p(r, fc); err != nil {
+				switch err := err.(type) {
+				case net.Error:
+					if err.Timeout() || err.Temporary() {
+						break loop
+					}
+				}
+
+				panic(fmt.Sprintf("connection read error: %v", err))
+			}
+
+			select {
+			case <-closed:
+				return
+			case responses <- fc:
+			}
+		}
+
+	}()
+
+	w := bufio.NewWriter(c.conn)
+
+	for {
+		select {
+		case <-c.closed:
+			return
+		case req := <-c.requests:
+			outstanding[req.fcall.Tag] = req
+
+			// use deadline to set write deadline for this request.
+			deadline, ok := req.ctx.Deadline()
+			if !ok {
+				deadline = time.Now().Add(time.Second)
+			}
+
+			if err := c.conn.SetWriteDeadline(deadline); err != nil {
+				log.Println("error setting write deadline: %v", err)
+			}
+
+			if err := write9p(w, req.fcall); err != nil {
+				delete(outstanding, req.fcall.Tag)
+				req.err <- err
+			}
+		case b := <-responses:
+			req, ok := outstanding[b.Tag]
+			if !ok {
+				panic("unknown tag received")
+			}
+			delete(outstanding, req.fcall.Tag)
+
+			req.response <- b
+		}
+	}
+}
blob - /dev/null
blob + 22235424d65bc142f1cb44c60aa1ccf7012ed76b (mode 644)
--- /dev/null
+++ errors.go
@@ -0,0 +1,20 @@
+package p9pnew
+
+import (
+	"errors"
+	"fmt"
+)
+
+type Error struct {
+	Name string
+}
+
+func (e Error) Error() string {
+	return fmt.Sprintf("9p: %v", e.Name)
+}
+
+var (
+	ErrClosed = errors.New("closed")
+
+	ErrUnknownfid = Error{Name: "unknown fid"}
+)
blob - /dev/null
blob + 73b68dd64117abec8a2328e35d03d23921bd9f42 (mode 644)
--- /dev/null
+++ fcall.go
@@ -0,0 +1,261 @@
+package p9pnew
+
+import (
+	"bytes"
+	"encoding/binary"
+	"fmt"
+	"io"
+
+	"encoding"
+)
+
+type FcallType uint8
+
+const (
+	Tversion FcallType = iota + 100
+	Rversion
+	Tauth
+	Rauth
+	Tattach
+	Rattach
+	Terror
+	Rerror
+	Tflush
+	Rflush
+	Twalk
+	Rwalk
+	Topen
+	Ropen
+	Tcreate
+	Rcreate
+	Tread
+	Rread
+	Twrite
+	Rwrite
+	Tclunk
+	Rclunk
+	Tremove
+	Rremove
+	Tstat
+	Rstat
+	Twstat
+	Rwstat
+	Tmax
+)
+
+type Fcall struct {
+	Type    Type
+	Tag     Tag
+	Message Message
+}
+
+const (
+	fcallHeaderSize = 4 /*size*/ + 1 /*type*/
+)
+
+func (fc *Fcall) Size() int {
+	return fcallHeaderSize + fc.Message.Size()
+}
+
+func (fc *Fcall) MarshalBinary() ([]byte, error) {
+	mp, err := fc.Message.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+
+	b := bytes.NewBuffer(make([]byte, 0, fc.Size()))
+	if err := write9p(b, fc.Size(), fc.Tag, mp); err != nil {
+		return nil, err
+	}
+
+	return b.Bytes(), nil
+}
+
+func (fc *Fcall) UnmarshalBinary(p []data) error {
+	var (
+		r = bytes.NewReader(p)
+	)
+
+	if err := read9p(r, &fc.Type, &fc.Tag); err != nil {
+		return err
+	}
+
+	switch fc.Type {
+	case Tversion, Rversion:
+		fc.Message = &MessageVersion{}
+	case Tauth:
+
+	case Rauth:
+
+	case Tattach:
+
+	case Rattach:
+
+	case Terror:
+
+	case Rerror:
+
+	case Tflush:
+
+	case Rflush:
+
+	case Twalk:
+
+	case Rwalk:
+
+	case Topen:
+
+	case Ropen:
+
+	case Tcreate:
+
+	case Rcreate:
+
+	case Tread:
+
+	case Rread:
+
+	case Twrite:
+
+	case Rwrite:
+
+	case Tclunk:
+
+	case Rclunk:
+
+	case Tremove:
+
+	case Rremove:
+
+	case Tstat:
+
+	case Rstat:
+
+	case Twstat:
+
+	case Rwstat:
+
+	}
+
+	return fc.Message.UnmarshalBinary(p[len(p)-r.Len():])
+}
+
+type Message interface {
+	Size() int
+	encoding.BinaryMarshaler
+	encoding.BinaryUnmarshaler
+}
+
+// MessageVersion encodes the message body for Tversion and Rversion RPC
+// calls. The body is identical in both directions.
+type MessageVersion struct {
+	MSize   uint32
+	Version string
+}
+
+func (mv MessageVersion) Size() int {
+	return 4 + 2 + len(mv.Version)
+}
+
+func (mv MessageVersion) MarshalBinary() ([]byte, error) {
+	b := bytes.NewBuffer(make([]byte, 0, mv.Size()))
+
+	if err := write9p(b, mv.MSize, mv.Version); err != nil {
+		return nil, err
+	}
+
+	return b.Bytes(), nil
+}
+
+// write9p implements serialization for base types.
+func write9p(w io.Writer, vs ...interface{}) error {
+	for _, v := range vs {
+		switch v := v.(type) {
+		case string:
+			// implement string[s] encoding
+			if err := binary.Write(w, binary.LittleEndian, uint16(len(v))); err != nil {
+				return err
+			}
+
+			_, err := io.WriteString(w, s)
+			if err != nil {
+
+				return err
+			}
+		case *Fcall:
+			if err := write9p(w, v.Size()); err != nil {
+				return err
+			}
+			p, err := v.MarshalBinary()
+			if err != nil {
+				return err
+			}
+
+			n, err := w.Write(p)
+			if err != nil {
+				return err
+			}
+
+			if n != len(p) {
+				return io.ErrShortWrite
+			}
+
+			return nil
+		default:
+			if err := binary.Write(w, binary.LittleEndian, v); err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
+// read9p extracts values from rd and unmarshals them to the targets of vs.
+func read9p(rd io.Reader, vs ...interface{}) error {
+	for _, v := range vs {
+		switch v := v.(type) {
+		case *string:
+			var ll uint16
+
+			// implement string[s] encoding
+			if err := binary.Read(r, binary.LittleEndian, &ll); err != nil {
+				return err
+			}
+
+			b := make([]byte, ll)
+
+			n, err := io.ReadFull(b)
+			if err != nil {
+				return err
+			}
+
+			if n != int(ll) {
+				return fmt.Errorf("unexpected string length")
+			}
+
+			*v = string(b)
+		case *Fcall:
+			var size uint32
+			if err := read9p(buffered, &size); err != nil {
+				return err
+			}
+
+			p := make([]byte, size)
+			n, err := io.ReadFull(p)
+			if err != nil {
+				return err
+			}
+
+			if n != size {
+				return fmt.Errorf("error reading fcall: short read")
+			}
+
+			return v.UnmarshalBinary(p)
+		default:
+			if err := binary.Read(r, binary.LittleEndian, v); err != nil {
+				return err
+			}
+		}
+	}
+}
blob - /dev/null
blob + df87804033610f46a568197526e072fb5292a7c9 (mode 644)
--- /dev/null
+++ logging.go
@@ -0,0 +1,75 @@
+package p9pnew
+
+import (
+	"log"
+	"os"
+)
+
+type logging struct {
+	session Session
+	logger  log.Logger
+}
+
+var _ Session = &logging{}
+
+func NewLogger(prefix string, session Session) Session {
+	return &logging{
+		session: session,
+		logger:  *log.New(os.Stdout, prefix, 0),
+	}
+}
+
+func (l *logging) Auth(afid Fid, uname, aname string) (Qid, error) {
+	qid, err := l.session.Auth(afid, uname, aname)
+	l.logger.Printf("Auth(%v, %s, %s) -> (%v, %v)", afid, uname, aname, qid, err)
+	return qid, err
+}
+
+func (l *logging) Attach(fid, afid Fid, uname, aname string) (Qid, error) {
+	qid, err := l.session.Attach(fid, afid, uname, aname)
+	l.logger.Printf("Attach(%v, %v, %s, %s) -> (%v, %v)", fid, afid, uname, aname, qid, err)
+	return qid, err
+}
+
+func (l *logging) Clunk(fid Fid) error {
+	return l.session.Clunk(fid)
+}
+
+func (l *logging) Remove(fid Fid) (err error) {
+	defer func() {
+		l.logger.Printf("Remove(%v) -> %v", fid, err)
+	}()
+	return l.session.Remove(fid)
+}
+
+func (l *logging) Walk(fid Fid, newfid Fid, names ...string) ([]Qid, error) {
+	return l.session.Walk(fid, newfid, names...)
+}
+
+func (l *logging) Read(fid Fid, p []byte, offset int64) (n int, err error) {
+	return l.session.Read(fid, p, offset)
+}
+
+func (l *logging) Write(fid Fid, p []byte, offset int64) (n int, err error) {
+	return l.session.Write(fid, p, offset)
+}
+
+func (l *logging) Open(fid Fid, mode int32) (Qid, error) {
+	return l.session.Open(fid, mode)
+}
+
+func (l *logging) Create(parent Fid, name string, perm uint32, mode uint32) (Qid, error) {
+	return l.session.Create(parent, name, perm, mode)
+}
+
+func (l *logging) Stat(fid Fid) (Dir, error) {
+	return l.session.Stat(fid)
+}
+
+func (l *logging) WStat(fid Fid, dir Dir) error {
+	return l.session.WStat(fid, dir)
+}
+
+func (l *logging) Version(msize int32, version string) (int32, string, error) {
+	return l.session.Version(msize, version)
+}
blob - /dev/null
blob + 287a302388f8565d8ce9c6fcb628fa154a9d1a7a (mode 644)
--- /dev/null
+++ session.go
@@ -0,0 +1,49 @@
+package p9pnew
+
+import (
+	"net"
+
+	"golang.org/x/net/context"
+)
+
+// Session provides the central abstraction for a 9p connection. Clients
+// implement sessions and servers serve sessions. Sessions can be proxied by
+// serving up a client session.
+//
+// The interface is also wired up with full context support to manage timeouts
+// and resource clean up.
+//
+// Session represents the operations covered in section 5 of the plan 9 manual
+// (http://man.cat-v.org/plan_9/5/). Requests are managed internally, so the
+// Flush method is handled by the internal implementation. Consider preceeding
+// these all with context to control request timeout.
+type Session interface {
+	Auth(ctx context.Context, afid Fid, uname, aname string) (Qid, error)
+	Attach(ctx context.Context, fid, afid Fid, uname, aname string) (Qid, error)
+	Clunk(ctx context.Context, fid Fid) error
+	Remove(ctx context.Context, fid Fid) error
+	Walk(ctx context.Context, fid Fid, newfid Fid, names ...string) ([]Qid, error)
+	Read(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error)
+	Write(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error)
+	Open(ctx context.Context, fid Fid, mode int32) (Qid, error)
+	Create(ctx context.Context, parent Fid, name string, perm uint32, mode uint32) (Qid, error)
+	Stat(context.Context, Fid) (Dir, error)
+	WStat(context.Context, Fid, Dir) error
+
+	// TODO(stevvooe): The version message affects a lot of protocol behavior.
+	// Consider hiding it behind the implementation, letting the version get
+	// negotiated. The API user should still be able to query it.
+	Version(ctx context.Context, msize int32, version string) (int32, string, error)
+}
+
+func Dial(addr string) (Session, error) {
+	c, err := net.Dial("tcp", addr)
+	if err != nil {
+		return nil, err
+	}
+
+	// BUG(stevvooe): Session doesn't actually close connection. Dial might
+	// not be the right interface.
+
+	return NewSession(c)
+}
blob - /dev/null
blob + 4bea0b041f5c0da62d20e12af577067a28bf09e8 (mode 644)
--- /dev/null
+++ tags.go
@@ -0,0 +1,67 @@
+package p9pnew
+
+import "fmt"
+
+// tagPool implements a free list to manage tags for outstanding 9p requests.
+type tagPool struct {
+	maximum  Tag
+	freelist chan Tag // buffered to maximum
+	nexttag  chan Tag // synchronous until max allocated
+	closed   chan struct{}
+}
+
+func (tp *tagPool) Get() Tag {
+	select {
+	case <-tp.closed:
+		panic("tag pool is closed")
+	case t := <-tp.freelist:
+		return t
+	case t := <-tp.nexttag:
+		return t
+	}
+}
+
+func (tp *tagPool) next() {
+	var next Tag
+
+	for {
+		select {
+		case <-tp.closed:
+			return
+		case tp.nexttag <- next:
+			next++
+
+			if next >= tp.maximum {
+				return // exhausted, exit this loop
+			}
+		}
+	}
+}
+
+func (tp *tagPool) Put(tag Tag) {
+	select {
+	case tp.freelist <- tag:
+	case <-tp.closed:
+	}
+}
+
+func (tp *tagPool) Close() error {
+	select {
+	case <-tp.closed:
+		return fmt.Errorf("closed")
+	default:
+		close(tp.closed)
+	}
+	return nil
+}
+
+// NewtagPool returns a tag pool with the maximum number of outstanding
+// requests.
+func newTagPool(outstanding int) (*tagPool, error) {
+	return &tagPool{
+		maximum:  Tag(outstanding),
+		freelist: make(chan Tag, outstanding),
+		nexttag:  make(chan Tag),
+		closed:   make(chan struct{}),
+	}, nil
+}
blob - /dev/null
blob + 23ab0c4ea3426ea68d3967bfc611167ad034ef77 (mode 644)
--- /dev/null
+++ types.go
@@ -0,0 +1,134 @@
+package p9pnew
+
+import (
+	"encoding"
+
+	"time"
+)
+
+const (
+	NOFID = ^Fid(0)
+	NOTAG = ^Tag(0)
+)
+
+const (
+	DMDIR    = 0x80000000 /* mode bit for directories */
+	DMAPPEND = 0x40000000 /* mode bit for append only files */
+	DMEXCL   = 0x20000000 /* mode bit for exclusive use files */
+	DMMOUNT  = 0x10000000 /* mode bit for mounted channel */
+	DMAUTH   = 0x08000000 /* mode bit for authentication file */
+	DMTMP    = 0x04000000 /* mode bit for non-backed-up files */
+	DMREAD   = 0x4        /* mode bit for read permission */
+	DMWRITE  = 0x2        /* mode bit for write permission */
+	DMEXEC   = 0x1        /* mode bit for execute permission */
+)
+
+const (
+	OREAD   = 0      /* open for read */
+	OWRITE  = 1      /* write */
+	ORDWR   = 2      /* read and write */
+	OEXEC   = 3      /* execute, == read but check execute permission */
+	OTRUNC  = 16     /* or'ed in (except for exec), truncate file first */
+	OCEXEC  = 32     /* or'ed in, close on exec */
+	ORCLOSE = 64     /* or'ed in, remove on close */
+	OEXCL   = 0x1000 /* or'ed in, exclusive use (create only) */
+)
+
+type QType uint8
+
+const (
+	QTDIR    QType = 0x80 // type bit for directories
+	QTAPPEND QType = 0x40 // type bit for append only files
+	QTEXCL   QType = 0x20 // type bit for exclusive use files
+	QTMOUNT  QType = 0x10 // type bit for mounted channel
+	QTAUTH   QType = 0x08 // type bit for authentication file
+	QTTMP    QType = 0x04 // type bit for not-backed-up file
+	QTFILE   QType = 0x00 // plain file */
+)
+
+type Fid uint32
+
+type Qid struct {
+	Type    QType
+	Version uint32
+	Path    uint64
+}
+
+type Dir struct {
+	Type   uint16
+	Dev    uint32
+	Qid    Qid
+	Mode   uint32
+	Length uint64
+	Name   string
+
+	AccessTime       time.Time // TODO(stevvooe): Need special serialization type.
+	ModificationTime time.Time
+
+	/* Not really used for our implementation */
+	UID  string
+	GID  string
+	MUID string
+
+	// TODO(stevvooe): 9p2000.u/L should go here.
+}
+
+//
+type Tag uint16
+
+type FcallType uint8
+
+const (
+	FcallTypeTversion FcallType = iota + 100
+	FcallTypeRversion
+	FcallTypeTauth
+	FcallTypeRauth
+	FcallTypeTattach
+	FcallTypeRattach
+	FcallTypeTerror
+	FcallTypeRerror
+	FcallTypeTflush
+	FcallTypeRflush
+	FcallTypeTwalk
+	FcallTypeRwalk
+	FcallTypeTopen
+	FcallTypeRopen
+	FcallTypeTcreate
+	FcallTypeRcreate
+	FcallTypeTread
+	FcallTypeRread
+	FcallTypeTwrite
+	FcallTypeRwrite
+	FcallTypeTclunk
+	FcallTypeRclunk
+	FcallTypeTremove
+	FcallTypeRremove
+	FcallTypeTstat
+	FcallTypeRstat
+	FcallTypeTwstat
+	FcallTypeRwstat
+	FcallTypeTmax
+)
+
+type Fcall struct {
+	Type    Type
+	Fid     Fid
+	Tag     Tag
+	Message Message
+}
+
+type Message interface {
+	Size() int
+	encoding.BinaryMarshaler
+	encoding.BinaryUnmarshaler
+}
+
+type MessageVersion struct {
+	MSize   uint32
+	Version string
+}
+
+func (mv MessageVersion) MarshalBinary() ([]byte, error) {
+
+	encoding.BinaryMarshaler
+}