Automation Contexts
Overview
This document explains how automation contexts are constructed, composed, and made available across every execution layer of the FLUX automation application: flows, tasks, scripts, models, and services (email-msg).
Contexts provide a structured way to pass data and configuration throughout the automation pipeline, ensuring that each component has access to the appropriate information needed for execution.
Think of context as a "backpack of information" that moves through the execution. It starts with base device data and gets enriched as the process moves forward. The main idea: each step can reuse what previous steps already discovered.
Base Element Context
Function: element_context_generate(element)
The element context serves as the foundation of every execution. It is generated once per element at the beginning of each task or flow execution step and is passed throughout the entire pipeline.
Source Location: apps/inventory/utils.py → element_context_generate()
Context Structure
The base element context contains the following components:
{
# ── Group Variables ─────────────────────────────────────────────────────────
# One key per GroupVariable configured in any Group assigned to the element.
# Variables with dots in their name are expanded into nested dictionaries.
# Example: "if.description" → context["if"]["description"] = value
"<variable_name>": "<value>",
"<nested.variable>": "<value>", # expanded: context["nested"]["variable"]
# ── Element Settings (ElementSetting) ───────────────────────────────────────
# One key per parameter configured in the element's Profile.
# Key format: [group.variable_root.][instance.]parameter.variable_name
# Example: "net.0.hostname" → context["net"]["0"]["hostname"]
"<parameter_variable_name>": "<value>",
# ── Element Serialization ───────────────────────────────────────────────────
"element": { # Serialized Element (all database fields)
"id": 1,
"name": "router-01",
"uuid": "...",
"status": True, # operational status
"status_change": "...", # last status change datetime
"prev_status_change": "...",
"is_active": True,
"debug": False,
"sync": False,
"reconf": False,
"internal": False,
"location": { # nested — all Location fields
"id": 1,
"name": "DataCenter A",
...
},
"latitude": None,
"longitude": None,
"profile": 3, # Foreign Key ID
"organization": 1, # Foreign Key ID
"is_public": False,
"group": [2, 5], # list of Foreign Key IDs
"certificate": None,
"key": None,
"password": None,
"elevated_priv_password": None,
"management_address": "192.168.1.1",
"management_gateway": None,
"transport_settings": None,
"software_version": "17.3.4",
"hardware_version": None,
"serial_number": "FTX1234ABCD",
"serial_number_alt": None,
"ip_address": None, # Foreign Key ID
"ip_network": None,
"dfgw_address": None,
"dns_servers": None,
"mac_address": None,
"created": "...",
"created_by": 1,
"last_change": "...",
"first_execution": "...",
"last_execution": "...",
"description": "Core router",
"description2": None,
"description3": None,
"description4": None,
"notes": None,
"active_alert": None,
"alert": None,
},
# ── Timestamp ───────────────────────────────────────────────────────────────
"epoch": 1710000000, # UTC timestamp (integer) at context generation time
}
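The dotted-name expansion described in the comments above can be sketched as follows. This is a minimal illustration, not the code in element_context_generate(): the helper name expand_dotted_keys and the sample variables are hypothetical, and the sketch assumes that no dotted name collides with a plain variable of the same root.

```python
from typing import Any


def expand_dotted_keys(flat: dict[str, Any]) -> dict[str, Any]:
    """Expand keys such as "if.description" into nested dictionaries."""
    nested: dict[str, Any] = {}
    for dotted_key, value in flat.items():
        parts = dotted_key.split(".")
        node = nested
        for part in parts[:-1]:
            # Create intermediate levels on demand.
            node = node.setdefault(part, {})
        node[parts[-1]] = value
    return nested


# Hypothetical group variables and element settings.
context = expand_dotted_keys({
    "site": "dc-a",
    "if.description": "uplink",
    "net.0.hostname": "router-01",
})
```

Note that the instance segment stays a string key ("0"), which matches the context["net"]["0"]["hostname"] example above.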
Flow Context Addition: context['flow']
When a task runs inside a flow (via run_task_wrapper), the following key is automatically added before the TaskContext is built:
context['flow'] = {
'user': {
'id': user.id,
'username': user.username,
'full_name': user.get_full_name(),
'email': user.email,
}
# Returns empty dictionary {} if no user is associated with the flow
}
When a user launches the flow, the same user data is also exposed to flow scripts as flow_variables; the user key stays None when no user is associated:
flow_variables = {'user': None}
flow_variables['user'] = {
'id': user.id,
'username': user.username,
'full_name': user.get_full_name(),
'email': user.email,
}
Task Results Addition: context['tasks']
At the beginning of each task execution within a flow, the task results accumulated so far for the current element are merged into the context:
if self.ctx.flow and self.ctx.flow_context:
    self.ctx.context['tasks'] = self.ctx.flow_context[-1]['tasks']
This allows scripts to read results from previously-executed tasks of the same element within the current flow run.
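As a rough sketch of how a script might read those earlier results, the snippet below builds a sample flow_context entry and looks up one step. The helper previous_step_result, the task primary keys, and the sample data are hypothetical; only the "tasks" layout (task pk as string, then step name, then status/result) comes from this document.

```python
# Hypothetical sample data: one flow_context entry for the current element,
# with results already recorded for a task whose primary key is 12.
flow_context = [
    {
        "id": 42,
        "context": {"element": {"name": "router-01"}},
        "tasks": {
            "12": {"management_script": {"status": "OK", "result": {"vlan": 10}}},
        },
    }
]

# What the task runner does at the start of the next task.
context = {"element": {"name": "router-01"}}
context["tasks"] = flow_context[-1]["tasks"]


def previous_step_result(context, task_pk, step):
    """Return the 'result' of a step from an earlier task, or None if absent or failed."""
    step_entry = context.get("tasks", {}).get(str(task_pk), {}).get(step, {})
    if step_entry.get("status") != "OK":
        return None
    return step_entry.get("result")


vlan_info = previous_step_result(context, 12, "management_script")
missing = previous_step_result(context, 99, "management_script")
```

Using chained .get() calls keeps the script from failing when an earlier task was skipped or has not run yet.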
TaskContext Class
The TaskContext groups all execution-time variables for a single task run on a single element.
Source Location: apps/automation/task_utils.py → TaskContext
TaskContext Properties
class TaskContext:
    element       # Element ORM instance
    flow          # Flow ORM instance (or None if standalone task)
    flow_context  # list[dict]: the live flow context list (see Flow Context section)
    context       # dict: base element context (element_context_generate), modified in-flight
    user          # User ORM instance (or None)
Important: The context property is mutable throughout the pipeline. Management scripts, processing scripts, and the task runner itself add keys to it as the task progresses.
Flow Context (flow_context)
A Flow runs over one or more elements (devices). The flow_context is a list built progressively as the flow processes elements — one entry per element.
Source Location: apps/automation/flow_utils.py → FlowManager
Per-Element Entry Structure
For each element, the flow creates this structure:
{
"id": element.id, # element.id
"context": element_context_generate(element), # full element_context_generate() result
"tasks": {} # populated after each task finishes
}
Once tasks have run for the element, a populated entry looks like this:
{
"id": 42, # element.id
"context": { # full element_context_generate() result
... # all keys from Base Element Context section
},
"tasks": { # populated after each task finishes
"<task_pk_as_str>": {
# Keys depend on which steps ran:
"management_script": {
"status": "OK",
"result": {...}
},
"process_model": {
"status": "OK",
"result": "<rendered model string>"
},
"model_commands": {
"status": "OK",
"result": "<raw command output>"
},
"processing_model_out": {
"status": "OK",
"result": {...}
},
"element_data_mapped": {
"status": "OK",
"result": {...}
},
"execute_service": {
"status": "OK",
"result": {}
},
},
"<another_task_pk>": { ... },
}
}
The list grows as each element is processed by the flow:
- Serial flows: Process one element at a time
- Parallel flows: Launch all elements concurrently, but the list is still populated per-element
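The way the list grows can be sketched as below. Elements are plain dictionaries here, element_context_generate is a tiny stand-in for the real function, and record_step is a hypothetical helper; the real FlowManager may populate the entries differently.

```python
def element_context_generate(element):
    # Stand-in for the real function: returns a tiny context for illustration.
    return {"element": {"id": element["id"], "name": element["name"]}, "epoch": 1710000000}


def get_flow_context_for_element(element):
    # One entry per element, with an empty "tasks" dict to be filled later.
    return {"id": element["id"], "context": element_context_generate(element), "tasks": {}}


def record_step(flow_context, task_pk, step, status, result):
    """Store one step result under the newest entry, keyed by the task pk as a string."""
    task_results = flow_context[-1]["tasks"].setdefault(str(task_pk), {})
    task_results[step] = {"status": status, "result": result}


flow_context = []
for element in [{"id": 42, "name": "router-01"}, {"id": 43, "name": "router-02"}]:
    flow_context.append(get_flow_context_for_element(element))
    record_step(flow_context, 7, "model_commands", "OK", f"output from {element['name']}")
```

Because results are always written to flow_context[-1], this sketch mirrors the serial case, where the newest entry belongs to the element currently being processed.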
How Flow Context is Used
- The flow knows which element is being processed (id + context).
- The flow keeps task results in tasks as execution moves forward.
- Next flow steps can make decisions using those accumulated results.
Script Contexts
FLUX automation provides five distinct types of scripts, each receiving a different set of variables and serving specific purposes in the automation pipeline.
Flow Pre-Execution Script
Purpose: Runs once before any element loop, used to sort or filter the element list.
Script Type: processing_script on Flow
Source Location: FlowManager.flow_execute_processing_script()
Context Variables
{
"elements": list[Element], # full list of elements that will run the flow
"order_flow_elements": Callable, # sort_flow_elements(elements, sort_fn, reverse=False)
"filter_flow_elements": Callable, # filter_flow_elements(elements, filter_fn, inplace=True)
"settings": {}, # unused output placeholder
}
Output Expectations
- No explicit output expected
- The elements list is modified in place
Available Modules
math, json, random, naturaltime, re, ssh_key_generate, time
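A pre-execution script could look roughly like the last three lines of the sketch below. The two helper functions are stand-ins written only so the example runs: their signatures follow the comments above, but treating sort_fn as a sort key and filter_fn as a keep-predicate is an assumption, and the Element dataclass is a hypothetical substitute for the ORM model.

```python
from dataclasses import dataclass


@dataclass
class Element:  # hypothetical stand-in for the Element ORM model
    name: str
    is_active: bool


def sort_flow_elements(elements, sort_fn, reverse=False):
    # Stand-in: sorts the list in place using sort_fn as the key.
    elements.sort(key=sort_fn, reverse=reverse)


def filter_flow_elements(elements, filter_fn, inplace=True):
    # Stand-in: keeps only the elements for which filter_fn returns True.
    kept = [element for element in elements if filter_fn(element)]
    if inplace:
        elements[:] = kept
        return elements
    return kept


elements = [Element("router-02", True), Element("router-03", False), Element("router-01", True)]
order_flow_elements = sort_flow_elements  # the script sees the sorter under this name

# Body of a pre-execution script: drop inactive elements, then order by name.
filter_flow_elements(elements, lambda element: element.is_active)
order_flow_elements(elements, lambda element: element.name)
```

Both calls change the same list object, which is why the script does not need to return anything.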
Flow Execution Script
Purpose: The main flow script that calls run_task() / run_flow() to orchestrate execution.
Script Type: execution_script on Flow
Serial Flow Context
Execution: One invocation per element
Source Location: SerialFlowManager.run_script()
{
"context": dict, # flatten_dictionary(flow_context) — flattened snapshot
"flow_variables": {
"user": {
"id": ...,
"username": ...,
"full_name": ...,
"email": ...
} # or None if no user
},
"element": Element, # current element ORM instance
"elements": list[Element], # parent flow's elements (if nested), else None
"current": int, # index of current element (0-based)
"previous": int, # current - 1
"last": int, # len(elements) - 1
"first": 0,
"settings": {}, # unused
"run_task": Callable, # run_task(id=None, short_name=None, retry_attempts=1, retry_delay=0)
"run_flow": Callable, # run_flow(id=None, short_name=None, elements=None, ...)
}
Key Points:
- The run_task callable executes the task for the current element only
- After script execution, action_status and action_info are read from script_context and stored in the flow log
Magic Replacements: Applied to the script string before execution:
| Placeholder | Replaced With |
|---|---|
| current | str(current_order) |
| previous | str(current_order - 1) |
| last | str(flow_elements_count - 1) |
| first | "0" |
Parallel Flow Context
Execution: One invocation for all elements at once
Source Location: ParallelFlowManager.run_script()
{
"context": dict, # flatten_dictionary(flow_context)
"flow_variables": dict, # same as serial (see above)
"elements": list[Element], # all elements of this flow
"settings": {},
"run_task": Callable, # run_task_in_elements(id=, short_name=, elements=None, ...)
"run_flow": Callable, # run_flow_in_elements(id=, short_name=, elements=None, ...)
"filter_flow_elements": Callable,
}
Key Points:
- The run_task callable spawns one thread per element
Available Modules
math, json, random, naturaltime, re, ssh_key_generate, time, pause
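The one-thread-per-element behaviour of the parallel run_task callable can be sketched with the standard threading module. This is an illustration under assumptions: run_task_wrapper is a stand-in, elements are plain strings, and whether the real run_task_in_elements waits for all threads before returning is not stated in this document.

```python
import threading


def run_task_wrapper(element, results, lock):
    # Stand-in for the real per-element task run.
    outcome = {"status": "OK", "element": element}
    with lock:
        # Guard the shared dict so concurrent writes stay consistent.
        results[element] = outcome


def run_task_in_elements(elements):
    """Start one thread per element and wait for all of them to finish."""
    results = {}
    lock = threading.Lock()
    threads = [
        threading.Thread(target=run_task_wrapper, args=(element, results, lock))
        for element in elements
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results


results = run_task_in_elements(["router-01", "router-02", "router-03"])
```

Threads may finish in any order, so callers should key results by element rather than rely on position.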
TaskFlow Condition Script
Purpose: Executed before deciding whether to execute a task step for a given element.
Script Type: condition_value on TaskFlow
Source Location: task_flow_execute_condition_script()
Context Variables
{
"flow_context": list[dict], # full flow_context list (see Flow Context section)
"current": int, # current element order
"previous": int, # current - 1
"last": int, # len(elements) - 1
"first": 0,
"settings": {},
}
Output Requirements
The script must assign: condition = True or condition = False
Available Modules
math, json, random, naturaltime, re, ssh_key_generate, time
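A condition script that only runs a task step when an earlier task succeeded might look like the sketch below. The evaluate_condition harness is hypothetical and exists only to make the example runnable; it assumes that current can be used as an index into flow_context and that a script which never assigns condition counts as False.

```python
condition_script = """
previous_tasks = flow_context[current]["tasks"]
backup = previous_tasks.get("12", {}).get("model_commands", {})
condition = backup.get("status") == "OK"
"""

# Hypothetical sample data: task 12 succeeded for the first element only.
flow_context = [
    {"id": 42, "context": {}, "tasks": {"12": {"model_commands": {"status": "OK", "result": "..."}}}},
    {"id": 43, "context": {}, "tasks": {}},
]


def evaluate_condition(script, flow_context, current, element_count):
    namespace = {
        "flow_context": flow_context,
        "current": current,
        "previous": current - 1,
        "last": element_count - 1,
        "first": 0,
        "settings": {},
    }
    exec(script, namespace)
    # Assumption: a missing `condition` variable is treated as False.
    return bool(namespace.get("condition", False))


run_for_first = evaluate_condition(condition_script, flow_context, 0, 2)
run_for_second = evaluate_condition(condition_script, flow_context, 1, 2)
```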
Task Management Script
Purpose: Runs as Step 1 of the task pipeline, before any connection to elements. Used to enrich the context (e.g., resolve credentials, build payloads, call APIs, wait for element availability).
Script Type: management_script on Task
Source Location: task_execute_script(..., klass='mgmt')
A Task receives element context and can enrich it while it runs. When a task starts inside a flow:
element_context = element_context_generate(element)
element_context['flow'] = flow_variables if flow_variables else {}
Then, right before task execution, previous task results for that same element are injected:
self.ctx.context['tasks'] = self.ctx.flow_context[-1]['tasks']
Context Variables
{
"element": PublicElement, # wraps the Element (restricted write interface)
# only if 'element.' appears in the script text
"config": PublicElementConfig, # wraps ElementConfig (read/save last config)
# only if 'config.' appears in script text
"data": {}, # always empty for management scripts
"context": dict, # full element context dict (mutable — write here)
"flow_context": None, # NOT passed from run_management_script
"settings": {}, # not used as output for mgmt scripts
"pause": Callable, # pause(sleep=30, user_msg=None)
"wait_for_connect": Callable, # wait_for_connect(timeout=10, retry=1, sleep=30, user_msg=None)
"log_message": Callable, # log_message(level, msg)
}
Writing Output
# Add/update keys in element context
context['my_new_variable'] = 'value'
context['credentials'] = {'user': 'admin', 'pw': 'secret'}
Output Processing
- Extraction Method: result.pop('context', {})
- Integration: The entire context dict after execution is merged into TaskContext.context via .update()
If a management script returns new context data, it is merged directly:
self.ctx.context.update(script_out)
Available Modules
math, json, random, naturaltime, re, ssh_key_generate, time
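The pop-then-update handling of management script output can be sketched as follows. The harness function is a simplified, hypothetical stand-in for task_execute_script(..., klass='mgmt'): it assumes the script namespace is the result dictionary and omits element, config, and the helper callables.

```python
def run_management_script(script, task_context):
    # The script receives the element context under the name `context`.
    namespace = {"context": task_context, "data": {}, "settings": {}}
    exec(script, namespace)
    # Take whatever `context` is bound to after execution and merge it back.
    script_out = namespace.pop("context", {})
    task_context.update(script_out)
    return task_context


task_context = {"element": {"name": "router-01"}, "epoch": 1710000000}

# In-place mutation: the key is already in task_context before the merge.
run_management_script("context['resolved_ip'] = '10.0.0.1'", task_context)

# Rebinding `context` to a new dict: the merge is what carries the key over.
run_management_script("context = {'alert_level': 'info'}", task_context)
```

In this sketch, a script that rebinds context adds keys but never removes existing ones, because .update() only adds or overwrites.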
Task Processing Script
Purpose: Runs as Step 7, after model commands have been sent and parsed. Used to transform, validate, or enrich the data extracted from the element before it is stored in the database.
Script Type: processing_script on Task
Source Location: task_execute_script(..., klass='proc')
Context Variables
{
"element": PublicElement, # same as management script
"config": PublicElementConfig,
"data": dict, # current processing_out (command results, parsed data)
"context": dict, # full element context dict (read + write)
"flow_context": None, # NOT currently passed from run_processing_script
"settings": {}, # output dict — write results here
"pause": Callable,
"wait_for_connect": Callable,
"log_message": Callable,
}
Writing Output
settings['hostname'] = data.get('Hostname', '').strip()
settings['action_status'] = 'ok' # optional — stored in task results
settings['action_info'] = 'Hostname updated'
Special Output Keys
| Key | Purpose |
|---|---|
| action_status | Stored in TaskResults.action_status and flow log |
| action_info | Stored in TaskResults.action_info and flow log |
Output Processing
- Extraction Method: result.pop('settings', {})
- Integration: Merged into TaskManager.results.processing_out via .update()
Available Modules
math, json, random, naturaltime, re, ssh_key_generate, time
How Task Context is Used
The task can:
- Read base element data (element, epoch, settings/group variables)
- Read flow user data in context['flow']
- Read previous task outputs in context['tasks']
- Add more data for later steps
Concrete runtime behavior already used in code:
if not self.results.commands_out and self.ctx.context.get('automation_custom_output'):
    self.results.commands_out = self.ctx.context['automation_custom_output']
Model Python Script Context
For models with klass in ('pyms', 'pyps'), a Python script runs inside the model data mapping process.
Source Location: apps/automation/utils.py → model_data_map_python_script()
Context Variables
{
"model": Model, # the Automation Model ORM instance
"data": str, # raw text data received from element (for pyps)
# empty string for pyms (no raw data)
"context": dict, # full element context dict (read-only recommendation)
"output": {}, # write results here
"settings": {}, # alternative output dict
}
Writing Output
output['parsed_value'] = some_function(data)
match = re.search(r'hostname (\S+)', data)
output['hostname'] = match.group(1) if match else None  # avoid AttributeError when nothing matches
Output Processing
- Extraction Method: result.pop('output', {})
- Special Handling: If context was also modified, it is merged under output['context']
Additional Modules
math, json, random, naturaltime, re, time, boto3, botocore, collections, datetime, timedelta, gcp_bigquery, gcp_service_account, timezone, pytz
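The output handling for a pyps model script can be sketched as below. The run_model_script harness is hypothetical and is not model_data_map_python_script(): in particular, detecting a modified context by comparing a shallow copy with the original is an assumption, and nested changes would not be detected this way.

```python
import re


def run_model_script(script, data, context):
    # Give the script a shallow copy so top-level changes can be detected afterwards.
    namespace = {"data": data, "context": dict(context), "output": {}, "settings": {}, "re": re}
    exec(script, namespace)
    output = namespace.pop("output", {})
    script_context = namespace.pop("context", {})
    if script_context != context:
        # The script changed its context copy: expose it under output["context"].
        output["context"] = script_context
    return output


script = r"""
match = re.search(r'hostname (\S+)', data)
output['hostname'] = match.group(1) if match else None
context['parsed_by'] = 'pyps'
"""

result = run_model_script(script, "hostname core-sw1\nversion 17.3.4", {"epoch": 1710000000})
```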
Service Execution Context (email-msg)
Purpose: Handles email service execution during task completion.
Source Location: apps/automation/utils.py → task_execute_service()
Called In: TaskManager.execute_service() (Step 9)
A Service (email service in this case) uses the context available at service execution time.
Email Context Assembly
At service step, the email context is assembled exactly like this:
email_context = {
# ── Fixed Keys ──────────────────────────────────────────────────────────────
'element': element, # raw Element ORM instance
'task': task, # Task ORM instance (name, short_name, id, ...)
'body_custom_msg': notification_content, # 'content' param from service settings
# ── Task Context (spread) ───────────────────────────────────────────────────
# Everything from TaskContext.context at the time execute_service runs:
# - all base element context keys (group variables, element settings, context['element'], epoch)
# - context['flow'] = flow_variables (if running inside a flow)
# - context['tasks'] = previous tasks results for this element in this flow
# - any keys added by management_script or processing_script
**task_context,
# ── Flow Context Entry (spread) ─────────────────────────────────────────────
# The flow_context[-1] dict for the current element (see Flow Context section), or {} if standalone:
# - "id": element.id
# - "context": element_context_generate(element) (original snapshot)
# - "tasks": { "<task_pk>": task_detail, ... }
**flow_context_entry,
}
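One consequence of assembling the dictionary with two spreads is plain Python behaviour: when the same key appears more than once in a dict literal, the last occurrence wins. The sketch below uses placeholder strings and hypothetical sample values to show which source supplies a key that exists in several places; if the real code orders the entries differently, the precedence changes accordingly.

```python
task_context = {
    "element": {"name": "router-01"},  # serialized element from the base context
    "tasks": {"12": {"status": "from task_context"}},
    "alert_level": "critical",  # added by a management script
}
flow_context_entry = {
    "id": 42,
    "context": {"element": {"name": "router-01"}},
    "tasks": {"12": {"status": "from flow_context_entry"}},
}

email_context = {
    "element": "<Element ORM instance>",  # fixed key, placeholder value
    "task": "<Task ORM instance>",
    "body_custom_msg": "Device check finished",
    **task_context,        # overrides fixed keys with the same name
    **flow_context_entry,  # overrides both of the above
}
```

With this ordering, a script key named element, task, id, context, or tasks would be shadowed or would shadow a fixed key, so distinct names for custom variables are the safer choice.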
Service Settings Parameters
These parameters are configured on the Service instance of type email-msg:
| Parameter | Usage |
|---|---|
| from-email | Sender address |
| custom_to | Comma-separated recipient list |
| subject | Email subject (Django template rendered) |
| content | body_custom_msg in the email context |
| template | Template ID (optional, selects HTML template) |
Injecting Custom Variables
The task_context received by task_execute_service() is directly self.ctx.context — the same mutable dictionary that circulates through the entire task pipeline. Since the service runs in step 9, results from both the management script (step 1) and processing script (step 7) are available.
Management Script → Official Channel
Script results are applied with .update() to ctx.context:
# In task management_script (klass='mgmt'):
context['recipient_email'] = element.get('email_contact', '')
context['alert_level'] = 'critical' if context.get('status') == 'down' else 'info'
context['custom_subject'] = f"[{element['name']}] Alert processed"
# All these keys will be available in the email template
Processing Script → Direct Dictionary Mutation
The processing script receives the same context object by reference. Its official output (settings) goes to processing_out (and from there to the database), not to ctx.context. However, since the dictionary is passed by reference, writing directly to context within the processing script also persists and reaches the service:
# In task processing_script:
settings['sw_version'] = data.get('version', '') # → processing_out → Database
# To pass something additional to the service:
context['parsed_hostname'] = data.get('hostname', '') # → persists in ctx.context → reaches service
Data Flow Summary
| Source | Destination | Available in email_context? |
|---|---|---|
| context['x'] = val in management script | ctx.context via .update() | ✓ Yes |
| context['x'] = val in processing script | ctx.context direct (by reference) | ✓ Yes |
| settings['x'] = val in processing script | results.processing_out → Database | ✗ No (unless also copied to context) |
| flow_variables (user info from flow) | context['flow'] | ✓ Yes (as context['flow']['user']) |
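The difference between the two processing-script channels can be reproduced in a few lines. This is a simplified sketch with hypothetical sample data: the script namespace is reduced to data, context, and settings, and the email context is reduced to one fixed key plus the spread task context.

```python
ctx_context = {"element": {"name": "router-01"}}
processing_out = {"version": "17.3.4", "hostname": "router-01"}

script = """
settings['sw_version'] = data.get('version', '')
context['parsed_hostname'] = data.get('hostname', '')
"""

# The script gets the same dict object as ctx_context, not a copy.
namespace = {"data": processing_out, "context": ctx_context, "settings": {}}
exec(script, namespace)

# Official output channel: settings are merged into processing_out.
processing_out.update(namespace.pop("settings", {}))

# The service later receives ctx_context, not processing_out.
email_context = {"body_custom_msg": "done", **ctx_context}
```

Only the key written to context reaches the email context; the key written to settings ends up in processing_out alone.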
How Service Context is Used
Email subject/body and template can use:
- Task and element data
- Anything previously added to task_context
- If the flow already has task results, those are also available through the merged context
Context Lifecycle Within Task Execution
The following table summarizes which context is available at each step of TaskManager.execute():
| Step | Method | Context at Entrance | Context Modifications |
|---|---|---|---|
| — | execute() starts | Base element context | context['tasks'] added if inside flow |
| 1 | run_management_script() | Base context | Script can add/modify any key in context |
| 2 | element_data_mapping() | Context enriched by step 1 | results.model_out set |
| 3–6 | connect_with_element() … | Same context | results.commands_out, processing_out built |
| 7 | run_processing_script() | Same context + data=processing_out | results.processing_out updated from settings; action_status/action_info stored |
| 8 | mapping_result_to_database() | Same context + processing_out | Element parameters saved to database |
| 9 | execute_service() | Same context | Email sent; service_out merged into processing_out |
| 10 | finish_execution() | Final context | Task log created; flow_context[-1]['tasks'][task_pk] updated |
Context Lifecycle Within Serial Flow Execution
FlowManager.flow_execute()
│
├─ flow_execute_processing_script() ← receives: elements list (can sort/filter)
│
└─ for each element:
flow_context.append(get_flow_context_for_element(element))
│
└─ SerialFlowManager.run_script(element, order)
│ receives: flattened flow_context, element, positions, run_task, run_flow
│
└─ run_task(id, short_name)
│
└─ run_task_wrapper()
│ element_context = element_context_generate(element)
│ element_context['flow'] = flow_variables
│
└─ TaskManager.execute()
├─ context['tasks'] = flow_context[-1]['tasks']
├─ run_management_script() → context enriched
├─ element_data_mapping()
├─ connect_with_element()
├─ model_commands_execution()
├─ custom_model_commands_output_processing()
├─ convert_command_results...()
├─ run_processing_script() → processing_out enriched
├─ mapping_result_to_database()
├─ execute_service() → email sent with full context
└─ finish_execution()
└─ flow_context[-1]['tasks'][task_pk] = task_detail
Context Lifecycle Within Parallel Flow Execution
ParallelFlowManager
│
├─ for each element: flow_context.append(get_flow_context_for_element(element))
│
└─ run_script()
│ receives: flattened flow_context, all elements, run_task_in_elements
│
└─ run_task_in_elements(id, short_name, elements=None)
│
└─ spawns one thread per element → run_task_wrapper() (same as serial)
Key Difference: The parallel flow script sees all elements at once. The context variable in the script is the flattened snapshot of flow_context (all elements merged), not a per-element view.
Variable Availability Reference
This table shows which variables are available in each script type:
| Variable | Flow Pre-exec | Flow Exec (Serial) | Flow Exec (Parallel) | Task Condition | Task Mgmt Script | Task Proc Script | Model pyms/pyps | Service (Email) |
|---|---|---|---|---|---|---|---|---|
| element (Element ORM) | ✗ | ✓ | ✗ | ✗ | ✓ (PublicElement) | ✓ (PublicElement) | ✗ | ✓ |
| elements (list) | ✓ | ✓ (parent) | ✓ | ✗ | ✗ | ✗ | ✗ | ✗ |
| context (element ctx dict) | ✗ | ✓ (flattened) | ✓ (flattened) | ✗ | ✓ (mutable) | ✓ (read) | ✓ (read) | ✓ (spread) |
| flow_context (list) | ✗ | ✗ | ✗ | ✓ | ✗ | ✗ | ✗ | ✓ (latest entry) |
| flow_variables (user info) | ✗ | ✓ | ✓ | ✗ | ✗ | ✗ | ✗ | ✗ |
| data | ✗ | ✗ | ✗ | ✗ | ✓ (empty) | ✓ (processing_out) | ✓ (raw text) | ✗ |
| settings (output) | ✗ | ✗ | ✗ | ✗ | ✗ | ✓ write here | ✗ | ✗ |
| output (output) | ✗ | ✗ | ✗ | ✗ | ✗ | ✗ | ✓ write here | ✗ |
| condition (output) | ✗ | ✗ | ✗ | ✓ write here | ✗ | ✗ | ✗ | ✗ |
| run_task | ✗ | ✓ | ✓ | ✗ | ✗ | ✗ | ✗ | ✗ |
| run_flow | ✗ | ✓ | ✓ | ✗ | ✗ | ✗ | ✗ | ✗ |
| current/previous/last/first | ✗ | ✓ | ✗ | ✓ | ✗ | ✗ | ✗ | ✗ |
| config (PublicElementConfig) | ✗ | ✗ | ✗ | ✗ | ✓ | ✓ | ✗ | ✗ |
| pause(sleep) | ✓ | ✓ | ✓ | ✗ | ✓ | ✓ | ✗ | ✗ |
| wait_for_connect(...) | ✗ | ✗ | ✗ | ✗ | ✓ | ✓ | ✗ | ✗ |
| task (Task ORM) | ✗ | ✗ | ✗ | ✗ | ✗ | ✗ | ✗ | ✓ |
| body_custom_msg | ✗ | ✗ | ✗ | ✗ | ✗ | ✗ | ✗ | ✓ |
Writing to Context from Scripts
Management Script → Context Enrichment
context['resolved_ip'] = '10.0.0.1'
context['credentials'] = {'username': 'admin', 'password': 'secret'}
# These are available to all subsequent steps in the same task execution
Processing Script → Settings Population
settings['hostname'] = data.get('hostname', '').strip()
version_match = re.search(r'Version (\S+)', data.get('version', ''))
settings['sw_version'] = version_match.group(1) if version_match else ''  # no match leaves it empty
settings['action_status'] = 'ok' # or 'warning', 'error'
settings['action_info'] = 'Parsed 3 parameters successfully'
# Keys in settings are merged into processing_out and saved to ElementSetting
Model pyms/pyps Script → Output Writing
output['interface_count'] = len(re.findall(r'^interface', data, re.M))
output['mgmt_address'] = context.get('element', {}).get('management_address')
Flow Execution Script → Results Reading
# Each run_task call stores its results in flow_context[-1]['tasks'].
# The next task started for the same element receives them as context['tasks'].
run_task(short_name='collect-hw')
run_task(short_name='send-report') # can read results from collect-hw via context['tasks']
Summary: End-to-End Context Flow
| Case | Real context source | Practical use |
|---|---|---|
| Flow | {"id", "context", "tasks"} per element + flow_variables['user'] | Coordinate and track full flow execution |
| Task | element_context_generate(...) + flow + tasks + script updates | Run task logic with up-to-date shared data |
| Service | email_context = fixed keys + merged task/flow context | Send dynamic email content with real execution data |
Real Path Through System
- Flow creates per-element context: id, context, tasks.
- Task starts and receives base context + flow_variables.
- Task injects previous task results into context['tasks'].
- Management/processing steps enrich available data.
- Service runs and receives merged email_context (element + task + accumulated context).
Result: Services run with real, current data from the same execution — no manual copy/paste. Flows create and organize context, tasks enrich and consume it, and services use it to execute with full, real execution state.