Database Connection Examples
The following connection examples are provided for reference.
All examples use cdmconnector.cdm_from_con() with an Ibis backend connection.
Prerequisites
Install cdmconnector from GitHub plus the backend extras you need. This is the closest Python equivalent to R’s devtools::install_github():
pip install "cdmconnector @ git+https://github.com/OHDSI/pyCDMConnector.git"
pip install "cdmconnector[postgres] @ git+https://github.com/OHDSI/pyCDMConnector.git"
pip install "cdmconnector[snowflake] @ git+https://github.com/OHDSI/pyCDMConnector.git"
pip install "cdmconnector[bigquery] @ git+https://github.com/OHDSI/pyCDMConnector.git"

Some backends, such as SQL Server or Databricks, may also require the corresponding Ibis backend dependencies in your environment.
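Before connecting, it can help to confirm that the optional driver modules actually resolved in your environment. A minimal sketch using only the standard library; the module names checked here are illustrative, so substitute whatever your backend's documentation requires:

```python
import importlib.util

def backend_available(name: str) -> bool:
    """Return True if the named module can be imported in this environment."""
    try:
        return importlib.util.find_spec(name) is not None
    except ModuleNotFoundError:
        # find_spec raises if a dotted name's parent package is missing
        return False

# Illustrative module names; check your backend's docs for the exact requirement.
for module in ("ibis", "psycopg2", "snowflake.connector"):
    status = "found" if backend_available(module) else "missing"
    print(f"{module}: {status}")
```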
Then import the package:
import cdmconnector as cc
import ibis

PostgreSQL
Connect to PostgreSQL with the Ibis Postgres backend:
import cdmconnector as cc
import ibis
import os
con = ibis.postgres.connect(
host=os.getenv("CDM5_POSTGRESQL_HOST", "localhost"),
port=int(os.getenv("CDM5_POSTGRESQL_PORT", "5432")),
database=os.getenv("CDM5_POSTGRESQL_DBNAME"),
user=os.getenv("CDM5_POSTGRESQL_USER"),
password=os.getenv("CDM5_POSTGRESQL_PASSWORD"),
)
cdm = cc.cdm_from_con(
con,
cdm_schema=os.getenv("CDM5_POSTGRESQL_CDM_SCHEMA"),
write_schema=os.getenv("CDM5_POSTGRESQL_SCRATCH_SCHEMA"),
cdm_name="Postgres CDM",
)
con.disconnect()

Redshift
Redshift uses the PostgreSQL wire protocol, so the Ibis Postgres backend is used here too:
import cdmconnector as cc
import ibis
import os
con = ibis.postgres.connect(
host=os.getenv("CDM5_REDSHIFT_HOST", "localhost"),
port=int(os.getenv("CDM5_REDSHIFT_PORT", "5439")),
database=os.getenv("CDM5_REDSHIFT_DBNAME"),
user=os.getenv("CDM5_REDSHIFT_USER"),
password=os.getenv("CDM5_REDSHIFT_PASSWORD"),
)
cdm = cc.cdm_from_con(
con,
cdm_schema=os.getenv("CDM5_REDSHIFT_CDM_SCHEMA"),
write_schema=os.getenv("CDM5_REDSHIFT_SCRATCH_SCHEMA"),
cdm_name="Redshift CDM",
)
con.disconnect()

SQL Server
For SQL Server, cdm_schema and write_schema are often best passed as { "catalog": ..., "schema": ... }:
import cdmconnector as cc
import ibis
import os
con = ibis.mssql.connect(
host=os.getenv("CDM5_SQL_SERVER_SERVER", "localhost"),
port=int(os.getenv("CDM5_SQL_SERVER_PORT", "1433")),
database=os.getenv("CDM5_SQL_SERVER_CDM_DATABASE"),
user=os.getenv("CDM5_SQL_SERVER_USER"),
password=os.getenv("CDM5_SQL_SERVER_PASSWORD"),
)
cdm = cc.cdm_from_con(
con,
cdm_schema={
"catalog": "cdmv54",
"schema": "dbo",
},
write_schema={
"catalog": "tempdb",
"schema": "dbo",
},
cdm_name="SQL Server CDM",
)
con.disconnect()

If you store fully-qualified schemas in environment variables such as catalog.schema, you can split them before calling cdm_from_con():
import os
catalog, schema = os.environ["CDM5_SQL_SERVER_CDM_SCHEMA"].split(".", 1)
scratch_catalog, scratch_schema = os.environ["CDM5_SQL_SERVER_SCRATCH_SCHEMA"].split(".", 1)
cdm = cc.cdm_from_con(
con,
cdm_schema={"catalog": catalog, "schema": schema},
write_schema={"catalog": scratch_catalog, "schema": scratch_schema},
)

Snowflake
Snowflake also typically uses catalog/schema pairs:
import cdmconnector as cc
import ibis
import os
con = ibis.snowflake.connect(
account=os.getenv("SNOWFLAKE_ACCOUNT"),
user=os.getenv("SNOWFLAKE_USER"),
password=os.getenv("SNOWFLAKE_PASSWORD"),
database=os.getenv("SNOWFLAKE_DATABASE"),
warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
)
cdm = cc.cdm_from_con(
con,
cdm_schema={
"catalog": "OMOP_SYNTHETIC_DATASET",
"schema": "CDM53",
},
write_schema={
"catalog": "ATLAS",
"schema": "RESULTS",
},
cdm_name="Snowflake CDM",
)
con.disconnect()

If your environment variables already contain catalog.schema values:
import os
cdm_catalog, cdm_schema = os.environ["SNOWFLAKE_CDM_SCHEMA"].split(".", 1)
write_catalog, write_schema = os.environ["SNOWFLAKE_SCRATCH_SCHEMA"].split(".", 1)
cdm = cc.cdm_from_con(
con,
cdm_schema={"catalog": cdm_catalog, "schema": cdm_schema},
write_schema={"catalog": write_catalog, "schema": write_schema},
)

Databricks / Spark
Connect to Databricks using the Ibis Databricks backend:
import cdmconnector as cc
import ibis
import os
con = ibis.databricks.connect(
server_hostname=os.getenv("DATABRICKS_HOST"),
http_path=os.getenv("DATABRICKS_HTTPPATH"),
access_token=os.getenv("DATABRICKS_TOKEN"),
)
cdm = cc.cdm_from_con(
con,
cdm_schema=os.getenv("DATABRICKS_CDM_SCHEMA", "gibleed"),
write_schema=os.getenv("DATABRICKS_SCRATCH_SCHEMA", "scratch"),
cdm_name="Databricks CDM",
)
con.disconnect()

DuckDB
DuckDB is useful for local development and example datasets such as Eunomia:
import cdmconnector as cc
import ibis
path = cc.eunomia_dir("GiBleed", cdm_version="5.3")
con = ibis.duckdb.connect(path)
cdm = cc.cdm_from_con(
con,
cdm_schema="main",
write_schema="main",
cdm_name="GiBleed",
)
con.disconnect()

BigQuery
BigQuery support is also available through Ibis:
import cdmconnector as cc
import ibis
import os
con = ibis.bigquery.connect(
project_id=os.getenv("BIGQUERY_PROJECT_ID"),
)
cdm = cc.cdm_from_con(
con,
cdm_schema=os.getenv("BIGQUERY_CDM_SCHEMA"),
write_schema=os.getenv("BIGQUERY_SCRATCH_SCHEMA"),
cdm_name="BigQuery CDM",
)
con.disconnect()

Notes
- Use write_schema for cohort tables and other materialized outputs created by compute() or generate_cohort_set().
- For SQL Server and Snowflake, prefer dict schemas when you need an explicit catalog plus schema.
- The environment variable names shown here match the package’s test fixtures and examples.
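The examples above call con.disconnect() explicitly, which leaks the connection if an error is raised mid-analysis. A small sketch of a context manager that guarantees disconnection; this helper is not part of cdmconnector and only assumes the backend object exposes a disconnect() method, as the Ibis backends shown here do:

```python
from contextlib import contextmanager

@contextmanager
def connected(con):
    """Yield the connection and always call disconnect() on exit."""
    try:
        yield con
    finally:
        con.disconnect()

# Usage with any of the backends shown above, e.g.:
# with connected(ibis.duckdb.connect(path)) as con:
#     cdm = cc.cdm_from_con(con, cdm_schema="main", write_schema="main")
#     ...
```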