From 7da7181a7399185322ad437274fa21a5b0fbf906 Mon Sep 17 00:00:00 2001 From: Aaron Beitch Date: Fri, 30 Aug 2024 14:59:19 -0700 Subject: [PATCH] Add integration test cases that enable congestion control --- integration_test.go | 147 ++++++++++++++++++++++++-------------------- 1 file changed, 82 insertions(+), 65 deletions(-) diff --git a/integration_test.go b/integration_test.go index 9d48a9d..acf2978 100644 --- a/integration_test.go +++ b/integration_test.go @@ -415,79 +415,96 @@ func TestPutMultipleCells(t *testing.T) { } func TestMultiplePutsGetsSequentially(t *testing.T) { - const num_ops = 100 - keyPrefix := "row3" - headers := map[string][]string{"cf": nil} - c := gohbase.NewClient(*host, gohbase.FlushInterval(time.Millisecond)) - defer c.Close() - err := performNPuts(keyPrefix, num_ops) - if err != nil { - t.Errorf("Put returned an error: %v", err) - } - for i := num_ops - 1; i >= 0; i-- { - key := keyPrefix + fmt.Sprintf("%d", i) - get, err := hrpc.NewGetStr(context.Background(), table, key, hrpc.Families(headers)) - rsp, err := c.Get(get) - if err != nil { - t.Errorf("Get returned an error: %v", err) - } - if len(rsp.Cells) != 1 { - t.Errorf("Incorrect number of cells returned by Get: %d", len(rsp.Cells)) - } - rsp_value := rsp.Cells[0].Value - if !bytes.Equal(rsp_value, []byte(fmt.Sprintf("%d", i))) { - t.Errorf("Get returned an incorrect result. Expected: %v, Got: %v", - []byte(fmt.Sprintf("%d", i)), rsp_value) - } + for name, opts := range map[string][]gohbase.Option{ + "no_congestion_control": nil, + "window20": {gohbase.CongestionControl(1, 20)}, + "window100": {gohbase.CongestionControl(1, 100)}, + } { + t.Run(name, func(t *testing.T) { + const num_ops = 100 + keyPrefix := "row3" + headers := map[string][]string{"cf": nil} + c := gohbase.NewClient(*host, append(opts, gohbase.FlushInterval(time.Millisecond))...) + defer c.Close() + err := performNPuts(keyPrefix, num_ops) + if err != nil { + t.Errorf("Put returned an error: %v", err) + } + for i := num_ops - 1; i >= 0; i-- { + key := keyPrefix + fmt.Sprintf("%d", i) + get, err := hrpc.NewGetStr(context.Background(), table, key, hrpc.Families(headers)) + rsp, err := c.Get(get) + if err != nil { + t.Errorf("Get returned an error: %v", err) + } + if len(rsp.Cells) != 1 { + t.Errorf("Incorrect number of cells returned by Get: %d", len(rsp.Cells)) + } + rsp_value := rsp.Cells[0].Value + if !bytes.Equal(rsp_value, []byte(fmt.Sprintf("%d", i))) { + t.Errorf("Get returned an incorrect result. Expected: %v, Got: %v", + []byte(fmt.Sprintf("%d", i)), rsp_value) + } + } + }) } } func TestMultiplePutsGetsParallel(t *testing.T) { - c := gohbase.NewClient(*host) - defer c.Close() - - const n = 1000 - var wg sync.WaitGroup - for i := 0; i < n; i++ { - key := fmt.Sprintf("%s_%d", t.Name(), i) - wg.Add(1) - go func() { - if err := insertKeyValue(c, key, "cf", []byte(key)); err != nil { - t.Error(key, err) - } - wg.Done() - }() - } - wg.Wait() + for name, opts := range map[string][]gohbase.Option{ + "no_congestion_control": nil, + "window20": {gohbase.CongestionControl(1, 20)}, + "window1000": {gohbase.CongestionControl(1, 1000)}, + } { + t.Run(name, func(t *testing.T) { + c := gohbase.NewClient(*host, opts...) + defer c.Close() - // All puts are complete. Now do the same for gets. - headers := map[string][]string{"cf": []string{"a"}} - for i := n - 1; i >= 0; i-- { - key := fmt.Sprintf("%s_%d", t.Name(), i) - wg.Add(1) - go func() { - defer wg.Done() - get, err := hrpc.NewGetStr(context.Background(), table, key, hrpc.Families(headers)) - if err != nil { - t.Error(key, err) - return + const n = 1000 + var wg sync.WaitGroup + for i := 0; i < n; i++ { + key := fmt.Sprintf("%s_%d", t.Name(), i) + wg.Add(1) + go func() { + if err := insertKeyValue(c, key, "cf", []byte(key)); err != nil { + t.Error(key, err) + } + wg.Done() + }() } - rsp, err := c.Get(get) - if err != nil { - t.Error(key, err) - return + wg.Wait() + + // All puts are complete. Now do the same for gets. + headers := map[string][]string{"cf": []string{"a"}} + for i := n - 1; i >= 0; i-- { + key := fmt.Sprintf("%s_%d", t.Name(), i) + wg.Add(1) + go func() { + defer wg.Done() + get, err := hrpc.NewGetStr(context.Background(), table, key, + hrpc.Families(headers)) + if err != nil { + t.Error(key, err) + return + } + rsp, err := c.Get(get) + if err != nil { + t.Error(key, err) + return + } + if len(rsp.Cells) == 0 { + t.Error(key, " got zero cells") + return + } + rsp_value := rsp.Cells[0].Value + if !bytes.Equal(rsp_value, []byte(key)) { + t.Errorf("expected %q, got %q", key, rsp_value) + } + }() } - if len(rsp.Cells) == 0 { - t.Error(key, " got zero cells") - return - } - rsp_value := rsp.Cells[0].Value - if !bytes.Equal(rsp_value, []byte(key)) { - t.Errorf("expected %q, got %q", key, rsp_value) - } - }() + wg.Wait() + }) } - wg.Wait() } func TestTimestampIncreasing(t *testing.T) {