skywalking后端处理业务逻辑的梳理-风君雪科技博客

skywalking后端处理业务逻辑的梳理-风君雪科技博客

 接下来会进入下面的代理处理TraceSegment

接下来会调用到TraceAnalyzer的doAnalysis方法,代码如下

public void doAnalysis(SegmentObject segmentObject) {
        if (segmentObject.getSpansList().size() == 0) {
            return;
        }

        createSpanListeners();

        try {
            notifySegmentListener(segmentObject);

            segmentObject.getSpansList().forEach(spanObject -> {
                if (spanObject.getSpanId() == 0) {
                    notifyFirstListener(spanObject, segmentObject);
                }

                if (SpanType.Exit.equals(spanObject.getSpanType())) {
                    notifyExitListener(spanObject, segmentObject);
                } else if (SpanType.Entry.equals(spanObject.getSpanType())) {
                    notifyEntryListener(spanObject, segmentObject);
                } else if (SpanType.Local.equals(spanObject.getSpanType())) {
                    notifyLocalListener(spanObject, segmentObject);
                } else {
                    log.error("span type value was unexpected, span type name: {}", spanObject.getSpanType()
                                                                                              .name());
                }
            });

            notifyListenerToBuild();
        } catch (Throwable e) {
            log.error(e.getMessage(), e);
        }
    }

我们首先来看方法的第一个业务逻辑createSpanListeners(),改方法主要产生处于TraceSegment。以及各类span的listener,默认情况下产生的listener情况如下

skywalking后端处理业务逻辑的梳理-风君雪科技博客

 默认情况下产生上面三个listener

1、SegmentAnalysisListener主要是处理TraceSegment,将整个TraceSegment转行成segement对象存储到数据库中

2、MultiScopesAnalysisListener主要用来处理TraceSegment中的span,依据span的不同类型进行单独处理,然后依据不同的span类型产生不同的指标信息

这是收到的skywalking的出

skywalking后端oap集群收到TraceSegment的处理信息之后,会进行处理,这里处理会对TraceSegment中保存的span进行分类,span分为下面的几种类型

enum Point {
        Entry, Exit, Local, First, Segment
    }

每一种类型会对应一个listen处理器与之对应,经过处理器处理之后,在延伸到后续的流程和操作

五个listen的类型如下面所示

skywalking后端处理业务逻辑的梳理-风君雪科技博客

代码中创建了三个listener情况如下

skywalking后端处理业务逻辑的梳理-风君雪科技博客

 接下来我们分析完成了createSpanListeners();接下来是执行

notifySegmentListener(segmentObject);我们来看下改方法是如何进行处理的

 private void notifySegmentListener(SegmentObject segmentObject) {
        analysisListeners.forEach(listener -> {
            if (listener.containsPoint(AnalysisListener.Point.Segment)) {
                ((SegmentListener) listener).parseSegment(segmentObject);
            }能够
        });
    }

在上面创建的三个listener中只有SegmentAnalysisListener能够处理TraceSegment,我们来看下后面的代码

skywalking后端处理业务逻辑的梳理-风君雪科技博客

 @Override
    public void parseSegment(SegmentObject segmentObject) {
        segment.setTraceId(segmentObject.getTraceId());
        segmentObject.getSpansList().forEach(span -> {
            if (startTimestamp == 0 || startTimestamp > span.getStartTime()) {
                startTimestamp = span.getStartTime();
            }
            if (span.getEndTime() > endTimestamp) {
                endTimestamp = span.getEndTime();
            }
            isError = isError || segmentStatusAnalyzer.isError(span);
            appendSearchableTags(span);
        });
        final long accurateDuration = endTimestamp - startTimestamp;
        duration = accurateDuration > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) accurateDuration;

        if (sampleStatus.equals(SAMPLE_STATUS.UNKNOWN) || sampleStatus.equals(SAMPLE_STATUS.IGNORE)) {
            if (sampler.shouldSample(segmentObject.getTraceId())) {
                sampleStatus = SAMPLE_STATUS.SAMPLED;
            } else if (isError && forceSampleErrorSegment) {
                sampleStatus = SAMPLE_STATUS.SAMPLED;
            } else if (traceLatencyThresholdsAndWatcher.shouldSample(duration)) {
                sampleStatus = SAMPLE_STATUS.SAMPLED;
            } else {
                sampleStatus = SAMPLE_STATUS.IGNORE;
            }
        }
    }

我们来重点看下SegmentAnalysisListener下的parseSegment的方法的流程,方法传入的参数segmentObject的结构体如下

traceId: "f5411eeefb3b403f8cafa89d0b82a82f.48.16092665850380005"
traceSegmentId: "ec5de79a4a8848e88146ebf696d157dc.73.16092665874590000"
spans {
  parentSpanId: -1
  startTime: 1609266587459
  endTime: 1609266589471
  refs {
    traceId: "f5411eeefb3b403f8cafa89d0b82a82f.48.16092665850380005"
    parentTraceSegmentId: "f5411eeefb3b403f8cafa89d0b82a82f.48.16092665850380004"
    parentSpanId: 1
    parentService: "DEMO-webapp"
    parentServiceInstance: "e171fdf7a1b4436b8289159e6d3f90b1@192.168.43.80"
    parentEndpoint: "{GET}/hello/{words}"
    networkAddressUsedAtPeer: "169.254.122.166:20880"
  }
  operationName: "com.xxx.HelloService.say(String)"
  spanLayer: RPCFramework
  componentId: 3
  tags {
    key: "url"
    value: "dubbo://169.254.122.166:20880/com.xxx.HelloService.say(String)"
  }
}
service: "DEMO-provider"
serviceInstance: "c5ec906d3dd24d0b8907fa8549c8306b@192.168.43.80"

这里首先处理的是dubbo-provider上传的TraceSegment,产生的是一个EntrySpan,改EntrySpan中管理了dubbo-webapp的TraceSegment

 skywalking后端处理业务逻辑的梳理-风君雪科技博客

 skywalking后端处理业务逻辑的梳理-风君雪科技博客

 这里在处理的时候如果当前的TraceSegment中存在5个span,那么当前TraceSegment的开始时间等于5个span中最小的starttime时间,最大时间为5个span中最大的endTime的时间,如果当前的TraceSegment中只要存在一个span的熟悉中为iserror,则改TraceSegment就为false

skywalking后端处理业务逻辑的梳理-风君雪科技博客

 这个代码是判断当前的TraceSegment是否被采用处理了,如果被采样了就可以存储到数据库中,如果没有采用改TraceSegment就丢弃不用做任何处理

 notifySegmentListener(segmentObject);本质上就是将后端上传的TraceSegment转化成一个segment对象

接下来继续执行

skywalking后端处理业务逻辑的梳理-风君雪科技博客

 取出当前TraceSegment中的span列表,然后对没有span进行判断,判断当前的span是EntrySpan、localspan、exitspan,依据不同的span传递给不同的listener进行处理

这里当前的TraceSegment中只有一个span

span的对象为

parentSpanId: -1
startTime: 1609266587459
endTime: 1609266589471
refs {
  traceId: "f5411eeefb3b403f8cafa89d0b82a82f.48.16092665850380005"
  parentTraceSegmentId: "f5411eeefb3b403f8cafa89d0b82a82f.48.16092665850380004"
  parentSpanId: 1
  parentService: "DEMO-webapp"
  parentServiceInstance: "e171fdf7a1b4436b8289159e6d3f90b1@192.168.43.80"
  parentEndpoint: "{GET}/hello/{words}"
  networkAddressUsedAtPeer: "169.254.122.166:20880"
}
operationName: "com.xxx.HelloService.say(String)"
spanLayer: RPCFramework
componentId: 3
tags {
  key: "url"
  value: "dubbo://169.254.122.166:20880/com.xxx.HelloService.say(String)"
}

因为当前的span为EntrySpan,对应的spanID值就是0,会被下面的listener进行处理

skywalking后端处理业务逻辑的梳理-风君雪科技博客

  notifyFirstListener(spanObject, segmentObject);底层会调用SegmentAnalysisListener的parseFirst方法,我们来看下改方法的业务逻辑

@Override
    public void parseFirst(SpanObject span, SegmentObject segmentObject) {
        if (sampleStatus.equals(SAMPLE_STATUS.IGNORE)) {
            return;
        }

        if (StringUtil.isEmpty(serviceId)) {
            serviceName = namingControl.formatServiceName(segmentObject.getService());
            serviceId = IDManager.ServiceID.buildId(
                serviceName,
                NodeType.Normal
            );
        }

        long timeBucket = TimeBucket.getRecordTimeBucket(startTimestamp);

        segment.setSegmentId(segmentObject.getTraceSegmentId());
        segment.setServiceId(serviceId);
        segment.setServiceInstanceId(IDManager.ServiceInstanceID.buildId(
            serviceId,
            namingControl.formatInstanceName(segmentObject.getServiceInstance())
        ));
        segment.setLatency(duration);
        segment.setStartTime(startTimestamp);
        segment.setTimeBucket(timeBucket);
        segment.setEndTime(endTimestamp);
        segment.setIsError(BooleanUtils.booleanToValue(isError));
        segment.setDataBinary(segmentObject.toByteArray());
        segment.setVersion(3);

        endpointName = namingControl.formatEndpointName(serviceName, span.getOperationName());
        endpointId = IDManager.EndpointID.buildId(
            serviceId,
            endpointName
        );
    }

1、当前service: “DEMO-provider”我们需要将”DEMO-provider”转行成对应应用id,存储到segment对象中

2、同理应用实例serviceInstance: “c5ec906d3dd24d0b8907fa8549c8306b@192.168.43.80″我们也要转化成对应的应用实例id存储到segement对象中

skywalking后端处理业务逻辑的梳理-风君雪科技博客

 skywalking后端处理业务逻辑的梳理-风君雪科技博客

segement中的timeBucket字段就是当前TraceSegment中span集合中,比较各个span的开始时间,取出最小的时间,最小的时间作为segement的timeBucket

skywalking后端处理业务逻辑的梳理-风君雪科技博客

 接下来设置segemnt的开始时间,开始时间就是比较各个span的开始时间,取出最小的开始时间作为segemnt的开始时间

segemnt的结束时间结束始时间就是比较各个span的结束时间,取出最大的结束时间作为segemnt的结束时间

整个segemnt的持续时间就是开始时间减去结束时间,此外还将整个segemeOBj对象转化成二进制文件存储在字段setDataBinary中

经过整个parseFirst方法之后,整个segemnt对象的结构为

skywalking后端处理业务逻辑的梳理-风君雪科技博客

 接下来我们继续执行下面的方法

skywalking后端处理业务逻辑的梳理-风君雪科技博客

 当前的span为EntrySpan,接下来会执行 notifyEntryListener(spanObject, segmentObject);方法

skywalking后端处理业务逻辑的梳理-风君雪科技博客

 skywalking后端处理业务逻辑的梳理-风君雪科技博客

skywalking后端处理业务逻辑的梳理-风君雪科技博客

 这里MultiScopesAnalysisListener和SegmentAnalysisListener都会对整个EntrySpan进行处理,我们来分析下下面的代码

首先来看下MultiScopesAnalysisListener对parseEntry的方法对EntrySpan的处理

skywalking后端处理业务逻辑的梳理-风君雪科技博客

 改方法的主要作用是获得当前EntrySpan关联的上游的TraceSegmentRef对象,然后将TraceSegmentRef对象封装到SourceBuilder对象中

我们先来看下当前EntrySpan中获得的TraceSegmentRef对象的信息如下

traceId: "f5411eeefb3b403f8cafa89d0b82a82f.48.16092665850380005"
parentTraceSegmentId: "f5411eeefb3b403f8cafa89d0b82a82f.48.16092665850380004"
parentSpanId: 1
parentService: "DEMO-webapp"
parentServiceInstance: "e171fdf7a1b4436b8289159e6d3f90b1@192.168.43.80"
parentEndpoint: "{GET}/hello/{words}"
networkAddressUsedAtPeer: "169.254.122.166:20880"

skywalking后端处理业务逻辑的梳理-风君雪科技博客

 在 SourceBuilder 中记录了上下游两个系统的基础信息,核心字段如下所示:

// 上游系统的 service 信息、serviceInstance 信息以及 Endpoint 信息

@Getter @Setter private int sourceServiceId;

@Getter @Setter private String sourceServiceName;

@Getter @Setter private int sourceServiceInstanceId;

@Getter @Setter private String sourceServiceInstanceName;

@Getter @Setter private int sourceEndpointId;

@Getter @Setter private String sourceEndpointName;

// 下游系统的 service 信息、serviceInstance 信息以及 Endpoint 信息

@Getter @Setter private int destServiceId;

@Getter @Setter private String destServiceName;

@Getter @Setter private int destServiceInstanceId;

@Getter @Setter private String destServiceInstanceName;

@Getter @Setter private int destEndpointId;

@Getter @Setter private String destEndpointName;

// 当前系统的组件类型

@Getter @Setter private int componentId;

// 在当前系统中的耗时

@Getter @Setter private int latency;

// 当前系统是否发生Error

@Getter @Setter private boolean status;

@Getter @Setter private int responseCode; // 默认为0

@Getter @Setter private RequestType type; // 请求类型

// 调用关系中的角色,是调用方(Client)、被调用方(Server)还是代理(Proxy)

@Getter @Setter private DetectPoint detectPoint; 

// TraceSegment 起始时间所在分钟级时间窗口

@Getter @Setter private long timeBucket;

这里有一个关键点

skywalking后端处理业务逻辑的梳理-风君雪科技博客

 就是如果当前的EntrySpan中没有TraceSegmentRef没有父亲入口,比如dubbo-webapp入口的EntrySpan,在http请求头信息是没有任何信息的,获得的TraceSegmentRef就是为null,这里代码中设置当前的span的sourceBuilder中的  sourceBuilder.setSourceEndpointName(Const.USER_ENDPOINT_NAME);,public static final String USER_ENDPOINT_NAME = “User”;的函数表示就是浏览器或者postman等入口调用了dubbo-webapp,作为dubbo-webapp调用的入口

这里dubbo-provider的执行的是else条件,入口的方法就是dubbo-webapp的方法

skywalking后端处理业务逻辑的梳理-风君雪科技博客

 skywalking后端处理业务逻辑的梳理-风君雪科技博客

 这里设置sourceBuilder的上游节点内容的时候,需要判断上面有MQ类型还是一般应用,这里是一般应用执行下面的else的方法

skywalking后端处理业务逻辑的梳理-风君雪科技博客

 上游节点的信息设置完成之后,接下来是设置下游的节点信息

skywalking后端处理业务逻辑的梳理-风君雪科技博客

 skywalking后端处理业务逻辑的梳理-风君雪科技博客

 通过这个sourcebuild对象就将当前的EntrySpan和dubb-webapp中产线的exitspan关联起来了,

接下来将创建好的sourceBuilder添加到集合entrySourceBuilders中,后续会进行处理

这里TraceSegment中的span集合中如果存在5个EntrySpan,就会产生5个sourceBuilder对象

接下来继续执行 parseLogicEndpoints(span, segmentObject);

skywalking后端处理业务逻辑的梳理-风君雪科技博客

 我们来看看 parseLogicEndpoints(span, segmentObject);具体的业务代码,改方法的具体业务逻辑后面再进行分析

MultiScopesAnalysisListener的parseEntry执行完成之后,接下来执行NetworkAddressAliasMappingListener类的parseEntry方法

我们来看下改方法的具体业务逻辑,就是将当前EntrySpan所在的网络、应用名称、应用实例封装成一个networkAddressAliasSetup对象,然后将这个对象

传递给sourceReceiver进行处理

skywalking后端处理业务逻辑的梳理-风君雪科技博客

 整个networkAddressAliasSetup的结构如下

skywalking后端处理业务逻辑的梳理-风君雪科技博客

 SegmentAnalysisListener的parseEntry方法就是将应用名称应用实例名称转化成对于的id

skywalking后端处理业务逻辑的梳理-风君雪科技博客

 skywalking后端处理业务逻辑的梳理-风君雪科技博客

 经过上面的转化之后执行notifyListenerToBuild()方法

private void notifyListenerToBuild() {
analysisListeners.forEach(AnalysisListener::build);
}执行各个listener的build方法,主要是执行SegmentAnalysisListener的build方法和MultiScopesAnalysisListener的build方法

我们先介绍SegmentAnalysisListener的build方法,然后在介绍MultiScopesAnalysisListener的build方法

skywalking后端处理业务逻辑的梳理-风君雪科技博客

 收到判断当前的segment是否是被采样忽略的,如果是被采样忽略的

skywalking后端采样率要设置为一样的原因
一个全局的请求,在dubbo-webapp产生了一个TraceSegmentA,在dubbo-dubbo产生了一个TraceSegmentB,需要有这两个TraceSegment才能还原这一次Trace
TraceSegmentA传递给后端的oap集群的A节点处理,TraceSegmentB传递给后端的oap集群的B节点处理,
如果A节点和B节点在后端的采样率设置不一致,就可能导致TraceSegmentA被处理了存储在es中,TraceSegmentB被忽略直接丢弃,导致无法还原当前请求的trace情况
所以oap集群后端每个节点的采样率必须设置为一致

  收到判断当前的segment是否是被采样忽略的,如果是被采样忽略的就直接忽略,如果不是继续执行后续的代码

设置segemnt中endpointId的值,这里endpointID是这样计算出来的

        endpointId = IDManager.EndpointID.buildId(
            serviceId,
            endpointName
        )

当前的应用为dubbo-provider对于的service_id的值为REVNTy13ZWJhcHA=.1

endpointName为当前span对应的  operationName: “com.xxx.HelloService.say(String)”

skywalking后端处理业务逻辑的梳理-风君雪科技博客

 然后通过上面的这两个参数获得对应的值

         */
        public static String buildId(String serviceId, String endpointName) {
            return serviceId
                + Const.ID_CONNECTOR
                + encode(endpointName);
        }

skywalking后端处理业务逻辑的梳理-风君雪科技博客

算出来了之后 

经过处理之后整个TraceSegment的结构如下

 2、预构建 :替换url peer端口、servicenam等信息

1、接下来将TraceSegment中各个span的类型分配给不同的spanlistener进行处理

 接下来对每一个span进行解析

parentSpanId: -1
startTime: 1609266200857
endTime: 1609266202857
refs {
traceId: “f5411eeefb3b403f8cafa89d0b82a82f.57.16092661992120001”
parentTraceSegmentId: “f5411eeefb3b403f8cafa89d0b82a82f.57.16092661992120000”
parentSpanId: 1
parentService: “DEMO-webapp”
parentServiceInstance: “e171fdf7a1b4436b8289159e6d3f90b1@192.168.43.80”
parentEndpoint: “{GET}/hello/{words}”
networkAddressUsedAtPeer: “169.254.122.166:20880”
}
operationName: “com.xxx.HelloService.say(String)”
spanLayer: RPCFramework
componentId: 3
tags {
key: “url”
value: “dubbo://169.254.122.166:20880/com.xxx.HelloService.say(String)”
}

依据不同的span类型进行解析,传递给不同的listen对象,然后产生不同的数据指标