streams_map.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package quic
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "github.com/lucas-clemente/quic-go/internal/flowcontrol"
  7. "github.com/lucas-clemente/quic-go/internal/handshake"
  8. "github.com/lucas-clemente/quic-go/internal/protocol"
  9. "github.com/lucas-clemente/quic-go/internal/wire"
  10. )
  // errTooManyOpenStreams is the sentinel used internally by the outgoing
  // streams maps. streamOpenErr.Temporary compares against it by identity.
  var errTooManyOpenStreams = errors.New("too many open streams")

  // streamOpenErr wraps an error by embedding it and adds the Temporary and
  // Timeout methods, so that the wrapper satisfies net.Error.
  type streamOpenErr struct{ error }

  // Compile-time check that *streamOpenErr implements net.Error.
  var _ net.Error = (*streamOpenErr)(nil)

  // Timeout always reports false.
  func (streamOpenErr) Timeout() bool { return false }

  // Temporary reports whether the wrapped error is exactly
  // errTooManyOpenStreams. Any other wrapped error (including nil) yields false.
  func (e streamOpenErr) Temporary() bool {
  	hitStreamLimit := e.error == errTooManyOpenStreams
  	return hitStreamLimit
  }
  17. type streamsMap struct {
  18. perspective protocol.Perspective
  19. sender streamSender
  20. newFlowController func(protocol.StreamID) flowcontrol.StreamFlowController
  21. outgoingBidiStreams *outgoingBidiStreamsMap
  22. outgoingUniStreams *outgoingUniStreamsMap
  23. incomingBidiStreams *incomingBidiStreamsMap
  24. incomingUniStreams *incomingUniStreamsMap
  25. }
  26. var _ streamManager = &streamsMap{}
  27. func newStreamsMap(
  28. sender streamSender,
  29. newFlowController func(protocol.StreamID) flowcontrol.StreamFlowController,
  30. maxIncomingStreams uint64,
  31. maxIncomingUniStreams uint64,
  32. perspective protocol.Perspective,
  33. version protocol.VersionNumber,
  34. ) streamManager {
  35. m := &streamsMap{
  36. perspective: perspective,
  37. newFlowController: newFlowController,
  38. sender: sender,
  39. }
  40. newBidiStream := func(id protocol.StreamID) streamI {
  41. return newStream(id, m.sender, m.newFlowController(id), version)
  42. }
  43. newUniSendStream := func(id protocol.StreamID) sendStreamI {
  44. return newSendStream(id, m.sender, m.newFlowController(id), version)
  45. }
  46. newUniReceiveStream := func(id protocol.StreamID) receiveStreamI {
  47. return newReceiveStream(id, m.sender, m.newFlowController(id), version)
  48. }
  49. m.outgoingBidiStreams = newOutgoingBidiStreamsMap(
  50. protocol.FirstStream(protocol.StreamTypeBidi, perspective),
  51. newBidiStream,
  52. sender.queueControlFrame,
  53. )
  54. m.incomingBidiStreams = newIncomingBidiStreamsMap(
  55. protocol.FirstStream(protocol.StreamTypeBidi, perspective.Opposite()),
  56. protocol.MaxStreamID(protocol.StreamTypeBidi, maxIncomingStreams, perspective.Opposite()),
  57. maxIncomingStreams,
  58. sender.queueControlFrame,
  59. newBidiStream,
  60. )
  61. m.outgoingUniStreams = newOutgoingUniStreamsMap(
  62. protocol.FirstStream(protocol.StreamTypeUni, perspective),
  63. newUniSendStream,
  64. sender.queueControlFrame,
  65. )
  66. m.incomingUniStreams = newIncomingUniStreamsMap(
  67. protocol.FirstStream(protocol.StreamTypeUni, perspective.Opposite()),
  68. protocol.MaxStreamID(protocol.StreamTypeUni, maxIncomingUniStreams, perspective.Opposite()),
  69. maxIncomingUniStreams,
  70. sender.queueControlFrame,
  71. newUniReceiveStream,
  72. )
  73. return m
  74. }
  75. func (m *streamsMap) OpenStream() (Stream, error) {
  76. return m.outgoingBidiStreams.OpenStream()
  77. }
  78. func (m *streamsMap) OpenStreamSync() (Stream, error) {
  79. return m.outgoingBidiStreams.OpenStreamSync()
  80. }
  81. func (m *streamsMap) OpenUniStream() (SendStream, error) {
  82. return m.outgoingUniStreams.OpenStream()
  83. }
  84. func (m *streamsMap) OpenUniStreamSync() (SendStream, error) {
  85. return m.outgoingUniStreams.OpenStreamSync()
  86. }
  87. func (m *streamsMap) AcceptStream() (Stream, error) {
  88. return m.incomingBidiStreams.AcceptStream()
  89. }
  90. func (m *streamsMap) AcceptUniStream() (ReceiveStream, error) {
  91. return m.incomingUniStreams.AcceptStream()
  92. }
  93. func (m *streamsMap) DeleteStream(id protocol.StreamID) error {
  94. switch id.Type() {
  95. case protocol.StreamTypeUni:
  96. if id.InitiatedBy() == m.perspective {
  97. return m.outgoingUniStreams.DeleteStream(id)
  98. }
  99. return m.incomingUniStreams.DeleteStream(id)
  100. case protocol.StreamTypeBidi:
  101. if id.InitiatedBy() == m.perspective {
  102. return m.outgoingBidiStreams.DeleteStream(id)
  103. }
  104. return m.incomingBidiStreams.DeleteStream(id)
  105. }
  106. panic("")
  107. }
  108. func (m *streamsMap) GetOrOpenReceiveStream(id protocol.StreamID) (receiveStreamI, error) {
  109. switch id.Type() {
  110. case protocol.StreamTypeUni:
  111. if id.InitiatedBy() == m.perspective {
  112. // an outgoing unidirectional stream is a send stream, not a receive stream
  113. return nil, fmt.Errorf("peer attempted to open receive stream %d", id)
  114. }
  115. return m.incomingUniStreams.GetOrOpenStream(id)
  116. case protocol.StreamTypeBidi:
  117. if id.InitiatedBy() == m.perspective {
  118. return m.outgoingBidiStreams.GetStream(id)
  119. }
  120. return m.incomingBidiStreams.GetOrOpenStream(id)
  121. }
  122. panic("")
  123. }
  124. func (m *streamsMap) GetOrOpenSendStream(id protocol.StreamID) (sendStreamI, error) {
  125. switch id.Type() {
  126. case protocol.StreamTypeUni:
  127. if id.InitiatedBy() == m.perspective {
  128. return m.outgoingUniStreams.GetStream(id)
  129. }
  130. // an incoming unidirectional stream is a receive stream, not a send stream
  131. return nil, fmt.Errorf("peer attempted to open send stream %d", id)
  132. case protocol.StreamTypeBidi:
  133. if id.InitiatedBy() == m.perspective {
  134. return m.outgoingBidiStreams.GetStream(id)
  135. }
  136. return m.incomingBidiStreams.GetOrOpenStream(id)
  137. }
  138. panic("")
  139. }
  140. func (m *streamsMap) HandleMaxStreamsFrame(f *wire.MaxStreamsFrame) error {
  141. id := protocol.MaxStreamID(f.Type, f.MaxStreams, m.perspective)
  142. switch id.Type() {
  143. case protocol.StreamTypeUni:
  144. m.outgoingUniStreams.SetMaxStream(id)
  145. case protocol.StreamTypeBidi:
  146. fmt.Printf("")
  147. m.outgoingBidiStreams.SetMaxStream(id)
  148. }
  149. return nil
  150. }
  151. func (m *streamsMap) UpdateLimits(p *handshake.TransportParameters) {
  152. // Max{Uni,Bidi}StreamID returns the highest stream ID that the peer is allowed to open.
  153. m.outgoingBidiStreams.SetMaxStream(protocol.MaxStreamID(protocol.StreamTypeBidi, p.MaxBidiStreams, m.perspective))
  154. m.outgoingUniStreams.SetMaxStream(protocol.MaxStreamID(protocol.StreamTypeUni, p.MaxUniStreams, m.perspective))
  155. }
  156. func (m *streamsMap) CloseWithError(err error) {
  157. m.outgoingBidiStreams.CloseWithError(err)
  158. m.outgoingUniStreams.CloseWithError(err)
  159. m.incomingBidiStreams.CloseWithError(err)
  160. m.incomingUniStreams.CloseWithError(err)
  161. }