Network Device Collection
Quick start: create an inventory.yaml file listing your devices, then run three commands in sequence: meshq collect --source inventory.yaml → meshq parse → meshq ingest. Your topology graph is populated and queryable immediately. See §5.3 for the full pipeline.
MeshOptixIQ collects operational state from network devices over SSH — no agents, no SNMP polling, no proprietary software on the devices. The collection pipeline has three stages: collect (raw CLI output cached to disk), parse (normalise to vendor-agnostic Pydantic models), and ingest (write to the graph database). Each stage can be run independently, making it easy to re-parse after adding a new vendor parser or to re-ingest after a schema update.
5.1 Supported Vendors & OS Types
| Vendor | OS / Platform | Facts Collected |
|---|---|---|
| Cisco | IOS / IOS-XE | Interfaces, IPs, ARP, MAC table, VLANs, neighbors (CDP), device info |
| Arista | EOS | Interfaces, IPs, ARP, MAC table, neighbors (LLDP), device info |
| Juniper | JunOS | Interfaces, IPs, ARP, neighbors (LLDP), security policies, address objects, device info |
| Aruba | ArubaOS | Interfaces, IPs, ARP, MAC table, VLANs |
| Palo Alto | PAN-OS | Interfaces, ARP, system info, security policies, address/service objects |
| Fortinet | FortiOS | Security policies, address objects, device info |
| Cisco | ASA OS | ACLs (access-lists), object-groups, device info |
5.2 Inventory File Format
Devices are defined in a YAML inventory file. By convention this file is named inventory.yaml and stored in the project directory.
devices:
  # Cisco IOS core switch
  - hostname: sw-core-01
    host: 10.0.0.1
    device_type: cisco_ios
    username: netops
    password: "{{ env.SWITCH_PASSWORD }}"
    port: 22
    timeout: 30
  # Arista EOS distribution switch
  - hostname: sw-dist-01
    host: 10.0.0.2
    device_type: arista_eos
    username: netops
    password: "{{ env.SWITCH_PASSWORD }}"
  # PAN-OS firewall (includes firewall policy collection)
  - hostname: fw-edge-01
    host: 10.0.0.10
    device_type: paloalto_panos
    username: admin
    password: "{{ env.FW_PASSWORD }}"
  # Juniper SRX firewall
  - hostname: fw-srx-01
    host: 10.0.0.11
    device_type: juniper_junos
    username: netops
    password: "{{ env.FW_PASSWORD }}"
  # Fortinet FortiGate
  - hostname: fw-fg-01
    host: 10.0.0.12
    device_type: fortinet
    username: admin
    password: "{{ env.FW_PASSWORD }}"
  # Cisco ASA
  - hostname: fw-asa-01
    host: 10.0.0.13
    device_type: cisco_asa
    username: cisco
    password: "{{ env.FW_PASSWORD }}"
Supported device_type Values
| device_type | Vendor / OS |
|---|---|
| cisco_ios | Cisco IOS, IOS-XE |
| arista_eos | Arista EOS |
| juniper_junos | Juniper JunOS (MX, EX, SRX) |
| aruba_os | Aruba ArubaOS |
| paloalto_panos | Palo Alto PAN-OS |
| fortinet | Fortinet FortiOS |
| cisco_asa | Cisco ASA |
Credential Security
Never store credentials in plain text in inventory files. Use environment variable references ({{ env.VAR_NAME }}) or a secrets manager. For Docker deployments, use Docker Secrets or pass credentials via environment variables.
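As an illustration of how such references behave, {{ env.VAR_NAME }} interpolation amounts to substitution against the process environment. The sketch below is hypothetical (resolve_env_refs is not part of the MeshOptixIQ API); failing fast on an unset variable avoids connecting with an empty password:

```python
import os
import re

# Matches {{ env.VAR_NAME }} with optional internal whitespace
_ENV_REF = re.compile(r"\{\{\s*env\.([A-Za-z_][A-Za-z0-9_]*)\s*\}\}")

def resolve_env_refs(value: str) -> str:
    """Expand {{ env.VAR_NAME }} references from the process environment.

    Raises KeyError if a referenced variable is unset, so a missing
    credential is caught before any SSH connection is attempted.
    """
    return _ENV_REF.sub(lambda m: os.environ[m.group(1)], value)
```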
5.3 Running Collection
Collect Raw Data
# Collect from all devices in the inventory file
meshq collect --source inventory.yaml
# Collect from a specific device (by hostname)
meshq collect --source inventory.yaml --device sw-core-01
# Collect with a higher timeout (seconds, default 30)
meshq collect --source inventory.yaml --timeout 60
# Run in parallel (default 4 workers; increase for large networks)
meshq collect --source inventory.yaml --workers 8
Raw CLI output is saved to ~/.meshoptixiq/cache/<hostname>/. Each collection run overwrites the previous cache for that device.
SSH Key Authentication
To authenticate with SSH keys instead of a password, set use_keys: true and key_file: /path/to/key.pem in the device definition, or configure SSH agent forwarding.
Check Collection Status
meshq status
# Shows last collection time, device count, and any collection errors
Scheduling Regular Collection
For continuous network intelligence, schedule collection with cron or a container orchestrator:
# Cron example — collect every 4 hours
0 */4 * * * /usr/local/bin/meshq collect --source /opt/meshoptixiq/inventory.yaml \
&& /usr/local/bin/meshq parse \
&& /usr/local/bin/meshq ingest
Parse and Ingest
# Parse all cached device data
meshq parse
# Parse a specific device only
meshq parse --device sw-core-01
# Ingest parsed data into the configured graph backend
meshq ingest
# Ingest and reinitialise schema constraints first
meshq ingest --init-schema
# One-command pipeline: collect, parse, and ingest in sequence
meshq collect --source inventory.yaml \
&& meshq parse \
&& meshq ingest
Parsing produces normalised Pydantic models and reports counts: devices, interfaces, IPs, MACs, VLANs, endpoints. Parsing errors are reported per-device and do not abort the run.
Verifying Ingested Data
# Check summary counts via the API
curl -H "X-API-Key: $API_KEY" \
http://localhost:8000/queries/summary_stats/execute \
-H "Content-Type: application/json" \
-d '{"parameters": {}}'
# Expected response
{
  "total": 1,
  "rows": [{
    "device_count": 12,
    "interface_count": 247,
    "endpoint_count": 1843,
    "vlan_count": 18
  }]
}
5.4 Distributed Collection Pro+
For large networks or horizontally-scaled deployments, MeshOptixIQ supports a distributed collection model using a Redis work queue. Multiple collector worker processes pop device tasks from a shared queue, SSH into the devices in parallel, and parse and ingest the results into the configured graph backend.
Prerequisites
Distributed collection requires Redis. Set the REDIS_URL environment variable on all API and worker instances:
export REDIS_URL=redis://redis:6379
Dispatching Devices to the Queue
# Push all devices from inventory.yaml to the Redis work queue
meshq collect --dispatch --source inventory.yaml
# The dispatcher is typically run as a one-shot job
# and does not block — it exits once all devices are enqueued.
Running Collection Workers
# Start a single worker (pop → SSH → ingest loop)
meshq collect --worker
# Customise the polling interval (seconds, default 5)
meshq collect --worker --poll-interval 5
Each worker continuously pops device tasks from the queue, collects raw CLI output, parses it, and ingests the result into the configured graph backend. Workers exit cleanly once the queue is empty.
Docker Compose Cluster Setup
The docker-compose.cluster.yml file in the repository provides a ready-to-use configuration with two collector workers and a one-shot dispatcher service:
# Start the full cluster stack (API + Redis + collectors + dispatcher)
docker compose -f docker-compose.cluster.yml up -d
# Scale workers independently
docker compose -f docker-compose.cluster.yml up -d --scale collector=4
# Check queue status
curl -H "X-API-Key: $API_KEY" http://localhost:8000/collect/status
API Endpoints for Queue Management
| Method | Path | Description |
|---|---|---|
| GET | /collect/status | Queue depth, processing count, completed count |
| POST | /collect/dispatch | Enqueue all devices from the server-side inventory |
| POST | /collect/recover | Re-enqueue stale tasks (devices stuck in "processing" state) |
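For scripted use, these endpoints can be driven from any HTTP client. A minimal standard-library sketch (the base URL and API key are deployment-specific; queue_request is illustrative, not part of the product):

```python
import json
import urllib.request

def queue_request(base_url, path, api_key, method="GET", payload=None):
    """Build an authenticated request for the queue-management endpoints
    listed above (e.g. /collect/status, /collect/dispatch, /collect/recover)."""
    data = json.dumps(payload).encode() if payload is not None else None
    req = urllib.request.Request(base_url + path, data=data, method=method)
    req.add_header("X-API-Key", api_key)
    if data is not None:
        req.add_header("Content-Type", "application/json")
    return req

# Example: re-enqueue stale tasks (requires a running API instance)
# urllib.request.urlopen(
#     queue_request("http://localhost:8000", "/collect/recover", API_KEY, "POST"))
```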
5.5 Ansible Dynamic Inventory Export Pro+
MeshOptixIQ can generate an Ansible-compatible dynamic inventory from the graph, grouping devices by vendor and automatically placing firewall devices into a dedicated firewalls group.
CLI Export
# Export to stdout (JSON format, compatible with ansible-inventory --list)
meshq export --format ansible
# Save to a file
meshq export --format ansible --output /etc/ansible/inventory.json
# INI format (legacy Ansible format)
meshq export --format ansible-ini --output /etc/ansible/inventory.ini
REST API
# JSON format (default)
GET /inventory/ansible
X-API-Key: your-api-key
# Legacy INI format
GET /inventory/ansible?format=ini
X-API-Key: your-api-key
Example JSON Output
{
  "all": {
    "hosts": {
      "sw-core-01": {
        "ansible_host": "10.0.0.1",
        "ansible_user": "netops",
        "vendor": "cisco",
        "os_version": "17.3.4"
      },
      "fw-edge-01": {
        "ansible_host": "10.0.0.10",
        "ansible_user": "admin",
        "vendor": "paloalto",
        "os_version": "10.2.3"
      }
    },
    "children": ["cisco", "paloalto", "firewalls"]
  },
  "cisco": {
    "hosts": ["sw-core-01", "sw-dist-01"]
  },
  "paloalto": {
    "hosts": ["fw-edge-01"]
  },
  "firewalls": {
    "hosts": ["fw-edge-01", "fw-srx-01", "fw-fg-01"]
  },
  "_meta": {
    "hostvars": { ... }
  }
}
Devices that have FirewallRule nodes in the graph are automatically placed in the firewalls group in addition to their vendor group. This allows playbooks to target all firewalls without manually maintaining a separate inventory group.
Using with ansible-inventory
# Use as a dynamic inventory script
ansible-inventory -i "http://localhost:8000/inventory/ansible" --list
# Or reference the exported file
ansible-inventory -i /etc/ansible/inventory.json --list
# Run a playbook against firewall devices
ansible-playbook -i /etc/ansible/inventory.json \
--limit firewalls \
playbooks/audit_fw_config.yaml
5.8 Async SSH Collection with Scrapli Pro+
Scrapli replaces Netmiko as the default SSH transport for large-scale deployments. Whereas Netmiko spawns one OS thread per device (~8 MB stack each), Scrapli runs all sessions on a single asyncio event loop, enabling 200+ concurrent SSH connections with no GIL contention.
Installation
pip install 'meshoptixiq-network-discovery[scrapli]'
# Installs scrapli[asyncio]>=2024.1 with asyncssh transport
Vendor Platform Mapping
| Inventory vendor value | Scrapli platform | Notes |
|---|---|---|
| cisco_ios | cisco_iosxe | IOS and IOS-XE share a driver |
| cisco_xe | cisco_iosxe | |
| cisco_xr | cisco_iosxr | IOS-XR uses a distinct driver |
| cisco_nxos | cisco_nxos | NX-OS on Nexus hardware |
| arista_eos | arista_eos | |
| juniper_junos | juniper_junos | |
| anything else | generic | Uses AsyncGenericDriver; reduced command set |
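The table reduces to a dictionary lookup with a generic fallback. A hypothetical sketch of the logic (not the library's actual code):

```python
# Vendor-to-platform mapping from the table above; anything absent
# falls back to Scrapli's generic driver with a reduced command set.
SCRAPLI_PLATFORM = {
    "cisco_ios": "cisco_iosxe",
    "cisco_xe": "cisco_iosxe",
    "cisco_xr": "cisco_iosxr",
    "cisco_nxos": "cisco_nxos",
    "arista_eos": "arista_eos",
    "juniper_junos": "juniper_junos",
}

def scrapli_platform(vendor: str) -> str:
    return SCRAPLI_PLATFORM.get(vendor, "generic")
```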
Inventory Format
Same YAML format as Netmiko. Add the optional key_file field to use SSH key authentication instead of a password:
- hostname: core-sw-01
  host: 10.0.0.1
  vendor: cisco_ios
  port: 22
  username: admin
  password_env: DEVICE_PASSWORD # env var holding the password
  key_file: /etc/meshq/keys/id_ed25519 # optional: key auth
Timeouts & Retry Behaviour
| Setting | Default | Notes |
|---|---|---|
| Socket connect timeout | 10 s | TCP SYN → ACK window |
| Transport (auth) timeout | 30 s | SSH handshake + credential exchange |
| Operations timeout | 120 s | Per-command read deadline |
| Retry attempts | 3 | Back-off: 2 s × attempt number |
| Auth-failure retry | disabled | Wrong credentials are not retried |
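The retry behaviour in the table can be sketched as a wrapper around a per-device connect coroutine. Everything below is illustrative (collect_with_retry and the auth-failure markers are assumptions, not library code):

```python
import asyncio

# Substrings that indicate an authentication failure (illustrative markers)
AUTH_MARKERS = ("authentication", "permission denied")

async def collect_with_retry(connect, attempts=3, base_delay=2.0,
                             sleep=asyncio.sleep):
    """Retry a collection coroutine with linear back-off (2 s x attempt).

    Auth failures are raised immediately: retrying wrong credentials
    only risks locking the account.
    """
    last_exc = None
    for attempt in range(1, attempts + 1):
        try:
            return await connect()
        except Exception as exc:
            if any(m in str(exc).lower() for m in AUTH_MARKERS):
                raise  # auth-failure retry is disabled
            last_exc = exc
            if attempt < attempts:
                await sleep(base_delay * attempt)  # 2 s, 4 s, ...
    raise last_exc
```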
Running Async Collection
import asyncio
from network_discovery.collectors.scrapli_collector import collect_all_async
# Collect all devices, up to 200 concurrent SSH sessions
results = asyncio.run(collect_all_async("inventory.yaml", concurrency=200))
print(f"Collected {len(results)} devices")
To collect only specific fact types (e.g., for delta updates — see §5.9):
results = asyncio.run(
    collect_all_async(
        "inventory.yaml",
        concurrency=200,
        fact_types={"interfaces", "bgp"},  # only run tagged commands
    )
)
Each concurrent SSH session consumes a file descriptor, so raise the open-file limit before running at high concurrency: ulimit -n 4096. In Docker, set --ulimit nofile=4096:4096.
5.9 Event-Driven Delta Collection
Full network sweeps spike CPU on production hardware and can take minutes on large inventories. Delta collection solves this by listening for syslog events and immediately SSH-ing into the affected device to refresh only the relevant fact categories — completing in seconds, not minutes.
Architecture
A syslog receiver feeds events to match_syslog_trigger(). When a pattern matches, handle_trigger() locates the device in the managed inventory and calls collect_device_async() with the minimum fact_types set required to refresh the affected graph nodes.
Syslog Trigger Patterns
| Trigger category | Example mnemonic | Fact types refreshed |
|---|---|---|
| BGP peer change | BGP-5-ADJCHANGE | bgp, routing |
| OSPF adjacency | OSPF-5-ADJCHG | ospf, routing |
| IS-IS adjacency | ISIS-5-ADJ_CHANGE | isis, routing |
| EIGRP topology | DUAL-5-NBRCHANGE | routing |
| MPLS LDP | LDP-5-NBRCHG | mpls, routing |
| Interface state (IOS) | LINEPROTO-5-UPDOWN | interfaces, interface_details, neighbors |
| Link down (IOS) | LINK-3-UPDOWN | interfaces, interface_details |
| Interface state (NX-OS) | ETHPORT-5-IF_ADMIN_UP | interfaces, interface_details |
| Config change | SYS-5-CONFIG_I | device_info, interfaces, vlans |
| VLAN added | SW_VLAN-6-VLAN_CREATED | vlans |
| ARP conflict | IP-4-DUPADDR | arp_table |
| MAC flap | MAC_MOVE-SP-4-NOTIF | mac_table, arp_table |
| ACL deny | IPACCESSLOG-6-LOGDENY | acl, security |
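Conceptually, trigger matching is a lookup from syslog mnemonic to the fact types that need refreshing. A simplified sketch covering a subset of the table (names and structure are illustrative, not the module's internals):

```python
import re
from dataclasses import dataclass

# Subset of the trigger table above; keys are syslog mnemonics
TRIGGER_FACTS = {
    "BGP-5-ADJCHANGE": {"bgp", "routing"},
    "LINEPROTO-5-UPDOWN": {"interfaces", "interface_details", "neighbors"},
    "SYS-5-CONFIG_I": {"device_info", "interfaces", "vlans"},
    "SW_VLAN-6-VLAN_CREATED": {"vlans"},
}

@dataclass
class Trigger:
    mnemonic: str
    fact_types: set

def lookup_trigger(syslog_line: str):
    """Return the first matching trigger for a raw syslog line, else None."""
    for mnemonic, facts in TRIGGER_FACTS.items():
        if re.search(re.escape(mnemonic), syslog_line):
            return Trigger(mnemonic, facts)
    return None
```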
Integration
from network_discovery.collectors.delta_collector import (
    match_syslog_trigger,
    handle_trigger,
)

# In your syslog processing loop:
trigger = match_syslog_trigger(event)  # returns SyslogTrigger | None
if trigger:
    await handle_trigger(trigger, inventory_devices)
5.10 Parse Worker: CPU Offloading via Redis
SSH collection is I/O-bound; TextFSM and regex parsing are CPU-bound. Running both in the same process creates GIL contention that degrades SSH throughput at scale. The Parse Worker decouples them: the SSH collector enqueues raw outputs to Redis, and one or more Parse Worker pods drain the queue asynchronously.
Queue Architecture
The queue uses the Redis RPUSH / BLPOP pattern — inherently ordered, durable, and horizontally scalable. Multiple worker pods BLPOP from the same list: no coordination or leader election needed.
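The FIFO guarantee the workers rely on can be modelled in a few lines; this is an in-memory stand-in for the Redis list (a real BLPOP blocks until an item arrives):

```python
from collections import deque

# In-memory model of the Redis list semantics the queue relies on:
# RPUSH appends at the tail, BLPOP pops from the head, giving strict FIFO.
queue = deque()

def rpush(item):
    """Producer side: the SSH collector enqueues raw output."""
    queue.append(item)

def blpop():
    """Consumer side: a parse worker drains the queue (real BLPOP blocks)."""
    return queue.popleft() if queue else None

rpush("raw:sw-core-01")
rpush("raw:sw-dist-01")
assert blpop() == "raw:sw-core-01"  # first enqueued, first parsed
```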
Environment Variables
| Variable | Default | Effect |
|---|---|---|
| MESHQ_PARSE_QUEUE | — | Set to any truthy value (1, true) to activate queue mode. When set, save_raw_output() enqueues automatically instead of parsing inline. |
| MESHQ_PARSE_CONCURRENCY | 4 | Number of concurrent parse tasks per worker pod. |
| REDIS_URL | redis://localhost:6379 | Redis connection string. Use redis+cluster:// for Redis Cluster HA. |
| GRAPH_BACKEND | inmemory | Graph backend for ingest after parsing (neo4j / postgres / inmemory). |
Activation & Startup
# Activate queue mode (SSH collector will enqueue instead of parsing inline)
export MESHQ_PARSE_QUEUE=1
export REDIS_URL=redis://redis:6379
# Start a parse worker (two equivalent forms)
meshq-parse-worker
# Or directly via Python
python -m network_discovery.collectors.parse_worker
DLQ Monitoring
Items that fail parsing or graph ingest are moved to the dead-letter queue with the original payload and error message preserved. Use redis-cli to inspect and retry:
# Check queue depths
redis-cli LLEN meshq:collect:parse_queue # pending items
redis-cli LLEN meshq:collect:parse_dlq # failed items
# Inspect a DLQ item (non-destructive)
redis-cli LINDEX meshq:collect:parse_dlq 0
# Retry a failed item: move from DLQ back to main queue
redis-cli RPOPLPUSH meshq:collect:parse_dlq meshq:collect:parse_queue
Horizontal Scaling
Add worker pods freely — each is stateless and independently BLPOPs from the shared list. In Kubernetes, scale with kubectl scale deployment meshq-parse-worker --replicas=8. Each pod can tune MESHQ_PARSE_CONCURRENCY independently based on available CPU.
Each DLQ entry preserves two fields: raw (the original raw output JSON) and error (the exception message). This retains enough context for manual inspection. To re-process an item, push its raw value back to meshq:collect:parse_queue.