#!/usr/bin/env python3

# SPDX-FileCopyrightText: 2022 Harald Sitter <sitter@kde.org>
# SPDX-FileCopyrightText: 2023 Fushan Wen <qydwhotmail@gmail.com>
# SPDX-License-Identifier: GPL-2.0-or-later

# pylint: disable=too-many-arguments

import json
import os
import subprocess
import sys
import time
import unittest
from tempfile import NamedTemporaryFile
from time import sleep
from typing import Final

import gi
from appium import webdriver
from appium.options.common.base import AppiumOptions
from appium.webdriver.common.appiumby import AppiumBy
from selenium.webdriver.common.actions.action_builder import ActionBuilder
from selenium.webdriver.common.actions.interaction import POINTER_TOUCH
from selenium.webdriver.common.actions.pointer_input import PointerInput
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "mediacontrollertest"))
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), os.pardir, "utils"))
from GLibMainLoopThread import GLibMainLoopThread
from mediaplayer import (InvalidMpris2, Mpris2, read_base_properties, read_player_metadata, read_player_properties)

gi.require_version('Gtk', '4.0')
from gi.repository import Gio, GLib, Gtk

WIDGET_ID: Final = "org.kde.plasma.mediacontroller"
KDE_VERSION: Final = 6


class MediaControllerTests(unittest.TestCase):
    """
    Tests for the media controller widget
    """

    driver: webdriver.Remote
    loop_thread: GLibMainLoopThread
    mpris_interface: Mpris2 | None
    player_b: subprocess.Popen | None = None
    player_browser: subprocess.Popen | None = None
    player_plasma_browser_integration: subprocess.Popen | None = None

    @classmethod
    def setUpClass(cls) -> None:
        """
        Opens the widget and initialize the webdriver
        """
        cls.loop_thread = GLibMainLoopThread()
        cls.loop_thread.start()

        options = AppiumOptions()
        options.set_capability("app", f"plasmawindowed -p org.kde.plasma.nano {WIDGET_ID}")
        options.set_capability("environ", {
            "LC_ALL": "en_US.UTF-8",
            "QT_FATAL_WARNINGS": "1",
            "QT_LOGGING_RULES": "qt.accessibility.atspi.warning=false;kf.plasma.core.warning=false;kf.windowsystem.warning=false;kf.kirigami.platform.warning=false;qt.qml.typeresolution.cycle.warning=false",
        })
        options.set_capability("timeouts", {'implicit': 10000})
        cls.driver = webdriver.Remote(command_executor='http://127.0.0.1:4723', options=options)

    def setUp(self) -> None:
        json_path: str = os.path.join(os.path.dirname(os.path.abspath(__file__)), "mediacontrollertest", "player_a.json")
        with open(json_path, "r", encoding="utf-8") as f:
            json_dict: dict[str, list | dict] = json.load(f)
        metadata: list[dict[str, GLib.Variant]] = read_player_metadata(json_dict)
        base_properties: dict[str, GLib.Variant] = read_base_properties(json_dict)
        current_index: int = 1
        player_properties: dict[str, GLib.Variant] = read_player_properties(json_dict, metadata[current_index])

        self.mpris_interface = Mpris2(metadata, base_properties, player_properties, current_index)
        assert self.mpris_interface.registered_event.wait(10)

    def tearDown(self) -> None:
        if not self._outcome.result.wasSuccessful():
            self.driver.get_screenshot_as_file(f"failed_test_shot_{WIDGET_ID}_#{self.id()}.png")
        if self.mpris_interface is not None:
            self.mpris_interface.quit()
            self.mpris_interface = None
        WebDriverWait(self.driver, 5, 0.2).until(EC.presence_of_element_located((AppiumBy.NAME, "No media playing")))

    @classmethod
    def tearDownClass(cls) -> None:
        """
        Make sure to terminate the driver again, lest it dangles.
        """
        subprocess.check_call([f"kquitapp{KDE_VERSION}", "plasmawindowed"])
        for _ in range(10):
            try:
                subprocess.check_call(["pidof", "plasmawindowed"])
            except subprocess.CalledProcessError:
                break
            time.sleep(1)
        cls.loop_thread.quit()
        cls.driver.quit()

    def test_track(self) -> None:
        """
        Tests the widget can show track metadata
        """
        assert self.mpris_interface
        play_button = self.driver.find_element(by=AppiumBy.NAME, value="Play")
        previous_button = self.driver.find_element(by=AppiumBy.NAME, value="Previous Track")
        next_button = self.driver.find_element(by=AppiumBy.NAME, value="Next Track")
        shuffle_button = self.driver.find_element(by=AppiumBy.NAME, value="Shuffle")
        repeat_button = self.driver.find_element(by=AppiumBy.NAME, value="Repeat")

        # Match song title, artist and album
        wait: WebDriverWait = WebDriverWait(self.driver, 5)
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, self.mpris_interface.metadata[self.mpris_interface.current_index]["xesam:title"].get_string())))  # Title
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, self.mpris_interface.metadata[self.mpris_interface.current_index]["xesam:album"].get_string())))  # Album
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, "0:00")))  # Current position
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, "-5:00")))  # Remaining time
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, ', '.join(self.mpris_interface.metadata[self.mpris_interface.current_index]["xesam:artist"].unpack()))))  # Artists

        # Now click the play button
        play_button.click()
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, "Pause")))
        play_button.click()
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, "Play")))

        # Now click the shuffle button
        shuffle_button.click()
        wait.until(lambda _: self.mpris_interface.player_properties["Shuffle"].get_boolean())
        # Click again to disable shuffle
        shuffle_button.click()
        wait.until(lambda _: not self.mpris_interface.player_properties["Shuffle"].get_boolean())

        # Now click the repeat button
        repeat_button.click()
        wait.until(lambda _: self.mpris_interface.player_properties["LoopStatus"].get_string() == "Playlist")
        # Click again to switch to Track mode
        repeat_button.click()
        wait.until(lambda _: self.mpris_interface.player_properties["LoopStatus"].get_string() == "Track")
        # Click again to disable repeat
        repeat_button.click()
        wait.until(lambda _: self.mpris_interface.player_properties["LoopStatus"].get_string() == "None")

        # Switch to the previous song, and match again
        previous_button.click()
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, "Katie's Favorite")))
        self.assertFalse(previous_button.is_enabled())
        self.assertTrue(next_button.is_enabled())
        self.driver.find_element(by=AppiumBy.NAME, value=self.mpris_interface.metadata[self.mpris_interface.current_index]["xesam:title"].get_string())
        self.driver.find_element(by=AppiumBy.NAME, value=self.mpris_interface.metadata[self.mpris_interface.current_index]["xesam:album"].get_string())
        self.driver.find_element(by=AppiumBy.NAME, value="0:00")
        self.driver.find_element(by=AppiumBy.NAME, value="-10:00")
        self.driver.find_element(by=AppiumBy.NAME, value=', '.join(self.mpris_interface.metadata[self.mpris_interface.current_index]["xesam:artist"].unpack()))

        # Switch to the next song (need to click twice), and match again
        next_button.click()
        next_button.click()
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, "Konqi's Favorite")))
        self.assertTrue(previous_button.is_enabled())
        self.assertFalse(next_button.is_enabled())
        self.driver.find_element(by=AppiumBy.NAME, value=self.mpris_interface.metadata[self.mpris_interface.current_index]["xesam:title"].get_string())
        self.driver.find_element(by=AppiumBy.NAME, value=self.mpris_interface.metadata[self.mpris_interface.current_index]["xesam:album"].get_string())
        self.driver.find_element(by=AppiumBy.NAME, value="0:00")
        self.driver.find_element(by=AppiumBy.NAME, value="-15:00")
        self.driver.find_element(by=AppiumBy.NAME, value=', '.join(self.mpris_interface.metadata[self.mpris_interface.current_index]["xesam:artist"].unpack()))

    @unittest.skipIf("KDECI_BUILD" in os.environ, "Too unstable to test in the CI")
    def test_touch_gestures(self) -> None:
        """
        Tests touch gestures like swipe up/down/left/right to adjust volume/progress
        @see https://invent.kde.org/plasma/plasma-workspace/-/merge_requests/2438
        """
        assert self.mpris_interface
        wait: WebDriverWait = WebDriverWait(self.driver, 5)
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, "0:00")))  # Current position
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, "-5:00")))  # Remaining time

        # Center point of the screen
        geometry = Gtk.Window().get_display().get_monitors()[0].get_geometry()
        center_pos_x: int = int(geometry.width / 2)
        center_pos_y: int = int(geometry.height / 2)
        self.assertGreater(center_pos_x, 1)
        self.assertGreater(center_pos_y, 1)

        # Touch the window, and wait a moment to make sure the widget is ready
        input_source = PointerInput(POINTER_TOUCH, "finger")
        action = ActionBuilder(self.driver, mouse=input_source, duration=500)
        action.pointer_action.move_to_location(center_pos_x, center_pos_y).click().pause(1)
        action.perform()

        # Swipe right -> Position++
        input_source = PointerInput(POINTER_TOUCH, "finger")
        action = ActionBuilder(self.driver, mouse=input_source, duration=500)
        action.pointer_action.move_to_location(center_pos_x, center_pos_y).pointer_down().move_to_location(center_pos_x * 2, center_pos_y).pointer_up()
        action.perform()
        self.mpris_interface.connection.flush_sync(None)
        wait.until(lambda _: self.mpris_interface.player_properties["Position"].get_int64() > 0)

        # Swipe left -> Position--
        old_position: int = self.mpris_interface.player_properties["Position"].get_int64()
        input_source = PointerInput(POINTER_TOUCH, "finger")
        action = ActionBuilder(self.driver, mouse=input_source, duration=500)
        action.pointer_action.move_to_location(center_pos_x, center_pos_y).pointer_down().move_to_location(0, center_pos_y).pointer_up().pause(1)
        action.perform()
        self.mpris_interface.connection.flush_sync(None)
        wait.until(lambda _: self.mpris_interface.player_properties["Position"].get_int64() < old_position)

        # Swipe down: Volume--
        input_source = PointerInput(POINTER_TOUCH, "finger")
        action = ActionBuilder(self.driver, mouse=input_source, duration=500)
        action.pointer_action.move_to_location(center_pos_x, center_pos_y).pointer_down().move_to_location(center_pos_x, center_pos_y * 2).pointer_up().pause(1)
        action.perform()
        self.mpris_interface.connection.flush_sync(None)
        wait.until(lambda _: self.mpris_interface.player_properties["Volume"].get_double() < 1.0)

        # Swipe up: Volume++
        old_volume: float = self.mpris_interface.player_properties["Volume"].get_double()
        input_source = PointerInput(POINTER_TOUCH, "finger")
        action = ActionBuilder(self.driver, mouse=input_source, duration=500)
        action.pointer_action.move_to_location(center_pos_x, center_pos_y).pointer_down().move_to_location(center_pos_x, 0).pointer_up().pause(1)
        action.perform()
        self.mpris_interface.connection.flush_sync(None)
        wait.until(lambda _: self.mpris_interface.player_properties["Volume"].get_double() > old_volume)

        # Swipe down and then swipe right, only volume should change
        old_volume = self.mpris_interface.player_properties["Volume"].get_double()
        old_position = self.mpris_interface.player_properties["Position"].get_int64()
        input_source = PointerInput(POINTER_TOUCH, "finger")
        action = ActionBuilder(self.driver, mouse=input_source, duration=500)
        action.pointer_action.move_to_location(center_pos_x, center_pos_y).pointer_down().move_to_location(center_pos_x, center_pos_y * 2).pause(0.5).move_to_location(center_pos_x * 2, center_pos_y * 2).pointer_up().pause(1)
        action.perform()
        self.mpris_interface.connection.flush_sync(None)
        wait.until(lambda _: self.mpris_interface.player_properties["Volume"].get_double() < old_volume)
        self.assertEqual(old_position, self.mpris_interface.player_properties["Position"].get_int64())

        # Swipe right and then swipe up, only position should change
        old_volume = self.mpris_interface.player_properties["Volume"].get_double()
        old_position = self.mpris_interface.player_properties["Position"].get_int64()
        input_source = PointerInput(POINTER_TOUCH, "finger")
        action = ActionBuilder(self.driver, mouse=input_source, duration=500)
        action.pointer_action.move_to_location(center_pos_x, center_pos_y).pointer_down().move_to_location(center_pos_x * 2, center_pos_y).pause(0.5).move_to_location(center_pos_x * 2, 0).pointer_up().pause(1)
        action.perform()
        self.mpris_interface.connection.flush_sync(None)
        wait.until(lambda _: self.mpris_interface.player_properties["Position"].get_int64() > old_position)
        self.assertAlmostEqual(old_volume, self.mpris_interface.player_properties["Volume"].get_double())

    def _cleanup_multiplexer(self) -> None:
        if self.player_b is not None:
            self.player_b.terminate()
            self.player_b.wait()
            self.player_b = None
        if self.player_browser is not None:
            self.player_browser.terminate()
            self.player_browser.wait()
            self.player_browser = None

    def test_multiplexer(self) -> None:
        """
        The multiplexer should be hidden when there is only 1 player, and shows information from the active/playing player if there is one
        """
        self.addCleanup(self._cleanup_multiplexer)

        # Wait until the first player is ready
        wait: WebDriverWait = WebDriverWait(self.driver, 3)
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, self.mpris_interface.metadata[self.mpris_interface.current_index]["xesam:title"].get_string())))

        # Start Player B, Total 2 players
        player_b_json_path: str = os.path.join(os.path.dirname(os.path.abspath(__file__)), "mediacontrollertest", "player_b.json")
        self.player_b = subprocess.Popen(("python3", os.path.join(os.path.dirname(os.path.abspath(__file__)), "mediacontrollertest", "mediaplayer.py"), player_b_json_path))
        player_selector: WebElement = wait.until(EC.visibility_of_element_located((AppiumBy.ACCESSIBILITY_ID, "playerSelector")))

        # Find player tabs based on "Identity"
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, "AppiumTest")))

        # Make sure the current index does not change after a new player appears
        self.driver.find_element(by=AppiumBy.NAME, value=self.mpris_interface.metadata[self.mpris_interface.current_index]["xesam:title"].get_string())  # Title

        # Switch to Player B
        self.driver.find_element(by=AppiumBy.NAME, value="Audacious").click()
        with open(player_b_json_path, "r", encoding="utf-8") as f:
            player_b_metadata = read_player_metadata(json.load(f))
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, player_b_metadata[0]["xesam:title"].get_string())))
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, "Play")))
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, player_b_metadata[0]["xesam:album"].get_string())))
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, "0:00")))
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, "-15:00")))
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, ', '.join(player_b_metadata[0]["xesam:artist"].unpack()))))
        wait.until_not(EC.element_to_be_clickable((AppiumBy.NAME, "Next Track")))
        wait.until_not(EC.element_to_be_clickable((AppiumBy.NAME, "Previous Track")))

        # Switch to Multiplexer
        # A Paused, B Paused -> A (first added)
        self.driver.find_element(by=AppiumBy.NAME, value="Choose player automatically").click()
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, self.mpris_interface.metadata[self.mpris_interface.current_index]["xesam:title"].get_string())))
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, "Play")))
        wait.until(EC.element_to_be_clickable((AppiumBy.NAME, "Next Track")))
        wait.until(EC.element_to_be_clickable((AppiumBy.NAME, "Previous Track")))

        # A Paused, B Playing -> B
        # Doc: https://lazka.github.io/pgi-docs/Gio-2.0/classes/DBusConnection.html
        session_bus: Gio.DBusConnection = Gio.bus_get_sync(Gio.BusType.SESSION)
        session_bus.call(f"org.mpris.MediaPlayer2.appiumtest.instance{str(self.player_b.pid)}", Mpris2.OBJECT_PATH, Mpris2.PLAYER_IFACE.get_string(), "Play", None, None, Gio.DBusSendMessageFlags.NONE, 1000)
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, player_b_metadata[0]["xesam:title"].get_string())))
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, "Pause")))
        wait.until_not(EC.element_to_be_clickable((AppiumBy.NAME, "Next Track")))
        wait.until_not(EC.element_to_be_clickable((AppiumBy.NAME, "Previous Track")))

        # Pause B -> Still B
        session_bus.call(f"org.mpris.MediaPlayer2.appiumtest.instance{str(self.player_b.pid)}", Mpris2.OBJECT_PATH, Mpris2.PLAYER_IFACE.get_string(), "Pause", None, None, Gio.DBusSendMessageFlags.NONE, 1000)
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, "Play")))
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, player_b_metadata[0]["xesam:title"].get_string())))

        # A Playing, B Paused -> A
        session_bus.call(self.mpris_interface.APP_INTERFACE, Mpris2.OBJECT_PATH, Mpris2.PLAYER_IFACE.get_string(), "Play", None, None, Gio.DBusSendMessageFlags.NONE, 1000)
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, self.mpris_interface.metadata[self.mpris_interface.current_index]["xesam:title"].get_string())))
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, "Pause")))
        wait.until(EC.element_to_be_clickable((AppiumBy.NAME, "Next Track")))
        wait.until(EC.element_to_be_clickable((AppiumBy.NAME, "Previous Track")))

        # A Playing, B Playing -> Still A
        session_bus.call(f"org.mpris.MediaPlayer2.appiumtest.instance{str(self.player_b.pid)}", Mpris2.OBJECT_PATH, Mpris2.PLAYER_IFACE.get_string(), "Play", None, None, Gio.DBusSendMessageFlags.NONE, 1000)
        sleep(1)
        self.driver.find_element(by=AppiumBy.NAME, value=self.mpris_interface.metadata[self.mpris_interface.current_index]["xesam:title"].get_string())  # Title

        # A Paused, B Playing -> B
        session_bus.call(self.mpris_interface.APP_INTERFACE, Mpris2.OBJECT_PATH, Mpris2.PLAYER_IFACE.get_string(), "Pause", None, None, Gio.DBusSendMessageFlags.NONE, 1000)
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, player_b_metadata[0]["xesam:title"].get_string())))
        wait.until_not(EC.element_to_be_clickable((AppiumBy.NAME, "Next Track")))
        wait.until_not(EC.element_to_be_clickable((AppiumBy.NAME, "Previous Track")))

        # BUG 481992: When the filter proxy model emits rowsRemoved, the player is still not erased from m_container, so we have to find the index in m_filterModel.
        # Mpris2SourceModel::beginRemoveRows -> Mpris2SourceModel::rowsAboutToBeRemoved -> Mpris2FilterProxyModel::rowsAboutToBeRemoved -> QSortFilterProxyModelPrivate::remove_proxy_interval -> Mpris2FilterProxyModel::endRemoveRows -> Mpris2FilterProxyModel::rowsRemoved -> Multiplexer::onRowsRemoved (at this time m_container is still not updated)
        # So when the active player is the **last** player in m_container, m_filterModel->mapFromSource(sourceModel->index(sourceRow, 0)) will return an invalid index because the source index becomes stale (source_index.row() >= m->proxy_rows.size()) in the filter model.
        # Start Player C
        player_browser_json_path: str = os.path.join(os.path.dirname(os.path.abspath(__file__)), "mediacontrollertest", "player_browser.json")
        with open(player_browser_json_path, "r", encoding="utf-8") as f:
            browser_json_data = json.load(f)
        self.player_browser = subprocess.Popen(("python3", os.path.join(os.path.dirname(os.path.abspath(__file__)), "mediacontrollertest", "mediaplayer.py"), player_browser_json_path))
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, browser_json_data["base_properties"]["Identity"])))

        # A Paused, B Playing, C Playing -> Still B
        session_bus.call(f"org.mpris.MediaPlayer2.appiumtest.instance{str(self.player_browser.pid)}", Mpris2.OBJECT_PATH, Mpris2.PLAYER_IFACE.get_string(), "Play", None, None, Gio.DBusSendMessageFlags.NONE, 1000)
        self.driver.find_element(by=AppiumBy.NAME, value=player_b_metadata[0]["xesam:title"].get_string())  # Title

        # A Paused, B Paused, C Playing -> C
        session_bus.call(f"org.mpris.MediaPlayer2.appiumtest.instance{str(self.player_b.pid)}", Mpris2.OBJECT_PATH, Mpris2.PLAYER_IFACE.get_string(), "Pause", None, None, Gio.DBusSendMessageFlags.NONE, 1000)
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, browser_json_data["metadata"][0]["xesam:title"])))
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, browser_json_data["metadata"][0]["xesam:album"])))

        # Close B -> Still C, and does not assert at invalid index (BUG 481992)
        self.player_b.terminate()
        self.player_b.wait(10)
        self.player_b = None
        self.driver.find_element(by=AppiumBy.NAME, value=browser_json_data["metadata"][0]["xesam:title"])  # Title

        # Close C -> A
        self.player_browser.terminate()
        self.player_browser.wait(10)
        self.player_browser = None
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, self.mpris_interface.metadata[self.mpris_interface.current_index]["xesam:title"].get_string())))
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, "Play")))
        wait.until(EC.element_to_be_clickable((AppiumBy.NAME, "Next Track")))
        wait.until(EC.element_to_be_clickable((AppiumBy.NAME, "Previous Track")))
        self.assertFalse(player_selector.is_displayed())  # Tabbar is hidden again

    def _cleanup_filter_plasma_browser_integration(self) -> None:
        """
        A cleanup function to be called after the test is completed
        """
        if self.player_browser is not None:
            self.player_browser.terminate()
            self.player_browser.wait(10)
            self.player_browser = None
        if self.player_plasma_browser_integration is not None:
            self.player_plasma_browser_integration.terminate()
            self.player_plasma_browser_integration.wait(10)
            self.player_plasma_browser_integration = None
        if self.player_b is not None:
            self.player_b.terminate()
            self.player_b.wait(10)
            self.player_b = None

    def test_filter_plasma_browser_integration(self) -> None:
        """
        When Plasma Browser Integration is installed, the widget should only show the player from p-b-i, and hide the player from the browser.
        """
        self.addCleanup(self._cleanup_filter_plasma_browser_integration)

        # Make sure the active player is not the browser so the bug can be tested
        wait = WebDriverWait(self.driver, 3)
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, self.mpris_interface.metadata[self.mpris_interface.current_index]["xesam:title"].get_string())))  # Title

        player_browser_json_path: str = os.path.join(os.path.dirname(os.path.abspath(__file__)), "mediacontrollertest", "player_browser.json")
        with open(player_browser_json_path, "r", encoding="utf-8") as f:
            browser_json_data = json.load(f)
        self.player_browser = subprocess.Popen(["python3", os.path.join(os.path.dirname(os.path.abspath(__file__)), "mediacontrollertest", "mediaplayer.py"), player_browser_json_path])
        wait.until(EC.visibility_of_element_located((AppiumBy.ACCESSIBILITY_ID, "playerSelector")))
        browser_tab: WebElement = wait.until(EC.presence_of_element_located((AppiumBy.NAME, browser_json_data["base_properties"]["Identity"])))
        browser_tab.click()
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, browser_json_data["metadata"][0]["xesam:title"])))
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, browser_json_data["metadata"][0]["xesam:album"])))
        self.assertFalse(self.driver.find_element(by=AppiumBy.NAME, value="Next Track").is_enabled())

        with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), "mediacontrollertest", "player_plasma_browser_integration.json"), "r", encoding="utf-8") as f:
            pbi_json_data = json.load(f)
        pbi_json_data["metadata"][0]["kde:pid"] = self.player_browser.pid  # Simulate Plasma Browser Integration
        with NamedTemporaryFile("w", encoding="utf-8", suffix=".json", delete=False) as temp_file:
            json.dump(pbi_json_data, temp_file)
            temp_file.flush()

            self.player_plasma_browser_integration = subprocess.Popen(("python3", os.path.join(os.path.dirname(os.path.abspath(__file__)), "mediacontrollertest", "mediaplayer.py"), temp_file.name))
            wait.until(EC.presence_of_element_located((AppiumBy.NAME, pbi_json_data["base_properties"]["Identity"]))).click()
            wait.until(EC.presence_of_element_located((AppiumBy.NAME, pbi_json_data["metadata"][0]["xesam:title"])))
            wait.until(EC.presence_of_element_located((AppiumBy.NAME, pbi_json_data["metadata"][0]["xesam:album"])))
            wait.until(EC.presence_of_element_located((AppiumBy.NAME, "Play")))
            wait.until(EC.element_to_be_clickable((AppiumBy.NAME, "Next Track")))
            self.assertFalse(browser_tab.is_displayed())

        # When a browser starts playing a video
        # 1. It registers the browser MPRIS instance with PlaybackStatus: Stopped/Paused, and Mpris2FilterProxyModel has it but the active player can be others
        # 2. p-b-i also registers its MPRIS instance, and Mpris2FilterProxyModel filters out the Chromium MPRIS instance, so the browser MPRIS instance in Mpris2FilterProxyModel becomes invalid
        # 3. PlaybackStatus changes to Playing, and the container of the browser MPRIS instance emits playbackStatusChanged() signal. However, the signal should be ignored by Multiplexer (disconnect in Multiplexer::onRowsAboutToBeRemoved)
        session_bus: Gio.DBusConnection = Gio.bus_get_sync(Gio.BusType.SESSION)
        session_bus.call(f"org.mpris.MediaPlayer2.appiumtest.instance{str(self.player_browser.pid)}", Mpris2.OBJECT_PATH, Mpris2.PLAYER_IFACE.get_string(), "Play", None, None, Gio.DBusSendMessageFlags.NONE, 1000)
        session_bus.call(f"org.mpris.MediaPlayer2.appiumtest.instance{str(self.player_plasma_browser_integration.pid)}", Mpris2.OBJECT_PATH, Mpris2.PLAYER_IFACE.get_string(), "Play", None, None, Gio.DBusSendMessageFlags.NONE, 1000)
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, "Pause")))  # Confirm the backend does not crash

        # Pause the browser and start another player to test BUG 483027
        session_bus.call(f"org.mpris.MediaPlayer2.appiumtest.instance{str(self.player_browser.pid)}", Mpris2.OBJECT_PATH, Mpris2.PLAYER_IFACE.get_string(), "Pause", None, None, Gio.DBusSendMessageFlags.NONE, 1000)
        session_bus.call(f"org.mpris.MediaPlayer2.appiumtest.instance{str(self.player_plasma_browser_integration.pid)}", Mpris2.OBJECT_PATH, Mpris2.PLAYER_IFACE.get_string(), "Pause", None, None, Gio.DBusSendMessageFlags.NONE, 1000)
        player_b_json_path: str = os.path.join(os.path.dirname(os.path.abspath(__file__)), "mediacontrollertest", "player_b.json")
        self.player_b = subprocess.Popen(("python3", os.path.join(os.path.dirname(os.path.abspath(__file__)), "mediacontrollertest", "mediaplayer.py"), player_b_json_path))
        # Start playing B to make it become favorite player
        self.driver.find_element(AppiumBy.NAME, "Choose player automatically").click()
        session_bus.call(f"org.mpris.MediaPlayer2.appiumtest.instance{str(self.player_b.pid)}", Mpris2.OBJECT_PATH, Mpris2.PLAYER_IFACE.get_string(), "Play", None, None, Gio.DBusSendMessageFlags.NONE, 1000)
        with open(player_b_json_path, "r", encoding="utf-8") as f:
            player_b_metadata = read_player_metadata(json.load(f))
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, player_b_metadata[0]["xesam:title"].get_string())))
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, "Pause")))
        session_bus.call(f"org.mpris.MediaPlayer2.appiumtest.instance{str(self.player_b.pid)}", Mpris2.OBJECT_PATH, Mpris2.PLAYER_IFACE.get_string(), "Pause", None, None, Gio.DBusSendMessageFlags.NONE, 1000)
        # Pause B to prepare for BUG 483027
        wait.until(EC.presence_of_element_located((AppiumBy.NAME, "Play")))
        # Close the pbi and the browser to test the assertion in BUG 483027 (the order is required)
        self.player_plasma_browser_integration.terminate()
        self.player_plasma_browser_integration.wait(10)
        self.player_plasma_browser_integration = None
        self.player_browser.terminate()
        self.player_browser.wait(10)
        self.player_browser = None
        self.driver.find_element(AppiumBy.NAME, player_b_metadata[0]["xesam:title"].get_string())

        self._cleanup_filter_plasma_browser_integration()

    def test_bug477144_invalid_player(self) -> None:
        """
        Do not crash when a player is invalid or its DBus interface returns any errors on initialization
        @see https://bugs.kde.org/show_bug.cgi?id=477144
        """
        if self.mpris_interface is not None:
            self.mpris_interface.quit()

        placeholder_element: WebElement = self.driver.find_element(AppiumBy.NAME, "No media playing")
        self.mpris_interface = InvalidMpris2()
        self.assertTrue(self.mpris_interface.registered_event.wait(10))
        self.assertTrue(placeholder_element.is_displayed())

    def test_bug477335_decode_xesam_url(self) -> None:
        """
        Make sure "xesam_url" is decoded before using it in other places like album name
        """
        if self.mpris_interface is not None:
            self.mpris_interface.quit()

        player_with_encoded_url_json_path: str = os.path.join(os.path.dirname(os.path.abspath(__file__)), "mediacontrollertest", "player_with_encoded_url.json")
        with subprocess.Popen(("python3", os.path.join(os.path.dirname(os.path.abspath(__file__)), "mediacontrollertest", "mediaplayer.py"), player_with_encoded_url_json_path)) as player_with_encoded_url:
            wait = WebDriverWait(self.driver, 3)
            wait.until(EC.presence_of_element_located((AppiumBy.NAME, "Flash Funk")))
            wait.until(EC.presence_of_element_located((AppiumBy.NAME, "League of Legends")))  # Album name deduced from folder name
            # Overflow check, 2160000000 (microsecond) > INT_MAX (2147483647)
            wait.until(EC.presence_of_element_located((AppiumBy.NAME, "-36:00")))
            player_with_encoded_url.terminate()
            player_with_encoded_url.wait(10)


if __name__ == '__main__':
    unittest.main()
