#!/usr/bin/env python3

# SPDX-FileCopyrightText: 2022 Harald Sitter <sitter@kde.org>
# SPDX-FileCopyrightText: 2023 Fushan Wen <qydwhotmail@gmail.com>
# SPDX-License-Identifier: MIT

import base64
import logging
import os
import queue
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import unittest
from datetime import date
from time import sleep
from typing import IO, Final

import cv2 as cv
import gi
import numpy as np

gi.require_version("Gdk", "3.0")  # StatusIcon is removed in 4
gi.require_version("Gtk", "3.0")  # StatusIcon is removed in 4
gi.require_version('GdkPixbuf', '2.0')

from appium import webdriver
from appium.options.common.base import AppiumOptions
from appium.webdriver.common.appiumby import AppiumBy
from gi.repository import Gdk, GdkPixbuf, Gio, GLib, Gtk
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.actions.action_builder import ActionBuilder
from selenium.webdriver.common.actions.interaction import POINTER_MOUSE
from selenium.webdriver.common.actions.mouse_button import MouseButton
from selenium.webdriver.common.actions.pointer_input import PointerInput
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

KDE_VERSION: Final = 6
WIDGET_ID: Final = "org.kde.plasma.systemtray"
CMAKE_RUNTIME_OUTPUT_DIRECTORY: Final = os.getenv("CMAKE_RUNTIME_OUTPUT_DIRECTORY", os.path.join(os.path.dirname(os.path.abspath(__file__)), os.pardir, os.pardir, "build", "bin"))


def generate_color_block(red: int, green: int, blue: int) -> str:
    cv_second_image = np.zeros((10, 10, 3), dtype=np.uint8)
    cv_second_image[:, :] = [blue, green, red]
    return base64.b64encode(cv.imencode('.png', cv_second_image)[1].tobytes()).decode()


class XEmbedTrayIcon(threading.Thread):
    """
    XEmbed tray icon implementation using Gtk.StatusIcon
    """

    def __init__(self, title: str) -> None:
        super().__init__()

        GLib.timeout_add_seconds(300, Gtk.main_quit)  # Failsafe

        # Red square
        self.__status_icon: Gtk.StatusIcon = Gtk.StatusIcon(title=title)
        pixbuf = GdkPixbuf.Pixbuf.new(GdkPixbuf.Colorspace.RGB, True, 8, 16, 16)
        pixbuf.fill(0xaa0000ff)  # 170, 0, 0
        self.__status_icon.set_from_pixbuf(pixbuf)

        self.__status_icon.connect("button-press-event", self._on_button_press_event)
        self.button_press_event = threading.Event()
        self.pressed_button: int = -1
        self.__status_icon.connect("button-release-event", self._on_button_release_event)
        self.button_release_event = threading.Event()
        self.__status_icon.connect("popup-menu", self._on_popup_menu)
        self.popup_menu_event = threading.Event()
        self.scroll_event = threading.Event()
        self.__status_icon.connect("scroll-event", self._on_scroll_event)

    def run(self) -> None:
        Gtk.main()

    def quit(self) -> None:
        Gtk.main_quit()

    def reset_events(self) -> None:
        """
        Reset all threading events
        """
        self.button_press_event.clear()
        self.pressed_button = -1
        self.button_release_event.clear()
        self.popup_menu_event.clear()
        self.scroll_event.clear()

    def _on_button_press_event(self, status_icon: Gtk.StatusIcon, button_event: Gdk.EventButton) -> None:
        logging.info(f"button-press-event {button_event.button}")
        self.pressed_button = button_event.button
        self.button_press_event.set()

    def _on_button_release_event(self, status_icon: Gtk.StatusIcon, button_event: Gdk.EventButton) -> None:
        logging.info(f"button-release-event {button_event.button}")
        self.button_release_event.set()

    def _on_popup_menu(self, status_icon: Gtk.StatusIcon, button: int, activate_time: int) -> None:
        logging.info(f"popup-menu {button} {activate_time}")
        self.popup_menu_event.set()

    def _on_scroll_event(self, status_icon, scroll_event: Gdk.EventScroll) -> None:
        logging.info(f"scroll-event {scroll_event.delta_x} {scroll_event.delta_y} {int(scroll_event.direction)}")
        self.scroll_event.set()


class StreamReaderThread(threading.Thread):
    """
    Non-blocking readline thread
    """

    def __init__(self, stream: IO[bytes]) -> None:
        """
        @param stream: the stream to read from
        """
        self.__stream: IO[bytes] = stream
        self.__queue = queue.Queue()

        self.__stop_event = threading.Event()

        # Create the thread
        super().__init__()

    def run(self) -> None:
        """
        Collects lines from the source stream and put them in the queue.
        """
        while self.__stream.readable() and not self.__stop_event.is_set():
            line_str: str = self.__stream.readline().decode(encoding="utf-8")
            if "Received click" in line_str:
                self.__queue.put(line_str)
            elif len(line_str) == 0:
                break

    def stop(self) -> None:
        """
        Stops the thread
        """
        self.__stop_event.set()

    def readline(self) -> str | None:
        """
        Non-blocking readline
        The default timeout is 5s.
        """
        try:
            return self.__queue.get(block=True, timeout=5)
        except queue.Empty:
            return None


class SystemTrayTests(unittest.TestCase):
    """
    Tests for the system tray widget
    """

    driver: webdriver.Remote
    xembedsniproxy: subprocess.Popen[bytes] | None = None
    kded: subprocess.Popen[bytes] | None = None
    xembed_tray_icon: XEmbedTrayIcon | None = None
    stream_reader_thread: StreamReaderThread | None = None

    @classmethod
    def setUpClass(cls) -> None:
        """
        Opens the widget and initialize the webdriver
        """
        # Install the test widget
        os.makedirs(os.path.join(GLib.get_user_data_dir(), "plasma", "plasmoids"))
        shutil.copytree(os.path.join(os.path.dirname(os.path.abspath(__file__)), "systemtraytest", "org.kde.testdbusactivation"), os.path.join(GLib.get_user_data_dir(), "plasma", "plasmoids", "org.kde.testdbusactivation"))

        options = AppiumOptions()
        options.set_capability("app", f"plasmawindowed -p org.kde.plasma.nano {WIDGET_ID}")
        options.set_capability("environ", {
            "LC_ALL": "en_US.UTF-8",
        })
        cls.driver = webdriver.Remote(command_executor='http://127.0.0.1:4723', options=options)

        cls.kded = subprocess.Popen([f"kded{KDE_VERSION}"])
        # Doc: https://lazka.github.io/pgi-docs/Gio-2.0/classes/DBusConnection.html
        session_bus: Gio.DBusConnection = Gio.bus_get_sync(Gio.BusType.SESSION)
        SERVICE_NAME: Final = "org.freedesktop.DBus"
        OBJECT_PATH: Final = "/"
        INTERFACE_NAME: Final = SERVICE_NAME
        message: Gio.DBusMessage = Gio.DBusMessage.new_method_call(SERVICE_NAME, OBJECT_PATH, INTERFACE_NAME, "NameHasOwner")
        message.set_body(GLib.Variant("(s)", [f"org.kde.kded{KDE_VERSION}"]))
        for _ in range(5):
            reply, _ = session_bus.send_message_with_reply_sync(message, Gio.DBusSendMessageFlags.NONE, 1000)

            if reply and reply.get_signature() == 'b' and reply.get_body().get_child_value(0).get_boolean():
                break
            print(f"waiting for kded to appear on the dbus session", file=sys.stderr, flush=True)
            sleep(1)

        kded_reply: GLib.Variant = session_bus.call_sync(f"org.kde.kded{KDE_VERSION}", "/kded", f"org.kde.kded{KDE_VERSION}", "loadModule", GLib.Variant("(s)", [f"statusnotifierwatcher"]), GLib.VariantType("(b)"), Gio.DBusSendMessageFlags.NONE, 1000)
        assert kded_reply.get_child_value(0).get_boolean(), "Module is not loaded"

    def setUp(self) -> None:
        pass

    def tearDown(self) -> None:
        """
        Take screenshot when the current test fails
        """
        if not self._outcome.result.wasSuccessful():
            if os.environ.get("TEST_WITH_KWIN_WAYLAND", "1") == "0":
                subprocess.check_call(["import", "-window", "root", f"failed_test_shot_systemtraytest_#{self.id()}.png"])
            else:
                self.driver.get_screenshot_as_file(f"failed_test_shot_systemtraytest_#{self.id()}.png")

    @classmethod
    def tearDownClass(cls) -> None:
        """
        Make sure to terminate the driver again, lest it dangles.
        """
        subprocess.check_call([f"kquitapp{KDE_VERSION}", "plasmawindowed"])
        for _ in range(10):
            try:
                subprocess.check_call(["pidof", "plasmawindowed"])
            except subprocess.CalledProcessError:
                break
            time.sleep(1)

        if cls.kded is not None:
            subprocess.check_call([f"kquitapp{KDE_VERSION}", f"kded{KDE_VERSION}"])
            cls.kded.wait(5)
        cls.driver.quit()

    def take_screenshot(self) -> str:
        """
        Take screenshot of the current screen and use png+base64 to encode the image
        """
        with tempfile.TemporaryDirectory() as temp_dir:
            saved_image_path = os.path.join(temp_dir, "tray.png")
            if os.getenv("TEST_WITH_KWIN_WAYLAND", "1") == "0":
                subprocess.check_call(["import", "-window", "root", saved_image_path])
            else:
                self.driver.get_screenshot_as_file(saved_image_path)
            cv_image = cv.imread(saved_image_path, cv.IMREAD_COLOR)
        return base64.b64encode(cv.imencode('.png', cv_image)[1].tobytes()).decode()

    def cleanup_xembed_tray_icon(self) -> None:
        """
        Cleanup function for test_xembed_tray_icon
        """
        if self.xembed_tray_icon is not None:
            self.xembed_tray_icon.quit()
            self.xembed_tray_icon = None

        if self.xembedsniproxy is not None:
            subprocess.check_call([f"kquitapp{KDE_VERSION}", "xembedsniproxy"])
            self.xembedsniproxy.wait(5)
            self.xembedsniproxy = None

        if self.stream_reader_thread is not None and self.stream_reader_thread.is_alive():
            self.stream_reader_thread.stop()
            self.stream_reader_thread = None

    def click_center(self, rect: dict[str, int], mouse_button: int = MouseButton.LEFT) -> None:
        """
        Move the mouse to the center of the area and click
        """
        if self.xembed_tray_icon is not None:
            self.xembed_tray_icon.reset_events()
        action = ActionBuilder(self.driver, mouse=PointerInput(POINTER_MOUSE, "mouse"))
        action.pointer_action.move_to_location(int(rect["x"] + rect["width"] / 2), int(rect["y"] + rect["height"] / 2)).click(None, mouse_button).pause(1)
        action.perform()
        if self.xembed_tray_icon is None:
            return
        self.assertTrue(self.xembed_tray_icon.button_press_event.is_set())
        self.assertTrue(self.xembed_tray_icon.button_release_event.is_set())
        # the button which was pressed or released, numbered from 1 to 5. Normally button 1 is the left mouse button, 2 is the middle button, and 3 is the right button. On 2-button mice, the middle button can often be simulated by pressing both mouse buttons together.
        if mouse_button == MouseButton.LEFT:
            self.assertEqual(self.xembed_tray_icon.pressed_button, 1)
        elif mouse_button == MouseButton.MIDDLE:
            self.assertEqual(self.xembed_tray_icon.pressed_button, 2)
        elif mouse_button == MouseButton.RIGHT:
            self.assertEqual(self.xembed_tray_icon.pressed_button, 3)

    def scroll_center(self, rect: dict[str, int], delta_x: float, delta_y: float) -> None:
        """
        Move the mouse to the center of the area and scroll
        """
        if self.xembed_tray_icon is not None:
            self.xembed_tray_icon.reset_events()
        action = ActionBuilder(self.driver)
        action.wheel_action.scroll(int(rect["x"] + rect["width"] / 2), int(rect["y"] + rect["height"] / 2), delta_x, delta_y).pause(1)
        action.perform()
        if self.xembed_tray_icon is not None:
            self.assertTrue(self.xembed_tray_icon.scroll_event.is_set())

    def test_1_xembed_tray_icon(self) -> None:
        """
        Tests XEmbed tray icons can be listed and clicked in the tray.

        @note GTK doesn't like send_events and double checks the mouse position
        matches where the window is and is top level, so match the debug
        output from xembedsniproxy instead.
        """
        self.addCleanup(self.cleanup_xembed_tray_icon)

        debug_env: dict[str, str] = os.environ.copy()
        debug_env["QT_LOGGING_RULES"] = "kde.xembedsniproxy.debug=true"
        debug_env["XDG_SESSION_TYPE"] = "x11" if os.getenv("TEST_WITH_KWIN_WAYLAND", "1") == "0" else "wayland"
        self.xembedsniproxy = subprocess.Popen([os.path.join(CMAKE_RUNTIME_OUTPUT_DIRECTORY, "xembedsniproxy"), '--platform', 'xcb'], env=debug_env, stderr=subprocess.PIPE)  # For debug output
        if not self.xembedsniproxy.stderr or self.xembedsniproxy.poll() != None:
            self.fail("xembedsniproxy is not available")
        print(f"xembedsniproxy PID: {self.xembedsniproxy.pid}", file=sys.stderr, flush=True)

        title: str = f"XEmbed Status Icon Test {date.today().strftime('%Y%m%d')}"
        self.xembed_tray_icon = XEmbedTrayIcon(title)
        self.xembed_tray_icon.start()

        wait: WebDriverWait = WebDriverWait(self.driver, 10)
        try:
            # FocusScope in StatusNotifierItem.qml
            wait.until(EC.presence_of_element_located((AppiumBy.NAME, title)))
        except TimeoutException:
            self.fail(f"Cannot find the XEmbed icon in the system tray: {self.xembedsniproxy.stderr.readlines()}")

        # Create a reader thread to work around read block
        self.stream_reader_thread = StreamReaderThread(self.xembedsniproxy.stderr)
        self.stream_reader_thread.start()
        self.assertTrue(self.stream_reader_thread.is_alive(), "The reader thread is not running")

        # Now test clickable
        self.driver.find_element(AppiumBy.NAME, title).click()

        success: bool = False
        for _ in range(10):
            if self.stream_reader_thread.readline() is not None:
                success = True
                break

            print("Retrying...", file=sys.stderr, flush=True)
            self.driver.find_element(AppiumBy.NAME, title).click()

        self.assertTrue(success, "xembedsniproxy did not receive the click event")

        # The tray icon is a red square
        rect: dict[str, int] = self.driver.find_image_occurrence(self.take_screenshot(), generate_color_block(170, 0, 0))["rect"]

        if os.getenv("TEST_WITH_KWIN_WAYLAND", "1") == "0":
            return

        # Left click
        self.click_center(rect)

        # Middle click
        self.click_center(rect, MouseButton.MIDDLE)

        # Right click
        self.click_center(rect, MouseButton.RIGHT)
        self.assertTrue(self.xembed_tray_icon.popup_menu_event.is_set())

        # Scroll up
        self.scroll_center(rect, 0, -15)

        # Scroll down
        self.scroll_center(rect, 0, 15)

        # Scroll left
        self.scroll_center(rect, -15, 0)

        # Scroll right
        self.scroll_center(rect, 15, 0)

    @unittest.skipIf(os.getenv("TEST_WITH_KWIN_WAYLAND", "1") == "0", "inputsynth only works on Wayland")
    def test_2_statusnotifieritem(self) -> None:
        """
        Tests for org.kde.StatusNotifierItem
        1. Left click
        2. Right click
        3. Middle click
        4. Wheel
        5. Activate menu actions
        6. NeedsAttention/Active/Passive status
        """
        status_notifier = subprocess.Popen([os.path.join(CMAKE_RUNTIME_OUTPUT_DIRECTORY, "systemtray_statusnotifiertest")], stdout=subprocess.PIPE)
        self.addCleanup(status_notifier.terminate)
        time.sleep(1)  # Wait until the icon appears
        rect: dict = self.driver.find_image_occurrence(self.take_screenshot(), generate_color_block(255, 0, 0))["rect"]  # Red

        expected_result: list[str] = []

        # Left click
        self.click_center(rect)
        expected_result.append("Activated")

        # Middle click
        self.click_center(rect, MouseButton.MIDDLE)
        expected_result.append("SecondaryActivated")

        # Scroll up
        self.scroll_center(rect, 0, -15)
        expected_result.append("Scrolled by 180 Vertically")

        # Scroll down
        self.scroll_center(rect, 0, 15)
        expected_result.append("Scrolled by -180 Vertically")
        time.sleep(1)

        # Scroll left
        self.scroll_center(rect, -15, 0)
        expected_result.append("Scrolled by 180 Horizontally")

        # Scroll right
        self.scroll_center(rect, 15, 0)
        expected_result.append("Scrolled by -180 Horizontally")

        # Right click
        self.click_center(rect, MouseButton.RIGHT)

        # Click the first action to change the color to blue
        rect = self.driver.find_image_occurrence(self.take_screenshot(), generate_color_block(0, 255, 0))["rect"]  # Green
        self.click_center(rect)
        expected_result.append("NeedsAttention")

        # Right click
        rect = self.driver.find_image_occurrence(self.take_screenshot(), generate_color_block(0, 0, 255))["rect"]  # Blue
        self.click_center(rect, MouseButton.RIGHT)

        # Click the second action to change the color to red
        rect = self.driver.find_image_occurrence(self.take_screenshot(), generate_color_block(255, 85, 255))["rect"]  # Purple
        self.click_center(rect)
        expected_result.append("Active")

        # Right click
        rect = self.driver.find_image_occurrence(self.take_screenshot(), generate_color_block(255, 0, 0))["rect"]  # Red
        self.click_center(rect, MouseButton.RIGHT)

        # Move to the submenu item
        rect = self.driver.find_image_occurrence(self.take_screenshot(), generate_color_block(85, 0, 255))["rect"]
        self.click_center(rect)

        # Move to the submenu
        rect = self.driver.find_image_occurrence(self.take_screenshot(), generate_color_block(255, 255, 0))["rect"]  # Yellow
        self.click_center(rect)
        expected_result.append("Passive")
        # The icon is hidden
        self.assertRaises(Exception, self.driver.find_image_occurrence, self.take_screenshot(), generate_color_block(255, 0, 0))

        status_notifier.terminate()
        status_notifier.wait(10)
        output = status_notifier.stdout.readlines()
        self.assertEqual(len(expected_result), len(output), f"output: f{output} expected: {expected_result}")
        for l in range(0, len(expected_result)):
            self.assertEqual(output[l].decode().strip(), expected_result[l])

    @unittest.skipIf(os.environ.get("TEST_WITH_KWIN_WAYLAND", "1") == "0", "In openbox, the popup is not focused by default, so sending keys will not work.")
    def test_3_bug479466_keyboard_navigation_in_HiddenItemsView(self) -> None:
        """
        Make sure iconContainer in AbstractItem.qml has the default focus so it can receive key presses
        """
        self.driver.find_element(AppiumBy.NAME, "Show hidden icons").click()
        wait = WebDriverWait(self.driver, 10)
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, "Notifications")))
        time.sleep(1)

        # By default the focused item is "Notifications"
        # Press Enter key directly to open the widget
        if os.environ.get("TEST_WITH_KWIN_WAYLAND", "1") == "0":
            subprocess.check_call(["xdotool", "key", "Return"])
        else:
            actions = ActionChains(self.driver)
            actions.send_keys(Keys.ENTER).perform()
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, "Do not disturb")))
        if os.environ.get("TEST_WITH_KWIN_WAYLAND", "1") == "0":
            subprocess.check_call(["xdotool", "key", "Escape"])
        else:
            ActionChains(self.driver).send_keys(Keys.ESCAPE).perform()

    def test_4_dbus_activated_plasmoid(self) -> None:
        """
        Dynamically load a Plasmoid into the systemtray when a specific service becomes available
        """
        with subprocess.Popen(["python3", os.path.join(os.path.dirname(os.path.abspath(__file__)), "systemtraytest", "dbusservice.py")], stdout=sys.stderr, stderr=sys.stderr) as process:
            plasmoid_title = "Do not translate Test only"
            element = self.driver.find_element(AppiumBy.NAME, plasmoid_title)
            process.kill()
        WebDriverWait(self.driver, 10).until_not(lambda _: element.is_displayed())


if __name__ == '__main__':
    assert "USE_CUSTOM_BUS" in os.environ
    assert "GDK_BACKEND" in os.environ or "TEST_WITH_KWIN_WAYLAND" in os.environ
    logging.getLogger().setLevel(logging.INFO)
    unittest.main()
