Skip to content

Commit

Permalink
add new interface in processor
Browse files Browse the repository at this point in the history
  • Loading branch information
mihir20 committed Jul 3, 2024
1 parent ed564f2 commit 29529c8
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 87 deletions.
81 changes: 0 additions & 81 deletions enterprise/trackedusers/mocks/mock_user_reporter.go

This file was deleted.

7 changes: 6 additions & 1 deletion processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ type sourceObserver interface {
ObserveSourceEvents(source *backendconfig.SourceT, events []transformer.TransformerEvent)
}

type trackedUsersReporter interface {
ReportUsers(ctx context.Context, reports []*trackedusers.UsersReport, tx *Tx) error
GenerateReportsFromJobs(jobs []*jobsdb.JobT, sourceIdFilter map[string]bool) []*trackedusers.UsersReport
}

// Handle is a handle to the processor module
type Handle struct {
conf *config.Config
Expand Down Expand Up @@ -162,7 +167,7 @@ type Handle struct {
storePlocker kitsync.PartitionLocker

sourceObservers []sourceObserver
trackedUsersReporter trackedusers.UsersReporter
trackedUsersReporter trackedUsersReporter
}
type processorStats struct {
statGatewayDBR func(partition string) stats.Measurement
Expand Down
40 changes: 35 additions & 5 deletions processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"testing"
"time"

mockdatacollector "github.com/rudderlabs/rudder-server/enterprise/trackedusers/mocks"
"github.com/samber/lo"

"github.com/rudderlabs/rudder-server/enterprise/trackedusers"

Expand Down Expand Up @@ -70,6 +70,34 @@ func (m *mockObserver) ObserveSourceEvents(source *backendconfig.SourceT, events
}{source: source, events: events})
}

type mockTrackedUsersReporter struct {
generateCalls []struct {
jobs []*jobsdb.JobT
}
reportCalls []struct {
reportedReports []*trackedusers.UsersReport
}
}

func (m *mockTrackedUsersReporter) ReportUsers(ctx context.Context, reports []*trackedusers.UsersReport, tx *Tx) error {
m.reportCalls = append(m.reportCalls, struct {
reportedReports []*trackedusers.UsersReport
}{reportedReports: reports})
return nil
}

func (m *mockTrackedUsersReporter) GenerateReportsFromJobs(jobs []*jobsdb.JobT, _ map[string]bool) []*trackedusers.UsersReport {
m.generateCalls = append(m.generateCalls, struct {
jobs []*jobsdb.JobT
}{jobs: jobs})
return lo.FilterMap(jobs, func(job *jobsdb.JobT, _ int) (*trackedusers.UsersReport, bool) {
return &trackedusers.UsersReport{
WorkspaceID: job.WorkspaceId,
SourceID: gjson.GetBytes(job.Parameters, "source_id").String(),
}, true
})
}

type testContext struct {
mockCtrl *gomock.Controller
mockBackendConfig *mocksBackendConfig.MockBackendConfig
Expand All @@ -84,7 +112,7 @@ type testContext struct {
MockDedup *mockDedup.MockDedup
MockObserver *mockObserver
MockRsourcesService *rsources.MockJobService
mockTrackedUsersReporter *mockdatacollector.MockUsersReporter
mockTrackedUsersReporter *mockTrackedUsersReporter
}

func (c *testContext) Setup() {
Expand Down Expand Up @@ -114,7 +142,7 @@ func (c *testContext) Setup() {
c.MockReportingI = mockReportingTypes.NewMockReporting(c.mockCtrl)
c.MockDedup = mockDedup.NewMockDedup(c.mockCtrl)
c.MockObserver = &mockObserver{}
c.mockTrackedUsersReporter = mockdatacollector.NewMockUsersReporter(c.mockCtrl)
c.mockTrackedUsersReporter = &mockTrackedUsersReporter{}
}

func (c *testContext) Finish() {
Expand Down Expand Up @@ -1731,15 +1759,17 @@ var _ = Describe("Processor with trackedUsers feature enabled", Ordered, func()
SourceID: SourceIDEnabledNoUT,
},
}
c.mockTrackedUsersReporter.EXPECT().GenerateReportsFromJobs(unprocessedJobsList, map[string]bool{}).Return(trackerUsersReports).Times(1)
c.mockTrackedUsersReporter.EXPECT().ReportUsers(gomock.Any(), trackerUsersReports, gomock.Any()).Times(1)
Setup(processor, c, false, false)
processor.trackedUsersReporter = c.mockTrackedUsersReporter
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil())
GinkgoT().Log("Processor setup and init done")
handlePendingGatewayJobs(processor)
Expect(c.mockTrackedUsersReporter.generateCalls).To(HaveLen(1))
Expect(c.mockTrackedUsersReporter.generateCalls[0].jobs).Should(Equal(unprocessedJobsList))
Expect(c.mockTrackedUsersReporter.reportCalls).To(HaveLen(1))
Expect(c.mockTrackedUsersReporter.reportCalls[0].reportedReports).Should(Equal(trackerUsersReports))
})
})
})
Expand Down

0 comments on commit 29529c8

Please sign in to comment.