How can I process anndata using multiprocessing?

I am encountering an issue when attempting to process an .h5ad file using multiprocessing.

I read the .h5ad file as follows:

adata=anndata.read_h5ad("/public/home/hhl/perturb_weak/K562_gwps_normalized_singlecell_01.h5ad", backed='r')

Then, I filter some cells and attempt to run a function using the multiprocessing module:

def process_row(row, adata, final_selected_cells):
    a_gene = row['sgRNA_id']
    b_gene = row['genes']
    adata_filtered = adata[final_selected_cells]
    
    other_a_genes = adata_filtered.obs[adata_filtered.obs['gene_id'] != a_gene]['gene_id'].unique()
    
    other_a_count = [] 
    
    for other_a_gene in other_a_genes:
        mask = adata.obs['gene_id'] == other_a_gene
        if not mask.any():
            continue
        
        selected_cells_4 = adata.obs[mask].index
        filtered_cells_index = final_selected_cells.intersection(selected_cells_4)
        
        if a_gene in adata.var_names:
            a_expr = adata[filtered_cells_index, a_gene].X.flatten()
            b_expr = adata[filtered_cells_index, a_gene].X.flatten()
            corr, p_value = spearmanr(a_expr, b_expr)
            
            if p_value <= 0.05:
                other_a_count.append({
                    'a_gene': perturb_gene,
                    'b_gene': response_gene,
                    'other_a_gene': other_a_gene,
                    'corr': corr,
                    'p_value': p_value
                })
    
    print(a_gene, '\t', b_gene, '\t')
    return other_a_count

max_workers = 12  # available CPU num

manager = Manager()
final_selected_cells_proxy = manager.list(final_selected_cells)

with Pool(processes=max_workers) as pool:
    results = pool.starmap(process_row, [(row, adata, final_selected_cells_proxy) for _, row in media_pertur.iterrows()])

# merge
other_perturb_counts = [item for sublist in results for item in sublist]

However, I am getting the following error :broken_heart: :broken_heart: :broken_heart:

Traceback (most recent call last):
  File "/public/home/hhl/perturb_weak/media_perturb/test_multiprocess.py", line 87, in <module>
    results = pool.starmap(process_row, [(row, adata, final_selected_cells_proxy) for _, row in media_pertur.iterrows()])
  File "/home/hhl/miniconda3/envs/jupyter/lib/python3.9/multiprocessing/pool.py", line 372, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/home/hhl/miniconda3/envs/jupyter/lib/python3.9/multiprocessing/pool.py", line 771, in get
    raise self._value
  File "/home/hhl/miniconda3/envs/jupyter/lib/python3.9/multiprocessing/pool.py", line 537, in _handle_tasks
    put(task)
  File "/home/hhl/miniconda3/envs/jupyter/lib/python3.9/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/hhl/miniconda3/envs/jupyter/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj) 
  File "/home/hhl/miniconda3/envs/jupyter/lib/python3.9/site-packages/h5py/_hl/base.py", line 370, in __getnewargs__
    raise TypeError("h5py objects cannot be pickled")
TypeError: h5py objects cannot be pickled

I am wondering if the code is I/O-intensive, making it unsuitable for multiprocessing but more appropriate for multithreading. :thinking: :thinking: I have tried running this function using multithreading, but the speed and memory usage do not meet my expectations.
Is there a way I can run my function using multiprocessing to improve execution speed and reduce memory overhead? I would greatly appreciate any comments and answers. :hand_with_index_finger_and_thumb_crossed:

I would try re-opening the AnnData object or simply X every time within the process. We ran into a similar issue with dask, hence anndata/src/anndata/_io/specs/lazy_methods.py at f2178fcdf72b814daf162789854325e1fce8877a · scverse/anndata · GitHub

Thanks so much for your advise, I will try it now! Again thanks for your powerful tools! :+1: