Transform events with SQL
You can use the SQL transform to process data with SQL statements. With support for multiple statement types and complex data flows, the SQL transform gives you a powerful and flexible way to process, filter, and enrich data using a familiar SQL syntax.
The SQL transform complies with the ANSI SQL standard and provides the full capabilities of Apache Flink SQL 1.20, including a comprehensive set of built-in functions and operators for data processing. This document describes a subset of the available capabilities. To learn more about Flink SQL, see SQL in the Flink 1.20 documentation.
To learn how to use the SQL transform in the Grepr UI or the REST API, see How do I use the SQL transform?.
When should I use the SQL transform?
The SQL transform is ideal when you need to perform complex data transformations, filtering, or enrichment using a declarative SQL syntax, including:
- Data normalization and standardization
  - Message format standardization: Converting various log formats to a standard structure.
  - Timestamp normalization: Converting different timestamp formats to standard milliseconds.
  - Field standardization: Ensuring consistent field names and types across services.
- Log enhancement and enrichment
  - Computed metrics: Adding calculated fields like processing delays and error rates.
  - Categorization: Adding severity levels, priority flags, or classification labels.
  - Contextual data: Extracting and promoting key information from log messages.
- Security and compliance
  - Data redaction: Removing or masking sensitive information (passwords, tokens, PII), as shown in the sketch after this list.
  - Audit trail enhancement: Adding tracking fields for compliance requirements.
  - Anomaly flagging: Marking unusual patterns or suspicious activities.
- Performance and monitoring
  - SLA monitoring: Extracting response times and performance metrics.
  - Error analysis: Categorizing and enriching error logs for better debugging.
  - Trend analysis: Preparing log data for time-series analysis and alerting.
- Data pipeline optimization
  - Selective processing: Filtering logs to reduce downstream processing load.
  - Format conversion: Converting logs for compatibility with downstream systems.
  - Batch preparation: Aggregating and preparing data for batch analytics.
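For instance, the following is a minimal sketch of a redaction-plus-categorization query. It assumes the input stream is exposed as a table named `logs` with `message` and `severity` columns; the token pattern and severity thresholds are illustrative:

```sql
SELECT
  -- Mask bearer tokens before the event leaves the pipeline (pattern is illustrative)
  REGEXP_REPLACE(message, 'Bearer [A-Za-z0-9._-]+', 'Bearer [REDACTED]') AS message,
  severity,
  -- Add a classification label based on the numeric severity (thresholds are illustrative)
  CASE
    WHEN severity >= 17 THEN 'critical'
    WHEN severity >= 13 THEN 'error'
    ELSE 'info'
  END AS severity_label
FROM logs
```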
How does the SQL transform process data?
The SQL transform processes data through one or more SQL statements. You specify SQL statements that define the data structure after processing, and the SQL transform processes the input data to transform it into those structures. You can configure the transform to send the results of running the SQL statement to either:
- A `view`, which is a temporary table for intermediate results. Subsequent statements can then reference the `view` during their processing.
- An `output` object, which converts the results of a view statement into a Flink DataStream object that can be consumed by downstream statements. Each `output` must have a unique name assigned to it.
`view` and `output` statements are defined as part of a configuration that also includes details such as:
- Any `views` created by upstream statements that contain input data for the statement.
- Tables to reference during processing that are part of a data lake dataset.
When the SQL transform runs, it validates that:
- All `views` created by the transform use unique names.
- At least one `output` object exists to produce results, and all `output` names in the transform are unique.
How do I use the SQL transform?
You use SQL transforms by adding them to your Grepr pipelines. To add a SQL transform to a pipeline in the Grepr UI, you configure the transform when you configure a pipeline filter. See Filter events in a Grepr pipeline.
To add a transform in the REST API, you define the SQL transform as part of the pipeline’s job graph using a `SqlOperation`. The `SqlOperation` has the following schema:
```json
{
  "name": "<operation_name>",
  "type": "sql-operation",
  "inputs": {
    "<input-tablename-1>": "<LOG_EVENT|VARIANT|COMPLETE_SPAN>",
    "<input-tablename-2>": "<LOG_EVENT|VARIANT|COMPLETE_SPAN>",
    "...": "..."
  },
  "statements": [
    {
      "type": "sql_view",
      "tableName": "<output-object-name>",
      "sqlQuery": "<sql-statement>",
      "shouldMaterialize": boolean
    },
    {
      "type": "sql_output",
      "outputName": "<output-stream-name>",
      "outputType": "<LOG_EVENT|VARIANT|COMPLETE_SPAN>",
      "sqlQuery": "<sql-statement>"
    }
  ],
  "availableDatasets": [
    "<dataset-identifier-1>",
    "<dataset-identifier-2>",
    "..."
  ],
  "globalStateTtl": "<time-to-live-duration>",
  "watermarkDelay": "<watermark-delay-duration>"
}
```

| Field | Type | Description |
|---|---|---|
| `name` | string | A name identifying the SQL operation. This field is required. |
| `type` | string | The type of operation. For the SQL transform, this value is always `sql-operation`. This field is required. |
| `inputs` | object | Key-value pairs where the key is an input table name and the value is the data type. Each input table corresponds to an input stream that is connected to this operation in the job graph, and the type specifies the input’s schema. Each input appears as a table that can be queried in SQL statements. |
| `statements` | array | One or more SQL statements to run. Multiple statements run in the same order that you define them. Statements are either view statements or output statements. This field is required. See Statement types. |
| `availableDatasets` | array | Identifiers for datasets containing tables that can be used by the statements in this transform configuration. |
| `globalStateTtl` | string | The global state time-to-live. When running stateful streaming operations, any state that is not used within this time is dropped to reduce memory pressure. |
| `shouldMaterialize` | boolean | Whether to materialize a query’s result set. Use this option to force downstream operations to reuse query results instead of re-running the query. |
To learn about the SQL transform input and output types, see SQL transform supported data types.
The following example of a SQL transform configuration creates a view named `error_logs` that filters for input log events with a severity of 13 or higher. It then creates an output stream named `critical_errors` that selects log events from the `error_logs` view where the message contains the string “CRITICAL”:
```json
{
  "name": "example_sql_operation",
  "type": "sql-operation",
  "inputs": {
    "logs": "LOG_EVENT"
  },
  "statements": [
    {
      "type": "sql_view",
      "tableName": "error_logs",
      "sqlQuery": "SELECT * FROM logs WHERE severity >= 13",
      "shouldMaterialize": false
    },
    {
      "type": "sql_output",
      "outputName": "critical_errors",
      "outputType": "LOG_EVENT",
      "sqlQuery": "SELECT * FROM error_logs WHERE message LIKE '%CRITICAL%'"
    }
  ],
  "availableDatasets": [],
  "globalStateTtl": "PT2M",
  "watermarkDelay": "PT30S"
}
```

Performance considerations
The following are best practices to optimize the performance of your queries when using the SQL transform.
Query optimization
- Use `WHERE` clauses early to filter data and reduce the resources required by complex queries.
- Avoid `SELECT *` when possible. Instead, explicitly list the columns required for your processing.
- Because they are CPU-intensive, limit the use of regular expressions on high-volume streams.
The following is an example of an inefficient query. This query applies a regular expression to all log messages, even when the message doesn’t contain relevant content:
```sql
SELECT *,
       REGEXP_EXTRACT(message, '(\d{4}-\d{2}-\d{2})', 1) AS date_extracted
FROM logs
```

A better approach is to filter the logs first using a `WHERE` clause, and then apply the regular expression to the filtered logs:
```sql
SELECT *, -- Only processes logs that likely contain dates
       -- Regex only runs on pre-filtered logs (much more efficient)
       REGEXP_EXTRACT(message, '(\d{4}-\d{2}-\d{2})', 1) AS date_extracted
FROM logs
WHERE message LIKE '%-%-%' -- Quick pre-filter using simple pattern matching
```

Removing the `SELECT *` and explicitly listing only the required fields can further improve performance for this query, as sketched below.
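A minimal sketch of that version, assuming downstream processing only needs the `severity` and `message` fields (adjust the column list to your schema):

```sql
SELECT severity,
       message,
       REGEXP_EXTRACT(message, '(\d{4}-\d{2}-\d{2})', 1) AS date_extracted
FROM logs
WHERE message LIKE '%-%-%' -- Quick pre-filter using simple pattern matching
```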
Memory and state management
- Computed fields increase the size of events and might impact downstream processing.
- Complex aggregations require more memory. If necessary, lower time-to-live values to reduce memory usage.
- Because window operations buffer data, memory requirements increase with larger windows.
- Because JSON operations allocate temporary objects, you should avoid using them in tight loops.
Resource usage patterns
When you write SQL queries, consider the typical resource usage for common operations:
- String operations, such as UPPER, LOWER, and CONCAT, have low CPU overhead.
- Regular expression operations, such as REGEXP_REPLACE and REGEXP_EXTRACT, have high CPU overhead.
- JSON operations, such as JSON_VALUE and JSON_QUERY, have moderate CPU overhead (see the sketch after this list).
- Mathematical operations have low CPU overhead.
- Window aggregations have high memory usage during window buffering.
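For example, when the payload is already JSON, a single `JSON_VALUE` lookup is usually cheaper than a regular expression. The following sketch assumes `attributes` holds a JSON-formatted string and that a `user.id` key exists; both are illustrative:

```sql
SELECT message,
       -- Extract one key from a JSON string instead of scanning it with a regex
       JSON_VALUE(attributes, '$.user.id') AS user_id
FROM logs
```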
Error handling
The SQL transform fails if:
- The SQL syntax is invalid. Malformed SQL queries are rejected during parsing.
- Field types are incompatible. For example, the standard fields in a log event must match their expected types when you include them in a query.
- Runtime errors occur when running a query, such as division by zero or an invalid regular expression (see the sketch after this list).
- JSON parsing errors occur, such as running invalid JSON operations on the attributes field.
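Where a computed field can trigger one of these runtime errors, you can often guard the expression in the query itself. The following sketch avoids a division-by-zero failure by turning a zero denominator into `NULL` with `NULLIF`; the `error_count` and `request_count` fields are illustrative:

```sql
SELECT message,
       -- NULLIF returns NULL when request_count is 0, so the division
       -- yields NULL instead of failing at runtime
       error_count / NULLIF(request_count, 0) AS error_rate
FROM logs
```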
Limitations
Streaming vs batch processing
The SQL transform processes both streaming and batch data. Compared to batch processing, streaming has the following limitations:
- `JOIN` operations are limited (see the sketch after this list).
- Complex aggregations or joins might require state management. Your pipeline can fail if the amount of state used for aggregations or joins becomes too large. You can manage the amount of state using the time-to-live configuration setting (`globalStateTtl`) in the SQL transform configuration. By default, this is set to two minutes.
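For example, one way to keep join state bounded in streaming mode is an interval join, which only retains rows within a time range. The following is a sketch under the assumption that the transform has two inputs named `requests` and `responses`, each exposing an event-time column; all names are illustrative:

```sql
SELECT r.request_id,
       r.message AS request_message,
       s.message AS response_message
FROM requests r, responses s
WHERE r.request_id = s.request_id
  -- Bound the join in time so old state can be expired
  AND s.event_time BETWEEN r.event_time AND r.event_time + INTERVAL '5' MINUTE
```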
Windowing limitations:
- Window functions require using windowing table-valued functions (TVFs), such as `TUMBLE`, `HOP`, `SESSION`, or `CUMULATE`. See the sketch after this list.
- You cannot use unbounded aggregations. Instead, use windowing for aggregations in streaming mode.
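For example, the following is a sketch of a one-minute tumbling-window count using the `TUMBLE` TVF. It assumes the input table exposes an event-time attribute column, named `event_time` here for illustration:

```sql
SELECT window_start,
       window_end,
       COUNT(*) AS event_count
FROM TABLE(
  TUMBLE(TABLE logs, DESCRIPTOR(event_time), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end
```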
Data ordering:
- Events might arrive out of order. Consider using watermarks for time-based operations.
- `ROW_NUMBER()` and ranking functions might not guarantee consistent ordering across restarts.
Next steps
- To learn about using the SQL transform in the Grepr pipeline UI, see Filter events in a Grepr pipeline.
- To learn about supported data types for the SQL transform, see SQL transform supported data types.
- To see examples of using the SQL transform, see SQL transform examples.
- To learn about supported SQL functions and operators, see SQL transform functions and operators.