
account for memory allocated during text analysis

Marty Schoch committed 1 year ago · commit 3c30bc2d47
3 changed files with 70 additions and 0 deletions
  1. analysis/freq.go (+41 −0)
  2. index/analysis.go (+19 −0)
  3. index/scorch/scorch.go (+10 −0)

analysis/freq.go (+41 −0)

@@ -14,6 +14,22 @@
 
 package analysis
 
+import (
+	"reflect"
+
+	"github.com/blevesearch/bleve/size"
+)
+
+var reflectStaticSizeTokenLocation int
+var reflectStaticSizeTokenFreq int
+
+func init() {
+	var tl TokenLocation
+	reflectStaticSizeTokenLocation = int(reflect.TypeOf(tl).Size())
+	var tf TokenFreq
+	reflectStaticSizeTokenFreq = int(reflect.TypeOf(tf).Size())
+}
+
 // TokenLocation represents one occurrence of a term at a particular location in
 // a field. Start, End and Position have the same meaning as in analysis.Token.
 // Field and ArrayPositions identify the field value in the source document.
@@ -26,6 +42,12 @@ type TokenLocation struct {
 	Position       int
 }
 
+func (tl *TokenLocation) Size() int {
+	rv := reflectStaticSizeTokenLocation
+	rv += len(tl.ArrayPositions) * size.SizeOfUint64
+	return rv
+}
+
 // TokenFreq represents all the occurrences of a term in all fields of a
 // document.
 type TokenFreq struct {
@@ -34,6 +56,15 @@ type TokenFreq struct {
 	frequency int
 }
 
+func (tf *TokenFreq) Size() int {
+	rv := reflectStaticSizeTokenFreq
+	rv += len(tf.Term)
+	for _, loc := range tf.Locations {
+		rv += loc.Size()
+	}
+	return rv
+}
+
 func (tf *TokenFreq) Frequency() int {
 	return tf.frequency
 }
@@ -42,6 +73,16 @@ func (tf *TokenFreq) Frequency() int {
 // fields.
 type TokenFrequencies map[string]*TokenFreq
 
+func (tfs TokenFrequencies) Size() int {
+	rv := size.SizeOfMap
+	rv += len(tfs) * (size.SizeOfString + size.SizeOfPtr)
+	for k, v := range tfs {
+		rv += len(k)
+		rv += v.Size()
+	}
+	return rv
+}
+
 func (tfs TokenFrequencies) MergeAll(remoteField string, other TokenFrequencies) {
 	// walk the new token frequencies
 	for tfk, tf := range other {

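As a quick illustration of what the new accounting measures, here is a hedged usage sketch. It assumes the Size() methods land exactly as in this diff and estimates the footprint of a one-term TokenFrequencies map: the static struct sizes cached via reflect in init(), plus per-entry map overhead, the term bytes, and each location's ArrayPositions.

package main

import (
	"fmt"

	"github.com/blevesearch/bleve/analysis"
)

func main() {
	// One term, one occurrence: Size() = map header + per-entry key/pointer
	// overhead + len("beer") + static TokenFreq size + static TokenLocation
	// size (no ArrayPositions here, so nothing extra for them).
	tfs := analysis.TokenFrequencies{
		"beer": &analysis.TokenFreq{
			Term: []byte("beer"),
			Locations: []*analysis.TokenLocation{
				{Field: "desc", Start: 0, End: 4, Position: 1},
			},
		},
	}
	fmt.Println("estimated bytes:", tfs.Size())
}
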
index/analysis.go (+19 −0)

@@ -15,10 +15,20 @@
 package index
 
 import (
+	"reflect"
+
 	"github.com/blevesearch/bleve/analysis"
 	"github.com/blevesearch/bleve/document"
+	"github.com/blevesearch/bleve/size"
 )
 
+var reflectStaticSizeAnalysisResult int
+
+func init() {
+	var ar AnalysisResult
+	reflectStaticSizeAnalysisResult = int(reflect.TypeOf(ar).Size())
+}
+
 type IndexRow interface {
 	KeySize() int
 	KeyTo([]byte) (int, error)
@@ -39,6 +49,15 @@ type AnalysisResult struct {
 	Length   []int
 }
 
+func (a *AnalysisResult) Size() int {
+	rv := reflectStaticSizeAnalysisResult
+	for _, analyzedI := range a.Analyzed {
+		rv += analyzedI.Size()
+	}
+	rv += len(a.Length) * size.SizeOfInt
+	return rv
+}
+
 type AnalysisWork struct {
 	i  Index
 	d  *document.Document

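Both files lean on the constants in github.com/blevesearch/bleve/size (SizeOfPtr, SizeOfInt, SizeOfUint64, SizeOfString, SizeOfMap). The following is only a sketch of how such header and primitive sizes are typically derived with reflect; the actual package may define them differently.

// Hypothetical sketch only; see github.com/blevesearch/bleve/size for the
// real definitions. Each value is the in-memory size of the header or
// primitive on the current platform, not the data it refers to.
package size

import "reflect"

var (
	SizeOfPtr    = int(reflect.TypeOf(uintptr(0)).Size())
	SizeOfInt    = int(reflect.TypeOf(int(0)).Size())
	SizeOfUint64 = int(reflect.TypeOf(uint64(0)).Size())
	SizeOfString = int(reflect.TypeOf("").Size())             // string header (pointer + length)
	SizeOfMap    = int(reflect.TypeOf(map[int]int{}).Size())  // map header (one pointer)
)
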
index/scorch/scorch.go (+10 −0)

@@ -82,6 +82,8 @@ type internalStats struct {
 	mergeSnapshotSize     uint64
 	newSegBufBytesAdded   uint64
 	newSegBufBytesRemoved uint64
+	analysisBytesAdded    uint64
+	analysisBytesRemoved  uint64
 }
 
 func NewScorch(storeName string,
@@ -295,12 +297,17 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
 	// wait for analysis result
 	analysisResults := make([]*index.AnalysisResult, int(numUpdates))
 	var itemsDeQueued uint64
+	var totalAnalysisSize int
 	for itemsDeQueued < numUpdates {
 		result := <-resultChan
+		resultSize := result.Size()
+		atomic.AddUint64(&s.iStats.analysisBytesAdded, uint64(resultSize))
+		totalAnalysisSize += resultSize
 		analysisResults[itemsDeQueued] = result
 		itemsDeQueued++
 	}
 	close(resultChan)
+	defer atomic.AddUint64(&s.iStats.analysisBytesRemoved, uint64(totalAnalysisSize))
 
 	atomic.AddUint64(&s.stats.TotAnalysisTime, uint64(time.Since(start)))
 
@@ -531,6 +538,9 @@ func (s *Scorch) MemoryUsed() uint64 {
 	memUsed += (atomic.LoadUint64(&s.iStats.newSegBufBytesAdded) -
 		atomic.LoadUint64(&s.iStats.newSegBufBytesRemoved))
 
+	memUsed += (atomic.LoadUint64(&s.iStats.analysisBytesAdded) -
+		atomic.LoadUint64(&s.iStats.analysisBytesRemoved))
+
 	return memUsed
 }
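
The scorch change follows the same added/removed counter pattern as the existing newSegBufBytes counters: one counter grows as analysis results arrive, the other grows when the batch releases them, and MemoryUsed reports the difference as live bytes. A minimal, self-contained sketch of that pattern (illustrative only, not the scorch implementation):

package main

import (
	"fmt"
	"sync/atomic"
)

type tracker struct {
	bytesAdded   uint64
	bytesRemoved uint64
}

func (t *tracker) add(n uint64)     { atomic.AddUint64(&t.bytesAdded, n) }
func (t *tracker) release(n uint64) { atomic.AddUint64(&t.bytesRemoved, n) }

// live stays non-negative as long as every release() mirrors an earlier add().
func (t *tracker) live() uint64 {
	return atomic.LoadUint64(&t.bytesAdded) - atomic.LoadUint64(&t.bytesRemoved)
}

func main() {
	var t tracker
	t.add(1024)           // analysis result arrives
	fmt.Println(t.live()) // 1024 bytes currently held
	t.release(1024)       // batch introduction complete
	fmt.Println(t.live()) // back to 0
}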