Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions singer-commons/src/main/thrift/config.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,20 @@ struct SingerLogConfig {
* Configuration to transform log message
*/
15: optional MessageTransformerConfig messageTransformerConfig;

/**
* Allowlist of pod identifiers that this log stream should process.
*
* Behavior (in Kubernetes mode):
* - Not set (null) or empty: Universal config - processed for all pods log directories and host-level directories
* - ["svc1", "svc2"]: Pod-only config - only matching pods, skipped for host-level
*
* Adding "__HOST__" to the allowlist indicates the config should also be processed for host-level directories.
* Pod IDs use prefix matching.
*
* The pod identifier is determined by KubeConfig.podAllowlistMetadataKey.
*/
16: optional list<string> podAllowlist;
}

/**
Expand Down Expand Up @@ -382,6 +396,12 @@ struct KubeConfig {
* If true, Singer will directly delete pod directories.
*/
12: optional bool enablePodLogDirectoryCleanup = false;

/**
* The metadata key to use for checking against podAllowlist in SingerLogConfig.
* If not set, the podAllowlist feature is disabled and all configs are initialized for all pods.
*/
13: optional string podAllowlistMetadataKey;
}

struct AdminConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,8 @@ public class SingerConfigDef {
public static final String MATCH_ABSOLUTE_PATH = "matchAbsolutePath";
public static final String CONTENT_TYPE = "contentType";
public static final String NAMED_GROUP_PATTERN = "\\(\\?<([a-zA-Z][a-zA-Z0-9]*)>";

// Pod allowlist config for Kubernetes log stream filtering
public static final String POD_ALLOWLIST = "podAllowlist";
public static final String POD_ALLOWLIST_METADATA_KEY = "podAllowlistMetadataKey";
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public class SingerMetrics {
public static final String NUMBER_OF_PODS = KUBE_PREFIX + "number_of_pods";
public static final String POD_METADATA_UPDATED = KUBE_PREFIX + "pod_metadata_updated";
public static final String POD_METADATA_MAP_SIZE = KUBE_PREFIX + "pod_metadata_size";
public static final String POD_ALLOWLIST_MATCH = KUBE_PREFIX + "pod_allowlist_match";

public static final String ADMIN_PREFIX = SINGER_PREIX + "admin.";
public static final String ADMIN_SERVER_STARTED = ADMIN_PREFIX + "admin_server_started";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ private KubeService() {
Preconditions.checkNotNull(kubeConfig);
try {
init(kubeConfig);
// Register here so it is the first watcher in the set
addWatcher(PodMetadataWatcher.getInstance());
} catch (Exception e) {
LOG.error("Exception while initializing KubeService", e);
throw new RuntimeException(e);
Expand Down Expand Up @@ -277,6 +275,12 @@ public void updatePodNames() throws IOException {
*/
public void updatePodWatchers(String podName, boolean isDelete) {
LOG.debug("Pod change:" + podName + " deleted:" + isDelete);

// Remove metadata from cache when pod is deleted
if (isDelete) {
PodMetadataFetcher.getInstance().remove(podName);
}

for (PodWatcher watcher : registeredWatchers) {
try {
if (isDelete) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,39 +20,51 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PodMetadataWatcher implements PodWatcher {
private static final Logger LOG = LoggerFactory.getLogger(PodMetadataWatcher.class);
/**
* Service for fetching and caching pod metadata from the kubelet API.
*
* This class provides lazy loading of pod metadata - metadata is fetched
* on-demand when first requested and cached for subsequent access.
*
*/
public class PodMetadataFetcher {
private static final Logger LOG = LoggerFactory.getLogger(PodMetadataFetcher.class);
private final Map<String, Map<String,String>> podMetadata = new ConcurrentHashMap<>();
private final List<String> podMetadataFields;
private final String podLogDirectory;
private static PodMetadataWatcher instance;
private static PodMetadataFetcher instance;


public static PodMetadataWatcher getInstance() {
public static PodMetadataFetcher getInstance() {
if (instance == null) {
synchronized (PodMetadataWatcher.class) {
synchronized (PodMetadataFetcher.class) {
if (instance == null) {
instance = new PodMetadataWatcher();
instance = new PodMetadataFetcher();
}
}
}
return instance;
}

protected PodMetadataWatcher() {
@VisibleForTesting
public static void reset() {
instance = null;
}

protected PodMetadataFetcher() {
SingerConfig config = SingerSettings.getSingerConfig();
Preconditions.checkNotNull(config);
KubeConfig kubeConfig = config.getKubeConfig();
Preconditions.checkNotNull(kubeConfig);
podLogDirectory = kubeConfig.getPodLogDirectory();
if (kubeConfig.getPodMetadataFields() == null || kubeConfig.getPodMetadataFields().isEmpty()) {
LOG.warn("Pod metadata fields are not set in the config. Pod metadata will not be updated.");
LOG.warn("Pod metadata fields are not set in the config. Pod metadata will not be fetched.");
}
podMetadataFields = kubeConfig.getPodMetadataFields();
}

@VisibleForTesting
protected PodMetadataWatcher(KubeConfig kubeConfig) {
protected PodMetadataFetcher(KubeConfig kubeConfig) {
Preconditions.checkNotNull(kubeConfig);
podMetadataFields = kubeConfig.getPodMetadataFields();
podLogDirectory = kubeConfig.getPodLogDirectory();
Expand All @@ -63,50 +75,71 @@ public Map<String, Map<String,String>> getPodMetadataMap() {
return new HashMap<>(podMetadata);
}

@Override
public void podCreated(String podUid) {
try {
if (!podMetadata.containsKey(podUid)) {
updatePodMetadata(podUid);
OpenTsdbMetricConverter.gauge(SingerMetrics.POD_METADATA_MAP_SIZE, podMetadata.size());
}
} catch (IOException e) {
LOG.error("Failed to update pod metadata for pod: {}", podUid, e);
/**
* Get metadata for a pod, fetching from kubelet if not already cached.
*
* @param podUid the pod directory name (namespace_name_uid format)
* @return the pod metadata map, or null if unavailable
*/
public Map<String, String> getPodMetadata(String podUid) {
// Return cached if available
if (podMetadata.containsKey(podUid)) {
return podMetadata.get(podUid);
}

// Fetch and cache
return fetchAndCache(podUid);
}

@Override
public void podDeleted(String podUid) {
/**
* Remove a pod's metadata from the cache.
* Should be called when a pod is deleted.
*
* @param podUid the pod directory name
*/
public void remove(String podUid) {
podMetadata.remove(podUid);
OpenTsdbMetricConverter.gauge(SingerMetrics.POD_METADATA_MAP_SIZE, podMetadata.size());
LOG.debug("Removed pod metadata from cache: {}", podUid);
}

/**
* Update podMetadata from kubelet for active pods. This method is only called after
* the initial file system pod discovery.
*
* @throws IOException
* Fetch metadata from kubelet and cache it.
* Synchronized to prevent duplicate fetches for the same pod.
*/
private void updatePodMetadata(String podUid) throws IOException {
private synchronized Map<String, String> fetchAndCache(String podUid) {
if (podMetadataFields == null || podMetadataFields.isEmpty()) {
return;
return null;
}
JsonArray podList = KubeService.getPodListFromKubelet();
if (podList != null) {
for (int i = 0; i < podList.size(); i++) {
JsonObject metadata = podList.get(i).getAsJsonObject().get("metadata").getAsJsonObject();
String
podDirectoryName =
KubeService.getPodDirectoryName(podLogDirectory, metadata.get("namespace").getAsString(),
metadata.get("name").getAsString(), metadata.get("uid").getAsString());
if (podDirectoryName.equals(podUid)) {
podMetadata.put(podUid, extractPodMetadataFields(metadata, podMetadataFields));
LOG.info("Pod metadata updated for pod: {}", podUid);
OpenTsdbMetricConverter.incr(SingerMetrics.POD_METADATA_UPDATED,
"podName=" + podUid, "namespace=" + metadata.get("namespace").getAsString());

try {
JsonArray podList = KubeService.getPodListFromKubelet();
if (podList != null) {
for (int i = 0; i < podList.size(); i++) {
JsonObject metadata = podList.get(i).getAsJsonObject().get("metadata").getAsJsonObject();
String podDirectoryName = KubeService.getPodDirectoryName(
podLogDirectory,
metadata.get("namespace").getAsString(),
metadata.get("name").getAsString(),
metadata.get("uid").getAsString()
);

if (podDirectoryName.equals(podUid)) {
Map<String, String> extracted = extractPodMetadataFields(metadata, podMetadataFields);
podMetadata.put(podUid, extracted);
LOG.info("Fetched and cached metadata for pod: {} is {}", podUid, extracted);
OpenTsdbMetricConverter.gauge(SingerMetrics.POD_METADATA_MAP_SIZE, podMetadata.size());
OpenTsdbMetricConverter.incr(SingerMetrics.POD_METADATA_UPDATED,
"podName=" + podUid, "namespace=" + metadata.get("namespace").getAsString());
return extracted;
}
}
}
} catch (IOException e) {
LOG.warn("Failed to fetch metadata for pod: {}", podUid, e);
}

return null;
}

/**
Expand All @@ -131,13 +164,9 @@ public Map<String, String> extractPodMetadataFields(JsonObject metadata, List<St
fieldKey = key;
}
if (fieldKey != null && currentElement != null && currentElement.isJsonPrimitive()) {
extractedFields.putIfAbsent(fieldKey, currentElement.toString());
extractedFields.putIfAbsent(fieldKey, currentElement.getAsString());
}
}
return extractedFields;
}

public Map<String, String> getPodMetadata(String podName) {
return podMetadata.get(podName) != null ? podMetadata.get(podName) : null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import com.pinterest.singer.common.errors.LogStreamException;
import com.pinterest.singer.common.errors.SingerLogException;
import com.pinterest.singer.kubernetes.KubeService;
import com.pinterest.singer.kubernetes.PodMetadataWatcher;
import com.pinterest.singer.kubernetes.PodMetadataFetcher;
import com.pinterest.singer.kubernetes.PodWatcher;
import com.pinterest.singer.metrics.OpenTsdbMetricConverter;
import com.pinterest.singer.thrift.LogFile;
Expand All @@ -82,6 +82,8 @@ public class LogStreamManager implements PodWatcher {
private static final String POD_LOGNAME_SEPARATOR = "..";
// use an empty string as non-kubernetes pod id for backward compatibility
public static final String NON_KUBERNETES_POD_ID = "";
// Marker value in podAllowlist to indicate config should also be processed at host level directories (outside of podLogDirectory)
private static final String INCLUDE_HOST_MARKER = "__HOST__";
private static final Logger LOG = LoggerFactory.getLogger(LogStreamManager.class);
private static LogStreamManager instance;

Expand All @@ -106,6 +108,8 @@ public class LogStreamManager implements PodWatcher {
*/
private Map<String, Set<SingerLog>> singerLogPaths = new ConcurrentHashMap<>();
private String podLogDirectory = "";
private String podAllowlistMetadataKey = null;
private final boolean kubernetesEnabled;

private FileSystemEventFetcher recursiveDirectoryWatcher;
private Thread recursiveEventProcessorThread;
Expand All @@ -120,10 +124,17 @@ public MissingDirChecker getMissingDirChecker() {
protected LogStreamManager() {
missingDirChecker = new MissingDirChecker();
SingerConfig singerConfig = SingerSettings.getSingerConfig();
if(singerConfig!=null &&
singerConfig.isKubernetesEnabled()) {
kubernetesEnabled = singerConfig != null && singerConfig.isKubernetesEnabled();
if (kubernetesEnabled) {
KubeService.getInstance().addWatcher(this);
podLogDirectory = singerConfig.getKubeConfig().getPodLogDirectory();
String configuredMetadataKey = singerConfig.getKubeConfig().getPodAllowlistMetadataKey();
if (configuredMetadataKey != null && !configuredMetadataKey.isEmpty()) {
podAllowlistMetadataKey = configuredMetadataKey;
LOG.warn("Pod allowlist feature enabled with metadata key: {}", podAllowlistMetadataKey);
} else {
LOG.warn("Pod allowlist feature disabled - no podAllowlistMetadataKey configured");
}
try {
recursiveDirectoryWatcher = new FileSystemEventFetcher(singerConfig);
recursiveDirectoryWatcher.start("RecursiveDirectoryWatcher");
Expand Down Expand Up @@ -265,6 +276,14 @@ private void initializeLogStreamsInternal() throws SingerLogException{

if (logConfigs!=null) {
for (SingerLogConfig singerLogConfig : logConfigs) {
// In Kubernetes mode, only process configs that include host-level processing
// Skip configs that are pod-only (have allowlist but no INCLUDE_HOST_MARKER)
if (kubernetesEnabled && !includesHostProcessing(singerLogConfig)) {
LOG.info("Skipping pod-only config {} for host-level initialization",
singerLogConfig.getName());
continue;
}

ArrayList<String> directories = SingerUtils.splitString(singerLogConfig.getLogDir());
SingerLog singerLog = new SingerLog(singerLogConfig);

Expand Down Expand Up @@ -754,8 +773,20 @@ private long checkAndCleanupLogStreams(int deletionTimeout) {
* @throws SingerLogException
*/
public void initializeLogStreamForPod(String podUid, Collection<SingerLogConfig> configs) throws SingerLogException {
// initialize logstreams for the supplied configs only
boolean podAllowlistEnabled = podAllowlistMetadataKey != null && !podAllowlistMetadataKey.isEmpty();
Map<String, String> podMetadata = PodMetadataFetcher.getInstance().getPodMetadata(podUid);

String podIdentifier = null;
if (podAllowlistEnabled && podMetadata != null) {
podIdentifier = podMetadata.get(podAllowlistMetadataKey);
}

for(SingerLogConfig singerLogConfig : configs) {
// Skip if this config should not be initialized for this pod
if (podAllowlistEnabled && !shouldInitializeForPod(podIdentifier, singerLogConfig)) {
continue;
}

ArrayList<String> directories = SingerUtils.splitString(singerLogConfig.getLogDir());
String logPathKeys = directories.stream()
.map(dir -> new File(podLogDirectory + "/" + podUid + "/" + dir).getAbsolutePath())
Expand All @@ -771,7 +802,6 @@ public void initializeLogStreamForPod(String podUid, Collection<SingerLogConfig>
clone.setName(podUid + POD_LOGNAME_SEPARATOR + clone.getName());
singerLog = new SingerLog(clone, podUid);

Map<String, String> podMetadata = PodMetadataWatcher.getInstance().getPodMetadata(podUid);
if (podMetadata != null) {
LOG.info("Initializing pod metadata {} for pod: {}", podMetadata, podUid);
OpenTsdbMetricConverter.incr("pod_metadata_enabled", 1, "pod=" + podUid, "log=" + clone.getName());
Expand All @@ -794,6 +824,51 @@ public void initializeLogStreamForPod(String podUid, Collection<SingerLogConfig>
}
}

/**
* Check if a config should be processed at the host level.
*
* @param singerLogConfig the log config to check
* @return true if this config should be processed at host level
*/
private boolean includesHostProcessing(SingerLogConfig singerLogConfig) {
if (!singerLogConfig.isSetPodAllowlist() || singerLogConfig.getPodAllowlist().isEmpty()) {
return true;
}
return singerLogConfig.getPodAllowlist().contains(INCLUDE_HOST_MARKER);
}

/**
* Determine if a log stream config should be initialized for a pod.
* - No allowlist or empty allowlist: initialize for all pods (universal)
* - Allowlist with pod IDs: only initialize if pod's identifier matches
*
* @param podIdentifier the pod's identifier from metadata (null if not present)
* @param singerLogConfig the log config to check
* @return true if the config should be initialized for this pod
*/
private boolean shouldInitializeForPod(String podIdentifier, SingerLogConfig singerLogConfig) {
if (!singerLogConfig.isSetPodAllowlist() || singerLogConfig.getPodAllowlist().isEmpty()) {
return true;
}

if (podIdentifier == null) {
return false;
}

// Check if pod's identifier matches any entry (skip INCLUDE_HOST_MARKER marker)
for (String entry : singerLogConfig.getPodAllowlist()) {
if (INCLUDE_HOST_MARKER.equals(entry)) {
continue;
}
if (podIdentifier.startsWith(entry)) {
OpenTsdbMetricConverter.incr(SingerMetrics.POD_ALLOWLIST_MATCH,
"podId=" + podIdentifier, "log=" + singerLogConfig.getName());
return true;
}
}
return false;
}

/**
* Exposed for testing only
* @return
Expand Down
Loading