Commit Diff


commit - de4c554d8644b65ef777de9bc6b39639dbd7a51e
commit + 1f1a1b98065abdda04bc9128e4d8d8202a75bdd1
blob - 806f4d297f3ace65b8671cfc7eee19c46b83db41
blob + 0b067698e89dba3865f811c0b99ce4ed84462820
--- channel.go
+++ channel.go
@@ -1,4 +1,4 @@
-package p9pnew
+package p9p
 
 import (
 	"bufio"
@@ -148,7 +148,7 @@ func (ch *channel) ReadFcall(ctx context.Context, fcal
 	if err := ch.codec.Unmarshal(ch.rdbuf[:n], fcall); err != nil {
 		return err
 	}
-	log.Println("channel: recv", fcall)
+
 	return nil
 }
 
@@ -160,7 +160,6 @@ func (ch *channel) WriteFcall(ctx context.Context, fca
 		return ErrClosed
 	default:
 	}
-	log.Println("channel: send", fcall)
 
 	deadline, ok := ctx.Deadline()
 	if !ok {
blob - 95fb256b47676759a4ae4face3f337bfea447540
blob + dce55d38b491766bba623e40556dbc31a5a04248
--- client.go
+++ client.go
@@ -1,9 +1,9 @@
-package p9pnew
+package p9p
 
 import (
-	"golang.org/x/net/context"
-
 	"net"
+
+	"golang.org/x/net/context"
 )
 
 type client struct {
blob - 665ea9208134d6d3b2fa184b6b9a6bc6a78681f0
blob + 8f2e0deaed185907e1805c611aba9efe3c93f476
--- cmd/9pr/main.go
+++ cmd/9pr/main.go
@@ -16,7 +16,7 @@ import (
 	"time"
 
 	"github.com/chzyer/readline"
-	"github.com/docker/pinata/v1/fs/p9p/new"
+	"github.com/docker/pinata/v1/pkg/p9p"
 	"golang.org/x/net/context"
 )
 
@@ -47,7 +47,7 @@ func main() {
 		log.Fatal(err)
 	}
 
-	csession, err := p9pnew.NewSession(ctx, conn)
+	csession, err := p9p.NewSession(ctx, conn)
 	if err != nil {
 		log.Fatalln(err)
 	}
@@ -86,7 +86,7 @@ func main() {
 
 	// attach root
 	commander.nextfid = 1
-	if _, err := commander.session.Attach(commander.ctx, commander.nextfid, p9pnew.NOFID, "anyone", "/"); err != nil {
+	if _, err := commander.session.Attach(commander.ctx, commander.nextfid, p9p.NOFID, "anyone", "/"); err != nil {
 		log.Fatalln(err)
 	}
 	commander.rootfid = commander.nextfid
@@ -135,7 +135,7 @@ func main() {
 
 		ctx, _ = context.WithTimeout(commander.ctx, 5*time.Second)
 		if err := cmd(ctx, args[1:]...); err != nil {
-			if err == p9pnew.ErrClosed {
+			if err == p9p.ErrClosed {
 				log.Println("connection closed, shutting down")
 				return
 			}
@@ -147,12 +147,12 @@ func main() {
 
 type fsCommander struct {
 	ctx     context.Context
-	session p9pnew.Session
+	session p9p.Session
 	pwd     string
-	pwdfid  p9pnew.Fid
-	rootfid p9pnew.Fid
+	pwdfid  p9p.Fid
+	rootfid p9p.Fid
 
-	nextfid p9pnew.Fid
+	nextfid p9p.Fid
 
 	readline *readline.Instance
 	stdout   io.Writer
@@ -185,7 +185,7 @@ func (c *fsCommander) cmdls(ctx context.Context, args 
 		}
 		defer c.session.Clunk(ctx, targetfid)
 
-		_, iounit, err := c.session.Open(ctx, targetfid, p9pnew.OREAD)
+		_, iounit, err := c.session.Open(ctx, targetfid, p9p.OREAD)
 		if err != nil {
 			return err
 		}
@@ -203,10 +203,10 @@ func (c *fsCommander) cmdls(ctx context.Context, args 
 		}
 
 		rd := bytes.NewReader(p[:n])
-		codec := p9pnew.NewCodec() // TODO(stevvooe): Need way to resolve codec based on session.
+		codec := p9p.NewCodec() // TODO(stevvooe): Need way to resolve codec based on session.
 		for {
-			var d p9pnew.Dir
-			if err := p9pnew.DecodeDir(codec, rd, &d); err != nil {
+			var d p9p.Dir
+			if err := p9p.DecodeDir(codec, rd, &d); err != nil {
 				if err == io.EOF {
 					break
 				}
@@ -316,7 +316,7 @@ func (c *fsCommander) cmdcat(ctx context.Context, args
 	}
 	defer c.session.Clunk(ctx, c.pwdfid)
 
-	_, iounit, err := c.session.Open(ctx, targetfid, p9pnew.OREAD)
+	_, iounit, err := c.session.Open(ctx, targetfid, p9p.OREAD)
 	if err != nil {
 		return err
 	}
blob - dcb1c5a59454fab0cdb3081c4c44e33a59cad080
blob + 12bcfc1e68545d3ba2e4b41805d564e7cdd5a5f3
--- cmd/9ps/main.go
+++ cmd/9ps/main.go
@@ -6,7 +6,7 @@ import (
 	"net"
 	"strings"
 
-	"github.com/docker/pinata/v1/fs/p9p/new"
+	"github.com/docker/pinata/v1/pkg/p9p"
 	"golang.org/x/net/context"
 )
 
@@ -55,14 +55,16 @@ func main() {
 				return
 			}
 
-			p9pnew.Serve(ctx, conn, p9pnew.Dispatch(session))
+			if err := p9p.ServeConn(ctx, conn, p9p.Dispatch(session)); err != nil {
+				log.Printf("serving conn: %v", err)
+			}
 		}(c)
 	}
 }
 
 // newLocalSession returns a session to serve the local filesystem, restricted
 // to the provided root.
-func newLocalSession(ctx context.Context, root string) (p9pnew.Session, error) {
+func newLocalSession(ctx context.Context, root string) (p9p.Session, error) {
 	// silly, just connect to ufs for now! replace this with real code later!
 	log.Println("dialing", ":5640", "for", ctx.Value("conn"))
 	conn, err := net.Dial("tcp", ":5640")
@@ -70,7 +72,7 @@ func newLocalSession(ctx context.Context, root string)
 		return nil, err
 	}
 
-	session, err := p9pnew.NewSession(ctx, conn)
+	session, err := p9p.NewSession(ctx, conn)
 	if err != nil {
 		return nil, err
 	}
blob - ca01956c2100164871d21be450d44c152bb88edb
blob + eeb8602d9fb3f99ab4c6e30db11507cead9be29b
--- context.go
+++ context.go
@@ -1,4 +1,4 @@
-package p9pnew
+package p9p
 
 import (
 	"golang.org/x/net/context"
blob - 312755fa68fec6abe0d05a6a2e4bb6f4e3118113
blob + 01396c7b3eb94755a94f2b216c9b8d763f557ee3
--- dispatcher.go
+++ dispatcher.go
@@ -1,4 +1,4 @@
-package p9pnew
+package p9p
 
 import "golang.org/x/net/context"
 
blob - d1a1644ae93485a44af0e16a36dd6a2de7467ed4
blob + 58438cf7c8fea79c5b83d935f47270e2c835ce0c
--- doc.go
+++ doc.go
@@ -1,5 +1,5 @@
 /*
-Package p9pnew implements a compliant 9P2000 client and server library for use
+Package p9p implements a compliant 9P2000 client and server library for use
 in modern, production Go services. This package differentiates itself in that
 is has departed from the plan 9 implementation primitives and better follows
 idiomatic Go style.
@@ -75,4 +75,4 @@ sawdust.
 In addition, the testing is embarassingly lacking. With time, we can get full
 testing going and ensure we have confidence in the implementation.
 */
-package p9pnew
+package p9p
blob - 98abc3492b799689a4048e7b16dc42365a661d27
blob + 41ddbf0319ea6be32598c3273d6880fe8cf3b4f1
--- encoding.go
+++ encoding.go
@@ -1,4 +1,4 @@
-package p9pnew
+package p9p
 
 import (
 	"bytes"
blob - 698be68e38a3cb680f70c490e783313e8f2398fe
blob + e55b866f5ed22f8ee8692f15ce3bf466609194c6
--- encoding_test.go
+++ encoding_test.go
@@ -1,4 +1,4 @@
-package p9pnew
+package p9p
 
 import (
 	"bytes"
blob - 6fefda0ea6d5e2adaad6f8bd9d3713aec20b09d1
blob + 32f3c9fea803b82dc760bfb18310ad2329778fa2
--- errors.go
+++ errors.go
@@ -1,4 +1,4 @@
-package p9pnew
+package p9p
 
 import (
 	"errors"
blob - c4b14e214cfc3b54492974e33ef924bb6b9fc5c1
blob + 56d43e0325a8ea98d24f9e076becdd5565f5e5e9
--- fcall.go
+++ fcall.go
@@ -1,4 +1,4 @@
-package p9pnew
+package p9p
 
 import "fmt"
 
@@ -84,7 +84,7 @@ func (fct FcallType) String() string {
 	case Rclunk:
 		return "Rclunk"
 	case Tremove:
-		return "Tremote"
+		return "Tremove"
 	case Rremove:
 		return "Rremove"
 	case Tstat:
blob - f1bae724b08d7812eafd714ad375fcaa413085ed
blob + d65f2687bf351dfe12c28afbe2768b858dcde720
--- logging.go
+++ logging.go
@@ -1,6 +1,6 @@
 // +build ignore
 
-package p9pnew
+package p9p
 
 import (
 	"log"
blob - 144af5861daa0b6ce00ae015d1b6c9d47611dfdf
blob + 3c2c4711b3c2fafb84ca6fb3db1b85f3ad34afaa
--- messages.go
+++ messages.go
@@ -1,4 +1,4 @@
-package p9pnew
+package p9p
 
 import "fmt"
 
blob - 58d498921fbdef1f84d000e53549cacbb4624061
blob + 24d78b3be3d6009581e39ed6283df08d6e6dd118
--- server.go
+++ server.go
@@ -1,6 +1,7 @@
-package p9pnew
+package p9p
 
 import (
+	"fmt"
 	"log"
 	"net"
 	"time"
@@ -8,16 +9,20 @@ import (
 	"golang.org/x/net/context"
 )
 
-// Serve the 9p session over the provided network connection.
-func Serve(ctx context.Context, conn net.Conn, handler Handler) {
+// TODO(stevvooe): Add net/http.Server-like type here to manage connections.
+// Coupled with Handler mux, we can get a very http-like experience for 9p
+// servers.
 
+// ServeConn serves the 9p handler over the provided network connection.
+func ServeConn(ctx context.Context, cn net.Conn, handler Handler) error {
+
 	// TODO(stevvooe): It would be nice if the handler could declare the
 	// supported version. Before we had handler, we used the session to get
 	// the version (msize, version := session.Version()). We must decided if
 	// we want to proxy version and message size decisions all the back to the
 	// origin server or make those decisions at each link of a proxy chain.
 
-	ch := newChannel(conn, codec9p{}, DefaultMSize)
+	ch := newChannel(cn, codec9p{}, DefaultMSize)
 	negctx, cancel := context.WithTimeout(ctx, 1*time.Second)
 	defer cancel()
 
@@ -27,29 +32,29 @@ func Serve(ctx context.Context, conn net.Conn, handler
 
 	if err := servernegotiate(negctx, ch, DefaultVersion); err != nil {
 		// TODO(stevvooe): Need better error handling and retry support here.
-		// For now, we silently ignore the failure.
-		log.Println("error negotiating version:", err)
-		return
+		return fmt.Errorf("error negotiating version: %v", err)
 	}
 
 	ctx = withVersion(ctx, DefaultVersion)
 
-	s := &server{
+	c := &conn{
 		ctx:     ctx,
 		ch:      ch,
 		handler: handler,
 		closed:  make(chan struct{}),
 	}
 
-	s.run()
+	return c.serve()
 }
 
-type server struct {
+// conn plays the role of session dispatch for a handler in a server.
+type conn struct {
 	ctx     context.Context
 	session Session
 	ch      Channel
 	handler Handler
 	closed  chan struct{}
+	err     error // terminal error for the conn
 }
 
 // activeRequest includes information about the active request.
@@ -59,7 +64,8 @@ type activeRequest struct {
 	cancel  context.CancelFunc
 }
 
-func (s *server) run() {
+// serve messages on the connection until an error is encountered.
+func (c *conn) serve() error {
 	tags := map[Tag]*activeRequest{} // active requests
 
 	requests := make(chan *Fcall) // sync, read-limited
@@ -67,64 +73,21 @@ func (s *server) run() {
 	completed := make(chan *Fcall, 1)
 
 	// read loop
-	go func() {
-		for {
-			req := new(Fcall)
-			if err := s.ch.ReadFcall(s.ctx, req); err != nil {
-				if err, ok := err.(net.Error); ok {
-					if err.Timeout() || err.Temporary() {
-						continue
-					}
-				}
+	go c.read(requests)
+	go c.write(responses)
 
-				log.Println("server: error reading fcall", err)
-				return
-			}
-
-			select {
-			case requests <- req:
-			case <-s.ctx.Done():
-				log.Println("server: context done")
-				return
-			case <-s.closed:
-				log.Println("server: shutdown")
-				return
-			}
-		}
-	}()
-
-	// write loop
-	go func() {
-		for {
-			select {
-			case resp := <-responses:
-				if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
-					log.Println("server: error writing fcall:", err)
-				}
-			case <-s.ctx.Done():
-				log.Println("server: context done")
-				return
-			case <-s.closed:
-				log.Println("server: shutdown")
-				return
-			}
-		}
-	}()
-
 	log.Println("server.run()")
 	for {
-		log.Println("server:", "wait")
 		select {
 		case req := <-requests:
-			log.Println("request", req)
 			if _, ok := tags[req.Tag]; ok {
 				select {
 				case responses <- newErrorFcall(req.Tag, ErrDuptag):
 					// Send to responses, bypass tag management.
-				case <-s.ctx.Done():
-					return
-				case <-s.closed:
-					return
+				case <-c.ctx.Done():
+					return c.ctx.Err()
+				case <-c.closed:
+					return c.err
 				}
 				continue
 			}
@@ -146,15 +109,15 @@ func (s *server) run() {
 				select {
 				case responses <- resp:
 					// bypass tag management in completed.
-				case <-s.ctx.Done():
-					return
-				case <-s.closed:
-					return
+				case <-c.ctx.Done():
+					return c.ctx.Err()
+				case <-c.closed:
+					return c.err
 				}
 			default:
 				// Allows us to session handlers to cancel processing of the fcall
 				// through context.
-				ctx, cancel := context.WithCancel(s.ctx)
+				ctx, cancel := context.WithCancel(c.ctx)
 
 				// The contents of these instances are only writable in the main
 				// server loop. The value of tag will not change.
@@ -166,7 +129,7 @@ func (s *server) run() {
 
 				go func(ctx context.Context, req *Fcall) {
 					var resp *Fcall
-					msg, err := s.handler.Handle(ctx, req.Message)
+					msg, err := c.handler.Handle(ctx, req.Message)
 					if err != nil {
 						// all handler errors are forwarded as protocol errors.
 						resp = newErrorFcall(req.Tag, err)
@@ -178,13 +141,12 @@ func (s *server) run() {
 					case completed <- resp:
 					case <-ctx.Done():
 						return
-					case <-s.closed:
+					case <-c.closed:
 						return
 					}
 				}(ctx, req)
 			}
 		case resp := <-completed:
-			log.Println("completed", resp)
 			// only responses that flip the tag state traverse this section.
 			active, ok := tags[resp.Tag]
 			if !ok {
@@ -200,12 +162,86 @@ func (s *server) run() {
 				log.Println("canceled", resp, active.ctx.Err())
 			}
 			delete(tags, resp.Tag)
-		case <-s.ctx.Done():
-			log.Println("server: context done")
+		case <-c.ctx.Done():
+			return c.ctx.Err()
+		case <-c.closed:
+			return c.err
+		}
+	}
+}
+
+// read forwards fcalls from the channel to requests until a fatal error or shutdown.
+func (c *conn) read(requests chan *Fcall) {
+	for {
+		req := new(Fcall)
+		if err := c.ch.ReadFcall(c.ctx, req); err != nil {
+			if err, ok := err.(net.Error); ok {
+				if err.Timeout() || err.Temporary() {
+					// TODO(stevvooe): A full idle timeout on the connection
+					// should be enforced here. No logging because it is quite
+					// chatty.
+					continue
+				}
+			}
+
+			c.CloseWithError(fmt.Errorf("error reading fcall: %v", err)) // not temporary: close the conn
 			return
-		case <-s.closed:
-			log.Println("server: shutdown")
+		}
+
+		select {
+		case requests <- req: // blocks until a receiver takes the request
+		case <-c.ctx.Done():
+			c.CloseWithError(c.ctx.Err())
 			return
+		case <-c.closed:
+			return // conn was closed elsewhere; nothing further to record
 		}
 	}
 }
+
+// write sends fcalls from responses over the channel until error or shutdown.
+func (c *conn) write(responses chan *Fcall) {
+	for {
+		select {
+		case resp := <-responses:
+			if err := c.ch.WriteFcall(c.ctx, resp); err != nil {
+				if err, ok := err.(net.Error); ok {
+					if err.Timeout() || err.Temporary() {
+						// TODO(stevvooe): Enforce a full idle timeout on the conn
+						// here. Logged since it is uncommon; resp is not retried.
+						log.Printf("9p server: temporary error writing fcall: %v", err)
+						continue
+					}
+				}
+
+				c.CloseWithError(fmt.Errorf("error writing fcall: %v", err))
+				return
+			}
+		case <-c.ctx.Done():
+			c.CloseWithError(c.ctx.Err())
+			return
+		case <-c.closed:
+			return
+		}
+	}
+}
+
+func (c *conn) Close() error {
+	return c.CloseWithError(nil) // Close is CloseWithError with no caller-supplied error.
+}
+
+// CloseWithError closes the conn once with err (ErrClosed if nil); later calls return c.err.
+func (c *conn) CloseWithError(err error) error {
+	select {
+	case <-c.closed:
+		return c.err
+	default:
+		if err == nil {
+			err = ErrClosed
+		}
+		// Set c.err before close so waiters on c.closed observe it.
+		c.err = err
+		close(c.closed)
+		return c.err
+	}
+}
blob - 8ebab5c10ad4d427357e5b72d158ec5cd4f012bf
blob + f6caee94546fa1288f44c7b465ed4da607141329
--- session.go
+++ session.go
@@ -1,4 +1,4 @@
-package p9pnew
+package p9p
 
 import "golang.org/x/net/context"
 
blob - 8d93c7c601d73f26b6390e4039775b6e873ad029
blob + 4bbb78b65e3074494cf8d0fb6e53d1839f628936
--- transport.go
+++ transport.go
@@ -1,4 +1,4 @@
-package p9pnew
+package p9p
 
 import (
 	"fmt"
blob - d909511b3e7ecd51d7f33988af702ac7245590ef
blob + 5fe03c6db69fbdaf081004818370c0302cf4145b
--- types.go
+++ types.go
@@ -1,4 +1,4 @@
-package p9pnew
+package p9p
 
 import (
 	"fmt"
blob - 4c6712bcc7939d9947815e2a15bec59f9f868604
blob + ac20fb9dfd45e42713513aee43a5ad975d41c0cc
--- version.go
+++ version.go
@@ -1,4 +1,4 @@
-package p9pnew
+package p9p
 
 import (
 	"fmt"