Commit Diff


commit - d6198009fad10d2c6e0819feb0fa20608628dbcf
commit + 5f1e81058a16f2139f47022131a02ea597e003cc
blob - 5d526a8dcd50d0bf26806e5a1c3e3521a209b8b2
blob + acb4b04fc8d42393ecdd5d8c7444c95354fb2144
--- client.go
+++ client.go
@@ -12,15 +12,19 @@ import (
 )
 
 type client struct {
+	ctx      context.Context
 	conn     net.Conn
 	tags     *tagPool
 	requests chan fcallRequest
 	closed   chan struct{}
 }
 
-// NewSession returns a session using the connection.
-func NewSession(conn net.Conn) (Session, error) {
+// NewSession returns a session using the connection. The Context ctx provides
+// a context for out of bad messages, such as flushes, that may be sent by the
+// session. The session can effectively shutdown with this context.
+func NewSession(ctx context.Context, conn net.Conn) (Session, error) {
 	return &client{
+		ctx:  ctx,
 		conn: conn,
 	}, nil
 }
@@ -32,7 +36,27 @@ func (c *client) Auth(ctx context.Context, afid Fid, u
 }
 
 func (c *client) Attach(ctx context.Context, fid, afid Fid, uname, aname string) (Qid, error) {
-	panic("not implemented")
+	fcall := &Fcall{
+		Type: Tattach,
+		Message: &MessageTattach{
+			Fid:   fid,
+			Afid:  afid,
+			Uname: uname,
+			Aname: aname,
+		},
+	}
+
+	resp, err := c.send(ctx, fcall)
+	if err != nil {
+		return Qid{}, err
+	}
+
+	mrr, ok := resp.Message.(*MessageRattach)
+	if !ok {
+		return Qid{}, fmt.Errorf("invalid rpc response for attach message: %v", resp)
+	}
+
+	return mrr.Qid, nil
 }
 
 func (c *client) Clunk(ctx context.Context, fid Fid) error {
@@ -44,15 +68,83 @@ func (c *client) Remove(ctx context.Context, fid Fid) 
 }
 
 func (c *client) Walk(ctx context.Context, fid Fid, newfid Fid, names ...string) ([]Qid, error) {
-	panic("not implemented")
+	if len(names) > 16 {
+		// TODO(stevvooe): Implement multi-message handling for more than 16
+		// wnames. May want to actually force caller to implement this since
+		// we'll need a new fid for each RPC.
+		panic("more than 16 components not implemented")
+	}
+
+	fcall := &Fcall{
+		Type: Twalk,
+		Message: &MessageTwalk{
+			Fid:    fid,
+			Newfid: newfid,
+			Wname:  names,
+		},
+	}
+
+	resp, err := c.send(ctx, fcall)
+	if err != nil {
+		return nil, err
+	}
+
+	mrr, ok := resp.Message.(*MessageRwalk)
+	if !ok {
+		return nil, fmt.Errorf("invalid rpc response for walk message: %v", resp)
+	}
+
+	return mrr.Qids, nil
 }
 
 func (c *client) Read(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error) {
-	panic("not implemented")
+	// TODO(stevvooe): Split up reads into multiple messages based on iounit.
+	// For now, we just support full blast. I mean, why not?
+	fcall := &Fcall{
+		Type: Tread,
+		Message: &MessageTread{
+			Fid:    fid,
+			Offset: uint64(offset),
+			Count:  uint32(len(p)),
+		},
+	}
+
+	resp, err := c.send(ctx, fcall)
+	if err != nil {
+		return 0, err
+	}
+
+	mrr, ok := resp.Message.(*MessageRread)
+	if !ok {
+		return 0, fmt.Errorf("invalid rpc response for read message: %v", resp)
+	}
+
+	return copy(p, mrr.Data), nil
 }
 
 func (c *client) Write(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error) {
-	panic("not implemented")
+	// TODO(stevvooe): Split up writes into multiple messages based on iounit.
+	// For now, we just support full blast. I mean, why not?
+	fcall := &Fcall{
+		Type: Twrite,
+		Message: &MessageTwrite{
+			Fid:    fid,
+			Offset: uint64(offset),
+			Data:   p,
+		},
+	}
+
+	resp, err := c.send(ctx, fcall)
+	if err != nil {
+		return 0, err
+	}
+
+	mrr, ok := resp.Message.(MessageRwrite)
+	if !ok {
+		return 0, fmt.Errorf("invalid rpc response for read message: %v", resp)
+	}
+
+	return int(mrr.Count), nil
 }
 
 func (c *client) Open(ctx context.Context, fid Fid, mode int32) (Qid, error) {
@@ -90,6 +182,9 @@ func (c *client) Version(ctx context.Context, msize ui
 		return 0, "", fmt.Errorf("invalid rpc response for version message: %v", resp)
 	}
 
+	// TODO(stevvooe): Use this response to set iounit and version on this
+	// client instance.
+
 	return mv.MSize, mv.Version, nil
 }
 
@@ -157,11 +252,24 @@ func (c *client) handle() {
 
 	loop:
 		for {
+			const pump = time.Second
+
 			// Continuously set the read dead line pump the loop below. We can
 			// probably set a connection dead threshold that can count these.
 			// Usually, this would only matter when there are actually
 			// outstanding requests.
-			if err := c.conn.SetReadDeadline(time.Now().Add(time.Second)); err != nil {
+			deadline, ok := c.ctx.Deadline()
+			if !ok {
+				deadline = time.Now().Add(pump)
+			} else {
+				// if the deadline is before
+				nd := time.Now().Add(pump)
+				if nd.Before(deadline) {
+					deadline = nd
+				}
+			}
+
+			if err := c.conn.SetReadDeadline(deadline); err != nil {
 				panic(fmt.Sprintf("error setting read deadline: %v", err))
 			}
 
@@ -178,6 +286,8 @@ func (c *client) handle() {
 			}
 
 			select {
+			case <-c.ctx.Done():
+				return
 			case <-c.closed:
 				return
 			case responses <- fc:
@@ -190,6 +300,8 @@ func (c *client) handle() {
 
 	for {
 		select {
+		case <-c.ctx.Done():
+			return
 		case <-c.closed:
 			return
 		case req := <-c.requests:
blob - a984f15e43f90d71c7c82e281df8cf2cbeb640ee
blob + c33ca277646297a1dd3badf83aca03b187eb9c7a
--- encoding.go
+++ encoding.go
@@ -4,7 +4,6 @@ import (
 	"encoding/binary"
 	"fmt"
 	"io"
-	"log"
 	"reflect"
 	"time"
 )
@@ -117,7 +116,6 @@ type decoder struct {
 // read9p extracts values from rd and unmarshals them to the targets of vs.
 func (d *decoder) decode(vs ...interface{}) error {
 	for _, v := range vs {
-		before := fmt.Sprintf("%#v", v)
 		switch v := v.(type) {
 		case *string:
 			var ll uint16
@@ -219,7 +217,6 @@ func (d *decoder) decode(vs ...interface{}) error {
 				return err
 			}
 		}
-		log.Printf("Decode: %v -> %#v", before, v)
 	}
 
 	return nil
blob - 6dfb0d4b0a2d2b11bb9074dafa15a299b06f0777
blob + 9933b9852590d921bfbd30d5d5611301b45cbabd
--- fcall.go
+++ fcall.go
@@ -158,13 +158,13 @@ func newMessage(typ FcallType) (Message, error) {
 	case Rcreate:
 
 	case Tread:
-
+		return &MessageTread{}, nil
 	case Rread:
 		return &MessageRread{}, nil
 	case Twrite:
-
+		return &MessageTwrite{}, nil
 	case Rwrite:
-
+		return &MessageRwrite{}, nil
 	case Tclunk:
 
 	case Rclunk:
@@ -229,10 +229,14 @@ type MessageTattach struct {
 	Aname string
 }
 
+func (MessageTattach) message9p() {}
+
 type MessageRattach struct {
 	Qid Qid
 }
 
+func (MessageRattach) message9p() {}
+
 type MessageTwalk struct {
 	Fid    Fid
 	Newfid Fid
blob - 032f07f44d7148b2ac5d6755d3afae997b5606f5
blob + 9eac603c2d0f52f707d8bf18e975a11f2c712207
--- session.go
+++ session.go
@@ -36,7 +36,7 @@ type Session interface {
 	Version(ctx context.Context, msize uint32, version string) (uint32, string, error)
 }
 
-func Dial(addr string) (Session, error) {
+func Dial(ctx context.Context, addr string) (Session, error) {
 	c, err := net.Dial("tcp", addr)
 	if err != nil {
 		return nil, err
@@ -45,5 +45,5 @@ func Dial(addr string) (Session, error) {
 	// BUG(stevvooe): Session doesn't actually close connection. Dial might
 	// not be the right interface.
 
-	return NewSession(c)
+	return NewSession(ctx, c)
 }