Skip to content

Commit

Permalink
Merge branch 'main' into fix-goroutine-leaks-in-grpc-integration-tests
Browse files Browse the repository at this point in the history
Signed-off-by: Will Sewell <[email protected]>
  • Loading branch information
WillSewell committed Apr 11, 2024
2 parents 74d4c6e + 4681e36 commit a1b106f
Show file tree
Hide file tree
Showing 49 changed files with 799 additions and 1,041 deletions.
1 change: 0 additions & 1 deletion .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ ignore:
- "thrift-gen/*/*"
- "**/thrift-0.9.2/*"
- "**/main.go"
- "cmd/jaeger/internal/integration/datareceivers"
- "examples/hotrod"

coverage:
Expand Down
7 changes: 5 additions & 2 deletions .github/workflows/ci-grpc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@ jobs:
run: |
case ${{ matrix.version }} in
v1)
make grpc-storage-integration-test
SPAN_STORAGE_TYPE=memory \
make grpc-storage-integration-test
;;
v2)
bash scripts/grpc-integration-test.sh latest
STORAGE=grpc \
SPAN_STORAGE_TYPE=memory \
make jaeger-v2-storage-integration-test
;;
esac
Expand Down
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@ all-in-one-integration-test:
# The integration tests are filtered by STORAGE env,
# currently the available STORAGE variable is:
# - grpc
.PHONY: jaeger-storage-integration-test
jaeger-storage-integration-test:
.PHONY: jaeger-v2-storage-integration-test
jaeger-v2-storage-integration-test:
(cd cmd/jaeger/ && go build .)
# Expire tests results for jaeger storage integration tests since the environment might change
# even though the code remains the same.
go clean -testcache
Expand Down
7 changes: 6 additions & 1 deletion cmd/agent/app/configmanager/grpc/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package grpc
import (
"context"
"errors"
"fmt"

"google.golang.org/grpc"

Expand All @@ -38,7 +39,11 @@ func NewConfigManager(conn *grpc.ClientConn) *ConfigManagerProxy {

// GetSamplingStrategy returns sampling strategies from collector.
func (s *ConfigManagerProxy) GetSamplingStrategy(ctx context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error) {
return s.client.GetSamplingStrategy(ctx, &api_v2.SamplingStrategyParameters{ServiceName: serviceName})
resp, err := s.client.GetSamplingStrategy(ctx, &api_v2.SamplingStrategyParameters{ServiceName: serviceName})
if err != nil {
return nil, fmt.Errorf("failed to get sampling strategy: %w", err)
}
return resp, nil
}

// GetBaggageRestrictions returns baggage restrictions from collector.
Expand Down
6 changes: 3 additions & 3 deletions cmd/agent/app/configmanager/grpc/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestSamplingManager_GetSamplingStrategy(t *testing.T) {
s, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
api_v2.RegisterSamplingManagerServer(s, &mockSamplingHandler{})
})
conn, err := grpc.Dial(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.NewClient(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
defer close(t, conn)
require.NoError(t, err)
defer s.GracefulStop()
Expand All @@ -48,14 +48,14 @@ func TestSamplingManager_GetSamplingStrategy(t *testing.T) {
}

func TestSamplingManager_GetSamplingStrategy_error(t *testing.T) {
conn, err := grpc.Dial("foo", grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.NewClient("foo", grpc.WithTransportCredentials(insecure.NewCredentials()))
defer close(t, conn)
require.NoError(t, err)
manager := NewConfigManager(conn)
resp, err := manager.GetSamplingStrategy(context.Background(), "any")
require.Nil(t, resp)
require.Error(t, err)
assert.Contains(t, err.Error(), "Error while dialing: dial tcp: address foo: missing port in address")
assert.Contains(t, err.Error(), "failed to get sampling strategy")
}

func TestSamplingManager_GetBaggageRestrictions(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/app/processors/thrift_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func createProcessor(t *testing.T, mFactory metrics.Factory, tFactory thrift.TPr

func initCollectorAndReporter(t *testing.T) (*metricstest.Factory, *testutils.GrpcCollector, reporter.Reporter, *grpc.ClientConn) {
grpcCollector := testutils.StartGRPCCollector(t)
conn, err := grpc.Dial(grpcCollector.Listener().Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.NewClient(grpcCollector.Listener().Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
rep := grpcrep.NewReporter(conn, map[string]string{}, zaptest.NewLogger(t))
metricsFactory := metricstest.NewFactory(0)
Expand Down
1 change: 1 addition & 0 deletions cmd/agent/app/reporter/grpc/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (b *ConnBuilder) CreateConnection(ctx context.Context, logger *zap.Logger,
dialOptions = append(dialOptions, grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(b.MaxRetry))))
dialOptions = append(dialOptions, b.AdditionalDialOptions...)

// TODO: Need to replace grpc.Dial with grpc.NewClient and pass test
conn, err := grpc.Dial(dialTarget, dialOptions...)
if err != nil {
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions cmd/agent/app/reporter/grpc/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestReporter_EmitZipkinBatch(t *testing.T) {
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer s.Stop()
conn, err := grpc.Dial(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.NewClient(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
//nolint:staticcheck // don't care about errors
require.NoError(t, err)
defer conn.Close()
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestReporter_EmitBatch(t *testing.T) {
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer s.Stop()
conn, err := grpc.Dial(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.NewClient(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
//nolint:staticcheck // don't care about errors
require.NoError(t, err)
defer conn.Close()
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestReporter_EmitBatch(t *testing.T) {
}

func TestReporter_SendFailure(t *testing.T) {
conn, err := grpc.Dial("invalid-host-name-blah:12345", grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.NewClient("invalid-host-name-blah:12345", grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer conn.Close()
rep := NewReporter(conn, nil, zap.NewNop())
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestReporter_MultitenantEmitBatch(t *testing.T) {
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer s.Stop()
conn, err := grpc.Dial(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.NewClient(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer func() { require.NoError(t, conn.Close()) }()
rep := NewReporter(conn, nil, zap.NewNop())
Expand Down
6 changes: 1 addition & 5 deletions cmd/anonymizer/app/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"errors"
"fmt"
"io"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -38,10 +37,7 @@ type Query struct {

// New creates a Query object
func New(addr string) (*Query, error) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

conn, err := grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, fmt.Errorf("failed to connect with the jaeger-query service: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/handler/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func initializeGRPCTestServer(t *testing.T, beforeServe func(s *grpc.Server)) (*
}

func newClient(t *testing.T, addr net.Addr) (api_v2.CollectorServiceClient, *grpc.ClientConn) {
conn, err := grpc.Dial(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.NewClient(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
return api_v2.NewCollectorServiceClient(conn), conn
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/collector/app/handler/otlp_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ func applyHTTPSettings(cfg *confighttp.ServerConfig, opts *flags.HTTPOptions) {
}
}

func applyTLSSettings(opts *tlscfg.Options) *configtls.TLSServerSetting {
return &configtls.TLSServerSetting{
TLSSetting: configtls.TLSSetting{
func applyTLSSettings(opts *tlscfg.Options) *configtls.ServerConfig {
return &configtls.ServerConfig{
Config: configtls.Config{
CAFile: opts.CAPath,
CertFile: opts.CertPath,
KeyFile: opts.KeyPath,
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/server/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestSpanCollector(t *testing.T) {
require.NoError(t, err)
defer server.Stop()

conn, err := grpc.Dial(
conn, err := grpc.NewClient(
params.HostPortActual,
grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
Expand Down
4 changes: 0 additions & 4 deletions cmd/jaeger/grpc_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,13 @@ service:
extensions:
jaeger_query:
trace_storage: external-storage
trace_storage_archive: external-storage-archive
ui_config: ./cmd/jaeger/config-ui.json

jaeger_storage:
grpc:
external-storage:
server: localhost:17271
connection-timeout: 5s
external-storage-archive:
server: localhost:17281
connection-timeout: 5s

receivers:
otlp:
Expand Down
43 changes: 29 additions & 14 deletions cmd/jaeger/internal/integration/README.md
Original file line number Diff line number Diff line change
@@ -1,27 +1,42 @@
# Integration

Jaeger v2 integration tests are built on top of [OTEL Testbed module](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/testbed). OTEL Testbed provide comprehensive tools for conducting end-to-end tests for the OTEL Collector, such as reproducible short-term benchmarks, correctness tests, long-running stability tests and maximum load stress tests. However, we only utilize the correctness tests from testbed, it generates and sends every combinatorial trace attributes and matches every single of them with the received traces from another end. To learn more about OTEL Testbed, please refer to the their [README](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/testbed/README.md).
The Jaeger v2 integration test is an extension of the existing `integration.StorageIntegration` designed to test the Jaeger-v2 OtelCol binary; currently, it only tests the span store. The existing tests at `plugin/storage/integration` (also called "unit mode") test by writing and reading span data directly to the storage API. In contrast, these tests (or "e2e mode") read and write span data through the RPC client to the Jaeger-v2 OtelCol binary. E2E mode tests read from the jaeger_query extension and write to the receiver in OTLP formats. For details, see the [Architecture](#architecture) section below.

## Architecture

Here's the architecture to test the OpenTelemetry Collector pipeline from end-to-end with the designated storage backends.
![integration diagram](integration-diagram.png)
```mermaid
flowchart LR
Test -->|writeSpan| SpanWriter
SpanWriter --> RPCW[RPC_client]
RPCW --> Receiver
Receiver --> Exporter
Exporter --> B(StorageBackend)
Test -->|readSpan| SpanReader
SpanReader --> RPCR[RPC_client]
RPCR --> jaeger_query
jaeger_query --> B
Testbed components:
| Component | Description |
|-----------|-------------|
| **LoadGenerator** | Encapsulates DataProvider and DataSender in order to generate and send data. |
| Golden DataProvider | Generates traces from the "Golden" dataset generated using pairwise combinatorial testing techniques. Testbed example uses [PICT](https://github.com/microsoft/pict/) to generate the test data, e.g. [testdata](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/internal/coreinternal/goldendataset/testdata). |
| OTLP Trace DataSender | With the generated traces from DataProvider, the DataSender sends traces to OTLP receiver in the collector instance. |
| **Mockbackend** | Encapsulates DataReceiver and provides consume functionality. |
| DataReceiver | A custom DataReceiver that will host a Jaeger storage extension to retrieve traces from the database by pulling them using our artificial Jaeger storage receiver. |
| Consumer | Consumer does not actually a thing in MockBackend but only to make the diagram intuitive, the traces received from our artificial receiver will be stored inside MockBackend. |
| **Correctness Test Validator** | Checks if the traces received from MockBackend are all matches with the generated traces from DataProvider. |
subgraph Integration Test Executable
Test
SpanWriter
SpanReader
RPCW
RPCR
end
subgraph jaeger-v2
Receiver
Exporter
jaeger_query
end
```

## gRPC Integration Test

To conduct the tests, run the following command:

```
scripts/grpc-integration-test.sh <remote_storage_image_version>
STORAGE=grpc \
SPAN_STORAGE_TYPE=memory \
make jaeger-v2-storage-integration-test
```
1 change: 0 additions & 1 deletion cmd/jaeger/internal/integration/datareceivers/.nocover

This file was deleted.

97 changes: 0 additions & 97 deletions cmd/jaeger/internal/integration/datareceivers/jaegerstorage.go

This file was deleted.

Loading

0 comments on commit a1b106f

Please sign in to comment.