Merge pull request #5 from navanchauhan/deepsource-transform-8bc97824

Format code with Black
This commit is contained in:
Navan Chauhan 2020-07-07 20:27:58 +05:30 committed by GitHub
commit 4edd4d1bd2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 311 additions and 251 deletions

72
main.py
View File

@ -16,11 +16,15 @@ import time
version = 2.0 version = 2.0
style = False style = False
text = '| V A P O R W A V E || G E N E R A T O R |' text = "| V A P O R W A V E || G E N E R A T O R |"
parser = argparse.ArgumentParser(description=text) parser = argparse.ArgumentParser(description=text)
parser.add_argument("-M", "--music", help="generate v a p o r w a v e music", action="store_true") parser.add_argument(
parser.add_argument("-P", "--picture", help="generate VHS Style image", action="store_true") "-M", "--music", help="generate v a p o r w a v e music", action="store_true"
)
parser.add_argument(
"-P", "--picture", help="generate VHS Style image", action="store_true"
)
parser.add_argument("-V", "--video", help="VHS Style Video", action="store_true") parser.add_argument("-V", "--video", help="VHS Style Video", action="store_true")
parser.add_argument("-v", "--version", help="show program version", action="store_true") parser.add_argument("-v", "--version", help="show program version", action="store_true")
parser.add_argument("-i", "--input") parser.add_argument("-i", "--input")
@ -51,7 +55,15 @@ else:
exit exit
MAX_DURATION = 600 # In-case the program finds a compilation MAX_DURATION = 600 # In-case the program finds a compilation
youtube_urls = ('youtube.com', 'https://www.youtube.com/', 'http://www.youtube.com/', 'http://youtu.be/', 'https://youtu.be/', 'youtu.be') youtube_urls = (
"youtube.com",
"https://www.youtube.com/",
"http://www.youtube.com/",
"http://youtu.be/",
"https://youtu.be/",
"youtu.be",
)
def download_file(query, request_id=1): def download_file(query, request_id=1):
"""Returns audio to the vapor command handler """Returns audio to the vapor command handler
@ -63,16 +75,18 @@ def download_file(query,request_id=1):
Query can be YouTube link. Query can be YouTube link.
""" """
ydl_opts = { ydl_opts = {
'quiet': 'True', "quiet": "True",
'format': 'bestaudio/best', "format": "bestaudio/best",
'outtmpl': str(request_id) +'.%(ext)s', "outtmpl": str(request_id) + ".%(ext)s",
'prefer_ffmpeg': 'True', "prefer_ffmpeg": "True",
'noplaylist': 'True', "noplaylist": "True",
'postprocessors': [{ "postprocessors": [
'key': 'FFmpegExtractAudio', {
'preferredcodec': 'wav', "key": "FFmpegExtractAudio",
'preferredquality': '192', "preferredcodec": "wav",
}], "preferredquality": "192",
}
],
} }
original_path = str(request_id) + ".wav" original_path = str(request_id) + ".wav"
@ -82,8 +96,12 @@ def download_file(query,request_id=1):
if not query.lower().startswith((youtube_urls)): if not query.lower().startswith((youtube_urls)):
# search for youtube videos matching query # search for youtube videos matching query
query_string = urllib.parse.urlencode({"search_query": query}) query_string = urllib.parse.urlencode({"search_query": query})
html_content = urllib.request.urlopen("http://www.youtube.com/results?" + query_string) html_content = urllib.request.urlopen(
search_results = re.findall(r'href=\"\/watch\?v=(.{11})', html_content.read().decode()) "http://www.youtube.com/results?" + query_string
)
search_results = re.findall(
r"href=\"\/watch\?v=(.{11})", html_content.read().decode()
)
info = False info = False
# find video that fits max duration # find video that fits max duration
@ -94,27 +112,27 @@ def download_file(query,request_id=1):
info = youtube_dl.YoutubeDL(ydl_opts).extract_info(url, download=False) info = youtube_dl.YoutubeDL(ydl_opts).extract_info(url, download=False)
except Exception as e: except Exception as e:
logger.error(e) logger.error(e)
raise ValueError('Could not get information about video.') raise ValueError("Could not get information about video.")
full_title = info['title'] full_title = info["title"]
if (info['duration'] < MAX_DURATION and info['duration'] >= 5): if info["duration"] < MAX_DURATION and info["duration"] >= 5:
# get first video that fits the limit duration # get first video that fits the limit duration
logger.info("Got video: " + str(full_title)) logger.info("Got video: " + str(full_title))
file_title = info['title'] file_title = info["title"]
break break
# if we ran out of urls, return error # if we ran out of urls, return error
if (not info): if not info:
raise ValueError('Could not find a video.') raise ValueError("Could not find a video.")
# query was a youtube link # query was a youtube link
else: else:
logger.info("Query was a YouTube URL.") logger.info("Query was a YouTube URL.")
url = query url = query
info = youtube_dl.YoutubeDL(ydl_opts).extract_info(url, download=False) info = youtube_dl.YoutubeDL(ydl_opts).extract_info(url, download=False)
file_title = info['title'] file_title = info["title"]
# check if video fits limit duration # check if video fits limit duration
if (info['duration'] < 5 or info['duration'] > MAX_DURATION): if info["duration"] < 5 or info["duration"] > MAX_DURATION:
raise ValueError('Video is too short. Need 5 seconds or more.') raise ValueError("Video is too short. Need 5 seconds or more.")
# download video and extract audio # download video and extract audio
logger.info("Downloading video...") logger.info("Downloading video...")
@ -123,7 +141,7 @@ def download_file(query,request_id=1):
ydl.download([url]) ydl.download([url])
except Exception as e: except Exception as e:
logger.error(e) logger.error(e)
raise ValueError('Could not download ' + str(full_title) + '.') raise ValueError("Could not download " + str(full_title) + ".")
return original_path, file_title return original_path, file_title
@ -137,7 +155,6 @@ def gen_vapor(filePath, title):
os.system("mkdir download/") os.system("mkdir download/")
os.system("mkdir beats/") os.system("mkdir beats/")
# Download the youtube query's first result. Might be wrong but YOLO # Download the youtube query's first result. Might be wrong but YOLO
# YTDownloader.download_wav_to_samp2(query) # YTDownloader.download_wav_to_samp2(query)
@ -170,4 +187,3 @@ elif picture:
generateVHSStyle(query, "out.jpg") generateVHSStyle(query, "out.jpg")
elif video: elif video:
VHS_Vid(query, outfile) VHS_Vid(query, outfile)

View File

@ -35,11 +35,11 @@ def hueChange(img, offset):
g_data = [] g_data = []
b_data = [] b_data = []
offset = offset/100. offset = offset / 100.0
for rd, gr, bl in zip(r.getdata(), g.getdata(), b.getdata()): for rd, gr, bl in zip(r.getdata(), g.getdata(), b.getdata()):
h, s, v = colorsys.rgb_to_hsv(rd / 255.0, bl / 255.0, gr / 255.0) h, s, v = colorsys.rgb_to_hsv(rd / 255.0, bl / 255.0, gr / 255.0)
rgb = colorsys.hsv_to_rgb(h, s + offset, v) rgb = colorsys.hsv_to_rgb(h, s + offset, v)
rd, bl, gr = [int(x*255.) for x in rgb] rd, bl, gr = [int(x * 255.0) for x in rgb]
r_data.append(rd) r_data.append(rd)
g_data.append(gr) g_data.append(gr)
b_data.append(bl) b_data.append(bl)
@ -47,12 +47,21 @@ def hueChange(img, offset):
r.putdata(r_data) r.putdata(r_data)
g.putdata(g_data) g.putdata(g_data)
b.putdata(b_data) b.putdata(b_data)
return Image.merge('RGB',(r,g,b)) return Image.merge("RGB", (r, g, b))
def decision(probability): def decision(probability):
return random.random() < probability return random.random() < probability
def mod_image_repeat_rows(imgname, chance_of_row_repeat=0, max_row_repeats=0, min_row_repeats=0, save=True, out_name="image.jpg"):
def mod_image_repeat_rows(
imgname,
chance_of_row_repeat=0,
max_row_repeats=0,
min_row_repeats=0,
save=True,
out_name="image.jpg",
):
img = Image.open(imgname) img = Image.open(imgname)
pixels = img.load() pixels = img.load()
width, height = img.size width, height = img.size
@ -102,22 +111,35 @@ def add_date(img_path, out_name="image.jpg", bottom_offset=0):
width, height = img.size width, height = img.size
draw = ImageDraw.Draw(img) draw = ImageDraw.Draw(img)
font = ImageFont.truetype("src/VCR_OSD_MONO_1.001.ttf", 64) font = ImageFont.truetype("src/VCR_OSD_MONO_1.001.ttf", 64)
draw.text((corner_offset, (height-150-bottom_offset)), date_str_1, (255, 255, 255), font=font) draw.text(
draw.text((corner_offset, (height-75)-bottom_offset), date_str_2, (255, 255, 255), font=font) (corner_offset, (height - 150 - bottom_offset)),
date_str_1,
(255, 255, 255),
font=font,
)
draw.text(
(corner_offset, (height - 75) - bottom_offset),
date_str_2,
(255, 255, 255),
font=font,
)
draw.text((corner_offset, 25), "|| PAUSE", (255, 255, 255), font=font) draw.text((corner_offset, 25), "|| PAUSE", (255, 255, 255), font=font)
img.save(out_name) img.save(out_name)
def add_img_noise(imgpath, intensity=1, out_name="image.jpg"): def add_img_noise(imgpath, intensity=1, out_name="image.jpg"):
img = imageio.imread(imgpath, pilmode='RGB') img = imageio.imread(imgpath, pilmode="RGB")
noise1 = img + intensity * img.std() * np.random.random(img.shape) noise1 = img + intensity * img.std() * np.random.random(img.shape)
imageio.imwrite(out_name, noise1) imageio.imwrite(out_name, noise1)
def offset_hue(image, out_name="image.jpg"): def offset_hue(image, out_name="image.jpg"):
if isinstance(image, str): if isinstance(image, str):
image = Image.open(image) image = Image.open(image)
image = hueChange(image, 25) image = hueChange(image, 25)
image.save(out_name) image.save(out_name)
def build_background(out_name, taskbar_offset): def build_background(out_name, taskbar_offset):
# getImage(out_name="start.jpg") # getImage(out_name="start.jpg")
offset_hue("start.jpg", out_name="saturated.jpg") offset_hue("start.jpg", out_name="saturated.jpg")
@ -125,11 +147,13 @@ def build_background(out_name, taskbar_offset):
add_img_noise("shifted.jpg", out_name="noisy.jpg") add_img_noise("shifted.jpg", out_name="noisy.jpg")
add_date("noisy.jpg", out_name=out_name, bottom_offset=taskbar_offset) add_date("noisy.jpg", out_name=out_name, bottom_offset=taskbar_offset)
""" """
if __name__ == "__main__": if __name__ == "__main__":
build_background("bkg.jpg", 25) build_background("bkg.jpg", 25)
""" """
def generateVHSStyle(infile, outfile, silence=False): def generateVHSStyle(infile, outfile, silence=False):
if silence: if silence:
cut_rows = bool(random.getrandbits(1)) cut_rows = bool(random.getrandbits(1))
@ -165,4 +189,5 @@ def generateVHSStyle(infile, outfile, silence=False):
os.remove("saturated.jpg") os.remove("saturated.jpg")
os.remove("noisy.jpg") os.remove("noisy.jpg")
# generateVHSStyle("s.jpg","o.jpg") # generateVHSStyle("s.jpg","o.jpg")

View File

@ -8,6 +8,7 @@ import logzero
from logzero import logger from logzero import logger
from logzero import setup_logger from logzero import setup_logger
def SaveVid(path): def SaveVid(path):
vidObj = cv2.VideoCapture(path) vidObj = cv2.VideoCapture(path)
count = 0 count = 0
@ -18,6 +19,7 @@ def SaveVid(path):
# os.rename("frames/"+str(count)+".jpg", os.path.splitext("frames/"+str(count)+".jpg")[0]) # os.rename("frames/"+str(count)+".jpg", os.path.splitext("frames/"+str(count)+".jpg")[0])
count += 1 count += 1
def Style(pathToFrames): def Style(pathToFrames):
files = [f for f in os.listdir(pathToFrames) if isfile(join(pathToFrames, f))] files = [f for f in os.listdir(pathToFrames) if isfile(join(pathToFrames, f))]
count = 0 count = 0
@ -41,12 +43,25 @@ def Style(pathToFrames):
os.system(c) os.system(c)
os.chdir(cwd) os.chdir(cwd)
def generateVideo(outfile, path, infile): def generateVideo(outfile, path, infile):
frame_array = [] frame_array = []
files = [int(f) for f in os.listdir(path) if isfile(join(path, f))] files = [int(f) for f in os.listdir(path) if isfile(join(path, f))]
files.sort() files.sort()
duration = subprocess.check_output(['ffprobe', '-i', infile, '-show_entries', 'format=duration', '-v', 'quiet', '-of', 'csv=%s' % ("p=0")]) duration = subprocess.check_output(
[
"ffprobe",
"-i",
infile,
"-show_entries",
"format=duration",
"-v",
"quiet",
"-of",
"csv=%s" % ("p=0"),
]
)
fps = len(files) / float(duration) fps = len(files) / float(duration)
print("FPS", fps) print("FPS", fps)
@ -56,14 +71,14 @@ def generateVideo(outfile, path, infile):
height, width, layers = img.shape height, width, layers = img.shape
size = (width, height) size = (width, height)
frame_array.append(img) frame_array.append(img)
out = cv2.VideoWriter(outfile,cv2.VideoWriter_fourcc(*'MP4V'), fps, size) out = cv2.VideoWriter(outfile, cv2.VideoWriter_fourcc(*"MP4V"), fps, size)
for i in range(len(frame_array)): for i in range(len(frame_array)):
out.write(frame_array[i]) out.write(frame_array[i])
out.release() out.release()
def VHS_Vid(infile, outfile): def VHS_Vid(infile, outfile):
path = './frames/' path = "./frames/"
os.system("rm frames/*") os.system("rm frames/*")
os.system("mkdir frames") os.system("mkdir frames")
logger.info("Exctracting Frames") logger.info("Exctracting Frames")

View File

@ -11,6 +11,7 @@ from logzero import setup_logger
CONFIDENCE_THRESH = 0.02 CONFIDENCE_THRESH = 0.02
class VaporSong: class VaporSong:
# Slows down Track # Slows down Track
@ -23,22 +24,22 @@ class VaporSong:
# Adds Reverb # Adds Reverb
def reverbize(src, dest): def reverbize(src, dest):
cmd = "sox -G -D " + src + " " + dest + " reverb 100 fade 5 -0 7" # idk what this does tbh, https://stackoverflow.com/a/57767238/8386344 cmd = (
"sox -G -D " + src + " " + dest + " reverb 100 fade 5 -0 7"
) # idk what this does tbh, https://stackoverflow.com/a/57767238/8386344
os.system(cmd) os.system(cmd)
return dest return dest
# Crops "src" from "start" plus "start + dur" and return it in "dest" # Crops "src" from "start" plus "start + dur" and return it in "dest"
def crop(src, dest, start, dur): def crop(src, dest, start, dur):
cmd = "sox " + src + " " + dest + " trim " + " " + str(start) + " " + str(dur) cmd = "sox " + src + " " + dest + " trim " + " " + str(start) + " " + str(dur)
os.system(cmd) os.system(cmd)
# Randomly crops a part of the song of at most max_sec_len. # Randomly crops a part of the song of at most max_sec_len.
def random_crop(src, max_sec_len, dest): def random_crop(src, max_sec_len, dest):
out = subprocess.check_output(["soxi", "-D", src]).rstrip() out = subprocess.check_output(["soxi", "-D", src]).rstrip()
f_len = int(float(out)) f_len = int(float(out))
if (f_len <= max_sec_len): if f_len <= max_sec_len:
os.system("cp " + src + " " + dest) os.system("cp " + src + " " + dest)
return return
else: else:
@ -46,12 +47,11 @@ class VaporSong:
start = randint(0, start_region) start = randint(0, start_region)
VaporSong.crop(src, dest, start, max_sec_len) VaporSong.crop(src, dest, start, max_sec_len)
# Given a file, returns a list of [beats, confidence], executable based on audibo's test-beattracking.c # Given a file, returns a list of [beats, confidence], executable based on audibo's test-beattracking.c
# TODO: Move away from executable and use aubio's Python module # TODO: Move away from executable and use aubio's Python module
def fetchbeats(src): def fetchbeats(src):
beat_matrix = [] beat_matrix = []
if platform == 'darwin': if platform == "darwin":
beats = subprocess.check_output(["noah", "get-beats", src]).rstrip() beats = subprocess.check_output(["noah", "get-beats", src]).rstrip()
else: else:
beats = subprocess.check_output(["./get-beats", src]).rstrip() beats = subprocess.check_output(["./get-beats", src]).rstrip()
@ -69,8 +69,8 @@ class VaporSong:
split_files = [] split_files = []
for i in range(0, len(beat_matrix) - 1): for i in range(0, len(beat_matrix) - 1):
if(beat_matrix[i][1] > CONFIDENCE_THRESH): if beat_matrix[i][1] > CONFIDENCE_THRESH:
dur = (beat_matrix[i+1][0] - beat_matrix[i][0]) dur = beat_matrix[i + 1][0] - beat_matrix[i][0]
out = src.split(".")[0] + str(i) + ".wav" out = src.split(".")[0] + str(i) + ".wav"
VaporSong.crop(src, out, beat_matrix[i][0], dur) VaporSong.crop(src, out, beat_matrix[i][0], dur)
split_files.append(out) split_files.append(out)
@ -86,7 +86,9 @@ class VaporSong:
for sample in section: for sample in section:
tocomb.append(sample) tocomb.append(sample)
tocomb.append(dest) tocomb.append(dest)
tmpFileLimit = len(tocomb) + 256 # in case the program messes up, it does not actually frick up your system tmpFileLimit = (
len(tocomb) + 256
) # in case the program messes up, it does not actually frick up your system
n = str(tmpFileLimit) n = str(tmpFileLimit)
# logger.info("Setting file limit to ", n) # logger.info("Setting file limit to ", n)
os.system("ulimit -n " + n) os.system("ulimit -n " + n)
@ -99,23 +101,22 @@ class VaporSong:
sections = [] sections = []
beats = [4, 6, 8, 9] beats = [4, 6, 8, 9]
index = 0 index = 0
while(index != len(ary)): while index != len(ary):
current_beat = beats[randint(0, len(beats) - 1)] current_beat = beats[randint(0, len(beats) - 1)]
new_section = [] new_section = []
while((current_beat != 0) and (index != len(ary))): while (current_beat != 0) and (index != len(ary)):
new_section.append(ary[index]) new_section.append(ary[index])
current_beat -= 1 current_beat -= 1
index += 1 index += 1
sections.append(new_section) sections.append(new_section)
return sections return sections
# given a list of sections, selects some of them and duplicates them, perfect for that vaporwave looping effect # given a list of sections, selects some of them and duplicates them, perfect for that vaporwave looping effect
def dup_sections(sections): def dup_sections(sections):
new_section = [] new_section = []
for section in sections: for section in sections:
new_section.append(section) new_section.append(section)
if(randint(0,1) == 0): if randint(0, 1) == 0:
new_section.append(section) new_section.append(section)
return new_section return new_section
@ -124,10 +125,10 @@ class VaporSong:
def make_passages(sections): def make_passages(sections):
passages = [] passages = []
index = 0 index = 0
while(index != len(sections)): while index != len(sections):
passage_len = randint(1, 4) passage_len = randint(1, 4)
passage = [] passage = []
while(index != len(sections) and passage_len > 0): while index != len(sections) and passage_len > 0:
passage.append(sections[index]) passage.append(sections[index])
index += 1 index += 1
passage_len -= 1 passage_len -= 1
@ -139,11 +140,11 @@ class VaporSong:
def reorder_passages(passages): def reorder_passages(passages):
new_passages = [] new_passages = []
passage_count = randint(5, 12) passage_count = randint(5, 12)
while(passage_count != 0): while passage_count != 0:
passage = passages[randint(0, len(passages) - 1)] passage = passages[randint(0, len(passages) - 1)]
passage_count -= 1 passage_count -= 1
dup = randint(1, 4) dup = randint(1, 4)
while(dup != 0): while dup != 0:
dup -= 1 dup -= 1
new_passages.append(passage) new_passages.append(passage)
return new_passages return new_passages
@ -180,5 +181,8 @@ class VaporSong:
sectionflat = VaporSong.flatten(pasloop) sectionflat = VaporSong.flatten(pasloop)
logger.info("Mastering & Reverbing") logger.info("Mastering & Reverbing")
VaporSong.combine(sectionflat, "beats/out_norev.wav") VaporSong.combine(sectionflat, "beats/out_norev.wav")
VaporSong.reverbize("beats/out_norev.wav","./" + (re.sub(r"\W+|_", " ", title)).replace(" ","_") + ".wav") VaporSong.reverbize(
"beats/out_norev.wav",
"./" + (re.sub(r"\W+|_", " ", title)).replace(" ", "_") + ".wav",
)
logger.info("Generated V A P O R W A V E") logger.info("Generated V A P O R W A V E")