Mette a disposizione una api, per ottenere informazioni su brani, artisti, playlist e altro. In particolare mette a disposizione una serie di informazioni su ogni brano, di due tipi: Basso livello: dove fa un'analisi molto dettagliata su ogni secondo del brano, con pitch, timbro, beats e altro. Alto livello: una serie di indici per classificare il brano, che forniscono informazioni più esplicite e immediate. Ci sono 7 indici che definiscono caratteristiche di un brano, ognuno può assumere valori da 0 a 1 compresi:
Ho costruito un piccolo front-end per fare il login con le credenziali di un client spotify, ed eseguire le query e ottenere una lista dei primi 10 risultati. Cliccando sul titolo, si va alla pagina spotify corrispondente. Possiamo vedere il risultato come json, e inviarlo a logstash. Il codice python è servito da flask, sulla porta 5001.
Dietro la pagna web, sta uno script producer.py, che esegue le query a spotify.
Per eseguire richieste all'api di spotify ho usato una libreria apposita di python: spotipy.
Prima di poter fare richieste però è necessario prima però creare un'applicazione sulla dashboard di spotify developer, e connettere lo script ad essa con client id e secret.
Successivamente si possono effettuare richieste del tipo:
sp.search("Timber", limit=4)
Restituisce un json con i primi 4 risultati della ricerca "Timber"
sp.audio_features([track_id])
Restituisce un oggetto json con quegli indici di features relativi all'id del brano inserito
Quindi costruisco un array di oggetti json del tipo:
{
"name": "nome brano",
"playlist": "nome playlist di appartenenza (se presente)"
"track_id": "id di spotify del brano",
"speechiness": valore,
"energy": valore,
..
}
Per mandarlo a Logstash con tcp su porta 6000
Nella sua configurazione definisco come input: tcp con porta 6000, e come output: stdout e kafka sul topic: topic_1
Rende disponibili i dati sulla porta 9092 e topic topic_1
Usando pyspark, la libreria per spark di python. Creo un input structured stream da kafka, e organizzo i messaggi in un dataframe. Creo 2 dataframe di training, nel primo (train1 ) carico il dataset csv dalle top canzoni su spotify della settimana e della giornata, per un totale di 150 brani. Il secondo dataframe di training (train_inverse), invece lo creo basandomi sul primo, ad ogni valore mappo 1 - valore in train1 in modo da avere un set fittizio di canzoni con valori esattamente opposti a quelle più popolari. Quindi aggiungo a entrambi una colonna impostata a 1 per tutto train1 e a 0 per tutto train_inverse
Per ogni brano o insieme di brani in arrivo, eseguo le operazioni in pipeline, che aggiungeranno la colonna di prediction, che avrà un valore in base a quanto gli indici di speechness, valence, ecc.. saranno simili a quelli delle top songs.
Quindi invio il risultato ad elasticsearch con:
(df.writeStream
.option("checkpointLocation", "/temp/tmp/checkpoints")
.format("es")
.start(es_index)
.awaitTermination())
Dalla directory principale del progetto, eseguire lo script ./start.sh
Esso scaricherà i file necessari, ovvero solo l'archivio di spark-hadoop, per costruire la docker image. Dopodichè eseguirà: docker-compose -d, avviando i container della ui, logstash, kafka, zookeeper, spark, elastic e kibana.
Se si vuole aggiornare il training set di canzoni, su cui spark basa le predizioni, basta uncommentare le righe specifiche in start.sh