Chapter 5

Network Device Collection

Quick Start

Create an inventory.yaml file with your devices, then run three commands in sequence: meshq collect --source inventory.yaml, then meshq parse, then meshq ingest. Your topology graph will be populated and queryable immediately. See §5.3 for the full pipeline.

MeshOptixIQ collects operational state from network devices over SSH — no agents, no SNMP polling, no proprietary software on the devices. The collection pipeline has three stages: collect (raw CLI output cached to disk), parse (normalise to vendor-agnostic Pydantic models), and ingest (write to the graph database). Each stage can be run independently, making it easy to re-parse after adding a new vendor parser or to re-ingest after a schema update.

5.1 Supported Vendors & OS Types

| Vendor | OS / Platform | Facts Collected |
| --- | --- | --- |
| Cisco | IOS / IOS-XE | Interfaces, IPs, ARP, MAC table, VLANs, neighbors (CDP), device info |
| Arista | EOS | Interfaces, IPs, ARP, MAC table, neighbors (LLDP), device info |
| Juniper | JunOS | Interfaces, IPs, ARP, neighbors (LLDP), security policies, address objects, device info |
| Aruba | ArubaOS | Interfaces, IPs, ARP, MAC table, VLANs |
| Palo Alto | PAN-OS | Interfaces, ARP, system info, security policies, address/service objects |
| Fortinet | FortiOS | Security policies, address objects, device info |
| Cisco | ASA OS | ACLs (access-lists), object-groups, device info |
Custom Parsers
Enterprise plans support custom vendor parsers. Contact hello@meshoptixiq.com to discuss integration of additional vendors or OS versions.

5.2 Inventory File Format

Devices are defined in a YAML inventory file. By convention this file is named inventory.yaml and stored in the project directory.

devices:
  # Cisco IOS core switch
  - hostname: sw-core-01
    host: 10.0.0.1
    device_type: cisco_ios
    username: netops
    password: "{{ env.SWITCH_PASSWORD }}"
    port: 22
    timeout: 30

  # Arista EOS distribution switch
  - hostname: sw-dist-01
    host: 10.0.0.2
    device_type: arista_eos
    username: netops
    password: "{{ env.SWITCH_PASSWORD }}"

  # PAN-OS firewall (includes firewall policy collection)
  - hostname: fw-edge-01
    host: 10.0.0.10
    device_type: paloalto_panos
    username: admin
    password: "{{ env.FW_PASSWORD }}"

  # Juniper SRX firewall
  - hostname: fw-srx-01
    host: 10.0.0.11
    device_type: juniper_junos
    username: netops
    password: "{{ env.FW_PASSWORD }}"

  # Fortinet FortiGate
  - hostname: fw-fg-01
    host: 10.0.0.12
    device_type: fortinet
    username: admin
    password: "{{ env.FW_PASSWORD }}"

  # Cisco ASA
  - hostname: fw-asa-01
    host: 10.0.0.13
    device_type: cisco_asa
    username: cisco
    password: "{{ env.FW_PASSWORD }}"

Supported device_type Values

| device_type | Vendor / OS |
| --- | --- |
| cisco_ios | Cisco IOS, IOS-XE |
| arista_eos | Arista EOS |
| juniper_junos | Juniper JunOS (MX, EX, SRX) |
| aruba_os | Aruba ArubaOS |
| paloalto_panos | Palo Alto PAN-OS |
| fortinet | Fortinet FortiOS |
| cisco_asa | Cisco ASA |

Credential Security

Never store credentials in plain text in inventory files. Use environment variable references ({{ env.VAR_NAME }}) or a secrets manager. For Docker deployments, use Docker Secrets or pass credentials via environment variables.
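The placeholder resolution can be sketched in a few lines. The resolve_env_refs function below is illustrative only, not MeshOptixIQ's actual loader; it shows how a {{ env.VAR_NAME }} reference can map to os.environ at load time so secrets never appear in the inventory file:

```python
import os
import re

# Illustrative sketch of resolving "{{ env.VAR_NAME }}" placeholders.
# Not MeshOptixIQ's actual implementation.
_ENV_REF = re.compile(r"\{\{\s*env\.([A-Za-z_][A-Za-z0-9_]*)\s*\}\}")

def resolve_env_refs(value: str) -> str:
    """Replace each {{ env.NAME }} placeholder with os.environ['NAME']."""
    def _sub(match: re.Match) -> str:
        name = match.group(1)
        if name not in os.environ:
            raise KeyError(f"environment variable {name} is not set")
        return os.environ[name]
    return _ENV_REF.sub(_sub, value)

os.environ["SWITCH_PASSWORD"] = "s3cret"              # demo only
print(resolve_env_refs("{{ env.SWITCH_PASSWORD }}"))  # -> s3cret
```

Failing fast on a missing variable (rather than passing the literal placeholder to the device) makes misconfigured credentials obvious at collection time.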

5.3 Running Collection

Collect Raw Data

# Collect from all devices in the inventory file
meshq collect --source inventory.yaml

# Collect from a specific device (by hostname)
meshq collect --source inventory.yaml --device sw-core-01

# Collect with a higher timeout (seconds, default 30)
meshq collect --source inventory.yaml --timeout 60

# Run in parallel (default 4 workers; increase for large networks)
meshq collect --source inventory.yaml --workers 8

Raw CLI output is saved to ~/.meshoptixiq/cache/<hostname>/. Each collection run overwrites the previous cache for that device.

SSH Key Authentication
To use SSH key authentication instead of passwords, set use_keys: true and key_file: /path/to/key.pem in the device definition, or configure SSH agent forwarding.
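Applied to the sw-core-01 entry from §5.2, a key-authenticated device definition looks like this (use_keys and key_file as described in the note; the key path is a placeholder):

```yaml
devices:
  - hostname: sw-core-01
    host: 10.0.0.1
    device_type: cisco_ios
    username: netops
    use_keys: true                 # SSH key auth instead of a password
    key_file: /path/to/key.pem     # private key path (placeholder)
```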

Check Collection Status

meshq status
# Shows last collection time, device count, and any collection errors

Scheduling Regular Collection

For continuous network intelligence, schedule collection with cron or a container orchestrator:

# Cron example — collect every 4 hours
0 */4 * * * /usr/local/bin/meshq collect --source /opt/meshoptixiq/inventory.yaml \
  && /usr/local/bin/meshq parse \
  && /usr/local/bin/meshq ingest

Parse and Ingest

# Parse all cached device data
meshq parse

# Parse a specific device only
meshq parse --device sw-core-01

# Ingest parsed data into the configured graph backend
meshq ingest

# Ingest and reinitialise schema constraints first
meshq ingest --init-schema

# One-command pipeline: collect, parse, and ingest in sequence
meshq collect --source inventory.yaml \
  && meshq parse \
  && meshq ingest

Parsing produces normalised Pydantic models and reports counts: devices, interfaces, IPs, MACs, VLANs, endpoints. Parsing errors are reported per-device and do not abort the run.

Verifying Ingested Data

# Check summary counts via the API
curl -H "X-API-Key: $API_KEY" \
  http://localhost:8000/queries/summary_stats/execute \
  -H "Content-Type: application/json" \
  -d '{"parameters": {}}'

# Expected response
{
  "total": 1,
  "rows": [{
    "device_count": 12,
    "interface_count": 247,
    "endpoint_count": 1843,
    "vlan_count": 18
  }]
}

5.4 Distributed Collection Pro+

For large networks or horizontally-scaled deployments, MeshOptixIQ supports a distributed collection model using a Redis work queue. Multiple collector worker processes pop device tasks from a shared queue, SSH into the devices in parallel, and parse and ingest the results into the configured graph backend.

Prerequisites

Distributed collection requires Redis. Set the REDIS_URL environment variable on all API and worker instances:

export REDIS_URL=redis://redis:6379

Dispatching Devices to the Queue

# Push all devices from inventory.yaml to the Redis work queue
meshq collect --dispatch --source inventory.yaml

# The dispatcher is typically run as a one-shot job
# and does not block — it exits once all devices are enqueued.

Running Collection Workers

# Start a single worker (pop → SSH → ingest loop)
meshq collect --worker

# Customise the polling interval (seconds, default 5)
meshq collect --worker --poll-interval 10

Each worker instance continuously pops device tasks from the queue, collects raw CLI output, parses, and ingests the data into the configured graph backend. Workers exit cleanly when no tasks remain and the queue is empty.

Horizontal Scaling
Worker instances scale horizontally — run as many workers as you need across multiple hosts or pods. The Redis queue distributes work automatically. In Kubernetes, use a Deployment with multiple replicas for the collector worker, and a CronJob for the dispatcher (see §10.4).

Docker Compose Cluster Setup

The docker-compose.cluster.yml file in the repository provides a ready-to-use configuration with two collector workers and a one-shot dispatcher service:

# Start the full cluster stack (API + Redis + collectors + dispatcher)
docker compose -f docker-compose.cluster.yml up -d

# Scale workers independently
docker compose -f docker-compose.cluster.yml up -d --scale collector=4

# Check queue status
curl -H "X-API-Key: $API_KEY" http://localhost:8000/collect/status

API Endpoints for Queue Management

| Method | Path | Description |
| --- | --- | --- |
| GET | /collect/status | Queue depth, processing count, completed count |
| POST | /collect/dispatch | Enqueue all devices from the server-side inventory |
| POST | /collect/recover | Re-enqueue stale tasks (devices stuck in "processing" state) |

5.5 Ansible Dynamic Inventory Export Pro+

MeshOptixIQ can generate an Ansible-compatible dynamic inventory from the graph, grouping devices by vendor and automatically placing firewall devices into a dedicated firewalls group.

CLI Export

# Export to stdout (JSON format, compatible with ansible-inventory --list)
meshq export --format ansible

# Save to a file
meshq export --format ansible --output /etc/ansible/inventory.json

# INI format (legacy Ansible format)
meshq export --format ansible-ini --output /etc/ansible/inventory.ini

REST API

# JSON format (default)
GET /inventory/ansible
X-API-Key: your-api-key

# Legacy INI format
GET /inventory/ansible?format=ini
X-API-Key: your-api-key

Example JSON Output

{
  "all": {
    "hosts": {
      "sw-core-01": {
        "ansible_host": "10.0.0.1",
        "ansible_user": "netops",
        "vendor": "cisco",
        "os_version": "17.3.4"
      },
      "fw-edge-01": {
        "ansible_host": "10.0.0.10",
        "ansible_user": "admin",
        "vendor": "paloalto",
        "os_version": "10.2.3"
      }
    },
    "children": ["cisco", "paloalto", "firewalls"]
  },
  "cisco": {
    "hosts": ["sw-core-01", "sw-dist-01"]
  },
  "paloalto": {
    "hosts": ["fw-edge-01"]
  },
  "firewalls": {
    "hosts": ["fw-edge-01", "fw-srx-01", "fw-fg-01"]
  },
  "_meta": {
    "hostvars": { ... }
  }
}

Firewall Group
Devices with at least one collected FirewallRule node in the graph are automatically placed in the firewalls group, regardless of their vendor group. This allows playbooks to target all firewalls without maintaining a separate inventory group manually.
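The grouping rule can be sketched as follows. This is not the actual export code; each device dict is assumed to carry its vendor and a firewall-rule count pulled from the graph:

```python
# Sketch of the grouping rule: group by vendor, and add any device with
# at least one FirewallRule node to "firewalls" as well.
def build_groups(devices: list[dict]) -> dict[str, list[str]]:
    groups: dict[str, list[str]] = {}
    for dev in devices:
        groups.setdefault(dev["vendor"], []).append(dev["hostname"])
        # Firewalls join a cross-vendor group in addition to their vendor group
        if dev.get("firewall_rule_count", 0) > 0:
            groups.setdefault("firewalls", []).append(dev["hostname"])
    return groups

devices = [
    {"hostname": "sw-core-01", "vendor": "cisco", "firewall_rule_count": 0},
    {"hostname": "fw-edge-01", "vendor": "paloalto", "firewall_rule_count": 42},
    {"hostname": "fw-srx-01", "vendor": "juniper", "firewall_rule_count": 7},
]
print(build_groups(devices))
```

Note that fw-edge-01 appears in both the paloalto group and the firewalls group, matching the example JSON output above.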

Using with ansible-inventory

# Use as a dynamic inventory script
ansible-inventory -i "http://localhost:8000/inventory/ansible" --list

# Or reference the exported file
ansible-inventory -i /etc/ansible/inventory.json --list

# Run a playbook against firewall devices
ansible-playbook -i /etc/ansible/inventory.json \
  --limit firewalls \
  playbooks/audit_fw_config.yaml

5.8 Async SSH Collection with Scrapli

Pro+ feature. Scrapli replaces Netmiko as the default SSH transport for large-scale deployments. Whereas Netmiko spawns one OS thread per device (~8 MB stack each), Scrapli runs all sessions on a single asyncio event loop, enabling 200+ concurrent SSH connections with no GIL contention.

Installation

pip install 'meshoptixiq-network-discovery[scrapli]'
# Installs scrapli[asyncio]>=2024.1 with asyncssh transport

Vendor Platform Mapping

| Inventory vendor value | Scrapli platform | Notes |
| --- | --- | --- |
| cisco_ios | cisco_iosxe | IOS and IOS-XE share a driver |
| cisco_xe | cisco_iosxe | |
| cisco_xr | cisco_iosxr | IOS-XR uses a distinct driver |
| cisco_nxos | cisco_nxos | NX-OS on Nexus hardware |
| arista_eos | arista_eos | |
| juniper_junos | juniper_junos | |
| anything else | generic | Uses AsyncGenericDriver; reduced command set |

Inventory Format

Same YAML format as Netmiko. Add the optional key_file field to use SSH key authentication instead of a password:

- hostname: core-sw-01
  host: 10.0.0.1
  vendor: cisco_ios
  port: 22
  username: admin
  password_env: DEVICE_PASSWORD   # env var holding the password
  key_file: /etc/meshq/keys/id_ed25519   # optional: key auth

Timeouts & Retry Behaviour

| Setting | Default | Notes |
| --- | --- | --- |
| Socket connect timeout | 10 s | TCP SYN → ACK window |
| Transport (auth) timeout | 30 s | SSH handshake + credential exchange |
| Operations timeout | 120 s | Per-command read deadline |
| Retry attempts | 3 | Back-off: 2 s × attempt number |
| Auth-failure retry | disabled | Wrong credentials are not retried |
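The retry policy above can be sketched as a small wrapper. The function and the AuthError class are illustrative names, not the collector's actual API; the sleep parameter is injectable so the back-off is testable:

```python
import time

class AuthError(Exception):
    """Stand-in for an SSH authentication failure (illustrative name)."""

def collect_with_retry(connect, attempts: int = 3, sleep=time.sleep):
    """Documented policy: up to 3 attempts, linear back-off of
    2 s x attempt number, and no retry on authentication failure."""
    last_exc = None
    for attempt in range(1, attempts + 1):
        try:
            return connect()
        except AuthError:
            raise                      # wrong credentials: fail fast
        except Exception as exc:       # timeout / transient transport error
            last_exc = exc
            if attempt < attempts:
                sleep(2 * attempt)     # 2 s after attempt 1, 4 s after attempt 2
    raise last_exc
```

Failing fast on bad credentials matters at scale: retrying a wrong password against hundreds of devices can trigger account lockouts.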

Running Async Collection

import asyncio
from network_discovery.collectors.scrapli_collector import collect_all_async

# Collect all devices, up to 200 concurrent SSH sessions
results = asyncio.run(collect_all_async("inventory.yaml", concurrency=200))
print(f"Collected {len(results)} devices")

To collect only specific fact types (e.g., for delta updates — see §5.9):

results = asyncio.run(
    collect_all_async(
        "inventory.yaml",
        concurrency=200,
        fact_types={"interfaces", "bgp"},   # only run tagged commands
    )
)

OS file-descriptor limit
Each concurrent SSH session consumes ~3 file descriptors. For more than 500 devices, raise the OS limit before starting the collector: ulimit -n 4096. In Docker, set --ulimit nofile=4096:4096.

flowchart LR
    INV[inventory.yaml] --> SEM[asyncio.Semaphore\nconcurrency=200]
    SEM --> S1[AsyncScrapli\ndevice-1]
    SEM --> S2[AsyncScrapli\ndevice-2]
    SEM --> SN[AsyncScrapli\ndevice-N]
    S1 --> RO[raw_outputs]
    S2 --> RO
    SN --> RO
    RO --> SRO[save_raw_output]
    SRO --> DEC{MESHQ_PARSE_QUEUE?}
    DEC -->|yes| RQ[Redis LIST\nmeshq:collect:parse_queue]
    DEC -->|no| DISK[disk / graph ingest]

5.9 Event-Driven Delta Collection

Full network sweeps spike CPU on production hardware and can take minutes on large inventories. Delta collection solves this by listening for syslog events and immediately SSH-ing into the affected device to refresh only the relevant fact categories — completing in seconds, not minutes.

Architecture

A syslog receiver feeds events to match_syslog_trigger(). When a pattern matches, handle_trigger() locates the device in the managed inventory and calls collect_device_async() with the minimum fact_types set required to refresh the affected graph nodes.

Syslog Trigger Patterns

| Trigger category | Example mnemonic | Fact types refreshed |
| --- | --- | --- |
| BGP peer change | BGP-5-ADJCHANGE | bgp, routing |
| OSPF adjacency | OSPF-5-ADJCHG | ospf, routing |
| IS-IS adjacency | ISIS-5-ADJ_CHANGE | isis, routing |
| EIGRP topology | DUAL-5-NBRCHANGE | routing |
| MPLS LDP | LDP-5-NBRCHG | mpls, routing |
| Interface state (IOS) | LINEPROTO-5-UPDOWN | interfaces, interface_details, neighbors |
| Link down (IOS) | LINK-3-UPDOWN | interfaces, interface_details |
| Interface state (NX-OS) | ETHPORT-5-IF_ADMIN_UP | interfaces, interface_details |
| Config change | SYS-5-CONFIG_I | device_info, interfaces, vlans |
| VLAN added | SW_VLAN-6-VLAN_CREATED | vlans |
| ARP conflict | IP-4-DUPADDR | arp_table |
| MAC flap | MAC_MOVE-SP-4-NOTIF | mac_table, arp_table |
| ACL deny | IPACCESSLOG-6-LOGDENY | acl, security |

Integration

from network_discovery.collectors.delta_collector import (
    match_syslog_trigger,
    handle_trigger,
)

# In your syslog processing loop:
trigger = match_syslog_trigger(event)   # returns SyslogTrigger | None
if trigger:
    await handle_trigger(trigger, inventory_devices)

Unresolved hostnames
The device must already exist in the managed inventory. If the hostname in the syslog message cannot be matched (exact match → FQDN prefix fuzzy match), the trigger is skipped and a warning is logged. Ensure inventory hostnames match your syslog source host fields.

5.10 Parse Worker: CPU Offloading via Redis

SSH collection is I/O-bound; TextFSM and regex parsing are CPU-bound. Running both in the same process creates GIL contention that degrades SSH throughput at scale. The Parse Worker decouples them: the SSH collector enqueues raw outputs to Redis, and one or more Parse Worker pods drain the queue asynchronously.

Queue Architecture

The queue uses the Redis RPUSH / BLPOP pattern — inherently ordered, durable, and horizontally scalable. Multiple worker pods BLPOP from the same list: no coordination or leader election needed.

flowchart LR
    COL[SSH Collector\ncollect_device_async] -->|RPUSH| Q[Redis LIST\nmeshq:collect:parse_queue]
    Q -->|BLPOP| W1[ParseWorker-1\nasyncio.to_thread]
    Q -->|BLPOP| W2[ParseWorker-2\nasyncio.to_thread]
    Q -->|BLPOP| WN[ParseWorker-N]
    W1 --> NF[NetworkFacts\ngraph ingest]
    W2 --> NF
    WN --> NF
    W1 -->|on error| DLQ[meshq:collect:parse_dlq]
    W2 -->|on error| DLQ
    WN -->|on error| DLQ
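Redis is not needed to see why the pattern works. The stdlib sketch below mimics RPUSH with deque.append and BLPOP with deque.popleft, showing the pop → parse → dead-letter loop; it is illustrative only, since production workers run a Redis client against meshq:collect:parse_queue and meshq:collect:parse_dlq:

```python
import json
from collections import deque

# Stdlib stand-ins for the Redis lists (illustrative, not production code)
parse_queue: deque[str] = deque()
dlq: deque[str] = deque()

def rpush(queue: deque, payload: dict) -> None:
    queue.append(json.dumps(payload))          # ~ RPUSH

def drain(parse) -> int:
    """One worker pass: pop each item, parse it, dead-letter failures."""
    done = 0
    while parse_queue:
        item = json.loads(parse_queue.popleft())   # ~ BLPOP
        try:
            parse(item)
            done += 1
        except Exception as exc:
            # DLQ entries keep the original payload and the error message
            dlq.append(json.dumps({"raw": item, "error": str(exc)}))
    return done

rpush(parse_queue, {"hostname": "sw-core-01", "raw": "show version ..."})
rpush(parse_queue, {"hostname": "bad-device", "raw": ""})

def parse(item: dict) -> None:
    if not item["raw"]:
        raise ValueError("empty output")

print(drain(parse), "parsed;", len(dlq), "dead-lettered")  # 1 parsed; 1 dead-lettered
```

Because a list pop is atomic, each item is delivered to exactly one worker; that is why multiple pods can BLPOP from the same list with no coordination.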

Environment Variables

| Variable | Default | Effect |
| --- | --- | --- |
| MESHQ_PARSE_QUEUE | (unset) | Set to any truthy value (1, true) to activate queue mode. When set, save_raw_output() enqueues automatically instead of parsing inline. |
| MESHQ_PARSE_CONCURRENCY | 4 | Number of concurrent parse tasks per worker pod. |
| REDIS_URL | redis://localhost:6379 | Redis connection string. Use redis+cluster:// for Redis Cluster HA. |
| GRAPH_BACKEND | inmemory | Graph backend for ingest after parsing (neo4j / postgres / inmemory). |

Activation & Startup

# Activate queue mode (SSH collector will enqueue instead of parsing inline)
export MESHQ_PARSE_QUEUE=1
export REDIS_URL=redis://redis:6379

# Start a parse worker (two equivalent forms)
meshq-parse-worker

# Or directly via Python
python -m network_discovery.collectors.parse_worker

DLQ Monitoring

Items that fail parsing or graph ingest are moved to the dead-letter queue with the original payload and error message preserved. Use redis-cli to inspect and retry:

# Check queue depths
redis-cli LLEN meshq:collect:parse_queue   # pending items
redis-cli LLEN meshq:collect:parse_dlq     # failed items

# Inspect a DLQ item (non-destructive)
redis-cli LINDEX meshq:collect:parse_dlq 0

# Retry a failed item: move from DLQ back to main queue
redis-cli RPOPLPUSH meshq:collect:parse_dlq meshq:collect:parse_queue

Horizontal Scaling

Add worker pods freely — each is stateless and independently BLPOPs from the shared list. In Kubernetes, scale with kubectl scale deployment meshq-parse-worker --replicas=8. Each pod can tune MESHQ_PARSE_CONCURRENCY independently based on available CPU.

DLQ payload format
Each DLQ entry is a JSON object with raw (the original raw output JSON) and error (the exception message). This preserves enough context for manual inspection. To re-process, push the raw value back to meshq:collect:parse_queue.