diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index a8ce7587..b1b5f058 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -60,6 +60,7 @@ repos:
     rev: "v2.2.6"
     hooks:
       - id: codespell
+        args: ["--quiet-level=3"]

   - repo: https://github.com/shellcheck-py/shellcheck-py
     rev: "v0.9.0.6"
diff --git a/idc_index/index.py b/idc_index/index.py
index f4abc06d..f6f85385 100644
--- a/idc_index/index.py
+++ b/idc_index/index.py
@@ -153,6 +153,7 @@ def _safe_filter_by_selection(
         patientId,
         studyInstanceUID,
         seriesInstanceUID,
+        sopInstanceUID,
         crdc_series_uuid,
     ):
         if collection_id is not None:
@@ -173,6 +174,12 @@ def _safe_filter_by_selection(
                 seriesInstanceUID, list
             ):
                 raise TypeError("seriesInstanceUID must be a string or list of strings")
+        if sopInstanceUID is not None:
+            if not isinstance(sopInstanceUID, str) and not isinstance(
+                sopInstanceUID, list
+            ):
+                raise TypeError("sopInstanceUID must be a string or list of strings")
+
         if crdc_series_uuid is not None:
             if not isinstance(crdc_series_uuid, str) and not isinstance(
                 crdc_series_uuid, list
@@ -196,6 +203,12 @@ def _safe_filter_by_selection(
             )
             return result_df

+        if sopInstanceUID is not None:
+            result_df = IDCClient._filter_by_dicom_instance_uid(
+                df_index, sopInstanceUID
+            )
+            return result_df
+
         if seriesInstanceUID is not None:
             result_df = IDCClient._filter_by_dicom_series_uid(
                 df_index, seriesInstanceUID
@@ -238,6 +251,12 @@ def _filter_by_dicom_series_uid(df_index, dicom_series_uid):
             "SeriesInstanceUID", df_index, dicom_series_uid
         )

+    @staticmethod
+    def _filter_by_dicom_instance_uid(df_index, dicom_instance_uid):
+        return IDCClient._filter_dataframe_by_id(
+            "SOPInstanceUID", df_index, dicom_instance_uid
+        )
+
     @staticmethod
     def get_idc_version():
         """
@@ -258,7 +277,7 @@ def _check_create_directory(download_dir):

     def fetch_index(self, index) -> None:
         """
-        Downloads requested index.
+        Downloads the requested index and adds it as an attribute of this class.

         Args:
             index (str): Name of the index to be downloaded.
@@ -277,10 +296,14 @@ def fetch_index(self, index) -> None:
                 idc_index_data.IDC_INDEX_PARQUET_FILEPATH.parents[0],
                 f"{index}.parquet",
             )
+
             with open(filepath, mode="wb") as file:
                 file.write(response.content)
-            setattr(self.__class__, index, pd.read_parquet(filepath))
+
+            index_table = pd.read_parquet(filepath)
+            setattr(self.__class__, index, index_table)
             self.indices_overview[index]["installed"] = True
+
         else:
             logger.error(
                 f"Failed to fetch index from URL {self.indices_overview[index]['url']}: {response.status_code}"
             )
@@ -1010,41 +1037,40 @@ def _track_download_progress(
         runtime_errors = []

         if show_progress_bar:
-            total_size_bytes = size_MB * 10**6  # Convert MB to bytes
-            # temporary place holder. Accurate size is calculated in the next step
+            total_size_to_be_downloaded_bytes = size_MB * (10**6)
             initial_size_bytes = 0
             # Calculate the initial size of the directory
             for directory in list_of_directories:
                 initial_size_bytes = IDCClient._get_dir_sum_file_size(directory)
-            logger.info("Initial size of the directory: %s bytes", initial_size_bytes)
             logger.info(
-                "Approximate size of the files that need to be downloaded: %s bytes",
-                total_size_bytes,
+                "Initial size of the directory: %s",
+                IDCClient._format_size(initial_size_bytes, size_in_bytes=True),
+            )
+            logger.info(
+                "Approximate size of the files that need to be downloaded: %s",
+                IDCClient._format_size(size_MB),
             )
             pbar = tqdm(
-                total=total_size_bytes,
+                total=total_size_to_be_downloaded_bytes,
                 unit="B",
                 unit_scale=True,
                 desc="Downloading data",
             )

             while True:
+                time.sleep(0.5)
                 downloaded_bytes = 0
                 for directory in list_of_directories:
                     downloaded_bytes += IDCClient._get_dir_sum_file_size(directory)
                 downloaded_bytes -= initial_size_bytes
                 pbar.n = min(
-                    downloaded_bytes, total_size_bytes
+                    downloaded_bytes, total_size_to_be_downloaded_bytes
                 )  # Prevent the progress bar from exceeding 100%
                 pbar.refresh()
-
                 if process.poll() is not None:
                     break
-
-                time.sleep(0.5)
-
             # Wait for the process to finish
             _, stderr = process.communicate()
             pbar.close()
@@ -1250,12 +1276,12 @@ def _s5cmd_run(
             )
             existing_data_size = round(total_size - sync_size, 2)
             logger.info(
-                f"""Requested total download size is {total_size} MB,
-however at least {existing_data_size} MB is already present,
-so downloading only remaining upto {sync_size} MB
-Please note that disk sizes are calculated at series level,
-so if individual files are missing, displayed progress bar may
-not be accurate."""
+                f"Requested total download size is {total_size} MB, "
+                f"however at least {existing_data_size} MB is already present, "
+                f"so downloading only the remaining up to {sync_size} MB.\n"
+                "Please note that disk sizes are calculated at series level, "
+                "so if individual files are missing, the displayed progress bar "
+                "may not be accurate."
             )
             self._track_download_progress(
                 sync_size,
@@ -1333,7 +1359,11 @@ def _s5cmd_run(
         logger.info("Successfully downloaded files to %s", str(downloadDir))

     @staticmethod
-    def _format_size(size_MB):
+    def _format_size(size, size_in_bytes: bool = False):
+        if size_in_bytes:
+            size_MB = size / (10**6)
+        else:
+            size_MB = size
         size_GB = size_MB / 1000
         size_TB = size_GB / 1000

@@ -1341,7 +1371,9 @@ def _format_size(size_MB):
             return f"{round(size_TB, 2)} TB"
         if size_GB >= 1:
             return f"{round(size_GB, 2)} GB"
-        return f"{round(size_MB, 2)} MB"
+        if size_MB >= 1:
+            return f"{round(size_MB, 2)} MB"
+        return f"{round(size_MB * (10**6), 2)} bytes"

     def download_from_manifest(
         self,
@@ -1480,6 +1512,7 @@ def citations_from_selection(
             patientId=patientId,
             studyInstanceUID=studyInstanceUID,
             seriesInstanceUID=seriesInstanceUID,
+            sopInstanceUID=None,
             crdc_series_uuid=None,
         )

@@ -1531,6 +1564,7 @@ def download_from_selection(
         patientId=None,
         studyInstanceUID=None,
         seriesInstanceUID=None,
+        sopInstanceUID=None,
         crdc_series_uuid=None,
         quiet=True,
         show_progress_bar=True,
@@ -1547,6 +1581,7 @@ def download_from_selection(
             patientId: string or list of strings containing the values of PatientID to filter by
             studyInstanceUID: string or list of strings containing the values of DICOM StudyInstanceUID to filter by
             seriesInstanceUID: string or list of strings containing the values of DICOM SeriesInstanceUID to filter by
+            sopInstanceUID: string or list of strings containing the values of DICOM SOPInstanceUID to filter by
             crdc_series_uuid: string or list of strings containing the values of crdc_series_uuid to filter by
             quiet (bool): If True, suppresses the output of the subprocess. Defaults to True
             show_progress_bar (bool): If True, tracks the progress of download
@@ -1557,7 +1592,24 @@ def download_from_selection(

         downloadDir = self._check_create_directory(downloadDir)

-        if crdc_series_uuid is not None:
+        # If SOPInstanceUID(s) are given, select from the instance-level index;
+        # it is joined with the main index in the SQL query below
+        if sopInstanceUID:
+            if hasattr(self, "sm_instance_index"):
+                download_df = self.sm_instance_index
+            else:
+                logger.error(
+                    "Instance-level access not possible because instance-level index not installed."
+                )
+                raise ValueError(
+                    "Instance-level access not possible because instance-level index not installed."
+                )
+            if use_s5cmd_sync:
+                logger.warning(
+                    "s5cmd sync is not supported for downloading individual files. Disabling sync."
+                )
+                use_s5cmd_sync = False
+        elif crdc_series_uuid is not None:
             download_df = pd.concat(
                 [
                     self.index[
@@ -1595,10 +1648,20 @@ def download_from_selection(
             patientId=patientId,
             studyInstanceUID=studyInstanceUID,
             seriesInstanceUID=seriesInstanceUID,
+            sopInstanceUID=sopInstanceUID,
             crdc_series_uuid=crdc_series_uuid,
         )

-        total_size = round(result_df["series_size_MB"].sum(), 2)
+        if not sopInstanceUID:
+            total_size = round(result_df["series_size_MB"].sum(), 2)
+        else:
+            # instance_size in sm_instance_index is tracked in bytes
+            total_size_bytes = result_df["instance_size"].sum()
+            total_size = total_size_bytes / (10**6)
+        logger.info(
+            "Total size of files to download: " + self._format_size(total_size)
+        )
+
         disk_free_space_MB = psutil.disk_usage(downloadDir).free / (1000 * 1000)
         if disk_free_space_MB < total_size:
             logger.error("Not enough free space on disk to download the files.")
@@ -1610,11 +1673,10 @@ def download_from_selection(
             )
             return

-        logger.info("Total size of files to download: " + self._format_size(total_size))
         logger.info(
             "Total free space on disk: "
             + str(psutil.disk_usage(downloadDir).free / (1000 * 1000 * 1000))
-            + "GB"
+            + " GB"
         )

         if dry_run:
@@ -1630,7 +1692,30 @@ def download_from_selection(
             )
         else:
             hierarchy = f"CONCAT('{downloadDir}')"
-        sql = f"""
+
+        if sopInstanceUID:
+            sql = f"""
+        WITH temp as
+        (
+            SELECT
+                sopInstanceUID
+            FROM
+                result_df
+        )
+        SELECT
+            series_aws_url,
+            CONCAT(TRIM('*' FROM series_aws_url), crdc_instance_uuid, '.dcm') as instance_aws_url,
+            REGEXP_EXTRACT(series_aws_url, '(?:.*?\\/){{3}}([^\\/?#]+)', 1) index_crdc_series_uuid,
+            {hierarchy} as path
+        FROM
+            temp
+        JOIN
+            sm_instance_index using (sopInstanceUID)
+        LEFT JOIN
+            index using (seriesInstanceUID)
+        """
+        else:
+            sql = f"""
         WITH temp as
         (
             SELECT
@@ -1649,34 +1734,41 @@ def download_from_selection(
             index using (seriesInstanceUID)
         """
         result_df = self.sql_query(sql)
-        # Download the files
-        # make temporary file to store the list of files to download
+        # Download the files: write the list of files to download to a temporary manifest
+
         with tempfile.NamedTemporaryFile(mode="w", delete=False) as manifest_file:
+            # Determine the column containing the URL for instance- vs series-level access
+            if sopInstanceUID:
+                if "instance_aws_url" not in result_df:
+                    result_df["instance_aws_url"] = (
+                        result_df["series_aws_url"].str.replace("/*", "/", regex=False)
+                        + result_df["crdc_instance_uuid"]
+                        + ".dcm"
+                    )
+                url_column = "instance_aws_url"
+            else:
+                url_column = "series_aws_url"
+
             if use_s5cmd_sync and len(os.listdir(downloadDir)) != 0:
                 if dirTemplate is not None:
                     result_df["s5cmd_cmd"] = (
-                        "sync "
-                        + result_df["series_aws_url"]
-                        + ' "'
-                        + result_df["path"]
-                        + '"'
+                        "sync " + result_df[url_column] + ' "' + result_df["path"] + '"'
                     )
                 else:
                     result_df["s5cmd_cmd"] = (
-                        "sync " + result_df["series_aws_url"] + ' "' + downloadDir + '"'
+                        "sync " + result_df[url_column] + ' "' + downloadDir + '"'
                     )
             elif dirTemplate is not None:
                 result_df["s5cmd_cmd"] = (
-                    "cp " + result_df["series_aws_url"] + ' "' + result_df["path"] + '"'
+                    "cp " + result_df[url_column] + ' "' + result_df["path"] + '"'
                 )
             else:
                 result_df["s5cmd_cmd"] = (
-                    "cp " + result_df["series_aws_url"] + ' "' + downloadDir + '"'
+                    "cp " + result_df[url_column] + ' "' + downloadDir + '"'
                 )

             # Combine all commands into a single string with newline separators
             commands = "\n".join(result_df["s5cmd_cmd"])
-
             manifest_file.write(commands)

             if dirTemplate is not None:
@@ -1688,6 +1780,12 @@ def download_from_selection(
                 Temporary download manifest is generated and is passed to
                 self._s5cmd_run
                 """
             )
+            if sopInstanceUID:
+                s5cmd_sync_helper_df = None
+            else:
+                s5cmd_sync_helper_df = result_df[
+                    ["index_crdc_series_uuid", "s5cmd_cmd", "series_size_MB", "path"]
+                ]
             self._s5cmd_run(
                 endpoint_to_use=aws_endpoint_url,
                 manifest_file=Path(manifest_file.name),
@@ -1698,9 +1796,45 @@ def download_from_selection(
                 use_s5cmd_sync=use_s5cmd_sync,
                 dirTemplate=dirTemplate,
                 list_of_directories=list_of_directories,
-                s5cmd_sync_helper_df=result_df[
-                    ["index_crdc_series_uuid", "s5cmd_cmd", "series_size_MB", "path"]
-                ],
+                s5cmd_sync_helper_df=s5cmd_sync_helper_df,
+            )
+
+    def download_dicom_instance(
+        self,
+        sopInstanceUID,
+        downloadDir,
+        dry_run=False,
+        quiet=True,
+        show_progress_bar=True,
+        use_s5cmd_sync=False,
+        dirTemplate=DOWNLOAD_HIERARCHY_DEFAULT,
+    ) -> None:
+        """
+        Download the files corresponding to the sopInstanceUID(s) to the specified directory.
+
+        Args:
+            sopInstanceUID: string or list of strings containing the values of DICOM SOPInstanceUID to filter by
+            downloadDir: string containing the path to the directory to download the files to
+            dry_run (bool): If True, calculates the size of the cohort but does not start the download
+            quiet (bool): If True, suppresses the output of the subprocess. Defaults to True.
+            show_progress_bar (bool): If True, tracks the progress of download
+            use_s5cmd_sync (bool): If True, will use s5cmd sync operation instead of cp when downloadDirectory is not empty; this can significantly improve the download speed if the content is partially downloaded
+            dirTemplate (str): Download directory hierarchy template. This variable defines the folder hierarchy for organizing the downloaded files in downloadDirectory. Defaults to index.DOWNLOAD_HIERARCHY_DEFAULT set to %collection_id/%PatientID/%StudyInstanceUID/%Modality_%SeriesInstanceUID. The template string can be built using a combination of selected metadata attributes (PatientID, collection_id, Modality, StudyInstanceUID, SeriesInstanceUID) that must be prefixed by '%'. The following special characters can be used as separators: '-' (hyphen), '/' (slash for subdirectories), '_' (underscore). When set to None all files will be downloaded to the download directory with no subdirectories.
+
+        Returns: None
+
+        Raises:
+            TypeError: If sopInstanceUID(s) passed is(are) not a string or list
+
+        """
+        self.download_from_selection(
+            downloadDir,
+            sopInstanceUID=sopInstanceUID,
+            dry_run=dry_run,
+            quiet=quiet,
+            show_progress_bar=show_progress_bar,
+            use_s5cmd_sync=use_s5cmd_sync,
+            dirTemplate=dirTemplate,
         )

     def download_dicom_series(
@@ -1869,6 +2003,7 @@ def sql_query(self, sql_query):
         """
         index = self.index
+        logger.debug("Executing SQL query: " + sql_query)
         # TODO: find a more elegant way to automate the following: https://www.perplexity.ai/search/write-python-code-that-iterate-XY9ppywbQFSRnOpgbwx_uQ

         if hasattr(self, "sm_index"):
diff --git a/tests/idcindex.py b/tests/idcindex.py
index 4d12a71a..66b875b9 100644
--- a/tests/idcindex.py
+++ b/tests/idcindex.py
@@ -15,6 +15,9 @@
 # Run tests using the following command from the root of the repository:
 # python -m unittest -vv tests/idcindex.py
+#
+# Run a specific test with:
+# pytest ./tests/idcindex.py::TestIDCClient.test_download_dicom_instance

 logging.basicConfig(level=logging.DEBUG)

@@ -186,6 +189,16 @@ def test_download_dicom_series(self):
         )
         self.assertEqual(sum([len(files) for r, d, files in os.walk(temp_dir)]), 3)

+    def test_download_dicom_instance(self):
+        self.client.fetch_index("sm_instance_index")
+        with tempfile.TemporaryDirectory() as temp_dir:
+            self.client.download_dicom_instance(
+                sopInstanceUID="1.3.6.1.4.1.5962.99.1.528744472.1087975700.1641206284312.14.0",
+                downloadDir=temp_dir,
+            )
+
+            self.assertEqual(sum([len(files) for r, d, files in os.walk(temp_dir)]), 1)
+
     def test_download_with_template(self):
         dirTemplateValues = [
             None,
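
For reviewers who want to try the change: below is a minimal usage sketch of the instance-level download path added by this patch. It is not part of the diff; the SOPInstanceUID is the one exercised by the new unit test, and the output directory name is a hypothetical placeholder.

from idc_index.index import IDCClient

client = IDCClient()

# The instance-level index must be fetched first; a sopInstanceUID selection
# raises ValueError when sm_instance_index is not installed.
client.fetch_index("sm_instance_index")

client.download_dicom_instance(
    sopInstanceUID="1.3.6.1.4.1.5962.99.1.528744472.1087975700.1641206284312.14.0",
    downloadDir="/tmp/idc_instance_download",  # hypothetical output directory
)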
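
A quick sanity check of the reworked _format_size helper and its new size_in_bytes flag; expected outputs (shown as comments) follow from the logic above and assume the sub-MB fix applied in this revision. _format_size is a private static helper, so this is for illustration only.

from idc_index.index import IDCClient

print(IDCClient._format_size(2_500_000, size_in_bytes=True))  # "2.5 MB"
print(IDCClient._format_size(1500))                           # "1.5 GB" (input interpreted as MB)
print(IDCClient._format_size(512, size_in_bytes=True))        # "512.0 bytes"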