Skip to content

Commit 7e0269f

Browse files
committed
Adapt to the new security architecture
Use the new security proxy which does not store any secret unencrypted at rest.
1 parent 91c2ccd commit 7e0269f

4 files changed

Lines changed: 317 additions & 44 deletions

File tree

Framework/Core/scripts/hyperloop-perf-server/hl_common.py

Lines changed: 69 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,25 +12,80 @@
1212

1313
from __future__ import annotations
1414

15+
import json
1516
import os
17+
import socket
18+
import time
1619

1720
import httpx
1821

22+
# security-proxy (see ~/src/ali-bot/security-proxy): a localhost credential proxy
23+
# that binds a RANDOM port and mints a per-service, daily-rotating gate token,
24+
# both handed out over a per-user UNIX socket. Replaces the old fixed
25+
# localhost:8888 + static-bearer ccdb-proxy. Every alimonitor.cern.ch artefact is
26+
# routed through the "/alimonitor/" route (upstream = alimonitor.cern.ch root), so a
27+
# single "alimonitor" gate token covers train-workdir / hyperloop / alihyperloop-data.
28+
_AGENT_SOCK = os.path.expanduser(
29+
os.environ.get("SECURITY_PROXY_AGENT_SOCK", "~/.security-proxy/agent.sock")
30+
)
31+
_PROXY_SERVICE = os.environ.get("SECURITY_PROXY_SERVICE", "alimonitor")
32+
_creds_cache: dict[str, tuple[int, str, float]] = {}
33+
34+
35+
def _proxy_creds(service: str) -> tuple[int, str]:
    """Return (port, gate_token) for ``service`` from the security-proxy agent socket.

    The agent is asked over the UNIX socket at ``_AGENT_SOCK``: the service name
    is sent as a single line and one newline-terminated JSON object is read back.
    Results are cached for 300 s per service; the proxy accepts the current and
    previous token, so a slightly stale cached token still works across the daily
    rotation.

    Args:
        service: Service name to request credentials for (e.g. ``"alimonitor"``).

    Returns:
        ``(port, token)``; ``token`` is ``""`` when the reply carries none.

    Raises:
        RuntimeError: If the agent is unreachable, replies with something that is
            not a JSON object, reports an ``error``, or omits a usable ``port``.
    """
    now = time.time()
    hit = _creds_cache.get(service)
    if hit and now - hit[2] < 300:
        return hit[0], hit[1]
    try:
        # The context manager closes the socket on every path, including a
        # failed connect/send/recv (the plain close() call leaked it on error).
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
            s.settimeout(5.0)
            s.connect(_AGENT_SOCK)
            s.sendall((service + "\n").encode())
            buf = b""
            while not buf.endswith(b"\n"):
                chunk = s.recv(4096)
                if not chunk:  # peer closed before sending a full line
                    break
                buf += chunk
        data = json.loads(buf.decode())
    except (OSError, ValueError) as exc:
        raise RuntimeError(
            f"security-proxy agent not reachable at {_AGENT_SOCK} ({exc}); "
            "is the proxy running? (see ~/src/ali-bot/security-proxy)"
        ) from exc
    # NOTE: never echo ``data`` itself in an error below; it may hold the token.
    if not isinstance(data, dict):
        raise RuntimeError(
            f"security-proxy: unexpected {type(data).__name__} reply for '{service}'"
        )
    if "error" in data:
        raise RuntimeError(
            f"security-proxy: {data['error']}; known services: {data.get('services', [])}"
        )
    try:
        port = int(data["port"])
    except (KeyError, TypeError, ValueError) as exc:
        raise RuntimeError(
            f"security-proxy: reply for '{service}' has no usable port"
        ) from exc
    token = data.get("token", "")
    _creds_cache[service] = (port, token, now)
    return port, token
71+
1972

2073
async def fetch_bytes(url: str, proxy_token: str = "", token: str = "") -> bytes:
21-
"""Fetch a workdir artefact, routing alimonitor URLs through the local proxy.
74+
"""Fetch a workdir artefact, routing alimonitor URLs through the security-proxy.
2275
23-
Mirrors the grid-cert proxy convention used across the Hyperloop tooling:
2476
``alimonitor.cern.ch/<path>`` is rewritten to
25-
``http://localhost:8888/alimonitor/<path>`` with a bearer token, and
26-
``Accept-Encoding: identity`` is required (otherwise the proxy returns a gzip
27-
Content-Length mismatch). Retries transient protocol/read errors up to 3×.
77+
``http://127.0.0.1:<port>/alimonitor/<path>``: the random port and a per-service,
78+
daily-rotating gate token come from the security-proxy agent socket
79+
(``~/.security-proxy/agent.sock``; override with ``SECURITY_PROXY_AGENT_SOCK``),
80+
and the token is sent as ``Authorization: Bearer``. ``Accept-Encoding: identity``
81+
is required (otherwise the proxy returns a gzip Content-Length mismatch). Retries
82+
transient protocol/read errors up to 3×.
2883
2984
Args:
30-
url: Direct artefact URL (perf script, igprof dump, side-car, ...).
31-
proxy_token: Bearer token for the local proxy. Falls back to PROXY_TOKEN,
32-
then HYPERLOOP_TOKEN, then ``token``.
33-
token: Hyperloop auth token fallback.
85+
url: Direct artefact URL, a local path, or a ``file://`` URL.
86+
proxy_token: Accepted for backward compatibility but ignored — the gate token
87+
is minted from the agent socket.
88+
token: Ditto (ignored).
3489
"""
3590
# Local file (a path or a file:// URL) — read directly, no HTTP. Lets a
3691
# locally-generated side-car (igprof-demangle-symbols output) be attached
@@ -40,20 +95,14 @@ async def fetch_bytes(url: str, proxy_token: str = "", token: str = "") -> bytes
4095
with open(path, "rb") as f:
4196
return f.read()
4297

43-
proxy_token = (
44-
proxy_token
45-
or os.environ.get("PROXY_TOKEN", "")
46-
or token
47-
or os.environ.get("HYPERLOOP_TOKEN", "")
48-
)
49-
5098
fetch_url = url
99+
headers = {"Accept-Encoding": "identity"}
51100
if "alimonitor.cern.ch" in url:
52101
path = url.split("alimonitor.cern.ch", 1)[1].lstrip("/")
53-
fetch_url = f"http://localhost:8888/alimonitor/{path}"
54-
55-
headers = {"Authorization": f"Bearer {proxy_token}"} if proxy_token else {}
56-
headers["Accept-Encoding"] = "identity"
102+
port, gate = _proxy_creds(_PROXY_SERVICE)
103+
fetch_url = f"http://127.0.0.1:{port}/{_PROXY_SERVICE}/{path}"
104+
if gate:
105+
headers["Authorization"] = f"Bearer {gate}"
57106

58107
async with httpx.AsyncClient(verify=False) as client:
59108
for attempt in range(3):
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
# Copyright 2019-2026 CERN and copyright holders of ALICE O2.
2+
# See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
# All rights not expressly granted are reserved.
4+
#
5+
# This software is distributed under the terms of the GNU General Public
6+
# License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
#
8+
# In applying this license CERN does not waive the privileges and immunities
9+
# granted to it by virtue of its status as an Intergovernmental Organization
10+
# or submit itself to any jurisdiction.
11+
"""Log tools for the Hyperloop perf MCP server.
12+
13+
A train/device log (e.g. ``stdout.log``) is fetched once through the alimonitor
14+
proxy and cached on disk; subsequent ``grep_log`` calls run regex queries over
15+
the cached copy and return at most ``max_results`` matches (with optional
16+
context), so a multi-MB log never has to come back over the wire — or into the
17+
model's context — in full.
18+
"""
19+
20+
from __future__ import annotations
21+
22+
import gzip
23+
import hashlib
24+
import os
25+
import re
26+
from dataclasses import dataclass
27+
28+
from hl_common import fetch_bytes
29+
30+
_CACHE_DIR = os.path.expanduser(os.environ.get("LOG_MCP_CACHE", "~/.cache/log-mcp"))
31+
_MAX_LINE = 2000 # truncate individual lines in the output to keep results bounded
32+
33+
34+
@dataclass
class LogReport:
    """Bookkeeping record for one log cached on disk."""

    # Source the log was fetched from.
    url: str
    # Label under which the log is registered.
    name: str
    # Location of the cached copy on disk.
    path: str
    # Number of lines counted when the log was loaded.
    n_lines: int
    # Size of the (decompressed) log in bytes.
    n_bytes: int
41+
42+
43+
_logs: dict[str, LogReport] = {}
44+
45+
46+
def _get(name: str) -> LogReport:
    """Return the loaded log registered as ``name``.

    Raises:
        ValueError: If no such log is loaded; the message lists the loaded names.
    """
    report = _logs.get(name)
    if report is not None:
        return report
    if _logs:
        loaded = ", ".join(_logs)
    else:
        loaded = "(none)"
    raise ValueError(f"No log '{name}'. Loaded: {loaded}. Use load_log first.")
52+
53+
54+
def _clip(line: str) -> str:
    """Cut ``line`` to at most ``_MAX_LINE`` characters, flagging the cut."""
    if len(line) > _MAX_LINE:
        return line[:_MAX_LINE] + " …[truncated]"
    return line
56+
57+
58+
async def load_log(url: str, name: str = "", proxy_token: str = "") -> str:
    """Fetch a log file and cache it for regex querying with grep_log.

    The file is downloaded with ``fetch_bytes``, decompressed if it is gzip
    data, and cached on disk; grep_log then reads that cached copy and never
    re-fetches.

    Args:
        url: Direct URL to a log file (e.g. .../stdout.log or a .gz log).
        name: Label (defaults to the filename portion of the URL). Loading under
            a name that is already in use replaces the earlier log.
        proxy_token: Forwarded to ``fetch_bytes``, which accepts it for backward
            compatibility but ignores it (the gate token is minted from the
            security-proxy agent socket).

    Returns:
        A one-line summary with the log name, line count and byte count.
    """
    raw = await fetch_bytes(url, proxy_token=proxy_token)
    # Decide by the gzip magic bytes, not the URL suffix: a ``.gz`` URL whose body
    # was already decoded upstream would otherwise make gzip.decompress raise.
    data = gzip.decompress(raw) if raw[:2] == b"\x1f\x8b" else raw
    text = data.decode("utf-8", errors="replace")
    os.makedirs(_CACHE_DIR, exist_ok=True)
    h = hashlib.sha1(url.encode()).hexdigest()[:12]
    path = os.path.join(_CACHE_DIR, f"{h}.log")
    with open(path, "w", errors="replace") as f:
        f.write(text)
    # Count lines the way grep_log numbers them (str.splitlines), so the two agree.
    n_lines = len(text.splitlines())
    pname = name or url.rstrip("/").split("/")[-1]
    previous = _logs.get(pname)
    _logs[pname] = LogReport(url, pname, path, n_lines, len(data))
    # Reusing a name for a different URL would orphan the old cache file; remove
    # it unless another loaded log still points at it.
    if (
        previous is not None
        and previous.path != path
        and not any(r.path == previous.path for r in _logs.values())
    ):
        try:
            os.remove(previous.path)
        except OSError:
            pass  # best-effort cache cleanup; must not fail the load
    return f"Loaded log '{pname}': {n_lines:,} lines, {len(data):,} bytes."
82+
83+
84+
def grep_log(
    name: str,
    pattern: str,
    max_results: int = 50,
    ignore_case: bool = False,
    invert: bool = False,
    context: int = 0,
) -> str:
    """Regex-search a cached log and return at most max_results matching lines.

    Output lines are ``<lineno>: <text>`` for matches and ``<lineno>- <text>``
    for context; ``--`` separates non-contiguous blocks when context is requested.

    Args:
        name: Log name as returned by load_log.
        pattern: Python regex (re.search semantics, matches anywhere in a line).
        max_results: Maximum number of matching lines to return (default 50).
        ignore_case: Case-insensitive match.
        invert: Return non-matching lines instead.
        context: Lines of context to show before and after each match (like grep -C).

    Returns:
        A header line followed by the selected lines, or a short message when the
        regex is invalid, ``max_results`` is below 1, or nothing matches.

    Raises:
        ValueError: If no log is loaded under ``name``.
    """
    r = _get(name)
    try:
        rx = re.compile(pattern, re.IGNORECASE if ignore_case else 0)
    except re.error as e:
        return f"bad regex: {e}"
    if max_results < 1:
        return "max_results must be >= 1"

    with open(r.path, errors="replace") as f:
        lines = f.read().splitlines()

    total = 0
    hits: list[int] = []  # line indices of the first max_results matches
    for i, line in enumerate(lines):
        # ``!= invert`` flips the outcome when non-matching lines are wanted.
        if (rx.search(line) is not None) != invert:
            total += 1
            if len(hits) < max_results:
                hits.append(i)

    if total == 0:
        # Report the number of lines actually searched.
        return f"[{name}] no matches for /{pattern}/ in {len(lines):,} lines"

    ctx = max(0, context)
    hit_set = set(hits)
    out: list[str] = []
    prev_end = -1  # last printed line index, to insert separators / avoid dup
    for idx in hits:
        lo, hi = max(0, idx - ctx), min(len(lines) - 1, idx + ctx)
        if lo <= prev_end:  # overlap with previous block: continue from there
            lo = prev_end + 1
        elif ctx and prev_end >= 0 and lo > prev_end + 1:
            # Like grep -C: separator only with context and a real gap.
            out.append("--")
        for j in range(lo, hi + 1):
            # A match that falls inside another match's context window must still
            # be flagged as a match, so test membership rather than ``j == idx``.
            mark = ":" if j in hit_set else "-"
            out.append(f"{j + 1}{mark} {_clip(lines[j])}")
        prev_end = hi

    shown = min(total, max_results)
    header = f"[{name}] {total} match(es) for /{pattern}/" + (
        f"; showing first {shown}" if total > shown else ""
    )
    return header + "\n" + "\n".join(out)
146+
147+
148+
def list_logs() -> str:
    """Return one summary line per loaded log, or a hint when none are loaded."""
    if not _logs:
        return "No logs loaded. Use load_log first."
    rows = []
    for label, report in _logs.items():
        rows.append(
            f"{label}: {report.n_lines:,} lines, {report.n_bytes:,} bytes, url={report.url}"
        )
    return "\n".join(rows)
155+
156+
157+
def drop_log(name: str) -> str:
    """Free a log and delete its cached copy.

    The cached file is kept when another loaded log still points at the same
    path (the same URL loaded under a second name), so that log stays usable.

    Args:
        name: Log name as returned by load_log.

    Returns:
        A confirmation message.

    Raises:
        ValueError: If no log is loaded under ``name``.
    """
    r = _get(name)
    still_used = any(
        other_name != name and other.path == r.path
        for other_name, other in _logs.items()
    )
    if not still_used:
        # try/except instead of exists()+remove(): no window in which the file
        # can vanish between the check and the removal.
        try:
            os.remove(r.path)
        except FileNotFoundError:
            pass  # already gone: nothing left to clean up
    del _logs[name]
    return f"Dropped log '{name}'."
168+
169+
170+
def register(mcp) -> None:
    """Expose every log tool of this module on the given FastMCP instance."""
    tools = (load_log, grep_log, list_logs, drop_log)
    for tool_fn in tools:
        decorate = mcp.tool()
        decorate(tool_fn)

Framework/Core/scripts/hyperloop-perf-server/perf_mcp_server.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from mcp.server.fastmcp import FastMCP
3636

3737
import igprof_tools
38+
import log_tools
3839
from hl_common import fetch_bytes
3940

4041
# ---------------------------------------------------------------------------
@@ -398,6 +399,7 @@ def compare(name_a: str, name_b: str, n: int = 40, mode: str = "leaf") -> str:
398399
# ---------------------------------------------------------------------------
399400

400401
igprof_tools.register(mcp)
402+
log_tools.register(mcp)
401403

402404

403405
# ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)