commit - f3666390156e312941e5ab87cae79dcfb738e315
commit + 8a7ec69711074b12b52a63f9eb61ce8cc82425bb
blob - /dev/null
blob + 1e7044f40483d25674fb40acc8b2a71b900925c7 (mode 644)
--- /dev/null
+++ client.go
+package p9pnew
+
+import (
+ "bufio"
+ "fmt"
+ "log"
+ "time"
+
+ "golang.org/x/net/context"
+
+ "net"
+)
+
+// client implements Session on top of a single network connection.
+//
+// Callers hand requests to the handle loop through the requests channel; the
+// handle loop writes them to conn and routes responses back by tag.
+type client struct {
+	conn     net.Conn
+	tags     *tagPool           // source of tags for in-flight requests
+	requests chan *fcallRequest // send -> handle hand-off
+	closed   chan struct{}      // selected on by send and handle to detect shutdown
+}
+
+// NewSession returns a session using the connection.
+//
+// It allocates the tag pool and request channel used by send, then starts the
+// tag generator and the connection handling loop. The error is whatever
+// newTagPool reports.
+func NewSession(conn net.Conn) (Session, error) {
+	// NOTAG is reserved by the protocol, so size the pool to stop just short
+	// of it: tags 0 .. NOTAG-1.
+	tags, err := newTagPool(int(NOTAG))
+	if err != nil {
+		return nil, err
+	}
+
+	c := &client{
+		conn:     conn,
+		tags:     tags,
+		requests: make(chan *fcallRequest),
+	}
+
+	// Get blocks until next is feeding nexttag, and send blocks until handle
+	// is draining requests, so both loops must be running before the session
+	// is handed out.
+	go tags.next()
+	go c.handle()
+
+	return c, nil
+}
+
+// Compile-time check that *client implements Session.
+var _ Session = (*client)(nil)
+
+// Auth is not implemented yet; calling it panics.
+func (c *client) Auth(ctx context.Context, afid Fid, uname, aname string) (Qid, error) {
+ panic("not implemented")
+}
+
+// Attach is not implemented yet; calling it panics.
+func (c *client) Attach(ctx context.Context, fid, afid Fid, uname, aname string) (Qid, error) {
+ panic("not implemented")
+}
+
+// Clunk is not implemented yet; calling it panics.
+func (c *client) Clunk(ctx context.Context, fid Fid) error {
+ panic("not implemented")
+}
+
+// Remove is not implemented yet; calling it panics.
+func (c *client) Remove(ctx context.Context, fid Fid) error {
+ panic("not implemented")
+}
+
+// Walk is not implemented yet; calling it panics.
+func (c *client) Walk(ctx context.Context, fid Fid, newfid Fid, names ...string) ([]Qid, error) {
+ panic("not implemented")
+}
+
+// Read is not implemented yet; calling it panics.
+func (c *client) Read(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error) {
+ panic("not implemented")
+}
+
+// Write is not implemented yet; calling it panics.
+func (c *client) Write(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error) {
+ panic("not implemented")
+}
+
+// Open is not implemented yet; calling it panics.
+func (c *client) Open(ctx context.Context, fid Fid, mode int32) (Qid, error) {
+ panic("not implemented")
+}
+
+// Create is not implemented yet; calling it panics.
+func (c *client) Create(ctx context.Context, parent Fid, name string, perm uint32, mode uint32) (Qid, error) {
+ panic("not implemented")
+}
+
+// Stat is not implemented yet; calling it panics.
+func (c *client) Stat(context.Context, Fid) (Dir, error) {
+ panic("not implemented")
+}
+
+// WStat is not implemented yet; calling it panics.
+func (c *client) WStat(context.Context, Fid, Dir) error {
+ panic("not implemented")
+}
+
+// Version sends a Tversion request carrying msize and version and returns the
+// msize and version string from the reply.
+//
+// An error is returned if the request cannot be sent or if the response body
+// is not a version message.
+func (c *client) Version(ctx context.Context, msize int32, version string) (int32, string, error) {
+	// The tag is left unset here: send assigns one from the pool.
+	fcall := &Fcall{
+		Type: Tversion,
+		Message: &MessageVersion{
+			MSize:   uint32(msize),
+			Version: version,
+		},
+	}
+
+	resp, err := c.send(ctx, fcall)
+	if err != nil {
+		return 0, "", err
+	}
+
+	// Fcall.UnmarshalBinary stores version bodies as *MessageVersion.
+	mv, ok := resp.Message.(*MessageVersion)
+	if !ok {
+		return 0, "", fmt.Errorf("invalid rpc response for version message: %v", resp)
+	}
+
+	return int32(mv.MSize), mv.Version, nil
+}
+
+// send dispatches the fcall and blocks until a response arrives, the write
+// fails, ctx is done, or the client is closed.
+//
+// A tag is taken from the pool for the duration of the call and overwrites
+// whatever fc.Tag held on entry.
+func (c *client) send(ctx context.Context, fc *Fcall) (*Fcall, error) {
+	fc.Tag = c.tags.Get()
+	defer c.tags.Put(fc.Tag)
+
+	fcreq := newFcallRequest(ctx, fc)
+
+	// dispatch the request.
+	select {
+	case <-c.closed:
+		return nil, ErrClosed
+	case c.requests <- fcreq:
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	}
+
+	// wait for the response.
+	select {
+	case <-c.closed:
+		return nil, ErrClosed
+	case <-ctx.Done():
+		// NOTE(review): the deferred Put frees the tag while the request may
+		// still be in flight; a late response could then be matched against
+		// whichever request reuses the tag. Confirm whether a flush is needed.
+		return nil, ctx.Err()
+	case err := <-fcreq.err:
+		// handle reports write failures here; without this case the caller
+		// would wait until ctx expires.
+		return nil, err
+	case resp := <-fcreq.response:
+		return resp, nil
+	}
+}
+
+// fcallRequest pairs an outgoing fcall with the channels used to deliver its
+// outcome back to the goroutine blocked in send.
+type fcallRequest struct {
+ ctx context.Context // consulted by handle for the write deadline
+ fcall *Fcall // the message to put on the wire
+ response chan *Fcall // receives the reply matched by tag
+ err chan error // receives a write failure from handle
+}
+
+// newFcallRequest wraps fc and ctx in a request ready to be queued on
+// client.requests.
+//
+// Both result channels have a buffer of one so the handle loop can deliver an
+// outcome without blocking, even if the caller has already given up.
+func newFcallRequest(ctx context.Context, fc *Fcall) *fcallRequest {
+	return &fcallRequest{
+		ctx:      ctx,
+		fcall:    fc,
+		response: make(chan *Fcall, 1),
+		err:      make(chan error, 1),
+	}
+}
+
+// handle takes messages off the wire and wakes up the waiting tag call.
+//
+// A reader goroutine decodes fcalls from the connection and forwards them on
+// an internal channel. The calling goroutine then loops: it writes queued
+// requests to the connection and routes each response to the request that
+// holds the same tag. Both loops exit once c.closed is closed.
+//
+// NOTE(review): read failures other than timeouts and unknown tags still
+// panic, as before; there is no error path back to callers yet.
+func (c *client) handle() {
+	var (
+		responses = make(chan *Fcall)
+		// outstanding provides a map of tags to outstanding requests.
+		outstanding = map[Tag]*fcallRequest{}
+	)
+
+	// loop to read messages off of the connection
+	go func() {
+		r := bufio.NewReader(c.conn)
+
+		for {
+			// Continuously set the read deadline to pump the loop below. We can
+			// probably set a connection dead threshold that can count these.
+			// Usually, this would only matter when there are actually
+			// outstanding requests.
+			if err := c.conn.SetReadDeadline(time.Now().Add(time.Second)); err != nil {
+				panic(fmt.Sprintf("error setting read deadline: %v", err))
+			}
+
+			fc := new(Fcall)
+			if err := read9p(r, fc); err != nil {
+				if nerr, ok := err.(net.Error); ok && (nerr.Timeout() || nerr.Temporary()) {
+					// An idle connection hits the deadline every second.
+					// Keep reading rather than leaving the loop, otherwise
+					// no response could ever be received after the first
+					// quiet second.
+					//
+					// NOTE(review): a timeout in the middle of a message
+					// discards the bytes already consumed; confirm whether
+					// framing needs to survive that.
+					select {
+					case <-c.closed:
+						return
+					default:
+					}
+					continue
+				}
+
+				panic(fmt.Sprintf("connection read error: %v", err))
+			}
+
+			select {
+			case <-c.closed:
+				return
+			case responses <- fc:
+			}
+		}
+	}()
+
+	w := bufio.NewWriter(c.conn)
+
+	for {
+		select {
+		case <-c.closed:
+			return
+		case req := <-c.requests:
+			outstanding[req.fcall.Tag] = req
+
+			// use deadline to set write deadline for this request.
+			deadline, ok := req.ctx.Deadline()
+			if !ok {
+				deadline = time.Now().Add(time.Second)
+			}
+
+			if err := c.conn.SetWriteDeadline(deadline); err != nil {
+				log.Printf("error setting write deadline: %v", err)
+			}
+
+			// The writer is buffered, so the message only reaches the
+			// connection once it is flushed.
+			err := write9p(w, req.fcall)
+			if err == nil {
+				err = w.Flush()
+			}
+			if err != nil {
+				delete(outstanding, req.fcall.Tag)
+				req.err <- err
+			}
+		case b := <-responses:
+			req, ok := outstanding[b.Tag]
+			if !ok {
+				panic("unknown tag received")
+			}
+			delete(outstanding, req.fcall.Tag)
+
+			req.response <- b
+		}
+	}
+}
blob - /dev/null
blob + 22235424d65bc142f1cb44c60aa1ccf7012ed76b (mode 644)
--- /dev/null
+++ errors.go
+package p9pnew
+
+import (
+ "errors"
+ "fmt"
+)
+
+// Error is a 9p error identified by its name.
+type Error struct {
+	Name string
+}
+
+// Error implements the error interface by prefixing the name with "9p: ".
+func (e Error) Error() string {
+	return fmt.Sprint("9p: ", e.Name)
+}
+
+// ErrClosed is the error returned by client.send when the client has been
+// closed.
+var ErrClosed = errors.New("closed")
+
+// ErrUnknownfid is the 9p error named "unknown fid".
+var ErrUnknownfid = Error{Name: "unknown fid"}
blob - /dev/null
blob + 73b68dd64117abec8a2328e35d03d23921bd9f42 (mode 644)
--- /dev/null
+++ fcall.go
+package p9pnew
+
+import (
+ "bytes"
+ "encoding/binary"
+ "fmt"
+ "io"
+
+ "encoding"
+)
+
+// FcallType is the one-byte message type carried by every fcall.
+type FcallType uint8
+
+// Message types are numbered consecutively from 100. Each T (request) value
+// is immediately followed by its R (response) counterpart; Tmax is one past
+// the last type.
+const (
+	Tversion = FcallType(iota + 100)
+	Rversion
+
+	Tauth
+	Rauth
+
+	Tattach
+	Rattach
+
+	Terror
+	Rerror
+
+	Tflush
+	Rflush
+
+	Twalk
+	Rwalk
+
+	Topen
+	Ropen
+
+	Tcreate
+	Rcreate
+
+	Tread
+	Rread
+
+	Twrite
+	Rwrite
+
+	Tclunk
+	Rclunk
+
+	Tremove
+	Rremove
+
+	Tstat
+	Rstat
+
+	Twstat
+	Rwstat
+
+	Tmax
+)
+
+// Fcall is a single 9p message: its type, the tag that pairs a request with
+// its response, and the type-specific body.
+type Fcall struct {
+	Type    FcallType // selects the concrete Message in UnmarshalBinary
+	Tag     Tag
+	Message Message
+}
+
+// fcallHeaderSize is the byte count Fcall.Size adds on top of the message
+// body: the size field plus the type byte.
+//
+// NOTE(review): the two-byte tag is not counted here — confirm whether
+// Size is meant to include it.
+const fcallHeaderSize = 4 /*size*/ + 1 /*type*/
+
+// Size returns fcallHeaderSize plus the size reported by the message body.
+func (fc *Fcall) Size() int {
+	body := fc.Message.Size()
+	return fcallHeaderSize + body
+}
+
+// MarshalBinary encodes the fcall as type[1] tag[2] followed by the encoded
+// message body, the exact layout UnmarshalBinary consumes.
+//
+// The leading size field is not part of the result; write9p adds it when
+// framing the message on the wire.
+func (fc *Fcall) MarshalBinary() ([]byte, error) {
+	mp, err := fc.Message.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+
+	b := bytes.NewBuffer(make([]byte, 0, 1+2+len(mp)))
+	if err := write9p(b, fc.Type, fc.Tag, mp); err != nil {
+		return nil, err
+	}
+
+	return b.Bytes(), nil
+}
+
+// UnmarshalBinary decodes type[1] tag[2] from p, selects the message body
+// implementation for that type and hands it the remaining bytes.
+//
+// Only version messages have a body implementation so far; every other type
+// yields an error rather than a nil Message.
+func (fc *Fcall) UnmarshalBinary(p []byte) error {
+	r := bytes.NewReader(p)
+
+	if err := read9p(r, &fc.Type, &fc.Tag); err != nil {
+		return err
+	}
+
+	switch fc.Type {
+	case Tversion, Rversion:
+		fc.Message = &MessageVersion{}
+	case Tauth, Rauth,
+		Tattach, Rattach,
+		Terror, Rerror,
+		Tflush, Rflush,
+		Twalk, Rwalk,
+		Topen, Ropen,
+		Tcreate, Rcreate,
+		Tread, Rread,
+		Twrite, Rwrite,
+		Tclunk, Rclunk,
+		Tremove, Rremove,
+		Tstat, Rstat,
+		Twstat, Rwstat:
+		return fmt.Errorf("fcall type %v not implemented", fc.Type)
+	default:
+		return fmt.Errorf("unknown fcall type %v", fc.Type)
+	}
+
+	// r.Len() is what the header read left unconsumed, so this slices off
+	// exactly the message body.
+	return fc.Message.UnmarshalBinary(p[len(p)-r.Len():])
+}
+
+// Message is the type-specific body of an Fcall. An implementation reports
+// its size and converts itself to and from binary form.
+type Message interface {
+ Size() int
+ encoding.BinaryMarshaler
+ encoding.BinaryUnmarshaler
+}
+
+// MessageVersion encodes the message body for Tversion and Rversion RPC
+// calls. The body is identical in both directions.
+type MessageVersion struct {
+	MSize   uint32
+	Version string
+}
+
+// Size returns the encoded length: msize[4], the two-byte string length and
+// the bytes of the version string.
+func (mv MessageVersion) Size() int {
+	return 4 + 2 + len(mv.Version)
+}
+
+// MarshalBinary encodes the body as msize[4] version[s].
+func (mv MessageVersion) MarshalBinary() ([]byte, error) {
+	b := bytes.NewBuffer(make([]byte, 0, mv.Size()))
+
+	if err := write9p(b, mv.MSize, mv.Version); err != nil {
+		return nil, err
+	}
+
+	return b.Bytes(), nil
+}
+
+// UnmarshalBinary decodes msize[4] version[s] from p into mv. It completes
+// the Message interface for *MessageVersion, which is the form
+// Fcall.UnmarshalBinary stores.
+func (mv *MessageVersion) UnmarshalBinary(p []byte) error {
+	return read9p(bytes.NewReader(p), &mv.MSize, &mv.Version)
+}
+
+// write9p implements serialization for base types.
+//
+// Each value in vs is written to w in order:
+//   - string: a little-endian uint16 length followed by the bytes;
+//   - *Fcall: a little-endian uint32 size followed by the marshaled fcall;
+//   - anything else: handed to binary.Write, so it must be a fixed-size
+//     value or a slice of them.
+//
+// The first error stops the write and is returned.
+func write9p(w io.Writer, vs ...interface{}) error {
+	for _, v := range vs {
+		switch v := v.(type) {
+		case string:
+			// implement string[s] encoding. The length prefix is two bytes,
+			// so longer strings cannot be represented.
+			if len(v) > 0xffff {
+				return fmt.Errorf("string too long to encode: %d bytes", len(v))
+			}
+
+			if err := binary.Write(w, binary.LittleEndian, uint16(len(v))); err != nil {
+				return err
+			}
+
+			if _, err := io.WriteString(w, v); err != nil {
+				return err
+			}
+		case *Fcall:
+			p, err := v.MarshalBinary()
+			if err != nil {
+				return err
+			}
+
+			// size[4] counts the whole message, including the four bytes of
+			// the size field itself.
+			if err := binary.Write(w, binary.LittleEndian, uint32(len(p)+4)); err != nil {
+				return err
+			}
+
+			n, err := w.Write(p)
+			if err != nil {
+				return err
+			}
+
+			if n != len(p) {
+				return io.ErrShortWrite
+			}
+		default:
+			if err := binary.Write(w, binary.LittleEndian, v); err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
+// read9p extracts values from rd and unmarshals them to the targets of vs.
+//
+// Each target is filled in order:
+//   - *string: a little-endian uint16 length followed by that many bytes;
+//   - *Fcall: a little-endian uint32 size, then the rest of the message,
+//     which is passed to Fcall.UnmarshalBinary;
+//   - anything else: handed to binary.Read, so it must point to a
+//     fixed-size value.
+//
+// The first error stops the read and is returned.
+func read9p(rd io.Reader, vs ...interface{}) error {
+	for _, v := range vs {
+		switch v := v.(type) {
+		case *string:
+			var ll uint16
+
+			// implement string[s] encoding
+			if err := binary.Read(rd, binary.LittleEndian, &ll); err != nil {
+				return err
+			}
+
+			b := make([]byte, ll)
+
+			// ReadFull fails unless it fills b, so no separate length check
+			// is needed.
+			if _, err := io.ReadFull(rd, b); err != nil {
+				return err
+			}
+
+			*v = string(b)
+		case *Fcall:
+			var size uint32
+			if err := binary.Read(rd, binary.LittleEndian, &size); err != nil {
+				return err
+			}
+
+			// size[4] counts itself, so anything under four is malformed.
+			if size < 4 {
+				return fmt.Errorf("error reading fcall: invalid size %d", size)
+			}
+
+			// NOTE(review): size comes straight off the wire and is not
+			// capped; consider bounding it by the negotiated msize.
+			p := make([]byte, size-4)
+			if _, err := io.ReadFull(rd, p); err != nil {
+				return err
+			}
+
+			if err := v.UnmarshalBinary(p); err != nil {
+				return err
+			}
+		default:
+			if err := binary.Read(rd, binary.LittleEndian, v); err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
blob - /dev/null
blob + df87804033610f46a568197526e072fb5292a7c9 (mode 644)
--- /dev/null
+++ logging.go
+package p9pnew
+
+import (
+	"log"
+	"os"
+
+	"golang.org/x/net/context"
+)
+
+// logging is a Session that forwards every call to the wrapped session and
+// writes a log line for some of them.
+type logging struct {
+	session Session
+	// logger is held by pointer: log.New returns a *log.Logger, and copying
+	// the Logger value would copy the mutex inside it.
+	logger *log.Logger
+}
+
+var _ Session = &logging{}
+
+// NewLogger wraps session so that calls are logged to standard output with
+// the given prefix and no timestamp flags.
+func NewLogger(prefix string, session Session) Session {
+	return &logging{
+		session: session,
+		logger:  log.New(os.Stdout, prefix, 0),
+	}
+}
+
+// Auth forwards to the wrapped session and logs the arguments and result.
+func (l *logging) Auth(ctx context.Context, afid Fid, uname, aname string) (Qid, error) {
+	qid, err := l.session.Auth(ctx, afid, uname, aname)
+	l.logger.Printf("Auth(%v, %s, %s) -> (%v, %v)", afid, uname, aname, qid, err)
+	return qid, err
+}
+
+// Attach forwards to the wrapped session and logs the arguments and result.
+func (l *logging) Attach(ctx context.Context, fid, afid Fid, uname, aname string) (Qid, error) {
+	qid, err := l.session.Attach(ctx, fid, afid, uname, aname)
+	l.logger.Printf("Attach(%v, %v, %s, %s) -> (%v, %v)", fid, afid, uname, aname, qid, err)
+	return qid, err
+}
+
+// Clunk forwards to the wrapped session without logging.
+func (l *logging) Clunk(ctx context.Context, fid Fid) error {
+	return l.session.Clunk(ctx, fid)
+}
+
+// Remove forwards to the wrapped session and logs the fid and the returned
+// error. The named result lets the deferred log call see the final error.
+func (l *logging) Remove(ctx context.Context, fid Fid) (err error) {
+	defer func() {
+		l.logger.Printf("Remove(%v) -> %v", fid, err)
+	}()
+	return l.session.Remove(ctx, fid)
+}
+
+// Walk forwards to the wrapped session without logging.
+func (l *logging) Walk(ctx context.Context, fid Fid, newfid Fid, names ...string) ([]Qid, error) {
+	return l.session.Walk(ctx, fid, newfid, names...)
+}
+
+// Read forwards to the wrapped session without logging.
+func (l *logging) Read(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error) {
+	return l.session.Read(ctx, fid, p, offset)
+}
+
+// Write forwards to the wrapped session without logging.
+func (l *logging) Write(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error) {
+	return l.session.Write(ctx, fid, p, offset)
+}
+
+// Open forwards to the wrapped session without logging.
+func (l *logging) Open(ctx context.Context, fid Fid, mode int32) (Qid, error) {
+	return l.session.Open(ctx, fid, mode)
+}
+
+// Create forwards to the wrapped session without logging.
+func (l *logging) Create(ctx context.Context, parent Fid, name string, perm uint32, mode uint32) (Qid, error) {
+	return l.session.Create(ctx, parent, name, perm, mode)
+}
+
+// Stat forwards to the wrapped session without logging.
+func (l *logging) Stat(ctx context.Context, fid Fid) (Dir, error) {
+	return l.session.Stat(ctx, fid)
+}
+
+// WStat forwards to the wrapped session without logging.
+func (l *logging) WStat(ctx context.Context, fid Fid, dir Dir) error {
+	return l.session.WStat(ctx, fid, dir)
+}
+
+// Version forwards to the wrapped session without logging.
+func (l *logging) Version(ctx context.Context, msize int32, version string) (int32, string, error) {
+	return l.session.Version(ctx, msize, version)
+}
blob - /dev/null
blob + 287a302388f8565d8ce9c6fcb628fa154a9d1a7a (mode 644)
--- /dev/null
+++ session.go
+package p9pnew
+
+import (
+ "net"
+
+ "golang.org/x/net/context"
+)
+
+// Session provides the central abstraction for a 9p connection. Clients
+// implement sessions and servers serve sessions. Sessions can be proxied by
+// serving up a client session.
+//
+// The interface is also wired up with full context support to manage timeouts
+// and resource clean up.
+//
+// Session represents the operations covered in section 5 of the plan 9 manual
+// (http://man.cat-v.org/plan_9/5/). Requests are managed internally, so the
+// Flush method is handled by the internal implementation. Consider preceding
+// these all with context to control request timeout.
+type Session interface {
+ Auth(ctx context.Context, afid Fid, uname, aname string) (Qid, error)
+ Attach(ctx context.Context, fid, afid Fid, uname, aname string) (Qid, error)
+ Clunk(ctx context.Context, fid Fid) error
+ Remove(ctx context.Context, fid Fid) error
+ Walk(ctx context.Context, fid Fid, newfid Fid, names ...string) ([]Qid, error)
+ Read(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error)
+ Write(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error)
+ Open(ctx context.Context, fid Fid, mode int32) (Qid, error)
+ Create(ctx context.Context, parent Fid, name string, perm uint32, mode uint32) (Qid, error)
+ Stat(context.Context, Fid) (Dir, error)
+ WStat(context.Context, Fid, Dir) error
+
+ // TODO(stevvooe): The version message affects a lot of protocol behavior.
+ // Consider hiding it behind the implementation, letting the version get
+ // negotiated. The API user should still be able to query it.
+ Version(ctx context.Context, msize int32, version string) (int32, string, error)
+}
+
+// Dial opens a TCP connection to addr and returns a session running over it.
+//
+// If the session cannot be created, the connection is closed before the
+// error is returned.
+func Dial(addr string) (Session, error) {
+	c, err := net.Dial("tcp", addr)
+	if err != nil {
+		return nil, err
+	}
+
+	// BUG(stevvooe): Session doesn't actually close connection. Dial might
+	// not be the right interface.
+
+	session, err := NewSession(c)
+	if err != nil {
+		// The caller never sees c, so release it here. The close error is
+		// dropped in favor of the more useful session error.
+		c.Close()
+		return nil, err
+	}
+
+	return session, nil
+}
blob - /dev/null
blob + 4bea0b041f5c0da62d20e12af577067a28bf09e8 (mode 644)
--- /dev/null
+++ tags.go
+package p9pnew
+
+import "fmt"
+
+// tagPool implements a free list to manage tags for outstanding 9p requests.
+//
+// Get takes a tag from freelist or nexttag, Put returns one to freelist, and
+// the next loop feeds fresh tags into nexttag until maximum is reached.
+type tagPool struct {
+ maximum Tag // next stops once it has issued this many tags
+ freelist chan Tag // buffered to maximum
+ nexttag chan Tag // synchronous until max allocated
+ closed chan struct{} // closed by Close; unblocks Get, Put and next
+}
+
+// Get returns a tag, taken either from the free list or from the stream of
+// never-used tags, blocking until one is available. It panics if the pool
+// has been closed.
+func (tp *tagPool) Get() Tag {
+	var tag Tag
+
+	select {
+	case <-tp.closed:
+		panic("tag pool is closed")
+	case tag = <-tp.freelist:
+	case tag = <-tp.nexttag:
+	}
+
+	return tag
+}
+
+// next offers the tags 0 .. maximum-1 on nexttag, one at a time, and returns
+// once they have all been handed out or the pool is closed.
+//
+// Checking the bound before each send means a pool with maximum 0 issues no
+// tags at all.
+func (tp *tagPool) next() {
+	for next := Tag(0); next < tp.maximum; next++ {
+		select {
+		case <-tp.closed:
+			return
+		case tp.nexttag <- next:
+		}
+	}
+}
+
+// Put hands tag back to the free list. If the list is full it blocks until
+// there is room or the pool is closed; on a closed pool the tag may simply be
+// dropped.
+func (tp *tagPool) Put(tag Tag) {
+	select {
+	case <-tp.closed:
+	case tp.freelist <- tag:
+	}
+}
+
+// Close marks the pool as closed. Calling it on a pool that is already closed
+// returns an error instead of closing again.
+func (tp *tagPool) Close() error {
+	select {
+	case <-tp.closed:
+		return fmt.Errorf("closed")
+	default:
+	}
+
+	close(tp.closed)
+	return nil
+}
+
+// newTagPool returns a tag pool with the maximum number of outstanding
+// requests. The free list is buffered to that size. The returned error is
+// always nil, and the next loop is not started here.
+func newTagPool(outstanding int) (*tagPool, error) {
+	tp := &tagPool{
+		maximum: Tag(outstanding),
+		closed:  make(chan struct{}),
+	}
+	tp.freelist = make(chan Tag, outstanding)
+	tp.nexttag = make(chan Tag)
+
+	return tp, nil
+}
blob - /dev/null
blob + 23ab0c4ea3426ea68d3967bfc611167ad034ef77 (mode 644)
--- /dev/null
+++ types.go
+package p9pnew
+
+import (
+ "encoding"
+
+ "time"
+)
+
+// NOFID and NOTAG are the all-ones values of their types.
+const (
+	NOFID = Fid(0xffffffff)
+	NOTAG = Tag(0xffff)
+)
+
+// Mode bits. The high bits describe the kind of file; the low three bits are
+// the read, write and execute permissions.
+const (
+	DMDIR    = 1 << 31 // mode bit for directories
+	DMAPPEND = 1 << 30 // mode bit for append only files
+	DMEXCL   = 1 << 29 // mode bit for exclusive use files
+	DMMOUNT  = 1 << 28 // mode bit for mounted channel
+	DMAUTH   = 1 << 27 // mode bit for authentication file
+	DMTMP    = 1 << 26 // mode bit for non-backed-up files
+	DMREAD   = 1 << 2  // mode bit for read permission
+	DMWRITE  = 1 << 1  // mode bit for write permission
+	DMEXEC   = 1 << 0  // mode bit for execute permission
+)
+
+// Open modes. The first four values select the access mode; the remaining
+// ones are flags that are or'ed in.
+const (
+	OREAD   = 0x0    // open for read
+	OWRITE  = 0x1    // write
+	ORDWR   = 0x2    // read and write
+	OEXEC   = 0x3    // execute, == read but check execute permission
+	OTRUNC  = 0x10   // or'ed in (except for exec), truncate file first
+	OCEXEC  = 0x20   // or'ed in, close on exec
+	ORCLOSE = 0x40   // or'ed in, remove on close
+	OEXCL   = 0x1000 // or'ed in, exclusive use (create only)
+)
+
+// QType is the one-byte type field of a Qid.
+type QType uint8
+
+// Qid type bits.
+const (
+	QTDIR    QType = 1 << 7 // type bit for directories
+	QTAPPEND QType = 1 << 6 // type bit for append only files
+	QTEXCL   QType = 1 << 5 // type bit for exclusive use files
+	QTMOUNT  QType = 1 << 4 // type bit for mounted channel
+	QTAUTH   QType = 1 << 3 // type bit for authentication file
+	QTTMP    QType = 1 << 2 // type bit for not-backed-up file
+	QTFILE   QType = 0      // plain file
+)
+
+// Fid is the 32-bit identifier passed to Session methods to name a file.
+type Fid uint32
+
+// Qid is the value returned by Session calls such as Attach, Open and Walk.
+// It combines a type, a version and a path.
+type Qid struct {
+ Type QType // QT* bits
+ Version uint32
+ Path uint64
+}
+
+// Dir is the file description returned by Session.Stat and accepted by
+// Session.WStat.
+type Dir struct {
+ Type uint16
+ Dev uint32
+ Qid Qid
+ Mode uint32 // presumably a combination of the DM* bits; verify against callers
+ Length uint64
+ Name string
+
+ AccessTime time.Time // TODO(stevvooe): Need special serialization type.
+ ModificationTime time.Time
+
+ /* Not really used for our implementation */
+ UID string
+ GID string
+ MUID string
+
+ // TODO(stevvooe): 9p2000.u/L should go here.
+}
+
+// Tag is the 16-bit identifier the client uses to match a response to its
+// outstanding request.
+type Tag uint16
+
+// FcallType is the one-byte message type of an fcall.
+//
+// NOTE(review): FcallType is also declared in fcall.go, and two declarations
+// of one name cannot coexist in a package. The constants below carry the
+// same values as the T*/R* constants there under longer names; decide which
+// set to keep.
+type FcallType uint8
+
+// Message types, numbered consecutively from 100.
+const (
+ FcallTypeTversion FcallType = iota + 100
+ FcallTypeRversion
+ FcallTypeTauth
+ FcallTypeRauth
+ FcallTypeTattach
+ FcallTypeRattach
+ FcallTypeTerror
+ FcallTypeRerror
+ FcallTypeTflush
+ FcallTypeRflush
+ FcallTypeTwalk
+ FcallTypeRwalk
+ FcallTypeTopen
+ FcallTypeRopen
+ FcallTypeTcreate
+ FcallTypeRcreate
+ FcallTypeTread
+ FcallTypeRread
+ FcallTypeTwrite
+ FcallTypeRwrite
+ FcallTypeTclunk
+ FcallTypeRclunk
+ FcallTypeTremove
+ FcallTypeRremove
+ FcallTypeTstat
+ FcallTypeRstat
+ FcallTypeTwstat
+ FcallTypeRwstat
+ FcallTypeTmax
+)
+
+// Fcall is a single 9p message.
+//
+// NOTE(review): Fcall is also declared in fcall.go, without the Fid field;
+// the two declarations conflict. The Type field refers to a type named Type
+// that is not declared in the visible sources — presumably FcallType.
+type Fcall struct {
+ Type Type
+ Fid Fid
+ Tag Tag
+ Message Message
+}
+
+// Message is the type-specific body of an Fcall.
+//
+// NOTE(review): an identical Message interface is declared in fcall.go; one
+// of the two has to go.
+type Message interface {
+ Size() int
+ encoding.BinaryMarshaler
+ encoding.BinaryUnmarshaler
+}
+
+// MessageVersion holds an msize and a version string.
+//
+// NOTE(review): MessageVersion is also declared in fcall.go, together with
+// its Size and MarshalBinary methods; the two declarations conflict.
+type MessageVersion struct {
+ MSize uint32
+ Version string
+}
+
+// NOTE: MessageVersion.MarshalBinary is implemented in fcall.go. The
+// placeholder previously declared here redeclared that method and had no
+// valid body (a bare type expression and no return), so it has been dropped.