commit - 7cd259a2e418951032d970bf316a144fda865dc5
commit + de4c554d8644b65ef777de9bc6b39639dbd7a51e
blob - ba7e8afaf8519d8105ffc64584a83ef84983882e
blob + c4b14e214cfc3b54492974e33ef924bb6b9fc5c1
--- fcall.go
+++ fcall.go
Message Message
}
-func newFcall(msg Message) *Fcall {
- var tag Tag
-
+func newFcall(tag Tag, msg Message) *Fcall {
switch msg.Type() {
case Tversion, Rversion:
tag = NOTAG
blob - 6a2c75e2341826863a836e5c7336f924e79113ac
blob + 58d498921fbdef1f84d000e53549cacbb4624061
--- server.go
+++ server.go
closed chan struct{}
}
-type activeTag struct {
- ctx context.Context
- request *Fcall
- cancel context.CancelFunc
- responded bool // true, if some response was sent (Response or Rflush/Rerror)
+// activeRequest includes information about the active request.
+type activeRequest struct {
+ ctx context.Context
+ request *Fcall
+ cancel context.CancelFunc
}
func (s *server) run() {
- tags := map[Tag]*activeTag{} // active requests
+ tags := map[Tag]*activeRequest{} // active requests
- log.Println("server.run()")
- for {
- select {
- case <-s.ctx.Done():
- log.Println("server: context done")
- return
- case <-s.closed:
- log.Println("server: shutdown")
- default:
- }
+ requests := make(chan *Fcall) // sync, read-limited
+ responses := make(chan *Fcall)
+ completed := make(chan *Fcall, 1)
- // BUG(stevvooe): This server blocks on reads, calls to handlers and
- // write, effectively single tracking fcalls through a target
- // dispatcher. There is no reason we couldn't parallelize these
- // requests out to the dispatcher to get massive performance
- // improvements.
-
- log.Println("server:", "wait")
- req := new(Fcall)
- if err := s.ch.ReadFcall(s.ctx, req); err != nil {
- if err, ok := err.(net.Error); ok {
- if err.Timeout() || err.Temporary() {
- continue
+ // read loop
+ go func() {
+ for {
+ req := new(Fcall)
+ if err := s.ch.ReadFcall(s.ctx, req); err != nil {
+ if err, ok := err.(net.Error); ok {
+ if err.Timeout() || err.Temporary() {
+ continue
+ }
}
+
+ log.Println("server: error reading fcall", err)
+ return
}
- log.Println("server: error reading fcall", err)
- return
- }
-
- if _, ok := tags[req.Tag]; ok {
- resp := newErrorFcall(req.Tag, ErrDuptag)
- if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
- log.Printf("error sending duplicate tag response: %v", err)
+ select {
+ case requests <- req:
+ case <-s.ctx.Done():
+ log.Println("server: context done")
+ return
+ case <-s.closed:
+ log.Println("server: shutdown")
+ return
}
- continue
}
+ }()
- // handle flush calls. The tag never makes it into active from here.
- if mf, ok := req.Message.(MessageTflush); ok {
- log.Println("flushing message", mf.Oldtag)
-
- // check if we have actually know about the requested flush
- active, ok := tags[mf.Oldtag]
- if ok {
- active.cancel() // cancel the context
-
- resp := newFcall(MessageRflush{})
- resp.Tag = req.Tag
+ // write loop
+ go func() {
+ for {
+ select {
+ case resp := <-responses:
if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
- log.Printf("error responding to flush: %v", err)
+ log.Println("server: error writing fcall:", err)
}
- active.responded = true
- } else {
- resp := newErrorFcall(req.Tag, ErrUnknownTag)
- if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
- log.Printf("error responding to flush: %v", err)
- }
+ case <-s.ctx.Done():
+ log.Println("server: context done")
+ return
+ case <-s.closed:
+ log.Println("server: shutdown")
+ return
}
-
- continue
}
+ }()
- // TODO(stevvooe): Add handler timeout here, as well, if we desire.
+ log.Println("server.run()")
+ for {
+ log.Println("server:", "wait")
+ select {
+ case req := <-requests:
+ log.Println("request", req)
+ if _, ok := tags[req.Tag]; ok {
+ select {
+ case responses <- newErrorFcall(req.Tag, ErrDuptag):
+ // Send to responses, bypass tag management.
+ case <-s.ctx.Done():
+ return
+ case <-s.closed:
+ return
+ }
+ continue
+ }
- // Allows us to signal handlers to cancel processing of the fcall
- // through context.
- ctx, cancel := context.WithCancel(s.ctx)
+ switch msg := req.Message.(type) {
+ case MessageTflush:
+ log.Println("server: flushing message", msg.Oldtag)
- tags[req.Tag] = &activeTag{
- ctx: ctx,
- request: req,
- cancel: cancel,
- }
+ var resp *Fcall
+			// check if we actually know about the requested flush
+ active, ok := tags[msg.Oldtag]
+ if ok {
+ active.cancel() // cancel the context of oldtag
+ resp = newFcall(req.Tag, MessageRflush{})
+ } else {
+ resp = newErrorFcall(req.Tag, ErrUnknownTag)
+ }
- var resp *Fcall
- msg, err := s.handler.Handle(ctx, req.Message)
- if err != nil {
- // all handler errors are forwarded as protocol errors.
- resp = newErrorFcall(req.Tag, err)
- } else {
- resp = newFcall(msg)
- }
- resp.Tag = req.Tag
+ select {
+ case responses <- resp:
+ // bypass tag management in completed.
+ case <-s.ctx.Done():
+ return
+ case <-s.closed:
+ return
+ }
+ default:
+			// Allows us to signal handlers to cancel processing of the fcall
+ // through context.
+ ctx, cancel := context.WithCancel(s.ctx)
- if err := ctx.Err(); err != nil {
- // NOTE(stevvooe): We aren't really getting our moneys worth for
- // how this is being handled. We really need to dispatch each
- // request handler to a separate thread.
+ // The contents of these instances are only writable in the main
+ // server loop. The value of tag will not change.
+ tags[req.Tag] = &activeRequest{
+ ctx: ctx,
+ request: req,
+ cancel: cancel,
+ }
- // the context was canceled for some reason, perhaps timeout or
- // due to a flush call. We treat this as a condition where a
- // response should not be sent.
- log.Println("context error:", err)
- continue
- }
+ go func(ctx context.Context, req *Fcall) {
+ var resp *Fcall
+ msg, err := s.handler.Handle(ctx, req.Message)
+ if err != nil {
+ // all handler errors are forwarded as protocol errors.
+ resp = newErrorFcall(req.Tag, err)
+ } else {
+ resp = newFcall(req.Tag, msg)
+ }
- if !tags[req.Tag].responded {
- if err := s.ch.WriteFcall(ctx, resp); err != nil {
- log.Println("server: error writing fcall:", err)
- continue
+ select {
+ case completed <- resp:
+ case <-ctx.Done():
+ return
+ case <-s.closed:
+ return
+ }
+ }(ctx, req)
}
- }
+ case resp := <-completed:
+ log.Println("completed", resp)
+ // only responses that flip the tag state traverse this section.
+ active, ok := tags[resp.Tag]
+ if !ok {
+ panic("BUG: unbalanced tag")
+ }
- delete(tags, req.Tag)
+ select {
+ case responses <- resp:
+ case <-active.ctx.Done():
+ // the context was canceled for some reason, perhaps timeout or
+ // due to a flush call. We treat this as a condition where a
+ // response should not be sent.
+ log.Println("canceled", resp, active.ctx.Err())
+ }
+ delete(tags, resp.Tag)
+ case <-s.ctx.Done():
+ log.Println("server: context done")
+ return
+ case <-s.closed:
+ log.Println("server: shutdown")
+ return
+ }
}
}
blob - 8d44e875d5083ae7baba35b73ddd5b16d4c71409
blob + 8d93c7c601d73f26b6390e4039775b6e873ad029
--- transport.go
+++ transport.go
return t
}
+// fcallRequest encompasses the request to send a message via fcall.
type fcallRequest struct {
ctx context.Context
- fcall *Fcall
+ message Message
response chan *Fcall
err chan error
}
-func newFcallRequest(ctx context.Context, fcall *Fcall) *fcallRequest {
+func newFcallRequest(ctx context.Context, msg Message) *fcallRequest {
return &fcallRequest{
ctx: ctx,
- fcall: fcall,
+ message: msg,
response: make(chan *Fcall, 1),
err: make(chan error, 1),
}
}
func (t *transport) send(ctx context.Context, msg Message) (Message, error) {
- fcall := newFcall(msg)
- req := newFcallRequest(ctx, fcall)
+ req := newFcallRequest(ctx, msg)
// dispatch the request.
select {
log.Println("wait...")
select {
case req := <-t.requests:
- if req.fcall.Tag == NOTAG {
- // NOTE(stevvooe): We disallow fcalls with NOTAG to come
- // through this path since we can't join the tagged response
- // with the waiting caller. This is typically used for the
- // Tversion/Rversion round trip to setup a session.
- //
- // It may be better to allow these through but block all
- // requests until a notag message has a response.
-
- req.err <- fmt.Errorf("disallowed tag through transport")
- continue
- }
-
// BUG(stevvooe): This is an awful tag allocation procedure.
// Replace this with something that let's us allocate tags and
// associate data with them, returning to them to a pool when
// complete. Such a system would provide a lot of information
// about outstanding requests.
tags++
- req.fcall.Tag = tags
- outstanding[req.fcall.Tag] = req
+ fcall := newFcall(tags, req.message)
+ outstanding[fcall.Tag] = req
// TODO(stevvooe): Consider the case of requests that never
// receive a response. We need to remove the fcall context from
// the tag map and dealloc the tag. We may also want to send a
// flush for the tag.
- if err := t.ch.WriteFcall(req.ctx, req.fcall); err != nil {
- delete(outstanding, req.fcall.Tag)
+ if err := t.ch.WriteFcall(req.ctx, fcall); err != nil {
+ delete(outstanding, fcall.Tag)
req.err <- err
}
case b := <-responses:
if !ok {
panic("unknown tag received")
}
- delete(outstanding, req.fcall.Tag)
+ // BUG(stevvooe): Must detect duplicate tag and ensure that we are
+ // waking up the right caller. If a duplicate is received, the
+ // entry should not be deleted.
+ delete(outstanding, b.Tag)
+
req.response <- b
// TODO(stevvooe): Reclaim tag id.
blob - c69518c669475a28ff5c7e3cb5e54f3786f6ba9d
blob + 4c6712bcc7939d9947815e2a15bec59f9f868604
--- version.go
+++ version.go
// until a response is received. The received value will be the version
// implemented by the server.
func clientnegotiate(ctx context.Context, ch Channel, version string) (string, error) {
- req := newFcall(MessageTversion{
+ req := newFcall(NOTAG, MessageTversion{
MSize: uint32(ch.MSize()),
Version: version,
})
respmsg.MSize = uint32(ch.MSize())
}
- resp := newFcall(respmsg)
+ resp := newFcall(NOTAG, respmsg)
if err := ch.WriteFcall(ctx, resp); err != nil {
return err
}