Skip to content

Commit

Permalink
add /fieldtimes endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
DocSavage committed Jul 23, 2024
1 parent faab38d commit 779bb50
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 7 deletions.
36 changes: 29 additions & 7 deletions datatype/neuronjson/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ func (d *Data) getMemDBbyVersion(v dvid.VersionID) (db *memdb, found bool) {

// in-memory neuron annotations with sorted body id list for optional sorted iteration.
type memdb struct {
data map[uint64]NeuronJSON
ids []uint64 // sorted list of body ids
fields map[string]struct{} // list of all fields among the annotations
mu sync.RWMutex
data map[uint64]NeuronJSON
ids []uint64 // sorted list of body ids
fields map[string]struct{} // list of all fields among the annotations across versions
fieldTimes map[string]string // timestamp of last update for each field in HEAD
mu sync.RWMutex
}

// initializes the in-memory dbs for the given list of UUIDs + branch names in
Expand All @@ -67,9 +68,10 @@ func (d *Data) initMemoryDB(versions []string) error {
dvid.Infof("Initializing in-memory dbs for neuronjson %q with versions %v\n", d.DataName(), versions)
for _, versionSpec := range versions {
mdb := &memdb{
data: make(map[uint64]NeuronJSON),
fields: make(map[string]struct{}),
ids: []uint64{},
data: make(map[uint64]NeuronJSON),
fields: make(map[string]struct{}),
fieldTimes: make(map[string]string),
ids: []uint64{},
}
if strings.HasPrefix(versionSpec, ":") {
branch := strings.TrimPrefix(versionSpec, ":")
Expand All @@ -81,6 +83,7 @@ func (d *Data) initMemoryDB(versions []string) error {
} else if err := d.loadMemDB(v, mdb); err != nil {
return err
}
d.initFieldTimes(mdb)
} else {
uuid, v, err := datastore.MatchingUUID(versionSpec)
if err != nil {
Expand All @@ -98,6 +101,25 @@ func (d *Data) initMemoryDB(versions []string) error {
return nil
}

// initialize the fieldTimes map for an already loaded memdb.
func (d *Data) initFieldTimes(mdb *memdb) {
for _, neuronjson := range mdb.data {
for field := range neuronjson {
if strings.HasSuffix(field, "_time") {
rootField := field[:len(field)-5]
timestamp := neuronjson[field].(string)
if _, found := mdb.fieldTimes[rootField]; !found {
mdb.fieldTimes[rootField] = timestamp
} else {
if timestamp > mdb.fieldTimes[rootField] {
mdb.fieldTimes[rootField] = timestamp
}
}
}
}
}
}

func (d *Data) loadMemDB(v dvid.VersionID, mdb *memdb) error {
ctx := datastore.NewVersionedCtx(d, v)
db, err := datastore.GetOrderedKeyValueDB(d)
Expand Down
41 changes: 41 additions & 0 deletions datatype/neuronjson/neuronjson.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,12 @@ GET <api URL>/node/<UUID>/<data name>/fields
["field1", "field2", ...]
GET <api URL>/node/<UUID>/<data name>/fieldtimes
Returns the RFC3339 timestamps for each field in the most recent version:
{"field1": "timestamp1", "field2": "timestamp2", ...}
GET <api URL>/node/<UUID>/<data name>/keyrange/<key1>/<key2>
Returns all keys between 'key1' and 'key2' for this data instance in JSON format:
Expand Down Expand Up @@ -1176,6 +1182,20 @@ func (d *Data) GetFields(ctx storage.VersionedCtx) ([]string, error) {
return fields, nil
}

func (d *Data) GetFieldTimes(ctx storage.VersionedCtx) (map[string]string, error) {
mdb, found := d.getMemDBbyVersion(ctx.VersionID())
if !found {
return nil, fmt.Errorf("unable to get fields because no in-memory db for neuronjson %q, version %d", d.DataName(), ctx.VersionID())
}
mdb.mu.RLock()
fieldTimes := make(map[string]string, len(mdb.fieldTimes))
for field, timeStr := range mdb.fieldTimes {
fieldTimes[field] = timeStr
}
mdb.mu.RUnlock()
return fieldTimes, nil
}

// GetByBodyID returns the JSON for a given body ID.
func (d *Data) GetByBodyID(ctx storage.VersionedCtx, bodyid uint64, fieldMap map[string]struct{}, showFields Fields) (jsonData []byte, found bool, err error) {
var value map[string]interface{}
Expand Down Expand Up @@ -1369,8 +1389,14 @@ func (d *Data) storeAndUpdate(ctx *datastore.VersionedCtx, keyStr string, newDat
if found {
mdb.mu.Lock()
mdb.data[bodyid] = newData

// cache updated field and field timestamps
for field := range newData {
mdb.fields[field] = struct{}{}
if strings.HasSuffix(field, "_time") {
rootField := field[:len(field)-5]
mdb.fieldTimes[rootField] = newData[field].(string)
}
}
mdb.addBodyID(bodyid)
mdb.mu.Unlock()
Expand Down Expand Up @@ -2136,6 +2162,21 @@ func (d *Data) ServeHTTP(uuid dvid.UUID, ctx *datastore.VersionedCtx, w http.Res
fmt.Fprint(w, string(jsonBytes))
comment = "HTTP GET fields"

case "fieldtimes":
fieldTimes, err := d.GetFieldTimes(ctx)
if err != nil {
server.BadRequest(w, r, err)
return
}
jsonBytes, err := json.Marshal(fieldTimes)
if err != nil {
server.BadRequest(w, r, err)
return
}
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, string(jsonBytes))
comment = "HTTP GET fieldtimes"

case "query":
if action != "post" && action != "get" {
server.BadRequest(w, r, fmt.Errorf("only GET or POST methods allowed for /query endpoint"))
Expand Down
14 changes: 14 additions & 0 deletions datatype/neuronjson/neuronjson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,7 @@ func TestKeyvalueRequests(t *testing.T) {
}

// Check if keys are re-POSTed using replace=true. Don't modify "a number" field.
time.Sleep(2 * time.Second) // need to change timestamp for test
value3mod = `{"bodyid": 3000, "a string": "goo replaced", "only list": [1, 2], "a number": 3456}`
key3modreq = fmt.Sprintf("%snode/%s/%s/key/%s?u=sandra&show=all&replace=true", server.WebAPIPath, uuid, name, key3)
server.TestHTTP(t, "POST", key3modreq, strings.NewReader(value3mod))
Expand All @@ -955,9 +956,12 @@ func TestKeyvalueRequests(t *testing.T) {
if value, found := responseJSON["a number_time"]; !found || value != old_time {
t.Fatalf("Bad response: %v\n", responseJSON)
}

if value, found := responseJSON["a string"]; !found || value != "goo replaced" {
t.Fatalf("Bad response: %v\n", responseJSON)
}
new_time := responseJSON["a string_time"]

if value, found := responseJSON["only list"]; !found || !reflect.DeepEqual(value, []int64{1, 2}) {
t.Fatalf("Bad response (type %s): %v\n", reflect.TypeOf(value), responseJSON)
}
Expand All @@ -970,6 +974,16 @@ func TestKeyvalueRequests(t *testing.T) {
if len(responseJSON) != 10 {
t.Fatalf("Expected 10 fields in response after replace, got: %v\n", responseJSON)
}

// Check if fieldtimes endpoint has the newer times.
fieldtimeReq := fmt.Sprintf("%snode/%s/%s/fieldtimes", server.WebAPIPath, uuid, name)
returnValue = server.TestHTTP(t, "GET", fieldtimeReq, nil)
if err := json.Unmarshal(returnValue, &responseJSON); err != nil {
t.Fatalf("Couldn't unmarshal response JSON: %s\n", string(returnValue))
}
if value, found := responseJSON["a string"]; !found || value != new_time {
t.Fatalf("Bad timestamp response: %v\n", responseJSON)
}
}

var testData = []struct {
Expand Down

0 comments on commit 779bb50

Please sign in to comment.