commit - f41196dcf2b4d9e98d8e2addb0cd403e5405a3f0
commit + 40d4a02d02470ddbfcb31818213c5daea4d1f545
blob - 52786490fc789cd9c9104fda5820ec0ddf135dbe
blob + a9453687ff9c6bcaec2512db06f1fbd224dd936a
--- channel.go
+++ channel.go
"golang.org/x/net/context"
)
+// Channel defines the operations necessary to implement a 9p message channel
+// interface. Typically, message channels do no protocol processing except to
+// send and receive message frames.
+type Channel interface {
+ // ReadFcall reads one fcall frame into the provided fcall structure. The
+ // Fcall may be cleared whether there is an error or not. If the operation
+ // is successful, the contents of the fcall will be populated in the
+ // argument. ReadFcall cannot be called concurrently with other calls to
+ // ReadFcall. This is both to preserve message ordering and to allow
+ // lockless buffer reuse.
+ ReadFcall(ctx context.Context, fcall *Fcall) error
+
+ // WriteFcall writes the provided fcall to the channel. WriteFcall cannot
+ // be called concurrently with other calls to WriteFcall.
+ WriteFcall(ctx context.Context, fcall *Fcall) error
+
+ // SetMSize sets the maximum message size for the channel. This must never
+ // be called concurrently with ReadFcall or WriteFcall.
+ SetMSize(msize int)
+}
+
const (
defaultRWTimeout = 1 * time.Second // default read/write timeout if not set in context
)
// channel provides bidirectional protocol framing for 9p over net.Conn.
// Operations are not thread-safe but reads and writes may be carried out
// concurrently, supporting separate read and write loops.
+//
+// Lifecycle
+//
+// A connection, or message channel abstraction, has a lifecycle delineated by
+// Tversion/Rversion request response cycles. For now, this is part of the
+// channel itself but doesn't necessarily influence the channel's state, except
+// the msize. Visually, it might look something like this:
+//
+// [Established] -> [Version] -> [Session] -> [Version]---+
+// ^ |
+// |_________________________________|
+//
+// The connection is established, then we negotiate a version, run a session,
+// then negotiate a version and so on. For most purposes, we are likely going
+// to terminate the connection after the session but we may want to support
+// connection pooling. Pooling may result in possible security leaks if the
+// connections are shared among contexts, since the version is negotiated at
+// the start of the session. To avoid this, we can actually use a "tombstone"
+// version message which clears the server's session state without starting a
+// new session. The next version message would then prepare the session
+// without leaking any Fid's.
type channel struct {
conn net.Conn
codec Codec
brd *bufio.Reader
bwr *bufio.Writer
closed chan struct{}
+ msize int
rdbuf []byte
wrbuf []byte
}
brd: bufio.NewReaderSize(conn, msize), // msize may not be optimal buffer size
bwr: bufio.NewWriterSize(conn, msize),
closed: make(chan struct{}),
+ msize: msize,
rdbuf: make([]byte, msize),
wrbuf: make([]byte, msize),
}
// NOTE(stevvooe): We cannot safely resize the buffered reader and writer.
// Proceed assuming that original size is sufficient.
+ ch.msize = msize
if msize < len(ch.rdbuf) {
// just change the cap
ch.rdbuf = ch.rdbuf[:msize]
return mv.MSize, mv.Version, nil
}
+// negotiate blocks until a version message is received or a timeout occurs.
+// The msize for the transport will be set from the negotiation. If negotiate
+// returns nil, a server may proceed with the connection.
+//
+// In the future, it might be better to handle the version messages in a
+// separate object that manages the session. Each set of version requests
+// effectively "resets" a connection, meaning all fids get clunked and all
+// outstanding IO is aborted. This is probably slightly racy, in practice with
+// a misbehaved client. The main issue is that we cannot tell which session
+// messages belong to.
+func (ch *channel) negotiate(ctx context.Context, version string) error {
+ // wait for the version message over the transport.
+ req := new(Fcall)
+ if err := ch.readFcall(ctx, req); err != nil {
+ return err
+ }
+
+ mv, ok := req.Message.(*MessageTversion)
+ if !ok {
+ return fmt.Errorf("expected version message: %v", mv)
+ }
+
+ respmsg := MessageRversion{
+ Version: version,
+ }
+
+ if mv.Version != version {
+ // TODO(stevvooe): Not the best place to do version handling. We need
+ // to have a way to pass supported versions into this method then have
+ // it return the actual version. For now, respond with unknown for
+ // anything that doesn't match the provided version string.
+ respmsg.Version = "unknown"
+ }
+
+ if int(mv.MSize) < ch.msize {
+ // if the server msize is too large, use the client's suggested msize.
+ ch.setmsize(int(mv.MSize))
+ respmsg.MSize = mv.MSize
+ } else {
+ respmsg.MSize = uint32(ch.msize)
+ }
+
+ resp := newFcall(respmsg)
+ if err := ch.writeFcall(ctx, resp); err != nil {
+ return err
+ }
+
+ if respmsg.Version == "unknown" {
+ return fmt.Errorf("bad version negotiation")
+ }
+
+ return nil
+}
+
// ReadFcall reads the next message from the channel into fcall.
func (ch *channel) readFcall(ctx context.Context, fcall *Fcall) error {
select {
+ case <-ctx.Done():
+ return ctx.Err()
case <-ch.closed:
return ErrClosed
default:
return fmt.Errorf("message large than buffer:", n)
}
+ // clear out the fcall
+ *fcall = Fcall{}
if err := ch.codec.Unmarshal(ch.rdbuf[:n], fcall); err != nil {
return err
}
func (ch *channel) writeFcall(ctx context.Context, fcall *Fcall) error {
select {
+ case <-ctx.Done():
+ return ctx.Err()
case <-ch.closed:
return ErrClosed
default:
blob - 25b2080e9f954172c4073e32232291a5f7030dbb
blob + 2da30033b5ccc50bec99a071ca5793ebeaec32f0
--- cmd/9pr/main.go
+++ cmd/9pr/main.go
ctx := context.Background()
log.SetFlags(0)
flag.Parse()
+
+ proto := "tcp"
+ if strings.HasPrefix(addr, "unix:") {
+ proto = "unix"
+ addr = addr[5:]
+ }
log.Println("dialing", addr)
- conn, err := net.Dial("tcp", addr)
+ conn, err := net.Dial(proto, addr)
if err != nil {
log.Fatal(err)
}
blob - 7818604df1a25843f729bd82b77a66aa08051e81
blob + e190166973a54ec0561999305e6166b8d9f360ab
--- cmd/9ps/main.go
+++ cmd/9ps/main.go
}
go func(conn net.Conn) {
+ defer conn.Close()
+
ctx := context.WithValue(ctx, "conn", conn)
log.Println("connected", conn.RemoteAddr())
session, err := newLocalSession(ctx, root)
blob - /dev/null
blob + 5d884701a2cbc04f08d9dff5e5ddfcf4c4e27e27 (mode 644)
--- /dev/null
+++ dispatcher.go
+package p9pnew
+
+import (
+ "fmt"
+
+ "golang.org/x/net/context"
+)
+
+type handler interface {
+ handle(ctx context.Context, req *Fcall) (*Fcall, error)
+}
+
+// dispatcher routes fcalls to a Session.
+type dispatcher struct {
+ session Session
+}
+
+// handle responds to an fcall using the session. An error is only returned if
+// the handler cannot proceed. All session errors are returned as Rerror.
+func (d *dispatcher) handle(ctx context.Context, req *Fcall) (*Fcall, error) {
+ var resp *Fcall
+ switch req.Type {
+ case Tauth:
+ reqmsg, ok := req.Message.(MessageTauth)
+ if !ok {
+ return nil, fmt.Errorf("incorrect message for type: %v message=%v", req, req.Message)
+ }
+
+ qid, err := d.session.Auth(ctx, reqmsg.Afid, reqmsg.Uname, reqmsg.Aname)
+ if err != nil {
+ return nil, err
+ }
+
+ resp = newFcall(MessageRauth{Qid: qid})
+ case Tattach:
+ reqmsg, ok := req.Message.(*MessageTattach)
+ if !ok {
+ return nil, fmt.Errorf("bad message: %v message=%#v", req, req.Message)
+ }
+
+ qid, err := d.session.Attach(ctx, reqmsg.Fid, reqmsg.Afid, reqmsg.Uname, reqmsg.Aname)
+ if err != nil {
+ return nil, err
+ }
+
+ resp = newFcall(MessageRattach{
+ Qid: qid,
+ })
+ case Twalk:
+ reqmsg, ok := req.Message.(*MessageTwalk)
+ if !ok {
+ return nil, fmt.Errorf("bad message: %v message=%#v", req, req.Message)
+ }
+
+ // TODO(stevvooe): This is one of the places where we need to manage
+ // fid allocation lifecycle. We need to reserve the fid, then, if this
+ // call succeeds, we should alloc the fid for future uses. Also need
+ // to interact correctly with concurrent clunk and the flush of this
+ // walk message.
+ qids, err := d.session.Walk(ctx, reqmsg.Fid, reqmsg.Newfid, reqmsg.Wnames...)
+ if err != nil {
+ return nil, err
+ }
+
+ resp = newFcall(&MessageRwalk{
+ Qids: qids,
+ })
+ case Topen:
+ reqmsg, ok := req.Message.(*MessageTopen)
+ if !ok {
+ return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message)
+ }
+
+ qid, iounit, err := d.session.Open(ctx, reqmsg.Fid, reqmsg.Mode)
+ if err != nil {
+ return nil, err
+ }
+
+ resp = newFcall(&MessageRopen{
+ Qid: qid,
+ IOUnit: iounit,
+ })
+ case Tcreate:
+ reqmsg, ok := req.Message.(*MessageTcreate)
+ if !ok {
+ return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message)
+ }
+
+ qid, iounit, err := d.session.Create(ctx, reqmsg.Fid, reqmsg.Name, reqmsg.Perm, uint32(reqmsg.Mode))
+ if err != nil {
+ return nil, err
+ }
+
+ resp = newFcall(&MessageRcreate{
+ Qid: qid,
+ IOUnit: iounit,
+ })
+
+ case Tread:
+ reqmsg, ok := req.Message.(*MessageTread)
+ if !ok {
+ return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message)
+ }
+
+ p := make([]byte, int(reqmsg.Count))
+ n, err := d.session.Read(ctx, reqmsg.Fid, p, int64(reqmsg.Offset))
+ if err != nil {
+ return nil, err
+ }
+
+ resp = newFcall(&MessageRread{
+ Data: p[:n],
+ })
+ case Twrite:
+ reqmsg, ok := req.Message.(*MessageTwrite)
+ if !ok {
+ return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message)
+ }
+
+ n, err := d.session.Write(ctx, reqmsg.Fid, reqmsg.Data, int64(reqmsg.Offset))
+ if err != nil {
+ return nil, err
+ }
+
+ resp = newFcall(&MessageRwrite{
+ Count: uint32(n),
+ })
+ case Tclunk:
+ reqmsg, ok := req.Message.(*MessageTclunk)
+ if !ok {
+ return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message)
+ }
+
+ // TODO(stevvooe): Manage the clunking of file descriptors based on
+ // walk and attach call progression.
+ if err := d.session.Clunk(ctx, reqmsg.Fid); err != nil {
+ return nil, err
+ }
+
+ resp = newFcall(&MessageRclunk{})
+ case Tremove:
+ reqmsg, ok := req.Message.(*MessageTremove)
+ if !ok {
+ return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message)
+ }
+
+ if err := d.session.Remove(ctx, reqmsg.Fid); err != nil {
+ return nil, err
+ }
+
+ resp = newFcall(&MessageRremove{})
+ case Tstat:
+ reqmsg, ok := req.Message.(*MessageTstat)
+ if !ok {
+ return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message)
+ }
+
+ dir, err := d.session.Stat(ctx, reqmsg.Fid)
+ if err != nil {
+ return nil, err
+ }
+
+ resp = newFcall(&MessageRstat{
+ Stat: dir,
+ })
+ case Twstat:
+ panic("not implemented")
+ default:
+ return nil, ErrUnknownMsg
+ }
+
+ return resp, nil
+}
blob - 2310a2c5f859b1b9f2486c722d01e703a67ca0a7
blob + e15f26b1b7a1c68a00ef24c5740fd35cde89a245
--- encoding_test.go
+++ encoding_test.go
import (
"bytes"
+ "errors"
"reflect"
"testing"
"time"
0x3, 0x0, 0x67, 0x69, 0x64, // gid
0x4, 0x0, 0x6d, 0x75, 0x69, 0x64}, // muid
},
+ {
+ description: "Rerror fcall",
+ target: newErrorFcall(5556, errors.New("A serious error")),
+ marshaled: []byte{
+ 0x75, 0xb4, 0x15,
+ 0x12, 0x0, 0x0, 0x0,
+ 0x61, 0x20, 0x6c, 0x6f, 0x74, 0x20, 0x6f, 0x66, 0x20, 0x62, 0x79, 0x74, 0x65, 0x20, 0x64, 0x61, 0x74, 0x61},
+ },
} {
t.Logf("target under test: %v", testcase.target)
fatalf := func(format string, args ...interface{}) {
blob - 6a1212e5299bf975367e1754ded62b3be8a88130
blob + 000834bd884dcf2b5745120f03727e76a12f91f3
--- errors.go
+++ errors.go
package p9pnew
-import (
- "errors"
- "fmt"
-)
+import "errors"
// common errors returned by Session interface methods
var (
ErrWalknodir = new9pError("walk in non-directory")
// extra errors not part of the normal protocol
- ErrTimeout = new9pError("fcall timeout") // returned when timing out on the fcall
+ ErrTimeout = new9pError("fcall timeout") // returned when timing out on the fcall
+ ErrUnknownTag = new9pError("unknown tag")
+ ErrUnknownMsg = new9pError("unknown message") // returned when encountering unknown message type
)
-type error9p struct {
- Name string
-}
-
+// new9pError returns a new 9p error ready for the wire.
func new9pError(s string) error {
- return error9p{Name: s}
+ return MessageRerror{Ename: s}
}
-
-func (e error9p) Error() string {
- return fmt.Sprintf("9p: %v", e.Name)
-}
blob - 9d1fc1aba43ebbc74dc6a62fe3de23ff731f0761
blob + 80d2e7e1fd009f0babc3639130e1519b72bdd533
--- fcall.go
+++ fcall.go
}
}
+func newErrorFcall(tag Tag, err error) *Fcall {
+ var msg Message
+
+ switch v := err.(type) {
+ case MessageRerror:
+ msg = v
+ case *MessageRerror:
+ msg = v
+ default:
+ msg = MessageRerror{Ename: v.Error()}
+ }
+
+ return &Fcall{
+ Type: Rerror,
+ Tag: tag,
+ Message: msg,
+ }
+}
+
func (fc *Fcall) String() string {
return fmt.Sprintf("%v(%v) %v", fc.Type, fc.Tag, string9p(fc.Message))
}
case Rversion:
return &MessageRversion{}, nil
case Tauth:
-
+ return &MessageTauth{}, nil
case Rauth:
-
+ return &MessageRauth{}, nil
case Tattach:
return &MessageTattach{}, nil
case Rattach:
case Ropen:
return &MessageRopen{}, nil
case Tcreate:
-
+ return &MessageTcreate{}, nil
case Rcreate:
-
+ return &MessageRcreate{}, nil
case Tread:
return &MessageTread{}, nil
case Rread:
Ename string
}
+func (e MessageRerror) Error() string {
+ return fmt.Sprintf("9p: %v", e.Ename)
+}
+
type MessageTflush struct {
Oldtag Tag
}
Fid Fid
}
+type MessageRremove struct{}
+
type MessageTstat struct {
Fid Fid
}
func (MessageRversion) Type() FcallType { return Rversion }
func (MessageTauth) Type() FcallType { return Tauth }
func (MessageRauth) Type() FcallType { return Rauth }
-func (MessageRerror) Type() FcallType { return Rerror }
func (MessageTflush) Type() FcallType { return Tflush }
func (MessageRflush) Type() FcallType { return Rflush }
+func (MessageRerror) Type() FcallType { return Rerror }
func (MessageTattach) Type() FcallType { return Tattach }
func (MessageRattach) Type() FcallType { return Rattach }
func (MessageTwalk) Type() FcallType { return Twalk }
func (MessageTclunk) Type() FcallType { return Tclunk }
func (MessageRclunk) Type() FcallType { return Rclunk }
func (MessageTremove) Type() FcallType { return Tremove }
+func (MessageRremove) Type() FcallType { return Rremove }
func (MessageTstat) Type() FcallType { return Tstat }
func (MessageRstat) Type() FcallType { return Rstat }
func (MessageTwstat) Type() FcallType { return Twstat }
blob - 325347028d2ac5a31e07eb8ed43af174b5a3beb1
blob + cb90f4c988eb50b54f9e78cd447dbe74b543727c
--- server.go
+++ server.go
package p9pnew
import (
- "bufio"
- "fmt"
"log"
"net"
+ "time"
"golang.org/x/net/context"
)
// Serve the 9p session over the provided network connection.
func Serve(ctx context.Context, conn net.Conn, session Session) {
+ const msize = 64 << 10
+ const vers = "9P2000"
+
+ ch := newChannel(conn, codec9p{}, msize)
+
+ negctx, cancel := context.WithTimeout(ctx, 1*time.Second)
+ defer cancel()
+
+ // TODO(stevvooe): For now, we negotiate here. It probably makes sense to
+ // do this outside of this function and then pass in a ready made channel.
+ // We are not really ready to export the channel type yet.
+
+ if err := ch.negotiate(negctx, vers); err != nil {
+ // TODO(stevvooe): Need better error handling and retry support here.
+ // For now, we silently ignore the failure.
+ log.Println("error negotiating version:", err)
+ return
+ }
+
s := &server{
ctx: ctx,
- conn: conn,
- session: session,
+ ch: ch,
+ handler: &dispatcher{session: session},
+ closed: make(chan struct{}),
}
s.run()
type server struct {
ctx context.Context
session Session
- conn net.Conn
+ ch *channel
+ handler handler
closed chan struct{}
}
+type activeTag struct {
+ ctx context.Context
+ request *Fcall
+ cancel context.CancelFunc
+ responded bool // true, if some response was sent (Response or Rflush/Rerror)
+}
+
func (s *server) run() {
- brd := bufio.NewReader(s.conn)
- dec := &decoder{brd}
- bwr := bufio.NewWriter(s.conn)
- enc := &encoder{bwr}
+ tags := map[Tag]*activeTag{} // active requests
- tags := map[Tag]*Fcall{} // active requests
-
log.Println("server.run()")
for {
select {
default:
}
- // NOTE(stevvooe): For now, we only provide a single request at a time
- // handler. We can refactor this to take requests off the wire as
- // quickly as they arrive and dispatch in parallel to session.
+ // BUG(stevvooe): This server blocks on reads, calls to handlers and
+ // write, effectively single tracking fcalls through a target
+ // dispatcher. There is no reason we couldn't parallelize these
+ // requests out to the dispatcher to get massive performance
+ // improvements.
log.Println("server:", "wait")
- fcall := new(Fcall)
-
- // BUG(stevvooe): The decoder is not reliably consuming all of the
- // bytes of the wire. Needs to be setup to consume all bytes indicated
- // in the size portion. Should use msize from the version negotiation.
-
- if err := dec.decode(fcall); err != nil {
- log.Println("server decoding fcall:", err)
+ req := new(Fcall)
+ if err := s.ch.readFcall(s.ctx, req); err != nil {
+ log.Println("server: error reading fcall", err)
continue
}
- log.Println("server:", "message", fcall)
-
- if _, ok := tags[fcall.Tag]; ok {
- if err := enc.encode(&Fcall{
- Type: Rerror,
- Tag: fcall.Tag,
- Message: &MessageRerror{
- Ename: ErrDuptag.Error(),
- },
- }); err != nil {
- log.Println("server:", err)
+ if _, ok := tags[req.Tag]; ok {
+ resp := newErrorFcall(req.Tag, ErrDuptag)
+ if err := s.ch.writeFcall(s.ctx, resp); err != nil {
+ log.Printf("error sending duplicate tag response: %v", err)
}
- bwr.Flush()
continue
}
- tags[fcall.Tag] = fcall
- resp, err := s.handle(s.ctx, fcall)
- if err != nil {
- log.Println("server:", err)
- continue
- }
+ // handle flush calls. The tag never makes it into active from here.
+ if mf, ok := req.Message.(MessageTflush); ok {
+ log.Println("flushing message", mf.Oldtag)
- if err := enc.encode(resp); err != nil {
- log.Println("server:", err)
- continue
- }
- bwr.Flush()
+ // check if we actually know about the requested flush
+ active, ok := tags[mf.Oldtag]
+ if ok {
+ active.cancel() // cancel the context
- }
-}
+ resp := newFcall(MessageRflush{})
+ resp.Tag = req.Tag
+ if err := s.ch.writeFcall(s.ctx, resp); err != nil {
+ log.Printf("error responding to flush: %v", err)
+ }
+ active.responded = true
+ } else {
+ resp := newErrorFcall(req.Tag, ErrUnknownTag)
+ if err := s.ch.writeFcall(s.ctx, resp); err != nil {
+ log.Printf("error responding to flush: %v", err)
+ }
+ }
-// handle responds to an fcall using the session. An error is only returned if
-// the handler cannot proceed. All session errors are returned as Rerror.
-func (s *server) handle(ctx context.Context, req *Fcall) (*Fcall, error) {
- var resp *Fcall
- switch req.Type {
- case Tattach:
- reqmsg, ok := req.Message.(*MessageTattach)
- if !ok {
- return nil, fmt.Errorf("bad message: %v message=%#v", req, req.Message)
+ continue
}
- qid, err := s.session.Attach(ctx, reqmsg.Fid, reqmsg.Afid, reqmsg.Uname, reqmsg.Aname)
- if err != nil {
- return nil, err
- }
+ // TODO(stevvooe): Add handler timeout here, as well, if we desire.
- resp = &Fcall{
- Type: Rattach,
- Tag: req.Tag,
- Message: &MessageRattach{
- Qid: qid,
- },
- }
- case Twalk:
- reqmsg, ok := req.Message.(*MessageTwalk)
- if !ok {
- return nil, fmt.Errorf("bad message: %v message=%#v", req, req.Message)
- }
+ // Allows us to signal handlers to cancel processing of the fcall
+ // through context.
+ ctx, cancel := context.WithCancel(s.ctx)
- // TODO(stevvooe): This is one of the places where we need to manage
- // fid allocation lifecycle. We need to reserve the fid, then, if this
- // call succeeds, we should alloc the fid for future uses. Also need
- // to interact correctly with concurrent clunk and the flush of this
- // walk message.
- qids, err := s.session.Walk(ctx, reqmsg.Fid, reqmsg.Newfid, reqmsg.Wnames...)
- if err != nil {
- return nil, err
+ tags[req.Tag] = &activeTag{
+ ctx: ctx,
+ request: req,
+ cancel: cancel,
}
- resp = newFcall(&MessageRwalk{
- Qids: qids,
- })
- case Topen:
- reqmsg, ok := req.Message.(*MessageTopen)
- if !ok {
- return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message)
- }
-
- qid, iounit, err := s.session.Open(ctx, reqmsg.Fid, reqmsg.Mode)
+ resp, err := s.handler.handle(ctx, req)
if err != nil {
- return nil, err
+ // all handler errors are forwarded as protocol errors.
+ resp = newErrorFcall(req.Tag, err)
}
+ resp.Tag = req.Tag
- resp = newFcall(&MessageRopen{
- Qid: qid,
- IOUnit: iounit,
- })
- case Tread:
- reqmsg, ok := req.Message.(*MessageTread)
- if !ok {
- return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message)
- }
+ if err := ctx.Err(); err != nil {
+ // NOTE(stevvooe): We aren't really getting our moneys worth for
+ // how this is being handled. We really need to dispatch each
+ // request handler to a separate thread.
- p := make([]byte, int(reqmsg.Count))
- n, err := s.session.Read(ctx, reqmsg.Fid, p, int64(reqmsg.Offset))
- if err != nil {
- return nil, err
+ // the context was canceled for some reason, perhaps timeout or
+ // due to a flush call. We treat this as a condition where a
+ // response should not be sent.
+ log.Println("context error:", err)
+ continue
}
- resp = newFcall(&MessageRread{
- Data: p[:n],
- })
- case Tclunk:
- reqmsg, ok := req.Message.(*MessageTclunk)
- if !ok {
- return nil, fmt.Errorf("bad message: %v message=%v", req, req.Message)
+ if !tags[req.Tag].responded {
+ if err := s.ch.writeFcall(ctx, resp); err != nil {
+ log.Println("server: error writing fcall:", err)
+ continue
+ }
}
- // TODO(stevvooe): Manage the clunking of file descriptors based on
- // walk and attach call progression.
- if err := s.session.Clunk(ctx, reqmsg.Fid); err != nil {
- return nil, err
- }
-
- resp = newFcall(&MessageRclunk{})
+ delete(tags, req.Tag)
}
-
- if resp == nil {
- log.Println("unknown message type:", req.Type)
- resp = newFcall(&MessageRerror{
- Ename: "unknown message type",
- })
- }
-
- resp.Tag = req.Tag
- return resp, nil
}
blob - 5bdc01d70056b90a64fa6838de37153d3aec0cdb
blob + b7b890057006eb9f329f5e4fbc253f9e15b20fce
--- transport.go
+++ transport.go
send(ctx context.Context, fc *Fcall) (*Fcall, error)
}
+// transport plays the role of being a client channel manager. It multiplexes
+// function calls onto the wire and dispatches responses to blocking calls to
+// send. On the whole, transport is thread-safe for calling send.
type transport struct {
ctx context.Context
ch *channel