AutoSafe/main.py

343 lines
9.7 KiB
Python
Raw Normal View History

2019-05-24 18:47:13 +01:00
"""AutoSafe
Modular Road-Safety Toolkit
2019-05-11 17:08:22 +01:00
2019-05-24 18:47:13 +01:00
"""
#pylint: disable=C0103,C0111
import argparse
import time
2019-05-11 17:08:22 +01:00
import overpy
import simplejson as sjson
from scipy.spatial import distance as dist
from imutils.video import VideoStream
from imutils import face_utils
2019-05-24 18:47:13 +01:00
#import playsound
2019-05-11 17:08:22 +01:00
import imutils
import dlib
import cv2
2019-05-24 18:47:13 +01:00
#from uber_rides.session import Session
#from uber_rides.client import UberRidesClient
2019-05-11 17:08:22 +01:00
import tweepy
import requests
#import geocoder
#import obd
######################
# Defining Variables #
######################
2019-05-24 18:47:13 +01:00
radius = str(100) # Radius for maxspeed function. taken as a string because gets concatenated
2019-05-11 17:08:22 +01:00
################
# Dummy Values #
################
#Latitude
glat = 28.544565 #28.546519
#Longitude
glng = 77.193320 #77.179248
# Commented this as we are not on a road I guess?
#g = geocoder.ip('me')
#print(g.latlng)
#####################
# Real Time Values #
#####################
#glat = g.lat
#glng = g.lng
###################
# Dummy OBD Setup #
###################
##############################################################
# What the commands are doing: #
# It setups and asynchronous watch over the speed of the car #
# this means that the speed's data is constantly updated #
# This makes the while loop work #
##############################################################
#connection = obd.Async() # auto-connects to USB or RF port
#connection.watch(obd.commands.SPEED) # select an OBD command (sensor)
#connections.start()
#carSpeed = connection.query(obd.commands.SPEED) # send the command, and parse the response
#carSpeed = 30
#####################
# Tweeting Function #
#####################
def get_api(cfg):
auth = tweepy.OAuthHandler(cfg['consumer_key'], cfg['consumer_secret'])
auth.set_access_token(cfg['access_token'], cfg['access_token_secret'])
return tweepy.API(auth)
2019-05-24 18:47:13 +01:00
def tweetMe():
cfg = {
"consumer_key":"knQFpTnjuSvr6OxYwebt3wyrd",
"consumer_secret":"Mhex3oRkmaF7lD3hoMvHpAD6ctW0ugKYCopTlhc0JzOLOMIZ0w",
"access_token":"2846631344-wEozinvHfEIFxFVy51I6te8SrN5OTFtU00wxsiz",
"access_token_secret":"Nfx1U8a2TjAQXFLBrJIyy2p36sjBGAWFIthLc1cIoI56U"
}
2019-05-11 17:08:22 +01:00
api = get_api(cfg)
headers = {
'Accept': 'application/json',
2019-05-24 18:47:13 +01:00
'user-key': 'a530c1424d9abe5442fa22f77ce03d25',
2019-05-11 17:08:22 +01:00
}
params = (
('lat', '28.546519'),
('lon', '77.179248'),
)
2019-05-24 18:47:13 +01:00
url = 'https://developers.zomato.com/api/v2.1/geocode'
response = requests.get(url, headers=headers, params=params)
2019-05-11 17:08:22 +01:00
loc = response.json()['location']['title']
2019-05-24 18:47:13 +01:00
tweet = ("Stay Alert! Sudden braking at: ", loc)
status = api.update_status(status=tweet)
print(status)
2019-05-11 17:08:22 +01:00
################################
# Fetching Details from Zomato #
################################
def getRes():
2019-05-24 18:47:13 +01:00
res = []
2019-05-11 17:08:22 +01:00
headers = {
'Accept': 'application/json',
'user-key': 'a530c1424d9abe5442fa22f77ce03d25',
}
params = (
('lat', '28.546519'),
('lon', '77.179248'),
)
2019-05-24 18:47:13 +01:00
url = 'https://developers.zomato.com/api/v2.1/geocode'
response = requests.get(url, headers=headers, params=params)
2019-05-11 17:08:22 +01:00
res = response.json()['popularity']['nearby_res']
return res
def getDetails(res):
headers = {
'Accept': 'application/json',
'user-key': 'a530c1424d9abe5442fa22f77ce03d25',
}
url = "https://developers.zomato.com/api/v2.1/restaurant?res_id=" + str(res[0])
newResponse = requests.get(url, headers=headers)
2019-05-24 18:47:13 +01:00
#newRes = []
2019-05-11 17:08:22 +01:00
resName = newResponse.json()['name']
resAddress = newResponse.json()['location']['address']
print("You are feeling sleepy, why don't you take a break?\n")
2019-05-24 18:47:13 +01:00
print("Your nearest eatery is " + resName, "\n")
print(resName + " is at " + resAddress, "\n")
2019-05-11 17:08:22 +01:00
def zomato():
res = getRes()
getDetails(res)
############################
# Combatting Drunk Driving #
############################
def drunk():
2019-05-24 18:47:13 +01:00
print("Your appear to be around places which sell \n alcohol, take the breathalyser test")
2019-05-11 17:08:22 +01:00
2019-05-24 18:47:13 +01:00
bac_raw = str(open("./files/bac.txt", "r").read())
2019-05-11 17:08:22 +01:00
bac = float(float(bac_raw)/100)
print(bac)
2019-05-24 18:47:13 +01:00
if bac >= 0.08:
2019-05-11 17:08:22 +01:00
print("Please Do Not Drive!\n")
print("I can call a cab if you want\n")
print("")
answer = input("Say No, to disagree, else I'll book the cab: \n")
2019-05-24 18:47:13 +01:00
if answer == "no":
2019-05-11 17:08:22 +01:00
print("You are not fit to drive")
print("Text message to emergency contact sent")
print("Your cab has been booked, thank you for not driving")
print("have a safe journey!")
2019-05-24 18:47:13 +01:00
2019-05-11 17:08:22 +01:00
def sound_alarm():
print('You Sleep You Lose')
def eye_aspect_ratio(eye):
# Computes the euclidean distances between the two sets of eyes
A = dist.euclidean(eye[1], eye[5])
B = dist.euclidean(eye[2], eye[4])
C = dist.euclidean(eye[0], eye[3])
2019-05-24 18:47:13 +01:00
2019-05-11 17:08:22 +01:00
ear = (A + B) / (2.0 * C)
return ear
shape_predictor = "./files/shape_predictor_68_face_landmarks.dat"
EYE_AR_THRESH = 0.2 # If the EAR goes < this for 48 frames, it is counted as drowsiness
EYE_AR_CONSEC_FRAMES = 24
2019-05-24 18:47:13 +01:00
2019-05-11 17:08:22 +01:00
########################
# Drowsiness Detection #
########################
def sleepiness():
2019-05-24 18:47:13 +01:00
COUNTER = 0
#ALERT = False
2019-05-11 17:08:22 +01:00
print("Initialising Facial Landmark Predictor...")
detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor(shape_predictor)
(lStart, lEnd) = face_utils.FACIAL_LANDMARKS_IDXS["left_eye"]
(rStart, rEnd) = face_utils.FACIAL_LANDMARKS_IDXS["right_eye"]
print("Starting Video Stream...")
vs = VideoStream(src=0).start()
time.sleep(1.0)
while True:
frame = vs.read()
frame = imutils.resize(frame, width=450)
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
2019-05-24 18:47:13 +01:00
2019-05-11 17:08:22 +01:00
rects = detector(gray, 0)
for rect in rects:
shape = predictor(gray, rect)
shape = face_utils.shape_to_np(shape)
2019-05-24 18:47:13 +01:00
2019-05-11 17:08:22 +01:00
leftEye = shape[lStart:lEnd]
rightEye = shape[rStart:rEnd]
leftEAR = eye_aspect_ratio(leftEye)
rightEAR = eye_aspect_ratio(rightEye)
2019-05-24 18:47:13 +01:00
2019-05-11 17:08:22 +01:00
ear = (leftEAR + rightEAR) / 2.0
leftEyeHull = cv2.convexHull(leftEye)
rightEyeHull = cv2.convexHull(rightEye)
cv2.drawContours(frame, [leftEyeHull], -1, (0, 255, 0), 1)
cv2.drawContours(frame, [rightEyeHull], -1, (0, 255, 0), 1)
if ear < EYE_AR_THRESH:
COUNTER += 1
2019-05-24 18:47:13 +01:00
2019-05-11 17:08:22 +01:00
# if the eyes were closed for a sufficient number of
# then sound the alarm
if COUNTER >= EYE_AR_CONSEC_FRAMES:
# if the alarm is not on, turn it on
if not ALARM_ON:
ALARM_ON = True
2019-05-24 18:47:13 +01:00
2019-05-11 17:08:22 +01:00
# draw an alarm on the frame
cv2.putText(frame, "Sleepiness Detected!", (10, 30),
2019-05-24 18:47:13 +01:00
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
2019-05-11 17:08:22 +01:00
print("Sleepiness Detected!")
2019-05-24 18:47:13 +01:00
print("\n \n \n")
2019-05-11 17:08:22 +01:00
zomato()
time.sleep(5)
exit()
2019-05-24 18:47:13 +01:00
2019-05-11 17:08:22 +01:00
else:
COUNTER = 0
ALARM_ON = False
cv2.putText(frame, "Ratio: {:.2f}".format(ear), (300, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
2019-05-24 18:47:13 +01:00
2019-05-11 17:08:22 +01:00
cv2.imshow("Frame", frame)
key = cv2.waitKey(1) & 0xFF
2019-05-24 18:47:13 +01:00
2019-05-11 17:08:22 +01:00
# breaks loop on q
if key == ord("q"):
break
2019-05-24 18:47:13 +01:00
2019-05-11 17:08:22 +01:00
cv2.destroyAllWindows()
vs.stop()
##############################
# Speed Limit Fetch Function #
##############################
def maxspeed(coordinates, radius):
lat, lon = coordinates
api = overpy.Overpass()
#######################
# Query for Open Maps #
2019-05-24 18:47:13 +01:00
#######################
2019-05-11 17:08:22 +01:00
result = api.query("""
way(around:""" + radius + """,""" + lat + """,""" + lon + """) ["maxspeed"];
(._;>;);
out body;
""")
results_list = []
for way in result.ways:
2019-05-24 18:47:13 +01:00
road = {}
road["name"] = way.tags.get("name", "n/a")
road["speed_limit"] = way.tags.get("maxspeed", "n/a")
nodes = []
for node in way.nodes:
nodes.append((node.lat, node.lon))
road["nodes"] = nodes
results_list.append(road)
return results_list
2019-05-11 17:08:22 +01:00
2019-05-24 18:47:13 +01:00
#########################################################################
# Gives data to the function and gets json in return. #
#This json is then parsed. Then the double quotes are stripped off it #
#########################################################################
2019-05-11 17:08:22 +01:00
def speedlim():
speedLimit = sjson.dumps(maxspeed((str(glat), str(glng)), radius)[0]['speed_limit']).strip('\"')
2019-05-24 18:47:13 +01:00
while True:
carSpeedDummy = open("./files/carSpeed.txt", "r")
2019-05-11 17:08:22 +01:00
carSpeed = carSpeedDummy.read()
#print(carSpeed)
#carSpeed = 29
2019-05-24 18:47:13 +01:00
while int(carSpeed) > int(speedLimit):
carSpeedDummy = open("./files/carSpeed.txt", "r")
2019-05-11 17:08:22 +01:00
carSpeed = carSpeedDummy.read()
print("Over The Speed Limit")
time.sleep(5)
2019-05-24 18:47:13 +01:00
while int(carSpeed) <= int(speedLimit):
carSpeedDummy = open("./files/carSpeed.txt", "r")
2019-05-11 17:08:22 +01:00
carSpeed = carSpeedDummy.read()
print("Under the Speed Limit")
time.sleep(5)
##################
# Sudden Braking #
##################
def brakes():
2019-05-24 18:47:13 +01:00
while True:
tweetMe()
2019-05-11 17:08:22 +01:00
#####################################################
# Using Argument Parse to run one command at a time #
#####################################################
parser = argparse.ArgumentParser()
2019-05-24 18:47:13 +01:00
FUNCTION_MAP = {'overspeed' : speedlim, 'sleep-detector' : sleepiness,
'sudden-braking' : brakes, 'drunk' : drunk}
2019-05-24 17:49:42 +01:00
parser.add_argument('function', choices=list(FUNCTION_MAP))
2019-05-11 17:08:22 +01:00
args = parser.parse_args()
2019-05-24 18:47:13 +01:00
FUNC = FUNCTION_MAP[args.function]
FUNC()