#!/usr/bin/env python3 # SPDX-FileCopyrightText: 2019 The GNOME Music developers # SPDX-FileCopyrightText: 2023 Fushan Wen # SPDX-License-Identifier: GPL-2.0-or-later # pylint: disable=too-many-arguments # For FreeBSD CI which only has python 3.9 from __future__ import annotations import logging import json import os import signal import sys import threading from os import getpid from typing import Any, Final current_folder: Final = os.path.dirname(os.path.abspath(__file__)) sys.path.append(os.path.join(current_folder, os.pardir, os.pardir, "utils")) from gi.repository import Gio, GLib from GLibMainLoopThread import GLibMainLoopThread def read_player_metadata(json_dict: dict[str, Any]) -> list[dict[str, GLib.Variant]]: song_list: list[dict[str, int | str | list[str]]] = json_dict["metadata"] assert len(song_list) > 0 _metadata: list[dict[str, GLib.Variant]] = [] for song in song_list: song_dict: dict[str, GLib.Variant] = { "mpris:trackid": GLib.Variant('o', song["mpris:trackid"]), "xesam:url": GLib.Variant('s', "file://" + os.path.join(current_folder, song["xesam:url"])), "mpris:length": GLib.Variant('x', int(song["mpris:length"])), # ms "xesam:title": GLib.Variant('s', song["xesam:title"]), "xesam:artist": GLib.Variant('as', song["xesam:artist"]), "mpris:artUrl": GLib.Variant('s', "") } if "xesam:album" in song.keys(): song_dict["xesam:album"] = GLib.Variant('s', song["xesam:album"]) if "mpris:artUrl" in song.keys(): song_dict["mpris:artUrl"] = GLib.Variant('s', "file://" + os.path.join(current_folder, song["mpris:artUrl"])) if "kde:pid" in song.keys(): song_dict["kde:pid"] = GLib.Variant('u', song["kde:pid"]) _metadata.append(song_dict) return _metadata def read_base_properties(json_dict: dict[str, Any]) -> dict[str, GLib.Variant]: prop_dict: dict[str, bool | str | list[str]] = json_dict["base_properties"] assert len(prop_dict) > 0 _base_properties: dict[str, GLib.Variant] = { "CanQuit": GLib.Variant('b', prop_dict["CanQuit"]), "Fullscreen": GLib.Variant('b', prop_dict["Fullscreen"]), "CanSetFullscreen": GLib.Variant('b', prop_dict["CanSetFullscreen"]), "CanRaise": GLib.Variant('b', prop_dict["CanRaise"]), "HasTrackList": GLib.Variant('b', prop_dict["HasTrackList"]), "Identity": GLib.Variant('s', prop_dict["Identity"]), "DesktopEntry": GLib.Variant('s', prop_dict["DesktopEntry"]), "SupportedUriSchemes": GLib.Variant('as', prop_dict["SupportedUriSchemes"]), "SupportedMimeTypes": GLib.Variant('as', prop_dict["SupportedMimeTypes"]), } return _base_properties def read_player_properties(json_dict: dict[str, Any], _metadata: dict[str, GLib.Variant]) -> dict[str, GLib.Variant]: prop_dict: dict[str, int | float | bool | str | dict] = json_dict["player_properties"] assert len(prop_dict) > 0 _player_properties: dict[str, GLib.Variant] = { "PlaybackStatus": GLib.Variant("s", prop_dict["PlaybackStatus"]), "LoopStatus": GLib.Variant("s", prop_dict["LoopStatus"]), "Rate": GLib.Variant("d", prop_dict["Rate"]), "Shuffle": GLib.Variant("b", prop_dict["Shuffle"]), "Metadata": GLib.Variant("a{sv}", _metadata), "Position": GLib.Variant("x", prop_dict["Position"]), "MinimumRate": GLib.Variant("d", prop_dict["MinimumRate"]), "MaximumRate": GLib.Variant("d", prop_dict["MaximumRate"]), "Volume": GLib.Variant("d", prop_dict["Volume"]), "CanGoNext": GLib.Variant("b", prop_dict["CanGoNext"]), "CanGoPrevious": GLib.Variant("b", prop_dict["CanGoPrevious"]), "CanPlay": GLib.Variant("b", prop_dict["CanPlay"]), "CanPause": GLib.Variant("b", prop_dict["CanPause"]), "CanSeek": GLib.Variant("b", prop_dict["CanSeek"]), "CanControl": GLib.Variant("b", prop_dict["CanControl"]), } return _player_properties class Mpris2: """ MPRIS2 interface implemented in GDBus, since dbus-python does not support property """ BASE_IFACE: Final = "org.mpris.MediaPlayer2" OBJECT_PATH: Final = "/org/mpris/MediaPlayer2" PLAYER_IFACE: Final = GLib.Variant('s', "org.mpris.MediaPlayer2.Player") APP_INTERFACE: Final = f"org.mpris.MediaPlayer2.appiumtest.instance{str(getpid())}" connection: Gio.DBusConnection | None = None def __init__(self, metadata: list[dict[str, GLib.Variant]], base_properties: dict[str, GLib.Variant], player_properties: dict[str, GLib.Variant], current_index: int) -> None: self.metadata: list[dict[str, GLib.Variant]] = metadata self.base_properties: dict[str, GLib.Variant] = base_properties self.player_properties: dict[str, GLib.Variant] = player_properties self.current_index: int = current_index self.registered_event = threading.Event() self.playback_status_set_event = threading.Event() self.metadata_updated_event = threading.Event() self.__owner_id: int = Gio.bus_own_name(Gio.BusType.SESSION, self.APP_INTERFACE, Gio.BusNameOwnerFlags.NONE, self.on_bus_acquired, None, None) assert self.__owner_id > 0 self.__prop_reg_id: int = 0 self.__player_reg_id: int = 0 self.__base_reg_id: int = 0 def quit(self) -> None: if self.connection is None: return self.connection.unregister_object(self.__prop_reg_id) self.__prop_reg_id = 0 self.connection.unregister_object(self.__player_reg_id) self.__player_reg_id = 0 self.connection.unregister_object(self.__base_reg_id) self.__base_reg_id = 0 Gio.bus_unown_name(self.__owner_id) self.connection.flush_sync(None) # Otherwise flaky logging.info("Player exit") def on_bus_acquired(self, connection: Gio.DBusConnection, name: str, *args) -> None: """ Interface is ready, now register objects. """ self.connection = connection dbus_folder: str = os.path.join(current_folder, os.pardir, os.pardir, os.pardir, "libkmpris", "dbus") with open(f"{dbus_folder}/org.freedesktop.DBus.Properties.xml", encoding="utf-8") as handler: properties_introspection_xml: str = '\n'.join(handler.readlines()) introspection_data = Gio.DBusNodeInfo.new_for_xml(properties_introspection_xml) self.__prop_reg_id = connection.register_object(self.OBJECT_PATH, introspection_data.interfaces[0], self.properties_handle_method_call, None, None) assert self.__prop_reg_id > 0 with open(f"{dbus_folder}/org.mpris.MediaPlayer2.Player.xml", encoding="utf-8") as handler: player_introspection_xml: str = '\n'.join(handler.readlines()) introspection_data = Gio.DBusNodeInfo.new_for_xml(player_introspection_xml) self.__player_reg_id = connection.register_object(self.OBJECT_PATH, introspection_data.interfaces[0], self.player_handle_method_call, self.player_handle_get_property, self.player_handle_set_property) assert self.__player_reg_id != 0 with open(f"{dbus_folder}/org.mpris.MediaPlayer2.xml", encoding="utf-8") as handler: interface_introspection_xml: str = '\n'.join(handler.readlines()) introspection_data = Gio.DBusNodeInfo.new_for_xml(interface_introspection_xml) self.__base_reg_id = connection.register_object(self.OBJECT_PATH, introspection_data.interfaces[0], self.interface_handle_method_call, self.interface_handle_get_property, self.interface_handle_set_property) assert self.__base_reg_id != 0 print("MPRIS registered", file=sys.stderr, flush=True) self.registered_event.set() def properties_handle_method_call(self, connection: Gio.DBusConnection, sender: str, object_path: str, interface_name: str, method_name: str, parameters: GLib.Variant, invocation: Gio.DBusMethodInvocation) -> None: """ Handles method calls for org.freedesktop.DBus.Properties """ assert interface_name == "org.freedesktop.DBus.Properties", f"Wrong interface name {interface_name} from {sender}" if method_name == "Get": ret: Any = None interface: str = parameters[0] property_name: str = parameters[1] if interface == self.BASE_IFACE: ret = self.base_properties[property_name] elif interface == self.PLAYER_IFACE.get_string(): ret = self.player_properties[property_name] else: assert False, f"Unknown interface {interface}" print(f"Get: {interface} {property_name} {ret}", file=sys.stderr, flush=True) # https://bugzilla.gnome.org/show_bug.cgi?id=765603 invocation.return_value(GLib.Variant.new_tuple(ret)) elif method_name == "GetAll": interface = parameters[0] if interface == self.BASE_IFACE: ret = GLib.Variant('a{sv}', self.base_properties) elif interface == self.PLAYER_IFACE.get_string(): ret = GLib.Variant('a{sv}', self.player_properties) else: assert False, f"Unknown interface {interface}" print(f"GetAll: {interface} {ret}", file=sys.stderr, flush=True) invocation.return_value(GLib.Variant.new_tuple(ret)) elif method_name == "Set": interface = parameters[0] property_name = parameters[1] value: Any = parameters[2] if interface != self.PLAYER_IFACE.get_string(): assert False, f"Wrong interface {interface}" if property_name == "Rate": self.set_rate(value, connection, object_path) elif property_name == "LoopStatus": self.set_repeat(value, connection, object_path) elif property_name == "Shuffle": self.set_shuffle(value, connection, object_path) elif property_name == "Volume": self.set_volume(value, connection, object_path) else: assert False, f"Unknown property {property_name}" print(f"Set: {interface} {property_name} {value}", file=sys.stderr, flush=True) else: logging.error("Unhandled method: %s", method_name) invocation.return_error_literal(Gio.dbus_error_quark(), Gio.DBusError.UNKNOWN_METHOD, f"Unknown method {method_name}") def set_playing(self, playing: bool, connection: Gio.DBusConnection, object_path: str) -> None: """ Changes the playing state """ self.player_properties["PlaybackStatus"] = GLib.Variant('s', "Playing" if playing else "Paused") changed_properties = GLib.Variant('a{sv}', { "PlaybackStatus": self.player_properties["PlaybackStatus"], }) Gio.DBusConnection.emit_signal(connection, None, object_path, "org.freedesktop.DBus.Properties", "PropertiesChanged", GLib.Variant.new_tuple(self.PLAYER_IFACE, changed_properties, GLib.Variant('as', ()))) self.playback_status_set_event.set() def stop(self, connection: Gio.DBusConnection, object_path: str) -> None: """ Changes the playing state """ self.player_properties["PlaybackStatus"] = GLib.Variant('s', "Stopped") self.player_properties["Metadata"] = GLib.Variant('a{sv}', {}) changed_properties = GLib.Variant('a{sv}', { "PlaybackStatus": self.player_properties["PlaybackStatus"], "Metadata": self.player_properties["Metadata"], }) Gio.DBusConnection.emit_signal(connection, None, object_path, "org.freedesktop.DBus.Properties", "PropertiesChanged", GLib.Variant.new_tuple(self.PLAYER_IFACE, changed_properties, GLib.Variant('as', ()))) def set_rate(self, rate: float, connection: Gio.DBusConnection, object_path: str) -> None: """ Changes the shuffle state """ assert isinstance(rate, float), f"argument is not a float but {type(rate)}" assert self.player_properties["MinimumRate"].get_double() <= rate <= self.player_properties["MaximumRate"].get_double(), f"Rate {rate} is out of bounds" self.player_properties["Rate"] = GLib.Variant('d', rate) changed_properties = GLib.Variant('a{sv}', { "Rate": self.player_properties["Rate"], }) Gio.DBusConnection.emit_signal(connection, None, object_path, "org.freedesktop.DBus.Properties", "PropertiesChanged", GLib.Variant.new_tuple(self.PLAYER_IFACE, changed_properties, GLib.Variant('as', ()))) def set_shuffle(self, shuffle: bool, connection: Gio.DBusConnection, object_path: str) -> None: """ Changes the shuffle state """ assert isinstance(shuffle, bool), f"argument is not a boolean but {type(shuffle)}" self.player_properties["Shuffle"] = GLib.Variant('b', shuffle) changed_properties = GLib.Variant('a{sv}', { "Shuffle": self.player_properties["Shuffle"], }) Gio.DBusConnection.emit_signal(connection, None, object_path, "org.freedesktop.DBus.Properties", "PropertiesChanged", GLib.Variant.new_tuple(self.PLAYER_IFACE, changed_properties, GLib.Variant('as', ()))) def set_repeat(self, repeat: str, connection: Gio.DBusConnection, object_path: str) -> None: """ Changes the loop state """ assert isinstance(repeat, str), f"argument is not a string but {type(repeat)}" self.player_properties["LoopStatus"] = GLib.Variant('s', repeat) changed_properties = GLib.Variant('a{sv}', { "LoopStatus": self.player_properties["LoopStatus"], }) Gio.DBusConnection.emit_signal(connection, None, object_path, "org.freedesktop.DBus.Properties", "PropertiesChanged", GLib.Variant.new_tuple(self.PLAYER_IFACE, changed_properties, GLib.Variant('as', ()))) def set_volume(self, volume: float, connection: Gio.DBusConnection, object_path: str) -> None: """ Adjusts the volume """ assert isinstance(volume, float) and 0 <= volume <= 1, f"Invalid volume {volume} of type {type(volume)}" self.player_properties["Volume"] = GLib.Variant('d', volume) changed_properties = GLib.Variant('a{sv}', { "Volume": self.player_properties["Volume"], }) Gio.DBusConnection.emit_signal(connection, None, object_path, "org.freedesktop.DBus.Properties", "PropertiesChanged", GLib.Variant.new_tuple(self.PLAYER_IFACE, changed_properties, GLib.Variant('as', ()))) def player_handle_method_call(self, connection: Gio.DBusConnection, sender: str, object_path: str, interface_name: str, method_name: str, parameters: GLib.Variant, invocation: Gio.DBusMethodInvocation) -> None: """ Handles method calls for org.mpris.MediaPlayer2.Player """ assert interface_name == "org.mpris.MediaPlayer2.Player", f"Wrong interface name {interface_name} from {sender}" print(f"player_handle_method_call method_name: {method_name}", file=sys.stderr, flush=True) if method_name == "Next" or method_name == "Previous": self.current_index += 1 if method_name == "Next" else -1 assert 0 <= self.current_index < len(self.metadata) self.player_properties["Metadata"] = GLib.Variant('a{sv}', self.metadata[self.current_index]) self.player_properties["CanGoPrevious"] = GLib.Variant('b', self.current_index > 0) self.player_properties["CanGoNext"] = GLib.Variant('b', self.current_index < len(self.metadata) - 1) changed_properties = GLib.Variant('a{sv}', { "Metadata": self.player_properties["Metadata"], "CanGoPrevious": self.player_properties["CanGoPrevious"], "CanGoNext": self.player_properties["CanGoNext"], }) Gio.DBusConnection.emit_signal(connection, None, object_path, "org.freedesktop.DBus.Properties", "PropertiesChanged", GLib.Variant.new_tuple(self.PLAYER_IFACE, changed_properties, GLib.Variant('as', ()))) self.metadata_updated_event.set() elif method_name == "Pause": self.set_playing(False, connection, object_path) elif method_name == "PlayPause": self.set_playing(self.player_properties["PlaybackStatus"].get_string() != "Playing", connection, object_path) elif method_name == "Stop": self.stop(connection, object_path) elif method_name == "Play": self.set_playing(True, connection, object_path) elif method_name == "Seek": offset: int = parameters[0] length: int = int(self.metadata[self.current_index]["mpris:length"]) position: int = self.player_properties["Position"].get_int64() assert 0 <= position + offset <= length self.player_properties["Position"] = GLib.Variant('x', position + offset) changed_properties = GLib.Variant('a{sv}', { 'Position': self.player_properties["Position"], }) Gio.DBusConnection.emit_signal(connection, None, object_path, self.PLAYER_IFACE.get_string(), "Seeked", GLib.Variant.new_tuple(self.player_properties["Position"])) Gio.DBusConnection.emit_signal(connection, None, object_path, "org.freedesktop.DBus.Properties", "PropertiesChanged", GLib.Variant.new_tuple(self.PLAYER_IFACE, changed_properties, GLib.Variant('as', ()))) elif method_name == "SetPosition": assert parameters[0] == self.metadata[self.current_index]["mpris:trackid"].get_string(), f"expected trackid: {parameters[0]}, actual trackid: {self.metadata[self.current_index]['mpris:trackid'].get_string()}" self.player_properties["Position"] = GLib.Variant('x', parameters[1]) changed_properties = GLib.Variant('a{sv}', { 'Position': self.player_properties["Position"], }) Gio.DBusConnection.emit_signal(connection, None, object_path, "org.freedesktop.DBus.Properties", "PropertiesChanged", GLib.Variant.new_tuple(self.PLAYER_IFACE, changed_properties, GLib.Variant('as', ()))) elif method_name == "OpenUri": print("OpenUri", file=sys.stderr, flush=True) else: # In case the interface adds new methods, fail here for easier discovery logging.error("%s does not exist", method_name) invocation.return_error_literal(Gio.dbus_error_quark(), Gio.DBusError.NOT_SUPPORTED, f"Invalid method {method_name}") def player_handle_get_property(self, connection: Gio.DBusConnection, sender: str, object_path: str, interface_name: str, value: Any): """ Handles properties for org.mpris.MediaPlayer2.Player """ assert interface_name == "org.mpris.MediaPlayer2.Player", f"Wrong interface name {interface_name} from {sender}" assert value in self.player_properties.keys(), f"{value} does not exist" return self.player_properties[value] def player_handle_set_property(self, connection: Gio.DBusConnection, sender: str, object_path: str, interface_name: str, key: str, value: Any) -> bool: """ Handles properties for org.mpris.MediaPlayer2.Player """ assert interface_name == "org.mpris.MediaPlayer2.Player", f"Wrong interface name {interface_name} from {sender}" assert key in self.player_properties.keys(), f"{key} does not exist" print(f"player_handle_set_property key: {key}, value: {value}", file=sys.stderr, flush=True) if key == "Rate": self.set_rate(value, connection, object_path) elif key == "LoopStatus": self.set_repeat(value, connection, object_path) elif key == "Shuffle": self.set_shuffle(value, connection, object_path) elif key == "Volume": self.set_volume(value, connection, object_path) else: return False # What is the correct thing to return here on success? It appears that # we need to return something other than None or what would be evaluated # to False for this call back to be successful. return True def interface_handle_method_call(self, connection: Gio.DBusConnection, sender: str, object_path: str, interface_name: str, method_name: str, parameters: GLib.Variant, invocation: Gio.DBusMethodInvocation) -> None: """ Handles method calls for org.mpris.MediaPlayer2 """ assert interface_name == "org.mpris.MediaPlayer2", f"Wrong interface name {interface_name} from {sender}" if method_name == "Raise": print("Raise", file=sys.stderr, flush=True) elif method_name == "Quit": print("Quit", file=sys.stderr, flush=True) else: logging.error("%s does not exist", method_name) invocation.return_error_literal(Gio.dbus_error_quark(), Gio.DBusError.NOT_SUPPORTED, f"Invalid method {method_name}") def interface_handle_get_property(self, connection: Gio.DBusConnection, sender: str, object_path: str, interface_name: str, value: Any): """ Handles properties for org.mpris.MediaPlayer2 """ assert interface_name == "org.mpris.MediaPlayer2", f"Wrong interface name {interface_name} from {sender}" assert value in self.base_properties.keys(), f"{value} does not exist" return self.base_properties[value] def interface_handle_set_property(self, connection: Gio.DBusConnection, sender: str, object_path: str, interface_name: str, key: str, value: Any) -> bool: """ Handles properties for org.mpris.MediaPlayer2 """ assert interface_name == "org.mpris.MediaPlayer2", f"Wrong interface name {interface_name} from {sender}" assert key in self.base_properties.keys(), f"{key} does not exist" return True class InvalidMpris2(Mpris2): """ MPRIS2 interfaces with invalid responses """ def __init__(self) -> None: super().__init__([], {}, {}, 0) def properties_handle_method_call(self, connection: Gio.DBusConnection, sender: str, object_path: str, interface_name: str, method_name: str, parameters: GLib.Variant, invocation: Gio.DBusMethodInvocation) -> None: """ @override """ invocation.return_error_literal(Gio.dbus_error_quark(), Gio.DBusError.NOT_SUPPORTED, "") def player_handle_method_call(self, connection: Gio.DBusConnection, sender: str, object_path: str, interface_name: str, method_name: str, parameters: GLib.Variant, invocation: Gio.DBusMethodInvocation) -> None: """ @override """ invocation.return_error_literal(Gio.dbus_error_quark(), Gio.DBusError.NOT_SUPPORTED, "") def player_handle_get_property(self, connection: Gio.DBusConnection, sender: str, object_path: str, interface_name: str, value: Any): """ @override """ return None def player_handle_set_property(self, connection: Gio.DBusConnection, sender: str, object_path: str, interface_name: str, key: str, value: Any) -> bool: """ @override """ return False def interface_handle_method_call(self, connection: Gio.DBusConnection, sender: str, object_path: str, interface_name: str, method_name: str, parameters: GLib.Variant, invocation: Gio.DBusMethodInvocation) -> None: """ @override """ invocation.return_error_literal(Gio.dbus_error_quark(), Gio.DBusError.NOT_SUPPORTED, "") def interface_handle_get_property(self, connection: Gio.DBusConnection, sender: str, object_path: str, interface_name: str, value: Any): """ @override """ return None def interface_handle_set_property(self, connection: Gio.DBusConnection, sender: str, object_path: str, interface_name: str, key: str, value: Any) -> bool: """ @override """ return False player: Mpris2 loopThread: GLibMainLoopThread def __on_terminate(signal, frame) -> None: player.quit() loopThread.quit() if loopThread.is_alive(): loopThread.join(timeout=10) if __name__ == '__main__': assert len(sys.argv) >= 2, "Insufficient arguments" signal.signal(signal.SIGTERM, __on_terminate) json_path: str = sys.argv.pop() with open(json_path, "r", encoding="utf-8") as f: json_dict: dict[str, list | dict] = json.load(f) metadata: list[dict[str, GLib.Variant]] = read_player_metadata(json_dict) base_properties: dict[str, GLib.Variant] = read_base_properties(json_dict) current_index: int = 0 player_properties: dict[str, GLib.Variant] = read_player_properties(json_dict, metadata[current_index]) loopThread = GLibMainLoopThread() loopThread.start() player = Mpris2(metadata, base_properties, player_properties, current_index) logging.info("Player %s started", base_properties['Identity'].get_string())