commit - de4c554d8644b65ef777de9bc6b39639dbd7a51e
commit + 1f1a1b98065abdda04bc9128e4d8d8202a75bdd1
blob - 806f4d297f3ace65b8671cfc7eee19c46b83db41
blob + 0b067698e89dba3865f811c0b99ce4ed84462820
--- channel.go
+++ channel.go
-package p9pnew
+package p9p
import (
"bufio"
if err := ch.codec.Unmarshal(ch.rdbuf[:n], fcall); err != nil {
return err
}
- log.Println("channel: recv", fcall)
+
return nil
}
return ErrClosed
default:
}
- log.Println("channel: send", fcall)
deadline, ok := ctx.Deadline()
if !ok {
blob - 95fb256b47676759a4ae4face3f337bfea447540
blob + dce55d38b491766bba623e40556dbc31a5a04248
--- client.go
+++ client.go
-package p9pnew
+package p9p
import (
- "golang.org/x/net/context"
-
"net"
+
+ "golang.org/x/net/context"
)
type client struct {
blob - 665ea9208134d6d3b2fa184b6b9a6bc6a78681f0
blob + 8f2e0deaed185907e1805c611aba9efe3c93f476
--- cmd/9pr/main.go
+++ cmd/9pr/main.go
"time"
"github.com/chzyer/readline"
- "github.com/docker/pinata/v1/fs/p9p/new"
+ "github.com/docker/pinata/v1/pkg/p9p"
"golang.org/x/net/context"
)
log.Fatal(err)
}
- csession, err := p9pnew.NewSession(ctx, conn)
+ csession, err := p9p.NewSession(ctx, conn)
if err != nil {
log.Fatalln(err)
}
// attach root
commander.nextfid = 1
- if _, err := commander.session.Attach(commander.ctx, commander.nextfid, p9pnew.NOFID, "anyone", "/"); err != nil {
+ if _, err := commander.session.Attach(commander.ctx, commander.nextfid, p9p.NOFID, "anyone", "/"); err != nil {
log.Fatalln(err)
}
commander.rootfid = commander.nextfid
ctx, _ = context.WithTimeout(commander.ctx, 5*time.Second)
if err := cmd(ctx, args[1:]...); err != nil {
- if err == p9pnew.ErrClosed {
+ if err == p9p.ErrClosed {
log.Println("connection closed, shutting down")
return
}
+// fsCommander bundles the state used by the 9pr commands: a base context, the
+// 9P session that requests are issued on, and the fids the tool tracks.
+// rootfid is the fid attached to "/" at startup and nextfid is the value the
+// next fid is taken from (it starts at 1). pwd and pwdfid presumably track the
+// current working directory and its fid -- confirm against the cd command.
type fsCommander struct {
ctx context.Context
- session p9pnew.Session
+ session p9p.Session
pwd string
- pwdfid p9pnew.Fid
- rootfid p9pnew.Fid
+ pwdfid p9p.Fid
+ rootfid p9p.Fid
- nextfid p9pnew.Fid
+ nextfid p9p.Fid
readline *readline.Instance
stdout io.Writer
}
defer c.session.Clunk(ctx, targetfid)
- _, iounit, err := c.session.Open(ctx, targetfid, p9pnew.OREAD)
+ _, iounit, err := c.session.Open(ctx, targetfid, p9p.OREAD)
if err != nil {
return err
}
}
rd := bytes.NewReader(p[:n])
- codec := p9pnew.NewCodec() // TODO(stevvooe): Need way to resolve codec based on session.
+ codec := p9p.NewCodec() // TODO(stevvooe): Need way to resolve codec based on session.
for {
- var d p9pnew.Dir
- if err := p9pnew.DecodeDir(codec, rd, &d); err != nil {
+ var d p9p.Dir
+ if err := p9p.DecodeDir(codec, rd, &d); err != nil {
if err == io.EOF {
break
}
}
defer c.session.Clunk(ctx, c.pwdfid)
- _, iounit, err := c.session.Open(ctx, targetfid, p9pnew.OREAD)
+ _, iounit, err := c.session.Open(ctx, targetfid, p9p.OREAD)
if err != nil {
return err
}
blob - dcb1c5a59454fab0cdb3081c4c44e33a59cad080
blob + 12bcfc1e68545d3ba2e4b41805d564e7cdd5a5f3
--- cmd/9ps/main.go
+++ cmd/9ps/main.go
"net"
"strings"
- "github.com/docker/pinata/v1/fs/p9p/new"
+ "github.com/docker/pinata/v1/pkg/p9p"
"golang.org/x/net/context"
)
return
}
- p9pnew.Serve(ctx, conn, p9pnew.Dispatch(session))
+ if err := p9p.ServeConn(ctx, conn, p9p.Dispatch(session)); err != nil {
+ log.Printf("serving conn: %v", err)
+ }
}(c)
}
}
// newLocalSession returns a session to serve the local filesystem, restricted
// to the provided root.
-func newLocalSession(ctx context.Context, root string) (p9pnew.Session, error) {
+func newLocalSession(ctx context.Context, root string) (p9p.Session, error) {
// silly, just connect to ufs for now! replace this with real code later!
log.Println("dialing", ":5640", "for", ctx.Value("conn"))
conn, err := net.Dial("tcp", ":5640")
return nil, err
}
- session, err := p9pnew.NewSession(ctx, conn)
+ session, err := p9p.NewSession(ctx, conn)
if err != nil {
return nil, err
}
blob - ca01956c2100164871d21be450d44c152bb88edb
blob + eeb8602d9fb3f99ab4c6e30db11507cead9be29b
--- context.go
+++ context.go
-package p9pnew
+package p9p
import (
"golang.org/x/net/context"
blob - 312755fa68fec6abe0d05a6a2e4bb6f4e3118113
blob + 01396c7b3eb94755a94f2b216c9b8d763f557ee3
--- dispatcher.go
+++ dispatcher.go
-package p9pnew
+package p9p
import "golang.org/x/net/context"
blob - d1a1644ae93485a44af0e16a36dd6a2de7467ed4
blob + 58438cf7c8fea79c5b83d935f47270e2c835ce0c
--- doc.go
+++ doc.go
/*
-Package p9pnew implements a compliant 9P2000 client and server library for use
+Package p9p implements a compliant 9P2000 client and server library for use
in modern, production Go services. This package differentiates itself in that
-is has departed from the plan 9 implementation primitives and better follows
+it has departed from the plan 9 implementation primitives and better follows
idiomatic Go style.
-In addition, the testing is embarassingly lacking. With time, we can get full
+In addition, the testing is embarrassingly lacking. With time, we can get full
testing going and ensure we have confidence in the implementation.
*/
-package p9pnew
+package p9p
blob - 98abc3492b799689a4048e7b16dc42365a661d27
blob + 41ddbf0319ea6be32598c3273d6880fe8cf3b4f1
--- encoding.go
+++ encoding.go
-package p9pnew
+package p9p
import (
"bytes"
blob - 698be68e38a3cb680f70c490e783313e8f2398fe
blob + e55b866f5ed22f8ee8692f15ce3bf466609194c6
--- encoding_test.go
+++ encoding_test.go
-package p9pnew
+package p9p
import (
"bytes"
blob - 6fefda0ea6d5e2adaad6f8bd9d3713aec20b09d1
blob + 32f3c9fea803b82dc760bfb18310ad2329778fa2
--- errors.go
+++ errors.go
-package p9pnew
+package p9p
import (
"errors"
blob - c4b14e214cfc3b54492974e33ef924bb6b9fc5c1
blob + 56d43e0325a8ea98d24f9e076becdd5565f5e5e9
--- fcall.go
+++ fcall.go
-package p9pnew
+package p9p
import "fmt"
case Rclunk:
return "Rclunk"
case Tremove:
- return "Tremote"
+ return "Tremove"
case Rremove:
return "Rremove"
case Tstat:
blob - f1bae724b08d7812eafd714ad375fcaa413085ed
blob + d65f2687bf351dfe12c28afbe2768b858dcde720
--- logging.go
+++ logging.go
// +build ignore
-package p9pnew
+package p9p
import (
"log"
blob - 144af5861daa0b6ce00ae015d1b6c9d47611dfdf
blob + 3c2c4711b3c2fafb84ca6fb3db1b85f3ad34afaa
--- messages.go
+++ messages.go
-package p9pnew
+package p9p
import "fmt"
blob - 58d498921fbdef1f84d000e53549cacbb4624061
blob + 24d78b3be3d6009581e39ed6283df08d6e6dd118
--- server.go
+++ server.go
-package p9pnew
+package p9p
import (
+ "fmt"
"log"
"net"
"time"
"golang.org/x/net/context"
)
-// Serve the 9p session over the provided network connection.
-func Serve(ctx context.Context, conn net.Conn, handler Handler) {
+// TODO(stevvooe): Add net/http.Server-like type here to manage connections.
+// Coupled with Handler mux, we can get a very http-like experience for 9p
+// servers.
+
+// ServeConn serves the 9p handler over the provided network connection. It
+// first negotiates the protocol version on cn under a one-second timeout and
+// returns an error if that fails; otherwise it serves requests with handler
+// and returns the terminal error reported by the connection's serve loop.
+func ServeConn(ctx context.Context, cn net.Conn, handler Handler) error {
+
// TODO(stevvooe): It would be nice if the handler could declare the
// supported version. Before we had handler, we used the session to get
// the version (msize, version := session.Version()). We must decided if
// we want to proxy version and message size decisions all the back to the
// origin server or make those decisions at each link of a proxy chain.
- ch := newChannel(conn, codec9p{}, DefaultMSize)
+ ch := newChannel(cn, codec9p{}, DefaultMSize)
negctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
if err := servernegotiate(negctx, ch, DefaultVersion); err != nil {
// TODO(stevvooe): Need better error handling and retry support here.
- // For now, we silently ignore the failure.
- log.Println("error negotiating version:", err)
- return
+ return fmt.Errorf("error negotiating version: %v", err)
}
ctx = withVersion(ctx, DefaultVersion)
- s := &server{
+ c := &conn{
ctx: ctx,
ch: ch,
handler: handler,
closed: make(chan struct{}),
}
- s.run()
+ return c.serve()
}
-type server struct {
+// conn plays the role of session dispatch for a handler in a server.
+// ServeConn fills in ctx (annotated with the protocol version), ch, handler
+// and closed. closed is closed by CloseWithError, which also records err, and
+// serve returns err once it observes closed.
+//
+// NOTE(review): session is not assigned by ServeConn in the code visible
+// here -- confirm whether the field is still needed.
+type conn struct {
ctx context.Context
session Session
ch Channel
handler Handler
closed chan struct{}
+ err error // terminal error for the conn
}
// activeRequest includes information about the active request.
cancel context.CancelFunc
}
-func (s *server) run() {
+// serve messages on the connection until an error is encountered.
+func (c *conn) serve() error {
tags := map[Tag]*activeRequest{} // active requests
requests := make(chan *Fcall) // sync, read-limited
completed := make(chan *Fcall, 1)
// read loop
- go func() {
- for {
- req := new(Fcall)
- if err := s.ch.ReadFcall(s.ctx, req); err != nil {
- if err, ok := err.(net.Error); ok {
- if err.Timeout() || err.Temporary() {
- continue
- }
- }
+ go c.read(requests)
+ go c.write(responses)
- log.Println("server: error reading fcall", err)
- return
- }
-
- select {
- case requests <- req:
- case <-s.ctx.Done():
- log.Println("server: context done")
- return
- case <-s.closed:
- log.Println("server: shutdown")
- return
- }
- }
- }()
-
- // write loop
- go func() {
- for {
- select {
- case resp := <-responses:
- if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
- log.Println("server: error writing fcall:", err)
- }
- case <-s.ctx.Done():
- log.Println("server: context done")
- return
- case <-s.closed:
- log.Println("server: shutdown")
- return
- }
- }
- }()
-
log.Println("server.run()")
for {
- log.Println("server:", "wait")
select {
case req := <-requests:
- log.Println("request", req)
if _, ok := tags[req.Tag]; ok {
select {
case responses <- newErrorFcall(req.Tag, ErrDuptag):
// Send to responses, bypass tag management.
- case <-s.ctx.Done():
- return
- case <-s.closed:
- return
+ case <-c.ctx.Done():
+ return c.ctx.Err()
+ case <-c.closed:
+ return c.err
}
continue
}
select {
case responses <- resp:
// bypass tag management in completed.
- case <-s.ctx.Done():
- return
- case <-s.closed:
- return
+ case <-c.ctx.Done():
+ return c.ctx.Err()
+ case <-c.closed:
+ return c.err
}
default:
// Allows us to session handlers to cancel processing of the fcall
// through context.
- ctx, cancel := context.WithCancel(s.ctx)
+ ctx, cancel := context.WithCancel(c.ctx)
// The contents of these instances are only writable in the main
// server loop. The value of tag will not change.
go func(ctx context.Context, req *Fcall) {
var resp *Fcall
- msg, err := s.handler.Handle(ctx, req.Message)
+ msg, err := c.handler.Handle(ctx, req.Message)
if err != nil {
// all handler errors are forwarded as protocol errors.
resp = newErrorFcall(req.Tag, err)
case completed <- resp:
case <-ctx.Done():
return
- case <-s.closed:
+ case <-c.closed:
return
}
}(ctx, req)
}
case resp := <-completed:
- log.Println("completed", resp)
// only responses that flip the tag state traverse this section.
active, ok := tags[resp.Tag]
if !ok {
log.Println("canceled", resp, active.ctx.Err())
}
delete(tags, resp.Tag)
- case <-s.ctx.Done():
- log.Println("server: context done")
+ case <-c.ctx.Done():
+ return c.ctx.Err()
+ case <-c.closed:
+ return c.err
+ }
+ }
+}
+
+// read takes requests off the channel and sends them on requests. It loops
+// until a non-temporary read error occurs, the conn's context is done, or the
+// conn is closed; in the first two cases it closes the conn with the cause
+// before returning.
+func (c *conn) read(requests chan *Fcall) {
+ for {
+ req := new(Fcall)
+ if err := c.ch.ReadFcall(c.ctx, req); err != nil {
+ if err, ok := err.(net.Error); ok {
+ if err.Timeout() || err.Temporary() {
+ // TODO(stevvooe): A full idle timeout on the connection
+ // should be enforced here. No logging because it is quite
+ // chatty.
+ continue
+ }
+ }
+
+ // Any other read error is terminal: close the conn and stop reading.
+ c.CloseWithError(fmt.Errorf("error reading fcall: %v", err))
return
- case <-s.closed:
- log.Println("server: shutdown")
+ }
+
+ // Hand the request over, giving up if the context ends or the conn
+ // is closed while the send is blocked.
+ select {
+ case requests <- req:
+ case <-c.ctx.Done():
+ c.CloseWithError(c.ctx.Err())
return
+ case <-c.closed:
+ return
}
}
}
+
+// write takes responses off of responses and writes them to the channel. It
+// loops until a non-temporary write error occurs, the conn's context is done,
+// or the conn is closed; in the first two cases it closes the conn with the
+// cause before returning.
+func (c *conn) write(responses chan *Fcall) {
+	for {
+		select {
+		case resp := <-responses:
+			if err := c.ch.WriteFcall(c.ctx, resp); err != nil {
+				if err, ok := err.(net.Error); ok {
+					if err.Timeout() || err.Temporary() {
+						// TODO(stevvooe): A full idle timeout on the
+						// connection should be enforced here. We log here,
+						// since this is less common.
+						//
+						// NOTE(review): the response is not retried after a
+						// temporary error; the loop moves on to the next one.
+						log.Printf("9p server: temporary error writing fcall: %v", err)
+						continue
+					}
+				}
+
+				// Any other write error is terminal for the conn.
+				c.CloseWithError(fmt.Errorf("error writing fcall: %v", err))
+				return
+			}
+		case <-c.ctx.Done():
+			c.CloseWithError(c.ctx.Err())
+			return
+		case <-c.closed:
+			return
+		}
+	}
+}
+
+// Close shuts down the conn without a specific cause. It is equivalent to
+// CloseWithError(nil) and returns the terminal error recorded for the conn,
+// which is ErrClosed when this call is the one that closes it.
+func (c *conn) Close() error {
+	return c.CloseWithError(nil)
+}
+
+// CloseWithError marks the conn closed and records err as its terminal error,
+// substituting ErrClosed when err is nil. Only the first call has any effect;
+// later calls return the terminal error recorded by the first one.
+//
+// NOTE(review): the closed check and close(c.closed) are not atomic, so two
+// goroutines calling this at the same instant could both try to close the
+// channel. Guard with a sync.Once or mutex if that can happen in practice.
+func (c *conn) CloseWithError(err error) error {
+	select {
+	case <-c.closed:
+		// Already closed: report the error recorded by the first caller.
+		return c.err
+	default:
+	}
+
+	if err == nil {
+		err = ErrClosed
+	}
+
+	// Record the error before closing the channel so that anyone woken by
+	// <-c.closed observes the final value of c.err.
+	c.err = err
+	close(c.closed)
+
+	return c.err
+}
blob - 8ebab5c10ad4d427357e5b72d158ec5cd4f012bf
blob + f6caee94546fa1288f44c7b465ed4da607141329
--- session.go
+++ session.go
-package p9pnew
+package p9p
import "golang.org/x/net/context"
blob - 8d93c7c601d73f26b6390e4039775b6e873ad029
blob + 4bbb78b65e3074494cf8d0fb6e53d1839f628936
--- transport.go
+++ transport.go
-package p9pnew
+package p9p
import (
"fmt"
blob - d909511b3e7ecd51d7f33988af702ac7245590ef
blob + 5fe03c6db69fbdaf081004818370c0302cf4145b
--- types.go
+++ types.go
-package p9pnew
+package p9p
import (
"fmt"
blob - 4c6712bcc7939d9947815e2a15bec59f9f868604
blob + ac20fb9dfd45e42713513aee43a5ad975d41c0cc
--- version.go
+++ version.go
-package p9pnew
+package p9p
import (
"fmt"