commit 641595f816e782a5b39e5453e7afeed70c617ae0 from: Stephen J Day date: Sat Nov 14 00:48:44 2015 UTC pkg/p9p: refactor ServeConn to make room for Server Signed-off-by: Stephen J Day commit - 78cab7f39e3b5a0a8afd0a2f90564dfd049423a2 commit + 641595f816e782a5b39e5453e7afeed70c617ae0 blob - 432b3265bbeade0eef0f877bda4e9ef8d03d30b0 blob + 12bcfc1e68545d3ba2e4b41805d564e7cdd5a5f3 --- cmd/9ps/main.go +++ cmd/9ps/main.go @@ -55,7 +55,9 @@ func main() { return } - p9p.Serve(ctx, conn, p9p.Dispatch(session)) + if err := p9p.ServeConn(ctx, conn, p9p.Dispatch(session)); err != nil { + log.Printf("serving conn: %v", err) + } }(c) } } blob - a26cd69172fb7be642c1163f04d269b242745a9a blob + db99d90caebaa4a7090817bb8d7114815121e554 --- server.go +++ server.go @@ -1,6 +1,7 @@ package p9p import ( + "fmt" "log" "net" "time" @@ -8,16 +9,20 @@ import ( "golang.org/x/net/context" ) -// Serve the 9p session over the provided network connection. -func Serve(ctx context.Context, conn net.Conn, handler Handler) { +// TODO(stevvooe): Add net/http.Server-like type here to manage connections. +// Coupled with Handler mux, we can get a very http-like experience for 9p +// servers. +// ServeConn the 9p handler over the provided network connection. +func ServeConn(ctx context.Context, cn net.Conn, handler Handler) error { + // TODO(stevvooe): It would be nice if the handler could declare the // supported version. Before we had handler, we used the session to get // the version (msize, version := session.Version()). We must decided if // we want to proxy version and message size decisions all the back to the // origin server or make those decisions at each link of a proxy chain. - ch := newChannel(conn, codec9p{}, DefaultMSize) + ch := newChannel(cn, codec9p{}, DefaultMSize) negctx, cancel := context.WithTimeout(ctx, 1*time.Second) defer cancel() @@ -27,29 +32,29 @@ func Serve(ctx context.Context, conn net.Conn, handler if err := servernegotiate(negctx, ch, DefaultVersion); err != nil { // TODO(stevvooe): Need better error handling and retry support here. - // For now, we silently ignore the failure. - log.Println("error negotiating version:", err) - return + return fmt.Errorf("error negotiating version:", err) } ctx = withVersion(ctx, DefaultVersion) - s := &server{ + c := &conn{ ctx: ctx, ch: ch, handler: handler, closed: make(chan struct{}), } - s.run() + return c.serve() } -type server struct { +// conn plays role of session dispatch for handler in a server. +type conn struct { ctx context.Context session Session ch Channel handler Handler closed chan struct{} + err error // terminal error for the conn } // activeRequest includes information about the active request. @@ -59,7 +64,8 @@ type activeRequest struct { cancel context.CancelFunc } -func (s *server) run() { +// serve messages on the connection until an error is encountered. +func (c *conn) serve() error { tags := map[Tag]*activeRequest{} // active requests requests := make(chan *Fcall) // sync, read-limited @@ -67,50 +73,9 @@ func (s *server) run() { completed := make(chan *Fcall, 1) // read loop - go func() { - for { - req := new(Fcall) - if err := s.ch.ReadFcall(s.ctx, req); err != nil { - if err, ok := err.(net.Error); ok { - if err.Timeout() || err.Temporary() { - continue - } - } + go c.read(requests) + go c.write(responses) - log.Println("server: error reading fcall", err) - return - } - - select { - case requests <- req: - case <-s.ctx.Done(): - log.Println("server: context done") - return - case <-s.closed: - log.Println("server: shutdown") - return - } - } - }() - - // write loop - go func() { - for { - select { - case resp := <-responses: - if err := s.ch.WriteFcall(s.ctx, resp); err != nil { - log.Println("server: error writing fcall:", err) - } - case <-s.ctx.Done(): - log.Println("server: context done") - return - case <-s.closed: - log.Println("server: shutdown") - return - } - } - }() - log.Println("server.run()") for { log.Println("server:", "wait") @@ -121,10 +86,10 @@ func (s *server) run() { select { case responses <- newErrorFcall(req.Tag, ErrDuptag): // Send to responses, bypass tag management. - case <-s.ctx.Done(): - return - case <-s.closed: - return + case <-c.ctx.Done(): + return c.ctx.Err() + case <-c.closed: + return c.err } continue } @@ -146,15 +111,15 @@ func (s *server) run() { select { case responses <- resp: // bypass tag management in completed. - case <-s.ctx.Done(): - return - case <-s.closed: - return + case <-c.ctx.Done(): + return c.ctx.Err() + case <-c.closed: + return c.err } default: // Allows us to session handlers to cancel processing of the fcall // through context. - ctx, cancel := context.WithCancel(s.ctx) + ctx, cancel := context.WithCancel(c.ctx) // The contents of these instances are only writable in the main // server loop. The value of tag will not change. @@ -166,7 +131,7 @@ func (s *server) run() { go func(ctx context.Context, req *Fcall) { var resp *Fcall - msg, err := s.handler.Handle(ctx, req.Message) + msg, err := c.handler.Handle(ctx, req.Message) if err != nil { // all handler errors are forwarded as protocol errors. resp = newErrorFcall(req.Tag, err) @@ -178,7 +143,7 @@ func (s *server) run() { case completed <- resp: case <-ctx.Done(): return - case <-s.closed: + case <-c.closed: return } }(ctx, req) @@ -200,12 +165,86 @@ func (s *server) run() { log.Println("canceled", resp, active.ctx.Err()) } delete(tags, resp.Tag) - case <-s.ctx.Done(): - log.Println("server: context done") + case <-c.ctx.Done(): + return c.ctx.Err() + case <-c.closed: + return c.err + } + } +} + +// read takes requests off the channel and sends them on requests. +func (c *conn) read(requests chan *Fcall) { + for { + req := new(Fcall) + if err := c.ch.ReadFcall(c.ctx, req); err != nil { + if err, ok := err.(net.Error); ok { + if err.Timeout() || err.Temporary() { + // TODO(stevvooe): A full idle timeout on the connection + // should be enforced here. No logging because it is quite + // chatty. + continue + } + } + + c.CloseWithError(fmt.Errorf("error reading fcall: %v", err)) return - case <-s.closed: - log.Println("server: shutdown") + } + + select { + case requests <- req: + case <-c.ctx.Done(): + c.CloseWithError(c.ctx.Err()) return + case <-c.closed: + return } } } + +func (c *conn) write(responses chan *Fcall) { + for { + select { + case resp := <-responses: + if err := c.ch.WriteFcall(c.ctx, resp); err != nil { + if err, ok := err.(net.Error); ok { + if err.Timeout() || err.Temporary() { + // TODO(stevvooe): A full idle timeout on the + // connection should be enforced here. We log here, + // since this is less common. + log.Println("9p server: temporary error writing fcall: %v", err) + continue + } + } + + c.CloseWithError(fmt.Errorf("error writing fcall: %v", err)) + return + } + case <-c.ctx.Done(): + c.CloseWithError(c.ctx.Err()) + return + case <-c.closed: + return + } + } +} + +func (c *conn) Close() error { + return c.CloseWithError(nil) +} + +func (c *conn) CloseWithError(err error) error { + select { + case <-c.closed: + return c.err + default: + close(c.closed) + if err == nil { + c.err = err + } else { + c.err = ErrClosed + } + + return c.err + } +} blob - ff2fd20cbe5bddfe06b2157dc145d0fc537287d0 blob + f6caee94546fa1288f44c7b465ed4da607141329 --- session.go +++ session.go @@ -1,11 +1,7 @@ package p9p -import ( - "net" +import "golang.org/x/net/context" - "golang.org/x/net/context" -) - // Session provides the central abstraction for a 9p connection. Clients // implement sessions and servers serve sessions. Sessions can be proxied by // serving up a client session. @@ -35,15 +31,3 @@ type Session interface { // session implementation. Version() (msize int, version string) } - -func Dial(ctx context.Context, addr string) (Session, error) { - c, err := net.Dial("tcp", addr) - if err != nil { - return nil, err - } - - // BUG(stevvooe): Session doesn't actually close connection. Dial might - // not be the right interface. - - return NewSession(ctx, c) -}