From afd39c21a889a786c78ad4231f7c3d5915c12f23 Mon Sep 17 00:00:00 2001 From: kastov Date: Mon, 6 Apr 2026 02:24:43 +0300 Subject: [PATCH] feat: GetUsersStats API to retrieve user statistics including traffic data --- app/dispatcher/default.go | 22 +- app/metrics/metrics.go | 7 +- app/stats/command/command.go | 93 ++++- app/stats/command/command.pb.go | 342 ++++++++++++++++-- app/stats/command/command.proto | 26 ++ app/stats/command/command_grpc.pb.go | 38 ++ app/stats/online_map.go | 42 +-- app/stats/stats.go | 54 ++- features/stats/stats.go | 33 +- main/commands/all/api/stats_online_ip_list.go | 39 +- 10 files changed, 580 insertions(+), 116 deletions(-) diff --git a/app/dispatcher/default.go b/app/dispatcher/default.go index e6f89657e29b..9652043e89c0 100644 --- a/app/dispatcher/default.go +++ b/app/dispatcher/default.go @@ -181,12 +181,7 @@ func (d *DefaultDispatcher) getLink(ctx context.Context) (*transport.Link, *tran } if p.Stats.UserOnline { - name := "user>>>" + user.Email + ">>>online" - if om, _ := stats.GetOrRegisterOnlineMap(d.stats, name); om != nil { - userIP := sessionInbound.Source.Address.String() - om.AddIP(userIP) - context.AfterFunc(ctx, func() { om.RemoveIP(userIP) }) - } + trackOnlineIP(ctx, d.stats, user.Email, sessionInbound.Source.Address.String()) } } @@ -220,18 +215,21 @@ func WrapLink(ctx context.Context, policyManager policy.Manager, statsManager st } } if p.Stats.UserOnline { - name := "user>>>" + user.Email + ">>>online" - if om, _ := stats.GetOrRegisterOnlineMap(statsManager, name); om != nil { - userIP := sessionInbound.Source.Address.String() - om.AddIP(userIP) - context.AfterFunc(ctx, func() { om.RemoveIP(userIP) }) - } + trackOnlineIP(ctx, statsManager, user.Email, sessionInbound.Source.Address.String()) } } return link } +func trackOnlineIP(ctx context.Context, sm stats.Manager, email, ip string) { + name := "user>>>" + email + ">>>online" + if om, _ := stats.GetOrRegisterOnlineMap(sm, name); om != nil { + om.AddIP(ip) + context.AfterFunc(ctx, func() { om.RemoveIP(ip) }) + } +} + func (d *DefaultDispatcher) shouldOverride(ctx context.Context, result SniffResult, request session.SniffingRequest, destination net.Destination) bool { domain := result.Domain() if domain == "" { diff --git a/app/metrics/metrics.go b/app/metrics/metrics.go index 1dc5b2f5d2d2..a37373842d34 100644 --- a/app/metrics/metrics.go +++ b/app/metrics/metrics.go @@ -8,7 +8,6 @@ import ( "strings" "github.com/xtls/xray-core/app/observatory" - "github.com/xtls/xray-core/app/stats" "github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/net" @@ -39,16 +38,12 @@ func NewMetricsHandler(ctx context.Context, config *Config) (*MetricsHandler, er c.ohm = om })) expvar.Publish("stats", expvar.Func(func() interface{} { - manager, ok := c.statsManager.(*stats.Manager) - if !ok { - return nil - } resp := map[string]map[string]map[string]int64{ "inbound": {}, "outbound": {}, "user": {}, } - manager.VisitCounters(func(name string, counter feature_stats.Counter) bool { + c.statsManager.VisitCounters(func(name string, counter feature_stats.Counter) bool { nameSplit := strings.Split(name, ">>>") typeName, tagOrUser, direction := nameSplit[0], nameSplit[1], nameSplit[3] if item, found := resp[typeName][tagOrUser]; found { diff --git a/app/stats/command/command.go b/app/stats/command/command.go index 079df1ecae4c..535cbac57172 100644 --- a/app/stats/command/command.go +++ b/app/stats/command/command.go @@ -3,11 +3,10 @@ package command import ( "context" "runtime" + "strings" "time" - "github.com/xtls/xray-core/app/stats" "github.com/xtls/xray-core/common" - "github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/strmatcher" "github.com/xtls/xray-core/core" feature_stats "github.com/xtls/xray-core/features/stats" @@ -70,9 +69,10 @@ func (s *statsServer) GetStatsOnlineIpList(ctx context.Context, request *GetStat } ips := make(map[string]int64) - for ip, t := range c.IPTimeMap() { - ips[ip] = t.Unix() - } + c.ForEach(func(ip string, lastSeen int64) bool { + ips[ip] = lastSeen + return true + }) return &GetStatsOnlineIpListResponse{ Name: request.Name, @@ -86,6 +86,82 @@ func (s *statsServer) GetAllOnlineUsers(ctx context.Context, request *GetAllOnli }, nil } +func (s *statsServer) GetUsersStats(ctx context.Context, request *GetUsersStatsRequest) (*GetUsersStatsResponse, error) { + userMap := make(map[string]*UserStat) + + s.stats.VisitOnlineMaps(func(name string, om feature_stats.OnlineMap) bool { + if om.Count() == 0 { + return true + } + + _, rest, _ := strings.Cut(name, ">>>") + email, _, _ := strings.Cut(rest, ">>>") + + user := &UserStat{Email: email} + om.ForEach(func(ip string, lastSeen int64) bool { + user.Ips = append(user.Ips, &OnlineIPEntry{ + Ip: ip, + LastSeen: lastSeen, + }) + return true + }) + if len(user.Ips) > 0 { + userMap[email] = user + } + return true + }) + + if request.IncludeTraffic { + for _, u := range userMap { + u.Traffic = &TrafficUserStat{} + } + const ( + prefixUser = "user>>>" + suffixUplink = ">>>traffic>>>uplink" + suffixDownlink = ">>>traffic>>>downlink" + ) + s.stats.VisitCounters(func(name string, c feature_stats.Counter) bool { + var email string + var isUplink bool + + if strings.HasSuffix(name, suffixUplink) { + email = name[len(prefixUser) : len(name)-len(suffixUplink)] + isUplink = true + } else if strings.HasSuffix(name, suffixDownlink) { + email = name[len(prefixUser) : len(name)-len(suffixDownlink)] + } else { + return true + } + + u, ok := userMap[email] + if !ok { + return true + } + + var value int64 + if request.Reset_ { + value = c.Set(0) + } else { + value = c.Value() + } + + if isUplink { + u.Traffic.Uplink = value + } else { + u.Traffic.Downlink = value + } + return true + }) + } + + resp := &GetUsersStatsResponse{} + for _, u := range userMap { + resp.Users = append(resp.Users, u) + } + + return resp, nil +} + func (s *statsServer) QueryStats(ctx context.Context, request *QueryStatsRequest) (*QueryStatsResponse, error) { matcher, err := strmatcher.Substr.New(request.Pattern) if err != nil { @@ -94,12 +170,7 @@ func (s *statsServer) QueryStats(ctx context.Context, request *QueryStatsRequest response := &QueryStatsResponse{} - manager, ok := s.stats.(*stats.Manager) - if !ok { - return nil, errors.New("QueryStats only works its own stats.Manager.") - } - - manager.VisitCounters(func(name string, c feature_stats.Counter) bool { + s.stats.VisitCounters(func(name string, c feature_stats.Counter) bool { if matcher.Match(name) { var value int64 if request.Reset_ { diff --git a/app/stats/command/command.pb.go b/app/stats/command/command.pb.go index 119cdda4fc1c..4320ff2295af 100644 --- a/app/stats/command/command.pb.go +++ b/app/stats/command/command.pb.go @@ -551,6 +551,266 @@ func (x *GetAllOnlineUsersResponse) GetUsers() []string { return nil } +type OnlineIPEntry struct { + state protoimpl.MessageState `protogen:"open.v1"` + Ip string `protobuf:"bytes,1,opt,name=ip,proto3" json:"ip,omitempty"` + LastSeen int64 `protobuf:"varint,2,opt,name=last_seen,json=lastSeen,proto3" json:"last_seen,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *OnlineIPEntry) Reset() { + *x = OnlineIPEntry{} + mi := &file_app_stats_command_command_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *OnlineIPEntry) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*OnlineIPEntry) ProtoMessage() {} + +func (x *OnlineIPEntry) ProtoReflect() protoreflect.Message { + mi := &file_app_stats_command_command_proto_msgTypes[10] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use OnlineIPEntry.ProtoReflect.Descriptor instead. +func (*OnlineIPEntry) Descriptor() ([]byte, []int) { + return file_app_stats_command_command_proto_rawDescGZIP(), []int{10} +} + +func (x *OnlineIPEntry) GetIp() string { + if x != nil { + return x.Ip + } + return "" +} + +func (x *OnlineIPEntry) GetLastSeen() int64 { + if x != nil { + return x.LastSeen + } + return 0 +} + +type TrafficUserStat struct { + state protoimpl.MessageState `protogen:"open.v1"` + Uplink int64 `protobuf:"varint,1,opt,name=uplink,proto3" json:"uplink,omitempty"` + Downlink int64 `protobuf:"varint,2,opt,name=downlink,proto3" json:"downlink,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *TrafficUserStat) Reset() { + *x = TrafficUserStat{} + mi := &file_app_stats_command_command_proto_msgTypes[11] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *TrafficUserStat) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TrafficUserStat) ProtoMessage() {} + +func (x *TrafficUserStat) ProtoReflect() protoreflect.Message { + mi := &file_app_stats_command_command_proto_msgTypes[11] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TrafficUserStat.ProtoReflect.Descriptor instead. +func (*TrafficUserStat) Descriptor() ([]byte, []int) { + return file_app_stats_command_command_proto_rawDescGZIP(), []int{11} +} + +func (x *TrafficUserStat) GetUplink() int64 { + if x != nil { + return x.Uplink + } + return 0 +} + +func (x *TrafficUserStat) GetDownlink() int64 { + if x != nil { + return x.Downlink + } + return 0 +} + +type UserStat struct { + state protoimpl.MessageState `protogen:"open.v1"` + Email string `protobuf:"bytes,1,opt,name=email,proto3" json:"email,omitempty"` + Ips []*OnlineIPEntry `protobuf:"bytes,2,rep,name=ips,proto3" json:"ips,omitempty"` + Traffic *TrafficUserStat `protobuf:"bytes,3,opt,name=traffic,proto3" json:"traffic,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *UserStat) Reset() { + *x = UserStat{} + mi := &file_app_stats_command_command_proto_msgTypes[12] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *UserStat) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UserStat) ProtoMessage() {} + +func (x *UserStat) ProtoReflect() protoreflect.Message { + mi := &file_app_stats_command_command_proto_msgTypes[12] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UserStat.ProtoReflect.Descriptor instead. +func (*UserStat) Descriptor() ([]byte, []int) { + return file_app_stats_command_command_proto_rawDescGZIP(), []int{12} +} + +func (x *UserStat) GetEmail() string { + if x != nil { + return x.Email + } + return "" +} + +func (x *UserStat) GetIps() []*OnlineIPEntry { + if x != nil { + return x.Ips + } + return nil +} + +func (x *UserStat) GetTraffic() *TrafficUserStat { + if x != nil { + return x.Traffic + } + return nil +} + +type GetUsersStatsRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + IncludeTraffic bool `protobuf:"varint,1,opt,name=include_traffic,json=includeTraffic,proto3" json:"include_traffic,omitempty"` + Reset_ bool `protobuf:"varint,2,opt,name=reset,proto3" json:"reset,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetUsersStatsRequest) Reset() { + *x = GetUsersStatsRequest{} + mi := &file_app_stats_command_command_proto_msgTypes[13] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetUsersStatsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetUsersStatsRequest) ProtoMessage() {} + +func (x *GetUsersStatsRequest) ProtoReflect() protoreflect.Message { + mi := &file_app_stats_command_command_proto_msgTypes[13] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetUsersStatsRequest.ProtoReflect.Descriptor instead. +func (*GetUsersStatsRequest) Descriptor() ([]byte, []int) { + return file_app_stats_command_command_proto_rawDescGZIP(), []int{13} +} + +func (x *GetUsersStatsRequest) GetIncludeTraffic() bool { + if x != nil { + return x.IncludeTraffic + } + return false +} + +func (x *GetUsersStatsRequest) GetReset_() bool { + if x != nil { + return x.Reset_ + } + return false +} + +type GetUsersStatsResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Users []*UserStat `protobuf:"bytes,1,rep,name=users,proto3" json:"users,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetUsersStatsResponse) Reset() { + *x = GetUsersStatsResponse{} + mi := &file_app_stats_command_command_proto_msgTypes[14] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetUsersStatsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetUsersStatsResponse) ProtoMessage() {} + +func (x *GetUsersStatsResponse) ProtoReflect() protoreflect.Message { + mi := &file_app_stats_command_command_proto_msgTypes[14] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetUsersStatsResponse.ProtoReflect.Descriptor instead. +func (*GetUsersStatsResponse) Descriptor() ([]byte, []int) { + return file_app_stats_command_command_proto_rawDescGZIP(), []int{14} +} + +func (x *GetUsersStatsResponse) GetUsers() []*UserStat { + if x != nil { + return x.Users + } + return nil +} + type Config struct { state protoimpl.MessageState `protogen:"open.v1"` unknownFields protoimpl.UnknownFields @@ -559,7 +819,7 @@ type Config struct { func (x *Config) Reset() { *x = Config{} - mi := &file_app_stats_command_command_proto_msgTypes[10] + mi := &file_app_stats_command_command_proto_msgTypes[15] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -571,7 +831,7 @@ func (x *Config) String() string { func (*Config) ProtoMessage() {} func (x *Config) ProtoReflect() protoreflect.Message { - mi := &file_app_stats_command_command_proto_msgTypes[10] + mi := &file_app_stats_command_command_proto_msgTypes[15] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -584,7 +844,7 @@ func (x *Config) ProtoReflect() protoreflect.Message { // Deprecated: Use Config.ProtoReflect.Descriptor instead. func (*Config) Descriptor() ([]byte, []int) { - return file_app_stats_command_command_proto_rawDescGZIP(), []int{10} + return file_app_stats_command_command_proto_rawDescGZIP(), []int{15} } var File_app_stats_command_command_proto protoreflect.FileDescriptor @@ -628,8 +888,23 @@ const file_app_stats_command_command_proto_rawDesc = "" + "\x05value\x18\x02 \x01(\x03R\x05value:\x028\x01\"\x1a\n" + "\x18GetAllOnlineUsersRequest\"1\n" + "\x19GetAllOnlineUsersResponse\x12\x14\n" + - "\x05users\x18\x01 \x03(\tR\x05users\"\b\n" + - "\x06Config2\x96\x05\n" + + "\x05users\x18\x01 \x03(\tR\x05users\"<\n" + + "\rOnlineIPEntry\x12\x0e\n" + + "\x02ip\x18\x01 \x01(\tR\x02ip\x12\x1b\n" + + "\tlast_seen\x18\x02 \x01(\x03R\blastSeen\"E\n" + + "\x0fTrafficUserStat\x12\x16\n" + + "\x06uplink\x18\x01 \x01(\x03R\x06uplink\x12\x1a\n" + + "\bdownlink\x18\x02 \x01(\x03R\bdownlink\"\x9c\x01\n" + + "\bUserStat\x12\x14\n" + + "\x05email\x18\x01 \x01(\tR\x05email\x127\n" + + "\x03ips\x18\x02 \x03(\v2%.xray.app.stats.command.OnlineIPEntryR\x03ips\x12A\n" + + "\atraffic\x18\x03 \x01(\v2'.xray.app.stats.command.TrafficUserStatR\atraffic\"U\n" + + "\x14GetUsersStatsRequest\x12'\n" + + "\x0finclude_traffic\x18\x01 \x01(\bR\x0eincludeTraffic\x12\x14\n" + + "\x05reset\x18\x02 \x01(\bR\x05reset\"O\n" + + "\x15GetUsersStatsResponse\x126\n" + + "\x05users\x18\x01 \x03(\v2 .xray.app.stats.command.UserStatR\x05users\"\b\n" + + "\x06Config2\x86\x06\n" + "\fStatsService\x12_\n" + "\bGetStats\x12'.xray.app.stats.command.GetStatsRequest\x1a(.xray.app.stats.command.GetStatsResponse\"\x00\x12e\n" + "\x0eGetStatsOnline\x12'.xray.app.stats.command.GetStatsRequest\x1a(.xray.app.stats.command.GetStatsResponse\"\x00\x12e\n" + @@ -637,7 +912,8 @@ const file_app_stats_command_command_proto_rawDesc = "" + "QueryStats\x12).xray.app.stats.command.QueryStatsRequest\x1a*.xray.app.stats.command.QueryStatsResponse\"\x00\x12b\n" + "\vGetSysStats\x12'.xray.app.stats.command.SysStatsRequest\x1a(.xray.app.stats.command.SysStatsResponse\"\x00\x12w\n" + "\x14GetStatsOnlineIpList\x12'.xray.app.stats.command.GetStatsRequest\x1a4.xray.app.stats.command.GetStatsOnlineIpListResponse\"\x00\x12z\n" + - "\x11GetAllOnlineUsers\x120.xray.app.stats.command.GetAllOnlineUsersRequest\x1a1.xray.app.stats.command.GetAllOnlineUsersResponse\"\x00Bd\n" + + "\x11GetAllOnlineUsers\x120.xray.app.stats.command.GetAllOnlineUsersRequest\x1a1.xray.app.stats.command.GetAllOnlineUsersResponse\"\x00\x12n\n" + + "\rGetUsersStats\x12,.xray.app.stats.command.GetUsersStatsRequest\x1a-.xray.app.stats.command.GetUsersStatsResponse\"\x00Bd\n" + "\x1acom.xray.app.stats.commandP\x01Z+github.com/xtls/xray-core/app/stats/command\xaa\x02\x16Xray.App.Stats.Commandb\x06proto3" var ( @@ -652,7 +928,7 @@ func file_app_stats_command_command_proto_rawDescGZIP() []byte { return file_app_stats_command_command_proto_rawDescData } -var file_app_stats_command_command_proto_msgTypes = make([]protoimpl.MessageInfo, 12) +var file_app_stats_command_command_proto_msgTypes = make([]protoimpl.MessageInfo, 17) var file_app_stats_command_command_proto_goTypes = []any{ (*GetStatsRequest)(nil), // 0: xray.app.stats.command.GetStatsRequest (*Stat)(nil), // 1: xray.app.stats.command.Stat @@ -664,30 +940,40 @@ var file_app_stats_command_command_proto_goTypes = []any{ (*GetStatsOnlineIpListResponse)(nil), // 7: xray.app.stats.command.GetStatsOnlineIpListResponse (*GetAllOnlineUsersRequest)(nil), // 8: xray.app.stats.command.GetAllOnlineUsersRequest (*GetAllOnlineUsersResponse)(nil), // 9: xray.app.stats.command.GetAllOnlineUsersResponse - (*Config)(nil), // 10: xray.app.stats.command.Config - nil, // 11: xray.app.stats.command.GetStatsOnlineIpListResponse.IpsEntry + (*OnlineIPEntry)(nil), // 10: xray.app.stats.command.OnlineIPEntry + (*TrafficUserStat)(nil), // 11: xray.app.stats.command.TrafficUserStat + (*UserStat)(nil), // 12: xray.app.stats.command.UserStat + (*GetUsersStatsRequest)(nil), // 13: xray.app.stats.command.GetUsersStatsRequest + (*GetUsersStatsResponse)(nil), // 14: xray.app.stats.command.GetUsersStatsResponse + (*Config)(nil), // 15: xray.app.stats.command.Config + nil, // 16: xray.app.stats.command.GetStatsOnlineIpListResponse.IpsEntry } var file_app_stats_command_command_proto_depIdxs = []int32{ 1, // 0: xray.app.stats.command.GetStatsResponse.stat:type_name -> xray.app.stats.command.Stat 1, // 1: xray.app.stats.command.QueryStatsResponse.stat:type_name -> xray.app.stats.command.Stat - 11, // 2: xray.app.stats.command.GetStatsOnlineIpListResponse.ips:type_name -> xray.app.stats.command.GetStatsOnlineIpListResponse.IpsEntry - 0, // 3: xray.app.stats.command.StatsService.GetStats:input_type -> xray.app.stats.command.GetStatsRequest - 0, // 4: xray.app.stats.command.StatsService.GetStatsOnline:input_type -> xray.app.stats.command.GetStatsRequest - 3, // 5: xray.app.stats.command.StatsService.QueryStats:input_type -> xray.app.stats.command.QueryStatsRequest - 5, // 6: xray.app.stats.command.StatsService.GetSysStats:input_type -> xray.app.stats.command.SysStatsRequest - 0, // 7: xray.app.stats.command.StatsService.GetStatsOnlineIpList:input_type -> xray.app.stats.command.GetStatsRequest - 8, // 8: xray.app.stats.command.StatsService.GetAllOnlineUsers:input_type -> xray.app.stats.command.GetAllOnlineUsersRequest - 2, // 9: xray.app.stats.command.StatsService.GetStats:output_type -> xray.app.stats.command.GetStatsResponse - 2, // 10: xray.app.stats.command.StatsService.GetStatsOnline:output_type -> xray.app.stats.command.GetStatsResponse - 4, // 11: xray.app.stats.command.StatsService.QueryStats:output_type -> xray.app.stats.command.QueryStatsResponse - 6, // 12: xray.app.stats.command.StatsService.GetSysStats:output_type -> xray.app.stats.command.SysStatsResponse - 7, // 13: xray.app.stats.command.StatsService.GetStatsOnlineIpList:output_type -> xray.app.stats.command.GetStatsOnlineIpListResponse - 9, // 14: xray.app.stats.command.StatsService.GetAllOnlineUsers:output_type -> xray.app.stats.command.GetAllOnlineUsersResponse - 9, // [9:15] is the sub-list for method output_type - 3, // [3:9] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name + 16, // 2: xray.app.stats.command.GetStatsOnlineIpListResponse.ips:type_name -> xray.app.stats.command.GetStatsOnlineIpListResponse.IpsEntry + 10, // 3: xray.app.stats.command.UserStat.ips:type_name -> xray.app.stats.command.OnlineIPEntry + 11, // 4: xray.app.stats.command.UserStat.traffic:type_name -> xray.app.stats.command.TrafficUserStat + 12, // 5: xray.app.stats.command.GetUsersStatsResponse.users:type_name -> xray.app.stats.command.UserStat + 0, // 6: xray.app.stats.command.StatsService.GetStats:input_type -> xray.app.stats.command.GetStatsRequest + 0, // 7: xray.app.stats.command.StatsService.GetStatsOnline:input_type -> xray.app.stats.command.GetStatsRequest + 3, // 8: xray.app.stats.command.StatsService.QueryStats:input_type -> xray.app.stats.command.QueryStatsRequest + 5, // 9: xray.app.stats.command.StatsService.GetSysStats:input_type -> xray.app.stats.command.SysStatsRequest + 0, // 10: xray.app.stats.command.StatsService.GetStatsOnlineIpList:input_type -> xray.app.stats.command.GetStatsRequest + 8, // 11: xray.app.stats.command.StatsService.GetAllOnlineUsers:input_type -> xray.app.stats.command.GetAllOnlineUsersRequest + 13, // 12: xray.app.stats.command.StatsService.GetUsersStats:input_type -> xray.app.stats.command.GetUsersStatsRequest + 2, // 13: xray.app.stats.command.StatsService.GetStats:output_type -> xray.app.stats.command.GetStatsResponse + 2, // 14: xray.app.stats.command.StatsService.GetStatsOnline:output_type -> xray.app.stats.command.GetStatsResponse + 4, // 15: xray.app.stats.command.StatsService.QueryStats:output_type -> xray.app.stats.command.QueryStatsResponse + 6, // 16: xray.app.stats.command.StatsService.GetSysStats:output_type -> xray.app.stats.command.SysStatsResponse + 7, // 17: xray.app.stats.command.StatsService.GetStatsOnlineIpList:output_type -> xray.app.stats.command.GetStatsOnlineIpListResponse + 9, // 18: xray.app.stats.command.StatsService.GetAllOnlineUsers:output_type -> xray.app.stats.command.GetAllOnlineUsersResponse + 14, // 19: xray.app.stats.command.StatsService.GetUsersStats:output_type -> xray.app.stats.command.GetUsersStatsResponse + 13, // [13:20] is the sub-list for method output_type + 6, // [6:13] is the sub-list for method input_type + 6, // [6:6] is the sub-list for extension type_name + 6, // [6:6] is the sub-list for extension extendee + 0, // [0:6] is the sub-list for field type_name } func init() { file_app_stats_command_command_proto_init() } @@ -701,7 +987,7 @@ func file_app_stats_command_command_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_app_stats_command_command_proto_rawDesc), len(file_app_stats_command_command_proto_rawDesc)), NumEnums: 0, - NumMessages: 12, + NumMessages: 17, NumExtensions: 0, NumServices: 1, }, diff --git a/app/stats/command/command.proto b/app/stats/command/command.proto index 9c3f9131d087..c2f7405414ec 100644 --- a/app/stats/command/command.proto +++ b/app/stats/command/command.proto @@ -57,6 +57,31 @@ message GetAllOnlineUsersResponse { repeated string users = 1; } +message OnlineIPEntry { + string ip = 1; + int64 last_seen = 2; +} + +message TrafficUserStat { + int64 uplink = 1; + int64 downlink = 2; +} + +message UserStat { + string email = 1; + repeated OnlineIPEntry ips = 2; + TrafficUserStat traffic = 3; +} + +message GetUsersStatsRequest { + bool include_traffic = 1; + bool reset = 2; +} + +message GetUsersStatsResponse { + repeated UserStat users = 1; +} + service StatsService { rpc GetStats(GetStatsRequest) returns (GetStatsResponse) {} rpc GetStatsOnline(GetStatsRequest) returns (GetStatsResponse) {} @@ -64,6 +89,7 @@ service StatsService { rpc GetSysStats(SysStatsRequest) returns (SysStatsResponse) {} rpc GetStatsOnlineIpList(GetStatsRequest) returns (GetStatsOnlineIpListResponse) {} rpc GetAllOnlineUsers(GetAllOnlineUsersRequest) returns (GetAllOnlineUsersResponse) {} + rpc GetUsersStats(GetUsersStatsRequest) returns (GetUsersStatsResponse) {} } message Config {} diff --git a/app/stats/command/command_grpc.pb.go b/app/stats/command/command_grpc.pb.go index 1fd039b8f02e..0401d14cea76 100644 --- a/app/stats/command/command_grpc.pb.go +++ b/app/stats/command/command_grpc.pb.go @@ -25,6 +25,7 @@ const ( StatsService_GetSysStats_FullMethodName = "/xray.app.stats.command.StatsService/GetSysStats" StatsService_GetStatsOnlineIpList_FullMethodName = "/xray.app.stats.command.StatsService/GetStatsOnlineIpList" StatsService_GetAllOnlineUsers_FullMethodName = "/xray.app.stats.command.StatsService/GetAllOnlineUsers" + StatsService_GetUsersStats_FullMethodName = "/xray.app.stats.command.StatsService/GetUsersStats" ) // StatsServiceClient is the client API for StatsService service. @@ -37,6 +38,7 @@ type StatsServiceClient interface { GetSysStats(ctx context.Context, in *SysStatsRequest, opts ...grpc.CallOption) (*SysStatsResponse, error) GetStatsOnlineIpList(ctx context.Context, in *GetStatsRequest, opts ...grpc.CallOption) (*GetStatsOnlineIpListResponse, error) GetAllOnlineUsers(ctx context.Context, in *GetAllOnlineUsersRequest, opts ...grpc.CallOption) (*GetAllOnlineUsersResponse, error) + GetUsersStats(ctx context.Context, in *GetUsersStatsRequest, opts ...grpc.CallOption) (*GetUsersStatsResponse, error) } type statsServiceClient struct { @@ -107,6 +109,16 @@ func (c *statsServiceClient) GetAllOnlineUsers(ctx context.Context, in *GetAllOn return out, nil } +func (c *statsServiceClient) GetUsersStats(ctx context.Context, in *GetUsersStatsRequest, opts ...grpc.CallOption) (*GetUsersStatsResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(GetUsersStatsResponse) + err := c.cc.Invoke(ctx, StatsService_GetUsersStats_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + // StatsServiceServer is the server API for StatsService service. // All implementations must embed UnimplementedStatsServiceServer // for forward compatibility. @@ -117,6 +129,7 @@ type StatsServiceServer interface { GetSysStats(context.Context, *SysStatsRequest) (*SysStatsResponse, error) GetStatsOnlineIpList(context.Context, *GetStatsRequest) (*GetStatsOnlineIpListResponse, error) GetAllOnlineUsers(context.Context, *GetAllOnlineUsersRequest) (*GetAllOnlineUsersResponse, error) + GetUsersStats(context.Context, *GetUsersStatsRequest) (*GetUsersStatsResponse, error) mustEmbedUnimplementedStatsServiceServer() } @@ -145,6 +158,9 @@ func (UnimplementedStatsServiceServer) GetStatsOnlineIpList(context.Context, *Ge func (UnimplementedStatsServiceServer) GetAllOnlineUsers(context.Context, *GetAllOnlineUsersRequest) (*GetAllOnlineUsersResponse, error) { return nil, status.Error(codes.Unimplemented, "method GetAllOnlineUsers not implemented") } +func (UnimplementedStatsServiceServer) GetUsersStats(context.Context, *GetUsersStatsRequest) (*GetUsersStatsResponse, error) { + return nil, status.Error(codes.Unimplemented, "method GetUsersStats not implemented") +} func (UnimplementedStatsServiceServer) mustEmbedUnimplementedStatsServiceServer() {} func (UnimplementedStatsServiceServer) testEmbeddedByValue() {} @@ -274,6 +290,24 @@ func _StatsService_GetAllOnlineUsers_Handler(srv interface{}, ctx context.Contex return interceptor(ctx, in, info, handler) } +func _StatsService_GetUsersStats_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetUsersStatsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(StatsServiceServer).GetUsersStats(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: StatsService_GetUsersStats_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(StatsServiceServer).GetUsersStats(ctx, req.(*GetUsersStatsRequest)) + } + return interceptor(ctx, in, info, handler) +} + // StatsService_ServiceDesc is the grpc.ServiceDesc for StatsService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -305,6 +339,10 @@ var StatsService_ServiceDesc = grpc.ServiceDesc{ MethodName: "GetAllOnlineUsers", Handler: _StatsService_GetAllOnlineUsers_Handler, }, + { + MethodName: "GetUsersStats", + Handler: _StatsService_GetUsersStats_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "app/stats/command/command.proto", diff --git a/app/stats/online_map.go b/app/stats/online_map.go index cba625983cc9..575cd38c6aee 100644 --- a/app/stats/online_map.go +++ b/app/stats/online_map.go @@ -13,14 +13,14 @@ const ( type ipEntry struct { refCount int - lastSeen time.Time + lastSeen int64 } // OnlineMap is a refcount-based implementation of stats.OnlineMap. // IPs are tracked by reference counting: AddIP increments, RemoveIP decrements. // An IP is removed from the map when its reference count reaches zero. type OnlineMap struct { - entries map[string]*ipEntry + entries map[string]ipEntry access sync.Mutex count atomic.Int64 } @@ -28,7 +28,7 @@ type OnlineMap struct { // NewOnlineMap creates a new OnlineMap instance. func NewOnlineMap() *OnlineMap { return &OnlineMap{ - entries: make(map[string]*ipEntry), + entries: make(map[string]ipEntry), } } @@ -37,17 +37,17 @@ func (om *OnlineMap) AddIP(ip string) { if ip == localhostIPv4 || ip == localhostIPv6 { return } - + now := time.Now().Unix() om.access.Lock() defer om.access.Unlock() - if e, ok := om.entries[ip]; ok { e.refCount++ - e.lastSeen = time.Now() + e.lastSeen = now + om.entries[ip] = e } else { - om.entries[ip] = &ipEntry{ + om.entries[ip] = ipEntry{ refCount: 1, - lastSeen: time.Now(), + lastSeen: now, } om.count.Add(1) } @@ -57,7 +57,6 @@ func (om *OnlineMap) AddIP(ip string) { func (om *OnlineMap) RemoveIP(ip string) { om.access.Lock() defer om.access.Unlock() - e, ok := om.entries[ip] if !ok { return @@ -66,6 +65,8 @@ func (om *OnlineMap) RemoveIP(ip string) { if e.refCount <= 0 { delete(om.entries, ip) om.count.Add(-1) + } else { + om.entries[ip] = e } } @@ -74,26 +75,13 @@ func (om *OnlineMap) Count() int { return int(om.count.Load()) } -// List implements stats.OnlineMap. -func (om *OnlineMap) List() []string { - om.access.Lock() - defer om.access.Unlock() - - keys := make([]string, 0, len(om.entries)) - for ip := range om.entries { - keys = append(keys, ip) - } - return keys -} - -// IPTimeMap implements stats.OnlineMap. -func (om *OnlineMap) IPTimeMap() map[string]time.Time { +// ForEach calls fn for each online IP. If fn returns false, iteration stops. +func (om *OnlineMap) ForEach(fn func(string, int64) bool) { om.access.Lock() defer om.access.Unlock() - - result := make(map[string]time.Time, len(om.entries)) for ip, e := range om.entries { - result[ip] = e.lastSeen + if !fn(ip, e.lastSeen) { + break + } } - return result } diff --git a/app/stats/stats.go b/app/stats/stats.go index fc12fa23a07c..68fb9be18b8f 100644 --- a/app/stats/stats.go +++ b/app/stats/stats.go @@ -11,19 +11,19 @@ import ( // Manager is an implementation of stats.Manager. type Manager struct { - access sync.RWMutex - counters map[string]*Counter - onlineMap map[string]*OnlineMap - channels map[string]*Channel - running bool + access sync.RWMutex + counters map[string]*Counter + onlineMaps map[string]*OnlineMap + channels map[string]*Channel + running bool } // NewManager creates an instance of Statistics Manager. func NewManager(ctx context.Context, config *Config) (*Manager, error) { m := &Manager{ - counters: make(map[string]*Counter), - onlineMap: make(map[string]*OnlineMap), - channels: make(map[string]*Channel), + counters: make(map[string]*Counter), + onlineMaps: make(map[string]*OnlineMap), + channels: make(map[string]*Channel), } return m, nil @@ -88,12 +88,12 @@ func (m *Manager) RegisterOnlineMap(name string) (stats.OnlineMap, error) { m.access.Lock() defer m.access.Unlock() - if _, found := m.onlineMap[name]; found { - return nil, errors.New("onlineMap ", name, " already registered.") + if _, found := m.onlineMaps[name]; found { + return nil, errors.New("OnlineMap ", name, " already registered.") } - errors.LogDebug(context.Background(), "create new onlineMap ", name) + errors.LogDebug(context.Background(), "create new OnlineMap ", name) om := NewOnlineMap() - m.onlineMap[name] = om + m.onlineMaps[name] = om return om, nil } @@ -102,9 +102,9 @@ func (m *Manager) UnregisterOnlineMap(name string) error { m.access.Lock() defer m.access.Unlock() - if _, found := m.onlineMap[name]; found { - errors.LogDebug(context.Background(), "remove onlineMap ", name) - delete(m.onlineMap, name) + if _, found := m.onlineMaps[name]; found { + errors.LogDebug(context.Background(), "remove OnlineMap ", name) + delete(m.onlineMaps, name) } return nil } @@ -114,12 +114,24 @@ func (m *Manager) GetOnlineMap(name string) stats.OnlineMap { m.access.RLock() defer m.access.RUnlock() - if om, found := m.onlineMap[name]; found { + if om, found := m.onlineMaps[name]; found { return om } return nil } +// VisitOnlineMaps calls visitor function on all managed online maps. +// The visitor runs under a read lock; it must not call RegisterOnlineMap or UnregisterOnlineMap (would deadlock). +func (m *Manager) VisitOnlineMaps(visitor func(string, stats.OnlineMap) bool) { + m.access.RLock() + defer m.access.RUnlock() + for name, om := range m.onlineMaps { + if !visitor(name, om) { + break + } + } +} + // RegisterChannel implements stats.Manager. func (m *Manager) RegisterChannel(name string) (stats.Channel, error) { m.access.Lock() @@ -166,9 +178,9 @@ func (m *Manager) GetAllOnlineUsers() []string { m.access.RLock() defer m.access.RUnlock() - usersOnline := make([]string, 0, len(m.onlineMap)) - for user, onlineMap := range m.onlineMap { - if onlineMap.Count() > 0 { + usersOnline := make([]string, 0, len(m.onlineMaps)) + for user, om := range m.onlineMaps { + if om.Count() > 0 { usersOnline = append(usersOnline, user) } } @@ -198,6 +210,10 @@ func (m *Manager) Close() error { m.access.Lock() defer m.access.Unlock() m.running = false + for name := range m.onlineMaps { + errors.LogDebug(context.Background(), "remove OnlineMap ", name) + delete(m.onlineMaps, name) + } errs := []error{} for name, channel := range m.channels { errors.LogDebug(context.Background(), "remove channel ", name) diff --git a/features/stats/stats.go b/features/stats/stats.go index abea7459f749..1d490c10b537 100644 --- a/features/stats/stats.go +++ b/features/stats/stats.go @@ -2,7 +2,6 @@ package stats import ( "context" - "time" "github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common/errors" @@ -21,7 +20,7 @@ type Counter interface { Add(int64) int64 } -// OnlineMap is the interface for stats. +// OnlineMap is the interface for tracking online IP addresses. // // xray:api:stable type OnlineMap interface { @@ -31,10 +30,10 @@ type OnlineMap interface { AddIP(string) // RemoveIP decrements the reference count for the given IP. Deletes at zero. RemoveIP(string) - // List returns all currently online IPs. - List() []string - // IPTimeMap returns a snapshot copy of IPs to their last-seen times. - IPTimeMap() map[string]time.Time + // ForEach calls fn for each online IP with its last-seen Unix timestamp. + // If fn returns false, iteration stops. + // The callback must not call AddIP/RemoveIP on the same OnlineMap (would deadlock). + ForEach(func(string, int64) bool) } // Channel is the interface for stats channel. @@ -86,13 +85,19 @@ type Manager interface { UnregisterCounter(string) error // GetCounter returns a counter by its identifier. GetCounter(string) Counter + // VisitCounters calls visitor on all managed counters. + // The visitor runs under a read lock; it must not call RegisterCounter or UnregisterCounter (would deadlock). + VisitCounters(func(string, Counter) bool) - // RegisterOnlineMap registers a new onlinemap to the manager. The identifier string must not be empty, and unique among other onlinemaps. + // RegisterOnlineMap registers a new OnlineMap to the manager. The identifier string must not be empty, and unique among other OnlineMaps. RegisterOnlineMap(string) (OnlineMap, error) - // UnregisterOnlineMap unregisters a onlinemap from the manager by its identifier. + // UnregisterOnlineMap unregisters an OnlineMap from the manager by its identifier. UnregisterOnlineMap(string) error - // GetOnlineMap returns a onlinemap by its identifier. + // GetOnlineMap returns an OnlineMap by its identifier. GetOnlineMap(string) OnlineMap + // VisitOnlineMaps calls visitor on all managed online maps. + // The visitor runs under a read lock; it must not call RegisterOnlineMap or UnregisterOnlineMap (would deadlock). + VisitOnlineMaps(func(string, OnlineMap) bool) // RegisterChannel registers a new channel to the manager. The identifier string must not be empty, and unique among other channels. RegisterChannel(string) (Channel, error) @@ -115,7 +120,7 @@ func GetOrRegisterCounter(m Manager, name string) (Counter, error) { return m.RegisterCounter(name) } -// GetOrRegisterOnlineMap tries to get the OnlineMap first. If not exist, it then tries to create a new onlinemap. +// GetOrRegisterOnlineMap tries to get the OnlineMap first. If not exist, it then tries to create a new OnlineMap. func GetOrRegisterOnlineMap(m Manager, name string) (OnlineMap, error) { onlineMap := m.GetOnlineMap(name) if onlineMap != nil { @@ -142,7 +147,7 @@ func ManagerType() interface{} { return (*Manager)(nil) } -// NoopManager is an implementation of Manager, which doesn't has actual functionalities. +// NoopManager is an implementation of Manager, which doesn't have actual functionality. type NoopManager struct{} // Type implements common.HasType. @@ -165,6 +170,9 @@ func (NoopManager) GetCounter(string) Counter { return nil } +// VisitCounters implements Manager. +func (NoopManager) VisitCounters(func(string, Counter) bool) {} + // RegisterOnlineMap implements Manager. func (NoopManager) RegisterOnlineMap(string) (OnlineMap, error) { return nil, errors.New("not implemented") @@ -180,6 +188,9 @@ func (NoopManager) GetOnlineMap(string) OnlineMap { return nil } +// VisitOnlineMaps implements Manager. +func (NoopManager) VisitOnlineMaps(func(string, OnlineMap) bool) {} + // RegisterChannel implements Manager. func (NoopManager) RegisterChannel(string) (Channel, error) { return nil, errors.New("not implemented") diff --git a/main/commands/all/api/stats_online_ip_list.go b/main/commands/all/api/stats_online_ip_list.go index 74e066f99e94..69c8bedd17e9 100644 --- a/main/commands/all/api/stats_online_ip_list.go +++ b/main/commands/all/api/stats_online_ip_list.go @@ -7,10 +7,11 @@ import ( var cmdOnlineStatsIpList = &base.Command{ CustomFlags: true, - UsageLine: "{{.Exec}} api statsonlineiplist [--server=127.0.0.1:8080] [-email '']", + UsageLine: "{{.Exec}} api statsonlineiplist [--server=127.0.0.1:8080] [-email '' | -all [-include-traffic] [-reset]]", Short: "Retrieve a user's online IP addresses and access times", Long: ` Retrieve the online IP addresses and corresponding access timestamps for a user from Xray. +Use -all to retrieve all online users with their IPs and timestamps. Arguments: @@ -23,9 +24,20 @@ Arguments: -email The user's email address. + -all + Retrieve all online users with their IPs and timestamps. + + -include-traffic + Include traffic statistics when using -all. + + -reset + Reset traffic counters after fetching. Only with -all and -include-traffic. + Example: {{.Exec}} {{.LongName}} --server=127.0.0.1:8080 -email "xray@love.com" + {{.Exec}} {{.LongName}} --server=127.0.0.1:8080 -all + {{.Exec}} {{.LongName}} --server=127.0.0.1:8080 -all -include-traffic `, Run: executeOnlineStatsIpList, } @@ -33,12 +45,35 @@ Example: func executeOnlineStatsIpList(cmd *base.Command, args []string) { setSharedFlags(cmd) email := cmd.Flag.String("email", "", "") + all := cmd.Flag.Bool("all", false, "") + includeTraffic := cmd.Flag.Bool("include-traffic", false, "") + reset := cmd.Flag.Bool("reset", false, "") cmd.Flag.Parse(args) - statName := "user>>>" + *email + ">>>online" + if *all && *email != "" { + base.Fatalf("-all and -email are mutually exclusive") + } + if !*all && *email == "" { + base.Fatalf("either -all or -email must be specified") + } conn, ctx, close := dialAPIServer() defer close() client := statsService.NewStatsServiceClient(conn) + + if *all { + r := &statsService.GetUsersStatsRequest{ + IncludeTraffic: *includeTraffic, + Reset_: *reset, + } + resp, err := client.GetUsersStats(ctx, r) + if err != nil { + base.Fatalf("failed to get stats: %s", err) + } + showJSONResponse(resp) + return + } + + statName := "user>>>" + *email + ">>>online" r := &statsService.GetStatsRequest{ Name: statName, Reset_: false,