Commit Diff


commit - 78cab7f39e3b5a0a8afd0a2f90564dfd049423a2
commit + 641595f816e782a5b39e5453e7afeed70c617ae0
blob - 432b3265bbeade0eef0f877bda4e9ef8d03d30b0
blob + 12bcfc1e68545d3ba2e4b41805d564e7cdd5a5f3
--- cmd/9ps/main.go
+++ cmd/9ps/main.go
@@ -55,7 +55,9 @@ func main() {
 				return
 			}
 
-			p9p.Serve(ctx, conn, p9p.Dispatch(session))
+			if err := p9p.ServeConn(ctx, conn, p9p.Dispatch(session)); err != nil {
+				log.Printf("serving conn: %v", err)
+			}
 		}(c)
 	}
 }
blob - a26cd69172fb7be642c1163f04d269b242745a9a
blob + db99d90caebaa4a7090817bb8d7114815121e554
--- server.go
+++ server.go
@@ -1,6 +1,7 @@
 package p9p
 
 import (
+	"fmt"
 	"log"
 	"net"
 	"time"
@@ -8,16 +9,20 @@ import (
 	"golang.org/x/net/context"
 )
 
-// Serve the 9p session over the provided network connection.
-func Serve(ctx context.Context, conn net.Conn, handler Handler) {
+// TODO(stevvooe): Add net/http.Server-like type here to manage connections.
+// Coupled with Handler mux, we can get a very http-like experience for 9p
+// servers.
 
+// ServeConn the 9p handler over the provided network connection.
+func ServeConn(ctx context.Context, cn net.Conn, handler Handler) error {
+
 	// TODO(stevvooe): It would be nice if the handler could declare the
 	// supported version. Before we had handler, we used the session to get
 	// the version (msize, version := session.Version()). We must decided if
 	// we want to proxy version and message size decisions all the back to the
 	// origin server or make those decisions at each link of a proxy chain.
 
-	ch := newChannel(conn, codec9p{}, DefaultMSize)
+	ch := newChannel(cn, codec9p{}, DefaultMSize)
 	negctx, cancel := context.WithTimeout(ctx, 1*time.Second)
 	defer cancel()
 
@@ -27,29 +32,29 @@ func Serve(ctx context.Context, conn net.Conn, handler
 
 	if err := servernegotiate(negctx, ch, DefaultVersion); err != nil {
 		// TODO(stevvooe): Need better error handling and retry support here.
-		// For now, we silently ignore the failure.
-		log.Println("error negotiating version:", err)
-		return
+		return fmt.Errorf("error negotiating version:", err)
 	}
 
 	ctx = withVersion(ctx, DefaultVersion)
 
-	s := &server{
+	c := &conn{
 		ctx:     ctx,
 		ch:      ch,
 		handler: handler,
 		closed:  make(chan struct{}),
 	}
 
-	s.run()
+	return c.serve()
 }
 
-type server struct {
+// conn plays role of session dispatch for handler in a server.
+type conn struct {
 	ctx     context.Context
 	session Session
 	ch      Channel
 	handler Handler
 	closed  chan struct{}
+	err     error // terminal error for the conn
 }
 
 // activeRequest includes information about the active request.
@@ -59,7 +64,8 @@ type activeRequest struct {
 	cancel  context.CancelFunc
 }
 
-func (s *server) run() {
+// serve messages on the connection until an error is encountered.
+func (c *conn) serve() error {
 	tags := map[Tag]*activeRequest{} // active requests
 
 	requests := make(chan *Fcall) // sync, read-limited
@@ -67,50 +73,9 @@ func (s *server) run() {
 	completed := make(chan *Fcall, 1)
 
 	// read loop
-	go func() {
-		for {
-			req := new(Fcall)
-			if err := s.ch.ReadFcall(s.ctx, req); err != nil {
-				if err, ok := err.(net.Error); ok {
-					if err.Timeout() || err.Temporary() {
-						continue
-					}
-				}
+	go c.read(requests)
+	go c.write(responses)
 
-				log.Println("server: error reading fcall", err)
-				return
-			}
-
-			select {
-			case requests <- req:
-			case <-s.ctx.Done():
-				log.Println("server: context done")
-				return
-			case <-s.closed:
-				log.Println("server: shutdown")
-				return
-			}
-		}
-	}()
-
-	// write loop
-	go func() {
-		for {
-			select {
-			case resp := <-responses:
-				if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
-					log.Println("server: error writing fcall:", err)
-				}
-			case <-s.ctx.Done():
-				log.Println("server: context done")
-				return
-			case <-s.closed:
-				log.Println("server: shutdown")
-				return
-			}
-		}
-	}()
-
 	log.Println("server.run()")
 	for {
 		log.Println("server:", "wait")
@@ -121,10 +86,10 @@ func (s *server) run() {
 				select {
 				case responses <- newErrorFcall(req.Tag, ErrDuptag):
 					// Send to responses, bypass tag management.
-				case <-s.ctx.Done():
-					return
-				case <-s.closed:
-					return
+				case <-c.ctx.Done():
+					return c.ctx.Err()
+				case <-c.closed:
+					return c.err
 				}
 				continue
 			}
@@ -146,15 +111,15 @@ func (s *server) run() {
 				select {
 				case responses <- resp:
 					// bypass tag management in completed.
-				case <-s.ctx.Done():
-					return
-				case <-s.closed:
-					return
+				case <-c.ctx.Done():
+					return c.ctx.Err()
+				case <-c.closed:
+					return c.err
 				}
 			default:
 				// Allows us to session handlers to cancel processing of the fcall
 				// through context.
-				ctx, cancel := context.WithCancel(s.ctx)
+				ctx, cancel := context.WithCancel(c.ctx)
 
 				// The contents of these instances are only writable in the main
 				// server loop. The value of tag will not change.
@@ -166,7 +131,7 @@ func (s *server) run() {
 
 				go func(ctx context.Context, req *Fcall) {
 					var resp *Fcall
-					msg, err := s.handler.Handle(ctx, req.Message)
+					msg, err := c.handler.Handle(ctx, req.Message)
 					if err != nil {
 						// all handler errors are forwarded as protocol errors.
 						resp = newErrorFcall(req.Tag, err)
@@ -178,7 +143,7 @@ func (s *server) run() {
 					case completed <- resp:
 					case <-ctx.Done():
 						return
-					case <-s.closed:
+					case <-c.closed:
 						return
 					}
 				}(ctx, req)
@@ -200,12 +165,86 @@ func (s *server) run() {
 				log.Println("canceled", resp, active.ctx.Err())
 			}
 			delete(tags, resp.Tag)
-		case <-s.ctx.Done():
-			log.Println("server: context done")
+		case <-c.ctx.Done():
+			return c.ctx.Err()
+		case <-c.closed:
+			return c.err
+		}
+	}
+}
+
+// read takes requests off the channel and sends them on requests.
+func (c *conn) read(requests chan *Fcall) {
+	for {
+		req := new(Fcall)
+		if err := c.ch.ReadFcall(c.ctx, req); err != nil {
+			if err, ok := err.(net.Error); ok {
+				if err.Timeout() || err.Temporary() {
+					// TODO(stevvooe): A full idle timeout on the connection
+					// should be enforced here. No logging because it is quite
+					// chatty.
+					continue
+				}
+			}
+
+			c.CloseWithError(fmt.Errorf("error reading fcall: %v", err))
 			return
-		case <-s.closed:
-			log.Println("server: shutdown")
+		}
+
+		select {
+		case requests <- req:
+		case <-c.ctx.Done():
+			c.CloseWithError(c.ctx.Err())
 			return
+		case <-c.closed:
+			return
 		}
 	}
 }
+
+func (c *conn) write(responses chan *Fcall) {
+	for {
+		select {
+		case resp := <-responses:
+			if err := c.ch.WriteFcall(c.ctx, resp); err != nil {
+				if err, ok := err.(net.Error); ok {
+					if err.Timeout() || err.Temporary() {
+						// TODO(stevvooe): A full idle timeout on the
+						// connection should be enforced here. We log here,
+						// since this is less common.
+						log.Println("9p server: temporary error writing fcall: %v", err)
+						continue
+					}
+				}
+
+				c.CloseWithError(fmt.Errorf("error writing fcall: %v", err))
+				return
+			}
+		case <-c.ctx.Done():
+			c.CloseWithError(c.ctx.Err())
+			return
+		case <-c.closed:
+			return
+		}
+	}
+}
+
+func (c *conn) Close() error {
+	return c.CloseWithError(nil)
+}
+
+func (c *conn) CloseWithError(err error) error {
+	select {
+	case <-c.closed:
+		return c.err
+	default:
+		close(c.closed)
+		if err == nil {
+			c.err = err
+		} else {
+			c.err = ErrClosed
+		}
+
+		return c.err
+	}
+}
blob - ff2fd20cbe5bddfe06b2157dc145d0fc537287d0
blob + f6caee94546fa1288f44c7b465ed4da607141329
--- session.go
+++ session.go
@@ -1,11 +1,7 @@
 package p9p
 
-import (
-	"net"
+import "golang.org/x/net/context"
 
-	"golang.org/x/net/context"
-)
-
 // Session provides the central abstraction for a 9p connection. Clients
 // implement sessions and servers serve sessions. Sessions can be proxied by
 // serving up a client session.
@@ -35,15 +31,3 @@ type Session interface {
 	// session implementation.
 	Version() (msize int, version string)
 }
-
-func Dial(ctx context.Context, addr string) (Session, error) {
-	c, err := net.Dial("tcp", addr)
-	if err != nil {
-		return nil, err
-	}
-
-	// BUG(stevvooe): Session doesn't actually close connection. Dial might
-	// not be the right interface.
-
-	return NewSession(ctx, c)
-}