streams_map_incoming_generic_test.go 7.0 KB


  1. package quic
  2. import (
  3. "context"
  4. "errors"
  5. "github.com/golang/mock/gomock"
  6. "github.com/lucas-clemente/quic-go/internal/protocol"
  7. "github.com/lucas-clemente/quic-go/internal/wire"
  8. . "github.com/onsi/ginkgo"
  9. . "github.com/onsi/gomega"
  10. )
  11. type mockGenericStream struct {
  12. num protocol.StreamNum
  13. closed bool
  14. closeErr error
  15. }
  16. func (s *mockGenericStream) closeForShutdown(err error) {
  17. s.closed = true
  18. s.closeErr = err
  19. }
  20. var _ = Describe("Streams Map (incoming)", func() {
  21. const (
  22. maxNumStreams uint64 = 5
  23. )
  24. var (
  25. m *incomingItemsMap
  26. newItemCounter int
  27. mockSender *MockStreamSender
  28. )
  29. BeforeEach(func() {
  30. newItemCounter = 0
  31. mockSender = NewMockStreamSender(mockCtrl)
  32. m = newIncomingItemsMap(
  33. func(num protocol.StreamNum) item {
  34. newItemCounter++
  35. return &mockGenericStream{num: num}
  36. },
  37. maxNumStreams,
  38. mockSender.queueControlFrame,
  39. )
  40. })
  41. It("opens all streams up to the id on GetOrOpenStream", func() {
  42. _, err := m.GetOrOpenStream(4)
  43. Expect(err).ToNot(HaveOccurred())
  44. Expect(newItemCounter).To(Equal(4))
  45. })
  46. It("starts opening streams at the right position", func() {
  47. // like the test above, but with 2 calls to GetOrOpenStream
  48. _, err := m.GetOrOpenStream(2)
  49. Expect(err).ToNot(HaveOccurred())
  50. Expect(newItemCounter).To(Equal(2))
  51. _, err = m.GetOrOpenStream(5)
  52. Expect(err).ToNot(HaveOccurred())
  53. Expect(newItemCounter).To(Equal(5))
  54. })
  55. It("accepts streams in the right order", func() {
  56. _, err := m.GetOrOpenStream(2) // open streams 1 and 2
  57. Expect(err).ToNot(HaveOccurred())
  58. str, err := m.AcceptStream(context.Background())
  59. Expect(err).ToNot(HaveOccurred())
  60. Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1)))
  61. str, err = m.AcceptStream(context.Background())
  62. Expect(err).ToNot(HaveOccurred())
  63. Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(2)))
  64. })
  65. It("allows opening the maximum stream ID", func() {
  66. str, err := m.GetOrOpenStream(1)
  67. Expect(err).ToNot(HaveOccurred())
  68. Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1)))
  69. })
  70. It("errors when trying to get a stream ID higher than the maximum", func() {
  71. _, err := m.GetOrOpenStream(6)
  72. Expect(err).To(HaveOccurred())
  73. Expect(err.(streamError).TestError()).To(MatchError("peer tried to open stream 6 (current limit: 5)"))
  74. })
  75. It("blocks AcceptStream until a new stream is available", func() {
  76. strChan := make(chan item)
  77. go func() {
  78. defer GinkgoRecover()
  79. str, err := m.AcceptStream(context.Background())
  80. Expect(err).ToNot(HaveOccurred())
  81. strChan <- str
  82. }()
  83. Consistently(strChan).ShouldNot(Receive())
  84. str, err := m.GetOrOpenStream(1)
  85. Expect(err).ToNot(HaveOccurred())
  86. Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1)))
  87. var acceptedStr item
  88. Eventually(strChan).Should(Receive(&acceptedStr))
  89. Expect(acceptedStr.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1)))
  90. })
  91. It("unblocks AcceptStream when the context is canceled", func() {
  92. ctx, cancel := context.WithCancel(context.Background())
  93. done := make(chan struct{})
  94. go func() {
  95. defer GinkgoRecover()
  96. _, err := m.AcceptStream(ctx)
  97. Expect(err).To(MatchError("context canceled"))
  98. close(done)
  99. }()
  100. Consistently(done).ShouldNot(BeClosed())
  101. cancel()
  102. Eventually(done).Should(BeClosed())
  103. })
  104. It("unblocks AcceptStream when it is closed", func() {
  105. testErr := errors.New("test error")
  106. done := make(chan struct{})
  107. go func() {
  108. defer GinkgoRecover()
  109. _, err := m.AcceptStream(context.Background())
  110. Expect(err).To(MatchError(testErr))
  111. close(done)
  112. }()
  113. Consistently(done).ShouldNot(BeClosed())
  114. m.CloseWithError(testErr)
  115. Eventually(done).Should(BeClosed())
  116. })
  117. It("errors AcceptStream immediately if it is closed", func() {
  118. testErr := errors.New("test error")
  119. m.CloseWithError(testErr)
  120. _, err := m.AcceptStream(context.Background())
  121. Expect(err).To(MatchError(testErr))
  122. })
  123. It("closes all streams when CloseWithError is called", func() {
  124. str1, err := m.GetOrOpenStream(1)
  125. Expect(err).ToNot(HaveOccurred())
  126. str2, err := m.GetOrOpenStream(3)
  127. Expect(err).ToNot(HaveOccurred())
  128. testErr := errors.New("test err")
  129. m.CloseWithError(testErr)
  130. Expect(str1.(*mockGenericStream).closed).To(BeTrue())
  131. Expect(str1.(*mockGenericStream).closeErr).To(MatchError(testErr))
  132. Expect(str2.(*mockGenericStream).closed).To(BeTrue())
  133. Expect(str2.(*mockGenericStream).closeErr).To(MatchError(testErr))
  134. })
  135. It("deletes streams", func() {
  136. mockSender.EXPECT().queueControlFrame(gomock.Any())
  137. _, err := m.GetOrOpenStream(1)
  138. Expect(err).ToNot(HaveOccurred())
  139. str, err := m.AcceptStream(context.Background())
  140. Expect(err).ToNot(HaveOccurred())
  141. Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1)))
  142. Expect(m.DeleteStream(1)).To(Succeed())
  143. str, err = m.GetOrOpenStream(1)
  144. Expect(err).ToNot(HaveOccurred())
  145. Expect(str).To(BeNil())
  146. })
  147. It("waits until a stream is accepted before actually deleting it", func() {
  148. _, err := m.GetOrOpenStream(2)
  149. Expect(err).ToNot(HaveOccurred())
  150. Expect(m.DeleteStream(2)).To(Succeed())
  151. str, err := m.AcceptStream(context.Background())
  152. Expect(err).ToNot(HaveOccurred())
  153. Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1)))
  154. // when accepting this stream, it will get deleted, and a MAX_STREAMS frame is queued
  155. mockSender.EXPECT().queueControlFrame(gomock.Any())
  156. str, err = m.AcceptStream(context.Background())
  157. Expect(err).ToNot(HaveOccurred())
  158. Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(2)))
  159. })
  160. It("doesn't return a stream queued for deleting from GetOrOpenStream", func() {
  161. str, err := m.GetOrOpenStream(1)
  162. Expect(err).ToNot(HaveOccurred())
  163. Expect(str).ToNot(BeNil())
  164. Expect(m.DeleteStream(1)).To(Succeed())
  165. str, err = m.GetOrOpenStream(1)
  166. Expect(err).ToNot(HaveOccurred())
  167. Expect(str).To(BeNil())
  168. // when accepting this stream, it will get deleted, and a MAX_STREAMS frame is queued
  169. mockSender.EXPECT().queueControlFrame(gomock.Any())
  170. str, err = m.AcceptStream(context.Background())
  171. Expect(err).ToNot(HaveOccurred())
  172. Expect(str).ToNot(BeNil())
  173. })
  174. It("errors when deleting a non-existing stream", func() {
  175. err := m.DeleteStream(1337)
  176. Expect(err).To(HaveOccurred())
  177. Expect(err.(streamError).TestError()).To(MatchError("Tried to delete unknown stream 1337"))
  178. })
  179. It("sends MAX_STREAMS frames when streams are deleted", func() {
  180. // open a bunch of streams
  181. _, err := m.GetOrOpenStream(5)
  182. Expect(err).ToNot(HaveOccurred())
  183. // accept all streams
  184. for i := 0; i < 5; i++ {
  185. _, err := m.AcceptStream(context.Background())
  186. Expect(err).ToNot(HaveOccurred())
  187. }
  188. mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) {
  189. Expect(f.(*wire.MaxStreamsFrame).MaxStreamNum).To(Equal(protocol.StreamNum(maxNumStreams + 1)))
  190. })
  191. Expect(m.DeleteStream(3)).To(Succeed())
  192. mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) {
  193. Expect(f.(*wire.MaxStreamsFrame).MaxStreamNum).To(Equal(protocol.StreamNum(maxNumStreams + 2)))
  194. })
  195. Expect(m.DeleteStream(4)).To(Succeed())
  196. })
  197. })