streams_map_outgoing_generic_test.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. package quic
  2. import (
  3. "errors"
  4. "github.com/golang/mock/gomock"
  5. "github.com/lucas-clemente/quic-go/internal/protocol"
  6. "github.com/lucas-clemente/quic-go/internal/qerr"
  7. "github.com/lucas-clemente/quic-go/internal/wire"
  8. . "github.com/onsi/ginkgo"
  9. . "github.com/onsi/gomega"
  10. )
  11. var _ = Describe("Streams Map (outgoing)", func() {
  12. const firstNewStream protocol.StreamID = 3
  13. var (
  14. m *outgoingItemsMap
  15. newItem func(id protocol.StreamID) item
  16. mockSender *MockStreamSender
  17. )
  18. BeforeEach(func() {
  19. newItem = func(id protocol.StreamID) item {
  20. return &mockGenericStream{id: id}
  21. }
  22. mockSender = NewMockStreamSender(mockCtrl)
  23. m = newOutgoingItemsMap(firstNewStream, newItem, mockSender.queueControlFrame)
  24. })
  25. Context("no stream ID limit", func() {
  26. BeforeEach(func() {
  27. m.SetMaxStream(0xffffffff)
  28. })
  29. It("opens streams", func() {
  30. str, err := m.OpenStream()
  31. Expect(err).ToNot(HaveOccurred())
  32. Expect(str.(*mockGenericStream).id).To(Equal(firstNewStream))
  33. str, err = m.OpenStream()
  34. Expect(err).ToNot(HaveOccurred())
  35. Expect(str.(*mockGenericStream).id).To(Equal(firstNewStream + 4))
  36. })
  37. It("doesn't open streams after it has been closed", func() {
  38. testErr := errors.New("close")
  39. m.CloseWithError(testErr)
  40. _, err := m.OpenStream()
  41. Expect(err).To(MatchError(testErr))
  42. })
  43. It("gets streams", func() {
  44. _, err := m.OpenStream()
  45. Expect(err).ToNot(HaveOccurred())
  46. str, err := m.GetStream(firstNewStream)
  47. Expect(err).ToNot(HaveOccurred())
  48. Expect(str.(*mockGenericStream).id).To(Equal(firstNewStream))
  49. })
  50. It("errors when trying to get a stream that has not yet been opened", func() {
  51. _, err := m.GetStream(firstNewStream)
  52. Expect(err).To(MatchError(qerr.Error(qerr.StreamStateError, "peer attempted to open stream 3")))
  53. })
  54. It("deletes streams", func() {
  55. _, err := m.OpenStream() // opens firstNewStream
  56. Expect(err).ToNot(HaveOccurred())
  57. err = m.DeleteStream(firstNewStream)
  58. Expect(err).ToNot(HaveOccurred())
  59. str, err := m.GetStream(firstNewStream)
  60. Expect(err).ToNot(HaveOccurred())
  61. Expect(str).To(BeNil())
  62. })
  63. It("errors when deleting a non-existing stream", func() {
  64. err := m.DeleteStream(1337)
  65. Expect(err).To(MatchError("Tried to delete unknown stream 1337"))
  66. })
  67. It("errors when deleting a stream twice", func() {
  68. _, err := m.OpenStream() // opens firstNewStream
  69. Expect(err).ToNot(HaveOccurred())
  70. err = m.DeleteStream(firstNewStream)
  71. Expect(err).ToNot(HaveOccurred())
  72. err = m.DeleteStream(firstNewStream)
  73. Expect(err).To(MatchError("Tried to delete unknown stream 3"))
  74. })
  75. It("closes all streams when CloseWithError is called", func() {
  76. str1, err := m.OpenStream()
  77. Expect(err).ToNot(HaveOccurred())
  78. str2, err := m.OpenStream()
  79. Expect(err).ToNot(HaveOccurred())
  80. testErr := errors.New("test err")
  81. m.CloseWithError(testErr)
  82. Expect(str1.(*mockGenericStream).closed).To(BeTrue())
  83. Expect(str1.(*mockGenericStream).closeErr).To(MatchError(testErr))
  84. Expect(str2.(*mockGenericStream).closed).To(BeTrue())
  85. Expect(str2.(*mockGenericStream).closeErr).To(MatchError(testErr))
  86. })
  87. })
  88. Context("with stream ID limits", func() {
  89. It("errors when no stream can be opened immediately", func() {
  90. mockSender.EXPECT().queueControlFrame(gomock.Any())
  91. _, err := m.OpenStream()
  92. expectTooManyStreamsError(err)
  93. })
  94. It("blocks until a stream can be opened synchronously", func() {
  95. mockSender.EXPECT().queueControlFrame(gomock.Any())
  96. done := make(chan struct{})
  97. go func() {
  98. defer GinkgoRecover()
  99. str, err := m.OpenStreamSync()
  100. Expect(err).ToNot(HaveOccurred())
  101. Expect(str.(*mockGenericStream).id).To(Equal(firstNewStream))
  102. close(done)
  103. }()
  104. Consistently(done).ShouldNot(BeClosed())
  105. m.SetMaxStream(firstNewStream)
  106. Eventually(done).Should(BeClosed())
  107. })
  108. It("works with stream 0", func() {
  109. m = newOutgoingItemsMap(0, newItem, mockSender.queueControlFrame)
  110. mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) {
  111. Expect(f.(*wire.StreamsBlockedFrame).StreamLimit).To(BeZero())
  112. })
  113. done := make(chan struct{})
  114. go func() {
  115. defer GinkgoRecover()
  116. str, err := m.OpenStreamSync()
  117. Expect(err).ToNot(HaveOccurred())
  118. Expect(str.(*mockGenericStream).id).To(BeZero())
  119. close(done)
  120. }()
  121. Consistently(done).ShouldNot(BeClosed())
  122. m.SetMaxStream(0)
  123. Eventually(done).Should(BeClosed())
  124. })
  125. It("stops opening synchronously when it is closed", func() {
  126. mockSender.EXPECT().queueControlFrame(gomock.Any())
  127. testErr := errors.New("test error")
  128. done := make(chan struct{})
  129. go func() {
  130. defer GinkgoRecover()
  131. _, err := m.OpenStreamSync()
  132. Expect(err).To(MatchError(testErr))
  133. close(done)
  134. }()
  135. Consistently(done).ShouldNot(BeClosed())
  136. m.CloseWithError(testErr)
  137. Eventually(done).Should(BeClosed())
  138. })
  139. It("doesn't reduce the stream limit", func() {
  140. m.SetMaxStream(firstNewStream + 4)
  141. m.SetMaxStream(firstNewStream)
  142. _, err := m.OpenStream()
  143. Expect(err).ToNot(HaveOccurred())
  144. str, err := m.OpenStream()
  145. Expect(err).ToNot(HaveOccurred())
  146. Expect(str.(*mockGenericStream).id).To(Equal(firstNewStream + 4))
  147. })
  148. It("queues a STREAM_ID_BLOCKED frame if no stream can be opened", func() {
  149. m.SetMaxStream(firstNewStream + 5*4)
  150. // open the 6 allowed streams
  151. for i := 0; i < 6; i++ {
  152. _, err := m.OpenStream()
  153. Expect(err).ToNot(HaveOccurred())
  154. }
  155. mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) {
  156. Expect(f.(*wire.StreamsBlockedFrame).StreamLimit).To(BeEquivalentTo(6))
  157. })
  158. _, err := m.OpenStream()
  159. Expect(err).To(HaveOccurred())
  160. Expect(err.Error()).To(Equal(errTooManyOpenStreams.Error()))
  161. })
  162. It("only sends one STREAM_ID_BLOCKED frame for one stream ID", func() {
  163. m.SetMaxStream(firstNewStream)
  164. mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) {
  165. Expect(f.(*wire.StreamsBlockedFrame).StreamLimit).To(BeEquivalentTo(1))
  166. })
  167. _, err := m.OpenStream()
  168. Expect(err).ToNot(HaveOccurred())
  169. // try to open a stream twice, but expect only one STREAM_ID_BLOCKED to be sent
  170. _, err = m.OpenStream()
  171. expectTooManyStreamsError(err)
  172. _, err = m.OpenStream()
  173. expectTooManyStreamsError(err)
  174. })
  175. })
  176. })