commit e6bcde663b39a93941cfd4188a6378e2ac5b92b2 from: Stephen J Day date: Thu Oct 29 02:21:19 2015 UTC fs/p9p/new: working client to server communication A rather large commit, this provides the first working version of running the wire protocol over a connection. It is fairly shaky but we can already get portions of the repl emitting and receiving 9p messages. Other additions include the separation of client and transport. We've also made some adjustments to message instantiation. Signed-off-by: Stephen J Day commit - 6b84ea70bad42d1dc151a3547b21c8818527e78d commit + e6bcde663b39a93941cfd4188a6378e2ac5b92b2 blob - acb4b04fc8d42393ecdd5d8c7444c95354fb2144 blob + 5993872f445220c17408dabd657bcfd53470ba28 --- client.go +++ client.go @@ -1,10 +1,8 @@ package p9pnew import ( - "bufio" "fmt" "log" - "time" "golang.org/x/net/context" @@ -12,11 +10,8 @@ import ( ) type client struct { - ctx context.Context - conn net.Conn - tags *tagPool - requests chan fcallRequest - closed chan struct{} + ctx context.Context + transport roundTripper } // NewSession returns a session using the connection. The Context ctx provides @@ -24,8 +19,8 @@ type client struct { // session. The session can effectively shutdown with this context. 
func NewSession(ctx context.Context, conn net.Conn) (Session, error) { return &client{ - ctx: ctx, - conn: conn, + ctx: ctx, + transport: newTransport(ctx, conn), }, nil } @@ -36,6 +31,7 @@ func (c *client) Auth(ctx context.Context, afid Fid, u } func (c *client) Attach(ctx context.Context, fid, afid Fid, uname, aname string) (Qid, error) { + log.Println("client attach", fid, aname) fcall := &Fcall{ Type: Tattach, Message: &MessageTattach{ @@ -46,7 +42,7 @@ func (c *client) Attach(ctx context.Context, fid, afid }, } - resp, err := c.send(ctx, fcall) + resp, err := c.transport.send(ctx, fcall) if err != nil { return Qid{}, err } @@ -60,7 +56,20 @@ func (c *client) Attach(ctx context.Context, fid, afid } func (c *client) Clunk(ctx context.Context, fid Fid) error { - panic("not implemented") + fcall := newFcall(&MessageTclunk{ + Fid: fid, + }) + + resp, err := c.transport.send(ctx, fcall) + if err != nil { + return err + } + + if resp.Type != Rclunk { + return fmt.Errorf("incorrect response type: %v", resp) + } + + return nil } func (c *client) Remove(ctx context.Context, fid Fid) error { @@ -69,10 +78,7 @@ func (c *client) Remove(ctx context.Context, fid Fid) func (c *client) Walk(ctx context.Context, fid Fid, newfid Fid, names ...string) ([]Qid, error) { if len(names) > 16 { - // TODO(stevvooe): Implement multi-message handling for more than 16 - // wnames. May want to actually force caller to implement this since - // we'll need a new fid for each RPC. 
- panic("more than 16 components not implemented") + return nil, fmt.Errorf("too many elements in wname") } fcall := &Fcall{ @@ -80,11 +86,11 @@ func (c *client) Walk(ctx context.Context, fid Fid, ne Message: &MessageTwalk{ Fid: fid, Newfid: newfid, - Wname: names, + Wnames: names, }, } - resp, err := c.send(ctx, fcall) + resp, err := c.transport.send(ctx, fcall) if err != nil { return nil, err } @@ -109,7 +115,7 @@ func (c *client) Read(ctx context.Context, fid Fid, p }, } - resp, err := c.send(ctx, fcall) + resp, err := c.transport.send(ctx, fcall) if err != nil { return 0, err } @@ -134,21 +140,36 @@ func (c *client) Write(ctx context.Context, fid Fid, p }, } - resp, err := c.send(ctx, fcall) + resp, err := c.transport.send(ctx, fcall) if err != nil { return 0, err } mrr, ok := resp.Message.(MessageRwrite) if !ok { - return 0, fmt.Errorf("invalid rpc response for read message: %v", resp) + return 0, fmt.Errorf("invalid rpc response for write message: %v", resp) } return int(mrr.Count), nil } -func (c *client) Open(ctx context.Context, fid Fid, mode int32) (Qid, error) { - panic("not implemented") +func (c *client) Open(ctx context.Context, fid Fid, mode uint8) (Qid, uint32, error) { + fcall := newFcall(&MessageTopen{ + Fid: fid, + Mode: mode, + }) + + resp, err := c.transport.send(ctx, fcall) + if err != nil { + return Qid{}, 0, err + } + + respmsg, ok := resp.Message.(MessageRopen) + if !ok { + return Qid{}, 0, fmt.Errorf("invalid rpc response for open message: %v", resp) + } + + return respmsg.Qid, respmsg.Msize, nil } func (c *client) Create(ctx context.Context, parent Fid, name string, perm uint32, mode uint32) (Qid, error) { @@ -164,20 +185,19 @@ func (c *client) WStat(context.Context, Fid, Dir) erro } func (c *client) Version(ctx context.Context, msize uint32, version string) (uint32, string, error) { - fcall := &Fcall{ - Type: Tversion, - Message: MessageVersion{ - MSize: uint32(msize), - Version: version, - }, + msg := MessageTversion{ + MSize: 
uint32(msize), + Version: version, } - resp, err := c.send(ctx, fcall) + fcall := newFcall(msg) + + resp, err := c.transport.send(ctx, fcall) if err != nil { return 0, "", err } - mv, ok := resp.Message.(*MessageVersion) + mv, ok := resp.Message.(*MessageRversion) if !ok { return 0, "", fmt.Errorf("invalid rpc response for version message: %v", resp) } @@ -191,144 +211,6 @@ func (c *client) Version(ctx context.Context, msize ui func (c *client) flush(ctx context.Context, tag Tag) error { // TODO(stevvooe): We need to fire and forget flush messages when a call // context gets cancelled. - panic("not implemented") -} -// send dispatches the fcall. -func (c *client) send(ctx context.Context, fc *Fcall) (*Fcall, error) { - fc.Tag = c.tags.Get() - defer c.tags.Put(fc.Tag) - - fcreq := newFcallRequest(ctx, fc) - - // dispatch the request. - select { - case <-c.closed: - return nil, ErrClosed - case c.requests <- fcreq: - case <-ctx.Done(): - return nil, ctx.Err() - } - - // wait for the response. - select { - case <-c.closed: - return nil, ErrClosed - case <-ctx.Done(): - return nil, ctx.Err() - case resp := <-fcreq.response: - return resp, nil - } -} - -type fcallRequest struct { - ctx context.Context - fcall *Fcall - response chan *Fcall - err chan error + panic("not implemented") } - -func newFcallRequest(ctx context.Context, fc *Fcall) fcallRequest { - return fcallRequest{ - ctx: ctx, - fcall: fc, - response: make(chan *Fcall, 1), - err: make(chan error, 1), - } -} - -// handle takes messages off the wire and wakes up the waiting tag call. -func (c *client) handle() { - - var ( - responses = make(chan *Fcall) - // outstanding provides a map of tags to outstanding requests. - outstanding = map[Tag]*fcallRequest{} - ) - - // loop to read messages off of the connection - go func() { - dec := &decoder{bufio.NewReader(c.conn)} - - loop: - for { - const pump = time.Second - - // Continuously set the read dead line pump the loop below. 
We can - // probably set a connection dead threshold that can count these. - // Usually, this would only matter when there are actually - // outstanding requests. - deadline, ok := c.ctx.Deadline() - if !ok { - deadline = time.Now().Add(pump) - } else { - // if the deadline is before - nd := time.Now().Add(pump) - if nd.Before(deadline) { - deadline = nd - } - } - - if err := c.conn.SetReadDeadline(deadline); err != nil { - panic(fmt.Sprintf("error setting read deadline: %v", err)) - } - - fc := new(Fcall) - if err := dec.decode(fc); err != nil { - switch err := err.(type) { - case net.Error: - if err.Timeout() || err.Temporary() { - break loop - } - } - - panic(fmt.Sprintf("connection read error: %v", err)) - } - - select { - case <-c.ctx.Done(): - return - case <-c.closed: - return - case responses <- fc: - } - } - - }() - - enc := &encoder{bufio.NewWriter(c.conn)} - - for { - select { - case <-c.ctx.Done(): - return - case <-c.closed: - return - case req := <-c.requests: - outstanding[req.fcall.Tag] = &req - - // use deadline to set write deadline for this request. 
- deadline, ok := req.ctx.Deadline() - if !ok { - deadline = time.Now().Add(time.Second) - } - - if err := c.conn.SetWriteDeadline(deadline); err != nil { - log.Println("error setting write deadline: %v", err) - } - - if err := enc.encode(req.fcall); err != nil { - delete(outstanding, req.fcall.Tag) - req.err <- err - } - case b := <-responses: - req, ok := outstanding[b.Tag] - if !ok { - panic("unknown tag received") - } - delete(outstanding, req.fcall.Tag) - - req.response <- b - } - } -} blob - dbc20c3115cb222fa2a80d65f3472d7425979d5a blob + d003a08067db2beb05f4e2124a29c59f1d0a43b8 --- cmd/9pr/main.go +++ cmd/9pr/main.go @@ -5,6 +5,9 @@ import ( "fmt" "io" "log" + "net" + "net/http" + _ "net/http/pprof" "os" "path" "strings" @@ -17,13 +20,29 @@ import ( ) func main() { + go func() { + log.Println(http.ListenAndServe("localhost:6060", nil)) + }() + log.SetFlags(0) // addr := os.Args[1] - + ctx := context.Background() // TODO(stevvooe): Use a dialer once we have the server session working // and running. 
+ session := newSimpleSession() + + sconn, cconn := net.Pipe() + + go p9pnew.Serve(ctx, sconn, session) + + log.Println("new session") + csession, err := p9pnew.NewSession(ctx, cconn) + if err != nil { + log.Fatalln(err) + } + // session, err := p9pnew.Dial(ctx, addr) // if err != nil { // log.Fatalln(err) @@ -31,7 +50,7 @@ func main() { commander := &fsCommander{ ctx: context.Background(), - session: newSimpleSession(), + session: csession, pwd: "/", stdout: os.Stdout, stderr: os.Stderr, @@ -55,6 +74,7 @@ func main() { } commander.readline = rl + log.Println("attach root") // attach root commander.nextfid = 1 if _, err := commander.session.Attach(commander.ctx, commander.nextfid, p9pnew.NOFID, "anyone", "/"); err != nil { @@ -63,6 +83,7 @@ func main() { commander.rootfid = commander.nextfid commander.nextfid++ + log.Println("clone root") // clone the pwd fid so we can clunk it if _, err := commander.session.Walk(commander.ctx, commander.rootfid, commander.nextfid); err != nil { log.Fatalln(err) @@ -85,7 +106,7 @@ func main() { args := strings.Fields(line) name := args[0] - var cmd func(args ...string) error + var cmd func(ctx context.Context, args ...string) error switch name { case "ls": @@ -97,12 +118,13 @@ func main() { case "cat": cmd = commander.cmdcat default: - cmd = func(args ...string) error { + cmd = func(ctx context.Context, args ...string) error { return fmt.Errorf("command not implemented") } } - if err := cmd(args[1:]...); err != nil { + ctx, _ = context.WithTimeout(commander.ctx, time.Second) + if err := cmd(ctx, args[1:]...); err != nil { log.Printf("👹 %s: %v", name, err) } } @@ -122,7 +144,7 @@ type fsCommander struct { stderr io.Writer } -func (c *fsCommander) cmdls(args ...string) error { +func (c *fsCommander) cmdls(ctx context.Context, args ...string) error { ps := []string{c.pwd} if len(args) > 0 { ps = args @@ -143,18 +165,18 @@ func (c *fsCommander) cmdls(args ...string) error { targetfid := c.nextfid c.nextfid++ components := 
strings.Split(strings.Trim(p, "/"), "/") - if _, err := c.session.Walk(c.ctx, c.rootfid, targetfid, components...); err != nil { + if _, err := c.session.Walk(ctx, c.rootfid, targetfid, components...); err != nil { return err } - defer c.session.Clunk(c.ctx, targetfid) + defer c.session.Clunk(ctx, targetfid) - if _, err := c.session.Open(c.ctx, targetfid, p9pnew.OREAD); err != nil { + if _, _, err := c.session.Open(ctx, targetfid, p9pnew.OREAD); err != nil { return err } p := make([]byte, 4<<20) - n, err := c.session.Read(c.ctx, targetfid, p, 0) + n, err := c.session.Read(ctx, targetfid, p, 0) if err != nil { return err } @@ -183,7 +205,7 @@ func (c *fsCommander) cmdls(args ...string) error { return wr.Flush() } -func (c *fsCommander) cmdcd(args ...string) error { +func (c *fsCommander) cmdcd(ctx context.Context, args ...string) error { var p string switch len(args) { case 0: @@ -213,7 +235,7 @@ func (c *fsCommander) cmdcd(args ...string) error { return nil } -func (c *fsCommander) cmdpwd(args ...string) error { +func (c *fsCommander) cmdpwd(ctx context.Context, args ...string) error { if len(args) != 0 { return fmt.Errorf("pwd takes no arguments") } @@ -222,7 +244,7 @@ func (c *fsCommander) cmdpwd(args ...string) error { return nil } -func (c *fsCommander) cmdcat(args ...string) error { +func (c *fsCommander) cmdcat(ctx context.Context, args ...string) error { var p string switch len(args) { case 0: @@ -245,11 +267,12 @@ func (c *fsCommander) cmdcat(args ...string) error { } defer c.session.Clunk(c.ctx, c.pwdfid) - if _, err := c.session.Open(c.ctx, targetfid, p9pnew.OREAD); err != nil { + _, msize, err := c.session.Open(c.ctx, targetfid, p9pnew.OREAD) + if err != nil { return err } - b := make([]byte, 4<<20) + b := make([]byte, msize) n, err := c.session.Read(c.ctx, targetfid, b, 0) if err != nil { @@ -330,9 +353,6 @@ func newSimpleSession() p9pnew.Session { b.add(bc) root.add(a) root.add(b) - log.Println(root.children) - log.Println(a.children) - 
log.Println(b.children) return &simpleSession{ root: root, @@ -438,15 +458,15 @@ func (s *simpleSession) Write(ctx context.Context, fid panic("not implemented") } -func (s *simpleSession) Open(ctx context.Context, fid p9pnew.Fid, mode int32) (p9pnew.Qid, error) { +func (s *simpleSession) Open(ctx context.Context, fid p9pnew.Fid, mode uint8) (p9pnew.Qid, uint32, error) { fi, ok := s.fids[fid] if !ok { - return p9pnew.Qid{}, p9pnew.ErrUnknownfid + return p9pnew.Qid{}, 0, p9pnew.ErrUnknownfid } s.opened[fid] = struct{}{} - return fi.dir.Qid, nil + return fi.dir.Qid, 4 << 20, nil } func (s *simpleSession) Create(ctx context.Context, parent p9pnew.Fid, name string, perm uint32, mode uint32) (p9pnew.Qid, error) { blob - 45ec913cb709375259ca68b89724efe2da533146 blob + 529dbcad3e754a2d6af8c32544adc0bfbc3d0c39 --- encoding.go +++ encoding.go @@ -344,3 +344,16 @@ func fields9p(v interface{}) ([]interface{}, error) { return elements, nil } + +func pretty9p(w io.Writer, v interface{}) error { + switch v := v.(type) { + case *Fcall: + pretty9p(w, *v) + case Fcall: + fmt.Fprintf(w, "uint32(%v) %v(%v) ", size9p(v), v.Type, v.Tag) + pretty9p(w, v.Message) + fmt.Fprintln(w) + } + + return nil +} blob - 1467c5bfd69e08e5dabbf2b195dd87d531b47ee3 blob + 76b8a9ae3ccc5c4a86aa70d2b232a1155dbd2f8f --- encoding_test.go +++ encoding_test.go @@ -27,12 +27,14 @@ func TestEncodeDecode(t *testing.T) { 0x4, 0x0, 0x71, 0x77, 0x65, 0x72, 0x4, 0x0, 0x7a, 0x78, 0x63, 0x76}, }, + // Dir + // Qid { description: "Tversion fcall", target: &Fcall{ Type: Tversion, Tag: 2255, - Message: &MessageVersion{ + Message: &MessageTversion{ MSize: uint32(1024), Version: "9PTEST", }, @@ -43,6 +45,21 @@ func TestEncodeDecode(t *testing.T) { 0x6, 0x0, 0x39, 0x50, 0x54, 0x45, 0x53, 0x54}, }, { + description: "Rversion fcall", + target: &Fcall{ + Type: Rversion, + Tag: 2255, + Message: &MessageRversion{ + MSize: uint32(1024), + Version: "9PTEST", + }, + }, + marshaled: []byte{ + 0x13, 0x0, 0x0, 0x0, + 0x65, 0xcf, 0x8, 
0x0, 0x4, 0x0, 0x0, + 0x6, 0x0, 0x39, 0x50, 0x54, 0x45, 0x53, 0x54}, + }, + { description: "Twalk fcall", target: &Fcall{ Type: Twalk, blob - 9933b9852590d921bfbd30d5d5611301b45cbabd blob + 05d7d2c1a99ab94e8695ccc4740dedfda8d0371f --- fcall.go +++ fcall.go @@ -106,20 +106,21 @@ type Fcall struct { Message Message } -func (fc Fcall) String() string { +func newFcall(msg Message) *Fcall { + return &Fcall{ + Type: msg.Type(), + Message: msg, + } +} + +func (fc *Fcall) String() string { return fmt.Sprintf("%8d %v(%v) %v", size9p(fc), fc.Type, fc.Tag, fc.Message) } type Message interface { - // Size() uint32 - - // NOTE(stevvooe): The binary marshal approach isn't particularly nice to - // generating garbage. Consider using an append model, once we have the - // messages worked out. - // encoding.BinaryMarshaler - // encoding.BinaryUnmarshaler - - message9p() + // Type indicates the Fcall type of the message. This must match + // Fcall.Type. + Type() FcallType } // newMessage returns a new instance of the message based on the Fcall type. @@ -127,20 +128,19 @@ func newMessage(typ FcallType) (Message, error) { // NOTE(stevvooe): This is a nasty bit of code but makes the transport // fairly simple to implement. 
switch typ { - case Tversion, Rversion: - return &MessageVersion{}, nil + case Tversion: + return &MessageTversion{}, nil + case Rversion: + return &MessageRversion{}, nil case Tauth: case Rauth: - case Tattach: - + return &MessageTattach{}, nil case Rattach: - - case Terror: - + return &MessageRattach{}, nil case Rerror: - + return &MessageRerror{}, nil case Tflush: return &MessageTflush{}, nil case Rflush: @@ -150,9 +150,9 @@ func newMessage(typ FcallType) (Message, error) { case Rwalk: return &MessageRwalk{}, nil case Topen: - + return &MessageTopen{}, nil case Ropen: - + return &MessageRopen{}, nil case Tcreate: case Rcreate: @@ -166,9 +166,9 @@ func newMessage(typ FcallType) (Message, error) { case Rwrite: return &MessageRwrite{}, nil case Tclunk: - + return &MessageTclunk{}, nil case Rclunk: - + return nil, nil // no response body case Tremove: case Rremove: @@ -180,34 +180,31 @@ func newMessage(typ FcallType) (Message, error) { case Twstat: case Rwstat: - default: - return nil, fmt.Errorf("unknown message type: %v", typ) } - return nil, fmt.Errorf("unknown message") + return nil, fmt.Errorf("unknown message type") } // MessageVersion encodes the message body for Tversion and Rversion RPC // calls. The body is identical in both directions. -type MessageVersion struct { +type MessageTversion struct { MSize uint32 Version string } -func (MessageVersion) message9p() {} - -func (mv MessageVersion) String() string { - return fmt.Sprintf("msize=%v version=%v", mv.MSize, mv.Version) +type MessageRversion struct { + MSize uint32 + Version string } -type MessageTAuth struct { +type MessageTauth struct { Afid Fid Uname string Aname string } -type MessageRAuth struct { +type MessageRauth struct { Qid Qid } @@ -215,13 +212,10 @@ type MessageRerror struct { Ename string } -// MessageTflush handles the content for the Tflush message type. 
type MessageTflush struct { Oldtag Tag } -func (MessageTflush) message9p() {} - type MessageTattach struct { Fid Fid Afid Fid @@ -229,42 +223,30 @@ type MessageTattach struct { Aname string } -func (MessageTattach) message9p() {} - type MessageRattach struct { Qid Qid } -func (MessageRattach) message9p() {} - type MessageTwalk struct { Fid Fid Newfid Fid - Wname []string + Wnames []string } -func (MessageTwalk) message9p() {} - type MessageRwalk struct { Qids []Qid } -func (MessageRwalk) message9p() {} - type MessageTopen struct { Fid Fid Mode uint8 } -func (MessageTopen) message9p() {} - type MessageRopen struct { Qid Qid Msize uint32 } -func (MessageRopen) message9p() {} - type MessageTcreate struct { Fid Fid Name string @@ -272,43 +254,31 @@ type MessageTcreate struct { Mode uint8 } -func (MessageTcreate) message9p() {} - type MessageRcreate struct { Qid Qid IOUnit uint32 } -func (MessageRcreate) message9p() {} - type MessageTread struct { Fid Fid Offset uint64 Count uint32 } -func (MessageTread) message9p() {} - type MessageRread struct { Data []byte } -func (MessageRread) message9p() {} - type MessageTwrite struct { Fid Fid Offset uint64 Data []byte } -func (MessageTwrite) message9p() {} - type MessageRwrite struct { Count uint32 } -func (MessageRwrite) message9p() {} - type MessageTclunk struct { Fid Fid } @@ -325,9 +295,31 @@ type MessageRstat struct { Stat Dir } -func (MessageRstat) message9p() {} - type MessageTwstat struct { Fid Fid Stat Dir } + +func (MessageTversion) Type() FcallType { return Tversion } +func (MessageRversion) Type() FcallType { return Rversion } +func (MessageTauth) Type() FcallType { return Tauth } +func (MessageRauth) Type() FcallType { return Rauth } +func (MessageRerror) Type() FcallType { return Rerror } +func (MessageTflush) Type() FcallType { return Tflush } +func (MessageTattach) Type() FcallType { return Tattach } +func (MessageRattach) Type() FcallType { return Rattach } +func (MessageTwalk) Type() FcallType { return Twalk } 
+func (MessageRwalk) Type() FcallType { return Rwalk } +func (MessageTopen) Type() FcallType { return Topen } +func (MessageRopen) Type() FcallType { return Ropen } +func (MessageTcreate) Type() FcallType { return Tcreate } +func (MessageRcreate) Type() FcallType { return Rcreate } +func (MessageTread) Type() FcallType { return Tread } +func (MessageRread) Type() FcallType { return Rread } +func (MessageTwrite) Type() FcallType { return Twrite } +func (MessageRwrite) Type() FcallType { return Rwrite } +func (MessageTclunk) Type() FcallType { return Tclunk } +func (MessageTremove) Type() FcallType { return Tremove } +func (MessageTstat) Type() FcallType { return Tstat } +func (MessageRstat) Type() FcallType { return Rstat } +func (MessageTwstat) Type() FcallType { return Twstat } blob - 19ec9f4039516b5fcebf2abc647d96a425cc374f blob + dce6821112555e7c268f22d5c2cbaa952dd1e991 --- server.go +++ server.go @@ -1,8 +1,8 @@ -// +build ignore - package p9pnew import ( + "bufio" + "fmt" "log" "net" "time" @@ -11,44 +11,138 @@ import ( ) // Serve the 9p session over the provided network connection. -func Serve(ctx context.Context, conn net.Conn, session Session) error { - panic("not implemented") +func Serve(ctx context.Context, conn net.Conn, session Session) { + s := &server{ + ctx: ctx, + conn: conn, + session: session, + } + + s.run() } type server struct { ctx context.Context session Session conn net.Conn + closed chan struct{} } func (s *server) run() { - dec := decoder{s.conn} + brd := bufio.NewReader(s.conn) + dec := &decoder{brd} + bwr := bufio.NewWriter(s.conn) + enc := &encoder{bwr} - fcall := new(Fcall) - if err := dec.decode(fcall); err != nil { - log.Println(err) - } + tags := map[Tag]*Fcall{} // active requests + log.Println("server.run()") + for { + select { + case <-s.ctx.Done(): + log.Println("server: shutdown") + return + case <-s.closed: + default: + } + + // NOTE(stevvooe): For now, we only provide a single request at a time + // handler. 
We can refactor this to take requests off the wire as + // quickly as they arrive and dispatch in parallel to session. + + log.Println("server:", "wait") + fcall := new(Fcall) + if err := dec.decode(fcall); err != nil { + log.Println("server decoding fcall:", err) + continue + } + + log.Println("server:", "message", fcall) + + if _, ok := tags[fcall.Tag]; ok { + if err := enc.encode(&Fcall{ + Type: Rerror, + Tag: fcall.Tag, + Message: &MessageRerror{ + Ename: ErrDuptag.Error(), + }, + }); err != nil { + log.Println("server:", err) + } + bwr.Flush() + continue + } + tags[fcall.Tag] = fcall + + resp, err := s.handle(fcall) + if err != nil { + log.Println("server:", err) + continue + } + + if err := enc.encode(resp); err != nil { + log.Println("server:", err) + continue + } + bwr.Flush() + + } } // handle responds to an fcall using the session. An error is only returned if // the handler cannot proceed. All session errors are returned as Rerror. -func (s *server) handle(f *Fcall) (*Fcall, error) { +func (s *server) handle(req *Fcall) (*Fcall, error) { const timeout = 30 * time.Second // TODO(stevvooe): Allow this to be configured. 
- ctx, cancel = context.WithTimeout(s.ctx, timeout) + ctx, cancel := context.WithTimeout(s.ctx, timeout) defer cancel() - switch fcall.Type { + var resp *Fcall + switch req.Type { case Tattach: - atc, ok := fcall.Message.(*MessageTattach) - if ok { - log.Println("bad message") - continue + reqmsg, ok := req.Message.(*MessageTattach) + if !ok { + return nil, fmt.Errorf("bad message: %v message=%#v", req, req.Message) } - qid, err := s.session.Attach(s.ctx, atc.Fid, atc.Afid, atc.Uname, atc.Aname) + qid, err := s.session.Attach(ctx, reqmsg.Fid, reqmsg.Afid, reqmsg.Uname, reqmsg.Aname) if err != nil { - return + return nil, err } + + resp = &Fcall{ + Type: Rattach, + Tag: req.Tag, + Message: &MessageRattach{ + Qid: qid, + }, + } + case Twalk: + reqmsg, ok := req.Message.(*MessageTwalk) + if !ok { + return nil, fmt.Errorf("bad message: %v message=%#v", req, req.Message) + } + + // TODO(stevvooe): This is one of the places where we need to manage + // fid allocation lifecycle. We need to reserve the fid, then, if this + // call succeeds, we should alloc the fid for future uses. Also need + // to interact correctly with concurrent clunk and the flush of this + // walk message. + qids, err := s.session.Walk(ctx, reqmsg.Fid, reqmsg.Newfid, reqmsg.Wnames...) 
+ if err != nil { + return nil, err + } + + resp = newFcall(&MessageRwalk{ + Qids: qids, + }) } + + if resp == nil { + resp = newFcall(&MessageRerror{ + Ename: "unknown message type", + }) + } + + resp.Tag = req.Tag + return resp, nil } blob - 9eac603c2d0f52f707d8bf18e975a11f2c712207 blob + 37779b341b219f43c948e22ba0287ac579e16625 --- session.go +++ session.go @@ -25,7 +25,7 @@ type Session interface { Walk(ctx context.Context, fid Fid, newfid Fid, names ...string) ([]Qid, error) Read(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error) Write(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error) - Open(ctx context.Context, fid Fid, mode int32) (Qid, error) + Open(ctx context.Context, fid Fid, mode uint8) (Qid, uint32, error) Create(ctx context.Context, parent Fid, name string, perm uint32, mode uint32) (Qid, error) Stat(context.Context, Fid) (Dir, error) WStat(context.Context, Fid, Dir) error blob - 4bea0b041f5c0da62d20e12af577067a28bf09e8 blob + ab572f197ea3d78f7f9b4c37955a30121793f3fa --- tags.go +++ tags.go @@ -57,11 +57,11 @@ func (tp *tagPool) Close() error { // NewtagPool returns a tag pool with the maximum number of outstanding // requests. -func newTagPool(outstanding int) (*tagPool, error) { +func newTagPool(outstanding int) *tagPool { return &tagPool{ maximum: Tag(outstanding), freelist: make(chan Tag, outstanding), nexttag: make(chan Tag), closed: make(chan struct{}), - }, nil + } } blob - /dev/null blob + 82c3b4b965eb8eeaaa15ad008456f41a203c091c (mode 644) --- /dev/null +++ transport.go @@ -0,0 +1,212 @@ +package p9pnew + +import ( + "bufio" + "fmt" + "log" + "net" + "os" + "time" + + "golang.org/x/net/context" +) + +// roundTripper manages the request and response from the client-side. A +// roundTripper must abide by many of the rules of http.RoundTripper. +// Typically, the roundTripper will manage tag assignment and message +// serialization. 
+type roundTripper interface { + send(ctx context.Context, fc *Fcall) (*Fcall, error) +} + +type transport struct { + ctx context.Context + conn net.Conn + requests chan *fcallRequest + closed chan struct{} + + tags uint16 +} + +func newTransport(ctx context.Context, conn net.Conn) roundTripper { + t := &transport{ + ctx: ctx, + conn: conn, + requests: make(chan *fcallRequest), + closed: make(chan struct{}), + } + + go t.handle() + + return t +} + +type fcallRequest struct { + ctx context.Context + fcall *Fcall + response chan *Fcall + err chan error +} + +func newFcallRequest(ctx context.Context, fcall *Fcall) *fcallRequest { + return &fcallRequest{ + ctx: ctx, + fcall: fcall, + response: make(chan *Fcall, 1), + err: make(chan error, 1), + } +} + +func (t *transport) send(ctx context.Context, fcall *Fcall) (*Fcall, error) { + + req := newFcallRequest(ctx, fcall) + + log.Println("dispatch", fcall) + // dispatch the request. + select { + case <-t.closed: + return nil, ErrClosed + case <-ctx.Done(): + return nil, ctx.Err() + case t.requests <- req: + } + + log.Println("wait", fcall) + // wait for the response. + select { + case <-t.closed: + return nil, ErrClosed + case <-ctx.Done(): + return nil, ctx.Err() + case resp := <-req.response: + return resp, nil + } +} + +// handle takes messages off the wire and wakes up the waiting tag call. +func (t *transport) handle() { + + // the following variable block are protected components owned by this thread. + var ( + responses = make(chan *Fcall) + tags Tag + // outstanding provides a map of tags to outstanding requests. + outstanding = map[Tag]*fcallRequest{} + brd = bufio.NewReader(t.conn) + bwr = bufio.NewWriter(t.conn) + enc = &encoder{bwr} + dec = &decoder{brd} + ) + + // loop to read messages off of the connection + go func() { + + loop: + for { + const pump = time.Second + + // Continuously set the read dead line pump the loop below. We can + // probably set a connection dead threshold that can count these. 
+ // Usually, this would only matter when there are actually + // outstanding requests. + deadline, ok := t.ctx.Deadline() + if !ok { + deadline = time.Now().Add(pump) + } else { + // if the deadline is before + nd := time.Now().Add(pump) + if nd.Before(deadline) { + deadline = nd + } + } + + if err := t.conn.SetReadDeadline(deadline); err != nil { + log.Printf("error setting read deadline: %v", err) + } + + fc := new(Fcall) + if err := dec.decode(fc); err != nil { + switch err := err.(type) { + case net.Error: + if err.Timeout() || err.Temporary() { + break loop + } + } + + panic(fmt.Sprintf("connection read error: %v", err)) + } + + select { + case <-t.ctx.Done(): + return + case <-t.closed: + return + case responses <- fc: + } + } + }() + + for { + select { + case req := <-t.requests: + tags++ + req.fcall.Tag = tags + outstanding[req.fcall.Tag] = req + + pretty9p(os.Stdout, req.fcall) + // use deadline to set write deadline for this request. + deadline, ok := req.ctx.Deadline() + if !ok { + deadline = time.Now().Add(time.Second) + } + + if err := t.conn.SetWriteDeadline(deadline); err != nil { + log.Printf("error setting write deadline: %v", err) + } + + // TODO(stevvooe): Consider the case of requests that never + // receive a response. We need to remove the fcall context from + // the tag map and dealloc the tag. We may also want to send a + // flush for the tag. + + log.Println("send", req.fcall) + if err := enc.encode(req.fcall); err != nil { + delete(outstanding, req.fcall.Tag) + req.err <- err + } + if err := bwr.Flush(); err != nil { + delete(outstanding, req.fcall.Tag) + req.err <- err + } + + log.Println("sent", req.fcall) + case b := <-responses: + req, ok := outstanding[b.Tag] + if !ok { + panic("unknown tag received") + } + delete(outstanding, req.fcall.Tag) + + req.response <- b + + // TODO(stevvooe): Reclaim tag id. 
+ case <-t.ctx.Done(): + return + case <-t.closed: + return + } + } +} + +func (t *transport) Close() error { + select { + case <-t.closed: + return ErrClosed + case <-t.ctx.Done(): + return t.ctx.Err() + default: + close(t.closed) + } + + return nil +}