Skip to content

Commit

Permalink
add ut and adjust the code about kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
lyh169 committed Oct 8, 2023
1 parent 124feae commit 0f41992
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 15 deletions.
4 changes: 2 additions & 2 deletions gasprice/fixed.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (f *FixedGasPrice) UpdateGasPriceAvg() {

l2CoinPrice := f.ratePrc.GetL2CoinPrice()
if l2CoinPrice < 1e-18 {
log.Warn("the l2 coin price too small...")
log.Warn("the L2 native coin price too small...")
return
}
res := new(big.Float).Mul(big.NewFloat(0).SetFloat64(f.cfg.GasPriceUsdt/l2CoinPrice), big.NewFloat(0).SetFloat64(OKBWei))
Expand Down Expand Up @@ -78,7 +78,7 @@ func (f *FixedGasPrice) UpdateGasPriceAvg() {
} else {
truncateValue = result
}
log.Debugf("Storing truncated L2 gas price: %v l2CoinPrice: %v", truncateValue, l2CoinPrice)
log.Debugf("Storing truncated L2 gas price: %v, L2 native coin price: %v", truncateValue, l2CoinPrice)
if truncateValue != nil {
err := f.pool.SetGasPrices(ctx, truncateValue.Uint64(), l1GasPrice.Uint64())
if err != nil {
Expand Down
165 changes: 165 additions & 0 deletions gasprice/fixed_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package gasprice

import (
"context"
"math/big"
"testing"
"time"

"github.com/0xPolygonHermez/zkevm-node/config/types"
"github.com/0xPolygonHermez/zkevm-node/log"
)

func init() {
log.Init(log.Config{
Level: "debug",
Outputs: []string{"stdout"},
})
}

func TestUpdateGasPriceFixed(t *testing.T) {
ctx := context.Background()
var d time.Duration = 1000000000

cfg := Config{
Type: FixedType,
DefaultGasPriceWei: 1000000000,
UpdatePeriod: types.NewDuration(d),
Factor: 0.5,
KafkaURL: "127.0.0.1:9092",
Topic: "middle_coinPrice_push",
DefaultL2CoinPrice: 40,
GasPriceUsdt: 0.001,
}
l1GasPrice := big.NewInt(10000000000)
l2GasPrice := uint64(25000000000000)
poolM := new(poolMock)
ethM := new(ethermanMock)
ethM.On("GetL1GasPrice", ctx).Return(l1GasPrice).Once()
poolM.On("SetGasPrices", ctx, l2GasPrice, l1GasPrice.Uint64()).Return(nil).Once()
f := newFixedGasPriceSuggester(ctx, cfg, poolM, ethM)

ethM.On("GetL1GasPrice", ctx).Return(l1GasPrice, l1GasPrice).Once()
poolM.On("SetGasPrices", ctx, l2GasPrice, l1GasPrice.Uint64()).Return(nil).Once()
f.UpdateGasPriceAvg()
}

func TestUpdateGasPriceAvgCases(t *testing.T) {
var d time.Duration = 1000000000
testcases := []struct {
cfg Config
l1GasPrice *big.Int
l2GasPrice uint64
}{
{
cfg: Config{
Type: FixedType,
DefaultGasPriceWei: 1000000000,
UpdatePeriod: types.NewDuration(d),
KafkaURL: "127.0.0.1:9092",
Topic: "middle_coinPrice_push",
DefaultL2CoinPrice: 40,
GasPriceUsdt: 0.001,
},
l1GasPrice: big.NewInt(10000000000),
l2GasPrice: uint64(25000000000000),
},
{
cfg: Config{
Type: FixedType,
DefaultGasPriceWei: 1000000000,
UpdatePeriod: types.NewDuration(d),
KafkaURL: "127.0.0.1:9092",
Topic: "middle_coinPrice_push",
DefaultL2CoinPrice: 1e-19,
GasPriceUsdt: 0.001,
},
l1GasPrice: big.NewInt(10000000000),
l2GasPrice: uint64(25000000000000),
},
{ // the gas price small than the min gas price
cfg: Config{
Type: FixedType,
DefaultGasPriceWei: 26000000000000,
UpdatePeriod: types.NewDuration(d),
KafkaURL: "127.0.0.1:9092",
Topic: "middle_coinPrice_push",
DefaultL2CoinPrice: 40,
GasPriceUsdt: 0.001,
},
l1GasPrice: big.NewInt(10000000000),
l2GasPrice: uint64(26000000000000),
},
{ // the gas price bigger than the max gas price
cfg: Config{
Type: FixedType,
DefaultGasPriceWei: 1000000000000,
MaxGasPriceWei: 23000000000000,
UpdatePeriod: types.NewDuration(d),
KafkaURL: "127.0.0.1:9092",
Topic: "middle_coinPrice_push",
DefaultL2CoinPrice: 40,
GasPriceUsdt: 0.001,
},
l1GasPrice: big.NewInt(10000000000),
l2GasPrice: uint64(23000000000000),
},
{
cfg: Config{
Type: FixedType,
DefaultGasPriceWei: 1000000000,
UpdatePeriod: types.NewDuration(d),
KafkaURL: "127.0.0.1:9092",
Topic: "middle_coinPrice_push",
DefaultL2CoinPrice: 30,
GasPriceUsdt: 0.001,
},
l1GasPrice: big.NewInt(10000000000),
l2GasPrice: uint64(33300000000000),
},
{
cfg: Config{
Type: FixedType,
DefaultGasPriceWei: 10,
UpdatePeriod: types.NewDuration(d),
KafkaURL: "127.0.0.1:9092",
Topic: "middle_coinPrice_push",
DefaultL2CoinPrice: 30,
GasPriceUsdt: 1e-15,
},
l1GasPrice: big.NewInt(10000000000),
l2GasPrice: uint64(33),
},
}

for _, tc := range testcases {
ctx := context.Background()
poolM := new(poolMock)
ethM := new(ethermanMock)
ethM.On("GetL1GasPrice", ctx).Return(tc.l1GasPrice).Twice()
poolM.On("SetGasPrices", ctx, tc.l2GasPrice, tc.l1GasPrice.Uint64()).Return(nil).Twice()
f := newFixedGasPriceSuggester(ctx, tc.cfg, poolM, ethM)
f.UpdateGasPriceAvg()
}
}

func TestLimitMasGasPriceFixed(t *testing.T) {
ctx := context.Background()
var d time.Duration = 1000000000

cfg := Config{
Type: FollowerType,
DefaultGasPriceWei: 100000000,
MaxGasPriceWei: 50000000,
UpdatePeriod: types.NewDuration(d),
Factor: 0.5,
}
l1GasPrice := big.NewInt(1000000000)
poolM := new(poolMock)
ethM := new(ethermanMock)
ethM.On("GetL1GasPrice", ctx).Return(l1GasPrice)
// Ensure SetGasPrices is called with the MaxGasPriceWei
poolM.On("SetGasPrices", ctx, cfg.MaxGasPriceWei, l1GasPrice.Uint64()).Return(nil)
f := newFollowerGasPriceSuggester(ctx, cfg, poolM, ethM)
f.UpdateGasPriceAvg()
}
27 changes: 14 additions & 13 deletions gasprice/kafka_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,30 +130,31 @@ func getKafkaReader(cfg Config) *kafka.Reader {
}

return kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
GroupID: cfg.GroupID,
Topic: cfg.Topic,
MinBytes: 1, // 1
MaxBytes: 10e6, // 10MB
Dialer: dialer,
Brokers: brokers,
GroupID: cfg.GroupID,
Topic: cfg.Topic,
MinBytes: 1, // 1
MaxBytes: 10e6, // 10MB
Dialer: dialer,
StartOffset: kafka.LastOffset, // read data from new message
})
}

func (rp *KafkaProcessor) processor() {
log.Info("follower KafkaProcessor start processor ")
log.Info("kafka processor start processor ")
defer rp.kreader.Close()
for {
select {
case <-rp.ctx.Done():
return
default:
rate, err := rp.ReadAndCalc(rp.ctx)
value, err := rp.ReadAndCalc(rp.ctx)
if err != nil {
log.Warn("follower get okb to eth rate fail ", err)
log.Warn("get the destion data fail ", err)
time.Sleep(time.Second * 10)
continue
}
rp.updateL2CoinPrice(rate)
rp.updateL2CoinPrice(value)
}
}
}
Expand All @@ -166,10 +167,10 @@ func (rp *KafkaProcessor) ReadAndCalc(ctx context.Context) (float64, error) {
return rp.parseL2CoinPrice(m.Value)
}

func (rp *KafkaProcessor) updateL2CoinPrice(rate float64) {
func (rp *KafkaProcessor) updateL2CoinPrice(price float64) {
rp.rwLock.Lock()
defer rp.rwLock.Unlock()
rp.L2Price = rate
rp.L2Price = price
}

func (rp *KafkaProcessor) GetL2CoinPrice() float64 {
Expand All @@ -192,5 +193,5 @@ func (rp *KafkaProcessor) parseL2CoinPrice(value []byte) (float64, error) {
return price.Price, nil
}
}
return 0, fmt.Errorf("not find a correct token price coinId=%f", rp.l2CoinId)
return 0, fmt.Errorf("not find a correct coin price coinId=%v", rp.l2CoinId)
}
57 changes: 57 additions & 0 deletions gasprice/kafka_proc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package gasprice

import (
"context"
"fmt"
"github.com/stretchr/testify/require"
"testing"
)

func TestCalculateRate(t *testing.T) {
testcases := []struct {
l2CoinId int
msg string
check func(rate float64, err error)
}{
{
// error
l2CoinId: okbcoinId,
msg: fmt.Sprintf("{\"topic\":\"middle_coinPrice_push\"}"),
check: func(rate float64, err error) {
require.Error(t, err)
},
},
{
// error
l2CoinId: okbcoinId,
msg: fmt.Sprintf("{\"topic\":\"middle_coinPrice_push\",\"source\":null,\"type\":null,\"data\":{\"priceList\":[{\"coinId\":%d,\"price\":0.02}],\"id\":\"98a797ce-f61b-4e90-87ac-445e77ad3599\"}}", okbcoinId+1),
check: func(rate float64, err error) {
require.Error(t, err)
},
},
{
// correct
l2CoinId: okbcoinId,
msg: fmt.Sprintf("{\"topic\":\"middle_coinPrice_push\",\"source\":null,\"type\":null,\"data\":{\"priceList\":[{\"coinId\":%d,\"price\":0.02}, {\"coinId\":%d,\"price\":0.002}],\"id\":\"98a797ce-f61b-4e90-87ac-445e77ad3599\"}}", 1, okbcoinId),
check: func(rate float64, err error) {
require.Equal(t, rate, 0.002)
require.NoError(t, err)
},
},
{
// correct
l2CoinId: okbcoinId,
msg: fmt.Sprintf("{\"topic\":\"middle_coinPrice_push\",\"source\":null,\"type\":null,\"data\":{\"priceList\":[{\"coinId\":%d,\"price\":0.02}, {\"coinId\":%d,\"price\":10}],\"id\":\"98a797ce-f61b-4e90-87ac-445e77ad3599\"}}", 1, okbcoinId),
check: func(rate float64, err error) {
require.Equal(t, rate, float64(10))
require.NoError(t, err)
},
},
}

for _, tc := range testcases {
rp := newKafkaProcessor(Config{Topic: "middle_coinPrice_push", L2CoinId: tc.l2CoinId}, context.Background())
rt, err := rp.parseL2CoinPrice([]byte(tc.msg))
tc.check(rt, err)
}
}

0 comments on commit 0f41992

Please sign in to comment.