Kaynak Kodlar
Yapay Zeka
Kaynak Kodları
from typing import AsyncIterator, Callable

import pytest
from aiohttp import web
from yarl import URL

from neuromation.api import Action, Client, Permission, ResourceNotFound
from tests import _TestServerFactory


_MakeClient = Callable[..., Client]


@pytest.fixture()
async def mocked_share_client(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> AsyncIterator[Client]:
    """Yield a client bound to a stub server that accepts permission grants.

    Only ``POST /users/bill/permissions`` is routed; the client is closed
    once the test that uses the fixture has finished.
    """

    async def share_handler(request: web.Request) -> web.Response:
        payload = await request.json()
        known_actions = [item.value for item in Action]
        # The first entry of the posted list must name a valid Action value.
        assert payload[0]["action"] in known_actions
        raise web.HTTPCreated()

    application = web.Application()
    application.router.add_post("/users/bill/permissions", share_handler)
    server = await aiohttp_server(application)
    api_client = make_client(server.make_url("/"))
    yield api_client
    await api_client.close()


@pytest.fixture()
async def mocked_revoke_client(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> AsyncIterator[Client]:
    """Yield a client bound to a stub server that accepts permission revokes.

    Only ``DELETE /users/bill/permissions`` is routed; the client is closed
    once the test that uses the fixture has finished.
    """

    async def revoke_handler(request: web.Request) -> web.Response:
        # The revoked resource must be passed as the ``uri`` query parameter.
        assert "uri" in request.query
        raise web.HTTPNoContent()

    application = web.Application()
    application.router.add_delete("/users/bill/permissions", revoke_handler)
    server = await aiohttp_server(application)
    api_client = make_client(server.make_url("/"))
    yield api_client
    await api_client.close()


class TestUsersShare:
    """Share/revoke calls against the stub servers defined above."""

    async def test_share_unknown_user(self, mocked_share_client: Client) -> None:
        """Sharing with a user the stub server has no route for must fail."""
        permission = Permission(URL("storage://bob/resource"), Action.READ)
        with pytest.raises(ResourceNotFound):
            await mocked_share_client.users.share(
                user="not-exists", permission=permission
            )

    async def test_correct_share(self, mocked_share_client: Client) -> None:
        """Sharing with the routed user succeeds and returns nothing."""
        permission = Permission(URL("storage://bob/resource"), Action.READ)
        result = await mocked_share_client.users.share(
            user="bill", permission=permission
        )
        assert result is None  # at this moment no result

    async def test_revoke_unknown_user(self, mocked_revoke_client: Client) -> None:
        """Revoking from a user the stub server has no route for must fail."""
        resource = URL("storage://bob/resource")
        with pytest.raises(ResourceNotFound):
            await mocked_revoke_client.users.revoke(user="not-exists", uri=resource)

    async def test_correct_revoke(self, mocked_revoke_client: Client) -> None:
        """Revoking from the routed user succeeds and returns nothing."""
        resource = URL("storage://bob/resource")
        result = await mocked_revoke_client.users.revoke(user="bill", uri=resource)
        assert result is None  # at this moment no result
import asyncio
import errno
import json
import os
from filecmp import dircmp
from pathlib import Path
from shutil import copytree
from typing import Any, AsyncIterator, Callable, List, Tuple
from unittest import mock

import pytest
from aiohttp import web
from yarl import URL

import neuromation.api.storage
from neuromation.api import (
    Action,
    Client,
    FileStatus,
    FileStatusType,
    IllegalArgumentError,
    StorageProgressComplete,
    StorageProgressStart,
    StorageProgressStep,
)
from tests import _RawTestServerFactory, _TestServerFactory


_MakeClient = Callable[..., Client]

FOLDER = Path(__file__).parent
DATA_FOLDER = FOLDER / "data"


def calc_diff(dcmp: "dircmp[str]", *, pre: str = "") -> List[Tuple[str, str]]:
    """Flatten a ``dircmp`` result into a list of differing paths.

    Args:
        dcmp: comparison of two directory trees.
        pre: path prefix (ending in ``/``) prepended to every reported name;
            used by the recursion to build paths relative to the roots.

    Returns:
        ``(left, right)`` tuples: both sides are set for files that differ,
        and the missing side is ``""`` for entries present on one side only.
        The list is empty when the trees are identical.
    """
    ret: List[Tuple[str, str]] = []
    for name in dcmp.diff_files:
        ret.append((pre + name, pre + name))
    for name in dcmp.left_only:
        ret.append((pre + name, ""))
    for name in dcmp.right_only:
        ret.append(("", pre + name))
    for name, sub_dcmp in dcmp.subdirs.items():
        # Carry the accumulated prefix down, otherwise entries nested more
        # than one level deep are reported without their parent directories.
        ret.extend(calc_diff(sub_dcmp, pre=pre + name + "/"))
    return ret


@pytest.fixture
def storage_path(tmp_path: Path) -> Path:
    """Create and return an empty ``storage`` directory under ``tmp_path``."""
    ret = tmp_path / "storage"
    ret.mkdir()
    return ret


@pytest.fixture
async def storage_server(
    aiohttp_raw_server: _RawTestServerFactory, storage_path: Path
) -> Any:
    """Start a raw test server that maps storage requests onto ``storage_path``.

    Requests must target ``/storage/user[/...]`` and carry a ``b3`` header.
    The ``op`` query parameter selects the operation: ``CREATE``, ``OPEN``,
    ``GETFILESTATUS``, ``MKDIRS`` or ``LISTSTATUS``; anything else yields a
    500 response.
    """
    PREFIX = "/storage/user"
    PREFIX_LEN = len(PREFIX)

    async def handler(request: web.Request) -> web.Response:
        assert "b3" in request.headers
        op = request.query["op"]
        path = request.path
        assert path.startswith(PREFIX)
        # Strip the prefix and the separating slash to get a relative path.
        path = path[PREFIX_LEN:]
        if path.startswith("/"):
            path = path[1:]
        local_path = storage_path / path
        if op == "CREATE":
            content = await request.read()
            local_path.write_bytes(content)
            return web.Response(status=201)
        elif op == "OPEN":
            return web.Response(body=local_path.read_bytes())
        elif op == "GETFILESTATUS":
            if not local_path.exists():
                raise web.HTTPNotFound()
            stat = local_path.stat()
            return web.json_response(
                {
                    "FileStatus": {
                        "path": local_path.name,
                        "type": "FILE" if local_path.is_file() else "DIRECTORY",
                        "length": stat.st_size,
                        "modificationTime": stat.st_mtime,
                        "permission": "write",
                    }
                }
            )
        elif op == "MKDIRS":
            try:
                local_path.mkdir(parents=True, exist_ok=True)
            except FileExistsError:
                # With exist_ok=True this is only raised when the path (or a
                # parent) exists but is not a directory.
                raise web.HTTPBadRequest(
                    text=json.dumps({"error": "File exists", "errno": "EEXIST"}),
                    content_type="application/json",
                )
            return web.Response(status=201)
        elif op == "LISTSTATUS":
            if not local_path.exists():
                raise web.HTTPNotFound()
            ret = []
            for child in local_path.iterdir():
                stat = child.stat()
                ret.append(
                    {
                        "path": child.name,
                        "type": "FILE" if child.is_file() else "DIRECTORY",
                        "length": stat.st_size,
                        "modificationTime": stat.st_mtime,
                        "permission": "write",
                    }
                )
            return web.json_response({"FileStatuses": {"FileStatus": ret}})
        else:
            raise web.HTTPInternalServerError(text=f"Unsupported operation {op}")

    return await aiohttp_raw_server(handler)


async def test_storage_ls(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``ls`` converts a LISTSTATUS response into ``FileStatus`` objects."""
    JSON = {
        "FileStatuses": {
            "FileStatus": [
                {
                    "path": "foo",
                    "length": 1024,
                    "type": "FILE",
                    "modificationTime": 0,
                    "permission": "read",
                },
                {
                    "path": "bar",
                    "length": 4 * 1024,
                    "type": "DIRECTORY",
                    "modificationTime": 0,
                    "permission": "read",
                },
            ]
        }
    }

    async def handler(request: web.Request) -> web.Response:
        assert "b3" in request.headers
        assert request.path == "/storage/user/folder"
        assert request.query == {"op": "LISTSTATUS"}
        return web.json_response(JSON)

    app = web.Application()
    app.router.add_get("/storage/user/folder", handler)

    srv = await aiohttp_server(app)

    async with make_client(srv.make_url("/")) as client:
        ret = await client.storage.ls(URL("storage://~/folder"))

    assert ret == [
        FileStatus(
            path="foo",
            size=1024,
            type=FileStatusType.FILE,
            modification_time=0,
            permission=Action.READ,
        ),
        FileStatus(
            path="bar",
            size=4 * 1024,
            type=FileStatusType.DIRECTORY,
            modification_time=0,
            permission=Action.READ,
        ),
    ]


async def test_storage_glob(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``glob`` expands patterns against a stubbed tree.

    The stub serves ``folder/`` containing file ``foo`` and directory ``bar``,
    which in turn contains file ``baz``.
    """

    async def handler_home(request: web.Request) -> web.Response:
        assert "b3" in request.headers
        assert request.path == "/storage/user/"
        assert request.query == {"op": "LISTSTATUS"}
        return web.json_response(
            {
                "FileStatuses": {
                    "FileStatus": [
                        {
                            "path": "folder",
                            "length": 0,
                            "type": "DIRECTORY",
                            "modificationTime": 0,
                            "permission": "read",
                        }
                    ]
                }
            }
        )

    async def handler_folder(request: web.Request) -> web.Response:
        assert "b3" in request.headers
        assert request.path.rstrip("/") == "/storage/user/folder"
        assert request.query["op"] in ("GETFILESTATUS", "LISTSTATUS")
        if request.query["op"] == "GETFILESTATUS":
            return web.json_response(
                {
                    "FileStatus": {
                        "path": "/user/folder",
                        "type": "DIRECTORY",
                        "length": 0,
                        "modificationTime": 0,
                        "permission": "read",
                    }
                }
            )
        elif request.query["op"] == "LISTSTATUS":
            return web.json_response(
                {
                    "FileStatuses": {
                        "FileStatus": [
                            {
                                "path": "foo",
                                "length": 1024,
                                "type": "FILE",
                                "modificationTime": 0,
                                "permission": "read",
                            },
                            {
                                "path": "bar",
                                "length": 0,
                                "type": "DIRECTORY",
                                "modificationTime": 0,
                                "permission": "read",
                            },
                        ]
                    }
                }
            )
        else:
            raise web.HTTPInternalServerError

    async def handler_foo(request: web.Request) -> web.Response:
        assert "b3" in request.headers
        assert request.path == "/storage/user/folder/foo"
        assert request.query["op"] in ("GETFILESTATUS", "LISTSTATUS")
        assert request.query == {"op": "GETFILESTATUS"}
        return web.json_response(
            {
                "FileStatus": {
                    "path": "/user/folder/foo",
                    "length": 1024,
                    "type": "FILE",
                    "modificationTime": 0,
                    "permission": "read",
                }
            }
        )

    async def handler_bar(request: web.Request) -> web.Response:
        assert request.path.rstrip("/") == "/storage/user/folder/bar"
        if request.query["op"] == "GETFILESTATUS":
            return web.json_response(
                {
                    "FileStatus": {
                        "path": "/user/folder/bar",
                        "length": 0,
                        "type": "DIRECTORY",
                        "modificationTime": 0,
                        "permission": "read",
                    }
                }
            )
        elif request.query["op"] == "LISTSTATUS":
            return web.json_response(
                {
                    "FileStatuses": {
                        "FileStatus": [
                            {
                                "path": "baz",
                                "length": 0,
                                "type": "FILE",
                                "modificationTime": 0,
                                "permission": "read",
                            }
                        ]
                    }
                }
            )
        else:
            raise web.HTTPInternalServerError

    app = web.Application()
    # Every path is routed with and without a trailing slash.
    app.router.add_get("/storage/user", handler_home)
    app.router.add_get("/storage/user/", handler_home)
    app.router.add_get("/storage/user/folder", handler_folder)
    app.router.add_get("/storage/user/folder/", handler_folder)
    app.router.add_get("/storage/user/folder/foo", handler_foo)
    app.router.add_get("/storage/user/folder/foo/", handler_foo)
    app.router.add_get("/storage/user/folder/bar", handler_bar)
    app.router.add_get("/storage/user/folder/bar/", handler_bar)

    srv = await aiohttp_server(app)

    async with make_client(srv.make_url("/")) as client:

        async def glob(pattern: str) -> List[URL]:
            return [uri async for uri in client.storage.glob(URL(pattern))]

        assert await glob("storage:folder") == [URL("storage:folder")]
        assert await glob("storage:folder/") == [URL("storage:folder/")]
        assert await glob("storage:folder/*") == [
            URL("storage:folder/foo"),
            URL("storage:folder/bar"),
        ]
        assert await glob("storage:folder/foo") == [URL("storage:folder/foo")]
        assert await glob("storage:folder/[a-d]*") == [URL("storage:folder/bar")]
        assert await glob("storage:folder/*/") == [URL("storage:folder/bar/")]
        assert await glob("storage:*") == [URL("storage:folder")]
        assert await glob("storage:**") == [
            URL("storage:"),
            URL("storage:folder"),
            URL("storage:folder/foo"),
            URL("storage:folder/bar"),
            URL("storage:folder/bar/baz"),
        ]
        assert await glob("storage:*/foo") == [URL("storage:folder/foo")]
        assert await glob("storage:*/f*") == [URL("storage:folder/foo")]
        assert await glob("storage:**/foo") == [URL("storage:folder/foo")]
        assert await glob("storage:**/f*") == [
            URL("storage:folder"),
            URL("storage:folder/foo"),
        ]
        assert await glob("storage:**/f*/") == [URL("storage:folder/")]
        assert await glob("storage:**/b*") == [
            URL("storage:folder/bar"),
            URL("storage:folder/bar/baz"),
        ]
        assert await glob("storage:**/b*/") == [URL("storage:folder/bar/")]


async def test_storage_rm_file(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``rm`` on a path the server reports as a FILE issues a DELETE."""

    async def get_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/file"
        assert request.query == {"op": "GETFILESTATUS"}
        return web.json_response(
            {
                "FileStatus": {
                    "path": "/user/file",
                    "type": "FILE",
                    "length": 1234,
                    "modificationTime": 3456,
                    "permission": "read",
                }
            }
        )

    async def delete_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/file"
        assert request.query == {"op": "DELETE"}
        return web.Response(status=204)

    app = web.Application()
    app.router.add_get("/storage/user/file", get_handler)
    app.router.add_delete("/storage/user/file", delete_handler)

    srv = await aiohttp_server(app)

    async with make_client(srv.make_url("/")) as client:
        await client.storage.rm(URL("storage://~/file"))


async def test_storage_rm_directory(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """Non-recursive ``rm`` on a DIRECTORY raises ``IsADirectoryError``."""

    async def get_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder"
        assert request.query == {"op": "GETFILESTATUS"}
        return web.json_response(
            {
                "FileStatus": {
                    "path": "/user/folder",
                    "type": "DIRECTORY",
                    "length": 1234,
                    "modificationTime": 3456,
                    "permission": "read",
                }
            }
        )

    async def delete_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder"
        assert request.query == {"op": "DELETE"}
        return web.Response(status=204)

    app = web.Application()
    app.router.add_get("/storage/user/folder", get_handler)
    app.router.add_delete("/storage/user/folder", delete_handler)

    srv = await aiohttp_server(app)

    async with make_client(srv.make_url("/")) as client:
        with pytest.raises(IsADirectoryError, match="Is a directory") as cm:
            await client.storage.rm(URL("storage://~/folder"))
        assert cm.value.errno == errno.EISDIR


async def test_storage_rm_recursive(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """Recursive ``rm`` succeeds with only a DELETE route registered."""

    async def delete_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder"
        assert request.query == {"op": "DELETE"}
        return web.Response(status=204)

    app = web.Application()
    app.router.add_delete("/storage/user/folder", delete_handler)

    srv = await aiohttp_server(app)

    async with make_client(srv.make_url("/")) as client:
        await client.storage.rm(URL("storage://~/folder"), recursive=True)


async def test_storage_mv(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``mv`` posts a RENAME with the destination path in the query."""

    async def handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder"
        assert request.query == {"op": "RENAME", "destination": "/user/other"}
        return web.Response(status=204)

    app = web.Application()
    app.router.add_post("/storage/user/folder", handler)

    srv = await aiohttp_server(app)

    async with make_client(srv.make_url("/")) as client:
        await client.storage.mv(URL("storage://~/folder"), URL("storage://~/other"))


async def test_storage_mkdir_parents_exist_ok(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``mkdir(parents=True, exist_ok=True)`` needs only the MKDIRS route."""

    async def handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder/sub"
        assert request.query == {"op": "MKDIRS"}
        return web.Response(status=204)

    app = web.Application()
    app.router.add_put("/storage/user/folder/sub", handler)

    srv = await aiohttp_server(app)

    async with make_client(srv.make_url("/")) as client:
        await client.storage.mkdir(
            URL("storage://~/folder/sub"), parents=True, exist_ok=True
        )


async def test_storage_mkdir_parents(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``mkdir(parents=True)`` works when the target is reported missing."""

    async def get_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder/sub"
        assert request.query == {"op": "GETFILESTATUS"}
        return web.Response(status=404)

    async def put_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder/sub"
        assert request.query == {"op": "MKDIRS"}
        return web.Response(status=204)

    app = web.Application()
    app.router.add_get("/storage/user/folder/sub", get_handler)
    app.router.add_put("/storage/user/folder/sub", put_handler)

    srv = await aiohttp_server(app)

    async with make_client(srv.make_url("/")) as client:
        await client.storage.mkdir(URL("storage://~/folder/sub"), parents=True)


async def test_storage_mkdir_exist_ok(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``mkdir(exist_ok=True)`` works when the parent is a DIRECTORY."""

    async def get_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder"
        assert request.query == {"op": "GETFILESTATUS"}
        return web.json_response(
            {
                "FileStatus": {
                    "path": "/user/folder",
                    "type": "DIRECTORY",
                    "length": 1234,
                    "modificationTime": 3456,
                    "permission": "read",
                }
            }
        )

    async def put_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder/sub"
        assert request.query == {"op": "MKDIRS"}
        return web.Response(status=204)

    app = web.Application()
    app.router.add_get("/storage/user/folder", get_handler)
    app.router.add_put("/storage/user/folder/sub", put_handler)

    srv = await aiohttp_server(app)

    async with make_client(srv.make_url("/")) as client:
        await client.storage.mkdir(URL("storage://~/folder/sub"), exist_ok=True)


async def test_storage_mkdir(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """Plain ``mkdir`` works: target missing, parent is a DIRECTORY."""

    async def get_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder/sub"
        assert request.query == {"op": "GETFILESTATUS"}
        return web.Response(status=404)

    async def parent_get_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder"
        assert request.query == {"op": "GETFILESTATUS"}
        return web.json_response(
            {
                "FileStatus": {
                    "path": "/user/folder",
                    "type": "DIRECTORY",
                    "length": 1234,
                    "modificationTime": 3456,
                    "permission": "read",
                }
            }
        )

    async def put_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder/sub"
        assert request.query == {"op": "MKDIRS"}
        return web.Response(status=204)

    app = web.Application()
    app.router.add_get("/storage/user/folder/sub", get_handler)
    app.router.add_get("/storage/user/folder", parent_get_handler)
    app.router.add_put("/storage/user/folder/sub", put_handler)

    srv = await aiohttp_server(app)

    async with make_client(srv.make_url("/")) as client:
        await client.storage.mkdir(URL("storage://~/folder/sub"))


async def test_storage_create(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``create`` streams the chunks of an async generator as the PUT body."""

    async def handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/file"
        assert request.query == {"op": "CREATE"}
        content = await request.read()
        assert content == b"01234"
        return web.Response(status=201)

    app = web.Application()
    app.router.add_put("/storage/user/file", handler)

    srv = await aiohttp_server(app)

    async def gen() -> AsyncIterator[bytes]:
        for i in range(5):
            yield str(i).encode("ascii")

    async with make_client(srv.make_url("/")) as client:
        await client.storage.create(URL("storage://~/file"), gen())


async def test_storage_stats(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``stat`` converts a GETFILESTATUS response into a ``FileStatus``."""

    async def handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder"
        assert request.query == {"op": "GETFILESTATUS"}
        return web.json_response(
            {
                "FileStatus": {
                    "path": "/user/folder",
                    "type": "DIRECTORY",
                    "length": 1234,
                    "modificationTime": 3456,
                    "permission": "read",
                }
            }
        )

    app = web.Application()
    app.router.add_get("/storage/user/folder", handler)

    srv = await aiohttp_server(app)

    async with make_client(srv.make_url("/")) as client:
        stats = await client.storage.stat(URL("storage://~/folder"))
        assert stats == FileStatus(
            path="/user/folder",
            type=FileStatusType.DIRECTORY,
            size=1234,
            modification_time=3456,
            permission=Action.READ,
        )


async def test_storage_open(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``open`` yields the streamed body of a FILE chunk by chunk."""

    async def handler(request: web.Request) -> web.StreamResponse:
        assert request.path == "/storage/user/file"
        if request.query["op"] == "OPEN":
            resp = web.StreamResponse()
            await resp.prepare(request)
            for i in range(5):
                await resp.write(str(i).encode("ascii"))
            return resp
        elif request.query["op"] == "GETFILESTATUS":
            return web.json_response(
                {
                    "FileStatus": {
                        "path": "/user/file",
                        "type": "FILE",
                        "length": 5,
                        "modificationTime": 3456,
                        "permission": "read",
                    }
                }
            )
        else:
            raise AssertionError(f"Unknown operation {request.query['op']}")

    app = web.Application()
    app.router.add_get("/storage/user/file", handler)

    srv = await aiohttp_server(app)

    async with make_client(srv.make_url("/")) as client:
        buf = bytearray()
        async for chunk in client.storage.open(URL("storage://~/file")):
            buf.extend(chunk)
        assert buf == b"01234"


async def test_storage_open_directory(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``open`` on a DIRECTORY raises and yields no data."""

    async def handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder"
        assert request.query == {"op": "GETFILESTATUS"}
        return web.json_response(
            {
                "FileStatus": {
                    "path": "/user/folder",
                    "type": "DIRECTORY",
                    "length": 5,
                    "modificationTime": 3456,
                    "permission": "read",
                }
            }
        )

    app = web.Application()
    app.router.add_get("/storage/user/folder", handler)

    srv = await aiohttp_server(app)

    async with make_client(srv.make_url("/")) as client:
        buf = bytearray()
        with pytest.raises((IsADirectoryError, IllegalArgumentError)):
            async for chunk in client.storage.open(URL("storage://~/folder")):
                buf.extend(chunk)
        assert not buf


# test normalizers


# high level API


async def test_storage_upload_file_does_not_exists(make_client: _MakeClient) -> None:
    """Uploading a missing local file raises ``FileNotFoundError``."""
    async with make_client("https://example.com") as client:
        with pytest.raises(FileNotFoundError):
            await client.storage.upload_file(
                URL("file:///not-exists-file"), URL("storage://host/path/to/file.txt")
            )


async def test_storage_upload_dir_doesnt_exist(make_client: _MakeClient) -> None:
    """``upload_file`` with a directory source raises ``IsADirectoryError``.

    Despite the test's name, the source here is ``FOLDER``, the directory
    containing this test module.
    """
    async with make_client("https://example.com") as client:
        with pytest.raises(IsADirectoryError):
            await client.storage.upload_file(
                URL(FOLDER.as_uri()), URL("storage://host/path/to")
            )


async def test_storage_upload_not_a_file(
    storage_server: Any, make_client: _MakeClient, storage_path: Path
) -> None:
    """Uploading ``os.devnull`` creates an empty file and reports size 0."""
    file_path = Path(os.devnull).absolute()
    target_path = storage_path / "file.txt"
    progress = mock.Mock()

    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.upload_file(
            URL(file_path.as_uri()), URL("storage:file.txt"), progress=progress
        )

    uploaded = target_path.read_bytes()
    assert uploaded == b""

    src = URL(file_path.as_uri())
    dst = URL("storage://user/file.txt")
    progress.start.assert_called_with(StorageProgressStart(src, dst, 0))
    progress.step.assert_not_called()
    progress.complete.assert_called_with(StorageProgressComplete(src, dst, 0))


async def test_storage_upload_regular_file_to_existing_file_target(
    storage_server: Any, make_client: _MakeClient, storage_path: Path
) -> None:
    """Uploading a regular file copies its bytes and reports progress."""
    file_path = DATA_FOLDER / "file.txt"
    file_size = file_path.stat().st_size
    target_path = storage_path / "file.txt"
    progress = mock.Mock()

    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.upload_file(
            URL(file_path.as_uri()), URL("storage:file.txt"), progress=progress
        )

    expected = file_path.read_bytes()
    uploaded = target_path.read_bytes()
    assert uploaded == expected

    src = URL(file_path.as_uri())
    dst = URL("storage://user/file.txt")
    progress.start.assert_called_with(StorageProgressStart(src, dst, file_size))
    progress.step.assert_called_with(
        StorageProgressStep(src, dst, file_size, file_size)
    )
    progress.complete.assert_called_with(StorageProgressComplete(src, dst, file_size))


async def test_storage_upload_regular_file_to_existing_dir(
    storage_server: Any, make_client: _MakeClient, storage_path: Path
) -> None:
    """Uploading a file onto an existing directory raises."""
    file_path = DATA_FOLDER / "file.txt"
    folder = storage_path / "folder"
    folder.mkdir()

    async with make_client(storage_server.make_url("/")) as client:
        with pytest.raises(IsADirectoryError):
            await client.storage.upload_file(
                URL(file_path.as_uri()), URL("storage:folder")
            )


async def test_storage_upload_regular_file_to_existing_file(
    storage_server: Any, make_client: _MakeClient, storage_path: Path
) -> None:
    """Uploading onto an existing remote file overwrites its content."""
    file_path = DATA_FOLDER / "file.txt"
    folder = storage_path / "folder"
    folder.mkdir()
    target_path = folder / "file.txt"
    target_path.write_bytes(b"existing file")

    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.upload_file(
            URL(file_path.as_uri()), URL("storage:folder/file.txt")
        )

    expected = file_path.read_bytes()
    uploaded = target_path.read_bytes()
    assert uploaded == expected


async def test_storage_upload_regular_file_to_existing_dir_with_trailing_slash(
    storage_server: Any, make_client: _MakeClient, storage_path: Path
) -> None:
    """Same as the existing-dir case, with a trailing slash on the target."""
    file_path = DATA_FOLDER / "file.txt"
    folder = storage_path / "folder"
    folder.mkdir()

    async with make_client(storage_server.make_url("/")) as client:
        with pytest.raises(IsADirectoryError):
            await client.storage.upload_file(
                URL(file_path.as_uri()), URL("storage:folder/")
            )


async def test_storage_upload_regular_file_to_existing_non_dir(
    storage_server: Any, make_client: _MakeClient, storage_path: Path
) -> None:
    """Uploading below a remote path that is a file raises."""
    file_path = DATA_FOLDER / "file.txt"
    path = storage_path / "file"
    path.write_bytes(b"dummy")

    async with make_client(storage_server.make_url("/")) as client:
        with pytest.raises(NotADirectoryError):
            await client.storage.upload_file(
                URL(file_path.as_uri()), URL("storage:file/subfile.txt")
            )


async def test_storage_upload_regular_file_to_not_existing(
    storage_server: Any, make_client: _MakeClient
) -> None:
    """Uploading into a missing remote directory raises."""
    file_path = DATA_FOLDER / "file.txt"

    async with make_client(storage_server.make_url("/")) as client:
        with pytest.raises(NotADirectoryError):
            await client.storage.upload_file(
                URL(file_path.as_uri()), URL("storage:absent-dir/absent-file.txt")
            )


async def test_storage_upload_recursive_src_doesnt_exist(
    make_client: _MakeClient,
) -> None:
    """``upload_dir`` from a missing local path raises ``FileNotFoundError``."""
    async with make_client("https://example.com") as client:
        with pytest.raises(FileNotFoundError):
            await client.storage.upload_dir(
                URL("file:does_not_exist"), URL("storage://host/path/to")
            )


async def test_storage_upload_recursive_src_is_a_file(make_client: _MakeClient) -> None:
    """``upload_dir`` from a regular file raises ``NotADirectoryError``."""
    file_path = DATA_FOLDER / "file.txt"

    async with make_client("https://example.com") as client:
        with pytest.raises(NotADirectoryError):
            await client.storage.upload_dir(
                URL(file_path.as_uri()), URL("storage://host/path/to")
            )


async def test_storage_upload_recursive_target_is_a_file(
    storage_server: Any, make_client: _MakeClient, storage_path: Path
) -> None:
    """``upload_dir`` onto an existing remote file raises."""
    target_file = storage_path / "file.txt"
    target_file.write_bytes(b"dummy")

    async with make_client(storage_server.make_url("/")) as client:
        with pytest.raises(NotADirectoryError):
            await client.storage.upload_dir(
                URL(DATA_FOLDER.as_uri()), URL("storage:file.txt")
            )


async def test_storage_upload_empty_dir(
    storage_server: Any, make_client: _MakeClient, tmp_path: Path, storage_path: Path
) -> None:
    """Uploading an empty directory creates an empty remote directory."""
    target_dir = storage_path / "folder"
    assert not target_dir.exists()
    src_dir = tmp_path / "empty"
    src_dir.mkdir()
    assert list(src_dir.iterdir()) == []

    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.upload_dir(URL(src_dir.as_uri()), URL("storage:folder"))

    assert list(target_dir.iterdir()) == []


async def test_storage_upload_recursive_ok(
    storage_server: Any, make_client: _MakeClient, storage_path: Path
) -> None:
    """``upload_dir`` reproduces the ``nested`` data tree on storage."""
    target_dir = storage_path / "folder"
    target_dir.mkdir()

    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.upload_dir(
            URL(DATA_FOLDER.as_uri()) / "nested", URL("storage:folder")
        )
    diff = dircmp(DATA_FOLDER / "nested", target_dir)  # type: ignore
    assert not calc_diff(diff)  # type: ignore


async def test_storage_upload_recursive_slash_ending(
    storage_server: Any, make_client: _MakeClient, storage_path: Path
) -> None:
    """Same as the recursive upload, with a trailing slash on the target."""
    target_dir = storage_path / "folder"
    target_dir.mkdir()

    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.upload_dir(
            URL(DATA_FOLDER.as_uri()) / "nested", URL("storage:folder/")
        )
    diff = dircmp(DATA_FOLDER / "nested", target_dir)  # type: ignore
    assert not calc_diff(diff)  # type: ignore


async def test_storage_download_regular_file_to_absent_file(
    storage_server: Any, make_client: _MakeClient, tmp_path: Path, storage_path: Path
) -> None:
    """Downloading to a new local path copies bytes and reports progress."""
    src_file = DATA_FOLDER / "file.txt"
    storage_file = storage_path / "file.txt"
    storage_file.write_bytes(src_file.read_bytes())
    local_dir = tmp_path / "local"
    local_dir.mkdir()
    local_file = local_dir / "file.txt"
    progress = mock.Mock()

    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.download_file(
            URL("storage:file.txt"), URL(local_file.as_uri()), progress=progress
        )

    expected = src_file.read_bytes()
    downloaded = local_file.read_bytes()
    assert downloaded == expected

    src = URL("storage://user/file.txt")
    dst = URL(local_file.as_uri())
    file_size = src_file.stat().st_size
    progress.start.assert_called_with(StorageProgressStart(src, dst, file_size))
    progress.step.assert_called_with(
        StorageProgressStep(src, dst, file_size, file_size)
    )
    progress.complete.assert_called_with(StorageProgressComplete(src, dst, file_size))


async def test_storage_download_regular_file_to_existing_file(
    storage_server: Any, make_client: _MakeClient, tmp_path: Path, storage_path: Path
) -> None:
    """Downloading onto an existing local file overwrites its content."""
    src_file = DATA_FOLDER / "file.txt"
    storage_file = storage_path / "file.txt"
    storage_file.write_bytes(src_file.read_bytes())
    local_dir = tmp_path / "local"
    local_dir.mkdir()
    local_file = local_dir / "file.txt"
    local_file.write_bytes(b"Previous data")

    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.download_file(
            URL("storage:file.txt"), URL(local_file.as_uri())
        )

    expected = src_file.read_bytes()
    downloaded = local_file.read_bytes()
    assert downloaded == expected


async def test_storage_download_regular_file_to_dir(
    storage_server: Any, make_client: _MakeClient, tmp_path: Path, storage_path: Path
) -> None:
    """Downloading a file onto an existing local directory raises."""
    src_file = DATA_FOLDER / "file.txt"
    storage_file = storage_path / "file.txt"
    storage_file.write_bytes(src_file.read_bytes())
    local_dir = tmp_path / "local"
    local_dir.mkdir()

    async with make_client(storage_server.make_url("/")) as client:
        with pytest.raises((IsADirectoryError, PermissionError)):
            await client.storage.download_file(
                URL("storage:file.txt"), URL(local_dir.as_uri())
            )


async def test_storage_download_regular_file_to_dir_slash_ended(
    storage_server: Any, make_client: _MakeClient, tmp_path: Path, storage_path: Path
) -> None:
    """Same as the local-directory case, with a trailing slash on the target."""
    src_file = DATA_FOLDER / "file.txt"
    storage_file = storage_path / "file.txt"
    storage_file.write_bytes(src_file.read_bytes())
    local_dir = tmp_path / "local"
    local_dir.mkdir()

    async with make_client(storage_server.make_url("/")) as client:
        with pytest.raises((IsADirectoryError, PermissionError)):
            await client.storage.download_file(
                URL("storage:file.txt"), URL(local_dir.as_uri() + "/")
            )


async def test_storage_download_regular_file_to_non_file(
    storage_server: Any, make_client: _MakeClient, tmp_path: Path, storage_path: Path
) -> None:
    """Downloading to ``os.devnull`` completes without raising."""
    src_file = DATA_FOLDER / "file.txt"
    storage_file = storage_path / "file.txt"
    storage_file.write_bytes(src_file.read_bytes())

    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.download_file(
            URL("storage:file.txt"), URL(Path(os.devnull).absolute().as_uri())
        )


async def test_storage_download_empty_dir(
    storage_server: Any, make_client: _MakeClient, tmp_path: Path, storage_path: Path
) -> None:
    """Downloading an empty remote directory creates an empty local one."""
    storage_dir = storage_path / "folder"
    storage_dir.mkdir()
    assert list(storage_dir.iterdir()) == []
    target_dir = tmp_path / "empty"
    assert not target_dir.exists()

    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.download_dir(
            URL("storage:folder"), URL(target_dir.as_uri())
        )

    assert list(target_dir.iterdir()) == []


async def test_storage_download_dir(
    storage_server: Any, make_client: _MakeClient, tmp_path: Path, storage_path: Path
) -> None:
    """``download_dir`` reproduces the remote tree at a new local path."""
    storage_dir = storage_path / "folder"
    copytree(DATA_FOLDER / "nested", storage_dir)
    local_dir = tmp_path / "local"
    local_dir.mkdir()
    target_dir = local_dir / "nested"

    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.download_dir(
            URL("storage:folder"), URL(target_dir.as_uri())
        )

    diff = dircmp(DATA_FOLDER / "nested", target_dir)  # type: ignore
    assert not calc_diff(diff)  # type: ignore


async def test_storage_download_dir_slash_ending(
    storage_server: Any, make_client: _MakeClient, tmp_path: Path, storage_path: Path
) -> None:
    """``download_dir`` into an existing directory given with a trailing slash."""
    storage_dir = storage_path / "folder"
    copytree(DATA_FOLDER / "nested", storage_dir / "nested")
    local_dir = tmp_path / "local"
    local_dir.mkdir()

    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.download_dir(
            URL("storage:folder"), URL(local_dir.as_uri() + "/")
        )

    diff = dircmp(DATA_FOLDER / "nested", local_dir / "nested")  # type: ignore
    assert not calc_diff(diff)  # type: ignore


@pytest.fixture
def zero_time_threshold(monkeypatch: Any) -> None:
    """Patch ``neuromation.api.storage.TIME_THRESHOLD`` to ``0.0`` for a test."""
    monkeypatch.setattr(neuromation.api.storage, "TIME_THRESHOLD", 0.0)


async def test_storage_upload_file_update(
    storage_server: Any,
    make_client: _MakeClient,
    tmp_path: Path,
    storage_path: Path,
    zero_time_threshold: None,
) -> None:
    """``upload_file(update=True)`` across three states of the two copies.

    The first two uploads must transfer the local content; after the remote
    copy is rewritten later on, the third upload must leave it untouched
    (presumably because the remote copy is then the newer one).
    """
    storage_file = storage_path / "file.txt"
    local_file = tmp_path / "file.txt"
    local_file.write_bytes(b"old")

    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.upload_file(
            URL(local_file.as_uri()), URL("storage:file.txt"), update=True
        )
    assert storage_file.read_bytes() == b"old"

    local_file.write_bytes(b"new")
    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.upload_file(
            URL(local_file.as_uri()), URL("storage:file.txt"), update=True
        )
    assert storage_file.read_bytes() == b"new"

    # Wait so the rewritten remote copy gets a clearly later timestamp.
    await asyncio.sleep(5)
    storage_file.write_bytes(b"xxx")
    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.upload_file(
            URL(local_file.as_uri()), URL("storage:file.txt"), update=True
        )
    assert storage_file.read_bytes() == b"xxx"


async def test_storage_upload_dir_update(
    storage_server: Any,
    make_client: _MakeClient,
    tmp_path: Path,
    storage_path: Path,
    zero_time_threshold: None,
) -> None:
    """``upload_dir(update=True)`` across three states of a nested file.

    Mirrors the single-file update test, for a file inside a nested
    directory.
    """
    storage_file = storage_path / "folder" / "nested" / "file.txt"
    local_dir = tmp_path / "folder"
    local_file = local_dir / "nested" / "file.txt"
    local_file.parent.mkdir(parents=True)
    local_file.write_bytes(b"old")

    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.upload_dir(
            URL(local_dir.as_uri()), URL("storage:folder"), update=True
        )
    assert storage_file.read_bytes() == b"old"

    local_file.write_bytes(b"new")
    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.upload_dir(
            URL(local_dir.as_uri()), URL("storage:folder"), update=True
        )
    assert storage_file.read_bytes() == b"new"

    # Wait so the rewritten remote copy gets a clearly later timestamp.
    await asyncio.sleep(5)
    storage_file.write_bytes(b"xxx")
    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.upload_dir(
            URL(local_dir.as_uri()), URL("storage:folder"), update=True
        )
    assert storage_file.read_bytes() == b"xxx"


async def test_storage_download_file_update(
    storage_server: Any,
    make_client: _MakeClient,
    tmp_path: Path,
    storage_path: Path,
    zero_time_threshold: None,
) -> None:
    """``download_file(update=True)`` across three states of the two copies.

    The first two downloads must transfer the remote content; after the local
    copy is rewritten later on, the third download must leave it untouched
    (presumably because the local copy is then the newer one).
    """
    storage_file = storage_path / "file.txt"
    local_file = tmp_path / "file.txt"
    storage_file.write_bytes(b"old")

    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.download_file(
            URL("storage:file.txt"), URL(local_file.as_uri()), update=True
        )
    assert local_file.read_bytes() == b"old"

    storage_file.write_bytes(b"new")
    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.download_file(
            URL("storage:file.txt"), URL(local_file.as_uri()), update=True
        )
    assert local_file.read_bytes() == b"new"

    # Wait so the rewritten local copy gets a clearly later timestamp.
    await asyncio.sleep(2)
    local_file.write_bytes(b"xxx")
    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.download_file(
            URL("storage:file.txt"), URL(local_file.as_uri()), update=True
        )
    assert local_file.read_bytes() == b"xxx"


async def test_storage_download_dir_update(
    storage_server: Any,
    make_client: _MakeClient,
    tmp_path: Path,
    storage_path: Path,
    zero_time_threshold: None,
) -> None:
    """``download_dir(update=True)`` across three states of a nested file.

    Mirrors the single-file update test, for a file inside a nested
    directory.
    """
    storage_file = storage_path / "folder" / "nested" / "file.txt"
    local_dir = tmp_path / "folder"
    local_file = local_dir / "nested" / "file.txt"
    storage_file.parent.mkdir(parents=True)
    storage_file.write_bytes(b"old")

    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.download_dir(
            URL("storage:folder"), URL(local_dir.as_uri()), update=True
        )
    assert local_file.read_bytes() == b"old"

    storage_file.write_bytes(b"new")
    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.download_dir(
            URL("storage:folder"), URL(local_dir.as_uri()), update=True
        )
    assert local_file.read_bytes() == b"new"

    # Wait so the rewritten local copy gets a clearly later timestamp.
    await asyncio.sleep(2)
    local_file.write_bytes(b"xxx")
    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.download_dir(
            URL("storage:folder"), URL(local_dir.as_uri()), update=True
        )
    assert local_file.read_bytes() == b"xxx"
# Tests for the client-side parsers (volume strings, local/remote image
# references) and for the URL authority helper.
from typing import Callable

import pytest
from yarl import URL

from neuromation.api import Client, LocalImage, RemoteImage
from neuromation.api.parsing_utils import _get_url_authority


_MakeClient = Callable[..., Client]


@pytest.mark.parametrize(
    "volume",
    [
        "storage:///",
        ":",
        "::::",
        "",
        "storage:///data/:/data/rest:wrong",
    ],
)
async def test_volume_from_str_fail(volume: str, make_client: _MakeClient) -> None:
    """Each malformed volume specification must be rejected with ValueError."""
    async with make_client("https://example.com") as client:
        with pytest.raises(ValueError):
            client.parse.volume(volume)


async def test_parse_local(make_client: _MakeClient) -> None:
    """A ``name:tag`` string is parsed into a LocalImage."""
    expected = LocalImage("bananas", "latest")
    async with make_client("https://api.localhost.localdomain") as client:
        parsed = client.parse.local_image("bananas:latest")
    assert parsed == expected


async def test_parse_remote(make_client: _MakeClient) -> None:
    """An ``image://`` URI yields a RemoteImage with owner and registry set."""
    expected = RemoteImage(
        "bananas", "latest", owner="bob", registry="registry-dev.neu.ro"
    )
    async with make_client("https://api.localhost.localdomain") as client:
        parsed = client.parse.remote_image("image://bob/bananas:latest")
    assert parsed == expected


async def test_parse_remote_registry_image(make_client: _MakeClient) -> None:
    """A reference prefixed with the configured registry host is recognised."""
    expected = RemoteImage(
        "library/bananas", "latest", owner="bob", registry="localhost:5000"
    )
    async with make_client(
        "https://api.localhost.localdomain", registry_url="http://localhost:5000"
    ) as client:
        parsed = client.parse.remote_image("localhost:5000/bob/library/bananas:latest")
    assert parsed == expected


async def test_parse_remote_public(make_client: _MakeClient) -> None:
    """A bare public image name gets neither an owner nor a registry."""
    expected = RemoteImage("ubuntu", "latest", owner=None, registry=None)
    async with make_client(
        "https://api.localhost.localdomain", registry_url="http://localhost:5000"
    ) as client:
        parsed = client.parse.remote_image("ubuntu:latest")
    assert parsed == expected


def test_get_url_authority_with_explicit_port() -> None:
    """An explicitly written port is kept in the authority."""
    authority = _get_url_authority(URL("https://example.com:8080/"))
    assert authority == "example.com:8080"


def test_get_url_authority_with_implicit_port() -> None:
    """A port that is only implied by the scheme is not appended."""
    # For this URL yarl reports the scheme default, i.e. `url.port == 443`,
    # yet the authority must not contain it.
    authority = _get_url_authority(URL("https://example.com/"))
    assert authority == "example.com"


def test_get_url_authority_without_port() -> None:
    """An unknown scheme without a port yields the bare host."""
    # here `url.port is None`
    authority = _get_url_authority(URL("scheme://example.com/"))
    assert authority == "example.com"


def test_get_url_authority_without_host() -> None:
    """A URL with no host has no authority at all."""
    authority = _get_url_authority(URL("scheme://"))
    assert authority is None
# Tests for the login machinery: AuthCode, _AuthToken, the local callback
# server, the token client, config objects and both negotiators.
import asyncio
from typing import AsyncIterator, Optional
from unittest import mock

import aiohttp
import pytest
from aiohttp import ClientSession
from aiohttp.test_utils import unused_port
from aiohttp.web import (
    Application,
    HTTPBadRequest,
    HTTPForbidden,
    HTTPFound,
    HTTPOk,
    HTTPUnauthorized,
    Request,
    Response,
    json_response,
)
from yarl import URL

from neuromation.api import Preset
from neuromation.api.login import (
    AuthCode,
    AuthException,
    AuthNegotiator,
    AuthTokenClient,
    HeadlessNegotiator,
    _AuthConfig,
    _AuthToken,
    _ClusterConfig,
    create_app_server,
    create_app_server_once,
    create_auth_code_app,
)
from tests import _TestServerFactory


class TestAuthCode:
    """Behaviour of ``AuthCode.wait`` for each way the code can be resolved."""

    async def test_wait_timed_out(self) -> None:
        """A zero timeout on an unset code raises AuthException."""
        code = AuthCode()
        with pytest.raises(AuthException, match="failed to get an authorization code"):
            await code.wait(timeout_s=0.0)

    async def test_wait_cancelled(self) -> None:
        """Waiting on a cancelled code raises AuthException."""
        code = AuthCode()
        code.cancel()
        with pytest.raises(AuthException, match="failed to get an authorization code"):
            await code.wait()

    async def test_wait_exception(self) -> None:
        """An exception stored in the code is re-raised by ``wait``."""
        code = AuthCode()
        code.set_exception(AuthException("testerror"))
        with pytest.raises(AuthException, match="testerror"):
            await code.wait()

    async def test_wait(self) -> None:
        """A stored value is returned by ``wait``."""
        code = AuthCode()
        code.set_value("testcode")
        value = await code.wait()
        assert value == "testcode"


class TestAuthToken:
    """Expiration bookkeeping of ``_AuthToken.create``."""

    def test_is_not_expired(self) -> None:
        """A token with a positive lifetime is not expired at creation time."""
        token = _AuthToken.create(
            token="test_token",
            expires_in=100,
            refresh_token="test_refresh_token",
            time_factory=lambda: 2000.0,
        )
        assert token.token == "test_token"
        # Expected expiration is earlier than 2000 + 100: the token is treated
        # as expiring before its nominal lifetime fully elapses.
        assert token.expiration_time == 2075
        assert not token.is_expired
        assert token.refresh_token == "test_refresh_token"

    def test_is_expired(self) -> None:
        """A token with a zero lifetime is expired immediately."""
        token = _AuthToken.create(
            token="test_token",
            expires_in=0,
            refresh_token="test_refresh_token",
            time_factory=lambda: 2000.0,
        )
        assert token.token == "test_token"
        assert token.expiration_time == 2000
        assert token.is_expired
        assert token.refresh_token == "test_refresh_token"


class TestAuthCodeApp:
    """The local callback application and the servers that host it."""

    @pytest.fixture
    async def client(
        self, loop: asyncio.AbstractEventLoop
    ) -> AsyncIterator[ClientSession]:
        """Provide an HTTP client session that is closed after the test."""
        async with ClientSession() as client:
            yield client

    async def assert_code_callback_success(
        self,
        code: AuthCode,
        client: ClientSession,
        url: URL,
        redirect_url: Optional[URL] = None,
    ) -> None:
        """Hit the callback with a code and verify response and stored value.

        With ``redirect_url`` the callback must answer with a 302 pointing to
        it; otherwise it must answer 200 with the body ``OK``.
        """
        async with client.get(
            url, params={"code": "testcode"}, allow_redirects=False
        ) as resp:
            if redirect_url:
                assert resp.status == HTTPFound.status_code
                assert resp.headers["Location"] == str(redirect_url)
            else:
                assert resp.status == HTTPOk.status_code
                text = await resp.text()
                assert text == "OK"

        assert await code.wait() == "testcode"

    async def test_create_app_server_once(self, client: ClientSession) -> None:
        """The single-port server serves the callback on the requested port."""
        code = AuthCode()
        app = create_auth_code_app(code)

        port = unused_port()
        async with create_app_server_once(app, host="127.0.0.1", port=port) as url:
            assert url == URL(f"http://127.0.0.1:{port}")
            await self.assert_code_callback_success(code, client, url)

    async def test_create_app_server_redirect(self, client: ClientSession) -> None:
        """With a redirect URL configured the callback answers with a redirect."""
        code = AuthCode()
        redirect_url = URL("https://redirect.url")
        app = create_auth_code_app(code, redirect_url=redirect_url)

        port = unused_port()
        async with create_app_server_once(app, host="127.0.0.1", port=port) as url:
            assert url == URL(f"http://127.0.0.1:{port}")
            await self.assert_code_callback_success(
                code, client, url, redirect_url=redirect_url
            )

    async def test_create_app_server_once_failure(self, client: ClientSession) -> None:
        """A callback request without ``code`` is a 400 and fails the wait."""
        code = AuthCode()
        app = create_auth_code_app(code)

        port = unused_port()
        async with create_app_server_once(app, host="127.0.0.1", port=port) as url:
            assert url == URL(f"http://127.0.0.1:{port}")

            async with client.get(url) as resp:
                assert resp.status == HTTPBadRequest.status_code
                text = await resp.text()
                assert text == "The 'code' query parameter is missing."

            with pytest.raises(
                AuthException, match="failed to get an authorization code"
            ):
                await code.wait()

    async def test_error_unauthorized(self, client: ClientSession) -> None:
        """``error=unauthorized`` maps to 401 and its description is raised."""
        code = AuthCode()
        app = create_auth_code_app(code)

        port = unused_port()
        async with create_app_server_once(app, host="127.0.0.1", port=port) as url:
            assert url == URL(f"http://127.0.0.1:{port}")

            async with client.get(
                url,
                params={
                    "error": "unauthorized",
                    "error_description": "Test Unauthorized",
                },
            ) as resp:
                assert resp.status == HTTPUnauthorized.status_code
                text = await resp.text()
                assert text == "Test Unauthorized"

            with pytest.raises(AuthException, match="Test Unauthorized"):
                await code.wait()

    async def test_error_access_denied(self, client: ClientSession) -> None:
        """``error=access_denied`` maps to 403 and its description is raised."""
        code = AuthCode()
        app = create_auth_code_app(code)

        port = unused_port()
        async with create_app_server_once(app, host="127.0.0.1", port=port) as url:
            assert url == URL(f"http://127.0.0.1:{port}")

            async with client.get(
                url,
                params={
                    "error": "access_denied",
                    "error_description": "Test Access Denied",
                },
            ) as resp:
                assert resp.status == HTTPForbidden.status_code
                text = await resp.text()
                assert text == "Test Access Denied"

            with pytest.raises(AuthException, match="Test Access Denied"):
                await code.wait()

    async def test_error_other(self, client: ClientSession) -> None:
        """Any other ``error`` value maps to 400 and its description is raised."""
        code = AuthCode()
        app = create_auth_code_app(code)

        port = unused_port()
        async with create_app_server_once(app, host="127.0.0.1", port=port) as url:
            assert url == URL(f"http://127.0.0.1:{port}")

            async with client.get(
                url, params={"error": "other", "error_description": "Test Other"}
            ) as resp:
                assert resp.status == HTTPBadRequest.status_code
                text = await resp.text()
                assert text == "Test Other"

            with pytest.raises(AuthException, match="Test Other"):
                await code.wait()

    async def test_create_app_server(self, client: ClientSession) -> None:
        """The multi-port server binds the only port offered to it."""
        code = AuthCode()
        app = create_auth_code_app(code)

        port = unused_port()
        async with create_app_server(app, host="127.0.0.1", ports=[port]) as url:
            assert url == URL(f"http://127.0.0.1:{port}")
            await self.assert_code_callback_success(code, client, url)

    async def test_create_app_server_no_ports(self) -> None:
        """If every candidate port is busy, RuntimeError is raised."""
        code = AuthCode()
        app = create_auth_code_app(code)

        port = unused_port()
        # Occupy the only candidate port first, then try to bind it again.
        async with create_app_server_once(app, host="127.0.0.1", port=port):
            with pytest.raises(RuntimeError, match="No free ports."):
                async with create_app_server(app, ports=[port]):
                    pass

    async def test_create_app_server_port_conflict(self, client: ClientSession) -> None:
        """When the first port is taken, the next candidate is used."""
        code = AuthCode()
        app = create_auth_code_app(code)

        outer_port = unused_port()
        inner_port = unused_port()
        async with create_app_server(app, ports=[outer_port, inner_port]) as url:
            assert url == URL(f"http://127.0.0.1:{outer_port}")
            # The outer server holds `outer_port`, so the second server is
            # expected to end up on `inner_port`.
            async with create_app_server(app, ports=[outer_port, inner_port]) as url:
                assert url == URL(f"http://127.0.0.1:{inner_port}")
                await self.assert_code_callback_success(code, client, url)


class _TestAuthHandler:
    """Fake authorization server handlers used by the fixtures below."""

    def __init__(self, client_id: str) -> None:
        self._client_id = client_id

        self._code = "test_code"
        self._token = "test_access_token"
        self._token_refreshed = "test_access_token_refreshed"
        self._refresh_token = "test_refresh_token"
        self._token_expires_in = 1234

    async def handle_authorize(self, request: Request) -> Response:
        """Redirect to the requested ``redirect_uri`` with the canned code."""
        # TODO: assert query
        url = URL(request.query["redirect_uri"]).with_query(code=self._code)
        raise HTTPFound(url)

    async def handle_token(self, request: Request) -> Response:
        """Validate the token request payload and return a canned token.

        ``authorization_code`` grants get the initial access token together
        with a refresh token; any other grant is validated as a
        ``refresh_token`` grant and gets the refreshed access token only.
        """
        payload = await request.json()
        grant_type = payload["grant_type"]
        if grant_type == "authorization_code":
            assert payload == dict(
                grant_type="authorization_code",
                code_verifier=mock.ANY,
                code=self._code,
                client_id=self._client_id,
                redirect_uri=mock.ANY,
            )
            resp_payload = dict(
                access_token=self._token,
                expires_in=self._token_expires_in,
                refresh_token=self._refresh_token,
            )
        else:
            assert payload == dict(
                grant_type="refresh_token",
                refresh_token=self._refresh_token,
                client_id=self._client_id,
            )
            resp_payload = dict(
                access_token=self._token_refreshed, expires_in=self._token_expires_in
            )
        return json_response(resp_payload)


@pytest.fixture
def auth_client_id() -> str:
    """Client id shared by the fake auth server and the tests."""
    return "test_client_id"


@pytest.fixture
async def auth_server(
    auth_client_id: str, aiohttp_server: _TestServerFactory
) -> AsyncIterator[URL]:
    """Start the fake authorization server and yield its root URL."""
    handler = _TestAuthHandler(client_id=auth_client_id)
    app = Application()
    app.router.add_get("/authorize", handler.handle_authorize)
    app.router.add_post("/oauth/token", handler.handle_token)

    server = await aiohttp_server(app)
    yield server.make_url("/")


@pytest.fixture
async def auth_config(
    auth_client_id: str, auth_server: URL
) -> AsyncIterator[_AuthConfig]:
    """Build an auth config pointing at the fake authorization server."""
    port = unused_port()
    yield _AuthConfig.create(
        auth_url=auth_server / "authorize",
        token_url=auth_server / "oauth/token",
        client_id=auth_client_id,
        audience="https://platform.dev.neuromation.io",
        headless_callback_url=URL("https://dev.neu.ro/oauth/show-code"),
        callback_urls=[URL(f"http://127.0.0.1:{port}")],
    )


class TestTokenClient:
    """``AuthTokenClient`` against the fake token endpoint."""

    async def test_request(self, auth_client_id: str, auth_config: _AuthConfig) -> None:
        """Exchanging an authorization code yields the initial token."""
        code = AuthCode()
        code.set_value("test_code")
        code.callback_url = auth_config.callback_urls[0]

        async with aiohttp.ClientSession() as session:
            async with AuthTokenClient(
                session, auth_config.token_url, client_id=auth_client_id
            ) as client:
                token = await client.request(code)
                assert token.token == "test_access_token"
                assert token.refresh_token == "test_refresh_token"
                assert not token.is_expired

    async def test_refresh(self, auth_client_id: str, auth_config: _AuthConfig) -> None:
        """Refreshing yields a new access token and keeps the refresh token."""
        token = _AuthToken.create(
            token="test_access_token",
            expires_in=1234,
            refresh_token="test_refresh_token",
        )

        async with aiohttp.ClientSession() as session:
            async with AuthTokenClient(
                session, auth_config.token_url, client_id=auth_client_id
            ) as client:
                new_token = await client.refresh(token)
                assert new_token.token == "test_access_token_refreshed"
                assert new_token.refresh_token == "test_refresh_token"
                # Check the refreshed token, not the input one: the input was
                # created valid above, so asserting on it proves nothing.
                assert not new_token.is_expired

    async def test_forbidden(
        self, aiohttp_server: _TestServerFactory, auth_config: _AuthConfig
    ) -> None:
        """A 403 from the token endpoint surfaces as AuthException."""
        code = AuthCode()
        code.callback_url = auth_config.callback_urls[0]
        code.set_value("testcode")

        client_id = "test_client_id"

        async def handle_token(request: Request) -> Response:
            raise HTTPForbidden()

        app = Application()
        app.router.add_post("/oauth/token", handle_token)

        server = await aiohttp_server(app)
        url = server.make_url("/oauth/token")

        # Built outside `pytest.raises` so that only the network call can
        # satisfy the expected exception.
        token = _AuthToken.create(
            token="test_token",
            expires_in=1234,
            refresh_token="test_refresh_token",
        )

        async with aiohttp.ClientSession() as session:
            async with AuthTokenClient(session, url, client_id=client_id) as client:
                with pytest.raises(
                    AuthException, match="failed to get an access token."
                ):
                    await client.request(code)

                with pytest.raises(
                    AuthException, match="failed to get an access token."
                ):
                    await client.refresh(token)


class TestAuthConfig:
    """Which missing fields make ``_AuthConfig.is_initialized`` return False."""

    def test_is_initialized__no_auth_url(self) -> None:
        """An empty ``auth_url`` means the config is not initialized."""
        auth_config = _AuthConfig(
            auth_url=URL(),
            token_url=URL("url"),
            client_id="client_id",
            audience="audience",
            headless_callback_url=URL("https://dev.neu.ro/oauth/show-code"),
            callback_urls=(URL("url1"), URL("url2")),
            success_redirect_url=URL("url"),
        )
        assert auth_config.is_initialized() is False

    def test_is_initialized__no_token_url(self) -> None:
        """An empty ``token_url`` means the config is not initialized."""
        auth_config = _AuthConfig(
            auth_url=URL("url"),
            token_url=URL(),
            client_id="client_id",
            audience="audience",
            headless_callback_url=URL("https://dev.neu.ro/oauth/show-code"),
            callback_urls=(URL("url1"), URL("url2")),
            success_redirect_url=URL("url"),
        )
        assert auth_config.is_initialized() is False

    def test_is_initialized__no_client_id(self) -> None:
        """An empty ``client_id`` means the config is not initialized."""
        auth_config = _AuthConfig(
            auth_url=URL("url"),
            token_url=URL("url"),
            client_id="",
            audience="audience",
            headless_callback_url=URL("https://dev.neu.ro/oauth/show-code"),
            callback_urls=(URL("url1"), URL("url2")),
            success_redirect_url=URL("url"),
        )
        assert auth_config.is_initialized() is False

    def test_is_initialized__no_audience(self) -> None:
        """An empty ``audience`` means the config is not initialized."""
        auth_config = _AuthConfig(
            auth_url=URL("url"),
            token_url=URL("url"),
            client_id="client_id",
            audience="",
            headless_callback_url=URL("https://dev.neu.ro/oauth/show-code"),
            callback_urls=(URL("url1"), URL("url2")),
            success_redirect_url=URL("url"),
        )
        assert auth_config.is_initialized() is False

    def test_is_initialized__no_callback_urls(self) -> None:
        """Empty ``callback_urls`` still counts as initialized."""
        auth_config = _AuthConfig(
            auth_url=URL("url"),
            token_url=URL("url"),
            client_id="client_id",
            audience="audience",
            headless_callback_url=URL("https://dev.neu.ro/oauth/show-code"),
            callback_urls=[],
            success_redirect_url=URL("url"),
        )
        assert auth_config.is_initialized() is True

    def test_is_initialized__no_success_redirect_url(self) -> None:
        """A missing ``success_redirect_url`` still counts as initialized."""
        auth_config = _AuthConfig(
            auth_url=URL("url"),
            token_url=URL("url"),
            client_id="client_id",
            audience="audience",
            headless_callback_url=URL("https://dev.neu.ro/oauth/show-code"),
            callback_urls=(URL("url1"), URL("url2")),
            success_redirect_url=None,
        )
        assert auth_config.is_initialized() is True

    def test_is_initialized__no_headless_callback_url(self) -> None:
        """An empty ``headless_callback_url`` means not initialized."""
        auth_config = _AuthConfig(
            auth_url=URL("url"),
            token_url=URL("url"),
            client_id="client_id",
            audience="audience",
            headless_callback_url=URL(),
            callback_urls=(URL("url1"), URL("url2")),
            success_redirect_url=None,
        )
        assert auth_config.is_initialized() is False


class TestClusterConfig:
    """Which missing fields make ``_ClusterConfig.is_initialized`` return False."""

    def test_is_initialized(self) -> None:
        """A fully populated cluster config is initialized."""
        cluster_config = _ClusterConfig.create(
            registry_url=URL("value"),
            storage_url=URL("value"),
            users_url=URL("value"),
            monitoring_url=URL("value"),
            resource_presets={"default": Preset(cpu=1, memory_mb=2 * 1024)},
        )
        assert cluster_config.is_initialized() is True

    def test_is_initialized__no_registry_url(self) -> None:
        """An empty ``registry_url`` means the config is not initialized."""
        cluster_config = _ClusterConfig.create(
            registry_url=URL(),
            storage_url=URL("value"),
            users_url=URL("value"),
            monitoring_url=URL("value"),
            resource_presets={"default": Preset(cpu=1, memory_mb=2 * 1024)},
        )
        assert cluster_config.is_initialized() is False

    def test_is_initialized__no_storage_url(self) -> None:
        """An empty ``storage_url`` means the config is not initialized."""
        cluster_config = _ClusterConfig.create(
            registry_url=URL("value"),
            storage_url=URL(),
            users_url=URL("value"),
            monitoring_url=URL("value"),
            resource_presets={"default": Preset(cpu=1, memory_mb=2 * 1024)},
        )
        assert cluster_config.is_initialized() is False

    def test_is_initialized__no_users_url(self) -> None:
        """An empty ``users_url`` means the config is not initialized."""
        cluster_config = _ClusterConfig.create(
            registry_url=URL("value"),
            storage_url=URL("value"),
            users_url=URL(),
            monitoring_url=URL("value"),
            resource_presets={"default": Preset(cpu=1, memory_mb=2 * 1024)},
        )
        assert cluster_config.is_initialized() is False

    def test_is_initialized__no_monitoring_url(self) -> None:
        """An empty ``monitoring_url`` means the config is not initialized."""
        cluster_config = _ClusterConfig.create(
            registry_url=URL("value"),
            storage_url=URL("value"),
            users_url=URL("value"),
            monitoring_url=URL(),
            resource_presets={"default": Preset(cpu=1, memory_mb=2 * 1024)},
        )
        assert cluster_config.is_initialized() is False

    def test_is_initialized__no_resource_presets(self) -> None:
        """Empty ``resource_presets`` means the config is not initialized."""
        cluster_config = _ClusterConfig.create(
            registry_url=URL("value"),
            storage_url=URL("value"),
            users_url=URL("value"),
            monitoring_url=URL("value"),
            resource_presets={},
        )
        assert cluster_config.is_initialized() is False


class TestAuthNegotiator:
    """Browser-based negotiation against the fake authorization server."""

    async def show_dummy_browser(self, url: URL) -> None:
        """Stand in for a browser: fetch ``url`` and follow its redirects."""
        async with ClientSession() as client:
            # Use the response as a context manager so it is released
            # explicitly instead of being left for the session teardown.
            async with client.get(url, allow_redirects=True):
                pass

    async def test_get_code(self, auth_config: _AuthConfig) -> None:
        """The negotiator obtains the code through the local callback URL."""
        async with aiohttp.ClientSession() as session:
            negotiator = AuthNegotiator(
                session, config=auth_config, show_browser_cb=self.show_dummy_browser
            )
            code = await negotiator.get_code()
            assert await code.wait() == "test_code"
            assert code.callback_url == auth_config.callback_urls[0]

    async def test_get_token(self, auth_config: _AuthConfig) -> None:
        """Without an existing token a full negotiation is performed."""
        async with aiohttp.ClientSession() as session:
            negotiator = AuthNegotiator(
                session, config=auth_config, show_browser_cb=self.show_dummy_browser
            )
            token = await negotiator.refresh_token(token=None)
            assert token.token == "test_access_token"
            assert token.refresh_token == "test_refresh_token"

    async def test_refresh_token_noop(self, auth_config: _AuthConfig) -> None:
        """A token that has not expired is returned unchanged."""
        async with aiohttp.ClientSession() as session:
            negotiator = AuthNegotiator(
                session, config=auth_config, show_browser_cb=self.show_dummy_browser
            )
            token = await negotiator.refresh_token(token=None)
            assert token.token == "test_access_token"
            assert token.refresh_token == "test_refresh_token"
            assert not token.is_expired

            token = await negotiator.refresh_token(token=token)
            assert token.token == "test_access_token"
            assert token.refresh_token == "test_refresh_token"

    async def test_refresh_token(self, auth_config: _AuthConfig) -> None:
        """An expired token is exchanged for the refreshed access token."""
        async with aiohttp.ClientSession() as session:
            negotiator = AuthNegotiator(
                session, config=auth_config, show_browser_cb=self.show_dummy_browser
            )
            token = await negotiator.refresh_token(token=None)
            assert token.token == "test_access_token"
            assert token.refresh_token == "test_refresh_token"
            assert not token.is_expired

            # Re-create the same token with a zero lifetime to force a refresh.
            token = _AuthToken.create(
                token=token.token, expires_in=0, refresh_token=token.refresh_token
            )
            token = await negotiator.refresh_token(token=token)
            assert token.token == "test_access_token_refreshed"
            assert token.refresh_token == "test_refresh_token"


class TestHeadlessNegotiator:
    """Negotiation where the user supplies the code through a callback."""

    async def test_get_code(self, auth_config: _AuthConfig) -> None:
        """The callback receives the authorize URL and its code is used."""

        async def get_auth_code_cb(url: URL) -> str:
            assert url.with_query(None) == auth_config.auth_url

            assert dict(url.query) == dict(
                response_type="code",
                code_challenge=mock.ANY,
                code_challenge_method="S256",
                client_id="test_client_id",
                redirect_uri="https://dev.neu.ro/oauth/show-code",
                scope="offline_access",
                audience="https://platform.dev.neuromation.io",
            )
            return "test_code"

        async with aiohttp.ClientSession() as session:
            negotiator = HeadlessNegotiator(
                session, config=auth_config, get_auth_code_cb=get_auth_code_cb
            )
            code = await negotiator.get_code()
            assert await code.wait() == "test_code"

    async def test_get_code_raises(self, auth_config: _AuthConfig) -> None:
        """An error raised by the callback propagates out of ``get_code``."""

        async def get_auth_code_cb(url: URL) -> str:
            raise RuntimeError("callback error")

        async with aiohttp.ClientSession() as session:
            negotiator = HeadlessNegotiator(
                session, config=auth_config, get_auth_code_cb=get_auth_code_cb
            )
            with pytest.raises(RuntimeError, match="callback error"):
                await negotiator.get_code()
import json from typing import Any, Callable, Dict, List, Optional import pytest from aiodocker.exceptions import DockerError from aiohttp import web from yarl import URL from neuromation.api import ( Client, Container, HTTPPort, JobStatus, JobTelemetry, RemoteImage, ResourceNotFound, Resources, Volume, ) from neuromation.api.jobs import INVALID_IMAGE_NAME, _job_description_from_api from neuromation.api.parsing_utils import _ImageNameParser from tests import _TestServerFactory _MakeClient = Callable[..., Client] def test_resources_default() -> None: resources = Resources(16, 0.5) assert resources.memory_mb == 16 assert resources.cpu == 0.5 assert resources.gpu is None assert resources.gpu_model is None assert resources.shm is True assert resources.tpu_type is None assert resources.tpu_software_version is None async def test_jobs_monitor( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: async def log_stream(request: web.Request) -> web.StreamResponse: assert request.headers["Accept-Encoding"] == "identity" resp = web.StreamResponse() resp.enable_chunked_encoding() resp.enable_compression(web.ContentCoding.identity) await resp.prepare(request) for i in range(10): await resp.write(b"chunk " + str(i).encode("ascii") + b"\n") return resp app = web.Application() app.router.add_get("/jobs/job-id/log", log_stream) srv = await aiohttp_server(app) lst = [] async with make_client(srv.make_url("/")) as client: async for data in client.jobs.monitor("job-id"): lst.append(data) assert b"".join(lst) == b"".join( [ b"chunk 0\n", b"chunk 1\n", b"chunk 2\n", b"chunk 3\n", b"chunk 4\n", b"chunk 5\n", b"chunk 6\n", b"chunk 7\n", b"chunk 8\n", b"chunk 9\n", ] ) async def test_monitor_notexistent_job( aiohttp_server: Any, make_client: _MakeClient ) -> None: async def handler(request: web.Request) -> web.Response: raise web.HTTPNotFound() app = web.Application() app.router.add_get("/jobs/job-id/log", handler) srv = await aiohttp_server(app) lst = [] async with 
make_client(srv.make_url("/")) as client: with pytest.raises(ResourceNotFound): async for data in client.jobs.monitor("job-id"): lst.append(data) assert lst == [] async def test_job_top( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: def get_data_chunk(index: int) -> Dict[str, Any]: return { "cpu": 0.5, "memory": 50, "timestamp": index, "gpu_duty_cycle": 50, "gpu_memory": 55.6, } def get_job_telemetry(index: int) -> JobTelemetry: return JobTelemetry( cpu=0.5, memory=50, timestamp=index, gpu_duty_cycle=50, gpu_memory=55.6 ) async def top_stream(request: web.Request) -> web.WebSocketResponse: ws = web.WebSocketResponse() await ws.prepare(request) for i in range(10): await ws.send_json(get_data_chunk(i)) await ws.close() return ws app = web.Application() app.router.add_get("/jobs/job-id/top", top_stream) srv = await aiohttp_server(app) lst = [] async with make_client(srv.make_url("/")) as client: async for data in client.jobs.top("job-id"): lst.append(data) assert lst == [get_job_telemetry(i) for i in range(10)] async def test_top_finished_job( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: async def handler(request: web.Request) -> web.WebSocketResponse: ws = web.WebSocketResponse() await ws.prepare(request) await ws.close() return ws app = web.Application() app.router.add_get("/jobs/job-id/top", handler) srv = await aiohttp_server(app) lst = [] async with make_client(srv.make_url("/")) as client: with pytest.raises(ValueError, match="not running"): async for data in client.jobs.top("job-id"): lst.append(data) assert lst == [] async def test_top_nonexisting_job( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: async def handler(request: web.Request) -> web.Response: raise web.HTTPBadRequest() app = web.Application() app.router.add_get("/jobs/job-id/top", handler) srv = await aiohttp_server(app) lst = [] async with make_client(srv.make_url("/")) as client: with pytest.raises(ValueError, 
match="not found"): async for data in client.jobs.top("job-id"): lst.append(data) assert lst == [] async def test_kill_not_found_error( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: async def handler(request: web.Request) -> web.Response: raise web.HTTPNotFound() app = web.Application() app.router.add_delete("/jobs/job-id", handler) srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: with pytest.raises(ResourceNotFound): await client.jobs.kill("job-id") async def test_kill_ok( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: async def handler(request: web.Request) -> web.Response: raise web.HTTPNoContent() app = web.Application() app.router.add_delete("/jobs/job-id", handler) srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: ret = await client.jobs.kill("job-id") assert ret is None async def test_save_image_not_in_neuro_registry(make_client: _MakeClient) -> None: async with make_client("http://whatever") as client: image = RemoteImage(name="ubuntu") with pytest.raises(ValueError, match="must be in the neuromation registry"): await client.jobs.save("job-id", image) async def test_save_ok( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: JSON = [ { "status": "CommitStarted", "details": {"container": "cont_id", "image": f"ubuntu:latest"}, }, {"status": "CommitFinished"}, {"status": "The push refers to repository [localhost:5000/alpine]"}, {"status": "Preparing", "progressDetail": {}, "id": "a31dbd3063d7"}, { "status": "Pushing", "progressDetail": {"current": 3584}, "progress": " 3.584kB", "id": "0acd017a4b67", }, {"status": "Pushed", "progressDetail": {}, "id": "0acd017a4b67"}, {"status": "job-id: digest: sha256:DIGEST size: 1359"}, { "progressDetail": {}, "aux": {"Tag": "job-id", "Digest": "sha256:DIGEST", "Size": 1359}, }, ] async def handler(request: web.Request) -> web.StreamResponse: assert "b3" in request.headers encoding = 
"utf-8" response = web.StreamResponse(status=200) response.enable_compression(web.ContentCoding.identity) response.content_type = "application/x-ndjson" response.charset = encoding await response.prepare(request) for chunk in JSON: chunk_str = json.dumps(chunk) + "\r\n" await response.write(chunk_str.encode(encoding)) return response app = web.Application() app.router.add_post("/jobs/job-id/save", handler) srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: image = RemoteImage(registry="gcr.io", owner="me", name="img") await client.jobs.save("job-id", image) async def test_save_commit_started_invalid_status_fails( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: invalid = "invalid status" JSON = [ {"status": invalid, "details": {"container": "cnt", "image": "img"}}, {"status": "CommitFinished"}, {"status": "The push refers to repository [localhost:5000/alpine]"}, ] async def handler(request: web.Request) -> web.StreamResponse: assert "b3" in request.headers encoding = "utf-8" response = web.StreamResponse(status=200) response.enable_compression(web.ContentCoding.identity) response.content_type = "application/x-ndjson" response.charset = encoding await response.prepare(request) for chunk in JSON: chunk_str = json.dumps(chunk) + "\r\n" await response.write(chunk_str.encode(encoding)) return response app = web.Application() app.router.add_post("/jobs/job-id/save", handler) srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: image = RemoteImage(registry="gcr.io", owner="me", name="img") with pytest.raises( DockerError, match=f"Invalid commit status: '{invalid}', expecting: 'CommitStarted'", ): await client.jobs.save("job-id", image) async def test_save_commit_started_missing_image_details_fails( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: JSON = [ {"status": "CommitStarted", "details": {"container": "cnt"}}, {"status": "CommitFinished"}, {"status": "The 
push refers to repository [localhost:5000/alpine]"}, ] async def handler(request: web.Request) -> web.StreamResponse: assert "b3" in request.headers encoding = "utf-8" response = web.StreamResponse(status=200) response.enable_compression(web.ContentCoding.identity) response.content_type = "application/x-ndjson" response.charset = encoding await response.prepare(request) for chunk in JSON: chunk_str = json.dumps(chunk) + "\r\n" await response.write(chunk_str.encode(encoding)) return response app = web.Application() app.router.add_post("/jobs/job-id/save", handler) srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: image = RemoteImage(registry="gcr.io", owner="me", name="img") with pytest.raises(DockerError, match="Missing required details: 'image'"): await client.jobs.save("job-id", image) async def test_save_commit_finished_invalid_status_fails( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: invalid = "invalid status" JSON = [ {"status": "CommitStarted", "details": {"container": "cnt", "image": "img"}}, {"status": invalid}, {"status": "The push refers to repository [localhost:5000/alpine]"}, ] async def handler(request: web.Request) -> web.StreamResponse: encoding = "utf-8" response = web.StreamResponse(status=200) response.enable_compression(web.ContentCoding.identity) response.content_type = "application/x-ndjson" response.charset = encoding await response.prepare(request) for chunk in JSON: chunk_str = json.dumps(chunk) + "\r\n" await response.write(chunk_str.encode(encoding)) return response app = web.Application() app.router.add_post("/jobs/job-id/save", handler) srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: image = RemoteImage(registry="gcr.io", owner="me", name="img") with pytest.raises( DockerError, match=(f"Invalid commit status: '{invalid}', expecting: 'CommitFinished'"), ): await client.jobs.save("job-id", image) async def 
test_save_commit_started_missing_status_fails( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: JSON = [ {"not-a-status": "value"}, {"status": "CommitFinished"}, {"status": "The push refers to repository [localhost:5000/alpine]"}, ] async def handler(request: web.Request) -> web.StreamResponse: encoding = "utf-8" response = web.StreamResponse(status=200) response.enable_compression(web.ContentCoding.identity) response.content_type = "application/x-ndjson" response.charset = encoding await response.prepare(request) for chunk in JSON: chunk_str = json.dumps(chunk) + "\r\n" await response.write(chunk_str.encode(encoding)) return response app = web.Application() app.router.add_post("/jobs/job-id/save", handler) srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: image = RemoteImage(registry="gcr.io", owner="me", name="img") with pytest.raises(DockerError, match='Missing required field: "status"'): await client.jobs.save("job-id", image) async def test_save_commit_finished_missing_status_fails( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: JSON = [ {"status": "CommitStarted", "details": {"container": "cnt", "image": "img"}}, {"not-a-status": "value"}, {"status": "The push refers to repository [localhost:5000/alpine]"}, ] async def handler(request: web.Request) -> web.StreamResponse: encoding = "utf-8" response = web.StreamResponse(status=200) response.enable_compression(web.ContentCoding.identity) response.content_type = "application/x-ndjson" response.charset = encoding await response.prepare(request) for chunk in JSON: chunk_str = json.dumps(chunk) + "\r\n" await response.write(chunk_str.encode(encoding)) return response app = web.Application() app.router.add_post("/jobs/job-id/save", handler) srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: image = RemoteImage(registry="gcr.io", owner="me", name="img") with pytest.raises(DockerError, match='Missing 
required field: "status"'): await client.jobs.save("job-id", image) async def test_status_failed( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: JSON = { "status": "failed", "id": "job-id", "description": "This is job description, not a history description", "http_url": "http://my_host:8889", "ssh_server": "ssh://my_host.ssh:22", "ssh_auth_server": "ssh://my_host.ssh:22", "history": { "created_at": "2018-08-29T12:23:13.981621+00:00", "started_at": "2018-08-29T12:23:15.988054+00:00", "finished_at": "2018-08-29T12:59:31.427795+00:00", "reason": "ContainerCannotRun", "description": "Not enough coffee", }, "is_preemptible": True, "owner": "owner", "cluster_name": "default", "container": { "image": "submit-image-name", "command": "submit-command", "http": {"port": 8181}, "resources": { "memory_mb": "4096", "cpu": 7.0, "shm": True, "gpu": 1, "gpu_model": "test-gpu-model", }, "volumes": [ { "src_storage_uri": "storage://test-user/path_read_only", "dst_path": "/container/read_only", "read_only": True, }, { "src_storage_uri": "storage://test-user/path_read_write", "dst_path": "/container/path_read_write", "read_only": False, }, ], }, } async def handler(request: web.Request) -> web.Response: return web.json_response(JSON) app = web.Application() app.router.add_get("/jobs/job-id", handler) srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: ret = await client.jobs.status("job-id") parser = _ImageNameParser( client._config.auth_token.username, client._config.cluster_config.registry_url ) assert ret == _job_description_from_api(JSON, parser) async def test_status_with_ssh_and_http( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: JSON = { "status": "running", "id": "job-id", "description": "This is job description, not a history description", "http_url": "http://my_host:8889", "ssh_server": "ssh://my_host.ssh:22", "ssh_auth_server": "ssh://my_host.ssh:22", "history": { "created_at": 
"2018-08-29T12:23:13.981621+00:00", "started_at": "2018-08-29T12:23:15.988054+00:00", "finished_at": "2018-08-29T12:59:31.427795+00:00", "reason": "OK", "description": "Everything is fine", }, "is_preemptible": True, "owner": "owner", "cluster_name": "default", "container": { "image": "submit-image-name", "command": "submit-command", "http": {"port": 8181}, "resources": { "memory_mb": "4096", "cpu": 7.0, "shm": True, "gpu": 1, "gpu_model": "test-gpu-model", }, "volumes": [ { "src_storage_uri": "storage://test-user/path_read_only", "dst_path": "/container/read_only", "read_only": True, }, { "src_storage_uri": "storage://test-user/path_read_write", "dst_path": "/container/path_read_write", "read_only": False, }, ], }, } async def handler(request: web.Request) -> web.Response: return web.json_response(JSON) app = web.Application() app.router.add_get("/jobs/job-id", handler) srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: ret = await client.jobs.status("job-id") parser = _ImageNameParser( client._config.auth_token.username, client._config.cluster_config.registry_url ) assert ret == _job_description_from_api(JSON, parser) async def test_status_with_tpu( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: JSON = { "status": "running", "id": "job-id", "description": "This is job description, not a history description", "http_url": "http://my_host:8889", "ssh_server": "ssh://my_host.ssh:22", "ssh_auth_server": "ssh://my_host.ssh:22", "history": { "created_at": "2018-08-29T12:23:13.981621+00:00", "started_at": "2018-08-29T12:23:15.988054+00:00", "finished_at": "2018-08-29T12:59:31.427795+00:00", "reason": "OK", "description": "Everything is fine", }, "is_preemptible": True, "owner": "owner", "cluster_name": "default", "container": { "image": "submit-image-name", "command": "submit-command", "http": {"port": 8181}, "resources": { "memory_mb": "4096", "cpu": 7.0, "shm": True, "gpu": 1, "gpu_model": "test-gpu-model", 
"tpu": {"type": "v3-8", "software_version": "1.14"}, }, "volumes": [ { "src_storage_uri": "storage://test-user/path_read_only", "dst_path": "/container/read_only", "read_only": True, }, { "src_storage_uri": "storage://test-user/path_read_write", "dst_path": "/container/path_read_write", "read_only": False, }, ], }, } async def handler(request: web.Request) -> web.Response: return web.json_response(JSON) app = web.Application() app.router.add_get("/jobs/job-id", handler) srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: ret = await client.jobs.status("job-id") parser = _ImageNameParser( client._config.auth_token.username, client._config.cluster_config.registry_url ) assert ret == _job_description_from_api(JSON, parser) assert ret.container.resources.tpu_type == "v3-8" assert ret.container.resources.tpu_software_version == "1.14" async def test_job_run( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: JSON = { "id": "job-cf519ed3-9ea5-48f6-a8c5-492b810eb56f", "status": "failed", "history": { "status": "failed", "reason": "Error", "description": "Mounted on Avail\\n/dev/shm " "64M\\n\\nExit code: 1", "created_at": "2018-09-25T12:28:21.298672+00:00", "started_at": "2018-09-25T12:28:59.759433+00:00", "finished_at": "2018-09-25T12:28:59.759433+00:00", }, "owner": "owner", "cluster_name": "default", "container": { "image": "gcr.io/light-reality-205619/ubuntu:latest", "command": "date", "resources": { "cpu": 1.0, "memory_mb": 16384, "gpu": 1, "shm": False, "gpu_model": "nvidia-tesla-p4", }, }, "http_url": "http://my_host:8889", "ssh_server": "ssh://my_host.ssh:22", "ssh_auth_server": "ssh://my_host.ssh:22", "is_preemptible": False, } async def handler(request: web.Request) -> web.Response: data = await request.json() assert data == { "container": { "image": "submit-image-name", "command": "submit-command", "http": {"port": 8181, "requires_auth": True}, "resources": { "memory_mb": 16384, "cpu": 7.0, "shm": True, 
"gpu": 1, "gpu_model": "test-gpu-model", }, "volumes": [ { "src_storage_uri": "storage://test-user/path_read_only", "dst_path": "/container/read_only", "read_only": True, }, { "src_storage_uri": "storage://test-user/path_read_write", "dst_path": "/container/path_read_write", "read_only": False, }, ], }, "is_preemptible": False, } return web.json_response(JSON) app = web.Application() app.router.add_post("/jobs", handler) srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: resources = Resources(16384, 7, 1, "test-gpu-model", True, None, None) volumes: List[Volume] = [ Volume( URL("storage://test-user/path_read_only"), "/container/read_only", True ), Volume( URL("storage://test-user/path_read_write"), "/container/path_read_write", False, ), ] container = Container( image=RemoteImage("submit-image-name"), command="submit-command", resources=resources, volumes=volumes, http=HTTPPort(8181), ) ret = await client.jobs.run(container=container, is_preemptible=False) parser = _ImageNameParser( client._config.auth_token.username, client._config.cluster_config.registry_url ) assert ret == _job_description_from_api(JSON, parser) async def test_job_run_with_name_and_description( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: JSON = { "id": "job-cf519ed3-9ea5-48f6-a8c5-492b810eb56f", "name": "test-job-name", "description": "job description", "status": "failed", "history": { "status": "failed", "reason": "Error", "description": "Mounted on Avail\\n/dev/shm " "64M\\n\\nExit code: 1", "created_at": "2018-09-25T12:28:21.298672+00:00", "started_at": "2018-09-25T12:28:59.759433+00:00", "finished_at": "2018-09-25T12:28:59.759433+00:00", }, "owner": "owner", "cluster_name": "default", "container": { "image": "gcr.io/light-reality-205619/ubuntu:latest", "command": "date", "resources": { "cpu": 1.0, "memory_mb": 16384, "gpu": 1, "shm": False, "gpu_model": "nvidia-tesla-p4", }, }, "http_url": "http://my_host:8889", "ssh_server": 
"ssh://my_host.ssh:22", "ssh_auth_server": "ssh://my_host.ssh:22", "is_preemptible": False, } async def handler(request: web.Request) -> web.Response: data = await request.json() assert data == { "container": { "image": "submit-image-name", "command": "submit-command", "http": {"port": 8181, "requires_auth": True}, "resources": { "memory_mb": 16384, "cpu": 7.0, "shm": True, "gpu": 1, "gpu_model": "test-gpu-model", }, "volumes": [ { "src_storage_uri": "storage://test-user/path_read_only", "dst_path": "/container/read_only", "read_only": True, }, { "src_storage_uri": "storage://test-user/path_read_write", "dst_path": "/container/path_read_write", "read_only": False, }, ], }, "is_preemptible": False, "name": "test-job-name", "description": "job description", } return web.json_response(JSON) app = web.Application() app.router.add_post("/jobs", handler) srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: resources = Resources(16384, 7, 1, "test-gpu-model", True, None, None) volumes: List[Volume] = [ Volume( URL("storage://test-user/path_read_only"), "/container/read_only", True ), Volume( URL("storage://test-user/path_read_write"), "/container/path_read_write", False, ), ] container = Container( image=RemoteImage("submit-image-name"), command="submit-command", resources=resources, volumes=volumes, http=HTTPPort(8181), ) ret = await client.jobs.run( container, is_preemptible=False, name="test-job-name", description="job description", ) parser = _ImageNameParser( client._config.auth_token.username, client._config.cluster_config.registry_url ) assert ret == _job_description_from_api(JSON, parser) async def test_job_run_no_volumes( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: JSON = { "id": "job-cf519ed3-9ea5-48f6-a8c5-492b810eb56f", "name": "test-job-name", "status": "failed", "history": { "status": "failed", "reason": "Error", "description": "Mounted on Avail\\n/dev/shm " "64M\\n\\nExit code: 1", "created_at": 
"2018-09-25T12:28:21.298672+00:00", "started_at": "2018-09-25T12:28:59.759433+00:00", "finished_at": "2018-09-25T12:28:59.759433+00:00", }, "owner": "owner", "cluster_name": "default", "container": { "image": "gcr.io/light-reality-205619/ubuntu:latest", "command": "date", "resources": { "cpu": 7, "memory_mb": 16384, "gpu": 1, "shm": False, "gpu_model": "nvidia-tesla-p4", }, }, "http_url": "http://my_host:8889", "ssh_server": "ssh://my_host.ssh:22", "ssh_auth_server": "ssh://my_host.ssh:22", "is_preemptible": False, } async def handler(request: web.Request) -> web.Response: data = await request.json() assert data == { "container": { "image": "submit-image-name", "command": "submit-command", "http": {"port": 8181, "requires_auth": True}, "resources": { "memory_mb": 16384, "cpu": 7, "shm": True, "gpu": 1, "gpu_model": "test-gpu-model", }, }, "is_preemptible": False, "name": "test-job-name", "description": "job description", } return web.json_response(JSON) app = web.Application() app.router.add_post("/jobs", handler) srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: resources = Resources(16384, 7, 1, "test-gpu-model", True, None, None) container = Container( image=RemoteImage("submit-image-name"), command="submit-command", resources=resources, http=HTTPPort(8181), ) ret = await client.jobs.run( container, is_preemptible=False, name="test-job-name", description="job description", ) parser = _ImageNameParser( client._config.auth_token.username, client._config.cluster_config.registry_url ) assert ret == _job_description_from_api(JSON, parser) async def test_job_run_preemptible( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: JSON = { "id": "job-cf519ed3-9ea5-48f6-a8c5-492b810eb56f", "name": "test-job-name", "status": "failed", "history": { "status": "failed", "reason": "Error", "description": "Mounted on Avail\\n/dev/shm " "64M\\n\\nExit code: 1", "created_at": "2018-09-25T12:28:21.298672+00:00", "started_at": 
"2018-09-25T12:28:59.759433+00:00", "finished_at": "2018-09-25T12:28:59.759433+00:00", }, "owner": "owner", "cluster_name": "default", "container": { "image": "gcr.io/light-reality-205619/ubuntu:latest", "command": "date", "resources": { "cpu": 1.0, "memory_mb": 16384, "gpu": 1, "shm": False, "gpu_model": "nvidia-tesla-p4", }, }, "is_preemptible": True, "http_url": "http://my_host:8889", "ssh_server": "ssh://my_host.ssh:22", "ssh_auth_server": "ssh://my_host.ssh:22", } async def handler(request: web.Request) -> web.Response: data = await request.json() assert data == { "container": { "image": "submit-image-name", "command": "submit-command", "http": {"port": 8181, "requires_auth": True}, "resources": { "memory_mb": 16384, "cpu": 7.0, "shm": True, "gpu": 1, "gpu_model": "test-gpu-model", }, "volumes": [ { "src_storage_uri": "storage://test-user/path_read_only", "dst_path": "/container/read_only", "read_only": True, }, { "src_storage_uri": "storage://test-user/path_read_write", "dst_path": "/container/path_read_write", "read_only": False, }, ], }, "is_preemptible": True, "name": "test-job-name", "description": "job description", } return web.json_response(JSON) app = web.Application() app.router.add_post("/jobs", handler) srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: resources = Resources(16384, 7, 1, "test-gpu-model", True, None, None) volumes: List[Volume] = [ Volume( URL("storage://test-user/path_read_only"), "/container/read_only", True ), Volume( URL("storage://test-user/path_read_write"), "/container/path_read_write", False, ), ] container = Container( image=RemoteImage("submit-image-name"), command="submit-command", resources=resources, volumes=volumes, http=HTTPPort(8181), ) ret = await client.jobs.run( container, is_preemptible=True, name="test-job-name", description="job description", ) parser = _ImageNameParser( client._config.auth_token.username, client._config.cluster_config.registry_url ) assert ret == 
_job_description_from_api(JSON, parser) async def test_job_run_schedule_timeout( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: JSON = { "id": "job-cf519ed3-9ea5-48f6-a8c5-492b810eb56f", "status": "failed", "history": { "status": "failed", "reason": "Error", "description": "Mounted on Avail\\n/dev/shm " "64M\\n\\nExit code: 1", "created_at": "2018-09-25T12:28:21.298672+00:00", "started_at": "2018-09-25T12:28:59.759433+00:00", "finished_at": "2018-09-25T12:28:59.759433+00:00", }, "owner": "owner", "cluster_name": "default", "container": { "image": "gcr.io/light-reality-205619/ubuntu:latest", "command": "date", "resources": { "cpu": 1.0, "memory_mb": 16384, "gpu": 1, "shm": False, "gpu_model": "nvidia-tesla-p4", }, }, "http_url": "http://my_host:8889", "ssh_server": "ssh://my_host.ssh:22", "ssh_auth_server": "ssh://my_host.ssh:22", "is_preemptible": False, "schedule_timeout": 5, } async def handler(request: web.Request) -> web.Response: data = await request.json() assert data == { "container": { "image": "submit-image-name", "command": "submit-command", "resources": { "memory_mb": 16384, "cpu": 7, "shm": True, "gpu": 1, "gpu_model": "test-gpu-model", }, }, "is_preemptible": False, "schedule_timeout": 5, } return web.json_response(JSON) app = web.Application() app.router.add_post("/jobs", handler) srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: resources = Resources(16384, 7, 1, "test-gpu-model", True, None, None) container = Container( image=RemoteImage("submit-image-name"), command="submit-command", resources=resources, ) ret = await client.jobs.run(container=container, schedule_timeout=5) parser = _ImageNameParser( client._config.auth_token.username, client._config.cluster_config.registry_url ) assert ret == _job_description_from_api(JSON, parser) async def test_job_run_tpu( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: JSON = { "id": "job-cf519ed3-9ea5-48f6-a8c5-492b810eb56f", 
"status": "failed", "history": { "status": "failed", "reason": "Error", "description": "Mounted on Avail\\n/dev/shm " "64M\\n\\nExit code: 1", "created_at": "2018-09-25T12:28:21.298672+00:00", "started_at": "2018-09-25T12:28:59.759433+00:00", "finished_at": "2018-09-25T12:28:59.759433+00:00", }, "owner": "owner", "cluster_name": "default", "container": { "image": "gcr.io/light-reality-205619/ubuntu:latest", "command": "date", "resources": { "cpu": 1.0, "memory_mb": 16384, "gpu": 1, "shm": False, "gpu_model": "nvidia-tesla-p4", "tpu": {"type": "v3-8", "software_version": "1.14"}, }, }, "http_url": "http://my_host:8889", "ssh_server": "ssh://my_host.ssh:22", "ssh_auth_server": "ssh://my_host.ssh:22", "is_preemptible": False, } async def handler(request: web.Request) -> web.Response: data = await request.json() assert data == { "container": { "image": "submit-image-name", "command": "submit-command", "resources": { "memory_mb": 16384, "cpu": 7, "shm": True, "gpu": 1, "gpu_model": "test-gpu-model", "tpu": {"type": "v3-8", "software_version": "1.14"}, }, }, "is_preemptible": False, "schedule_timeout": 5, } return web.json_response(JSON) app = web.Application() app.router.add_post("/jobs", handler) srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: resources = Resources(16384, 7, 1, "test-gpu-model", True, "v3-8", "1.14") container = Container( image=RemoteImage("submit-image-name"), command="submit-command", resources=resources, ) ret = await client.jobs.run(container=container, schedule_timeout=5) parser = _ImageNameParser( client._config.auth_token.username, client._config.cluster_config.registry_url ) assert ret == _job_description_from_api(JSON, parser) assert ret.container.resources.tpu_type == "v3-8" assert ret.container.resources.tpu_software_version == "1.14" def create_job_response( id: str, status: str, owner: str = "owner", name: Optional[str] = None, image: str = "submit-image-name", ) -> Dict[str, Any]: result = { "id": id, 
"status": status, "history": { "status": "failed", "reason": "Error", "description": "Mounted on Avail\\n/dev/shm " "64M\\n\\nExit code: 1", "created_at": "2018-09-25T12:28:21.298672+00:00", "started_at": "2018-09-25T12:28:59.759433+00:00", "finished_at": "2018-09-25T12:28:59.759433+00:00", }, "ssh_auth_server": "ssh://my_host.ssh:22", "container": { "image": image, "command": "submit-command", "resources": { "cpu": 1.0, "memory_mb": 16384, "gpu": 1, "gpu_model": "nvidia-tesla-v100", }, }, "is_preemptible": True, "owner": owner, "cluster_name": "default", } if name: result["name"] = name return result async def test_list_no_filter( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: jobs = [ create_job_response("job-id-1", "pending", name="job-name-1"), create_job_response("job-id-2", "running", name="job-name-1"), create_job_response("job-id-3", "succeeded", name="job-name-1"), create_job_response("job-id-4", "failed", name="job-name-1"), ] JSON = {"jobs": jobs} async def handler(request: web.Request) -> web.Response: return web.json_response(JSON) app = web.Application() app.router.add_get("/jobs", handler) srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: ret = await client.jobs.list() parser = _ImageNameParser( client._config.auth_token.username, client._config.cluster_config.registry_url ) job_descriptions = [_job_description_from_api(job, parser) for job in jobs] assert ret == job_descriptions async def test_list_filter_by_name( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: name_1 = "job-name-1" name_2 = "job-name-2" jobs = [ create_job_response("job-id-1", "pending", name=name_1), create_job_response("job-id-2", "succeeded", name=name_1), create_job_response("job-id-3", "failed", name=name_1), create_job_response("job-id-4", "running", name=name_2), create_job_response("job-id-5", "succeeded", name=name_2), create_job_response("job-id-6", "failed", name=name_2), 
create_job_response("job-id-7", "running"), create_job_response("job-id-8", "pending"), create_job_response("job-id-9", "succeeded"), create_job_response("job-id-10", "failed"), ] async def handler(request: web.Request) -> web.Response: name = request.query.get("name") assert name filtered_jobs = [job for job in jobs if job.get("name") == name] JSON = {"jobs": filtered_jobs} return web.json_response(JSON) app = web.Application() app.router.add_get("/jobs", handler) srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: ret = await client.jobs.list(name=name_1) parser = _ImageNameParser( client._config.auth_token.username, client._config.cluster_config.registry_url ) job_descriptions = [_job_description_from_api(job, parser) for job in jobs] assert ret == job_descriptions[:3] async def test_list_filter_by_statuses( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: name_1 = "job-name-1" name_2 = "job-name-2" jobs = [ create_job_response("job-id-1", "pending", name=name_1), create_job_response("job-id-2", "succeeded", name=name_1), create_job_response("job-id-3", "failed", name=name_1), create_job_response("job-id-4", "running", name=name_2), create_job_response("job-id-5", "succeeded", name=name_2), create_job_response("job-id-6", "failed", name=name_2), create_job_response("job-id-7", "running"), create_job_response("job-id-8", "pending"), create_job_response("job-id-9", "succeeded"), create_job_response("job-id-10", "failed"), ] async def handler(request: web.Request) -> web.Response: statuses = request.query.getall("status") assert statuses filtered_jobs = [job for job in jobs if job["status"] in statuses] JSON = {"jobs": filtered_jobs} return web.json_response(JSON) app = web.Application() app.router.add_get("/jobs", handler) srv = await aiohttp_server(app) statuses = {JobStatus.FAILED, JobStatus.SUCCEEDED} async with make_client(srv.make_url("/")) as client: ret = await client.jobs.list(statuses=statuses) 
parser = _ImageNameParser( client._config.auth_token.username, client._config.cluster_config.registry_url ) job_descriptions = [_job_description_from_api(job, parser) for job in jobs] assert ret == [job for job in job_descriptions if job.status in statuses] async def test_list_incorrect_image( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: jobs = [ create_job_response("job-id-1", "running"), create_job_response("job-id-2", "pending", image="some.com/path/:tag"), create_job_response( "job-id-3", "failed", image="registry-dev.neu.ro/path/:tag" ), create_job_response("job-id-4", "failed", image=""), create_job_response("job-id-5", "failed", image=":"), ] async def handler(request: web.Request) -> web.Response: JSON = {"jobs": jobs} return web.json_response(JSON) app = web.Application() app.router.add_get("/jobs", handler) srv = await aiohttp_server(app) statuses = {JobStatus.FAILED, JobStatus.SUCCEEDED} async with make_client(srv.make_url("/")) as client: ret = await client.jobs.list(statuses=statuses) for job in ret: if job.status == JobStatus.FAILED: assert job.container.image.name == INVALID_IMAGE_NAME else: assert job.container.image.name != INVALID_IMAGE_NAME class TestVolumeParsing: @pytest.mark.parametrize( "volume_param", ["dir", "storage://dir", "storage://dir:/var/www:rw:ro"] ) async def test_incorrect_params_count( self, volume_param: str, make_client: _MakeClient ) -> None: async with make_client("https://example.com") as client: with pytest.raises(ValueError, match=r"Invalid volume specification"): client.parse.volume(volume_param) @pytest.mark.parametrize( "volume_param", ["storage://dir:/var/www:write", "storage://dir:/var/www:"] ) async def test_incorrect_mode( self, volume_param: str, make_client: _MakeClient ) -> None: async with make_client("https://example.com") as client: with pytest.raises(ValueError, match=r"Wrong ReadWrite/ReadOnly mode spec"): client.parse.volume(volume_param) @pytest.mark.parametrize( 
"volume_param,volume", [ ( "storage://user/dir:/var/www", Volume( storage_uri=URL("storage://user/dir"), container_path="/var/www", read_only=False, ), ), ( "storage://user/dir:/var/www:rw", Volume( storage_uri=URL("storage://user/dir"), container_path="/var/www", read_only=False, ), ), ( "storage://user:/var/www:ro", Volume( storage_uri=URL("storage://user"), container_path="/var/www", read_only=True, ), ), ( "storage://~/:/var/www:ro", Volume( storage_uri=URL("storage://user"), container_path="/var/www", read_only=True, ), ), ( "storage:dir:/var/www:ro", Volume( storage_uri=URL("storage://user/dir"), container_path="/var/www", read_only=True, ), ), ( "storage::/var/www:ro", Volume( storage_uri=URL("storage://user"), container_path="/var/www", read_only=True, ), ), ], ) async def test_positive( self, volume_param: str, volume: Volume, make_client: _MakeClient ) -> None: async with make_client("https://example.com") as client: assert client.parse.volume(volume_param) == volume async def test_list_filter_by_name_and_statuses( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: name_1 = "job-name-1" name_2 = "job-name-2" jobs = [ create_job_response("job-id-1", "pending", name=name_1), create_job_response("job-id-2", "succeeded", name=name_1), create_job_response("job-id-3", "failed", name=name_1), create_job_response("job-id-4", "running", name=name_2), create_job_response("job-id-5", "succeeded", name=name_2), create_job_response("job-id-6", "failed", name=name_2), create_job_response("job-id-7", "running"), create_job_response("job-id-8", "pending"), create_job_response("job-id-9", "succeeded"), create_job_response("job-id-10", "failed"), ] async def handler(request: web.Request) -> web.Response: statuses = request.query.getall("status") assert statuses name = request.query.get("name") assert name filtered_jobs = [ job for job in jobs if job["status"] in statuses and job.get("name") == name ] JSON = {"jobs": filtered_jobs} return 
web.json_response(JSON) app = web.Application() app.router.add_get("/jobs", handler) srv = await aiohttp_server(app) statuses = {JobStatus.PENDING, JobStatus.SUCCEEDED} name = "job-name-1" async with make_client(srv.make_url("/")) as client: ret = await client.jobs.list(statuses=statuses, name=name) parser = _ImageNameParser( client._config.auth_token.username, client._config.cluster_config.registry_url ) job_descriptions = [_job_description_from_api(job, parser) for job in jobs] assert ret == job_descriptions[:2] async def test_list_filter_by_name_and_statuses_and_owners( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: name_1 = "job-name-1" name_2 = "job-name-2" owner_1 = "owner-1" owner_2 = "owner-2" jobs = [ create_job_response("job-id-1", "running", name=name_1, owner=owner_1), create_job_response("job-id-2", "running", name=name_1, owner=owner_2), create_job_response("job-id-3", "running", name=name_2, owner=owner_1), create_job_response("job-id-4", "running", name=name_2, owner=owner_2), create_job_response("job-id-5", "succeeded", name=name_1, owner=owner_1), create_job_response("job-id-6", "succeeded", name=name_1, owner=owner_2), create_job_response("job-id-7", "succeeded", name=name_2, owner=owner_1), create_job_response("job-id-8", "succeeded", name=name_2, owner=owner_2), ] async def handler(request: web.Request) -> web.Response: statuses = request.query.getall("status") name = request.query.get("name") owners = request.query.getall("owner") filtered_jobs = [ job for job in jobs if job["status"] in statuses and job.get("name") == name and job.get("owner") in owners ] return web.json_response({"jobs": filtered_jobs}) app = web.Application() app.router.add_get("/jobs", handler) srv = await aiohttp_server(app) statuses = {JobStatus.RUNNING} name = name_1 owners = {owner_1, owner_2} async with make_client(srv.make_url("/")) as client: ret = await client.jobs.list(statuses=statuses, name=name, owners=owners) parser = _ImageNameParser( 
client._config.auth_token.username, client._config.cluster_config.registry_url ) job_descriptions = [_job_description_from_api(job, parser) for job in jobs] assert ret == job_descriptions[:2]