diff --git a/solutions/observability/streams/management/extract.md b/solutions/observability/streams/management/extract.md index e7d2782c35..41edae1d2e 100644 --- a/solutions/observability/streams/management/extract.md +++ b/solutions/observability/streams/management/extract.md @@ -13,71 +13,30 @@ products: - id: cloud-kubernetes - id: elastic-stack --- -# Process documents [streams-extract-fields] +# Process your documents [streams-extract-fields] -After selecting a stream, use the **Processing** tab to add [processors](#streams-extract-processors) and [conditions](#streams-add-processor-conditions) that modify your documents and extract meaningful fields, so you can filter and analyze your data more effectively. +After selecting a stream, use the **Processing** tab to add [processors](#streams-add-processors) and [conditions](#streams-add-processor-conditions) that modify your unstructured documents and extract meaningful fields, so you can filter and analyze your data more effectively. -For example, in [Discover](../../../../explore-analyze/discover.md), extracted fields let you filter for log messages with an `ERROR` log level that occurred during a specific time period to help diagnose an issue. Without extracting the log level and timestamp fields from your messages, those filters wouldn't return meaningful results. +For example, in [Discover](../../../../explore-analyze/discover.md), extracted fields might let you filter for log messages with an `ERROR` log level that occurred during a specific time period to help diagnose an issue. Without extracting the log level and timestamp fields from your messages, those filters wouldn't return meaningful results. -The **Processing** tab also has the following features: +## Why process your documents with Streams? -- {applies_to}`serverless: preview` {applies_to}`stack: preview 9.3+` [Generate pipeline suggestions](#streams-generate-pipeline-suggestions). 
-- Simulate your processors and provide an immediate [preview](#streams-preview-changes) that's tested end to end. -- Flag indexing issues, like [mapping conflicts](#streams-processing-mapping-conflicts), so you can address them before applying changes. +- **[Add processors](#streams-add-processors)**: Use the Streams UI without manually configuring pipeline JSON or Grok syntax. +- {applies_to}`serverless: preview` {applies_to}`stack: preview 9.3+` **[Generate pipeline suggestions using AI](#streams-generate-pipeline-suggestions)**: Let Streams analyze sample documents and suggest pipeline patterns, so you're refining instead of writing from scratch. +- **[Preview changes](#streams-preview-changes)**: Use the data preview to view which fields your pattern extracts per document, removing the guesswork. +- **[Detect and resolve processing issues](#streams-detect-failures)**: Identify which processor or condition is causing documents to fail during processing. +- **[Catch mapping conflicts](#streams-processing-mapping-conflicts)**: Identify potential mapping conflicts before they cause cluster-wide failures. Streams simulates the indexing process end-to-end before deploying. -## Supported processors [streams-extract-processors] - -Streams supports the following processors: - -- [**Append**](./extract/append.md): Adds a value to an existing array field, or creates the field as an array if it doesn't exist. -- [**Concat**](./extract/concat.md): Concatenates a mix of field values and literal strings into a single field. -- [**Convert**](./extract/convert.md): Converts a field in the currently ingested document to a different type, such as converting a string to an integer. -- [**Date**](./extract/date.md): Converts date strings into timestamps, with options for timezone, locale, and output formatting. 
-- [**Dissect**](./extract/dissect.md): Extracts fields from structured log messages using defined delimiters instead of patterns, making it faster than Grok and ideal for consistently formatted logs. -- [**Drop**](./extract/drop.md): Drops the document without raising any errors. This is useful to prevent the document from getting indexed based on a condition. -- [**Enrich**](./extract/enrich.md): Adds data from an enrich policy to incoming documents, such as geographic coordinates from an IP address or account details from a user ID. -- [**Grok**](./extract/grok.md): Extracts fields from unstructured log messages using predefined or custom patterns, supports multiple match attempts in sequence, and can automatically generate patterns with an [LLM connector](/explore-analyze/ai-features/llm-guides/llm-connectors.md). -- [**Join**](./extract/join.md): Concatenates the values of multiple fields with a delimiter. -- [**Lowercase**](./extract/lowercase.md): Converts a string field to lowercase. -- [**Math**](./extract/math.md): Evaluates arithmetic or logical expressions. -- [**Network direction**](./extract/network-direction.md): Determines network traffic direction based on source and destination IP addresses. -- [**Redact**](./extract/redact.md): Redacts sensitive data in a string field by matching grok patterns. -- [**Remove**](./extract/remove.md): Removes existing fields or removes fields by prefix. -- [**Rename**](./extract/rename.md): Changes the name of a field, moving its value to a new field name and removing the original. -- [**Replace**](./extract/replace.md): Replaces parts of a string field according to a regular expression pattern with a replacement string. -- [**Set**](./extract/set.md): Assigns a specific value to a field, creating the field if it doesn't exist or overwriting its value if it does. -- [**Trim**](./extract/trim.md): Removes leading and trailing whitespace from a string field. 
-- [**Uppercase**](./extract/uppercase.md): Converts a string field to uppercase. - -### Processor limitations and inconsistencies [streams-processor-inconsistencies] - -Streams exposes a [Streamlang](./streamlang.md) configuration, but internally it relies on {{es}} ingest pipeline processors and ES|QL. Streamlang doesn't always have 1:1 parity with the ingest processors because it needs to support options that work in both ingest pipelines and ES|QL. In most cases, you won't need to worry about these details, but the underlying design decisions still affect the UI and available configuration options. The following are some limitations and inconsistencies when using Streamlang processors: - -- **Consistently typed fields**: ES|QL requires one consistent type per column, so workflows that produce mixed types across documents won't transpile. -- **Conversion of types**: ES|QL and ingest pipelines accept different conversion combinations and strictness (especially for strings), so `convert` can behave differently across targets. -- **Multi-value commands/functions**: Fields can contain one or multiple values. ES|QL and ingest processors don't always handle these cases the same way. For example, grok in ES|QL handles multiple values automatically, while the grok processor does not -- **Conditional execution**: ES|QL's enforced table shape limits conditional casting, parsing, and wildcard field operations that ingest pipelines can do per-document. -- **Arrays of objects / flattening**: Ingest pipelines preserve nested JSON arrays, while ES|QL flattens to columns, so operations like rename and delete on parent objects can differ or fail. - -## Add processors [streams-add-processors] +## Add and configure processors [streams-add-processors] Streams uses [{{es}} ingest pipelines](../../../../manage-data/ingest/transform-enrich/ingest-pipelines.md) made up of processors to transform your data, without requiring you to switch interfaces and manually update pipelines. 
To add a processor from the **Processing** tab: -1. Select **Create** → **Create processor** to open a list of supported processors. -1. Select a processor from the **Processor** menu. -1. Configure the processor and select **Create** to save the processor. - -After adding all desired processors and conditions, select **Save changes**. Streams parses all future data ingested into the stream into structured fields accordingly. - -Refer to individual [supported processors](#streams-extract-processors) for more on configuring specific processors. - -:::{note} -Applied changes aren't retroactive and only affect *future ingested data*. -::: +:::::{stepper} +::::{step} Let Streams suggest processors +:anchor: streams-generate-pipeline-suggestions -### Generate pipeline suggestions [streams-generate-pipeline-suggestions] ```{applies_to} stack: preview 9.3+ serverless: preview @@ -86,18 +45,37 @@ serverless: preview This feature requires a [Generative AI connector](kibana://reference/connectors-kibana/gen-ai-connectors.md). ::: -Setting up processors is generally a multistep process. For example, you might need a grok processor to extract fields, a date processor to convert timestamps, and a remove processor to get rid of temporary fields. Instead of creating individual processors manually, you can have AI suggest an entire pipeline for you: +Setting up processors is generally a multi-step process. For example, you might need a grok processor to extract fields, a date processor to convert timestamps, and a remove processor to get rid of temporary fields. Instead of creating individual processors manually, you can have AI suggest an entire pipeline for you: 1. From the **Processing** tab, select **Suggest a pipeline**. 1. Review the suggested processors, and either **Accept** or **Reject** the suggestions. 1. Select **Regenerate** to have Streams regenerate the suggested pipeline. Change the LLM that Streams uses to generate suggestions from the {icon}`controls` menu. 
-#### How does **Suggest a pipeline** work? [streams-pipeline-generation] +**How does Suggest a pipeline work?** :::{include} ../../../_snippets/streams-suggestions.md ::: +:::: -### Add conditions [streams-add-processor-conditions] +::::{step} Manually add processors +:anchor: streams-add-processors-manually + +If you know which processors you want to use, you can add them manually. Refer to the [Streamlang reference](./streamlang.md) for supported processors and configuration details. + +1. Select **Create processor**. You can also let Streams suggest processors by selecting [Suggest a pipeline](#streams-generate-pipeline-suggestions). +1. Select a processor from the **Processor** menu. + + :::{note} + Let Streams suggest patterns for [Grok](./extract/grok.md#streams-grok-patterns) and [dissect](./extract/dissect.md#streams-dissect-patterns) processors by selecting **Generate pattern**. This feature requires a [Generative AI connector](kibana://reference/connectors-kibana/gen-ai-connectors.md). + ::: + +1. Configure the processor and select **Create** to save the processor. +1. Optional: Enable **Ignore failures** if you want document processing to continue even when this processor fails. +1. Optional: For dissect, Grok, and rename processors, enable **Ignore missing fields** if you want processing to continue when a source field is missing. +:::: + +::::{step} Add conditions +:anchor: streams-add-processor-conditions You can add conditions, Boolean expressions that are evaluated for each document, and attach processors that only run when those conditions are met. @@ -106,7 +84,6 @@ To add a condition: 1. Select **Create** → **Create condition**. 1. Provide a **Field**, a **Value**, and a comparator. 1. Select **Create condition**. -1. Select **Save changes**. 
:::{dropdown} Supported comparators Streams processors support the following comparators: @@ -125,40 +102,10 @@ Streams processors support the following comparators: ::: After creating a condition, add a processor or another condition to it by selecting the {icon}`plus_in_circle`. +:::: -### Editing modes [streams-editing-modes] - -The Streams processing UI provides an [interactive mode](#streams-editing-interactive-mode) and a [YAML mode](#streams-editing-yaml-mode) for editing processors and conditions. - -To switch modes, select the appropriate tab from the top of the processing page. - -:::{image} ../../../images/streams-editing-modes.png -:screenshot: -::: - -Streams defaults to interactive mode unless the configuration can't be represented in interactive mode (for example, when nesting levels are too deep). - -#### Interactive mode [streams-editing-interactive-mode] - -**Interactive** mode provides a form-based interface for creating and editing processors. This mode works best for: - -- Users who prefer a guided, visual approach -- Configurations that don't require deeply nested conditions - -#### YAML mode [streams-editing-yaml-mode] -```{applies_to} -stack: ga 9.3+ -``` - -**YAML** mode provides a code editor for writing Streamlang directly. This mode works best for: - -- Users who prefer working with code -- Advanced configurations with complex or deeply nested conditions - -Refer to the [Streamlang reference](./streamlang.md) for the complete syntax, condition operators, and examples. - - -### Preview changes [streams-preview-changes] +::::{step} Preview changes +:anchor: streams-preview-changes After you create processors, the **Data preview** tab simulates processor results with additional filtering options depending on the outcome of the simulation. 
@@ -171,34 +118,25 @@ To avoid unexpected results, it's best to add processors rather than remove or r The **Data preview** tab loads 100 documents from your existing data and runs your changes against them. For any newly created processors and conditions, the preview results are reliable, and you can freely create and reorder during the preview. -After making sure everything in the **Data preview** tab is correct, select **Save changes** to apply your changes to the data stream. - -If you edit the stream after saving your changes, keep the following in mind: +If you edit the stream after previewing your changes, keep the following in mind: - Adding processors to the end of the list works as expected. - Editing or reordering existing processors can cause inaccurate results. Because the pipeline might have already processed the documents used for sampling, **Data preview** cannot accurately simulate changes to existing data. - Adding a new processor and moving it before an existing processor can cause inaccurate results. **Data preview** only simulates the new processor, not the existing ones, so the simulation may not accurately reflect changes to existing data. +:::: -### Ignore failures [streams-ignore-failures] - -Each processor has the **Ignore failures** option. When enabled, document processing continues when even if the processor fails. - -### Ignore missing fields [streams-ignore-missing-fields] - -Dissect, grok, and rename processors include the **Ignore missing fields** option. When enabled, document processing continues even if a source field is missing. +::::{step} View processor statistics and detected fields +:anchor: streams-stats-and-detected-fields -### Processor actions [streams-processor-actions] - -To modify an existing processor, open the actions menu {icon}`boxes_vertical` next to it to see the available options: +Once saved, the processor displays its success rate and the fields it added. 
-* **Move up** or **Move down**: Change the order of the processor. -* **Add description**: Change the processor description from its metadata to a description of your choice. -* **Remove description**: For processors with an added description, use this option to return the description to the metadata. -* **Edit**: Modify the processor configuration. -* **Duplicate**: Create another processor with the same configuration to use as a template. -* **Delete**: Remove the processor permanently. +:::{image} ../../../images/logs-streams-field-stats.png +:screenshot: +::: +:::: -## Detect and resolve failures [streams-detect-failures] +::::{step} Detect and resolve failures +:anchor: streams-detect-failures Documents can fail processing for various reasons. Streams helps you identify and resolve these issues before deploying changes. @@ -220,25 +158,72 @@ Streams displays failures at the bottom of the process editor. Some failures mig :::{image} ../../../images/logs-streams-processor-failures.png :screenshot: ::: +:::: -### Mapping conflicts [streams-processing-mapping-conflicts] +::::{step} Detect mapping conflicts +:anchor: streams-processing-mapping-conflicts -As part of processing, Streams simulates your changes end to end to check for mapping conflicts. If it detects a conflict, Streams marks the processor as failed and displays a message like the following: +As part of processing, Streams simulates your changes end-to-end to check for mapping conflicts. If it detects a conflict, Streams marks the processor as failed and displays a message like the following: :::{image} ../../../images/logs-streams-mapping-conflicts.png :screenshot: ::: Use the information in the failure message to find and troubleshoot the mapping issues. +:::: -## Processor statistics and detected fields [streams-stats-and-detected-fields] +::::{step} Save changes +After adding all desired processors and conditions, select **Save changes**. 
After you save, Streams parses all future data ingested into the stream into structured fields accordingly. -Once saved, the processor displays its success rate and the fields it added. +:::{note} +Applied changes aren't retroactive and only affect *future ingested data*. +::: -:::{image} ../../../images/logs-streams-field-stats.png +:::: +::::: + +## Modify an existing processor [streams-processor-actions] + +To modify an existing processor, open the actions menu {icon}`boxes_vertical` next to it to see the available options: + +* **Move up** or **Move down**: Change the order of the processor. +* **Add description**: Change the processor description from its metadata to a description of your choice. +* **Remove description**: For processors with an added description, use this option to return the description to the metadata. +* **Edit**: Modify the processor configuration. +* **Duplicate**: Create another processor with the same configuration to use as a template. +* **Delete**: Remove the processor permanently. + +## Switch between interactive and YAML editing modes [streams-editing-modes] + +The Streams processing UI provides an [interactive mode](#streams-editing-interactive-mode) and a [YAML mode](#streams-editing-yaml-mode) for editing processors and conditions. + +To switch modes, select the appropriate tab from the top of the processing page. + +:::{image} ../../../images/streams-editing-modes.png :screenshot: ::: +Streams defaults to interactive mode unless the configuration can't be represented in interactive mode (for example, when nesting levels are too deep). + +### Interactive mode [streams-editing-interactive-mode] + +**Interactive** mode provides a form-based interface for creating and editing processors. 
This mode works best for: + +- Users who prefer a guided, visual approach +- Configurations that don't require deeply nested conditions + +### YAML mode [streams-editing-yaml-mode] +```{applies_to} +stack: ga 9.3+ +``` + +**YAML** mode provides a code editor for writing Streamlang directly. This mode works best for: + +- Users who prefer working with code +- Advanced configurations with complex or deeply nested conditions + +Refer to the [Streamlang reference](./streamlang.md) for the complete syntax, condition operators, and examples. + ## Advanced: How and where do these changes get applied to the underlying data stream? [streams-applied-changes] When you save processors, Streams appends processing to the best-matching ingest pipeline for the data stream. It either chooses the best-matching pipeline ending in `@custom` in your data stream, or it adds one for you. @@ -276,6 +261,6 @@ You can still add your own processors manually to the `@custom` pipeline if need ## Known limitations [streams-known-limitations] -- Streams does not support all processors. More processors will be added in future versions. -- The data preview simulation might not accurately reflect the changes to the existing data when editing existing processors or re-ordering them. Streams will allow proper simulations using original documents in a future version. +- Streams does not support all processors. Refer to the [Streamlang reference](./streamlang.md) for supported processors. +- The data preview simulation might not accurately reflect the changes to the existing data when editing existing processors or re-ordering them. - Streams can't properly handle arrays. While it supports basic actions like appending or renaming, it can't access individual array elements. For classic streams, the workaround is to use the [manual pipeline configuration](./extract/manual-pipeline-configuration.md) that supports Painless scripting and all ingest processors. 
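
Reviewer note on the data preview limitation listed under Known limitations: the reason an edited processor can preview inaccurately may be easier to see with a toy model. The Python sketch below is not how Streams is implemented. The `rename` and `run` helpers and the field names `msg`, `message`, and `log.message` are hypothetical, chosen only to show how simulating an edited processor against sample documents that the saved pipeline already processed can disagree with what happens to newly ingested data.

```python
from typing import Callable

Doc = dict[str, str]
Processor = Callable[[Doc], Doc]


def rename(source: str, target: str) -> Processor:
    """Return a toy processor that moves `source` to `target` when present."""
    def apply(doc: Doc) -> Doc:
        out = dict(doc)  # copy so the input document is left untouched
        if source in out:
            out[target] = out.pop(source)
        return out
    return apply


def run(pipeline: list[Processor], doc: Doc) -> Doc:
    """Apply each processor in order and return the resulting document."""
    for processor in pipeline:
        doc = processor(doc)
    return doc


# A raw document as it arrives, before any processing (hypothetical shape).
raw = {"msg": "disk full"}

# The pipeline that was saved earlier has already run on stored data,
# so a sample taken from the stream no longer contains `msg`.
saved_pipeline = [rename("msg", "message")]
stored_sample = run(saved_pipeline, raw)

# The user now edits the existing processor to use a different target.
edited_pipeline = [rename("msg", "log.message")]

# Simulating against the stored sample: `msg` is gone, so nothing changes.
preview = run(edited_pipeline, stored_sample)

# What happens to new data once the edit is saved.
actual = run(edited_pipeline, raw)

print("preview:", preview)
print("actual: ", actual)
```

In this model the preview still shows `message`, while newly ingested documents would get `log.message`. A processor appended to the end of the pipeline does not have this problem, because it only depends on fields the stored sample already has.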
diff --git a/solutions/observability/streams/management/extract/append.md b/solutions/observability/streams/management/extract/append.md index a8b5bed2ec..2d42f6b230 100644 --- a/solutions/observability/streams/management/extract/append.md +++ b/solutions/observability/streams/management/extract/append.md @@ -24,7 +24,7 @@ To use an append processor: 1. Set **Source Field** to the field you want append values to. 1. Set **Target field** to the values you want to append to the **Source Field**. -This functionality uses the {{es}} [append processor](elasticsearch://reference/enrich-processor/append-processor.md) internally, but you configure it in Streamlang. Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../extract.md#streams-processor-inconsistencies). +This functionality uses the {{es}} [append processor](elasticsearch://reference/enrich-processor/append-processor.md) internally, but you configure it in Streamlang. Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../streamlang.md#streams-processor-inconsistencies). ## YAML reference [streams-append-yaml-reference] diff --git a/solutions/observability/streams/management/extract/convert.md b/solutions/observability/streams/management/extract/convert.md index eb9876aa9b..58d888b57f 100644 --- a/solutions/observability/streams/management/extract/convert.md +++ b/solutions/observability/streams/management/extract/convert.md @@ -29,7 +29,7 @@ To convert a field to a different data type: If you add a **Convert** processor inside a condition group (a **WHERE** block), you must set a **Target field**. :::: -This functionality uses the {{es}} [Convert processor](elasticsearch://reference/enrich-processor/convert-processor.md) internally, but you configure it in Streamlang. 
Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../extract.md#streams-processor-inconsistencies). +This functionality uses the {{es}} [Convert processor](elasticsearch://reference/enrich-processor/convert-processor.md) internally, but you configure it in Streamlang. Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../streamlang.md#streams-processor-inconsistencies). ## YAML reference [streams-convert-yaml-reference] diff --git a/solutions/observability/streams/management/extract/date.md b/solutions/observability/streams/management/extract/date.md index 4b5721527d..a6c55a3c18 100644 --- a/solutions/observability/streams/management/extract/date.md +++ b/solutions/observability/streams/management/extract/date.md @@ -25,7 +25,7 @@ To extract a timestamp field using the date processor: 1. Set the **Source Field** to the field containing the timestamp. 1. Set the **Format** field to one of the accepted date formats (ISO8601, UNIX, UNIX_MS, or TAI64N) or use a Java time pattern. Refer to the [example formats](#streams-date-examples) for more information. -This functionality uses the {{es}} [Date processor](elasticsearch://reference/enrich-processor/date-processor.md) internally, but you configure it in Streamlang. Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../extract.md#streams-processor-inconsistencies). +This functionality uses the {{es}} [Date processor](elasticsearch://reference/enrich-processor/date-processor.md) internally, but you configure it in Streamlang. Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../streamlang.md#streams-processor-inconsistencies). 
## Example formats [streams-date-examples] diff --git a/solutions/observability/streams/management/extract/dissect.md b/solutions/observability/streams/management/extract/dissect.md index 9c3ad333f9..0d167ba93f 100644 --- a/solutions/observability/streams/management/extract/dissect.md +++ b/solutions/observability/streams/management/extract/dissect.md @@ -26,7 +26,7 @@ To parse a log message with a dissect processor: 1. Set the **Source Field** to the field you want to dissect. 1. Set the delimiters you want to use in the **Pattern** field. Refer to the [example pattern](#streams-dissect-example) for more information on setting delimiters. -This functionality uses the {{es}} [Dissect processor](elasticsearch://reference/enrich-processor/dissect-processor.md) internally, but you configure it in Streamlang. Streamlang doesn’t always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../extract.md#streams-processor-inconsistencies). +This functionality uses the {{es}} [Dissect processor](elasticsearch://reference/enrich-processor/dissect-processor.md) internally, but you configure it in Streamlang. Streamlang doesn’t always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../streamlang.md#streams-processor-inconsistencies). ## Example dissect pattern [streams-dissect-example] diff --git a/solutions/observability/streams/management/extract/drop.md b/solutions/observability/streams/management/extract/drop.md index 4a95b2ceb0..4140a09aeb 100644 --- a/solutions/observability/streams/management/extract/drop.md +++ b/solutions/observability/streams/management/extract/drop.md @@ -28,7 +28,7 @@ To configure a condition for dropping documents: The default is the `always` condition. Not setting a specific condition results in every document that matches the drop condition getting dropped from indexing. 
::: -This functionality uses the {{es}} [Drop processor](elasticsearch://reference/enrich-processor/drop-processor.md) internally, but you configure it in Streamlang. Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../extract.md#streams-processor-inconsistencies). +This functionality uses the {{es}} [Drop processor](elasticsearch://reference/enrich-processor/drop-processor.md) internally, but you configure it in Streamlang. Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../streamlang.md#streams-processor-inconsistencies). ## YAML reference [streams-drop-yaml-reference] diff --git a/solutions/observability/streams/management/extract/enrich.md b/solutions/observability/streams/management/extract/enrich.md index 0e04a3f69a..070732cd05 100644 --- a/solutions/observability/streams/management/extract/enrich.md +++ b/solutions/observability/streams/management/extract/enrich.md @@ -27,7 +27,7 @@ To enrich documents: 1. Select an **Enrich policy** from the list of available policies. 1. Set **Target field** to the field where the enriched data is stored. -This functionality uses the {{es}} [Enrich processor](elasticsearch://reference/enrich-processor/enrich-processor.md) internally, but you configure it in Streamlang. Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../extract.md#streams-processor-inconsistencies). +This functionality uses the {{es}} [Enrich processor](elasticsearch://reference/enrich-processor/enrich-processor.md) internally, but you configure it in Streamlang. Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../streamlang.md#streams-processor-inconsistencies). 
## YAML reference [streams-enrich-yaml-reference] diff --git a/solutions/observability/streams/management/extract/grok.md b/solutions/observability/streams/management/extract/grok.md index 93b3963d87..4ef1b18c3f 100644 --- a/solutions/observability/streams/management/extract/grok.md +++ b/solutions/observability/streams/management/extract/grok.md @@ -30,7 +30,7 @@ To parse a log message with a grok processor: 1. Set the **Source Field** to the field you want to search for grok matches. 1. Set the patterns you want to use in the **Grok patterns** field. Refer to the [example pattern](#streams-grok-example) for more information on patterns. -This functionality uses the {{es}} [Grok processor](elasticsearch://reference/enrich-processor/grok-processor.md) internally, but you configure it in Streamlang. Streamlang doesn’t always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../extract.md#streams-processor-inconsistencies). +This functionality uses the {{es}} [Grok processor](elasticsearch://reference/enrich-processor/grok-processor.md) internally, but you configure it in Streamlang. Streamlang doesn’t always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../streamlang.md#streams-processor-inconsistencies). ## Example grok pattern [streams-grok-example] diff --git a/solutions/observability/streams/management/extract/redact.md b/solutions/observability/streams/management/extract/redact.md index 9fdde6e1b2..f3a156a954 100644 --- a/solutions/observability/streams/management/extract/redact.md +++ b/solutions/observability/streams/management/extract/redact.md @@ -25,7 +25,7 @@ To redact sensitive information: 1. Set the **Source Field** to the field containing text you want to redact. 1. Set the **Patterns** to one or more grok patterns that match sensitive data (for example, IP addresses or email addresses). 
-This functionality uses the {{es}} [Redact processor](elasticsearch://reference/enrich-processor/redact-processor.md) internally, but you configure it in Streamlang. Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../extract.md#streams-processor-inconsistencies). +This functionality uses the {{es}} [Redact processor](elasticsearch://reference/enrich-processor/redact-processor.md) internally, but you configure it in Streamlang. Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../streamlang.md#streams-processor-inconsistencies). ## YAML reference [streams-redact-yaml-reference] diff --git a/solutions/observability/streams/management/extract/remove.md b/solutions/observability/streams/management/extract/remove.md index 1c4bae6b9c..6178a4b837 100644 --- a/solutions/observability/streams/management/extract/remove.md +++ b/solutions/observability/streams/management/extract/remove.md @@ -24,7 +24,7 @@ To remove a field: 1. From the **Processor** menu, select **Remove** to remove a field or **Remove by prefix** to remove a field and all its nested fields. 1. Set the **Source Field** to the field you want to remove. -This functionality uses the {{es}} [Remove processor](elasticsearch://reference/enrich-processor/remove-processor.md) internally, but you configure it in Streamlang. Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../extract.md#streams-processor-inconsistencies). +This functionality uses the {{es}} [Remove processor](elasticsearch://reference/enrich-processor/remove-processor.md) internally, but you configure it in Streamlang. Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. 
Refer to [Processor limitations and inconsistencies](../streamlang.md#streams-processor-inconsistencies). ## YAML reference [streams-remove-yaml-reference] diff --git a/solutions/observability/streams/management/extract/rename.md b/solutions/observability/streams/management/extract/rename.md index e4f04ad260..f7578cdb08 100644 --- a/solutions/observability/streams/management/extract/rename.md +++ b/solutions/observability/streams/management/extract/rename.md @@ -24,7 +24,7 @@ To use a rename processor: 1. Set **Source Field** to the field you want to rename. 1. Set **Target field** to the new name you want to use for the **Source Field**. -This functionality uses the {{es}} [Rename processor](elasticsearch://reference/enrich-processor/rename-processor.md) internally, but you configure it in Streamlang. Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../extract.md#streams-processor-inconsistencies). +This functionality uses the {{es}} [Rename processor](elasticsearch://reference/enrich-processor/rename-processor.md) internally, but you configure it in Streamlang. Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../streamlang.md#streams-processor-inconsistencies). ## YAML reference [streams-rename-yaml-reference] diff --git a/solutions/observability/streams/management/extract/replace.md b/solutions/observability/streams/management/extract/replace.md index 1dae045dbf..265d2592fe 100644 --- a/solutions/observability/streams/management/extract/replace.md +++ b/solutions/observability/streams/management/extract/replace.md @@ -26,7 +26,7 @@ To use the **Replace** processor: 1. Set the **Pattern** to the regular expression or text that you want to replace. 1. Set the **Replacement** to the value that will replace the portion of the string matching your pattern. 
Replacements can be text, an empty value, or a capture group reference. -This functionality uses the {{es}} [Gsub processor](elasticsearch://reference/enrich-processor/gsub-processor.md) internally, but you configure it in Streamlang. Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../extract.md#streams-processor-inconsistencies). +This functionality uses the {{es}} [Gsub processor](elasticsearch://reference/enrich-processor/gsub-processor.md) internally, but you configure it in Streamlang. Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../streamlang.md#streams-processor-inconsistencies). ## YAML reference [streams-replace-yaml-reference] diff --git a/solutions/observability/streams/management/extract/set.md b/solutions/observability/streams/management/extract/set.md index 4af24f5bb7..d3de236e4c 100644 --- a/solutions/observability/streams/management/extract/set.md +++ b/solutions/observability/streams/management/extract/set.md @@ -24,7 +24,7 @@ To use a set processor: 1. Set **Source Field** to the field you want to insert, upsert, or update. 1. Set **Value** to the value you want the source field to be set to. -This functionality uses the {{es}} [Set processor](elasticsearch://reference/enrich-processor/set-processor.md) internally, but you configure it in Streamlang. Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../extract.md#streams-processor-inconsistencies). +This functionality uses the {{es}} [Set processor](elasticsearch://reference/enrich-processor/set-processor.md) internally, but you configure it in Streamlang. Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. 
Refer to [Processor limitations and inconsistencies](../streamlang.md#streams-processor-inconsistencies). ## YAML reference [streams-set-yaml-reference] diff --git a/solutions/observability/streams/management/streamlang.md b/solutions/observability/streams/management/streamlang.md index 77f431ef7e..350fee7d30 100644 --- a/solutions/observability/streams/management/streamlang.md +++ b/solutions/observability/streams/management/streamlang.md @@ -78,6 +78,16 @@ The following table lists all available processors. Refer to the individual proc | [`trim`](./extract/trim.md) | {applies_to}`stack: ga 9.4+` Removes leading and trailing whitespace from a string field. | | [`uppercase`](./extract/uppercase.md) | {applies_to}`stack: ga 9.4+` Converts a string field to uppercase. | +### Processor limitations and inconsistencies [streams-processor-inconsistencies] + +Streams exposes a Streamlang configuration, but internally it relies on {{es}} ingest pipeline processors and {{esql}}. Streamlang doesn't always have 1:1 parity with the ingest processors because it needs to support options that work in both ingest pipelines and {{esql}}. In most cases, you won't need to worry about these details, but the underlying design decisions still affect the UI and available configuration options. The following are some limitations and inconsistencies when using Streamlang processors: + +- **Consistently typed fields**: {{esql}} requires one consistent type per column, so workflows that produce mixed types across documents won't transpile. +- **Conversion of types**: {{esql}} and ingest pipelines accept different conversion combinations and strictness (especially for strings), so `convert` can behave differently across targets. +- **Multi-value commands/functions**: Fields can contain one or multiple values. {{esql}} and ingest processors don't always handle these cases the same way. For example, grok in {{esql}} handles multiple values automatically, while the grok processor does not. 
+- **Conditional execution**: {{esql}}'s enforced table shape limits conditional casting, parsing, and wildcard field operations that ingest pipelines can perform per document. +- **Arrays of objects / flattening**: Ingest pipelines preserve nested JSON arrays, while {{esql}} flattens them into columns, so operations like rename and delete on parent objects can behave differently or fail. ## Conditions [streams-streamlang-conditions] Conditions are Boolean expressions that control when processors run and how wired streams route data into partitions. They appear in `where` clauses on processors, in [condition blocks](#streams-streamlang-condition-blocks), and in stream [partitioning](#streams-streamlang-partition-conditions).