streams_map_outgoing_generic_test.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. package quic
  2. import (
  3. "context"
  4. "errors"
  5. "github.com/golang/mock/gomock"
  6. "github.com/lucas-clemente/quic-go/internal/protocol"
  7. "github.com/lucas-clemente/quic-go/internal/wire"
  8. . "github.com/onsi/ginkgo"
  9. . "github.com/onsi/gomega"
  10. )
  11. var _ = Describe("Streams Map (outgoing)", func() {
  12. var (
  13. m *outgoingItemsMap
  14. newItem func(num protocol.StreamNum) item
  15. mockSender *MockStreamSender
  16. )
  17. BeforeEach(func() {
  18. newItem = func(num protocol.StreamNum) item {
  19. return &mockGenericStream{num: num}
  20. }
  21. mockSender = NewMockStreamSender(mockCtrl)
  22. m = newOutgoingItemsMap(newItem, mockSender.queueControlFrame)
  23. })
  24. Context("no stream ID limit", func() {
  25. BeforeEach(func() {
  26. m.SetMaxStream(0xffffffff)
  27. })
  28. It("opens streams", func() {
  29. str, err := m.OpenStream()
  30. Expect(err).ToNot(HaveOccurred())
  31. Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1)))
  32. str, err = m.OpenStream()
  33. Expect(err).ToNot(HaveOccurred())
  34. Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(2)))
  35. })
  36. It("doesn't open streams after it has been closed", func() {
  37. testErr := errors.New("close")
  38. m.CloseWithError(testErr)
  39. _, err := m.OpenStream()
  40. Expect(err).To(MatchError(testErr))
  41. })
  42. It("gets streams", func() {
  43. _, err := m.OpenStream()
  44. Expect(err).ToNot(HaveOccurred())
  45. str, err := m.GetStream(1)
  46. Expect(err).ToNot(HaveOccurred())
  47. Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1)))
  48. })
  49. It("errors when trying to get a stream that has not yet been opened", func() {
  50. _, err := m.GetStream(1)
  51. Expect(err).To(HaveOccurred())
  52. Expect(err.(streamError).TestError()).To(MatchError("peer attempted to open stream 1"))
  53. })
  54. It("deletes streams", func() {
  55. _, err := m.OpenStream()
  56. Expect(err).ToNot(HaveOccurred())
  57. Expect(m.DeleteStream(1)).To(Succeed())
  58. Expect(err).ToNot(HaveOccurred())
  59. str, err := m.GetStream(1)
  60. Expect(err).ToNot(HaveOccurred())
  61. Expect(str).To(BeNil())
  62. })
  63. It("errors when deleting a non-existing stream", func() {
  64. err := m.DeleteStream(1337)
  65. Expect(err).To(HaveOccurred())
  66. Expect(err.(streamError).TestError()).To(MatchError("Tried to delete unknown stream 1337"))
  67. })
  68. It("errors when deleting a stream twice", func() {
  69. _, err := m.OpenStream() // opens firstNewStream
  70. Expect(err).ToNot(HaveOccurred())
  71. Expect(m.DeleteStream(1)).To(Succeed())
  72. err = m.DeleteStream(1)
  73. Expect(err).To(HaveOccurred())
  74. Expect(err.(streamError).TestError()).To(MatchError("Tried to delete unknown stream 1"))
  75. })
  76. It("closes all streams when CloseWithError is called", func() {
  77. str1, err := m.OpenStream()
  78. Expect(err).ToNot(HaveOccurred())
  79. str2, err := m.OpenStream()
  80. Expect(err).ToNot(HaveOccurred())
  81. testErr := errors.New("test err")
  82. m.CloseWithError(testErr)
  83. Expect(str1.(*mockGenericStream).closed).To(BeTrue())
  84. Expect(str1.(*mockGenericStream).closeErr).To(MatchError(testErr))
  85. Expect(str2.(*mockGenericStream).closed).To(BeTrue())
  86. Expect(str2.(*mockGenericStream).closeErr).To(MatchError(testErr))
  87. })
  88. })
  89. Context("with stream ID limits", func() {
  90. It("errors when no stream can be opened immediately", func() {
  91. mockSender.EXPECT().queueControlFrame(gomock.Any())
  92. _, err := m.OpenStream()
  93. expectTooManyStreamsError(err)
  94. })
  95. It("returns immediately when called with a canceled context", func() {
  96. ctx, cancel := context.WithCancel(context.Background())
  97. cancel()
  98. _, err := m.OpenStreamSync(ctx)
  99. Expect(err).To(MatchError("context canceled"))
  100. })
  101. It("blocks until a stream can be opened synchronously", func() {
  102. mockSender.EXPECT().queueControlFrame(gomock.Any())
  103. done := make(chan struct{})
  104. go func() {
  105. defer GinkgoRecover()
  106. str, err := m.OpenStreamSync(context.Background())
  107. Expect(err).ToNot(HaveOccurred())
  108. Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1)))
  109. close(done)
  110. }()
  111. Consistently(done).ShouldNot(BeClosed())
  112. m.SetMaxStream(1)
  113. Eventually(done).Should(BeClosed())
  114. })
  115. It("unblocks when the context is canceled", func() {
  116. mockSender.EXPECT().queueControlFrame(gomock.Any())
  117. ctx, cancel := context.WithCancel(context.Background())
  118. done := make(chan struct{})
  119. go func() {
  120. defer GinkgoRecover()
  121. _, err := m.OpenStreamSync(ctx)
  122. Expect(err).To(MatchError("context canceled"))
  123. close(done)
  124. }()
  125. Consistently(done).ShouldNot(BeClosed())
  126. cancel()
  127. Eventually(done).Should(BeClosed())
  128. // make sure that the next stream openend is stream 1
  129. m.SetMaxStream(1000)
  130. str, err := m.OpenStream()
  131. Expect(err).ToNot(HaveOccurred())
  132. Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1)))
  133. })
  134. It("opens streams in the right order", func() {
  135. mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes()
  136. done1 := make(chan struct{})
  137. go func() {
  138. defer GinkgoRecover()
  139. str, err := m.OpenStreamSync(context.Background())
  140. Expect(err).ToNot(HaveOccurred())
  141. Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1)))
  142. close(done1)
  143. }()
  144. Consistently(done1).ShouldNot(BeClosed())
  145. done2 := make(chan struct{})
  146. go func() {
  147. defer GinkgoRecover()
  148. str, err := m.OpenStreamSync(context.Background())
  149. Expect(err).ToNot(HaveOccurred())
  150. Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(2)))
  151. close(done2)
  152. }()
  153. Consistently(done2).ShouldNot(BeClosed())
  154. m.SetMaxStream(1)
  155. Eventually(done1).Should(BeClosed())
  156. Consistently(done2).ShouldNot(BeClosed())
  157. m.SetMaxStream(2)
  158. Eventually(done2).Should(BeClosed())
  159. })
  160. It("unblocks multiple OpenStreamSync calls at the same time", func() {
  161. mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes()
  162. done := make(chan struct{})
  163. go func() {
  164. defer GinkgoRecover()
  165. _, err := m.OpenStreamSync(context.Background())
  166. Expect(err).ToNot(HaveOccurred())
  167. done <- struct{}{}
  168. }()
  169. go func() {
  170. defer GinkgoRecover()
  171. _, err := m.OpenStreamSync(context.Background())
  172. Expect(err).ToNot(HaveOccurred())
  173. done <- struct{}{}
  174. }()
  175. Consistently(done).ShouldNot(Receive())
  176. go func() {
  177. defer GinkgoRecover()
  178. _, err := m.OpenStreamSync(context.Background())
  179. Expect(err).To(MatchError("test done"))
  180. done <- struct{}{}
  181. }()
  182. Consistently(done).ShouldNot(Receive())
  183. m.SetMaxStream(2)
  184. Eventually(done).Should(Receive())
  185. Eventually(done).Should(Receive())
  186. Consistently(done).ShouldNot(Receive())
  187. m.CloseWithError(errors.New("test done"))
  188. Eventually(done).Should(Receive())
  189. })
  190. It("returns an error for OpenStream while an OpenStreamSync call is blocking", func() {
  191. mockSender.EXPECT().queueControlFrame(gomock.Any()).MaxTimes(2)
  192. openedSync := make(chan struct{})
  193. go func() {
  194. defer GinkgoRecover()
  195. str, err := m.OpenStreamSync(context.Background())
  196. Expect(err).ToNot(HaveOccurred())
  197. Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1)))
  198. close(openedSync)
  199. }()
  200. Consistently(openedSync).ShouldNot(BeClosed())
  201. start := make(chan struct{})
  202. openend := make(chan struct{})
  203. go func() {
  204. defer GinkgoRecover()
  205. var hasStarted bool
  206. for {
  207. str, err := m.OpenStream()
  208. if err == nil {
  209. Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(2)))
  210. close(openend)
  211. return
  212. }
  213. expectTooManyStreamsError(err)
  214. if !hasStarted {
  215. close(start)
  216. hasStarted = true
  217. }
  218. }
  219. }()
  220. Eventually(start).Should(BeClosed())
  221. m.SetMaxStream(1)
  222. Eventually(openedSync).Should(BeClosed())
  223. Consistently(openend).ShouldNot(BeClosed())
  224. m.SetMaxStream(2)
  225. Eventually(openend).Should(BeClosed())
  226. })
  227. It("stops opening synchronously when it is closed", func() {
  228. mockSender.EXPECT().queueControlFrame(gomock.Any())
  229. testErr := errors.New("test error")
  230. done := make(chan struct{})
  231. go func() {
  232. defer GinkgoRecover()
  233. _, err := m.OpenStreamSync(context.Background())
  234. Expect(err).To(MatchError(testErr))
  235. close(done)
  236. }()
  237. Consistently(done).ShouldNot(BeClosed())
  238. m.CloseWithError(testErr)
  239. Eventually(done).Should(BeClosed())
  240. })
  241. It("doesn't reduce the stream limit", func() {
  242. m.SetMaxStream(2)
  243. m.SetMaxStream(1)
  244. _, err := m.OpenStream()
  245. Expect(err).ToNot(HaveOccurred())
  246. str, err := m.OpenStream()
  247. Expect(err).ToNot(HaveOccurred())
  248. Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(2)))
  249. })
  250. It("queues a STREAM_ID_BLOCKED frame if no stream can be opened", func() {
  251. m.SetMaxStream(6)
  252. // open the 6 allowed streams
  253. for i := 0; i < 6; i++ {
  254. _, err := m.OpenStream()
  255. Expect(err).ToNot(HaveOccurred())
  256. }
  257. mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) {
  258. Expect(f.(*wire.StreamsBlockedFrame).StreamLimit).To(BeEquivalentTo(6))
  259. })
  260. _, err := m.OpenStream()
  261. Expect(err).To(HaveOccurred())
  262. Expect(err.Error()).To(Equal(errTooManyOpenStreams.Error()))
  263. })
  264. It("only sends one STREAM_ID_BLOCKED frame for one stream ID", func() {
  265. m.SetMaxStream(1)
  266. mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) {
  267. Expect(f.(*wire.StreamsBlockedFrame).StreamLimit).To(BeEquivalentTo(1))
  268. })
  269. _, err := m.OpenStream()
  270. Expect(err).ToNot(HaveOccurred())
  271. // try to open a stream twice, but expect only one STREAM_ID_BLOCKED to be sent
  272. _, err = m.OpenStream()
  273. expectTooManyStreamsError(err)
  274. _, err = m.OpenStream()
  275. expectTooManyStreamsError(err)
  276. })
  277. })
  278. })