stream_test.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package quic
  2. import (
  3. "io"
  4. "os"
  5. "strconv"
  6. "time"
  7. "github.com/lucas-clemente/quic-go/internal/mocks"
  8. "github.com/lucas-clemente/quic-go/internal/protocol"
  9. "github.com/lucas-clemente/quic-go/internal/wire"
  10. . "github.com/onsi/ginkgo"
  11. . "github.com/onsi/gomega"
  12. "github.com/onsi/gomega/gbytes"
  13. )
  14. // in the tests for the stream deadlines we set a deadline
  15. // and wait to make an assertion when Read / Write was unblocked
  16. // on the CIs, the timing is a lot less precise, so scale every duration by this factor
  17. func scaleDuration(t time.Duration) time.Duration {
  18. scaleFactor := 1
  19. if f, err := strconv.Atoi(os.Getenv("TIMESCALE_FACTOR")); err == nil { // parsing "" errors, so this works fine if the env is not set
  20. scaleFactor = f
  21. }
  22. Expect(scaleFactor).ToNot(BeZero())
  23. return time.Duration(scaleFactor) * t
  24. }
  25. var _ = Describe("Stream", func() {
  26. const streamID protocol.StreamID = 1337
  27. var (
  28. str *stream
  29. strWithTimeout io.ReadWriter // str wrapped with gbytes.Timeout{Reader,Writer}
  30. mockFC *mocks.MockStreamFlowController
  31. mockSender *MockStreamSender
  32. )
  33. BeforeEach(func() {
  34. mockSender = NewMockStreamSender(mockCtrl)
  35. mockFC = mocks.NewMockStreamFlowController(mockCtrl)
  36. str = newStream(streamID, mockSender, mockFC, protocol.VersionWhatever)
  37. timeout := scaleDuration(250 * time.Millisecond)
  38. strWithTimeout = struct {
  39. io.Reader
  40. io.Writer
  41. }{
  42. gbytes.TimeoutReader(str, timeout),
  43. gbytes.TimeoutWriter(str, timeout),
  44. }
  45. })
  46. It("gets stream id", func() {
  47. Expect(str.StreamID()).To(Equal(protocol.StreamID(1337)))
  48. })
  49. Context("deadlines", func() {
  50. It("sets a write deadline, when SetDeadline is called", func() {
  51. str.SetDeadline(time.Now().Add(-time.Second))
  52. n, err := strWithTimeout.Write([]byte("foobar"))
  53. Expect(err).To(MatchError(errDeadline))
  54. Expect(n).To(BeZero())
  55. })
  56. It("sets a read deadline, when SetDeadline is called", func() {
  57. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(6), false).AnyTimes()
  58. f := &wire.StreamFrame{Data: []byte("foobar")}
  59. err := str.handleStreamFrame(f)
  60. Expect(err).ToNot(HaveOccurred())
  61. str.SetDeadline(time.Now().Add(-time.Second))
  62. b := make([]byte, 6)
  63. n, err := strWithTimeout.Read(b)
  64. Expect(err).To(MatchError(errDeadline))
  65. Expect(n).To(BeZero())
  66. })
  67. })
  68. Context("completing", func() {
  69. It("is not completed when only the receive side is completed", func() {
  70. // don't EXPECT a call to mockSender.onStreamCompleted()
  71. str.receiveStream.sender.onStreamCompleted(streamID)
  72. })
  73. It("is not completed when only the send side is completed", func() {
  74. // don't EXPECT a call to mockSender.onStreamCompleted()
  75. str.sendStream.sender.onStreamCompleted(streamID)
  76. })
  77. It("is completed when both sides are completed", func() {
  78. mockSender.EXPECT().onStreamCompleted(streamID)
  79. str.sendStream.sender.onStreamCompleted(streamID)
  80. str.receiveStream.sender.onStreamCompleted(streamID)
  81. })
  82. })
  83. })