Newly added content

# Copyright 2019-present Lenovo
# Confidential and Proprietary

import json

from py.path import local
from pytest import mark
from rest_framework.status import HTTP_200_OK


@mark.django_db
def test_template_job_for_kube(
        client, mocker, settings, user
):
    settings.LICO_OPERATOR = 'kube'
    from antilles.common.utils import encode_base64url
    uuid = 'c12c00ec9d4c11e9ac32000c29fde54a'
    ingress_ctrl_addr = mocker.patch(
        "antilles.kubernetes.utils.get_user_context",
        return_value={"ingress_ctrl_addr": "http://127.0.0.1:8000"}
    )
    jobfile_mock = mocker.patch(
        'antilles.scheduler_ext.models.Template.generate_jobfile',
        return_value={
            "job_name": "pv2_test1",
            "jobfilecontent": "123",
            "job_log_file": "",
            "job_workspace": str(local(user.workspace).join('test')),
            "internal_name": uuid
        }
    )
    mock_create_job = mocker.patch(
        'antilles.scheduler.job_proxy.JobManagerProxy.request_create_job',
        return_value={'id': 102, "ports": {"12": 50000}}
    )
    import base64
    password = '123456'
    mocker.patch(
        'antilles.devtools.utils.passwd',
        return_value=password
    )

    data = {
        "job_queue": "compute",
        "job_workspace": 'test',
        "language": "py2",
        "password": base64.b64encode("123456"),
        "job_name": "pv2_test1",
        "image_path": "/home/hpcadmin/new_image_628/jupyter-cpu.image",
        "nodes": 1,
        "conda_env": "test"
    }
    response = client.post(
        '/submit/',
        data=json.dumps(data),
        content_type="application/json",
    )
    assert response.status_code == HTTP_200_OK
    ingress_ctrl_addr.assert_called_once()
    jobfile_mock.assert_called_once_with(
        user,
        dict(
            data,
            password=password,
            job_workspace=str(local(user.workspace).join('test')),
            conda_env=str(local(user.workspace).join('test')),
            host='lico.lenovo.com',
            base_url=encode_base64url('127.0.0.1:8000'),
            container_port=6006,
            command='/opt/start'
        )
    )

    mock_create_job.assert_called_once_with(
        workspace=str(local(user.workspace).join('test')),
        jobname=data['job_name'],
        jobfilecontent='123',
        type="jupyter",
        json_body=mocker.ANY,
        output_file='',
        submitter=user
    )


@mark.django_db
def test_jupyter_list_for_kube(
        client, settings, user, jupyter, mocker
):
    settings.LICO_OPERATOR = 'kube'
    ingress_ctrl_addr = mocker.patch(
        "antilles.kubernetes.utils.get_user_context",
        return_value={"ingress_ctrl_addr": "http://127.0.0.1:8000"}
    )
    from antilles.devtools.models import Devtools
    jupyter1 = Devtools.objects.create(
        user=user,
        password='123456',
        port=80001,
        language='pv2',
        uuid='88e20b0e9be011e994be000c29fde54a',
        job=2
    )
    jupyter2 = Devtools.objects.create(
        user=user,
        password='123456',
        port=80003,
        language='pv2',
        uuid='88e20b0e9be011e994be000c29fde54a',
        job=3
    )
    mock_job_query = mocker.patch(
        'antilles.scheduler.job_proxy.JobObjectsQuery.filter_with_commit',
        return_value=[
            {
                "status": "complete",
                "exechosts": "localhost*1",
                "cpuscount": 1,
                "jobname": "test",
                "gpuscount": 2,
                "starttime": 1561611663,
                "endtime": 1561611665,
                "id": jupyter.job,
                "json_body": "test",
                "submiter": "test",
                "priority": "test"
            },
            {
                "status": "running",
                "exechosts": "localhost*1",
                "cpuscount": 1,
                "jobname": "test",
                "gpuscount": 2,
                "starttime": 1561611663,
                "endtime": 1561611665,
                "id": jupyter1.job,
                "json_body": "test",
                "submiter": "test",
                "priority": "test"
            },
            {
                "status": "running",
                "exechosts": "localhost*1",
                "cpuscount": 1,
                "jobname": "test",
                "gpuscount": 2,
                "starttime": 1561611663,
                "endtime": 1561611665,
                "id": jupyter2.job,
                "json_body": "test",
                "submiter": "test",
                "priority": "test"
            }
        ]
    )
    response = client.get(
        '/'
    )
    import operator
    expect_list = sorted(['uuid', 'endtime', 'create_time', 'id', 'status',
                          'exechosts', 'gpuscount', 'jobname', 'language',
                          'port', 'jupyter_id', 'job', 'cpuscount'])
    assert response.status_code == HTTP_200_OK
    assert operator.eq(sorted(list(response.data['data'][0].keys())),
                       expect_list)
    assert operator.eq(sorted(list(response.data['data'][1].keys())),
                       expect_list)
    assert operator.eq(sorted(list(response.data['data'][2].keys())),
                       expect_list)
    assert response.data['data'][0]['status'] == "running"
    assert response.data['data'][1]['status'] == "running"
    assert response.data['data'][0]['id'] in [jupyter1.job, jupyter2.job]
    assert response.data['data'][1]['id'] in [jupyter1.job, jupyter2.job]
    assert response.data['data'][2]['status'] == "complete"
    assert response.data['data'][2]['id'] == jupyter.job
    mock_job_query.assert_called_once()
    ingress_ctrl_addr.assert_called()


@mark.django_db
def test_jupyter_rerun_for_kube(
        client, settings, user, mocker, jupyter
):
    settings.LICO_OPERATOR = 'kube'
    uuid = 'c12c00ec9d4c11e9ac32000c29fde54a'
    jobfile_mock = mocker.patch(
        'antilles.scheduler_ext.models.Template.generate_jobfile',
        return_value={
            "internal_name": uuid,
            "job_workspace": str(local(user.workspace).join('test')),
            "job_name": "pv2_test1",
            "jobfilecontent": "123",
            "job_log_file": "123"
        }
    )
    mock_create_job = mocker.patch(
        'antilles.scheduler.job_proxy.JobManagerProxy.request_create_job',
        return_value={'id': 100, "ports": {"13": 50000}}
    )
    mock_job_query = mocker.patch(
        'antilles.scheduler.job_proxy.JobObjectsQuery.get_with_commit',
        return_value={
            "json_body": json.dumps(
                {
                    "parameters": {
                        "job_queue": "compute",
                        "job_workspace": str(local(user.workspace).
                                             join('test')),
                        "py_version": "py2",
                        "password": "123456",
                        "job_name": "pv2_test1",
                        "image_path": "/home/hpcadmin/jupyter-cpu.image",
                        "nodes": 1,
                        "conda_env": str(local(user.workspace).join('test')),
                        "host": "lico.lenovo.com",
                        "base_url": '127.0.0.1:8000',
                        "container_port": 6006,
                        "command": "/opt/start"
                    }
                }
            )
        }
    )
    response = client.put(
        '/1/'
    )
    from antilles.devtools.models import Devtools
    jupyter1 = Devtools.objects.get(id=1)
    jobfile_mock.assert_called_once()
    mock_create_job.assert_called_once()
    mock_job_query.assert_called_once()
    assert response.status_code == HTTP_200_OK
    assert jupyter1.job == 100
    import operator
    assert jupyter1.uuid == operator.concat('u', uuid)
    jobfile_mock.assert_called_once_with(
        user,
        {
            "job_queue": "compute",
            "job_workspace": str(local(user.workspace).join('test')),
            "py_version": "py2",
            "password": "123456",
            "job_name": "pv2_test1",
            "image_path": "/home/hpcadmin/jupyter-cpu.image",
            "nodes": 1,
            "conda_env": str(local(user.workspace).join('test')),
            "host": "lico.lenovo.com",
            "base_url": '127.0.0.1:8000',
            "container_port": 6006,
            "command": "/opt/start"
        }
    )
    mock_create_job.assert_called_once_with(
        workspace=str(local(user.workspace).join('test')),
        jobname="pv2_test1",
        jobfilecontent='123',
        type="jupyter",
        json_body=mocker.ANY,
        output_file='123',
        submitter=user
    )


@mark.django_db
def test_jupyter_delete(
    client, settings, user, mocker, jupyter
):
    settings.LICO_OPERATOR = 'kube'
    mock_job_query = mocker.patch(
        'antilles.scheduler.job_proxy.JobObjectsQuery.get_with_commit',
        return_value={
            "json_body": json.dumps(
                {
                    "parameters": {
                        "job_queue": "compute",
                        "job_workspace": 'test',
                        "py_version": "py2",
                        "password": "123456",
                        "job_name": "pv2_test1",
                        "image_path": "/home/hpcadmin/jupyter-cpu.image",
                        "nodes": 1,
                        "conda_env": 'test'
                    }
                }
            )
        }
    )
    operator = mocker.patch(
        'antilles.devtools.devtools.get_file_operator'
    ).return_value
    operator.path_exists.return_value = True
    operator.path_isdir.return_value = True

    response = client.delete(
        '/1/',
        data=json.dumps({"delete_completely": True}),
        content_type="application/json"
    )
    from antilles.devtools.models import Devtools
    jupyter1 = Devtools.objects.filter(id=1)
    mock_job_query.assert_called_once()
    operator.mkdir.assert_called_once_with(
        str(local(user.workspace).join('test'))
    )
    operator.rmtree.assert_called_with(
        str(local(user.workspace).join('test'))
    )
    assert response.status_code == HTTP_200_OK
    assert jupyter1.exists() is False
    assert operator.rmtree.call_count == 1
    assert operator.mkdir.call_count == 1
    assert operator.chown.call_count == 1
k8s_ut
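
The tests above import encode_base64url from antilles.common.utils, which is not included in this post. A minimal stand-in sketch, assuming the helper simply URL-safe base64-encodes the value and strips the '=' padding (the real LiCO implementation may differ):

# Hypothetical stand-in for antilles.common.utils.encode_base64url, shown only
# so the test assertions are easier to read; not the actual LiCO helper.
import base64


def encode_base64url(value):
    # URL-safe base64 without trailing '=' padding, e.g. "127.0.0.1:8000"
    # becomes a token that is safe to embed in a URL path.
    if not isinstance(value, bytes):
        value = value.encode('utf-8')
    return base64.urlsafe_b64encode(value).rstrip(b'=').decode('ascii')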
# Copyright 2019-present Lenovo
# Confidential and Proprietary

import json
from os import path

from django.db import transaction
from rest_framework.response import Response
from rest_framework.views import APIView

from antilles.common.operator import get_operator
from antilles.devtools.models import Devtools
from antilles.scheduler.job_proxy import JobObjectsQuery


class JupyterSubmit(APIView):
    def post(self, request):
        from copy import deepcopy
        import operator
        base_parameters = deepcopy(request.data)

        if operator.truth(base_parameters['job_workspace']):
            base_parameters['job_workspace'] = path.join(
                request.user.workspace,
                request.data['job_workspace']
            )

        if operator.truth(base_parameters.get('conda_env', '')):
            base_parameters['conda_env'] = path.join(
                request.user.workspace,
                request.data['conda_env']
            )

        from antilles.devtools.utils import passwd
        import base64
        base_parameters['password'] = passwd(
            base64.b64decode(base_parameters['password'])
        )
        jupyter_operator = get_operator().jupyter_operator(request.user)
        ret = jupyter_operator.create_jupyter_job(base_parameters)
        return Response(ret)


class JupyterList(APIView):
    def get(self, request):
        import calendar
        from pandas import DataFrame, merge

        jupyter_frame = DataFrame(
            {
                "language": jupyter.language,
                "create_time":
                    calendar.timegm(jupyter.create_time.timetuple()),
                "port": jupyter.port,
                "id": jupyter.job,
                "job": jupyter.job,
                "jupyter_id": jupyter.id,
                "uuid": jupyter.uuid
            }
            for jupyter in Devtools.objects.filter(user=request.user).iterator()
        )
        if len(jupyter_frame) == 0:
            return Response({"data": []})

        job_frame = DataFrame(JobObjectsQuery().filter_with_commit(
            id__in=jupyter_frame['job'].tolist(),
            submiter=request.user.username
        ))[["id", "status", "gpuscount", "endtime", "exechosts", "jobname",
            "cpuscount"]]

        res = merge(jupyter_frame, job_frame, on='id')
        jupyter_operator = get_operator().jupyter_operator(request.user)
        res['exechosts'] = res.apply(
            jupyter_operator.queue_host, axis=1
        )
        res_running = res[res['status'].isin(['running'])]
        res.drop(index=list(res_running.index), inplace=True)
        return Response(
            {"data": res_running.append(res).to_dict(orient='records')}
        )


class JupyterDetail(APIView):
    @staticmethod
    def _image_jupyter(user, pk):
        return Devtools.objects.filter(user=user).get(job=pk)

    def put(self, request, pk):
        with transaction.atomic():
            jupyter = self._image_jupyter(request.user, pk)
            job_info = JobObjectsQuery().get_with_commit(id=pk)
            parameters = json.loads(job_info['json_body'])['parameters']
            jupyter_operator = get_operator().jupyter_operator(request.user)
            ret = jupyter_operator.rerun_jupyter(parameters, jupyter)
        return Response(ret)

    def delete(self, request, pk):
        confirm = request.data.get('delete_completely', False)

        jupyter = self._image_jupyter(request.user, pk)
        if confirm:
            job_info = JobObjectsQuery().get_with_commit(id=pk)
            parameters = json.loads(job_info['json_body'])['parameters']
            jupyter_operator = get_operator().jupyter_operator(request.user)
            jupyter_operator.delete_jupyter(
                parameters, jupyter
            )
        return Response()
views.py
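
JupyterList.get merges the local Devtools rows with the job records returned by the scheduler and then moves running notebooks to the front of the list. A standalone sketch of that pandas idiom with made-up data (it keeps the same DataFrame.append call as the view, which pandas 2.0 replaces with concat):

# Illustration only: the merge-then-reorder pattern used in JupyterList.get.
from pandas import DataFrame, merge

jupyter_frame = DataFrame([
    {"id": 1, "port": 50000, "language": "pv2"},
    {"id": 2, "port": 50001, "language": "pv3"},
])
job_frame = DataFrame([
    {"id": 1, "status": "complete"},
    {"id": 2, "status": "running"},
])

res = merge(jupyter_frame, job_frame, on='id')
res_running = res[res['status'].isin(['running'])]
res.drop(index=list(res_running.index), inplace=True)

# Running entries come first, everything else keeps its original order.
print(res_running.append(res).to_dict(orient='records'))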
# Copyright 2019-present Lenovo
# Confidential and Proprietary

import json
import operator
from os import path

from six import raise_from

from antilles.common.operator import get_file_operator
from antilles.devtools.models import Devtools
from antilles.scheduler.job_proxy import JobManagerProxy
from antilles.scheduler_ext.exceptions import JobTemplateNotExist
from antilles.scheduler_ext.models import Template


class BaseDevtools(object):
    @staticmethod
    def request_create_job(user, base_parameters, processed_param):
        ret = JobManagerProxy.request_create_job(
            workspace=processed_param["job_workspace"],
            jobname=processed_param['job_name'],
            jobfilecontent=processed_param['jobfilecontent'],
            type="jupyter",
            json_body=json.dumps(dict(parameters=base_parameters,
                                      template_id="jupyter")),
            output_file=processed_param.get('job_log_file', ''),
            submitter=user
        )
        return ret

    def submit_all_job(self, user, base_parameter):
        try:
            template = Template.objects.filter(enable=True).get(
                code='jupyter'
            )
            processed_param = template.generate_jobfile(
                user,
                base_parameter
            )

        except Template.DoesNotExist as e:
            raise_from(JobTemplateNotExist, e)

        ret = self.request_create_job(
            user,
            base_parameter,
            processed_param
        )
        return processed_param, ret


class HostDevtools(BaseDevtools):
    def __init__(self, user):
        self.user = user

    def create_jupyter_job(self, base_parameters):
        import uuid
        env_uuid = uuid.uuid1().get_hex()
        base_parameters.update(env_uuid=env_uuid)
        processed_param, ret = self.submit_all_job(self.user, base_parameters)

        Devtools.objects.create(
            user=self.user,
            password=base_parameters['password'],
            port=list(ret['ports'].values())[0],
            language=base_parameters['language'],
            job=ret['id'],
            uuid=env_uuid
        )
        return ret

    def rerun_jupyter(self, base_parameters, jupyter):
        processed_param, ret = self.submit_all_job(self.user, base_parameters)
        ret.update(job=ret['id'], port=list(ret['ports'].values())[0])
        for key, value in ret.items():
            if key != 'id':
                setattr(jupyter, key, value)
        jupyter.save()
        return ret

    @staticmethod
    def queue_host(item):
        from antilles.common.utils import encode_base64url
        if item['status'] == "running":
            import socket
            return encode_base64url(
                "{}:{}".format(
                    socket.gethostbyname(item['exechosts'].split("*")[0]),
                    item['port']
                )
            )
        else:
            return ''

    def delete_jupyter(self, parameters, jupyter):
        file_operator = get_file_operator(self.user)
        if operator.truth(parameters.get('conda_env', '')):
            conda_env = path.join(
                self.user.workspace,
                parameters['conda_env']
            )
            if file_operator.path_isdir(conda_env):
                file_operator.rmtree(conda_env)
                file_operator.mkdir(conda_env)
                file_operator.chown(
                    conda_env,
                    self.user.uid,
                    self.user.gid
                )

        run_dir = path.join(
            self.user.workspace,
            parameters['job_workspace'],
            '.' + jupyter.uuid + '_run'
        )
        if file_operator.path_isdir(run_dir):
            file_operator.rmtree(run_dir)
        jupyter.delete()


class KubeDevtools(BaseDevtools):
    def __init__(self, user):
        self.user = user

    def create_jupyter_job(self, base_parameters):
        base_url, port = self.get_ingress_url(self.user)
        base_parameters.update(
            host='lico.lenovo.com',
            base_url=base_url,
            container_port=6006,
            command='/opt/start'
        )
        processed_param, ret = self.submit_all_job(self.user, base_parameters)

        Devtools.objects.create(
            user=self.user,
            password=base_parameters['password'],
            port=int(port),
            language=base_parameters['language'],
            job=ret['id'],
            uuid=operator.concat('u', processed_param['internal_name'])
        )
        return ret

    @staticmethod
    def get_ingress_url(user):
        from antilles.common.utils import encode_base64url
        from six.moves.urllib_parse import urlparse
        import urllib
        from antilles.kubernetes.utils import get_user_context
        ingress_addr = get_user_context(user)['ingress_ctrl_addr']
        netloc = urlparse(ingress_addr).netloc
        host, port = urllib.splitport(netloc)
        port = 80 if port is None else port

        return encode_base64url(netloc), port

    def queue_host(self, _):
        url, _ = self.get_ingress_url(self.user)
        return url

    def rerun_jupyter(self, base_parameters, jupyter):
        processed_param, ret = self.submit_all_job(self.user, base_parameters)
        jupyter.job = ret['id']
        jupyter.uuid = operator.concat('u', processed_param['internal_name'])
        jupyter.save()
        return ret

    def delete_jupyter(self, parameters, jupyter):
        file_operator = get_file_operator(self.user)
        if operator.truth(parameters.get('conda_env', '')):
            conda_env = path.join(
                self.user.workspace,
                parameters['conda_env']
            )
            if file_operator.path_isdir(conda_env):
                file_operator.rmtree(conda_env)
                file_operator.mkdir(conda_env)
                file_operator.chown(
                    conda_env,
                    self.user.uid,
                    self.user.gid
                )
        jupyter.delete()
devtools.py
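
KubeDevtools.get_ingress_url relies on urllib.splitport, which only exists on Python 2 (the module is Python 2 code elsewhere too, e.g. uuid.uuid1().get_hex()). A small sketch of the same host/port handling written against urlparse so it runs on either interpreter, assuming the ingress address is always a full URL; the encode_base64url step is left out here:

# Version-agnostic sketch of the netloc/port split in KubeDevtools.get_ingress_url.
from six.moves.urllib_parse import urlparse


def split_ingress_addr(ingress_addr):
    # "http://127.0.0.1:8000" -> ('127.0.0.1:8000', 8000); default to port 80.
    parsed = urlparse(ingress_addr)
    port = parsed.port if parsed.port is not None else 80
    return parsed.netloc, port


print(split_ingress_addr("http://127.0.0.1:8000"))  # ('127.0.0.1:8000', 8000)
print(split_ingress_addr("http://ingress.local"))   # ('ingress.local', 80)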


Reposted from www.cnblogs.com/crazymagic/p/11257326.html