Commit Diff


commit - f41196dcf2b4d9e98d8e2addb0cd403e5405a3f0
commit + 40d4a02d02470ddbfcb31818213c5daea4d1f545
blob - 52786490fc789cd9c9104fda5820ec0ddf135dbe
blob + a9453687ff9c6bcaec2512db06f1fbd224dd936a
--- channel.go
+++ channel.go
@@ -13,6 +13,27 @@ import (
 	"golang.org/x/net/context"
 )
 
+// Channel defines the operations necessary to implement a 9p message channel
+// interface. Typically, message channels do no protocol processing except to
+// send and receive message frames.
+type Channel interface {
+	// ReadFcall reads one fcall frame into the provided fcall structure. The
+	// Fcall may be cleared whether there is an error or not. If the operation
+	// is successful, the contents of the fcall will be populated in the
+	// argument. ReadFcall cannot be called concurrently with other calls to
+	// ReadFcall. This both to preserve message ordering and to allow lockless
+	// buffer reusage.
+	ReadFcall(ctx context.Context, fcall *Fcall) error
+
+	// WriteFcall writes the provided fcall to the channel. WriteFcall cannot
+	// be called concurrently with other calls to WriteFcall.
+	WriteFcall(ctx context.Context, fcall *Fcall) error
+
+	// SetMSize sets the maximum message size for the channel. This must never
+	// be called currently with ReadFcall or WriteFcall.
+	SetMSize(msize int)
+}
+
 const (
 	defaultRWTimeout = 1 * time.Second // default read/write timeout if not set in context
 )
@@ -20,12 +41,34 @@ const (
 // channel provides bidirectional protocol framing for 9p over net.Conn.
 // Operations are not thread-safe but reads and writes may be carried out
 // concurrently, supporting separate read and write loops.
+//
+// Lifecyle
+//
+// A connection, or message channel abstraction, has a lifecycle delineated by
+// Tversion/Rversion request response cycles. For now, this is part of the
+// channel itself but doesn't necessarily influence the channels state, except
+// the msize. Visually, it might look something like this:
+//
+// 	[Established] -> [Version] -> [Session] -> [Version]---+
+//	                     ^                                 |
+// 	                     |_________________________________|
+//
+// The connection is established, then we negotiate a version, run a session,
+// then negotiate a version and so on. For most purposes, we are likely going
+// to terminate the connection after the session but we may want to support
+// connection pooling. Pooling may result in possible security leaks if the
+// connections are shared among contexts, since the version is negotiated at
+// the start of the session. To avoid this, we can actually use a "tombstone"
+// version message which clears the server's session state without starting a
+// new session. The next version message would then prepare the session
+// without leaking any Fid's.
 type channel struct {
 	conn   net.Conn
 	codec  Codec
 	brd    *bufio.Reader
 	bwr    *bufio.Writer
 	closed chan struct{}
+	msize  int
 	rdbuf  []byte
 	wrbuf  []byte
 }
@@ -37,6 +80,7 @@ func newChannel(conn net.Conn, codec Codec, msize int)
 		brd:    bufio.NewReaderSize(conn, msize), // msize may not be optimal buffer size
 		bwr:    bufio.NewWriterSize(conn, msize),
 		closed: make(chan struct{}),
+		msize:  msize,
 		rdbuf:  make([]byte, msize),
 		wrbuf:  make([]byte, msize),
 	}
@@ -48,6 +92,7 @@ func (ch *channel) setmsize(msize int) {
 	// NOTE(stevvooe): We cannot safely resize the buffered reader and writer.
 	// Proceed assuming that original size is sufficient.
 
+	ch.msize = msize
 	if msize < len(ch.rdbuf) {
 		// just change the cap
 		ch.rdbuf = ch.rdbuf[:msize]
@@ -85,9 +130,65 @@ func (ch *channel) version(ctx context.Context, msize 
 	return mv.MSize, mv.Version, nil
 }
 
+// negotiate blocks until a version message is received or a timeout occurs.
+// The msize for the tranport will be set from the negotiation. If negotiate
+// returns nil, a server may proceed with the connection.
+//
+// In the future, it might be better to handle the version messages in a
+// separate object that manages the session. Each set of version requests
+// effectively "reset" a connection, meaning all fids get clunked and all
+// outstanding IO is aborted. This is probably slightly racy, in practice with
+// a misbehaved client. The main issue is that we cannot tell which session
+// messages belong to.
+func (ch *channel) negotiate(ctx context.Context, version string) error {
+	// wait for the version message over the transport.
+	req := new(Fcall)
+	if err := ch.readFcall(ctx, req); err != nil {
+		return err
+	}
+
+	mv, ok := req.Message.(*MessageTversion)
+	if !ok {
+		return fmt.Errorf("expected version message: %v", mv)
+	}
+
+	respmsg := MessageRversion{
+		Version: version,
+	}
+
+	if mv.Version != version {
+		// TODO(stevvooe): Not the best place to do version handling. We need
+		// to have a way to pass supported versions into this method then have
+		// it return the actual version. For now, respond with unknown for
+		// anything that doesn't match the provided version string.
+		respmsg.Version = "unknown"
+	}
+
+	if int(mv.MSize) < ch.msize {
+		// if the server msize is too large, use the client's suggested msize.
+		ch.setmsize(int(mv.MSize))
+		respmsg.MSize = mv.MSize
+	} else {
+		respmsg.MSize = uint32(ch.msize)
+	}
+
+	resp := newFcall(respmsg)
+	if err := ch.writeFcall(ctx, resp); err != nil {
+		return err
+	}
+
+	if respmsg.Version == "unknown" {
+		return fmt.Errorf("bad version negotiation")
+	}
+
+	return nil
+}
+
 // ReadFcall reads the next message from the channel into fcall.
 func (ch *channel) readFcall(ctx context.Context, fcall *Fcall) error {
 	select {
+	case <-ctx.Done():
+		return ctx.Err()
 	case <-ch.closed:
 		return ErrClosed
 	default:
@@ -115,6 +216,8 @@ func (ch *channel) readFcall(ctx context.Context, fcal
 		return fmt.Errorf("message large than buffer:", n)
 	}
 
+	// clear out the fcall
+	*fcall = Fcall{}
 	if err := ch.codec.Unmarshal(ch.rdbuf[:n], fcall); err != nil {
 		return err
 	}
@@ -124,6 +227,8 @@ func (ch *channel) readFcall(ctx context.Context, fcal
 
 func (ch *channel) writeFcall(ctx context.Context, fcall *Fcall) error {
 	select {
+	case <-ctx.Done():
+		return ctx.Err()
 	case <-ch.closed:
 		return ErrClosed
 	default:
blob - 25b2080e9f954172c4073e32232291a5f7030dbb
blob + 2da30033b5ccc50bec99a071ca5793ebeaec32f0
--- cmd/9pr/main.go
+++ cmd/9pr/main.go
@@ -34,9 +34,15 @@ func main() {
 	ctx := context.Background()
 	log.SetFlags(0)
 	flag.Parse()
+
+	proto := "tcp"
+	if strings.HasPrefix(addr, "unix:") {
+		proto = "unix"
+		addr = addr[5:]
+	}
 
 	log.Println("dialing", addr)
-	conn, err := net.Dial("tcp", addr)
+	conn, err := net.Dial(proto, addr)
 	if err != nil {
 		log.Fatal(err)
 	}
blob - 7818604df1a25843f729bd82b77a66aa08051e81
blob + e190166973a54ec0561999305e6166b8d9f360ab
--- cmd/9ps/main.go
+++ cmd/9ps/main.go
@@ -45,6 +45,8 @@ func main() {
 		}
 
 		go func(conn net.Conn) {
+			defer conn.Close()
+
 			ctx := context.WithValue(ctx, "conn", conn)
 			log.Println("connected", conn.RemoteAddr())
 			session, err := newLocalSession(ctx, root)
blob - /dev/null
blob + 5d884701a2cbc04f08d9dff5e5ddfcf4c4e27e27 (mode 644)
--- /dev/null
+++ dispatcher.go
@@ -0,0 +1,173 @@
+package p9pnew
+
+import (
+	"fmt"
+
+	"golang.org/x/net/context"
+)
+
+type handler interface {
+	handle(ctx context.Context, req *Fcall) (*Fcall, error)
+}
+
+// dispatcher routes fcalls to a Session.
+type dispatcher struct {
+	session Session
+}
+
+// handle responds to an fcall using the session. An error is only returned if
+// the handler cannot proceed. All session errors are returned as Rerror.
+func (d *dispatcher) handle(ctx context.Context, req *Fcall) (*Fcall, error) {
+	var resp *Fcall
+	switch req.Type {
+	case Tauth:
+		reqmsg, ok := req.Message.(MessageTauth)
+		if !ok {
+			return nil, fmt.Errorf("incorrect message for type: %v message=%v", req, req.Message)
+		}
+
+		qid, err := d.session.Auth(ctx, reqmsg.Afid, reqmsg.Uname, reqmsg.Aname)
+		if err != nil {
+			return nil, err
+		}
+
+		resp = newFcall(MessageRauth{Qid: qid})
+	case Tattach:
+		reqmsg, ok := req.Message.(*MessageTattach)
+		if !ok {
+			return nil, fmt.Errorf("bad message: %v message=%#v", req, req.Message)
+		}
+
+		qid, err := d.session.Attach(ctx, reqmsg.Fid, reqmsg.Afid, reqmsg.Uname, reqmsg.Aname)
+		if err != nil {
+			return nil, err
+		}
+
+		resp = newFcall(MessageRattach{
+			Qid: qid,
+		})
+	case Twalk:
+		reqmsg, ok := req.Message.(*MessageTwalk)
+		if !ok {
+			return nil, fmt.Errorf("bad message: %v message=%#v", req, req.Message)
+		}
+
+		// TODO(stevvooe): This is one of the places where we need to manage
+		// fid allocation lifecycle. We need to reserve the fid, then, if this
+		// call succeeds, we should alloc the fid for future uses. Also need
+		// to interact correctly with concurrent clunk and the flush of this
+		// walk message.
+		qids, err := d.session.Walk(ctx, reqmsg.Fid, reqmsg.Newfid, reqmsg.Wnames...)
+		if err != nil {
+			return nil, err
+		}
+
+		resp = newFcall(&MessageRwalk{
+			Qids: qids,
+		})
+	case Topen:
+		reqmsg, ok := req.Message.(*MessageTopen)
+		if !ok {
+			return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message)
+		}
+
+		qid, iounit, err := d.session.Open(ctx, reqmsg.Fid, reqmsg.Mode)
+		if err != nil {
+			return nil, err
+		}
+
+		resp = newFcall(&MessageRopen{
+			Qid:    qid,
+			IOUnit: iounit,
+		})
+	case Tcreate:
+		reqmsg, ok := req.Message.(*MessageTcreate)
+		if !ok {
+			return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message)
+		}
+
+		qid, iounit, err := d.session.Create(ctx, reqmsg.Fid, reqmsg.Name, reqmsg.Perm, uint32(reqmsg.Mode))
+		if err != nil {
+			return nil, err
+		}
+
+		resp = newFcall(&MessageRcreate{
+			Qid:    qid,
+			IOUnit: iounit,
+		})
+
+	case Tread:
+		reqmsg, ok := req.Message.(*MessageTread)
+		if !ok {
+			return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message)
+		}
+
+		p := make([]byte, int(reqmsg.Count))
+		n, err := d.session.Read(ctx, reqmsg.Fid, p, int64(reqmsg.Offset))
+		if err != nil {
+			return nil, err
+		}
+
+		resp = newFcall(&MessageRread{
+			Data: p[:n],
+		})
+	case Twrite:
+		reqmsg, ok := req.Message.(*MessageTwrite)
+		if !ok {
+			return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message)
+		}
+
+		n, err := d.session.Write(ctx, reqmsg.Fid, reqmsg.Data, int64(reqmsg.Offset))
+		if err != nil {
+			return nil, err
+		}
+
+		resp = newFcall(&MessageRwrite{
+			Count: uint32(n),
+		})
+	case Tclunk:
+		reqmsg, ok := req.Message.(*MessageTclunk)
+		if !ok {
+			return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message)
+		}
+
+		// TODO(stevvooe): Manage the clunking of file descriptors based on
+		// walk and attach call progression.
+		if err := d.session.Clunk(ctx, reqmsg.Fid); err != nil {
+			return nil, err
+		}
+
+		resp = newFcall(&MessageRclunk{})
+	case Tremove:
+		reqmsg, ok := req.Message.(*MessageTremove)
+		if !ok {
+			return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message)
+		}
+
+		if err := d.session.Remove(ctx, reqmsg.Fid); err != nil {
+			return nil, err
+		}
+
+		resp = newFcall(&MessageRremove{})
+	case Tstat:
+		reqmsg, ok := req.Message.(*MessageTstat)
+		if !ok {
+			return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message)
+		}
+
+		dir, err := d.session.Stat(ctx, reqmsg.Fid)
+		if err != nil {
+			return nil, err
+		}
+
+		resp = newFcall(&MessageRstat{
+			Stat: dir,
+		})
+	case Twstat:
+		panic("not implemented")
+	default:
+		return nil, ErrUnknownMsg
+	}
+
+	return resp, nil
+}
blob - 2310a2c5f859b1b9f2486c722d01e703a67ca0a7
blob + e15f26b1b7a1c68a00ef24c5740fd35cde89a245
--- encoding_test.go
+++ encoding_test.go
@@ -2,6 +2,7 @@ package p9pnew
 
 import (
 	"bytes"
+	"errors"
 	"reflect"
 	"testing"
 	"time"
@@ -155,6 +156,14 @@ func TestEncodeDecode(t *testing.T) {
 				0x3, 0x0, 0x67, 0x69, 0x64, // gid
 				0x4, 0x0, 0x6d, 0x75, 0x69, 0x64}, // muid
 		},
+		{
+			description: "Rerror fcall",
+			target:      newErrorFcall(5556, errors.New("A serious error")),
+			marshaled: []byte{
+				0x75, 0xb4, 0x15,
+				0x12, 0x0, 0x0, 0x0,
+				0x61, 0x20, 0x6c, 0x6f, 0x74, 0x20, 0x6f, 0x66, 0x20, 0x62, 0x79, 0x74, 0x65, 0x20, 0x64, 0x61, 0x74, 0x61},
+		},
 	} {
 		t.Logf("target under test: %v", testcase.target)
 		fatalf := func(format string, args ...interface{}) {
blob - 6a1212e5299bf975367e1754ded62b3be8a88130
blob + 000834bd884dcf2b5745120f03727e76a12f91f3
--- errors.go
+++ errors.go
@@ -1,9 +1,6 @@
 package p9pnew
 
-import (
-	"errors"
-	"fmt"
-)
+import "errors"
 
 // common errors returned by Session interface methods
 var (
@@ -33,17 +30,12 @@ var (
 	ErrWalknodir    = new9pError("walk in non-directory")
 
 	// extra errors not part of the normal protocol
-	ErrTimeout = new9pError("fcall timeout") // returned when timing out on the fcall
+	ErrTimeout    = new9pError("fcall timeout") // returned when timing out on the fcall
+	ErrUnknownTag = new9pError("unknown tag")
+	ErrUnknownMsg = new9pError("unknown message") // returned when encountering unknown message type
 )
 
-type error9p struct {
-	Name string
-}
-
+// new9pError returns a new 9p error ready for the wire.
 func new9pError(s string) error {
-	return error9p{Name: s}
+	return MessageRerror{Ename: s}
 }
-
-func (e error9p) Error() string {
-	return fmt.Sprintf("9p: %v", e.Name)
-}
blob - 9d1fc1aba43ebbc74dc6a62fe3de23ff731f0761
blob + 80d2e7e1fd009f0babc3639130e1519b72bdd533
--- fcall.go
+++ fcall.go
@@ -121,6 +121,25 @@ func newFcall(msg Message) *Fcall {
 	}
 }
 
+func newErrorFcall(tag Tag, err error) *Fcall {
+	var msg Message
+
+	switch v := err.(type) {
+	case MessageRerror:
+		msg = v
+	case *MessageRerror:
+		msg = v
+	default:
+		msg = MessageRerror{Ename: v.Error()}
+	}
+
+	return &Fcall{
+		Type:    Rerror,
+		Tag:     tag,
+		Message: msg,
+	}
+}
+
 func (fc *Fcall) String() string {
 	return fmt.Sprintf("%v(%v) %v", fc.Type, fc.Tag, string9p(fc.Message))
 }
@@ -141,9 +160,9 @@ func newMessage(typ FcallType) (Message, error) {
 	case Rversion:
 		return &MessageRversion{}, nil
 	case Tauth:
-
+		return &MessageTauth{}, nil
 	case Rauth:
-
+		return &MessageRauth{}, nil
 	case Tattach:
 		return &MessageTattach{}, nil
 	case Rattach:
@@ -163,9 +182,9 @@ func newMessage(typ FcallType) (Message, error) {
 	case Ropen:
 		return &MessageRopen{}, nil
 	case Tcreate:
-
+		return &MessageTcreate{}, nil
 	case Rcreate:
-
+		return &MessageRcreate{}, nil
 	case Tread:
 		return &MessageTread{}, nil
 	case Rread:
@@ -221,6 +240,10 @@ type MessageRerror struct {
 	Ename string
 }
 
+func (e MessageRerror) Error() string {
+	return fmt.Sprintf("9p: %v", e.Ename)
+}
+
 type MessageTflush struct {
 	Oldtag Tag
 }
@@ -300,6 +323,8 @@ type MessageTremove struct {
 	Fid Fid
 }
 
+type MessageRremove struct{}
+
 type MessageTstat struct {
 	Fid Fid
 }
@@ -317,9 +342,9 @@ func (MessageTversion) Type() FcallType { return Tvers
 func (MessageRversion) Type() FcallType { return Rversion }
 func (MessageTauth) Type() FcallType    { return Tauth }
 func (MessageRauth) Type() FcallType    { return Rauth }
-func (MessageRerror) Type() FcallType   { return Rerror }
 func (MessageTflush) Type() FcallType   { return Tflush }
 func (MessageRflush) Type() FcallType   { return Rflush }
+func (MessageRerror) Type() FcallType   { return Rerror }
 func (MessageTattach) Type() FcallType  { return Tattach }
 func (MessageRattach) Type() FcallType  { return Rattach }
 func (MessageTwalk) Type() FcallType    { return Twalk }
@@ -335,6 +360,7 @@ func (MessageRwrite) Type() FcallType   { return Rwrit
 func (MessageTclunk) Type() FcallType   { return Tclunk }
 func (MessageRclunk) Type() FcallType   { return Rclunk }
 func (MessageTremove) Type() FcallType  { return Tremove }
+func (MessageRremove) Type() FcallType  { return Rremove }
 func (MessageTstat) Type() FcallType    { return Tstat }
 func (MessageRstat) Type() FcallType    { return Rstat }
 func (MessageTwstat) Type() FcallType   { return Twstat }
blob - 325347028d2ac5a31e07eb8ed43af174b5a3beb1
blob + cb90f4c988eb50b54f9e78cd447dbe74b543727c
--- server.go
+++ server.go
@@ -1,20 +1,39 @@
 package p9pnew
 
 import (
-	"bufio"
-	"fmt"
 	"log"
 	"net"
+	"time"
 
 	"golang.org/x/net/context"
 )
 
 // Serve the 9p session over the provided network connection.
 func Serve(ctx context.Context, conn net.Conn, session Session) {
+	const msize = 64 << 10
+	const vers = "9P2000"
+
+	ch := newChannel(conn, codec9p{}, msize)
+
+	negctx, cancel := context.WithTimeout(ctx, 1*time.Second)
+	defer cancel()
+
+	// TODO(stevvooe): For now, we negotiate here. It probably makes sense to
+	// do this outside of this function and then pass in a ready made channel.
+	// We are not really ready to export the channel type yet.
+
+	if err := ch.negotiate(negctx, vers); err != nil {
+		// TODO(stevvooe): Need better error handling and retry support here.
+		// For now, we silently ignore the failure.
+		log.Println("error negotiating version:", err)
+		return
+	}
+
 	s := &server{
 		ctx:     ctx,
-		conn:    conn,
-		session: session,
+		ch:      ch,
+		handler: &dispatcher{session: session},
+		closed:  make(chan struct{}),
 	}
 
 	s.run()
@@ -23,18 +42,21 @@ func Serve(ctx context.Context, conn net.Conn, session
 type server struct {
 	ctx     context.Context
 	session Session
-	conn    net.Conn
+	ch      *channel
+	handler handler
 	closed  chan struct{}
 }
 
+type activeTag struct {
+	ctx       context.Context
+	request   *Fcall
+	cancel    context.CancelFunc
+	responded bool // true, if some response was sent (Response or Rflush/Rerror)
+}
+
 func (s *server) run() {
-	brd := bufio.NewReader(s.conn)
-	dec := &decoder{brd}
-	bwr := bufio.NewWriter(s.conn)
-	enc := &encoder{bwr}
+	tags := map[Tag]*activeTag{} // active requests
 
-	tags := map[Tag]*Fcall{} // active requests
-
 	log.Println("server.run()")
 	for {
 		select {
@@ -45,148 +67,90 @@ func (s *server) run() {
 		default:
 		}
 
-		// NOTE(stevvooe): For now, we only provide a single request at a time
-		// handler. We can refactor this to take requests off the wire as
-		// quickly as they arrive and dispatch in parallel to session.
+		// BUG(stevvooe): This server blocks on reads, calls to handlers and
+		// write, effectively single tracking fcalls through a target
+		// dispatcher. There is no reason we couldn't parallelize these
+		// requests out to the dispatcher to get massive performance
+		// improvements.
 
 		log.Println("server:", "wait")
-		fcall := new(Fcall)
-
-		// BUG(stevvooe): The decoder is not reliably consuming all of the
-		// bytes of the wire. Needs to be setup to consume all bytes indicated
-		// in the size portion. Should use msize from the version negotiation.
-
-		if err := dec.decode(fcall); err != nil {
-			log.Println("server decoding fcall:", err)
+		req := new(Fcall)
+		if err := s.ch.readFcall(s.ctx, req); err != nil {
+			log.Println("server: error reading fcall", err)
 			continue
 		}
 
-		log.Println("server:", "message", fcall)
-
-		if _, ok := tags[fcall.Tag]; ok {
-			if err := enc.encode(&Fcall{
-				Type: Rerror,
-				Tag:  fcall.Tag,
-				Message: &MessageRerror{
-					Ename: ErrDuptag.Error(),
-				},
-			}); err != nil {
-				log.Println("server:", err)
+		if _, ok := tags[req.Tag]; ok {
+			resp := newErrorFcall(req.Tag, ErrDuptag)
+			if err := s.ch.writeFcall(s.ctx, resp); err != nil {
+				log.Printf("error sending duplicate tag response: %v", err)
 			}
-			bwr.Flush()
 			continue
 		}
-		tags[fcall.Tag] = fcall
 
-		resp, err := s.handle(s.ctx, fcall)
-		if err != nil {
-			log.Println("server:", err)
-			continue
-		}
+		// handle flush calls. The tag never makes it into active from here.
+		if mf, ok := req.Message.(MessageTflush); ok {
+			log.Println("flushing message", mf.Oldtag)
 
-		if err := enc.encode(resp); err != nil {
-			log.Println("server:", err)
-			continue
-		}
-		bwr.Flush()
+			// check if we have actually know about the requested flush
+			active, ok := tags[mf.Oldtag]
+			if ok {
+				active.cancel() // cancel the context
 
-	}
-}
+				resp := newFcall(MessageRflush{})
+				resp.Tag = req.Tag
+				if err := s.ch.writeFcall(s.ctx, resp); err != nil {
+					log.Printf("error responding to flush: %v", err)
+				}
+				active.responded = true
+			} else {
+				resp := newErrorFcall(req.Tag, ErrUnknownTag)
+				if err := s.ch.writeFcall(s.ctx, resp); err != nil {
+					log.Printf("error responding to flush: %v", err)
+				}
+			}
 
-// handle responds to an fcall using the session. An error is only returned if
-// the handler cannot proceed. All session errors are returned as Rerror.
-func (s *server) handle(ctx context.Context, req *Fcall) (*Fcall, error) {
-	var resp *Fcall
-	switch req.Type {
-	case Tattach:
-		reqmsg, ok := req.Message.(*MessageTattach)
-		if !ok {
-			return nil, fmt.Errorf("bad message: %v message=%#v", req, req.Message)
+			continue
 		}
 
-		qid, err := s.session.Attach(ctx, reqmsg.Fid, reqmsg.Afid, reqmsg.Uname, reqmsg.Aname)
-		if err != nil {
-			return nil, err
-		}
+		// TODO(stevvooe): Add handler timeout here, as well, if we desire.
 
-		resp = &Fcall{
-			Type: Rattach,
-			Tag:  req.Tag,
-			Message: &MessageRattach{
-				Qid: qid,
-			},
-		}
-	case Twalk:
-		reqmsg, ok := req.Message.(*MessageTwalk)
-		if !ok {
-			return nil, fmt.Errorf("bad message: %v message=%#v", req, req.Message)
-		}
+		// Allows us to signal handlers to cancel processing of the fcall
+		// through context.
+		ctx, cancel := context.WithCancel(s.ctx)
 
-		// TODO(stevvooe): This is one of the places where we need to manage
-		// fid allocation lifecycle. We need to reserve the fid, then, if this
-		// call succeeds, we should alloc the fid for future uses. Also need
-		// to interact correctly with concurrent clunk and the flush of this
-		// walk message.
-		qids, err := s.session.Walk(ctx, reqmsg.Fid, reqmsg.Newfid, reqmsg.Wnames...)
-		if err != nil {
-			return nil, err
+		tags[req.Tag] = &activeTag{
+			ctx:     ctx,
+			request: req,
+			cancel:  cancel,
 		}
 
-		resp = newFcall(&MessageRwalk{
-			Qids: qids,
-		})
-	case Topen:
-		reqmsg, ok := req.Message.(*MessageTopen)
-		if !ok {
-			return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message)
-		}
-
-		qid, iounit, err := s.session.Open(ctx, reqmsg.Fid, reqmsg.Mode)
+		resp, err := s.handler.handle(ctx, req)
 		if err != nil {
-			return nil, err
+			// all handler errors are forwarded as protocol errors.
+			resp = newErrorFcall(req.Tag, err)
 		}
+		resp.Tag = req.Tag
 
-		resp = newFcall(&MessageRopen{
-			Qid:    qid,
-			IOUnit: iounit,
-		})
-	case Tread:
-		reqmsg, ok := req.Message.(*MessageTread)
-		if !ok {
-			return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message)
-		}
+		if err := ctx.Err(); err != nil {
+			// NOTE(stevvooe): We aren't really getting our moneys worth for
+			// how this is being handled. We really need to dispatch each
+			// request handler to a separate thread.
 
-		p := make([]byte, int(reqmsg.Count))
-		n, err := s.session.Read(ctx, reqmsg.Fid, p, int64(reqmsg.Offset))
-		if err != nil {
-			return nil, err
+			// the context was canceled for some reason, perhaps timeout or
+			// due to a flush call. We treat this as a condition where a
+			// response should not be sent.
+			log.Println("context error:", err)
+			continue
 		}
 
-		resp = newFcall(&MessageRread{
-			Data: p[:n],
-		})
-	case Tclunk:
-		reqmsg, ok := req.Message.(*MessageTclunk)
-		if !ok {
-			return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message)
+		if !tags[req.Tag].responded {
+			if err := s.ch.writeFcall(ctx, resp); err != nil {
+				log.Println("server: error writing fcall:", err)
+				continue
+			}
 		}
 
-		// TODO(stevvooe): Manage the clunking of file descriptors based on
-		// walk and attach call progression.
-		if err := s.session.Clunk(ctx, reqmsg.Fid); err != nil {
-			return nil, err
-		}
-
-		resp = newFcall(&MessageRclunk{})
+		delete(tags, req.Tag)
 	}
-
-	if resp == nil {
-		log.Println("unknown message type:", req.Type)
-		resp = newFcall(&MessageRerror{
-			Ename: "unknown message type",
-		})
-	}
-
-	resp.Tag = req.Tag
-	return resp, nil
 }
blob - 5bdc01d70056b90a64fa6838de37153d3aec0cdb
blob + b7b890057006eb9f329f5e4fbc253f9e15b20fce
--- transport.go
+++ transport.go
@@ -16,6 +16,9 @@ type roundTripper interface {
 	send(ctx context.Context, fc *Fcall) (*Fcall, error)
 }
 
+// transport plays the role of being a client channel manager. It multiplexes
+// function calls onto the wire and dispatches responses to blocking calls to
+// send. On the whole, transport is thread-safe for calling send
 type transport struct {
 	ctx      context.Context
 	ch       *channel