commit 5f1e81058a16f2139f47022131a02ea597e003cc from: Stephen J Day date: Wed Oct 28 01:35:47 2015 UTC fs/p9p/new: implement several client methods Adds in stub implementations for several client methods, checking to see we have correct compilation of the various message types. We also add context support to control the lifecycle of the session object. We are getting closer to a working client and it is time to make a server! Signed-off-by: Stephen J Day commit - d6198009fad10d2c6e0819feb0fa20608628dbcf commit + 5f1e81058a16f2139f47022131a02ea597e003cc blob - 5d526a8dcd50d0bf26806e5a1c3e3521a209b8b2 blob + acb4b04fc8d42393ecdd5d8c7444c95354fb2144 --- client.go +++ client.go @@ -12,15 +12,19 @@ import ( ) type client struct { + ctx context.Context conn net.Conn tags *tagPool requests chan fcallRequest closed chan struct{} } -// NewSession returns a session using the connection. -func NewSession(conn net.Conn) (Session, error) { +// NewSession returns a session using the connection. The Context ctx provides +// a context for out of bad messages, such as flushes, that may be sent by the +// session. The session can effectively shutdown with this context. +func NewSession(ctx context.Context, conn net.Conn) (Session, error) { return &client{ + ctx: ctx, conn: conn, }, nil } @@ -32,7 +36,27 @@ func (c *client) Auth(ctx context.Context, afid Fid, u } func (c *client) Attach(ctx context.Context, fid, afid Fid, uname, aname string) (Qid, error) { - panic("not implemented") + fcall := &Fcall{ + Type: Tattach, + Message: &MessageTattach{ + Fid: fid, + Afid: afid, + Uname: uname, + Aname: aname, + }, + } + + resp, err := c.send(ctx, fcall) + if err != nil { + return Qid{}, err + } + + mrr, ok := resp.Message.(*MessageRattach) + if !ok { + return Qid{}, fmt.Errorf("invalid rpc response for attach message: %v", resp) + } + + return mrr.Qid, nil } func (c *client) Clunk(ctx context.Context, fid Fid) error { @@ -44,15 +68,83 @@ func (c *client) Remove(ctx context.Context, fid Fid) } func (c *client) Walk(ctx context.Context, fid Fid, newfid Fid, names ...string) ([]Qid, error) { - panic("not implemented") + if len(names) > 16 { + // TODO(stevvooe): Implement multi-message handling for more than 16 + // wnames. May want to actually force caller to implement this since + // we'll need a new fid for each RPC. + panic("more than 16 components not implemented") + } + + fcall := &Fcall{ + Type: Twalk, + Message: &MessageTwalk{ + Fid: fid, + Newfid: newfid, + Wname: names, + }, + } + + resp, err := c.send(ctx, fcall) + if err != nil { + return nil, err + } + + mrr, ok := resp.Message.(*MessageRwalk) + if !ok { + return nil, fmt.Errorf("invalid rpc response for walk message: %v", resp) + } + + return mrr.Qids, nil } func (c *client) Read(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error) { - panic("not implemented") + // TODO(stevvooe): Split up reads into multiple messages based on iounit. + // For now, we just support full blast. I mean, why not? + fcall := &Fcall{ + Type: Tread, + Message: &MessageTread{ + Fid: fid, + Offset: uint64(offset), + Count: uint32(len(p)), + }, + } + + resp, err := c.send(ctx, fcall) + if err != nil { + return 0, err + } + + mrr, ok := resp.Message.(*MessageRread) + if !ok { + return 0, fmt.Errorf("invalid rpc response for read message: %v", resp) + } + + return copy(p, mrr.Data), nil } func (c *client) Write(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error) { - panic("not implemented") + // TODO(stevvooe): Split up writes into multiple messages based on iounit. + // For now, we just support full blast. I mean, why not? + fcall := &Fcall{ + Type: Twrite, + Message: &MessageTwrite{ + Fid: fid, + Offset: uint64(offset), + Data: p, + }, + } + + resp, err := c.send(ctx, fcall) + if err != nil { + return 0, err + } + + mrr, ok := resp.Message.(MessageRwrite) + if !ok { + return 0, fmt.Errorf("invalid rpc response for read message: %v", resp) + } + + return int(mrr.Count), nil } func (c *client) Open(ctx context.Context, fid Fid, mode int32) (Qid, error) { @@ -90,6 +182,9 @@ func (c *client) Version(ctx context.Context, msize ui return 0, "", fmt.Errorf("invalid rpc response for version message: %v", resp) } + // TODO(stevvooe): Use this response to set iounit and version on this + // client instance. + return mv.MSize, mv.Version, nil } @@ -157,11 +252,24 @@ func (c *client) handle() { loop: for { + const pump = time.Second + // Continuously set the read dead line pump the loop below. We can // probably set a connection dead threshold that can count these. // Usually, this would only matter when there are actually // outstanding requests. - if err := c.conn.SetReadDeadline(time.Now().Add(time.Second)); err != nil { + deadline, ok := c.ctx.Deadline() + if !ok { + deadline = time.Now().Add(pump) + } else { + // if the deadline is before + nd := time.Now().Add(pump) + if nd.Before(deadline) { + deadline = nd + } + } + + if err := c.conn.SetReadDeadline(deadline); err != nil { panic(fmt.Sprintf("error setting read deadline: %v", err)) } @@ -178,6 +286,8 @@ func (c *client) handle() { } select { + case <-c.ctx.Done(): + return case <-c.closed: return case responses <- fc: @@ -190,6 +300,8 @@ func (c *client) handle() { for { select { + case <-c.ctx.Done(): + return case <-c.closed: return case req := <-c.requests: blob - a984f15e43f90d71c7c82e281df8cf2cbeb640ee blob + c33ca277646297a1dd3badf83aca03b187eb9c7a --- encoding.go +++ encoding.go @@ -4,7 +4,6 @@ import ( "encoding/binary" "fmt" "io" - "log" "reflect" "time" ) @@ -117,7 +116,6 @@ type decoder struct { // read9p extracts values from rd and unmarshals them to the targets of vs. func (d *decoder) decode(vs ...interface{}) error { for _, v := range vs { - before := fmt.Sprintf("%#v", v) switch v := v.(type) { case *string: var ll uint16 @@ -219,7 +217,6 @@ func (d *decoder) decode(vs ...interface{}) error { return err } } - log.Printf("Decode: %v -> %#v", before, v) } return nil blob - 6dfb0d4b0a2d2b11bb9074dafa15a299b06f0777 blob + 9933b9852590d921bfbd30d5d5611301b45cbabd --- fcall.go +++ fcall.go @@ -158,13 +158,13 @@ func newMessage(typ FcallType) (Message, error) { case Rcreate: case Tread: - + return &MessageTread{}, nil case Rread: return &MessageRread{}, nil case Twrite: - + return &MessageTwrite{}, nil case Rwrite: - + return &MessageRwrite{}, nil case Tclunk: case Rclunk: @@ -229,10 +229,14 @@ type MessageTattach struct { Aname string } +func (MessageTattach) message9p() {} + type MessageRattach struct { Qid Qid } +func (MessageRattach) message9p() {} + type MessageTwalk struct { Fid Fid Newfid Fid blob - 032f07f44d7148b2ac5d6755d3afae997b5606f5 blob + 9eac603c2d0f52f707d8bf18e975a11f2c712207 --- session.go +++ session.go @@ -36,7 +36,7 @@ type Session interface { Version(ctx context.Context, msize uint32, version string) (uint32, string, error) } -func Dial(addr string) (Session, error) { +func Dial(ctx context.Context, addr string) (Session, error) { c, err := net.Dial("tcp", addr) if err != nil { return nil, err @@ -45,5 +45,5 @@ func Dial(addr string) (Session, error) { // BUG(stevvooe): Session doesn't actually close connection. Dial might // not be the right interface. - return NewSession(c) + return NewSession(ctx, c) }