commit - 6b84ea70bad42d1dc151a3547b21c8818527e78d
commit + e6bcde663b39a93941cfd4188a6378e2ac5b92b2
blob - acb4b04fc8d42393ecdd5d8c7444c95354fb2144
blob + 5993872f445220c17408dabd657bcfd53470ba28
--- client.go
+++ client.go
package p9pnew
import (
- "bufio"
"fmt"
"log"
- "time"
"golang.org/x/net/context"
)
type client struct {
- ctx context.Context
- conn net.Conn
- tags *tagPool
- requests chan fcallRequest
- closed chan struct{}
+ ctx context.Context
+ transport roundTripper
}
// NewSession returns a session using the connection. The Context ctx provides
// session. The session can effectively shutdown with this context.
func NewSession(ctx context.Context, conn net.Conn) (Session, error) {
return &client{
- ctx: ctx,
- conn: conn,
+ ctx: ctx,
+ transport: newTransport(ctx, conn),
}, nil
}
}
func (c *client) Attach(ctx context.Context, fid, afid Fid, uname, aname string) (Qid, error) {
+ log.Println("client attach", fid, aname)
fcall := &Fcall{
Type: Tattach,
Message: &MessageTattach{
},
}
- resp, err := c.send(ctx, fcall)
+ resp, err := c.transport.send(ctx, fcall)
if err != nil {
return Qid{}, err
}
}
func (c *client) Clunk(ctx context.Context, fid Fid) error {
- panic("not implemented")
+ fcall := newFcall(&MessageTclunk{
+ Fid: fid,
+ })
+
+ resp, err := c.transport.send(ctx, fcall)
+ if err != nil {
+ return err
+ }
+
+ if resp.Type != Rclunk {
+ return fmt.Errorf("incorrect response type: %v", resp)
+ }
+
+ return nil
}
func (c *client) Remove(ctx context.Context, fid Fid) error {
func (c *client) Walk(ctx context.Context, fid Fid, newfid Fid, names ...string) ([]Qid, error) {
if len(names) > 16 {
- // TODO(stevvooe): Implement multi-message handling for more than 16
- // wnames. May want to actually force caller to implement this since
- // we'll need a new fid for each RPC.
- panic("more than 16 components not implemented")
+ return nil, fmt.Errorf("too many elements in wname")
}
fcall := &Fcall{
Message: &MessageTwalk{
Fid: fid,
Newfid: newfid,
- Wname: names,
+ Wnames: names,
},
}
- resp, err := c.send(ctx, fcall)
+ resp, err := c.transport.send(ctx, fcall)
if err != nil {
return nil, err
}
},
}
- resp, err := c.send(ctx, fcall)
+ resp, err := c.transport.send(ctx, fcall)
if err != nil {
return 0, err
}
},
}
- resp, err := c.send(ctx, fcall)
+ resp, err := c.transport.send(ctx, fcall)
if err != nil {
return 0, err
}
mrr, ok := resp.Message.(MessageRwrite)
if !ok {
- return 0, fmt.Errorf("invalid rpc response for read message: %v", resp)
+ return 0, fmt.Errorf("invalid rpc response for write message: %v", resp)
}
return int(mrr.Count), nil
}
-func (c *client) Open(ctx context.Context, fid Fid, mode int32) (Qid, error) {
- panic("not implemented")
+func (c *client) Open(ctx context.Context, fid Fid, mode uint8) (Qid, uint32, error) {
+ fcall := newFcall(&MessageTopen{
+ Fid: fid,
+ Mode: mode,
+ })
+
+ resp, err := c.transport.send(ctx, fcall)
+ if err != nil {
+ return Qid{}, 0, err
+ }
+
+ respmsg, ok := resp.Message.(MessageRopen)
+ if !ok {
+ return Qid{}, 0, fmt.Errorf("invalid rpc response for open message: %v", resp)
+ }
+
+ return respmsg.Qid, respmsg.Msize, nil
}
func (c *client) Create(ctx context.Context, parent Fid, name string, perm uint32, mode uint32) (Qid, error) {
}
func (c *client) Version(ctx context.Context, msize uint32, version string) (uint32, string, error) {
- fcall := &Fcall{
- Type: Tversion,
- Message: MessageVersion{
- MSize: uint32(msize),
- Version: version,
- },
+ msg := MessageTversion{
+ MSize: uint32(msize),
+ Version: version,
}
- resp, err := c.send(ctx, fcall)
+ fcall := newFcall(msg)
+
+ resp, err := c.transport.send(ctx, fcall)
if err != nil {
return 0, "", err
}
- mv, ok := resp.Message.(*MessageVersion)
+ mv, ok := resp.Message.(*MessageRversion)
if !ok {
return 0, "", fmt.Errorf("invalid rpc response for version message: %v", resp)
}
func (c *client) flush(ctx context.Context, tag Tag) error {
// TODO(stevvooe): We need to fire and forget flush messages when a call
// context gets cancelled.
- panic("not implemented")
-}
-// send dispatches the fcall.
-func (c *client) send(ctx context.Context, fc *Fcall) (*Fcall, error) {
- fc.Tag = c.tags.Get()
- defer c.tags.Put(fc.Tag)
-
- fcreq := newFcallRequest(ctx, fc)
-
- // dispatch the request.
- select {
- case <-c.closed:
- return nil, ErrClosed
- case c.requests <- fcreq:
- case <-ctx.Done():
- return nil, ctx.Err()
- }
-
- // wait for the response.
- select {
- case <-c.closed:
- return nil, ErrClosed
- case <-ctx.Done():
- return nil, ctx.Err()
- case resp := <-fcreq.response:
- return resp, nil
- }
-}
-
-type fcallRequest struct {
- ctx context.Context
- fcall *Fcall
- response chan *Fcall
- err chan error
+ panic("not implemented")
}
-
-func newFcallRequest(ctx context.Context, fc *Fcall) fcallRequest {
- return fcallRequest{
- ctx: ctx,
- fcall: fc,
- response: make(chan *Fcall, 1),
- err: make(chan error, 1),
- }
-}
-
-// handle takes messages off the wire and wakes up the waiting tag call.
-func (c *client) handle() {
-
- var (
- responses = make(chan *Fcall)
- // outstanding provides a map of tags to outstanding requests.
- outstanding = map[Tag]*fcallRequest{}
- )
-
- // loop to read messages off of the connection
- go func() {
- dec := &decoder{bufio.NewReader(c.conn)}
-
- loop:
- for {
- const pump = time.Second
-
- // Continuously set the read dead line pump the loop below. We can
- // probably set a connection dead threshold that can count these.
- // Usually, this would only matter when there are actually
- // outstanding requests.
- deadline, ok := c.ctx.Deadline()
- if !ok {
- deadline = time.Now().Add(pump)
- } else {
- // if the deadline is before
- nd := time.Now().Add(pump)
- if nd.Before(deadline) {
- deadline = nd
- }
- }
-
- if err := c.conn.SetReadDeadline(deadline); err != nil {
- panic(fmt.Sprintf("error setting read deadline: %v", err))
- }
-
- fc := new(Fcall)
- if err := dec.decode(fc); err != nil {
- switch err := err.(type) {
- case net.Error:
- if err.Timeout() || err.Temporary() {
- break loop
- }
- }
-
- panic(fmt.Sprintf("connection read error: %v", err))
- }
-
- select {
- case <-c.ctx.Done():
- return
- case <-c.closed:
- return
- case responses <- fc:
- }
- }
-
- }()
-
- enc := &encoder{bufio.NewWriter(c.conn)}
-
- for {
- select {
- case <-c.ctx.Done():
- return
- case <-c.closed:
- return
- case req := <-c.requests:
- outstanding[req.fcall.Tag] = &req
-
- // use deadline to set write deadline for this request.
- deadline, ok := req.ctx.Deadline()
- if !ok {
- deadline = time.Now().Add(time.Second)
- }
-
- if err := c.conn.SetWriteDeadline(deadline); err != nil {
- log.Println("error setting write deadline: %v", err)
- }
-
- if err := enc.encode(req.fcall); err != nil {
- delete(outstanding, req.fcall.Tag)
- req.err <- err
- }
- case b := <-responses:
- req, ok := outstanding[b.Tag]
- if !ok {
- panic("unknown tag received")
- }
- delete(outstanding, req.fcall.Tag)
-
- req.response <- b
- }
- }
-}
blob - dbc20c3115cb222fa2a80d65f3472d7425979d5a
blob + d003a08067db2beb05f4e2124a29c59f1d0a43b8
--- cmd/9pr/main.go
+++ cmd/9pr/main.go
"fmt"
"io"
"log"
+ "net"
+ "net/http"
+ _ "net/http/pprof"
"os"
"path"
"strings"
)
func main() {
+ go func() {
+ log.Println(http.ListenAndServe("localhost:6060", nil))
+ }()
+
log.SetFlags(0)
// addr := os.Args[1]
-
+ ctx := context.Background()
// TODO(stevvooe): Use a dialer once we have the server session working
// and running.
+ session := newSimpleSession()
+
+ sconn, cconn := net.Pipe()
+
+ go p9pnew.Serve(ctx, sconn, session)
+
+ log.Println("new session")
+ csession, err := p9pnew.NewSession(ctx, cconn)
+ if err != nil {
+ log.Fatalln(err)
+ }
+
// session, err := p9pnew.Dial(ctx, addr)
// if err != nil {
// log.Fatalln(err)
commander := &fsCommander{
ctx: context.Background(),
- session: newSimpleSession(),
+ session: csession,
pwd: "/",
stdout: os.Stdout,
stderr: os.Stderr,
}
commander.readline = rl
+ log.Println("attach root")
// attach root
commander.nextfid = 1
if _, err := commander.session.Attach(commander.ctx, commander.nextfid, p9pnew.NOFID, "anyone", "/"); err != nil {
commander.rootfid = commander.nextfid
commander.nextfid++
+ log.Println("clone root")
// clone the pwd fid so we can clunk it
if _, err := commander.session.Walk(commander.ctx, commander.rootfid, commander.nextfid); err != nil {
log.Fatalln(err)
args := strings.Fields(line)
name := args[0]
- var cmd func(args ...string) error
+ var cmd func(ctx context.Context, args ...string) error
switch name {
case "ls":
case "cat":
cmd = commander.cmdcat
default:
- cmd = func(args ...string) error {
+ cmd = func(ctx context.Context, args ...string) error {
return fmt.Errorf("command not implemented")
}
}
- if err := cmd(args[1:]...); err != nil {
+ ctx, _ = context.WithTimeout(commander.ctx, time.Second)
+ if err := cmd(ctx, args[1:]...); err != nil {
log.Printf("👹 %s: %v", name, err)
}
}
stderr io.Writer
}
-func (c *fsCommander) cmdls(args ...string) error {
+func (c *fsCommander) cmdls(ctx context.Context, args ...string) error {
ps := []string{c.pwd}
if len(args) > 0 {
ps = args
targetfid := c.nextfid
c.nextfid++
components := strings.Split(strings.Trim(p, "/"), "/")
- if _, err := c.session.Walk(c.ctx, c.rootfid, targetfid, components...); err != nil {
+ if _, err := c.session.Walk(ctx, c.rootfid, targetfid, components...); err != nil {
return err
}
- defer c.session.Clunk(c.ctx, targetfid)
+ defer c.session.Clunk(ctx, targetfid)
- if _, err := c.session.Open(c.ctx, targetfid, p9pnew.OREAD); err != nil {
+ if _, _, err := c.session.Open(ctx, targetfid, p9pnew.OREAD); err != nil {
return err
}
p := make([]byte, 4<<20)
- n, err := c.session.Read(c.ctx, targetfid, p, 0)
+ n, err := c.session.Read(ctx, targetfid, p, 0)
if err != nil {
return err
}
return wr.Flush()
}
-func (c *fsCommander) cmdcd(args ...string) error {
+func (c *fsCommander) cmdcd(ctx context.Context, args ...string) error {
var p string
switch len(args) {
case 0:
return nil
}
-func (c *fsCommander) cmdpwd(args ...string) error {
+func (c *fsCommander) cmdpwd(ctx context.Context, args ...string) error {
if len(args) != 0 {
return fmt.Errorf("pwd takes no arguments")
}
return nil
}
-func (c *fsCommander) cmdcat(args ...string) error {
+func (c *fsCommander) cmdcat(ctx context.Context, args ...string) error {
var p string
switch len(args) {
case 0:
}
defer c.session.Clunk(c.ctx, c.pwdfid)
- if _, err := c.session.Open(c.ctx, targetfid, p9pnew.OREAD); err != nil {
+ _, msize, err := c.session.Open(c.ctx, targetfid, p9pnew.OREAD)
+ if err != nil {
return err
}
- b := make([]byte, 4<<20)
+ b := make([]byte, msize)
n, err := c.session.Read(c.ctx, targetfid, b, 0)
if err != nil {
b.add(bc)
root.add(a)
root.add(b)
- log.Println(root.children)
- log.Println(a.children)
- log.Println(b.children)
return &simpleSession{
root: root,
panic("not implemented")
}
-func (s *simpleSession) Open(ctx context.Context, fid p9pnew.Fid, mode int32) (p9pnew.Qid, error) {
+func (s *simpleSession) Open(ctx context.Context, fid p9pnew.Fid, mode uint8) (p9pnew.Qid, uint32, error) {
fi, ok := s.fids[fid]
if !ok {
- return p9pnew.Qid{}, p9pnew.ErrUnknownfid
+ return p9pnew.Qid{}, 0, p9pnew.ErrUnknownfid
}
s.opened[fid] = struct{}{}
- return fi.dir.Qid, nil
+ return fi.dir.Qid, 4 << 20, nil
}
func (s *simpleSession) Create(ctx context.Context, parent p9pnew.Fid, name string, perm uint32, mode uint32) (p9pnew.Qid, error) {
blob - 45ec913cb709375259ca68b89724efe2da533146
blob + 529dbcad3e754a2d6af8c32544adc0bfbc3d0c39
--- encoding.go
+++ encoding.go
return elements, nil
}
+
+// pretty9p writes a human-readable rendering of v to w for debugging.
+//
+// An Fcall (or non-nil *Fcall) is rendered on one line as its size9p size,
+// type and tag, followed by its message. Any other non-nil value is rendered
+// with %v. Nil values produce no output. The first write error encountered is
+// returned.
+func pretty9p(w io.Writer, v interface{}) error {
+	switch v := v.(type) {
+	case nil:
+		return nil
+	case *Fcall:
+		if v == nil {
+			return nil
+		}
+		return pretty9p(w, *v)
+	case Fcall:
+		if _, err := fmt.Fprintf(w, "uint32(%v) %v(%v) ", size9p(v), v.Type, v.Tag); err != nil {
+			return err
+		}
+		if err := pretty9p(w, v.Message); err != nil {
+			return err
+		}
+		_, err := fmt.Fprintln(w)
+		return err
+	default:
+		// Message bodies land here; without this branch the recursive
+		// call above printed nothing for them.
+		_, err := fmt.Fprintf(w, "%v", v)
+		return err
+	}
+}
blob - 1467c5bfd69e08e5dabbf2b195dd87d531b47ee3
blob + 76b8a9ae3ccc5c4a86aa70d2b232a1155dbd2f8f
--- encoding_test.go
+++ encoding_test.go
0x4, 0x0, 0x71, 0x77, 0x65, 0x72,
0x4, 0x0, 0x7a, 0x78, 0x63, 0x76},
},
+ // Dir
+ // Qid
{
description: "Tversion fcall",
target: &Fcall{
Type: Tversion,
Tag: 2255,
- Message: &MessageVersion{
+ Message: &MessageTversion{
MSize: uint32(1024),
Version: "9PTEST",
},
0x6, 0x0, 0x39, 0x50, 0x54, 0x45, 0x53, 0x54},
},
{
+ description: "Rversion fcall",
+ target: &Fcall{
+ Type: Rversion,
+ Tag: 2255,
+ Message: &MessageRversion{
+ MSize: uint32(1024),
+ Version: "9PTEST",
+ },
+ },
+ marshaled: []byte{
+ 0x13, 0x0, 0x0, 0x0,
+ 0x65, 0xcf, 0x8, 0x0, 0x4, 0x0, 0x0,
+ 0x6, 0x0, 0x39, 0x50, 0x54, 0x45, 0x53, 0x54},
+ },
+ {
description: "Twalk fcall",
target: &Fcall{
Type: Twalk,
blob - 9933b9852590d921bfbd30d5d5611301b45cbabd
blob + 05d7d2c1a99ab94e8695ccc4740dedfda8d0371f
--- fcall.go
+++ fcall.go
Message Message
}
-func (fc Fcall) String() string {
+// newFcall wraps msg in an Fcall whose Type is taken from msg.Type(). The Tag
+// is left at its zero value for the caller (or transport) to fill in.
+func newFcall(msg Message) *Fcall {
+	fc := new(Fcall)
+	fc.Type = msg.Type()
+	fc.Message = msg
+	return fc
+}
+
+func (fc *Fcall) String() string {
return fmt.Sprintf("%8d %v(%v) %v", size9p(fc), fc.Type, fc.Tag, fc.Message)
}
type Message interface {
- // Size() uint32
-
- // NOTE(stevvooe): The binary marshal approach isn't particularly nice to
- // generating garbage. Consider using an append model, once we have the
- // messages worked out.
- // encoding.BinaryMarshaler
- // encoding.BinaryUnmarshaler
-
- message9p()
+ // Type indicates the Fcall type of the message. This must match
+ // Fcall.Type.
+ Type() FcallType
}
// newMessage returns a new instance of the message based on the Fcall type.
// NOTE(stevvooe): This is a nasty bit of code but makes the transport
// fairly simple to implement.
switch typ {
- case Tversion, Rversion:
- return &MessageVersion{}, nil
+ case Tversion:
+ return &MessageTversion{}, nil
+ case Rversion:
+ return &MessageRversion{}, nil
case Tauth:
case Rauth:
-
case Tattach:
-
+ return &MessageTattach{}, nil
case Rattach:
-
- case Terror:
-
+ return &MessageRattach{}, nil
case Rerror:
-
+ return &MessageRerror{}, nil
case Tflush:
return &MessageTflush{}, nil
case Rflush:
case Rwalk:
return &MessageRwalk{}, nil
case Topen:
-
+ return &MessageTopen{}, nil
case Ropen:
-
+ return &MessageRopen{}, nil
case Tcreate:
case Rcreate:
case Rwrite:
return &MessageRwrite{}, nil
case Tclunk:
-
+ return &MessageTclunk{}, nil
case Rclunk:
-
+ return nil, nil // no response body
case Tremove:
case Rremove:
case Twstat:
case Rwstat:
- default:
- return nil, fmt.Errorf("unknown message type: %v", typ)
}
- return nil, fmt.Errorf("unknown message")
+ return nil, fmt.Errorf("unknown message type")
}
// MessageVersion encodes the message body for Tversion and Rversion RPC
// calls. The body is identical in both directions.
-type MessageVersion struct {
+type MessageTversion struct {
MSize uint32
Version string
}
-func (MessageVersion) message9p() {}
-
-func (mv MessageVersion) String() string {
- return fmt.Sprintf("msize=%v version=%v", mv.MSize, mv.Version)
+type MessageRversion struct {
+ MSize uint32
+ Version string
}
-type MessageTAuth struct {
+type MessageTauth struct {
Afid Fid
Uname string
Aname string
}
-type MessageRAuth struct {
+type MessageRauth struct {
Qid Qid
}
Ename string
}
-// MessageTflush handles the content for the Tflush message type.
type MessageTflush struct {
Oldtag Tag
}
-func (MessageTflush) message9p() {}
-
type MessageTattach struct {
Fid Fid
Afid Fid
Aname string
}
-func (MessageTattach) message9p() {}
-
type MessageRattach struct {
Qid Qid
}
-func (MessageRattach) message9p() {}
-
type MessageTwalk struct {
Fid Fid
Newfid Fid
- Wname []string
+ Wnames []string
}
-func (MessageTwalk) message9p() {}
-
type MessageRwalk struct {
Qids []Qid
}
-func (MessageRwalk) message9p() {}
-
type MessageTopen struct {
Fid Fid
Mode uint8
}
-func (MessageTopen) message9p() {}
-
type MessageRopen struct {
Qid Qid
Msize uint32
}
-func (MessageRopen) message9p() {}
-
type MessageTcreate struct {
Fid Fid
Name string
Mode uint8
}
-func (MessageTcreate) message9p() {}
-
type MessageRcreate struct {
Qid Qid
IOUnit uint32
}
-func (MessageRcreate) message9p() {}
-
type MessageTread struct {
Fid Fid
Offset uint64
Count uint32
}
-func (MessageTread) message9p() {}
-
type MessageRread struct {
Data []byte
}
-func (MessageRread) message9p() {}
-
type MessageTwrite struct {
Fid Fid
Offset uint64
Data []byte
}
-func (MessageTwrite) message9p() {}
-
type MessageRwrite struct {
Count uint32
}
-func (MessageRwrite) message9p() {}
-
type MessageTclunk struct {
Fid Fid
}
Stat Dir
}
-func (MessageRstat) message9p() {}
-
type MessageTwstat struct {
Fid Fid
Stat Dir
}
+
+// The Type methods below make each message body satisfy the Message interface
+// by reporting the FcallType it travels under. They use value receivers, so
+// both a message value and a pointer to it satisfy Message.
+// NOTE(review): only the bodies listed here get a Type method in this change;
+// confirm whether any other message structs in the package still need one.
+
+func (MessageTversion) Type() FcallType { return Tversion }
+func (MessageRversion) Type() FcallType { return Rversion }
+func (MessageTauth) Type() FcallType { return Tauth }
+func (MessageRauth) Type() FcallType { return Rauth }
+func (MessageRerror) Type() FcallType { return Rerror }
+func (MessageTflush) Type() FcallType { return Tflush }
+func (MessageTattach) Type() FcallType { return Tattach }
+func (MessageRattach) Type() FcallType { return Rattach }
+func (MessageTwalk) Type() FcallType { return Twalk }
+func (MessageRwalk) Type() FcallType { return Rwalk }
+func (MessageTopen) Type() FcallType { return Topen }
+func (MessageRopen) Type() FcallType { return Ropen }
+func (MessageTcreate) Type() FcallType { return Tcreate }
+func (MessageRcreate) Type() FcallType { return Rcreate }
+func (MessageTread) Type() FcallType { return Tread }
+func (MessageRread) Type() FcallType { return Rread }
+func (MessageTwrite) Type() FcallType { return Twrite }
+func (MessageRwrite) Type() FcallType { return Rwrite }
+func (MessageTclunk) Type() FcallType { return Tclunk }
+func (MessageTremove) Type() FcallType { return Tremove }
+func (MessageTstat) Type() FcallType { return Tstat }
+func (MessageRstat) Type() FcallType { return Rstat }
+func (MessageTwstat) Type() FcallType { return Twstat }
blob - 19ec9f4039516b5fcebf2abc647d96a425cc374f
blob + dce6821112555e7c268f22d5c2cbaa952dd1e991
--- server.go
+++ server.go
-// +build ignore
-
package p9pnew
import (
+ "bufio"
+ "fmt"
"log"
"net"
"time"
)
// Serve the 9p session over the provided network connection.
-func Serve(ctx context.Context, conn net.Conn, session Session) error {
- panic("not implemented")
+//
+// Serve builds a server around conn and session and runs its request loop on
+// the calling goroutine; it returns when that loop returns.
+func Serve(ctx context.Context, conn net.Conn, session Session) {
+	s := &server{
+		ctx:     ctx,
+		conn:    conn,
+		session: session,
+		// closed must be a real channel: a nil channel can never become
+		// ready, so the shutdown case in run would be dead code.
+		closed: make(chan struct{}),
+	}
+
+	s.run()
}
type server struct {
ctx context.Context
session Session
conn net.Conn
+ closed chan struct{}
}
func (s *server) run() {
- dec := decoder{s.conn}
+ brd := bufio.NewReader(s.conn)
+ dec := &decoder{brd}
+ bwr := bufio.NewWriter(s.conn)
+ enc := &encoder{bwr}
- fcall := new(Fcall)
- if err := dec.decode(fcall); err != nil {
- log.Println(err)
- }
+ tags := map[Tag]*Fcall{} // active requests
+ log.Println("server.run()")
+ for {
+ select {
+ case <-s.ctx.Done():
+ log.Println("server: shutdown")
+ return
+ case <-s.closed:
+ default:
+ }
+
+ // NOTE(stevvooe): For now, we only provide a single request at a time
+ // handler. We can refactor this to take requests off the wire as
+ // quickly as they arrive and dispatch in parallel to session.
+
+ log.Println("server:", "wait")
+ fcall := new(Fcall)
+ if err := dec.decode(fcall); err != nil {
+ log.Println("server decoding fcall:", err)
+ continue
+ }
+
+ log.Println("server:", "message", fcall)
+
+ if _, ok := tags[fcall.Tag]; ok {
+ if err := enc.encode(&Fcall{
+ Type: Rerror,
+ Tag: fcall.Tag,
+ Message: &MessageRerror{
+ Ename: ErrDuptag.Error(),
+ },
+ }); err != nil {
+ log.Println("server:", err)
+ }
+ bwr.Flush()
+ continue
+ }
+ tags[fcall.Tag] = fcall
+
+ resp, err := s.handle(fcall)
+ if err != nil {
+ log.Println("server:", err)
+ continue
+ }
+
+ if err := enc.encode(resp); err != nil {
+ log.Println("server:", err)
+ continue
+ }
+ bwr.Flush()
+
+ }
}
// handle responds to an fcall using the session. An error is only returned if
// the handler cannot proceed. All session errors are returned as Rerror.
-func (s *server) handle(f *Fcall) (*Fcall, error) {
+//
+// Each request runs under a context derived from the server context with a
+// fixed timeout. Request types without a case below are answered with an
+// Rerror. The response always carries the tag of the request.
+func (s *server) handle(req *Fcall) (*Fcall, error) {
const timeout = 30 * time.Second // TODO(stevvooe): Allow this to be configured.
- ctx, cancel = context.WithTimeout(s.ctx, timeout)
+	ctx, cancel := context.WithTimeout(s.ctx, timeout)
defer cancel()
- switch fcall.Type {
+	var resp *Fcall
+	switch req.Type {
case Tattach:
- atc, ok := fcall.Message.(*MessageTattach)
- if ok {
- log.Println("bad message")
- continue
+		reqmsg, ok := req.Message.(*MessageTattach)
+		if !ok {
+			return nil, fmt.Errorf("bad message: %v message=%#v", req, req.Message)
}
- qid, err := s.session.Attach(s.ctx, atc.Fid, atc.Afid, atc.Uname, atc.Aname)
+		qid, err := s.session.Attach(ctx, reqmsg.Fid, reqmsg.Afid, reqmsg.Uname, reqmsg.Aname)
if err != nil {
- return
+			// A session failure is the client's answer, not a handler
+			// failure: report it as Rerror, as documented above.
+			resp = newFcall(&MessageRerror{Ename: err.Error()})
+			break
}
+
+		resp = &Fcall{
+			Type: Rattach,
+			Tag:  req.Tag,
+			Message: &MessageRattach{
+				Qid: qid,
+			},
+		}
+	case Twalk:
+		reqmsg, ok := req.Message.(*MessageTwalk)
+		if !ok {
+			return nil, fmt.Errorf("bad message: %v message=%#v", req, req.Message)
+		}
+
+		// TODO(stevvooe): This is one of the places where we need to manage
+		// fid allocation lifecycle. We need to reserve the fid, then, if this
+		// call succeeds, we should alloc the fid for future uses. Also need
+		// to interact correctly with concurrent clunk and the flush of this
+		// walk message.
+		qids, err := s.session.Walk(ctx, reqmsg.Fid, reqmsg.Newfid, reqmsg.Wnames...)
+		if err != nil {
+			resp = newFcall(&MessageRerror{Ename: err.Error()})
+			break
+		}
+
+		resp = newFcall(&MessageRwalk{
+			Qids: qids,
+		})
}
+
+	if resp == nil {
+		resp = newFcall(&MessageRerror{
+			Ename: "unknown message type",
+		})
+	}
+
+	resp.Tag = req.Tag
+	return resp, nil
}
blob - 9eac603c2d0f52f707d8bf18e975a11f2c712207
blob + 37779b341b219f43c948e22ba0287ac579e16625
--- session.go
+++ session.go
Walk(ctx context.Context, fid Fid, newfid Fid, names ...string) ([]Qid, error)
Read(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error)
Write(ctx context.Context, fid Fid, p []byte, offset int64) (n int, err error)
- Open(ctx context.Context, fid Fid, mode int32) (Qid, error)
+ Open(ctx context.Context, fid Fid, mode uint8) (Qid, uint32, error)
Create(ctx context.Context, parent Fid, name string, perm uint32, mode uint32) (Qid, error)
Stat(context.Context, Fid) (Dir, error)
WStat(context.Context, Fid, Dir) error
blob - 4bea0b041f5c0da62d20e12af577067a28bf09e8
blob + ab572f197ea3d78f7f9b4c37955a30121793f3fa
--- tags.go
+++ tags.go
// NewtagPool returns a tag pool with the maximum number of outstanding
// requests.
-func newTagPool(outstanding int) (*tagPool, error) {
+func newTagPool(outstanding int) *tagPool {
return &tagPool{
maximum: Tag(outstanding),
freelist: make(chan Tag, outstanding),
nexttag: make(chan Tag),
closed: make(chan struct{}),
- }, nil
+ }
}
blob - /dev/null
blob + 82c3b4b965eb8eeaaa15ad008456f41a203c091c (mode 644)
--- /dev/null
+++ transport.go
+package p9pnew
+
+import (
+ "bufio"
+ "fmt"
+ "log"
+ "net"
+ "os"
+ "time"
+
+ "golang.org/x/net/context"
+)
+
+// roundTripper manages the request and response from the client-side. A
+// roundTripper must abide by many of the rules of http.RoundTripper.
+// Typically, the roundTripper will manage tag assignment and message
+// serialization.
+type roundTripper interface {
+ send(ctx context.Context, fc *Fcall) (*Fcall, error)
+}
+
+// transport is the roundTripper returned by newTransport. It serializes
+// fcalls onto a single connection and matches responses back to the waiting
+// callers by tag.
+type transport struct {
+	// ctx bounds the lifetime of the handle goroutine and the reader it starts.
+	ctx context.Context
+	// conn is the connection fcalls are written to and read from.
+	conn net.Conn
+	// requests carries fcalls from send to the handle goroutine.
+	requests chan *fcallRequest
+	// closed is closed by Close to tell send and handle to stop.
+	closed chan struct{}
+
+	// NOTE(review): handle allocates tags from a local counter; nothing in
+	// this file reads or writes this field — confirm whether it is needed.
+	tags uint16
+}
+
+// newTransport builds a transport over conn and starts its handle goroutine
+// before returning it as a roundTripper.
+func newTransport(ctx context.Context, conn net.Conn) roundTripper {
+	t := new(transport)
+	t.ctx = ctx
+	t.conn = conn
+	t.requests = make(chan *fcallRequest)
+	t.closed = make(chan struct{})
+
+	go t.handle()
+
+	return t
+}
+
+// fcallRequest pairs an outgoing fcall with the channels on which the handle
+// goroutine reports its outcome.
+type fcallRequest struct {
+	// ctx is the caller's context; handle reads its deadline to set the
+	// write deadline for this request.
+	ctx context.Context
+	// fcall is the message to send; handle assigns its Tag.
+	fcall *Fcall
+	// response receives the fcall whose tag matches this request.
+	response chan *Fcall
+	// err receives a failure from encoding or flushing the request.
+	err chan error
+}
+
+// newFcallRequest prepares a request for fcall. Both result channels have a
+// buffer of one so that a single outcome can be delivered without a receiver
+// being ready.
+func newFcallRequest(ctx context.Context, fcall *Fcall) *fcallRequest {
+	req := new(fcallRequest)
+	req.ctx = ctx
+	req.fcall = fcall
+	req.response = make(chan *Fcall, 1)
+	req.err = make(chan error, 1)
+	return req
+}
+
+// send hands fcall to the handle goroutine and waits for its outcome.
+//
+// It returns the matching response, the write error reported by handle,
+// ErrClosed if the transport has been closed, or the context error if either
+// the caller's ctx or the transport's own context ends first.
+func (t *transport) send(ctx context.Context, fcall *Fcall) (*Fcall, error) {
+	req := newFcallRequest(ctx, fcall)
+
+	log.Println("dispatch", fcall)
+	// dispatch the request.
+	select {
+	case <-t.closed:
+		return nil, ErrClosed
+	case <-t.ctx.Done():
+		// handle exits when the transport context ends, so nothing would
+		// ever receive from t.requests again.
+		return nil, t.ctx.Err()
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case t.requests <- req:
+	}
+
+	log.Println("wait", fcall)
+	// wait for the response.
+	select {
+	case <-t.closed:
+		return nil, ErrClosed
+	case <-t.ctx.Done():
+		return nil, t.ctx.Err()
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case err := <-req.err:
+		// handle reports encode/flush failures here; without this case
+		// the caller would wait for a response that can never arrive.
+		return nil, err
+	case resp := <-req.response:
+		return resp, nil
+	}
+}
+
+// handle takes messages off the wire and wakes up the waiting tag call.
+//
+// It runs two loops. A reader goroutine decodes fcalls from the connection
+// and forwards them on an internal channel. The main loop assigns tags to
+// outgoing requests, writes them, and delivers each decoded response to the
+// request registered under its tag. Both loops stop when the transport
+// context is done or the transport is closed.
+func (t *transport) handle() {
+
+	// The variables in this block are owned by this goroutine, apart from
+	// responses and dec, which the reader goroutine below also uses.
+	var (
+		responses = make(chan *Fcall)
+		tags      Tag
+		// outstanding provides a map of tags to outstanding requests.
+		outstanding = map[Tag]*fcallRequest{}
+		brd         = bufio.NewReader(t.conn)
+		bwr         = bufio.NewWriter(t.conn)
+		enc         = &encoder{bwr}
+		dec         = &decoder{brd}
+	)
+
+	// shutdown reports whether the transport has been asked to stop.
+	shutdown := func() bool {
+		select {
+		case <-t.ctx.Done():
+			return true
+		case <-t.closed:
+			return true
+		default:
+			return false
+		}
+	}
+
+	// loop to read messages off of the connection
+	go func() {
+		for {
+			if shutdown() {
+				return
+			}
+
+			const pump = time.Second
+
+			// Continuously set the read deadline to pump this loop. We can
+			// probably set a connection dead threshold that can count these.
+			// Usually, this would only matter when there are actually
+			// outstanding requests.
+			deadline, ok := t.ctx.Deadline()
+			if !ok {
+				deadline = time.Now().Add(pump)
+			} else {
+				// never wait longer than one pump interval
+				nd := time.Now().Add(pump)
+				if nd.Before(deadline) {
+					deadline = nd
+				}
+			}
+
+			if err := t.conn.SetReadDeadline(deadline); err != nil {
+				log.Printf("error setting read deadline: %v", err)
+			}
+
+			fc := new(Fcall)
+			if err := dec.decode(fc); err != nil {
+				if nerr, ok := err.(net.Error); ok && (nerr.Timeout() || nerr.Temporary()) {
+					// An expired pump deadline is the normal idle case:
+					// go around again. Breaking out here would end the
+					// reader for good and strand every later request.
+					// NOTE(review): a timeout in the middle of a frame
+					// may leave the decoder out of sync — confirm how
+					// decode handles partial reads.
+					continue
+				}
+
+				if shutdown() {
+					// Read failures during shutdown are expected.
+					return
+				}
+
+				panic(fmt.Sprintf("connection read error: %v", err))
+			}
+
+			select {
+			case <-t.ctx.Done():
+				return
+			case <-t.closed:
+				return
+			case responses <- fc:
+			}
+		}
+	}()
+
+	for {
+		select {
+		case req := <-t.requests:
+			tags++
+			req.fcall.Tag = tags
+			outstanding[req.fcall.Tag] = req
+
+			pretty9p(os.Stdout, req.fcall)
+			// use deadline to set write deadline for this request.
+			deadline, ok := req.ctx.Deadline()
+			if !ok {
+				deadline = time.Now().Add(time.Second)
+			}
+
+			if err := t.conn.SetWriteDeadline(deadline); err != nil {
+				log.Printf("error setting write deadline: %v", err)
+			}
+
+			// TODO(stevvooe): Consider the case of requests that never
+			// receive a response. We need to remove the fcall context from
+			// the tag map and dealloc the tag. We may also want to send a
+			// flush for the tag.
+
+			log.Println("send", req.fcall)
+			err := enc.encode(req.fcall)
+			if err != nil {
+				// Drop whatever part of the frame was buffered so it is
+				// not prepended to the next request.
+				bwr.Reset(t.conn)
+			} else {
+				err = bwr.Flush()
+			}
+
+			if err != nil {
+				// Report exactly one error per request: req.err holds a
+				// single value, so a second send would block this loop.
+				delete(outstanding, req.fcall.Tag)
+				req.err <- err
+				continue
+			}
+
+			log.Println("sent", req.fcall)
+		case b := <-responses:
+			req, ok := outstanding[b.Tag]
+			if !ok {
+				// A response nobody is waiting for must not take the
+				// whole process down; drop it and keep serving.
+				log.Printf("dropping response with unknown tag: %v", b)
+				continue
+			}
+			delete(outstanding, req.fcall.Tag)
+
+			req.response <- b
+
+			// TODO(stevvooe): Reclaim tag id.
+		case <-t.ctx.Done():
+			return
+		case <-t.closed:
+			return
+		}
+	}
+}
+
+// Close shuts the transport down by closing t.closed. It returns ErrClosed
+// if the transport was already closed and the context error if the transport
+// context is already done; otherwise it returns nil.
+func (t *transport) Close() error {
+	select {
+	case <-t.closed:
+		return ErrClosed
+	case <-t.ctx.Done():
+		return t.ctx.Err()
+	default:
+	}
+
+	close(t.closed)
+	return nil
+}