diff --git a/cmd/ingress-perf.go b/cmd/ingress-perf.go
index 2824a40..e72d4be 100644
--- a/cmd/ingress-perf.go
+++ b/cmd/ingress-perf.go
@@ -17,7 +17,6 @@ package main
 import (
 	"fmt"
 
-	"github.com/cloud-bulldozer/go-commons/indexers"
 	"github.com/cloud-bulldozer/go-commons/version"
 	"github.com/cloud-bulldozer/ingress-perf/pkg/config"
 	_ "github.com/cloud-bulldozer/ingress-perf/pkg/log"
@@ -45,7 +44,7 @@ var versionCmd = &cobra.Command{
 
 func run() *cobra.Command {
 	var cfg, uuid, baseUUID, esServer, esIndex, logLevel, baseIndex, outputDir string
-	var cleanup bool
+	var cleanup, podMetrics bool
 	var tolerancy int
 	cmd := &cobra.Command{
 		Use: "run",
@@ -61,37 +60,16 @@ func run() *cobra.Command {
 			return nil
 		},
 		RunE: func(cmd *cobra.Command, args []string) error {
-			var indexer *indexers.Indexer
-			var err error
 			log.Infof("Running ingress-perf (%s@%s) with uuid %s", version.Version, version.GitCommit, uuid)
 			if err := config.Load(cfg); err != nil {
 				return err
 			}
-			if baseUUID != "" && (tolerancy > 100 || tolerancy < 1) {
-				return fmt.Errorf("tolerancy is an integer between 1 and 100")
-			}
-			if esServer != "" || outputDir != "" {
-				var indexerCfg indexers.IndexerConfig
-				if esServer != "" {
-					log.Infof("Creating %s indexer", indexers.ElasticIndexer)
-					indexerCfg = indexers.IndexerConfig{
-						Type:    indexers.ElasticIndexer,
-						Servers: []string{esServer},
-						Index:   esIndex,
-					}
-				} else if outputDir != "" {
-					log.Infof("Creating %s indexer", indexers.LocalIndexer)
-					indexerCfg = indexers.IndexerConfig{
-						Type:             indexers.LocalIndexer,
-						MetricsDirectory: outputDir,
-					}
-				}
-				indexer, err = indexers.NewIndexer(indexerCfg)
-				if err != nil {
-					return err
-				}
-			}
-			return runner.Start(uuid, baseUUID, baseIndex, tolerancy, indexer, cleanup)
+			r := runner.New(
+				uuid, cleanup,
+				runner.WithComparison(baseUUID, baseIndex, tolerancy),
+				runner.WithIndexer(esServer, esIndex, outputDir, podMetrics),
+			)
+			return r.Start()
 		},
 	}
 	cmd.Flags().StringVarP(&cfg, "cfg", "c", "", "Configuration file")
@@ -103,6 +81,7 @@ func run() *cobra.Command {
 	cmd.Flags().StringVar(&esIndex, "es-index", "ingress-performance", "Elasticsearch index")
 	cmd.Flags().StringVar(&outputDir, "output-dir", "output", "Store collected metrics in this directory")
 	cmd.Flags().BoolVar(&cleanup, "cleanup", true, "Cleanup benchmark assets")
+	cmd.Flags().BoolVar(&podMetrics, "pod-metrics", false, "Index per pod metrics")
 	cmd.Flags().StringVar(&logLevel, "loglevel", "info", "Log level. Allowed levels are error, info and debug")
 	cmd.MarkFlagRequired("cfg")
 	return cmd
diff --git a/pkg/runner/exec.go b/pkg/runner/exec.go
index 2a817dd..c2c3df8 100644
--- a/pkg/runner/exec.go
+++ b/pkg/runner/exec.go
@@ -38,7 +38,7 @@ import (
 
 var lock = &sync.Mutex{}
 
-func runBenchmark(cfg config.Config, clusterMetadata tools.ClusterMetadata, p *prometheus.Prometheus) ([]tools.Result, error) {
+func runBenchmark(cfg config.Config, clusterMetadata tools.ClusterMetadata, p *prometheus.Prometheus, podMetrics bool) ([]tools.Result, error) {
 	var aggAvgRps, aggAvgLatency, aggP95Latency float64
 	var timeouts, httpErrors int64
 	var benchmarkResult []tools.Result
@@ -101,6 +101,9 @@ func runBenchmark(cfg config.Config, clusterMetadata tools.ClusterMetadata, p *p
 			continue
 		}
 		normalizeResults(&result)
+		if !podMetrics {
+			result.Pods = nil
+		}
 		aggAvgRps += result.TotalAvgRps
 		aggAvgLatency += result.AvgLatency
 		aggP95Latency += result.P95Latency
diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go
index 8e8c99e..0d0d3c9 100644
--- a/pkg/runner/runner.go
+++ b/pkg/runner/runner.go
@@ -50,13 +50,64 @@ var dynamicClient *dynamic.DynamicClient
 var orClientSet *openshiftrouteclientset.Clientset
 var currentTuning string
 
-func Start(uuid, baseUUID, baseIndex string, tolerancy int, indexer *indexers.Indexer, cleanupAssets bool) error {
-	var benchmarkResultDocuments []interface{}
+func New(uuid string, cleanup bool, opts ...OptsFunctions) *Runner {
+	r := &Runner{
+		uuid:    uuid,
+		cleanup: cleanup,
+	}
+	for _, opts := range opts {
+		opts(r)
+	}
+	return r
+}
+
+func WithComparison(baseUUID, baseIndex string, tolerancy int) OptsFunctions {
+	return func(r *Runner) {
+		if baseUUID != "" {
+			if tolerancy > 100 || tolerancy < 1 {
+				log.Fatalf("Tolerancy must be an integer between 1 and 100: %d", tolerancy)
+			}
+			r.baseUUID = baseUUID
+			r.tolerancy = tolerancy
+			r.baseIndex = baseIndex
+		}
+	}
+}
+func WithIndexer(esServer, esIndex, resultsDir string, podMetrics bool) OptsFunctions {
+	return func(r *Runner) {
+		if esServer != "" || resultsDir != "" {
+			var indexerCfg indexers.IndexerConfig
+			if esServer != "" {
+				log.Infof("Creating %s indexer", indexers.ElasticIndexer)
+				indexerCfg = indexers.IndexerConfig{
+					Type:    indexers.ElasticIndexer,
+					Servers: []string{esServer},
+					Index:   esIndex,
+				}
+			} else if resultsDir != "" {
+				log.Infof("Creating %s indexer", indexers.LocalIndexer)
+				indexerCfg = indexers.IndexerConfig{
+					Type:             indexers.LocalIndexer,
+					MetricsDirectory: resultsDir,
+				}
+			}
+			indexer, err := indexers.NewIndexer(indexerCfg)
+			if err != nil {
+				log.Fatal(err)
+			}
+			r.indexer = indexer
+			r.podMetrics = podMetrics
+		}
+	}
+}
+
+func (r *Runner) Start() error {
 	var err error
 	var kubeconfig string
 	var benchmarkResult []tools.Result
 	var comparator comparison.Comparator
 	var clusterMetadata tools.ClusterMetadata
+	var benchmarkResultDocuments []interface{}
 	passed := true
 	if os.Getenv("KUBECONFIG") != "" {
 		kubeconfig = os.Getenv("KUBECONFIG")
@@ -96,16 +147,16 @@ func Start(uuid, baseUUID, baseIndex string, tolerancy int, indexer *indexers.In
 	} else {
 		log.Infof("HAProxy version: %s", clusterMetadata.HAProxyVersion)
 	}
-	if indexer != nil {
-		if _, ok := (*indexer).(*indexers.Elastic); ok {
-			comparator = comparison.NewComparator(*indexers.ESClient, baseIndex)
+	if r.indexer != nil {
+		if _, ok := (*r.indexer).(*indexers.Elastic); ok {
+			comparator = comparison.NewComparator(*indexers.ESClient, r.baseIndex)
 		}
 	}
 	if err := deployAssets(); err != nil {
 		return err
 	}
 	for i, cfg := range config.Cfg {
-		cfg.UUID = uuid
+		cfg.UUID = r.uuid
 		log.Infof("Running test %d/%d", i+1, len(config.Cfg))
 		log.Infof("Tool:%s termination:%v servers:%d concurrency:%d procs:%d connections:%d duration:%v",
 			cfg.Tool,
@@ -125,32 +176,32 @@ func Start(uuid, baseUUID, baseIndex string, tolerancy int, indexer *indexers.In
 				return err
 			}
 		}
-		if benchmarkResult, err = runBenchmark(cfg, clusterMetadata, p); err != nil {
+		if benchmarkResult, err = runBenchmark(cfg, clusterMetadata, p, r.podMetrics); err != nil {
 			return err
 		}
-		if indexer != nil {
+		if r.indexer != nil {
 			if !cfg.Warmup {
 				for _, res := range benchmarkResult {
 					benchmarkResultDocuments = append(benchmarkResultDocuments, res)
 				}
-				// When not using the local indexer, clear the documents array revert benchmark
-				if _, ok := (*indexer).(*indexers.Local); !ok {
-					if indexDocuments(*indexer, benchmarkResultDocuments, indexers.IndexingOpts{}) != nil {
+				// When not using the local indexer, index the documents now and empty the array
+				if _, ok := (*r.indexer).(*indexers.Local); !ok {
+					if indexDocuments(*r.indexer, benchmarkResultDocuments, indexers.IndexingOpts{}) != nil {
 						log.Errorf("Indexing error: %v", err.Error())
 					}
 					benchmarkResultDocuments = []interface{}{}
 				}
-				if baseUUID != "" {
-					log.Infof("Comparing total_avg_rps with baseline: %v in index %s", baseUUID, baseIndex)
+				if r.baseUUID != "" {
+					log.Infof("Comparing total_avg_rps with baseline: %v in index %s", r.baseUUID, r.baseIndex)
 					var totalAvgRps float64
 					query := fmt.Sprintf("uuid.keyword: %s AND config.termination.keyword: %s AND config.concurrency: %d AND config.connections: %d AND config.serverReplicas: %d AND config.path.keyword: \\%s",
-						baseUUID, cfg.Termination, cfg.Concurrency, cfg.Connections, cfg.ServerReplicas, cfg.Path)
+						r.baseUUID, cfg.Termination, cfg.Concurrency, cfg.Connections, cfg.ServerReplicas, cfg.Path)
 					log.Debugf("Query: %s", query)
 					for _, b := range benchmarkResult {
 						totalAvgRps += b.TotalAvgRps
 					}
 					totalAvgRps = totalAvgRps / float64(len(benchmarkResult))
-					msg, err := comparator.Compare("total_avg_rps", query, comparison.Avg, totalAvgRps, tolerancy)
+					msg, err := comparator.Compare("total_avg_rps", query, comparison.Avg, totalAvgRps, r.tolerancy)
 					if err != nil {
 						log.Error(err.Error())
 						passed = false
@@ -163,12 +214,12 @@
 			}
 		}
 	}
-	if indexer != nil {
-		if err := indexDocuments(*indexer, benchmarkResultDocuments, indexers.IndexingOpts{MetricName: uuid}); err != nil {
+	if r.indexer != nil {
+		if err := indexDocuments(*r.indexer, benchmarkResultDocuments, indexers.IndexingOpts{MetricName: r.uuid}); err != nil {
 			log.Errorf("Indexing error: %v", err.Error())
 		}
 	}
-	if cleanupAssets {
+	if r.cleanup {
 		if cleanup(10*time.Minute) != nil {
 			return err
 		}
diff --git a/pkg/runner/tools/types.go b/pkg/runner/tools/types.go
index 36b7b12..6124fa1 100644
--- a/pkg/runner/tools/types.go
+++ b/pkg/runner/tools/types.go
@@ -55,7 +55,7 @@ type Result struct {
 	UUID        string        `json:"uuid"`
 	Sample      int           `json:"sample"`
 	Config      config.Config `json:"config"`
-	Pods        []PodResult   `json:"pods"`
+	Pods        []PodResult   `json:"pods,omitempty"`
 	Timestamp   time.Time     `json:"timestamp"`
 	TotalAvgRps float64       `json:"total_avg_rps"`
 	StdevRps    float64       `json:"rps_stdev"`
diff --git a/pkg/runner/types.go b/pkg/runner/types.go
index 4e160c0..9b5a480 100644
--- a/pkg/runner/types.go
+++ b/pkg/runner/types.go
@@ -17,6 +17,7 @@ package runner
 import (
 	"fmt"
 
+	"github.com/cloud-bulldozer/go-commons/indexers"
 	routev1 "github.com/openshift/api/route/v1"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
@@ -34,6 +35,18 @@ const (
 	clientName = "ingress-perf-client"
 )
 
+type Runner struct {
+	uuid       string
+	baseUUID   string
+	baseIndex  string
+	tolerancy  int
+	indexer    *indexers.Indexer
+	podMetrics bool
+	cleanup    bool
+}
+
+type OptsFunctions func(r *Runner)
+
 var workerAffinity = &corev1.Affinity{
 	NodeAffinity: &corev1.NodeAffinity{
 		RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{