Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion app/app.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,10 @@ app:
# each strategy enables one type of weak cross span_set connection when strong evidence (tcp_seq/x_request_id/span_id) is absent:
# net_span_c_to_s_via_trace_id: connect a client-side leaf NetSpan to a server-side root NetSpan
# when they share the same trace_id but have different tcp_seq
span_set_connection_strategies: []
span_set_connection_strategies: []
# in multi-trace_id scenarios, some data may be missed because it falls outside the query time window
# by default, the query time range is a fixed value passed from the front-end, which is [-1m:+1m]
# but in async request-response scenarios, we cannot know how long after the request an async response will be sent,
# which may lead to missing results
# so this option expands the query time range by an extra amount on both sides, default: 0, unit: s
iteration_expand_time_range: 0
48 changes: 38 additions & 10 deletions app/app/application/l7_flow_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ async def query_and_trace_flowmetas(
max_iteration: int = config.max_iteration,
network_delay_us: int = config.network_delay_us,
host_clock_offset_us: int = config.host_clock_offset_us,
app_spans_from_api: list = []) -> Tuple[Set, list]:
app_spans_from_api: list = []) -> Tuple[Set, list, str]:
"""多次迭代,查询可追踪到的所有 l7_flow_log 的摘要
参数说明:
time_filter: 查询的时间范围过滤条件,SQL表达式
Expand All @@ -309,12 +309,17 @@ async def query_and_trace_flowmetas(
visited_trace_ids = set() # 已访问过的 trace_id 集合
new_trace_ids_for_next_iteration = set() # 给下一轮迭代查询用的 trace_id 集合

# during iteration, the query time range may be expanded based on config.iteration_expand_time_range
# in that case, the global query time range used for all l7_flow_ids must be updated as well
min_start_time = self.start_time
max_end_time = self.end_time

# Query1: 先获取 _id 对应的数据
# don't use timefilter here, querier would extract time from _id (e.g.: id>>32)
# don't use time_filter here, querier would extract time from _id (e.g.: id>>32)
dataframe_flowmetas = await self.query_flowmetas("1=1", base_filter)
if type(dataframe_flowmetas) != DataFrame or dataframe_flowmetas.empty:
# when app_spans_from_api got values from api, return it
return set(), app_spans_from_api
return set(), app_spans_from_api, time_filter
l7_flow_ids = set(dataframe_flowmetas['_id']) # set(flow._id)

# 用于下一轮迭代,记录元信息
Expand Down Expand Up @@ -355,6 +360,13 @@ async def query_and_trace_flowmetas(
max_iteration = 1

config.tracing_source = config.tracing_source or DEFAULT_TRACING_SOURCE
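# NOTE: the time range expansion applies only to multi-trace_id searches; other associated searches (tcp_seq, x_req_id, syscall) must not expand the query range
# when nothing is configured, the query range is one minute before and after the time of the l7_flow_log matching _id, as determined by the parameter passed from the front-end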
trace_id_time_filter = time_filter
if config.iteration_expand_time_range:
start_time = self.start_time - config.iteration_expand_time_range
end_time = self.end_time + config.iteration_expand_time_range
trace_id_time_filter = f"time>={start_time} AND time<={end_time}"

# 进行迭代查询,上限为 config.spec.max_iteration
for i in range(max_iteration):
Expand Down Expand Up @@ -384,7 +396,7 @@ async def query_and_trace_flowmetas(
# Query2: 基于 trace_id 获取相关数据,第一层迭代
new_trace_id_flows = pd.DataFrame()
new_trace_id_flows = await self.query_flowmetas(
time_filter, ' AND '.join(query_trace_filters))
trace_id_time_filter, ' AND '.join(query_trace_filters))

# 已查询过一次的 trace_id,直接加入 visited,不需要再被查询
# new_trace_ids_for_next_iteration 已被查询,可在这一步直接清空
Expand Down Expand Up @@ -444,6 +456,17 @@ async def query_and_trace_flowmetas(
dataframe_flowmetas = self.concat_l7_flow_log_dataframe(
[dataframe_flowmetas, new_trace_id_flows])
l7_flow_ids = set(dataframe_flowmetas['_id'])
if config.iteration_expand_time_range:
trace_min_s = int(
new_trace_id_flows['start_time_us'].min() //
1_000_000)
trace_max_s = int(
new_trace_id_flows['end_time_us'].max() //
1_000_000)
if trace_min_s < min_start_time:
min_start_time = trace_min_s
if trace_max_s > max_end_time:
max_end_time = trace_max_s

new_trace_req_tcp_seqs, new_trace_resp_tcp_seqs, new_trace_x_request_ids, new_trace_syscall_trace_ids, new_request_ids = _build_simple_trace_info_from_dataframe(
new_trace_id_flows)
Expand Down Expand Up @@ -547,9 +570,13 @@ async def query_and_trace_flowmetas(
elif not new_filters: # only new_trace_ids_for_next_iteration got values
continue

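# the time range of associated queries (tcp_seq/syscall/dns/xreqid) should follow the actual time span of the trace_id flows found so far
# min_start_time/max_end_time are updated incrementally each time new flows are appended, so they are used directly here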
assoc_time_filter = f"time>={min_start_time} AND time<={max_end_time}"

# Query3: 查询上述基于 Condition[123] 构建出的条件,即与【第一层迭代】关联的所有 flow,此处构建【第二层迭代】查询
new_flows = pd.DataFrame()
new_flows = await self.query_flowmetas(time_filter,
new_flows = await self.query_flowmetas(assoc_time_filter,
' OR '.join(new_filters))
if type(new_flows
) != DataFrame or new_flows.empty: # no more new flows
Expand Down Expand Up @@ -630,8 +657,9 @@ async def query_and_trace_flowmetas(
break

# end of `for i in range(max_iteration)`

return l7_flow_ids, app_spans_from_external
# update final_time_filter for all l7_flow_ids
final_time_filter = f"time>={min_start_time} AND time<={max_end_time}"
return l7_flow_ids, app_spans_from_external, final_time_filter

async def trace_l7_flow(
self,
Expand All @@ -654,7 +682,7 @@ async def trace_l7_flow(
network_delay_us: 使用Flowmeta进行流日志匹配的时间偏差容忍度,越大漏报率越低但误报率越高,一般设置为网络时延的最大可能值
"""
# 多次迭代,查询到所有相关的 l7_flow_log 摘要
l7_flow_ids, app_spans_from_external = await self.query_and_trace_flowmetas(
l7_flow_ids, app_spans_from_external, final_time_filter = await self.query_and_trace_flowmetas(
time_filter, base_filter, max_iteration, network_delay_us,
host_clock_offset_us, app_spans_from_api)

Expand All @@ -668,8 +696,8 @@ async def trace_l7_flow(
return_fields.append("attribute")
l7_flows = pd.DataFrame()
if len(l7_flow_ids) > 0:
l7_flows = await self.query_all_flows(time_filter, l7_flow_ids,
return_fields)
l7_flows = await self.query_all_flows(final_time_filter,
l7_flow_ids, return_fields)
if type(l7_flows) != DataFrame or l7_flows.empty:
# 一般不可能发生没有 l7_flows 但有 app_spans_from_external 的情况
# 实际上几乎不可能发生没有 l7_flows 的情况,因为至少包含初始 flow
Expand Down
3 changes: 2 additions & 1 deletion app/app/application/tracing_completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ async def query(self):
for j in range(len(self.app_spans)):
if i == j:
continue
related_map_from_api[self.app_spans[i]['_id']][self.app_spans[j]['_id']] |= L7_FLOW_RELATIONSHIP_SPAN_ID
related_map_from_api[self.app_spans[i]['_id']][
self.app_spans[j]['_id']] |= L7_FLOW_RELATIONSHIP_SPAN_ID
rst = await self.trace_l7_flow(
time_filter,
base_filter,
Expand Down
2 changes: 2 additions & 0 deletions app/app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ def parse_spec(self, cfg):
if not isinstance(strategies, list):
strategies = []
self.span_set_connection_strategies = strategies
self.iteration_expand_time_range = int(
spec.get('iteration_expand_time_range', 0))

def parse_querier(self, cfg):
querier = cfg.get('querier', dict())
Expand Down