Blame


1 fb37ce2a 2015-10-30 stephen.d package p9pnew
2 fb37ce2a 2015-10-30 stephen.d
3 fb37ce2a 2015-10-30 stephen.d import (
4 fb37ce2a 2015-10-30 stephen.d "bufio"
5 fb37ce2a 2015-10-30 stephen.d "encoding/binary"
6 fb37ce2a 2015-10-30 stephen.d "fmt"
7 fb37ce2a 2015-10-30 stephen.d "io"
8 fb37ce2a 2015-10-30 stephen.d "io/ioutil"
9 fb37ce2a 2015-10-30 stephen.d "log"
10 fb37ce2a 2015-10-30 stephen.d "net"
11 a8abc687 2015-10-30 stephen.d "time"
12 fb37ce2a 2015-10-30 stephen.d
13 fb37ce2a 2015-10-30 stephen.d "golang.org/x/net/context"
14 fb37ce2a 2015-10-30 stephen.d )
15 fb37ce2a 2015-10-30 stephen.d
16 40d4a02d 2015-11-03 stephen.d // Channel defines the operations necessary to implement a 9p message channel
17 40d4a02d 2015-11-03 stephen.d // interface. Typically, message channels do no protocol processing except to
18 40d4a02d 2015-11-03 stephen.d // send and receive message frames.
19 40d4a02d 2015-11-03 stephen.d type Channel interface {
20 40d4a02d 2015-11-03 stephen.d // ReadFcall reads one fcall frame into the provided fcall structure. The
21 40d4a02d 2015-11-03 stephen.d // Fcall may be cleared whether there is an error or not. If the operation
22 40d4a02d 2015-11-03 stephen.d // is successful, the contents of the fcall will be populated in the
23 40d4a02d 2015-11-03 stephen.d // argument. ReadFcall cannot be called concurrently with other calls to
24 40d4a02d 2015-11-03 stephen.d // ReadFcall. This both to preserve message ordering and to allow lockless
25 40d4a02d 2015-11-03 stephen.d // buffer reusage.
26 40d4a02d 2015-11-03 stephen.d ReadFcall(ctx context.Context, fcall *Fcall) error
27 40d4a02d 2015-11-03 stephen.d
28 40d4a02d 2015-11-03 stephen.d // WriteFcall writes the provided fcall to the channel. WriteFcall cannot
29 40d4a02d 2015-11-03 stephen.d // be called concurrently with other calls to WriteFcall.
30 40d4a02d 2015-11-03 stephen.d WriteFcall(ctx context.Context, fcall *Fcall) error
31 40d4a02d 2015-11-03 stephen.d
32 3bf22e58 2015-11-04 stephen.d // MSize returns the current msize for the channel.
33 3bf22e58 2015-11-04 stephen.d MSize() int
34 3bf22e58 2015-11-04 stephen.d
35 40d4a02d 2015-11-03 stephen.d // SetMSize sets the maximum message size for the channel. This must never
36 40d4a02d 2015-11-03 stephen.d // be called currently with ReadFcall or WriteFcall.
37 40d4a02d 2015-11-03 stephen.d SetMSize(msize int)
38 40d4a02d 2015-11-03 stephen.d }
39 40d4a02d 2015-11-03 stephen.d
40 3bf22e58 2015-11-04 stephen.d func NewChannel(conn net.Conn, msize int) Channel {
41 3bf22e58 2015-11-04 stephen.d return newChannel(conn, codec9p{}, msize)
42 3bf22e58 2015-11-04 stephen.d }
43 3bf22e58 2015-11-04 stephen.d
44 a8abc687 2015-10-30 stephen.d const (
45 a8abc687 2015-10-30 stephen.d defaultRWTimeout = 1 * time.Second // default read/write timeout if not set in context
46 a8abc687 2015-10-30 stephen.d )
47 a8abc687 2015-10-30 stephen.d
48 fb37ce2a 2015-10-30 stephen.d // channel provides bidirectional protocol framing for 9p over net.Conn.
49 fb37ce2a 2015-10-30 stephen.d // Operations are not thread-safe but reads and writes may be carried out
50 fb37ce2a 2015-10-30 stephen.d // concurrently, supporting separate read and write loops.
51 40d4a02d 2015-11-03 stephen.d //
52 40d4a02d 2015-11-03 stephen.d // Lifecyle
53 40d4a02d 2015-11-03 stephen.d //
54 40d4a02d 2015-11-03 stephen.d // A connection, or message channel abstraction, has a lifecycle delineated by
55 40d4a02d 2015-11-03 stephen.d // Tversion/Rversion request response cycles. For now, this is part of the
56 40d4a02d 2015-11-03 stephen.d // channel itself but doesn't necessarily influence the channels state, except
57 40d4a02d 2015-11-03 stephen.d // the msize. Visually, it might look something like this:
58 40d4a02d 2015-11-03 stephen.d //
59 40d4a02d 2015-11-03 stephen.d // [Established] -> [Version] -> [Session] -> [Version]---+
60 40d4a02d 2015-11-03 stephen.d // ^ |
61 40d4a02d 2015-11-03 stephen.d // |_________________________________|
62 40d4a02d 2015-11-03 stephen.d //
63 40d4a02d 2015-11-03 stephen.d // The connection is established, then we negotiate a version, run a session,
64 40d4a02d 2015-11-03 stephen.d // then negotiate a version and so on. For most purposes, we are likely going
65 40d4a02d 2015-11-03 stephen.d // to terminate the connection after the session but we may want to support
66 40d4a02d 2015-11-03 stephen.d // connection pooling. Pooling may result in possible security leaks if the
67 40d4a02d 2015-11-03 stephen.d // connections are shared among contexts, since the version is negotiated at
68 40d4a02d 2015-11-03 stephen.d // the start of the session. To avoid this, we can actually use a "tombstone"
69 40d4a02d 2015-11-03 stephen.d // version message which clears the server's session state without starting a
70 40d4a02d 2015-11-03 stephen.d // new session. The next version message would then prepare the session
71 40d4a02d 2015-11-03 stephen.d // without leaking any Fid's.
72 fb37ce2a 2015-10-30 stephen.d type channel struct {
73 fb37ce2a 2015-10-30 stephen.d conn net.Conn
74 fb37ce2a 2015-10-30 stephen.d codec Codec
75 fb37ce2a 2015-10-30 stephen.d brd *bufio.Reader
76 fb37ce2a 2015-10-30 stephen.d bwr *bufio.Writer
77 fb37ce2a 2015-10-30 stephen.d closed chan struct{}
78 40d4a02d 2015-11-03 stephen.d msize int
79 fb37ce2a 2015-10-30 stephen.d rdbuf []byte
80 fb37ce2a 2015-10-30 stephen.d wrbuf []byte
81 fb37ce2a 2015-10-30 stephen.d }
82 fb37ce2a 2015-10-30 stephen.d
83 fb37ce2a 2015-10-30 stephen.d func newChannel(conn net.Conn, codec Codec, msize int) *channel {
84 fb37ce2a 2015-10-30 stephen.d return &channel{
85 fb37ce2a 2015-10-30 stephen.d conn: conn,
86 fb37ce2a 2015-10-30 stephen.d codec: codec,
87 fb37ce2a 2015-10-30 stephen.d brd: bufio.NewReaderSize(conn, msize), // msize may not be optimal buffer size
88 fb37ce2a 2015-10-30 stephen.d bwr: bufio.NewWriterSize(conn, msize),
89 fb37ce2a 2015-10-30 stephen.d closed: make(chan struct{}),
90 40d4a02d 2015-11-03 stephen.d msize: msize,
91 fb37ce2a 2015-10-30 stephen.d rdbuf: make([]byte, msize),
92 fb37ce2a 2015-10-30 stephen.d wrbuf: make([]byte, msize),
93 fb37ce2a 2015-10-30 stephen.d }
94 fb37ce2a 2015-10-30 stephen.d }
95 fb37ce2a 2015-10-30 stephen.d
96 3bf22e58 2015-11-04 stephen.d func (ch *channel) MSize() int {
97 3bf22e58 2015-11-04 stephen.d return ch.msize
98 3bf22e58 2015-11-04 stephen.d }
99 3bf22e58 2015-11-04 stephen.d
100 fb37ce2a 2015-10-30 stephen.d // setmsize resizes the buffers for use with a separate msize. This call must
101 fb37ce2a 2015-10-30 stephen.d // be protected by a mutex or made before passing to other goroutines.
102 3bf22e58 2015-11-04 stephen.d func (ch *channel) SetMSize(msize int) {
103 fb37ce2a 2015-10-30 stephen.d // NOTE(stevvooe): We cannot safely resize the buffered reader and writer.
104 fb37ce2a 2015-10-30 stephen.d // Proceed assuming that original size is sufficient.
105 fb37ce2a 2015-10-30 stephen.d
106 40d4a02d 2015-11-03 stephen.d ch.msize = msize
107 fb37ce2a 2015-10-30 stephen.d if msize < len(ch.rdbuf) {
108 fb37ce2a 2015-10-30 stephen.d // just change the cap
109 fb37ce2a 2015-10-30 stephen.d ch.rdbuf = ch.rdbuf[:msize]
110 fb37ce2a 2015-10-30 stephen.d ch.wrbuf = ch.wrbuf[:msize]
111 fb37ce2a 2015-10-30 stephen.d return
112 fb37ce2a 2015-10-30 stephen.d }
113 fb37ce2a 2015-10-30 stephen.d
114 fb37ce2a 2015-10-30 stephen.d ch.rdbuf = make([]byte, msize)
115 fb37ce2a 2015-10-30 stephen.d ch.wrbuf = make([]byte, msize)
116 fb37ce2a 2015-10-30 stephen.d }
117 fb37ce2a 2015-10-30 stephen.d
118 fb37ce2a 2015-10-30 stephen.d // ReadFcall reads the next message from the channel into fcall.
119 3bf22e58 2015-11-04 stephen.d func (ch *channel) ReadFcall(ctx context.Context, fcall *Fcall) error {
120 fb37ce2a 2015-10-30 stephen.d select {
121 40d4a02d 2015-11-03 stephen.d case <-ctx.Done():
122 40d4a02d 2015-11-03 stephen.d return ctx.Err()
123 fb37ce2a 2015-10-30 stephen.d case <-ch.closed:
124 fb37ce2a 2015-10-30 stephen.d return ErrClosed
125 fb37ce2a 2015-10-30 stephen.d default:
126 fb37ce2a 2015-10-30 stephen.d }
127 fb37ce2a 2015-10-30 stephen.d
128 a8abc687 2015-10-30 stephen.d deadline, ok := ctx.Deadline()
129 a8abc687 2015-10-30 stephen.d if !ok {
130 a8abc687 2015-10-30 stephen.d deadline = time.Now().Add(defaultRWTimeout)
131 fb37ce2a 2015-10-30 stephen.d }
132 fb37ce2a 2015-10-30 stephen.d
133 a8abc687 2015-10-30 stephen.d if err := ch.conn.SetReadDeadline(deadline); err != nil {
134 a8abc687 2015-10-30 stephen.d log.Printf("transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err)
135 a8abc687 2015-10-30 stephen.d }
136 a8abc687 2015-10-30 stephen.d
137 fb37ce2a 2015-10-30 stephen.d n, err := readmsg(ch.brd, ch.rdbuf)
138 fb37ce2a 2015-10-30 stephen.d if err != nil {
139 fb37ce2a 2015-10-30 stephen.d // TODO(stevvooe): There may be more we can do here to detect partial
140 fb37ce2a 2015-10-30 stephen.d // reads. For now, we just propagate the error untouched.
141 fb37ce2a 2015-10-30 stephen.d return err
142 fb37ce2a 2015-10-30 stephen.d }
143 fb37ce2a 2015-10-30 stephen.d
144 fb37ce2a 2015-10-30 stephen.d if n > len(ch.rdbuf) {
145 fb37ce2a 2015-10-30 stephen.d // TODO(stevvooe): Make this error detectable and respond with error
146 fb37ce2a 2015-10-30 stephen.d // message.
147 fb37ce2a 2015-10-30 stephen.d return fmt.Errorf("message large than buffer:", n)
148 fb37ce2a 2015-10-30 stephen.d }
149 fb37ce2a 2015-10-30 stephen.d
150 40d4a02d 2015-11-03 stephen.d // clear out the fcall
151 40d4a02d 2015-11-03 stephen.d *fcall = Fcall{}
152 74ec7ac9 2015-10-30 stephen.d if err := ch.codec.Unmarshal(ch.rdbuf[:n], fcall); err != nil {
153 74ec7ac9 2015-10-30 stephen.d return err
154 74ec7ac9 2015-10-30 stephen.d }
155 74ec7ac9 2015-10-30 stephen.d log.Println("channel: recv", fcall)
156 74ec7ac9 2015-10-30 stephen.d return nil
157 fb37ce2a 2015-10-30 stephen.d }
158 fb37ce2a 2015-10-30 stephen.d
159 3bf22e58 2015-11-04 stephen.d func (ch *channel) WriteFcall(ctx context.Context, fcall *Fcall) error {
160 fb37ce2a 2015-10-30 stephen.d select {
161 40d4a02d 2015-11-03 stephen.d case <-ctx.Done():
162 40d4a02d 2015-11-03 stephen.d return ctx.Err()
163 fb37ce2a 2015-10-30 stephen.d case <-ch.closed:
164 fb37ce2a 2015-10-30 stephen.d return ErrClosed
165 fb37ce2a 2015-10-30 stephen.d default:
166 fb37ce2a 2015-10-30 stephen.d }
167 74ec7ac9 2015-10-30 stephen.d log.Println("channel: send", fcall)
168 fb37ce2a 2015-10-30 stephen.d
169 a8abc687 2015-10-30 stephen.d deadline, ok := ctx.Deadline()
170 a8abc687 2015-10-30 stephen.d if !ok {
171 a8abc687 2015-10-30 stephen.d deadline = time.Now().Add(defaultRWTimeout)
172 fb37ce2a 2015-10-30 stephen.d }
173 fb37ce2a 2015-10-30 stephen.d
174 a8abc687 2015-10-30 stephen.d if err := ch.conn.SetWriteDeadline(deadline); err != nil {
175 a8abc687 2015-10-30 stephen.d log.Printf("transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err)
176 a8abc687 2015-10-30 stephen.d }
177 a8abc687 2015-10-30 stephen.d
178 fb37ce2a 2015-10-30 stephen.d n, err := ch.codec.Marshal(ch.wrbuf, fcall)
179 fb37ce2a 2015-10-30 stephen.d if err != nil {
180 fb37ce2a 2015-10-30 stephen.d return err
181 fb37ce2a 2015-10-30 stephen.d }
182 fb37ce2a 2015-10-30 stephen.d
183 fb37ce2a 2015-10-30 stephen.d p := ch.wrbuf[:n]
184 fb37ce2a 2015-10-30 stephen.d
185 fb37ce2a 2015-10-30 stephen.d if err := sendmsg(ch.bwr, p); err != nil {
186 fb37ce2a 2015-10-30 stephen.d return err
187 fb37ce2a 2015-10-30 stephen.d }
188 fb37ce2a 2015-10-30 stephen.d
189 fb37ce2a 2015-10-30 stephen.d return ch.bwr.Flush()
190 fb37ce2a 2015-10-30 stephen.d }
191 fb37ce2a 2015-10-30 stephen.d
192 fb37ce2a 2015-10-30 stephen.d // readmsg reads a 9p message into p from rd, ensuring that all bytes are
193 fb37ce2a 2015-10-30 stephen.d // consumed from the size header. If the size header indicates the message is
194 fb37ce2a 2015-10-30 stephen.d // larger than p, the entire message will be discarded, leaving a truncated
195 fb37ce2a 2015-10-30 stephen.d // portion in p. Any error should be treated as a framing error unless n is
196 fb37ce2a 2015-10-30 stephen.d // zero. The caller must check that n is less than or equal to len(p) to
197 fb37ce2a 2015-10-30 stephen.d // ensure that a valid message has been read.
198 fb37ce2a 2015-10-30 stephen.d func readmsg(rd io.Reader, p []byte) (n int, err error) {
199 fb37ce2a 2015-10-30 stephen.d var msize uint32
200 fb37ce2a 2015-10-30 stephen.d
201 fb37ce2a 2015-10-30 stephen.d if err := binary.Read(rd, binary.LittleEndian, &msize); err != nil {
202 fb37ce2a 2015-10-30 stephen.d return 0, err
203 fb37ce2a 2015-10-30 stephen.d }
204 fb37ce2a 2015-10-30 stephen.d
205 fb37ce2a 2015-10-30 stephen.d n += binary.Size(msize)
206 fb37ce2a 2015-10-30 stephen.d mbody := int(msize) - 4
207 fb37ce2a 2015-10-30 stephen.d
208 fb37ce2a 2015-10-30 stephen.d if mbody < len(p) {
209 fb37ce2a 2015-10-30 stephen.d p = p[:mbody]
210 fb37ce2a 2015-10-30 stephen.d }
211 fb37ce2a 2015-10-30 stephen.d
212 fb37ce2a 2015-10-30 stephen.d np, err := io.ReadFull(rd, p)
213 fb37ce2a 2015-10-30 stephen.d if err != nil {
214 fb37ce2a 2015-10-30 stephen.d return np + n, err
215 fb37ce2a 2015-10-30 stephen.d }
216 fb37ce2a 2015-10-30 stephen.d n += np
217 fb37ce2a 2015-10-30 stephen.d
218 fb37ce2a 2015-10-30 stephen.d if mbody > len(p) {
219 fb37ce2a 2015-10-30 stephen.d // message has been read up to len(p) but we must consume the entire
220 fb37ce2a 2015-10-30 stephen.d // message. This is an error condition but is non-fatal if we can
221 fb37ce2a 2015-10-30 stephen.d // consume msize bytes.
222 fb37ce2a 2015-10-30 stephen.d nn, err := io.CopyN(ioutil.Discard, rd, int64(mbody-len(p)))
223 fb37ce2a 2015-10-30 stephen.d n += int(nn)
224 fb37ce2a 2015-10-30 stephen.d if err != nil {
225 fb37ce2a 2015-10-30 stephen.d return n, err
226 fb37ce2a 2015-10-30 stephen.d }
227 fb37ce2a 2015-10-30 stephen.d }
228 fb37ce2a 2015-10-30 stephen.d
229 fb37ce2a 2015-10-30 stephen.d return n, nil
230 fb37ce2a 2015-10-30 stephen.d }
231 fb37ce2a 2015-10-30 stephen.d
232 fb37ce2a 2015-10-30 stephen.d // sendmsg writes a message of len(p) to wr with a 9p size header. All errors
233 fb37ce2a 2015-10-30 stephen.d // should be considered terminal.
234 fb37ce2a 2015-10-30 stephen.d func sendmsg(wr io.Writer, p []byte) error {
235 fb37ce2a 2015-10-30 stephen.d size := uint32(len(p) + 4) // message size plus 4-bytes for size.
236 fb37ce2a 2015-10-30 stephen.d if err := binary.Write(wr, binary.LittleEndian, size); err != nil {
237 fb37ce2a 2015-10-30 stephen.d return nil
238 fb37ce2a 2015-10-30 stephen.d }
239 fb37ce2a 2015-10-30 stephen.d
240 fb37ce2a 2015-10-30 stephen.d // This assume partial writes to wr aren't possible. Not sure if this
241 fb37ce2a 2015-10-30 stephen.d // valid. Matters during timeout retries.
242 fb37ce2a 2015-10-30 stephen.d if n, err := wr.Write(p); err != nil {
243 fb37ce2a 2015-10-30 stephen.d return err
244 fb37ce2a 2015-10-30 stephen.d } else if n < len(p) {
245 fb37ce2a 2015-10-30 stephen.d return io.ErrShortWrite
246 fb37ce2a 2015-10-30 stephen.d }
247 fb37ce2a 2015-10-30 stephen.d
248 fb37ce2a 2015-10-30 stephen.d return nil
249 fb37ce2a 2015-10-30 stephen.d }