From 681f0786724c31c46771fc5e3b50100ad9734ff0 Mon Sep 17 00:00:00 2001 From: barslev Date: Tue, 12 May 2026 11:06:33 +0200 Subject: [PATCH 01/12] feat: stream CLI logs to /python-cli-runs/* lifecycle endpoints MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Buffer the CLI's own log records and POST them in 5s batches to a new register/upload/finalize lifecycle so the admin dashboard renders what the user saw in their terminal alongside the run's terminal status. New modules: - core/cli_run.py — register_cli_run / finalize_cli_run helpers - core/log_uploader.py — BatchedLogUploader (daemon-thread flusher, chunked under the 256KB cap, swallows network errors, drains on shutdown) and UploadingLogHandler routing log records to it - core/streaming.py — setup_streaming() wires both into the socketcli and socketdev loggers, forces them to DEBUG so uploads capture the full history regardless of local terminal verbosity, and returns a teardown callable for the caller to register with atexit - set_run_status() propagates the terminal status through the teardown; socketcli.py exception handlers call it for KeyboardInterrupt (cancelled), uncaught Exception (failure), and any SystemExit with a non-zero code (failure) so sys.exit() paths inside main_code surface correctly instead of defaulting to success Best-effort end-to-end: registration failures fall back to no-streaming and never block the scan. Opt out with --disable-server-log-streaming. Tested against local depscan with the matching /v0/python-cli-runs/* endpoints; 173 unit tests pass. --- socketsecurity/config.py | 14 ++ socketsecurity/core/cli_run.py | 66 ++++++++++ socketsecurity/core/log_uploader.py | 150 +++++++++++++++++++++ socketsecurity/core/streaming.py | 79 ++++++++++++ socketsecurity/socketcli.py | 21 +++ tests/unit/test_cli_run.py | 76 +++++++++++ tests/unit/test_log_uploader.py | 193 ++++++++++++++++++++++++++++ tests/unit/test_streaming.py | 117 +++++++++++++++++ 8 files changed, 716 insertions(+) create mode 100644 socketsecurity/core/cli_run.py create mode 100644 socketsecurity/core/log_uploader.py create mode 100644 socketsecurity/core/streaming.py create mode 100644 tests/unit/test_cli_run.py create mode 100644 tests/unit/test_log_uploader.py create mode 100644 tests/unit/test_streaming.py diff --git a/socketsecurity/config.py b/socketsecurity/config.py index 1d18c6a..c344a12 100644 --- a/socketsecurity/config.py +++ b/socketsecurity/config.py @@ -92,6 +92,7 @@ class CliConfig: ignore_commit_files: bool = False disable_blocking: bool = False disable_ignore: bool = False + disable_server_log_streaming: bool = False strict_blocking: bool = False integration_type: IntegrationType = "api" integration_org_slug: Optional[str] = None @@ -207,6 +208,7 @@ def from_args(cls, args_list: Optional[List[str]] = None) -> 'CliConfig': 'ignore_commit_files': args.ignore_commit_files, 'disable_blocking': args.disable_blocking, 'disable_ignore': args.disable_ignore, + 'disable_server_log_streaming': args.disable_server_log_streaming, 'strict_blocking': args.strict_blocking, 'integration_type': args.integration, 'pending_head': args.pending_head, @@ -716,6 +718,18 @@ def create_argument_parser() -> argparse.ArgumentParser: action="store_true", help=argparse.SUPPRESS ) + advanced_group.add_argument( + "--disable-server-log-streaming", + dest="disable_server_log_streaming", + action="store_true", + help="Disable streaming server-side log lines to the terminal during long-running CLI operations." + ) + advanced_group.add_argument( + "--disable_server_log_streaming", + dest="disable_server_log_streaming", + action="store_true", + help=argparse.SUPPRESS + ) advanced_group.add_argument( "--strict-blocking", dest="strict_blocking", diff --git a/socketsecurity/core/cli_run.py b/socketsecurity/core/cli_run.py new file mode 100644 index 0000000..205e812 --- /dev/null +++ b/socketsecurity/core/cli_run.py @@ -0,0 +1,66 @@ +"""Lifecycle helpers for a CLI run on the Socket backend. + +A "run" represents a single CLI invocation. `register_cli_run` opens it and +returns a server-issued `run_id`; `finalize_cli_run` closes it on exit. The +run_id keys the rows that `BatchedLogUploader` POSTs to +`/python-cli-runs//logs` during the run so the dashboard can show +what the user saw in their terminal. + +Both calls are best-effort: failures fall back to no-streaming and never +prevent the scan from running. +""" + +import json +import logging +from typing import Optional + +from .cli_client import CliClient +from .exceptions import APIFailure + +log = logging.getLogger("socketcli") + + +def register_cli_run( + client: CliClient, + client_version: str, + integration: Optional[str] = None, +) -> Optional[str]: + payload = {"client_version": client_version} + if integration: + payload["integration"] = integration + try: + resp = client.request( + path="python-cli-runs", + method="POST", + payload=json.dumps(payload), + ) + except APIFailure as e: + log.debug(f"cli-run register failed (streaming disabled): {e}") + return None + + try: + body = resp.json() + except (ValueError, json.JSONDecodeError) as e: + log.debug(f"cli-run register: bad JSON body: {e}") + return None + + run_id = body.get("run_id") + if not isinstance(run_id, str) or not run_id: + log.debug(f"cli-run register: missing run_id in response: {body!r}") + return None + return run_id + + +def finalize_cli_run( + client: CliClient, + run_id: str, + status: str = "success", +) -> None: + try: + client.request( + path=f"python-cli-runs/{run_id}/finalize", + method="POST", + payload=json.dumps({"status": status}), + ) + except Exception as e: + log.debug(f"cli-run finalize failed (swallowed): {e}") diff --git a/socketsecurity/core/log_uploader.py b/socketsecurity/core/log_uploader.py new file mode 100644 index 0000000..cbc2c4c --- /dev/null +++ b/socketsecurity/core/log_uploader.py @@ -0,0 +1,150 @@ +"""Buffer the CLI's local log records and POST them in batches to +/python-cli-runs//logs so the dashboard's view of a CLI run +mirrors what the user sees in their terminal. + +Behavior: +- daemon thread, 5s flush +- swallow all network errors (debug log only) +- skip empty buffers +- drain on shutdown +- at-most-once semantics (failed batches dropped, not retried) + +A thread-local recursion guard prevents the uploader's own request-error +log lines (emitted by `cli_client.py`'s `socketdev` logger) from being +re-enqueued during a flush. +""" + +import json +import logging +import threading +from datetime import datetime, timezone +from typing import Optional + +from .cli_client import CliClient + +log = logging.getLogger(__name__) + +_FLUSH_GUARD = threading.local() + +_MAX_BATCH_BYTES = 256 * 1024 - 1024 # depscan body cap is 256KB; reserve headroom for envelope/headers + +_LEVEL_MAP = { + logging.DEBUG: "DEBUG", + logging.INFO: "INFO", + logging.WARNING: "WARN", + logging.ERROR: "ERROR", + logging.CRITICAL: "ERROR", +} + + +def _now_str() -> str: + return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + + +class BatchedLogUploader: + def __init__( + self, + client: CliClient, + run_id: str, + flush_interval: float = 5.0, + ): + self._client = client + self._run_id = run_id + self._flush_interval = flush_interval + self._buf: list = [] + self._lock = threading.Lock() + self._stop = threading.Event() + self._thread: Optional[threading.Thread] = None + + def add(self, entry: dict) -> None: + with self._lock: + self._buf.append(entry) + + def start(self) -> None: + if self._thread is not None: + return + self._thread = threading.Thread( + target=self._run, + name=f"socket-log-uploader-{self._run_id[:8]}", + daemon=True, + ) + self._thread.start() + + def stop(self, timeout: float = 2.0) -> None: + if self._thread is None: + self._flush() + return + self._stop.set() + self._thread.join(timeout=timeout) + self._thread = None + self._flush() + + def _run(self) -> None: + while not self._stop.is_set(): + self._flush() + self._stop.wait(self._flush_interval) + + def _flush(self) -> None: + with self._lock: + if not self._buf: + return + entries = self._buf + self._buf = [] + + _FLUSH_GUARD.active = True + try: + for chunk in _chunk_by_size(entries): + try: + self._client.request( + path=f"python-cli-runs/{self._run_id}/logs", + method="POST", + payload=json.dumps({"logs": chunk}), + ) + except Exception as e: + log.debug(f"log upload failed (swallowed, {len(chunk)} entries dropped): {e}") + finally: + _FLUSH_GUARD.active = False + + +def _chunk_by_size(entries: list) -> list: + """Split entries into chunks that each serialize to <= _MAX_BATCH_BYTES. + Single entries that exceed the cap are dropped with a debug log.""" + chunks: list = [] + current: list = [] + envelope = len('{"logs":[]}') + current_size = envelope + for entry in entries: + entry_size = len(json.dumps(entry)) + 1 # +1 for inter-entry comma + if entry_size + envelope > _MAX_BATCH_BYTES: + log.debug(f"log entry too large ({entry_size}B), dropped") + continue + if current and current_size + entry_size > _MAX_BATCH_BYTES: + chunks.append(current) + current = [entry] + current_size = envelope + entry_size + else: + current.append(entry) + current_size += entry_size + if current: + chunks.append(current) + return chunks + + +class UploadingLogHandler(logging.Handler): + def __init__(self, uploader: BatchedLogUploader, context: str = "socket-python-cli"): + super().__init__() + self._uploader = uploader + self._context = context + + def emit(self, record: logging.LogRecord) -> None: + if getattr(_FLUSH_GUARD, "active", False): + return + try: + self._uploader.add({ + "timestamp": _now_str(), + "level": _LEVEL_MAP.get(record.levelno, "INFO"), + "message": self.format(record), + "context": self._context, + }) + except Exception: + self.handleError(record) diff --git a/socketsecurity/core/streaming.py b/socketsecurity/core/streaming.py new file mode 100644 index 0000000..cd6ede9 --- /dev/null +++ b/socketsecurity/core/streaming.py @@ -0,0 +1,79 @@ +"""Wire the server log streaming pipeline for one CLI run. + +`setup_streaming` registers the run with the backend, attaches handlers that +route the CLI's own log output through both the local terminal and a batched +uploader, and forces the loggers into DEBUG so the upload captures everything +regardless of local terminal verbosity. + +Returns a teardown callable to invoke on exit (typically via `atexit.register`). +Returns None if registration failed; in that case nothing was wired up. +""" + +import logging +from typing import Callable, Optional + +from .cli_client import CliClient +from .cli_run import finalize_cli_run, register_cli_run +from .log_uploader import BatchedLogUploader, UploadingLogHandler + +_run_status: str = "success" + + +def set_run_status(status: str) -> None: + global _run_status + _run_status = status + + +def setup_streaming( + *, + client: CliClient, + cli_logger: logging.Logger, + sdk_logger: logging.Logger, + client_version: str, + integration: Optional[str], + enable_debug: bool, +) -> Optional[Callable[[], None]]: + run_id = register_cli_run( + client, + client_version=client_version, + integration=integration, + ) + if not run_id: + cli_logger.debug("server log streaming disabled (register failed)") + return None + + log_uploader = BatchedLogUploader(client, run_id) + log_uploader.start() + upload_handler = UploadingLogHandler(log_uploader, context="socket-python-cli") + upload_handler.setFormatter(logging.Formatter("%(message)s")) + + terminal_handler = logging.StreamHandler() + terminal_handler.setLevel(logging.DEBUG if enable_debug else logging.INFO) + terminal_handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s")) + + saved_levels = (cli_logger.level, sdk_logger.level) + saved_propagate = (cli_logger.propagate, sdk_logger.propagate) + cli_logger.setLevel(logging.DEBUG) + sdk_logger.setLevel(logging.DEBUG) + cli_logger.propagate = False + sdk_logger.propagate = False + cli_logger.addHandler(terminal_handler) + sdk_logger.addHandler(terminal_handler) + cli_logger.addHandler(upload_handler) + sdk_logger.addHandler(upload_handler) + + cli_logger.debug(f"server log streaming enabled (run_id={run_id})") + + def teardown() -> None: + cli_logger.removeHandler(upload_handler) + sdk_logger.removeHandler(upload_handler) + log_uploader.stop() + finalize_cli_run(client, run_id, status=_run_status) + cli_logger.removeHandler(terminal_handler) + sdk_logger.removeHandler(terminal_handler) + cli_logger.setLevel(saved_levels[0]) + sdk_logger.setLevel(saved_levels[1]) + cli_logger.propagate = saved_propagate[0] + sdk_logger.propagate = saved_propagate[1] + + return teardown diff --git a/socketsecurity/socketcli.py b/socketsecurity/socketcli.py index 1f2b166..ea5e73e 100644 --- a/socketsecurity/socketcli.py +++ b/socketsecurity/socketcli.py @@ -1,3 +1,4 @@ +import atexit import json import os import sys @@ -20,6 +21,7 @@ from socketsecurity.core.messages import Messages from socketsecurity.core.scm_comments import Comments from socketsecurity.core.socket_config import SocketConfig +from socketsecurity.core.streaming import set_run_status, setup_streaming from socketsecurity.output import OutputHandler socket_logger, log = initialize_logging() @@ -30,13 +32,19 @@ def cli(): try: main_code() except KeyboardInterrupt: + set_run_status("cancelled") log.info("Keyboard Interrupt detected, exiting") config = CliConfig.from_args() # Get current config if not config.disable_blocking: sys.exit(2) else: sys.exit(0) + except SystemExit as e: + if e.code: + set_run_status("failure") + raise except Exception as error: + set_run_status("failure") log.error("Unexpected error when running the cli") log.error(error) traceback.print_exc() @@ -89,6 +97,19 @@ def main_code(): client = CliClient(socket_config) sdk.api.api_url = socket_config.api_url log.debug("loaded client") + + if not config.disable_server_log_streaming: + teardown = setup_streaming( + client=client, + cli_logger=log, + sdk_logger=socket_logger, + client_version=config.version, + integration=config.integration_type, + enable_debug=config.enable_debug, + ) + if teardown: + atexit.register(teardown) + core = Core(socket_config, sdk, config) log.debug("loaded core") diff --git a/tests/unit/test_cli_run.py b/tests/unit/test_cli_run.py new file mode 100644 index 0000000..1c171bd --- /dev/null +++ b/tests/unit/test_cli_run.py @@ -0,0 +1,76 @@ +import json +from unittest.mock import Mock + +from socketsecurity.core.cli_client import CliClient +from socketsecurity.core.cli_run import finalize_cli_run, register_cli_run +from socketsecurity.core.exceptions import APIFailure + + +def _resp(payload): + r = Mock() + r.json.return_value = payload + return r + + +def test_register_cli_run_returns_run_id(): + client = Mock(spec=CliClient) + client.request.return_value = _resp({"run_id": "srv-issued-123"}) + + run_id = register_cli_run(client, client_version="1.2.3", integration="github") + + assert run_id == "srv-issued-123" + args, kwargs = client.request.call_args + assert kwargs["path"] == "python-cli-runs" + assert kwargs["method"] == "POST" + body = json.loads(kwargs["payload"]) + assert body == {"client_version": "1.2.3", "integration": "github"} + + +def test_register_cli_run_returns_none_on_api_failure(): + client = Mock(spec=CliClient) + client.request.side_effect = APIFailure("network down") + + assert register_cli_run(client, client_version="1.0.0") is None + + +def test_register_cli_run_returns_none_on_missing_run_id(): + client = Mock(spec=CliClient) + client.request.return_value = _resp({}) + + assert register_cli_run(client, client_version="1.0.0") is None + + +def test_register_cli_run_returns_none_on_bad_json(): + bad = Mock() + bad.json.side_effect = ValueError("not json") + client = Mock(spec=CliClient) + client.request.return_value = bad + + assert register_cli_run(client, client_version="1.0.0") is None + + +def test_register_cli_run_omits_integration_when_falsy(): + client = Mock(spec=CliClient) + client.request.return_value = _resp({"run_id": "x"}) + + register_cli_run(client, client_version="1.0.0", integration=None) + + body = json.loads(client.request.call_args.kwargs["payload"]) + assert body == {"client_version": "1.0.0"} + + +def test_finalize_cli_run_posts_status(): + client = Mock(spec=CliClient) + finalize_cli_run(client, "run-x", status="failure") + + args, kwargs = client.request.call_args + assert kwargs["path"] == "python-cli-runs/run-x/finalize" + assert kwargs["method"] == "POST" + assert json.loads(kwargs["payload"]) == {"status": "failure"} + + +def test_finalize_cli_run_swallows_errors(): + client = Mock(spec=CliClient) + client.request.side_effect = APIFailure("network down") + + finalize_cli_run(client, "run-x") # must not raise diff --git a/tests/unit/test_log_uploader.py b/tests/unit/test_log_uploader.py new file mode 100644 index 0000000..331b885 --- /dev/null +++ b/tests/unit/test_log_uploader.py @@ -0,0 +1,193 @@ +import json +import logging +import time +from unittest.mock import Mock + +import pytest + +from socketsecurity.core.cli_client import CliClient +from socketsecurity.core.exceptions import APIFailure +from socketsecurity.core.log_uploader import ( + _MAX_BATCH_BYTES, + BatchedLogUploader, + UploadingLogHandler, + _chunk_by_size, +) +from socketsecurity.core.socket_config import SocketConfig + + +@pytest.fixture +def config(): + return SocketConfig(api_key="k", timeout=30) + + +def test_add_buffers_until_flush(): + client = Mock(spec=CliClient) + u = BatchedLogUploader(client, "run-x", flush_interval=10) + u.add({"timestamp": "t", "level": "INFO", "message": "a", "context": "c"}) + u.add({"timestamp": "t", "level": "INFO", "message": "b", "context": "c"}) + client.request.assert_not_called() + assert len(u._buf) == 2 + + +def test_flush_posts_batch_and_clears_buffer(): + client = Mock(spec=CliClient) + u = BatchedLogUploader(client, "run-y", flush_interval=10) + u.add({"timestamp": "t", "level": "INFO", "message": "a", "context": "c"}) + u.add({"timestamp": "t", "level": "WARN", "message": "b", "context": "c"}) + + u._flush() + + args, kwargs = client.request.call_args + assert kwargs["path"] == "python-cli-runs/run-y/logs" + assert kwargs["method"] == "POST" + body = json.loads(kwargs["payload"]) + assert len(body["logs"]) == 2 + assert body["logs"][0]["message"] == "a" + assert u._buf == [] + + +def test_flush_skips_empty_buffer(): + client = Mock(spec=CliClient) + u = BatchedLogUploader(client, "run-z", flush_interval=10) + u._flush() + client.request.assert_not_called() + + +def test_flush_swallows_api_failure_and_drops_batch(): + client = Mock(spec=CliClient) + client.request.side_effect = APIFailure("net down") + u = BatchedLogUploader(client, "run-e", flush_interval=10) + u.add({"timestamp": "t", "level": "INFO", "message": "x", "context": "c"}) + + u._flush() # must not raise + assert u._buf == [] # batch is dropped, not retried + + +def test_stop_drains_remaining_buffer(): + client = Mock(spec=CliClient) + u = BatchedLogUploader(client, "run-d", flush_interval=10) + u.start() + u.add({"timestamp": "t", "level": "INFO", "message": "tail", "context": "c"}) + u.stop(timeout=2.0) + + assert client.request.called + body = json.loads(client.request.call_args.kwargs["payload"]) + assert body["logs"][-1]["message"] == "tail" + + +def test_handler_emit_enqueues_record(caplog): + client = Mock(spec=CliClient) + u = BatchedLogUploader(client, "run-h", flush_interval=10) + h = UploadingLogHandler(u) + + rec = logging.LogRecord( + name="socketcli", level=logging.WARNING, pathname=__file__, + lineno=1, msg="watch out", args=(), exc_info=None, + ) + h.emit(rec) + + assert len(u._buf) == 1 + e = u._buf[0] + assert e["level"] == "WARN" + assert e["message"] == "watch out" + assert e["context"] == "socket-python-cli" + + +def test_handler_skips_during_active_flush(): + client = Mock(spec=CliClient) + u = BatchedLogUploader(client, "run-g", flush_interval=10) + h = UploadingLogHandler(u) + + captured = {} + + def fake_request(**kwargs): + rec = logging.LogRecord( + name="socketdev", level=logging.ERROR, pathname=__file__, + lineno=1, msg="recursive!", args=(), exc_info=None, + ) + h.emit(rec) + captured["buf_len_during_flush"] = len(u._buf) + return Mock() + + client.request.side_effect = fake_request + u.add({"timestamp": "t", "level": "INFO", "message": "real", "context": "c"}) + u._flush() + + assert captured["buf_len_during_flush"] == 0 # recursive emit was skipped + assert u._buf == [] + + +def test_levels_map_correctly(): + client = Mock(spec=CliClient) + u = BatchedLogUploader(client, "run-l", flush_interval=10) + h = UploadingLogHandler(u) + + for py_level, expected in [ + (logging.DEBUG, "DEBUG"), + (logging.INFO, "INFO"), + (logging.WARNING, "WARN"), + (logging.ERROR, "ERROR"), + (logging.CRITICAL, "ERROR"), + ]: + rec = logging.LogRecord( + name="t", level=py_level, pathname=__file__, + lineno=1, msg="m", args=(), exc_info=None, + ) + h.emit(rec) + + levels = [e["level"] for e in u._buf] + assert levels == ["DEBUG", "INFO", "WARN", "ERROR", "ERROR"] + + +def test_chunk_by_size_keeps_small_batches_intact(): + entries = [{"timestamp": "t", "level": "INFO", "message": "x", "context": "c"}] * 5 + chunks = _chunk_by_size(entries) + assert len(chunks) == 1 + assert chunks[0] == entries + + +def test_chunk_by_size_splits_when_exceeding_cap(): + big_msg = "y" * 1000 + entries = [ + {"timestamp": "2026-05-07 22:30:00.000", "level": "INFO", + "message": big_msg, "context": "c"} + for _ in range(500) + ] + chunks = _chunk_by_size(entries) + assert len(chunks) >= 2 + for chunk in chunks: + size = len(json.dumps({"logs": chunk})) + assert size <= _MAX_BATCH_BYTES + assert sum(len(c) for c in chunks) == len(entries) + + +def test_chunk_by_size_drops_single_oversize_entry(): + too_big = {"timestamp": "t", "level": "INFO", + "message": "z" * (_MAX_BATCH_BYTES + 100), "context": "c"} + ok = {"timestamp": "t", "level": "INFO", "message": "ok", "context": "c"} + chunks = _chunk_by_size([ok, too_big, ok]) + flat = [e for c in chunks for e in c] + assert flat == [ok, ok] # too_big dropped, smalls preserved + + +def test_flush_chunks_oversize_buffer_into_multiple_posts(): + client = Mock(spec=CliClient) + u = BatchedLogUploader(client, "run-c", flush_interval=10) + big_msg = "y" * 1000 + for _ in range(500): + u.add({"timestamp": "2026-05-07 22:30:00.000", "level": "INFO", + "message": big_msg, "context": "c"}) + + u._flush() + assert client.request.call_count >= 2 # split into multiple POSTs + + +def test_run_thread_flushes_periodically_then_exits(): + client = Mock(spec=CliClient) + u = BatchedLogUploader(client, "run-t", flush_interval=0.05) + u.add({"timestamp": "t", "level": "INFO", "message": "first", "context": "c"}) + u.start() + time.sleep(0.2) # allow at least one flush tick + u.stop(timeout=1.0) + assert client.request.called diff --git a/tests/unit/test_streaming.py b/tests/unit/test_streaming.py new file mode 100644 index 0000000..c20334d --- /dev/null +++ b/tests/unit/test_streaming.py @@ -0,0 +1,117 @@ +import logging +from unittest.mock import patch + +import pytest + +import socketsecurity.core.streaming as streaming_mod +from socketsecurity.core.streaming import set_run_status, setup_streaming + + +@pytest.fixture(autouse=True) +def reset_run_status(): + streaming_mod._run_status = "success" + yield + streaming_mod._run_status = "success" + + +def test_setup_streaming_returns_none_when_register_fails(): + with patch("socketsecurity.core.streaming.register_cli_run", return_value=None): + teardown = setup_streaming( + client=object(), + cli_logger=logging.getLogger("t-fail"), + sdk_logger=logging.getLogger("t-fail-sdk"), + client_version="1.0", + integration=None, + enable_debug=False, + ) + assert teardown is None + + +def test_teardown_finalizes_with_current_run_status(): + cli_logger = logging.getLogger("t-finalize-cli") + sdk_logger = logging.getLogger("t-finalize-sdk") + + finalize_calls = [] + + def fake_finalize(client, run_id, status="success"): + finalize_calls.append(status) + + with patch("socketsecurity.core.streaming.register_cli_run", return_value="run-1"), \ + patch("socketsecurity.core.streaming.finalize_cli_run", side_effect=fake_finalize), \ + patch.object(streaming_mod.BatchedLogUploader, "start"), \ + patch.object(streaming_mod.BatchedLogUploader, "stop"): + teardown = setup_streaming( + client=object(), + cli_logger=cli_logger, + sdk_logger=sdk_logger, + client_version="1.0", + integration=None, + enable_debug=False, + ) + assert teardown is not None + + set_run_status("failure") + teardown() + + assert finalize_calls == ["failure"] + + +def test_set_run_status_default_is_success(): + cli_logger = logging.getLogger("t-default-cli") + sdk_logger = logging.getLogger("t-default-sdk") + + finalize_calls = [] + + def fake_finalize(client, run_id, status="success"): + finalize_calls.append(status) + + with patch("socketsecurity.core.streaming.register_cli_run", return_value="run-2"), \ + patch("socketsecurity.core.streaming.finalize_cli_run", side_effect=fake_finalize), \ + patch.object(streaming_mod.BatchedLogUploader, "start"), \ + patch.object(streaming_mod.BatchedLogUploader, "stop"): + teardown = setup_streaming( + client=object(), + cli_logger=cli_logger, + sdk_logger=sdk_logger, + client_version="1.0", + integration=None, + enable_debug=False, + ) + teardown() + + assert finalize_calls == ["success"] + + +def test_setup_streaming_restores_logger_state_on_teardown(): + cli_logger = logging.getLogger("t-restore-cli") + sdk_logger = logging.getLogger("t-restore-sdk") + cli_logger.setLevel(logging.WARNING) + sdk_logger.setLevel(logging.ERROR) + cli_logger.propagate = True + sdk_logger.propagate = True + handler_count_before = (len(cli_logger.handlers), len(sdk_logger.handlers)) + + with patch("socketsecurity.core.streaming.register_cli_run", return_value="run-3"), \ + patch("socketsecurity.core.streaming.finalize_cli_run"), \ + patch.object(streaming_mod.BatchedLogUploader, "start"), \ + patch.object(streaming_mod.BatchedLogUploader, "stop"): + teardown = setup_streaming( + client=object(), + cli_logger=cli_logger, + sdk_logger=sdk_logger, + client_version="1.0", + integration=None, + enable_debug=False, + ) + # During streaming: levels and propagate are forced + assert cli_logger.level == logging.DEBUG + assert sdk_logger.level == logging.DEBUG + assert cli_logger.propagate is False + assert sdk_logger.propagate is False + teardown() + + assert cli_logger.level == logging.WARNING + assert sdk_logger.level == logging.ERROR + assert cli_logger.propagate is True + assert sdk_logger.propagate is True + assert (len(cli_logger.handlers), len(sdk_logger.handlers)) == handler_count_before From b07e6213df95455ce058528e5fa2971c2b5e00ec Mon Sep 17 00:00:00 2001 From: barslev Date: Thu, 14 May 2026 16:53:55 +0200 Subject: [PATCH 02/12] chore: drop per-batch size chunking to match upstream uploader MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The 256 KB ceiling I added speculatively when the server cap was 256 KB no longer matches the reference implementation we're mirroring, which sends each flush as a single POST regardless of size. With the server cap now well above any plausible single-flush volume, chunking is unnecessary and divergent — drop it. Removes _chunk_by_size, _MAX_BATCH_BYTES, and the four chunking tests. _flush now POSTs the entire buffered batch as one request. --- socketsecurity/core/log_uploader.py | 44 +++++----------------------- tests/unit/test_log_uploader.py | 45 ----------------------------- 2 files changed, 8 insertions(+), 81 deletions(-) diff --git a/socketsecurity/core/log_uploader.py b/socketsecurity/core/log_uploader.py index cbc2c4c..8c4c6f3 100644 --- a/socketsecurity/core/log_uploader.py +++ b/socketsecurity/core/log_uploader.py @@ -26,8 +26,6 @@ _FLUSH_GUARD = threading.local() -_MAX_BATCH_BYTES = 256 * 1024 - 1024 # depscan body cap is 256KB; reserve headroom for envelope/headers - _LEVEL_MAP = { logging.DEBUG: "DEBUG", logging.INFO: "INFO", @@ -88,48 +86,22 @@ def _flush(self) -> None: with self._lock: if not self._buf: return - entries = self._buf + batch = self._buf self._buf = [] _FLUSH_GUARD.active = True try: - for chunk in _chunk_by_size(entries): - try: - self._client.request( - path=f"python-cli-runs/{self._run_id}/logs", - method="POST", - payload=json.dumps({"logs": chunk}), - ) - except Exception as e: - log.debug(f"log upload failed (swallowed, {len(chunk)} entries dropped): {e}") + self._client.request( + path=f"python-cli-runs/{self._run_id}/logs", + method="POST", + payload=json.dumps({"logs": batch}), + ) + except Exception as e: + log.debug(f"log upload failed (swallowed, {len(batch)} entries dropped): {e}") finally: _FLUSH_GUARD.active = False -def _chunk_by_size(entries: list) -> list: - """Split entries into chunks that each serialize to <= _MAX_BATCH_BYTES. - Single entries that exceed the cap are dropped with a debug log.""" - chunks: list = [] - current: list = [] - envelope = len('{"logs":[]}') - current_size = envelope - for entry in entries: - entry_size = len(json.dumps(entry)) + 1 # +1 for inter-entry comma - if entry_size + envelope > _MAX_BATCH_BYTES: - log.debug(f"log entry too large ({entry_size}B), dropped") - continue - if current and current_size + entry_size > _MAX_BATCH_BYTES: - chunks.append(current) - current = [entry] - current_size = envelope + entry_size - else: - current.append(entry) - current_size += entry_size - if current: - chunks.append(current) - return chunks - - class UploadingLogHandler(logging.Handler): def __init__(self, uploader: BatchedLogUploader, context: str = "socket-python-cli"): super().__init__() diff --git a/tests/unit/test_log_uploader.py b/tests/unit/test_log_uploader.py index 331b885..5555454 100644 --- a/tests/unit/test_log_uploader.py +++ b/tests/unit/test_log_uploader.py @@ -8,10 +8,8 @@ from socketsecurity.core.cli_client import CliClient from socketsecurity.core.exceptions import APIFailure from socketsecurity.core.log_uploader import ( - _MAX_BATCH_BYTES, BatchedLogUploader, UploadingLogHandler, - _chunk_by_size, ) from socketsecurity.core.socket_config import SocketConfig @@ -140,49 +138,6 @@ def test_levels_map_correctly(): assert levels == ["DEBUG", "INFO", "WARN", "ERROR", "ERROR"] -def test_chunk_by_size_keeps_small_batches_intact(): - entries = [{"timestamp": "t", "level": "INFO", "message": "x", "context": "c"}] * 5 - chunks = _chunk_by_size(entries) - assert len(chunks) == 1 - assert chunks[0] == entries - - -def test_chunk_by_size_splits_when_exceeding_cap(): - big_msg = "y" * 1000 - entries = [ - {"timestamp": "2026-05-07 22:30:00.000", "level": "INFO", - "message": big_msg, "context": "c"} - for _ in range(500) - ] - chunks = _chunk_by_size(entries) - assert len(chunks) >= 2 - for chunk in chunks: - size = len(json.dumps({"logs": chunk})) - assert size <= _MAX_BATCH_BYTES - assert sum(len(c) for c in chunks) == len(entries) - - -def test_chunk_by_size_drops_single_oversize_entry(): - too_big = {"timestamp": "t", "level": "INFO", - "message": "z" * (_MAX_BATCH_BYTES + 100), "context": "c"} - ok = {"timestamp": "t", "level": "INFO", "message": "ok", "context": "c"} - chunks = _chunk_by_size([ok, too_big, ok]) - flat = [e for c in chunks for e in c] - assert flat == [ok, ok] # too_big dropped, smalls preserved - - -def test_flush_chunks_oversize_buffer_into_multiple_posts(): - client = Mock(spec=CliClient) - u = BatchedLogUploader(client, "run-c", flush_interval=10) - big_msg = "y" * 1000 - for _ in range(500): - u.add({"timestamp": "2026-05-07 22:30:00.000", "level": "INFO", - "message": big_msg, "context": "c"}) - - u._flush() - assert client.request.call_count >= 2 # split into multiple POSTs - - def test_run_thread_flushes_periodically_then_exits(): client = Mock(spec=CliClient) u = BatchedLogUploader(client, "run-t", flush_interval=0.05) From 03b61266b9a601c2c771c505731e6f91abafd174 Mon Sep 17 00:00:00 2001 From: barslev Date: Fri, 15 May 2026 12:19:24 +0200 Subject: [PATCH 03/12] chore: drop integration field from cli-run register payload The server-side handler now rejects unknown fields and the integration column has been removed from the schema (it was plumbed end-to-end but never displayed, filtered, or grouped on). Stop sending it. Removes the integration parameter from register_cli_run and setup_streaming, drops the corresponding wiring in socketcli.py, and prunes the now-pointless test_register_cli_run_omits_integration_when_falsy case. --- socketsecurity/core/cli_run.py | 6 +----- socketsecurity/core/streaming.py | 2 -- socketsecurity/socketcli.py | 1 - tests/unit/test_cli_run.py | 14 ++------------ tests/unit/test_streaming.py | 4 ---- 5 files changed, 3 insertions(+), 24 deletions(-) diff --git a/socketsecurity/core/cli_run.py b/socketsecurity/core/cli_run.py index 205e812..e3fd428 100644 --- a/socketsecurity/core/cli_run.py +++ b/socketsecurity/core/cli_run.py @@ -23,16 +23,12 @@ def register_cli_run( client: CliClient, client_version: str, - integration: Optional[str] = None, ) -> Optional[str]: - payload = {"client_version": client_version} - if integration: - payload["integration"] = integration try: resp = client.request( path="python-cli-runs", method="POST", - payload=json.dumps(payload), + payload=json.dumps({"client_version": client_version}), ) except APIFailure as e: log.debug(f"cli-run register failed (streaming disabled): {e}") diff --git a/socketsecurity/core/streaming.py b/socketsecurity/core/streaming.py index cd6ede9..13a4b2f 100644 --- a/socketsecurity/core/streaming.py +++ b/socketsecurity/core/streaming.py @@ -30,13 +30,11 @@ def setup_streaming( cli_logger: logging.Logger, sdk_logger: logging.Logger, client_version: str, - integration: Optional[str], enable_debug: bool, ) -> Optional[Callable[[], None]]: run_id = register_cli_run( client, client_version=client_version, - integration=integration, ) if not run_id: cli_logger.debug("server log streaming disabled (register failed)") diff --git a/socketsecurity/socketcli.py b/socketsecurity/socketcli.py index ea5e73e..a162510 100644 --- a/socketsecurity/socketcli.py +++ b/socketsecurity/socketcli.py @@ -104,7 +104,6 @@ def main_code(): cli_logger=log, sdk_logger=socket_logger, client_version=config.version, - integration=config.integration_type, enable_debug=config.enable_debug, ) if teardown: diff --git a/tests/unit/test_cli_run.py b/tests/unit/test_cli_run.py index 1c171bd..0861ad5 100644 --- a/tests/unit/test_cli_run.py +++ b/tests/unit/test_cli_run.py @@ -16,14 +16,14 @@ def test_register_cli_run_returns_run_id(): client = Mock(spec=CliClient) client.request.return_value = _resp({"run_id": "srv-issued-123"}) - run_id = register_cli_run(client, client_version="1.2.3", integration="github") + run_id = register_cli_run(client, client_version="1.2.3") assert run_id == "srv-issued-123" args, kwargs = client.request.call_args assert kwargs["path"] == "python-cli-runs" assert kwargs["method"] == "POST" body = json.loads(kwargs["payload"]) - assert body == {"client_version": "1.2.3", "integration": "github"} + assert body == {"client_version": "1.2.3"} def test_register_cli_run_returns_none_on_api_failure(): @@ -49,16 +49,6 @@ def test_register_cli_run_returns_none_on_bad_json(): assert register_cli_run(client, client_version="1.0.0") is None -def test_register_cli_run_omits_integration_when_falsy(): - client = Mock(spec=CliClient) - client.request.return_value = _resp({"run_id": "x"}) - - register_cli_run(client, client_version="1.0.0", integration=None) - - body = json.loads(client.request.call_args.kwargs["payload"]) - assert body == {"client_version": "1.0.0"} - - def test_finalize_cli_run_posts_status(): client = Mock(spec=CliClient) finalize_cli_run(client, "run-x", status="failure") diff --git a/tests/unit/test_streaming.py b/tests/unit/test_streaming.py index c20334d..967e60d 100644 --- a/tests/unit/test_streaming.py +++ b/tests/unit/test_streaming.py @@ -21,7 +21,6 @@ def test_setup_streaming_returns_none_when_register_fails(): cli_logger=logging.getLogger("t-fail"), sdk_logger=logging.getLogger("t-fail-sdk"), client_version="1.0", - integration=None, enable_debug=False, ) assert teardown is None @@ -45,7 +44,6 @@ def fake_finalize(client, run_id, status="success"): cli_logger=cli_logger, sdk_logger=sdk_logger, client_version="1.0", - integration=None, enable_debug=False, ) assert teardown is not None @@ -74,7 +72,6 @@ def fake_finalize(client, run_id, status="success"): cli_logger=cli_logger, sdk_logger=sdk_logger, client_version="1.0", - integration=None, enable_debug=False, ) teardown() @@ -100,7 +97,6 @@ def test_setup_streaming_restores_logger_state_on_teardown(): cli_logger=cli_logger, sdk_logger=sdk_logger, client_version="1.0", - integration=None, enable_debug=False, ) # During streaming: levels and propagate are forced From d18795dab70c2625b2aaa7822063d58fd42d5482 Mon Sep 17 00:00:00 2001 From: barslev Date: Fri, 15 May 2026 13:15:00 +0200 Subject: [PATCH 04/12] feat: link cli-run to its full_scan via report_run_id on finalize MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The depscan side now joins cli_run → full_scans → repositories via the report_run_id field to surface the scanned repo in the admin dashboard view of each CLI run. Wire the CLI to send the full_scan_id (== the report_run_id depscan expects) when it has one. - finalize_cli_run accepts an optional report_run_id and includes it (nullable) in the POST body. - streaming.py adds a module-level _report_run_id holder and a set_report_run_id() setter; teardown passes it through to finalize. - socketcli.py captures diff.id at a single chokepoint after the diff-producing branches converge, guarded against the NO_DIFF_RAN / NO_SCAN_RAN sentinel values. The field is nullable end-to-end so CLI invocations that fail before producing a diff (or are run in modes that don't create one) still finalize cleanly. --- socketsecurity/core/cli_run.py | 3 ++- socketsecurity/core/streaming.py | 8 +++++++- socketsecurity/socketcli.py | 5 ++++- tests/unit/test_cli_run.py | 12 ++++++++++-- tests/unit/test_streaming.py | 23 +++++++++++++++-------- 5 files changed, 38 insertions(+), 13 deletions(-) diff --git a/socketsecurity/core/cli_run.py b/socketsecurity/core/cli_run.py index e3fd428..651ec04 100644 --- a/socketsecurity/core/cli_run.py +++ b/socketsecurity/core/cli_run.py @@ -51,12 +51,13 @@ def finalize_cli_run( client: CliClient, run_id: str, status: str = "success", + report_run_id: Optional[str] = None, ) -> None: try: client.request( path=f"python-cli-runs/{run_id}/finalize", method="POST", - payload=json.dumps({"status": status}), + payload=json.dumps({"status": status, "report_run_id": report_run_id}), ) except Exception as e: log.debug(f"cli-run finalize failed (swallowed): {e}") diff --git a/socketsecurity/core/streaming.py b/socketsecurity/core/streaming.py index 13a4b2f..aeefed5 100644 --- a/socketsecurity/core/streaming.py +++ b/socketsecurity/core/streaming.py @@ -17,6 +17,7 @@ from .log_uploader import BatchedLogUploader, UploadingLogHandler _run_status: str = "success" +_report_run_id: Optional[str] = None def set_run_status(status: str) -> None: @@ -24,6 +25,11 @@ def set_run_status(status: str) -> None: _run_status = status +def set_report_run_id(report_run_id: Optional[str]) -> None: + global _report_run_id + _report_run_id = report_run_id + + def setup_streaming( *, client: CliClient, @@ -66,7 +72,7 @@ def teardown() -> None: cli_logger.removeHandler(upload_handler) sdk_logger.removeHandler(upload_handler) log_uploader.stop() - finalize_cli_run(client, run_id, status=_run_status) + finalize_cli_run(client, run_id, status=_run_status, report_run_id=_report_run_id) cli_logger.removeHandler(terminal_handler) sdk_logger.removeHandler(terminal_handler) cli_logger.setLevel(saved_levels[0]) diff --git a/socketsecurity/socketcli.py b/socketsecurity/socketcli.py index a162510..f3b4d7c 100644 --- a/socketsecurity/socketcli.py +++ b/socketsecurity/socketcli.py @@ -21,7 +21,7 @@ from socketsecurity.core.messages import Messages from socketsecurity.core.scm_comments import Comments from socketsecurity.core.socket_config import SocketConfig -from socketsecurity.core.streaming import set_run_status, setup_streaming +from socketsecurity.core.streaming import set_report_run_id, set_run_status, setup_streaming from socketsecurity.output import OutputHandler socket_logger, log = initialize_logging() @@ -761,6 +761,9 @@ def _is_unprocessed(c): ) output_handler.handle_output(diff) + if diff.id not in ("NO_DIFF_RAN", "NO_SCAN_RAN"): + set_report_run_id(diff.id) + # Handle license generation if not should_skip_scan and diff.id != "NO_DIFF_RAN" and diff.id != "NO_SCAN_RAN" and config.generate_license: all_packages = {} diff --git a/tests/unit/test_cli_run.py b/tests/unit/test_cli_run.py index 0861ad5..749acfd 100644 --- a/tests/unit/test_cli_run.py +++ b/tests/unit/test_cli_run.py @@ -49,14 +49,22 @@ def test_register_cli_run_returns_none_on_bad_json(): assert register_cli_run(client, client_version="1.0.0") is None -def test_finalize_cli_run_posts_status(): +def test_finalize_cli_run_posts_status_and_null_report_run_id_by_default(): client = Mock(spec=CliClient) finalize_cli_run(client, "run-x", status="failure") args, kwargs = client.request.call_args assert kwargs["path"] == "python-cli-runs/run-x/finalize" assert kwargs["method"] == "POST" - assert json.loads(kwargs["payload"]) == {"status": "failure"} + assert json.loads(kwargs["payload"]) == {"status": "failure", "report_run_id": None} + + +def test_finalize_cli_run_includes_report_run_id_when_provided(): + client = Mock(spec=CliClient) + finalize_cli_run(client, "run-x", status="success", report_run_id="fs-abc") + + body = json.loads(client.request.call_args.kwargs["payload"]) + assert body == {"status": "success", "report_run_id": "fs-abc"} def test_finalize_cli_run_swallows_errors(): diff --git a/tests/unit/test_streaming.py b/tests/unit/test_streaming.py index 967e60d..eb14b35 100644 --- a/tests/unit/test_streaming.py +++ b/tests/unit/test_streaming.py @@ -4,14 +4,20 @@ import pytest import socketsecurity.core.streaming as streaming_mod -from socketsecurity.core.streaming import set_run_status, setup_streaming +from socketsecurity.core.streaming import ( + set_report_run_id, + set_run_status, + setup_streaming, +) @pytest.fixture(autouse=True) -def reset_run_status(): +def reset_streaming_state(): streaming_mod._run_status = "success" + streaming_mod._report_run_id = None yield streaming_mod._run_status = "success" + streaming_mod._report_run_id = None def test_setup_streaming_returns_none_when_register_fails(): @@ -32,8 +38,8 @@ def test_teardown_finalizes_with_current_run_status(): finalize_calls = [] - def fake_finalize(client, run_id, status="success"): - finalize_calls.append(status) + def fake_finalize(client, run_id, status="success", report_run_id=None): + finalize_calls.append((status, report_run_id)) with patch("socketsecurity.core.streaming.register_cli_run", return_value="run-1"), \ patch("socketsecurity.core.streaming.finalize_cli_run", side_effect=fake_finalize), \ @@ -49,9 +55,10 @@ def fake_finalize(client, run_id, status="success"): assert teardown is not None set_run_status("failure") + set_report_run_id("fs-xyz") teardown() - assert finalize_calls == ["failure"] + assert finalize_calls == [("failure", "fs-xyz")] def test_set_run_status_default_is_success(): @@ -60,8 +67,8 @@ def test_set_run_status_default_is_success(): finalize_calls = [] - def fake_finalize(client, run_id, status="success"): - finalize_calls.append(status) + def fake_finalize(client, run_id, status="success", report_run_id=None): + finalize_calls.append((status, report_run_id)) with patch("socketsecurity.core.streaming.register_cli_run", return_value="run-2"), \ patch("socketsecurity.core.streaming.finalize_cli_run", side_effect=fake_finalize), \ @@ -76,7 +83,7 @@ def fake_finalize(client, run_id, status="success"): ) teardown() - assert finalize_calls == ["success"] + assert finalize_calls == [("success", None)] def test_setup_streaming_restores_logger_state_on_teardown(): From 0e8746d75aafd527bd2c9b9673b6cab94653a9c3 Mon Sep 17 00:00:00 2001 From: barslev Date: Sat, 16 May 2026 16:55:09 +0200 Subject: [PATCH 05/12] chore: bump version to 2.2.87 for streaming logs feature MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - socketsecurity/__init__.py: __version__ → 2.2.87 - pyproject.toml: version → 2.2.87 - CHANGELOG.md: new 2.2.87 entry describing the streaming-logs feature Required by .github/workflows/version-check.yml, which fails the PR if the version isn't incremented relative to main. --- CHANGELOG.md | 4 ++++ pyproject.toml | 2 +- socketsecurity/__init__.py | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e2ea6c0..1f3dbd4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 2.2.87 + +- Added a streaming log channel between the CLI and the Socket backend. Each CLI invocation now reports a per-run status (`in_progress` / `success` / `failure` / `cancelled`) and uploads a transcript of its own log output, visible in the Socket admin views. The transcript is captured regardless of the local `--enable-debug` state; the existing terminal verbosity is unchanged. The feature is best-effort — registration or upload failures silently degrade and never block the scan. Opt out with `--disable-server-log-streaming`. + ## 2.2.83 - Fixed branch detection in detached-HEAD CI checkouts. When `git name-rev --name-only HEAD` returned an output with a suffix operator (e.g. `remotes/origin/master~1`, `master^0`), the `~N`/`^N` was previously passed through as the branch name and rejected by the Socket API as an invalid Git ref. The suffix is now stripped before the prefix split, producing the bare branch name. diff --git a/pyproject.toml b/pyproject.toml index 49bb294..feaa859 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "hatchling.build" [project] name = "socketsecurity" -version = "2.2.86" +version = "2.2.87" requires-python = ">= 3.11" license = {"file" = "LICENSE"} dependencies = [ diff --git a/socketsecurity/__init__.py b/socketsecurity/__init__.py index c816fab..69d6f7b 100644 --- a/socketsecurity/__init__.py +++ b/socketsecurity/__init__.py @@ -1,3 +1,3 @@ __author__ = 'socket.dev' -__version__ = '2.2.86' +__version__ = '2.2.87' USER_AGENT = f'SocketPythonCLI/{__version__}' From 19216cec87be94561f100422ee097a0552c871c0 Mon Sep 17 00:00:00 2001 From: barslev Date: Wed, 10 Jun 2026 10:43:46 +0200 Subject: [PATCH 06/12] feat: flip streaming logs to opt-in via --upload-logs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Socket backend changed its register contract so that log streaming is now opt-in rather than default-on. The CLI always calls register (cheap, lets the server force-enable for specific orgs) and gates the downstream upload/finalize lifecycle on the response. Wire changes: - POST /v0/python-cli-runs body adds a required `share_logs` field. - Response: { log_streaming_enabled: bool, run_id: }. When log_streaming_enabled is false, run_id is null and the CLI skips the upload + finalize calls entirely. CLI changes: - New `--upload-logs` flag (default off). When set, the CLI sends share_logs=true on register. - Removed `--disable-server-log-streaming` — default is off, so an opt-out flag no longer makes sense. - register_cli_run takes a required share_logs arg and returns None whenever log_streaming_enabled is false (whatever the reason: client opted out, server denied, server unreachable). Bumps version to 2.2.88 and updates the CHANGELOG entry to reflect the opt-in shape. --- CHANGELOG.md | 4 ++-- pyproject.toml | 2 +- socketsecurity/__init__.py | 2 +- socketsecurity/config.py | 16 +++++++------ socketsecurity/core/cli_run.py | 18 ++++++++++---- socketsecurity/core/streaming.py | 4 +++- socketsecurity/socketcli.py | 20 ++++++++-------- tests/unit/test_cli_run.py | 41 +++++++++++++++++++++++++------- tests/unit/test_streaming.py | 4 ++++ 9 files changed, 76 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f3dbd4..30cde68 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,8 @@ # Changelog -## 2.2.87 +## 2.2.88 -- Added a streaming log channel between the CLI and the Socket backend. Each CLI invocation now reports a per-run status (`in_progress` / `success` / `failure` / `cancelled`) and uploads a transcript of its own log output, visible in the Socket admin views. The transcript is captured regardless of the local `--enable-debug` state; the existing terminal verbosity is unchanged. The feature is best-effort — registration or upload failures silently degrade and never block the scan. Opt out with `--disable-server-log-streaming`. +- Added an opt-in streaming log channel between the CLI and the Socket backend. When the new `--upload-logs` flag is set, each CLI invocation registers a run, reports a per-run status (`in_progress` / `success` / `failure` / `cancelled`), and uploads a transcript of its own log output to the Socket backend for that run, visible in the Socket admin views. The transcript is captured regardless of the local `--enable-debug` state; the existing terminal verbosity is unchanged. The Socket backend can also force-enable streaming for specific orgs regardless of the flag. The feature is best-effort — registration or upload failures silently degrade and never block the scan. ## 2.2.83 diff --git a/pyproject.toml b/pyproject.toml index feaa859..50b0518 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "hatchling.build" [project] name = "socketsecurity" -version = "2.2.87" +version = "2.2.88" requires-python = ">= 3.11" license = {"file" = "LICENSE"} dependencies = [ diff --git a/socketsecurity/__init__.py b/socketsecurity/__init__.py index 69d6f7b..a7dcdfb 100644 --- a/socketsecurity/__init__.py +++ b/socketsecurity/__init__.py @@ -1,3 +1,3 @@ __author__ = 'socket.dev' -__version__ = '2.2.87' +__version__ = '2.2.88' USER_AGENT = f'SocketPythonCLI/{__version__}' diff --git a/socketsecurity/config.py b/socketsecurity/config.py index c344a12..e1c0796 100644 --- a/socketsecurity/config.py +++ b/socketsecurity/config.py @@ -92,7 +92,7 @@ class CliConfig: ignore_commit_files: bool = False disable_blocking: bool = False disable_ignore: bool = False - disable_server_log_streaming: bool = False + upload_logs: bool = False strict_blocking: bool = False integration_type: IntegrationType = "api" integration_org_slug: Optional[str] = None @@ -208,7 +208,7 @@ def from_args(cls, args_list: Optional[List[str]] = None) -> 'CliConfig': 'ignore_commit_files': args.ignore_commit_files, 'disable_blocking': args.disable_blocking, 'disable_ignore': args.disable_ignore, - 'disable_server_log_streaming': args.disable_server_log_streaming, + 'upload_logs': args.upload_logs, 'strict_blocking': args.strict_blocking, 'integration_type': args.integration, 'pending_head': args.pending_head, @@ -719,14 +719,16 @@ def create_argument_parser() -> argparse.ArgumentParser: help=argparse.SUPPRESS ) advanced_group.add_argument( - "--disable-server-log-streaming", - dest="disable_server_log_streaming", + "--upload-logs", + dest="upload_logs", action="store_true", - help="Disable streaming server-side log lines to the terminal during long-running CLI operations." + help="Upload the CLI's log output to the Socket backend for this run. " + "When set, the CLI registers the run with share_logs=true and streams " + "its log records in 5s batches. Default off." ) advanced_group.add_argument( - "--disable_server_log_streaming", - dest="disable_server_log_streaming", + "--upload_logs", + dest="upload_logs", action="store_true", help=argparse.SUPPRESS ) diff --git a/socketsecurity/core/cli_run.py b/socketsecurity/core/cli_run.py index 651ec04..126488a 100644 --- a/socketsecurity/core/cli_run.py +++ b/socketsecurity/core/cli_run.py @@ -1,11 +1,16 @@ """Lifecycle helpers for a CLI run on the Socket backend. A "run" represents a single CLI invocation. `register_cli_run` opens it and -returns a server-issued `run_id`; `finalize_cli_run` closes it on exit. The -run_id keys the rows that `BatchedLogUploader` POSTs to +returns a server-issued `run_id` when streaming is enabled; `finalize_cli_run` +closes it on exit. The run_id keys the rows that `BatchedLogUploader` POSTs to `/python-cli-runs//logs` during the run so the dashboard can show what the user saw in their terminal. +Streaming is opt-in via the `share_logs` field on register. The server may +also force-enable streaming for an org regardless of the client's request, +so the CLI always calls register and gates on the response's +`log_streaming_enabled` flag rather than the client's intent. + Both calls are best-effort: failures fall back to no-streaming and never prevent the scan from running. """ @@ -23,12 +28,13 @@ def register_cli_run( client: CliClient, client_version: str, + share_logs: bool, ) -> Optional[str]: try: resp = client.request( path="python-cli-runs", method="POST", - payload=json.dumps({"client_version": client_version}), + payload=json.dumps({"client_version": client_version, "share_logs": share_logs}), ) except APIFailure as e: log.debug(f"cli-run register failed (streaming disabled): {e}") @@ -40,9 +46,13 @@ def register_cli_run( log.debug(f"cli-run register: bad JSON body: {e}") return None + if not body.get("log_streaming_enabled"): + log.debug("cli-run register: log streaming not enabled by server") + return None + run_id = body.get("run_id") if not isinstance(run_id, str) or not run_id: - log.debug(f"cli-run register: missing run_id in response: {body!r}") + log.debug(f"cli-run register: enabled but missing run_id in response: {body!r}") return None return run_id diff --git a/socketsecurity/core/streaming.py b/socketsecurity/core/streaming.py index aeefed5..c292982 100644 --- a/socketsecurity/core/streaming.py +++ b/socketsecurity/core/streaming.py @@ -36,14 +36,16 @@ def setup_streaming( cli_logger: logging.Logger, sdk_logger: logging.Logger, client_version: str, + share_logs: bool, enable_debug: bool, ) -> Optional[Callable[[], None]]: run_id = register_cli_run( client, client_version=client_version, + share_logs=share_logs, ) if not run_id: - cli_logger.debug("server log streaming disabled (register failed)") + cli_logger.debug("server log streaming not active for this run") return None log_uploader = BatchedLogUploader(client, run_id) diff --git a/socketsecurity/socketcli.py b/socketsecurity/socketcli.py index f3b4d7c..ca10b22 100644 --- a/socketsecurity/socketcli.py +++ b/socketsecurity/socketcli.py @@ -98,16 +98,16 @@ def main_code(): sdk.api.api_url = socket_config.api_url log.debug("loaded client") - if not config.disable_server_log_streaming: - teardown = setup_streaming( - client=client, - cli_logger=log, - sdk_logger=socket_logger, - client_version=config.version, - enable_debug=config.enable_debug, - ) - if teardown: - atexit.register(teardown) + teardown = setup_streaming( + client=client, + cli_logger=log, + sdk_logger=socket_logger, + client_version=config.version, + share_logs=config.upload_logs, + enable_debug=config.enable_debug, + ) + if teardown: + atexit.register(teardown) core = Core(socket_config, sdk, config) log.debug("loaded core") diff --git a/tests/unit/test_cli_run.py b/tests/unit/test_cli_run.py index 749acfd..7b53473 100644 --- a/tests/unit/test_cli_run.py +++ b/tests/unit/test_cli_run.py @@ -12,32 +12,55 @@ def _resp(payload): return r -def test_register_cli_run_returns_run_id(): +def test_register_cli_run_returns_run_id_when_enabled(): client = Mock(spec=CliClient) - client.request.return_value = _resp({"run_id": "srv-issued-123"}) + client.request.return_value = _resp({ + "log_streaming_enabled": True, + "run_id": "srv-issued-123", + }) - run_id = register_cli_run(client, client_version="1.2.3") + run_id = register_cli_run(client, client_version="1.2.3", share_logs=True) assert run_id == "srv-issued-123" args, kwargs = client.request.call_args assert kwargs["path"] == "python-cli-runs" assert kwargs["method"] == "POST" body = json.loads(kwargs["payload"]) - assert body == {"client_version": "1.2.3"} + assert body == {"client_version": "1.2.3", "share_logs": True} + + +def test_register_cli_run_returns_none_when_disabled_by_server(): + client = Mock(spec=CliClient) + client.request.return_value = _resp({ + "log_streaming_enabled": False, + "run_id": None, + }) + + assert register_cli_run(client, client_version="1.0.0", share_logs=False) is None + + +def test_register_cli_run_sends_share_logs_false_when_not_opted_in(): + client = Mock(spec=CliClient) + client.request.return_value = _resp({"log_streaming_enabled": False, "run_id": None}) + + register_cli_run(client, client_version="1.0.0", share_logs=False) + + body = json.loads(client.request.call_args.kwargs["payload"]) + assert body == {"client_version": "1.0.0", "share_logs": False} def test_register_cli_run_returns_none_on_api_failure(): client = Mock(spec=CliClient) client.request.side_effect = APIFailure("network down") - assert register_cli_run(client, client_version="1.0.0") is None + assert register_cli_run(client, client_version="1.0.0", share_logs=True) is None -def test_register_cli_run_returns_none_on_missing_run_id(): +def test_register_cli_run_returns_none_on_missing_run_id_when_enabled(): client = Mock(spec=CliClient) - client.request.return_value = _resp({}) + client.request.return_value = _resp({"log_streaming_enabled": True}) - assert register_cli_run(client, client_version="1.0.0") is None + assert register_cli_run(client, client_version="1.0.0", share_logs=True) is None def test_register_cli_run_returns_none_on_bad_json(): @@ -46,7 +69,7 @@ def test_register_cli_run_returns_none_on_bad_json(): client = Mock(spec=CliClient) client.request.return_value = bad - assert register_cli_run(client, client_version="1.0.0") is None + assert register_cli_run(client, client_version="1.0.0", share_logs=True) is None def test_finalize_cli_run_posts_status_and_null_report_run_id_by_default(): diff --git a/tests/unit/test_streaming.py b/tests/unit/test_streaming.py index eb14b35..9cc444c 100644 --- a/tests/unit/test_streaming.py +++ b/tests/unit/test_streaming.py @@ -27,6 +27,7 @@ def test_setup_streaming_returns_none_when_register_fails(): cli_logger=logging.getLogger("t-fail"), sdk_logger=logging.getLogger("t-fail-sdk"), client_version="1.0", + share_logs=True, enable_debug=False, ) assert teardown is None @@ -50,6 +51,7 @@ def fake_finalize(client, run_id, status="success", report_run_id=None): cli_logger=cli_logger, sdk_logger=sdk_logger, client_version="1.0", + share_logs=True, enable_debug=False, ) assert teardown is not None @@ -79,6 +81,7 @@ def fake_finalize(client, run_id, status="success", report_run_id=None): cli_logger=cli_logger, sdk_logger=sdk_logger, client_version="1.0", + share_logs=True, enable_debug=False, ) teardown() @@ -104,6 +107,7 @@ def test_setup_streaming_restores_logger_state_on_teardown(): cli_logger=cli_logger, sdk_logger=sdk_logger, client_version="1.0", + share_logs=True, enable_debug=False, ) # During streaming: levels and propagate are forced From 06982fb64ebcfeebc64e6608532339adce6f19a5 Mon Sep 17 00:00:00 2001 From: barslev Date: Wed, 10 Jun 2026 13:25:24 +0200 Subject: [PATCH 07/12] chore: regenerate uv.lock for version 2.4.8 The version-check workflow added in main now requires uv.lock to be updated whenever pyproject.toml changes, and the SFW smoke jobs run `uv sync --locked`, which fails on an out-of-sync lockfile. --- uv.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uv.lock b/uv.lock index 29cd641..5f6ffdf 100644 --- a/uv.lock +++ b/uv.lock @@ -1283,7 +1283,7 @@ wheels = [ [[package]] name = "socketsecurity" -version = "2.4.7" +version = "2.4.8" source = { editable = "." } dependencies = [ { name = "brotli", marker = "platform_python_implementation == 'CPython'" }, From 3b154de8db80c6a0ba42fe107c5e536298558500 Mon Sep 17 00:00:00 2001 From: barslev Date: Thu, 11 Jun 2026 05:49:10 +0200 Subject: [PATCH 08/12] feat: add --no-upload-logs to explicitly decline log upload Backend now distinguishes "user wants out" from "user said nothing": - `decline_logs: true` (the new flag) overrides every other signal including the server-side org-level override, so users with a legal/consent reason for no upload get a guaranteed off. - `share_logs: true` (the existing --upload-logs) opts in. - Otherwise the server applies its own policy. Argparse enforces that --upload-logs and --no-upload-logs are mutually exclusive (post-parse check via parser.error so dash/underscore aliases on either side still coexist with the same dests). register_cli_run now sends both `share_logs` and `decline_logs` in the payload; setup_streaming forwards both. CHANGELOG 2.4.8 entry updated to call out --no-upload-logs alongside --upload-logs. --- CHANGELOG.md | 4 +++- socketsecurity/config.py | 22 ++++++++++++++++++- socketsecurity/core/cli_run.py | 7 ++++++- socketsecurity/core/streaming.py | 2 ++ socketsecurity/socketcli.py | 1 + tests/unit/test_cli_run.py | 36 +++++++++++++++++++++++++------- tests/unit/test_streaming.py | 4 ++++ 7 files changed, 65 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c02ae8..ae0a7d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,9 @@ ### Added: opt-in streaming log channel via `--upload-logs` -- New `--upload-logs` flag (default off). When set, each CLI invocation registers a run, reports a per-run status (`in_progress` / `success` / `failure` / `cancelled`), and uploads a transcript of its own log output to the Socket backend for that run, visible in the Socket admin views. The transcript is captured regardless of the local `--enable-debug` state; the existing terminal verbosity is unchanged. The Socket backend can also force-enable streaming for specific orgs regardless of the flag. The feature is best-effort — registration or upload failures silently degrade and never block the scan. +- New `--upload-logs` flag (default off). When set, each CLI invocation registers a run, reports a per-run status (`in_progress` / `success` / `failure` / `cancelled`), and uploads a transcript of its own log output to the Socket backend for that run, visible in the Socket admin views. The transcript is captured regardless of the local `--enable-debug` state; the existing terminal verbosity is unchanged. +- New `--no-upload-logs` flag (mutually exclusive with `--upload-logs`) explicitly opts the run out of uploading logs, even when an org-level override would otherwise enable it. Use this when you need a guaranteed no-upload guarantee (e.g. legal/consent reasons). +- The Socket backend can also force-enable streaming for specific orgs in the absence of an explicit opt-out. The feature is best-effort — registration or upload failures silently degrade and never block the scan. ## 2.4.7 diff --git a/socketsecurity/config.py b/socketsecurity/config.py index ae6c798..640e1c9 100644 --- a/socketsecurity/config.py +++ b/socketsecurity/config.py @@ -140,6 +140,7 @@ class CliConfig: disable_blocking: bool = False disable_ignore: bool = False upload_logs: bool = False + decline_logs: bool = False strict_blocking: bool = False integration_type: IntegrationType = "api" integration_org_slug: Optional[str] = None @@ -213,6 +214,9 @@ def from_args(cls, args_list: Optional[List[str]] = None) -> 'CliConfig': args = parser.parse_args(args_list) + if args.upload_logs and args.decline_logs: + parser.error("--upload-logs and --no-upload-logs are mutually exclusive") + if args.reach_exclude_paths: logging.warning( "--reach-exclude-paths is deprecated; use --exclude-paths instead. " @@ -284,6 +288,7 @@ def from_args(cls, args_list: Optional[List[str]] = None) -> 'CliConfig': 'disable_blocking': args.disable_blocking, 'disable_ignore': args.disable_ignore, 'upload_logs': args.upload_logs, + 'decline_logs': args.decline_logs, 'strict_blocking': args.strict_blocking, 'integration_type': args.integration, 'pending_head': args.pending_head, @@ -874,7 +879,8 @@ def create_argument_parser() -> argparse.ArgumentParser: action="store_true", help="Upload the CLI's log output to the Socket backend for this run. " "When set, the CLI registers the run with share_logs=true and streams " - "its log records in 5s batches. Default off." + "its log records in 5s batches. Default off. Mutually exclusive with " + "--no-upload-logs." ) advanced_group.add_argument( "--upload_logs", @@ -882,6 +888,20 @@ def create_argument_parser() -> argparse.ArgumentParser: action="store_true", help=argparse.SUPPRESS ) + advanced_group.add_argument( + "--no-upload-logs", + dest="decline_logs", + action="store_true", + help="Explicitly opt out of uploading CLI logs to the Socket backend, even " + "when an org-level override would otherwise enable it. Mutually " + "exclusive with --upload-logs." + ) + advanced_group.add_argument( + "--no_upload_logs", + dest="decline_logs", + action="store_true", + help=argparse.SUPPRESS + ) advanced_group.add_argument( "--strict-blocking", dest="strict_blocking", diff --git a/socketsecurity/core/cli_run.py b/socketsecurity/core/cli_run.py index 126488a..e413c4d 100644 --- a/socketsecurity/core/cli_run.py +++ b/socketsecurity/core/cli_run.py @@ -29,12 +29,17 @@ def register_cli_run( client: CliClient, client_version: str, share_logs: bool, + decline_logs: bool, ) -> Optional[str]: try: resp = client.request( path="python-cli-runs", method="POST", - payload=json.dumps({"client_version": client_version, "share_logs": share_logs}), + payload=json.dumps({ + "client_version": client_version, + "share_logs": share_logs, + "decline_logs": decline_logs, + }), ) except APIFailure as e: log.debug(f"cli-run register failed (streaming disabled): {e}") diff --git a/socketsecurity/core/streaming.py b/socketsecurity/core/streaming.py index c292982..b21d728 100644 --- a/socketsecurity/core/streaming.py +++ b/socketsecurity/core/streaming.py @@ -37,12 +37,14 @@ def setup_streaming( sdk_logger: logging.Logger, client_version: str, share_logs: bool, + decline_logs: bool, enable_debug: bool, ) -> Optional[Callable[[], None]]: run_id = register_cli_run( client, client_version=client_version, share_logs=share_logs, + decline_logs=decline_logs, ) if not run_id: cli_logger.debug("server log streaming not active for this run") diff --git a/socketsecurity/socketcli.py b/socketsecurity/socketcli.py index 0039a04..5dca2f3 100644 --- a/socketsecurity/socketcli.py +++ b/socketsecurity/socketcli.py @@ -198,6 +198,7 @@ def main_code(): sdk_logger=socket_logger, client_version=config.version, share_logs=config.upload_logs, + decline_logs=config.decline_logs, enable_debug=config.enable_debug, ) if teardown: diff --git a/tests/unit/test_cli_run.py b/tests/unit/test_cli_run.py index 7b53473..173009a 100644 --- a/tests/unit/test_cli_run.py +++ b/tests/unit/test_cli_run.py @@ -19,14 +19,16 @@ def test_register_cli_run_returns_run_id_when_enabled(): "run_id": "srv-issued-123", }) - run_id = register_cli_run(client, client_version="1.2.3", share_logs=True) + run_id = register_cli_run( + client, client_version="1.2.3", share_logs=True, decline_logs=False + ) assert run_id == "srv-issued-123" args, kwargs = client.request.call_args assert kwargs["path"] == "python-cli-runs" assert kwargs["method"] == "POST" body = json.loads(kwargs["payload"]) - assert body == {"client_version": "1.2.3", "share_logs": True} + assert body == {"client_version": "1.2.3", "share_logs": True, "decline_logs": False} def test_register_cli_run_returns_none_when_disabled_by_server(): @@ -36,31 +38,47 @@ def test_register_cli_run_returns_none_when_disabled_by_server(): "run_id": None, }) - assert register_cli_run(client, client_version="1.0.0", share_logs=False) is None + assert register_cli_run( + client, client_version="1.0.0", share_logs=False, decline_logs=False + ) is None def test_register_cli_run_sends_share_logs_false_when_not_opted_in(): client = Mock(spec=CliClient) client.request.return_value = _resp({"log_streaming_enabled": False, "run_id": None}) - register_cli_run(client, client_version="1.0.0", share_logs=False) + register_cli_run(client, client_version="1.0.0", share_logs=False, decline_logs=False) body = json.loads(client.request.call_args.kwargs["payload"]) - assert body == {"client_version": "1.0.0", "share_logs": False} + assert body == {"client_version": "1.0.0", "share_logs": False, "decline_logs": False} + + +def test_register_cli_run_sends_decline_logs_true_when_opted_out(): + client = Mock(spec=CliClient) + client.request.return_value = _resp({"log_streaming_enabled": False, "run_id": None}) + + register_cli_run(client, client_version="1.0.0", share_logs=False, decline_logs=True) + + body = json.loads(client.request.call_args.kwargs["payload"]) + assert body == {"client_version": "1.0.0", "share_logs": False, "decline_logs": True} def test_register_cli_run_returns_none_on_api_failure(): client = Mock(spec=CliClient) client.request.side_effect = APIFailure("network down") - assert register_cli_run(client, client_version="1.0.0", share_logs=True) is None + assert register_cli_run( + client, client_version="1.0.0", share_logs=True, decline_logs=False + ) is None def test_register_cli_run_returns_none_on_missing_run_id_when_enabled(): client = Mock(spec=CliClient) client.request.return_value = _resp({"log_streaming_enabled": True}) - assert register_cli_run(client, client_version="1.0.0", share_logs=True) is None + assert register_cli_run( + client, client_version="1.0.0", share_logs=True, decline_logs=False + ) is None def test_register_cli_run_returns_none_on_bad_json(): @@ -69,7 +87,9 @@ def test_register_cli_run_returns_none_on_bad_json(): client = Mock(spec=CliClient) client.request.return_value = bad - assert register_cli_run(client, client_version="1.0.0", share_logs=True) is None + assert register_cli_run( + client, client_version="1.0.0", share_logs=True, decline_logs=False + ) is None def test_finalize_cli_run_posts_status_and_null_report_run_id_by_default(): diff --git a/tests/unit/test_streaming.py b/tests/unit/test_streaming.py index 9cc444c..20b8c65 100644 --- a/tests/unit/test_streaming.py +++ b/tests/unit/test_streaming.py @@ -28,6 +28,7 @@ def test_setup_streaming_returns_none_when_register_fails(): sdk_logger=logging.getLogger("t-fail-sdk"), client_version="1.0", share_logs=True, + decline_logs=False, enable_debug=False, ) assert teardown is None @@ -52,6 +53,7 @@ def fake_finalize(client, run_id, status="success", report_run_id=None): sdk_logger=sdk_logger, client_version="1.0", share_logs=True, + decline_logs=False, enable_debug=False, ) assert teardown is not None @@ -82,6 +84,7 @@ def fake_finalize(client, run_id, status="success", report_run_id=None): sdk_logger=sdk_logger, client_version="1.0", share_logs=True, + decline_logs=False, enable_debug=False, ) teardown() @@ -108,6 +111,7 @@ def test_setup_streaming_restores_logger_state_on_teardown(): sdk_logger=sdk_logger, client_version="1.0", share_logs=True, + decline_logs=False, enable_debug=False, ) # During streaming: levels and propagate are forced From 2b9a9b7b6c60676715251aa959b0672bb696c239 Mon Sep 17 00:00:00 2001 From: barslev Date: Thu, 11 Jun 2026 14:11:10 +0200 Subject: [PATCH 09/12] chore: bump version to 2.4.9 2.4.8 already shipped with the full-scan retry fix; this release adds the opt-in --upload-logs streaming channel. --- CHANGELOG.md | 4 +++- pyproject.toml | 2 +- uv.lock | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eb13de9..d83963d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changelog -## 2.4.8 +## 2.4.9 ### Added: opt-in streaming log channel via `--upload-logs` @@ -8,6 +8,8 @@ - New `--no-upload-logs` flag (mutually exclusive with `--upload-logs`) explicitly opts the run out of uploading logs, even when an org-level override would otherwise enable it. Use this when you need a guaranteed no-upload guarantee (e.g. legal/consent reasons). - The Socket backend can also force-enable streaming for specific orgs in the absence of an explicit opt-out. The feature is best-effort — registration or upload failures silently degrade and never block the scan. +## 2.4.8 + ### Fixed: retry transient full-scan upload failures - The full-scan upload (`POST /orgs//full-scans`) now retries transient diff --git a/pyproject.toml b/pyproject.toml index 7876f8d..acff1fb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "hatchling.build" [project] name = "socketsecurity" -version = "2.4.8" +version = "2.4.9" requires-python = ">= 3.11" license = {"file" = "LICENSE"} dependencies = [ diff --git a/uv.lock b/uv.lock index 0ed1361..86d19ff 100644 --- a/uv.lock +++ b/uv.lock @@ -1283,7 +1283,7 @@ wheels = [ [[package]] name = "socketsecurity" -version = "2.4.8" +version = "2.4.9" source = { editable = "." } dependencies = [ { name = "brotli", marker = "platform_python_implementation == 'CPython'" }, From 678518c7f5df82689c4a33fcabb6c77e2677c007 Mon Sep 17 00:00:00 2001 From: barslev Date: Thu, 11 Jun 2026 14:47:34 +0200 Subject: [PATCH 10/12] chore: bump __version__ to 2.4.9 version-check reads socketsecurity/__init__.py; the previous bump only touched pyproject.toml. --- socketsecurity/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/socketsecurity/__init__.py b/socketsecurity/__init__.py index 9f1797f..cc95834 100644 --- a/socketsecurity/__init__.py +++ b/socketsecurity/__init__.py @@ -1,3 +1,3 @@ __author__ = 'socket.dev' -__version__ = '2.4.8' +__version__ = '2.4.9' USER_AGENT = f'SocketPythonCLI/{__version__}' From 900f74be3d6a194922dbead63f1903f0f2dc5be8 Mon Sep 17 00:00:00 2001 From: barslev Date: Thu, 11 Jun 2026 15:22:12 +0200 Subject: [PATCH 11/12] refactor: address PR review on streaming logs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - collapse upload_logs/decline_logs config fields into a single Optional[bool] (tri-state); projection to share_logs/decline_logs happens at the setup_streaming call site. - streaming.py: replace module-level globals + atexit teardown with a StreamingLogs context manager. set_run_status disappears entirely — __exit__ infers the run status from the exception that closed the with block. set_report_run_id is now an instance method. Logger handler wiring iterates over (cli_logger, sdk_logger) instead of repeating itself. - log_uploader.py: drop _LEVEL_MAP, use logging.getLevelName directly. Wire format changes WARN/ERROR-for-CRITICAL to WARNING/CRITICAL. - log_uploader.py: tidy BatchedLogUploader.stop so the final _flush always runs and the thread shutdown only runs when there is a thread. - socketcli.py: wrap main_code body in 'with setup_streaming(...) as streaming:'; cli()'s exception handlers no longer need to set status before re-raising. - tests updated for the CM API and the new level strings. --- socketsecurity/config.py | 11 +- socketsecurity/core/log_uploader.py | 20 +- socketsecurity/core/streaming.py | 175 ++-- socketsecurity/socketcli.py | 1348 +++++++++++++-------------- tests/unit/test_log_uploader.py | 14 +- tests/unit/test_streaming.py | 173 ++-- 6 files changed, 883 insertions(+), 858 deletions(-) diff --git a/socketsecurity/config.py b/socketsecurity/config.py index 640e1c9..da2e473 100644 --- a/socketsecurity/config.py +++ b/socketsecurity/config.py @@ -139,8 +139,9 @@ class CliConfig: ignore_commit_files: bool = False disable_blocking: bool = False disable_ignore: bool = False - upload_logs: bool = False - decline_logs: bool = False + # Tri-state log-upload preference: True = --upload-logs, False = --no-upload-logs, + # None = neither (server-side override decides). + upload_logs: Optional[bool] = None strict_blocking: bool = False integration_type: IntegrationType = "api" integration_org_slug: Optional[str] = None @@ -216,6 +217,9 @@ def from_args(cls, args_list: Optional[List[str]] = None) -> 'CliConfig': if args.upload_logs and args.decline_logs: parser.error("--upload-logs and --no-upload-logs are mutually exclusive") + upload_logs: Optional[bool] = ( + True if args.upload_logs else False if args.decline_logs else None + ) if args.reach_exclude_paths: logging.warning( @@ -287,8 +291,7 @@ def from_args(cls, args_list: Optional[List[str]] = None) -> 'CliConfig': 'ignore_commit_files': args.ignore_commit_files, 'disable_blocking': args.disable_blocking, 'disable_ignore': args.disable_ignore, - 'upload_logs': args.upload_logs, - 'decline_logs': args.decline_logs, + 'upload_logs': upload_logs, 'strict_blocking': args.strict_blocking, 'integration_type': args.integration, 'pending_head': args.pending_head, diff --git a/socketsecurity/core/log_uploader.py b/socketsecurity/core/log_uploader.py index 8c4c6f3..de57df4 100644 --- a/socketsecurity/core/log_uploader.py +++ b/socketsecurity/core/log_uploader.py @@ -26,14 +26,6 @@ _FLUSH_GUARD = threading.local() -_LEVEL_MAP = { - logging.DEBUG: "DEBUG", - logging.INFO: "INFO", - logging.WARNING: "WARN", - logging.ERROR: "ERROR", - logging.CRITICAL: "ERROR", -} - def _now_str() -> str: return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] @@ -69,12 +61,10 @@ def start(self) -> None: self._thread.start() def stop(self, timeout: float = 2.0) -> None: - if self._thread is None: - self._flush() - return - self._stop.set() - self._thread.join(timeout=timeout) - self._thread = None + if self._thread is not None: + self._stop.set() + self._thread.join(timeout=timeout) + self._thread = None self._flush() def _run(self) -> None: @@ -114,7 +104,7 @@ def emit(self, record: logging.LogRecord) -> None: try: self._uploader.add({ "timestamp": _now_str(), - "level": _LEVEL_MAP.get(record.levelno, "INFO"), + "level": logging.getLevelName(record.levelno), "message": self.format(record), "context": self._context, }) diff --git a/socketsecurity/core/streaming.py b/socketsecurity/core/streaming.py index b21d728..7b28c75 100644 --- a/socketsecurity/core/streaming.py +++ b/socketsecurity/core/streaming.py @@ -1,33 +1,118 @@ -"""Wire the server log streaming pipeline for one CLI run. - -`setup_streaming` registers the run with the backend, attaches handlers that -route the CLI's own log output through both the local terminal and a batched -uploader, and forces the loggers into DEBUG so the upload captures everything -regardless of local terminal verbosity. - -Returns a teardown callable to invoke on exit (typically via `atexit.register`). -Returns None if registration failed; in that case nothing was wired up. +"""Server log streaming pipeline for one CLI run. + +`setup_streaming` returns a `StreamingLogs` context manager. On enter it +registers the run with the backend, attaches handlers that route the CLI's +own log output through both the local terminal and a batched uploader, and +forces the loggers into DEBUG so the upload captures everything regardless +of local terminal verbosity. On exit it tears the handlers back down and +finalizes the run; the status sent to finalize is inferred from the +exception that closed the `with` block (success / failure / cancelled). + +If registration fails the manager becomes a no-op — nothing is wired up +and __exit__ does nothing. """ import logging -from typing import Callable, Optional +from typing import Optional from .cli_client import CliClient from .cli_run import finalize_cli_run, register_cli_run from .log_uploader import BatchedLogUploader, UploadingLogHandler -_run_status: str = "success" -_report_run_id: Optional[str] = None - - -def set_run_status(status: str) -> None: - global _run_status - _run_status = status - -def set_report_run_id(report_run_id: Optional[str]) -> None: - global _report_run_id - _report_run_id = report_run_id +class StreamingLogs: + def __init__( + self, + *, + client: CliClient, + cli_logger: logging.Logger, + sdk_logger: logging.Logger, + client_version: str, + share_logs: bool, + decline_logs: bool, + enable_debug: bool, + ): + self._client = client + self._loggers = (cli_logger, sdk_logger) + self._client_version = client_version + self._share_logs = share_logs + self._decline_logs = decline_logs + self._enable_debug = enable_debug + + self._run_id: Optional[str] = None + self._report_run_id: Optional[str] = None + self._uploader: Optional[BatchedLogUploader] = None + self._upload_handler: Optional[UploadingLogHandler] = None + self._terminal_handler: Optional[logging.StreamHandler] = None + self._saved_levels: tuple = () + self._saved_propagate: tuple = () + + def set_report_run_id(self, report_run_id: Optional[str]) -> None: + self._report_run_id = report_run_id + + def __enter__(self) -> "StreamingLogs": + self._run_id = register_cli_run( + self._client, + client_version=self._client_version, + share_logs=self._share_logs, + decline_logs=self._decline_logs, + ) + cli_logger = self._loggers[0] + if not self._run_id: + cli_logger.debug("server log streaming not active for this run") + return self + + self._uploader = BatchedLogUploader(self._client, self._run_id) + self._uploader.start() + self._upload_handler = UploadingLogHandler(self._uploader, context="socket-python-cli") + self._upload_handler.setFormatter(logging.Formatter("%(message)s")) + + self._terminal_handler = logging.StreamHandler() + self._terminal_handler.setLevel(logging.DEBUG if self._enable_debug else logging.INFO) + self._terminal_handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s")) + + self._saved_levels = tuple(lg.level for lg in self._loggers) + self._saved_propagate = tuple(lg.propagate for lg in self._loggers) + for lg in self._loggers: + lg.setLevel(logging.DEBUG) + lg.propagate = False + lg.addHandler(self._terminal_handler) + lg.addHandler(self._upload_handler) + + cli_logger.debug(f"server log streaming enabled (run_id={self._run_id})") + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> bool: + if self._run_id is None: + return False + + status = self._status_for_exit(exc_type, exc_val) + for lg in self._loggers: + lg.removeHandler(self._upload_handler) + self._uploader.stop() + finalize_cli_run( + self._client, + self._run_id, + status=status, + report_run_id=self._report_run_id, + ) + for lg in self._loggers: + lg.removeHandler(self._terminal_handler) + for lg, level, propagate in zip(self._loggers, self._saved_levels, self._saved_propagate): + lg.setLevel(level) + lg.propagate = propagate + return False + + @staticmethod + def _status_for_exit(exc_type, exc_val) -> str: + if exc_type is None: + return "success" + if issubclass(exc_type, KeyboardInterrupt): + return "cancelled" + # SystemExit with code 0 / None is a clean exit; non-zero codes signal failure. + if issubclass(exc_type, SystemExit) and not getattr(exc_val, "code", None): + return "success" + return "failure" def setup_streaming( @@ -39,49 +124,13 @@ def setup_streaming( share_logs: bool, decline_logs: bool, enable_debug: bool, -) -> Optional[Callable[[], None]]: - run_id = register_cli_run( - client, +) -> StreamingLogs: + return StreamingLogs( + client=client, + cli_logger=cli_logger, + sdk_logger=sdk_logger, client_version=client_version, share_logs=share_logs, decline_logs=decline_logs, + enable_debug=enable_debug, ) - if not run_id: - cli_logger.debug("server log streaming not active for this run") - return None - - log_uploader = BatchedLogUploader(client, run_id) - log_uploader.start() - upload_handler = UploadingLogHandler(log_uploader, context="socket-python-cli") - upload_handler.setFormatter(logging.Formatter("%(message)s")) - - terminal_handler = logging.StreamHandler() - terminal_handler.setLevel(logging.DEBUG if enable_debug else logging.INFO) - terminal_handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s")) - - saved_levels = (cli_logger.level, sdk_logger.level) - saved_propagate = (cli_logger.propagate, sdk_logger.propagate) - cli_logger.setLevel(logging.DEBUG) - sdk_logger.setLevel(logging.DEBUG) - cli_logger.propagate = False - sdk_logger.propagate = False - cli_logger.addHandler(terminal_handler) - sdk_logger.addHandler(terminal_handler) - cli_logger.addHandler(upload_handler) - sdk_logger.addHandler(upload_handler) - - cli_logger.debug(f"server log streaming enabled (run_id={run_id})") - - def teardown() -> None: - cli_logger.removeHandler(upload_handler) - sdk_logger.removeHandler(upload_handler) - log_uploader.stop() - finalize_cli_run(client, run_id, status=_run_status, report_run_id=_report_run_id) - cli_logger.removeHandler(terminal_handler) - sdk_logger.removeHandler(terminal_handler) - cli_logger.setLevel(saved_levels[0]) - sdk_logger.setLevel(saved_levels[1]) - cli_logger.propagate = saved_propagate[0] - sdk_logger.propagate = saved_propagate[1] - - return teardown diff --git a/socketsecurity/socketcli.py b/socketsecurity/socketcli.py index 5dca2f3..e9fabbb 100644 --- a/socketsecurity/socketcli.py +++ b/socketsecurity/socketcli.py @@ -1,4 +1,3 @@ -import atexit import json import os import shutil @@ -21,7 +20,7 @@ from socketsecurity.core.messages import Messages from socketsecurity.core.scm_comments import Comments from socketsecurity.core.socket_config import SocketConfig -from socketsecurity.core.streaming import set_report_run_id, set_run_status, setup_streaming +from socketsecurity.core.streaming import setup_streaming from socketsecurity.fossa_compat import build_fossa_attribution_payload from socketsecurity.output import OutputHandler @@ -122,19 +121,15 @@ def cli(): try: main_code() except KeyboardInterrupt: - set_run_status("cancelled") log.info("Keyboard Interrupt detected, exiting") config = CliConfig.from_args() # Get current config if not config.disable_blocking: sys.exit(2) else: sys.exit(0) - except SystemExit as e: - if e.code: - set_run_status("failure") + except SystemExit: raise except Exception as error: - set_run_status("failure") config = CliConfig.from_args() # Get current config _emit_infrastructure_error( f"Unexpected error when running the CLI: {error}", @@ -192,656 +187,632 @@ def main_code(): sdk.api.api_url = socket_config.api_url log.debug("loaded client") - teardown = setup_streaming( + with setup_streaming( client=client, cli_logger=log, sdk_logger=socket_logger, client_version=config.version, - share_logs=config.upload_logs, - decline_logs=config.decline_logs, + share_logs=config.upload_logs is True, + decline_logs=config.upload_logs is False, enable_debug=config.enable_debug, - ) - if teardown: - atexit.register(teardown) - - core = Core(socket_config, sdk, config) - log.debug("loaded core") - - # Check for required dependencies if reachability analysis is enabled - if config.reach: - log.info("Reachability analysis enabled, checking for required dependencies...") - required_deps = ["npm", "node", "uv", "npx"] - missing_deps = [] - found_deps = [] - - for dep in required_deps: - if shutil.which(dep): - found_deps.append(dep) - log.debug(f"Found required dependency: {dep}") - else: - missing_deps.append(dep) - - if missing_deps: - log.error(f"Reachability analysis requires the following dependencies: {', '.join(required_deps)}") - log.error(f"Missing dependencies: {', '.join(missing_deps)}") - log.error("Please install the missing dependencies and try again.") - sys.exit(3) - - log.info(f"All required dependencies found: {', '.join(found_deps)}") - - # Check if organization has an enterprise plan - log.info("Checking organization plan for reachability analysis eligibility...") - org_response = sdk.org.get(use_types=True) - organizations = org_response.get("organizations", {}) + ) as streaming: + core = Core(socket_config, sdk, config) + log.debug("loaded core") - if organizations: - org_id = next(iter(organizations)) - org_plan = organizations[org_id].get('plan', '') + # Check for required dependencies if reachability analysis is enabled + if config.reach: + log.info("Reachability analysis enabled, checking for required dependencies...") + required_deps = ["npm", "node", "uv", "npx"] + missing_deps = [] + found_deps = [] - # Check if plan matches enterprise* pattern (enterprise, enterprise_trial, etc.) - if not org_plan.startswith('enterprise'): - log.error("Reachability analysis is only available for enterprise plans.") - log.error(f"Your organization plan is: {org_plan}") - log.error("Please upgrade to an enterprise plan to use reachability analysis.") + for dep in required_deps: + if shutil.which(dep): + found_deps.append(dep) + log.debug(f"Found required dependency: {dep}") + else: + missing_deps.append(dep) + + if missing_deps: + log.error(f"Reachability analysis requires the following dependencies: {', '.join(required_deps)}") + log.error(f"Missing dependencies: {', '.join(missing_deps)}") + log.error("Please install the missing dependencies and try again.") sys.exit(3) - log.info(f"Organization plan verified: {org_plan}") - else: - log.error("Unable to retrieve organization information for plan verification.") - sys.exit(3) - - # Parse files argument - try: - if isinstance(config.files, list): - # Already a list, use as-is - specified_files = config.files - elif isinstance(config.files, str): - # Handle different string formats - files_str = config.files.strip() + log.info(f"All required dependencies found: {', '.join(found_deps)}") - # If the string is wrapped in extra quotes, strip them - if ((files_str.startswith('"') and files_str.endswith('"')) or - (files_str.startswith("'") and files_str.endswith("'"))): - # Check if the inner content looks like JSON - inner_str = files_str[1:-1] - if inner_str.startswith('[') and inner_str.endswith(']'): - files_str = inner_str + # Check if organization has an enterprise plan + log.info("Checking organization plan for reachability analysis eligibility...") + org_response = sdk.org.get(use_types=True) + organizations = org_response.get("organizations", {}) - # Try to parse as JSON - try: - specified_files = json.loads(files_str) - except json.JSONDecodeError: - # If JSON parsing fails, try replacing single quotes with double quotes - files_str = files_str.replace("'", '"') - specified_files = json.loads(files_str) - else: - # Default to empty list - specified_files = [] - except Exception as error: - log.error(f"Unable to parse files argument: {config.files}") - log.error(f"Error details: {error}") - log.debug(f"Files type: {type(config.files)}") - log.debug(f"Files repr: {repr(config.files)}") - sys.exit(3) + if organizations: + org_id = next(iter(organizations)) + org_plan = organizations[org_id].get('plan', '') + + # Check if plan matches enterprise* pattern (enterprise, enterprise_trial, etc.) + if not org_plan.startswith('enterprise'): + log.error("Reachability analysis is only available for enterprise plans.") + log.error(f"Your organization plan is: {org_plan}") + log.error("Please upgrade to an enterprise plan to use reachability analysis.") + sys.exit(3) + + log.info(f"Organization plan verified: {org_plan}") + else: + log.error("Unable to retrieve organization information for plan verification.") + sys.exit(3) + + # Parse files argument + try: + if isinstance(config.files, list): + # Already a list, use as-is + specified_files = config.files + elif isinstance(config.files, str): + # Handle different string formats + files_str = config.files.strip() + + # If the string is wrapped in extra quotes, strip them + if ((files_str.startswith('"') and files_str.endswith('"')) or + (files_str.startswith("'") and files_str.endswith("'"))): + # Check if the inner content looks like JSON + inner_str = files_str[1:-1] + if inner_str.startswith('[') and inner_str.endswith(']'): + files_str = inner_str + + # Try to parse as JSON + try: + specified_files = json.loads(files_str) + except json.JSONDecodeError: + # If JSON parsing fails, try replacing single quotes with double quotes + files_str = files_str.replace("'", '"') + specified_files = json.loads(files_str) + else: + # Default to empty list + specified_files = [] + except Exception as error: + log.error(f"Unable to parse files argument: {config.files}") + log.error(f"Error details: {error}") + log.debug(f"Files type: {type(config.files)}") + log.debug(f"Files repr: {repr(config.files)}") + sys.exit(3) - # Determine if files were explicitly specified - files_explicitly_specified = config.files != "[]" and len(specified_files) > 0 - - # Variable to track if we need to override files with facts file - facts_file_to_submit = None - # Variable to track SBOM files to submit when using --reach-use-only-pregenerated-sboms - sbom_files_to_submit = None - - # Git setup - is_repo = False - git_repo: Git - try: - git_repo = Git(config.target_path) - is_repo = True - if not config.repo: - config.repo = git_repo.repo_name - if not config.commit_sha: - config.commit_sha = git_repo.commit_str - if not config.branch: - config.branch = git_repo.branch - if not config.committers: - config.committers = [git_repo.get_formatted_committer()] - if not config.commit_message: - config.commit_message = git_repo.commit_message - except InvalidGitRepositoryError: + # Determine if files were explicitly specified + files_explicitly_specified = config.files != "[]" and len(specified_files) > 0 + + # Variable to track if we need to override files with facts file + facts_file_to_submit = None + # Variable to track SBOM files to submit when using --reach-use-only-pregenerated-sboms + sbom_files_to_submit = None + + # Git setup is_repo = False - log.debug("Not a git repository, setting ignore_commit_files=True") - config.ignore_commit_files = True - except NoSuchPathError: - raise Exception(f"Unable to find path {config.target_path}") - - # Track whether repo/branch fell back to the default sentinels so reachability can skip - # forwarding them as coana cache-bucket keys (computed before any workspace suffixing). - repo_defaulted = not config.repo - branch_defaulted = not config.branch - - if not config.repo: - base_repo_name = DEFAULT_REPO_NAME - if config.workspace_name: - config.repo = f"{base_repo_name}-{config.workspace_name}" - else: - config.repo = base_repo_name - log.debug(f"Using default repository name: {config.repo}") + git_repo: Git + try: + git_repo = Git(config.target_path) + is_repo = True + if not config.repo: + config.repo = git_repo.repo_name + if not config.commit_sha: + config.commit_sha = git_repo.commit_str + if not config.branch: + config.branch = git_repo.branch + if not config.committers: + config.committers = [git_repo.get_formatted_committer()] + if not config.commit_message: + config.commit_message = git_repo.commit_message + except InvalidGitRepositoryError: + is_repo = False + log.debug("Not a git repository, setting ignore_commit_files=True") + config.ignore_commit_files = True + except NoSuchPathError: + raise Exception(f"Unable to find path {config.target_path}") + + # Track whether repo/branch fell back to the default sentinels so reachability can skip + # forwarding them as coana cache-bucket keys (computed before any workspace suffixing). + repo_defaulted = not config.repo + branch_defaulted = not config.branch - if not config.branch: - config.branch = DEFAULT_BRANCH_NAME - log.debug(f"Using default branch name: {config.branch}") + if not config.repo: + base_repo_name = DEFAULT_REPO_NAME + if config.workspace_name: + config.repo = f"{base_repo_name}-{config.workspace_name}" + else: + config.repo = base_repo_name + log.debug(f"Using default repository name: {config.repo}") - # Calculate the scan paths - combine target_path with sub_paths if provided - scan_paths = [] - base_paths = [config.target_path] # Always use target_path as the single base path - - if config.sub_paths: - for sub_path in config.sub_paths: - full_scan_path = os.path.join(config.target_path, sub_path) - log.debug(f"Using sub-path for scanning: {full_scan_path}") - # Verify the scan path exists - if not os.path.exists(full_scan_path): - raise Exception(f"Sub-path does not exist: {full_scan_path}") - scan_paths.append(full_scan_path) - else: - # Use the target path as the single scan path - scan_paths = [config.target_path] - - # Modify repository name if workspace_name is provided - if config.workspace_name and config.repo: - config.repo = f"{config.repo}-{config.workspace_name}" - log.debug(f"Modified repository name with workspace suffix: {config.repo}") - elif config.workspace_name and not config.repo: - # If no repo name was set but workspace_name is provided, we'll use it later - log.debug(f"Workspace name provided: {config.workspace_name}") - - # Run reachability analysis if enabled - if config.reach: - from socketsecurity.core.tools.reachability import ReachabilityAnalyzer - - log.info("Starting reachability analysis...") - - # Find manifest files in scan paths (excluding .socket.facts.json to avoid circular dependency) - log.info("Finding manifest files for reachability analysis...") - manifest_files = [] - - # Always find all manifest files for the tar hash upload - for scan_path in scan_paths: - scan_manifests = core.find_files(scan_path) - # Filter out .socket.facts.json files from manifest upload - scan_manifests = [f for f in scan_manifests if not f.endswith('.socket.facts.json')] - manifest_files.extend(scan_manifests) + if not config.branch: + config.branch = DEFAULT_BRANCH_NAME + log.debug(f"Using default branch name: {config.branch}") + + # Calculate the scan paths - combine target_path with sub_paths if provided + scan_paths = [] + base_paths = [config.target_path] # Always use target_path as the single base path - if not manifest_files: - log.warning("No manifest files found for reachability analysis") + if config.sub_paths: + for sub_path in config.sub_paths: + full_scan_path = os.path.join(config.target_path, sub_path) + log.debug(f"Using sub-path for scanning: {full_scan_path}") + # Verify the scan path exists + if not os.path.exists(full_scan_path): + raise Exception(f"Sub-path does not exist: {full_scan_path}") + scan_paths.append(full_scan_path) else: - log.info(f"Found {len(manifest_files)} manifest files for reachability upload") + # Use the target path as the single scan path + scan_paths = [config.target_path] + + # Modify repository name if workspace_name is provided + if config.workspace_name and config.repo: + config.repo = f"{config.repo}-{config.workspace_name}" + log.debug(f"Modified repository name with workspace suffix: {config.repo}") + elif config.workspace_name and not config.repo: + # If no repo name was set but workspace_name is provided, we'll use it later + log.debug(f"Workspace name provided: {config.workspace_name}") + + # Run reachability analysis if enabled + if config.reach: + from socketsecurity.core.tools.reachability import ReachabilityAnalyzer + + log.info("Starting reachability analysis...") + + # Find manifest files in scan paths (excluding .socket.facts.json to avoid circular dependency) + log.info("Finding manifest files for reachability analysis...") + manifest_files = [] + + # Always find all manifest files for the tar hash upload + for scan_path in scan_paths: + scan_manifests = core.find_files(scan_path) + # Filter out .socket.facts.json files from manifest upload + scan_manifests = [f for f in scan_manifests if not f.endswith('.socket.facts.json')] + manifest_files.extend(scan_manifests) - # Upload manifests and get tar hash - log.info("Uploading manifest files...") - try: - # Get org_slug early (we'll need it) - org_slug = core.config.org_slug - - # Upload manifest files - tar_hash = sdk.uploadmanifests.upload_manifest_files( - org_slug=org_slug, - file_paths=manifest_files, - workspace=config.repo or "default-workspace", - base_paths=[config.target_path], - use_lazy_loading=False - ) - log.info(f"Manifest upload successful, tar hash: {tar_hash}") - - # Initialize and run reachability analyzer - analyzer = ReachabilityAnalyzer(sdk, config.api_token) - - # Determine output path - output_path = config.reach_output_file or ".socket.facts.json" - - # Run the analysis - result = analyzer.run_reachability_analysis( - org_slug=org_slug, - target_directory=config.target_path, - tar_hash=tar_hash, - output_path=output_path, - timeout=config.reach_analysis_timeout, - memory_limit=config.reach_analysis_memory_limit, - ecosystems=config.reach_ecosystems, - # Union the deprecated --reach-exclude-paths with the unified --exclude-paths - # and forward verbatim to coana's --exclude-dirs. Patterns are scan-root - # relative; coana resolves --exclude-dirs relative to its `run` target, which - # here is `.` == cwd == scan root, so passthrough is correct. If a nested - # target is ever supported, re-anchor patterns to the target first (see Node's - # pathRelativeToTarget in exclude-paths.mts). - exclude_paths=( - (config.reach_exclude_paths or []) + (config.exclude_paths or []) - ) or None, - min_severity=config.reach_min_severity, - skip_cache=config.reach_skip_cache or False, - disable_analytics=config.reach_disable_analytics or False, - enable_analysis_splitting=config.reach_enable_analysis_splitting or False, - detailed_analysis_log_file=config.reach_detailed_analysis_log_file or False, - lazy_mode=config.reach_lazy_mode or False, - repo_name=None if repo_defaulted else config.repo, - branch_name=None if branch_defaulted else config.branch, - version=config.reach_version, - concurrency=config.reach_concurrency, - additional_params=config.reach_additional_params, - allow_unverified=config.allow_unverified, - enable_debug=config.enable_debug, - use_only_pregenerated_sboms=config.reach_use_only_pregenerated_sboms, - continue_on_analysis_errors=config.reach_continue_on_analysis_errors, - continue_on_install_errors=config.reach_continue_on_install_errors, - continue_on_missing_lock_files=config.reach_continue_on_missing_lock_files, - continue_on_no_source_files=config.reach_continue_on_no_source_files, - reach_debug=config.reach_debug, - disable_external_tool_checks=config.reach_disable_external_tool_checks, - ) - - log.info("Reachability analysis completed successfully") - log.info(f"Results written to: {result['report_path']}") - if result.get('scan_id'): - log.info(f"Reachability scan ID: {result['scan_id']}") - - # If only-facts-file mode, mark the facts file for submission - if config.only_facts_file: - facts_file_to_submit = os.path.abspath(output_path) - log.info(f"Only-facts-file mode: will submit only {facts_file_to_submit}") - - # If reach-use-only-pregenerated-sboms mode, submit CDX, SPDX, and facts file - if config.reach_use_only_pregenerated_sboms: - # Find only CDX and SPDX files for the final scan submission - sbom_files_to_submit = [] - for scan_path in scan_paths: - sbom_files_to_submit.extend(core.find_sbom_files(scan_path)) - # Use relative path for facts file - if os.path.exists(output_path): - sbom_files_to_submit.append(output_path) - log.info(f"Pre-generated SBOMs mode: will submit {len(sbom_files_to_submit)} files (CDX, SPDX, and facts file)") + if not manifest_files: + log.warning("No manifest files found for reachability analysis") + else: + log.info(f"Found {len(manifest_files)} manifest files for reachability upload") - except Exception as e: - log.error(f"Reachability analysis failed: {str(e)}") - if not config.disable_blocking: - sys.exit(3) - - log.info("Continuing with normal scan flow...") - - scm = None - if config.scm == "github": - from socketsecurity.core.scm.github import Github, GithubConfig - # Only pass pr_number if it's not "0" (the default) - pr_number = config.pr_number if config.pr_number != "0" else None - github_config = GithubConfig.from_env(pr_number=pr_number) - scm = Github(client=client, config=github_config) - elif config.scm == 'gitlab': - from socketsecurity.core.scm.gitlab import Gitlab, GitlabConfig - gitlab_config = GitlabConfig.from_env() - scm = Gitlab(client=client, config=gitlab_config) - # Don't override config.default_branch if it was explicitly set via --default-branch flag - # Only use SCM detection if --default-branch wasn't provided - if scm is not None and not config.default_branch: - config.default_branch = scm.config.is_default_branch - - # Override files if only-facts-file mode is active - if facts_file_to_submit: - specified_files = [facts_file_to_submit] - files_explicitly_specified = True - log.debug(f"Overriding files to only submit facts file: {facts_file_to_submit}") - - # Override files if reach-use-only-pregenerated-sboms mode is active - if sbom_files_to_submit: - specified_files = sbom_files_to_submit - files_explicitly_specified = True - log.debug(f"Overriding files to submit only SBOM files (CDX, SPDX, and facts): {sbom_files_to_submit}") - - # Determine files to check based on the new logic - files_to_check = [] - force_api_mode = False - force_diff_mode = False - - if files_explicitly_specified: - # Case 2: Files are specified - use them and don't check commit details - files_to_check = specified_files - log.debug(f"Using explicitly specified files: {files_to_check}") - elif not config.ignore_commit_files and is_repo: - # Case 1: Files not specified and --ignore-commit-files not set - try to find changed files from commit - files_to_check = git_repo.changed_files - log.debug(f"Using changed files from commit: {files_to_check}") - elif config.ignore_commit_files and is_repo: - # Case 3: Git repo with --ignore-commit-files - force diff mode - files_to_check = [] - force_diff_mode = True - log.debug("Git repo with --ignore-commit-files: forcing diff mode") - else: - # Case 4: Not a git repo (ignore_commit_files was auto-set to True) + # Upload manifests and get tar hash + log.info("Uploading manifest files...") + try: + # Get org_slug early (we'll need it) + org_slug = core.config.org_slug + + # Upload manifest files + tar_hash = sdk.uploadmanifests.upload_manifest_files( + org_slug=org_slug, + file_paths=manifest_files, + workspace=config.repo or "default-workspace", + base_paths=[config.target_path], + use_lazy_loading=False + ) + log.info(f"Manifest upload successful, tar hash: {tar_hash}") + + # Initialize and run reachability analyzer + analyzer = ReachabilityAnalyzer(sdk, config.api_token) + + # Determine output path + output_path = config.reach_output_file or ".socket.facts.json" + + # Run the analysis + result = analyzer.run_reachability_analysis( + org_slug=org_slug, + target_directory=config.target_path, + tar_hash=tar_hash, + output_path=output_path, + timeout=config.reach_analysis_timeout, + memory_limit=config.reach_analysis_memory_limit, + ecosystems=config.reach_ecosystems, + # Union the deprecated --reach-exclude-paths with the unified --exclude-paths + # and forward verbatim to coana's --exclude-dirs. Patterns are scan-root + # relative; coana resolves --exclude-dirs relative to its `run` target, which + # here is `.` == cwd == scan root, so passthrough is correct. If a nested + # target is ever supported, re-anchor patterns to the target first (see Node's + # pathRelativeToTarget in exclude-paths.mts). + exclude_paths=( + (config.reach_exclude_paths or []) + (config.exclude_paths or []) + ) or None, + min_severity=config.reach_min_severity, + skip_cache=config.reach_skip_cache or False, + disable_analytics=config.reach_disable_analytics or False, + enable_analysis_splitting=config.reach_enable_analysis_splitting or False, + detailed_analysis_log_file=config.reach_detailed_analysis_log_file or False, + lazy_mode=config.reach_lazy_mode or False, + repo_name=None if repo_defaulted else config.repo, + branch_name=None if branch_defaulted else config.branch, + version=config.reach_version, + concurrency=config.reach_concurrency, + additional_params=config.reach_additional_params, + allow_unverified=config.allow_unverified, + enable_debug=config.enable_debug, + use_only_pregenerated_sboms=config.reach_use_only_pregenerated_sboms, + continue_on_analysis_errors=config.reach_continue_on_analysis_errors, + continue_on_install_errors=config.reach_continue_on_install_errors, + continue_on_missing_lock_files=config.reach_continue_on_missing_lock_files, + continue_on_no_source_files=config.reach_continue_on_no_source_files, + reach_debug=config.reach_debug, + disable_external_tool_checks=config.reach_disable_external_tool_checks, + ) + + log.info("Reachability analysis completed successfully") + log.info(f"Results written to: {result['report_path']}") + if result.get('scan_id'): + log.info(f"Reachability scan ID: {result['scan_id']}") + + # If only-facts-file mode, mark the facts file for submission + if config.only_facts_file: + facts_file_to_submit = os.path.abspath(output_path) + log.info(f"Only-facts-file mode: will submit only {facts_file_to_submit}") + + # If reach-use-only-pregenerated-sboms mode, submit CDX, SPDX, and facts file + if config.reach_use_only_pregenerated_sboms: + # Find only CDX and SPDX files for the final scan submission + sbom_files_to_submit = [] + for scan_path in scan_paths: + sbom_files_to_submit.extend(core.find_sbom_files(scan_path)) + # Use relative path for facts file + if os.path.exists(output_path): + sbom_files_to_submit.append(output_path) + log.info(f"Pre-generated SBOMs mode: will submit {len(sbom_files_to_submit)} files (CDX, SPDX, and facts file)") + + except Exception as e: + log.error(f"Reachability analysis failed: {str(e)}") + if not config.disable_blocking: + sys.exit(3) + + log.info("Continuing with normal scan flow...") + + scm = None + if config.scm == "github": + from socketsecurity.core.scm.github import Github, GithubConfig + # Only pass pr_number if it's not "0" (the default) + pr_number = config.pr_number if config.pr_number != "0" else None + github_config = GithubConfig.from_env(pr_number=pr_number) + scm = Github(client=client, config=github_config) + elif config.scm == 'gitlab': + from socketsecurity.core.scm.gitlab import Gitlab, GitlabConfig + gitlab_config = GitlabConfig.from_env() + scm = Gitlab(client=client, config=gitlab_config) + # Don't override config.default_branch if it was explicitly set via --default-branch flag + # Only use SCM detection if --default-branch wasn't provided + if scm is not None and not config.default_branch: + config.default_branch = scm.config.is_default_branch + + # Override files if only-facts-file mode is active + if facts_file_to_submit: + specified_files = [facts_file_to_submit] + files_explicitly_specified = True + log.debug(f"Overriding files to only submit facts file: {facts_file_to_submit}") + + # Override files if reach-use-only-pregenerated-sboms mode is active + if sbom_files_to_submit: + specified_files = sbom_files_to_submit + files_explicitly_specified = True + log.debug(f"Overriding files to submit only SBOM files (CDX, SPDX, and facts): {sbom_files_to_submit}") + + # Determine files to check based on the new logic files_to_check = [] - # If --enable-diff is set, force diff mode for non-git repos - log.debug(f"Case 4: Non-git repo - config.enable_diff={config.enable_diff}, type={type(config.enable_diff)}") - if config.enable_diff: + force_api_mode = False + force_diff_mode = False + + if files_explicitly_specified: + # Case 2: Files are specified - use them and don't check commit details + files_to_check = specified_files + log.debug(f"Using explicitly specified files: {files_to_check}") + elif not config.ignore_commit_files and is_repo: + # Case 1: Files not specified and --ignore-commit-files not set - try to find changed files from commit + files_to_check = git_repo.changed_files + log.debug(f"Using changed files from commit: {files_to_check}") + elif config.ignore_commit_files and is_repo: + # Case 3: Git repo with --ignore-commit-files - force diff mode + files_to_check = [] force_diff_mode = True - log.debug("Non-git repo with --enable-diff: forcing diff mode") + log.debug("Git repo with --ignore-commit-files: forcing diff mode") else: - log.debug("Non-git repo without --enable-diff: will use full scan mode") + # Case 4: Not a git repo (ignore_commit_files was auto-set to True) + files_to_check = [] + # If --enable-diff is set, force diff mode for non-git repos + log.debug(f"Case 4: Non-git repo - config.enable_diff={config.enable_diff}, type={type(config.enable_diff)}") + if config.enable_diff: + force_diff_mode = True + log.debug("Non-git repo with --enable-diff: forcing diff mode") + else: + log.debug("Non-git repo without --enable-diff: will use full scan mode") - # Check if we have supported manifest files - has_supported_files = files_to_check and core.has_manifest_files(files_to_check) - - # If using sub_paths, we need to check if manifest files exist in the scan paths - if config.sub_paths and not files_explicitly_specified: - # Override file checking to look in the scan paths instead - # Get manifest files from all scan paths + # Check if we have supported manifest files + has_supported_files = files_to_check and core.has_manifest_files(files_to_check) + + # If using sub_paths, we need to check if manifest files exist in the scan paths + if config.sub_paths and not files_explicitly_specified: + # Override file checking to look in the scan paths instead + # Get manifest files from all scan paths + try: + all_scan_files = [] + for scan_path in scan_paths: + scan_files = core.find_files(scan_path) + all_scan_files.extend(scan_files) + has_supported_files = len(all_scan_files) > 0 + log.debug(f"Found {len(all_scan_files)} manifest files across {len(scan_paths)} scan paths") + except Exception as e: + log.debug(f"Error finding files in scan paths: {e}") + has_supported_files = False + + # Case 3: If no supported files or files are empty, force API mode (no PR comments) + # BUT: Don't force API mode if we're in force_diff_mode + log.debug(f"files_to_check={files_to_check}, has_supported_files={has_supported_files}, force_diff_mode={force_diff_mode}, config.enable_diff={config.enable_diff}") + if not has_supported_files and not force_diff_mode: + force_api_mode = True + log.debug("No supported manifest files found, forcing API mode") + log.debug(f"force_api_mode={force_api_mode}") + + # Determine scan behavior + should_skip_scan = False # Always perform scan, but behavior changes based on supported files + if not has_supported_files and not force_diff_mode: + # No supported files and not forcing diff - still scan but in API mode + should_skip_scan = False + log.debug("No supported files but will scan in API mode") + else: + log.debug("Found supported manifest files or forcing diff mode, proceeding with normal scan") + + org_slug = core.config.org_slug + if config.repo_is_public: + core.config.repo_visibility = "public" + if config.excluded_ecosystems and len(config.excluded_ecosystems) > 0: + core.config.excluded_ecosystems = config.excluded_ecosystems + integration_type = config.integration_type + integration_org_slug = config.integration_org_slug or org_slug try: - all_scan_files = [] - for scan_path in scan_paths: - scan_files = core.find_files(scan_path) - all_scan_files.extend(scan_files) - has_supported_files = len(all_scan_files) > 0 - log.debug(f"Found {len(all_scan_files)} manifest files across {len(scan_paths)} scan paths") - except Exception as e: - log.debug(f"Error finding files in scan paths: {e}") - has_supported_files = False - - # Case 3: If no supported files or files are empty, force API mode (no PR comments) - # BUT: Don't force API mode if we're in force_diff_mode - log.debug(f"files_to_check={files_to_check}, has_supported_files={has_supported_files}, force_diff_mode={force_diff_mode}, config.enable_diff={config.enable_diff}") - if not has_supported_files and not force_diff_mode: - force_api_mode = True - log.debug("No supported manifest files found, forcing API mode") - log.debug(f"force_api_mode={force_api_mode}") - - # Determine scan behavior - should_skip_scan = False # Always perform scan, but behavior changes based on supported files - if not has_supported_files and not force_diff_mode: - # No supported files and not forcing diff - still scan but in API mode - should_skip_scan = False - log.debug("No supported files but will scan in API mode") - else: - log.debug("Found supported manifest files or forcing diff mode, proceeding with normal scan") - - org_slug = core.config.org_slug - if config.repo_is_public: - core.config.repo_visibility = "public" - if config.excluded_ecosystems and len(config.excluded_ecosystems) > 0: - core.config.excluded_ecosystems = config.excluded_ecosystems - integration_type = config.integration_type - integration_org_slug = config.integration_org_slug or org_slug - try: - pr_number = int(config.pr_number) - except (ValueError, TypeError): - pr_number = 0 - - # Determine if this should be treated as default branch - # Priority order: - # 1. If --default-branch flag is explicitly set to True, use that - # 2. If SCM detected it's the default branch, use that - # 3. If it's a git repo, use git_repo.is_default_branch - # 4. Otherwise, default to False - if config.default_branch: - is_default_branch = True - elif scm is not None and hasattr(scm.config, 'is_default_branch') and scm.config.is_default_branch: - is_default_branch = True - elif is_repo and git_repo.is_default_branch: - is_default_branch = True - else: - is_default_branch = False - - params = FullScanParams( - org_slug=org_slug, - integration_type=integration_type, - integration_org_slug=integration_org_slug, - repo=config.repo, - branch=config.branch, - commit_message=config.commit_message, - commit_hash=config.commit_sha, - pull_request=pr_number, - committers=config.committers, - make_default_branch=is_default_branch, - set_as_pending_head=is_default_branch, - tmp=False, - scan_type='socket_tier1' if config.reach else 'socket', - workspace=config.workspace or None, - ) - - params.include_license_details = not config.exclude_license_details - - # Initialize diff - diff = Diff() - diff.id = "NO_DIFF_RAN" - diff.diff_url = "" - diff.report_url = "" - - # Handle SCM-specific flows - log.debug(f"Flow decision: scm={scm is not None}, force_diff_mode={force_diff_mode}, force_api_mode={force_api_mode}, enable_diff={config.enable_diff}") - - def _is_unprocessed(c): - """Check if an ignore comment has not yet been marked with '+1' reaction. - For GitHub, reactions['+1'] is already in the comment response (no extra call). - For GitLab, has_thumbsup_reaction() makes a lazy API call per comment.""" - if getattr(c, "reactions", {}).get("+1"): - return False - if hasattr(scm, "has_thumbsup_reaction") and scm.has_thumbsup_reaction(c.id): - return False - return True - - if scm is not None and scm.check_event_type() == "comment": - # FIXME: This entire flow should be a separate command called "filter_ignored_alerts_in_comments" - # It's not related to scanning or diff generation - it just: - # 1. Triggers on comments in GitHub/GitLab - # 2. If comment was from Socket, checks for ignore reactions - # 3. Updates the comment to remove ignored alerts - # This is completely separate from the main scanning functionality - log.info("Comment initiated flow") - - if not config.disable_ignore: - comments = scm.get_comments_for_pr() - - # Emit telemetry for ignore comments before +1 reaction is added. - # The +1 reaction (added by remove_comment_alerts) serves as the "processed" marker. - if "ignore" in comments: - unprocessed = [c for c in comments["ignore"] if _is_unprocessed(c)] - if unprocessed: - try: - events = [] - for c in unprocessed: - single = {"ignore": [c]} - ignore_all, ignore_commands = Comments.get_ignore_options(single) - user = getattr(c, "user", None) or getattr(c, "author", None) or {} - now = datetime.now(timezone.utc).isoformat() - shared_fields = { - "event_kind": "user-action", - "client_action": "ignore", - "alert_action": "error", - "event_sender_created_at": now, - "vcs_provider": integration_type, - "owner": config.repo.split("/")[0] if "/" in config.repo else "", - "repo": config.repo, - "pr_number": pr_number, - "ignore_all": ignore_all, - "sender_name": user.get("login") or user.get("username", ""), - "sender_id": str(user.get("id", "")), - } - if ignore_commands: - for name, version in ignore_commands: - events.append({**shared_fields, "event_id": str(uuid4()), "artifact_input": f"{name}@{version}"}) - elif ignore_all: - events.append({**shared_fields, "event_id": str(uuid4())}) - - if events: - log.debug(f"Ignore telemetry: {len(events)} events to send") - client.post_telemetry_events(org_slug, events) - except Exception as e: - log.warning(f"Failed to send ignore telemetry: {e}") - - log.debug("Removing comment alerts") - scm.remove_comment_alerts(comments) + pr_number = int(config.pr_number) + except (ValueError, TypeError): + pr_number = 0 + + # Determine if this should be treated as default branch + # Priority order: + # 1. If --default-branch flag is explicitly set to True, use that + # 2. If SCM detected it's the default branch, use that + # 3. If it's a git repo, use git_repo.is_default_branch + # 4. Otherwise, default to False + if config.default_branch: + is_default_branch = True + elif scm is not None and hasattr(scm.config, 'is_default_branch') and scm.config.is_default_branch: + is_default_branch = True + elif is_repo and git_repo.is_default_branch: + is_default_branch = True else: - log.info("Ignore commands disabled (--disable-ignore), skipping comment processing") - - elif scm is not None and scm.check_event_type() != "comment" and not force_api_mode: - log.info("Push initiated flow") - if scm.check_event_type() == "diff": - log.info("Starting comment logic for PR/MR event") - diff = core.create_new_diff(scan_paths, params, no_change=should_skip_scan, save_files_list_path=config.save_submitted_files_list, save_manifest_tar_path=config.save_manifest_tar, base_paths=base_paths, explicit_files=sbom_files_to_submit) - comments = scm.get_comments_for_pr() + is_default_branch = False + + params = FullScanParams( + org_slug=org_slug, + integration_type=integration_type, + integration_org_slug=integration_org_slug, + repo=config.repo, + branch=config.branch, + commit_message=config.commit_message, + commit_hash=config.commit_sha, + pull_request=pr_number, + committers=config.committers, + make_default_branch=is_default_branch, + set_as_pending_head=is_default_branch, + tmp=False, + scan_type='socket_tier1' if config.reach else 'socket', + workspace=config.workspace or None, + ) + + params.include_license_details = not config.exclude_license_details + + # Initialize diff + diff = Diff() + diff.id = "NO_DIFF_RAN" + diff.diff_url = "" + diff.report_url = "" + + # Handle SCM-specific flows + log.debug(f"Flow decision: scm={scm is not None}, force_diff_mode={force_diff_mode}, force_api_mode={force_api_mode}, enable_diff={config.enable_diff}") + + def _is_unprocessed(c): + """Check if an ignore comment has not yet been marked with '+1' reaction. + For GitHub, reactions['+1'] is already in the comment response (no extra call). + For GitLab, has_thumbsup_reaction() makes a lazy API call per comment.""" + if getattr(c, "reactions", {}).get("+1"): + return False + if hasattr(scm, "has_thumbsup_reaction") and scm.has_thumbsup_reaction(c.id): + return False + return True + + if scm is not None and scm.check_event_type() == "comment": + # FIXME: This entire flow should be a separate command called "filter_ignored_alerts_in_comments" + # It's not related to scanning or diff generation - it just: + # 1. Triggers on comments in GitHub/GitLab + # 2. If comment was from Socket, checks for ignore reactions + # 3. Updates the comment to remove ignored alerts + # This is completely separate from the main scanning functionality + log.info("Comment initiated flow") - # FIXME: this overwrites diff.new_alerts, which was previously populated by Core.create_issue_alerts if not config.disable_ignore: + comments = scm.get_comments_for_pr() + + # Emit telemetry for ignore comments before +1 reaction is added. + # The +1 reaction (added by remove_comment_alerts) serves as the "processed" marker. + if "ignore" in comments: + unprocessed = [c for c in comments["ignore"] if _is_unprocessed(c)] + if unprocessed: + try: + events = [] + for c in unprocessed: + single = {"ignore": [c]} + ignore_all, ignore_commands = Comments.get_ignore_options(single) + user = getattr(c, "user", None) or getattr(c, "author", None) or {} + now = datetime.now(timezone.utc).isoformat() + shared_fields = { + "event_kind": "user-action", + "client_action": "ignore", + "alert_action": "error", + "event_sender_created_at": now, + "vcs_provider": integration_type, + "owner": config.repo.split("/")[0] if "/" in config.repo else "", + "repo": config.repo, + "pr_number": pr_number, + "ignore_all": ignore_all, + "sender_name": user.get("login") or user.get("username", ""), + "sender_id": str(user.get("id", "")), + } + if ignore_commands: + for name, version in ignore_commands: + events.append({**shared_fields, "event_id": str(uuid4()), "artifact_input": f"{name}@{version}"}) + elif ignore_all: + events.append({**shared_fields, "event_id": str(uuid4())}) + + if events: + log.debug(f"Ignore telemetry: {len(events)} events to send") + client.post_telemetry_events(org_slug, events) + except Exception as e: + log.warning(f"Failed to send ignore telemetry: {e}") + log.debug("Removing comment alerts") - alerts_before = list(diff.new_alerts) - diff.new_alerts = Comments.remove_alerts(comments, diff.new_alerts) - - ignored_alerts = [a for a in alerts_before if a not in diff.new_alerts] - # Emit telemetry per-comment so each event carries the comment author. - unprocessed_ignore = [ - c for c in comments.get("ignore", []) - if _is_unprocessed(c) - ] - if ignored_alerts and unprocessed_ignore: - try: - events = [] - now = datetime.now(timezone.utc).isoformat() - for c in unprocessed_ignore: - single = {"ignore": [c]} - c_ignore_all, c_ignore_commands = Comments.get_ignore_options(single) - user = getattr(c, "user", None) or getattr(c, "author", None) or {} - sender_name = user.get("login") or user.get("username", "") - sender_id = str(user.get("id", "")) - - # Match this comment's targets to the actual ignored alerts - matched_alerts = [] - if c_ignore_all: - matched_alerts = ignored_alerts - else: - for alert in ignored_alerts: - full_name = f"{alert.pkg_type}/{alert.pkg_name}" - purl = (full_name, alert.pkg_version) - purl_star = (full_name, "*") - if purl in c_ignore_commands or purl_star in c_ignore_commands: - matched_alerts.append(alert) - - shared_fields = { - "event_kind": "user-action", - "client_action": "ignore", - "event_sender_created_at": now, - "vcs_provider": integration_type, - "owner": config.repo.split("/")[0] if "/" in config.repo else "", - "repo": config.repo, - "pr_number": pr_number, - "ignore_all": c_ignore_all, - "sender_name": sender_name, - "sender_id": sender_id, - } - if matched_alerts: - for alert in matched_alerts: - # Derive alert_action from the alert's resolved action flags - if getattr(alert, "error", False): - alert_action = "error" - elif getattr(alert, "warn", False): - alert_action = "warn" - elif getattr(alert, "monitor", False): - alert_action = "monitor" - else: - alert_action = "error" - events.append({**shared_fields, "alert_action": alert_action, "event_id": str(uuid4()), "artifact_purl": alert.purl}) - elif c_ignore_all: - events.append({**shared_fields, "event_id": str(uuid4())}) - - if events: - client.post_telemetry_events(org_slug, events) - - # Mark ignore comments as processed with +1 reaction - if hasattr(scm, "handle_ignore_reactions"): - scm.handle_ignore_reactions(comments) - except Exception as e: - log.warning(f"Failed to send ignore telemetry: {e}") + scm.remove_comment_alerts(comments) else: - log.info("Ignore commands disabled (--disable-ignore), all alerts will be reported") - - log.debug("Creating Dependency Overview Comment") - - overview_comment = Messages.dependency_overview_template(diff) - log.debug("Creating Security Issues Comment") - - security_comment = Messages.security_comment_template(diff, config) - - new_security_comment = True - new_overview_comment = True - - update_old_security_comment = ( - security_comment is None or - security_comment == "" or - (len(comments) != 0 and comments.get("security") is not None) - ) - - update_old_overview_comment = ( - overview_comment is None or - overview_comment == "" or - (len(comments) != 0 and comments.get("overview") is not None) - ) - - if len(diff.new_alerts) == 0 or config.disable_security_issue: - if not update_old_security_comment: - new_security_comment = False - log.debug("No new alerts or security issue comment disabled") - else: - log.debug("Updated security comment with no new alerts") - - # FIXME: diff.new_packages is never populated, neither is removed_packages - if (len(diff.new_packages) == 0) or config.disable_overview: - if not update_old_overview_comment: - new_overview_comment = False - log.debug("No new/removed packages or Dependency Overview comment disabled") + log.info("Ignore commands disabled (--disable-ignore), skipping comment processing") + + elif scm is not None and scm.check_event_type() != "comment" and not force_api_mode: + log.info("Push initiated flow") + if scm.check_event_type() == "diff": + log.info("Starting comment logic for PR/MR event") + diff = core.create_new_diff(scan_paths, params, no_change=should_skip_scan, save_files_list_path=config.save_submitted_files_list, save_manifest_tar_path=config.save_manifest_tar, base_paths=base_paths, explicit_files=sbom_files_to_submit) + comments = scm.get_comments_for_pr() + + # FIXME: this overwrites diff.new_alerts, which was previously populated by Core.create_issue_alerts + if not config.disable_ignore: + log.debug("Removing comment alerts") + alerts_before = list(diff.new_alerts) + diff.new_alerts = Comments.remove_alerts(comments, diff.new_alerts) + + ignored_alerts = [a for a in alerts_before if a not in diff.new_alerts] + # Emit telemetry per-comment so each event carries the comment author. + unprocessed_ignore = [ + c for c in comments.get("ignore", []) + if _is_unprocessed(c) + ] + if ignored_alerts and unprocessed_ignore: + try: + events = [] + now = datetime.now(timezone.utc).isoformat() + for c in unprocessed_ignore: + single = {"ignore": [c]} + c_ignore_all, c_ignore_commands = Comments.get_ignore_options(single) + user = getattr(c, "user", None) or getattr(c, "author", None) or {} + sender_name = user.get("login") or user.get("username", "") + sender_id = str(user.get("id", "")) + + # Match this comment's targets to the actual ignored alerts + matched_alerts = [] + if c_ignore_all: + matched_alerts = ignored_alerts + else: + for alert in ignored_alerts: + full_name = f"{alert.pkg_type}/{alert.pkg_name}" + purl = (full_name, alert.pkg_version) + purl_star = (full_name, "*") + if purl in c_ignore_commands or purl_star in c_ignore_commands: + matched_alerts.append(alert) + + shared_fields = { + "event_kind": "user-action", + "client_action": "ignore", + "event_sender_created_at": now, + "vcs_provider": integration_type, + "owner": config.repo.split("/")[0] if "/" in config.repo else "", + "repo": config.repo, + "pr_number": pr_number, + "ignore_all": c_ignore_all, + "sender_name": sender_name, + "sender_id": sender_id, + } + if matched_alerts: + for alert in matched_alerts: + # Derive alert_action from the alert's resolved action flags + if getattr(alert, "error", False): + alert_action = "error" + elif getattr(alert, "warn", False): + alert_action = "warn" + elif getattr(alert, "monitor", False): + alert_action = "monitor" + else: + alert_action = "error" + events.append({**shared_fields, "alert_action": alert_action, "event_id": str(uuid4()), "artifact_purl": alert.purl}) + elif c_ignore_all: + events.append({**shared_fields, "event_id": str(uuid4())}) + + if events: + client.post_telemetry_events(org_slug, events) + + # Mark ignore comments as processed with +1 reaction + if hasattr(scm, "handle_ignore_reactions"): + scm.handle_ignore_reactions(comments) + except Exception as e: + log.warning(f"Failed to send ignore telemetry: {e}") else: - log.debug("Updated overview comment with no dependencies") - - log.debug(f"Adding comments for {config.scm}") - scm.add_socket_comments( - security_comment, - overview_comment, - comments, - new_security_comment, - new_overview_comment - ) - else: - log.info("Starting non-PR/MR flow") - diff = core.create_new_diff(scan_paths, params, no_change=should_skip_scan, save_files_list_path=config.save_submitted_files_list, save_manifest_tar_path=config.save_manifest_tar, base_paths=base_paths, explicit_files=sbom_files_to_submit) + log.info("Ignore commands disabled (--disable-ignore), all alerts will be reported") - output_handler.handle_output(diff) + log.debug("Creating Dependency Overview Comment") + + overview_comment = Messages.dependency_overview_template(diff) + log.debug("Creating Security Issues Comment") + + security_comment = Messages.security_comment_template(diff, config) + + new_security_comment = True + new_overview_comment = True + + update_old_security_comment = ( + security_comment is None or + security_comment == "" or + (len(comments) != 0 and comments.get("security") is not None) + ) + + update_old_overview_comment = ( + overview_comment is None or + overview_comment == "" or + (len(comments) != 0 and comments.get("overview") is not None) + ) + + if len(diff.new_alerts) == 0 or config.disable_security_issue: + if not update_old_security_comment: + new_security_comment = False + log.debug("No new alerts or security issue comment disabled") + else: + log.debug("Updated security comment with no new alerts") + + # FIXME: diff.new_packages is never populated, neither is removed_packages + if (len(diff.new_packages) == 0) or config.disable_overview: + if not update_old_overview_comment: + new_overview_comment = False + log.debug("No new/removed packages or Dependency Overview comment disabled") + else: + log.debug("Updated overview comment with no dependencies") + + log.debug(f"Adding comments for {config.scm}") + scm.add_socket_comments( + security_comment, + overview_comment, + comments, + new_security_comment, + new_overview_comment + ) + else: + log.info("Starting non-PR/MR flow") + diff = core.create_new_diff(scan_paths, params, no_change=should_skip_scan, save_files_list_path=config.save_submitted_files_list, save_manifest_tar_path=config.save_manifest_tar, base_paths=base_paths, explicit_files=sbom_files_to_submit) - elif (config.enable_diff or force_diff_mode) and not force_api_mode: - # New logic: --enable-diff or force_diff_mode (from --ignore-commit-files in git repos) forces diff mode - log.info("Diff mode enabled without SCM integration") - diff = core.create_new_diff(scan_paths, params, no_change=should_skip_scan, save_files_list_path=config.save_submitted_files_list, save_manifest_tar_path=config.save_manifest_tar, base_paths=base_paths, explicit_files=sbom_files_to_submit) - output_handler.handle_output(diff) - - elif (config.enable_diff or force_diff_mode) and force_api_mode: - # User requested diff mode but no manifest files were detected - this should not happen with new logic - # but keeping as a safety net - log.warning("--enable-diff was specified but no supported manifest files were detected in the changed files. Falling back to full scan mode.") - log.info("Creating Socket Report (full scan)") - serializable_params = { - key: value if isinstance(value, (int, float, str, list, dict, bool, type(None))) else str(value) - for key, value in params.__dict__.items() - } - log.debug(f"params={serializable_params}") - diff = core.create_full_scan_with_report_url( - scan_paths, - params, - no_change=should_skip_scan, - save_files_list_path=config.save_submitted_files_list, - save_manifest_tar_path=config.save_manifest_tar, - base_paths=base_paths, - explicit_files=sbom_files_to_submit - ) - log.info(f"Full scan created with ID: {diff.id}") - log.info(f"Full scan report URL: {diff.report_url}") - output_handler.handle_output(diff) + output_handler.handle_output(diff) - else: - if force_api_mode: - log.info("No Manifest files changed, creating Socket Report") + elif (config.enable_diff or force_diff_mode) and not force_api_mode: + # New logic: --enable-diff or force_diff_mode (from --ignore-commit-files in git repos) forces diff mode + log.info("Diff mode enabled without SCM integration") + diff = core.create_new_diff(scan_paths, params, no_change=should_skip_scan, save_files_list_path=config.save_submitted_files_list, save_manifest_tar_path=config.save_manifest_tar, base_paths=base_paths, explicit_files=sbom_files_to_submit) + output_handler.handle_output(diff) + + elif (config.enable_diff or force_diff_mode) and force_api_mode: + # User requested diff mode but no manifest files were detected - this should not happen with new logic + # but keeping as a safety net + log.warning("--enable-diff was specified but no supported manifest files were detected in the changed files. Falling back to full scan mode.") + log.info("Creating Socket Report (full scan)") serializable_params = { key: value if isinstance(value, (int, float, str, list, dict, bool, type(None))) else str(value) for key, value in params.__dict__.items() @@ -859,64 +830,85 @@ def _is_unprocessed(c): log.info(f"Full scan created with ID: {diff.id}") log.info(f"Full scan report URL: {diff.report_url}") output_handler.handle_output(diff) + else: - log.info("API Mode") - diff = core.create_new_diff( - scan_paths, params, - no_change=should_skip_scan, - save_files_list_path=config.save_submitted_files_list, - save_manifest_tar_path=config.save_manifest_tar, - base_paths=base_paths, - explicit_files=sbom_files_to_submit - ) - output_handler.handle_output(diff) + if force_api_mode: + log.info("No Manifest files changed, creating Socket Report") + serializable_params = { + key: value if isinstance(value, (int, float, str, list, dict, bool, type(None))) else str(value) + for key, value in params.__dict__.items() + } + log.debug(f"params={serializable_params}") + diff = core.create_full_scan_with_report_url( + scan_paths, + params, + no_change=should_skip_scan, + save_files_list_path=config.save_submitted_files_list, + save_manifest_tar_path=config.save_manifest_tar, + base_paths=base_paths, + explicit_files=sbom_files_to_submit + ) + log.info(f"Full scan created with ID: {diff.id}") + log.info(f"Full scan report URL: {diff.report_url}") + output_handler.handle_output(diff) + else: + log.info("API Mode") + diff = core.create_new_diff( + scan_paths, params, + no_change=should_skip_scan, + save_files_list_path=config.save_submitted_files_list, + save_manifest_tar_path=config.save_manifest_tar, + base_paths=base_paths, + explicit_files=sbom_files_to_submit + ) + output_handler.handle_output(diff) - if diff.id not in ("NO_DIFF_RAN", "NO_SCAN_RAN"): - set_report_run_id(diff.id) + if diff.id not in ("NO_DIFF_RAN", "NO_SCAN_RAN"): + streaming.set_report_run_id(diff.id) - # Handle license generation - if not should_skip_scan and diff.id != "NO_DIFF_RAN" and diff.id != "NO_SCAN_RAN" and config.generate_license: - all_packages = build_license_artifact_payload( - diff, - legal_format=getattr(config, "legal_format", "socket"), - config=config, - ) - _write_attribution_file(config, all_packages) + # Handle license generation + if not should_skip_scan and diff.id != "NO_DIFF_RAN" and diff.id != "NO_SCAN_RAN" and config.generate_license: + all_packages = build_license_artifact_payload( + diff, + legal_format=getattr(config, "legal_format", "socket"), + config=config, + ) + _write_attribution_file(config, all_packages) - # If we forced API mode due to no supported files, behave as if --disable-blocking was set - if force_api_mode: - if config.strict_blocking: - log.warning("--strict-blocking is only supported in diff mode. " - "API mode (no diff) cannot evaluate existing violations.") - if not config.disable_blocking: - log.debug("Temporarily enabling disable_blocking due to no supported manifest files") - config.disable_blocking = True - - # Post commit status to GitLab if enabled - if config.enable_commit_status and scm is not None: - from socketsecurity.core.scm.gitlab import Gitlab - if isinstance(scm, Gitlab) and scm.config.mr_project_id: - scm.enable_merge_pipeline_check() - passed = output_handler.report_pass(diff) - state = "success" if passed else "failed" - new_blocking = sum(1 for a in diff.new_alerts if a.error) - unchanged_blocking = 0 - if config.strict_blocking and hasattr(diff, 'unchanged_alerts'): - unchanged_blocking = sum(1 for a in diff.unchanged_alerts if a.error) - blocking_count = new_blocking + unchanged_blocking - if passed: - description = "No blocking issues" - else: - parts = [] - if new_blocking: - parts.append(f"{new_blocking} new") - if unchanged_blocking: - parts.append(f"{unchanged_blocking} existing") - description = f"{blocking_count} blocking alert(s) found ({', '.join(parts)})" - target_url = diff.report_url or diff.diff_url or "" - scm.set_commit_status(state, description, target_url) - - sys.exit(output_handler.return_exit_code(diff)) + # If we forced API mode due to no supported files, behave as if --disable-blocking was set + if force_api_mode: + if config.strict_blocking: + log.warning("--strict-blocking is only supported in diff mode. " + "API mode (no diff) cannot evaluate existing violations.") + if not config.disable_blocking: + log.debug("Temporarily enabling disable_blocking due to no supported manifest files") + config.disable_blocking = True + + # Post commit status to GitLab if enabled + if config.enable_commit_status and scm is not None: + from socketsecurity.core.scm.gitlab import Gitlab + if isinstance(scm, Gitlab) and scm.config.mr_project_id: + scm.enable_merge_pipeline_check() + passed = output_handler.report_pass(diff) + state = "success" if passed else "failed" + new_blocking = sum(1 for a in diff.new_alerts if a.error) + unchanged_blocking = 0 + if config.strict_blocking and hasattr(diff, 'unchanged_alerts'): + unchanged_blocking = sum(1 for a in diff.unchanged_alerts if a.error) + blocking_count = new_blocking + unchanged_blocking + if passed: + description = "No blocking issues" + else: + parts = [] + if new_blocking: + parts.append(f"{new_blocking} new") + if unchanged_blocking: + parts.append(f"{unchanged_blocking} existing") + description = f"{blocking_count} blocking alert(s) found ({', '.join(parts)})" + target_url = diff.report_url or diff.diff_url or "" + scm.set_commit_status(state, description, target_url) + + sys.exit(output_handler.return_exit_code(diff)) if __name__ == '__main__': diff --git a/tests/unit/test_log_uploader.py b/tests/unit/test_log_uploader.py index 5555454..629d092 100644 --- a/tests/unit/test_log_uploader.py +++ b/tests/unit/test_log_uploader.py @@ -32,7 +32,7 @@ def test_flush_posts_batch_and_clears_buffer(): client = Mock(spec=CliClient) u = BatchedLogUploader(client, "run-y", flush_interval=10) u.add({"timestamp": "t", "level": "INFO", "message": "a", "context": "c"}) - u.add({"timestamp": "t", "level": "WARN", "message": "b", "context": "c"}) + u.add({"timestamp": "t", "level": "WARNING", "message": "b", "context": "c"}) u._flush() @@ -87,7 +87,7 @@ def test_handler_emit_enqueues_record(caplog): assert len(u._buf) == 1 e = u._buf[0] - assert e["level"] == "WARN" + assert e["level"] == "WARNING" assert e["message"] == "watch out" assert e["context"] == "socket-python-cli" @@ -121,13 +121,7 @@ def test_levels_map_correctly(): u = BatchedLogUploader(client, "run-l", flush_interval=10) h = UploadingLogHandler(u) - for py_level, expected in [ - (logging.DEBUG, "DEBUG"), - (logging.INFO, "INFO"), - (logging.WARNING, "WARN"), - (logging.ERROR, "ERROR"), - (logging.CRITICAL, "ERROR"), - ]: + for py_level in (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL): rec = logging.LogRecord( name="t", level=py_level, pathname=__file__, lineno=1, msg="m", args=(), exc_info=None, @@ -135,7 +129,7 @@ def test_levels_map_correctly(): h.emit(rec) levels = [e["level"] for e in u._buf] - assert levels == ["DEBUG", "INFO", "WARN", "ERROR", "ERROR"] + assert levels == ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] def test_run_thread_flushes_periodically_then_exits(): diff --git a/tests/unit/test_streaming.py b/tests/unit/test_streaming.py index 20b8c65..5780698 100644 --- a/tests/unit/test_streaming.py +++ b/tests/unit/test_streaming.py @@ -1,111 +1,109 @@ import logging from unittest.mock import patch -import pytest - import socketsecurity.core.streaming as streaming_mod -from socketsecurity.core.streaming import ( - set_report_run_id, - set_run_status, - setup_streaming, -) - - -@pytest.fixture(autouse=True) -def reset_streaming_state(): - streaming_mod._run_status = "success" - streaming_mod._report_run_id = None - yield - streaming_mod._run_status = "success" - streaming_mod._report_run_id = None +from socketsecurity.core.streaming import StreamingLogs, setup_streaming -def test_setup_streaming_returns_none_when_register_fails(): - with patch("socketsecurity.core.streaming.register_cli_run", return_value=None): - teardown = setup_streaming( - client=object(), - cli_logger=logging.getLogger("t-fail"), - sdk_logger=logging.getLogger("t-fail-sdk"), - client_version="1.0", - share_logs=True, - decline_logs=False, - enable_debug=False, - ) - assert teardown is None - +def _make(**overrides): + kwargs = dict( + client=object(), + cli_logger=logging.getLogger(overrides.pop("cli_name", "t-cli")), + sdk_logger=logging.getLogger(overrides.pop("sdk_name", "t-sdk")), + client_version="1.0", + share_logs=True, + decline_logs=False, + enable_debug=False, + ) + kwargs.update(overrides) + return setup_streaming(**kwargs) -def test_teardown_finalizes_with_current_run_status(): - cli_logger = logging.getLogger("t-finalize-cli") - sdk_logger = logging.getLogger("t-finalize-sdk") +def test_setup_streaming_is_noop_when_register_fails(): finalize_calls = [] + with patch("socketsecurity.core.streaming.register_cli_run", return_value=None), \ + patch("socketsecurity.core.streaming.finalize_cli_run", side_effect=lambda *a, **k: finalize_calls.append(k)): + with _make(cli_name="t-fail-cli", sdk_name="t-fail-sdk") as streaming: + assert isinstance(streaming, StreamingLogs) + # No run was registered → finalize must not be called. + assert finalize_calls == [] - def fake_finalize(client, run_id, status="success", report_run_id=None): - finalize_calls.append((status, report_run_id)) - with patch("socketsecurity.core.streaming.register_cli_run", return_value="run-1"), \ - patch("socketsecurity.core.streaming.finalize_cli_run", side_effect=fake_finalize), \ +def test_clean_exit_reports_success(): + finalize_calls = [] + with patch("socketsecurity.core.streaming.register_cli_run", return_value="run-ok"), \ + patch("socketsecurity.core.streaming.finalize_cli_run", side_effect=lambda c, r, status, report_run_id: finalize_calls.append((status, report_run_id))), \ patch.object(streaming_mod.BatchedLogUploader, "start"), \ patch.object(streaming_mod.BatchedLogUploader, "stop"): - teardown = setup_streaming( - client=object(), - cli_logger=cli_logger, - sdk_logger=sdk_logger, - client_version="1.0", - share_logs=True, - decline_logs=False, - enable_debug=False, - ) - assert teardown is not None - - set_run_status("failure") - set_report_run_id("fs-xyz") - teardown() - - assert finalize_calls == [("failure", "fs-xyz")] - + with _make(cli_name="t-ok-cli", sdk_name="t-ok-sdk"): + pass + assert finalize_calls == [("success", None)] -def test_set_run_status_default_is_success(): - cli_logger = logging.getLogger("t-default-cli") - sdk_logger = logging.getLogger("t-default-sdk") +def test_exception_reports_failure_and_propagates(): finalize_calls = [] - - def fake_finalize(client, run_id, status="success", report_run_id=None): - finalize_calls.append((status, report_run_id)) - - with patch("socketsecurity.core.streaming.register_cli_run", return_value="run-2"), \ - patch("socketsecurity.core.streaming.finalize_cli_run", side_effect=fake_finalize), \ + with patch("socketsecurity.core.streaming.register_cli_run", return_value="run-x"), \ + patch("socketsecurity.core.streaming.finalize_cli_run", side_effect=lambda c, r, status, report_run_id: finalize_calls.append((status, report_run_id))), \ patch.object(streaming_mod.BatchedLogUploader, "start"), \ patch.object(streaming_mod.BatchedLogUploader, "stop"): - teardown = setup_streaming( - client=object(), - cli_logger=cli_logger, - sdk_logger=sdk_logger, - client_version="1.0", - share_logs=True, - decline_logs=False, - enable_debug=False, - ) - teardown() - - assert finalize_calls == [("success", None)] - - -def test_setup_streaming_restores_logger_state_on_teardown(): + raised = False + try: + with _make(cli_name="t-exc-cli", sdk_name="t-exc-sdk") as streaming: + streaming.set_report_run_id("fs-1") + raise RuntimeError("boom") + except RuntimeError: + raised = True + assert raised # exception not swallowed + assert finalize_calls == [("failure", "fs-1")] + + +def test_keyboard_interrupt_reports_cancelled(): + finalize_calls = [] + with patch("socketsecurity.core.streaming.register_cli_run", return_value="run-ki"), \ + patch("socketsecurity.core.streaming.finalize_cli_run", side_effect=lambda c, r, status, report_run_id: finalize_calls.append((status, report_run_id))), \ + patch.object(streaming_mod.BatchedLogUploader, "start"), \ + patch.object(streaming_mod.BatchedLogUploader, "stop"): + try: + with _make(cli_name="t-ki-cli", sdk_name="t-ki-sdk"): + raise KeyboardInterrupt + except KeyboardInterrupt: + pass + assert finalize_calls == [("cancelled", None)] + + +def test_system_exit_zero_is_success_nonzero_is_failure(): + statuses = [] + with patch("socketsecurity.core.streaming.register_cli_run", return_value="run-sx"), \ + patch("socketsecurity.core.streaming.finalize_cli_run", side_effect=lambda c, r, status, report_run_id: statuses.append(status)), \ + patch.object(streaming_mod.BatchedLogUploader, "start"), \ + patch.object(streaming_mod.BatchedLogUploader, "stop"): + try: + with _make(cli_name="t-sx0-cli", sdk_name="t-sx0-sdk"): + raise SystemExit(0) + except SystemExit: + pass + try: + with _make(cli_name="t-sx1-cli", sdk_name="t-sx1-sdk"): + raise SystemExit(1) + except SystemExit: + pass + assert statuses == ["success", "failure"] + + +def test_restores_logger_state_on_exit(): cli_logger = logging.getLogger("t-restore-cli") sdk_logger = logging.getLogger("t-restore-sdk") cli_logger.setLevel(logging.WARNING) sdk_logger.setLevel(logging.ERROR) cli_logger.propagate = True sdk_logger.propagate = True - handler_count_before = (len(cli_logger.handlers), len(sdk_logger.handlers)) + handlers_before = (len(cli_logger.handlers), len(sdk_logger.handlers)) - with patch("socketsecurity.core.streaming.register_cli_run", return_value="run-3"), \ + with patch("socketsecurity.core.streaming.register_cli_run", return_value="run-r"), \ patch("socketsecurity.core.streaming.finalize_cli_run"), \ patch.object(streaming_mod.BatchedLogUploader, "start"), \ patch.object(streaming_mod.BatchedLogUploader, "stop"): - teardown = setup_streaming( + with setup_streaming( client=object(), cli_logger=cli_logger, sdk_logger=sdk_logger, @@ -113,16 +111,15 @@ def test_setup_streaming_restores_logger_state_on_teardown(): share_logs=True, decline_logs=False, enable_debug=False, - ) - # During streaming: levels and propagate are forced - assert cli_logger.level == logging.DEBUG - assert sdk_logger.level == logging.DEBUG - assert cli_logger.propagate is False - assert sdk_logger.propagate is False - teardown() + ): + # Inside the with block: levels and propagate are forced. + assert cli_logger.level == logging.DEBUG + assert sdk_logger.level == logging.DEBUG + assert cli_logger.propagate is False + assert sdk_logger.propagate is False assert cli_logger.level == logging.WARNING assert sdk_logger.level == logging.ERROR assert cli_logger.propagate is True assert sdk_logger.propagate is True - assert (len(cli_logger.handlers), len(sdk_logger.handlers)) == handler_count_before + assert (len(cli_logger.handlers), len(sdk_logger.handlers)) == handlers_before From d6300137550e648a0b46211071c913ada5c70345 Mon Sep 17 00:00:00 2001 From: barslev Date: Thu, 11 Jun 2026 15:38:04 +0200 Subject: [PATCH 12/12] test(log_uploader): cover cross-thread emit during active flush Adds a deterministic regression test that parks the uploader thread inside _flush() via a threading.Event, emits a real log record from the main thread while _FLUSH_GUARD.active is set on the uploader thread, and asserts the record lands in the next batch (not dropped). Documents that the thread-local guard only blocks recursive emits on the uploader thread itself. --- tests/unit/test_log_uploader.py | 50 +++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/tests/unit/test_log_uploader.py b/tests/unit/test_log_uploader.py index 629d092..81c0311 100644 --- a/tests/unit/test_log_uploader.py +++ b/tests/unit/test_log_uploader.py @@ -1,5 +1,6 @@ import json import logging +import threading import time from unittest.mock import Mock @@ -132,6 +133,55 @@ def test_levels_map_correctly(): assert levels == ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] +def test_emit_during_active_flush_on_another_thread_is_not_dropped(): + # Race scenario: the uploader thread is mid-flush (request in flight, _FLUSH_GUARD + # active on the uploader thread) while a real log record is emitted from a different + # thread. The thread-local guard must NOT block the other thread's emit; the record + # must land in the new buffer and ship on the next flush. + client = Mock(spec=CliClient) + u = BatchedLogUploader(client, "run-race", flush_interval=10) + h = UploadingLogHandler(u) + + flush_in_flight = threading.Event() + release_flush = threading.Event() + request_bodies: list = [] + + def blocking_request(**kwargs): + request_bodies.append(json.loads(kwargs["payload"])) + flush_in_flight.set() + release_flush.wait(timeout=2.0) + return Mock() + + client.request.side_effect = blocking_request + + # Seed the first batch and start a flush on a worker thread so the main thread can + # observe + interleave with it. + u.add({"timestamp": "t", "level": "INFO", "message": "first", "context": "c"}) + flusher = threading.Thread(target=u._flush, daemon=True) + flusher.start() + assert flush_in_flight.wait(timeout=2.0), "uploader never entered _flush" + + # While the uploader is parked inside _flush() with _FLUSH_GUARD.active set on its + # own thread, emit a real log from the main thread. The guard is thread-local, so + # this emit must proceed and land in the freshly-swapped buffer. + rec = logging.LogRecord( + name="socketcli", level=logging.INFO, pathname=__file__, + lineno=1, msg="emitted-during-flush", args=(), exc_info=None, + ) + h.emit(rec) + assert len(u._buf) == 1 + assert u._buf[0]["message"] == "emitted-during-flush" + + # Let the in-flight flush finish, then drain — the record must ship. + release_flush.set() + flusher.join(timeout=2.0) + u._flush() + + assert len(request_bodies) == 2 + assert request_bodies[0]["logs"][0]["message"] == "first" + assert request_bodies[1]["logs"][0]["message"] == "emitted-during-flush" + + def test_run_thread_flushes_periodically_then_exits(): client = Mock(spec=CliClient) u = BatchedLogUploader(client, "run-t", flush_interval=0.05)