SQL transform functions and operators
This page provides a reference for the SQL operators and functions supported by the Grepr SQL transform. It describes only a subset of the available functions; the SQL transform supports the full Flink SQL function library. For details on all supported functions, see System (Built-in) Functions.
SQL operators
The SQL transform supports all standard SQL operators available in Flink SQL 1.20:
Comparison operators
=: Equal to
<> or !=: Not equal to
<: Less than
<=: Less than or equal to
>: Greater than
>=: Greater than or equal to
BETWEEN value1 AND value2: Range comparison
NOT BETWEEN value1 AND value2: Negated range comparison
Examples
SELECT * FROM logs WHERE severity = 17
SELECT * FROM logs WHERE severity BETWEEN 13 AND 20
SELECT * FROM logs WHERE eventtimestamp > 1724214074000
Logical operators
AND: Logical AND
OR: Logical OR
NOT: Logical NOT
Examples
SELECT * FROM logs WHERE severity > 16 AND message LIKE '%error%'
SELECT * FROM logs WHERE severity = 17 OR severity = 21
SELECT * FROM logs WHERE NOT (severity < 5)
Pattern matching operators
LIKE 'pattern': SQL pattern matching with % (any chars) and _ (single char)
NOT LIKE 'pattern': Negated pattern matching
SIMILAR TO 'regex': Regular expression matching
NOT SIMILAR TO 'regex': Negated regex matching
Examples
SELECT * FROM logs WHERE message LIKE '%failed%'
SELECT * FROM logs WHERE message LIKE 'Error: ___' -- exactly 3 chars after "Error: "
SELECT * FROM logs WHERE message SIMILAR TO '^[A-Z]+:' -- starts with uppercase letters and colon
NULL handling operators
IS NULL: Check for NULL values
IS NOT NULL: Check for non-NULL values
Examples
SELECT * FROM logs WHERE attributes IS NOT NULL
SELECT * FROM logs WHERE JSON_VALUE(attributes, '$.userId') IS NULL
Arithmetic operators
+: Addition
-: Subtraction
*: Multiplication
/: Division
% or MOD(): Modulo
Examples
SELECT *, severity * 10 as severity_scaled FROM logs
Function compatibility with Flink SQL
The SQL transform uses Apache Flink SQL 1.20 and is ANSI SQL compliant. Here are important function compatibility notes:
Timestamp functions
- Use: TO_TIMESTAMP_LTZ(timestamp_in_millis, 3) to convert millisecond timestamps to TIMESTAMP.
- Don't use: TIMESTAMP_MILLIS(). This function is not available in Flink SQL 1.20. Use TO_TIMESTAMP_LTZ() instead.
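For example, a minimal sketch of the conversion, assuming the eventtimestamp field is exposed as a BIGINT of epoch milliseconds:
SELECT TO_TIMESTAMP_LTZ(eventtimestamp, 3) as event_time FROM logs -- 3 = millisecond precision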
String functions
- Available: Most standard SQL string functions are supported, including REGEXP_REPLACE(), UPPER(), LOWER(), CONCAT(), CHAR_LENGTH(), LEFT(), and RIGHT().
Date and time functions
- Available: Standard date and time functions, such as EXTRACT(), CURRENT_TIMESTAMP, and DATE_FORMAT(), are supported.
- Note: Always convert millisecond timestamps first using TO_TIMESTAMP_LTZ().
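A minimal sketch of that convert-first pattern, again assuming eventtimestamp arrives as epoch milliseconds:
-- Convert the millisecond timestamp before extracting components
SELECT EXTRACT(HOUR FROM TO_TIMESTAMP_LTZ(eventtimestamp, 3)) as event_hour FROM logs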
JSON functions
- Available: JSON_VALUE() and JSON_EXISTS() for extracting data from JSON strings.
- Note: The attributes field is a JSON string that can be processed with these functions.
For a complete list of available functions, see System (Built-in) Functions.
Available SQL functions
String functions
Case manipulation
- UPPER(string): Converts string to uppercase
SELECT UPPER(message) as upper_message FROM logs -- Input: "Error occurred" → Output: "ERROR OCCURRED"
- LOWER(string): Converts string to lowercase
SELECT LOWER(message) as lower_message FROM logs -- Input: "Error OCCURRED" → Output: "error occurred"
- INITCAP(string): Capitalizes first letter of each word
SELECT INITCAP(message) as title_case FROM logs -- Input: "user login failed" → Output: "User Login Failed"
String extraction and manipulation
- SUBSTR(string, start [, length]): Extracts substring
SELECT SUBSTR(message, 1, 10) as msg_prefix FROM logs
SELECT SUBSTR(message, 5) as msg_suffix FROM logs -- from position 5 to end
- CONCAT(string1, string2, ...): Concatenates strings
SELECT CONCAT('[', severity, '] ', message) as formatted_message FROM logs
- CONCAT_WS(separator, string1, string2, ...): Concatenates with separator
SELECT CONCAT_WS(' | ', tags['host'][1], tags['service'][1], message) as combined FROM logs
- TRIM([BOTH | LEADING | TRAILING] [characters] FROM string): Removes specified characters
SELECT TRIM(message) as clean_message FROM logs -- removes whitespace
SELECT TRIM(LEADING '0' FROM id) as clean_id FROM logs -- removes leading zeros
- REPLACE(string, search, replacement): Replaces all occurrences
SELECT REPLACE(message, 'ERROR', 'ALERT') as modified_message FROM logs
- REGEXP_REPLACE(string, pattern, replacement): Regex-based replacement
SELECT REGEXP_REPLACE(message, '\\\\d{4}-\\\\d{2}-\\\\d{2}', 'YYYY-MM-DD') as sanitized FROM logs
SELECT REGEXP_REPLACE(message, 'user=[^\\\\s]+', 'user=REDACTED') as redacted FROM logs
String information functions
- CHAR_LENGTH(string) or LENGTH(string): Returns character count
SELECT CHAR_LENGTH(message) as msg_length FROM logs
SELECT LENGTH(message) as msg_length FROM logs -- Alternative syntax
- POSITION(substring IN string): Finds substring position (1-based)
SELECT POSITION('error' IN LOWER(message)) as error_pos FROM logs
- ASCII(string): Returns ASCII value of first character
SELECT ASCII(LEFT(message, 1)) as first_char_code FROM logs
Advanced string functions
- LPAD(string, length, pad_string): Left-pads string
SELECT LPAD(CAST(severity AS STRING), 3, '0') as padded_severity FROM logs -- Input: "9" → Output: "009"
- RPAD(string, length, pad_string): Right-pads string
SELECT RPAD(tags['service'][1], 10, '-') as padded_service FROM logs
- LEFT(string, length): Returns leftmost characters
SELECT LEFT(message, 10) as msg_prefix FROM logs
- RIGHT(string, length): Returns rightmost characters
SELECT RIGHT(message, 10) as msg_suffix FROM logs
- REGEXP_EXTRACT(string, pattern [, group]): Extracts regex match
SELECT REGEXP_EXTRACT(message, 'response_time=(\\\\d+)ms', 1) as response_time FROM logs
SELECT REGEXP_EXTRACT(message, 'user=([^\\\\s]+)', 1) as username FROM logs
Numeric functions
Basic mathematical functions
- ABS(numeric): Returns absolute value
SELECT ABS(severity - 12) as severity_diff FROM logs
- MOD(numeric1, numeric2): Returns remainder
SELECT MOD(severity, 4) as severity_mod FROM logs
- POWER(base, exponent): Returns base raised to exponent
SELECT POWER(severity, 2) as severity_squared FROM logs
- SQRT(numeric): Returns square root
SELECT SQRT(severity) as severity_sqrt FROM logs
- SIGN(numeric): Returns sign (-1, 0, 1)
SELECT SIGN(receivedtimestamp - eventtimestamp) as delay_sign FROM logs
Rounding functions
- ROUND(numeric [, precision]): Rounds to specified decimal places
SELECT ROUND(severity / 3.0, 2) as severity_ratio FROM logs
- FLOOR(numeric): Rounds down to nearest integer
SELECT FLOOR(severity / 4.0) as severity_group FROM logs
- CEIL(numeric) or CEILING(numeric): Rounds up to nearest integer
SELECT CEIL(severity / 4.0) as severity_group_up FROM logs
SELECT CEILING(severity / 4.0) as severity_group_up FROM logs -- Alternative syntax
Logarithmic and exponential functions
LN(numeric) or LOG(numeric): Natural logarithm
LOG10(numeric): Base-10 logarithm
LOG2(numeric): Base-2 logarithm
EXP(numeric): Exponential function (e^x)
LOG(base, numeric): Logarithm with specified base
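A minimal sketch applying these to the numeric severity field (the derived values are purely illustrative):
SELECT LN(severity) as severity_ln, LOG10(severity) as severity_log10 FROM logs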
Trigonometric functions
SIN(numeric), COS(numeric), TAN(numeric): Trigonometric functions
ASIN(numeric), ACOS(numeric), ATAN(numeric): Inverse trigonometric functions
DEGREES(radians), RADIANS(degrees): Angle conversion
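These are rarely needed for log data; a self-contained sketch of the angle-conversion helpers:
SELECT DEGREES(ATAN(1.0)) as angle_degrees FROM logs -- ATAN(1.0) is pi/4 radians, so this returns 45.0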
Date and time functions
Current time functions
CURRENT_TIMESTAMP: Returns current timestamp in local timezone
LOCALTIME: Returns current SQL time
CURRENT_DATE: Returns current SQL date
UNIX_TIMESTAMP(): Returns current Unix timestamp in seconds
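For instance, a sketch that stamps each event with the processing time in both forms:
SELECT *, CURRENT_TIMESTAMP as processed_at, UNIX_TIMESTAMP() as processed_at_seconds FROM logs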
Time extraction
EXTRACT(unit FROM temporal): Extracts time component
Available units: MILLENNIUM, CENTURY, DECADE, YEAR, QUARTER, MONTH, WEEK, DAY, HOUR, MINUTE, SECOND
SELECT EXTRACT(HOUR FROM eventtimestamp) as event_hour FROM logs
SELECT EXTRACT(DAY FROM eventtimestamp) as event_day FROM logs
SELECT EXTRACT(YEAR FROM eventtimestamp) as event_year FROM logs
Time conversion functions
- TO_TIMESTAMP_LTZ(numeric, precision): Converts epoch timestamp to TIMESTAMP_LTZ
SELECT TO_TIMESTAMP_LTZ(JSON_VALUE(attributes, '$.timestamp' RETURNING BIGINT), 3) as event_time FROM logs -- 3 for milliseconds
- UNIX_TIMESTAMP(timestamp): Converts timestamp to Unix timestamp (seconds)
SELECT UNIX_TIMESTAMP(eventtimestamp) as unix_time FROM logs
- TO_TIMESTAMP(string [, format]): Parses string to timestamp
SELECT TO_TIMESTAMP(JSON_VALUE(attributes, '$.timestamp'), 'yyyy-MM-dd HH:mm:ss') as parsed_time FROM logs
Conditional functions
IF conditional
IF(condition, true_value, false_value): Simple conditional
SELECT IF(severity > 16, 'HIGH', 'NORMAL') as priority FROM logs
CASE expressions
- CASE WHEN condition THEN result [WHEN condition THEN result] ELSE result END
SELECT *, CASE WHEN severity <= 4 THEN 'TRACE' WHEN severity <= 8 THEN 'DEBUG' WHEN severity <= 12 THEN 'INFO' WHEN severity <= 16 THEN 'WARN' WHEN severity <= 20 THEN 'ERROR' ELSE 'FATAL' END as severity_level FROM logs
- CASE value WHEN value1 THEN result1 WHEN value2 THEN result2 ELSE default END
SELECT *, CASE severity WHEN 1 THEN 'TRACE' WHEN 9 THEN 'INFO' WHEN 17 THEN 'ERROR' ELSE 'OTHER' END as severity_name FROM logs
NULL handling functions
- COALESCE(value1, value2, ...): Returns first non-NULL value
SELECT COALESCE(JSON_VALUE(attributes, '$.userId'), 'unknown') as user_id FROM logs
- NULLIF(value1, value2): Returns NULL if values are equal
SELECT NULLIF(severity, 0) as non_zero_severity FROM logs
Comparison functions
- GREATEST(value1, value2, ...): Returns the maximum value
SELECT GREATEST(severity, 10) as min_severity FROM logs -- clamps severity to at least 10
- LEAST(value1, value2, ...): Returns the minimum value
SELECT LEAST(severity, 20) as max_severity FROM logs -- caps severity at 20
JSON functions
JSON validation
string IS JSON [VALUE | SCALAR | ARRAY | OBJECT]: Validates JSON format
SELECT * FROM logs WHERE attributes IS JSON OBJECT
JSON path functions
- JSON_EXISTS(json_string, path): Checks if JSON path exists
SELECT * FROM logs WHERE JSON_EXISTS(attributes, '$.process.thread')
- JSON_VALUE(json_string, path [RETURNING type] [ON ERROR | ON EMPTY]): Extracts scalar value
SELECT JSON_VALUE(attributes, '$.userId' RETURNING STRING) as user_id FROM logs
SELECT JSON_VALUE(attributes, '$.responseTime' RETURNING INTEGER) as response_time FROM logs
SELECT JSON_VALUE(attributes, '$.isSuccess' RETURNING BOOLEAN) as success FROM logs
- JSON_QUERY(json_string, path [RETURNING type] [ON ERROR | ON EMPTY]): Extracts JSON objects/arrays
SELECT JSON_QUERY(attributes, '$.process') as process_info FROM logs
SELECT JSON_QUERY(attributes, '$.tags[*]') as all_tags FROM logs
JSON construction
- JSON_OBJECT([KEY key VALUE value] ...): Creates JSON object
SELECT JSON_OBJECT('severity' VALUE severity, 'timestamp' VALUE eventtimestamp) as event_summary FROM logs
- JSON_ARRAY([value1, value2, ...]): Creates JSON array
SELECT JSON_ARRAY(severity, EXTRACT(HOUR FROM eventtimestamp), tags['host'][1]) as event_array FROM logs
Hash functions
- MD5(string): Returns MD5 hash
SELECT MD5(CONCAT(id, message)) as content_hash FROM logs
- SHA1(string): Returns SHA-1 hash
- SHA256(string): Returns SHA-256 hash
SELECT SHA256(message) as message_hash FROM logs
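SHA1 follows the same pattern as the other hash functions; a minimal sketch:
SELECT SHA1(message) as message_sha1 FROM logs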
Aggregate functions
Basic aggregations
- COUNT([DISTINCT] expression): Counts rows/distinct values
SELECT COUNT(*) as total_logs FROM logs
SELECT COUNT(DISTINCT tags['service'][1]) as unique_services FROM logs
- SUM([DISTINCT] expression): Sums numeric values
- AVG([DISTINCT] expression): Calculates average
- MIN(expression): Finds minimum value
- MAX(expression): Finds maximum value
SELECT tags['service'][1] as service, COUNT(*) as log_count, AVG(severity) as avg_severity FROM logs GROUP BY tags['service'][1]
Statistical functions
STDDEV_POP(expression): Population standard deviation
STDDEV_SAMP(expression): Sample standard deviation
VAR_POP(expression): Population variance
VAR_SAMP(expression): Sample variance
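A sketch computing the severity spread per service; in a streaming pipeline you would typically wrap this in one of the window TVFs described below:
SELECT tags['service'][1] as service, STDDEV_SAMP(severity) as severity_stddev, VAR_SAMP(severity) as severity_variance FROM logs GROUP BY tags['service'][1]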
Percentile functions
- GREPR_PERCENTILES(numeric_expression, percentile1, percentile2, ...): Calculates specified percentiles
-- Calculate median (50th percentile) and 95th percentile for response times
-- Note that arrays are 1-indexed in SQL
SELECT
    tags['service'][1] as service,
    GREPR_PERCENTILES(JSON_VALUE(attributes, '$.response_time' RETURNING DOUBLE), 0.5, 0.95) as percentiles
FROM logs
WHERE JSON_EXISTS(attributes, '$.response_time')
GROUP BY tags['service'][1]
Features:
- Supports multiple percentile values in a single function call.
- Uses exact calculation for small datasets (< 1000 values) for precision.
- Automatically switches to KLL sketches for large datasets with ±1.65% error guarantee.
- Handles NULL values gracefully by ignoring them.
- Returns an array of doubles corresponding to the requested percentiles.
- Percentile ranks must be between 0.0 and 1.0 (e.g., 0.5 for median, 0.95 for 95th percentile).
Collection functions
COLLECT(expression): Collects values into a multiset
LISTAGG(expression [, separator]): Concatenates values with separator
SELECT tags['service'][1] as service, LISTAGG(DISTINCT tags['host'][1], ',') as hosts FROM logs GROUP BY tags['service'][1]
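COLLECT works the same way; the sketch below gathers the hosts seen per service into a multiset (as an extra computed column, the result would end up in the output event's attributes):
SELECT tags['service'][1] as service, COLLECT(tags['host'][1]) as host_multiset FROM logs GROUP BY tags['service'][1]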
Windowing and aggregations
To aggregate streaming data with Flink SQL, use windowing functions. To learn more, see Windowing table-valued functions (Windowing TVFs).
Window functions (streaming)
For streaming data processing, Flink SQL provides windowing table-valued functions (TVFs). These functions create windows over time-based intervals, allowing you to perform streaming aggregations over subsets of data.
Important: When using streaming windowed aggregations with GROUP BY, you must include window_start and window_end as group-by keys.
Tumbling windows
Fixed-size, non-overlapping windows:
-- Get a breakdown of log count per severity per service in 5-minute windows
SELECT
window_start as eventtimestamp, -- Window start as event time
CONCAT('Window summary: ', service, ' had ', CAST(log_count AS STRING), ' logs with severity ',
CAST(severity AS STRING)) as message, -- Descriptive message
severity,
MAP['service', ARRAY[service], 'window_type', ARRAY['tumbling']] as tags, -- Service and window type tags
window_start, -- turns into an attribute
window_end, -- turns into an attribute
log_count -- turns into an attribute
FROM (
SELECT
window_start, -- Start time of each 5-minute window
window_end, -- End time of each 5-minute window
tags['service'][1] as service, -- Group by service name
severity, -- and severity
COUNT(*) as log_count -- Count total logs in each window
FROM TABLE(
-- Create 5-minute windows based on event timestamp
TUMBLE(TABLE logs, DESCRIPTOR(eventtimestamp), INTERVAL '5' MINUTES)
)
-- Group results by window and service to get aggregates per service per window
GROUP BY window_start, window_end, severity, tags['service'][1]
)
Hopping (sliding) windows
Fixed-size windows that can overlap:
-- Convert hopping window analysis into system health LogEvents
SELECT
-- Generate synthetic LogEvent fields for each window health report
MD5(CONCAT(CAST(window_start AS STRING), 'system_health')) as id,
UNIX_TIMESTAMP(window_end) * 1000 as eventtimestamp, -- Window end as event time
UNIX_TIMESTAMP(CURRENT_TIMESTAMP) * 1000 as receivedtimestamp,
CONCAT('System health: ', CAST(log_count AS STRING), ' total logs across ',
CAST(service_count AS STRING), ' services') as message,
-- Set severity based on activity level
CASE
WHEN log_count < 100 THEN 13 -- WARN: Low activity
WHEN log_count > 10000 THEN 13 -- WARN: High activity
ELSE 9 -- INFO: Normal activity
END as severity,
MAP['analysis_type', ARRAY['system_health'], 'window_type', ARRAY['hopping']] as tags,
JSON_OBJECT(
'window_start' VALUE CAST(window_start AS STRING),
'window_end' VALUE CAST(window_end AS STRING),
'total_log_count' VALUE log_count,
'active_service_count' VALUE service_count,
'activity_level' VALUE CASE
WHEN log_count < 100 THEN 'low'
WHEN log_count > 10000 THEN 'high'
ELSE 'normal'
END,
'analysis_type' VALUE 'hopping_window_health'
) as attributes,
'system' as service, -- Synthetic service for system-wide metrics
'health-monitor' as host -- Synthetic host for health monitoring
FROM (
SELECT
window_start, -- Start time of each window
window_end, -- End time of each window
COUNT(*) as log_count, -- Total logs in this window
COUNT(DISTINCT tags['service'][1]) as service_count -- Number of unique services in this window
FROM TABLE(
HOP(
TABLE logs,
DESCRIPTOR(eventtimestamp),
INTERVAL '1' MINUTE, -- Window slides every 1 minute
INTERVAL '5' MINUTES -- Each window covers 5 minutes of data
)
-- This creates overlapping 5-minute windows that move forward every minute
)
-- Group by window boundaries to get one result per window
GROUP BY window_start, window_end
)
-- This converts each health summary into a proper LogEvent
Session windows
Group events by activity sessions:
-- Convert session analysis into session summary LogEvents
SELECT
-- Generate synthetic LogEvent fields for each session summary
MD5(CONCAT(CAST(window_start AS STRING), service, 'session')) as id,
window_end as eventtimestamp, -- Session end as event time
UNIX_TIMESTAMP(CURRENT_TIMESTAMP) * 1000 as receivedtimestamp,
CONCAT('Session completed for ', service, ': ', CAST(session_events AS STRING),
' events over ', CAST((window_end - window_start) / 60000 AS STRING), ' minutes') as message,
-- Set severity based on session length and activity
CASE
WHEN session_events < 5 THEN 5 -- DEBUG: Short session
WHEN session_events > 1000 THEN 13 -- WARN: Very active session
ELSE 9 -- INFO: Normal session
END as severity,
MAP['service', ARRAY[service], 'session_type', ARRAY['activity_session']] as tags,
JSON_OBJECT(
'session_start' VALUE window_start,
'session_end' VALUE window_end,
'session_duration_ms' VALUE (window_end - window_start),
'session_duration_minutes' VALUE ((window_end - window_start) / 60000),
'event_count' VALUE session_events,
'events_per_minute' VALUE ROUND(session_events / ((window_end - window_start) / 60000.0), 2),
'analysis_type' VALUE 'session_summary'
) as attributes,
service, -- Required service field
'session-analyzer' as host -- Synthetic host for session analysis
FROM (
SELECT
window_start, -- When this session started
window_end, -- When this session ended
tags['service'][1] as service, -- Service name for this session
COUNT(*) as session_events -- Number of events in this session
FROM TABLE(
SESSION(
TABLE logs PARTITION BY tags['service'][1], -- Create separate sessions for each service
DESCRIPTOR(eventtimestamp),
INTERVAL '10' MINUTES -- If no events for 10 minutes, end the session
)
-- Groups consecutive events into sessions based on time gaps
)
-- Group by session boundaries and service
GROUP BY window_start, window_end, tags['service'][1]
)
-- This converts each session summary into a proper LogEvent
Cumulative windows
Windows that expand incrementally:
SELECT
window_start,
window_end,
tags['service'][1] as service,
COUNT(*) as cumulative_count
FROM TABLE(
CUMULATE(
TABLE logs,
DESCRIPTOR(eventtimestamp),
INTERVAL '1' HOUR, -- step size
INTERVAL '1' DAY -- max window size
)
)
GROUP BY window_start, window_end, tags['service'][1]
Analytical Window functions
Ranking functions
ROW_NUMBER() OVER (window_spec): Sequential row numbers
RANK() OVER (window_spec): Ranking with gaps
DENSE_RANK() OVER (window_spec): Ranking without gaps
PERCENT_RANK() OVER (window_spec): Percentage rank
SELECT *, -- Include all original fields
-- Assign sequential numbers to events ordered by event time
ROW_NUMBER() OVER (ORDER BY eventtimestamp) as event_sequence
FROM logs
GROUP BY operations
Basic grouping
The example below groups messages by service and severity, counting the number of logs in each group for each minute. This is a common operation to summarize log data.
WITH parsed AS (
SELECT
tags['service'][1] as service, -- Extract service name
severity, -- Extract severity level
eventtimestamp -- Use event timestamp for windowing
FROM logs
),
aggregated AS (
SELECT
service, -- Service name
severity, -- Severity level
COUNT(*) as evtcount, -- Count of logs in each group
MIN(eventtimestamp) as first_event, -- Earliest event in group
MAX(eventtimestamp) as last_event -- Latest event in group
FROM TABLE(TUMBLE(TABLE parsed, DESCRIPTOR(eventtimestamp), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end, service, severity -- Group by service and severity
)
SELECT
last_event as eventtimestamp, -- Use the latest event time as the summary event time
CONCAT('Service ', service, ' severity ', CAST(severity AS STRING), ' had ',
CAST(evtcount AS STRING), ' events') as message, -- Descriptive summary message
severity, -- Use the grouped severity level
MAP['service', ARRAY[service], 'analysis_type', ARRAY['group_summary']] as tags,
JSON_OBJECT(
'event_count' VALUE evtcount,
'first_event' VALUE first_event,
'last_event' VALUE last_event,
'analysis_type' VALUE 'service_severity_summary'
) as attributes
FROM aggregated
HAVING clause
Filters groups after aggregation. The example below creates new log messages when the error count exceeds a threshold. This is an example of the complex alerting rules you can create with SQL.
-- Convert error count analysis into alert LogEvents
WITH parsed AS (
SELECT
tags['service'][1] as service, -- Extract service name
severity, -- Extract severity level
eventtimestamp -- Use event timestamp for windowing
FROM logs
),
-- HAVING filters groups, WHERE filters individual rows
aggregated AS (
SELECT
service, -- Service name
COUNT(*) as error_count -- Count of error-level logs
FROM TABLE(HOP(TABLE parsed, DESCRIPTOR(eventtimestamp), INTERVAL '1' MINUTE, INTERVAL '10' MINUTES))
WHERE severity >= 17 -- First filter: only ERROR level logs (17+)
GROUP BY window_start, window_end, service -- Group by service to count errors per service
HAVING COUNT(*) > 100 -- Second filter: only show services with 100+ errors
)
SELECT
CONCAT('HIGH ERROR ALERT: Service ', service, ' has ',
CAST(error_count AS STRING), ' error-level events') as message,
21 as severity, -- FATAL level for high error count alerts
MAP['service', ARRAY[service], 'alert_type', ARRAY['high_error_count'],
'severity_level', ARRAY['FATAL']] as tags,
JSON_OBJECT(
'error_count' VALUE error_count,
'error_threshold' VALUE 100,
'severity_filter' VALUE 'ERROR_LEVEL_17_PLUS',
'alert_reason' VALUE 'error_count_exceeded_threshold',
'analysis_type' VALUE 'error_count_monitoring'
) as attributes
FROM aggregated
Advanced grouping
GROUPING SETS
Create multiple aggregation levels. The example below creates multiple reporting hierarchy levels using grouping sets, resulting in varied granularity summarizations.
-- Convert multi-level aggregations into hierarchy report LogEvents
-- Start by extracting all the items we care about from the LogEvents
WITH parsed AS (
SELECT
tags['service'][1] as service, -- Extract service name
tags['host'][1] as host, -- Extract host name
severity, -- Extract severity level
eventtimestamp -- Use event timestamp for windowing
FROM logs
),
-- Execute the aggregation
aggregated AS (
SELECT
window_start, -- Start of the aggregation window
service, -- Service name
host, -- Host name
severity, -- Severity level
COUNT(*) as log_count -- Count of logs in this aggregation level
FROM TABLE(TUMBLE(TABLE parsed, DESCRIPTOR(eventtimestamp), INTERVAL '10' SECONDS))
GROUP BY window_start, window_end, GROUPING SETS (
(service, host, severity), -- Most detailed: by service + host + severity
(service, severity), -- Medium detail: by service + severity (all hosts combined)
(service), -- Less detail: by service only (all hosts and severities)
() -- Least detail: total count (no grouping)
)
)
-- Create nice messages and attributes for each aggregation level
SELECT
window_start as eventtimestamp, -- window time as eventtimestamp
-- Create descriptive message based on aggregation level
CASE
WHEN service IS NOT NULL AND host IS NOT NULL AND severity IS NOT NULL
THEN CONCAT('Detailed: ', service, '@', host, ' severity=', CAST(severity AS STRING), ' count=', CAST(log_count AS STRING))
WHEN service IS NOT NULL AND severity IS NOT NULL
THEN CONCAT('Service summary: ', service, ' severity=', CAST(severity AS STRING), ' count=', CAST(log_count AS STRING))
WHEN service IS NOT NULL
THEN CONCAT('Service total: ', service, ' count=', CAST(log_count AS STRING))
ELSE CONCAT('Grand total: ', CAST(log_count AS STRING), ' logs')
END as message,
MAP[
'service', ARRAY[COALESCE(service, 'ALL')],
'host', ARRAY[COALESCE(host, 'ALL')],
'report_type', ARRAY['hierarchy_summary'],
'aggregation_level', ARRAY[
CASE
WHEN service IS NOT NULL AND host IS NOT NULL AND severity IS NOT NULL THEN 'detailed'
WHEN service IS NOT NULL AND severity IS NOT NULL THEN 'service_severity'
WHEN service IS NOT NULL THEN 'service_total'
ELSE 'grand_total'
END
]
] as tags,
JSON_OBJECT(
'service' VALUE COALESCE(service, 'ALL'),
'host' VALUE COALESCE(host, 'ALL'),
'severity' VALUE COALESCE(severity, -1),
'log_count' VALUE log_count
) as attributes
FROM aggregated
ROLLUP
Generate hierarchical aggregations. Generates all prefixes of the grouping columns. This is a batch example:
SELECT
tags['service'][1] as service,
tags['host'][1] as host,
COUNT(*) as log_count
FROM logs
GROUP BY ROLLUP (tags['service'][1], tags['host'][1])
-- Generates: (service, host), (service), ()
CUBE
Generate all possible combinations of the grouping columns. This is a batch example:
SELECT
tags['service'][1] as service,
severity,
COUNT(*) as log_count
FROM logs
GROUP BY CUBE (tags['service'][1], severity)
-- Generates: (service, severity), (service), (severity), ()
Event-Level Analytical functions
Event ranking in service
-- Add ranking information to individual log events
SELECT *, -- Include all LogEvent fields
-- Add ranking of events in each service by severity
RANK() OVER (PARTITION BY tags['service'][1] ORDER BY severity DESC) as severity_rank_in_service,
-- Add row numbers to events in each service (most recent first)
ROW_NUMBER() OVER (PARTITION BY tags['service'][1] ORDER BY eventtimestamp DESC) as event_sequence,
-- Add the severity of the previous event in this service
LAG(severity, 1) OVER (PARTITION BY tags['service'][1] ORDER BY eventtimestamp) as prev_event_severity
FROM logs
-- This returns individual LogEvents with additional analytical context
Important rules
LogEvent fields
Your SQL query can return any combination of LogEvent fields, but any fields you do return must use the accepted types:
id (STRING): Globally unique identifier for the log event.
eventtimestamp (BIGINT, TIMESTAMP_LTZ, or TIMESTAMP): When BIGINT, milliseconds since the Unix epoch when the event occurred.
receivedtimestamp (BIGINT, TIMESTAMP_LTZ, or TIMESTAMP): When BIGINT, milliseconds since the Unix epoch when Grepr received the event.
message (STRING): The log message content.
severity (INT): The severity level of the event. This is a value between 1 and 24, following the OpenTelemetry convention.
tags (MAP<STRING, ARRAY<STRING>>): Key-value pairs for filtering and routing.
attributes (STRING): JSON string containing structured data.
Additional fields: Any extra computed columns are automatically added to the event’s attributes map.
Default values for omitted fields
When core LogEvent fields are omitted from your SELECT clause, the original values from the input events are preserved. However, if you're creating entirely new events (for example, during windowed aggregations), the following defaults apply when field values are not specified:
id: Generated unique identifier if not provided.
eventtimestamp: Current timestamp if not provided.
receivedtimestamp: Current timestamp if not provided.
message: Empty string if not provided.
severity: 9 (INFO level) if not provided.
tags: Empty map if not provided.
attributes: Empty map if not provided.
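As a minimal sketch, a windowed aggregation that sets only eventtimestamp, message, and severity would rely on the defaults above for id, receivedtimestamp, tags, and attributes:
SELECT
    window_start as eventtimestamp,
    CONCAT('Saw ', CAST(COUNT(*) AS STRING), ' logs in this window') as message,
    9 as severity
FROM TABLE(TUMBLE(TABLE logs, DESCRIPTOR(eventtimestamp), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end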
Note: In most cases, you’ll want to use SELECT * or include specific fields to preserve data from the input events.
Examples
-- Updates only message, all other fields preserved
SELECT *, UPPER(message) as message FROM logs
-- Updates message and severity, adds computed field into attributes
SELECT *, UPPER(message) as message, severity + 1 as severity, CHAR_LENGTH(message) as msg_length FROM logs
Handling attributes fields
- Use JSON functions (JSON_VALUE, JSON_QUERY, JSON_EXISTS) to work with attributes.
- Type conversion is automatic for basic types (string, number, boolean).
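For example, a sketch that extracts a numeric field from attributes and filters on it, assuming the payload contains a responseTime key:
SELECT *, JSON_VALUE(attributes, '$.responseTime' RETURNING INTEGER) as response_time
FROM logs
WHERE JSON_VALUE(attributes, '$.responseTime' RETURNING INTEGER) > 1000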