commit - fb37ce2aa1f717002944b5aec393b3318e2e7261
commit + a8abc68745cebc0f92ba31a589e2f6f33ce215a9
blob - 9de03f0d26e4cad75685fe0929ef3cc725232ec9
blob + d6e65d033474011bd20818b6180c6152fe523906
--- channel.go
+++ channel.go
"io/ioutil"
"log"
"net"
+ "time"
"golang.org/x/net/context"
)
+const (
+ defaultRWTimeout = 1 * time.Second // default read/write timeout if not set in context
+)
+
// channel provides bidirectional protocol framing for 9p over net.Conn.
// Operations are not thread-safe but reads and writes may be carried out
// concurrently, supporting separate read and write loops.
ch.wrbuf = make([]byte, msize)
}
+// version negotiates the protocol version over the channel, blocking until a
+// response is received. The returned msize and version can be used to set
+// msize for the channel or to assist in client setup.
+func (ch *channel) version(ctx context.Context, msize uint32, version string) (uint32, string, error) {
+ req := newFcall(MessageTversion{
+ MSize: uint32(msize),
+ Version: version,
+ })
+
+ if err := ch.writeFcall(ctx, req); err != nil {
+ return 0, "", err
+ }
+
+ resp := new(Fcall)
+ if err := ch.readFcall(ctx, resp); err != nil {
+ return 0, "", err
+ }
+
+ mv, ok := resp.Message.(*MessageRversion)
+ if !ok {
+ return 0, "", fmt.Errorf("invalid rpc response for version message: %v", resp)
+ }
+
+ return mv.MSize, mv.Version, nil
+}
+
// ReadFcall reads the next message from the channel into fcall.
func (ch *channel) readFcall(ctx context.Context, fcall *Fcall) error {
select {
return ErrClosed
default:
}
- log.Println("channel: readFcall", fcall)
- if deadline, ok := ctx.Deadline(); ok {
- if err := ch.conn.SetReadDeadline(deadline); err != nil {
- log.Printf("transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err)
- }
+ deadline, ok := ctx.Deadline()
+ if !ok {
+ deadline = time.Now().Add(defaultRWTimeout)
}
+ if err := ch.conn.SetReadDeadline(deadline); err != nil {
+ log.Printf("transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err)
+ }
+
n, err := readmsg(ch.brd, ch.rdbuf)
if err != nil {
// TODO(stevvooe): There may be more we can do here to detect partial
return fmt.Errorf("message large than buffer:", n)
}
+ log.Println("channel: readFcall", fcall)
return ch.codec.Unmarshal(ch.rdbuf[:n], fcall)
}
}
log.Println("channel: writeFcall", fcall)
- if deadline, ok := ctx.Deadline(); ok {
- if err := ch.conn.SetWriteDeadline(deadline); err != nil {
- log.Printf("transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err)
- }
+ deadline, ok := ctx.Deadline()
+ if !ok {
+ deadline = time.Now().Add(defaultRWTimeout)
}
+ if err := ch.conn.SetWriteDeadline(deadline); err != nil {
+ log.Printf("transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err)
+ }
+
n, err := ch.codec.Marshal(ch.wrbuf, fcall)
if err != nil {
return err
}
}
+ log.Println("msg", n, msize, mbody, len(p), p)
return n, nil
}
blob - 30c2205d3c5f55a717ead6a71e823d1445b868fa
blob + 4a1ab7014946f4804f93da8f94cc92ba0d4d27a4
--- client.go
+++ client.go
ch := newChannel(conn, codec9p{}, msize) // sets msize, effectively.
// negotiate the protocol version
- smsize, svers, err := version(ctx, ch, msize, vers)
+ smsize, svers, err := ch.version(ctx, msize, vers)
if err != nil {
return nil, err
}
panic("not implemented")
}
-
-// version negiotiates the protocol version using channel, blocking until a
-// response is received. This should be called before starting the transport.
-func version(ctx context.Context, ch *channel, msize uint32, version string) (uint32, string, error) {
- req := newFcall(MessageTversion{
- MSize: uint32(msize),
- Version: version,
- })
-
- if err := ch.writeFcall(ctx, req); err != nil {
- return 0, "", err
- }
-
- resp := new(Fcall)
- if err := ch.readFcall(ctx, resp); err != nil {
- return 0, "", err
- }
-
- mv, ok := resp.Message.(*MessageRversion)
- if !ok {
- return 0, "", fmt.Errorf("invalid rpc response for version message: %v", resp)
- }
-
- return mv.MSize, mv.Version, nil
-}
blob - 1db7807e7ff9d6b0336ace83a50ede2453e8270a
blob + 9c73f8688214ada8d9cd4d8faf0ac0083b926f72
--- cmd/9pr/main.go
+++ cmd/9pr/main.go
}
defer c.session.Clunk(ctx, targetfid)
- _, msize, err := c.session.Open(ctx, targetfid, p9pnew.OREAD)
+ qid, iounit, err := c.session.Open(ctx, targetfid, p9pnew.OREAD)
if err != nil {
return err
}
- p := make([]byte, msize)
+ if iounit < 1 {
+ msize, _ := c.session.Version()
+ iounit = msize - 24 // size of message max minus fcall io header (Rread)
+ }
+ p := make([]byte, iounit)
+
n, err := c.session.Read(ctx, targetfid, p, 0)
if err != nil {
return err
targetfid := c.nextfid
c.nextfid++
components := strings.Split(strings.TrimSpace(strings.Trim(p, "/")), "/")
- if _, err := c.session.Walk(c.ctx, c.rootfid, targetfid, components...); err != nil {
+ if _, err := c.session.Walk(ctx, c.rootfid, targetfid, components...); err != nil {
return err
}
- defer c.session.Clunk(c.ctx, c.pwdfid)
+ defer c.session.Clunk(ctx, c.pwdfid)
- _, msize, err := c.session.Open(c.ctx, targetfid, p9pnew.OREAD)
+ _, msize, err := c.session.Open(ctx, targetfid, p9pnew.OREAD)
if err != nil {
return err
}
b := make([]byte, msize)
- n, err := c.session.Read(c.ctx, targetfid, b, 0)
+ n, err := c.session.Read(ctx, targetfid, b, 0)
if err != nil {
return err
}
blob - 10d52740dedc0b060356f11de11880aee4f26b6b
blob + 165a918e4c7c269d32ed20dab90b79e57a3aa8e1
--- encoding.go
+++ encoding.go
return err
}
case *[]byte:
- if err := e.encode(uint16(len(*v))); err != nil {
+ if err := e.encode(uint32(len(*v))); err != nil {
return err
}
n, err := io.ReadFull(d.rd, b)
if err != nil {
- log.Println("readfull failed:", err)
+ log.Println("readfull failed:", err, ll, n)
return err
}
return err
}
case *[]byte:
- var ll uint16
+ var ll uint32
if err := d.decode(&ll); err != nil {
return err
}
*v = time.Unix(int64(epoch), 0).UTC()
- case Message, *Qid, *Dir:
+ case *Dir:
+ var ll uint16
+
+ if err := d.decode(&ll); err != nil {
+ return err
+ }
+
+ b := make([]byte, ll)
+ // must consume entire dir entry.
+ n, err := io.ReadFull(d.rd, b)
+ if err != nil {
+ log.Println("dir readfull failed:", err, ll, n)
+ return err
+ }
+
elements, err := fields9p(v)
if err != nil {
return err
}
+ dec := &decoder{bytes.NewReader(b)}
+
+ if err := dec.decode(elements...); err != nil {
+ return err
+ }
+ case Message, *Qid:
+ elements, err := fields9p(v)
+ if err != nil {
+ return err
+ }
+
if err := d.decode(elements...); err != nil {
return err
}
blob - 4b25bc02d50aef020b58e0cb0ca06ea43cedede0
blob + f7b340c21172f8a3c4fa08da574a2301fc5d9910
--- transport.go
+++ transport.go
return nil, ErrClosed
case <-ctx.Done():
return nil, ctx.Err()
+ case err := <-req.err:
+ return nil, err
case resp := <-req.response:
log.Println("resp", resp)
if resp.Type == Rerror {
// handle takes messages off the wire and wakes up the waiting tag call.
func (t *transport) handle() {
-
+ defer func() {
+ log.Println("exited handle loop")
+ close(t.closed)
+ }()
// the following variable block are protected components owned by this thread.
var (
responses = make(chan *Fcall)
}()
for {
+ log.Println("wait...")
select {
case req := <-t.requests:
// the tag map and dealloc the tag. We may also want to send a
// flush for the tag.
if err := t.ch.writeFcall(req.ctx, req.fcall); err != nil {
+ log.Println("error writing fcall", err, req.fcall)
delete(outstanding, req.fcall.Tag)
req.err <- err
}