package leveldb

import (
"bytes"
"io/ioutil"
"os"
"path/filepath"
"regexp"
"sync"
"sync/atomic"
"unsafe"
"github.com/nightlyone/lockfile"
)
const dbMemorySegment = 1024 * 1024
const dbMaxSegments = 8
type dbState struct {
segments []segment
memory *memorySegment
multi segment
}
type Statistics struct {
NumberOfSegments int
}
// Database represents an open database; a reference is obtained via Open()
type Database struct {
sync.Mutex
open bool
// atomically updated flag to control database closing
closing int32
merger chan bool
// atomically updated flag to control merger
inMerge int32
deleter Deleter
path string
wg sync.WaitGroup
nextSegID uint64
lockfile lockfile.Lockfile
options Options
// updated atomically via getState/setState to avoid contention; the referenced dbState is read-only
state *dbState
snapshots []*Snapshot
// if non-nil, an asynchronous error has occurred and the database cannot be used; must be updated atomically
err error
}
type KeyComparison func([]byte, []byte) int
type batchReadMode int
const (
DiscardPartial batchReadMode = 0
ApplyPartial batchReadMode = 1
ReturnOpenError batchReadMode = 2
)
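// Options configures how a database is opened and maintained. A minimal
// configuration sketch (the values below are illustrative, not package
// defaults):
//
//    opts := Options{
//        CreateIfNeeded: true,            // create the directory on Open() if missing
//        MaxSegments:    16,              // merge once the segment count grows past this
//        MaxMemoryBytes: 4 * 1024 * 1024, // flush memory segments at roughly 4 MB
//    }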
type Options struct {
// If true, then if the database does not exist on Open() it will be created.
CreateIfNeeded bool
// The database segments are periodically merged to enforce MaxSegments.
// If this is true, the merging only occurs during Close().
DisableAutoMerge bool
// Maximum number of segments per database which controls the number of open files.
// If the number of segments exceeds 2x this value, producers are paused while the
// segments are merged.
MaxSegments uint
// Maximum size of memory segment in bytes. Maximum memory usage per database is
// roughly MaxSegments * MaxMemoryBytes but can be higher based on producer rate.
MaxMemoryBytes uint64
// Disable flush to disk when writing to increase performance.
DisableWriteFlush bool
// Force sync to disk when writing. If true, then DisableWriteFlush is ignored.
EnableSyncWrite bool
// Determines handling of partial batches during Open()
BatchReadMode batchReadMode
// Key comparison function or nil to use standard bytes.Compare
UserKeyCompare KeyComparison
}
// LookupIterator is the iterator interface for table scanning. All iterators should be read until completion.
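//
// A minimal read-until-completion sketch (itr stands for any LookupIterator
// obtained from the database; only the Next contract and EndOfIterator are
// taken from this file):
//
//    for {
//        key, value, err := itr.Next()
//        if err == EndOfIterator {
//            break
//        }
//        if err != nil {
//            // handle the error and stop
//            break
//        }
//        _, _ = key, value // process the entry
//    }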
type LookupIterator interface {
// Next returns EndOfIterator when complete; if err is nil, then key and value are valid
Next() (key []byte, value []byte, err error)
// peekKey returns the next non-deleted key in the index
peekKey() ([]byte, error)
}
type emptyIterator struct{}
func (i *emptyIterator) Next() (key []byte, value []byte, err error) { return nil, nil, EndOfIterator }
func (i *emptyIterator) peekKey() ([]byte, error) { return nil, EndOfIterator }
var global_lock sync.RWMutex
// Open a database. The database can only be opened by a single process, but the *Database
// reference can be shared across goroutines. The path is a directory name.
// If options.CreateIfNeeded is true, the database will be created if it does not exist.
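//
// A minimal open/close sketch (the path is hypothetical):
//
//    db, err := Open("/tmp/exampledb", Options{CreateIfNeeded: true})
//    if err != nil {
//        return err // or otherwise handle the failure
//    }
//    defer db.Close()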
func Open(path string, options Options) (*Database, error) {
global_lock.Lock()
defer global_lock.Unlock()
db, err := open(path, options)
if err == NoDatabaseFound && options.CreateIfNeeded {
return create(path, options)
}
return db, err
}
func open(path string, options Options) (*Database, error) {
path = filepath.Clean(path)
err := IsValidDatabase(path)
if err != nil {
return nil, err
}
abs, err := filepath.Abs(path + "/lockfile")
if err != nil {
return nil, err
}
lf, err := lockfile.New(abs)
if err != nil {
return nil, err
}
err = lf.TryLock()
if err != nil {
return nil, DatabaseInUse
}
db := &Database{path: path, open: true, options: options}
db.lockfile = lf
db.deleter = newDeleter(path)
err = db.deleter.deleteScheduled()
if err != nil {
return nil, err
}
segments, err := loadDiskSegments(path, db.options)
if err != nil {
return nil, err
}
maxSegID := uint64(0)
for _, seg := range segments {
if seg.UpperID() > maxSegID {
maxSegID = seg.UpperID()
}
}
atomic.StoreUint64(&db.nextSegID, maxSegID)
memory := newMemorySegment(db.path, db.nextSegmentID(), db.options)
multi := newMultiSegment(copyAndAppend(segments, memory))
state := &dbState{segments: segments, memory: memory, multi: multi}
db.setState(state)
db.merger = make(chan bool)
if db.options.MaxMemoryBytes < dbMemorySegment {
db.options.MaxMemoryBytes = dbMemorySegment
}
if db.options.MaxSegments < dbMaxSegments {
db.options.MaxSegments = dbMaxSegments
}
if !options.DisableAutoMerge {
db.wg.Add(1)
go mergeSegments(db)
}
return db, nil
}
func create(path string, options Options) (*Database, error) {
path = filepath.Clean(path)
err := os.MkdirAll(path, os.ModePerm)
if err != nil {
return nil, err
}
return open(path, options)
}
// Remove the database, deleting all files. The caller must be able to
// gain exclusive access to the database.
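//
// A minimal sketch (the path is hypothetical):
//
//    if err := Remove("/tmp/exampledb"); err != nil {
//        // handle the error
//    }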
func Remove(path string) error {
global_lock.Lock()
defer global_lock.Unlock()
path = filepath.Clean(path)
err := IsValidDatabase(path)
if err != nil {
return err
}
abs, err := filepath.Abs(path + "/lockfile")
if err != nil {
return err
}
lf, err := lockfile.New(abs)
if err != nil {
return err
}
err = lf.TryLock()
if err != nil {
return DatabaseInUse
}
return os.RemoveAll(path)
}
// IsValidDatabase checks if the path points to a valid database or empty directory (which is also valid)
func IsValidDatabase(path string) error {
fi, err := os.Stat(path)
if err != nil {
return NoDatabaseFound
}
if !fi.IsDir() {
return NotADirectory
}
infos, err := ioutil.ReadDir(path)
if err != nil {
return err
}
for _, f := range infos {
if "lockfile" == f.Name() {
continue
}
if "deleted" == f.Name() {
continue
}
if f.Name() == filepath.Base(path) {
continue
}
if matched, _ := regexp.MatchString(`(log|keys|data)\..*`, f.Name()); !matched {
return NotValidDatabase
}
}
return nil
}
// Close the database. Any memory segments are persisted to disk.
// The resulting segments are merged until the configured MaxSegments is reached
func (db *Database) Close() error {
return db.CloseWithMerge(db.options.MaxSegments)
}
// CloseWithMerge closes the database with control of the segment count. If segmentCount is 0, then
// the merge process is skipped
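//
// A minimal sketch (segmentCount of 0 shown to skip merging entirely):
//
//    if err := db.CloseWithMerge(0); err != nil {
//        // handle the error
//    }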
func (db *Database) CloseWithMerge(segmentCount uint) error {
global_lock.Lock()
defer global_lock.Unlock()
if !db.open {
return DatabaseClosed
}
err := db.err
var state *dbState
if err != nil {
goto finish
}
atomic.StoreInt32(&db.closing, 1)
close(db.merger)
db.wg.Wait() // wait for background merger to exit
state = &dbState{
segments: copyAndAppend(db.state.segments, db.state.memory),
memory: nil,
multi: nil,
}
db.setState(state)
if segmentCount > 0 {
db.err = mergeSegments0(db, segmentCount, false)
}
if db.err != nil {
goto finish
}
// write any remaining memory segments to disk
db.Lock()
for _, s := range db.snapshots {
s.Close()
}
db.snapshots = nil
for _, s := range db.state.segments {
ms, ok := s.(*memorySegment)
if ok {
db.wg.Add(1)
go func(s *memorySegment) {
err0 := writeSegmentToDisk(db, s)
if err0 != nil {
db.err = err0
}
db.wg.Done()
}(ms)
}
}
db.wg.Wait()
for _, s := range db.state.segments {
s.Close()
}
err = db.deleter.deleteScheduled()
finish:
db.setState(&dbState{segments: []segment{}})
db.lockfile.Unlock()
db.open = false
return err
}
func (db *Database) nextSegmentID() uint64 {
return atomic.AddUint64(&db.nextSegID, 1)
}
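// getState returns the current immutable dbState. The state pointer is read and
// published with atomic pointer operations, so readers do not need to hold
// db.Mutex; writers build a new dbState and publish it via setState.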
func (db *Database) getState() *dbState {
return (*dbState)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&db.state))))
}
func (db *Database) setState(state *dbState) {
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&db.state)), unsafe.Pointer(state))
}
func (db *Database) Stats() Statistics {
db.Lock()
defer db.Unlock()
return Statistics{NumberOfSegments: len(db.getState().segments)}
}
func less(a []byte, b []byte) bool {
return bytes.Compare(a, b) < 0
}
func equal(a []byte, b []byte) bool {
return bytes.Equal(a, b)
}
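// copyAndAppend returns a new slice holding seg followed by segs; the input
// slice is left unmodified, so previously published dbState values are not
// affected.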
func copyAndAppend(seg []segment, segs ...segment) []segment {
newSlice := make([]segment, len(seg), len(seg)+len(segs))
copy(newSlice, seg)
return append(newSlice, segs...)
}