Skip to content

Data Helpers

Data loading and simulation helpers.

Loading

read_mtx

read_mtx(
    mtx_file_name: Union[str, Path],
    gene_file_name: Union[str, Path],
    barcode_file_name: Optional[Union[str, Path]],
) -> pd.DataFrame

Read mtx data

Parameters:

Name Type Description Default
mtx_file_name Union[str, Path]

File name of mtx data

required
gene_file_name Union[str, Path]

File name of gene vector

required
barcode_file_name Optional[Union[str, Path]]

File name of barcode vector

required

Returns:

Name Type Description
df DataFrame

A dataframe with genes as rows and cells as columns

Source code in scTenifold/data/_io.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def read_mtx(mtx_file_name: Union[str, Path],
             gene_file_name: Union[str, Path],
             barcode_file_name: Optional[Union[str, Path]]) -> pd.DataFrame:
    """
    Load an mtx matrix together with its gene and barcode labels.

    Parameters
    ----------
    mtx_file_name
        File name of mtx data
    gene_file_name
        File name of gene vector; the first tab-separated column is used
        as the row labels.
    barcode_file_name
        File name of barcode vector (first tab-separated column is used as
        the column labels). When None, a warning is issued and placeholder
        names ``barcode_<i>`` are generated instead.

    Returns
    -------
    df: pd.DataFrame
        A dataframe with genes as rows and cells as columns

    Raises
    ------
    ValueError
        If ``mtx_file_name`` or ``gene_file_name`` is None.
    """
    for required, message in ((mtx_file_name, "matrix file is required"),
                              (gene_file_name, "gene file is required")):
        if required is None:
            raise ValueError(message)

    row_labels = pd.read_csv(gene_file_name, sep='\t', header=None).iloc[:, 0]

    has_barcodes = barcode_file_name is not None
    if has_barcodes:
        col_labels = pd.read_csv(barcode_file_name, sep='\t', header=None).iloc[:, 0]
    else:
        warn("Barcode file is not existed. Added fake barcode name in the dataset")

    body, is_dense, _n_rows, n_cols = _parse_mtx(mtx_file_name)
    if not has_barcodes:
        # The column count is only known once the matrix header is parsed.
        col_labels = [f"barcode_{i}" for i in range(n_cols)]

    shape = (len(row_labels), len(col_labels))
    print(f"creating a {shape} matrix")
    # Dense bodies are used as-is; sparse bodies are expanded to the label-derived shape.
    data = body if is_dense else _build_matrix_from_sparse(body, shape=shape)
    return pd.DataFrame(index=row_labels, columns=col_labels, data=data)

read_folder

read_folder(
    file_dir: Union[str, Path],
    matrix_fn: str = "matrix",
    gene_fn: str = "genes",
    barcodes_fn: str = "barcodes",
) -> pd.DataFrame

Read mtx + genes + barcodes from a directory by filename substring.

Parameters:

Name Type Description Default
file_dir Union[str, Path]

Path to a directory containing matrix, gene, and barcode files.

required
matrix_fn str

Substring identifying the matrix file (e.g. "matrix").

'matrix'
gene_fn str

Substring identifying the gene file.

'genes'
barcodes_fn str

Substring identifying the barcode file.

'barcodes'

Returns:

Type Description
Genes-by-cells DataFrame.
Source code in scTenifold/data/_io.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def read_folder(file_dir: Union[str, Path],
                matrix_fn: str = "matrix",
                gene_fn: str = "genes",
                barcodes_fn: str = "barcodes") -> pd.DataFrame:
    """Locate the matrix, gene, and barcode files in a directory and load them.

    Each directory entry whose name contains one of the given substrings is
    treated as a candidate for that role.

    Parameters
    ----------
    file_dir
        Path to a directory containing matrix, gene, and barcode files.
    matrix_fn
        Substring identifying the matrix file (e.g. ``"matrix"``).
    gene_fn
        Substring identifying the gene file.
    barcodes_fn
        Substring identifying the barcode file.

    Returns
    -------
    The DataFrame produced by :func:`read_mtx` for the located files.

    Raises
    ------
    ValueError
        If ``file_dir`` is not a directory, if more than one entry matches
        a substring, or if no matrix or gene file is found.
    """
    folder = Path(file_dir)
    if not folder.is_dir():
        raise ValueError("Path is not exist or is not a folder path")

    # Collect every directory entry whose name contains each substring.
    candidates = {pattern: [] for pattern in (matrix_fn, gene_fn, barcodes_fn)}
    for entry in folder.iterdir():
        for pattern, hits in candidates.items():
            if pattern in entry.name:
                hits.append(entry)

    # Reduce each candidate list to a single path (or None), rejecting ambiguity.
    chosen = {}
    for pattern, hits in candidates.items():
        if len(hits) > 1:
            raise ValueError(f"Multiple files match {pattern!r}: {[match.name for match in hits]}")
        chosen[pattern] = hits[0] if hits else None

    matrix_path, gene_path, barcode_path = (chosen[matrix_fn],
                                            chosen[gene_fn],
                                            chosen[barcodes_fn])
    if matrix_path is None:
        raise ValueError("matrix file is required")
    if gene_path is None:
        raise ValueError("gene file is required")

    # The barcode file is optional; read_mtx accepts None for it.
    barcode_arg = str(barcode_path) if barcode_path else None
    return read_mtx(mtx_file_name=str(matrix_path),
                    gene_file_name=str(gene_path),
                    barcode_file_name=barcode_arg)

fetch_data

fetch_data(
    ds_name: str,
    dataset_path: Path = Path(__file__).parent.parent.parent
    / Path("datasets"),
    owner: str = "qwerty239qwe",
) -> Dict[str, pd.DataFrame]

Fetch and load a remote scTenifold dataset by name.

Parameters:

Name Type Description Default
ds_name str

Dataset name (one of `_valid_ds_names`).

required
dataset_path Path

Local directory to cache downloads.

Path(__file__).parent.parent.parent / Path('datasets')
owner str

GitHub owner of the scTenifold-data mirror.

'qwerty239qwe'

Returns:

Type Description
Mapping from sample-group name to a genes-by-cells DataFrame.
Source code in scTenifold/data/_get.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def fetch_data(ds_name: str,
               dataset_path: Path = Path(__file__).parent.parent.parent / Path("datasets"),
               owner: str = "qwerty239qwe") -> Dict[str, pd.DataFrame]:
    """Fetch and load a remote scTenifold dataset by name.

    Files that already exist under ``dataset_path`` are not downloaded again.

    Parameters
    ----------
    ds_name
        Dataset name (one of :data:`_valid_ds_names`).
    dataset_path
        Local directory to cache downloads. Created if it does not exist.
    owner
        GitHub owner of the ``scTenifold-data`` mirror.

    Returns
    -------
    Mapping from sample-group name to a genes-by-cells DataFrame.

    Raises
    ------
    ValueError
        If ``ds_name`` is not a known dataset, is absent from the remote
        repository listing, or one of its groups lacks a matrix or genes file.
    """
    # Validate the name before touching the filesystem or the network.
    if ds_name not in _valid_ds_names:
        raise ValueError(f"Unknown dataset {ds_name!r}; expected one of {_valid_ds_names}")

    dataset_path = Path(dataset_path)
    dataset_path.mkdir(parents=True, exist_ok=True)

    ds_dic = list_data(owner=owner, return_list=False)
    if ds_name not in ds_dic:
        raise ValueError(f"Dataset {ds_name!r} was not found in the remote data repository")

    result_df = {}
    for lv_1, files in ds_dic[ds_name].items():
        # Assign each remote file to a role by substring match on its path.
        fn_names = {k: None for k in ["matrix", "genes", "barcodes"]}
        for f in files:
            for fn_name in fn_names:
                if fn_name in f:
                    fn_names[fn_name] = f

        # Fail early with a clear message instead of an opaque TypeError from Path(None).
        missing = [k for k in ("matrix", "genes") if fn_names[k] is None]
        if missing:
            raise ValueError(f"Dataset group {lv_1!r} is missing required file(s): {missing}")

        (dataset_path / lv_1).mkdir(parents=True, exist_ok=True)
        for f in files:
            local_file = dataset_path / f
            if not local_file.exists():
                download_url(url=_repo_url.format(owner=owner, ds_name=f), save_path=local_file)

        barcodes = fn_names["barcodes"]  # optional
        group_name = lv_1.rsplit("/", 1)[-1]
        result_df[group_name] = read_mtx(
            mtx_file_name=str(dataset_path / fn_names["matrix"]),
            gene_file_name=str(dataset_path / fn_names["genes"]),
            barcode_file_name=str(dataset_path / barcodes) if barcodes is not None else None,
        )
    return result_df

list_data

list_data(
    owner: str = "qwerty239qwe", return_list: bool = True
) -> Union[Dict[str, Dict[str, List[str]]], List[str]]

Parameters:

Name Type Description Default
owner str

owner name of dataset repo

'qwerty239qwe'
return_list bool

Whether to return a list of dataset names (True) or a dict describing the repository structure (False)

True

Returns:

Name Type Description
data_info_tree list or dict

The available datasets, either as a dict with structure {'data_name': {'group': ['file_names']}} or as a list of data_names

Source code in scTenifold/data/_get.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def list_data(owner: str = "qwerty239qwe",
              return_list: bool = True) -> Union[Dict[str, Dict[str, List[str]]], List[str]]:
    """
    Query the remote repository tree and report the datasets it contains.

    Parameters
    ----------
    owner: str, default = 'qwerty239qwe'
        owner name of dataset repo
    return_list: bool, default = True
        If True, return only the top-level directory names; otherwise
        return a nested dict describing the repo structure
    Returns
    -------
    data_info_tree: list or dict
        The obtainable data store in a dict, structure {'data_name': {'group': ['file_names']}}
        or in a list of data_names
    """
    reply = requests.get(_repo_tree_url.format(owner=owner))
    reply.raise_for_status()
    entries = reply.json()['tree']

    # Top-level directories are the dataset names.
    dataset_names = [entry["path"] for entry in entries
                     if "/" not in entry["path"] and entry["type"] == "tree"]
    if return_list:
        return dataset_names

    parent_pattern = re.compile(r"(.*)/")

    # Paths with one slash are groups; paths with two slashes are their members.
    groups, members = {}, []
    for entry in entries:
        depth = entry['path'].count("/")
        if depth == 1:
            groups[entry["path"]] = []
        elif depth == 2:
            members.append(entry["path"])
    for member in members:
        groups[parent_pattern.findall(member)[0]].append(member)

    # Attach every group to the dataset named by its parent path.
    ds_dic = {name: {} for name in dataset_names}
    for group_path, group_members in groups.items():
        ds_dic[parent_pattern.findall(group_path)[0]][group_path] = group_members
    return ds_dic

Simulation

get_test_df

get_test_df(
    n_cells: int = 100,
    n_genes: int = 1000,
    random_state: Optional[int] = None,
) -> pd.DataFrame

Function to generate test dataframe

Parameters:

Name Type Description Default
n_cells int

Number of cells in the generated df

100
n_genes int

Number of genes in the generated df

1000
random_state Optional[int]

Random seed for the generated data; use the same seed to reproduce the same dataset

None

Returns:

Name Type Description
test_df DataFrame

testing data

Source code in scTenifold/data/_sim.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def get_test_df(n_cells: int = 100,
                n_genes: int = 1000,
                random_state: Optional[int] = None) -> pd.DataFrame:
    """
    Generate a random genes-by-cells count table for testing.

    Counts are drawn from a negative binomial distribution. The first
    ``min(10, n_genes)`` rows are labelled ``MT-<i>`` and the remaining rows
    ``NG-<i>``; columns are labelled ``Cell-<i>`` (all numbered from 1).

    Parameters
    ----------
    n_cells: int, default = 100
        Number of cells in the generated df
    n_genes: default = 1000
        Number of genes in the generated df
    random_state: default = None
        Seed passed to the random generator; reuse the same seed to
        reproduce the same dataset

    Returns
    -------
    test_df: pd.DataFrame
        testing data
    """
    rng = np.random.default_rng(seed=random_state)
    flat_counts = rng.negative_binomial(20, 0.98, n_cells * n_genes)
    counts = flat_counts.reshape(n_genes, n_cells)

    n_mt_rows = min(10, n_genes)
    gene_names = [f"MT-{i}" for i in range(1, n_mt_rows + 1)]
    gene_names.extend(f"NG-{i}" for i in range(1, n_genes - n_mt_rows + 1))
    cell_names = [f"Cell-{i}" for i in range(1, n_cells + 1)]

    return pd.DataFrame(counts, index=gene_names, columns=cell_names)

TestDataGenerator dataclass

TestDataGenerator(
    n_genes: int = 1000,
    n_samples: int = 100,
    pos_eff_ratio: float = 0.3,
    neg_eff_ratio: float = 0,
    target_pos: Optional[Sequence[str]] = None,
    target_neg: Optional[Sequence[str]] = None,
    n_bins: int = 25,
    n_ctrl: int = 50,
    random_state: int = 42,
)

A test data generator that produces test data for cell scoring functions

Parameters:

Name Type Description Default
n_genes int

Number of genes in the data

1000
n_samples int

Number of cells(samples) in the data

100
pos_eff_ratio float

Fraction of up-regulated cells

0.3
neg_eff_ratio float

Fraction of down-regulated cells

0
target_pos Optional[Sequence[str]]
None
target_neg Optional[Sequence[str]]
None
n_bins int
25
n_ctrl int
50
random_state int
42

__post_init__

__post_init__() -> None

Build the simulated count matrix and gene/sample labels.

Source code in scTenifold/data/_sim.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def __post_init__(self) -> None:
    """Build the simulated count matrix and gene/sample labels.

    Sets ``random_state_seed``, ``X``, ``gene_list``, ``samples`` and
    ``n_X`` on the instance, and fills in defaults for ``target_pos`` and
    ``target_neg`` when they are None.

    Raises
    ------
    ValueError
        If the target gene lists together contain more names than ``n_genes``.
    """
    self.random_state_seed = self.random_state
    random_state = np.random.default_rng(self.random_state)
    if self.target_pos is None:
        # Copy so that changes to this instance's list cannot leak into
        # the shared module-level default.
        self.target_pos = list(DEFAULT_POS)
    if self.target_neg is None:
        self.target_neg = []
    if len(self.target_pos) + len(self.target_neg) > self.n_genes:
        raise ValueError("n_genes must be at least the number of target positive and negative genes")
    self.X = random_state.negative_binomial(20, 0.9,
                                            size=(self.n_genes, self.n_samples))
    self._add_eff(random_state)

    n_pseudo = self.n_genes - len(self.target_pos) - len(self.target_neg)
    # Unpack rather than concatenate with "+", so any Sequence (e.g. a tuple)
    # is accepted for the target gene lists, as the field annotations allow.
    self.gene_list = [
        *(f"pseudo_G{i}" for i in range(n_pseudo)),
        *self.target_pos,
        *self.target_neg,
    ]
    self.samples = [f"cell{i}" for i in range(self.X.shape[1])]
    self.n_X = _normalize(self.X)

save_data

save_data(
    file_path: Union[str, Path], use_normalized: bool
) -> None

Save the simulated count matrix as CSV to file_path.

Source code in scTenifold/data/_sim.py
121
122
123
124
125
def save_data(self,
              file_path: Union[str, Path],
              use_normalized: bool) -> None:
    """Write the simulated matrix to ``file_path`` in CSV format.

    Parameters
    ----------
    file_path
        Destination passed to ``DataFrame.to_csv``.
    use_normalized
        Forwarded to :meth:`get_data` to choose between the normalized and
        the raw matrix.
    """
    payload = self.get_data("pandas", use_normalized)
    frame = payload["X"]
    frame.to_csv(file_path)

get_data

get_data(
    data_type: str, use_normalized: bool
) -> Dict[str, object]

Return the simulated data packaged for downstream scorers.

Parameters:

Name Type Description Default
data_type str

One of "numpy", "pandas", or "ann_data" (currently disabled — returns an empty dict).

required
use_normalized bool

If True, return log-CPM-like normalized counts; otherwise raw counts.

required

Returns:

Type Description
Keyword arguments suitable for :func:`cell_cycle_score` or :func:`adobo_score`.
Source code in scTenifold/data/_sim.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def get_data(self, data_type: str, use_normalized: bool) -> Dict[str, object]:
    """Return the simulated data packaged for downstream scorers.

    Parameters
    ----------
    data_type
        One of ``"numpy"``, ``"pandas"``, or ``"ann_data"`` (currently
        disabled — returns an empty dict).
    use_normalized
        If True, use the normalized matrix ``self.n_X``; otherwise the raw
        counts ``self.X``.

    Returns
    -------
    Keyword arguments suitable for :func:`cell_cycle_score` or :func:`adobo_score`.

    Raises
    ------
    ValueError
        If ``data_type`` is not one of the supported names.
    """
    used_X = self.n_X if use_normalized else self.X

    if data_type == "ann_data":
        # NOTE: AnnData packaging is disabled. The intended payload wrapped
        # the transposed matrix in an AnnData object and passed it with
        # random_state, gene_list, n_bins, ctrl_size and copy=True.
        return {}

    if data_type == "numpy":
        return {"random_state": self.random_state_seed,
                "X": used_X,
                "gene_list": self.gene_list,
                "sample_list": self.samples,
                "n_bins": self.n_bins,
                "n_ctrl": self.n_ctrl}

    if data_type == "pandas":
        # Row/column labels travel with the frame, so only the target genes are passed separately.
        frame = pd.DataFrame(used_X,
                             index=self.gene_list,
                             columns=self.samples)
        return {"random_state": self.random_state_seed,
                "X": frame,
                "genes": self.target_pos,
                "n_bins": self.n_bins,
                "n_ctrl": self.n_ctrl}

    # Previously an unknown type silently returned None, which only failed
    # later with an unrelated error at the call site.
    raise ValueError(f"Unknown data_type {data_type!r}; expected 'numpy', 'pandas', or 'ann_data'")