Skip to content

Commit

Permalink
Fix: OPG computations was moved to app from database
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky committed Jul 10, 2023
1 parent 3394a55 commit 64754b2
Showing 1 changed file with 80 additions and 37 deletions.
117 changes: 80 additions & 37 deletions internal/postgres/operation/storage.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package operation

import (
"bytes"
"fmt"
"time"

Expand Down Expand Up @@ -245,47 +246,89 @@ func (storage *Storage) OPG(address string, size, lastID int64) ([]operation.OPG
return nil, err
}

limit := storage.GetPageSize(size)
var (
end bool
result = make([]operation.OPG, 0)
currentOpg operation.OPG
isAdded bool
)
for !end {
subQuery := storage.DB.Model((*operation.Operation)(nil)).
WhereGroup(
func(q *orm.Query) (*orm.Query, error) {
return q.Where("destination_id = ?", accountID).WhereOr("source_id = ?", accountID), nil
},
).
Limit(1000).
Order("id desc")

if lastID > 0 {
subQuery.Where("id < ?", lastID)
}

var ops []operation.Operation
if err := subQuery.Select(&ops); err != nil {
return nil, err
}

if len(ops) == 0 {
break
}

for i := range ops {
if !bytes.Equal(currentOpg.Hash, ops[i].Hash) && currentOpg.Counter != ops[i].Counter {
if len(currentOpg.Hash) > 0 && currentOpg.Counter > 0 {
result = append(result, currentOpg)
currentOpg = operation.OPG{}
isAdded = true
}

if len(result) == int(size) {
end = true
break
}
}

currentOpg.Hash = ops[i].Hash
currentOpg.Counter = ops[i].Counter
currentOpg.LastID = ops[i].ID
currentOpg.Level = ops[i].Level
currentOpg.ContentIndex = ops[i].ContentIndex
currentOpg.Timestamp = ops[i].Timestamp

if currentOpg.Entrypoint == "" && ops[i].DestinationID == accountID {
currentOpg.Entrypoint = ops[i].Entrypoint.String()
}

if currentOpg.Status < ops[i].Status {
currentOpg.Status = ops[i].Status
}
if currentOpg.Kind > ops[i].Kind || currentOpg.Kind == 0 {
currentOpg.Kind = ops[i].Kind
}

subQuery := storage.DB.Model(new(operation.Operation)).
Column("id", "hash", "counter", "status", "kind").
WhereGroup(
func(q *orm.Query) (*orm.Query, error) {
return q.Where("destination_id = ?", accountID).WhereOr("source_id = ?", accountID), nil
},
)
if ops[i].SourceID == accountID {
currentOpg.Flow -= ops[i].Amount
} else {
currentOpg.Flow += ops[i].Amount
}

if ops[i].Internal {
currentOpg.Internals += 1
}

currentOpg.TotalCost += (ops[i].Burned + ops[i].Fee)
isAdded = false
}

lastID = currentOpg.LastID
}

if lastID > 0 {
subQuery.Where("id < ?", lastID)
if len(currentOpg.Hash) > 0 && currentOpg.Counter > 0 && !isAdded {
result = append(result, currentOpg)
}

var opg []operation.OPG
_, err := storage.DB.Query(&opg, `
with opg as (?)
select
ta.last_id,
ta.status,
ta.counter,
ta.kind,
(select sum(case when source_id = ? then -"amount" else "amount" end) as "flow"
from operations
where hash = ta.hash and counter = ta.counter) as "flow",
(select sum(internal::integer) as internals
from operations
where hash = ta.hash and counter = ta.counter),
(select sum("burned") + sum("fee") as total_cost
from operations
where hash = ta.hash and counter = ta.counter),
ta.hash, operations.level, operations.timestamp, operations.entrypoint, operations.content_index from (
select min(id) as last_id, hash, counter, max(status) as status, min(kind) as kind from (select * from opg) as t
group by hash, counter
order by last_id desc
limit ?
) as ta
join operations on operations.id = ta.last_id
order by last_id desc
`, subQuery, accountID, limit)
return opg, err
return result, nil
}

// GetByHashAndCounter -
Expand Down

0 comments on commit 64754b2

Please sign in to comment.