Compare commits
2 Commits
bugfix/6-f
...
feature/se
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
97b3ebb41f | ||
|
|
e8552e6429 |
397
make_folders.py
397
make_folders.py
@@ -5,7 +5,7 @@ from mutagen.mp3 import MP3
|
|||||||
from mutagen.id3 import ID3
|
from mutagen.id3 import ID3
|
||||||
from mutagen.wave import WAVE
|
from mutagen.wave import WAVE
|
||||||
from mutagen.oggvorbis import OggVorbis
|
from mutagen.oggvorbis import OggVorbis
|
||||||
from mutagen.id3 import ID3, TIT2, TALB, TPE1, TPE2, COMM, TCOM, TCON, TDRC, TRCK, TPUB, POPM, APIC
|
from mutagen.id3 import ID3, TIT2, TALB, TPE1, TPE2, COMM, TCOM, TCON, TDRC, TDRL, TRCK, TPUB, POPM, APIC
|
||||||
from mutagen.easyid3 import EasyID3
|
from mutagen.easyid3 import EasyID3
|
||||||
import os
|
import os
|
||||||
from os import listdir
|
from os import listdir
|
||||||
@@ -15,13 +15,10 @@ import datetime as dt
|
|||||||
|
|
||||||
from icrawler.builtin import GoogleImageCrawler
|
from icrawler.builtin import GoogleImageCrawler
|
||||||
|
|
||||||
import spotipy
|
|
||||||
from spotipy.oauth2 import SpotifyClientCredentials
|
|
||||||
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
import spotify_search
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
import time
|
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
logging.basicConfig(
|
logging.basicConfig(
|
||||||
@@ -102,6 +99,10 @@ def search_google_images_and_save(x: str, audio):
|
|||||||
else:
|
else:
|
||||||
artist_val = str(audio.get("TPE2", audio.get("TPE1", "Unknown Artist")))
|
artist_val = str(audio.get("TPE2", audio.get("TPE1", "Unknown Artist")))
|
||||||
album_val = str(audio.get("TALB", "Unknown Album"))
|
album_val = str(audio.get("TALB", "Unknown Album"))
|
||||||
|
# Skip Google search if artist or album is unknown
|
||||||
|
if "Unknown Artist" in artist_val or "Unknown Album" in album_val:
|
||||||
|
logging.info("Artist or album is unknown, skipping Google image search")
|
||||||
|
else:
|
||||||
google_keyword = artist_val + " " + album_val + " album"
|
google_keyword = artist_val + " " + album_val + " album"
|
||||||
logging.info("Moved file! Now searching for album art... keyword is " + google_keyword)
|
logging.info("Moved file! Now searching for album art... keyword is " + google_keyword)
|
||||||
google_Crawler = GoogleImageCrawler(storage = {'root_dir': songpath})
|
google_Crawler = GoogleImageCrawler(storage = {'root_dir': songpath})
|
||||||
@@ -129,7 +130,13 @@ def search_google_images_and_save(x: str, audio):
|
|||||||
songpath = join(".",str(audio.get("TPE2", "Unknown Artist")),str(audio.get("TIT2", "Unknown Title")))
|
songpath = join(".",str(audio.get("TPE2", "Unknown Artist")),str(audio.get("TIT2", "Unknown Title")))
|
||||||
make_folder(songpath)
|
make_folder(songpath)
|
||||||
os.replace(join(".",x),join(songpath,x))
|
os.replace(join(".",x),join(songpath,x))
|
||||||
song_keyword = str(audio.get("TPE2", "Unknown Artist")) + " " + str(audio.get("TIT2", "Unknown Title"))
|
artist_keyword = str(audio.get("TPE2", "Unknown Artist"))
|
||||||
|
title_keyword = str(audio.get("TIT2", "Unknown Title"))
|
||||||
|
# Skip Google search if artist or title is unknown
|
||||||
|
if "Unknown Artist" in artist_keyword or "Unknown Title" in title_keyword:
|
||||||
|
logging.info("Artist or title is unknown, skipping Google image search")
|
||||||
|
else:
|
||||||
|
song_keyword = artist_keyword + " " + title_keyword
|
||||||
logging.info("Moved file! Now searching for album art... keyword is " + song_keyword)
|
logging.info("Moved file! Now searching for album art... keyword is " + song_keyword)
|
||||||
google_Crawler = GoogleImageCrawler(storage = {'root_dir': songpath})
|
google_Crawler = GoogleImageCrawler(storage = {'root_dir': songpath})
|
||||||
try:
|
try:
|
||||||
@@ -176,6 +183,11 @@ def create_ID3_tag(audio, tagname: str, textvalue: str):
|
|||||||
audio[tagname] = TDRC(encoding=3,text=textvalue)
|
audio[tagname] = TDRC(encoding=3,text=textvalue)
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
|
elif tagname == "TDRL":
|
||||||
|
try:
|
||||||
|
audio[tagname] = TDRL(encoding=3,text=textvalue)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
elif tagname == "TPUB":
|
elif tagname == "TPUB":
|
||||||
audio[tagname] = TPUB(encoding=3,text=textvalue)
|
audio[tagname] = TPUB(encoding=3,text=textvalue)
|
||||||
|
|
||||||
@@ -355,7 +367,8 @@ def check_artist(audio, filename: str) -> bool:
|
|||||||
|
|
||||||
return res
|
return res
|
||||||
|
|
||||||
def check_spotify_genre(genres,audio):
|
def set_genre_tag(genres, audio):
|
||||||
|
"""Apply genre tags to audio file from Spotify genres list."""
|
||||||
genre = ""
|
genre = ""
|
||||||
if (len(genres) > 0):
|
if (len(genres) > 0):
|
||||||
if (len(genres) == 1):
|
if (len(genres) == 1):
|
||||||
@@ -388,98 +401,106 @@ def embed_music_file(audiostr: str, coverfile: str):
|
|||||||
logging.info("could not embed music file")
|
logging.info("could not embed music file")
|
||||||
|
|
||||||
|
|
||||||
def check_spotify_album_and_save(spotify, audio,x: str) -> bool:
|
def save_album_from_spotify(spotify, audio, x: str, spotify_data: dict) -> bool:
|
||||||
found = False
|
"""
|
||||||
logging.info("Searching on spotify for album...")
|
Save audio file with metadata and cover art from Spotify album data.
|
||||||
querystring = ""
|
|
||||||
if x.endswith(".flac"):
|
|
||||||
querystring = "artist:{0} album:{1}".format(str(audio["artist"]),str(audio["album"]))
|
|
||||||
else:
|
|
||||||
querystring = "artist:{0} album:{1}".format(str(audio["TPE2"]),str(audio["TALB"]))
|
|
||||||
|
|
||||||
logging.info("query string: " + querystring)
|
Args:
|
||||||
tries = 0
|
spotify: Spotify client instance
|
||||||
found = False
|
audio: Audio file object
|
||||||
while (tries < 5 and found == False):
|
x: Filename
|
||||||
try:
|
spotify_data: Dict with album data from spotify_search.search_album()
|
||||||
results = spotify.search(q=querystring,type='album')
|
|
||||||
found = True
|
|
||||||
except Exception as err:
|
|
||||||
logging.error("could not search on spotify")
|
|
||||||
logging.error(err)
|
|
||||||
logging.info("waiting 30 seconds before trying again")
|
|
||||||
time.sleep(30)
|
|
||||||
tries += 1
|
|
||||||
|
|
||||||
if (found == False):
|
Returns:
|
||||||
logging.error("could not search on spotify after 5 tries, aborting")
|
True if successful, False otherwise
|
||||||
|
"""
|
||||||
|
if not spotify_data or not spotify_data.get('found'):
|
||||||
|
logging.info("No Spotify album data provided")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if (len(results["albums"]["items"]) > 0):
|
logging.info("Applying Spotify album data to file...")
|
||||||
logging.info("album found on spotify!")
|
|
||||||
found = True
|
|
||||||
album = results["albums"]["items"][0]
|
|
||||||
album_artist = album["artists"][0]["name"]
|
|
||||||
|
|
||||||
if (x.endswith(".flac")):
|
# Set artist
|
||||||
|
album_artist = spotify_data['artist']
|
||||||
|
if x.endswith(".flac"):
|
||||||
try:
|
try:
|
||||||
if str(audio["album_artist"]) != album_artist:
|
if str(audio.get("album_artist", "")) != album_artist:
|
||||||
audio["album_artist"] = album_artist
|
audio["album_artist"] = album_artist
|
||||||
except:
|
except:
|
||||||
audio["album_artist"] = album_artist
|
audio["album_artist"] = album_artist
|
||||||
else:
|
else:
|
||||||
if (str(audio["TPE2"]) != album_artist):
|
if str(audio.get("TPE2", "")) != album_artist:
|
||||||
audio["TPE2"] = TPE2(encoding=3, text=album_artist)
|
audio["TPE2"] = TPE2(encoding=3, text=album_artist)
|
||||||
|
|
||||||
album_image_url = album["images"][0]["url"]
|
# Set album
|
||||||
album_name = album["name"]
|
album_name = spotify_data['album']
|
||||||
|
|
||||||
if x.endswith(".flac"):
|
if x.endswith(".flac"):
|
||||||
if str(audio["album"] != album_name):
|
if str(audio.get("album", "")) != album_name:
|
||||||
audio["album"] = album_name
|
audio["album"] = album_name
|
||||||
elif (str(audio["TALB"]) != album_name):
|
elif str(audio.get("TALB", "")) != album_name:
|
||||||
audio["TALB"] = TALB(encoding=3, text=album_name)
|
audio["TALB"] = TALB(encoding=3, text=album_name)
|
||||||
|
|
||||||
if (x.endswith(".flac")):
|
# Parse and set release date
|
||||||
audio["year"] = album["release_date"]
|
release_date = spotify_data['release_date']
|
||||||
|
try:
|
||||||
|
year = str(datetime.strptime(release_date, '%Y-%m-%d').year)
|
||||||
|
except:
|
||||||
|
try:
|
||||||
|
year = str(datetime.strptime(release_date, '%Y-%m').year)
|
||||||
|
except:
|
||||||
|
try:
|
||||||
|
year = str(datetime.strptime(release_date, '%Y').year)
|
||||||
|
except:
|
||||||
|
year = str(release_date)
|
||||||
|
|
||||||
|
if x.endswith(".flac"):
|
||||||
|
audio["year"] = year
|
||||||
|
audio["date"] = release_date
|
||||||
else:
|
else:
|
||||||
audio["TDRC"] = TDRC(encoding=3,text=album["release_date"])
|
audio["TDRC"] = TDRC(encoding=3, text=year)
|
||||||
|
audio["TDRL"] = TDRL(encoding=3, text=release_date)
|
||||||
|
|
||||||
artist_search = spotify.artist(album['artists'][0]['external_urls']['spotify'])
|
# Set genres
|
||||||
logging.info("genres: " + str(artist_search['genres']))
|
logging.info("genres: " + str(spotify_data['genres']))
|
||||||
check_spotify_genre(artist_search['genres'],audio)
|
set_genre_tag(spotify_data['genres'], audio)
|
||||||
|
|
||||||
comment ="Spotify ID: {0}. Release date precision: {1}, total tracks in album: {2}. This album has {3} version(s)".format(album["id"],album["release_date_precision"], album["total_tracks"],len(results["albums"]["items"]))
|
# Set comment
|
||||||
|
comment = "Spotify ID: {0}. Release date precision: {1}, total tracks in album: {2}. This album has {3} version(s)".format(
|
||||||
|
spotify_data['album_id'],
|
||||||
|
spotify_data['release_date_precision'],
|
||||||
|
spotify_data['total_tracks'],
|
||||||
|
spotify_data['versions_count']
|
||||||
|
)
|
||||||
logging.info("Comment: " + comment)
|
logging.info("Comment: " + comment)
|
||||||
if x.endswith(".flac"):
|
if x.endswith(".flac"):
|
||||||
audio["comment"] = audio["comment"] + comment
|
audio["comment"] = audio.get("comment", "") + comment
|
||||||
else:
|
else:
|
||||||
audio["COMM"] = COMM(encoding=3,text=comment + audio["COMM"])
|
audio["COMM"] = COMM(encoding=3, text=comment + str(audio.get("COMM", "")))
|
||||||
|
|
||||||
|
# Save tags
|
||||||
if x.endswith(".flac"):
|
if x.endswith(".flac"):
|
||||||
remove_flac_ID3_tags(audio, x)
|
remove_flac_ID3_tags(audio, x)
|
||||||
|
|
||||||
audio.save(x)
|
audio.save(x)
|
||||||
|
|
||||||
songpath = ""
|
# Create folder structure
|
||||||
|
|
||||||
if x.endswith(".flac"):
|
if x.endswith(".flac"):
|
||||||
songpath = join(".",str(audio["artist"][0]),str(audio["album"][0]))
|
artist_path = str(audio["artist"][0])
|
||||||
make_folder(join(".",str(audio["artist"][0])))
|
album_path = str(audio["album"][0])
|
||||||
else:
|
else:
|
||||||
if "/" in audio["TPE2"]:
|
if "/" in str(audio["TPE2"]):
|
||||||
audio["TPE2"] = audio["TPE2"].replace("/","")
|
audio["TPE2"] = str(audio["TPE2"]).replace("/", "")
|
||||||
songpath = join(".",str(audio["TPE2"]),str(audio["TALB"]))
|
artist_path = str(audio["TPE2"])
|
||||||
make_folder(join(".",str(audio["TPE2"])))
|
album_path = str(audio["TALB"])
|
||||||
|
|
||||||
|
songpath = join(".", artist_path, album_path)
|
||||||
|
make_folder(join(".", artist_path))
|
||||||
|
|
||||||
|
# Handle albums with / in the name
|
||||||
if (not x.endswith(".flac")):
|
if not x.endswith(".flac") and "/" in album_path:
|
||||||
if ("/" in str(audio["TALB"])):
|
|
||||||
logging.info("album contains /")
|
logging.info("album contains /")
|
||||||
folders = str(audio["TALB"]).split('/')
|
folders = album_path.split('/')
|
||||||
logging.info(folders)
|
logging.info(folders)
|
||||||
pos = join(".",str(audio["TPE2"]))
|
pos = join(".", artist_path)
|
||||||
for fold in folders:
|
for fold in folders:
|
||||||
make_folder(join(pos, fold))
|
make_folder(join(pos, fold))
|
||||||
pos = join(pos, fold)
|
pos = join(pos, fold)
|
||||||
@@ -489,149 +510,138 @@ def check_spotify_album_and_save(spotify, audio,x: str) -> bool:
|
|||||||
os.replace(join(".", x), join(songpath, x))
|
os.replace(join(".", x), join(songpath, x))
|
||||||
logging.info("moved song file, now downloading cover art")
|
logging.info("moved song file, now downloading cover art")
|
||||||
|
|
||||||
img_data = requests.get(str(album_image_url)).content
|
# Download and save cover art
|
||||||
|
if spotify_data['image_url']:
|
||||||
|
img_data = requests.get(spotify_data['image_url']).content
|
||||||
with open(join(songpath, "Cover.jpg"), 'wb') as handler:
|
with open(join(songpath, "Cover.jpg"), 'wb') as handler:
|
||||||
handler.write(img_data)
|
handler.write(img_data)
|
||||||
logging.info("done getting cover art!")
|
logging.info("done getting cover art!")
|
||||||
|
|
||||||
logging.info("now setting cover art..")
|
logging.info("now setting cover art..")
|
||||||
embed_music_file(join(songpath, x), join(songpath, "Cover.jpg"))
|
embed_music_file(join(songpath, x), join(songpath, "Cover.jpg"))
|
||||||
else:
|
|
||||||
logging.info("No album found on spotify")
|
|
||||||
return found
|
|
||||||
|
|
||||||
def check_spotify_and_save(spotify, audio,x: str) -> bool:
|
return True
|
||||||
found = False
|
|
||||||
logging.info("Searching spotify for file " + x)
|
def save_track_from_spotify(spotify, audio, x: str, spotify_data: dict) -> bool:
|
||||||
artist = ""
|
"""
|
||||||
track = ""
|
Save audio file with metadata and cover art from Spotify track data.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
spotify: Spotify client instance
|
||||||
|
audio: Audio file object
|
||||||
|
x: Filename
|
||||||
|
spotify_data: Dict with track data from spotify_search.search_track()
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if successful, False otherwise
|
||||||
|
"""
|
||||||
|
if not spotify_data or not spotify_data.get('found'):
|
||||||
|
logging.info("No Spotify track data provided")
|
||||||
|
return False
|
||||||
|
|
||||||
|
logging.info("Applying Spotify track data to file...")
|
||||||
|
|
||||||
|
# Get current artist value for comparison
|
||||||
if x.endswith(".flac"):
|
if x.endswith(".flac"):
|
||||||
if audio["artist"] is not str:
|
current_artist = str(audio.get("artist", [""])[0]) if not isinstance(audio.get("artist", ""), str) else str(audio.get("artist", ""))
|
||||||
artist = str(audio["artist"][0])
|
|
||||||
else:
|
else:
|
||||||
artist = str(audio["artist"])
|
|
||||||
if audio["title"] is not str:
|
|
||||||
track = str(audio["title"][0])
|
|
||||||
else:
|
|
||||||
track = str(audio["title"])
|
|
||||||
else:
|
|
||||||
# Prefer 'artist' and 'title' tags if available, fallback to TPE2/TIT2
|
|
||||||
if "artist" in audio:
|
if "artist" in audio:
|
||||||
if audio["artist"] is not str:
|
current_artist = str(audio["artist"][0]) if not isinstance(audio["artist"], str) else str(audio["artist"])
|
||||||
artist = str(audio["artist"][0])
|
|
||||||
else:
|
|
||||||
artist = str(audio["artist"])
|
|
||||||
elif "TPE2" in audio:
|
elif "TPE2" in audio:
|
||||||
if audio["TPE2"] is not str:
|
current_artist = str(audio["TPE2"][0]) if not isinstance(audio["TPE2"], str) else str(audio["TPE2"])
|
||||||
artist = str(audio["TPE2"][0])
|
|
||||||
else:
|
else:
|
||||||
artist = str(audio["TPE2"])
|
current_artist = "Unknown Artist"
|
||||||
else:
|
|
||||||
artist = "Unknown Artist"
|
|
||||||
|
|
||||||
if "title" in audio:
|
# Set artist if different
|
||||||
if audio["title"] is not str:
|
found_artist = spotify_data['artist']
|
||||||
track = str(audio["title"][0])
|
if found_artist != current_artist:
|
||||||
else:
|
logging.info("Changing album artist from " + current_artist + " to " + found_artist)
|
||||||
track = str(audio["title"])
|
|
||||||
elif "TIT2" in audio:
|
|
||||||
if audio["TIT2"] is not str:
|
|
||||||
track = str(audio["TIT2"][0])
|
|
||||||
else:
|
|
||||||
track = str(audio["TIT2"])
|
|
||||||
else:
|
|
||||||
track = "Unknown Title"
|
|
||||||
|
|
||||||
querystring = "artist:{0} track:{1}".format(artist.split("\00")[0],track)
|
|
||||||
logging.info("query string: " + querystring)
|
|
||||||
results = spotify.search(q=querystring,type='track')
|
|
||||||
|
|
||||||
if (len(results['tracks']['items']) > 0):
|
|
||||||
logging.info("track found on spotify!")
|
|
||||||
found = True
|
|
||||||
album = results['tracks']['items'][0]["album"]
|
|
||||||
found_artist = album["artists"][0]["name"]
|
|
||||||
if (found_artist != artist):
|
|
||||||
logging.info("Changing album artist from " + artist + " to " + found_artist)
|
|
||||||
if x.endswith(".flac"):
|
if x.endswith(".flac"):
|
||||||
audio["album_artist"] = found_artist
|
audio["album_artist"] = found_artist
|
||||||
else:
|
else:
|
||||||
audio["TPE2"] = TPE2(encoding=3, text=found_artist)
|
audio["TPE2"] = TPE2(encoding=3, text=found_artist)
|
||||||
found_album = album["name"]
|
|
||||||
|
# Set album
|
||||||
|
found_album = spotify_data['album']
|
||||||
logging.info("found album name: " + found_album)
|
logging.info("found album name: " + found_album)
|
||||||
if (len(found_album) > 0):
|
if len(found_album) > 0:
|
||||||
if x.endswith(".flac"):
|
if x.endswith(".flac"):
|
||||||
audio["album"] = found_album
|
audio["album"] = found_album
|
||||||
else:
|
else:
|
||||||
audio["TALB"] = TALB(encoding=3, text=found_album)
|
audio["TALB"] = TALB(encoding=3, text=found_album)
|
||||||
else:
|
else:
|
||||||
# set album to title if no album found
|
# set album to title if no album found
|
||||||
if (x.endswith(".flac")):
|
if x.endswith(".flac"):
|
||||||
audio["album"] = audio["title"][0]
|
audio["album"] = audio["title"][0]
|
||||||
else:
|
else:
|
||||||
audio["TALB"] = TALB(encoding=3, text=str(audio["TIT2"]))
|
audio["TALB"] = TALB(encoding=3, text=str(audio["TIT2"]))
|
||||||
|
|
||||||
|
# Add system info to comment
|
||||||
# Add current date/time and CPU/RAM usage
|
|
||||||
now = dt.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
now = dt.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||||
cpu_percent = psutil.cpu_percent(interval=1)
|
cpu_percent = psutil.cpu_percent(interval=1)
|
||||||
ram_percent = psutil.virtual_memory().percent
|
ram_percent = psutil.virtual_memory().percent
|
||||||
sysinfo = f"This album was downloaded on {now}. The server was using {cpu_percent}% CPU and {ram_percent}% RAM."
|
sysinfo = f"This album was downloaded on {now}. The server was using {cpu_percent}% CPU and {ram_percent}% RAM."
|
||||||
|
|
||||||
# Try to get album description (Spotify API does not provide a direct description, but label is available)
|
# Build comment from album metadata
|
||||||
album_label = album.get("label", "")
|
album_label = spotify_data.get('label', '')
|
||||||
album_desc = ""
|
album_desc = f"Label: {album_label}. " if album_label else ""
|
||||||
if album_label:
|
|
||||||
album_desc = f"Label: {album_label}. "
|
comment = "Spotify ID: {0}. This album was released on: {1}, total tracks in album: {2}. This album has {3} version(s). {4} {5}".format(
|
||||||
# Some albums may have a 'description' field, but it's rare. If present, add it.
|
spotify_data['album_id'],
|
||||||
if "description" in album:
|
spotify_data['release_date'],
|
||||||
album_desc += f"Description: {album['description']} "
|
spotify_data['total_tracks'],
|
||||||
comment = "Spotify ID: {0}. This album was released on: {1}, total tracks in album: {2}. This album has {3} version(s). {4} {5}".format(album["id"],album["release_date"], album["total_tracks"],len(results["tracks"]["items"]), album_desc, sysinfo)
|
spotify_data['versions_count'],
|
||||||
|
album_desc,
|
||||||
|
sysinfo
|
||||||
|
)
|
||||||
logging.info("Comment: " + comment)
|
logging.info("Comment: " + comment)
|
||||||
if x.endswith(".flac"):
|
if x.endswith(".flac"):
|
||||||
audio["comment"] = comment
|
audio["comment"] = comment
|
||||||
else:
|
else:
|
||||||
audio["COMM"] = COMM(encoding=3, text=comment)
|
audio["COMM"] = COMM(encoding=3, text=comment)
|
||||||
|
|
||||||
|
# Parse and set release date
|
||||||
|
release_date = spotify_data['release_date']
|
||||||
try:
|
try:
|
||||||
year = str(datetime.strptime(album["release_date"], '%Y-%m-%d').year)
|
year = str(datetime.strptime(release_date, '%Y-%m-%d').year)
|
||||||
|
except:
|
||||||
if x.endswith(".flac"):
|
try:
|
||||||
audio["year"] = year
|
year = str(datetime.strptime(release_date, '%Y-%m').year)
|
||||||
else:
|
except:
|
||||||
audio["TDRC"] = TDRC(encoding=3,text=year)
|
try:
|
||||||
|
year = str(datetime.strptime(release_date, '%Y').year)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
logging.info(err)
|
logging.info(err)
|
||||||
year = str(album["release_date"])
|
year = str(release_date)
|
||||||
|
|
||||||
|
if x.endswith(".flac"):
|
||||||
audio["year"] = year
|
audio["year"] = year
|
||||||
|
audio["date"] = release_date
|
||||||
|
|
||||||
if x.endswith(".flac"):
|
|
||||||
audio["TRACKNUMBER"] = str(results['tracks']['items'][0]["track_number"]) +"/" + str(album["total_tracks"])
|
|
||||||
else:
|
else:
|
||||||
audio["TRCK"] = TRCK(encoding=3,text=str(results['tracks']['items'][0]["track_number"]) +"/" + str(album["total_tracks"]))
|
audio["TDRC"] = TDRC(encoding=3, text=year)
|
||||||
|
audio["TDRL"] = TDRL(encoding=3, text=release_date)
|
||||||
|
|
||||||
|
# Set track number
|
||||||
if x.endswith(".flac"):
|
if x.endswith(".flac"):
|
||||||
audio["popularity"] = str(results['tracks']['items'][0]["popularity"])
|
audio["TRACKNUMBER"] = str(spotify_data['track_number']) + "/" + str(spotify_data['total_tracks'])
|
||||||
else:
|
else:
|
||||||
audio["POPM"] = POPM(encoding=3,text=str(results['tracks']['items'][0]["popularity"]))
|
audio["TRCK"] = TRCK(encoding=3, text=str(spotify_data['track_number']) + "/" + str(spotify_data['total_tracks']))
|
||||||
|
|
||||||
|
# Set popularity
|
||||||
|
if x.endswith(".flac"):
|
||||||
|
audio["popularity"] = str(spotify_data['popularity'])
|
||||||
|
else:
|
||||||
|
audio["POPM"] = POPM(encoding=3, text=str(spotify_data['popularity']))
|
||||||
|
|
||||||
found_image_url = album["images"][0]["url"]
|
# Set genres
|
||||||
logging.info("found cover art image at " + str(found_image_url))
|
logging.info("genres: " + str(spotify_data['genres']))
|
||||||
|
set_genre_tag(spotify_data['genres'], audio)
|
||||||
|
|
||||||
artist_search = spotify.artist(results['tracks']['items'][0]['artists'][0]['external_urls']['spotify'])
|
# Save tags
|
||||||
logging.info("genres: " + str(artist_search['genres']))
|
|
||||||
check_spotify_genre(artist_search['genres'],audio)
|
|
||||||
|
|
||||||
# remove ID3 tags if it's a flac file, otherwise it will throw an error
|
|
||||||
remove_flac_ID3_tags(audio, x)
|
remove_flac_ID3_tags(audio, x)
|
||||||
|
|
||||||
audio.save(x)
|
audio.save(x)
|
||||||
|
|
||||||
artist_path = ""
|
# Create folder structure
|
||||||
songpath = ""
|
|
||||||
if x.endswith(".flac"):
|
if x.endswith(".flac"):
|
||||||
artist_path = str(audio["artist"][0])
|
artist_path = str(audio["artist"][0])
|
||||||
else:
|
else:
|
||||||
@@ -649,7 +659,8 @@ def check_spotify_and_save(spotify, audio,x: str) -> bool:
|
|||||||
|
|
||||||
make_folder(join(".", artist_path))
|
make_folder(join(".", artist_path))
|
||||||
|
|
||||||
if (not x.endswith(".flac") and "/" in str(audio["TALB"])):
|
# Handle albums with / in the name
|
||||||
|
if not x.endswith(".flac") and "/" in str(audio["TALB"]):
|
||||||
logging.info("album contains /")
|
logging.info("album contains /")
|
||||||
folders = str(audio["TALB"]).split('/')
|
folders = str(audio["TALB"]).split('/')
|
||||||
logging.info(folders)
|
logging.info(folders)
|
||||||
@@ -663,7 +674,9 @@ def check_spotify_and_save(spotify, audio,x: str) -> bool:
|
|||||||
os.replace(join(".", x), join(songpath, x))
|
os.replace(join(".", x), join(songpath, x))
|
||||||
logging.info("moved song file, now downloading cover art")
|
logging.info("moved song file, now downloading cover art")
|
||||||
|
|
||||||
img_data = requests.get(str(found_image_url)).content
|
# Download and save cover art
|
||||||
|
if spotify_data['image_url']:
|
||||||
|
img_data = requests.get(spotify_data['image_url']).content
|
||||||
with open(join(songpath, "Cover.jpg"), 'wb') as handler:
|
with open(join(songpath, "Cover.jpg"), 'wb') as handler:
|
||||||
handler.write(img_data)
|
handler.write(img_data)
|
||||||
logging.info("done getting cover art!")
|
logging.info("done getting cover art!")
|
||||||
@@ -671,7 +684,7 @@ def check_spotify_and_save(spotify, audio,x: str) -> bool:
|
|||||||
logging.info("now setting cover art..")
|
logging.info("now setting cover art..")
|
||||||
embed_music_file(join(songpath, x), join(songpath, "Cover.jpg"))
|
embed_music_file(join(songpath, x), join(songpath, "Cover.jpg"))
|
||||||
|
|
||||||
return found
|
return True
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
# Preprocess: rename files with '- Topic -' in the name to 'artist - title'
|
# Preprocess: rename files with '- Topic -' in the name to 'artist - title'
|
||||||
@@ -691,7 +704,7 @@ def main():
|
|||||||
# for spotipy to be able to log in, the environment variables SPOTIPY_CLIENT_ID and SPOTIPY_CLIENT_SECRET have to be set
|
# for spotipy to be able to log in, the environment variables SPOTIPY_CLIENT_ID and SPOTIPY_CLIENT_SECRET have to be set
|
||||||
# these can be obtained from the spotify developer dashboard
|
# these can be obtained from the spotify developer dashboard
|
||||||
# they are defined in /etc/profile.d/spotipy.sh
|
# they are defined in /etc/profile.d/spotipy.sh
|
||||||
spotify = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials())
|
spotify = spotify_search.init_spotify_client()
|
||||||
|
|
||||||
onlyfiles = [f for f in listdir(".") if (isfile(join(".",f)) and f.split(".")[-1] in ['mp3','mp4','ogg','wav','flac','m4a','MP3','FLAC','OGG','MP4','WAV','M4A'])]
|
onlyfiles = [f for f in listdir(".") if (isfile(join(".",f)) and f.split(".")[-1] in ['mp3','mp4','ogg','wav','flac','m4a','MP3','FLAC','OGG','MP4','WAV','M4A'])]
|
||||||
# TIT2 = title,
|
# TIT2 = title,
|
||||||
@@ -787,8 +800,52 @@ def main():
|
|||||||
|
|
||||||
if (has_valid_artist and has_valid_title):
|
if (has_valid_artist and has_valid_title):
|
||||||
found = False
|
found = False
|
||||||
|
|
||||||
|
# Extract artist and title for search
|
||||||
|
artist = ""
|
||||||
|
track = ""
|
||||||
|
if x.endswith(".flac"):
|
||||||
|
if audio["artist"] is not str:
|
||||||
|
artist = str(audio["artist"][0])
|
||||||
|
else:
|
||||||
|
artist = str(audio["artist"])
|
||||||
|
if audio["title"] is not str:
|
||||||
|
track = str(audio["title"][0])
|
||||||
|
else:
|
||||||
|
track = str(audio["title"])
|
||||||
|
else:
|
||||||
|
# Prefer 'artist' and 'title' tags if available, fallback to TPE2/TIT2
|
||||||
|
if "artist" in audio:
|
||||||
|
if audio["artist"] is not str:
|
||||||
|
artist = str(audio["artist"][0])
|
||||||
|
else:
|
||||||
|
artist = str(audio["artist"])
|
||||||
|
elif "TPE2" in audio:
|
||||||
|
if audio["TPE2"] is not str:
|
||||||
|
artist = str(audio["TPE2"][0])
|
||||||
|
else:
|
||||||
|
artist = str(audio["TPE2"])
|
||||||
|
else:
|
||||||
|
artist = "Unknown Artist"
|
||||||
|
|
||||||
|
if "title" in audio:
|
||||||
|
if audio["title"] is not str:
|
||||||
|
track = str(audio["title"][0])
|
||||||
|
else:
|
||||||
|
track = str(audio["title"])
|
||||||
|
elif "TIT2" in audio:
|
||||||
|
if audio["TIT2"] is not str:
|
||||||
|
track = str(audio["TIT2"][0])
|
||||||
|
else:
|
||||||
|
track = str(audio["TIT2"])
|
||||||
|
else:
|
||||||
|
track = "Unknown Title"
|
||||||
|
|
||||||
|
# Search Spotify for the track
|
||||||
try:
|
try:
|
||||||
found = check_spotify_and_save(spotify, audio,x)
|
spotify_data = spotify_search.search_track(spotify, artist, track)
|
||||||
|
if spotify_data:
|
||||||
|
found = save_track_from_spotify(spotify, audio, x, spotify_data)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
logging.error("could not find track on spotify: " + str(err))
|
logging.error("could not find track on spotify: " + str(err))
|
||||||
logging.error(err.with_traceback)
|
logging.error(err.with_traceback)
|
||||||
@@ -814,7 +871,23 @@ def main():
|
|||||||
|
|
||||||
logging.info("spotify did not find artist and track, searching for album...")
|
logging.info("spotify did not find artist and track, searching for album...")
|
||||||
if (has_valid_album):
|
if (has_valid_album):
|
||||||
album_found = check_spotify_album_and_save(spotify,audio,x)
|
# Extract artist and album for search
|
||||||
|
search_artist = ""
|
||||||
|
search_album = ""
|
||||||
|
if x.endswith(".flac"):
|
||||||
|
search_artist = str(audio["artist"])
|
||||||
|
search_album = str(audio["album"])
|
||||||
|
else:
|
||||||
|
search_artist = str(audio["TPE2"])
|
||||||
|
search_album = str(audio["TALB"])
|
||||||
|
|
||||||
|
# Search Spotify for the album
|
||||||
|
album_data = spotify_search.search_album(spotify, search_artist, search_album)
|
||||||
|
if album_data:
|
||||||
|
album_found = save_album_from_spotify(spotify, audio, x, album_data)
|
||||||
|
else:
|
||||||
|
album_found = False
|
||||||
|
|
||||||
if (album_found == False):
|
if (album_found == False):
|
||||||
logging.info("Nothing found on spotify, searching Google Images...")
|
logging.info("Nothing found on spotify, searching Google Images...")
|
||||||
search_google_images_and_save(x, audio)
|
search_google_images_and_save(x, audio)
|
||||||
|
|||||||
185
spotify_search.py
Normal file
185
spotify_search.py
Normal file
@@ -0,0 +1,185 @@
|
|||||||
|
import logging
|
||||||
|
import time
|
||||||
|
import spotipy
|
||||||
|
from spotipy.oauth2 import SpotifyClientCredentials
|
||||||
|
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format="{asctime} - {levelname} - [{funcName}:{lineno}] - {message}",
|
||||||
|
style="{",
|
||||||
|
datefmt="%Y-%m-%d %H:%M",
|
||||||
|
)
|
||||||
|
|
||||||
|
def init_spotify_client():
|
||||||
|
"""
|
||||||
|
Initialize and return a Spotify client.
|
||||||
|
Requires SPOTIPY_CLIENT_ID and SPOTIPY_CLIENT_SECRET environment variables.
|
||||||
|
"""
|
||||||
|
return spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials())
|
||||||
|
|
||||||
|
|
||||||
|
def search_track(spotify, artist, title):
|
||||||
|
"""
|
||||||
|
Search for a track on Spotify.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
spotify: Spotify client instance
|
||||||
|
artist: Artist name (string)
|
||||||
|
title: Track title (string)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict with track data if found, None otherwise. Structure:
|
||||||
|
{
|
||||||
|
'found': True,
|
||||||
|
'artist': str,
|
||||||
|
'album': str,
|
||||||
|
'album_id': str,
|
||||||
|
'release_date': str,
|
||||||
|
'release_date_precision': str,
|
||||||
|
'total_tracks': int,
|
||||||
|
'track_number': int,
|
||||||
|
'popularity': int,
|
||||||
|
'image_url': str,
|
||||||
|
'genres': list,
|
||||||
|
'label': str,
|
||||||
|
'versions_count': int
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
querystring = "artist:{0} track:{1}".format(artist.split("\00")[0], title)
|
||||||
|
logging.info("Searching Spotify for track with query: " + querystring)
|
||||||
|
|
||||||
|
results = spotify.search(q=querystring, type='track')
|
||||||
|
|
||||||
|
if len(results['tracks']['items']) > 0:
|
||||||
|
logging.info("Track found on Spotify!")
|
||||||
|
track = results['tracks']['items'][0]
|
||||||
|
album = track["album"]
|
||||||
|
|
||||||
|
# Get artist genres
|
||||||
|
artist_search = spotify.artist(track['artists'][0]['external_urls']['spotify'])
|
||||||
|
genres = artist_search.get('genres', [])
|
||||||
|
|
||||||
|
# Build response
|
||||||
|
data = {
|
||||||
|
'found': True,
|
||||||
|
'artist': album["artists"][0]["name"],
|
||||||
|
'album': album["name"],
|
||||||
|
'album_id': album["id"],
|
||||||
|
'release_date': album["release_date"],
|
||||||
|
'release_date_precision': album.get("release_date_precision", "day"),
|
||||||
|
'total_tracks': album["total_tracks"],
|
||||||
|
'track_number': track["track_number"],
|
||||||
|
'popularity': track["popularity"],
|
||||||
|
'image_url': album["images"][0]["url"] if album["images"] else None,
|
||||||
|
'genres': genres,
|
||||||
|
'label': album.get("label", ""),
|
||||||
|
'versions_count': len(results["tracks"]["items"])
|
||||||
|
}
|
||||||
|
|
||||||
|
logging.info(f"Found: {data['artist']} - {data['album']}")
|
||||||
|
return data
|
||||||
|
else:
|
||||||
|
logging.info("No track found on Spotify")
|
||||||
|
return None
|
||||||
|
|
||||||
|
except Exception as err:
|
||||||
|
logging.error(f"Error searching for track on Spotify: {err}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def search_album(spotify, artist, album_name):
|
||||||
|
"""
|
||||||
|
Search for an album on Spotify.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
spotify: Spotify client instance
|
||||||
|
artist: Artist name (string)
|
||||||
|
album_name: Album name (string)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict with album data if found, None otherwise. Structure:
|
||||||
|
{
|
||||||
|
'found': True,
|
||||||
|
'artist': str,
|
||||||
|
'album': str,
|
||||||
|
'album_id': str,
|
||||||
|
'release_date': str,
|
||||||
|
'release_date_precision': str,
|
||||||
|
'total_tracks': int,
|
||||||
|
'image_url': str,
|
||||||
|
'genres': list,
|
||||||
|
'versions_count': int
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
querystring = "artist:{0} album:{1}".format(artist, album_name)
|
||||||
|
logging.info("Searching Spotify for album with query: " + querystring)
|
||||||
|
|
||||||
|
tries = 0
|
||||||
|
found = False
|
||||||
|
results = None
|
||||||
|
|
||||||
|
while tries < 5 and not found:
|
||||||
|
try:
|
||||||
|
results = spotify.search(q=querystring, type='album')
|
||||||
|
found = True
|
||||||
|
except Exception as err:
|
||||||
|
logging.error(f"Could not search on Spotify: {err}")
|
||||||
|
logging.info("Waiting 30 seconds before trying again")
|
||||||
|
time.sleep(30)
|
||||||
|
tries += 1
|
||||||
|
|
||||||
|
if not found or not results:
|
||||||
|
logging.error("Could not search on Spotify after 5 tries")
|
||||||
|
return None
|
||||||
|
|
||||||
|
if len(results["albums"]["items"]) > 0:
|
||||||
|
logging.info("Album found on Spotify!")
|
||||||
|
album = results["albums"]["items"][0]
|
||||||
|
|
||||||
|
# Get artist genres
|
||||||
|
artist_search = spotify.artist(album['artists'][0]['external_urls']['spotify'])
|
||||||
|
genres = artist_search.get('genres', [])
|
||||||
|
|
||||||
|
# Build response
|
||||||
|
data = {
|
||||||
|
'found': True,
|
||||||
|
'artist': album["artists"][0]["name"],
|
||||||
|
'album': album["name"],
|
||||||
|
'album_id': album["id"],
|
||||||
|
'release_date': album["release_date"],
|
||||||
|
'release_date_precision': album.get("release_date_precision", "day"),
|
||||||
|
'total_tracks': album["total_tracks"],
|
||||||
|
'image_url': album["images"][0]["url"] if album["images"] else None,
|
||||||
|
'genres': genres,
|
||||||
|
'versions_count': len(results["albums"]["items"])
|
||||||
|
}
|
||||||
|
|
||||||
|
logging.info(f"Found: {data['artist']} - {data['album']}")
|
||||||
|
return data
|
||||||
|
else:
|
||||||
|
logging.info("No album found on Spotify")
|
||||||
|
return None
|
||||||
|
|
||||||
|
except Exception as err:
|
||||||
|
logging.error(f"Error searching for album on Spotify: {err}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def format_genres(genres):
|
||||||
|
"""
|
||||||
|
Format a list of genres into a comma-separated string.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
genres: list of genre strings
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Comma-separated string of genres, or empty string if no genres
|
||||||
|
"""
|
||||||
|
if not genres or len(genres) == 0:
|
||||||
|
return ""
|
||||||
|
elif len(genres) == 1:
|
||||||
|
return str(genres[0])
|
||||||
|
else:
|
||||||
|
return ",".join(genres)
|
||||||
Reference in New Issue
Block a user