session.go 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283
  1. package quic
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "crypto/tls"
  6. "errors"
  7. "fmt"
  8. "net"
  9. "sync"
  10. "time"
  11. "github.com/bifurcation/mint"
  12. "github.com/lucas-clemente/quic-go/internal/ackhandler"
  13. "github.com/lucas-clemente/quic-go/internal/congestion"
  14. "github.com/lucas-clemente/quic-go/internal/flowcontrol"
  15. "github.com/lucas-clemente/quic-go/internal/handshake"
  16. "github.com/lucas-clemente/quic-go/internal/protocol"
  17. "github.com/lucas-clemente/quic-go/internal/utils"
  18. "github.com/lucas-clemente/quic-go/internal/wire"
  19. "github.com/lucas-clemente/quic-go/qerr"
  20. )
  21. type unpacker interface {
  22. Unpack(headerBinary []byte, hdr *wire.Header, data []byte) (*unpackedPacket, error)
  23. }
  24. type streamGetter interface {
  25. GetOrOpenReceiveStream(protocol.StreamID) (receiveStreamI, error)
  26. GetOrOpenSendStream(protocol.StreamID) (sendStreamI, error)
  27. }
  28. type streamManager interface {
  29. GetOrOpenSendStream(protocol.StreamID) (sendStreamI, error)
  30. GetOrOpenReceiveStream(protocol.StreamID) (receiveStreamI, error)
  31. OpenStream() (Stream, error)
  32. OpenUniStream() (SendStream, error)
  33. OpenStreamSync() (Stream, error)
  34. OpenUniStreamSync() (SendStream, error)
  35. AcceptStream() (Stream, error)
  36. AcceptUniStream() (ReceiveStream, error)
  37. DeleteStream(protocol.StreamID) error
  38. UpdateLimits(*handshake.TransportParameters)
  39. HandleMaxStreamIDFrame(*wire.MaxStreamIDFrame) error
  40. CloseWithError(error)
  41. }
  42. type cryptoStreamHandler interface {
  43. HandleCryptoStream() error
  44. ConnectionState() handshake.ConnectionState
  45. }
  46. type divNonceSetter interface {
  47. SetDiversificationNonce([]byte) error
  48. }
// receivedPacket is a packet as it is handed to a session for processing.
type receivedPacket struct {
	// remoteAddr is not read by the code visible in this part of the file;
	// presumably the address the packet was received from — TODO confirm.
	remoteAddr net.Addr
	// header is the parsed header. Its Raw field holds the header bytes that
	// are passed to the unpacker.
	header *wire.Header
	// data is passed to the unpacker together with the header.
	data []byte
	// rcvTime is the receive time. If it is zero, handlePacketImpl sets it to
	// time.Now().
	rcvTime time.Time
}
  55. var (
  56. newCryptoSetup = handshake.NewCryptoSetup
  57. newCryptoSetupClient = handshake.NewCryptoSetupClient
  58. )
  59. type closeError struct {
  60. err error
  61. remote bool
  62. sendClose bool
  63. }
  64. // A Session is a QUIC session
  65. type session struct {
  66. sessionRunner sessionRunner
  67. destConnID protocol.ConnectionID
  68. srcConnID protocol.ConnectionID
  69. perspective protocol.Perspective
  70. version protocol.VersionNumber
  71. config *Config
  72. conn connection
  73. streamsMap streamManager
  74. cryptoStream cryptoStream
  75. rttStats *congestion.RTTStats
  76. sentPacketHandler ackhandler.SentPacketHandler
  77. receivedPacketHandler ackhandler.ReceivedPacketHandler
  78. streamFramer *streamFramer
  79. windowUpdateQueue *windowUpdateQueue
  80. connFlowController flowcontrol.ConnectionFlowController
  81. unpacker unpacker
  82. packer *packetPacker
  83. cryptoStreamHandler cryptoStreamHandler
  84. receivedPackets chan *receivedPacket
  85. sendingScheduled chan struct{}
  86. // closeChan is used to notify the run loop that it should terminate.
  87. closeChan chan closeError
  88. closeOnce sync.Once
  89. ctx context.Context
  90. ctxCancel context.CancelFunc
  91. // when we receive too many undecryptable packets during the handshake, we send a Public reset
  92. // but only after a time of protocol.PublicResetTimeout has passed
  93. undecryptablePackets []*receivedPacket
  94. receivedTooManyUndecrytablePacketsTime time.Time
  95. // this channel is passed to the CryptoSetup and receives the transport parameters, as soon as the peer sends them
  96. paramsChan <-chan handshake.TransportParameters
  97. // the handshakeEvent channel is passed to the CryptoSetup.
  98. // It receives when it makes sense to try decrypting undecryptable packets.
  99. handshakeEvent <-chan struct{}
  100. handshakeComplete bool
  101. receivedFirstPacket bool // since packet numbers start at 0, we can't use largestRcvdPacketNumber != 0 for this
  102. receivedFirstForwardSecurePacket bool
  103. lastRcvdPacketNumber protocol.PacketNumber
  104. // Used to calculate the next packet number from the truncated wire
  105. // representation, and sent back in public reset packets
  106. largestRcvdPacketNumber protocol.PacketNumber
  107. sessionCreationTime time.Time
  108. lastNetworkActivityTime time.Time
  109. // pacingDeadline is the time when the next packet should be sent
  110. pacingDeadline time.Time
  111. peerParams *handshake.TransportParameters
  112. timer *utils.Timer
  113. // keepAlivePingSent stores whether a Ping frame was sent to the peer or not
  114. // it is reset as soon as we receive a packet from the peer
  115. keepAlivePingSent bool
  116. logger utils.Logger
  117. }
  118. var _ Session = &session{}
  119. var _ streamSender = &session{}
  120. // newSession makes a new session
  121. func newSession(
  122. conn connection,
  123. sessionRunner sessionRunner,
  124. v protocol.VersionNumber,
  125. destConnID protocol.ConnectionID,
  126. srcConnID protocol.ConnectionID,
  127. scfg *handshake.ServerConfig,
  128. tlsConf *tls.Config,
  129. config *Config,
  130. logger utils.Logger,
  131. ) (quicSession, error) {
  132. logger.Debugf("Creating new session. Destination Connection ID: %s, Source Connection ID: %s", destConnID, srcConnID)
  133. paramsChan := make(chan handshake.TransportParameters)
  134. handshakeEvent := make(chan struct{}, 1)
  135. s := &session{
  136. conn: conn,
  137. sessionRunner: sessionRunner,
  138. srcConnID: srcConnID,
  139. destConnID: destConnID,
  140. perspective: protocol.PerspectiveServer,
  141. version: v,
  142. config: config,
  143. handshakeEvent: handshakeEvent,
  144. paramsChan: paramsChan,
  145. logger: logger,
  146. }
  147. s.preSetup()
  148. transportParams := &handshake.TransportParameters{
  149. StreamFlowControlWindow: protocol.ReceiveStreamFlowControlWindow,
  150. ConnectionFlowControlWindow: protocol.ReceiveConnectionFlowControlWindow,
  151. MaxStreams: uint32(s.config.MaxIncomingStreams),
  152. IdleTimeout: s.config.IdleTimeout,
  153. }
  154. divNonce := make([]byte, 32)
  155. if _, err := rand.Read(divNonce); err != nil {
  156. return nil, err
  157. }
  158. cs, err := newCryptoSetup(
  159. s.cryptoStream,
  160. srcConnID,
  161. s.conn.RemoteAddr(),
  162. s.version,
  163. divNonce,
  164. scfg,
  165. transportParams,
  166. s.config.Versions,
  167. s.config.AcceptCookie,
  168. paramsChan,
  169. handshakeEvent,
  170. s.logger,
  171. )
  172. if err != nil {
  173. return nil, err
  174. }
  175. s.cryptoStreamHandler = cs
  176. s.unpacker = newPacketUnpackerGQUIC(cs, s.version)
  177. s.streamsMap = newStreamsMapLegacy(s.newStream, s.config.MaxIncomingStreams, s.perspective)
  178. s.streamFramer = newStreamFramer(s.cryptoStream, s.streamsMap, s.version)
  179. s.packer = newPacketPacker(
  180. destConnID,
  181. srcConnID,
  182. 1,
  183. s.sentPacketHandler.GetPacketNumberLen,
  184. s.RemoteAddr(),
  185. nil, // no token
  186. divNonce,
  187. cs,
  188. s.streamFramer,
  189. s.perspective,
  190. s.version,
  191. )
  192. return s, s.postSetup()
  193. }
  194. // declare this as a variable, so that we can it mock it in the tests
  195. var newClientSession = func(
  196. conn connection,
  197. sessionRunner sessionRunner,
  198. hostname string,
  199. v protocol.VersionNumber,
  200. destConnID protocol.ConnectionID,
  201. srcConnID protocol.ConnectionID,
  202. tlsConf *tls.Config,
  203. config *Config,
  204. initialVersion protocol.VersionNumber,
  205. negotiatedVersions []protocol.VersionNumber, // needed for validation of the GQUIC version negotiation
  206. logger utils.Logger,
  207. ) (quicSession, error) {
  208. logger.Debugf("Creating new session. Destination Connection ID: %s, Source Connection ID: %s", destConnID, srcConnID)
  209. paramsChan := make(chan handshake.TransportParameters)
  210. handshakeEvent := make(chan struct{}, 1)
  211. s := &session{
  212. conn: conn,
  213. sessionRunner: sessionRunner,
  214. srcConnID: srcConnID,
  215. destConnID: destConnID,
  216. perspective: protocol.PerspectiveClient,
  217. version: v,
  218. config: config,
  219. handshakeEvent: handshakeEvent,
  220. paramsChan: paramsChan,
  221. logger: logger,
  222. }
  223. s.preSetup()
  224. transportParams := &handshake.TransportParameters{
  225. StreamFlowControlWindow: protocol.ReceiveStreamFlowControlWindow,
  226. ConnectionFlowControlWindow: protocol.ReceiveConnectionFlowControlWindow,
  227. MaxStreams: uint32(s.config.MaxIncomingStreams),
  228. IdleTimeout: s.config.IdleTimeout,
  229. OmitConnectionID: s.config.RequestConnectionIDOmission,
  230. }
  231. cs, err := newCryptoSetupClient(
  232. s.cryptoStream,
  233. hostname,
  234. destConnID,
  235. s.version,
  236. tlsConf,
  237. transportParams,
  238. paramsChan,
  239. handshakeEvent,
  240. initialVersion,
  241. negotiatedVersions,
  242. s.logger,
  243. )
  244. if err != nil {
  245. return nil, err
  246. }
  247. s.cryptoStreamHandler = cs
  248. s.unpacker = newPacketUnpackerGQUIC(cs, s.version)
  249. s.streamsMap = newStreamsMapLegacy(s.newStream, s.config.MaxIncomingStreams, s.perspective)
  250. s.streamFramer = newStreamFramer(s.cryptoStream, s.streamsMap, s.version)
  251. s.packer = newPacketPacker(
  252. destConnID,
  253. srcConnID,
  254. 1,
  255. s.sentPacketHandler.GetPacketNumberLen,
  256. s.RemoteAddr(),
  257. nil, // no token
  258. nil, // no diversification nonce
  259. cs,
  260. s.streamFramer,
  261. s.perspective,
  262. s.version,
  263. )
  264. return s, s.postSetup()
  265. }
  266. func newTLSServerSession(
  267. conn connection,
  268. runner sessionRunner,
  269. origConnID protocol.ConnectionID,
  270. destConnID protocol.ConnectionID,
  271. srcConnID protocol.ConnectionID,
  272. initialPacketNumber protocol.PacketNumber,
  273. config *Config,
  274. mintConf *mint.Config,
  275. peerParams *handshake.TransportParameters,
  276. logger utils.Logger,
  277. v protocol.VersionNumber,
  278. ) (quicSession, error) {
  279. handshakeEvent := make(chan struct{}, 1)
  280. s := &session{
  281. conn: conn,
  282. sessionRunner: runner,
  283. config: config,
  284. srcConnID: srcConnID,
  285. destConnID: destConnID,
  286. perspective: protocol.PerspectiveServer,
  287. version: v,
  288. handshakeEvent: handshakeEvent,
  289. logger: logger,
  290. }
  291. s.preSetup()
  292. cs, err := handshake.NewCryptoSetupTLSServer(
  293. s.cryptoStream,
  294. origConnID,
  295. mintConf,
  296. handshakeEvent,
  297. v,
  298. )
  299. if err != nil {
  300. return nil, err
  301. }
  302. s.cryptoStreamHandler = cs
  303. s.streamsMap = newStreamsMap(s, s.newFlowController, s.config.MaxIncomingStreams, s.config.MaxIncomingUniStreams, s.perspective, s.version)
  304. s.streamFramer = newStreamFramer(s.cryptoStream, s.streamsMap, s.version)
  305. s.packer = newPacketPacker(
  306. s.destConnID,
  307. s.srcConnID,
  308. initialPacketNumber,
  309. s.sentPacketHandler.GetPacketNumberLen,
  310. s.RemoteAddr(),
  311. nil, // no token
  312. nil, // no diversification nonce
  313. cs,
  314. s.streamFramer,
  315. s.perspective,
  316. s.version,
  317. )
  318. if err := s.postSetup(); err != nil {
  319. return nil, err
  320. }
  321. s.peerParams = peerParams
  322. s.processTransportParameters(peerParams)
  323. s.unpacker = newPacketUnpacker(cs, s.version)
  324. return s, nil
  325. }
  326. // declare this as a variable, such that we can it mock it in the tests
  327. var newTLSClientSession = func(
  328. conn connection,
  329. runner sessionRunner,
  330. token []byte,
  331. destConnID protocol.ConnectionID,
  332. srcConnID protocol.ConnectionID,
  333. conf *Config,
  334. mintConf *mint.Config,
  335. paramsChan <-chan handshake.TransportParameters,
  336. initialPacketNumber protocol.PacketNumber,
  337. logger utils.Logger,
  338. v protocol.VersionNumber,
  339. ) (quicSession, error) {
  340. handshakeEvent := make(chan struct{}, 1)
  341. s := &session{
  342. conn: conn,
  343. sessionRunner: runner,
  344. config: conf,
  345. srcConnID: srcConnID,
  346. destConnID: destConnID,
  347. perspective: protocol.PerspectiveClient,
  348. version: v,
  349. handshakeEvent: handshakeEvent,
  350. paramsChan: paramsChan,
  351. logger: logger,
  352. }
  353. s.preSetup()
  354. cs, err := handshake.NewCryptoSetupTLSClient(
  355. s.cryptoStream,
  356. s.destConnID,
  357. mintConf,
  358. handshakeEvent,
  359. v,
  360. )
  361. if err != nil {
  362. return nil, err
  363. }
  364. s.cryptoStreamHandler = cs
  365. s.unpacker = newPacketUnpacker(cs, s.version)
  366. s.streamsMap = newStreamsMap(s, s.newFlowController, s.config.MaxIncomingStreams, s.config.MaxIncomingUniStreams, s.perspective, s.version)
  367. s.streamFramer = newStreamFramer(s.cryptoStream, s.streamsMap, s.version)
  368. s.packer = newPacketPacker(
  369. s.destConnID,
  370. s.srcConnID,
  371. initialPacketNumber,
  372. s.sentPacketHandler.GetPacketNumberLen,
  373. s.RemoteAddr(),
  374. token,
  375. nil, // no diversification nonce
  376. cs,
  377. s.streamFramer,
  378. s.perspective,
  379. s.version,
  380. )
  381. return s, s.postSetup()
  382. }
  383. func (s *session) preSetup() {
  384. s.rttStats = &congestion.RTTStats{}
  385. s.sentPacketHandler = ackhandler.NewSentPacketHandler(s.rttStats, s.logger, s.version)
  386. s.connFlowController = flowcontrol.NewConnectionFlowController(
  387. protocol.ReceiveConnectionFlowControlWindow,
  388. protocol.ByteCount(s.config.MaxReceiveConnectionFlowControlWindow),
  389. s.onHasConnectionWindowUpdate,
  390. s.rttStats,
  391. s.logger,
  392. )
  393. s.cryptoStream = s.newCryptoStream()
  394. }
  395. func (s *session) postSetup() error {
  396. s.receivedPackets = make(chan *receivedPacket, protocol.MaxSessionUnprocessedPackets)
  397. s.closeChan = make(chan closeError, 1)
  398. s.sendingScheduled = make(chan struct{}, 1)
  399. s.undecryptablePackets = make([]*receivedPacket, 0, protocol.MaxUndecryptablePackets)
  400. s.ctx, s.ctxCancel = context.WithCancel(context.Background())
  401. s.timer = utils.NewTimer()
  402. now := time.Now()
  403. s.lastNetworkActivityTime = now
  404. s.sessionCreationTime = now
  405. s.receivedPacketHandler = ackhandler.NewReceivedPacketHandler(s.rttStats, s.logger, s.version)
  406. s.windowUpdateQueue = newWindowUpdateQueue(s.streamsMap, s.cryptoStream, s.connFlowController, s.packer.QueueControlFrame)
  407. return nil
  408. }
// run is the main loop of the session.
//
// It starts the crypto stream handler in its own goroutine and then loops
// until a closeError arrives on closeChan. Each iteration re-arms the timer,
// waits for one event (close request, timer, scheduled sending, received
// packet, transport parameters or handshake event) and afterwards checks the
// loss alarm, sends packets and checks the timeouts.
//
// When the loop ends, the close error is handled, the source connection ID is
// removed from the session runner, and the error carried by the closeError is
// returned. The session's context is cancelled when run returns.
func (s *session) run() error {
	defer s.ctxCancel()
	// The crypto stream is handled concurrently with the run loop; a failure
	// there closes the session.
	go func() {
		if err := s.cryptoStreamHandler.HandleCryptoStream(); err != nil {
			s.closeLocal(err)
		}
	}()
	var closeErr closeError
runLoop:
	for {
		// Close immediately if requested
		// (non-blocking: close requests and handshake events are picked up
		// before anything else)
		select {
		case closeErr = <-s.closeChan:
			break runLoop
		case _, ok := <-s.handshakeEvent:
			// when the handshake is completed, the channel will be closed
			s.handleHandshakeEvent(!ok)
		default:
		}
		s.maybeResetTimer()
		// Block until there is something to do.
		select {
		case closeErr = <-s.closeChan:
			break runLoop
		case <-s.timer.Chan():
			s.timer.SetRead()
			// We do all the interesting stuff after the select statement, so
			// nothing to see here.
		case <-s.sendingScheduled:
			// We do all the interesting stuff after the select statement, so
			// nothing to see here.
		case p := <-s.receivedPackets:
			err := s.handlePacketImpl(p)
			if err != nil {
				// A packet that can't be decrypted is handed to
				// tryQueueingUndecryptablePacket; any other error closes the
				// session. Either way the rest of this iteration is skipped
				// and the packet buffer is not returned.
				if qErr, ok := err.(*qerr.QuicError); ok && qErr.ErrorCode == qerr.DecryptionFailure {
					s.tryQueueingUndecryptablePacket(p)
					continue
				}
				s.closeLocal(err)
				continue
			}
			// This is a bit unclean, but works properly, since the packet always
			// begins with the public header and we never copy it.
			putPacketBuffer(&p.header.Raw)
		case p := <-s.paramsChan:
			s.processTransportParameters(&p)
		case _, ok := <-s.handshakeEvent:
			// when the handshake is completed, the channel will be closed
			s.handleHandshakeEvent(!ok)
		}
		now := time.Now()
		if timeout := s.sentPacketHandler.GetAlarmTimeout(); !timeout.IsZero() && timeout.Before(now) {
			// This could cause packets to be retransmitted.
			// Check it before trying to send packets.
			if err := s.sentPacketHandler.OnAlarm(); err != nil {
				s.closeLocal(err)
			}
		}
		// Only ask the sent packet handler for a pacing deadline if none is
		// currently stored on the session.
		var pacingDeadline time.Time
		if s.pacingDeadline.IsZero() { // the timer didn't have a pacing deadline set
			pacingDeadline = s.sentPacketHandler.TimeUntilSend()
		}
		if s.config.KeepAlive && !s.keepAlivePingSent && s.handshakeComplete && time.Since(s.lastNetworkActivityTime) >= s.peerParams.IdleTimeout/2 {
			// send a PING frame since there is no activity in the session
			s.logger.Debugf("Sending a keep-alive ping to keep the connection alive.")
			s.packer.QueueControlFrame(&wire.PingFrame{})
			s.keepAlivePingSent = true
		} else if !pacingDeadline.IsZero() && now.Before(pacingDeadline) {
			// If we get to this point before the pacing deadline, we should wait until that deadline.
			// This can happen when scheduleSending is called, or a packet is received.
			// Set the timer and restart the run loop.
			s.pacingDeadline = pacingDeadline
			continue
		}
		if err := s.sendPackets(); err != nil {
			s.closeLocal(err)
		}
		// The checks below only request the close; the loop exits when the
		// resulting closeError is received from closeChan.
		if !s.receivedTooManyUndecrytablePacketsTime.IsZero() && s.receivedTooManyUndecrytablePacketsTime.Add(protocol.PublicResetTimeout).Before(now) && len(s.undecryptablePackets) != 0 {
			s.closeLocal(qerr.Error(qerr.DecryptionFailure, "too many undecryptable packets received"))
		}
		if !s.handshakeComplete && now.Sub(s.sessionCreationTime) >= s.config.HandshakeTimeout {
			s.closeLocal(qerr.Error(qerr.HandshakeTimeout, "Crypto handshake did not complete in time."))
		}
		if s.handshakeComplete && now.Sub(s.lastNetworkActivityTime) >= s.config.IdleTimeout {
			s.closeLocal(qerr.Error(qerr.NetworkIdleTimeout, "No recent network activity."))
		}
	}
	// A failure while handling the close error is only logged.
	if err := s.handleCloseError(closeErr); err != nil {
		s.logger.Infof("Handling close error failed: %s", err)
	}
	s.logger.Infof("Connection %s closed.", s.srcConnID)
	s.sessionRunner.removeConnectionID(s.srcConnID)
	return closeErr.err
}
  503. func (s *session) Context() context.Context {
  504. return s.ctx
  505. }
  506. func (s *session) ConnectionState() ConnectionState {
  507. return s.cryptoStreamHandler.ConnectionState()
  508. }
  509. func (s *session) maybeResetTimer() {
  510. var deadline time.Time
  511. if s.config.KeepAlive && s.handshakeComplete && !s.keepAlivePingSent {
  512. deadline = s.lastNetworkActivityTime.Add(s.peerParams.IdleTimeout / 2)
  513. } else {
  514. deadline = s.lastNetworkActivityTime.Add(s.config.IdleTimeout)
  515. }
  516. if ackAlarm := s.receivedPacketHandler.GetAlarmTimeout(); !ackAlarm.IsZero() {
  517. deadline = utils.MinTime(deadline, ackAlarm)
  518. }
  519. if lossTime := s.sentPacketHandler.GetAlarmTimeout(); !lossTime.IsZero() {
  520. deadline = utils.MinTime(deadline, lossTime)
  521. }
  522. if !s.handshakeComplete {
  523. handshakeDeadline := s.sessionCreationTime.Add(s.config.HandshakeTimeout)
  524. deadline = utils.MinTime(deadline, handshakeDeadline)
  525. }
  526. if !s.receivedTooManyUndecrytablePacketsTime.IsZero() {
  527. deadline = utils.MinTime(deadline, s.receivedTooManyUndecrytablePacketsTime.Add(protocol.PublicResetTimeout))
  528. }
  529. if !s.pacingDeadline.IsZero() {
  530. deadline = utils.MinTime(deadline, s.pacingDeadline)
  531. }
  532. s.timer.Reset(deadline)
  533. }
  534. func (s *session) handleHandshakeEvent(completed bool) {
  535. if !completed {
  536. s.tryDecryptingQueuedPackets()
  537. return
  538. }
  539. s.handshakeComplete = true
  540. s.handshakeEvent = nil // prevent this case from ever being selected again
  541. s.sessionRunner.onHandshakeComplete(s)
  542. // In gQUIC, the server completes the handshake first (after sending the SHLO).
  543. // In TLS 1.3, the client completes the handshake first (after sending the CFIN).
  544. // We need to make sure they learn about the peer completing the handshake,
  545. // in order to stop retransmitting handshake packets.
  546. // They will stop retransmitting handshake packets when receiving the first forward-secure packet.
  547. // We need to make sure that a retransmittable forward-secure packet is sent,
  548. // independent from the application protocol.
  549. if (!s.version.UsesTLS() && s.perspective == protocol.PerspectiveClient) ||
  550. (s.version.UsesTLS() && s.perspective == protocol.PerspectiveServer) {
  551. s.queueControlFrame(&wire.PingFrame{})
  552. s.sentPacketHandler.SetHandshakeComplete()
  553. }
  554. }
  555. func (s *session) handlePacketImpl(p *receivedPacket) error {
  556. hdr := p.header
  557. // The server can change the source connection ID with the first Handshake packet.
  558. // After this, all packets with a different source connection have to be ignored.
  559. if s.receivedFirstPacket && hdr.IsLongHeader && !hdr.SrcConnectionID.Equal(s.destConnID) {
  560. s.logger.Debugf("Dropping packet with unexpected source connection ID: %s (expected %s)", p.header.SrcConnectionID, s.destConnID)
  561. return nil
  562. }
  563. if s.perspective == protocol.PerspectiveClient {
  564. if divNonce := p.header.DiversificationNonce; len(divNonce) > 0 {
  565. if err := s.cryptoStreamHandler.(divNonceSetter).SetDiversificationNonce(divNonce); err != nil {
  566. return err
  567. }
  568. }
  569. }
  570. if p.rcvTime.IsZero() {
  571. // To simplify testing
  572. p.rcvTime = time.Now()
  573. }
  574. // Calculate packet number
  575. hdr.PacketNumber = protocol.InferPacketNumber(
  576. hdr.PacketNumberLen,
  577. s.largestRcvdPacketNumber,
  578. hdr.PacketNumber,
  579. s.version,
  580. )
  581. packet, err := s.unpacker.Unpack(hdr.Raw, hdr, p.data)
  582. if s.logger.Debug() {
  583. if err != nil {
  584. s.logger.Debugf("<- Reading packet 0x%x (%d bytes) for connection %s", hdr.PacketNumber, len(p.data)+len(hdr.Raw), hdr.DestConnectionID)
  585. } else {
  586. s.logger.Debugf("<- Reading packet 0x%x (%d bytes) for connection %s, %s", hdr.PacketNumber, len(p.data)+len(hdr.Raw), hdr.DestConnectionID, packet.encryptionLevel)
  587. }
  588. hdr.Log(s.logger)
  589. }
  590. // if the decryption failed, this might be a packet sent by an attacker
  591. if err != nil {
  592. return err
  593. }
  594. // The server can change the source connection ID with the first Handshake packet.
  595. if s.perspective == protocol.PerspectiveClient && !s.receivedFirstPacket && hdr.IsLongHeader && !hdr.SrcConnectionID.Equal(s.destConnID) {
  596. s.logger.Debugf("Received first packet. Switching destination connection ID to: %s", hdr.SrcConnectionID)
  597. s.destConnID = hdr.SrcConnectionID
  598. s.packer.ChangeDestConnectionID(s.destConnID)
  599. }
  600. s.receivedFirstPacket = true
  601. s.lastNetworkActivityTime = p.rcvTime
  602. s.keepAlivePingSent = false
  603. // In gQUIC, the server completes the handshake first (after sending the SHLO).
  604. // In TLS 1.3, the client completes the handshake first (after sending the CFIN).
  605. // We know that the peer completed the handshake as soon as we receive a forward-secure packet.
  606. if (!s.version.UsesTLS() && s.perspective == protocol.PerspectiveServer) ||
  607. (s.version.UsesTLS() && s.perspective == protocol.PerspectiveClient) {
  608. if !s.receivedFirstForwardSecurePacket && packet.encryptionLevel == protocol.EncryptionForwardSecure {
  609. s.receivedFirstForwardSecurePacket = true
  610. s.sentPacketHandler.SetHandshakeComplete()
  611. }
  612. }
  613. s.lastRcvdPacketNumber = hdr.PacketNumber
  614. // Only do this after decrypting, so we are sure the packet is not attacker-controlled
  615. s.largestRcvdPacketNumber = utils.MaxPacketNumber(s.largestRcvdPacketNumber, hdr.PacketNumber)
  616. // If this is a Retry packet, there's no need to send an ACK.
  617. // The session will be closed and recreated as soon as the crypto setup processed the HRR.
  618. if hdr.Type != protocol.PacketTypeRetry {
  619. isRetransmittable := ackhandler.HasRetransmittableFrames(packet.frames)
  620. if err := s.receivedPacketHandler.ReceivedPacket(hdr.PacketNumber, p.rcvTime, isRetransmittable); err != nil {
  621. return err
  622. }
  623. }
  624. return s.handleFrames(packet.frames, packet.encryptionLevel)
  625. }
  626. func (s *session) handleFrames(fs []wire.Frame, encLevel protocol.EncryptionLevel) error {
  627. for _, ff := range fs {
  628. var err error
  629. wire.LogFrame(s.logger, ff, false)
  630. switch frame := ff.(type) {
  631. case *wire.StreamFrame:
  632. err = s.handleStreamFrame(frame, encLevel)
  633. case *wire.AckFrame:
  634. err = s.handleAckFrame(frame, encLevel)
  635. case *wire.ConnectionCloseFrame:
  636. s.closeRemote(qerr.Error(frame.ErrorCode, frame.ReasonPhrase))
  637. case *wire.GoawayFrame:
  638. err = errors.New("unimplemented: handling GOAWAY frames")
  639. case *wire.StopWaitingFrame: // ignore STOP_WAITINGs
  640. case *wire.RstStreamFrame:
  641. err = s.handleRstStreamFrame(frame)
  642. case *wire.MaxDataFrame:
  643. s.handleMaxDataFrame(frame)
  644. case *wire.MaxStreamDataFrame:
  645. err = s.handleMaxStreamDataFrame(frame)
  646. case *wire.MaxStreamIDFrame:
  647. err = s.handleMaxStreamIDFrame(frame)
  648. case *wire.BlockedFrame:
  649. case *wire.StreamBlockedFrame:
  650. case *wire.StreamIDBlockedFrame:
  651. case *wire.StopSendingFrame:
  652. err = s.handleStopSendingFrame(frame)
  653. case *wire.PingFrame:
  654. case *wire.PathChallengeFrame:
  655. s.handlePathChallengeFrame(frame)
  656. case *wire.PathResponseFrame:
  657. // since we don't send PATH_CHALLENGEs, we don't expect PATH_RESPONSEs
  658. err = errors.New("unexpected PATH_RESPONSE frame")
  659. default:
  660. return errors.New("Session BUG: unexpected frame type")
  661. }
  662. if err != nil {
  663. return err
  664. }
  665. }
  666. return nil
  667. }
  668. // handlePacket is called by the server with a new packet
  669. func (s *session) handlePacket(p *receivedPacket) {
  670. // Discard packets once the amount of queued packets is larger than
  671. // the channel size, protocol.MaxSessionUnprocessedPackets
  672. select {
  673. case s.receivedPackets <- p:
  674. default:
  675. }
  676. }
  677. func (s *session) handleStreamFrame(frame *wire.StreamFrame, encLevel protocol.EncryptionLevel) error {
  678. if frame.StreamID == s.version.CryptoStreamID() {
  679. if frame.FinBit {
  680. return errors.New("Received STREAM frame with FIN bit for the crypto stream")
  681. }
  682. return s.cryptoStream.handleStreamFrame(frame)
  683. } else if encLevel <= protocol.EncryptionUnencrypted {
  684. return qerr.Error(qerr.UnencryptedStreamData, fmt.Sprintf("received unencrypted stream data on stream %d", frame.StreamID))
  685. }
  686. str, err := s.streamsMap.GetOrOpenReceiveStream(frame.StreamID)
  687. if err != nil {
  688. return err
  689. }
  690. if str == nil {
  691. // Stream is closed and already garbage collected
  692. // ignore this StreamFrame
  693. return nil
  694. }
  695. return str.handleStreamFrame(frame)
  696. }
  697. func (s *session) handleMaxDataFrame(frame *wire.MaxDataFrame) {
  698. s.connFlowController.UpdateSendWindow(frame.ByteOffset)
  699. }
  700. func (s *session) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) error {
  701. if frame.StreamID == s.version.CryptoStreamID() {
  702. s.cryptoStream.handleMaxStreamDataFrame(frame)
  703. return nil
  704. }
  705. str, err := s.streamsMap.GetOrOpenSendStream(frame.StreamID)
  706. if err != nil {
  707. return err
  708. }
  709. if str == nil {
  710. // stream is closed and already garbage collected
  711. return nil
  712. }
  713. str.handleMaxStreamDataFrame(frame)
  714. return nil
  715. }
  716. func (s *session) handleMaxStreamIDFrame(frame *wire.MaxStreamIDFrame) error {
  717. return s.streamsMap.HandleMaxStreamIDFrame(frame)
  718. }
  719. func (s *session) handleRstStreamFrame(frame *wire.RstStreamFrame) error {
  720. if frame.StreamID == s.version.CryptoStreamID() {
  721. return errors.New("Received RST_STREAM frame for the crypto stream")
  722. }
  723. str, err := s.streamsMap.GetOrOpenReceiveStream(frame.StreamID)
  724. if err != nil {
  725. return err
  726. }
  727. if str == nil {
  728. // stream is closed and already garbage collected
  729. return nil
  730. }
  731. return str.handleRstStreamFrame(frame)
  732. }
  733. func (s *session) handleStopSendingFrame(frame *wire.StopSendingFrame) error {
  734. if frame.StreamID == s.version.CryptoStreamID() {
  735. return errors.New("Received a STOP_SENDING frame for the crypto stream")
  736. }
  737. str, err := s.streamsMap.GetOrOpenSendStream(frame.StreamID)
  738. if err != nil {
  739. return err
  740. }
  741. if str == nil {
  742. // stream is closed and already garbage collected
  743. return nil
  744. }
  745. str.handleStopSendingFrame(frame)
  746. return nil
  747. }
  748. func (s *session) handlePathChallengeFrame(frame *wire.PathChallengeFrame) {
  749. s.queueControlFrame(&wire.PathResponseFrame{Data: frame.Data})
  750. }
  751. func (s *session) handleAckFrame(frame *wire.AckFrame, encLevel protocol.EncryptionLevel) error {
  752. if err := s.sentPacketHandler.ReceivedAck(frame, s.lastRcvdPacketNumber, encLevel, s.lastNetworkActivityTime); err != nil {
  753. return err
  754. }
  755. s.receivedPacketHandler.IgnoreBelow(s.sentPacketHandler.GetLowestPacketNotConfirmedAcked())
  756. return nil
  757. }
  758. // closeLocal closes the session and send a CONNECTION_CLOSE containing the error
  759. func (s *session) closeLocal(e error) {
  760. s.closeOnce.Do(func() {
  761. s.closeChan <- closeError{err: e, sendClose: true, remote: false}
  762. })
  763. }
  764. // destroy closes the session without sending the error on the wire
  765. func (s *session) destroy(e error) {
  766. s.closeOnce.Do(func() {
  767. s.closeChan <- closeError{err: e, sendClose: false, remote: false}
  768. })
  769. }
  770. func (s *session) closeRemote(e error) {
  771. s.closeOnce.Do(func() {
  772. s.closeChan <- closeError{err: e, remote: true}
  773. })
  774. }
  775. // Close the connection. It sends a qerr.PeerGoingAway.
  776. // It waits until the run loop has stopped before returning
  777. func (s *session) Close() error {
  778. s.closeLocal(nil)
  779. <-s.ctx.Done()
  780. return nil
  781. }
  782. func (s *session) CloseWithError(code protocol.ApplicationErrorCode, e error) error {
  783. s.closeLocal(qerr.Error(qerr.ErrorCode(code), e.Error()))
  784. <-s.ctx.Done()
  785. return nil
  786. }
  787. func (s *session) handleCloseError(closeErr closeError) error {
  788. if closeErr.err == nil {
  789. closeErr.err = qerr.PeerGoingAway
  790. }
  791. var quicErr *qerr.QuicError
  792. var ok bool
  793. if quicErr, ok = closeErr.err.(*qerr.QuicError); !ok {
  794. quicErr = qerr.ToQuicError(closeErr.err)
  795. }
  796. // Don't log 'normal' reasons
  797. if quicErr.ErrorCode == qerr.PeerGoingAway || quicErr.ErrorCode == qerr.NetworkIdleTimeout {
  798. s.logger.Infof("Closing connection %s.", s.srcConnID)
  799. } else {
  800. s.logger.Errorf("Closing session with error: %s", closeErr.err.Error())
  801. }
  802. s.cryptoStream.closeForShutdown(quicErr)
  803. s.streamsMap.CloseWithError(quicErr)
  804. if !closeErr.sendClose {
  805. return nil
  806. }
  807. // If this is a remote close we're done here
  808. if closeErr.remote {
  809. return nil
  810. }
  811. if quicErr.ErrorCode == qerr.DecryptionFailure ||
  812. quicErr == handshake.ErrNSTPExperiment {
  813. return s.sendPublicReset(s.lastRcvdPacketNumber)
  814. }
  815. return s.sendConnectionClose(quicErr)
  816. }
  817. func (s *session) processTransportParameters(params *handshake.TransportParameters) {
  818. s.peerParams = params
  819. s.streamsMap.UpdateLimits(params)
  820. if params.OmitConnectionID {
  821. s.packer.SetOmitConnectionID()
  822. }
  823. if params.MaxPacketSize != 0 {
  824. s.packer.SetMaxPacketSize(params.MaxPacketSize)
  825. }
  826. s.connFlowController.UpdateSendWindow(params.ConnectionFlowControlWindow)
  827. // the crypto stream is the only open stream at this moment
  828. // so we don't need to update stream flow control windows
  829. }
  830. func (s *session) sendPackets() error {
  831. s.pacingDeadline = time.Time{}
  832. sendMode := s.sentPacketHandler.SendMode()
  833. if sendMode == ackhandler.SendNone { // shortcut: return immediately if there's nothing to send
  834. return nil
  835. }
  836. numPackets := s.sentPacketHandler.ShouldSendNumPackets()
  837. var numPacketsSent int
  838. sendLoop:
  839. for {
  840. switch sendMode {
  841. case ackhandler.SendNone:
  842. break sendLoop
  843. case ackhandler.SendAck:
  844. // We can at most send a single ACK only packet.
  845. // There will only be a new ACK after receiving new packets.
  846. // SendAck is only returned when we're congestion limited, so we don't need to set the pacingt timer.
  847. return s.maybeSendAckOnlyPacket()
  848. case ackhandler.SendTLP, ackhandler.SendRTO:
  849. if err := s.sendProbePacket(); err != nil {
  850. return err
  851. }
  852. numPacketsSent++
  853. case ackhandler.SendRetransmission:
  854. sentPacket, err := s.maybeSendRetransmission()
  855. if err != nil {
  856. return err
  857. }
  858. if sentPacket {
  859. numPacketsSent++
  860. // This can happen if a retransmission queued, but it wasn't necessary to send it.
  861. // e.g. when an Initial is queued, but we already received a packet from the server.
  862. }
  863. case ackhandler.SendAny:
  864. sentPacket, err := s.sendPacket()
  865. if err != nil {
  866. return err
  867. }
  868. if !sentPacket {
  869. break sendLoop
  870. }
  871. numPacketsSent++
  872. default:
  873. return fmt.Errorf("BUG: invalid send mode %d", sendMode)
  874. }
  875. if numPacketsSent >= numPackets {
  876. break
  877. }
  878. sendMode = s.sentPacketHandler.SendMode()
  879. }
  880. // Only start the pacing timer if we sent as many packets as we were allowed.
  881. // There will probably be more to send when calling sendPacket again.
  882. if numPacketsSent == numPackets {
  883. s.pacingDeadline = s.sentPacketHandler.TimeUntilSend()
  884. }
  885. return nil
  886. }
  887. func (s *session) maybeSendAckOnlyPacket() error {
  888. ack := s.receivedPacketHandler.GetAckFrame()
  889. if ack == nil {
  890. return nil
  891. }
  892. s.packer.QueueControlFrame(ack)
  893. if s.version.UsesStopWaitingFrames() { // for gQUIC, maybe add a STOP_WAITING
  894. if swf := s.sentPacketHandler.GetStopWaitingFrame(false); swf != nil {
  895. s.packer.QueueControlFrame(swf)
  896. }
  897. }
  898. packet, err := s.packer.PackAckPacket()
  899. if err != nil {
  900. return err
  901. }
  902. s.sentPacketHandler.SentPacket(packet.ToAckHandlerPacket())
  903. return s.sendPackedPacket(packet)
  904. }
  905. // maybeSendRetransmission sends retransmissions for at most one packet.
  906. // It takes care that Initials aren't retransmitted, if a packet from the server was already received.
  907. func (s *session) maybeSendRetransmission() (bool, error) {
  908. var retransmitPacket *ackhandler.Packet
  909. for {
  910. retransmitPacket = s.sentPacketHandler.DequeuePacketForRetransmission()
  911. if retransmitPacket == nil {
  912. return false, nil
  913. }
  914. // Don't retransmit Initial packets if we already received a response.
  915. // An Initial might have been retransmitted multiple times before we receive a response.
  916. // As soon as we receive one response, we don't need to send any more Initials.
  917. if s.receivedFirstPacket && retransmitPacket.PacketType == protocol.PacketTypeInitial {
  918. s.logger.Debugf("Skipping retransmission of packet %d. Already received a response to an Initial.", retransmitPacket.PacketNumber)
  919. continue
  920. }
  921. break
  922. }
  923. if retransmitPacket.EncryptionLevel != protocol.EncryptionForwardSecure {
  924. s.logger.Debugf("Dequeueing handshake retransmission for packet 0x%x", retransmitPacket.PacketNumber)
  925. } else {
  926. s.logger.Debugf("Dequeueing retransmission for packet 0x%x", retransmitPacket.PacketNumber)
  927. }
  928. if s.version.UsesStopWaitingFrames() {
  929. s.packer.QueueControlFrame(s.sentPacketHandler.GetStopWaitingFrame(true))
  930. }
  931. packets, err := s.packer.PackRetransmission(retransmitPacket)
  932. if err != nil {
  933. return false, err
  934. }
  935. ackhandlerPackets := make([]*ackhandler.Packet, len(packets))
  936. for i, packet := range packets {
  937. ackhandlerPackets[i] = packet.ToAckHandlerPacket()
  938. }
  939. s.sentPacketHandler.SentPacketsAsRetransmission(ackhandlerPackets, retransmitPacket.PacketNumber)
  940. for _, packet := range packets {
  941. if err := s.sendPackedPacket(packet); err != nil {
  942. return false, err
  943. }
  944. }
  945. return true, nil
  946. }
  947. func (s *session) sendProbePacket() error {
  948. p, err := s.sentPacketHandler.DequeueProbePacket()
  949. if err != nil {
  950. return err
  951. }
  952. s.logger.Debugf("Sending a retransmission for %#x as a probe packet.", p.PacketNumber)
  953. if s.version.UsesStopWaitingFrames() {
  954. s.packer.QueueControlFrame(s.sentPacketHandler.GetStopWaitingFrame(true))
  955. }
  956. packets, err := s.packer.PackRetransmission(p)
  957. if err != nil {
  958. return err
  959. }
  960. ackhandlerPackets := make([]*ackhandler.Packet, len(packets))
  961. for i, packet := range packets {
  962. ackhandlerPackets[i] = packet.ToAckHandlerPacket()
  963. }
  964. s.sentPacketHandler.SentPacketsAsRetransmission(ackhandlerPackets, p.PacketNumber)
  965. for _, packet := range packets {
  966. if err := s.sendPackedPacket(packet); err != nil {
  967. return err
  968. }
  969. }
  970. return nil
  971. }
  972. func (s *session) sendPacket() (bool, error) {
  973. if isBlocked, offset := s.connFlowController.IsNewlyBlocked(); isBlocked {
  974. s.packer.QueueControlFrame(&wire.BlockedFrame{Offset: offset})
  975. }
  976. s.windowUpdateQueue.QueueAll()
  977. if ack := s.receivedPacketHandler.GetAckFrame(); ack != nil {
  978. s.packer.QueueControlFrame(ack)
  979. if s.version.UsesStopWaitingFrames() {
  980. if swf := s.sentPacketHandler.GetStopWaitingFrame(false); swf != nil {
  981. s.packer.QueueControlFrame(swf)
  982. }
  983. }
  984. }
  985. packet, err := s.packer.PackPacket()
  986. if err != nil || packet == nil {
  987. return false, err
  988. }
  989. s.sentPacketHandler.SentPacket(packet.ToAckHandlerPacket())
  990. if err := s.sendPackedPacket(packet); err != nil {
  991. return false, err
  992. }
  993. return true, nil
  994. }
  995. func (s *session) sendPackedPacket(packet *packedPacket) error {
  996. defer putPacketBuffer(&packet.raw)
  997. s.logPacket(packet)
  998. return s.conn.Write(packet.raw)
  999. }
  1000. func (s *session) sendConnectionClose(quicErr *qerr.QuicError) error {
  1001. packet, err := s.packer.PackConnectionClose(&wire.ConnectionCloseFrame{
  1002. ErrorCode: quicErr.ErrorCode,
  1003. ReasonPhrase: quicErr.ErrorMessage,
  1004. })
  1005. if err != nil {
  1006. return err
  1007. }
  1008. s.logPacket(packet)
  1009. return s.conn.Write(packet.raw)
  1010. }
  1011. func (s *session) logPacket(packet *packedPacket) {
  1012. if !s.logger.Debug() {
  1013. // We don't need to allocate the slices for calling the format functions
  1014. return
  1015. }
  1016. s.logger.Debugf("-> Sending packet 0x%x (%d bytes) for connection %s, %s", packet.header.PacketNumber, len(packet.raw), s.srcConnID, packet.encryptionLevel)
  1017. packet.header.Log(s.logger)
  1018. for _, frame := range packet.frames {
  1019. wire.LogFrame(s.logger, frame, true)
  1020. }
  1021. }
  1022. // GetOrOpenStream either returns an existing stream, a newly opened stream, or nil if a stream with the provided ID is already closed.
  1023. // It is *only* needed for gQUIC's H2.
  1024. // It will be removed as soon as gQUIC moves towards the IETF H2/QUIC stream mapping.
  1025. func (s *session) GetOrOpenStream(id protocol.StreamID) (Stream, error) {
  1026. str, err := s.streamsMap.GetOrOpenSendStream(id)
  1027. if str != nil {
  1028. if bstr, ok := str.(Stream); ok {
  1029. return bstr, err
  1030. }
  1031. return nil, fmt.Errorf("Stream %d is not a bidirectional stream", id)
  1032. }
  1033. // make sure to return an actual nil value here, not an Stream with value nil
  1034. return nil, err
  1035. }
  1036. // AcceptStream returns the next stream openend by the peer
  1037. func (s *session) AcceptStream() (Stream, error) {
  1038. return s.streamsMap.AcceptStream()
  1039. }
  1040. func (s *session) AcceptUniStream() (ReceiveStream, error) {
  1041. return s.streamsMap.AcceptUniStream()
  1042. }
  1043. // OpenStream opens a stream
  1044. func (s *session) OpenStream() (Stream, error) {
  1045. return s.streamsMap.OpenStream()
  1046. }
  1047. func (s *session) OpenStreamSync() (Stream, error) {
  1048. return s.streamsMap.OpenStreamSync()
  1049. }
  1050. func (s *session) OpenUniStream() (SendStream, error) {
  1051. return s.streamsMap.OpenUniStream()
  1052. }
  1053. func (s *session) OpenUniStreamSync() (SendStream, error) {
  1054. return s.streamsMap.OpenUniStreamSync()
  1055. }
  1056. func (s *session) newStream(id protocol.StreamID) streamI {
  1057. flowController := s.newFlowController(id)
  1058. return newStream(id, s, flowController, s.version)
  1059. }
  1060. func (s *session) newFlowController(id protocol.StreamID) flowcontrol.StreamFlowController {
  1061. var initialSendWindow protocol.ByteCount
  1062. if s.peerParams != nil {
  1063. initialSendWindow = s.peerParams.StreamFlowControlWindow
  1064. }
  1065. return flowcontrol.NewStreamFlowController(
  1066. id,
  1067. s.version.StreamContributesToConnectionFlowControl(id),
  1068. s.connFlowController,
  1069. protocol.ReceiveStreamFlowControlWindow,
  1070. protocol.ByteCount(s.config.MaxReceiveStreamFlowControlWindow),
  1071. initialSendWindow,
  1072. s.onHasStreamWindowUpdate,
  1073. s.rttStats,
  1074. s.logger,
  1075. )
  1076. }
  1077. func (s *session) newCryptoStream() cryptoStream {
  1078. id := s.version.CryptoStreamID()
  1079. flowController := flowcontrol.NewStreamFlowController(
  1080. id,
  1081. s.version.StreamContributesToConnectionFlowControl(id),
  1082. s.connFlowController,
  1083. protocol.ReceiveStreamFlowControlWindow,
  1084. protocol.ByteCount(s.config.MaxReceiveStreamFlowControlWindow),
  1085. 0,
  1086. s.onHasStreamWindowUpdate,
  1087. s.rttStats,
  1088. s.logger,
  1089. )
  1090. return newCryptoStream(s, flowController, s.version)
  1091. }
  1092. func (s *session) sendPublicReset(rejectedPacketNumber protocol.PacketNumber) error {
  1093. s.logger.Infof("Sending PUBLIC_RESET for connection %s, packet number %d", s.destConnID, rejectedPacketNumber)
  1094. return s.conn.Write(wire.WritePublicReset(s.destConnID, rejectedPacketNumber, 0))
  1095. }
  1096. // scheduleSending signals that we have data for sending
  1097. func (s *session) scheduleSending() {
  1098. select {
  1099. case s.sendingScheduled <- struct{}{}:
  1100. default:
  1101. }
  1102. }
  1103. func (s *session) tryQueueingUndecryptablePacket(p *receivedPacket) {
  1104. if s.handshakeComplete {
  1105. s.logger.Debugf("Received undecryptable packet from %s after the handshake: %#v, %d bytes data", p.remoteAddr.String(), p.header, len(p.data))
  1106. return
  1107. }
  1108. if len(s.undecryptablePackets)+1 > protocol.MaxUndecryptablePackets {
  1109. // if this is the first time the undecryptablePackets runs full, start the timer to send a Public Reset
  1110. if s.receivedTooManyUndecrytablePacketsTime.IsZero() {
  1111. s.receivedTooManyUndecrytablePacketsTime = time.Now()
  1112. s.maybeResetTimer()
  1113. }
  1114. s.logger.Infof("Dropping undecrytable packet 0x%x (undecryptable packet queue full)", p.header.PacketNumber)
  1115. return
  1116. }
  1117. s.logger.Infof("Queueing packet 0x%x for later decryption", p.header.PacketNumber)
  1118. s.undecryptablePackets = append(s.undecryptablePackets, p)
  1119. }
  1120. func (s *session) tryDecryptingQueuedPackets() {
  1121. for _, p := range s.undecryptablePackets {
  1122. s.handlePacket(p)
  1123. }
  1124. s.undecryptablePackets = s.undecryptablePackets[:0]
  1125. }
  1126. func (s *session) queueControlFrame(f wire.Frame) {
  1127. s.packer.QueueControlFrame(f)
  1128. s.scheduleSending()
  1129. }
  1130. func (s *session) onHasStreamWindowUpdate(id protocol.StreamID) {
  1131. s.windowUpdateQueue.AddStream(id)
  1132. s.scheduleSending()
  1133. }
  1134. func (s *session) onHasConnectionWindowUpdate() {
  1135. s.windowUpdateQueue.AddConnection()
  1136. s.scheduleSending()
  1137. }
  1138. func (s *session) onHasStreamData(id protocol.StreamID) {
  1139. s.streamFramer.AddActiveStream(id)
  1140. s.scheduleSending()
  1141. }
  1142. func (s *session) onStreamCompleted(id protocol.StreamID) {
  1143. if err := s.streamsMap.DeleteStream(id); err != nil {
  1144. s.closeLocal(err)
  1145. }
  1146. }
  1147. func (s *session) LocalAddr() net.Addr {
  1148. return s.conn.LocalAddr()
  1149. }
  1150. func (s *session) RemoteAddr() net.Addr {
  1151. return s.conn.RemoteAddr()
  1152. }
  1153. func (s *session) GetVersion() protocol.VersionNumber {
  1154. return s.version
  1155. }