SQL Transform

SQL Transform

The SQL Transform operator allows you to transform log events using SQL queries. This operator provides a powerful and flexible way to process, filter, and enrich log data using the familiar SQL syntax.

The SQL Transform uses ANSI SQL dialect with the full capabilities of Apache Flink SQL 1.19, providing access to a comprehensive set of built-in functions and operators for log processing. This document describes a subset of the available capabilities. For full details, please see the Flink SQL documentation (opens in a new tab).

(Jump to API spec)

Overview

The SQL Transform operator:

  • Processes LogEvent objects through SQL queries
  • Uses the input table alias logs to reference incoming log data
  • Automatically preserves omitted LogEvent fields from input
  • Adds computed columns to the event attributes
  • Supports complex transformations, filtering, and enrichment

Input Table Schema

When using SQL Transform, your input log events are available through a table called logs with the following schema:

ColumnTypeDescription
idSTRINGGlobally unique identifier for the log event
eventtimestampTIMESTAMP_LTZ(3)Milliseconds since the Unix epoch when the event occurred (parsed from the event)
receivedtimestampTIMESTAMP_LTZ(3)Milliseconds since the Unix epoch when Grepr received the event
messageSTRINGThe log message content
severityINTLog severity level (1-24, following OpenTelemetry convention)
tagsMAP<STRING, ARRAY<STRING>>Key-value pairs for filtering and routing
attributesSTRINGJSON string containing structured data associated with the event

Important Notes:

  • To access service or host information, query the tags map: tags['service'] or tags['host']
  • When transforming fields like message or severity, the transformed column must appear before the * wildcard in your SELECT clause to take precedence

Basic Usage

Identity Query

SELECT * FROM logs

Returns all log events unchanged.

View Example Input/Output

Input:

{
  "id": "0H19GZK97FTKS",
  "eventtimestamp": 1724214074062,
  "receivedtimestamp": 1724214074188,
  "severity": 9,
  "message": "State backend is set to heap memory",
  "tags": {
    "service": ["grepr-query"],
    "host": ["ip-10-12-4-129.ec2.internal"]
  },
  "attributes": {
    "process": {
      "thread": {
        "name": "thread-0"
      }
    }
  }
}

Output:

{
  "id": "0H19GZK97FTKS",
  "eventtimestamp": 1724214074062,
  "receivedtimestamp": 1724214074188,
  "severity": 9,
  "message": "State backend is set to heap memory",
  "tags": {
    "service": ["grepr-query"],
    "host": ["ip-10-12-4-129.ec2.internal"]
  },
  "attributes": {
    "process": {
      "thread": {
        "name": "thread-0"
      }
    }
  }
}

Simple Filtering

SELECT * FROM logs WHERE severity < 5

Filters events to only include those with severity less than 5 (TRACE level).

View Example Input/Output

Input:

[
  {
    "id": "evt1",
    "eventtimestamp": 1724214074062,
    "receivedtimestamp": 1724214074188,
    "severity": 1,
    "message": "Debug trace message",
    "tags": {"service": ["api"]},
    "attributes": {}
  },
  {
    "id": "evt2",
    "eventtimestamp": 1724214074063,
    "receivedtimestamp": 1724214074189,
    "severity": 9,
    "message": "Info level message",
    "tags": {"service": ["api"]},
    "attributes": {}
  }
]

Output:

[
  {
    "id": "evt1",
    "eventtimestamp": 1724214074062,
    "receivedtimestamp": 1724214074188,
    "severity": 1,
    "message": "Debug trace message",
    "tags": {"service": ["api"]},
    "attributes": {}
  }
]

Note: The INFO level message (severity 9) was filtered out.

Message Transformation

SELECT UPPER(message) as message, * FROM logs

Converts all log messages to uppercase.

Important: When transforming existing fields, the transformed column must come before the * wildcard to override the original value.

Adding Computed Fields

SELECT *,
  UPPER(message) as upper_message,
  CHAR_LENGTH(message) as message_length,
  severity * 10 as severity_scaled
FROM logs

Adds three new computed fields to the event attributes.

View Example Input/Output

Input:

{
  "id": "evt1",
  "eventtimestamp": 1724214074062,
  "receivedtimestamp": 1724214074188,
  "severity": 9,
  "message": "Processing request",
  "tags": {"service": ["api"]},
  "attributes": {
    "userId": "12345"
  }
}

Output:

{
  "id": "evt1",
  "eventtimestamp": 1724214074062,
  "receivedtimestamp": 1724214074188,
  "severity": 9,
  "message": "Processing request",
  "tags": {"service": ["api"]},
  "attributes": {
    "userId": "12345",
    "upper_message": "PROCESSING REQUEST",
    "message_length": 18,
    "severity_scaled": 90
  }
}

Note: Computed fields are added to the attributes object.

Custom LogEvent Construction

Grepr automatically adds default values for fields that are not specified. The defaults are:

  • id: when not specified or NULL, defaults to a new random UUID.
  • eventtimestamp: when not specified or NULL, defaults to the current timestamp in milliseconds.
  • receivedtimestamp: when not specified or NULL, defaults to the current timestamp in milliseconds.
  • severity: when not specified or NULL, defaults to 9 (INFO level).
  • message: when not specified or NULL, defaults to an empty string.
  • tags: when not specified or NULL, defaults to an empty map.
  • attributes: when not specified or NULL, defaults to an empty JSON object.
-- Explicitly construct a new LogEvent with specific fields.
SELECT 
  UPPER(message) as message,
  CASE WHEN severity > 16 THEN 21 ELSE severity END as severity,
  MAP['transformed', ARRAY['true']] as tags,
  JSON_OBJECT('original_message' VALUE message) as attributes
FROM logs
View Example Input/Output

Input:

{
  "id": "evt1",
  "eventtimestamp": 1724214074062,
  "receivedtimestamp": 1724214074188,
  "severity": 17,
  "message": "user login successful",
  "tags": {"service": ["auth"]},
  "attributes": {}
}

Output:

{
  "id": "12345678-1234-5678-1234-567812345678", // New UUID generated
  "eventtimestamp": 174000000000,               // Current timestamp in milliseconds
  "receivedtimestamp": 174000000002,            // Current timestamp in milliseconds
  "severity": 21,                               // Adjusted severity
  "message": "USER LOGIN SUCCESSFUL",           // Transformed message
  "tags": {"transformed": ["true"]},            // New tags
  "attributes": {                               // New attributes
    "original_message": "user login successful"
  }
}

Advanced Examples

Content-Based Filtering and Enrichment

SELECT *,  -- Select all original fields
  -- Add a category based on message content
  CASE 
    WHEN message LIKE '%ERROR%' THEN 'error_detected'  -- If message contains "ERROR"
    WHEN message LIKE '%WARN%' THEN 'warning_detected'  -- If message contains "WARN"
    ELSE 'normal'  -- For all other messages
  END as log_category,
  -- Replace any IP-like address with placeholder text
  REGEXP_REPLACE(message, '\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', '<redacted>') as sanitized_message
FROM logs
WHERE severity >= 13  -- Only process WARN level and above (13-24)
View Example Input/Output

Input:

[
  {
    "id": "evt1",
    "eventtimestamp": 1724214074062,
    "receivedtimestamp": 1724214074188,
    "severity": 17,
    "message": "ERROR: Database connection to ip 192.1.23.32 failed",
    "tags": {"service": ["database"]},
    "attributes": {}
  },
  {
    "id": "evt2",
    "eventtimestamp": 1724214074063,
    "receivedtimestamp": 1724214074189,
    "severity": 9,
    "message": "INFO: Processing complete",
    "tags": {"service": ["api"]},
    "attributes": {}
  }
]

Output:

[
  {
    "id": "evt1",
    "eventtimestamp": 1724214074062,
    "receivedtimestamp": 1724214074188,
    "severity": 17,
    "message": "ERROR: Database connection to ip 192.1.23.32 failed",
    "tags": {"service": ["database"]},
    "attributes": {
      "log_category": "error_detected",
      "sanitized_message": "ERROR: Database connection to ip redacted failed",
    }
  }
]

Note: INFO level message was filtered out due to severity < 13.

JSON Attribute Processing

SELECT *,  -- Select all original fields
  -- Extract thread name from nested JSON structure
  JSON_VALUE(attributes, '$.process.thread.name') as thread_name,
  -- Extract status field from JSON attributes
  JSON_VALUE(attributes, '$.status') as status_code
FROM logs
-- Only process logs that have a 'process' field in their JSON attributes
WHERE JSON_EXISTS(attributes, '$.process')
View Example Input/Output

Input:

[
  {
    "id": "evt1",
    "eventtimestamp": 1724214074062,
    "receivedtimestamp": 1724214074188,
    "severity": 9,
    "message": "Request processed",
    "tags": {"service": ["api"]},
    "attributes": {
    "process": {
      "thread": {
        "name": "worker-1"
      }
    },
    "status": "success"
  },
  {
    "id": "evt2",
    "eventtimestamp": 1724214074063,
    "receivedtimestamp": 1724214074189,
    "severity": 9,
    "message": "Simple log message",
    "tags": {"service": ["web"]},
    "attributes": {
      "simple": true
    }
  }
]

Output:

[
  {
    "id": "evt1",
    "eventtimestamp": 1724214074062,
    "receivedtimestamp": 1724214074188,
    "severity": 9,
    "message": "Request processed",
    "tags": {"service": ["api"]},
    "attributes": {
      "process": {"thread": {"name": "worker-1"}},
      "status": "success",
      "thread_name": "worker-1",
      "status_code": "success"
    }
  }
]

Note: Event without 'process' field was filtered out.

Time-Based Analysis

SELECT *,  -- Select all original fields
  -- Extract the hour (0-23) from the event timestamp
  EXTRACT(HOUR FROM eventtimestamp) as event_hour,
  -- Extract the day of the month (1-31) from the event timestamp  
  EXTRACT(DAY FROM eventtimestamp) as event_day
FROM logs
View Example Input/Output

Input:

{
  "id": "evt1",
  "eventtimestamp": 1724214074062,
  "receivedtimestamp": 1724214074188,
  "severity": 9,
  "message": "Processing request",
  "tags": {"service": ["api"]},
  "attributes": {}
}

Output:

{
  "id": "evt1",
  "eventtimestamp": 1724214074062,
  "receivedtimestamp": 1724214074188,
  "severity": 9,
  "message": "Processing request",
  "tags": {"service": ["api"]},
  "attributes": {
    "event_hour": 4,
    "event_day": 21
  }
}

Note: Assumes event occurred on August 21, 2024 at 04:21:14 UTC.

Function Compatibility

The SQL Transform uses Apache Flink SQL 1.19 with ANSI SQL dialect. Here are important function compatibility notes:

Timestamp Functions

  • Use: TO_TIMESTAMP_LTZ(eventtimestamp, 3) to convert millisecond timestamps to TIMESTAMP
  • Don't use: TIMESTAMP_MILLIS() (not available in Flink SQL 1.19 - use TO_TIMESTAMP_LTZ() instead)

String Functions

  • Available: REGEXP_REPLACE(), UPPER(), LOWER(), CONCAT(), CHAR_LENGTH(), LEFT(), RIGHT(), etc.
  • Pattern: Most standard SQL string functions are supported
  • Note: SPLIT_INDEX() is not available in Flink SQL 1.19

Date/Time Functions

  • Available: EXTRACT(), CURRENT_TIMESTAMP, DATE_FORMAT(), etc.
  • Note: Always convert millisecond timestamps first using TO_TIMESTAMP_LTZ()

JSON Functions

  • Available: JSON_VALUE(), JSON_EXISTS() for extracting data from JSON strings
  • Note: The attributes field is a JSON string that can be processed with these functions

For a complete list of available functions, see the Flink SQL documentation (opens in a new tab).

Multi-Field Transformations

SELECT 
  -- Explicitly select these core fields
  id,
  eventtimestamp,
  receivedtimestamp,
  tags,
  attributes,
  -- Transform the message by adding a suffix
  CONCAT(message, ' [PROCESSED]') as message,
  -- Increase severity by 1 (e.g., INFO becomes WARN)
  severity + 1 as severity,
  -- Store the original message in uppercase as a computed field
  UPPER(message) as original_uppercase,
  -- Store the original severity scaled by 100 as a computed field
  severity * 100 as original_severity_scaled
FROM logs
View Example Input/Output

Input:

{
  "id": "evt1",
  "eventtimestamp": 1724214074062,
  "receivedtimestamp": 1724214074188,
  "severity": 9,
  "message": "user authenticated",
  "tags": {"service": ["auth"]},
  "attributes": {}
}

Output:

{
  "id": "evt1",
  "eventtimestamp": 1724214074062,
  "receivedtimestamp": 1724214074188,
  "severity": 10,
  "message": "user authenticated [PROCESSED]",
  "tags": {"service": ["auth"]},
  "attributes": {
    "original_uppercase": "USER AUTHENTICATED",
    "original_severity_scaled": 900
  }
}

Note: Core fields (message, severity) are modified, computed fields added to attributes.

SQL Operators

The SQL Transform supports all standard SQL operators available in Flink SQL 1.19:

Comparison Operators

  • = - Equal to
  • <> or != - Not equal to
  • < - Less than
  • <= - Less than or equal to
  • > - Greater than
  • >= - Greater than or equal to
  • BETWEEN value1 AND value2 - Range comparison
  • NOT BETWEEN value1 AND value2 - Negated range comparison

Examples:

SELECT * FROM logs WHERE severity = 17
SELECT * FROM logs WHERE severity BETWEEN 13 AND 20
SELECT * FROM logs WHERE eventtimestamp > 1724214074000

Logical Operators

  • AND - Logical AND
  • OR - Logical OR
  • NOT - Logical NOT

Examples:

SELECT * FROM logs WHERE severity > 16 AND message LIKE '%error%'
SELECT * FROM logs WHERE severity = 17 OR severity = 21
SELECT * FROM logs WHERE NOT (severity < 5)

Pattern Matching Operators

  • LIKE 'pattern' - SQL pattern matching with % (any chars) and _ (single char)
  • NOT LIKE 'pattern' - Negated pattern matching
  • SIMILAR TO 'regex' - Regular expression matching
  • NOT SIMILAR TO 'regex' - Negated regex matching

Examples:

SELECT * FROM logs WHERE message LIKE '%failed%'
SELECT * FROM logs WHERE message LIKE 'Error: ___'  -- exactly 3 chars after "Error: "
SELECT * FROM logs WHERE message SIMILAR TO '^[A-Z]+:'  -- starts with uppercase letters and colon

NULL Handling Operators

  • IS NULL - Check for NULL values
  • IS NOT NULL - Check for non-NULL values

Examples:

SELECT * FROM logs WHERE attributes IS NOT NULL
SELECT * FROM logs WHERE JSON_VALUE(attributes, '$.userId') IS NULL

Arithmetic Operators

  • + - Addition
  • - - Subtraction
  • * - Multiplication
  • / - Division
  • % or MOD() - Modulo

Examples:

SELECT *, severity * 10 as severity_scaled FROM logs

Available SQL Functions

The SQL Transform supports the comprehensive function library of Flink SQL. Below is a sample of functions. Please refer to the Flink docs for a full listing and additional details:

String Functions

Case Manipulation

  • UPPER(string) - Converts string to uppercase

    SELECT UPPER(message) as upper_message FROM logs
    -- Input: "Error occurred" → Output: "ERROR OCCURRED"
  • LOWER(string) - Converts string to lowercase

    SELECT LOWER(message) as lower_message FROM logs
    -- Input: "Error OCCURRED" → Output: "error occurred"
  • INITCAP(string) - Capitalizes first letter of each word

    SELECT INITCAP(message) as title_case FROM logs
    -- Input: "user login failed" → Output: "User Login Failed"

String Extraction and Manipulation

  • SUBSTR(string, start [, length]) - Extracts substring

    SELECT SUBSTR(message, 1, 10) as msg_prefix FROM logs
    SELECT SUBSTR(message, 5) as msg_suffix FROM logs  -- from position 5 to end
  • CONCAT(string1, string2, ...) - Concatenates strings

    SELECT CONCAT('[', severity, '] ', message) as formatted_message FROM logs
  • CONCAT_WS(separator, string1, string2, ...) - Concatenates with separator

    SELECT CONCAT_WS(' | ', tags['host'][1], tags['service'][1], message) as combined FROM logs
  • TRIM([BOTH | LEADING | TRAILING] [characters] FROM string) - Removes specified characters

    SELECT TRIM(message) as clean_message FROM logs  -- removes whitespace
    SELECT TRIM(LEADING '0' FROM id) as clean_id FROM logs  -- removes leading zeros
  • REPLACE(string, search, replacement) - Replaces all occurrences

    SELECT REPLACE(message, 'ERROR', 'ALERT') as modified_message FROM logs
  • REGEXP_REPLACE(string, pattern, replacement) - Regex-based replacement

    SELECT REGEXP_REPLACE(message, '\\d{4}-\\d{2}-\\d{2}', 'YYYY-MM-DD') as sanitized FROM logs
    SELECT REGEXP_REPLACE(message, 'user=[^\\s]+', 'user=REDACTED') as redacted FROM logs

String Information Functions

  • CHAR_LENGTH(string) or LENGTH(string) - Returns character count

    SELECT CHAR_LENGTH(message) as msg_length FROM logs
    SELECT LENGTH(message) as msg_length FROM logs  -- Alternative syntax
  • POSITION(substring IN string) - Finds substring position (1-based)

    SELECT POSITION('error' IN LOWER(message)) as error_pos FROM logs
  • ASCII(string) - Returns ASCII value of first character

    SELECT ASCII(LEFT(message, 1)) as first_char_code FROM logs

Advanced String Functions

  • LPAD(string, length, pad_string) - Left-pads string

    SELECT LPAD(CAST(severity AS STRING), 3, '0') as padded_severity FROM logs
    -- Input: "9" → Output: "009"
  • RPAD(string, length, pad_string) - Right-pads string

    SELECT RPAD(tags['service'][1], 10, '-') as padded_service FROM logs
  • LEFT(string, length) - Returns leftmost characters

    SELECT LEFT(message, 10) as msg_prefix FROM logs
  • RIGHT(string, length) - Returns rightmost characters

    SELECT RIGHT(message, 10) as msg_suffix FROM logs
  • REGEXP_EXTRACT(string, pattern [, group]) - Extracts regex match

    SELECT REGEXP_EXTRACT(message, 'response_time=(\\d+)ms', 1) as response_time FROM logs
    SELECT REGEXP_EXTRACT(message, 'user=([^\\s]+)', 1) as username FROM logs

Numeric Functions

Basic Mathematical Functions

  • ABS(numeric) - Returns absolute value

    SELECT ABS(severity - 12) as severity_diff FROM logs
  • MOD(numeric1, numeric2) - Returns remainder

    SELECT MOD(severity, 4) as severity_mod FROM logs
  • POWER(base, exponent) - Returns base raised to exponent

    SELECT POWER(severity, 2) as severity_squared FROM logs
  • SQRT(numeric) - Returns square root

    SELECT SQRT(severity) as severity_sqrt FROM logs
  • SIGN(numeric) - Returns sign (-1, 0, 1)

    SELECT SIGN(receivedtimestamp - eventtimestamp) as delay_sign FROM logs

Rounding Functions

  • ROUND(numeric [, precision]) - Rounds to specified decimal places

    SELECT ROUND(severity / 3.0, 2) as severity_ratio FROM logs
  • FLOOR(numeric) - Rounds down to nearest integer

    SELECT FLOOR(severity / 4.0) as severity_group FROM logs
  • CEIL(numeric) or CEILING(numeric) - Rounds up to nearest integer

    SELECT CEIL(severity / 4.0) as severity_group_up FROM logs
    SELECT CEILING(severity / 4.0) as severity_group_up FROM logs  -- Alternative syntax

Logarithmic and Exponential Functions

  • LN(numeric) or LOG(numeric) - Natural logarithm
  • LOG10(numeric) - Base-10 logarithm
  • LOG2(numeric) - Base-2 logarithm
  • EXP(numeric) - Exponential function (e^x)
  • LOG(base, numeric) - Logarithm with specified base

Trigonometric Functions

  • SIN(numeric), COS(numeric), TAN(numeric) - Trigonometric functions
  • ASIN(numeric), ACOS(numeric), ATAN(numeric) - Inverse trigonometric functions
  • DEGREES(radians), RADIANS(degrees) - Angle conversion

Date/Time Functions

Current Time Functions

  • CURRENT_TIMESTAMP - Returns current timestamp in local timezone
  • LOCALTIME - Returns current SQL time
  • CURRENT_DATE - Returns current SQL date
  • UNIX_TIMESTAMP() - Returns current Unix timestamp in seconds

Time Extraction

  • EXTRACT(unit FROM temporal) - Extracts time component
    SELECT EXTRACT(HOUR FROM eventtimestamp) as event_hour FROM logs
    SELECT EXTRACT(DAY FROM eventtimestamp) as event_day FROM logs
    SELECT EXTRACT(YEAR FROM eventtimestamp) as event_year FROM logs
    Available units: MILLENNIUM, CENTURY, DECADE, YEAR, QUARTER, MONTH, WEEK, DAY, HOUR, MINUTE, SECOND

Time Conversion Functions

  • TO_TIMESTAMP_LTZ(numeric, precision) - Converts epoch timestamp to TIMESTAMP_LTZ

    SELECT TO_TIMESTAMP_LTZ(JSON_VALUE(attributes, '$.timestamp' RETURNING BIGINT), 3) as event_time FROM logs  -- 3 for milliseconds
  • UNIX_TIMESTAMP(timestamp) - Converts timestamp to Unix timestamp (seconds)

    SELECT UNIX_TIMESTAMP(eventtimestamp) as unix_time FROM logs
  • TO_TIMESTAMP(string [, format]) - Parses string to timestamp

    SELECT TO_TIMESTAMP(JSON_VALUE(attributes, '$.timestamp'), 'yyyy-MM-dd HH:mm:ss') as parsed_time FROM logs

Conditional Functions

CASE Expressions

  • CASE WHEN condition THEN result [WHEN condition THEN result] ELSE result END

    SELECT *,
      CASE
        WHEN severity <= 4 THEN 'TRACE'
        WHEN severity <= 8 THEN 'DEBUG'
        WHEN severity <= 12 THEN 'INFO'
        WHEN severity <= 16 THEN 'WARN'
        WHEN severity <= 20 THEN 'ERROR'
        ELSE 'FATAL'
      END as severity_level
    FROM logs
  • CASE value WHEN value1 THEN result1 WHEN value2 THEN result2 ELSE default END

    SELECT *,
      CASE severity
        WHEN 1 THEN 'TRACE'
        WHEN 9 THEN 'INFO'
        WHEN 17 THEN 'ERROR'
        ELSE 'OTHER'
      END as severity_name
    FROM logs

NULL Handling Functions

  • COALESCE(value1, value2, ...) - Returns first non-NULL value

    SELECT COALESCE(JSON_VALUE(attributes, '$.userId'), 'unknown') as user_id FROM logs
  • NULLIF(value1, value2) - Returns NULL if values are equal

    SELECT NULLIF(severity, 0) as non_zero_severity FROM logs
  • IF(condition, true_value, false_value) - Simple conditional

    SELECT IF(severity > 16, 'HIGH', 'NORMAL') as priority FROM logs

Comparison Functions

  • GREATEST(value1, value2, ...) - Returns maximum value

    SELECT GREATEST(severity, 10) as min_severity FROM logs
  • LEAST(value1, value2, ...) - Returns minimum value

    SELECT LEAST(severity, 20) as max_severity FROM logs

JSON Functions

JSON Validation

  • string IS JSON [VALUE | SCALAR | ARRAY | OBJECT] - Validates JSON format
    SELECT * FROM logs WHERE attributes IS JSON OBJECT

JSON Path Functions

  • JSON_EXISTS(json_string, path) - Checks if JSON path exists

    SELECT * FROM logs WHERE JSON_EXISTS(attributes, '$.process.thread')
  • JSON_VALUE(json_string, path [RETURNING type] [ON ERROR | ON EMPTY]) - Extracts scalar value

    SELECT JSON_VALUE(attributes, '$.userId' RETURNING STRING) as user_id FROM logs
    SELECT JSON_VALUE(attributes, '$.responseTime' RETURNING INTEGER) as response_time FROM logs
    SELECT JSON_VALUE(attributes, '$.isSuccess' RETURNING BOOLEAN) as success FROM logs
  • JSON_QUERY(json_string, path [RETURNING type] [ON ERROR | ON EMPTY]) - Extracts JSON objects/arrays

    SELECT JSON_QUERY(attributes, '$.process') as process_info FROM logs
    SELECT JSON_QUERY(attributes, '$.tags[*]') as all_tags FROM logs

JSON Construction

  • JSON_OBJECT([KEY key VALUE value] ...) - Creates JSON object

    SELECT JSON_OBJECT('severity' VALUE severity, 'timestamp' VALUE eventtimestamp) as event_summary FROM logs
  • JSON_ARRAY([value1, value2, ...]) - Creates JSON array

    SELECT JSON_ARRAY(severity, EXTRACT(HOUR FROM TO_TIMESTAMP_LTZ(eventtimestamp, 3)), tags['host'][1]) as event_array FROM logs

Hash Functions

  • MD5(string) - Returns MD5 hash

    SELECT MD5(CONCAT(id, message)) as content_hash FROM logs
  • SHA1(string) - Returns SHA-1 hash

  • SHA256(string) - Returns SHA-256 hash

    SELECT SHA256(message) as message_hash FROM logs

Aggregate Functions

Basic Aggregations

  • COUNT([DISTINCT] expression) - Counts rows/distinct values

    SELECT COUNT(*) as total_logs FROM logs
    SELECT COUNT(DISTINCT tags['service'][1]) as unique_services FROM logs
  • SUM([DISTINCT] expression) - Sums numeric values

  • AVG([DISTINCT] expression) - Calculates average

  • MIN(expression) - Finds minimum value

  • MAX(expression) - Finds maximum value

    SELECT tags['service'][1] as service, COUNT(*) as log_count, AVG(severity) as avg_severity FROM logs GROUP BY tags['service'][1]

Statistical Functions

  • STDDEV_POP(expression) - Population standard deviation
  • STDDEV_SAMP(expression) - Sample standard deviation
  • VAR_POP(expression) - Population variance
  • VAR_SAMP(expression) - Sample variance

Collection Functions

  • COLLECT(expression) - Collects values into a multiset
  • LISTAGG(expression [, separator]) - Concatenates values with separator
    SELECT tags['service'][1] as service, LISTAGG(DISTINCT tags['host'][1], ',') as hosts FROM logs GROUP BY tags['service'][1]

Windowing and Aggregations

To aggreagate data in a streaming setup with Flink SQL, you will want to use windowing functions. To learn about this powerful capability, please refer to the Flink documentation here (opens in a new tab).

Window Functions (Streaming)

For streaming data processing, Flink SQL provides windowing table-valued functions (TVFs). These functions create windows over time-based or count-based intervals, allowing you to perform streaming aggregations over subsets of data.

Note when using streaming windowed aggregations along with GROUP BY, you must include window_time as a group-by key.

Tumbling Windows

Fixed-size, non-overlapping windows:

-- ✅ VALID: Get a breakdown of log count per severity per service in 5-minute windows
SELECT
  window_start as eventtimestamp,     -- Window start as event time
  CONCAT('Window summary: ', service, ' had ', CAST(log_count AS STRING), ' logs with severity ',
         CAST(severity AS STRING)) as message,  -- Descriptive message
  severity,
  MAP['service', ARRAY[service], 'window_type', ARRAY['tumbling']] as tags,  -- Service and window type tags
  window_start, -- turns into an attribute
  window_end,   -- turns into an attribute
  log_count    -- turns into an attribute
FROM (
  SELECT
    window_start,                   -- Start time of each 5-minute window
    window_end,                     -- End time of each 5-minute window
    tags['service'][1] as service,  -- Group by service name
    severity,                       -- and severity
    COUNT(*) as log_count           -- Count total logs in each window
  FROM TABLE(
    -- Create 5-minute windows based on event timestamp
    TUMBLE(TABLE logs, DESCRIPTOR(TO_TIMESTAMP_LTZ(eventtimestamp, 3)), INTERVAL '5' MINUTES)
  )
  -- Group results by window and service to get aggregates per service per window
  GROUP BY window_start, window_end, serverity, tags['service'][1]
)

Hopping (Sliding) Windows

Fixed-size windows that can overlap:

-- ✅ VALID: Convert hopping window analysis into system health LogEvents
SELECT
  -- Generate synthetic LogEvent fields for each window health report
  MD5(CONCAT(CAST(window_start AS STRING), 'system_health')) as id,
  UNIX_TIMESTAMP(window_end) * 1000 as eventtimestamp,  -- Window end as event time
  UNIX_TIMESTAMP(CURRENT_TIMESTAMP) * 1000 as receivedtimestamp,
  CONCAT('System health: ', CAST(log_count AS STRING), ' total logs across ',
         CAST(service_count AS STRING), ' services') as message,
  -- Set severity based on activity level
  CASE
    WHEN log_count < 100 THEN 13  -- WARN: Low activity
    WHEN log_count > 10000 THEN 13  -- WARN: High activity
    ELSE 9  -- INFO: Normal activity
  END as severity,
  MAP['analysis_type', ARRAY['system_health'], 'window_type', ARRAY['hopping']] as tags,
  JSON_OBJECT(
    'window_start' VALUE CAST(window_start AS STRING),
    'window_end' VALUE CAST(window_end AS STRING),
    'total_log_count' VALUE log_count,
    'active_service_count' VALUE service_count,
    'activity_level' VALUE CASE
      WHEN log_count < 100 THEN 'low'
      WHEN log_count > 10000 THEN 'high'
      ELSE 'normal'
    END,
    'analysis_type' VALUE 'hopping_window_health'
  ) as attributes,
  'system' as service,  -- Synthetic service for system-wide metrics
  'health-monitor' as host  -- Synthetic host for health monitoring
FROM (
  SELECT
    window_start,  -- Start time of each window
    window_end,    -- End time of each window
    COUNT(*) as log_count,                    -- Total logs in this window
    COUNT(DISTINCT tags['service'][1]) as service_count  -- Number of unique services in this window
  FROM TABLE(
    HOP(
      TABLE logs,
      DESCRIPTOR(TO_TIMESTAMP_LTZ(eventtimestamp, 3)),
      INTERVAL '1' MINUTE,  -- Window slides every 1 minute
      INTERVAL '5' MINUTES  -- Each window covers 5 minutes of data
    )
    -- This creates overlapping 5-minute windows that move forward every minute
  )
  -- Group by window boundaries to get one result per window
  GROUP BY window_start, window_end
)
-- This converts each health summary into a proper LogEvent

Session Windows

Groups events by activity sessions:

-- ✅ VALID: Convert session analysis into session summary LogEvents
SELECT
  -- Generate synthetic LogEvent fields for each session summary
  MD5(CONCAT(CAST(window_start AS STRING), tags['service'][1], 'session')) as id,
  window_end as eventtimestamp,  -- Session end as event time
  UNIX_TIMESTAMP(CURRENT_TIMESTAMP) * 1000 as receivedtimestamp,
  CONCAT('Session completed for ', tags['service'][1], ': ', CAST(session_events AS STRING),
         ' events over ', CAST((window_end - window_start) / 60000 AS STRING), ' minutes') as message,
  -- Set severity based on session length and activity
  CASE
    WHEN session_events < 5 THEN 5      -- DEBUG: Short session
    WHEN session_events > 1000 THEN 13  -- WARN: Very active session
    ELSE 9                               -- INFO: Normal session
  END as severity,
  MAP['service', ARRAY[tags['service'][1]], 'session_type', ARRAY['activity_session']] as tags,
  JSON_OBJECT(
    'session_start' VALUE window_start,
    'session_end' VALUE window_end,
    'session_duration_ms' VALUE (window_end - window_start),
    'session_duration_minutes' VALUE ((window_end - window_start) / 60000),
    'event_count' VALUE session_events,
    'events_per_minute' VALUE ROUND(session_events / ((window_end - window_start) / 60000.0), 2),
    'analysis_type' VALUE 'session_summary'
  ) as attributes,
  service,  -- Required service field
  'session-analyzer' as host  -- Synthetic host for session analysis
FROM (
  SELECT
    window_start,  -- When this session started
    window_end,    -- When this session ended
    tags['service'][1] as service,  -- Service name for this session
    COUNT(*) as session_events  -- Number of events in this session
  FROM TABLE(
    SESSION(
      TABLE logs PARTITION BY tags['service'][1],  -- Create separate sessions for each service
      DESCRIPTOR(TO_TIMESTAMP_LTZ(eventtimestamp, 3)),
      INTERVAL '10' MINUTES  -- If no events for 10 minutes, end the session
    )
    -- Groups consecutive events into sessions based on time gaps
  )
  -- Group by session boundaries and service
  GROUP BY window_start, window_end, tags['service'][1]
)
-- This converts each session summary into a proper LogEvent

Cumulative Windows

Windows that expand incrementally:

SELECT
  window_start,
  window_end,
  service,
  COUNT(*) as cumulative_count
FROM TABLE(
  CUMULATE(
    TABLE logs,
    DESCRIPTOR(TO_TIMESTAMP_LTZ(eventtimestamp, 3)),
    INTERVAL '1' HOUR,   -- step size
    INTERVAL '1' DAY     -- max window size
  )
)
GROUP BY window_start, window_end, tags['service'][1]

Analytical Window Functions

Ranking Functions

  • ROW_NUMBER() OVER (window_spec) - Sequential row numbers
  • RANK() OVER (window_spec) - Ranking with gaps
  • DENSE_RANK() OVER (window_spec) - Ranking without gaps
  • PERCENT_RANK() OVER (window_spec) - Percentage rank
SELECT *,  -- Include all original fields
  -- Assign sequential numbers to events within each service (newest first)
  ROW_NUMBER() OVER (ORDER BY eventtimestamp) as event_sequence
FROM logs

GROUP BY Operations

Basic Grouping

The below example groups messages by service and severity, counting the number of logs in each group for each minute. This is a common operation to summarize log data.

WITH parsed AS (
  SELECT
    tags['service'][1] as service,  -- Extract service name
    severity,                       -- Extract severity level
    eventtimestamp                  -- Use event timestamp for windowing
  FROM logs
),
aggregated AS (
  SELECT
    service,                 -- Service name
    severity,                -- Severity level
    COUNT(*) as evtcount,       -- Count of logs in each group
    MIN(eventtimestamp) as first_event,    -- Earliest event in group
    MAX(eventtimestamp) as last_event      -- Latest event in group
  FROM TABLE(TUMBLE(TABLE parsed, DESCRIPTOR(eventtimestamp), INTERVAL '1' MINUTE))
  GROUP BY window_start, window_end, service, severity  -- Group by service and severity
)
SELECT
  last_event as eventtimestamp,     -- Use latest event time as the summary event time
  CONCAT('Service ', service, ' severity ', CAST(severity AS STRING), ' had ',
         CAST(evtcount AS STRING), ' events') as message,  -- Descriptive summary message
  severity,  -- Use the grouped severity level
  MAP['service', ARRAY[service], 'analysis_type', ARRAY['group_summary']] as tags,
  JSON_OBJECT(
    'event_count' VALUE evtcount,
    'first_event' VALUE first_event,
    'last_event' VALUE last_event,
    'analysis_type' VALUE 'service_severity_summary'
  ) as attributes
FROM aggregated

HAVING Clause

Filters groups after aggregation. In the below example, we create new log messages when the error count exceeds a threshold. This is an example of the complex alerting rules you can create with SQL.

-- ✅ VALID: Convert error count analysis into alert LogEvents
WITH parsed AS (
  SELECT
    tags['service'][1] as service,  -- Extract service name
    severity,                       -- Extract severity level
    eventtimestamp                  -- Use event timestamp for windowing
  FROM logs
),
-- HAVING filters groups, WHERE filters individual rows
aggregated AS (
  SELECT
    service,                 -- Service name
    COUNT(*) as error_count  -- Count of error-level logs
  FROM TABLE(HOP(TABLE parsed, DESCRIPTOR(eventtimestamp), INTERVAL '1' MINUTE, INTERVAL '10' MINUTES))
  WHERE severity >= 17  -- First filter: only ERROR level logs (17+)
  GROUP BY window_start, window_end, service  -- Group by service to count errors per service
  HAVING COUNT(*) > 100 -- Second filter: only show services with 100+ errors
)
SELECT
  CONCAT('HIGH ERROR ALERT: Service ', service, ' has ',
         CAST(error_count AS STRING), ' error-level events') as message,
  21 as severity,  -- FATAL level for high error count alerts
  MAP['service', ARRAY[service], 'alert_type', ARRAY['high_error_count'],
      'severity_level', ARRAY['FATAL']] as tags,
  JSON_OBJECT(
    'error_count' VALUE error_count,
    'error_threshold' VALUE 100,
    'severity_filter' VALUE 'ERROR_LEVEL_17_PLUS',
    'alert_reason' VALUE 'error_count_exceeded_threshold',
    'analysis_type' VALUE 'error_count_monitoring'
  ) as attributes
FROM aggregated

Advanced Grouping

GROUPING SETS

Create multiple aggregation levels. In the below example, we create multiple reporting hierarchy levels using grouping sets so we have varied granularity summarizations.

-- ✅ VALID: Convert multi-level aggregations into hierarchy report LogEvents
-- Start by extracting all the items we care about from the LogEvents
WITH parsed AS (
  SELECT
    tags['service'][1] as service,  -- Extract service name
    tags['host'][1] as host,        -- Extract host name
    severity,                       -- Extract severity level
    eventtimestamp                  -- Use event timestamp for windowing
  FROM logs
),
-- Execute the aggregation
WITH aggregated AS (
  SELECT
    window_start,  -- Start of the aggregation window
    service,       -- Service name
    host,          -- Host name
    severity,      -- Severity level
    COUNT(*) as log_count  -- Count of logs in this aggregation level
  FROM TABLE(TUMBLE(TABLE parsed, DESCRIPTOR(eventtimestamp), INTERVAL '10' SECONDS))
  GROUP BY window_start, window_end, GROUPING SETS (
    (service, host, severity),  -- Most detailed: by service + host + severity
    (service, severity),        -- Medium detail: by service + severity (all hosts combined)
    (service),                  -- Less detail: by service only (all hosts and severities)
    ()                          -- Least detail: grand total count (no grouping)
  )
)
-- Create nice messages and attributes for each aggregation level
SELECT
  window_start as eventtimestamp,  -- window time as eventtimestamp
  -- Create descriptive message based on aggregation level
  CASE
    WHEN service IS NOT NULL AND host IS NOT NULL AND severity IS NOT NULL
    THEN CONCAT('Detailed: ', service, '@', host, ' severity=', CAST(severity AS STRING), ' count=', CAST(log_count AS STRING))
    WHEN service IS NOT NULL AND severity IS NOT NULL
    THEN CONCAT('Service summary: ', service, ' severity=', CAST(severity AS STRING), ' count=', CAST(log_count AS STRING))
    WHEN service IS NOT NULL
    THEN CONCAT('Service total: ', service, ' count=', CAST(log_count AS STRING))
    ELSE CONCAT('Grand total: ', CAST(log_count AS STRING), ' logs')
  END as message,
  MAP[
    'service', ARRAY[COALESCE(service, 'ALL')],
    'host', ARRAY[COALESCE(host, 'ALL')],
    'report_type', ARRAY['hierarchy_summary'],
    'aggregation_level', ARRAY[
      CASE
        WHEN service IS NOT NULL AND host IS NOT NULL AND severity IS NOT NULL THEN 'detailed'
        WHEN service IS NOT NULL AND severity IS NOT NULL THEN 'service_severity'
        WHEN service IS NOT NULL THEN 'service_total'
        ELSE 'grand_total'
      END
    ]
  ] as tags,
  JSON_OBJECT(
    'service' VALUE COALESCE(service, 'ALL'),
    'host' VALUE COALESCE(host, 'ALL'),
    'severity' VALUE COALESCE(severity, -1),
    'log_count' VALUE log_count
  ) as attributes
FROM aggregated
)
ROLLUP

Generate hierarchical aggregations. Generates all prefixes of the grouping columns (note this is a batch example):

SELECT
  tags['service'][1] as service,
  tags['host'][1] as host,
  COUNT(*) as log_count
FROM logs
GROUP BY ROLLUP (tags['service'][1], tags['host'][1])
-- Generates: (service, host), (service), ()
CUBE

Generate all possible combinations of the grouping columns (note this is a batch example):

SELECT
  tags['service'][1] as service,
  severity,
  COUNT(*) as log_count
FROM logs
GROUP BY CUBE (tags['service'][1], severity)
-- Generates: (service, severity), (service), (severity), ()

Event-Level Analytical Functions

Event Ranking Within Service

-- ✅ VALID: Add ranking information to individual log events
SELECT *,  -- Include all LogEvent fields
  -- Add ranking of events within each service by severity
  RANK() OVER (PARTITION BY tags['service'][1] ORDER BY severity DESC) as severity_rank_in_service,
  -- Add row numbers to events within each service (most recent first)
  ROW_NUMBER() OVER (PARTITION BY tags['service'][1] ORDER BY eventtimestamp DESC) as event_sequence,
  -- Add the severity of the previous event in this service
  LAG(severity, 1) OVER (PARTITION BY tags['service'][1] ORDER BY eventtimestamp) as prev_event_severity
FROM logs
-- This returns individual LogEvents with additional analytical context

Important Rules

LogEvent Fields

Your SQL query can return any combination of LogEvent fields, but when specified, they must be of the acceptable types:

  • id (STRING) - Globally unique identifier for the log event

  • eventtimestamp (BIGINT or TIMESTAMP_LTZ or TIMESTAMP) - When BIGINT, milliseconds since Unix epoch when event occurred

  • receivedtimestamp (BIGINT or TIMESTAMP_LTZ or TIMESTAMP) - WHEN BIGINT, milliseconds since Unix epoch when Grepr received the event

  • message (STRING) - The log message content

  • severity (INT) - Log severity level (1-24, following OpenTelemetry convention)

  • tags (MAP<STRING, ARRAY<STRING>>) - Key-value pairs for filtering and routing

  • attributes (STRING) - JSON string containing structured data

Additional fields: Any extra computed columns are automatically added to the event's attributes map.

Default Values for Omitted Fields

When core LogEvent fields are omitted from your SELECT clause, the original values from the input event are preserved. However, if you're creating entirely new events as what happens during windowed aggregations, you may want to understand the default behavior:

  • id - Generated unique identifier if not provided

  • eventtimestamp - Current timestamp if not provided

  • receivedtimestamp - Current timestamp if not provided

  • message - Empty string if not provided

  • severity - 9 (INFO level) if not provided

  • tags - Empty map if not provided

  • attributes - Empty map if not provided

Note: In most cases, you'll want to use SELECT * or include specific fields to preserve data from the input events.

Examples:

 
-- Updates only message, all other fields preserved
 
SELECT *, UPPER(message) as message FROM logs
 
-- Updates message and severity, adds computed field into attributes
 
SELECT *, UPPER(message) as message, severity + 1 as severity, CHAR_LENGTH(message) as msg_length FROM logs
 

Error Handling

The SQL Transform will fail if:

  • SQL syntax is invalid - Malformed SQL queries will be rejected during parsing

  • Field types are incompatible - Core LogEvent fields must match their expected types when included

  • Query execution errors - Runtime errors during SQL execution (e.g., division by zero, invalid regex)

  • JSON parsing errors - Invalid JSON operations on the attributes field

SQL Transform Limitations

Streaming vs Batch Context

The SQL Transform operates in both streaming and batch processing. Streaming will have the following additional limitations:

Windowing Limitations

  • Window functions require explicit windowing TVFs (TUMBLE, HOP, SESSION, CUMULATE)

  • No unbounded aggregations - Use windowing for aggregations in streaming mode

State Management

  • Stateful operations can grow unbounded - Your pipeline can fail if the amount of state used for aggregations or joins becomes too large. You can control the Time-To-Live (TTL) of state by setting the stateTtl parameter in the SQL Transform configuration. By default, it's 2 minutes.

  • JOIN operations are limited in streaming context

  • Complex aggregations may require careful state management

Data Ordering

  • Events may arrive out of order - Consider using watermarks for time-based operations

  • ROW_NUMBER() and ranking functions may not guarantee consistent ordering across restarts

Type System Constraints

Attributes Field Handling

  • Use JSON functions (JSON_VALUE, JSON_QUERY, JSON_EXISTS) to work with attributes
  • Type conversion is automatic for basic types (string, number, boolean)

Performance Considerations

Query Optimization

  • Use WHERE clauses early to filter data before expensive operations
  • Avoid SELECT * when possible - explicitly list needed columns
  • Limit regex operations on high-volume streams - they are CPU intensive
  • Use indexes wisely - TO_TIMESTAMP_LTZ(eventtimestamp, 3) operations benefit from time-based partitioning

Memory and State Management

  • Computed fields increase event size - consider impact on downstream processing
  • Complex aggregations require more memory - reduce TTLs if needed.
  • Window operations buffer data - larger windows require more memory
  • JSON operations allocate temporary objects - avoid in tight loops

Throughput Optimization

-- ✅ GOOD - Filter early, simple operations
SELECT *,  -- Select all fields
  UPPER(message) as upper_message  -- Simple string operation (fast)
FROM logs
WHERE severity >= 17  -- Filter first to reduce data volume
 
-- ⚠️ CAUTION - Expensive regex on all records
SELECT *,  -- Processes every single log record
  -- This regex runs on ALL logs, even those without dates (inefficient)
  REGEXP_EXTRACT(message, '(\\d{4}-\\d{2}-\\d{2})', 1) as date_extracted
FROM logs
 
-- ✅ BETTER - Filter then apply expensive operation
SELECT *,  -- Only processes logs that likely contain dates
  -- Regex only runs on pre-filtered logs (much more efficient)
  REGEXP_EXTRACT(message, '(\\d{4}-\\d{2}-\\d{2})', 1) as date_extracted
FROM logs
WHERE message LIKE '%-%-%'  -- Quick pre-filter using simple pattern matching

Resource Usage Patterns

  • String operations (UPPER, LOWER, CONCAT): Low CPU overhead
  • Regex operations (REGEXP_REPLACE, REGEXP_EXTRACT): High CPU overhead
  • JSON operations (JSON_VALUE, JSON_QUERY): Moderate CPU overhead
  • Mathematical operations: Low CPU overhead
  • Window aggregations: High memory usage during window buffering

Use Cases

The SQL Transform is ideal for:

Data Normalization and Standardization

  • Message format standardization: Converting various log formats to a common structure
  • Timestamp normalization: Converting different timestamp formats to standard milliseconds
  • Field standardization: Ensuring consistent field names and types across services

Log Enhancement and Enrichment

  • Computed metrics: Adding calculated fields like processing delays, error rates
  • Categorization: Adding severity levels, priority flags, or classification labels
  • Contextual data: Extracting and promoting key information from log messages

Security and Compliance

  • Data redaction: Removing or masking sensitive information (passwords, tokens, PII)
  • Audit trail enhancement: Adding tracking fields for compliance requirements
  • Anomaly flagging: Marking unusual patterns or suspicious activities

Performance and Monitoring

  • SLA monitoring: Extracting response times and performance metrics
  • Error analysis: Categorizing and enriching error logs for better debugging
  • Trend analysis: Preparing log data for time-series analysis and alerting

Data Pipeline Optimization

  • Selective processing: Filtering logs to reduce downstream processing load
  • Format conversion: Converting logs for compatibility with downstream systems
  • Batch preparation: Aggregating and preparing data for batch analytics

Examples from Production

Security Log Processing

SELECT *,
  CASE
    WHEN message LIKE '%authentication failed%' THEN 'auth_failure'
    WHEN message LIKE '%unauthorized access%' THEN 'unauthorized'
    WHEN message LIKE '%suspicious activity%' THEN 'suspicious'
    ELSE 'normal'
  END as security_category,
  REGEXP_REPLACE(message, 'user=[^\\s]+', 'user=REDACTED') as sanitized_message
FROM logs
WHERE severity >= 17 -- ERROR level and above
View Example Input/Output

Input:

{
  "id": "sec1",
  "eventtimestamp": 1724214074062,
  "receivedtimestamp": 1724214074188,
  "severity": 17,
  "message": "authentication failed for user=johndoe from IP 192.168.1.100",
  "tags": {"service": ["auth"], "env": ["prod"]},
  "attributes": {
    "ip": "192.168.1.100",
    "attempt": 3
  }
}

Output:

{
  "id": "sec1",
  "eventtimestamp": 1724214074062,
  "receivedtimestamp": 1724214074188,
  "severity": 17,
  "message": "authentication failed for user=johndoe from IP 192.168.1.100",
  "tags": {"service": ["auth"], "env": ["prod"]},
  "attributes": {
    "ip": "192.168.1.100",
    "attempt": 3,
    "security_category": "auth_failure",
    "sanitized_message": "authentication failed for user=REDACTED from IP 192.168.1.100"
  }
}

Application Performance Monitoring

SELECT *,
  CAST(REGEXP_EXTRACT(message, 'response_time=(\\d+)ms', 1) AS BIGINT) as response_time_ms,
  CASE
    WHEN message LIKE '%response_time=%ms' AND CAST(REGEXP_EXTRACT(message, 'response_time=(\\d+)ms', 1) AS BIGINT) > 1000 THEN 'slow'
    ELSE 'normal'
  END as performance_category
FROM logs
WHERE message LIKE '%response_time=%'
View Example Input/Output

Input:

{
  "id": "perf1",
  "eventtimestamp": 1724214074062,
  "receivedtimestamp": 1724214074188,
  "severity": 9,
  "message": "API request completed with response_time=1250ms for endpoint /api/users",
  "tags": {"service": ["api"], "endpoint": ["/api/users"]},
  "attributes": {
    "method": "GET",
    "status_code": 200,
    "user_id": "12345"
  }
}

Output:

{
  "id": "perf1",
  "eventtimestamp": 1724214074062,
  "receivedtimestamp": 1724214074188,
  "severity": 9,
  "message": "API request completed with response_time=1250ms for endpoint /api/users",
  "tags": {"service": ["api"], "endpoint": ["/api/users"]},
  "attributes": {
    "method": "GET",
    "status_code": 200,
    "user_id": "12345",
    "response_time_ms": 1250,
    "performance_category": "slow"
  }
}