Skip to content
This repository has been archived by the owner on Dec 9, 2024. It is now read-only.

feat: preprocessing and timestamps #27

Merged
merged 15 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions app/app.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import os.path
import logging
import shutil
from typing import Optional
import utils
import web_cli
from extract_text import ExtractText
from flask import Flask, render_template, request, send_file, redirect
import html
import glob
from typing import Optional
from extract_text import ExtractText
from flask import Flask, render_template, request, send_file, redirect, jsonify
from preprocess import scan_video_for_code_frames, get_full_code
from socket_util import init_socketio

# Initialise flask app
app = Flask(__name__, static_url_path='/static', static_folder='static')
# Initialise flask socket
init_socketio(app)
# Current video
filename: Optional[str] = None
# Flag to check if the search process should be canceled
Expand Down Expand Up @@ -192,6 +196,27 @@ def upload_video():
return redirect("/upload")


@app.route("/start_processing_video", methods=['POST'])
def start_processing_video():
current_settings = utils.get_current_settings()
if filename and current_settings["UserSettings"]["preprocess_videos"] == 'True':
llama_endpoint = current_settings["AppSettings"]["llama_endpoint"]
preprocess_interval = int(current_settings["UserSettings"]["preprocess_interval"])
preprocess_format_code = current_settings["UserSettings"]["preprocess_format_code"] == 'True'
preprocess_code_explanation = current_settings["UserSettings"]["preprocess_code_explanation"] == 'True'
programming_language = current_settings["UserSettings"]["programming_language"]
preprocess_llm = current_settings["UserSettings"]["preprocess_llm"]
json = scan_video_for_code_frames(filename, llama_endpoint, preprocess_interval, programming_language, preprocess_format_code, preprocess_code_explanation, preprocess_llm)

output_file = f"{utils.get_processed_vid_info_save_path()}/{filename}.json"
with open(output_file, "w") as output:
output.write(json)

return "success"
else:
return "error"


@app.route("/play_video/<play_filename>")
def video(play_filename):
"""
Expand All @@ -202,7 +227,9 @@ def video(play_filename):
if utils.filename_exists_in_userdata(play_filename):
global filename
filename = play_filename
return render_template("player.html", filename=filename, video_data=utils.get_video_data(filename))
current_settings = utils.get_current_settings()
preprocess_video = current_settings['UserSettings']['preprocess_videos'] == 'True'
return render_template("player.html", filename=filename, video_data=utils.get_video_data(filename), current_settings=current_settings, preprocess_video=preprocess_video)
return redirect("/")


Expand Down
55 changes: 31 additions & 24 deletions app/config.example.ini
Original file line number Diff line number Diff line change
@@ -1,35 +1,42 @@
# Application settings
[AppSettings]
openai_api_key = your_openai_api_key_here
tesseract_executable = your_path_to_tesseract_here
ide_executable = your_path_to_ide_here
openai_api_key = your_openai_api_key_here
llama_endpoint = http://localhost:11434/api/chat
tesseract_executable = your_path_to_tesseract_here
ide_executable = your_path_to_ide_here
# Formatting options for extracted text
[Formatting]
openai_analysis = False
remove_language_name = False
remove_backticks = False
openai_analysis = False
remove_language_name = False
remove_backticks = False
# Hotkeys to control web interface
[Hotkeys]
play_video = Space
rewind_video = ArrowLeft
skip_video = ArrowRight
open_home_page = ShiftLeft,KeyH
mute_ui_sounds = ShiftLeft,KeyM
mute_video = ShiftLeft,KeyV
capture_code = ShiftLeft,KeyC
open_in_ide = ShiftLeft,KeyS
upload_video = ShiftLeft,KeyU
open_settings_page = ShiftLeft,KeyT
open_web_cli = ShiftLeft,KeyW,KeyD
play_video = Space
rewind_video = ArrowLeft
skip_video = ArrowRight
open_home_page = ShiftLeft,KeyH
mute_ui_sounds = ShiftLeft,KeyM
mute_video = ShiftLeft,KeyV
capture_code = ShiftLeft,KeyC
open_in_ide = ShiftLeft,KeyS
upload_video = ShiftLeft,KeyU
open_settings_page = ShiftLeft,KeyT
open_web_cli = ShiftLeft,KeyW,KeyD
# User settings for the application
[UserSettings]
programming_language = Python
capture_output_path = output_path
video_save_path = output_path
mute_ui_sounds = False
username = None
collaborate_pass_hash = None
server_auth_token = None
preprocess_videos = True
preprocess_llm = llama
preprocess_interval = 5
preprocess_format_code = True
preprocess_code_explanation = True
programming_language = Python
capture_output_path = output_path
video_save_path = output_path
processed_video_save_path = output_path
mute_ui_sounds = False
username = None
collaborate_pass_hash = None
server_auth_token = None
# Additional features of the application
[Features]
use_youtube_downloader = False
237 changes: 237 additions & 0 deletions app/preprocess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
import logging
import math
import openai
import requests
import pytesseract
import re
import cv2
import json
import utils
from socket_util import get_socketio


def query_llama(llama_endpoint, system_prompt, prompt, model="llama3", images=None):
"""
Query the llama AI LLM
:param llama_endpoint: The endpoint of the llama AI LLM
:param system_prompt: The System prompt to use
:param prompt: The prompt to use
:param model: The AI model to use
:param images: Images for AI model to use. By default, this is not needed as only the llava model supports it.
:return: The AI's response
"""
if images is None:
images = []

req = requests.post(llama_endpoint, json={
'model': model,
'messages': [
{
'role': 'system',
'content': system_prompt
},
{
'role': 'user',
'content': prompt,
'images': images
}
],
'stream': False
})

print(req)
body = req.json()
return body['message']['content']


def query_openai(irrelevant, system_prompt, prompt):
"""
Query the OpenAI API
:param irrelevant: This parameter is irrelevant as it is the llama endpoint
:param system_prompt: The system prompt to provide to OpenAI
:param prompt: The prompt to provide to OpenAI
:return: The OpenAI response
"""
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt}
]
)
return response['choices'][0]['message']['content']
except openai.OpenAIError as error:
logging.error(error)
return None


def remove_unnecessary_chars(string):
"""
Removes any unnecessary characters (characters that aren't valid to be used within actual code)
:param string: The string you want to remove unnecessary characters from
:return: The cleaned string
"""
pattern = r'[^a-zA-Z0-9_\-+=<>/*&|%!@#$^(){}\[\];:,.\'\"`~\\ ]'
return re.sub(pattern, '', string)
# return string


def extract_text_from_frame(frame):
"""
Extracts all text from frame, removes unnecessary characters and returns it
:param frame: The frame to extract text from
:return: Extracted Text
"""
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
_, binary = cv2.threshold(gray, 150, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
text = pytesseract.image_to_string(binary)
code_lines = [remove_unnecessary_chars(line) for line in text.split('\n')]
return '\n'.join(code_lines)


def clean_code_text(text):
"""
Removes any lines from a text blob that are definitely not proper code (special thanks to ChatGPT for the regex)
:param text: The text to clean
:return: The cleaned text
"""
patterns_to_remove = [
r'^\s*$', # Empty lines or lines with only whitespace
r'^\s*\d+\s*$', # Lines with only numbers
r'^\s*PROBLEMS.*$', # Lines starting with PROBLEMS
r'^\s*Error.*$', # Lines indicating errors
r'^\s*Traceback.*$', # Traceback lines
r'^\s*File.*line\s*\d+', # File path and line number often in error messages
r'^\s*===.*$', # Lines with only equals signs (often in logs or outputs)
r'^\s*---.*$', # Lines with only dashes (often in logs or outputs)
r'^\s*==>.*$', # Lines starting with ==> (often in logs or outputs)
r'^\s*\*.*$', # Lines starting with an asterisk
r'^\s*\&.*$', # Lines starting with ampersand
]

combined_pattern = re.compile('|'.join(patterns_to_remove), re.IGNORECASE)
cleaned_lines = [line for line in text.split('\n') if not combined_pattern.match(line)]
return '\n'.join(cleaned_lines)


def extract_code(text):
"""
Since the AI responds with backticks, extract the text from between the backticks
:param text: The AI response
:return: The extracted text or None
"""
match = re.search(r"```(.*?)```", text, re.DOTALL)
if match:
return match.group(1).strip()
else:
return None


def get_full_code(code_json):
unique_lines = set()

for item in code_json:
code_lines = item["code"].splitlines()
unique_lines.update(code_lines)

return "\n".join(unique_lines) + "\n"


def scan_video_for_code_frames(filename, llama_endpoint, interval_seconds=5, programming_language="Python", provide_formatted_code=True, provide_code_explanations=True, llm="llama"):
"""
Scans a video for frames containing code at approximately every interval_seconds.
:param filename: The files name
:param llama_endpoint: The endpoint of the llama AI LLM
:param interval_seconds: Interval in seconds to capture frames
:param programming_language: The programming language of the video
:param provide_formatted_code: Whether the formatted code is included in the response
:param provide_code_explanations: Whether the code explanation is included in the response
:return: JSON object with timestamps and detected code.
"""
is_code_prompt = (
f"You are a helpful coding assistant. Your task is to determine if the given text contains valid {programming_language} code. "
"If the text contains valid code, respond with true. Do not worry if the code is not functional, "
"you just need to reply with true if it contains valid code. Otherwise, respond with false. Do not include "
"anything other than true or false in your response."
)

format_code_prompt = (
f"You have determined this text contains valid {programming_language} code. Your task is to extract the valid "
f"code, correct any syntax errors, properly indent it, and respond only with the fixed code."
"If no valid Python code can be extracted, respond with ERROR. Do not include anything other than the fixed "
"code or ERROR in your response."
)

describe_code_prompt = (
f"You are a helpful coding assistant. You will be given blocks of {programming_language} code and your "
"job is to describe what they do to the best of your ability. Do NOT include a title. "
"Only include a simple explanation. Your explanation is to be a maximum of ~20-30 words. "
"Do NOT go over this limit. If you cannot provide an explanation, simply respond with Unknown. "
"Do not reply with anything other than a simple explanation or Unknown."
)

provide_full_code_prompt = (
f"You are a helpful coding assistant. The language you are working in is {programming_language}. "
f"You will be given blocks of code and your job is to take those blocks of code and "
f"properly format them, ensuring that there are no duplicate. This includes functions doing the same thing. "
f"If 2 functions do the same thing, you must determine the most updated one and use that. You are NOT to add "
f"new imports, or use things that were not already being used. You are also to remove unnecessary lines, "
f"ensure that everything is correct syntactically and that there are proper indentations. You must then "
f"return the corrected version. You must NOT return anything other than the corrected code, or ERROR if you "
f"failed to correct it."
)

to_query = llm == "llama" and query_llama or query_openai
video_path = f"{utils.get_vid_save_path()}\\{filename}"
socketio = get_socketio()
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
frame_interval = int(fps * interval_seconds)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

frame_count = 0
code_frames = []

while cap.isOpened():
ret, frame = cap.read()
if not ret:
break

if frame_count % frame_interval == 0:
print(f"Processing frame {frame_count}")
socketio.emit("processingProgressUpdate", f"{math.floor((frame_count / total_frames) * 100)}%")
text = extract_text_from_frame(frame)
is_code = to_query(llama_endpoint, is_code_prompt, text)
if is_code and is_code.strip().lower() == 'true':
formatted_code, code_explanation = None, None
timestamp = frame_count / fps
print(f"Code detected at timestamp: {timestamp}")

if provide_formatted_code:
formatted_code = extract_code(to_query(llama_endpoint, format_code_prompt, text))

if not formatted_code:
print(f"Could not extract code from frame {frame_count}")
frame_count += 1
continue

if provide_code_explanations:
code_explanation = to_query(llama_endpoint, describe_code_prompt, text)
print(f"Code from frame {frame_count} explanation: {code_explanation}")

code_frames.append({"seconds": timestamp, "code": formatted_code, "explanation": code_explanation})
socketio.emit("timestampsUpdate", code_frames)

frame_count += 1

full_code = None

if len(code_frames) >= 1 and provide_formatted_code:
formatted = to_query(llama_endpoint, provide_full_code_prompt, get_full_code(code_frames))
full_code = extract_code(formatted)

socketio.emit("finishedProcessing", full_code)
cap.release()
cv2.destroyAllWindows()
return json.dumps(code_frames, indent=4)
12 changes: 12 additions & 0 deletions app/socket_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from flask_socketio import SocketIO

socket = None


def init_socketio(app):
global socket
socket = SocketIO(app)


def get_socketio():
return socket
Loading
Loading