diff --git a/singer-commons/src/main/thrift/config.thrift b/singer-commons/src/main/thrift/config.thrift index f29835ae..7dde461a 100644 --- a/singer-commons/src/main/thrift/config.thrift +++ b/singer-commons/src/main/thrift/config.thrift @@ -293,6 +293,20 @@ struct SingerLogConfig { * Configuration to transform log message */ 15: optional MessageTransformerConfig messageTransformerConfig; + + /** + * Allowlist of pod identifiers that this log stream should process. + * + * Behavior (in Kubernetes mode): + * - Not set (null) or empty: Universal config - processed for all pods log directories and host-level directories + * - ["svc1", "svc2"]: Pod-only config - only matching pods, skipped for host-level + * + * Adding "__HOST__" to the allowlist indicates the config should also be processed for host-level directories. + * Pod IDs use prefix matching. + * + * The pod identifier is determined by KubeConfig.podAllowlistMetadataKey. + */ + 16: optional list podAllowlist; } /** @@ -382,6 +396,12 @@ struct KubeConfig { * If true, Singer will directly delete pod directories. */ 12: optional bool enablePodLogDirectoryCleanup = false; + + /** + * The metadata key to use for checking against podAllowlist in SingerLogConfig. + * If not set, the podAllowlist feature is disabled and all configs are initialized for all pods. + */ + 13: optional string podAllowlistMetadataKey; } struct AdminConfig { diff --git a/singer/src/main/java/com/pinterest/singer/common/SingerConfigDef.java b/singer/src/main/java/com/pinterest/singer/common/SingerConfigDef.java index 183f684d..14d187e0 100644 --- a/singer/src/main/java/com/pinterest/singer/common/SingerConfigDef.java +++ b/singer/src/main/java/com/pinterest/singer/common/SingerConfigDef.java @@ -130,4 +130,8 @@ public class SingerConfigDef { public static final String MATCH_ABSOLUTE_PATH = "matchAbsolutePath"; public static final String CONTENT_TYPE = "contentType"; public static final String NAMED_GROUP_PATTERN = "\\(\\?<([a-zA-Z][a-zA-Z0-9]*)>"; + + // Pod allowlist config for Kubernetes log stream filtering + public static final String POD_ALLOWLIST = "podAllowlist"; + public static final String POD_ALLOWLIST_METADATA_KEY = "podAllowlistMetadataKey"; } \ No newline at end of file diff --git a/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java b/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java index 5630ea33..a5d8252a 100644 --- a/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java +++ b/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java @@ -108,6 +108,7 @@ public class SingerMetrics { public static final String NUMBER_OF_PODS = KUBE_PREFIX + "number_of_pods"; public static final String POD_METADATA_UPDATED = KUBE_PREFIX + "pod_metadata_updated"; public static final String POD_METADATA_MAP_SIZE = KUBE_PREFIX + "pod_metadata_size"; + public static final String POD_ALLOWLIST_MATCH = KUBE_PREFIX + "pod_allowlist_match"; public static final String ADMIN_PREFIX = SINGER_PREIX + "admin."; public static final String ADMIN_SERVER_STARTED = ADMIN_PREFIX + "admin_server_started"; diff --git a/singer/src/main/java/com/pinterest/singer/kubernetes/KubeService.java b/singer/src/main/java/com/pinterest/singer/kubernetes/KubeService.java index db1ef63a..db24ea17 100644 --- a/singer/src/main/java/com/pinterest/singer/kubernetes/KubeService.java +++ b/singer/src/main/java/com/pinterest/singer/kubernetes/KubeService.java @@ -102,8 +102,6 @@ private KubeService() { Preconditions.checkNotNull(kubeConfig); try { init(kubeConfig); - // Register here so it is the first watcher in the set - addWatcher(PodMetadataWatcher.getInstance()); } catch (Exception e) { LOG.error("Exception while initializing KubeService", e); throw new RuntimeException(e); @@ -277,6 +275,12 @@ public void updatePodNames() throws IOException { */ public void updatePodWatchers(String podName, boolean isDelete) { LOG.debug("Pod change:" + podName + " deleted:" + isDelete); + + // Remove metadata from cache when pod is deleted + if (isDelete) { + PodMetadataFetcher.getInstance().remove(podName); + } + for (PodWatcher watcher : registeredWatchers) { try { if (isDelete) { diff --git a/singer/src/main/java/com/pinterest/singer/kubernetes/PodMetadataWatcher.java b/singer/src/main/java/com/pinterest/singer/kubernetes/PodMetadataFetcher.java similarity index 52% rename from singer/src/main/java/com/pinterest/singer/kubernetes/PodMetadataWatcher.java rename to singer/src/main/java/com/pinterest/singer/kubernetes/PodMetadataFetcher.java index 508edd6d..b438b301 100644 --- a/singer/src/main/java/com/pinterest/singer/kubernetes/PodMetadataWatcher.java +++ b/singer/src/main/java/com/pinterest/singer/kubernetes/PodMetadataFetcher.java @@ -20,39 +20,51 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -public class PodMetadataWatcher implements PodWatcher { - private static final Logger LOG = LoggerFactory.getLogger(PodMetadataWatcher.class); +/** + * Service for fetching and caching pod metadata from the kubelet API. + * + * This class provides lazy loading of pod metadata - metadata is fetched + * on-demand when first requested and cached for subsequent access. + * + */ +public class PodMetadataFetcher { + private static final Logger LOG = LoggerFactory.getLogger(PodMetadataFetcher.class); private final Map> podMetadata = new ConcurrentHashMap<>(); private final List podMetadataFields; private final String podLogDirectory; - private static PodMetadataWatcher instance; + private static PodMetadataFetcher instance; - public static PodMetadataWatcher getInstance() { + public static PodMetadataFetcher getInstance() { if (instance == null) { - synchronized (PodMetadataWatcher.class) { + synchronized (PodMetadataFetcher.class) { if (instance == null) { - instance = new PodMetadataWatcher(); + instance = new PodMetadataFetcher(); } } } return instance; } - protected PodMetadataWatcher() { + @VisibleForTesting + public static void reset() { + instance = null; + } + + protected PodMetadataFetcher() { SingerConfig config = SingerSettings.getSingerConfig(); Preconditions.checkNotNull(config); KubeConfig kubeConfig = config.getKubeConfig(); Preconditions.checkNotNull(kubeConfig); podLogDirectory = kubeConfig.getPodLogDirectory(); if (kubeConfig.getPodMetadataFields() == null || kubeConfig.getPodMetadataFields().isEmpty()) { - LOG.warn("Pod metadata fields are not set in the config. Pod metadata will not be updated."); + LOG.warn("Pod metadata fields are not set in the config. Pod metadata will not be fetched."); } podMetadataFields = kubeConfig.getPodMetadataFields(); } @VisibleForTesting - protected PodMetadataWatcher(KubeConfig kubeConfig) { + protected PodMetadataFetcher(KubeConfig kubeConfig) { Preconditions.checkNotNull(kubeConfig); podMetadataFields = kubeConfig.getPodMetadataFields(); podLogDirectory = kubeConfig.getPodLogDirectory(); @@ -63,50 +75,71 @@ public Map> getPodMetadataMap() { return new HashMap<>(podMetadata); } - @Override - public void podCreated(String podUid) { - try { - if (!podMetadata.containsKey(podUid)) { - updatePodMetadata(podUid); - OpenTsdbMetricConverter.gauge(SingerMetrics.POD_METADATA_MAP_SIZE, podMetadata.size()); - } - } catch (IOException e) { - LOG.error("Failed to update pod metadata for pod: {}", podUid, e); + /** + * Get metadata for a pod, fetching from kubelet if not already cached. + * + * @param podUid the pod directory name (namespace_name_uid format) + * @return the pod metadata map, or null if unavailable + */ + public Map getPodMetadata(String podUid) { + // Return cached if available + if (podMetadata.containsKey(podUid)) { + return podMetadata.get(podUid); } + + // Fetch and cache + return fetchAndCache(podUid); } - @Override - public void podDeleted(String podUid) { + /** + * Remove a pod's metadata from the cache. + * Should be called when a pod is deleted. + * + * @param podUid the pod directory name + */ + public void remove(String podUid) { podMetadata.remove(podUid); OpenTsdbMetricConverter.gauge(SingerMetrics.POD_METADATA_MAP_SIZE, podMetadata.size()); + LOG.debug("Removed pod metadata from cache: {}", podUid); } /** - * Update podMetadata from kubelet for active pods. This method is only called after - * the initial file system pod discovery. - * - * @throws IOException + * Fetch metadata from kubelet and cache it. + * Synchronized to prevent duplicate fetches for the same pod. */ - private void updatePodMetadata(String podUid) throws IOException { + private synchronized Map fetchAndCache(String podUid) { if (podMetadataFields == null || podMetadataFields.isEmpty()) { - return; + return null; } - JsonArray podList = KubeService.getPodListFromKubelet(); - if (podList != null) { - for (int i = 0; i < podList.size(); i++) { - JsonObject metadata = podList.get(i).getAsJsonObject().get("metadata").getAsJsonObject(); - String - podDirectoryName = - KubeService.getPodDirectoryName(podLogDirectory, metadata.get("namespace").getAsString(), - metadata.get("name").getAsString(), metadata.get("uid").getAsString()); - if (podDirectoryName.equals(podUid)) { - podMetadata.put(podUid, extractPodMetadataFields(metadata, podMetadataFields)); - LOG.info("Pod metadata updated for pod: {}", podUid); - OpenTsdbMetricConverter.incr(SingerMetrics.POD_METADATA_UPDATED, - "podName=" + podUid, "namespace=" + metadata.get("namespace").getAsString()); + + try { + JsonArray podList = KubeService.getPodListFromKubelet(); + if (podList != null) { + for (int i = 0; i < podList.size(); i++) { + JsonObject metadata = podList.get(i).getAsJsonObject().get("metadata").getAsJsonObject(); + String podDirectoryName = KubeService.getPodDirectoryName( + podLogDirectory, + metadata.get("namespace").getAsString(), + metadata.get("name").getAsString(), + metadata.get("uid").getAsString() + ); + + if (podDirectoryName.equals(podUid)) { + Map extracted = extractPodMetadataFields(metadata, podMetadataFields); + podMetadata.put(podUid, extracted); + LOG.info("Fetched and cached metadata for pod: {} is {}", podUid, extracted); + OpenTsdbMetricConverter.gauge(SingerMetrics.POD_METADATA_MAP_SIZE, podMetadata.size()); + OpenTsdbMetricConverter.incr(SingerMetrics.POD_METADATA_UPDATED, + "podName=" + podUid, "namespace=" + metadata.get("namespace").getAsString()); + return extracted; + } } } + } catch (IOException e) { + LOG.warn("Failed to fetch metadata for pod: {}", podUid, e); } + + return null; } /** @@ -131,13 +164,9 @@ public Map extractPodMetadataFields(JsonObject metadata, List getPodMetadata(String podName) { - return podMetadata.get(podName) != null ? podMetadata.get(podName) : null; - } } diff --git a/singer/src/main/java/com/pinterest/singer/monitor/LogStreamManager.java b/singer/src/main/java/com/pinterest/singer/monitor/LogStreamManager.java index 65059dea..9dfc3411 100644 --- a/singer/src/main/java/com/pinterest/singer/monitor/LogStreamManager.java +++ b/singer/src/main/java/com/pinterest/singer/monitor/LogStreamManager.java @@ -58,7 +58,7 @@ import com.pinterest.singer.common.errors.LogStreamException; import com.pinterest.singer.common.errors.SingerLogException; import com.pinterest.singer.kubernetes.KubeService; -import com.pinterest.singer.kubernetes.PodMetadataWatcher; +import com.pinterest.singer.kubernetes.PodMetadataFetcher; import com.pinterest.singer.kubernetes.PodWatcher; import com.pinterest.singer.metrics.OpenTsdbMetricConverter; import com.pinterest.singer.thrift.LogFile; @@ -82,6 +82,8 @@ public class LogStreamManager implements PodWatcher { private static final String POD_LOGNAME_SEPARATOR = ".."; // use an empty string as non-kubernetes pod id for backward compatibility public static final String NON_KUBERNETES_POD_ID = ""; + // Marker value in podAllowlist to indicate config should also be processed at host level directories (outside of podLogDirectory) + private static final String INCLUDE_HOST_MARKER = "__HOST__"; private static final Logger LOG = LoggerFactory.getLogger(LogStreamManager.class); private static LogStreamManager instance; @@ -106,6 +108,8 @@ public class LogStreamManager implements PodWatcher { */ private Map> singerLogPaths = new ConcurrentHashMap<>(); private String podLogDirectory = ""; + private String podAllowlistMetadataKey = null; + private final boolean kubernetesEnabled; private FileSystemEventFetcher recursiveDirectoryWatcher; private Thread recursiveEventProcessorThread; @@ -120,10 +124,17 @@ public MissingDirChecker getMissingDirChecker() { protected LogStreamManager() { missingDirChecker = new MissingDirChecker(); SingerConfig singerConfig = SingerSettings.getSingerConfig(); - if(singerConfig!=null && - singerConfig.isKubernetesEnabled()) { + kubernetesEnabled = singerConfig != null && singerConfig.isKubernetesEnabled(); + if (kubernetesEnabled) { KubeService.getInstance().addWatcher(this); podLogDirectory = singerConfig.getKubeConfig().getPodLogDirectory(); + String configuredMetadataKey = singerConfig.getKubeConfig().getPodAllowlistMetadataKey(); + if (configuredMetadataKey != null && !configuredMetadataKey.isEmpty()) { + podAllowlistMetadataKey = configuredMetadataKey; + LOG.warn("Pod allowlist feature enabled with metadata key: {}", podAllowlistMetadataKey); + } else { + LOG.warn("Pod allowlist feature disabled - no podAllowlistMetadataKey configured"); + } try { recursiveDirectoryWatcher = new FileSystemEventFetcher(singerConfig); recursiveDirectoryWatcher.start("RecursiveDirectoryWatcher"); @@ -265,6 +276,14 @@ private void initializeLogStreamsInternal() throws SingerLogException{ if (logConfigs!=null) { for (SingerLogConfig singerLogConfig : logConfigs) { + // In Kubernetes mode, only process configs that include host-level processing + // Skip configs that are pod-only (have allowlist but no INCLUDE_HOST_MARKER) + if (kubernetesEnabled && !includesHostProcessing(singerLogConfig)) { + LOG.info("Skipping pod-only config {} for host-level initialization", + singerLogConfig.getName()); + continue; + } + ArrayList directories = SingerUtils.splitString(singerLogConfig.getLogDir()); SingerLog singerLog = new SingerLog(singerLogConfig); @@ -754,8 +773,20 @@ private long checkAndCleanupLogStreams(int deletionTimeout) { * @throws SingerLogException */ public void initializeLogStreamForPod(String podUid, Collection configs) throws SingerLogException { - // initialize logstreams for the supplied configs only + boolean podAllowlistEnabled = podAllowlistMetadataKey != null && !podAllowlistMetadataKey.isEmpty(); + Map podMetadata = PodMetadataFetcher.getInstance().getPodMetadata(podUid); + + String podIdentifier = null; + if (podAllowlistEnabled && podMetadata != null) { + podIdentifier = podMetadata.get(podAllowlistMetadataKey); + } + for(SingerLogConfig singerLogConfig : configs) { + // Skip if this config should not be initialized for this pod + if (podAllowlistEnabled && !shouldInitializeForPod(podIdentifier, singerLogConfig)) { + continue; + } + ArrayList directories = SingerUtils.splitString(singerLogConfig.getLogDir()); String logPathKeys = directories.stream() .map(dir -> new File(podLogDirectory + "/" + podUid + "/" + dir).getAbsolutePath()) @@ -771,7 +802,6 @@ public void initializeLogStreamForPod(String podUid, Collection clone.setName(podUid + POD_LOGNAME_SEPARATOR + clone.getName()); singerLog = new SingerLog(clone, podUid); - Map podMetadata = PodMetadataWatcher.getInstance().getPodMetadata(podUid); if (podMetadata != null) { LOG.info("Initializing pod metadata {} for pod: {}", podMetadata, podUid); OpenTsdbMetricConverter.incr("pod_metadata_enabled", 1, "pod=" + podUid, "log=" + clone.getName()); @@ -794,6 +824,51 @@ public void initializeLogStreamForPod(String podUid, Collection } } + /** + * Check if a config should be processed at the host level. + * + * @param singerLogConfig the log config to check + * @return true if this config should be processed at host level + */ + private boolean includesHostProcessing(SingerLogConfig singerLogConfig) { + if (!singerLogConfig.isSetPodAllowlist() || singerLogConfig.getPodAllowlist().isEmpty()) { + return true; + } + return singerLogConfig.getPodAllowlist().contains(INCLUDE_HOST_MARKER); + } + + /** + * Determine if a log stream config should be initialized for a pod. + * - No allowlist or empty allowlist: initialize for all pods (universal) + * - Allowlist with pod IDs: only initialize if pod's identifier matches + * + * @param podIdentifier the pod's identifier from metadata (null if not present) + * @param singerLogConfig the log config to check + * @return true if the config should be initialized for this pod + */ + private boolean shouldInitializeForPod(String podIdentifier, SingerLogConfig singerLogConfig) { + if (!singerLogConfig.isSetPodAllowlist() || singerLogConfig.getPodAllowlist().isEmpty()) { + return true; + } + + if (podIdentifier == null) { + return false; + } + + // Check if pod's identifier matches any entry (skip INCLUDE_HOST_MARKER marker) + for (String entry : singerLogConfig.getPodAllowlist()) { + if (INCLUDE_HOST_MARKER.equals(entry)) { + continue; + } + if (podIdentifier.startsWith(entry)) { + OpenTsdbMetricConverter.incr(SingerMetrics.POD_ALLOWLIST_MATCH, + "podId=" + podIdentifier, "log=" + singerLogConfig.getName()); + return true; + } + } + return false; + } + /** * Exposed for testing only * @return diff --git a/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java b/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java index 69d27dc9..cdcb6bd0 100644 --- a/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java +++ b/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java @@ -359,6 +359,12 @@ private static KubeConfig parseKubeConfig(PropertiesConfiguration configHeader) config.setEnablePodLogDirectoryCleanup( subsetConfig.getBoolean(SingerConfigDef.KUBE_ENABLE_POD_LOG_DIRECTORY_CLEANUP)); } + + if (subsetConfig.containsKey(SingerConfigDef.POD_ALLOWLIST_METADATA_KEY)) { + String metadataKey = subsetConfig.getString(SingerConfigDef.POD_ALLOWLIST_METADATA_KEY); + config.setPodAllowlistMetadataKey(metadataKey); + LOG.info("Pod allowlist metadata key configured: {}", metadataKey); + } return config; } @@ -765,6 +771,16 @@ public static SingerLogConfig parseLogConfig(String logName, if (logConfiguration.containsKey(SingerConfigDef.SKIP_DRAINING)) { config.setSkipDraining(logConfiguration.getBoolean(SingerConfigDef.SKIP_DRAINING)); } + + // Parse pod allowlist configuration for Kubernetes environments + if (logConfiguration.containsKey(SingerConfigDef.POD_ALLOWLIST)) { + List allowlistObjects = logConfiguration.getList(SingerConfigDef.POD_ALLOWLIST); + List podAllowlist = allowlistObjects.stream() + .map(Object::toString) + .collect(Collectors.toList()); + config.setPodAllowlist(podAllowlist); + LOG.info("Pod allowlist configured for log {}: {}", logName, podAllowlist); + } ts = System.currentTimeMillis() - ts; LOG.debug("Loaded:"+logName+" configuration in "+ts+"ms"); diff --git a/singer/src/test/java/com/pinterest/singer/kubernetes/TestKubeService.java b/singer/src/test/java/com/pinterest/singer/kubernetes/TestKubeService.java index df3e9b32..6a4f6a63 100644 --- a/singer/src/test/java/com/pinterest/singer/kubernetes/TestKubeService.java +++ b/singer/src/test/java/com/pinterest/singer/kubernetes/TestKubeService.java @@ -33,7 +33,6 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; @@ -55,8 +54,31 @@ @SuppressWarnings("restriction") public class TestKubeService { - private static HttpServer server; - private final List podNames = Arrays.asList( + protected static HttpServer server; + + /** + * Ensure the test HTTP server is running. Can be called from other test classes. + */ + protected static void ensureServerRunning() throws IOException { + if (server == null) { + server = HttpServer.create(new InetSocketAddress(10255), 0); + server.start(); + } + } + + /** + * Remove the /pods context to allow registering a new one. + */ + protected static void removePodsContext() { + try { + if (server != null) { + server.removeContext("/pods"); + } + } catch (Exception e) { + // ignore + } + } + protected final List podNames = Arrays.asList( "default_nginx-deployment-5c689d7589-abcde_12345678-1234-1234-1234-1234567890ab", "default_nginx-deployment-5c689d7589-fghij_12345678-1234-5678-1234-567890abcdef", "default_backend-service-7987d5b5c-12345_54321678-9876-5432-9876-5432198765ac", @@ -66,13 +88,15 @@ public class TestKubeService { @BeforeClass public static void beforeClass() throws IOException { - server = com.sun.net.httpserver.HttpServer.create(new InetSocketAddress(10255), 0); - server.start(); + ensureServerRunning(); } @AfterClass public static void afterClass() { - server.stop(0); + if (server != null) { + server.stop(0); + server = null; + } } @Before @@ -191,20 +215,23 @@ public void testUpdatePodMetadata() { KubeConfig kubeConfig = new KubeConfig(); kubeConfig.setPodMetadataFields( Arrays.asList("name", "namespace", "uid")); - KubeService kubeService = new KubeService(kubeConfig); - PodMetadataWatcher pmdTracker = new PodMetadataWatcher(kubeConfig); - kubeService.addWatcher(pmdTracker); + kubeConfig.setPodLogDirectory(""); + + PodMetadataFetcher pmdTracker = new PodMetadataFetcher(kubeConfig); + for (String pod : podNames) { - kubeService.updatePodWatchers(pod, false); + Map metadata = pmdTracker.getPodMetadata(pod); + assertNotNull("Metadata should be fetched for pod: " + pod, metadata); + assertNotNull(metadata.get("namespace")); + assertNotNull(metadata.get("name")); + assertNotNull(metadata.get("uid")); } + assertEquals(podNames.size(), pmdTracker.getPodMetadataMap().size()); - for (Entry> pod : pmdTracker.getPodMetadataMap().entrySet()) { - assertTrue(podNames.contains(pod.getKey())); - assertNotNull(pod.getValue().get("namespace")); - assertNotNull(pod.getValue().get("name")); - assertNotNull(pod.getValue().get("uid")); - } - kubeService.updatePodWatchers(podNames.get(podNames.size() - 1), true); + + // Test removing a pod from the cache + String lastPod = podNames.get(podNames.size() - 1); + pmdTracker.remove(lastPod); assertEquals(podNames.size() - 1, pmdTracker.getPodMetadataMap().size()); } @@ -283,7 +310,7 @@ public void handle(HttpExchange exchange) throws IOException { }); } - public void registerGoodResponse() { + protected static void registerGoodResponse() { server.createContext("/pods", new HttpHandler() { @Override diff --git a/singer/src/test/java/com/pinterest/singer/kubernetes/TestPodAllowlist.java b/singer/src/test/java/com/pinterest/singer/kubernetes/TestPodAllowlist.java new file mode 100644 index 00000000..820d6891 --- /dev/null +++ b/singer/src/test/java/com/pinterest/singer/kubernetes/TestPodAllowlist.java @@ -0,0 +1,356 @@ +/** + * Copyright 2019 Pinterest, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.pinterest.singer.kubernetes; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.concurrent.Executors; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.pinterest.singer.SingerTestBase; +import com.pinterest.singer.common.SingerSettings; +import com.pinterest.singer.monitor.LogStreamManager; +import com.pinterest.singer.thrift.configuration.FileNameMatchMode; +import com.pinterest.singer.thrift.configuration.KubeConfig; +import com.pinterest.singer.thrift.configuration.SingerConfig; +import com.pinterest.singer.thrift.configuration.SingerLogConfig; + +/** + * Tests for the pod allowlist feature which filters log stream initialization + * based on pod metadata (name). + * + * Uses the shared HTTP server from TestKubeService and pods-goodresponse.json. + */ +public class TestPodAllowlist { + + private SingerConfig config; + private KubeConfig kubeConfig; + private String podLogPath; + private Path tempDir; + + // Pod directory names from pods-goodresponse.json: namespace_name_uid + private static final String POD_NGINX_1 = "default_nginx-deployment-5c689d7589-abcde_12345678-1234-1234-1234-1234567890ab"; + private static final String POD_BACKEND = "default_backend-service-7987d5b5c-12345_54321678-9876-5432-9876-5432198765ac"; + private static final String POD_DATABASE = "default_database-7f8d5b7c6-mnopq_98765432-7654-4321-6543-987654321098"; + + @BeforeClass + public static void beforeClass() throws IOException { + TestKubeService.ensureServerRunning(); + } + + @AfterClass + public static void afterClass() { + TestKubeService.removePodsContext(); + } + + @Before + public void before() throws IOException { + TestKubeService.removePodsContext(); + TestKubeService.registerGoodResponse(); + + LogStreamManager.getInstance().getSingerLogPaths().clear(); + SingerSettings.getFsMonitorMap().clear(); + LogStreamManager.reset(); + KubeService.reset(); + PodMetadataFetcher.reset(); + SingerSettings.reset(); + + SingerSettings.setBackgroundTaskExecutor( + Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).build())); + + config = new SingerConfig(); + config.setKubernetesEnabled(true); + SingerSettings.setSingerConfig(config); + + kubeConfig = new KubeConfig(); + config.setKubeConfig(kubeConfig); + + tempDir = Files.createTempDirectory("pods_allowlist_test"); + podLogPath = tempDir.toAbsolutePath().toString(); + kubeConfig.setPodLogDirectory(podLogPath); + kubeConfig.setPodMetadataFields(Arrays.asList("name")); + } + + @After + public void after() { + TestKubeService.removePodsContext(); + SingerSettings.getFsMonitorMap().clear(); + if (tempDir != null) { + deleteDirectory(tempDir.toFile()); + } + LogStreamManager.reset(); + KubeService.reset(); + PodMetadataFetcher.reset(); + SingerSettings.reset(); + } + + @Test + public void testAllowlistDisabledWhenMetadataKeyNotConfigured() throws Exception { + // Don't set podAllowlistMetadataKey - feature should be disabled + SingerLogConfig logConfig = createLogConfig("test-log", "/var/log", "app.log"); + logConfig.setPodAllowlist(Arrays.asList("nginx-deployment-5c689d7589-abcde")); // Only allow this pod + + config.setLogConfigs(Arrays.asList(logConfig)); + SingerSettings.getLogConfigMap().putAll(SingerSettings.loadLogConfigMap(config)); + + createPodDirectory(POD_BACKEND, "/var/log", "app.log"); + + LogStreamManager lsm = LogStreamManager.getInstance(); + KubeService instance = KubeService.getInstance(); + instance.start(); + Thread.sleep(SingerTestBase.FILE_EVENT_WAIT_TIME_MS); + + assertEquals("Should initialize because allowlist feature is disabled", + 1, lsm.getSingerLogPaths().size()); + + instance.stop(); + } + + @Test + public void testAllowlistMatchAllowsInitialization() throws Exception { + kubeConfig.setPodAllowlistMetadataKey("name"); + + SingerLogConfig logConfig = createLogConfig("test-log", "/var/log", "app.log"); + logConfig.setPodAllowlist(Arrays.asList( + "nginx-deployment-5c689d7589-abcde", + "nginx-deployment-5c689d7589-fghij")); + + config.setLogConfigs(Arrays.asList(logConfig)); + SingerSettings.getLogConfigMap().putAll(SingerSettings.loadLogConfigMap(config)); + + createPodDirectory(POD_NGINX_1, "/var/log", "app.log"); + + LogStreamManager lsm = LogStreamManager.getInstance(); + KubeService instance = KubeService.getInstance(); + instance.start(); + Thread.sleep(SingerTestBase.FILE_EVENT_WAIT_TIME_MS); + + assertEquals("Pod nginx-1 should be initialized", 1, lsm.getSingerLogPaths().size()); + assertTrue(lsm.getSingerLogPaths().containsKey(podLogPath + "/" + POD_NGINX_1 + "/var/log")); + + instance.stop(); + } + + @Test + public void testAllowlistNoMatchSkipsInitialization() throws Exception { + kubeConfig.setPodAllowlistMetadataKey("name"); + + SingerLogConfig logConfig = createLogConfig("test-log", "/var/log", "app.log"); + // Only allow nginx pods + logConfig.setPodAllowlist(Arrays.asList( + "nginx-deployment-5c689d7589-abcde", + "nginx-deployment-5c689d7589-fghij")); + + config.setLogConfigs(Arrays.asList(logConfig)); + SingerSettings.getLogConfigMap().putAll(SingerSettings.loadLogConfigMap(config)); + + createPodDirectory(POD_BACKEND, "/var/log", "app.log"); + + LogStreamManager lsm = LogStreamManager.getInstance(); + KubeService instance = KubeService.getInstance(); + instance.start(); + Thread.sleep(SingerTestBase.FILE_EVENT_WAIT_TIME_MS); + + assertEquals("Pod backend should be skipped", 0, lsm.getSingerLogPaths().size()); + + instance.stop(); + } + + @Test + public void testConfigWithoutAllowlistInitializesForAllPods() throws Exception { + kubeConfig.setPodAllowlistMetadataKey("name"); + + SingerLogConfig logConfig = createLogConfig("test-log", "/var/log", "app.log"); + + config.setLogConfigs(Arrays.asList(logConfig)); + SingerSettings.getLogConfigMap().putAll(SingerSettings.loadLogConfigMap(config)); + + createPodDirectory(POD_DATABASE, "/var/log", "app.log"); + + LogStreamManager lsm = LogStreamManager.getInstance(); + KubeService instance = KubeService.getInstance(); + instance.start(); + Thread.sleep(SingerTestBase.FILE_EVENT_WAIT_TIME_MS); + + assertEquals("Pod should be initialized - config has no allowlist", 1, lsm.getSingerLogPaths().size()); + + instance.stop(); + } + + @Test + public void testMultipleConfigsWithDifferentAllowlists() throws Exception { + kubeConfig.setPodAllowlistMetadataKey("name"); + + SingerLogConfig logConfig1 = createLogConfig("log-nginx", "/var/log/nginx", "nginx.log"); + logConfig1.setPodAllowlist(Arrays.asList( + "nginx-deployment-5c689d7589-abcde", + "nginx-deployment-5c689d7589-fghij")); + + SingerLogConfig logConfig2 = createLogConfig("log-backend", "/var/log/backend", "backend.log"); + logConfig2.setPodAllowlist(Arrays.asList("backend-service-7987d5b5c-12345")); + + SingerLogConfig logConfig3 = createLogConfig("log-universal", "/var/log/common", "common.log"); + + config.setLogConfigs(Arrays.asList(logConfig1, logConfig2, logConfig3)); + SingerSettings.getLogConfigMap().putAll(SingerSettings.loadLogConfigMap(config)); + + new File(podLogPath + "/" + POD_NGINX_1 + "/var/log/nginx").mkdirs(); + new File(podLogPath + "/" + POD_NGINX_1 + "/var/log/nginx/nginx.log").createNewFile(); + new File(podLogPath + "/" + POD_NGINX_1 + "/var/log/backend").mkdirs(); + new File(podLogPath + "/" + POD_NGINX_1 + "/var/log/backend/backend.log").createNewFile(); + new File(podLogPath + "/" + POD_NGINX_1 + "/var/log/common").mkdirs(); + new File(podLogPath + "/" + POD_NGINX_1 + "/var/log/common/common.log").createNewFile(); + + LogStreamManager lsm = LogStreamManager.getInstance(); + KubeService instance = KubeService.getInstance(); + instance.start(); + Thread.sleep(SingerTestBase.FILE_EVENT_WAIT_TIME_MS); + + assertEquals("Should have 2 log paths initialized", 2, lsm.getSingerLogPaths().size()); + assertTrue("Should have /var/log/nginx", + lsm.getSingerLogPaths().containsKey(podLogPath + "/" + POD_NGINX_1 + "/var/log/nginx")); + assertTrue("Should have /var/log/common", + lsm.getSingerLogPaths().containsKey(podLogPath + "/" + POD_NGINX_1 + "/var/log/common")); + assertFalse("Should NOT have /var/log/backend", + lsm.getSingerLogPaths().containsKey(podLogPath + "/" + POD_NGINX_1 + "/var/log/backend")); + + instance.stop(); + } + + @Test + public void testHostOnlyConfigSkipsPodsInitialization() throws Exception { + kubeConfig.setPodAllowlistMetadataKey("name"); + + SingerLogConfig hostOnlyConfig = createLogConfig("host-only-log", "/var/log", "host.log"); + hostOnlyConfig.setPodAllowlist(Arrays.asList("__HOST__")); + + config.setLogConfigs(Arrays.asList(hostOnlyConfig)); + SingerSettings.getLogConfigMap().putAll(SingerSettings.loadLogConfigMap(config)); + + createPodDirectory(POD_NGINX_1, "/var/log", "host.log"); + + LogStreamManager lsm = LogStreamManager.getInstance(); + KubeService instance = KubeService.getInstance(); + instance.start(); + Thread.sleep(SingerTestBase.FILE_EVENT_WAIT_TIME_MS); + + assertEquals("Host-only config should be skipped for pods", 0, lsm.getSingerLogPaths().size()); + + instance.stop(); + } + + @Test + public void testIncludeHostMarkerWithPodIds() throws Exception { + kubeConfig.setPodAllowlistMetadataKey("name"); + + Path hostLogDir = Files.createTempDirectory("host_logs_test"); + String hostLogPath = hostLogDir.toAbsolutePath().toString(); + new File(hostLogPath + "/mixed.log").createNewFile(); + + SingerLogConfig mixedConfig = createLogConfig("mixed-log", hostLogPath, "mixed.log"); + mixedConfig.setPodAllowlist(Arrays.asList( + "__HOST__", + "nginx-deployment-5c689d7589-abcde")); + + config.setLogConfigs(Arrays.asList(mixedConfig)); + SingerSettings.getLogConfigMap().putAll(SingerSettings.loadLogConfigMap(config)); + + LogStreamManager.initializeLogStreams(); + LogStreamManager lsm = LogStreamManager.getInstance(); + + assertTrue("Host-level log should be initialized", + lsm.getSingerLogPaths().containsKey(hostLogPath)); + + createPodDirectory(POD_NGINX_1, hostLogPath, "mixed.log"); + + KubeService instance = KubeService.getInstance(); + instance.start(); + Thread.sleep(SingerTestBase.FILE_EVENT_WAIT_TIME_MS); + + assertEquals("Should have 2 log paths - host and pod", 2, lsm.getSingerLogPaths().size()); + assertTrue("Host-level log should be initialized", + lsm.getSingerLogPaths().containsKey(hostLogPath)); + assertTrue("Pod-level log should be initialized", + lsm.getSingerLogPaths().containsKey(podLogPath + "/" + POD_NGINX_1 + hostLogPath)); + + instance.stop(); + deleteDirectory(hostLogDir.toFile()); + } + + @Test + public void testPrefixMatchingWithPodIds() throws Exception { + kubeConfig.setPodAllowlistMetadataKey("name"); + + SingerLogConfig prefixConfig = createLogConfig("prefix-log", "/var/log", "prefix.log"); + prefixConfig.setPodAllowlist(Arrays.asList("nginx-")); + + config.setLogConfigs(Arrays.asList(prefixConfig)); + SingerSettings.getLogConfigMap().putAll(SingerSettings.loadLogConfigMap(config)); + + createPodDirectory(POD_NGINX_1, "/var/log", "prefix.log"); + + LogStreamManager lsm = LogStreamManager.getInstance(); + KubeService instance = KubeService.getInstance(); + instance.start(); + Thread.sleep(SingerTestBase.FILE_EVENT_WAIT_TIME_MS * 3); + + assertEquals("Prefix matching should work - 'nginx-' matches 'nginx-deployment-...'", + 1, lsm.getSingerLogPaths().size()); + + instance.stop(); + } + + private SingerLogConfig createLogConfig(String name, String logDir, String regex) { + SingerLogConfig logConfig = new SingerLogConfig(); + logConfig.setName(name); + logConfig.setLogDir(logDir); + logConfig.setLogStreamRegex(regex); + logConfig.setFilenameMatchMode(FileNameMatchMode.PREFIX); + return logConfig; + } + + private void createPodDirectory(String podUid, String logDir, String logFile) throws IOException { + new File(podLogPath + "/" + podUid + logDir).mkdirs(); + new File(podLogPath + "/" + podUid + logDir + "/" + logFile).createNewFile(); + } + + private static boolean deleteDirectory(File dir) { + if (dir.isDirectory()) { + String[] children = dir.list(); + if (children != null) { + for (String child : children) { + if (!deleteDirectory(new File(dir, child))) { + return false; + } + } + } + } + return dir.delete(); + } +}