#!/usr/bin/python3 -u
#
# SPDX-FileCopyrightText: 2016-2026 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
#

import argparse
import grp
import os
import shutil
import sys
import traceback

from ldap import LDAPError
from ldap.dn import str2dn
from ldap.filter import escape_filter_chars, filter_format

import univention.admin.uldap
import univention.config_registry
import univention.debug as ud
import univention.uldap
from ucsschool.lib.models.school import School
from ucsschool.lib.schoolldap import SchoolSearchBase
from univention.admin.uexceptions import noObject
from univention.lib.umc import Client, ConnectionError, HTTPError

# Log file used by main() for univention.debug output and for the redirected stdout/stderr.
LOGFILE = "/var/log/univention/ucs-school-exam-cleanup.log"


class ExamCleanupHelper(object):
    """Stop running exams and purge leftover exam and computer room state.

    The individual steps are executed in order by :py:meth:`cleanup`. Each step
    can be disabled through a ``skip_*`` attribute of the parsed command line
    options, and changes are skipped when ``options.dryrun`` is set. Progress
    and errors are reported through ``univention.debug``.
    """

    # Cached UDM "groups/group" module; initialised lazily by get_udm_group().
    groupmod = None

    # Values of UCR "server/role" on which exam users/computers are removed.
    _DIRECTORY_NODE_ROLES = ("domaincontroller_master", "domaincontroller_backup")

    # UCR variables that are removed when the key matches exactly.
    _UCR_EXACT_KEYS = frozenset(
        (
            "samba/othershares/hosts/deny",
            "samba/othershares/hosts/none",
            "samba/printmode/hosts/none",
            "samba/printmode/hosts/all",
            "cups/printmode/hosts/none",
            "cups/printmode/hosts/all",
        )
    )

    # UCR variables that are removed when the key starts with one of these.
    _UCR_KEY_PREFIXES = (
        "samba/sharemode/room/",
        "samba/printmode/room/",
        "proxy/filter/setting-user/",
    )

    def __init__(self, options):
        """Load UCR and open the UMC and LDAP connections.

        :param options: parsed command line options; the ``dryrun`` and
            ``skip_*`` attributes are read by the cleanup steps.
        """
        self.options = options
        self.ucr = univention.config_registry.ConfigRegistry()
        self.ucr.load()
        # The hostname has to be known before the UMC connection is opened.
        self.hostname = f"{self.ucr['hostname']}.{self.ucr['domainname']}"
        self.umcp = self.get_UMCP_connection()
        self.lo, self.po = self.get_LDAP_connection()
        self.exam_prefix = self.ucr.get("ucsschool/ldap/default/userprefix/exam", "exam-")
        self.DIR_ROOMS = "/var/cache/ucs-school-umc-computerroom"
        self.DIR_EXAMS = self.ucr.get("ucsschool/exam/cache", "/var/lib/ucs-school-umc-schoolexam")

    def get_LDAP_connection(self, admin=False):
        """Open an LDAP connection and return it together with its position.

        :param admin: use ``getAdminConnection()`` instead of the machine
            connection (``getMachineConnection(ldap_master=False)``).
        :returns: the value returned by the respective ``univention.admin.uldap``
            function; callers unpack it as ``(lo, position)``.

        Terminates the process with exit status 1 if the connection fails.
        """
        ud.debug(ud.MAIN, ud.INFO, "Opening LDAP connection")
        try:
            if admin:
                return univention.admin.uldap.getAdminConnection()
            return univention.admin.uldap.getMachineConnection(ldap_master=False)
        except (OSError, LDAPError) as exc:
            ud.debug(ud.MAIN, ud.ERROR, "Could not connect to LDAP: %s" % exc)
            sys.exit(1)

    def get_UMCP_connection(self):
        """Return a UMC client for ``self.hostname`` authenticated with the machine account.

        Terminates the process with exit status 1 if connecting fails.
        """
        ud.debug(ud.MAIN, ud.INFO, f"Opening UMCP connection to {self.hostname} with machine account")
        try:
            client = Client(self.hostname)
            client.authenticate_with_machine_account()
        except (ConnectionError, HTTPError) as exc:
            ud.debug(ud.MAIN, ud.ERROR, f"Could not connect to UMC on {self.hostname}: {exc}")
            sys.exit(1)
        return client

    def running_exam_iter(self):
        """Yield every room that currently reports an exam.

        For each school OU found in LDAP the room list is requested via the UMC
        command ``computerroom/rooms``; rooms with a truthy ``exam`` entry are
        yielded. OUs whose room list cannot be fetched are logged and skipped.
        """
        ou_list = self.lo.search(filter="(objectClass=ucsschoolOrganizationalUnit)")
        for _ou_dn, ou_attrs in ou_list:
            ou_name = ou_attrs["ou"][0].decode("UTF-8")
            try:
                room_list = self.umcp.umc_command("computerroom/rooms", {"school": ou_name}).result
            except (ConnectionError, HTTPError) as exc:
                ud.debug(ud.MAIN, ud.ERROR, f"Cannot get room list for OU {ou_name!r}:\n{exc}")
                continue
            for room in room_list:
                ud.debug(ud.MAIN, ud.INFO, f"{ou_name}: {room!r}")
                if room.get("exam"):
                    ud.debug(ud.MAIN, ud.INFO, "Running exam found")
                    yield room

    def stop_running_exams(self):
        """Finish all running exams via the UMC command ``schoolexam/exam/finish``.

        Does nothing if ``options.skip_stop_running_exams`` is set; in dry-run
        mode the exams are only logged. Failures are logged and do not stop the
        processing of the remaining exams.
        """
        if self.options.skip_stop_running_exams:
            ud.debug(ud.MAIN, ud.INFO, "Skipping shutdown of running exams as requested")
            return

        for room in self.running_exam_iter():
            exam = room.get("exam")
            room_id = room.get("id")
            ud.debug(
                ud.MAIN,
                ud.PROCESS,
                f"Stopping exam {exam!r} in room {room.get('label')!r} ({room_id!r})",
            )
            if self.options.dryrun:
                ud.debug(ud.MAIN, ud.PROCESS, "dry-run: skipping stop of exam")
                continue

            try:
                result = self.umcp.umc_command(
                    "schoolexam/exam/finish", {"exam": exam, "room": room_id}
                ).result
            except (ConnectionError, HTTPError) as exc:
                ud.debug(
                    ud.MAIN,
                    ud.ERROR,
                    f"Cannot stop exam {exam!r} in room {room_id!r}:\n{exc}",
                )
            else:
                ud.debug(ud.MAIN, ud.INFO, f"result of schoolexam/exam/finish: {result!r}")

    def restore_original_user(self, dn):
        """Re-enable the original account that belongs to an exam user.

        The original uid is derived from the first RDN value of ``dn`` by
        removing the first occurrence of the exam prefix. For the matching user
        the leading ``$`` is stripped from every ``sambaUserWorkstations``
        entry (empty entries are dropped) and ``disabled`` is set to ``False``.

        :param dn: DN of the exam user.
        """
        # univention.udm is not imported at module level in this file; import it
        # explicitly instead of relying on another module having loaded it.
        from univention.udm import UDM

        user_uid = str2dn(dn)[0][0][1].replace(self.exam_prefix, "", 1)
        mod_user = UDM(self.lo, 1).get("users/user")
        search_result = list(mod_user.search(filter_format("uid=%s", [user_uid])))

        if not search_result:
            # No exception is active here, so there is no traceback to log.
            ud.debug(ud.MAIN, ud.ERROR, f"Exam student {user_uid!r} not found.")
            return
        if len(search_result) > 1:
            ud.debug(
                ud.MAIN,
                ud.ERROR,
                f"Found {len(search_result)} users for exam student {user_uid!r}, not restoring access.",
            )
            return

        orig_udm = search_result[0]
        try:
            stripped = [ws.lstrip("$") for ws in orig_udm.props.sambaUserWorkstations]
            orig_udm.props.sambaUserWorkstations = [ws for ws in stripped if ws]
            orig_udm.props.disabled = False
            orig_udm.save()
        except noObject:
            ud.debug(ud.MAIN, ud.ERROR, f"Exam student {user_uid!r} not found.")
        else:
            ud.debug(ud.MAIN, ud.PROCESS, f"Original user access has been restored for {orig_udm!r}.")

    def remove_exam_user(self):
        """Remove all exam users and restore access for their original accounts.

        Only runs if UCR ``server/role`` is one of ``_DIRECTORY_NODE_ROLES`` and
        ``options.skip_exam_user_removal`` is not set. In dry-run mode the
        users are only logged. Any exception is logged with its traceback and
        aborts the remaining removals.
        """
        if self.ucr.get("server/role") not in self._DIRECTORY_NODE_ROLES:
            ud.debug(ud.MAIN, ud.INFO, "Skipping remove_exam_user: server/role does not match")
            return
        if self.options.skip_exam_user_removal:
            ud.debug(ud.MAIN, ud.INFO, "Skipping removal of exam user as requested")
            return

        try:
            lo, position = self.get_LDAP_connection(admin=True)

            univention.admin.modules.update()
            mod_user = univention.admin.modules.get("users/user")
            univention.admin.modules.init(lo, position, mod_user)

            exam_user_filter = "uid=%s*" % (escape_filter_chars(self.exam_prefix),)
            ou_list = self.lo.search(filter="(objectClass=ucsschoolOrganizationalUnit)")
            for ou_dn, ou_attrs in ou_list:
                ou_name = ou_attrs["ou"][0].decode("UTF-8")
                searchbase = SchoolSearchBase([ou_name], dn=ou_dn)
                try:
                    userlist = mod_user.lookup({}, lo, exam_user_filter, base=searchbase.examUsers)
                except noObject:
                    # no exam users container in this OU
                    continue

                for user in userlist:
                    ud.debug(ud.MAIN, ud.PROCESS, "Removing exam user %r" % (user.dn,))
                    if self.options.dryrun:
                        ud.debug(ud.MAIN, ud.PROCESS, "dry-run: skipping removal")
                        continue
                    self.restore_original_user(user.dn)
                    user.open()
                    user.remove()
        except Exception:
            ud.debug(
                ud.MAIN,
                ud.ERROR,
                f"Removing remaining exam user failed with traceback:\n{traceback.format_exc()}",
            )

    @classmethod
    def get_udm_group(cls, group_dn, lo, po):
        """Return the opened UDM ``groups/group`` object for ``group_dn``.

        The UDM module is looked up and initialised with ``lo``/``po`` on the
        first call and cached in ``cls.groupmod`` afterwards.

        :param group_dn: DN of the group to load.
        :param lo: LDAP connection.
        :param po: LDAP position.
        """
        if not cls.groupmod:
            univention.admin.modules.update()
            cls.groupmod = univention.admin.modules.get("groups/group")
            univention.admin.modules.init(lo, po, cls.groupmod)
        group = cls.groupmod.object(None, lo, po, group_dn)
        group.open()
        return group

    def remove_exam_computers(self):
        """Remove all hosts from the exam group of every school.

        Only runs if UCR ``server/role`` is one of ``_DIRECTORY_NODE_ROLES`` and
        ``options.skip_exam_computer_removal`` is not set. In dry-run mode the
        affected groups are only logged. Any exception is logged with its
        traceback and aborts the processing of the remaining schools.
        """
        if self.ucr.get("server/role") not in self._DIRECTORY_NODE_ROLES:
            ud.debug(ud.MAIN, ud.INFO, "Skipping remove_exam_computers: server/role does not match")
            return
        if self.options.skip_exam_computer_removal:
            ud.debug(ud.MAIN, ud.INFO, "Skipping removal of exam computers as requested")
            return

        lo, po = self.get_LDAP_connection(admin=True)
        try:
            for school in School.get_all(lo):
                exam_group_dn = School.get_search_base(school.name).examGroup
                try:
                    exam_group = self.get_udm_group(exam_group_dn, lo, po)
                except noObject:
                    ud.debug(ud.MAIN, ud.WARN, f"No exam group found in school {school.name!r}.")
                    continue
                if not exam_group["hosts"]:
                    continue
                ud.debug(
                    ud.MAIN,
                    ud.PROCESS,
                    f"Removing computers from exam group {exam_group['name']!r}: "
                    f"{exam_group['hosts']!r}",
                )
                if self.options.dryrun:
                    # --dry-run must not modify the group in LDAP.
                    ud.debug(ud.MAIN, ud.PROCESS, "dry-run: leaving exam group untouched")
                    continue
                exam_group["hosts"] = []
                exam_group.modify()
        except Exception:
            ud.debug(
                ud.MAIN,
                ud.ERROR,
                f"Removing remaining exam computers failed with traceback:\n{traceback.format_exc()}",
            )

    @classmethod
    def _is_exam_ucr_key(cls, key):
        """Return True if the UCR variable ``key`` is removed by remove_ucr_debris()."""
        if key in cls._UCR_EXACT_KEYS or key.startswith(cls._UCR_KEY_PREFIXES):
            return True
        if key.startswith("proxy/filter/room/") and key.endswith(("/ip", "/rule")):
            return True
        return key.startswith("samba/share/") and key.endswith("/hosts/deny")

    def remove_ucr_debris(self):
        """Unset the UCR variables selected by :py:meth:`_is_exam_ucr_key`.

        Does nothing if ``options.skip_remove_ucr_debris`` is set; in dry-run
        mode the variables are only logged.
        """
        if self.options.skip_remove_ucr_debris:
            ud.debug(ud.MAIN, ud.INFO, "Skipping UCR cleanup as requested")
            return

        remove_list = [key for key in self.ucr.keys() if self._is_exam_ucr_key(key)]
        if not remove_list:
            return

        ud.debug(
            ud.MAIN,
            ud.PROCESS,
            "Removing following UCR variables:\n- " + "\n- ".join(remove_list),
        )
        if self.options.dryrun:
            ud.debug(ud.MAIN, ud.PROCESS, "dry-run: leaving UCR variables untouched")
        else:
            univention.config_registry.handler_unset(remove_list)

    def remove_exam_file_debris(self):
        """Delete every file and directory below ``self.DIR_EXAMS``.

        Does nothing if ``options.skip_remove_exam_file_debris`` is set or the
        directory does not exist; in dry-run mode the entries are only logged.
        Entries that cannot be removed are logged and skipped.
        """
        if self.options.skip_remove_exam_file_debris:
            ud.debug(ud.MAIN, ud.INFO, "Skipping cleanup of exam metadata/data files as requested")
            return

        if not os.path.isdir(self.DIR_EXAMS):
            ud.debug(
                ud.MAIN,
                ud.INFO,
                "Skipping cleanup of exam metadata/data files as {!r} does not exist.".format(
                    self.DIR_EXAMS
                ),
            )
            return

        for entry in os.listdir(self.DIR_EXAMS):
            fn = os.path.join(self.DIR_EXAMS, entry)
            ud.debug(ud.MAIN, ud.PROCESS, f"Removing {fn!r}")
            if self.options.dryrun:
                ud.debug(ud.MAIN, ud.PROCESS, "dry-run: leaving file/directory untouched")
                continue
            try:
                if os.path.isdir(fn):
                    shutil.rmtree(fn, ignore_errors=False)
                else:
                    os.remove(fn)
            except OSError as ex:
                ud.debug(ud.MAIN, ud.ERROR, f"Unable to remove {fn!r}: {ex}")

    def remove_room_debris(self):
        """Delete every file directly below ``self.DIR_ROOMS``.

        Does nothing if ``options.skip_remove_room_debris`` is set or the
        directory does not exist; in dry-run mode the entries are only logged.
        Entries that ``os.remove()`` cannot delete are logged and skipped.
        """
        if self.options.skip_remove_room_debris:
            ud.debug(ud.MAIN, ud.INFO, "Skipping cleanup of room metadata files as requested")
            return

        if not os.path.isdir(self.DIR_ROOMS):
            ud.debug(
                ud.MAIN,
                ud.INFO,
                "Skipping cleanup of room metadata files as {!r} does not exist.".format(self.DIR_ROOMS),
            )
            return

        for entry in os.listdir(self.DIR_ROOMS):
            fn = os.path.join(self.DIR_ROOMS, entry)
            ud.debug(ud.MAIN, ud.PROCESS, "Removing %r" % (fn,))
            if self.options.dryrun:
                ud.debug(ud.MAIN, ud.PROCESS, "dry-run: leaving file untouched")
                continue
            try:
                os.remove(fn)
            except OSError as ex:
                ud.debug(ud.MAIN, ud.ERROR, f"Unable to remove {fn!r}: {ex}")

    def cleanup(self):
        """Run all cleanup steps; running exams are stopped before anything is purged."""
        self.stop_running_exams()
        self.remove_exam_user()
        self.remove_exam_computers()
        self.remove_ucr_debris()
        self.remove_exam_file_debris()
        self.remove_room_debris()


def main():
    """Parse the command line, set up logging to LOGFILE and run the cleanup.

    After option parsing, ``univention.debug`` is initialised to write to
    LOGFILE, the file's owner/group is set to uid 0 / group ``adm`` with mode
    0640, and ``sys.stdout``/``sys.stderr`` are redirected to the same file.
    """
    description = f"""This script stops currently running exams and purges all remaining settings for
exams and computer rooms.
WARNING: if a running exam cannot be shutdown cleanly, data in the home
directory may be lost.
This script redirects all output to {LOGFILE}."""
    parser = argparse.ArgumentParser(description=description)
    # "store_true" already implies a default of False.
    parser.add_argument(
        "-n",
        "--dry-run",
        dest="dryrun",
        action="store_true",
        help="do not change anything",
    )
    parser.add_argument(
        "-d",
        "--debug",
        action="store_true",
        help="write additional debug output to logfile",
    )
    parser.add_argument(
        "--skip-exam-shutdown",
        dest="skip_stop_running_exams",
        action="store_true",
        help="do not try to shutdown running exams gracefully",
    )
    parser.add_argument(
        "--skip-exam-user-removal",
        action="store_true",
        help="do not remove all existing exam users (only on Primary Directory Node / Backup "
        "Directory Node)",
    )
    parser.add_argument(
        "--skip-exam-computer-removal",
        action="store_true",
        help="do not remove all existing exam computers (only on Primary Directory Node / Backup "
        "Directory Node)",
    )
    parser.add_argument(
        "--skip-ucr-cleanup",
        dest="skip_remove_ucr_debris",
        action="store_true",
        help="do not purge exam/room related UCR variables",
    )
    parser.add_argument(
        "--skip-exam-cleanup",
        dest="skip_remove_exam_file_debris",
        action="store_true",
        help="do not purge exam data/metadata files",
    )
    parser.add_argument(
        "--skip-room-cleanup",
        dest="skip_remove_room_debris",
        action="store_true",
        help="do not purge room metadata files",
    )
    options = parser.parse_args()

    ud.init(LOGFILE, ud.FLUSH, ud.NO_FUNCTION)
    adm = grp.getgrnam("adm")
    os.chown(LOGFILE, 0, adm.gr_gid)
    os.chmod(LOGFILE, 0o640)
    ud.set_level(ud.MAIN, ud.ALL if options.debug else ud.PROCESS)

    # Use ONE line-buffered handle for both streams: two independent,
    # block-buffered handles on the same file may write their output out of
    # order relative to each other and to the flushed ud.debug() messages.
    # The handle intentionally stays open for the lifetime of the process.
    log_stream = open(LOGFILE, "a", buffering=1)
    sys.stdout = sys.stderr = log_stream

    helper = ExamCleanupHelper(options)
    helper.cleanup()


# Run the cleanup only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
