SQL Transform
The SQL Transform operator allows you to transform log events using SQL queries. This operator provides a powerful and flexible way to process, filter, and enrich log data using the familiar SQL syntax.
The SQL Transform uses ANSI SQL dialect with the full capabilities of Apache Flink SQL 1.19, providing access to a comprehensive set of built-in functions and operators for log processing. This document describes a subset of the available capabilities. For full details, please see the Flink SQL documentation.
Overview
The SQL Transform operator:
- Processes LogEvent objects through SQL queries
- Uses the input table alias logs to reference incoming log data
- Automatically preserves omitted LogEvent fields from input
- Adds computed columns to the event attributes
- Supports complex transformations, filtering, and enrichment
Input Table Schema
When using SQL Transform, your input log events are available through a table called logs
with the following schema:
Column | Type | Description |
---|---|---|
id | STRING | Globally unique identifier for the log event |
eventtimestamp | TIMESTAMP_LTZ(3) | Milliseconds since the Unix epoch when the event occurred (parsed from the event) |
receivedtimestamp | TIMESTAMP_LTZ(3) | Milliseconds since the Unix epoch when Grepr received the event |
message | STRING | The log message content |
severity | INT | Log severity level (1-24, following OpenTelemetry convention) |
tags | MAP<STRING, ARRAY<STRING>> | Key-value pairs for filtering and routing |
attributes | STRING | JSON string containing structured data associated with the event |
Important Notes:
- To access service or host information, query the tags map: tags['service'] or tags['host']
- When transforming fields like message or severity, the transformed column must appear before the * wildcard in your SELECT clause to take precedence, as shown in the sketch below
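For example, the following sketch (the 'api' service value is illustrative) reads the service tag and overrides the message field by listing the transformed column before the wildcard:
SELECT UPPER(message) as message, * -- transformed message listed before * so it takes precedence
FROM logs
WHERE tags['service'][1] = 'api' -- read the first value of the service tag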
Basic Usage
Identity Query
SELECT * FROM logs
Returns all log events unchanged.
View Example Input/Output
Input:
{
"id": "0H19GZK97FTKS",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 9,
"message": "State backend is set to heap memory",
"tags": {
"service": ["grepr-query"],
"host": ["ip-10-12-4-129.ec2.internal"]
},
"attributes": {
"process": {
"thread": {
"name": "thread-0"
}
}
}
}
Output:
{
"id": "0H19GZK97FTKS",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 9,
"message": "State backend is set to heap memory",
"tags": {
"service": ["grepr-query"],
"host": ["ip-10-12-4-129.ec2.internal"]
},
"attributes": {
"process": {
"thread": {
"name": "thread-0"
}
}
}
}
Simple Filtering
SELECT * FROM logs WHERE severity < 5
Filters events to only include those with severity less than 5 (TRACE level).
View Example Input/Output
Input:
[
{
"id": "evt1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 1,
"message": "Debug trace message",
"tags": {"service": ["api"]},
"attributes": {}
},
{
"id": "evt2",
"eventtimestamp": 1724214074063,
"receivedtimestamp": 1724214074189,
"severity": 9,
"message": "Info level message",
"tags": {"service": ["api"]},
"attributes": {}
}
]
Output:
[
{
"id": "evt1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 1,
"message": "Debug trace message",
"tags": {"service": ["api"]},
"attributes": {}
}
]
Note: The INFO level message (severity 9) was filtered out.
Message Transformation
SELECT UPPER(message) as message, * FROM logs
Converts all log messages to uppercase.
Important: When transforming existing fields, the transformed column must come before the *
wildcard to override the original value.
Adding Computed Fields
SELECT *,
UPPER(message) as upper_message,
CHAR_LENGTH(message) as message_length,
severity * 10 as severity_scaled
FROM logs
Adds three new computed fields to the event attributes.
View Example Input/Output
Input:
{
"id": "evt1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 9,
"message": "Processing request",
"tags": {"service": ["api"]},
"attributes": {
"userId": "12345"
}
}
Output:
{
"id": "evt1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 9,
"message": "Processing request",
"tags": {"service": ["api"]},
"attributes": {
"userId": "12345",
"upper_message": "PROCESSING REQUEST",
"message_length": 18,
"severity_scaled": 90
}
}
Note: Computed fields are added to the attributes object.
Custom LogEvent Construction
Grepr automatically adds default values for fields that are not specified. The defaults are:
- id: when not specified or NULL, defaults to a new random UUID.
- eventtimestamp: when not specified or NULL, defaults to the current timestamp in milliseconds.
- receivedtimestamp: when not specified or NULL, defaults to the current timestamp in milliseconds.
- severity: when not specified or NULL, defaults to 9 (INFO level).
- message: when not specified or NULL, defaults to an empty string.
- tags: when not specified or NULL, defaults to an empty map.
- attributes: when not specified or NULL, defaults to an empty JSON object.
-- Explicitly construct a new LogEvent with specific fields.
SELECT
UPPER(message) as message,
CASE WHEN severity > 16 THEN 21 ELSE severity END as severity,
MAP['transformed', ARRAY['true']] as tags,
JSON_OBJECT('original_message' VALUE message) as attributes
FROM logs
View Example Input/Output
Input:
{
"id": "evt1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 17,
"message": "user login successful",
"tags": {"service": ["auth"]},
"attributes": {}
}
Output:
{
"id": "12345678-1234-5678-1234-567812345678", // New UUID generated
"eventtimestamp": 174000000000, // Current timestamp in milliseconds
"receivedtimestamp": 174000000002, // Current timestamp in milliseconds
"severity": 21, // Adjusted severity
"message": "USER LOGIN SUCCESSFUL", // Transformed message
"tags": {"transformed": ["true"]}, // New tags
"attributes": { // New attributes
"original_message": "user login successful"
}
}
Advanced Examples
Content-Based Filtering and Enrichment
SELECT *, -- Select all original fields
-- Add a category based on message content
CASE
WHEN message LIKE '%ERROR%' THEN 'error_detected' -- If message contains "ERROR"
WHEN message LIKE '%WARN%' THEN 'warning_detected' -- If message contains "WARN"
ELSE 'normal' -- For all other messages
END as log_category,
-- Replace any IP-like address with placeholder text
REGEXP_REPLACE(message, '\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', '<redacted>') as sanitized_message
FROM logs
WHERE severity >= 13 -- Only process WARN level and above (13-24)
View Example Input/Output
Input:
[
{
"id": "evt1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 17,
"message": "ERROR: Database connection to ip 192.1.23.32 failed",
"tags": {"service": ["database"]},
"attributes": {}
},
{
"id": "evt2",
"eventtimestamp": 1724214074063,
"receivedtimestamp": 1724214074189,
"severity": 9,
"message": "INFO: Processing complete",
"tags": {"service": ["api"]},
"attributes": {}
}
]
Output:
[
{
"id": "evt1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 17,
"message": "ERROR: Database connection to ip 192.1.23.32 failed",
"tags": {"service": ["database"]},
"attributes": {
"log_category": "error_detected",
"sanitized_message": "ERROR: Database connection to ip redacted failed",
}
}
]
Note: INFO level message was filtered out due to severity < 13.
JSON Attribute Processing
SELECT *, -- Select all original fields
-- Extract thread name from nested JSON structure
JSON_VALUE(attributes, '$.process.thread.name') as thread_name,
-- Extract status field from JSON attributes
JSON_VALUE(attributes, '$.status') as status_code
FROM logs
-- Only process logs that have a 'process' field in their JSON attributes
WHERE JSON_EXISTS(attributes, '$.process')
View Example Input/Output
Input:
[
{
"id": "evt1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 9,
"message": "Request processed",
"tags": {"service": ["api"]},
"attributes": {
"process": {
"thread": {
"name": "worker-1"
}
},
"status": "success"
    }
  },
{
"id": "evt2",
"eventtimestamp": 1724214074063,
"receivedtimestamp": 1724214074189,
"severity": 9,
"message": "Simple log message",
"tags": {"service": ["web"]},
"attributes": {
"simple": true
}
}
]
Output:
[
{
"id": "evt1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 9,
"message": "Request processed",
"tags": {"service": ["api"]},
"attributes": {
"process": {"thread": {"name": "worker-1"}},
"status": "success",
"thread_name": "worker-1",
"status_code": "success"
}
}
]
Note: Event without 'process' field was filtered out.
Time-Based Analysis
SELECT *, -- Select all original fields
-- Extract the hour (0-23) from the event timestamp
EXTRACT(HOUR FROM eventtimestamp) as event_hour,
-- Extract the day of the month (1-31) from the event timestamp
EXTRACT(DAY FROM eventtimestamp) as event_day
FROM logs
View Example Input/Output
Input:
{
"id": "evt1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 9,
"message": "Processing request",
"tags": {"service": ["api"]},
"attributes": {}
}
Output:
{
"id": "evt1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 9,
"message": "Processing request",
"tags": {"service": ["api"]},
"attributes": {
"event_hour": 4,
"event_day": 21
}
}
Note: Assumes event occurred on August 21, 2024 at 04:21:14 UTC.
Function Compatibility
The SQL Transform uses Apache Flink SQL 1.19 with ANSI SQL dialect. Here are important function compatibility notes:
Timestamp Functions
- Use: TO_TIMESTAMP_LTZ(eventtimestamp, 3) to convert millisecond timestamps to TIMESTAMP (sketched below)
- Don't use: TIMESTAMP_MILLIS() (not available in Flink SQL 1.19 - use TO_TIMESTAMP_LTZ() instead)
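For example, a millisecond timestamp stored in the attributes (the startedAt key here is hypothetical) can be converted like so:
SELECT *,
       TO_TIMESTAMP_LTZ(JSON_VALUE(attributes, '$.startedAt' RETURNING BIGINT), 3) as started_at -- 3 = millisecond precision
FROM logs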
String Functions
- Available: REGEXP_REPLACE(), UPPER(), LOWER(), CONCAT(), CHAR_LENGTH(), LEFT(), RIGHT(), etc.
- Pattern: Most standard SQL string functions are supported
- Note: SPLIT_INDEX() is not available in Flink SQL 1.19; a possible workaround is sketched below
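As a sketch of the workaround mentioned above, REGEXP_EXTRACT() can stand in for SPLIT_INDEX() when you need a delimited piece of the message (the colon delimiter is only an assumption about your log format):
SELECT *,
       REGEXP_EXTRACT(message, '^([^:]+):', 1) as message_prefix -- text before the first colon, if present
FROM logs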
Date/Time Functions
- Available: EXTRACT(), CURRENT_TIMESTAMP, DATE_FORMAT(), etc.
- Note: Always convert millisecond timestamps first using TO_TIMESTAMP_LTZ(), as in the sketch below
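For example, assuming a timestamp attribute that holds epoch milliseconds, you can convert it and then format it as a readable string:
SELECT *,
       DATE_FORMAT(TO_TIMESTAMP_LTZ(JSON_VALUE(attributes, '$.timestamp' RETURNING BIGINT), 3), 'yyyy-MM-dd HH:mm:ss') as formatted_time
FROM logs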
JSON Functions
- Available: JSON_VALUE(), JSON_EXISTS() for extracting data from JSON strings
- Note: The attributes field is a JSON string that can be processed with these functions, as in the sketch below
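A minimal sketch (the $.http.status path is hypothetical) that promotes a nested attribute to a computed field only when it exists:
SELECT *,
       JSON_VALUE(attributes, '$.http.status' RETURNING INTEGER) as http_status
FROM logs
WHERE JSON_EXISTS(attributes, '$.http.status')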
For a complete list of available functions, see the Flink SQL documentation.
Multi-Field Transformations
SELECT
-- Explicitly select these core fields
id,
eventtimestamp,
receivedtimestamp,
tags,
attributes,
-- Transform the message by adding a suffix
CONCAT(message, ' [PROCESSED]') as message,
-- Increase severity by 1 (e.g., INFO becomes WARN)
severity + 1 as severity,
-- Store the original message in uppercase as a computed field
UPPER(message) as original_uppercase,
-- Store the original severity scaled by 100 as a computed field
severity * 100 as original_severity_scaled
FROM logs
View Example Input/Output
Input:
{
"id": "evt1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 9,
"message": "user authenticated",
"tags": {"service": ["auth"]},
"attributes": {}
}
Output:
{
"id": "evt1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 10,
"message": "user authenticated [PROCESSED]",
"tags": {"service": ["auth"]},
"attributes": {
"original_uppercase": "USER AUTHENTICATED",
"original_severity_scaled": 900
}
}
Note: Core fields (message, severity) are modified, computed fields added to attributes.
SQL Operators
The SQL Transform supports all standard SQL operators available in Flink SQL 1.19:
Comparison Operators
- = - Equal to
- <> or != - Not equal to
- < - Less than
- <= - Less than or equal to
- > - Greater than
- >= - Greater than or equal to
- BETWEEN value1 AND value2 - Range comparison
- NOT BETWEEN value1 AND value2 - Negated range comparison
Examples:
SELECT * FROM logs WHERE severity = 17
SELECT * FROM logs WHERE severity BETWEEN 13 AND 20
SELECT * FROM logs WHERE eventtimestamp > 1724214074000
Logical Operators
- AND - Logical AND
- OR - Logical OR
- NOT - Logical NOT
Examples:
SELECT * FROM logs WHERE severity > 16 AND message LIKE '%error%'
SELECT * FROM logs WHERE severity = 17 OR severity = 21
SELECT * FROM logs WHERE NOT (severity < 5)
Pattern Matching Operators
- LIKE 'pattern' - SQL pattern matching with % (any chars) and _ (single char)
- NOT LIKE 'pattern' - Negated pattern matching
- SIMILAR TO 'regex' - Regular expression matching
- NOT SIMILAR TO 'regex' - Negated regex matching
Examples:
SELECT * FROM logs WHERE message LIKE '%failed%'
SELECT * FROM logs WHERE message LIKE 'Error: ___' -- exactly 3 chars after "Error: "
SELECT * FROM logs WHERE message SIMILAR TO '^[A-Z]+:' -- starts with uppercase letters and colon
NULL Handling Operators
- IS NULL - Check for NULL values
- IS NOT NULL - Check for non-NULL values
Examples:
SELECT * FROM logs WHERE attributes IS NOT NULL
SELECT * FROM logs WHERE JSON_VALUE(attributes, '$.userId') IS NULL
Arithmetic Operators
- + - Addition
- - - Subtraction
- * - Multiplication
- / - Division
- % or MOD() - Modulo
Examples:
SELECT *, severity * 10 as severity_scaled FROM logs
Available SQL Functions
The SQL Transform supports the comprehensive function library of Flink SQL. Below is a sample of functions. Please refer to the Flink docs for a full listing and additional details:
String Functions
Case Manipulation
- UPPER(string) - Converts string to uppercase
  SELECT UPPER(message) as upper_message FROM logs -- Input: "Error occurred" → Output: "ERROR OCCURRED"
- LOWER(string) - Converts string to lowercase
  SELECT LOWER(message) as lower_message FROM logs -- Input: "Error OCCURRED" → Output: "error occurred"
- INITCAP(string) - Capitalizes first letter of each word
  SELECT INITCAP(message) as title_case FROM logs -- Input: "user login failed" → Output: "User Login Failed"
String Extraction and Manipulation
- SUBSTR(string, start [, length]) - Extracts substring
  SELECT SUBSTR(message, 1, 10) as msg_prefix FROM logs
  SELECT SUBSTR(message, 5) as msg_suffix FROM logs -- from position 5 to end
- CONCAT(string1, string2, ...) - Concatenates strings
  SELECT CONCAT('[', severity, '] ', message) as formatted_message FROM logs
- CONCAT_WS(separator, string1, string2, ...) - Concatenates with separator
  SELECT CONCAT_WS(' | ', tags['host'][1], tags['service'][1], message) as combined FROM logs
- TRIM([BOTH | LEADING | TRAILING] [characters] FROM string) - Removes specified characters
  SELECT TRIM(message) as clean_message FROM logs -- removes whitespace
  SELECT TRIM(LEADING '0' FROM id) as clean_id FROM logs -- removes leading zeros
- REPLACE(string, search, replacement) - Replaces all occurrences
  SELECT REPLACE(message, 'ERROR', 'ALERT') as modified_message FROM logs
- REGEXP_REPLACE(string, pattern, replacement) - Regex-based replacement
  SELECT REGEXP_REPLACE(message, '\\d{4}-\\d{2}-\\d{2}', 'YYYY-MM-DD') as sanitized FROM logs
  SELECT REGEXP_REPLACE(message, 'user=[^\\s]+', 'user=REDACTED') as redacted FROM logs
String Information Functions
- CHAR_LENGTH(string) or LENGTH(string) - Returns character count
  SELECT CHAR_LENGTH(message) as msg_length FROM logs
  SELECT LENGTH(message) as msg_length FROM logs -- Alternative syntax
- POSITION(substring IN string) - Finds substring position (1-based)
  SELECT POSITION('error' IN LOWER(message)) as error_pos FROM logs
- ASCII(string) - Returns ASCII value of first character
  SELECT ASCII(LEFT(message, 1)) as first_char_code FROM logs
Advanced String Functions
- LPAD(string, length, pad_string) - Left-pads string
  SELECT LPAD(CAST(severity AS STRING), 3, '0') as padded_severity FROM logs -- Input: "9" → Output: "009"
- RPAD(string, length, pad_string) - Right-pads string
  SELECT RPAD(tags['service'][1], 10, '-') as padded_service FROM logs
- LEFT(string, length) - Returns leftmost characters
  SELECT LEFT(message, 10) as msg_prefix FROM logs
- RIGHT(string, length) - Returns rightmost characters
  SELECT RIGHT(message, 10) as msg_suffix FROM logs
- REGEXP_EXTRACT(string, pattern [, group]) - Extracts regex match
  SELECT REGEXP_EXTRACT(message, 'response_time=(\\d+)ms', 1) as response_time FROM logs
  SELECT REGEXP_EXTRACT(message, 'user=([^\\s]+)', 1) as username FROM logs
Numeric Functions
Basic Mathematical Functions
- ABS(numeric) - Returns absolute value
  SELECT ABS(severity - 12) as severity_diff FROM logs
- MOD(numeric1, numeric2) - Returns remainder
  SELECT MOD(severity, 4) as severity_mod FROM logs
- POWER(base, exponent) - Returns base raised to exponent
  SELECT POWER(severity, 2) as severity_squared FROM logs
- SQRT(numeric) - Returns square root
  SELECT SQRT(severity) as severity_sqrt FROM logs
- SIGN(numeric) - Returns sign (-1, 0, 1)
  SELECT SIGN(receivedtimestamp - eventtimestamp) as delay_sign FROM logs
Rounding Functions
- ROUND(numeric [, precision]) - Rounds to specified decimal places
  SELECT ROUND(severity / 3.0, 2) as severity_ratio FROM logs
- FLOOR(numeric) - Rounds down to nearest integer
  SELECT FLOOR(severity / 4.0) as severity_group FROM logs
- CEIL(numeric) or CEILING(numeric) - Rounds up to nearest integer
  SELECT CEIL(severity / 4.0) as severity_group_up FROM logs
  SELECT CEILING(severity / 4.0) as severity_group_up FROM logs -- Alternative syntax
Logarithmic and Exponential Functions
- LN(numeric) or LOG(numeric) - Natural logarithm
- LOG10(numeric) - Base-10 logarithm
- LOG2(numeric) - Base-2 logarithm
- EXP(numeric) - Exponential function (e^x)
- LOG(base, numeric) - Logarithm with specified base
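For example, assuming a responseTime attribute holding a positive number of milliseconds, a log-scaled version could be computed as:
SELECT *,
       LOG10(CAST(JSON_VALUE(attributes, '$.responseTime' RETURNING INTEGER) AS DOUBLE)) as response_time_log10
FROM logs
WHERE JSON_VALUE(attributes, '$.responseTime' RETURNING INTEGER) > 0 -- LOG10 needs a positive input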
Trigonometric Functions
- SIN(numeric), COS(numeric), TAN(numeric) - Trigonometric functions
- ASIN(numeric), ACOS(numeric), ATAN(numeric) - Inverse trigonometric functions
- DEGREES(radians), RADIANS(degrees) - Angle conversion
Date/Time Functions
Current Time Functions
- CURRENT_TIMESTAMP - Returns current timestamp in local timezone
- LOCALTIME - Returns current SQL time
- CURRENT_DATE - Returns current SQL date
- UNIX_TIMESTAMP() - Returns current Unix timestamp in seconds
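For example, you can stamp each event with the time the transform processed it (processed_at_seconds is just an illustrative attribute name):
SELECT *,
       UNIX_TIMESTAMP() as processed_at_seconds -- current Unix time in seconds, stored in attributes
FROM logs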
Time Extraction
- EXTRACT(unit FROM temporal) - Extracts a time component. Available units: MILLENNIUM, CENTURY, DECADE, YEAR, QUARTER, MONTH, WEEK, DAY, HOUR, MINUTE, SECOND
  SELECT EXTRACT(HOUR FROM eventtimestamp) as event_hour FROM logs
  SELECT EXTRACT(DAY FROM eventtimestamp) as event_day FROM logs
  SELECT EXTRACT(YEAR FROM eventtimestamp) as event_year FROM logs
Time Conversion Functions
- TO_TIMESTAMP_LTZ(numeric, precision) - Converts epoch timestamp to TIMESTAMP_LTZ
  SELECT TO_TIMESTAMP_LTZ(JSON_VALUE(attributes, '$.timestamp' RETURNING BIGINT), 3) as event_time FROM logs -- 3 for milliseconds
- UNIX_TIMESTAMP(timestamp) - Converts timestamp to Unix timestamp (seconds)
  SELECT UNIX_TIMESTAMP(eventtimestamp) as unix_time FROM logs
- TO_TIMESTAMP(string [, format]) - Parses string to timestamp
  SELECT TO_TIMESTAMP(JSON_VALUE(attributes, '$.timestamp'), 'yyyy-MM-dd HH:mm:ss') as parsed_time FROM logs
Conditional Functions
CASE Expressions
- CASE WHEN condition THEN result [WHEN condition THEN result] ELSE result END
  SELECT *, CASE WHEN severity <= 4 THEN 'TRACE' WHEN severity <= 8 THEN 'DEBUG' WHEN severity <= 12 THEN 'INFO' WHEN severity <= 16 THEN 'WARN' WHEN severity <= 20 THEN 'ERROR' ELSE 'FATAL' END as severity_level FROM logs
- CASE value WHEN value1 THEN result1 WHEN value2 THEN result2 ELSE default END
  SELECT *, CASE severity WHEN 1 THEN 'TRACE' WHEN 9 THEN 'INFO' WHEN 17 THEN 'ERROR' ELSE 'OTHER' END as severity_name FROM logs
NULL Handling Functions
- COALESCE(value1, value2, ...) - Returns first non-NULL value
  SELECT COALESCE(JSON_VALUE(attributes, '$.userId'), 'unknown') as user_id FROM logs
- NULLIF(value1, value2) - Returns NULL if values are equal
  SELECT NULLIF(severity, 0) as non_zero_severity FROM logs
- IF(condition, true_value, false_value) - Simple conditional
  SELECT IF(severity > 16, 'HIGH', 'NORMAL') as priority FROM logs
Comparison Functions
- GREATEST(value1, value2, ...) - Returns maximum value
  SELECT GREATEST(severity, 10) as min_severity FROM logs
- LEAST(value1, value2, ...) - Returns minimum value
  SELECT LEAST(severity, 20) as max_severity FROM logs
JSON Functions
JSON Validation
- string IS JSON [VALUE | SCALAR | ARRAY | OBJECT] - Validates JSON format
  SELECT * FROM logs WHERE attributes IS JSON OBJECT
JSON Path Functions
- JSON_EXISTS(json_string, path) - Checks if JSON path exists
  SELECT * FROM logs WHERE JSON_EXISTS(attributes, '$.process.thread')
- JSON_VALUE(json_string, path [RETURNING type] [ON ERROR | ON EMPTY]) - Extracts scalar value
  SELECT JSON_VALUE(attributes, '$.userId' RETURNING STRING) as user_id FROM logs
  SELECT JSON_VALUE(attributes, '$.responseTime' RETURNING INTEGER) as response_time FROM logs
  SELECT JSON_VALUE(attributes, '$.isSuccess' RETURNING BOOLEAN) as success FROM logs
- JSON_QUERY(json_string, path [RETURNING type] [ON ERROR | ON EMPTY]) - Extracts JSON objects/arrays
  SELECT JSON_QUERY(attributes, '$.process') as process_info FROM logs
  SELECT JSON_QUERY(attributes, '$.tags[*]') as all_tags FROM logs
JSON Construction
- JSON_OBJECT([KEY key VALUE value] ...) - Creates JSON object
  SELECT JSON_OBJECT('severity' VALUE severity, 'timestamp' VALUE eventtimestamp) as event_summary FROM logs
- JSON_ARRAY([value1, value2, ...]) - Creates JSON array
  SELECT JSON_ARRAY(severity, EXTRACT(HOUR FROM TO_TIMESTAMP_LTZ(eventtimestamp, 3)), tags['host'][1]) as event_array FROM logs
Hash Functions
- MD5(string) - Returns MD5 hash
  SELECT MD5(CONCAT(id, message)) as content_hash FROM logs
- SHA1(string) - Returns SHA-1 hash
- SHA256(string) - Returns SHA-256 hash
  SELECT SHA256(message) as message_hash FROM logs
Aggregate Functions
Basic Aggregations
- COUNT([DISTINCT] expression) - Counts rows/distinct values
  SELECT COUNT(*) as total_logs FROM logs
  SELECT COUNT(DISTINCT tags['service'][1]) as unique_services FROM logs
- SUM([DISTINCT] expression) - Sums numeric values
- AVG([DISTINCT] expression) - Calculates average
- MIN(expression) - Finds minimum value
- MAX(expression) - Finds maximum value
  SELECT tags['service'][1] as service, COUNT(*) as log_count, AVG(severity) as avg_severity FROM logs GROUP BY tags['service'][1]
Statistical Functions
- STDDEV_POP(expression) - Population standard deviation
- STDDEV_SAMP(expression) - Sample standard deviation
- VAR_POP(expression) - Population variance
- VAR_SAMP(expression) - Sample variance
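As a sketch, the statistical functions follow the same grouping pattern as the basic aggregations above, for example measuring how spread out severities are per service (in streaming mode this would need to be wrapped in a windowing TVF, see Windowing and Aggregations below):
SELECT
  tags['service'][1] as service,
  AVG(severity) as avg_severity,
  STDDEV_SAMP(severity) as severity_stddev
FROM logs
GROUP BY tags['service'][1]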
Collection Functions
- COLLECT(expression) - Collects values into a multiset
- LISTAGG(expression [, separator]) - Concatenates values with separator
  SELECT tags['service'][1] as service, LISTAGG(DISTINCT tags['host'][1], ',') as hosts FROM logs GROUP BY tags['service'][1]
Windowing and Aggregations
To aggregate data in a streaming setup with Flink SQL, you will want to use windowing functions. To learn about this powerful capability, please refer to the Flink documentation.
Window Functions (Streaming)
For streaming data processing, Flink SQL provides windowing table-valued functions (TVFs). These functions create windows over time-based or count-based intervals, allowing you to perform streaming aggregations over subsets of data.
Note: when using streaming windowed aggregations with GROUP BY, you must include window_start and window_end as group-by keys, as the examples below do.
Tumbling Windows
Fixed-size, non-overlapping windows:
-- ✅ VALID: Get a breakdown of log count per severity per service in 5-minute windows
SELECT
window_start as eventtimestamp, -- Window start as event time
CONCAT('Window summary: ', service, ' had ', CAST(log_count AS STRING), ' logs with severity ',
CAST(severity AS STRING)) as message, -- Descriptive message
severity,
MAP['service', ARRAY[service], 'window_type', ARRAY['tumbling']] as tags, -- Service and window type tags
window_start, -- turns into an attribute
window_end, -- turns into an attribute
log_count -- turns into an attribute
FROM (
SELECT
window_start, -- Start time of each 5-minute window
window_end, -- End time of each 5-minute window
tags['service'][1] as service, -- Group by service name
severity, -- and severity
COUNT(*) as log_count -- Count total logs in each window
FROM TABLE(
-- Create 5-minute windows based on event timestamp
TUMBLE(TABLE logs, DESCRIPTOR(TO_TIMESTAMP_LTZ(eventtimestamp, 3)), INTERVAL '5' MINUTES)
)
-- Group results by window and service to get aggregates per service per window
  GROUP BY window_start, window_end, severity, tags['service'][1]
)
Hopping (Sliding) Windows
Fixed-size windows that can overlap:
-- ✅ VALID: Convert hopping window analysis into system health LogEvents
SELECT
-- Generate synthetic LogEvent fields for each window health report
MD5(CONCAT(CAST(window_start AS STRING), 'system_health')) as id,
UNIX_TIMESTAMP(window_end) * 1000 as eventtimestamp, -- Window end as event time
UNIX_TIMESTAMP(CURRENT_TIMESTAMP) * 1000 as receivedtimestamp,
CONCAT('System health: ', CAST(log_count AS STRING), ' total logs across ',
CAST(service_count AS STRING), ' services') as message,
-- Set severity based on activity level
CASE
WHEN log_count < 100 THEN 13 -- WARN: Low activity
WHEN log_count > 10000 THEN 13 -- WARN: High activity
ELSE 9 -- INFO: Normal activity
END as severity,
MAP['analysis_type', ARRAY['system_health'], 'window_type', ARRAY['hopping']] as tags,
JSON_OBJECT(
'window_start' VALUE CAST(window_start AS STRING),
'window_end' VALUE CAST(window_end AS STRING),
'total_log_count' VALUE log_count,
'active_service_count' VALUE service_count,
'activity_level' VALUE CASE
WHEN log_count < 100 THEN 'low'
WHEN log_count > 10000 THEN 'high'
ELSE 'normal'
END,
'analysis_type' VALUE 'hopping_window_health'
) as attributes,
'system' as service, -- Synthetic service for system-wide metrics
'health-monitor' as host -- Synthetic host for health monitoring
FROM (
SELECT
window_start, -- Start time of each window
window_end, -- End time of each window
COUNT(*) as log_count, -- Total logs in this window
COUNT(DISTINCT tags['service'][1]) as service_count -- Number of unique services in this window
FROM TABLE(
HOP(
TABLE logs,
DESCRIPTOR(TO_TIMESTAMP_LTZ(eventtimestamp, 3)),
INTERVAL '1' MINUTE, -- Window slides every 1 minute
INTERVAL '5' MINUTES -- Each window covers 5 minutes of data
)
-- This creates overlapping 5-minute windows that move forward every minute
)
-- Group by window boundaries to get one result per window
GROUP BY window_start, window_end
)
-- This converts each health summary into a proper LogEvent
Session Windows
Groups events by activity sessions:
-- ✅ VALID: Convert session analysis into session summary LogEvents
SELECT
-- Generate synthetic LogEvent fields for each session summary
  MD5(CONCAT(CAST(window_start AS STRING), service, 'session')) as id,
window_end as eventtimestamp, -- Session end as event time
UNIX_TIMESTAMP(CURRENT_TIMESTAMP) * 1000 as receivedtimestamp,
  CONCAT('Session completed for ', service, ': ', CAST(session_events AS STRING),
         ' events over ', CAST(TIMESTAMPDIFF(MINUTE, window_start, window_end) AS STRING), ' minutes') as message,
-- Set severity based on session length and activity
CASE
WHEN session_events < 5 THEN 5 -- DEBUG: Short session
WHEN session_events > 1000 THEN 13 -- WARN: Very active session
ELSE 9 -- INFO: Normal session
END as severity,
  MAP['service', ARRAY[service], 'session_type', ARRAY['activity_session']] as tags,
JSON_OBJECT(
    'session_start' VALUE CAST(window_start AS STRING),
    'session_end' VALUE CAST(window_end AS STRING),
    'session_duration_ms' VALUE TIMESTAMPDIFF(SECOND, window_start, window_end) * 1000,
    'session_duration_minutes' VALUE TIMESTAMPDIFF(MINUTE, window_start, window_end),
    'event_count' VALUE session_events,
    'events_per_minute' VALUE ROUND(session_events / (TIMESTAMPDIFF(SECOND, window_start, window_end) / 60.0), 2),
'analysis_type' VALUE 'session_summary'
) as attributes,
service, -- Required service field
'session-analyzer' as host -- Synthetic host for session analysis
FROM (
SELECT
window_start, -- When this session started
window_end, -- When this session ended
tags['service'][1] as service, -- Service name for this session
COUNT(*) as session_events -- Number of events in this session
FROM TABLE(
SESSION(
TABLE logs PARTITION BY tags['service'][1], -- Create separate sessions for each service
DESCRIPTOR(TO_TIMESTAMP_LTZ(eventtimestamp, 3)),
INTERVAL '10' MINUTES -- If no events for 10 minutes, end the session
)
-- Groups consecutive events into sessions based on time gaps
)
-- Group by session boundaries and service
GROUP BY window_start, window_end, tags['service'][1]
)
-- This converts each session summary into a proper LogEvent
Cumulative Windows
Windows that expand incrementally:
SELECT
window_start,
window_end,
  tags['service'][1] as service,
COUNT(*) as cumulative_count
FROM TABLE(
CUMULATE(
TABLE logs,
DESCRIPTOR(TO_TIMESTAMP_LTZ(eventtimestamp, 3)),
INTERVAL '1' HOUR, -- step size
INTERVAL '1' DAY -- max window size
)
)
GROUP BY window_start, window_end, tags['service'][1]
Analytical Window Functions
Ranking Functions
- ROW_NUMBER() OVER (window_spec) - Sequential row numbers
- RANK() OVER (window_spec) - Ranking with gaps
- DENSE_RANK() OVER (window_spec) - Ranking without gaps
- PERCENT_RANK() OVER (window_spec) - Percentage rank
SELECT *, -- Include all original fields
       -- Assign sequential numbers to events within each service, in event-time order
       ROW_NUMBER() OVER (PARTITION BY tags['service'][1] ORDER BY eventtimestamp) as event_sequence
FROM logs
GROUP BY Operations
Basic Grouping
The below example groups messages by service and severity, counting the number of logs in each group for each minute. This is a common operation to summarize log data.
WITH parsed AS (
SELECT
tags['service'][1] as service, -- Extract service name
severity, -- Extract severity level
eventtimestamp -- Use event timestamp for windowing
FROM logs
),
aggregated AS (
SELECT
service, -- Service name
severity, -- Severity level
COUNT(*) as evtcount, -- Count of logs in each group
MIN(eventtimestamp) as first_event, -- Earliest event in group
MAX(eventtimestamp) as last_event -- Latest event in group
FROM TABLE(TUMBLE(TABLE parsed, DESCRIPTOR(eventtimestamp), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end, service, severity -- Group by service and severity
)
SELECT
last_event as eventtimestamp, -- Use latest event time as the summary event time
CONCAT('Service ', service, ' severity ', CAST(severity AS STRING), ' had ',
CAST(evtcount AS STRING), ' events') as message, -- Descriptive summary message
severity, -- Use the grouped severity level
MAP['service', ARRAY[service], 'analysis_type', ARRAY['group_summary']] as tags,
JSON_OBJECT(
'event_count' VALUE evtcount,
'first_event' VALUE first_event,
'last_event' VALUE last_event,
'analysis_type' VALUE 'service_severity_summary'
) as attributes
FROM aggregated
HAVING Clause
Filters groups after aggregation. In the below example, we create new log messages when the error count exceeds a threshold. This is an example of the complex alerting rules you can create with SQL.
-- ✅ VALID: Convert error count analysis into alert LogEvents
WITH parsed AS (
SELECT
tags['service'][1] as service, -- Extract service name
severity, -- Extract severity level
eventtimestamp -- Use event timestamp for windowing
FROM logs
),
-- HAVING filters groups, WHERE filters individual rows
aggregated AS (
SELECT
service, -- Service name
COUNT(*) as error_count -- Count of error-level logs
FROM TABLE(HOP(TABLE parsed, DESCRIPTOR(eventtimestamp), INTERVAL '1' MINUTE, INTERVAL '10' MINUTES))
WHERE severity >= 17 -- First filter: only ERROR level logs (17+)
GROUP BY window_start, window_end, service -- Group by service to count errors per service
HAVING COUNT(*) > 100 -- Second filter: only show services with 100+ errors
)
SELECT
CONCAT('HIGH ERROR ALERT: Service ', service, ' has ',
CAST(error_count AS STRING), ' error-level events') as message,
21 as severity, -- FATAL level for high error count alerts
MAP['service', ARRAY[service], 'alert_type', ARRAY['high_error_count'],
'severity_level', ARRAY['FATAL']] as tags,
JSON_OBJECT(
'error_count' VALUE error_count,
'error_threshold' VALUE 100,
'severity_filter' VALUE 'ERROR_LEVEL_17_PLUS',
'alert_reason' VALUE 'error_count_exceeded_threshold',
'analysis_type' VALUE 'error_count_monitoring'
) as attributes
FROM aggregated
Advanced Grouping
GROUPING SETS
Create multiple aggregation levels. In the example below, we use grouping sets to build several reporting hierarchy levels, producing summaries at varying granularity.
-- ✅ VALID: Convert multi-level aggregations into hierarchy report LogEvents
-- Start by extracting all the items we care about from the LogEvents
WITH parsed AS (
SELECT
tags['service'][1] as service, -- Extract service name
tags['host'][1] as host, -- Extract host name
severity, -- Extract severity level
eventtimestamp -- Use event timestamp for windowing
FROM logs
),
-- Execute the aggregation
aggregated AS (
SELECT
window_start, -- Start of the aggregation window
service, -- Service name
host, -- Host name
severity, -- Severity level
COUNT(*) as log_count -- Count of logs in this aggregation level
FROM TABLE(TUMBLE(TABLE parsed, DESCRIPTOR(eventtimestamp), INTERVAL '10' SECONDS))
GROUP BY window_start, window_end, GROUPING SETS (
(service, host, severity), -- Most detailed: by service + host + severity
(service, severity), -- Medium detail: by service + severity (all hosts combined)
(service), -- Less detail: by service only (all hosts and severities)
() -- Least detail: grand total count (no grouping)
)
)
-- Create nice messages and attributes for each aggregation level
SELECT
window_start as eventtimestamp, -- window time as eventtimestamp
-- Create descriptive message based on aggregation level
CASE
WHEN service IS NOT NULL AND host IS NOT NULL AND severity IS NOT NULL
THEN CONCAT('Detailed: ', service, '@', host, ' severity=', CAST(severity AS STRING), ' count=', CAST(log_count AS STRING))
WHEN service IS NOT NULL AND severity IS NOT NULL
THEN CONCAT('Service summary: ', service, ' severity=', CAST(severity AS STRING), ' count=', CAST(log_count AS STRING))
WHEN service IS NOT NULL
THEN CONCAT('Service total: ', service, ' count=', CAST(log_count AS STRING))
ELSE CONCAT('Grand total: ', CAST(log_count AS STRING), ' logs')
END as message,
MAP[
'service', ARRAY[COALESCE(service, 'ALL')],
'host', ARRAY[COALESCE(host, 'ALL')],
'report_type', ARRAY['hierarchy_summary'],
'aggregation_level', ARRAY[
CASE
WHEN service IS NOT NULL AND host IS NOT NULL AND severity IS NOT NULL THEN 'detailed'
WHEN service IS NOT NULL AND severity IS NOT NULL THEN 'service_severity'
WHEN service IS NOT NULL THEN 'service_total'
ELSE 'grand_total'
END
]
] as tags,
JSON_OBJECT(
'service' VALUE COALESCE(service, 'ALL'),
'host' VALUE COALESCE(host, 'ALL'),
'severity' VALUE COALESCE(severity, -1),
'log_count' VALUE log_count
) as attributes
FROM aggregated
ROLLUP
Generate hierarchical aggregations: ROLLUP produces all prefixes of the grouping columns (note this is a batch example):
SELECT
tags['service'][1] as service,
tags['host'][1] as host,
COUNT(*) as log_count
FROM logs
GROUP BY ROLLUP (tags['service'][1], tags['host'][1])
-- Generates: (service, host), (service), ()
CUBE
Generate all possible combinations of the grouping columns (note this is a batch example):
SELECT
tags['service'][1] as service,
severity,
COUNT(*) as log_count
FROM logs
GROUP BY CUBE (tags['service'][1], severity)
-- Generates: (service, severity), (service), (severity), ()
Event-Level Analytical Functions
Event Ranking Within Service
-- ✅ VALID: Add ranking information to individual log events
SELECT *, -- Include all LogEvent fields
-- Add ranking of events within each service by severity
RANK() OVER (PARTITION BY tags['service'][1] ORDER BY severity DESC) as severity_rank_in_service,
-- Add row numbers to events within each service (most recent first)
ROW_NUMBER() OVER (PARTITION BY tags['service'][1] ORDER BY eventtimestamp DESC) as event_sequence,
-- Add the severity of the previous event in this service
LAG(severity, 1) OVER (PARTITION BY tags['service'][1] ORDER BY eventtimestamp) as prev_event_severity
FROM logs
-- This returns individual LogEvents with additional analytical context
Important Rules
LogEvent Fields
Your SQL query can return any combination of LogEvent fields, but when specified, they must be of the acceptable types:
- id (STRING) - Globally unique identifier for the log event
- eventtimestamp (BIGINT or TIMESTAMP_LTZ or TIMESTAMP) - When BIGINT, milliseconds since Unix epoch when the event occurred
- receivedtimestamp (BIGINT or TIMESTAMP_LTZ or TIMESTAMP) - When BIGINT, milliseconds since Unix epoch when Grepr received the event
- message (STRING) - The log message content
- severity (INT) - Log severity level (1-24, following OpenTelemetry convention)
- tags (MAP<STRING, ARRAY<STRING>>) - Key-value pairs for filtering and routing
- attributes (STRING) - JSON string containing structured data
Additional fields: Any extra computed columns are automatically added to the event's attributes map.
Default Values for Omitted Fields
When core LogEvent fields are omitted from your SELECT clause, the original values from the input event are preserved. However, if you're creating entirely new events, as happens during windowed aggregations, you should understand the default behavior:
- id - Generated unique identifier if not provided
- eventtimestamp - Current timestamp if not provided
- receivedtimestamp - Current timestamp if not provided
- message - Empty string if not provided
- severity - 9 (INFO level) if not provided
- tags - Empty map if not provided
- attributes - Empty map if not provided
Note: In most cases, you'll want to use SELECT * or include specific fields to preserve data from the input events.
Examples:
-- Updates only message, all other fields preserved
SELECT *, UPPER(message) as message FROM logs
-- Updates message and severity, adds computed field into attributes
SELECT *, UPPER(message) as message, severity + 1 as severity, CHAR_LENGTH(message) as msg_length FROM logs
Error Handling
The SQL Transform will fail if:
- SQL syntax is invalid - Malformed SQL queries will be rejected during parsing
- Field types are incompatible - Core LogEvent fields must match their expected types when included (see the sketch below)
- Query execution errors occur - Runtime errors during SQL execution (e.g., division by zero, invalid regex)
- JSON parsing errors occur - Invalid JSON operations on the attributes field
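For instance, based on the field type rules above, a query like the following should be rejected because severity must be an INT when it is included:
-- ⚠️ INVALID - severity is declared as INT, so returning it as a STRING fails the type check
SELECT *, CAST(severity AS STRING) as severity FROM logs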
SQL Transform Limitations
Streaming vs Batch Context
The SQL Transform operates in both streaming and batch processing contexts. Streaming has the following additional limitations:
Windowing Limitations
- Window functions require explicit windowing TVFs (TUMBLE, HOP, SESSION, CUMULATE)
- No unbounded aggregations - Use windowing for aggregations in streaming mode
State Management
- Stateful operations can grow unbounded - Your pipeline can fail if the amount of state used for aggregations or joins becomes too large. You can control the Time-To-Live (TTL) of state by setting the stateTtl parameter in the SQL Transform configuration. By default, it's 2 minutes.
- JOIN operations are limited in streaming context
- Complex aggregations may require careful state management
Data Ordering
- Events may arrive out of order - Consider using watermarks for time-based operations
- ROW_NUMBER() and ranking functions may not guarantee consistent ordering across restarts
Type System Constraints
Attributes Field Handling
- Use JSON functions (JSON_VALUE, JSON_QUERY, JSON_EXISTS) to work with attributes
- Type conversion is automatic for basic types (string, number, boolean)
Performance Considerations
Query Optimization
- Use WHERE clauses early to filter data before expensive operations
- Avoid SELECT * when possible - explicitly list needed columns
- Limit regex operations on high-volume streams - they are CPU intensive
- Use indexes wisely - TO_TIMESTAMP_LTZ(eventtimestamp, 3) operations benefit from time-based partitioning
Memory and State Management
- Computed fields increase event size - consider impact on downstream processing
- Complex aggregations require more memory - reduce TTLs if needed.
- Window operations buffer data - larger windows require more memory
- JSON operations allocate temporary objects - avoid in tight loops
Throughput Optimization
-- ✅ GOOD - Filter early, simple operations
SELECT *, -- Select all fields
UPPER(message) as upper_message -- Simple string operation (fast)
FROM logs
WHERE severity >= 17 -- Filter first to reduce data volume
-- ⚠️ CAUTION - Expensive regex on all records
SELECT *, -- Processes every single log record
-- This regex runs on ALL logs, even those without dates (inefficient)
REGEXP_EXTRACT(message, '(\\d{4}-\\d{2}-\\d{2})', 1) as date_extracted
FROM logs
-- ✅ BETTER - Filter then apply expensive operation
SELECT *, -- Only processes logs that likely contain dates
-- Regex only runs on pre-filtered logs (much more efficient)
REGEXP_EXTRACT(message, '(\\d{4}-\\d{2}-\\d{2})', 1) as date_extracted
FROM logs
WHERE message LIKE '%-%-%' -- Quick pre-filter using simple pattern matching
Resource Usage Patterns
- String operations (UPPER, LOWER, CONCAT): Low CPU overhead
- Regex operations (REGEXP_REPLACE, REGEXP_EXTRACT): High CPU overhead
- JSON operations (JSON_VALUE, JSON_QUERY): Moderate CPU overhead
- Mathematical operations: Low CPU overhead
- Window aggregations: High memory usage during window buffering
Use Cases
The SQL Transform is ideal for:
Data Normalization and Standardization
- Message format standardization: Converting various log formats to a common structure
- Timestamp normalization: Converting different timestamp formats to standard milliseconds
- Field standardization: Ensuring consistent field names and types across services
Log Enhancement and Enrichment
- Computed metrics: Adding calculated fields like processing delays, error rates
- Categorization: Adding severity levels, priority flags, or classification labels
- Contextual data: Extracting and promoting key information from log messages
Security and Compliance
- Data redaction: Removing or masking sensitive information (passwords, tokens, PII)
- Audit trail enhancement: Adding tracking fields for compliance requirements
- Anomaly flagging: Marking unusual patterns or suspicious activities
Performance and Monitoring
- SLA monitoring: Extracting response times and performance metrics
- Error analysis: Categorizing and enriching error logs for better debugging
- Trend analysis: Preparing log data for time-series analysis and alerting
Data Pipeline Optimization
- Selective processing: Filtering logs to reduce downstream processing load
- Format conversion: Converting logs for compatibility with downstream systems
- Batch preparation: Aggregating and preparing data for batch analytics
Examples from Production
Security Log Processing
SELECT *,
CASE
WHEN message LIKE '%authentication failed%' THEN 'auth_failure'
WHEN message LIKE '%unauthorized access%' THEN 'unauthorized'
WHEN message LIKE '%suspicious activity%' THEN 'suspicious'
ELSE 'normal'
END as security_category,
REGEXP_REPLACE(message, 'user=[^\\s]+', 'user=REDACTED') as sanitized_message
FROM logs
WHERE severity >= 17 -- ERROR level and above
View Example Input/Output
Input:
{
"id": "sec1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 17,
"message": "authentication failed for user=johndoe from IP 192.168.1.100",
"tags": {"service": ["auth"], "env": ["prod"]},
"attributes": {
"ip": "192.168.1.100",
"attempt": 3
}
}
Output:
{
"id": "sec1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 17,
"message": "authentication failed for user=johndoe from IP 192.168.1.100",
"tags": {"service": ["auth"], "env": ["prod"]},
"attributes": {
"ip": "192.168.1.100",
"attempt": 3,
"security_category": "auth_failure",
"sanitized_message": "authentication failed for user=REDACTED from IP 192.168.1.100"
}
}
Application Performance Monitoring
SELECT *,
CAST(REGEXP_EXTRACT(message, 'response_time=(\\d+)ms', 1) AS BIGINT) as response_time_ms,
CASE
WHEN message LIKE '%response_time=%ms' AND CAST(REGEXP_EXTRACT(message, 'response_time=(\\d+)ms', 1) AS BIGINT) > 1000 THEN 'slow'
ELSE 'normal'
END as performance_category
FROM logs
WHERE message LIKE '%response_time=%'
View Example Input/Output
Input:
{
"id": "perf1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 9,
"message": "API request completed with response_time=1250ms for endpoint /api/users",
"tags": {"service": ["api"], "endpoint": ["/api/users"]},
"attributes": {
"method": "GET",
"status_code": 200,
"user_id": "12345"
}
}
Output:
{
"id": "perf1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 9,
"message": "API request completed with response_time=1250ms for endpoint /api/users",
"tags": {"service": ["api"], "endpoint": ["/api/users"]},
"attributes": {
"method": "GET",
"status_code": 200,
"user_id": "12345",
"response_time_ms": 1250,
"performance_category": "slow"
}
}