diff --git a/app/app.yaml b/app/app.yaml index 5eca977..349e2d9 100644 --- a/app/app.yaml +++ b/app/app.yaml @@ -49,4 +49,10 @@ app: # each strategy enables one type of weak cross span_set connection when strong evidence (tcp_seq/x_request_id/span_id) is absent: # net_span_c_to_s_via_trace_id: connect a client-side leaf NetSpan to a server-side root NetSpan # when they share the same trace_id but have different tcp_seq - span_set_connection_strategies: [] \ No newline at end of file + span_set_connection_strategies: [] + # in multi-trace_id scenarios, some data may be missed because it falls outside the query time window + # by default, the query time range is a fixed value from the front-end, which is [-1m:+1m] + # but in async request-response scenarios, we cannot know how long after the request an async response will be sent + # so results may be missing + # so this config adds an extra time range expansion, default: 0, unit: s + iteration_expand_time_range: 0 diff --git a/app/app/application/l7_flow_tracing.py b/app/app/application/l7_flow_tracing.py index 0b7929a..9a7c7cb 100644 --- a/app/app/application/l7_flow_tracing.py +++ b/app/app/application/l7_flow_tracing.py @@ -284,7 +284,7 @@ async def query_and_trace_flowmetas( max_iteration: int = config.max_iteration, network_delay_us: int = config.network_delay_us, host_clock_offset_us: int = config.host_clock_offset_us, - app_spans_from_api: list = []) -> Tuple[Set, list]: + app_spans_from_api: list = []) -> Tuple[Set, list, str]: """多次迭代,查询可追踪到的所有 l7_flow_log 的摘要 参数说明: time_filter: 查询的时间范围过滤条件,SQL表达式 @@ -309,12 +309,17 @@ async def query_and_trace_flowmetas( visited_trace_ids = set() # 已访问过的 trace_id 集合 new_trace_ids_for_next_iteration = set() # 给下一轮迭代查询用的 trace_id 集合 + # during iteration, the query time range may be expanded based on config.iteration_expand_time_range + # in that case, the global query time range for all l7_flow_ids must be updated as well + min_start_time = self.start_time + max_end_time = self.end_time + # Query1: 先获取 
_id 对应的数据 - # don't use timefilter here, querier would extract time from _id (e.g.: id>>32) + # don't use time_filter here, querier would extract time from _id (e.g.: id>>32) dataframe_flowmetas = await self.query_flowmetas("1=1", base_filter) if type(dataframe_flowmetas) != DataFrame or dataframe_flowmetas.empty: # when app_spans_from_api got values from api, return it - return set(), app_spans_from_api + return set(), app_spans_from_api, time_filter l7_flow_ids = set(dataframe_flowmetas['_id']) # set(flow._id) # 用于下一轮迭代,记录元信息 @@ -355,6 +360,13 @@ async def query_and_trace_flowmetas( max_iteration = 1 config.tracing_source = config.tracing_source or DEFAULT_TRACING_SOURCE + # 注意扩大时间范围能力只用于搜索多 trace_id,其他关联搜索(tcp_seq, x_req_id, syscall)不要扩大查询范围 + # 当什么也不配置时,查询范围是 _id 对应的 l7_flow_log 的时间前后各推一分钟,由前端传参决定 + trace_id_time_filter = time_filter + if config.iteration_expand_time_range: + start_time = self.start_time - config.iteration_expand_time_range + end_time = self.end_time + config.iteration_expand_time_range + trace_id_time_filter = f"time>={start_time} AND time<={end_time}" # 进行迭代查询,上限为 config.spec.max_iteration for i in range(max_iteration): @@ -384,7 +396,7 @@ async def query_and_trace_flowmetas( # Query2: 基于 trace_id 获取相关数据,第一层迭代 new_trace_id_flows = pd.DataFrame() new_trace_id_flows = await self.query_flowmetas( - time_filter, ' AND '.join(query_trace_filters)) + trace_id_time_filter, ' AND '.join(query_trace_filters)) # 已查询过一次的 trace_id,直接加入 visited,不需要再被查询 # new_trace_ids_for_next_iteration 已被查询,可在这一步直接清空 @@ -444,6 +456,17 @@ async def query_and_trace_flowmetas( dataframe_flowmetas = self.concat_l7_flow_log_dataframe( [dataframe_flowmetas, new_trace_id_flows]) l7_flow_ids = set(dataframe_flowmetas['_id']) + if config.iteration_expand_time_range: + trace_min_s = int( + new_trace_id_flows['start_time_us'].min() // + 1_000_000) + trace_max_s = int( + new_trace_id_flows['end_time_us'].max() // + 1_000_000) + if trace_min_s < min_start_time: + min_start_time = 
trace_min_s + if trace_max_s > max_end_time: + max_end_time = trace_max_s new_trace_req_tcp_seqs, new_trace_resp_tcp_seqs, new_trace_x_request_ids, new_trace_syscall_trace_ids, new_request_ids = _build_simple_trace_info_from_dataframe( new_trace_id_flows) @@ -547,9 +570,13 @@ async def query_and_trace_flowmetas( elif not new_filters: # only new_trace_ids_for_next_iteration got values continue + # 关联查询(tcp_seq/syscall/dns/xreqid)的时间范围应跟随已查到的 trace_id flows 的实际时间跨度 + # min_start_time/max_end_time 在每次 append 新 flows 时增量更新,此处直接使用 + assoc_time_filter = f"time>={min_start_time} AND time<={max_end_time}" + # Query3: 查询上述基于 Condition[123] 构建出的条件,即与【第一层迭代】关联的所有 flow,此处构建【第二层迭代】查询 new_flows = pd.DataFrame() - new_flows = await self.query_flowmetas(time_filter, + new_flows = await self.query_flowmetas(assoc_time_filter, ' OR '.join(new_filters)) if type(new_flows ) != DataFrame or new_flows.empty: # no more new flows @@ -630,8 +657,9 @@ async def query_and_trace_flowmetas( break # end of `for i in range(max_iteration)` - - return l7_flow_ids, app_spans_from_external + # update final_time_filter for all l7_flow_ids + final_time_filter = f"time>={min_start_time} AND time<={max_end_time}" + return l7_flow_ids, app_spans_from_external, final_time_filter async def trace_l7_flow( self, @@ -654,7 +682,7 @@ async def trace_l7_flow( network_delay_us: 使用Flowmeta进行流日志匹配的时间偏差容忍度,越大漏报率越低但误报率越高,一般设置为网络时延的最大可能值 """ # 多次迭代,查询到所有相关的 l7_flow_log 摘要 - l7_flow_ids, app_spans_from_external = await self.query_and_trace_flowmetas( + l7_flow_ids, app_spans_from_external, final_time_filter = await self.query_and_trace_flowmetas( time_filter, base_filter, max_iteration, network_delay_us, host_clock_offset_us, app_spans_from_api) @@ -668,8 +696,8 @@ async def trace_l7_flow( return_fields.append("attribute") l7_flows = pd.DataFrame() if len(l7_flow_ids) > 0: - l7_flows = await self.query_all_flows(time_filter, l7_flow_ids, - return_fields) + l7_flows = await self.query_all_flows(final_time_filter, + 
l7_flow_ids, return_fields) if type(l7_flows) != DataFrame or l7_flows.empty: # 一般不可能发生没有 l7_flows 但有 app_spans_from_external 的情况 # 实际上几乎不可能发生没有 l7_flows 的情况,因为至少包含初始 flow diff --git a/app/app/application/tracing_completion.py b/app/app/application/tracing_completion.py index b025c36..0e4667e 100644 --- a/app/app/application/tracing_completion.py +++ b/app/app/application/tracing_completion.py @@ -47,7 +47,8 @@ async def query(self): for j in range(len(self.app_spans)): if i == j: continue - related_map_from_api[self.app_spans[i]['_id']][self.app_spans[j]['_id']] |= L7_FLOW_RELATIONSHIP_SPAN_ID + related_map_from_api[self.app_spans[i]['_id']][ + self.app_spans[j]['_id']] |= L7_FLOW_RELATIONSHIP_SPAN_ID rst = await self.trace_l7_flow( time_filter, base_filter, diff --git a/app/app/config.py b/app/app/config.py index 0705773..c49a8c4 100644 --- a/app/app/config.py +++ b/app/app/config.py @@ -31,6 +31,8 @@ def parse_spec(self, cfg): if not isinstance(strategies, list): strategies = [] self.span_set_connection_strategies = strategies + self.iteration_expand_time_range = int( + spec.get('iteration_expand_time_range', 0)) def parse_querier(self, cfg): querier = cfg.get('querier', dict())