SQL Transform

Transform events with the SQL operation

You can use the SQL operation to process data with SQL statements. With support for multiple statement types and complex data flows, the SQL operation gives you a powerful and flexible way to process, filter, and enrich data using a familiar SQL syntax. The SQL operation is supported in the Grepr UI and REST API. To use the API, see the SqlOperation API spec.

The SQL Operation complies with the ANSI SQL standard. It includes the full capabilities of Apache Flink SQL 1.19, with access to a comprehensive set of built-in functions and operators for data processing. This document describes a subset of the available capabilities. To learn more about Flink SQL, see the Flink SQL documentation.

The SQL Operation processes data through a sequence of SQL statements, and each statement can reference:

  • Input tables: Tables from connected upstream operations (defined in inputs configuration).
  • Previously created tables: Tables created by earlier VIEW statements in the sequence.
  • Dataset tables: Tables from available datasets (defined in the availableDatasets configuration).

When the SQL operation runs, it validates that:

  • All VIEW statement table names are unique within the operation.
  • All OUTPUT statement output names are unique within the operation.
  • At least one OUTPUT statement exists to produce results.

The SQL operation functionality includes:

  • Support for VIEW statements that create temporary tables for intermediate results.
  • Support for OUTPUT statements that produce results for downstream operations.
  • Referencing input tables from connected streams and datasets.
  • Support for multiple data types such as LOG_EVENT, VARIANT, and COMPLETE_SPAN.
  • Flexible table naming and output management.

Use cases

The SQL operation is ideal for:

Data normalization and standardization

  • Message format standardization: Converting various log formats to a standard structure.
  • Timestamp normalization: Converting different timestamp formats to standard milliseconds.
  • Field standardization: Ensuring consistent field names and types across services.

Log enhancement and enrichment

  • Computed metrics: Adding calculated fields such as processing delays and error rates.
  • Categorization: Adding severity levels, priority flags, or classification labels.
  • Contextual data: Extracting and promoting key information from log messages.

Security and compliance

  • Data redaction: Removing or masking sensitive information (passwords, tokens, PII).
  • Audit trail enhancement: Adding tracking fields for compliance requirements.
  • Anomaly flagging: Marking unusual patterns or suspicious activities.

Performance and monitoring

  • SLA monitoring: Extracting response times and performance metrics.
  • Error analysis: Categorizing and enriching error logs for better debugging.
  • Trend analysis: Preparing log data for time-series analysis and alerting.

Data pipeline optimization

  • Selective processing: Filtering logs to reduce downstream processing load.
  • Format conversion: Converting logs for compatibility with downstream systems.
  • Batch preparation: Aggregating and preparing data for batch analytics.

Statement Types

The SQL operation supports the following statement types:

VIEW statements

VIEW statements create temporary tables that subsequent statements can reference:

{ "type": "sql_view", "tableName": "filtered_events", "sqlQuery": "SELECT * FROM logs WHERE severity >= 13" }

OUTPUT statements

OUTPUT statements convert table results to DataStream outputs for downstream operations:

{ "type": "sql_output", "outputName": "high_severity_logs", "outputType": "LOG_EVENT", "sqlQuery": "SELECT * FROM filtered_events WHERE message LIKE '%ERROR%'" }

Configure the SQL operation

The SQL operation supports the following configuration options:

  • inputs (Map<String, GreprDataType>): A map of input table names to data types. Each input table corresponds to an input stream connected to this operation in the job graph and appears as a table that can be queried in SQL statements; the type defines the table’s schema.
  • statements (List<SqlStatement>): SQL statements to execute in order.
  • availableDatasets (Set<String>): Identifiers for DataSets containing tables that can be referenced.
  • globalStateTtl (Duration, the default is 2 minutes): The global state time-to-live. When executing stateful streaming operations, any state that is unused after this time will be dropped to reduce memory pressure.
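
Putting these options together, a minimal configuration could look like the following sketch (the dataset identifier, output name, and query are illustrative):

{
  "type": "sql-operation",
  "inputs": { "logs": "LOG_EVENT" },
  "availableDatasets": ["example-dataset"],
  "statements": [
    {
      "type": "sql_output",
      "outputName": "filtered_logs",
      "outputType": "LOG_EVENT",
      "sqlQuery": "SELECT * FROM logs WHERE severity >= 13"
    }
  ],
  "globalStateTtl": "PT2M"
}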

Supported Data Types

The SQL Operation supports three primary data types:

LOG_EVENT

Structured log events with core fields and flexible attributes:

-- Schema: ROW<id STRING, eventTimestamp TIMESTAMP_LTZ(3), receivedTimestamp TIMESTAMP_LTZ(3),
--             message STRING, severity INT, tags MAP<STRING, ARRAY<STRING>>, attributes STRING>

LOG_EVENT schema definition

When using the LOG_EVENT data type, your input log events are available through tables with the following schema:

  • id (STRING): Globally unique identifier for the log event
  • eventtimestamp (TIMESTAMP_LTZ(3)): Milliseconds since the Unix epoch when the event occurred (parsed from the event)
  • receivedtimestamp (TIMESTAMP_LTZ(3)): Milliseconds since the Unix epoch when Grepr received the event
  • message (STRING): The log message content
  • severity (INT): Log severity level (1-24, following OpenTelemetry convention)
  • tags (MAP<STRING, ARRAY<STRING>>): Key-value pairs for filtering and routing
  • attributes (STRING): JSON string containing structured data associated with the event

Important Notes:

  • If you’re transforming a field like message or severity, the transformed field must appear before the * wildcard in your SELECT clause.
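
For example, the following overrides the message field while keeping every other field intact:

SELECT UPPER(message) as message, * FROM logs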

Custom LogEvent construction

Grepr automatically adds default values for fields that are not specified. The defaults are:

  • id: When not specified or NULL, defaults to a new random UUID.
  • eventtimestamp: When not specified or NULL, defaults to the current timestamp in milliseconds.
  • receivedtimestamp: When not specified or NULL, defaults to the current timestamp in milliseconds.
  • severity: When not specified or NULL, defaults to 9 (INFO level).
  • message: When not specified or NULL, defaults to an empty string.
  • tags: When not specified or NULL, defaults to an empty map.
  • attributes: When not specified or NULL, defaults to an empty JSON object.
-- Explicitly construct a new LogEvent with specific fields.
SELECT
  UPPER(message) as message,
  CASE WHEN severity > 16 THEN 21 ELSE severity END as severity,
  MAP['transformed', ARRAY['true']] as tags,
  JSON_OBJECT('original_message' VALUE message) as attributes
FROM logs

View example input and output

Input:

{ "id": "evt1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 17, "message": "user login successful", "tags": {"service": ["auth"]}, "attributes": {} }

Output:

{ "id": "12345678-1234-5678-1234-567812345678", // New UUID generated "eventtimestamp": 174000000000, // Current timestamp in milliseconds "receivedtimestamp": 174000000002, // Current timestamp in milliseconds "severity": 21, // Adjusted severity "message": "USER LOGIN SUCCESSFUL", // Transformed message "tags": {"transformed": ["true"]}, // New tags "attributes": { // New attributes "original_message": "user login successful" } }

VARIANT

Timestamped semi-structured data with flexible JSON content:

-- Schema: ROW<receivedTimestamp TIMESTAMP_LTZ(3), eventTimestamp TIMESTAMP_LTZ(3), data STRING>

Variant example

{ "receivedTimestamp": 174000000000, "eventTimestamp": 174000000000, "data": { "count": 10 } }

COMPLETE_SPAN

OpenTelemetry spans with full resource and scope context:

-- Schema: Complex nested ROW structure with resource, scope, and span fields

Basic usage

Identity query

SELECT * FROM logs

Returns all log events unchanged.

View example input and output

Input:

{ "id": "0H19GZK97FTKS", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 9, "message": "State backend is set to heap memory", "tags": { "service": ["grepr-query"], "host": ["ip-10-12-4-129.ec2.internal"] }, "attributes": { "process": { "thread": { "name": "thread-0" } } } }

Output:

{ "id": "0H19GZK97FTKS", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 9, "message": "State backend is set to heap memory", "tags": { "service": ["grepr-query"], "host": ["ip-10-12-4-129.ec2.internal"] }, "attributes": { "process": { "thread": { "name": "thread-0" } } } }

Simple filtering

SELECT * FROM logs WHERE severity < 5

Filters events to include only those with severity less than 5 (TRACE level).

View example input and output

Input:

[ { "id": "evt1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 1, "message": "Debug trace message", "tags": {"service": ["api"]}, "attributes": {} }, { "id": "evt2", "eventtimestamp": 1724214074063, "receivedtimestamp": 1724214074189, "severity": 9, "message": "Info level message", "tags": {"service": ["api"]}, "attributes": {} } ]

Output:

[ { "id": "evt1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 1, "message": "Debug trace message", "tags": {"service": ["api"]}, "attributes": {} } ]

Note: The INFO level message (severity 9) was filtered out.

Message transformation

SELECT UPPER(message) as message, * FROM logs

Converts all log messages to uppercase.

Important: When transforming existing fields, to override the original value, the transformed column must come before the * wildcard.

Adding computed fields

SELECT *, UPPER(message) as upper_message, CHAR_LENGTH(message) as message_length, severity * 10 as severity_scaled FROM logs

Adds three new computed fields to the event attributes.

View example input and output

Input:

{ "id": "evt1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 9, "message": "Processing request", "tags": {"service": ["api"]}, "attributes": { "userId": "12345" } }

Output:

{ "id": "evt1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 9, "message": "Processing request", "tags": {"service": ["api"]}, "attributes": { "userId": "12345", "upper_message": "PROCESSING REQUEST", "message_length": 18, "severity_scaled": 90 } }

Note: Computed fields are added to the attributes object.

Multi-statement examples

Basic multi-statement operation

The following example shows how to create a multi-statement operation that filters and transforms data:

{ "type": "sql-operation", "inputs": { "logs": "LOG_EVENT" }, "statements": [ { "type": "sql_view", "tableName": "high_severity_logs", "sqlQuery": "SELECT * FROM logs WHERE severity >= 13" }, { "type": "sql_output", "outputName": "processed_logs", "outputType": "LOG_EVENT", "sqlQuery": "SELECT *, UPPER(message) as message, 'processed' as processing_status FROM high_severity_logs" } ], "watermarkDelay": "PT5S", "globalStateTtl": "PT2M" }

This operation:

  1. Creates a VIEW that selects logs with severity >= 13.
  2. Produces an OUTPUT that transforms the filtered logs by uppercasing messages and adding a processing status.

Complex multi-statement pipeline

{ "type": "sql-operation", "inputs": { "logs": "LOG_EVENT" }, "statements": [ { "type": "sql_view", "tableName": "parsed_logs", "sqlQuery": "SELECT *, JSON_VALUE(attributes, '$.userId') as user_id, JSON_VALUE(attributes, '$.requestId') as request_id FROM logs WHERE JSON_EXISTS(attributes, '$.userId')" }, { "type": "sql_view", "tableName": "user_activity_summary", "sqlQuery": "SELECT user_id, COUNT(*) as activity_count, MAX(eventtimestamp) as last_activity FROM parsed_logs GROUP BY user_id" }, { "type": "sql_output", "outputName": "enriched_logs", "outputType": "LOG_EVENT", "sqlQuery": "SELECT p.*, s.activity_count, s.last_activity FROM parsed_logs p LEFT JOIN user_activity_summary s ON p.user_id = s.user_id" }, { "type": "sql_output", "outputName": "user_summaries", "outputType": "LOG_EVENT", "sqlQuery": "SELECT user_id as id, last_activity as eventtimestamp, CURRENT_TIMESTAMP as receivedtimestamp, CONCAT('User ', user_id, ' had ', CAST(activity_count AS STRING), ' activities') as message, 9 as severity, MAP['summary_type', ARRAY['user_activity']] as tags, JSON_OBJECT('activity_count' VALUE activity_count) as attributes FROM user_activity_summary" } ] }

This operation:

  1. Filters logs that have user IDs and extracts user and request IDs.
  2. Creates a summary of user activity counts.
  3. Enriches original logs with user activity summaries.
  4. Creates synthetic log events summarizing user activity.

Advanced examples

Content-based filtering and enrichment

SELECT *, -- Select all original fields
  -- Add a category based on message content
  CASE
    WHEN message LIKE '%ERROR%' THEN 'error_detected'  -- If message contains "ERROR"
    WHEN message LIKE '%WARN%' THEN 'warning_detected' -- If message contains "WARN"
    ELSE 'normal'                                      -- For all other messages
  END as log_category,
  -- Replace any IP-like address with placeholder text
  REGEXP_REPLACE(message, '\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}', '<redacted>') as sanitized_message
FROM logs
WHERE severity >= 13 -- Only process WARN level and above (13-24)

View example input and output

Input:

[ { "id": "evt1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 17, "message": "ERROR: Database connection to IP 192.1.23.32 failed", "tags": {"service": ["database"]}, "attributes": {} }, { "id": "evt2", "eventtimestamp": 1724214074063, "receivedtimestamp": 1724214074189, "severity": 9, "message": "INFO: Processing complete", "tags": {"service": ["api"]}, "attributes": {} } ]

Output:

[ { "id": "evt1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 17, "message": "ERROR: Database connection to IP 192.1.23.32 failed", "tags": {"service": ["database"]}, "attributes": { "log_category": "error_detected", "sanitized_message": "ERROR: Database connection to ip redacted failed", } } ]

Note: INFO level message was filtered out due to severity < 13.

JSON attribute processing

SELECT *, -- Select all original fields
  -- Extract thread name from nested JSON structure
  JSON_VALUE(attributes, '$.process.thread.name') as thread_name,
  -- Extract status field from JSON attributes
  JSON_VALUE(attributes, '$.status') as status_code
FROM logs
-- Only process logs that have a 'process' field in their JSON attributes
WHERE JSON_EXISTS(attributes, '$.process')

View example input and output

Input:

[ { "id": "evt1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 9, "message": "Request processed", "tags": {"service": ["api"]}, "attributes": { "process": { "thread": { "name": "worker-1" } }, "status": "success" }, { "id": "evt2", "eventtimestamp": 1724214074063, "receivedtimestamp": 1724214074189, "severity": 9, "message": "Simple log message", "tags": {"service": ["web"]}, "attributes": { "simple": true } } ]

Output:

[ { "id": "evt1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 9, "message": "Request processed", "tags": {"service": ["api"]}, "attributes": { "process": {"thread": {"name": "worker-1"}}, "status": "success", "thread_name": "worker-1", "status_code": "success" } } ]

Note: Events without a ‘process’ field are filtered out.

Time-Based Analysis

SELECT *, -- Select all original fields
  -- Extract the hour (0-23) from the event timestamp
  EXTRACT(HOUR FROM eventtimestamp) as event_hour,
  -- Extract the day of the month (1-31) from the event timestamp
  EXTRACT(DAY FROM eventtimestamp) as event_day
FROM logs

View example input and output

Input:

{ "id": "evt1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 9, "message": "Processing request", "tags": {"service": ["api"]}, "attributes": {} }

Output:

{ "id": "evt1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 9, "message": "Processing request", "tags": {"service": ["api"]}, "attributes": { "event_hour": 4, "event_day": 21 } }

Note: Assumes event occurred on August 21, 2024, at 04:21:14 UTC.

Multi-field transformations

SELECT
  -- Explicitly select these core fields
  id, eventtimestamp, receivedtimestamp, tags, attributes,
  -- Transform the message by adding a suffix
  CONCAT(message, ' [PROCESSED]') as message,
  -- Increase severity by 1 (e.g., INFO becomes WARN)
  severity + 1 as severity,
  -- Store the original message in uppercase as a computed field
  UPPER(message) as original_uppercase,
  -- Store the original severity scaled by 100 as a computed field
  severity * 100 as original_severity_scaled
FROM logs

View example input and output

Input:

{ "id": "evt1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 9, "message": "user authenticated", "tags": {"service": ["auth"]}, "attributes": {} }

Output:

{ "id": "evt1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 10, "message": "user authenticated [PROCESSED]", "tags": {"service": ["auth"]}, "attributes": { "original_uppercase": "USER AUTHENTICATED", "original_severity_scaled": 900 } }

Note: Core fields (message, severity) are modified, and computed fields are added to attributes.

Function compatibility

The SQL Operation uses Apache Flink SQL 1.19 and is ANSI SQL compliant. Here are important function compatibility notes:

Timestamp functions

  • Use: TO_TIMESTAMP_LTZ(eventtimestamp, 3) to convert millisecond timestamps to TIMESTAMP_LTZ
  • Don’t use: TIMESTAMP_MILLIS(). This function is not available in Flink SQL 1.19. Use TO_TIMESTAMP_LTZ() instead.
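
For example, where you might otherwise reach for TIMESTAMP_MILLIS(eventtimestamp), use:

SELECT TO_TIMESTAMP_LTZ(eventtimestamp, 3) as event_time FROM logs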

String functions

  • Available: Most standard SQL string functions are supported, including REGEXP_REPLACE(), UPPER(), LOWER(), CONCAT(), CHAR_LENGTH(), LEFT(), and RIGHT().
  • Note: SPLIT_INDEX() is not available in Flink SQL 1.19.

Date and time functions

  • Available: Standard date and time functions, such as EXTRACT(), CURRENT_TIMESTAMP, and DATE_FORMAT(), are supported.
  • Note: Always convert millisecond timestamps first using TO_TIMESTAMP_LTZ().

JSON functions

  • Available: JSON_VALUE(), JSON_EXISTS() for extracting data from JSON strings.
  • Note: The attributes field is a JSON string that can be processed with these functions.

For a complete list of available functions, see the Flink SQL built-in functions reference: https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/functions/systemfunctions/

SQL operators

The SQL operation supports all standard SQL operators available in Flink SQL 1.19:

Comparison operators

  • =: Equal to
  • <> or !=: Not equal to
  • <: Less than
  • <=: Less than or equal to
  • >: Greater than
  • >=: Greater than or equal to
  • BETWEEN value1 AND value2: Range comparison
  • NOT BETWEEN value1 AND value2: Negated range comparison

Examples:

SELECT * FROM logs WHERE severity = 17
SELECT * FROM logs WHERE severity BETWEEN 13 AND 20
SELECT * FROM logs WHERE eventtimestamp > 1724214074000

Logical operators

  • AND: Logical AND
  • OR: Logical OR
  • NOT: Logical NOT

Examples:

SELECT * FROM logs WHERE severity > 16 AND message LIKE '%error%'
SELECT * FROM logs WHERE severity = 17 OR severity = 21
SELECT * FROM logs WHERE NOT (severity < 5)

Pattern matching operators

  • LIKE 'pattern': SQL pattern matching with % (any chars) and _ (single char)
  • NOT LIKE 'pattern': Negated pattern matching
  • SIMILAR TO 'regex': Regular expression matching
  • NOT SIMILAR TO 'regex': Negated regex matching

Examples:

SELECT * FROM logs WHERE message LIKE '%failed%'
SELECT * FROM logs WHERE message LIKE 'Error: ___' -- exactly 3 chars after "Error: "
SELECT * FROM logs WHERE message SIMILAR TO '^[A-Z]+:' -- starts with uppercase letters and colon

NULL handling operators

  • IS NULL: Check for NULL values
  • IS NOT NULL: Check for non-NULL values

Examples:

SELECT * FROM logs WHERE attributes IS NOT NULL
SELECT * FROM logs WHERE JSON_VALUE(attributes, '$.userId') IS NULL

Arithmetic operators

  • +: Addition
  • -: Subtraction
  • *: Multiplication
  • /: Division
  • % or MOD(): Modulo

Examples:

SELECT *, severity * 10 as severity_scaled FROM logs

Available SQL functions

The SQL Operation supports the comprehensive function library of Flink SQL. The following describes a subset of available functions. For details on all supported functions, see the Flink documentation.

String functions

Case manipulation

  • UPPER(string): Converts string to uppercase

    SELECT UPPER(message) as upper_message FROM logs -- Input: "Error occurred" → Output: "ERROR OCCURRED"
  • LOWER(string): Converts string to lowercase

    SELECT LOWER(message) as lower_message FROM logs -- Input: "Error OCCURRED" → Output: "error occurred"
  • INITCAP(string): Capitalizes first letter of each word

    SELECT INITCAP(message) as title_case FROM logs -- Input: "user login failed" → Output: "User Login Failed"

String extraction and manipulation

  • SUBSTR(string, start [, length]): Extracts substring

    SELECT SUBSTR(message, 1, 10) as msg_prefix FROM logs
    SELECT SUBSTR(message, 5) as msg_suffix FROM logs -- from position 5 to end
  • CONCAT(string1, string2, ...): Concatenates strings

    SELECT CONCAT('[', severity, '] ', message) as formatted_message FROM logs
  • CONCAT_WS(separator, string1, string2, ...): Concatenates with separator

    SELECT CONCAT_WS(' | ', tags['host'][1], tags['service'][1], message) as combined FROM logs
  • TRIM([BOTH | LEADING | TRAILING] [characters] FROM string): Removes specified characters

    SELECT TRIM(message) as clean_message FROM logs -- removes whitespace
    SELECT TRIM(LEADING '0' FROM id) as clean_id FROM logs -- removes leading zeros
  • REPLACE(string, search, replacement): Replaces all occurrences

    SELECT REPLACE(message, 'ERROR', 'ALERT') as modified_message FROM logs
  • REGEXP_REPLACE(string, pattern, replacement): Regex-based replacement

    SELECT REGEXP_REPLACE(message, '\\\\d{4}-\\\\d{2}-\\\\d{2}', 'YYYY-MM-DD') as sanitized FROM logs
    SELECT REGEXP_REPLACE(message, 'user=[^\\\\s]+', 'user=REDACTED') as redacted FROM logs

String information functions

  • CHAR_LENGTH(string) or LENGTH(string): Returns character count

    SELECT CHAR_LENGTH(message) as msg_length FROM logs
    SELECT LENGTH(message) as msg_length FROM logs -- Alternative syntax
  • POSITION(substring IN string): Finds substring position (1-based)

    SELECT POSITION('error' IN LOWER(message)) as error_pos FROM logs
  • ASCII(string): Returns ASCII value of first character

    SELECT ASCII(LEFT(message, 1)) as first_char_code FROM logs

Advanced string functions

  • LPAD(string, length, pad_string): Left-pads string

    SELECT LPAD(CAST(severity AS STRING), 3, '0') as padded_severity FROM logs -- Input: "9" → Output: "009"
  • RPAD(string, length, pad_string): Right-pads string

    SELECT RPAD(tags['service'][1], 10, '-') as padded_service FROM logs
  • LEFT(string, length): Returns leftmost characters

    SELECT LEFT(message, 10) as msg_prefix FROM logs
  • RIGHT(string, length): Returns rightmost characters

    SELECT RIGHT(message, 10) as msg_suffix FROM logs
  • REGEXP_EXTRACT(string, pattern [, group]): Extracts regex match

    SELECT REGEXP_EXTRACT(message, 'response_time=(\\\\d+)ms', 1) as response_time FROM logs
    SELECT REGEXP_EXTRACT(message, 'user=([^\\\\s]+)', 1) as username FROM logs

Numeric functions

Basic mathematical functions

  • ABS(numeric): Returns absolute value

    SELECT ABS(severity - 12) as severity_diff FROM logs
  • MOD(numeric1, numeric2): Returns remainder

    SELECT MOD(severity, 4) as severity_mod FROM logs
  • POWER(base, exponent): Returns base raised to exponent

    SELECT POWER(severity, 2) as severity_squared FROM logs
  • SQRT(numeric): Returns square root

    SELECT SQRT(severity) as severity_sqrt FROM logs
  • SIGN(numeric): Returns sign (-1, 0, 1)

    SELECT SIGN(receivedtimestamp - eventtimestamp) as delay_sign FROM logs

Rounding functions

  • ROUND(numeric [, precision]): Rounds to specified decimal places

    SELECT ROUND(severity / 3.0, 2) as severity_ratio FROM logs
  • FLOOR(numeric): Rounds down to nearest integer

    SELECT FLOOR(severity / 4.0) as severity_group FROM logs
  • CEIL(numeric) or CEILING(numeric): Rounds up to nearest integer

    SELECT CEIL(severity / 4.0) as severity_group_up FROM logs
    SELECT CEILING(severity / 4.0) as severity_group_up FROM logs -- Alternative syntax

Logarithmic and exponential functions

  • LN(numeric) or LOG(numeric): Natural logarithm
  • LOG10(numeric): Base-10 logarithm
  • LOG2(numeric): Base-2 logarithm
  • EXP(numeric): Exponential function (e^x)
  • LOG(base, numeric): Logarithm with specified base
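
A quick sketch applying one of these functions (the computed column name is illustrative):

SELECT *, ROUND(LOG10(CHAR_LENGTH(message) + 1), 2) as msg_length_log10 FROM logs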

Trigonometric functions

  • SIN(numeric), COS(numeric), TAN(numeric): Trigonometric functions
  • ASIN(numeric), ACOS(numeric), ATAN(numeric): Inverse trigonometric functions
  • DEGREES(radians), RADIANS(degrees): Angle conversion

Date and time functions

Current time functions

  • CURRENT_TIMESTAMP: Returns current timestamp in local timezone
  • LOCALTIME: Returns current SQL time
  • CURRENT_DATE: Returns current SQL date
  • UNIX_TIMESTAMP(): Returns current Unix timestamp in seconds
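
For example, to stamp each event with the time it was processed (the column names are illustrative):

SELECT *, UNIX_TIMESTAMP() * 1000 as processed_at_ms, CURRENT_DATE as processed_date FROM logs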

Time extraction

  • EXTRACT(unit FROM temporal): Extracts time component
    SELECT EXTRACT(HOUR FROM eventtimestamp) as event_hour FROM logs
    SELECT EXTRACT(DAY FROM eventtimestamp) as event_day FROM logs
    SELECT EXTRACT(YEAR FROM eventtimestamp) as event_year FROM logs
    Available units: MILLENNIUM, CENTURY, DECADE, YEAR, QUARTER, MONTH, WEEK, DAY, HOUR, MINUTE, SECOND

Time conversion functions

  • TO_TIMESTAMP_LTZ(numeric, precision): Converts epoch timestamp to TIMESTAMP_LTZ

    SELECT TO_TIMESTAMP_LTZ(JSON_VALUE(attributes, '$.timestamp' RETURNING BIGINT), 3) as event_time FROM logs -- 3 for milliseconds
  • UNIX_TIMESTAMP(timestamp): Converts timestamp to Unix timestamp (seconds)

    SELECT UNIX_TIMESTAMP(eventtimestamp) as unix_time FROM logs
  • TO_TIMESTAMP(string [, format]): Parses string to timestamp

    SELECT TO_TIMESTAMP(JSON_VALUE(attributes, '$.timestamp'), 'yyyy-MM-dd HH:mm:ss') as parsed_time FROM logs

Conditional functions

IF conditional

  • IF(condition, true_value, false_value): Simple conditional
    SELECT IF(severity > 16, 'HIGH', 'NORMAL') as priority FROM logs

CASE expressions

  • CASE WHEN condition THEN result [WHEN condition THEN result] ELSE result END

    SELECT *, CASE WHEN severity <= 4 THEN 'TRACE' WHEN severity <= 8 THEN 'DEBUG' WHEN severity <= 12 THEN 'INFO' WHEN severity <= 16 THEN 'WARN' WHEN severity <= 20 THEN 'ERROR' ELSE 'FATAL' END as severity_level FROM logs
  • CASE value WHEN value1 THEN result1 WHEN value2 THEN result2 ELSE default END

    SELECT *, CASE severity WHEN 1 THEN 'TRACE' WHEN 9 THEN 'INFO' WHEN 17 THEN 'ERROR' ELSE 'OTHER' END as severity_name FROM logs

NULL handling functions

  • COALESCE(value1, value2, ...): Returns first non-NULL value

    SELECT COALESCE(JSON_VALUE(attributes, '$.userId'), 'unknown') as user_id FROM logs
  • NULLIF(value1, value2): Returns NULL if values are equal

    SELECT NULLIF(severity, 0) as non_zero_severity FROM logs

Comparison functions

  • GREATEST(value1, value2, ...): Returns maximum value

    SELECT GREATEST(severity, 10) as min_severity FROM logs
  • LEAST(value1, value2, ...): Returns minimum value

    SELECT LEAST(severity, 20) as max_severity FROM logs

JSON functions

JSON validation

  • string IS JSON [VALUE | SCALAR | ARRAY | OBJECT]: Validates JSON format
    SELECT * FROM logs WHERE attributes IS JSON OBJECT

JSON path functions

  • JSON_EXISTS(json_string, path): Checks if JSON path exists

    SELECT * FROM logs WHERE JSON_EXISTS(attributes, '$.process.thread')
  • JSON_VALUE(json_string, path [RETURNING type] [ON ERROR | ON EMPTY]): Extracts scalar value

    SELECT JSON_VALUE(attributes, '$.userId' RETURNING STRING) as user_id FROM logs
    SELECT JSON_VALUE(attributes, '$.responseTime' RETURNING INTEGER) as response_time FROM logs
    SELECT JSON_VALUE(attributes, '$.isSuccess' RETURNING BOOLEAN) as success FROM logs
  • JSON_QUERY(json_string, path [RETURNING type] [ON ERROR | ON EMPTY]): Extracts JSON objects/arrays

    SELECT JSON_QUERY(attributes, '$.process') as process_info FROM logs
    SELECT JSON_QUERY(attributes, '$.tags[*]') as all_tags FROM logs

JSON construction

  • JSON_OBJECT([KEY key VALUE value] ...): Creates JSON object

    SELECT JSON_OBJECT('severity' VALUE severity, 'timestamp' VALUE eventtimestamp) as event_summary FROM logs
  • JSON_ARRAY([value1, value2, ...]): Creates JSON array

    SELECT JSON_ARRAY(severity, EXTRACT(HOUR FROM TO_TIMESTAMP_LTZ(eventtimestamp, 3)), tags['host'][1]) as event_array FROM logs

Hash functions

  • MD5(string): Returns MD5 hash

    SELECT MD5(CONCAT(id, message)) as content_hash FROM logs
  • SHA1(string): Returns SHA-1 hash

  • SHA256(string): Returns SHA-256 hash

    SELECT SHA256(message) as message_hash FROM logs

Aggregate functions

Basic aggregations

  • COUNT([DISTINCT] expression): Counts rows/distinct values

    SELECT COUNT(*) as total_logs FROM logs
    SELECT COUNT(DISTINCT tags['service'][1]) as unique_services FROM logs
  • SUM([DISTINCT] expression): Sums numeric values

  • AVG([DISTINCT] expression): Calculates average

  • MIN(expression): Finds minimum value

  • MAX(expression): Finds maximum value

    SELECT tags['service'][1] as service, COUNT(*) as log_count, AVG(severity) as avg_severity FROM logs GROUP BY tags['service'][1]

Statistical functions

  • STDDEV_POP(expression): Population standard deviation
  • STDDEV_SAMP(expression): Sample standard deviation
  • VAR_POP(expression): Population variance
  • VAR_SAMP(expression): Sample variance
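
These are aggregate functions, so in streaming mode they need a window (see the windowing section below). A sketch measuring severity variability per service in 5-minute windows:

SELECT window_start, window_end, tags['service'][1] as service, STDDEV_SAMP(severity) as severity_stddev
FROM TABLE(TUMBLE(TABLE logs, DESCRIPTOR(TO_TIMESTAMP_LTZ(eventtimestamp, 3)), INTERVAL '5' MINUTES))
GROUP BY window_start, window_end, tags['service'][1]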

Collection functions

  • COLLECT(expression): Collects values into a multiset
  • LISTAGG(expression [, separator]): Concatenates values with separator
    SELECT tags['service'][1] as service, LISTAGG(DISTINCT tags['host'][1], ',') as hosts FROM logs GROUP BY tags['service'][1]

Windowing and aggregations

To aggregate data in a streaming setup with Flink SQL, use windowing functions. To learn more, see the Flink documentation.

Window functions (streaming)

For streaming data processing, Flink SQL provides windowing table-valued functions (TVFs). These functions create windows over time-based or count-based intervals, allowing you to perform streaming aggregations over subsets of data.

Important: When using streaming windowed aggregations with GROUP BY, you must include window_start and window_end as group-by keys.

Tumbling windows

Fixed-size, non-overlapping windows:

-- ✅ VALID: Get a breakdown of log count per severity per service in 5-minute windows
SELECT
  window_start as eventtimestamp, -- Window start as event time
  CONCAT('Window summary: ', service, ' had ', CAST(log_count AS STRING), ' logs with severity ', CAST(severity AS STRING)) as message, -- Descriptive message
  severity,
  MAP['service', ARRAY[service], 'window_type', ARRAY['tumbling']] as tags, -- Service and window type tags
  window_start, -- turns into an attribute
  window_end,   -- turns into an attribute
  log_count     -- turns into an attribute
FROM (
  SELECT
    window_start,                  -- Start time of each 5-minute window
    window_end,                    -- End time of each 5-minute window
    tags['service'][1] as service, -- Group by service name
    severity,                      -- and severity
    COUNT(*) as log_count          -- Count total logs in each window
  FROM TABLE(
    -- Create 5-minute windows based on event timestamp
    TUMBLE(TABLE logs, DESCRIPTOR(TO_TIMESTAMP_LTZ(eventtimestamp, 3)), INTERVAL '5' MINUTES)
  )
  -- Group results by window and service to get aggregates per service per window
  GROUP BY window_start, window_end, severity, tags['service'][1]
)

Hopping (sliding) windows

Fixed-size windows that can overlap:

-- ✅ VALID: Convert hopping window analysis into system health LogEvents
SELECT
  -- Generate synthetic LogEvent fields for each window health report
  MD5(CONCAT(CAST(window_start AS STRING), 'system_health')) as id,
  UNIX_TIMESTAMP(window_end) * 1000 as eventtimestamp, -- Window end as event time
  UNIX_TIMESTAMP(CURRENT_TIMESTAMP) * 1000 as receivedtimestamp,
  CONCAT('System health: ', CAST(log_count AS STRING), ' total logs across ', CAST(service_count AS STRING), ' services') as message,
  -- Set severity based on activity level
  CASE
    WHEN log_count < 100 THEN 13   -- WARN: Low activity
    WHEN log_count > 10000 THEN 13 -- WARN: High activity
    ELSE 9                         -- INFO: Normal activity
  END as severity,
  MAP['analysis_type', ARRAY['system_health'], 'window_type', ARRAY['hopping']] as tags,
  JSON_OBJECT(
    'window_start' VALUE CAST(window_start AS STRING),
    'window_end' VALUE CAST(window_end AS STRING),
    'total_log_count' VALUE log_count,
    'active_service_count' VALUE service_count,
    'activity_level' VALUE CASE WHEN log_count < 100 THEN 'low' WHEN log_count > 10000 THEN 'high' ELSE 'normal' END,
    'analysis_type' VALUE 'hopping_window_health'
  ) as attributes,
  'system' as service,     -- Synthetic service for system-wide metrics
  'health-monitor' as host -- Synthetic host for health monitoring
FROM (
  SELECT
    window_start, -- Start time of each window
    window_end,   -- End time of each window
    COUNT(*) as log_count, -- Total logs in this window
    COUNT(DISTINCT tags['service'][1]) as service_count -- Number of unique services in this window
  FROM TABLE(
    HOP(
      TABLE logs,
      DESCRIPTOR(TO_TIMESTAMP_LTZ(eventtimestamp, 3)),
      INTERVAL '1' MINUTE, -- Window slides every 1 minute
      INTERVAL '5' MINUTES -- Each window covers 5 minutes of data
    )
    -- This creates overlapping 5-minute windows that move forward every minute
  )
  -- Group by window boundaries to get one result per window
  GROUP BY window_start, window_end
)
-- This converts each health summary into a proper LogEvent

Session windows

Group events by activity sessions:

-- ✅ VALID: Convert session analysis into session summary LogEvents
SELECT
  -- Generate synthetic LogEvent fields for each session summary
  MD5(CONCAT(CAST(window_start AS STRING), service, 'session')) as id,
  window_end as eventtimestamp, -- Session end as event time
  UNIX_TIMESTAMP(CURRENT_TIMESTAMP) * 1000 as receivedtimestamp,
  CONCAT('Session completed for ', service, ': ', CAST(session_events AS STRING), ' events over ', CAST((window_end - window_start) / 60000 AS STRING), ' minutes') as message,
  -- Set severity based on session length and activity
  CASE
    WHEN session_events < 5 THEN 5     -- DEBUG: Short session
    WHEN session_events > 1000 THEN 13 -- WARN: Very active session
    ELSE 9                             -- INFO: Normal session
  END as severity,
  MAP['service', ARRAY[service], 'session_type', ARRAY['activity_session']] as tags,
  JSON_OBJECT(
    'session_start' VALUE window_start,
    'session_end' VALUE window_end,
    'session_duration_ms' VALUE (window_end - window_start),
    'session_duration_minutes' VALUE ((window_end - window_start) / 60000),
    'event_count' VALUE session_events,
    'events_per_minute' VALUE ROUND(session_events / ((window_end - window_start) / 60000.0), 2),
    'analysis_type' VALUE 'session_summary'
  ) as attributes,
  service,                   -- Required service field
  'session-analyzer' as host -- Synthetic host for session analysis
FROM (
  SELECT
    window_start,                  -- When this session started
    window_end,                    -- When this session ended
    tags['service'][1] as service, -- Service name for this session
    COUNT(*) as session_events     -- Number of events in this session
  FROM TABLE(
    SESSION(
      TABLE logs PARTITION BY tags['service'][1], -- Create separate sessions for each service
      DESCRIPTOR(TO_TIMESTAMP_LTZ(eventtimestamp, 3)),
      INTERVAL '10' MINUTES -- If no events for 10 minutes, end the session
    )
    -- Groups consecutive events into sessions based on time gaps
  )
  -- Group by session boundaries and service
  GROUP BY window_start, window_end, tags['service'][1]
)
-- This converts each session summary into a proper LogEvent

Cumulative windows

Windows that expand incrementally:

SELECT window_start, window_end, tags['service'][1] as service, COUNT(*) as cumulative_count
FROM TABLE(
  CUMULATE(
    TABLE logs,
    DESCRIPTOR(TO_TIMESTAMP_LTZ(eventtimestamp, 3)),
    INTERVAL '1' HOUR, -- step size
    INTERVAL '1' DAY   -- max window size
  )
)
GROUP BY window_start, window_end, tags['service'][1]

Analytical Window functions

Ranking functions

  • ROW_NUMBER() OVER (window_spec): Sequential row numbers
  • RANK() OVER (window_spec): Ranking with gaps
  • DENSE_RANK() OVER (window_spec): Ranking without gaps
  • PERCENT_RANK() OVER (window_spec): Percentage rank
SELECT *, -- Include all original fields
  -- Assign sequential numbers to events ordered by event time
  ROW_NUMBER() OVER (ORDER BY eventtimestamp) as event_sequence
FROM logs

GROUP BY operations

Basic grouping

The example below groups messages by service and severity, counting the number of logs in each group for each minute. This is a common operation to summarize log data.

WITH parsed AS (
  SELECT
    tags['service'][1] as service, -- Extract service name
    severity,                      -- Extract severity level
    eventtimestamp                 -- Use event timestamp for windowing
  FROM logs
),
aggregated AS (
  SELECT
    service,                            -- Service name
    severity,                           -- Severity level
    COUNT(*) as evtcount,               -- Count of logs in each group
    MIN(eventtimestamp) as first_event, -- Earliest event in group
    MAX(eventtimestamp) as last_event   -- Latest event in group
  FROM TABLE(TUMBLE(TABLE parsed, DESCRIPTOR(eventtimestamp), INTERVAL '1' MINUTE))
  GROUP BY window_start, window_end, service, severity -- Group by service and severity
)
SELECT
  last_event as eventtimestamp, -- Use the latest event time as the summary event time
  CONCAT('Service ', service, ' severity ', CAST(severity AS STRING), ' had ', CAST(evtcount AS STRING), ' events') as message, -- Descriptive summary message
  severity, -- Use the grouped severity level
  MAP['service', ARRAY[service], 'analysis_type', ARRAY['group_summary']] as tags,
  JSON_OBJECT(
    'event_count' VALUE evtcount,
    'first_event' VALUE first_event,
    'last_event' VALUE last_event,
    'analysis_type' VALUE 'service_severity_summary'
  ) as attributes
FROM aggregated

HAVING clause

Filters groups after aggregation. The example below creates new log messages when the error count exceeds a threshold. This is an example of the complex alerting rules you can create with SQL.

-- ✅ VALID: Convert error count analysis into alert LogEvents
WITH parsed AS (
  SELECT
    tags['service'][1] as service, -- Extract service name
    severity,                      -- Extract severity level
    eventtimestamp                 -- Use event timestamp for windowing
  FROM logs
),
-- HAVING filters groups, WHERE filters individual rows
aggregated AS (
  SELECT
    service,                -- Service name
    COUNT(*) as error_count -- Count of error-level logs
  FROM TABLE(HOP(TABLE parsed, DESCRIPTOR(eventtimestamp), INTERVAL '1' MINUTE, INTERVAL '10' MINUTES))
  WHERE severity >= 17                       -- First filter: only ERROR level logs (17+)
  GROUP BY window_start, window_end, service -- Group by service to count errors per service
  HAVING COUNT(*) > 100                      -- Second filter: only show services with 100+ errors
)
SELECT
  CONCAT('HIGH ERROR ALERT: Service ', service, ' has ', CAST(error_count AS STRING), ' error-level events') as message,
  21 as severity, -- FATAL level for high error count alerts
  MAP['service', ARRAY[service], 'alert_type', ARRAY['high_error_count'], 'severity_level', ARRAY['FATAL']] as tags,
  JSON_OBJECT(
    'error_count' VALUE error_count,
    'error_threshold' VALUE 100,
    'severity_filter' VALUE 'ERROR_LEVEL_17_PLUS',
    'alert_reason' VALUE 'error_count_exceeded_threshold',
    'analysis_type' VALUE 'error_count_monitoring'
  ) as attributes
FROM aggregated

Advanced grouping

GROUPING SETS

Create multiple aggregation levels. The example below creates multiple reporting hierarchy levels using grouping sets, resulting in varied granularity summarizations.

-- ✅ VALID: Convert multi-level aggregations into hierarchy report LogEvents
-- Start by extracting all the items we care about from the LogEvents
WITH parsed AS (
  SELECT
    tags['service'][1] as service, -- Extract service name
    tags['host'][1] as host,       -- Extract host name
    severity,                      -- Extract severity level
    eventtimestamp                 -- Use event timestamp for windowing
  FROM logs
),
-- Execute the aggregation
aggregated AS (
  SELECT
    window_start,         -- Start of the aggregation window
    service,              -- Service name
    host,                 -- Host name
    severity,             -- Severity level
    COUNT(*) as log_count -- Count of logs in this aggregation level
  FROM TABLE(TUMBLE(TABLE parsed, DESCRIPTOR(eventtimestamp), INTERVAL '10' SECONDS))
  GROUP BY window_start, window_end, GROUPING SETS (
    (service, host, severity), -- Most detailed: by service + host + severity
    (service, severity),       -- Medium detail: by service + severity (all hosts combined)
    (service),                 -- Less detail: by service only (all hosts and severities)
    ()                         -- Least detail: total count (no grouping)
  )
)
-- Create nice messages and attributes for each aggregation level
SELECT
  window_start as eventtimestamp, -- window time as eventtimestamp
  -- Create descriptive message based on aggregation level
  CASE
    WHEN service IS NOT NULL AND host IS NOT NULL AND severity IS NOT NULL
      THEN CONCAT('Detailed: ', service, '@', host, ' severity=', CAST(severity AS STRING), ' count=', CAST(log_count AS STRING))
    WHEN service IS NOT NULL AND severity IS NOT NULL
      THEN CONCAT('Service summary: ', service, ' severity=', CAST(severity AS STRING), ' count=', CAST(log_count AS STRING))
    WHEN service IS NOT NULL
      THEN CONCAT('Service total: ', service, ' count=', CAST(log_count AS STRING))
    ELSE CONCAT('Grand total: ', CAST(log_count AS STRING), ' logs')
  END as message,
  MAP[
    'service', ARRAY[COALESCE(service, 'ALL')],
    'host', ARRAY[COALESCE(host, 'ALL')],
    'report_type', ARRAY['hierarchy_summary'],
    'aggregation_level', ARRAY[
      CASE
        WHEN service IS NOT NULL AND host IS NOT NULL AND severity IS NOT NULL THEN 'detailed'
        WHEN service IS NOT NULL AND severity IS NOT NULL THEN 'service_severity'
        WHEN service IS NOT NULL THEN 'service_total'
        ELSE 'grand_total'
      END
    ]
  ] as tags,
  JSON_OBJECT(
    'service' VALUE COALESCE(service, 'ALL'),
    'host' VALUE COALESCE(host, 'ALL'),
    'severity' VALUE COALESCE(severity, -1),
    'log_count' VALUE log_count
  ) as attributes
FROM aggregated

ROLLUP

Generate hierarchical aggregations. Generates all prefixes of the grouping columns. This is a batch example:

SELECT tags['service'][1] as service, tags['host'][1] as host, COUNT(*) as log_count FROM logs GROUP BY ROLLUP (tags['service'][1], tags['host'][1]) -- Generates: (service, host), (service), ()

CUBE

Generate all possible combinations of the grouping columns. This is a batch example:

SELECT tags['service'][1] as service, severity, COUNT(*) as log_count FROM logs GROUP BY CUBE (tags['service'][1], severity) -- Generates: (service, severity), (service), (severity), ()

Event-Level Analytical functions

Event ranking in service

-- ✅ VALID: Add ranking information to individual log events
SELECT *, -- Include all LogEvent fields
  -- Add ranking of events within each service by severity
  RANK() OVER (PARTITION BY tags['service'][1] ORDER BY severity DESC) as severity_rank_in_service,
  -- Add row numbers to events within each service (most recent first)
  ROW_NUMBER() OVER (PARTITION BY tags['service'][1] ORDER BY eventtimestamp DESC) as event_sequence,
  -- Add the severity of the previous event in this service
  LAG(severity, 1) OVER (PARTITION BY tags['service'][1] ORDER BY eventtimestamp) as prev_event_severity
FROM logs
-- This returns individual LogEvents with additional analytical context

Important rules

LogEvent fields

Your SQL query can return any combination of LogEvent fields, but when specified, they must be of the acceptable types:

  • id (STRING): Globally unique identifier for the log event

  • eventtimestamp (BIGINT or TIMESTAMP_LTZ or TIMESTAMP): When BIGINT, milliseconds since Unix epoch when event occurred

  • receivedtimestamp (BIGINT or TIMESTAMP_LTZ or TIMESTAMP): When BIGINT, milliseconds since Unix epoch when Grepr received the event

  • message (STRING): The log message content

  • severity (INT): Log severity level (1-24, following OpenTelemetry convention)

  • tags (MAP<STRING, ARRAY<STRING>>): Key-value pairs for filtering and routing

  • attributes (STRING): JSON string containing structured data

Additional fields: Any extra computed columns are automatically added to the event’s attributes map.

Default values for omitted fields

When core LogEvent fields are omitted from your SELECT clause, the original values from the input events are preserved. However, if you’re creating entirely new events, for example, during windowed aggregations, the following is the default behavior when field values are not specified:

  • id: Generated unique identifier if not provided

  • eventtimestamp: Current timestamp if not provided

  • receivedtimestamp: Current timestamp if not provided

  • message: Empty string if not provided

  • severity: 9 (INFO level) if not provided

  • tags: Empty map if not provided

  • attributes: Empty JSON object if not provided

Note: In most cases, you’ll want to use SELECT * or include specific fields to preserve data from the input events.

Examples:

-- Updates only message, all other fields preserved
SELECT *, UPPER(message) as message FROM logs

-- Updates message and severity, adds computed field into attributes
SELECT *, UPPER(message) as message, severity + 1 as severity, CHAR_LENGTH(message) as msg_length FROM logs

Handling attributes fields

  • Use JSON functions (JSON_VALUE, JSON_QUERY, JSON_EXISTS) to work with attributes.
  • Type conversion is automatic for basic types (string, number, boolean).
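
For example, pulling a numeric attribute out for downstream use might look like this (the '$.responseTime' path is illustrative):

SELECT *, JSON_VALUE(attributes, '$.responseTime' RETURNING INTEGER) as response_ms
FROM logs
WHERE JSON_EXISTS(attributes, '$.responseTime')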

Error handling

The SQL operation fails if:

  • The SQL syntax is invalid: Malformed SQL queries are rejected during parsing.

  • Field types are incompatible: Core LogEvent fields must match their expected types when included.

  • Query execution errors: Runtime errors during SQL execution (e.g., division by zero, invalid regex).

  • JSON parsing errors: Invalid JSON operations on the attributes field.
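
Defensive expressions can avoid some runtime failures. For example, NULLIF guards a division against zero denominators (the attribute paths are illustrative):

SELECT *, CAST(JSON_VALUE(attributes, '$.errors') AS DOUBLE) / NULLIF(CAST(JSON_VALUE(attributes, '$.total') AS DOUBLE), 0) as error_ratio
FROM logs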

SQL Transform Limitations

Streaming vs batch processing

The SQL Transform operates in both streaming and batch processing modes. Streaming has the following additional limitations:

  • JOIN operations are limited in the streaming context.

  • Complex aggregations may require careful state management.

Windowing limitations:

  • Window functions require explicit windowing TVFs (TUMBLE, HOP, SESSION, CUMULATE).

  • No unbounded aggregations: Use windowing for aggregations in streaming mode.

State management:

  • Stateful operations can grow unbounded: Your pipeline can fail if the amount of state used for aggregations or joins becomes too large. You can control the time-to-live (TTL) of state by setting the globalStateTtl parameter in the SQL Transform configuration. By default, it’s 2 minutes.
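
For example, a join-heavy pipeline might raise the TTL in the operation configuration (the statement and values are illustrative):

{
  "type": "sql-operation",
  "inputs": { "logs": "LOG_EVENT" },
  "statements": [
    { "type": "sql_output", "outputName": "enriched", "outputType": "LOG_EVENT", "sqlQuery": "SELECT * FROM logs" }
  ],
  "globalStateTtl": "PT10M"
}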

Data ordering

  • Events may arrive out of order: Consider using watermarks for time-based operations.
  • ROW_NUMBER() and ranking functions may not guarantee consistent ordering across restarts.

Performance considerations

Query optimization

  • Use WHERE clauses early to filter data before expensive operations.
  • Avoid SELECT * when possible: explicitly list needed columns.
  • Limit regex operations on high-volume streams: they are CPU-intensive.
  • Use indexes wisely: TO_TIMESTAMP_LTZ(eventtimestamp, 3) operations benefit from time-based partitioning.

Memory and state management

  • Computed fields increase event size: consider impact on downstream processing.
  • Complex aggregations require more memory: reduce TTLs if needed.
  • Window operations buffer data: larger windows require more memory.
  • JSON operations allocate temporary objects: avoid in tight loops.

Throughput optimization

-- ✅ GOOD: Filter early, simple operations
SELECT *, -- Select all fields
  UPPER(message) as upper_message -- Simple string operation (fast)
FROM logs
WHERE severity >= 17 -- Filter first to reduce data volume

-- ⚠️ CAUTION: Expensive regex on all records
SELECT *, -- Processes every single log record
  -- This regex runs on ALL logs, even those without dates (inefficient)
  REGEXP_EXTRACT(message, '(\\\\d{4}-\\\\d{2}-\\\\d{2})', 1) as date_extracted
FROM logs

-- ✅ BETTER: Filter then apply expensive operation
SELECT *, -- Only processes logs that likely contain dates
  -- Regex only runs on pre-filtered logs (much more efficient)
  REGEXP_EXTRACT(message, '(\\\\d{4}-\\\\d{2}-\\\\d{2})', 1) as date_extracted
FROM logs
WHERE message LIKE '%-%-%' -- Quick pre-filter using simple pattern matching

Resource usage patterns

  • String operations (UPPER, LOWER, CONCAT): Low CPU overhead
  • Regex operations (REGEXP_REPLACE, REGEXP_EXTRACT): High CPU overhead
  • JSON operations (JSON_VALUE, JSON_QUERY): Moderate CPU overhead
  • Mathematical operations: Low CPU overhead
  • Window aggregations: High memory usage during window buffering

Examples from production

Security log processing

SELECT *, CASE WHEN message LIKE '%authentication failed%' THEN 'auth_failure' WHEN message LIKE '%unauthorized access%' THEN 'unauthorized' WHEN message LIKE '%suspicious activity%' THEN 'suspicious' ELSE 'normal' END as security_category, REGEXP_REPLACE(message, 'user=[^\\\\s]+', 'user=REDACTED') as sanitized_message FROM logs WHERE severity >= 17 -- ERROR level and above

View example input and output

Input:

{ "id": "sec1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 17, "message": "authentication failed for user=johndoe from IP 192.168.1.100", "tags": {"service": ["auth"], "env": ["prod"]}, "attributes": { "ip": "192.168.1.100", "attempt": 3 } }

Output:

{ "id": "sec1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 17, "message": "authentication failed for user=johndoe from IP 192.168.1.100", "tags": {"service": ["auth"], "env": ["prod"]}, "attributes": { "ip": "192.168.1.100", "attempt": 3, "security_category": "auth_failure", "sanitized_message": "authentication failed for user=REDACTED from IP 192.168.1.100" } }

Application performance monitoring

SELECT *, CAST(REGEXP_EXTRACT(message, 'response_time=(\\\\d+)ms', 1) AS BIGINT) as response_time_ms, CASE WHEN message LIKE '%response_time=%ms' AND CAST(REGEXP_EXTRACT(message, 'response_time=(\\\\d+)ms', 1) AS BIGINT) > 1000 THEN 'slow' ELSE 'normal' END as performance_category FROM logs WHERE message LIKE '%response_time=%'

View example input and output

Input:

{ "id": "perf1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 9, "message": "API request completed with response_time=1250ms for endpoint /api/users", "tags": {"service": ["api"], "endpoint": ["/api/users"]}, "attributes": { "method": "GET", "status_code": 200, "user_id": "12345" } }

Output:

{ "id": "perf1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 9, "message": "API request completed with response_time=1250ms for endpoint /api/users", "tags": {"service": ["api"], "endpoint": ["/api/users"]}, "attributes": { "method": "GET", "status_code": 200, "user_id": "12345", "response_time_ms": 1250, "performance_category": "slow" } }