Cohorts from JSON: CIRCE / Atlas

Load cohort definition JSON from file, render SQL (CIRCE), and materialise a cohort table.

You will learn

  • How to load cohort definition JSON from the repo (e.g. docs-src/assets/cohort_json/)
  • How to render cohort SQL using cdmconnector’s CIRCE-style API (build_cohort_query, render_cohort_sql)
  • How to run the rendered SQL to materialise a cohort table (skeleton)
  • How to use SQLGlot to translate cohort SQL to different dialects (e.g. Postgres) when using another backend

Story question

“How do we go from an Atlas/CIRCE JSON export to a cohort table in our CDM?”


Setup

We use GiBleed and/or synpuf-1k; cohort JSONs live in docs-src/assets/cohort_json/ (e.g. gibleed_simple.json, concept_set_example.json). Paths are relative to repo root or notebook location.

from pathlib import Path
import json
import cdmconnector as cc
import ibis

import os

# Path to cohort JSONs: run from repo root, or set the COHORT_JSON_DIR environment variable
COHORT_JSON_DIR = Path(os.environ.get("COHORT_JSON_DIR", "docs-src/assets/cohort_json")).resolve()
gibleed_json_path = COHORT_JSON_DIR / "gibleed_simple.json"

path = cc.eunomia_dir("GiBleed", cdm_version="5.3")
con = ibis.duckdb.connect(path)
cdm = cc.cdm_from_con(con, cdm_schema="main", write_schema="main", cdm_name="eunomia")

Explore: Load JSON from file

Read the cohort definition JSON (CIRCE/Atlas format).

with open(gibleed_json_path) as f:
    cohort_json_str = f.read()
cohort_expression = json.loads(cohort_json_str)
print("Keys:", list(cohort_expression.keys()))
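Atlas/CIRCE exports typically carry keys such as ConceptSets and PrimaryCriteria. A minimal sketch of summarising the concept sets, using a small inline expression for illustration (a real Atlas export contains many more fields, e.g. InclusionRules and CensoringCriteria):

```python
import json

# Minimal CIRCE-style cohort expression, inlined for illustration only;
# the concept set name here is made up, not from the GiBleed JSON
example_json = """
{
  "ConceptSets": [
    {"id": 0, "name": "GI bleed diagnoses", "expression": {"items": []}}
  ],
  "PrimaryCriteria": {"CriteriaList": []}
}
"""
expression = json.loads(example_json)

# Summarise each concept set by id and name
concept_sets = [(cs["id"], cs["name"]) for cs in expression.get("ConceptSets", [])]
print(concept_sets)  # [(0, 'GI bleed diagnoses')]
```

The same comprehension works on cohort_expression loaded from file, as long as the export has a ConceptSets key.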

Build: Render cohort SQL

Use cdmconnector’s internal CIRCE API: build_cohort_query (requires Circepy) returns SQL with @placeholders; render_cohort_sql replaces placeholders with schema/table names. If Circepy is not installed, the code below catches the resulting error and re-raises NotImplementedError with a clear message.

from cdmconnector._circe import (
    build_cohort_query,
    create_generate_options,
    render_cohort_sql,
)

try:
    opts = create_generate_options(
        cdm_schema="main",
        target_table="cohort",
        result_schema="main",
        cohort_id=1,
        generate_stats=True,
    )
    sql_with_placeholders = build_cohort_query(cohort_json_str, opts)
    rendered_sql = render_cohort_sql(
        sql_with_placeholders,
        cdm_database_schema="main",
        target_database_schema="main",
        target_cohort_table="cohort",
        target_cohort_id=1,
        cohort_stem="cohort",
    )
    print(rendered_sql[:500] + "..." if len(rendered_sql) > 500 else rendered_sql)
except NotImplementedError as e:
    # Circepy not available or API not implemented
    print("Circepy not available:", e)
    raise NotImplementedError(
        "Cohort SQL generation requires Circepy. Install with: pip install 'CDMConnector[duckdb]' or use a cohort_definition_set with a 'sql' column."
    ) from e
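Conceptually, the render step is a string substitution over @placeholders. A toy sketch of what render_cohort_sql does (the SQL below is illustrative, not real CIRCE output, and the real function also handles quoting and the cohort_stem):

```python
# Toy SQL with CIRCE-style @placeholders (illustrative only)
sql = (
    "DELETE FROM @target_database_schema.@target_cohort_table "
    "WHERE cohort_definition_id = @target_cohort_id; "
    "SELECT person_id FROM @cdm_database_schema.person"
)

# Map each placeholder to a concrete schema/table/id
replacements = {
    "@cdm_database_schema": "main",
    "@target_database_schema": "main",
    "@target_cohort_table": "cohort",
    "@target_cohort_id": "1",
}
for placeholder, value in replacements.items():
    sql = sql.replace(placeholder, value)

print(sql)
# DELETE FROM main.cohort WHERE cohort_definition_id = 1; SELECT person_id FROM main.person
```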

Translate SQL with SQLGlot (generic SQL → different dialects)

CIRCE returns SQL in one dialect (e.g. DuckDB). To run the same cohort logic on Postgres, Spark, or another backend, use SQLGlot to translate the SQL. Prefer Ibis for most queries; use SQLGlot when you have existing SQL (e.g. from CIRCE) and need dialect-specific output.

# pip install sqlglot
try:
    import sqlglot
    # Parse as DuckDB (CIRCE default), translate to Postgres for use on PostgreSQL
    if "rendered_sql" in dir() and rendered_sql.strip():
        parsed = sqlglot.parse_one(rendered_sql.strip().split(";")[0], dialect="duckdb")
        postgres_sql = parsed.sql(dialect="postgres")
        print("Translated first statement to Postgres:")
        print(postgres_sql[:400] + "..." if len(postgres_sql) > 400 else postgres_sql)
except ImportError:
    print("SQLGlot not installed. Install with: pip install sqlglot")

Interpret: Run SQL to materialise cohort table (skeleton)

Execute the rendered SQL against the connection to create/insert into the cohort table. Backend-specific (DuckDB: con.raw_sql or similar).

# Execute rendered_sql against the DuckDB connection (run the previous cell first to get rendered_sql)
import re
con = cdm.source.con
raw_sql = getattr(con, "raw_sql", None)
if raw_sql is not None and "rendered_sql" in dir():
    for segment in re.split(r";\s*", rendered_sql):
        # Drop comment-only lines first, so a statement preceded by a "--" comment
        # is not discarded along with the comment
        lines = [ln for ln in segment.splitlines() if not ln.strip().startswith("--")]
        stmt = "\n".join(lines).strip()
        if stmt:
            raw_sql(stmt + ";")
    # Reload table reference and show first rows (cohort table name may include schema/prefix)
    if "cohort" in cdm:
        cc.collect(cdm["cohort"].limit(10))
else:
    # Run the previous cell first to get rendered_sql; or use cc.generate_cohort_set(cdm, cohort_set_df, name="cohort", overwrite=True) to build and run in one step
    if "cohort" in cdm:
        cc.collect(cdm["cohort"].limit(10))
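The statement-splitting step can be exercised on its own with a toy script. Note that a "--" comment attached to a statement should be stripped line by line, not used to discard the whole segment:

```python
import re

# Toy multi-statement script in the shape CIRCE emits (illustrative)
script = """
-- create the cohort table
CREATE TABLE cohort (cohort_definition_id INT, subject_id INT);
INSERT INTO cohort VALUES (1, 42);
"""

statements = []
for segment in re.split(r";\s*", script):
    # Remove comment-only lines, then keep any remaining statement text
    lines = [ln for ln in segment.splitlines() if not ln.strip().startswith("--")]
    stmt = "\n".join(lines).strip()
    if stmt:
        statements.append(stmt)

print(statements)
# ['CREATE TABLE cohort (cohort_definition_id INT, subject_id INT)',
#  'INSERT INTO cohort VALUES (1, 42)']
```

Naive splitting on ";" would also break on semicolons inside string literals; CIRCE output rarely contains those, but sqlglot.parse can split statements robustly if needed.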

Exercises

  • Edit the JSON (e.g. change concept set or primary criteria), save to a new file, rerun render and compare cohort counts.
  • Load concept_set_example.json (concept set only) and document how it fits into a full cohort definition.
  • Compare counts: run the same JSON on GiBleed vs synpuf-1k and summarise differences.

What we learned

  • Load JSON from file (e.g. Atlas export); build_cohort_query (Circepy) turns it into SQL with @placeholders.
  • render_cohort_sql replaces placeholders with cdm_schema, target_cohort_table, target_cohort_id, etc.
  • Run SQL to materialise the cohort table; generate_cohort_set can do both (build + run) from a cohort_definition_set DataFrame.
  • SQLGlot: when you have SQL (e.g. from CIRCE), use sqlglot.parse_one(sql, dialect="duckdb").sql(dialect="postgres") to translate to another backend. Prefer Ibis for most queries; use SQLGlot for existing SQL strings.

Cleanup

cdm.disconnect()