Extract separate module for searching on spotify

This commit is contained in:
SemvdHoeven
2026-04-13 17:24:42 +02:00
parent e8552e6429
commit 97b3ebb41f
2 changed files with 525 additions and 303 deletions

View File

@@ -15,13 +15,10 @@ import datetime as dt
from icrawler.builtin import GoogleImageCrawler
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
from datetime import datetime
import spotify_search
import requests
import time
import logging
logging.basicConfig(
@@ -370,7 +367,8 @@ def check_artist(audio, filename: str) -> bool:
return res
def check_spotify_genre(genres,audio):
def set_genre_tag(genres, audio):
"""Apply genre tags to audio file from Spotify genres list."""
genre = ""
if (len(genres) > 0):
if (len(genres) == 1):
@@ -403,271 +401,247 @@ def embed_music_file(audiostr: str, coverfile: str):
logging.info("could not embed music file")
def check_spotify_album_and_save(spotify, audio,x: str) -> bool:
found = False
logging.info("Searching on spotify for album...")
querystring = ""
if x.endswith(".flac"):
querystring = "artist:{0} album:{1}".format(str(audio["artist"]),str(audio["album"]))
else:
querystring = "artist:{0} album:{1}".format(str(audio["TPE2"]),str(audio["TALB"]))
def save_album_from_spotify(spotify, audio, x: str, spotify_data: dict) -> bool:
"""
Save audio file with metadata and cover art from Spotify album data.
logging.info("query string: " + querystring)
tries = 0
found = False
while (tries < 5 and found == False):
try:
results = spotify.search(q=querystring,type='album')
found = True
except Exception as err:
logging.error("could not search on spotify")
logging.error(err)
logging.info("waiting 30 seconds before trying again")
time.sleep(30)
tries += 1
Args:
spotify: Spotify client instance
audio: Audio file object
x: Filename
spotify_data: Dict with album data from spotify_search.search_album()
if (found == False):
logging.error("could not search on spotify after 5 tries, aborting")
Returns:
True if successful, False otherwise
"""
if not spotify_data or not spotify_data.get('found'):
logging.info("No Spotify album data provided")
return False
if (len(results["albums"]["items"]) > 0):
logging.info("album found on spotify!")
found = True
album = results["albums"]["items"][0]
album_artist = album["artists"][0]["name"]
logging.info("Applying Spotify album data to file...")
if (x.endswith(".flac")):
# Set artist
album_artist = spotify_data['artist']
if x.endswith(".flac"):
try:
if str(audio["album_artist"]) != album_artist:
if str(audio.get("album_artist", "")) != album_artist:
audio["album_artist"] = album_artist
except:
audio["album_artist"] = album_artist
else:
if (str(audio["TPE2"]) != album_artist):
audio["TPE2"] = TPE2(encoding=3,text=album_artist)
if str(audio.get("TPE2", "")) != album_artist:
audio["TPE2"] = TPE2(encoding=3, text=album_artist)
album_image_url = album["images"][0]["url"]
album_name = album["name"]
# Set album
album_name = spotify_data['album']
if x.endswith(".flac"):
if str(audio.get("album", "")) != album_name:
audio["album"] = album_name
elif str(audio.get("TALB", "")) != album_name:
audio["TALB"] = TALB(encoding=3, text=album_name)
# Parse and set release date
release_date = spotify_data['release_date']
try:
year = str(datetime.strptime(release_date, '%Y-%m-%d').year)
except:
try:
year = str(datetime.strptime(release_date, '%Y-%m').year)
except:
try:
year = str(datetime.strptime(release_date, '%Y').year)
except:
year = str(release_date)
if x.endswith(".flac"):
if str(audio["album"] != album_name):
audio["album"] = album_name
elif (str(audio["TALB"]) != album_name):
audio["TALB"] = TALB(encoding=3,text=album_name)
# Parse release date to extract year
try:
year = str(datetime.strptime(album["release_date"], '%Y-%m-%d').year)
except:
try:
year = str(datetime.strptime(album["release_date"], '%Y-%m').year)
except:
try:
year = str(datetime.strptime(album["release_date"], '%Y').year)
except:
year = str(album["release_date"])
if (x.endswith(".flac")):
audio["year"] = year
audio["date"] = album["release_date"]
audio["date"] = release_date
else:
audio["TDRC"] = TDRC(encoding=3,text=year)
audio["TDRL"] = TDRL(encoding=3,text=album["release_date"])
audio["TDRC"] = TDRC(encoding=3, text=year)
audio["TDRL"] = TDRL(encoding=3, text=release_date)
artist_search = spotify.artist(album['artists'][0]['external_urls']['spotify'])
logging.info("genres: " + str(artist_search['genres']))
check_spotify_genre(artist_search['genres'],audio)
# Set genres
logging.info("genres: " + str(spotify_data['genres']))
set_genre_tag(spotify_data['genres'], audio)
comment ="Spotify ID: {0}. Release date precision: {1}, total tracks in album: {2}. This album has {3} version(s)".format(album["id"],album["release_date_precision"], album["total_tracks"],len(results["albums"]["items"]))
# Set comment
comment = "Spotify ID: {0}. Release date precision: {1}, total tracks in album: {2}. This album has {3} version(s)".format(
spotify_data['album_id'],
spotify_data['release_date_precision'],
spotify_data['total_tracks'],
spotify_data['versions_count']
)
logging.info("Comment: " + comment)
if x.endswith(".flac"):
audio["comment"] = audio["comment"] + comment
audio["comment"] = audio.get("comment", "") + comment
else:
audio["COMM"] = COMM(encoding=3,text=comment + audio["COMM"])
audio["COMM"] = COMM(encoding=3, text=comment + str(audio.get("COMM", "")))
# Save tags
if x.endswith(".flac"):
remove_flac_ID3_tags(audio,x)
remove_flac_ID3_tags(audio, x)
audio.save(x)
songpath = ""
# Create folder structure
if x.endswith(".flac"):
songpath = join(".",str(audio["artist"][0]),str(audio["album"][0]))
make_folder(join(".",str(audio["artist"][0])))
artist_path = str(audio["artist"][0])
album_path = str(audio["album"][0])
else:
if "/" in audio["TPE2"]:
audio["TPE2"] = audio["TPE2"].replace("/","")
songpath = join(".",str(audio["TPE2"]),str(audio["TALB"]))
make_folder(join(".",str(audio["TPE2"])))
if "/" in str(audio["TPE2"]):
audio["TPE2"] = str(audio["TPE2"]).replace("/", "")
artist_path = str(audio["TPE2"])
album_path = str(audio["TALB"])
songpath = join(".", artist_path, album_path)
make_folder(join(".", artist_path))
if (not x.endswith(".flac")):
if ("/" in str(audio["TALB"])):
# Handle albums with / in the name
if not x.endswith(".flac") and "/" in album_path:
logging.info("album contains /")
folders = str(audio["TALB"]).split('/')
folders = album_path.split('/')
logging.info(folders)
pos = join(".",str(audio["TPE2"]))
pos = join(".", artist_path)
for fold in folders:
make_folder(join(pos,fold))
pos = join(pos,fold)
make_folder(join(pos, fold))
pos = join(pos, fold)
logging.info(pos)
make_folder(songpath)
os.replace(join(".",x),join(songpath,x))
os.replace(join(".", x), join(songpath, x))
logging.info("moved song file, now downloading cover art")
img_data = requests.get(str(album_image_url)).content
with open(join(songpath,"Cover.jpg"),'wb') as handler:
# Download and save cover art
if spotify_data['image_url']:
img_data = requests.get(spotify_data['image_url']).content
with open(join(songpath, "Cover.jpg"), 'wb') as handler:
handler.write(img_data)
logging.info("done getting cover art!")
logging.info("now setting cover art..")
embed_music_file(join(songpath,x),join(songpath,"Cover.jpg"))
else:
logging.info("No album found on spotify")
return found
embed_music_file(join(songpath, x), join(songpath, "Cover.jpg"))
def check_spotify_and_save(spotify, audio,x: str) -> bool:
found = False
logging.info("Searching spotify for file " + x)
artist = ""
track = ""
return True
def save_track_from_spotify(spotify, audio, x: str, spotify_data: dict) -> bool:
"""
Save audio file with metadata and cover art from Spotify track data.
Args:
spotify: Spotify client instance
audio: Audio file object
x: Filename
spotify_data: Dict with track data from spotify_search.search_track()
Returns:
True if successful, False otherwise
"""
if not spotify_data or not spotify_data.get('found'):
logging.info("No Spotify track data provided")
return False
logging.info("Applying Spotify track data to file...")
# Get current artist value for comparison
if x.endswith(".flac"):
if audio["artist"] is not str:
artist = str(audio["artist"][0])
current_artist = str(audio.get("artist", [""])[0]) if not isinstance(audio.get("artist", ""), str) else str(audio.get("artist", ""))
else:
artist = str(audio["artist"])
if audio["title"] is not str:
track = str(audio["title"][0])
else:
track = str(audio["title"])
else:
# Prefer 'artist' and 'title' tags if available, fallback to TPE2/TIT2
if "artist" in audio:
if audio["artist"] is not str:
artist = str(audio["artist"][0])
else:
artist = str(audio["artist"])
current_artist = str(audio["artist"][0]) if not isinstance(audio["artist"], str) else str(audio["artist"])
elif "TPE2" in audio:
if audio["TPE2"] is not str:
artist = str(audio["TPE2"][0])
current_artist = str(audio["TPE2"][0]) if not isinstance(audio["TPE2"], str) else str(audio["TPE2"])
else:
artist = str(audio["TPE2"])
else:
artist = "Unknown Artist"
current_artist = "Unknown Artist"
if "title" in audio:
if audio["title"] is not str:
track = str(audio["title"][0])
else:
track = str(audio["title"])
elif "TIT2" in audio:
if audio["TIT2"] is not str:
track = str(audio["TIT2"][0])
else:
track = str(audio["TIT2"])
else:
track = "Unknown Title"
querystring = "artist:{0} track:{1}".format(artist.split("\00")[0],track)
logging.info("query string: " + querystring)
results = spotify.search(q=querystring,type='track')
if (len(results['tracks']['items']) > 0):
logging.info("track found on spotify!")
found = True
album = results['tracks']['items'][0]["album"]
found_artist = album["artists"][0]["name"]
if (found_artist != artist):
logging.info("Changing album artist from " + artist + " to " + found_artist)
# Set artist if different
found_artist = spotify_data['artist']
if found_artist != current_artist:
logging.info("Changing album artist from " + current_artist + " to " + found_artist)
if x.endswith(".flac"):
audio["album_artist"] = found_artist
else:
audio["TPE2"] = TPE2(encoding=3,text=found_artist)
found_album = album["name"]
audio["TPE2"] = TPE2(encoding=3, text=found_artist)
# Set album
found_album = spotify_data['album']
logging.info("found album name: " + found_album)
if (len(found_album) > 0):
if len(found_album) > 0:
if x.endswith(".flac"):
audio["album"] = found_album
else:
audio["TALB"] = TALB(encoding=3,text=found_album)
audio["TALB"] = TALB(encoding=3, text=found_album)
else:
# set album to title if no album found
if (x.endswith(".flac")):
if x.endswith(".flac"):
audio["album"] = audio["title"][0]
else:
audio["TALB"] = TALB(encoding=3,text=str(audio["TIT2"]))
audio["TALB"] = TALB(encoding=3, text=str(audio["TIT2"]))
# Add current date/time and CPU/RAM usage
# Add system info to comment
now = dt.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cpu_percent = psutil.cpu_percent(interval=1)
ram_percent = psutil.virtual_memory().percent
sysinfo = f"This album was downloaded on {now}. The server was using {cpu_percent}% CPU and {ram_percent}% RAM."
# Try to get album description (Spotify API does not provide a direct description, but label is available)
album_label = album.get("label", "")
album_desc = ""
if album_label:
album_desc = f"Label: {album_label}. "
# Some albums may have a 'description' field, but it's rare. If present, add it.
if "description" in album:
album_desc += f"Description: {album['description']} "
comment = "Spotify ID: {0}. This album was released on: {1}, total tracks in album: {2}. This album has {3} version(s). {4} {5}".format(album["id"],album["release_date"], album["total_tracks"],len(results["tracks"]["items"]), album_desc, sysinfo)
# Build comment from album metadata
album_label = spotify_data.get('label', '')
album_desc = f"Label: {album_label}. " if album_label else ""
comment = "Spotify ID: {0}. This album was released on: {1}, total tracks in album: {2}. This album has {3} version(s). {4} {5}".format(
spotify_data['album_id'],
spotify_data['release_date'],
spotify_data['total_tracks'],
spotify_data['versions_count'],
album_desc,
sysinfo
)
logging.info("Comment: " + comment)
if x.endswith(".flac"):
audio["comment"] = comment
else:
audio["COMM"] = COMM(encoding=3,text=comment)
audio["COMM"] = COMM(encoding=3, text=comment)
# Parse release date to extract year
# Parse and set release date
release_date = spotify_data['release_date']
try:
year = str(datetime.strptime(album["release_date"], '%Y-%m-%d').year)
year = str(datetime.strptime(release_date, '%Y-%m-%d').year)
except:
try:
year = str(datetime.strptime(album["release_date"], '%Y-%m').year)
year = str(datetime.strptime(release_date, '%Y-%m').year)
except:
try:
year = str(datetime.strptime(album["release_date"], '%Y').year)
year = str(datetime.strptime(release_date, '%Y').year)
except Exception as err:
logging.info(err)
year = str(album["release_date"])
year = str(release_date)
if x.endswith(".flac"):
audio["year"] = year
audio["date"] = album["release_date"]
audio["date"] = release_date
else:
audio["TDRC"] = TDRC(encoding=3,text=year)
audio["TDRL"] = TDRL(encoding=3,text=album["release_date"])
audio["TDRC"] = TDRC(encoding=3, text=year)
audio["TDRL"] = TDRL(encoding=3, text=release_date)
# Set track number
if x.endswith(".flac"):
audio["TRACKNUMBER"] = str(results['tracks']['items'][0]["track_number"]) +"/" + str(album["total_tracks"])
audio["TRACKNUMBER"] = str(spotify_data['track_number']) + "/" + str(spotify_data['total_tracks'])
else:
audio["TRCK"] = TRCK(encoding=3,text=str(results['tracks']['items'][0]["track_number"]) +"/" + str(album["total_tracks"]))
audio["TRCK"] = TRCK(encoding=3, text=str(spotify_data['track_number']) + "/" + str(spotify_data['total_tracks']))
# Set popularity
if x.endswith(".flac"):
audio["popularity"] = str(results['tracks']['items'][0]["popularity"])
audio["popularity"] = str(spotify_data['popularity'])
else:
audio["POPM"] = POPM(encoding=3,text=str(results['tracks']['items'][0]["popularity"]))
audio["POPM"] = POPM(encoding=3, text=str(spotify_data['popularity']))
# Set genres
logging.info("genres: " + str(spotify_data['genres']))
set_genre_tag(spotify_data['genres'], audio)
found_image_url = album["images"][0]["url"]
logging.info("found cover art image at " + str(found_image_url))
artist_search = spotify.artist(results['tracks']['items'][0]['artists'][0]['external_urls']['spotify'])
logging.info("genres: " + str(artist_search['genres']))
check_spotify_genre(artist_search['genres'],audio)
# remove ID3 tags if it's a flac file, otherwise it will throw an error
remove_flac_ID3_tags(audio,x)
# Save tags
remove_flac_ID3_tags(audio, x)
audio.save(x)
artist_path = ""
songpath = ""
# Create folder structure
if x.endswith(".flac"):
artist_path = str(audio["artist"][0])
else:
@@ -678,36 +652,39 @@ def check_spotify_and_save(spotify, audio,x: str) -> bool:
logging.info("artist path: " + artist_path)
if x.endswith(".flac"):
songpath = join(".",artist_path,str(audio["ALBUM"][0]))
songpath = join(".", artist_path, str(audio["ALBUM"][0]))
else:
songpath = join(".",artist_path,str(audio["TALB"]))
songpath = join(".", artist_path, str(audio["TALB"]))
logging.info("song path: " + songpath)
make_folder(join(".",artist_path))
make_folder(join(".", artist_path))
if (not x.endswith(".flac") and "/" in str(audio["TALB"])):
# Handle albums with / in the name
if not x.endswith(".flac") and "/" in str(audio["TALB"]):
logging.info("album contains /")
folders = str(audio["TALB"]).split('/')
logging.info(folders)
pos = join(".",str(audio["TPE2"]))
pos = join(".", str(audio["TPE2"]))
for fold in folders:
make_folder(join(pos,fold))
pos = join(pos,fold)
make_folder(join(pos, fold))
pos = join(pos, fold)
logging.info(pos)
make_folder(songpath)
os.replace(join(".",x),join(songpath,x))
os.replace(join(".", x), join(songpath, x))
logging.info("moved song file, now downloading cover art")
img_data = requests.get(str(found_image_url)).content
with open(join(songpath,"Cover.jpg"),'wb') as handler:
# Download and save cover art
if spotify_data['image_url']:
img_data = requests.get(spotify_data['image_url']).content
with open(join(songpath, "Cover.jpg"), 'wb') as handler:
handler.write(img_data)
logging.info("done getting cover art!")
logging.info("now setting cover art..")
embed_music_file(join(songpath,x),join(songpath,"Cover.jpg"))
embed_music_file(join(songpath, x), join(songpath, "Cover.jpg"))
return found
return True
def main():
# Preprocess: rename files with '- Topic -' in the name to 'artist - title'
@@ -727,7 +704,7 @@ def main():
# for spotipy to be able to log in, the environment variables SPOTIPY_CLIENT_ID and SPOTIPY_CLIENT_SECRET have to be set
# these can be obtained from the spotify developer dashboard
# they are defined in /etc/profile.d/spotipy.sh
spotify = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials())
spotify = spotify_search.init_spotify_client()
onlyfiles = [f for f in listdir(".") if (isfile(join(".",f)) and f.split(".")[-1] in ['mp3','mp4','ogg','wav','flac','m4a','MP3','FLAC','OGG','MP4','WAV','M4A'])]
# TIT2 = title,
@@ -823,8 +800,52 @@ def main():
if (has_valid_artist and has_valid_title):
found = False
# Extract artist and title for search
artist = ""
track = ""
if x.endswith(".flac"):
if audio["artist"] is not str:
artist = str(audio["artist"][0])
else:
artist = str(audio["artist"])
if audio["title"] is not str:
track = str(audio["title"][0])
else:
track = str(audio["title"])
else:
# Prefer 'artist' and 'title' tags if available, fallback to TPE2/TIT2
if "artist" in audio:
if audio["artist"] is not str:
artist = str(audio["artist"][0])
else:
artist = str(audio["artist"])
elif "TPE2" in audio:
if audio["TPE2"] is not str:
artist = str(audio["TPE2"][0])
else:
artist = str(audio["TPE2"])
else:
artist = "Unknown Artist"
if "title" in audio:
if audio["title"] is not str:
track = str(audio["title"][0])
else:
track = str(audio["title"])
elif "TIT2" in audio:
if audio["TIT2"] is not str:
track = str(audio["TIT2"][0])
else:
track = str(audio["TIT2"])
else:
track = "Unknown Title"
# Search Spotify for the track
try:
found = check_spotify_and_save(spotify, audio,x)
spotify_data = spotify_search.search_track(spotify, artist, track)
if spotify_data:
found = save_track_from_spotify(spotify, audio, x, spotify_data)
except Exception as err:
logging.error("could not find track on spotify: " + str(err))
logging.error(err.with_traceback)
@@ -850,7 +871,23 @@ def main():
logging.info("spotify did not find artist and track, searching for album...")
if (has_valid_album):
album_found = check_spotify_album_and_save(spotify,audio,x)
# Extract artist and album for search
search_artist = ""
search_album = ""
if x.endswith(".flac"):
search_artist = str(audio["artist"])
search_album = str(audio["album"])
else:
search_artist = str(audio["TPE2"])
search_album = str(audio["TALB"])
# Search Spotify for the album
album_data = spotify_search.search_album(spotify, search_artist, search_album)
if album_data:
album_found = save_album_from_spotify(spotify, audio, x, album_data)
else:
album_found = False
if (album_found == False):
logging.info("Nothing found on spotify, searching Google Images...")
search_google_images_and_save(x, audio)

185
spotify_search.py Normal file
View File

@@ -0,0 +1,185 @@
import logging
import time
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
# Configure the root logger as a side effect of importing this module:
# INFO level, brace-style ("{") format string showing timestamp, level,
# calling function name and line number, then the message.
# NOTE(review): logging.basicConfig() does nothing when the root logger
# already has handlers, so whichever module calls it first decides the
# format — confirm this is the intended place to configure logging.
logging.basicConfig(
level=logging.INFO,
format="{asctime} - {levelname} - [{funcName}:{lineno}] - {message}",
style="{",
datefmt="%Y-%m-%d %H:%M",
)
def init_spotify_client():
    """
    Build and return a Spotify API client using client-credentials auth.

    The credentials manager is created without arguments, so it relies on
    the SPOTIPY_CLIENT_ID and SPOTIPY_CLIENT_SECRET environment variables
    being set.

    Returns:
        A spotipy.Spotify client instance.
    """
    credentials = SpotifyClientCredentials()
    return spotipy.Spotify(client_credentials_manager=credentials)
def search_track(spotify, artist, title):
    """
    Search Spotify for a track and summarise the first hit.

    Args:
        spotify: Spotify client instance; must provide
            ``search(q=..., type=...)`` and ``artist(...)``.
        artist: Artist name (string). Only the text before the first NUL
            character is used in the query.
        title: Track title (string).

    Returns:
        None when no track is found or the search itself fails; otherwise
        a dict describing the first search hit:
        {
            'found': True,
            'artist': str,                  # first artist of the track's album
            'album': str,
            'album_id': str,
            'release_date': str,
            'release_date_precision': str,  # "day" when Spotify omits it
            'total_tracks': int,
            'track_number': int,
            'popularity': int,
            'image_url': str or None,       # None when the album has no images
            'genres': list,                 # [] when the genre lookup fails
            'label': str,                   # "" when the album has no label
            'versions_count': int           # number of hits in the result page
        }

    This function never raises: every error is logged and reported as None.
    """
    try:
        # Keep only the text before the first NUL character of the artist.
        querystring = "artist:{0} track:{1}".format(artist.split("\00")[0], title)
        logging.info("Searching Spotify for track with query: " + querystring)

        results = spotify.search(q=querystring, type='track')
        items = results['tracks']['items']
        if not items:
            logging.info("No track found on Spotify")
            return None

        logging.info("Track found on Spotify!")
        track = items[0]
        album = track["album"]

        # Genres come from a second request for the track's first artist.
        # They are auxiliary metadata, so a failure here must not throw away
        # the track match that was already found.
        try:
            artist_info = spotify.artist(track['artists'][0]['external_urls']['spotify'])
            genres = artist_info.get('genres', [])
        except Exception as err:
            logging.warning(f"Could not fetch artist genres from Spotify: {err}")
            genres = []

        data = {
            'found': True,
            'artist': album["artists"][0]["name"],
            'album': album["name"],
            'album_id': album["id"],
            'release_date': album["release_date"],
            'release_date_precision': album.get("release_date_precision", "day"),
            'total_tracks': album["total_tracks"],
            'track_number': track["track_number"],
            'popularity': track["popularity"],
            'image_url': album["images"][0]["url"] if album["images"] else None,
            'genres': genres,
            'label': album.get("label", ""),
            'versions_count': len(items),
        }

        logging.info(f"Found: {data['artist']} - {data['album']}")
        return data
    except Exception as err:
        logging.error(f"Error searching for track on Spotify: {err}")
        return None
def search_album(spotify, artist, album_name, max_tries=5, retry_delay=30):
    """
    Search Spotify for an album and summarise the first hit.

    The search request is retried when it raises, waiting ``retry_delay``
    seconds between attempts. No wait happens after the final attempt.

    Args:
        spotify: Spotify client instance; must provide
            ``search(q=..., type=...)`` and ``artist(...)``.
        artist: Artist name (string)
        album_name: Album name (string)
        max_tries: Maximum number of search attempts (default 5).
        retry_delay: Seconds to wait between failed attempts (default 30).

    Returns:
        None when no album is found or every search attempt fails;
        otherwise a dict describing the first search hit:
        {
            'found': True,
            'artist': str,
            'album': str,
            'album_id': str,
            'release_date': str,
            'release_date_precision': str,  # "day" when Spotify omits it
            'total_tracks': int,
            'image_url': str or None,       # None when the album has no images
            'genres': list,                 # [] when the genre lookup fails
            'versions_count': int           # number of hits in the result page
        }

    This function never raises: every error is logged and reported as None.
    """
    try:
        querystring = "artist:{0} album:{1}".format(artist, album_name)
        logging.info("Searching Spotify for album with query: " + querystring)

        results = None
        for attempt in range(1, max_tries + 1):
            try:
                results = spotify.search(q=querystring, type='album')
                break
            except Exception as err:
                logging.error(f"Could not search on Spotify: {err}")
                # Only wait when another attempt will actually follow.
                if attempt < max_tries:
                    logging.info(f"Waiting {retry_delay} seconds before trying again")
                    time.sleep(retry_delay)

        if not results:
            logging.error(f"Could not search on Spotify after {max_tries} tries")
            return None

        items = results["albums"]["items"]
        if not items:
            logging.info("No album found on Spotify")
            return None

        logging.info("Album found on Spotify!")
        album = items[0]

        # Genres come from a second request for the album's first artist.
        # They are auxiliary metadata, so a failure here must not throw away
        # the album match that was already found.
        try:
            artist_info = spotify.artist(album['artists'][0]['external_urls']['spotify'])
            genres = artist_info.get('genres', [])
        except Exception as err:
            logging.warning(f"Could not fetch artist genres from Spotify: {err}")
            genres = []

        data = {
            'found': True,
            'artist': album["artists"][0]["name"],
            'album': album["name"],
            'album_id': album["id"],
            'release_date': album["release_date"],
            'release_date_precision': album.get("release_date_precision", "day"),
            'total_tracks': album["total_tracks"],
            'image_url': album["images"][0]["url"] if album["images"] else None,
            'genres': genres,
            'versions_count': len(items),
        }

        logging.info(f"Found: {data['artist']} - {data['album']}")
        return data
    except Exception as err:
        logging.error(f"Error searching for album on Spotify: {err}")
        return None
def format_genres(genres):
    """
    Format a list of genres into a comma-separated string.

    Every item is converted with ``str()`` before joining, so a single
    genre and several genres are treated the same way.

    Args:
        genres: list of genres (may be empty or None)

    Returns:
        Comma-separated string of genres (no space after the comma), or an
        empty string if there are no genres.
    """
    if not genres:
        return ""
    return ",".join(str(genre) for genre in genres)