Commit 10941d40 authored by Theo Serralta's avatar Theo Serralta

Add parallelization (don't work)

parent 773d7dc8
......@@ -8,6 +8,9 @@ import pycuda.autoinit
from pycuda.compiler import SourceModule
import pycuda.gpuarray as gpuarray
from pycuda.autoinit import context
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
# Options
try:
......@@ -111,7 +114,7 @@ calcul_map_kernel_cuda = mod.get_function("calcul_map_kernel")
######<---Fonctions mappability--->#########
#############################################
def merge_intervals(intervals):
#sys.stderr.write("\t merge_intervals\n")
#sys.stderr.write("\t Entering merge_intervals\n")
merged = []
start, end, score = intervals[0]
for interval in intervals[1:]:
......@@ -124,7 +127,7 @@ def merge_intervals(intervals):
return merged
def dico_mappabilite(mappability_file):
sys.stderr.write("\t dico_mappabilite\n")
sys.stderr.write("\t Entering dico_mappabilite\n")
mappability_dico = {}
with open(mappability_file, 'r') as f:
......@@ -152,12 +155,13 @@ def dico_mappabilite(mappability_file):
for chromosome, intervals in mappability_dico.items():
merged_intervals = merge_intervals(intervals)
mappability_dico[chromosome] = {start: score for start, _, score in merged_intervals}
sys.stderr.write("\t Leaving dico_mappabilite\n")
return mappability_dico #Dictionnaire avec les bornes de mappabilité en fonction des positions pour chaque chromosome.
def calcul_mappability(seq_length, mappability, chr):
sys.stderr.write("\t Entering calcul_mappability =\n")
map_data = np.zeros(seq_length, dtype=np.float32)
sys.stderr.write("\t map_data =\n")
sorted_keys = sorted(mappability[chr].keys())
sys.stderr.write("\t sorted_keys =\n")
......@@ -173,14 +177,15 @@ def calcul_mappability(seq_length, mappability, chr):
# Fill remaining positions if sequence length exceeds last bound
for i in range(prev_bound, seq_length):
map_data[i] = prev_mappability
return map_data
sys.stderr.write("\t Leaving calcul_mappability =\n")
return map_data
#############################################
######<---Fonctions calcul gc--->############
#############################################
def parse_fasta(gc_file):
sys.stderr.write("\t parse_fasta\n")
sys.stderr.write("\t Entering parse_fasta\n")
sequences = {}
with open(gc_file, 'r') as f:
data = f.read().split('>')
......@@ -189,28 +194,34 @@ def parse_fasta(gc_file):
header = lines[0]
sequence = ''.join(lines[1:])
sequences[header] = sequence
sys.stderr.write("\t Leaving parse_fasta\n")
return sequences
def calcul_gc_content(seq_length, chr, seq):
sys.stderr.write("\t Entering calcul_gc_content\n")
gc_data = np.zeros(seq_length, dtype="S")
sys.stderr.write("\t gc_data =\n")
for i in range(len(seq[chr])):
gc_data[i] = seq[chr][i]
#print(gc_data[9950:10200])
return gc_data
sys.stderr.write("\t Leaving calcul_gc_content\n")
return gc_data
##############################################
######<---Fonctions calcul Depth Seq--->######
##############################################
def calcul_depth_seq(seq_length, bamfile, chr):
sys.stderr.write("\t Entering calcul_depth_seq\n")
depth_data = np.zeros(seq_length, dtype=np.int32)
sys.stderr.write("\t depth_data =\n")
for pileupcolumn in bamfile.pileup():
#sys.stderr.write("%s : %s \n" % (pileupcolumn.reference_pos, pileupcolumn.nsegments))
if pileupcolumn.reference_pos > seq_length:
break
depth_data[pileupcolumn.reference_pos] = pileupcolumn.nsegments
sys.stderr.write("\t Leaving calcul_depth_seq\n")
return depth_data
#################################
......@@ -220,14 +231,24 @@ def main_calcul(bamfile, chr, seq_length, window_size, step_size, output_file):
sys.stderr.write("\t entering main_calcul\n")
global seq
# Calcul mappability
map_data = calcul_mappability(seq_length, mappability, chr)
with ProcessPoolExecutor(max_workers=3) as executor:
# Calcul GC
gc_data = calcul_gc_content(seq_length, chr, seq)
# Calcul depth seq
depth_data = calcul_depth_seq(seq_length, bamfile, chr)
# Calcul mappability
future_map_data = executor.submit(calcul_mappability, seq_length, mappability, chr)
# Calcul GC
future_gc_data = executor.submit(calcul_gc_content, seq_length, chr, seq)
# Calcul depth seq
future_depth_data = executor.submit(calcul_depth_seq, seq_length, bamfile, chr)
map_data = future_map_data.result()
gc_data = future_gc_data.result()
depth_data = future_depth_data.result()
# Transférer le tableau NumPy vers CUDA
d_depth_data = cuda.mem_alloc(depth_data.nbytes)
......@@ -307,11 +328,12 @@ def main_calcul(bamfile, chr, seq_length, window_size, step_size, output_file):
device = cuda.Device(0)
attributes = device.get_attributes()
num_cores = attributes[1]
print("Nombre de CPU: ", multiprocessing.cpu_count())
print(f"Nombre de coeurs max GPU: {num_cores}")
gc_file = '/work/gad/shared/pipeline/grch38/index/grch38_essential.fa'
mappability_file = '/work/gad/shared/analyse/test/cnvGPU/test_scalability/wgEncodeCrgMapabilityAlign100mer_no_uniq.grch38.bedgraph'
seq = parse_fasta(gc_file)
mappability = dico_mappabilite(mappability_file)
print(f"Nombre de coeurs max : {num_cores}")
#print(attributes)
with pysam.AlignmentFile(bamfile, "rb") as bamfile_handle:
for i, seq_length in enumerate(bamfile_handle.lengths):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment