commit 8a7ec69711074b12b52a63f9eb61ce8cc82425bb from: Adrien Duermael date: Mon Oct 26 23:41:52 2015 UTC Both filesystem interfaces in master (fs & fs_tmp) commit - f3666390156e312941e5ab87cae79dcfb738e315 commit + 8a7ec69711074b12b52a63f9eb61ce8cc82425bb blob - /dev/null blob + 1e7044f40483d25674fb40acc8b2a71b900925c7 (mode 644) --- /dev/null +++ client.go @@ -0,0 +1,216 @@ +package p9pnew + +import ( + "bufio" + "fmt" + "log" + "time" + + "golang.org/x/net/context" + + "net" +) + +type client struct { + conn net.Conn + tags *tagPool + requests chan *fcallRequest +} + +// NewSession returns a session using the connection. +func NewSession(conn net.Conn) (Session, error) { + return &client{ + conn: conn, + } +} + +var _ Session = &client{} + +func (c *client) Auth(ctx context.Context, afid Fid, uname, aname string) (Qid, error) { + panic("not implemented") +} + +func (c *client) Attach(ctx context.Context, fid, afid Fid, uname, aname string) (Qid, error) { + panic("not implemented") +} + +func (c *client) Clunk(ctx context.Context, fid Fid) error { + panic("not implemented") +} + +func (c *client) Remove(ctx context.Context, fid Fid) error { + panic("not implemented") +} + +func (c *client) Walk(ctx context.Context, fid Fid, newfid Fid, names ...string) ([]Qid, error) { + panic("not implemented") +} + +func (c *client) Read(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error) { + panic("not implemented") +} + +func (c *client) Write(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error) { + panic("not implemented") +} + +func (c *client) Open(ctx context.Context, fid Fid, mode int32) (Qid, error) { + panic("not implemented") +} + +func (c *client) Create(ctx context.Context, parent Fid, name string, perm uint32, mode uint32) (Qid, error) { + panic("not implemented") +} + +func (c *client) Stat(context.Context, Fid) (Dir, error) { + panic("not implemented") +} + +func (c *client) WStat(context.Context, Fid, Dir) error { + panic("not implemented") +} + +func (c *client) Version(ctx context.Context, msize int32, version string) (int32, string, error) { + fcall := &Fcall{ + Type: TVersion, + Tag: tag, + Message: MessageVersion{ + MSize: msize, + Version: Version, + }, + } + + resp, err := c.send(ctx, fcall) + if err != nil { + return 0, "", err + } + + mv, ok := resp.Message.(*MessageVersion) + if !ok { + return fmt.Errorf("invalid rpc response for version message: %v", resp) + } + + return mv.MSize, mv.Version, nil +} + +// send dispatches the fcall. +func (c *client) send(ctx context.Context, fc *Fcall) (*Fcall, error) { + fc.Tag = c.tags.Get() + defer c.tags.Put(fc.Tag) + + fcreq := newFcallRequest(ctx, fc) + + // dispatch the request. + select { + case <-c.closed: + return nil, ErrClosed + case c.requests <- fcreq: + case <-ctx.Done(): + return nil, ctx.Err() + } + + // wait for the response. + select { + case <-closed: + return nil, ErrClosed + case <-ctx.Done(): + return nil, ctx.Err() + case resp := <-fcreq.response: + return resp, nil + } +} + +type fcallRequest struct { + ctx context.Context + fcall *Fcall + response chan *Fcall + err chan error +} + +func newFcallRequest(ctx context.Context, fc *Fcall) fcallRequest { + return fcallRequest{ + ctx: ctx, + fcall: fc, + response: make(chan *Fcall, 1), + err: make(chan err, 1), + } +} + +// handle takes messages off the wire and wakes up the waiting tag call. +func (c *client) handle() { + + var ( + responses = make(chan *Fcall) + // outstanding provides a map of tags to outstanding requests. + outstanding = map[Tag]*fcallRequest{} + ) + + // loop to read messages off of the connection + go func() { + r := bufio.NewReader(c.conn) + + loop: + for { + // Continuously set the read dead line pump the loop below. We can + // probably set a connection dead threshold that can count these. + // Usually, this would only matter when there are actually + // outstanding requests. + if err := c.conn.SetReadDeadline(time.Now().Add(time.Second)); err != nil { + panic(fmt.Sprintf("error setting read deadline: %v", err)) + } + + fc := new(Fcall) + if err := read9p(r, fc); err != nil { + switch err := err.(type) { + case net.Error: + if err.Timeout() || err.Temporary() { + break loop + } + } + + panic(fmt.Sprintf("connection read error: %v", err)) + } + + select { + case <-closed: + return + case responses <- fc: + } + } + + }() + + w := bufio.NewWriter(c.conn) + + for { + select { + case <-c.closed: + return + case req := <-c.requests: + outstanding[req.fcall.Tag] = req + + // use deadline to set write deadline for this request. + deadline, ok := req.ctx.Deadline() + if !ok { + deadline = time.Now().Add(time.Second) + } + + if err := c.conn.SetWriteDeadline(deadline); err != nil { + log.Println("error setting write deadline: %v", err) + } + + if err := write9p(w, req.fcall); err != nil { + delete(outstanding, req.fcall.Tag) + req.err <- err + } + case b := <-responses: + req, ok := outstanding[b.Tag] + if !ok { + panic("unknown tag received") + } + delete(outstanding, req.fcall.Tag) + + req.response <- b + } + } +} blob - /dev/null blob + 22235424d65bc142f1cb44c60aa1ccf7012ed76b (mode 644) --- /dev/null +++ errors.go @@ -0,0 +1,20 @@ +package p9pnew + +import ( + "errors" + "fmt" +) + +type Error struct { + Name string +} + +func (e Error) Error() string { + return fmt.Sprintf("9p: %v", e.Name) +} + +var ( + ErrClosed = errors.New("closed") + + ErrUnknownfid = Error{Name: "unknown fid"} +) blob - /dev/null blob + 73b68dd64117abec8a2328e35d03d23921bd9f42 (mode 644) --- /dev/null +++ fcall.go @@ -0,0 +1,261 @@ +package p9pnew + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + + "encoding" +) + +type FcallType uint8 + +const ( + Tversion FcallType = iota + 100 + Rversion + Tauth + Rauth + Tattach + Rattach + Terror + Rerror + Tflush + Rflush + Twalk + Rwalk + Topen + Ropen + Tcreate + Rcreate + Tread + Rread + Twrite + Rwrite + Tclunk + Rclunk + Tremove + Rremove + Tstat + Rstat + Twstat + Rwstat + Tmax +) + +type Fcall struct { + Type Type + Tag Tag + Message Message +} + +const ( + fcallHeaderSize = 4 /*size*/ + 1 /*type*/ +) + +func (fc *Fcall) Size() int { + return fcallHeaderSize + fc.Message.Size() +} + +func (fc *Fcall) MarshalBinary() ([]byte, error) { + mp, err := fc.Message.MarshalBinary() + if err != nil { + return nil, err + } + + b := bytes.NewBuffer(make([]byte, 0, fc.Size())) + if err := write9p(b, fc.Size(), fc.Tag, mp); err != nil { + return nil, err + } + + return b.Bytes(), nil +} + +func (fc *Fcall) UnmarshalBinary(p []data) error { + var ( + r = bytes.NewReader(p) + ) + + if err := read9p(r, &fc.Type, &fc.Tag); err != nil { + return err + } + + switch fc.Type { + case Tversion, Rversion: + fc.Message = &MessageVersion{} + case Tauth: + + case Rauth: + + case Tattach: + + case Rattach: + + case Terror: + + case Rerror: + + case Tflush: + + case Rflush: + + case Twalk: + + case Rwalk: + + case Topen: + + case Ropen: + + case Tcreate: + + case Rcreate: + + case Tread: + + case Rread: + + case Twrite: + + case Rwrite: + + case Tclunk: + + case Rclunk: + + case Tremove: + + case Rremove: + + case Tstat: + + case Rstat: + + case Twstat: + + case Rwstat: + + } + + return fc.Message.UnmarshalBinary(p[len(p)-r.Len():]) +} + +type Message interface { + Size() int + encoding.BinaryMarshaler + encoding.BinaryUnmarshaler +} + +// MessageVersion encodes the message body for Tversion and Rversion RPC +// calls. The body is identical in both directions. +type MessageVersion struct { + MSize uint32 + Version string +} + +func (mv MessageVersion) Size() int { + return 4 + 2 + len(mv.Version) +} + +func (mv MessageVersion) MarshalBinary() ([]byte, error) { + b := bytes.NewBuffer(make([]byte, 0, mv.Size())) + + if err := write9p(b, mv.MSize, mv.Version); err != nil { + return nil, err + } + + return b.Bytes(), nil +} + +// write9p implements serialization for base types. +func write9p(w io.Writer, vs ...interface{}) error { + for _, v := range vs { + switch v := v.(type) { + case string: + // implement string[s] encoding + if err := binary.Write(w, binary.LittleEndian, uint16(len(v))); err != nil { + return err + } + + _, err := io.WriteString(w, s) + if err != nil { + + return err + } + case *Fcall: + if err := write9p(w, v.Size()); err != nil { + return err + } + p, err := v.MarshalBinary() + if err != nil { + return err + } + + n, err := w.Write(p) + if err != nil { + return err + } + + if n != len(p) { + return io.ErrShortWrite + } + + return nil + default: + if err := binary.Write(w, binary.LittleEndian, v); err != nil { + return err + } + } + } + + return nil +} + +// read9p extracts values from rd and unmarshals them to the targets of vs. +func read9p(rd io.Reader, vs ...interface{}) error { + for _, v := range vs { + switch v := v.(type) { + case *string: + var ll uint16 + + // implement string[s] encoding + if err := binary.Read(r, binary.LittleEndian, &ll); err != nil { + return err + } + + b := make([]byte, ll) + + n, err := io.ReadFull(b) + if err != nil { + return err + } + + if n != int(ll) { + return fmt.Errorf("unexpected string length") + } + + *v = string(b) + case *Fcall: + var size uint32 + if err := read9p(buffered, &size); err != nil { + return err + } + + p := make([]byte, size) + n, err := io.ReadFull(p) + if err != nil { + return err + } + + if n != size { + return fmt.Errorf("error reading fcall: short read") + } + + return v.UnmarshalBinary(p) + default: + if err := binary.Read(r, binary.LittleEndian, v); err != nil { + return err + } + } + } +} blob - /dev/null blob + df87804033610f46a568197526e072fb5292a7c9 (mode 644) --- /dev/null +++ logging.go @@ -0,0 +1,75 @@ +package p9pnew + +import ( + "log" + "os" +) + +type logging struct { + session Session + logger log.Logger +} + +var _ Session = &logging{} + +func NewLogger(prefix string, session Session) Session { + return &logging{ + session: session, + logger: *log.New(os.Stdout, prefix, 0), + } +} + +func (l *logging) Auth(afid Fid, uname, aname string) (Qid, error) { + qid, err := l.session.Auth(afid, uname, aname) + l.logger.Printf("Auth(%v, %s, %s) -> (%v, %v)", afid, uname, aname, qid, err) + return qid, err +} + +func (l *logging) Attach(fid, afid Fid, uname, aname string) (Qid, error) { + qid, err := l.session.Attach(fid, afid, uname, aname) + l.logger.Printf("Attach(%v, %v, %s, %s) -> (%v, %v)", fid, afid, uname, aname, qid, err) + return qid, err +} + +func (l *logging) Clunk(fid Fid) error { + return l.session.Clunk(fid) +} + +func (l *logging) Remove(fid Fid) (err error) { + defer func() { + l.logger.Printf("Remove(%v) -> %v", fid, err) + }() + return l.session.Remove(fid) +} + +func (l *logging) Walk(fid Fid, newfid Fid, names ...string) ([]Qid, error) { + return l.session.Walk(fid, newfid, names...) +} + +func (l *logging) Read(fid Fid, p []byte, offset int64) (n int, err error) { + return l.session.Read(fid, p, offset) +} + +func (l *logging) Write(fid Fid, p []byte, offset int64) (n int, err error) { + return l.session.Write(fid, p, offset) +} + +func (l *logging) Open(fid Fid, mode int32) (Qid, error) { + return l.session.Open(fid, mode) +} + +func (l *logging) Create(parent Fid, name string, perm uint32, mode uint32) (Qid, error) { + return l.session.Create(parent, name, perm, mode) +} + +func (l *logging) Stat(fid Fid) (Dir, error) { + return l.session.Stat(fid) +} + +func (l *logging) WStat(fid Fid, dir Dir) error { + return l.session.WStat(fid, dir) +} + +func (l *logging) Version(msize int32, version string) (int32, string, error) { + return l.session.Version(msize, version) +} blob - /dev/null blob + 287a302388f8565d8ce9c6fcb628fa154a9d1a7a (mode 644) --- /dev/null +++ session.go @@ -0,0 +1,49 @@ +package p9pnew + +import ( + "net" + + "golang.org/x/net/context" +) + +// Session provides the central abstraction for a 9p connection. Clients +// implement sessions and servers serve sessions. Sessions can be proxied by +// serving up a client session. +// +// The interface is also wired up with full context support to manage timeouts +// and resource clean up. +// +// Session represents the operations covered in section 5 of the plan 9 manual +// (http://man.cat-v.org/plan_9/5/). Requests are managed internally, so the +// Flush method is handled by the internal implementation. Consider preceeding +// these all with context to control request timeout. +type Session interface { + Auth(ctx context.Context, afid Fid, uname, aname string) (Qid, error) + Attach(ctx context.Context, fid, afid Fid, uname, aname string) (Qid, error) + Clunk(ctx context.Context, fid Fid) error + Remove(ctx context.Context, fid Fid) error + Walk(ctx context.Context, fid Fid, newfid Fid, names ...string) ([]Qid, error) + Read(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error) + Write(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error) + Open(ctx context.Context, fid Fid, mode int32) (Qid, error) + Create(ctx context.Context, parent Fid, name string, perm uint32, mode uint32) (Qid, error) + Stat(context.Context, Fid) (Dir, error) + WStat(context.Context, Fid, Dir) error + + // TODO(stevvooe): The version message affects a lot of protocol behavior. + // Consider hiding it behind the implementation, letting the version get + // negotiated. The API user should still be able to query it. + Version(ctx context.Context, msize int32, version string) (int32, string, error) +} + +func Dial(addr string) (Session, error) { + c, err := net.Dial("tcp", addr) + if err != nil { + return nil, err + } + + // BUG(stevvooe): Session doesn't actually close connection. Dial might + // not be the right interface. + + return NewSession(c) +} blob - /dev/null blob + 4bea0b041f5c0da62d20e12af577067a28bf09e8 (mode 644) --- /dev/null +++ tags.go @@ -0,0 +1,67 @@ +package p9pnew + +import "fmt" + +// tagPool implements a free list to manage tags for outstanding 9p requests. +type tagPool struct { + maximum Tag + freelist chan Tag // buffered to maximum + nexttag chan Tag // synchronous until max allocated + closed chan struct{} +} + +func (tp *tagPool) Get() Tag { + select { + case <-tp.closed: + panic("tag pool is closed") + case t := <-tp.freelist: + return t + case t := <-tp.nexttag: + return t + } +} + +func (tp *tagPool) next() { + var next Tag + + for { + select { + case <-tp.closed: + return + case tp.nexttag <- next: + next++ + + if next >= tp.maximum { + return // exhausted, exit this loop + } + } + } +} + +func (tp *tagPool) Put(tag Tag) { + select { + case tp.freelist <- tag: + case <-tp.closed: + } +} + +func (tp *tagPool) Close() error { + select { + case <-tp.closed: + return fmt.Errorf("closed") + default: + close(tp.closed) + } + return nil +} + +// NewtagPool returns a tag pool with the maximum number of outstanding +// requests. +func newTagPool(outstanding int) (*tagPool, error) { + return &tagPool{ + maximum: Tag(outstanding), + freelist: make(chan Tag, outstanding), + nexttag: make(chan Tag), + closed: make(chan struct{}), + }, nil +} blob - /dev/null blob + 23ab0c4ea3426ea68d3967bfc611167ad034ef77 (mode 644) --- /dev/null +++ types.go @@ -0,0 +1,134 @@ +package p9pnew + +import ( + "encoding" + + "time" +) + +const ( + NOFID = ^Fid(0) + NOTAG = ^Tag(0) +) + +const ( + DMDIR = 0x80000000 /* mode bit for directories */ + DMAPPEND = 0x40000000 /* mode bit for append only files */ + DMEXCL = 0x20000000 /* mode bit for exclusive use files */ + DMMOUNT = 0x10000000 /* mode bit for mounted channel */ + DMAUTH = 0x08000000 /* mode bit for authentication file */ + DMTMP = 0x04000000 /* mode bit for non-backed-up files */ + DMREAD = 0x4 /* mode bit for read permission */ + DMWRITE = 0x2 /* mode bit for write permission */ + DMEXEC = 0x1 /* mode bit for execute permission */ +) + +const ( + OREAD = 0 /* open for read */ + OWRITE = 1 /* write */ + ORDWR = 2 /* read and write */ + OEXEC = 3 /* execute, == read but check execute permission */ + OTRUNC = 16 /* or'ed in (except for exec), truncate file first */ + OCEXEC = 32 /* or'ed in, close on exec */ + ORCLOSE = 64 /* or'ed in, remove on close */ + OEXCL = 0x1000 /* or'ed in, exclusive use (create only) */ +) + +type QType uint8 + +const ( + QTDIR QType = 0x80 // type bit for directories + QTAPPEND QType = 0x40 // type bit for append only files + QTEXCL QType = 0x20 // type bit for exclusive use files + QTMOUNT QType = 0x10 // type bit for mounted channel + QTAUTH QType = 0x08 // type bit for authentication file + QTTMP QType = 0x04 // type bit for not-backed-up file + QTFILE QType = 0x00 // plain file */ +) + +type Fid uint32 + +type Qid struct { + Type QType + Version uint32 + Path uint64 +} + +type Dir struct { + Type uint16 + Dev uint32 + Qid Qid + Mode uint32 + Length uint64 + Name string + + AccessTime time.Time // TODO(stevvooe): Need special serialization type. + ModificationTime time.Time + + /* Not really used for our implementation */ + UID string + GID string + MUID string + + // TODO(stevvooe): 9p2000.u/L should go here. +} + +// +type Tag uint16 + +type FcallType uint8 + +const ( + FcallTypeTversion FcallType = iota + 100 + FcallTypeRversion + FcallTypeTauth + FcallTypeRauth + FcallTypeTattach + FcallTypeRattach + FcallTypeTerror + FcallTypeRerror + FcallTypeTflush + FcallTypeRflush + FcallTypeTwalk + FcallTypeRwalk + FcallTypeTopen + FcallTypeRopen + FcallTypeTcreate + FcallTypeRcreate + FcallTypeTread + FcallTypeRread + FcallTypeTwrite + FcallTypeRwrite + FcallTypeTclunk + FcallTypeRclunk + FcallTypeTremove + FcallTypeRremove + FcallTypeTstat + FcallTypeRstat + FcallTypeTwstat + FcallTypeRwstat + FcallTypeTmax +) + +type Fcall struct { + Type Type + Fid Fid + Tag Tag + Message Message +} + +type Message interface { + Size() int + encoding.BinaryMarshaler + encoding.BinaryUnmarshaler +} + +type MessageVersion struct { + MSize uint32 + Version string +} + +func (mv MessageVersion) MarshalBinary() ([]byte, error) { + + encoding.BinaryMarshaler +}