diff --git a/go.mod b/go.mod index 28bdd2ff4..7f1df8747 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/cayleygraph/cayley go 1.12 require ( + github.com/RyouZhang/async-go v0.2.2 // indirect github.com/badgerodon/peg v0.0.0-20130729175151-9e5f7f4d07ca github.com/cayleygraph/quad v1.2.4 github.com/cockroachdb/apd v1.1.0 // indirect diff --git a/go.sum b/go.sum index 24da5c1c9..6207bfe6b 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,8 @@ github.com/Microsoft/go-winio v0.4.12/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcy github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/RyouZhang/async-go v0.2.2 h1:/cRhxuJkRLygF/a+c9Teaa96QIY22mOx1YrOK+WSgAA= +github.com/RyouZhang/async-go v0.2.2/go.mod h1:ogL6baAxf0sZWPi/i9i4XdTt8D1Vn/AxNeG0eUWrY18= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/graph/memstore/all_iterator.go b/graph/memstore/all_iterator.go index 8a26424f8..3981ac4f7 100644 --- a/graph/memstore/all_iterator.go +++ b/graph/memstore/all_iterator.go @@ -178,7 +178,9 @@ func (it *allIteratorContains) Contains(ctx context.Context, v graph.Ref) bool { if !ok { return false } + it.qs.primMu.RLock() p := it.qs.prim[id] + it.qs.primMu.RUnlock() if p.ID > it.maxid { return false } diff --git a/graph/memstore/quadstore.go b/graph/memstore/quadstore.go index ea9798c4d..f7a667be0 100644 --- a/graph/memstore/quadstore.go +++ b/graph/memstore/quadstore.go @@ -19,6 +19,7 @@ import ( "fmt" "strconv" "strings" + "sync" "github.com/cayleygraph/cayley/graph" "github.com/cayleygraph/cayley/graph/iterator" @@ -60,7 +61,7 @@ type QuadDirectionIndex struct { } func NewQuadDirectionIndex() QuadDirectionIndex { - return QuadDirectionIndex{[...]map[int64]*Tree{ + return QuadDirectionIndex{index: [...]map[int64]*Tree{ quad.Subject - 1: make(map[int64]*Tree), quad.Predicate - 1: make(map[int64]*Tree), quad.Object - 1: make(map[int64]*Tree), @@ -72,6 +73,7 @@ func (qdi QuadDirectionIndex) Tree(d quad.Direction, id int64) *Tree { if d < quad.Subject || d > quad.Label { panic("illegal direction") } + tree, ok := qdi.index[d-1][id] if !ok { tree = TreeNew(cmp) @@ -143,6 +145,11 @@ type QuadStore struct { index QuadDirectionIndex horizon int64 // used only to assign ids to tx // vip_index map[string]map[int64]map[string]map[int64]*b.Tree + + valsMu sync.RWMutex + quadsMu sync.RWMutex + primMu sync.RWMutex + indexMu sync.RWMutex } // New creates a new in-memory quad store and loads provided quads. @@ -178,7 +185,10 @@ func (qs *QuadStore) addPrimitive(p *Primitive) int64 { } func (qs *QuadStore) appendPrimitive(p *Primitive) { + qs.primMu.Lock() qs.prim[p.ID] = p + qs.primMu.Unlock() + if !qs.reading { qs.all = append(qs.all, p) } else { @@ -198,24 +208,36 @@ func (qs *QuadStore) resolveVal(v quad.Value, add bool) (int64, bool) { n = n[len(internalBNodePrefix):] id, err := strconv.ParseInt(string(n), 10, 64) if err == nil && id != 0 { + qs.primMu.RLock() if p, ok := qs.prim[id]; ok || !add { + qs.primMu.RUnlock() if add { p.refs++ } return id, ok } + qs.primMu.RUnlock() qs.appendPrimitive(&Primitive{ID: id, refs: 1}) return id, true } } vs := v.String() + qs.valsMu.RLock() if id, exists := qs.vals[vs]; exists || !add { + qs.valsMu.RUnlock() if exists && add { + qs.primMu.Lock() qs.prim[id].refs++ + qs.primMu.Unlock() } return id, exists } + qs.valsMu.RUnlock() + id := qs.addPrimitive(&Primitive{Value: v}) + + qs.valsMu.Lock() + defer qs.valsMu.Unlock() qs.vals[vs] = id return id, true } @@ -237,7 +259,9 @@ func (qs *QuadStore) resolveQuad(q quad.Quad, add bool) (internalQuad, bool) { } func (qs *QuadStore) lookupVal(id int64) quad.Value { + qs.primMu.RLock() pv := qs.prim[id] + qs.primMu.RUnlock() if pv == nil || pv.Value == nil { return quad.BNode(internalBNodePrefix + strconv.FormatInt(id, 10)) } @@ -270,6 +294,9 @@ func (qs *QuadStore) AddValue(v quad.Value) (int64, bool) { } func (qs *QuadStore) indexesForQuad(q internalQuad) []*Tree { + qs.indexMu.Lock() + defer qs.indexMu.Unlock() + trees := make([]*Tree, 0, 4) for dir := quad.Subject; dir <= quad.Label; dir++ { v := q.Dir(dir) @@ -285,13 +312,20 @@ func (qs *QuadStore) indexesForQuad(q internalQuad) []*Tree { // False is returned as a second parameter if quad exists already. func (qs *QuadStore) AddQuad(q quad.Quad) (int64, bool) { p, _ := qs.resolveQuad(q, false) + qs.quadsMu.RLock() if id := qs.quads[p]; id != 0 { + qs.quadsMu.RUnlock() return id, false } + qs.quadsMu.RUnlock() p, _ = qs.resolveQuad(q, true) pr := &Primitive{Quad: p} id := qs.addPrimitive(pr) + + qs.quadsMu.Lock() qs.quads[p] = id + qs.quadsMu.Unlock() + for _, t := range qs.indexesForQuad(p) { t.Set(id, pr) } @@ -345,32 +379,44 @@ func (qs *QuadStore) deleteQuadNodes(q internalQuad) { if id == 0 { continue } + qs.primMu.RLock() if p := qs.prim[id]; p != nil { + qs.primMu.RUnlock() p.refs-- if p.refs < 0 { panic("remove of deleted node") } else if p.refs == 0 { qs.Delete(id) } + } else { + qs.primMu.RUnlock() } } } func (qs *QuadStore) Delete(id int64) bool { + qs.primMu.RLock() p := qs.prim[id] + qs.primMu.RUnlock() if p == nil { return false } // remove from value index if p.Value != nil { + qs.valsMu.Lock() delete(qs.vals, p.Value.String()) + qs.valsMu.Unlock() } // remove from quad indexes for _, t := range qs.indexesForQuad(p.Quad) { t.Delete(id) } + qs.quadsMu.Lock() delete(qs.quads, p.Quad) - // remove primitive + qs.quadsMu.Unlock() + // remove Primitive + qs.primMu.Lock() delete(qs.prim, id) + qs.primMu.Unlock() di := -1 for i, p2 := range qs.all { if p == p2 { @@ -398,6 +444,8 @@ func (qs *QuadStore) findQuad(q quad.Quad) (int64, internalQuad, bool) { if !ok { return 0, p, false } + qs.quadsMu.Lock() + defer qs.quadsMu.Unlock() id := qs.quads[p] return id, p, id != 0 } @@ -456,7 +504,9 @@ func asID(v graph.Ref) (int64, bool) { func (qs *QuadStore) quad(v graph.Ref) (q internalQuad, ok bool) { switch v := v.(type) { case bnode: + qs.primMu.RLock() p := qs.prim[int64(v)] + qs.primMu.RUnlock() if p == nil { return } @@ -482,7 +532,9 @@ func (qs *QuadStore) QuadIterator(d quad.Direction, value graph.Ref) iterator.Sh if !ok { return iterator.NewNull() } + qs.indexMu.RLock() index, ok := qs.index.Get(d, id) + qs.indexMu.RUnlock() if ok && index.Len() != 0 { return qs.newIterator(index, d, id) } @@ -494,7 +546,9 @@ func (qs *QuadStore) QuadIteratorSize(ctx context.Context, d quad.Direction, v g if !ok { return refs.Size{Value: 0, Exact: true}, nil } + qs.indexMu.RLock() index, ok := qs.index.Get(d, id) + qs.indexMu.RUnlock() if !ok { return refs.Size{Value: 0, Exact: true}, nil } @@ -502,6 +556,10 @@ func (qs *QuadStore) QuadIteratorSize(ctx context.Context, d quad.Direction, v g } func (qs *QuadStore) Stats(ctx context.Context, exact bool) (graph.Stats, error) { + qs.valsMu.RLock() + defer qs.valsMu.RUnlock() + qs.quadsMu.RLock() + defer qs.quadsMu.RUnlock() return graph.Stats{ Nodes: refs.Size{ Value: int64(len(qs.vals)), @@ -518,7 +576,10 @@ func (qs *QuadStore) ValueOf(name quad.Value) (graph.Ref, error) { if name == nil { return nil, nil } + + qs.valsMu.Lock() id := qs.vals[name.String()] + qs.valsMu.Unlock() if id == 0 { return nil, nil } @@ -535,9 +596,12 @@ func (qs *QuadStore) NameOf(v graph.Ref) (quad.Value, error) { if !ok { return nil, nil } + qs.primMu.RLock() if _, ok = qs.prim[n]; !ok { + qs.primMu.RUnlock() return nil, nil } + qs.primMu.RUnlock() return qs.lookupVal(n), nil } diff --git a/graph/memstore/quadstore_test.go b/graph/memstore/quadstore_test.go index 0330d11a2..19edd2a54 100644 --- a/graph/memstore/quadstore_test.go +++ b/graph/memstore/quadstore_test.go @@ -16,9 +16,11 @@ package memstore import ( "context" + "fmt" "reflect" "sort" "testing" + "time" "github.com/stretchr/testify/require" @@ -26,9 +28,12 @@ import ( "github.com/cayleygraph/cayley/graph/graphtest" "github.com/cayleygraph/cayley/graph/iterator" "github.com/cayleygraph/cayley/graph/refs" + "github.com/cayleygraph/cayley/query/path" "github.com/cayleygraph/cayley/query/shape" "github.com/cayleygraph/cayley/writer" "github.com/cayleygraph/quad" + + "github.com/RyouZhang/async-go" ) // This is a simple test graph. @@ -264,3 +269,42 @@ func TestTransaction(t *testing.T) { require.NoError(t, err) require.Equal(t, st, st2, "Appended a new quad in a failed transaction") } + +// test multi thread insert and query +func TestMultiThreadQuery(t *testing.T) { + qs, _, _ := makeTestStore(simpleGraph) + + // we make 50 insert, 50 query + funcs := make([]async.LambdaMethod, 100) + for i := 0; i < 100; i++ { + if i%2 == 0 { + index := i + funcs[i] = func() (interface{}, error) { + id, flag := qs.AddQuad(quad.Make( + fmt.Sprintf("E_%d", index), "follows", "G", nil), + ) + if !flag { + return nil, fmt.Errorf("quard exist:%d", id) + } + return id, nil + } + } else { + funcs[i] = func() (interface{}, error) { + ctx := context.Background() + followers, err := path.StartPath(qs, quad.Raw("G")).In("follows").Iterate(ctx).AllValues(qs) + if err != nil { + return nil, err + } + return followers, nil + } + } + } + + results := async.All(funcs, 1*time.Second) + for _, result := range results { + switch result.(type) { + case error: + require.NoError(t, result.(error)) + } + } +} diff --git a/graph/nosql/elastic/elastic.go b/graph/nosql/elastic/elastic.go index 840d5f0b3..2cb7883c1 100644 --- a/graph/nosql/elastic/elastic.go +++ b/graph/nosql/elastic/elastic.go @@ -6,7 +6,6 @@ import ( "github.com/hidal-go/hidalgo/legacy/nosql/elastic" //import hidal-go first so the registration of the no sql stores occurs before quadstore iterates for registration gnosql "github.com/cayleygraph/cayley/graph/nosql" - ) const Type = elastic.Name diff --git a/graph/nosql/mongo/mongo.go b/graph/nosql/mongo/mongo.go index 0b2649b0c..2dcdc1388 100644 --- a/graph/nosql/mongo/mongo.go +++ b/graph/nosql/mongo/mongo.go @@ -6,7 +6,6 @@ import ( "github.com/hidal-go/hidalgo/legacy/nosql/mongo" //import hidal-go first so the registration of the no sql stores occurs before quadstore iterates for registration gnosql "github.com/cayleygraph/cayley/graph/nosql" - ) const Type = mongo.Name