streams_map_incoming_generic_test.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. package quic
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/golang/mock/gomock"
  6. "github.com/lucas-clemente/quic-go/internal/protocol"
  7. "github.com/lucas-clemente/quic-go/internal/wire"
  8. . "github.com/onsi/ginkgo"
  9. . "github.com/onsi/gomega"
  10. )
  11. type mockGenericStream struct {
  12. id protocol.StreamID
  13. closed bool
  14. closeErr error
  15. }
  16. func (s *mockGenericStream) closeForShutdown(err error) {
  17. s.closed = true
  18. s.closeErr = err
  19. }
  20. var _ = Describe("Streams Map (incoming)", func() {
  21. const (
  22. firstNewStream protocol.StreamID = 2
  23. maxNumStreams uint64 = 5
  24. initialMaxStream protocol.StreamID = firstNewStream + 4*protocol.StreamID(maxNumStreams-1)
  25. )
  26. var (
  27. m *incomingItemsMap
  28. newItem func(id protocol.StreamID) item
  29. newItemCounter int
  30. mockSender *MockStreamSender
  31. )
  32. BeforeEach(func() {
  33. newItemCounter = 0
  34. newItem = func(id protocol.StreamID) item {
  35. newItemCounter++
  36. return &mockGenericStream{id: id}
  37. }
  38. mockSender = NewMockStreamSender(mockCtrl)
  39. m = newIncomingItemsMap(firstNewStream, initialMaxStream, maxNumStreams, mockSender.queueControlFrame, newItem)
  40. })
  41. It("opens all streams up to the id on GetOrOpenStream", func() {
  42. _, err := m.GetOrOpenStream(firstNewStream + 4*4)
  43. Expect(err).ToNot(HaveOccurred())
  44. Expect(newItemCounter).To(Equal(5))
  45. })
  46. It("starts opening streams at the right position", func() {
  47. // like the test above, but with 2 calls to GetOrOpenStream
  48. _, err := m.GetOrOpenStream(firstNewStream + 4)
  49. Expect(err).ToNot(HaveOccurred())
  50. Expect(newItemCounter).To(Equal(2))
  51. _, err = m.GetOrOpenStream(firstNewStream + 4*4)
  52. Expect(err).ToNot(HaveOccurred())
  53. Expect(newItemCounter).To(Equal(5))
  54. })
  55. It("accepts streams in the right order", func() {
  56. _, err := m.GetOrOpenStream(firstNewStream + 4) // open stream 20 and 24
  57. Expect(err).ToNot(HaveOccurred())
  58. str, err := m.AcceptStream()
  59. Expect(err).ToNot(HaveOccurred())
  60. Expect(str.(*mockGenericStream).id).To(Equal(firstNewStream))
  61. str, err = m.AcceptStream()
  62. Expect(err).ToNot(HaveOccurred())
  63. Expect(str.(*mockGenericStream).id).To(Equal(firstNewStream + 4))
  64. })
  65. It("allows opening the maximum stream ID", func() {
  66. str, err := m.GetOrOpenStream(initialMaxStream)
  67. Expect(err).ToNot(HaveOccurred())
  68. Expect(str.(*mockGenericStream).id).To(Equal(initialMaxStream))
  69. })
  70. It("errors when trying to get a stream ID higher than the maximum", func() {
  71. _, err := m.GetOrOpenStream(initialMaxStream + 4)
  72. Expect(err).To(MatchError(fmt.Errorf("peer tried to open stream %d (current limit: %d)", initialMaxStream+4, initialMaxStream)))
  73. })
  74. It("blocks AcceptStream until a new stream is available", func() {
  75. strChan := make(chan item)
  76. go func() {
  77. defer GinkgoRecover()
  78. str, err := m.AcceptStream()
  79. Expect(err).ToNot(HaveOccurred())
  80. strChan <- str
  81. }()
  82. Consistently(strChan).ShouldNot(Receive())
  83. str, err := m.GetOrOpenStream(firstNewStream)
  84. Expect(err).ToNot(HaveOccurred())
  85. Expect(str.(*mockGenericStream).id).To(Equal(firstNewStream))
  86. var acceptedStr item
  87. Eventually(strChan).Should(Receive(&acceptedStr))
  88. Expect(acceptedStr.(*mockGenericStream).id).To(Equal(firstNewStream))
  89. })
  90. It("works with stream 0", func() {
  91. m = newIncomingItemsMap(0, 1000, 1000, mockSender.queueControlFrame, newItem)
  92. strChan := make(chan item)
  93. go func() {
  94. defer GinkgoRecover()
  95. str, err := m.AcceptStream()
  96. Expect(err).ToNot(HaveOccurred())
  97. strChan <- str
  98. }()
  99. Consistently(strChan).ShouldNot(Receive())
  100. str, err := m.GetOrOpenStream(0)
  101. Expect(err).ToNot(HaveOccurred())
  102. Expect(str.(*mockGenericStream).id).To(BeZero())
  103. var acceptedStr item
  104. Eventually(strChan).Should(Receive(&acceptedStr))
  105. Expect(acceptedStr.(*mockGenericStream).id).To(BeZero())
  106. })
  107. It("unblocks AcceptStream when it is closed", func() {
  108. testErr := errors.New("test error")
  109. done := make(chan struct{})
  110. go func() {
  111. defer GinkgoRecover()
  112. _, err := m.AcceptStream()
  113. Expect(err).To(MatchError(testErr))
  114. close(done)
  115. }()
  116. Consistently(done).ShouldNot(BeClosed())
  117. m.CloseWithError(testErr)
  118. Eventually(done).Should(BeClosed())
  119. })
  120. It("errors AcceptStream immediately if it is closed", func() {
  121. testErr := errors.New("test error")
  122. m.CloseWithError(testErr)
  123. _, err := m.AcceptStream()
  124. Expect(err).To(MatchError(testErr))
  125. })
  126. It("closes all streams when CloseWithError is called", func() {
  127. str1, err := m.GetOrOpenStream(firstNewStream)
  128. Expect(err).ToNot(HaveOccurred())
  129. str2, err := m.GetOrOpenStream(firstNewStream + 8)
  130. Expect(err).ToNot(HaveOccurred())
  131. testErr := errors.New("test err")
  132. m.CloseWithError(testErr)
  133. Expect(str1.(*mockGenericStream).closed).To(BeTrue())
  134. Expect(str1.(*mockGenericStream).closeErr).To(MatchError(testErr))
  135. Expect(str2.(*mockGenericStream).closed).To(BeTrue())
  136. Expect(str2.(*mockGenericStream).closeErr).To(MatchError(testErr))
  137. })
  138. It("deletes streams", func() {
  139. mockSender.EXPECT().queueControlFrame(gomock.Any())
  140. _, err := m.GetOrOpenStream(firstNewStream)
  141. Expect(err).ToNot(HaveOccurred())
  142. str, err := m.AcceptStream()
  143. Expect(err).ToNot(HaveOccurred())
  144. Expect(str.(*mockGenericStream).id).To(Equal(firstNewStream))
  145. Expect(m.DeleteStream(firstNewStream)).To(Succeed())
  146. str, err = m.GetOrOpenStream(firstNewStream)
  147. Expect(err).ToNot(HaveOccurred())
  148. Expect(str).To(BeNil())
  149. })
  150. It("waits until a stream is accepted before actually deleting it", func() {
  151. _, err := m.GetOrOpenStream(firstNewStream + 4)
  152. Expect(err).ToNot(HaveOccurred())
  153. Expect(m.DeleteStream(firstNewStream + 4)).To(Succeed())
  154. str, err := m.AcceptStream()
  155. Expect(err).ToNot(HaveOccurred())
  156. Expect(str.(*mockGenericStream).id).To(Equal(firstNewStream))
  157. // when accepting this stream, it will get deleted, and a MAX_STREAMS frame is queued
  158. mockSender.EXPECT().queueControlFrame(gomock.Any())
  159. str, err = m.AcceptStream()
  160. Expect(err).ToNot(HaveOccurred())
  161. Expect(str.(*mockGenericStream).id).To(Equal(firstNewStream + 4))
  162. })
  163. It("doesn't return a stream queued for deleting from GetOrOpenStream", func() {
  164. str, err := m.GetOrOpenStream(firstNewStream)
  165. Expect(err).ToNot(HaveOccurred())
  166. Expect(str).ToNot(BeNil())
  167. Expect(m.DeleteStream(firstNewStream)).To(Succeed())
  168. str, err = m.GetOrOpenStream(firstNewStream)
  169. Expect(err).ToNot(HaveOccurred())
  170. Expect(str).To(BeNil())
  171. // when accepting this stream, it will get deleted, and a MAX_STREAMS frame is queued
  172. mockSender.EXPECT().queueControlFrame(gomock.Any())
  173. str, err = m.AcceptStream()
  174. Expect(err).ToNot(HaveOccurred())
  175. Expect(str).ToNot(BeNil())
  176. })
  177. It("errors when deleting a non-existing stream", func() {
  178. err := m.DeleteStream(1337)
  179. Expect(err).To(MatchError("Tried to delete unknown stream 1337"))
  180. })
  181. It("sends MAX_STREAMS frames when streams are deleted", func() {
  182. // open a bunch of streams
  183. _, err := m.GetOrOpenStream(firstNewStream + 4*4)
  184. Expect(err).ToNot(HaveOccurred())
  185. // accept all streams
  186. for i := 0; i < 5; i++ {
  187. _, err := m.AcceptStream()
  188. Expect(err).ToNot(HaveOccurred())
  189. }
  190. mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) {
  191. Expect(f.(*wire.MaxStreamsFrame).MaxStreams).To(Equal(maxNumStreams + 1))
  192. })
  193. Expect(m.DeleteStream(firstNewStream + 2*4)).To(Succeed())
  194. mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) {
  195. Expect(f.(*wire.MaxStreamsFrame).MaxStreams).To(Equal(maxNumStreams + 2))
  196. })
  197. Expect(m.DeleteStream(firstNewStream + 3*4)).To(Succeed())
  198. })
  199. })