Module cNMF.cnmf_parallel

Runs the entire cNMF pipeline in parallel using GNU parallel; adapted from Kotliar et al. (2019).

Expand source code
# -*- coding: utf-8 -*-
"""
Runs the entire cNMF pipeline in parallel using GNU parallel; adapted from
Kotliar et al. (2019).
"""
import subprocess as sp
from ._version import get_versions


def parallel(args):
    """
    Run the full cNMF pipeline by shelling out to the `cnmf` command-line tool

    The steps run in this order: `cnmf prepare`, `cnmf factorize` (one run per
    worker index, distributed with GNU parallel), `cnmf combine`,
    `cnmf k_selection_plot`, removal of the per-iteration `.df.npz` files and,
    when `auto_k` is set, `cnmf consensus`. Each command is printed before it
    is executed.

    Parameters
    ----------
    args : argparse.Namespace
        Parsed command-line arguments. Must provide `counts`, `name`,
        `output_dir`, `n_jobs`, `components`, `subset`, `auto_k`, `cleanup`,
        `local_density_threshold` and `local_neighborhood_size`. Every other
        attribute that is neither None nor a boolean is forwarded to
        `cnmf prepare` as `--<attribute> <value>`. The namespace itself is
        left unmodified.

    Returns
    -------
    None

    Notes
    -----
    Commands are started with `subprocess.call`, which returns the exit status
    instead of raising, so a failing step does not stop the steps after it.
    """
    import shlex

    def _quote(value):
        # render a single value as exactly one shell word
        return shlex.quote(str(value))

    # copy the attributes so the caller's Namespace is not altered below
    argdict = dict(vars(args))

    # remove arguments from dictionary prior to prepare
    counts_arg = argdict.pop("counts")
    local_dens_thresh_arg = argdict.pop("local_density_threshold")
    local_neighborhood_size_arg = argdict.pop("local_neighborhood_size")

    # convert arguments from list to string for passing to cnmf.py; every
    # element is quoted on its own so that it stays a separate shell word
    argdict["components"] = " ".join(_quote(k) for k in argdict["components"])
    if argdict["subset"]:
        argdict["subset"] = " ".join(_quote(k) for k in argdict["subset"])
    else:
        # `--subset` given without any value: nothing to forward
        argdict["subset"] = None
    already_quoted = ("components", "subset")

    output_dir = _quote(argdict["output_dir"])
    name = _quote(argdict["name"])

    # Run prepare
    prepare_opts = []
    for key, value in argdict.items():
        # unset options and booleans (store_true flags such as densify,
        # auto_k and cleanup) are not forwarded to prepare
        if value is None or isinstance(value, bool):
            continue
        if key not in already_quoted:
            value = _quote(value)
        prepare_opts.append("--{} {}".format(key.replace("_", "-"), value))
    prepare_cmd = "cnmf prepare {} ".format(_quote(counts_arg))
    prepare_cmd += " ".join(prepare_opts)
    print("Preparing directories and preprocessing:  {}".format(prepare_cmd))
    sp.call(prepare_cmd, shell=True)

    # Run factorize
    workind = " ".join(str(x) for x in range(argdict["n_jobs"]))
    # GNU parallel passes the assembled command through a shell a second time,
    # so these two values need a second layer of quoting to survive it
    factorize_cmd = (
        "nohup parallel cnmf factorize --output-dir %s --name %s --worker-index {} ::: %s"
        % (shlex.quote(output_dir), shlex.quote(name), workind)
    )
    print("Running iterative NMF:  {}".format(factorize_cmd))
    sp.call(factorize_cmd, shell=True)

    # Run combine
    combine_cmd = "cnmf combine --output-dir %s --name %s --components %s" % (
        output_dir,
        name,
        argdict["components"],
    )
    print("Combining NMF replicates:  {}".format(combine_cmd))
    sp.call(combine_cmd, shell=True)

    # Plot K selection
    Kselect_cmd = "cnmf k_selection_plot --output-dir %s --name %s" % (
        output_dir,
        name,
    )
    print("Plotting K selection parameters:  {}".format(Kselect_cmd))
    sp.call(Kselect_cmd, shell=True)

    # Delete individual iteration files; only the directory part is quoted so
    # that the shell still expands the glob
    clean_cmd = "rm %s/cnmf_tmp/*.iter_*.df.npz" % _quote(
        "%s/%s" % (argdict["output_dir"], argdict["name"])
    )
    print("Cleaning up workspace:  {}".format(clean_cmd))
    sp.call(clean_cmd, shell=True)

    if argdict["auto_k"]:
        consensus_cmd = "cnmf consensus --output-dir {} --name {} --auto-k --local-density-threshold {} --local-neighborhood-size {}".format(
            output_dir,
            name,
            _quote(local_dens_thresh_arg),
            _quote(local_neighborhood_size_arg),
        )
        if argdict["cleanup"]:
            consensus_cmd = " ".join([consensus_cmd, "--cleanup"])
        print("Building consensus factors:  {}".format(consensus_cmd))
        sp.call(consensus_cmd, shell=True)


def main():
    """
    Command-line entry point for `cnmf_p`

    Builds the argument parser, parses the command line and hands the
    resulting namespace to `parallel`, which runs the pipeline steps.
    """
    import argparse

    parser = argparse.ArgumentParser(prog="cnmf_p")
    parser.add_argument(
        "-V",
        "--version",
        action="version",
        version=get_versions()["version"],
    )

    # positional input; declared optional (nargs="?"), so it is None when omitted
    parser.add_argument(
        "counts",
        type=str,
        nargs="?",
        help="Input (cell x gene) counts matrix as .h5ad, df.npz, or tab delimited text file.",
    )

    parser.add_argument(
        "--name",
        type=str,
        help="Name for analysis. All output will be placed in [output-dir]/[name]/... Default 'cNMF'",
        nargs="?",
        default="cNMF",
    )
    parser.add_argument(
        "--output-dir",
        type=str,
        help="Output directory. All output will be placed in [output-dir]/[name]/... Default '.'",
        nargs="?",
        default=".",
    )
    parser.add_argument(
        "-j",
        "--n-jobs",
        type=int,
        help="Total number of workers to distribute jobs to. Default 1.",
        default=1,
    )
    # the help text states the actual default list (7 to 18 inclusive)
    parser.add_argument(
        "-k",
        "--components",
        type=int,
        help='Number of components (k) for matrix factorization. Several can be specified with "-k 8 9 10". Default 7 through 18.',
        nargs="*",
        default=[7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
    )
    parser.add_argument(
        "-n",
        "--n-iter",
        type=int,
        help="Number of factorization replicates. Default 30.",
        default=30,
    )
    parser.add_argument(
        "--subset",
        help="AnnData.obs column name to subset on before performing NMF. Cells to keep should be True or 1.",
        nargs="*",
    )
    parser.add_argument(
        "-l",
        "--layer",
        type=str,
        default=None,
        help="Key from .layers to use. Default '.X'.",
    )
    parser.add_argument(
        "--gene-symbol-col",
        type=str,
        default=None,
        help="Replace `adata.var_names` with values from `adata.var[gene_symbol_col]` (i.e. to switch symbol for Ensembl ID)",
    )
    parser.add_argument(
        "--seed",
        type=int,
        help="Seed for pseudorandom number generation. Default 18.",
        default=18,
    )
    parser.add_argument(
        "--genes-file",
        type=str,
        help="File containing a list of genes to include, one gene per line. Must match column labels of counts matrix.",
        default=None,
    )
    parser.add_argument(
        "--numgenes",
        type=int,
        help="Number of high variance genes to use for matrix factorization. Default 2000.",
        default=2000,
    )
    parser.add_argument(
        "--tpm",
        type=str,
        help="Pre-computed (cell x gene) TPM values as df.npz or tab separated txt file. If not provided TPM will be calculated automatically.",
        default=None,
    )
    parser.add_argument(
        "--beta-loss",
        type=str,
        choices=["frobenius", "kullback-leibler", "itakura-saito"],
        help="Loss function for NMF. Default 'frobenius'.",
        default="frobenius",
    )
    parser.add_argument(
        "--densify",
        dest="densify",
        help="Treat the input data as non-sparse",
        action="store_true",
        default=False,
    )
    parser.add_argument(
        "--auto-k",
        help="Automatically pick k value for consensus based on maximum stability",
        action="store_true",
    )
    # kept as a string here; it is only interpolated into the consensus command
    parser.add_argument(
        "--local-density-threshold",
        type=str,
        help="Threshold for the local density filtering. This string must convert to a float >0 and <=2. Default 0.1.",
        default="0.1",
    )
    parser.add_argument(
        "--local-neighborhood-size",
        type=float,
        help="Fraction of the number of replicates to use as nearest neighbors for local density filtering. Default 0.3.",
        default=0.30,
    )
    parser.add_argument(
        "--cleanup",
        help="Remove excess files after saving results to clean workspace",
        action="store_true",
    )

    args = parser.parse_args()
    parallel(args)

Functions

def main()
Expand source code
def main():
    """
    Command-line entry point for `cnmf_p`

    Builds the argument parser, parses the command line and hands the
    resulting namespace to `parallel`, which runs the pipeline steps.
    """
    import argparse

    parser = argparse.ArgumentParser(prog="cnmf_p")
    parser.add_argument(
        "-V",
        "--version",
        action="version",
        version=get_versions()["version"],
    )

    # positional input; declared optional (nargs="?"), so it is None when omitted
    parser.add_argument(
        "counts",
        type=str,
        nargs="?",
        help="Input (cell x gene) counts matrix as .h5ad, df.npz, or tab delimited text file.",
    )

    parser.add_argument(
        "--name",
        type=str,
        help="Name for analysis. All output will be placed in [output-dir]/[name]/... Default 'cNMF'",
        nargs="?",
        default="cNMF",
    )
    parser.add_argument(
        "--output-dir",
        type=str,
        help="Output directory. All output will be placed in [output-dir]/[name]/... Default '.'",
        nargs="?",
        default=".",
    )
    parser.add_argument(
        "-j",
        "--n-jobs",
        type=int,
        help="Total number of workers to distribute jobs to. Default 1.",
        default=1,
    )
    # the help text states the actual default list (7 to 18 inclusive)
    parser.add_argument(
        "-k",
        "--components",
        type=int,
        help='Number of components (k) for matrix factorization. Several can be specified with "-k 8 9 10". Default 7 through 18.',
        nargs="*",
        default=[7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
    )
    parser.add_argument(
        "-n",
        "--n-iter",
        type=int,
        help="Number of factorization replicates. Default 30.",
        default=30,
    )
    parser.add_argument(
        "--subset",
        help="AnnData.obs column name to subset on before performing NMF. Cells to keep should be True or 1.",
        nargs="*",
    )
    parser.add_argument(
        "-l",
        "--layer",
        type=str,
        default=None,
        help="Key from .layers to use. Default '.X'.",
    )
    parser.add_argument(
        "--gene-symbol-col",
        type=str,
        default=None,
        help="Replace `adata.var_names` with values from `adata.var[gene_symbol_col]` (i.e. to switch symbol for Ensembl ID)",
    )
    parser.add_argument(
        "--seed",
        type=int,
        help="Seed for pseudorandom number generation. Default 18.",
        default=18,
    )
    parser.add_argument(
        "--genes-file",
        type=str,
        help="File containing a list of genes to include, one gene per line. Must match column labels of counts matrix.",
        default=None,
    )
    parser.add_argument(
        "--numgenes",
        type=int,
        help="Number of high variance genes to use for matrix factorization. Default 2000.",
        default=2000,
    )
    parser.add_argument(
        "--tpm",
        type=str,
        help="Pre-computed (cell x gene) TPM values as df.npz or tab separated txt file. If not provided TPM will be calculated automatically.",
        default=None,
    )
    parser.add_argument(
        "--beta-loss",
        type=str,
        choices=["frobenius", "kullback-leibler", "itakura-saito"],
        help="Loss function for NMF. Default 'frobenius'.",
        default="frobenius",
    )
    parser.add_argument(
        "--densify",
        dest="densify",
        help="Treat the input data as non-sparse",
        action="store_true",
        default=False,
    )
    parser.add_argument(
        "--auto-k",
        help="Automatically pick k value for consensus based on maximum stability",
        action="store_true",
    )
    # kept as a string here; it is only interpolated into the consensus command
    parser.add_argument(
        "--local-density-threshold",
        type=str,
        help="Threshold for the local density filtering. This string must convert to a float >0 and <=2. Default 0.1.",
        default="0.1",
    )
    parser.add_argument(
        "--local-neighborhood-size",
        type=float,
        help="Fraction of the number of replicates to use as nearest neighbors for local density filtering. Default 0.3.",
        default=0.30,
    )
    parser.add_argument(
        "--cleanup",
        help="Remove excess files after saving results to clean workspace",
        action="store_true",
    )

    args = parser.parse_args()
    parallel(args)
def parallel(args)
Expand source code
def parallel(args):
    """
    Run the full cNMF pipeline by shelling out to the `cnmf` command-line tool

    The steps run in this order: `cnmf prepare`, `cnmf factorize` (one run per
    worker index, distributed with GNU parallel), `cnmf combine`,
    `cnmf k_selection_plot`, removal of the per-iteration `.df.npz` files and,
    when `auto_k` is set, `cnmf consensus`. Each command is printed before it
    is executed.

    Parameters
    ----------
    args : argparse.Namespace
        Parsed command-line arguments. Must provide `counts`, `name`,
        `output_dir`, `n_jobs`, `components`, `subset`, `auto_k`, `cleanup`,
        `local_density_threshold` and `local_neighborhood_size`. Every other
        attribute that is neither None nor a boolean is forwarded to
        `cnmf prepare` as `--<attribute> <value>`. The namespace itself is
        left unmodified.

    Returns
    -------
    None

    Notes
    -----
    Commands are started with `subprocess.call`, which returns the exit status
    instead of raising, so a failing step does not stop the steps after it.
    """
    import shlex

    def _quote(value):
        # render a single value as exactly one shell word
        return shlex.quote(str(value))

    # copy the attributes so the caller's Namespace is not altered below
    argdict = dict(vars(args))

    # remove arguments from dictionary prior to prepare
    counts_arg = argdict.pop("counts")
    local_dens_thresh_arg = argdict.pop("local_density_threshold")
    local_neighborhood_size_arg = argdict.pop("local_neighborhood_size")

    # convert arguments from list to string for passing to cnmf.py; every
    # element is quoted on its own so that it stays a separate shell word
    argdict["components"] = " ".join(_quote(k) for k in argdict["components"])
    if argdict["subset"]:
        argdict["subset"] = " ".join(_quote(k) for k in argdict["subset"])
    else:
        # `--subset` given without any value: nothing to forward
        argdict["subset"] = None
    already_quoted = ("components", "subset")

    output_dir = _quote(argdict["output_dir"])
    name = _quote(argdict["name"])

    # Run prepare
    prepare_opts = []
    for key, value in argdict.items():
        # unset options and booleans (store_true flags such as densify,
        # auto_k and cleanup) are not forwarded to prepare
        if value is None or isinstance(value, bool):
            continue
        if key not in already_quoted:
            value = _quote(value)
        prepare_opts.append("--{} {}".format(key.replace("_", "-"), value))
    prepare_cmd = "cnmf prepare {} ".format(_quote(counts_arg))
    prepare_cmd += " ".join(prepare_opts)
    print("Preparing directories and preprocessing:  {}".format(prepare_cmd))
    sp.call(prepare_cmd, shell=True)

    # Run factorize
    workind = " ".join(str(x) for x in range(argdict["n_jobs"]))
    # GNU parallel passes the assembled command through a shell a second time,
    # so these two values need a second layer of quoting to survive it
    factorize_cmd = (
        "nohup parallel cnmf factorize --output-dir %s --name %s --worker-index {} ::: %s"
        % (shlex.quote(output_dir), shlex.quote(name), workind)
    )
    print("Running iterative NMF:  {}".format(factorize_cmd))
    sp.call(factorize_cmd, shell=True)

    # Run combine
    combine_cmd = "cnmf combine --output-dir %s --name %s --components %s" % (
        output_dir,
        name,
        argdict["components"],
    )
    print("Combining NMF replicates:  {}".format(combine_cmd))
    sp.call(combine_cmd, shell=True)

    # Plot K selection
    Kselect_cmd = "cnmf k_selection_plot --output-dir %s --name %s" % (
        output_dir,
        name,
    )
    print("Plotting K selection parameters:  {}".format(Kselect_cmd))
    sp.call(Kselect_cmd, shell=True)

    # Delete individual iteration files; only the directory part is quoted so
    # that the shell still expands the glob
    clean_cmd = "rm %s/cnmf_tmp/*.iter_*.df.npz" % _quote(
        "%s/%s" % (argdict["output_dir"], argdict["name"])
    )
    print("Cleaning up workspace:  {}".format(clean_cmd))
    sp.call(clean_cmd, shell=True)

    if argdict["auto_k"]:
        consensus_cmd = "cnmf consensus --output-dir {} --name {} --auto-k --local-density-threshold {} --local-neighborhood-size {}".format(
            output_dir,
            name,
            _quote(local_dens_thresh_arg),
            _quote(local_neighborhood_size_arg),
        )
        if argdict["cleanup"]:
            consensus_cmd = " ".join([consensus_cmd, "--cleanup"])
        print("Building consensus factors:  {}".format(consensus_cmd))
        sp.call(consensus_cmd, shell=True)