Job Creation API Guide
This guide helps developers build programs that create and manage Grepr jobs using our REST APIs. Grepr jobs can be configured as synchronous or asynchronous, and as batch or streaming operations.
Overview
Grepr jobs are composed of a directed graph of operations that process log data. Each job has:
- Execution Mode: SYNCHRONOUS or ASYNCHRONOUS
- Processing Type: BATCH or STREAMING
- Job Graph: A collection of vertices (operations) and edges (data flow connections)
Authentication
All job operations require authentication using Bearer tokens. See the Authentication Guide for details on obtaining access tokens.
Include your token in API requests:
Authorization: Bearer <your_access_token>
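For example, a minimal JavaScript sketch that attaches the token to every request (greprFetch is just an illustrative helper name, and the base URL follows the pattern used in the JavaScript example at the end of this guide; substitute your organization ID):
const greprFetch = (path, token, options = {}) => {
  // Prepend the org-specific base URL and attach the Bearer token
  return fetch(`https://<your_org_id>.app.grepr.ai/api${path}`, {
    ...options,
    headers: {
      'Authorization': `Bearer ${token}`,
      ...(options.headers || {})
    }
  });
};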
Job Creation Endpoints
Synchronous Jobs
POST /v1/jobs/sync
Returns streaming results as NDJSON (newline-delimited JSON). The connection stays open until the job completes. See the API specification for details.
Asynchronous Jobs
POST /v1/jobs/async
Returns immediately with a job ID. Poll the job status using the GET endpoint. See the API specification for details.
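A minimal sketch of submitting a job asynchronously. It assumes the response body is the created job and that its identifier is exposed as id; confirm the exact response shape in the API specification.
const createAsyncJob = async (jobPayload, token) => {
  const response = await fetch('https://<your_org_id>.app.grepr.ai/api/v1/jobs/async', {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${token}`,
      'Content-Type': 'application/json'
    },
    body: JSON.stringify(jobPayload)
  });
  if (!response.ok) {
    throw new Error(`Job creation failed with status ${response.status}`);
  }
  const job = await response.json();
  return job.id; // assumption: the created job's identifier is returned as "id"
};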
Job Management
Get Job Status
GET /v1/jobs/{id}?version={version}&resolved={boolean}
See the API specification for details.
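For asynchronous jobs, a simple polling loop can wait for a terminal state. This sketch assumes the job object returned by the endpoint exposes its current state as state, matching the values listed under Job States below.
const waitForJob = async (jobId, token, intervalMs = 5000) => {
  const terminalStates = ['FINISHED', 'FAILED', 'CANCELLED'];
  while (true) {
    const response = await fetch(`https://<your_org_id>.app.grepr.ai/api/v1/jobs/${jobId}`, {
      headers: { 'Authorization': `Bearer ${token}` }
    });
    const job = await response.json();
    if (terminalStates.includes(job.state)) { // assumption: the state field is named "state"
      return job;
    }
    await new Promise(resolve => setTimeout(resolve, intervalMs)); // wait before polling again
  }
};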
Update Job
PUT /v1/jobs/{id}
See the API specification for details.
Delete Job
DELETE /v1/jobs/{id}
See the API specification for details.
List Jobs
GET /v1/jobs?execution={type}&processing={type}&state={states}
See the API specification for details.
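For example, a sketch that lists running streaming jobs (the accepted filter values are documented in the API specification; the ones below are illustrative):
const listRunningStreamingJobs = async (token) => {
  const params = new URLSearchParams({
    execution: 'ASYNCHRONOUS',
    processing: 'STREAMING',
    state: 'RUNNING'
  });
  const response = await fetch(`https://<your_org_id>.app.grepr.ai/api/v1/jobs?${params}`, {
    headers: { 'Authorization': `Bearer ${token}` }
  });
  return response.json();
};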
Job Schema
CreateJob Request
{
"name": "my_job",
"execution": "SYNCHRONOUS|ASYNCHRONOUS",
"processing": "BATCH|STREAMING",
"jobGraph": {
"vertices": [...],
"edges": [...]
},
"tags": {
"environment": "production",
"team": "data-engineering"
},
"teamIds": ["team-id-1"]
}
Job Graph Structure
The job graph uses a simplified edge format. Edges can be specified as:
- Simple strings: "source_op -> destination_op"
- With specific outputs: "source_op:else -> destination_op"
{
"vertices": [
{
"name": "source_operation",
"type": "logs-iceberg-table-source",
// Operation-specific properties
},
{
"name": "transform_operation",
"type": "log-reducer",
// Operation-specific properties
},
{
"name": "sink_operation",
"type": "logs-sync-sink"
}
],
"edges": [
"source_operation -> transform_operation",
"transform_operation -> sink_operation"
]
}
Common Job Patterns
1. Data Lake Query (Synchronous Batch)
Query historical log data from Grepr's data lake:
{
"name": "data_lake_query",
"execution": "SYNCHRONOUS",
"processing": "BATCH",
"jobGraph": {
"vertices": [
{
"name": "source",
"type": "logs-iceberg-table-source",
"datasetId": "abc123def456",
"start": "2024-01-01T00:00:00Z",
"end": "2024-01-02T00:00:00Z",
"query": {
"type": "datadog-query",
"query": "service:my-service status:error"
},
"limit": 1000,
"reductionInterval": "PT2M",
"sortOrder": "UNSORTED"
},
{
"name": "sink",
"type": "logs-sync-sink"
}
],
"edges": [
"source -> sink"
]
},
"tags": {}
}
2. Streaming Data Processing (Synchronous Streaming)
Process live data with rate limiting for testing:
{
"name": "streaming_processor",
"execution": "SYNCHRONOUS",
"processing": "STREAMING",
"jobGraph": {
"vertices": [
{
"name": "source",
"type": "datadog-log-agent-source"
},
{
"name": "sampler",
"type": "logs-event-sampler",
"maxAllowedRate": 10.0,
"maxBurstLimit": 100
},
{
"name": "transform",
"type": "sql-transform",
"sqlQuery": "SELECT *, UPPER(message) as upper_message FROM logs WHERE severity <= 4"
},
{
"name": "sink",
"type": "logs-sync-sink"
}
],
"edges": [
"source -> sampler",
"sampler -> transform",
"transform -> sink"
]
},
"tags": {}
}
3. Production Pipeline (Asynchronous Streaming)
Complex production pipeline based on our reference implementation:
{
"name": "production_pipeline",
"execution": "ASYNCHRONOUS",
"processing": "STREAMING",
"jobGraph": {
"vertices": [
{
"type": "datadog-log-agent-source",
"name": "datadog_source"
},
{
"type": "logs-filter",
"name": "pre_parser_filter",
"predicate": {
"type": "datadog-query",
"query": "-status:debug"
}
},
{
"type": "json-log-processor",
"name": "json_processor",
"maxNestedDepthForFields": 3
},
{
"type": "log-attributes-remapper",
"name": "attributes_remapper"
},
{
"type": "logs-branch",
"name": "branch_special_logs",
"predicate": {
"type": "datadog-query",
"query": "source:elb"
}
},
{
"type": "grok-parser",
"name": "grok_parser",
"grokParsingRules": [
"elb %{_date_access} %{_elb_name} %{_client_ip}:%{_client_port} %{_status_code}"
],
"grokHelperRules": [
"_elb_name %{notSpace:elb.name}",
"_client_ip %{ipOrHost:network.client.ip}",
"_client_port %{integer:network.client.port}",
"_status_code %{integer:http.status_code}",
"_date_access %{timestampiso8601:timestamp}"
]
},
{
"type": "log-reducer",
"name": "log_reducer",
"logReducerExceptions": [{
"type": "datadog-query",
"query": "skipAggregation:true"
}],
"partitionByTags": ["service"],
"partitionByAttributes": ["http.url_details.path", "http.method"],
"similarityThreshold": 80.0,
"dedupThreshold": 5,
"reductionTimeWindow": "PT120S",
"samplingConfig": {
"type": "window-based-logarithmic-sampling",
"logarithmBase": 2
},
"masks": [
["timestamp", "\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}"],
["uuid", "[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}"]
],
"enabledMasks": ["timestamp", "uuid"]
},
{
"type": "logs-iceberg-table-sink",
"name": "data_lake_sink",
"datasetId": "processed-logs-dataset"
},
{
"type": "datadog-log-sink",
"name": "datadog_sink",
"integrationId": "dd-integration-id",
"additionalTags": [
"processor:grepr",
"pipeline:production"
]
}
],
"edges": [
"datadog_source -> pre_parser_filter",
"pre_parser_filter -> json_processor",
"json_processor -> attributes_remapper",
"attributes_remapper -> branch_special_logs",
"branch_special_logs -> grok_parser",
"grok_parser -> log_reducer",
"branch_special_logs:else -> log_reducer",
"log_reducer -> data_lake_sink",
"log_reducer -> datadog_sink"
]
},
"tags": {
"pipeline_type": "production"
}
}
4. Multi-Source to Multi-Destination Pipeline
Process logs from multiple sources and send to different destinations:
{
"name": "multi_pipeline",
"execution": "ASYNCHRONOUS",
"processing": "STREAMING",
"jobGraph": {
"vertices": [
{
"name": "datadog_source",
"type": "datadog-log-agent-source"
},
{
"name": "splunk_source",
"type": "splunk-log-agent-source",
"integrationId": "splunk-integration-id"
},
{
"name": "error_filter",
"type": "logs-filter",
"predicate": {
"type": "datadog-query",
"query": "status:error OR status:warning"
}
},
{
"name": "info_filter",
"type": "logs-filter",
"predicate": {
"type": "datadog-query",
"query": "status:info"
}
},
{
"name": "newrelic_sink",
"type": "newrelic-log-sink",
"integrationId": "nr-integration-id"
},
{
"name": "data_lake_sink",
"type": "logs-iceberg-table-sink",
"datasetId": "all-logs-dataset"
}
],
"edges": [
"datadog_source -> error_filter",
"datadog_source -> info_filter",
"splunk_source -> error_filter",
"splunk_source -> info_filter",
"error_filter -> newrelic_sink",
"info_filter -> data_lake_sink"
]
},
"tags": {}
}
Important Considerations
Rate Limiting for Synchronous Streaming Jobs
When creating synchronous streaming jobs, always include a logs-event-sampler operation to limit the volume of data sent back and avoid overwhelming the client connection:
{
"name": "rate_limiter",
"type": "logs-event-sampler",
"maxAllowedRate": 10.0,
"maxBurstLimit": 100,
"filter": {
"type": "datadog-query",
"query": "*"
}
}
Heartbeat Management
For synchronous jobs, the server sends periodic heartbeat tokens through the stream. Clients should respond by calling the heartbeat endpoint to keep the connection alive:
POST /v1/jobs/sync/heartbeat
Content-Type: application/json
"<heartbeat-token>"
Job States
Monitor job progress through these states:
- PENDING: Job accepted, waiting to start
- STARTING: Job transitioning to the running state
- RUNNING: Job actively processing data
- FINISHED: Job completed successfully
- FAILED: Job encountered an error
- CANCELLED: Job was cancelled by the user
Error Handling
Handle these common HTTP status codes:
- 200/201/202: Success
- 400: Invalid request (malformed job graph, invalid parameters)
- 401: Authentication required
- 404: Job not found
- 409: Conflict (concurrent update, resource already exists)
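A sketch of branching on these codes after a job request (the error body format is an assumption; see the API specification for the actual error schema):
const handleJobResponse = async (response) => {
  switch (response.status) {
    case 200:
    case 201:
    case 202:
      return response.json();
    case 400:
      throw new Error(`Invalid request: ${await response.text()}`);
    case 401:
      throw new Error('Authentication required: check that your access token is valid');
    case 404:
      throw new Error('Job not found');
    case 409:
      throw new Error('Conflict: concurrent update or the resource already exists');
    default:
      throw new Error(`Unexpected status ${response.status}`);
  }
};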
Code Example
JavaScript Example
import ndjsonstream from 'can-ndjson-stream';
const createSyncJob = async (jobPayload, token) => {
const response = await fetch('https://<your_org_id>.app.grepr.ai/api/v1/jobs/sync', {
method: 'POST',
headers: {
'Authorization': `Bearer ${token}`,
'Content-Type': 'application/json',
'Accept': 'application/x-ndjson'
},
body: JSON.stringify(jobPayload)
});
// Use ndjsonstream to parse the response
const stream = ndjsonstream(response.body);
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
// Handle heartbeat tokens
if (value?.jobState === 'HEARTBEAT' && value.heartbeatToken) {
// Send heartbeat (don't await to avoid blocking)
fetch('https://<your_org_id>.app.grepr.ai/api/v1/jobs/sync/heartbeat', {
method: 'POST',
headers: { 'Authorization': `Bearer ${token}`, 'Content-Type': 'application/json' },
body: JSON.stringify(value.heartbeatToken)
}).catch(e => console.error('Heartbeat error:', e));
}
// Process data events
if (value?.data) {
console.log('Received log:', value.data);
}
// Check job completion
if (value?.jobState === 'FINISHED') {
console.log('Job completed');
}
if (done) {
break;
}
}
};
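The function above can be called with any synchronous payload from this guide, for example the data lake query from pattern 1 (dataLakeQueryPayload is a hypothetical variable holding that JSON):
// dataLakeQueryPayload holds the JSON payload from pattern 1 ("Data Lake Query")
const token = '<your_access_token>'; // obtained as described in the Authentication Guide
await createSyncJob(dataLakeQueryPayload, token);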
Available Operations
Grepr supports a wide variety of sources, transforms, sinks, and special operations for building data processing pipelines. For a complete list of available operations and their configuration options, see the API specification.
Best Practices
- Use appropriate rate limiting for synchronous streaming jobs
- Implement heartbeat handling for long-running synchronous jobs
- Monitor job states and handle failures gracefully
- Use meaningful job names and tags for easier management
- Test with small datasets before running on production data
- Set reasonable limits on batch job result sizes
- Use SQL transforms for complex data transformations
- Leverage templates for reusable pipeline patterns
- Use branching operations to split data flows based on conditions
- Chain operations logically - filter before processing, parse before mapping
For more information, see the API specification for complete endpoint documentation and operation details.