commit - 6350d27908b0b1a459b7208b9fae7b274384efe4
commit + 3bf22e5860ff26ad641dc222b8121a2bcae316c7
blob - a9453687ff9c6bcaec2512db06f1fbd224dd936a
blob + c74ca513e2ac3883f696294c0d4f0852bcc092f6
--- channel.go
+++ channel.go
// be called concurrently with other calls to WriteFcall.
WriteFcall(ctx context.Context, fcall *Fcall) error
+ // MSize returns the current msize for the channel.
+ MSize() int
+
// SetMSize sets the maximum message size for the channel. This must never
// be called currently with ReadFcall or WriteFcall.
SetMSize(msize int)
}
+// NewChannel returns a Channel that exchanges fcalls over conn. The channel
+// is built with the package's 9P codec (codec9p) and is handed msize as its
+// starting msize; use SetMSize to change it after version negotiation.
+func NewChannel(conn net.Conn, msize int) Channel {
+	return newChannel(conn, codec9p{}, msize)
+}
+
const (
defaultRWTimeout = 1 * time.Second // default read/write timeout if not set in context
)
}
}
+// MSize returns the msize currently recorded on the channel. It reads the
+// field directly without taking any lock.
+func (ch *channel) MSize() int {
+	return ch.msize
+}
+
// setmsize resizes the buffers for use with a separate msize. This call must
// be protected by a mutex or made before passing to other goroutines.
-func (ch *channel) setmsize(msize int) {
+func (ch *channel) SetMSize(msize int) {
// NOTE(stevvooe): We cannot safely resize the buffered reader and writer.
// Proceed assuming that original size is sufficient.
ch.wrbuf = make([]byte, msize)
}
-// version negiotiates the protocol version using channel, blocking until a
-// response is received. The received values can be used to set msize for the
-// channel or assist in client setup.
-func (ch *channel) version(ctx context.Context, msize uint32, version string) (uint32, string, error) {
- req := newFcall(MessageTversion{
- MSize: uint32(msize),
- Version: version,
- })
-
- if err := ch.writeFcall(ctx, req); err != nil {
- return 0, "", err
- }
-
- resp := new(Fcall)
- if err := ch.readFcall(ctx, resp); err != nil {
- return 0, "", err
- }
-
- mv, ok := resp.Message.(*MessageRversion)
- if !ok {
- return 0, "", fmt.Errorf("invalid rpc response for version message: %v", resp)
- }
-
- return mv.MSize, mv.Version, nil
-}
-
-// negotiate blocks until a version message is received or a timeout occurs.
-// The msize for the tranport will be set from the negotiation. If negotiate
-// returns nil, a server may proceed with the connection.
-//
-// In the future, it might be better to handle the version messages in a
-// separate object that manages the session. Each set of version requests
-// effectively "reset" a connection, meaning all fids get clunked and all
-// outstanding IO is aborted. This is probably slightly racy, in practice with
-// a misbehaved client. The main issue is that we cannot tell which session
-// messages belong to.
-func (ch *channel) negotiate(ctx context.Context, version string) error {
- // wait for the version message over the transport.
- req := new(Fcall)
- if err := ch.readFcall(ctx, req); err != nil {
- return err
- }
-
- mv, ok := req.Message.(*MessageTversion)
- if !ok {
- return fmt.Errorf("expected version message: %v", mv)
- }
-
- respmsg := MessageRversion{
- Version: version,
- }
-
- if mv.Version != version {
- // TODO(stevvooe): Not the best place to do version handling. We need
- // to have a way to pass supported versions into this method then have
- // it return the actual version. For now, respond with unknown for
- // anything that doesn't match the provided version string.
- respmsg.Version = "unknown"
- }
-
- if int(mv.MSize) < ch.msize {
- // if the server msize is too large, use the client's suggested msize.
- ch.setmsize(int(mv.MSize))
- respmsg.MSize = mv.MSize
- } else {
- respmsg.MSize = uint32(ch.msize)
- }
-
- resp := newFcall(respmsg)
- if err := ch.writeFcall(ctx, resp); err != nil {
- return err
- }
-
- if respmsg.Version == "unknown" {
- return fmt.Errorf("bad version negotiation")
- }
-
- return nil
-}
-
// ReadFcall reads the next message from the channel into fcall.
-func (ch *channel) readFcall(ctx context.Context, fcall *Fcall) error {
+func (ch *channel) ReadFcall(ctx context.Context, fcall *Fcall) error {
select {
case <-ctx.Done():
return ctx.Err()
return nil
}
-func (ch *channel) writeFcall(ctx context.Context, fcall *Fcall) error {
+func (ch *channel) WriteFcall(ctx context.Context, fcall *Fcall) error {
select {
case <-ctx.Done():
return ctx.Err()
blob - 9e1f5f447816c2a1ba015bc2513c4dc1638d1d4f
blob + a1b80f10d12ed6dc1acfecf66b41eb22345519e2
--- client.go
+++ client.go
ch := newChannel(conn, codec9p{}, msize) // sets msize, effectively.
// negotiate the protocol version
- smsize, svers, err := ch.version(ctx, msize, vers)
+ _, err := clientnegotiate(ctx, ch, vers)
if err != nil {
return nil, err
}
- if svers != vers {
- // TODO(stevvooe): A stubborn client indeed!
- return nil, fmt.Errorf("unsupported server version: %v", vers)
- }
-
- if smsize > msize {
- // upgrade msize if server differs.
- ch.setmsize(int(smsize))
- }
-
return &client{
version: vers,
msize: msize,
blob - cb90f4c988eb50b54f9e78cd447dbe74b543727c
blob + 1c2cb5811dd66ef98bf1501ca06ad8ce6b6f6d8a
--- server.go
+++ server.go
// do this outside of this function and then pass in a ready made channel.
// We are not really ready to export the channel type yet.
- if err := ch.negotiate(negctx, vers); err != nil {
+ if err := servernegotiate(negctx, ch, vers); err != nil {
// TODO(stevvooe): Need better error handling and retry support here.
// For now, we silently ignore the failure.
log.Println("error negotiating version:", err)
type server struct {
ctx context.Context
session Session
- ch *channel
+ ch Channel
handler handler
closed chan struct{}
}
log.Println("server:", "wait")
req := new(Fcall)
- if err := s.ch.readFcall(s.ctx, req); err != nil {
+ if err := s.ch.ReadFcall(s.ctx, req); err != nil {
log.Println("server: error reading fcall", err)
continue
}
if _, ok := tags[req.Tag]; ok {
resp := newErrorFcall(req.Tag, ErrDuptag)
- if err := s.ch.writeFcall(s.ctx, resp); err != nil {
+ if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
log.Printf("error sending duplicate tag response: %v", err)
}
continue
resp := newFcall(MessageRflush{})
resp.Tag = req.Tag
- if err := s.ch.writeFcall(s.ctx, resp); err != nil {
+ if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
log.Printf("error responding to flush: %v", err)
}
active.responded = true
} else {
resp := newErrorFcall(req.Tag, ErrUnknownTag)
- if err := s.ch.writeFcall(s.ctx, resp); err != nil {
+ if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
log.Printf("error responding to flush: %v", err)
}
}
}
if !tags[req.Tag].responded {
- if err := s.ch.writeFcall(ctx, resp); err != nil {
+ if err := s.ch.WriteFcall(ctx, resp); err != nil {
log.Println("server: error writing fcall:", err)
continue
}
blob - b7b890057006eb9f329f5e4fbc253f9e15b20fce
blob + 3caf076db643d3f72ce17f7e3d2b78ebcb38c588
--- transport.go
+++ transport.go
// send. On the whole, transport is thread-safe for calling send
type transport struct {
ctx context.Context
- ch *channel
+ ch Channel
requests chan *fcallRequest
closed chan struct{}
loop:
for {
fcall := new(Fcall)
- if err := t.ch.readFcall(t.ctx, fcall); err != nil {
+ if err := t.ch.ReadFcall(t.ctx, fcall); err != nil {
switch err := err.(type) {
case net.Error:
if err.Timeout() || err.Temporary() {
// receive a response. We need to remove the fcall context from
// the tag map and dealloc the tag. We may also want to send a
// flush for the tag.
- if err := t.ch.writeFcall(req.ctx, req.fcall); err != nil {
+ if err := t.ch.WriteFcall(req.ctx, req.fcall); err != nil {
log.Println("error writing fcall", err, req.fcall)
delete(outstanding, req.fcall.Tag)
req.err <- err
blob - /dev/null
blob + c4c7133658b6c3bd3b428135ca0b61e56b00b033 (mode 644)
--- /dev/null
+++ version.go
+package p9pnew
+
+import (
+ "fmt"
+
+ "golang.org/x/net/context"
+)
+
+// NOTE(stevvooe): This file contains functions for negotiating version on the
+// client and server. There are some nasty details to get right for
+// downgrading the connection on the server-side that are not present yet.
+// Really, these should be refactored into some sort of channel type that can
+// support resets through version messages during the protocol exchange.
+
+// clientnegotiate negotiates the protocol version using ch, blocking until a
+// response is received. It sends a Tversion carrying the channel's current
+// msize and the requested version, then reads a single reply.
+//
+// On success the returned string is the version reported by the server, which
+// is always equal to the requested version; a reply naming any other version
+// is rejected with an error. If the server replies with an msize smaller than
+// the channel's, the channel is shrunk to that value via SetMSize. Errors from
+// writing or reading on the channel are returned unchanged, as is a reply
+// message that itself implements error.
+func clientnegotiate(ctx context.Context, ch Channel, version string) (string, error) {
+	req := newFcall(MessageTversion{
+		MSize:   uint32(ch.MSize()),
+		Version: version,
+	})
+
+	if err := ch.WriteFcall(ctx, req); err != nil {
+		return "", err
+	}
+
+	resp := new(Fcall)
+	if err := ch.ReadFcall(ctx, resp); err != nil {
+		return "", err
+	}
+
+	switch v := resp.Message.(type) {
+	case *MessageRversion:
+		if v.Version != version {
+			// TODO(stevvooe): A stubborn client indeed!
+			// Report the version the server offered, not the one we asked for.
+			return "", fmt.Errorf("unsupported server version: %v", v.Version)
+		}
+
+		if int(v.MSize) < ch.MSize() {
+			// The server's msize is an upper bound on what both sides may
+			// send, so downgrade to it. A reply larger than our proposal is
+			// ignored: we keep the smaller value we already advertised.
+			ch.SetMSize(int(v.MSize))
+		}
+
+		return v.Version, nil
+	case error:
+		return "", v
+	default:
+		return "", fmt.Errorf("invalid rpc response for version message: %v", resp)
+	}
+}
+
+// servernegotiate blocks until a version message is received or reading from
+// the channel fails (for example because ctx is done). The msize for the
+// channel is lowered to the client's proposal when that proposal is smaller
+// than the channel's current msize. If servernegotiate returns nil, a server
+// may proceed with the connection.
+//
+// The reply always carries the resulting msize. Its version is the provided
+// version string when the client asked for exactly that, and "unknown"
+// otherwise; in the latter case the reply is still sent and an error is
+// returned afterwards.
+//
+// In the future, it might be better to handle the version messages in a
+// separate object that manages the session. Each set of version requests
+// effectively "reset" a connection, meaning all fids get clunked and all
+// outstanding IO is aborted. This is probably slightly racy, in practice with
+// a misbehaved client. The main issue is that we cannot tell which session
+// messages belong to.
+func servernegotiate(ctx context.Context, ch Channel, version string) error {
+	// wait for the version message over the transport.
+	req := new(Fcall)
+	if err := ch.ReadFcall(ctx, req); err != nil {
+		return err
+	}
+
+	mv, ok := req.Message.(*MessageTversion)
+	if !ok {
+		// mv is nil when the assertion fails, so report the fcall that was
+		// actually received.
+		return fmt.Errorf("expected version message: %v", req)
+	}
+
+	respmsg := MessageRversion{
+		Version: version,
+	}
+
+	if mv.Version != version {
+		// TODO(stevvooe): Not the best place to do version handling. We need
+		// to have a way to pass supported versions into this method then have
+		// it return the actual version. For now, respond with unknown for
+		// anything that doesn't match the provided version string.
+		respmsg.Version = "unknown"
+	}
+
+	if int(mv.MSize) < ch.MSize() {
+		// if the server msize is too large, use the client's suggested msize.
+		ch.SetMSize(int(mv.MSize))
+		respmsg.MSize = mv.MSize
+	} else {
+		respmsg.MSize = uint32(ch.MSize())
+	}
+
+	resp := newFcall(respmsg)
+	if err := ch.WriteFcall(ctx, resp); err != nil {
+		return err
+	}
+
+	if respmsg.Version == "unknown" {
+		return fmt.Errorf("bad version negotiation")
+	}
+
+	return nil
+}