
SQL transform examples

Basic usage

Identity query

SELECT * FROM logs

Return all log events unchanged.

Input:

{ "id": "0H19GZK97FTKS", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 9, "message": "State backend is set to heap memory", "tags": { "service": ["grepr-query"], "host": ["ip-10-12-4-129.ec2.internal"] }, "attributes": { "process": { "thread": { "name": "thread-0" } } } }

Output:

{ "id": "0H19GZK97FTKS", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 9, "message": "State backend is set to heap memory", "tags": { "service": ["grepr-query"], "host": ["ip-10-12-4-129.ec2.internal"] }, "attributes": { "process": { "thread": { "name": "thread-0" } } } }

Simple filtering

SELECT * FROM logs WHERE severity < 5

Filter events to include only those with severity less than 5 (TRACE level).

Input:

[ { "id": "evt1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 1, "message": "Debug trace message", "tags": {"service": ["api"]}, "attributes": {} }, { "id": "evt2", "eventtimestamp": 1724214074063, "receivedtimestamp": 1724214074189, "severity": 9, "message": "Info level message", "tags": {"service": ["api"]}, "attributes": {} } ]

Output:

[ { "id": "evt1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 1, "message": "Debug trace message", "tags": {"service": ["api"]}, "attributes": {} } ]

Message transformation

SELECT UPPER(message) as message, * FROM logs

Convert all log messages to uppercase.

When transforming an existing field so that the new value overrides the original, the transformed column must come before the * wildcard in the SELECT clause, for example: SELECT UPPER(message) as message, * FROM logs
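
The same ordering rule applies to any in-place rewrite. A hedged sketch (the prefix text is purely illustrative):

SELECT CONCAT('[EDITED] ', message) as message, * FROM logs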

Adding computed fields

SELECT *, UPPER(message) as upper_message, CHAR_LENGTH(message) as message_length, severity * 10 as severity_scaled FROM logs

Add three new computed fields to the event attributes.

Input:

{ "id": "evt1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 9, "message": "Processing request", "tags": {"service": ["api"]}, "attributes": { "userId": "12345" } }

Output:

{ "id": "evt1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 9, "message": "Processing request", "tags": {"service": ["api"]}, "attributes": { "userId": "12345", "upper_message": "PROCESSING REQUEST", "message_length": 18, "severity_scaled": 90 } }

Multi-statement examples

Basic multi-statement operation

The following example shows how to create a multi-statement operation that filters and transforms data. This operation:

  1. Creates a VIEW that selects logs with severity >= 13.
  2. Produces an OUTPUT that transforms the filtered logs by uppercasing messages and adding a processing status.
{ "type": "sql-operation", "inputs": { "logs": "LOG_EVENT" }, "statements": [ { "type": "sql_view", "tableName": "high_severity_logs", "sqlQuery": "SELECT * FROM logs WHERE severity >= 13" }, { "type": "sql_output", "outputName": "processed_logs", "outputType": "LOG_EVENT", "sqlQuery": "SELECT *, UPPER(message) as message, 'processed' as processing_status FROM high_severity_logs" } ], "watermarkDelay": "PT5S", "globalStateTtl": "PT2M" }

Complex multi-statement pipeline

This operation:

  1. Filters logs that have user IDs and extracts user and request IDs.
  2. Creates a summary of user activity counts.
  3. Enriches original logs with user activity summaries.
  4. Creates synthetic log events summarizing user activity.
{ "type": "sql-operation", "inputs": { "logs": "LOG_EVENT" }, "statements": [ { "type": "sql_view", "tableName": "parsed_logs", "sqlQuery": "SELECT *, JSON_VALUE(attributes, '$.userId') as user_id, JSON_VALUE(attributes, '$.requestId') as request_id FROM logs WHERE JSON_EXISTS(attributes, '$.userId')" }, { "type": "sql_view", "tableName": "user_activity_summary", "sqlQuery": "SELECT user_id, COUNT(*) as activity_count, MAX(eventtimestamp) as last_activity FROM parsed_logs GROUP BY user_id" }, { "type": "sql_output", "outputName": "enriched_logs", "outputType": "LOG_EVENT", "sqlQuery": "SELECT p.*, s.activity_count, s.last_activity FROM parsed_logs p LEFT JOIN user_activity_summary s ON p.user_id = s.user_id" }, { "type": "sql_output", "outputName": "user_summaries", "outputType": "LOG_EVENT", "sqlQuery": "SELECT user_id as id, last_activity as eventtimestamp, CURRENT_TIMESTAMP as receivedtimestamp, CONCAT('User ', user_id, ' had ', CAST(activity_count AS STRING), ' activities') as message, 9 as severity, MAP['summary_type', ARRAY['user_activity']] as tags, JSON_OBJECT('activity_count' VALUE activity_count) as attributes FROM user_activity_summary" } ] }

Advanced examples

Content-based filtering and enrichment

SELECT
  *, -- Select all original fields
  -- Add a category based on message content
  CASE
    WHEN message LIKE '%ERROR%' THEN 'error_detected'  -- If message contains "ERROR"
    WHEN message LIKE '%WARN%' THEN 'warning_detected' -- If message contains "WARN"
    ELSE 'normal'                                      -- For all other messages
  END as log_category,
  -- Replace any IP-like address with placeholder text
  REGEXP_REPLACE(message, '\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}', '<redacted>') as sanitized_message
FROM logs
WHERE severity >= 13 -- Only process WARN level and above (13-24)

Input:

[ { "id": "evt1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 17, "message": "ERROR: Database connection to IP 192.1.23.32 failed", "tags": {"service": ["database"]}, "attributes": {} }, { "id": "evt2", "eventtimestamp": 1724214074063, "receivedtimestamp": 1724214074189, "severity": 9, "message": "INFO: Processing complete", "tags": {"service": ["api"]}, "attributes": {} } ]

Output:

[ { "id": "evt1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 17, "message": "ERROR: Database connection to IP 192.1.23.32 failed", "tags": {"service": ["database"]}, "attributes": { "log_category": "error_detected", "sanitized_message": "ERROR: Database connection to ip redacted failed", } } ]

JSON attribute processing

SELECT
  *, -- Select all original fields
  -- Extract thread name from nested JSON structure
  JSON_VALUE(attributes, '$.process.thread.name') as thread_name,
  -- Extract status field from JSON attributes
  JSON_VALUE(attributes, '$.status') as status_code
FROM logs
-- Only process logs that have a 'process' field in their JSON attributes
WHERE JSON_EXISTS(attributes, '$.process')

Input:

[ { "id": "evt1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 9, "message": "Request processed", "tags": {"service": ["api"]}, "attributes": { "process": { "thread": { "name": "worker-1" } }, "status": "success" }, { "id": "evt2", "eventtimestamp": 1724214074063, "receivedtimestamp": 1724214074189, "severity": 9, "message": "Simple log message", "tags": {"service": ["web"]}, "attributes": { "simple": true } } ]

Output:

[ { "id": "evt1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 9, "message": "Request processed", "tags": {"service": ["api"]}, "attributes": { "process": {"thread": {"name": "worker-1"}}, "status": "success", "thread_name": "worker-1", "status_code": "success" } } ]

Time-based analysis

SELECT
  *, -- Select all original fields
  -- Extract the hour (0-23) from the event timestamp
  EXTRACT(HOUR FROM eventtimestamp) as event_hour,
  -- Extract the day of the month (1-31) from the event timestamp
  EXTRACT(DAY FROM eventtimestamp) as event_day
FROM logs

Input:

{ "id": "evt1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 9, "message": "Processing request", "tags": {"service": ["api"]}, "attributes": {} }

Output:

{ "id": "evt1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 9, "message": "Processing request", "tags": {"service": ["api"]}, "attributes": { "event_hour": 4, "event_day": 21 } }

Multi-field transformations

SELECT
  -- Explicitly select these core fields
  id, eventtimestamp, receivedtimestamp, tags, attributes,
  -- Transform the message by adding a suffix
  CONCAT(message, ' [PROCESSED]') as message,
  -- Increase severity by 1
  severity + 1 as severity,
  -- Store the original message in uppercase as a computed field
  UPPER(message) as original_uppercase,
  -- Store the original severity scaled by 100 as a computed field
  severity * 100 as original_severity_scaled
FROM logs

Input:

{ "id": "evt1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 9, "message": "user authenticated", "tags": {"service": ["auth"]}, "attributes": {} }

Output:

{ "id": "evt1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 10, "message": "user authenticated [PROCESSED]", "tags": {"service": ["auth"]}, "attributes": { "original_uppercase": "USER AUTHENTICATED", "original_severity_scaled": 900 } }

Examples from production

Security log processing

SELECT
  *,
  CASE
    WHEN message LIKE '%authentication failed%' THEN 'auth_failure'
    WHEN message LIKE '%unauthorized access%' THEN 'unauthorized'
    WHEN message LIKE '%suspicious activity%' THEN 'suspicious'
    ELSE 'normal'
  END as security_category,
  REGEXP_REPLACE(message, 'user=[^\\s]+', 'user=REDACTED') as sanitized_message
FROM logs
WHERE severity >= 17 -- ERROR level and above

Input:

{ "id": "sec1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 17, "message": "authentication failed for user=johndoe from IP 192.168.1.100", "tags": {"service": ["auth"], "env": ["prod"]}, "attributes": { "ip": "192.168.1.100", "attempt": 3 } }

Output:

{ "id": "sec1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 17, "message": "authentication failed for user=johndoe from IP 192.168.1.100", "tags": {"service": ["auth"], "env": ["prod"]}, "attributes": { "ip": "192.168.1.100", "attempt": 3, "security_category": "auth_failure", "sanitized_message": "authentication failed for user=REDACTED from IP 192.168.1.100" } }

Application performance monitoring

SELECT
  *,
  CAST(REGEXP_EXTRACT(message, 'response_time=(\\d+)ms', 1) AS BIGINT) as response_time_ms,
  CASE
    WHEN message LIKE '%response_time=%ms'
      AND CAST(REGEXP_EXTRACT(message, 'response_time=(\\d+)ms', 1) AS BIGINT) > 1000 THEN 'slow'
    ELSE 'normal'
  END as performance_category
FROM logs
WHERE message LIKE '%response_time=%'

Input:

{ "id": "perf1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 9, "message": "API request completed with response_time=1250ms for endpoint /api/users", "tags": {"service": ["api"], "endpoint": ["/api/users"]}, "attributes": { "method": "GET", "status_code": 200, "user_id": "12345" } }

Output:

{ "id": "perf1", "eventtimestamp": 1724214074062, "receivedtimestamp": 1724214074188, "severity": 9, "message": "API request completed with response_time=1250ms for endpoint /api/users", "tags": {"service": ["api"], "endpoint": ["/api/users"]}, "attributes": { "method": "GET", "status_code": 200, "user_id": "12345", "response_time_ms": 1250, "performance_category": "slow" } }