commit 1f1a1b98065abdda04bc9128e4d8d8202a75bdd1 from: Stephen J Day date: Mon Nov 30 22:09:09 2015 UTC Merge branch 'pinata-pkg-filtered' into pinata-filtered Signed-off-by: Stephen J Day commit - de4c554d8644b65ef777de9bc6b39639dbd7a51e commit + 1f1a1b98065abdda04bc9128e4d8d8202a75bdd1 blob - 806f4d297f3ace65b8671cfc7eee19c46b83db41 blob + 0b067698e89dba3865f811c0b99ce4ed84462820 --- channel.go +++ channel.go @@ -1,4 +1,4 @@ -package p9pnew +package p9p import ( "bufio" @@ -148,7 +148,7 @@ func (ch *channel) ReadFcall(ctx context.Context, fcal if err := ch.codec.Unmarshal(ch.rdbuf[:n], fcall); err != nil { return err } - log.Println("channel: recv", fcall) + return nil } @@ -160,7 +160,6 @@ func (ch *channel) WriteFcall(ctx context.Context, fca return ErrClosed default: } - log.Println("channel: send", fcall) deadline, ok := ctx.Deadline() if !ok { blob - 95fb256b47676759a4ae4face3f337bfea447540 blob + dce55d38b491766bba623e40556dbc31a5a04248 --- client.go +++ client.go @@ -1,9 +1,9 @@ -package p9pnew +package p9p import ( - "golang.org/x/net/context" - "net" + + "golang.org/x/net/context" ) type client struct { blob - 665ea9208134d6d3b2fa184b6b9a6bc6a78681f0 blob + 8f2e0deaed185907e1805c611aba9efe3c93f476 --- cmd/9pr/main.go +++ cmd/9pr/main.go @@ -16,7 +16,7 @@ import ( "time" "github.com/chzyer/readline" - "github.com/docker/pinata/v1/fs/p9p/new" + "github.com/docker/pinata/v1/pkg/p9p" "golang.org/x/net/context" ) @@ -47,7 +47,7 @@ func main() { log.Fatal(err) } - csession, err := p9pnew.NewSession(ctx, conn) + csession, err := p9p.NewSession(ctx, conn) if err != nil { log.Fatalln(err) } @@ -86,7 +86,7 @@ func main() { // attach root commander.nextfid = 1 - if _, err := commander.session.Attach(commander.ctx, commander.nextfid, p9pnew.NOFID, "anyone", "/"); err != nil { + if _, err := commander.session.Attach(commander.ctx, commander.nextfid, p9p.NOFID, "anyone", "/"); err != nil { log.Fatalln(err) } commander.rootfid = commander.nextfid @@ -135,7 +135,7 @@ 
func main() { ctx, _ = context.WithTimeout(commander.ctx, 5*time.Second) if err := cmd(ctx, args[1:]...); err != nil { - if err == p9pnew.ErrClosed { + if err == p9p.ErrClosed { log.Println("connection closed, shutting down") return } @@ -147,12 +147,12 @@ func main() { type fsCommander struct { ctx context.Context - session p9pnew.Session + session p9p.Session pwd string - pwdfid p9pnew.Fid - rootfid p9pnew.Fid + pwdfid p9p.Fid + rootfid p9p.Fid - nextfid p9pnew.Fid + nextfid p9p.Fid readline *readline.Instance stdout io.Writer @@ -185,7 +185,7 @@ func (c *fsCommander) cmdls(ctx context.Context, args } defer c.session.Clunk(ctx, targetfid) - _, iounit, err := c.session.Open(ctx, targetfid, p9pnew.OREAD) + _, iounit, err := c.session.Open(ctx, targetfid, p9p.OREAD) if err != nil { return err } @@ -203,10 +203,10 @@ func (c *fsCommander) cmdls(ctx context.Context, args } rd := bytes.NewReader(p[:n]) - codec := p9pnew.NewCodec() // TODO(stevvooe): Need way to resolve codec based on session. + codec := p9p.NewCodec() // TODO(stevvooe): Need way to resolve codec based on session. 
for { - var d p9pnew.Dir - if err := p9pnew.DecodeDir(codec, rd, &d); err != nil { + var d p9p.Dir + if err := p9p.DecodeDir(codec, rd, &d); err != nil { if err == io.EOF { break } @@ -316,7 +316,7 @@ func (c *fsCommander) cmdcat(ctx context.Context, args } defer c.session.Clunk(ctx, c.pwdfid) - _, iounit, err := c.session.Open(ctx, targetfid, p9pnew.OREAD) + _, iounit, err := c.session.Open(ctx, targetfid, p9p.OREAD) if err != nil { return err } blob - dcb1c5a59454fab0cdb3081c4c44e33a59cad080 blob + 12bcfc1e68545d3ba2e4b41805d564e7cdd5a5f3 --- cmd/9ps/main.go +++ cmd/9ps/main.go @@ -6,7 +6,7 @@ import ( "net" "strings" - "github.com/docker/pinata/v1/fs/p9p/new" + "github.com/docker/pinata/v1/pkg/p9p" "golang.org/x/net/context" ) @@ -55,14 +55,16 @@ func main() { return } - p9pnew.Serve(ctx, conn, p9pnew.Dispatch(session)) + if err := p9p.ServeConn(ctx, conn, p9p.Dispatch(session)); err != nil { + log.Printf("serving conn: %v", err) + } }(c) } } // newLocalSession returns a session to serve the local filesystem, restricted // to the provided root. -func newLocalSession(ctx context.Context, root string) (p9pnew.Session, error) { +func newLocalSession(ctx context.Context, root string) (p9p.Session, error) { // silly, just connect to ufs for now! replace this with real code later! 
log.Println("dialing", ":5640", "for", ctx.Value("conn")) conn, err := net.Dial("tcp", ":5640") @@ -70,7 +72,7 @@ func newLocalSession(ctx context.Context, root string) return nil, err } - session, err := p9pnew.NewSession(ctx, conn) + session, err := p9p.NewSession(ctx, conn) if err != nil { return nil, err } blob - ca01956c2100164871d21be450d44c152bb88edb blob + eeb8602d9fb3f99ab4c6e30db11507cead9be29b --- context.go +++ context.go @@ -1,4 +1,4 @@ -package p9pnew +package p9p import ( "golang.org/x/net/context" blob - 312755fa68fec6abe0d05a6a2e4bb6f4e3118113 blob + 01396c7b3eb94755a94f2b216c9b8d763f557ee3 --- dispatcher.go +++ dispatcher.go @@ -1,4 +1,4 @@ -package p9pnew +package p9p import "golang.org/x/net/context" blob - d1a1644ae93485a44af0e16a36dd6a2de7467ed4 blob + 58438cf7c8fea79c5b83d935f47270e2c835ce0c --- doc.go +++ doc.go @@ -1,5 +1,5 @@ /* -Package p9pnew implements a compliant 9P2000 client and server library for use +Package p9p implements a compliant 9P2000 client and server library for use in modern, production Go services. This package differentiates itself in that it has departed from the plan 9 implementation primitives and better follows idiomatic Go style. @@ -75,4 +75,4 @@ sawdust. In addition, the testing is embarrassingly lacking. With time, we can get full testing going and ensure we have confidence in the implementation. 
*/ -package p9pnew +package p9p blob - 98abc3492b799689a4048e7b16dc42365a661d27 blob + 41ddbf0319ea6be32598c3273d6880fe8cf3b4f1 --- encoding.go +++ encoding.go @@ -1,4 +1,4 @@ -package p9pnew +package p9p import ( "bytes" blob - 698be68e38a3cb680f70c490e783313e8f2398fe blob + e55b866f5ed22f8ee8692f15ce3bf466609194c6 --- encoding_test.go +++ encoding_test.go @@ -1,4 +1,4 @@ -package p9pnew +package p9p import ( "bytes" blob - 6fefda0ea6d5e2adaad6f8bd9d3713aec20b09d1 blob + 32f3c9fea803b82dc760bfb18310ad2329778fa2 --- errors.go +++ errors.go @@ -1,4 +1,4 @@ -package p9pnew +package p9p import ( "errors" blob - c4b14e214cfc3b54492974e33ef924bb6b9fc5c1 blob + 56d43e0325a8ea98d24f9e076becdd5565f5e5e9 --- fcall.go +++ fcall.go @@ -1,4 +1,4 @@ -package p9pnew +package p9p import "fmt" @@ -84,7 +84,7 @@ func (fct FcallType) String() string { case Rclunk: return "Rclunk" case Tremove: - return "Tremote" + return "Tremove" case Rremove: return "Rremove" case Tstat: blob - f1bae724b08d7812eafd714ad375fcaa413085ed blob + d65f2687bf351dfe12c28afbe2768b858dcde720 --- logging.go +++ logging.go @@ -1,6 +1,6 @@ // +build ignore -package p9pnew +package p9p import ( "log" blob - 144af5861daa0b6ce00ae015d1b6c9d47611dfdf blob + 3c2c4711b3c2fafb84ca6fb3db1b85f3ad34afaa --- messages.go +++ messages.go @@ -1,4 +1,4 @@ -package p9pnew +package p9p import "fmt" blob - 58d498921fbdef1f84d000e53549cacbb4624061 blob + 24d78b3be3d6009581e39ed6283df08d6e6dd118 --- server.go +++ server.go @@ -1,6 +1,7 @@ -package p9pnew +package p9p import ( + "fmt" "log" "net" "time" @@ -8,16 +9,20 @@ import ( "golang.org/x/net/context" ) -// Serve the 9p session over the provided network connection. -func Serve(ctx context.Context, conn net.Conn, handler Handler) { +// TODO(stevvooe): Add net/http.Server-like type here to manage connections. +// Coupled with Handler mux, we can get a very http-like experience for 9p +// servers. +// ServeConn the 9p handler over the provided network connection. 
+func ServeConn(ctx context.Context, cn net.Conn, handler Handler) error { + // TODO(stevvooe): It would be nice if the handler could declare the // supported version. Before we had handler, we used the session to get // the version (msize, version := session.Version()). We must decide if // we want to proxy version and message size decisions all the way back to the // origin server or make those decisions at each link of a proxy chain. - ch := newChannel(conn, codec9p{}, DefaultMSize) + ch := newChannel(cn, codec9p{}, DefaultMSize) negctx, cancel := context.WithTimeout(ctx, 1*time.Second) defer cancel() @@ -27,29 +32,29 @@ func Serve(ctx context.Context, conn net.Conn, handler if err := servernegotiate(negctx, ch, DefaultVersion); err != nil { // TODO(stevvooe): Need better error handling and retry support here. - // For now, we silently ignore the failure. - log.Println("error negotiating version:", err) - return + return fmt.Errorf("error negotiating version:", err) } ctx = withVersion(ctx, DefaultVersion) - s := &server{ + c := &conn{ ctx: ctx, ch: ch, handler: handler, closed: make(chan struct{}), } - s.run() + return c.serve() } -type server struct { +// conn plays role of session dispatch for handler in a server. +type conn struct { ctx context.Context session Session ch Channel handler Handler closed chan struct{} + err error // terminal error for the conn } // activeRequest includes information about the active request. @@ -59,7 +64,8 @@ type activeRequest struct { cancel context.CancelFunc } -func (s *server) run() { +// serve messages on the connection until an error is encountered. 
+func (c *conn) serve() error { tags := map[Tag]*activeRequest{} // active requests requests := make(chan *Fcall) // sync, read-limited @@ -67,64 +73,21 @@ func (s *server) run() { completed := make(chan *Fcall, 1) // read loop - go func() { - for { - req := new(Fcall) - if err := s.ch.ReadFcall(s.ctx, req); err != nil { - if err, ok := err.(net.Error); ok { - if err.Timeout() || err.Temporary() { - continue - } - } + go c.read(requests) + go c.write(responses) - log.Println("server: error reading fcall", err) - return - } - - select { - case requests <- req: - case <-s.ctx.Done(): - log.Println("server: context done") - return - case <-s.closed: - log.Println("server: shutdown") - return - } - } - }() - - // write loop - go func() { - for { - select { - case resp := <-responses: - if err := s.ch.WriteFcall(s.ctx, resp); err != nil { - log.Println("server: error writing fcall:", err) - } - case <-s.ctx.Done(): - log.Println("server: context done") - return - case <-s.closed: - log.Println("server: shutdown") - return - } - } - }() - log.Println("server.run()") for { - log.Println("server:", "wait") select { case req := <-requests: - log.Println("request", req) if _, ok := tags[req.Tag]; ok { select { case responses <- newErrorFcall(req.Tag, ErrDuptag): // Send to responses, bypass tag management. - case <-s.ctx.Done(): - return - case <-s.closed: - return + case <-c.ctx.Done(): + return c.ctx.Err() + case <-c.closed: + return c.err } continue } @@ -146,15 +109,15 @@ func (s *server) run() { select { case responses <- resp: // bypass tag management in completed. - case <-s.ctx.Done(): - return - case <-s.closed: - return + case <-c.ctx.Done(): + return c.ctx.Err() + case <-c.closed: + return c.err } default: // Allows us to session handlers to cancel processing of the fcall // through context. - ctx, cancel := context.WithCancel(s.ctx) + ctx, cancel := context.WithCancel(c.ctx) // The contents of these instances are only writable in the main // server loop. 
The value of tag will not change. @@ -166,7 +129,7 @@ func (s *server) run() { go func(ctx context.Context, req *Fcall) { var resp *Fcall - msg, err := s.handler.Handle(ctx, req.Message) + msg, err := c.handler.Handle(ctx, req.Message) if err != nil { // all handler errors are forwarded as protocol errors. resp = newErrorFcall(req.Tag, err) @@ -178,13 +141,12 @@ func (s *server) run() { case completed <- resp: case <-ctx.Done(): return - case <-s.closed: + case <-c.closed: return } }(ctx, req) } case resp := <-completed: - log.Println("completed", resp) // only responses that flip the tag state traverse this section. active, ok := tags[resp.Tag] if !ok { @@ -200,12 +162,86 @@ func (s *server) run() { log.Println("canceled", resp, active.ctx.Err()) } delete(tags, resp.Tag) - case <-s.ctx.Done(): - log.Println("server: context done") + case <-c.ctx.Done(): + return c.ctx.Err() + case <-c.closed: + return c.err + } + } +} + +// read takes requests off the channel and sends them on requests. +func (c *conn) read(requests chan *Fcall) { + for { + req := new(Fcall) + if err := c.ch.ReadFcall(c.ctx, req); err != nil { + if err, ok := err.(net.Error); ok { + if err.Timeout() || err.Temporary() { + // TODO(stevvooe): A full idle timeout on the connection + // should be enforced here. No logging because it is quite + // chatty. + continue + } + } + + c.CloseWithError(fmt.Errorf("error reading fcall: %v", err)) return - case <-s.closed: - log.Println("server: shutdown") + } + + select { + case requests <- req: + case <-c.ctx.Done(): + c.CloseWithError(c.ctx.Err()) return + case <-c.closed: + return } } } + +func (c *conn) write(responses chan *Fcall) { + for { + select { + case resp := <-responses: + if err := c.ch.WriteFcall(c.ctx, resp); err != nil { + if err, ok := err.(net.Error); ok { + if err.Timeout() || err.Temporary() { + // TODO(stevvooe): A full idle timeout on the + // connection should be enforced here. We log here, + // since this is less common. 
+ log.Println("9p server: temporary error writing fcall: %v", err) + continue + } + } + + c.CloseWithError(fmt.Errorf("error writing fcall: %v", err)) + return + } + case <-c.ctx.Done(): + c.CloseWithError(c.ctx.Err()) + return + case <-c.closed: + return + } + } +} + +func (c *conn) Close() error { + return c.CloseWithError(nil) +} + +func (c *conn) CloseWithError(err error) error { + select { + case <-c.closed: + return c.err + default: + close(c.closed) + if err == nil { + c.err = err + } else { + c.err = ErrClosed + } + + return c.err + } +} blob - 8ebab5c10ad4d427357e5b72d158ec5cd4f012bf blob + f6caee94546fa1288f44c7b465ed4da607141329 --- session.go +++ session.go @@ -1,4 +1,4 @@ -package p9pnew +package p9p import "golang.org/x/net/context" blob - 8d93c7c601d73f26b6390e4039775b6e873ad029 blob + 4bbb78b65e3074494cf8d0fb6e53d1839f628936 --- transport.go +++ transport.go @@ -1,4 +1,4 @@ -package p9pnew +package p9p import ( "fmt" blob - d909511b3e7ecd51d7f33988af702ac7245590ef blob + 5fe03c6db69fbdaf081004818370c0302cf4145b --- types.go +++ types.go @@ -1,4 +1,4 @@ -package p9pnew +package p9p import ( "fmt" blob - 4c6712bcc7939d9947815e2a15bec59f9f868604 blob + ac20fb9dfd45e42713513aee43a5ad975d41c0cc --- version.go +++ version.go @@ -1,4 +1,4 @@ -package p9pnew +package p9p import ( "fmt"