Skip to content

Commit

Permalink
Add integration test cases that enable congestion control
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronbee committed Sep 3, 2024
1 parent 992a1b1 commit 8b41182
Showing 1 changed file with 82 additions and 65 deletions.
147 changes: 82 additions & 65 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,79 +415,96 @@ func TestPutMultipleCells(t *testing.T) {
}

func TestMultiplePutsGetsSequentially(t *testing.T) {
const num_ops = 100
keyPrefix := "row3"
headers := map[string][]string{"cf": nil}
c := gohbase.NewClient(*host, gohbase.FlushInterval(time.Millisecond))
defer c.Close()
err := performNPuts(keyPrefix, num_ops)
if err != nil {
t.Errorf("Put returned an error: %v", err)
}
for i := num_ops - 1; i >= 0; i-- {
key := keyPrefix + fmt.Sprintf("%d", i)
get, err := hrpc.NewGetStr(context.Background(), table, key, hrpc.Families(headers))
rsp, err := c.Get(get)
if err != nil {
t.Errorf("Get returned an error: %v", err)
}
if len(rsp.Cells) != 1 {
t.Errorf("Incorrect number of cells returned by Get: %d", len(rsp.Cells))
}
rsp_value := rsp.Cells[0].Value
if !bytes.Equal(rsp_value, []byte(fmt.Sprintf("%d", i))) {
t.Errorf("Get returned an incorrect result. Expected: %v, Got: %v",
[]byte(fmt.Sprintf("%d", i)), rsp_value)
}
for name, opts := range map[string][]gohbase.Option{
"no_congestion_control": nil,
"window20": {gohbase.CongestionControl(1, 20)},
"window100": {gohbase.CongestionControl(1, 100)},
} {
t.Run(name, func(t *testing.T) {
const num_ops = 100
keyPrefix := "row3"
headers := map[string][]string{"cf": nil}
c := gohbase.NewClient(*host, append(opts, gohbase.FlushInterval(time.Millisecond))...)
defer c.Close()
err := performNPuts(keyPrefix, num_ops)
if err != nil {
t.Errorf("Put returned an error: %v", err)
}
for i := num_ops - 1; i >= 0; i-- {
key := keyPrefix + fmt.Sprintf("%d", i)
get, err := hrpc.NewGetStr(context.Background(), table, key, hrpc.Families(headers))
rsp, err := c.Get(get)
if err != nil {
t.Errorf("Get returned an error: %v", err)
}
if len(rsp.Cells) != 1 {
t.Errorf("Incorrect number of cells returned by Get: %d", len(rsp.Cells))
}
rsp_value := rsp.Cells[0].Value
if !bytes.Equal(rsp_value, []byte(fmt.Sprintf("%d", i))) {
t.Errorf("Get returned an incorrect result. Expected: %v, Got: %v",
[]byte(fmt.Sprintf("%d", i)), rsp_value)
}
}
})
}
}

func TestMultiplePutsGetsParallel(t *testing.T) {
c := gohbase.NewClient(*host)
defer c.Close()

const n = 1000
var wg sync.WaitGroup
for i := 0; i < n; i++ {
key := fmt.Sprintf("%s_%d", t.Name(), i)
wg.Add(1)
go func() {
if err := insertKeyValue(c, key, "cf", []byte(key)); err != nil {
t.Error(key, err)
}
wg.Done()
}()
}
wg.Wait()
for name, opts := range map[string][]gohbase.Option{
"no_congestion_control": nil,
"window20": {gohbase.CongestionControl(1, 20)},
"window1000": {gohbase.CongestionControl(1, 1000)},
} {
t.Run(name, func(t *testing.T) {
c := gohbase.NewClient(*host, opts...)
defer c.Close()

// All puts are complete. Now do the same for gets.
headers := map[string][]string{"cf": []string{"a"}}
for i := n - 1; i >= 0; i-- {
key := fmt.Sprintf("%s_%d", t.Name(), i)
wg.Add(1)
go func() {
defer wg.Done()
get, err := hrpc.NewGetStr(context.Background(), table, key, hrpc.Families(headers))
if err != nil {
t.Error(key, err)
return
const n = 1000
var wg sync.WaitGroup
for i := 0; i < n; i++ {
key := fmt.Sprintf("%s_%d", t.Name(), i)
wg.Add(1)
go func() {
if err := insertKeyValue(c, key, "cf", []byte(key)); err != nil {
t.Error(key, err)
}
wg.Done()
}()
}
rsp, err := c.Get(get)
if err != nil {
t.Error(key, err)
return
wg.Wait()

// All puts are complete. Now do the same for gets.
headers := map[string][]string{"cf": []string{"a"}}
for i := n - 1; i >= 0; i-- {
key := fmt.Sprintf("%s_%d", t.Name(), i)
wg.Add(1)
go func() {
defer wg.Done()
get, err := hrpc.NewGetStr(context.Background(), table, key,
hrpc.Families(headers))
if err != nil {
t.Error(key, err)
return
}
rsp, err := c.Get(get)
if err != nil {
t.Error(key, err)
return
}
if len(rsp.Cells) == 0 {
t.Error(key, " got zero cells")
return
}
rsp_value := rsp.Cells[0].Value
if !bytes.Equal(rsp_value, []byte(key)) {
t.Errorf("expected %q, got %q", key, rsp_value)
}
}()
}
if len(rsp.Cells) == 0 {
t.Error(key, " got zero cells")
return
}
rsp_value := rsp.Cells[0].Value
if !bytes.Equal(rsp_value, []byte(key)) {
t.Errorf("expected %q, got %q", key, rsp_value)
}
}()
wg.Wait()
})
}
wg.Wait()
}

func TestTimestampIncreasing(t *testing.T) {
Expand Down

0 comments on commit 8b41182

Please sign in to comment.