Database Connection Examples

Reference connection examples for using CDMConnector with common Ibis database backends.

All examples use cdmconnector.cdm_from_con() with an Ibis backend connection.

Prerequisites

Install cdmconnector from GitHub, together with the backend extras you need. This is the closest Python equivalent to R’s devtools::install_github():

pip install "cdmconnector @ git+https://github.com/OHDSI/pyCDMConnector.git"
pip install "cdmconnector[postgres] @ git+https://github.com/OHDSI/pyCDMConnector.git"
pip install "cdmconnector[snowflake] @ git+https://github.com/OHDSI/pyCDMConnector.git"
pip install "cdmconnector[bigquery] @ git+https://github.com/OHDSI/pyCDMConnector.git"

Some backends, such as SQL Server or Databricks, may also require the corresponding Ibis backend dependencies in your environment.
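For those backends, one option is to install the Ibis extras directly. The extra names below follow ibis-framework’s packaging and are an assumption here; check the Ibis installation documentation for your version:

```shell
# Install Ibis together with the backend driver(s) you need.
# Extra names may differ between ibis-framework releases.
pip install "ibis-framework[mssql]"       # SQL Server
pip install "ibis-framework[databricks]"  # Databricks
```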

Then import the package:

import cdmconnector as cc
import ibis

PostgreSQL

Connect to PostgreSQL with the Ibis Postgres backend:

import cdmconnector as cc
import ibis
import os

con = ibis.postgres.connect(
    host=os.getenv("CDM5_POSTGRESQL_HOST", "localhost"),
    port=int(os.getenv("CDM5_POSTGRESQL_PORT", "5432")),
    database=os.getenv("CDM5_POSTGRESQL_DBNAME"),
    user=os.getenv("CDM5_POSTGRESQL_USER"),
    password=os.getenv("CDM5_POSTGRESQL_PASSWORD"),
)

cdm = cc.cdm_from_con(
    con,
    cdm_schema=os.getenv("CDM5_POSTGRESQL_CDM_SCHEMA"),
    write_schema=os.getenv("CDM5_POSTGRESQL_SCRATCH_SCHEMA"),
    cdm_name="Postgres CDM",
)

con.disconnect()

Redshift

Redshift uses the PostgreSQL wire protocol, so the Ibis Postgres backend is used here too:

import cdmconnector as cc
import ibis
import os

con = ibis.postgres.connect(
    host=os.getenv("CDM5_REDSHIFT_HOST", "localhost"),
    port=int(os.getenv("CDM5_REDSHIFT_PORT", "5439")),
    database=os.getenv("CDM5_REDSHIFT_DBNAME"),
    user=os.getenv("CDM5_REDSHIFT_USER"),
    password=os.getenv("CDM5_REDSHIFT_PASSWORD"),
)

cdm = cc.cdm_from_con(
    con,
    cdm_schema=os.getenv("CDM5_REDSHIFT_CDM_SCHEMA"),
    write_schema=os.getenv("CDM5_REDSHIFT_SCRATCH_SCHEMA"),
    cdm_name="Redshift CDM",
)

con.disconnect()

SQL Server

For SQL Server, cdm_schema and write_schema are often best passed as dicts of the form { "catalog": ..., "schema": ... }:

import cdmconnector as cc
import ibis
import os

con = ibis.mssql.connect(
    host=os.getenv("CDM5_SQL_SERVER_SERVER", "localhost"),
    port=int(os.getenv("CDM5_SQL_SERVER_PORT", "1433")),
    database=os.getenv("CDM5_SQL_SERVER_CDM_DATABASE"),
    user=os.getenv("CDM5_SQL_SERVER_USER"),
    password=os.getenv("CDM5_SQL_SERVER_PASSWORD"),
)

cdm = cc.cdm_from_con(
    con,
    cdm_schema={
        "catalog": "cdmv54",
        "schema": "dbo",
    },
    write_schema={
        "catalog": "tempdb",
        "schema": "dbo",
    },
    cdm_name="SQL Server CDM",
)

con.disconnect()

If you store fully-qualified schemas in environment variables in the form catalog.schema, you can split them before calling cdm_from_con():

import os

catalog, schema = os.environ["CDM5_SQL_SERVER_CDM_SCHEMA"].split(".", 1)
scratch_catalog, scratch_schema = os.environ["CDM5_SQL_SERVER_SCRATCH_SCHEMA"].split(".", 1)

cdm = cc.cdm_from_con(
    con,
    cdm_schema={"catalog": catalog, "schema": schema},
    write_schema={"catalog": scratch_catalog, "schema": scratch_schema},
)
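If this splitting appears in several places (the Snowflake example below repeats it), it can be factored into a small helper. The function name and validation here are illustrative only, not part of cdmconnector:

```python
def split_qualified_schema(value: str) -> dict:
    """Split a 'catalog.schema' string into the dict form cdm_from_con() accepts.

    Only the first dot is treated as the separator, so schema names that
    themselves contain dots are preserved.
    """
    catalog, sep, schema = value.partition(".")
    if not sep or not catalog or not schema:
        raise ValueError(f"expected 'catalog.schema', got {value!r}")
    return {"catalog": catalog, "schema": schema}
```

For example, split_qualified_schema("cdmv54.dbo") returns {"catalog": "cdmv54", "schema": "dbo"}.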

Snowflake

Snowflake also typically uses catalog/schema pairs:

import cdmconnector as cc
import ibis
import os

con = ibis.snowflake.connect(
    account=os.getenv("SNOWFLAKE_ACCOUNT"),
    user=os.getenv("SNOWFLAKE_USER"),
    password=os.getenv("SNOWFLAKE_PASSWORD"),
    database=os.getenv("SNOWFLAKE_DATABASE"),
    warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
)

cdm = cc.cdm_from_con(
    con,
    cdm_schema={
        "catalog": "OMOP_SYNTHETIC_DATASET",
        "schema": "CDM53",
    },
    write_schema={
        "catalog": "ATLAS",
        "schema": "RESULTS",
    },
    cdm_name="Snowflake CDM",
)

con.disconnect()

If your environment variables already contain catalog.schema values:

import os

cdm_catalog, cdm_schema = os.environ["SNOWFLAKE_CDM_SCHEMA"].split(".", 1)
write_catalog, write_schema = os.environ["SNOWFLAKE_SCRATCH_SCHEMA"].split(".", 1)

cdm = cc.cdm_from_con(
    con,
    cdm_schema={"catalog": cdm_catalog, "schema": cdm_schema},
    write_schema={"catalog": write_catalog, "schema": write_schema},
)

Databricks / Spark

Connect to Databricks using the Ibis Databricks backend:

import cdmconnector as cc
import ibis
import os

con = ibis.databricks.connect(
    server_hostname=os.getenv("DATABRICKS_HOST"),
    http_path=os.getenv("DATABRICKS_HTTPPATH"),
    access_token=os.getenv("DATABRICKS_TOKEN"),
)

cdm = cc.cdm_from_con(
    con,
    cdm_schema=os.getenv("DATABRICKS_CDM_SCHEMA", "gibleed"),
    write_schema=os.getenv("DATABRICKS_SCRATCH_SCHEMA", "scratch"),
    cdm_name="Databricks CDM",
)

con.disconnect()

DuckDB

DuckDB is useful for local development and example datasets such as Eunomia:

import cdmconnector as cc
import ibis

path = cc.eunomia_dir("GiBleed", cdm_version="5.3")
con = ibis.duckdb.connect(path)

cdm = cc.cdm_from_con(
    con,
    cdm_schema="main",
    write_schema="main",
    cdm_name="GiBleed",
)

con.disconnect()

BigQuery

BigQuery support is also available through Ibis:

import cdmconnector as cc
import ibis
import os

con = ibis.bigquery.connect(
    project_id=os.getenv("BIGQUERY_PROJECT_ID"),
)

cdm = cc.cdm_from_con(
    con,
    cdm_schema=os.getenv("BIGQUERY_CDM_SCHEMA"),
    write_schema=os.getenv("BIGQUERY_SCRATCH_SCHEMA"),
    cdm_name="BigQuery CDM",
)

con.disconnect()

Notes

  • Use write_schema for cohort tables and other materialized outputs created by compute() or generate_cohort_set().
  • For SQL Server and Snowflake, prefer dict schemas when you need explicit catalog plus schema.
  • The environment variable names shown here match the package’s test fixtures and examples.
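Since each backend reads a handful of environment variables, it can help to gather them into one place so missing values fail early with a clear message. A minimal sketch using the PostgreSQL variable names from above (the dataclass itself is not part of cdmconnector):

```python
import os
from dataclasses import dataclass


@dataclass
class PostgresSettings:
    host: str
    port: int
    database: str
    user: str
    password: str

    @classmethod
    def from_env(cls) -> "PostgresSettings":
        def require(name: str) -> str:
            # Fail fast with a clear message if a required variable is unset.
            value = os.getenv(name)
            if value is None:
                raise KeyError(f"environment variable {name} is not set")
            return value

        return cls(
            host=os.getenv("CDM5_POSTGRESQL_HOST", "localhost"),
            port=int(os.getenv("CDM5_POSTGRESQL_PORT", "5432")),
            database=require("CDM5_POSTGRESQL_DBNAME"),
            user=require("CDM5_POSTGRESQL_USER"),
            password=require("CDM5_POSTGRESQL_PASSWORD"),
        )
```

The resulting settings object can then feed ibis.postgres.connect() directly, keeping the connection code free of os.getenv() calls.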