Skip to content

pgvector_concept_searcher

PgvectorConceptSearcher

Bases: AbstractConceptSearcher

A concept searcher that uses pgvector in a PostgreSQL database to find concepts based on embedding vectors.

Source code in src/ariadne/vector_search/pgvector_concept_searcher.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
class PgvectorConceptSearcher(AbstractConceptSearcher):

    """
    A concept searcher that uses pgvector in a PostgreSQL database to find concepts based on embedding vectors.
    """

    def __init__(self, for_evaluation: bool = False, include_synonyms: bool = True, include_mapped_terms: bool = True):
        self.for_evaluation = for_evaluation
        self.include_synonyms = include_synonyms
        self.include_mapped_terms = include_mapped_terms
        self.cost = 0.0

        if for_evaluation:
            self.concept_classes_to_ignore = [
                "Disposition",
                "Morph Abnormality",
                "Organism",
                "Qualifier Value",
                "Substance",
                "ICDO Condition",
            ]
            self.vocabularies_to_ignore = [
                "ICD9CM",
                "ICD10CM",
                "ICD10",
                "ICD10CN",
                "ICD10GM",
                "CIM10",
                "ICDO3",
                "KCD7",
                "Read",
            ]
            print("ConceptSearcher initialized in evaluation mode.")
        else:
            self.concept_classes_to_ignore = None
            self.vocabularies_to_ignore = None

        connection = psycopg.connect(os.getenv("vocab_connection_string").replace("+psycopg", ""))
        register_vector(connection)
        with connection.cursor() as cur:
            cur.execute("SET hnsw.ef_search = 1000")
            cur.execute("SET hnsw.iterative_scan = relaxed_order")
        self.connection = connection

    def close(self):
        self.connection.close()

    def _search_pgvector(self, source_vector: np.ndarray, limit: int) -> List:
        if self.concept_classes_to_ignore is None:
            ignore_string_class = "'dummy'"
        else:
            ignore_string_class = ", ".join(f"'{y}'" for y in self.concept_classes_to_ignore)

        if self.include_synonyms:
            term_type_clause = ""
        else:
            term_type_clause = "AND vectors.term_type = 'Name'"

        vocabulary_schema = get_environment_variable("VOCAB_SCHEMA")
        vector_table = get_environment_variable("VOCAB_VECTOR_TABLE")

        if self.include_mapped_terms:
            if self.vocabularies_to_ignore is None:
                ignore_string_vocab = "'dummy'"
            else:
                ignore_string_vocab = ", ".join(f"'{x}'" for x in self.vocabularies_to_ignore)
            query = f"""
                WITH target_concept AS (
                    SELECT concept_id,
                        concept_name,
                        MIN(relevance_score) AS relevance_score
                    FROM (
                        (
                            SELECT concept.concept_id,
                                concept.concept_name,
                                embedding_vector <=> %s AS relevance_score
                            FROM {vocabulary_schema}.{vector_table} vectors
                            INNER JOIN {vocabulary_schema}.concept source_concept
                                ON vectors.concept_id = source_concept.concept_id
                            INNER JOIN {vocabulary_schema}.concept_relationship
                                ON vectors.concept_id = concept_relationship.concept_id_1
                            INNER JOIN {vocabulary_schema}.concept
                                ON concept_relationship.concept_id_2 = concept.concept_id
                            WHERE relationship_id = 'Maps to'
                                AND source_concept.vocabulary_id NOT IN ({ignore_string_vocab})
                                AND concept.concept_class_id NOT IN ({ignore_string_class})
                                {term_type_clause}
                            ORDER BY embedding_vector <=> %s
                            LIMIT {limit * 4} -- May have duplicates due to synonyms
                        )

                        UNION ALL

                        (
                            SELECT concept.concept_id,
                                concept.concept_name,
                                embedding_vector <=> %s AS relevance_score
                            FROM {vocabulary_schema}.{vector_table} vectors
                            INNER JOIN {vocabulary_schema}.concept
                                ON vectors.concept_id = concept.concept_id
                            WHERE standard_concept = 'S'
                                AND concept.concept_class_id NOT IN ({ignore_string_class})
                                {term_type_clause}
                            ORDER BY embedding_vector <=> %s
                            LIMIT {limit * 4} -- May have duplicates due to synonyms
                        )
                    ) tmp
                    GROUP BY concept_id,
                        concept_name
                )
                SELECT target_concept.concept_id,
                    target_concept.concept_name,
                    target_concept.relevance_score
                FROM target_concept
                ORDER BY relevance_score
                LIMIT {limit};
            """
            with self.connection.cursor() as cur:
                cur.execute(query, (source_vector, source_vector, source_vector, source_vector))
                results = cur.fetchall()
        else:
            query = f"""
                WITH target_concept AS (
                    SELECT concept_id,
                        concept_name,
                        MIN(relevance_score) AS relevance_score
                    FROM (
                        SELECT concept.concept_id,
                            concept.concept_name,
                            embedding_vector <=> %s AS relevance_score
                        FROM {vocabulary_schema}.{vector_table} vectors
                        INNER JOIN {vocabulary_schema}.concept
                            ON vectors.concept_id = concept.concept_id
                        WHERE standard_concept = 'S'
                            AND concept.concept_class_id NOT IN ({ignore_string_class})
                            {term_type_clause}
                        ORDER BY embedding_vector <=> %s
                        LIMIT {limit * 4} -- May have duplicates due to synonyms
                    ) tmp
                    GROUP BY concept_id,
                        concept_name
                )
                SELECT target_concept.concept_id,
                    target_concept.concept_name,
                    target_concept.relevance_score
                FROM target_concept
                ORDER BY relevance_score
                LIMIT {limit};
            """
            with self.connection.cursor() as cur:
                cur.execute(query, (source_vector, source_vector))
                results = cur.fetchall()

        return results

    def search_term(self, term: str, limit: int = 25) -> Optional[pd.DataFrame]:
        """
        Searches for concepts matching the given term.

        Args:
            term: The clinical term to search for.
            limit: The maximum number of results to return.

        Returns:
            A DataFrame containing the matching concepts, or None if no matches are found.
        """
        vectors_with_usage = get_embedding_vectors([term])
        self.cost = self.cost + vectors_with_usage["usage"]["total_cost_usd"]
        vector = vectors_with_usage["embeddings"][0]
        results = self._search_pgvector(vector, limit)
        if not results:
            return None
        df = pd.DataFrame(
            results,
            columns=[
                "concept_id",
                "concept_name",
                "score",
            ],
        )
        return df

    def search_terms(
        self,
        df: pd.DataFrame,
        term_column: str,
        matched_concept_id_column: str = "matched_concept_id",
        matched_concept_name_column: str = "matched_concept_name",
        match_score_column: str = "match_score",
        match_rank_column: str = "match_rank",
        limit: int = 25,
    ) -> pd.DataFrame:
        """
        Searches for concepts matching terms in a DataFrame column.

        Args:
            df: DataFrame containing the terms to search for.
            term_column: Name of the column with terms to search.
            matched_concept_id_column: Name of the column to store matched concept IDs.
            matched_concept_name_column: Name of the column to store matched concept names.
            match_score_column: Name of the column to store match scores.
            match_rank_column: Name of the column to store match ranks.
            limit: The maximum number of results to return for each term.

        Returns:
            A DataFrame containing the same columns as the input dataframe plus the matching concepts for each term. For
            each term in the input dataframe, multiple rows will be returned corresponding to each matching concept.

        """

        vectors_with_usage = get_embedding_vectors(df[term_column].tolist())
        self.cost = self.cost + vectors_with_usage["usage"]["total_cost_usd"]
        vectors = vectors_with_usage["embeddings"]

        df = df.reset_index(drop=True)
        all_results = []
        for index, row in df.iterrows():
            term = row[term_column]
            # print(f"Processing term '{term}'")
            vector = vectors[index]
            results = self._search_pgvector(vector, limit=limit)
            results = pd.DataFrame(
                results,
                columns=[
                    matched_concept_id_column,
                    matched_concept_name_column,
                    match_score_column,
                ],
            )
            results[match_rank_column] = range(1, len(results) + 1)
            orig_cols = list(df.columns)
            new_columns = list(results.columns)
            results[term_column] = term
            for col in df.columns:
                results[col] = row[col]
            results = results[orig_cols + new_columns]
            all_results.append(results)

        all_results = pd.concat(all_results)
        return all_results

    def get_total_cost(self) -> float:
        """
        Returns the total cost incurred for embedding vector calls

        Returns:
            Total cost in USD.
        """

        return self.cost

get_total_cost()

Returns the total cost incurred for embedding vector calls

Returns:

Type Description
float

Total cost in USD.

Source code in src/ariadne/vector_search/pgvector_concept_searcher.py
276
277
278
279
280
281
282
283
284
def get_total_cost(self) -> float:
    """
    Returns the total cost incurred for embedding vector calls

    Returns:
        Total cost in USD.
    """

    return self.cost

search_term(term, limit=25)

Searches for concepts matching the given term.

Parameters:

Name Type Description Default
term str

The clinical term to search for.

required
limit int

The maximum number of results to return.

25

Returns:

Type Description
Optional[DataFrame]

A DataFrame containing the matching concepts, or None if no matches are found.

Source code in src/ariadne/vector_search/pgvector_concept_searcher.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def search_term(self, term: str, limit: int = 25) -> Optional[pd.DataFrame]:
    """
    Searches for concepts matching the given term.

    Args:
        term: The clinical term to search for.
        limit: The maximum number of results to return.

    Returns:
        A DataFrame containing the matching concepts, or None if no matches are found.
    """
    vectors_with_usage = get_embedding_vectors([term])
    self.cost = self.cost + vectors_with_usage["usage"]["total_cost_usd"]
    vector = vectors_with_usage["embeddings"][0]
    results = self._search_pgvector(vector, limit)
    if not results:
        return None
    df = pd.DataFrame(
        results,
        columns=[
            "concept_id",
            "concept_name",
            "score",
        ],
    )
    return df

search_terms(df, term_column, matched_concept_id_column='matched_concept_id', matched_concept_name_column='matched_concept_name', match_score_column='match_score', match_rank_column='match_rank', limit=25)

Searches for concepts matching terms in a DataFrame column.

Parameters:

Name Type Description Default
df DataFrame

DataFrame containing the terms to search for.

required
term_column str

Name of the column with terms to search.

required
matched_concept_id_column str

Name of the column to store matched concept IDs.

'matched_concept_id'
matched_concept_name_column str

Name of the column to store matched concept names.

'matched_concept_name'
match_score_column str

Name of the column to store match scores.

'match_score'
match_rank_column str

Name of the column to store match ranks.

'match_rank'
limit int

The maximum number of results to return for each term.

25

Returns:

Type Description
DataFrame

A DataFrame containing the same columns as the input dataframe plus the matching concepts for each term. For

DataFrame

each term in the input dataframe, multiple rows will be returned corresponding to each matching concept.

Source code in src/ariadne/vector_search/pgvector_concept_searcher.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
def search_terms(
    self,
    df: pd.DataFrame,
    term_column: str,
    matched_concept_id_column: str = "matched_concept_id",
    matched_concept_name_column: str = "matched_concept_name",
    match_score_column: str = "match_score",
    match_rank_column: str = "match_rank",
    limit: int = 25,
) -> pd.DataFrame:
    """
    Searches for concepts matching terms in a DataFrame column.

    Args:
        df: DataFrame containing the terms to search for.
        term_column: Name of the column with terms to search.
        matched_concept_id_column: Name of the column to store matched concept IDs.
        matched_concept_name_column: Name of the column to store matched concept names.
        match_score_column: Name of the column to store match scores.
        match_rank_column: Name of the column to store match ranks.
        limit: The maximum number of results to return for each term.

    Returns:
        A DataFrame containing the same columns as the input dataframe plus the matching concepts for each term. For
        each term in the input dataframe, multiple rows will be returned corresponding to each matching concept.

    """

    vectors_with_usage = get_embedding_vectors(df[term_column].tolist())
    self.cost = self.cost + vectors_with_usage["usage"]["total_cost_usd"]
    vectors = vectors_with_usage["embeddings"]

    df = df.reset_index(drop=True)
    all_results = []
    for index, row in df.iterrows():
        term = row[term_column]
        # print(f"Processing term '{term}'")
        vector = vectors[index]
        results = self._search_pgvector(vector, limit=limit)
        results = pd.DataFrame(
            results,
            columns=[
                matched_concept_id_column,
                matched_concept_name_column,
                match_score_column,
            ],
        )
        results[match_rank_column] = range(1, len(results) + 1)
        orig_cols = list(df.columns)
        new_columns = list(results.columns)
        results[term_column] = term
        for col in df.columns:
            results[col] = row[col]
        results = results[orig_cols + new_columns]
        all_results.append(results)

    all_results = pd.concat(all_results)
    return all_results