streams_map_test.go 16 KB


  1. package quic
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "github.com/golang/mock/gomock"
  8. "github.com/lucas-clemente/quic-go/internal/flowcontrol"
  9. "github.com/lucas-clemente/quic-go/internal/handshake"
  10. "github.com/lucas-clemente/quic-go/internal/mocks"
  11. "github.com/lucas-clemente/quic-go/internal/protocol"
  12. "github.com/lucas-clemente/quic-go/internal/qerr"
  13. "github.com/lucas-clemente/quic-go/internal/wire"
  14. . "github.com/onsi/ginkgo"
  15. . "github.com/onsi/gomega"
  16. )
  17. func (e streamError) TestError() error {
  18. nums := make([]interface{}, len(e.nums))
  19. for i, num := range e.nums {
  20. nums[i] = num
  21. }
  22. return fmt.Errorf(e.message, nums...)
  23. }
  24. type streamMapping struct {
  25. firstIncomingBidiStream protocol.StreamID
  26. firstIncomingUniStream protocol.StreamID
  27. firstOutgoingBidiStream protocol.StreamID
  28. firstOutgoingUniStream protocol.StreamID
  29. }
  30. func expectTooManyStreamsError(err error) {
  31. ExpectWithOffset(1, err).To(HaveOccurred())
  32. ExpectWithOffset(1, err.Error()).To(Equal(errTooManyOpenStreams.Error()))
  33. nerr, ok := err.(net.Error)
  34. ExpectWithOffset(1, ok).To(BeTrue())
  35. ExpectWithOffset(1, nerr.Temporary()).To(BeTrue())
  36. ExpectWithOffset(1, nerr.Timeout()).To(BeFalse())
  37. }
  38. var _ = Describe("Streams Map", func() {
  39. newFlowController := func(protocol.StreamID) flowcontrol.StreamFlowController {
  40. return mocks.NewMockStreamFlowController(mockCtrl)
  41. }
  42. serverStreamMapping := streamMapping{
  43. firstIncomingBidiStream: 0,
  44. firstOutgoingBidiStream: 1,
  45. firstIncomingUniStream: 2,
  46. firstOutgoingUniStream: 3,
  47. }
  48. clientStreamMapping := streamMapping{
  49. firstIncomingBidiStream: 1,
  50. firstOutgoingBidiStream: 0,
  51. firstIncomingUniStream: 3,
  52. firstOutgoingUniStream: 2,
  53. }
  54. for _, p := range []protocol.Perspective{protocol.PerspectiveServer, protocol.PerspectiveClient} {
  55. perspective := p
  56. var ids streamMapping
  57. if perspective == protocol.PerspectiveClient {
  58. ids = clientStreamMapping
  59. } else {
  60. ids = serverStreamMapping
  61. }
  62. Context(perspective.String(), func() {
  63. var (
  64. m *streamsMap
  65. mockSender *MockStreamSender
  66. )
  67. const (
  68. MaxBidiStreamNum = 111
  69. MaxUniStreamNum = 222
  70. )
  71. allowUnlimitedStreams := func() {
  72. m.UpdateLimits(&handshake.TransportParameters{
  73. MaxBidiStreamNum: protocol.MaxStreamCount,
  74. MaxUniStreamNum: protocol.MaxStreamCount,
  75. })
  76. }
  77. BeforeEach(func() {
  78. mockSender = NewMockStreamSender(mockCtrl)
  79. m = newStreamsMap(mockSender, newFlowController, MaxBidiStreamNum, MaxUniStreamNum, perspective, protocol.VersionWhatever).(*streamsMap)
  80. })
  81. Context("opening", func() {
  82. It("opens bidirectional streams", func() {
  83. allowUnlimitedStreams()
  84. str, err := m.OpenStream()
  85. Expect(err).ToNot(HaveOccurred())
  86. Expect(str).To(BeAssignableToTypeOf(&stream{}))
  87. Expect(str.StreamID()).To(Equal(ids.firstOutgoingBidiStream))
  88. str, err = m.OpenStream()
  89. Expect(err).ToNot(HaveOccurred())
  90. Expect(str).To(BeAssignableToTypeOf(&stream{}))
  91. Expect(str.StreamID()).To(Equal(ids.firstOutgoingBidiStream + 4))
  92. })
  93. It("opens unidirectional streams", func() {
  94. allowUnlimitedStreams()
  95. str, err := m.OpenUniStream()
  96. Expect(err).ToNot(HaveOccurred())
  97. Expect(str).To(BeAssignableToTypeOf(&sendStream{}))
  98. Expect(str.StreamID()).To(Equal(ids.firstOutgoingUniStream))
  99. str, err = m.OpenUniStream()
  100. Expect(err).ToNot(HaveOccurred())
  101. Expect(str).To(BeAssignableToTypeOf(&sendStream{}))
  102. Expect(str.StreamID()).To(Equal(ids.firstOutgoingUniStream + 4))
  103. })
  104. })
  105. Context("accepting", func() {
  106. It("accepts bidirectional streams", func() {
  107. _, err := m.GetOrOpenReceiveStream(ids.firstIncomingBidiStream)
  108. Expect(err).ToNot(HaveOccurred())
  109. str, err := m.AcceptStream(context.Background())
  110. Expect(err).ToNot(HaveOccurred())
  111. Expect(str).To(BeAssignableToTypeOf(&stream{}))
  112. Expect(str.StreamID()).To(Equal(ids.firstIncomingBidiStream))
  113. })
  114. It("accepts unidirectional streams", func() {
  115. _, err := m.GetOrOpenReceiveStream(ids.firstIncomingUniStream)
  116. Expect(err).ToNot(HaveOccurred())
  117. str, err := m.AcceptUniStream(context.Background())
  118. Expect(err).ToNot(HaveOccurred())
  119. Expect(str).To(BeAssignableToTypeOf(&receiveStream{}))
  120. Expect(str.StreamID()).To(Equal(ids.firstIncomingUniStream))
  121. })
  122. })
  123. Context("deleting", func() {
  124. BeforeEach(func() {
  125. mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes()
  126. allowUnlimitedStreams()
  127. })
  128. It("deletes outgoing bidirectional streams", func() {
  129. id := ids.firstOutgoingBidiStream
  130. str, err := m.OpenStream()
  131. Expect(err).ToNot(HaveOccurred())
  132. Expect(str.StreamID()).To(Equal(id))
  133. Expect(m.DeleteStream(id)).To(Succeed())
  134. dstr, err := m.GetOrOpenSendStream(id)
  135. Expect(err).ToNot(HaveOccurred())
  136. Expect(dstr).To(BeNil())
  137. })
  138. It("deletes incoming bidirectional streams", func() {
  139. id := ids.firstIncomingBidiStream
  140. str, err := m.GetOrOpenReceiveStream(id)
  141. Expect(err).ToNot(HaveOccurred())
  142. Expect(str.StreamID()).To(Equal(id))
  143. Expect(m.DeleteStream(id)).To(Succeed())
  144. dstr, err := m.GetOrOpenReceiveStream(id)
  145. Expect(err).ToNot(HaveOccurred())
  146. Expect(dstr).To(BeNil())
  147. })
  148. It("accepts bidirectional streams after they have been deleted", func() {
  149. id := ids.firstIncomingBidiStream
  150. _, err := m.GetOrOpenReceiveStream(id)
  151. Expect(err).ToNot(HaveOccurred())
  152. Expect(m.DeleteStream(id)).To(Succeed())
  153. str, err := m.AcceptStream(context.Background())
  154. Expect(err).ToNot(HaveOccurred())
  155. Expect(str).ToNot(BeNil())
  156. Expect(str.StreamID()).To(Equal(id))
  157. })
  158. It("deletes outgoing unidirectional streams", func() {
  159. id := ids.firstOutgoingUniStream
  160. str, err := m.OpenUniStream()
  161. Expect(err).ToNot(HaveOccurred())
  162. Expect(str.StreamID()).To(Equal(id))
  163. Expect(m.DeleteStream(id)).To(Succeed())
  164. dstr, err := m.GetOrOpenSendStream(id)
  165. Expect(err).ToNot(HaveOccurred())
  166. Expect(dstr).To(BeNil())
  167. })
  168. It("deletes incoming unidirectional streams", func() {
  169. id := ids.firstIncomingUniStream
  170. str, err := m.GetOrOpenReceiveStream(id)
  171. Expect(err).ToNot(HaveOccurred())
  172. Expect(str.StreamID()).To(Equal(id))
  173. Expect(m.DeleteStream(id)).To(Succeed())
  174. dstr, err := m.GetOrOpenReceiveStream(id)
  175. Expect(err).ToNot(HaveOccurred())
  176. Expect(dstr).To(BeNil())
  177. })
  178. It("accepts unirectional streams after they have been deleted", func() {
  179. id := ids.firstIncomingUniStream
  180. _, err := m.GetOrOpenReceiveStream(id)
  181. Expect(err).ToNot(HaveOccurred())
  182. Expect(m.DeleteStream(id)).To(Succeed())
  183. str, err := m.AcceptUniStream(context.Background())
  184. Expect(err).ToNot(HaveOccurred())
  185. Expect(str).ToNot(BeNil())
  186. Expect(str.StreamID()).To(Equal(id))
  187. })
  188. It("errors when deleting unknown incoming unidirectional streams", func() {
  189. id := ids.firstIncomingUniStream + 4
  190. Expect(m.DeleteStream(id)).To(MatchError(fmt.Sprintf("Tried to delete unknown incoming stream %d", id)))
  191. })
  192. It("errors when deleting unknown outgoing unidirectional streams", func() {
  193. id := ids.firstOutgoingUniStream + 4
  194. Expect(m.DeleteStream(id)).To(MatchError(fmt.Sprintf("Tried to delete unknown outgoing stream %d", id)))
  195. })
  196. It("errors when deleting unknown incoming bidirectional streams", func() {
  197. id := ids.firstIncomingBidiStream + 4
  198. Expect(m.DeleteStream(id)).To(MatchError(fmt.Sprintf("Tried to delete unknown incoming stream %d", id)))
  199. })
  200. It("errors when deleting unknown outgoing bidirectional streams", func() {
  201. id := ids.firstOutgoingBidiStream + 4
  202. Expect(m.DeleteStream(id)).To(MatchError(fmt.Sprintf("Tried to delete unknown outgoing stream %d", id)))
  203. })
  204. })
  205. Context("getting streams", func() {
  206. BeforeEach(func() {
  207. allowUnlimitedStreams()
  208. })
  209. Context("send streams", func() {
  210. It("gets an outgoing bidirectional stream", func() {
  211. // need to open the stream ourselves first
  212. // the peer is not allowed to create a stream initiated by us
  213. _, err := m.OpenStream()
  214. Expect(err).ToNot(HaveOccurred())
  215. str, err := m.GetOrOpenSendStream(ids.firstOutgoingBidiStream)
  216. Expect(err).ToNot(HaveOccurred())
  217. Expect(str.StreamID()).To(Equal(ids.firstOutgoingBidiStream))
  218. })
  219. It("errors when the peer tries to open a higher outgoing bidirectional stream", func() {
  220. id := ids.firstOutgoingBidiStream + 5*4
  221. _, err := m.GetOrOpenSendStream(id)
  222. Expect(err).To(MatchError(fmt.Sprintf("STREAM_STATE_ERROR: peer attempted to open stream %d", id)))
  223. })
  224. It("gets an outgoing unidirectional stream", func() {
  225. // need to open the stream ourselves first
  226. // the peer is not allowed to create a stream initiated by us
  227. _, err := m.OpenUniStream()
  228. Expect(err).ToNot(HaveOccurred())
  229. str, err := m.GetOrOpenSendStream(ids.firstOutgoingUniStream)
  230. Expect(err).ToNot(HaveOccurred())
  231. Expect(str.StreamID()).To(Equal(ids.firstOutgoingUniStream))
  232. })
  233. It("errors when the peer tries to open a higher outgoing bidirectional stream", func() {
  234. id := ids.firstOutgoingUniStream + 5*4
  235. _, err := m.GetOrOpenSendStream(id)
  236. Expect(err).To(MatchError(fmt.Sprintf("STREAM_STATE_ERROR: peer attempted to open stream %d", id)))
  237. })
  238. It("gets an incoming bidirectional stream", func() {
  239. id := ids.firstIncomingBidiStream + 4*7
  240. str, err := m.GetOrOpenSendStream(id)
  241. Expect(err).ToNot(HaveOccurred())
  242. Expect(str.StreamID()).To(Equal(id))
  243. })
  244. It("errors when trying to get an incoming unidirectional stream", func() {
  245. id := ids.firstIncomingUniStream
  246. _, err := m.GetOrOpenSendStream(id)
  247. Expect(err).To(MatchError(fmt.Sprintf("STREAM_STATE_ERROR: peer attempted to open send stream %d", id)))
  248. })
  249. })
  250. Context("receive streams", func() {
  251. It("gets an outgoing bidirectional stream", func() {
  252. // need to open the stream ourselves first
  253. // the peer is not allowed to create a stream initiated by us
  254. _, err := m.OpenStream()
  255. Expect(err).ToNot(HaveOccurred())
  256. str, err := m.GetOrOpenReceiveStream(ids.firstOutgoingBidiStream)
  257. Expect(err).ToNot(HaveOccurred())
  258. Expect(str.StreamID()).To(Equal(ids.firstOutgoingBidiStream))
  259. })
  260. It("errors when the peer tries to open a higher outgoing bidirectional stream", func() {
  261. id := ids.firstOutgoingBidiStream + 5*4
  262. _, err := m.GetOrOpenReceiveStream(id)
  263. Expect(err).To(MatchError(fmt.Sprintf("STREAM_STATE_ERROR: peer attempted to open stream %d", id)))
  264. })
  265. It("gets an incoming bidirectional stream", func() {
  266. id := ids.firstIncomingBidiStream + 4*7
  267. str, err := m.GetOrOpenReceiveStream(id)
  268. Expect(err).ToNot(HaveOccurred())
  269. Expect(str.StreamID()).To(Equal(id))
  270. })
  271. It("gets an incoming unidirectional stream", func() {
  272. id := ids.firstIncomingUniStream + 4*10
  273. str, err := m.GetOrOpenReceiveStream(id)
  274. Expect(err).ToNot(HaveOccurred())
  275. Expect(str.StreamID()).To(Equal(id))
  276. })
  277. It("errors when trying to get an outgoing unidirectional stream", func() {
  278. id := ids.firstOutgoingUniStream
  279. _, err := m.GetOrOpenReceiveStream(id)
  280. Expect(err).To(MatchError(fmt.Sprintf("STREAM_STATE_ERROR: peer attempted to open receive stream %d", id)))
  281. })
  282. })
  283. })
  284. Context("updating stream ID limits", func() {
  285. for _, p := range []protocol.Perspective{protocol.PerspectiveClient, protocol.PerspectiveServer} {
  286. pers := p
  287. It(fmt.Sprintf("processes the parameter for outgoing streams, as a %s", pers), func() {
  288. mockSender.EXPECT().queueControlFrame(gomock.Any())
  289. m.perspective = pers
  290. _, err := m.OpenStream()
  291. expectTooManyStreamsError(err)
  292. Expect(m.UpdateLimits(&handshake.TransportParameters{
  293. MaxBidiStreamNum: 5,
  294. MaxUniStreamNum: 8,
  295. })).To(Succeed())
  296. mockSender.EXPECT().queueControlFrame(gomock.Any()).Times(2)
  297. // test we can only 5 bidirectional streams
  298. for i := 0; i < 5; i++ {
  299. str, err := m.OpenStream()
  300. Expect(err).ToNot(HaveOccurred())
  301. Expect(str.StreamID()).To(Equal(ids.firstOutgoingBidiStream + protocol.StreamID(4*i)))
  302. }
  303. _, err = m.OpenStream()
  304. expectTooManyStreamsError(err)
  305. // test we can only 8 unidirectional streams
  306. for i := 0; i < 8; i++ {
  307. str, err := m.OpenUniStream()
  308. Expect(err).ToNot(HaveOccurred())
  309. Expect(str.StreamID()).To(Equal(ids.firstOutgoingUniStream + protocol.StreamID(4*i)))
  310. }
  311. _, err = m.OpenUniStream()
  312. expectTooManyStreamsError(err)
  313. })
  314. }
  315. It("rejects parameters with too large unidirectional stream counts", func() {
  316. Expect(m.UpdateLimits(&handshake.TransportParameters{
  317. MaxUniStreamNum: protocol.MaxStreamCount + 1,
  318. })).To(MatchError(qerr.StreamLimitError))
  319. })
  320. It("rejects parameters with too large unidirectional stream counts", func() {
  321. Expect(m.UpdateLimits(&handshake.TransportParameters{
  322. MaxBidiStreamNum: protocol.MaxStreamCount + 1,
  323. })).To(MatchError(qerr.StreamLimitError))
  324. })
  325. })
  326. Context("handling MAX_STREAMS frames", func() {
  327. BeforeEach(func() {
  328. mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes()
  329. })
  330. It("processes IDs for outgoing bidirectional streams", func() {
  331. _, err := m.OpenStream()
  332. expectTooManyStreamsError(err)
  333. Expect(m.HandleMaxStreamsFrame(&wire.MaxStreamsFrame{
  334. Type: protocol.StreamTypeBidi,
  335. MaxStreamNum: 1,
  336. })).To(Succeed())
  337. str, err := m.OpenStream()
  338. Expect(err).ToNot(HaveOccurred())
  339. Expect(str.StreamID()).To(Equal(ids.firstOutgoingBidiStream))
  340. _, err = m.OpenStream()
  341. expectTooManyStreamsError(err)
  342. })
  343. It("processes IDs for outgoing unidirectional streams", func() {
  344. _, err := m.OpenUniStream()
  345. expectTooManyStreamsError(err)
  346. Expect(m.HandleMaxStreamsFrame(&wire.MaxStreamsFrame{
  347. Type: protocol.StreamTypeUni,
  348. MaxStreamNum: 1,
  349. })).To(Succeed())
  350. str, err := m.OpenUniStream()
  351. Expect(err).ToNot(HaveOccurred())
  352. Expect(str.StreamID()).To(Equal(ids.firstOutgoingUniStream))
  353. _, err = m.OpenUniStream()
  354. expectTooManyStreamsError(err)
  355. })
  356. })
  357. Context("sending MAX_STREAMS frames", func() {
  358. It("sends a MAX_STREAMS frame for bidirectional streams", func() {
  359. _, err := m.GetOrOpenReceiveStream(ids.firstIncomingBidiStream)
  360. Expect(err).ToNot(HaveOccurred())
  361. _, err = m.AcceptStream(context.Background())
  362. Expect(err).ToNot(HaveOccurred())
  363. mockSender.EXPECT().queueControlFrame(&wire.MaxStreamsFrame{
  364. Type: protocol.StreamTypeBidi,
  365. MaxStreamNum: MaxBidiStreamNum + 1,
  366. })
  367. Expect(m.DeleteStream(ids.firstIncomingBidiStream)).To(Succeed())
  368. })
  369. It("sends a MAX_STREAMS frame for unidirectional streams", func() {
  370. _, err := m.GetOrOpenReceiveStream(ids.firstIncomingUniStream)
  371. Expect(err).ToNot(HaveOccurred())
  372. _, err = m.AcceptUniStream(context.Background())
  373. Expect(err).ToNot(HaveOccurred())
  374. mockSender.EXPECT().queueControlFrame(&wire.MaxStreamsFrame{
  375. Type: protocol.StreamTypeUni,
  376. MaxStreamNum: MaxUniStreamNum + 1,
  377. })
  378. Expect(m.DeleteStream(ids.firstIncomingUniStream)).To(Succeed())
  379. })
  380. })
  381. It("closes", func() {
  382. testErr := errors.New("test error")
  383. m.CloseWithError(testErr)
  384. _, err := m.OpenStream()
  385. Expect(err).To(HaveOccurred())
  386. Expect(err.Error()).To(Equal(testErr.Error()))
  387. _, err = m.OpenUniStream()
  388. Expect(err).To(HaveOccurred())
  389. Expect(err.Error()).To(Equal(testErr.Error()))
  390. _, err = m.AcceptStream(context.Background())
  391. Expect(err).To(HaveOccurred())
  392. Expect(err.Error()).To(Equal(testErr.Error()))
  393. _, err = m.AcceptUniStream(context.Background())
  394. Expect(err).To(HaveOccurred())
  395. Expect(err.Error()).To(Equal(testErr.Error()))
  396. })
  397. })
  398. }
  399. })