opentelemetry CNCF Collector

¶核心

Collector
Receivers、Processors、Exporters
pipelines、extensions、collector
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
receivers:
  otlp:
    protocols:
      grpc:
      http:

processors:
  batch:

exporters:
  otlp:
    endpoint: otelcol:55680

extensions:
  health_check:
  pprof:
  zpages:

service:
  extensions: [health_check,pprof,zpages]
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [otlp]
    metrics:
      receivers: [otlp]
      processors: [batch]
      exporters: [otlp]
    logs:
      receivers: [otlp]
      processors: [batch]
      exporters: [otlp]

通过配置,我们可以初窥之。

¶源码探路

OpenTelemetry(下文简称 OT)

¶Component

OTComponent interface
1
2
3
4
// Component is the lifecycle contract shown in this excerpt: it consists of a
// Start method and a Shutdown method, both of which report failure via error.
type Component interface {
// Start takes a context and a Host and returns an error.
// NOTE(review): Host is declared elsewhere; presumably it is the process
// hosting the component — confirm against its definition.
Start(ctx context.Context, host Host) error
// Shutdown takes a context and returns an error.
Shutdown(ctx context.Context) error
}
ComponentReceiver
1
2
3
// Receiver embeds Component and declares no additional methods of its own in
// this excerpt, so any Receiver has the Component method set.
type Receiver interface {
Component
}
Receiver、TracesReceiver、MetricsReceiver、LogsReceiver、OOP

¶Service

pipelinespipelinesserviceservice
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// buildPipelines assembles the service's data path in three ordered steps:
// exporters, then pipelines, then receivers. Each step is handed the result of
// the previous one, so the order is significant. The first step that fails
// aborts the build and its error is returned wrapped with context.
func (srv *service) buildPipelines() error {
	var err error

	// Step 1: exporters are built first because the pipelines are built on top of them.
	if srv.builtExporters, err = builder.BuildExporters(srv.logger, srv.startInfo, srv.config, srv.factories.Exporters); err != nil {
		return fmt.Errorf("cannot build builtExporters: %w", err)
	}

	// Step 2: pipelines are built from the configured processors and the
	// exporters produced in step 1.
	if srv.builtPipelines, err = builder.BuildPipelines(srv.logger, srv.startInfo, srv.config, srv.builtExporters, srv.factories.Processors); err != nil {
		return fmt.Errorf("cannot build pipelines: %w", err)
	}

	// Step 3: receivers are built last and are given the finished pipelines.
	if srv.builtReceivers, err = builder.BuildReceivers(srv.logger, srv.startInfo, srv.config, srv.builtPipelines, srv.factories.Receivers); err != nil {
		return fmt.Errorf("cannot build receivers: %w", err)
	}

	return nil
}
Start
1
2
3
4
5
6
7
8
9
10
11
// Start brings the service up: extensions are started first, then the
// pipelines, and finally the built extensions are notified that the pipelines
// are ready. A failure in either start step is returned wrapped with context;
// otherwise the result of the readiness notification is returned.
func (srv *service) Start(ctx context.Context) error {
	err := srv.startExtensions(ctx)
	if err != nil {
		return fmt.Errorf("cannot setup extensions: %w", err)
	}

	err = srv.startPipelines(ctx)
	if err != nil {
		return fmt.Errorf("cannot setup pipelines: %w", err)
	}

	return srv.builtExtensions.NotifyPipelineReady()
}
BuildExporters
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// BuildExporters builds one exporter for every exporter entry in the
// configuration and returns them keyed by their configuration entry.
// Building stops at the first exporter that fails, and that error is
// returned unchanged together with a nil result.
func BuildExporters(logger *zap.Logger, appInfo component.ApplicationStartInfo, config *configmodels.Config, factories map[configmodels.Type]component.ExporterFactory) (Exporters, error) {
	kindLogger := logger.With(zap.String(kindLogKey, kindLogsExporter))
	eb := &exportersBuilder{kindLogger, appInfo, config, factories}

	requiredTypes := eb.calcExportersRequiredDataTypes()

	built := make(Exporters)
	// The configuration drives the loop: one iteration per configured exporter.
	for _, expCfg := range eb.config.Exporters {
		// Each exporter gets a logger tagged with its own type and name.
		typeField := zap.String(typeLogKey, string(expCfg.Type()))
		nameField := zap.String(nameLogKey, expCfg.Name())
		expLogger := eb.logger.With(typeField, nameField)

		exp, err := eb.buildExporter(context.Background(), expLogger, eb.appInfo, expCfg, requiredTypes)
		if err != nil {
			return nil, err
		}
		built[expCfg] = exp
	}

	return built, nil
}
Export:CreateTracesExporter / CreateMetricsExporter / CreateLogsExporter
CreateTracesExporterfactory
1
2
3
4
5
6
7
8
9
// CreateTracesExporter delegates to the factory's createTracesExporter
// function when one has been set. When none is set it returns
// configerror.ErrDataTypeIsNotSupported and a nil exporter.
func (f *factory) CreateTracesExporter(
	ctx context.Context,
	params component.ExporterCreateParams,
	cfg configmodels.Exporter,
) (component.TracesExporter, error) {
	if f.createTracesExporter == nil {
		return nil, configerror.ErrDataTypeIsNotSupported
	}
	// Hand off to the function stored on the factory.
	return f.createTracesExporter(ctx, params, cfg)
}
Exporterfactory
1
2
3
4
5
// WithTraces returns a FactoryOption that stores createTraceExporter in the
// factory's createTracesExporter field when the option is applied.
func WithTraces(createTraceExporter CreateTracesExporter) FactoryOption {
	var opt FactoryOption = func(f *factory) {
		f.createTracesExporter = createTraceExporter
	}
	return opt
}
factory Export
1
2
3
4
5
6
// NewFactory returns an exporter factory built by exporterhelper.NewFactory
// from this package's typeStr and createDefaultConfig, with
// createTraceExporter registered as the traces-exporter constructor.
func NewFactory() component.ExporterFactory {
	tracesOption := exporterhelper.WithTraces(createTraceExporter)
	return exporterhelper.NewFactory(typeStr, createDefaultConfig, tracesOption)
}
Factory
1
2
3
4
5
6
7
8
9
10
// Collect the listed receiver factories into a factory map via
// component.MakeReceiverFactoryMap.
// NOTE(review): err is not checked within this excerpt; the handling is
// presumably in the code that follows — confirm against the full source.
receivers, err := component.MakeReceiverFactoryMap(
jaegerreceiver.NewFactory(),
fluentforwardreceiver.NewFactory(),
zipkinreceiver.NewFactory(),
prometheusreceiver.NewFactory(),
opencensusreceiver.NewFactory(),
otlpreceiver.NewFactory(),
hostmetricsreceiver.NewFactory(),
kafkareceiver.NewFactory(),
)

Collector 不仅支持单一入口,同一条 pipeline 也可以同时配置多个入口(receiver)和多个出口(exporter):

1
2
3
4
5
6
service:
  pipelines:
    traces:
      receivers: [otlp, jaeger, zipkin]
      processors: [memory_limiter, batch]
      exporters: [otlp, jaeger, zipkin]
service

¶Receiver

ReceiverZipkinReceiver
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Start launches the receiver's HTTP server.
//
// The start-up work is wrapped in zr.startOnce.Do (presumably a sync.Once, so
// it runs at most once): it records host, builds the HTTP server and its
// listener from the receiver's HTTPServerSettings, and serves on that
// listener in a background goroutine tracked by zr.shutdownWG.
//
// It returns the listener-creation error, if any, and nil otherwise.
// NOTE(review): a call that does not run the start-up closure returns nil;
// confirm whether callers expect an "already started" error in that case.
func (zr *ZipkinReceiver) Start(_ context.Context, host component.Host) error {
	// err must be declared in this scope: the closure below assigns it and the
	// function returns it, because Do itself has no return value.
	var err error

	zr.startOnce.Do(func() {
		zr.host = host
		zr.server = zr.config.HTTPServerSettings.ToServer(zr)

		var listener net.Listener
		listener, err = zr.config.HTTPServerSettings.ToListener()
		if err != nil {
			// Without a listener there is nothing to serve; report via err.
			return
		}

		zr.shutdownWG.Add(1)
		go func() {
			defer zr.shutdownWG.Done()

			// http.ErrServerClosed is the expected result of an orderly
			// shutdown; any other error from Serve is reported as fatal.
			if errHTTP := zr.server.Serve(listener); errHTTP != http.ErrServerClosed {
				host.ReportFatalError(errHTTP)
			}
		}()
	})

	return err
}
HTTPServeHTTP
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// ServeHTTP handles one Zipkin span upload: it reads the request body,
// converts it to pdata.Traces with the v1 or v2 converter, and passes the
// result to the next consumer. A payload that cannot be converted is
// answered with HTTP 400 and is not forwarded.
//
// NOTE(review): this is an abridged excerpt — ctx and asZipkinv1 are used
// below but their declarations are not visible here, and writing the success
// response is omitted.
func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// processBodyIfNecessary deals with encodings such as gzip.
	pr := processBodyIfNecessary(r)
	// The read error is deliberately ignored here; presumably a truncated
	// body then fails conversion below — TODO confirm.
	slurp, _ := ioutil.ReadAll(pr)
	// Close errors are ignored on purpose: the body has already been read.
	if c, ok := pr.(io.Closer); ok {
		_ = c.Close()
	}
	_ = r.Body.Close()

	var td pdata.Traces
	var err error
	if asZipkinv1 {
		// Zipkin v1 payload.
		td, err = zr.v1ToTraceSpans(slurp, r.Header)
	} else {
		// Zipkin v2 payload.
		td, err = zr.v2ToTraceSpans(slurp, r.Header)
	}
	// Reject the request when conversion failed rather than forwarding a td
	// that the converter did not vouch for.
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// nextConsumer is the next hop of the pipeline.
	consumerErr := zr.nextConsumer.ConsumeTraces(ctx, td)

	// Record the end of the receive operation together with the consumer result.
	obsreport.EndTraceDataReceiveOp(ctx, receiverTagValue, td.SpanCount(), consumerErr)

	// Writing the HTTP response is omitted in this excerpt.
}
ConsumeTraces
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// attachReceiverToPipelines appears here only as an abridged excerpt: the
// enclosing switch statement and the declarations of builtPipelines, factory,
// ctx, creationParams, config, createdReceiver and err are not visible.
func (rb *receiversBuilder) attachReceiverToPipelines(){
// Traces data type: build one consumer from the built pipelines with
// buildFanoutTraceConsumer, then pass it to the factory when creating the
// traces receiver.
case configmodels.TracesDataType:
junction := buildFanoutTraceConsumer(builtPipelines)
createdReceiver, err = factory.CreateTracesReceiver(ctx, creationParams, config, junction)
}


// buildFanoutTraceConsumer returns the traces consumer a receiver should feed.
//
// With exactly one pipeline it returns that pipeline's first consumer
// directly. Otherwise it collects every pipeline's first consumer and wraps
// them in a fan-out consumer; the cloning variant is used as soon as any
// pipeline reports MutatesConsumedData (presumably so that each pipeline
// works on its own copy — confirm against fanoutconsumer).
func buildFanoutTraceConsumer(pipelines []*builtPipeline) consumer.Traces {
	// A single pipeline needs no junction point.
	if len(pipelines) == 1 {
		return pipelines[0].firstTC
	}

	var (
		targets []consumer.Traces
		mutates bool
	)
	for i := range pipelines {
		p := pipelines[i]
		targets = append(targets, p.firstTC)
		if p.MutatesConsumedData {
			mutates = true
		}
	}

	// Junction point that fans out to all pipelines.
	if !mutates {
		return fanoutconsumer.NewTraces(targets)
	}
	return fanoutconsumer.NewTracesCloning(targets)
}

¶Exporter

ExporterJaegerExporter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// pushTraceData converts td into Jaeger batches and posts them one by one
// through the sender's gRPC client.
//
// A conversion failure is returned wrapped in consumererror.Permanent. The
// first failed PostSpans call is logged at debug level and returned wrapped;
// remaining batches are not sent. It returns nil when every batch was posted.
func (s *protoGRPCSender) pushTraceData(ctx context.Context, td pdata.Traces) error {
	batches, convErr := jaegertranslator.InternalTracesToJaegerProto(td)
	if convErr != nil {
		return consumererror.Permanent(fmt.Errorf("failed to push trace data via Jaeger exporter: %w", convErr))
	}

	// Attach the sender's metadata to the outgoing context only when there is any.
	if s.metadata.Len() > 0 {
		ctx = metadata.NewOutgoingContext(ctx, s.metadata)
	}

	for i := range batches {
		req := &jaegerproto.PostSpansRequest{Batch: *batches[i]}
		if _, err := s.client.PostSpans(ctx, req, grpc.WaitForReady(s.waitForReady)); err != nil {
			s.logger.Debug("failed to push trace data to Jaeger", zap.Error(err))
			return fmt.Errorf("failed to push trace data via Jaeger exporter: %w", err)
		}
	}

	return nil
}
clientjaegerproto.CollectorServiceClientABI
1
2
3
4
// Traces is a single-method interface: anything that can accept pdata.Traces
// through ConsumeTraces satisfies it.
type Traces interface {
// ConsumeTraces receives pdata.Traces for consumption.
// It takes a context and the trace data and returns an error.
ConsumeTraces(ctx context.Context, td pdata.Traces) error
}

因此我们发现,其实这几个组件都是实现了此接口。