# HTTP to PostgreSQL Data Transfer
This guide demonstrates various patterns for transferring data from HTTP APIs to PostgreSQL databases.
## Working Examples
Complete, tested data transfer playbooks are available in the repository.
## Pattern Overview
| Pattern | Complexity | Best For |
|---|---|---|
| Transfer Tool | Low | Simple field mapping, production ETL |
| Python Batch | Medium | Custom transformations |
| Loop | High | Large datasets, per-record logic |
| Direct SQL | Low | Quick prototypes |
## Transfer Tool Pattern
The simplest approach, using NoETL's built-in transfer tool:
```yaml
apiVersion: noetl.io/v1
kind: Playbook
metadata:
  name: http_to_postgres_transfer
  path: data_transfer/http_to_postgres_transfer
workload:
  api_url: "https://jsonplaceholder.typicode.com/posts"
  target_table: "public.posts"
workflow:
  - step: start
    next:
      - step: transfer_data
  - step: transfer_data
    tool: transfer
    source:
      type: http
      url: "{{ workload.api_url }}"
      method: GET
    target:
      type: postgres
      auth:
        type: postgres
        credential: pg_demo
      table: "{{ workload.target_table }}"
      mode: insert
      mapping:
        post_id: id
        user_id: userId
        title: title
        body: body
    next:
      - step: end
  - step: end
```
Advantages:
- No code required
- Declarative field mapping
- Built-in error handling
- Automatic batching
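The transfer tool maps API fields onto existing columns; it does not create the target table. The sketch below applies a compatible `public.posts` definition with psycopg. The DDL and connection string are assumptions inferred from the mapping above, not part of the playbook:
```python
import psycopg  # psycopg 3

# Hypothetical table matching the mapping above; adjust types to your data.
DDL = """
CREATE TABLE IF NOT EXISTS public.posts (
    post_id integer PRIMARY KEY,
    user_id integer,
    title   text,
    body    text
);
"""

# Example DSN; point it at the same database the pg_demo credential refers to.
with psycopg.connect("postgresql://noetl:noetl@localhost:5432/demo") as conn:
    conn.execute(DDL)
```
A primary key (or unique index) on post_id also lets the later patterns upsert with ON CONFLICT (post_id).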
## Python Batch Pattern
For custom transformations and validation:
```yaml
apiVersion: noetl.io/v1
kind: Playbook
metadata:
  name: http_to_postgres_python
  path: data_transfer/http_to_postgres_python
workload:
  api_url: "https://jsonplaceholder.typicode.com/posts"
workflow:
  - step: start
    next:
      - step: fetch_data
  - step: fetch_data
    tool: http
    method: GET
    endpoint: "{{ workload.api_url }}"
    vars:
      posts: "{{ result.data }}"
    next:
      - step: transform_data
  - step: transform_data
    tool:
      kind: python
      code: |
        def main(posts):
            """Transform and validate posts data."""
            transformed = []
            for post in posts:
                # Validate required fields
                if not post.get('id') or not post.get('title'):
                    continue
                transformed.append({
                    'post_id': post['id'],
                    'user_id': post.get('userId', 0),
                    'title': post['title'][:255],  # Truncate if needed
                    'body': post.get('body', '')[:1000],
                    'word_count': len(post.get('body', '').split())
                })
            return {'records': transformed, 'count': len(transformed)}
      args:
        posts: "{{ vars.posts }}"
    vars:
      transformed_posts: "{{ result.data.records }}"
    next:
      - step: insert_data
  - step: insert_data
    tool:
      kind: postgres
      auth:
        type: postgres
        credential: pg_demo
      query: |
        INSERT INTO public.posts (post_id, user_id, title, body, word_count)
        SELECT
          (p->>'post_id')::int,
          (p->>'user_id')::int,
          p->>'title',
          p->>'body',
          (p->>'word_count')::int
        FROM jsonb_array_elements('{{ vars.transformed_posts | tojson }}'::jsonb) p
        ON CONFLICT (post_id) DO UPDATE SET
          title = EXCLUDED.title,
          body = EXCLUDED.body,
          word_count = EXCLUDED.word_count
        RETURNING post_id;
    vars:
      inserted_count: "{{ result.data.command_1 | length }}"
    next:
      - step: end
  - step: end
```
Advantages:
- Full control over transformations
- Custom validation logic
- Dynamic field generation
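Because the transform is plain Python, it can be exercised outside the playbook before the step is registered. A minimal sketch, assuming the `requests` library and the same placeholder API as the workload:
```python
import requests

def main(posts):
    """Same transform as the transform_data step above."""
    transformed = []
    for post in posts:
        if not post.get('id') or not post.get('title'):
            continue  # drop records missing required fields
        transformed.append({
            'post_id': post['id'],
            'user_id': post.get('userId', 0),
            'title': post['title'][:255],
            'body': post.get('body', '')[:1000],
            'word_count': len(post.get('body', '').split()),
        })
    return {'records': transformed, 'count': len(transformed)}

posts = requests.get("https://jsonplaceholder.typicode.com/posts", timeout=10).json()
print(main(posts)['count'])  # number of records that passed validation
```
Note that the insert_data step expects a word_count column and relies on ON CONFLICT (post_id), which requires a primary key or unique constraint on post_id.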
## Loop Pattern
For large datasets or per-record processing:
```yaml
apiVersion: noetl.io/v2
kind: Playbook
metadata:
  name: http_to_postgres_loop
  path: data_transfer/http_to_postgres_loop
workload:
  api_url: "https://jsonplaceholder.typicode.com/posts"
workflow:
  - step: start
    next:
      - step: fetch_data
  - step: fetch_data
    tool:
      kind: http
      method: GET
      url: "{{ workload.api_url }}"
    vars:
      all_posts: "{{ result.data }}"
    next:
      - step: insert_posts
  - step: insert_posts
    tool:
      kind: postgres
      auth:
        type: postgres
        credential: pg_demo
      query: |
        INSERT INTO public.posts (post_id, user_id, title, body)
        VALUES (
          {{ current_post.id }},
          {{ current_post.userId }},
          '{{ current_post.title | replace("'", "''") }}',
          '{{ current_post.body | replace("'", "''") }}'
        )
        ON CONFLICT (post_id) DO UPDATE SET
          title = EXCLUDED.title,
          body = EXCLUDED.body;
    loop:
      in: "{{ vars.all_posts }}"
      iterator: current_post
      mode: sequential
    next:
      - step: end
  - step: end
```
Advantages:
- Lower memory footprint
- Per-record error handling
- Progress tracking
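The loop body above escapes single quotes by hand because values are rendered directly into the SQL text. Outside the playbook, the same per-record flow is usually written with bound parameters, which avoids manual escaping altogether. A minimal sketch, assuming psycopg and requests; the DSN is illustrative:
```python
import psycopg
import requests

posts = requests.get("https://jsonplaceholder.typicode.com/posts", timeout=10).json()

UPSERT = """
    INSERT INTO public.posts (post_id, user_id, title, body)
    VALUES (%s, %s, %s, %s)
    ON CONFLICT (post_id) DO UPDATE SET
        title = EXCLUDED.title,
        body = EXCLUDED.body;
"""

with psycopg.connect("postgresql://noetl:noetl@localhost:5432/demo") as conn:
    with conn.cursor() as cur:
        # Sequential, one record at a time, mirroring mode: sequential above.
        for post in posts:
            cur.execute(UPSERT, (post["id"], post["userId"], post["title"], post["body"]))
```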
## Direct SQL Pattern
Quick prototyping with direct JSON insertion:
```yaml
apiVersion: noetl.io/v1
kind: Playbook
metadata:
  name: http_to_postgres_direct
  path: data_transfer/http_to_postgres_direct
workload:
  api_url: "https://jsonplaceholder.typicode.com/posts"
workflow:
  - step: start
    next:
      - step: fetch_and_insert
  - step: fetch_and_insert
    tool:
      kind: http
      method: GET
      endpoint: "{{ workload.api_url }}"
    vars:
      raw_data: "{{ result.data }}"
    next:
      - step: bulk_insert
  - step: bulk_insert
    tool:
      kind: postgres
      auth:
        type: postgres
        credential: pg_demo
      query: |
        INSERT INTO public.posts_raw (data, fetched_at)
        VALUES (
          '{{ vars.raw_data | tojson }}'::jsonb,
          NOW()
        );
    next:
      - step: end
  - step: end
```
Advantages:
- Minimal configuration
- Fast prototyping
- Preserves raw data
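Since the payload lands as raw JSONB, it can be unpacked into typed columns later. A minimal sketch of reading it back, assuming psycopg and a posts_raw table with data jsonb and fetched_at timestamptz columns, as the playbook implies:
```python
import psycopg

# Unpack array elements from the stored payloads with jsonb_array_elements.
UNPACK = """
    SELECT (p->>'id')::int AS post_id, p->>'title' AS title
    FROM public.posts_raw r,
         jsonb_array_elements(r.data) AS p
    LIMIT 5;
"""

with psycopg.connect("postgresql://noetl:noetl@localhost:5432/demo") as conn:
    for post_id, title in conn.execute(UNPACK):
        print(post_id, title)
```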
## Paginated API Transfer
For APIs with pagination:
```yaml
apiVersion: noetl.io/v1
kind: Playbook
metadata:
  name: paginated_api_transfer
  path: data_transfer/paginated_api
workload:
  api_base: "https://api.example.com"
workflow:
  - step: start
    next:
      - step: fetch_all_pages
  - step: fetch_all_pages
    tool:
      kind: http
      method: GET
      url: "{{ workload.api_base }}/items"
      params:
        page: 1
        per_page: 100
    loop:
      pagination:
        type: response_based
        continue_while: "{{ response.data.meta.has_more }}"
        next_page:
          params:
            page: "{{ (response.data.meta.page | int) + 1 }}"
        merge_strategy: append
        merge_path: data.items
        max_iterations: 50
    vars:
      all_items: "{{ result.data }}"
    next:
      - step: batch_insert
  - step: batch_insert
    tool:
      kind: python
      libs: {}
      args:
        items: "{{ vars.all_items }}"
        batch_size: 1000
      code: |
        batches = []
        for i in range(0, len(items), batch_size):
            batches.append(items[i:i+batch_size])
        result = {
            'batches': batches,
            'total_items': len(items),
            'batch_count': len(batches)
        }
    vars:
      batches: "{{ result.batches }}"
    next:
      - step: insert_batches
  - step: insert_batches
    tool:
      kind: postgres
      auth:
        type: postgres
        credential: pg_demo
      query: |
        INSERT INTO public.items (id, name, value, updated_at)
        SELECT
          (item->>'id')::int,
          item->>'name',
          (item->>'value')::numeric,
          NOW()
        FROM jsonb_array_elements('{{ current_batch | tojson }}'::jsonb) item
        ON CONFLICT (id) DO UPDATE SET
          name = EXCLUDED.name,
          value = EXCLUDED.value,
          updated_at = EXCLUDED.updated_at;
    loop:
      in: "{{ vars.batches }}"
      iterator: current_batch
      mode: sequential
    next:
      - step: end
  - step: end
```
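The loop.pagination block expresses the same control flow as a hand-written pagination loop. A rough plain-Python equivalent, assuming the requests library; the /items endpoint and the meta field names are hypothetical, mirroring the config above:
```python
import requests

BASE = "https://api.example.com"  # hypothetical API, as in the workload above

def fetch_all_items(per_page=100, max_iterations=50):
    """Approximate equivalent of the response_based pagination config."""
    items, page = [], 1
    for _ in range(max_iterations):
        resp = requests.get(f"{BASE}/items",
                            params={"page": page, "per_page": per_page}, timeout=30)
        resp.raise_for_status()
        body = resp.json()                    # exposed to templates as response.data
        items.extend(body["items"])           # merge_strategy: append at merge_path data.items
        if not body["meta"]["has_more"]:      # continue_while condition
            break
        page = int(body["meta"]["page"]) + 1  # next_page.params.page
    return items
```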
## Multi-Database Transfer
Transfer to multiple databases in parallel:
```yaml
apiVersion: noetl.io/v2
kind: Playbook
metadata:
  name: http_to_multi_db
  path: data_transfer/multi_db
workload:
  api_url: "https://api.example.com/data"
workflow:
  - step: start
    next:
      - step: fetch_data
  - step: fetch_data
    tool:
      kind: http
      method: GET
      url: "{{ workload.api_url }}"
    vars:
      data: "{{ result.data }}"
    next:
      - step: postgres_insert
      - step: analytics_insert  # Parallel execution
  - step: postgres_insert
    tool:
      kind: postgres
      auth:
        type: postgres
        credential: pg_primary
      query: |
        INSERT INTO app_data.records (data, source)
        VALUES ('{{ vars.data | tojson }}'::jsonb, 'api')
    next:
      - step: end
  - step: analytics_insert
    tool: duckdb
    auth:
      type: gcs
      credential: gcp_service_account
    query: |
      COPY (
        SELECT
          value->>'id' as id,
          value->>'name' as name,
          current_timestamp as ingested_at
        FROM (
          SELECT unnest(from_json('{{ vars.data | tojson }}', '["json"]')) as value
        )
      ) TO 'gs://analytics-bucket/ingested/{{ execution_id }}.parquet' (FORMAT PARQUET);
    next:
      - step: end
  - step: end
```
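Listing both steps under next: fans the same payload out to two independent sinks. Conceptually this is just two writers running concurrently; a minimal sketch with concurrent.futures, where both writer functions are hypothetical placeholders for the steps above:
```python
from concurrent.futures import ThreadPoolExecutor

def write_postgres(data):
    ...  # e.g. INSERT INTO app_data.records, as in postgres_insert

def write_analytics(data):
    ...  # e.g. export to Parquet on GCS, as in analytics_insert

data = [{"id": 1, "name": "example"}]  # stands in for the fetched API payload

with ThreadPoolExecutor() as pool:
    futures = [pool.submit(write_postgres, data), pool.submit(write_analytics, data)]
    for f in futures:
        f.result()  # surface any writer error instead of dropping it silently
```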
## Error Handling
Handle API and database errors gracefully:
```yaml
- step: fetch_data
  tool: http
  method: GET
  endpoint: "{{ workload.api_url }}"
  retry:
    max_attempts: 3
    initial_delay: 1.0
    retryable_status_codes: [429, 500, 502, 503]
  next:
    - when: "{{ fetch_data.status == 'error' }}"
      then:
        - step: handle_fetch_error
    - step: process_data
- step: handle_fetch_error
  tool: python
  code: |
    def main(error):
        return {
            'status': 'failed',
            'error': error,
            'action': 'manual_review_required'
        }
  args:
    error: "{{ fetch_data.error }}"
  next:
    - step: end
```
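The retry block amounts to re-issuing the request with a growing delay whenever one of the listed status codes comes back. A plain-Python sketch of that behaviour, assuming the requests library; the doubling backoff is an illustrative choice, not something the playbook spec guarantees:
```python
import time
import requests

RETRYABLE = {429, 500, 502, 503}

def fetch_with_retry(url, max_attempts=3, initial_delay=1.0):
    delay = initial_delay
    for attempt in range(1, max_attempts + 1):
        resp = requests.get(url, timeout=30)
        if resp.status_code not in RETRYABLE:
            resp.raise_for_status()  # non-retryable errors surface immediately
            return resp.json()
        if attempt == max_attempts:
            resp.raise_for_status()  # out of attempts: report the last error
        time.sleep(delay)
        delay *= 2                   # assumed exponential backoff
```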
## Performance Tips
- Batch inserts: Use `jsonb_array_elements` for bulk operations
- Use COPY for large datasets: DuckDB + GCS for high throughput (see the PostgreSQL COPY sketch after this list)
- Parallel execution: Split data and process concurrently
- Connection pooling: Enable for high-frequency inserts
- Appropriate page sizes: Balance API limits and memory
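Alongside the DuckDB route, PostgreSQL's own COPY protocol is typically the fastest way to load bulk rows directly. A minimal sketch with psycopg, reusing the public.posts table from the earlier patterns; the DSN is illustrative:
```python
import psycopg
import requests

posts = requests.get("https://jsonplaceholder.typicode.com/posts", timeout=10).json()

with psycopg.connect("postgresql://noetl:noetl@localhost:5432/demo") as conn:
    with conn.cursor() as cur:
        # COPY streams all rows in one protocol exchange instead of per-row INSERTs.
        with cur.copy("COPY public.posts (post_id, user_id, title, body) FROM STDIN") as copy:
            for post in posts:
                copy.write_row((post["id"], post["userId"], post["title"], post["body"]))
```
Unlike INSERT, COPY cannot upsert, so it suits initial or append-only loads; for idempotent reloads, COPY into a staging table and merge with INSERT ... ON CONFLICT.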