streams_map_outgoing_generic_test.go 6.4 KB


  1. package quic
  2. import (
  3. "errors"
  4. "net"
  5. "github.com/golang/mock/gomock"
  6. "github.com/lucas-clemente/quic-go/internal/protocol"
  7. "github.com/lucas-clemente/quic-go/internal/qerr"
  8. "github.com/lucas-clemente/quic-go/internal/wire"
  9. . "github.com/onsi/ginkgo"
  10. . "github.com/onsi/gomega"
  11. )
  12. var _ = Describe("Streams Map (outgoing)", func() {
  13. const firstNewStream protocol.StreamID = 3
  14. var (
  15. m *outgoingItemsMap
  16. newItem func(id protocol.StreamID) item
  17. mockSender *MockStreamSender
  18. )
  19. BeforeEach(func() {
  20. newItem = func(id protocol.StreamID) item {
  21. return &mockGenericStream{id: id}
  22. }
  23. mockSender = NewMockStreamSender(mockCtrl)
  24. m = newOutgoingItemsMap(firstNewStream, newItem, mockSender.queueControlFrame)
  25. })
  26. Context("no stream ID limit", func() {
  27. BeforeEach(func() {
  28. m.SetMaxStream(0xffffffff)
  29. })
  30. It("opens streams", func() {
  31. str, err := m.OpenStream()
  32. Expect(err).ToNot(HaveOccurred())
  33. Expect(str.(*mockGenericStream).id).To(Equal(firstNewStream))
  34. str, err = m.OpenStream()
  35. Expect(err).ToNot(HaveOccurred())
  36. Expect(str.(*mockGenericStream).id).To(Equal(firstNewStream + 4))
  37. })
  38. It("doesn't open streams after it has been closed", func() {
  39. testErr := errors.New("close")
  40. m.CloseWithError(testErr)
  41. _, err := m.OpenStream()
  42. Expect(err).To(HaveOccurred())
  43. Expect(err.Error()).To(Equal(testErr.Error()))
  44. nerr, ok := err.(net.Error)
  45. Expect(ok).To(BeTrue())
  46. Expect(nerr.Timeout()).To(BeFalse())
  47. Expect(nerr.Temporary()).To(BeFalse())
  48. })
  49. It("gets streams", func() {
  50. _, err := m.OpenStream()
  51. Expect(err).ToNot(HaveOccurred())
  52. str, err := m.GetStream(firstNewStream)
  53. Expect(err).ToNot(HaveOccurred())
  54. Expect(str.(*mockGenericStream).id).To(Equal(firstNewStream))
  55. })
  56. It("errors when trying to get a stream that has not yet been opened", func() {
  57. _, err := m.GetStream(firstNewStream)
  58. Expect(err).To(MatchError(qerr.Error(qerr.InvalidStreamID, "peer attempted to open stream 3")))
  59. })
  60. It("deletes streams", func() {
  61. _, err := m.OpenStream() // opens firstNewStream
  62. Expect(err).ToNot(HaveOccurred())
  63. err = m.DeleteStream(firstNewStream)
  64. Expect(err).ToNot(HaveOccurred())
  65. str, err := m.GetStream(firstNewStream)
  66. Expect(err).ToNot(HaveOccurred())
  67. Expect(str).To(BeNil())
  68. })
  69. It("errors when deleting a non-existing stream", func() {
  70. err := m.DeleteStream(1337)
  71. Expect(err).To(MatchError("Tried to delete unknown stream 1337"))
  72. })
  73. It("errors when deleting a stream twice", func() {
  74. _, err := m.OpenStream() // opens firstNewStream
  75. Expect(err).ToNot(HaveOccurred())
  76. err = m.DeleteStream(firstNewStream)
  77. Expect(err).ToNot(HaveOccurred())
  78. err = m.DeleteStream(firstNewStream)
  79. Expect(err).To(MatchError("Tried to delete unknown stream 3"))
  80. })
  81. It("closes all streams when CloseWithError is called", func() {
  82. str1, err := m.OpenStream()
  83. Expect(err).ToNot(HaveOccurred())
  84. str2, err := m.OpenStream()
  85. Expect(err).ToNot(HaveOccurred())
  86. testErr := errors.New("test err")
  87. m.CloseWithError(testErr)
  88. Expect(str1.(*mockGenericStream).closed).To(BeTrue())
  89. Expect(str1.(*mockGenericStream).closeErr).To(MatchError(testErr))
  90. Expect(str2.(*mockGenericStream).closed).To(BeTrue())
  91. Expect(str2.(*mockGenericStream).closeErr).To(MatchError(testErr))
  92. })
  93. })
  94. Context("with stream ID limits", func() {
  95. It("errors when no stream can be opened immediately", func() {
  96. mockSender.EXPECT().queueControlFrame(gomock.Any())
  97. _, err := m.OpenStream()
  98. expectTooManyStreamsError(err)
  99. })
  100. It("blocks until a stream can be opened synchronously", func() {
  101. mockSender.EXPECT().queueControlFrame(gomock.Any())
  102. done := make(chan struct{})
  103. go func() {
  104. defer GinkgoRecover()
  105. str, err := m.OpenStreamSync()
  106. Expect(err).ToNot(HaveOccurred())
  107. Expect(str.(*mockGenericStream).id).To(Equal(firstNewStream))
  108. close(done)
  109. }()
  110. Consistently(done).ShouldNot(BeClosed())
  111. m.SetMaxStream(firstNewStream)
  112. Eventually(done).Should(BeClosed())
  113. })
  114. It("works with stream 0", func() {
  115. m = newOutgoingItemsMap(0, newItem, mockSender.queueControlFrame)
  116. mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) {
  117. Expect(f.(*wire.StreamsBlockedFrame).StreamLimit).To(BeZero())
  118. })
  119. done := make(chan struct{})
  120. go func() {
  121. defer GinkgoRecover()
  122. str, err := m.OpenStreamSync()
  123. Expect(err).ToNot(HaveOccurred())
  124. Expect(str.(*mockGenericStream).id).To(BeZero())
  125. close(done)
  126. }()
  127. Consistently(done).ShouldNot(BeClosed())
  128. m.SetMaxStream(0)
  129. Eventually(done).Should(BeClosed())
  130. })
  131. It("stops opening synchronously when it is closed", func() {
  132. mockSender.EXPECT().queueControlFrame(gomock.Any())
  133. testErr := errors.New("test error")
  134. done := make(chan struct{})
  135. go func() {
  136. defer GinkgoRecover()
  137. _, err := m.OpenStreamSync()
  138. Expect(err).To(HaveOccurred())
  139. Expect(err.Error()).To(Equal(testErr.Error()))
  140. close(done)
  141. }()
  142. Consistently(done).ShouldNot(BeClosed())
  143. m.CloseWithError(testErr)
  144. Eventually(done).Should(BeClosed())
  145. })
  146. It("doesn't reduce the stream limit", func() {
  147. m.SetMaxStream(firstNewStream + 4)
  148. m.SetMaxStream(firstNewStream)
  149. _, err := m.OpenStream()
  150. Expect(err).ToNot(HaveOccurred())
  151. str, err := m.OpenStream()
  152. Expect(err).ToNot(HaveOccurred())
  153. Expect(str.(*mockGenericStream).id).To(Equal(firstNewStream + 4))
  154. })
  155. It("queues a STREAM_ID_BLOCKED frame if no stream can be opened", func() {
  156. m.SetMaxStream(firstNewStream + 5*4)
  157. // open the 6 allowed streams
  158. for i := 0; i < 6; i++ {
  159. _, err := m.OpenStream()
  160. Expect(err).ToNot(HaveOccurred())
  161. }
  162. mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) {
  163. Expect(f.(*wire.StreamsBlockedFrame).StreamLimit).To(BeEquivalentTo(6))
  164. })
  165. _, err := m.OpenStream()
  166. Expect(err).To(HaveOccurred())
  167. Expect(err.Error()).To(Equal(errTooManyOpenStreams.Error()))
  168. })
  169. It("only sends one STREAM_ID_BLOCKED frame for one stream ID", func() {
  170. m.SetMaxStream(firstNewStream)
  171. mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) {
  172. Expect(f.(*wire.StreamsBlockedFrame).StreamLimit).To(BeEquivalentTo(1))
  173. })
  174. _, err := m.OpenStream()
  175. Expect(err).ToNot(HaveOccurred())
  176. // try to open a stream twice, but expect only one STREAM_ID_BLOCKED to be sent
  177. _, err = m.OpenStream()
  178. expectTooManyStreamsError(err)
  179. _, err = m.OpenStream()
  180. expectTooManyStreamsError(err)
  181. })
  182. })
  183. })