Blame


1 4b33cdd0 2015-11-30 stephen.d package p9p
2 4b33cdd0 2015-11-30 stephen.d
3 4b33cdd0 2015-11-30 stephen.d import (
4 4b33cdd0 2015-11-30 stephen.d "bufio"
5 4b33cdd0 2015-11-30 stephen.d "encoding/binary"
6 4b33cdd0 2015-11-30 stephen.d "fmt"
7 4b33cdd0 2015-11-30 stephen.d "io"
8 4b33cdd0 2015-11-30 stephen.d "io/ioutil"
9 4b33cdd0 2015-11-30 stephen.d "log"
10 4b33cdd0 2015-11-30 stephen.d "net"
11 4b33cdd0 2015-11-30 stephen.d "time"
12 4b33cdd0 2015-11-30 stephen.d
13 529e2b2e 2016-11-14 noreply "context"
14 4b33cdd0 2015-11-30 stephen.d )
15 4b33cdd0 2015-11-30 stephen.d
16 4b33cdd0 2015-11-30 stephen.d // Channel defines the operations necessary to implement a 9p message channel
17 4b33cdd0 2015-11-30 stephen.d // interface. Typically, message channels do no protocol processing except to
18 4b33cdd0 2015-11-30 stephen.d // send and receive message frames.
19 4b33cdd0 2015-11-30 stephen.d type Channel interface {
20 4b33cdd0 2015-11-30 stephen.d // ReadFcall reads one fcall frame into the provided fcall structure. The
21 4b33cdd0 2015-11-30 stephen.d // Fcall may be cleared whether there is an error or not. If the operation
22 4b33cdd0 2015-11-30 stephen.d // is successful, the contents of the fcall will be populated in the
23 4b33cdd0 2015-11-30 stephen.d // argument. ReadFcall cannot be called concurrently with other calls to
24 4b33cdd0 2015-11-30 stephen.d // ReadFcall. This both to preserve message ordering and to allow lockless
25 4b33cdd0 2015-11-30 stephen.d // buffer reusage.
26 4b33cdd0 2015-11-30 stephen.d ReadFcall(ctx context.Context, fcall *Fcall) error
27 4b33cdd0 2015-11-30 stephen.d
28 4b33cdd0 2015-11-30 stephen.d // WriteFcall writes the provided fcall to the channel. WriteFcall cannot
29 4b33cdd0 2015-11-30 stephen.d // be called concurrently with other calls to WriteFcall.
30 4b33cdd0 2015-11-30 stephen.d WriteFcall(ctx context.Context, fcall *Fcall) error
31 4b33cdd0 2015-11-30 stephen.d
32 4b33cdd0 2015-11-30 stephen.d // MSize returns the current msize for the channel.
33 4b33cdd0 2015-11-30 stephen.d MSize() int
34 4b33cdd0 2015-11-30 stephen.d
35 4b33cdd0 2015-11-30 stephen.d // SetMSize sets the maximum message size for the channel. This must never
36 4b33cdd0 2015-11-30 stephen.d // be called currently with ReadFcall or WriteFcall.
37 4b33cdd0 2015-11-30 stephen.d SetMSize(msize int)
38 4b33cdd0 2015-11-30 stephen.d }
39 4b33cdd0 2015-11-30 stephen.d
40 a0568195 2016-05-23 stevvooe // NewChannel returns a new channel to read and write Fcalls with the provided
41 a0568195 2016-05-23 stevvooe // connection and message size.
42 4b33cdd0 2015-11-30 stephen.d func NewChannel(conn net.Conn, msize int) Channel {
43 4b33cdd0 2015-11-30 stephen.d return newChannel(conn, codec9p{}, msize)
44 4b33cdd0 2015-11-30 stephen.d }
45 4b33cdd0 2015-11-30 stephen.d
// defaultRWTimeout is the read/write deadline applied to the connection when
// the caller's context carries no deadline of its own.
const defaultRWTimeout = 30 * time.Second
49 4b33cdd0 2015-11-30 stephen.d
50 4b33cdd0 2015-11-30 stephen.d // channel provides bidirectional protocol framing for 9p over net.Conn.
51 4b33cdd0 2015-11-30 stephen.d // Operations are not thread-safe but reads and writes may be carried out
52 4b33cdd0 2015-11-30 stephen.d // concurrently, supporting separate read and write loops.
53 4b33cdd0 2015-11-30 stephen.d //
54 4b33cdd0 2015-11-30 stephen.d // Lifecyle
55 4b33cdd0 2015-11-30 stephen.d //
56 4b33cdd0 2015-11-30 stephen.d // A connection, or message channel abstraction, has a lifecycle delineated by
57 4b33cdd0 2015-11-30 stephen.d // Tversion/Rversion request response cycles. For now, this is part of the
58 4b33cdd0 2015-11-30 stephen.d // channel itself but doesn't necessarily influence the channels state, except
59 4b33cdd0 2015-11-30 stephen.d // the msize. Visually, it might look something like this:
60 4b33cdd0 2015-11-30 stephen.d //
61 4b33cdd0 2015-11-30 stephen.d // [Established] -> [Version] -> [Session] -> [Version]---+
62 4b33cdd0 2015-11-30 stephen.d // ^ |
63 4b33cdd0 2015-11-30 stephen.d // |_________________________________|
64 4b33cdd0 2015-11-30 stephen.d //
65 4b33cdd0 2015-11-30 stephen.d // The connection is established, then we negotiate a version, run a session,
66 4b33cdd0 2015-11-30 stephen.d // then negotiate a version and so on. For most purposes, we are likely going
67 4b33cdd0 2015-11-30 stephen.d // to terminate the connection after the session but we may want to support
68 4b33cdd0 2015-11-30 stephen.d // connection pooling. Pooling may result in possible security leaks if the
69 4b33cdd0 2015-11-30 stephen.d // connections are shared among contexts, since the version is negotiated at
70 4b33cdd0 2015-11-30 stephen.d // the start of the session. To avoid this, we can actually use a "tombstone"
71 4b33cdd0 2015-11-30 stephen.d // version message which clears the server's session state without starting a
72 4b33cdd0 2015-11-30 stephen.d // new session. The next version message would then prepare the session
73 4b33cdd0 2015-11-30 stephen.d // without leaking any Fid's.
74 4b33cdd0 2015-11-30 stephen.d type channel struct {
75 4b33cdd0 2015-11-30 stephen.d conn net.Conn
76 4b33cdd0 2015-11-30 stephen.d codec Codec
77 4b33cdd0 2015-11-30 stephen.d brd *bufio.Reader
78 4b33cdd0 2015-11-30 stephen.d bwr *bufio.Writer
79 4b33cdd0 2015-11-30 stephen.d closed chan struct{}
80 4b33cdd0 2015-11-30 stephen.d msize int
81 4b33cdd0 2015-11-30 stephen.d rdbuf []byte
82 4b33cdd0 2015-11-30 stephen.d }
83 4b33cdd0 2015-11-30 stephen.d
84 4b33cdd0 2015-11-30 stephen.d func newChannel(conn net.Conn, codec Codec, msize int) *channel {
85 4b33cdd0 2015-11-30 stephen.d return &channel{
86 4b33cdd0 2015-11-30 stephen.d conn: conn,
87 4b33cdd0 2015-11-30 stephen.d codec: codec,
88 4b33cdd0 2015-11-30 stephen.d brd: bufio.NewReaderSize(conn, msize), // msize may not be optimal buffer size
89 4b33cdd0 2015-11-30 stephen.d bwr: bufio.NewWriterSize(conn, msize),
90 4b33cdd0 2015-11-30 stephen.d closed: make(chan struct{}),
91 4b33cdd0 2015-11-30 stephen.d msize: msize,
92 4b33cdd0 2015-11-30 stephen.d rdbuf: make([]byte, msize),
93 4b33cdd0 2015-11-30 stephen.d }
94 4b33cdd0 2015-11-30 stephen.d }
95 4b33cdd0 2015-11-30 stephen.d
96 4b33cdd0 2015-11-30 stephen.d func (ch *channel) MSize() int {
97 4b33cdd0 2015-11-30 stephen.d return ch.msize
98 4b33cdd0 2015-11-30 stephen.d }
99 4b33cdd0 2015-11-30 stephen.d
100 4b33cdd0 2015-11-30 stephen.d // setmsize resizes the buffers for use with a separate msize. This call must
101 4b33cdd0 2015-11-30 stephen.d // be protected by a mutex or made before passing to other goroutines.
102 4b33cdd0 2015-11-30 stephen.d func (ch *channel) SetMSize(msize int) {
103 4b33cdd0 2015-11-30 stephen.d // NOTE(stevvooe): We cannot safely resize the buffered reader and writer.
104 4b33cdd0 2015-11-30 stephen.d // Proceed assuming that original size is sufficient.
105 4b33cdd0 2015-11-30 stephen.d
106 4b33cdd0 2015-11-30 stephen.d ch.msize = msize
107 4b33cdd0 2015-11-30 stephen.d if msize < len(ch.rdbuf) {
108 4b33cdd0 2015-11-30 stephen.d // just change the cap
109 4b33cdd0 2015-11-30 stephen.d ch.rdbuf = ch.rdbuf[:msize]
110 4b33cdd0 2015-11-30 stephen.d return
111 4b33cdd0 2015-11-30 stephen.d }
112 4b33cdd0 2015-11-30 stephen.d
113 4b33cdd0 2015-11-30 stephen.d ch.rdbuf = make([]byte, msize)
114 4b33cdd0 2015-11-30 stephen.d }
115 4b33cdd0 2015-11-30 stephen.d
116 4b33cdd0 2015-11-30 stephen.d // ReadFcall reads the next message from the channel into fcall.
117 4b33cdd0 2015-11-30 stephen.d func (ch *channel) ReadFcall(ctx context.Context, fcall *Fcall) error {
118 4b33cdd0 2015-11-30 stephen.d select {
119 4b33cdd0 2015-11-30 stephen.d case <-ctx.Done():
120 4b33cdd0 2015-11-30 stephen.d return ctx.Err()
121 4b33cdd0 2015-11-30 stephen.d case <-ch.closed:
122 4b33cdd0 2015-11-30 stephen.d return ErrClosed
123 4b33cdd0 2015-11-30 stephen.d default:
124 4b33cdd0 2015-11-30 stephen.d }
125 4b33cdd0 2015-11-30 stephen.d
126 4b33cdd0 2015-11-30 stephen.d deadline, ok := ctx.Deadline()
127 4b33cdd0 2015-11-30 stephen.d if !ok {
128 4b33cdd0 2015-11-30 stephen.d deadline = time.Now().Add(defaultRWTimeout)
129 4b33cdd0 2015-11-30 stephen.d }
130 4b33cdd0 2015-11-30 stephen.d
131 4b33cdd0 2015-11-30 stephen.d if err := ch.conn.SetReadDeadline(deadline); err != nil {
132 4b33cdd0 2015-11-30 stephen.d log.Printf("transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err)
133 4b33cdd0 2015-11-30 stephen.d }
134 4b33cdd0 2015-11-30 stephen.d
135 4b33cdd0 2015-11-30 stephen.d n, err := readmsg(ch.brd, ch.rdbuf)
136 4b33cdd0 2015-11-30 stephen.d if err != nil {
137 4b33cdd0 2015-11-30 stephen.d // TODO(stevvooe): There may be more we can do here to detect partial
138 4b33cdd0 2015-11-30 stephen.d // reads. For now, we just propagate the error untouched.
139 4b33cdd0 2015-11-30 stephen.d return err
140 4b33cdd0 2015-11-30 stephen.d }
141 4b33cdd0 2015-11-30 stephen.d
142 4b33cdd0 2015-11-30 stephen.d if n > len(ch.rdbuf) {
143 4b33cdd0 2015-11-30 stephen.d // TODO(stevvooe): Make this error detectable and respond with error
144 4b33cdd0 2015-11-30 stephen.d // message.
145 a0568195 2016-05-23 stevvooe return fmt.Errorf("message too large for buffer: %v > %v ", n, len(ch.rdbuf))
146 4b33cdd0 2015-11-30 stephen.d }
147 4b33cdd0 2015-11-30 stephen.d
148 4b33cdd0 2015-11-30 stephen.d // clear out the fcall
149 4b33cdd0 2015-11-30 stephen.d *fcall = Fcall{}
150 4b33cdd0 2015-11-30 stephen.d if err := ch.codec.Unmarshal(ch.rdbuf[:n], fcall); err != nil {
151 4b33cdd0 2015-11-30 stephen.d return err
152 4b33cdd0 2015-11-30 stephen.d }
153 4b33cdd0 2015-11-30 stephen.d
154 4b33cdd0 2015-11-30 stephen.d return nil
155 4b33cdd0 2015-11-30 stephen.d }
156 4b33cdd0 2015-11-30 stephen.d
157 4b33cdd0 2015-11-30 stephen.d func (ch *channel) WriteFcall(ctx context.Context, fcall *Fcall) error {
158 4b33cdd0 2015-11-30 stephen.d select {
159 4b33cdd0 2015-11-30 stephen.d case <-ctx.Done():
160 4b33cdd0 2015-11-30 stephen.d return ctx.Err()
161 4b33cdd0 2015-11-30 stephen.d case <-ch.closed:
162 4b33cdd0 2015-11-30 stephen.d return ErrClosed
163 4b33cdd0 2015-11-30 stephen.d default:
164 4b33cdd0 2015-11-30 stephen.d }
165 4b33cdd0 2015-11-30 stephen.d
166 4b33cdd0 2015-11-30 stephen.d deadline, ok := ctx.Deadline()
167 4b33cdd0 2015-11-30 stephen.d if !ok {
168 4b33cdd0 2015-11-30 stephen.d deadline = time.Now().Add(defaultRWTimeout)
169 4b33cdd0 2015-11-30 stephen.d }
170 4b33cdd0 2015-11-30 stephen.d
171 4b33cdd0 2015-11-30 stephen.d if err := ch.conn.SetWriteDeadline(deadline); err != nil {
172 4b33cdd0 2015-11-30 stephen.d log.Printf("transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err)
173 4b33cdd0 2015-11-30 stephen.d }
174 4b33cdd0 2015-11-30 stephen.d
175 4b33cdd0 2015-11-30 stephen.d p, err := ch.codec.Marshal(fcall)
176 4b33cdd0 2015-11-30 stephen.d if err != nil {
177 4b33cdd0 2015-11-30 stephen.d return err
178 4b33cdd0 2015-11-30 stephen.d }
179 4b33cdd0 2015-11-30 stephen.d
180 4b33cdd0 2015-11-30 stephen.d if err := sendmsg(ch.bwr, p); err != nil {
181 4b33cdd0 2015-11-30 stephen.d return err
182 4b33cdd0 2015-11-30 stephen.d }
183 4b33cdd0 2015-11-30 stephen.d
184 4b33cdd0 2015-11-30 stephen.d return ch.bwr.Flush()
185 4b33cdd0 2015-11-30 stephen.d }
186 4b33cdd0 2015-11-30 stephen.d
// readmsg reads one 9p message from rd, storing the message body (everything
// after the 4-byte little-endian size header) in p.
//
// The returned n is the number of bytes consumed from rd, including the size
// header. If the header announces a body larger than len(p), the first len(p)
// bytes are stored in p and the remainder is read and discarded so the stream
// stays aligned on the next frame; the caller must check that n is less than
// or equal to len(p) to know that a complete message was stored.
//
// A size header smaller than 4 cannot be valid, because the size includes the
// header itself; it is reported as an error with n equal to 4. Any error
// should be treated as a framing error unless n is zero.
func readmsg(rd io.Reader, p []byte) (n int, err error) {
	const headerLen = 4

	var header [headerLen]byte
	if _, err := io.ReadFull(rd, header[:]); err != nil {
		return 0, err
	}
	n = headerLen

	msize := binary.LittleEndian.Uint32(header[:])
	if msize < headerLen {
		// Without this check the body length below would be negative and
		// slicing p would panic on input controlled by the remote peer.
		return n, fmt.Errorf("invalid message size %d: must be at least %d", msize, headerLen)
	}

	// Computed in int64 so a large header cannot turn negative where int is
	// 32 bits wide.
	body := int64(msize) - headerLen
	if body < int64(len(p)) {
		p = p[:body]
	}

	np, err := io.ReadFull(rd, p)
	n += np
	if err != nil {
		return n, err
	}

	if excess := body - int64(len(p)); excess > 0 {
		// The message has been read up to len(p) but we must consume the
		// entire message. This is an error condition for the caller but is
		// non-fatal if the remaining bytes can be consumed.
		discarded, err := io.CopyN(ioutil.Discard, rd, excess)
		n += int(discarded)
		if err != nil {
			return n, err
		}
	}

	return n, nil
}
226 4b33cdd0 2015-11-30 stephen.d
// sendmsg frames p as a 9p message on wr: a 4-byte little-endian size header
// holding len(p)+4 (the size counts the header itself), followed by p. Every
// error should be considered terminal.
func sendmsg(wr io.Writer, p []byte) error {
	var header [4]byte
	binary.LittleEndian.PutUint32(header[:], uint32(len(p)+4))

	if _, err := wr.Write(header[:]); err != nil {
		return err
	}

	// This assumes partial writes to wr aren't possible. Not sure if this is
	// valid. It matters during timeout retries.
	written, err := wr.Write(p)
	switch {
	case err != nil:
		return err
	case written < len(p):
		return io.ErrShortWrite
	}

	return nil
}