Commit Diff


commit - fb37ce2aa1f717002944b5aec393b3318e2e7261
commit + a8abc68745cebc0f92ba31a589e2f6f33ce215a9
blob - 9de03f0d26e4cad75685fe0929ef3cc725232ec9
blob + d6e65d033474011bd20818b6180c6152fe523906
--- channel.go
+++ channel.go
@@ -8,10 +8,15 @@ import (
 	"io/ioutil"
 	"log"
 	"net"
+	"time"
 
 	"golang.org/x/net/context"
 )
 
+const (
+	defaultRWTimeout = 1 * time.Second // read/write deadline offset used when ctx has no deadline
+)
+
 // channel provides bidirectional protocol framing for 9p over net.Conn.
 // Operations are not thread-safe but reads and writes may be carried out
 // concurrently, supporting separate read and write loops.
@@ -54,6 +59,32 @@ func (ch *channel) setmsize(msize int) {
 	ch.wrbuf = make([]byte, msize)
 }
 
+// version negotiates the protocol version over the channel: it writes a
+// Tversion carrying msize and version, then blocks reading a single reply.
+// It returns the Rversion's MSize and Version, or an error on any failure.
+func (ch *channel) version(ctx context.Context, msize uint32, version string) (uint32, string, error) {
+	req := newFcall(MessageTversion{
+		MSize:   uint32(msize),
+		Version: version,
+	})
+
+	if err := ch.writeFcall(ctx, req); err != nil {
+		return 0, "", err
+	}
+
+	resp := new(Fcall)
+	if err := ch.readFcall(ctx, resp); err != nil {
+		return 0, "", err
+	}
+
+	mv, ok := resp.Message.(*MessageRversion)
+	if !ok {
+		return 0, "", fmt.Errorf("invalid rpc response for version message: %v", resp)
+	}
+
+	return mv.MSize, mv.Version, nil
+}
+
 // ReadFcall reads the next message from the channel into fcall.
 func (ch *channel) readFcall(ctx context.Context, fcall *Fcall) error {
 	select {
@@ -61,14 +92,16 @@ func (ch *channel) readFcall(ctx context.Context, fcal
 		return ErrClosed
 	default:
 	}
-	log.Println("channel: readFcall", fcall)
 
-	if deadline, ok := ctx.Deadline(); ok {
-		if err := ch.conn.SetReadDeadline(deadline); err != nil {
-			log.Printf("transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err)
-		}
+	deadline, ok := ctx.Deadline()
+	if !ok {
+		deadline = time.Now().Add(defaultRWTimeout)
 	}
 
+	if err := ch.conn.SetReadDeadline(deadline); err != nil {
+		log.Printf("transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err)
+	}
+
 	n, err := readmsg(ch.brd, ch.rdbuf)
 	if err != nil {
 		// TODO(stevvooe): There may be more we can do here to detect partial
@@ -82,6 +115,7 @@ func (ch *channel) readFcall(ctx context.Context, fcal
 		return fmt.Errorf("message large than buffer:", n)
 	}
 
+	log.Println("channel: readFcall", fcall)
 	return ch.codec.Unmarshal(ch.rdbuf[:n], fcall)
 }
 
@@ -93,12 +127,15 @@ func (ch *channel) writeFcall(ctx context.Context, fca
 	}
 	log.Println("channel: writeFcall", fcall)
 
-	if deadline, ok := ctx.Deadline(); ok {
-		if err := ch.conn.SetWriteDeadline(deadline); err != nil {
-			log.Printf("transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err)
-		}
+	deadline, ok := ctx.Deadline()
+	if !ok {
+		deadline = time.Now().Add(defaultRWTimeout)
 	}
 
+	if err := ch.conn.SetWriteDeadline(deadline); err != nil {
+		log.Printf("transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err)
+	}
+
 	n, err := ch.codec.Marshal(ch.wrbuf, fcall)
 	if err != nil {
 		return err
@@ -150,6 +187,7 @@ func readmsg(rd io.Reader, p []byte) (n int, err error
 		}
 	}
 
+	log.Println("msg", n, msize, mbody, len(p), p)
 	return n, nil
 }
 
blob - 30c2205d3c5f55a717ead6a71e823d1445b868fa
blob + 4a1ab7014946f4804f93da8f94cc92ba0d4d27a4
--- client.go
+++ client.go
@@ -26,7 +26,7 @@ func NewSession(ctx context.Context, conn net.Conn) (S
 	ch := newChannel(conn, codec9p{}, msize) // sets msize, effectively.
 
 	// negotiate the protocol version
-	smsize, svers, err := version(ctx, ch, msize, vers)
+	smsize, svers, err := ch.version(ctx, msize, vers)
 	if err != nil {
 		return nil, err
 	}
@@ -219,28 +219,3 @@ func (c *client) flush(ctx context.Context, tag Tag) e
 
 	panic("not implemented")
 }
-
-// version negiotiates the protocol version using channel, blocking until a
-// response is received. This should be called before starting the transport.
-func version(ctx context.Context, ch *channel, msize uint32, version string) (uint32, string, error) {
-	req := newFcall(MessageTversion{
-		MSize:   uint32(msize),
-		Version: version,
-	})
-
-	if err := ch.writeFcall(ctx, req); err != nil {
-		return 0, "", err
-	}
-
-	resp := new(Fcall)
-	if err := ch.readFcall(ctx, resp); err != nil {
-		return 0, "", err
-	}
-
-	mv, ok := resp.Message.(*MessageRversion)
-	if !ok {
-		return 0, "", fmt.Errorf("invalid rpc response for version message: %v", resp)
-	}
-
-	return mv.MSize, mv.Version, nil
-}
blob - 1db7807e7ff9d6b0336ace83a50ede2453e8270a
blob + 9c73f8688214ada8d9cd4d8faf0ac0083b926f72
--- cmd/9pr/main.go
+++ cmd/9pr/main.go
@@ -176,13 +176,18 @@ func (c *fsCommander) cmdls(ctx context.Context, args 
 		}
 		defer c.session.Clunk(ctx, targetfid)
 
-		_, msize, err := c.session.Open(ctx, targetfid, p9pnew.OREAD)
+		qid, iounit, err := c.session.Open(ctx, targetfid, p9pnew.OREAD)
 		if err != nil {
 			return err
 		}
 
-		p := make([]byte, msize)
+		if iounit < 1 {
+			msize, _ := c.session.Version()
+			iounit = msize - 24 // size of message max minus fcall io header (Rread)
+		}
 
+		p := make([]byte, iounit)
+
 		n, err := c.session.Read(ctx, targetfid, p, 0)
 		if err != nil {
 			return err
@@ -269,19 +274,19 @@ func (c *fsCommander) cmdcat(ctx context.Context, args
 	targetfid := c.nextfid
 	c.nextfid++
 	components := strings.Split(strings.TrimSpace(strings.Trim(p, "/")), "/")
-	if _, err := c.session.Walk(c.ctx, c.rootfid, targetfid, components...); err != nil {
+	if _, err := c.session.Walk(ctx, c.rootfid, targetfid, components...); err != nil {
 		return err
 	}
-	defer c.session.Clunk(c.ctx, c.pwdfid)
+	defer c.session.Clunk(ctx, c.pwdfid)
 
-	_, msize, err := c.session.Open(c.ctx, targetfid, p9pnew.OREAD)
+	_, msize, err := c.session.Open(ctx, targetfid, p9pnew.OREAD)
 	if err != nil {
 		return err
 	}
 
 	b := make([]byte, msize)
 
-	n, err := c.session.Read(c.ctx, targetfid, b, 0)
+	n, err := c.session.Read(ctx, targetfid, b, 0)
 	if err != nil {
 		return err
 	}
blob - 10d52740dedc0b060356f11de11880aee4f26b6b
blob + 165a918e4c7c269d32ed20dab90b79e57a3aa8e1
--- encoding.go
+++ encoding.go
@@ -83,7 +83,7 @@ func (e *encoder) encode(vs ...interface{}) error {
 				return err
 			}
 		case *[]byte:
-			if err := e.encode(uint16(len(*v))); err != nil {
+			if err := e.encode(uint32(len(*v))); err != nil {
 				return err
 			}
 
@@ -181,7 +181,7 @@ func (d *decoder) decode(vs ...interface{}) error {
 
 			n, err := io.ReadFull(d.rd, b)
 			if err != nil {
-				log.Println("readfull failed:", err)
+				log.Println("readfull failed:", err, ll, n)
 				return err
 			}
 
@@ -207,7 +207,7 @@ func (d *decoder) decode(vs ...interface{}) error {
 				return err
 			}
 		case *[]byte:
-			var ll uint16
+			var ll uint32
 
 			if err := d.decode(&ll); err != nil {
 				return err
@@ -259,12 +259,37 @@ func (d *decoder) decode(vs ...interface{}) error {
 			}
 
 			*v = time.Unix(int64(epoch), 0).UTC()
-		case Message, *Qid, *Dir:
+		case *Dir:
+			var ll uint16
+
+			if err := d.decode(&ll); err != nil {
+				return err
+			}
+
+			b := make([]byte, ll)
+			// must consume entire dir entry.
+			n, err := io.ReadFull(d.rd, b)
+			if err != nil {
+				log.Println("dir readfull failed:", err, ll, n)
+				return err
+			}
+
 			elements, err := fields9p(v)
 			if err != nil {
 				return err
 			}
 
+			dec := &decoder{bytes.NewReader(b)}
+
+			if err := dec.decode(elements...); err != nil {
+				return err
+			}
+		case Message, *Qid:
+			elements, err := fields9p(v)
+			if err != nil {
+				return err
+			}
+
 			if err := d.decode(elements...); err != nil {
 				return err
 			}
blob - 4b25bc02d50aef020b58e0cb0ca06ea43cedede0
blob + f7b340c21172f8a3c4fa08da574a2301fc5d9910
--- transport.go
+++ transport.go
@@ -74,6 +74,8 @@ func (t *transport) send(ctx context.Context, fcall *F
 		return nil, ErrClosed
 	case <-ctx.Done():
 		return nil, ctx.Err()
+	case err := <-req.err:
+		return nil, err
 	case resp := <-req.response:
 		log.Println("resp", resp)
 		if resp.Type == Rerror {
@@ -92,7 +94,10 @@ func (t *transport) send(ctx context.Context, fcall *F
 
 // handle takes messages off the wire and wakes up the waiting tag call.
 func (t *transport) handle() {
-
+	defer func() {
+		log.Println("exited handle loop")
+		close(t.closed)
+	}()
 	// the following variable block are protected components owned by this thread.
 	var (
 		responses = make(chan *Fcall)
@@ -140,6 +145,7 @@ func (t *transport) handle() {
 	}()
 
 	for {
+		log.Println("wait...")
 		select {
 		case req := <-t.requests:
 
@@ -162,6 +168,7 @@ func (t *transport) handle() {
 			// the tag map and dealloc the tag. We may also want to send a
 			// flush for the tag.
 			if err := t.ch.writeFcall(req.ctx, req.fcall); err != nil {
+				log.Println("error writing fcall", err, req.fcall)
 				delete(outstanding, req.fcall.Tag)
 				req.err <- err
 			}