commit a8abc68745cebc0f92ba31a589e2f6f33ce215a9 from: Stephen J Day date: Fri Oct 30 03:21:00 2015 UTC fs/p9p/new: fully working ls with ufs The repl and client session work fully with rminnich/go9p/ufs. The key to this was proper support for directory serialization, which is subtle. The size field for Rread was the incorrect size. Several robustness improvements are also included. Issues with the timeouts and missing error returns have been addressed. Signed-off-by: Stephen J Day commit - fb37ce2aa1f717002944b5aec393b3318e2e7261 commit + a8abc68745cebc0f92ba31a589e2f6f33ce215a9 blob - 9de03f0d26e4cad75685fe0929ef3cc725232ec9 blob + d6e65d033474011bd20818b6180c6152fe523906 --- channel.go +++ channel.go @@ -8,10 +8,15 @@ import ( "io/ioutil" "log" "net" + "time" "golang.org/x/net/context" ) +const ( + defaultRWTimeout = 1 * time.Second // default read/write timeout if not set in context +) + // channel provides bidirectional protocol framing for 9p over net.Conn. // Operations are not thread-safe but reads and writes may be carried out // concurrently, supporting separate read and write loops. @@ -54,6 +59,32 @@ func (ch *channel) setmsize(msize int) { ch.wrbuf = make([]byte, msize) } +// version negiotiates the protocol version using channel, blocking until a +// response is received. The received values can be used to set msize for the +// channel or assist in client setup. +func (ch *channel) version(ctx context.Context, msize uint32, version string) (uint32, string, error) { + req := newFcall(MessageTversion{ + MSize: uint32(msize), + Version: version, + }) + + if err := ch.writeFcall(ctx, req); err != nil { + return 0, "", err + } + + resp := new(Fcall) + if err := ch.readFcall(ctx, resp); err != nil { + return 0, "", err + } + + mv, ok := resp.Message.(*MessageRversion) + if !ok { + return 0, "", fmt.Errorf("invalid rpc response for version message: %v", resp) + } + + return mv.MSize, mv.Version, nil +} + // ReadFcall reads the next message from the channel into fcall. func (ch *channel) readFcall(ctx context.Context, fcall *Fcall) error { select { @@ -61,14 +92,16 @@ func (ch *channel) readFcall(ctx context.Context, fcal return ErrClosed default: } - log.Println("channel: readFcall", fcall) - if deadline, ok := ctx.Deadline(); ok { - if err := ch.conn.SetReadDeadline(deadline); err != nil { - log.Printf("transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err) - } + deadline, ok := ctx.Deadline() + if !ok { + deadline = time.Now().Add(defaultRWTimeout) } + if err := ch.conn.SetReadDeadline(deadline); err != nil { + log.Printf("transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err) + } + n, err := readmsg(ch.brd, ch.rdbuf) if err != nil { // TODO(stevvooe): There may be more we can do here to detect partial @@ -82,6 +115,7 @@ func (ch *channel) readFcall(ctx context.Context, fcal return fmt.Errorf("message large than buffer:", n) } + log.Println("channel: readFcall", fcall) return ch.codec.Unmarshal(ch.rdbuf[:n], fcall) } @@ -93,12 +127,15 @@ func (ch *channel) writeFcall(ctx context.Context, fca } log.Println("channel: writeFcall", fcall) - if deadline, ok := ctx.Deadline(); ok { - if err := ch.conn.SetWriteDeadline(deadline); err != nil { - log.Printf("transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err) - } + deadline, ok := ctx.Deadline() + if !ok { + deadline = time.Now().Add(defaultRWTimeout) } + if err := ch.conn.SetWriteDeadline(deadline); err != nil { + log.Printf("transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err) + } + n, err := ch.codec.Marshal(ch.wrbuf, fcall) if err != nil { return err @@ -150,6 +187,7 @@ func readmsg(rd io.Reader, p []byte) (n int, err error } } + log.Println("msg", n, msize, mbody, len(p), p) return n, nil } blob - 30c2205d3c5f55a717ead6a71e823d1445b868fa blob + 4a1ab7014946f4804f93da8f94cc92ba0d4d27a4 --- client.go +++ client.go @@ -26,7 +26,7 @@ func NewSession(ctx context.Context, conn net.Conn) (S ch := newChannel(conn, codec9p{}, msize) // sets msize, effectively. // negotiate the protocol version - smsize, svers, err := version(ctx, ch, msize, vers) + smsize, svers, err := ch.version(ctx, msize, vers) if err != nil { return nil, err } @@ -219,28 +219,3 @@ func (c *client) flush(ctx context.Context, tag Tag) e panic("not implemented") } - -// version negiotiates the protocol version using channel, blocking until a -// response is received. This should be called before starting the transport. -func version(ctx context.Context, ch *channel, msize uint32, version string) (uint32, string, error) { - req := newFcall(MessageTversion{ - MSize: uint32(msize), - Version: version, - }) - - if err := ch.writeFcall(ctx, req); err != nil { - return 0, "", err - } - - resp := new(Fcall) - if err := ch.readFcall(ctx, resp); err != nil { - return 0, "", err - } - - mv, ok := resp.Message.(*MessageRversion) - if !ok { - return 0, "", fmt.Errorf("invalid rpc response for version message: %v", resp) - } - - return mv.MSize, mv.Version, nil -} blob - 1db7807e7ff9d6b0336ace83a50ede2453e8270a blob + 9c73f8688214ada8d9cd4d8faf0ac0083b926f72 --- cmd/9pr/main.go +++ cmd/9pr/main.go @@ -176,13 +176,18 @@ func (c *fsCommander) cmdls(ctx context.Context, args } defer c.session.Clunk(ctx, targetfid) - _, msize, err := c.session.Open(ctx, targetfid, p9pnew.OREAD) + qid, iounit, err := c.session.Open(ctx, targetfid, p9pnew.OREAD) if err != nil { return err } - p := make([]byte, msize) + if iounit < 1 { + msize, _ := c.session.Version() + iounit = msize - 24 // size of message max minus fcall io header (Rread) + } + p := make([]byte, iounit) + n, err := c.session.Read(ctx, targetfid, p, 0) if err != nil { return err @@ -269,19 +274,19 @@ func (c *fsCommander) cmdcat(ctx context.Context, args targetfid := c.nextfid c.nextfid++ components := strings.Split(strings.TrimSpace(strings.Trim(p, "/")), "/") - if _, err := c.session.Walk(c.ctx, c.rootfid, targetfid, components...); err != nil { + if _, err := c.session.Walk(ctx, c.rootfid, targetfid, components...); err != nil { return err } - defer c.session.Clunk(c.ctx, c.pwdfid) + defer c.session.Clunk(ctx, c.pwdfid) - _, msize, err := c.session.Open(c.ctx, targetfid, p9pnew.OREAD) + _, msize, err := c.session.Open(ctx, targetfid, p9pnew.OREAD) if err != nil { return err } b := make([]byte, msize) - n, err := c.session.Read(c.ctx, targetfid, b, 0) + n, err := c.session.Read(ctx, targetfid, b, 0) if err != nil { return err } blob - 10d52740dedc0b060356f11de11880aee4f26b6b blob + 165a918e4c7c269d32ed20dab90b79e57a3aa8e1 --- encoding.go +++ encoding.go @@ -83,7 +83,7 @@ func (e *encoder) encode(vs ...interface{}) error { return err } case *[]byte: - if err := e.encode(uint16(len(*v))); err != nil { + if err := e.encode(uint32(len(*v))); err != nil { return err } @@ -181,7 +181,7 @@ func (d *decoder) decode(vs ...interface{}) error { n, err := io.ReadFull(d.rd, b) if err != nil { - log.Println("readfull failed:", err) + log.Println("readfull failed:", err, ll, n) return err } @@ -207,7 +207,7 @@ func (d *decoder) decode(vs ...interface{}) error { return err } case *[]byte: - var ll uint16 + var ll uint32 if err := d.decode(&ll); err != nil { return err @@ -259,12 +259,37 @@ func (d *decoder) decode(vs ...interface{}) error { } *v = time.Unix(int64(epoch), 0).UTC() - case Message, *Qid, *Dir: + case *Dir: + var ll uint16 + + if err := d.decode(&ll); err != nil { + return err + } + + b := make([]byte, ll) + // must consume entire dir entry. + n, err := io.ReadFull(d.rd, b) + if err != nil { + log.Println("dir readfull failed:", err, ll, n) + return err + } + elements, err := fields9p(v) if err != nil { return err } + dec := &decoder{bytes.NewReader(b)} + + if err := dec.decode(elements...); err != nil { + return err + } + case Message, *Qid: + elements, err := fields9p(v) + if err != nil { + return err + } + if err := d.decode(elements...); err != nil { return err } blob - 4b25bc02d50aef020b58e0cb0ca06ea43cedede0 blob + f7b340c21172f8a3c4fa08da574a2301fc5d9910 --- transport.go +++ transport.go @@ -74,6 +74,8 @@ func (t *transport) send(ctx context.Context, fcall *F return nil, ErrClosed case <-ctx.Done(): return nil, ctx.Err() + case err := <-req.err: + return nil, err case resp := <-req.response: log.Println("resp", resp) if resp.Type == Rerror { @@ -92,7 +94,10 @@ func (t *transport) send(ctx context.Context, fcall *F // handle takes messages off the wire and wakes up the waiting tag call. func (t *transport) handle() { - + defer func() { + log.Println("exited handle loop") + close(t.closed) + }() // the following variable block are protected components owned by this thread. var ( responses = make(chan *Fcall) @@ -140,6 +145,7 @@ func (t *transport) handle() { }() for { + log.Println("wait...") select { case req := <-t.requests: @@ -162,6 +168,7 @@ func (t *transport) handle() { // the tag map and dealloc the tag. We may also want to send a // flush for the tag. if err := t.ch.writeFcall(req.ctx, req.fcall); err != nil { + log.Println("error writing fcall", err, req.fcall) delete(outstanding, req.fcall.Tag) req.err <- err }