Browse Source

Change the CopyTo method signature to accept an
index.Directory interface implementation for the
destination.

Add a default FileSystemDirectory implementation
for the index.Directory interface.

Sreekanth Sivasankaran 1 year ago
parent
commit
c13a1403ac
6 changed files with 103 additions and 89 deletions
  1. 8 10
      index.go
  2. 41 21
      index/scorch/persister.go
  3. 7 7
      index/scorch/snapshot_index.go
  4. 18 3
      index_impl.go
  5. 6 5
      index_meta.go
  6. 23 43
      index_test.go

+ 8 - 10
index.go

@@ -16,7 +16,6 @@ package bleve
 
 import (
 	"context"
-	"io"
 
 	"github.com/blevesearch/bleve/v2/index/upsidedown"
 
@@ -310,14 +309,13 @@ func NewBuilder(path string, mapping mapping.IndexMapping, config map[string]int
 }
 
 // IndexCopyable is an index which supports an online copy operation
-// of the index. This is an experimental api and could potentially get
-// changed or deprecated in future.
+// of the index.
 type IndexCopyable interface {
-	// CopyTo creates a fully functional copy of the index using the
-	// specified destination writer builder callback.
-	// The index implementation would trigger the given builder callback for
-	// each index file and it is the callback implementation's responsibilty
-	// to build and return the respective target writer for the file given
-	// in the callback.
-	CopyTo(func(string) io.WriteCloser) error
+	// CopyTo creates a fully functional copy of the index at the
+	// specified destination directory implementation.
+	CopyTo(d index.Directory) error
 }
+
+// FileSystemDirectory is the default implementation for the
+// index.Directory interface.
+type FileSystemDirectory string

+ 41 - 21
index/scorch/persister.go

@@ -428,20 +428,26 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) (
 	return true, nil
 }
 
-func copyFile(src string, dest io.WriteCloser) (int64, error) {
-	if dest == nil {
-		return 0, fmt.Errorf("invalid writer for file: %s", src)
+func copyToDirectory(srcPath string, d index.Directory) (int64, error) {
+	if d == nil {
+		return 0, nil
+	}
+
+	dest, err := d.GetWriter(filepath.Join("store", filepath.Base(srcPath)))
+	if err != nil {
+		return 0, fmt.Errorf("GetWriter err: %v", err)
 	}
-	sourceFileStat, err := os.Stat(src)
+
+	sourceFileStat, err := os.Stat(srcPath)
 	if err != nil {
 		return 0, err
 	}
 
 	if !sourceFileStat.Mode().IsRegular() {
-		return 0, fmt.Errorf("%s is not a regular file", src)
+		return 0, fmt.Errorf("%s is not a regular file", srcPath)
 	}
 
-	source, err := os.Open(src)
+	source, err := os.Open(srcPath)
 	if err != nil {
 		return 0, err
 	}
@@ -450,8 +456,30 @@ func copyFile(src string, dest io.WriteCloser) (int64, error) {
 	return io.Copy(dest, source)
 }
 
+func persistToDirectory(seg segment.UnpersistedSegment, d index.Directory,
+	path string) error {
+	if d == nil {
+		return seg.Persist(path)
+	}
+
+	sg, ok := seg.(io.WriterTo)
+	if !ok {
+		return fmt.Errorf("no io.WriterTo segment implementation found")
+	}
+
+	w, err := d.GetWriter(filepath.Join("store", filepath.Base(path)))
+	if err != nil {
+		return err
+	}
+
+	_, err = sg.WriteTo(w)
+	w.Close()
+
+	return err
+}
+
 func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
-	segPlugin SegmentPlugin, getWriter func(string) io.WriteCloser) (
+	segPlugin SegmentPlugin, d index.Directory) (
 	[]string, map[uint64]string, error) {
 	snapshotsBucket, err := tx.CreateBucketIfNotExists(boltSnapshotsBucket)
 	if err != nil {
@@ -505,11 +533,9 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
 		switch seg := segmentSnapshot.segment.(type) {
 		case segment.PersistedSegment:
 			segPath := seg.Path()
-			if getWriter != nil {
-				_, err := copyFile(segPath, getWriter(segPath))
-				if err != nil {
-					return nil, nil, fmt.Errorf("segment: %s copy err: %v", segPath, err)
-				}
+			_, err = copyToDirectory(segPath, d)
+			if err != nil {
+				return nil, nil, fmt.Errorf("segment: %s copy err: %v", segPath, err)
 			}
 			filename := filepath.Base(segPath)
 			err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
@@ -519,17 +545,11 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
 			filenames = append(filenames, filename)
 		case segment.UnpersistedSegment:
 			// need to persist this to disk
-			var err error
 			filename := zapFileName(segmentSnapshot.id)
-			path := path + string(os.PathSeparator) + filename
-			if getWriter != nil {
-				err = seg.PersistToWriter(getWriter(path))
-			} else {
-				err = seg.Persist(path)
-			}
-
+			path := filepath.Join(path, filename)
+			err := persistToDirectory(seg, d, path)
 			if err != nil {
-				return nil, nil, fmt.Errorf("error persisting segment: %v", err)
+				return nil, nil, fmt.Errorf("segment: %s persist err: %v", path, err)
 			}
 			newSegmentPaths[segmentSnapshot.id] = path
 			err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))

+ 7 - 7
index/scorch/snapshot_index.go

@@ -18,8 +18,8 @@ import (
 	"container/heap"
 	"encoding/binary"
 	"fmt"
-	"io"
 	"os"
+	"path/filepath"
 	"reflect"
 	"sort"
 	"sync"
@@ -766,15 +766,15 @@ OUTER:
 	return rv
 }
 
-func (i *IndexSnapshot) CopyTo(getWriter func(string) io.WriteCloser) (err error) {
+func (i *IndexSnapshot) CopyTo(d index.Directory) error {
 	// get the root bolt file.
-	w := getWriter("root.bolt")
-	if w == nil {
-		fmt.Errorf("failed to create the root.bolt file")
+	w, err := d.GetWriter(filepath.Join("store", "root.bolt"))
+	if err != nil || w == nil {
+		return fmt.Errorf("failed to create the root.bolt file, err: %v", err)
 	}
 	rootFile, ok := w.(*os.File)
 	if !ok {
-		fmt.Errorf("failed to create the root.bolt file")
+		return fmt.Errorf("invalid root.bolt file found")
 	}
 
 	copyBolt, err := bolt.Open(rootFile.Name(), 0600, nil)
@@ -794,7 +794,7 @@ func (i *IndexSnapshot) CopyTo(getWriter func(string) io.WriteCloser) (err error
 		return err
 	}
 
-	_, _, err = prepareBoltSnapshot(i, tx, "", i.parent.segPlugin, getWriter)
+	_, _, err = prepareBoltSnapshot(i, tx, "", i.parent.segPlugin, d)
 	if err != nil {
 		_ = tx.Rollback()
 		return fmt.Errorf("error backing up index snapshot: %v", err)

+ 18 - 3
index_impl.go

@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"path/filepath"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -912,7 +913,7 @@ func (m *searchHitSorter) Less(i, j int) bool {
 	return c < 0
 }
 
-func (i *indexImpl) CopyTo(getWriter func(string) io.WriteCloser) (err error) {
+func (i *indexImpl) CopyTo(d index.Directory) (err error) {
 	i.mutex.RLock()
 	defer i.mutex.RUnlock()
 
@@ -935,11 +936,25 @@ func (i *indexImpl) CopyTo(getWriter func(string) io.WriteCloser) (err error) {
 		return fmt.Errorf("index implementation does not support copy")
 	}
 
-	err = irc.CopyTo(getWriter)
+	err = irc.CopyTo(d)
 	if err != nil {
 		return fmt.Errorf("error copying index metadata: %v", err)
 	}
 
 	// copy the metadata
-	return i.meta.SaveToWriter(getWriter)
+	return i.meta.CopyTo(d)
+}
+
+func (f FileSystemDirectory) GetWriter(filePath string) (io.WriteCloser,
+	error) {
+	dir, file := filepath.Split(filePath)
+	if dir != "" {
+		err := os.MkdirAll(filepath.Join(string(f), dir), os.ModePerm)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return os.OpenFile(filepath.Join(string(f), dir, file),
+		os.O_RDWR|os.O_CREATE, 0600)
 }

+ 6 - 5
index_meta.go

@@ -17,12 +17,12 @@ package bleve
 import (
 	"encoding/json"
 	"fmt"
-	"io"
 	"io/ioutil"
 	"os"
 	"path/filepath"
 
 	"github.com/blevesearch/bleve/v2/index/upsidedown"
+	index "github.com/blevesearch/bleve_index_api"
 )
 
 const metaFilename = "index_meta.json"
@@ -94,15 +94,16 @@ func (i *indexMeta) Save(path string) (err error) {
 	return nil
 }
 
-func (i *indexMeta) SaveToWriter(getWriter func(string) io.WriteCloser) (err error) {
+func (i *indexMeta) CopyTo(d index.Directory) (err error) {
 	metaBytes, err := json.Marshal(i)
 	if err != nil {
 		return err
 	}
 
-	w := getWriter(metaFilename)
-	if w == nil {
-		return fmt.Errorf("invalid writer for file: %s", metaFilename)
+	w, err := d.GetWriter(metaFilename)
+	if w == nil || err != nil {
+		return fmt.Errorf("invalid writer for file: %s, err: %v",
+			metaFilename, err)
 	}
 	defer w.Close()
 

+ 23 - 43
index_test.go

@@ -18,7 +18,6 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"io"
 	"io/ioutil"
 	"log"
 	"math"
@@ -2425,83 +2424,64 @@ func TestCopyIndex(t *testing.T) {
 	backupIndexPath := createTmpIndexPath(t)
 	defer cleanupTmpIndexPath(t, backupIndexPath)
 
-	getWriter := func(filePath string) io.WriteCloser {
-		var path string
-		if strings.Contains(filePath, "store") ||
-			strings.Contains(filePath, "root.bolt") {
-			path = filepath.Join(backupIndexPath, "store",
-				filepath.Base(filePath))
-			_ = os.MkdirAll(filepath.Join(backupIndexPath, "store"), 0700)
-		} else {
-			path = filepath.Join(backupIndexPath,
-				filepath.Base(filePath))
-		}
-		f, err := os.OpenFile(path,
-			os.O_RDWR|os.O_CREATE, 0600)
-		if err != nil {
-			return nil
-		}
-		return f
-	}
-
-	err = copyableIndex.CopyTo(getWriter)
+	err = copyableIndex.CopyTo(FileSystemDirectory(backupIndexPath))
 	if err != nil {
-		t.Fatalf("error backing up index: %v", err)
+		t.Fatalf("error copying the index: %v", err)
 	}
 
 	// open the copied  index
-	idxBackup, err := Open(backupIndexPath)
+	idxCopied, err := Open(backupIndexPath)
 	if err != nil {
-		t.Fatalf("unable to open backup index")
+		t.Fatalf("unable to open copy index")
 	}
 	defer func() {
-		err := idxBackup.Close()
+		err := idxCopied.Close()
 		if err != nil {
-			t.Fatalf("error closing backup index: %v", err)
+			t.Fatalf("error closing copy index: %v", err)
 		}
 	}()
 
 	// assertions on copied index
-	backupCount, err := idxBackup.DocCount()
+	copyCount, err := idxCopied.DocCount()
 	if err != nil {
 		t.Fatal(err)
 	}
-	if backupCount != 2 {
-		t.Errorf("expected doc count 2, got %d", backupCount)
+	if copyCount != 2 {
+		t.Errorf("expected doc count 2, got %d", copyCount)
 	}
 
-	backupDoc, err := idxBackup.Document("a")
+	copyDoc, err := idxCopied.Document("a")
 	if err != nil {
 		t.Fatal(err)
 	}
-	backupFoundNameField := false
-	backupDoc.VisitFields(func(field index.Field) {
+	copyFoundNameField := false
+	copyDoc.VisitFields(func(field index.Field) {
 		if field.Name() == "name" && string(field.Value()) == "tester" {
-			backupFoundNameField = true
+			copyFoundNameField = true
 		}
 	})
-	if !backupFoundNameField {
-		t.Errorf("expected backup to find field named 'name' with value 'tester'")
+	if !copyFoundNameField {
+		t.Errorf("expected copy index to find field named 'name' with value 'tester'")
 	}
 
-	backupFields, err := idx.Fields()
+	copyFields, err := idx.Fields()
 	if err != nil {
 		t.Fatal(err)
 	}
-	backupExpectedFields := map[string]bool{
+	copyExpectedFields := map[string]bool{
 		"_all": false,
 		"name": false,
 		"desc": false,
 	}
-	if len(backupFields) < len(backupExpectedFields) {
-		t.Fatalf("expected %d fields got %d", len(backupExpectedFields), len(backupFields))
+	if len(copyFields) < len(copyExpectedFields) {
+		t.Fatalf("expected %d fields got %d", len(copyExpectedFields), len(copyFields))
 	}
-	for _, f := range backupFields {
-		backupExpectedFields[f] = true
+	for _, f := range copyFields {
+		copyExpectedFields[f] = true
 	}
-	for ef, efp := range backupExpectedFields {
+	for ef, efp := range copyExpectedFields {
 		if !efp {
-			t.Errorf("backup field %s is missing", ef)
+			t.Errorf("copy field %s is missing", ef)
 		}
 	}
 }