from jellyfin_api_client.models import * from jellyfin_api_client.api.items import get_items, get_item_user_data from jellyfin_api_client.api.user import get_users from jellyfin_api_client.api.playlists import create_playlist, update_playlist from jellyfin_api_client.types import Response from jellyfin_api_client import AuthenticatedClient import os import random from dotenv import load_dotenv from pprint import pp load_dotenv() def log_response(response): request = response.request if response.status_code >= 400: print(f"{request.method} {request.url} - Status {response.status_code}") raise Exception(response.read()) server_url = os.getenv('SERVER_URL') api_token = os.getenv('API_TOKEN') device = os.getenv('HOSTNAME') user_names = os.getenv('USER_NAMES').split() playlist_name = os.getenv('PLAYLIST') item_count = int(os.getenv('ITEM_COUNT')) client = AuthenticatedClient( base_url=server_url, prefix="", token=f"MediaBrowser Client=\"jellyfin-client\", Device=\"{device}\", DeviceId=\"1234\", Version=\"1.0.0\", Token=\"{api_token}\"", httpx_args={"event_hooks": {"response": [log_response]}}) with client as client: users = get_users.sync(client=client) users = [user for user in users if user.name in user_names] playlists = get_items.sync( client=client, recursive=True, include_item_types=[BaseItemKind.PLAYLIST], search_term=playlist_name ) songs: list[BaseItemDto] = [] for user in users: result = get_items.sync( client=client, user_id=user.id, recursive=True, enable_images=False, include_item_types=[BaseItemKind.AUDIO], limit=item_count, sort_by=[ItemSortBy.PLAYCOUNT, ItemSortBy.RANDOM]) play_count_min = min(s.user_data.play_count for s in result.items) play_count_max = max(s.user_data.play_count for s in result.items) print(f"Got {len(result.items)} songs for user {user.name} (played {play_count_min} to {play_count_max} times)") for item in result.items: if any(item.id == s.id for s in songs): continue play_count = item.user_data.play_count for u in users: if u.id == user.id: continue user_data = get_item_user_data.sync(item.id, client=client, user_id=u.id) play_count += user_data.play_count item.user_data.play_count = play_count songs.append(item) print(f"Selecting from {len(songs)} unique songs...") random.shuffle(songs) songs.sort(key=lambda s: s.user_data.play_count) songs = songs[:item_count] play_count_min = min(s.user_data.play_count for s in songs) play_count_max = max(s.user_data.play_count for s in songs) print(f"Using the {len(songs)} least played songs (played {play_count_min} to {play_count_max} times)") try: playlist_id = next(p.id for p in playlists.items if p.name == playlist_name) body = UpdatePlaylistDto( name=playlist_name, users=[PlaylistUserPermissions(user_id=u.id, can_edit=True) for u in users], ids=[item.id for item in songs], is_public=True ) update_playlist.sync( playlist_id=playlist_id, client=client, body=body ) except StopIteration: create_playlist.sync( client=client, body=CreatePlaylistDto( name=playlist_name, user_id=users[0].id, users=[PlaylistUserPermissions(user_id=u.id, can_edit=True) for u in users], ids=[item.id for item in songs], is_public=True, media_type=CreatePlaylistDtoMediaType.AUDIO ) ) print(f"Successfully updated playlist '{playlist_name}' with {len(songs)} songs")