streams_map_outgoing_bidi.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. // This file was automatically generated by genny.
  2. // Any changes will be lost if this file is regenerated.
  3. // see https://github.com/cheekybits/genny
  4. package quic
  5. import (
  6. "fmt"
  7. "sync"
  8. "github.com/lucas-clemente/quic-go/internal/protocol"
  9. "github.com/lucas-clemente/quic-go/internal/qerr"
  10. "github.com/lucas-clemente/quic-go/internal/wire"
  11. )
  12. type outgoingBidiStreamsMap struct {
  13. mutex sync.RWMutex
  14. cond sync.Cond
  15. streams map[protocol.StreamID]streamI
  16. nextStream protocol.StreamID // stream ID of the stream returned by OpenStream(Sync)
  17. maxStream protocol.StreamID // the maximum stream ID we're allowed to open
  18. maxStreamSet bool // was maxStream set. If not, it's not possible to any stream (also works for stream 0)
  19. blockedSent bool // was a STREAMS_BLOCKED sent for the current maxStream
  20. newStream func(protocol.StreamID) streamI
  21. queueStreamIDBlocked func(*wire.StreamsBlockedFrame)
  22. closeErr error
  23. }
  24. func newOutgoingBidiStreamsMap(
  25. nextStream protocol.StreamID,
  26. newStream func(protocol.StreamID) streamI,
  27. queueControlFrame func(wire.Frame),
  28. ) *outgoingBidiStreamsMap {
  29. m := &outgoingBidiStreamsMap{
  30. streams: make(map[protocol.StreamID]streamI),
  31. nextStream: nextStream,
  32. newStream: newStream,
  33. queueStreamIDBlocked: func(f *wire.StreamsBlockedFrame) { queueControlFrame(f) },
  34. }
  35. m.cond.L = &m.mutex
  36. return m
  37. }
  38. func (m *outgoingBidiStreamsMap) OpenStream() (streamI, error) {
  39. m.mutex.Lock()
  40. defer m.mutex.Unlock()
  41. str, err := m.openStreamImpl()
  42. if err != nil {
  43. return nil, streamOpenErr{err}
  44. }
  45. return str, nil
  46. }
  47. func (m *outgoingBidiStreamsMap) OpenStreamSync() (streamI, error) {
  48. m.mutex.Lock()
  49. defer m.mutex.Unlock()
  50. for {
  51. str, err := m.openStreamImpl()
  52. if err == nil {
  53. return str, nil
  54. }
  55. if err != nil && err != errTooManyOpenStreams {
  56. return nil, streamOpenErr{err}
  57. }
  58. m.cond.Wait()
  59. }
  60. }
  61. func (m *outgoingBidiStreamsMap) openStreamImpl() (streamI, error) {
  62. if m.closeErr != nil {
  63. return nil, m.closeErr
  64. }
  65. if !m.maxStreamSet || m.nextStream > m.maxStream {
  66. if !m.blockedSent {
  67. if m.maxStreamSet {
  68. m.queueStreamIDBlocked(&wire.StreamsBlockedFrame{
  69. Type: protocol.StreamTypeBidi,
  70. StreamLimit: m.maxStream.StreamNum(),
  71. })
  72. } else {
  73. m.queueStreamIDBlocked(&wire.StreamsBlockedFrame{
  74. Type: protocol.StreamTypeBidi,
  75. StreamLimit: 0,
  76. })
  77. }
  78. m.blockedSent = true
  79. }
  80. return nil, errTooManyOpenStreams
  81. }
  82. s := m.newStream(m.nextStream)
  83. m.streams[m.nextStream] = s
  84. m.nextStream += 4
  85. return s, nil
  86. }
  87. func (m *outgoingBidiStreamsMap) GetStream(id protocol.StreamID) (streamI, error) {
  88. m.mutex.RLock()
  89. if id >= m.nextStream {
  90. m.mutex.RUnlock()
  91. return nil, qerr.Error(qerr.InvalidStreamID, fmt.Sprintf("peer attempted to open stream %d", id))
  92. }
  93. s := m.streams[id]
  94. m.mutex.RUnlock()
  95. return s, nil
  96. }
  97. func (m *outgoingBidiStreamsMap) DeleteStream(id protocol.StreamID) error {
  98. m.mutex.Lock()
  99. defer m.mutex.Unlock()
  100. if _, ok := m.streams[id]; !ok {
  101. return fmt.Errorf("Tried to delete unknown stream %d", id)
  102. }
  103. delete(m.streams, id)
  104. return nil
  105. }
  106. func (m *outgoingBidiStreamsMap) SetMaxStream(id protocol.StreamID) {
  107. m.mutex.Lock()
  108. if !m.maxStreamSet || id > m.maxStream {
  109. m.maxStream = id
  110. m.maxStreamSet = true
  111. m.blockedSent = false
  112. m.cond.Broadcast()
  113. }
  114. m.mutex.Unlock()
  115. }
  116. func (m *outgoingBidiStreamsMap) CloseWithError(err error) {
  117. m.mutex.Lock()
  118. m.closeErr = err
  119. for _, str := range m.streams {
  120. str.closeForShutdown(err)
  121. }
  122. m.cond.Broadcast()
  123. m.mutex.Unlock()
  124. }