streams_map.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. package quic
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "github.com/lucas-clemente/quic-go/internal/flowcontrol"
  7. "github.com/lucas-clemente/quic-go/internal/handshake"
  8. "github.com/lucas-clemente/quic-go/internal/protocol"
  9. "github.com/lucas-clemente/quic-go/internal/qerr"
  10. "github.com/lucas-clemente/quic-go/internal/wire"
  11. )
  12. type streamOpenErr struct{ error }
  13. var _ net.Error = &streamOpenErr{}
  14. func (e streamOpenErr) Temporary() bool { return e.error == errTooManyOpenStreams }
  15. func (streamOpenErr) Timeout() bool { return false }
  16. // errTooManyOpenStreams is used internally by the outgoing streams maps.
  17. var errTooManyOpenStreams = errors.New("too many open streams")
  18. type streamsMap struct {
  19. perspective protocol.Perspective
  20. sender streamSender
  21. newFlowController func(protocol.StreamID) flowcontrol.StreamFlowController
  22. outgoingBidiStreams *outgoingBidiStreamsMap
  23. outgoingUniStreams *outgoingUniStreamsMap
  24. incomingBidiStreams *incomingBidiStreamsMap
  25. incomingUniStreams *incomingUniStreamsMap
  26. }
  27. var _ streamManager = &streamsMap{}
  28. func newStreamsMap(
  29. sender streamSender,
  30. newFlowController func(protocol.StreamID) flowcontrol.StreamFlowController,
  31. maxIncomingStreams uint64,
  32. maxIncomingUniStreams uint64,
  33. perspective protocol.Perspective,
  34. version protocol.VersionNumber,
  35. ) streamManager {
  36. m := &streamsMap{
  37. perspective: perspective,
  38. newFlowController: newFlowController,
  39. sender: sender,
  40. }
  41. newBidiStream := func(id protocol.StreamID) streamI {
  42. return newStream(id, m.sender, m.newFlowController(id), version)
  43. }
  44. newUniSendStream := func(id protocol.StreamID) sendStreamI {
  45. return newSendStream(id, m.sender, m.newFlowController(id), version)
  46. }
  47. newUniReceiveStream := func(id protocol.StreamID) receiveStreamI {
  48. return newReceiveStream(id, m.sender, m.newFlowController(id), version)
  49. }
  50. m.outgoingBidiStreams = newOutgoingBidiStreamsMap(
  51. protocol.FirstStream(protocol.StreamTypeBidi, perspective),
  52. newBidiStream,
  53. sender.queueControlFrame,
  54. )
  55. m.incomingBidiStreams = newIncomingBidiStreamsMap(
  56. protocol.FirstStream(protocol.StreamTypeBidi, perspective.Opposite()),
  57. protocol.MaxStreamID(protocol.StreamTypeBidi, maxIncomingStreams, perspective.Opposite()),
  58. maxIncomingStreams,
  59. sender.queueControlFrame,
  60. newBidiStream,
  61. )
  62. m.outgoingUniStreams = newOutgoingUniStreamsMap(
  63. protocol.FirstStream(protocol.StreamTypeUni, perspective),
  64. newUniSendStream,
  65. sender.queueControlFrame,
  66. )
  67. m.incomingUniStreams = newIncomingUniStreamsMap(
  68. protocol.FirstStream(protocol.StreamTypeUni, perspective.Opposite()),
  69. protocol.MaxStreamID(protocol.StreamTypeUni, maxIncomingUniStreams, perspective.Opposite()),
  70. maxIncomingUniStreams,
  71. sender.queueControlFrame,
  72. newUniReceiveStream,
  73. )
  74. return m
  75. }
  76. func (m *streamsMap) OpenStream() (Stream, error) {
  77. return m.outgoingBidiStreams.OpenStream()
  78. }
  79. func (m *streamsMap) OpenStreamSync() (Stream, error) {
  80. return m.outgoingBidiStreams.OpenStreamSync()
  81. }
  82. func (m *streamsMap) OpenUniStream() (SendStream, error) {
  83. return m.outgoingUniStreams.OpenStream()
  84. }
  85. func (m *streamsMap) OpenUniStreamSync() (SendStream, error) {
  86. return m.outgoingUniStreams.OpenStreamSync()
  87. }
  88. func (m *streamsMap) AcceptStream() (Stream, error) {
  89. return m.incomingBidiStreams.AcceptStream()
  90. }
  91. func (m *streamsMap) AcceptUniStream() (ReceiveStream, error) {
  92. return m.incomingUniStreams.AcceptStream()
  93. }
  94. func (m *streamsMap) DeleteStream(id protocol.StreamID) error {
  95. switch id.Type() {
  96. case protocol.StreamTypeUni:
  97. if id.InitiatedBy() == m.perspective {
  98. return m.outgoingUniStreams.DeleteStream(id)
  99. }
  100. return m.incomingUniStreams.DeleteStream(id)
  101. case protocol.StreamTypeBidi:
  102. if id.InitiatedBy() == m.perspective {
  103. return m.outgoingBidiStreams.DeleteStream(id)
  104. }
  105. return m.incomingBidiStreams.DeleteStream(id)
  106. }
  107. panic("")
  108. }
  109. func (m *streamsMap) GetOrOpenReceiveStream(id protocol.StreamID) (receiveStreamI, error) {
  110. switch id.Type() {
  111. case protocol.StreamTypeUni:
  112. if id.InitiatedBy() == m.perspective {
  113. // an outgoing unidirectional stream is a send stream, not a receive stream
  114. return nil, fmt.Errorf("peer attempted to open receive stream %d", id)
  115. }
  116. return m.incomingUniStreams.GetOrOpenStream(id)
  117. case protocol.StreamTypeBidi:
  118. if id.InitiatedBy() == m.perspective {
  119. return m.outgoingBidiStreams.GetStream(id)
  120. }
  121. return m.incomingBidiStreams.GetOrOpenStream(id)
  122. }
  123. panic("")
  124. }
  125. func (m *streamsMap) GetOrOpenSendStream(id protocol.StreamID) (sendStreamI, error) {
  126. switch id.Type() {
  127. case protocol.StreamTypeUni:
  128. if id.InitiatedBy() == m.perspective {
  129. return m.outgoingUniStreams.GetStream(id)
  130. }
  131. // an incoming unidirectional stream is a receive stream, not a send stream
  132. return nil, fmt.Errorf("peer attempted to open send stream %d", id)
  133. case protocol.StreamTypeBidi:
  134. if id.InitiatedBy() == m.perspective {
  135. return m.outgoingBidiStreams.GetStream(id)
  136. }
  137. return m.incomingBidiStreams.GetOrOpenStream(id)
  138. }
  139. panic("")
  140. }
  141. func (m *streamsMap) HandleMaxStreamsFrame(f *wire.MaxStreamsFrame) error {
  142. if f.MaxStreams > protocol.MaxStreamCount {
  143. return qerr.StreamLimitError
  144. }
  145. id := protocol.MaxStreamID(f.Type, f.MaxStreams, m.perspective)
  146. switch id.Type() {
  147. case protocol.StreamTypeUni:
  148. m.outgoingUniStreams.SetMaxStream(id)
  149. case protocol.StreamTypeBidi:
  150. fmt.Printf("")
  151. m.outgoingBidiStreams.SetMaxStream(id)
  152. }
  153. return nil
  154. }
  155. func (m *streamsMap) UpdateLimits(p *handshake.TransportParameters) error {
  156. if p.MaxBidiStreams > protocol.MaxStreamCount || p.MaxUniStreams > protocol.MaxStreamCount {
  157. return qerr.StreamLimitError
  158. }
  159. // Max{Uni,Bidi}StreamID returns the highest stream ID that the peer is allowed to open.
  160. m.outgoingBidiStreams.SetMaxStream(protocol.MaxStreamID(protocol.StreamTypeBidi, p.MaxBidiStreams, m.perspective))
  161. m.outgoingUniStreams.SetMaxStream(protocol.MaxStreamID(protocol.StreamTypeUni, p.MaxUniStreams, m.perspective))
  162. return nil
  163. }
  164. func (m *streamsMap) CloseWithError(err error) {
  165. m.outgoingBidiStreams.CloseWithError(err)
  166. m.outgoingUniStreams.CloseWithError(err)
  167. m.incomingBidiStreams.CloseWithError(err)
  168. m.incomingUniStreams.CloseWithError(err)
  169. }