Migrate your MP3 library to Spotify

How to automatically identify your MP3s and upload them to a Spotify playlist using Python.

If, like me, you have a large library of MP3s that you have collected over the years, you might want to have them available on the music streaming service of your choice, so that you don’t have to lug them around everywhere with you. In this article, we are going to focus on Spotify, but it should be possible to do something similar with YouTube and Deezer.

Get your credentials from ACRCloud

Sign up with ACRCloud and create a new Audio & Video Recognition Project. Make a note of the project’s host, access key and access secret; the code below needs all three.

Get your credentials from Spotify

Head over to the Spotify for Developers Dashboard and create a new client ID. (If you don’t already have a Spotify account, you can easily obtain a free one.) You’ll be asked for a short description of what you are building; what you write is not important, as long as you don’t intend to use the app for commercial purposes. You’ll now have access to your Client ID and your Client Secret. It is also important to edit the settings and add a Redirect URI: as part of the authentication process, Spotify redirects your browser to this web address with a one-time authorization code appended to the URL. You can choose any website that lets you copy the resulting URL from the address bar (e.g., https://github.com) without being redirected to a 404 error page. Make a note of your Client ID, Client Secret and Redirect URI.

TL;DR

Now that you have all the pieces you need, you can run my notebook in Google Colab and just follow the steps.

Identify MP3s using Python

First, we have to import some libraries. Most of them ship with Python; tqdm and pydub are third-party packages, which you can install with pip if you have not done so already.

import os
import sys
import hmac
import time
import json
import tqdm
import base64
import hashlib
import urllib.request
import urllib.parse
import datetime
from pydub import AudioSegment
def post_multipart(url, fields, files):
    """POST form fields and file payloads to ``url`` as multipart/form-data.

    Args:
        url: Address to send the request to; it is also sent as the
            ``Referer`` header.
        fields: Mapping of form-field name to string value.
        files: Mapping of form-field name to the raw bytes to upload.

    Returns:
        The response body decoded as UTF-8 text.

    Raises:
        urllib.error.URLError: If the request cannot be completed.
    """
    content_type, body = encode_multipart_formdata(fields, files)
    req = urllib.request.Request(url, data=body)
    req.add_header('Content-Type', content_type)
    req.add_header('Referer', url)
    # The context manager closes the connection even when read() or decode()
    # raises, so a failed request does not leak the underlying socket.
    with urllib.request.urlopen(req) as resp:
        return resp.read().decode('utf8')
def encode_multipart_formdata(fields, files):
    """Serialise form fields and file payloads into a multipart/form-data body.

    Args:
        fields: Mapping of field name to string value (ASCII only, because
            the text parts are encoded with the ``ascii`` codec).
        files: Mapping of field name to bytes; the field name is also sent
            as the part's ``filename``.

    Returns:
        A ``(content_type, body)`` tuple: the Content-Type header value,
        including the boundary, and the request body as bytes.
    """
    # The current time makes the boundary different on every call.
    boundary = "".join(
        ["*****2016.05.27.acrcloud.rec.copyright.", str(time.time()), "*****"]
    )
    line_break = '\r\n'
    delimiter = '--' + boundary

    # Plain-text fields: delimiter, header, empty line, value.
    text_lines = []
    for name, text in fields.items():
        text_lines.extend([
            delimiter,
            'Content-Disposition: form-data; name="%s"' % name,
            '',
            text,
        ])
    chunks = [line_break.join(text_lines).encode('ascii')]

    # Binary parts: an ASCII header block followed by the raw payload.
    for name, payload in files.items():
        header_lines = [
            line_break + delimiter,
            'Content-Disposition: form-data; name="%s"; filename="%s"' %(name, name),
            'Content-Type: application/octet-stream',
            line_break,
        ]
        chunks.append(line_break.join(header_lines).encode('ascii'))
        chunks.append(payload)

    # Closing delimiter.
    chunks.append(
        (line_break + delimiter + '--' + line_break + line_break).encode('ascii')
    )

    content_type = 'multipart/form-data; boundary=%s' % boundary
    return content_type, b''.join(chunks)
def get_track_info(sample):
    """Identify an audio sample with the ACRCloud ``/v1/identify`` endpoint.

    Signs the request with the module-level ``access_key`` and
    ``access_secret`` (HMAC-SHA1, base64-encoded), uploads the sample to the
    module-level ``host`` through ``post_multipart`` and parses the reply.

    Args:
        sample: Raw bytes of an audio file. Only the first 5,000,000 bytes
            are uploaded.

    Returns:
        The service's JSON reply as parsed by ``json.loads``.

    Raises:
        urllib.error.URLError: If the HTTP request fails.
        ValueError: If the reply is not valid JSON.
    """
    http_method = "POST"
    http_url_file = "/v1/identify"
    data_type = "audio"
    signature_version = "1"

    # Plain Unix time. Going through utcfromtimestamp() and then mktime()
    # reinterprets the UTC wall clock as local time, which shifts the value
    # by the local UTC offset on any machine that is not set to UTC.
    timestamp = str(int(time.time()))

    query_data = sample[:5000000]  # make sure sample is not too big
    sample_bytes = str(len(query_data))

    # The signature covers these values, newline-separated, in this order.
    string_to_sign = "\n".join([
        http_method,
        http_url_file,
        access_key,
        data_type,
        signature_version,
        timestamp,
    ])
    hmac_res = hmac.new(access_secret.encode('ascii'),
                        string_to_sign.encode('ascii'),
                        digestmod=hashlib.sha1).digest()
    sign = base64.b64encode(hmac_res).decode('ascii')

    fields = {
        'access_key': access_key,
        'sample_bytes': sample_bytes,
        'timestamp': timestamp,
        'signature': sign,
        'data_type': data_type,
        "signature_version": signature_version,
    }
    # NOTE(review): the request goes over plain http; check whether the
    # project's host also accepts https before sending real credentials.
    res = post_multipart('http://' + host + http_url_file, fields,
                         {"sample": query_data})
    return json.loads(res)
# ACRCloud project settings; get_track_info() reads these as globals.
host = 'fill this in with your details'
access_key = 'fill this in with your details'
access_secret = 'fill this in with your details'

# Try the identification on a single file first. The context manager closes
# the file even if reading it fails.
with open("01 Push It Along.mp3", "rb") as f:
    sample = f.read()

# Left as a bare expression so that a notebook cell shows the parsed reply.
get_track_info(sample)
{'metadata': {'timestamp_utc': '2019-11-30 12:07:45',
'music': [{'label': 'Jive',
'play_offset_ms': 14480,
'external_ids': {'isrc': 'USJI10300139', 'upc': '012414133120'},
'artists': [{'name': 'A Tribe Called Quest'}],
'result_from': 1,
'acrid': '71678fbabfbf26d9d4ec1a85d0655631',
'title': 'Push It Along',
'duration_ms': 462200,
'album': {'name': "Peoples' Instinctive Travels & the Paths of Rhythm"},
'score': 100,
'external_metadata': {'deezer': {'track': {'name': 'Push It Along',
'id': '2467796'},
'artists': [{'name': 'A Tribe Called Quest', 'id': '1862'}],
'album': {'name': "Peoples' Instinctive Travels & the Paths of Rhythm",
'id': '242435'}},
'spotify': {'track': {'name': 'Push It Along',
'id': '6RwONnsgzkvNEwwxoPmg04'},
'artists': [{'name': 'A Tribe Called Quest',
'id': '09hVIj6vWgoCDtT03h8ZCa'}],
'album': {'name': "Peoples' Instinctive Travels & the Paths of Rhythm",
'id': '4Qt1ZvWZ3DoKDimDMesZd5'}},
'youtube': {'vid': 'qRPvKh4JCLg'},
'musicstory': {'track': {'id': '1718770'}}},
'release_date': '1990-04-11'}]},
'cost_time': 1.3910000324249,
'status': {'msg': 'Success', 'version': '1.0', 'code': 0},
'result_type': 0}
def _status_of(response):
    """Return the ``status`` dict of an ACRCloud reply, or {} if there is none."""
    return response.get('status', {}) if isinstance(response, dict) else {}


def _spotify_track_id(response):
    """Return the Spotify track id of the first match in an ACRCloud reply.

    Raises KeyError/IndexError/TypeError when the reply contains no match or
    the match carries no Spotify metadata.
    """
    return response['metadata']['music'][0][
        'external_metadata']['spotify']['track']['id']


directory = '/path/to/your/mp3s/and/m4as'

# Maps each local file path to the Spotify track id it was identified as.
ids = {}

# Every file below `directory` whose name ends in "mp3" or "m4a".
mp3s = []
for root, dirs, files in os.walk(directory):
    for file in files:
        if file.endswith(('mp3', 'm4a')):
            mp3s.append(os.path.join(root, file))

for position, sound_file in enumerate(tqdm.tqdm_notebook(mp3s)):
    # Already identified, e.g. when the loop is re-run: nothing to do.
    if sound_file in ids:
        continue

    # Start each file without a reply, so that the error handling below can
    # never act on the reply that belonged to a previous file (or on an
    # undefined name when the very first request fails).
    parsed_resp = None
    error = None
    try:
        with open(sound_file, "rb") as f:
            sample = f.read()
        parsed_resp = get_track_info(sample)
        ids[sound_file] = _spotify_track_id(parsed_resp)
        continue
    except Exception as e:
        error = e

    # presumably status code 2004 means the service could not fingerprint
    # the upload -- confirm against the ACRCloud error-code list.
    if _status_of(parsed_resp).get('code') == 2004:
        try:
            # re-encode sample as mp3 and try once more
            audio = AudioSegment.from_file(sound_file, format=sound_file[-3:])
            audio.export("audio.mp3", format="mp3")
            with open("audio.mp3", "rb") as f:
                sample = f.read()
            parsed_resp = get_track_info(sample)
            ids[sound_file] = _spotify_track_id(parsed_resp)
            continue
        except Exception as e:
            # Best effort only: fall through and report the file as skipped.
            # `except Exception` (not a bare except) keeps Ctrl-C working.
            error = e

    message = _status_of(parsed_resp).get('msg', '')
    if 'limit exceeded' in message:
        # Stop the whole run once the quota is used up, and say where we got
        # to, instead of failing on every remaining file.
        print(
            f"{message}: Got to {position}"
        )
        break
    if not message or message == 'Success':
        # Either no reply arrived at all, or the track was recognised but
        # the reply lacked the Spotify id: the exception is the useful part.
        print(f'{error}: Skipping {sound_file}...')
    else:
        print(f"{message}: Skipping {sound_file}...")

Add the tracks to a new playlist in Spotify

For this, we need to install and import the Spotipy (not a typo) library.

import spotipy
import spotipy.util as util
# Spotify application credentials and redirect address; they are passed to
# util.prompt_for_user_token() to obtain an access token.
client_id = 'fill this in with your details'
client_secret = 'fill this in with your details'
redirect_uri = 'fill this in with your details'
# Spotify user whose playlists are listed and modified.
username = 'fill this in with your details'
# Name of the playlist to fill; existing playlists are matched on this name.
playlist_name = 'fill this in with your details'
def user_playlist_create(sp,
                         username,
                         playlist_name,
                         description='',
                         public=True):
    """Create a playlist for ``username`` and return the new playlist's id.

    Args:
        sp: Client object exposing ``_post(url, payload=...)``.
        username: User under whose account the playlist is created.
        playlist_name: Name of the new playlist.
        description: Optional playlist description.
        public: Whether the playlist is created as public.

    Returns:
        The ``'id'`` entry of the reply returned by ``sp._post``.
    """
    endpoint = "users/%s/playlists" % (username, )
    body = dict(name=playlist_name, public=public, description=description)
    created = sp._post(endpoint, payload=body)
    return created['id']
# Permission requested from the user. user_playlist_create() makes public
# playlists by default, so permission to modify public playlists is enough.
scope = 'playlist-modify-public'
token = util.prompt_for_user_token(username, scope, client_id, client_secret, redirect_uri)
if not token:
    # Without a token every later call would fail with a less obvious error.
    raise RuntimeError(f"Can't get a Spotify token for {username}")
sp = spotipy.Spotify(token)

# Reuse the playlist called `playlist_name` if there is one, else create it.
# NOTE(review): only the first page returned by user_playlists() is searched.
playlists = sp.user_playlists(username)
playlist_ids = [playlist['id'] for playlist in playlists['items'] if playlist['name'] == playlist_name]
if playlist_ids:
    playlist_id = playlist_ids[0]
else:
    playlist_id = user_playlist_create(sp, username, playlist_name)

# Upload in batches of at most 100 tracks. The first batch replaces whatever
# the playlist contained; every later batch is appended. Slicing by index
# ensures that the final, shorter batch is uploaded too.
tracks = list(ids.values())
replace = True
for start in range(0, len(tracks), 100):
    batch = tracks[start:start + 100]
    if replace:
        sp.user_playlist_replace_tracks(username, playlist_id, batch)
        replace = False
    else:
        sp.user_playlist_add_tracks(username, playlist_id, batch)