Add BIT France analysis #17

Draft
wants to merge 1 commit into dev
4 changes: 3 additions & 1 deletion .gitignore
@@ -176,4 +176,6 @@ outputs/*
# data files
*.csv
poetry.lock
poetry.lock

# quarto
*.html
16 changes: 16 additions & 0 deletions dsp_interview_transcripts/pipeline/bit_france/README.md
@@ -0,0 +1,16 @@

The scripts are run in this order (a usage sketch follows the list):

1. `analysis/convert_txt.py`

2. `analysis/convert_txt_df.py`

3. `analysis/assign_roles.py`

4. `analysis/topic_modelling.py`

5. `analysis/prep_outputs.py`

6. `../name_clusters.py` (note that this script lives outside the `bit_france/` directory)

7. `analysis/prep_outputs_for_report.py`
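
A minimal sketch of running the pipeline end to end, assuming each script is invoked from the `bit_france/` directory and takes no command-line arguments:

```python
# Sketch only: run each stage in order; adjust paths/arguments as needed.
import subprocess

steps = [
    "analysis/convert_txt.py",
    "analysis/convert_txt_df.py",
    "analysis/assign_roles.py",
    "analysis/topic_modelling.py",
    "analysis/prep_outputs.py",
    "../name_clusters.py",
    "analysis/prep_outputs_for_report.py",
]

for script in steps:
    subprocess.run(["python", script], check=True)
```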
Empty file.
Empty file.
@@ -0,0 +1,35 @@
"""
Assign interview roles based on mapping table
"""
import pandas as pd

from dsp_interview_transcripts import PROJECT_DIR
from dsp_interview_transcripts import logger


def assign_role(row):
    # Exception: this file name does not parse like the others, so it is handled explicitly here
if "E25" in row["file_name"] and row["speaker_id"] == 0:
return "informant"
if row["informant"] == 0 and row["speaker_id"] == 0:
return "informant"
elif row["informant"] == 1 and row["speaker_id"] == 1:
return "informant"
else:
return "other"


if __name__ == "__main__":
mapping = pd.read_csv(f"{PROJECT_DIR}/data/bit_france/converted/file_mapping.csv")
data = pd.read_csv(f"{PROJECT_DIR}/data/bit_france/converted/combined_transcripts.csv")

data_with_mapping = data.merge(mapping[["file_name", "informant"]], on="file_name", how="left")

data_with_mapping["role"] = data_with_mapping.apply(assign_role, axis=1)
roles = data_with_mapping[data_with_mapping["role"] == "informant"][["file_name", "speaker_id"]].drop_duplicates()
for idx, row in roles.iterrows():
logger.info(f"File: {row['file_name']} Speaker ID: {row['speaker_id']}")

data_with_mapping.to_csv(
f"{PROJECT_DIR}/data/bit_france/converted/combined_transcripts_with_roles.csv", index=False
)
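
For illustration, a small sketch of how `assign_role` behaves on toy rows, assuming the function above is in scope (the file names and flag values below are hypothetical):

```python
import pandas as pd

# Hypothetical rows: `informant` records which speaker_id belongs to the interviewee.
toy = pd.DataFrame(
    {
        "file_name": ["E01.txt", "E01.txt", "E25_1.txt"],
        "speaker_id": [0, 1, 0],
        "informant": [0, 0, 1],
    }
)
print(toy.apply(assign_role, axis=1).tolist())
# -> ['informant', 'other', 'informant'] (the last row hits the E25 special case)
```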
@@ -0,0 +1,95 @@
import re

from collections import defaultdict
from typing import Dict
from typing import Iterator

import numpy as np

from sklearn.feature_extraction.text import TfidfVectorizer


def remove_non_alphanumeric(text: str) -> str:
return re.sub(r"[^0-9a-zA-Z ]+", "", text)


def simple_tokenizer(text: str) -> Iterator[str]:
return [token.strip() for token in text.split(" ") if len(token) > 0]


def cluster_texts(documents: Iterator[str], cluster_labels: Iterator) -> Dict:
"""
Creates a large text string for each cluster, by joining up the
text strings (documents) belonging to the same cluster
Args:
documents: A list of text strings
cluster_labels: A list of cluster labels, indicating the membership of the text strings
Returns:
A dictionary where keys are cluster labels, and values are cluster text documents
"""

assert len(documents) == len(cluster_labels)
doc_type = type(documents[0])

cluster_text_dict = defaultdict(doc_type)
for i, doc in enumerate(documents):
if doc_type is str:
cluster_text_dict[cluster_labels[i]] += doc + " "
elif doc_type is list:
cluster_text_dict[cluster_labels[i]] += doc
return cluster_text_dict


def cluster_keywords(
documents: Iterator[str],
cluster_labels: Iterator[int],
n: int = 10,
tokenizer=simple_tokenizer,
max_df: float = 0.90,
min_df: float = 0.01,
Vectorizer=TfidfVectorizer,
) -> Dict:
"""
Generates keywords that characterise the cluster, using the specified Vectorizer
Args:
documents: List of (preprocessed) text documents
cluster_labels: List of integer cluster labels
n: Number of top keywords to return
        Vectorizer: Vectorizer class to use (e.g., TfidfVectorizer, CountVectorizer)
tokenizer: Function to use to tokenise the input documents; by default splits the document into words
Returns:
Dictionary that maps cluster integer labels to a list of keywords
"""

# Define vectorizer
vectorizer = Vectorizer(
analyzer="word",
tokenizer=tokenizer,
preprocessor=lambda x: x,
token_pattern=None,
max_df=max_df,
min_df=min_df,
max_features=10000,
)

# Create cluster text documents
cluster_documents = cluster_texts(documents, cluster_labels)
unique_cluster_labels = list(cluster_documents.keys())

# Apply the vectorizer
token_score_matrix = vectorizer.fit_transform(list(cluster_documents.values()))

# Create a token lookup dictionary
id_to_token = dict(zip(list(vectorizer.vocabulary_.values()), list(vectorizer.vocabulary_.keys())))

# For each cluster, check the top n tokens
top_cluster_tokens = {}
for i in range(token_score_matrix.shape[0]):
# Get the cluster feature vector
x = token_score_matrix[i, :].todense()
# Find the indices of the top n tokens
x = list(np.flip(np.argsort(np.array(x)))[0])[0:n]
# Find the tokens corresponding to the top n indices
top_cluster_tokens[unique_cluster_labels[i]] = [id_to_token[j] for j in x]

return top_cluster_tokens
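
A hedged usage sketch of the two helpers above on made-up documents and cluster labels; `max_df`/`min_df` are overridden only because the toy corpus is tiny:

```python
# Assumes remove_non_alphanumeric and cluster_keywords (defined above) are in scope.
docs = [
    "solar panels reduce energy bills",
    "wind turbines generate clean energy",
    "trains and buses cut commute emissions",
    "cycling to work lowers emissions",
]
labels = [0, 0, 1, 1]

cleaned = [remove_non_alphanumeric(d.lower()) for d in docs]
keywords = cluster_keywords(cleaned, labels, n=3, max_df=1.0, min_df=1)
print(keywords)  # e.g. {0: ['energy', ...], 1: ['emissions', ...]}
```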
@@ -0,0 +1,39 @@
import os

from pathlib import Path

from docx import Document

from dsp_interview_transcripts import PROJECT_DIR


def convert_docx_to_txt(input_dir, output_dir):
os.makedirs(output_dir, exist_ok=True)

for file_name in os.listdir(input_dir):
if file_name.endswith(".docx"):
input_path = os.path.join(input_dir, file_name)
output_file_name = os.path.splitext(file_name)[0] + ".txt"
output_path = os.path.join(output_dir, output_file_name)

try:
doc = Document(input_path)
with open(output_path, "w", encoding="utf-8") as txt_file:
for paragraph in doc.paragraphs:
txt_file.write(paragraph.text + "\n")
print(f"Converted: {file_name} -> {output_file_name}")
except Exception as e:
print(f"Failed to convert {file_name}: {e}")


if __name__ == "__main__":

for input_directory in [
f"{PROJECT_DIR}/data/bit_france/Salariés",
f"{PROJECT_DIR}/data/bit_france/Elus",
f"{PROJECT_DIR}/data/bit_france/Décideurs",
f"{PROJECT_DIR}/data/bit_france/Médecins du travail",
]:
profession = input_directory.split("/")[-1]
output_directory = PROJECT_DIR / f"data/bit_france/converted/{profession}"
convert_docx_to_txt(input_directory, output_directory)
@@ -0,0 +1,79 @@
import os
import re

import pandas as pd

from dsp_interview_transcripts import PROJECT_DIR


def process_text_files(input_dir, output_csv, profession):
data = [] # List to store rows for the DataFrame

# Iterate through all files in the input directory
for file_name in os.listdir(input_dir):
if file_name.endswith(".txt"):
file_path = os.path.join(input_dir, file_name)

# Read the file content
with open(file_path, "r", encoding="utf-8") as file:
content = file.read()

# Use regex to extract speaker ID and their spoken text
# Pattern 1: Speaker [\d]\n- Text
pattern1 = re.findall(r"Speaker (\d)\n- (.*?)\n", content, re.DOTALL)

# Pattern 2: Speaker [\d] [timestamp] - Text
pattern2 = re.findall(r"Speaker (\d)\s+\d{2}:\d{2}\s+-\s+(.*?)\n", content, re.DOTALL)

turns = pattern1 + pattern2

# Add each turn as a row in the DataFrame
for speaker_id, text in turns:
data.append(
{
"speaker_id": int(speaker_id),
"text": text.strip(),
"file_name": file_name,
"profession": profession,
}
)

# Create a DataFrame from the collected data
df = pd.DataFrame(data, columns=["speaker_id", "text", "file_name", "profession"])

# Save the DataFrame to a CSV file
df.to_csv(output_csv, index=False, encoding="utf-8")

print(f"Processed files saved to {output_csv}")


if __name__ == "__main__":

input_dirs = [
f"{PROJECT_DIR}/data/bit_france/converted/Salariés",
f"{PROJECT_DIR}/data/bit_france/converted/Elus",
f"{PROJECT_DIR}/data/bit_france/converted/Décideurs",
f"{PROJECT_DIR}/data/bit_france/converted/Médecins du travail",
]

for input_directory in input_dirs:
profession = input_directory.split("/")[-1]
output_directory = PROJECT_DIR / f"data/bit_france/converted/{profession}/output.csv"

process_text_files(input_directory, output_directory, profession)

dataframes = []

for directory in input_dirs:
file_path = os.path.join(directory, "output.csv")
if os.path.exists(file_path): # Ensure the file exists
df = pd.read_csv(file_path)
dataframes.append(df)
else:
print(f"File not found: {file_path}")

combined_df = pd.concat(dataframes, ignore_index=True)
output_path = f"{PROJECT_DIR}/data/bit_france/converted/combined_transcripts.csv"
combined_df.to_csv(output_path, index=False)

print(f"Combined CSV saved to {output_path}")
@@ -0,0 +1,53 @@
import os
import re

from dsp_interview_transcripts import PROJECT_DIR


def find_file_with_smallest_number(directory):
"""
Finds the file ending in '.txt' within the given directory that contains the smallest float number.

Parameters:
directory (str): The path to the directory containing the '.txt' files.

Returns:
        str: The name of the file containing the smallest number, or None if no '.txt' file with a valid number is found.
"""
smallest_number = float("inf")
smallest_file = None

# Regex to match float numbers in the file content
float_pattern = re.compile(r"[-+]?[0-9]*\.?[0-9]+(?:[eE][-+]?[0-9]+)?")

# Iterate through files in the directory
for filename in os.listdir(directory):
if filename.endswith(".txt"):
file_path = os.path.join(directory, filename)
try:
with open(file_path, "r") as file:
content = file.read()

# Find all float numbers in the file content
numbers = [float(match) for match in float_pattern.findall(content)]

# Check for the smallest number
if numbers:
min_number = min(numbers)
if min_number < smallest_number:
smallest_number = min_number
smallest_file = filename

except (ValueError, IOError) as e:
print(f"Error processing file {filename}: {e}")

return smallest_file


if __name__ == "__main__":
    # Example usage
    directory_path = f"{PROJECT_DIR}/dsp_interview_transcripts/pipeline/bit_france/outputs"
    result = find_file_with_smallest_number(directory_path)
    if result:
        print(f"The file with the smallest number is: {result}")
    else:
        print("No '.txt' files with valid numbers were found in the directory.")