Pedagogical OMOP Data Analysis with CDMConnector (GiBleed / Eunomia)
1. What you’ll learn
This tutorial is a mini-course in doing observational data analysis directly on an OMOP CDM database using:
- CDMConnector: creates a single Cdm object (named CDM table references + metadata).
- Ibis + DuckDB: connects to a local database file created from the Eunomia example dataset.
- Ibis verbs: filter, select, mutate, group_by, aggregate, order_by, join, etc., executed lazily as SQL.
We’ll also learn the shape and meaning of core OMOP tables:
person, visit_occurrence, condition_occurrence, drug_exposure, measurement, and concept.
Finally, we will create a simple cohort and compute:
- baseline characteristics (age/sex),
- event counts,
- a basic time-at-risk rate.
2. Prerequisites
You should be comfortable with:
- basic Python
- basic pandas (or Ibis) for tables and joins
Everything else will be taught as we go.
3. Install and load packages
# If needed:
# pip install "cdmconnector @ git+https://github.com/OHDSI/pyCDMConnector.git"
# pip install "cdmconnector[postgres] @ git+https://github.com/OHDSI/pyCDMConnector.git" # optional: postgres
# Eunomia datasets are downloaded on first use.
import cdmconnector as cc
import ibis
4. Get the GiBleed example CDM and connect
Eunomia provides synthetic OMOP CDM datasets for testing and teaching. eunomia_dir() creates (and caches) a DuckDB database copy on your machine.
# Ensure the dataset is available (may download the first time).
cc.require_eunomia(dataset_name="GiBleed", cdm_version="5.3")
path = cc.eunomia_dir("GiBleed", cdm_version="5.3")
con = ibis.duckdb.connect(path)
cdm = cc.cdm_from_con(con, cdm_schema="main", write_schema="main", cdm_name="eunomia")
cdm
4.1 What is a Cdm reference?
A Cdm behaves like a mapping: cdm.person, cdm["condition_occurrence"], etc. Each element is a lazy table reference (Ibis) that translates your operations into SQL.
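As a mental model only, a toy stand-in (not the real cdmconnector class) that supports both attribute and item access might look like this:

```python
# Toy illustration of the Cdm access pattern -- NOT the real cdmconnector class.
class ToyCdm:
    def __init__(self, tables):
        self._tables = tables  # name -> lazy table reference

    def __getitem__(self, name):
        return self._tables[name]

    def __getattr__(self, name):
        # Called only when normal attribute lookup fails.
        try:
            return self._tables[name]
        except KeyError:
            raise AttributeError(name)

toy = ToyCdm({"person": "<person table ref>"})
# toy.person and toy["person"] return the same reference
```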
cdm.tables
5. First contact with OMOP data
5.1 person: who are the individuals?
cc.collect(
cdm.person.select("person_id", "gender_concept_id", "year_of_birth", "month_of_birth", "day_of_birth")
.limit(10)
)
5.1.1 Key OMOP idea: “concept_id everywhere”
OMOP stores standardized medical vocabulary references as integer IDs. gender_concept_id points to the concept table. Decode gender counts by joining to concept.
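Conceptually the decode is just a dictionary lookup. A miniature stdlib version (8507 and 8532 are the standard OMOP gender concept IDs for MALE and FEMALE; the person IDs are toy data):

```python
from collections import Counter

# Tiny slice of the concept table as a lookup dict
concept_names = {8507: "MALE", 8532: "FEMALE"}

# Toy person.gender_concept_id values
gender_ids = [8532, 8507, 8532, 8532]

gender_counts = Counter(concept_names.get(cid, "Unknown") for cid in gender_ids)
# Counter({'FEMALE': 3, 'MALE': 1})
```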
joined = cdm.person.join(
cdm.concept.select(concept_id=cdm.concept.concept_id, concept_name=cdm.concept.concept_name),
cdm.person.gender_concept_id == cdm.concept.concept_id,
how="left",
)
gender_counts = (
joined.group_by("concept_name")
.aggregate(n=joined.person_id.count())
.order_by(ibis.desc("n"))
)
cc.collect(gender_counts)
5.1.2 Optional: search the public vocabulary API
When you do not already know the concept ID you want, search_vocab() can query the public Hecate vocabulary API and return a pandas DataFrame of candidate concepts.
#| eval: false
cc.search_vocab(
"gastrointestinal hemorrhage",
domain_id="Condition",
standard_concept="S",
limit=5,
)
This is a good way to discover concept IDs before using them in cohort definitions or targeted table filters.
5.2 Visits: where do records cluster in time?
In OMOP, many records can be linked to a visit via visit_occurrence_id (when available).
cc.collect(
cdm.visit_occurrence.aggregate(
n_visits=cdm.visit_occurrence.count(),
n_people=cdm.visit_occurrence.person_id.nunique(),
min_date=cdm.visit_occurrence.visit_start_date.min(),
max_date=cdm.visit_occurrence.visit_end_date.max(),
)
)
6. Ibis on a database: mental model
When you run:
cdm.condition_occurrence.filter(cdm.condition_occurrence.condition_start_date >= "2000-01-01")
you are not pulling the whole table into memory. You are building a query. Work happens only when you call cc.collect(expr) (or .execute()). Inspect the generated SQL with .compile():
q = (
cdm.condition_occurrence
.filter(cdm.condition_occurrence.condition_start_date >= "2000-01-01")
.select("person_id", "condition_concept_id", "condition_start_date")
)
print(q.compile())
7. A first descriptive analysis: “Top conditions”
We’ll compute the most frequent condition concepts, then decode them to names.
cond = cdm.condition_occurrence
concept = cdm.concept
by_concept = cond.group_by("condition_concept_id").aggregate(n=cond.count()).order_by(ibis.desc("n"))
top_conditions = (
by_concept.join(
concept.select("concept_id", "concept_name", "domain_id", "vocabulary_id"),
by_concept.condition_concept_id == concept.concept_id,
how="left",
)
.select("condition_concept_id", "concept_name", "domain_id", "vocabulary_id", "n")
.limit(15)
)
cc.collect(top_conditions)
7.1 Exercise (optional)
- Repeat this for drug_exposure (drug_concept_id).
- Repeat this for measurement (measurement_concept_id).
8. The observational workflow: define a cohort, then analyze within it
A common pattern in OHDSI-style observational studies:
- Define an index event (cohort entry).
- Define inclusion/exclusion (optional).
- Define time-at-risk.
- Estimate outcomes, covariates, rates, etc.
We’ll build a simple “GI bleed” cohort from condition concept ID 192671 (Gastrointestinal hemorrhage). In R, CDMConnector has generateConceptCohortSet; in Python we build the cohort manually with Ibis and insert it.
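To make the cohort logic concrete before translating it to Ibis, here is a self-contained in-memory sketch (the records and helper are illustrative toy data, not part of any library):

```python
from datetime import date, timedelta

# Toy condition_occurrence rows: (person_id, condition_concept_id, start_date)
records = [
    (1, 192671, date(2001, 5, 1)),
    (1, 192671, date(2000, 3, 1)),   # earlier event -> becomes cohort entry
    (2, 192671, date(2002, 7, 15)),
    (3, 999999, date(2001, 1, 1)),   # different concept -> excluded
]

def first_occurrence_cohort(rows, concept_id, tar_days=10):
    """First matching event per person; cohort end = start + tar_days."""
    firsts = {}
    for person_id, cid, start in rows:
        if cid == concept_id and (person_id not in firsts or start < firsts[person_id]):
            firsts[person_id] = start
    return [
        {"cohort_definition_id": 1, "subject_id": pid,
         "cohort_start_date": start,
         "cohort_end_date": start + timedelta(days=tar_days)}
        for pid, start in sorted(firsts.items())
    ]

cohort = first_occurrence_cohort(records, 192671)
# Two entries: person 1 (from 2000-03-01) and person 2 (from 2002-07-15)
```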
8.1 Define a “GI bleed” cohort
First occurrence of concept 192671 per person; cohort end = start + 10 days.
gibleed_concept_id = 192671
cond = cdm.condition_occurrence
first_occurrence = (
cond.filter(cond.condition_concept_id == gibleed_concept_id)
.group_by(cond.person_id)
.aggregate(
cohort_start_date=cond.condition_start_date.min(),
)
)
# cohort_end_date = cohort_start_date + 10 days
first_occurrence = first_occurrence.mutate(
cohort_definition_id=ibis.literal(1, type="int64"),
cohort_end_date=first_occurrence.cohort_start_date + ibis.interval(days=10),
)
cohort_expr = first_occurrence.select(
"cohort_definition_id",
subject_id=first_occurrence.person_id,
"cohort_start_date",
"cohort_end_date",
)
Insert into the write schema as a cohort table (create an empty table, then insert rows).
import pyarrow as pa
cohort_df = cc.collect(cohort_expr)
# PyArrow date columns for cohort_start_date, cohort_end_date
cohort_arrow = pa.table({
"cohort_definition_id": pa.array(cohort_df["cohort_definition_id"], type=pa.int64()),
"subject_id": pa.array(cohort_df["subject_id"], type=pa.int64()),
"cohort_start_date": pa.array(cohort_df["cohort_start_date"].dt.date, type=pa.date32()),
"cohort_end_date": pa.array(cohort_df["cohort_end_date"].dt.date, type=pa.date32()),
})
cdm = cc.new_cohort_table(cdm, "gibleed_cohort", overwrite=True)
cdm.source.insert_table("gibleed_cohort", cohort_arrow, overwrite=True)
cdm["gibleed_cohort"] = cdm.source.table("gibleed_cohort", cdm.write_schema)
A cohort table has (at minimum): cohort_definition_id, subject_id, cohort_start_date, cohort_end_date.
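Since downstream helpers assume these four columns, a quick defensive check before inserting can catch mistakes early (an illustrative helper, not a cdmconnector function):

```python
# Minimum columns every cohort table must carry
REQUIRED_COHORT_COLUMNS = {
    "cohort_definition_id", "subject_id", "cohort_start_date", "cohort_end_date",
}

def check_cohort_columns(columns):
    """Raise ValueError if any required cohort column is missing."""
    missing = REQUIRED_COHORT_COLUMNS - set(columns)
    if missing:
        raise ValueError(f"cohort table missing columns: {sorted(missing)}")
    return True
```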
cc.collect(cdm["gibleed_cohort"].select("cohort_definition_id", "subject_id", "cohort_start_date", "cohort_end_date").limit(10))
8.2 Cohort size and person-time
cohort_tbl = cdm["gibleed_cohort"]
cc.collect(
cohort_tbl.aggregate(
n_entries=cohort_tbl.count(),
n_people=cohort_tbl.subject_id.nunique(),
min_start=cohort_tbl.cohort_start_date.min(),
max_start=cohort_tbl.cohort_start_date.max(),
)
)
Compute crude person-time (days) inside the cohort:
cohort_tbl = cdm["gibleed_cohort"]
tar_days_expr = cohort_tbl.cohort_end_date - cohort_tbl.cohort_start_date
cohort_with_tar = cohort_tbl.mutate(tar_days=tar_days_expr)
pt = cohort_with_tar.aggregate(
total_tar_days=cohort_with_tar.tar_days.sum(),
mean_tar_days=cohort_with_tar.tar_days.mean(),
)
cc.collect(pt)
9. Baseline characteristics at cohort entry
A simple “baseline” snapshot: age and sex at cohort_start_date. We’ll compute approximate age from year/month/day of birth.
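For reference, the same arithmetic as a stand-alone helper (illustrative; the mid-year imputation for missing month/day mirrors the convention used later in this tutorial):

```python
from datetime import date

def age_at(index_date, year_of_birth, month_of_birth=None, day_of_birth=None):
    """Approximate age in whole years at index_date.
    Missing month/day are imputed to June 15 (mid-year)."""
    birth = date(year_of_birth, month_of_birth or 6, day_of_birth or 15)
    return int((index_date - birth).days / 365.25)
```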
cohort_tbl = cdm["gibleed_cohort"]
person = cdm.person
baseline = cohort_tbl.join(
person.select("person_id", "gender_concept_id", "year_of_birth", "month_of_birth", "day_of_birth"),
cohort_tbl.subject_id == person.person_id,
how="inner",
)
# Approximate age at cohort entry (ref year - year_of_birth)
import datetime
ref_year = datetime.date.today().year
baseline = baseline.mutate(age=ref_year - baseline.year_of_birth)
cc.collect(baseline.select("subject_id", "cohort_start_date", "gender_concept_id", "age").limit(10))
Summarise the age distribution and sex breakdown:
cc.collect(
baseline.aggregate(
n_people=baseline.subject_id.nunique(),
age_mean=baseline.age.mean(),
age_sd=baseline.age.std(),
)
)
sex_summary = (
baseline.group_by("gender_concept_id")
.aggregate(n=baseline.subject_id.count())
.order_by(ibis.desc("n"))
)
sex_with_names = sex_summary.join(
cdm.concept.select("concept_id", "concept_name"),
sex_summary.gender_concept_id == cdm.concept.concept_id,
how="left",
).select("gender_concept_id", "concept_name", "n")
cc.collect(sex_with_names)
9.1 The same baseline step with PatientProfiles helpers
The manual joins above are useful for learning the OMOP structure. Once you know the pattern, the PatientProfiles helpers can add common baseline fields for you.
profiles = cc.add_demographics(
cdm["gibleed_cohort"],
cdm,
index_date="cohort_start_date",
)
cc.collect(
profiles.select(
"subject_id",
"cohort_start_date",
"age",
"sex",
"prior_observation",
"future_observation",
).limit(10)
)
This is often the quickest way to enrich a cohort table with age, sex, and observation time before building descriptive summaries.
9.2 CohortCharacteristics helpers
summarise_characteristics() produces a standardised result object with counts and demographic summaries for the cohort.
characteristics = cc.summarise_characteristics(
cdm["gibleed_cohort"],
cdm=cdm,
table_name="gibleed_cohort",
)
characteristics.results.head(10)
If you want a wide, analyst-friendly table, use table_characteristics().
cc.table_characteristics(characteristics).head(10)
9.3 visOmopResults-style table formatting
The visOmopResults-style helpers take the long summarised result and prepare it for presentation. Using type="dataframe" keeps the example lightweight and portable.
cc.vis_omop_table(
characteristics.results,
type="dataframe",
).head(10)
For more custom layouts, vis_table() lets you format the same result object with your own header, grouping, and estimate label rules.
cc.vis_table(
characteristics.results,
type="dataframe",
rename={"cdm_name": "Database"},
).head(10)
10. Outcomes: count events during time-at-risk
For each cohort entry, look for records between cohort_start_date and cohort_end_date.
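The window test itself is simple; in miniature (illustrative helper):

```python
from datetime import date

def in_window(event_date, start, end):
    """True if event_date is non-missing and falls inside [start, end]."""
    return event_date is not None and start <= event_date <= end

in_window(date(2020, 1, 5), date(2020, 1, 1), date(2020, 1, 11))  # True
in_window(None, date(2020, 1, 1), date(2020, 1, 11))              # False (no event)
```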
10.1 Death during cohort window (if available)
if "death" in cdm.tables:
death_tbl = cdm.death
cohort_tbl = cdm["gibleed_cohort"]
death_joined = cohort_tbl.join(
death_tbl.select("person_id", "death_date"),
cohort_tbl.subject_id == death_tbl.person_id,
how="left",
)
death_joined = death_joined.mutate(
death_in_window=(
death_joined.death_date.notnull()
& (death_joined.death_date >= death_joined.cohort_start_date)
& (death_joined.death_date <= death_joined.cohort_end_date)
)
)
death_in_tar = death_joined.group_by("cohort_definition_id").aggregate(
n_entries=death_joined.count(),
n_deaths_in_window=death_joined.death_in_window.sum(),
)
cc.collect(death_in_tar)
else:
print("No death table found in this CDM.")
10.2 Condition outcome during cohort window (general pattern)
Join cohort to condition_occurrence, filter condition dates within TAR, then summarise.
cohort_tbl = cdm["gibleed_cohort"]
cond = cdm.condition_occurrence
concept_outcome = 192671
outcome_joined = cohort_tbl.join(
cond.select("person_id", "condition_concept_id", "condition_start_date"),
cohort_tbl.subject_id == cond.person_id,
how="inner",
)
outcome_joined = outcome_joined.filter(
outcome_joined.condition_concept_id == concept_outcome,
outcome_joined.condition_start_date >= outcome_joined.cohort_start_date,
outcome_joined.condition_start_date <= outcome_joined.cohort_end_date,
)
outcome_counts = outcome_joined.aggregate(
n_events=outcome_joined.count(),
n_people_with_event=outcome_joined.subject_id.nunique(),
)
cc.collect(outcome_counts)
11. Rates: events per person-time
Crude incidence rate = events / total time-at-risk (e.g. per person-year).
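The arithmetic in isolation (illustrative helper; 365.25 days per person-year):

```python
def rate_per_person_year(n_events, total_tar_days):
    """Crude incidence rate: events per person-year of time-at-risk."""
    person_years = total_tar_days / 365.25
    return n_events / person_years

rate_per_person_year(12, 7305)  # 12 events over 20 person-years -> 0.6
```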
cohort_tbl = cdm["gibleed_cohort"]
cond = cdm.condition_occurrence
tar_expr = cohort_tbl.cohort_end_date - cohort_tbl.cohort_start_date
cohort_tar = cohort_tbl.mutate(tar_days=tar_expr)
tar = cc.collect(cohort_tar.aggregate(total_tar_days=cohort_tar.tar_days.sum()))
events_df = cc.collect(
cohort_tbl.join(
cond.select("person_id", "condition_concept_id", "condition_start_date"),
(cohort_tbl.subject_id == cond.person_id)
& (cond.condition_concept_id == concept_outcome)
& (cond.condition_start_date >= cohort_tbl.cohort_start_date)
& (cond.condition_start_date <= cohort_tbl.cohort_end_date),
how="inner",
).aggregate(n_events=ibis._.count())
)
total_tar_days = tar["total_tar_days"].iloc[0]
n_events = events_df["n_events"].iloc[0]
total_tar_years = total_tar_days / 365.25
rate_per_py = n_events / total_tar_years
print(f"Total TAR (days): {total_tar_days}, TAR (years): {total_tar_years:.2f}")
print(f"Events: {n_events}, Rate per person-year: {rate_per_py:.4f}")
12. Practical SQL-on-CDM skills
12.1 semi_join to restrict to a cohort
Filter any CDM table to individuals in your cohort using a join + filter (semi_join pattern: keep only rows that match cohort subjects).
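The semi-join idea in miniature (pure Python, illustrative): keep left rows whose key appears in the right-hand key set, without multiplying left rows when the right side has duplicates.

```python
def semi_join(left_rows, right_keys, key):
    """Keep left rows whose key value appears in right_keys (no row multiplication)."""
    keys = set(right_keys)
    return [row for row in left_rows if row[key] in keys]

drug_rows = [
    {"person_id": 1, "drug_concept_id": 100},
    {"person_id": 2, "drug_concept_id": 200},
    {"person_id": 3, "drug_concept_id": 100},
]
cohort_ids = [1, 3, 3]  # duplicates on the right do not duplicate left rows
kept = semi_join(drug_rows, cohort_ids, "person_id")
# kept contains the rows for persons 1 and 3 only
```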
cohort_subjects = cdm["gibleed_cohort"].select("subject_id").distinct()
drugs_cohort_j = cdm.drug_exposure.join(
cohort_subjects, cdm.drug_exposure.person_id == cohort_subjects.subject_id, how="inner"
)
drugs_in_cohort = (
drugs_cohort_j.group_by("drug_concept_id")
.aggregate(n=drugs_cohort_j.count())
.order_by(ibis.desc("n"))
.limit(10)
)
drugs_with_names = drugs_in_cohort.join(
cdm.concept.select("concept_id", "concept_name"),
drugs_in_cohort.drug_concept_id == cdm.concept.concept_id,
how="left",
).select("drug_concept_id", "concept_name", "n")
cc.collect(drugs_with_names)
12.2 Time-window joins (a core observational pattern)
Restrict drug exposures to the cohort TAR window:
cohort_tbl = cdm["gibleed_cohort"]
drugs_tar_joined = cohort_tbl.join(
cdm.drug_exposure.select("person_id", "drug_concept_id", "drug_exposure_start_date"),
cohort_tbl.subject_id == cdm.drug_exposure.person_id,
how="inner",
)
drugs_tar_joined = drugs_tar_joined.filter(
drugs_tar_joined.drug_exposure_start_date >= drugs_tar_joined.cohort_start_date,
drugs_tar_joined.drug_exposure_start_date <= drugs_tar_joined.cohort_end_date,
)
drugs_in_tar = (
drugs_tar_joined.group_by("drug_concept_id")
.aggregate(n=drugs_tar_joined.count())
.order_by(ibis.desc("n"))
.limit(10)
)
drugs_in_tar_named = drugs_in_tar.join(
cdm.concept.select("concept_id", "concept_name"),
drugs_in_tar.drug_concept_id == cdm.concept.concept_id,
how="left",
).select("drug_concept_id", "concept_name", "n")
cc.collect(drugs_in_tar_named)
13. Performance and materialization
On large CDMs, keep queries lazy as long as possible; use cc.compute() to materialize intermediate results into the database when reused.
# Example: materialize a result for reuse
drugs_tar_j = cohort_tbl.join(
cdm.drug_exposure.select("person_id", "drug_concept_id", "drug_exposure_start_date"),
(cohort_tbl.subject_id == cdm.drug_exposure.person_id)
& (cdm.drug_exposure.drug_exposure_start_date >= cohort_tbl.cohort_start_date)
& (cdm.drug_exposure.drug_exposure_start_date <= cohort_tbl.cohort_end_date),
how="inner",
)
drugs_tar_for_compute = drugs_tar_j.select("drug_concept_id").group_by("drug_concept_id").aggregate(n=drugs_tar_j.count())
# Materialize into the write schema (name is backend-dependent)
tar_drugs_mat = cc.compute(drugs_tar_for_compute, name="tmp_tar_drugs")
cc.collect(tar_drugs_mat.limit(5))
14. Putting it together: a tiny “study report” function
A realistic workflow is to write functions that accept a Cdm and return a result table.
def summarise_cohort_basics(cdm, cohort_table_name):
cohort_tbl = cdm[cohort_table_name]
person = cdm.person
joined = cohort_tbl.join(
person.select("person_id", "gender_concept_id", "year_of_birth", "month_of_birth", "day_of_birth"),
cohort_tbl.subject_id == person.person_id,
how="inner",
)
joined = joined.mutate(
birth_month=ibis.coalesce(joined.month_of_birth, 6),
birth_day=ibis.coalesce(joined.day_of_birth, 15),
)
joined = joined.mutate(
birth_date=ibis.date(joined.year_of_birth, joined.birth_month, joined.birth_day),
age=((joined.cohort_start_date - joined.birth_date).day() / 365.25).floor(),
)
return joined.aggregate(
n_entries=joined.count(),
n_people=joined.subject_id.nunique(),
age_mean=joined.age.mean(),
age_p50=joined.age.median(),
)
cc.collect(summarise_cohort_basics(cdm, "gibleed_cohort"))
15. Next steps (what to learn after this)
To grow this into a full observational analysis course:
- Cohort design rigor: washout periods, prior observation, inclusion criteria, censoring rules.
- Confounding + covariates: baseline covariate construction from condition/drug/measurement history.
- Comparative effectiveness: target trial emulation, exposure cohorts, propensity scores.
- Outcome modeling: incidence rates, Kaplan–Meier, Cox models (with careful design).
- Reusable pipelines: parameterized functions taking a Cdm, producing standardized outputs.
16. Clean up
cdm.disconnect()
Provenance: CDMConnector provides helpers to download/cache Eunomia example CDMs (including “GiBleed”) and connect via Ibis to a DuckDB copy. See eunomia_dir and the CDMConnector docs.