Performance: Pushdown and Materialization

Avoid the common anti-patterns (executing too early, pulling whole tables to pandas); instead compile to SQL, defer execution until the end, and materialize reusable intermediates.

You will learn

  • Anti-patterns: calling .execute() or collect() too early, pulling large tables to pandas
  • Patterns: compile SQL, defer execution until the end
  • How to materialize intermediate results (temp table / snapshot) when supported
  • How to refactor a “bad” pipeline into a lazy pipeline

Story question

“How do we keep work in the database and avoid unnecessary data transfer?”


Setup

We use synpuf-1k so that queries run at a realistic size.

from pathlib import Path
import cdmconnector as cc
import ibis

path = cc.eunomia_dir("synpuf-1k", cdm_version="5.3")
con = ibis.duckdb.connect(path)
cdm = cc.cdm_from_con(con, cdm_schema="main", write_schema="main", cdm_name="eunomia")

Explore: Anti-patterns

Bad: execute early (for example, collect a large table) and then do the remaining work in pandas. Good: build one lazy pipeline and execute once at the end.

# Anti-pattern: pull entire table to pandas then filter in Python
# df = cc.collect(cdm.person)  # unnecessary transfer
# df = df[df["year_of_birth"] >= 1990]  # work done in Python

# Good: filter in the database, then collect only the result
person = cdm.person
expr = person.filter(person.year_of_birth >= 1990).select("person_id", "year_of_birth").limit(10)
cc.collect(expr)

Build: Defer execution and compile SQL

Build a multi-step pipeline (join, filter, project) without executing anything, then inspect the compiled SQL.

person = cdm.person
op = cdm.observation_period
pipeline = (
    person
    .join(op, person.person_id == op.person_id, how="inner")
    .filter(person.year_of_birth >= 1950)
    .select(person.person_id, person.year_of_birth, op.observation_period_start_date, op.observation_period_end_date)
    .limit(100)
)
print(pipeline.compile())
cc.collect(pipeline)

Interpret: Materialize intermediate results

When a sub-result is reused many times, materialize it in the write schema with cdm.insert_table. You can pass an Ibis expression or a pandas/pyarrow table. Use cdm.drop_table to remove tables when no longer needed.

# Materialize an intermediate table in the write schema with insert_table.
# Example: persons born >= 1950, so downstream queries can join to it without recomputing.
person = cdm.person
subset_expr = person.filter(person.year_of_birth >= 1950).select(
    "person_id", "year_of_birth", "gender_concept_id"
)
cdm.insert_table("persons_1950_plus", subset_expr, overwrite=True)

# Use the new table (it is now in cdm)
cc.collect(cdm["persons_1950_plus"].count())

# Drop the table when no longer needed
cdm.drop_table("persons_1950_plus")

Exercises

  • Refactor a “bad” pipeline: take a workflow that does collect() in the middle and rewrite it as one lazy expression + single collect() at the end.
  • Compare compile() output for a query with vs without an unnecessary .limit(1000) in the middle.
  • Use cdm.snapshot() (if available) to capture CDM metadata without pulling large tables.

What we learned

  • Execute only at the end: avoid collect()/execute() until you have the final expression.
  • Pushdown: filter, join, aggregate in the database; use expr.compile() to inspect SQL.
  • Materialize heavy intermediates when reused: use cdm.insert_table to write an Ibis expression or DataFrame into the write schema, then cdm.drop_table when done.

Cleanup

cdm.disconnect()