commit 40d4a02d02470ddbfcb31818213c5daea4d1f545 from: Stephen J Day date: Tue Nov 03 04:38:15 2015 UTC fs/p9p/new: proxy end to end fully working Signed-off-by: Stephen J Day commit - f41196dcf2b4d9e98d8e2addb0cd403e5405a3f0 commit + 40d4a02d02470ddbfcb31818213c5daea4d1f545 blob - 52786490fc789cd9c9104fda5820ec0ddf135dbe blob + a9453687ff9c6bcaec2512db06f1fbd224dd936a --- channel.go +++ channel.go @@ -13,6 +13,27 @@ import ( "golang.org/x/net/context" ) +// Channel defines the operations necessary to implement a 9p message channel +// interface. Typically, message channels do no protocol processing except to +// send and receive message frames. +type Channel interface { + // ReadFcall reads one fcall frame into the provided fcall structure. The + // Fcall may be cleared whether there is an error or not. If the operation + // is successful, the contents of the fcall will be populated in the + // argument. ReadFcall cannot be called concurrently with other calls to + // ReadFcall. This is both to preserve message ordering and to allow lockless + // buffer reuse. + ReadFcall(ctx context.Context, fcall *Fcall) error + + // WriteFcall writes the provided fcall to the channel. WriteFcall cannot + // be called concurrently with other calls to WriteFcall. + WriteFcall(ctx context.Context, fcall *Fcall) error + + // SetMSize sets the maximum message size for the channel. This must never + // be called concurrently with ReadFcall or WriteFcall. + SetMSize(msize int) +} + const ( defaultRWTimeout = 1 * time.Second // default read/write timeout if not set in context ) @@ -20,12 +41,34 @@ const ( // channel provides bidirectional protocol framing for 9p over net.Conn. // Operations are not thread-safe but reads and writes may be carried out // concurrently, supporting separate read and write loops. +// +// Lifecycle +// +// A connection, or message channel abstraction, has a lifecycle delineated by +// Tversion/Rversion request response cycles. 
For now, this is part of the +// channel itself but doesn't necessarily influence the channel's state, except +// the msize. Visually, it might look something like this: +// +// [Established] -> [Version] -> [Session] -> [Version]---+ +// ^ | +// |_________________________________| +// +// The connection is established, then we negotiate a version, run a session, +// then negotiate a version and so on. For most purposes, we are likely going +// to terminate the connection after the session but we may want to support +// connection pooling. Pooling may result in possible security leaks if the +// connections are shared among contexts, since the version is negotiated at +// the start of the session. To avoid this, we can actually use a "tombstone" +// version message which clears the server's session state without starting a +// new session. The next version message would then prepare the session +// without leaking any Fids. type channel struct { conn net.Conn codec Codec brd *bufio.Reader bwr *bufio.Writer closed chan struct{} + msize int rdbuf []byte wrbuf []byte } @@ -37,6 +80,7 @@ func newChannel(conn net.Conn, codec Codec, msize int) brd: bufio.NewReaderSize(conn, msize), // msize may not be optimal buffer size bwr: bufio.NewWriterSize(conn, msize), closed: make(chan struct{}), + msize: msize, rdbuf: make([]byte, msize), wrbuf: make([]byte, msize), } @@ -48,6 +92,7 @@ func (ch *channel) setmsize(msize int) { // NOTE(stevvooe): We cannot safely resize the buffered reader and writer. // Proceed assuming that original size is sufficient. + ch.msize = msize if msize < len(ch.rdbuf) { // just change the cap ch.rdbuf = ch.rdbuf[:msize] @@ -85,9 +130,65 @@ func (ch *channel) version(ctx context.Context, msize return mv.MSize, mv.Version, nil } +// negotiate blocks until a version message is received or a timeout occurs. +// The msize for the transport will be set from the negotiation. If negotiate +// returns nil, a server may proceed with the connection. 
+// +// In the future, it might be better to handle the version messages in a +// separate object that manages the session. Each set of version requests +// effectively "reset" a connection, meaning all fids get clunked and all +// outstanding IO is aborted. This is probably slightly racy, in practice with +// a misbehaved client. The main issue is that we cannot tell which session +// messages belong to. +func (ch *channel) negotiate(ctx context.Context, version string) error { + // wait for the version message over the transport. + req := new(Fcall) + if err := ch.readFcall(ctx, req); err != nil { + return err + } + + mv, ok := req.Message.(*MessageTversion) + if !ok { + return fmt.Errorf("expected version message: %v", mv) + } + + respmsg := MessageRversion{ + Version: version, + } + + if mv.Version != version { + // TODO(stevvooe): Not the best place to do version handling. We need + // to have a way to pass supported versions into this method then have + // it return the actual version. For now, respond with unknown for + // anything that doesn't match the provided version string. + respmsg.Version = "unknown" + } + + if int(mv.MSize) < ch.msize { + // if the server msize is too large, use the client's suggested msize. + ch.setmsize(int(mv.MSize)) + respmsg.MSize = mv.MSize + } else { + respmsg.MSize = uint32(ch.msize) + } + + resp := newFcall(respmsg) + if err := ch.writeFcall(ctx, resp); err != nil { + return err + } + + if respmsg.Version == "unknown" { + return fmt.Errorf("bad version negotiation") + } + + return nil +} + // ReadFcall reads the next message from the channel into fcall. 
func (ch *channel) readFcall(ctx context.Context, fcall *Fcall) error { select { + case <-ctx.Done(): + return ctx.Err() case <-ch.closed: return ErrClosed default: @@ -115,6 +216,8 @@ func (ch *channel) readFcall(ctx context.Context, fcal return fmt.Errorf("message large than buffer:", n) } + // clear out the fcall + *fcall = Fcall{} if err := ch.codec.Unmarshal(ch.rdbuf[:n], fcall); err != nil { return err } @@ -124,6 +227,8 @@ func (ch *channel) readFcall(ctx context.Context, fcal func (ch *channel) writeFcall(ctx context.Context, fcall *Fcall) error { select { + case <-ctx.Done(): + return ctx.Err() case <-ch.closed: return ErrClosed default: blob - 25b2080e9f954172c4073e32232291a5f7030dbb blob + 2da30033b5ccc50bec99a071ca5793ebeaec32f0 --- cmd/9pr/main.go +++ cmd/9pr/main.go @@ -34,9 +34,15 @@ func main() { ctx := context.Background() log.SetFlags(0) flag.Parse() + + proto := "tcp" + if strings.HasPrefix(addr, "unix:") { + proto = "unix" + addr = addr[5:] + } log.Println("dialing", addr) - conn, err := net.Dial("tcp", addr) + conn, err := net.Dial(proto, addr) if err != nil { log.Fatal(err) } blob - 7818604df1a25843f729bd82b77a66aa08051e81 blob + e190166973a54ec0561999305e6166b8d9f360ab --- cmd/9ps/main.go +++ cmd/9ps/main.go @@ -45,6 +45,8 @@ func main() { } go func(conn net.Conn) { + defer conn.Close() + ctx := context.WithValue(ctx, "conn", conn) log.Println("connected", conn.RemoteAddr()) session, err := newLocalSession(ctx, root) blob - /dev/null blob + 5d884701a2cbc04f08d9dff5e5ddfcf4c4e27e27 (mode 644) --- /dev/null +++ dispatcher.go @@ -0,0 +1,173 @@ +package p9pnew + +import ( + "fmt" + + "golang.org/x/net/context" +) + +type handler interface { + handle(ctx context.Context, req *Fcall) (*Fcall, error) +} + +// dispatcher routes fcalls to a Session. +type dispatcher struct { + session Session +} + +// handle responds to an fcall using the session. An error is only returned if +// the handler cannot proceed. All session errors are returned as Rerror. 
+func (d *dispatcher) handle(ctx context.Context, req *Fcall) (*Fcall, error) { + var resp *Fcall + switch req.Type { + case Tauth: + reqmsg, ok := req.Message.(MessageTauth) + if !ok { + return nil, fmt.Errorf("incorrect message for type: %v message=%v", req, req.Message) + } + + qid, err := d.session.Auth(ctx, reqmsg.Afid, reqmsg.Uname, reqmsg.Aname) + if err != nil { + return nil, err + } + + resp = newFcall(MessageRauth{Qid: qid}) + case Tattach: + reqmsg, ok := req.Message.(*MessageTattach) + if !ok { + return nil, fmt.Errorf("bad message: %v message=%#v", req, req.Message) + } + + qid, err := d.session.Attach(ctx, reqmsg.Fid, reqmsg.Afid, reqmsg.Uname, reqmsg.Aname) + if err != nil { + return nil, err + } + + resp = newFcall(MessageRattach{ + Qid: qid, + }) + case Twalk: + reqmsg, ok := req.Message.(*MessageTwalk) + if !ok { + return nil, fmt.Errorf("bad message: %v message=%#v", req, req.Message) + } + + // TODO(stevvooe): This is one of the places where we need to manage + // fid allocation lifecycle. We need to reserve the fid, then, if this + // call succeeds, we should alloc the fid for future uses. Also need + // to interact correctly with concurrent clunk and the flush of this + // walk message. + qids, err := d.session.Walk(ctx, reqmsg.Fid, reqmsg.Newfid, reqmsg.Wnames...) 
+ if err != nil { + return nil, err + } + + resp = newFcall(&MessageRwalk{ + Qids: qids, + }) + case Topen: + reqmsg, ok := req.Message.(*MessageTopen) + if !ok { + return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message) + } + + qid, iounit, err := d.session.Open(ctx, reqmsg.Fid, reqmsg.Mode) + if err != nil { + return nil, err + } + + resp = newFcall(&MessageRopen{ + Qid: qid, + IOUnit: iounit, + }) + case Tcreate: + reqmsg, ok := req.Message.(*MessageTcreate) + if !ok { + return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message) + } + + qid, iounit, err := d.session.Create(ctx, reqmsg.Fid, reqmsg.Name, reqmsg.Perm, uint32(reqmsg.Mode)) + if err != nil { + return nil, err + } + + resp = newFcall(&MessageRcreate{ + Qid: qid, + IOUnit: iounit, + }) + + case Tread: + reqmsg, ok := req.Message.(*MessageTread) + if !ok { + return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message) + } + + p := make([]byte, int(reqmsg.Count)) + n, err := d.session.Read(ctx, reqmsg.Fid, p, int64(reqmsg.Offset)) + if err != nil { + return nil, err + } + + resp = newFcall(&MessageRread{ + Data: p[:n], + }) + case Twrite: + reqmsg, ok := req.Message.(*MessageTwrite) + if !ok { + return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message) + } + + n, err := d.session.Write(ctx, reqmsg.Fid, reqmsg.Data, int64(reqmsg.Offset)) + if err != nil { + return nil, err + } + + resp = newFcall(&MessageRwrite{ + Count: uint32(n), + }) + case Tclunk: + reqmsg, ok := req.Message.(*MessageTclunk) + if !ok { + return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message) + } + + // TODO(stevvooe): Manage the clunking of file descriptors based on + // walk and attach call progression. 
+ if err := d.session.Clunk(ctx, reqmsg.Fid); err != nil { + return nil, err + } + + resp = newFcall(&MessageRclunk{}) + case Tremove: + reqmsg, ok := req.Message.(*MessageTremove) + if !ok { + return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message) + } + + if err := d.session.Remove(ctx, reqmsg.Fid); err != nil { + return nil, err + } + + resp = newFcall(&MessageRremove{}) + case Tstat: + reqmsg, ok := req.Message.(*MessageTstat) + if !ok { + return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message) + } + + dir, err := d.session.Stat(ctx, reqmsg.Fid) + if err != nil { + return nil, err + } + + resp = newFcall(&MessageRstat{ + Stat: dir, + }) + case Twstat: + panic("not implemented") + default: + return nil, ErrUnknownMsg + } + + return resp, nil +} blob - 2310a2c5f859b1b9f2486c722d01e703a67ca0a7 blob + e15f26b1b7a1c68a00ef24c5740fd35cde89a245 --- encoding_test.go +++ encoding_test.go @@ -2,6 +2,7 @@ package p9pnew import ( "bytes" + "errors" "reflect" "testing" "time" @@ -155,6 +156,14 @@ func TestEncodeDecode(t *testing.T) { 0x3, 0x0, 0x67, 0x69, 0x64, // gid 0x4, 0x0, 0x6d, 0x75, 0x69, 0x64}, // muid }, + { + description: "Rerror fcall", + target: newErrorFcall(5556, errors.New("A serious error")), + marshaled: []byte{ + 0x75, 0xb4, 0x15, + 0x12, 0x0, 0x0, 0x0, + 0x61, 0x20, 0x6c, 0x6f, 0x74, 0x20, 0x6f, 0x66, 0x20, 0x62, 0x79, 0x74, 0x65, 0x20, 0x64, 0x61, 0x74, 0x61}, + }, } { t.Logf("target under test: %v", testcase.target) fatalf := func(format string, args ...interface{}) { blob - 6a1212e5299bf975367e1754ded62b3be8a88130 blob + 000834bd884dcf2b5745120f03727e76a12f91f3 --- errors.go +++ errors.go @@ -1,9 +1,6 @@ package p9pnew -import ( - "errors" - "fmt" -) +import "errors" // common errors returned by Session interface methods var ( @@ -33,17 +30,12 @@ var ( ErrWalknodir = new9pError("walk in non-directory") // extra errors not part of the normal protocol - ErrTimeout = new9pError("fcall timeout") // returned when timing out on 
the fcall + ErrTimeout = new9pError("fcall timeout") // returned when timing out on the fcall + ErrUnknownTag = new9pError("unknown tag") + ErrUnknownMsg = new9pError("unknown message") // returned when encountering unknown message type ) -type error9p struct { - Name string -} - +// new9pError returns a new 9p error ready for the wire. func new9pError(s string) error { - return error9p{Name: s} + return MessageRerror{Ename: s} } - -func (e error9p) Error() string { - return fmt.Sprintf("9p: %v", e.Name) -} blob - 9d1fc1aba43ebbc74dc6a62fe3de23ff731f0761 blob + 80d2e7e1fd009f0babc3639130e1519b72bdd533 --- fcall.go +++ fcall.go @@ -121,6 +121,25 @@ func newFcall(msg Message) *Fcall { } } +func newErrorFcall(tag Tag, err error) *Fcall { + var msg Message + + switch v := err.(type) { + case MessageRerror: + msg = v + case *MessageRerror: + msg = v + default: + msg = MessageRerror{Ename: v.Error()} + } + + return &Fcall{ + Type: Rerror, + Tag: tag, + Message: msg, + } +} + func (fc *Fcall) String() string { return fmt.Sprintf("%v(%v) %v", fc.Type, fc.Tag, string9p(fc.Message)) } @@ -141,9 +160,9 @@ func newMessage(typ FcallType) (Message, error) { case Rversion: return &MessageRversion{}, nil case Tauth: - + return &MessageTauth{}, nil case Rauth: - + return &MessageRauth{}, nil case Tattach: return &MessageTattach{}, nil case Rattach: @@ -163,9 +182,9 @@ func newMessage(typ FcallType) (Message, error) { case Ropen: return &MessageRopen{}, nil case Tcreate: - + return &MessageTcreate{}, nil case Rcreate: - + return &MessageRcreate{}, nil case Tread: return &MessageTread{}, nil case Rread: @@ -221,6 +240,10 @@ type MessageRerror struct { Ename string } +func (e MessageRerror) Error() string { + return fmt.Sprintf("9p: %v", e.Ename) +} + type MessageTflush struct { Oldtag Tag } @@ -300,6 +323,8 @@ type MessageTremove struct { Fid Fid } +type MessageRremove struct{} + type MessageTstat struct { Fid Fid } @@ -317,9 +342,9 @@ func (MessageTversion) Type() FcallType { return 
Tvers func (MessageRversion) Type() FcallType { return Rversion } func (MessageTauth) Type() FcallType { return Tauth } func (MessageRauth) Type() FcallType { return Rauth } -func (MessageRerror) Type() FcallType { return Rerror } func (MessageTflush) Type() FcallType { return Tflush } func (MessageRflush) Type() FcallType { return Rflush } +func (MessageRerror) Type() FcallType { return Rerror } func (MessageTattach) Type() FcallType { return Tattach } func (MessageRattach) Type() FcallType { return Rattach } func (MessageTwalk) Type() FcallType { return Twalk } @@ -335,6 +360,7 @@ func (MessageRwrite) Type() FcallType { return Rwrit func (MessageTclunk) Type() FcallType { return Tclunk } func (MessageRclunk) Type() FcallType { return Rclunk } func (MessageTremove) Type() FcallType { return Tremove } +func (MessageRremove) Type() FcallType { return Rremove } func (MessageTstat) Type() FcallType { return Tstat } func (MessageRstat) Type() FcallType { return Rstat } func (MessageTwstat) Type() FcallType { return Twstat } blob - 325347028d2ac5a31e07eb8ed43af174b5a3beb1 blob + cb90f4c988eb50b54f9e78cd447dbe74b543727c --- server.go +++ server.go @@ -1,20 +1,39 @@ package p9pnew import ( - "bufio" - "fmt" "log" "net" + "time" "golang.org/x/net/context" ) // Serve the 9p session over the provided network connection. func Serve(ctx context.Context, conn net.Conn, session Session) { + const msize = 64 << 10 + const vers = "9P2000" + + ch := newChannel(conn, codec9p{}, msize) + + negctx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + + // TODO(stevvooe): For now, we negotiate here. It probably makes sense to + // do this outside of this function and then pass in a ready made channel. + // We are not really ready to export the channel type yet. + + if err := ch.negotiate(negctx, vers); err != nil { + // TODO(stevvooe): Need better error handling and retry support here. + // For now, we silently ignore the failure. 
+ log.Println("error negotiating version:", err) + return + } + s := &server{ ctx: ctx, - conn: conn, - session: session, + ch: ch, + handler: &dispatcher{session: session}, + closed: make(chan struct{}), } s.run() @@ -23,18 +42,21 @@ func Serve(ctx context.Context, conn net.Conn, session type server struct { ctx context.Context session Session - conn net.Conn + ch *channel + handler handler closed chan struct{} } +type activeTag struct { + ctx context.Context + request *Fcall + cancel context.CancelFunc + responded bool // true, if some response was sent (Response or Rflush/Rerror) +} + func (s *server) run() { - brd := bufio.NewReader(s.conn) - dec := &decoder{brd} - bwr := bufio.NewWriter(s.conn) - enc := &encoder{bwr} + tags := map[Tag]*activeTag{} // active requests - tags := map[Tag]*Fcall{} // active requests - log.Println("server.run()") for { select { @@ -45,148 +67,90 @@ func (s *server) run() { default: } - // NOTE(stevvooe): For now, we only provide a single request at a time - // handler. We can refactor this to take requests off the wire as - // quickly as they arrive and dispatch in parallel to session. + // BUG(stevvooe): This server blocks on reads, calls to handlers and + // write, effectively single tracking fcalls through a target + // dispatcher. There is no reason we couldn't parallelize these + // requests out to the dispatcher to get massive performance + // improvements. log.Println("server:", "wait") - fcall := new(Fcall) - - // BUG(stevvooe): The decoder is not reliably consuming all of the - // bytes of the wire. Needs to be setup to consume all bytes indicated - // in the size portion. Should use msize from the version negotiation. 
- - if err := dec.decode(fcall); err != nil { - log.Println("server decoding fcall:", err) + req := new(Fcall) + if err := s.ch.readFcall(s.ctx, req); err != nil { + log.Println("server: error reading fcall", err) continue } - log.Println("server:", "message", fcall) - - if _, ok := tags[fcall.Tag]; ok { - if err := enc.encode(&Fcall{ - Type: Rerror, - Tag: fcall.Tag, - Message: &MessageRerror{ - Ename: ErrDuptag.Error(), - }, - }); err != nil { - log.Println("server:", err) + if _, ok := tags[req.Tag]; ok { + resp := newErrorFcall(req.Tag, ErrDuptag) + if err := s.ch.writeFcall(s.ctx, resp); err != nil { + log.Printf("error sending duplicate tag response: %v", err) } - bwr.Flush() continue } - tags[fcall.Tag] = fcall - resp, err := s.handle(s.ctx, fcall) - if err != nil { - log.Println("server:", err) - continue - } + // handle flush calls. The tag never makes it into active from here. + if mf, ok := req.Message.(MessageTflush); ok { + log.Println("flushing message", mf.Oldtag) - if err := enc.encode(resp); err != nil { - log.Println("server:", err) - continue - } - bwr.Flush() + // check if we have actually know about the requested flush + active, ok := tags[mf.Oldtag] + if ok { + active.cancel() // cancel the context - } -} + resp := newFcall(MessageRflush{}) + resp.Tag = req.Tag + if err := s.ch.writeFcall(s.ctx, resp); err != nil { + log.Printf("error responding to flush: %v", err) + } + active.responded = true + } else { + resp := newErrorFcall(req.Tag, ErrUnknownTag) + if err := s.ch.writeFcall(s.ctx, resp); err != nil { + log.Printf("error responding to flush: %v", err) + } + } -// handle responds to an fcall using the session. An error is only returned if -// the handler cannot proceed. All session errors are returned as Rerror. 
-func (s *server) handle(ctx context.Context, req *Fcall) (*Fcall, error) { - var resp *Fcall - switch req.Type { - case Tattach: - reqmsg, ok := req.Message.(*MessageTattach) - if !ok { - return nil, fmt.Errorf("bad message: %v message=%#v", req, req.Message) + continue } - qid, err := s.session.Attach(ctx, reqmsg.Fid, reqmsg.Afid, reqmsg.Uname, reqmsg.Aname) - if err != nil { - return nil, err - } + // TODO(stevvooe): Add handler timeout here, as well, if we desire. - resp = &Fcall{ - Type: Rattach, - Tag: req.Tag, - Message: &MessageRattach{ - Qid: qid, - }, - } - case Twalk: - reqmsg, ok := req.Message.(*MessageTwalk) - if !ok { - return nil, fmt.Errorf("bad message: %v message=%#v", req, req.Message) - } + // Allows us to signal handlers to cancel processing of the fcall + // through context. + ctx, cancel := context.WithCancel(s.ctx) - // TODO(stevvooe): This is one of the places where we need to manage - // fid allocation lifecycle. We need to reserve the fid, then, if this - // call succeeds, we should alloc the fid for future uses. Also need - // to interact correctly with concurrent clunk and the flush of this - // walk message. - qids, err := s.session.Walk(ctx, reqmsg.Fid, reqmsg.Newfid, reqmsg.Wnames...) - if err != nil { - return nil, err + tags[req.Tag] = &activeTag{ + ctx: ctx, + request: req, + cancel: cancel, } - resp = newFcall(&MessageRwalk{ - Qids: qids, - }) - case Topen: - reqmsg, ok := req.Message.(*MessageTopen) - if !ok { - return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message) - } - - qid, iounit, err := s.session.Open(ctx, reqmsg.Fid, reqmsg.Mode) + resp, err := s.handler.handle(ctx, req) if err != nil { - return nil, err + // all handler errors are forwarded as protocol errors. 
+ resp = newErrorFcall(req.Tag, err) } + resp.Tag = req.Tag - resp = newFcall(&MessageRopen{ - Qid: qid, - IOUnit: iounit, - }) - case Tread: - reqmsg, ok := req.Message.(*MessageTread) - if !ok { - return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message) - } + if err := ctx.Err(); err != nil { + // NOTE(stevvooe): We aren't really getting our moneys worth for + // how this is being handled. We really need to dispatch each + // request handler to a separate thread. - p := make([]byte, int(reqmsg.Count)) - n, err := s.session.Read(ctx, reqmsg.Fid, p, int64(reqmsg.Offset)) - if err != nil { - return nil, err + // the context was canceled for some reason, perhaps timeout or + // due to a flush call. We treat this as a condition where a + // response should not be sent. + log.Println("context error:", err) + continue } - resp = newFcall(&MessageRread{ - Data: p[:n], - }) - case Tclunk: - reqmsg, ok := req.Message.(*MessageTclunk) - if !ok { - return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message) + if !tags[req.Tag].responded { + if err := s.ch.writeFcall(ctx, resp); err != nil { + log.Println("server: error writing fcall:", err) + continue + } } - // TODO(stevvooe): Manage the clunking of file descriptors based on - // walk and attach call progression. - if err := s.session.Clunk(ctx, reqmsg.Fid); err != nil { - return nil, err - } - - resp = newFcall(&MessageRclunk{}) + delete(tags, req.Tag) } - - if resp == nil { - log.Println("unknown message type:", req.Type) - resp = newFcall(&MessageRerror{ - Ename: "unknown message type", - }) - } - - resp.Tag = req.Tag - return resp, nil } blob - 5bdc01d70056b90a64fa6838de37153d3aec0cdb blob + b7b890057006eb9f329f5e4fbc253f9e15b20fce --- transport.go +++ transport.go @@ -16,6 +16,9 @@ type roundTripper interface { send(ctx context.Context, fc *Fcall) (*Fcall, error) } +// transport plays the role of being a client channel manager. 
It multiplexes +// function calls onto the wire and dispatches responses to blocking calls to +// send. On the whole, transport is thread-safe for calling send. type transport struct { ctx context.Context ch *channel