From 779bb50bdc6d740b3e877db0eac0d16967186b41 Mon Sep 17 00:00:00 2001 From: Bill Katz Date: Tue, 23 Jul 2024 11:43:37 -0400 Subject: [PATCH] add /fieldtimes endpoint --- datatype/neuronjson/memstore.go | 36 +++++++++++++++++----- datatype/neuronjson/neuronjson.go | 41 ++++++++++++++++++++++++++ datatype/neuronjson/neuronjson_test.go | 14 +++++++++ 3 files changed, 84 insertions(+), 7 deletions(-) diff --git a/datatype/neuronjson/memstore.go b/datatype/neuronjson/memstore.go index ab62c13c..5a4f94f9 100644 --- a/datatype/neuronjson/memstore.go +++ b/datatype/neuronjson/memstore.go @@ -50,10 +50,11 @@ func (d *Data) getMemDBbyVersion(v dvid.VersionID) (db *memdb, found bool) { // in-memory neuron annotations with sorted body id list for optional sorted iteration. type memdb struct { - data map[uint64]NeuronJSON - ids []uint64 // sorted list of body ids - fields map[string]struct{} // list of all fields among the annotations - mu sync.RWMutex + data map[uint64]NeuronJSON + ids []uint64 // sorted list of body ids + fields map[string]struct{} // list of all fields among the annotations across versions + fieldTimes map[string]string // timestamp of last update for each field in HEAD + mu sync.RWMutex } // initializes the in-memory dbs for the given list of UUIDs + branch names in @@ -67,9 +68,10 @@ func (d *Data) initMemoryDB(versions []string) error { dvid.Infof("Initializing in-memory dbs for neuronjson %q with versions %v\n", d.DataName(), versions) for _, versionSpec := range versions { mdb := &memdb{ - data: make(map[uint64]NeuronJSON), - fields: make(map[string]struct{}), - ids: []uint64{}, + data: make(map[uint64]NeuronJSON), + fields: make(map[string]struct{}), + fieldTimes: make(map[string]string), + ids: []uint64{}, } if strings.HasPrefix(versionSpec, ":") { branch := strings.TrimPrefix(versionSpec, ":") @@ -81,6 +83,7 @@ func (d *Data) initMemoryDB(versions []string) error { } else if err := d.loadMemDB(v, mdb); err != nil { return err } + d.initFieldTimes(mdb) } else { uuid, v, err := datastore.MatchingUUID(versionSpec) if err != nil { @@ -98,6 +101,25 @@ func (d *Data) initMemoryDB(versions []string) error { return nil } +// initialize the fieldTimes map for an already loaded memdb. +func (d *Data) initFieldTimes(mdb *memdb) { + for _, neuronjson := range mdb.data { + for field := range neuronjson { + if strings.HasSuffix(field, "_time") { + rootField := field[:len(field)-5] + timestamp := neuronjson[field].(string) + if _, found := mdb.fieldTimes[rootField]; !found { + mdb.fieldTimes[rootField] = timestamp + } else { + if timestamp > mdb.fieldTimes[rootField] { + mdb.fieldTimes[rootField] = timestamp + } + } + } + } + } +} + func (d *Data) loadMemDB(v dvid.VersionID, mdb *memdb) error { ctx := datastore.NewVersionedCtx(d, v) db, err := datastore.GetOrderedKeyValueDB(d) diff --git a/datatype/neuronjson/neuronjson.go b/datatype/neuronjson/neuronjson.go index d9bd6c63..14c4258f 100644 --- a/datatype/neuronjson/neuronjson.go +++ b/datatype/neuronjson/neuronjson.go @@ -209,6 +209,12 @@ GET /node///fields ["field1", "field2", ...] +GET /node///fieldtimes + + Returns the RFC3339 timestamps for each field in the most recent version: + + {"field1": "timestamp1", "field2": "timestamp2", ...} + GET /node///keyrange// Returns all keys between 'key1' and 'key2' for this data instance in JSON format: @@ -1176,6 +1182,20 @@ func (d *Data) GetFields(ctx storage.VersionedCtx) ([]string, error) { return fields, nil } +func (d *Data) GetFieldTimes(ctx storage.VersionedCtx) (map[string]string, error) { + mdb, found := d.getMemDBbyVersion(ctx.VersionID()) + if !found { + return nil, fmt.Errorf("unable to get fields because no in-memory db for neuronjson %q, version %d", d.DataName(), ctx.VersionID()) + } + mdb.mu.RLock() + fieldTimes := make(map[string]string, len(mdb.fieldTimes)) + for field, timeStr := range mdb.fieldTimes { + fieldTimes[field] = timeStr + } + mdb.mu.RUnlock() + return fieldTimes, nil +} + // GetByBodyID returns the JSON for a given body ID. func (d *Data) GetByBodyID(ctx storage.VersionedCtx, bodyid uint64, fieldMap map[string]struct{}, showFields Fields) (jsonData []byte, found bool, err error) { var value map[string]interface{} @@ -1369,8 +1389,14 @@ func (d *Data) storeAndUpdate(ctx *datastore.VersionedCtx, keyStr string, newDat if found { mdb.mu.Lock() mdb.data[bodyid] = newData + + // cache updated field and field timestamps for field := range newData { mdb.fields[field] = struct{}{} + if strings.HasSuffix(field, "_time") { + rootField := field[:len(field)-5] + mdb.fieldTimes[rootField] = newData[field].(string) + } } mdb.addBodyID(bodyid) mdb.mu.Unlock() @@ -2136,6 +2162,21 @@ func (d *Data) ServeHTTP(uuid dvid.UUID, ctx *datastore.VersionedCtx, w http.Res fmt.Fprint(w, string(jsonBytes)) comment = "HTTP GET fields" + case "fieldtimes": + fieldTimes, err := d.GetFieldTimes(ctx) + if err != nil { + server.BadRequest(w, r, err) + return + } + jsonBytes, err := json.Marshal(fieldTimes) + if err != nil { + server.BadRequest(w, r, err) + return + } + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, string(jsonBytes)) + comment = "HTTP GET fieldtimes" + case "query": if action != "post" && action != "get" { server.BadRequest(w, r, fmt.Errorf("only GET or POST methods allowed for /query endpoint")) diff --git a/datatype/neuronjson/neuronjson_test.go b/datatype/neuronjson/neuronjson_test.go index 4935aec7..3a9959e7 100644 --- a/datatype/neuronjson/neuronjson_test.go +++ b/datatype/neuronjson/neuronjson_test.go @@ -932,6 +932,7 @@ func TestKeyvalueRequests(t *testing.T) { } // Check if keys are re-POSTed using replace=true. Don't modify "a number" field. + time.Sleep(2 * time.Second) // need to change timestamp for test value3mod = `{"bodyid": 3000, "a string": "goo replaced", "only list": [1, 2], "a number": 3456}` key3modreq = fmt.Sprintf("%snode/%s/%s/key/%s?u=sandra&show=all&replace=true", server.WebAPIPath, uuid, name, key3) server.TestHTTP(t, "POST", key3modreq, strings.NewReader(value3mod)) @@ -955,9 +956,12 @@ func TestKeyvalueRequests(t *testing.T) { if value, found := responseJSON["a number_time"]; !found || value != old_time { t.Fatalf("Bad response: %v\n", responseJSON) } + if value, found := responseJSON["a string"]; !found || value != "goo replaced" { t.Fatalf("Bad response: %v\n", responseJSON) } + new_time := responseJSON["a string_time"] + if value, found := responseJSON["only list"]; !found || !reflect.DeepEqual(value, []int64{1, 2}) { t.Fatalf("Bad response (type %s): %v\n", reflect.TypeOf(value), responseJSON) } @@ -970,6 +974,16 @@ func TestKeyvalueRequests(t *testing.T) { if len(responseJSON) != 10 { t.Fatalf("Expected 10 fields in response after replace, got: %v\n", responseJSON) } + + // Check if fieldtimes endpoint has the newer times. + fieldtimeReq := fmt.Sprintf("%snode/%s/%s/fieldtimes", server.WebAPIPath, uuid, name) + returnValue = server.TestHTTP(t, "GET", fieldtimeReq, nil) + if err := json.Unmarshal(returnValue, &responseJSON); err != nil { + t.Fatalf("Couldn't unmarshal response JSON: %s\n", string(returnValue)) + } + if value, found := responseJSON["a string"]; !found || value != new_time { + t.Fatalf("Bad timestamp response: %v\n", responseJSON) + } } var testData = []struct {