Commit Diff


commit - 6b84ea70bad42d1dc151a3547b21c8818527e78d
commit + e6bcde663b39a93941cfd4188a6378e2ac5b92b2
blob - acb4b04fc8d42393ecdd5d8c7444c95354fb2144
blob + 5993872f445220c17408dabd657bcfd53470ba28
--- client.go
+++ client.go
@@ -1,10 +1,8 @@
 package p9pnew
 
 import (
-	"bufio"
 	"fmt"
 	"log"
-	"time"
 
 	"golang.org/x/net/context"
 
@@ -12,11 +10,8 @@ import (
 )
 
 type client struct {
-	ctx      context.Context
-	conn     net.Conn
-	tags     *tagPool
-	requests chan fcallRequest
-	closed   chan struct{}
+	ctx       context.Context
+	transport roundTripper
 }
 
 // NewSession returns a session using the connection. The Context ctx provides
@@ -24,8 +19,8 @@ type client struct {
 // session. The session can effectively shutdown with this context.
 func NewSession(ctx context.Context, conn net.Conn) (Session, error) {
 	return &client{
-		ctx:  ctx,
-		conn: conn,
+		ctx:       ctx,
+		transport: newTransport(ctx, conn),
 	}, nil
 }
 
@@ -36,6 +31,7 @@ func (c *client) Auth(ctx context.Context, afid Fid, u
 }
 
 func (c *client) Attach(ctx context.Context, fid, afid Fid, uname, aname string) (Qid, error) {
+	log.Println("client attach", fid, aname)
 	fcall := &Fcall{
 		Type: Tattach,
 		Message: &MessageTattach{
@@ -46,7 +42,7 @@ func (c *client) Attach(ctx context.Context, fid, afid
 		},
 	}
 
-	resp, err := c.send(ctx, fcall)
+	resp, err := c.transport.send(ctx, fcall)
 	if err != nil {
 		return Qid{}, err
 	}
@@ -60,7 +56,20 @@ func (c *client) Attach(ctx context.Context, fid, afid
 }
 
 func (c *client) Clunk(ctx context.Context, fid Fid) error {
-	panic("not implemented")
+	fcall := newFcall(&MessageTclunk{
+		Fid: fid,
+	})
+
+	resp, err := c.transport.send(ctx, fcall)
+	if err != nil {
+		return err
+	}
+
+	if resp.Type != Rclunk {
+		return fmt.Errorf("incorrect response type: %v", resp)
+	}
+
+	return nil
 }
 
 func (c *client) Remove(ctx context.Context, fid Fid) error {
@@ -69,10 +78,7 @@ func (c *client) Remove(ctx context.Context, fid Fid) 
 
 func (c *client) Walk(ctx context.Context, fid Fid, newfid Fid, names ...string) ([]Qid, error) {
 	if len(names) > 16 {
-		// TODO(stevvooe): Implement multi-message handling for more than 16
-		// wnames. May want to actually force caller to implement this since
-		// we'll need a new fid for each RPC.
-		panic("more than 16 components not implemented")
+		return nil, fmt.Errorf("too many elements in wname")
 	}
 
 	fcall := &Fcall{
@@ -80,11 +86,11 @@ func (c *client) Walk(ctx context.Context, fid Fid, ne
 		Message: &MessageTwalk{
 			Fid:    fid,
 			Newfid: newfid,
-			Wname:  names,
+			Wnames: names,
 		},
 	}
 
-	resp, err := c.send(ctx, fcall)
+	resp, err := c.transport.send(ctx, fcall)
 	if err != nil {
 		return nil, err
 	}
@@ -109,7 +115,7 @@ func (c *client) Read(ctx context.Context, fid Fid, p 
 		},
 	}
 
-	resp, err := c.send(ctx, fcall)
+	resp, err := c.transport.send(ctx, fcall)
 	if err != nil {
 		return 0, err
 	}
@@ -134,21 +140,36 @@ func (c *client) Write(ctx context.Context, fid Fid, p
 		},
 	}
 
-	resp, err := c.send(ctx, fcall)
+	resp, err := c.transport.send(ctx, fcall)
 	if err != nil {
 		return 0, err
 	}
 
 	mrr, ok := resp.Message.(MessageRwrite)
 	if !ok {
-		return 0, fmt.Errorf("invalid rpc response for read message: %v", resp)
+		return 0, fmt.Errorf("invalid rpc response for write message: %v", resp)
 	}
 
 	return int(mrr.Count), nil
 }
 
-func (c *client) Open(ctx context.Context, fid Fid, mode int32) (Qid, error) {
-	panic("not implemented")
+func (c *client) Open(ctx context.Context, fid Fid, mode uint8) (Qid, uint32, error) {
+	fcall := newFcall(&MessageTopen{
+		Fid:  fid,
+		Mode: mode,
+	})
+
+	resp, err := c.transport.send(ctx, fcall)
+	if err != nil {
+		return Qid{}, 0, err
+	}
+
+	respmsg, ok := resp.Message.(*MessageRopen) // decoded messages are pointers (see newMessage)
+	if !ok {
+		return Qid{}, 0, fmt.Errorf("invalid rpc response for open message: %v", resp)
+	}
+
+	return respmsg.Qid, respmsg.Msize, nil
 }
 
 func (c *client) Create(ctx context.Context, parent Fid, name string, perm uint32, mode uint32) (Qid, error) {
@@ -164,20 +185,19 @@ func (c *client) WStat(context.Context, Fid, Dir) erro
 }
 
 func (c *client) Version(ctx context.Context, msize uint32, version string) (uint32, string, error) {
-	fcall := &Fcall{
-		Type: Tversion,
-		Message: MessageVersion{
-			MSize:   uint32(msize),
-			Version: version,
-		},
+	msg := MessageTversion{
+		MSize:   uint32(msize),
+		Version: version,
 	}
 
-	resp, err := c.send(ctx, fcall)
+	fcall := newFcall(msg)
+
+	resp, err := c.transport.send(ctx, fcall)
 	if err != nil {
 		return 0, "", err
 	}
 
-	mv, ok := resp.Message.(*MessageVersion)
+	mv, ok := resp.Message.(*MessageRversion)
 	if !ok {
 		return 0, "", fmt.Errorf("invalid rpc response for version message: %v", resp)
 	}
@@ -191,144 +211,6 @@ func (c *client) Version(ctx context.Context, msize ui
 func (c *client) flush(ctx context.Context, tag Tag) error {
 	// TODO(stevvooe): We need to fire and forget flush messages when a call
 	// context gets cancelled.
-	panic("not implemented")
-}
 
-// send dispatches the fcall.
-func (c *client) send(ctx context.Context, fc *Fcall) (*Fcall, error) {
-	fc.Tag = c.tags.Get()
-	defer c.tags.Put(fc.Tag)
-
-	fcreq := newFcallRequest(ctx, fc)
-
-	// dispatch the request.
-	select {
-	case <-c.closed:
-		return nil, ErrClosed
-	case c.requests <- fcreq:
-	case <-ctx.Done():
-		return nil, ctx.Err()
-	}
-
-	// wait for the response.
-	select {
-	case <-c.closed:
-		return nil, ErrClosed
-	case <-ctx.Done():
-		return nil, ctx.Err()
-	case resp := <-fcreq.response:
-		return resp, nil
-	}
-}
-
-type fcallRequest struct {
-	ctx      context.Context
-	fcall    *Fcall
-	response chan *Fcall
-	err      chan error
+	panic("not implemented")
 }
-
-func newFcallRequest(ctx context.Context, fc *Fcall) fcallRequest {
-	return fcallRequest{
-		ctx:      ctx,
-		fcall:    fc,
-		response: make(chan *Fcall, 1),
-		err:      make(chan error, 1),
-	}
-}
-
-// handle takes messages off the wire and wakes up the waiting tag call.
-func (c *client) handle() {
-
-	var (
-		responses = make(chan *Fcall)
-		// outstanding provides a map of tags to outstanding requests.
-		outstanding = map[Tag]*fcallRequest{}
-	)
-
-	// loop to read messages off of the connection
-	go func() {
-		dec := &decoder{bufio.NewReader(c.conn)}
-
-	loop:
-		for {
-			const pump = time.Second
-
-			// Continuously set the read dead line pump the loop below. We can
-			// probably set a connection dead threshold that can count these.
-			// Usually, this would only matter when there are actually
-			// outstanding requests.
-			deadline, ok := c.ctx.Deadline()
-			if !ok {
-				deadline = time.Now().Add(pump)
-			} else {
-				// if the deadline is before
-				nd := time.Now().Add(pump)
-				if nd.Before(deadline) {
-					deadline = nd
-				}
-			}
-
-			if err := c.conn.SetReadDeadline(deadline); err != nil {
-				panic(fmt.Sprintf("error setting read deadline: %v", err))
-			}
-
-			fc := new(Fcall)
-			if err := dec.decode(fc); err != nil {
-				switch err := err.(type) {
-				case net.Error:
-					if err.Timeout() || err.Temporary() {
-						break loop
-					}
-				}
-
-				panic(fmt.Sprintf("connection read error: %v", err))
-			}
-
-			select {
-			case <-c.ctx.Done():
-				return
-			case <-c.closed:
-				return
-			case responses <- fc:
-			}
-		}
-
-	}()
-
-	enc := &encoder{bufio.NewWriter(c.conn)}
-
-	for {
-		select {
-		case <-c.ctx.Done():
-			return
-		case <-c.closed:
-			return
-		case req := <-c.requests:
-			outstanding[req.fcall.Tag] = &req
-
-			// use deadline to set write deadline for this request.
-			deadline, ok := req.ctx.Deadline()
-			if !ok {
-				deadline = time.Now().Add(time.Second)
-			}
-
-			if err := c.conn.SetWriteDeadline(deadline); err != nil {
-				log.Println("error setting write deadline: %v", err)
-			}
-
-			if err := enc.encode(req.fcall); err != nil {
-				delete(outstanding, req.fcall.Tag)
-				req.err <- err
-			}
-		case b := <-responses:
-			req, ok := outstanding[b.Tag]
-			if !ok {
-				panic("unknown tag received")
-			}
-			delete(outstanding, req.fcall.Tag)
-
-			req.response <- b
-		}
-	}
-}
blob - dbc20c3115cb222fa2a80d65f3472d7425979d5a
blob + d003a08067db2beb05f4e2124a29c59f1d0a43b8
--- cmd/9pr/main.go
+++ cmd/9pr/main.go
@@ -5,6 +5,9 @@ import (
 	"fmt"
 	"io"
 	"log"
+	"net"
+	"net/http"
+	_ "net/http/pprof"
 	"os"
 	"path"
 	"strings"
@@ -17,13 +20,29 @@ import (
 )
 
 func main() {
+	go func() {
+		log.Println(http.ListenAndServe("localhost:6060", nil))
+	}()
+
 	log.SetFlags(0)
 
 	// addr := os.Args[1]
-
+	ctx := context.Background()
 	// TODO(stevvooe): Use a dialer once we have the server session working
 	// and running.
 
+	session := newSimpleSession()
+
+	sconn, cconn := net.Pipe()
+
+	go p9pnew.Serve(ctx, sconn, session)
+
+	log.Println("new session")
+	csession, err := p9pnew.NewSession(ctx, cconn)
+	if err != nil {
+		log.Fatalln(err)
+	}
+
 	// session, err := p9pnew.Dial(ctx, addr)
 	// if err != nil {
 	// 	log.Fatalln(err)
@@ -31,7 +50,7 @@ func main() {
 
 	commander := &fsCommander{
 		ctx:     context.Background(),
-		session: newSimpleSession(),
+		session: csession,
 		pwd:     "/",
 		stdout:  os.Stdout,
 		stderr:  os.Stderr,
@@ -55,6 +74,7 @@ func main() {
 	}
 	commander.readline = rl
 
+	log.Println("attach root")
 	// attach root
 	commander.nextfid = 1
 	if _, err := commander.session.Attach(commander.ctx, commander.nextfid, p9pnew.NOFID, "anyone", "/"); err != nil {
@@ -63,6 +83,7 @@ func main() {
 	commander.rootfid = commander.nextfid
 	commander.nextfid++
 
+	log.Println("clone root")
 	// clone the pwd fid so we can clunk it
 	if _, err := commander.session.Walk(commander.ctx, commander.rootfid, commander.nextfid); err != nil {
 		log.Fatalln(err)
@@ -85,7 +106,7 @@ func main() {
 		args := strings.Fields(line)
 
 		name := args[0]
-		var cmd func(args ...string) error
+		var cmd func(ctx context.Context, args ...string) error
 
 		switch name {
 		case "ls":
@@ -97,12 +118,13 @@ func main() {
 		case "cat":
 			cmd = commander.cmdcat
 		default:
-			cmd = func(args ...string) error {
+			cmd = func(ctx context.Context, args ...string) error {
 				return fmt.Errorf("command not implemented")
 			}
 		}
 
-		if err := cmd(args[1:]...); err != nil {
+		ctx, _ = context.WithTimeout(commander.ctx, time.Second)
+		if err := cmd(ctx, args[1:]...); err != nil {
 			log.Printf("👹 %s: %v", name, err)
 		}
 	}
@@ -122,7 +144,7 @@ type fsCommander struct {
 	stderr   io.Writer
 }
 
-func (c *fsCommander) cmdls(args ...string) error {
+func (c *fsCommander) cmdls(ctx context.Context, args ...string) error {
 	ps := []string{c.pwd}
 	if len(args) > 0 {
 		ps = args
@@ -143,18 +165,18 @@ func (c *fsCommander) cmdls(args ...string) error {
 		targetfid := c.nextfid
 		c.nextfid++
 		components := strings.Split(strings.Trim(p, "/"), "/")
-		if _, err := c.session.Walk(c.ctx, c.rootfid, targetfid, components...); err != nil {
+		if _, err := c.session.Walk(ctx, c.rootfid, targetfid, components...); err != nil {
 			return err
 		}
-		defer c.session.Clunk(c.ctx, targetfid)
+		defer c.session.Clunk(ctx, targetfid)
 
-		if _, err := c.session.Open(c.ctx, targetfid, p9pnew.OREAD); err != nil {
+		if _, _, err := c.session.Open(ctx, targetfid, p9pnew.OREAD); err != nil {
 			return err
 		}
 
 		p := make([]byte, 4<<20)
 
-		n, err := c.session.Read(c.ctx, targetfid, p, 0)
+		n, err := c.session.Read(ctx, targetfid, p, 0)
 		if err != nil {
 			return err
 		}
@@ -183,7 +205,7 @@ func (c *fsCommander) cmdls(args ...string) error {
 	return wr.Flush()
 }
 
-func (c *fsCommander) cmdcd(args ...string) error {
+func (c *fsCommander) cmdcd(ctx context.Context, args ...string) error {
 	var p string
 	switch len(args) {
 	case 0:
@@ -213,7 +235,7 @@ func (c *fsCommander) cmdcd(args ...string) error {
 	return nil
 }
 
-func (c *fsCommander) cmdpwd(args ...string) error {
+func (c *fsCommander) cmdpwd(ctx context.Context, args ...string) error {
 	if len(args) != 0 {
 		return fmt.Errorf("pwd takes no arguments")
 	}
@@ -222,7 +244,7 @@ func (c *fsCommander) cmdpwd(args ...string) error {
 	return nil
 }
 
-func (c *fsCommander) cmdcat(args ...string) error {
+func (c *fsCommander) cmdcat(ctx context.Context, args ...string) error {
 	var p string
 	switch len(args) {
 	case 0:
@@ -245,11 +267,12 @@ func (c *fsCommander) cmdcat(args ...string) error {
 	}
 	defer c.session.Clunk(c.ctx, c.pwdfid)
 
-	if _, err := c.session.Open(c.ctx, targetfid, p9pnew.OREAD); err != nil {
+	_, msize, err := c.session.Open(c.ctx, targetfid, p9pnew.OREAD)
+	if err != nil {
 		return err
 	}
 
-	b := make([]byte, 4<<20)
+	b := make([]byte, msize)
 
 	n, err := c.session.Read(c.ctx, targetfid, b, 0)
 	if err != nil {
@@ -330,9 +353,6 @@ func newSimpleSession() p9pnew.Session {
 	b.add(bc)
 	root.add(a)
 	root.add(b)
-	log.Println(root.children)
-	log.Println(a.children)
-	log.Println(b.children)
 
 	return &simpleSession{
 		root:   root,
@@ -438,15 +458,15 @@ func (s *simpleSession) Write(ctx context.Context, fid
 	panic("not implemented")
 }
 
-func (s *simpleSession) Open(ctx context.Context, fid p9pnew.Fid, mode int32) (p9pnew.Qid, error) {
+func (s *simpleSession) Open(ctx context.Context, fid p9pnew.Fid, mode uint8) (p9pnew.Qid, uint32, error) {
 	fi, ok := s.fids[fid]
 	if !ok {
-		return p9pnew.Qid{}, p9pnew.ErrUnknownfid
+		return p9pnew.Qid{}, 0, p9pnew.ErrUnknownfid
 	}
 
 	s.opened[fid] = struct{}{}
 
-	return fi.dir.Qid, nil
+	return fi.dir.Qid, 4 << 20, nil
 }
 
 func (s *simpleSession) Create(ctx context.Context, parent p9pnew.Fid, name string, perm uint32, mode uint32) (p9pnew.Qid, error) {
blob - 45ec913cb709375259ca68b89724efe2da533146
blob + 529dbcad3e754a2d6af8c32544adc0bfbc3d0c39
--- encoding.go
+++ encoding.go
@@ -344,3 +344,30 @@ func fields9p(v interface{}) ([]interface{}, error) {
 
 	return elements, nil
 }
+
+// pretty9p writes a human-readable rendering of v to w for debugging. An
+// Fcall (or a pointer to one) is written on one line as its size, type and
+// tag followed by its message; any other value is written with %+v. The
+// first write error encountered is returned.
+func pretty9p(w io.Writer, v interface{}) error {
+	switch v := v.(type) {
+	case *Fcall:
+		if v == nil {
+			_, err := fmt.Fprintln(w, "<nil>")
+			return err
+		}
+		return pretty9p(w, *v)
+	case Fcall:
+		if _, err := fmt.Fprintf(w, "uint32(%v) %v(%v) ", size9p(v), v.Type, v.Tag); err != nil {
+			return err
+		}
+		if err := pretty9p(w, v.Message); err != nil {
+			return err
+		}
+		_, err := fmt.Fprintln(w)
+		return err
+	default:
+		_, err := fmt.Fprintf(w, "%+v", v)
+		return err
+	}
+}
blob - 1467c5bfd69e08e5dabbf2b195dd87d531b47ee3
blob + 76b8a9ae3ccc5c4a86aa70d2b232a1155dbd2f8f
--- encoding_test.go
+++ encoding_test.go
@@ -27,12 +27,14 @@ func TestEncodeDecode(t *testing.T) {
 				0x4, 0x0, 0x71, 0x77, 0x65, 0x72,
 				0x4, 0x0, 0x7a, 0x78, 0x63, 0x76},
 		},
+		// Dir
+		// Qid
 		{
 			description: "Tversion fcall",
 			target: &Fcall{
 				Type: Tversion,
 				Tag:  2255,
-				Message: &MessageVersion{
+				Message: &MessageTversion{
 					MSize:   uint32(1024),
 					Version: "9PTEST",
 				},
@@ -43,6 +45,21 @@ func TestEncodeDecode(t *testing.T) {
 				0x6, 0x0, 0x39, 0x50, 0x54, 0x45, 0x53, 0x54},
 		},
 		{
+			description: "Rversion fcall",
+			target: &Fcall{
+				Type: Rversion,
+				Tag:  2255,
+				Message: &MessageRversion{
+					MSize:   uint32(1024),
+					Version: "9PTEST",
+				},
+			},
+			marshaled: []byte{
+				0x13, 0x0, 0x0, 0x0,
+				0x65, 0xcf, 0x8, 0x0, 0x4, 0x0, 0x0,
+				0x6, 0x0, 0x39, 0x50, 0x54, 0x45, 0x53, 0x54},
+		},
+		{
 			description: "Twalk fcall",
 			target: &Fcall{
 				Type: Twalk,
blob - 9933b9852590d921bfbd30d5d5611301b45cbabd
blob + 05d7d2c1a99ab94e8695ccc4740dedfda8d0371f
--- fcall.go
+++ fcall.go
@@ -106,20 +106,21 @@ type Fcall struct {
 	Message Message
 }
 
-func (fc Fcall) String() string {
+func newFcall(msg Message) *Fcall {
+	return &Fcall{
+		Type:    msg.Type(),
+		Message: msg,
+	}
+}
+
+func (fc *Fcall) String() string {
 	return fmt.Sprintf("%8d %v(%v) %v", size9p(fc), fc.Type, fc.Tag, fc.Message)
 }
 
 type Message interface {
-	// Size() uint32
-
-	// NOTE(stevvooe): The binary marshal approach isn't particularly nice to
-	// generating garbage. Consider using an append model, once we have the
-	// messages worked out.
-	// encoding.BinaryMarshaler
-	// encoding.BinaryUnmarshaler
-
-	message9p()
+	// Type indicates the Fcall type of the message. This must match
+	// Fcall.Type.
+	Type() FcallType
 }
 
 // newMessage returns a new instance of the message based on the Fcall type.
@@ -127,20 +128,19 @@ func newMessage(typ FcallType) (Message, error) {
 	// NOTE(stevvooe): This is a nasty bit of code but makes the transport
 	// fairly simple to implement.
 	switch typ {
-	case Tversion, Rversion:
-		return &MessageVersion{}, nil
+	case Tversion:
+		return &MessageTversion{}, nil
+	case Rversion:
+		return &MessageRversion{}, nil
 	case Tauth:
 
 	case Rauth:
-
 	case Tattach:
-
+		return &MessageTattach{}, nil
 	case Rattach:
-
-	case Terror:
-
+		return &MessageRattach{}, nil
 	case Rerror:
-
+		return &MessageRerror{}, nil
 	case Tflush:
 		return &MessageTflush{}, nil
 	case Rflush:
@@ -150,9 +150,9 @@ func newMessage(typ FcallType) (Message, error) {
 	case Rwalk:
 		return &MessageRwalk{}, nil
 	case Topen:
-
+		return &MessageTopen{}, nil
 	case Ropen:
-
+		return &MessageRopen{}, nil
 	case Tcreate:
 
 	case Rcreate:
@@ -166,9 +166,9 @@ func newMessage(typ FcallType) (Message, error) {
 	case Rwrite:
 		return &MessageRwrite{}, nil
 	case Tclunk:
-
+		return &MessageTclunk{}, nil
 	case Rclunk:
-
+		return nil, nil // no response body
 	case Tremove:
 
 	case Rremove:
@@ -180,34 +180,31 @@ func newMessage(typ FcallType) (Message, error) {
 	case Twstat:
 
 	case Rwstat:
-	default:
-		return nil, fmt.Errorf("unknown message type: %v", typ)
 
 	}
 
-	return nil, fmt.Errorf("unknown message")
+	return nil, fmt.Errorf("unknown message type")
 }
 
 // MessageVersion encodes the message body for Tversion and Rversion RPC
 // calls. The body is identical in both directions.
-type MessageVersion struct {
+type MessageTversion struct {
 	MSize   uint32
 	Version string
 }
 
-func (MessageVersion) message9p() {}
-
-func (mv MessageVersion) String() string {
-	return fmt.Sprintf("msize=%v version=%v", mv.MSize, mv.Version)
+type MessageRversion struct {
+	MSize   uint32
+	Version string
 }
 
-type MessageTAuth struct {
+type MessageTauth struct {
 	Afid  Fid
 	Uname string
 	Aname string
 }
 
-type MessageRAuth struct {
+type MessageRauth struct {
 	Qid Qid
 }
 
@@ -215,13 +212,10 @@ type MessageRerror struct {
 	Ename string
 }
 
-// MessageTflush handles the content for the Tflush message type.
 type MessageTflush struct {
 	Oldtag Tag
 }
 
-func (MessageTflush) message9p() {}
-
 type MessageTattach struct {
 	Fid   Fid
 	Afid  Fid
@@ -229,42 +223,30 @@ type MessageTattach struct {
 	Aname string
 }
 
-func (MessageTattach) message9p() {}
-
 type MessageRattach struct {
 	Qid Qid
 }
 
-func (MessageRattach) message9p() {}
-
 type MessageTwalk struct {
 	Fid    Fid
 	Newfid Fid
-	Wname  []string
+	Wnames []string
 }
 
-func (MessageTwalk) message9p() {}
-
 type MessageRwalk struct {
 	Qids []Qid
 }
 
-func (MessageRwalk) message9p() {}
-
 type MessageTopen struct {
 	Fid  Fid
 	Mode uint8
 }
 
-func (MessageTopen) message9p() {}
-
 type MessageRopen struct {
 	Qid   Qid
 	Msize uint32
 }
 
-func (MessageRopen) message9p() {}
-
 type MessageTcreate struct {
 	Fid  Fid
 	Name string
@@ -272,43 +254,31 @@ type MessageTcreate struct {
 	Mode uint8
 }
 
-func (MessageTcreate) message9p() {}
-
 type MessageRcreate struct {
 	Qid    Qid
 	IOUnit uint32
 }
 
-func (MessageRcreate) message9p() {}
-
 type MessageTread struct {
 	Fid    Fid
 	Offset uint64
 	Count  uint32
 }
 
-func (MessageTread) message9p() {}
-
 type MessageRread struct {
 	Data []byte
 }
 
-func (MessageRread) message9p() {}
-
 type MessageTwrite struct {
 	Fid    Fid
 	Offset uint64
 	Data   []byte
 }
 
-func (MessageTwrite) message9p() {}
-
 type MessageRwrite struct {
 	Count uint32
 }
 
-func (MessageRwrite) message9p() {}
-
 type MessageTclunk struct {
 	Fid Fid
 }
@@ -325,9 +295,31 @@ type MessageRstat struct {
 	Stat Dir
 }
 
-func (MessageRstat) message9p() {}
-
 type MessageTwstat struct {
 	Fid  Fid
 	Stat Dir
 }
+
+func (MessageTversion) Type() FcallType { return Tversion }
+func (MessageRversion) Type() FcallType { return Rversion }
+func (MessageTauth) Type() FcallType    { return Tauth }
+func (MessageRauth) Type() FcallType    { return Rauth }
+func (MessageRerror) Type() FcallType   { return Rerror }
+func (MessageTflush) Type() FcallType   { return Tflush }
+func (MessageTattach) Type() FcallType  { return Tattach }
+func (MessageRattach) Type() FcallType  { return Rattach }
+func (MessageTwalk) Type() FcallType    { return Twalk }
+func (MessageRwalk) Type() FcallType    { return Rwalk }
+func (MessageTopen) Type() FcallType    { return Topen }
+func (MessageRopen) Type() FcallType    { return Ropen }
+func (MessageTcreate) Type() FcallType  { return Tcreate }
+func (MessageRcreate) Type() FcallType  { return Rcreate }
+func (MessageTread) Type() FcallType    { return Tread }
+func (MessageRread) Type() FcallType    { return Rread }
+func (MessageTwrite) Type() FcallType   { return Twrite }
+func (MessageRwrite) Type() FcallType   { return Rwrite }
+func (MessageTclunk) Type() FcallType   { return Tclunk }
+func (MessageTremove) Type() FcallType  { return Tremove }
+func (MessageTstat) Type() FcallType    { return Tstat }
+func (MessageRstat) Type() FcallType    { return Rstat }
+func (MessageTwstat) Type() FcallType   { return Twstat }
blob - 19ec9f4039516b5fcebf2abc647d96a425cc374f
blob + dce6821112555e7c268f22d5c2cbaa952dd1e991
--- server.go
+++ server.go
@@ -1,8 +1,8 @@
-// +build ignore
-
 package p9pnew
 
 import (
+	"bufio"
+	"fmt"
 	"log"
 	"net"
 	"time"
@@ -11,44 +11,170 @@ import (
 )
 
 // Serve the 9p session over the provided network connection.
-func Serve(ctx context.Context, conn net.Conn, session Session) error {
-	panic("not implemented")
+// It processes one request at a time and returns once the connection can no
+// longer be read, or when ctx is found to be done between requests.
+func Serve(ctx context.Context, conn net.Conn, session Session) {
+	s := &server{
+		ctx:     ctx,
+		conn:    conn,
+		session: session,
+		closed:  make(chan struct{}),
+	}
+
+	s.run()
 }
 
+// server holds the state for serving a single connection.
 type server struct {
 	ctx     context.Context
 	session Session
 	conn    net.Conn
+	closed  chan struct{}
 }
 
+// run reads requests from the connection and writes back the responses
+// until the connection fails or the server is shut down.
 func (s *server) run() {
-	dec := decoder{s.conn}
+	brd := bufio.NewReader(s.conn)
+	dec := &decoder{brd}
+	bwr := bufio.NewWriter(s.conn)
+	enc := &encoder{bwr}
 
-	fcall := new(Fcall)
-	if err := dec.decode(fcall); err != nil {
-		log.Println(err)
-	}
+	tags := map[Tag]*Fcall{} // active requests
 
+	log.Println("server.run()")
+	for {
+		select {
+		case <-s.ctx.Done():
+			log.Println("server: shutdown")
+			return
+		case <-s.closed:
+			log.Println("server: closed")
+			return
+		default:
+		}
+
+		// NOTE(stevvooe): For now, we only provide a single request at a time
+		// handler. We can refactor this to take requests off the wire as
+		// quickly as they arrive and dispatch in parallel to session.
+
+		log.Println("server:", "wait")
+
+		// Block until the next message starts arriving. An error here comes
+		// from the connection itself rather than from a malformed message,
+		// so retrying would only spin; stop serving instead.
+		if _, err := brd.Peek(1); err != nil {
+			if nerr, ok := err.(net.Error); ok && (nerr.Timeout() || nerr.Temporary()) {
+				continue
+			}
+			log.Println("server reading connection:", err)
+			return
+		}
+
+		fcall := new(Fcall)
+		if err := dec.decode(fcall); err != nil {
+			log.Println("server decoding fcall:", err)
+			continue
+		}
+
+		log.Println("server:", "message", fcall)
+
+		if _, ok := tags[fcall.Tag]; ok {
+			if err := enc.encode(&Fcall{
+				Type: Rerror,
+				Tag:  fcall.Tag,
+				Message: &MessageRerror{
+					Ename: ErrDuptag.Error(),
+				},
+			}); err != nil {
+				log.Println("server:", err)
+			}
+			if err := bwr.Flush(); err != nil {
+				log.Println("server:", err)
+			}
+			continue
+		}
+		tags[fcall.Tag] = fcall
+
+		resp, err := s.handle(fcall)
+		// Requests are handled synchronously, so the tag is free again as
+		// soon as handle returns, whatever the outcome.
+		delete(tags, fcall.Tag)
+		if err != nil {
+			log.Println("server:", err)
+			continue
+		}
+
+		if err := enc.encode(resp); err != nil {
+			log.Println("server:", err)
+			continue
+		}
+		if err := bwr.Flush(); err != nil {
+			log.Println("server:", err)
+		}
+	}
 }
 
 // handle responds to an fcall using the session. An error is only returned if
 // the handler cannot proceed. All session errors are returned as Rerror.
-func (s *server) handle(f *Fcall) (*Fcall, error) {
+func (s *server) handle(req *Fcall) (*Fcall, error) {
 	const timeout = 30 * time.Second // TODO(stevvooe): Allow this to be configured.
-	ctx, cancel = context.WithTimeout(s.ctx, timeout)
+	ctx, cancel := context.WithTimeout(s.ctx, timeout)
 	defer cancel()
 
-	switch fcall.Type {
+	var resp *Fcall
+	switch req.Type {
 	case Tattach:
-		atc, ok := fcall.Message.(*MessageTattach)
-		if ok {
-			log.Println("bad message")
-			continue
+		reqmsg, ok := req.Message.(*MessageTattach)
+		if !ok {
+			return nil, fmt.Errorf("bad message: %v message=%#v", req, req.Message)
 		}
 
-		qid, err := s.session.Attach(s.ctx, atc.Fid, atc.Afid, atc.Uname, atc.Aname)
+		qid, err := s.session.Attach(ctx, reqmsg.Fid, reqmsg.Afid, reqmsg.Uname, reqmsg.Aname)
 		if err != nil {
-			return
+			// Session errors go back to the client as Rerror; returning
+			// them here would leave the request unanswered.
+			resp = newFcall(&MessageRerror{Ename: err.Error()})
+			break
 		}
+
+		resp = &Fcall{
+			Type: Rattach,
+			Tag:  req.Tag,
+			Message: &MessageRattach{
+				Qid: qid,
+			},
+		}
+	case Twalk:
+		reqmsg, ok := req.Message.(*MessageTwalk)
+		if !ok {
+			return nil, fmt.Errorf("bad message: %v message=%#v", req, req.Message)
+		}
+
+		// TODO(stevvooe): This is one of the places where we need to manage
+		// fid allocation lifecycle. We need to reserve the fid, then, if this
+		// call succeeds, we should alloc the fid for future uses. Also need
+		// to interact correctly with concurrent clunk and the flush of this
+		// walk message.
+		qids, err := s.session.Walk(ctx, reqmsg.Fid, reqmsg.Newfid, reqmsg.Wnames...)
+		if err != nil {
+			// Session errors go back to the client as Rerror; returning
+			// them here would leave the request unanswered.
+			resp = newFcall(&MessageRerror{Ename: err.Error()})
+			break
+		}
+
+		resp = newFcall(&MessageRwalk{
+			Qids: qids,
+		})
 	}
+
+	if resp == nil {
+		resp = newFcall(&MessageRerror{
+			Ename: "unknown message type",
+		})
+	}
+
+	resp.Tag = req.Tag
+	return resp, nil
 }
blob - 9eac603c2d0f52f707d8bf18e975a11f2c712207
blob + 37779b341b219f43c948e22ba0287ac579e16625
--- session.go
+++ session.go
@@ -25,7 +25,7 @@ type Session interface {
 	Walk(ctx context.Context, fid Fid, newfid Fid, names ...string) ([]Qid, error)
 	Read(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error)
 	Write(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error)
-	Open(ctx context.Context, fid Fid, mode int32) (Qid, error)
+	Open(ctx context.Context, fid Fid, mode uint8) (Qid, uint32, error)
 	Create(ctx context.Context, parent Fid, name string, perm uint32, mode uint32) (Qid, error)
 	Stat(context.Context, Fid) (Dir, error)
 	WStat(context.Context, Fid, Dir) error
blob - 4bea0b041f5c0da62d20e12af577067a28bf09e8
blob + ab572f197ea3d78f7f9b4c37955a30121793f3fa
--- tags.go
+++ tags.go
@@ -57,11 +57,11 @@ func (tp *tagPool) Close() error {
 
 // NewtagPool returns a tag pool with the maximum number of outstanding
 // requests.
-func newTagPool(outstanding int) (*tagPool, error) {
+func newTagPool(outstanding int) *tagPool {
 	return &tagPool{
 		maximum:  Tag(outstanding),
 		freelist: make(chan Tag, outstanding),
 		nexttag:  make(chan Tag),
 		closed:   make(chan struct{}),
-	}, nil
+	}
 }
blob - /dev/null
blob + 82c3b4b965eb8eeaaa15ad008456f41a203c091c (mode 644)
--- /dev/null
+++ transport.go
@@ -0,0 +1,269 @@
+package p9pnew
+
+import (
+	"bufio"
+	"log"
+	"net"
+	"os"
+	"sync"
+	"time"
+
+	"golang.org/x/net/context"
+)
+
+// roundTripper manages the request and response from the client-side. A
+// roundTripper must abide by many of the rules of http.RoundTripper.
+// Typically, the roundTripper will manage tag assignment and message
+// serialization.
+type roundTripper interface {
+	send(ctx context.Context, fc *Fcall) (*Fcall, error)
+}
+
+// transport is the roundTripper returned by newTransport. It multiplexes
+// concurrent send calls over a single connection and matches responses to
+// requests by tag.
+type transport struct {
+	ctx      context.Context
+	conn     net.Conn
+	requests chan *fcallRequest
+	closed   chan struct{}
+
+	// closeOnce ensures the closed channel is closed at most once, whether
+	// shutdown is triggered by Close or by a fatal read error.
+	closeOnce sync.Once
+
+	// NOTE(review): tags is not used in this file; handle keeps its own
+	// counter. Kept in case code elsewhere refers to it.
+	tags uint16
+}
+
+// newTransport wraps conn in a transport and starts the goroutine that
+// services it. The goroutine stops when ctx is done or the transport is closed.
+func newTransport(ctx context.Context, conn net.Conn) roundTripper {
+	t := &transport{
+		ctx:      ctx,
+		conn:     conn,
+		requests: make(chan *fcallRequest),
+		closed:   make(chan struct{}),
+	}
+
+	go t.handle()
+
+	return t
+}
+
+// fcallRequest carries a single in-flight call from send to handle, along
+// with the channels used to report its outcome.
+type fcallRequest struct {
+	ctx      context.Context
+	fcall    *Fcall
+	response chan *Fcall
+	err      chan error
+}
+
+// newFcallRequest prepares a request for fcall. Both result channels have a
+// buffer of one so handle can report the outcome even if the caller has
+// already given up.
+func newFcallRequest(ctx context.Context, fcall *Fcall) *fcallRequest {
+	return &fcallRequest{
+		ctx:      ctx,
+		fcall:    fcall,
+		response: make(chan *Fcall, 1),
+		err:      make(chan error, 1),
+	}
+}
+
+// send submits fcall and blocks until a response arrives, writing the
+// request fails, ctx is done or the transport is closed. The tag of fcall is
+// assigned by the transport.
+func (t *transport) send(ctx context.Context, fcall *Fcall) (*Fcall, error) {
+	req := newFcallRequest(ctx, fcall)
+
+	log.Println("dispatch", fcall)
+	// dispatch the request.
+	select {
+	case <-t.closed:
+		return nil, ErrClosed
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case t.requests <- req:
+	}
+
+	// NOTE(review): handle assigns fcall.Tag concurrently with this log
+	// statement; consider logging from handle only.
+	log.Println("wait", fcall)
+	// wait for the response.
+	select {
+	case <-t.closed:
+		return nil, ErrClosed
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case err := <-req.err:
+		// The request could not be written, so no response will follow.
+		return nil, err
+	case resp := <-req.response:
+		return resp, nil
+	}
+}
+
+// shutdown marks the transport as closed. It is safe to call more than once
+// and from multiple goroutines.
+func (t *transport) shutdown() {
+	t.closeOnce.Do(func() { close(t.closed) })
+}
+
+// handle takes messages off the wire and wakes up the waiting tag call.
+func (t *transport) handle() {
+	// State owned by this goroutine. dec is used only by the reader goroutine
+	// started below; responses is the channel between the two.
+	var (
+		responses = make(chan *Fcall)
+		tags      Tag
+		// outstanding provides a map of tags to outstanding requests.
+		outstanding = map[Tag]*fcallRequest{}
+		brd         = bufio.NewReader(t.conn)
+		bwr         = bufio.NewWriter(t.conn)
+		enc         = &encoder{bwr}
+		dec         = &decoder{brd}
+	)
+
+	// loop to read messages off of the connection
+	go func() {
+		for {
+			// Stop promptly once the transport is shutting down; the read
+			// deadline below normally brings us back here every second.
+			select {
+			case <-t.ctx.Done():
+				return
+			case <-t.closed:
+				return
+			default:
+			}
+
+			const pump = time.Second
+
+			// Continuously set the read deadline to pump this loop. We can
+			// probably set a connection dead threshold that can count these.
+			// Usually, this would only matter when there are actually
+			// outstanding requests.
+			deadline := time.Now().Add(pump)
+			if d, ok := t.ctx.Deadline(); ok && d.Before(deadline) {
+				deadline = d
+			}
+
+			if err := t.conn.SetReadDeadline(deadline); err != nil {
+				log.Printf("error setting read deadline: %v", err)
+			}
+
+			fc := new(Fcall)
+			if err := dec.decode(fc); err != nil {
+				if nerr, ok := err.(net.Error); ok && (nerr.Timeout() || nerr.Temporary()) {
+					// An idle connection is expected; keep reading rather
+					// than abandoning the connection after the first quiet
+					// second.
+					// NOTE(review): a timeout that lands mid-message presumably
+					// leaves the decoder out of step; confirm decode can resume.
+					continue
+				}
+
+				// The connection is unusable. Shut the transport down so
+				// pending and future sends fail with ErrClosed instead of
+				// taking the whole process down.
+				log.Printf("connection read error: %v", err)
+				t.shutdown()
+				return
+			}
+
+			select {
+			case <-t.ctx.Done():
+				return
+			case <-t.closed:
+				return
+			case responses <- fc:
+			}
+		}
+	}()
+
+	for {
+		select {
+		case req := <-t.requests:
+			tags++
+			req.fcall.Tag = tags
+			outstanding[req.fcall.Tag] = req
+
+			pretty9p(os.Stdout, req.fcall)
+			// use deadline to set write deadline for this request.
+			deadline, ok := req.ctx.Deadline()
+			if !ok {
+				deadline = time.Now().Add(time.Second)
+			}
+
+			if err := t.conn.SetWriteDeadline(deadline); err != nil {
+				log.Printf("error setting write deadline: %v", err)
+			}
+
+			// TODO(stevvooe): Consider the case of requests that never
+			// receive a response. We need to remove the fcall context from
+			// the tag map and dealloc the tag. We may also want to send a
+			// flush for the tag.
+
+			log.Println("send", req.fcall)
+			err := enc.encode(req.fcall)
+			if err == nil {
+				err = bwr.Flush()
+			}
+			if err != nil {
+				// Report the failure exactly once: req.err only has room
+				// for a single value and a second send would block this
+				// loop forever.
+				delete(outstanding, req.fcall.Tag)
+				req.err <- err
+
+				// bufio.Writer errors are sticky; reset so one failed
+				// write does not fail every later request.
+				bwr.Reset(t.conn)
+				continue
+			}
+
+			log.Println("sent", req.fcall)
+		case b := <-responses:
+			req, ok := outstanding[b.Tag]
+			if !ok {
+				// A misbehaving peer must not be able to crash the client.
+				log.Printf("unknown tag received: %v", b.Tag)
+				continue
+			}
+			delete(outstanding, b.Tag)
+
+			req.response <- b
+
+			// TODO(stevvooe): Reclaim tag id.
+		case <-t.ctx.Done():
+			return
+		case <-t.closed:
+			return
+		}
+	}
+}
+
+// Close shuts the transport down. It returns ErrClosed if the transport was
+// already closed and the context error if the transport's context is done.
+func (t *transport) Close() error {
+	select {
+	case <-t.closed:
+		return ErrClosed
+	case <-t.ctx.Done():
+		return t.ctx.Err()
+	default:
+	}
+
+	// Only the caller that actually closes the channel reports success;
+	// callers that lose a race see ErrClosed.
+	err := ErrClosed
+	t.closeOnce.Do(func() {
+		close(t.closed)
+		err = nil
+	})
+
+	return err
+}