commit b714e9e06d490b28c46fed5f6e093dd569908848 from: Stephen J Day date: Thu Nov 05 06:46:27 2015 UTC fs/p9p/new: describe handler interface and polish Signed-off-by: Stephen J Day commit - 7d20322689f2c839df114432c36ff372554d6a80 commit + b714e9e06d490b28c46fed5f6e093dd569908848 blob - 91c7ce686abe1d94fdf3f2832597df6565738a67 blob + 665ea9208134d6d3b2fa184b6b9a6bc6a78681f0 --- cmd/9pr/main.go +++ cmd/9pr/main.go @@ -135,6 +135,11 @@ func main() { ctx, _ = context.WithTimeout(commander.ctx, 5*time.Second) if err := cmd(ctx, args[1:]...); err != nil { + if err == p9pnew.ErrClosed { + log.Println("connection closed, shutting down") + return + } + log.Printf("👹 %s: %v", name, err) } } blob - e190166973a54ec0561999305e6166b8d9f360ab blob + dcb1c5a59454fab0cdb3081c4c44e33a59cad080 --- cmd/9ps/main.go +++ cmd/9ps/main.go @@ -55,7 +55,7 @@ func main() { return } - p9pnew.Serve(ctx, conn, session) + p9pnew.Serve(ctx, conn, p9pnew.Dispatch(session)) }(c) } } blob - 6296fb7dffaf52cc6dd05d3d762673ce0dd2ca2e blob + 312755fa68fec6abe0d05a6a2e4bb6f4e3118113 --- dispatcher.go +++ dispatcher.go @@ -1,173 +1,132 @@ package p9pnew -import ( - "fmt" +import "golang.org/x/net/context" - "golang.org/x/net/context" -) +// Handler defines an interface for 9p message handlers. A handler +// implementation could be used to intercept calls of all types before sending +// them to the next handler. +type Handler interface { + Handle(ctx context.Context, msg Message) (Message, error) -type handler interface { - handle(ctx context.Context, req *Fcall) (*Fcall, error) + // TODO(stevvooe): Right now, this interface is functianally identical to + // roundtripper. If we find that this is sufficient on the server-side, we + // may unify the types. For now, we leave them separated to differentiate + // between them. } -// dispatcher routes fcalls to a Session. -type dispatcher struct { - session Session +// HandlerFunc is a convenience type for defining inline handlers. +type HandlerFunc func(ctx context.Context, msg Message) (Message, error) + +func (fn HandlerFunc) Handle(ctx context.Context, msg Message) (Message, error) { + return fn(ctx, msg) } -// handle responds to an fcall using the session. An error is only returned if -// the handler cannot proceed. All session errors are returned as Rerror. -func (d *dispatcher) handle(ctx context.Context, req *Fcall) (*Fcall, error) { - var resp *Fcall - switch req.Type { - case Tauth: - reqmsg, ok := req.Message.(MessageTauth) - if !ok { - return nil, fmt.Errorf("incorrect message for type: %v message=%v", req, req.Message) - } +// Dispatch returns a handler that dispatches messages to the target session. +// No concurrency is managed by the returned handler. It simply turns messages +// into function calls on the session. +func Dispatch(session Session) Handler { + return HandlerFunc(func(ctx context.Context, msg Message) (Message, error) { + switch msg := msg.(type) { + case MessageTauth: + qid, err := session.Auth(ctx, msg.Afid, msg.Uname, msg.Aname) + if err != nil { + return nil, err + } - qid, err := d.session.Auth(ctx, reqmsg.Afid, reqmsg.Uname, reqmsg.Aname) - if err != nil { - return nil, err - } + return MessageRauth{Qid: qid}, nil + case MessageTattach: + qid, err := session.Attach(ctx, msg.Fid, msg.Afid, msg.Uname, msg.Aname) + if err != nil { + return nil, err + } - resp = newFcall(MessageRauth{Qid: qid}) - case Tattach: - reqmsg, ok := req.Message.(MessageTattach) - if !ok { - return nil, fmt.Errorf("bad message: %v message=%#v", req, req.Message) - } + return MessageRattach{ + Qid: qid, + }, nil + case MessageTwalk: + // TODO(stevvooe): This is one of the places where we need to manage + // fid allocation lifecycle. We need to reserve the fid, then, if this + // call succeeds, we should alloc the fid for future uses. Also need + // to interact correctly with concurrent clunk and the flush of this + // walk message. + qids, err := session.Walk(ctx, msg.Fid, msg.Newfid, msg.Wnames...) + if err != nil { + return nil, err + } - qid, err := d.session.Attach(ctx, reqmsg.Fid, reqmsg.Afid, reqmsg.Uname, reqmsg.Aname) - if err != nil { - return nil, err - } + return MessageRwalk{ + Qids: qids, + }, nil + case MessageTopen: + qid, iounit, err := session.Open(ctx, msg.Fid, msg.Mode) + if err != nil { + return nil, err + } - resp = newFcall(MessageRattach{ - Qid: qid, - }) - case Twalk: - reqmsg, ok := req.Message.(MessageTwalk) - if !ok { - return nil, fmt.Errorf("bad message: %v message=%#v", req, req.Message) - } + return MessageRopen{ + Qid: qid, + IOUnit: iounit, + }, nil + case MessageTcreate: + qid, iounit, err := session.Create(ctx, msg.Fid, msg.Name, msg.Perm, msg.Mode) + if err != nil { + return nil, err + } - // TODO(stevvooe): This is one of the places where we need to manage - // fid allocation lifecycle. We need to reserve the fid, then, if this - // call succeeds, we should alloc the fid for future uses. Also need - // to interact correctly with concurrent clunk and the flush of this - // walk message. - qids, err := d.session.Walk(ctx, reqmsg.Fid, reqmsg.Newfid, reqmsg.Wnames...) - if err != nil { - return nil, err - } + return MessageRcreate{ + Qid: qid, + IOUnit: iounit, + }, nil + case MessageTread: + p := make([]byte, int(msg.Count)) + n, err := session.Read(ctx, msg.Fid, p, int64(msg.Offset)) + if err != nil { + return nil, err + } - resp = newFcall(MessageRwalk{ - Qids: qids, - }) - case Topen: - reqmsg, ok := req.Message.(MessageTopen) - if !ok { - return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message) - } + return MessageRread{ + Data: p[:n], + }, nil + case MessageTwrite: + n, err := session.Write(ctx, msg.Fid, msg.Data, int64(msg.Offset)) + if err != nil { + return nil, err + } - qid, iounit, err := d.session.Open(ctx, reqmsg.Fid, reqmsg.Mode) - if err != nil { - return nil, err - } + return MessageRwrite{ + Count: uint32(n), + }, nil + case MessageTclunk: + // TODO(stevvooe): Manage the clunking of file descriptors based on + // walk and attach call progression. + if err := session.Clunk(ctx, msg.Fid); err != nil { + return nil, err + } - resp = newFcall(MessageRopen{ - Qid: qid, - IOUnit: iounit, - }) - case Tcreate: - reqmsg, ok := req.Message.(MessageTcreate) - if !ok { - return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message) - } + return MessageRclunk{}, nil + case MessageTremove: + if err := session.Remove(ctx, msg.Fid); err != nil { + return nil, err + } - qid, iounit, err := d.session.Create(ctx, reqmsg.Fid, reqmsg.Name, reqmsg.Perm, reqmsg.Mode) - if err != nil { - return nil, err - } + return MessageRremove{}, nil + case MessageTstat: + dir, err := session.Stat(ctx, msg.Fid) + if err != nil { + return nil, err + } - resp = newFcall(MessageRcreate{ - Qid: qid, - IOUnit: iounit, - }) + return MessageRstat{ + Stat: dir, + }, nil + case MessageTwstat: + if err := session.WStat(ctx, msg.Fid, msg.Stat); err != nil { + return nil, err + } - case Tread: - reqmsg, ok := req.Message.(MessageTread) - if !ok { - return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message) + return MessageRwstat{}, nil + default: + return nil, ErrUnknownMsg } - - p := make([]byte, int(reqmsg.Count)) - n, err := d.session.Read(ctx, reqmsg.Fid, p, int64(reqmsg.Offset)) - if err != nil { - return nil, err - } - - resp = newFcall(MessageRread{ - Data: p[:n], - }) - case Twrite: - reqmsg, ok := req.Message.(MessageTwrite) - if !ok { - return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message) - } - - n, err := d.session.Write(ctx, reqmsg.Fid, reqmsg.Data, int64(reqmsg.Offset)) - if err != nil { - return nil, err - } - - resp = newFcall(MessageRwrite{ - Count: uint32(n), - }) - case Tclunk: - reqmsg, ok := req.Message.(MessageTclunk) - if !ok { - return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message) - } - - // TODO(stevvooe): Manage the clunking of file descriptors based on - // walk and attach call progression. - if err := d.session.Clunk(ctx, reqmsg.Fid); err != nil { - return nil, err - } - - resp = newFcall(MessageRclunk{}) - case Tremove: - reqmsg, ok := req.Message.(MessageTremove) - if !ok { - return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message) - } - - if err := d.session.Remove(ctx, reqmsg.Fid); err != nil { - return nil, err - } - - resp = newFcall(MessageRremove{}) - case Tstat: - reqmsg, ok := req.Message.(MessageTstat) - if !ok { - return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message) - } - - dir, err := d.session.Stat(ctx, reqmsg.Fid) - if err != nil { - return nil, err - } - - resp = newFcall(MessageRstat{ - Stat: dir, - }) - case Twstat: - panic("not implemented") - default: - return nil, ErrUnknownMsg - } - - return resp, nil + }) } blob - 17ba2f9afa9bd7cf54fd6e4e6fb7cdec1a31a392 blob + d1a1644ae93485a44af0e16a36dd6a2de7467ed4 --- doc.go +++ doc.go @@ -4,11 +4,75 @@ in modern, production Go services. This package differ is has departed from the plan 9 implementation primitives and better follows idiomatic Go style. +The package revolves around the session type, which is an enumeration of raw +9p message calls. A few calls, such as flush and version, have been elided, +defering their usage to the server implementation. Sessions can be trivially +proxied through clients and servers. + +Getting Started + +The best place to get started is with Serve. Serve can be provided a +connection and a handler. A typical implementation will call Serve as part of +a listen/accept loop. As each network connection is created, Serve can be +called with a handler for the specific connection. The handler can be +implemented with a Session via the Dispatch function or can generate sessions +for dispatch in response to client messages. (See cmd/9ps for an example) + +On the client side, NewSession provides a 9p session from a connection. After +a version negotiation, methods can be called on the session, in parallel, and +calls will be sent over the connection. Call timeouts can be controlled via +the context provided to each method call. + +Framework + +This package has the beginning of a nice client-server framework for working +with 9p. Some of the abstractions aren't entirely fleshed out, but most of +this can center around the Handler. + +Missing from this are a number of tools for implementing 9p servers. The most +glaring are directory read and walk helpers. Other, more complex additions +might be a system to manage in memory filesystem trees that expose multi-user +sessions. + +Differences + +The largest difference between this package and other 9p packages is +simplification of the types needed to implement a server. To avoid confusing +bugs and odd behavior, the components are separated by each level of the +protocol. One example is that requests and responses are separated and they no +longer hold mutable state. This means that framing, transport management, +encoding, and dispatching are componentized. Little work will be required to +swap out encodings, transports or connection implementations. + +Context Integration + +This package has been wired from top to bottom to support context-based +resource management. Everything from startup to shutdown can have timeouts +using contexts. Not all close methods are fully in place, but we are very +close to having controlled, predictable cleanup for both servers and clients. +Timeouts can be very granular or very course, depending on the context of the +timeout. For example, it is very easy to set a short timeout for a stat call +but a long timeout for reading data. + Multiversion Support Currently, there is not multiversion support. The hooks and functionality are in place to add multi-version support. Generally, the correct space to do this is in the codec. Types, such as Dir, simply need to be extended to support the possibility of extra fields. + +The real question to ask here is what is the role of the version number in the +9p protocol. It really comes down to the level of support required. Do we just +need it at the protocol level, or do handlers and sessions need to be have +differently based on negotiated versions? + +Caveats + +This package has a number of TODOs to make it easier to use. Most of the +existing code provides a solid base to work from. Don't be discouraged by the +sawdust. + +In addition, the testing is embarassingly lacking. With time, we can get full +testing going and ensure we have confidence in the implementation. */ package p9pnew blob - bcb9d3529472a2cb92f44025dfeca20de9d5876d blob + 6a2c75e2341826863a836e5c7336f924e79113ac --- server.go +++ server.go @@ -9,9 +9,15 @@ import ( ) // Serve the 9p session over the provided network connection. -func Serve(ctx context.Context, conn net.Conn, session Session) { - msize, version := session.Version() - ch := newChannel(conn, codec9p{}, msize) +func Serve(ctx context.Context, conn net.Conn, handler Handler) { + + // TODO(stevvooe): It would be nice if the handler could declare the + // supported version. Before we had handler, we used the session to get + // the version (msize, version := session.Version()). We must decided if + // we want to proxy version and message size decisions all the back to the + // origin server or make those decisions at each link of a proxy chain. + + ch := newChannel(conn, codec9p{}, DefaultMSize) negctx, cancel := context.WithTimeout(ctx, 1*time.Second) defer cancel() @@ -19,19 +25,19 @@ func Serve(ctx context.Context, conn net.Conn, session // do this outside of this function and then pass in a ready made channel. // We are not really ready to export the channel type yet. - if err := servernegotiate(negctx, ch, version); err != nil { + if err := servernegotiate(negctx, ch, DefaultVersion); err != nil { // TODO(stevvooe): Need better error handling and retry support here. // For now, we silently ignore the failure. log.Println("error negotiating version:", err) return } - ctx = withVersion(ctx, version) + ctx = withVersion(ctx, DefaultVersion) s := &server{ ctx: ctx, ch: ch, - handler: &dispatcher{session: session}, + handler: handler, closed: make(chan struct{}), } @@ -42,7 +48,7 @@ type server struct { ctx context.Context session Session ch Channel - handler handler + handler Handler closed chan struct{} } @@ -60,9 +66,10 @@ func (s *server) run() { for { select { case <-s.ctx.Done(): - log.Println("server: shutdown") + log.Println("server: context done") return case <-s.closed: + log.Println("server: shutdown") default: } @@ -75,8 +82,14 @@ func (s *server) run() { log.Println("server:", "wait") req := new(Fcall) if err := s.ch.ReadFcall(s.ctx, req); err != nil { + if err, ok := err.(net.Error); ok { + if err.Timeout() || err.Temporary() { + continue + } + } + log.Println("server: error reading fcall", err) - continue + return } if _, ok := tags[req.Tag]; ok { @@ -124,10 +137,13 @@ func (s *server) run() { cancel: cancel, } - resp, err := s.handler.handle(ctx, req) + var resp *Fcall + msg, err := s.handler.Handle(ctx, req.Message) if err != nil { // all handler errors are forwarded as protocol errors. resp = newErrorFcall(req.Tag, err) + } else { + resp = newFcall(msg) } resp.Tag = req.Tag blob - 2fac1b1c58190f943b61509cb4cb951170a95b4f blob + 6eef3152ad34bb1bb5927911229766aceba239e7 --- session.go +++ session.go @@ -27,8 +27,8 @@ type Session interface { Write(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error) Open(ctx context.Context, fid Fid, mode Flag) (Qid, uint32, error) Create(ctx context.Context, parent Fid, name string, perm uint32, mode Flag) (Qid, uint32, error) - Stat(context.Context, Fid) (Dir, error) - WStat(context.Context, Fid, Dir) error + Stat(ctx context.Context, fid Fid) (Dir, error) + WStat(ctx context.Context, fid Fid, dir Dir) error // Version returns the supported version and msize of the session. This // can be affected by negotiating or the level of support provided by the blob - 9387f48e851e79a80260517317a258511a43253c blob + 8d44e875d5083ae7baba35b73ddd5b16d4c71409 --- transport.go +++ transport.go @@ -99,7 +99,7 @@ func (t *transport) send(ctx context.Context, msg Mess func (t *transport) handle() { defer func() { log.Println("exited handle loop") - close(t.closed) + t.Close() }() // the following variable block are protected components owned by this thread. var ( @@ -113,7 +113,7 @@ func (t *transport) handle() { go func() { defer func() { log.Println("exited read loop") - close(t.closed) + t.Close() }() loop: for { blob - afcc1d2bf0f3c6eafe7ca67bd79d5221bee7fd82 blob + c69518c669475a28ff5c7e3cb5e54f3786f6ba9d --- version.go +++ version.go @@ -68,7 +68,7 @@ func servernegotiate(ctx context.Context, ch Channel, return err } - mv, ok := req.Message.(*MessageTversion) + mv, ok := req.Message.(MessageTversion) if !ok { return fmt.Errorf("expected version message: %v", mv) }