commit e342de7d52c80e5babe159afd1452c790e709fc0 from: Adrien Duermael date: Wed Nov 11 03:01:53 2015 UTC v1/pkg/p9p commit - /dev/null commit + e342de7d52c80e5babe159afd1452c790e709fc0 blob - /dev/null blob + 0fcc697f7bb6326693044b9145f9080a391864f4 (mode 644) --- /dev/null +++ channel.go @@ -0,0 +1,243 @@ +package p9p + +import ( + "bufio" + "encoding/binary" + "fmt" + "io" + "io/ioutil" + "log" + "net" + "time" + + "golang.org/x/net/context" +) + +// Channel defines the operations necessary to implement a 9p message channel +// interface. Typically, message channels do no protocol processing except to +// send and receive message frames. +type Channel interface { + // ReadFcall reads one fcall frame into the provided fcall structure. The + // Fcall may be cleared whether there is an error or not. If the operation + // is successful, the contents of the fcall will be populated in the + // argument. ReadFcall cannot be called concurrently with other calls to + // ReadFcall. This both to preserve message ordering and to allow lockless + // buffer reusage. + ReadFcall(ctx context.Context, fcall *Fcall) error + + // WriteFcall writes the provided fcall to the channel. WriteFcall cannot + // be called concurrently with other calls to WriteFcall. + WriteFcall(ctx context.Context, fcall *Fcall) error + + // MSize returns the current msize for the channel. + MSize() int + + // SetMSize sets the maximum message size for the channel. This must never + // be called currently with ReadFcall or WriteFcall. + SetMSize(msize int) +} + +func NewChannel(conn net.Conn, msize int) Channel { + return newChannel(conn, codec9p{}, msize) +} + +const ( + defaultRWTimeout = 1 * time.Second // default read/write timeout if not set in context +) + +// channel provides bidirectional protocol framing for 9p over net.Conn. +// Operations are not thread-safe but reads and writes may be carried out +// concurrently, supporting separate read and write loops. +// +// Lifecyle +// +// A connection, or message channel abstraction, has a lifecycle delineated by +// Tversion/Rversion request response cycles. For now, this is part of the +// channel itself but doesn't necessarily influence the channels state, except +// the msize. Visually, it might look something like this: +// +// [Established] -> [Version] -> [Session] -> [Version]---+ +// ^ | +// |_________________________________| +// +// The connection is established, then we negotiate a version, run a session, +// then negotiate a version and so on. For most purposes, we are likely going +// to terminate the connection after the session but we may want to support +// connection pooling. Pooling may result in possible security leaks if the +// connections are shared among contexts, since the version is negotiated at +// the start of the session. To avoid this, we can actually use a "tombstone" +// version message which clears the server's session state without starting a +// new session. The next version message would then prepare the session +// without leaking any Fid's. +type channel struct { + conn net.Conn + codec Codec + brd *bufio.Reader + bwr *bufio.Writer + closed chan struct{} + msize int + rdbuf []byte +} + +func newChannel(conn net.Conn, codec Codec, msize int) *channel { + return &channel{ + conn: conn, + codec: codec, + brd: bufio.NewReaderSize(conn, msize), // msize may not be optimal buffer size + bwr: bufio.NewWriterSize(conn, msize), + closed: make(chan struct{}), + msize: msize, + rdbuf: make([]byte, msize), + } +} + +func (ch *channel) MSize() int { + return ch.msize +} + +// setmsize resizes the buffers for use with a separate msize. This call must +// be protected by a mutex or made before passing to other goroutines. +func (ch *channel) SetMSize(msize int) { + // NOTE(stevvooe): We cannot safely resize the buffered reader and writer. + // Proceed assuming that original size is sufficient. + + ch.msize = msize + if msize < len(ch.rdbuf) { + // just change the cap + ch.rdbuf = ch.rdbuf[:msize] + return + } + + ch.rdbuf = make([]byte, msize) +} + +// ReadFcall reads the next message from the channel into fcall. +func (ch *channel) ReadFcall(ctx context.Context, fcall *Fcall) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ch.closed: + return ErrClosed + default: + } + + deadline, ok := ctx.Deadline() + if !ok { + deadline = time.Now().Add(defaultRWTimeout) + } + + if err := ch.conn.SetReadDeadline(deadline); err != nil { + log.Printf("transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err) + } + + n, err := readmsg(ch.brd, ch.rdbuf) + if err != nil { + // TODO(stevvooe): There may be more we can do here to detect partial + // reads. For now, we just propagate the error untouched. + return err + } + + if n > len(ch.rdbuf) { + // TODO(stevvooe): Make this error detectable and respond with error + // message. + return fmt.Errorf("message large than buffer:", n) + } + + // clear out the fcall + *fcall = Fcall{} + if err := ch.codec.Unmarshal(ch.rdbuf[:n], fcall); err != nil { + return err + } + log.Println("channel: recv", fcall) + return nil +} + +func (ch *channel) WriteFcall(ctx context.Context, fcall *Fcall) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ch.closed: + return ErrClosed + default: + } + log.Println("channel: send", fcall) + + deadline, ok := ctx.Deadline() + if !ok { + deadline = time.Now().Add(defaultRWTimeout) + } + + if err := ch.conn.SetWriteDeadline(deadline); err != nil { + log.Printf("transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err) + } + + p, err := ch.codec.Marshal(fcall) + if err != nil { + return err + } + + if err := sendmsg(ch.bwr, p); err != nil { + return err + } + + return ch.bwr.Flush() +} + +// readmsg reads a 9p message into p from rd, ensuring that all bytes are +// consumed from the size header. If the size header indicates the message is +// larger than p, the entire message will be discarded, leaving a truncated +// portion in p. Any error should be treated as a framing error unless n is +// zero. The caller must check that n is less than or equal to len(p) to +// ensure that a valid message has been read. +func readmsg(rd io.Reader, p []byte) (n int, err error) { + var msize uint32 + + if err := binary.Read(rd, binary.LittleEndian, &msize); err != nil { + return 0, err + } + + n += binary.Size(msize) + mbody := int(msize) - 4 + + if mbody < len(p) { + p = p[:mbody] + } + + np, err := io.ReadFull(rd, p) + if err != nil { + return np + n, err + } + n += np + + if mbody > len(p) { + // message has been read up to len(p) but we must consume the entire + // message. This is an error condition but is non-fatal if we can + // consume msize bytes. + nn, err := io.CopyN(ioutil.Discard, rd, int64(mbody-len(p))) + n += int(nn) + if err != nil { + return n, err + } + } + + return n, nil +} + +// sendmsg writes a message of len(p) to wr with a 9p size header. All errors +// should be considered terminal. +func sendmsg(wr io.Writer, p []byte) error { + size := uint32(len(p) + 4) // message size plus 4-bytes for size. + if err := binary.Write(wr, binary.LittleEndian, size); err != nil { + return nil + } + + // This assume partial writes to wr aren't possible. Not sure if this + // valid. Matters during timeout retries. + if n, err := wr.Write(p); err != nil { + return err + } else if n < len(p) { + return io.ErrShortWrite + } + + return nil +} blob - /dev/null blob + 5405996d8c9189d9946e0229e5f1e8c0dd7cf716 (mode 644) --- /dev/null +++ client.go @@ -0,0 +1,238 @@ +package p9p + +import ( + "golang.org/x/net/context" + + "net" +) + +type client struct { + version string + msize int + ctx context.Context + transport roundTripper +} + +// NewSession returns a session using the connection. The Context ctx provides +// a context for out of bad messages, such as flushes, that may be sent by the +// session. The session can effectively shutdown with this context. +func NewSession(ctx context.Context, conn net.Conn) (Session, error) { + ch := newChannel(conn, codec9p{}, DefaultMSize) // sets msize, effectively. + + // negotiate the protocol version + version, err := clientnegotiate(ctx, ch, DefaultVersion) + if err != nil { + return nil, err + } + + return &client{ + version: version, + msize: ch.MSize(), + ctx: ctx, + transport: newTransport(ctx, ch), + }, nil +} + +var _ Session = &client{} + +func (c *client) Version() (int, string) { + return c.msize, c.version +} + +func (c *client) Auth(ctx context.Context, afid Fid, uname, aname string) (Qid, error) { + m := MessageTauth{ + Afid: afid, + Uname: uname, + Aname: aname, + } + + resp, err := c.transport.send(ctx, m) + if err != nil { + return Qid{}, nil + } + + rauth, ok := resp.(MessageRauth) + if !ok { + return Qid{}, ErrUnexpectedMsg + } + + return rauth.Qid, nil +} + +func (c *client) Attach(ctx context.Context, fid, afid Fid, uname, aname string) (Qid, error) { + m := MessageTattach{ + Fid: fid, + Afid: afid, + Uname: uname, + Aname: aname, + } + + resp, err := c.transport.send(ctx, m) + if err != nil { + return Qid{}, err + } + + rattach, ok := resp.(MessageRattach) + if !ok { + return Qid{}, ErrUnexpectedMsg + } + + return rattach.Qid, nil +} + +func (c *client) Clunk(ctx context.Context, fid Fid) error { + resp, err := c.transport.send(ctx, MessageTclunk{ + Fid: fid, + }) + if err != nil { + return err + } + + _, ok := resp.(MessageRclunk) + if !ok { + return ErrUnexpectedMsg + } + + return nil +} + +func (c *client) Remove(ctx context.Context, fid Fid) error { + resp, err := c.transport.send(ctx, MessageTremove{ + Fid: fid, + }) + if err != nil { + return err + } + + _, ok := resp.(MessageRremove) + if !ok { + return ErrUnexpectedMsg + } + + return nil +} + +func (c *client) Walk(ctx context.Context, fid Fid, newfid Fid, names ...string) ([]Qid, error) { + if len(names) > 16 { + return nil, ErrWalkLimit + } + + resp, err := c.transport.send(ctx, MessageTwalk{ + Fid: fid, + Newfid: newfid, + Wnames: names, + }) + if err != nil { + return nil, err + } + + rwalk, ok := resp.(MessageRwalk) + if !ok { + return nil, ErrUnexpectedMsg + } + + return rwalk.Qids, nil +} + +func (c *client) Read(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error) { + resp, err := c.transport.send(ctx, MessageTread{ + Fid: fid, + Offset: uint64(offset), + Count: uint32(len(p)), + }) + if err != nil { + return 0, err + } + + rread, ok := resp.(MessageRread) + if !ok { + return 0, ErrUnexpectedMsg + } + + return copy(p, rread.Data), nil +} + +func (c *client) Write(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error) { + resp, err := c.transport.send(ctx, MessageTwrite{ + Fid: fid, + Offset: uint64(offset), + Data: p, + }) + if err != nil { + return 0, err + } + + rwrite, ok := resp.(MessageRwrite) + if !ok { + return 0, ErrUnexpectedMsg + } + + return int(rwrite.Count), nil +} + +func (c *client) Open(ctx context.Context, fid Fid, mode Flag) (Qid, uint32, error) { + resp, err := c.transport.send(ctx, MessageTopen{ + Fid: fid, + Mode: mode, + }) + if err != nil { + return Qid{}, 0, err + } + + ropen, ok := resp.(MessageRopen) + if !ok { + return Qid{}, 0, ErrUnexpectedMsg + } + + return ropen.Qid, ropen.IOUnit, nil +} + +func (c *client) Create(ctx context.Context, parent Fid, name string, perm uint32, mode Flag) (Qid, uint32, error) { + resp, err := c.transport.send(ctx, MessageTcreate{ + Fid: parent, + Name: name, + Perm: perm, + Mode: mode, + }) + if err != nil { + return Qid{}, 0, err + } + + rcreate, ok := resp.(MessageRcreate) + if !ok { + return Qid{}, 0, ErrUnexpectedMsg + } + + return rcreate.Qid, rcreate.IOUnit, nil +} + +func (c *client) Stat(ctx context.Context, fid Fid) (Dir, error) { + resp, err := c.transport.send(ctx, MessageTstat{Fid: fid}) + if err != nil { + return Dir{}, err + } + + rstat, ok := resp.(MessageRstat) + if !ok { + return Dir{}, ErrUnexpectedMsg + } + + return rstat.Stat, nil +} + +func (c *client) WStat(ctx context.Context, fid Fid, dir Dir) error { + resp, err := c.transport.send(ctx, MessageTwstat{ + Fid: fid, + Stat: dir, + }) + if err != nil { + return err + } + + _, ok := resp.(MessageRwstat) + if !ok { + return ErrUnexpectedMsg + } + + return nil +} blob - /dev/null blob + 584f8bb69b37fa81a0428f2156f4d698a32e42f2 (mode 644) --- /dev/null +++ cmd/9pr/main.go @@ -0,0 +1,343 @@ +package main + +import ( + "bytes" + "flag" + "fmt" + "io" + "log" + "net" + "net/http" + _ "net/http/pprof" + "os" + "path" + "strings" + "text/tabwriter" + "time" + + "github.com/chzyer/readline" + "github.com/docker/pinata/v1/pkg/p9p" + "golang.org/x/net/context" +) + +var addr string + +func init() { + flag.StringVar(&addr, "addr", ":5640", "addr of 9p service") +} + +func main() { + go func() { + log.Println(http.ListenAndServe("localhost:6060", nil)) + }() + + ctx := context.Background() + log.SetFlags(0) + flag.Parse() + + proto := "tcp" + if strings.HasPrefix(addr, "unix:") { + proto = "unix" + addr = addr[5:] + } + + log.Println("dialing", addr) + conn, err := net.Dial(proto, addr) + if err != nil { + log.Fatal(err) + } + + csession, err := p9pnew.NewSession(ctx, conn) + if err != nil { + log.Fatalln(err) + } + + commander := &fsCommander{ + ctx: context.Background(), + session: csession, + pwd: "/", + stdout: os.Stdout, + stderr: os.Stderr, + } + + completer := readline.NewPrefixCompleter( + readline.PcItem("ls"), + // readline.PcItem("find"), + readline.PcItem("stat"), + readline.PcItem("cat"), + readline.PcItem("cd"), + readline.PcItem("pwd"), + ) + + rl, err := readline.NewEx(&readline.Config{ + HistoryFile: ".history", + AutoComplete: completer, + }) + if err != nil { + log.Fatalln(err) + } + commander.readline = rl + + msize, version := commander.session.Version() + if err != nil { + log.Fatalln(err) + } + log.Println("9p version", version, msize) + + // attach root + commander.nextfid = 1 + if _, err := commander.session.Attach(commander.ctx, commander.nextfid, p9pnew.NOFID, "anyone", "/"); err != nil { + log.Fatalln(err) + } + commander.rootfid = commander.nextfid + commander.nextfid++ + + // clone the pwd fid so we can clunk it + if _, err := commander.session.Walk(commander.ctx, commander.rootfid, commander.nextfid); err != nil { + log.Fatalln(err) + } + commander.pwdfid = commander.nextfid + commander.nextfid++ + + for { + commander.readline.SetPrompt(fmt.Sprintf("%s 🐳 > ", commander.pwd)) + + line, err := rl.Readline() + if err != nil { + log.Fatalln("error: ", err) + } + + if line == "" { + continue + } + + args := strings.Fields(line) + + name := args[0] + var cmd func(ctx context.Context, args ...string) error + + switch name { + case "ls": + cmd = commander.cmdls + case "cd": + cmd = commander.cmdcd + case "pwd": + cmd = commander.cmdpwd + case "cat": + cmd = commander.cmdcat + case "stat": + cmd = commander.cmdstat + default: + cmd = func(ctx context.Context, args ...string) error { + return fmt.Errorf("command not implemented") + } + } + + ctx, _ = context.WithTimeout(commander.ctx, 5*time.Second) + if err := cmd(ctx, args[1:]...); err != nil { + if err == p9pnew.ErrClosed { + log.Println("connection closed, shutting down") + return + } + + log.Printf("👹 %s: %v", name, err) + } + } +} + +type fsCommander struct { + ctx context.Context + session p9pnew.Session + pwd string + pwdfid p9pnew.Fid + rootfid p9pnew.Fid + + nextfid p9pnew.Fid + + readline *readline.Instance + stdout io.Writer + stderr io.Writer +} + +func (c *fsCommander) cmdls(ctx context.Context, args ...string) error { + ps := []string{c.pwd} + if len(args) > 0 { + ps = args + } + + wr := tabwriter.NewWriter(c.stdout, 0, 8, 8, ' ', 0) + + for _, p := range ps { + // create a header if have more than one path. + if len(ps) > 1 { + fmt.Fprintln(wr, p+":") + } + + if !path.IsAbs(p) { + p = path.Join(c.pwd, p) + } + + targetfid := c.nextfid + c.nextfid++ + components := strings.Split(strings.Trim(p, "/"), "/") + if _, err := c.session.Walk(ctx, c.rootfid, targetfid, components...); err != nil { + return err + } + defer c.session.Clunk(ctx, targetfid) + + _, iounit, err := c.session.Open(ctx, targetfid, p9pnew.OREAD) + if err != nil { + return err + } + + if iounit < 1 { + msize, _ := c.session.Version() + iounit = uint32(msize - 24) // size of message max minus fcall io header (Rread) + } + + p := make([]byte, iounit) + + n, err := c.session.Read(ctx, targetfid, p, 0) + if err != nil { + return err + } + + rd := bytes.NewReader(p[:n]) + codec := p9pnew.NewCodec() // TODO(stevvooe): Need way to resolve codec based on session. + for { + var d p9pnew.Dir + if err := p9pnew.DecodeDir(codec, rd, &d); err != nil { + if err == io.EOF { + break + } + + return err + } + + fmt.Fprintf(wr, "%v\t%v\t%v\t%s\n", os.FileMode(d.Mode), d.Length, d.ModTime, d.Name) + } + + if len(ps) > 1 { + fmt.Fprintln(wr, "") + } + } + + // all output is dumped only after success. + return wr.Flush() +} + +func (c *fsCommander) cmdcd(ctx context.Context, args ...string) error { + var p string + switch len(args) { + case 0: + p = "/" + case 1: + p = args[0] + default: + return fmt.Errorf("cd: invalid args: %v", args) + } + + if !path.IsAbs(p) { + p = path.Join(c.pwd, p) + } + + targetfid := c.nextfid + c.nextfid++ + components := strings.Split(strings.TrimSpace(strings.Trim(p, "/")), "/") + if _, err := c.session.Walk(c.ctx, c.rootfid, targetfid, components...); err != nil { + return err + } + defer c.session.Clunk(c.ctx, c.pwdfid) + + log.Println("cd", p, targetfid) + c.pwd = p + c.pwdfid = targetfid + + return nil +} + +func (c *fsCommander) cmdstat(ctx context.Context, args ...string) error { + ps := []string{c.pwd} + if len(args) > 0 { + ps = args + } + + wr := tabwriter.NewWriter(c.stdout, 0, 8, 8, ' ', 0) + + for _, p := range ps { + targetfid := c.nextfid + c.nextfid++ + components := strings.Split(strings.Trim(p, "/"), "/") + if _, err := c.session.Walk(ctx, c.rootfid, targetfid, components...); err != nil { + return err + } + defer c.session.Clunk(ctx, targetfid) + + d, err := c.session.Stat(ctx, targetfid) + if err != nil { + return err + } + + fmt.Fprintf(wr, "%v\t%v\t%v\t%s\n", os.FileMode(d.Mode), d.Length, d.ModTime, d.Name) + } + + return wr.Flush() +} + +func (c *fsCommander) cmdpwd(ctx context.Context, args ...string) error { + if len(args) != 0 { + return fmt.Errorf("pwd takes no arguments") + } + + fmt.Println(c.pwd) + return nil +} + +func (c *fsCommander) cmdcat(ctx context.Context, args ...string) error { + var p string + switch len(args) { + case 0: + p = "/" + case 1: + p = args[0] + default: + return fmt.Errorf("cd: invalid args: %v", args) + } + + if !path.IsAbs(p) { + p = path.Join(c.pwd, p) + } + + targetfid := c.nextfid + c.nextfid++ + components := strings.Split(strings.TrimSpace(strings.Trim(p, "/")), "/") + if _, err := c.session.Walk(ctx, c.rootfid, targetfid, components...); err != nil { + return err + } + defer c.session.Clunk(ctx, c.pwdfid) + + _, iounit, err := c.session.Open(ctx, targetfid, p9pnew.OREAD) + if err != nil { + return err + } + + if iounit < 1 { + msize, _ := c.session.Version() + iounit = uint32(msize - 24) // size of message max minus fcall io header (Rread) + } + + b := make([]byte, iounit) + + n, err := c.session.Read(ctx, targetfid, b, 0) + if err != nil { + return err + } + + if _, err := os.Stdout.Write(b[:n]); err != nil { + return err + } + + os.Stdout.Write([]byte("\n")) + + return nil +} blob - /dev/null blob + bdba1522adb5247a7d23fb7651bce8770735f4c2 (mode 644) --- /dev/null +++ cmd/9ps/main.go @@ -0,0 +1,79 @@ +package main + +import ( + "flag" + "log" + "net" + "strings" + + "github.com/docker/pinata/v1/pkg/p9p" + "golang.org/x/net/context" +) + +var ( + root string + addr string +) + +func init() { + flag.StringVar(&root, "root", "~/", "root of filesystem to serve over 9p") + flag.StringVar(&addr, "addr", ":5640", "bind addr for 9p server, prefix with unix: for unix socket") +} + +func main() { + ctx := context.Background() + log.SetFlags(0) + flag.Parse() + + proto := "tcp" + if strings.HasPrefix(addr, "unix:") { + proto = "unix" + addr = addr[5:] + } + + listener, err := net.Listen(proto, addr) + if err != nil { + log.Fatalln("error listening:", err) + } + defer listener.Close() + + for { + c, err := listener.Accept() + if err != nil { + log.Fatalln("error accepting:", err) + continue + } + + go func(conn net.Conn) { + defer conn.Close() + + ctx := context.WithValue(ctx, "conn", conn) + log.Println("connected", conn.RemoteAddr()) + session, err := newLocalSession(ctx, root) + if err != nil { + log.Println("error creating session") + return + } + + p9pnew.Serve(ctx, conn, p9pnew.Dispatch(session)) + }(c) + } +} + +// newLocalSession returns a session to serve the local filesystem, restricted +// to the provided root. +func newLocalSession(ctx context.Context, root string) (p9pnew.Session, error) { + // silly, just connect to ufs for now! replace this with real code later! + log.Println("dialing", ":5640", "for", ctx.Value("conn")) + conn, err := net.Dial("tcp", ":5640") + if err != nil { + return nil, err + } + + session, err := p9pnew.NewSession(ctx, conn) + if err != nil { + return nil, err + } + + return session, nil +} blob - /dev/null blob + eeb8602d9fb3f99ab4c6e30db11507cead9be29b (mode 644) --- /dev/null +++ context.go @@ -0,0 +1,26 @@ +package p9p + +import ( + "golang.org/x/net/context" +) + +type contextKey string + +const ( + versionKey contextKey = "9p.version" +) + +func withVersion(ctx context.Context, version string) context.Context { + return context.WithValue(ctx, versionKey, version) +} + +// GetVersion returns the protocol version from the context. If the version is +// not known, an empty string is returned. This is typically set on the +// context passed into function calls in a server implementation. +func GetVersion(ctx context.Context) string { + v, ok := ctx.Value(versionKey).(string) + if !ok { + return "" + } + return v +} blob - /dev/null blob + 01396c7b3eb94755a94f2b216c9b8d763f557ee3 (mode 644) --- /dev/null +++ dispatcher.go @@ -0,0 +1,132 @@ +package p9p + +import "golang.org/x/net/context" + +// Handler defines an interface for 9p message handlers. A handler +// implementation could be used to intercept calls of all types before sending +// them to the next handler. +type Handler interface { + Handle(ctx context.Context, msg Message) (Message, error) + + // TODO(stevvooe): Right now, this interface is functianally identical to + // roundtripper. If we find that this is sufficient on the server-side, we + // may unify the types. For now, we leave them separated to differentiate + // between them. +} + +// HandlerFunc is a convenience type for defining inline handlers. +type HandlerFunc func(ctx context.Context, msg Message) (Message, error) + +func (fn HandlerFunc) Handle(ctx context.Context, msg Message) (Message, error) { + return fn(ctx, msg) +} + +// Dispatch returns a handler that dispatches messages to the target session. +// No concurrency is managed by the returned handler. It simply turns messages +// into function calls on the session. +func Dispatch(session Session) Handler { + return HandlerFunc(func(ctx context.Context, msg Message) (Message, error) { + switch msg := msg.(type) { + case MessageTauth: + qid, err := session.Auth(ctx, msg.Afid, msg.Uname, msg.Aname) + if err != nil { + return nil, err + } + + return MessageRauth{Qid: qid}, nil + case MessageTattach: + qid, err := session.Attach(ctx, msg.Fid, msg.Afid, msg.Uname, msg.Aname) + if err != nil { + return nil, err + } + + return MessageRattach{ + Qid: qid, + }, nil + case MessageTwalk: + // TODO(stevvooe): This is one of the places where we need to manage + // fid allocation lifecycle. We need to reserve the fid, then, if this + // call succeeds, we should alloc the fid for future uses. Also need + // to interact correctly with concurrent clunk and the flush of this + // walk message. + qids, err := session.Walk(ctx, msg.Fid, msg.Newfid, msg.Wnames...) + if err != nil { + return nil, err + } + + return MessageRwalk{ + Qids: qids, + }, nil + case MessageTopen: + qid, iounit, err := session.Open(ctx, msg.Fid, msg.Mode) + if err != nil { + return nil, err + } + + return MessageRopen{ + Qid: qid, + IOUnit: iounit, + }, nil + case MessageTcreate: + qid, iounit, err := session.Create(ctx, msg.Fid, msg.Name, msg.Perm, msg.Mode) + if err != nil { + return nil, err + } + + return MessageRcreate{ + Qid: qid, + IOUnit: iounit, + }, nil + case MessageTread: + p := make([]byte, int(msg.Count)) + n, err := session.Read(ctx, msg.Fid, p, int64(msg.Offset)) + if err != nil { + return nil, err + } + + return MessageRread{ + Data: p[:n], + }, nil + case MessageTwrite: + n, err := session.Write(ctx, msg.Fid, msg.Data, int64(msg.Offset)) + if err != nil { + return nil, err + } + + return MessageRwrite{ + Count: uint32(n), + }, nil + case MessageTclunk: + // TODO(stevvooe): Manage the clunking of file descriptors based on + // walk and attach call progression. + if err := session.Clunk(ctx, msg.Fid); err != nil { + return nil, err + } + + return MessageRclunk{}, nil + case MessageTremove: + if err := session.Remove(ctx, msg.Fid); err != nil { + return nil, err + } + + return MessageRremove{}, nil + case MessageTstat: + dir, err := session.Stat(ctx, msg.Fid) + if err != nil { + return nil, err + } + + return MessageRstat{ + Stat: dir, + }, nil + case MessageTwstat: + if err := session.WStat(ctx, msg.Fid, msg.Stat); err != nil { + return nil, err + } + + return MessageRwstat{}, nil + default: + return nil, ErrUnknownMsg + } + }) +} blob - /dev/null blob + 58438cf7c8fea79c5b83d935f47270e2c835ce0c (mode 644) --- /dev/null +++ doc.go @@ -0,0 +1,78 @@ +/* +Package p9p implements a compliant 9P2000 client and server library for use +in modern, production Go services. This package differentiates itself in that +is has departed from the plan 9 implementation primitives and better follows +idiomatic Go style. + +The package revolves around the session type, which is an enumeration of raw +9p message calls. A few calls, such as flush and version, have been elided, +defering their usage to the server implementation. Sessions can be trivially +proxied through clients and servers. + +Getting Started + +The best place to get started is with Serve. Serve can be provided a +connection and a handler. A typical implementation will call Serve as part of +a listen/accept loop. As each network connection is created, Serve can be +called with a handler for the specific connection. The handler can be +implemented with a Session via the Dispatch function or can generate sessions +for dispatch in response to client messages. (See cmd/9ps for an example) + +On the client side, NewSession provides a 9p session from a connection. After +a version negotiation, methods can be called on the session, in parallel, and +calls will be sent over the connection. Call timeouts can be controlled via +the context provided to each method call. + +Framework + +This package has the beginning of a nice client-server framework for working +with 9p. Some of the abstractions aren't entirely fleshed out, but most of +this can center around the Handler. + +Missing from this are a number of tools for implementing 9p servers. The most +glaring are directory read and walk helpers. Other, more complex additions +might be a system to manage in memory filesystem trees that expose multi-user +sessions. + +Differences + +The largest difference between this package and other 9p packages is +simplification of the types needed to implement a server. To avoid confusing +bugs and odd behavior, the components are separated by each level of the +protocol. One example is that requests and responses are separated and they no +longer hold mutable state. This means that framing, transport management, +encoding, and dispatching are componentized. Little work will be required to +swap out encodings, transports or connection implementations. + +Context Integration + +This package has been wired from top to bottom to support context-based +resource management. Everything from startup to shutdown can have timeouts +using contexts. Not all close methods are fully in place, but we are very +close to having controlled, predictable cleanup for both servers and clients. +Timeouts can be very granular or very course, depending on the context of the +timeout. For example, it is very easy to set a short timeout for a stat call +but a long timeout for reading data. + +Multiversion Support + +Currently, there is not multiversion support. The hooks and functionality are +in place to add multi-version support. Generally, the correct space to do this +is in the codec. Types, such as Dir, simply need to be extended to support the +possibility of extra fields. + +The real question to ask here is what is the role of the version number in the +9p protocol. It really comes down to the level of support required. Do we just +need it at the protocol level, or do handlers and sessions need to be have +differently based on negotiated versions? + +Caveats + +This package has a number of TODOs to make it easier to use. Most of the +existing code provides a solid base to work from. Don't be discouraged by the +sawdust. + +In addition, the testing is embarassingly lacking. With time, we can get full +testing going and ensure we have confidence in the implementation. +*/ +package p9p blob - /dev/null blob + 41ddbf0319ea6be32598c3273d6880fe8cf3b4f1 (mode 644) --- /dev/null +++ encoding.go @@ -0,0 +1,528 @@ +package p9p + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + "log" + "reflect" + "strings" + "time" +) + +// Codec defines the interface for encoding and decoding of 9p types. +// Unsupported types will throw an error. +type Codec interface { + // Unmarshal from data into the value pointed to by v. + Unmarshal(data []byte, v interface{}) error + + // Marshal the value v into a byte slice. + Marshal(v interface{}) ([]byte, error) + + // Size returns the encoded size for the target of v. + Size(v interface{}) int +} + +func NewCodec() Codec { + return codec9p{} +} + +type codec9p struct{} + +func (c codec9p) Unmarshal(data []byte, v interface{}) error { + dec := &decoder{bytes.NewReader(data)} + return dec.decode(v) +} + +func (c codec9p) Marshal(v interface{}) ([]byte, error) { + var b bytes.Buffer + enc := &encoder{&b} + + if err := enc.encode(v); err != nil { + return nil, err + } + + return b.Bytes(), nil +} + +func (c codec9p) Size(v interface{}) int { + return int(size9p(v)) +} + +// DecodeDir decodes a directory entry from rd using the provided codec. +func DecodeDir(codec Codec, rd io.Reader, d *Dir) error { + var ll uint16 + + // pull the size off the wire + if err := binary.Read(rd, binary.LittleEndian, &ll); err != nil { + return err + } + + p := make([]byte, ll+2) + binary.LittleEndian.PutUint16(p, ll) // must have size at start + + // read out the rest of the record + if _, err := io.ReadFull(rd, p[2:]); err != nil { + return err + } + + return codec.Unmarshal(p, d) +} + +// EncodeDir writes the directory to wr. +func EncodeDir(codec Codec, wr io.Writer, d *Dir) error { + p, err := codec.Marshal(d) + if err != nil { + return err + } + + _, err = wr.Write(p) + return err +} + +type encoder struct { + wr io.Writer +} + +func (e *encoder) encode(vs ...interface{}) error { + for _, v := range vs { + switch v := v.(type) { + case uint8, uint16, uint32, uint64, FcallType, Tag, QType, Fid, Flag, + *uint8, *uint16, *uint32, *uint64, *FcallType, *Tag, *QType, *Fid, *Flag: + if err := binary.Write(e.wr, binary.LittleEndian, v); err != nil { + return err + } + case []byte: + if err := e.encode(uint32(len(v))); err != nil { + return err + } + + if err := binary.Write(e.wr, binary.LittleEndian, v); err != nil { + return err + } + + case *[]byte: + if err := e.encode(*v); err != nil { + return err + } + case string: + if err := binary.Write(e.wr, binary.LittleEndian, uint16(len(v))); err != nil { + return err + } + + _, err := io.WriteString(e.wr, v) + if err != nil { + return err + } + case *string: + if err := e.encode(*v); err != nil { + return err + } + + case []string: + if err := e.encode(uint16(len(v))); err != nil { + return err + } + + for _, m := range v { + if err := e.encode(m); err != nil { + return err + } + } + case *[]string: + if err := e.encode(*v); err != nil { + return err + } + case time.Time: + if err := e.encode(uint32(v.Unix())); err != nil { + return err + } + case *time.Time: + if err := e.encode(*v); err != nil { + return err + } + case Qid: + if err := e.encode(v.Type, v.Version, v.Path); err != nil { + return err + } + case *Qid: + if err := e.encode(*v); err != nil { + return err + } + case []Qid: + if err := e.encode(uint16(len(v))); err != nil { + return err + } + + elements := make([]interface{}, len(v)) + for i := range v { + elements[i] = &v[i] + } + + if err := e.encode(elements...); err != nil { + return err + } + case *[]Qid: + if err := e.encode(*v); err != nil { + return err + } + case Dir: + elements, err := fields9p(v) + if err != nil { + return err + } + + if err := e.encode(uint16(size9p(elements...))); err != nil { + return err + } + + if err := e.encode(elements...); err != nil { + return err + } + case *Dir: + if err := e.encode(*v); err != nil { + return err + } + case Fcall: + if err := e.encode(v.Type, v.Tag, v.Message); err != nil { + return err + } + case *Fcall: + if err := e.encode(*v); err != nil { + return err + } + case Message: + elements, err := fields9p(v) + if err != nil { + return err + } + + switch v.(type) { + case MessageRstat, *MessageRstat: + // NOTE(stevvooe): Prepend size preceeding Dir. See bugs in + // http://man.cat-v.org/plan_9/5/stat to make sense of this. + // The field has been included here but we need to make sure + // to double emit it for Rstat. + if err := e.encode(uint16(size9p(elements...))); err != nil { + return err + } + } + + if err := e.encode(elements...); err != nil { + return err + } + } + } + + return nil +} + +type decoder struct { + rd io.Reader +} + +// read9p extracts values from rd and unmarshals them to the targets of vs. +func (d *decoder) decode(vs ...interface{}) error { + for _, v := range vs { + switch v := v.(type) { + case *uint8, *uint16, *uint32, *uint64, *FcallType, *Tag, *QType, *Fid, *Flag: + if err := binary.Read(d.rd, binary.LittleEndian, v); err != nil { + return err + } + case *[]byte: + var ll uint32 + + if err := d.decode(&ll); err != nil { + return err + } + + *v = make([]byte, int(ll)) + + if err := binary.Read(d.rd, binary.LittleEndian, v); err != nil { + return err + } + case *string: + var ll uint16 + + // implement string[s] encoding + if err := d.decode(&ll); err != nil { + return err + } + + b := make([]byte, ll) + + n, err := io.ReadFull(d.rd, b) + if err != nil { + return err + } + + if n != int(ll) { + return fmt.Errorf("unexpected string length") + } + + *v = string(b) + case *[]string: + var ll uint16 + + if err := d.decode(&ll); err != nil { + return err + } + + elements := make([]interface{}, int(ll)) + *v = make([]string, int(ll)) + for i := range elements { + elements[i] = &(*v)[i] + } + + if err := d.decode(elements...); err != nil { + return err + } + case *time.Time: + var epoch uint32 + if err := d.decode(&epoch); err != nil { + return err + } + + *v = time.Unix(int64(epoch), 0).UTC() + case *Qid: + if err := d.decode(&v.Type, &v.Version, &v.Path); err != nil { + return err + } + case *[]Qid: + var ll uint16 + + if err := d.decode(&ll); err != nil { + return err + } + + elements := make([]interface{}, int(ll)) + *v = make([]Qid, int(ll)) + for i := range elements { + elements[i] = &(*v)[i] + } + + if err := d.decode(elements...); err != nil { + return err + } + case *Dir: + var ll uint16 + + if err := d.decode(&ll); err != nil { + return err + } + + b := make([]byte, ll) + // must consume entire dir entry. + n, err := io.ReadFull(d.rd, b) + if err != nil { + log.Println("dir readfull failed:", err, ll, n) + return err + } + + elements, err := fields9p(v) + if err != nil { + return err + } + + dec := &decoder{bytes.NewReader(b)} + + if err := dec.decode(elements...); err != nil { + return err + } + case *Fcall: + if err := d.decode(&v.Type, &v.Tag); err != nil { + return err + } + + message, err := newMessage(v.Type) + if err != nil { + return err + } + + // NOTE(stevvooe): We do a little pointer dance to allocate the + // new type, write to it, then assign it back to the interface as + // a concrete type, avoiding a pointer (the interface) to a + // pointer. + rv := reflect.New(reflect.TypeOf(message)) + if err := d.decode(rv.Interface()); err != nil { + return err + } + + v.Message = rv.Elem().Interface().(Message) + case Message: + elements, err := fields9p(v) + if err != nil { + return err + } + + switch v.(type) { + case *MessageRstat, MessageRstat: + // NOTE(stevvooe): Consume extra size preceeding Dir. See bugs + // in http://man.cat-v.org/plan_9/5/stat to make sense of + // this. The field has been included here but we need to make + // sure to double emit it for Rstat. decode extra size header + // for stat structure. + var ll uint16 + if err := d.decode(&ll); err != nil { + return err + } + } + + if err := d.decode(elements...); err != nil { + return err + } + } + } + + return nil +} + +// size9p calculates the projected size of the values in vs when encoded into +// 9p binary protocol. If an element or elements are not valid for 9p encoded, +// the value 0 will be used for the size. The error will be detected when +// encoding. +func size9p(vs ...interface{}) uint32 { + var s uint32 + for _, v := range vs { + if v == nil { + continue + } + + switch v := v.(type) { + case uint8, uint16, uint32, uint64, FcallType, Tag, QType, Fid, Flag, + *uint8, *uint16, *uint32, *uint64, *FcallType, *Tag, *QType, *Fid, *Flag: + s += uint32(binary.Size(v)) + case []byte: + s += uint32(binary.Size(uint32(0)) + len(v)) + case *[]byte: + s += size9p(uint32(0), *v) + case string: + s += uint32(binary.Size(uint16(0)) + len(v)) + case *string: + s += size9p(*v) + case []string: + s += size9p(uint16(0)) + + for _, sv := range v { + s += size9p(sv) + } + case *[]string: + s += size9p(*v) + case time.Time, *time.Time: + // BUG(stevvooe): Y2038 is coming. + s += size9p(uint32(0)) + case Qid: + s += size9p(v.Type, v.Version, v.Path) + case *Qid: + s += size9p(*v) + case []Qid: + s += size9p(uint16(0)) + elements := make([]interface{}, len(v)) + for i := range elements { + elements[i] = &v[i] + } + s += size9p(elements...) + case *[]Qid: + s += size9p(*v) + + case Dir: + // walk the fields of the message to get the total size. we just + // use the field order from the message struct. We may add tag + // ignoring if needed. + elements, err := fields9p(v) + if err != nil { + // BUG(stevvooe): The options here are to return 0, panic or + // make this return an error. Ideally, we make it safe to + // return 0 and have the rest of the package do the right + // thing. For now, we do this, but may want to panic until + // things are stable. + panic(err) + } + + s += size9p(elements...) + size9p(uint16(0)) + case *Dir: + s += size9p(*v) + case Fcall: + s += size9p(v.Type, v.Tag, v.Message) + case *Fcall: + s += size9p(*v) + case Message: + // special case twstat and rstat for size fields. See bugs in + // http://man.cat-v.org/plan_9/5/stat to make sense of this. + switch v.(type) { + case *MessageRstat, MessageRstat: + s += size9p(uint16(0)) // for extra size field before dir + } + + // walk the fields of the message to get the total size. we just + // use the field order from the message struct. We may add tag + // ignoring if needed. + elements, err := fields9p(v) + if err != nil { + // BUG(stevvooe): The options here are to return 0, panic or + // make this return an error. Ideally, we make it safe to + // return 0 and have the rest of the package do the right + // thing. For now, we do this, but may want to panic until + // things are stable. + panic(err) + } + + s += size9p(elements...) + } + } + + return s +} + +// fields9p lists the settable fields from a struct type for reading and +// writing. We are using a lot of reflection here for fairly static +// serialization but we can replace this in the future with generated code if +// performance is an issue. +func fields9p(v interface{}) ([]interface{}, error) { + rv := reflect.Indirect(reflect.ValueOf(v)) + + if rv.Kind() != reflect.Struct { + return nil, fmt.Errorf("cannot extract fields from non-struct: %v", rv) + } + + var elements []interface{} + for i := 0; i < rv.NumField(); i++ { + f := rv.Field(i) + + if !f.CanInterface() { + // unexported field, skip it. + continue + } + + if f.CanAddr() { + f = f.Addr() + } + + elements = append(elements, f.Interface()) + } + + return elements, nil +} + +func string9p(v interface{}) string { + if v == nil { + return "nil" + } + + rv := reflect.Indirect(reflect.ValueOf(v)) + + if rv.Kind() != reflect.Struct { + panic("not a struct") + } + + var s string + + for i := 0; i < rv.NumField(); i++ { + f := rv.Field(i) + + s += fmt.Sprintf(" %v=%v", strings.ToLower(rv.Type().Field(i).Name), f.Interface()) + } + + return s +} blob - /dev/null blob + e55b866f5ed22f8ee8692f15ce3bf466609194c6 (mode 644) --- /dev/null +++ encoding_test.go @@ -0,0 +1,240 @@ +package p9p + +import ( + "bytes" + "errors" + "reflect" + "testing" + "time" +) + +func TestEncodeDecode(t *testing.T) { + codec := NewCodec() + for _, testcase := range []struct { + description string + target interface{} + marshaled []byte + }{ + { + description: "uint8", + target: uint8('U'), + marshaled: []byte{0x55}, + }, + { + description: "uint16", + target: uint16(0x5544), + marshaled: []byte{0x44, 0x55}, + }, + { + description: "string", + target: "asdf", + marshaled: []byte{0x4, 0x0, 0x61, 0x73, 0x64, 0x66}, + }, + { + description: "[]string", + target: []string{"asdf", "qwer", "zxcv"}, + marshaled: []byte{ + 0x3, 0x0, // len(target) + 0x4, 0x0, 0x61, 0x73, 0x64, 0x66, + 0x4, 0x0, 0x71, 0x77, 0x65, 0x72, + 0x4, 0x0, 0x7a, 0x78, 0x63, 0x76}, + }, + { + description: "Qid", + target: Qid{ + Type: QTDIR, + Version: 0x10203040, + Path: 0x1020304050607080}, + marshaled: []byte{ + byte(QTDIR), // qtype + 0x40, 0x30, 0x20, 0x10, // version + 0x80, 0x70, 0x60, 0x50, 0x40, 0x30, 0x20, 0x10, // path + }, + }, + // Dir + { + description: "Tversion fcall", + target: &Fcall{ + Type: Tversion, + Tag: 2255, + Message: MessageTversion{ + MSize: uint32(1024), + Version: "9PTEST", + }, + }, + marshaled: []byte{ + 0x64, 0xcf, 0x8, 0x0, 0x4, 0x0, 0x0, + 0x6, 0x0, 0x39, 0x50, 0x54, 0x45, 0x53, 0x54}, + }, + { + description: "Rversion fcall", + target: &Fcall{ + Type: Rversion, + Tag: 2255, + Message: MessageRversion{ + MSize: uint32(1024), + Version: "9PTEST", + }, + }, + marshaled: []byte{ + 0x65, 0xcf, 0x8, 0x0, 0x4, 0x0, 0x0, + 0x6, 0x0, 0x39, 0x50, 0x54, 0x45, 0x53, 0x54}, + }, + { + description: "Twalk fcall", + target: &Fcall{ + Type: Twalk, + Tag: 5666, + Message: MessageTwalk{ + Fid: 1010, + Newfid: 1011, + Wnames: []string{"a", "b", "c"}, + }, + }, + marshaled: []byte{ + 0x6e, 0x22, 0x16, 0xf2, 0x3, 0x0, 0x0, 0xf3, 0x3, 0x0, 0x0, + 0x3, 0x0, // len(wnames) + 0x1, 0x0, 0x61, // "a" + 0x1, 0x0, 0x62, // "b" + 0x1, 0x0, 0x63}, // "c" + }, + { + description: "Rwalk call", + target: &Fcall{ + Type: Rwalk, + Tag: 5556, + Message: MessageRwalk{ + Qids: []Qid{ + Qid{ + Type: QTDIR, + Path: 1111, + Version: 11112, + }, + Qid{Type: QTFILE, + Version: 1112, + Path: 11114}, + }, + }, + }, + marshaled: []byte{ + 0x6f, 0xb4, 0x15, + 0x2, 0x0, + 0x80, 0x68, 0x2b, 0x0, 0x0, 0x57, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x58, 0x4, 0x0, 0x0, 0x6a, 0x2b, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, + }, + { + description: "Rread fcall", + target: &Fcall{ + Type: Rread, + Tag: 5556, + Message: MessageRread{ + Data: []byte("a lot of byte data"), + }, + }, + marshaled: []byte{ + 0x75, 0xb4, 0x15, + 0x12, 0x0, 0x0, 0x0, + 0x61, 0x20, 0x6c, 0x6f, 0x74, 0x20, 0x6f, 0x66, 0x20, 0x62, 0x79, 0x74, 0x65, 0x20, 0x64, 0x61, 0x74, 0x61}, + }, + { + description: "", + target: &Fcall{ + Type: Rstat, + Tag: 5556, + Message: MessageRstat{ + Stat: Dir{ + Type: ^uint16(0), + Dev: ^uint32(0), + Qid: Qid{ + Type: QTDIR, + Version: ^uint32(0), + Path: ^uint64(0), + }, + Mode: DMDIR | DMREAD, + AccessTime: time.Date(2006, 01, 02, 03, 04, 05, 0, time.UTC), + ModTime: time.Date(2006, 01, 02, 03, 04, 05, 0, time.UTC), + Length: ^uint64(0), + Name: "somedir", + UID: "uid", + GID: "gid", + MUID: "muid", + }, + }, + }, + marshaled: []byte{ + 0x7d, 0xb4, 0x15, + 0x42, 0x0, // TODO(stevvooe): Include Dir size. Not straightforward. + 0x40, 0x0, // TODO(stevvooe): Include Dir size. Not straightforward. + 0xff, 0xff, // type + 0xff, 0xff, 0xff, 0xff, // dev + 0x80, 0xff, 0xff, 0xff, 0xff, // qid.type, qid.version + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, // qid.path + 0x4, 0x0, 0x0, 0x80, // mode + 0x25, 0x98, 0xb8, 0x43, // atime + 0x25, 0x98, 0xb8, 0x43, // mtime + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, // length + 0x7, 0x0, 0x73, 0x6f, 0x6d, 0x65, 0x64, 0x69, 0x72, + 0x3, 0x0, 0x75, 0x69, 0x64, // uid + 0x3, 0x0, 0x67, 0x69, 0x64, // gid + 0x4, 0x0, 0x6d, 0x75, 0x69, 0x64}, // muid + }, + { + description: "Rerror fcall", + target: newErrorFcall(5556, errors.New("A serious error")), + marshaled: []byte{ + 0x6b, // Rerror + 0xb4, 0x15, // Tag + 0xf, 0x0, // String size. + 0x41, 0x20, 0x73, 0x65, 0x72, 0x69, 0x6f, 0x75, 0x73, 0x20, 0x65, 0x72, 0x72, 0x6f, 0x72}, + }, + } { + t.Logf("target under test: %#v %T", testcase.target, testcase.target) + fatalf := func(format string, args ...interface{}) { + t.Fatalf(testcase.description+": "+format, args...) + } + + p, err := codec.Marshal(testcase.target) + if err != nil { + fatalf("error writing fcall: %v", err) + } + + if !bytes.Equal(p, testcase.marshaled) { + fatalf("unexpected bytes for fcall: \n%#v != \n%#v", p, testcase.marshaled) + } + + if size9p(testcase.target) == 0 { + fatalf("size of target should never be zero") + } + + // check that size9p is working correctly + if int(size9p(testcase.target)) != len(testcase.marshaled) { + fatalf("size not correct: %v != %v", int(size9p(testcase.target)), len(testcase.marshaled)) + } + + var v interface{} + targetType := reflect.TypeOf(testcase.target) + + if targetType.Kind() == reflect.Ptr { + v = reflect.New(targetType.Elem()).Interface() + } else { + v = reflect.New(targetType).Interface() + } + + if err := codec.Unmarshal(p, v); err != nil { + fatalf("error reading: %v", err) + } + + if targetType.Kind() != reflect.Ptr { + v = reflect.Indirect(reflect.ValueOf(v)).Interface() + } + + if !reflect.DeepEqual(v, testcase.target) { + fatalf("not equal: %v != %v (\n%#v\n%#v\n)", + v, testcase.target, + v, testcase.target) + } + + t.Logf("%#v", v) + + } +} blob - /dev/null blob + 32f3c9fea803b82dc760bfb18310ad2329778fa2 (mode 644) --- /dev/null +++ errors.go @@ -0,0 +1,55 @@ +package p9p + +import ( + "errors" + "fmt" +) + +// MessageRerror provides both a Go error type and message type. +type MessageRerror struct { + Ename string +} + +var ( + // 9p wire errors returned by Session interface methods + ErrBadattach = new9pError("unknown specifier in attach") + ErrBadoffset = new9pError("bad offset") + ErrBadcount = new9pError("bad count") + ErrBotch = new9pError("9P protocol botch") + ErrCreatenondir = new9pError("create in non-directory") + ErrDupfid = new9pError("duplicate fid") + ErrDuptag = new9pError("duplicate tag") + ErrIsdir = new9pError("is a directory") + ErrNocreate = new9pError("create prohibited") + ErrNomem = new9pError("out of memory") + ErrNoremove = new9pError("remove prohibited") + ErrNostat = new9pError("stat prohibited") + ErrNotfound = new9pError("file not found") + ErrNowrite = new9pError("write prohibited") + ErrNowstat = new9pError("wstat prohibited") + ErrPerm = new9pError("permission denied") + ErrUnknownfid = new9pError("unknown fid") + ErrBaddir = new9pError("bad directory in wstat") + ErrWalknodir = new9pError("walk in non-directory") + + // extra errors not part of the normal protocol + ErrTimeout = new9pError("fcall timeout") // returned when timing out on the fcall + ErrUnknownTag = new9pError("unknown tag") + ErrUnknownMsg = new9pError("unknown message") // returned when encountering unknown message type + ErrUnexpectedMsg = new9pError("unexpected message") // returned when an unexpected message is encountered + ErrWalkLimit = new9pError("too many wnames in walk") + ErrClosed = errors.New("closed") +) + +// new9pError returns a new 9p error ready for the wire. +func new9pError(s string) error { + return MessageRerror{Ename: s} +} + +func (MessageRerror) Type() FcallType { + return Rerror +} + +func (e MessageRerror) Error() string { + return fmt.Sprintf("9p: %v", e.Ename) +} blob - /dev/null blob + 52a62823d787f4e4969c511b5a6c31cba9079b3b (mode 644) --- /dev/null +++ fcall.go @@ -0,0 +1,145 @@ +package p9p + +import "fmt" + +type FcallType uint8 + +const ( + Tversion FcallType = iota + 100 + Rversion + Tauth + Rauth + Tattach + Rattach + Terror + Rerror + Tflush + Rflush + Twalk + Rwalk + Topen + Ropen + Tcreate + Rcreate + Tread + Rread + Twrite + Rwrite + Tclunk + Rclunk + Tremove + Rremove + Tstat + Rstat + Twstat + Rwstat + Tmax +) + +func (fct FcallType) String() string { + switch fct { + case Tversion: + return "Tversion" + case Rversion: + return "Rversion" + case Tauth: + return "Tauth" + case Rauth: + return "Rauth" + case Tattach: + return "Tattach" + case Rattach: + return "Rattach" + case Terror: + // invalid. + return "Terror" + case Rerror: + return "Rerror" + case Tflush: + return "Tflush" + case Rflush: + return "Rflush" + case Twalk: + return "Twalk" + case Rwalk: + return "Rwalk" + case Topen: + return "Topen" + case Ropen: + return "Ropen" + case Tcreate: + return "Tcreate" + case Rcreate: + return "Rcreate" + case Tread: + return "Tread" + case Rread: + return "Rread" + case Twrite: + return "Twrite" + case Rwrite: + return "Rwrite" + case Tclunk: + return "Tclunk" + case Rclunk: + return "Rclunk" + case Tremove: + return "Tremote" + case Rremove: + return "Rremove" + case Tstat: + return "Tstat" + case Rstat: + return "Rstat" + case Twstat: + return "Twstat" + case Rwstat: + return "Rwstat" + default: + return "Tunknown" + } +} + +type Fcall struct { + Type FcallType + Tag Tag + Message Message +} + +func newFcall(msg Message) *Fcall { + var tag Tag + + switch msg.Type() { + case Tversion, Rversion: + tag = NOTAG + } + + return &Fcall{ + Type: msg.Type(), + Tag: tag, + Message: msg, + } +} + +func newErrorFcall(tag Tag, err error) *Fcall { + var msg Message + + switch v := err.(type) { + case MessageRerror: + msg = v + case *MessageRerror: + msg = *v + default: + msg = MessageRerror{Ename: v.Error()} + } + + return &Fcall{ + Type: Rerror, + Tag: tag, + Message: msg, + } +} + +func (fc *Fcall) String() string { + return fmt.Sprintf("%v(%v) %v", fc.Type, fc.Tag, string9p(fc.Message)) +} blob - /dev/null blob + d65f2687bf351dfe12c28afbe2768b858dcde720 (mode 644) --- /dev/null +++ logging.go @@ -0,0 +1,77 @@ +// +build ignore + +package p9p + +import ( + "log" + "os" +) + +type logging struct { + session Session + logger log.Logger +} + +var _ Session = &logging{} + +func NewLogger(prefix string, session Session) Session { + return &logging{ + session: session, + logger: *log.New(os.Stdout, prefix, 0), + } +} + +func (l *logging) Auth(afid Fid, uname, aname string) (Qid, error) { + qid, err := l.session.Auth(afid, uname, aname) + l.logger.Printf("Auth(%v, %s, %s) -> (%v, %v)", afid, uname, aname, qid, err) + return qid, err +} + +func (l *logging) Attach(fid, afid Fid, uname, aname string) (Qid, error) { + qid, err := l.session.Attach(fid, afid, uname, aname) + l.logger.Printf("Attach(%v, %v, %s, %s) -> (%v, %v)", fid, afid, uname, aname, qid, err) + return qid, err +} + +func (l *logging) Clunk(fid Fid) error { + return l.session.Clunk(fid) +} + +func (l *logging) Remove(fid Fid) (err error) { + defer func() { + l.logger.Printf("Remove(%v) -> %v", fid, err) + }() + return l.session.Remove(fid) +} + +func (l *logging) Walk(fid Fid, newfid Fid, names ...string) ([]Qid, error) { + return l.session.Walk(fid, newfid, names...) +} + +func (l *logging) Read(fid Fid, p []byte, offset int64) (n int, err error) { + return l.session.Read(fid, p, offset) +} + +func (l *logging) Write(fid Fid, p []byte, offset int64) (n int, err error) { + return l.session.Write(fid, p, offset) +} + +func (l *logging) Open(fid Fid, mode int32) (Qid, error) { + return l.session.Open(fid, mode) +} + +func (l *logging) Create(parent Fid, name string, perm uint32, mode uint32) (Qid, error) { + return l.session.Create(parent, name, perm, mode) +} + +func (l *logging) Stat(fid Fid) (Dir, error) { + return l.session.Stat(fid) +} + +func (l *logging) WStat(fid Fid, dir Dir) error { + return l.session.WStat(fid, dir) +} + +func (l *logging) Version(msize int32, version string) (int32, string, error) { + return l.session.Version(msize, version) +} blob - /dev/null blob + 3c2c4711b3c2fafb84ca6fb3db1b85f3ad34afaa (mode 644) --- /dev/null +++ messages.go @@ -0,0 +1,216 @@ +package p9p + +import "fmt" + +// Message represents the target of an fcall. +type Message interface { + // Type returns the type of call for the target message. + Type() FcallType +} + +// newMessage returns a new instance of the message based on the Fcall type. +func newMessage(typ FcallType) (Message, error) { + switch typ { + case Tversion: + return MessageTversion{}, nil + case Rversion: + return MessageRversion{}, nil + case Tauth: + return MessageTauth{}, nil + case Rauth: + return MessageRauth{}, nil + case Tattach: + return MessageTattach{}, nil + case Rattach: + return MessageRattach{}, nil + case Rerror: + return MessageRerror{}, nil + case Tflush: + return MessageTflush{}, nil + case Rflush: + return MessageRflush{}, nil // No message body for this response. + case Twalk: + return MessageTwalk{}, nil + case Rwalk: + return MessageRwalk{}, nil + case Topen: + return MessageTopen{}, nil + case Ropen: + return MessageRopen{}, nil + case Tcreate: + return MessageTcreate{}, nil + case Rcreate: + return MessageRcreate{}, nil + case Tread: + return MessageTread{}, nil + case Rread: + return MessageRread{}, nil + case Twrite: + return MessageTwrite{}, nil + case Rwrite: + return MessageRwrite{}, nil + case Tclunk: + return MessageTclunk{}, nil + case Rclunk: + return MessageRclunk{}, nil // no response body + case Tremove: + return MessageTremove{}, nil + case Rremove: + return MessageRremove{}, nil + case Tstat: + return MessageTstat{}, nil + case Rstat: + return MessageRstat{}, nil + case Twstat: + return MessageTwstat{}, nil + case Rwstat: + return MessageRwstat{}, nil + } + + return nil, fmt.Errorf("unknown message type") +} + +// MessageVersion encodes the message body for Tversion and Rversion RPC +// calls. The body is identical in both directions. +type MessageTversion struct { + MSize uint32 + Version string +} + +type MessageRversion struct { + MSize uint32 + Version string +} + +type MessageTauth struct { + Afid Fid + Uname string + Aname string +} + +type MessageRauth struct { + Qid Qid +} + +type MessageTflush struct { + Oldtag Tag +} + +type MessageRflush struct{} + +type MessageTattach struct { + Fid Fid + Afid Fid + Uname string + Aname string +} + +type MessageRattach struct { + Qid Qid +} + +type MessageTwalk struct { + Fid Fid + Newfid Fid + Wnames []string +} + +type MessageRwalk struct { + Qids []Qid +} + +type MessageTopen struct { + Fid Fid + Mode Flag +} + +type MessageRopen struct { + Qid Qid + IOUnit uint32 +} + +type MessageTcreate struct { + Fid Fid + Name string + Perm uint32 + Mode Flag +} + +type MessageRcreate struct { + Qid Qid + IOUnit uint32 +} + +type MessageTread struct { + Fid Fid + Offset uint64 + Count uint32 +} + +type MessageRread struct { + Data []byte +} + +type MessageTwrite struct { + Fid Fid + Offset uint64 + Data []byte +} + +type MessageRwrite struct { + Count uint32 +} + +type MessageTclunk struct { + Fid Fid +} + +type MessageRclunk struct{} + +type MessageTremove struct { + Fid Fid +} + +type MessageRremove struct{} + +type MessageTstat struct { + Fid Fid +} + +type MessageRstat struct { + Stat Dir +} + +type MessageTwstat struct { + Fid Fid + Stat Dir +} + +type MessageRwstat struct{} + +func (MessageTversion) Type() FcallType { return Tversion } +func (MessageRversion) Type() FcallType { return Rversion } +func (MessageTauth) Type() FcallType { return Tauth } +func (MessageRauth) Type() FcallType { return Rauth } +func (MessageTflush) Type() FcallType { return Tflush } +func (MessageRflush) Type() FcallType { return Rflush } +func (MessageTattach) Type() FcallType { return Tattach } +func (MessageRattach) Type() FcallType { return Rattach } +func (MessageTwalk) Type() FcallType { return Twalk } +func (MessageRwalk) Type() FcallType { return Rwalk } +func (MessageTopen) Type() FcallType { return Topen } +func (MessageRopen) Type() FcallType { return Ropen } +func (MessageTcreate) Type() FcallType { return Tcreate } +func (MessageRcreate) Type() FcallType { return Rcreate } +func (MessageTread) Type() FcallType { return Tread } +func (MessageRread) Type() FcallType { return Rread } +func (MessageTwrite) Type() FcallType { return Twrite } +func (MessageRwrite) Type() FcallType { return Rwrite } +func (MessageTclunk) Type() FcallType { return Tclunk } +func (MessageRclunk) Type() FcallType { return Rclunk } +func (MessageTremove) Type() FcallType { return Tremove } +func (MessageRremove) Type() FcallType { return Rremove } +func (MessageTstat) Type() FcallType { return Tstat } +func (MessageRstat) Type() FcallType { return Rstat } +func (MessageTwstat) Type() FcallType { return Twstat } +func (MessageRwstat) Type() FcallType { return Rwstat } blob - /dev/null blob + 4be04253703a1b05cad060ebd8df239a38e272fb (mode 644) --- /dev/null +++ server.go @@ -0,0 +1,171 @@ +package p9p + +import ( + "log" + "net" + "time" + + "golang.org/x/net/context" +) + +// Serve the 9p session over the provided network connection. +func Serve(ctx context.Context, conn net.Conn, handler Handler) { + + // TODO(stevvooe): It would be nice if the handler could declare the + // supported version. Before we had handler, we used the session to get + // the version (msize, version := session.Version()). We must decided if + // we want to proxy version and message size decisions all the back to the + // origin server or make those decisions at each link of a proxy chain. + + ch := newChannel(conn, codec9p{}, DefaultMSize) + negctx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + + // TODO(stevvooe): For now, we negotiate here. It probably makes sense to + // do this outside of this function and then pass in a ready made channel. + // We are not really ready to export the channel type yet. + + if err := servernegotiate(negctx, ch, DefaultVersion); err != nil { + // TODO(stevvooe): Need better error handling and retry support here. + // For now, we silently ignore the failure. + log.Println("error negotiating version:", err) + return + } + + ctx = withVersion(ctx, DefaultVersion) + + s := &server{ + ctx: ctx, + ch: ch, + handler: handler, + closed: make(chan struct{}), + } + + s.run() +} + +type server struct { + ctx context.Context + session Session + ch Channel + handler Handler + closed chan struct{} +} + +type activeTag struct { + ctx context.Context + request *Fcall + cancel context.CancelFunc + responded bool // true, if some response was sent (Response or Rflush/Rerror) +} + +func (s *server) run() { + tags := map[Tag]*activeTag{} // active requests + + log.Println("server.run()") + for { + select { + case <-s.ctx.Done(): + log.Println("server: context done") + return + case <-s.closed: + log.Println("server: shutdown") + default: + } + + // BUG(stevvooe): This server blocks on reads, calls to handlers and + // write, effectively single tracking fcalls through a target + // dispatcher. There is no reason we couldn't parallelize these + // requests out to the dispatcher to get massive performance + // improvements. + + log.Println("server:", "wait") + req := new(Fcall) + if err := s.ch.ReadFcall(s.ctx, req); err != nil { + if err, ok := err.(net.Error); ok { + if err.Timeout() || err.Temporary() { + continue + } + } + + log.Println("server: error reading fcall", err) + return + } + + if _, ok := tags[req.Tag]; ok { + resp := newErrorFcall(req.Tag, ErrDuptag) + if err := s.ch.WriteFcall(s.ctx, resp); err != nil { + log.Printf("error sending duplicate tag response: %v", err) + } + continue + } + + // handle flush calls. The tag never makes it into active from here. + if mf, ok := req.Message.(MessageTflush); ok { + log.Println("flushing message", mf.Oldtag) + + // check if we have actually know about the requested flush + active, ok := tags[mf.Oldtag] + if ok { + active.cancel() // cancel the context + + resp := newFcall(MessageRflush{}) + resp.Tag = req.Tag + if err := s.ch.WriteFcall(s.ctx, resp); err != nil { + log.Printf("error responding to flush: %v", err) + } + active.responded = true + } else { + resp := newErrorFcall(req.Tag, ErrUnknownTag) + if err := s.ch.WriteFcall(s.ctx, resp); err != nil { + log.Printf("error responding to flush: %v", err) + } + } + + continue + } + + // TODO(stevvooe): Add handler timeout here, as well, if we desire. + + // Allows us to signal handlers to cancel processing of the fcall + // through context. + ctx, cancel := context.WithCancel(s.ctx) + + tags[req.Tag] = &activeTag{ + ctx: ctx, + request: req, + cancel: cancel, + } + + var resp *Fcall + msg, err := s.handler.Handle(ctx, req.Message) + if err != nil { + // all handler errors are forwarded as protocol errors. + resp = newErrorFcall(req.Tag, err) + } else { + resp = newFcall(msg) + } + resp.Tag = req.Tag + + if err := ctx.Err(); err != nil { + // NOTE(stevvooe): We aren't really getting our moneys worth for + // how this is being handled. We really need to dispatch each + // request handler to a separate thread. + + // the context was canceled for some reason, perhaps timeout or + // due to a flush call. We treat this as a condition where a + // response should not be sent. + log.Println("context error:", err) + continue + } + + if !tags[req.Tag].responded { + if err := s.ch.WriteFcall(ctx, resp); err != nil { + log.Println("server: error writing fcall:", err) + continue + } + } + + delete(tags, req.Tag) + } +} blob - /dev/null blob + ff2fd20cbe5bddfe06b2157dc145d0fc537287d0 (mode 644) --- /dev/null +++ session.go @@ -0,0 +1,49 @@ +package p9p + +import ( + "net" + + "golang.org/x/net/context" +) + +// Session provides the central abstraction for a 9p connection. Clients +// implement sessions and servers serve sessions. Sessions can be proxied by +// serving up a client session. +// +// The interface is also wired up with full context support to manage timeouts +// and resource clean up. +// +// Session represents the operations covered in section 5 of the plan 9 manual +// (http://man.cat-v.org/plan_9/5/). Requests are managed internally, so the +// Flush method is handled by the internal implementation. Consider preceeding +// these all with context to control request timeout. +type Session interface { + Auth(ctx context.Context, afid Fid, uname, aname string) (Qid, error) + Attach(ctx context.Context, fid, afid Fid, uname, aname string) (Qid, error) + Clunk(ctx context.Context, fid Fid) error + Remove(ctx context.Context, fid Fid) error + Walk(ctx context.Context, fid Fid, newfid Fid, names ...string) ([]Qid, error) + Read(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error) + Write(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error) + Open(ctx context.Context, fid Fid, mode Flag) (Qid, uint32, error) + Create(ctx context.Context, parent Fid, name string, perm uint32, mode Flag) (Qid, uint32, error) + Stat(ctx context.Context, fid Fid) (Dir, error) + WStat(ctx context.Context, fid Fid, dir Dir) error + + // Version returns the supported version and msize of the session. This + // can be affected by negotiating or the level of support provided by the + // session implementation. + Version() (msize int, version string) +} + +func Dial(ctx context.Context, addr string) (Session, error) { + c, err := net.Dial("tcp", addr) + if err != nil { + return nil, err + } + + // BUG(stevvooe): Session doesn't actually close connection. Dial might + // not be the right interface. + + return NewSession(ctx, c) +} blob - /dev/null blob + 441111086f8fac97bbe5efa2a66485e1d18edb9c (mode 644) --- /dev/null +++ transport.go @@ -0,0 +1,219 @@ +package p9p + +import ( + "fmt" + "log" + "net" + + "golang.org/x/net/context" +) + +// roundTripper manages the request and response from the client-side. A +// roundTripper must abide by similar rules to the http.RoundTripper. +// Typically, the roundTripper will manage tag assignment and message +// serialization. +type roundTripper interface { + send(ctx context.Context, msg Message) (Message, error) +} + +// transport plays the role of being a client channel manager. It multiplexes +// function calls onto the wire and dispatches responses to blocking calls to +// send. On the whole, transport is thread-safe for calling send +type transport struct { + ctx context.Context + ch Channel + requests chan *fcallRequest + closed chan struct{} + + tags uint16 +} + +var _ roundTripper = &transport{} + +func newTransport(ctx context.Context, ch *channel) roundTripper { + t := &transport{ + ctx: ctx, + ch: ch, + requests: make(chan *fcallRequest), + closed: make(chan struct{}), + } + + go t.handle() + + return t +} + +type fcallRequest struct { + ctx context.Context + fcall *Fcall + response chan *Fcall + err chan error +} + +func newFcallRequest(ctx context.Context, fcall *Fcall) *fcallRequest { + return &fcallRequest{ + ctx: ctx, + fcall: fcall, + response: make(chan *Fcall, 1), + err: make(chan error, 1), + } +} + +func (t *transport) send(ctx context.Context, msg Message) (Message, error) { + fcall := newFcall(msg) + req := newFcallRequest(ctx, fcall) + + // dispatch the request. + select { + case <-t.closed: + return nil, ErrClosed + case <-ctx.Done(): + return nil, ctx.Err() + case t.requests <- req: + } + + // wait for the response. + select { + case <-t.closed: + return nil, ErrClosed + case <-ctx.Done(): + return nil, ctx.Err() + case err := <-req.err: + return nil, err + case resp := <-req.response: + if resp.Type == Rerror { + // pack the error into something useful + respmesg, ok := resp.Message.(MessageRerror) + if !ok { + return nil, fmt.Errorf("invalid error response: %v", resp) + } + + return nil, respmesg + } + + return resp.Message, nil + } +} + +// handle takes messages off the wire and wakes up the waiting tag call. +func (t *transport) handle() { + defer func() { + log.Println("exited handle loop") + t.Close() + }() + // the following variable block are protected components owned by this thread. + var ( + responses = make(chan *Fcall) + tags Tag + // outstanding provides a map of tags to outstanding requests. + outstanding = map[Tag]*fcallRequest{} + ) + + // loop to read messages off of the connection + go func() { + defer func() { + log.Println("exited read loop") + t.Close() + }() + loop: + for { + fcall := new(Fcall) + if err := t.ch.ReadFcall(t.ctx, fcall); err != nil { + switch err := err.(type) { + case net.Error: + if err.Timeout() || err.Temporary() { + // BUG(stevvooe): There may be partial reads under + // timeout errors where this is actually fatal. + + // can only retry if we haven't offset the frame. + continue loop + } + } + + log.Println("fatal error reading msg:", err) + t.Close() + return + } + + select { + case <-t.ctx.Done(): + log.Println("ctx done") + return + case <-t.closed: + log.Println("transport closed") + return + case responses <- fcall: + } + } + }() + + for { + log.Println("wait...") + select { + case req := <-t.requests: + if req.fcall.Tag == NOTAG { + // NOTE(stevvooe): We disallow fcalls with NOTAG to come + // through this path since we can't join the tagged response + // with the waiting caller. This is typically used for the + // Tversion/Rversion round trip to setup a session. + // + // It may be better to allow these through but block all + // requests until a notag message has a response. + + req.err <- fmt.Errorf("disallowed tag through transport") + continue + } + + // BUG(stevvooe): This is an awful tag allocation procedure. + // Replace this with something that let's us allocate tags and + // associate data with them, returning to them to a pool when + // complete. Such a system would provide a lot of information + // about outstanding requests. + tags++ + req.fcall.Tag = tags + outstanding[req.fcall.Tag] = req + + // TODO(stevvooe): Consider the case of requests that never + // receive a response. We need to remove the fcall context from + // the tag map and dealloc the tag. We may also want to send a + // flush for the tag. + if err := t.ch.WriteFcall(req.ctx, req.fcall); err != nil { + delete(outstanding, req.fcall.Tag) + req.err <- err + } + case b := <-responses: + req, ok := outstanding[b.Tag] + if !ok { + panic("unknown tag received") + } + delete(outstanding, req.fcall.Tag) + + req.response <- b + + // TODO(stevvooe): Reclaim tag id. + case <-t.ctx.Done(): + return + case <-t.closed: + return + } + } +} + +func (t *transport) flush(ctx context.Context, tag Tag) error { + // TODO(stevvooe): We need to fire and forget flush messages when a call + // context gets cancelled. + panic("not implemented") +} + +func (t *transport) Close() error { + select { + case <-t.closed: + return ErrClosed + case <-t.ctx.Done(): + return t.ctx.Err() + default: + close(t.closed) + } + + return nil +} blob - /dev/null blob + 5fe03c6db69fbdaf081004818370c0302cf4145b (mode 644) --- /dev/null +++ types.go @@ -0,0 +1,131 @@ +package p9p + +import ( + "fmt" + "time" +) + +const ( + DefaultMSize = 64 << 10 + DefaultVersion = "9P2000" +) + +const ( + DMDIR = 0x80000000 // mode bit for directories + DMAPPEND = 0x40000000 // mode bit for append only files + DMEXCL = 0x20000000 // mode bit for exclusive use files + DMMOUNT = 0x10000000 // mode bit for mounted channel + DMAUTH = 0x08000000 // mode bit for authentication file + DMTMP = 0x04000000 // mode bit for non-backed-up files + + // 9p2000.u extensions + DMSYMLINK = 0x02000000 + DMDEVICE = 0x00800000 + DMNAMEDPIPE = 0x00200000 + DMSOCKET = 0x00100000 + DMSETUID = 0x00080000 + DMSETGID = 0x00040000 + + DMREAD = 0x4 // mode bit for read permission + DMWRITE = 0x2 // mode bit for write permission + DMEXEC = 0x1 // mode bit for execute permission +) + +// Flag defines the flag type for use with open and create +type Flag uint8 + +const ( + OREAD Flag = 0x00 // open for read + OWRITE = 0x01 // write + ORDWR = 0x02 // read and write + OEXEC = 0x03 // execute, == read but check execute permission + + // PROPOSAL(stevvooe): Possible protocal extension to allow the create of + // symlinks. Initially, the link is created with no value. Read and write + // to read and set the link value. + OSYMLINK = 0x04 + + // or'd in + OTRUNC = 0x10 // or'ed in (except for exec), truncate file first + OCEXEC = 0x20 // or'ed in, close on exec + ORCLOSE = 0x40 // or'ed in, remove on close +) + +// QType indicates the type of a resource within the Qid. +type QType uint8 + +const ( + QTDIR QType = 0x80 // type bit for directories + QTAPPEND = 0x40 // type bit for append only files + QTEXCL = 0x20 // type bit for exclusive use files + QTMOUNT = 0x10 // type bit for mounted channel + QTAUTH = 0x08 // type bit for authentication file + QTTMP = 0x04 // type bit for not-backed-up file + QTFILE = 0x00 // plain file +) + +func (qt QType) String() string { + switch qt { + case QTDIR: + return "dir" + case QTAPPEND: + return "append" + case QTEXCL: + return "excl" + case QTMOUNT: + return "mount" + case QTAUTH: + return "auth" + case QTTMP: + return "tmp" + case QTFILE: + return "file" + } + + return "unknown" +} + +// Tag uniquely identifies an outstanding fcall in a 9p session. +type Tag uint16 + +const NOTAG Tag = ^Tag(0) + +type Fid uint32 + +const NOFID Fid = ^Fid(0) + +type Qid struct { + Type QType `9p:type,1` + Version uint32 + Path uint64 +} + +func (qid Qid) String() string { + return fmt.Sprintf("qid(%v, v=%x, p=%x)", + qid.Type, qid.Version, qid.Path) +} + +type Dir struct { + Type uint16 + Dev uint32 + Qid Qid + Mode uint32 + + // BUG(stevvooe): The Year 2038 is coming soon. 9p wire protocol has these + // as 4 byte epoch times. Some possibilities include time dilation fields + // or atemporal files. We can also just not use them and set them to zero. + + AccessTime time.Time + ModTime time.Time + + Length uint64 + Name string + UID string + GID string + MUID string +} + +func (d Dir) String() string { + return fmt.Sprintf("dir(%v mode=%v atime=%v mtime=%v length=%v name=%v uid=%v gid=%v muid=%v)", + d.Qid, d.Mode, d.AccessTime, d.ModTime, d.Length, d.Name, d.UID, d.GID, d.MUID) +} blob - /dev/null blob + d23b4aae4ce948d568c577af4bbea35ecfa92f74 (mode 644) --- /dev/null +++ version.go @@ -0,0 +1,106 @@ +package p9p + +import ( + "fmt" + + "golang.org/x/net/context" +) + +// NOTE(stevvooe): This file contains functions for negotiating version on the +// client and server. There are some nasty details to get right for +// downgrading the connection on the server-side that are not present yet. +// Really, these should be refactored into some sort of channel type that can +// support resets through version messages during the protocol exchange. + +// clientnegotiate negiotiates the protocol version using channel, blocking +// until a response is received. The received value will be the version +// implemented by the server. +func clientnegotiate(ctx context.Context, ch Channel, version string) (string, error) { + req := newFcall(MessageTversion{ + MSize: uint32(ch.MSize()), + Version: version, + }) + + if err := ch.WriteFcall(ctx, req); err != nil { + return "", err + } + + resp := new(Fcall) + if err := ch.ReadFcall(ctx, resp); err != nil { + return "", err + } + + switch v := resp.Message.(type) { + case MessageRversion: + + if v.Version != version { + // TODO(stevvooe): A stubborn client indeed! + return "", fmt.Errorf("unsupported server version: %v", version) + } + + if int(v.MSize) > ch.MSize() { + // upgrade msize if server differs. + ch.SetMSize(int(v.MSize)) + } + + return v.Version, nil + case error: + return "", v + default: + return "", ErrUnexpectedMsg + } +} + +// servernegotiate blocks until a version message is received or a timeout +// occurs. The msize for the tranport will be set from the negotiation. If +// negotiate returns nil, a server may proceed with the connection. +// +// In the future, it might be better to handle the version messages in a +// separate object that manages the session. Each set of version requests +// effectively "reset" a connection, meaning all fids get clunked and all +// outstanding IO is aborted. This is probably slightly racy, in practice with +// a misbehaved client. The main issue is that we cannot tell which session +// messages belong to. +func servernegotiate(ctx context.Context, ch Channel, version string) error { + // wait for the version message over the transport. + req := new(Fcall) + if err := ch.ReadFcall(ctx, req); err != nil { + return err + } + + mv, ok := req.Message.(MessageTversion) + if !ok { + return fmt.Errorf("expected version message: %v", mv) + } + + respmsg := MessageRversion{ + Version: version, + } + + if mv.Version != version { + // TODO(stevvooe): Not the best place to do version handling. We need + // to have a way to pass supported versions into this method then have + // it return the actual version. For now, respond with unknown for + // anything that doesn't match the provided version string. + respmsg.Version = "unknown" + } + + if int(mv.MSize) < ch.MSize() { + // if the server msize is too large, use the client's suggested msize. + ch.SetMSize(int(mv.MSize)) + respmsg.MSize = mv.MSize + } else { + respmsg.MSize = uint32(ch.MSize()) + } + + resp := newFcall(respmsg) + if err := ch.WriteFcall(ctx, resp); err != nil { + return err + } + + if respmsg.Version == "unknown" { + return fmt.Errorf("bad version negotiation") + } + + return nil +}