streams_map_test.go 14 KB


  1. package quic
  2. import (
  3. "errors"
  4. "fmt"
  5. "math"
  6. "net"
  7. "github.com/golang/mock/gomock"
  8. "github.com/lucas-clemente/quic-go/internal/flowcontrol"
  9. "github.com/lucas-clemente/quic-go/internal/handshake"
  10. "github.com/lucas-clemente/quic-go/internal/mocks"
  11. "github.com/lucas-clemente/quic-go/internal/protocol"
  12. "github.com/lucas-clemente/quic-go/internal/qerr"
  13. "github.com/lucas-clemente/quic-go/internal/wire"
  14. . "github.com/onsi/ginkgo"
  15. . "github.com/onsi/gomega"
  16. )
  17. type streamMapping struct {
  18. firstIncomingBidiStream protocol.StreamID
  19. firstIncomingUniStream protocol.StreamID
  20. firstOutgoingBidiStream protocol.StreamID
  21. firstOutgoingUniStream protocol.StreamID
  22. }
  23. func expectTooManyStreamsError(err error) {
  24. ExpectWithOffset(1, err).To(HaveOccurred())
  25. ExpectWithOffset(1, err.Error()).To(Equal(errTooManyOpenStreams.Error()))
  26. nerr, ok := err.(net.Error)
  27. ExpectWithOffset(1, ok).To(BeTrue())
  28. ExpectWithOffset(1, nerr.Temporary()).To(BeTrue())
  29. ExpectWithOffset(1, nerr.Timeout()).To(BeFalse())
  30. }
  31. var _ = Describe("Streams Map", func() {
  32. newFlowController := func(protocol.StreamID) flowcontrol.StreamFlowController {
  33. return mocks.NewMockStreamFlowController(mockCtrl)
  34. }
  35. serverStreamMapping := streamMapping{
  36. firstIncomingBidiStream: 0,
  37. firstOutgoingBidiStream: 1,
  38. firstIncomingUniStream: 2,
  39. firstOutgoingUniStream: 3,
  40. }
  41. clientStreamMapping := streamMapping{
  42. firstIncomingBidiStream: 1,
  43. firstOutgoingBidiStream: 0,
  44. firstIncomingUniStream: 3,
  45. firstOutgoingUniStream: 2,
  46. }
  47. for _, p := range []protocol.Perspective{protocol.PerspectiveServer, protocol.PerspectiveClient} {
  48. perspective := p
  49. var ids streamMapping
  50. if perspective == protocol.PerspectiveClient {
  51. ids = clientStreamMapping
  52. } else {
  53. ids = serverStreamMapping
  54. }
  55. Context(perspective.String(), func() {
  56. var (
  57. m *streamsMap
  58. mockSender *MockStreamSender
  59. )
  60. const (
  61. maxBidiStreams = 111
  62. maxUniStreams = 222
  63. )
  64. allowUnlimitedStreams := func() {
  65. m.UpdateLimits(&handshake.TransportParameters{
  66. MaxBidiStreams: math.MaxUint16,
  67. MaxUniStreams: math.MaxUint16,
  68. })
  69. }
  70. BeforeEach(func() {
  71. mockSender = NewMockStreamSender(mockCtrl)
  72. m = newStreamsMap(mockSender, newFlowController, maxBidiStreams, maxUniStreams, perspective, protocol.VersionWhatever).(*streamsMap)
  73. })
  74. Context("opening", func() {
  75. It("opens bidirectional streams", func() {
  76. allowUnlimitedStreams()
  77. str, err := m.OpenStream()
  78. Expect(err).ToNot(HaveOccurred())
  79. Expect(str).To(BeAssignableToTypeOf(&stream{}))
  80. Expect(str.StreamID()).To(Equal(ids.firstOutgoingBidiStream))
  81. str, err = m.OpenStream()
  82. Expect(err).ToNot(HaveOccurred())
  83. Expect(str).To(BeAssignableToTypeOf(&stream{}))
  84. Expect(str.StreamID()).To(Equal(ids.firstOutgoingBidiStream + 4))
  85. })
  86. It("opens unidirectional streams", func() {
  87. allowUnlimitedStreams()
  88. str, err := m.OpenUniStream()
  89. Expect(err).ToNot(HaveOccurred())
  90. Expect(str).To(BeAssignableToTypeOf(&sendStream{}))
  91. Expect(str.StreamID()).To(Equal(ids.firstOutgoingUniStream))
  92. str, err = m.OpenUniStream()
  93. Expect(err).ToNot(HaveOccurred())
  94. Expect(str).To(BeAssignableToTypeOf(&sendStream{}))
  95. Expect(str.StreamID()).To(Equal(ids.firstOutgoingUniStream + 4))
  96. })
  97. })
  98. Context("accepting", func() {
  99. It("accepts bidirectional streams", func() {
  100. _, err := m.GetOrOpenReceiveStream(ids.firstIncomingBidiStream)
  101. Expect(err).ToNot(HaveOccurred())
  102. str, err := m.AcceptStream()
  103. Expect(err).ToNot(HaveOccurred())
  104. Expect(str).To(BeAssignableToTypeOf(&stream{}))
  105. Expect(str.StreamID()).To(Equal(ids.firstIncomingBidiStream))
  106. })
  107. It("accepts unidirectional streams", func() {
  108. _, err := m.GetOrOpenReceiveStream(ids.firstIncomingUniStream)
  109. Expect(err).ToNot(HaveOccurred())
  110. str, err := m.AcceptUniStream()
  111. Expect(err).ToNot(HaveOccurred())
  112. Expect(str).To(BeAssignableToTypeOf(&receiveStream{}))
  113. Expect(str.StreamID()).To(Equal(ids.firstIncomingUniStream))
  114. })
  115. })
  116. Context("deleting", func() {
  117. BeforeEach(func() {
  118. mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes()
  119. allowUnlimitedStreams()
  120. })
  121. It("deletes outgoing bidirectional streams", func() {
  122. id := ids.firstOutgoingBidiStream
  123. str, err := m.OpenStream()
  124. Expect(err).ToNot(HaveOccurred())
  125. Expect(str.StreamID()).To(Equal(id))
  126. Expect(m.DeleteStream(id)).To(Succeed())
  127. dstr, err := m.GetOrOpenSendStream(id)
  128. Expect(err).ToNot(HaveOccurred())
  129. Expect(dstr).To(BeNil())
  130. })
  131. It("deletes incoming bidirectional streams", func() {
  132. id := ids.firstIncomingBidiStream
  133. str, err := m.GetOrOpenReceiveStream(id)
  134. Expect(err).ToNot(HaveOccurred())
  135. Expect(str.StreamID()).To(Equal(id))
  136. Expect(m.DeleteStream(id)).To(Succeed())
  137. dstr, err := m.GetOrOpenReceiveStream(id)
  138. Expect(err).ToNot(HaveOccurred())
  139. Expect(dstr).To(BeNil())
  140. })
  141. It("accepts bidirectional streams after they have been deleted", func() {
  142. id := ids.firstIncomingBidiStream
  143. _, err := m.GetOrOpenReceiveStream(id)
  144. Expect(err).ToNot(HaveOccurred())
  145. Expect(m.DeleteStream(id)).To(Succeed())
  146. str, err := m.AcceptStream()
  147. Expect(err).ToNot(HaveOccurred())
  148. Expect(str).ToNot(BeNil())
  149. Expect(str.StreamID()).To(Equal(id))
  150. })
  151. It("deletes outgoing unidirectional streams", func() {
  152. id := ids.firstOutgoingUniStream
  153. str, err := m.OpenUniStream()
  154. Expect(err).ToNot(HaveOccurred())
  155. Expect(str.StreamID()).To(Equal(id))
  156. Expect(m.DeleteStream(id)).To(Succeed())
  157. dstr, err := m.GetOrOpenSendStream(id)
  158. Expect(err).ToNot(HaveOccurred())
  159. Expect(dstr).To(BeNil())
  160. })
  161. It("deletes incoming unidirectional streams", func() {
  162. id := ids.firstIncomingUniStream
  163. str, err := m.GetOrOpenReceiveStream(id)
  164. Expect(err).ToNot(HaveOccurred())
  165. Expect(str.StreamID()).To(Equal(id))
  166. Expect(m.DeleteStream(id)).To(Succeed())
  167. dstr, err := m.GetOrOpenReceiveStream(id)
  168. Expect(err).ToNot(HaveOccurred())
  169. Expect(dstr).To(BeNil())
  170. })
  171. It("accepts unirectional streams after they have been deleted", func() {
  172. id := ids.firstIncomingUniStream
  173. _, err := m.GetOrOpenReceiveStream(id)
  174. Expect(err).ToNot(HaveOccurred())
  175. Expect(m.DeleteStream(id)).To(Succeed())
  176. str, err := m.AcceptUniStream()
  177. Expect(err).ToNot(HaveOccurred())
  178. Expect(str).ToNot(BeNil())
  179. Expect(str.StreamID()).To(Equal(id))
  180. })
  181. })
  182. Context("getting streams", func() {
  183. BeforeEach(func() {
  184. allowUnlimitedStreams()
  185. })
  186. Context("send streams", func() {
  187. It("gets an outgoing bidirectional stream", func() {
  188. // need to open the stream ourselves first
  189. // the peer is not allowed to create a stream initiated by us
  190. _, err := m.OpenStream()
  191. Expect(err).ToNot(HaveOccurred())
  192. str, err := m.GetOrOpenSendStream(ids.firstOutgoingBidiStream)
  193. Expect(err).ToNot(HaveOccurred())
  194. Expect(str.StreamID()).To(Equal(ids.firstOutgoingBidiStream))
  195. })
  196. It("errors when the peer tries to open a higher outgoing bidirectional stream", func() {
  197. id := ids.firstOutgoingBidiStream + 5*4
  198. _, err := m.GetOrOpenSendStream(id)
  199. Expect(err).To(MatchError(qerr.Error(qerr.InvalidStreamID, fmt.Sprintf("peer attempted to open stream %d", id))))
  200. })
  201. It("gets an outgoing unidirectional stream", func() {
  202. // need to open the stream ourselves first
  203. // the peer is not allowed to create a stream initiated by us
  204. _, err := m.OpenUniStream()
  205. Expect(err).ToNot(HaveOccurred())
  206. str, err := m.GetOrOpenSendStream(ids.firstOutgoingUniStream)
  207. Expect(err).ToNot(HaveOccurred())
  208. Expect(str.StreamID()).To(Equal(ids.firstOutgoingUniStream))
  209. })
  210. It("errors when the peer tries to open a higher outgoing bidirectional stream", func() {
  211. id := ids.firstOutgoingUniStream + 5*4
  212. _, err := m.GetOrOpenSendStream(id)
  213. Expect(err).To(MatchError(qerr.Error(qerr.InvalidStreamID, fmt.Sprintf("peer attempted to open stream %d", id))))
  214. })
  215. It("gets an incoming bidirectional stream", func() {
  216. id := ids.firstIncomingBidiStream + 4*7
  217. str, err := m.GetOrOpenSendStream(id)
  218. Expect(err).ToNot(HaveOccurred())
  219. Expect(str.StreamID()).To(Equal(id))
  220. })
  221. It("errors when trying to get an incoming unidirectional stream", func() {
  222. id := ids.firstIncomingUniStream
  223. _, err := m.GetOrOpenSendStream(id)
  224. Expect(err).To(MatchError(fmt.Errorf("peer attempted to open send stream %d", id)))
  225. })
  226. })
  227. Context("receive streams", func() {
  228. It("gets an outgoing bidirectional stream", func() {
  229. // need to open the stream ourselves first
  230. // the peer is not allowed to create a stream initiated by us
  231. _, err := m.OpenStream()
  232. Expect(err).ToNot(HaveOccurred())
  233. str, err := m.GetOrOpenReceiveStream(ids.firstOutgoingBidiStream)
  234. Expect(err).ToNot(HaveOccurred())
  235. Expect(str.StreamID()).To(Equal(ids.firstOutgoingBidiStream))
  236. })
  237. It("errors when the peer tries to open a higher outgoing bidirectional stream", func() {
  238. id := ids.firstOutgoingBidiStream + 5*4
  239. _, err := m.GetOrOpenReceiveStream(id)
  240. Expect(err).To(MatchError(qerr.Error(qerr.InvalidStreamID, fmt.Sprintf("peer attempted to open stream %d", id))))
  241. })
  242. It("gets an incoming bidirectional stream", func() {
  243. id := ids.firstIncomingBidiStream + 4*7
  244. str, err := m.GetOrOpenReceiveStream(id)
  245. Expect(err).ToNot(HaveOccurred())
  246. Expect(str.StreamID()).To(Equal(id))
  247. })
  248. It("gets an incoming unidirectional stream", func() {
  249. id := ids.firstIncomingUniStream + 4*10
  250. str, err := m.GetOrOpenReceiveStream(id)
  251. Expect(err).ToNot(HaveOccurred())
  252. Expect(str.StreamID()).To(Equal(id))
  253. })
  254. It("errors when trying to get an outgoing unidirectional stream", func() {
  255. id := ids.firstOutgoingUniStream
  256. _, err := m.GetOrOpenReceiveStream(id)
  257. Expect(err).To(MatchError(fmt.Errorf("peer attempted to open receive stream %d", id)))
  258. })
  259. })
  260. })
  261. Context("updating stream ID limits", func() {
  262. BeforeEach(func() {
  263. mockSender.EXPECT().queueControlFrame(gomock.Any())
  264. })
  265. It("processes the parameter for outgoing streams, as a server", func() {
  266. m.perspective = protocol.PerspectiveServer
  267. _, err := m.OpenStream()
  268. expectTooManyStreamsError(err)
  269. m.UpdateLimits(&handshake.TransportParameters{
  270. MaxBidiStreams: 5,
  271. MaxUniStreams: 5,
  272. })
  273. Expect(m.outgoingBidiStreams.maxStream).To(Equal(protocol.StreamID(17)))
  274. Expect(m.outgoingUniStreams.maxStream).To(Equal(protocol.StreamID(19)))
  275. })
  276. It("processes the parameter for outgoing streams, as a client", func() {
  277. m.perspective = protocol.PerspectiveClient
  278. _, err := m.OpenUniStream()
  279. expectTooManyStreamsError(err)
  280. m.UpdateLimits(&handshake.TransportParameters{
  281. MaxBidiStreams: 5,
  282. MaxUniStreams: 5,
  283. })
  284. Expect(m.outgoingBidiStreams.maxStream).To(Equal(protocol.StreamID(16)))
  285. Expect(m.outgoingUniStreams.maxStream).To(Equal(protocol.StreamID(18)))
  286. })
  287. })
  288. Context("handling MAX_STREAMS frames", func() {
  289. BeforeEach(func() {
  290. mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes()
  291. })
  292. It("processes IDs for outgoing bidirectional streams", func() {
  293. _, err := m.OpenStream()
  294. expectTooManyStreamsError(err)
  295. Expect(m.HandleMaxStreamsFrame(&wire.MaxStreamsFrame{
  296. Type: protocol.StreamTypeBidi,
  297. MaxStreams: 1,
  298. })).To(Succeed())
  299. str, err := m.OpenStream()
  300. Expect(err).ToNot(HaveOccurred())
  301. Expect(str.StreamID()).To(Equal(ids.firstOutgoingBidiStream))
  302. _, err = m.OpenStream()
  303. expectTooManyStreamsError(err)
  304. })
  305. It("processes IDs for outgoing unidirectional streams", func() {
  306. _, err := m.OpenUniStream()
  307. expectTooManyStreamsError(err)
  308. Expect(m.HandleMaxStreamsFrame(&wire.MaxStreamsFrame{
  309. Type: protocol.StreamTypeUni,
  310. MaxStreams: 1,
  311. })).To(Succeed())
  312. str, err := m.OpenUniStream()
  313. Expect(err).ToNot(HaveOccurred())
  314. Expect(str.StreamID()).To(Equal(ids.firstOutgoingUniStream))
  315. _, err = m.OpenUniStream()
  316. expectTooManyStreamsError(err)
  317. })
  318. })
  319. Context("sending MAX_STREAMS frames", func() {
  320. It("sends a MAX_STREAMS frame for bidirectional streams", func() {
  321. _, err := m.GetOrOpenReceiveStream(ids.firstIncomingBidiStream)
  322. Expect(err).ToNot(HaveOccurred())
  323. _, err = m.AcceptStream()
  324. Expect(err).ToNot(HaveOccurred())
  325. mockSender.EXPECT().queueControlFrame(&wire.MaxStreamsFrame{
  326. Type: protocol.StreamTypeBidi,
  327. MaxStreams: maxBidiStreams + 1,
  328. })
  329. Expect(m.DeleteStream(ids.firstIncomingBidiStream)).To(Succeed())
  330. })
  331. It("sends a MAX_STREAMS frame for unidirectional streams", func() {
  332. _, err := m.GetOrOpenReceiveStream(ids.firstIncomingUniStream)
  333. Expect(err).ToNot(HaveOccurred())
  334. _, err = m.AcceptUniStream()
  335. Expect(err).ToNot(HaveOccurred())
  336. mockSender.EXPECT().queueControlFrame(&wire.MaxStreamsFrame{
  337. Type: protocol.StreamTypeUni,
  338. MaxStreams: maxUniStreams + 1,
  339. })
  340. Expect(m.DeleteStream(ids.firstIncomingUniStream)).To(Succeed())
  341. })
  342. })
  343. It("closes", func() {
  344. testErr := errors.New("test error")
  345. m.CloseWithError(testErr)
  346. _, err := m.OpenStream()
  347. Expect(err).To(HaveOccurred())
  348. Expect(err.Error()).To(Equal(testErr.Error()))
  349. _, err = m.OpenUniStream()
  350. Expect(err).To(HaveOccurred())
  351. Expect(err.Error()).To(Equal(testErr.Error()))
  352. _, err = m.AcceptStream()
  353. Expect(err).To(HaveOccurred())
  354. Expect(err.Error()).To(Equal(testErr.Error()))
  355. _, err = m.AcceptUniStream()
  356. Expect(err).To(HaveOccurred())
  357. Expect(err.Error()).To(Equal(testErr.Error()))
  358. })
  359. })
  360. }
  361. })