commit - 78cab7f39e3b5a0a8afd0a2f90564dfd049423a2
commit + 641595f816e782a5b39e5453e7afeed70c617ae0
blob - 432b3265bbeade0eef0f877bda4e9ef8d03d30b0
blob + 12bcfc1e68545d3ba2e4b41805d564e7cdd5a5f3
--- cmd/9ps/main.go
+++ cmd/9ps/main.go
return
}
- p9p.Serve(ctx, conn, p9p.Dispatch(session))
+ if err := p9p.ServeConn(ctx, conn, p9p.Dispatch(session)); err != nil {
+ log.Printf("serving conn: %v", err)
+ }
}(c)
}
}
blob - a26cd69172fb7be642c1163f04d269b242745a9a
blob + db99d90caebaa4a7090817bb8d7114815121e554
--- server.go
+++ server.go
package p9p
import (
+ "fmt"
"log"
"net"
"time"
"golang.org/x/net/context"
)
-// Serve the 9p session over the provided network connection.
-func Serve(ctx context.Context, conn net.Conn, handler Handler) {
+// TODO(stevvooe): Add net/http.Server-like type here to manage connections.
+// Coupled with Handler mux, we can get a very http-like experience for 9p
+// servers.
+// ServeConn the 9p handler over the provided network connection.
+func ServeConn(ctx context.Context, cn net.Conn, handler Handler) error {
+
// TODO(stevvooe): It would be nice if the handler could declare the
// supported version. Before we had handler, we used the session to get
// the version (msize, version := session.Version()). We must decided if
// we want to proxy version and message size decisions all the back to the
// origin server or make those decisions at each link of a proxy chain.
- ch := newChannel(conn, codec9p{}, DefaultMSize)
+ ch := newChannel(cn, codec9p{}, DefaultMSize)
negctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
if err := servernegotiate(negctx, ch, DefaultVersion); err != nil {
// TODO(stevvooe): Need better error handling and retry support here.
- // For now, we silently ignore the failure.
- log.Println("error negotiating version:", err)
- return
+ return fmt.Errorf("error negotiating version:", err)
}
ctx = withVersion(ctx, DefaultVersion)
- s := &server{
+ c := &conn{
ctx: ctx,
ch: ch,
handler: handler,
closed: make(chan struct{}),
}
- s.run()
+ return c.serve()
}
-type server struct {
+// conn plays role of session dispatch for handler in a server.
+type conn struct {
ctx context.Context
session Session
ch Channel
handler Handler
closed chan struct{}
+ err error // terminal error for the conn
}
// activeRequest includes information about the active request.
cancel context.CancelFunc
}
-func (s *server) run() {
+// serve messages on the connection until an error is encountered.
+func (c *conn) serve() error {
tags := map[Tag]*activeRequest{} // active requests
requests := make(chan *Fcall) // sync, read-limited
completed := make(chan *Fcall, 1)
// read loop
- go func() {
- for {
- req := new(Fcall)
- if err := s.ch.ReadFcall(s.ctx, req); err != nil {
- if err, ok := err.(net.Error); ok {
- if err.Timeout() || err.Temporary() {
- continue
- }
- }
+ go c.read(requests)
+ go c.write(responses)
- log.Println("server: error reading fcall", err)
- return
- }
-
- select {
- case requests <- req:
- case <-s.ctx.Done():
- log.Println("server: context done")
- return
- case <-s.closed:
- log.Println("server: shutdown")
- return
- }
- }
- }()
-
- // write loop
- go func() {
- for {
- select {
- case resp := <-responses:
- if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
- log.Println("server: error writing fcall:", err)
- }
- case <-s.ctx.Done():
- log.Println("server: context done")
- return
- case <-s.closed:
- log.Println("server: shutdown")
- return
- }
- }
- }()
-
log.Println("server.run()")
for {
log.Println("server:", "wait")
select {
case responses <- newErrorFcall(req.Tag, ErrDuptag):
// Send to responses, bypass tag management.
- case <-s.ctx.Done():
- return
- case <-s.closed:
- return
+ case <-c.ctx.Done():
+ return c.ctx.Err()
+ case <-c.closed:
+ return c.err
}
continue
}
select {
case responses <- resp:
// bypass tag management in completed.
- case <-s.ctx.Done():
- return
- case <-s.closed:
- return
+ case <-c.ctx.Done():
+ return c.ctx.Err()
+ case <-c.closed:
+ return c.err
}
default:
// Allows us to session handlers to cancel processing of the fcall
// through context.
- ctx, cancel := context.WithCancel(s.ctx)
+ ctx, cancel := context.WithCancel(c.ctx)
// The contents of these instances are only writable in the main
// server loop. The value of tag will not change.
go func(ctx context.Context, req *Fcall) {
var resp *Fcall
- msg, err := s.handler.Handle(ctx, req.Message)
+ msg, err := c.handler.Handle(ctx, req.Message)
if err != nil {
// all handler errors are forwarded as protocol errors.
resp = newErrorFcall(req.Tag, err)
case completed <- resp:
case <-ctx.Done():
return
- case <-s.closed:
+ case <-c.closed:
return
}
}(ctx, req)
log.Println("canceled", resp, active.ctx.Err())
}
delete(tags, resp.Tag)
- case <-s.ctx.Done():
- log.Println("server: context done")
+ case <-c.ctx.Done():
+ return c.ctx.Err()
+ case <-c.closed:
+ return c.err
+ }
+ }
+}
+
+// read takes requests off the channel and sends them on requests.
+func (c *conn) read(requests chan *Fcall) {
+ for {
+ req := new(Fcall)
+ if err := c.ch.ReadFcall(c.ctx, req); err != nil {
+ if err, ok := err.(net.Error); ok {
+ if err.Timeout() || err.Temporary() {
+ // TODO(stevvooe): A full idle timeout on the connection
+ // should be enforced here. No logging because it is quite
+ // chatty.
+ continue
+ }
+ }
+
+ c.CloseWithError(fmt.Errorf("error reading fcall: %v", err))
return
- case <-s.closed:
- log.Println("server: shutdown")
+ }
+
+ select {
+ case requests <- req:
+ case <-c.ctx.Done():
+ c.CloseWithError(c.ctx.Err())
return
+ case <-c.closed:
+ return
}
}
}
+
+func (c *conn) write(responses chan *Fcall) {
+ for {
+ select {
+ case resp := <-responses:
+ if err := c.ch.WriteFcall(c.ctx, resp); err != nil {
+ if err, ok := err.(net.Error); ok {
+ if err.Timeout() || err.Temporary() {
+ // TODO(stevvooe): A full idle timeout on the
+ // connection should be enforced here. We log here,
+ // since this is less common.
+ log.Println("9p server: temporary error writing fcall: %v", err)
+ continue
+ }
+ }
+
+ c.CloseWithError(fmt.Errorf("error writing fcall: %v", err))
+ return
+ }
+ case <-c.ctx.Done():
+ c.CloseWithError(c.ctx.Err())
+ return
+ case <-c.closed:
+ return
+ }
+ }
+}
+
+func (c *conn) Close() error {
+ return c.CloseWithError(nil)
+}
+
+func (c *conn) CloseWithError(err error) error {
+ select {
+ case <-c.closed:
+ return c.err
+ default:
+ close(c.closed)
+ if err == nil {
+ c.err = err
+ } else {
+ c.err = ErrClosed
+ }
+
+ return c.err
+ }
+}
blob - ff2fd20cbe5bddfe06b2157dc145d0fc537287d0
blob + f6caee94546fa1288f44c7b465ed4da607141329
--- session.go
+++ session.go
package p9p
-import (
- "net"
+import "golang.org/x/net/context"
- "golang.org/x/net/context"
-)
-
// Session provides the central abstraction for a 9p connection. Clients
// implement sessions and servers serve sessions. Sessions can be proxied by
// serving up a client session.
// session implementation.
Version() (msize int, version string)
}
-
-func Dial(ctx context.Context, addr string) (Session, error) {
- c, err := net.Dial("tcp", addr)
- if err != nil {
- return nil, err
- }
-
- // BUG(stevvooe): Session doesn't actually close connection. Dial might
- // not be the right interface.
-
- return NewSession(ctx, c)
-}