Neighborhood Module API Reference

Module for performing neighborhood analysis.

NBHD

A class representing neighborhoods with associated derived data matrices.

Source code in src/celldega/nbhd/neighborhoods.py, lines 140-264
class NBHD:
    """A class representing neighborhoods with associated derived data matrices."""

    def __init__(
        self,
        gdf: gpd.GeoDataFrame,
        nbhd_type: str,
        adata: AnnData,
        data_dir: str,
        path_landscape_files: str,
        source: str | dict[str, Any] | None = None,
        name: str | None = None,
        meta: dict[str, Any] | None = None,
    ) -> None:
        self.gdf = gdf.copy()
        self.nbhd_type = nbhd_type
        self.adata = adata
        self.data_dir = data_dir
        self.path_landscape_files = path_landscape_files
        self.source = source
        self.name = name
        self.meta = meta or {}

        self.derived: dict[str, Any] = {
            "NBI": None,
            "NBG-CF": None,
            "NBG-CD": None,
            "NBG-LCD": {},
            "NBP": {},
            "NBN-O": None,
            "NBN-B": None,
        }

    def set_derived(self, key: str, subkey: str | None = None) -> None:
        """
        Set a derived data matrix.
        """
        if key == "NBG-CD":
            data = calc_nbg_cd(self.adata, self.gdf, "CD")
        elif key == "NBG-LCD":
            data = calc_nbg_cd(self.adata, self.gdf, "LCD")
        elif key == "NBG-CF":
            data = calc_nbg_cf(self.data_dir, self.gdf)
        elif key == "NBP":
            data = {}
            gdf_cell = _get_gdf_cell(self.adata)
            data["abs"], data["pct"] = calc_nbp(gdf_cell, self.gdf)
        elif key == "NBM":
            gdf_trx = _get_gdf_trx(self.data_dir)
            gdf_cell = _get_gdf_cell(self.adata)
            data = get_nbhd_meta(self.gdf, "name", gdf_trx, gdf_cell)
        elif key == "NBN-O":
            if self.nbhd_type == "ALPH":
                nb = self.gdf[["name", "geometry"]]
                print("Calculating neighborhood overlap")
                data = calc_nb_overlap(nb)
            else:
                raise ValueError("NBN-O can be derived for ALPH only")
        elif key == "NBN-B":
            if self.nbhd_type == "ALPH":
                raise ValueError("NBN-B can not be derived for nbhd having overlap")
            nb = self.gdf[["name", "geometry"]]
            print("Calculating neighborhood bordering")
            data = calc_nb_bordering(nb)
        elif key == "NBI":
            data = calc_nbi(
                f"{self.data_dir}/morphology_focus/morphology_focus_0000.ome.tif",
                self.path_landscape_files,
                self.gdf,
            )
        else:
            raise ValueError(f"Unknown derived key: {key}")

        if key in {"NBP", "NBG-LCD"}:
            for subkey in data:
                self.derived[key][subkey] = data[subkey]
        else:
            self.derived[key] = data

        print(f"{key} is derived and attached to nbhd")

    def _add_geo(self, df: pd.DataFrame) -> pd.DataFrame:
        return (
            self.gdf[["name", "geometry"]]
            .set_index("name")
            .join(df, how="left")
            .fillna(0)
            .reset_index()
            .rename(columns={"name": "nbhd_id"})
        )

    def get_derived(self, key: str, subkey: str | None = None) -> pd.DataFrame:
        if key in {"NBP", "NBG-LCD"}:
            df = self.derived[key].get(subkey)
            return self._add_geo(df)
        df = self.derived.get(key)
        return self._add_geo(df)

    def to_geodataframe(self) -> gpd.GeoDataFrame:
        return self.gdf

    def summary(self) -> dict[str, Any]:
        return {
            "name": self.name,
            "type": self.nbhd_type,
            "n_regions": len(self.gdf),
            "derived": {k: self._derived_summary(k) for k in self.derived},
            "meta": self.meta,
        }

    def _derived_summary(self, key: str) -> tuple | dict[str, tuple] | None:
        val = self.derived.get(key)
        if val is None:
            return None
        if key in ["NBP", "NBG-LCD"]:
            if key == "NBP":
                subkeys = ["abs", "pct"]
            elif key == "NBG-LCD":
                subkeys = sorted(self.adata.obs["leiden"].unique().tolist())
            summary = {}
            for subkey in subkeys:
                subval = val.get(subkey)
                summary[subkey] = subval.shape if hasattr(subval, "shape") else None
            return summary
        return val.shape if hasattr(val, "shape") else None

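A minimal usage sketch, not taken from the package itself. It assumes NBHD and alpha_shape_cell_clusters are importable from celldega.nbhd, that adata carries obsm["spatial"] plus obs["cluster"] and obs["leiden"], and that the paths point at real Xenium output; all names and paths below are illustrative.

import anndata as ad
from celldega.nbhd import NBHD, alpha_shape_cell_clusters  # assumed import path

# Hypothetical inputs: a cell-level AnnData and the matching Xenium output directory.
adata = ad.read_h5ad("analysis/cells.h5ad")
gdf_alpha = alpha_shape_cell_clusters(adata, cat="cluster")

nbhd = NBHD(
    gdf=gdf_alpha,
    nbhd_type="ALPH",
    adata=adata,
    data_dir="data/xenium_run",                   # hypothetical Xenium output dir
    path_landscape_files="data/landscape_files",  # hypothetical landscape files dir
    name="alpha_shapes",
)

nbhd.set_derived("NBP")                         # cell counts and percentages per neighborhood
df_pct = nbhd.get_derived("NBP", subkey="pct")  # geometry-joined percentage table
print(nbhd.summary())
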
set_derived(key, subkey=None)

Compute the derived data matrix identified by key and attach it to the neighborhood. Valid keys are NBI, NBG-CF, NBG-CD, NBG-LCD, NBP, NBM, NBN-O, and NBN-B. NBN-O can only be derived for ALPH neighborhoods, while NBN-B is only valid for non-overlapping (non-ALPH) neighborhoods; NBP and NBG-LCD are stored as dictionaries of sub-matrices.

Source code in src/celldega/nbhd/neighborhoods.py, lines 173-219
def set_derived(self, key: str, subkey: str | None = None) -> None:
    """
    Set a derived data matrix.
    """
    if key == "NBG-CD":
        data = calc_nbg_cd(self.adata, self.gdf, "CD")
    elif key == "NBG-LCD":
        data = calc_nbg_cd(self.adata, self.gdf, "LCD")
    elif key == "NBG-CF":
        data = calc_nbg_cf(self.data_dir, self.gdf)
    elif key == "NBP":
        data = {}
        gdf_cell = _get_gdf_cell(self.adata)
        data["abs"], data["pct"] = calc_nbp(gdf_cell, self.gdf)
    elif key == "NBM":
        gdf_trx = _get_gdf_trx(self.data_dir)
        gdf_cell = _get_gdf_cell(self.adata)
        data = get_nbhd_meta(self.gdf, "name", gdf_trx, gdf_cell)
    elif key == "NBN-O":
        if self.nbhd_type == "ALPH":
            nb = self.gdf[["name", "geometry"]]
            print("Calculating neighborhood overlap")
            data = calc_nb_overlap(nb)
        else:
            raise ValueError("NBN-O can be derived for ALPH only")
    elif key == "NBN-B":
        if self.nbhd_type == "ALPH":
            raise ValueError("NBN-B can not be derived for nbhd having overlap")
        nb = self.gdf[["name", "geometry"]]
        print("Calculating neighborhood bordering")
        data = calc_nb_bordering(nb)
    elif key == "NBI":
        data = calc_nbi(
            f"{self.data_dir}/morphology_focus/morphology_focus_0000.ome.tif",
            self.path_landscape_files,
            self.gdf,
        )
    else:
        raise ValueError(f"Unknown derived key: {key}")

    if key in {"NBP", "NBG-LCD"}:
        for subkey in data:
            self.derived[key][subkey] = data[subkey]
    else:
        self.derived[key] = data

    print(f"{key} is derived and attached to nbhd")

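A short sketch continuing from the nbhd object constructed in the class example above (illustrative only):

nbhd.set_derived("NBG-CD")   # neighborhood x gene, mean expression over all cells
nbhd.set_derived("NBG-LCD")  # same, split by Leiden cluster and stored under sub-keys
nbhd.set_derived("NBN-O")    # pairwise neighborhood overlaps (ALPH neighborhoods only)

gdf_cd = nbhd.get_derived("NBG-CD")  # geometry-joined neighborhood x gene table
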
alpha_shape_cell_clusters(adata, cat='cluster', alphas=(100, 150, 200, 250, 300, 350), meta_cluster=None)

Compute alpha shapes for each cluster in the cell metadata.

Source code in src/celldega/nbhd/alpha_shapes.py, lines 78-121
def alpha_shape_cell_clusters(
    adata: ad.AnnData,
    cat: str = "cluster",
    alphas: Sequence[float] = (100, 150, 200, 250, 300, 350),
    meta_cluster: pd.DataFrame | None = None,
) -> gpd.GeoDataFrame:
    """
    Compute alpha shapes for each cluster in the cell metadata.
    """

    meta_cell = adata.obs

    coords = adata.obsm["spatial"]
    meta_cell["geometry"] = list(coords)

    gdf_alpha = gpd.GeoDataFrame()

    for inv_alpha in alphas:
        for inst_cluster in meta_cell[cat].unique():
            inst_clust = meta_cell[meta_cell[cat] == inst_cluster]
            if inst_clust.shape[0] > 3:
                nested_array = inst_clust["geometry"].values
                flat_array = np.vstack(nested_array)
                inst_shape = alpha_shape(flat_array, inv_alpha)

                inst_name = f"{inst_cluster}_{inv_alpha}"

                gdf_alpha.loc[inst_name, "name"] = inst_name
                gdf_alpha.loc[inst_name, "cat"] = inst_cluster
                gdf_alpha.loc[inst_name, "geometry"] = inst_shape
                gdf_alpha.loc[inst_name, "inv_alpha"] = int(inv_alpha)

                # look up color using meta_cluster if provided
                if meta_cluster is not None and inst_cluster in meta_cluster.index:
                    gdf_alpha.loc[inst_name, "color"] = meta_cluster.loc[inst_cluster, "color"]
                else:
                    gdf_alpha.loc[inst_name, "color"] = "#000000"

    gdf_alpha["geometry"] = gdf_alpha["geometry"].apply(
        lambda geom: _round_coordinates(geom, precision=2)
    )
    gdf_alpha["area"] = gdf_alpha.area

    return gdf_alpha.loc[gdf_alpha.area.sort_values(ascending=False).index.tolist()]

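A small synthetic sketch, assuming the function is importable from celldega.nbhd; two Gaussian point blobs stand in for real cell clusters.

import numpy as np
import pandas as pd
import anndata as ad
from celldega.nbhd import alpha_shape_cell_clusters  # assumed import path

# Two spatially separated synthetic "clusters" of 200 cells each.
rng = np.random.default_rng(0)
coords = np.vstack([rng.normal(0, 50, (200, 2)), rng.normal(500, 50, (200, 2))])
adata = ad.AnnData(
    X=np.zeros((400, 1)),
    obs=pd.DataFrame({"cluster": ["A"] * 200 + ["B"] * 200}),
)
adata.obsm["spatial"] = coords

gdf_alpha = alpha_shape_cell_clusters(adata, cat="cluster", alphas=(100, 200))
print(gdf_alpha[["name", "cat", "inv_alpha", "area"]])
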
calc_grad_nbhd_from_roi(polygon, gdf_reference, band_width=300)

Generate concentric rings (neighborhood bands) from a polygon, clipped to the convex hull of a reference GeoDataFrame.

Parameters

polygon : GeoDataFrame
    GeoDataFrame containing a single polygon.
gdf_reference : GeoDataFrame
    Reference GeoDataFrame used to calculate the boundary area (convex hull).
band_width : float
    Width of each band in microns (default: 300).

Returns

GeoDataFrame
    GeoDataFrame with columns for band (index of ring) and geometry (polygon).

Source code in src/celldega/nbhd/gradient.py, lines 6-61
def calc_grad_nbhd_from_roi(
    polygon: gpd.GeoDataFrame,
    gdf_reference: gpd.GeoDataFrame,
    band_width: float = 300,
) -> gpd.GeoDataFrame:
    """
    Generate concentric rings (neighborhood bands) from a polygon,
    clipped to the convex hull of a reference GeoDataFrame.

    Parameters
    ----------
    polygon : GeoDataFrame
        GeoDataFrame containing a single polygon.
    gdf_reference : GeoDataFrame
        Reference GeoDataFrame used to calculate the boundary area (convex hull).
    band_width : float
        Width of each band in microns (default: 300).

    Returns
    -------
    GeoDataFrame
        GeoDataFrame with columns for band (index of ring) and geometry (polygon).
    """
    if len(polygon) != 1:
        raise ValueError("Input polygon GeoDataFrame must contain exactly one polygon.")

    roi_polygon = polygon.geometry.iloc[0]
    boundary = gdf_reference.unary_union.convex_hull

    bands = []
    current_polygon = roi_polygon
    band_idx = 0

    # Add the original polygon as band 0
    bands.append({"band": f"grad_{band_idx}", "geometry": roi_polygon})

    while True:
        band_idx += 1
        # Generate next ring
        next_buffer = current_polygon.buffer(band_width)
        ring = next_buffer.difference(current_polygon)

        # Clip the ring to the convex hull boundary
        ring_clipped = ring.intersection(boundary)

        # Stop if no part of the ring remains within boundary
        if ring_clipped.is_empty:
            break

        bands.append({"band": f"grad_{band_idx}", "geometry": ring_clipped})
        current_polygon = next_buffer

    gdf = gpd.GeoDataFrame(bands, crs=polygon.crs)
    gdf["band_width"] = band_width

    return gdf

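A synthetic sketch, assuming the function is importable from celldega.nbhd; the point grid stands in for a real cell GeoDataFrame and coordinates are treated as microns.

import geopandas as gpd
from shapely.geometry import Point
from celldega.nbhd import calc_grad_nbhd_from_roi  # assumed import path

# A single circular ROI and a grid of reference points whose convex hull bounds the rings.
roi = gpd.GeoDataFrame(geometry=[Point(0, 0).buffer(100)])
reference = gpd.GeoDataFrame(
    geometry=[Point(x, y) for x in range(-1000, 1001, 200) for y in range(-1000, 1001, 200)]
)

bands = calc_grad_nbhd_from_roi(roi, reference, band_width=300)
print(bands[["band", "band_width"]])  # grad_0 is the ROI itself, grad_1.. are 300-micron rings
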
calc_nb_bordering(gdf_nbhd)

Identify pairs of neighborhoods that share a border (touch), using spatial indexing for efficiency.

Source code in src/celldega/nbhd/neighborhoods.py, lines 369-388
def calc_nb_bordering(
    gdf_nbhd: gpd.GeoDataFrame,
) -> pd.DataFrame:
    """
    Identify pairs of neighborhoods that share a border (touch), using spatial indexing for efficiency.
    """
    print("Calculating NBN-B")
    gdf_nbhd = gdf_nbhd.copy()
    gdf_nbhd["geometry"] = gdf_nbhd["geometry"].buffer(0)
    gdf_touches = gpd.sjoin(gdf_nbhd, gdf_nbhd, how="inner", predicate="touches")
    gdf_touches = gdf_touches[gdf_touches["name_left"] != gdf_touches["name_right"]]
    gdf_touches["pair"] = gdf_touches.apply(
        lambda row: tuple(sorted((row["name_left"], row["name_right"]))), axis=1
    )
    gdf_touches = gdf_touches.drop_duplicates(subset="pair")
    return (
        gdf_touches[["name_left", "name_right"]]
        .rename(columns={"name_left": "nbhd_1", "name_right": "nbhd_2"})
        .reset_index(drop=True)
    )

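A synthetic sketch, assuming the function is importable from celldega.nbhd: two squares that share an edge and one that is isolated.

import geopandas as gpd
from shapely.geometry import box
from celldega.nbhd import calc_nb_bordering  # assumed import path

gdf_nbhd = gpd.GeoDataFrame({
    "name": ["nb_a", "nb_b", "nb_c"],
    "geometry": [box(0, 0, 1, 1), box(1, 0, 2, 1), box(5, 5, 6, 6)],  # nb_a and nb_b share an edge
})
pairs = calc_nb_bordering(gdf_nbhd)
print(pairs)  # one row: nb_a borders nb_b
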
calc_nb_overlap(gdf_nbhd)

Calculate the pairwise overlap between all neighborhoods, including overlap area and geometry. Skips intersections that are empty or have zero area.

Source code in src/celldega/nbhd/neighborhoods.py, lines 336-366
def calc_nb_overlap(
    gdf_nbhd: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Calculate the pairwise overlap between all neighborhoods, including overlap area and geometry.
    Skips intersections that are empty or have zero area.
    """
    print("Calculating NBN-O")
    gdf_nbhd = gdf_nbhd.copy()
    gdf_nbhd["geometry"] = gdf_nbhd["geometry"].buffer(0)
    results = []
    for nb1, nb2 in combinations(gdf_nbhd["name"], 2):
        geom1 = gdf_nbhd.loc[gdf_nbhd["name"] == nb1, "geometry"].values[0]
        geom2 = gdf_nbhd.loc[gdf_nbhd["name"] == nb2, "geometry"].values[0]
        intersection = geom1.intersection(geom2)
        if not intersection.is_empty and intersection.area > 0:
            results.append(
                {
                    "nbhd_1": nb1,
                    "nbhd_2": nb2,
                    "overlap_area": round(intersection.area, 2),
                    "geometry": intersection,
                }
            )
    if results:
        return gpd.GeoDataFrame(results, geometry="geometry", crs=gdf_nbhd.crs)
    return gpd.GeoDataFrame(
        columns=["nbhd_1", "nbhd_2", "overlap_area", "geometry"],
        geometry="geometry",
        crs=gdf_nbhd.crs,
    )

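A synthetic sketch, assuming the function is importable from celldega.nbhd: two overlapping squares and one disjoint square.

import geopandas as gpd
from shapely.geometry import box
from celldega.nbhd import calc_nb_overlap  # assumed import path

gdf_nbhd = gpd.GeoDataFrame({
    "name": ["nb_a", "nb_b", "nb_c"],
    "geometry": [box(0, 0, 2, 2), box(1, 1, 3, 3), box(10, 10, 11, 11)],  # nb_a and nb_b overlap
})
overlaps = calc_nb_overlap(gdf_nbhd)
print(overlaps[["nbhd_1", "nbhd_2", "overlap_area"]])  # single pair with overlap_area == 1.0
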
calc_nbg_cd(adata, gdf_nbhd, cd_mode='CD/LCD', unique_nbhd_col='name')

Calculate the mean expression of cells within a neighborhood (CD) or the mean expression of cells from a given Leiden cluster within a neighborhood (LCD). cd_mode must be set explicitly to 'CD' or 'LCD'; any other value, including the default 'CD/LCD', raises a ValueError.

Source code in src/celldega/nbhd/neighborhoods.py, lines 20-75
def calc_nbg_cd(
    adata: AnnData,
    gdf_nbhd: gpd.GeoDataFrame,
    cd_mode: str = "CD/LCD",
    unique_nbhd_col: str = "name",
) -> gpd.GeoDataFrame | dict[Any, gpd.GeoDataFrame]:
    """
    Calculate the mean expression of cells within a neighborhood (CD)
    or the mean expression of cells from a given Leiden cluster (LCD).
    """
    gene_list = adata.var.index

    gene_exp = pd.DataFrame(
        adata.X.toarray() if hasattr(adata.X, "toarray") else adata.X,
        columns=gene_list,
        index=adata.obs_names,
    )

    gdf_cell = gpd.GeoDataFrame(
        data={"cluster": adata.obs["leiden"], **gene_exp},
        geometry=gpd.points_from_xy(*adata.obsm["spatial"].T[:2]),
        crs="EPSG:4326",
    )

    def compute_cd(gdf_cell_subset: gpd.GeoDataFrame) -> pd.DataFrame:
        joined = gdf_cell_subset.sjoin(
            gdf_nbhd[[unique_nbhd_col, "geometry"]],
            how="left",
            predicate="within",
        )
        joined.drop(columns=["index_right", "cat", "geometry"], inplace=True, errors="ignore")

        df_nbhd_join = gdf_nbhd[[unique_nbhd_col]]
        for gene in gene_list:
            avg = joined.groupby(unique_nbhd_col)[gene].mean().reset_index()
            avg.columns = [unique_nbhd_col, gene]
            df_nbhd_join = df_nbhd_join.merge(avg, on=unique_nbhd_col, how="left")

        df_nbhd_join.rename(columns={unique_nbhd_col: "nbhd_id"}, inplace=True)
        df_nbhd_join.set_index("nbhd_id", inplace=True)

        return df_nbhd_join

    if cd_mode == "LCD":
        print("Calculating NBG-LCD")
        nbhd_by_cluster: dict[Any, pd.DataFrame] = {}
        for cluster in gdf_cell["cluster"].unique():
            cluster_cells = gdf_cell[gdf_cell["cluster"] == cluster]
            nbhd_by_cluster[cluster] = compute_cd(cluster_cells)
        return nbhd_by_cluster

    if cd_mode == "CD":
        print("Calculating NBG-CD")
        return compute_cd(gdf_cell)

    raise ValueError("cd_mode must be 'CD' or 'LCD'")

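A synthetic sketch, assuming the function is importable from celldega.nbhd: a toy AnnData with three genes and two Leiden clusters is split across two rectangular neighborhoods.

import numpy as np
import pandas as pd
import anndata as ad
import geopandas as gpd
from shapely.geometry import box
from celldega.nbhd import calc_nbg_cd  # assumed import path

rng = np.random.default_rng(0)
adata = ad.AnnData(
    X=rng.poisson(2, size=(100, 3)).astype(float),
    obs=pd.DataFrame({"leiden": rng.choice(["0", "1"], 100)}),
    var=pd.DataFrame(index=["gene_a", "gene_b", "gene_c"]),
)
adata.obsm["spatial"] = rng.uniform(0, 100, size=(100, 2))

gdf_nbhd = gpd.GeoDataFrame(
    {"name": ["nb_0", "nb_1"], "geometry": [box(0, 0, 50, 100), box(50, 0, 100, 100)]},
    crs="EPSG:4326",
)

df_cd = calc_nbg_cd(adata, gdf_nbhd, cd_mode="CD")  # neighborhood x gene mean expression
lcd = calc_nbg_cd(adata, gdf_nbhd, cd_mode="LCD")   # dict keyed by Leiden cluster
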
calc_nbg_cf(data_dir, gdf_nbhd, unique_nbhd_col='name')

Calculate the neighborhood-by-gene transcript count matrix (NBG-CF) by reading transcripts.parquet from data_dir and assigning each transcript to the neighborhood that contains it.

Source code in src/celldega/nbhd/neighborhoods.py, lines 78-105
def calc_nbg_cf(
    data_dir: str,
    gdf_nbhd: gpd.GeoDataFrame,
    unique_nbhd_col: str = "name",
) -> pd.DataFrame:
    """
    Calculates the neighborhood by gene expression.
    """
    print("Calculating NBG-CF")
    df_trx = pd.read_parquet(
        f"{data_dir}/transcripts.parquet",
        columns=["feature_name", "x_location", "y_location", "cell_id"],
        engine="pyarrow",
    )
    geometry = gpd.points_from_xy(df_trx["x_location"], df_trx["y_location"])
    gdf_trx = gpd.GeoDataFrame(df_trx[["feature_name"]], geometry=geometry, crs="EPSG:4326")
    gdf_trx = gdf_trx.sjoin(gdf_nbhd[[unique_nbhd_col, "geometry"]], how="left", predicate="within")
    gdf_trx.rename(columns={unique_nbhd_col: "nbhd_id"}, inplace=True)
    return (
        gdf_trx.groupby(["nbhd_id", "feature_name"])
        .size()
        .unstack(fill_value=0)
        .rename_axis("nbhd_id")
        .rename_axis(None, axis=1)
        .reindex(gdf_nbhd[unique_nbhd_col])
        .fillna(0)
        .astype(int)
    )

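A synthetic sketch, assuming the function is importable from celldega.nbhd; it writes a tiny transcripts.parquet with the columns the function reads into a temporary directory.

import tempfile
import pandas as pd
import geopandas as gpd
from shapely.geometry import box
from celldega.nbhd import calc_nbg_cf  # assumed import path

# Minimal stand-in for a Xenium transcripts.parquet.
data_dir = tempfile.mkdtemp()
pd.DataFrame({
    "feature_name": ["gene_a", "gene_a", "gene_b", "gene_b"],
    "x_location": [10.0, 60.0, 20.0, 80.0],
    "y_location": [10.0, 60.0, 20.0, 80.0],
    "cell_id": ["c1", "c2", "c3", "c4"],
}).to_parquet(f"{data_dir}/transcripts.parquet")

gdf_nbhd = gpd.GeoDataFrame(
    {"name": ["nb_0", "nb_1"], "geometry": [box(0, 0, 50, 50), box(50, 50, 100, 100)]},
    crs="EPSG:4326",
)
counts = calc_nbg_cf(data_dir, gdf_nbhd)  # neighborhood x gene transcript counts
print(counts)
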
calc_nbp(gdf_cell, gdf_nbhd, nbhd_col='name')

Calculate cell counts and percentages per cluster within neighborhoods. Returns two DataFrames:

1. Raw counts per neighborhood-cluster combination
2. Percentage distribution of clusters within each neighborhood

Source code in src/celldega/nbhd/neighborhoods.py, lines 267-294
def calc_nbp(
    gdf_cell: gpd.GeoDataFrame,
    gdf_nbhd: gpd.GeoDataFrame,
    nbhd_col: str = "name",
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Calculate cell counts and percentages per cluster within neighborhoods.
    Returns two DataFrames:
    1. Raw counts per neighborhood-cluster combination
    2. Percentage distribution of clusters within each neighborhood
    """
    print("Calculating NBP")
    required = {"geometry", nbhd_col}
    if not required.issubset(gdf_nbhd.columns):
        raise ValueError(f"gdf_nbhd missing required columns: {required - set(gdf_nbhd.columns)}")
    if not {"geometry", "cluster"}.issubset(gdf_cell.columns):
        raise ValueError("gdf_cell missing required 'geometry' or 'cluster' column")

    counts = (
        gdf_cell.sjoin(gdf_nbhd[[nbhd_col, "geometry"]], how="left", predicate="within")
        .groupby([nbhd_col, "cluster"])
        .size()
        .unstack(fill_value=0)
        .pipe(lambda df: df.set_axis(df.columns.astype(str), axis=1))
    )
    counts = counts.reindex(gdf_nbhd[nbhd_col]).fillna(0).astype(int)
    percentages = counts.div(counts.sum(axis=1), axis=0).fillna(0) * 100
    return counts, percentages

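A synthetic sketch, assuming the function is importable from celldega.nbhd: three cells from two clusters spread over two square neighborhoods.

import geopandas as gpd
from shapely.geometry import Point, box
from celldega.nbhd import calc_nbp  # assumed import path

gdf_cell = gpd.GeoDataFrame({
    "cluster": ["T", "T", "B"],
    "geometry": [Point(10, 10), Point(20, 20), Point(80, 80)],
})
gdf_nbhd = gpd.GeoDataFrame({
    "name": ["nb_0", "nb_1"],
    "geometry": [box(0, 0, 50, 50), box(50, 50, 100, 100)],
})

counts, percentages = calc_nbp(gdf_cell, gdf_nbhd)
print(counts)       # raw cells per neighborhood x cluster
print(percentages)  # row-wise percentages summing to 100
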
generate_hex_grid(gdf_cell, radius=20)

Generate a hexagonal grid over the convex hull of a GeoDataFrame using affine translation.

Source code in src/celldega/nbhd/hextile.py, lines 9-46
def generate_hex_grid(
    gdf_cell: gpd.GeoDataFrame,
    radius: float = 20,
) -> gpd.GeoDataFrame:
    """
    Generate a hexagonal grid over the convex hull of a GeoDataFrame using affine translation.
    """
    bounding_geom = gdf_cell.unary_union.convex_hull
    minx, miny, maxx, maxy = bounding_geom.bounds

    dx = np.sqrt(3) * radius
    dy = 1.5 * radius

    angles_deg = [30 + i * 60 for i in range(6)]
    angles_rad = [np.radians(a) for a in angles_deg]
    unit_hex = Polygon([(radius * np.cos(a), radius * np.sin(a)) for a in angles_rad])

    n_cols = int((maxx - minx) / dx) + 3
    n_rows = int((maxy - miny) / dy) + 3

    hexagons = []
    for row in range(n_rows):
        for col in range(n_cols):
            x = col * dx
            y = row * dy
            if row % 2 == 1:
                x += dx / 2
            hex_tile = translate(unit_hex, xoff=x + minx - dx, yoff=y + miny - dy)
            if hex_tile.intersects(bounding_geom):
                hexagons.append(hex_tile)

    return gpd.GeoDataFrame(
        {
            "name": [f"hex_{i}" for i in range(len(hexagons))],
            "geometry": hexagons,
        },
        crs=gdf_cell.crs,
    )
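
A synthetic sketch, assuming the function is importable from celldega.nbhd; the returned GeoDataFrame has the same name and geometry columns used by the other neighborhood GeoDataFrames in this module.

import geopandas as gpd
from shapely.geometry import Point
from celldega.nbhd import generate_hex_grid  # assumed import path

# A grid of synthetic cell centroids; the hex grid covers their convex hull.
gdf_cell = gpd.GeoDataFrame(
    geometry=[Point(x, y) for x in range(0, 201, 25) for y in range(0, 201, 25)]
)
gdf_hex = generate_hex_grid(gdf_cell, radius=20)
print(len(gdf_hex), gdf_hex.head())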