from pathlib import Path
import cdmconnector as cc
import ibis
path = cc.eunomia_dir("GiBleed", cdm_version="5.3")
con = ibis.duckdb.connect(path)
cdm = cc.cdm_from_con(con, cdm_schema="main", write_schema="main", cdm_name="eunomia")
Story: GiBleed End-to-End
Define a simple GI bleed cohort, derive an index date per person, and run a windowed analysis of the ±30 days around that date.
You will learn
- How to define a simple GI bleed cohort from condition concepts (discover or use placeholder concept IDs)
- How to define an index date per person (first condition occurrence)
- How to run windowed analysis ±30 days (conditions, visits, measurements)
- How to output “top events around index” tables (skeleton)
Story question
“What conditions, visits, and measurements occur in the 30 days before and after a person’s first GI bleed?”
Setup
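Locate the Eunomia GiBleed example dataset (cc.eunomia_dir), connect to it with DuckDB through ibis, and build the cdm object with cc.cdm_from_con.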
Explore: GI bleed concept(s)
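Discover GI bleed condition concept IDs from the concept table, or use known IDs. This notebook uses 192671 (Gastrointestinal hemorrhage) as a placeholder and filters on that single ID; a fuller cohort definition would also include its descendant concepts.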
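Descendant expansion is not implemented in this notebook. As a rough sketch of what it means, the plain-Python example below scans toy rows shaped like the OMOP concept_ancestor table (ancestor_concept_id, descendant_concept_id). The IDs 900001, 900002, and 900003 are made up for illustration, and expand_descendants is a hypothetical helper, not part of cdmconnector.

```python
# Toy rows shaped like OMOP concept_ancestor: (ancestor_concept_id, descendant_concept_id).
# 192671 is the placeholder from this notebook; 900001, 900002, 900003 are made-up IDs.
CONCEPT_ANCESTOR_ROWS = [
    (192671, 192671),  # every concept is recorded as its own ancestor
    (192671, 900001),
    (192671, 900002),
    (900003, 900003),  # unrelated concept
]


def expand_descendants(ancestor_id, rows):
    """Return the set of concept IDs whose ancestor is ancestor_id (hypothetical helper)."""
    return {descendant for ancestor, descendant in rows if ancestor == ancestor_id}


gibleed_concept_ids = expand_descendants(192671, CONCEPT_ANCESTOR_ROWS)
print(sorted(gibleed_concept_ids))
```

With a set of IDs rather than a single ID, the equality filter on condition_concept_id would become a membership test (ibis columns provide isin for this), assuming the concept_ancestor table is populated in the CDM at hand.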
concept = cdm.concept
# Optionally discover GI bleed concepts by name (e.g. filter concept_name containing "hemorrhage" or "bleed")
# gibleed_concepts = concept.filter(concept.concept_name.contains("Gastrointestinal hemorrhage"))
# cc.collect(gibleed_concepts.select("concept_id", "concept_name").limit(5))
# For this notebook we use a known concept_id (Gastrointestinal hemorrhage)
gibleed_concept_id = 192671
cond = cdm.condition_occurrence
gibleed_cond = cond.filter(cond.condition_concept_id == gibleed_concept_id)
Build: Index date per person
First GI bleed date per person = index date.
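To make the "first occurrence per person" rule concrete, here is a minimal plain-Python sketch on invented rows. It mirrors what a group-by on person_id with a minimum over condition_start_date computes; first_date_per_person is a hypothetical helper and the dates are toy values.

```python
from datetime import date

# Toy GI bleed rows: (person_id, condition_start_date). Values are invented.
gibleed_rows = [
    (1, date(2020, 3, 10)),
    (1, date(2019, 7, 1)),
    (2, date(2021, 1, 5)),
    (1, date(2020, 12, 24)),
]


def first_date_per_person(rows):
    """Keep the earliest date seen for each person_id (hypothetical helper)."""
    earliest = {}
    for person_id, start_date in rows:
        current = earliest.get(person_id)
        if current is None or start_date < current:
            earliest[person_id] = start_date
    return earliest


toy_index_dates = first_date_per_person(gibleed_rows)
print(toy_index_dates)
```

Each person ends up with exactly one index date, however many GI bleed records they have.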
# First occurrence per person (index date)
index_dates = (
    gibleed_cond
    .group_by("person_id")
    .aggregate(index_date=gibleed_cond.condition_start_date.min())
)
cc.collect(index_dates.limit(10))
Build: Windowed analysis ±30 days
Join index_dates to condition_occurrence / visit_occurrence / measurement and filter to ±30 days from index_date. Skeleton: conditions in window.
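The window test itself is simple date arithmetic. The sketch below shows it in plain Python with inclusive bounds, matching the >= and <= comparisons used in this notebook. is_in_window is a hypothetical helper and the dates are invented.

```python
from datetime import date, timedelta


def is_in_window(event_date, index_date, days=30):
    """True when event_date is within index_date ± days, both ends included."""
    delta = timedelta(days=days)
    return index_date - delta <= event_date <= index_date + delta


toy_index_date = date(2020, 3, 31)
toy_events = [
    date(2020, 3, 1),   # 30 days before: inside
    date(2020, 2, 29),  # 31 days before: outside
    date(2020, 3, 31),  # day 0, the index event itself: inside
    date(2020, 4, 30),  # 30 days after: inside
    date(2020, 5, 1),   # 31 days after: outside
]
kept = [d for d in toy_events if is_in_window(d, toy_index_date)]
print(kept)
```

Because the window is inclusive and contains day 0, the index GI bleed record itself counts toward the condition totals.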
# Conditions in ±30-day window (join to index_dates, filter by date range)
cond_all = cdm.condition_occurrence
joined = cond_all.join(index_dates, cond_all.person_id == index_dates.person_id, how="inner")
in_window = joined.filter(
    joined.condition_start_date >= joined.index_date - ibis.interval(days=30),
    joined.condition_start_date <= joined.index_date + ibis.interval(days=30),
)
# Aggregate to "top conditions in window" (count by concept_id)
top_conditions = (
    in_window
    .group_by("condition_concept_id")
    .aggregate(n=in_window.condition_occurrence_id.count())
    .order_by(ibis.desc("n"))
    .limit(10)
)
cc.collect(top_conditions)
Interpret: Top events around index (skeleton)
Optionally join top_conditions to concept for names; same pattern for visits and measurements (skeleton).
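The count, rank, and name-lookup pattern is the same for every domain. As an illustration only, the plain-Python sketch below applies it to invented concept IDs; top_events_with_names is a hypothetical helper, and apart from 192671 all IDs and names are toy values. A concept with no entry in the lookup gets None as its name, which is what a left join produces.

```python
from collections import Counter

# Toy in-window events as concept IDs, plus a toy concept lookup.
# All values are invented except 192671, the placeholder used in this notebook.
toy_concept_ids = [192671, 111, 192671, 222, 111, 192671, 333, 222, 111, 192671]
toy_concept_names = {192671: "Gastrointestinal hemorrhage", 111: "Toy condition A"}


def top_events_with_names(concept_ids, names, limit=10):
    """Count events per concept ID, keep the most frequent, attach names (None if unknown)."""
    counts = Counter(concept_ids).most_common(limit)
    return [(concept_id, names.get(concept_id), n) for concept_id, n in counts]


toy_top = top_events_with_names(toy_concept_ids, toy_concept_names, limit=3)
for row in toy_top:
    print(row)
```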
# Join top_conditions to concept for condition names
top_conditions_with_names = (
    top_conditions
    .join(concept, top_conditions.condition_concept_id == concept.concept_id, how="left")
    .select("condition_concept_id", concept.concept_name.name("condition_name"), "n")
    .order_by(ibis.desc("n"))  # re-sort: row order is not guaranteed to survive the join
)
cc.collect(top_conditions_with_names)
# Top visits in ±30-day window
visit_all = cdm.visit_occurrence
joined_visits = visit_all.join(index_dates, visit_all.person_id == index_dates.person_id, how="inner")
in_window_visits = joined_visits.filter(
    joined_visits.visit_start_date >= joined_visits.index_date - ibis.interval(days=30),
    joined_visits.visit_start_date <= joined_visits.index_date + ibis.interval(days=30),
)
top_visits = (
    in_window_visits
    .group_by("visit_concept_id")
    .aggregate(n=in_window_visits.visit_occurrence_id.count())
    .order_by(ibis.desc("n"))
    .limit(10)
)
top_visits_with_names = (
    top_visits
    .join(concept, top_visits.visit_concept_id == concept.concept_id, how="left")
    .select("visit_concept_id", concept.concept_name.name("visit_type"), "n")
    .order_by(ibis.desc("n"))  # re-sort: row order is not guaranteed to survive the join
)
cc.collect(top_visits_with_names)
# Top measurements in ±30-day window
meas_all = cdm.measurement
joined_meas = meas_all.join(index_dates, meas_all.person_id == index_dates.person_id, how="inner")
in_window_meas = joined_meas.filter(
    joined_meas.measurement_date >= joined_meas.index_date - ibis.interval(days=30),
    joined_meas.measurement_date <= joined_meas.index_date + ibis.interval(days=30),
)
top_measurements = (
    in_window_meas
    .group_by("measurement_concept_id")
    .aggregate(n=in_window_meas.measurement_id.count())
    .order_by(ibis.desc("n"))
    .limit(10)
)
top_meas_with_names = (
    top_measurements
    .join(concept, top_measurements.measurement_concept_id == concept.concept_id, how="left")
    .select("measurement_concept_id", concept.concept_name.name("measurement_name"), "n")
    .order_by(ibis.desc("n"))  # re-sort: row order is not guaranteed to survive the join
)
cc.collect(top_meas_with_names)
Exercises
- Change the window to ±14 days or ±60 days and compare counts.
- Stratify top conditions by sex (join to person, group by sex and condition_concept_id).
- Show the compiled SQL for the “conditions in window” query using .compile().
What we learned
- Define a cohort by a condition (e.g. GI bleed) and take first occurrence as index date.
- Windowed analysis: join index_dates to domain tables and filter by date range (±N days).
- Aggregate to “top events” (count by concept_id) and join to concept for names.
Cleanup
cdm.disconnect()