NoETL Workflow Tasks Guide

This guide describes the tasks available in NoETL and the parameters each one accepts.

Overview

NoETL provides built-in tasks for common operations: HTTP requests, database operations, file operations, data transformations, and notifications. Each task is declared as a step in the workflow section of a playbook.

Task Structure

A task step in a NoETL playbook has the following structure:

- step: step_name
  call:
    name: step_name
    type: task_type
    args:
      param1: value1
      param2: value2
  next:
    - step: next_step

  • step: A unique name for the step
  • call: Indicates this is a task call
  • name: The name of the task
  • type: The type of task to perform (e.g., http, secret, postgres, duckdb, workbook, playbook)
  • args: Parameters for the task
  • next: The next step to execute after this task
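
The field list above can also be read as a small data structure. The sketch below is illustrative only: `missing_fields` is a hypothetical helper, not part of NoETL, and treating `step`, `call`, `name`, and `type` as mandatory is an assumption made for the example rather than a documented rule.

```python
# Hypothetical helper (not a NoETL API): reports which of the fields
# described above are absent from a step mapping.
# Assumption: step, call, call.name and call.type are treated as mandatory.
REQUIRED_STEP_KEYS = ("step", "call")
REQUIRED_CALL_KEYS = ("name", "type")


def missing_fields(step):
    """Return the names of expected fields that the step does not define."""
    missing = [key for key in REQUIRED_STEP_KEYS if key not in step]
    call = step.get("call") or {}
    missing += ["call." + key for key in REQUIRED_CALL_KEYS if key not in call]
    return missing


# The same step as in the YAML structure, written as a Python mapping.
example_step = {
    "step": "step_name",
    "call": {
        "name": "step_name",
        "type": "task_type",
        "args": {"param1": "value1", "param2": "value2"},
    },
    "next": [{"step": "next_step"}],
}

print(missing_fields(example_step))
```

A check like this could run over a parsed playbook before submission, although NoETL's own validation may apply different rules.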

HTTP Tasks

HTTP tasks allow you to interact with web APIs and services.

http.get

Fetches data from a URL using the HTTP GET method.

Parameters:

  • url (string, required): The URL to fetch data from
  • params (object, optional): Query parameters to include in the URL
  • headers (object, optional): HTTP headers to include in the request
  • timeout (number, optional): Request timeout in seconds (default: 30)
  • verify (boolean, optional): Whether to verify SSL certificates (default: true)

Example:

- step: fetch_data
  call:
    name: fetch_data
    type: http
    method: GET
    endpoint: "https://api.example.com/data"
    params:
      id: "{{ workload.id }}"
    headers:
      Authorization: "Bearer {{ workload.api_key }}"
    timeout: 60
    verify: true
  next:
    - step: process_data
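
To make the roles of `params` and `headers` concrete, here is a minimal Python sketch of how such values typically end up in a GET request. It uses only the standard library and builds the request without sending it. The function name `build_get_request` and the sample values are assumptions for illustration; this is not how NoETL itself is implemented.

```python
from typing import Optional
from urllib.parse import urlencode
from urllib.request import Request


def build_get_request(url: str,
                      params: Optional[dict] = None,
                      headers: Optional[dict] = None) -> Request:
    """Build (but do not send) a GET request with query parameters and headers."""
    query = urlencode(params or {})          # {"id": "42"} becomes "id=42"
    full_url = url + "?" + query if query else url
    return Request(full_url, headers=headers or {}, method="GET")


# Sample values stand in for the rendered {{ workload.* }} templates.
request = build_get_request(
    "https://api.example.com/data",
    params={"id": "42"},
    headers={"Authorization": "Bearer example-token"},
)
print(request.full_url)
```

In this sketch the query parameters are appended to the URL, while the headers travel separately with the request.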

http.post

Sends data to a URL using the HTTP POST method.

Parameters:

  • url (string, required): The URL to send data to
  • data (object/string, optional): Data to include in the request body
  • json (object, optional): JSON data to include in the request body
  • params (object, optional): Query parameters to include in the URL
  • headers (object, optional): HTTP headers to include in the request
  • timeout (number, optional): Request timeout in seconds (default: 30)
  • verify (boolean, optional): Whether to verify SSL certificates (default: true)

Example:

- step: create_resource
  call:
    name: create_resource
    type: http
    method: POST
    endpoint: "https://api.example.com/resources"
    json:
      name: "{{ workload.resource_name }}"
      description: "{{ workload.resource_description }}"
    headers:
      Authorization: "Bearer {{ workload.api_key }}"
      Content-Type: "application/json"
  next:
    - step: process_response

http.put

Updates data at a URL using the HTTP PUT method.

Parameters:

  • url (string, required): The URL to update data at
  • data (object/string, optional): Data to include in the request body
  • json (object, optional): JSON data to include in the request body
  • params (object, optional): Query parameters to include in the URL
  • headers (object, optional): HTTP headers to include in the request
  • timeout (number, optional): Request timeout in seconds (default: 30)
  • verify (boolean, optional): Whether to verify SSL certificates (default: true)

Example:

- step: update_resource
  call:
    name: update_resource
    type: http
    method: PUT
    endpoint: "https://api.example.com/resources/{{ workload.resource_id }}"
    json:
      name: "{{ workload.resource_name }}"
      description: "{{ workload.resource_description }}"
    headers:
      Authorization: "Bearer {{ workload.api_key }}"
      Content-Type: "application/json"
  next:
    - step: process_response

http.delete

Deletes data at a URL using the HTTP DELETE method.

Parameters:

  • url (string, required): The URL to delete data at
  • params (object, optional): Query parameters to include in the URL
  • headers (object, optional): HTTP headers to include in the request
  • timeout (number, optional): Request timeout in seconds (default: 30)
  • verify (boolean, optional): Whether to verify SSL certificates (default: true)

Example:

- step: delete_resource
  call:
    name: delete_resource
    type: http
    method: DELETE
    endpoint: "https://api.example.com/resources/{{ workload.resource_id }}"
    headers:
      Authorization: "Bearer {{ workload.api_key }}"
  next:
    - step: process_response

Database Tasks

Database tasks allow you to interact with databases.

db.query

Executes a SQL query and returns the results.

Parameters:

  • connection (string, required): Database connection string or connection name
  • query (string, required): SQL query to execute
  • params (object/array, optional): Parameters for the query

Example:

- step: query_data
  tool:
    kind: postgres
    auth:
      type: postgres
      inline:
        host: "{{ workload.db_host }}"
        port: "{{ workload.db_port }}"
        user: "{{ workload.db_user }}"
        password: "{{ workload.db_password }}"
        database: "{{ workload.db_name }}"
    command: |
      SELECT * FROM users WHERE id = {{ workload.user_id }}
  next:
    - step: process_data
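
The `params` parameter listed above suggests that values can be supplied separately from the SQL text. As a general illustration of that idea, the sketch below binds a value to a placeholder using Python's built-in `sqlite3` module as a stand-in for PostgreSQL. The table, the rows, and the `fetch_user` helper are made up for the example, and whether NoETL binds `params` this way is an assumption.

```python
import sqlite3


def fetch_user(conn, user_id):
    """Run a query with the value bound as a parameter, not pasted into the SQL."""
    cursor = conn.execute("SELECT id, name FROM users WHERE id = ?", (user_id,))
    return cursor.fetchall()


# In-memory database with made-up rows, standing in for a real server.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO users (id, name) VALUES (?, ?)",
    [(1, "Ada"), (2, "Grace")],
)

rows = fetch_user(conn, 2)
print(rows)
```

Because the value is bound rather than interpolated, a string such as `"2 OR 1=1"` is compared as data and matches no rows in this sketch.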

db.execute

Executes a SQL statement that doesn't return results (e.g., INSERT, UPDATE, DELETE).

Parameters:

  • connection (string, required): Database connection string or connection name
  • statement (string, required): SQL statement to execute
  • params (object/array, optional): Parameters for the statement

Example:

- step: update_data
  tool:
    kind: postgres
    auth:
      type: postgres
      inline:
        host: "{{ workload.db_host }}"
        port: "{{ workload.db_port }}"
        user: "{{ workload.db_user }}"
        password: "{{ workload.db_password }}"
        database: "{{ workload.db_name }}"
    command: |
      UPDATE users SET name = '{{ workload.user_name }}' WHERE id = {{ workload.user_id }}
  next:
    - step: process_result

db.insert

Inserts data into a table.

Parameters:

  • connection (string, required): Database connection string or connection name
  • table (string, required): Table to insert data into
  • data (object/array, required): Data to insert

Example:

- step: insert_data
  tool:
    kind: postgres
    auth:
      type: postgres
      inline:
        host: "{{ workload.db_host }}"
        port: "{{ workload.db_port }}"
        user: "{{ workload.db_user }}"
        password: "{{ workload.db_password }}"
        database: "{{ workload.db_name }}"
    command: |
      INSERT INTO users (name, email, created_at)
      VALUES ('{{ workload.user_name }}', '{{ workload.user_email }}', NOW())
  next:
    - step: process_result

db.update

Updates data in a table.

Parameters:

  • connection (string, required): Database connection string or connection name
  • table (string, required): Table to update data in
  • data (object, required): Data to update
  • where (object, required): Conditions for the update

Example:

- step: update_data
  tool:
    kind: postgres
    auth:
      type: postgres
      inline:
        host: "{{ workload.db_host }}"
        port: "{{ workload.db_port }}"
        user: "{{ workload.db_user }}"
        password: "{{ workload.db_password }}"
        database: "{{ workload.db_name }}"
    command: |
      UPDATE users
      SET name = '{{ workload.user_name }}', updated_at = NOW()
      WHERE id = {{ workload.user_id }}
  next:
    - step: process_result

db.delete

Deletes data from a table.

Parameters:

  • connection (string, required): Database connection string or connection name
  • table (string, required): Table to delete data from
  • where (object, required): Conditions for the delete

Example:

- step: delete_data
  tool:
    kind: postgres
    auth:
      type: postgres
      inline:
        host: "{{ workload.db_host }}"
        port: "{{ workload.db_port }}"
        user: "{{ workload.db_user }}"
        password: "{{ workload.db_password }}"
        database: "{{ workload.db_name }}"
    command: |
      DELETE FROM users
      WHERE id = {{ workload.user_id }}
  next:
    - step: process_result

File Tasks

File tasks allow you to interact with the file system.

file.read

Reads data from a file.

Parameters:

  • path (string, required): Path to the file
  • format (string, optional): Format of the file (json, yaml, csv, text)
  • encoding (string, optional): Encoding of the file (default: utf-8)

Example:

- step: read_file
  call:
    name: read_file
    type: file
    args:
      operation: "read"
      path: "{{ workload.file_path }}"
      format: "json"
      encoding: "utf-8"
  next:
    - step: process_data

file.write

Writes data to a file.

Parameters:

  • path (string, required): Path to the file
  • data (any, required): Data to write
  • format (string, optional): Format of the file (json, yaml, csv, text)
  • encoding (string, optional): Encoding of the file (default: utf-8)
  • mode (string, optional): File mode (w, a, default: w)

Example:

- step: write_file
  call:
    name: write_file
    type: file
    args:
      operation: "write"
      path: "{{ workload.output_path }}"
      data: "{{ process_data.result }}"
      format: "json"
      encoding: "utf-8"
      mode: "w"
  next:
    - step: notify_user

file.append

Appends data to a file.

Parameters:

  • path (string, required): Path to the file
  • data (any, required): Data to append
  • format (string, optional): Format of the file (json, yaml, csv, text)
  • encoding (string, optional): Encoding of the file (default: utf-8)

Example:

- step: append_file
  call:
    name: append_file
    type: file
    args:
      operation: "append"
      path: "{{ workload.log_path }}"
      data: "{{ process_data.result }}"
      format: "text"
      encoding: "utf-8"
  next:
    - step: notify_user
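
The difference between writing with mode `w` and appending can be sketched with Python's own file modes, which use the same letters. This is an illustration of the expected semantics only: the `write_text` helper and the file name are hypothetical, and it is an assumption that NoETL's file task maps onto these modes directly.

```python
import os
import tempfile


def write_text(path, data, mode="w", encoding="utf-8"):
    """Write text to path: mode "w" replaces the contents, mode "a" adds to the end."""
    with open(path, mode, encoding=encoding) as handle:
        handle.write(data)


def read_text(path, encoding="utf-8"):
    with open(path, encoding=encoding) as handle:
        return handle.read()


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "log.txt")

    write_text(path, "first\n")              # creates the file
    write_text(path, "second\n", mode="a")   # keeps "first" and adds a line
    after_append = read_text(path)

    write_text(path, "third\n", mode="w")    # discards everything written so far
    after_overwrite = read_text(path)

print(repr(after_append), repr(after_overwrite))
```

Under these semantics, appending suits log-style output that accumulates across runs, while `w` suits results that should be replaced on each run.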

file.delete

Deletes a file.

Parameters:

  • path (string, required): Path to the file

Example:

- step: delete_file
  call:
    name: delete_file
    type: file
    args:
      operation: "delete"
      path: "{{ workload.temp_file_path }}"
  next:
    - step: notify_user

Data Transformation Tasks

Data transformation tasks reshape data between steps: mapping, filtering, reducing, and merging lists or objects.

transform.map

Applies a function to each item in a list.

Parameters:

  • data (array, required): List of items to transform
  • function (string, required): Function to apply to each item

Example:

transform_data:
  tool:
    kind: python
    data:
      items: "{{ fetch_data.response.json.items }}"
    code: |
      def main(items):
          return [{"price": item["price"] * 1.1, **{k: v for k, v in item.items() if k != "price"}} for item in items]
  next: save_data
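
Since the `code` block is plain Python, the `main` function can be exercised locally before it goes into a playbook. The sketch below reuses the mapping logic from the example with made-up sample items. Calling `main` directly like this is a testing convenience and says nothing about how NoETL invokes it.

```python
def main(items):
    # Raise each price by 10% and copy every other field unchanged.
    return [
        {"price": item["price"] * 1.1,
         **{k: v for k, v in item.items() if k != "price"}}
        for item in items
    ]


# Made-up input standing in for fetch_data.response.json.items.
sample_items = [
    {"name": "widget", "price": 10.0},
    {"name": "gadget", "price": 20.0},
]

result = main(sample_items)
for row in result:
    print(row["name"], round(row["price"], 2))
```

The comprehension builds new dictionaries, so the input list is left unmodified.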

transform.filter

Filters items in a list.

Parameters:

  • data (array, required): List of items to filter
  • condition (string, required): Condition to filter by

Example:

filter_data:
  tool:
    kind: python
    data:
      items: "{{ fetch_data.response.json.items }}"
      min_price: 10
    code: |
      def main(items, min_price):
          return [item for item in items if item["price"] > min_price]
  next: save_data
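
One detail worth checking in a filter is its boundary behavior. The sketch below runs the filtering logic locally on made-up items. It assumes that the function's parameter names match the keys passed under `data` (`items` and `min_price`).

```python
def main(items, min_price):
    # Strict comparison: items priced exactly at min_price are dropped.
    return [item for item in items if item["price"] > min_price]


# Made-up sample data with one item exactly on the threshold.
sample_items = [
    {"name": "a", "price": 5},
    {"name": "b", "price": 10},
    {"name": "c", "price": 15},
]

kept = main(sample_items, 10)
print(kept)
```

If items priced exactly at the threshold should be kept, the comparison would need to be `>=` instead.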

transform.reduce

Reduces a list to a single value.

Parameters:

  • data (array, required): List of items to reduce
  • function (string, required): Function to apply to each item
  • initial (any, optional): Initial value for the reduction

Example:

calculate_total:
  call:
    name: calculate_total
    type: python
    args:
      data: "{{ fetch_data.response.json.items }}"
    code: |
      def main(data):
          return sum(item["price"] for item in data)
  next: save_data
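
The example above uses `sum`, which hides the roles of the `function` and `initial` parameters. A more explicit sketch with Python's `functools.reduce` shows how an accumulator starts at an initial value and is combined with each item in turn. The helper name `total_price` and the sample data are assumptions for illustration.

```python
from functools import reduce


def total_price(items, initial=0.0):
    """Fold the item prices into a single total, starting from initial."""
    return reduce(lambda acc, item: acc + item["price"], items, initial)


# Made-up items standing in for fetch_data.response.json.items.
sample_items = [{"price": 2.5}, {"price": 4.0}]

print(total_price(sample_items))
print(total_price(sample_items, initial=10.0))
```

Supplying an initial value also makes the reduction well defined for an empty list, where it simply returns that value.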

transform.merge

Merges multiple objects or lists.

Parameters:

  • objects (array, required): Objects or lists to merge

Example:

merge_data:
  call:
    name: merge_data
    type: python
    args:
      data1: "{{ fetch_data_1.response.json }}"
      data2: "{{ fetch_data_2.response.json }}"
    code: |
      def main(data1, data2):
          if isinstance(data1, list) and isinstance(data2, list):
              return data1 + data2
          elif isinstance(data1, dict) and isinstance(data2, dict):
              return {**data1, **data2}
          else:
              return [data1, data2]
  next: save_data
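
The merge function in the example has three branches, and it helps to see what each returns. The sketch below runs the same logic locally on made-up inputs. It demonstrates the Python semantics only and makes no claim about how NoETL passes the arguments.

```python
def main(data1, data2):
    if isinstance(data1, list) and isinstance(data2, list):
        return data1 + data2             # lists are concatenated in order
    elif isinstance(data1, dict) and isinstance(data2, dict):
        return {**data1, **data2}        # on duplicate keys, data2 wins
    else:
        return [data1, data2]            # mixed types are paired in a list


lists_merged = main([1, 2], [3])
dicts_merged = main({"a": 1, "b": 2}, {"b": 3})
mixed_merged = main([1], {"a": 1})

print(lists_merged, dicts_merged, mixed_merged)
```

The dictionary merge is shallow, so nested objects under a duplicated key are replaced wholesale rather than combined.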

Notification Tasks

Notification tasks allow you to send notifications.

notify.email

Sends an email.

Parameters:

  • to (string/array, required): Recipient email address(es)
  • subject (string, required): Email subject
  • body (string, required): Email body
  • from (string, optional): Sender email address
  • cc (string/array, optional): CC email address(es)
  • bcc (string/array, optional): BCC email address(es)
  • attachments (array, optional): Attachments

Example:

send_email:
  call:
    name: send_email
    type: email
    args:
      to: "{{ workload.user_email }}"
      subject: "Data Processing Complete"
      body: "Your data has been processed successfully."
      from: "noreply@example.com"
      cc: "support@example.com"
      attachments:
        - path: "{{ workload.output_path }}"
          name: "processed_data.json"
  next: end

notify.slack

Sends a Slack message.

Parameters:

  • webhook (string, required): Slack webhook URL
  • message (string, required): Message to send
  • channel (string, optional): Channel to send the message to
  • username (string, optional): Username to send the message as
  • icon_emoji (string, optional): Emoji to use as the icon
  • attachments (array, optional): Slack message attachments

Example:

send_slack:
  call:
    name: send_slack
    type: slack
    args:
      webhook: "{{ workload.slack_webhook }}"
      message: "Data processing complete"
      channel: "#notifications"
      username: "NoETL Bot"
      icon_emoji: ":robot_face:"
      attachments:
        - title: "Processing Results"
          text: "{{ process_data.result }}"
          color: "good"
  next: end
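
For orientation, the parameters above resemble the JSON payload that a Slack incoming webhook accepts. The sketch below assembles such a payload with the standard library and builds the POST request without sending it. Mapping `message` to Slack's `text` field is an assumption about what the task does internally, and `build_slack_request` and the placeholder URL are hypothetical.

```python
import json
from urllib.request import Request


def build_slack_request(webhook, message, channel=None, username=None,
                        icon_emoji=None, attachments=None):
    """Build (but do not send) a POST request carrying a Slack-style payload."""
    payload = {"text": message}  # assumption: message maps to Slack's "text"
    optional = {
        "channel": channel,
        "username": username,
        "icon_emoji": icon_emoji,
        "attachments": attachments,
    }
    # Only include optional fields that were actually provided.
    payload.update({k: v for k, v in optional.items() if v is not None})
    body = json.dumps(payload).encode("utf-8")
    return Request(webhook, data=body,
                   headers={"Content-Type": "application/json"}, method="POST")


request = build_slack_request(
    "https://example.com/hooks/placeholder",  # placeholder, not a real webhook
    "Data processing complete",
    channel="#notifications",
    username="NoETL Bot",
)
print(json.loads(request.data))
```

Leaving out the optional fields keeps the payload minimal, in which case the webhook's own defaults would normally apply.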

notify.webhook

Sends a webhook notification.

Parameters:

  • url (string, required): Webhook URL
  • data (object, required): Data to send
  • method (string, optional): HTTP method (default: POST)
  • headers (object, optional): HTTP headers

Example:

send_webhook:
  call:
    name: send_webhook
    type: http
    method: POST
    endpoint: "{{ workload.webhook_url }}"
    json:
      event: "data_processing_complete"
      result: "{{ process_data.result }}"
    headers:
      Content-Type: "application/json"
      Authorization: "Bearer {{ workload.webhook_token }}"
  next: end

Console Tasks

Console tasks allow you to print messages to the console.

console.print

Prints a message to the console.

Parameters:

  • message (string, required): Message to print

Example:

print_message:
  call:
    name: print_message
    type: python
    args:
      user_name: "{{ workload.user_name }}"
    code: |
      def main(user_name):
          message = f"Processing data for {user_name}"
          print(message)
          return {"message": message}
  next: process_data

Next Steps