Browse Source

Add support for sqlite3_unlock_notify

Mura Li 10 months ago
parent
commit
eb08795f52
5 changed files with 441 additions and 5 deletions
  1. 1 1
      .travis.yml
  2. 41 4
      sqlite3.go
  3. 85 0
      sqlite3_opt_unlock_notify.c
  4. 92 0
      sqlite3_opt_unlock_notify.go
  5. 222 0
      sqlite3_opt_unlock_notify_test.go

+ 1 - 1
.travis.yml

@@ -12,7 +12,7 @@ env:
   matrix:
     - GOTAGS=
     - GOTAGS=libsqlite3
-    - GOTAGS="sqlite_allow_uri_authority sqlite_app_armor sqlite_foreign_keys sqlite_fts5 sqlite_icu sqlite_introspect sqlite_json sqlite_secure_delete sqlite_see sqlite_stat4 sqlite_trace sqlite_userauth sqlite_vacuum_incr sqlite_vtable"
+    - GOTAGS="sqlite_allow_uri_authority sqlite_app_armor sqlite_foreign_keys sqlite_fts5 sqlite_icu sqlite_introspect sqlite_json sqlite_secure_delete sqlite_see sqlite_stat4 sqlite_trace sqlite_userauth sqlite_vacuum_incr sqlite_vtable sqlite_unlock_notify"
     - GOTAGS=sqlite_vacuum_full
 
 go:

+ 41 - 4
sqlite3.go

@@ -78,8 +78,38 @@ _sqlite3_exec(sqlite3* db, const char* pcmd, long long* rowid, long long* change
   return rv;
 }
 
+#ifdef SQLITE_ENABLE_UNLOCK_NOTIFY
+extern int sqlite3_step_blocking(sqlite3_stmt *stmt);
+extern int _sqlite3_step_blocking(sqlite3_stmt* stmt, long long* rowid, long long* changes);
+extern int sqlite3_prepare_v2_blocking(sqlite3 *db, const char *zSql, int nBytes, sqlite3_stmt **ppStmt, const char **pzTail);
+
+static int
+sqlite3_step_internal(sqlite3_stmt *stmt)
+{
+  return sqlite3_step_blocking(stmt);
+}
+
+static int
+_sqlite3_step_internal(sqlite3_stmt* stmt, long long* rowid, long long* changes)
+{
+  return _sqlite3_step_blocking(stmt, rowid, changes);
+}
+
 static int
-_sqlite3_step(sqlite3_stmt* stmt, long long* rowid, long long* changes)
+sqlite3_prepare_v2_internal(sqlite3 *db, const char *zSql, int nBytes, sqlite3_stmt **ppStmt, const char **pzTail)
+{
+  return sqlite3_prepare_v2_blocking(db, zSql, nBytes, ppStmt, pzTail);
+}
+
+#else
+static int
+sqlite3_step_internal(sqlite3_stmt *stmt)
+{
+  return sqlite3_step(stmt);
+}
+
+static int
+_sqlite3_step_internal(sqlite3_stmt* stmt, long long* rowid, long long* changes)
 {
   int rv = sqlite3_step(stmt);
   sqlite3* db = sqlite3_db_handle(stmt);
@@ -88,6 +118,13 @@ _sqlite3_step(sqlite3_stmt* stmt, long long* rowid, long long* changes)
   return rv;
 }
 
+static int
+sqlite3_prepare_v2_internal(sqlite3 *db, const char *zSql, int nBytes, sqlite3_stmt **ppStmt, const char **pzTail)
+{
+  return sqlite3_prepare_v2(db, zSql, nBytes, ppStmt, pzTail);
+}
+#endif
+
 void _sqlite3_result_text(sqlite3_context* ctx, const char* s) {
   sqlite3_result_text(ctx, s, -1, &free);
 }
@@ -1637,7 +1674,7 @@ func (c *SQLiteConn) prepare(ctx context.Context, query string) (driver.Stmt, er
 	defer C.free(unsafe.Pointer(pquery))
 	var s *C.sqlite3_stmt
 	var tail *C.char
-	rv := C.sqlite3_prepare_v2(c.db, pquery, -1, &s, &tail)
+	rv := C.sqlite3_prepare_v2_internal(c.db, pquery, -1, &s, &tail)
 	if rv != C.SQLITE_OK {
 		return nil, c.lastError()
 	}
@@ -1871,7 +1908,7 @@ func (s *SQLiteStmt) exec(ctx context.Context, args []namedValue) (driver.Result
 	}
 
 	var rowid, changes C.longlong
-	rv := C._sqlite3_step(s.s, &rowid, &changes)
+	rv := C._sqlite3_step_internal(s.s, &rowid, &changes)
 	if rv != C.SQLITE_ROW && rv != C.SQLITE_OK && rv != C.SQLITE_DONE {
 		err := s.c.lastError()
 		C.sqlite3_reset(s.s)
@@ -1943,7 +1980,7 @@ func (rc *SQLiteRows) Next(dest []driver.Value) error {
 	if rc.s.closed {
 		return io.EOF
 	}
-	rv := C.sqlite3_step(rc.s.s)
+	rv := C.sqlite3_step_internal(rc.s.s)
 	if rv == C.SQLITE_DONE {
 		return io.EOF
 	}

+ 85 - 0
sqlite3_opt_unlock_notify.c

@@ -0,0 +1,85 @@
+// Copyright (C) 2018 Yasuhiro Matsumoto <mattn.jp@gmail.com>.
+//
+// Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file.
+
+#ifdef SQLITE_ENABLE_UNLOCK_NOTIFY
+#include <stdio.h>
+#include <sqlite3-binding.h>
+
+extern int unlock_notify_wait(sqlite3 *db);
+
+int
+sqlite3_step_blocking(sqlite3_stmt *stmt)
+{
+  int rv;
+  sqlite3* db;
+
+  db = sqlite3_db_handle(stmt);
+  for (;;) {
+    rv = sqlite3_step(stmt);
+    if (rv != SQLITE_LOCKED) {
+      break;
+    }
+    if (sqlite3_extended_errcode(db) != SQLITE_LOCKED_SHAREDCACHE) {
+      break;
+    }
+    rv = unlock_notify_wait(db);
+    if (rv != SQLITE_OK) {
+      break;
+    }
+    sqlite3_reset(stmt);
+  }
+
+  return rv;
+}
+
+int
+_sqlite3_step_blocking(sqlite3_stmt* stmt, long long* rowid, long long* changes)
+{
+  int rv;
+  sqlite3* db;
+
+  db = sqlite3_db_handle(stmt);
+  for (;;) {
+    rv = sqlite3_step(stmt);
+    if (rv!=SQLITE_LOCKED) {
+      break;
+    }
+    if (sqlite3_extended_errcode(db) != SQLITE_LOCKED_SHAREDCACHE) {
+      break;
+    }
+    rv = unlock_notify_wait(db);
+    if (rv != SQLITE_OK) {
+      break;
+    }
+    sqlite3_reset(stmt);
+  }
+
+  *rowid = (long long) sqlite3_last_insert_rowid(db);
+  *changes = (long long) sqlite3_changes(db);
+  return rv;
+}
+
+int
+sqlite3_prepare_v2_blocking(sqlite3 *db, const char *zSql, int nBytes, sqlite3_stmt **ppStmt, const char **pzTail)
+{
+  int rv;
+
+  for (;;) {
+    rv = sqlite3_prepare_v2(db, zSql, nBytes, ppStmt, pzTail);
+    if (rv!=SQLITE_LOCKED) {
+      break;
+    }
+    if (sqlite3_extended_errcode(db) != SQLITE_LOCKED_SHAREDCACHE) {
+      break;
+    }
+    rv = unlock_notify_wait(db);
+    if (rv != SQLITE_OK) {
+      break;
+    }
+  }
+
+  return rv;
+}
+#endif

+ 92 - 0
sqlite3_opt_unlock_notify.go

@@ -0,0 +1,92 @@
+// Copyright (C) 2018 Yasuhiro Matsumoto <mattn.jp@gmail.com>.
+//
+// Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file.
+
+// +build cgo
+// +build sqlite_unlock_notify
+
+package sqlite3
+
+/*
+#cgo CFLAGS: -DSQLITE_ENABLE_UNLOCK_NOTIFY
+
+#include <stdlib.h>
+#include <sqlite3-binding.h>
+
+extern void unlock_notify_callback(void *arg, int argc);
+*/
+import "C"
+import (
+	"fmt"
+	"sync"
+	"unsafe"
+)
+
+type unlock_notify_table struct {
+	sync.Mutex
+	seqnum uint
+	table  map[uint]chan struct{}
+}
+
+var unt unlock_notify_table = unlock_notify_table{table: make(map[uint]chan struct{})}
+
+func (t *unlock_notify_table) add(c chan struct{}) uint {
+	t.Lock()
+	defer t.Unlock()
+	h := t.seqnum
+	t.table[h] = c
+	t.seqnum++
+	return h
+}
+
+func (t *unlock_notify_table) remove(h uint) {
+	t.Lock()
+	defer t.Unlock()
+	delete(t.table, h)
+}
+
+func (t *unlock_notify_table) get(h uint) chan struct{} {
+	t.Lock()
+	defer t.Unlock()
+	c, ok := t.table[h]
+	if !ok {
+		panic(fmt.Sprintf("Non-existent key for unlcok-notify channel: %d", h))
+	}
+	return c
+}
+
+//export unlock_notify_callback
+func unlock_notify_callback(argv unsafe.Pointer, argc C.int) {
+	for i := 0; i < int(argc); i++ {
+		parg := ((*(*[1 << 30]*[1]uint)(argv))[i])
+		arg := *parg
+		h := arg[0]
+		c := unt.get(h)
+		c <- struct{}{}
+	}
+}
+
+//export unlock_notify_wait
+func unlock_notify_wait(db *C.sqlite3) C.int {
+	// It has to be a bufferred channel to not block in sqlite_unlock_notify
+	// as sqlite_unlock_notify could invoke the callback before it returns.
+	c := make(chan struct{}, 1)
+	defer close(c)
+
+	h := unt.add(c)
+	defer unt.remove(h)
+
+	pargv := C.malloc(C.sizeof_uint)
+	defer C.free(pargv)
+
+	argv := (*[1]uint)(pargv)
+	argv[0] = h
+	if rv := C.sqlite3_unlock_notify(db, (*[0]byte)(C.unlock_notify_callback), unsafe.Pointer(pargv)); rv != C.SQLITE_OK {
+		return rv
+	}
+
+	<-c
+
+	return C.SQLITE_OK
+}

+ 222 - 0
sqlite3_opt_unlock_notify_test.go

@@ -0,0 +1,222 @@
+// Copyright (C) 2018 Yasuhiro Matsumoto <mattn.jp@gmail.com>.
+//
+// Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file.
+
+// +build sqlite_unlock_notify
+
+package sqlite3
+
+import (
+	"database/sql"
+	"fmt"
+	"os"
+	"sync"
+	"testing"
+	"time"
+)
+
+func TestUnlockNotify(t *testing.T) {
+	tempFilename := TempFilename(t)
+	defer os.Remove(tempFilename)
+	dsn := fmt.Sprintf("file:%s?cache=shared&mode=rwc&_busy_timeout=%d", tempFilename, 500)
+	db, err := sql.Open("sqlite3", dsn)
+	if err != nil {
+		t.Fatal("Failed to open database:", err)
+	}
+	defer db.Close()
+
+	_, err = db.Exec("CREATE TABLE foo(id INTEGER, status INTEGER)")
+	if err != nil {
+		t.Fatal("Failed to create table:", err)
+	}
+
+	tx, err := db.Begin()
+	if err != nil {
+		t.Fatal("Failed to begin transaction:", err)
+	}
+
+	_, err = tx.Exec("INSERT INTO foo(id, status) VALUES(1, 100)")
+	if err != nil {
+		t.Fatal("Failed to insert null:", err)
+	}
+
+	_, err = tx.Exec("UPDATE foo SET status = 200 WHERE id = 1")
+	if err != nil {
+		t.Fatal("Failed to update table:", err)
+	}
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	timer := time.NewTimer(500 * time.Millisecond)
+	go func() {
+		<-timer.C
+		err := tx.Commit()
+		if err != nil {
+			t.Fatal("Failed to commit transaction:", err)
+		}
+		wg.Done()
+	}()
+
+	rows, err := db.Query("SELECT count(*) from foo")
+	if err != nil {
+		t.Fatal("Unable to query foo table:", err)
+	}
+
+	if rows.Next() {
+		var count int
+		if err := rows.Scan(&count); err != nil {
+			t.Fatal("Failed to Scan rows", err)
+		}
+	}
+	if err := rows.Err(); err != nil {
+		t.Fatal("Failed at the call to Next:", err)
+	}
+	wg.Wait()
+
+}
+
+func TestUnlockNotifyMany(t *testing.T) {
+	tempFilename := TempFilename(t)
+	defer os.Remove(tempFilename)
+	dsn := fmt.Sprintf("file:%s?cache=shared&mode=rwc&_busy_timeout=%d", tempFilename, 500)
+	db, err := sql.Open("sqlite3", dsn)
+	if err != nil {
+		t.Fatal("Failed to open database:", err)
+	}
+	defer db.Close()
+
+	_, err = db.Exec("CREATE TABLE foo(id INTEGER, status INTEGER)")
+	if err != nil {
+		t.Fatal("Failed to create table:", err)
+	}
+
+	tx, err := db.Begin()
+	if err != nil {
+		t.Fatal("Failed to begin transaction:", err)
+	}
+
+	_, err = tx.Exec("INSERT INTO foo(id, status) VALUES(1, 100)")
+	if err != nil {
+		t.Fatal("Failed to insert null:", err)
+	}
+
+	_, err = tx.Exec("UPDATE foo SET status = 200 WHERE id = 1")
+	if err != nil {
+		t.Fatal("Failed to update table:", err)
+	}
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	timer := time.NewTimer(500 * time.Millisecond)
+	go func() {
+		<-timer.C
+		err := tx.Commit()
+		if err != nil {
+			t.Fatal("Failed to commit transaction:", err)
+		}
+		wg.Done()
+	}()
+
+	const concurrentQueries = 1000
+	wg.Add(concurrentQueries)
+	for i := 0; i < concurrentQueries; i++ {
+		go func() {
+			rows, err := db.Query("SELECT count(*) from foo")
+			if err != nil {
+				t.Fatal("Unable to query foo table:", err)
+			}
+
+			if rows.Next() {
+				var count int
+				if err := rows.Scan(&count); err != nil {
+					t.Fatal("Failed to Scan rows", err)
+				}
+			}
+			if err := rows.Err(); err != nil {
+				t.Fatal("Failed at the call to Next:", err)
+			}
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+}
+
+func TestUnlockNotifyDeadlock(t *testing.T) {
+	tempFilename := TempFilename(t)
+	defer os.Remove(tempFilename)
+	dsn := fmt.Sprintf("file:%s?cache=shared&mode=rwc&_busy_timeout=%d", tempFilename, 500)
+	db, err := sql.Open("sqlite3", dsn)
+	if err != nil {
+		t.Fatal("Failed to open database:", err)
+	}
+	defer db.Close()
+
+	_, err = db.Exec("CREATE TABLE foo(id INTEGER, status INTEGER)")
+	if err != nil {
+		t.Fatal("Failed to create table:", err)
+	}
+
+	tx, err := db.Begin()
+	if err != nil {
+		t.Fatal("Failed to begin transaction:", err)
+	}
+
+	_, err = tx.Exec("INSERT INTO foo(id, status) VALUES(1, 100)")
+	if err != nil {
+		t.Fatal("Failed to insert null:", err)
+	}
+
+	_, err = tx.Exec("UPDATE foo SET status = 200 WHERE id = 1")
+	if err != nil {
+		t.Fatal("Failed to update table:", err)
+	}
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	timer := time.NewTimer(500 * time.Millisecond)
+	go func() {
+		<-timer.C
+		err := tx.Commit()
+		if err != nil {
+			t.Fatal("Failed to commit transaction:", err)
+		}
+		wg.Done()
+	}()
+
+	wg.Add(1)
+	go func() {
+		tx2, err := db.Begin()
+		if err != nil {
+			t.Fatal("Failed to begin transaction:", err)
+		}
+		defer tx2.Rollback()
+
+		_, err = tx2.Exec("DELETE FROM foo")
+		if err != nil {
+			t.Fatal("Failed to delete table:", err)
+		}
+		err = tx2.Commit()
+		if err != nil {
+			t.Fatal("Failed to commit transaction:", err)
+		}
+		wg.Done()
+	}()
+
+	rows, err := tx.Query("SELECT count(*) from foo")
+	if err != nil {
+		t.Fatal("Unable to query foo table:", err)
+	}
+
+	if rows.Next() {
+		var count int
+		if err := rows.Scan(&count); err != nil {
+			t.Fatal("Failed to Scan rows", err)
+		}
+	}
+	if err := rows.Err(); err != nil {
+		t.Fatal("Failed at the call to Next:", err)
+	}
+
+	wg.Wait()
+}