Skip to content

pgvector_concept_searcher

PgvectorConceptSearcher

Bases: AbstractConceptSearcher

A concept searcher that uses pgvector in a PostgreSQL database to find concepts based on embedding vectors.

Source code in src/ariadne/vector_search/pgvector_concept_searcher.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
class PgvectorConceptSearcher(AbstractConceptSearcher):
    """
    A concept searcher that uses pgvector in a PostgreSQL database to find concepts based on embedding vectors.

    May be used as a context manager; the database connection is closed on exit.
    """

    def __init__(self, for_evaluation: bool = False, include_synonyms: bool = True, include_mapped_terms: bool = True):
        """
        Opens a PostgreSQL connection (from the ``VOCAB_CONNECTION_STRING``
        environment variable) and configures pgvector HNSW search parameters
        for the session.

        Args:
            for_evaluation: When True, exclude a fixed set of concept classes and
                source vocabularies from search results (evaluation mode).
            include_synonyms: When False, restrict matching to primary names
                (``term_type = 'Name'``) instead of all synonym embeddings.
            include_mapped_terms: When True, also search non-standard source
                concepts and follow their 'Maps to' relationship to standard
                concepts, in addition to searching standard concepts directly.
        """
        self.for_evaluation = for_evaluation
        self.include_synonyms = include_synonyms
        self.include_mapped_terms = include_mapped_terms
        # Running total of embedding-API cost in USD (see get_total_cost).
        self.cost = 0.0

        if for_evaluation:
            self.concept_classes_to_ignore = [
                "Disposition",
                "Morph Abnormality",
                "Organism",
                "Qualifier Value",
                "Substance",
                "ICDO Condition",
            ]
            self.vocabularies_to_ignore = [
                "ICD9CM",
                "ICD10CM",
                "ICD10",
                "ICD10CN",
                "ICD10GM",
                "CIM10",
                "ICDO3",
                "KCD7",
                "Read",
            ]
            print("ConceptSearcher initialized in evaluation mode.")
        else:
            self.concept_classes_to_ignore = None
            self.vocabularies_to_ignore = None

        # Strip the SQLAlchemy-style "+psycopg" driver suffix so the plain
        # psycopg.connect() accepts the connection string.
        connection = psycopg.connect(get_environment_variable("VOCAB_CONNECTION_STRING").replace("+psycopg", ""))
        register_vector(connection)
        with connection.cursor() as cur:
            cur.execute("SET hnsw.ef_search = 1000")
            cur.execute("SET hnsw.iterative_scan = relaxed_order")
        # Commit so the session-level SET commands survive later rollbacks.
        # psycopg3 runs them inside an implicit transaction; without this
        # commit, the first error/rollback on the connection would silently
        # revert both settings for the rest of the session.
        connection.commit()
        self.connection = connection

    def __enter__(self):
        """Enter the context manager; returns this searcher."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the database connection when leaving the context."""
        self.close()
        return False

    def close(self):
        """Close the underlying database connection."""
        self.connection.close()

    def _search_pgvector(self, source_vector: np.ndarray, limit: int) -> List:
        """
        Runs a nearest-neighbor search for ``source_vector`` against the
        embedding table.

        Args:
            source_vector: The query embedding vector.
            limit: Maximum number of rows to return.

        Returns:
            Up to ``limit`` ``(concept_id, concept_name, relevance_score)``
            tuples ordered by ascending cosine distance (``<=>``).

        NOTE: schema/table names and the ignore lists are interpolated into
        the SQL text. They come from environment variables and class-internal
        constants (trusted configuration), not from user input; the query
        vector itself is always passed as a bound parameter.
        """
        if self.concept_classes_to_ignore is None:
            # NOT IN ('dummy') matches everything, keeping the query shape stable.
            ignore_string_class = "'dummy'"
        else:
            ignore_string_class = ", ".join(f"'{y}'" for y in self.concept_classes_to_ignore)

        if self.include_synonyms:
            term_type_clause = ""
        else:
            term_type_clause = "AND vectors.term_type = 'Name'"

        vocabulary_schema = get_environment_variable("VOCAB_SCHEMA")
        vector_table = get_environment_variable("VOCAB_VECTOR_TABLE")

        if self.include_mapped_terms:
            if self.vocabularies_to_ignore is None:
                ignore_string_vocab = "'dummy'"
            else:
                ignore_string_vocab = ", ".join(f"'{x}'" for x in self.vocabularies_to_ignore)
            # Two candidate sets UNIONed together: (1) source concepts followed
            # through 'Maps to' to their standard concept, (2) standard concepts
            # matched directly. MIN(relevance_score) dedupes synonym hits.
            query = f"""
                WITH target_concept AS (
                    SELECT concept_id,
                        concept_name,
                        MIN(relevance_score) AS relevance_score
                    FROM (
                        (
                            SELECT concept.concept_id,
                                concept.concept_name,
                                embedding_vector <=> %s AS relevance_score
                            FROM {vocabulary_schema}.{vector_table} vectors
                            INNER JOIN {vocabulary_schema}.concept source_concept
                                ON vectors.concept_id = source_concept.concept_id
                            INNER JOIN {vocabulary_schema}.concept_relationship
                                ON vectors.concept_id = concept_relationship.concept_id_1
                            INNER JOIN {vocabulary_schema}.concept
                                ON concept_relationship.concept_id_2 = concept.concept_id
                            WHERE relationship_id = 'Maps to'
                                AND source_concept.vocabulary_id NOT IN ({ignore_string_vocab})
                                AND concept.concept_class_id NOT IN ({ignore_string_class})
                                {term_type_clause}
                            ORDER BY embedding_vector <=> %s
                            LIMIT {limit * 4} -- May have duplicates due to synonyms
                        )

                        UNION ALL

                        (
                            SELECT concept.concept_id,
                                concept.concept_name,
                                embedding_vector <=> %s AS relevance_score
                            FROM {vocabulary_schema}.{vector_table} vectors
                            INNER JOIN {vocabulary_schema}.concept
                                ON vectors.concept_id = concept.concept_id
                            WHERE standard_concept = 'S'
                                AND concept.concept_class_id NOT IN ({ignore_string_class})
                                {term_type_clause}
                            ORDER BY embedding_vector <=> %s
                            LIMIT {limit * 4} -- May have duplicates due to synonyms
                        )
                    ) tmp
                    GROUP BY concept_id,
                        concept_name
                )
                SELECT target_concept.concept_id,
                    target_concept.concept_name,
                    target_concept.relevance_score
                FROM target_concept
                ORDER BY relevance_score
                LIMIT {limit};
            """
            with self.connection.cursor() as cur:
                cur.execute(query, (source_vector, source_vector, source_vector, source_vector))
                results = cur.fetchall()
        else:
            # Standard concepts only; MIN(relevance_score) dedupes synonym hits.
            query = f"""
                WITH target_concept AS (
                    SELECT concept_id,
                        concept_name,
                        MIN(relevance_score) AS relevance_score
                    FROM (
                        SELECT concept.concept_id,
                            concept.concept_name,
                            embedding_vector <=> %s AS relevance_score
                        FROM {vocabulary_schema}.{vector_table} vectors
                        INNER JOIN {vocabulary_schema}.concept
                            ON vectors.concept_id = concept.concept_id
                        WHERE standard_concept = 'S'
                            AND concept.concept_class_id NOT IN ({ignore_string_class})
                            {term_type_clause}
                        ORDER BY embedding_vector <=> %s
                        LIMIT {limit * 4} -- May have duplicates due to synonyms
                    ) tmp
                    GROUP BY concept_id,
                        concept_name
                )
                SELECT target_concept.concept_id,
                    target_concept.concept_name,
                    target_concept.relevance_score
                FROM target_concept
                ORDER BY relevance_score
                LIMIT {limit};
            """
            with self.connection.cursor() as cur:
                cur.execute(query, (source_vector, source_vector))
                results = cur.fetchall()

        return results

    def search_term(self, term: str, limit: int = 25) -> Optional[pd.DataFrame]:
        """
        Searches for concepts matching the given term.

        Args:
            term: The clinical term to search for.
            limit: The maximum number of results to return.

        Returns:
            A DataFrame containing the matching concepts, or None if no matches are found.
        """
        vectors_with_usage = get_embedding_vectors([term])
        self.cost = self.cost + vectors_with_usage["usage"]["total_cost_usd"]
        vector = vectors_with_usage["embeddings"][0]
        results = self._search_pgvector(vector, limit)
        if not results:
            return None
        df = pd.DataFrame(
            results,
            columns=[
                "concept_id",
                "concept_name",
                "score",
            ],
        )
        return df

    def search_terms(
            self,
            df: pd.DataFrame,
            term_column: str,
            matched_concept_id_column: str = "matched_concept_id",
            matched_concept_name_column: str = "matched_concept_name",
            match_score_column: str = "match_score",
            match_rank_column: str = "match_rank",
            limit: int = 25,
            return_embeddings: bool = False,
    ):
        """
        Searches for concepts matching terms in a DataFrame column.

        Args:
            df: DataFrame containing the terms to search for.
            term_column: Name of the column with terms to search.
            matched_concept_id_column: Name of the column to store matched concept IDs.
            matched_concept_name_column: Name of the column to store matched concept names.
            match_score_column: Name of the column to store match scores.
            match_rank_column: Name of the column to store match ranks.
            limit: The maximum number of results to return for each term.
            return_embeddings: When True, also return a ``dict[term -> np.ndarray]``
                mapping each unique source term to its embedding vector.  The
                caller can pass these vectors to
                ``find_attributes_two_stage(..., precomputed_embedding=...)`` to
                skip the duplicate reference-retrieval embedding call in Step 2.

        Returns:
            When *return_embeddings* is False (default): a DataFrame containing
            the same columns as the input dataframe plus the matching concepts
            for each term (multiple rows per input term).

            When *return_embeddings* is True: a ``(results_df, term_to_vector)``
            tuple where ``term_to_vector`` is ``dict[str, np.ndarray]``.
        """

        terms = df[term_column].tolist()
        orig_cols = list(df.columns)
        match_cols = [
            matched_concept_id_column,
            matched_concept_name_column,
            match_score_column,
        ]

        # Guard the empty case: skip the (paid) embedding call entirely and
        # avoid pd.concat([]) raising ValueError further down.
        if not terms:
            empty = pd.DataFrame(columns=orig_cols + match_cols + [match_rank_column])
            if return_embeddings:
                return empty, {}
            return empty

        vectors_with_usage = get_embedding_vectors(terms)
        self.cost = self.cost + vectors_with_usage["usage"]["total_cost_usd"]
        vectors = vectors_with_usage["embeddings"]

        df = df.reset_index(drop=True)
        all_results = []
        term_to_vector: dict[str, np.ndarray] = {}
        for index, row in df.iterrows():
            term = row[term_column]
            vector = vectors[index]
            if return_embeddings:
                term_to_vector[term] = vector
            results = self._search_pgvector(vector, limit=limit)
            results = pd.DataFrame(results, columns=match_cols)
            # Rank 1 = best match (lowest cosine distance).
            results[match_rank_column] = range(1, len(results) + 1)
            new_columns = list(results.columns)
            # Broadcast every input column onto the result rows (this also
            # carries term_column, since it is one of the input columns).
            for col in orig_cols:
                results[col] = row[col]
            results = results[orig_cols + new_columns]
            all_results.append(results)

        all_results = pd.concat(all_results)
        if return_embeddings:
            return all_results, term_to_vector
        return all_results

    def get_total_cost(self) -> float:
        """
        Returns the total cost incurred for embedding vector calls

        Returns:
            Total cost in USD.
        """

        return self.cost

get_total_cost()

Returns the total cost incurred for embedding vector calls

Returns:

Type Description
float

Total cost in USD.

Source code in src/ariadne/vector_search/pgvector_concept_searcher.py
290
291
292
293
294
295
296
297
298
def get_total_cost(self) -> float:
    """
    Returns the total cost incurred for embedding vector calls

    Returns:
        Total cost in USD.
    """

    return self.cost

search_term(term, limit=25)

Searches for concepts matching the given term.

Parameters:

Name Type Description Default
term str

The clinical term to search for.

required
limit int

The maximum number of results to return.

25

Returns:

Type Description
Optional[DataFrame]

A DataFrame containing the matching concepts, or None if no matches are found.

Source code in src/ariadne/vector_search/pgvector_concept_searcher.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def search_term(self, term: str, limit: int = 25) -> Optional[pd.DataFrame]:
    """
    Searches for concepts matching the given term.

    Args:
        term: The clinical term to search for.
        limit: The maximum number of results to return.

    Returns:
        A DataFrame containing the matching concepts, or None if no matches are found.
    """
    vectors_with_usage = get_embedding_vectors([term])
    self.cost = self.cost + vectors_with_usage["usage"]["total_cost_usd"]
    vector = vectors_with_usage["embeddings"][0]
    results = self._search_pgvector(vector, limit)
    if not results:
        return None
    df = pd.DataFrame(
        results,
        columns=[
            "concept_id",
            "concept_name",
            "score",
        ],
    )
    return df

search_terms(df, term_column, matched_concept_id_column='matched_concept_id', matched_concept_name_column='matched_concept_name', match_score_column='match_score', match_rank_column='match_rank', limit=25, return_embeddings=False)

Searches for concepts matching terms in a DataFrame column.

Parameters:

Name Type Description Default
df DataFrame

DataFrame containing the terms to search for.

required
term_column str

Name of the column with terms to search.

required
matched_concept_id_column str

Name of the column to store matched concept IDs.

'matched_concept_id'
matched_concept_name_column str

Name of the column to store matched concept names.

'matched_concept_name'
match_score_column str

Name of the column to store match scores.

'match_score'
match_rank_column str

Name of the column to store match ranks.

'match_rank'
limit int

The maximum number of results to return for each term.

25
return_embeddings bool

When True, also return a dict[term -> np.ndarray] mapping each unique source term to its embedding vector. The caller can pass these vectors to find_attributes_two_stage(..., precomputed_embedding=...) to skip the duplicate reference-retrieval embedding call in Step 2.

False

Returns:

Type Description

When return_embeddings is False (default): a DataFrame containing

the same columns as the input dataframe plus the matching concepts

for each term (multiple rows per input term).

When return_embeddings is True: a (results_df, term_to_vector)

tuple where term_to_vector is dict[str, np.ndarray].

Source code in src/ariadne/vector_search/pgvector_concept_searcher.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
def search_terms(
        self,
        df: pd.DataFrame,
        term_column: str,
        matched_concept_id_column: str = "matched_concept_id",
        matched_concept_name_column: str = "matched_concept_name",
        match_score_column: str = "match_score",
        match_rank_column: str = "match_rank",
        limit: int = 25,
        return_embeddings: bool = False,
):
    """
    Searches for concepts matching terms in a DataFrame column.

    Args:
        df: DataFrame containing the terms to search for.
        term_column: Name of the column with terms to search.
        matched_concept_id_column: Name of the column to store matched concept IDs.
        matched_concept_name_column: Name of the column to store matched concept names.
        match_score_column: Name of the column to store match scores.
        match_rank_column: Name of the column to store match ranks.
        limit: The maximum number of results to return for each term.
        return_embeddings: When True, also return a ``dict[term -> np.ndarray]``
            mapping each unique source term to its embedding vector.  The
            caller can pass these vectors to
            ``find_attributes_two_stage(..., precomputed_embedding=...)`` to
            skip the duplicate reference-retrieval embedding call in Step 2.

    Returns:
        When *return_embeddings* is False (default): a DataFrame containing
        the same columns as the input dataframe plus the matching concepts
        for each term (multiple rows per input term).

        When *return_embeddings* is True: a ``(results_df, term_to_vector)``
        tuple where ``term_to_vector`` is ``dict[str, np.ndarray]``.
    """

    terms = df[term_column].tolist()
    vectors_with_usage = get_embedding_vectors(terms)
    self.cost = self.cost + vectors_with_usage["usage"]["total_cost_usd"]
    vectors = vectors_with_usage["embeddings"]

    df = df.reset_index(drop=True)
    all_results = []
    term_to_vector: dict[str, np.ndarray] = {}
    for index, row in df.iterrows():
        term = row[term_column]
        vector = vectors[index]
        if return_embeddings:
            term_to_vector[term] = vector
        results = self._search_pgvector(vector, limit=limit)
        results = pd.DataFrame(
            results,
            columns=[
                matched_concept_id_column,
                matched_concept_name_column,
                match_score_column,
            ],
        )
        results[match_rank_column] = range(1, len(results) + 1)
        orig_cols = list(df.columns)
        new_columns = list(results.columns)
        results[term_column] = term
        for col in df.columns:
            results[col] = row[col]
        results = results[orig_cols + new_columns]
        all_results.append(results)

    all_results = pd.concat(all_results)
    if return_embeddings:
        return all_results, term_to_vector
    return all_results