Job Creation API Guide

This guide helps developers build programs that create and manage Grepr jobs using our REST APIs. Grepr jobs can be configured as synchronous or asynchronous, and as batch or streaming operations.

Overview

Grepr jobs are composed of a directed graph of operations that process log data. Each job has:

  • Execution Mode: SYNCHRONOUS or ASYNCHRONOUS
  • Processing Type: BATCH or STREAMING
  • Job Graph: A collection of vertices (operations) and edges (data flow connections)

Authentication

All job operations require authentication using Bearer tokens. See the Authentication Guide for details on obtaining access tokens.

Include your token in API requests:

Authorization: Bearer <your_access_token>
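
For example, a small wrapper that attaches the header to every request might look like the sketch below. The base URL is an assumption that follows the pattern used in the JavaScript example later in this guide; adjust it for your organization.

// Minimal sketch: wrap fetch so every request carries the Bearer token.
// The base URL is an assumption based on the JavaScript example later in this guide.
const BASE_URL = 'https://<your_org_id>.app.grepr.ai/api';

const greprFetch = (path, token, options = {}) =>
  fetch(`${BASE_URL}${path}`, {
    ...options,
    headers: {
      'Authorization': `Bearer ${token}`,
      'Content-Type': 'application/json',
      ...options.headers
    }
  });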

Job Creation Endpoints

Synchronous Jobs

POST /v1/jobs/sync

Returns streaming results as NDJSON. The connection stays open until the job completes. See the API specification for details.

Asynchronous Jobs

POST /v1/jobs/async

Returns immediately with a job ID. Poll the job status using the GET endpoint under Job Management. See the API specification for details.
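
A minimal sketch of creating an asynchronous job and polling it to completion is shown below. The id and state field names in the responses are assumptions; check the API specification for the exact response shape.

// Minimal sketch: create an asynchronous job, then poll until it reaches a
// terminal state. The `id` and `state` field names are assumptions; see the
// API specification for the exact response shape.
const createAsyncJobAndWait = async (jobPayload, token) => {
  const createResponse = await fetch('https://<your_org_id>.app.grepr.ai/api/v1/jobs/async', {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${token}`,
      'Content-Type': 'application/json'
    },
    body: JSON.stringify(jobPayload)
  });
  const { id } = await createResponse.json();

  // Poll the job until it reaches a terminal state (see Job States below).
  const terminalStates = ['FINISHED', 'FAILED', 'CANCELLED'];
  while (true) {
    const statusResponse = await fetch(`https://<your_org_id>.app.grepr.ai/api/v1/jobs/${id}`, {
      headers: { 'Authorization': `Bearer ${token}` }
    });
    const job = await statusResponse.json();
    if (terminalStates.includes(job.state)) {
      return job;
    }
    await new Promise(resolve => setTimeout(resolve, 5000)); // wait 5s between polls
  }
};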

Job Management

Get Job Status

GET /v1/jobs/{id}?version={version}&resolved={boolean}

See the API specification for details.

Update Job

PUT /v1/jobs/{id}

See the API specification for details.

Delete Job

DELETE /v1/jobs/{id}

See the API specification for details.

List Jobs

GET /v1/jobs?execution={type}&processing={type}&state={states}

See the API specification for details.
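
As a sketch, the filters can be assembled with URLSearchParams. The parameter values below are illustrative and reuse the execution and processing types and job states described elsewhere in this guide.

// Minimal sketch: list asynchronous streaming jobs that are currently running,
// using the query parameters shown above. The values are illustrative.
const listJobs = async (token) => {
  const params = new URLSearchParams({
    execution: 'ASYNCHRONOUS',
    processing: 'STREAMING',
    state: 'RUNNING'
  });
  const response = await fetch(`https://<your_org_id>.app.grepr.ai/api/v1/jobs?${params}`, {
    headers: { 'Authorization': `Bearer ${token}` }
  });
  return response.json();
};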

Job Schema

CreateJob Request

{
  "name": "my_job",
  "execution": "SYNCHRONOUS|ASYNCHRONOUS", 
  "processing": "BATCH|STREAMING",
  "jobGraph": {
    "vertices": [...],
    "edges": [...]
  },
  "tags": {
    "environment": "production",
    "team": "data-engineering"
  },
  "teamIds": ["team-id-1"]
}
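
If you are generating jobs programmatically, a small helper can assemble this request body from its parts. The sketch below uses only the fields shown in the schema above; the defaults are illustrative, not required values.

// Minimal sketch: assemble a CreateJob request body. Only the fields from the
// schema above are used; the defaults here are illustrative, not required values.
const buildCreateJobRequest = (name, vertices, edges, options = {}) => ({
  name,
  execution: options.execution ?? 'ASYNCHRONOUS',
  processing: options.processing ?? 'STREAMING',
  jobGraph: { vertices, edges },
  tags: options.tags ?? {},
  ...(options.teamIds ? { teamIds: options.teamIds } : {})
});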

Job Graph Structure

The job graph uses a simplified edge format. Edges can be specified as:

  • Simple strings: "source_op -> destination_op"
  • With specific outputs: "source_op:else -> destination_op"

A minimal job graph looks like this:

{
  "vertices": [
    {
      "name": "source_operation",
      "type": "logs-iceberg-table-source",
      // Operation-specific properties
    },
    {
      "name": "transform_operation", 
      "type": "log-reducer",
      // Operation-specific properties
    },
    {
      "name": "sink_operation",
      "type": "logs-sync-sink"
    }
  ],
  "edges": [
    "source_operation -> transform_operation",
    "transform_operation -> sink_operation"
  ]
}
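
A small helper that produces both edge forms can keep graph definitions readable; this is just a sketch of the string format described above.

// Minimal sketch: build edge strings in the two forms described above.
// The optional `output` argument selects a named output (e.g. "else").
const edge = (source, destination, output) =>
  output ? `${source}:${output} -> ${destination}` : `${source} -> ${destination}`;

// edge('source_operation', 'transform_operation')     -> "source_operation -> transform_operation"
// edge('branch_special_logs', 'log_reducer', 'else')  -> "branch_special_logs:else -> log_reducer"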

Common Job Patterns

1. Data Lake Query (Synchronous Batch)

Query historical log data from Grepr's data lake:

{
  "name": "data_lake_query",
  "execution": "SYNCHRONOUS",
  "processing": "BATCH", 
  "jobGraph": {
    "vertices": [
      {
        "name": "source",
        "type": "logs-iceberg-table-source",
        "datasetId": "abc123def456",
        "start": "2024-01-01T00:00:00Z",
        "end": "2024-01-02T00:00:00Z", 
        "query": {
          "type": "datadog-query",
          "query": "service:my-service status:error"
        },
        "limit": 1000,
        "reductionInterval": "PT2M",
        "sortOrder": "UNSORTED"
      },
      {
        "name": "sink", 
        "type": "logs-sync-sink"
      }
    ],
    "edges": [
      "source -> sink"
    ]
  },
  "tags": {}
}

2. Streaming Data Processing (Synchronous Streaming)

Process live data with rate limiting for testing:

{
  "name": "streaming_processor",
  "execution": "SYNCHRONOUS",
  "processing": "STREAMING",
  "jobGraph": {
    "vertices": [
      {
        "name": "source",
        "type": "datadog-log-agent-source"
      },
      {
        "name": "sampler",
        "type": "logs-event-sampler", 
        "maxAllowedRate": 10.0,
        "maxBurstLimit": 100
      },
      {
        "name": "transform",
        "type": "sql-transform",
        "sqlQuery": "SELECT *, UPPER(message) as upper_message FROM logs WHERE severity <= 4"
      },
      {
        "name": "sink",
        "type": "logs-sync-sink"
      }
    ],
    "edges": [
      "source -> sampler",
      "sampler -> transform",
      "transform -> sink"
    ]
  },
  "tags": {}
}

3. Production Pipeline (Asynchronous Streaming)

Complex production pipeline based on our reference implementation:

{
  "name": "production_pipeline",
  "execution": "ASYNCHRONOUS", 
  "processing": "STREAMING",
  "jobGraph": {
    "vertices": [
      {
        "type": "datadog-log-agent-source",
        "name": "datadog_source"
      },
      {
        "type": "logs-filter",
        "name": "pre_parser_filter",
        "predicate": {
          "type": "datadog-query",
          "query": "-status:debug"
        }
      },
      {
        "type": "json-log-processor",
        "name": "json_processor",
        "maxNestedDepthForFields": 3
      },
      {
        "type": "log-attributes-remapper",
        "name": "attributes_remapper"
      },
      {
        "type": "logs-branch",
        "name": "branch_special_logs",
        "predicate": {
          "type": "datadog-query",
          "query": "source:elb"
        }
      },
      {
        "type": "grok-parser",
        "name": "grok_parser",
        "grokParsingRules": [
          "elb %{_date_access} %{_elb_name} %{_client_ip}:%{_client_port} %{_status_code}"
        ],
        "grokHelperRules": [
          "_elb_name %{notSpace:elb.name}",
          "_client_ip %{ipOrHost:network.client.ip}",
          "_client_port %{integer:network.client.port}",
          "_status_code %{integer:http.status_code}",
          "_date_access %{timestampiso8601:timestamp}"
        ]
      },
      { 
        "type": "log-reducer",
        "name": "log_reducer",
        "logReducerExceptions": [{
          "type": "datadog-query",
          "query": "skipAggregation:true"
        }],
        "partitionByTags": ["service"],
        "partitionByAttributes": ["http.url_details.path", "http.method"],
        "similarityThreshold": 80.0,
        "dedupThreshold": 5,
        "reductionTimeWindow": "PT120S",
        "samplingConfig": {
          "type": "window-based-logarithmic-sampling",
          "logarithmBase": 2
        },
        "masks": [
          ["timestamp", "\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}"],
          ["uuid", "[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}"]
        ],
        "enabledMasks": ["timestamp", "uuid"]
      },
      {
        "type": "logs-iceberg-table-sink",
        "name": "data_lake_sink",
        "datasetId": "processed-logs-dataset"
      },
      {
        "type": "datadog-log-sink",
        "name": "datadog_sink",
        "integrationId": "dd-integration-id",
        "additionalTags": [
          "processor:grepr",
          "pipeline:production"
        ]
      }
    ],
    "edges": [
      "datadog_source -> pre_parser_filter",
      "pre_parser_filter -> json_processor",
      "json_processor -> attributes_remapper",
      "attributes_remapper -> branch_special_logs",
      "branch_special_logs -> grok_parser",
      "grok_parser -> log_reducer",
      "branch_special_logs:else -> log_reducer",
      "log_reducer -> data_lake_sink",
      "log_reducer -> datadog_sink"
    ]
  },
  "tags": {
    "pipeline_type": "production"
  }
}

4. Multi-Source to Multi-Destination Pipeline

Process logs from multiple sources and send to different destinations:

{
  "name": "multi_pipeline",
  "execution": "ASYNCHRONOUS",
  "processing": "STREAMING", 
  "jobGraph": {
    "vertices": [
      {
        "name": "datadog_source",
        "type": "datadog-log-agent-source"
      },
      {
        "name": "splunk_source",
        "type": "splunk-log-agent-source",
        "integrationId": "splunk-integration-id"
      },
      {
        "name": "error_filter",
        "type": "logs-filter",
        "predicate": {
          "type": "datadog-query", 
          "query": "status:error OR status:warning"
        }
      },
      {
        "name": "info_filter",
        "type": "logs-filter",
        "predicate": {
          "type": "datadog-query",
          "query": "status:info"
        }
      },
      {
        "name": "newrelic_sink",
        "type": "newrelic-log-sink",
        "integrationId": "nr-integration-id"
      },
      {
        "name": "data_lake_sink",
        "type": "logs-iceberg-table-sink",
        "datasetId": "all-logs-dataset"
      }
    ],
    "edges": [
      "datadog_source -> error_filter",
      "datadog_source -> info_filter", 
      "splunk_source -> error_filter",
      "splunk_source -> info_filter",
      "error_filter -> newrelic_sink",
      "info_filter -> data_lake_sink"
    ]
  },
  "tags": {}
}

Important Considerations

Rate Limiting for Synchronous Streaming Jobs

When creating synchronous streaming jobs, always include a logs-event-sampler operation to limit the volume of data sent back and avoid overwhelming the client connection:

{
  "name": "rate_limiter",
  "type": "logs-event-sampler",
  "maxAllowedRate": 10.0,
  "maxBurstLimit": 100,
  "filter": {
    "type": "datadog-query",
    "query": "*"
  }
}

Heartbeat Management

For synchronous jobs, the server sends periodic heartbeat tokens through the stream. Clients should respond by calling the heartbeat endpoint to keep the connection alive:

POST /v1/jobs/sync/heartbeat
Content-Type: application/json

"<heartbeat-token>"

Job States

Monitor job progress through these states:

  • PENDING: Job accepted, waiting to start
  • STARTING: Job transitioning to running state
  • RUNNING: Job actively processing data
  • FINISHED: Job completed successfully
  • FAILED: Job encountered an error
  • CANCELLED: Job was cancelled by user

Error Handling

Handle these common HTTP status codes; a minimal response-check sketch follows the list:

  • 200/201/202: Success
  • 400: Invalid request (malformed job graph, invalid parameters)
  • 401: Authentication required
  • 404: Job not found
  • 409: Conflict (concurrent update, resource already exists)
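
A small response check can turn these codes into descriptive errors. The sketch below is one way to do it; the messages simply mirror the list above.

// Minimal sketch: map common error codes to descriptive errors.
const checkResponse = async (response) => {
  if (response.ok) {
    return response;
  }
  const messages = {
    400: 'Invalid request (malformed job graph, invalid parameters)',
    401: 'Authentication required',
    404: 'Job not found',
    409: 'Conflict (concurrent update, resource already exists)'
  };
  const detail = await response.text().catch(() => '');
  throw new Error(`${response.status}: ${messages[response.status] ?? 'Unexpected error'} ${detail}`.trim());
};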

Code Example

JavaScript Example

import ndjsonstream from 'can-ndjson-stream';
 
const createSyncJob = async (jobPayload, token) => {
  const response = await fetch('https://<your_org_id>.app.grepr.ai/api/v1/jobs/sync', {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${token}`,
      'Content-Type': 'application/json',
      'Accept': 'application/x-ndjson'
    },
    body: JSON.stringify(jobPayload)
  });

  if (!response.ok) {
    throw new Error(`Failed to create sync job: ${response.status}`);
  }

  // Use ndjsonstream to parse the streamed response into JSON objects
  const stream = ndjsonstream(response.body);
  const reader = stream.getReader();
 
  while (true) {
    const { done, value } = await reader.read();

    if (done) {
      break;
    }

    // Handle heartbeat tokens
    if (value?.jobState === 'HEARTBEAT' && value.heartbeatToken) {
      // Send heartbeat (don't await, to avoid blocking the read loop)
      fetch('https://<your_org_id>.app.grepr.ai/api/v1/jobs/sync/heartbeat', {
        method: 'POST',
        headers: {
          'Authorization': `Bearer ${token}`,
          'Content-Type': 'application/json'
        },
        body: JSON.stringify(value.heartbeatToken)
      }).catch(e => console.error('Heartbeat error:', e));
    }

    // Process data events
    if (value?.data) {
      console.log('Received log:', value.data);
    }

    // Check job completion
    if (value?.jobState === 'FINISHED') {
      console.log('Job completed');
    }
  }
};
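
As a usage sketch, createSyncJob can be invoked with the data lake query payload from pattern 1 above; the dataset ID and access token are placeholders.

// Usage sketch: run the data lake query from pattern 1 through the sync endpoint.
// The dataset ID and access token are placeholders.
const dataLakeQuery = {
  name: 'data_lake_query',
  execution: 'SYNCHRONOUS',
  processing: 'BATCH',
  jobGraph: {
    vertices: [
      {
        name: 'source',
        type: 'logs-iceberg-table-source',
        datasetId: 'abc123def456',
        start: '2024-01-01T00:00:00Z',
        end: '2024-01-02T00:00:00Z',
        query: { type: 'datadog-query', query: 'service:my-service status:error' },
        limit: 1000,
        reductionInterval: 'PT2M',
        sortOrder: 'UNSORTED'
      },
      { name: 'sink', type: 'logs-sync-sink' }
    ],
    edges: ['source -> sink']
  },
  tags: {}
};

createSyncJob(dataLakeQuery, '<your_access_token>')
  .catch(e => console.error('Sync job failed:', e));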

Available Operations

Grepr supports a wide variety of sources, transforms, sinks, and special operations for building data processing pipelines. For a complete list of available operations and their configuration options, see the API specification.

Best Practices

  1. Use appropriate rate limiting for synchronous streaming jobs
  2. Implement heartbeat handling for long-running synchronous jobs
  3. Monitor job states and handle failures gracefully
  4. Use meaningful job names and tags for easier management
  5. Test with small datasets before running on production data
  6. Set reasonable limits on batch job result sizes
  7. Use SQL transforms for complex data transformations
  8. Leverage templates for reusable pipeline patterns
  9. Use branching operations to split data flows based on conditions
  10. Chain operations logically: filter before processing, parse before mapping

For more information, see the API specification for complete endpoint documentation and operation details.