Create Pipeline
Try it
POST
/v0/pipelinesCreate a new pipeline with the provided configuration.
Authentication
JSON web token (JWT) or API keyBearer token
Request body
Content type: application/json
- objectCreate a new pipeline (POST), or fully update an existing pipeline (PUT). Fields which are optional and not provided will be set to their empty type value (for strings: an empty string `""`, for objects: an empty dictionary `{}`).
descriptionstringnamestringrequiredprogram_codestringrequiredprogram_configobjectruntime_configobjectudf_ruststringudf_tomlstring
Response
201Pipeline successfully created
application/json- objectPipeline information. It both includes fields which are user-provided and system-generated.
created_atstring (date-time)requireddeployment_desired_statusstringrequireddeployment_desired_status_sincestring (date-time)requireddeployment_errorobjectdeployment_idstring (uuid)deployment_initialobjectdeployment_resources_desired_statusstringrequireddeployment_resources_desired_status_sincestring (date-time)requireddeployment_resources_statusstringrequiredPipeline resources status. ```text /start (early start failed) ┌───────────────────┐ │ ▼ Stopped ◄────────── Stopping /start │ ▲ │ │ /stop?force=true │ │ OR: timeout (from Provisioning) ▼ │ OR: fatal runtime or resource error ⌛Provisioning ────────────│ OR: runtime status is Suspended │ │ │ │ ▼ │ Provisioned ─────────────┘ ``` ### Desired and actual status We use the desired state model to manage the lifecycle of a pipeline. In this model, the pipeline has two status attributes associated with it: the **desired** status, which represents what the user would like the pipeline to do, and the **current** status, which represents the actual (last observed) status of the pipeline. The pipeline runner service continuously monitors the desired status field to decide where to steer the pipeline towards. There are two desired statuses: - `Provisioned` (set by invoking `/start`) - `Stopped` (set by invoking `/stop?force=true`) The user can monitor the current status of the pipeline via the `GET /v0/pipelines/{name}` endpoint. In a typical scenario, the user first sets the desired status, e.g., by invoking the `/start` endpoint, and then polls the `GET /v0/pipelines/{name}` endpoint to monitor the actual status of the pipeline until its `deployment_resources_status` attribute changes to `Provisioned` indicating that the pipeline has been successfully provisioned, or `Stopped` with `deployment_error` being set.deployment_resources_status_detailsobjectdeployment_resources_status_sincestring (date-time)requireddeployment_runtime_desired_statusobjectdeployment_runtime_desired_status_sincestring (date-time)deployment_runtime_statusobjectdeployment_runtime_status_detailsobjectdeployment_runtime_status_sincestring (date-time)deployment_statusstringrequireddeployment_status_sincestring (date-time)requireddescriptionstringrequiredidstring (uuid)requiredPipeline identifier.namestringrequiredplatform_versionstringrequiredprogram_codestringrequiredprogram_configobjectrequiredProgram configuration.program_errorobjectrequiredLog, warning and error information about the program compilation.program_infoobjectprogram_statusstringrequiredProgram compilation status.program_status_sincestring (date-time)requiredprogram_versioninteger (int64)requiredVersion number.refresh_versioninteger (int64)requiredVersion number.runtime_configobjectrequiredGlobal pipeline configuration settings. This is the publicly exposed type for users to configure pipelines.storage_statusstringrequiredStorage status. The storage status can only transition when the resources status is `Stopped`. ```text Cleared ───┐ ▲ │ /clear │ │ │ │ Clearing │ ▲ │ │ │ InUse ◄───┘ ```storage_status_detailsobjectudf_ruststringrequiredudf_tomlstringrequiredversioninteger (int64)requiredVersion number.
400
application/json- objectInformation returned by REST API endpoints on error.
detailsobjectrequiredDetailed error metadata. The contents of this field is determined by `error_code`.error_codestringrequiredError code is a string that specifies this error type.messagestringrequiredHuman-readable error message.
409Cannot create pipeline as the name already exists
application/json- objectInformation returned by REST API endpoints on error.
detailsobjectrequiredDetailed error metadata. The contents of this field is determined by `error_code`.error_codestringrequiredError code is a string that specifies this error type.messagestringrequiredHuman-readable error message.
500
application/json- objectInformation returned by REST API endpoints on error.
detailsobjectrequiredDetailed error metadata. The contents of this field is determined by `error_code`.error_codestringrequiredError code is a string that specifies this error type.messagestringrequiredHuman-readable error message.
curl -X POST 'https://api.example.com/v0/pipelines' \
-H 'Authorization: Bearer YOUR_TOKEN' \
-H 'Content-Type: application/json' \
-d '{
"name": "example1",
"description": "Description of the pipeline example1",
"runtime_config": {
"workers": 16,
"max_rss_mb": null,
"hosts": 1,
"storage": {
"backend": {
"name": "default"
},
"min_storage_bytes": null,
"min_step_storage_bytes": null,
"compression": "default",
"cache_mib": null
},
"fault_tolerance": {
"model": "none",
"checkpoint_interval_secs": 60
},
"cpu_profiler": true,
"tracing": false,
"tracing_endpoint_jaeger": "",
"min_batch_size_records": 0,
"max_buffering_delay_usecs": 0,
"resources": {
"cpu_cores_min": null,
"cpu_cores_max": null,
"memory_mb_min": null,
"memory_mb_max": null,
"storage_mb_max": null,
"storage_class": null,
"service_account_name": null,
"namespace": null
},
"clock_resolution_usecs": 1000000,
"pin_cpus": [],
"provisioning_timeout_secs": null,
"max_parallel_connector_init": null,
"init_containers": null,
"checkpoint_during_suspend": true,
"http_workers": null,
"io_workers": null,
"env": {},
"dev_tweaks": {},
"logging": null,
"pipeline_template_configmap": null
},
"program_code": "CREATE TABLE table1 ( col1 INT );",
"udf_rust": null,
"udf_toml": null,
"program_config": {
"profile": "optimized",
"cache": true,
"runtime_version": null
}
}'const response = await fetch('https://api.example.com/v0/pipelines', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer YOUR_TOKEN'
},
body: JSON.stringify({
"name": "example1",
"description": "Description of the pipeline example1",
"runtime_config": {
"workers": 16,
"max_rss_mb": null,
"hosts": 1,
"storage": {
"backend": {
"name": "default"
},
"min_storage_bytes": null,
"min_step_storage_bytes": null,
"compression": "default",
"cache_mib": null
},
"fault_tolerance": {
"model": "none",
"checkpoint_interval_secs": 60
},
"cpu_profiler": true,
"tracing": false,
"tracing_endpoint_jaeger": "",
"min_batch_size_records": 0,
"max_buffering_delay_usecs": 0,
"resources": {
"cpu_cores_min": null,
"cpu_cores_max": null,
"memory_mb_min": null,
"memory_mb_max": null,
"storage_mb_max": null,
"storage_class": null,
"service_account_name": null,
"namespace": null
},
"clock_resolution_usecs": 1000000,
"pin_cpus": [],
"provisioning_timeout_secs": null,
"max_parallel_connector_init": null,
"init_containers": null,
"checkpoint_during_suspend": true,
"http_workers": null,
"io_workers": null,
"env": {},
"dev_tweaks": {},
"logging": null,
"pipeline_template_configmap": null
},
"program_code": "CREATE TABLE table1 ( col1 INT );",
"udf_rust": null,
"udf_toml": null,
"program_config": {
"profile": "optimized",
"cache": true,
"runtime_version": null
}
})
});
const data = await response.json();
console.log(data);interface ApiResponse {
// shape your response here
}
const response: Response = await fetch('https://api.example.com/v0/pipelines', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer YOUR_TOKEN'
},
body: JSON.stringify({
"name": "example1",
"description": "Description of the pipeline example1",
"runtime_config": {
"workers": 16,
"max_rss_mb": null,
"hosts": 1,
"storage": {
"backend": {
"name": "default"
},
"min_storage_bytes": null,
"min_step_storage_bytes": null,
"compression": "default",
"cache_mib": null
},
"fault_tolerance": {
"model": "none",
"checkpoint_interval_secs": 60
},
"cpu_profiler": true,
"tracing": false,
"tracing_endpoint_jaeger": "",
"min_batch_size_records": 0,
"max_buffering_delay_usecs": 0,
"resources": {
"cpu_cores_min": null,
"cpu_cores_max": null,
"memory_mb_min": null,
"memory_mb_max": null,
"storage_mb_max": null,
"storage_class": null,
"service_account_name": null,
"namespace": null
},
"clock_resolution_usecs": 1000000,
"pin_cpus": [],
"provisioning_timeout_secs": null,
"max_parallel_connector_init": null,
"init_containers": null,
"checkpoint_during_suspend": true,
"http_workers": null,
"io_workers": null,
"env": {},
"dev_tweaks": {},
"logging": null,
"pipeline_template_configmap": null
},
"program_code": "CREATE TABLE table1 ( col1 INT );",
"udf_rust": null,
"udf_toml": null,
"program_config": {
"profile": "optimized",
"cache": true,
"runtime_version": null
}
})
});
const data = (await response.json()) as ApiResponse;
console.log(data);import requests
url = "https://api.example.com/v0/pipelines"
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer YOUR_TOKEN"
}
payload = {
"name": "example1",
"description": "Description of the pipeline example1",
"runtime_config": {
"workers": 16,
"max_rss_mb": None,
"hosts": 1,
"storage": {
"backend": {
"name": "default"
},
"min_storage_bytes": None,
"min_step_storage_bytes": None,
"compression": "default",
"cache_mib": None
},
"fault_tolerance": {
"model": "none",
"checkpoint_interval_secs": 60
},
"cpu_profiler": True,
"tracing": False,
"tracing_endpoint_jaeger": "",
"min_batch_size_records": 0,
"max_buffering_delay_usecs": 0,
"resources": {
"cpu_cores_min": None,
"cpu_cores_max": None,
"memory_mb_min": None,
"memory_mb_max": None,
"storage_mb_max": None,
"storage_class": None,
"service_account_name": None,
"namespace": None
},
"clock_resolution_usecs": 1000000,
"pin_cpus": [],
"provisioning_timeout_secs": None,
"max_parallel_connector_init": None,
"init_containers": None,
"checkpoint_during_suspend": True,
"http_workers": None,
"io_workers": None,
"env": {},
"dev_tweaks": {},
"logging": None,
"pipeline_template_configmap": None
},
"program_code": "CREATE TABLE table1 ( col1 INT );",
"udf_rust": None,
"udf_toml": None,
"program_config": {
"profile": "optimized",
"cache": True,
"runtime_version": None
}
}
response = requests.request("post", url, headers=headers, json=payload)
print(response.json())package main
import (
"fmt"
"io"
"net/http"
"strings"
)
func main() {
body := strings.NewReader(`{
"name": "example1",
"description": "Description of the pipeline example1",
"runtime_config": {
"workers": 16,
"max_rss_mb": null,
"hosts": 1,
"storage": {
"backend": {
"name": "default"
},
"min_storage_bytes": null,
"min_step_storage_bytes": null,
"compression": "default",
"cache_mib": null
},
"fault_tolerance": {
"model": "none",
"checkpoint_interval_secs": 60
},
"cpu_profiler": true,
"tracing": false,
"tracing_endpoint_jaeger": "",
"min_batch_size_records": 0,
"max_buffering_delay_usecs": 0,
"resources": {
"cpu_cores_min": null,
"cpu_cores_max": null,
"memory_mb_min": null,
"memory_mb_max": null,
"storage_mb_max": null,
"storage_class": null,
"service_account_name": null,
"namespace": null
},
"clock_resolution_usecs": 1000000,
"pin_cpus": [],
"provisioning_timeout_secs": null,
"max_parallel_connector_init": null,
"init_containers": null,
"checkpoint_during_suspend": true,
"http_workers": null,
"io_workers": null,
"env": {},
"dev_tweaks": {},
"logging": null,
"pipeline_template_configmap": null
},
"program_code": "CREATE TABLE table1 ( col1 INT );",
"udf_rust": null,
"udf_toml": null,
"program_config": {
"profile": "optimized",
"cache": true,
"runtime_version": null
}
}`)
req, err := http.NewRequest("POST", "https://api.example.com/v0/pipelines", body)
if err != nil {
panic(err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer YOUR_TOKEN")
resp, err := http.DefaultClient.Do(req)
if err != nil {
panic(err)
}
defer resp.Body.Close()
out, _ := io.ReadAll(resp.Body)
fmt.Println(string(out))
}{
"id": "67e55044-10b1-426f-9247-bb680e5fe0c8",
"name": "example1",
"description": "Description of the pipeline example1",
"created_at": "1970-01-01T00:00:00Z",
"version": 4,
"platform_version": "v0",
"runtime_config": {
"workers": 16,
"max_rss_mb": null,
"hosts": 1,
"storage": {
"backend": {
"name": "default"
},
"min_storage_bytes": null,
"min_step_storage_bytes": null,
"compression": "default",
"cache_mib": null
},
"fault_tolerance": {
"model": "none",
"checkpoint_interval_secs": 60
},
"cpu_profiler": true,
"tracing": false,
"tracing_endpoint_jaeger": "",
"min_batch_size_records": 0,
"max_buffering_delay_usecs": 0,
"resources": {
"cpu_cores_min": null,
"cpu_cores_max": null,
"memory_mb_min": null,
"memory_mb_max": null,
"storage_mb_max": null,
"storage_class": null,
"service_account_name": null,
"namespace": null
},
"clock_resolution_usecs": 1000000,
"pin_cpus": [],
"provisioning_timeout_secs": null,
"max_parallel_connector_init": null,
"init_containers": null,
"checkpoint_during_suspend": true,
"http_workers": null,
"io_workers": null,
"env": {},
"dev_tweaks": {},
"logging": null,
"pipeline_template_configmap": null
},
"program_code": "CREATE TABLE table1 ( col1 INT );",
"udf_rust": "",
"udf_toml": "",
"program_config": {
"profile": "optimized",
"cache": true,
"runtime_version": null
},
"program_version": 2,
"program_status": "Pending",
"program_status_since": "1970-01-01T00:00:00Z",
"program_error": {
"sql_compilation": null,
"rust_compilation": null,
"system_error": null
},
"program_info": null,
"deployment_error": null,
"refresh_version": 4,
"storage_status": "Cleared",
"storage_status_details": null,
"deployment_id": null,
"deployment_initial": null,
"deployment_status": "Stopped",
"deployment_status_since": "1970-01-01T00:00:00Z",
"deployment_desired_status": "Stopped",
"deployment_desired_status_since": "1970-01-01T00:00:00Z",
"deployment_resources_status": "Stopped",
"deployment_resources_status_details": null,
"deployment_resources_status_since": "1970-01-01T00:00:00Z",
"deployment_resources_desired_status": "Stopped",
"deployment_resources_desired_status_since": "1970-01-01T00:00:00Z",
"deployment_runtime_status": null,
"deployment_runtime_status_details": null,
"deployment_runtime_status_since": null,
"deployment_runtime_desired_status": null,
"deployment_runtime_desired_status_since": null
}{
"message": "An entity with this name already exists",
"error_code": "DuplicateName",
"details": null
}