SPEC-002: Standard Batch Processing API
This document provides details on the Standard Batch Processing API, which is used to trigger asynchronous batch processing tasks executed by another application.
Info
- GitLab: Standard Batch Processing API
- Confluence: Standard Batch Processing API
Topic
Kafka topic pattern for Batch Processing commands:
<tenant-identifier>.<domain>.batch-processing.commands
Transition phase
We are transitioning the first segment of the topic name from <customer-name> to <tenant-identifier>. Legacy integrations continue to use <customer-name>, while new integrations should use <tenant-identifier>.
| Application | Domain | Example Topic |
|---|---|---|
| AI Platform | ai-platform | acme-corp.ai-platform.batch-processing.commands |
| Connect | connect | acme-corp.connect.batch-processing.commands |
| Plan | supply-chain | acme-corp.supply-chain.batch-processing.commands |
Message key: meta.correlation_id - links related messages together.
Environment: Controlled by Kafka cluster selection (dev/qa/test/prod), not included in topic name.
See also: Streaming Conventions for allowed domains and naming standards.
JSON Schema
Loading ....
Source
Click to expand...
{
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "This schema validates the JSON payloads for messages sent via Kafka under the Standard Batch Processing API, which is designed for asynchronous batch processing tasks initiated by one application and executed by another.",
"type": "object",
"properties": {
"meta": {
"type": "object",
"description": "Contains metadata for the message, including identification and source application details.",
"properties": {
"idempotency_key": {
"type": "string",
"description": "A UUID used to prevent processing the same message multiple times. Ensures idempotence in message processing.",
"pattern": "^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$",
"examples": [
"f36fe015-8e20-4213-8956-5e7d88e52c8c"
]
},
"correlation_id": {
"type": "string",
"description": "A UUID that correlates related messages for the same batch job, ensuring traceability across systems.",
"pattern": "^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$",
"examples": [
"a5af482c-e17b-49e2-aa59-997268a1f420"
]
},
"source": {
"type": "object",
"description": "Details about the source application emitting the message, including application name, version, and the environment type.",
"properties": {
"application": {
"type": "string",
"description": "The name of the application from which the message originates.",
"examples": [
"plan"
]
},
"version": {
"type": "string",
"description": "The version of the application, ideally following semantic versioning.",
"examples": [
"10.1.0"
]
},
"environment_type": {
"type": "string",
"description": "The type of environment (e.g., development, staging, production) from which the message is sent.",
"examples": [
"production"
]
},
"application_instance": {
"type": "string",
"description": "The name of the particular instance of the application from which the message originates.",
"examples": [
"plan-customerx"
]
}
},
"required": [
"application",
"version",
"environment_type"
]
},
"priority": {
"type": "integer",
"description": "Defines the priority of the job. Lower values indicate higher priority.",
"minimum": 1,
"maximum": 100,
"default": 50,
"examples": [
42
]
},
"labels": {
"type": "array",
"description": "Optional labels that can be used for categorizing or tagging messages in a more flexible manner.",
"items": {
"type": "string"
},
"examples": [
[
"workforce"
]
]
},
"version": {
"type": "string",
"description": "The version of the message schema, using semantic versioning to track changes and compatibility.",
"pattern": "^\\d+\\.\\d+\\.\\d+$",
"examples": [
"1.0.0"
]
}
},
"required": [
"idempotency_key",
"correlation_id",
"source",
"version"
]
},
"data": {
"type": "object",
"description": "The actual data of the message, which varies based on the command type (START or CANCEL).",
"oneOf": [
{
"properties": {
"command": {
"const": "START",
"description": "Indicates that the message is a command to start a batch process."
},
"batch_process": {
"type": "object",
"description": "Details about the batch process to be started, including application ID, process ID, and version.",
"properties": {
"application_id": {
"type": "string",
"description": "Identifier for the application that will execute the batch process.",
"examples": [
"intraday"
]
},
"batch_process_id": {
"type": "string",
"description": "Identifier for the specific batch process to be executed.",
"examples": [
"train"
]
},
"batch_process_version": {
"type": "object",
"description": "The version of the batch process to be executed, following semantic versioning.",
"properties": {
"major": {
"type": "integer",
"examples": [
1
]
},
"minor": {
"type": "integer",
"examples": [
0
]
},
"patch": {
"type": "integer",
"examples": [
123
]
},
"pre_release": {
"type": "string",
"examples": [
"alpha"
]
},
"build": {
"type": "string",
"examples": [
"efba2a07"
]
}
},
"required": [
"major"
]
}
},
"required": [
"application_id",
"batch_process_id"
]
},
"parameters": {
"type": "object",
"description": "Parameters specific to the batch process. The schema for this section is defined by the batch process owner. In the example, scope is a list of resources that use the same resource shapes as the items of inputs (ABS_DIRECTORY, ABS_FILES or SNOWFLAKE_TABLE).",
"examples": [
{
"start_date": "2022-03-20",
"end_date": "2023-03-20",
"use_super_awesome_calculation_parameter": true,
"scope": [
{
"type": "ABS_DIRECTORY",
"storage_account": "foocustomeraccount",
"container": "data",
"directory": "intraday/train/1/6304c48a-0929-4764-88bd-d8a32fa7a795/scope/product_locations",
"primary_resource": "product_locations"
}
]
}
]
},
"inputs": {
"type": "array",
"description": "Defines the input data sets for the batch process, such as files in storage or database queries.",
"items": {
"oneOf": [
{
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "Azure Blob Storage directory resource schema.",
"type": "object",
"properties": {
"type": {
"const": "ABS_DIRECTORY",
"description": "Indicates the data source/destination is a **path** in Azure Blob Storage."
},
"storage_account": {
"type": "string",
"description": "The storage account where the data is stored.",
"examples": [
"foocustomeraccount"
]
},
"container": {
"type": "string",
"description": "The storage container where the data is stored.",
"examples": [
"data"
]
},
"directory": {
"type": "string",
"description": "The specific directory to store/fetch the files.",
"examples": [
"input"
]
},
"primary_resource": {
"type": "string",
"description": "A primary identifier for the type of resource being used.",
"examples": [
"intraday_profiles"
]
},
"schema_path": {
"type": "string",
"description": "Path to the file specifying the schema of the exported data.",
"examples": [
"schema/generic_export/example-pr-loc/1.json"
]
},
"data_format": {
"type": "string",
"description": "Format of the exported data.",
"enum": [
"parquet",
"csv",
"json"
],
"examples": [
"csv"
]
},
"attributes": {
"type": "object",
"description": "Additional attributes related to the data.",
"examples": [
{
"attribute1": "value1"
}
]
}
},
"required": [
"type",
"storage_account",
"container",
"directory",
"primary_resource"
]
},
{
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "Azure Blob Storage file resource schema.",
"type": "object",
"properties": {
"type": {
"const": "ABS_FILES",
"description": "Indicates the data source/destination is a **set of files** in Azure Blob Storage."
},
"storage_account": {
"type": "string",
"description": "The storage account where the data is located.",
"examples": [
"foocustomeraccount"
]
},
"container": {
"type": "string",
"description": "The storage container where the data is located.",
"examples": [
"data"
]
},
"paths": {
"type": "array",
"description": "The specific paths to the files.",
"items": {
"type": "string"
},
"examples": [
[
"batchprocessing/6304c48a-0929-4764-88bd-d8a32fa7a795/campaigns.csv"
]
]
},
"paths_with_metadata": {
"type": "array",
"description": "Paths to the files along with additional file specific attributes.",
"items": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "File path.",
"examples": [
"batchprocessing/6304c48a-0929-4764-88bd-d8a32fa7a795/campaigns.csv"
]
},
"exported_at": {
"type": "string",
"description": "Time at which the file was created, expressed in UTC in ISO 8601 format.",
"examples": [
"2025-01-14T00:55:31.820Z"
]
},
"ordering_key": {
"type": "string",
"description": "Specifies the position of the current element in the list according to natural ordering (ascending order for numeric values and lexicographic for others).",
"examples": [
"1",
"2025-01-14T00:55:31.820Z"
]
}
},
"required": [
"path",
"exported_at",
"ordering_key"
]
}
},
"primary_resource": {
"type": "string",
"description": "A primary identifier for the type of resource being used.",
"examples": [
"campaigns"
]
},
"schema_path": {
"type": "string",
"description": "Path to the file specifying the schema of the exported data.",
"examples": [
"schema/generic_export/example-pr-loc/1.json"
]
},
"data_format": {
"type": "string",
"description": "Format of the exported data.",
"enum": [
"parquet",
"csv",
"json"
],
"examples": [
"csv"
]
},
"attributes": {
"type": "object",
"description": "Additional attributes related to the data.",
"examples": [
{
"attribute1": "value1"
}
]
}
},
"required": [
"type",
"storage_account",
"container",
"paths",
"primary_resource"
]
},
{
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "Schema for Snowflake data resource.",
"type": "object",
"properties": {
"type": {
"const": "SNOWFLAKE_TABLE",
"description": "Indicates the data source/destination is Snowflake."
},
"query": {
"type": "string",
"description": "The SQL query to access data in Snowflake.",
"examples": [
"SELECT location_code, date, opening_hour, closing_hour FROM location_opening_hours"
]
},
"primary_resource": {
"type": "string",
"description": "A primary identifier for the type of resource used.",
"examples": [
"opening_hours"
]
},
"attributes": {
"type": "object",
"description": "Additional attributes related to the data.",
"examples": [
{
"attribute1": "value1"
}
]
}
},
"required": [
"type",
"query",
"primary_resource"
]
}
]
}
},
"outputs": {
"type": "array",
"description": "Defines where the results of the batch process will be written, such as storage locations or database tables.",
"items": {
"oneOf": [
{
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "Azure Blob Storage directory resource schema.",
"type": "object",
"properties": {
"type": {
"const": "ABS_DIRECTORY",
"description": "Indicates the data source/destination is a **path** in Azure Blob Storage."
},
"storage_account": {
"type": "string",
"description": "The storage account where the data is stored.",
"examples": [
"foocustomeraccount"
]
},
"container": {
"type": "string",
"description": "The storage container where the data is stored.",
"examples": [
"data"
]
},
"directory": {
"type": "string",
"description": "The specific directory to store/fetch the files.",
"examples": [
"input"
]
},
"primary_resource": {
"type": "string",
"description": "A primary identifier for the type of resource being used.",
"examples": [
"intraday_profiles"
]
},
"attributes": {
"type": "object",
"description": "Additional attributes related to the data.",
"examples": [
{
"attribute1": "value1"
}
]
}
},
"required": [
"type",
"storage_account",
"container",
"directory",
"primary_resource"
]
},
{
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "Azure Blob Storage file resource schema.",
"type": "object",
"properties": {
"type": {
"const": "ABS_FILES",
"description": "Indicates the data source/destination is a **set of files** in Azure Blob Storage."
},
"storage_account": {
"type": "string",
"description": "The storage account where the data is located.",
"examples": [
"foocustomeraccount"
]
},
"container": {
"type": "string",
"description": "The storage container where the data is located.",
"examples": [
"data"
]
},
"paths": {
"type": "array",
"description": "The specific paths to the files.",
"items": {
"type": "string"
},
"examples": [
[
"batchprocessing/6304c48a-0929-4764-88bd-d8a32fa7a795/campaigns.csv"
]
]
},
"primary_resource": {
"type": "string",
"description": "A primary identifier for the type of resource being used.",
"examples": [
"campaigns"
]
},
"attributes": {
"type": "object",
"description": "Additional attributes related to the data.",
"examples": [
{
"attribute1": "value1"
}
]
}
},
"required": [
"type",
"storage_account",
"container",
"paths",
"primary_resource"
]
},
{
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "Schema for Snowflake data resource.",
"type": "object",
"properties": {
"type": {
"const": "SNOWFLAKE_TABLE",
"description": "Indicates the data source/destination is Snowflake."
},
"query": {
"type": "string",
"description": "The SQL query to access data in Snowflake.",
"examples": [
"SELECT location_code, date, opening_hour, closing_hour FROM location_opening_hours"
]
},
"primary_resource": {
"type": "string",
"description": "A primary identifier for the type of resource used.",
"examples": [
"opening_hours"
]
},
"attributes": {
"type": "object",
"description": "Additional attributes related to the data.",
"examples": [
{
"attribute1": "value1"
}
]
}
},
"required": [
"type",
"query",
"primary_resource"
]
}
]
}
}
},
"required": [
"command",
"batch_process"
]
},
{
"properties": {
"command": {
"const": "CANCEL",
"description": "Indicates that the message is a command to cancel a batch process."
}
},
"required": [
"command"
]
}
]
}
},
"required": [
"meta",
"data"
]
}
Examples
The following examples demonstrate how to use the API.
Show example of start message
{
"meta": {
"idempotency_key": "f36fe015-8e20-4213-8956-5e7d88e52c8c",
"correlation_id": "a5af482c-e17b-49e2-aa59-997268a1f420",
"source": {
"application": "plan",
"version": "10.1.0",
"environment_type": "production",
"application_instance": "plan-customerx"
},
"priority": 42,
"labels": [
"workforce"
],
"version": "1.0.0"
},
"data": {
"command": "START",
"batch_process": {
"application_id": "intraday",
"batch_process_id": "train",
"batch_process_version": {
"major": 1,
"minor": 0,
"patch": 123,
"pre_release": "alpha",
"build": "efba2a07"
}
},
"parameters": {
"start_date": "2022-03-20",
"end_date": "2023-03-20",
"use_super_awesome_calculation_parameter": true,
"scope": [
{
"type": "ABS_DIRECTORY",
"storage_account": "foocustomeraccount",
"container": "data",
"directory": "intraday/train/1/6304c48a-0929-4764-88bd-d8a32fa7a795/scope/product_locations",
"primary_resource": "product_locations"
}
]
},
"inputs": [
{
"type": "ABS_FILES",
"storage_account": "foocustomeraccount",
"container": "data",
"paths": [
"batchprocessing/6304c48a-0929-4764-88bd-d8a32fa7a795/campaigns.csv"
],
"primary_resource": "campaigns"
},
{
"type": "SNOWFLAKE_TABLE",
"query": "SELECT location_code, date, opening_hour, closing_hour FROM location_opening_hours",
"primary_resource": "opening_hours"
}
],
"outputs": [
{
"type": "ABS_DIRECTORY",
"storage_account": "foocustomeraccount",
"container": "data",
"directory": "input",
"primary_resource": "intraday_profiles"
},
{
"type": "ABS_DIRECTORY",
"storage_account": "foocustomeraccount",
"container": "data",
"directory": "input",
"primary_resource": "product_locations"
}
]
}
}
Show example of cancel message
{
"meta": {
"idempotency_key": "d23bcf56-2e3b-4d85-8a4e-5bfe5e6e3c45",
"correlation_id": "a5af482c-e17b-49e2-aa59-997268a1f420",
"source": {
"application": "plan",
"version": "10.1.0",
"environment_type": "production",
"application_instance": "plan-customerx"
},
"version": "1.0.0"
},
"data": {
"command": "CANCEL"
}
}