Menü Kapat

Kaynak Kodlar

Yapay Zeka
Kaynak Kodları

from typing import AsyncIterator, Callable

import pytest
from aiohttp import web
from yarl import URL

from neuromation.api import Action, Client, Permission, ResourceNotFound
from tests import _TestServerFactory


# Type of the ``make_client`` fixture: a callable taking arbitrary arguments
# and returning a ``Client``.
_MakeClient = Callable[..., Client]


@pytest.fixture()
async def mocked_share_client(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> AsyncIterator[Client]:
    """Yield a client bound to a fake server that accepts shares for ``bill``.

    Only ``POST /users/bill/permissions`` is routed.  The client is closed
    once the fixture is torn down.
    """

    async def share_handler(request: web.Request) -> web.Response:
        payload = await request.json()
        known_actions = [action.value for action in Action]
        # The first permission entry must carry one of the Action values.
        assert payload[0]["action"] in known_actions
        raise web.HTTPCreated()

    application = web.Application()
    application.router.add_post("/users/bill/permissions", share_handler)
    server = await aiohttp_server(application)
    share_client = make_client(server.make_url("/"))
    yield share_client
    await share_client.close()


@pytest.fixture()
async def mocked_revoke_client(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> AsyncIterator[Client]:
    """Yield a client bound to a fake server that accepts revokes for ``bill``.

    Only ``DELETE /users/bill/permissions`` is routed.  The client is closed
    once the fixture is torn down.
    """

    async def revoke_handler(request: web.Request) -> web.Response:
        # The resource to revoke must be passed as the ``uri`` query parameter.
        assert "uri" in request.query
        raise web.HTTPNoContent()

    application = web.Application()
    application.router.add_delete("/users/bill/permissions", revoke_handler)
    server = await aiohttp_server(application)
    revoke_client = make_client(server.make_url("/"))
    yield revoke_client
    await revoke_client.close()


class TestUsersShare:
    """Tests for ``client.users.share`` and ``client.users.revoke``.

    The fake servers behind the fixtures only route requests for user ``bill``.
    """

    async def test_share_unknown_user(self, mocked_share_client: Client) -> None:
        """Sharing with a user that has no route raises ``ResourceNotFound``."""
        read_access = Permission(URL("storage://bob/resource"), Action.READ)
        with pytest.raises(ResourceNotFound):
            await mocked_share_client.users.share(
                user="not-exists", permission=read_access
            )

    async def test_correct_share(self, mocked_share_client: Client) -> None:
        """Sharing with ``bill`` succeeds and returns ``None``."""
        read_access = Permission(URL("storage://bob/resource"), Action.READ)
        result = await mocked_share_client.users.share(
            user="bill", permission=read_access
        )
        assert result is None  # at this moment no result

    async def test_revoke_unknown_user(self, mocked_revoke_client: Client) -> None:
        """Revoking from a user that has no route raises ``ResourceNotFound``."""
        resource = URL("storage://bob/resource")
        with pytest.raises(ResourceNotFound):
            await mocked_revoke_client.users.revoke(user="not-exists", uri=resource)

    async def test_correct_revoke(self, mocked_revoke_client: Client) -> None:
        """Revoking from ``bill`` succeeds and returns ``None``."""
        resource = URL("storage://bob/resource")
        result = await mocked_revoke_client.users.revoke(user="bill", uri=resource)
        assert result is None  # at this moment no result
import asyncio
import errno
import json
import os
from filecmp import dircmp
from pathlib import Path
from shutil import copytree
from typing import Any, AsyncIterator, Callable, List, Tuple
from unittest import mock

import pytest
from aiohttp import web
from yarl import URL

import neuromation.api.storage
from neuromation.api import (
    Action,
    Client,
    FileStatus,
    FileStatusType,
    IllegalArgumentError,
    StorageProgressComplete,
    StorageProgressStart,
    StorageProgressStep,
)
from tests import _RawTestServerFactory, _TestServerFactory


# Type of the ``make_client`` fixture: a callable taking arbitrary arguments
# and returning a ``Client``.
_MakeClient = Callable[..., Client]


# Directory that contains this test module.
FOLDER = Path(__file__).parent
# ``data`` sub-directory next to this module; tests below read files from it.
DATA_FOLDER = FOLDER / "data"


def calc_diff(dcmp: "dircmp[str]", *, pre: str = "") -> List[Tuple[str, str]]:
    """Flatten a ``dircmp`` comparison into a list of differing path pairs.

    Args:
        dcmp: Directory comparison to walk, including its common
            sub-directories.
        pre: Relative path prefix (ending in ``/`` when non-empty) that is
            prepended to every reported name; used while recursing.

    Returns:
        A list of ``(left, right)`` tuples: both sides are set for files
        whose content differs, ``(path, "")`` marks an entry present only on
        the left and ``("", path)`` an entry present only on the right.
        The list is empty when no such difference is found.
    """
    ret: List[Tuple[str, str]] = []
    ret.extend((pre + name, pre + name) for name in dcmp.diff_files)
    ret.extend((pre + name, "") for name in dcmp.left_only)
    ret.extend(("", pre + name) for name in dcmp.right_only)
    for name, sub_dcmp in dcmp.subdirs.items():
        # Accumulate the prefix: passing only ``name + "/"`` would drop the
        # parent directories for entries nested more than one level deep.
        ret.extend(calc_diff(sub_dcmp, pre=pre + name + "/"))
    return ret


@pytest.fixture
def storage_path(tmp_path: Path) -> Path:
    """Create a ``storage`` directory under ``tmp_path`` and return its path."""
    storage_root = tmp_path.joinpath("storage")
    storage_root.mkdir()
    return storage_root


@pytest.fixture
async def storage_server(
    aiohttp_raw_server: _RawTestServerFactory, storage_path: Path
) -> Any:
    """Start a raw test server that maps storage operations onto ``storage_path``.

    Every request must carry a ``b3`` header and target a path starting with
    ``/storage/user``.  The ``op`` query parameter selects the operation:
    ``CREATE``, ``OPEN``, ``GETFILESTATUS``, ``MKDIRS`` or ``LISTSTATUS``.
    Any other value is answered with an internal server error.
    """
    PREFIX = "/storage/user"

    def describe(entry: Path) -> Any:
        # Build the status record shared by GETFILESTATUS and LISTSTATUS.
        info = entry.stat()
        return {
            "path": entry.name,
            "type": "FILE" if entry.is_file() else "DIRECTORY",
            "length": info.st_size,
            "modificationTime": info.st_mtime,
            "permission": "write",
        }

    async def handler(request: web.Request) -> web.Response:
        assert "b3" in request.headers
        op = request.query["op"]
        assert request.path.startswith(PREFIX)
        relative = request.path[len(PREFIX) :]
        if relative.startswith("/"):
            # Drop a single leading slash so the join stays inside storage_path.
            relative = relative[1:]
        target = storage_path / relative

        if op == "CREATE":
            body = await request.read()
            target.write_bytes(body)
            return web.Response(status=201)
        if op == "OPEN":
            return web.Response(body=target.read_bytes())
        if op == "GETFILESTATUS":
            if not target.exists():
                raise web.HTTPNotFound()
            return web.json_response({"FileStatus": describe(target)})
        if op == "MKDIRS":
            try:
                target.mkdir(parents=True, exist_ok=True)
            except FileExistsError:
                raise web.HTTPBadRequest(
                    text=json.dumps({"error": "File exists", "errno": "EEXIST"}),
                    content_type="application/json",
                )
            return web.Response(status=201)
        if op == "LISTSTATUS":
            if not target.exists():
                raise web.HTTPNotFound()
            statuses = [describe(child) for child in target.iterdir()]
            return web.json_response({"FileStatuses": {"FileStatus": statuses}})
        raise web.HTTPInternalServerError(text=f"Unsupported operation {op}")

    return await aiohttp_raw_server(handler)


async def test_storage_ls(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``storage.ls`` converts a LISTSTATUS response into ``FileStatus`` objects."""
    listing_payload = {
        "FileStatuses": {
            "FileStatus": [
                {
                    "path": "foo",
                    "length": 1024,
                    "type": "FILE",
                    "modificationTime": 0,
                    "permission": "read",
                },
                {
                    "path": "bar",
                    "length": 4 * 1024,
                    "type": "DIRECTORY",
                    "modificationTime": 0,
                    "permission": "read",
                },
            ]
        }
    }
    expected = [
        FileStatus(
            path="foo",
            size=1024,
            type=FileStatusType.FILE,
            modification_time=0,
            permission=Action.READ,
        ),
        FileStatus(
            path="bar",
            size=4 * 1024,
            type=FileStatusType.DIRECTORY,
            modification_time=0,
            permission=Action.READ,
        ),
    ]

    async def list_handler(request: web.Request) -> web.Response:
        assert "b3" in request.headers
        assert request.path == "/storage/user/folder"
        assert request.query == {"op": "LISTSTATUS"}
        return web.json_response(listing_payload)

    application = web.Application()
    application.router.add_get("/storage/user/folder", list_handler)
    server = await aiohttp_server(application)

    async with make_client(server.make_url("/")) as client:
        listed = await client.storage.ls(URL("storage://~/folder"))

    assert listed == expected


async def test_storage_glob(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """Check ``storage.glob`` patterns against a small fake storage tree.

    The fake server exposes ``folder`` in the user's home; ``folder`` holds
    the file ``foo`` and the directory ``bar``, which holds the file ``baz``.
    """
    home_listing = {
        "FileStatuses": {
            "FileStatus": [
                {
                    "path": "folder",
                    "length": 0,
                    "type": "DIRECTORY",
                    "modificationTime": 0,
                    "permission": "read",
                }
            ]
        }
    }
    folder_status = {
        "FileStatus": {
            "path": "/user/folder",
            "type": "DIRECTORY",
            "length": 0,
            "modificationTime": 0,
            "permission": "read",
        }
    }
    folder_listing = {
        "FileStatuses": {
            "FileStatus": [
                {
                    "path": "foo",
                    "length": 1024,
                    "type": "FILE",
                    "modificationTime": 0,
                    "permission": "read",
                },
                {
                    "path": "bar",
                    "length": 0,
                    "type": "DIRECTORY",
                    "modificationTime": 0,
                    "permission": "read",
                },
            ]
        }
    }
    foo_status = {
        "FileStatus": {
            "path": "/user/folder/foo",
            "length": 1024,
            "type": "FILE",
            "modificationTime": 0,
            "permission": "read",
        }
    }
    bar_status = {
        "FileStatus": {
            "path": "/user/folder/bar",
            "length": 0,
            "type": "DIRECTORY",
            "modificationTime": 0,
            "permission": "read",
        }
    }
    bar_listing = {
        "FileStatuses": {
            "FileStatus": [
                {
                    "path": "baz",
                    "length": 0,
                    "type": "FILE",
                    "modificationTime": 0,
                    "permission": "read",
                }
            ]
        }
    }

    async def handler_home(request: web.Request) -> web.Response:
        assert "b3" in request.headers
        assert request.path == "/storage/user/"
        assert request.query == {"op": "LISTSTATUS"}
        return web.json_response(home_listing)

    async def handler_folder(request: web.Request) -> web.Response:
        assert "b3" in request.headers
        assert request.path.rstrip("/") == "/storage/user/folder"
        op = request.query["op"]
        assert op in ("GETFILESTATUS", "LISTSTATUS")
        if op == "GETFILESTATUS":
            return web.json_response(folder_status)
        if op == "LISTSTATUS":
            return web.json_response(folder_listing)
        raise web.HTTPInternalServerError

    async def handler_foo(request: web.Request) -> web.Response:
        assert "b3" in request.headers
        assert request.path == "/storage/user/folder/foo"
        assert request.query["op"] in ("GETFILESTATUS", "LISTSTATUS")
        assert request.query == {"op": "GETFILESTATUS"}
        return web.json_response(foo_status)

    async def handler_bar(request: web.Request) -> web.Response:
        assert request.path.rstrip("/") == "/storage/user/folder/bar"
        op = request.query["op"]
        if op == "GETFILESTATUS":
            return web.json_response(bar_status)
        if op == "LISTSTATUS":
            return web.json_response(bar_listing)
        raise web.HTTPInternalServerError

    application = web.Application()
    # Each resource is reachable both with and without a trailing slash.
    for route, route_handler in (
        ("/storage/user", handler_home),
        ("/storage/user/folder", handler_folder),
        ("/storage/user/folder/foo", handler_foo),
        ("/storage/user/folder/bar", handler_bar),
    ):
        application.router.add_get(route, route_handler)
        application.router.add_get(route + "/", route_handler)

    server = await aiohttp_server(application)

    async with make_client(server.make_url("/")) as client:

        async def glob(pattern: str) -> List[URL]:
            found = []
            async for uri in client.storage.glob(URL(pattern)):
                found.append(uri)
            return found

        assert await glob("storage:folder") == [URL("storage:folder")]
        assert await glob("storage:folder/") == [URL("storage:folder/")]
        assert await glob("storage:folder/*") == [
            URL("storage:folder/foo"),
            URL("storage:folder/bar"),
        ]
        assert await glob("storage:folder/foo") == [URL("storage:folder/foo")]
        assert await glob("storage:folder/[a-d]*") == [URL("storage:folder/bar")]
        assert await glob("storage:folder/*/") == [URL("storage:folder/bar/")]
        assert await glob("storage:*") == [URL("storage:folder")]
        assert await glob("storage:**") == [
            URL("storage:"),
            URL("storage:folder"),
            URL("storage:folder/foo"),
            URL("storage:folder/bar"),
            URL("storage:folder/bar/baz"),
        ]
        assert await glob("storage:*/foo") == [URL("storage:folder/foo")]
        assert await glob("storage:*/f*") == [URL("storage:folder/foo")]
        assert await glob("storage:**/foo") == [URL("storage:folder/foo")]
        assert await glob("storage:**/f*") == [
            URL("storage:folder"),
            URL("storage:folder/foo"),
        ]
        assert await glob("storage:**/f*/") == [URL("storage:folder/")]
        assert await glob("storage:**/b*") == [
            URL("storage:folder/bar"),
            URL("storage:folder/bar/baz"),
        ]
        assert await glob("storage:**/b*/") == [URL("storage:folder/bar/")]


async def test_storage_rm_file(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``storage.rm`` on a path the server reports as a FILE completes without error."""
    file_status = {
        "FileStatus": {
            "path": "/user/file",
            "type": "FILE",
            "length": 1234,
            "modificationTime": 3456,
            "permission": "read",
        }
    }

    async def status_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/file"
        assert request.query == {"op": "GETFILESTATUS"}
        return web.json_response(file_status)

    async def remove_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/file"
        assert request.query == {"op": "DELETE"}
        return web.Response(status=204)

    application = web.Application()
    application.router.add_get("/storage/user/file", status_handler)
    application.router.add_delete("/storage/user/file", remove_handler)
    server = await aiohttp_server(application)

    async with make_client(server.make_url("/")) as client:
        await client.storage.rm(URL("storage://~/file"))


async def test_storage_rm_directory(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """Non-recursive ``storage.rm`` on a DIRECTORY raises ``IsADirectoryError``."""
    directory_status = {
        "FileStatus": {
            "path": "/user/folder",
            "type": "DIRECTORY",
            "length": 1234,
            "modificationTime": 3456,
            "permission": "read",
        }
    }

    async def status_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder"
        assert request.query == {"op": "GETFILESTATUS"}
        return web.json_response(directory_status)

    async def remove_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder"
        assert request.query == {"op": "DELETE"}
        return web.Response(status=204)

    application = web.Application()
    application.router.add_get("/storage/user/folder", status_handler)
    application.router.add_delete("/storage/user/folder", remove_handler)
    server = await aiohttp_server(application)

    async with make_client(server.make_url("/")) as client:
        with pytest.raises(IsADirectoryError, match="Is a directory") as exc_info:
            await client.storage.rm(URL("storage://~/folder"))
        # The raised OSError must also carry the matching errno.
        assert exc_info.value.errno == errno.EISDIR


async def test_storage_rm_recursive(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """Recursive ``storage.rm`` works when the server only routes DELETE."""

    async def remove_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder"
        assert request.query == {"op": "DELETE"}
        return web.Response(status=204)

    application = web.Application()
    application.router.add_delete("/storage/user/folder", remove_handler)
    server = await aiohttp_server(application)

    async with make_client(server.make_url("/")) as client:
        await client.storage.rm(URL("storage://~/folder"), recursive=True)


async def test_storage_mv(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``storage.mv`` posts a RENAME with the destination path to the source URL."""
    expected_query = {"op": "RENAME", "destination": "/user/other"}

    async def rename_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder"
        assert request.query == expected_query
        return web.Response(status=204)

    application = web.Application()
    application.router.add_post("/storage/user/folder", rename_handler)
    server = await aiohttp_server(application)

    source = URL("storage://~/folder")
    destination = URL("storage://~/other")
    async with make_client(server.make_url("/")) as client:
        await client.storage.mv(source, destination)


async def test_storage_mkdir_parents_exist_ok(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``mkdir(parents=True, exist_ok=True)`` works with only a MKDIRS route."""

    async def mkdirs_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder/sub"
        assert request.query == {"op": "MKDIRS"}
        return web.Response(status=204)

    application = web.Application()
    application.router.add_put("/storage/user/folder/sub", mkdirs_handler)
    server = await aiohttp_server(application)

    target = URL("storage://~/folder/sub")
    async with make_client(server.make_url("/")) as client:
        await client.storage.mkdir(target, parents=True, exist_ok=True)


async def test_storage_mkdir_parents(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``mkdir(parents=True)`` succeeds when the target is reported as missing."""
    target_path = "/storage/user/folder/sub"

    async def status_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder/sub"
        assert request.query == {"op": "GETFILESTATUS"}
        # The target does not exist yet.
        return web.Response(status=404)

    async def mkdirs_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder/sub"
        assert request.query == {"op": "MKDIRS"}
        return web.Response(status=204)

    application = web.Application()
    application.router.add_get(target_path, status_handler)
    application.router.add_put(target_path, mkdirs_handler)
    server = await aiohttp_server(application)

    async with make_client(server.make_url("/")) as client:
        await client.storage.mkdir(URL("storage://~/folder/sub"), parents=True)


async def test_storage_mkdir_exist_ok(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``mkdir(exist_ok=True)`` succeeds when the parent is an existing directory.

    The server answers GETFILESTATUS for the parent and MKDIRS for the target.
    """
    parent_status = {
        "FileStatus": {
            "path": "/user/folder",
            "type": "DIRECTORY",
            "length": 1234,
            "modificationTime": 3456,
            "permission": "read",
        }
    }

    async def parent_status_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder"
        assert request.query == {"op": "GETFILESTATUS"}
        return web.json_response(parent_status)

    async def mkdirs_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder/sub"
        assert request.query == {"op": "MKDIRS"}
        return web.Response(status=204)

    application = web.Application()
    application.router.add_get("/storage/user/folder", parent_status_handler)
    application.router.add_put("/storage/user/folder/sub", mkdirs_handler)
    server = await aiohttp_server(application)

    async with make_client(server.make_url("/")) as client:
        await client.storage.mkdir(URL("storage://~/folder/sub"), exist_ok=True)


async def test_storage_mkdir(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """Plain ``mkdir`` succeeds when the target is missing and the parent exists.

    The server reports 404 for the target, a DIRECTORY status for the parent,
    and accepts MKDIRS on the target.
    """
    parent_status = {
        "FileStatus": {
            "path": "/user/folder",
            "type": "DIRECTORY",
            "length": 1234,
            "modificationTime": 3456,
            "permission": "read",
        }
    }

    async def target_status_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder/sub"
        assert request.query == {"op": "GETFILESTATUS"}
        return web.Response(status=404)

    async def parent_status_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder"
        assert request.query == {"op": "GETFILESTATUS"}
        return web.json_response(parent_status)

    async def mkdirs_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder/sub"
        assert request.query == {"op": "MKDIRS"}
        return web.Response(status=204)

    application = web.Application()
    application.router.add_get("/storage/user/folder/sub", target_status_handler)
    application.router.add_get("/storage/user/folder", parent_status_handler)
    application.router.add_put("/storage/user/folder/sub", mkdirs_handler)
    server = await aiohttp_server(application)

    async with make_client(server.make_url("/")) as client:
        await client.storage.mkdir(URL("storage://~/folder/sub"))


async def test_storage_create(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``storage.create`` streams the chunks of an async iterator as the PUT body."""

    async def create_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/file"
        assert request.query == {"op": "CREATE"}
        received = await request.read()
        # All streamed chunks must arrive concatenated in order.
        assert received == b"01234"
        return web.Response(status=201)

    application = web.Application()
    application.router.add_put("/storage/user/file", create_handler)
    server = await aiohttp_server(application)

    async def digit_chunks() -> AsyncIterator[bytes]:
        # Emit b"0" .. b"4", one byte per chunk.
        for digit in "01234":
            yield digit.encode("ascii")

    async with make_client(server.make_url("/")) as client:
        await client.storage.create(URL("storage://~/file"), digit_chunks())


async def test_storage_stats(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``storage.stat`` converts a GETFILESTATUS response into a ``FileStatus``."""
    status_payload = {
        "FileStatus": {
            "path": "/user/folder",
            "type": "DIRECTORY",
            "length": 1234,
            "modificationTime": 3456,
            "permission": "read",
        }
    }
    expected = FileStatus(
        path="/user/folder",
        type=FileStatusType.DIRECTORY,
        size=1234,
        modification_time=3456,
        permission=Action.READ,
    )

    async def status_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder"
        assert request.query == {"op": "GETFILESTATUS"}
        return web.json_response(status_payload)

    application = web.Application()
    application.router.add_get("/storage/user/folder", status_handler)
    server = await aiohttp_server(application)

    async with make_client(server.make_url("/")) as client:
        reported = await client.storage.stat(URL("storage://~/folder"))
        assert reported == expected


async def test_storage_open(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``storage.open`` yields the streamed file content chunk by chunk."""
    file_status = {
        "FileStatus": {
            "path": "/user/file",
            "type": "FILE",
            "length": 5,
            "modificationTime": 3456,
            "permission": "read",
        }
    }

    async def file_handler(request: web.Request) -> web.StreamResponse:
        assert request.path == "/storage/user/file"
        if request.query["op"] == "OPEN":
            # Stream b"0" .. b"4" as five separate writes.
            stream = web.StreamResponse()
            await stream.prepare(request)
            for digit in "01234":
                await stream.write(digit.encode("ascii"))
            return stream
        if request.query["op"] == "GETFILESTATUS":
            return web.json_response(file_status)
        raise AssertionError(f"Unknown operation {request.query['op']}")

    application = web.Application()
    application.router.add_get("/storage/user/file", file_handler)
    server = await aiohttp_server(application)

    async with make_client(server.make_url("/")) as client:
        received = bytearray()
        async for piece in client.storage.open(URL("storage://~/file")):
            received.extend(piece)
        assert received == b"01234"


async def test_storage_open_directory(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``storage.open`` on a DIRECTORY raises before yielding any data."""
    directory_status = {
        "FileStatus": {
            "path": "/user/folder",
            "type": "DIRECTORY",
            "length": 5,
            "modificationTime": 3456,
            "permission": "read",
        }
    }

    async def status_handler(request: web.Request) -> web.Response:
        assert request.path == "/storage/user/folder"
        assert request.query == {"op": "GETFILESTATUS"}
        return web.json_response(directory_status)

    application = web.Application()
    application.router.add_get("/storage/user/folder", status_handler)
    server = await aiohttp_server(application)

    async with make_client(server.make_url("/")) as client:
        received = bytearray()
        with pytest.raises((IsADirectoryError, IllegalArgumentError)):
            async for piece in client.storage.open(URL("storage://~/folder")):
                received.extend(piece)
        # Nothing may have been read before the error surfaced.
        assert not received


# test normalizers


# high level API


async def test_storage_upload_file_does_not_exists(make_client: _MakeClient) -> None:
    """``upload_file`` with a missing local source raises ``FileNotFoundError``."""
    missing_source = URL("file:///not-exists-file")
    destination = URL("storage://host/path/to/file.txt")
    async with make_client("https://example.com") as client:
        with pytest.raises(FileNotFoundError):
            await client.storage.upload_file(missing_source, destination)


async def test_storage_upload_dir_doesnt_exist(make_client: _MakeClient) -> None:
    """``upload_file`` with a local directory as source raises ``IsADirectoryError``."""
    directory_source = URL(FOLDER.as_uri())
    destination = URL("storage://host/path/to")
    async with make_client("https://example.com") as client:
        with pytest.raises(IsADirectoryError):
            await client.storage.upload_file(directory_source, destination)


async def test_storage_upload_not_a_file(
    storage_server: Any, make_client: _MakeClient, storage_path: Path
) -> None:
    """Uploading ``os.devnull`` creates an empty remote file and reports zero size."""
    null_device = Path(os.devnull).absolute()
    src = URL(null_device.as_uri())
    dst = URL("storage://user/file.txt")
    progress = mock.Mock()

    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.upload_file(
            src, URL("storage:file.txt"), progress=progress
        )

    assert (storage_path / "file.txt").read_bytes() == b""

    # Start and complete are reported with size 0; no step is reported.
    progress.start.assert_called_with(StorageProgressStart(src, dst, 0))
    progress.step.assert_not_called()
    progress.complete.assert_called_with(StorageProgressComplete(src, dst, 0))


async def test_storage_upload_regular_file_to_existing_file_target(
    storage_server: Any, make_client: _MakeClient, storage_path: Path
) -> None:
    """Uploading a regular file copies its bytes and reports full progress."""
    local_file = DATA_FOLDER / "file.txt"
    size = local_file.stat().st_size
    src = URL(local_file.as_uri())
    dst = URL("storage://user/file.txt")
    progress = mock.Mock()

    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.upload_file(
            src, URL("storage:file.txt"), progress=progress
        )

    assert (storage_path / "file.txt").read_bytes() == local_file.read_bytes()

    progress.start.assert_called_with(StorageProgressStart(src, dst, size))
    # The last reported step covers the whole file.
    progress.step.assert_called_with(StorageProgressStep(src, dst, size, size))
    progress.complete.assert_called_with(StorageProgressComplete(src, dst, size))


async def test_storage_upload_regular_file_to_existing_dir(
    storage_server: Any, make_client: _MakeClient, storage_path: Path
) -> None:
    """Uploading a file onto an existing remote directory raises ``IsADirectoryError``."""
    source = URL((DATA_FOLDER / "file.txt").as_uri())
    (storage_path / "folder").mkdir()

    async with make_client(storage_server.make_url("/")) as client:
        with pytest.raises(IsADirectoryError):
            await client.storage.upload_file(source, URL("storage:folder"))


async def test_storage_upload_regular_file_to_existing_file(
    storage_server: Any, make_client: _MakeClient, storage_path: Path
) -> None:
    """Uploading onto an existing remote file overwrites its content."""
    local_file = DATA_FOLDER / "file.txt"
    remote_dir = storage_path / "folder"
    remote_dir.mkdir()
    remote_file = remote_dir / "file.txt"
    remote_file.write_bytes(b"existing file")

    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.upload_file(
            URL(local_file.as_uri()), URL("storage:folder/file.txt")
        )

    assert remote_file.read_bytes() == local_file.read_bytes()


async def test_storage_upload_regular_file_to_existing_dir_with_trailing_slash(
    storage_server: Any, make_client: _MakeClient, storage_path: Path
) -> None:
    """A trailing slash on an existing remote directory still raises ``IsADirectoryError``."""
    source = URL((DATA_FOLDER / "file.txt").as_uri())
    (storage_path / "folder").mkdir()

    async with make_client(storage_server.make_url("/")) as client:
        with pytest.raises(IsADirectoryError):
            await client.storage.upload_file(source, URL("storage:folder/"))


async def test_storage_upload_regular_file_to_existing_non_dir(
    storage_server: Any, make_client: _MakeClient, storage_path: Path
) -> None:
    """Uploading below a remote regular file raises ``NotADirectoryError``."""
    source = URL((DATA_FOLDER / "file.txt").as_uri())
    # ``file`` is a regular file, so it cannot act as a parent directory.
    (storage_path / "file").write_bytes(b"dummy")

    async with make_client(storage_server.make_url("/")) as client:
        with pytest.raises(NotADirectoryError):
            await client.storage.upload_file(
                source, URL("storage:file/subfile.txt")
            )


async def test_storage_upload_regular_file_to_not_existing(
    storage_server: Any, make_client: _MakeClient
) -> None:
    """Uploading into a missing remote directory raises ``NotADirectoryError``."""
    source = URL((DATA_FOLDER / "file.txt").as_uri())
    destination = URL("storage:absent-dir/absent-file.txt")

    async with make_client(storage_server.make_url("/")) as client:
        with pytest.raises(NotADirectoryError):
            await client.storage.upload_file(source, destination)


async def test_storage_upload_recursive_src_doesnt_exist(
    make_client: _MakeClient,
) -> None:
    """``upload_dir`` with a missing local source raises ``FileNotFoundError``."""
    missing_source = URL("file:does_not_exist")
    destination = URL("storage://host/path/to")
    async with make_client("https://example.com") as client:
        with pytest.raises(FileNotFoundError):
            await client.storage.upload_dir(missing_source, destination)


async def test_storage_upload_recursive_src_is_a_file(make_client: _MakeClient) -> None:
    """``upload_dir`` with a regular file as source raises ``NotADirectoryError``."""
    file_source = URL((DATA_FOLDER / "file.txt").as_uri())
    destination = URL("storage://host/path/to")

    async with make_client("https://example.com") as client:
        with pytest.raises(NotADirectoryError):
            await client.storage.upload_dir(file_source, destination)


async def test_storage_upload_recursive_target_is_a_file(
    storage_server: Any, make_client: _MakeClient, storage_path: Path
) -> None:
    """upload_dir raises NotADirectoryError when the target is an existing file."""
    # Pre-create a plain file under storage_path at the upload destination.
    (storage_path / "file.txt").write_bytes(b"dummy")
    src = URL(DATA_FOLDER.as_uri())
    dst = URL("storage:file.txt")

    async with make_client(storage_server.make_url("/")) as client:
        with pytest.raises(NotADirectoryError):
            await client.storage.upload_dir(src, dst)


async def test_storage_upload_empty_dir(
    storage_server: Any, make_client: _MakeClient, tmp_path: Path, storage_path: Path
) -> None:
    """Uploading an empty local directory yields an empty ``folder`` on storage."""
    remote_dir = storage_path / "folder"
    assert not remote_dir.exists()
    empty_src = tmp_path / "empty"
    empty_src.mkdir()
    assert not list(empty_src.iterdir())

    src = URL(empty_src.as_uri())
    dst = URL("storage:folder")
    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.upload_dir(src, dst)

    # iterdir() would raise if the directory had not been created at all.
    assert not list(remote_dir.iterdir())


async def test_storage_upload_recursive_ok(
    storage_server: Any, make_client: _MakeClient, storage_path: Path
) -> None:
    """upload_dir into an existing ``folder`` reproduces the ``nested`` tree."""
    remote_dir = storage_path / "folder"
    remote_dir.mkdir()
    src = URL(DATA_FOLDER.as_uri()) / "nested"

    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.upload_dir(src, URL("storage:folder"))

    comparison = dircmp(DATA_FOLDER / "nested", remote_dir)  # type: ignore
    assert not calc_diff(comparison)  # type: ignore


async def test_storage_upload_recursive_slash_ending(
    storage_server: Any, make_client: _MakeClient, storage_path: Path
) -> None:
    """A trailing slash on the target (``storage:folder/``) gives the same tree."""
    remote_dir = storage_path / "folder"
    remote_dir.mkdir()
    src = URL(DATA_FOLDER.as_uri()) / "nested"

    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.upload_dir(src, URL("storage:folder/"))

    comparison = dircmp(DATA_FOLDER / "nested", remote_dir)  # type: ignore
    assert not calc_diff(comparison)  # type: ignore


async def test_storage_download_regular_file_to_absent_file(
    storage_server: Any, make_client: _MakeClient, tmp_path: Path, storage_path: Path
) -> None:
    """Download a file to a new local path and verify content and progress events."""
    original = DATA_FOLDER / "file.txt"
    payload = original.read_bytes()
    (storage_path / "file.txt").write_bytes(payload)
    download_dir = tmp_path / "local"
    download_dir.mkdir()
    target = download_dir / "file.txt"
    progress = mock.Mock()

    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.download_file(
            URL("storage:file.txt"), URL(target.as_uri()), progress=progress
        )

    assert target.read_bytes() == payload

    # Progress events are expected to carry the fully qualified storage URL.
    src = URL("storage://user/file.txt")
    dst = URL(target.as_uri())
    size = original.stat().st_size
    started = StorageProgressStart(src, dst, size)
    stepped = StorageProgressStep(src, dst, size, size)
    completed = StorageProgressComplete(src, dst, size)
    progress.start.assert_called_with(started)
    progress.step.assert_called_with(stepped)
    progress.complete.assert_called_with(completed)


async def test_storage_download_regular_file_to_existing_file(
    storage_server: Any, make_client: _MakeClient, tmp_path: Path, storage_path: Path
) -> None:
    """Downloading onto an existing local file replaces its previous content."""
    original = DATA_FOLDER / "file.txt"
    payload = original.read_bytes()
    (storage_path / "file.txt").write_bytes(payload)
    download_dir = tmp_path / "local"
    download_dir.mkdir()
    target = download_dir / "file.txt"
    target.write_bytes(b"Previous data")

    src = URL("storage:file.txt")
    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.download_file(src, URL(target.as_uri()))

    assert target.read_bytes() == payload


async def test_storage_download_regular_file_to_dir(
    storage_server: Any, make_client: _MakeClient, tmp_path: Path, storage_path: Path
) -> None:
    """download_file onto an existing local directory must raise."""
    (storage_path / "file.txt").write_bytes((DATA_FOLDER / "file.txt").read_bytes())
    existing_dir = tmp_path / "local"
    existing_dir.mkdir()
    src = URL("storage:file.txt")
    dst = URL(existing_dir.as_uri())

    # Either exception type is accepted by this test.
    expected_errors = (IsADirectoryError, PermissionError)
    async with make_client(storage_server.make_url("/")) as client:
        with pytest.raises(expected_errors):
            await client.storage.download_file(src, dst)


async def test_storage_download_regular_file_to_dir_slash_ended(
    storage_server: Any, make_client: _MakeClient, tmp_path: Path, storage_path: Path
) -> None:
    """download_file onto a directory URL ending in ``/`` must raise as well."""
    (storage_path / "file.txt").write_bytes((DATA_FOLDER / "file.txt").read_bytes())
    existing_dir = tmp_path / "local"
    existing_dir.mkdir()
    src = URL("storage:file.txt")
    dst = URL(existing_dir.as_uri() + "/")

    # Either exception type is accepted by this test.
    expected_errors = (IsADirectoryError, PermissionError)
    async with make_client(storage_server.make_url("/")) as client:
        with pytest.raises(expected_errors):
            await client.storage.download_file(src, dst)


async def test_storage_download_regular_file_to_non_file(
    storage_server: Any, make_client: _MakeClient, tmp_path: Path, storage_path: Path
) -> None:
    """download_file targeting ``os.devnull`` completes without raising."""
    (storage_path / "file.txt").write_bytes((DATA_FOLDER / "file.txt").read_bytes())
    src = URL("storage:file.txt")
    devnull_url = URL(Path(os.devnull).absolute().as_uri())

    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.download_file(src, devnull_url)


async def test_storage_download_empty_dir(
    storage_server: Any, make_client: _MakeClient, tmp_path: Path, storage_path: Path
) -> None:
    """Downloading an empty storage directory creates an empty local directory."""
    remote_dir = storage_path / "folder"
    remote_dir.mkdir()
    assert not list(remote_dir.iterdir())
    local_target = tmp_path / "empty"
    assert not local_target.exists()

    src = URL("storage:folder")
    dst = URL(local_target.as_uri())
    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.download_dir(src, dst)

    # iterdir() would raise if the directory had not been created at all.
    assert not list(local_target.iterdir())


async def test_storage_download_dir(
    storage_server: Any, make_client: _MakeClient, tmp_path: Path, storage_path: Path
) -> None:
    """download_dir to a not-yet-existing local path reproduces the stored tree."""
    reference = DATA_FOLDER / "nested"
    copytree(reference, storage_path / "folder")
    parent = tmp_path / "local"
    parent.mkdir()
    local_target = parent / "nested"

    src = URL("storage:folder")
    dst = URL(local_target.as_uri())
    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.download_dir(src, dst)

    comparison = dircmp(reference, local_target)  # type: ignore
    assert not calc_diff(comparison)  # type: ignore


async def test_storage_download_dir_slash_ending(
    storage_server: Any, make_client: _MakeClient, tmp_path: Path, storage_path: Path
) -> None:
    """download_dir into an existing directory given with a trailing ``/``."""
    reference = DATA_FOLDER / "nested"
    # The stored folder holds the tree one level down, under "nested".
    copytree(reference, storage_path / "folder" / "nested")
    local_target = tmp_path / "local"
    local_target.mkdir()

    src = URL("storage:folder")
    dst = URL(local_target.as_uri() + "/")
    async with make_client(storage_server.make_url("/")) as client:
        await client.storage.download_dir(src, dst)

    comparison = dircmp(reference, local_target / "nested")  # type: ignore
    assert not calc_diff(comparison)  # type: ignore


@pytest.fixture
def zero_time_threshold(monkeypatch: Any) -> None:
    """Patch ``neuromation.api.storage.TIME_THRESHOLD`` to 0.0 for one test."""
    storage_module = neuromation.api.storage
    monkeypatch.setattr(storage_module, "TIME_THRESHOLD", 0.0)


async def test_storage_upload_file_update(
    storage_server: Any,
    make_client: _MakeClient,
    tmp_path: Path,
    storage_path: Path,
    zero_time_threshold: None,
) -> None:
    """upload_file(update=True) overwrites, except a storage copy written later."""
    remote_file = storage_path / "file.txt"
    local_file = tmp_path / "file.txt"

    async def upload() -> None:
        # A fresh client per upload, exactly one upload_file call each time.
        async with make_client(storage_server.make_url("/")) as client:
            await client.storage.upload_file(
                URL(local_file.as_uri()), URL("storage:file.txt"), update=True
            )

    # First upload creates the remote file.
    local_file.write_bytes(b"old")
    await upload()
    assert remote_file.read_bytes() == b"old"

    # A changed local file replaces the remote content.
    local_file.write_bytes(b"new")
    await upload()
    assert remote_file.read_bytes() == b"new"

    # After a pause the remote copy is rewritten; it must then be kept as is.
    await asyncio.sleep(5)
    remote_file.write_bytes(b"xxx")
    await upload()
    assert remote_file.read_bytes() == b"xxx"


async def test_storage_upload_dir_update(
    storage_server: Any,
    make_client: _MakeClient,
    tmp_path: Path,
    storage_path: Path,
    zero_time_threshold: None,
) -> None:
    """upload_dir(update=True) overwrites, except a storage file written later."""
    remote_file = storage_path / "folder" / "nested" / "file.txt"
    local_dir = tmp_path / "folder"
    local_file = local_dir / "nested" / "file.txt"
    local_file.parent.mkdir(parents=True)

    async def upload() -> None:
        # A fresh client per upload, exactly one upload_dir call each time.
        async with make_client(storage_server.make_url("/")) as client:
            await client.storage.upload_dir(
                URL(local_dir.as_uri()), URL("storage:folder"), update=True
            )

    # First upload creates the remote tree.
    local_file.write_bytes(b"old")
    await upload()
    assert remote_file.read_bytes() == b"old"

    # A changed local file replaces the remote content.
    local_file.write_bytes(b"new")
    await upload()
    assert remote_file.read_bytes() == b"new"

    # After a pause the remote copy is rewritten; it must then be kept as is.
    await asyncio.sleep(5)
    remote_file.write_bytes(b"xxx")
    await upload()
    assert remote_file.read_bytes() == b"xxx"


async def test_storage_download_file_update(
    storage_server: Any,
    make_client: _MakeClient,
    tmp_path: Path,
    storage_path: Path,
    zero_time_threshold: None,
) -> None:
    """download_file(update=True) overwrites, except a local copy written later."""
    remote_file = storage_path / "file.txt"
    local_file = tmp_path / "file.txt"

    async def download() -> None:
        # A fresh client per download, exactly one download_file call each time.
        async with make_client(storage_server.make_url("/")) as client:
            await client.storage.download_file(
                URL("storage:file.txt"), URL(local_file.as_uri()), update=True
            )

    # First download creates the local file.
    remote_file.write_bytes(b"old")
    await download()
    assert local_file.read_bytes() == b"old"

    # A changed remote file replaces the local content.
    remote_file.write_bytes(b"new")
    await download()
    assert local_file.read_bytes() == b"new"

    # After a pause the local copy is rewritten; it must then be kept as is.
    await asyncio.sleep(2)
    local_file.write_bytes(b"xxx")
    await download()
    assert local_file.read_bytes() == b"xxx"


async def test_storage_download_dir_update(
    storage_server: Any,
    make_client: _MakeClient,
    tmp_path: Path,
    storage_path: Path,
    zero_time_threshold: None,
) -> None:
    """download_dir(update=True) overwrites, except a local file written later."""
    remote_file = storage_path / "folder" / "nested" / "file.txt"
    local_dir = tmp_path / "folder"
    local_file = local_dir / "nested" / "file.txt"
    remote_file.parent.mkdir(parents=True)

    async def download() -> None:
        # A fresh client per download, exactly one download_dir call each time.
        async with make_client(storage_server.make_url("/")) as client:
            await client.storage.download_dir(
                URL("storage:folder"), URL(local_dir.as_uri()), update=True
            )

    # First download creates the local tree.
    remote_file.write_bytes(b"old")
    await download()
    assert local_file.read_bytes() == b"old"

    # A changed remote file replaces the local content.
    remote_file.write_bytes(b"new")
    await download()
    assert local_file.read_bytes() == b"new"

    # After a pause the local copy is rewritten; it must then be kept as is.
    await asyncio.sleep(2)
    local_file.write_bytes(b"xxx")
    await download()
    assert local_file.read_bytes() == b"xxx"
from typing import Callable

import pytest
from yarl import URL

from neuromation.api import Client, LocalImage, RemoteImage
from neuromation.api.parsing_utils import _get_url_authority


_MakeClient = Callable[..., Client]


@pytest.mark.parametrize(
    "volume",
    [
        "storage:///",
        ":",
        "::::",
        "",
        "storage:///data/:/data/rest:wrong",
    ],
)
async def test_volume_from_str_fail(volume: str, make_client: _MakeClient) -> None:
    """Each malformed volume string must be rejected with ValueError."""
    async with make_client("https://example.com") as client:
        parse_volume = client.parse.volume
        with pytest.raises(ValueError):
            parse_volume(volume)


async def test_parse_local(make_client: _MakeClient) -> None:
    """``bananas:latest`` parses into a LocalImage with that name and tag."""
    expected = LocalImage("bananas", "latest")
    async with make_client("https://api.localhost.localdomain") as client:
        image = client.parse.local_image("bananas:latest")
    assert image == expected


async def test_parse_remote(make_client: _MakeClient) -> None:
    """An ``image://`` URL parses into a RemoteImage with owner and registry."""
    expected = RemoteImage(
        "bananas", "latest", owner="bob", registry="registry-dev.neu.ro"
    )
    async with make_client("https://api.localhost.localdomain") as client:
        image = client.parse.remote_image("image://bob/bananas:latest")
    assert image == expected


async def test_parse_remote_registry_image(make_client: _MakeClient) -> None:
    """A name prefixed with the configured registry host splits owner and path."""
    expected = RemoteImage(
        "library/bananas", "latest", owner="bob", registry="localhost:5000"
    )
    api_url = "https://api.localhost.localdomain"
    async with make_client(api_url, registry_url="http://localhost:5000") as client:
        image = client.parse.remote_image("localhost:5000/bob/library/bananas:latest")
    assert image == expected


async def test_parse_remote_public(make_client: _MakeClient) -> None:
    """A bare ``ubuntu:latest`` parses with neither owner nor registry set."""
    expected = RemoteImage("ubuntu", "latest", owner=None, registry=None)
    api_url = "https://api.localhost.localdomain"
    async with make_client(api_url, registry_url="http://localhost:5000") as client:
        image = client.parse.remote_image("ubuntu:latest")
    assert image == expected


def test_get_url_authority_with_explicit_port() -> None:
    """An explicitly given port is kept in the authority string."""
    authority = _get_url_authority(URL("https://example.com:8080/"))
    assert authority == "example.com:8080"


def test_get_url_authority_with_implicit_port() -> None:
    """A scheme-default port is not added to the authority string."""
    # No port is written in the URL; for https, yarl reports the default (443).
    authority = _get_url_authority(URL("https://example.com/"))
    assert authority == "example.com"


def test_get_url_authority_without_port() -> None:
    """A URL with a custom scheme and no port yields just the host."""
    # For this scheme `url.port is None`.
    authority = _get_url_authority(URL("scheme://example.com/"))
    assert authority == "example.com"


def test_get_url_authority_without_host() -> None:
    """A URL without a host has no authority (``None``)."""
    authority = _get_url_authority(URL("scheme://"))
    assert authority is None
import asyncio
from typing import AsyncIterator, Optional
from unittest import mock

import aiohttp
import pytest
from aiohttp import ClientSession
from aiohttp.test_utils import unused_port
from aiohttp.web import (
    Application,
    HTTPBadRequest,
    HTTPForbidden,
    HTTPFound,
    HTTPOk,
    HTTPUnauthorized,
    Request,
    Response,
    json_response,
)
from yarl import URL

from neuromation.api import Preset
from neuromation.api.login import (
    AuthCode,
    AuthException,
    AuthNegotiator,
    AuthTokenClient,
    HeadlessNegotiator,
    _AuthConfig,
    _AuthToken,
    _ClusterConfig,
    create_app_server,
    create_app_server_once,
    create_auth_code_app,
)
from tests import _TestServerFactory


class TestAuthCode:
    """Behavior of ``AuthCode.wait`` for each way the code can be resolved."""

    async def test_wait_timed_out(self) -> None:
        """Waiting with a zero timeout on an unset code raises AuthException."""
        auth_code = AuthCode()
        expected_msg = "failed to get an authorization code"
        with pytest.raises(AuthException, match=expected_msg):
            await auth_code.wait(timeout_s=0.0)

    async def test_wait_cancelled(self) -> None:
        """Waiting on a cancelled code raises AuthException."""
        auth_code = AuthCode()
        auth_code.cancel()
        expected_msg = "failed to get an authorization code"
        with pytest.raises(AuthException, match=expected_msg):
            await auth_code.wait()

    async def test_wait_exception(self) -> None:
        """An exception stored with ``set_exception`` is raised by ``wait``."""
        auth_code = AuthCode()
        auth_code.set_exception(AuthException("testerror"))
        with pytest.raises(AuthException, match="testerror"):
            await auth_code.wait()

    async def test_wait(self) -> None:
        """A value stored with ``set_value`` is returned by ``wait``."""
        auth_code = AuthCode()
        auth_code.set_value("testcode")
        assert await auth_code.wait() == "testcode"


class TestAuthToken:
    """Expiration bookkeeping of tokens built with ``_AuthToken.create``."""

    def test_is_not_expired(self) -> None:
        """``expires_in=100`` at time 2000 gives expiration_time 2075, not expired."""
        auth_token = _AuthToken.create(
            token="test_token",
            expires_in=100,
            refresh_token="test_refresh_token",
            time_factory=lambda: 2000.0,
        )
        assert auth_token.token == "test_token"
        assert auth_token.refresh_token == "test_refresh_token"
        assert auth_token.expiration_time == 2075
        assert not auth_token.is_expired

    def test_is_expired(self) -> None:
        """``expires_in=0`` at time 2000 gives expiration_time 2000, expired."""
        auth_token = _AuthToken.create(
            token="test_token",
            expires_in=0,
            refresh_token="test_refresh_token",
            time_factory=lambda: 2000.0,
        )
        assert auth_token.token == "test_token"
        assert auth_token.refresh_token == "test_refresh_token"
        assert auth_token.expiration_time == 2000
        assert auth_token.is_expired


class TestAuthCodeApp:
    """Tests for the local HTTP app that receives the authorization callback."""

    @pytest.fixture
    async def client(
        self, loop: asyncio.AbstractEventLoop
    ) -> AsyncIterator[ClientSession]:
        """Yield an aiohttp ``ClientSession`` that is closed after the test."""
        async with ClientSession() as session:
            yield session

    async def assert_code_callback_success(
        self,
        code: AuthCode,
        client: ClientSession,
        url: URL,
        redirect_url: Optional[URL] = None,
    ) -> None:
        """GET ``url`` with ``code=testcode`` and check the reply and ``code``.

        With ``redirect_url`` the reply must be a 302 pointing at it;
        otherwise it must be a 200 with body ``OK``. In both cases
        ``code.wait()`` must then return ``"testcode"``.
        """
        query = {"code": "testcode"}
        async with client.get(url, params=query, allow_redirects=False) as resp:
            if not redirect_url:
                assert resp.status == HTTPOk.status_code
                body = await resp.text()
                assert body == "OK"
            else:
                assert resp.status == HTTPFound.status_code
                assert resp.headers["Location"] == str(redirect_url)

        assert await code.wait() == "testcode"

    async def _assert_error_callback(
        self, client: ClientSession, error: str, description: str, status: int
    ) -> None:
        """Send an error callback and check the HTTP reply and the raised error."""
        auth_code = AuthCode()
        app = create_auth_code_app(auth_code)

        port = unused_port()
        async with create_app_server_once(app, host="127.0.0.1", port=port) as url:
            assert url == URL(f"http://127.0.0.1:{port}")

            query = {"error": error, "error_description": description}
            async with client.get(url, params=query) as resp:
                assert resp.status == status
                body = await resp.text()
                assert body == description

            with pytest.raises(AuthException, match=description):
                await auth_code.wait()

    async def test_create_app_server_once(self, client: ClientSession) -> None:
        """A single-port server serves the code callback at the expected URL."""
        auth_code = AuthCode()
        app = create_auth_code_app(auth_code)

        port = unused_port()
        async with create_app_server_once(app, host="127.0.0.1", port=port) as url:
            assert url == URL(f"http://127.0.0.1:{port}")
            await self.assert_code_callback_success(auth_code, client, url)

    async def test_create_app_server_redirect(self, client: ClientSession) -> None:
        """With ``redirect_url`` configured the callback answers with a redirect."""
        auth_code = AuthCode()
        target = URL("https://redirect.url")
        app = create_auth_code_app(auth_code, redirect_url=target)

        port = unused_port()
        async with create_app_server_once(app, host="127.0.0.1", port=port) as url:
            assert url == URL(f"http://127.0.0.1:{port}")
            await self.assert_code_callback_success(
                auth_code, client, url, redirect_url=target
            )

    async def test_create_app_server_once_failure(self, client: ClientSession) -> None:
        """A callback without ``code`` gets a 400 and makes ``wait`` fail."""
        auth_code = AuthCode()
        app = create_auth_code_app(auth_code)

        port = unused_port()
        async with create_app_server_once(app, host="127.0.0.1", port=port) as url:
            assert url == URL(f"http://127.0.0.1:{port}")

            async with client.get(url) as resp:
                assert resp.status == HTTPBadRequest.status_code
                body = await resp.text()
                assert body == "The 'code' query parameter is missing."

            expected_msg = "failed to get an authorization code"
            with pytest.raises(AuthException, match=expected_msg):
                await auth_code.wait()

    async def test_error_unauthorized(self, client: ClientSession) -> None:
        """``error=unauthorized`` is answered with HTTP 401."""
        await self._assert_error_callback(
            client, "unauthorized", "Test Unauthorized", HTTPUnauthorized.status_code
        )

    async def test_error_access_denied(self, client: ClientSession) -> None:
        """``error=access_denied`` is answered with HTTP 403."""
        await self._assert_error_callback(
            client, "access_denied", "Test Access Denied", HTTPForbidden.status_code
        )

    async def test_error_other(self, client: ClientSession) -> None:
        """Any other ``error`` value is answered with HTTP 400."""
        await self._assert_error_callback(
            client, "other", "Test Other", HTTPBadRequest.status_code
        )

    async def test_create_app_server(self, client: ClientSession) -> None:
        """``create_app_server`` binds the only port offered in ``ports``."""
        auth_code = AuthCode()
        app = create_auth_code_app(auth_code)

        port = unused_port()
        async with create_app_server(app, host="127.0.0.1", ports=[port]) as url:
            assert url == URL(f"http://127.0.0.1:{port}")
            await self.assert_code_callback_success(auth_code, client, url)

    async def test_create_app_server_no_ports(self) -> None:
        """If every offered port is taken, ``create_app_server`` raises."""
        auth_code = AuthCode()
        app = create_auth_code_app(auth_code)

        port = unused_port()
        # Hold the port with one server, then offer only that port to another.
        async with create_app_server_once(app, host="127.0.0.1", port=port):
            with pytest.raises(RuntimeError, match="No free ports."):
                async with create_app_server(app, ports=[port]):
                    pass

    async def test_create_app_server_port_conflict(self, client: ClientSession) -> None:
        """A second server skips the occupied first port and binds the next one."""
        auth_code = AuthCode()
        app = create_auth_code_app(auth_code)
        outer_port = unused_port()
        inner_port = unused_port()
        candidates = [outer_port, inner_port]
        async with create_app_server(app, ports=candidates) as outer_url:
            assert outer_url == URL(f"http://127.0.0.1:{outer_port}")
            async with create_app_server(app, ports=candidates) as inner_url:
                assert inner_url == URL(f"http://127.0.0.1:{inner_port}")
                await self.assert_code_callback_success(auth_code, client, inner_url)


class _TestAuthHandler:
    """Fake authorization and token endpoints with fixed, known answers."""

    def __init__(self, client_id: str) -> None:
        """Remember the expected ``client_id`` and set the canned values."""
        self._client_id = client_id

        self._code = "test_code"
        self._token = "test_access_token"
        self._token_refreshed = "test_access_token_refreshed"
        self._refresh_token = "test_refresh_token"
        self._token_expires_in = 1234

    async def handle_authorize(self, request: Request) -> Response:
        """Redirect to the request's ``redirect_uri`` with the canned code."""
        # TODO: assert query
        callback = URL(request.query["redirect_uri"])
        raise HTTPFound(callback.with_query(code=self._code))

    async def handle_token(self, request: Request) -> Response:
        """Check the JSON token request and answer with the canned token.

        ``authorization_code`` requests get the access and refresh tokens;
        every other grant type is checked as a ``refresh_token`` request and
        gets the refreshed access token only.
        """
        payload = await request.json()
        if payload["grant_type"] == "authorization_code":
            expected = {
                "grant_type": "authorization_code",
                "code_verifier": mock.ANY,
                "code": self._code,
                "client_id": self._client_id,
                "redirect_uri": mock.ANY,
            }
            assert payload == expected
            return json_response(
                {
                    "access_token": self._token,
                    "expires_in": self._token_expires_in,
                    "refresh_token": self._refresh_token,
                }
            )

        expected = {
            "grant_type": "refresh_token",
            "refresh_token": self._refresh_token,
            "client_id": self._client_id,
        }
        assert payload == expected
        return json_response(
            {
                "access_token": self._token_refreshed,
                "expires_in": self._token_expires_in,
            }
        )


@pytest.fixture
def auth_client_id() -> str:
    """Return the client id string used by the auth fixtures and tests."""
    client_id = "test_client_id"
    return client_id


@pytest.fixture
async def auth_server(
    auth_client_id: str, aiohttp_server: _TestServerFactory
) -> AsyncIterator[URL]:
    """Start a test server backed by ``_TestAuthHandler`` and yield its root URL."""
    fake_auth = _TestAuthHandler(client_id=auth_client_id)
    app = Application()
    router = app.router
    router.add_get("/authorize", fake_auth.handle_authorize)
    router.add_post("/oauth/token", fake_auth.handle_token)
    running = await aiohttp_server(app)
    yield running.make_url("/")


@pytest.fixture
async def auth_config(
    auth_client_id: str, auth_server: URL
) -> AsyncIterator[_AuthConfig]:
    """Yield an ``_AuthConfig`` whose auth and token URLs point at ``auth_server``."""
    # The single local callback URL uses a port that is currently free.
    callback_url = URL(f"http://127.0.0.1:{unused_port()}")
    config = _AuthConfig.create(
        auth_url=auth_server / "authorize",
        token_url=auth_server / "oauth/token",
        client_id=auth_client_id,
        audience="https://platform.dev.neuromation.io",
        headless_callback_url=URL("https://dev.neu.ro/oauth/show-code"),
        callback_urls=[callback_url],
    )
    yield config


class TestTokenClient:
    """Tests for ``AuthTokenClient`` against the fake and a failing token server."""

    async def test_request(self, auth_client_id: str, auth_config: _AuthConfig) -> None:
        """Exchanging an authorization code yields the server's canned tokens."""
        code = AuthCode()
        code.set_value("test_code")
        code.callback_url = auth_config.callback_urls[0]

        async with aiohttp.ClientSession() as session:
            async with AuthTokenClient(
                session, auth_config.token_url, client_id=auth_client_id
            ) as client:
                token = await client.request(code)
                assert token.token == "test_access_token"
                assert token.refresh_token == "test_refresh_token"
                assert not token.is_expired

    async def test_refresh(self, auth_client_id: str, auth_config: _AuthConfig) -> None:
        """Refreshing returns a new access token and keeps the refresh token."""
        token = _AuthToken.create(
            token="test_access_token",
            expires_in=1234,
            refresh_token="test_refresh_token",
        )

        async with aiohttp.ClientSession() as session:
            async with AuthTokenClient(
                session, auth_config.token_url, client_id=auth_client_id
            ) as client:
                new_token = await client.refresh(token)
                assert new_token.token == "test_access_token_refreshed"
                assert new_token.refresh_token == "test_refresh_token"
                # Check the refreshed token; the original asserted on the
                # input ``token`` here, which said nothing about the result.
                assert not new_token.is_expired

    async def test_forbidden(
        self, aiohttp_server: _TestServerFactory, auth_config: _AuthConfig
    ) -> None:
        """A 403 from the token endpoint fails both ``request`` and ``refresh``."""
        code = AuthCode()
        code.callback_url = auth_config.callback_urls[0]
        code.set_value("testcode")

        client_id = "test_client_id"

        async def handle_token(request: Request) -> Response:
            raise HTTPForbidden()

        app = Application()
        app.router.add_post("/oauth/token", handle_token)

        server = await aiohttp_server(app)
        url = server.make_url("/oauth/token")

        # Built outside ``pytest.raises`` so that only the client call itself
        # can satisfy the expected exception.
        token = _AuthToken.create(
            token="test_token",
            expires_in=1234,
            refresh_token="test_refresh_token",
        )

        async with aiohttp.ClientSession() as session:
            async with AuthTokenClient(session, url, client_id=client_id) as client:
                with pytest.raises(
                    AuthException, match="failed to get an access token."
                ):
                    await client.request(code)

                with pytest.raises(
                    AuthException, match="failed to get an access token."
                ):
                    await client.refresh(token)


class TestAuthConfig:
    """``_AuthConfig.is_initialized`` with one field at a time left empty."""

    def test_is_initialized__no_auth_url(self) -> None:
        """An empty ``auth_url`` makes the config uninitialized."""
        config = _AuthConfig(
            auth_url=URL(),
            token_url=URL("url"),
            client_id="client_id",
            audience="audience",
            headless_callback_url=URL("https://dev.neu.ro/oauth/show-code"),
            callback_urls=(URL("url1"), URL("url2")),
            success_redirect_url=URL("url"),
        )
        initialized = config.is_initialized()
        assert initialized is False

    def test_is_initialized__no_token_url(self) -> None:
        """An empty ``token_url`` makes the config uninitialized."""
        config = _AuthConfig(
            auth_url=URL("url"),
            token_url=URL(),
            client_id="client_id",
            audience="audience",
            headless_callback_url=URL("https://dev.neu.ro/oauth/show-code"),
            callback_urls=(URL("url1"), URL("url2")),
            success_redirect_url=URL("url"),
        )
        initialized = config.is_initialized()
        assert initialized is False

    def test_is_initialized__no_client_id(self) -> None:
        """An empty ``client_id`` makes the config uninitialized."""
        config = _AuthConfig(
            auth_url=URL("url"),
            token_url=URL("url"),
            client_id="",
            audience="audience",
            headless_callback_url=URL("https://dev.neu.ro/oauth/show-code"),
            callback_urls=(URL("url1"), URL("url2")),
            success_redirect_url=URL("url"),
        )
        initialized = config.is_initialized()
        assert initialized is False

    def test_is_initialized__no_audience(self) -> None:
        """An empty ``audience`` makes the config uninitialized."""
        config = _AuthConfig(
            auth_url=URL("url"),
            token_url=URL("url"),
            client_id="client_id",
            audience="",
            headless_callback_url=URL("https://dev.neu.ro/oauth/show-code"),
            callback_urls=(URL("url1"), URL("url2")),
            success_redirect_url=URL("url"),
        )
        initialized = config.is_initialized()
        assert initialized is False

    def test_is_initialized__no_callback_urls(self) -> None:
        """An empty ``callback_urls`` list still counts as initialized."""
        config = _AuthConfig(
            auth_url=URL("url"),
            token_url=URL("url"),
            client_id="client_id",
            audience="audience",
            headless_callback_url=URL("https://dev.neu.ro/oauth/show-code"),
            callback_urls=[],
            success_redirect_url=URL("url"),
        )
        initialized = config.is_initialized()
        assert initialized is True

    def test_is_initialized__no_success_redirect_url(self) -> None:
        """``success_redirect_url=None`` still counts as initialized."""
        config = _AuthConfig(
            auth_url=URL("url"),
            token_url=URL("url"),
            client_id="client_id",
            audience="audience",
            headless_callback_url=URL("https://dev.neu.ro/oauth/show-code"),
            callback_urls=(URL("url1"), URL("url2")),
            success_redirect_url=None,
        )
        initialized = config.is_initialized()
        assert initialized is True

    def test_is_initialized__no_headless_callback_url(self) -> None:
        """An empty ``headless_callback_url`` makes the config uninitialized."""
        config = _AuthConfig(
            auth_url=URL("url"),
            token_url=URL("url"),
            client_id="client_id",
            audience="audience",
            headless_callback_url=URL(),
            callback_urls=(URL("url1"), URL("url2")),
            success_redirect_url=None,
        )
        initialized = config.is_initialized()
        assert initialized is False


class TestClusterConfig:
    """Checks of ``_ClusterConfig.is_initialized`` with one field emptied at a time."""

    def test_is_initialized(self) -> None:
        """A config with every URL and one preset is initialized."""
        presets = {"default": Preset(cpu=1, memory_mb=2 * 1024)}
        config = _ClusterConfig.create(
            registry_url=URL("value"),
            storage_url=URL("value"),
            users_url=URL("value"),
            monitoring_url=URL("value"),
            resource_presets=presets,
        )
        initialized = config.is_initialized()
        assert initialized is True

    def test_is_initialized__no_registry_url(self) -> None:
        """An empty registry URL makes the config not initialized."""
        presets = {"default": Preset(cpu=1, memory_mb=2 * 1024)}
        config = _ClusterConfig.create(
            registry_url=URL(),  # the field under test
            storage_url=URL("value"),
            users_url=URL("value"),
            monitoring_url=URL("value"),
            resource_presets=presets,
        )
        initialized = config.is_initialized()
        assert initialized is False

    def test_is_initialized__no_storage_url(self) -> None:
        """An empty storage URL makes the config not initialized."""
        presets = {"default": Preset(cpu=1, memory_mb=2 * 1024)}
        config = _ClusterConfig.create(
            registry_url=URL("value"),
            storage_url=URL(),  # the field under test
            users_url=URL("value"),
            monitoring_url=URL("value"),
            resource_presets=presets,
        )
        initialized = config.is_initialized()
        assert initialized is False

    def test_is_initialized__no_users_url(self) -> None:
        """An empty users URL makes the config not initialized."""
        presets = {"default": Preset(cpu=1, memory_mb=2 * 1024)}
        config = _ClusterConfig.create(
            registry_url=URL("value"),
            storage_url=URL("value"),
            users_url=URL(),  # the field under test
            monitoring_url=URL("value"),
            resource_presets=presets,
        )
        initialized = config.is_initialized()
        assert initialized is False

    def test_is_initialized__no_monitoring_url(self) -> None:
        """An empty monitoring URL makes the config not initialized."""
        presets = {"default": Preset(cpu=1, memory_mb=2 * 1024)}
        config = _ClusterConfig.create(
            registry_url=URL("value"),
            storage_url=URL("value"),
            users_url=URL("value"),
            monitoring_url=URL(),  # the field under test
            resource_presets=presets,
        )
        initialized = config.is_initialized()
        assert initialized is False

    def test_is_initialized__no_resource_presets(self) -> None:
        """An empty presets mapping makes the config not initialized."""
        config = _ClusterConfig.create(
            registry_url=URL("value"),
            storage_url=URL("value"),
            users_url=URL("value"),
            monitoring_url=URL("value"),
            resource_presets={},  # the field under test
        )
        initialized = config.is_initialized()
        assert initialized is False


class TestAuthNegotiator:
    """Tests for ``AuthNegotiator`` using a stand-in for the browser callback."""

    async def show_dummy_browser(self, url: URL) -> None:
        """Replace a real browser: request ``url`` and follow redirects."""
        async with ClientSession() as browser:
            await browser.get(url, allow_redirects=True)

    async def test_get_code(self, auth_config: _AuthConfig) -> None:
        """``get_code`` yields the test code and the first callback URL."""
        async with aiohttp.ClientSession() as http:
            auth = AuthNegotiator(
                http, config=auth_config, show_browser_cb=self.show_dummy_browser
            )
            auth_code = await auth.get_code()
            code_value = await auth_code.wait()
            assert code_value == "test_code"
            assert auth_code.callback_url == auth_config.callback_urls[0]

    async def test_get_token(self, auth_config: _AuthConfig) -> None:
        """Refreshing with no token returns access and refresh tokens."""
        async with aiohttp.ClientSession() as http:
            auth = AuthNegotiator(
                http, config=auth_config, show_browser_cb=self.show_dummy_browser
            )
            obtained = await auth.refresh_token(token=None)
            assert obtained.token == "test_access_token"
            assert obtained.refresh_token == "test_refresh_token"

    async def test_refresh_token_noop(self, auth_config: _AuthConfig) -> None:
        """Refreshing a non-expired token returns the same token values."""
        async with aiohttp.ClientSession() as http:
            auth = AuthNegotiator(
                http, config=auth_config, show_browser_cb=self.show_dummy_browser
            )
            initial = await auth.refresh_token(token=None)
            assert initial.token == "test_access_token"
            assert initial.refresh_token == "test_refresh_token"
            assert not initial.is_expired

            # Passing the still-valid token back must not change its values.
            again = await auth.refresh_token(token=initial)
            assert again.token == "test_access_token"
            assert again.refresh_token == "test_refresh_token"

    async def test_refresh_token(self, auth_config: _AuthConfig) -> None:
        """A token rebuilt with ``expires_in=0`` is exchanged for a refreshed one."""
        async with aiohttp.ClientSession() as http:
            auth = AuthNegotiator(
                http, config=auth_config, show_browser_cb=self.show_dummy_browser
            )
            initial = await auth.refresh_token(token=None)
            assert initial.token == "test_access_token"
            assert initial.refresh_token == "test_refresh_token"
            assert not initial.is_expired

            # Same credentials, but with a zero lifetime.
            zero_lifetime = _AuthToken.create(
                token=initial.token,
                expires_in=0,
                refresh_token=initial.refresh_token,
            )
            refreshed = await auth.refresh_token(token=zero_lifetime)
            assert refreshed.token == "test_access_token_refreshed"
            assert refreshed.refresh_token == "test_refresh_token"


class TestHeadlessNegotiator:
    """Tests for ``HeadlessNegotiator`` with a user-supplied code callback."""

    async def test_get_code(self, auth_config: _AuthConfig) -> None:
        """The callback receives the auth URL with the expected query."""

        async def get_auth_code_cb(url: URL) -> str:
            # Without its query string the URL must be the configured auth URL.
            assert url.with_query(None) == auth_config.auth_url

            expected_query = {
                "response_type": "code",
                "code_challenge": mock.ANY,
                "code_challenge_method": "S256",
                "client_id": "test_client_id",
                "redirect_uri": "https://dev.neu.ro/oauth/show-code",
                "scope": "offline_access",
                "audience": "https://platform.dev.neuromation.io",
            }
            assert dict(url.query) == expected_query
            return "test_code"

        async with aiohttp.ClientSession() as http:
            headless = HeadlessNegotiator(
                http, config=auth_config, get_auth_code_cb=get_auth_code_cb
            )
            auth_code = await headless.get_code()
            code_value = await auth_code.wait()
            assert code_value == "test_code"

    async def test_get_code_raises(self, auth_config: _AuthConfig) -> None:
        """An error raised by the callback propagates out of ``get_code``."""

        async def get_auth_code_cb(url: URL) -> str:
            raise RuntimeError("callback error")

        async with aiohttp.ClientSession() as http:
            headless = HeadlessNegotiator(
                http, config=auth_config, get_auth_code_cb=get_auth_code_cb
            )
            with pytest.raises(RuntimeError, match="callback error"):
                await headless.get_code()
import json
from typing import Any, Callable, Dict, List, Optional

import pytest
from aiodocker.exceptions import DockerError
from aiohttp import web
from yarl import URL

from neuromation.api import (
    Client,
    Container,
    HTTPPort,
    JobStatus,
    JobTelemetry,
    RemoteImage,
    ResourceNotFound,
    Resources,
    Volume,
)
from neuromation.api.jobs import INVALID_IMAGE_NAME, _job_description_from_api
from neuromation.api.parsing_utils import _ImageNameParser
from tests import _TestServerFactory


# Annotation for the ``make_client`` fixture argument used by the tests below:
# a callable (any arguments) that returns a ``Client``.
_MakeClient = Callable[..., Client]


def test_resources_default() -> None:
    """Given only memory and CPU, the remaining ``Resources`` fields use defaults."""
    defaults = Resources(16, 0.5)

    # Explicitly supplied values.
    assert defaults.memory_mb == 16
    assert defaults.cpu == 0.5

    # Defaults: no GPU, no TPU, shared memory enabled.
    assert defaults.gpu is None
    assert defaults.gpu_model is None
    assert defaults.shm is True
    assert defaults.tpu_type is None
    assert defaults.tpu_software_version is None


async def test_jobs_monitor(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``jobs.monitor`` delivers exactly the bytes the log endpoint streamed."""
    chunk_count = 10

    async def log_stream(request: web.Request) -> web.StreamResponse:
        # The client must ask for an uncompressed stream.
        assert request.headers["Accept-Encoding"] == "identity"
        response = web.StreamResponse()
        response.enable_chunked_encoding()
        response.enable_compression(web.ContentCoding.identity)
        await response.prepare(request)
        for index in range(chunk_count):
            await response.write(b"chunk %d\n" % index)
        return response

    app = web.Application()
    app.router.add_get("/jobs/job-id/log", log_stream)
    server = await aiohttp_server(app)

    async with make_client(server.make_url("/")) as client:
        received = [piece async for piece in client.jobs.monitor("job-id")]

    # Chunk boundaries on the wire are not guaranteed, so compare the joined data.
    expected = b"".join(b"chunk %d\n" % index for index in range(chunk_count))
    assert b"".join(received) == expected


async def test_monitor_notexistent_job(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``jobs.monitor`` raises ``ResourceNotFound`` when the log endpoint is 404.

    No data may be yielded before the error is raised.
    """

    async def handler(request: web.Request) -> web.Response:
        raise web.HTTPNotFound()

    app = web.Application()
    app.router.add_get("/jobs/job-id/log", handler)

    srv = await aiohttp_server(app)

    lst = []
    async with make_client(srv.make_url("/")) as client:
        with pytest.raises(ResourceNotFound):
            async for data in client.jobs.monitor("job-id"):
                lst.append(data)
    # Nothing was received before the failure surfaced.
    assert lst == []


async def test_job_top(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``jobs.top`` turns each websocket JSON message into a ``JobTelemetry``."""
    message_count = 10

    async def top_stream(request: web.Request) -> web.WebSocketResponse:
        socket = web.WebSocketResponse()
        await socket.prepare(request)

        # Only the timestamp varies between messages.
        for timestamp in range(message_count):
            await socket.send_json(
                {
                    "cpu": 0.5,
                    "memory": 50,
                    "timestamp": timestamp,
                    "gpu_duty_cycle": 50,
                    "gpu_memory": 55.6,
                }
            )

        await socket.close()
        return socket

    app = web.Application()
    app.router.add_get("/jobs/job-id/top", top_stream)
    server = await aiohttp_server(app)

    async with make_client(server.make_url("/")) as client:
        received = [item async for item in client.jobs.top("job-id")]

    expected = [
        JobTelemetry(
            cpu=0.5, memory=50, timestamp=timestamp, gpu_duty_cycle=50, gpu_memory=55.6
        )
        for timestamp in range(message_count)
    ]
    assert received == expected


async def test_top_finished_job(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """A websocket closed without any message yields a "not running" ``ValueError``."""

    async def close_immediately(request: web.Request) -> web.WebSocketResponse:
        socket = web.WebSocketResponse()
        await socket.prepare(request)
        await socket.close()
        return socket

    app = web.Application()
    app.router.add_get("/jobs/job-id/top", close_immediately)
    server = await aiohttp_server(app)

    received = []
    async with make_client(server.make_url("/")) as client:
        with pytest.raises(ValueError, match="not running"):
            async for item in client.jobs.top("job-id"):
                received.append(item)
    # No telemetry was produced before the error.
    assert received == []


async def test_top_nonexisting_job(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """A 400 response from the top endpoint yields a "not found" ``ValueError``."""

    async def reject(request: web.Request) -> web.Response:
        raise web.HTTPBadRequest()

    app = web.Application()
    app.router.add_get("/jobs/job-id/top", reject)
    server = await aiohttp_server(app)

    received = []
    async with make_client(server.make_url("/")) as client:
        with pytest.raises(ValueError, match="not found"):
            async for item in client.jobs.top("job-id"):
                received.append(item)
    # No telemetry was produced before the error.
    assert received == []


async def test_kill_not_found_error(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``jobs.kill`` raises ``ResourceNotFound`` when the server answers 404."""

    async def not_found(request: web.Request) -> web.Response:
        raise web.HTTPNotFound()

    app = web.Application()
    app.router.add_delete("/jobs/job-id", not_found)
    server = await aiohttp_server(app)

    async with make_client(server.make_url("/")) as client:
        with pytest.raises(ResourceNotFound):
            await client.jobs.kill("job-id")


async def test_kill_ok(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``jobs.kill`` returns ``None`` when the server answers 204."""

    async def no_content(request: web.Request) -> web.Response:
        raise web.HTTPNoContent()

    app = web.Application()
    app.router.add_delete("/jobs/job-id", no_content)
    server = await aiohttp_server(app)

    async with make_client(server.make_url("/")) as client:
        result = await client.jobs.kill("job-id")

    assert result is None


async def test_save_image_not_in_neuro_registry(make_client: _MakeClient) -> None:
    """Saving to an image built without a registry is rejected with ``ValueError``."""
    registryless_image = RemoteImage(name="ubuntu")
    async with make_client("http://whatever") as client:
        with pytest.raises(ValueError, match="must be in the neuromation registry"):
            await client.jobs.save("job-id", registryless_image)


async def test_save_ok(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``jobs.save`` completes without error on a full commit-and-push stream.

    The fake endpoint replies with newline-delimited JSON messages: a commit
    start, a commit finish, then a series of push progress messages.
    """
    JSON = [
        {
            "status": "CommitStarted",
            # Plain literal: the original used an f-string with no placeholders.
            "details": {"container": "cont_id", "image": "ubuntu:latest"},
        },
        {"status": "CommitFinished"},
        {"status": "The push refers to repository [localhost:5000/alpine]"},
        {"status": "Preparing", "progressDetail": {}, "id": "a31dbd3063d7"},
        {
            "status": "Pushing",
            "progressDetail": {"current": 3584},
            "progress": " 3.584kB",
            "id": "0acd017a4b67",
        },
        {"status": "Pushed", "progressDetail": {}, "id": "0acd017a4b67"},
        {"status": "job-id: digest: sha256:DIGEST size: 1359"},
        {
            "progressDetail": {},
            "aux": {"Tag": "job-id", "Digest": "sha256:DIGEST", "Size": 1359},
        },
    ]

    async def handler(request: web.Request) -> web.StreamResponse:
        # The client is expected to send a "b3" header with the request.
        assert "b3" in request.headers
        encoding = "utf-8"
        response = web.StreamResponse(status=200)
        response.enable_compression(web.ContentCoding.identity)
        response.content_type = "application/x-ndjson"
        response.charset = encoding
        await response.prepare(request)
        # One JSON document per line, each terminated by CRLF.
        for chunk in JSON:
            chunk_str = json.dumps(chunk) + "\r\n"
            await response.write(chunk_str.encode(encoding))
        return response

    app = web.Application()
    app.router.add_post("/jobs/job-id/save", handler)

    srv = await aiohttp_server(app)

    async with make_client(srv.make_url("/")) as client:
        image = RemoteImage(registry="gcr.io", owner="me", name="img")
        await client.jobs.save("job-id", image)


async def test_save_commit_started_invalid_status_fails(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """A first message with an unexpected status makes ``jobs.save`` raise."""
    invalid = "invalid status"
    messages = [
        {"status": invalid, "details": {"container": "cnt", "image": "img"}},
        {"status": "CommitFinished"},
        {"status": "The push refers to repository [localhost:5000/alpine]"},
    ]

    async def stream_messages(request: web.Request) -> web.StreamResponse:
        assert "b3" in request.headers
        charset = "utf-8"
        stream = web.StreamResponse(status=200)
        stream.enable_compression(web.ContentCoding.identity)
        stream.content_type = "application/x-ndjson"
        stream.charset = charset
        await stream.prepare(request)
        # One JSON document per line, each terminated by CRLF.
        for message in messages:
            line = json.dumps(message) + "\r\n"
            await stream.write(line.encode(charset))
        return stream

    app = web.Application()
    app.router.add_post("/jobs/job-id/save", stream_messages)
    server = await aiohttp_server(app)

    expected_error = f"Invalid commit status: '{invalid}', expecting: 'CommitStarted'"
    target = RemoteImage(registry="gcr.io", owner="me", name="img")
    async with make_client(server.make_url("/")) as client:
        with pytest.raises(DockerError, match=expected_error):
            await client.jobs.save("job-id", target)


async def test_save_commit_started_missing_image_details_fails(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """A ``CommitStarted`` message lacking ``image`` details makes save raise."""
    messages = [
        {"status": "CommitStarted", "details": {"container": "cnt"}},
        {"status": "CommitFinished"},
        {"status": "The push refers to repository [localhost:5000/alpine]"},
    ]

    async def stream_messages(request: web.Request) -> web.StreamResponse:
        assert "b3" in request.headers
        charset = "utf-8"
        stream = web.StreamResponse(status=200)
        stream.enable_compression(web.ContentCoding.identity)
        stream.content_type = "application/x-ndjson"
        stream.charset = charset
        await stream.prepare(request)
        # One JSON document per line, each terminated by CRLF.
        for message in messages:
            line = json.dumps(message) + "\r\n"
            await stream.write(line.encode(charset))
        return stream

    app = web.Application()
    app.router.add_post("/jobs/job-id/save", stream_messages)
    server = await aiohttp_server(app)

    target = RemoteImage(registry="gcr.io", owner="me", name="img")
    async with make_client(server.make_url("/")) as client:
        with pytest.raises(DockerError, match="Missing required details: 'image'"):
            await client.jobs.save("job-id", target)


async def test_save_commit_finished_invalid_status_fails(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """A second message with an unexpected status makes ``jobs.save`` raise."""
    invalid = "invalid status"
    messages = [
        {"status": "CommitStarted", "details": {"container": "cnt", "image": "img"}},
        {"status": invalid},
        {"status": "The push refers to repository [localhost:5000/alpine]"},
    ]

    async def stream_messages(request: web.Request) -> web.StreamResponse:
        charset = "utf-8"
        stream = web.StreamResponse(status=200)
        stream.enable_compression(web.ContentCoding.identity)
        stream.content_type = "application/x-ndjson"
        stream.charset = charset
        await stream.prepare(request)
        # One JSON document per line, each terminated by CRLF.
        for message in messages:
            line = json.dumps(message) + "\r\n"
            await stream.write(line.encode(charset))
        return stream

    app = web.Application()
    app.router.add_post("/jobs/job-id/save", stream_messages)
    server = await aiohttp_server(app)

    expected_error = f"Invalid commit status: '{invalid}', expecting: 'CommitFinished'"
    target = RemoteImage(registry="gcr.io", owner="me", name="img")
    async with make_client(server.make_url("/")) as client:
        with pytest.raises(DockerError, match=expected_error):
            await client.jobs.save("job-id", target)


async def test_save_commit_started_missing_status_fails(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """A first message without a ``status`` key makes ``jobs.save`` raise."""
    messages = [
        {"not-a-status": "value"},
        {"status": "CommitFinished"},
        {"status": "The push refers to repository [localhost:5000/alpine]"},
    ]

    async def stream_messages(request: web.Request) -> web.StreamResponse:
        charset = "utf-8"
        stream = web.StreamResponse(status=200)
        stream.enable_compression(web.ContentCoding.identity)
        stream.content_type = "application/x-ndjson"
        stream.charset = charset
        await stream.prepare(request)
        # One JSON document per line, each terminated by CRLF.
        for message in messages:
            line = json.dumps(message) + "\r\n"
            await stream.write(line.encode(charset))
        return stream

    app = web.Application()
    app.router.add_post("/jobs/job-id/save", stream_messages)
    server = await aiohttp_server(app)

    target = RemoteImage(registry="gcr.io", owner="me", name="img")
    async with make_client(server.make_url("/")) as client:
        with pytest.raises(DockerError, match='Missing required field: "status"'):
            await client.jobs.save("job-id", target)


async def test_save_commit_finished_missing_status_fails(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """A second message without a ``status`` key makes ``jobs.save`` raise."""
    messages = [
        {"status": "CommitStarted", "details": {"container": "cnt", "image": "img"}},
        {"not-a-status": "value"},
        {"status": "The push refers to repository [localhost:5000/alpine]"},
    ]

    async def stream_messages(request: web.Request) -> web.StreamResponse:
        charset = "utf-8"
        stream = web.StreamResponse(status=200)
        stream.enable_compression(web.ContentCoding.identity)
        stream.content_type = "application/x-ndjson"
        stream.charset = charset
        await stream.prepare(request)
        # One JSON document per line, each terminated by CRLF.
        for message in messages:
            line = json.dumps(message) + "\r\n"
            await stream.write(line.encode(charset))
        return stream

    app = web.Application()
    app.router.add_post("/jobs/job-id/save", stream_messages)
    server = await aiohttp_server(app)

    target = RemoteImage(registry="gcr.io", owner="me", name="img")
    async with make_client(server.make_url("/")) as client:
        with pytest.raises(DockerError, match='Missing required field: "status"'):
            await client.jobs.save("job-id", target)


async def test_status_failed(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``jobs.status`` of a failed job equals the parsed form of the server JSON."""
    history = {
        "created_at": "2018-08-29T12:23:13.981621+00:00",
        "started_at": "2018-08-29T12:23:15.988054+00:00",
        "finished_at": "2018-08-29T12:59:31.427795+00:00",
        "reason": "ContainerCannotRun",
        "description": "Not enough coffee",
    }
    container = {
        "image": "submit-image-name",
        "command": "submit-command",
        "http": {"port": 8181},
        "resources": {
            "memory_mb": "4096",
            "cpu": 7.0,
            "shm": True,
            "gpu": 1,
            "gpu_model": "test-gpu-model",
        },
        "volumes": [
            {
                "src_storage_uri": "storage://test-user/path_read_only",
                "dst_path": "/container/read_only",
                "read_only": True,
            },
            {
                "src_storage_uri": "storage://test-user/path_read_write",
                "dst_path": "/container/path_read_write",
                "read_only": False,
            },
        ],
    }
    payload = {
        "status": "failed",
        "id": "job-id",
        "description": "This is job description, not a history description",
        "http_url": "http://my_host:8889",
        "ssh_server": "ssh://my_host.ssh:22",
        "ssh_auth_server": "ssh://my_host.ssh:22",
        "history": history,
        "is_preemptible": True,
        "owner": "owner",
        "cluster_name": "default",
        "container": container,
    }

    async def job_status(request: web.Request) -> web.Response:
        return web.json_response(payload)

    app = web.Application()
    app.router.add_get("/jobs/job-id", job_status)
    server = await aiohttp_server(app)

    async with make_client(server.make_url("/")) as client:
        description = await client.jobs.status("job-id")

    # Build the expected value with the same parser settings the client holds.
    config = client._config
    parser = _ImageNameParser(
        config.auth_token.username, config.cluster_config.registry_url
    )
    assert description == _job_description_from_api(payload, parser)


async def test_status_with_ssh_and_http(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``jobs.status`` of a running job equals the parsed form of the server JSON."""
    history = {
        "created_at": "2018-08-29T12:23:13.981621+00:00",
        "started_at": "2018-08-29T12:23:15.988054+00:00",
        "finished_at": "2018-08-29T12:59:31.427795+00:00",
        "reason": "OK",
        "description": "Everything is fine",
    }
    container = {
        "image": "submit-image-name",
        "command": "submit-command",
        "http": {"port": 8181},
        "resources": {
            "memory_mb": "4096",
            "cpu": 7.0,
            "shm": True,
            "gpu": 1,
            "gpu_model": "test-gpu-model",
        },
        "volumes": [
            {
                "src_storage_uri": "storage://test-user/path_read_only",
                "dst_path": "/container/read_only",
                "read_only": True,
            },
            {
                "src_storage_uri": "storage://test-user/path_read_write",
                "dst_path": "/container/path_read_write",
                "read_only": False,
            },
        ],
    }
    payload = {
        "status": "running",
        "id": "job-id",
        "description": "This is job description, not a history description",
        "http_url": "http://my_host:8889",
        "ssh_server": "ssh://my_host.ssh:22",
        "ssh_auth_server": "ssh://my_host.ssh:22",
        "history": history,
        "is_preemptible": True,
        "owner": "owner",
        "cluster_name": "default",
        "container": container,
    }

    async def job_status(request: web.Request) -> web.Response:
        return web.json_response(payload)

    app = web.Application()
    app.router.add_get("/jobs/job-id", job_status)
    server = await aiohttp_server(app)

    async with make_client(server.make_url("/")) as client:
        description = await client.jobs.status("job-id")

    # Build the expected value with the same parser settings the client holds.
    config = client._config
    parser = _ImageNameParser(
        config.auth_token.username, config.cluster_config.registry_url
    )
    assert description == _job_description_from_api(payload, parser)


async def test_status_with_tpu(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """The ``tpu`` entry of the resources JSON surfaces as TPU type and version."""
    history = {
        "created_at": "2018-08-29T12:23:13.981621+00:00",
        "started_at": "2018-08-29T12:23:15.988054+00:00",
        "finished_at": "2018-08-29T12:59:31.427795+00:00",
        "reason": "OK",
        "description": "Everything is fine",
    }
    container = {
        "image": "submit-image-name",
        "command": "submit-command",
        "http": {"port": 8181},
        "resources": {
            "memory_mb": "4096",
            "cpu": 7.0,
            "shm": True,
            "gpu": 1,
            "gpu_model": "test-gpu-model",
            "tpu": {"type": "v3-8", "software_version": "1.14"},
        },
        "volumes": [
            {
                "src_storage_uri": "storage://test-user/path_read_only",
                "dst_path": "/container/read_only",
                "read_only": True,
            },
            {
                "src_storage_uri": "storage://test-user/path_read_write",
                "dst_path": "/container/path_read_write",
                "read_only": False,
            },
        ],
    }
    payload = {
        "status": "running",
        "id": "job-id",
        "description": "This is job description, not a history description",
        "http_url": "http://my_host:8889",
        "ssh_server": "ssh://my_host.ssh:22",
        "ssh_auth_server": "ssh://my_host.ssh:22",
        "history": history,
        "is_preemptible": True,
        "owner": "owner",
        "cluster_name": "default",
        "container": container,
    }

    async def job_status(request: web.Request) -> web.Response:
        return web.json_response(payload)

    app = web.Application()
    app.router.add_get("/jobs/job-id", job_status)
    server = await aiohttp_server(app)

    async with make_client(server.make_url("/")) as client:
        description = await client.jobs.status("job-id")

    # Build the expected value with the same parser settings the client holds.
    config = client._config
    parser = _ImageNameParser(
        config.auth_token.username, config.cluster_config.registry_url
    )
    assert description == _job_description_from_api(payload, parser)

    # The TPU fields must be populated from the "tpu" mapping.
    parsed_resources = description.container.resources
    assert parsed_resources.tpu_type == "v3-8"
    assert parsed_resources.tpu_software_version == "1.14"


async def test_job_run(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """``jobs.run`` posts the expected request body and parses the reply."""
    reply = {
        "id": "job-cf519ed3-9ea5-48f6-a8c5-492b810eb56f",
        "status": "failed",
        "history": {
            "status": "failed",
            "reason": "Error",
            "description": "Mounted on Avail\\n/dev/shm     " "64M\\n\\nExit code: 1",
            "created_at": "2018-09-25T12:28:21.298672+00:00",
            "started_at": "2018-09-25T12:28:59.759433+00:00",
            "finished_at": "2018-09-25T12:28:59.759433+00:00",
        },
        "owner": "owner",
        "cluster_name": "default",
        "container": {
            "image": "gcr.io/light-reality-205619/ubuntu:latest",
            "command": "date",
            "resources": {
                "cpu": 1.0,
                "memory_mb": 16384,
                "gpu": 1,
                "shm": False,
                "gpu_model": "nvidia-tesla-p4",
            },
        },
        "http_url": "http://my_host:8889",
        "ssh_server": "ssh://my_host.ssh:22",
        "ssh_auth_server": "ssh://my_host.ssh:22",
        "is_preemptible": False,
    }

    # The request body the client is expected to send for the container below.
    expected_request = {
        "container": {
            "image": "submit-image-name",
            "command": "submit-command",
            "http": {"port": 8181, "requires_auth": True},
            "resources": {
                "memory_mb": 16384,
                "cpu": 7.0,
                "shm": True,
                "gpu": 1,
                "gpu_model": "test-gpu-model",
            },
            "volumes": [
                {
                    "src_storage_uri": "storage://test-user/path_read_only",
                    "dst_path": "/container/read_only",
                    "read_only": True,
                },
                {
                    "src_storage_uri": "storage://test-user/path_read_write",
                    "dst_path": "/container/path_read_write",
                    "read_only": False,
                },
            ],
        },
        "is_preemptible": False,
    }

    async def create_job(request: web.Request) -> web.Response:
        body = await request.json()
        assert body == expected_request
        return web.json_response(reply)

    app = web.Application()
    app.router.add_post("/jobs", create_job)
    server = await aiohttp_server(app)

    async with make_client(server.make_url("/")) as client:
        read_only_volume = Volume(
            URL("storage://test-user/path_read_only"), "/container/read_only", True
        )
        read_write_volume = Volume(
            URL("storage://test-user/path_read_write"),
            "/container/path_read_write",
            False,
        )
        volumes: List[Volume] = [read_only_volume, read_write_volume]
        container = Container(
            image=RemoteImage("submit-image-name"),
            command="submit-command",
            resources=Resources(16384, 7, 1, "test-gpu-model", True, None, None),
            volumes=volumes,
            http=HTTPPort(8181),
        )
        job = await client.jobs.run(container=container, is_preemptible=False)

    # Build the expected value with the same parser settings the client holds.
    config = client._config
    parser = _ImageNameParser(
        config.auth_token.username, config.cluster_config.registry_url
    )
    assert job == _job_description_from_api(reply, parser)


async def test_job_run_with_name_and_description(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """Submit a named, described job and verify both the request and the result.

    A stub ``POST /jobs`` endpoint asserts the exact request body and answers
    with a canned payload; the value returned by ``client.jobs.run`` must equal
    that payload passed through ``_job_description_from_api``.
    """
    # Canned reply of the stub server.
    response_payload = {
        "id": "job-cf519ed3-9ea5-48f6-a8c5-492b810eb56f",
        "name": "test-job-name",
        "description": "job description",
        "status": "failed",
        "history": {
            "status": "failed",
            "reason": "Error",
            "description": "Mounted on Avail\\n/dev/shm     " "64M\\n\\nExit code: 1",
            "created_at": "2018-09-25T12:28:21.298672+00:00",
            "started_at": "2018-09-25T12:28:59.759433+00:00",
            "finished_at": "2018-09-25T12:28:59.759433+00:00",
        },
        "owner": "owner",
        "cluster_name": "default",
        "container": {
            "image": "gcr.io/light-reality-205619/ubuntu:latest",
            "command": "date",
            "resources": {
                "cpu": 1.0,
                "memory_mb": 16384,
                "gpu": 1,
                "shm": False,
                "gpu_model": "nvidia-tesla-p4",
            },
        },
        "http_url": "http://my_host:8889",
        "ssh_server": "ssh://my_host.ssh:22",
        "ssh_auth_server": "ssh://my_host.ssh:22",
        "is_preemptible": False,
    }

    # Request body expected for the container assembled further down.
    expected_container = {
        "image": "submit-image-name",
        "command": "submit-command",
        "http": {"port": 8181, "requires_auth": True},
        "resources": {
            "memory_mb": 16384,
            "cpu": 7.0,
            "shm": True,
            "gpu": 1,
            "gpu_model": "test-gpu-model",
        },
        "volumes": [
            {
                "src_storage_uri": "storage://test-user/path_read_only",
                "dst_path": "/container/read_only",
                "read_only": True,
            },
            {
                "src_storage_uri": "storage://test-user/path_read_write",
                "dst_path": "/container/path_read_write",
                "read_only": False,
            },
        ],
    }
    expected_request = {
        "container": expected_container,
        "is_preemptible": False,
        "name": "test-job-name",
        "description": "job description",
    }

    async def handler(request: web.Request) -> web.Response:
        body = await request.json()
        assert body == expected_request
        return web.json_response(response_payload)

    app = web.Application()
    app.router.add_post("/jobs", handler)
    server = await aiohttp_server(app)

    async with make_client(server.make_url("/")) as client:
        container = Container(
            image=RemoteImage("submit-image-name"),
            command="submit-command",
            resources=Resources(16384, 7, 1, "test-gpu-model", True, None, None),
            volumes=[
                Volume(
                    URL("storage://test-user/path_read_only"),
                    "/container/read_only",
                    True,
                ),
                Volume(
                    URL("storage://test-user/path_read_write"),
                    "/container/path_read_write",
                    False,
                ),
            ],
            http=HTTPPort(8181),
        )
        job = await client.jobs.run(
            container=container,
            is_preemptible=False,
            name="test-job-name",
            description="job description",
        )

    # The parser is built from the same client configuration used for the call.
    config = client._config
    parser = _ImageNameParser(
        config.auth_token.username, config.cluster_config.registry_url
    )
    assert job == _job_description_from_api(response_payload, parser)


async def test_job_run_no_volumes(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """Submit a job whose container has no volumes.

    The stub ``POST /jobs`` endpoint asserts that the request body carries no
    ``volumes`` key at all, then replies with a canned payload that the
    client's return value is compared against.
    """
    # Canned reply of the stub server.
    response_payload = {
        "id": "job-cf519ed3-9ea5-48f6-a8c5-492b810eb56f",
        "name": "test-job-name",
        "status": "failed",
        "history": {
            "status": "failed",
            "reason": "Error",
            "description": "Mounted on Avail\\n/dev/shm     " "64M\\n\\nExit code: 1",
            "created_at": "2018-09-25T12:28:21.298672+00:00",
            "started_at": "2018-09-25T12:28:59.759433+00:00",
            "finished_at": "2018-09-25T12:28:59.759433+00:00",
        },
        "owner": "owner",
        "cluster_name": "default",
        "container": {
            "image": "gcr.io/light-reality-205619/ubuntu:latest",
            "command": "date",
            "resources": {
                "cpu": 7,
                "memory_mb": 16384,
                "gpu": 1,
                "shm": False,
                "gpu_model": "nvidia-tesla-p4",
            },
        },
        "http_url": "http://my_host:8889",
        "ssh_server": "ssh://my_host.ssh:22",
        "ssh_auth_server": "ssh://my_host.ssh:22",
        "is_preemptible": False,
    }

    # Expected request body: note the absence of a "volumes" entry.
    expected_container = {
        "image": "submit-image-name",
        "command": "submit-command",
        "http": {"port": 8181, "requires_auth": True},
        "resources": {
            "memory_mb": 16384,
            "cpu": 7,
            "shm": True,
            "gpu": 1,
            "gpu_model": "test-gpu-model",
        },
    }
    expected_request = {
        "container": expected_container,
        "is_preemptible": False,
        "name": "test-job-name",
        "description": "job description",
    }

    async def handler(request: web.Request) -> web.Response:
        body = await request.json()
        assert body == expected_request
        return web.json_response(response_payload)

    app = web.Application()
    app.router.add_post("/jobs", handler)
    server = await aiohttp_server(app)

    async with make_client(server.make_url("/")) as client:
        container = Container(
            image=RemoteImage("submit-image-name"),
            command="submit-command",
            resources=Resources(16384, 7, 1, "test-gpu-model", True, None, None),
            http=HTTPPort(8181),
        )
        job = await client.jobs.run(
            container=container,
            is_preemptible=False,
            name="test-job-name",
            description="job description",
        )

    config = client._config
    parser = _ImageNameParser(
        config.auth_token.username, config.cluster_config.registry_url
    )
    assert job == _job_description_from_api(response_payload, parser)


async def test_job_run_preemptible(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """Submit a preemptible job and verify the request body and the result.

    The stub ``POST /jobs`` endpoint asserts that ``is_preemptible`` is sent as
    ``True`` together with the full container description, then replies with a
    canned payload that the client's return value is compared against.
    """
    # Canned reply of the stub server.
    response_payload = {
        "id": "job-cf519ed3-9ea5-48f6-a8c5-492b810eb56f",
        "name": "test-job-name",
        "status": "failed",
        "history": {
            "status": "failed",
            "reason": "Error",
            "description": "Mounted on Avail\\n/dev/shm     " "64M\\n\\nExit code: 1",
            "created_at": "2018-09-25T12:28:21.298672+00:00",
            "started_at": "2018-09-25T12:28:59.759433+00:00",
            "finished_at": "2018-09-25T12:28:59.759433+00:00",
        },
        "owner": "owner",
        "cluster_name": "default",
        "container": {
            "image": "gcr.io/light-reality-205619/ubuntu:latest",
            "command": "date",
            "resources": {
                "cpu": 1.0,
                "memory_mb": 16384,
                "gpu": 1,
                "shm": False,
                "gpu_model": "nvidia-tesla-p4",
            },
        },
        "is_preemptible": True,
        "http_url": "http://my_host:8889",
        "ssh_server": "ssh://my_host.ssh:22",
        "ssh_auth_server": "ssh://my_host.ssh:22",
    }

    # Request body expected for the container assembled further down.
    expected_container = {
        "image": "submit-image-name",
        "command": "submit-command",
        "http": {"port": 8181, "requires_auth": True},
        "resources": {
            "memory_mb": 16384,
            "cpu": 7.0,
            "shm": True,
            "gpu": 1,
            "gpu_model": "test-gpu-model",
        },
        "volumes": [
            {
                "src_storage_uri": "storage://test-user/path_read_only",
                "dst_path": "/container/read_only",
                "read_only": True,
            },
            {
                "src_storage_uri": "storage://test-user/path_read_write",
                "dst_path": "/container/path_read_write",
                "read_only": False,
            },
        ],
    }
    expected_request = {
        "container": expected_container,
        "is_preemptible": True,
        "name": "test-job-name",
        "description": "job description",
    }

    async def handler(request: web.Request) -> web.Response:
        body = await request.json()
        assert body == expected_request
        return web.json_response(response_payload)

    app = web.Application()
    app.router.add_post("/jobs", handler)
    server = await aiohttp_server(app)

    async with make_client(server.make_url("/")) as client:
        container = Container(
            image=RemoteImage("submit-image-name"),
            command="submit-command",
            resources=Resources(16384, 7, 1, "test-gpu-model", True, None, None),
            volumes=[
                Volume(
                    URL("storage://test-user/path_read_only"),
                    "/container/read_only",
                    True,
                ),
                Volume(
                    URL("storage://test-user/path_read_write"),
                    "/container/path_read_write",
                    False,
                ),
            ],
            http=HTTPPort(8181),
        )
        job = await client.jobs.run(
            container=container,
            is_preemptible=True,
            name="test-job-name",
            description="job description",
        )

    config = client._config
    parser = _ImageNameParser(
        config.auth_token.username, config.cluster_config.registry_url
    )
    assert job == _job_description_from_api(response_payload, parser)


async def test_job_run_schedule_timeout(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """Submit a job with ``schedule_timeout`` and verify it reaches the server.

    The stub ``POST /jobs`` endpoint asserts that the request body contains
    ``schedule_timeout`` (and ``is_preemptible`` as ``False`` although the call
    does not pass it), then replies with a canned payload that the client's
    return value is compared against.
    """
    # Canned reply of the stub server.
    response_payload = {
        "id": "job-cf519ed3-9ea5-48f6-a8c5-492b810eb56f",
        "status": "failed",
        "history": {
            "status": "failed",
            "reason": "Error",
            "description": "Mounted on Avail\\n/dev/shm     " "64M\\n\\nExit code: 1",
            "created_at": "2018-09-25T12:28:21.298672+00:00",
            "started_at": "2018-09-25T12:28:59.759433+00:00",
            "finished_at": "2018-09-25T12:28:59.759433+00:00",
        },
        "owner": "owner",
        "cluster_name": "default",
        "container": {
            "image": "gcr.io/light-reality-205619/ubuntu:latest",
            "command": "date",
            "resources": {
                "cpu": 1.0,
                "memory_mb": 16384,
                "gpu": 1,
                "shm": False,
                "gpu_model": "nvidia-tesla-p4",
            },
        },
        "http_url": "http://my_host:8889",
        "ssh_server": "ssh://my_host.ssh:22",
        "ssh_auth_server": "ssh://my_host.ssh:22",
        "is_preemptible": False,
        "schedule_timeout": 5,
    }

    expected_container = {
        "image": "submit-image-name",
        "command": "submit-command",
        "resources": {
            "memory_mb": 16384,
            "cpu": 7,
            "shm": True,
            "gpu": 1,
            "gpu_model": "test-gpu-model",
        },
    }
    expected_request = {
        "container": expected_container,
        "is_preemptible": False,
        "schedule_timeout": 5,
    }

    async def handler(request: web.Request) -> web.Response:
        body = await request.json()
        assert body == expected_request
        return web.json_response(response_payload)

    app = web.Application()
    app.router.add_post("/jobs", handler)
    server = await aiohttp_server(app)

    async with make_client(server.make_url("/")) as client:
        container = Container(
            image=RemoteImage("submit-image-name"),
            command="submit-command",
            resources=Resources(16384, 7, 1, "test-gpu-model", True, None, None),
        )
        job = await client.jobs.run(container, schedule_timeout=5)

    config = client._config
    parser = _ImageNameParser(
        config.auth_token.username, config.cluster_config.registry_url
    )
    assert job == _job_description_from_api(response_payload, parser)


async def test_job_run_tpu(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """Submit a job requesting a TPU and verify the request and parsed result.

    The stub ``POST /jobs`` endpoint asserts that the resources section of the
    request carries a ``tpu`` mapping with ``type`` and ``software_version``.
    The returned description must match the canned payload and expose the TPU
    settings through ``tpu_type`` and ``tpu_software_version``.
    """
    # Canned reply of the stub server.
    response_payload = {
        "id": "job-cf519ed3-9ea5-48f6-a8c5-492b810eb56f",
        "status": "failed",
        "history": {
            "status": "failed",
            "reason": "Error",
            "description": "Mounted on Avail\\n/dev/shm     " "64M\\n\\nExit code: 1",
            "created_at": "2018-09-25T12:28:21.298672+00:00",
            "started_at": "2018-09-25T12:28:59.759433+00:00",
            "finished_at": "2018-09-25T12:28:59.759433+00:00",
        },
        "owner": "owner",
        "cluster_name": "default",
        "container": {
            "image": "gcr.io/light-reality-205619/ubuntu:latest",
            "command": "date",
            "resources": {
                "cpu": 1.0,
                "memory_mb": 16384,
                "gpu": 1,
                "shm": False,
                "gpu_model": "nvidia-tesla-p4",
                "tpu": {"type": "v3-8", "software_version": "1.14"},
            },
        },
        "http_url": "http://my_host:8889",
        "ssh_server": "ssh://my_host.ssh:22",
        "ssh_auth_server": "ssh://my_host.ssh:22",
        "is_preemptible": False,
    }

    expected_container = {
        "image": "submit-image-name",
        "command": "submit-command",
        "resources": {
            "memory_mb": 16384,
            "cpu": 7,
            "shm": True,
            "gpu": 1,
            "gpu_model": "test-gpu-model",
            "tpu": {"type": "v3-8", "software_version": "1.14"},
        },
    }
    expected_request = {
        "container": expected_container,
        "is_preemptible": False,
        "schedule_timeout": 5,
    }

    async def handler(request: web.Request) -> web.Response:
        body = await request.json()
        assert body == expected_request
        return web.json_response(response_payload)

    app = web.Application()
    app.router.add_post("/jobs", handler)
    server = await aiohttp_server(app)

    async with make_client(server.make_url("/")) as client:
        container = Container(
            image=RemoteImage("submit-image-name"),
            command="submit-command",
            resources=Resources(16384, 7, 1, "test-gpu-model", True, "v3-8", "1.14"),
        )
        job = await client.jobs.run(container, schedule_timeout=5)

    config = client._config
    parser = _ImageNameParser(
        config.auth_token.username, config.cluster_config.registry_url
    )
    assert job == _job_description_from_api(response_payload, parser)

    # The TPU settings come from the server reply, not from the request.
    parsed_resources = job.container.resources
    assert parsed_resources.tpu_type == "v3-8"
    assert parsed_resources.tpu_software_version == "1.14"


def create_job_response(
    id: str,
    status: str,
    owner: str = "owner",
    name: Optional[str] = None,
    image: str = "submit-image-name",
) -> Dict[str, Any]:
    """Build a job payload dict shaped like an entry of the ``/jobs`` reply.

    Args:
        id: Value stored under the ``"id"`` key.
        status: Value stored under the top-level ``"status"`` key. The nested
            ``history`` status is always ``"failed"`` regardless of this value.
        owner: Value stored under the ``"owner"`` key.
        name: Optional job name; the ``"name"`` key is added only when this is
            truthy, so both ``None`` and ``""`` leave it out.
        image: Value stored under ``container.image``.

    Returns:
        A freshly built dictionary describing the job.
    """
    history = {
        "status": "failed",
        "reason": "Error",
        "description": "Mounted on Avail\\n/dev/shm     " "64M\\n\\nExit code: 1",
        "created_at": "2018-09-25T12:28:21.298672+00:00",
        "started_at": "2018-09-25T12:28:59.759433+00:00",
        "finished_at": "2018-09-25T12:28:59.759433+00:00",
    }
    container = {
        "image": image,
        "command": "submit-command",
        "resources": {
            "cpu": 1.0,
            "memory_mb": 16384,
            "gpu": 1,
            "gpu_model": "nvidia-tesla-v100",
        },
    }
    payload: Dict[str, Any] = {
        "id": id,
        "status": status,
        "history": history,
        "ssh_auth_server": "ssh://my_host.ssh:22",
        "container": container,
        "is_preemptible": True,
        "owner": owner,
        "cluster_name": "default",
    }
    if not name:
        return payload
    payload["name"] = name
    return payload


async def test_list_no_filter(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """List jobs without filters and expect every job the server returns.

    The stub ``GET /jobs`` endpoint always replies with the same four jobs;
    the client result must equal those jobs converted one by one with
    ``_job_description_from_api``.
    """
    job_statuses = ("pending", "running", "succeeded", "failed")
    jobs = [
        create_job_response(f"job-id-{index}", status, name="job-name-1")
        for index, status in enumerate(job_statuses, start=1)
    ]
    reply = {"jobs": jobs}

    async def handler(request: web.Request) -> web.Response:
        return web.json_response(reply)

    app = web.Application()
    app.router.add_get("/jobs", handler)
    server = await aiohttp_server(app)

    async with make_client(server.make_url("/")) as client:
        listed = await client.jobs.list()

    config = client._config
    parser = _ImageNameParser(
        config.auth_token.username, config.cluster_config.registry_url
    )
    expected = [_job_description_from_api(job, parser) for job in jobs]
    assert listed == expected


async def test_list_filter_by_name(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """List jobs filtered by name.

    The stub ``GET /jobs`` endpoint requires a non-empty ``name`` query
    parameter and returns only the jobs carrying that name. Filtering by the
    first name must yield the first three jobs of the fixture list.
    """
    name_1 = "job-name-1"
    name_2 = "job-name-2"
    # (status, name) per job; ``None`` means the job payload has no name.
    job_specs = [
        ("pending", name_1),
        ("succeeded", name_1),
        ("failed", name_1),
        ("running", name_2),
        ("succeeded", name_2),
        ("failed", name_2),
        ("running", None),
        ("pending", None),
        ("succeeded", None),
        ("failed", None),
    ]
    jobs = [
        create_job_response(f"job-id-{index}", status, name=job_name)
        for index, (status, job_name) in enumerate(job_specs, start=1)
    ]

    async def handler(request: web.Request) -> web.Response:
        requested_name = request.query.get("name")
        assert requested_name
        matching = [job for job in jobs if job.get("name") == requested_name]
        return web.json_response({"jobs": matching})

    app = web.Application()
    app.router.add_get("/jobs", handler)
    server = await aiohttp_server(app)

    async with make_client(server.make_url("/")) as client:
        listed = await client.jobs.list(name=name_1)

    config = client._config
    parser = _ImageNameParser(
        config.auth_token.username, config.cluster_config.registry_url
    )
    all_descriptions = [_job_description_from_api(job, parser) for job in jobs]
    # Only the first three fixture jobs are named ``name_1``.
    assert listed == all_descriptions[:3]


async def test_list_filter_by_statuses(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """List jobs filtered by a set of statuses.

    The stub ``GET /jobs`` endpoint requires at least one ``status`` query
    value and returns the jobs whose status is among the requested ones. The
    client result must equal the fixture jobs whose parsed status is in the
    requested set, in fixture order.
    """
    name_1 = "job-name-1"
    name_2 = "job-name-2"
    # (status, name) per job; ``None`` means the job payload has no name.
    job_specs = [
        ("pending", name_1),
        ("succeeded", name_1),
        ("failed", name_1),
        ("running", name_2),
        ("succeeded", name_2),
        ("failed", name_2),
        ("running", None),
        ("pending", None),
        ("succeeded", None),
        ("failed", None),
    ]
    jobs = [
        create_job_response(f"job-id-{index}", status, name=job_name)
        for index, (status, job_name) in enumerate(job_specs, start=1)
    ]

    async def handler(request: web.Request) -> web.Response:
        requested = request.query.getall("status")
        assert requested
        matching = [job for job in jobs if job["status"] in requested]
        return web.json_response({"jobs": matching})

    app = web.Application()
    app.router.add_get("/jobs", handler)
    server = await aiohttp_server(app)

    statuses = {JobStatus.FAILED, JobStatus.SUCCEEDED}
    async with make_client(server.make_url("/")) as client:
        listed = await client.jobs.list(statuses=statuses)

    config = client._config
    parser = _ImageNameParser(
        config.auth_token.username, config.cluster_config.registry_url
    )
    all_descriptions = [_job_description_from_api(job, parser) for job in jobs]
    expected = [
        description
        for description in all_descriptions
        if description.status in statuses
    ]
    assert listed == expected


async def test_list_incorrect_image(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """Listing jobs must tolerate malformed image names in the server reply.

    The stub ``GET /jobs`` endpoint ignores the query and returns every job
    below. The jobs marked ``failed`` carry image strings that are expected to
    be reported as ``INVALID_IMAGE_NAME``; every other job is expected to keep
    a regular image name.
    """
    jobs = [
        create_job_response("job-id-1", "running"),
        create_job_response("job-id-2", "pending", image="some.com/path/:tag"),
        create_job_response(
            "job-id-3", "failed", image="registry-dev.neu.ro/path/:tag"
        ),
        create_job_response("job-id-4", "failed", image=""),
        create_job_response("job-id-5", "failed", image=":"),
    ]

    async def handler(request: web.Request) -> web.Response:
        JSON = {"jobs": jobs}
        return web.json_response(JSON)

    app = web.Application()
    app.router.add_get("/jobs", handler)
    srv = await aiohttp_server(app)

    statuses = {JobStatus.FAILED, JobStatus.SUCCEEDED}
    async with make_client(srv.make_url("/")) as client:
        ret = await client.jobs.list(statuses=statuses)

    # Guard against a vacuous pass: the per-job checks below verify nothing
    # if the client returned an empty or truncated list.
    assert len(ret) == len(jobs)
    for job in ret:
        if job.status == JobStatus.FAILED:
            assert job.container.image.name == INVALID_IMAGE_NAME
        else:
            assert job.container.image.name != INVALID_IMAGE_NAME


class TestVolumeParsing:
    """Checks ``client.parse.volume`` against malformed and well-formed specs."""

    @pytest.mark.parametrize(
        "volume_param",
        [
            "dir",
            "storage://dir",
            "storage://dir:/var/www:rw:ro",
        ],
    )
    async def test_incorrect_params_count(
        self, volume_param: str, make_client: _MakeClient
    ) -> None:
        """Specs with a wrong number of parts are rejected with ``ValueError``."""
        async with make_client("https://example.com") as api:
            with pytest.raises(ValueError, match="Invalid volume specification"):
                api.parse.volume(volume_param)

    @pytest.mark.parametrize(
        "volume_param",
        [
            "storage://dir:/var/www:write",
            "storage://dir:/var/www:",
        ],
    )
    async def test_incorrect_mode(
        self, volume_param: str, make_client: _MakeClient
    ) -> None:
        """Specs whose mode suffix is unknown or empty raise ``ValueError``."""
        async with make_client("https://example.com") as api:
            with pytest.raises(ValueError, match="Wrong ReadWrite/ReadOnly mode spec"):
                api.parse.volume(volume_param)

    @pytest.mark.parametrize(
        ("volume_param", "volume"),
        [
            # No mode suffix: expected to parse as read-write.
            (
                "storage://user/dir:/var/www",
                Volume(
                    read_only=False,
                    container_path="/var/www",
                    storage_uri=URL("storage://user/dir"),
                ),
            ),
            (
                "storage://user/dir:/var/www:rw",
                Volume(
                    read_only=False,
                    container_path="/var/www",
                    storage_uri=URL("storage://user/dir"),
                ),
            ),
            (
                "storage://user:/var/www:ro",
                Volume(
                    read_only=True,
                    container_path="/var/www",
                    storage_uri=URL("storage://user"),
                ),
            ),
            # The remaining specs omit the user; the expected URIs contain
            # ``user`` (presumably the user configured by ``make_client``).
            (
                "storage://~/:/var/www:ro",
                Volume(
                    read_only=True,
                    container_path="/var/www",
                    storage_uri=URL("storage://user"),
                ),
            ),
            (
                "storage:dir:/var/www:ro",
                Volume(
                    read_only=True,
                    container_path="/var/www",
                    storage_uri=URL("storage://user/dir"),
                ),
            ),
            (
                "storage::/var/www:ro",
                Volume(
                    read_only=True,
                    container_path="/var/www",
                    storage_uri=URL("storage://user"),
                ),
            ),
        ],
    )
    async def test_positive(
        self, volume_param: str, volume: Volume, make_client: _MakeClient
    ) -> None:
        """Well-formed specs parse into the expected ``Volume``."""
        async with make_client("https://example.com") as api:
            assert api.parse.volume(volume_param) == volume


async def test_list_filter_by_name_and_statuses(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """List jobs filtered by both name and statuses.

    The stub ``GET /jobs`` endpoint requires a non-empty ``name`` and at least
    one ``status`` query value, and returns the jobs matching both. Filtering
    the fixture list by the first name and pending/succeeded statuses must
    yield its first two jobs.
    """
    name_1 = "job-name-1"
    name_2 = "job-name-2"
    # (status, name) per job; ``None`` means the job payload has no name.
    job_specs = [
        ("pending", name_1),
        ("succeeded", name_1),
        ("failed", name_1),
        ("running", name_2),
        ("succeeded", name_2),
        ("failed", name_2),
        ("running", None),
        ("pending", None),
        ("succeeded", None),
        ("failed", None),
    ]
    jobs = [
        create_job_response(f"job-id-{index}", status, name=job_name)
        for index, (status, job_name) in enumerate(job_specs, start=1)
    ]

    async def handler(request: web.Request) -> web.Response:
        requested_statuses = request.query.getall("status")
        assert requested_statuses
        requested_name = request.query.get("name")
        assert requested_name
        matching = []
        for job in jobs:
            if job["status"] not in requested_statuses:
                continue
            if job.get("name") == requested_name:
                matching.append(job)
        return web.json_response({"jobs": matching})

    app = web.Application()
    app.router.add_get("/jobs", handler)
    server = await aiohttp_server(app)

    statuses = {JobStatus.PENDING, JobStatus.SUCCEEDED}
    async with make_client(server.make_url("/")) as client:
        listed = await client.jobs.list(statuses=statuses, name=name_1)

    config = client._config
    parser = _ImageNameParser(
        config.auth_token.username, config.cluster_config.registry_url
    )
    all_descriptions = [_job_description_from_api(job, parser) for job in jobs]
    # Only the first two fixture jobs match both the name and the statuses.
    assert listed == all_descriptions[:2]


async def test_list_filter_by_name_and_statuses_and_owners(
    aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
    """List jobs filtered by name, statuses and owners at once.

    The stub ``GET /jobs`` endpoint filters the fixture jobs by the ``status``,
    ``name`` and ``owner`` query values. Asking for running jobs with the first
    name owned by either owner must yield the first two fixture jobs.
    """
    name_1 = "job-name-1"
    name_2 = "job-name-2"
    owner_1 = "owner-1"
    owner_2 = "owner-2"
    # Every status/name/owner combination; owner varies fastest, status slowest.
    combinations = [
        (status, job_name, job_owner)
        for status in ("running", "succeeded")
        for job_name in (name_1, name_2)
        for job_owner in (owner_1, owner_2)
    ]
    jobs = [
        create_job_response(
            f"job-id-{index}", status, name=job_name, owner=job_owner
        )
        for index, (status, job_name, job_owner) in enumerate(combinations, start=1)
    ]

    async def handler(request: web.Request) -> web.Response:
        requested_statuses = request.query.getall("status")
        requested_name = request.query.get("name")
        requested_owners = request.query.getall("owner")
        matching = []
        for job in jobs:
            if job["status"] not in requested_statuses:
                continue
            if job.get("name") != requested_name:
                continue
            if job.get("owner") in requested_owners:
                matching.append(job)
        return web.json_response({"jobs": matching})

    app = web.Application()
    app.router.add_get("/jobs", handler)
    server = await aiohttp_server(app)

    async with make_client(server.make_url("/")) as client:
        listed = await client.jobs.list(
            statuses={JobStatus.RUNNING}, name=name_1, owners={owner_1, owner_2}
        )

    config = client._config
    parser = _ImageNameParser(
        config.auth_token.username, config.cluster_config.registry_url
    )
    all_descriptions = [_job_description_from_api(job, parser) for job in jobs]
    # Only the first two fixture jobs are running and named ``name_1``.
    assert listed == all_descriptions[:2]

Unutmayın her geri bildirim bizi daha ileriye taşıyacaktır!