scorch.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573
  1. // Copyright (c) 2018 Couchbase, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package scorch
import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"sync"
	"sync/atomic"
	"time"

	"github.com/RoaringBitmap/roaring"
	"github.com/blevesearch/bleve/analysis"
	"github.com/blevesearch/bleve/document"
	"github.com/blevesearch/bleve/index"
	"github.com/blevesearch/bleve/index/scorch/segment"
	"github.com/blevesearch/bleve/index/scorch/segment/zap"
	"github.com/blevesearch/bleve/index/store"
	"github.com/blevesearch/bleve/registry"
	"github.com/boltdb/bolt"
)
  33. const Name = "scorch"
  34. const Version uint8 = 2
  35. var ErrClosed = fmt.Errorf("scorch closed")
// Scorch is an index.Index implementation built around an immutable,
// ref-counted root snapshot that is replaced wholesale as segments are
// introduced, persisted and merged by background goroutines.
type Scorch struct {
	readOnly      bool                   // opened read-only; persister/merger are not started
	version       uint8                  // format version (see Version)
	config        map[string]interface{} // raw configuration map supplied at construction
	analysisQueue *index.AnalysisQueue   // shared queue that analysis work is submitted to
	stats         Stats                  // externally visible counters (see Stats/StatsMap)
	nextSegmentID uint64                 // incremented atomically per introduced segment
	path          string                 // index directory; "" means memory-only
	unsafeBatch   bool                   // when true, Batch does not wait for persistence

	// rootLock guards root, rootPersisted, eligibleForRemoval and
	// ineligibleForRemoval below.
	rootLock             sync.RWMutex
	root                 *IndexSnapshot  // holds 1 ref-count on the root
	rootPersisted        []chan error    // closed when root is persisted
	nextSnapshotEpoch    uint64          // epoch to assign to the next snapshot
	eligibleForRemoval   []uint64        // Index snapshot epochs that are safe to GC.
	ineligibleForRemoval map[string]bool // Filenames that should not be GC'ed yet.

	numSnapshotsToKeep int           // retention count, configurable via "numSnapshotsToKeep"
	closeCh            chan struct{} // closed by Close() to stop the async goroutines

	// Channels connecting Batch/revert callers to the introducer,
	// persister and merger loops.
	introductions      chan *segmentIntroduction
	persists           chan *persistIntroduction
	merges             chan *segmentMerge
	introducerNotifier chan *epochWatcher
	revertToSnapshots  chan *snapshotReversion
	persisterNotifier  chan *epochWatcher

	rootBolt   *bolt.DB       // metadata store (root.bolt); nil for memory-only indexes
	asyncTasks sync.WaitGroup // tracks the goroutines started by Open()

	onEvent      func(event Event) // optional event callback (see fireEvent)
	onAsyncError func(err error)   // optional async-error callback (see fireAsyncError)

	iStats internalStats // internal byte/epoch accounting used by MemoryUsed
}
// internalStats holds byte and epoch accounting consumed by MemoryUsed.
// All fields are read and written with sync/atomic operations.
type internalStats struct {
	persistEpoch        uint64 // epoch the persister is working on (0 = none)
	persistSnapshotSize uint64 // size of the snapshot being persisted
	mergeEpoch          uint64 // epoch the merger is working on (0 = none)
	mergeSnapshotSize   uint64 // size of the snapshot being merged
	newSegBufBytesAdded uint64 // segment build buffer bytes allocated...
	newSegBufBytesRemoved uint64 // ...and released; difference = live buffer bytes
	analysisBytesAdded  uint64 // analysis result bytes received...
	analysisBytesRemoved uint64 // ...and released; difference = live analysis bytes
}
  75. func NewScorch(storeName string,
  76. config map[string]interface{},
  77. analysisQueue *index.AnalysisQueue) (index.Index, error) {
  78. rv := &Scorch{
  79. version: Version,
  80. config: config,
  81. analysisQueue: analysisQueue,
  82. nextSnapshotEpoch: 1,
  83. closeCh: make(chan struct{}),
  84. ineligibleForRemoval: map[string]bool{},
  85. }
  86. rv.root = &IndexSnapshot{parent: rv, refs: 1, creator: "NewScorch"}
  87. ro, ok := config["read_only"].(bool)
  88. if ok {
  89. rv.readOnly = ro
  90. }
  91. ub, ok := config["unsafe_batch"].(bool)
  92. if ok {
  93. rv.unsafeBatch = ub
  94. }
  95. ecbName, ok := config["eventCallbackName"].(string)
  96. if ok {
  97. rv.onEvent = RegistryEventCallbacks[ecbName]
  98. }
  99. aecbName, ok := config["asyncErrorCallbackName"].(string)
  100. if ok {
  101. rv.onAsyncError = RegistryAsyncErrorCallbacks[aecbName]
  102. }
  103. return rv, nil
  104. }
  105. func (s *Scorch) fireEvent(kind EventKind, dur time.Duration) {
  106. if s.onEvent != nil {
  107. s.onEvent(Event{Kind: kind, Scorch: s, Duration: dur})
  108. }
  109. }
  110. func (s *Scorch) fireAsyncError(err error) {
  111. if s.onAsyncError != nil {
  112. s.onAsyncError(err)
  113. }
  114. atomic.AddUint64(&s.stats.TotOnErrors, 1)
  115. }
  116. func (s *Scorch) Open() error {
  117. err := s.openBolt()
  118. if err != nil {
  119. return err
  120. }
  121. s.asyncTasks.Add(1)
  122. go s.mainLoop()
  123. if !s.readOnly && s.path != "" {
  124. s.asyncTasks.Add(1)
  125. go s.persisterLoop()
  126. s.asyncTasks.Add(1)
  127. go s.mergerLoop()
  128. }
  129. return nil
  130. }
  131. func (s *Scorch) openBolt() error {
  132. var ok bool
  133. s.path, ok = s.config["path"].(string)
  134. if !ok {
  135. return fmt.Errorf("must specify path")
  136. }
  137. if s.path == "" {
  138. s.unsafeBatch = true
  139. }
  140. var rootBoltOpt *bolt.Options
  141. if s.readOnly {
  142. rootBoltOpt = &bolt.Options{
  143. ReadOnly: true,
  144. }
  145. } else {
  146. if s.path != "" {
  147. err := os.MkdirAll(s.path, 0700)
  148. if err != nil {
  149. return err
  150. }
  151. }
  152. }
  153. rootBoltPath := s.path + string(os.PathSeparator) + "root.bolt"
  154. var err error
  155. if s.path != "" {
  156. s.rootBolt, err = bolt.Open(rootBoltPath, 0600, rootBoltOpt)
  157. if err != nil {
  158. return err
  159. }
  160. // now see if there is any existing state to load
  161. err = s.loadFromBolt()
  162. if err != nil {
  163. _ = s.Close()
  164. return err
  165. }
  166. }
  167. s.introductions = make(chan *segmentIntroduction)
  168. s.persists = make(chan *persistIntroduction)
  169. s.merges = make(chan *segmentMerge)
  170. s.introducerNotifier = make(chan *epochWatcher, 1)
  171. s.revertToSnapshots = make(chan *snapshotReversion)
  172. s.persisterNotifier = make(chan *epochWatcher, 1)
  173. if !s.readOnly && s.path != "" {
  174. err := s.removeOldZapFiles() // Before persister or merger create any new files.
  175. if err != nil {
  176. _ = s.Close()
  177. return err
  178. }
  179. }
  180. s.numSnapshotsToKeep = NumSnapshotsToKeep
  181. if v, ok := s.config["numSnapshotsToKeep"]; ok {
  182. var t int
  183. if t, err = parseToInteger(v); err != nil {
  184. return fmt.Errorf("numSnapshotsToKeep parse err: %v", err)
  185. }
  186. if t > 0 {
  187. s.numSnapshotsToKeep = t
  188. }
  189. }
  190. return nil
  191. }
  192. func (s *Scorch) Close() (err error) {
  193. startTime := time.Now()
  194. defer func() {
  195. s.fireEvent(EventKindClose, time.Since(startTime))
  196. }()
  197. s.fireEvent(EventKindCloseStart, 0)
  198. // signal to async tasks we want to close
  199. close(s.closeCh)
  200. // wait for them to close
  201. s.asyncTasks.Wait()
  202. // now close the root bolt
  203. if s.rootBolt != nil {
  204. err = s.rootBolt.Close()
  205. s.rootLock.Lock()
  206. if s.root != nil {
  207. _ = s.root.DecRef()
  208. }
  209. s.root = nil
  210. s.rootLock.Unlock()
  211. }
  212. return
  213. }
  214. func (s *Scorch) Update(doc *document.Document) error {
  215. b := index.NewBatch()
  216. b.Update(doc)
  217. return s.Batch(b)
  218. }
  219. func (s *Scorch) Delete(id string) error {
  220. b := index.NewBatch()
  221. b.Delete(id)
  222. return s.Batch(b)
  223. }
// Batch applies a batch of changes to the index atomically: documents
// are analyzed concurrently, folded into a single new segment, and the
// segment is handed to the introducer (blocking until applied, and —
// unless unsafeBatch is set — persisted).
func (s *Scorch) Batch(batch *index.Batch) (err error) {
	start := time.Now()

	defer func() {
		s.fireEvent(EventKindBatchIntroduction, time.Since(start))
	}()

	// Buffered to the batch size so analysis workers never block sending.
	resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps))

	var numUpdates uint64
	var numDeletes uint64
	var numPlainTextBytes uint64
	var ids []string
	// Partition the ops: non-nil docs are updates, nil docs are deletes.
	for docID, doc := range batch.IndexOps {
		if doc != nil {
			// insert _id field
			doc.AddField(document.NewTextFieldCustom("_id", nil, []byte(doc.ID), document.IndexField|document.StoreField, nil))
			numUpdates++
			numPlainTextBytes += doc.NumPlainTextBytes()
		} else {
			numDeletes++
		}
		ids = append(ids, docID)
	}

	// FIXME could sort ids list concurrent with analysis?

	// Queue analysis work asynchronously; results arrive on resultChan.
	go func() {
		for _, doc := range batch.IndexOps {
			if doc != nil {
				aw := index.NewAnalysisWork(s, doc, resultChan)
				// put the work on the queue
				s.analysisQueue.Queue(aw)
			}
		}
	}()

	// wait for analysis result — exactly one result per update op
	analysisResults := make([]*index.AnalysisResult, int(numUpdates))
	var itemsDeQueued uint64
	var totalAnalysisSize int
	for itemsDeQueued < numUpdates {
		result := <-resultChan
		resultSize := result.Size()
		// Track live analysis bytes for MemoryUsed; released via the
		// deferred Add to analysisBytesRemoved below.
		atomic.AddUint64(&s.iStats.analysisBytesAdded, uint64(resultSize))
		totalAnalysisSize += resultSize
		analysisResults[itemsDeQueued] = result
		itemsDeQueued++
	}
	close(resultChan)
	defer atomic.AddUint64(&s.iStats.analysisBytesRemoved, uint64(totalAnalysisSize))

	atomic.AddUint64(&s.stats.TotAnalysisTime, uint64(time.Since(start)))

	indexStart := time.Now()

	// notify handlers that we're about to introduce a segment
	s.fireEvent(EventKindBatchIntroductionStart, 0)

	var newSegment segment.Segment
	var bufBytes uint64
	if len(analysisResults) > 0 {
		// Build one in-memory zap segment from all analyzed docs.
		newSegment, bufBytes, err = zap.AnalysisResultsToSegmentBase(analysisResults, DefaultChunkFactor)
		if err != nil {
			return err
		}
		atomic.AddUint64(&s.iStats.newSegBufBytesAdded, bufBytes)
	} else {
		// Delete-only / internal-ops-only batch: no new segment needed.
		atomic.AddUint64(&s.stats.TotBatchesEmpty, 1)
	}

	err = s.prepareSegment(newSegment, ids, batch.InternalOps)
	if err != nil {
		if newSegment != nil {
			_ = newSegment.Close()
		}
		atomic.AddUint64(&s.stats.TotOnErrors, 1)
	} else {
		atomic.AddUint64(&s.stats.TotUpdates, numUpdates)
		atomic.AddUint64(&s.stats.TotDeletes, numDeletes)
		atomic.AddUint64(&s.stats.TotBatches, 1)
		atomic.AddUint64(&s.stats.TotIndexedPlainTextBytes, numPlainTextBytes)
	}

	atomic.AddUint64(&s.iStats.newSegBufBytesRemoved, bufBytes)
	atomic.AddUint64(&s.stats.TotIndexTime, uint64(time.Since(indexStart)))

	return err
}
  301. func (s *Scorch) prepareSegment(newSegment segment.Segment, ids []string,
  302. internalOps map[string][]byte) error {
  303. // new introduction
  304. introduction := &segmentIntroduction{
  305. id: atomic.AddUint64(&s.nextSegmentID, 1),
  306. data: newSegment,
  307. ids: ids,
  308. obsoletes: make(map[uint64]*roaring.Bitmap),
  309. internal: internalOps,
  310. applied: make(chan error),
  311. }
  312. if !s.unsafeBatch {
  313. introduction.persisted = make(chan error, 1)
  314. }
  315. // optimistically prepare obsoletes outside of rootLock
  316. s.rootLock.RLock()
  317. root := s.root
  318. root.AddRef()
  319. s.rootLock.RUnlock()
  320. for _, seg := range root.segment {
  321. delta, err := seg.segment.DocNumbers(ids)
  322. if err != nil {
  323. return err
  324. }
  325. introduction.obsoletes[seg.id] = delta
  326. }
  327. _ = root.DecRef()
  328. introStartTime := time.Now()
  329. s.introductions <- introduction
  330. // block until this segment is applied
  331. err := <-introduction.applied
  332. if err != nil {
  333. return err
  334. }
  335. if introduction.persisted != nil {
  336. err = <-introduction.persisted
  337. }
  338. introTime := uint64(time.Since(introStartTime))
  339. atomic.AddUint64(&s.stats.TotBatchIntroTime, introTime)
  340. if atomic.LoadUint64(&s.stats.MaxBatchIntroTime) < introTime {
  341. atomic.StoreUint64(&s.stats.MaxBatchIntroTime, introTime)
  342. }
  343. return err
  344. }
  345. func (s *Scorch) SetInternal(key, val []byte) error {
  346. b := index.NewBatch()
  347. b.SetInternal(key, val)
  348. return s.Batch(b)
  349. }
  350. func (s *Scorch) DeleteInternal(key []byte) error {
  351. b := index.NewBatch()
  352. b.DeleteInternal(key)
  353. return s.Batch(b)
  354. }
  355. // Reader returns a low-level accessor on the index data. Close it to
  356. // release associated resources.
  357. func (s *Scorch) Reader() (index.IndexReader, error) {
  358. return s.currentSnapshot(), nil
  359. }
  360. func (s *Scorch) currentSnapshot() *IndexSnapshot {
  361. s.rootLock.RLock()
  362. rv := s.root
  363. rv.AddRef()
  364. s.rootLock.RUnlock()
  365. return rv
  366. }
  367. func (s *Scorch) Stats() json.Marshaler {
  368. return &s.stats
  369. }
  370. func (s *Scorch) StatsMap() map[string]interface{} {
  371. m := s.stats.ToMap()
  372. if s.path != "" {
  373. finfos, err := ioutil.ReadDir(s.path)
  374. if err == nil {
  375. var numFilesOnDisk, numBytesUsedDisk uint64
  376. for _, finfo := range finfos {
  377. if !finfo.IsDir() {
  378. numBytesUsedDisk += uint64(finfo.Size())
  379. numFilesOnDisk++
  380. }
  381. }
  382. m["CurOnDiskBytes"] = numBytesUsedDisk
  383. m["CurOnDiskFiles"] = numFilesOnDisk
  384. }
  385. }
  386. // TODO: consider one day removing these backwards compatible
  387. // names for apps using the old names
  388. m["updates"] = m["TotUpdates"]
  389. m["deletes"] = m["TotDeletes"]
  390. m["batches"] = m["TotBatches"]
  391. m["errors"] = m["TotOnErrors"]
  392. m["analysis_time"] = m["TotAnalysisTime"]
  393. m["index_time"] = m["TotIndexTime"]
  394. m["term_searchers_started"] = m["TotTermSearchersStarted"]
  395. m["term_searchers_finished"] = m["TotTermSearchersFinished"]
  396. m["num_plain_text_bytes_indexed"] = m["TotIndexedPlainTextBytes"]
  397. m["num_items_introduced"] = m["TotIntroducedItems"]
  398. m["num_items_persisted"] = m["TotPersistedItems"]
  399. m["num_bytes_used_disk"] = m["CurOnDiskBytes"]
  400. m["num_files_on_disk"] = m["CurOnDiskFiles"]
  401. m["total_compaction_written_bytes"] = m["TotFileMergeWrittenBytes"]
  402. return m
  403. }
  404. func (s *Scorch) Analyze(d *document.Document) *index.AnalysisResult {
  405. rv := &index.AnalysisResult{
  406. Document: d,
  407. Analyzed: make([]analysis.TokenFrequencies, len(d.Fields)+len(d.CompositeFields)),
  408. Length: make([]int, len(d.Fields)+len(d.CompositeFields)),
  409. }
  410. for i, field := range d.Fields {
  411. if field.Options().IsIndexed() {
  412. fieldLength, tokenFreqs := field.Analyze()
  413. rv.Analyzed[i] = tokenFreqs
  414. rv.Length[i] = fieldLength
  415. if len(d.CompositeFields) > 0 {
  416. // see if any of the composite fields need this
  417. for _, compositeField := range d.CompositeFields {
  418. compositeField.Compose(field.Name(), fieldLength, tokenFreqs)
  419. }
  420. }
  421. }
  422. }
  423. return rv
  424. }
  425. func (s *Scorch) Advanced() (store.KVStore, error) {
  426. return nil, nil
  427. }
  428. func (s *Scorch) AddEligibleForRemoval(epoch uint64) {
  429. s.rootLock.Lock()
  430. if s.root == nil || s.root.epoch != epoch {
  431. s.eligibleForRemoval = append(s.eligibleForRemoval, epoch)
  432. }
  433. s.rootLock.Unlock()
  434. }
  435. func (s *Scorch) MemoryUsed() uint64 {
  436. indexSnapshot := s.currentSnapshot()
  437. defer func() {
  438. _ = indexSnapshot.Close()
  439. }()
  440. // Account for current root snapshot overhead
  441. memUsed := uint64(indexSnapshot.Size())
  442. // Account for snapshot that the persister may be working on
  443. persistEpoch := atomic.LoadUint64(&s.iStats.persistEpoch)
  444. persistSnapshotSize := atomic.LoadUint64(&s.iStats.persistSnapshotSize)
  445. if persistEpoch != 0 && indexSnapshot.epoch > persistEpoch {
  446. // the snapshot that the persister is working on isn't the same as
  447. // the current snapshot
  448. memUsed += persistSnapshotSize
  449. }
  450. // Account for snapshot that the merger may be working on
  451. mergeEpoch := atomic.LoadUint64(&s.iStats.mergeEpoch)
  452. mergeSnapshotSize := atomic.LoadUint64(&s.iStats.mergeSnapshotSize)
  453. if mergeEpoch != 0 && indexSnapshot.epoch > mergeEpoch {
  454. // the snapshot that the merger is working on isn't the same as
  455. // the current snapshot
  456. memUsed += mergeSnapshotSize
  457. }
  458. memUsed += (atomic.LoadUint64(&s.iStats.newSegBufBytesAdded) -
  459. atomic.LoadUint64(&s.iStats.newSegBufBytesRemoved))
  460. memUsed += (atomic.LoadUint64(&s.iStats.analysisBytesAdded) -
  461. atomic.LoadUint64(&s.iStats.analysisBytesRemoved))
  462. return memUsed
  463. }
  464. func (s *Scorch) markIneligibleForRemoval(filename string) {
  465. s.rootLock.Lock()
  466. s.ineligibleForRemoval[filename] = true
  467. s.rootLock.Unlock()
  468. }
  469. func (s *Scorch) unmarkIneligibleForRemoval(filename string) {
  470. s.rootLock.Lock()
  471. delete(s.ineligibleForRemoval, filename)
  472. s.rootLock.Unlock()
  473. }
// init registers the scorch index type with the bleve registry so it
// can be selected by name at index-creation time.
func init() {
	registry.RegisterIndexType(Name, NewScorch)
}
  477. func parseToInteger(i interface{}) (int, error) {
  478. switch v := i.(type) {
  479. case float64:
  480. return int(v), nil
  481. case int:
  482. return v, nil
  483. default:
  484. return 0, fmt.Errorf("expects int or float64 value")
  485. }
  486. }