Commit 9bb5c514 authored by Theo Serralta's avatar Theo Serralta

Add faster function calcul_depth_seq

parent be5e1926
......@@ -11,6 +11,8 @@ from pycuda.autoinit import context
import multiprocessing
from multiprocessing import Process, Queue
import time
import subprocess
import pandas as pd
# Options
......@@ -327,10 +329,12 @@ def dico_mappabilite(mappability_file):
and mappability scores as values.
"""
logging.info("Entering dico_mappability")
start_time = time.time()
mappability_dico = {}
logging.info(" In dico_mappability : Open file and create the dico")
start_time_1 = time.time()
with open(mappability_file, "r") as f:
for line in f:
......@@ -348,19 +352,25 @@ def dico_mappabilite(mappability_file):
mappability_dico[chromosome].append((start_pos, end_pos, score))
logging.info(f"In dico_mappability : Leaving file and create the dico")
end_time_1 = time.time()
elapsed_time_1 = end_time_1 - start_time_1
logging.info(f"In dico_mappability : Leaving file and create the dico (Time taken: {elapsed_time_1:.4f} seconds)")
# Add position 0 for each chromosome
logging.info(" In dico_mappability : Add position 0 for each chromosome")
start_time_2 = time.time()
for chromosome, intervals in mappability_dico.items():
if intervals[0][0] != 0:
mappability_dico[chromosome].insert(0, (0, intervals[0][0], 0))
logging.info(f"In dico_mappability : Ending add position 0 for each chromosome")
end_time_2 = time.time()
elapsed_time_2 = end_time_2 - start_time_2
logging.info(f"In dico_mappability : Ending add position 0 for each chromosome (Time taken: {elapsed_time_2:.4f} seconds)")
# Merge intervals with the same score
logging.info(" In dico_mappability : Merge intervals with the same score")
start_time_3 = time.time()
for chromosome, intervals in mappability_dico.items():
merged_intervals = merge_intervals(intervals)
......@@ -368,8 +378,13 @@ def dico_mappabilite(mappability_file):
start: score for start, _, score in merged_intervals
}
logging.info(f"In dico_mappability : Ending merge intervals with the same score")
logging.info(f"Leaving dico_mappability")
end_time_3 = time.time()
elapsed_time_3 = end_time_3 - start_time_3
logging.info(f"In dico_mappability : Ending merge intervals with the same score (Time taken: {elapsed_time_2:.4f} seconds)")
end_time = time.time()
elapsed_time = end_time - start_time
logging.info(f"Leaving dico_mappability (Time taken: {elapsed_time:.4f} seconds)")
return mappability_dico
......@@ -396,6 +411,7 @@ def calcul_mappability(seq_length, mappability, chr):
An array of mappability scores for the given sequence length.
"""
logging.info(f"Entering calcul_mappability for {chr}")
start_time = time.time()
map_data = np.zeros(seq_length, dtype=np.float32)
sorted_keys = sorted(mappability[chr].keys())
......@@ -404,6 +420,7 @@ def calcul_mappability(seq_length, mappability, chr):
prev_mappability = 0
logging.info(f"In calcul_mappability : Entering for bound in sorted_keys for {chr}")
start_time_1 = time.time()
for bound in sorted_keys:
for i in range(prev_bound, min(seq_length, bound)):
......@@ -411,17 +428,25 @@ def calcul_mappability(seq_length, mappability, chr):
prev_bound = bound
prev_mappability = mappability[chr][bound]
logging.info(f"In calcul_mappability : Leaving for bound in sorted_keys for {chr}")
end_time_1 = time.time()
elapsed_time_1 = end_time_1 - start_time_1
logging.info(f"In calcul_mappability : Leaving for bound in sorted_keys for {chr} (Time taken: {elapsed_time_1:.4f} seconds)")
# Fill remaining positions if sequence length exceeds last bound
logging.info(f"In calcul_mappability : Entering for i in range(prev_bound, seq_length) for {chr}")
start_time_2 = time.time()
for i in range(prev_bound, seq_length):
map_data[i] = prev_mappability
logging.info(f"In calcul_mappability : Leaving for i in range(prev_bound, seq_length) for {chr}")
end_time_2 = time.time()
elapsed_time_2 = end_time_2 - start_time_2
logging.info(f"In calcul_mappability : Leaving for i in range(prev_bound, seq_length) for {chr} (Time taken: {elapsed_time_2:.4f} seconds)")
end_time = time.time()
elapsed_time = end_time - start_time
logging.info(f"Leaving dico_mappability for {chr} (Time taken: {elapsed_time:.4f} seconds)")
logging.info(f"Leaving dico_mappability for {chr}")
return map_data
......@@ -442,6 +467,7 @@ def parse_fasta(gc_file):
A dictionary where keys are sequence headers and values are the corresponding sequences.
"""
logging.info("Entering parse_fasta")
start_time = time.time()
sequences = {}
with open(gc_file, "r") as f:
......@@ -452,7 +478,9 @@ def parse_fasta(gc_file):
sequence = "".join(lines[1:])
sequences[header] = sequence
logging.info(f"Leaving parse_fasta")
end_time = time.time()
elapsed_time = end_time - start_time
logging.info(f"Leaving parse_fasta (Time taken: {elapsed_time:.4f} seconds)")
return sequences
......@@ -478,12 +506,15 @@ def calcul_gc_content(seq_length, chr, seq):
An array of bytes ('S' dtype) representing the GC content for the given sequence length.
"""
logging.info(f"Entering calcul_gc_content for {chr}")
start_time = time.time()
gc_data = np.zeros(seq_length, dtype="S")
for i in range(len(seq[chr])):
gc_data[i] = seq[chr][i]
logging.info(f"Leaving calcul_gc_content for {chr}")
end_time = time.time()
elapsed_time = end_time - start_time
logging.info(f"Leaving calcul_gc_content for {chr} (Time taken: {elapsed_time:.4f} seconds)")
return gc_data
......@@ -509,6 +540,7 @@ def calcul_depth_seq(seq_length, bamfile_path, chr):
An array of integers representing the sequencing depth for the given sequence length.
"""
logging.info(f"Entering calcul_depth_seq for {chr}")
start_time = time.time()
depth_data = np.zeros(seq_length, dtype=np.int32)
for pileupcolumn in bamfile_path.pileup(reference = chr):
......@@ -519,7 +551,10 @@ def calcul_depth_seq(seq_length, bamfile_path, chr):
#depth_data = bamfile_path.pileup().nsegments
logging.info(f"Leaving calcul_depth_seq for {chr}")
end_time = time.time()
elapsed_time = end_time - start_time
logging.info(f"Leaving calcul_depth_seq for {chr} (Time taken: {elapsed_time:.4f} seconds)")
return depth_data
......@@ -545,6 +580,7 @@ def calcul_depth_seq_copy(seq_length, bamfile_path, chr):
An array of integers representing the sequencing depth for the given sequence length.
"""
logging.info(f"Entering calcul_depth_seq for {chr}")
start_time = time.time()
depth_data = np.zeros(seq_length, dtype=np.int32)
......@@ -568,7 +604,9 @@ def calcul_depth_seq_copy(seq_length, bamfile_path, chr):
# depth_data = np.array(bamfile_path.pileup().nsegments)
logging.info(f"Leaving calcul_depth_seq for {chr}")
end_time = time.time()
elapsed_time = end_time - start_time
logging.info(f"Leaving calcul_depth_seq for {chr} (Time taken: {elapsed_time:.4f} seconds)")
return depth_data
......@@ -593,17 +631,73 @@ def calcul_depth_seq_count(seq_length, bamfile_path, chr):
numpy.ndarray
An array of integers representing the sequencing depth for the given sequence length.
"""
logging.info(f"Entering calcul_depth_seq for {chr}")
logging.info(f"Entering calcul_depth_seq_count for {chr}")
start_time = time.time()
depth_data_count = np.zeros(seq_length, dtype=np.int32)
for i in range(seq_length):
depth_data_count[i] = bamfile_path.count(contig = chr, start = i, stop = i+1)
# Use count_coverage to get coverage depth
depth_data_count = bamfile_path.count_coverage(contig = chr)
type(depth_data_count)
#print(depth_data_count[:10056])
logging.info(f"Leaving calcul_depth_seq for {chr}")
end_time = time.time()
elapsed_time = end_time - start_time
logging.info(f"Leaving calcul_depth_seq for {chr} (Time taken: {elapsed_time:.4f} seconds)")
return depth_data_count
def calcul_depth_seq_samtools(seq_length, bamfile_path, chr, num_threads=12):
"""
Calculate the sequencing depth for a given chromosome using a parallelized bash script.
Parameters
----------
seq_length : int
The length of the sequence.
bamfile_path : str
The path to the BAM file.
chr : str
The chromosome for which the depth is calculated.
num_threads : int
The number of threads to use for parallel processing.
Returns
-------
numpy.ndarray
An array of integers representing the sequencing depth for the given sequence length.
"""
logging.info(f"Entering calcul_depth_seq for {chr}")
start_time = time.time()
#Define the output file for depth results
output_file_1 = f"/work/gad/shared/analyse/test/cnvGPU/test_scalability/tmp_depth_{chr}.out"
# Run the parallelized bash script to calculate depth
p = subprocess.Popen(["/work/gad/shared/analyse/test/cnvGPU/test_scalability/samtools_test_by_chr_multithreading_log.sh %s %s %s %s" % (bamfile_path, chr, output_file_1, num_threads)], shell = True, executable = "/bin/bash")
p.communicate()
# Create the numpy array for depth data
depth_data = np.zeros(seq_length, dtype=np.int32)
# Read the output file and fill the numpy array using pandas for efficiency
try:
df = pd.read_csv(output_file_1, delim_whitespace=True, header=None, names=['chr', 'pos', 'depth'])
df['pos'] -= 1 # Convert to 0-based index
df = df[df['pos'] < seq_length] # Ensure positions are within sequence length
depth_data[df['pos']] = df['depth']
except Exception as e:
logging.error(f"Error reading depth file: {e}")
# Clean up the output file
subprocess.run(['rm', output_file_1])
end_time = time.time()
elapsed_time = end_time - start_time
logging.info(f"Leaving calcul_depth_seq for {chr} (Time taken: {elapsed_time:.4f} seconds)")
return depth_data
def calcul_med_total(depth_correction_results, chr):
"""
Calculate the median of non-zero depth correction results.
......@@ -621,6 +715,7 @@ def calcul_med_total(depth_correction_results, chr):
The median of the non-zero depth correction values, or 0 if no non-zero values are present.
"""
logging.info(f"Entering calcul_med_total for {chr}")
start_time = time.time()
depth_correction_results = np.array(depth_correction_results)
# Filter results to remove zero values
......@@ -630,7 +725,9 @@ def calcul_med_total(depth_correction_results, chr):
sys.stderr.write("Chromosome : %s, med_chr : %s\n" % (chr, m))
logging.info(f"Leaving calcul_med_total for {chr}")
end_time = time.time()
elapsed_time = end_time - start_time
logging.info(f"Leaving calcul_med_total for {chr} (Time taken: {elapsed_time:.4f} seconds)")
return m
......@@ -654,6 +751,7 @@ def calcul_med_same_gc(gc_results, depth_correction_results, chr):
A dictionary where keys are unique GC content values and values are the median depth correction for those GC values.
"""
logging.info(f"Entering calcul_med_same_gc for {chr}")
start_time = time.time()
mGC = []
depth_correction_results_array = np.array(depth_correction_results)
......@@ -679,7 +777,9 @@ def calcul_med_same_gc(gc_results, depth_correction_results, chr):
gc_to_median = dict(mGC)
logging.info(f"Leaving calcul_med_same_gc for {chr}")
end_time = time.time()
elapsed_time = end_time - start_time
logging.info(f"Leaving calcul_med_same_gc for {chr} (Time taken: {elapsed_time:.4f} seconds)")
return gc_to_median
......@@ -701,6 +801,7 @@ def calcul_moy_totale(normalize_depth_results, chr):
The mean of the non-zero normalized depth values, or 0 if no non-zero values are present.
"""
logging.info(f"Entering calcul_moy_totale for {chr}")
start_time = time.time()
normalize_depth_results = np.array(normalize_depth_results)
# Filter results to remove zero values
......@@ -710,7 +811,9 @@ def calcul_moy_totale(normalize_depth_results, chr):
sys.stderr.write("Chromosome : %s, mean_chr : %s\n" % (chr, mean_chr))
logging.info(f"Leaving calcul_moy_totale for {chr}")
end_time = time.time()
elapsed_time = end_time - start_time
logging.info(f"Leaving calcul_moy_totale for {chr} (Time taken: {elapsed_time:.4f} seconds)")
return mean_chr
......@@ -733,6 +836,7 @@ def calcul_std(normalize_depth_results, chr):
The standard deviation of the non-zero normalized depth values, or 0 if no non-zero values are present.
"""
logging.info(f"Entering calcul_std for {chr}")
start_time = time.time()
normalize_depth_results = np.array(normalize_depth_results)
# Filter results to remove zero values
......@@ -743,7 +847,9 @@ def calcul_std(normalize_depth_results, chr):
sys.stderr.write("Chromosome : %s, std_chr : %s\n" % (chr, std_chr))
logging.info(f"Leaving calcul_std for {chr}")
end_time = time.time()
elapsed_time = end_time - start_time
logging.info(f"Leaving calcul_std for {chr} (Time taken: {elapsed_time:.4f} seconds)")
return std_chr
......@@ -765,6 +871,7 @@ def compute_mean_std_med(ratio_par_window_results, chr):
A tuple containing the mean, standard deviation, and median of the filtered ratio values.
"""
logging.info(f"Entering compute_mean_std_med for {chr}")
start_time = time.time()
# Filter results to remove zero and -1 values
ratio_par_window_results = np.array(ratio_par_window_results)
......@@ -786,7 +893,10 @@ def compute_mean_std_med(ratio_par_window_results, chr):
sys.stderr.write("Chromosome : %s, mean_ratio : %s, std_ratio : %s, med_ratio : %s\n" % (chr, mean_ratio, std_ratio, med_ratio))
logging.info(f"Leaving compute_mean_std_med for {chr}")
end_time = time.time()
elapsed_time = end_time - start_time
logging.info(f"Leaving compute_mean_std_med for {chr} (Time taken: {elapsed_time:.4f} seconds)")
# Return results
return mean_ratio, std_ratio, med_ratio
......@@ -811,19 +921,15 @@ def cn_level(x, chr):
- 2 if 0.75 < x < 1 or round(x) == 1
- round(x) if round(x) > 1
"""
if x < 1:
if x <= 0.75:
if x >= 0.1:
return 1
else:
if x < 0.1:
return 0
else:
elif x <= 0.75:
return 1
elif x < 1:
return 2
else:
if round(x) == 1:
return 2
if round(x) > 1:
return round(x)
rounded_x = round(x)
return 2 if rounded_x == 1 else rounded_x
def get_sample_name(bamfile_path):
......@@ -844,13 +950,16 @@ def get_sample_name(bamfile_path):
"""
logging.info(f"Entering get_sample_name")
start_time = time.time()
with pysam.AlignmentFile(bamfile_path, "rb") as bamfile:
for read_group in bamfile.header.get("RG", []):
if "SM" in read_group:
return read_group["SM"]
logging.info(f"Leaving get_sample_name")
end_time = time.time()
elapsed_time = end_time - start_time
logging.info(f"Leaving get_sample_name (Time taken: {elapsed_time:.4f} seconds)")
return "UnknownSample"
......@@ -877,6 +986,7 @@ def create_signal(signal, chr, z_score_results, step_size):
The function modifies the signal dictionary in place.
"""
logging.info(f"Entering create_signal for {chr} (include copy_number_level)")
start_time = time.time()
if chr not in signal:
signal[chr] = {}
......@@ -884,7 +994,10 @@ def create_signal(signal, chr, z_score_results, step_size):
pos = (i * step_size) + 1
signal[chr][pos] = z_score_results[i]
logging.info(f"Leaving create_signal for {chr} (include copy_number_level)")
#sys.stderr.write("\t signal %s\n" % signal[chr])
end_time = time.time()
elapsed_time = end_time - start_time
logging.info(f"Leaving create_signal for {chr} (include copy_number_level) (Time taken: {elapsed_time:.4f} seconds)")
def detect_events(
z_score_results,
......@@ -920,27 +1033,28 @@ def detect_events(
The function modifies the events dictionary in place.
"""
logging.info(f"Entering detect_events for {chr}")
start_time = time.time()
c = 0
for i, z_score in enumerate(z_score_results):
if (z_score <= -zscore_threshold) or (z_score >= zscore_threshold):
if chr not in events:
events[chr] = {}
if med_ratio == 0:
c = 0
else:
for i, z_score in enumerate(z_score_results):
if abs(z_score) >= zscore_threshold:
if med_ratio != 0:
c = cn_level(float(ratio_par_mean_ratio_results[i]), chr)
if z_score >= zscore_threshold:
if z_score >= 0:
c = 3
elif c == 2 and z_score <= -zscore_threshold:
elif c == 2 and z_score < 0:
c = 1
pos_start = (i * step_size) + 1
pos_end = pos_start + window_size
events[chr][(pos_start, pos_end)] = c
#sys.stderr.write("\t events %s\n" % events)
logging.info(f"Leaving detect_events for {chr}")
end_time = time.time()
elapsed_time = end_time - start_time
logging.info(f"Leaving detect_events for {chr} (Time taken: {elapsed_time:.4f} seconds)")
def segmentation(events, segment, chr):
"""
......@@ -961,15 +1075,17 @@ def segmentation(events, segment, chr):
The function modifies the segment dictionary in place.
"""
logging.info(f"Entering segmentation for {chr}")
start_time = time.time()
for k in events.keys():
sys.stderr.write("\tfor chromosome %s\n" % k)
starts, oldPos, oldLevel = 0, 0, -1
starts, oldPos, oldLevel = 1, 1, -1
for p in sorted(events[k].keys()):
level = events[k][p]
#sys.stderr.write("\t p %s\n" % p)
# new coordinates
if p[0] > (oldPos + window_size):
if (starts != 0) and (starts != p[0]):
if (starts != 1) and (starts != p[0]):
if k not in segment:
segment[k] = {}
index = str(starts) + "-" + str(oldPos)
......@@ -997,7 +1113,9 @@ def segmentation(events, segment, chr):
oldLevel = level
oldPos, oldLevel = p[0], level
logging.info(f"Leaving segmentation for {chr}")
end_time = time.time()
elapsed_time = end_time - start_time
logging.info(f"Leaving segmentation for {chr} (Time taken: {elapsed_time:.4f} seconds)")
def display_results_vcf(sample, segment, signal, lengthFilter, output_file, chr):
"""
......@@ -1025,6 +1143,7 @@ def display_results_vcf(sample, segment, signal, lengthFilter, output_file, chr)
"""
global header_written
logging.info(f"Entering display_results_vcf for {chr}")
start_time = time.time()
with open(output_file, "a") as f:
if not header_written:
......@@ -1049,11 +1168,14 @@ def display_results_vcf(sample, segment, signal, lengthFilter, output_file, chr)
header_written = True
for k in segment.keys():
#sys.stderr.write("\tfor chromosome %s\n" % k)
for elt in sorted(segment[k].keys()):
#sys.stderr.write("\tfor elt %s\n" % elt)
if segment[k][elt]["start"] == segment[k][elt]["end"]:
continue
if (segment[k][elt]["end"] - segment[k][elt]["start"]) < lengthFilter:
continue
#sys.stderr.write("\t [segment[k][elt] %s\n" % [segment[k][elt]])
if int(signal[k][segment[k][elt]["start"]]) < 0:
f.write(
"%s\t%s\t.\tN\t<DEL>\t.\t.\tSVTYPE=DEL;END=%s;VALUE=%s\tGT:GQ\t./.:0\n"
......@@ -1075,7 +1197,9 @@ def display_results_vcf(sample, segment, signal, lengthFilter, output_file, chr)
)
)
logging.info(f"Leaving display_results_vcf for {chr}")
end_time = time.time()
elapsed_time = end_time - start_time
logging.info(f"Leaving display_results_vcf for {chr} (Time taken: {elapsed_time:.4f} seconds)")
def main_calcul(
......@@ -1130,9 +1254,10 @@ def main_calcul(
# Appeler les différentes fonctions
map_data = calcul_mappability(seq_length, mappability, chr)
gc_data = calcul_gc_content(seq_length, chr, seq)
depth_data = calcul_depth_seq(seq_length, bamfile_path, chr)
#depth_data = calcul_depth_seq(seq_length, bamfile_path, chr)
#depth_data_count = calcul_depth_seq_count(seq_length, bamfile_path, chr)
#depth_data = calcul_depth_seq_copy(seq_length, bamfile_path, chr)
depth_data = calcul_depth_seq_samtools(seq_length, bamfile_path, chr)
# Transférer le tableau NumPy vers CUDA
d_depth_data = cuda.mem_alloc(depth_data.nbytes)
......@@ -1353,8 +1478,8 @@ def main_calcul(
context.synchronize()
# Copier les résultats depuis le périphérique CUDA vers l'hôte
# cuda.memcpy_dtoh(dest, src)
cuda.memcpy_dtoh(depth_results, d_depth_results)
cuda.memcpy_dtoh(depth_results, d_depth_results) # cuda.memcpy_dtoh(dest, src)
sys.stderr.write(
"\t Copie les resultats du GPU (d_depth_results) vers le CPU (depth_results)\n"
)
......@@ -1521,16 +1646,23 @@ gc_file = "/work/gad/shared/pipeline/grch38/index/grch38_essential.fa"
mappability_file = "/work/gad/shared/pipeline/grch38/cnv/k100.Umap.MultiTrackMappability.bedgraph"
seq = parse_fasta(gc_file)
mappability = dico_mappabilite(mappability_file)
#bamfile_path_2 = "/work/gad/shared/analyse/test/cnvGPU/test_scalability/dijen1000.bam"
if num_chr == "ALL" :
with pysam.AlignmentFile(bamfile_path, "rb") as bamfile_handle:
for i, seq_length in enumerate(bamfile_handle.lengths):
chr = bamfile_handle.references[i]
#Exclude chrM
if chr == "chrM":
continue
sys.stderr.write("Chromosome : %s, seq length : %s\n" % (chr, seq_length))
# Appeler la fonction de calcul de la profondeur moyenne pour ce chromosome
main_calcul(
bamfile_handle,
bamfile_path,
chr,
seq_length,
window_size,
......@@ -1557,7 +1689,7 @@ else :
# Appeler la fonction de calcul de la profondeur moyenne pour ce chromosome
main_calcul(
bamfile_handle,
bamfile_path,
chr,
seq_length,
window_size,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment