Skip to content

Neighborhood Module API Reference

Module for performing neighborhood analysis.

NBHD

A class representing neighborhoods with associated derived data matrices.

Source code in src/celldega/nbhd/neighborhoods.py
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
class NBHD:
    """A class representing neighborhoods with associated derived data matrices."""

    def __init__(
        self,
        gdf: gpd.GeoDataFrame,
        nbhd_type: str,
        adata: AnnData,
        data_dir: str,
        path_landscape_files: str,
        source: str | dict[str, Any] | None = None,
        name: str | None = None,
        meta: dict[str, Any] | None = None,
    ) -> None:
        self.gdf = gdf.copy()
        self.nbhd_type = nbhd_type
        self.adata = adata
        self.data_dir = data_dir
        self.path_landscape_files = path_landscape_files
        self.source = source
        self.name = name
        self.meta = meta or {}

        self.derived: dict[str, Any] = {
            "NBI": None,
            "NBG-CF": None,
            "NBG-CD": None,
            "NBP": {},
            "NBN-O": None,
            "NBN-B": None,
        }

    def set_derived(self, key: str, subkey: str | None = None) -> None:
        """
        Set a derived data matrix.
        """
        if key == "NBG-CD":
            data = calc_nbhd_by_gene(self.gdf, by="cell", adata=self.adata)
        elif key == "NBG-CF":
            data = calc_nbhd_by_gene(self.gdf, by="cell-free", data_dir=self.data_dir)
        elif key == "NBP":
            data = {
                "pct": calc_nbhd_by_pop(
                    self.adata, self.gdf, category="leiden", output="percentage"
                )
            }
            data["abs"] = calc_nbhd_by_pop(self.adata, self.gdf, category="leiden", output="counts")
        elif key == "NBM":
            gdf_trx = _get_gdf_trx(self.data_dir)
            gdf_cell = _get_gdf_cell(self.adata)
            data = get_nbhd_meta(self.gdf, "name", gdf_trx, gdf_cell)
        elif key == "NBN-O":
            if self.nbhd_type == "ALPH":
                nb = self.gdf[["name", "geometry"]]
                print("Calculating neighborhood overlap")
                data = calc_nbhd_overlap(nb)
            else:
                raise ValueError("NBN-O can be derived for ALPH only")
        elif key == "NBN-B":
            if self.nbhd_type == "ALPH":
                raise ValueError("NBN-B can not be derived for nbhd having overlap")
            nb = self.gdf[["name", "geometry"]]
            print("Calculating neighborhood bordering")
            data = calc_nbhd_bordering(nb)
        elif key == "NBI":
            data = calc_nbhd_by_image(
                f"{self.data_dir}/morphology_focus/morphology_focus_0000.ome.tif",
                self.path_landscape_files,
                self.gdf,
            )
        else:
            raise ValueError(f"Unknown derived key: {key}")

        if key == "NBP":
            for subkey in data:
                self.derived[key][subkey] = data[subkey]
        else:
            self.derived[key] = data

        print(f"{key} is derived and attached to nbhd")

    def _add_geo(self, df: pd.DataFrame) -> pd.DataFrame:
        return (
            self.gdf[["name", "geometry"]]
            .set_index("name")
            .join(df, how="left")
            .fillna(0)
            .reset_index()
            .rename(columns={"name": "nbhd_id"})
        )

    def get_derived(self, key: str, subkey: str | None = None) -> pd.DataFrame:
        if key == "NBP":
            df = self.derived[key].get(subkey)
            return self._add_geo(df)
        df = self.derived.get(key)
        return self._add_geo(df)

    def to_geodataframe(self) -> gpd.GeoDataFrame:
        return self.gdf

    def summary(self) -> dict[str, Any]:
        return {
            "name": self.name,
            "type": self.nbhd_type,
            "n_regions": len(self.gdf),
            "derived": {k: self._derived_summary(k) for k in self.derived},
            "meta": self.meta,
        }

    def _derived_summary(self, key: str) -> tuple | dict[str, tuple] | None:
        val = self.derived.get(key)
        if val is None:
            return None
        if key == "NBP":
            subkeys = ["abs", "pct"]
            summary = {}
            for subkey in subkeys:
                subval = val.get(subkey)
                summary[subkey] = subval.shape if hasattr(subval, "shape") else None
            return summary
        return val.shape if hasattr(val, "shape") else None

set_derived(key, subkey=None)

Set a derived data matrix.

Source code in src/celldega/nbhd/neighborhoods.py
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
def set_derived(self, key: str, subkey: str | None = None) -> None:
    """
    Set a derived data matrix.
    """
    if key == "NBG-CD":
        data = calc_nbhd_by_gene(self.gdf, by="cell", adata=self.adata)
    elif key == "NBG-CF":
        data = calc_nbhd_by_gene(self.gdf, by="cell-free", data_dir=self.data_dir)
    elif key == "NBP":
        data = {
            "pct": calc_nbhd_by_pop(
                self.adata, self.gdf, category="leiden", output="percentage"
            )
        }
        data["abs"] = calc_nbhd_by_pop(self.adata, self.gdf, category="leiden", output="counts")
    elif key == "NBM":
        gdf_trx = _get_gdf_trx(self.data_dir)
        gdf_cell = _get_gdf_cell(self.adata)
        data = get_nbhd_meta(self.gdf, "name", gdf_trx, gdf_cell)
    elif key == "NBN-O":
        if self.nbhd_type == "ALPH":
            nb = self.gdf[["name", "geometry"]]
            print("Calculating neighborhood overlap")
            data = calc_nbhd_overlap(nb)
        else:
            raise ValueError("NBN-O can be derived for ALPH only")
    elif key == "NBN-B":
        if self.nbhd_type == "ALPH":
            raise ValueError("NBN-B can not be derived for nbhd having overlap")
        nb = self.gdf[["name", "geometry"]]
        print("Calculating neighborhood bordering")
        data = calc_nbhd_bordering(nb)
    elif key == "NBI":
        data = calc_nbhd_by_image(
            f"{self.data_dir}/morphology_focus/morphology_focus_0000.ome.tif",
            self.path_landscape_files,
            self.gdf,
        )
    else:
        raise ValueError(f"Unknown derived key: {key}")

    if key == "NBP":
        for subkey in data:
            self.derived[key][subkey] = data[subkey]
    else:
        self.derived[key] = data

    print(f"{key} is derived and attached to nbhd")

alpha_shape_cell_clusters(adata, cat='cluster', alphas=(100, 150, 200, 250, 300, 350), meta_cluster=None)

Compute alpha shapes for each cluster in the cell metadata.

Parameters

adata : AnnData AnnData object with cell metadata in obs and spatial coordinates in obsm["spatial"]. cat : str Column name in adata.obs containing cluster/category labels. alphas : Sequence[float] List of inverse alpha values to compute shapes for. meta_cluster : pd.DataFrame | None Optional DataFrame with cluster metadata including 'color' column. If not provided, colors will be extracted from adata.uns[f'{cat}_colors'] if available, otherwise defaults to black.

Returns

gpd.GeoDataFrame GeoDataFrame with alpha shapes for each cluster at each alpha value.

Source code in src/celldega/nbhd/alpha_shapes.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def alpha_shape_cell_clusters(
    adata: ad.AnnData,
    cat: str = "cluster",
    alphas: Sequence[float] = (100, 150, 200, 250, 300, 350),
    meta_cluster: pd.DataFrame | None = None,
) -> gpd.GeoDataFrame:
    """
    Compute alpha shapes for each cluster in the cell metadata.

    Parameters
    ----------
    adata : AnnData
        AnnData object with cell metadata in obs and spatial coordinates in obsm["spatial"].
    cat : str
        Column name in adata.obs containing cluster/category labels.
    alphas : Sequence[float]
        List of inverse alpha values to compute shapes for.
    meta_cluster : pd.DataFrame | None
        Optional DataFrame with cluster metadata including 'color' column.
        If not provided, colors will be extracted from adata.uns[f'{cat}_colors']
        if available, otherwise defaults to black.

    Returns
    -------
    gpd.GeoDataFrame
        GeoDataFrame with alpha shapes for each cluster at each alpha value.
    """

    meta_cell = adata.obs

    coords = adata.obsm["spatial"]
    meta_cell["geometry"] = list(coords)

    # Build color lookup from adata.uns if meta_cluster not provided
    adata_color_dict: dict[str, str] = {}
    if meta_cluster is None:
        color_key = f"{cat}_colors"
        if color_key in adata.uns:
            # Get categories and their corresponding colors
            categories = (
                adata.obs[cat].cat.categories
                if hasattr(adata.obs[cat], "cat")
                else adata.obs[cat].unique()
            )
            colors = adata.uns[color_key]
            # Map categories to colors (colors are in same order as categories)
            for i, category in enumerate(categories):
                if i < len(colors):
                    adata_color_dict[str(category)] = colors[i]

    gdf_alpha = gpd.GeoDataFrame()

    for inv_alpha in alphas:
        for inst_cluster in meta_cell[cat].unique():
            inst_clust = meta_cell[meta_cell[cat] == inst_cluster]
            if inst_clust.shape[0] > 3:
                nested_array = inst_clust["geometry"].values
                flat_array = np.vstack(nested_array)
                inst_shape = alpha_shape(flat_array, inv_alpha)

                inst_name = f"{inst_cluster}_{inv_alpha}"

                gdf_alpha.loc[inst_name, "name"] = inst_name
                gdf_alpha.loc[inst_name, "cat"] = inst_cluster
                gdf_alpha.loc[inst_name, "geometry"] = inst_shape
                gdf_alpha.loc[inst_name, "inv_alpha"] = int(inv_alpha)

                # Look up color: meta_cluster > adata.uns colors > default black
                inst_cluster_str = str(inst_cluster)
                if meta_cluster is not None and inst_cluster in meta_cluster.index:
                    gdf_alpha.loc[inst_name, "color"] = meta_cluster.loc[inst_cluster, "color"]
                elif inst_cluster_str in adata_color_dict:
                    gdf_alpha.loc[inst_name, "color"] = adata_color_dict[inst_cluster_str]
                else:
                    gdf_alpha.loc[inst_name, "color"] = "#000000"

    gdf_alpha["geometry"] = gdf_alpha["geometry"].apply(
        lambda geom: _round_coordinates(geom, precision=2)
    )
    gdf_alpha["area"] = gdf_alpha.area

    return gdf_alpha.loc[gdf_alpha.area.sort_values(ascending=False).index.tolist()]

calc_grad_nbhd_from_roi(polygon, gdf_reference, band_width=300)

Generate concentric rings (neighborhood bands) from a polygon, clipped to the convex hull of a reference GeoDataFrame.

Parameters

polygon : GeoDataFrame GeoDataFrame containing a single polygon. gdf_reference : GeoDataFrame Reference GeoDataFrame used to calculate the boundary area (convex hull). band_width : float Width of each band in microns (default: 300).

Returns

GeoDataFrame GeoDataFrame with columns for band (index of ring) and geometry (polygon).

Source code in src/celldega/nbhd/gradient.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def calc_grad_nbhd_from_roi(
    polygon: gpd.GeoDataFrame,
    gdf_reference: gpd.GeoDataFrame,
    band_width: float = 300,
) -> gpd.GeoDataFrame:
    """
    Generate concentric rings (neighborhood bands) from a polygon,
    clipped to the convex hull of a reference GeoDataFrame.

    Parameters
    ----------
    polygon : GeoDataFrame
        GeoDataFrame containing a single polygon.
    gdf_reference : GeoDataFrame
        Reference GeoDataFrame used to calculate the boundary area (convex hull).
    band_width : float
        Width of each band in microns (default: 300).

    Returns
    -------
    GeoDataFrame
        GeoDataFrame with columns for band (index of ring) and geometry (polygon).
    """
    if len(polygon) != 1:
        raise ValueError("Input polygon GeoDataFrame must contain exactly one polygon.")

    roi_polygon = polygon.geometry.iloc[0]
    boundary = gdf_reference.unary_union.convex_hull

    bands = []
    current_polygon = roi_polygon
    band_idx = 0

    # Add the original polygon as band 0
    bands.append({"band": f"grad_{band_idx}", "geometry": roi_polygon})

    while True:
        band_idx += 1
        # Generate next ring
        next_buffer = current_polygon.buffer(band_width)
        ring = next_buffer.difference(current_polygon)

        # Clip the ring to the convex hull boundary
        ring_clipped = ring.intersection(boundary)

        # Stop if no part of the ring remains within boundary
        if ring_clipped.is_empty:
            break

        bands.append({"band": f"grad_{band_idx}", "geometry": ring_clipped})
        current_polygon = next_buffer

    gdf = gpd.GeoDataFrame(bands, crs=polygon.crs)
    gdf["band_width"] = band_width

    return gdf

calc_nbhd_bordering(gdf_nbhd, metric='border_ratio', name_col='name', category='leiden')

Calculate pairwise border relationships between neighborhoods as a neighborhood-by-neighborhood matrix.

Parameters

gdf_nbhd : gpd.GeoDataFrame GeoDataFrame containing neighborhood geometries. Must have a geometry column and a column specified by name_col for neighborhood identifiers. metric : str, default "border_ratio" The border metric to compute: - "border_ratio": Border length over self (row) perimeter. Value = shared_border_length / row_perimeter. Asymmetric measure showing what fraction of the row neighborhood's perimeter is shared with the column neighborhood. - "border_length": Raw shared border length in linear units. Symmetric measure of the absolute length of shared boundary. - "binary": Binary adjacency (1 if touching, 0 otherwise). Symmetric measure indicating whether neighborhoods share a border. name_col : str, default "name" Column name containing neighborhood identifiers. category : str, default "leiden" Name of the category that neighborhoods represent (e.g., "leiden", "cell_type"). This is used to name the category column in obs/var and the colors in uns.

Returns

AnnData AnnData object with shape (n_neighborhoods, n_neighborhoods) where: - X: Matrix of border metric values - obs: DataFrame indexed by neighborhood names (rows) - var: DataFrame indexed by neighborhood names (columns) - obs["perimeter"]: Perimeter of each neighborhood - obs[category]: Category value for each neighborhood - uns["metric"]: The metric used for computation

Notes

Shared border length is computed as the length of the intersection of the two neighborhood boundaries (perimeters). This works for neighborhoods that touch but don't overlap. For overlapping neighborhoods, consider using calc_nbhd_overlap instead.

Examples

adata_border = dega.nbhd.calc_nbhd_bordering(gdf_nbhd, metric="border_ratio") adata_adj = dega.nbhd.calc_nbhd_bordering(gdf_nbhd, metric="binary") mat = dega.clust.Matrix(adata_border, row_entity="nbhd", col_entity="nbhd")

Source code in src/celldega/nbhd/neighborhoods.py
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
def calc_nbhd_bordering(
    gdf_nbhd: gpd.GeoDataFrame,
    metric: str = "border_ratio",
    name_col: str = "name",
    category: str = "leiden",
) -> AnnData:
    """
    Calculate pairwise border relationships between neighborhoods as a neighborhood-by-neighborhood matrix.

    Parameters
    ----------
    gdf_nbhd : gpd.GeoDataFrame
        GeoDataFrame containing neighborhood geometries. Must have a geometry column
        and a column specified by `name_col` for neighborhood identifiers.
    metric : str, default "border_ratio"
        The border metric to compute:
        - "border_ratio": Border length over self (row) perimeter.
          Value = shared_border_length / row_perimeter.
          Asymmetric measure showing what fraction of the row neighborhood's
          perimeter is shared with the column neighborhood.
        - "border_length": Raw shared border length in linear units.
          Symmetric measure of the absolute length of shared boundary.
        - "binary": Binary adjacency (1 if touching, 0 otherwise).
          Symmetric measure indicating whether neighborhoods share a border.
    name_col : str, default "name"
        Column name containing neighborhood identifiers.
    category : str, default "leiden"
        Name of the category that neighborhoods represent (e.g., "leiden", "cell_type").
        This is used to name the category column in obs/var and the colors in uns.

    Returns
    -------
    AnnData
        AnnData object with shape (n_neighborhoods, n_neighborhoods) where:
        - `X`: Matrix of border metric values
        - `obs`: DataFrame indexed by neighborhood names (rows)
        - `var`: DataFrame indexed by neighborhood names (columns)
        - `obs["perimeter"]`: Perimeter of each neighborhood
        - `obs[category]`: Category value for each neighborhood
        - `uns["metric"]`: The metric used for computation

    Notes
    -----
    Shared border length is computed as the length of the intersection of the
    two neighborhood boundaries (perimeters). This works for neighborhoods that
    touch but don't overlap. For overlapping neighborhoods, consider using
    `calc_nbhd_overlap` instead.

    Examples
    --------
    >>> adata_border = dega.nbhd.calc_nbhd_bordering(gdf_nbhd, metric="border_ratio")
    >>> adata_adj = dega.nbhd.calc_nbhd_bordering(gdf_nbhd, metric="binary")
    >>> mat = dega.clust.Matrix(adata_border, row_entity="nbhd", col_entity="nbhd")
    """
    print(f"Calculating NBN-B ({metric})")

    valid_metrics = {"border_ratio", "border_length", "binary"}
    if metric not in valid_metrics:
        raise ValueError(f"metric must be one of {valid_metrics}, got '{metric}'")

    gdf_nbhd = gdf_nbhd.copy()
    gdf_nbhd["geometry"] = gdf_nbhd["geometry"].buffer(0)

    names = gdf_nbhd[name_col].tolist()

    # Pre-compute perimeters for efficiency
    perimeters = {row[name_col]: row["geometry"].length for _, row in gdf_nbhd.iterrows()}

    # Build a lookup for geometries
    geom_lookup = {row[name_col]: row["geometry"] for _, row in gdf_nbhd.iterrows()}

    # Initialize matrix with zeros
    matrix = pd.DataFrame(0.0, index=names, columns=names)

    # Use spatial index to find touching pairs efficiently
    gdf_touches = gpd.sjoin(gdf_nbhd, gdf_nbhd, how="inner", predicate="touches")
    gdf_touches = gdf_touches[gdf_touches[f"{name_col}_left"] != gdf_touches[f"{name_col}_right"]]

    # Get unique pairs
    seen_pairs: set[tuple[str, str]] = set()
    for _, row in gdf_touches.iterrows():
        nb1 = row[f"{name_col}_left"]
        nb2 = row[f"{name_col}_right"]

        # Skip if we've already processed this pair
        pair_key = tuple(sorted((nb1, nb2)))
        if pair_key in seen_pairs:
            continue
        seen_pairs.add(pair_key)

        geom1 = geom_lookup[nb1]
        geom2 = geom_lookup[nb2]

        # Compute shared border length as intersection of boundaries
        boundary1 = geom1.boundary
        boundary2 = geom2.boundary
        shared_border = boundary1.intersection(boundary2)
        border_length = shared_border.length if not shared_border.is_empty else 0.0

        if metric == "binary":
            matrix.loc[nb1, nb2] = 1.0
            matrix.loc[nb2, nb1] = 1.0
        elif metric == "border_length":
            matrix.loc[nb1, nb2] = round(border_length, 2)
            matrix.loc[nb2, nb1] = round(border_length, 2)  # Symmetric
        elif metric == "border_ratio":
            # Asymmetric: what fraction of each neighborhood's perimeter is shared
            perim1 = perimeters[nb1]
            perim2 = perimeters[nb2]
            ratio_1 = border_length / perim1 if perim1 > 0 else 0.0
            ratio_2 = border_length / perim2 if perim2 > 0 else 0.0
            matrix.loc[nb1, nb2] = round(ratio_1, 4)
            matrix.loc[nb2, nb1] = round(ratio_2, 4)

    # Build AnnData
    adata_nbn = AnnData(
        X=matrix.values,
        obs=pd.DataFrame(index=matrix.index),
        var=pd.DataFrame(index=matrix.columns),
    )
    adata_nbn.obs["perimeter"] = [perimeters[n] for n in matrix.index]
    adata_nbn.uns["metric"] = metric
    adata_nbn.uns["category"] = category

    # Add category and color metadata from gdf_nbhd if available
    nbhd_lookup = gdf_nbhd.set_index(name_col)

    if "cat" in gdf_nbhd.columns:
        # Use the category parameter name (e.g., "leiden") instead of "cat"
        adata_nbn.obs[category] = [
            str(nbhd_lookup.loc[n, "cat"]) if n in nbhd_lookup.index else str(n)
            for n in matrix.index
        ]
        adata_nbn.var[category] = [
            str(nbhd_lookup.loc[n, "cat"]) if n in nbhd_lookup.index else str(n)
            for n in matrix.columns
        ]

    if "color" in gdf_nbhd.columns:
        obs_colors = [
            nbhd_lookup.loc[n, "color"] if n in nbhd_lookup.index else "#808080"
            for n in matrix.index
        ]
        adata_nbn.obs["color"] = obs_colors
        adata_nbn.var["color"] = [
            nbhd_lookup.loc[n, "color"] if n in nbhd_lookup.index else "#808080"
            for n in matrix.columns
        ]
        # Store colors in uns using the category name (e.g., "leiden_colors")
        if "cat" in gdf_nbhd.columns:
            unique_cats = adata_nbn.obs[category].unique()
            cat_color_map = dict(zip(adata_nbn.obs[category], obs_colors, strict=False))
            adata_nbn.uns[f"{category}_colors"] = [
                cat_color_map.get(c, "#808080") for c in unique_cats
            ]

    return adata_nbn

calc_nbhd_by_gene(gdf_nbhd, by='cell', adata=None, data_dir=None, nbhd_col='name', min_cells=1)

Calculate neighborhood-by-gene expression matrix.

Computes gene expression values for each neighborhood, either from cell-level expression data (mean expression of cells within each neighborhood) or from raw transcript counts (cell-free mode).

Parameters

gdf_nbhd : gpd.GeoDataFrame GeoDataFrame containing neighborhood geometries. Must have a geometry column and a column specified by nbhd_col for neighborhood identifiers. by : str, default "cell" Method for calculating gene expression: - "cell": Mean expression of cells within each neighborhood (requires adata) - "cell-free": Transcript counts within each neighborhood (requires data_dir) adata : AnnData, optional AnnData object with cell data. Required when by="cell". Must have spatial coordinates in obsm["spatial"]. data_dir : str, optional Path to directory containing transcripts.parquet. Required when by="cell-free". nbhd_col : str, default "name" Column in gdf_nbhd containing neighborhood identifiers. min_cells : int, default 1 Minimum number of cells/transcripts required within a neighborhood to include it in the output. Only applies when by="cell".

Returns

AnnData AnnData object with shape (n_neighborhoods, n_genes) where: - X: Matrix of gene expression values (mean for cell-derived, counts for cell-free) - obs: DataFrame indexed by neighborhood names - var: DataFrame indexed by gene names - obs["n_cells"]: Cell count per neighborhood (when by="cell") - uns["by"]: Method used ("cell" or "cell-free")

Examples

Cell-derived gene expression per neighborhood

adata_nbg = dega.nbhd.calc_nbhd_by_gene(gdf_alpha, by="cell", adata=adata)

Cell-free transcript counts per neighborhood

adata_nbg = dega.nbhd.calc_nbhd_by_gene(gdf_alpha, by="cell-free", data_dir="./data")

For cluster-specific analysis, pre-filter the AnnData

adata_cluster0 = adata[adata.obs["leiden"] == "0"] adata_nbg = dega.nbhd.calc_nbhd_by_gene(gdf_alpha, by="cell", adata=adata_cluster0)

Notes

For cluster-specific gene expression analysis, filter your AnnData object to include only cells from the desired cluster before calling this function.

Source code in src/celldega/nbhd/neighborhoods.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def calc_nbhd_by_gene(
    gdf_nbhd: gpd.GeoDataFrame,
    by: str = "cell",
    adata: AnnData | None = None,
    data_dir: str | None = None,
    nbhd_col: str = "name",
    min_cells: int = 1,
) -> AnnData:
    """
    Calculate neighborhood-by-gene expression matrix.

    Computes gene expression values for each neighborhood, either from cell-level
    expression data (mean expression of cells within each neighborhood) or from
    raw transcript counts (cell-free mode).

    Parameters
    ----------
    gdf_nbhd : gpd.GeoDataFrame
        GeoDataFrame containing neighborhood geometries. Must have a geometry column
        and a column specified by `nbhd_col` for neighborhood identifiers.
    by : str, default "cell"
        Method for calculating gene expression:
        - "cell": Mean expression of cells within each neighborhood (requires `adata`)
        - "cell-free": Transcript counts within each neighborhood (requires `data_dir`)
    adata : AnnData, optional
        AnnData object with cell data. Required when `by="cell"`. Must have spatial
        coordinates in `obsm["spatial"]`.
    data_dir : str, optional
        Path to directory containing `transcripts.parquet`. Required when
        `by="cell-free"`.
    nbhd_col : str, default "name"
        Column in `gdf_nbhd` containing neighborhood identifiers.
    min_cells : int, default 1
        Minimum number of cells/transcripts required within a neighborhood to
        include it in the output. Only applies when `by="cell"`.

    Returns
    -------
    AnnData
        AnnData object with shape (n_neighborhoods, n_genes) where:
        - `X`: Matrix of gene expression values (mean for cell-derived, counts for cell-free)
        - `obs`: DataFrame indexed by neighborhood names
        - `var`: DataFrame indexed by gene names
        - `obs["n_cells"]`: Cell count per neighborhood (when `by="cell"`)
        - `uns["by"]`: Method used ("cell" or "cell-free")

    Examples
    --------
    >>> # Cell-derived gene expression per neighborhood
    >>> adata_nbg = dega.nbhd.calc_nbhd_by_gene(gdf_alpha, by="cell", adata=adata)
    >>>
    >>> # Cell-free transcript counts per neighborhood
    >>> adata_nbg = dega.nbhd.calc_nbhd_by_gene(gdf_alpha, by="cell-free", data_dir="./data")
    >>>
    >>> # For cluster-specific analysis, pre-filter the AnnData
    >>> adata_cluster0 = adata[adata.obs["leiden"] == "0"]
    >>> adata_nbg = dega.nbhd.calc_nbhd_by_gene(gdf_alpha, by="cell", adata=adata_cluster0)

    Notes
    -----
    For cluster-specific gene expression analysis, filter your AnnData object
    to include only cells from the desired cluster before calling this function.
    """
    if by == "cell":
        if adata is None:
            raise ValueError("adata is required when by='cell'")

        print("Calculating neighborhood-by-gene (cell-derived)")

        gene_list = adata.var.index
        gene_exp = pd.DataFrame(
            adata.X.toarray() if hasattr(adata.X, "toarray") else adata.X,
            columns=gene_list,
            index=adata.obs_names,
        )

        gdf_cell = gpd.GeoDataFrame(
            data=gene_exp,
            geometry=gpd.points_from_xy(*adata.obsm["spatial"].T[:2]),
        )

        # Spatial join cells to neighborhoods
        joined = gdf_cell.sjoin(
            gdf_nbhd[[nbhd_col, "geometry"]],
            how="left",
            predicate="within",
        )
        joined.drop(columns=["index_right", "geometry"], inplace=True, errors="ignore")

        # Count cells per neighborhood for filtering
        cell_counts = joined[nbhd_col].value_counts()
        valid_nbhds = cell_counts[cell_counts >= min_cells].index
        joined = joined[joined[nbhd_col].isin(valid_nbhds)]

        # Compute mean expression per neighborhood
        df_result = joined.groupby(nbhd_col)[list(gene_list)].mean()

        # Filter gdf_nbhd to only include valid neighborhoods
        filtered_gdf = gdf_nbhd[gdf_nbhd[nbhd_col].isin(valid_nbhds)].reset_index(drop=True)

        # Reindex to preserve order
        df_result = df_result.reindex(filtered_gdf[nbhd_col]).fillna(0)

        # Build AnnData
        adata_nbg = AnnData(
            X=df_result.values,
            obs=pd.DataFrame(index=df_result.index),
            var=pd.DataFrame(index=df_result.columns),
        )

        # Add cell counts
        adata_nbg.obs["n_cells"] = [cell_counts.get(n, 0) for n in adata_nbg.obs.index]

    elif by == "cell-free":
        if data_dir is None:
            raise ValueError("data_dir is required when by='cell-free'")

        print("Calculating neighborhood-by-gene (cell-free)")

        df_trx = pd.read_parquet(
            f"{data_dir}/transcripts.parquet",
            columns=["feature_name", "x_location", "y_location"],
            engine="pyarrow",
        )
        geometry = gpd.points_from_xy(df_trx["x_location"], df_trx["y_location"])
        gdf_trx = gpd.GeoDataFrame(df_trx[["feature_name"]], geometry=geometry)
        gdf_trx = gdf_trx.sjoin(gdf_nbhd[[nbhd_col, "geometry"]], how="left", predicate="within")

        df_result = (
            gdf_trx.groupby([nbhd_col, "feature_name"])
            .size()
            .unstack(fill_value=0)
            .rename_axis(None, axis=1)
            .reindex(gdf_nbhd[nbhd_col])
            .fillna(0)
            .astype(int)
        )

        # Filter by min_cells (here it's min transcripts total)
        trx_counts = df_result.sum(axis=1)
        valid_nbhds = trx_counts[trx_counts >= min_cells].index
        df_result = df_result.loc[valid_nbhds]

        filtered_gdf = gdf_nbhd[gdf_nbhd[nbhd_col].isin(valid_nbhds)].reset_index(drop=True)

        # Build AnnData
        adata_nbg = AnnData(
            X=df_result.values,
            obs=pd.DataFrame(index=df_result.index),
            var=pd.DataFrame(index=df_result.columns),
        )

        # Add transcript counts
        adata_nbg.obs["n_transcripts"] = trx_counts.loc[valid_nbhds].values

    else:
        raise ValueError("by must be 'cell' or 'cell-free'")

    # Store metadata common to both modes
    adata_nbg.uns["by"] = by

    # Add neighborhood category and colors from gdf_nbhd if available
    if "cat" in filtered_gdf.columns:
        nbhd_cat_lookup = dict(
            zip(filtered_gdf[nbhd_col], filtered_gdf["cat"].astype(str), strict=False)
        )
        adata_nbg.obs["cat"] = [nbhd_cat_lookup.get(n, str(n)) for n in adata_nbg.obs.index]

    if "color" in filtered_gdf.columns:
        nbhd_color_lookup = dict(zip(filtered_gdf[nbhd_col], filtered_gdf["color"], strict=False))
        adata_nbg.obs["color"] = [nbhd_color_lookup.get(n, "#808080") for n in adata_nbg.obs.index]
        # Store colors in uns as well
        adata_nbg.uns["nbhd_colors"] = [
            nbhd_color_lookup.get(n, "#808080") for n in adata_nbg.obs.index
        ]

    return adata_nbg

calc_nbhd_by_pop(adata, gdf_nbhd, category='leiden', nbhd_col='name', min_cells=5, output='percentage')

Calculate cell-level population distribution of neighborhoods.

Computes a neighborhood-by-population matrix showing the distribution of cell categories (e.g., clusters, cell types) within each neighborhood.

Parameters

adata : AnnData AnnData object containing cell data. Must have spatial coordinates in obsm["spatial"] and the category column in obs. gdf_nbhd : gpd.GeoDataFrame GeoDataFrame containing neighborhood geometries. Must have a geometry column and a column specified by nbhd_col for neighborhood identifiers. category : str, default "leiden" Column name in adata.obs containing cell category labels (e.g., "leiden", "cell_type", "cluster"). nbhd_col : str, default "name" Column name in gdf_nbhd containing neighborhood identifiers. min_cells : int, default 5 Minimum number of cells required within a neighborhood to include it in the output. Neighborhoods with fewer cells are filtered out. output : str, default "percentage" Type of values in the output matrix: - "percentage": Fraction of cells per category (sums to 1 per neighborhood) - "counts": Raw cell counts per category

Returns

AnnData AnnData object with shape (n_neighborhoods, n_categories) where: - X: Matrix of population distributions (percentages or counts) - obs: DataFrame indexed by neighborhood names - var: DataFrame indexed by category names - obs["n_cells"]: Total cell count per neighborhood

Examples

adata_nbp = dega.nbhd.calc_nbhd_by_pop(adata, gdf_alpha, category="leiden") adata_nbp.shape (42, 18) # 42 neighborhoods, 18 clusters

Source code in src/celldega/nbhd/neighborhoods.py
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
def calc_nbhd_by_pop(
    adata: AnnData,
    gdf_nbhd: gpd.GeoDataFrame,
    category: str = "leiden",
    nbhd_col: str = "name",
    min_cells: int = 5,
    output: str = "percentage",
) -> AnnData:
    """
    Calculate cell-level population distribution of neighborhoods.

    Computes a neighborhood-by-population matrix showing the distribution of cell
    categories (e.g., clusters, cell types) within each neighborhood.

    Parameters
    ----------
    adata : AnnData
        AnnData object containing cell data. Must have spatial coordinates in
        `obsm["spatial"]` and the category column in `obs`.
    gdf_nbhd : gpd.GeoDataFrame
        GeoDataFrame containing neighborhood geometries. Must have a geometry column
        and a column specified by `nbhd_col` for neighborhood identifiers.
    category : str, default "leiden"
        Column name in `adata.obs` containing cell category labels (e.g., "leiden",
        "cell_type", "cluster").
    nbhd_col : str, default "name"
        Column name in `gdf_nbhd` containing neighborhood identifiers.
    min_cells : int, default 5
        Minimum number of cells required within a neighborhood to include it in
        the output. Neighborhoods with fewer cells are filtered out.
    output : str, default "percentage"
        Type of values in the output matrix:
        - "percentage": Fraction of cells per category (sums to 1 per neighborhood)
        - "counts": Raw cell counts per category

    Returns
    -------
    AnnData
        AnnData object with shape (n_neighborhoods, n_categories) where:
        - `X`: Matrix of population distributions (percentages or counts)
        - `obs`: DataFrame indexed by neighborhood names
        - `var`: DataFrame indexed by category names
        - `obs["n_cells"]`: Total cell count per neighborhood

    Examples
    --------
    >>> adata_nbp = dega.nbhd.calc_nbhd_by_pop(adata, gdf_alpha, category="leiden")
    >>> adata_nbp.shape
    (42, 18)  # 42 neighborhoods, 18 clusters
    """
    print("Calculating NBP")

    # Validate inputs
    required_nbhd = {"geometry", nbhd_col}
    if not required_nbhd.issubset(gdf_nbhd.columns):
        raise ValueError(
            f"gdf_nbhd missing required columns: {required_nbhd - set(gdf_nbhd.columns)}"
        )
    if category not in adata.obs.columns:
        raise ValueError(f"adata.obs missing required '{category}' column")
    if "spatial" not in adata.obsm:
        raise ValueError("adata.obsm missing 'spatial' coordinates")

    # Build GeoDataFrame from adata with the specified category
    # No CRS set - using micron imaging coordinates, not geospatial
    gdf_cell = gpd.GeoDataFrame(
        {category: adata.obs[category].values},
        geometry=gpd.points_from_xy(*adata.obsm["spatial"].T[:2]),
    )

    # Spatial join: assign each cell to a neighborhood
    sjoin_df = gdf_cell.sjoin(gdf_nbhd[[nbhd_col, "geometry"]], how="left", predicate="within")

    # Filter neighborhoods with at least min_cells
    cell_counts_per_nbhd = sjoin_df[nbhd_col].value_counts()
    valid_nbhd_names = cell_counts_per_nbhd[cell_counts_per_nbhd >= min_cells].index
    sjoin_df = sjoin_df[sjoin_df[nbhd_col].isin(valid_nbhd_names)]

    # Count cells per (neighborhood, cluster)
    counts = (
        sjoin_df.groupby([nbhd_col, category])
        .size()
        .unstack(fill_value=0)
        .pipe(lambda df: df.set_axis(df.columns.astype(str), axis=1))
    )

    # Reindex to preserve order of filtered gdf_nbhd
    filtered_gdf_nbhd = gdf_nbhd[gdf_nbhd[nbhd_col].isin(valid_nbhd_names)].reset_index(drop=True)
    counts = counts.reindex(filtered_gdf_nbhd[nbhd_col]).fillna(0).astype(int)

    # Calculate output values
    if output == "percentage":
        values = counts.div(counts.sum(axis=1), axis=0).fillna(0).values
    else:
        values = counts.values

    # Build AnnData
    adata_nbp = AnnData(
        X=values,
        obs=pd.DataFrame(index=counts.index),
        var=pd.DataFrame(index=counts.columns),
    )
    adata_nbp.obs["n_cells"] = counts.sum(axis=1).values
    adata_nbp.uns["category"] = category

    # Add category as a var column (columns represent categories)
    adata_nbp.var[category] = adata_nbp.var.index.astype(str)

    # Also add category to obs - neighborhoods are named by their cluster
    # Look up category from gdf_nbhd if available
    if "cat" in filtered_gdf_nbhd.columns:
        nbhd_cat_lookup = dict(
            zip(filtered_gdf_nbhd[nbhd_col], filtered_gdf_nbhd["cat"].astype(str), strict=False)
        )
        adata_nbp.obs[category] = [nbhd_cat_lookup.get(n, str(n)) for n in adata_nbp.obs.index]
    else:
        # Default: use the index (neighborhood name) as the category
        adata_nbp.obs[category] = adata_nbp.obs.index.astype(str)

    # Copy colors from source adata if available
    color_key = f"{category}_colors"
    color_dict: dict[str, str] = {}
    if color_key in adata.uns:
        # Map colors to the category values
        src_colors = adata.uns[color_key]
        if hasattr(adata.obs[category], "cat"):
            src_categories = list(adata.obs[category].cat.categories.astype(str))
        else:
            src_categories = list(adata.obs[category].unique().astype(str))

        color_dict = {
            str(cat): src_colors[i] for i, cat in enumerate(src_categories) if i < len(src_colors)
        }

        # Assign colors to var (columns)
        adata_nbp.var["color"] = [color_dict.get(str(c), "#808080") for c in adata_nbp.var.index]
        adata_nbp.uns[color_key] = [color_dict.get(str(c), "#808080") for c in adata_nbp.var.index]

        # Also assign colors to obs (rows/neighborhoods)
        adata_nbp.obs["color"] = [
            color_dict.get(str(c), "#808080") for c in adata_nbp.obs[category]
        ]

    return adata_nbp

calc_nbhd_overlap(gdf_nbhd, metric='iou', name_col='name', category='leiden')

Calculate pairwise overlap between all neighborhoods as a neighborhood-by-neighborhood matrix.

Parameters

gdf_nbhd : gpd.GeoDataFrame GeoDataFrame containing neighborhood geometries. Must have a geometry column and a column specified by name_col for neighborhood identifiers. metric : str, default "iou" The overlap metric to compute: - "iou": Intersection over Union. Value = intersection_area / union_area. Symmetric measure ranging from 0 (no overlap) to 1 (identical geometries). - "ioa": Intersection over Area (of self/row). Value = intersection_area / row_area. Asymmetric measure showing what fraction of the row neighborhood overlaps with the column neighborhood. Useful for containment analysis. - "intersection": Raw intersection area in square units. name_col : str, default "name" Column name containing neighborhood identifiers. category : str, default "leiden" Name of the category that neighborhoods represent (e.g., "leiden", "cell_type"). This is used to name the category column in obs/var and the colors in uns.

Returns

AnnData AnnData object with shape (n_neighborhoods, n_neighborhoods) where: - X: Matrix of overlap values - obs: DataFrame indexed by neighborhood names (rows) - var: DataFrame indexed by neighborhood names (columns) - obs["area"]: Area of each neighborhood - obs[category]: Category value for each neighborhood - uns["metric"]: The metric used for computation

Examples

adata_iou = dega.nbhd.calc_nbhd_overlap(gdf_nbhd, metric="iou") adata_ioa = dega.nbhd.calc_nbhd_overlap(gdf_nbhd, metric="ioa") mat = dega.clust.Matrix(adata_iou, row_entity="nbhd", col_entity="nbhd")

Source code in src/celldega/nbhd/neighborhoods.py
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
def calc_nbhd_overlap(
    gdf_nbhd: gpd.GeoDataFrame,
    metric: str = "iou",
    name_col: str = "name",
    category: str = "leiden",
) -> AnnData:
    """
    Calculate pairwise overlap between all neighborhoods as a neighborhood-by-neighborhood matrix.

    Parameters
    ----------
    gdf_nbhd : gpd.GeoDataFrame
        GeoDataFrame containing neighborhood geometries. Must have a geometry column
        and a column specified by `name_col` for neighborhood identifiers.
    metric : str, default "iou"
        The overlap metric to compute:
        - "iou": Intersection over Union. Value = intersection_area / union_area.
          Symmetric measure ranging from 0 (no overlap) to 1 (identical geometries).
        - "ioa": Intersection over Area (of self/row). Value = intersection_area / row_area.
          Asymmetric measure showing what fraction of the row neighborhood overlaps
          with the column neighborhood. Useful for containment analysis.
        - "intersection": Raw intersection area in square units.
    name_col : str, default "name"
        Column name containing neighborhood identifiers.
    category : str, default "leiden"
        Name of the category that neighborhoods represent (e.g., "leiden", "cell_type").
        This is used to name the category column in obs/var and the colors in uns.

    Returns
    -------
    AnnData
        AnnData object with shape (n_neighborhoods, n_neighborhoods) where:
        - `X`: Matrix of overlap values
        - `obs`: DataFrame indexed by neighborhood names (rows)
        - `var`: DataFrame indexed by neighborhood names (columns)
        - `obs["area"]`: Area of each neighborhood
        - `obs[category]`: Category value for each neighborhood
        - `uns["metric"]`: The metric used for computation

    Examples
    --------
    >>> adata_iou = dega.nbhd.calc_nbhd_overlap(gdf_nbhd, metric="iou")
    >>> adata_ioa = dega.nbhd.calc_nbhd_overlap(gdf_nbhd, metric="ioa")
    >>> mat = dega.clust.Matrix(adata_iou, row_entity="nbhd", col_entity="nbhd")
    """
    print(f"Calculating NBN-O ({metric})")

    valid_metrics = {"iou", "ioa", "intersection"}
    if metric not in valid_metrics:
        raise ValueError(f"metric must be one of {valid_metrics}, got '{metric}'")

    gdf_nbhd = gdf_nbhd.copy()
    gdf_nbhd["geometry"] = gdf_nbhd["geometry"].buffer(0)

    names = gdf_nbhd[name_col].tolist()

    # Pre-compute areas for efficiency
    areas = {row[name_col]: row["geometry"].area for _, row in gdf_nbhd.iterrows()}

    # Initialize matrix with zeros
    matrix = pd.DataFrame(0.0, index=names, columns=names)

    # Set diagonal values
    for name in names:
        if metric in ("iou", "ioa"):
            matrix.loc[name, name] = 1.0
        else:  # intersection
            matrix.loc[name, name] = round(areas[name], 2)

    # Build a lookup for geometries
    geom_lookup = {row[name_col]: row["geometry"] for _, row in gdf_nbhd.iterrows()}

    # Compute pairwise overlaps
    for nb1, nb2 in combinations(names, 2):
        geom1 = geom_lookup[nb1]
        geom2 = geom_lookup[nb2]
        intersection = geom1.intersection(geom2)

        if intersection.is_empty or intersection.area == 0:
            continue

        intersection_area = intersection.area
        area1 = areas[nb1]
        area2 = areas[nb2]

        if metric == "iou":
            union_area = geom1.union(geom2).area
            value = intersection_area / union_area if union_area > 0 else 0.0
            matrix.loc[nb1, nb2] = round(value, 4)
            matrix.loc[nb2, nb1] = round(value, 4)  # Symmetric
        elif metric == "ioa":
            # Asymmetric: row's perspective (what fraction of row overlaps with col)
            value_1_to_2 = intersection_area / area1 if area1 > 0 else 0.0
            value_2_to_1 = intersection_area / area2 if area2 > 0 else 0.0
            matrix.loc[nb1, nb2] = round(value_1_to_2, 4)
            matrix.loc[nb2, nb1] = round(value_2_to_1, 4)
        else:  # intersection
            matrix.loc[nb1, nb2] = round(intersection_area, 2)
            matrix.loc[nb2, nb1] = round(intersection_area, 2)  # Symmetric

    # Build AnnData
    adata_nbn = AnnData(
        X=matrix.values,
        obs=pd.DataFrame(index=matrix.index),
        var=pd.DataFrame(index=matrix.columns),
    )
    adata_nbn.obs["area"] = [areas[n] for n in matrix.index]
    adata_nbn.uns["metric"] = metric
    adata_nbn.uns["category"] = category

    # Add category and color metadata from gdf_nbhd if available
    # Look up by name_col to get cat and color for each neighborhood
    nbhd_lookup = gdf_nbhd.set_index(name_col)

    if "cat" in gdf_nbhd.columns:
        # Use the category parameter name (e.g., "leiden") instead of "cat"
        adata_nbn.obs[category] = [
            str(nbhd_lookup.loc[n, "cat"]) if n in nbhd_lookup.index else str(n)
            for n in matrix.index
        ]
        adata_nbn.var[category] = [
            str(nbhd_lookup.loc[n, "cat"]) if n in nbhd_lookup.index else str(n)
            for n in matrix.columns
        ]

    if "color" in gdf_nbhd.columns:
        obs_colors = [
            nbhd_lookup.loc[n, "color"] if n in nbhd_lookup.index else "#808080"
            for n in matrix.index
        ]
        adata_nbn.obs["color"] = obs_colors
        adata_nbn.var["color"] = [
            nbhd_lookup.loc[n, "color"] if n in nbhd_lookup.index else "#808080"
            for n in matrix.columns
        ]
        # Store colors in uns using the category name (e.g., "leiden_colors")
        if "cat" in gdf_nbhd.columns:
            unique_cats = adata_nbn.obs[category].unique()
            cat_color_map = dict(zip(adata_nbn.obs[category], obs_colors, strict=False))
            adata_nbn.uns[f"{category}_colors"] = [
                cat_color_map.get(c, "#808080") for c in unique_cats
            ]

    return adata_nbn

filter_alpha_shapes(gdf_alpha, alpha, min_area=0, clean_names=True)

Filter alpha shapes by a specific alpha value and optionally clean up names.

Alpha shapes computed by alpha_shape_cell_clusters have names in the format {category}_{alpha} (e.g., "cluster_0_150"). This function filters to a specific alpha value and removes the trailing _{alpha} suffix from names.

Parameters

gdf_alpha : gpd.GeoDataFrame GeoDataFrame of alpha shapes with 'inv_alpha', 'area', 'name', and 'cat' columns. Typically the output of alpha_shape_cell_clusters. alpha : float The inverse alpha value to filter for (must match values in 'inv_alpha' column). min_area : float, default 0 Minimum area threshold. Shapes with area <= min_area are excluded. clean_names : bool, default True If True, removes the trailing _{alpha} suffix from the 'name' column, leaving just the category name (e.g., "cluster_0" instead of "cluster_0_150").

Returns

gpd.GeoDataFrame Filtered GeoDataFrame with optionally cleaned names.

Examples

gdf_alpha = dega.nbhd.alpha_shape_cell_clusters(adata, cat="leiden") gdf_filtered = dega.nbhd.filter_alpha_shapes(gdf_alpha, alpha=150)

Names are now just category names without the alpha suffix

print(gdf_filtered["name"].tolist()[:3]) ['0', '1', '2']

Source code in src/celldega/nbhd/alpha_shapes.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def filter_alpha_shapes(
    gdf_alpha: gpd.GeoDataFrame,
    alpha: float,
    min_area: float = 0,
    clean_names: bool = True,
) -> gpd.GeoDataFrame:
    """
    Filter alpha shapes by a specific alpha value and optionally clean up names.

    Alpha shapes computed by `alpha_shape_cell_clusters` have names in the format
    `{category}_{alpha}` (e.g., "cluster_0_150"). This function filters to a specific
    alpha value and removes the trailing `_{alpha}` suffix from names.

    Parameters
    ----------
    gdf_alpha : gpd.GeoDataFrame
        GeoDataFrame of alpha shapes with 'inv_alpha', 'area', 'name', and 'cat' columns.
        Typically the output of `alpha_shape_cell_clusters`.
    alpha : float
        The inverse alpha value to filter for (must match values in 'inv_alpha' column).
    min_area : float, default 0
        Minimum area threshold. Shapes with area <= min_area are excluded.
    clean_names : bool, default True
        If True, removes the trailing `_{alpha}` suffix from the 'name' column,
        leaving just the category name (e.g., "cluster_0" instead of "cluster_0_150").

    Returns
    -------
    gpd.GeoDataFrame
        Filtered GeoDataFrame with optionally cleaned names.

    Examples
    --------
    >>> gdf_alpha = dega.nbhd.alpha_shape_cell_clusters(adata, cat="leiden")
    >>> gdf_filtered = dega.nbhd.filter_alpha_shapes(gdf_alpha, alpha=150)
    >>> # Names are now just category names without the alpha suffix
    >>> print(gdf_filtered["name"].tolist()[:3])
    ['0', '1', '2']
    """
    # Filter by alpha value
    gdf_filtered = gdf_alpha[gdf_alpha["inv_alpha"] == alpha].copy()

    # Filter by minimum area
    gdf_filtered = gdf_filtered[gdf_filtered["area"] > min_area]

    if clean_names:
        # Remove the trailing _{alpha} suffix from names
        # Names are in format "{cat}_{alpha}", we want just "{cat}"
        alpha_suffix = f"_{int(alpha)}"
        gdf_filtered["name"] = gdf_filtered["name"].apply(
            lambda x: x[: -len(alpha_suffix)] if x.endswith(alpha_suffix) else x
        )

    # Reset index for clean output
    return gdf_filtered.reset_index(drop=True)

generate_hextile(adata, diameter=100)

Generate a hexagonal grid over the bounding box of cell spatial coordinates.

Parameters

adata : AnnData AnnData object with spatial coordinates in obsm["spatial"]. diameter : float, default 100 Diameter of each hexagon in the same units as the spatial coordinates (typically microns).

Returns

gpd.GeoDataFrame GeoDataFrame with hexagon geometries covering the spatial extent. Columns: "name" (hex_0, hex_1, ...), "geometry" (Polygon).

Examples

gdf_hex = dega.nbhd.generate_hextile(adata, diameter=100) gdf_hex.shape (1234, 2)

Source code in src/celldega/nbhd/hextile.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def generate_hextile(
    adata: "AnnData",
    diameter: float = 100,
) -> gpd.GeoDataFrame:
    """
    Generate a hexagonal grid over the bounding box of cell spatial coordinates.

    Parameters
    ----------
    adata : AnnData
        AnnData object with spatial coordinates in `obsm["spatial"]`.
    diameter : float, default 100
        Diameter of each hexagon in the same units as the spatial coordinates
        (typically microns).

    Returns
    -------
    gpd.GeoDataFrame
        GeoDataFrame with hexagon geometries covering the spatial extent.
        Columns: "name" (hex_0, hex_1, ...), "geometry" (Polygon).

    Examples
    --------
    >>> gdf_hex = dega.nbhd.generate_hextile(adata, diameter=100)
    >>> gdf_hex.shape
    (1234, 2)
    """
    # Get bounding box directly from spatial coordinates
    coords = adata.obsm["spatial"]
    minx, miny = coords[:, 0].min(), coords[:, 1].min()
    maxx, maxy = coords[:, 0].max(), coords[:, 1].max()

    radius = diameter / 2
    dx = np.sqrt(3) * radius
    dy = 1.5 * radius

    angles_deg = [30 + i * 60 for i in range(6)]
    angles_rad = [np.radians(a) for a in angles_deg]
    unit_hex = Polygon([(radius * np.cos(a), radius * np.sin(a)) for a in angles_rad])

    n_cols = int((maxx - minx) / dx) + 3
    n_rows = int((maxy - miny) / dy) + 3

    hexagons = []
    for row in range(n_rows):
        for col in range(n_cols):
            x = col * dx
            y = row * dy
            if row % 2 == 1:
                x += dx / 2
            hex_tile = translate(unit_hex, xoff=x + minx - dx, yoff=y + miny - dy)
            hexagons.append(hex_tile)

    names = [f"hex_{i}" for i in range(len(hexagons))]
    gdf_hex = gpd.GeoDataFrame(
        {"name": names, "geometry": hexagons},
    )
    gdf_hex = gdf_hex.set_index("name")
    gdf_hex["name"] = names  # Keep name as both index and column

    return gdf_hex

get_nbhd_meta(gdf_nbhd, unique_nbhd_col, gdf_trx, gdf_cell)

Compute neighborhood-level summary statistics including transcript and cell assignments, along with area and perimeter from geometry.

Source code in src/celldega/nbhd/neighborhoods.py
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
def get_nbhd_meta(
    gdf_nbhd: gpd.GeoDataFrame,
    unique_nbhd_col: str,
    gdf_trx: gpd.GeoDataFrame,
    gdf_cell: gpd.GeoDataFrame,
) -> pd.DataFrame:
    """
    Compute neighborhood-level summary statistics including transcript and cell assignments,
    along with area and perimeter from geometry.
    """
    print("Calculating NBM")
    gdf_nbhd = gdf_nbhd.copy()
    gdf_nbhd = gdf_nbhd.set_index(unique_nbhd_col)
    gdf_nbhd[unique_nbhd_col] = gdf_nbhd.index
    summary = pd.DataFrame(index=gdf_nbhd.index)
    summary.index.name = "nbhd_id"
    summary["area_squm"] = gdf_nbhd.geometry.area.round(2)
    summary["perimeter_um"] = gdf_nbhd.geometry.length.round(2)
    gdf_trx = gdf_trx.sjoin(gdf_nbhd[[unique_nbhd_col, "geometry"]], how="left", predicate="within")
    trx_summary = gdf_trx.groupby(unique_nbhd_col).agg(
        total_trx=("cell_id", "size"),
        unassigned_trx_count=("cell_id", lambda x: (x == "UNASSIGNED").sum()),
        assigned_trx_count=("cell_id", lambda x: (x != "UNASSIGNED").sum()),
    )
    trx_summary = trx_summary.reindex(gdf_nbhd.index).fillna(0)
    trx_summary["assigned_trx_pct"] = trx_summary["assigned_trx_count"] / trx_summary[
        "total_trx"
    ].replace(0, 1)
    trx_summary["unassigned_trx_pct"] = trx_summary["unassigned_trx_count"] / trx_summary[
        "total_trx"
    ].replace(0, 1)
    gdf_c = gdf_cell[["geometry"]].sjoin(
        gdf_nbhd[[unique_nbhd_col, "geometry"]], how="left", predicate="within"
    )
    cell_counts = gdf_c.groupby(unique_nbhd_col).size().rename("cell_count")
    cell_counts = cell_counts.reindex(gdf_nbhd.index).fillna(0)
    return summary.join(trx_summary).join(cell_counts)

hextile_niche(gdf_hex, adata_hex, category='leiden', dissolve=True)

Create niche polygons from hextiles based on clustering results.

Takes hexagon geometries and assigns them to niches based on clustering (e.g., Leiden clustering of hexagon population distributions). Optionally dissolves adjacent hexagons of the same niche into unified polygons.

Parameters

gdf_hex : gpd.GeoDataFrame GeoDataFrame of hexagon geometries. Must be indexed by hexagon name (matching adata_hex.obs.index). adata_hex : AnnData AnnData object containing clustering results in obs[category]. The index must match the hexagon names in gdf_hex. Colors can be provided in uns[f"{category}_colors"]. category : str, default "leiden" Column name in adata_hex.obs containing the niche/cluster assignment. dissolve : bool, default True If True, dissolve adjacent hexagons of the same niche into unified MultiPolygon geometries. If False, return individual hexagons with their niche assignment.

Returns

gpd.GeoDataFrame If dissolve=True: GeoDataFrame with dissolved niche polygons. Columns: "name", "cat", "geometry", "color", "area". If dissolve=False: GeoDataFrame with individual hexagons and niche assignment. Columns: "name", "cat", "geometry", "color".

Examples

Generate hexagons and compute population distribution

gdf_hex = dega.nbhd.generate_hextile(adata, diameter=100) adata_hex = dega.nbhd.calc_nbhd_by_pop(adata, gdf_hex, category="leiden")

Cluster hexagons by population similarity (e.g., using scanpy)

import scanpy as sc sc.pp.pca(adata_hex) sc.pp.neighbors(adata_hex) sc.tl.leiden(adata_hex)

Create dissolved niche polygons

gdf_niche = dega.nbhd.hextile_niche(gdf_hex, adata_hex, category="leiden")

Or keep individual hexagons with niche assignment

gdf_hex_niche = dega.nbhd.hextile_niche(gdf_hex, adata_hex, dissolve=False)

Source code in src/celldega/nbhd/hextile.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def hextile_niche(
    gdf_hex: gpd.GeoDataFrame,
    adata_hex: "AnnData",
    category: str = "leiden",
    dissolve: bool = True,
) -> gpd.GeoDataFrame:
    """
    Create niche polygons from hextiles based on clustering results.

    Takes hexagon geometries and assigns them to niches based on clustering
    (e.g., Leiden clustering of hexagon population distributions). Optionally
    dissolves adjacent hexagons of the same niche into unified polygons.

    Parameters
    ----------
    gdf_hex : gpd.GeoDataFrame
        GeoDataFrame of hexagon geometries. Must be indexed by hexagon name
        (matching `adata_hex.obs.index`).
    adata_hex : AnnData
        AnnData object containing clustering results in `obs[category]`.
        The index must match the hexagon names in `gdf_hex`.
        Colors can be provided in `uns[f"{category}_colors"]`.
    category : str, default "leiden"
        Column name in `adata_hex.obs` containing the niche/cluster assignment.
    dissolve : bool, default True
        If True, dissolve adjacent hexagons of the same niche into unified
        MultiPolygon geometries. If False, return individual hexagons with
        their niche assignment.

    Returns
    -------
    gpd.GeoDataFrame
        If dissolve=True:
            GeoDataFrame with dissolved niche polygons.
            Columns: "name", "cat", "geometry", "color", "area".
        If dissolve=False:
            GeoDataFrame with individual hexagons and niche assignment.
            Columns: "name", "cat", "geometry", "color".

    Examples
    --------
    >>> # Generate hexagons and compute population distribution
    >>> gdf_hex = dega.nbhd.generate_hextile(adata, diameter=100)
    >>> adata_hex = dega.nbhd.calc_nbhd_by_pop(adata, gdf_hex, category="leiden")
    >>>
    >>> # Cluster hexagons by population similarity (e.g., using scanpy)
    >>> import scanpy as sc
    >>> sc.pp.pca(adata_hex)
    >>> sc.pp.neighbors(adata_hex)
    >>> sc.tl.leiden(adata_hex)
    >>>
    >>> # Create dissolved niche polygons
    >>> gdf_niche = dega.nbhd.hextile_niche(gdf_hex, adata_hex, category="leiden")
    >>>
    >>> # Or keep individual hexagons with niche assignment
    >>> gdf_hex_niche = dega.nbhd.hextile_niche(gdf_hex, adata_hex, dissolve=False)
    """
    # Validate inputs
    if category not in adata_hex.obs.columns:
        raise ValueError(f"adata_hex.obs missing '{category}' column")

    # Get niche assignments aligned with hexagon index
    hex_names = gdf_hex.index.tolist()
    adata_names = adata_hex.obs.index.tolist()

    # Check alignment
    common_names = set(hex_names) & set(adata_names)
    if len(common_names) == 0:
        raise ValueError("No matching names between gdf_hex index and adata_hex.obs.index")

    # Filter to common hexagons
    gdf_result = gdf_hex.loc[gdf_hex.index.isin(common_names)].copy()
    niche_values = adata_hex.obs.loc[gdf_result.index, category].astype(str)
    gdf_result["cat"] = niche_values.values

    # Get colors from adata.uns if available
    color_key = f"{category}_colors"
    color_dict: dict[str, str] = {}
    if color_key in adata_hex.uns:
        src_colors = adata_hex.uns[color_key]
        if hasattr(adata_hex.obs[category], "cat"):
            src_categories = list(adata_hex.obs[category].cat.categories.astype(str))
        else:
            src_categories = sorted(adata_hex.obs[category].unique().astype(str))

        color_dict = {
            str(cat): src_colors[i] for i, cat in enumerate(src_categories) if i < len(src_colors)
        }

    # Assign colors
    gdf_result["color"] = [color_dict.get(c, "#808080") for c in gdf_result["cat"]]

    if dissolve:
        # Dissolve hexagons by niche category
        gdf_dissolved = gdf_result.dissolve(by="cat", as_index=False)

        # Clean up and add metadata
        gdf_dissolved["name"] = gdf_dissolved["cat"]
        gdf_dissolved["area"] = gdf_dissolved.geometry.area

        # Reorder columns
        return gdf_dissolved[["name", "cat", "geometry", "color", "area"]]

    # Return individual hexagons with niche assignment
    gdf_result = gdf_result.reset_index()
    return gdf_result[["name", "cat", "geometry", "color"]]