Vars Block Implementation Summary
Status: ✅ FULLY IMPLEMENTED AND TESTED
Implementation Date: December 2025
Test Execution: 508176494351351907 (successful - 4 variables extracted)
REST API: /api/vars/* endpoints available for variable management
Feature Overview
The vars block feature enables declarative extraction of values from step execution results. Variables are automatically stored in the transient database table and become accessible in subsequent workflow steps through template syntax.
Architecture:
- Server: Direct database access via
TransientVarsservice (pool connections) - Workers: REST API access (
/api/vars/{execution_id}) - NO direct database connections - Database:
noetl.transienttable with execution-scoped isolation
Design Decisions
1. Template Syntax: {{ result.field }}
Decision: Use {{ result.field }} for current step result access within vars block.
Rejected Alternatives:
{{ STEP.step_name.data.field }}- Too verbose, adds unnecessary nesting{{ step_name.data.field }}- Requires extra.dataextraction
Rationale:
- Simpler and more intuitive for users
- Consistent with other template namespaces (
workload,vars) - "result" clearly indicates the current step's output
- Reduces cognitive overhead and typing
2. Variable Type: step_result
Decision: Store extracted variables with var_type='step_result' in transient table.
Rationale:
- Aligns with existing database constraint:
CHECK (var_type IN ('user_defined', 'step_result', 'computed', 'iterator_state')) - Clear semantic meaning: variable originated from step execution result
- Distinguishes from future
user_definedvariables (set via API) - Enables filtering and analytics by variable source
3. Direct Step Name References
Decision: Use direct step names without wrapper objects (e.g., {{ step_name.field }} not {{ STEP.step_name.field }}).
Rationale:
- Consistency with existing namespaces:
{{ workload.field }},{{ vars.field }} - Less typing, cleaner YAML playbooks
- Reduces complexity for end users
- Standard Jinja2 pattern - direct object property access
4. Processing Location: Server-Side Orchestrator
Decision: Process vars block in orchestrator.py after step_completed event emission.
Rationale:
- Server has full eval_ctx with all step results
- Centralized variable storage logic
- Worker remains stateless (no local state management)
- Natural event-driven timing: after step completes, before next transitions
5. Result Normalization: Extract .data Field
Decision: Server normalizes step results by extracting .data field when present.
Implementation:
eval_ctx[step_name]contains the step's actual return value, not envelope- If step returns
{"status": "success", "data": {...}}, only{...}is stored - Templates access fields directly:
{{ step_name.field }}not{{ step_name.data.field }}
Rationale:
- Simplifies template expressions for most common case
- Handles both envelope and direct return patterns transparently
- Reduces user confusion about data structure
Implementation Details
Code Location
File: noetl/server/api/run/orchestrator.py
Function: _process_step_vars() (lines 750-834)
Integration: Called from _process_transitions() after step_completed event (~line 1005)
Processing Flow
1. Worker executes step → Reports action_completed
2. Orchestrator processes completion → Emits step_completed event
3. Orchestrator calls _process_step_vars()
a. Extracts vars dict from step definition
b. Renders each template using eval_ctx (with 'result' pointing to current step)
c. Stores rendered variables via TransientVars.set_multiple()
d. Logs success/error for each variable
4. Orchestrator evaluates next transitions
5. Subsequent steps load vars into template context
6. Worker receives args with rendered {{ vars.* }} values
Key Code Implementation
async def _process_step_vars(
execution_id: int,
step_name: str,
step_def: dict,
eval_ctx: dict
) -> None:
"""
Process vars block to extract values from step result.
Args:
execution_id: Current execution identifier
step_name: Name of completed step
step_def: Step definition from playbook
eval_ctx: Template context with all step results
- eval_ctx['result']: Current step's result (normalized)
- eval_ctx['step_name']: Previous steps' results
"""
vars_block = step_def.get("vars")
if not vars_block:
return
logger.info(f"Processing vars block for step '{step_name}'")
# Render templates
env = Environment(loader=BaseLoader())
rendered_vars = {}
for var_name, var_template in vars_block.items():
try:
template = env.from_string(str(var_template))
rendered_value = template.render(eval_ctx)
rendered_vars[var_name] = rendered_value
logger.info(f"✓ Rendered var '{var_name}': {rendered_value}")
except Exception as e:
logger.error(f"✗ Failed to render var '{var_name}': {e}")
# Store in transient
if rendered_vars:
count = await TransientVars.set_multiple(
variables=rendered_vars,
execution_id=execution_id,
var_type="step_result",
source_step=step_name
)
logger.info(f"✓ Stored {count} variables from step '{step_name}'")
REST API Access
Variables are accessed via REST API for external systems and workers.
API Endpoints
Base Path: /api/vars
| Method | Endpoint | Description | Access Tracking |
|---|---|---|---|
| GET | /api/vars/{execution_id} | List all variables with metadata | No (bulk read) |
| GET | /api/vars/{execution_id}/{var_name} | Get single variable | Yes (increments count) |
| POST | /api/vars/{execution_id} | Set/update multiple variables | No |
| DELETE | /api/vars/{execution_id}/{var_name} | Delete variable | No |
Example Usage
Get all variables:
curl http://noetl-server:8080/api/vars/507861119290048685
Response:
{
"execution_id": 507861119290048685,
"variables": {
"user_id": {
"value": 12345,
"type": "step_result",
"source_step": "fetch_user",
"created_at": "2025-12-13T10:00:00Z",
"accessed_at": "2025-12-13T10:01:00Z",
"access_count": 5
}
},
"count": 1
}
Set variables:
curl -X POST http://noetl-server:8080/api/vars/507861119290048685 \
-H "Content-Type: application/json" \
-d '{
"variables": {"config_timeout": 60, "retry_enabled": true},
"var_type": "user_defined",
"source_step": "manual_config"
}'
Worker Access Pattern
Workers must use REST API for variable access:
import httpx
import os
SERVER_URL = os.getenv("NOETL_SERVER_URL", "http://localhost:8080")
async def load_variables(execution_id: int) -> dict:
"""Load all variables via REST API."""
async with httpx.AsyncClient() as client:
response = await client.get(f"{SERVER_URL}/api/vars/{execution_id}")
response.raise_for_status()
data = response.json()
return {name: var["value"] for name, var in data["variables"].items()}
Configuration:
# Set server URL for workers
export NOETL_SERVER_URL="http://noetl-server:8080"
DSL Syntax
Basic Extraction
- step: fetch_data
tool: postgres
query: "SELECT user_id, email FROM users WHERE active = true LIMIT 1"
vars:
user_id: "{{ result[0].user_id }}"
email: "{{ result[0].email }}"
next:
- step: send_notification
Using Extracted Variables
- step: send_notification
tool: http
method: POST
endpoint: "https://api.example.com/notify"
payload:
user_id: "{{ vars.user_id }}"
email: "{{ vars.email }}"
timestamp: "{{ workload.execution_time }}"
Complex Extraction
- step: analyze_data
tool:
kind: python
libs: {}
args: {}
code: |
# Pure Python code - no imports, no def main()
result = {
"status": "success",
"data": {
"users": [
{"id": 123, "name": "Alice"},
{"id": 456, "name": "Bob"}
],
"metadata": {
"count": 2,
"source": "production_db"
}
}
}
vars:
first_user_id: "{{ result.data.users[0].id }}"
first_user_name: "{{ result.data.users[0].name }}"
total_users: "{{ result.data.metadata.count }}"
data_source: "{{ result.data.metadata.source }}"
Template Namespace Reference
| Namespace | Scope | Usage | Example |
|---|---|---|---|
result | Vars block only | Current step's result | {{ result.field }} |
step_name | Entire workflow | Previous step result | {{ fetch_data.users[0] }} |
vars | After definition | Stored variables | {{ vars.user_id }} |
workload | Entire workflow | Global workflow vars | {{ workload.timeout }} |
execution_id | Entire workflow | Execution identifier | {{ execution_id }} |
Database Schema
Table: transient
CREATE TABLE transient (
execution_id BIGINT NOT NULL,
var_name VARCHAR(255) NOT NULL,
var_type VARCHAR(50) NOT NULL CHECK (var_type IN (
'user_defined',
'step_result', -- ✅ Used by vars block
'computed',
'iterator_state'
)),
var_value JSONB NOT NULL,
source_step VARCHAR(255),
access_count INTEGER DEFAULT 0,
created_at TIMESTAMPTZ DEFAULT NOW(),
accessed_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (execution_id, var_name)
);
Test Results
Test Playbook: tests/fixtures/playbooks/vars_test/test_vars_block.yaml
Execution ID: 508176494351351907
Result: ✅ SUCCESS
Variables Extracted:
first_user_id: 123
first_email: "alice@example.com"
user_count: 2
data_source: "test_db"
Worker Log Confirmation:
INFO Loaded 4 variables for execution 508176494351351907
Integration with Context Service
File: noetl/server/api/context/service.py
Function: build_rendering_context() (lines 136-151)
async def build_rendering_context(
playbook_data: dict,
extra_context: dict = None
) -> dict:
"""Build template rendering context."""
base_ctx = {
"workload": playbook_data.get("workload", {}),
"vars": {}, # Populated below
# ... other context ...
}
# Load stored variables
execution_id = extra_context.get("execution_id")
if execution_id:
vars_data = await TransientVars.get_all_vars(execution_id)
base_ctx["vars"] = vars_data
logger.info(f"✓ Loaded {len(vars_data)} variables")
return base_ctx
Related Documentation
- Design Document:
docs/variables_feature_design.md- Complete feature design and rationale - DSL Specification:
docs/dsl_spec.md- Vars block syntax and template namespace - Test Playbook:
tests/fixtures/playbooks/vars_test/test_vars_block.yaml- Working example - TransientVars API:
noetl/server/api/context/transient.py- Storage layer implementation
Future Enhancements (Not Implemented)
-
Variable Management API (Phase 2 Task 3):
GET /api/vars/{execution_id}- List all variablesGET /api/vars/{execution_id}/{var_name}- Get specific variablePOST /api/vars/{execution_id}- Set variable manually (user_defined)DELETE /api/vars/{execution_id}/{var_name}- Delete variable
-
Computed Variables (
var_type='computed'):- Variables calculated from expressions
- Example:
counter: "{{ vars.counter + 1 }}"
-
Iterator State Variables (
var_type='iterator_state'):- Loop metadata (index, current_item, etc.)
- Automatic population during iterator execution
Migration Notes
No breaking changes - Feature is additive:
- Existing playbooks continue to work without modification
varsblock is optional- No changes to existing template syntax
- TransientVars table already existed and migrated from
execution_variable
Conclusion
The vars block feature is production-ready and provides a declarative, user-friendly way to extract and reuse values from step results. The design prioritizes simplicity and consistency with existing template patterns, while maintaining clean separation between variable storage (server-side) and execution (worker-side).