forked from data-to-insight/ofsted-ilacs-scrape-tool
-
Notifications
You must be signed in to change notification settings - Fork 1
/
ofsted_childrens_services_inspection_scrape.py
1395 lines (1023 loc) · 54.7 KB
/
ofsted_childrens_services_inspection_scrape.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#
# Export options
export_summary_filename = 'ofsted_childrens_services_send_overview'  # base name for the summary export file
# export_file_type = 'csv' # Excel / csv currently supported
export_file_type = 'excel'
# Default (sub)folder structure
# Defined to offer some ease of onward flexibility
# data exports
root_export_folder = 'export_data' # <all> exports folder
inspections_subfolder = 'inspection_reports' # downloaded report pdfs
# data imports
import_la_data_path = 'import_data/la_lookup/'
import_geo_data_path = 'import_data/geospatial/'
geo_boundaries_filename = 'local_authority_districts_boundaries.json'
# scrape inspection grade/data from pdf reports
pdf_data_capture = True # True is default (scrape within pdf inspection reports for inspection results etc)
# This impacts run time E.g False == ~1m20 / True == ~ 4m10
# False == only pdfs/list of LA's+link to most recent exported. Not inspection results.
repo_path = '/workspaces/ofsted-send-scrape-tool'  # NOTE(review): hard-coded Codespaces path — confirm before running outside that workspace
#
# Ofsted site/page admin settings
max_page_results = 200 # Set max number of search results to show on page(MUST be > total number of LA's!)
url_stem = 'https://reports.ofsted.gov.uk/'
# search url equates to Ofsted base search criteria of 'childrens social care + local authority childrens services'
search_url = 'search?q=&location=&lat=&lon=&radius=&level_1_types=3&level_2_types%5B%5D=12'
max_page_results_url = '&rows=' + str(max_page_results) # Coerce results page to display ALL providers on single results page without next/pagination
# resultant complete url to process
url = url_stem + search_url + max_page_results_url
# #
# # In progress Ofsted site/search link refactoring
# search_category = 3 # Default 3 == 'Childrens social care' (range 1 -> 4)
# search_sub_category = 12 # Default 12 == 'Local Authority Childrens Services' (range 8 -> 12)
# url_search_stem = 'search?q=&location=&radius='
# url = url_stem + url_search_stem + '&level_1_types=' + str(search_category) + '&level_2_types=' + str(search_sub_category) + max_page_results_url
#
# Script admin settings
# Standard library imports
import io
import json
import os
import re
import time
from datetime import datetime, timedelta
# Third-party library imports
import git # possible case for just: from git import Repo
import numpy as np
import pandas as pd
import requests
from bs4 import BeautifulSoup
from dateutil.relativedelta import relativedelta
from requests.exceptions import HTTPError, RequestException, Timeout
# pdf search/data extraction
try:
    import fitz  # PyMuPDF
    import tabula
    import PyPDF2
except ModuleNotFoundError:
    # NOTE(review): 'fitz' is provided by the 'pymupdf' package, which this hint omits — confirm install instructions
    print("Please install 'tabula-py' and 'PyPDF2' using pip")
# handle optional excel export+active file links
try:
    import xlsxwriter
except ModuleNotFoundError:
    print("Please install 'openpyxl' and 'xlsxwriter' using pip")
try:
    from sklearn.metrics.pairwise import cosine_similarity
    from sklearn.feature_extraction.text import CountVectorizer
except ModuleNotFoundError:
    print("Please install 'scikit-learn' using pip")
# Configure logging/logging module
import warnings
import logging
# wipe / reset the logging file
with open('output.log', 'w'):
    # comment out if maintaining ongoing/historic log
    pass
# Keep warnings quiet unless priority
logging.getLogger('org.apache.pdfbox').setLevel(logging.ERROR)
warnings.filterwarnings('ignore')
logging.basicConfig(filename='output.log', level=logging.INFO, format='%(asctime)s - %(message)s')
#
# Function defs
def get_soup(url, retries=3, delay=5):
    """
    Given a URL, returns a BeautifulSoup object + request error handling.

    Args:
        url (str): The URL to fetch and parse.
        retries (int): Number of attempts on recoverable network errors.
        delay (int): Delay between retries in seconds.

    Returns:
        BeautifulSoup: The parsed HTML content, or None if all attempts fail
        or a non-recoverable (HTTP/unexpected) error occurs.

    Note:
        Requires `time`, `Timeout` and `HTTPError` to be imported at module
        level — the original file referenced them without importing them,
        which raised NameError as soon as any request failed.
    """
    timeout_seconds = 10  # lets not assume the Ofsted page is up, avoid over-pinging
    for attempt in range(retries):
        try:
            response = requests.get(url, timeout=timeout_seconds)
            response.raise_for_status()  # raise HTTPError on 4xx/5xx responses
            return BeautifulSoup(response.content, 'html.parser')
        except Timeout:
            # transient: wait then fall through to the next loop attempt
            print(f"Timeout getting URL '{url}' on attempt {attempt + 1}. Retrying after {delay} secs...")
            time.sleep(delay)
        except HTTPError as e:
            print(f"HTTP error getting URL '{url}': {e}")
            return None  # end retries on client and server errors
        except RequestException as e:
            print(f"Request error getting URL '{url}': {e}")
            if attempt < retries - 1:
                print(f"Retrying after {delay} secs...")
                time.sleep(delay)  # pause to assist not getting blocked
            else:
                print("Max retry attempts reached, giving up")
                return None
        except Exception as e:
            print(f"Unexpected error occurred: {e}")
            return None
    return None  # All the retries failed / stop point
def clean_provider_name(name):
    """
    Cleans the la/provider name according to:
    - expected output based on existing ILACS sheet
    - historic string issues seen on Ofsted site

    Args:
        name (str): The original name to be cleaned.

    Returns:
        str: The cleaned, lower-case name.
    """
    # Lower-case and collapse ALL runs of whitespace to single spaces.
    # (The original `.replace(' ', ' ')` was a no-op and never removed the
    # extra spaces its comment promised, so multi-word phrase removal below
    # could silently fail on doubled spaces.)
    name = ' '.join(name.lower().split())
    # Remove specific multi-word phrases seen historically on the Ofsted site
    for phrase in ("royal borough of ", "city of ", "metropolitan district council",
                   "london borough of", "council of"):
        name = name.replace(phrase, "")
    # Remove further undesired 'single' words and join the remaining parts
    filler_words = {'city', 'metropolitan', 'borough', 'council', 'county', 'district', 'the'}
    return ' '.join(part for part in name.split() if part not in filler_words)
## Need to refactor the below funcs. Lots of duplication going on
def format_date(date_str: str, input_format: str, output_format: str) -> str:
    """
    Convert a date string from one format into another.

    Args:
        date_str (str): The input date string.
        input_format (str): strptime format describing date_str.
        output_format (str): strftime format for the result.

    Returns:
        str: date_str re-rendered in output_format.
    """
    parsed = datetime.strptime(date_str, input_format).date()
    return parsed.strftime(output_format)
def parse_date(date_str, date_format):
    """Parse date_str with date_format; return a datetime.date, or None when parsing fails."""
    try:
        parsed = datetime.strptime(date_str, date_format)
    except (TypeError, ValueError):
        return None
    return parsed.date()  # only need date
def format_date_for_report_BAK(date_obj, output_format_str):
    """
    Render a datetime object with the given format string, or return an
    empty string when the input is None.

    Args:
        date_obj (datetime.datetime or None): The datetime to format, or None.
        output_format_str (str): strftime format for the result.

    Returns:
        str: The formatted date string, or "" when date_obj is None.
    """
    return "" if date_obj is None else date_obj.strftime(output_format_str)
def format_date_for_report(date_input, output_format_str, input_format_str=None):
    """
    Format a datetime object or a date string into the requested format;
    returns an empty string when the input is None.

    Args:
        date_input (datetime.datetime, str, or None): Value to format.
        output_format_str (str): strftime format for the result.
        input_format_str (str, optional): strptime format for parsing string
            input; when omitted, a set of common formats is tried in turn.

    Returns:
        str: The formatted date string, or "" when date_input is None.

    Raises:
        ValueError: if a string input cannot be parsed.
        TypeError: if date_input is neither None, str nor datetime.
    """
    if date_input is None:
        return ""
    if isinstance(date_input, datetime):
        return date_input.strftime(output_format_str)
    if not isinstance(date_input, str):
        raise TypeError("Report date_input must be a datetime object, a string, or None")
    # String input: honour an explicit format first
    if input_format_str:
        try:
            parsed = datetime.strptime(date_input, input_format_str)
        except ValueError:
            raise ValueError(f"Report date format for {date_input} does not match {input_format_str}")
        return parsed.strftime(output_format_str)
    # Otherwise try common date formats including two-digit yrs
    for candidate_format in ("%d %B %Y", "%d/%m/%Y", "%d/%m/%y"):
        try:
            return datetime.strptime(date_input, candidate_format).strftime(output_format_str)
        except ValueError:
            continue
    raise ValueError(f"Report date format for {date_input} is not supported")
## Need to refactor the above funcs. Lots of duplication going on
# testing
def extract_dates_from_text(text):
    """
    Extracts and cleans inspection dates from the given text.
    This has heavy outputs atm due to multiple problem report formats and ongoing testing
    Args:
        text (str): The text from which to extract dates.
    Returns:
        tuple: (start_date, end_date, previous_end_date). Start/end are strings in
        'dd/mm/yy'; previous_end_date is 'dd/mm/yyyy' (note the 4-digit year) and
        falls back to the placeholder '01/01/1900' when no match/conversion.
    Raises:
        ValueError: if text is empty, no inspection dates are found, or date
        conversion fails.
    Notes:
        # Some clean up based on historic data obs from scraped reports/incl. ILACS
        # Ofsted reports contain inspection date strings in multiple formats (i/ii/iii...)
        # i) "15 to 26 November"
        # ii) "28 February to 4 March" or "8 October to 19 October" (majority)
        # iii) ['8 July ', '12 July and 7 August to'] (*recently seen)
        # iv) "11 September 2017 to 5 October 2017" (double year)
        # v) "Inspection dates: 19 November–30 November 2018" (Bromley)
        # vi) white spaces between date numbers e.g. "wiltshire, 1 9 June 2019"
        # vii) 'None' years where no recognisable was found
    """
    # print("Debug: Starting date extraction")
    if not text:
        print("Debug: Input text is empty or None.")
        raise ValueError("No text provided")
    # Remove non-printing characters and multiple spaces
    cleaned_text = re.sub(r'[^\x20-\x7E]', '', text)
    cleaned_text = re.sub(r'\s+', ' ', cleaned_text)
    # Preprocess the inspection_dates to fix split years, e.g. 20 23, 20 24 -> 2023, 2024
    cleaned_text = re.sub(r"(\b20)\s+(\d{2}\b)", r"\1\2", cleaned_text)
    #print(f"Debug: Cleaned text: {cleaned_text}")
    # Try to capture date ranges correctly
    # date_match = re.search(r"Inspection dates:\s*(.+?)(?=\s{2,}|$)", cleaned_text) - doesnt work for oxfordshire
    # date_match = re.search(r"Inspection dates\s*:\s*(\d{1,2}(?: to \d{1,2})? \w+ \d{4})", cleaned_text)
    # # Not implemented. But in case need to handle cases of repeating year alongside known repeating month "13 July 2023 to 21 July 2023" e.g. West Sussex
    # date_match = re.search(r"Inspection dates\s*:\s*(\d{1,2}(?: \w+ \d{4})?(?: to \d{1,2})? \w+ \d{4})", cleaned_text)
    # Primary pattern: full "d Month yyyy to d Month yyyy" range (format iv above)
    date_match = re.search(r"Inspection dates\s*:\s*(\d{1,2} \w+ \d{4}) to (\d{1,2} \w+ \d{4})", cleaned_text)
    if date_match:
        #print(f"Debug: Primary date match found: {date_match.group(0)}")
        # Extract start and end dates directly from the match
        start_date_str = date_match.group(1).strip()
        end_date_str = date_match.group(2).strip()
    else:
        #print("Debug: Primary date match not found, trying fallback method")
        # Fallback to capturing single date or simpler range within the same month
        # i.e. "d to d Month yyyy" (format ii above, with a trailing year)
        date_match = re.search(r"Inspection dates\s*:\s*(\d{1,2}) to (\d{1,2}) (\w+) (\d{4})", cleaned_text)
        if date_match:
            #print(f"Debug: Fallback date match found: {date_match.group(0)}")
            start_day = date_match.group(1)
            end_day = date_match.group(2)
            month = date_match.group(3)
            year = date_match.group(4)
            # Both dates share the single month/year captured
            start_date_str = f"{start_day} {month} {year}"
            end_date_str = f"{end_day} {month} {year}"
        else:
            print("Debug: No inspection dates found.")
            raise ValueError("No inspection dates found")
    # Clean and format the extracted dates into dd/mm/yy
    try:
        start_date = datetime.strptime(start_date_str, "%d %B %Y").strftime("%d/%m/%y")
        end_date = datetime.strptime(end_date_str, "%d %B %Y").strftime("%d/%m/%y")
        #print(f"Debug: Formatted start date: {start_date}")
        #print(f"Debug: Formatted end date: {end_date}")
    except ValueError as ve:
        print(f"Error converting date: {ve}")
        raise ValueError("Date conversion failed")
    # Now handle previous inspection dates if present in the same cleaned_text
    previous_inspection_match = re.search(r"Dates? of previous inspection:\s*(\d{1,2}) to (\d{1,2}) (\w+) (\d{4})", cleaned_text)
    if previous_inspection_match:
        #print(f"Debug: Previous inspection match found: {previous_inspection_match.groups()}")
        previous_start_day = previous_inspection_match.group(1)
        previous_end_day = previous_inspection_match.group(2)
        previous_month = previous_inspection_match.group(3)
        previous_year = previous_inspection_match.group(4)
        previous_end_date_str = f"{previous_end_day} {previous_month} {previous_year}"
        try:
            # NOTE: previous date deliberately uses a 4-digit year format, unlike start/end above
            previous_end_date = datetime.strptime(previous_end_date_str, "%d %B %Y").strftime("%d/%m/%Y")
            #print(f"Debug: Formatted previous inspection end date: {previous_end_date}")
        except ValueError as ve:
            print(f"Error converting previous inspection date: {ve}")
            previous_end_date = "01/01/1900" # Placeholder date for conversion errors
    else:
        #print("Debug: No previous inspection date found, using placeholder.")
        previous_end_date = "01/01/1900" # Placeholder date if no match found
    # Final debug print to verify results
    print(f"\nStart Date: {start_date}, End Date: {end_date}, Previous Inspection End Date: {previous_end_date}")
    return start_date, end_date, previous_end_date
def extract_inspection_data_update(pdf_content):
    """
    Function to extract key details from inspection reports PDF.
    Args:
        pdf_content (bytes): The raw content of the PDF file to be processed.
    Returns:
        dict: A dictionary containing the extracted details. Keys actually returned:
            - 'inspection_start_date': start date as 'dd/mm/yy', or None on failure.
            - 'inspection_end_date': end date as 'dd/mm/yy', or None on failure.
            - 'previous_inspection_date': previous inspection end date as 'dd/mm/yyyy', or None.
        (Other keys from the ILACS version — inspector name, grades, sentiment,
        table rows — are commented out below and NOT currently returned.)
    Raises:
        ValueError: If the PDF content is not valid or cannot be processed correctly.
    Note:
        This function expects the input PDF to contain specific sections specifically
        the inspection judgements to be on page 1 (page[0])
        If the PDF structure is different, obv the function will need changing.
    """
    # Create a file-like buffer for the PDF content
    with io.BytesIO(pdf_content) as buffer:
        # Read the PDF content for text extraction
        reader = PyPDF2.PdfReader(buffer)
        # Extract the first page of inspection report pdf
        first_page_text = reader.pages[0].extract_text()
        # Not needed in SEND extract(yet) - at least not for overview summary
        # # Extract text from <all> pages in the pdf
        # full_text = ''
        # for page in reader.pages:
        #     full_text += page.extract_text()
        # # Carry over for ref from ILACS. Not used in SEND
        # # Find the inspector's name using a regular expression
        # match = re.search(r"Lead inspector:\s*(.+)", first_page_text)
        # if match:
        #     inspector_name = match.group(1)
        #     inspector_name = inspector_name.split(',')[0].strip() # Remove everything after the first comma (some contain '.., Her Majesty’s Inspector')
        #     inspector_name = inspector_name.replace("HMI", "").rstrip() # Remove "HMI" and any trailing spaces(some inspectors add this to name)
        # else:
        #     inspector_name = None
        # remove all non-printing chars from text content
        first_page_text= re.sub(r'[^\x20-\x7E]', '', first_page_text)
        # extract and format inspection dates
        try:
            # Attempt to extract and format dates (delegates to extract_dates_from_text)
            start_date_formatted, end_date_formatted, previous_inspection_date = extract_dates_from_text(first_page_text)
            # Validate the start date (expects 2-digit year format)
            try:
                datetime.strptime(start_date_formatted, "%d/%m/%y")
            except (ValueError, TypeError) as e:
                print(f"Error with start date: {e}")
                start_date_formatted = None
            # Validate the end date
            try:
                datetime.strptime(end_date_formatted, "%d/%m/%y")
            except (ValueError, TypeError) as e:
                print(f"Error with end date: {e}")
                end_date_formatted = None
            # Validate the previous inspection date (note: 4-digit year format)
            try:
                datetime.strptime(previous_inspection_date, "%d/%m/%Y")
            except (ValueError, TypeError) as e:
                print(f"Error with previous inspection date: {e}")
                previous_inspection_date = None
        except ValueError as e:
            # If there was a broader issue with the extraction function itself
            start_date_formatted = None
            end_date_formatted = None
            previous_inspection_date = None
            print(f"Error: {e}")
    # end test block
    return {
        # main inspection details
        # 'inspector_name': inspector_name,
        # 'overall_inspection_grade': inspection_grades_dict['overall_effectiveness'],
        'inspection_start_date': start_date_formatted,
        'inspection_end_date': end_date_formatted,
        'previous_inspection_date': previous_inspection_date
        # # inspection sentiments (in progress)
        # 'sentiment_score': round(sentiment_val, 4),
        # 'sentiment_summary': sentiment_summary_str,
        # 'main_inspection_topics': key_inspection_themes_lst,
        # 'table_rows_found':len(df)
    }
# testing only
def find_non_printable_characters(text):
    """
    TEST helper: report any characters outside the printable ASCII range.

    Args:
        text (str): The text to check for non-printable characters.

    Returns:
        None (results are printed, not returned).
    """
    offenders = ''.join(c for c in text if not (32 <= ord(c) <= 126))
    if offenders:
        print(f"Non-printable characters found: {offenders}")
    else:
        print("No non-printable characters found.")
def clean_pdf_content(pdf_content):
    """
    Strip control characters, PDF-encoding remnants and layout noise from
    raw pdf content, returning a single-spaced text string.
    """
    # Bytes input is decoded leniently to a string first
    if isinstance(pdf_content, bytes):
        pdf_content = pdf_content.decode('utf-8', errors='ignore')
    cleaned = re.sub(r'[\x00-\x1F\x7F-\x9F]', '', pdf_content)   # control chars
    cleaned = re.sub(r'\\x[a-fA-F0-9]{2}', '', cleaned)          # escaped-hex remnants / metadata
    cleaned = re.sub(r'[/<>\r\n]', ' ', cleaned)                 # common non-text elements
    cleaned = re.sub(r'\s{2,}', ' ', cleaned)                    # collapse whitespace runs
    return cleaned.strip()
def extract_text_from_pdf(pdf_bytes):
    """
    Extract plain text from every page of a PDF supplied as raw bytes.

    Args:
        pdf_bytes (bytes): Raw PDF file content.

    Returns:
        str: Concatenated text of all pages.
    """
    extracted_text = ""
    # Context manager ensures the document handle is closed
    # (the original opened the document and never closed it — resource leak)
    with fitz.open(stream=pdf_bytes, filetype="pdf") as pdf_document:
        # Iterate through each page
        for page_num in range(len(pdf_document)):
            extracted_text += pdf_document.load_page(page_num).get_text("text")
    return extracted_text
def extract_text_by_pages(pdf_bytes):
    """
    Extract text page-by-page from a PDF supplied as raw bytes.
    Supersedes extract_text_from_pdf in combo with remove_unwanted_sections.

    Args:
        pdf_bytes (bytes): Raw PDF file content.

    Returns:
        list: One text string per page, in page order.
    """
    pages = []
    # Context manager ensures the document handle is closed
    # (the original opened the document and never closed it — resource leak)
    with fitz.open(stream=pdf_bytes, filetype="pdf") as pdf_document:
        for page_num in range(len(pdf_document)):
            pages.append(pdf_document.load_page(page_num).get_text("text"))
    return pages
def remove_unwanted_sections(pages_content):
    """
    Keep only the pages that precede the 'Local area partnership details' heading.
    Supersedes extract_text_from_pdf in combo with extract_text_by_pages; the
    reports' trailing pages (partnership details onwards) are superfluous to
    content/outcome detail.
    """
    kept_pages = []
    for page_text in pages_content:
        if "Local area partnership details" in page_text:
            # heading reached: this page and everything after it is dropped
            break
        kept_pages.append(page_text)
    return kept_pages
def clean_text(text):
    """
    Normalise extracted pdf text: join hard-wrapped lines, squeeze blank
    lines and repeated spaces, then flatten remaining paragraph breaks.
    """
    # A lone newline (not part of a blank line) is a hard wrap — join with a space
    text = re.sub(r'(?<!\n)\n(?!\n)', ' ', text)
    # Squeeze runs of blank lines down to a single paragraph break
    text = re.sub(r'\n\s*\n', '\n\n', text)
    # Collapse repeated spaces
    text = re.sub(r' +', ' ', text)
    # Trim, then brute-force drop the remaining paragraph breaks
    return text.strip().replace('\n\n', ' ')
def extract_inspection_outcome_section(cleaned_text):
    """
    Pull the text between 'Inspection outcome' and 'Information about the
    local area partnership', dropping the boilerplate closing paragraph
    (e.g. "Ofsted and CQC ask that the local area partnership updates and
    publishes ....") when more than one paragraph is present.
    """
    matched = re.search(
        r"Inspection outcome(.*?)Information about the local area partnership",
        cleaned_text,
        re.DOTALL | re.IGNORECASE,
    )
    if not matched:
        return "Inspection outcome section not found."
    section = matched.group(1).strip()
    # Drop the final paragraph only when at least two exist
    paragraphs = re.split(r'\n\s*\n', section)
    if len(paragraphs) > 1:
        section = '\n\n'.join(paragraphs[:-1]).strip()
    # Clean further non-printing chars
    return clean_text(section)
def determine_outcome_grade(inspection_outcome_section):
    """
    Map Ofsted SEND outcome wording onto a numeric grade:
    1 = 'positive experiences', 2 = 'inconsistent experiences',
    3 = 'significant concerns'; None when no phrase is present.
    """
    if "positive experiences" in inspection_outcome_section:
        return 1
    if "inconsistent experiences" in inspection_outcome_section:
        return 2
    if "significant concerns" in inspection_outcome_section:
        return 3
    return None  # If no matching phrase is found
def parse_inspection_date(date_string):
    """Parse a date in one of the known report formats; raise ValueError when none match."""
    for candidate_format in ("%d %B %Y", "%d/%m/%Y", "%d/%m/%y"):
        try:
            return datetime.strptime(date_string, candidate_format)
        except ValueError:
            continue
    raise ValueError(f"Date format not supported {date_string} ")
def extract_next_inspection(inspection_outcome_section):
    """
    Find the stated timeframe for the next inspection in the outcome text,
    e.g. "3 years" or "6 months". A monitoring inspection statement takes
    precedence over full (re)inspection statements; returns None when no
    timeframe wording is found.
    """
    quantity = r"\d+|one|two|three|four|five|six|seven|eight|nine|ten|eleven|twelve"
    monitoring_re = re.compile(
        rf"monitoring inspection will be carried out within approximately ({quantity}) (years?|months?)",
        re.IGNORECASE,
    )
    full_inspection_res = (
        re.compile(rf"full reinspection will be within approximately ({quantity}) (years?|months?)", re.IGNORECASE),
        re.compile(rf"the next full area SEND inspection will be within approximately ({quantity}) (years?|months?)", re.IGNORECASE),
    )
    # Check for monitoring inspection first
    match = monitoring_re.search(inspection_outcome_section)
    if match is None:
        # No interim inspection found — a full inspection must be due next
        for candidate in full_inspection_res:
            match = candidate.search(inspection_outcome_section)
            if match:
                break
    if match is None:
        return None  # If no matching time frame is found
    # Convert a number word to its numeric value; digits pass through as-is
    words_to_numbers = {
        "one": 1, "two": 2, "three": 3, "four": 4, "five": 5,
        "six": 6, "seven": 7, "eight": 8, "nine": 9, "ten": 10,
        "eleven": 11, "twelve": 12,
    }
    amount_token = match.group(1).lower()
    amount = words_to_numbers.get(amount_token, amount_token)
    return f"{amount} {match.group(2).lower()}"
def calculate_next_inspection_by_date(last_inspection_date, next_inspection_timeframe):
    """
    Add the next-inspection timeframe (e.g. "3 years", "6 months") to the
    last inspection date and return the due date as 'dd/mm/yy'. Returns an
    explanatory message string when inputs are missing or unparseable.
    """
    if not last_inspection_date:
        return "Last inspection date not provided"
    if not next_inspection_timeframe:
        return "Next inspection time frame not found"
    # Parse the last inspection date (delegates format handling)
    try:
        base_date = parse_inspection_date(last_inspection_date)
    except ValueError as err:
        return str(err)
    # Pull the number and unit out of the timeframe string
    timeframe_match = re.search(r"(\d+) (years?|months?)", next_inspection_timeframe, re.IGNORECASE)
    if timeframe_match is None:
        return "Invalid next inspection time frame"
    amount = int(timeframe_match.group(1))
    unit = timeframe_match.group(2).lower()
    # testing
    print(f"calculate_next_inspection_by_date/number+unit: {amount}, {unit}") # testing
    if 'year' in unit:
        due_date = base_date + relativedelta(years=amount)
    else:
        # regex guarantees the only alternative is months
        due_date = base_date + relativedelta(months=amount)
    return due_date.strftime("%d/%m/%y")
def parse_date_new(date_input, date_format=None, output_format="%d/%m/%y", return_as_date=False):
    """
    Function to parse a date string or format a datetime object into a specified format,
    with an option to return as a date object.

    Args:
        date_input (str or datetime): The date string to be parsed or datetime object to be formatted.
        date_format (str, optional): A specific date format to try first when parsing strings.
        output_format (str, optional): The desired format for the output date string. Defaults to "%d/%m/%y".
        return_as_date (bool, optional): Whether to return a datetime.date object instead of a string.

    Returns:
        str or datetime.date: The formatted date string, or a datetime.date if return_as_date is True.
        None input yields "" (or None when return_as_date is True).

    Raises:
        ValueError: If the date string cannot be parsed with any supported format.

    Notes:
        - BUG FIX vs original: a successful `date_format` parse is now kept.
          The original always ran the common-formats loop afterwards, which
          discarded the result and raised ValueError whenever no common
          format also happened to match.
    """
    if date_input is None:
        return None if return_as_date else ""
    if isinstance(date_input, datetime):
        # Already a datetime object — format directly
        date_obj = date_input
    else:
        # Fast path: the string is already in the desired output format
        try:
            date_obj = datetime.strptime(date_input, output_format)
            return date_obj.date() if return_as_date else date_input
        except (ValueError, TypeError):
            pass  # not in the desired format; fall through to parsing
        date_obj = None
        # Try the caller-supplied format first, if specified
        if date_format:
            try:
                date_obj = datetime.strptime(date_input, date_format)
            except (TypeError, ValueError):
                date_obj = None
        # Fall back to common formats only when nothing has parsed yet
        if date_obj is None:
            for fmt in ("%d %B %Y", "%d/%m/%Y", "%d/%m/%y"):
                try:
                    date_obj = datetime.strptime(date_input, fmt)
                    break
                except ValueError:
                    continue
            else:
                raise ValueError(f"Date format for {date_input} is not supported")
    return date_obj.date() if return_as_date else date_obj.strftime(output_format)
def process_provider_links(provider_links):
    """
    Processes provider links and returns a list of dictionaries containing URN, local authority, and inspection link.

    For each provider link the provider's child page is fetched and every
    publication link whose nonvisual text matches the hard-coded
    "area send full inspection" criteria is downloaded into a per-provider
    folder. Summary data is captured for the most recent matching report
    only (this assumes the Ofsted page lists reports most recent FIRST).

    Args:
        provider_links (list): A list of BeautifulSoup Tag objects representing provider links.
    Returns:
        list: A list of dictionaries containing URN, local authority, inspection link,
              and, if pdf_data_capture is enabled, additional in-document inspection data.
    """
    data = []

    global pdf_data_capture  # Bool flag: scrape inside the PDFs (slower) or links only
    global root_export_folder
    global inspections_subfolder

    for link in provider_links:
        # Extract the URN and provider name from the web link shown
        urn = link['href'].rsplit('/', 1)[-1]
        la_name_str = clean_provider_name(link.text.strip())

        provider_dir = os.path.join('.', root_export_folder, inspections_subfolder, urn + '_' + la_name_str)

        # Create the provider directory if it doesn't exist, ready for .pdf report export.
        # exist_ok avoids the check-then-create race of os.path.exists() + makedirs().
        os.makedirs(provider_dir, exist_ok=True)

        # Get the child page content
        child_url = 'https://reports.ofsted.gov.uk' + link['href']
        child_soup = get_soup(child_url)

        # Find all publication links in the provider's child page
        pdf_links = child_soup.find_all('a', {'class': 'publication-link'})

        # Initialise a flag to indicate if an inspection link has been found
        # Important: This assumes that the provider's reports are returned/organised most recent FIRST
        found_inspection_link = False

        # Iterate through the publication links
        for pdf_link in pdf_links:
            # E.g. Publication link contains
            # <a class="publication-link" href="https://files.ofsted.gov.uk/v1/file/50252240" target="_blank">
            # This block relies on Ofsted's continued use of nonvisual element descriptors
            # containing the type(s) of inspection text. Example <nonvisual_text> values:
            #   joint area child protection inspection, pdf - 30 january 2024
            #   children's services focused visit, pdf - 01 august 2024
            #   area send full inspection, pdf - 12 july 2024
            nonvisual_text = pdf_link.select_one('span.nonvisual').text.lower().strip()

            # For now at least, web page|non-visual elements search terms hard-coded
            if 'area' in nonvisual_text and 'send' in nonvisual_text and 'full inspection' in nonvisual_text:
                # Create the filename and download the PDF (this filetype needs to be hard-coded here)
                # e.g. "area send full inspection - 15 july 2024.pdf"
                filename = nonvisual_text.replace(', pdf', '') + '.pdf'

                # Download every matching report into the provider folder.
                # timeout= guards against the request hanging the whole scrape run.
                pdf_content = requests.get(pdf_link['href'], timeout=60).content
                with open(os.path.join(provider_dir, filename), 'wb') as f:
                    f.write(pdf_content)

                # Only the most recent matching report (the first encountered) is summarised,
                # so the expensive PDF text extraction is skipped for older reports.
                if not found_inspection_link:
                    pdf_pages_content = extract_text_by_pages(pdf_content)
                    pdf_pages_content_reduced = remove_unwanted_sections(pdf_pages_content)
                    # Combine pages back into a single text
                    pdf_content_reduced = "\n".join(pdf_pages_content_reduced)

                    # Extract the "Inspection outcome" section and values derived from it
                    inspection_outcome_section = extract_inspection_outcome_section(pdf_content_reduced)
                    outcome_grade = determine_outcome_grade(inspection_outcome_section)
                    # Next inspection time-frame (comes back as f"{time_frame} {unit}")
                    next_inspection = extract_next_inspection(inspection_outcome_section)

                    # Capture the data that will be exported about the most recent inspection only.
                    # Derived from the cleaned provider name directly rather than re-parsing
                    # provider_dir (the old split('_', 1) broke if a parent folder contained '_').
                    local_authority = la_name_str.replace('_', ' ').strip()
                    inspection_link = pdf_link['href']

                    # Extract the report published date (appears after '-' in the filename)
                    report_published_date_str = filename.split('-')[-1].strip().split('.')[0]
                    # get/format date(s) (as dt objects)
                    report_published_date = format_date(report_published_date_str, '%d %B %Y', '%d/%m/%y')

                    # Now get the in-document data
                    if pdf_data_capture:
                        # Opt1 : ~x4 slower runtime
                        # Only here if we have set PDF text scrape flag to True.
                        # Turn this off, speeds up script if we only need the inspection
                        # documents themselves to be retrieved.
                        inspection_data_dict = extract_inspection_data_update(pdf_content)

                        # Dict extract here for readability of returned data/onward
                        inspection_start_date = inspection_data_dict['inspection_start_date']
                        inspection_end_date = inspection_data_dict['inspection_end_date']
                        previous_inspection_date = inspection_data_dict['previous_inspection_date']

                        # format dates for output
                        inspection_start_date_formatted = format_date_for_report(inspection_start_date, "%d/%m/%y")
                        inspection_end_date_formatted = format_date_for_report(inspection_end_date, "%d/%m/%y")
                        previous_inspection_date_formatted = format_date_for_report(previous_inspection_date, "%d/%m/%Y")  # Note YYYY not yy (required for placeholder date)

                        # Format the provider directory as a file path link (in readiness for such as Excel)
                        provider_dir_link = f"{provider_dir}".replace('/', '\\')  # fix for Windows systems

                        print(f"{local_authority}")  # Gives listing console output during run

                        # problematic end date, means more likely to get success on start date (only 2/3 days difference)
                        next_inspection_by_date = calculate_next_inspection_by_date(inspection_start_date_formatted, next_inspection)

                        data.append({
                            'urn': urn,
                            'local_authority': la_name_str,
                            'inspection_link': inspection_link,
                            'outcome_grade': outcome_grade,
                            'previous_inspection_date': previous_inspection_date_formatted,
                            'inspection_start_date': inspection_start_date_formatted,
                            'inspection_end_date': inspection_end_date_formatted,
                            'publication_date': report_published_date,
                            'next_inspection': next_inspection,
                            'next_inspection_by_date': next_inspection_by_date,
                            'local_link_to_all_inspections': provider_dir_link,
                            'inspection_outcome_text': inspection_outcome_section,
                        })
                    else:
                        # Opt2 : ~x4 faster runtime
                        # Only grab the data/docs we can get direct off the Ofsted page
                        data.append({'urn': urn, 'local_authority': local_authority, 'inspection_link': inspection_link})

                found_inspection_link = True  # Flag to ensure data reporting on only the most recent inspection
    return data
def save_data_update(data, filename, file_type='csv', hyperlink_column = None):
"""
Exports data to a specified file type.
Args:
data (DataFrame): The data to be exported.
filename (str): The desired name of the output file.
file_type (str, optional): The desired file type. Defaults to 'csv'.
hyperlink_column (str, optional): The column containing folder names for hyperlinks. Defaults to None.
Returns:
None
"""
if file_type == 'csv':
filename_with_extension = filename + '.csv'
data.to_csv(filename_with_extension, index=False)
elif file_type == 'excel':