PostgreSQL Tool

The PostgreSQL tool executes SQL statements against PostgreSQL databases with support for connection pooling, authentication, and result processing.

Basic Usage

- step: query_users
  tool: postgres
  auth:
    type: postgres
    credential: my_pg_creds
  query: "SELECT * FROM users WHERE status = 'active'"
  next:
    - step: process_results

Configuration

Authentication

The PostgreSQL tool uses the unified authentication system:

- step: query_db
  tool: postgres
  auth:
    type: postgres
    credential: production_db
  query: "SELECT count(*) FROM orders"

Connection Parameters

When credentials are resolved, these connection parameters are available:

Parameter        Description
db_host          Database hostname
db_port          Database port (default: 5432)
db_user          Database username
db_password      Database password
db_name          Database name
db_conn_string   Full connection string (overrides the individual parameters)
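
The exact shape of a credential definition depends on how your credential store is set up; the sketch below is purely illustrative (hypothetical names and values) and only shows how the resolved parameters above might be supplied:

# Hypothetical credential definition -- illustrative only.
my_pg_creds:
  db_host: db.internal.example.com
  db_port: 5432
  db_user: app_user
  db_password: "{{ keychain.pg_password }}"
  db_name: app

A single db_conn_string value (for example a standard postgresql:// URL) can be supplied instead and overrides the individual parameters.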

SQL Commands

SQL commands can be provided in multiple ways:

Inline Query

- step: simple_query
  tool: postgres
  auth: { type: postgres, credential: my_db }
  query: "SELECT * FROM users LIMIT 10"

Base64 Encoded Commands

- step: encoded_query
  tool: postgres
  auth: { type: postgres, credential: my_db }
  command_b64: "U0VMRUNUICogRlJPTSB1c2VycyBMSU1JVCAxMDs="
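
The encoded value is simply the SQL text run through standard Base64; the string above decodes to SELECT * FROM users LIMIT 10; and can be produced with any Base64 encoder (for example the base64 command-line utility).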

Multiple Commands

- step: multi_command
  tool: postgres
  auth: { type: postgres, credential: my_db }
  commands_b64: |
    U0VMRUNUICogRlJPTSB1c2VyczsKU0VMRUNUICogRlJPTSBvcmRlcnM7
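
This payload decodes to two newline-separated statements, SELECT * FROM users; and SELECT * FROM orders;. Each statement's result set should then appear under its own command_N key in the response data (see Response Format below).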

External Script

Load SQL from external sources (GCS, S3, HTTP, file):

- step: migration
  tool: postgres
  auth: { type: postgres, credential: my_db }
  script:
    uri: gs://sql-scripts/migrations/v2.sql
    source:
      type: gcs
      auth: gcp_service_account
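
Only the GCS form is shown here. Assuming the other source types follow the same shape, an HTTP-hosted script might be referenced as in the sketch below; the step name, URL, and type value are illustrative assumptions, not confirmed options:

# Hypothetical variant -- assumes an http source type that mirrors the gcs example.
- step: init_schema
  tool: postgres
  auth: { type: postgres, credential: my_db }
  script:
    uri: https://example.com/sql/init.sql
    source:
      type: http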

Connection Pooling

Configure connection pooling for high-throughput scenarios:

- step: bulk_query
  tool: postgres
  auth: { type: postgres, credential: my_db }
  pool:
    min_size: 5
    max_size: 20
    timeout: 30
  query: "SELECT * FROM large_table"

Pool Parameters

Parameter   Type   Default   Description
min_size    int    1         Minimum pool connections
max_size    int    10        Maximum pool connections
timeout     int    30        Connection timeout (seconds)

Template Variables

Use Jinja2 templates in your SQL:

- step: filtered_query
  tool: postgres
  auth: { type: postgres, credential: my_db }
  query: |
    SELECT * FROM orders
    WHERE customer_id = '{{ workload.customer_id }}'
      AND created_at >= '{{ vars.start_date }}'

Keychain Access

Access keychain secrets in queries:

- step: secure_query
  tool: postgres
  auth: { type: postgres, credential: my_db }
  query: |
    SELECT * FROM api_logs
    WHERE api_key = '{{ keychain.api_key }}'

Response Format

The PostgreSQL tool returns a standardized response:

{
  "id": "task-uuid",
  "status": "success",
  "data": {
    "command_1": [
      {"id": 1, "name": "Alice"},
      {"id": 2, "name": "Bob"}
    ]
  }
}

Accessing Results

- step: get_users
  tool: postgres
  auth: { type: postgres, credential: my_db }
  query: "SELECT id, name FROM users"
  vars:
    first_user: "{{ result.data.command_1[0] }}"
    total_users: "{{ result.data.command_1 | length }}"
  next:
    - step: process_users
      args:
        users: "{{ get_users.data.command_1 }}"
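
Within a step's own vars block, result refers to that step's response; later steps reference the same data by step name, as get_users.data.command_1 does in the args above.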

Examples

Simple SELECT Query

- step: get_active_users
  tool: postgres
  auth:
    type: postgres
    credential: app_database
  query: |
    SELECT id, email, created_at
    FROM users
    WHERE status = 'active'
    ORDER BY created_at DESC
    LIMIT 100

INSERT with RETURNING

- step: create_record
  tool: postgres
  auth:
    type: postgres
    credential: app_database
  query: |
    INSERT INTO audit_logs (action, user_id, details)
    VALUES ('{{ workload.action }}', {{ workload.user_id }}, '{{ workload.details | tojson }}')
    RETURNING id, created_at
  vars:
    log_id: "{{ result.data.command_1[0].id }}"

Transaction with Multiple Statements

- step: batch_update
  tool: postgres
  auth:
    type: postgres
    credential: app_database
  # command_b64 holds the base64 encoding of the following transaction:
  #   BEGIN;
  #   UPDATE orders SET status = 'processed' WHERE id = ANY($1);
  #   INSERT INTO order_history SELECT * FROM orders WHERE id = ANY($1);
  #   COMMIT;
  command_b64: "<base64-encoded transaction>"

ETL Pipeline Step

- step: extract_data
  tool: postgres
  auth:
    type: postgres
    credential: source_db
  query: |
    SELECT
      id,
      customer_name,
      order_date,
      total_amount
    FROM orders
    WHERE order_date >= '{{ workload.start_date }}'
      AND order_date < '{{ workload.end_date }}'
  vars:
    extracted_rows: "{{ result.data.command_1 }}"
  next:
    - step: transform_data

Dynamic Schema Query

- step: get_table_info
  tool: postgres
  auth:
    type: postgres
    credential: app_database
  query: |
    SELECT column_name, data_type, is_nullable
    FROM information_schema.columns
    WHERE table_name = '{{ workload.table_name }}'
    ORDER BY ordinal_position

Error Handling

The tool captures SQL errors and returns them in the response:

{
  "id": "task-uuid",
  "status": "error",
  "error": "relation \"nonexistent_table\" does not exist",
  "data": {}
}

Use conditional routing to handle errors:

- step: risky_query
  tool: postgres
  auth: { type: postgres, credential: my_db }
  query: "SELECT * FROM {{ workload.table_name }}"
  next:
    - when: "{{ risky_query.status == 'error' }}"
      then:
        - step: handle_error
    - step: process_success

Best Practices

  1. Use parameterized queries: Template values are interpolated directly into the SQL text, so validate or escape anything that comes from untrusted input
  2. Connection pooling: Enable for high-frequency queries
  3. Transaction management: Group related statements in transactions
  4. Error handling: Always check status in conditional routing
  5. Credential security: Store credentials in keychain, never in playbooks

See Also