Skip to content

Commit

Permalink
refactor: move datapipe status state from in-memory to postgres (#653)
Browse files Browse the repository at this point in the history
  • Loading branch information
brandonshearin committed Jun 26, 2024
1 parent 924acb4 commit 9b52371
Show file tree
Hide file tree
Showing 14 changed files with 226 additions and 105 deletions.
4 changes: 1 addition & 3 deletions cmd/api/src/api/registration/registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
v2 "github.com/specterops/bloodhound/src/api/v2"
"github.com/specterops/bloodhound/src/auth"
"github.com/specterops/bloodhound/src/config"
"github.com/specterops/bloodhound/src/daemons/datapipe"
"github.com/specterops/bloodhound/src/database"
"github.com/specterops/bloodhound/src/queries"
)
Expand Down Expand Up @@ -59,7 +58,6 @@ func RegisterFossRoutes(
apiCache cache.Cache,
collectorManifests config.CollectorManifests,
authenticator api.Authenticator,
taskNotifier datapipe.Tasker,
authorizer auth.Authorizer,
) {
router.With(middleware.DefaultRateLimitMiddleware,
Expand All @@ -77,6 +75,6 @@ func RegisterFossRoutes(
routerInst.PathPrefix("/ui", static.AssetHandler),
)

var resources = v2.NewResources(rdms, graphDB, cfg, apiCache, graphQuery, collectorManifests, taskNotifier, authorizer)
var resources = v2.NewResources(rdms, graphDB, cfg, apiCache, graphQuery, collectorManifests, authorizer)
NewV2API(cfg, resources, routerInst, authenticator)
}
6 changes: 5 additions & 1 deletion cmd/api/src/api/v2/datapipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,9 @@ import (
)

func (s Resources) GetDatapipeStatus(response http.ResponseWriter, request *http.Request) {
api.WriteBasicResponse(request.Context(), s.TaskNotifier.GetStatus(), http.StatusOK, response)
if datapipeStatus, err := s.DB.GetDatapipeStatus(request.Context()); err != nil {
api.HandleDatabaseError(request, response, err)
} else {
api.WriteBasicResponse(request.Context(), datapipeStatus, http.StatusOK, response)
}
}
39 changes: 39 additions & 0 deletions cmd/api/src/api/v2/datapipe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2024 Specter Ops, Inc.
//
// Licensed under the Apache License, Version 2.0
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

//go:build serial_integration
// +build serial_integration

package v2_test

import (
"testing"
"time"

"github.com/specterops/bloodhound/src/api/v2/integration"
"github.com/specterops/bloodhound/src/model"
"github.com/stretchr/testify/require"
)

// TestGetDatapipeStatus is an integration test that fetches the datapipe status
// through the admin API client and asserts that it is reported as idle.
func TestGetDatapipeStatus(t *testing.T) {
	testCtx := integration.NewFOSSContext(t)

	// NOTE(review): presumably waits (up to 90s) for the datapipe to settle into
	// the idle state so the assertion below does not race in-flight work - confirm
	// against the integration harness.
	testCtx.WaitForDatapipeIdle(90 * time.Second)

	datapipeStatus, err := testCtx.AdminClient().GetDatapipeStatus()
	require.Nil(t, err)

	// testify's Equal takes (expected, actual); keeping that order makes the
	// failure output label the two values correctly.
	require.Equal(t, model.DatapipeStatusIdle, datapipeStatus.Status)
}
4 changes: 0 additions & 4 deletions cmd/api/src/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/specterops/bloodhound/dawgs/graph"
"github.com/specterops/bloodhound/src/auth"
"github.com/specterops/bloodhound/src/config"
"github.com/specterops/bloodhound/src/daemons/datapipe"
"github.com/specterops/bloodhound/src/database"
"github.com/specterops/bloodhound/src/model"
"github.com/specterops/bloodhound/src/queries"
Expand Down Expand Up @@ -142,7 +141,6 @@ type Resources struct {
QueryParameterFilterParser model.QueryParameterFilterParser
Cache cache.Cache
CollectorManifests config.CollectorManifests
TaskNotifier datapipe.Tasker
Authorizer auth.Authorizer
}

Expand All @@ -153,7 +151,6 @@ func NewResources(
apiCache cache.Cache,
graphQuery queries.Graph,
collectorManifests config.CollectorManifests,
taskNotifier datapipe.Tasker,
authorizer auth.Authorizer,
) Resources {
return Resources{
Expand All @@ -165,7 +162,6 @@ func NewResources(
QueryParameterFilterParser: model.NewQueryParameterFilterParser(),
Cache: apiCache,
CollectorManifests: collectorManifests,
TaskNotifier: taskNotifier,
Authorizer: authorizer,
}
}
45 changes: 25 additions & 20 deletions cmd/api/src/daemons/datapipe/datapipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
//
// SPDX-License-Identifier: Apache-2.0

//go:generate go run go.uber.org/mock/mockgen -copyright_file=../../../../../LICENSE.header -destination=./mocks/tasker.go -package=mocks . Tasker
package datapipe

import (
Expand All @@ -37,17 +36,12 @@ const (
pruningInterval = time.Hour * 24
)

type Tasker interface {
GetStatus() model.DatapipeStatusWrapper
}

type Daemon struct {
db database.Database
graphdb graph.Database
cache cache.Cache
cfg config.Configuration
tickInterval time.Duration
status model.DatapipeStatusWrapper
ctx context.Context
orphanedFileSweeper *OrphanFileSweeper
}
Expand All @@ -65,17 +59,9 @@ func NewDaemon(ctx context.Context, cfg config.Configuration, connections bootst
ctx: ctx,
orphanedFileSweeper: NewOrphanFileSweeper(NewOSFileOperations(), cfg.TempDirectory()),
tickInterval: tickInterval,
status: model.DatapipeStatusWrapper{
Status: model.DatapipeStatusIdle,
UpdatedAt: time.Now().UTC(),
},
}
}

func (s *Daemon) GetStatus() model.DatapipeStatusWrapper {
return s.status
}

func (s *Daemon) analyze() {
// Ensure that the user-requested analysis switch is deleted. This is done at the beginning of the
// function so that any re-analysis requests are caught while analysis is in-progress.
Expand All @@ -88,16 +74,27 @@ func (s *Daemon) analyze() {
return
}

s.status.Update(model.DatapipeStatusAnalyzing, false)
if err := s.db.SetDatapipeStatus(s.ctx, model.DatapipeStatusAnalyzing, false); err != nil {
log.Errorf("Error setting datapipe status: %v", err)
return
}

defer log.LogAndMeasure(log.LevelInfo, "Graph Analysis")()

if err := RunAnalysisOperations(s.ctx, s.db, s.graphdb, s.cfg); err != nil {
if errors.Is(err, ErrAnalysisFailed) {
FailAnalyzedFileUploadJobs(s.ctx, s.db)
s.status.Update(model.DatapipeStatusIdle, false)
if err := s.db.SetDatapipeStatus(s.ctx, model.DatapipeStatusIdle, false); err != nil {
log.Errorf("Error setting datapipe status: %v", err)
return
}

} else if errors.Is(err, ErrAnalysisPartiallyCompleted) {
PartialCompleteFileUploadJobs(s.ctx, s.db)
s.status.Update(model.DatapipeStatusIdle, true)
if err := s.db.SetDatapipeStatus(s.ctx, model.DatapipeStatusIdle, true); err != nil {
log.Errorf("Error setting datapipe status: %v", err)
return
}
}
} else {
CompleteAnalyzedFileUploadJobs(s.ctx, s.db)
Expand All @@ -108,7 +105,10 @@ func (s *Daemon) analyze() {
resetCache(s.cache, entityPanelCachingFlag.Enabled)
}

s.status.Update(model.DatapipeStatusIdle, true)
if err := s.db.SetDatapipeStatus(s.ctx, model.DatapipeStatusIdle, true); err != nil {
log.Errorf("Error setting datapipe status: %v", err)
return
}
}
}

Expand Down Expand Up @@ -175,12 +175,17 @@ func (s *Daemon) Start(ctx context.Context) {

func (s *Daemon) deleteData() {
defer func() {
s.status.Update(model.DatapipeStatusIdle, false)
_ = s.db.SetDatapipeStatus(s.ctx, model.DatapipeStatusIdle, false)
_ = s.db.DeleteAnalysisRequest(s.ctx)
_ = s.db.RequestAnalysis(s.ctx, "datapie")
}()
defer log.Measure(log.LevelInfo, "Purge Graph Data Completed")()
s.status.Update(model.DatapipeStatusPurging, false)

if err := s.db.SetDatapipeStatus(s.ctx, model.DatapipeStatusPurging, false); err != nil {
log.Errorf("Error setting datapipe status: %v", err)
return
}

log.Infof("Begin Purge Graph Data")

if err := s.db.CancelAllFileUploads(s.ctx); err != nil {
Expand Down
7 changes: 5 additions & 2 deletions cmd/api/src/daemons/datapipe/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,11 @@ func (s *Daemon) processIngestFile(ctx context.Context, path string, fileType mo

// processIngestTasks covers the generic file upload case for ingested data.
func (s *Daemon) processIngestTasks(ctx context.Context, ingestTasks model.IngestTasks) {
s.status.Update(model.DatapipeStatusIngesting, false)
defer s.status.Update(model.DatapipeStatusIdle, false)
if err := s.db.SetDatapipeStatus(s.ctx, model.DatapipeStatusIngesting, false); err != nil {
log.Errorf("Error setting datapipe status: %v", err)
return
}
defer s.db.SetDatapipeStatus(s.ctx, model.DatapipeStatusIdle, false)

for _, ingestTask := range ingestTasks {
// Check the context to see if we should continue processing ingest tasks. This has to be explicit since error
Expand Down
65 changes: 0 additions & 65 deletions cmd/api/src/daemons/datapipe/mocks/tasker.go

This file was deleted.

50 changes: 50 additions & 0 deletions cmd/api/src/database/datapipestatus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2024 Specter Ops, Inc.
//
// Licensed under the Apache License, Version 2.0
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

package database

import (
"context"
"database/sql"
"time"

"github.com/specterops/bloodhound/src/model"
)

// SetDatapipeStatus writes the given status and the current UTC time (as
// updated_at) to the datapipe_status table. When updateAnalysisTime is true the
// same timestamp is also written to last_complete_analysis_at.
//
// The statement has no WHERE clause, so every row in datapipe_status is updated.
// NOTE(review): presumably the table holds exactly one row - confirm against the
// migration that creates it.
func (s *BloodhoundDB) SetDatapipeStatus(ctx context.Context, status model.DatapipeStatus, updateAnalysisTime bool) error {
	var (
		timestamp = time.Now().UTC()
		statement = "UPDATE datapipe_status SET status = ?, updated_at = ?"
		params    = []interface{}{status, timestamp}
	)

	// Optionally record the moment of the last completed analysis using the
	// same timestamp as updated_at.
	if updateAnalysisTime {
		statement += ", last_complete_analysis_at = ?"
		params = append(params, timestamp)
	}

	return s.db.WithContext(ctx).Exec(statement+";", params...).Error
}

// GetDatapipeStatus reads the status, updated_at and last_complete_analysis_at
// columns of the first row in the datapipe_status table.
//
// It returns the error reported by the query if execution or scanning fails,
// and sql.ErrNoRows if the query succeeds but yields no row. On any error the
// returned wrapper is the zero value.
func (s *BloodhoundDB) GetDatapipeStatus(ctx context.Context) (model.DatapipeStatusWrapper, error) {
	var datapipeStatus model.DatapipeStatusWrapper

	tx := s.db.WithContext(ctx).Raw("SELECT status, updated_at, last_complete_analysis_at FROM datapipe_status LIMIT 1;").Scan(&datapipeStatus)

	// Surface real failures (cancelled context, connection loss, scan errors)
	// first; otherwise they would be misreported as "no rows" because
	// RowsAffected is also zero when the query fails.
	if tx.Error != nil {
		return model.DatapipeStatusWrapper{}, tx.Error
	}

	if tx.RowsAffected == 0 {
		return model.DatapipeStatusWrapper{}, sql.ErrNoRows
	}

	return datapipeStatus, nil
}
53 changes: 53 additions & 0 deletions cmd/api/src/database/datapipestatus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2024 Specter Ops, Inc.
//
// Licensed under the Apache License, Version 2.0
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

package database_test

import (
"context"
"testing"

"github.com/specterops/bloodhound/src/model"
"github.com/specterops/bloodhound/src/test/integration"
"github.com/stretchr/testify/require"
)

// TestDatapipeStatus exercises the SetDatapipeStatus / GetDatapipeStatus round
// trip against the database provided by the integration harness.
func TestDatapipeStatus(t *testing.T) {
	var (
		testCtx = context.Background()
		db      = integration.SetupDB(t)
	)

	// The test expects the stored status to start out as idle.
	status, err := db.GetDatapipeStatus(testCtx)
	require.NoError(t, err)
	require.Equal(t, model.DatapipeStatusIdle, status.Status)

	// Changing the status without updateAnalysisTime must update the status but
	// leave the last-complete-analysis timestamp unset.
	err = db.SetDatapipeStatus(testCtx, model.DatapipeStatusAnalyzing, false)
	require.NoError(t, err)

	status, err = db.GetDatapipeStatus(testCtx)
	require.NoError(t, err)
	require.Equal(t, model.DatapipeStatusAnalyzing, status.Status)
	require.True(t, status.LastCompleteAnalysisAt.IsZero())

	// When `SetDatapipeStatus` is called with `true` for the `updateAnalysisTime`
	// parameter, assert that the time is no longer zero.
	err = db.SetDatapipeStatus(testCtx, model.DatapipeStatusAnalyzing, true)
	require.NoError(t, err)

	status, err = db.GetDatapipeStatus(testCtx)
	require.NoError(t, err)
	require.False(t, status.LastCompleteAnalysisAt.IsZero())
}
Loading

0 comments on commit 9b52371

Please sign in to comment.