SPEC-001: Standard Job Notification API
This document provides details on the Standard Job Notification API, which is used for reporting job statuses and heartbeats.
Info
- GitHub: Standard Job Notification API
- Confluence: Standard Job Notification API
Schema
Loading ....
Source
Click to expand...
{
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "This schema validates the JSON payloads for messages sent via Kafka under the Standard Job Notification API, which is designed to facilitate asynchronous communication about job statuses and heartbeats across various consumer and provider applications.",
"type": "object",
"properties": {
"meta": {
"type": "object",
"description": "Contains metadata for the message, including identification and source application details.",
"properties": {
"idempotency_key": {
"type": "string",
"description": "A UUID used to ensure that a message, if repeated, will not be processed more than once. This helps in maintaining idempotence in message processing.",
"pattern": "^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$",
"examples": [
"123e4567-e89b-12d3-a456-426614174000"
]
},
"correlation_id": {
"type": "string",
"description": "A UUID that helps correlate multiple related messages that may be part of a single transaction or session across the involved services.",
"pattern": "^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$",
"examples": [
"987e6543-e21b-32d1-b456-426614174111"
]
},
"source": {
"type": "object",
"description": "Details about the source application emitting the message, including application name, version, and the environment type.",
"properties": {
"application": {
"type": "string",
"description": "The name of the application from which the message originates.",
"examples": [
"JobProcessor"
]
},
"version": {
"type": "string",
"description": "The version of the application, ideally, but not limited to, following semantic versioning.",
"examples": [
"1.4.3"
]
},
"environment_type": {
"type": "string",
"description": "The type of environment (e.g., development, staging, production) from which the message is sent.",
"examples": [
"production"
]
},
"application_instance": {
"type": "string",
"description": "The name of the particular instance of the application from which the message originates.",
"examples": [
"customerx"
]
}
},
"required": [
"application",
"version",
"environment_type"
]
},
"version": {
"type": "string",
"description": "The version of the message schema, using semantic versioning to track changes and compatibility.",
"pattern": "^\\d+\\.\\d+\\.\\d+$",
"examples": [
"1.1.0"
]
},
"labels": {
"type": "array",
"description": "Optional labels that can be used for categorizing or tagging messages in a more flexible manner.",
"items": {
"type": "string",
"examples": [
"job-update"
]
}
}
},
"required": [
"idempotency_key",
"correlation_id",
"source",
"version"
]
},
"data": {
"type": "object",
"description": "The actual data of the message, which varies based on the notification type (status update or heartbeat).",
"oneOf": [
{
"properties": {
"notification_type": {
"const": "STATUS",
"description": "Indicates that the message is a status update about a job.",
"examples": [
"STATUS"
]
},
"description": {
"type": "string",
"description": "A human-readable description of the job or task associated with the message.",
"examples": [
"Processing job for data migration"
]
},
"status": {
"type": "string",
"description": "The current status of the job, which can be one of several predefined states.",
"enum": [
"PENDING",
"DELAYED",
"QUEUED",
"RUNNING",
"CANCELED",
"COMPLETED",
"FAILED",
"PARTIALLY_FAILED"
],
"examples": [
"RUNNING"
]
},
"message": {
"type": "string",
"description": "Additional information or a detailed message about the status, often used for error messages or updates.",
"examples": [
"Job is currently running and processing data."
]
},
"progress": {
"type": "object",
"description": "Details the progress of the job, providing metrics like percentage completed and counts of processed items.",
"properties": {
"percentage_completed": {
"type": "number",
"description": "The percentage of the job that has been completed at the time of the message.",
"examples": [
45.5
]
},
"rows_completed": {
"type": "integer",
"description": "The number of rows or units processed successfully.",
"examples": [
4550
]
},
"rows_ignored": {
"type": "integer",
"description": "The number of rows or units that were ignored or skipped during processing.",
"examples": [
50
]
},
"rows_total": {
"type": "integer",
"description": "The total number of rows or units that the job expects to process.",
"examples": [
10000
]
}
},
"required": [
"percentage_completed"
]
},
"results": {
"type": "array",
"description": "Contains information about the outputs or results of the job, including storage locations and resource identifiers.",
"items": {
"oneOf": [
{
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "Azure Blob Storage directory resource schema.",
"type": "object",
"properties": {
"type": {
"const": "ABS_DIRECTORY",
"description": "Indicates the data source/destination is a **path** in Azure Blob Storage."
},
"storage_account": {
"type": "string",
"description": "The storage account where the data is stored.",
"examples": [
"foocustomeraccount"
]
},
"container": {
"type": "string",
"description": "The storage container where the data is stored.",
"examples": [
"data"
]
},
"directory": {
"type": "string",
"description": "The specific directory to store/fetch the files.",
"examples": [
"input"
]
},
"primary_resource": {
"type": "string",
"description": "A primary identifier for the type of resource being used.",
"examples": [
"intraday_profiles"
]
},
"attributes": {
"type": "object",
"description": "Additional attributes related to the data.",
"examples": [
{
"attribute1": "value1"
}
]
}
},
"required": [
"type",
"storage_account",
"container",
"directory",
"primary_resource"
]
},
{
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "Azure Blob Storage file resource schema.",
"type": "object",
"properties": {
"type": {
"const": "ABS_FILES",
"description": "Indicates the data source/destination is a **set of files** in Azure Blob Storage."
},
"storage_account": {
"type": "string",
"description": "The storage account where the data is located.",
"examples": [
"foocustomeraccount"
]
},
"container": {
"type": "string",
"description": "The storage container where the data is located.",
"examples": [
"data"
]
},
"paths": {
"type": "array",
"description": "The specific paths to the files.",
"items": {
"type": "string"
},
"examples": [
[
"batchprocessing/6304c48a-0929-4764-88bd-d8a32fa7a795/campaigns.csv"
]
]
},
"primary_resource": {
"type": "string",
"description": "A primary identifier for the type of resource being used.",
"examples": [
"campaigns"
]
},
"attributes": {
"type": "object",
"description": "Additional attributes related to the data.",
"examples": [
{
"attribute1": "value1"
}
]
}
},
"required": [
"type",
"storage_account",
"container",
"paths",
"primary_resource"
]
},
{
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "Schema for Snowflake data resource.",
"type": "object",
"properties": {
"type": {
"const": "SNOWFLAKE_TABLE",
"description": "Indicates the data source/destination is Snowflake."
},
"query": {
"type": "string",
"description": "The SQL query to access data in Snowflake.",
"examples": [
"SELECT location_code, date, opening_hour, closing_hour FROM location_opening_hours"
]
},
"primary_resource": {
"type": "string",
"description": "A primary identifier for the type of resource used.",
"examples": [
"opening_hours"
]
},
"attributes": {
"type": "object",
"description": "Additional attributes related to the data.",
"examples": [
{
"attribute1": "value1"
}
]
}
},
"required": [
"type",
"query",
"primary_resource"
]
}
]
}
},
"job_metadata": {
"type": "object",
"description": "Additional metadata about the job that might be used for custom handling or processing logic.",
"examples": [
{}
]
},
"batch_process": {
"type": "object",
"description": "Details about the batch process that has been started, including application ID, process ID, and version.",
"properties": {
"application_id": {
"type": "string",
"description": "Identifier for the application that is executing the batch process.",
"examples": [
"intraday"
]
},
"batch_process_id": {
"type": "string",
"description": "Identifier for the specific batch process being executed.",
"examples": [
"train"
]
},
"batch_process_version": {
"type": "object",
"description": "The version of the batch process being executed, following semantic versioning.",
"properties": {
"major": {
"type": "integer",
"examples": [
1
]
},
"minor": {
"type": "integer",
"examples": [
0
]
},
"patch": {
"type": "integer",
"examples": [
123
]
},
"pre_release": {
"type": "string",
"examples": [
"alpha"
]
},
"build": {
"type": "string",
"examples": [
"efba2a07"
]
}
},
"required": [
"major"
]
}
},
"required": [
"application_id",
"batch_process_id"
]
}
},
"required": [
"notification_type",
"status"
]
},
{
"properties": {
"notification_type": {
"const": "HEARTBEAT",
"description": "Indicates that the message is a heartbeat, signaling that the job is still active.",
"examples": [
"HEARTBEAT"
]
},
"description": {
"type": "string",
"description": "A brief description of the current state or action of the job relevant to the heartbeat.",
"examples": [
"Heartbeat for ongoing data migration job"
]
},
"heartbeat_deadline_seconds": {
"type": "integer",
"description": "The number of seconds until the next expected heartbeat. If this duration passes without another heartbeat, the job may be considered stalled or failed.",
"minimum": 1,
"examples": [
300
]
}
},
"required": [
"notification_type",
"heartbeat_deadline_seconds"
]
}
]
}
},
"required": [
"meta",
"data"
]
}
Examples
The following examples demonstrate how to use the API.
Show example of status message
{
"meta": {
"idempotency_key": "f36fe015-8e20-4213-8956-5e7d88e52c8c",
"correlation_id": "a5af482c-e17b-49e2-aa59-997268a1f420",
"source": {
"application": "ai-platform",
"version": "0.0.1",
"environment_type": "research"
},
"version": "1.1.0",
"labels": [
"internal",
"cgp"
]
},
"data": {
"notification_type": "STATUS",
"description": "Optimize production plan",
"status": "COMPLETED",
"message": "Successfully ran production plan optimization for 2023-06-06.",
"progress": {
"percentage_completed": 100,
"rows_completed": 1000,
"rows_ignored": 0,
"rows_total": 1000
},
"results": [
{
"type": "ABS_FILES",
"storage_account": "foocustomeraccount",
"container": "data",
"paths": [
"input/csv_file.csv"
],
"primary_resource": "production_plans"
}
],
"job_metadata": {
"custom_foo_data": "bar"
},
"batch_process": {
"application_id": "intraday",
"batch_process_id": "train",
"batch_process_version": {
"major": 1,
"minor": 0,
"patch": 123,
"pre_release": "alpha",
"build": "efba2a07"
}
}
}
}
Show example of heartbeat message
{
"meta": {
"idempotency_key": "d23bcf56-2e3b-4d85-8a4e-5bfe5e6e3c45",
"correlation_id": "f0f1a8b2-72d5-4b4e-a7c3-1c8a5b6a4d3e",
"source": {
"application": "ai-platform",
"version": "0.0.2",
"environment_type": "staging"
},
"version": "1.1.0"
},
"data": {
"notification_type": "HEARTBEAT",
"description": "Data preprocessing task",
"heartbeat_deadline_seconds": 300
}
}