123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- from jellyfin_api_client.models import *
- from jellyfin_api_client.api.items import get_items, get_item_user_data
- from jellyfin_api_client.api.user import get_users
- from jellyfin_api_client.api.playlists import create_playlist, update_playlist
- from jellyfin_api_client.types import Response
- from jellyfin_api_client import AuthenticatedClient
- import os
- import random
- from dotenv import load_dotenv
- from pprint import pp
- load_dotenv()
def log_response(response):
    """Response event hook that turns HTTP error statuses into exceptions.

    Responses with a status code below 400 pass through untouched. For
    anything else the request method, URL and status are printed and an
    ``Exception`` carrying the response body (as returned by
    ``response.read()``) is raised.

    NOTE(review): the pasted source lost its indentation; this assumes the
    ``print`` belongs inside the error branch together with the ``raise``.
    """
    req = response.request
    status = response.status_code
    if status < 400:
        return
    print(f"{req.method} {req.url} - Status {status}")
    raise Exception(response.read())
def _require_env(name):
    """Return the value of environment variable *name*.

    Exits with a message naming the variable when it is unset or empty,
    instead of failing later with an unrelated AttributeError/TypeError.
    """
    value = os.getenv(name)
    if not value:
        raise SystemExit(f"Missing required environment variable: {name}")
    return value


# --- Configuration, read from the environment -------------------------------
server_url = _require_env('SERVER_URL')
api_token = _require_env('API_TOKEN')
# HOSTNAME is not required; when it is unset the literal text "None" ends up
# in the Device field of the authorization string below.
device = os.getenv('HOSTNAME')
# Whitespace-separated list of user names whose listening history is used.
user_names = _require_env('USER_NAMES').split()
playlist_name = _require_env('PLAYLIST')
# Number of songs requested per user and kept in the final playlist.
item_count = int(_require_env('ITEM_COUNT'))

# The full "MediaBrowser ..." authorization string is passed as the token with
# an empty prefix -- presumably so the client sends it verbatim as the header
# value (confirm against jellyfin_api_client). Every response goes through
# log_response via the httpx "response" event hook.
client = AuthenticatedClient(
    base_url=server_url,
    prefix="",
    token=f"MediaBrowser Client=\"jellyfin-client\", Device=\"{device}\", DeviceId=\"1234\", Version=\"1.0.0\", Token=\"{api_token}\"",
    httpx_args={"event_hooks": {"response": [log_response]}})
# Build (or refresh) a shared playlist made of the least-played songs across
# the configured users.
# NOTE(review): the pasted source lost its indentation; the nesting below is
# reconstructed from the statement order -- confirm against the real file.
with client as client:
    # Keep only the server users named in USER_NAMES.
    users = get_users.sync(client=client)
    users = [user for user in users if user.name in user_names]
    if not users:
        raise SystemExit(f"None of the configured users exist on the server: {user_names}")

    # Look for an existing playlist; search_term is not relied on to be an
    # exact match, so the name is compared again further down.
    playlists = get_items.sync(
        client=client,
        recursive=True,
        include_item_types=[BaseItemKind.PLAYLIST],
        search_term=playlist_name,
    )

    songs: list[BaseItemDto] = []
    seen_ids = set()  # ids already collected; O(1) duplicate check
    for user in users:
        # Ask for up to item_count audio items, ordered by play count with a
        # random tie-break (the direction is whatever the server defaults to).
        result = get_items.sync(
            client=client,
            user_id=user.id,
            recursive=True,
            enable_images=False,
            include_item_types=[BaseItemKind.AUDIO],
            limit=item_count,
            sort_by=[ItemSortBy.PLAYCOUNT, ItemSortBy.RANDOM],
        )
        if not result.items:
            # min()/max() would raise on an empty sequence; nothing to add.
            print(f"Got 0 songs for user {user.name}")
            continue

        play_counts = [s.user_data.play_count for s in result.items]
        print(f"Got {len(result.items)} songs for user {user.name} (played {min(play_counts)} to {max(play_counts)} times)")

        other_users = [u for u in users if u.id != user.id]
        for item in result.items:
            if item.id in seen_ids:
                continue
            seen_ids.add(item.id)
            # Store the play count summed over all selected users on the item
            # so the final ranking reflects everyone's listening.
            play_count = item.user_data.play_count
            for u in other_users:
                user_data = get_item_user_data.sync(item.id, client=client, user_id=u.id)
                play_count += user_data.play_count
            item.user_data.play_count = play_count
            songs.append(item)

    if not songs:
        # Stop here rather than crash on min()/max() or push an empty playlist.
        raise SystemExit("No songs found for the selected users; playlist left unchanged")

    print(f"Selecting from {len(songs)} unique songs...")
    # list.sort() is stable, so shuffling first randomises the order among
    # songs that share the same combined play count.
    random.shuffle(songs)
    songs.sort(key=lambda s: s.user_data.play_count)
    songs = songs[:item_count]

    play_count_min = min(s.user_data.play_count for s in songs)
    play_count_max = max(s.user_data.play_count for s in songs)
    print(f"Using the {len(songs)} least played songs (played {play_count_min} to {play_count_max} times)")

    song_ids = [item.id for item in songs]
    permissions = [PlaylistUserPermissions(user_id=u.id, can_edit=True) for u in users]
    existing = next((p for p in playlists.items or [] if p.name == playlist_name), None)
    if existing is not None:
        update_playlist.sync(
            playlist_id=existing.id,
            client=client,
            body=UpdatePlaylistDto(
                name=playlist_name,
                users=permissions,
                ids=song_ids,
                is_public=True,
            ),
        )
        action = "updated"
    else:
        # No playlist with that exact name yet: create it, owned by the first
        # selected user.
        create_playlist.sync(
            client=client,
            body=CreatePlaylistDto(
                name=playlist_name,
                user_id=users[0].id,
                users=permissions,
                ids=song_ids,
                is_public=True,
                media_type=CreatePlaylistDtoMediaType.AUDIO,
            ),
        )
        action = "created"
    print(f"Successfully {action} playlist '{playlist_name}' with {len(songs)} songs")
|