I am encountering an issue when attempting to process an .h5ad file with multiprocessing. I read the .h5ad file as follows:
import anndata

adata = anndata.read_h5ad("/public/home/hhl/perturb_weak/K562_gwps_normalized_singlecell_01.h5ad", backed='r')
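I use backed='r' so that the expression matrix stays on disk and only the slices I access are read into memory; my understanding is that this keeps an open h5py file handle inside the AnnData object.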
Then I filter some cells and attempt to run a function using the multiprocessing module:
from scipy.stats import spearmanr

def process_row(row, adata, final_selected_cells):
    a_gene = row['sgRNA_id']
    b_gene = row['genes']
    # restrict to the pre-filtered cells and collect the other perturbed genes
    adata_filtered = adata[final_selected_cells]
    other_a_genes = adata_filtered.obs[adata_filtered.obs['gene_id'] != a_gene]['gene_id'].unique()
    other_a_count = []
    for other_a_gene in other_a_genes:
        mask = adata.obs['gene_id'] == other_a_gene
        if not mask.any():
            continue
        # cells carrying this other perturbation, restricted to the filtered set
        selected_cells_4 = adata.obs[mask].index
        filtered_cells_index = final_selected_cells.intersection(selected_cells_4)
        if a_gene in adata.var_names and b_gene in adata.var_names:
            a_expr = adata[filtered_cells_index, a_gene].X.flatten()
            b_expr = adata[filtered_cells_index, b_gene].X.flatten()
            corr, p_value = spearmanr(a_expr, b_expr)
            if p_value <= 0.05:
                other_a_count.append({
                    'a_gene': a_gene,
                    'b_gene': b_gene,
                    'other_a_gene': other_a_gene,
                    'corr': corr,
                    'p_value': p_value
                })
    print(a_gene, '\t', b_gene, '\t')  # progress output
    return other_a_count
from multiprocessing import Manager, Pool

max_workers = 12  # available CPU count
manager = Manager()
final_selected_cells_proxy = manager.list(final_selected_cells)
with Pool(processes=max_workers) as pool:
    results = pool.starmap(process_row, [(row, adata, final_selected_cells_proxy) for _, row in media_pertur.iterrows()])
# merge the per-row results into one flat list
other_perturb_counts = [item for sublist in results for item in sublist]
However, I am getting the following error:
Traceback (most recent call last):
File "/public/home/hhl/perturb_weak/media_perturb/test_multiprocess.py", line 87, in <module>
results = pool.starmap(process_row, [(row, adata, final_selected_cells_proxy) for _, row in media_pertur.iterrows()])
File "/home/hhl/miniconda3/envs/jupyter/lib/python3.9/multiprocessing/pool.py", line 372, in starmap
return self._map_async(func, iterable, starmapstar, chunksize).get()
File "/home/hhl/miniconda3/envs/jupyter/lib/python3.9/multiprocessing/pool.py", line 771, in get
raise self._value
File "/home/hhl/miniconda3/envs/jupyter/lib/python3.9/multiprocessing/pool.py", line 537, in _handle_tasks
put(task)
File "/home/hhl/miniconda3/envs/jupyter/lib/python3.9/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/home/hhl/miniconda3/envs/jupyter/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
File "/home/hhl/miniconda3/envs/jupyter/lib/python3.9/site-packages/h5py/_hl/base.py", line 370, in __getnewargs__
raise TypeError("h5py objects cannot be pickled")
TypeError: h5py objects cannot be pickled
From the traceback, my understanding is that the backed AnnData object holds an open h5py file handle, and multiprocessing cannot pickle it when sending the task arguments to the worker processes. I am also wondering whether the code is I/O-bound, which would make it unsuitable for multiprocessing and better suited to multithreading; I have tried running this function with multithreading, but neither the speed nor the memory usage met my expectations.
Is there a way to run my function with multiprocessing to improve execution speed and reduce memory overhead? I would greatly appreciate any comments and answers.
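One idea I have considered, but not yet verified, is to avoid pickling the AnnData object altogether: pass only the file path to the pool and have each worker open its own read-only handle in a Pool initializer. Below is a minimal sketch of what I mean; init_worker and process_row_in_worker are names I made up, process_row, max_workers, media_pertur, and final_selected_cells are as defined above, and I pass the pandas index directly instead of wrapping it in a Manager list (which, I realize, would not have the .intersection method anyway).

import anndata
from multiprocessing import Pool

worker_adata = None  # set once in each worker process by the initializer

def init_worker(h5ad_path):
    # Each worker opens its own backed, read-only handle, so no h5py
    # object ever needs to be pickled and sent between processes.
    global worker_adata
    worker_adata = anndata.read_h5ad(h5ad_path, backed='r')

def process_row_in_worker(row, final_selected_cells):
    # Delegate to the original process_row using this worker's handle.
    return process_row(row, worker_adata, final_selected_cells)

if __name__ == '__main__':
    h5ad_path = "/public/home/hhl/perturb_weak/K562_gwps_normalized_singlecell_01.h5ad"
    with Pool(processes=max_workers, initializer=init_worker, initargs=(h5ad_path,)) as pool:
        results = pool.starmap(
            process_row_in_worker,
            [(row, final_selected_cells) for _, row in media_pertur.iterrows()],
        )

Would an approach like this sidestep the pickling error, or is there a better pattern for sharing a large backed .h5ad file across worker processes?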