Subscribe to View
`POST /v0/pipelines/{pipeline_name}/egress/{table_name}`

Subscribe to a stream of updates from a SQL view or table.
The pipeline responds with a continuous stream of changes to the specified table or view. The stream can be configured in two ways:

- Simple configuration of the format may be provided using query parameters. Use `format` to specify `csv` or `json` output and, for `json` only, `array` to specify whether to group updates into JSON arrays. Use `backpressure` to specify the behavior when the HTTP client cannot keep up.
- Comprehensive configuration may be provided by supplying a connector configuration as a JSON body. In this case, no query parameters are allowed.

Updates are split into chunks.
The pipeline continues sending updates until the client closes the connection or the pipeline is stopped.
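
The two configuration modes are mutually exclusive, which a client can enforce before sending anything. The following is a minimal sketch, not part of the API: the helper name `egress_request_kwargs` is hypothetical, the connector configuration is treated as an opaque dictionary because its fields are not described here, and the returned keys (`params`, `json`) are chosen to match the keyword arguments of the Python `requests` library.

```python
from typing import Any, Optional


def egress_request_kwargs(
    query: Optional[dict[str, str]] = None,
    connector_config: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Return HTTP client keyword arguments for exactly one configuration mode."""
    # Exactly one of the two arguments must be supplied.
    if (query is None) == (connector_config is None):
        raise ValueError(
            "provide exactly one of: query parameters or a connector configuration"
        )
    if connector_config is not None:
        # Comprehensive mode: configuration goes in the JSON body, no query string.
        return {"json": connector_config}
    # Simple mode: configuration goes in the query string, no body.
    return {"params": query}
```

The result can be unpacked into a call such as `requests.post(url, headers=headers, stream=True, **egress_request_kwargs(query={"format": "json"}))`.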
Authentication
JSON web token (JWT) or API key (Bearer token)
Parameters
Path parameters
| Name | Type | Required | Description |
|---|---|---|---|
| `pipeline_name` | string | Yes | Unique pipeline name. |
| `table_name` | string | Yes | SQL table or view name. Unquoted SQL names must be capitalized. Quoted SQL names must exactly match the case used in the SQL program. |
Query parameters
| Name | Type | Required | Description |
|---|---|---|---|
| `format` | string | Yes | Output data format, either `csv` or `json`. |
| `array` | boolean | No | Set to `true` to group updates in this stream into JSON arrays (used in conjunction with `format=json`). The default value is `false`. |
| `backpressure` | boolean | No | Apply backpressure on the pipeline when the HTTP client cannot receive data fast enough. When this flag is set to `false` (the default), the HTTP connector drops data chunks if the client is not keeping up with its output. This prevents a slow HTTP client from slowing down the entire pipeline. When the flag is set to `true`, the connector waits for the client to receive each chunk and blocks the pipeline if the client cannot keep up. |
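
The query parameters above can be assembled with a small helper. This is a sketch under stated assumptions: the function name `egress_query` is hypothetical, and rejecting `array=true` together with `format=csv` on the client side is a choice made here (the table only says `array` is used with `format=json`; how the server treats the combination is not stated). Booleans are written as lowercase `true`, matching the values shown in the table, because Python would otherwise encode `True` with a capital letter.

```python
from urllib.parse import urlencode


def egress_query(fmt: str, array: bool = False, backpressure: bool = False) -> str:
    """Build the query string for the simple configuration mode."""
    if fmt not in ("csv", "json"):
        raise ValueError("format must be 'csv' or 'json'")
    if array and fmt != "json":
        # Client-side guard (assumption): array grouping only applies to JSON output.
        raise ValueError("array=true only applies to format=json")
    params = {"format": fmt}
    # Optional flags are sent only when they differ from the documented default (false).
    if array:
        params["array"] = "true"
    if backpressure:
        params["backpressure"] = "true"
    return urlencode(params)
```

For example, `egress_query("json", array=True, backpressure=True)` yields `format=json&array=true&backpressure=true`.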
Response
200: Connection to the endpoint successfully established. The body of the response contains a stream of data chunks.

`application/json` object: A set of updates to a SQL table or view. The `sequence_number` field stores the offset of the chunk relative to the start of the stream and can be used to implement reliable delivery. The payload is stored in the `bin_data`, `text_data`, or `json_data` field depending on the data format used.

| Name | Type | Required | Description |
|---|---|---|---|
| `bin_data` | string (binary) | No | Base64-encoded binary payload, e.g., bincode. |
| `json_data` | object | No | JSON payload. |
| `sequence_number` | integer (int64) | Yes | Offset of the chunk relative to the start of the stream. |
| `text_data` | string | No | Text payload, e.g., CSV. |
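
One way a client might consume chunks is to pick out whichever payload field is present and watch `sequence_number` for gaps, for instance when `backpressure` is `false` and chunks may be dropped. The sketch below works on chunks that have already been parsed into dictionaries. The names `chunk_payload` and `GapDetector` are hypothetical, and the gap calculation assumes that consecutive chunks carry consecutive sequence numbers, which the schema does not state explicitly.

```python
import base64
from typing import Any, Optional


def chunk_payload(chunk: dict[str, Any]) -> Any:
    """Return the payload from whichever data field the chunk carries."""
    if chunk.get("json_data") is not None:
        return chunk["json_data"]
    if chunk.get("text_data") is not None:
        return chunk["text_data"]
    if chunk.get("bin_data") is not None:
        # bin_data is Base64-encoded, so decode it to raw bytes.
        return base64.b64decode(chunk["bin_data"])
    return None  # the chunk has no payload field


class GapDetector:
    """Track sequence numbers and report how many chunks appear to be missing."""

    def __init__(self) -> None:
        self.last: Optional[int] = None

    def missed(self, chunk: dict[str, Any]) -> int:
        seq = chunk["sequence_number"]
        # Assumption: sequence numbers increase by one per chunk.
        gap = 0 if self.last is None else max(seq - self.last - 1, 0)
        self.last = seq
        return gap
```

A non-zero result from `missed` signals that the client fell behind; what to do next (resubscribe, rebuild state from a snapshot, or enable backpressure) depends on the application.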
400, 404, 500, 503: Error responses. A 404 means that the pipeline and/or table/view with that name does not exist. All four share the same body.

`application/json` object: Information returned by REST API endpoints on error.

| Name | Type | Required | Description |
|---|---|---|---|
| `details` | object | Yes | Detailed error metadata. The contents of this field are determined by `error_code`. |
| `error_code` | string | Yes | String that identifies the error type. |
| `message` | string | Yes | Human-readable error message. |
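
Because every error status returns the same three required fields, a client can turn an error body into a single exception type. This is a minimal sketch: `EgressError` and `raise_for_error` are hypothetical names, and the function expects the HTTP status code and the already-parsed JSON body.

```python
from typing import Any


class EgressError(Exception):
    """Error reported by the endpoint, built from the documented error body."""

    def __init__(self, status: int, body: dict[str, Any]) -> None:
        self.status = status
        self.error_code = body["error_code"]
        self.details = body["details"]
        super().__init__(f"{status} {self.error_code}: {body['message']}")


def raise_for_error(status: int, body: dict[str, Any]) -> None:
    """Raise EgressError for any status other than 200."""
    if status != 200:
        raise EgressError(status, body)
```

Callers can branch on `error_code` and inspect `details`, whose contents depend on that code.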
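```shell
# -N disables output buffering so chunks print as they arrive.
curl -N -X POST 'https://api.example.com/v0/pipelines/{pipeline_name}/egress/{table_name}?format=json&array=false&backpressure=false' \
  -H 'Authorization: Bearer YOUR_TOKEN'
```

```javascript
const response = await fetch('https://api.example.com/v0/pipelines/{pipeline_name}/egress/{table_name}?format=json&array=false&backpressure=false', {
  method: 'POST',
  headers: {
    'Authorization': 'Bearer YOUR_TOKEN'
  }
});
if (!response.ok) throw new Error(`HTTP ${response.status}`);
// The body is a continuous stream, so read it incrementally instead of calling response.json().
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  console.log(decoder.decode(value, { stream: true }));
}
```

```typescript
// Shape of one chunk, per the 200 response schema; use it when parsing the stream.
interface Chunk {
  sequence_number: number;
  json_data?: unknown;
  text_data?: string;
  bin_data?: string; // Base64-encoded
}

const response: Response = await fetch('https://api.example.com/v0/pipelines/{pipeline_name}/egress/{table_name}?format=json&array=false&backpressure=false', {
  method: 'POST',
  headers: {
    'Authorization': 'Bearer YOUR_TOKEN'
  }
});
if (!response.ok || !response.body) throw new Error(`HTTP ${response.status}`);
// The body is a continuous stream, so read it incrementally instead of calling response.json().
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  console.log(decoder.decode(value, { stream: true }));
}
```

```python
import requests

url = "https://api.example.com/v0/pipelines/{pipeline_name}/egress/{table_name}"
params = {
    "format": "json",
    "array": "false",
    "backpressure": "false",
}
headers = {
    "Authorization": "Bearer YOUR_TOKEN"
}
# stream=True keeps requests from buffering the continuous response body.
with requests.post(url, params=params, headers=headers, stream=True) as response:
    response.raise_for_status()
    # Print data as it arrives; the loop ends when the connection closes.
    for data in response.iter_content(chunk_size=None):
        print(data.decode("utf-8", errors="replace"), end="")
```

```go
package main

import (
	"io"
	"net/http"
	"os"
)

func main() {
	req, err := http.NewRequest("POST", "https://api.example.com/v0/pipelines/{pipeline_name}/egress/{table_name}?format=json&array=false&backpressure=false", nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Authorization", "Bearer YOUR_TOKEN")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	// io.ReadAll would block until the stream ends, so copy data to stdout as it arrives.
	if _, err := io.Copy(os.Stdout, resp.Body); err != nil {
		panic(err)
	}
}
```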