index_alias_impl.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604
  1. // Copyright (c) 2014 Couchbase, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package bleve
  15. import (
  16. "context"
  17. "sort"
  18. "sync"
  19. "time"
  20. "github.com/blevesearch/bleve/document"
  21. "github.com/blevesearch/bleve/index"
  22. "github.com/blevesearch/bleve/index/store"
  23. "github.com/blevesearch/bleve/mapping"
  24. "github.com/blevesearch/bleve/search"
  25. )
  26. type indexAliasImpl struct {
  27. name string
  28. indexes []Index
  29. mutex sync.RWMutex
  30. open bool
  31. }
  32. // NewIndexAlias creates a new IndexAlias over the provided
  33. // Index objects.
  34. func NewIndexAlias(indexes ...Index) *indexAliasImpl {
  35. return &indexAliasImpl{
  36. name: "alias",
  37. indexes: indexes,
  38. open: true,
  39. }
  40. }
  41. func (i *indexAliasImpl) isAliasToSingleIndex() error {
  42. if len(i.indexes) < 1 {
  43. return ErrorAliasEmpty
  44. } else if len(i.indexes) > 1 {
  45. return ErrorAliasMulti
  46. }
  47. return nil
  48. }
  49. func (i *indexAliasImpl) Index(id string, data interface{}) error {
  50. i.mutex.RLock()
  51. defer i.mutex.RUnlock()
  52. if !i.open {
  53. return ErrorIndexClosed
  54. }
  55. err := i.isAliasToSingleIndex()
  56. if err != nil {
  57. return err
  58. }
  59. return i.indexes[0].Index(id, data)
  60. }
  61. func (i *indexAliasImpl) Delete(id string) error {
  62. i.mutex.RLock()
  63. defer i.mutex.RUnlock()
  64. if !i.open {
  65. return ErrorIndexClosed
  66. }
  67. err := i.isAliasToSingleIndex()
  68. if err != nil {
  69. return err
  70. }
  71. return i.indexes[0].Delete(id)
  72. }
  73. func (i *indexAliasImpl) Batch(b *Batch) error {
  74. i.mutex.RLock()
  75. defer i.mutex.RUnlock()
  76. if !i.open {
  77. return ErrorIndexClosed
  78. }
  79. err := i.isAliasToSingleIndex()
  80. if err != nil {
  81. return err
  82. }
  83. return i.indexes[0].Batch(b)
  84. }
  85. func (i *indexAliasImpl) Document(id string) (*document.Document, error) {
  86. i.mutex.RLock()
  87. defer i.mutex.RUnlock()
  88. if !i.open {
  89. return nil, ErrorIndexClosed
  90. }
  91. err := i.isAliasToSingleIndex()
  92. if err != nil {
  93. return nil, err
  94. }
  95. return i.indexes[0].Document(id)
  96. }
  97. func (i *indexAliasImpl) DocCount() (uint64, error) {
  98. i.mutex.RLock()
  99. defer i.mutex.RUnlock()
  100. rv := uint64(0)
  101. if !i.open {
  102. return 0, ErrorIndexClosed
  103. }
  104. for _, index := range i.indexes {
  105. otherCount, err := index.DocCount()
  106. if err == nil {
  107. rv += otherCount
  108. }
  109. // tolerate errors to produce partial counts
  110. }
  111. return rv, nil
  112. }
  113. func (i *indexAliasImpl) Search(req *SearchRequest) (*SearchResult, error) {
  114. return i.SearchInContext(context.Background(), req)
  115. }
  116. func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest) (*SearchResult, error) {
  117. i.mutex.RLock()
  118. defer i.mutex.RUnlock()
  119. if !i.open {
  120. return nil, ErrorIndexClosed
  121. }
  122. if len(i.indexes) < 1 {
  123. return nil, ErrorAliasEmpty
  124. }
  125. // short circuit the simple case
  126. if len(i.indexes) == 1 {
  127. return i.indexes[0].SearchInContext(ctx, req)
  128. }
  129. return MultiSearch(ctx, req, i.indexes...)
  130. }
  131. func (i *indexAliasImpl) Fields() ([]string, error) {
  132. i.mutex.RLock()
  133. defer i.mutex.RUnlock()
  134. if !i.open {
  135. return nil, ErrorIndexClosed
  136. }
  137. err := i.isAliasToSingleIndex()
  138. if err != nil {
  139. return nil, err
  140. }
  141. return i.indexes[0].Fields()
  142. }
  143. func (i *indexAliasImpl) FieldDict(field string) (index.FieldDict, error) {
  144. i.mutex.RLock()
  145. if !i.open {
  146. i.mutex.RUnlock()
  147. return nil, ErrorIndexClosed
  148. }
  149. err := i.isAliasToSingleIndex()
  150. if err != nil {
  151. i.mutex.RUnlock()
  152. return nil, err
  153. }
  154. fieldDict, err := i.indexes[0].FieldDict(field)
  155. if err != nil {
  156. i.mutex.RUnlock()
  157. return nil, err
  158. }
  159. return &indexAliasImplFieldDict{
  160. index: i,
  161. fieldDict: fieldDict,
  162. }, nil
  163. }
  164. func (i *indexAliasImpl) FieldDictRange(field string, startTerm []byte, endTerm []byte) (index.FieldDict, error) {
  165. i.mutex.RLock()
  166. if !i.open {
  167. i.mutex.RUnlock()
  168. return nil, ErrorIndexClosed
  169. }
  170. err := i.isAliasToSingleIndex()
  171. if err != nil {
  172. i.mutex.RUnlock()
  173. return nil, err
  174. }
  175. fieldDict, err := i.indexes[0].FieldDictRange(field, startTerm, endTerm)
  176. if err != nil {
  177. i.mutex.RUnlock()
  178. return nil, err
  179. }
  180. return &indexAliasImplFieldDict{
  181. index: i,
  182. fieldDict: fieldDict,
  183. }, nil
  184. }
  185. func (i *indexAliasImpl) FieldDictPrefix(field string, termPrefix []byte) (index.FieldDict, error) {
  186. i.mutex.RLock()
  187. if !i.open {
  188. i.mutex.RUnlock()
  189. return nil, ErrorIndexClosed
  190. }
  191. err := i.isAliasToSingleIndex()
  192. if err != nil {
  193. i.mutex.RUnlock()
  194. return nil, err
  195. }
  196. fieldDict, err := i.indexes[0].FieldDictPrefix(field, termPrefix)
  197. if err != nil {
  198. i.mutex.RUnlock()
  199. return nil, err
  200. }
  201. return &indexAliasImplFieldDict{
  202. index: i,
  203. fieldDict: fieldDict,
  204. }, nil
  205. }
  206. func (i *indexAliasImpl) Close() error {
  207. i.mutex.Lock()
  208. defer i.mutex.Unlock()
  209. i.open = false
  210. return nil
  211. }
  212. func (i *indexAliasImpl) Mapping() mapping.IndexMapping {
  213. i.mutex.RLock()
  214. defer i.mutex.RUnlock()
  215. if !i.open {
  216. return nil
  217. }
  218. err := i.isAliasToSingleIndex()
  219. if err != nil {
  220. return nil
  221. }
  222. return i.indexes[0].Mapping()
  223. }
  224. func (i *indexAliasImpl) Stats() *IndexStat {
  225. i.mutex.RLock()
  226. defer i.mutex.RUnlock()
  227. if !i.open {
  228. return nil
  229. }
  230. err := i.isAliasToSingleIndex()
  231. if err != nil {
  232. return nil
  233. }
  234. return i.indexes[0].Stats()
  235. }
  236. func (i *indexAliasImpl) StatsMap() map[string]interface{} {
  237. i.mutex.RLock()
  238. defer i.mutex.RUnlock()
  239. if !i.open {
  240. return nil
  241. }
  242. err := i.isAliasToSingleIndex()
  243. if err != nil {
  244. return nil
  245. }
  246. return i.indexes[0].StatsMap()
  247. }
  248. func (i *indexAliasImpl) GetInternal(key []byte) ([]byte, error) {
  249. i.mutex.RLock()
  250. defer i.mutex.RUnlock()
  251. if !i.open {
  252. return nil, ErrorIndexClosed
  253. }
  254. err := i.isAliasToSingleIndex()
  255. if err != nil {
  256. return nil, err
  257. }
  258. return i.indexes[0].GetInternal(key)
  259. }
  260. func (i *indexAliasImpl) SetInternal(key, val []byte) error {
  261. i.mutex.RLock()
  262. defer i.mutex.RUnlock()
  263. if !i.open {
  264. return ErrorIndexClosed
  265. }
  266. err := i.isAliasToSingleIndex()
  267. if err != nil {
  268. return err
  269. }
  270. return i.indexes[0].SetInternal(key, val)
  271. }
  272. func (i *indexAliasImpl) DeleteInternal(key []byte) error {
  273. i.mutex.RLock()
  274. defer i.mutex.RUnlock()
  275. if !i.open {
  276. return ErrorIndexClosed
  277. }
  278. err := i.isAliasToSingleIndex()
  279. if err != nil {
  280. return err
  281. }
  282. return i.indexes[0].DeleteInternal(key)
  283. }
  284. func (i *indexAliasImpl) Advanced() (index.Index, store.KVStore, error) {
  285. i.mutex.RLock()
  286. defer i.mutex.RUnlock()
  287. if !i.open {
  288. return nil, nil, ErrorIndexClosed
  289. }
  290. err := i.isAliasToSingleIndex()
  291. if err != nil {
  292. return nil, nil, err
  293. }
  294. return i.indexes[0].Advanced()
  295. }
  296. func (i *indexAliasImpl) Add(indexes ...Index) {
  297. i.mutex.Lock()
  298. defer i.mutex.Unlock()
  299. i.indexes = append(i.indexes, indexes...)
  300. }
  301. func (i *indexAliasImpl) removeSingle(index Index) {
  302. for pos, in := range i.indexes {
  303. if in == index {
  304. i.indexes = append(i.indexes[:pos], i.indexes[pos+1:]...)
  305. break
  306. }
  307. }
  308. }
  309. func (i *indexAliasImpl) Remove(indexes ...Index) {
  310. i.mutex.Lock()
  311. defer i.mutex.Unlock()
  312. for _, in := range indexes {
  313. i.removeSingle(in)
  314. }
  315. }
  316. func (i *indexAliasImpl) Swap(in, out []Index) {
  317. i.mutex.Lock()
  318. defer i.mutex.Unlock()
  319. // add
  320. i.indexes = append(i.indexes, in...)
  321. // delete
  322. for _, ind := range out {
  323. i.removeSingle(ind)
  324. }
  325. }
  326. // createChildSearchRequest creates a separate
  327. // request from the original
  328. // For now, avoid data race on req structure.
  329. // TODO disable highlight/field load on child
  330. // requests, and add code to do this only on
  331. // the actual final results.
  332. // Perhaps that part needs to be optional,
  333. // could be slower in remote usages.
  334. func createChildSearchRequest(req *SearchRequest) *SearchRequest {
  335. rv := SearchRequest{
  336. Query: req.Query,
  337. Size: req.Size + req.From,
  338. From: 0,
  339. Highlight: req.Highlight,
  340. Fields: req.Fields,
  341. Facets: req.Facets,
  342. Explain: req.Explain,
  343. Sort: req.Sort.Copy(),
  344. IncludeLocations: req.IncludeLocations,
  345. Score: req.Score,
  346. SearchAfter: req.SearchAfter,
  347. SearchBefore: req.SearchBefore,
  348. }
  349. return &rv
  350. }
  351. type asyncSearchResult struct {
  352. Name string
  353. Result *SearchResult
  354. Err error
  355. }
  356. // MultiSearch executes a SearchRequest across multiple Index objects,
  357. // then merges the results. The indexes must honor any ctx deadline.
  358. func MultiSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*SearchResult, error) {
  359. searchStart := time.Now()
  360. asyncResults := make(chan *asyncSearchResult, len(indexes))
  361. var reverseQueryExecution bool
  362. if req.SearchBefore != nil {
  363. reverseQueryExecution = true
  364. req.Sort.Reverse()
  365. req.SearchAfter = req.SearchBefore
  366. req.SearchBefore = nil
  367. }
  368. // run search on each index in separate go routine
  369. var waitGroup sync.WaitGroup
  370. var searchChildIndex = func(in Index, childReq *SearchRequest) {
  371. rv := asyncSearchResult{Name: in.Name()}
  372. rv.Result, rv.Err = in.SearchInContext(ctx, childReq)
  373. asyncResults <- &rv
  374. waitGroup.Done()
  375. }
  376. waitGroup.Add(len(indexes))
  377. for _, in := range indexes {
  378. go searchChildIndex(in, createChildSearchRequest(req))
  379. }
  380. // on another go routine, close after finished
  381. go func() {
  382. waitGroup.Wait()
  383. close(asyncResults)
  384. }()
  385. var sr *SearchResult
  386. indexErrors := make(map[string]error)
  387. for asr := range asyncResults {
  388. if asr.Err == nil {
  389. if sr == nil {
  390. // first result
  391. sr = asr.Result
  392. } else {
  393. // merge with previous
  394. sr.Merge(asr.Result)
  395. }
  396. } else {
  397. indexErrors[asr.Name] = asr.Err
  398. }
  399. }
  400. // merge just concatenated all the hits
  401. // now lets clean it up
  402. // handle case where no results were successful
  403. if sr == nil {
  404. sr = &SearchResult{
  405. Status: &SearchStatus{
  406. Errors: make(map[string]error),
  407. },
  408. }
  409. }
  410. // sort all hits with the requested order
  411. if len(req.Sort) > 0 {
  412. sorter := newSearchHitSorter(req.Sort, sr.Hits)
  413. sort.Sort(sorter)
  414. }
  415. // now skip over the correct From
  416. if req.From > 0 && len(sr.Hits) > req.From {
  417. sr.Hits = sr.Hits[req.From:]
  418. } else if req.From > 0 {
  419. sr.Hits = search.DocumentMatchCollection{}
  420. }
  421. // now trim to the correct size
  422. if req.Size > 0 && len(sr.Hits) > req.Size {
  423. sr.Hits = sr.Hits[0:req.Size]
  424. }
  425. // fix up facets
  426. for name, fr := range req.Facets {
  427. sr.Facets.Fixup(name, fr.Size)
  428. }
  429. if reverseQueryExecution {
  430. // reverse the sort back to the original
  431. req.Sort.Reverse()
  432. // resort using the original order
  433. mhs := newSearchHitSorter(req.Sort, sr.Hits)
  434. sort.Sort(mhs)
  435. // reset request
  436. req.SearchBefore = req.SearchAfter
  437. req.SearchAfter = nil
  438. }
  439. // fix up original request
  440. sr.Request = req
  441. searchDuration := time.Since(searchStart)
  442. sr.Took = searchDuration
  443. // fix up errors
  444. if len(indexErrors) > 0 {
  445. if sr.Status.Errors == nil {
  446. sr.Status.Errors = make(map[string]error)
  447. }
  448. for indexName, indexErr := range indexErrors {
  449. sr.Status.Errors[indexName] = indexErr
  450. sr.Status.Total++
  451. sr.Status.Failed++
  452. }
  453. }
  454. return sr, nil
  455. }
  456. func (i *indexAliasImpl) NewBatch() *Batch {
  457. i.mutex.RLock()
  458. defer i.mutex.RUnlock()
  459. if !i.open {
  460. return nil
  461. }
  462. err := i.isAliasToSingleIndex()
  463. if err != nil {
  464. return nil
  465. }
  466. return i.indexes[0].NewBatch()
  467. }
  468. func (i *indexAliasImpl) Name() string {
  469. return i.name
  470. }
  471. func (i *indexAliasImpl) SetName(name string) {
  472. i.name = name
  473. }
  474. type indexAliasImplFieldDict struct {
  475. index *indexAliasImpl
  476. fieldDict index.FieldDict
  477. }
  478. func (f *indexAliasImplFieldDict) Next() (*index.DictEntry, error) {
  479. return f.fieldDict.Next()
  480. }
  481. func (f *indexAliasImplFieldDict) Close() error {
  482. defer f.index.mutex.RUnlock()
  483. return f.fieldDict.Close()
  484. }