@mcp.tool()
async def composition_trend(analysis_ids: str = "21674,50446,50462,50570",
                            dataset: str = "", days: int = 30,
                            daily_only: bool = True) -> str:
    """Trend of train composition & splitting over recent releases.

    For the given analyses, groups their trains by package date and reports, per
    date: number of trains, total wagons, wagons-per-train (mean / max), and how
    many trains are in a ``decomposed`` (split-for-submission) state. Rising
    wagons-per-train together with a falling train-count / decomposed-count means
    more wagons are running together (fewer splits) — the downstream effect of
    per-device memory wins, which is exactly what frees room under the per-train
    memory budget.

    Wagon count comes from the ``wagons_names`` field (comma-separated), so it is
    approximate if that field is truncated server-side. Most informative on
    production / splitting analyses; fixed-composition daily *test* analyses
    (e.g. the benchmark set) never decompose, so they will look flat by design.

    In dataset mode the wagon count is instead fetched per train, and only for
    the most recent trains; older trains still count towards the ``trains`` and
    ``decomp`` columns but not towards the wagon columns.

    Args:
        analysis_ids: comma-separated analysis ids (default: the benchmark set).
                      Tokens that are not integers are ignored and listed in
                      the report header instead of aborting the whole call.
        dataset:      if set, ignore analysis_ids and group a single dataset's
                      trains by release — sub-trains/day = the split factor of that
                      (cross-analysis, merged) submission. The right lens for
                      *production* splits (a heavy merged train decomposing).
        days:         look-back window by package date (default 30).
        daily_only:   keep only daily builds (default True).

    Returns:
        A plain-text table with one row per package date (newest first), or a
        one-line "No trains ..." message when nothing falls inside the window.
    """
    # Dataset mode only: bound the load put on the server.
    max_counted = 120   # only this many (most recent) trains get a wagon count
    max_parallel = 8    # concurrent per-train lookups

    cutoff = (datetime.date.today() - datetime.timedelta(days=days)).strftime("%Y%m%d")
    trains: list = []
    if dataset:
        raw = await _get("trains/all-trains.jsp", {"dataset_name": dataset})
        # Anything other than a list of dicts cannot be filtered with .get().
        candidates = raw if isinstance(raw, list) else []
        trains = [t for t in candidates
                  if isinstance(t, dict) and t.get("dataset_name") == dataset]
        src = f"dataset '{dataset}'"
    else:
        aids = []
        rejected = []
        for token in str(analysis_ids).split(","):
            token = token.strip()
            if not token:
                continue
            try:
                aids.append(int(token))
            except ValueError:
                # A single malformed id must not take down the whole report.
                rejected.append(token)
        for aid in aids:
            try:
                raw = await _get("analysis/trains-by-analyses.jsp", {"analysis_ids": aid})
            except Exception:
                # Best effort: an analysis that cannot be fetched is left out.
                continue
            # The response is either the analysis record itself or a list
            # whose first element is that record.
            record = raw[0] if isinstance(raw, list) and raw else raw
            if isinstance(record, dict):
                # `or []` also covers a "trains" key that is present but null.
                trains.extend(t for t in (record.get("trains") or [])
                              if isinstance(t, dict))
        src = f"analyses {aids}"
        if rejected:
            src += f" (ignored invalid ids: {rejected})"

    # Keep only trains within the look-back window (and daily builds if asked).
    # NOTE(review): the string comparison assumes _tag_date() yields a
    # YYYYMMDD string, matching the cutoff format — confirm against the helper.
    kept = []
    for t in trains:
        tag = t.get("package_tag")
        d = _tag_date(tag)
        if not d or d < cutoff:
            continue
        if daily_only and "daily" not in (tag or "").lower():
            continue
        kept.append((d, t))

    # Wagon count per train. Analysis-mode trains carry `wagons_names`; the
    # dataset-mode (all-trains.jsp) ones do not, so fetch the count per train
    # (concurrency-bounded; capped to the most recent trains to bound load —
    # uncounted trains contribute to the train/decomp counts but not w/train).
    if dataset:
        # `or 0` keeps the key comparable when a train has a null id.
        kept.sort(key=lambda dt: (dt[0], dt[1].get("id") or 0), reverse=True)
        sem = asyncio.Semaphore(max_parallel)

        async def _wcount(tid):
            """Return the wagon count of train *tid*, or None if unavailable."""
            if tid is None:
                # Nothing to look up; do not spend a request on it.
                return None
            async with sem:
                try:
                    tj = await _get("trains/train.jsp", {"train_id": tid})
                    ts = tj.get("wagons_timestamp") or tj.get("dataset_timestamp")
                    if not ts:
                        return None
                    wd = await _get("trains/wagons_derived_data.jsp",
                                    {"train_id": tid, "wagons_timestamp": ts})
                    # Only a dict response is counted (presumably one entry
                    # per wagon — TODO confirm against the endpoint).
                    return len(wd) if isinstance(wd, dict) else None
                except Exception:
                    # Best effort: a failed lookup leaves the train uncounted.
                    return None

        fetched = await asyncio.gather(
            *[_wcount(t.get("id")) for _, t in kept[:max_counted]])
        # Pad so that counts lines up one-to-one with kept.
        counts = list(fetched) + [None] * (len(kept) - len(fetched))
    else:
        counts = [len([x for x in (t.get("wagons_names") or "").split(",") if x.strip()])
                  for _, t in kept]

    per_date: dict = collections.defaultdict(list)  # date -> [(nwagons|None, state)]
    for (d, t), nw in zip(kept, counts):
        per_date[d].append((nw, str(t.get("state") or "").lower()))
    if not per_date:
        return f"No trains for {src} in last {days}d."

    lines = [f"Composition / split trend — {src}, last {days}d"
             + (", daily" if daily_only else "") + ":\n",
             f"{'date':>8} {'trains':>6} {'wagons':>8} {'w/train':>8} {'maxw':>5} {'decomp':>7}"]
    lines.append("-" * 54)
    for d in sorted(per_date, reverse=True):
        rows = per_date[d]
        ntr = len(rows)
        # Trains without a wagon count are excluded from the wagon statistics.
        wcs = [w for w, _ in rows if w is not None]
        tot = sum(wcs)
        mx = max(wcs, default=0)
        mean = tot / len(wcs) if wcs else 0.0
        dec = sum(1 for _, s in rows if "decompos" in s or "split" in s)
        lines.append(f"{d:>8} {ntr:>6} {tot:>8} {mean:>8.1f} {mx:>5} {dec:>7}")
    return "\n".join(lines)