diff --git a/Directory.Packages.props b/Directory.Packages.props index 010c130c2a8..e5a187cc825 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -25,6 +25,8 @@ + + diff --git a/libs/host/Configuration/Options.cs b/libs/host/Configuration/Options.cs index 03607922117..8926df821b1 100644 --- a/libs/host/Configuration/Options.cs +++ b/libs/host/Configuration/Options.cs @@ -21,6 +21,7 @@ using Garnet.server.Auth.Settings; using Garnet.server.TLS; using Microsoft.Extensions.Logging; +using OpenTelemetry.Exporter; using Tsavorite.core; using Tsavorite.devices; @@ -363,6 +364,20 @@ internal sealed class Options : ICloneable [Option("metrics-sampling-freq", Required = false, HelpText = "Metrics sampling frequency in seconds. Value of 0 disables metrics monitor task.")] public int MetricsSamplingFrequency { get; set; } + [Option("opentelemetry-endpoint", Required = false, HelpText = "The endpoint to which OpenTelemetry metrics will be exported. If null, OpenTelemetry metrics will not be exported.")] + public Uri OpenTelemetryEndpoint { get; set; } + + [IntRangeValidation(0, int.MaxValue, isRequired: false)] + [Option("opentelemetry-export-interval", Required = false, HelpText = "The interval in milliseconds to export OpenTelemetry metrics. If 0, the default interval of 60 seconds will be used.")] + public int OpenTelemetryExportInterval { get; set; } + + [Option("opentelemetry-export-protocol", Required = false, HelpText = "The protocol to use when exporting OpenTelemetry metrics. Value options: Grpc, HttpProtobuf. If null, the default protocol will be used.")] + public OtlpExportProtocol? OpenTelemetryExportProtocol { get; set; } + + [IntRangeValidation(0, int.MaxValue, isRequired: false)] + [Option("opentelemetry-export-timeout", Required = false, HelpText = "The timeout in milliseconds when exporting OpenTelemetry metrics. 
If 0, the default timeout of 10 seconds will be used.")] + public int OpenTelemetryExportTimeout { get; set; } + [OptionValidation] [Option('q', Required = false, HelpText = "Enabling quiet mode does not print server version and text art.")] public bool? QuietMode { get; set; } @@ -914,6 +929,10 @@ public GarnetServerOptions GetServerOptions(ILogger logger = null) SlowLogThreshold = SlowLogThreshold, SlowLogMaxEntries = SlowLogMaxEntries, MetricsSamplingFrequency = MetricsSamplingFrequency, + OpenTelemetryEndpoint = OpenTelemetryEndpoint, + OpenTelemetryExportInterval = OpenTelemetryExportInterval, + OpenTelemetryExportProtocol = OpenTelemetryExportProtocol, + OpenTelemetryExportTimeout = OpenTelemetryExportTimeout, LogLevel = LogLevel, LoggingFrequency = LoggingFrequency, QuietMode = QuietMode.GetValueOrDefault(), diff --git a/libs/host/defaults.conf b/libs/host/defaults.conf index a4519d58816..9bc055fbfae 100644 --- a/libs/host/defaults.conf +++ b/libs/host/defaults.conf @@ -456,5 +456,16 @@ "ClusterReplicaResumeWithData": false, /* Enable Vector Sets (preview) - this feature (and associated commands) are incomplete, unstable, and subject to change while still in preview */ - "EnableVectorSetPreview": false + "EnableVectorSetPreview": false, + + /* Disable OpenTelemetry metrics reporting by default */ + "OpenTelemetryEndpoint": null, + + /* Use default export interval from OpenTelemetry SDK */ + "OpenTelemetryExportInterval": 0, + + /* Use default export protocol from OpenTelemetry SDK */ + "OpenTelemetryExportProtocol": null, + /* Use default export timeout from OpenTelemetry SDK */ + "OpenTelemetryExportTimeout": 0 } \ No newline at end of file diff --git a/libs/server/Garnet.server.csproj b/libs/server/Garnet.server.csproj index dc679f37e8f..a6270b635ff 100644 --- a/libs/server/Garnet.server.csproj +++ b/libs/server/Garnet.server.csproj @@ -19,6 +19,7 @@ + diff --git a/libs/server/Metrics/GarnetOpenTelemetryServerMetrics.cs 
b/libs/server/Metrics/GarnetOpenTelemetryServerMetrics.cs new file mode 100644 index 00000000000..56a34769d07 --- /dev/null +++ b/libs/server/Metrics/GarnetOpenTelemetryServerMetrics.cs @@ -0,0 +1,60 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using System.Diagnostics.Metrics; + +namespace Garnet.server.Metrics +{ + /// + /// Provides OpenTelemetry-compatible metrics for Garnet server using . + /// Consumers can subscribe to these metrics using the OpenTelemetry SDK or any other . + /// The command-rate and network rates are not exposed as metrics as they can be calculated based on the other exposed metrics. + /// + internal sealed class GarnetOpenTelemetryServerMetrics : IDisposable + { + /// + /// The meter name used by Garnet server metrics. + /// + public const string MeterName = "Microsoft.Garnet.Server"; + + private readonly Meter meter; + + /// + /// Initializes a new instance of the class, + /// creating observable instruments that expose server connection metrics via a . + /// + /// + /// The instance whose connection counters + /// (active, received, and disposed) are observed by the created instruments. 
+        ///
+        internal GarnetOpenTelemetryServerMetrics(GarnetServerMetrics serverMetrics)
+        {
+            meter = new Meter(MeterName);
+
+            meter.CreateObservableGauge(
+                "garnet.server.connections.active",
+                () => serverMetrics.total_connections_active,
+                unit: "{connection}",
+                description: "Number of currently active client connections.");
+
+            meter.CreateObservableCounter(
+                "garnet.server.connections.received",
+                () => serverMetrics.total_connections_received,
+                unit: "{connection}",
+                description: "Total number of client connections received.");
+
+            meter.CreateObservableCounter(
+                "garnet.server.connections.disposed",
+                () => serverMetrics.total_connections_disposed,
+                unit: "{connection}",
+                description: "Total number of client connections disposed.");
+        }
+
+        /// <inheritdoc />
+        public void Dispose()
+        {
+            meter.Dispose();
+        }
+    }
+}
\ No newline at end of file
diff --git a/libs/server/Metrics/GarnetOpenTelemetryServerMonitor.cs b/libs/server/Metrics/GarnetOpenTelemetryServerMonitor.cs
new file mode 100644
index 00000000000..9d3c5436415
--- /dev/null
+++ b/libs/server/Metrics/GarnetOpenTelemetryServerMonitor.cs
@@ -0,0 +1,121 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+using System;
+using System.Reflection;
+using Garnet.server.Metrics.Latency;
+using OpenTelemetry;
+using OpenTelemetry.Metrics;
+using OpenTelemetry.Resources;
+
+namespace Garnet.server.Metrics
+{
+    /// <summary>
+    /// Registers OpenTelemetry metrics for Garnet server and manages their lifecycle. This includes server-level metrics, session-level metrics, and latency metrics.
+    /// </summary>
+    /// <remarks>
+    /// <para>
+    /// This class acts as the central coordinator for all OpenTelemetry metrics in the Garnet server.
+    /// It wraps the raw metrics sources (<see cref="GarnetServerMetrics"/> and its <c>globalSessionMetrics</c>)
+    /// into OpenTelemetry-compatible instruments exposed via the following wrappers:
+    /// </para>
+    /// <list type="bullet">
+    /// <item><see cref="GarnetOpenTelemetryServerMetrics"/> — connection-level metrics (active, received, disposed).</item>
+    /// <item><see cref="GarnetOpenTelemetrySessionMetrics"/> — session-level metrics (commands processed, network I/O, cache lookups).</item>
+    /// <item><see cref="GarnetOpenTelemetryLatencyMetrics"/> — histograms for command latency and bytes/ops per receive call.</item>
+    /// </list>
+    /// <para>
+    /// Call <see cref="Start"/> after construction to configure the OTLP exporter and begin metric collection.
+    /// The exporter endpoint, protocol, timeout, and interval are controlled by the corresponding
+    /// fields on <see cref="GarnetServerOptions"/>.
+    /// </para>
+    /// <para>
+    /// This class implements <see cref="IDisposable"/>; disposing it tears down all underlying meters,
+    /// the latency metrics singleton, and the meter provider created by <see cref="Start"/>.
+    /// </para>
+    /// </remarks>
+    internal sealed class GarnetOpenTelemetryServerMonitor : IDisposable
+    {
+        private readonly GarnetServerOptions options;
+        private readonly GarnetOpenTelemetryServerMetrics serverMetrics;
+        private readonly GarnetOpenTelemetrySessionMetrics sessionMetrics;
+        private MeterProvider meterProvider;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="GarnetOpenTelemetryServerMonitor"/> class,
+        /// creating the OpenTelemetry metric wrappers for server and session metrics and initializing
+        /// the latency metrics singleton.
+        /// </summary>
+        /// <param name="options">
+        /// The <see cref="GarnetServerOptions"/> that control OpenTelemetry export behavior, including
+        /// <see cref="GarnetServerOptions.OpenTelemetryEndpoint"/>,
+        /// <see cref="GarnetServerOptions.OpenTelemetryExportProtocol"/>,
+        /// <see cref="GarnetServerOptions.OpenTelemetryExportTimeout"/> and <see cref="GarnetServerOptions.OpenTelemetryExportInterval"/>.
+        /// </param>
+        /// <param name="serverMetrics">
+        /// The <see cref="GarnetServerMetrics"/> instance that provides raw server and session counters.
+        /// If its <c>globalSessionMetrics</c> is null, session-level
+        /// metrics will not be registered.
+        /// </param>
+        public GarnetOpenTelemetryServerMonitor(GarnetServerOptions options, GarnetServerMetrics serverMetrics)
+        {
+            this.options = options;
+            this.serverMetrics = new GarnetOpenTelemetryServerMetrics(serverMetrics);
+            this.sessionMetrics = serverMetrics.globalSessionMetrics != null
+                ? new GarnetOpenTelemetrySessionMetrics(serverMetrics.globalSessionMetrics)
+                : null;
+
+            GarnetOpenTelemetryLatencyMetrics.Initialize(options.LatencyMonitor);
+        }
+
+        /// <summary>
+        /// Initializes and configures OpenTelemetry metrics exporting if an endpoint is specified in the options.
+        /// </summary>
+        /// <remarks>
+        /// Metrics from the server, session, and latency meters are exported over OTLP to the configured endpoint.
+        /// The protocol, timeout, and export interval are only overridden when set (non-null / non-zero); otherwise the
+        /// OpenTelemetry SDK defaults apply. If no endpoint is specified, metrics exporting will not be enabled.
+        /// </remarks>
+        public void Start()
+        {
+            if (this.options.OpenTelemetryEndpoint != null)
+            {
+                this.meterProvider = Sdk.CreateMeterProviderBuilder()
+                    .ConfigureResource(rb => rb.AddService("Microsoft.Garnet", serviceVersion: Assembly.GetEntryAssembly()?.GetName()?.Version?.ToString() ?? "unknown"))
+                    .AddMeter(GarnetOpenTelemetryServerMetrics.MeterName, GarnetOpenTelemetrySessionMetrics.MeterName, GarnetOpenTelemetryLatencyMetrics.MeterName)
+                    .AddOtlpExporter((exporterOptions, metricReaderOptions) =>
+                    {
+                        exporterOptions.Endpoint = this.options.OpenTelemetryEndpoint;
+
+                        if (this.options.OpenTelemetryExportProtocol.HasValue)
+                        {
+                            exporterOptions.Protocol = this.options.OpenTelemetryExportProtocol.Value;
+                        }
+
+                        if (this.options.OpenTelemetryExportTimeout != 0)
+                        {
+                            exporterOptions.TimeoutMilliseconds = this.options.OpenTelemetryExportTimeout;
+                        }
+
+                        if (this.options.OpenTelemetryExportInterval != 0)
+                        {
+                            // The metrics pipeline is driven by the periodic exporting metric reader.
+                            // BatchExportProcessorOptions only applies to traces, so setting it here
+                            // would leave the metric export interval at the SDK default.
+                            metricReaderOptions.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds = this.options.OpenTelemetryExportInterval;
+                        }
+                    })
+                    .Build();
+            }
+        }
+
+        /// <inheritdoc />
+        public void Dispose()
+        {
+            this.serverMetrics.Dispose();
+            this.sessionMetrics?.Dispose();
+            GarnetOpenTelemetryLatencyMetrics.DisposeInstance();
+            this.meterProvider?.Dispose();
+        }
+    }
+}
\ No newline at end of file
diff --git a/libs/server/Metrics/GarnetOpenTelemetrySessionMetrics.cs b/libs/server/Metrics/GarnetOpenTelemetrySessionMetrics.cs
new file mode 100644
index 00000000000..9e350467de1
--- /dev/null
+++ b/libs/server/Metrics/GarnetOpenTelemetrySessionMetrics.cs
@@ -0,0 +1,127 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+using System;
+using System.Diagnostics.Metrics;
+
+namespace Garnet.server.Metrics
+{
+    ///
+    /// Exposes Garnet server session metrics as OpenTelemetry instruments using <see cref="Meter"/>.
+    /// Registers observable counters and gauges that report command processing, network I/O,
+    /// cache lookup, and session exception statistics from a <see cref="GarnetSessionMetrics"/> instance.
+    ///
+    internal sealed class GarnetOpenTelemetrySessionMetrics : IDisposable
+    {
+        /// <summary>
+        /// The meter name used by Garnet session metrics.
+        /// </summary>
+        public const string MeterName = "Microsoft.Garnet.Server.Session";
+
+        /// <summary>
+        /// The <see cref="Meter"/> instance used to create and manage OpenTelemetry instruments
+        /// for session-level metrics.
+        /// </summary>
+        private readonly Meter meter;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="GarnetOpenTelemetrySessionMetrics"/> class,
+        /// creating observable counters and gauges whose callbacks read session-level statistics from the
+        /// specified <see cref="GarnetSessionMetrics"/> instance and convert them to <see cref="long"/> each time they are observed.
+        /// </summary>
+        /// <param name="globalSessionMetrics">
+        /// The <see cref="GarnetSessionMetrics"/> instance that supplies the aggregated session statistics.
+        /// Must not be <c>null</c>.
+        /// </param>
+        /// <exception cref="ArgumentNullException">
+        /// Thrown when <paramref name="globalSessionMetrics"/> is <c>null</c>.
+        /// </exception>
+        internal GarnetOpenTelemetrySessionMetrics(GarnetSessionMetrics globalSessionMetrics)
+        {
+            if (globalSessionMetrics == null)
+            {
+                throw new ArgumentNullException(nameof(globalSessionMetrics));
+            }
+
+            meter = new Meter(MeterName);
+
+            meter.CreateObservableCounter(
+                "garnet.server.commands.processed",
+                () => Convert.ToInt64(globalSessionMetrics.get_total_commands_processed()),
+                unit: "{command}",
+                description: "Total number of commands processed.");
+
+            meter.CreateObservableCounter(
+                "garnet.server.transaction.commands.received",
+                () => Convert.ToInt64(globalSessionMetrics.get_total_transaction_commands_received()),
+                unit: "{command}",
+                description: "Total number of transaction commands received.");
+
+            meter.CreateObservableCounter(
+                "garnet.server.transaction.commands.failed",
+                () => Convert.ToInt64(globalSessionMetrics.get_total_transaction_commands_execution_failed()),
+                unit: "{command}",
+                description: "Total number of transaction command executions that failed.");
+
+            meter.CreateObservableCounter(
+                "garnet.server.write.commands.processed",
+                () => Convert.ToInt64(globalSessionMetrics.get_total_write_commands_processed()),
+                unit: "{command}",
+                description: "Total number of write commands processed.");
+
+            meter.CreateObservableCounter(
+                "garnet.server.read.commands.processed",
+                () => Convert.ToInt64(globalSessionMetrics.get_total_read_commands_processed()),
+                unit: "{command}",
+                description: "Total number of read commands processed.");
+
+            meter.CreateObservableCounter(
+                "garnet.server.cluster.commands.processed",
+                () => Convert.ToInt64(globalSessionMetrics.get_total_cluster_commands_processed()),
+                unit: "{command}",
+                description: "Total number of cluster commands processed.");
+
+            meter.CreateObservableCounter(
+                "garnet.server.network.bytes.received",
+                () => Convert.ToInt64(globalSessionMetrics.get_total_net_input_bytes()),
+                unit: "By",
+                description: "Total number of bytes received from the network.");
+
+            meter.CreateObservableCounter(
+                "garnet.server.network.bytes.sent",
+                () => Convert.ToInt64(globalSessionMetrics.get_total_net_output_bytes()),
+                unit: "By",
+                description: "Total number of bytes sent to the network.");
+
+            meter.CreateObservableCounter(
+                "garnet.server.cache.lookups",
+                () => Convert.ToInt64(globalSessionMetrics.get_total_found()) + Convert.ToInt64(globalSessionMetrics.get_total_notfound()),
+                unit: "{lookup}",
+                description: "Total number of cache lookups.");
+
+            meter.CreateObservableCounter(
+                "garnet.server.cache.lookups.missed",
+                () => Convert.ToInt64(globalSessionMetrics.get_total_notfound()),
+                unit: "{miss}",
+                description: "Total number of cache misses (unsuccessful key lookups).");
+
+            meter.CreateObservableGauge(
+                "garnet.server.operations.pending",
+                () => Convert.ToInt64(globalSessionMetrics.get_total_pending()),
+                unit: "{operation}",
+                description: "Current number of pending operations.");
+
+            meter.CreateObservableCounter(
+                "garnet.server.resp.session.exceptions",
+                () => Convert.ToInt64(globalSessionMetrics.get_total_number_resp_server_session_exceptions()),
+                unit: "{exception}",
+                description: "Total number of RESP server session exceptions.");
+        }
+
+        /// <inheritdoc />
+        public void Dispose()
+        {
+            meter.Dispose();
+        }
+    }
+}
\ No newline at end of file
diff --git a/libs/server/Metrics/Info/GarnetInfoMetrics.cs b/libs/server/Metrics/Info/GarnetInfoMetrics.cs index 8d407d400ae..1e6e8c23cb3 100644 --- a/libs/server/Metrics/Info/GarnetInfoMetrics.cs +++ b/libs/server/Metrics/Info/GarnetInfoMetrics.cs @@ -56,6 +56,8 @@ private void PopulateServerInfo(StoreWrapper storeWrapper) new("uptime_in_days", ((int)uptime.TotalDays).ToString()), new("monitor_task", storeWrapper.serverOptions.MetricsSamplingFrequency > 0 ? "enabled" : "disabled"), new("monitor_freq", storeWrapper.serverOptions.MetricsSamplingFrequency.ToString()), + new("otel_export", storeWrapper.serverOptions.OpenTelemetryEndpoint != null ? "enabled" : "disabled"), + new("otel_endpoint", storeWrapper.serverOptions.OpenTelemetryEndpoint?.ToString() ?? "-"), new("latency_monitor", storeWrapper.serverOptions.LatencyMonitor ? "enabled" : "disabled"), new("run_id", storeWrapper.RunId), new("redis_version", storeWrapper.redisProtocolVersion), diff --git a/libs/server/Metrics/Latency/GarnetLatencyMetricsSession.cs b/libs/server/Metrics/Latency/GarnetLatencyMetricsSession.cs index fb8bba3b0f7..34e4387c0d1 100644 --- a/libs/server/Metrics/Latency/GarnetLatencyMetricsSession.cs +++ b/libs/server/Metrics/Latency/GarnetLatencyMetricsSession.cs @@ -4,6 +4,7 @@ using System; using System.Runtime.CompilerServices; using Garnet.common; +using Garnet.server.Metrics.Latency; namespace Garnet.server { @@ -71,6 +72,8 @@ public void StopAndSwitch(LatencyMetricsType oldCmd, LatencyMetricsType newCmd) int new_idx = (int)newCmd; metrics[new_idx].startTimestamp = metrics[old_idx].startTimestamp; metrics[old_idx].startTimestamp = 0; + // This call has to happen before the GarnetLatencyMetricsSession's RecordValue call because the latter resets the startTimestamp. 
+            GarnetOpenTelemetryLatencyMetrics.Instance?.RecordLatency(metrics[new_idx].startTimestamp, newCmd);
             metrics[new_idx].RecordValue(Version);
         }
 
@@ -78,6 +81,8 @@ public void StopAndSwitch(LatencyMetricsType oldCmd, LatencyMetricsType newCmd)
         public void Stop(LatencyMetricsType cmd)
         {
             int idx = (int)cmd;
+            // This call has to happen before the GarnetLatencyMetricsSession's RecordValue call because the latter resets the startTimestamp.
+            GarnetOpenTelemetryLatencyMetrics.Instance?.RecordLatency(metrics[idx].startTimestamp, cmd);
             metrics[idx].RecordValue(Version);
         }
 
@@ -86,6 +91,14 @@ public void RecordValue(LatencyMetricsType cmd, long value)
         {
             int idx = (int)cmd;
             metrics[idx].RecordValue(Version, value);
+            if (cmd == LatencyMetricsType.NET_RS_BYTES)
+            {
+                GarnetOpenTelemetryLatencyMetrics.Instance?.RecordBytesProcessed(value);
+            }
+            else if (cmd == LatencyMetricsType.NET_RS_OPS)
+            {
+                GarnetOpenTelemetryLatencyMetrics.Instance?.RecordOperationsProcessed(value);
+            }
         }
 
         public void ResetAll()
diff --git a/libs/server/Metrics/Latency/GarnetOpenTelemetryLatencyMetrics.cs b/libs/server/Metrics/Latency/GarnetOpenTelemetryLatencyMetrics.cs
new file mode 100644
index 00000000000..55c0a18900b
--- /dev/null
+++ b/libs/server/Metrics/Latency/GarnetOpenTelemetryLatencyMetrics.cs
@@ -0,0 +1,133 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Diagnostics.Metrics;
+using Garnet.common;
+
+namespace Garnet.server.Metrics.Latency
+{
+
+    /// <summary>
+    /// Provides OpenTelemetry-compatible latency metrics for Garnet server using <see cref="Meter"/>.
+    /// Tracks command latency, bytes processed, and operations processed per network receive call.
+    /// This class follows a singleton pattern via <see cref="Initialize"/> and <see cref="Instance"/>.
+    /// Consumers can subscribe to these metrics using the OpenTelemetry SDK or any other <see cref="MeterListener"/>.
+    /// </summary>
+    internal sealed class GarnetOpenTelemetryLatencyMetrics : IDisposable
+    {
+        /// <summary>
+        /// The meter name used by Garnet latency metrics.
+        /// </summary>
+        public const string MeterName = "Microsoft.Garnet.Server.Latency";
+
+        /// <summary>
+        /// Gets the singleton instance of <see cref="GarnetOpenTelemetryLatencyMetrics"/>,
+        /// or null if latency tracking is disabled.
+        /// </summary>
+        public static GarnetOpenTelemetryLatencyMetrics Instance { get; private set; }
+
+        /// <summary>
+        /// The <see cref="Meter"/> used to create all latency-related instruments.
+        /// </summary>
+        private readonly Meter meter;
+
+        /// <summary>
+        /// Histogram that records command processing latency (in seconds) per network receive call.
+        /// Tagged with the <see cref="LatencyMetricsType"/> of the recorded operation.
+        /// </summary>
+        private readonly Histogram<double> latencyHistogram;
+
+        /// <summary>
+        /// Histogram that records the number of bytes processed per network receive call.
+        /// </summary>
+        private readonly Histogram<int> bytesPerCallHistogram;
+
+        /// <summary>
+        /// Histogram that records the number of operations processed per network receive call.
+        /// </summary>
+        private readonly Histogram<int> operationsPerCallHistogram;
+
+        /// <summary>
+        /// Initializes the singleton <see cref="Instance"/>.
+        /// If <paramref name="trackLatency"/> is true, a new instance is created;
+        /// otherwise, <see cref="Instance"/> is set to null.
+        /// </summary>
+        /// <param name="trackLatency">Whether to enable latency tracking.</param>
+        public static void Initialize(bool trackLatency)
+        {
+            Instance = trackLatency
+                ? new GarnetOpenTelemetryLatencyMetrics()
+                : null;
+        }
+
+        /// <summary>
+        /// Disposes the current singleton <see cref="Instance"/> and sets it to null.
+        /// </summary>
+        public static void DisposeInstance()
+        {
+            Instance?.Dispose();
+            Instance = null;
+        }
+
+        /// <summary>
+        /// Records the time elapsed between <paramref name="startTimestamp"/> and now, in seconds, tagged with the metric type name.
+        /// </summary>
+        /// <param name="startTimestamp">A timestamp obtained from <see cref="Stopwatch.GetTimestamp"/> at the start of the operation.</param>
+        /// <param name="type">The <see cref="LatencyMetricsType"/> categorizing this latency measurement.</param>
+        public void RecordLatency(long startTimestamp, LatencyMetricsType type)
+        {
+            var elapsed = Stopwatch.GetElapsedTime(startTimestamp, Stopwatch.GetTimestamp());
+            latencyHistogram?.Record(elapsed.TotalSeconds, new KeyValuePair<string, object>("type", type.ToString()));
+        }
+
+        /// <summary>
+        /// Records the number of bytes processed in a single network receive call, clamped to the range [0, int.MaxValue].
+        /// </summary>
+        /// <param name="bytes">The number of bytes processed.</param>
+        public void RecordBytesProcessed(long bytes)
+        {
+            bytesPerCallHistogram.Record(Convert.ToInt32(Math.Clamp(bytes, 0, int.MaxValue)));
+        }
+
+        /// <summary>
+        /// Records the number of operations processed in a single network receive call, clamped to the range [0, int.MaxValue].
+        /// </summary>
+        /// <param name="operations">The number of operations processed.</param>
+        public void RecordOperationsProcessed(long operations)
+        {
+            operationsPerCallHistogram.Record(Convert.ToInt32(Math.Clamp(operations, 0, int.MaxValue)));
+        }
+
+        /// <summary>
+        /// Initializes a new instance of <see cref="GarnetOpenTelemetryLatencyMetrics"/>,
+        /// creating the <see cref="Meter"/> and all histogram instruments.
+        /// </summary>
+        private GarnetOpenTelemetryLatencyMetrics()
+        {
+            this.meter = new Meter(MeterName);
+            this.latencyHistogram = meter.CreateHistogram<double>(
+                "garnet.server.command.latency",
+                unit: "s",
+                description: "Latency of processing, per network receive call (server side).",
+                advice: new InstrumentAdvice<double>()
+                {
+                    // 50, 100, 200, 400, 800, 1600, 3200, 6400, 12800, 25600, 51200, 102400, 204800, 409600 Microseconds:
+                    HistogramBucketBoundaries = [0.00005, 0.0001, 0.0002, 0.0004, 0.0008, 0.0016, 0.0032, 0.0064, 0.0128, 0.0256, 0.0512, 0.1024, 0.2048, 0.4096]
+                });
+
+            this.bytesPerCallHistogram = meter.CreateHistogram<int>("garnet.server.bytes.processed", unit: "By", description: "Bytes processed, per network receive call (server side).");
+            this.operationsPerCallHistogram = meter.CreateHistogram<int>("garnet.server.operations.processed", unit: "{operation}", description: "Ops processed, per network receive call (server side).");
+        }
+
+        ///
+        /// Disposes the underlying meter and releases associated resources.
+ /// + public void Dispose() + { + this.meter?.Dispose(); + } + } +} \ No newline at end of file diff --git a/libs/server/Servers/GarnetServerOptions.cs b/libs/server/Servers/GarnetServerOptions.cs index f3669cf070b..500167ab497 100644 --- a/libs/server/Servers/GarnetServerOptions.cs +++ b/libs/server/Servers/GarnetServerOptions.cs @@ -7,6 +7,7 @@ using Garnet.server.Auth.Settings; using Garnet.server.TLS; using Microsoft.Extensions.Logging; +using OpenTelemetry.Exporter; using Tsavorite.core; namespace Garnet.server @@ -241,6 +242,27 @@ public class GarnetServerOptions : ServerOptions /// public int MetricsSamplingFrequency = 0; + /// + /// The Url of the OpenTelemetry collector to export metrics to. If null, OpenTelemetry metrics export is disabled. + /// + public Uri OpenTelemetryEndpoint = null; + + /// + /// The Protocol to use when exporting OpenTelemetry metrics. + /// If null, the default will be used. + /// + public OtlpExportProtocol? OpenTelemetryExportProtocol = null; + + /// + /// The interval in milliseconds to export OpenTelemetry metrics. If 0, the default interval of 60 seconds will be used. + /// + public int OpenTelemetryExportInterval = 0; + + /// + /// The timeout in milliseconds when exporting OpenTelemetry metrics. If 0, the default timeout of 10 seconds will be used. + /// + public int OpenTelemetryExportTimeout = 0; + /// /// Logging level. 
Value options: Trace, Debug, Information, Warning, Error, Critical, None /// @@ -669,6 +691,9 @@ public KVSettings GetSettings(ILoggerFactory loggerFactory, if (LatencyMonitor && MetricsSamplingFrequency == 0) throw new Exception("LatencyMonitor requires MetricsSamplingFrequency to be set"); + if (OpenTelemetryEndpoint != null && MetricsSamplingFrequency == 0) + throw new Exception("OpenTelemetry requires MetricsSamplingFrequency to be set"); + // Read cache related settings if (EnableReadCache && !EnableStorageTier) { diff --git a/libs/server/StoreWrapper.cs b/libs/server/StoreWrapper.cs index 0274fba9912..2ad351a5bc8 100644 --- a/libs/server/StoreWrapper.cs +++ b/libs/server/StoreWrapper.cs @@ -141,6 +141,7 @@ public sealed class StoreWrapper internal readonly CollectionItemBroker itemBroker; internal readonly CustomCommandManager customCommandManager; internal readonly GarnetServerMonitor monitor; + internal readonly GarnetOpenTelemetryServerMonitor openTelemetryServerMonitor; internal readonly IClusterProvider clusterProvider; internal readonly SlowLogContainer slowLogContainer; internal readonly ILogger sessionLogger; @@ -200,6 +201,9 @@ public StoreWrapper( ? new GarnetServerMonitor(this, serverOptions, servers, loggerFactory?.CreateLogger("GarnetServerMonitor")) : null; + this.openTelemetryServerMonitor = serverOptions.MetricsSamplingFrequency > 0 && serverOptions.OpenTelemetryEndpoint != null + ? 
new GarnetOpenTelemetryServerMonitor(serverOptions, monitor.GlobalMetrics)
+                : null;
             this.logger = loggerFactory?.CreateLogger("StoreWrapper");
             this.sessionLogger = loggerFactory?.CreateLogger("Session");
             this.accessControlList = accessControlList;
@@ -813,6 +817,7 @@ private async Task IndexAutoGrowTask(CancellationToken token)
         internal void Start()
         {
             monitor?.Start();
+            openTelemetryServerMonitor?.Start();
             clusterProvider?.Start();
             luaTimeoutManager?.Start();
 
@@ -919,6 +924,7 @@ public void Dispose()
             itemBroker?.Dispose();
             clusterProvider?.Dispose();
             monitor?.Dispose();
+            openTelemetryServerMonitor?.Dispose();
             luaTimeoutManager?.Dispose();
             ctsCommit?.Cancel();
             taskManager.Dispose();
diff --git a/test/Garnet.test/OpenTelemetryTests.cs b/test/Garnet.test/OpenTelemetryTests.cs
new file mode 100644
index 00000000000..2f2cc3c4d54
--- /dev/null
+++ b/test/Garnet.test/OpenTelemetryTests.cs
@@ -0,0 +1,196 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics.Metrics;
+using System.Threading;
+using Allure.NUnit;
+using NUnit.Framework;
+using StackExchange.Redis;
+
+namespace Garnet.test
+{
+    [AllureNUnit]
+    [TestFixture]
+    public class OpenTelemetryTests : AllureTestBase
+    {
+        private GarnetServer server;
+
+        [SetUp]
+        public void Setup()
+        {
+            server = null;
+            TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait: true);
+        }
+
+        [TearDown]
+        public void TearDown()
+        {
+            server?.Dispose();
+            TestUtils.OnTearDown(waitForDelete: true);
+        }
+
+        // Without an endpoint, no OpenTelemetry monitor is created and no garnet.server.* instrument is published.
+        [Test]
+        public void OpenTelemetryMetricsDisabled_WhenNoOpenTelemetryEndpointConfigured()
+        {
+            // Arrange:
+            using var meterListener = new MeterListener();
+            var publishedInstruments = new HashSet<string>();
+            meterListener.InstrumentPublished = (instrument, listener) => publishedInstruments.Add(instrument.Name);
+            meterListener.Start();
+            server = CreateAndStartServer(0, false, false);
+
+            // Act:
+            SetAndGetRandomData();
+
+            // Assert:
+            Assert.That(server, Is.Not.Null);
+            Assert.That(server.Provider, Is.Not.Null);
+            Assert.That(server.Provider.StoreWrapper, Is.Not.Null);
+            Assert.That(server.Provider.StoreWrapper.openTelemetryServerMonitor, Is.Null);
+            Assert.That(publishedInstruments, Has.None.StartsWith("garnet.server."));
+        }
+
+        // An endpoint combined with a metrics sampling frequency of 0 is rejected while creating/starting the server.
+        [Test]
+        public void StartThrowsException_WhenOpenTelemetryConfigurationInvalid()
+        {
+            // Arrange:
+
+            // Act & Assert:
+            var exception = Assert.Throws<Exception>(() => CreateAndStartServer(0, false, true));
+            Assert.That(exception.Message, Is.EqualTo("OpenTelemetry requires MetricsSamplingFrequency to be set"));
+        }
+
+        // With an endpoint and sampling enabled, the expected instruments are published and report non-zero values.
+        [TestCase(false)]
+        [TestCase(true)]
+        public void OpenTelemetryMetricsRecorded_WhenOpenTelemetryEndpointConfigured(bool enableLatencyMonitor)
+        {
+            // Arrange:
+            using var meterListener = new MeterListener();
+            var publishedInstruments = new HashSet<string>();
+            var recordedMeasurements = new ConcurrentDictionary<string, double>();
+
+            meterListener.InstrumentPublished = (instrument, listener) =>
+            {
+                if (instrument.Name.StartsWith("garnet.server."))
+                {
+                    publishedInstruments.Add(instrument.Name);
+                    listener.EnableMeasurementEvents(instrument);
+                }
+            };
+
+            // One callback per measurement type: long (observable counters/gauges), double (latency histogram), int (per-call histograms).
+            // NOTE(review): type arguments restored from the value types recorded by the metrics classes — confirm.
+            meterListener.SetMeasurementEventCallback<long>((instrument, measurement, tags, state) =>
+                recordedMeasurements.AddOrUpdate(instrument.Name, measurement, (_, existing) => existing + measurement));
+            meterListener.SetMeasurementEventCallback<double>((instrument, measurement, tags, state) =>
+                recordedMeasurements.AddOrUpdate(instrument.Name, measurement, (_, existing) => existing + measurement));
+            meterListener.SetMeasurementEventCallback<int>((instrument, measurement, tags, state) =>
+                recordedMeasurements.AddOrUpdate(instrument.Name, measurement, (_, existing) => existing + measurement));
+
+            meterListener.Start();
+            server = CreateAndStartServer(1, enableLatencyMonitor, true);
+
+            // Act:
+            SetAndGetRandomData();
+
+            // Wait for at least one monitor sampling cycle (configured at 1 second) to
+            // aggregate session metrics from active sessions into globalSessionMetrics.
+            Thread.Sleep(2000);
+
+            // Trigger collection of observable instruments
+            meterListener.RecordObservableInstruments();
+
+            // Assert: Verify all expected server metrics are published
+            Assert.That(publishedInstruments, Has.Member("garnet.server.connections.active"));
+            Assert.That(publishedInstruments, Has.Member("garnet.server.connections.received"));
+            Assert.That(publishedInstruments, Has.Member("garnet.server.connections.disposed"));
+
+            // Assert: Verify all expected session metrics are published
+            Assert.That(publishedInstruments, Has.Member("garnet.server.commands.processed"));
+            Assert.That(publishedInstruments, Has.Member("garnet.server.transaction.commands.received"));
+            Assert.That(publishedInstruments, Has.Member("garnet.server.transaction.commands.failed"));
+            Assert.That(publishedInstruments, Has.Member("garnet.server.write.commands.processed"));
+            Assert.That(publishedInstruments, Has.Member("garnet.server.read.commands.processed"));
+            Assert.That(publishedInstruments, Has.Member("garnet.server.cluster.commands.processed"));
+            Assert.That(publishedInstruments, Has.Member("garnet.server.network.bytes.received"));
+            Assert.That(publishedInstruments, Has.Member("garnet.server.network.bytes.sent"));
+            Assert.That(publishedInstruments, Has.Member("garnet.server.cache.lookups"));
+            Assert.That(publishedInstruments, Has.Member("garnet.server.cache.lookups.missed"));
+            Assert.That(publishedInstruments, Has.Member("garnet.server.operations.pending"));
+            Assert.That(publishedInstruments, Has.Member("garnet.server.resp.session.exceptions"));
+
+            // Assert: Verify latency metrics are published only when latency monitoring is enabled
+            if (enableLatencyMonitor)
+            {
+                Assert.That(publishedInstruments, Has.Member("garnet.server.command.latency"));
+                Assert.That(publishedInstruments, Has.Member("garnet.server.bytes.processed"));
+                Assert.That(publishedInstruments, Has.Member("garnet.server.operations.processed"));
+            }
+            else
+            {
+                Assert.That(publishedInstruments, Has.No.Member("garnet.server.command.latency"));
+                Assert.That(publishedInstruments, Has.No.Member("garnet.server.bytes.processed"));
+                Assert.That(publishedInstruments, Has.No.Member("garnet.server.operations.processed"));
+            }
+
+            // Assert: Verify session metrics have non-zero values after operations.
+            Assert.That(recordedMeasurements.GetValueOrDefault("garnet.server.commands.processed"), Is.GreaterThan(0),
+                "Expected commands to have been processed.");
+            Assert.That(recordedMeasurements.GetValueOrDefault("garnet.server.write.commands.processed"), Is.GreaterThan(0),
+                "Expected write commands to have been processed.");
+            Assert.That(recordedMeasurements.GetValueOrDefault("garnet.server.read.commands.processed"), Is.GreaterThan(0),
+                "Expected read commands to have been processed.");
+            Assert.That(recordedMeasurements.GetValueOrDefault("garnet.server.network.bytes.received"), Is.GreaterThan(0),
+                "Expected bytes to have been received from the network.");
+            Assert.That(recordedMeasurements.GetValueOrDefault("garnet.server.network.bytes.sent"), Is.GreaterThan(0),
+                "Expected bytes to have been sent to the network.");
+            Assert.That(recordedMeasurements.GetValueOrDefault("garnet.server.cache.lookups"), Is.GreaterThan(0),
+                "Expected cache lookups to have occurred.");
+
+            if (enableLatencyMonitor)
+            {
+                Assert.That(recordedMeasurements.GetValueOrDefault("garnet.server.command.latency"), Is.GreaterThan(0),
+                    "Expected command latency to have been recorded.");
+                Assert.That(recordedMeasurements.GetValueOrDefault("garnet.server.bytes.processed"), Is.GreaterThan(0),
+                    "Expected bytes processed per call to have been recorded.");
+                Assert.That(recordedMeasurements.GetValueOrDefault("garnet.server.operations.processed"), Is.GreaterThan(0),
+                    "Expected operations processed per call to have been recorded.");
+            }
+        }
+
+        // Issues 100 SET/GET round trips against the test server so that command, network and lookup counters advance.
+        private void SetAndGetRandomData()
+        {
+            using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
+            var db = redis.GetDatabase(0);
+
+            int opCount = 100;
+            for (int i = 0; i < opCount; i++)
+            {
+                Assert.That(db.StringSet(i.ToString(), i.ToString()), Is.True);
+                var result = (string)db.StringGet(i.ToString());
+                Assert.That(result, Is.EqualTo(i.ToString()));
+            }
+        }
+
+        // Creates and starts a test server with the given sampling frequency, latency monitor and OpenTelemetry settings.
+        private GarnetServer CreateAndStartServer(int metricsSamplingFrequency, bool enableLatencyMonitor, bool enableOpenTelemetry)
+        {
+            var server = TestUtils.CreateGarnetServer(
+                TestUtils.MethodTestDir,
+                metricsSamplingFreq: metricsSamplingFrequency,
+                latencyMonitor: enableLatencyMonitor,
+                enableOpenTelemetry: enableOpenTelemetry);
+
+            server.Start();
+
+            return server;
+        }
+    }
+}
diff --git a/test/Garnet.test/TestUtils.cs b/test/Garnet.test/TestUtils.cs
index cc64f5cba03..34d6b53da3c 100644
--- a/test/Garnet.test/TestUtils.cs
+++ b/test/Garnet.test/TestUtils.cs
@@ -278,6 +278,7 @@ public static GarnetServer CreateGarnetServer(
             bool useInChainRevivOnly = false,
             bool useLogNullDevice = false,
             bool enableVectorSetPreview = true,
+            bool enableOpenTelemetry = false,
             string aofMemorySize = "64m"
             )
         {
@@ -346,6 +347,7 @@ public static GarnetServer CreateGarnetServer(
                 QuietMode = true,
                 MetricsSamplingFrequency = metricsSamplingFreq,
                 LatencyMonitor = latencyMonitor,
+                OpenTelemetryEndpoint = enableOpenTelemetry ? new Uri("http://localhost:4317") : null,
                 DeviceFactoryCreator = useAzureStorage
                     ? logger == null ? TestUtils.AzureStorageNamedDeviceFactoryCreator : new AzureStorageNamedDeviceFactoryCreator(AzureEmulatedStorageString, logger)
                     : new LocalStorageNamedDeviceFactoryCreator(logger: logger),