#!/usr/bin/env python3
import os
import tarfile
from typing import Any
import warnings
import itertools
import yaml
from metrics.workflows import run_parse_simulations, run_calculate_analysis
# Silence every warning category for the whole process. Note that this also
# hides DeprecationWarning messages raised by the standard library.
warnings.simplefilter("ignore")
def untar_files(folder_path: str) -> None:
    """
    Untar all ``.tar.xz`` archives found directly inside a folder.

    Every archive is extracted in place, i.e. its members are written into
    ``folder_path`` itself. Entries whose name does not end with ``.tar.xz``
    are ignored, and subfolders are not searched.

    Parameters
    ----------
    folder_path : str
        Path to the folder

    Raises
    ------
    OSError
        If ``folder_path`` cannot be listed (for example, it does not exist).
    tarfile.TarError
        If an archive cannot be read or one of its members is rejected by
        the extraction filter.
    """
    # Extraction filters are only available on interpreters that expose
    # ``tarfile.data_filter``. Where they exist, request the "data" filter
    # explicitly so that members cannot be written outside ``folder_path``
    # and so that behaviour does not depend on the interpreter's default.
    extract_kwargs = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}

    for file_name in os.listdir(folder_path):
        if not file_name.endswith(".tar.xz"):
            continue

        file_path = os.path.join(folder_path, file_name)
        with tarfile.open(file_path, "r:xz") as tar:
            tar.extractall(folder_path, **extract_kwargs)
def recursive_file_delete(folder_path: str) -> None:
    """
    Remove every file below a folder, except ``.tar.xz`` archives.

    The folder tree is walked with ``os.walk``; each file whose name does not
    end with ``.tar.xz`` is removed. Directories themselves are never
    removed, so the (possibly empty) folder structure stays in place.

    Parameters
    ----------
    folder_path : str
        Path to the folder
    """
    for current_dir, _subdirs, entries in os.walk(folder_path):
        # Archives are the only files that are kept.
        doomed = (name for name in entries if not name.endswith(".tar.xz"))
        for name in doomed:
            os.remove(os.path.join(current_dir, name))
def make_list_based_on_resolution(resolution: Any, range_param: list, is_seed: bool) -> list:
    """
    Create list based on specified resolution parameter, if applicable.

    When a resolution is given, the result is ``k * resolution`` for every
    integer ``k`` from ``range_param[0]`` up to and including
    ``int(range_param[1] / resolution)``. When no resolution is given,
    ``range_param`` is used as the explicit list of values.

    Parameters
    ----------
    resolution : Union[float, str, int, None]
        Step between consecutive values. ``None``, ``0``, an empty string or
        the string ``"none"`` (any capitalisation) mean "no resolution".
        Numeric strings such as ``"2"`` or ``"0.5"`` are converted to numbers.
    range_param : list
        With a resolution: ``[start, stop]`` bounds. Without a resolution:
        the values themselves.
    is_seed : bool
        If True, every value is converted to a string and left-padded with
        zeros to at least two characters.

    Returns
    -------
    resolution_list : list
        List based on resolution parameters. Without a resolution and with
        ``is_seed`` False this is ``range_param`` itself (not a copy).

    Raises
    ------
    ValueError
        If ``resolution`` is a string that is neither "none" nor a number.
    """
    # Normalise the resolution first: config files may deliver it as a number,
    # as ``None`` or as text. Calling ``.lower()`` directly on it would fail
    # for numbers.
    step = resolution
    if isinstance(step, str):
        text = step.strip()
        if text.lower() in ("", "none"):
            step = None
        else:
            try:
                step = int(text)
            except ValueError:
                step = float(text)

    if step:
        # NOTE(review): range_param[0] is used as the first multiplier, not as
        # the first value, so a non-zero start yields range_param[0] * step as
        # the first element - confirm this is intended.
        last_multiplier = int(range_param[1] / step)
        values = [x * step for x in range(range_param[0], last_multiplier + 1)]
    else:
        values = range_param

    if is_seed:
        return [str(value).zfill(2) for value in values]
    return values
def main() -> None:
    """
    Parse and/or analyze every simulation listed in the YAML configuration.

    The configuration is read from ``./src/metrics/config.yaml``. From its
    ``experiment`` section the seeds, timepoints and observation timepoints
    are built with ``make_list_based_on_resolution``. Then, for every
    combination of context, population and cancer heterogeneity level:

    1. the simulation folder and its ``.PARAM`` companion folder are untarred;
    2. if ``config["parse"]`` is truthy, ``run_parse_simulations`` is called
       once per seed;
    3. if ``config["analyze"]`` is truthy, ``run_calculate_analysis`` is
       called once per seed with the settings from ``config["analysis"]``;
    4. all files except the ``.tar.xz`` archives are deleted again from both
       folders, even when step 1, 2 or 3 raised.

    Raises
    ------
    ValueError
        If a context other than ``"C"`` or ``"CH"`` is configured.
    KeyError
        If a required configuration key is missing.
    """
    filename = "./src/metrics/config.yaml"
    with open(filename, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)
    print(config)

    experiment_section = config["experiment"]
    database = experiment_section["database"]
    time_range = experiment_section["timepoints"]
    time_resolution = experiment_section["timepoint_resolution"]
    observation_time_range = experiment_section["observation_timepoints"]
    observation_time_resolution = experiment_section["observation_timepoint_resolution"]
    seed_range = experiment_section["seeds"]
    seed_resolution = experiment_section["seed_resolution"]

    seeds = make_list_based_on_resolution(seed_resolution, seed_range, True)
    timepoints = make_list_based_on_resolution(time_resolution, time_range, False)
    observation_timepoints = make_list_based_on_resolution(
        observation_time_resolution, observation_time_range, False
    )

    simulation_path = experiment_section["simulation_path"]
    tissue_heterogeneity = experiment_section["tissue_heterogeneity"]
    cancer_heterogeneity = experiment_section["cancer_heterogeneity"]
    contexts = experiment_section["contexts"]
    populations = experiment_section["populations"]

    # These settings do not change between simulations, so read them once.
    # The analysis section is only required when analysis is enabled.
    should_parse = config["parse"]
    should_analyze = config["analyze"]
    if should_analyze:
        analysis_params = config["analysis"]
        features = analysis_params["features"]
        samples = analysis_params["samples"]
        comparisons = analysis_params["comparisons"]

    for context, population, cancer_heterogeneity_level in itertools.product(
        contexts, populations, cancer_heterogeneity
    ):
        print(cancer_heterogeneity_level)

        if context == "C":
            simulation = f"{simulation_path}{context}_{population}_{cancer_heterogeneity_level}"
        elif context == "CH":
            simulation = f"{simulation_path}{context}_{population}_{cancer_heterogeneity_level}_{tissue_heterogeneity}"
        else:
            raise ValueError("Invalid context.")

        database_path = f"{database}{population}_{context}_{cancer_heterogeneity_level}.db"
        param_folder = f"{simulation}.PARAM"

        try:
            untar_files(simulation)
            untar_files(param_folder)

            if should_parse:
                for seed in seeds:
                    run_parse_simulations(database_path, simulation, seed, timepoints)

            if should_analyze:
                for seed in seeds:
                    run_calculate_analysis(
                        database_path,
                        simulation,
                        seed,
                        features,
                        timepoints,
                        observation_timepoints,
                        samples,
                        comparisons,
                    )
        finally:
            # Always remove the extracted files so that a failure while
            # parsing or analyzing does not leave them behind on disk.
            recursive_file_delete(simulation)
            recursive_file_delete(param_folder)
# Run the workflow only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()