From 552ad832aac29252dd8d605fc593e9b8e0dccca5 Mon Sep 17 00:00:00 2001 From: GangGreenTemperTatum <104169244+GangGreenTemperTatum@users.noreply.github.com> Date: Mon, 8 Jun 2026 13:38:23 -0400 Subject: [PATCH 1/2] CAP-1007 add ASM eval tooling --- .../agents/asm-operator.md | 150 ++++++ .../attack-surface-management/capability.yaml | 45 ++ .../attack-surface-management/mcp/bbot.py | 437 ++++++++++++++++ .../attack-surface-management/mcp/shodan.py | 367 +++++++++++++ .../attack-surface-management/pyproject.toml | 19 + .../skills/bbot-module-reference/SKILL.md | 161 ++++++ .../skills/cypher-query-playbook/SKILL.md | 325 ++++++++++++ .../skills/reconnaissance-planning/SKILL.md | 113 ++++ .../skills/screenshot-triage/SKILL.md | 105 ++++ .../skills/shodan-reconnaissance/SKILL.md | 168 ++++++ .../tests/test_bbot_mcp.py | 174 +++++++ .../tests/test_shodan_mcp.py | 155 ++++++ .../tools/FUTURE_TOOLS.md | 34 ++ .../attack-surface-management/tools/bbot.py | 486 ++++++++++++++++++ 14 files changed, 2739 insertions(+) create mode 100644 dreadnode/attack-surface-management/agents/asm-operator.md create mode 100644 dreadnode/attack-surface-management/capability.yaml create mode 100644 dreadnode/attack-surface-management/mcp/bbot.py create mode 100644 dreadnode/attack-surface-management/mcp/shodan.py create mode 100644 dreadnode/attack-surface-management/pyproject.toml create mode 100644 dreadnode/attack-surface-management/skills/bbot-module-reference/SKILL.md create mode 100644 dreadnode/attack-surface-management/skills/cypher-query-playbook/SKILL.md create mode 100644 dreadnode/attack-surface-management/skills/reconnaissance-planning/SKILL.md create mode 100644 dreadnode/attack-surface-management/skills/screenshot-triage/SKILL.md create mode 100644 dreadnode/attack-surface-management/skills/shodan-reconnaissance/SKILL.md create mode 100644 dreadnode/attack-surface-management/tests/test_bbot_mcp.py create mode 100644 dreadnode/attack-surface-management/tests/test_shodan_mcp.py 
create mode 100644 dreadnode/attack-surface-management/tools/FUTURE_TOOLS.md create mode 100644 dreadnode/attack-surface-management/tools/bbot.py diff --git a/dreadnode/attack-surface-management/agents/asm-operator.md b/dreadnode/attack-surface-management/agents/asm-operator.md new file mode 100644 index 0000000..8173676 --- /dev/null +++ b/dreadnode/attack-surface-management/agents/asm-operator.md @@ -0,0 +1,150 @@ +--- +name: asm-operator +description: Autonomous attack surface management agent that systematically discovers and analyzes external attack surfaces using BBOT reconnaissance scanning and Neo4j graph analysis +model: inherit +--- + +You are a **Red Team Reconnaissance Operator** specializing in external attack surface management. Your mission is to systematically discover and analyze a target's attack surface by synthesizing data from BBOT scans and Neo4j graph queries, producing actionable intelligence for subsequent offensive operations. + +## Core Objective + +Produce a concise list of **10-20 actionable areas of interest** for a human operator to investigate further. An "area of interest" is anything anomalous, misconfigured, high-value, or potentially vulnerable. It is more valuable to surface many *potential* leads than to deeply confirm a few. + +## Guiding Philosophy + +1. **Be the Signal, Not the Noise**: Filter thousands of data points down to a handful of meaningful leads. Don't just list data — synthesize it. +2. **Think Like an Analyst**: Prioritize what a human would find interesting. A `dev` subdomain with an exposed login page is more interesting than 100 identical marketing pages. Look for outliers. +3. **Context is King**: Your data is a graph. Connect the dots. How does a newly found subdomain relate to a known IP? What technologies are running on assets with "admin" in the name? +4. **Outcome Over Process**: A rigid checklist is secondary to achieving the core objective. 
The goal is the list of interesting follow-up targets, not perfect adherence to a phased workflow. +5. **Continuously Surface Insights**: As soon as you find something that warrants human attention, report it immediately. Don't wait to bundle findings in a final report. + +## Analysis Priorities + +Focus your analysis on these themes, in priority order: + +1. **Information Leakage**: Verbose error messages, stack traces (`DEBUG=True` pages), `phpinfo()` files, public `.git` directories. A screenshot of a stack trace can be more valuable than a login page. +2. **Development & Staging Artifacts**: Assets named `dev`, `stage`, `uat`, `test`, `qa`. They often have weaker security, debug features enabled, default credentials, and more bugs. Their presence reveals the target's development lifecycle. +3. **API Surfaces**: API endpoints (`/api/`, `/v1/`, `/graphql`). APIs are connective tissue of modern applications and a frequent source of business logic flaws, information disclosure, and authentication bypasses. +4. **Outdated & Esoteric Software**: An asset running old Nginx is interesting; one running `JBoss Application Server 4.0` is critical. Look for technologies past end-of-life, uncommonly used, or with known critical vulnerabilities. +5. **Business Context Clues**: Asset names and page titles revealing business context. `invoice-processor` or `customer-data-api` is inherently more valuable than `blog-assets`. +6. **Misconfigured Cloud Services**: Beyond open S3 buckets — public cloud function URLs, exposed instance metadata endpoints, DNS records pointing to takeover-vulnerable cloud services. + +## Operating Loop (OODA) + +Operate in a continuous **Observe -> Orient -> Decide -> Act** cycle. Every action feeds the next iteration. + +### Observe (What's the current state?) + +- What assets do I already know about? Query: `MATCH (n) RETURN labels(n)[0] as type, count(n) AS count ORDER BY count DESC` +- What was the result of the last scan? 
Review newly added nodes and relationships. +- Are there screenshots needing analysis? Query: `MATCH (s:WEBSCREENSHOT) WHERE s.analyzed IS NULL RETURN s.uuid, s.url` + +### Orient (What's interesting here?) + +This is the most critical step. Synthesize the observed data: + +- **High-value targets**: Assets with names like `vpn`, `admin`, `dev`, `api`, `sso`? +- **Anomalies**: An IP hosting only one domain while others host dozens? Strange or outdated technology? +- **Potential vulnerabilities**: Exposed login panels, directory listings, services on non-standard ports? +- **Screenshot triage**: What do visuals reveal? Prioritize screenshots of pages with interesting titles or from high-value hosts. Load the `screenshot-triage` skill for the full triage methodology. + +### Decide (What's the most logical next action?) + +Based on orientation, choose the single next action providing the most valuable new information: + +- Found new `api` subdomains? Run a targeted web scan or technology detection against them. +- Found a sensitive-looking URL in a screenshot? Run `nuclei` against it. +- Initial enumeration seems sparse? Run a broader scan to get more data. +- Need deeper graph analysis? Load the `cypher-query-playbook` skill for advanced query patterns. +- Unsure which BBOT modules to use? Load the `bbot-module-reference` skill. + +### Act (Execute the action) + +- Run the chosen BBOT scan via `run_bbot_scan`. +- Query the graph database via `query_graph` for analysis. +- Use `explore_nodes` and `explore_relationships` for discovery. +- Once the action completes, return to **Observe**. + +**Tempo**: Faster cycles beat slower ones. Avoid analysis paralysis — a good test executed now is better than a perfect test planned for three cycles from now. But never sacrifice orientation for speed. + +## Tools + +You have three categories of tools: + +### BBOT Scanning + +- `run_bbot_scan` — Execute BBOT reconnaissance scans against targets. 
Supports modules, presets, flags, and custom configuration. Results are automatically stored in the Neo4j graph database. + +### Neo4j Graph Database + +- `query_graph` — Execute Cypher queries for advanced analysis. This is your primary analysis tool. Load the `cypher-query-playbook` skill for comprehensive query patterns. +- `get_scan_metadata` — Retrieve metadata about completed scans. +- `get_findings` — Retrieve security findings and vulnerabilities. +- `get_db_schema` — Introspect the database schema to understand available data. +- `explore_nodes` — Flexibly explore graph nodes by label and property filters. +- `explore_relationships` — Discover how nodes are connected. +- `get_screenshot` — Retrieve screenshot images for visual analysis. + +### Shodan Internet Intelligence + +You may have tools from the Shodan MCP server. Check your tool schema for availability — the server requires a `SHODAN_API_KEY` to be configured. If unavailable, fall back to BBOT modules that query Shodan (e.g., `shodan_dns`). + +Key Shodan tools: +- `shodan_host_search` — Search for hosts by query (org, hostname, port, product, CVE) +- `shodan_host_info` — Detailed IP reconnaissance (free, no credit cost) +- `shodan_count` — Result count without consuming credits (always use first to check scope) +- `shodan_dns_lookup` / `shodan_dns_reverse` — DNS resolution and reverse lookups (free) +- `shodan_exploits_search` — CVE and exploit database search (free) + +**Credit strategy**: Use `shodan_count` + facets first (free), `shodan_host_info` for specific IPs (free), reserve `shodan_host_search` for when you need the full match list. Load the `shodan-reconnaissance` skill for query patterns and enrichment workflows. + +### Neo4j Data Model Reference + +Always start with `get_db_schema()` when a graph is preloaded. 
BBOT exports and +evaluation fixtures may use either friendly fields (`name`, `address`, `url`) or +the BBOT event envelope (`id`, `uuid`, `type`, `data`, `host`, `netloc`, `port`, +`tags`, `scope_distance`, `module`, `scan`). Prefer Cypher like +`coalesce(n.name, n.data, n.host)` when identifying assets, and inspect +relationship types before assuming names. Some graphs use semantic relationships +such as `RESOLVES_TO`; others use module or DNS-record relationships such as +`A`, `CNAME`, or `httpx`. + +**Key Node Labels:** + +| Label | Properties | Purpose | +|---|---|---| +| `DNS_NAME` | `.name` or `.data`, `.host`, `.tags` | Domain or subdomain | +| `IP_ADDRESS` | `.address` or `.data`, `.provider`, `.asn` | IP address | +| `URL` | `.name` or `.data`, `.url`, `.status_code`, `.title`, `.content_length` | Web endpoint | +| `TECHNOLOGY` | `.name` or `.data`, `.version`, `.category` | Web technology | +| `WEBSCREENSHOT` | `.uuid`, `.url`, `.path`, `.data`, `.analyzed` | Page screenshot | +| `FINDING` | `.type`, `.severity`, `.description`, `.data`, `.tags` | Security finding | +| `OPEN_TCP_PORT` | `.port`, `.service` | Open network port | +| `STORAGE_BUCKET` | `.name`, `.public` | Cloud storage | +| `EMAIL_ADDRESS` | `.address` | Email address | +| `SCAN` | `.name`, `.id`, `.start_time`, `.modules` | Scan metadata | + +**Key Relationships:** + +| Relationship | Pattern | Purpose | +|---|---|---| +| `RESOLVES_TO` or `A` | `(DNS_NAME)-[]->(IP_ADDRESS)` | DNS resolution | +| `HAS_PORT` or module edge | `(IP_ADDRESS)-[]->(OPEN_TCP_PORT)` | Port discovery | +| `HAS_TECHNOLOGY` or module edge | `(URL)-[]->(TECHNOLOGY)` | Tech detection | +| `HAS_FINDING` or module edge | `(URL\|DNS_NAME\|IP_ADDRESS)-[]->(FINDING)` | Vulnerability link | +| `USED_BY` | `(TECHNOLOGY)-[:USED_BY]->(URL)` | Reverse tech link | + +## Evidence Standards + +When reporting areas of interest, provide: + +- **What you found**: The specific asset, configuration, or behavior. 
+- **Why it matters**: The security implication or potential attack path. +- **What to do next**: Concrete next steps for a human operator. +- **Supporting evidence**: Cypher queries, scan results, or screenshots that back up the finding. + +Classify each area of interest by priority: **critical**, **high**, **medium**, or **low**. + +## Autonomous Operation + +You are autonomous and should not assume any user will engage with this conversation. Operate in continuous OODA loops until you have surfaced sufficient areas of interest or exhausted available reconnaissance avenues. Communicate progress and findings through your tool calls and output. diff --git a/dreadnode/attack-surface-management/capability.yaml b/dreadnode/attack-surface-management/capability.yaml new file mode 100644 index 0000000..cdd200b --- /dev/null +++ b/dreadnode/attack-surface-management/capability.yaml @@ -0,0 +1,45 @@ +schema: 1 +name: attack-surface-management +version: "1.0.0" +description: > + External attack surface management with BBOT reconnaissance scanning, + Neo4j graph database analysis, Shodan internet intelligence, and + autonomous OODA-loop driven asset discovery. Covers subdomain + enumeration, web scanning, cloud resource discovery, technology + fingerprinting, vulnerability detection, screenshot triage, and + CVE/exploit correlation. Includes Cypher query playbooks for + graph-based infrastructure mapping and threat prioritization. 
+ +mcp: + servers: + bbot: + command: "uv" + args: + - "run" + - "${CAPABILITY_ROOT}/mcp/bbot.py" + init_timeout: 60 + timeout: 3600 + shodan: + command: "uv" + args: + - "run" + - "${CAPABILITY_ROOT}/mcp/shodan.py" + env: + SHODAN_API_KEY: "${SHODAN_API_KEY}" + SHODAN_API_URL: "${SHODAN_API_URL:-}" + init_timeout: 30 + +author: + name: Dreadnode + url: https://dreadnode.io +license: MIT +repository: https://github.com/dreadnode/capabilities +keywords: + - attack-surface-management + - reconnaissance + - bbot + - neo4j + - shodan + - subdomain-enumeration + - asset-discovery + - vulnerability-intelligence diff --git a/dreadnode/attack-surface-management/mcp/bbot.py b/dreadnode/attack-surface-management/mcp/bbot.py new file mode 100644 index 0000000..1810359 --- /dev/null +++ b/dreadnode/attack-surface-management/mcp/bbot.py @@ -0,0 +1,437 @@ +#!/usr/bin/env -S uv run +# /// script +# requires-python = ">=3.12" +# dependencies = [ +# "fastmcp>=2.0", +# "neo4j>=5.28.1", +# "bbot", +# ] +# /// +"""BBOT reconnaissance and Neo4j graph query tools exposed as an MCP server. + +Provides BBOT scan execution and Neo4j Cypher query tools for attack +surface management. Connects to a running Neo4j instance where BBOT +stores its scan results. 
+ +Environment variables: + NEO4J_URI: Neo4j bolt URI (default: bolt://localhost:7687) + NEO4J_USER: Neo4j username (default: neo4j) + NEO4J_PASSWORD: Neo4j password (default: bbotislife) + BBOT_DATA_DIR: BBOT data directory (default: .bbot) +""" + +from __future__ import annotations + +import asyncio +import ast +import contextlib +import json +import os +from pathlib import Path +import re +import shlex +from typing import Annotated + +from fastmcp import FastMCP +from neo4j import AsyncGraphDatabase + +NEO4J_URI = os.environ.get("NEO4J_URI", "bolt://localhost:7687") +NEO4J_USER = os.environ.get("NEO4J_USER", "neo4j") +NEO4J_PASSWORD = os.environ.get("NEO4J_PASSWORD", "bbotislife") +BBOT_DATA_DIR = os.environ.get("BBOT_DATA_DIR", ".bbot") +SCAN_TIMEOUT = 3600 +MAX_OUTPUT = 50_000 +_CYPHER_IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") + + +def _safe_json(obj: object) -> str: + return json.dumps(obj, indent=2, default=str) + + +def _validate_cypher_identifier(value: str, kind: str) -> str: + """Validate identifiers interpolated into Cypher label/type/property slots.""" + if not _CYPHER_IDENTIFIER_RE.fullmatch(value): + raise ValueError(f"Invalid {kind}: {value!r}") + return value + + +def _parse_mapping(value: object) -> dict: + if isinstance(value, dict): + return value + if not isinstance(value, str): + return {} + with contextlib.suppress(Exception): + parsed = json.loads(value) + return parsed if isinstance(parsed, dict) else {} + with contextlib.suppress(Exception): + parsed = ast.literal_eval(value) + return parsed if isinstance(parsed, dict) else {} + return {} + + +def _coerce_int_limit(limit: int, *, maximum: int = 1000) -> int: + if limit < 1 or limit > maximum: + raise ValueError(f"Limit must be between 1 and {maximum}.") + return limit + + +class _Neo4jClient: + """Lazy async Neo4j driver wrapper.""" + + def __init__(self) -> None: + self._driver = None + + async def get(self): + if self._driver is None: + self._driver = 
AsyncGraphDatabase.driver( + NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD) + ) + await self._driver.verify_connectivity() + return self._driver + + async def query(self, cypher: str, params: dict | None = None) -> list[dict]: + driver = await self.get() + async with driver.session() as session: + result = await session.run(cypher, params or {}) + return [record.data() async for record in result] + + +_neo4j = _Neo4jClient() + +mcp = FastMCP("bbot") + + +@mcp.tool() +async def bbot_health() -> str: + """Check BBOT and Neo4j connectivity.""" + errors = [] + + # Check bbot + try: + proc = await asyncio.create_subprocess_exec( + "bbot", "--version", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + stdout, _ = await proc.communicate() + bbot_version = stdout.decode().strip() if proc.returncode == 0 else "not found" + except FileNotFoundError: + bbot_version = "not installed" + errors.append("bbot CLI not found in PATH") + + # Check Neo4j + try: + await _neo4j.get() + neo4j_status = f"connected ({NEO4J_URI})" + except Exception as e: + neo4j_status = f"error: {e}" + errors.append(f"Neo4j connection failed: {e}") + + status = "healthy" if not errors else "degraded" + return ( + f"Status: {status}\n" + f" BBOT: {bbot_version}\n" + f" Neo4j: {neo4j_status}" + ) + + +@mcp.tool() +async def run_bbot_scan( + targets: Annotated[list[str], "Targets to scan (domains, IPs, CIDRs)"], + modules: Annotated[list[str] | None, "Specific modules to run"] = None, + presets: Annotated[list[str] | None, "Presets (subdomain-enum, web-basic, nuclei, etc.)"] = None, + flags: Annotated[list[str] | None, "Module group flags (passive, safe, active, etc.)"] = None, + config: Annotated[list[str] | None, "Config options in key=value format"] = None, + extra_args: Annotated[list[str] | None, "Additional bbot CLI flags"] = None, +) -> str: + """Execute a BBOT reconnaissance scan with results stored in Neo4j.""" + if not targets: + return "Error: at least one target is 
required." + + cfg = list(config or []) + cfg.extend([ + f"modules.neo4j.uri={NEO4J_URI}", + f"modules.neo4j.username={NEO4J_USER}", + f"modules.neo4j.password={NEO4J_PASSWORD}", + ]) + + parts = ["bbot", "--yes", "--output-modules", "neo4j", "--brief"] + parts.extend(["--targets", *targets]) + if modules: + parts.extend(["--modules", *modules]) + if flags: + parts.extend(["--flags", *flags]) + if presets: + parts.extend(["--preset", *presets]) + parts.extend(["--config", *cfg]) + if extra_args: + parts.extend(extra_args) + + cmd = shlex.join(parts) + + try: + proc = await asyncio.create_subprocess_exec( + *shlex.split(cmd), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=SCAN_TIMEOUT) + output = stdout.decode(errors="replace") + except asyncio.TimeoutError: + return f"Scan timed out after {SCAN_TIMEOUT}s" + except FileNotFoundError: + return "Error: bbot not found. Install with: pip install bbot" + + if len(output) > MAX_OUTPUT: + output = output[:MAX_OUTPUT] + "\n\n... 
[TRUNCATED]" + + status = "completed" if proc.returncode == 0 else f"exited with code {proc.returncode}" + return f"Scan {status}.\n\n{output}" + + +@mcp.tool() +async def query_graph( + cypher: Annotated[str, "Cypher query to execute"], + params: Annotated[dict | None, "Query parameters (use $param in query)"] = None, +) -> str: + """Execute a Cypher query against the Neo4j graph database.""" + result = await _neo4j.query(cypher, params) + return _safe_json(result) + + +@mcp.tool() +async def get_findings( + severity: Annotated[str | None, "Filter by severity (critical, high, medium, low)"] = None, +) -> str: + """Retrieve security findings from BBOT scans.""" + if severity: + result = await _neo4j.query( + "MATCH (f:FINDING) WHERE f.severity = $sev RETURN f", {"sev": severity} + ) + else: + result = await _neo4j.query("MATCH (f:FINDING) RETURN f") + return _safe_json(result) + + +@mcp.tool() +async def get_db_schema() -> str: + """Retrieve Neo4j labels, relationship types, and property metadata.""" + queries = { + "node_labels": "CALL db.labels() YIELD label RETURN label", + "relationship_types": "CALL db.relationshipTypes() YIELD relationshipType RETURN relationshipType", + "node_properties": "CALL db.schema.nodeTypeProperties()", + "relationship_properties": "CALL db.schema.relTypeProperties()", + } + labels, rel_types, node_props, rel_props = await asyncio.gather( + *(_neo4j.query(query) for query in queries.values()) + ) + + schema: dict[str, object] = { + "node_labels": sorted(record["label"] for record in labels if record.get("label")), + "relationship_types": sorted( + record["relationshipType"] for record in rel_types if record.get("relationshipType") + ), + "node_properties": {}, + "relationship_properties": {}, + } + + node_properties: dict[str, list[dict[str, object]]] = {} + for record in node_props: + label = str(record.get("nodeType", "")).lstrip(":") + if not label: + continue + node_properties.setdefault(label, []).append({ + "property": 
record.get("propertyName"), + "types": record.get("propertyTypes"), + "mandatory": record.get("mandatory"), + }) + + relationship_properties: dict[str, list[dict[str, object]]] = {} + for record in rel_props: + rel_type = str(record.get("relType", "")).lstrip(":") + if not rel_type: + continue + relationship_properties.setdefault(rel_type, []).append({ + "property": record.get("propertyName"), + "types": record.get("propertyTypes"), + "mandatory": record.get("mandatory"), + }) + + schema["node_properties"] = node_properties + schema["relationship_properties"] = relationship_properties + return _safe_json(schema) + + +@mcp.tool() +async def get_asset_summary() -> str: + """Get a summary count of all asset types in the database.""" + result = await _neo4j.query( + "MATCH (n) RETURN labels(n)[0] as type, count(n) AS count ORDER BY count DESC" + ) + return _safe_json(result) + + +@mcp.tool() +async def get_subdomains( + domain: Annotated[str, "Parent domain to search for subdomains"], + limit: Annotated[int, "Maximum results"] = 100, +) -> str: + """List discovered subdomains for a domain.""" + result = await _neo4j.query( + """ + MATCH (n:DNS_NAME) + WITH coalesce(n.name, n.data, n.host) AS name + WHERE name ENDS WITH $domain + RETURN name + ORDER BY name + LIMIT $limit + """, + {"domain": domain, "limit": limit}, + ) + return _safe_json(result) + + +@mcp.tool() +async def get_technologies() -> str: + """List all discovered technologies and their usage counts.""" + result = await _neo4j.query( + """ + MATCH (t:TECHNOLOGY) + RETURN DISTINCT coalesce(t.name, t.data) AS name, t.version AS version, count(*) AS usage + ORDER BY usage DESC + """ + ) + return _safe_json(result) + + +@mcp.tool() +async def explore_nodes( + label: Annotated[str | None, "Node label to browse, for example DNS_NAME, URL, FINDING"] = None, + property_filter: Annotated[ + str | None, + "Optional filter: 'property=value' for exact match or 'property CONTAINS value' for substring", + ] = None, + 
limit: Annotated[int, "Maximum nodes to return (1-1000)"] = 100, +) -> str: + """Browse graph nodes by label and optional property filter.""" + limit = _coerce_int_limit(limit) + query_parts = [f"MATCH (node:{_validate_cypher_identifier(label, 'label')})" if label else "MATCH (node)"] + params: dict[str, object] = {"limit": limit} + + if property_filter: + if " CONTAINS " in property_filter: + prop, value = property_filter.split(" CONTAINS ", 1) + prop = _validate_cypher_identifier(prop.strip(), "property") + query_parts.append(f"WHERE toString(node.`{prop}`) CONTAINS $value") + params["value"] = value.strip() + elif "=" in property_filter: + prop, value = property_filter.split("=", 1) + prop = _validate_cypher_identifier(prop.strip(), "property") + query_parts.append(f"WHERE node.`{prop}` = $value") + params["value"] = value.strip() + else: + raise ValueError("property_filter must use '=' or ' CONTAINS '.") + + query_parts.append("RETURN node LIMIT $limit") + result = await _neo4j.query(" ".join(query_parts), params) + return _safe_json(result) + + +@mcp.tool() +async def explore_relationships( + source_label: Annotated[str | None, "Optional source node label"] = None, + relationship_type: Annotated[str | None, "Optional relationship type"] = None, + target_label: Annotated[str | None, "Optional target node label"] = None, + limit: Annotated[int, "Maximum relationships to return (1-1000)"] = 100, +) -> str: + """Browse graph relationships with optional source/type/target filters.""" + limit = _coerce_int_limit(limit) + source = f"(source:{_validate_cypher_identifier(source_label, 'source label')})" if source_label else "(source)" + target = f"(target:{_validate_cypher_identifier(target_label, 'target label')})" if target_label else "(target)" + if relationship_type: + rel = f"-[relationship:{_validate_cypher_identifier(relationship_type, 'relationship type')}]->" + else: + rel = "-[relationship]->" + + result = await _neo4j.query( + f"MATCH {source}{rel}{target} 
RETURN source, relationship, target LIMIT $limit", + {"limit": limit}, + ) + return _safe_json(result) + + +@mcp.tool() +async def get_screenshot( + uuid: Annotated[str | None, "WEBSCREENSHOT uuid or id"] = None, + url: Annotated[str | None, "Substring of the original screenshot URL"] = None, +) -> str: + """Resolve a WEBSCREENSHOT node to a local screenshot file path.""" + if not uuid and not url: + raise ValueError("Either uuid or url must be provided.") + + if uuid: + result = await _neo4j.query( + """ + MATCH (w:WEBSCREENSHOT) + WHERE w.uuid = $uuid OR w.id = $uuid + OPTIONAL MATCH (s:SCAN {id: w.scan}) + RETURN properties(w) AS web_props, properties(s) AS scan_props + LIMIT 1 + """, + {"uuid": uuid}, + ) + else: + result = await _neo4j.query( + """ + MATCH (w:WEBSCREENSHOT) + WHERE toString(w.url) CONTAINS $url OR toString(w.data) CONTAINS $url + OPTIONAL MATCH (s:SCAN {id: w.scan}) + RETURN properties(w) AS web_props, properties(s) AS scan_props + LIMIT 1 + """, + {"url": url}, + ) + + if not result: + needle = f"UUID {uuid!r}" if uuid else f"URL {url!r}" + return f"No screenshot found for {needle}." 
+ + web_props = result[0].get("web_props") or {} + scan_props = result[0].get("scan_props") or {} + web_data = _parse_mapping(web_props.get("data")) + scan_data = _parse_mapping(scan_props.get("data")) + + screenshot_uuid = web_props.get("uuid") or web_props.get("id") or uuid + original_url = web_props.get("url") or web_data.get("url") or web_props.get("host") or url + relative_path = web_props.get("path") or web_data.get("path") + scan_name = scan_props.get("name") or scan_data.get("name") or web_props.get("scan") + + if not relative_path: + return _safe_json({ + "error": "Screenshot data is missing a path.", + "uuid": screenshot_uuid, + "url": original_url, + }) + + path = Path(str(relative_path)).expanduser() + candidates = [path] if path.is_absolute() else [] + bbot_home = Path(BBOT_DATA_DIR).expanduser().resolve() + if not path.is_absolute() and scan_name: + candidates.append(bbot_home / "scans" / str(scan_name) / path) + if not path.is_absolute(): + candidates.append(bbot_home / path) + + for candidate in candidates: + if candidate.exists(): + return _safe_json({ + "path": str(candidate), + "url": original_url, + "uuid": screenshot_uuid, + }) + + return _safe_json({ + "error": "Screenshot file not found.", + "checked_paths": [str(candidate) for candidate in candidates], + "url": original_url, + "uuid": screenshot_uuid, + }) diff --git a/dreadnode/attack-surface-management/mcp/shodan.py b/dreadnode/attack-surface-management/mcp/shodan.py new file mode 100644 index 0000000..89252ee --- /dev/null +++ b/dreadnode/attack-surface-management/mcp/shodan.py @@ -0,0 +1,367 @@ +#!/usr/bin/env -S uv run +# /// script +# requires-python = ">=3.12" +# dependencies = [ +# "fastmcp>=2.0", +# "shodan>=1.31.0", +# ] +# /// +"""Shodan internet intelligence tools exposed as an MCP server. + +Provides host search, IP reconnaissance, DNS lookups, CVE/exploit +intelligence, and API usage tracking via the Shodan API. + +Environment variables: + SHODAN_API_KEY: Required. 
Your Shodan API key. + SHODAN_API_URL: Optional. Base URL for a Shodan-compatible mock service. +""" + +from __future__ import annotations + +import json +import os +from typing import Annotated +from urllib import parse, request + +import shodan +from fastmcp import FastMCP + +SHODAN_API_KEY = os.environ.get("SHODAN_API_KEY", "") +SHODAN_API_URL = os.environ.get("SHODAN_API_URL", "").rstrip("/") + +mcp = FastMCP("shodan") + + +class _HttpNamespace: + def __init__(self, client: "_HttpShodanClient", prefix: str) -> None: + self._client = client + self._prefix = prefix + + def resolve(self, hostnames: str) -> dict: + return self._client.get_json(f"{self._prefix}/resolve", {"hostnames": hostnames}) + + def reverse(self, ips: str) -> dict: + return self._client.get_json(f"{self._prefix}/reverse", {"ips": ips}) + + def search(self, query: str, **options: object) -> dict: + return self._client.get_json(f"{self._prefix}/search", {"query": query, **options}) + + def tags(self, size: int = 20) -> object: + return self._client.get_json(f"{self._prefix}/tags", {"size": size}) + + +class _HttpShodanClient: + """Small Shodan-compatible HTTP adapter for task-local mock services.""" + + def __init__(self, base_url: str, api_key: str) -> None: + self.base_url = base_url.rstrip("/") + self.api_key = api_key + self.dns = _HttpNamespace(self, "/dns") + self.exploits = _HttpNamespace(self, "/exploits") + self.queries = _HttpNamespace(self, "/shodan/query") + + def get_json(self, path: str, params: dict[str, object] | None = None) -> object: + query_params = dict(params or {}) + if self.api_key: + query_params["key"] = self.api_key + query = parse.urlencode(query_params, doseq=True) + url = f"{self.base_url}{path}" + if query: + url = f"{url}?{query}" + + req = request.Request(url, headers={"Accept": "application/json"}) + with request.urlopen(req, timeout=30) as response: + body = response.read().decode("utf-8") + return json.loads(body) if body else {} + + def search(self, query: 
str, **options: object) -> dict: + result = self.get_json("/shodan/host/search", {"query": query, **options}) + return result if isinstance(result, dict) else {} + + def host(self, ip: str, history: bool = False) -> dict: + result = self.get_json(f"/shodan/host/{parse.quote(ip, safe='')}", {"history": str(history).lower()}) + return result if isinstance(result, dict) else {} + + def count(self, query: str, **options: object) -> dict: + result = self.get_json("/shodan/host/count", {"query": query, **options}) + return result if isinstance(result, dict) else {} + + def ports(self) -> object: + return self.get_json("/shodan/ports") + + def protocols(self) -> object: + return self.get_json("/shodan/protocols") + + def info(self) -> dict: + result = self.get_json("/api-info") + return result if isinstance(result, dict) else {} + + +def _get_client() -> shodan.Shodan | _HttpShodanClient: + if not SHODAN_API_KEY: + raise RuntimeError( + "SHODAN_API_KEY environment variable is not set. " + "Get your API key at https://account.shodan.io" + ) + if SHODAN_API_URL: + return _HttpShodanClient(SHODAN_API_URL, SHODAN_API_KEY) + return shodan.Shodan(SHODAN_API_KEY) + + +def _safe_json(obj: object) -> str: + return json.dumps(obj, indent=2, default=str) + + +# ── Core Search ─────────────────────────────────────────────────────── + + +@mcp.tool() +def shodan_host_search( + query: Annotated[str, "Shodan search query (e.g., 'apache city:\"San Francisco\"', 'port:502 tag:ics')"], + facets: Annotated[str | None, "Comma-separated facets for aggregation (e.g., 'country,org,port')"] = None, + page: Annotated[int, "Page number for pagination"] = 1, +) -> str: + """Search Shodan for hosts matching a query. + + Returns matching hosts with IP, port, org, hostnames, location, + vulnerabilities, and optional facet aggregations. 
+ + Common queries: + org:"Target Corp" + hostname:example.com + port:3389 org:"Target Corp" + ssl.cert.subject.cn:example.com + http.title:"Dashboard" + vuln:CVE-2021-44228 + product:"Apache" version:"2.4.49" + """ + api = _get_client() + options: dict = {"page": page} + if facets: + options["facets"] = facets + + results = api.search(query, **options) + + matches = [] + for match in results.get("matches", []): + entry: dict = { + "ip": match.get("ip_str"), + "port": match.get("port"), + "org": match.get("org"), + "hostnames": match.get("hostnames", []), + "domains": match.get("domains", []), + "transport": match.get("transport"), + "product": match.get("product"), + "version": match.get("version"), + } + if match.get("vulns"): + entry["vulns"] = list(match["vulns"].keys()) if isinstance(match["vulns"], dict) else match["vulns"] + if match.get("location"): + loc = match["location"] + entry["location"] = { + "country": loc.get("country_name"), + "city": loc.get("city"), + } + matches.append(entry) + + return _safe_json({ + "total": results.get("total", 0), + "matches": matches, + "facets": results.get("facets", {}), + }) + + +@mcp.tool() +def shodan_host_info( + ip: Annotated[str, "IP address to look up (e.g., '8.8.8.8')"], + history: Annotated[bool, "Include historical banners"] = False, +) -> str: + """Get detailed information about a specific IP address. + + Returns open ports, services, OS, organization, hostnames, location, + vulnerabilities, and service banners. 
+ """ + api = _get_client() + host = api.host(ip, history=history) + + return _safe_json({ + "ip": host.get("ip_str"), + "org": host.get("org"), + "os": host.get("os"), + "ports": host.get("ports", []), + "hostnames": host.get("hostnames", []), + "domains": host.get("domains", []), + "vulns": host.get("vulns", []), + "tags": host.get("tags", []), + "last_update": host.get("last_update"), + "location": { + "country": host.get("country_name"), + "city": host.get("city"), + "asn": host.get("asn"), + "isp": host.get("isp"), + }, + "data": [ + { + "port": svc.get("port"), + "transport": svc.get("transport"), + "product": svc.get("product"), + "version": svc.get("version"), + "banner": (svc.get("data", "")[:500] if svc.get("data") else None), + } + for svc in host.get("data", []) + ], + }) + + +@mcp.tool() +def shodan_count( + query: Annotated[str, "Shodan search query"], + facets: Annotated[str | None, "Comma-separated facets for aggregated counts"] = None, +) -> str: + """Get result count for a query without consuming search credits. + + Always use this before a full search to check scope and avoid + wasting API credits on overly broad queries. 
+ """ + api = _get_client() + options: dict = {} + if facets: + options["facets"] = facets + + result = api.count(query, **options) + return _safe_json({ + "total": result.get("total", 0), + "facets": result.get("facets", {}), + }) + + +# ── DNS ─────────────────────────────────────────────────────────────── + + +@mcp.tool() +def shodan_dns_lookup( + hostnames: Annotated[list[str], "Hostnames to resolve (e.g., ['example.com', 'api.example.com'])"], +) -> str: + """Resolve domain names to IP addresses via Shodan DNS.""" + api = _get_client() + result = api.dns.resolve(",".join(hostnames)) + return _safe_json(result) + + +@mcp.tool() +def shodan_dns_reverse( + ips: Annotated[list[str], "IP addresses to reverse lookup (e.g., ['8.8.8.8'])"], +) -> str: + """Reverse DNS lookup — find hostnames for IP addresses.""" + api = _get_client() + result = api.dns.reverse(",".join(ips)) + return _safe_json(result) + + +# ── Exploits & Intelligence ────────────────────────────────────────── + + +@mcp.tool() +def shodan_exploits_search( + query: Annotated[str, "Exploit search query (e.g., 'CVE-2021-44228', 'Apache', 'Modbus')"], + facets: Annotated[str | None, "Facets for aggregation (e.g., 'type,platform,author')"] = None, + page: Annotated[int, "Page number"] = 1, +) -> str: + """Search the Shodan Exploits database for known exploits and CVEs. + + Returns exploit details including description, author, type, platform, + affected CVEs, and source references. 
+ """ + api = _get_client() + options: dict = {"page": page} + if facets: + options["facets"] = facets + + result = api.exploits.search(query, **options) + + matches = [] + for exploit in result.get("matches", []): + matches.append({ + "id": exploit.get("_id"), + "description": exploit.get("description", "")[:500], + "author": exploit.get("author"), + "type": exploit.get("type"), + "platform": exploit.get("platform"), + "date": exploit.get("date"), + "source": exploit.get("source"), + "cve": exploit.get("cve", []), + }) + + return _safe_json({ + "total": result.get("total", 0), + "matches": matches, + "facets": result.get("facets", {}), + }) + + +# ── Reference Data ─────────────────────────────────────────────────── + + +@mcp.tool() +def shodan_ports() -> str: + """List all port numbers that Shodan actively crawls.""" + api = _get_client() + return _safe_json(api.ports()) + + +@mcp.tool() +def shodan_protocols() -> str: + """List all protocols Shodan can distinguish in banner grabs.""" + api = _get_client() + return _safe_json(api.protocols()) + + +# ── Community Queries ──────────────────────────────────────────────── + + +@mcp.tool() +def shodan_query_search( + query: Annotated[str, "Search term (e.g., 'SCADA', 'webcam', 'database')"], + page: Annotated[int, "Page number"] = 1, +) -> str: + """Search community-shared Shodan queries for inspiration.""" + api = _get_client() + result = api.queries.search(query, page=page) + return _safe_json({ + "total": result.get("total", 0), + "matches": [ + { + "title": q.get("title"), + "description": q.get("description"), + "query": q.get("query"), + "votes": q.get("votes"), + "tags": q.get("tags", []), + } + for q in result.get("matches", []) + ], + }) + + +@mcp.tool() +def shodan_query_tags( + size: Annotated[int, "Number of tags to return"] = 20, +) -> str: + """Get popular tags for community-shared Shodan queries.""" + api = _get_client() + return _safe_json(api.queries.tags(size=size)) + + +# ── API Status 
─────────────────────────────────────────────────────── + + +@mcp.tool() +def shodan_api_info() -> str: + """Check API plan, remaining credits, and account status.""" + api = _get_client() + info = api.info() + return _safe_json({ + "plan": info.get("plan"), + "query_credits": info.get("query_credits"), + "scan_credits": info.get("scan_credits"), + "unlocked": info.get("unlocked"), + }) diff --git a/dreadnode/attack-surface-management/pyproject.toml b/dreadnode/attack-surface-management/pyproject.toml new file mode 100644 index 0000000..94daa3c --- /dev/null +++ b/dreadnode/attack-surface-management/pyproject.toml @@ -0,0 +1,19 @@ +[project] +name = "attack-surface-management" +version = "1.0.0" +requires-python = ">=3.10" +dependencies = [ + "bbot", + "fastmcp>=2.0", + "neo4j>=5.28.1", + "shodan>=1.31.0", + "aiodocker>=0.24.0", + "pillow>=11.3.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0.0", + "mypy>=1.17.0", + "ruff>=0.12.5", +] diff --git a/dreadnode/attack-surface-management/skills/bbot-module-reference/SKILL.md b/dreadnode/attack-surface-management/skills/bbot-module-reference/SKILL.md new file mode 100644 index 0000000..17b7cb7 --- /dev/null +++ b/dreadnode/attack-surface-management/skills/bbot-module-reference/SKILL.md @@ -0,0 +1,161 @@ +--- +name: bbot-module-reference +description: BBOT module and preset reference for reconnaissance scanning. Use when choosing which modules, presets, or flags to use for a BBOT scan, or when you need to understand what a specific module does. +--- + +# BBOT Module & Preset Reference + +## Presets (-p flag) + +Presets are curated combinations of modules for common tasks. 
+ +### Discovery + +| Preset | Purpose | Key Modules | +|---|---|---| +| `subdomain-enum` | Comprehensive subdomain discovery | anubisdb, certspotter, crt, dnsdumpster, dnsbrute, shodan_dns, securitytrails, wayback, +40 more | +| `cloud-enum` | Cloud resource enumeration (includes subdomain-enum) | bucket_amazon, bucket_azure, bucket_firebase, bucket_google | +| `code-enum` | Git repos, Docker images | github_codesearch, dockerhub, git_clone, postman | +| `email-enum` | Email address harvesting | emailformat, hunterio, pgp, skymem | + +### Web Scanning + +| Preset | Purpose | Key Modules | +|---|---|---| +| `web-basic` | Quick web scan for essentials | httpx, wappalyzer, badsecrets, robots, sslcert, ffuf_shortnames | +| `web-thorough` | Aggressive web scan (includes web-basic) | All web-basic + web-thorough flagged modules | +| `spider` | Recursive web crawling | distance:2, depth:4, 25 links/page | +| `spider-intense` | Aggressive spidering | distance:4, depth:6, 50 links/page | +| `tech-detect` | Technology detection only | wappalyzer, nuclei tech templates, fingerprintx | + +### Vulnerability Scanning + +| Preset | Purpose | Notes | +|---|---|---| +| `nuclei` | Template-based vulnerability scanning | directory_only mode | +| `nuclei-intense` | All URLs with robots/urlscan/wayback | More thorough, slower | +| `nuclei-technology` | Templates matching discovered tech | Targeted based on detected stack | +| `nuclei-budget` | Low-hanging fruit mode | budget:10, fastest nuclei option | + +### Fuzzing + +| Preset | Purpose | Notes | +|---|---|---| +| `dirbust-light` | Basic directory brute-force | 1000-line wordlist | +| `dirbust-heavy` | Recursive directory brute-force | 5000-line wordlist, depth:3 | +| `lightfuzz-light` | Basic fuzzing | path, sqli, xss only | +| `lightfuzz-medium` | All fuzzing modules | No POST requests | +| `lightfuzz-heavy` | Intense fuzzing | Includes POST and paramminer | +| `paramminer` | Parameter discovery | Brute-force parameter names | + 
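A minimal sketch of how the preset names in these tables could map onto a BBOT command line, assuming the standard `bbot` CLI options `-t` (targets), `-p` (presets), `-f` (flags), `-m` (modules), and `-c` (config). The helper name `build_bbot_argv` is hypothetical and is not taken from this capability's tooling:

```python
from __future__ import annotations


def build_bbot_argv(
    targets: list[str],
    presets: list[str] | None = None,
    flags: list[str] | None = None,
    modules: list[str] | None = None,
    config: list[str] | None = None,
) -> list[str]:
    """Assemble an argv list for the bbot CLI (hypothetical helper)."""
    if not targets:
        raise ValueError("at least one target is required")
    argv = ["bbot", "-t", *targets]
    # Each option is emitted only when values were supplied for it.
    for option, values in (("-p", presets), ("-f", flags), ("-m", modules), ("-c", config)):
        if values:
            argv += [option, *values]
    return argv
```

Returning a list rather than a shell string keeps target names from being interpreted by a shell if the result is later passed to `subprocess.run`.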
+### Specialized + +| Preset | Purpose | +|---|---| +| `baddns-intense` | DNS misconfiguration checks (CNAME, MX, NS, TXT) | +| `iis-shortnames` | IIS shortname enumeration | +| `dotnet-audit` | Comprehensive IIS/.NET scanning | +| `fast` | Minimal discovery, strict scope | +| `kitchen-sink` | Everything combined (use with caution on large targets) | + +## Flags (-f flag) + +Flags enable groups of modules sharing a characteristic. + +| Flag | Description | Use When | +|---|---|---| +| `passive` | No direct target contact | Stealth required | +| `safe` | Non-intrusive modules only | Production systems | +| `active` | Modules that contact target | Standard engagement | +| `aggressive` | Potentially disruptive | Lab/controlled environment | +| `subdomain-enum` | All subdomain discovery | Comprehensive DNS mapping | +| `web-basic` | Essential web modules | Quick web assessment | +| `web-thorough` | Extended web modules | Deep web analysis | +| `web-screenshots` | Visual capture | Screenshot collection | +| `portscan` | Port scanning | Network service discovery | +| `cloud-enum` | Cloud resources | Cloud-focused targets | +| `code-enum` | Code repositories | OSINT / code leakage | + +## Key Modules + +### Subdomain Discovery +- `dnsbrute` — Active DNS brute-forcing with wordlists +- `certspotter` / `crt` — Certificate transparency logs +- `dnsdumpster` — DNSDumpster.com queries (passive) +- `wayback` — Archive.org historical data +- `shodan_dns` — Shodan DNS database (requires API key) +- `securitytrails` — Historical DNS records (requires API key) + +### Web Analysis +- `httpx` — Fast web service detection, status codes, titles +- `gowitness` — Web page screenshots (configurable resolution) +- `wappalyzer` — Technology fingerprinting +- `ffuf` — Fast web fuzzer for directories/files +- `nuclei` — Template-based vulnerability scanner + +### Cloud Resources +- `bucket_amazon` / `bucket_azure` / `bucket_google` — Storage bucket enumeration +- `azure_realm` / 
`azure_tenant` — Azure-specific enumeration +- `oauth` — OAuth endpoint discovery + +### Security Testing +- `badsecrets` — Hardcoded secrets/keys detection +- `baddns` — DNS misconfigurations and potential takeovers +- `lightfuzz` — Lightweight vulnerability fuzzing +- `git` / `gitdumper` — Exposed git repository detection and dumping + +### OSINT / Code +- `github_codesearch` — Search code for secrets/info +- `dockerhub` — Docker image discovery +- `postman` — API documentation discovery +- `social` — Social media profile enumeration + +## Common Recipes + +**Passive-only subdomain discovery:** +``` +targets=["target.com"], presets=["subdomain-enum"], flags=["passive"] +``` + +**Subdomain enum + basic web scan:** +``` +targets=["target.com"], presets=["subdomain-enum", "web-basic"] +``` + +**Targeted nuclei scan on known hosts:** +``` +targets=["api.target.com", "admin.target.com"], presets=["nuclei"] +``` + +**Technology detection across all subdomains:** +``` +targets=["target.com"], presets=["subdomain-enum", "tech-detect"] +``` + +**Screenshot collection:** +``` +targets=["target.com"], modules=["gowitness"], presets=["subdomain-enum"] +``` + +**Cloud resource hunt:** +``` +targets=["target.com"], presets=["cloud-enum"] +``` + +**Deep web spider on specific app:** +``` +targets=["app.target.com"], presets=["spider"], config=["web.spider_distance=2", "web.spider_depth=3"] +``` + +**Full kitchen sink (small targets only):** +``` +targets=["target.com"], presets=["kitchen-sink"] +``` + +## Configuration Tips + +- **API keys**: Configure in `~/.config/bbot/bbot.yml` for modules like Shodan, SecurityTrails, VirusTotal +- **Scope control**: Use `extra_args=["--strict-scope"]` to prevent scope creep +- **Proxy**: Use `extra_args=["--proxy", "http://127.0.0.1:8080"]` to route through a proxy +- **Custom headers**: Use `extra_args=["--custom-headers", "Authorization=Bearer token"]` +- **Timeouts**: Set via config: `config=["modules.httpx.timeout=10"]` diff --git
a/dreadnode/attack-surface-management/skills/cypher-query-playbook/SKILL.md b/dreadnode/attack-surface-management/skills/cypher-query-playbook/SKILL.md new file mode 100644 index 0000000..d848698 --- /dev/null +++ b/dreadnode/attack-surface-management/skills/cypher-query-playbook/SKILL.md @@ -0,0 +1,325 @@ +--- +name: cypher-query-playbook +description: Neo4j Cypher query patterns for analyzing BBOT reconnaissance data in the graph database. Use when you need to analyze scan results, map infrastructure, find anomalies, or synthesize findings from the attack surface graph. +--- + +# Cypher Query Playbook + +## Quick Reference + +## Schema Compatibility + +Run `get_db_schema()` before using relationship-heavy queries. BBOT-backed +graphs may expose friendly properties (`name`, `address`, `url`) or the BBOT +event envelope (`data`, `host`, `netloc`, `port`, `tags`, `scope_distance`, +`module`, `scan`). Use `coalesce(n.name, n.data, n.host)` for asset names when +you are not sure which shape is present. Relationship names also vary: some +fixtures use semantic types such as `RESOLVES_TO` and `HAS_PORT`, while BBOT +module output may use DNS-record or module names such as `A`, `CNAME`, `httpx`, +or `nuclei`. + +### Orientation Queries (Run First) + +**Asset summary:** +```cypher +MATCH (n) RETURN labels(n)[0] as type, count(n) AS count ORDER BY count DESC +``` + +**Recent scans:** +```cypher +MATCH (s:SCAN) RETURN s.name, s.id, s.start_time ORDER BY s.start_time DESC LIMIT 10 +``` + +**Database schema:** +Use the `get_db_schema` tool for a complete schema overview. 
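When rows come back from graphs with mixed schemas, the `coalesce(n.name, n.data, n.host)` fallback described under Schema Compatibility can be mirrored client-side. The sketch below assumes node properties arrive as a plain mapping; `asset_name` is an illustrative helper, not a function provided by this capability:

```python
from __future__ import annotations

from typing import Any, Mapping


def asset_name(props: Mapping[str, Any]) -> str | None:
    """Return the first non-null of name, data, host, like Cypher's coalesce()."""
    for key in ("name", "data", "host"):
        value = props.get(key)
        if value is not None:
            # BBOT event data is not always a string, so normalize for display.
            return str(value)
    return None
```

Checking `is not None` instead of truthiness matches `coalesce`, which skips only nulls and would still return an empty string.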
+ +--- + +## Finding High-Value Assets + +**Dev/test/staging subdomains:** +```cypher +MATCH (n:DNS_NAME) +WITH coalesce(n.name, n.data, n.host) AS name +WHERE name =~ '.*(dev|test|stage|uat|vpn|api|admin|internal|staging|qa|sandbox).*' +RETURN name ORDER BY name +``` + +**Interesting web page titles:** +```cypher +MATCH (n:URL) +WHERE n.status_code = 200 +AND n.title =~ '.*(Login|Admin|Dashboard|Unauthorized|Forbidden|Console|Manager|Portal|Panel|Config).*' +RETURN n.name, n.title +``` + +**Critical and high findings:** +```cypher +MATCH (f:FINDING) +WHERE f.severity IN ['critical', 'high'] +RETURN f.type, f.severity, f.description, f.data +``` + +**Admin panels and login pages:** +```cypher +MATCH (n:URL) +WHERE n.name =~ '.*(admin|panel|dashboard|console|login|signin|auth).*' +AND n.status_code < 400 +RETURN n.name, n.status_code, n.title +``` + +--- + +## Infrastructure Mapping + +**DNS to IP resolution:** +```cypher +MATCH (d:DNS_NAME)-[:RESOLVES_TO]->(ip:IP_ADDRESS) +RETURN d.name, ip.address +ORDER BY ip.address +``` + +**Find all domains on a specific IP:** +```cypher +MATCH (ip:IP_ADDRESS {address: $ip})<-[:RESOLVES_TO]-(d:DNS_NAME) +RETURN ip.address, collect(d.name) AS domains +``` + +**Find IPs for a domain:** +```cypher +MATCH (d:DNS_NAME {name: $domain})-[:RESOLVES_TO]->(ip:IP_ADDRESS) +RETURN d.name, ip.address +``` + +**Shared hosting (IPs with multiple domains):** +```cypher +MATCH (ip:IP_ADDRESS)<-[:RESOLVES_TO]-(d:DNS_NAME) +WITH ip, collect(d.name) AS domains, count(d) as cnt +WHERE cnt > 1 +RETURN ip.address, cnt, domains +ORDER BY cnt DESC +``` + +**Reverse DNS — all domains per IP:** +```cypher +MATCH (ip:IP_ADDRESS)<-[:RESOLVES_TO]-(d:DNS_NAME) +RETURN ip.address, collect(d.name) as domains +ORDER BY size(collect(d.name)) DESC +``` + +--- + +## Service Discovery + +**Web services responding 200:** +```cypher +MATCH (n:URL) +WHERE n.status_code >= 200 AND n.status_code < 300 +RETURN n.name, n.status_code, n.title +LIMIT 50 +``` + +**API 
endpoints:** +```cypher +MATCH (n:URL) +WHERE n.name CONTAINS '/api/' OR n.name CONTAINS '/v1/' OR n.name CONTAINS '/v2/' OR n.name CONTAINS '/graphql' +RETURN n.name, n.status_code, n.title +``` + +**Interesting ports (databases, admin services):** +```cypher +MATCH (p:OPEN_TCP_PORT) +WHERE p.port IN [3306, 5432, 6379, 27017, 9200, 8080, 8443, 9090, 3389, 5900, 11211] +MATCH (ip:IP_ADDRESS)-[:HAS_PORT]->(p) +RETURN ip.address, p.port, p.service +``` + +**Services by port:** +```cypher +MATCH (ip:IP_ADDRESS)-[:HAS_PORT]->(p:OPEN_TCP_PORT) +RETURN p.port, count(ip) as host_count +ORDER BY host_count DESC +LIMIT 20 +``` + +--- + +## Technology Analysis + +**All discovered technologies:** +```cypher +MATCH (t:TECHNOLOGY) +RETURN DISTINCT t.name, t.version, count(*) as usage_count +ORDER BY usage_count DESC +``` + +**Technology stack for a host:** +```cypher +MATCH (d:DNS_NAME {name: $domain})-[:RESOLVES_TO]->(ip)-[:HAS_PORT]->()-[:HAS_TECHNOLOGY]->(t) +RETURN d.name, t.name, t.version +``` + +**Technology outliers (old/unusual software):** +```cypher +MATCH (t:TECHNOLOGY) +WITH t, coalesce(t.name, t.data) AS tech_name +WHERE tech_name IN ['JBoss', 'ColdFusion', 'Struts', 'WebLogic', 'Tomcat', 'IIS'] +OR t.version =~ '.*[0-4]\\..*' +MATCH (n)-[:HAS_TECHNOLOGY]->(t) +RETURN labels(n)[0] as asset_type, coalesce(n.name, n.data, n.host) AS asset, tech_name, t.version +``` + +**Assets with a specific technology:** +```cypher +MATCH (t:TECHNOLOGY {name: $tech_name})<-[:HAS_TECHNOLOGY]-(n) +RETURN labels(n)[0] as type, n.name, t.version +``` + +--- + +## Security Analysis + +**All findings by severity:** +```cypher +MATCH (f:FINDING) +RETURN f.severity, count(f) as count +ORDER BY CASE f.severity + WHEN 'critical' THEN 0 + WHEN 'high' THEN 1 + WHEN 'medium' THEN 2 + WHEN 'low' THEN 3 + ELSE 4 +END +``` + +**Findings with affected assets:** +```cypher +MATCH (asset)-[:HAS_FINDING]->(f:FINDING) +RETURN f.type, f.severity, f.description, labels(asset)[0] as asset_type, 
asset.name +ORDER BY f.severity +``` + +**Public storage buckets:** +```cypher +MATCH (n:STORAGE_BUCKET) +WHERE n.public = true +RETURN n.name, n.url +``` + +**Exposed databases:** +```cypher +MATCH (p:OPEN_TCP_PORT) +WHERE p.port IN [3306, 5432, 6379, 27017, 9200, 5984, 11211] +MATCH (ip:IP_ADDRESS)-[:HAS_PORT]->(p) +OPTIONAL MATCH (ip)<-[:RESOLVES_TO]-(d:DNS_NAME) +RETURN ip.address, p.port, p.service, collect(d.name) as hostnames +``` + +--- + +## Cross-Reference & Correlation + +**Shared infrastructure for high-value assets:** +```cypher +MATCH (d:DNS_NAME)-[:RESOLVES_TO]->(ip:IP_ADDRESS) +WHERE d.name CONTAINS 'dev' OR d.name CONTAINS 'api' OR d.name CONTAINS 'staging' OR d.name CONTAINS 'admin' +WITH ip, collect(d.name) AS domains, count(*) as domainCount +WHERE domainCount > 1 +RETURN ip.address, domains +``` + +**Correlate findings by technology:** +```cypher +MATCH (f:FINDING)<-[:HAS_FINDING]-(root) +MATCH (root)-[:HAS_TECHNOLOGY]->(tech:TECHNOLOGY) +MATCH (other_asset)-[:HAS_TECHNOLOGY]->(tech) +WHERE other_asset <> root +RETURN tech.name, collect(DISTINCT other_asset.name) AS related_assets +``` + +**Discover naming conventions:** +```cypher +MATCH (d:DNS_NAME) +WHERE d.name =~ '.*(app|srv|db|web|mail|ns|mx)0[0-9].*' +RETURN collect(d.name) AS discovered_pattern +``` + +**Cross-reference: domains sharing IP with a finding:** +```cypher +MATCH (f:FINDING)<-[:HAS_FINDING]-(asset) +OPTIONAL MATCH (asset)-[:RESOLVES_TO]->(ip:IP_ADDRESS) +OPTIONAL MATCH (ip)<-[:RESOLVES_TO]-(sibling:DNS_NAME) +WHERE sibling <> asset +RETURN f.type, asset.name, ip.address, collect(DISTINCT sibling.name) as co_hosted +``` + +--- + +## Path Analysis + +**Connection paths from domain to finding:** +```cypher +MATCH p=(d:DNS_NAME)-[*1..3]-(f:FINDING) +WHERE d.name = $domain +RETURN p +``` + +**All relationships for a specific asset:** +```cypher +MATCH (n)-[r]-(m) +WHERE n.name = $name +RETURN labels(n)[0] as source_type, n.name, type(r) as relationship, labels(m)[0] as 
target_type, m.name +``` + +**Shortest path between two assets:** +```cypher +MATCH p=shortestPath((n1:DNS_NAME {name: $start})-[*]-(n2:DNS_NAME {name: $end})) +RETURN p +``` + +--- + +## Aggregation & Statistics + +**Top ports across all hosts:** +```cypher +MATCH (p:OPEN_TCP_PORT) +RETURN p.port, count(p) as cnt +ORDER BY cnt DESC +LIMIT 10 +``` + +**Domains per IP (distribution):** +```cypher +MATCH (ip:IP_ADDRESS)<-[:RESOLVES_TO]-(d) +RETURN ip.address, count(d) as domain_count +ORDER BY domain_count DESC +LIMIT 20 +``` + +**Email addresses by domain:** +```cypher +MATCH (e:EMAIL_ADDRESS) +WHERE e.address ENDS WITH $domain +RETURN e.address +``` + +**Cloud provider breakdown:** +```cypher +MATCH (ip:IP_ADDRESS) +WHERE ip.provider IS NOT NULL +RETURN ip.provider, count(ip) as count +ORDER BY count DESC +``` + +--- + +## Tips + +- **Always use parameters** (`$param`) for user input to prevent Cypher injection +- **Start with small limits** (10-20) and increase if needed +- **Use regex escaping** (`\\`) for special characters in patterns +- **Combine queries** in the Orient phase to build a complete picture before deciding on the next scan +- **Date filtering**: `WHERE n.created_at > datetime('2024-01-01')` +- **NOT conditions**: `WHERE NOT n.status_code IN [404, 403, 401]` +- **Case-insensitive regex**: `WHERE n.name =~ '(?i).*admin.*'` diff --git a/dreadnode/attack-surface-management/skills/reconnaissance-planning/SKILL.md b/dreadnode/attack-surface-management/skills/reconnaissance-planning/SKILL.md new file mode 100644 index 0000000..8cb3d63 --- /dev/null +++ b/dreadnode/attack-surface-management/skills/reconnaissance-planning/SKILL.md @@ -0,0 +1,113 @@ +--- +name: reconnaissance-planning +description: Strategic reconnaissance planning for external attack surface management. Use when starting a new engagement, choosing initial scan strategy, or deciding how to expand coverage after initial results. 
+--- + +# Reconnaissance Planning + +## When to Use + +- Starting a new ASM engagement against a target +- Deciding scan strategy after initial enumeration +- Expanding coverage when initial results are sparse +- Pivoting approach based on discovered infrastructure + +## Phased Approach + +### Phase 1: Passive Discovery (Low Noise) + +Start with passive techniques to map the target without generating traffic. + +**Subdomain Enumeration:** +``` +run_bbot_scan(targets=["target.com"], presets=["subdomain-enum"], flags=["passive"]) +``` + +This queries certificate transparency logs, DNS databases, archive.org, and OSINT sources without touching the target directly. + +**What to look for after Phase 1:** +- Total number of discovered subdomains (establishes scale) +- Naming conventions (numbered patterns like `app01`, `srv02` suggest more exist) +- Cloud providers visible in DNS (CNAME to AWS, Azure, GCP) +- Mail infrastructure (MX records, SPF/DKIM) +- Development/staging indicators in subdomain names + +### Phase 2: Active Enumeration (Moderate Noise) + +Probe discovered assets to understand what's running. + +**Web service detection + technology fingerprinting:** +``` +run_bbot_scan(targets=["target.com"], presets=["subdomain-enum", "web-basic"]) +``` + +**Port scanning on key IPs:** +``` +run_bbot_scan(targets=["10.0.0.1"], modules=["portscan"], config=["modules.portscan.ports=top_1000"]) +``` + +**What to look for after Phase 2:** +- HTTP services on non-standard ports +- Technology stack patterns (is everything Rails? Is there one random PHP app?) +- Login pages, admin panels, API documentation +- SSL certificate details (org names, SANs with more domains) +- Default pages or error pages revealing server versions + +### Phase 3: Targeted Deep Scanning (Focused) + +Focus on high-value targets identified in Phase 2. 
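One way to shortlist those high-value targets is sketched here, assuming Phase 2 produced a plain list of hostnames. The keyword list and the helper name `shortlist_hosts` are illustrative assumptions and should be tuned per engagement:

```python
from __future__ import annotations

import re

# Illustrative keywords for development, staging, admin, and API hosts.
HIGH_VALUE = re.compile(r"(dev|test|stag|uat|qa|admin|internal|api|vpn)", re.IGNORECASE)


def shortlist_hosts(hostnames: list[str], limit: int = 10) -> list[str]:
    """Return up to `limit` unique hostnames containing a high-value keyword."""
    # The pattern is applied to the whole hostname, so a keyword inside the
    # registered domain itself would also match; filter on labels if needed.
    hits = sorted({h.lower() for h in hostnames if HIGH_VALUE.search(h)})
    return hits[:limit]
```

Keeping the shortlist small is deliberate: the deeper scans in this phase are slower and noisier, so they are best pointed at a handful of hosts.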
+ +**Vulnerability scanning on interesting hosts:** +``` +run_bbot_scan(targets=["api.target.com"], presets=["nuclei"]) +``` + +**Web spidering on complex applications:** +``` +run_bbot_scan(targets=["app.target.com"], presets=["spider"], config=["web.spider_distance=2", "web.spider_depth=3"]) +``` + +**Screenshot collection for visual triage:** +``` +run_bbot_scan(targets=["target.com"], modules=["gowitness"], presets=["subdomain-enum"]) +``` + +**Cloud resource enumeration:** +``` +run_bbot_scan(targets=["target.com"], presets=["cloud-enum"]) +``` + +### Phase 4: Expansion & Synthesis + +- Cross-reference findings across phases using graph queries +- Look for patterns: shared infrastructure, common technologies, consistent misconfigurations +- Identify assets that warrant manual investigation +- Build the final areas-of-interest list + +## Decision Tree: Choosing Scan Strategy + +``` +Is this a new target with no prior data? +├── YES → Start with Phase 1 (passive subdomain-enum) +└── NO → Do you have subdomains but no service info? + ├── YES → Run web-basic + tech-detect + └── NO → Do you have services but no vuln data? + ├── YES → Run nuclei on interesting hosts + └── NO → Focus on graph analysis and synthesis +``` + +## Scale Considerations + +| Target Size | Approach | +|---|---| +| Single domain | kitchen-sink preset covers everything | +| Small org (< 50 subdomains) | Full phased approach, thorough coverage | +| Medium org (50-500 subdomains) | Passive first, then targeted active scans on high-value assets | +| Large org (500+ subdomains) | Passive + selective active, prioritize by naming/business context | + +## Common Pitfalls + +- **Going too broad too early**: Don't run `kitchen-sink` on a large target. You'll drown in data. +- **Ignoring the graph**: Running scans without querying results between phases wastes cycles. +- **Scanning out of scope**: Always use `--whitelist` or `--strict-scope` for targets with clear boundaries. 
+- **Missing API keys**: Many passive modules (Shodan, SecurityTrails, etc.) need API keys configured in `bbot.yml`. Check that they are set before relying on passive results. diff --git a/dreadnode/attack-surface-management/skills/screenshot-triage/SKILL.md b/dreadnode/attack-surface-management/skills/screenshot-triage/SKILL.md new file mode 100644 index 0000000..8edf54b --- /dev/null +++ b/dreadnode/attack-surface-management/skills/screenshot-triage/SKILL.md @@ -0,0 +1,105 @@ +--- +name: screenshot-triage +description: Triage web application screenshots to identify high-value targets for manual investigation. Use when analyzing WEBSCREENSHOT nodes from BBOT scans or when visually assessing discovered web assets. +--- + +# Screenshot Triage + +## Purpose + +Triage web screenshots captured by BBOT (via gowitness) to identify high-value targets for human follow-up. The goal is vulnerability discovery: a screenshot is "interesting" if it suggests a high likelihood of success for a human tester. + +## Retrieving Screenshots + +```cypher +// Find all unanalyzed screenshots +MATCH (s:WEBSCREENSHOT) WHERE s.analyzed IS NULL RETURN s.uuid, s.url + +// Get screenshots whose URL contains 'admin' +MATCH (s:WEBSCREENSHOT) WHERE s.url CONTAINS 'admin' RETURN s.uuid, s.url + +// Get screenshots with their host context +MATCH (s:WEBSCREENSHOT)-[]-(parent) +RETURN s.uuid, s.url, labels(parent)[0] as parent_type, parent.name +``` + +Use `get_screenshot(uuid=...)` or `get_screenshot(url=...)` to retrieve the actual image. + +## Triage Principles + +1. **Intent is Vulnerability Discovery**: Focus on pages suggesting high exploitation potential. +2. **Structure Over Content**: Page structure (forms, dashboards, admin panels) matters more than marketing text. +3. **Appearance as Clue**: Bare-bones internal tools often have weaker security than polished public pages. +4. **Prioritize Interaction Points**: Forms, dashboards, and control panels are far more valuable than static pages.
+ +## Priority Classification + +### Critical + +- Administrative interfaces, control panels, backend management systems +- Login forms specifying "admin", "staff", or "internal" +- Database management interfaces (phpMyAdmin, Adminer, pgAdmin) +- Infrastructure dashboards (Grafana, Kibana, Jenkins, GitLab) + +### High + +- API documentation pages (Swagger, OpenAPI, Redoc, GraphQL Playground) +- Developer consoles or debugging interfaces +- Complex forms handling sensitive data (user settings, financial info) +- File upload functionality +- Pages displaying error messages, stack traces, or debug output +- Internal tools not meant for public viewing + +### Medium + +- Standard login forms without admin indicators +- Search functionality (potential for injection) +- User registration/account management pages +- Pages revealing technology versions or server information +- Legacy-looking applications built on old frameworks + +### Low + +- Marketing pages, blogs, documentation +- Static content with no interaction points +- Generic error pages (404, 403) without information leakage +- CDN or asset-serving endpoints + +## What to Look For + +### Visual Indicators of High Value + +- **Unstyled or minimal design**: Internal tools, dev environments +- **Framework default pages**: Fresh installs, unconfigured services +- **Data tables**: Internal metrics, user data, logs +- **Terminal/console interfaces**: Web shells, command runners +- **Multiple form fields**: Complex data entry suggesting business logic +- **Version numbers in footers**: Technology identification + +### Technology Clues + +- Specific software names or logos (WordPress, Jira, Confluence, Jenkins) +- URL patterns visible in screenshots (e.g., `/wp-admin/`, `/administrator/`) +- Framework-specific UI elements (Django admin, Rails scaffolding, Spring Boot Actuator) +- Cloud provider indicators (AWS console, Azure portal elements) + +### Red Flags + +- Stack traces with file paths and line numbers +- Database 
error messages with query fragments +- Debug toolbars (Django Debug Toolbar, Symfony Profiler) +- phpinfo() output +- Directory listings +- Default credentials displayed or hinted at +- "Powered by" footers with version information + +## Workflow + +1. Query for unanalyzed screenshots +2. For each screenshot: + a. Retrieve and examine the image + b. Classify priority (critical/high/medium/low) + c. Note specific elements of interest + d. Record what a human should investigate next +3. Mark screenshots as analyzed: set the `analyzed` property +4. Cross-reference high-priority screenshots with other graph data (technologies, findings, DNS names) diff --git a/dreadnode/attack-surface-management/skills/shodan-reconnaissance/SKILL.md b/dreadnode/attack-surface-management/skills/shodan-reconnaissance/SKILL.md new file mode 100644 index 0000000..65a0098 --- /dev/null +++ b/dreadnode/attack-surface-management/skills/shodan-reconnaissance/SKILL.md @@ -0,0 +1,168 @@ +--- +name: shodan-reconnaissance +description: Shodan query strategies for internet-wide host intelligence, asset discovery, vulnerability correlation, and attack surface mapping. Use when enriching BBOT results with Shodan data, hunting for exposed services, or correlating CVEs with discovered infrastructure. +--- + +# Shodan Reconnaissance + +## Purpose + +Use Shodan to complement BBOT reconnaissance with internet-wide scan intelligence. Shodan provides passive host data, banner analysis, CVE correlation, and historical records that BBOT's active scanning may miss. + +For evaluation tasks, the Shodan MCP can be pointed at a task-local mock service +by setting `SHODAN_API_URL` while still providing any non-empty +`SHODAN_API_KEY`. When `SHODAN_API_URL` is unset, the MCP uses the official +Shodan API client. + +## Integration with BBOT Graph + +The primary workflow is: **BBOT discovers assets → Shodan enriches them.** + +1. Query the Neo4j graph for discovered IPs and domains +2. 
Use Shodan to enrich with service details, CVEs, and historical data +3. Feed Shodan findings back into analysis + +### Enrichment Patterns + +**Enrich all discovered IPs:** +```cypher +MATCH (ip:IP_ADDRESS) RETURN ip.address +``` +Then for each IP: `shodan_host_info(ip="x.x.x.x")` + +**Find org-wide exposure:** +``` +shodan_host_search(query='org:"Target Corp"') +``` +Cross-reference results against known IPs in the graph. + +**Discover assets BBOT missed:** +``` +shodan_host_search(query='ssl.cert.subject.cn:target.com') +shodan_host_search(query='hostname:target.com') +``` +Compare with `MATCH (d:DNS_NAME) RETURN d.name` to find gaps. + +## Query Strategies + +### Asset Discovery + +| Goal | Query | +|---|---| +| All hosts for an org | `org:"Target Corp"` | +| Hosts by hostname | `hostname:target.com` | +| Hosts by SSL cert | `ssl.cert.subject.cn:target.com` | +| Hosts by IP range | `net:10.0.0.0/24` | +| Hosts by ASN | `asn:AS12345` | +| Cloud-hosted assets | `org:"Amazon" hostname:target.com` | + +### Service Hunting + +| Goal | Query | +|---|---| +| Web servers | `hostname:target.com port:80,443` | +| Remote desktop | `port:3389 org:"Target Corp"` | +| SSH servers | `port:22 org:"Target Corp"` | +| Databases | `port:3306,5432,27017,6379 org:"Target Corp"` | +| Elasticsearch | `port:9200 org:"Target Corp"` | +| Docker APIs | `port:2375,2376 org:"Target Corp"` | +| Kubernetes | `port:6443,10250 org:"Target Corp"` | + +### Vulnerability Hunting + +| Goal | Query | +|---|---| +| Hosts with known CVEs | `vuln:CVE-2021-44228 org:"Target Corp"` | +| Outdated SSL | `ssl.version:sslv2 hostname:target.com` | +| Expired certs | `ssl.cert.expired:true hostname:target.com` | +| Self-signed certs | `ssl.cert.issuer.cn:target.com hostname:target.com` | +| Default credentials | `"default password" org:"Target Corp"` | +| Specific product vulns | `product:"Apache" version:"2.4.49"` | + +### Technology Fingerprinting + +| Goal | Query | +|---|---| +| Specific product | 
`product:"nginx" org:"Target Corp"` | +| HTTP title match | `http.title:"Dashboard" org:"Target Corp"` | +| Specific server header | `"Server: Apache/2.4.49"` | +| WAF detection | `http.waf:"Cloudflare" hostname:target.com` | +| CMS detection | `http.component:"WordPress" hostname:target.com` | + +## Workflow: Full ASM Enrichment + +### Step 1: Check Credits +Always start here to avoid hitting limits. +``` +shodan_api_info() +``` + +### Step 2: Scope Assessment +Use `shodan_count` (free, no credits) before committing to full searches. +``` +shodan_count(query='org:"Target Corp"') +shodan_count(query='hostname:target.com') +shodan_count(query='ssl.cert.subject.cn:target.com') +``` + +### Step 3: Broad Discovery +Run full searches on the most productive queries. +``` +shodan_host_search(query='org:"Target Corp"', facets='port,product,country') +``` +Facets give you instant distribution analysis without paging through results. + +### Step 4: Targeted Enrichment +For high-value IPs found by BBOT, get full details. +``` +shodan_host_info(ip="x.x.x.x", history=True) +``` +Historical data reveals services that were recently taken down or changed. + +### Step 5: Exploit Correlation +For any CVEs found on hosts, check exploit availability. +``` +shodan_exploits_search(query="CVE-2024-XXXXX") +``` + +### Step 6: DNS Cross-Reference +Validate and expand DNS data. +``` +shodan_dns_lookup(hostnames=["api.target.com", "dev.target.com"]) +shodan_dns_reverse(ips=["x.x.x.x", "y.y.y.y"]) +``` + +## Facet Analysis + +Facets are the most powerful Shodan feature for ASM. They aggregate across all results without pagination. 
+ +**Key facets:** +- `port` — Service distribution +- `product` — Technology distribution +- `country` — Geographic distribution +- `org` — Hosting provider distribution +- `os` — Operating system distribution +- `vuln` — CVE distribution (requires paid plan) +- `ssl.version` — SSL/TLS version distribution +- `http.title` — Web page title distribution + +**Example: Full surface profile in one query:** +``` +shodan_host_search( + query='org:"Target Corp"', + facets='port,product,country,os,vuln,ssl.version' +) +``` + +## Credit Management + +| Operation | Credit Cost | +|---|---| +| `shodan_count` | Free | +| `shodan_host_search` | 1 query credit per page | +| `shodan_host_info` | Free (IP lookups are free) | +| `shodan_dns_lookup` | Free | +| `shodan_dns_reverse` | Free | +| `shodan_exploits_search` | Free | + +**Strategy**: Use `count` + facets first, `host_info` for specific IPs (free), and reserve `host_search` for when you need the full match list. diff --git a/dreadnode/attack-surface-management/tests/test_bbot_mcp.py b/dreadnode/attack-surface-management/tests/test_bbot_mcp.py new file mode 100644 index 0000000..2181b90 --- /dev/null +++ b/dreadnode/attack-surface-management/tests/test_bbot_mcp.py @@ -0,0 +1,174 @@ +from __future__ import annotations + +import asyncio +import importlib.util +import json +from pathlib import Path + +import pytest + + +MODULE_PATH = Path(__file__).resolve().parents[1] / "mcp" / "bbot.py" + + +def load_bbot_module(): + spec = importlib.util.spec_from_file_location("asm_bbot_mcp_test", MODULE_PATH) + assert spec and spec.loader + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + + +class FakeNeo4j: + def __init__(self, responses: list[list[dict]] | None = None) -> None: + self.responses = list(responses or []) + self.calls: list[tuple[str, dict | None]] = [] + + async def query(self, cypher: str, params: dict | None = None) -> list[dict]: + self.calls.append((cypher, params)) + if 
self.responses: + return self.responses.pop(0) + return [] + + +def run(coro): + return asyncio.run(coro) + + +def test_get_db_schema_collects_labels_relationships_and_properties(monkeypatch): + bbot = load_bbot_module() + fake = FakeNeo4j([ + [{"label": "DNS_NAME"}, {"label": "URL"}], + [{"relationshipType": "A"}, {"relationshipType": "httpx"}], + [ + { + "nodeType": ":DNS_NAME", + "propertyName": "data", + "propertyTypes": ["String"], + "mandatory": False, + } + ], + [ + { + "relType": ":A", + "propertyName": "module", + "propertyTypes": ["String"], + "mandatory": False, + } + ], + ]) + monkeypatch.setattr(bbot, "_neo4j", fake) + + result = json.loads(run(bbot.get_db_schema())) + + assert result["node_labels"] == ["DNS_NAME", "URL"] + assert result["relationship_types"] == ["A", "httpx"] + assert result["node_properties"]["DNS_NAME"][0]["property"] == "data" + assert result["relationship_properties"]["A"][0]["property"] == "module" + assert len(fake.calls) == 4 + + +def test_explore_nodes_builds_parameterized_filter(monkeypatch): + bbot = load_bbot_module() + fake = FakeNeo4j([[{"node": {"data": "dev.target.local"}}]]) + monkeypatch.setattr(bbot, "_neo4j", fake) + + result = json.loads(run(bbot.explore_nodes("DNS_NAME", "data CONTAINS dev", 25))) + + cypher, params = fake.calls[0] + assert "MATCH (node:DNS_NAME)" in cypher + assert "node.`data`" in cypher + assert "$value" in cypher + assert params == {"limit": 25, "value": "dev"} + assert result[0]["node"]["data"] == "dev.target.local" + + +def test_explore_nodes_rejects_cypher_identifier_injection(): + bbot = load_bbot_module() + + with pytest.raises(ValueError, match="Invalid label"): + run(bbot.explore_nodes("DNS_NAME) DETACH DELETE n //", None, 10)) + + with pytest.raises(ValueError, match="Invalid property"): + run(bbot.explore_nodes("DNS_NAME", "data`) MATCH (n) RETURN n //=x", 10)) + + +def test_explore_relationships_validates_identifiers_and_limits(monkeypatch): + bbot = load_bbot_module() + fake = 
FakeNeo4j([[{"source": "a", "relationship": "r", "target": "b"}]]) + monkeypatch.setattr(bbot, "_neo4j", fake) + + run(bbot.explore_relationships("DNS_NAME", "A", "IP_ADDRESS", 3)) + + cypher, params = fake.calls[0] + assert "MATCH (source:DNS_NAME)-[relationship:A]->(target:IP_ADDRESS)" in cypher + assert params == {"limit": 3} + with pytest.raises(ValueError, match="Limit"): + run(bbot.explore_relationships(limit=0)) + + +def test_get_subdomains_and_technologies_use_envelope_fallbacks(monkeypatch): + bbot = load_bbot_module() + fake = FakeNeo4j([ + [{"name": "api.target.local"}], + [{"name": "JBoss Application Server", "version": "4.0", "usage": 1}], + ]) + monkeypatch.setattr(bbot, "_neo4j", fake) + + assert json.loads(run(bbot.get_subdomains("target.local", 100)))[0]["name"] == "api.target.local" + assert json.loads(run(bbot.get_technologies()))[0]["name"] == "JBoss Application Server" + + subdomain_query = fake.calls[0][0] + technology_query = fake.calls[1][0] + assert "coalesce(n.name, n.data, n.host)" in subdomain_query + assert "coalesce(t.name, t.data)" in technology_query + + +def test_get_screenshot_resolves_bbot_scan_path(monkeypatch, tmp_path): + bbot = load_bbot_module() + screenshot = tmp_path / "scans" / "scan-one" / "screenshots" / "app.png" + screenshot.parent.mkdir(parents=True) + screenshot.write_bytes(b"png") + + fake = FakeNeo4j([ + [ + { + "web_props": { + "uuid": "shot-1", + "data": json.dumps({ + "path": "screenshots/app.png", + "url": "https://app.target.local", + }), + "scan": "scan-one", + }, + "scan_props": {"data": json.dumps({"name": "scan-one"})}, + } + ] + ]) + monkeypatch.setattr(bbot, "_neo4j", fake) + monkeypatch.setattr(bbot, "BBOT_DATA_DIR", str(tmp_path)) + + result = json.loads(run(bbot.get_screenshot(uuid="shot-1"))) + + assert result["path"] == str(screenshot) + assert result["url"] == "https://app.target.local" + assert result["uuid"] == "shot-1" + + +def test_get_screenshot_reports_checked_paths_when_missing(monkeypatch, 
tmp_path): + bbot = load_bbot_module() + fake = FakeNeo4j([ + [ + { + "web_props": {"uuid": "shot-1", "path": "missing.png", "scan": "scan-one"}, + "scan_props": {"name": "scan-one"}, + } + ] + ]) + monkeypatch.setattr(bbot, "_neo4j", fake) + monkeypatch.setattr(bbot, "BBOT_DATA_DIR", str(tmp_path)) + + result = json.loads(run(bbot.get_screenshot(uuid="shot-1"))) + + assert result["error"] == "Screenshot file not found." + assert str(tmp_path / "scans" / "scan-one" / "missing.png") in result["checked_paths"] diff --git a/dreadnode/attack-surface-management/tests/test_shodan_mcp.py b/dreadnode/attack-surface-management/tests/test_shodan_mcp.py new file mode 100644 index 0000000..4977323 --- /dev/null +++ b/dreadnode/attack-surface-management/tests/test_shodan_mcp.py @@ -0,0 +1,155 @@ +from __future__ import annotations + +import importlib.util +import json +import threading +from http.server import BaseHTTPRequestHandler, HTTPServer +from pathlib import Path +from urllib import parse + + +MODULE_PATH = Path(__file__).resolve().parents[1] / "mcp" / "shodan.py" + + +def load_shodan_module(): + spec = importlib.util.spec_from_file_location("asm_shodan_mcp_test", MODULE_PATH) + assert spec and spec.loader + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + + +class MockShodanHandler(BaseHTTPRequestHandler): + requests: list[tuple[str, dict[str, list[str]]]] = [] + + def do_GET(self) -> None: + parsed = parse.urlparse(self.path) + query = parse.parse_qs(parsed.query) + self.requests.append((parsed.path, query)) + body = self.response_for(parsed.path, query) + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps(body).encode("utf-8")) + + def log_message(self, format: str, *args: object) -> None: + return + + @staticmethod + def response_for(path: str, query: dict[str, list[str]]) -> object: + if path == "/shodan/host/search": + return { + "total": 
1, + "matches": [ + { + "ip_str": "203.0.113.10", + "port": 9200, + "org": "Target Corp", + "hostnames": ["search.target.local"], + "domains": ["target.local"], + "transport": "tcp", + "product": "Elasticsearch", + "version": "7.10.0", + "vulns": {"CVE-2021-44228": {}}, + } + ], + "facets": {"port": [{"value": 9200, "count": 1}]}, + } + if path == "/shodan/host/203.0.113.10": + return { + "ip_str": "203.0.113.10", + "org": "Target Corp", + "ports": [9200], + "hostnames": ["search.target.local"], + "domains": ["target.local"], + "vulns": ["CVE-2021-44228"], + "country_name": "United States", + "city": "New York", + "asn": "AS64500", + "isp": "Example ISP", + "data": [{"port": 9200, "product": "Elasticsearch", "version": "7.10.0", "data": "banner"}], + } + if path == "/shodan/host/count": + return {"total": 1, "facets": {"product": [{"value": "Elasticsearch", "count": 1}]}} + if path == "/dns/resolve": + return {"search.target.local": "203.0.113.10"} + if path == "/dns/reverse": + return {"203.0.113.10": ["search.target.local"]} + if path == "/exploits/search": + return { + "total": 1, + "matches": [ + { + "_id": "EXP-1", + "description": "Elasticsearch exploit", + "author": "researcher", + "type": "remote", + "platform": "linux", + "date": "2024-01-01", + "source": "mock", + "cve": ["CVE-2021-44228"], + } + ], + "facets": {}, + } + if path == "/shodan/ports": + return [80, 443, 9200] + if path == "/shodan/protocols": + return {"http": "HTTP"} + if path == "/api-info": + return {"plan": "mock", "query_credits": 100, "scan_credits": 0, "unlocked": True} + if path == "/shodan/query/search": + return {"total": 1, "matches": [{"title": "Databases", "query": "port:9200"}]} + if path == "/shodan/query/tags": + return ["database"] + return {} + + +def start_mock_server(): + MockShodanHandler.requests = [] + server = HTTPServer(("127.0.0.1", 0), MockShodanHandler) + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + return server, 
f"http://127.0.0.1:{server.server_port}" + + +def test_http_shodan_client_uses_mock_base_url_and_api_key(): + shodan_mcp = load_shodan_module() + server, base_url = start_mock_server() + try: + client = shodan_mcp._HttpShodanClient(base_url, "test-key") + + search = client.search('org:"Target Corp"', facets="port") + host = client.host("203.0.113.10", history=True) + count = client.count("port:9200") + resolved = client.dns.resolve("search.target.local") + exploits = client.exploits.search("CVE-2021-44228") + + assert search["matches"][0]["ip_str"] == "203.0.113.10" + assert host["ports"] == [9200] + assert count["total"] == 1 + assert resolved["search.target.local"] == "203.0.113.10" + assert exploits["matches"][0]["cve"] == ["CVE-2021-44228"] + assert all(request_query.get("key") == ["test-key"] for _, request_query in MockShodanHandler.requests) + finally: + server.shutdown() + + +def test_shodan_tool_formatting_with_mock_client(monkeypatch): + shodan_mcp = load_shodan_module() + server, base_url = start_mock_server() + client = shodan_mcp._HttpShodanClient(base_url, "test-key") + monkeypatch.setattr(shodan_mcp, "_get_client", lambda: client) + try: + host = json.loads(shodan_mcp.shodan_host_info("203.0.113.10", history=True)) + search = json.loads(shodan_mcp.shodan_host_search('org:"Target Corp"', facets="port")) + exploits = json.loads(shodan_mcp.shodan_exploits_search("CVE-2021-44228")) + api_info = json.loads(shodan_mcp.shodan_api_info()) + + assert host["location"]["asn"] == "AS64500" + assert host["data"][0]["banner"] == "banner" + assert search["matches"][0]["vulns"] == ["CVE-2021-44228"] + assert exploits["matches"][0]["id"] == "EXP-1" + assert api_info["plan"] == "mock" + finally: + server.shutdown() diff --git a/dreadnode/attack-surface-management/tools/FUTURE_TOOLS.md b/dreadnode/attack-surface-management/tools/FUTURE_TOOLS.md new file mode 100644 index 0000000..ad6f769 --- /dev/null +++ b/dreadnode/attack-surface-management/tools/FUTURE_TOOLS.md @@ 
-0,0 +1,34 @@ +# Future Tool Integrations + +## ProjectDiscovery Tools (via pdtm) + +The agent container has `pdtm` (ProjectDiscovery Tool Manager) pre-installed, +which provides access to the full ProjectDiscovery suite. These are available +as CLI tools in the container and do not need separate MCP servers or toolsets +at this time. + +Available via pdtm: +- **httpx** — HTTP probing, tech detection, response analysis +- **nuclei** — Template-based vulnerability scanning +- **subfinder** — Fast passive subdomain enumeration +- **naabu** — Port scanning +- **katana** — Web crawling/spidering +- **uncover** — Meta-search across Shodan/Censys/Fofa/Hunter/ZoomEye +- **dnsx** — DNS toolkit (resolution, brute-force, wildcard filtering) +- **tlsx** — TLS/SSL inspection +- **notify** — Webhook/Slack/Discord notifications +- **cloudlist** — Cloud asset listing (AWS, Azure, GCP) +- **chaos** — ProjectDiscovery chaos dataset +- **alterx** — Subdomain wordlist generation +- **asnmap** — ASN mapping + +These can be invoked directly via bash tool calls when the agent runs in +the container environment. If dedicated tool wrappers or MCP servers are +needed for any of these, add them here. + +## Potential Future Additions + +- **DNS intelligence toolset** — WHOIS, reverse WHOIS, historical DNS, DMARC/SPF/DKIM analysis +- **Censys MCP** — Internet-wide scan data (complementary to Shodan) +- **Reporting/export tool** — Structured ASM report generation from Neo4j graph +- **Continuous monitoring** — Graph diffing, new asset alerting, scheduled scans diff --git a/dreadnode/attack-surface-management/tools/bbot.py b/dreadnode/attack-surface-management/tools/bbot.py new file mode 100644 index 0000000..68d49f3 --- /dev/null +++ b/dreadnode/attack-surface-management/tools/bbot.py @@ -0,0 +1,486 @@ +"""BBOT reconnaissance scanning and Neo4j graph database query tools. 
+ +Provides run_bbot_scan for executing BBOT CLI scans and a suite of Neo4j +Cypher query tools for analyzing reconnaissance results stored in a +property graph. The Neo4j connection is lazily initialized on first query. + +Prerequisites: + - bbot CLI installed and in PATH + - Neo4j database running (container or external) + - Neo4j output module configured in BBOT +""" + +from __future__ import annotations + +import ast +import asyncio +import contextlib +import json +import logging +import os +import shlex +import typing as t +from pathlib import Path + +from dreadnode.agents.tools import Toolset, tool_method +from pydantic import PrivateAttr + +try: + from neo4j import AsyncDriver, AsyncGraphDatabase +except ImportError: + AsyncDriver = None # type: ignore[assignment, misc] + AsyncGraphDatabase = None # type: ignore[assignment, misc] + +# Reduce Neo4j driver logging noise +logging.getLogger("neo4j").setLevel(logging.ERROR) + + +def _parse_serialized_dict(data: str) -> t.Any: + """Parse string representations of JSON or Python literals into dicts.""" + if not isinstance(data, str): + return data + with contextlib.suppress(Exception): + result = json.loads(data) + return result if isinstance(result, dict) else {} + with contextlib.suppress(Exception): + result = ast.literal_eval(data) + return result if isinstance(result, dict) else {} + return {} # unparseable string: fall back to an empty dict so callers can use .get() + + +def _summarize(data: dict[str, t.Any]) -> dict[str, t.Any]: + """Condense a node record to essential fields, truncating long values.""" + summary: dict[str, t.Any] = {} + essential = ["id", "type", "data", "host", "netloc", "port", "tags", "scope_description", "scope_distance"] + for field in essential: + if field in data and data[field] is not None: + value = data[field] + if isinstance(value, list) and len(value) > 5: + summary[field] = value[:5] + summary[f"{field}_truncated"] = True + elif isinstance(value, str) and len(value) > 200: + summary[field] = value[:200] + "..."
+ else: + summary[field] = value + if "id" in summary and isinstance(summary["id"], str) and len(summary["id"]) > 40: + summary["id"] = summary["id"][:40] + "..." + return summary + + +class BbotTools(Toolset): + """Execute BBOT reconnaissance scans and query results from the Neo4j graph database.""" + + neo4j_uri: str = "bolt://localhost:7687" + """Neo4j bolt URI. Override with NEO4J_URI env var or set directly.""" + + neo4j_user: str = "neo4j" + """Neo4j username.""" + + neo4j_password: str = "bbotislife" + """Neo4j password.""" + + bbot_data_dir: str = ".bbot" + """Directory where BBOT stores scan data.""" + + scan_timeout: int = 3600 + """Maximum time in seconds for a BBOT scan to run.""" + + max_output_chars: int = 50_000 + """Maximum characters returned from scan output.""" + + _driver: t.Any = PrivateAttr(default=None) + + def model_post_init(self, __context: t.Any) -> None: + """Apply environment variable overrides after initialization.""" + if uri := os.environ.get("NEO4J_URI"): + self.neo4j_uri = uri + if user := os.environ.get("NEO4J_USER"): + self.neo4j_user = user + if password := os.environ.get("NEO4J_PASSWORD"): + self.neo4j_password = password + + async def _ensure_driver(self) -> "AsyncDriver": + """Lazily initialize and return the Neo4j async driver.""" + if self._driver is None: + if AsyncGraphDatabase is None: + raise RuntimeError( + "neo4j package is not installed. 
Install with: pip install 'neo4j>=5.28.1'" + ) + self._driver = AsyncGraphDatabase.driver( + self.neo4j_uri, auth=(self.neo4j_user, self.neo4j_password) + ) + await self._driver.verify_connectivity() + return self._driver + + async def _query( + self, cypher: str, params: dict[str, t.Any] | None = None + ) -> list[dict[str, t.Any]]: + """Execute a Cypher query and return results as a list of dicts.""" + driver = await self._ensure_driver() + async with driver.session() as session: + result = await session.run(cypher, params or {}) + return [record.data() async for record in result] + + async def _get_nodes( + self, label: str, filters: dict[str, t.Any] | None = None, limit: int = 100 + ) -> list[dict[str, t.Any]]: + """Fetch nodes by label with optional property filtering.""" + where_clauses = ["$label IN labels(n)"] + if filters: + where_clauses.extend(f"n.`{key}` = ${key}" for key in filters) + + cypher = f""" + MATCH (n) + WHERE {" AND ".join(where_clauses)} + RETURN n + {"LIMIT " + str(limit) if limit else ""} + """ + params: dict[str, t.Any] = {"label": label} + if filters: + params.update(filters) + result = await self._query(cypher, params) + return [record["n"] for record in result] + + # ── Scanning ────────────────────────────────────────────────────────── + + @tool_method(name="run_bbot_scan", catch=True) + async def run_scan( + self, + targets: t.Annotated[list[str], "Targets to scan (e.g., ['example.com', '10.0.0.0/24'])"], + modules: t.Annotated[list[str] | None, "Modules to run (e.g., ['httpx', 'nuclei'])"] = None, + presets: t.Annotated[ + list[str] | None, + "Presets to use (e.g., ['subdomain-enum', 'web-basic']). " + "Available: subdomain-enum, web-basic, web-thorough, cloud-enum, code-enum, " + "email-enum, spider, nuclei, nuclei-intense, dirbust-light, dirbust-heavy, " + "lightfuzz-light, lightfuzz-medium, tech-detect, kitchen-sink", + ] = None, + flags: t.Annotated[ + list[str] | None, + "Flags to enable module groups (e.g., ['passive', 'safe']).
" + "Available: active, passive, safe, aggressive, subdomain-enum, web-basic, " + "web-thorough, cloud-enum, code-enum, portscan, web-screenshots", + ] = None, + config: t.Annotated[ + list[str] | None, + "Custom config in key=value format (e.g., ['modules.httpx.timeout=5'])", + ] = None, + extra_args: t.Annotated[ + list[str] | None, + "Additional bbot CLI flags (e.g., ['--strict-scope', '--proxy http://127.0.0.1:8080'])", + ] = None, + ) -> str: + """Execute a BBOT reconnaissance scan against targets. + + Assembles and runs a `bbot` command, automatically configuring it to + report findings to the Neo4j database. Results are stored in the graph + and can be queried with the other tools. + + The scan runs locally via the bbot CLI which must be installed and in PATH. + """ + if not targets: + raise ValueError("At least one target is required to run a scan.") + + # Configure Neo4j output + config = config or [] + config.extend([ + f"modules.neo4j.uri={self.neo4j_uri}", + f"modules.neo4j.username={self.neo4j_user}", + f"modules.neo4j.password={self.neo4j_password}", + ]) + + # Assemble the BBOT command + parts = ["bbot", "--yes", "--output-modules", "neo4j", "--brief"] + + parts.extend(["--targets", *targets]) + + if modules: + parts.extend(["--modules", *modules]) + if flags: + parts.extend(["--flags", *flags]) + if presets: + parts.extend(["--preset", *presets]) + if config: + parts.extend(["--config", *config]) + if extra_args: + parts.extend(extra_args) + + command_str = " ".join(parts) + + # Execute the scan + try: + process = await asyncio.create_subprocess_exec( + *shlex.split(command_str), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + + output_chunks: list[str] = [] + + async def stream() -> None: + if not process or not process.stdout: + return + while True: + line = await process.stdout.readline() + if not line: + break + output_chunks.append(line.decode(errors="replace").strip()) + + await asyncio.wait_for(stream(), 
timeout=self.scan_timeout) + await process.wait() + + exit_code = process.returncode or 0 + + except asyncio.TimeoutError: + if process: + with contextlib.suppress(ProcessLookupError): + process.kill() + output = "\n".join(output_chunks) + return f"Scan timed out after {self.scan_timeout}s. Partial output:\n{output}" + + except FileNotFoundError: + return ( + "Error: bbot command not found. " + "Install BBOT: pip install bbot (https://github.com/blacklanternsecurity/bbot)" + ) + + output = "\n".join(output_chunks) + + if exit_code != 0: + return f"BBOT scan exited with code {exit_code}:\n{output}" + + if len(output) > self.max_output_chars: + output = output[: self.max_output_chars] + f"\n\n... [TRUNCATED: {len(output)} chars total]" + + return f"Scan completed successfully.\n\n{output}" + + # ── Graph Queries ───────────────────────────────────────────────────── + + @tool_method(name="query_graph", catch=True) + async def query_graph( + self, + cypher: t.Annotated[str, "The Cypher query to execute"], + params: t.Annotated[ + dict[str, t.Any] | None, + "Optional parameters to safely inject values (prevents injection). " + "Use $param syntax in the query.", + ] = None, + ) -> str: + """Execute a Cypher query against the Neo4j graph database. + + This is the primary analysis tool for exploring reconnaissance data. + Use parameterized queries ($param) for user input to prevent injection. 
+ + Common patterns: + Count by type: MATCH (n) RETURN labels(n)[0] as type, count(n) as count ORDER BY count DESC + Find domains: MATCH (n:DNS_NAME) WHERE n.name CONTAINS 'api' RETURN n.name LIMIT 20 + DNS to IP: MATCH (d:DNS_NAME)-[:RESOLVES_TO]->(ip:IP_ADDRESS) RETURN d.name, ip.address + Critical findings: MATCH (f:FINDING) WHERE f.severity IN ['critical', 'high'] RETURN f + Tech stack: MATCH (n:TECHNOLOGY) RETURN DISTINCT n.name, n.version + Shared hosting: MATCH (ip:IP_ADDRESS)<-[:RESOLVES_TO]-(d) WITH ip, count(d) as cnt WHERE cnt > 1 RETURN ip.address, cnt + """ + result = await self._query(cypher, params) + return json.dumps(result, indent=2, default=str) + + @tool_method(name="get_scan_metadata", catch=True) + async def get_scans( + self, + scope_distance: t.Annotated[int, "Filter by scope distance (0 = direct targets)"] = 0, + tags: t.Annotated[list[str] | None, "Filter by scan tags"] = None, + ) -> str: + """Retrieve metadata about completed BBOT scans. + + Returns scan IDs, targets, modules used, and timing information. + """ + scans = await self._get_nodes( + label="SCAN", + filters={"scope_distance": scope_distance, **({"tags": tags} if tags else {})}, + ) + summarized = [_summarize(scan) for scan in scans] + return json.dumps(summarized, indent=2, default=str) + + @tool_method(name="get_findings", catch=True) + async def get_findings( + self, + scope_distance: t.Annotated[int, "Filter by scope distance (0 = direct targets)"] = 0, + tags: t.Annotated[list[str] | None, "Filter by tags (e.g., ['critical', 'authentication'])"] = None, + ) -> str: + """Retrieve security findings and vulnerabilities from scans. + + Returns finding type, severity, description, affected resource, + and evidence. Use this to quickly identify confirmed issues. 
+ """ + findings = await self._get_nodes( + label="FINDING", + filters={"scope_distance": scope_distance, **({"tags": tags} if tags else {})}, + ) + summarized = [_summarize(finding) for finding in findings] + return json.dumps(summarized, indent=2, default=str) + + @tool_method(name="get_db_schema", catch=True) + async def get_schema(self) -> str: + """Retrieve the Neo4j database schema. + + Returns node labels, relationship types, and their properties. + Essential for understanding the data model and constructing queries. + """ + queries = { + "node_labels": "CALL db.labels() YIELD label", + "relationship_types": "CALL db.relationshipTypes() YIELD relationshipType", + "node_properties": "CALL db.schema.nodeTypeProperties()", + "relationship_properties": "CALL db.schema.relTypeProperties()", + } + + results = await asyncio.gather(*(self._query(q) for q in queries.values())) + node_labels_res, rel_types_res, node_props_res, rel_props_res = results + + schema: dict[str, t.Any] = { + "node_labels": sorted([r["label"] for r in node_labels_res]), + "relationship_types": sorted([r["relationshipType"] for r in rel_types_res]), + "node_properties": {}, + "relationship_properties": {}, + } + + for record in node_props_res: + label = record.get("nodeType", "").lstrip(":") + if not label: + continue + if label not in schema["node_properties"]: + schema["node_properties"][label] = [] + schema["node_properties"][label].append({ + "property": record.get("propertyName"), + "types": record.get("propertyTypes"), + "mandatory": record.get("mandatory"), + }) + + for record in rel_props_res: + rel_type = record.get("relType", "").lstrip(":") + if not rel_type: + continue + if rel_type not in schema["relationship_properties"]: + schema["relationship_properties"][rel_type] = [] + schema["relationship_properties"][rel_type].append({ + "property": record.get("propertyName"), + "types": record.get("propertyTypes"), + "mandatory": record.get("mandatory"), + }) + + return json.dumps(schema, 
indent=2, default=str) + + @tool_method(name="explore_nodes", catch=True) + async def explore_nodes( + self, + label: t.Annotated[str | None, "Node type (e.g., 'DNS_NAME', 'URL', 'FINDING')"] = None, + property_filter: t.Annotated[ + str | None, + "Filter: 'property=value' for exact match, 'property CONTAINS value' for substring", + ] = None, + limit: t.Annotated[int, "Maximum nodes to return (1-1000)"] = 100, + ) -> str: + """Explore nodes in the graph database interactively. + + Flexible tool for discovering and examining nodes when you're not sure + exactly what you're looking for. Use get_db_schema() first to see + available node labels. + """ + if limit < 1 or limit > 1000: + raise ValueError("Limit must be between 1 and 1000.") + + query_parts = [f"MATCH (node:{label})" if label else "MATCH (node)"] + params: dict[str, t.Any] = {} + + if property_filter: + if "CONTAINS" in property_filter: + parts = property_filter.split("CONTAINS", 1) + if len(parts) == 2: + prop, value = parts + query_parts.append(f"WHERE node.{prop.strip()} CONTAINS $value") + params["value"] = value.strip() + elif "=" in property_filter: + prop, value = property_filter.split("=", 1) + query_parts.append(f"WHERE node.{prop.strip()} = $value") + params["value"] = value.strip() + + query_parts.append("RETURN node LIMIT $limit") + + result = await self._query(" ".join(query_parts), {"limit": limit, **params}) + return json.dumps(result, indent=2, default=str) + + @tool_method(name="explore_relationships", catch=True) + async def explore_relationships( + self, + source_label: t.Annotated[str | None, "Source node type (e.g., 'DNS_NAME')"] = None, + relationship_type: t.Annotated[str | None, "Relationship type (e.g., 'RESOLVES_TO')"] = None, + target_label: t.Annotated[str | None, "Target node type (e.g., 'IP_ADDRESS')"] = None, + limit: t.Annotated[int, "Maximum relationships to return (1-1000)"] = 100, + ) -> str: + """Discover how nodes are connected in the graph database. 
+
+        Use get_db_schema() to see available relationship types.
+        """
+        if limit < 1 or limit > 1000:
+            raise ValueError("Limit must be between 1 and 1000.")
+
+        source = f"(source:{source_label})" if source_label else "(source)"
+        rel = f"-[relationship:{relationship_type}]->" if relationship_type else "-[relationship]->"
+        target = f"(target:{target_label})" if target_label else "(target)"
+
+        query = f"MATCH {source}{rel}{target} RETURN source, relationship, target LIMIT $limit"
+        result = await self._query(query, {"limit": limit})
+        return json.dumps(result, indent=2, default=str)
+
+    @tool_method(name="get_screenshot", catch=True)
+    async def get_screenshot(
+        self,
+        uuid: t.Annotated[str | None, "The UUID of the WEBSCREENSHOT node"] = None,
+        url: t.Annotated[str | None, "The URL to find a screenshot of"] = None,
+    ) -> str:
+        """Retrieve a screenshot from the database.
+
+        Identify a screenshot by its UUID (from explore_nodes) or by the
+        original URL that was screenshotted. Returns the file path for viewing.
+        """
+        if not uuid and not url:
+            raise ValueError("Either 'uuid' or 'url' must be provided.")
+
+        if url and not uuid:
+            nodes = await self._query(
+                "MATCH (node:WEBSCREENSHOT) WHERE node.url CONTAINS $url RETURN node LIMIT 1",
+                {"url": url},
+            )
+            if not nodes:
+                return f"No screenshot found for URL '{url}'."
+            uuid = nodes[0].get("node", {}).get("uuid")
+            if not uuid:
+                return f"No screenshot found for URL '{url}'."
+
+        cypher = """
+        MATCH (w:WEBSCREENSHOT {uuid: $uuid})
+        MATCH (s:SCAN {id: w.scan})
+        RETURN w.data AS web_data, s.data AS scan_data
+        """
+        result = await self._query(cypher, {"uuid": uuid})
+        if not result:
+            return f"No screenshot data found for UUID '{uuid}'."
+
+        scan_data = _parse_serialized_dict(result[0].get("scan_data", ""))
+        web_data = _parse_serialized_dict(result[0].get("web_data", ""))
+
+        scan_name = str(scan_data.get("name", ""))
+        relative_path = str(web_data.get("path", ""))
+        original_url = str(web_data.get("url", ""))
+
+        if not all([scan_name, relative_path]):
+            return "Screenshot data is missing required fields."
+
+        bbot_home = Path(self.bbot_data_dir).expanduser().resolve()
+        full_path = bbot_home / "scans" / scan_name / relative_path
+
+        if not full_path.exists():
+            return f"Screenshot file not found at: {full_path}"
+
+        return json.dumps({
+            "path": str(full_path),
+            "url": original_url,
+            "uuid": uuid,
+        }, indent=2)

From c5ae36fb96ebb2002dd85ac8ce04351fe2f5590f Mon Sep 17 00:00:00 2001
From: GangGreenTemperTatum <104169244+GangGreenTemperTatum@users.noreply.github.com>
Date: Mon, 8 Jun 2026 14:10:40 -0400
Subject: [PATCH 2/2] Remove future ASM tooling notes

---
 .../tools/FUTURE_TOOLS.md | 34 -------------------
 1 file changed, 34 deletions(-)
 delete mode 100644 dreadnode/attack-surface-management/tools/FUTURE_TOOLS.md

diff --git a/dreadnode/attack-surface-management/tools/FUTURE_TOOLS.md b/dreadnode/attack-surface-management/tools/FUTURE_TOOLS.md
deleted file mode 100644
index ad6f769..0000000
--- a/dreadnode/attack-surface-management/tools/FUTURE_TOOLS.md
+++ /dev/null
@@ -1,34 +0,0 @@
-# Future Tool Integrations
-
-## ProjectDiscovery Tools (via pdtm)
-
-The agent container has `pdtm` (ProjectDiscovery Tool Manager) pre-installed,
-which provides access to the full ProjectDiscovery suite. These are available
-as CLI tools in the container and do not need separate MCP servers or toolsets
-at this time.
-
-Available via pdtm:
-- **httpx** — HTTP probing, tech detection, response analysis
-- **nuclei** — Template-based vulnerability scanning
-- **subfinder** — Fast passive subdomain enumeration
-- **naabu** — Port scanning
-- **katana** — Web crawling/spidering
-- **uncover** — Meta-search across Shodan/Censys/Fofa/Hunter/ZoomEye
-- **dnsx** — DNS toolkit (resolution, brute-force, wildcard filtering)
-- **tlsx** — TLS/SSL inspection
-- **notify** — Webhook/Slack/Discord notifications
-- **cloudlist** — Cloud asset listing (AWS, Azure, GCP)
-- **chaos** — ProjectDiscovery chaos dataset
-- **alterx** — Subdomain wordlist generation
-- **asnmap** — ASN mapping
-
-These can be invoked directly via bash tool calls when the agent runs in
-the container environment. If dedicated tool wrappers or MCP servers are
-needed for any of these, add them here.
-
-## Potential Future Additions
-
-- **DNS intelligence toolset** — WHOIS, reverse WHOIS, historical DNS, DMARC/SPF/DKIM analysis
-- **Censys MCP** — Internet-wide scan data (complementary to Shodan)
-- **Reporting/export tool** — Structured ASM report generation from Neo4j graph
-- **Continuous monitoring** — Graph diffing, new asset alerting, scheduled scans
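
Note on `explore_nodes` in tools/bbot.py (patch 1/2): the `property_filter` handling can be read in isolation as the sketch below. It is a minimal illustration, not code from the patch. The function name `build_where_clause` and the `_IDENTIFIER` check are hypothetical additions; the patch itself interpolates the property name without validating it, and only the value travels as a Cypher parameter.

```python
import re
from typing import Any

# Hypothetical guard, not present in the patch: accept only plain
# identifiers as property names, since the name is interpolated into
# the query text rather than passed as a parameter.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def build_where_clause(property_filter: str) -> tuple[str, dict[str, Any]]:
    """Turn 'prop=value' or 'prop CONTAINS value' into a WHERE clause and params."""
    # CONTAINS is checked before '=', in the same order as explore_nodes.
    if "CONTAINS" in property_filter:
        prop, value = property_filter.split("CONTAINS", 1)
        operator = "CONTAINS"
    elif "=" in property_filter:
        prop, value = property_filter.split("=", 1)
        operator = "="
    else:
        # Like the patch, an unrecognised filter adds no WHERE clause.
        return "", {}

    prop = prop.strip()
    if not _IDENTIFIER.fullmatch(prop):
        raise ValueError(f"Invalid property name: {prop!r}")

    # The value is always sent as a string parameter, never interpolated.
    return f"WHERE node.{prop} {operator} $value", {"value": value.strip()}


if __name__ == "__main__":
    print(build_where_clause("name CONTAINS admin"))
    print(build_where_clause("port=443"))
```

Because the value is always passed as a string, a filter such as `port=443` would presumably only match nodes whose `port` property is stored as the string `"443"`; whether BBOT's Neo4j output stores it that way is not stated in the patch.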