SQL transform examples
Basic usage
Identity query
SELECT * FROM logs
Return all log events unchanged.
Input:
{
"id": "0H19GZK97FTKS",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 9,
"message": "State backend is set to heap memory",
"tags": {
"service": ["grepr-query"],
"host": ["ip-10-12-4-129.ec2.internal"]
},
"attributes": {
"process": {
"thread": {
"name": "thread-0"
}
}
}
}
Output:
{
"id": "0H19GZK97FTKS",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 9,
"message": "State backend is set to heap memory",
"tags": {
"service": ["grepr-query"],
"host": ["ip-10-12-4-129.ec2.internal"]
},
"attributes": {
"process": {
"thread": {
"name": "thread-0"
}
}
}
}
Simple filtering
SELECT * FROM logs WHERE severity < 5
Filter events to include only those with severity less than 5 (TRACE level).
Input:
[
{
"id": "evt1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 1,
"message": "Debug trace message",
"tags": {"service": ["api"]},
"attributes": {}
},
{
"id": "evt2",
"eventtimestamp": 1724214074063,
"receivedtimestamp": 1724214074189,
"severity": 9,
"message": "Info level message",
"tags": {"service": ["api"]},
"attributes": {}
}
]
Output:
[
{
"id": "evt1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 1,
"message": "Debug trace message",
"tags": {"service": ["api"]},
"attributes": {}
}
]
Message transformation
SELECT UPPER(message) as message, * FROM logs
Convert all log messages to uppercase.
When transforming existing fields to override the original value, the transformed column must come before the * wildcard in your SELECT clause. For example: SELECT UPPER(message) as message, * FROM logs
Adding computed fields
SELECT *,
UPPER(message) as upper_message,
CHAR_LENGTH(message) as message_length,
severity * 10 as severity_scaled
FROM logs
Add three new computed fields to the event attributes.
Input:
{
"id": "evt1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 9,
"message": "Processing request",
"tags": {"service": ["api"]},
"attributes": {
"userId": "12345"
}
}
Output:
{
"id": "evt1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 9,
"message": "Processing request",
"tags": {"service": ["api"]},
"attributes": {
"userId": "12345",
"upper_message": "PROCESSING REQUEST",
"message_length": 18,
"severity_scaled": 90
}
}
Multi-statement examples
Basic multi-statement operation
The following example shows how to create a multi-statement operation that filters and transforms data. This operation:
- Creates a `VIEW` that selects logs with severity >= 13.
- Produces an `OUTPUT` that transforms the filtered logs by uppercasing messages and adding a processing status.
{
"type": "sql-operation",
"inputs": {
"logs": "LOG_EVENT"
},
"statements": [
{
"type": "sql_view",
"tableName": "high_severity_logs",
"sqlQuery": "SELECT * FROM logs WHERE severity >= 13"
},
{
"type": "sql_output",
"outputName": "processed_logs",
"outputType": "LOG_EVENT",
"sqlQuery": "SELECT UPPER(message) as message, *, 'processed' as processing_status FROM high_severity_logs"
}
],
"watermarkDelay": "PT5S",
"globalStateTtl": "PT2M"
}
Complex multi-statement pipeline
This operation:
- Filters logs that have user IDs and extracts user and request IDs.
- Creates a summary of user activity counts.
- Enriches original logs with user activity summaries.
- Creates synthetic log events summarizing user activity.
{
"type": "sql-operation",
"inputs": {
"logs": "LOG_EVENT"
},
"statements": [
{
"type": "sql_view",
"tableName": "parsed_logs",
"sqlQuery": "SELECT *, JSON_VALUE(attributes, '$.userId') as user_id, JSON_VALUE(attributes, '$.requestId') as request_id FROM logs WHERE JSON_EXISTS(attributes, '$.userId')"
},
{
"type": "sql_view",
"tableName": "user_activity_summary",
"sqlQuery": "SELECT user_id, COUNT(*) as activity_count, MAX(eventtimestamp) as last_activity FROM parsed_logs GROUP BY user_id"
},
{
"type": "sql_output",
"outputName": "enriched_logs",
"outputType": "LOG_EVENT",
"sqlQuery": "SELECT p.*, s.activity_count, s.last_activity FROM parsed_logs p LEFT JOIN user_activity_summary s ON p.user_id = s.user_id"
},
{
"type": "sql_output",
"outputName": "user_summaries",
"outputType": "LOG_EVENT",
"sqlQuery": "SELECT user_id as id, last_activity as eventtimestamp, CURRENT_TIMESTAMP as receivedtimestamp, CONCAT('User ', user_id, ' had ', CAST(activity_count AS STRING), ' activities') as message, 9 as severity, MAP['summary_type', ARRAY['user_activity']] as tags, JSON_OBJECT('activity_count' VALUE activity_count) as attributes FROM user_activity_summary"
}
]
}
Advanced examples
Content-based filtering and enrichment
SELECT *, -- Select all original fields
-- Add a category based on message content
CASE
WHEN message LIKE '%ERROR%' THEN 'error_detected' -- If message contains "ERROR"
WHEN message LIKE '%WARN%' THEN 'warning_detected' -- If message contains "WARN"
ELSE 'normal' -- For all other messages
END as log_category,
-- Replace any IP-like address with placeholder text
REGEXP_REPLACE(message, '\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}', '<redacted>') as sanitized_message
FROM logs
WHERE severity >= 13 -- Only process WARN level and above (13-24)
Input:
[
{
"id": "evt1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 17,
"message": "ERROR: Database connection to IP 192.1.23.32 failed",
"tags": {"service": ["database"]},
"attributes": {}
},
{
"id": "evt2",
"eventtimestamp": 1724214074063,
"receivedtimestamp": 1724214074189,
"severity": 9,
"message": "INFO: Processing complete",
"tags": {"service": ["api"]},
"attributes": {}
}
]
Output:
[
{
"id": "evt1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 17,
"message": "ERROR: Database connection to IP 192.1.23.32 failed",
"tags": {"service": ["database"]},
"attributes": {
"log_category": "error_detected",
"sanitized_message": "ERROR: Database connection to IP <redacted> failed"
}
}
]
JSON attribute processing
SELECT *, -- Select all original fields
-- Extract thread name from nested JSON structure
JSON_VALUE(attributes, '$.process.thread.name') as thread_name,
-- Extract status field from JSON attributes
JSON_VALUE(attributes, '$.status') as status_code
FROM logs
-- Only process logs that have a 'process' field in their JSON attributes
WHERE JSON_EXISTS(attributes, '$.process')
Input:
[
{
"id": "evt1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 9,
"message": "Request processed",
"tags": {"service": ["api"]},
"attributes": {
"process": {
"thread": {
"name": "worker-1"
}
},
"status": "success"
}
},
{
"id": "evt2",
"eventtimestamp": 1724214074063,
"receivedtimestamp": 1724214074189,
"severity": 9,
"message": "Simple log message",
"tags": {"service": ["web"]},
"attributes": {
"simple": true
}
}
]
Output:
[
{
"id": "evt1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 9,
"message": "Request processed",
"tags": {"service": ["api"]},
"attributes": {
"process": {"thread": {"name": "worker-1"}},
"status": "success",
"thread_name": "worker-1",
"status_code": "success"
}
}
]
Time-based analysis
SELECT *, -- Select all original fields
-- Extract the hour (0-23) from the event timestamp
EXTRACT(HOUR FROM eventtimestamp) as event_hour,
-- Extract the day of the month (1-31) from the event timestamp
EXTRACT(DAY FROM eventtimestamp) as event_day
FROM logs
Input:
{
"id": "evt1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 9,
"message": "Processing request",
"tags": {"service": ["api"]},
"attributes": {}
}
Output:
{
"id": "evt1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 9,
"message": "Processing request",
"tags": {"service": ["api"]},
"attributes": {
"event_hour": 4,
"event_day": 21
}
}
Multi-field transformations
SELECT
-- Explicitly select these core fields
id,
eventtimestamp,
receivedtimestamp,
tags,
attributes,
-- Transform the message by adding a suffix
CONCAT(message, ' [PROCESSED]') as message,
-- Increase severity by 1 (e.g., 9 becomes 10; both are still INFO level, since WARN starts at 13)
severity + 1 as severity,
-- Store the original message in uppercase as a computed field
UPPER(message) as original_uppercase,
-- Store the original severity scaled by 100 as a computed field
severity * 100 as original_severity_scaled
FROM logs
Input:
{
"id": "evt1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 9,
"message": "user authenticated",
"tags": {"service": ["auth"]},
"attributes": {}
}
Output:
{
"id": "evt1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 10,
"message": "user authenticated [PROCESSED]",
"tags": {"service": ["auth"]},
"attributes": {
"original_uppercase": "USER AUTHENTICATED",
"original_severity_scaled": 900
}
}
Examples from production
Security log processing
SELECT *,
CASE
WHEN message LIKE '%authentication failed%' THEN 'auth_failure'
WHEN message LIKE '%unauthorized access%' THEN 'unauthorized'
WHEN message LIKE '%suspicious activity%' THEN 'suspicious'
ELSE 'normal'
END as security_category,
REGEXP_REPLACE(message, 'user=[^\\\\s]+', 'user=REDACTED') as sanitized_message
FROM logs
WHERE severity >= 17 -- ERROR level and above
Input:
{
"id": "sec1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 17,
"message": "authentication failed for user=johndoe from IP 192.168.1.100",
"tags": {"service": ["auth"], "env": ["prod"]},
"attributes": {
"ip": "192.168.1.100",
"attempt": 3
}
}
Output:
{
"id": "sec1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 17,
"message": "authentication failed for user=johndoe from IP 192.168.1.100",
"tags": {"service": ["auth"], "env": ["prod"]},
"attributes": {
"ip": "192.168.1.100",
"attempt": 3,
"security_category": "auth_failure",
"sanitized_message": "authentication failed for user=REDACTED from IP 192.168.1.100"
}
}
Application performance monitoring
SELECT *,
CAST(REGEXP_EXTRACT(message, 'response_time=(\\\\d+)ms', 1) AS BIGINT) as response_time_ms,
CASE
WHEN message LIKE '%response_time=%ms%' AND CAST(REGEXP_EXTRACT(message, 'response_time=(\\\\d+)ms', 1) AS BIGINT) > 1000 THEN 'slow'
ELSE 'normal'
END as performance_category
FROM logs
WHERE message LIKE '%response_time=%'
Input:
{
"id": "perf1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 9,
"message": "API request completed with response_time=1250ms for endpoint /api/users",
"tags": {"service": ["api"], "endpoint": ["/api/users"]},
"attributes": {
"method": "GET",
"status_code": 200,
"user_id": "12345"
}
}
Output:
{
"id": "perf1",
"eventtimestamp": 1724214074062,
"receivedtimestamp": 1724214074188,
"severity": 9,
"message": "API request completed with response_time=1250ms for endpoint /api/users",
"tags": {"service": ["api"], "endpoint": ["/api/users"]},
"attributes": {
"method": "GET",
"status_code": 200,
"user_id": "12345",
"response_time_ms": 1250,
"performance_category": "slow"
}
}
Last updated on