commit 3bf22e5860ff26ad641dc222b8121a2bcae316c7 from: Stephen J Day date: Wed Nov 04 03:45:56 2015 UTC fs/p9p/new: refactor to use interface for channel Signed-off-by: Stephen J Day commit - 6350d27908b0b1a459b7208b9fae7b274384efe4 commit + 3bf22e5860ff26ad641dc222b8121a2bcae316c7 blob - a9453687ff9c6bcaec2512db06f1fbd224dd936a blob + c74ca513e2ac3883f696294c0d4f0852bcc092f6 --- channel.go +++ channel.go @@ -29,11 +29,18 @@ type Channel interface { // be called concurrently with other calls to WriteFcall. WriteFcall(ctx context.Context, fcall *Fcall) error + // MSize returns the current msize for the channel. + MSize() int + // SetMSize sets the maximum message size for the channel. This must never // be called currently with ReadFcall or WriteFcall. SetMSize(msize int) } +func NewChannel(conn net.Conn, msize int) Channel { + return newChannel(conn, codec9p{}, msize) +} + const ( defaultRWTimeout = 1 * time.Second // default read/write timeout if not set in context ) @@ -86,9 +93,13 @@ func newChannel(conn net.Conn, codec Codec, msize int) } } +func (ch *channel) MSize() int { + return ch.msize +} + // setmsize resizes the buffers for use with a separate msize. This call must // be protected by a mutex or made before passing to other goroutines. -func (ch *channel) setmsize(msize int) { +func (ch *channel) SetMSize(msize int) { // NOTE(stevvooe): We cannot safely resize the buffered reader and writer. // Proceed assuming that original size is sufficient. @@ -104,88 +115,8 @@ func (ch *channel) setmsize(msize int) { ch.wrbuf = make([]byte, msize) } -// version negiotiates the protocol version using channel, blocking until a -// response is received. The received values can be used to set msize for the -// channel or assist in client setup. -func (ch *channel) version(ctx context.Context, msize uint32, version string) (uint32, string, error) { - req := newFcall(MessageTversion{ - MSize: uint32(msize), - Version: version, - }) - - if err := ch.writeFcall(ctx, req); err != nil { - return 0, "", err - } - - resp := new(Fcall) - if err := ch.readFcall(ctx, resp); err != nil { - return 0, "", err - } - - mv, ok := resp.Message.(*MessageRversion) - if !ok { - return 0, "", fmt.Errorf("invalid rpc response for version message: %v", resp) - } - - return mv.MSize, mv.Version, nil -} - -// negotiate blocks until a version message is received or a timeout occurs. -// The msize for the tranport will be set from the negotiation. If negotiate -// returns nil, a server may proceed with the connection. -// -// In the future, it might be better to handle the version messages in a -// separate object that manages the session. Each set of version requests -// effectively "reset" a connection, meaning all fids get clunked and all -// outstanding IO is aborted. This is probably slightly racy, in practice with -// a misbehaved client. The main issue is that we cannot tell which session -// messages belong to. -func (ch *channel) negotiate(ctx context.Context, version string) error { - // wait for the version message over the transport. - req := new(Fcall) - if err := ch.readFcall(ctx, req); err != nil { - return err - } - - mv, ok := req.Message.(*MessageTversion) - if !ok { - return fmt.Errorf("expected version message: %v", mv) - } - - respmsg := MessageRversion{ - Version: version, - } - - if mv.Version != version { - // TODO(stevvooe): Not the best place to do version handling. We need - // to have a way to pass supported versions into this method then have - // it return the actual version. For now, respond with unknown for - // anything that doesn't match the provided version string. - respmsg.Version = "unknown" - } - - if int(mv.MSize) < ch.msize { - // if the server msize is too large, use the client's suggested msize. - ch.setmsize(int(mv.MSize)) - respmsg.MSize = mv.MSize - } else { - respmsg.MSize = uint32(ch.msize) - } - - resp := newFcall(respmsg) - if err := ch.writeFcall(ctx, resp); err != nil { - return err - } - - if respmsg.Version == "unknown" { - return fmt.Errorf("bad version negotiation") - } - - return nil -} - // ReadFcall reads the next message from the channel into fcall. -func (ch *channel) readFcall(ctx context.Context, fcall *Fcall) error { +func (ch *channel) ReadFcall(ctx context.Context, fcall *Fcall) error { select { case <-ctx.Done(): return ctx.Err() @@ -225,7 +156,7 @@ func (ch *channel) readFcall(ctx context.Context, fcal return nil } -func (ch *channel) writeFcall(ctx context.Context, fcall *Fcall) error { +func (ch *channel) WriteFcall(ctx context.Context, fcall *Fcall) error { select { case <-ctx.Done(): return ctx.Err() blob - 9e1f5f447816c2a1ba015bc2513c4dc1638d1d4f blob + a1b80f10d12ed6dc1acfecf66b41eb22345519e2 --- client.go +++ client.go @@ -26,21 +26,11 @@ func NewSession(ctx context.Context, conn net.Conn) (S ch := newChannel(conn, codec9p{}, msize) // sets msize, effectively. // negotiate the protocol version - smsize, svers, err := ch.version(ctx, msize, vers) + _, err := clientnegotiate(ctx, ch, vers) if err != nil { return nil, err } - if svers != vers { - // TODO(stevvooe): A stubborn client indeed! - return nil, fmt.Errorf("unsupported server version: %v", vers) - } - - if smsize > msize { - // upgrade msize if server differs. - ch.setmsize(int(smsize)) - } - return &client{ version: vers, msize: msize, blob - cb90f4c988eb50b54f9e78cd447dbe74b543727c blob + 1c2cb5811dd66ef98bf1501ca06ad8ce6b6f6d8a --- server.go +++ server.go @@ -22,7 +22,7 @@ func Serve(ctx context.Context, conn net.Conn, session // do this outside of this function and then pass in a ready made channel. // We are not really ready to export the channel type yet. - if err := ch.negotiate(negctx, vers); err != nil { + if err := servernegotiate(negctx, ch, vers); err != nil { // TODO(stevvooe): Need better error handling and retry support here. // For now, we silently ignore the failure. log.Println("error negotiating version:", err) @@ -42,7 +42,7 @@ func Serve(ctx context.Context, conn net.Conn, session type server struct { ctx context.Context session Session - ch *channel + ch Channel handler handler closed chan struct{} } @@ -75,14 +75,14 @@ func (s *server) run() { log.Println("server:", "wait") req := new(Fcall) - if err := s.ch.readFcall(s.ctx, req); err != nil { + if err := s.ch.ReadFcall(s.ctx, req); err != nil { log.Println("server: error reading fcall", err) continue } if _, ok := tags[req.Tag]; ok { resp := newErrorFcall(req.Tag, ErrDuptag) - if err := s.ch.writeFcall(s.ctx, resp); err != nil { + if err := s.ch.WriteFcall(s.ctx, resp); err != nil { log.Printf("error sending duplicate tag response: %v", err) } continue @@ -99,13 +99,13 @@ func (s *server) run() { resp := newFcall(MessageRflush{}) resp.Tag = req.Tag - if err := s.ch.writeFcall(s.ctx, resp); err != nil { + if err := s.ch.WriteFcall(s.ctx, resp); err != nil { log.Printf("error responding to flush: %v", err) } active.responded = true } else { resp := newErrorFcall(req.Tag, ErrUnknownTag) - if err := s.ch.writeFcall(s.ctx, resp); err != nil { + if err := s.ch.WriteFcall(s.ctx, resp); err != nil { log.Printf("error responding to flush: %v", err) } } @@ -145,7 +145,7 @@ func (s *server) run() { } if !tags[req.Tag].responded { - if err := s.ch.writeFcall(ctx, resp); err != nil { + if err := s.ch.WriteFcall(ctx, resp); err != nil { log.Println("server: error writing fcall:", err) continue } blob - b7b890057006eb9f329f5e4fbc253f9e15b20fce blob + 3caf076db643d3f72ce17f7e3d2b78ebcb38c588 --- transport.go +++ transport.go @@ -21,7 +21,7 @@ type roundTripper interface { // send. On the whole, transport is thread-safe for calling send type transport struct { ctx context.Context - ch *channel + ch Channel requests chan *fcallRequest closed chan struct{} @@ -118,7 +118,7 @@ func (t *transport) handle() { loop: for { fcall := new(Fcall) - if err := t.ch.readFcall(t.ctx, fcall); err != nil { + if err := t.ch.ReadFcall(t.ctx, fcall); err != nil { switch err := err.(type) { case net.Error: if err.Timeout() || err.Temporary() { @@ -177,7 +177,7 @@ func (t *transport) handle() { // receive a response. We need to remove the fcall context from // the tag map and dealloc the tag. We may also want to send a // flush for the tag. - if err := t.ch.writeFcall(req.ctx, req.fcall); err != nil { + if err := t.ch.WriteFcall(req.ctx, req.fcall); err != nil { log.Println("error writing fcall", err, req.fcall) delete(outstanding, req.fcall.Tag) req.err <- err blob - /dev/null blob + c4c7133658b6c3bd3b428135ca0b61e56b00b033 (mode 644) --- /dev/null +++ version.go @@ -0,0 +1,106 @@ +package p9pnew + +import ( + "fmt" + + "golang.org/x/net/context" +) + +// NOTE(stevvooe): This file contains functions for negotiating version on the +// client and server. There are some nasty details to get right for +// downgrading the connection on the server-side that are not present yet. +// Really, these should be refactored into some sort of channel type that can +// support resets through version messages during the protocol exchange. + +// clientnegotiate negiotiates the protocol version using channel, blocking +// until a response is received. The received value will be the version +// implemented by the server. +func clientnegotiate(ctx context.Context, ch Channel, version string) (string, error) { + req := newFcall(MessageTversion{ + MSize: uint32(ch.MSize()), + Version: version, + }) + + if err := ch.WriteFcall(ctx, req); err != nil { + return "", err + } + + resp := new(Fcall) + if err := ch.ReadFcall(ctx, resp); err != nil { + return "", err + } + + switch v := resp.Message.(type) { + case *MessageRversion: + + if v.Version != version { + // TODO(stevvooe): A stubborn client indeed! + return "", fmt.Errorf("unsupported server version: %v", version) + } + + if int(v.MSize) > ch.MSize() { + // upgrade msize if server differs. + ch.SetMSize(int(v.MSize)) + } + + return v.Version, nil + case error: + return "", v + default: + return "", fmt.Errorf("invalid rpc response for version message: %v", resp) + } +} + +// servernegotiate blocks until a version message is received or a timeout +// occurs. The msize for the tranport will be set from the negotiation. If +// negotiate returns nil, a server may proceed with the connection. +// +// In the future, it might be better to handle the version messages in a +// separate object that manages the session. Each set of version requests +// effectively "reset" a connection, meaning all fids get clunked and all +// outstanding IO is aborted. This is probably slightly racy, in practice with +// a misbehaved client. The main issue is that we cannot tell which session +// messages belong to. +func servernegotiate(ctx context.Context, ch Channel, version string) error { + // wait for the version message over the transport. + req := new(Fcall) + if err := ch.ReadFcall(ctx, req); err != nil { + return err + } + + mv, ok := req.Message.(*MessageTversion) + if !ok { + return fmt.Errorf("expected version message: %v", mv) + } + + respmsg := MessageRversion{ + Version: version, + } + + if mv.Version != version { + // TODO(stevvooe): Not the best place to do version handling. We need + // to have a way to pass supported versions into this method then have + // it return the actual version. For now, respond with unknown for + // anything that doesn't match the provided version string. + respmsg.Version = "unknown" + } + + if int(mv.MSize) < ch.MSize() { + // if the server msize is too large, use the client's suggested msize. + ch.SetMSize(int(mv.MSize)) + respmsg.MSize = mv.MSize + } else { + respmsg.MSize = uint32(ch.MSize()) + } + + resp := newFcall(respmsg) + if err := ch.WriteFcall(ctx, resp); err != nil { + return err + } + + if respmsg.Version == "unknown" { + return fmt.Errorf("bad version negotiation") + } + + return nil +}