from pathlib import Path
import cdmconnector as cc
import ibis
import pyarrow as pa
import datetime
path = cc.eunomia_dir("empty_cdm", cdm_version="5.3")
con = ibis.duckdb.connect(path)
cdm = cc.cdm_from_con(con, cdm_schema="main", write_schema="main", cdm_name="eunomia")
Cohorts 101: Tables, Attrition, and Counts
Create a cohort table in the write schema, insert minimal synthetic data, and record attrition.
You will learn
- How to use empty_cdm and create synthetic minimal tables (person, observation_period, condition_occurrence) or insert minimal cohort rows
- How to create a cohort table in the write schema (cdmconnector helper or Ibis DDL)
- How to record attrition steps (if supported) or use the attrition table pattern
- How to get cohort counts from cohort_attrition
Story question
“How do we create a cohort table, add membership rows, and track attrition?”
Setup
We use empty_cdm and optionally insert minimal synthetic data into the write schema (DuckDB).
Explore: Create cohort table in write schema
Use new_cohort_table from cdmconnector.cohorts to create an empty cohort table (and cohort_set, cohort_attrition).
from cdmconnector.cohorts import new_cohort_table
# Create empty cohort table "my_cohort" in write schema
cdm = new_cohort_table(cdm, "my_cohort", overwrite=True)
# Access the new table
cohort_tbl = cdm["my_cohort"]
cc.collect(cohort_tbl.limit(5))
Build: Synthetic minimal cohort rows
Insert minimal cohort rows (cohort_definition_id, subject_id, cohort_start_date, cohort_end_date) using cdm.insert_table. We overwrite the empty table with one that contains the synthetic rows.
# Build a small Arrow table with cohort rows and insert into write schema (overwrites empty table)
cohort_rows = pa.table({
    "cohort_definition_id": pa.array([1, 1], type=pa.int64()),
    "subject_id": pa.array([1, 2], type=pa.int64()),
    "cohort_start_date": pa.array([datetime.date(2020, 1, 1), datetime.date(2020, 1, 1)], type=pa.date32()),
    "cohort_end_date": pa.array([datetime.date(2020, 12, 31), datetime.date(2020, 12, 31)], type=pa.date32()),
})
cdm.insert_table("my_cohort", cohort_rows, overwrite=True)
# Populate cohort_set so the definition is registered (optional but useful for reporting)
cdm.insert_table("my_cohort_set", pa.table({
    "cohort_definition_id": pa.array([1], type=pa.int64()),
    "cohort_name": pa.array(["Demo cohort"], type=pa.string()),
}), overwrite=True)
# Show the inserted cohort rows
cc.collect(cdm["my_cohort"])
Interpret: Attrition and cohort counts
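If cohort_attrition is populated (e.g. by generate_cohort_set), use cohort_count and attrition from cdmconnector.cohorts. For a manually created cohort, nothing is attached to the table object, so read the cohort, cohort_set, and cohort_attrition tables directly (the attrition table pattern).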
from cdmconnector.cohorts import cohort_count, attrition, record_cohort_attrition
# cohort_count and attrition expect a cohort object with cohort_attrition attribute
# (e.g. result of generate_cohort_set attaches cohort_set and cohort_attrition to the returned table).
# For a manually created cohort we can read counts and attrition tables directly:
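# Count distinct subjects; count() would count rows, so a person with two entries would be counted twice
cc.collect(cdm["my_cohort"].aggregate(n_subjects=cdm["my_cohort"].subject_id.nunique()))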
cc.collect(cdm["my_cohort_set"])
cc.collect(cdm["my_cohort_attrition"])
# If the cohort object has cohort_attrition attached (e.g. from generate_cohort_set), use:
# counts = cohort_count(cdm["cohort"]) # requires cohort to have .cohort_attrition
# attr_df = attrition(cdm["cohort"])
try:
    cohort_count(cdm["my_cohort"])
except Exception as e:
    print("cohort_count requires cohort_attrition attribute:", type(e).__name__, str(e)[:80])
Exercises
- Add exclusions: e.g. filter out persons with no observation_period before inserting cohort rows; record as an attrition step.
- Add more attrition steps using record_cohort_attrition (reason string, optional cohort_id).
- Populate cohort_set (cohort_definition_id, cohort_name) and cohort_attrition (reason_id, reason, number_subjects, number_records, excluded_*) so cohort_count returns meaningful counts.
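As a starting point for these exercises, here is a minimal sketch of the attrition table pattern in plain Python, with no cdmconnector calls. It assumes the column names listed above, with excluded_* read as excluded_records and excluded_subjects. The helper record_step, the reason strings, and the sample data are hypothetical and only illustrate how each step's counts relate to the previous step; this is not how record_cohort_attrition is implemented.

```python
from dataclasses import dataclass


@dataclass
class AttritionRow:
    cohort_definition_id: int
    reason_id: int
    reason: str
    number_records: int
    number_subjects: int
    excluded_records: int
    excluded_subjects: int


def record_step(attrition, cohort_id, reason, rows):
    """Append one attrition row describing the cohort rows left after a step."""
    n_records = len(rows)
    n_subjects = len({r["subject_id"] for r in rows})
    previous = [a for a in attrition if a.cohort_definition_id == cohort_id]
    if previous:
        # Excluded counts are the difference from the previous step
        last = previous[-1]
        reason_id = last.reason_id + 1
        excluded_records = last.number_records - n_records
        excluded_subjects = last.number_subjects - n_subjects
    else:
        # The first step excludes nothing by definition
        reason_id = 1
        excluded_records = 0
        excluded_subjects = 0
    attrition.append(AttritionRow(
        cohort_id, reason_id, reason,
        n_records, n_subjects, excluded_records, excluded_subjects,
    ))
    return attrition


# Hypothetical data: three candidate subjects, two of whom have an observation period
cohort_rows = [
    {"cohort_definition_id": 1, "subject_id": 1},
    {"cohort_definition_id": 1, "subject_id": 2},
    {"cohort_definition_id": 1, "subject_id": 3},
]
observed_person_ids = {1, 2}

attrition_rows = []
record_step(attrition_rows, 1, "Initial qualifying events", cohort_rows)

# Exclusion step: keep only subjects with an observation period
kept = [r for r in cohort_rows if r["subject_id"] in observed_person_ids]
record_step(attrition_rows, 1, "Has an observation period", kept)

for row in attrition_rows:
    print(row)
```

The rows in attrition_rows have the same shape as the cohort_attrition table, so they could be converted to an Arrow table and written with cdm.insert_table in the same way as the cohort rows earlier.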
What we learned
- new_cohort_table(cdm, name) creates an empty cohort table plus cohort_set and cohort_attrition in the write schema.
- Cohort rows: cohort_definition_id, subject_id, cohort_start_date, cohort_end_date; insert via backend or cdm.insert_table.
- Attrition: record steps with record_cohort_attrition; cohort_count and attrition read from cohort_attrition when attached.
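To make the link between attrition and counts concrete, the sketch below derives per-cohort counts from attrition rows in plain Python. It assumes that a cohort's count corresponds to its final attrition step (the highest reason_id); that is an assumption about the attrition table pattern, not a statement of how cohort_count is implemented. The function counts_from_attrition and the sample rows are hypothetical.

```python
def counts_from_attrition(attrition_rows):
    """Return {cohort_definition_id: counts} taken from each cohort's last step."""
    final = {}
    for row in attrition_rows:
        cid = row["cohort_definition_id"]
        # Keep the row with the highest reason_id for each cohort
        if cid not in final or row["reason_id"] > final[cid]["reason_id"]:
            final[cid] = row
    return {
        cid: {
            "number_records": row["number_records"],
            "number_subjects": row["number_subjects"],
        }
        for cid, row in final.items()
    }


# Hypothetical attrition rows for two cohorts
attrition_rows = [
    {"cohort_definition_id": 1, "reason_id": 1, "number_records": 3, "number_subjects": 3},
    {"cohort_definition_id": 1, "reason_id": 2, "number_records": 2, "number_subjects": 2},
    {"cohort_definition_id": 2, "reason_id": 1, "number_records": 5, "number_subjects": 4},
]

counts = counts_from_attrition(attrition_rows)
print(counts)
```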
Cleanup
cdm.disconnect()