Pre Module API Reference

Module of pre-processing functions for generating LandscapeFiles from spatial transcriptomics (ST) data.

calc_meta_gene_data(cbg)

Calculate gene metadata from the cell-by-gene matrix

Parameters:

- cbg (pandas.DataFrame): A sparse DataFrame with genes as columns and barcodes as rows. Required.

Returns:

- pandas.DataFrame: A DataFrame with gene metadata including mean, standard deviation, maximum expression, and proportion of non-zero expression.

Source code in src/celldega/pre/landscape.py
def calc_meta_gene_data(cbg):
    """
    Calculate gene metadata from the cell-by-gene matrix

    Args:
        cbg (pandas.DataFrame): A sparse DataFrame with genes as columns and barcodes as rows.

    Returns:
        pandas.DataFrame: A DataFrame with gene metadata including mean, standard deviation,
            maximum expression, and proportion of non-zero expression.
    """

    # Helper function to convert to dense if sparse
    def convert_to_dense(series):
        """
        Convert a pandas Series to dense format if it's sparse.

        Parameters
        ----------
        series : pandas.Series

        Returns
        -------
        pandas.Series
            Dense Series if input was sparse; original Series otherwise.
        """
        if pd.api.types.is_sparse(series):
            return series.sparse.to_dense()
        return series

    # Ensure cbg is a DataFrame
    if not isinstance(cbg, pd.DataFrame):
        raise TypeError("cbg must be a pandas DataFrame")

    # Determine if cbg is sparse by checking the column dtypes
    # (pd.api.types.is_sparse does not handle DataFrames directly)
    is_sparse = all(isinstance(dtype, pd.SparseDtype) for dtype in cbg.dtypes)

    if is_sparse:
        # Ensure cbg has SparseDtype with float and fill_value=0
        cbg = cbg.astype(pd.SparseDtype("float", fill_value=0))
        print("cbg is a sparse DataFrame. Proceeding with sparse operations.")
    else:
        print("cbg is a dense DataFrame. Proceeding with dense operations.")

    # Calculate mean expression across tiles
    print("Calculating mean expression")
    mean_expression = cbg.mean(axis=0)

    # Calculate variance as the average of the squared deviations
    # across barcodes (rows), matching the axis used for the mean
    print("Calculating variance")
    num_barcodes = cbg.shape[0]
    variance = cbg.apply(
        lambda x: ((x - mean_expression[x.name]) ** 2).sum() / num_barcodes, axis=0
    )
    std_deviation = np.sqrt(variance)

    # Calculate maximum expression
    max_expression = cbg.max(axis=0)

    # Calculate proportion of tiles with non-zero expression
    proportion_nonzero = (cbg != 0).sum(axis=0) / len(cbg)

    # Create a DataFrame to hold all these metrics, converting any sparse
    # Series to dense (convert_to_dense is a no-op for dense input)
    meta_gene = pd.DataFrame(
        {
            "mean": convert_to_dense(mean_expression),
            "std": std_deviation,
            "max": convert_to_dense(max_expression),
            "non-zero": convert_to_dense(proportion_nonzero),
        }
    )

    meta_gene_clean = pd.DataFrame(meta_gene.values, index=meta_gene.index.tolist(), columns=meta_gene.columns)

    return meta_gene_clean
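
A minimal usage sketch with a toy dense matrix; real inputs are typically the sparse output of read_cbg_mtx, and the barcode and gene names here are hypothetical:

import pandas as pd

# Hypothetical 3-barcode x 2-gene matrix
cbg = pd.DataFrame(
    {"GeneA": [0, 2, 4], "GeneB": [1, 0, 0]},
    index=["cell-1", "cell-2", "cell-3"],
)

meta_gene = calc_meta_gene_data(cbg)
print(meta_gene[["mean", "std", "max", "non-zero"]])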

convert_long_id_to_short(df)

Converts a column of long integer cell IDs in a DataFrame to a shorter, hash-based representation.

Parameters:

- df (pd.DataFrame): The DataFrame containing the EntityID column. Required.

Returns:

- pd.DataFrame: The original DataFrame with an additional column named cell_id containing the shortened cell IDs.

The function applies a SHA-256 hash to each cell ID, encodes the hash using base64, and truncates it to create a shorter identifier that is added as a new column to the DataFrame.

Source code in src/celldega/pre/__init__.py
def convert_long_id_to_short(df):
    """
    Converts a column of long integer cell IDs in a DataFrame to a shorter, hash-based representation.

    Args:
        df (pd.DataFrame): The DataFrame containing the EntityID.
    Returns:
        pd.DataFrame: The original DataFrame with an additional column named `cell_id`
                      containing the shortened cell IDs.

    The function applies a SHA-256 hash to each cell ID, encodes the hash using base64, and truncates
    it to create a shorter identifier that is added as a new column to the DataFrame.
    """
    # Function to hash and encode the cell ID
    def hash_and_shorten_id(cell_id):
        # Create a hash of the cell ID
        cell_id_bytes = str(cell_id).encode('utf-8')
        hash_object = hashlib.sha256(cell_id_bytes)
        hash_digest = hash_object.digest()

        # Encode the hash to a base64 string to mix letters and numbers, truncate to 9 characters
        short_id = base64.urlsafe_b64encode(hash_digest).decode('utf-8')[:9]
        return short_id

    # Apply the hash_and_shorten_id function to each cell ID in the specified column
    df['cell_id'] = df['EntityID'].apply(hash_and_shorten_id)

    return df
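
A usage sketch with hypothetical EntityID values:

import pandas as pd

df = pd.DataFrame({"EntityID": [880006543219876543, 880006543219876544]})
df = convert_long_id_to_short(df)
print(df["cell_id"])  # 9-character, base64-derived identifiers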

convert_to_jpeg(image_path, quality=80)

Convert a TIFF image to a JPEG image with a specified quality score.

Parameters

- image_path (str): Path to the image file.
- quality (int, default=80): Quality score for the JPEG image.

Returns

- new_image_path (str): Path to the JPEG image file.

Source code in src/celldega/pre/__init__.py
def convert_to_jpeg(image_path, quality=80):
    """
    Convert a TIFF image to a JPEG image with a specified quality score

    Parameters
    ----------
    image_path : str
        Path to the image file
    quality : int (default=80)
        Quality score for the JPEG image

    Returns
    -------
    new_image_path : str
        Path to the JPEG image file

    """

    # Load the TIFF image
    image = pyvips.Image.new_from_file(image_path, access="sequential")

    # Save the image as a JPEG at the requested quality
    new_image_path = image_path.replace(".tif", ".jpeg")
    image.jpegsave(new_image_path, Q=quality)

    return new_image_path

convert_to_png(image_path)

Convert a TIFF image to a PNG image.

Parameters

- image_path (str): Path to the image file.

Returns

- new_image_path (str): Path to the PNG image file.

Source code in src/celldega/pre/__init__.py
def convert_to_png(image_path):
    """
    Convert a TIFF image to a PNG image

    Parameters
    ----------
    image_path : str
        Path to the image file

    Returns
    -------
    new_image_path : str
        Path to the PNG image file

    """

    # Load the TIFF image
    image = pyvips.Image.new_from_file(image_path, access="sequential")

    # Save the image as a PNG
    new_image_path = image_path.replace(".tif", ".png")
    image.pngsave(new_image_path)

    return new_image_path

convert_to_webp(image_path, quality=100)

Convert a TIFF image to a WEBP image with a specified quality score.

Parameters

- image_path (str): Path to the image file.
- quality (int, default=100): Quality score for the WEBP image (higher is better quality).

Returns

- new_image_path (str): Path to the WEBP image file.

Source code in src/celldega/pre/__init__.py
def convert_to_webp(image_path, quality=100):
    """
    Convert a TIFF image to a WEBP image with a specified quality score.

    Parameters
    ----------
    image_path : str
        Path to the image file
    quality : int (default=100)
        Quality score for the WEBP image (higher is better quality)

    Returns
    -------
    new_image_path : str
        Path to the WEBP image file
    """
    # Load the TIFF image
    image = pyvips.Image.new_from_file(image_path, access="sequential")

    # Save the image as a WEBP with specified quality
    new_image_path = image_path.replace(".tif", ".webp")
    image.webpsave(new_image_path, Q=quality)

    return new_image_path
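
The three converters follow the same pattern; a combined usage sketch (the input path is hypothetical, and the file must end in ".tif" for the output-name substitution to take effect):

path_tif = "data/dapi_image.tif"  # hypothetical input

path_jpeg = convert_to_jpeg(path_tif, quality=80)
path_png = convert_to_png(path_tif)
path_webp = convert_to_webp(path_tif, quality=100)

print(path_jpeg, path_png, path_webp)
# data/dapi_image.jpeg data/dapi_image.png data/dapi_image.webp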

get_max_zoom_level(path_image_pyramid)

Returns the maximum zoom level based on the highest-numbered directory in the specified path_image_pyramid.

Parameters:

- path_image_pyramid (str): The path to the directory containing zoom level directories. Required.

Returns:

- max_pyramid_zoom (int): The maximum zoom level.

Source code in src/celldega/pre/__init__.py
def get_max_zoom_level(path_image_pyramid):
    """
    Returns the maximum zoom level based on the highest-numbered directory
    in the specified path_image_pyramid.

    Parameters:
        path_image_pyramid (str): The path to the directory containing zoom level directories.

    Returns:
        max_pyramid_zoom (int): The maximum zoom level.
    """
    # List all entries in the path_image_pyramid that are directories and can be converted to integers
    zoom_levels = [
        entry
        for entry in os.listdir(path_image_pyramid)
        if os.path.isdir(os.path.join(path_image_pyramid, entry)) and entry.isdigit()
    ]

    # Convert to integer and find the maximum value
    max_pyramid_zoom = max(map(int, zoom_levels)) if zoom_levels else None

    return max_pyramid_zoom
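
A sketch assuming a pyramid directory whose zoom levels are numbered subdirectories; the path is hypothetical:

# e.g., the directory contains subdirectories 0/, 1/, ..., 7/
max_zoom = get_max_zoom_level("landscape_files/pyramid_images/dapi_files")
print(max_zoom)  # 7, or None if no numeric subdirectories exist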

make_cell_boundary_tiles(technology, path_cell_boundaries, path_meta_cell_micron, path_transformation_matrix, path_output, coarse_tile_factor=20, tile_size=250, tile_bounds=None, image_scale=1, max_workers=8)

Processes cell boundary data for the given technology and divides it into spatial tiles. The boundaries are read, affine-transformed, and split into coarse and then fine tiles; each fine tile is saved as a Parquet file containing the geometries of the cells it covers.

Parameters

- technology (str): The technology used to generate the cell boundary data, e.g., "MERSCOPE", "Xenium", or "custom".
- path_cell_boundaries (str): Path to the file containing the cell boundaries (Parquet format).
- path_meta_cell_micron (str): Path to the file containing cell metadata (CSV format).
- path_transformation_matrix (str): Path to the file containing the transformation matrix (CSV format).
- path_output (str): Directory path where the output Parquet files for each tile will be saved.
- coarse_tile_factor (int, optional, default=20): Scaling factor of each coarse-grain tile relative to the fine tile size.
- tile_size (int, optional, default=250): Size of each fine-grain tile in microns.
- tile_bounds (dict, optional): Dictionary containing the minimum and maximum bounds for x and y coordinates.
- image_scale (float, optional, default=1): Scale factor to apply to the geometry data.
- max_workers (int, optional, default=8): Maximum number of parallel workers for processing tiles.

Returns

- None

Source code in src/celldega/pre/boundary_tile.py
def make_cell_boundary_tiles(
    technology,
    path_cell_boundaries,
    path_meta_cell_micron,
    path_transformation_matrix,
    path_output,
    coarse_tile_factor=20,
    tile_size=250,
    tile_bounds=None,
    image_scale=1,
    max_workers=8
):


    """
    Processes cell boundary data and divides it into spatial tiles based on the provided technology.
    Reads cell boundary data, applies affine transformations, and divides the data into coarse and fine tiles.
    The resulting tiles are saved as Parquet files, each containing the geometries of cells in that tile.

    Parameters
    ----------
    technology : str
        The technology used to generate the cell boundary data, e.g., "MERSCOPE", "Xenium", or "custom".
    path_cell_boundaries : str
        Path to the file containing the cell boundaries (Parquet format).
    path_meta_cell_micron : str
        Path to the file containing cell metadata (CSV format).
    path_transformation_matrix : str
        Path to the file containing the transformation matrix (CSV format).
    path_output : str
        Directory path where the output files (Parquet files) for each tile will be saved.
    coarse_tile_factor : int, optional, default=20
        Scaling factor of each coarse-grain tile relative to the fine tile size.
    tile_size : int, optional, default=250
        Size of each fine-grain tile in microns.
    tile_bounds : dict, optional
        Dictionary containing the minimum and maximum bounds for x and y coordinates.
    image_scale : float, optional, default=1
        Scale factor to apply to the geometry data.
    max_workers : int, optional, default=8
        Maximum number of parallel workers for processing tiles.

    Returns
    -------
    None
    """

    def numpy_affine_transform(coords, matrix):
        """Apply affine transformation to numpy coordinates."""
        # Homogeneous coordinates for affine transformation
        coords = np.hstack([coords, np.ones((coords.shape[0], 1))])
        transformed_coords = coords @ matrix.T
        return transformed_coords[:, :2]  # Drop the homogeneous coordinate

    def batch_transform_geometries(geometries, transformation_matrix, scale):
        """
        Batch transform geometries using numpy for optimized performance.
        """
        # Extract affine transformation parameters into a 3x3 matrix for numpy
        affine_matrix = np.array([
            [transformation_matrix[0, 0], transformation_matrix[0, 1], transformation_matrix[0, 2]],
            [transformation_matrix[1, 0], transformation_matrix[1, 1], transformation_matrix[1, 2]],
            [0, 0, 1]
        ])

        transformed_geometries = []

        for polygon in geometries:
            # Extract coordinates and transform them
            if isinstance(polygon, MultiPolygon):
                polygon = next(polygon.geoms)  # Use the first geometry

            # Transform the exterior of the polygon
            exterior_coords = np.array(polygon.exterior.coords)

            # Apply the affine transformation and scale
            transformed_coords = numpy_affine_transform(exterior_coords, affine_matrix) / scale

            # Append the result to the transformed_geometries list
            transformed_geometries.append([transformed_coords.tolist()])

        return transformed_geometries


    def filter_and_save_fine_boundary(coarse_tile, fine_i, fine_j, fine_tile_x_min, fine_tile_x_max, fine_tile_y_min, fine_tile_y_max, path_output):
        cell_ids = coarse_tile.index.values

        tile_filter = (
            (coarse_tile["center_x"] >= fine_tile_x_min) & (coarse_tile["center_x"] < fine_tile_x_max) &
            (coarse_tile["center_y"] >= fine_tile_y_min) & (coarse_tile["center_y"] < fine_tile_y_max)
        )
        filtered_indices = np.where(tile_filter)[0]

        keep_cells = cell_ids[filtered_indices]
        fine_tile_cells = coarse_tile.loc[keep_cells, ["GEOMETRY"]]
        fine_tile_cells = fine_tile_cells.assign(name=fine_tile_cells.index)

        if not fine_tile_cells.empty:
            filename = f"{path_output}/cell_tile_{fine_i}_{fine_j}.parquet"
            fine_tile_cells.to_parquet(filename)

    def process_fine_boundaries(coarse_tile, i, j, coarse_tile_x_min, coarse_tile_x_max, coarse_tile_y_min, coarse_tile_y_max, tile_size, path_output, x_min, y_min, n_fine_tiles_x, n_fine_tiles_y):
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = []
            for fine_i in range(n_fine_tiles_x):
                fine_tile_x_min = x_min + fine_i * tile_size
                fine_tile_x_max = fine_tile_x_min + tile_size

                if not (fine_tile_x_min >= coarse_tile_x_min and fine_tile_x_max <= coarse_tile_x_max):
                    continue

                for fine_j in range(n_fine_tiles_y):
                    fine_tile_y_min = y_min + fine_j * tile_size
                    fine_tile_y_max = fine_tile_y_min + tile_size

                    if not (fine_tile_y_min >= coarse_tile_y_min and fine_tile_y_max <= coarse_tile_y_max):
                        continue

                    futures.append(executor.submit(
                        filter_and_save_fine_boundary, coarse_tile, fine_i, fine_j, fine_tile_x_min, fine_tile_x_max, fine_tile_y_min, fine_tile_y_max, path_output
                    ))

            for future in futures:
                future.result()

    tile_size_x = tile_size
    tile_size_y = tile_size

    transformation_matrix = pd.read_csv(path_transformation_matrix, header=None, sep=" ").values

    # Load cell boundary data based on the technology
    if technology == "MERSCOPE":
        df_meta = pd.read_parquet(f"{path_output.replace('cell_segmentation','cell_metadata.parquet')}")
        entity_to_cell_id_dict = pd.Series(df_meta.index.values, index=df_meta.EntityID).to_dict()
        cells_orig = gpd.read_parquet(path_cell_boundaries)
        cells_orig['cell_id'] = cells_orig['EntityID'].map(entity_to_cell_id_dict)
        cells_orig = cells_orig[cells_orig["ZIndex"] == 1]

        # Correct cell_id issues with meta_cell
        meta_cell = pd.read_csv(path_meta_cell_micron)
        meta_cell['cell_id'] = meta_cell['EntityID'].map(entity_to_cell_id_dict)
        cells_orig.index = meta_cell[meta_cell["cell_id"].isin(cells_orig['cell_id'])].index

        # Correct 'MultiPolygon' to 'Polygon'
        cells_orig["geometry"] = cells_orig["Geometry"].apply(
            lambda x: list(x.geoms)[0] if isinstance(x, MultiPolygon) else x
        )

        cells_orig.set_index('cell_id', inplace=True)

    elif technology == "Xenium":
        xenium_cells = pd.read_parquet(path_cell_boundaries)
        grouped = xenium_cells.groupby("cell_id")[["vertex_x", "vertex_y"]].agg(lambda x: x.tolist())
        grouped["geometry"] = grouped.apply(lambda row: Polygon(zip(row["vertex_x"], row["vertex_y"])), axis=1)
        cells_orig = gpd.GeoDataFrame(grouped, geometry="geometry")[["geometry"]]

    elif technology == "custom":
        cells_orig = gpd.read_parquet(path_cell_boundaries)

    # Transform geometries
    cells_orig["GEOMETRY"] = batch_transform_geometries(cells_orig["geometry"], transformation_matrix, image_scale)

    # Convert transformed geometries to polygons and calculate centroids
    cells_orig["polygon"] = cells_orig["GEOMETRY"].apply(lambda x: Polygon(x[0]))
    gdf_cells = gpd.GeoDataFrame(geometry=cells_orig["polygon"])
    gdf_cells["center_x"] = gdf_cells.geometry.centroid.x
    gdf_cells["center_y"] = gdf_cells.geometry.centroid.y
    gdf_cells["GEOMETRY"] = cells_orig["GEOMETRY"]

    # Ensure the output directory exists
    if not os.path.exists(path_output):
        os.makedirs(path_output)

    # Calculate tile bounds and fine/coarse tiles
    x_min, x_max = tile_bounds["x_min"], tile_bounds["x_max"]
    y_min, y_max = tile_bounds["y_min"], tile_bounds["y_max"]
    n_fine_tiles_x = int(np.ceil((x_max - x_min) / tile_size))
    n_fine_tiles_y = int(np.ceil((y_max - y_min) / tile_size))
    n_coarse_tiles_x = int(np.ceil((x_max - x_min) / (coarse_tile_factor * tile_size)))
    n_coarse_tiles_y = int(np.ceil((y_max - y_min) / (coarse_tile_factor * tile_size)))

    # Process coarse tiles in parallel
    for i in tqdm(range(n_coarse_tiles_x), desc="Processing coarse tiles"):
        coarse_tile_x_min = x_min + i * (coarse_tile_factor * tile_size)
        coarse_tile_x_max = coarse_tile_x_min + (coarse_tile_factor * tile_size)

        for j in range(n_coarse_tiles_y):
            coarse_tile_y_min = y_min + j * (coarse_tile_factor * tile_size)
            coarse_tile_y_max = coarse_tile_y_min + (coarse_tile_factor * tile_size)

            coarse_tile = gdf_cells[
                (gdf_cells["center_x"] >= coarse_tile_x_min) & (gdf_cells["center_x"] < coarse_tile_x_max) &
                (gdf_cells["center_y"] >= coarse_tile_y_min) & (gdf_cells["center_y"] < coarse_tile_y_max)
            ]
            if not coarse_tile.empty:
                process_fine_boundaries(coarse_tile, i, j, coarse_tile_x_min, coarse_tile_x_max, coarse_tile_y_min, coarse_tile_y_max, tile_size, path_output, x_min, y_min, n_fine_tiles_x, n_fine_tiles_y)
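
A sketch of a typical Xenium call; the paths are hypothetical, and tile_bounds would normally come from make_trx_tiles (documented below):

tile_bounds = {"x_min": 0, "x_max": 5000, "y_min": 0, "y_max": 5000}  # hypothetical

make_cell_boundary_tiles(
    technology="Xenium",
    path_cell_boundaries="data/cell_boundaries.parquet",
    path_meta_cell_micron="data/cells.csv.gz",
    path_transformation_matrix="data/transformation_matrix.txt",
    path_output="landscape_files/cell_segmentation",
    tile_size=250,
    tile_bounds=tile_bounds,
    image_scale=1,
)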

make_deepzoom_pyramid(image_path, output_path, pyramid_name, tile_size=512, overlap=0, suffix='.jpeg')

Create a DeepZoom image pyramid from a JPEG image.

Parameters

- image_path (str): Path to the JPEG image file.
- output_path (str): Path to the output directory for the pyramid.
- pyramid_name (str): Name of the DeepZoom pyramid.
- tile_size (int, default=512): Tile size for the DeepZoom pyramid.
- overlap (int, default=0): Overlap size for the DeepZoom pyramid.
- suffix (str, default='.jpeg'): Suffix for the DeepZoom pyramid tiles.

Returns

- None

Source code in src/celldega/pre/__init__.py
def make_deepzoom_pyramid(
    image_path, output_path, pyramid_name, tile_size=512, overlap=0, suffix=".jpeg"
):
    """
    Create a DeepZoom image pyramid from a JPEG image

    Parameters
    ----------
    image_path : str
        Path to the JPEG image file
    output_path : str
        Path to the output directory for the pyramid
    pyramid_name : str
        Name of the DeepZoom pyramid
    tile_size : int (default=512)
        Tile size for the DeepZoom pyramid
    overlap : int (default=0)
        Overlap size for the DeepZoom pyramid
    suffix : str (default='jpeg')
        Suffix for the DeepZoom pyramid tiles

    Returns
    -------
    None

    """

    # Define the output path
    output_path = Path(output_path)

    # Load the JPEG image
    image = pyvips.Image.new_from_file(image_path, access="sequential")

    # check if the output path exists and create it if it does not
    output_path.mkdir(parents=True, exist_ok=True)

    # append the pyramid name to the output path
    output_path = output_path / pyramid_name

    # Save the image as a DeepZoom image pyramid
    image.dzsave(output_path, tile_size=tile_size, overlap=overlap, suffix=suffix)
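
A usage sketch (paths hypothetical); pyvips' dzsave writes a .dzi descriptor and a tile directory under output_path, named after pyramid_name:

make_deepzoom_pyramid(
    image_path="landscape_files/dapi_image.jpeg",  # hypothetical
    output_path="landscape_files/pyramid_images",
    pyramid_name="dapi_files",
    tile_size=512,
    overlap=0,
    suffix=".jpeg",
)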

make_meta_cell_image_coord(technology, path_transformation_matrix, path_meta_cell_micron, path_meta_cell_image, image_scale)

Apply an affine transformation to the cell coordinates in microns and save the transformed coordinates in pixels.

Parameters

- technology (str): The technology used to generate the data; Xenium and MERSCOPE are supported.
- path_transformation_matrix (str): Path to the transformation matrix file.
- path_meta_cell_micron (str): Path to the meta cell file with coordinates in microns.
- path_meta_cell_image (str): Path to save the meta cell file with coordinates in pixels.
- image_scale (float): Scale factor used to convert the transformed coordinates to image pixels.

Returns

- None

Examples

>>> make_meta_cell_image_coord(
...     technology='Xenium',
...     path_transformation_matrix='data/transformation_matrix.txt',
...     path_meta_cell_micron='data/meta_cell_micron.csv',
...     path_meta_cell_image='data/meta_cell_image.parquet',
...     image_scale=1
... )

Source code in src/celldega/pre/__init__.py
def make_meta_cell_image_coord(
    technology,
    path_transformation_matrix,
    path_meta_cell_micron,
    path_meta_cell_image,
    image_scale
):
    """
    Apply an affine transformation to the cell coordinates in microns and save
    the transformed coordinates in pixels

    Parameters
    ----------
    technology : str
        The technology used to generate the data, Xenium and MERSCOPE are supported.
    path_transformation_matrix : str
        Path to the transformation matrix file
    path_meta_cell_micron : str
        Path to the meta cell file with coordinates in microns
    path_meta_cell_image : str
        Path to save the meta cell file with coordinates in pixels
    image_scale : float
        Scale factor used to convert the transformed coordinates to image pixels

    Returns
    -------
    None

    Examples
    --------
    >>> make_meta_cell_image_coord(
    ...     technology='Xenium',
    ...     path_transformation_matrix='data/transformation_matrix.txt',
    ...     path_meta_cell_micron='data/meta_cell_micron.csv',
    ...     path_meta_cell_image='data/meta_cell_image.parquet',
    ...     image_scale=1
    ... )

    """

    transformation_matrix = pd.read_csv(
        path_transformation_matrix, header=None, sep=" "
    ).values

    if technology == "MERSCOPE":
        meta_cell = pd.read_csv(path_meta_cell_micron, usecols=["EntityID", "center_x", "center_y"])
        meta_cell = convert_long_id_to_short(meta_cell)
        meta_cell["name"] =  meta_cell["cell_id"]
        meta_cell = meta_cell.set_index('cell_id')
    elif technology == "Xenium":
        usecols = ["cell_id", "x_centroid", "y_centroid"]
        meta_cell = pd.read_csv(path_meta_cell_micron, index_col=0, usecols=usecols)
        meta_cell.columns = ["center_x", "center_y"]
        meta_cell["name"] = pd.Series(meta_cell.index, index=meta_cell.index)

    # Adding a ones column to accommodate for affine transformation
    meta_cell["ones"] = 1

    # Preparing the data for matrix multiplication
    points = meta_cell[["center_x", "center_y", "ones"]].values

    # Applying the transformation matrix
    transformed_points = np.dot(transformation_matrix, points.T).T

    # Updating the DataFrame with transformed coordinates
    meta_cell["center_x"] = transformed_points[:, 0]
    meta_cell["center_y"] = transformed_points[:, 1]

    # Dropping the ones column as it's no longer needed
    meta_cell.drop(columns=["ones"], inplace=True)

    meta_cell["center_x"] = meta_cell["center_x"] / image_scale
    meta_cell["center_y"] = meta_cell["center_y"] / image_scale

    meta_cell["geometry"] = meta_cell.apply(
        lambda row: [row["center_x"], row["center_y"]], axis=1
    )

    if technology == "MERSCOPE":
        meta_cell = meta_cell[["name", "geometry", "EntityID"]]
    else:
        meta_cell = meta_cell[["name", "geometry"]]


    meta_cell.to_parquet(path_meta_cell_image)

make_meta_gene(technology, path_cbg, path_output)

Create a DataFrame with genes and their assigned colors

Parameters

- technology (str): The technology used to generate the data; Xenium and MERSCOPE are supported.
- path_cbg (str): Path to the cell-by-gene matrix data (the data format can vary based on technology).
- path_output (str): Path to save the meta gene file.

Returns

- None

Examples

>>> make_meta_gene(
...     technology='Xenium',
...     path_cbg='data/',
...     path_output='data/meta_gene.parquet'
... )

Source code in src/celldega/pre/__init__.py
def make_meta_gene(technology, path_cbg, path_output):
    """
    Create a DataFrame with genes and their assigned colors

    Parameters
    ----------
    technology : str
        The technology used to generate the data, Xenium and MERSCOPE are supported.
    path_cbg : str
        Path to the cell-by-gene matrix data (the data format can vary based on technology)
    path_output : str
        Path to save the meta gene file

    Returns
    -------
    None

    Examples
    --------
    >>> make_meta_gene(
    ...     technology='Xenium',
    ...     path_cbg='data/',
    ...     path_output='data/meta_gene.parquet'
    ... )
    """

    if technology == "MERSCOPE":
        cbg = pd.read_csv(path_cbg, index_col=0)
        genes = cbg.columns.tolist()
    elif technology == "Xenium":
        # genes = pd.read_csv(path_cbg + 'features.tsv.gz', sep='\t', header=None)[1].values.tolist()
        cbg = read_cbg_mtx(path_cbg)
        genes = cbg.columns.tolist()

    # Get all categorical color palettes from Matplotlib and flatten them into a single list of colors
    palettes = [plt.get_cmap(name).colors for name in plt.colormaps() if "tab" in name]
    flat_colors = [color for palette in palettes for color in palette]

    # Convert RGB tuples to hex codes
    flat_colors_hex = [to_hex(color) for color in flat_colors]

    # Use modular arithmetic to assign a color to each gene, white for genes with "Blank"
    colors = [
        flat_colors_hex[i % len(flat_colors_hex)] if "Blank" not in gene else "#FFFFFF"
        for i, gene in enumerate(genes)
    ]

    # Create a DataFrame with genes and their assigned colors
    ser_color = pd.Series(colors, index=genes)

    # calculate gene expression metadata
    meta_gene = calc_meta_gene_data(cbg)
    meta_gene['color'] = ser_color

    # Identify sparse columns
    sparse_cols = [col for col in meta_gene.columns if pd.api.types.is_sparse(meta_gene[col])]

    # Convert sparse columns to dense
    for col in sparse_cols:
        meta_gene[col] = meta_gene[col].sparse.to_dense()

    meta_gene.to_parquet(path_output)

make_trx_tiles(technology, path_trx, path_transformation_matrix, path_trx_tiles, coarse_tile_factor=10, tile_size=250, chunk_size=1000000, verbose=False, image_scale=1, max_workers=8)

Processes transcript data by dividing it into coarse-grain and fine-grain tiles, applying transformations, and saving the results in a parallelized manner.

Parameters

- technology (str): The technology used for generating the transcript data (e.g., "MERSCOPE" or "Xenium").
- path_trx (str): Path to the file containing the transcript data.
- path_transformation_matrix (str): Path to the file containing the transformation matrix (CSV file).
- path_trx_tiles (str): Directory path where the output Parquet files for each tile will be saved.
- coarse_tile_factor (int, optional, default=10): Scaling factor of each coarse-grain tile relative to the fine tile size.
- tile_size (int, optional, default=250): Size of each fine-grain tile in microns.
- chunk_size (int, optional, default=1000000): Number of rows to process per chunk for memory efficiency.
- verbose (bool, optional, default=False): Flag to enable verbose output.
- image_scale (float, optional, default=1): Scale factor to apply to the transcript coordinates.
- max_workers (int, optional, default=8): Maximum number of parallel workers for processing tiles.

Returns

- dict: A dictionary containing the bounds of the processed data in both x and y directions.

Source code in src/celldega/pre/trx_tile.py
def make_trx_tiles(
    technology,
    path_trx,
    path_transformation_matrix,
    path_trx_tiles,
    coarse_tile_factor=10,
    tile_size=250,
    chunk_size=1000000,
    verbose=False,
    image_scale=1,
    max_workers=8
):
    """
    Processes transcript data by dividing it into coarse-grain and fine-grain tiles,
    applying transformations, and saving the results in a parallelized manner.

    Parameters
    ----------
    technology : str
        The technology used for generating the transcript data (e.g., "MERSCOPE" or "Xenium").
    path_trx : str
        Path to the file containing the transcript data.
    path_transformation_matrix : str
        Path to the file containing the transformation matrix (CSV file).
    path_trx_tiles : str
        Directory path where the output files (Parquet files) for each tile will be saved.
    coarse_tile_factor : int, optional
        Scaling factor of each coarse-grain tile comparing to the fine tile size.
    tile_size : int, optional
        Size of each fine-grain tile in microns (default is 250).
    chunk_size : int, optional
        Number of rows to process per chunk for memory efficiency (default is 1000000).
    verbose : bool, optional
        Flag to enable verbose output (default is False).
    image_scale : float, optional
        Scale factor to apply to the transcript coordinates (default is 1).
    max_workers : int, optional
        Maximum number of parallel workers for processing tiles (default is 8).

    Returns
    -------
    dict
        A dictionary containing the bounds of the processed data in both x and y directions.
    """

    def process_coarse_tile(trx, i, j, coarse_tile_x_min, coarse_tile_x_max, coarse_tile_y_min, coarse_tile_y_max, tile_size, path_trx_tiles, x_min, y_min, n_fine_tiles_x, n_fine_tiles_y, max_workers):
        # Filter the entire dataset for the current coarse tile
        coarse_tile = trx.filter(
            (pl.col("transformed_x") >= coarse_tile_x_min) & (pl.col("transformed_x") < coarse_tile_x_max) &
            (pl.col("transformed_y") >= coarse_tile_y_min) & (pl.col("transformed_y") < coarse_tile_y_max)
        )

        if not coarse_tile.is_empty():
            # Now process fine tiles using global fine tile indices
            process_fine_tiles(coarse_tile, i, j, coarse_tile_x_min, coarse_tile_x_max, coarse_tile_y_min, coarse_tile_y_max, tile_size, path_trx_tiles, x_min, y_min, n_fine_tiles_x, n_fine_tiles_y, max_workers)   


    def process_fine_tiles(coarse_tile, coarse_i, coarse_j, coarse_tile_x_min, coarse_tile_x_max, coarse_tile_y_min, coarse_tile_y_max, tile_size, path_trx_tiles, x_min, y_min, n_fine_tiles_x, n_fine_tiles_y, max_workers=8):

        # Use ThreadPoolExecutor for parallel processing of fine-grain tiles within the coarse tile
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = []

            # Iterate over fine-grain tiles within the global bounds
            for fine_i in range(n_fine_tiles_x):
                fine_tile_x_min = x_min + fine_i * tile_size
                fine_tile_x_max = fine_tile_x_min + tile_size

                # Process only if the fine tile falls within the current coarse tile's bounds
                if not (fine_tile_x_min >= coarse_tile_x_min and fine_tile_x_max <= coarse_tile_x_max):
                    continue

                for fine_j in range(n_fine_tiles_y):
                    fine_tile_y_min = y_min + fine_j * tile_size
                    fine_tile_y_max = fine_tile_y_min + tile_size

                    # Process only if the fine tile falls within the current coarse tile's bounds
                    if not (fine_tile_y_min >= coarse_tile_y_min and fine_tile_y_max <= coarse_tile_y_max):
                        continue

                    # Submit the task for each fine tile to process in parallel
                    futures.append(executor.submit(
                        filter_and_save_fine_tile, coarse_tile, coarse_i, coarse_j, fine_i, fine_j, 
                        fine_tile_x_min, fine_tile_x_max, fine_tile_y_min, fine_tile_y_max, path_trx_tiles
                    ))

            # Wait for all futures to complete
            for future in concurrent.futures.as_completed(futures):
                future.result()  # Raise exceptions if any occurred during execution


    def filter_and_save_fine_tile(coarse_tile, coarse_i, coarse_j, fine_i, fine_j, fine_tile_x_min, fine_tile_x_max, fine_tile_y_min, fine_tile_y_max, path_trx_tiles):

        # Filter the coarse tile for the current fine tile's boundaries
        fine_tile_trx = coarse_tile.filter(
            (pl.col("transformed_x") >= fine_tile_x_min) & (pl.col("transformed_x") < fine_tile_x_max) &
            (pl.col("transformed_y") >= fine_tile_y_min) & (pl.col("transformed_y") < fine_tile_y_max)
        )

        if not fine_tile_trx.is_empty():
            # Add geometry column as a list of [x, y] pairs
            fine_tile_trx = fine_tile_trx.with_columns(
                pl.concat_list([pl.col("transformed_x"), pl.col("transformed_y")]).alias("geometry")
            ).drop(['transformed_x', 'transformed_y'])

            # Define the filename based on fine tile coordinates
            filename = f"{path_trx_tiles}/transcripts_tile_{fine_i}_{fine_j}.parquet"

            # Save the filtered DataFrame to a Parquet file
            fine_tile_trx.to_pandas().to_parquet(filename)


    # Load transformation matrix
    transformation_matrix = np.loadtxt(path_transformation_matrix)

    # Load the transcript data based on the technology using Polars
    if technology == "MERSCOPE":
        trx_ini = pl.read_csv(path_trx, columns=["gene", "global_x", "global_y"])
        trx_ini = trx_ini.with_columns([
            pl.col("global_x").alias("x"),
            pl.col("global_y").alias("y"),
            pl.col("gene").alias("name")
        ]).select(["name", "x", "y"])

    elif technology == "Xenium":
        trx_ini = pl.read_parquet(path_trx).select([
            pl.col("feature_name").alias("name"),
            pl.col("x_location").alias("x"),
            pl.col("y_location").alias("y")
        ])

    # Process the data in chunks and apply transformations
    all_chunks = []

    for start_row in tqdm(range(0, trx_ini.height, chunk_size), desc="Processing chunks"):
        chunk = trx_ini.slice(start_row, chunk_size)

        # Apply transformation matrix to the coordinates
        points = np.hstack([chunk.select(["x", "y"]).to_numpy(), np.ones((chunk.height, 1))])
        transformed_points = np.dot(points, transformation_matrix.T)[:, :2]

        # Create new transformed columns and drop original x, y columns
        transformed_chunk = chunk.with_columns([
            (pl.Series(transformed_points[:, 0]) * image_scale).round(2).alias("transformed_x"),
            (pl.Series(transformed_points[:, 1]) * image_scale).round(2).alias("transformed_y")
        ]).drop(["x", "y"])
        all_chunks.append(transformed_chunk)

    # Concatenate all chunks after processing
    trx = pl.concat(all_chunks)

    # Ensure the output directory exists
    if not os.path.exists(path_trx_tiles):
        os.makedirs(path_trx_tiles)

    # Get min and max x, y values
    x_min, x_max = trx.select([
        pl.col("transformed_x").min().alias("x_min"),
        pl.col("transformed_x").max().alias("x_max")
    ]).row(0)

    y_min, y_max = trx.select([
        pl.col("transformed_y").min().alias("y_min"),
        pl.col("transformed_y").max().alias("y_max")
    ]).row(0)

    # Calculate the number of fine-grain tiles globally
    n_fine_tiles_x = int(np.ceil((x_max - x_min) / tile_size))
    n_fine_tiles_y = int(np.ceil((y_max - y_min) / tile_size))

    # Calculate the number of coarse-grain tiles
    n_coarse_tiles_x = int(np.ceil((x_max - x_min) / (coarse_tile_factor * tile_size)))
    n_coarse_tiles_y = int(np.ceil((y_max - y_min) / (coarse_tile_factor * tile_size)))

    # Use ThreadPoolExecutor for parallel processing of coarse-grain tiles
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for i in range(n_coarse_tiles_x):
            coarse_tile_x_min = x_min + i * (coarse_tile_factor * tile_size)
            coarse_tile_x_max = coarse_tile_x_min + (coarse_tile_factor * tile_size)

            for j in range(n_coarse_tiles_y):
                coarse_tile_y_min = y_min + j * (coarse_tile_factor * tile_size)
                coarse_tile_y_max = coarse_tile_y_min + (coarse_tile_factor * tile_size)

                # Submit each coarse tile for parallel processing
                futures.append(executor.submit(
                    process_coarse_tile, trx, i, j, coarse_tile_x_min, coarse_tile_x_max, coarse_tile_y_min, coarse_tile_y_max, tile_size, path_trx_tiles, x_min, y_min, n_fine_tiles_x, n_fine_tiles_y, max_workers
                ))

        # Wait for all coarse tiles to complete
        for future in tqdm(concurrent.futures.as_completed(futures), desc="Processing coarse tiles", unit="tile"):
            future.result()  # Raise exceptions if any occurred during execution

    # Return the tile bounds
    tile_bounds = {
        "x_min": x_min,
        "x_max": x_max,
        "y_min": y_min,
        "y_max": y_max,
    }

    return tile_bounds
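
A sketch of a Xenium call (paths hypothetical); the returned bounds can be passed straight to make_cell_boundary_tiles:

tile_bounds = make_trx_tiles(
    technology="Xenium",
    path_trx="data/transcripts.parquet",
    path_transformation_matrix="data/transformation_matrix.txt",
    path_trx_tiles="landscape_files/transcript_tiles",
    tile_size=250,
    image_scale=1,
)
print(tile_bounds)  # {'x_min': ..., 'x_max': ..., 'y_min': ..., 'y_max': ...}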

read_cbg_mtx(base_path)

Read the cell-by-gene matrix from the mtx files.

Parameters

- base_path (str): The base path to the directory containing the mtx files.

Returns

- cbg (pandas.DataFrame): A sparse DataFrame with genes as columns and barcodes as rows.

Source code in src/celldega/pre/landscape.py
def read_cbg_mtx(base_path):
    """
    Read the cell-by-gene matrix from the mtx files.

    Parameters
    ----------
    base_path : str
        The base path to the directory containing the mtx files.

    Returns
    -------
    cbg : pandas.DataFrame
        A sparse DataFrame with genes as columns and barcodes as rows.
    """
    print("Reading mtx file from ", base_path)

    # File paths
    barcodes_path = os.path.join(base_path, "barcodes.tsv.gz")
    features_path = os.path.join(base_path, "features.tsv.gz")
    matrix_path = os.path.join(base_path, "matrix.mtx.gz")

    # Read barcodes and features
    barcodes = pd.read_csv(barcodes_path, header=None, compression="gzip")
    features = pd.read_csv(features_path, header=None, compression="gzip", sep="\t")

    # Read the gene expression matrix and transpose it
    # Transpose and convert to CSC format for fast column slicing
    matrix = mmread(matrix_path).transpose().tocsc()

    # Create a sparse DataFrame with genes as columns and barcodes as rows
    cbg = pd.DataFrame.sparse.from_spmatrix(
        matrix, index=barcodes[0], columns=features[1]
    )
    cbg = cbg.rename_axis('__index_level_0__', axis='columns')

    return cbg
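
A usage sketch, assuming base_path contains the standard barcodes.tsv.gz, features.tsv.gz, and matrix.mtx.gz trio (the directory name is hypothetical):

cbg = read_cbg_mtx("data/cell_feature_matrix")
print(cbg.shape)           # (n_barcodes, n_genes)
print(cbg.dtypes.iloc[0])  # a SparseDtype column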

reduce_image_size(image_path, scale_image=0.5, path_landscape_files='')

Reduce the size of an image by resizing it with a scale factor and save the downsized copy to the landscape files directory.

Parameters

- image_path (str): Path to the image file.
- scale_image (float, default=0.5): Scale factor for the image resize.
- path_landscape_files (str, default=""): Path to the directory where the resized image will be saved.

Returns

- new_image_path (str): Path to the resized image file.

Source code in src/celldega/pre/__init__.py
def reduce_image_size(image_path, scale_image=0.5, path_landscape_files=""):
    """

    Parameters
    ----------
    image_path : str
        Path to the image file
    scale_image : float (default=0.5)
        Scale factor for the image resize

    Returns
    -------
    new_image_path : str
        Path to the resized image file
    """

    image = pyvips.Image.new_from_file(image_path, access="sequential")

    resized_image = image.resize(scale_image)

    new_image_name = image_path.split("/")[-1].replace(".tif", "_downsize.tif")
    new_image_path = f"{path_landscape_files}/{new_image_name}"
    resized_image.write_to_file(new_image_path)

    return new_image_path
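
A usage sketch (paths hypothetical); the downsized copy is written into path_landscape_files:

path_small = reduce_image_size(
    "data/dapi_image.tif", scale_image=0.5, path_landscape_files="landscape_files"
)
print(path_small)  # landscape_files/dapi_image_downsize.tif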

save_cbg_gene_parquets(base_path, cbg, verbose=False)

Save the cell-by-gene matrix as gene-specific Parquet files.

Parameters

- base_path (str): The base path to the parent directory containing the landscape_files directory.
- cbg (pandas.DataFrame): A sparse DataFrame with genes as columns and barcodes as rows.
- verbose (bool, optional, default=False): Whether to print progress information.

Returns

- None

Source code in src/celldega/pre/landscape.py
def save_cbg_gene_parquets(base_path, cbg, verbose=False):
    """
    Save the cell-by-gene matrix as gene-specific Parquet files.

    Parameters
    ----------
    base_path : str
        The base path to the parent directory containing the landscape_files directory.
    cbg : pandas.DataFrame
        A sparse DataFrame with genes as columns and barcodes as rows.
    verbose : bool, optional
        Whether to print progress information, by default False.

    Returns
    -------
    None
    """
    output_dir = os.path.join(base_path, "cbg")
    os.makedirs(output_dir, exist_ok=True)

    for index, gene in enumerate(cbg.columns):
        if verbose and index % 100 == 0:
            print(f"Processing gene {index}: {gene}")

        # Extract the column as a DataFrame as a copy
        col_df = cbg[[gene]].copy()

        # Convert to dense and integer type
        col_df = col_df.sparse.to_dense().astype(int)

        # Create a DataFrame necessary to prevent error in to_parquet
        inst_df = pd.DataFrame(
            col_df.values, columns=[gene], index=col_df.index.tolist()
        )

        # Replace 0 with NA and drop rows where all values are NA
        inst_df.replace(0, pd.NA, inplace=True)
        inst_df.dropna(how="all", inplace=True)

        # Save to Parquet if DataFrame is not empty
        if not inst_df.empty:
            output_path = os.path.join(output_dir, f"{gene}.parquet")
            inst_df.to_parquet(output_path)
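
This pairs naturally with read_cbg_mtx; a usage sketch with a hypothetical base path:

cbg = read_cbg_mtx("data/cell_feature_matrix")
save_cbg_gene_parquets("landscape_files", cbg, verbose=True)
# writes landscape_files/cbg/<gene>.parquet, one file per gene with non-zero counts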

save_landscape_parameters(technology, path_landscape_files, image_name='dapi_files', tile_size=1000, image_info={}, image_format='.webp')

Save the landscape parameters to a JSON file.

Source code in src/celldega/pre/__init__.py
def save_landscape_parameters(
    technology, path_landscape_files, image_name="dapi_files", tile_size=1000, image_info={}, image_format='.webp'
):
    """
    Save the landscape parameters to a JSON file.
    """

    path_image_pyramid = f"{path_landscape_files}/pyramid_images/{image_name}"

    print(path_image_pyramid)

    max_pyramid_zoom = get_max_zoom_level(path_image_pyramid)

    landscape_parameters = {
        "technology": technology,
        "max_pyramid_zoom": max_pyramid_zoom,
        "tile_size": tile_size,
        "image_info": image_info,
        "image_format": image_format
    }

    path_landscape_parameters = f"{path_landscape_files}/landscape_parameters.json"

    with open(path_landscape_parameters, "w") as file:
        json.dump(landscape_parameters, file, indent=4)
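
A closing usage sketch; the image_info contents are a hypothetical placeholder:

save_landscape_parameters(
    technology="Xenium",
    path_landscape_files="landscape_files",
    image_name="dapi_files",
    tile_size=1000,
    image_info={"channel": "DAPI"},  # hypothetical metadata
    image_format=".webp",
)
# writes landscape_files/landscape_parameters.json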