websocket.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package gold
  2. import (
  3. "io"
  4. "log"
  5. "net/http"
  6. "strings"
  7. "sync"
  8. "golang.org/x/net/websocket"
  9. )
  10. // type wsConn struct {
  11. // subbed bool
  12. // uuid string
  13. // }
  14. var (
  15. websocketSubs = map[string]map[*websocket.Conn]string{}
  16. websocketSubsL = new(sync.RWMutex)
  17. )
  18. func onDeleteURI(uri string) {
  19. websocketPublish(uri)
  20. }
  21. func onUpdateURI(uri string) {
  22. websocketPublish(uri)
  23. }
  24. // Handles each websocket connection
  25. func websocketHandler(ws *websocket.Conn) {
  26. // @@TODO switch to server logging
  27. // log.Println("opened via:", ws.RemoteAddr())
  28. uris := map[string]bool{}
  29. message := ""
  30. for {
  31. err := websocket.Message.Receive(ws, &message)
  32. if err == io.EOF {
  33. break
  34. }
  35. if err != nil {
  36. log.Println(err)
  37. break
  38. }
  39. argv := strings.Split(message, " ")
  40. if len(argv) < 2 {
  41. argv = append(argv, "")
  42. }
  43. cmd, uri := argv[0], argv[1]
  44. switch cmd {
  45. case "ping":
  46. websocket.Message.Send(ws, "pong")
  47. case "sub":
  48. uris[uri] = true
  49. websocketSubsL.Lock()
  50. if _, ex := websocketSubs[uri]; !ex {
  51. websocketSubs[uri] = map[*websocket.Conn]string{}
  52. }
  53. websocketSubs[uri][ws] = NewUUID()
  54. websocketSubsL.Unlock()
  55. websocket.Message.Send(ws, "ack "+uri)
  56. case "unsub":
  57. websocketSubsL.Lock()
  58. uris[uri] = false
  59. if len(websocketSubs[uri][ws]) > 0 {
  60. delete(websocketSubs[uri], ws)
  61. }
  62. websocketSubsL.Unlock()
  63. websocket.Message.Send(ws, "removed "+uri)
  64. default:
  65. log.Println("invalid message:", message)
  66. }
  67. }
  68. websocketSubsL.Lock()
  69. for k := range uris {
  70. delete(websocketSubs[k], ws)
  71. }
  72. websocketSubsL.Unlock()
  73. // @@TODO switch to server logging
  74. // log.Println("closed via:", ws.RemoteAddr())
  75. }
  76. func websocketPublish(uri string, uuid ...string) {
  77. websocketSubsL.RLock()
  78. subs := websocketSubs[uri]
  79. websocketSubsL.RUnlock()
  80. for k := range subs {
  81. uuidMatch := true
  82. // log.Println(uuid)
  83. if len(uuid) > 0 && subs[k] != uuid[0] {
  84. uuidMatch = false
  85. }
  86. if uuidMatch {
  87. err := websocket.Message.Send(k, "pub "+uri)
  88. if err != nil {
  89. log.Println(err)
  90. }
  91. }
  92. }
  93. }
  94. // Converts an HTTP request to a websocket server
  95. func websocketServe(w http.ResponseWriter, req *http.Request) {
  96. websocket.Handler(websocketHandler).ServeHTTP(w, req)
  97. }
  98. // Checks whether an HTTP request looks like websocket
  99. func websocketUpgrade(r *http.Request) bool {
  100. if r == nil {
  101. return false
  102. }
  103. if strings.ToLower(r.Header.Get("Connection")) != "upgrade" {
  104. return false
  105. }
  106. if strings.ToLower(r.Header.Get("Upgrade")) != "websocket" {
  107. return false
  108. }
  109. return true
  110. }