diff --git a/CNV/test_gpu_mean_depth.py b/CNV/test_gpu_mean_depth.py
index f58aac1a5e62fa0613931dc9f36e22ff602eb625..2b27eb332f79694d462ba49b3139434cee5e2a48 100644
--- a/CNV/test_gpu_mean_depth.py
+++ b/CNV/test_gpu_mean_depth.py
@@ -11,6 +11,8 @@ from pycuda.autoinit import context
 import multiprocessing
 from multiprocessing import Process, Queue
 import time
+import subprocess
+import pandas as pd
 
 
 # Options
@@ -327,10 +329,12 @@ def dico_mappabilite(mappability_file):
         and mappability scores as values.
     """
     logging.info("Entering dico_mappability")
+    start_time = time.time()
 
     mappability_dico = {}
 
     logging.info(" In dico_mappability : Open file and create the dico")
+    start_time_1 = time.time()
 
     with open(mappability_file, "r") as f:
         for line in f:
@@ -348,19 +352,25 @@ def dico_mappabilite(mappability_file):
 
             mappability_dico[chromosome].append((start_pos, end_pos, score))
 
-    logging.info(f"In dico_mappability : Leaving file and create the dico")
+    end_time_1 = time.time()
+    elapsed_time_1 = end_time_1 - start_time_1
+    logging.info(f"In dico_mappability : Leaving file and create the dico (Time taken: {elapsed_time_1:.4f} seconds)")
 
     # Add position 0 for each chromosome
     logging.info(" In dico_mappability : Add position 0 for each chromosome")
+    start_time_2 = time.time()
 
     for chromosome, intervals in mappability_dico.items():
         if intervals[0][0] != 0:
             mappability_dico[chromosome].insert(0, (0, intervals[0][0], 0))
 
-    logging.info(f"In dico_mappability : Ending add position 0 for each chromosome")
+    end_time_2 = time.time()
+    elapsed_time_2 = end_time_2 - start_time_2
+    logging.info(f"In dico_mappability : Ending add position 0 for each chromosome (Time taken: {elapsed_time_2:.4f} seconds)")
 
     # Merge intervals with the same score
     logging.info(" In dico_mappability : Merge intervals with the same score")
+    start_time_3 = time.time()
 
     for chromosome, intervals in mappability_dico.items():
         merged_intervals = merge_intervals(intervals)
@@ -368,8 +378,13 @@ def dico_mappabilite(mappability_file):
             start: score for start, _, score in merged_intervals
         }
 
-    logging.info(f"In dico_mappability : Ending merge intervals with the same score")
-    logging.info(f"Leaving dico_mappability")
+    end_time_3 = time.time()
+    elapsed_time_3 = end_time_3 - start_time_3
+    logging.info(f"In dico_mappability : Ending merge intervals with the same score (Time taken: {elapsed_time_3:.4f} seconds)")
+
+    end_time = time.time()
+    elapsed_time = end_time - start_time
+    logging.info(f"Leaving dico_mappability (Time taken: {elapsed_time:.4f} seconds)")
 
     return mappability_dico
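
Note on the instrumentation added above: every phase repeats the same start_time / end_time / logging.info triplet. If this pattern keeps spreading, a small helper could express the measurement once. This is only a sketch of that idea (the log_timing name and its placement are not part of the patch):

import logging
import time
from contextlib import contextmanager

@contextmanager
def log_timing(label):
    """Log the elapsed wall-clock time of the enclosed block."""
    start = time.time()
    try:
        yield
    finally:
        logging.info("%s (Time taken: %.4f seconds)", label, time.time() - start)

# Equivalent to the hand-written instrumentation above, e.g.:
# with log_timing("In dico_mappability : Leaving file and create the dico"):
#     ...  # build mappability_dico from the bedgraph file
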
""" logging.info(f"Entering calcul_mappability for {chr}") + start_time = time.time() map_data = np.zeros(seq_length, dtype=np.float32) sorted_keys = sorted(mappability[chr].keys()) @@ -404,6 +420,7 @@ def calcul_mappability(seq_length, mappability, chr): prev_mappability = 0 logging.info(f"In calcul_mappability : Entering for bound in sorted_keys for {chr}") + start_time_1 = time.time() for bound in sorted_keys: for i in range(prev_bound, min(seq_length, bound)): @@ -411,17 +428,25 @@ def calcul_mappability(seq_length, mappability, chr): prev_bound = bound prev_mappability = mappability[chr][bound] - logging.info(f"In calcul_mappability : Leaving for bound in sorted_keys for {chr}") + end_time_1 = time.time() + elapsed_time_1 = end_time_1 - start_time_1 + logging.info(f"In calcul_mappability : Leaving for bound in sorted_keys for {chr} (Time taken: {elapsed_time_1:.4f} seconds)") # Fill remaining positions if sequence length exceeds last bound logging.info(f"In calcul_mappability : Entering for i in range(prev_bound, seq_length) for {chr}") + start_time_2 = time.time() for i in range(prev_bound, seq_length): map_data[i] = prev_mappability - logging.info(f"In calcul_mappability : Leaving for i in range(prev_bound, seq_length) for {chr}") + end_time_2 = time.time() + elapsed_time_2 = end_time_2 - start_time_2 + logging.info(f"In calcul_mappability : Leaving for i in range(prev_bound, seq_length) for {chr} (Time taken: {elapsed_time_2:.4f} seconds)") + + end_time = time.time() + elapsed_time = end_time - start_time + logging.info(f"Leaving dico_mappability for {chr} (Time taken: {elapsed_time:.4f} seconds)") - logging.info(f"Leaving dico_mappability for {chr}") return map_data @@ -442,6 +467,7 @@ def parse_fasta(gc_file): A dictionary where keys are sequence headers and values are the corresponding sequences. """ logging.info("Entering parse_fasta") + start_time = time.time() sequences = {} with open(gc_file, "r") as f: @@ -452,7 +478,9 @@ def parse_fasta(gc_file): sequence = "".join(lines[1:]) sequences[header] = sequence - logging.info(f"Leaving parse_fasta") + end_time = time.time() + elapsed_time = end_time - start_time + logging.info(f"Leaving parse_fasta (Time taken: {elapsed_time:.4f} seconds)") return sequences @@ -478,12 +506,15 @@ def calcul_gc_content(seq_length, chr, seq): An array of bytes ('S' dtype) representing the GC content for the given sequence length. """ logging.info(f"Entering calcul_gc_content for {chr}") + start_time = time.time() gc_data = np.zeros(seq_length, dtype="S") for i in range(len(seq[chr])): gc_data[i] = seq[chr][i] - logging.info(f"Leaving calcul_gc_content for {chr}") + end_time = time.time() + elapsed_time = end_time - start_time + logging.info(f"Leaving calcul_gc_content for {chr} (Time taken: {elapsed_time:.4f} seconds)") return gc_data @@ -509,6 +540,7 @@ def calcul_depth_seq(seq_length, bamfile_path, chr): An array of integers representing the sequencing depth for the given sequence length. 
""" logging.info(f"Entering calcul_depth_seq for {chr}") + start_time = time.time() depth_data = np.zeros(seq_length, dtype=np.int32) for pileupcolumn in bamfile_path.pileup(reference = chr): @@ -519,7 +551,10 @@ def calcul_depth_seq(seq_length, bamfile_path, chr): #depth_data = bamfile_path.pileup().nsegments - logging.info(f"Leaving calcul_depth_seq for {chr}") + + end_time = time.time() + elapsed_time = end_time - start_time + logging.info(f"Leaving calcul_depth_seq for {chr} (Time taken: {elapsed_time:.4f} seconds)") return depth_data @@ -545,6 +580,7 @@ def calcul_depth_seq_copy(seq_length, bamfile_path, chr): An array of integers representing the sequencing depth for the given sequence length. """ logging.info(f"Entering calcul_depth_seq for {chr}") + start_time = time.time() depth_data = np.zeros(seq_length, dtype=np.int32) @@ -568,7 +604,9 @@ def calcul_depth_seq_copy(seq_length, bamfile_path, chr): # depth_data = np.array(bamfile_path.pileup().nsegments) - logging.info(f"Leaving calcul_depth_seq for {chr}") + end_time = time.time() + elapsed_time = end_time - start_time + logging.info(f"Leaving calcul_depth_seq for {chr} (Time taken: {elapsed_time:.4f} seconds)") return depth_data @@ -593,16 +631,72 @@ def calcul_depth_seq_count(seq_length, bamfile_path, chr): numpy.ndarray An array of integers representing the sequencing depth for the given sequence length. """ - logging.info(f"Entering calcul_depth_seq for {chr}") + logging.info(f"Entering calcul_depth_seq_count for {chr}") + start_time = time.time() - depth_data_count = np.zeros(seq_length, dtype=np.int32) - for i in range(seq_length): - depth_data_count[i] = bamfile_path.count(contig = chr, start = i, stop = i+1) + # Use count_coverage to get coverage depth + depth_data_count = bamfile_path.count_coverage(contig = chr) + type(depth_data_count) + #print(depth_data_count[:10056]) - logging.info(f"Leaving calcul_depth_seq for {chr}") + end_time = time.time() + elapsed_time = end_time - start_time + logging.info(f"Leaving calcul_depth_seq for {chr} (Time taken: {elapsed_time:.4f} seconds)") return depth_data_count + + +def calcul_depth_seq_samtools(seq_length, bamfile_path, chr, num_threads=12): + """ + Calculate the sequencing depth for a given chromosome using a parallelized bash script. + + Parameters + ---------- + seq_length : int + The length of the sequence. + bamfile_path : str + The path to the BAM file. + chr : str + The chromosome for which the depth is calculated. + num_threads : int + The number of threads to use for parallel processing. + + Returns + ------- + numpy.ndarray + An array of integers representing the sequencing depth for the given sequence length. 
+ """ + logging.info(f"Entering calcul_depth_seq for {chr}") + start_time = time.time() + + #Define the output file for depth results + output_file_1 = f"/work/gad/shared/analyse/test/cnvGPU/test_scalability/tmp_depth_{chr}.out" + + # Run the parallelized bash script to calculate depth + p = subprocess.Popen(["/work/gad/shared/analyse/test/cnvGPU/test_scalability/samtools_test_by_chr_multithreading_log.sh %s %s %s %s" % (bamfile_path, chr, output_file_1, num_threads)], shell = True, executable = "/bin/bash") + + p.communicate() + + # Create the numpy array for depth data + depth_data = np.zeros(seq_length, dtype=np.int32) + + # Read the output file and fill the numpy array using pandas for efficiency + try: + df = pd.read_csv(output_file_1, delim_whitespace=True, header=None, names=['chr', 'pos', 'depth']) + df['pos'] -= 1 # Convert to 0-based index + df = df[df['pos'] < seq_length] # Ensure positions are within sequence length + depth_data[df['pos']] = df['depth'] + except Exception as e: + logging.error(f"Error reading depth file: {e}") + + # Clean up the output file + subprocess.run(['rm', output_file_1]) + + end_time = time.time() + elapsed_time = end_time - start_time + logging.info(f"Leaving calcul_depth_seq for {chr} (Time taken: {elapsed_time:.4f} seconds)") + return depth_data def calcul_med_total(depth_correction_results, chr): """ @@ -621,7 +715,8 @@ def calcul_med_total(depth_correction_results, chr): The median of the non-zero depth correction values, or 0 if no non-zero values are present. """ logging.info(f"Entering calcul_med_total for {chr}") - + start_time = time.time() + depth_correction_results = np.array(depth_correction_results) # Filter results to remove zero values non_zero_results = depth_correction_results[depth_correction_results != 0] @@ -630,7 +725,9 @@ def calcul_med_total(depth_correction_results, chr): sys.stderr.write("Chromosome : %s, med_chr : %s\n" % (chr, m)) - logging.info(f"Leaving calcul_med_total for {chr}") + end_time = time.time() + elapsed_time = end_time - start_time + logging.info(f"Leaving calcul_med_total for {chr} (Time taken: {elapsed_time:.4f} seconds)") return m @@ -654,6 +751,7 @@ def calcul_med_same_gc(gc_results, depth_correction_results, chr): A dictionary where keys are unique GC content values and values are the median depth correction for those GC values. """ logging.info(f"Entering calcul_med_same_gc for {chr}") + start_time = time.time() mGC = [] depth_correction_results_array = np.array(depth_correction_results) @@ -679,7 +777,9 @@ def calcul_med_same_gc(gc_results, depth_correction_results, chr): gc_to_median = dict(mGC) - logging.info(f"Leaving calcul_med_same_gc for {chr}") + end_time = time.time() + elapsed_time = end_time - start_time + logging.info(f"Leaving calcul_med_same_gc for {chr} (Time taken: {elapsed_time:.4f} seconds)") return gc_to_median @@ -701,6 +801,7 @@ def calcul_moy_totale(normalize_depth_results, chr): The mean of the non-zero normalized depth values, or 0 if no non-zero values are present. 
""" logging.info(f"Entering calcul_moy_totale for {chr}") + start_time = time.time() normalize_depth_results = np.array(normalize_depth_results) # Filter results to remove zero values @@ -710,7 +811,9 @@ def calcul_moy_totale(normalize_depth_results, chr): sys.stderr.write("Chromosome : %s, mean_chr : %s\n" % (chr, mean_chr)) - logging.info(f"Leaving calcul_moy_totale for {chr}") + end_time = time.time() + elapsed_time = end_time - start_time + logging.info(f"Leaving calcul_moy_totale for {chr} (Time taken: {elapsed_time:.4f} seconds)") return mean_chr @@ -733,6 +836,7 @@ def calcul_std(normalize_depth_results, chr): The standard deviation of the non-zero normalized depth values, or 0 if no non-zero values are present. """ logging.info(f"Entering calcul_std for {chr}") + start_time = time.time() normalize_depth_results = np.array(normalize_depth_results) # Filter results to remove zero values @@ -743,7 +847,9 @@ def calcul_std(normalize_depth_results, chr): sys.stderr.write("Chromosome : %s, std_chr : %s\n" % (chr, std_chr)) - logging.info(f"Leaving calcul_std for {chr}") + end_time = time.time() + elapsed_time = end_time - start_time + logging.info(f"Leaving calcul_std for {chr} (Time taken: {elapsed_time:.4f} seconds)") return std_chr @@ -765,6 +871,7 @@ def compute_mean_std_med(ratio_par_window_results, chr): A tuple containing the mean, standard deviation, and median of the filtered ratio values. """ logging.info(f"Entering compute_mean_std_med for {chr}") + start_time = time.time() # Filter results to remove zero and -1 values ratio_par_window_results = np.array(ratio_par_window_results) @@ -786,7 +893,10 @@ def compute_mean_std_med(ratio_par_window_results, chr): sys.stderr.write("Chromosome : %s, mean_ratio : %s, std_ratio : %s, med_ratio : %s\n" % (chr, mean_ratio, std_ratio, med_ratio)) - logging.info(f"Leaving compute_mean_std_med for {chr}") + end_time = time.time() + elapsed_time = end_time - start_time + logging.info(f"Leaving compute_mean_std_med for {chr} (Time taken: {elapsed_time:.4f} seconds)") + # Return results return mean_ratio, std_ratio, med_ratio @@ -811,19 +921,15 @@ def cn_level(x, chr): - 2 if 0.75 < x < 1 or round(x) == 1 - round(x) if round(x) > 1 """ - if x < 1: - if x <= 0.75: - if x >= 0.1: - return 1 - else: - return 0 - else: - return 2 + if x < 0.1: + return 0 + elif x <= 0.75: + return 1 + elif x < 1: + return 2 else: - if round(x) == 1: - return 2 - if round(x) > 1: - return round(x) + rounded_x = round(x) + return 2 if rounded_x == 1 else rounded_x def get_sample_name(bamfile_path): @@ -844,13 +950,16 @@ def get_sample_name(bamfile_path): """ logging.info(f"Entering get_sample_name") + start_time = time.time() with pysam.AlignmentFile(bamfile_path, "rb") as bamfile: for read_group in bamfile.header.get("RG", []): if "SM" in read_group: return read_group["SM"] - logging.info(f"Leaving get_sample_name") + end_time = time.time() + elapsed_time = end_time - start_time + logging.info(f"Leaving get_sample_name (Time taken: {elapsed_time:.4f} seconds)") return "UnknownSample" @@ -877,14 +986,18 @@ def create_signal(signal, chr, z_score_results, step_size): The function modifies the signal dictionary in place. 
""" logging.info(f"Entering create_signal for {chr} (include copy_number_level)") + start_time = time.time() if chr not in signal: signal[chr] = {} for i in range(len(z_score_results)): pos = (i * step_size) + 1 signal[chr][pos] = z_score_results[i] - - logging.info(f"Leaving create_signal for {chr} (include copy_number_level)") + + #sys.stderr.write("\t signal %s\n" % signal[chr]) + end_time = time.time() + elapsed_time = end_time - start_time + logging.info(f"Leaving create_signal for {chr} (include copy_number_level) (Time taken: {elapsed_time:.4f} seconds)") def detect_events( z_score_results, @@ -920,27 +1033,28 @@ def detect_events( The function modifies the events dictionary in place. """ logging.info(f"Entering detect_events for {chr}") + start_time = time.time() c = 0 + if chr not in events: + events[chr] = {} for i, z_score in enumerate(z_score_results): - if (z_score <= -zscore_threshold) or (z_score >= zscore_threshold): - if chr not in events: - events[chr] = {} - if med_ratio == 0: - c = 0 - else: + if abs(z_score) >= zscore_threshold: + if med_ratio != 0: c = cn_level(float(ratio_par_mean_ratio_results[i]), chr) - - if z_score >= zscore_threshold: + if z_score >= 0: c = 3 - elif c == 2 and z_score <= -zscore_threshold: + elif c == 2 and z_score < 0: c = 1 pos_start = (i * step_size) + 1 pos_end = pos_start + window_size events[chr][(pos_start, pos_end)] = c + #sys.stderr.write("\t events %s\n" % events) - logging.info(f"Leaving detect_events for {chr}") + end_time = time.time() + elapsed_time = end_time - start_time + logging.info(f"Leaving detect_events for {chr} (Time taken: {elapsed_time:.4f} seconds)") def segmentation(events, segment, chr): """ @@ -961,15 +1075,17 @@ def segmentation(events, segment, chr): The function modifies the segment dictionary in place. 
""" logging.info(f"Entering segmentation for {chr}") + start_time = time.time() for k in events.keys(): sys.stderr.write("\tfor chromosome %s\n" % k) - starts, oldPos, oldLevel = 0, 0, -1 + starts, oldPos, oldLevel = 1, 1, -1 for p in sorted(events[k].keys()): level = events[k][p] + #sys.stderr.write("\t p %s\n" % p) # new coordinates if p[0] > (oldPos + window_size): - if (starts != 0) and (starts != p[0]): + if (starts != 1) and (starts != p[0]): if k not in segment: segment[k] = {} index = str(starts) + "-" + str(oldPos) @@ -997,7 +1113,9 @@ def segmentation(events, segment, chr): oldLevel = level oldPos, oldLevel = p[0], level - logging.info(f"Leaving segmentation for {chr}") + end_time = time.time() + elapsed_time = end_time - start_time + logging.info(f"Leaving segmentation for {chr} (Time taken: {elapsed_time:.4f} seconds)") def display_results_vcf(sample, segment, signal, lengthFilter, output_file, chr): """ @@ -1025,6 +1143,7 @@ def display_results_vcf(sample, segment, signal, lengthFilter, output_file, chr) """ global header_written logging.info(f"Entering display_results_vcf for {chr}") + start_time = time.time() with open(output_file, "a") as f: if not header_written: @@ -1049,11 +1168,14 @@ def display_results_vcf(sample, segment, signal, lengthFilter, output_file, chr) header_written = True for k in segment.keys(): + #sys.stderr.write("\tfor chromosome %s\n" % k) for elt in sorted(segment[k].keys()): + #sys.stderr.write("\tfor elt %s\n" % elt) if segment[k][elt]["start"] == segment[k][elt]["end"]: continue if (segment[k][elt]["end"] - segment[k][elt]["start"]) < lengthFilter: continue + #sys.stderr.write("\t [segment[k][elt] %s\n" % [segment[k][elt]]) if int(signal[k][segment[k][elt]["start"]]) < 0: f.write( "%s\t%s\t.\tN\t<DEL>\t.\t.\tSVTYPE=DEL;END=%s;VALUE=%s\tGT:GQ\t./.:0\n" @@ -1074,8 +1196,10 @@ def display_results_vcf(sample, segment, signal, lengthFilter, output_file, chr) int(segment[k][elt]["cn"]), ) ) - - logging.info(f"Leaving display_results_vcf for {chr}") + + end_time = time.time() + elapsed_time = end_time - start_time + logging.info(f"Leaving display_results_vcf for {chr} (Time taken: {elapsed_time:.4f} seconds)") def main_calcul( @@ -1130,9 +1254,10 @@ def main_calcul( # Appeler les différentes fonctions map_data = calcul_mappability(seq_length, mappability, chr) gc_data = calcul_gc_content(seq_length, chr, seq) - depth_data = calcul_depth_seq(seq_length, bamfile_path, chr) + #depth_data = calcul_depth_seq(seq_length, bamfile_path, chr) #depth_data_count = calcul_depth_seq_count(seq_length, bamfile_path, chr) #depth_data = calcul_depth_seq_copy(seq_length, bamfile_path, chr) + depth_data = calcul_depth_seq_samtools(seq_length, bamfile_path, chr) # Transférer le tableau NumPy vers CUDA d_depth_data = cuda.mem_alloc(depth_data.nbytes) @@ -1353,8 +1478,8 @@ def main_calcul( context.synchronize() # Copier les résultats depuis le périphérique CUDA vers l'hôte - # cuda.memcpy_dtoh(dest, src) - cuda.memcpy_dtoh(depth_results, d_depth_results) + + cuda.memcpy_dtoh(depth_results, d_depth_results) # cuda.memcpy_dtoh(dest, src) sys.stderr.write( "\t Copie les resultats du GPU (d_depth_results) vers le CPU (depth_results)\n" ) @@ -1521,16 +1646,23 @@ gc_file = "/work/gad/shared/pipeline/grch38/index/grch38_essential.fa" mappability_file = "/work/gad/shared/pipeline/grch38/cnv/k100.Umap.MultiTrackMappability.bedgraph" seq = parse_fasta(gc_file) mappability = dico_mappabilite(mappability_file) +#bamfile_path_2 = 
"/work/gad/shared/analyse/test/cnvGPU/test_scalability/dijen1000.bam" + if num_chr == "ALL" : with pysam.AlignmentFile(bamfile_path, "rb") as bamfile_handle: for i, seq_length in enumerate(bamfile_handle.lengths): chr = bamfile_handle.references[i] + + #Exclude chrM + if chr == "chrM": + continue + sys.stderr.write("Chromosome : %s, seq length : %s\n" % (chr, seq_length)) # Appeler la fonction de calcul de la profondeur moyenne pour ce chromosome main_calcul( - bamfile_handle, + bamfile_path, chr, seq_length, window_size, @@ -1557,7 +1689,7 @@ else : # Appeler la fonction de calcul de la profondeur moyenne pour ce chromosome main_calcul( - bamfile_handle, + bamfile_path, chr, seq_length, window_size,