Initial Commit
This commit is contained in:
commit
eab831fe8b
|
@ -0,0 +1,2 @@
|
|||
*.pyc
|
||||
*.py~
|
|
@ -0,0 +1,47 @@
|
|||
# V A P O R W A V E
|
||||
A vaporwave music (+art, +video soon, I promise) generator bodged together using code from various sources ( Hopefully I have credited all of them in the source code). Runs on Python3
|
||||
|
||||
## Installation
|
||||
|
||||
This was tested on macOS Catalina ( so should work on almost all macOS versions).
|
||||
Windows is unsupported at this time ( I need to find a way to use aubio's python module)
|
||||
|
||||
### Dependencies
|
||||
|
||||
#### Linux
|
||||
|
||||
```
|
||||
sudo apt install ffmpeg libavl1 sox
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
#### macOS
|
||||
|
||||
Make sure you have brew installed
|
||||
|
||||
```
|
||||
brew install noah # I would have had to re-compile the executeable :(
|
||||
brew install sox
|
||||
pip install -r requirements.txt
|
||||
|
||||
## Usage
|
||||
|
||||
### YouTube URL
|
||||
```
|
||||
python3 main.py <YOUTUBE_URL>
|
||||
```
|
||||
### Song Title
|
||||
```
|
||||
python3 main.py Song Title
|
||||
```
|
||||
|
||||
## Bugs
|
||||
|
||||
This project is a result of bodging and therefore has tons of bugs which need to be ironed out
|
||||
|
||||
## To-Do
|
||||
|
||||
[ ] Move away from using os.system calls, and use Python modules instead ( Looking at you, Sox and aubio)
|
||||
[ ] Clean the Code
|
||||
[ ] Add Artwork Generator
|
||||
[ ] Add Video Generator
|
|
@ -0,0 +1,180 @@
|
|||
import os
|
||||
import subprocess
|
||||
import re
|
||||
from random import randint
|
||||
|
||||
import logzero
|
||||
from logzero import logger
|
||||
from logzero import setup_logger
|
||||
|
||||
CONFIDENCE_THRESH = 0.02
|
||||
|
||||
class VaporSong:
|
||||
|
||||
# Slows down Track
|
||||
|
||||
def slow_down(src, rate, dest):
|
||||
cmd = "sox -G -D " + src + " " + dest + " speed " + str(rate)
|
||||
os.system(cmd)
|
||||
return dest
|
||||
|
||||
# Adds Reverb
|
||||
|
||||
def reverbize(src, dest):
|
||||
cmd = "sox -G -D " + src + " " + dest + " reverb 100 fade 5 -0 7" # idk what this does tbh, https://stackoverflow.com/a/57767238/8386344
|
||||
os.system(cmd)
|
||||
return dest
|
||||
|
||||
|
||||
# Crops "src" from "start" plus "start + dur" and return it in "dest"
|
||||
def crop(src,dest,start,dur):
|
||||
cmd = "sox " + src + " " + dest + " trim " + " " + str(start) + " " + str(dur)
|
||||
os.system(cmd)
|
||||
|
||||
|
||||
# Randomly crops a part of the song of at most max_sec_len.
|
||||
def random_crop(src, max_sec_len, dest):
|
||||
out = subprocess.check_output(["soxi","-D",src]).rstrip()
|
||||
f_len = int(float(out))
|
||||
if (f_len <= max_sec_len):
|
||||
os.system("cp " + src + " " + dest)
|
||||
return
|
||||
else:
|
||||
start_region = f_len - max_sec_len
|
||||
start = randint(0,start_region)
|
||||
VaporSong.crop(src,dest,start,max_sec_len)
|
||||
|
||||
|
||||
# Given a file, returns a list of [beats, confidence], executable based on audibo's test-beattracking.c
|
||||
# TODO: Move away from executable and use aubio's Python module
|
||||
def fetchbeats(src):
|
||||
beat_matrix = []
|
||||
if os.name == 'posix':
|
||||
beats = subprocess.check_output(["noah", "get-beats",src]).rstrip()
|
||||
else:
|
||||
beats = subprocess.check_output(["get-beats",src]).rstrip()
|
||||
beats_ary = beats.splitlines()
|
||||
for i in beats_ary:
|
||||
record = i.split()
|
||||
record[0] = float(record[0])/1000.0
|
||||
record[1] = float(record[1])
|
||||
beat_matrix.append(record)
|
||||
return beat_matrix
|
||||
|
||||
# Splits an audio file into beats according to beat_matrix list
|
||||
|
||||
def split_beat(src,beat_matrix):
|
||||
split_files = []
|
||||
for i in range(0,len(beat_matrix)-1):
|
||||
|
||||
if(beat_matrix[i][1] > CONFIDENCE_THRESH):
|
||||
dur = (beat_matrix[i+1][0] - beat_matrix[i][0])
|
||||
out = src.split(".")[0]+str(i)+".wav"
|
||||
VaporSong.crop(src,out,beat_matrix[i][0],dur)
|
||||
split_files.append(out)
|
||||
return split_files
|
||||
|
||||
# Combines a list of sections
|
||||
|
||||
def combine(sections,dest):
|
||||
tocomb = []
|
||||
tocomb.append("sox")
|
||||
tocomb.append("-G")
|
||||
for section in sections:
|
||||
for sample in section:
|
||||
tocomb.append(sample)
|
||||
tocomb.append(dest)
|
||||
tmpFileLimit = len(tocomb) + 256 # in case the program messes up, it does not actually frick up your system
|
||||
os.system("ulimit -n " + str(tmpFileLimit))
|
||||
subprocess.check_output(tocomb)
|
||||
return dest
|
||||
|
||||
# Arbitrarily groups beats into lists of 4, 6, 8, or 9, perfect for looping.
|
||||
|
||||
def generate_sections(ary):
|
||||
sections = []
|
||||
beats = [4,6,8,9]
|
||||
index = 0
|
||||
while(index != len(ary)):
|
||||
current_beat = beats[randint(0,len(beats)-1)]
|
||||
new_section = []
|
||||
while((current_beat != 0) and (index != len(ary))):
|
||||
new_section.append(ary[index])
|
||||
current_beat -= 1
|
||||
index += 1
|
||||
sections.append(new_section)
|
||||
return sections
|
||||
|
||||
|
||||
# given a list of sections, selects some of them and duplicates them, perfect for that vaporwave looping effect
|
||||
def dup_sections(sections):
|
||||
new_section = []
|
||||
for section in sections:
|
||||
new_section.append(section)
|
||||
if(randint(0,1) == 0):
|
||||
new_section.append(section)
|
||||
return new_section
|
||||
|
||||
# a passage is a list of sections. This takes some sections and groups them into passages.
|
||||
|
||||
def make_passages(sections):
|
||||
passages = []
|
||||
index = 0
|
||||
while(index != len(sections)):
|
||||
passage_len = randint(1,4)
|
||||
passage = []
|
||||
while(index != len(sections) and passage_len > 0):
|
||||
passage.append(sections[index])
|
||||
index += 1
|
||||
passage_len -= 1
|
||||
passages.append(passage)
|
||||
return passages
|
||||
|
||||
# Given all of our passages, picks some of them and inserts them into a list some number of times.
|
||||
|
||||
def reorder_passages(passages):
|
||||
new_passages = []
|
||||
passage_count = randint(5,12)
|
||||
while(passage_count != 0):
|
||||
passage = passages[randint(0,len(passages)-1)]
|
||||
passage_count -= 1
|
||||
dup = randint(1,4)
|
||||
while(dup != 0):
|
||||
dup -= 1
|
||||
new_passages.append(passage)
|
||||
return new_passages
|
||||
|
||||
# converts a list of passages to a list of sections.
|
||||
|
||||
def flatten(passages):
|
||||
sections = []
|
||||
for passage in passages:
|
||||
for section in passage:
|
||||
sections.append(section)
|
||||
return sections
|
||||
|
||||
# It's all coming together
|
||||
|
||||
def vaporize_song(fname, title):
|
||||
logger.info("Slowing down the music")
|
||||
VaporSong.slow_down(fname, 0.7, "beats/out.wav")
|
||||
#logger.info("Cropping")
|
||||
#VaporSong.random_crop("beats/out.wav",150,"beats/outcrop.wav")
|
||||
logger.info("Doing Beat Analysis")
|
||||
bm = VaporSong.fetchbeats("beats/out.wav")
|
||||
logger.info("Split into beats")
|
||||
splitd = VaporSong.split_beat("beats/out.wav",bm)
|
||||
#group beats to sections
|
||||
logger.info("Divide into sections")
|
||||
sections = VaporSong.generate_sections(splitd)
|
||||
logger.info("Duping Sections")
|
||||
sdup = VaporSong.dup_sections(sections)
|
||||
# group sections into passages
|
||||
paslist = VaporSong.make_passages(sdup)
|
||||
# reorder packages
|
||||
pasloop = VaporSong.reorder_passages(paslist)
|
||||
sectionflat = VaporSong.flatten(pasloop)
|
||||
logger.info("Mastering & Reverbing")
|
||||
VaporSong.combine(sectionflat,"beats/out_norev.wav")
|
||||
VaporSong.reverbize("beats/out_norev.wav","./" + (re.sub(r"\W+|_", " ", title)).replace(" ","_") + ".wav")
|
||||
logger.info("Generated V A P O R W A V E")
|
|
@ -0,0 +1,127 @@
|
|||
from VaporSong import VaporSong
|
||||
import os
|
||||
import sys
|
||||
import youtube_dl
|
||||
import logzero
|
||||
from logzero import logger
|
||||
from logzero import setup_logger
|
||||
import re
|
||||
import urllib.request
|
||||
import urllib.parse
|
||||
|
||||
import time
|
||||
|
||||
MAX_DURATION = 600 # In-case the program finds a compilation
|
||||
youtube_urls = ('youtube.com', 'https://www.youtube.com/', 'http://www.youtube.com/', 'http://youtu.be/', 'https://youtu.be/', 'youtu.be')
|
||||
|
||||
def download_file(query,request_id=1):
|
||||
"""Returns audio to the vapor command handler
|
||||
|
||||
Searches YouTube for 'query', finds first match that has
|
||||
duration under the limit, download video with youtube_dl
|
||||
and extract .wav audio with ffmpeg.
|
||||
|
||||
Query can be YouTube link.
|
||||
"""
|
||||
ydl_opts = {
|
||||
'quiet': 'True',
|
||||
'format': 'bestaudio/best',
|
||||
'outtmpl': str(request_id) +'.%(ext)s',
|
||||
'prefer_ffmpeg': 'True',
|
||||
'noplaylist': 'True',
|
||||
'postprocessors': [{
|
||||
'key': 'FFmpegExtractAudio',
|
||||
'preferredcodec': 'wav',
|
||||
'preferredquality': '192',
|
||||
}],
|
||||
}
|
||||
|
||||
original_path = str(request_id) + ".wav"
|
||||
file_title = ""
|
||||
|
||||
# check if query is youtube url
|
||||
if not query.lower().startswith((youtube_urls)):
|
||||
# search for youtube videos matching query
|
||||
query_string = urllib.parse.urlencode({"search_query" : query})
|
||||
html_content = urllib.request.urlopen("http://www.youtube.com/results?" + query_string)
|
||||
search_results = re.findall(r'href=\"\/watch\?v=(.{11})', html_content.read().decode())
|
||||
info = False
|
||||
|
||||
# find video that fits max duration
|
||||
logger.info("Get video information...")
|
||||
for url in search_results:
|
||||
# check for video duration
|
||||
try:
|
||||
info = youtube_dl.YoutubeDL(ydl_opts).extract_info(url,download = False)
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
raise ValueError('Could not get information about video.')
|
||||
full_title = info['title']
|
||||
if (info['duration'] < MAX_DURATION and info['duration'] >= 5):
|
||||
# get first video that fits the limit duration
|
||||
logger.info("Got video: " + str(full_title))
|
||||
file_title = info['title']
|
||||
break
|
||||
|
||||
# if we ran out of urls, return error
|
||||
if (not info):
|
||||
raise ValueError('Could not find a video.')
|
||||
|
||||
# query was a youtube link
|
||||
else:
|
||||
logger.info("Query was a YouTube URL.")
|
||||
url = query
|
||||
info = youtube_dl.YoutubeDL(ydl_opts).extract_info(url,download = False)
|
||||
file_title = info['title']
|
||||
# check if video fits limit duration
|
||||
if (info['duration'] < 5 or info['duration'] > MAX_DURATION):
|
||||
raise ValueError('Video is too short. Need 5 seconds or more.')
|
||||
|
||||
# download video and extract audio
|
||||
logger.info("Downloading video...")
|
||||
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
|
||||
try:
|
||||
ydl.download([url])
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
raise ValueError('Could not download ' + str(full_title) + '.')
|
||||
|
||||
return original_path, file_title
|
||||
|
||||
|
||||
def gen_vapor(filePath, title):
|
||||
# Delete stuff if there is anything left over.
|
||||
os.system("rm -r download/")
|
||||
os.system("rm -r beats/")
|
||||
|
||||
# Make the proper folders for intermediate steps
|
||||
os.system("mkdir download/")
|
||||
os.system("mkdir beats/")
|
||||
|
||||
|
||||
# Download the youtube query's first result. Might be wrong but YOLO
|
||||
#YTDownloader.download_wav_to_samp2(query)
|
||||
|
||||
# For every song in download folder(just one for now)
|
||||
"""
|
||||
for fs in os.listdir("download/"):
|
||||
# Slow down the song.
|
||||
VaporSong.vaporize_song(query,"download/"+fs)
|
||||
pass
|
||||
# When we are finished, delete the old folders.
|
||||
"""
|
||||
VaporSong.vaporize_song(filePath, title)
|
||||
|
||||
os.system("rm -r download/")
|
||||
os.system("rm -r beats/")
|
||||
|
||||
|
||||
|
||||
## Makes this a command line tool: disable when we get the webserver going
|
||||
sys.argv.pop(0)
|
||||
query = ""
|
||||
for s in sys.argv:
|
||||
query = query + s
|
||||
|
||||
name, title = download_file(query)
|
||||
gen_vapor(name, title)
|
Loading…
Reference in New Issue