Browse Source

fix data race with batch reset (#1150)

this is another variation of the race found/fixed in #1092
in that case the batch was empty, which meant we would skip
the code that properly synchronized access.  our fix only
handled this exact case (no data operations), however
there is another variation, if the batch contains only deletes
(which are data ops) we still spawned the goroutine, although
since there were no real updates, again the synchronization
code would be skipped, and thus the data race could happen.

the fix is to check the number of updates (computed earlier on
the caller's goroutine, so it's safe) instead of the length
of the IndexOps (which includes updates and deletes)

the key is that we should only spawn the goroutine that will
range over the batch, in cases where we will synchronize on
waiting for the analysis to complete (at least one update).

fixes #1149
Marty Schoch 3 months ago
parent
commit
a91b427b59
3 changed files with 34 additions and 2 deletions
  1. 1 1
      index/scorch/scorch.go
  2. 1 1
      index/upsidedown/upsidedown.go
  3. 32 0
      index_test.go

+ 1 - 1
index/scorch/scorch.go

@@ -312,7 +312,7 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
 
 	// FIXME could sort ids list concurrent with analysis?
 
-	if len(batch.IndexOps) > 0 {
+	if numUpdates > 0 {
 		go func() {
 			for _, doc := range batch.IndexOps {
 				if doc != nil {

+ 1 - 1
index/upsidedown/upsidedown.go

@@ -810,7 +810,7 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
 		}
 	}
 
-	if len(batch.IndexOps) > 0 {
+	if numUpdates > 0 {
 		go func() {
 			for _, doc := range batch.IndexOps {
 				if doc != nil {

+ 32 - 0
index_test.go

@@ -2184,3 +2184,35 @@ func TestDataRaceBug1092(t *testing.T) {
 		batch.Reset()
 	}
 }
+
+func TestBatchRaceBug1149(t *testing.T) {
+	defer func() {
+		err := os.RemoveAll("testidx")
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+	i, err := New("testidx", NewIndexMapping())
+	//i, err := NewUsing("testidx", NewIndexMapping(), "scorch", "scorch", nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		err := i.Close()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+	b := i.NewBatch()
+	b.Delete("1")
+	err = i.Batch(b)
+	if err != nil {
+		t.Fatal(err)
+	}
+	b.Reset()
+	err = i.Batch(b)
+	if err != nil {
+		t.Fatal(err)
+	}
+	b.Reset()
+}