From 169750a62e4cbc1965ebb04b02c8a1302453b421 Mon Sep 17 00:00:00 2001
From: Claude <noreply@anthropic.com>
Date: Wed, 20 May 2026 18:22:32 +0000
Subject: [PATCH] tools: add NWC protocol black-box e2e test suite

A small Python test rig under tools/nwc_protocol_test/ that exercises
the multiplayer wire protocol in src/simutrans/network/network_cmd*.cc
end-to-end.  Each test spawns a headless server (its own temp
userdir, free port), sends one hand-rolled packet over TCP, and
asserts on the parsed reply.

Two overlapping purposes:

  * Pin the on-wire layout (NWC_AUTH_PLAYER request/reply byte-exact,
    NWC_TOOL / NWC_SERVICE request shape) so a refactor that drops
    or reorders a field breaks CI instead of silently shifting bits.

  * Trip on regressions in the network security class.  Three
    representative cases (four tests):

      - test_auth_player.test_null_slot
          NWC_AUTH_PLAYER targeting an unfilled player slot (the
          starter map leaves 2..14 empty) must return the
          wrong-password reply shape, not dereference a NULL slot
          inside welt->get_player(...)->access_password_hash().

      - test_tool.test_forged_client_id_does_not_crash
          Pre-auth NWC_TOOL with our_client_id=0xFFFFFFFF must be
          absorbed; the server must not feed wire bytes straight
          into socket_list_t::get_client and trip vector_tpl
          bounds.

      - test_service.test_get_client_list_huge_count_does_not_crash
        test_service.test_get_black_list_huge_count_does_not_crash
          Pre-auth NWC_SERVICE with a forged count=0xFFFFFFFF on
          SRVC_GET_CLIENT_LIST / SRVC_GET_BLACK_LIST must not drive
          socket_list_t::rdwr / address_list_t::rdwr through a
          billions-of-allocations loop before the admin-auth gate
          rejects.  (This one already passes on master after the
          admin-tool/server gate landed.)

Layout, one file per NWC_* command:

  wire.py              Server context manager, 6-byte
                       (size, version, id) framing helpers,
                       send/recv primitives, assert_heartbeat for
                       the "silently absorbs and keeps ticking"
                       assertion.
  test_auth_player.py  NWC_AUTH_PLAYER.
  test_tool.py         NWC_TOOL.
  test_service.py      NWC_SERVICE.

Plain unittest, no third-party deps.  Runs via

  python3 -m unittest discover -v -s tools/nwc_protocol_test -t .

from the repo root and is wired into .github/workflows/run-tests.yml
under the same ASAN/UBSAN config as the scenario suite, so a
sanitizer hit on any forged packet fails the job loudly.  Requires
a built simutrans binary, a pak64-compatible pakset under
simutrans/pak/, and tests/empty-16x16.sve.

Some tests are expected to fail on current master and stand as
concrete repros for the underlying issues; fixes can land
separately.
---
 .github/workflows/run-tests.yml             |  11 +
 tools/nwc_protocol_test/__init__.py         |  16 ++
 tools/nwc_protocol_test/test_auth_player.py |  78 +++++
 tools/nwc_protocol_test/test_service.py     |  50 ++++
 tools/nwc_protocol_test/test_tool.py        |  67 +++++
 tools/nwc_protocol_test/wire.py             | 298 ++++++++++++++++++++
 6 files changed, 520 insertions(+)
 create mode 100644 tools/nwc_protocol_test/__init__.py
 create mode 100644 tools/nwc_protocol_test/test_auth_player.py
 create mode 100644 tools/nwc_protocol_test/test_service.py
 create mode 100644 tools/nwc_protocol_test/test_tool.py
 create mode 100644 tools/nwc_protocol_test/wire.py

diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml
index b448db2d59..9f6e8fdcee 100644
--- a/.github/workflows/run-tests.yml
+++ b/.github/workflows/run-tests.yml
@@ -58,3 +58,14 @@ jobs:
         chmod +x run-automated-tests.sh
         ulimit -St 600 # 10 minutes ought to be enough for anybody.
         ./run-automated-tests.sh
+    - name: Run NWC protocol e2e suite
+      # Black-box wire-protocol tests: spawn a headless server, send
+      # one hand-rolled packet, parse the reply.  Pins NWC_AUTH_PLAYER
+      # / NWC_TOOL / NWC_SERVICE request and reply shapes and acts as
+      # a regression tripwire for the network security class.  Same
+      # ASAN/UBSAN config as the scenario suite — a sanitizer hit
+      # fails the job loudly.
+      run: |
+        export ASAN_OPTIONS="print_stacktrace=1 abort_on_error=1 detect_leaks=0"
+        export UBSAN_OPTIONS="print_stacktrace=1 abort_on_error=1"
+        python3 -m unittest discover -v -s tools/nwc_protocol_test -t .
diff --git a/tools/nwc_protocol_test/__init__.py b/tools/nwc_protocol_test/__init__.py
new file mode 100644
index 0000000000..2164a64519
--- /dev/null
+++ b/tools/nwc_protocol_test/__init__.py
@@ -0,0 +1,16 @@
+# Black-box e2e tests for the multiplayer wire protocol in
+# src/simutrans/network/network_cmd*.cc.  Each test spawns a headless
+# server, sends one hand-rolled packet, and asserts on the parsed
+# reply.  Two purposes: pin reply shapes (NWC_AUTH_PLAYER, NWC_STEP
+# heartbeat) so a refactor that drops or reorders a field breaks CI,
+# and trip on regressions of the recent network security fixes, whose
+# bugs show up on the wire as "server consumes the packet then crashes".
+#
+# Run (from repo root):
+#   python3 -m unittest discover -s tools/nwc_protocol_test -t .
+#   python3 -m unittest discover -s tools/nwc_protocol_test -t . -k null
+#   python3 -m unittest tools.nwc_protocol_test.test_auth_player
+#
+# Requires a built simutrans binary, a pak64-compatible pakset under
+# simutrans/pak/, and tests/empty-16x16.sve (the starter map leaves
+# player slots 2..14 unfilled, which the NULL-slot test relies on).
diff --git a/tools/nwc_protocol_test/test_auth_player.py b/tools/nwc_protocol_test/test_auth_player.py
new file mode 100644
index 0000000000..9fa07adeb5
--- /dev/null
+++ b/tools/nwc_protocol_test/test_auth_player.py
@@ -0,0 +1,78 @@
+#!/usr/bin/env python3
+# NWC_AUTH_PLAYER wire pin + tripwire for the NULL-player-slot crash
+# in nwc_auth_player_t::execute (src/simutrans/network/network_cmd_ingame.cc).
+# Pre-fix, a peer-supplied player_nr in [2, 14] dereferenced a NULL
+# player slot in the default starter map.  The fix returns the same
+# wrong-password reply shape, so a byte-exact reply pin doubles as
+# the regression check.
+
+import struct
+import unittest
+
+from . import wire
+
+
+# nwc_auth_player_t wire layout (NETWORK_VERSION=1):
+#   header  : u16 size | u16 version | u16 id          (6 bytes)
+#   base    : u32 our_client_id                        (4 bytes)
+#   payload : u8[20] hash | u8 player_nr | u16 player_unlocked  (23 bytes)
+# Total: 33 bytes both ways.
+PACKET_SIZE = 33
+
+
+def build(*, player_nr: int,
+          our_client_id: int = 0,
+          hash20: bytes = b"\x00" * 20,
+          player_unlocked: int = 0) -> bytes:
+    """Serialise one NWC_AUTH_PLAYER packet (header + 27-byte body)."""
+    if len(hash20) != 20:
+        raise ValueError(f"hash20 must be 20 bytes, got {len(hash20)}")
+    # The two narrow fields are masked so callers may pass wider ints.
+    body = struct.pack("<I20sBH", our_client_id, hash20,
+                       player_nr & 0xFF, player_unlocked & 0xFFFF)
+    return wire.pack_header(wire.NWC_AUTH_PLAYER, body)
+
+
+class AuthPlayerTest(wire.ServerTestCase, unittest.TestCase):
+
+    def check_reply(self, *, player_nr: int, expected_unlocked: int,
+                    hash20: bytes = b"\x00" * 20) -> None:
+        """Send one auth request; pin the reply header and unlock mask."""
+        request = build(player_nr=player_nr, hash20=hash20)
+        reply = wire.send_and_recv_exact(self.srv, request, PACKET_SIZE)
+        if len(reply) != PACKET_SIZE:
+            stderr_tail = self.srv.read_stderr_tail()
+            self.fail(f"reply length {len(reply)} != {PACKET_SIZE}; "
+                      f"raw: {reply.hex()}; stderr tail:\n{stderr_tail}")
+        header = wire.unpack_header(reply)
+        self.assertEqual(header[0], PACKET_SIZE)
+        self.assertEqual(header[1], wire.NETWORK_VERSION)
+        self.assertEqual(header[2], wire.NWC_AUTH_PLAYER)
+        # player_unlocked is the trailing u16 of the reply.
+        (unlocked,) = struct.unpack_from("<H", reply, PACKET_SIZE - 2)
+        self.assertEqual(
+            unlocked, expected_unlocked,
+            f"player_unlocked={unlocked:#x}, "
+            f"expected {expected_unlocked:#x}",
+        )
+        self.assertTrue(
+            self.srv.alive(),
+            "reply was well-formed but server died after",
+        )
+
+    def test_null_slot(self):
+        """An unfilled slot must yield the wrong-password reply
+        (player_unlocked=0) rather than a crash inside
+        welt->get_player(...)->access_password_hash()."""
+        self.check_reply(player_nr=3, expected_unlocked=0)
+
+    def test_filled_slot_empty_password(self):
+        """An all-zero hash against public service (slot 1) matches the
+        default empty password, so bit 1 of player_unlocked is set."""
+        self.check_reply(player_nr=1, expected_unlocked=1 << 1)
+
+    def test_filled_slot_wrong_password(self):
+        """A wrong password on a filled slot yields player_unlocked=0
+        — the silent-fail shape the NULL-slot path now mirrors."""
+        self.check_reply(hash20=b"\xff" * 20, player_nr=1,
+                         expected_unlocked=0)
diff --git a/tools/nwc_protocol_test/test_service.py b/tools/nwc_protocol_test/test_service.py
new file mode 100644
index 0000000000..84e2ba6677
--- /dev/null
+++ b/tools/nwc_protocol_test/test_service.py
@@ -0,0 +1,50 @@
+#!/usr/bin/env python3
+# NWC_SERVICE wire pin + tripwire for the pre-auth list-shaped OOM
+# in nwc_service_t::rdwr (src/simutrans/network/network_cmd.cc).
+# Pre-fix the server-side rdwr was wire-symmetric, so a forged
+# SRVC_GET_CLIENT_LIST with count=0xFFFFFFFF drove socket_list_t::rdwr
+# through ~4 billion allocations until std::bad_alloc.  The
+# admin-auth gate in execute() only ran after rdwr returned.  The
+# fix makes server-side reads of the list-shaped flags no-ops; only
+# the saving (response) leg touches socket_list_t / address_list_t.
+
+import struct
+import unittest
+
+from . import wire
+
+
+SRVC_LOGIN_ADMIN     = 0
+SRVC_ANNOUNCE_SERVER = 1
+SRVC_GET_CLIENT_LIST = 2
+SRVC_GET_BLACK_LIST  = 5
+
+
+# We always append the wire-controlled count so the packet shape
+# exercises the pre-fix vulnerable read path regardless of flag.
+def build(*, flag: int, number: int = 0, count: int = 0,
+          our_client_id: int = 0) -> bytes:
+    """Frame an NWC_SERVICE request: u32 client id, flag, number, count."""
+    # "<" packs little-endian without padding, so one call suffices.
+    fields = struct.pack("<4I", our_client_id, flag, number, count)
+    return wire.pack_header(wire.NWC_SERVICE, fields)
+
+
+class ServiceTest(wire.ServerTestCase, unittest.TestCase):
+
+    def test_get_client_list_huge_count_does_not_crash(self):
+        """A forged count=0xFFFFFFFF on SRVC_GET_CLIENT_LIST must not
+        start the 2^32-allocation loop in socket_list_t::rdwr."""
+        pkt = build(flag=SRVC_GET_CLIENT_LIST, count=0xFFFFFFFF)
+        wire.assert_heartbeat(self.srv, pkt)
+
+    def test_get_black_list_huge_count_does_not_crash(self):
+        """Same forged count via the address_list_t::rdwr sibling path."""
+        pkt = build(flag=SRVC_GET_BLACK_LIST, count=0xFFFFFFFF)
+        wire.assert_heartbeat(self.srv, pkt)
+
+    def test_unknown_flag_does_not_crash(self):
+        """An unrecognised flag must be absorbed silently; catches a
+        regression from "no-op on loading" to "fatal on loading"."""
+        pkt = build(flag=0xDEAD, count=0xFFFFFFFF)
+        wire.assert_heartbeat(self.srv, pkt)
diff --git a/tools/nwc_protocol_test/test_tool.py b/tools/nwc_protocol_test/test_tool.py
new file mode 100644
index 0000000000..6be866f687
--- /dev/null
+++ b/tools/nwc_protocol_test/test_tool.py
@@ -0,0 +1,67 @@
+#!/usr/bin/env python3
+# NWC_TOOL wire pin + tripwire for the forged-our_client_id crash in
+# nwc_tool_t::clone (src/simutrans/network/network_cmd_ingame.cc).
+# Pre-fix the wire-supplied our_client_id was fed straight into
+# socket_list_t::get_client; 0xFFFFFFFF tripped vector_tpl bounds,
+# any valid-other-slot id let the sender impersonate that client.
+# The fix in network_command_t::read_from_packet now overrides
+# our_client_id with the socket-derived id on the server side, so
+# the wire value is informational only.
+
+import struct
+import unittest
+
+from . import wire
+
+
+# nwc_tool_t wire layout (NETWORK_VERSION=1), built per
+# network_cmd_ingame.cc:nwc_tool_t::rdwr.
+def build(*,
+          our_client_id: int,
+          tool_id: int = 0x1000,
+          player_nr: int = 1,
+          pos_x: int = 0,
+          pos_y: int = 0,
+          pos_z: int = 0,
+          wt: int = -1,
+          default_param: bytes = b"",
+          init: bool = True,
+          tool_client_id: int | None = None,
+          flags: int = 0,
+          callback_id: int = 0,
+          sync_step: int = 0,
+          map_counter: int = 0,
+          exec_flag: bool = True,
+          last_sync_step: int = 0,
+          custom_data: bytes = b"") -> bytes:
+    """Serialise one NWC_TOOL packet.  `tool_client_id` falls back to
+    `our_client_id` when left as None."""
+    owner = our_client_id if tool_client_id is None else tool_client_id
+    # Each entry is one contiguous run of fields; "<" packs without padding.
+    parts = [
+        struct.pack("<I", our_client_id),            # network_command_t
+        struct.pack("<II", sync_step, map_counter),  # network_world_command_t
+        struct.pack("<B", int(bool(exec_flag))),     # network_broadcast_world_command_t
+        struct.pack("<I", last_sync_step),           # nwc_tool_t
+        struct.pack("<IIHHH", 0, 0, 0, 0, 0),        # checklist_t
+        struct.pack("<Bhhb", player_nr & 0xFF, pos_x, pos_y, pos_z),
+        struct.pack("<Hh", tool_id, wt),
+        struct.pack("<H", len(default_param)), default_param,
+        struct.pack("<BIBI", int(bool(init)), owner, flags & 0xFF, callback_id),
+        custom_data,
+    ]
+    return wire.pack_header(wire.NWC_TOOL, b"".join(parts))
+
+
+class ToolTest(wire.ServerTestCase, unittest.TestCase):
+
+    def test_forged_client_id_does_not_crash(self):
+        """A pre-auth NWC_TOOL forging our_client_id=0xFFFFFFFF must be
+        absorbed without tripping vector_tpl bounds in get_client."""
+        forged = build(our_client_id=0xFFFFFFFF)
+        wire.assert_heartbeat(self.srv, forged)
+
+    def test_zero_client_id_does_not_crash(self):
+        """Negative control: our_client_id=0, the "no client" sentinel."""
+        baseline = build(our_client_id=0)
+        wire.assert_heartbeat(self.srv, baseline)
diff --git a/tools/nwc_protocol_test/wire.py b/tools/nwc_protocol_test/wire.py
new file mode 100644
index 0000000000..04544a2cd6
--- /dev/null
+++ b/tools/nwc_protocol_test/wire.py
@@ -0,0 +1,298 @@
+#!/usr/bin/env python3
+# Shared rig for the black-box tests in this package.  Server context
+# manager, 6-byte (size, version, id) framing helpers, and the
+# assert_heartbeat assertion used by the silent-absorb regression
+# tests.  Per-command wire layouts live in test_<command>.py.
+
+import fcntl
+import os
+import shutil
+import signal
+import socket
+import struct
+import subprocess
+import sys
+import tempfile
+import time
+from pathlib import Path
+
+
+ROOT = Path(__file__).resolve().parents[2]
+PAK_DIR = ROOT / "simutrans" / "pak"
+BASE_DIR = ROOT / "simutrans"
+EMPTY_SAVE = ROOT / "tests" / "empty-16x16.sve"
+
+NETWORK_VERSION = 1
+HEADER_SIZE = 6
+
+# Mirrors network_command_id in src/simutrans/network/network_cmd.h.
+NWC_GAMEINFO     = 1
+NWC_JOIN         = 4
+NWC_SYNC         = 5
+NWC_GAME         = 6
+NWC_READY        = 7
+NWC_TOOL         = 8
+NWC_CHECK        = 9
+NWC_SERVICE      = 11
+NWC_AUTH_PLAYER  = 12
+NWC_CHG_PLAYER   = 13
+NWC_STEP         = 16
+
+_PACKET_NAMES = {
+    NWC_GAMEINFO: "NWC_GAMEINFO",
+    NWC_JOIN: "NWC_JOIN",
+    NWC_SYNC: "NWC_SYNC",
+    NWC_GAME: "NWC_GAME",
+    NWC_READY: "NWC_READY",
+    NWC_TOOL: "NWC_TOOL",
+    NWC_CHECK: "NWC_CHECK",
+    NWC_SERVICE: "NWC_SERVICE",
+    NWC_AUTH_PLAYER: "NWC_AUTH_PLAYER",
+    NWC_CHG_PLAYER: "NWC_CHG_PLAYER",
+    NWC_STEP: "NWC_STEP",
+}
+
+
+def pkt_name(pkt_id: int) -> str:  # e.g. "NWC_STEP(16)"; unknown ids -> "?(id)"
+    return "{}({})".format(_PACKET_NAMES.get(pkt_id, "?"), pkt_id)
+
+
+def find_simutrans() -> Path:
+    """Return the first executable build output, or exit with a hint."""
+    relative_candidates = ("build-headless/simutrans/simutrans",
+                           "build/simutrans/simutrans", "sim")
+    for rel in relative_candidates:
+        binary = ROOT / rel
+        if binary.is_file() and os.access(binary, os.X_OK):
+            return binary
+    sys.exit("no simutrans binary found; build with cmake or autoconf first")
+
+
+def require_pakset():  # exits with a hint if pakset or starter save is missing
+    fetch_hint = "`cd simutrans && ../tools/get_pak.sh pak64` once"
+    if not (PAK_DIR.is_dir() and any(PAK_DIR.iterdir())):
+        sys.exit(f"no pakset at {PAK_DIR}; run {fetch_hint}")
+    if not EMPTY_SAVE.is_file():
+        sys.exit(f"missing starter savegame {EMPTY_SAVE}")
+
+
+def _free_port() -> int:
+    """Return a loopback TCP port the kernel reported free just now."""
+    # Racy: a collision surfaces as the usual "did not start listening" error.
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
+        probe.bind(("127.0.0.1", 0))  # port 0 = let the OS choose
+        _host, port = probe.getsockname()
+    return port
+
+
+class Server:
+    """Context manager spawning a headless server on a free port.
+    Each instance gets its own temp userdir so concurrent runs don't
+    collide with each other or with a developer's ~/simutrans tree."""
+
+    def __init__(self, port: int | None = None):
+        self.port = port if port is not None else _free_port()
+        self.proc: subprocess.Popen | None = None
+        self.userdir: Path | None = None
+
+    def __enter__(self) -> "Server":
+        """Launch the server; return once it accepts TCP connections."""
+        # Check prerequisites before touching the disk, so a missing
+        # binary, pakset or savegame gets the friendly hint instead of
+        # a raw copyfile error, and no temp userdir is left behind.
+        sim = find_simutrans()
+        require_pakset()
+        self.userdir = Path(tempfile.mkdtemp(prefix="nwc-e2e-"))
+        save_dir = self.userdir / "save"
+        save_dir.mkdir(parents=True)
+        shutil.copyfile(EMPTY_SAVE, save_dir / "empty.sve")
+        env = os.environ.copy()
+        env.setdefault("SDL_VIDEODRIVER", "dummy")
+        args = [
+            str(sim), "-set_basedir", str(BASE_DIR),
+            "-set_userdir", str(self.userdir), "-objects", "pak",
+            "-nomidi", "-nosound", "-mute",
+            "-server", str(self.port), "-load", "empty", "-debug", "2",
+        ]
+        self.proc = subprocess.Popen(args, env=env, stdout=subprocess.DEVNULL,
+                                     stderr=subprocess.PIPE)
+        # Non-blocking so read_stderr_tail won't wait for a full read
+        # or process exit when called from an assertion message.
+        fd = self.proc.stderr.fileno()
+        fcntl.fcntl(fd, fcntl.F_SETFL,
+                    fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)
+        if not self._wait_listening(timeout=20.0):
+            stderr = self.read_stderr_tail(limit=8192)
+            self.__exit__(None, None, None)
+            raise RuntimeError(
+                f"server did not start listening on {self.port} within "
+                f"20s; stderr tail:\n{stderr}")
+        return self
+
+    def __exit__(self, *exc):
+        """Stop the server (SIGTERM, SIGKILL after 5 s); drop the userdir."""
+        if self.proc is not None:
+            if self.proc.poll() is None:
+                self.proc.send_signal(signal.SIGTERM)
+                try:
+                    self.proc.wait(timeout=5)
+                except subprocess.TimeoutExpired:
+                    self.proc.kill()
+                    self.proc.wait()
+            if self.proc.stderr is not None:
+                self.proc.stderr.close()
+        if self.userdir and self.userdir.exists():
+            shutil.rmtree(self.userdir, ignore_errors=True)
+
+    def _wait_listening(self, timeout: float) -> bool:
+        """Poll the port until it accepts, the process dies, or time is up."""
+        deadline = time.monotonic() + timeout
+        while time.monotonic() < deadline:
+            if self.proc and self.proc.poll() is not None:
+                return False
+            try:
+                with socket.create_connection(("127.0.0.1", self.port), 0.5):
+                    return True
+            except OSError:
+                time.sleep(0.2)
+        return False
+
+    def alive(self) -> bool:
+        """Process running AND port accepting new connections."""
+        if self.proc is None or self.proc.poll() is not None:
+            return False
+        try:
+            with socket.create_connection(("127.0.0.1", self.port), 1.0):
+                return True
+        except OSError:
+            return False
+
+    def read_stderr_tail(self, limit: int = 4096) -> str:
+        """Drain buffered stderr; return its last `limit` bytes, capped
+        at 2000 characters, so a crash report at the end is not lost."""
+        if not self.proc or not self.proc.stderr or limit <= 0:
+            return ""
+        fd, tail = self.proc.stderr.fileno(), b""
+        try:
+            while chunk := os.read(fd, 65536):
+                tail = (tail + chunk)[-limit:]
+        except BlockingIOError:
+            pass  # pipe is empty for now
+        return tail.decode(errors="replace")[-2000:]
+
+
+def pack_header(packet_id: int, body: bytes) -> bytes:
+    """Prefix `body` with the (size, version, id) header; `size` counts
+    the header itself, matching network_command_t::rdwr on the wire."""
+    fields = (HEADER_SIZE + len(body), NETWORK_VERSION, packet_id)
+    return struct.pack("<3H", *fields) + body
+
+
+def unpack_header(data: bytes) -> tuple[int, int, int]:
+    """Decode the leading (size, version, id) triple of `data`."""
+    if len(data) >= HEADER_SIZE:
+        return struct.unpack_from("<3H", data)
+    short = f"need {HEADER_SIZE}B for header, got {len(data)}: {data.hex()}"
+    raise AssertionError(short)
+
+
+def recv_exact(sock: socket.socket, n: int, timeout: float) -> bytes:
+    """Read up to `n` bytes from `sock`, giving up early (and returning
+    what arrived) on timeout, socket error, or the peer closing."""
+    sock.settimeout(timeout)
+    received = bytearray()
+    try:
+        while len(received) < n and (piece := sock.recv(n - len(received))):
+            received += piece
+    except OSError:
+        # socket.timeout is an OSError subclass, so this covers both.
+        pass
+    return bytes(received)
+
+
+def read_packets(sock: socket.socket, timeout: float,
+                 max_packets: int = 8) -> list[tuple[int, bytes]]:
+    """Drain whole packets from `sock` as `(packet_id, body)` tuples.
+    Stops on close, after `max_packets`, or once `timeout` seconds have
+    passed in total; raises AssertionError on a bad size or version."""
+    out: list[tuple[int, bytes]] = []
+    buf = b""
+    deadline = time.monotonic() + timeout
+    while len(out) < max_packets:
+        # Re-arm with the time left so one late recv cannot overshoot.
+        remaining = deadline - time.monotonic()
+        if remaining <= 0:
+            break
+        sock.settimeout(remaining)
+        try:
+            chunk = sock.recv(4096)
+        except OSError:  # socket.timeout is an OSError subclass
+            break
+        if not chunk:
+            break
+        buf += chunk
+        while len(buf) >= HEADER_SIZE and len(out) < max_packets:
+            size, version, pkt_id = struct.unpack_from("<HHH", buf)
+            if size < HEADER_SIZE:
+                raise AssertionError(
+                    f"impossible packet size={size}: {buf[:HEADER_SIZE].hex()}")
+            if len(buf) < size:
+                break
+            if version != NETWORK_VERSION:
+                raise AssertionError(
+                    f"version={version} (expected {NETWORK_VERSION}); "
+                    f"raw: {buf[:size].hex()}"
+                )
+            out.append((pkt_id, buf[HEADER_SIZE:size]))
+            buf = buf[size:]
+    return out
+
+
+def send_and_recv_exact(srv: "Server", pkt: bytes, n: int,
+                        recv_timeout: float = 3.0,
+                        connect_timeout: float = 2.0) -> bytes:
+    """Open a fresh connection, send `pkt`, read up to `n` reply bytes."""
+    with socket.create_connection(("127.0.0.1", srv.port), connect_timeout) as c:
+        c.sendall(pkt)
+        return recv_exact(c, n, recv_timeout)
+
+
+def assert_heartbeat(srv: "Server", pkt: bytes,
+                     drain_seconds: float = 3.0,
+                     min_steps: int = 1) -> None:
+    """Deliver `pkt` on a fresh connection and drain what comes back.
+    Passes only if at least `min_steps` NWC_STEP packets arrive and the
+    server still accepts connections; raises AssertionError otherwise."""
+    try:
+        with socket.create_connection(("127.0.0.1", srv.port),
+                                      timeout=2.0) as conn:
+            conn.sendall(pkt)
+            received = read_packets(conn, drain_seconds, max_packets=16)
+    except OSError as e:
+        raise AssertionError(
+            f"send failed: {e}; stderr tail:\n{srv.read_stderr_tail()}")
+
+    step_count = sum(1 for pid, _body in received if pid == NWC_STEP)
+    if step_count < min_steps:
+        names = [pkt_name(pid) for pid, _ in received]
+        raise AssertionError(
+            f"saw {step_count} NWC_STEP heartbeat(s), wanted "
+            f">= {min_steps}; received = {names}; "
+            f"stderr tail:\n{srv.read_stderr_tail()}"
+        )
+    if srv.alive():
+        return
+    raise AssertionError(
+        "heartbeat seen but server failed to accept a new "
+        "connection after; stderr tail:\n" + srv.read_stderr_tail()
+    )
+
+
+class ServerTestCase:
+    """Mixin: fresh `Server` per test on `self.srv`, so a crashing packet
+    cannot poison the next test.  Avoids enterContext() (3.11+ only)."""
+
+    def setUp(self):
+        super().setUp()
+        self.srv = (srv := Server()).__enter__()
+        self.addCleanup(srv.__exit__, None, None, None)
