window_update_queue.go 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. package quic
  2. import (
  3. "sync"
  4. "github.com/lucas-clemente/quic-go/internal/flowcontrol"
  5. "github.com/lucas-clemente/quic-go/internal/protocol"
  6. "github.com/lucas-clemente/quic-go/internal/wire"
  7. )
  8. type windowUpdateQueue struct {
  9. mutex sync.Mutex
  10. queue map[protocol.StreamID]bool // used as a set
  11. queuedConn bool // connection-level window update
  12. streamGetter streamGetter
  13. connFlowController flowcontrol.ConnectionFlowController
  14. callback func(wire.Frame)
  15. }
  16. func newWindowUpdateQueue(
  17. streamGetter streamGetter,
  18. connFC flowcontrol.ConnectionFlowController,
  19. cb func(wire.Frame),
  20. ) *windowUpdateQueue {
  21. return &windowUpdateQueue{
  22. queue: make(map[protocol.StreamID]bool),
  23. streamGetter: streamGetter,
  24. connFlowController: connFC,
  25. callback: cb,
  26. }
  27. }
  28. func (q *windowUpdateQueue) AddStream(id protocol.StreamID) {
  29. q.mutex.Lock()
  30. q.queue[id] = true
  31. q.mutex.Unlock()
  32. }
  33. func (q *windowUpdateQueue) AddConnection() {
  34. q.mutex.Lock()
  35. q.queuedConn = true
  36. q.mutex.Unlock()
  37. }
  38. func (q *windowUpdateQueue) QueueAll() {
  39. q.mutex.Lock()
  40. // queue a connection-level window update
  41. if q.queuedConn {
  42. q.callback(&wire.MaxDataFrame{ByteOffset: q.connFlowController.GetWindowUpdate()})
  43. q.queuedConn = false
  44. }
  45. // queue all stream-level window updates
  46. for id := range q.queue {
  47. str, err := q.streamGetter.GetOrOpenReceiveStream(id)
  48. if err != nil || str == nil { // the stream can be nil if it was completed before dequeing the window update
  49. continue
  50. }
  51. offset := str.getWindowUpdate()
  52. if offset == 0 { // can happen if we received a final offset, right after queueing the window update
  53. continue
  54. }
  55. q.callback(&wire.MaxStreamDataFrame{
  56. StreamID: id,
  57. ByteOffset: offset,
  58. })
  59. delete(q.queue, id)
  60. }
  61. q.mutex.Unlock()
  62. }