diff --git a/agent/src/flow_generator/protocol_logs/mq/rocketmq.rs b/agent/src/flow_generator/protocol_logs/mq/rocketmq.rs index 835e542086f..67e60573af1 100644 --- a/agent/src/flow_generator/protocol_logs/mq/rocketmq.rs +++ b/agent/src/flow_generator/protocol_logs/mq/rocketmq.rs @@ -28,11 +28,14 @@ use crate::{ flow_generator::{ error::{Error, Result}, protocol_logs::{ - pb_adapter::{ExtendedInfo, L7ProtocolSendLog, L7Request, L7Response, TraceInfo}, + pb_adapter::{ + ExtendedInfo, KeyVal, L7ProtocolSendLog, L7Request, L7Response, TraceInfo, + }, set_captured_byte, swap_if, value_is_default, value_is_negative, AppProtoHead, - L7ResponseStatus, PrioFields, BASE_FIELD_PRIORITY, + L7ResponseStatus, PrioFields, BASE_FIELD_PRIORITY, PLUGIN_FIELD_PRIORITY, }, }, + plugin::{wasm::WasmData, CustomInfo}, utils::bytes, }; @@ -43,6 +46,10 @@ pub struct RocketmqInfo { msg_type: LogMessageType, #[serde(skip)] is_tls: bool, + #[serde(skip)] + is_async: bool, + #[serde(skip)] + is_reversed: bool, #[serde(rename = "x_request_id", skip_serializing_if = "value_is_default")] pub msg_key: String, @@ -84,6 +91,18 @@ pub struct RocketmqInfo { )] pub remark: String, + #[serde(skip)] + attributes: Vec, + + #[serde(skip_serializing_if = "value_is_default")] + biz_type: u8, + #[serde(skip_serializing_if = "value_is_default")] + biz_code: String, + #[serde(skip_serializing_if = "value_is_default")] + biz_scenario: String, + #[serde(skip_serializing_if = "value_is_default")] + biz_response_code: String, + captured_request_byte: u32, captured_response_byte: u32, @@ -167,10 +186,18 @@ impl L7ProtocolInfoInterface for RocketmqInfo { self.is_tls } + fn is_reversed(&self) -> bool { + self.is_reversed + } + fn get_endpoint(&self) -> Option { self.endpoint.clone() } + fn get_biz_type(&self) -> u8 { + self.biz_type + } + fn get_request_resource_length(&self) -> usize { self.ext_topic.len() } @@ -190,6 +217,12 @@ impl RocketmqInfo { } pub fn merge(&mut self, other: &mut Self) { + if other.is_async { + self.is_async = other.is_async; + } + if other.is_reversed { + self.is_reversed = other.is_reversed; + } swap_if!(self, resp_msg_size, is_none, other); if self.status == L7ResponseStatus::default() { self.status = other.status; @@ -199,6 +232,16 @@ impl RocketmqInfo { self.resp_code = other.resp_code; } swap_if!(self, remark, is_empty, other); + let other_trace_ids = std::mem::take(&mut other.trace_ids); + self.trace_ids.merge(other_trace_ids); + swap_if!(self, span_id, is_empty, other); + self.attributes.append(&mut other.attributes); + if other.biz_type > 0 { + self.biz_type = other.biz_type; + } + swap_if!(self, biz_code, is_empty, other); + swap_if!(self, biz_scenario, is_empty, other); + swap_if!(self, biz_response_code, is_empty, other); self.captured_response_byte = other.captured_response_byte; swap_if!(self, endpoint, is_none, other); if other.is_on_blacklist { @@ -206,6 +249,76 @@ impl RocketmqInfo { } } + pub fn merge_custom_info(&mut self, custom: CustomInfo) { + if !custom.req.version.is_empty() { + self.version = custom.req.version; + } + if !custom.req.req_type.is_empty() { + self.req_code_name = custom.req.req_type; + } + if !custom.req.domain.is_empty() { + self.ext_group = custom.req.domain; + } + if !custom.req.resource.is_empty() { + self.ext_topic = custom.req.resource; + } + if !custom.req.endpoint.is_empty() { + self.endpoint = Some(custom.req.endpoint); + } + if let Some(code) = custom.resp.code { + self.resp_code = code; + } + if custom.resp.status != L7ResponseStatus::default() { + self.status = custom.resp.status; + } + if !custom.resp.exception.is_empty() { + self.remark = custom.resp.exception; + } + if !custom.resp.req_type.is_empty() { + self.req_code_name = custom.resp.req_type; + } + if !custom.resp.endpoint.is_empty() { + self.endpoint = Some(custom.resp.endpoint); + } + + self.trace_ids + .merge_same_priority(PLUGIN_FIELD_PRIORITY, custom.trace.trace_ids); + if let Some(span_id) = custom.trace.span_id { + self.span_id = span_id; + } + if let Some(x_request_id_0) = custom.trace.x_request_id_0 { + self.msg_key = x_request_id_0; + } + if let Some(x_request_id_1) = custom.trace.x_request_id_1 { + self.msg_key = x_request_id_1; + } + + if !custom.attributes.is_empty() { + self.attributes.extend(custom.attributes); + } + if custom.biz_type > 0 { + self.biz_type = custom.biz_type; + } + if let Some(biz_code) = custom.biz_code { + self.biz_code = biz_code; + } + if let Some(biz_scenario) = custom.biz_scenario { + self.biz_scenario = biz_scenario; + } + if let Some(biz_response_code) = custom.biz_response_code { + self.biz_response_code = biz_response_code; + } + if let Some(is_async) = custom.is_async { + self.is_async = is_async; + } + if let Some(is_reversed) = custom.is_reversed { + self.is_reversed = is_reversed; + } + if self.endpoint.is_none() { + self.endpoint = self.generate_endpoint(); + } + } + fn set_is_on_blacklist(&mut self, config: &LogParserConfig) { if let Some(t) = config.l7_log_blacklist_trie.get(&L7Protocol::RocketMQ) { self.is_on_blacklist = t.request_type.is_on_blacklist(&self.req_code_name) @@ -222,11 +335,17 @@ impl RocketmqInfo { impl From for L7ProtocolSendLog { fn from(f: RocketmqInfo) -> Self { - let flags = if f.is_tls { - ApplicationFlags::TLS.bits() + let mut flags = if f.is_tls { + ApplicationFlags::TLS } else { - ApplicationFlags::NONE.bits() + ApplicationFlags::NONE }; + if f.is_async { + flags = flags | ApplicationFlags::ASYNC; + } + if f.is_reversed { + flags = flags | ApplicationFlags::REVERSED; + } L7ProtocolSendLog { captured_request_byte: f.captured_request_byte, captured_response_byte: f.captured_response_byte, @@ -255,9 +374,17 @@ impl From for L7ProtocolSendLog { request_id: Some(f.opaque), x_request_id_0: Some(f.msg_key.clone()), x_request_id_1: Some(f.msg_key.clone()), + attributes: if f.attributes.is_empty() { + None + } else { + Some(f.attributes) + }, ..Default::default() }), - flags, + flags: flags.bits(), + biz_code: f.biz_code, + biz_scenario: f.biz_scenario, + biz_response_code: f.biz_response_code, ..Default::default() } } @@ -297,6 +424,7 @@ impl L7ProtocolParserInterface for RocketmqLog { self.parse(payload, param.l4_protocol, param.direction, &mut info)?; info.is_tls = param.is_tls(); set_captured_byte!(info, param); + self.wasm_hook(param, payload, &mut info); if let Some(config) = param.parse_config { info.set_is_on_blacklist(config); } @@ -304,7 +432,7 @@ impl L7ProtocolParserInterface for RocketmqLog { if param.parse_perf { let mut perf_stat = L7PerfStats::default(); if info.msg_type == LogMessageType::Response && info.endpoint.is_none() { - if let Some(endpoint) = info.load_endpoint_from_cache(param, false) { + if let Some(endpoint) = info.load_endpoint_from_cache(param, info.is_reversed) { info.endpoint = Some(endpoint.to_string()); } } @@ -335,6 +463,17 @@ impl L7ProtocolParserInterface for RocketmqLog { } impl RocketmqLog { + fn wasm_hook(&mut self, param: &ParseParam, payload: &[u8], info: &mut RocketmqInfo) { + let mut vm_ref = param.wasm_vm.borrow_mut(); + let Some(vm) = vm_ref.as_mut() else { + return; + }; + let wasm_data = WasmData::new(L7Protocol::RocketMQ); + if let Some(custom) = vm.on_custom_message(payload, param, wasm_data) { + info.merge_custom_info(custom); + } + } + fn check(&mut self, payload: &[u8], protocol: IpProtocol) -> bool { if protocol != IpProtocol::TCP { return false;