streams_map_outgoing_bidi.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. // This file was automatically generated by genny.
  2. // Any changes will be lost if this file is regenerated.
  3. // see https://github.com/cheekybits/genny
  4. package quic
  5. import (
  6. "context"
  7. "sync"
  8. "github.com/lucas-clemente/quic-go/internal/protocol"
  9. "github.com/lucas-clemente/quic-go/internal/wire"
  10. )
  11. type outgoingBidiStreamsMap struct {
  12. mutex sync.RWMutex
  13. streams map[protocol.StreamNum]streamI
  14. openQueue map[uint64]chan struct{}
  15. lowestInQueue uint64
  16. highestInQueue uint64
  17. nextStream protocol.StreamNum // stream ID of the stream returned by OpenStream(Sync)
  18. maxStream protocol.StreamNum // the maximum stream ID we're allowed to open
  19. blockedSent bool // was a STREAMS_BLOCKED sent for the current maxStream
  20. newStream func(protocol.StreamNum) streamI
  21. queueStreamIDBlocked func(*wire.StreamsBlockedFrame)
  22. closeErr error
  23. }
  24. func newOutgoingBidiStreamsMap(
  25. newStream func(protocol.StreamNum) streamI,
  26. queueControlFrame func(wire.Frame),
  27. ) *outgoingBidiStreamsMap {
  28. return &outgoingBidiStreamsMap{
  29. streams: make(map[protocol.StreamNum]streamI),
  30. openQueue: make(map[uint64]chan struct{}),
  31. maxStream: protocol.InvalidStreamNum,
  32. nextStream: 1,
  33. newStream: newStream,
  34. queueStreamIDBlocked: func(f *wire.StreamsBlockedFrame) { queueControlFrame(f) },
  35. }
  36. }
  37. func (m *outgoingBidiStreamsMap) OpenStream() (streamI, error) {
  38. m.mutex.Lock()
  39. defer m.mutex.Unlock()
  40. if m.closeErr != nil {
  41. return nil, m.closeErr
  42. }
  43. // if there are OpenStreamSync calls waiting, return an error here
  44. if len(m.openQueue) > 0 || m.nextStream > m.maxStream {
  45. m.maybeSendBlockedFrame()
  46. return nil, streamOpenErr{errTooManyOpenStreams}
  47. }
  48. return m.openStream(), nil
  49. }
  50. func (m *outgoingBidiStreamsMap) OpenStreamSync(ctx context.Context) (streamI, error) {
  51. m.mutex.Lock()
  52. defer m.mutex.Unlock()
  53. if m.closeErr != nil {
  54. return nil, m.closeErr
  55. }
  56. if err := ctx.Err(); err != nil {
  57. return nil, err
  58. }
  59. if len(m.openQueue) == 0 && m.nextStream <= m.maxStream {
  60. return m.openStream(), nil
  61. }
  62. waitChan := make(chan struct{}, 1)
  63. queuePos := m.highestInQueue
  64. m.highestInQueue++
  65. if len(m.openQueue) == 0 {
  66. m.lowestInQueue = queuePos
  67. }
  68. m.openQueue[queuePos] = waitChan
  69. m.maybeSendBlockedFrame()
  70. for {
  71. m.mutex.Unlock()
  72. select {
  73. case <-ctx.Done():
  74. m.mutex.Lock()
  75. delete(m.openQueue, queuePos)
  76. return nil, ctx.Err()
  77. case <-waitChan:
  78. }
  79. m.mutex.Lock()
  80. if m.closeErr != nil {
  81. return nil, m.closeErr
  82. }
  83. if m.nextStream > m.maxStream {
  84. // no stream available. Continue waiting
  85. continue
  86. }
  87. str := m.openStream()
  88. delete(m.openQueue, queuePos)
  89. m.unblockOpenSync()
  90. return str, nil
  91. }
  92. }
  93. func (m *outgoingBidiStreamsMap) openStream() streamI {
  94. s := m.newStream(m.nextStream)
  95. m.streams[m.nextStream] = s
  96. m.nextStream++
  97. return s
  98. }
  99. func (m *outgoingBidiStreamsMap) maybeSendBlockedFrame() {
  100. if m.blockedSent {
  101. return
  102. }
  103. var streamNum protocol.StreamNum
  104. if m.maxStream != protocol.InvalidStreamNum {
  105. streamNum = m.maxStream
  106. }
  107. m.queueStreamIDBlocked(&wire.StreamsBlockedFrame{
  108. Type: protocol.StreamTypeBidi,
  109. StreamLimit: streamNum,
  110. })
  111. m.blockedSent = true
  112. }
  113. func (m *outgoingBidiStreamsMap) GetStream(num protocol.StreamNum) (streamI, error) {
  114. m.mutex.RLock()
  115. if num >= m.nextStream {
  116. m.mutex.RUnlock()
  117. return nil, streamError{
  118. message: "peer attempted to open stream %d",
  119. nums: []protocol.StreamNum{num},
  120. }
  121. }
  122. s := m.streams[num]
  123. m.mutex.RUnlock()
  124. return s, nil
  125. }
  126. func (m *outgoingBidiStreamsMap) DeleteStream(num protocol.StreamNum) error {
  127. m.mutex.Lock()
  128. defer m.mutex.Unlock()
  129. if _, ok := m.streams[num]; !ok {
  130. return streamError{
  131. message: "Tried to delete unknown stream %d",
  132. nums: []protocol.StreamNum{num},
  133. }
  134. }
  135. delete(m.streams, num)
  136. return nil
  137. }
  138. func (m *outgoingBidiStreamsMap) SetMaxStream(num protocol.StreamNum) {
  139. m.mutex.Lock()
  140. defer m.mutex.Unlock()
  141. if num <= m.maxStream {
  142. return
  143. }
  144. m.maxStream = num
  145. m.blockedSent = false
  146. m.unblockOpenSync()
  147. }
  148. func (m *outgoingBidiStreamsMap) unblockOpenSync() {
  149. if len(m.openQueue) == 0 {
  150. return
  151. }
  152. for qp := m.lowestInQueue; qp <= m.highestInQueue; qp++ {
  153. c, ok := m.openQueue[qp]
  154. if !ok { // entry was deleted because the context was canceled
  155. continue
  156. }
  157. close(c)
  158. m.openQueue[qp] = nil
  159. m.lowestInQueue = qp + 1
  160. return
  161. }
  162. }
  163. func (m *outgoingBidiStreamsMap) CloseWithError(err error) {
  164. m.mutex.Lock()
  165. m.closeErr = err
  166. for _, str := range m.streams {
  167. str.closeForShutdown(err)
  168. }
  169. for _, c := range m.openQueue {
  170. if c != nil {
  171. close(c)
  172. }
  173. }
  174. m.mutex.Unlock()
  175. }