commit - d6198009fad10d2c6e0819feb0fa20608628dbcf
commit + 5f1e81058a16f2139f47022131a02ea597e003cc
blob - 5d526a8dcd50d0bf26806e5a1c3e3521a209b8b2
blob + acb4b04fc8d42393ecdd5d8c7444c95354fb2144
--- client.go
+++ client.go
)
type client struct {
+ ctx context.Context
conn net.Conn
tags *tagPool
requests chan fcallRequest
closed chan struct{}
}
-// NewSession returns a session using the connection.
-func NewSession(conn net.Conn) (Session, error) {
+// NewSession returns a session using the connection. The Context ctx provides
+// a context for out of bad messages, such as flushes, that may be sent by the
+// session. The session can effectively shutdown with this context.
+func NewSession(ctx context.Context, conn net.Conn) (Session, error) {
return &client{
+ ctx: ctx,
conn: conn,
}, nil
}
}
func (c *client) Attach(ctx context.Context, fid, afid Fid, uname, aname string) (Qid, error) {
- panic("not implemented")
+ fcall := &Fcall{
+ Type: Tattach,
+ Message: &MessageTattach{
+ Fid: fid,
+ Afid: afid,
+ Uname: uname,
+ Aname: aname,
+ },
+ }
+
+ resp, err := c.send(ctx, fcall)
+ if err != nil {
+ return Qid{}, err
+ }
+
+ mrr, ok := resp.Message.(*MessageRattach)
+ if !ok {
+ return Qid{}, fmt.Errorf("invalid rpc response for attach message: %v", resp)
+ }
+
+ return mrr.Qid, nil
}
func (c *client) Clunk(ctx context.Context, fid Fid) error {
}
func (c *client) Walk(ctx context.Context, fid Fid, newfid Fid, names ...string) ([]Qid, error) {
- panic("not implemented")
+ if len(names) > 16 {
+ // TODO(stevvooe): Implement multi-message handling for more than 16
+ // wnames. May want to actually force caller to implement this since
+ // we'll need a new fid for each RPC.
+ panic("more than 16 components not implemented")
+ }
+
+ fcall := &Fcall{
+ Type: Twalk,
+ Message: &MessageTwalk{
+ Fid: fid,
+ Newfid: newfid,
+ Wname: names,
+ },
+ }
+
+ resp, err := c.send(ctx, fcall)
+ if err != nil {
+ return nil, err
+ }
+
+ mrr, ok := resp.Message.(*MessageRwalk)
+ if !ok {
+ return nil, fmt.Errorf("invalid rpc response for walk message: %v", resp)
+ }
+
+ return mrr.Qids, nil
}
func (c *client) Read(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error) {
- panic("not implemented")
+ // TODO(stevvooe): Split up reads into multiple messages based on iounit.
+ // For now, we just support full blast. I mean, why not?
+ fcall := &Fcall{
+ Type: Tread,
+ Message: &MessageTread{
+ Fid: fid,
+ Offset: uint64(offset),
+ Count: uint32(len(p)),
+ },
+ }
+
+ resp, err := c.send(ctx, fcall)
+ if err != nil {
+ return 0, err
+ }
+
+ mrr, ok := resp.Message.(*MessageRread)
+ if !ok {
+ return 0, fmt.Errorf("invalid rpc response for read message: %v", resp)
+ }
+
+ return copy(p, mrr.Data), nil
}
func (c *client) Write(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error) {
- panic("not implemented")
+ // TODO(stevvooe): Split up writes into multiple messages based on iounit.
+ // For now, we just support full blast. I mean, why not?
+ fcall := &Fcall{
+ Type: Twrite,
+ Message: &MessageTwrite{
+ Fid: fid,
+ Offset: uint64(offset),
+ Data: p,
+ },
+ }
+
+ resp, err := c.send(ctx, fcall)
+ if err != nil {
+ return 0, err
+ }
+
+ mrr, ok := resp.Message.(MessageRwrite)
+ if !ok {
+ return 0, fmt.Errorf("invalid rpc response for read message: %v", resp)
+ }
+
+ return int(mrr.Count), nil
}
func (c *client) Open(ctx context.Context, fid Fid, mode int32) (Qid, error) {
return 0, "", fmt.Errorf("invalid rpc response for version message: %v", resp)
}
+ // TODO(stevvooe): Use this response to set iounit and version on this
+ // client instance.
+
return mv.MSize, mv.Version, nil
}
loop:
for {
+ const pump = time.Second
+
// Continuously set the read dead line pump the loop below. We can
// probably set a connection dead threshold that can count these.
// Usually, this would only matter when there are actually
// outstanding requests.
- if err := c.conn.SetReadDeadline(time.Now().Add(time.Second)); err != nil {
+ deadline, ok := c.ctx.Deadline()
+ if !ok {
+ deadline = time.Now().Add(pump)
+ } else {
+ // if the deadline is before
+ nd := time.Now().Add(pump)
+ if nd.Before(deadline) {
+ deadline = nd
+ }
+ }
+
+ if err := c.conn.SetReadDeadline(deadline); err != nil {
panic(fmt.Sprintf("error setting read deadline: %v", err))
}
}
select {
+ case <-c.ctx.Done():
+ return
case <-c.closed:
return
case responses <- fc:
for {
select {
+ case <-c.ctx.Done():
+ return
case <-c.closed:
return
case req := <-c.requests:
blob - a984f15e43f90d71c7c82e281df8cf2cbeb640ee
blob + c33ca277646297a1dd3badf83aca03b187eb9c7a
--- encoding.go
+++ encoding.go
"encoding/binary"
"fmt"
"io"
- "log"
"reflect"
"time"
)
// read9p extracts values from rd and unmarshals them to the targets of vs.
func (d *decoder) decode(vs ...interface{}) error {
for _, v := range vs {
- before := fmt.Sprintf("%#v", v)
switch v := v.(type) {
case *string:
var ll uint16
return err
}
}
- log.Printf("Decode: %v -> %#v", before, v)
}
return nil
blob - 6dfb0d4b0a2d2b11bb9074dafa15a299b06f0777
blob + 9933b9852590d921bfbd30d5d5611301b45cbabd
--- fcall.go
+++ fcall.go
case Rcreate:
case Tread:
-
+ return &MessageTread{}, nil
case Rread:
return &MessageRread{}, nil
case Twrite:
-
+ return &MessageTwrite{}, nil
case Rwrite:
-
+ return &MessageRwrite{}, nil
case Tclunk:
case Rclunk:
Aname string
}
+func (MessageTattach) message9p() {}
+
type MessageRattach struct {
Qid Qid
}
+func (MessageRattach) message9p() {}
+
type MessageTwalk struct {
Fid Fid
Newfid Fid
blob - 032f07f44d7148b2ac5d6755d3afae997b5606f5
blob + 9eac603c2d0f52f707d8bf18e975a11f2c712207
--- session.go
+++ session.go
Version(ctx context.Context, msize uint32, version string) (uint32, string, error)
}
-func Dial(addr string) (Session, error) {
+func Dial(ctx context.Context, addr string) (Session, error) {
c, err := net.Dial("tcp", addr)
if err != nil {
return nil, err
// BUG(stevvooe): Session doesn't actually close connection. Dial might
// not be the right interface.
- return NewSession(c)
+ return NewSession(ctx, c)
}