streams_map_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. package quic
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "github.com/golang/mock/gomock"
  8. "github.com/lucas-clemente/quic-go/internal/flowcontrol"
  9. "github.com/lucas-clemente/quic-go/internal/handshake"
  10. "github.com/lucas-clemente/quic-go/internal/mocks"
  11. "github.com/lucas-clemente/quic-go/internal/protocol"
  12. "github.com/lucas-clemente/quic-go/internal/qerr"
  13. "github.com/lucas-clemente/quic-go/internal/wire"
  14. . "github.com/onsi/ginkgo"
  15. . "github.com/onsi/gomega"
  16. )
  17. func (e streamError) TestError() error {
  18. nums := make([]interface{}, len(e.nums))
  19. for i, num := range e.nums {
  20. nums[i] = num
  21. }
  22. return fmt.Errorf(e.message, nums...)
  23. }
  24. type streamMapping struct {
  25. firstIncomingBidiStream protocol.StreamID
  26. firstIncomingUniStream protocol.StreamID
  27. firstOutgoingBidiStream protocol.StreamID
  28. firstOutgoingUniStream protocol.StreamID
  29. }
  30. func expectTooManyStreamsError(err error) {
  31. ExpectWithOffset(1, err).To(HaveOccurred())
  32. ExpectWithOffset(1, err.Error()).To(Equal(errTooManyOpenStreams.Error()))
  33. nerr, ok := err.(net.Error)
  34. ExpectWithOffset(1, ok).To(BeTrue())
  35. ExpectWithOffset(1, nerr.Temporary()).To(BeTrue())
  36. ExpectWithOffset(1, nerr.Timeout()).To(BeFalse())
  37. }
  38. var _ = Describe("Streams Map", func() {
  39. newFlowController := func(protocol.StreamID) flowcontrol.StreamFlowController {
  40. return mocks.NewMockStreamFlowController(mockCtrl)
  41. }
  42. serverStreamMapping := streamMapping{
  43. firstIncomingBidiStream: 0,
  44. firstOutgoingBidiStream: 1,
  45. firstIncomingUniStream: 2,
  46. firstOutgoingUniStream: 3,
  47. }
  48. clientStreamMapping := streamMapping{
  49. firstIncomingBidiStream: 1,
  50. firstOutgoingBidiStream: 0,
  51. firstIncomingUniStream: 3,
  52. firstOutgoingUniStream: 2,
  53. }
  54. for _, p := range []protocol.Perspective{protocol.PerspectiveServer, protocol.PerspectiveClient} {
  55. perspective := p
  56. var ids streamMapping
  57. if perspective == protocol.PerspectiveClient {
  58. ids = clientStreamMapping
  59. } else {
  60. ids = serverStreamMapping
  61. }
  62. Context(perspective.String(), func() {
  63. var (
  64. m *streamsMap
  65. mockSender *MockStreamSender
  66. )
  67. const (
  68. MaxBidiStreamNum = 111
  69. MaxUniStreamNum = 222
  70. )
  71. allowUnlimitedStreams := func() {
  72. m.UpdateLimits(&handshake.TransportParameters{
  73. MaxBidiStreamNum: protocol.MaxStreamCount,
  74. MaxUniStreamNum: protocol.MaxStreamCount,
  75. })
  76. }
  77. BeforeEach(func() {
  78. mockSender = NewMockStreamSender(mockCtrl)
  79. m = newStreamsMap(mockSender, newFlowController, MaxBidiStreamNum, MaxUniStreamNum, perspective, protocol.VersionWhatever).(*streamsMap)
  80. })
  81. Context("opening", func() {
  82. It("opens bidirectional streams", func() {
  83. allowUnlimitedStreams()
  84. str, err := m.OpenStream()
  85. Expect(err).ToNot(HaveOccurred())
  86. Expect(str).To(BeAssignableToTypeOf(&stream{}))
  87. Expect(str.StreamID()).To(Equal(ids.firstOutgoingBidiStream))
  88. str, err = m.OpenStream()
  89. Expect(err).ToNot(HaveOccurred())
  90. Expect(str).To(BeAssignableToTypeOf(&stream{}))
  91. Expect(str.StreamID()).To(Equal(ids.firstOutgoingBidiStream + 4))
  92. })
  93. It("opens unidirectional streams", func() {
  94. allowUnlimitedStreams()
  95. str, err := m.OpenUniStream()
  96. Expect(err).ToNot(HaveOccurred())
  97. Expect(str).To(BeAssignableToTypeOf(&sendStream{}))
  98. Expect(str.StreamID()).To(Equal(ids.firstOutgoingUniStream))
  99. str, err = m.OpenUniStream()
  100. Expect(err).ToNot(HaveOccurred())
  101. Expect(str).To(BeAssignableToTypeOf(&sendStream{}))
  102. Expect(str.StreamID()).To(Equal(ids.firstOutgoingUniStream + 4))
  103. })
  104. })
  105. Context("accepting", func() {
  106. It("accepts bidirectional streams", func() {
  107. _, err := m.GetOrOpenReceiveStream(ids.firstIncomingBidiStream)
  108. Expect(err).ToNot(HaveOccurred())
  109. str, err := m.AcceptStream(context.Background())
  110. Expect(err).ToNot(HaveOccurred())
  111. Expect(str).To(BeAssignableToTypeOf(&stream{}))
  112. Expect(str.StreamID()).To(Equal(ids.firstIncomingBidiStream))
  113. })
  114. It("accepts unidirectional streams", func() {
  115. _, err := m.GetOrOpenReceiveStream(ids.firstIncomingUniStream)
  116. Expect(err).ToNot(HaveOccurred())
  117. str, err := m.AcceptUniStream(context.Background())
  118. Expect(err).ToNot(HaveOccurred())
  119. Expect(str).To(BeAssignableToTypeOf(&receiveStream{}))
  120. Expect(str.StreamID()).To(Equal(ids.firstIncomingUniStream))
  121. })
  122. })
  123. Context("deleting", func() {
  124. BeforeEach(func() {
  125. mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes()
  126. allowUnlimitedStreams()
  127. })
  128. It("deletes outgoing bidirectional streams", func() {
  129. id := ids.firstOutgoingBidiStream
  130. str, err := m.OpenStream()
  131. Expect(err).ToNot(HaveOccurred())
  132. Expect(str.StreamID()).To(Equal(id))
  133. Expect(m.DeleteStream(id)).To(Succeed())
  134. dstr, err := m.GetOrOpenSendStream(id)
  135. Expect(err).ToNot(HaveOccurred())
  136. Expect(dstr).To(BeNil())
  137. })
  138. It("deletes incoming bidirectional streams", func() {
  139. id := ids.firstIncomingBidiStream
  140. str, err := m.GetOrOpenReceiveStream(id)
  141. Expect(err).ToNot(HaveOccurred())
  142. Expect(str.StreamID()).To(Equal(id))
  143. Expect(m.DeleteStream(id)).To(Succeed())
  144. dstr, err := m.GetOrOpenReceiveStream(id)
  145. Expect(err).ToNot(HaveOccurred())
  146. Expect(dstr).To(BeNil())
  147. })
  148. It("accepts bidirectional streams after they have been deleted", func() {
  149. id := ids.firstIncomingBidiStream
  150. _, err := m.GetOrOpenReceiveStream(id)
  151. Expect(err).ToNot(HaveOccurred())
  152. Expect(m.DeleteStream(id)).To(Succeed())
  153. str, err := m.AcceptStream(context.Background())
  154. Expect(err).ToNot(HaveOccurred())
  155. Expect(str).ToNot(BeNil())
  156. Expect(str.StreamID()).To(Equal(id))
  157. })
  158. It("deletes outgoing unidirectional streams", func() {
  159. id := ids.firstOutgoingUniStream
  160. str, err := m.OpenUniStream()
  161. Expect(err).ToNot(HaveOccurred())
  162. Expect(str.StreamID()).To(Equal(id))
  163. Expect(m.DeleteStream(id)).To(Succeed())
  164. dstr, err := m.GetOrOpenSendStream(id)
  165. Expect(err).ToNot(HaveOccurred())
  166. Expect(dstr).To(BeNil())
  167. })
  168. It("deletes incoming unidirectional streams", func() {
  169. id := ids.firstIncomingUniStream
  170. str, err := m.GetOrOpenReceiveStream(id)
  171. Expect(err).ToNot(HaveOccurred())
  172. Expect(str.StreamID()).To(Equal(id))
  173. Expect(m.DeleteStream(id)).To(Succeed())
  174. dstr, err := m.GetOrOpenReceiveStream(id)
  175. Expect(err).ToNot(HaveOccurred())
  176. Expect(dstr).To(BeNil())
  177. })
  178. It("accepts unirectional streams after they have been deleted", func() {
  179. id := ids.firstIncomingUniStream
  180. _, err := m.GetOrOpenReceiveStream(id)
  181. Expect(err).ToNot(HaveOccurred())
  182. Expect(m.DeleteStream(id)).To(Succeed())
  183. str, err := m.AcceptUniStream(context.Background())
  184. Expect(err).ToNot(HaveOccurred())
  185. Expect(str).ToNot(BeNil())
  186. Expect(str.StreamID()).To(Equal(id))
  187. })
  188. })
  189. Context("getting streams", func() {
  190. BeforeEach(func() {
  191. allowUnlimitedStreams()
  192. })
  193. Context("send streams", func() {
  194. It("gets an outgoing bidirectional stream", func() {
  195. // need to open the stream ourselves first
  196. // the peer is not allowed to create a stream initiated by us
  197. _, err := m.OpenStream()
  198. Expect(err).ToNot(HaveOccurred())
  199. str, err := m.GetOrOpenSendStream(ids.firstOutgoingBidiStream)
  200. Expect(err).ToNot(HaveOccurred())
  201. Expect(str.StreamID()).To(Equal(ids.firstOutgoingBidiStream))
  202. })
  203. It("errors when the peer tries to open a higher outgoing bidirectional stream", func() {
  204. id := ids.firstOutgoingBidiStream + 5*4
  205. _, err := m.GetOrOpenSendStream(id)
  206. Expect(err).To(MatchError(fmt.Sprintf("STREAM_STATE_ERROR: peer attempted to open stream %d", id)))
  207. })
  208. It("gets an outgoing unidirectional stream", func() {
  209. // need to open the stream ourselves first
  210. // the peer is not allowed to create a stream initiated by us
  211. _, err := m.OpenUniStream()
  212. Expect(err).ToNot(HaveOccurred())
  213. str, err := m.GetOrOpenSendStream(ids.firstOutgoingUniStream)
  214. Expect(err).ToNot(HaveOccurred())
  215. Expect(str.StreamID()).To(Equal(ids.firstOutgoingUniStream))
  216. })
  217. It("errors when the peer tries to open a higher outgoing bidirectional stream", func() {
  218. id := ids.firstOutgoingUniStream + 5*4
  219. _, err := m.GetOrOpenSendStream(id)
  220. Expect(err).To(MatchError(fmt.Sprintf("STREAM_STATE_ERROR: peer attempted to open stream %d", id)))
  221. })
  222. It("gets an incoming bidirectional stream", func() {
  223. id := ids.firstIncomingBidiStream + 4*7
  224. str, err := m.GetOrOpenSendStream(id)
  225. Expect(err).ToNot(HaveOccurred())
  226. Expect(str.StreamID()).To(Equal(id))
  227. })
  228. It("errors when trying to get an incoming unidirectional stream", func() {
  229. id := ids.firstIncomingUniStream
  230. _, err := m.GetOrOpenSendStream(id)
  231. Expect(err).To(MatchError(fmt.Sprintf("STREAM_STATE_ERROR: peer attempted to open send stream %d", id)))
  232. })
  233. })
  234. Context("receive streams", func() {
  235. It("gets an outgoing bidirectional stream", func() {
  236. // need to open the stream ourselves first
  237. // the peer is not allowed to create a stream initiated by us
  238. _, err := m.OpenStream()
  239. Expect(err).ToNot(HaveOccurred())
  240. str, err := m.GetOrOpenReceiveStream(ids.firstOutgoingBidiStream)
  241. Expect(err).ToNot(HaveOccurred())
  242. Expect(str.StreamID()).To(Equal(ids.firstOutgoingBidiStream))
  243. })
  244. It("errors when the peer tries to open a higher outgoing bidirectional stream", func() {
  245. id := ids.firstOutgoingBidiStream + 5*4
  246. _, err := m.GetOrOpenReceiveStream(id)
  247. Expect(err).To(MatchError(fmt.Sprintf("STREAM_STATE_ERROR: peer attempted to open stream %d", id)))
  248. })
  249. It("gets an incoming bidirectional stream", func() {
  250. id := ids.firstIncomingBidiStream + 4*7
  251. str, err := m.GetOrOpenReceiveStream(id)
  252. Expect(err).ToNot(HaveOccurred())
  253. Expect(str.StreamID()).To(Equal(id))
  254. })
  255. It("gets an incoming unidirectional stream", func() {
  256. id := ids.firstIncomingUniStream + 4*10
  257. str, err := m.GetOrOpenReceiveStream(id)
  258. Expect(err).ToNot(HaveOccurred())
  259. Expect(str.StreamID()).To(Equal(id))
  260. })
  261. It("errors when trying to get an outgoing unidirectional stream", func() {
  262. id := ids.firstOutgoingUniStream
  263. _, err := m.GetOrOpenReceiveStream(id)
  264. Expect(err).To(MatchError(fmt.Sprintf("STREAM_STATE_ERROR: peer attempted to open receive stream %d", id)))
  265. })
  266. })
  267. })
  268. Context("updating stream ID limits", func() {
  269. for _, p := range []protocol.Perspective{protocol.PerspectiveClient, protocol.PerspectiveServer} {
  270. pers := p
  271. It(fmt.Sprintf("processes the parameter for outgoing streams, as a %s", pers), func() {
  272. mockSender.EXPECT().queueControlFrame(gomock.Any())
  273. m.perspective = pers
  274. _, err := m.OpenStream()
  275. expectTooManyStreamsError(err)
  276. Expect(m.UpdateLimits(&handshake.TransportParameters{
  277. MaxBidiStreamNum: 5,
  278. MaxUniStreamNum: 8,
  279. })).To(Succeed())
  280. mockSender.EXPECT().queueControlFrame(gomock.Any()).Times(2)
  281. // test we can only 5 bidirectional streams
  282. for i := 0; i < 5; i++ {
  283. str, err := m.OpenStream()
  284. Expect(err).ToNot(HaveOccurred())
  285. Expect(str.StreamID()).To(Equal(ids.firstOutgoingBidiStream + protocol.StreamID(4*i)))
  286. }
  287. _, err = m.OpenStream()
  288. expectTooManyStreamsError(err)
  289. // test we can only 8 unidirectional streams
  290. for i := 0; i < 8; i++ {
  291. str, err := m.OpenUniStream()
  292. Expect(err).ToNot(HaveOccurred())
  293. Expect(str.StreamID()).To(Equal(ids.firstOutgoingUniStream + protocol.StreamID(4*i)))
  294. }
  295. _, err = m.OpenUniStream()
  296. expectTooManyStreamsError(err)
  297. })
  298. }
  299. It("rejects parameters with too large unidirectional stream counts", func() {
  300. Expect(m.UpdateLimits(&handshake.TransportParameters{
  301. MaxUniStreamNum: protocol.MaxStreamCount + 1,
  302. })).To(MatchError(qerr.StreamLimitError))
  303. })
  304. It("rejects parameters with too large unidirectional stream counts", func() {
  305. Expect(m.UpdateLimits(&handshake.TransportParameters{
  306. MaxBidiStreamNum: protocol.MaxStreamCount + 1,
  307. })).To(MatchError(qerr.StreamLimitError))
  308. })
  309. })
  310. Context("handling MAX_STREAMS frames", func() {
  311. BeforeEach(func() {
  312. mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes()
  313. })
  314. It("processes IDs for outgoing bidirectional streams", func() {
  315. _, err := m.OpenStream()
  316. expectTooManyStreamsError(err)
  317. Expect(m.HandleMaxStreamsFrame(&wire.MaxStreamsFrame{
  318. Type: protocol.StreamTypeBidi,
  319. MaxStreamNum: 1,
  320. })).To(Succeed())
  321. str, err := m.OpenStream()
  322. Expect(err).ToNot(HaveOccurred())
  323. Expect(str.StreamID()).To(Equal(ids.firstOutgoingBidiStream))
  324. _, err = m.OpenStream()
  325. expectTooManyStreamsError(err)
  326. })
  327. It("processes IDs for outgoing unidirectional streams", func() {
  328. _, err := m.OpenUniStream()
  329. expectTooManyStreamsError(err)
  330. Expect(m.HandleMaxStreamsFrame(&wire.MaxStreamsFrame{
  331. Type: protocol.StreamTypeUni,
  332. MaxStreamNum: 1,
  333. })).To(Succeed())
  334. str, err := m.OpenUniStream()
  335. Expect(err).ToNot(HaveOccurred())
  336. Expect(str.StreamID()).To(Equal(ids.firstOutgoingUniStream))
  337. _, err = m.OpenUniStream()
  338. expectTooManyStreamsError(err)
  339. })
  340. It("rejects MAX_STREAMS frames with too large values", func() {
  341. Expect(m.HandleMaxStreamsFrame(&wire.MaxStreamsFrame{
  342. Type: protocol.StreamTypeBidi,
  343. MaxStreamNum: protocol.MaxStreamCount + 1,
  344. })).To(MatchError(qerr.StreamLimitError))
  345. })
  346. })
  347. Context("sending MAX_STREAMS frames", func() {
  348. It("sends a MAX_STREAMS frame for bidirectional streams", func() {
  349. _, err := m.GetOrOpenReceiveStream(ids.firstIncomingBidiStream)
  350. Expect(err).ToNot(HaveOccurred())
  351. _, err = m.AcceptStream(context.Background())
  352. Expect(err).ToNot(HaveOccurred())
  353. mockSender.EXPECT().queueControlFrame(&wire.MaxStreamsFrame{
  354. Type: protocol.StreamTypeBidi,
  355. MaxStreamNum: MaxBidiStreamNum + 1,
  356. })
  357. Expect(m.DeleteStream(ids.firstIncomingBidiStream)).To(Succeed())
  358. })
  359. It("sends a MAX_STREAMS frame for unidirectional streams", func() {
  360. _, err := m.GetOrOpenReceiveStream(ids.firstIncomingUniStream)
  361. Expect(err).ToNot(HaveOccurred())
  362. _, err = m.AcceptUniStream(context.Background())
  363. Expect(err).ToNot(HaveOccurred())
  364. mockSender.EXPECT().queueControlFrame(&wire.MaxStreamsFrame{
  365. Type: protocol.StreamTypeUni,
  366. MaxStreamNum: MaxUniStreamNum + 1,
  367. })
  368. Expect(m.DeleteStream(ids.firstIncomingUniStream)).To(Succeed())
  369. })
  370. })
  371. It("closes", func() {
  372. testErr := errors.New("test error")
  373. m.CloseWithError(testErr)
  374. _, err := m.OpenStream()
  375. Expect(err).To(HaveOccurred())
  376. Expect(err.Error()).To(Equal(testErr.Error()))
  377. _, err = m.OpenUniStream()
  378. Expect(err).To(HaveOccurred())
  379. Expect(err.Error()).To(Equal(testErr.Error()))
  380. _, err = m.AcceptStream(context.Background())
  381. Expect(err).To(HaveOccurred())
  382. Expect(err.Error()).To(Equal(testErr.Error()))
  383. _, err = m.AcceptUniStream(context.Background())
  384. Expect(err).To(HaveOccurred())
  385. Expect(err.Error()).To(Equal(testErr.Error()))
  386. })
  387. })
  388. }
  389. })