hierarchy

SNOMED CT attribute extraction pipeline (hierarchy sub-package).

Public API re-exports for convenient imports:

from ariadne.hierarchy import (
    HierarchySettings,
    SnomedAttributeSearcher,
    SnomedReferenceSearcher,
    find_attributes_two_stage,
    process_gold_standard,
    evaluate_results,
)

AbstractSnomedSearcher

Bases: ABC

ABC for pgvector-backed SNOMED searchers.

Enforces lifecycle management (close / context-manager) and cost tracking — matching the patterns used by PgvectorConceptSearcher and LlmMapper.

Subclasses must implement search().

Source code in src/ariadne/hierarchy/searchers.py
class AbstractSnomedSearcher(ABC):
    """ABC for pgvector-backed SNOMED searchers.

    Enforces lifecycle management (``close`` / context-manager) and cost
    tracking — matching the patterns used by ``PgvectorConceptSearcher`` and
    ``LlmMapper``.

    Subclasses must implement :meth:`search`.
    """

    def __init__(self, cfg: HierarchySettings | None = None):
        if cfg is None:
            cfg = load_hierarchy_settings()
        self.cfg = cfg
        conn_str = get_environment_variable("VOCAB_CONNECTION_STRING")
        conn_str = conn_str.replace("+psycopg", "").replace("+psycopg2", "")
        self.connection = psycopg.connect(conn_str, autocommit=True)
        try:
            register_vector(self.connection)
            with self.connection.cursor() as cur:
                cur.execute(f"SET hnsw.ef_search = {self.cfg.retrieval.hnsw_ef_search}")
            self.schema = get_environment_variable("VOCAB_SCHEMA")
        except Exception:
            self.connection.close()
            raise
        self._cost = 0.0

    # -- abstract contract --------------------------------------------------

    @abstractmethod
    def search(self, text: str, *args, **kwargs):
        """Run a similarity search.  Signature varies by subclass."""

    # -- lifecycle ----------------------------------------------------------

    def get_total_cost(self) -> float:
        """Return accumulated embedding cost (USD)."""
        return self._cost

    def close(self):
        """Close the database connection."""
        self.connection.close()

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.close()
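
A minimal lifecycle sketch (illustrative only): it assumes VOCAB_CONNECTION_STRING and VOCAB_SCHEMA are set, the pgvector indexes have been built, and it uses the concrete SnomedAttributeSearcher documented below; the category name is a placeholder.

from ariadne.hierarchy import SnomedAttributeSearcher

# Context-manager usage: the connection opens in __init__ and closes in __exit__.
with SnomedAttributeSearcher() as searcher:
    result = searcher.search("left kidney structure", "Finding site", top_k=5)  # placeholder category
    print(result.dataframe[["concept_name", "similarity"]])
    print(f"Embedding spend so far: ${searcher.get_total_cost():.6f}")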

close()

Close the database connection.

Source code in src/ariadne/hierarchy/searchers.py
def close(self):
    """Close the database connection."""
    self.connection.close()

get_total_cost()

Return accumulated embedding cost (USD).

Source code in src/ariadne/hierarchy/searchers.py
def get_total_cost(self) -> float:
    """Return accumulated embedding cost (USD)."""
    return self._cost

search(text, *args, **kwargs) abstractmethod

Run a similarity search. Signature varies by subclass.

Source code in src/ariadne/hierarchy/searchers.py
@abstractmethod
def search(self, text: str, *args, **kwargs):
    """Run a similarity search.  Signature varies by subclass."""

ContentFilterError

Bases: Exception

Raised when the LLM content filter blocks a response.

Source code in src/ariadne/hierarchy/pipeline.py
class ContentFilterError(Exception):
    """Raised when the LLM content filter blocks a response."""

EvaluationConfig dataclass

Output and gold-standard paths for hierarchy evaluation.

Source code in src/ariadne/utils/settings.py
@dataclass
class EvaluationConfig:
    """Output and gold-standard paths for hierarchy evaluation."""

    attribute_gold_standard_path: str = "./data/gold_standards/hierarchy_attributes_snomed_gs.csv"
    parent_gold_standard_path: str = "./data/gold_standards/hierarchy_snomed_gs.csv"
    output_dir: str = "./data/notebook_results"

ExtractionResult

Bases: NamedTuple

Return type for ariadne.hierarchy.pipeline.extract_components.

Source code in src/ariadne/hierarchy/types.py
class ExtractionResult(NamedTuple):
    """Return type for :func:`~ariadne.hierarchy.pipeline.extract_components`."""

    components: dict
    cost: float

HierarchySettings dataclass

Settings block loaded from the optional top-level hierarchy config key.

Source code in src/ariadne/utils/settings.py
@dataclass
class HierarchySettings:
    """Settings block loaded from the optional top-level ``hierarchy`` config key."""

    models: ModelsConfig = field(default_factory=ModelsConfig)
    retrieval: RetrievalConfig = field(default_factory=RetrievalConfig)
    scoring: ScoringConfig = field(default_factory=ScoringConfig)
    evaluation: EvaluationConfig = field(default_factory=EvaluationConfig)
    prompts: PromptsConfig = field(default_factory=PromptsConfig)
    snomed_relationships: List[str] = field(default_factory=lambda: list(_DEFAULT_SNOMED_RELATIONSHIPS))
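
A construction sketch, assuming these dataclasses are importable from ariadne.utils.settings (the module referenced above); in normal use a loader such as load_hierarchy_settings() builds this object from configuration instead.

from ariadne.utils.settings import HierarchySettings, RetrievalConfig, ScoringConfig

# Override a couple of retrieval/scoring knobs; every other block keeps its defaults.
cfg = HierarchySettings(
    retrieval=RetrievalConfig(num_reference_examples=3, top_k_per_category=10),
    scoring=ScoringConfig(hierarchy_similarity=0.80),
)
print(cfg.models.extraction)         # "o3" (default ModelsConfig)
print(cfg.retrieval.hnsw_ef_search)  # 200 (default)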

LlmResult

Bases: NamedTuple

Return type for ariadne.hierarchy.pipeline.call_llm.

Source code in src/ariadne/hierarchy/types.py
class LlmResult(NamedTuple):
    """Return type for :func:`~ariadne.hierarchy.pipeline.call_llm`."""

    content: str
    cost: float

ModelsConfig dataclass

LLM / embedding model identifiers for hierarchy extraction.

Source code in src/ariadne/utils/settings.py
@dataclass
class ModelsConfig:
    """LLM / embedding model identifiers for hierarchy extraction."""

    embedding: str = "text-embedding-3-large"
    extraction: str = "o3"
    selection: str = "o3"

PromptsConfig dataclass

Prompt templates for hierarchy extraction and candidate selection.

Source code in src/ariadne/utils/settings.py
@dataclass
class PromptsConfig:
    """Prompt templates for hierarchy extraction and candidate selection."""

    extraction: str = ""
    selection: str = ""

ReferenceRetrievalResult

Bases: NamedTuple

Return type for _retrieve_reference_examples.

Source code in src/ariadne/hierarchy/types.py
class ReferenceRetrievalResult(NamedTuple):
    """Return type for ``_retrieve_reference_examples``."""

    examples: list
    prompt_text: str
    cost: float

ReferenceSearchResult

Bases: NamedTuple

Return type for reference search and find_similar_reference_terms.

Source code in src/ariadne/hierarchy/types.py
class ReferenceSearchResult(NamedTuple):
    """Return type for reference ``search`` and ``find_similar_reference_terms``."""

    examples: list
    cost: float

RetrievalConfig dataclass

Retrieval-stage hyper-parameters for hierarchy extraction.

Source code in src/ariadne/utils/settings.py
@dataclass
class RetrievalConfig:
    """Retrieval-stage hyper-parameters for hierarchy extraction."""

    num_reference_examples: int = 5
    top_k_per_category: int = 20
    hnsw_ef_search: int = 200

ScoringConfig dataclass

Similarity score overrides used by hierarchy ranking.

Source code in src/ariadne/utils/settings.py
@dataclass
class ScoringConfig:
    """Similarity score overrides used by hierarchy ranking."""

    reference_similarity: float = 0.9
    hierarchy_similarity: float = 0.85

SearchBatchResult

Bases: NamedTuple

Return type for search_batch.

Source code in src/ariadne/hierarchy/types.py
class SearchBatchResult(NamedTuple):
    """Return type for ``search_batch``."""

    results: dict  # dict[str, pd.DataFrame]
    cost: float

SearchResult

Bases: NamedTuple

Return type for attribute search and _retrieve_candidates.

Source code in src/ariadne/hierarchy/types.py
class SearchResult(NamedTuple):
    """Return type for attribute ``search`` and ``_retrieve_candidates``."""

    dataframe: Any  # pd.DataFrame
    cost: float

SnomedAttributeSearcher

Bases: AbstractSnomedSearcher

Source code in src/ariadne/hierarchy/searchers.py
class SnomedAttributeSearcher(AbstractSnomedSearcher):

    def search(self, text: str, category_name: str, top_k: int | None = None) -> SearchResult:
        """Embed *text* and return the closest concepts in the given attribute category.

        Args:
            text: Free-text value to embed and search for.
            category_name: SNOMED attribute category filter.
            top_k: Maximum number of results (defaults to ``cfg.retrieval.top_k_per_category``).

        Returns:
            SearchResult(dataframe, cost).
        """
        top_k = top_k if top_k is not None else self.cfg.retrieval.top_k_per_category
        result = get_embedding_vectors([text])
        vector = result["embeddings"][0]
        cost = result["usage"]["total_cost_usd"]
        self._cost += cost

        query = sql.SQL("""
            WITH q AS (SELECT %s::vector AS vec)
            SELECT concept_id, concept_code, concept_name, attribute_category,
                   1 - (embedding <=> q.vec) AS similarity
            FROM {schema}.{table}, q
            WHERE attribute_category = %s
            ORDER BY embedding <=> q.vec
            LIMIT %s
        """).format(schema=sql.Identifier(self.schema), table=sql.Identifier("snomed_attribute"))
        with self.connection.cursor() as cur:
            cur.execute(query, (vector.tolist(), category_name, top_k))
            rows = cur.fetchall()

        if not rows:
            return SearchResult(pd.DataFrame(columns=["concept_id", "concept_code", "concept_name", "attribute_category", "similarity"]), cost)

        return SearchResult(pd.DataFrame(rows, columns=["concept_id", "concept_code", "concept_name", "attribute_category", "similarity"]), cost)

    def search_batch(
        self,
        texts_and_categories: list[tuple[str, str, str]],
        top_k: int | None = None,
    ) -> SearchBatchResult:
        """Embed all *texts* in a single API call and run per-category queries.

        Args:
            texts_and_categories: List of ``(attr_key, text, category_name)`` tuples.
            top_k: Number of candidates per category (defaults to ``cfg.retrieval.top_k_per_category``).

        Returns:
            SearchBatchResult(results, total_cost).
        """
        top_k = top_k if top_k is not None else self.cfg.retrieval.top_k_per_category
        if not texts_and_categories:
            return SearchBatchResult({}, 0.0)

        texts = [t[1] for t in texts_and_categories]
        result = get_embedding_vectors(texts)
        vectors = result["embeddings"]
        cost = result["usage"]["total_cost_usd"]
        self._cost += cost

        results: dict[str, pd.DataFrame] = {}
        for (attr_key, _text, category_name), vector in zip(texts_and_categories, vectors):
            query = sql.SQL("""
                WITH q AS (SELECT %s::vector AS vec)
                SELECT concept_id, concept_code, concept_name, attribute_category,
                       1 - (embedding <=> q.vec) AS similarity
                FROM {schema}.{table}, q
                WHERE attribute_category = %s
                ORDER BY embedding <=> q.vec
                LIMIT %s
            """).format(schema=sql.Identifier(self.schema), table=sql.Identifier("snomed_attribute"))
            with self.connection.cursor() as cur:
                cur.execute(query, (vector.tolist(), category_name, top_k))
                rows = cur.fetchall()

            if rows:
                results[attr_key] = pd.DataFrame(
                    rows, columns=["concept_id", "concept_code", "concept_name", "attribute_category", "similarity"]
                )
            else:
                results[attr_key] = pd.DataFrame(
                    columns=["concept_id", "concept_code", "concept_name", "attribute_category", "similarity"]
                )
        return SearchBatchResult(results, cost)

    def expand_via_hierarchy(
        self,
        concept_ids: list[int],
        attribute_category: str,
    ) -> pd.DataFrame:
        """Expand candidate concept IDs by 1-hop SNOMED hierarchy.

        For each *concept_id*, retrieves its immediate parents (``Is a``) and
        children (``Subsumes``) from ``concept_relationship``, then filters to
        those that exist in ``snomed_attribute`` under the same
        *attribute_category*.  No embedding API call is needed.

        Args:
            concept_ids: Seed concept IDs to expand.
            attribute_category: SNOMED attribute category filter.

        Returns:
            DataFrame with columns
            ``[concept_id, concept_name, attribute_category, similarity]``
            where *similarity* is set to ``cfg.scoring.hierarchy_similarity``.
        """
        if not concept_ids:
            return pd.DataFrame(
                columns=["concept_id", "concept_name", "attribute_category", "similarity"]
            )

        query = sql.SQL("""
            WITH seeds AS (
                SELECT unnest(%s::int[]) AS cid
            ),
            neighbors AS (
                -- parents (seed "Is a" parent)
                SELECT DISTINCT cr.concept_id_2 AS cid
                FROM {schema}.{concept_rel} cr
                JOIN seeds s ON cr.concept_id_1 = s.cid
                WHERE cr.relationship_id = 'Is a'
                  AND cr.invalid_reason IS NULL
                UNION
                -- children (child "Is a" seed)
                SELECT DISTINCT cr.concept_id_1 AS cid
                FROM {schema}.{concept_rel} cr
                JOIN seeds s ON cr.concept_id_2 = s.cid
                WHERE cr.relationship_id = 'Is a'
                  AND cr.invalid_reason IS NULL
            )
            SELECT DISTINCT sa.concept_id, sa.concept_code, sa.concept_name, sa.attribute_category
            FROM {schema}.{snomed_attr} sa
            JOIN neighbors n ON sa.concept_id = n.cid
            WHERE sa.attribute_category = %s
              AND sa.concept_id NOT IN (SELECT cid FROM seeds)
        """).format(
            schema=sql.Identifier(self.schema),
            concept_rel=sql.Identifier("concept_relationship"),
            snomed_attr=sql.Identifier("snomed_attribute"),
        )
        params = [concept_ids, attribute_category]
        with self.connection.cursor() as cur:
            cur.execute(query, params)
            rows = cur.fetchall()

        if not rows:
            return pd.DataFrame(
                columns=["concept_id", "concept_code", "concept_name", "attribute_category", "similarity"]
            )

        df = pd.DataFrame(rows, columns=["concept_id", "concept_code", "concept_name", "attribute_category"])
        df = df.drop_duplicates(subset=["concept_id"])
        df["similarity"] = self.cfg.scoring.hierarchy_similarity
        return df

expand_via_hierarchy(concept_ids, attribute_category)

Expand candidate concept IDs by 1-hop SNOMED hierarchy.

For each concept_id, retrieves its immediate parents (Is a) and children (Subsumes) from concept_relationship, then filters to those that exist in snomed_attribute under the same attribute_category. No embedding API call is needed.

Parameters:

    concept_ids (list[int], required): Seed concept IDs to expand.
    attribute_category (str, required): SNOMED attribute category filter.

Returns:

    DataFrame: DataFrame with columns [concept_id, concept_name, attribute_category, similarity], where similarity is set to cfg.scoring.hierarchy_similarity.

Source code in src/ariadne/hierarchy/searchers.py
def expand_via_hierarchy(
    self,
    concept_ids: list[int],
    attribute_category: str,
) -> pd.DataFrame:
    """Expand candidate concept IDs by 1-hop SNOMED hierarchy.

    For each *concept_id*, retrieves its immediate parents (``Is a``) and
    children (``Subsumes``) from ``concept_relationship``, then filters to
    those that exist in ``snomed_attribute`` under the same
    *attribute_category*.  No embedding API call is needed.

    Args:
        concept_ids: Seed concept IDs to expand.
        attribute_category: SNOMED attribute category filter.

    Returns:
        DataFrame with columns
        ``[concept_id, concept_name, attribute_category, similarity]``
        where *similarity* is set to ``cfg.scoring.hierarchy_similarity``.
    """
    if not concept_ids:
        return pd.DataFrame(
            columns=["concept_id", "concept_name", "attribute_category", "similarity"]
        )

    query = sql.SQL("""
        WITH seeds AS (
            SELECT unnest(%s::int[]) AS cid
        ),
        neighbors AS (
            -- parents (seed "Is a" parent)
            SELECT DISTINCT cr.concept_id_2 AS cid
            FROM {schema}.{concept_rel} cr
            JOIN seeds s ON cr.concept_id_1 = s.cid
            WHERE cr.relationship_id = 'Is a'
              AND cr.invalid_reason IS NULL
            UNION
            -- children (child "Is a" seed)
            SELECT DISTINCT cr.concept_id_1 AS cid
            FROM {schema}.{concept_rel} cr
            JOIN seeds s ON cr.concept_id_2 = s.cid
            WHERE cr.relationship_id = 'Is a'
              AND cr.invalid_reason IS NULL
        )
        SELECT DISTINCT sa.concept_id, sa.concept_code, sa.concept_name, sa.attribute_category
        FROM {schema}.{snomed_attr} sa
        JOIN neighbors n ON sa.concept_id = n.cid
        WHERE sa.attribute_category = %s
          AND sa.concept_id NOT IN (SELECT cid FROM seeds)
    """).format(
        schema=sql.Identifier(self.schema),
        concept_rel=sql.Identifier("concept_relationship"),
        snomed_attr=sql.Identifier("snomed_attribute"),
    )
    params = [concept_ids, attribute_category]
    with self.connection.cursor() as cur:
        cur.execute(query, params)
        rows = cur.fetchall()

    if not rows:
        return pd.DataFrame(
            columns=["concept_id", "concept_code", "concept_name", "attribute_category", "similarity"]
        )

    df = pd.DataFrame(rows, columns=["concept_id", "concept_code", "concept_name", "attribute_category"])
    df = df.drop_duplicates(subset=["concept_id"])
    df["similarity"] = self.cfg.scoring.hierarchy_similarity
    return df
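
A usage sketch, assuming an open searcher, a populated snomed_attribute index, and an illustrative attribute category; seed IDs come from an earlier embedding search and the 1-hop neighbours are appended with the fixed hierarchy_similarity score.

import pandas as pd

from ariadne.hierarchy import SnomedAttributeSearcher

with SnomedAttributeSearcher() as searcher:
    seed = searcher.search("structure of left kidney", "Finding site", top_k=10)  # illustrative category
    seed_ids = seed.dataframe["concept_id"].tolist()
    neighbors = searcher.expand_via_hierarchy(seed_ids, "Finding site")
    # Combine embedding hits and hierarchy neighbours into one candidate pool.
    candidates = pd.concat([seed.dataframe, neighbors], ignore_index=True)
    print(candidates[["concept_name", "similarity"]])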

search(text, category_name, top_k=None)

Embed text and return the closest concepts in the given attribute category.

Parameters:

    text (str, required): Free-text value to embed and search for.
    category_name (str, required): SNOMED attribute category filter.
    top_k (int | None, default None): Maximum number of results (defaults to cfg.retrieval.top_k_per_category).

Returns:

    SearchResult: SearchResult(dataframe, cost).

Source code in src/ariadne/hierarchy/searchers.py
def search(self, text: str, category_name: str, top_k: int | None = None) -> SearchResult:
    """Embed *text* and return the closest concepts in the given attribute category.

    Args:
        text: Free-text value to embed and search for.
        category_name: SNOMED attribute category filter.
        top_k: Maximum number of results (defaults to ``cfg.retrieval.top_k_per_category``).

    Returns:
        SearchResult(dataframe, cost).
    """
    top_k = top_k if top_k is not None else self.cfg.retrieval.top_k_per_category
    result = get_embedding_vectors([text])
    vector = result["embeddings"][0]
    cost = result["usage"]["total_cost_usd"]
    self._cost += cost

    query = sql.SQL("""
        WITH q AS (SELECT %s::vector AS vec)
        SELECT concept_id, concept_code, concept_name, attribute_category,
               1 - (embedding <=> q.vec) AS similarity
        FROM {schema}.{table}, q
        WHERE attribute_category = %s
        ORDER BY embedding <=> q.vec
        LIMIT %s
    """).format(schema=sql.Identifier(self.schema), table=sql.Identifier("snomed_attribute"))
    with self.connection.cursor() as cur:
        cur.execute(query, (vector.tolist(), category_name, top_k))
        rows = cur.fetchall()

    if not rows:
        return SearchResult(pd.DataFrame(columns=["concept_id", "concept_code", "concept_name", "attribute_category", "similarity"]), cost)

    return SearchResult(pd.DataFrame(rows, columns=["concept_id", "concept_code", "concept_name", "attribute_category", "similarity"]), cost)
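
A call sketch, assuming an open searcher and an attribute category that exists in the index (the value below is a placeholder); SearchResult is a NamedTuple, so it unpacks into the candidate DataFrame and the embedding cost of this call.

from ariadne.hierarchy import SnomedAttributeSearcher

with SnomedAttributeSearcher() as searcher:
    df, cost = searcher.search("laparoscopic approach", "Procedure approach", top_k=5)  # placeholder category
    if df.empty:
        print("no candidates in this category")
    else:
        print(df[["concept_code", "concept_name", "similarity"]])
    print(f"embedding cost for this call: ${cost:.6f}")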

search_batch(texts_and_categories, top_k=None)

Embed all texts in a single API call and run per-category queries.

Parameters:

    texts_and_categories (list[tuple[str, str, str]], required): List of (attr_key, text, category_name) tuples.
    top_k (int | None, default None): Number of candidates per category (defaults to cfg.retrieval.top_k_per_category).

Returns:

    SearchBatchResult: SearchBatchResult(results, total_cost).

Source code in src/ariadne/hierarchy/searchers.py
def search_batch(
    self,
    texts_and_categories: list[tuple[str, str, str]],
    top_k: int | None = None,
) -> SearchBatchResult:
    """Embed all *texts* in a single API call and run per-category queries.

    Args:
        texts_and_categories: List of ``(attr_key, text, category_name)`` tuples.
        top_k: Number of candidates per category (defaults to ``cfg.retrieval.top_k_per_category``).

    Returns:
        SearchBatchResult(results, total_cost).
    """
    top_k = top_k if top_k is not None else self.cfg.retrieval.top_k_per_category
    if not texts_and_categories:
        return SearchBatchResult({}, 0.0)

    texts = [t[1] for t in texts_and_categories]
    result = get_embedding_vectors(texts)
    vectors = result["embeddings"]
    cost = result["usage"]["total_cost_usd"]
    self._cost += cost

    results: dict[str, pd.DataFrame] = {}
    for (attr_key, _text, category_name), vector in zip(texts_and_categories, vectors):
        query = sql.SQL("""
            WITH q AS (SELECT %s::vector AS vec)
            SELECT concept_id, concept_code, concept_name, attribute_category,
                   1 - (embedding <=> q.vec) AS similarity
            FROM {schema}.{table}, q
            WHERE attribute_category = %s
            ORDER BY embedding <=> q.vec
            LIMIT %s
        """).format(schema=sql.Identifier(self.schema), table=sql.Identifier("snomed_attribute"))
        with self.connection.cursor() as cur:
            cur.execute(query, (vector.tolist(), category_name, top_k))
            rows = cur.fetchall()

        if rows:
            results[attr_key] = pd.DataFrame(
                rows, columns=["concept_id", "concept_code", "concept_name", "attribute_category", "similarity"]
            )
        else:
            results[attr_key] = pd.DataFrame(
                columns=["concept_id", "concept_code", "concept_name", "attribute_category", "similarity"]
            )
    return SearchBatchResult(results, cost)
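
A batching sketch with illustrative attribute keys and category names; all texts go through one embedding API call and each key gets its own candidate DataFrame.

from ariadne.hierarchy import SnomedAttributeSearcher

queries = [
    ("finding_site", "left kidney structure", "Finding site"),   # (attr_key, text, category), illustrative
    ("morphology", "benign neoplasm", "Associated morphology"),
]
with SnomedAttributeSearcher() as searcher:
    batch = searcher.search_batch(queries, top_k=10)
    for attr_key, df in batch.results.items():
        print(attr_key, len(df), "candidates")
    print("total embedding cost:", batch.cost)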

SnomedReferenceSearcher

Bases: AbstractSnomedSearcher

Source code in src/ariadne/hierarchy/searchers.py
class SnomedReferenceSearcher(AbstractSnomedSearcher):

    def __init__(self, cfg: HierarchySettings | None = None,
                 exclude_concept_ids: set[int] | None = None):
        super().__init__(cfg)
        self._exclude_concept_ids: list[int] = sorted(exclude_concept_ids) if exclude_concept_ids else []
        if self._exclude_concept_ids:
            logger.info(
                "SnomedReferenceSearcher: excluding %d concept IDs from results.",
                len(self._exclude_concept_ids),
            )

    def search(
        self,
        text: str,
        top_k: int | None = None,
        embedding: "np.ndarray | None" = None,
    ) -> ReferenceSearchResult:
        """Embed *text* and return similar reference terms with their attributes.

        Args:
            text: Free-text term to embed (ignored when *embedding* is supplied).
            top_k: Maximum number of reference concepts to return
                (defaults to ``cfg.retrieval.num_reference_examples``).
            embedding: Optional precomputed embedding vector (shape ``[dim]``).
                When provided the API call is skipped and cost is recorded as
                zero.  Pass the vector produced by
                ``PgvectorConceptSearcher.search_terms(..., return_embeddings=True)``
                to avoid re-embedding the same term in Step 2.

        Returns:
            ReferenceSearchResult(examples, cost).
        """
        top_k = top_k if top_k is not None else self.cfg.retrieval.num_reference_examples
        if embedding is not None:
            import numpy as np  # local import — only needed here
            vector = np.asarray(embedding, dtype=float)
            cost = 0.0
        else:
            result = get_embedding_vectors([text])
            vector = result["embeddings"][0]
            cost = result["usage"]["total_cost_usd"]
            self._cost += cost

        inner_limit = top_k * 10
        if self._exclude_concept_ids:
            query = sql.SQL("""
                WITH q AS (SELECT %s::vector AS vec),
                     ranked AS (
                    SELECT concept_id_1, concept_name_1,
                           1 - (embedding <=> q.vec) AS similarity,
                           ROW_NUMBER() OVER (
                               PARTITION BY concept_id_1
                               ORDER BY embedding <=> q.vec
                           ) AS rn
                    FROM {schema}.{table}, q
                    WHERE concept_id_1 != ALL(%s)
                    ORDER BY embedding <=> q.vec
                    LIMIT %s
                )
                SELECT concept_id_1, concept_name_1, similarity
                FROM ranked
                WHERE rn = 1
                ORDER BY similarity DESC
                LIMIT %s
            """).format(schema=sql.Identifier(self.schema), table=sql.Identifier("snomed_reference"))
            with self.connection.cursor() as cur:
                cur.execute(query, (vector.tolist(), self._exclude_concept_ids, inner_limit, top_k))
                top_terms = cur.fetchall()
        else:
            query = sql.SQL("""
                WITH q AS (SELECT %s::vector AS vec),
                     ranked AS (
                    SELECT concept_id_1, concept_name_1,
                           1 - (embedding <=> q.vec) AS similarity,
                           ROW_NUMBER() OVER (
                               PARTITION BY concept_id_1
                               ORDER BY embedding <=> q.vec
                           ) AS rn
                    FROM {schema}.{table}, q
                    ORDER BY embedding <=> q.vec
                    LIMIT %s
                )
                SELECT concept_id_1, concept_name_1, similarity
                FROM ranked
                WHERE rn = 1
                ORDER BY similarity DESC
                LIMIT %s
            """).format(schema=sql.Identifier(self.schema), table=sql.Identifier("snomed_reference"))
            with self.connection.cursor() as cur:
                cur.execute(query, (vector.tolist(), inner_limit, top_k))
                top_terms = cur.fetchall()

        if not top_terms:
            return ReferenceSearchResult([], cost)

        # Fetch all attribute rows for those concept_id_1 values
        ids = [r[0] for r in top_terms]
        attr_query = sql.SQL("""
            SELECT concept_id_1, concept_id_2, concept_code_2, concept_name_2, attribute_category
            FROM {schema}.{table}
            WHERE concept_id_1 = ANY(%s)
        """).format(schema=sql.Identifier(self.schema), table=sql.Identifier("snomed_reference"))
        with self.connection.cursor() as cur:
            cur.execute(attr_query, (ids,))
            attr_rows = cur.fetchall()

        # Group attributes by concept_id_1
        attrs_by_id: dict[int, list] = {}
        for concept_id_1, concept_id_2, concept_code_2, concept_name_2, attribute_category in attr_rows:
            attrs_by_id.setdefault(concept_id_1, []).append({
                "concept_id_2": concept_id_2,
                "concept_code_2": concept_code_2,
                "concept_name_2": concept_name_2,
                "attribute_category": attribute_category,
            })

        return ReferenceSearchResult([
            {
                "concept_id": concept_id_1,
                "concept_name": concept_name_1,
                "similarity": similarity,
                "attributes": attrs_by_id.get(concept_id_1, []),
            }
            for concept_id_1, concept_name_1, similarity in top_terms
        ], cost)

search(text, top_k=None, embedding=None)

Embed text and return similar reference terms with their attributes.

Parameters:

    text (str, required): Free-text term to embed (ignored when embedding is supplied).
    top_k (int | None, default None): Maximum number of reference concepts to return (defaults to cfg.retrieval.num_reference_examples).
    embedding (ndarray | None, default None): Optional precomputed embedding vector (shape [dim]). When provided the API call is skipped and cost is recorded as zero. Pass the vector produced by PgvectorConceptSearcher.search_terms(..., return_embeddings=True) to avoid re-embedding the same term in Step 2.

Returns:

    ReferenceSearchResult: ReferenceSearchResult(examples, cost).

Source code in src/ariadne/hierarchy/searchers.py
def search(
    self,
    text: str,
    top_k: int | None = None,
    embedding: "np.ndarray | None" = None,
) -> ReferenceSearchResult:
    """Embed *text* and return similar reference terms with their attributes.

    Args:
        text: Free-text term to embed (ignored when *embedding* is supplied).
        top_k: Maximum number of reference concepts to return
            (defaults to ``cfg.retrieval.num_reference_examples``).
        embedding: Optional precomputed embedding vector (shape ``[dim]``).
            When provided the API call is skipped and cost is recorded as
            zero.  Pass the vector produced by
            ``PgvectorConceptSearcher.search_terms(..., return_embeddings=True)``
            to avoid re-embedding the same term in Step 2.

    Returns:
        ReferenceSearchResult(examples, cost).
    """
    top_k = top_k if top_k is not None else self.cfg.retrieval.num_reference_examples
    if embedding is not None:
        import numpy as np  # local import — only needed here
        vector = np.asarray(embedding, dtype=float)
        cost = 0.0
    else:
        result = get_embedding_vectors([text])
        vector = result["embeddings"][0]
        cost = result["usage"]["total_cost_usd"]
        self._cost += cost

    inner_limit = top_k * 10
    if self._exclude_concept_ids:
        query = sql.SQL("""
            WITH q AS (SELECT %s::vector AS vec),
                 ranked AS (
                SELECT concept_id_1, concept_name_1,
                       1 - (embedding <=> q.vec) AS similarity,
                       ROW_NUMBER() OVER (
                           PARTITION BY concept_id_1
                           ORDER BY embedding <=> q.vec
                       ) AS rn
                FROM {schema}.{table}, q
                WHERE concept_id_1 != ALL(%s)
                ORDER BY embedding <=> q.vec
                LIMIT %s
            )
            SELECT concept_id_1, concept_name_1, similarity
            FROM ranked
            WHERE rn = 1
            ORDER BY similarity DESC
            LIMIT %s
        """).format(schema=sql.Identifier(self.schema), table=sql.Identifier("snomed_reference"))
        with self.connection.cursor() as cur:
            cur.execute(query, (vector.tolist(), self._exclude_concept_ids, inner_limit, top_k))
            top_terms = cur.fetchall()
    else:
        query = sql.SQL("""
            WITH q AS (SELECT %s::vector AS vec),
                 ranked AS (
                SELECT concept_id_1, concept_name_1,
                       1 - (embedding <=> q.vec) AS similarity,
                       ROW_NUMBER() OVER (
                           PARTITION BY concept_id_1
                           ORDER BY embedding <=> q.vec
                       ) AS rn
                FROM {schema}.{table}, q
                ORDER BY embedding <=> q.vec
                LIMIT %s
            )
            SELECT concept_id_1, concept_name_1, similarity
            FROM ranked
            WHERE rn = 1
            ORDER BY similarity DESC
            LIMIT %s
        """).format(schema=sql.Identifier(self.schema), table=sql.Identifier("snomed_reference"))
        with self.connection.cursor() as cur:
            cur.execute(query, (vector.tolist(), inner_limit, top_k))
            top_terms = cur.fetchall()

    if not top_terms:
        return ReferenceSearchResult([], cost)

    # Fetch all attribute rows for those concept_id_1 values
    ids = [r[0] for r in top_terms]
    attr_query = sql.SQL("""
        SELECT concept_id_1, concept_id_2, concept_code_2, concept_name_2, attribute_category
        FROM {schema}.{table}
        WHERE concept_id_1 = ANY(%s)
    """).format(schema=sql.Identifier(self.schema), table=sql.Identifier("snomed_reference"))
    with self.connection.cursor() as cur:
        cur.execute(attr_query, (ids,))
        attr_rows = cur.fetchall()

    # Group attributes by concept_id_1
    attrs_by_id: dict[int, list] = {}
    for concept_id_1, concept_id_2, concept_code_2, concept_name_2, attribute_category in attr_rows:
        attrs_by_id.setdefault(concept_id_1, []).append({
            "concept_id_2": concept_id_2,
            "concept_code_2": concept_code_2,
            "concept_name_2": concept_name_2,
            "attribute_category": attribute_category,
        })

    return ReferenceSearchResult([
        {
            "concept_id": concept_id_1,
            "concept_name": concept_name_1,
            "similarity": similarity,
            "attributes": attrs_by_id.get(concept_id_1, []),
        }
        for concept_id_1, concept_name_1, similarity in top_terms
    ], cost)
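
A sketch of both call styles, assuming the snomed_reference index is populated; the excluded concept ID is illustrative, and the commented lines show how a precomputed vector would skip the embedding call (cost 0.0).

from ariadne.hierarchy import SnomedReferenceSearcher

with SnomedReferenceSearcher(exclude_concept_ids={4029498}) as ref:   # illustrative exclusion
    examples, cost = ref.search("acute appendicitis", top_k=3)        # embeds the term (cost > 0)
    for ex in examples:
        print(ex["concept_name"], ex["similarity"], len(ex["attributes"]), "attributes")

    # With a precomputed vector the embedding call is skipped and cost comes back as 0.0:
    # vec = ...  # e.g. from PgvectorConceptSearcher.search_terms(..., return_embeddings=True)
    # examples2, cost2 = ref.search("acute appendicitis", embedding=vec)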

build_indexes(rebuild=False, attributes_only=False, reference_only=False, check=False, reference_sample_size=REFERENCE_SAMPLE_SIZE)

Build (or rebuild) the pgvector SNOMED indexes.

Parameters:

    rebuild (bool, default False): Truncate existing data before inserting.
    attributes_only (bool, default False): Only build snomed_attribute.
    reference_only (bool, default False): Only build snomed_reference.
    check (bool, default False): Skip any table that is already populated (takes precedence over rebuild only when the table is non-empty).
    reference_sample_size (int, default REFERENCE_SAMPLE_SIZE): Number of unique source concepts for the reference index (default 10 000).
Source code in src/ariadne/hierarchy/index_builder.py
def build(
    rebuild: bool = False,
    attributes_only: bool = False,
    reference_only: bool = False,
    check: bool = False,
    reference_sample_size: int = REFERENCE_SAMPLE_SIZE,
) -> None:
    """Build (or rebuild) the pgvector SNOMED indexes.

    Args:
        rebuild: Truncate existing data before inserting.
        attributes_only: Only build ``snomed_attribute``.
        reference_only: Only build ``snomed_reference``.
        check: Skip any table that is already populated (takes precedence over
            *rebuild* only when the table is non-empty).
        reference_sample_size: Number of unique source concepts for the
            reference index (default 10 000).
    """
    from dotenv import load_dotenv
    from ariadne.utils.utils import get_project_root
    load_dotenv(get_project_root() / ".env")

    logging.basicConfig(level=logging.INFO, format="%(asctime)s  %(message)s")

    conn = _pg_connect()

    # Determine embedding dimension from a probe call
    probe = get_embedding_vectors(["probe"])
    dim = probe["embeddings"].shape[1]
    logger.info("Embedding dimension: %d", dim)

    create_tables(conn, dim=dim)

    if not reference_only:
        if check and check_populated(conn, "snomed_attribute"):
            logger.info("snomed_attribute already populated — skipping (--check).")
        else:
            df_attr = load_attributes_from_db()
            conn = upsert_attribute_index(conn, df_attr, rebuild=rebuild)

    if not attributes_only:
        if check and check_populated(conn, "snomed_reference"):
            logger.info("snomed_reference already populated — skipping (--check).")
        else:
            df_ref = load_reference_from_db(sample_size=reference_sample_size)
            conn = upsert_reference_index(conn, df_ref, rebuild=rebuild)

    conn.close()
    logger.info("Index build complete.")

check_populated(conn, table='snomed_attribute')

Return True if table already contains at least one row.

Parameters:

    conn (Connection, required): psycopg connection (read access is enough).
    table (str, default 'snomed_attribute'): Table name to check ("snomed_attribute" or "snomed_reference").

Returns:

    bool: True if the table is non-empty.

Source code in src/ariadne/hierarchy/index_builder.py
def check_populated(conn: psycopg.Connection, table: str = "snomed_attribute") -> bool:
    """Return True if *table* already contains at least one row.

    Args:
        conn: psycopg connection (read access is enough).
        table: Table name to check (``"snomed_attribute"`` or ``"snomed_reference"``).

    Returns:
        ``True`` if the table is non-empty.
    """
    schema = _vocab_schema()
    with conn.cursor() as cur:
        cur.execute(
            sql.SQL("SELECT 1 FROM {schema}.{table} LIMIT 1").format(
                schema=sql.Identifier(schema),
                table=sql.Identifier(table),
            )
        )
        return cur.fetchone() is not None
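
A quick sketch, assuming a psycopg connection built from VOCAB_CONNECTION_STRING (stripping any SQLAlchemy driver suffix, as the searchers do) and the documented import location.

import os
import psycopg

from ariadne.hierarchy.index_builder import check_populated

conn_str = os.environ["VOCAB_CONNECTION_STRING"].replace("+psycopg2", "").replace("+psycopg", "")
conn = psycopg.connect(conn_str, autocommit=True)
try:
    if not check_populated(conn, "snomed_reference"):
        print("snomed_reference is empty; run the index builder first")
finally:
    conn.close()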

classification_summary(new_is_a, removed, total_source_concepts=None)

Compute summary statistics from classification results.

Parameters:

    new_is_a (DataFrame, required): DataFrame of new inferred Is a relationships.
    removed (DataFrame, required): DataFrame of removed redundant relationships.
    total_source_concepts (int | None, default None): Total number of source concepts in the delta (used to detect orphans). If None, orphan detection is skipped.

Returns:

    dict[str, object]: Dict with keys: new_is_a_count, removed_count, concepts_with_parents, orphan_concepts, avg_parents_per_concept, max_parents, min_parents.

Source code in src/ariadne/hierarchy/classifier.py
def classification_summary(
    new_is_a: pd.DataFrame,
    removed: pd.DataFrame,
    total_source_concepts: int | None = None,
) -> dict[str, object]:
    """Compute summary statistics from classification results.

    Args:
        new_is_a: DataFrame of new inferred *Is a* relationships.
        removed: DataFrame of removed redundant relationships.
        total_source_concepts: Total number of source concepts in the delta
            (used to detect orphans).  If ``None``, orphan detection is skipped.

    Returns:
        Dict with keys: ``new_is_a_count``, ``removed_count``,
        ``concepts_with_parents``, ``orphan_concepts``,
        ``avg_parents_per_concept``, ``max_parents``, ``min_parents``.
    """
    stats: dict[str, object] = {
        "new_is_a_count": len(new_is_a),
        "removed_count": len(removed),
    }

    if new_is_a.empty:
        stats.update({
            "concepts_with_parents": 0,
            "orphan_concepts": total_source_concepts or "unknown",
            "avg_parents_per_concept": 0.0,
            "max_parents": 0,
            "min_parents": 0,
        })
        return stats

    parents_per_concept = new_is_a.groupby("sourceId").size()
    stats["concepts_with_parents"] = len(parents_per_concept)
    stats["avg_parents_per_concept"] = round(parents_per_concept.mean(), 2)
    stats["max_parents"] = int(parents_per_concept.max())
    stats["min_parents"] = int(parents_per_concept.min())

    if total_source_concepts is not None:
        classified_sources = set(new_is_a["sourceId"].unique())
        stats["orphan_concepts"] = total_source_concepts - len(classified_sources)
    else:
        stats["orphan_concepts"] = "unknown"

    return stats
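
A self-contained sketch with toy DataFrames, assuming the function is importable from ariadne.hierarchy.classifier; column names follow the RF2 convention used by the classifier (sourceId for the child concept), and the destination IDs are illustrative SCTIDs.

import pandas as pd

from ariadne.hierarchy.classifier import classification_summary

new_is_a = pd.DataFrame({
    "sourceId": [1000000001, 1000000001, 1000000002],
    "destinationId": [404684003, 64572001, 404684003],
})
removed = pd.DataFrame(columns=["sourceId", "destinationId"])

stats = classification_summary(new_is_a, removed, total_source_concepts=3)
print(stats)
# new_is_a_count=3, removed_count=0, concepts_with_parents=2,
# avg_parents_per_concept=1.5, max_parents=2, min_parents=1, orphan_concepts=1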

classify_delta(delta_zip, base_snomed_zip=None, *, toolkit_jar=None, java_xms='4g', timeout=600, output_dir=None)

Run ELK classification via snomed-owl-toolkit.

This calls the snomed-owl-toolkit's -classify command which:

  1. Converts the base SNOMED RF2 snapshot + your delta to OWL
  2. Runs the ELK reasoner to infer Is a relationships
  3. Produces a classification-results-*.zip with the inferred relationship changes

Parameters:

    delta_zip (str | Path, required): Path to the RF2 delta ZIP (from export_to_rf2).
    base_snomed_zip (str | None, default None): Path to the SNOMED CT International Edition RF2 snapshot ZIP. Falls back to SNOMED_BASE_RELEASE_ZIP env var.
    toolkit_jar (str | None, default None): Path to the snomed-owl-toolkit executable JAR. Falls back to SNOMED_OWL_TOOLKIT_JAR env var, then tools/snomed-owl-toolkit.jar.
    java_xms (str, default '4g'): JVM initial heap size.
    timeout (int, default 600): Maximum seconds to wait for classification.
    output_dir (str | Path | None, default None): Directory where the results ZIP will be written. Defaults to the parent directory of delta_zip.

Returns:

    Path: Path to the classification results ZIP.

Raises:

    RuntimeError: If the classification process fails.
    FileNotFoundError: If the toolkit JAR or base release cannot be found.
    subprocess.TimeoutExpired: If classification exceeds timeout.

Source code in src/ariadne/hierarchy/classifier.py
def classify_delta(
    delta_zip: str | Path,
    base_snomed_zip: str | None = None,
    *,
    toolkit_jar: str | None = None,
    java_xms: str = "4g",
    timeout: int = 600,
    output_dir: str | Path | None = None,
) -> Path:
    """Run ELK classification via snomed-owl-toolkit.

    This calls the snomed-owl-toolkit's ``-classify`` command which:

    1. Converts the base SNOMED RF2 snapshot + your delta to OWL
    2. Runs the ELK reasoner to infer *Is a* relationships
    3. Produces a ``classification-results-*.zip`` with the inferred
       relationship changes

    Args:
        delta_zip: Path to the RF2 delta ZIP (from ``export_to_rf2``).
        base_snomed_zip: Path to the SNOMED CT International Edition RF2
            snapshot ZIP.  Falls back to ``SNOMED_BASE_RELEASE_ZIP`` env var.
        toolkit_jar: Path to the snomed-owl-toolkit executable JAR.
            Falls back to ``SNOMED_OWL_TOOLKIT_JAR`` env var, then
            ``tools/snomed-owl-toolkit.jar``.
        java_xms: JVM initial heap size (default ``4g``).
        timeout: Maximum seconds to wait for classification (default 600).
        output_dir: Directory where the results ZIP will be written.
            Defaults to the parent directory of *delta_zip*.

    Returns:
        Path to the classification results ZIP.

    Raises:
        RuntimeError: If the classification process fails.
        FileNotFoundError: If the toolkit JAR or base release cannot be found.
        subprocess.TimeoutExpired: If classification exceeds *timeout*.
    """
    jar = _toolkit_jar(toolkit_jar)
    base = _base_release(base_snomed_zip)
    delta_zip = Path(delta_zip)
    output_dir = Path(output_dir or delta_zip.parent)

    cmd = [
        "java",
        f"-Xms{java_xms}",
        "--add-opens", "java.base/java.lang=ALL-UNNAMED",
        "-jar", jar,
        "-classify",
        "-rf2-snapshot-archives", base,
        "-rf2-authoring-delta-archive", str(delta_zip),
    ]

    logger.info("Running classification: %s", " ".join(cmd))
    result = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        timeout=timeout,
        cwd=str(output_dir),
    )

    if result.stdout:
        logger.info("snomed-owl-toolkit stdout:\n%s", result.stdout)
    if result.stderr:
        logger.warning("snomed-owl-toolkit stderr:\n%s", result.stderr)

    if result.returncode != 0:
        raise RuntimeError(
            f"Classification failed (exit code {result.returncode}).\n"
            f"stderr:\n{result.stderr}\nstdout:\n{result.stdout}"
        )

    # Find the newest results ZIP
    pattern = str(output_dir / "classification-results-*.zip")
    candidates = sorted(glob.glob(pattern), key=os.path.getmtime)
    if not candidates:
        raise FileNotFoundError(
            f"No classification-results-*.zip found in {output_dir} after "
            "successful classification.  Check snomed-owl-toolkit output above."
        )

    results_zip = Path(candidates[-1])
    logger.info("Classification complete → %s", results_zip)
    return results_zip
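
An invocation sketch with illustrative paths, assuming Java, the snomed-owl-toolkit JAR, and a base RF2 release are available via the documented environment variables or arguments.

from ariadne.hierarchy.classifier import classify_delta

results_zip = classify_delta(
    "data/notebook_results/rf2_delta.zip",              # illustrative path from export_to_rf2
    base_snomed_zip="data/SnomedCT_International.zip",  # illustrative; or set SNOMED_BASE_RELEASE_ZIP
    toolkit_jar="tools/snomed-owl-toolkit.jar",         # or set SNOMED_OWL_TOOLKIT_JAR
    java_xms="6g",
    timeout=1200,
)
print("ELK classification results:", results_zip)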

evaluate_results(results, gs_path, cfg=None)

Produce a combined evaluation table (full outer join of GS and predictions).

Columns:

    concept_id_1, concept_name_1, attribute_category, gs_concept_id_2, gs_concept_name_2, predicted_concept_id_2, predicted_concept_name_2, matched, status (match / missed / extra).

Summary statistics are printed and the combined table is saved to CSV.

Parameters:

    results (list[dict], required): List of pipeline result dicts from process_gold_standard.
    gs_path (str, required): Path to the gold-standard CSV.
    cfg (HierarchySettings | None, default None): Pipeline configuration (reads cfg.evaluation.output_dir).

Returns:

    DataFrame: Combined evaluation DataFrame.

Source code in src/ariadne/hierarchy/evaluator.py
def evaluate_results(
    results: list[dict],
    gs_path: str,
    cfg: HierarchySettings | None = None,
) -> pd.DataFrame:
    """Produce a combined evaluation table (full outer join of GS and predictions).

    Columns:
        concept_id_1, concept_name_1, attribute_category,
        gs_concept_id_2, gs_concept_name_2,
        predicted_concept_id_2, predicted_concept_name_2,
        matched, status (``match`` / ``missed`` / ``extra``).

    Summary statistics are printed and the combined table is saved to CSV.

    Args:
        results: List of pipeline result dicts from ``process_gold_standard``.
        gs_path: Path to the gold-standard CSV.
        cfg: Pipeline configuration (reads ``cfg.evaluation.output_dir``).

    Returns:
        Combined evaluation DataFrame.
    """
    cfg_local: HierarchySettings = cfg if cfg is not None else load_hierarchy_settings()
    output_dir = cfg_local.evaluation.output_dir
    # --- build predicted rows ---
    pred_rows = _build_prediction_rows(results)
    pred_df = pd.DataFrame(pred_rows)

    # --- load gold standard ---
    gs_df = pd.read_csv(gs_path)
    gs_df = gs_df.rename(columns={'concept_id_2': 'gs_concept_id_2', 'concept_code_2': 'gs_concept_code_2', 'concept_name_2': 'gs_concept_name_2'})

    # --- full outer join on the matching key ---
    gs_df['_join_id2'] = gs_df['gs_concept_id_2']
    pred_df['_join_id2'] = pred_df['predicted_concept_id_2']

    combined = gs_df.merge(
        pred_df,
        on=['concept_id_1', '_join_id2', 'attribute_category'],
        how='outer',
        suffixes=('_gs', '_pred'),
    )

    # Reconcile concept_name_1 from both sides
    if 'concept_name_1_gs' in combined.columns:
        combined['concept_name_1'] = combined['concept_name_1_gs'].fillna(combined['concept_name_1_pred'])
        combined.drop(columns=['concept_name_1_gs', 'concept_name_1_pred'], inplace=True)

    combined.drop(columns=['_join_id2'], inplace=True)

    # --- flags ---
    has_gs = combined['gs_concept_id_2'].notna()
    has_pred = combined['predicted_concept_id_2'].notna()
    combined['matched'] = has_gs & has_pred
    combined['status'] = 'match'
    combined.loc[has_gs & ~has_pred, 'status'] = 'missed'
    combined.loc[~has_gs & has_pred, 'status'] = 'extra'

    # --- order columns nicely ---
    leading = ['concept_id_1', 'concept_name_1', 'attribute_category',
               'gs_concept_id_2', 'gs_concept_code_2', 'gs_concept_name_2',
               'predicted_concept_id_2', 'predicted_concept_code_2', 'predicted_concept_name_2',
               'matched', 'status']
    extra_cols = [c for c in combined.columns if c not in leading]
    combined = combined[[c for c in leading if c in combined.columns] + extra_cols]

    # Sort for readability
    combined = combined.sort_values(['concept_id_1', 'attribute_category', 'status']).reset_index(drop=True)

    # --- summary stats ---
    n_gs = int(has_gs.sum())
    n_pred = int(has_pred.sum())
    n_match = int(combined['matched'].sum())
    precision = n_match / n_pred * 100 if n_pred else 0.0
    recall = n_match / n_gs * 100 if n_gs else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0

    logger.info("Gold standard rows: %d", n_gs)
    logger.info("Predicted rows:     %d", n_pred)
    logger.info("Matched:            %d", n_match)
    logger.info("Precision:          %.1f%%", precision)
    logger.info("Recall:             %.1f%%", recall)
    logger.info("F1:                 %.1f%%", f1)

    # --- save ---
    out_path = os.path.join(output_dir, "attribute_evaluation.csv")
    combined.to_csv(out_path, index=False)
    logger.info("Combined evaluation saved: %s (%d rows)", out_path, len(combined))
    return combined
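
A follow-up sketch for inspecting the saved output, assuming the evaluation has already been run with the default output directory; the status column distinguishes match, missed, and extra rows.

import pandas as pd

combined = pd.read_csv("./data/notebook_results/attribute_evaluation.csv")  # written by evaluate_results
print(combined["status"].value_counts())
print(combined.loc[combined["status"] == "missed",
                   ["concept_name_1", "attribute_category", "gs_concept_name_2"]].head())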

export_to_rf2(source, output_dir, *, date=None, module_id=_MODULE_ID_DEFAULT, stated_parent=_CLINICAL_FINDING, rel_group=1, concept_id_start=_CONCEPT_ID_START, rel_id_start=_REL_ID_START, zip_it=True)

Export Step 2 attribute predictions as an RF2 delta ZIP.

Source OMOP concept_id_1 values are remapped to sequential synthetic IDs starting at concept_id_start to avoid collisions with real SNOMED SCTIDs in the base release.

Parameters:

    source (Union[str, Path, DataFrame], required): Path to attribute_results.csv or a DataFrame with columns concept_id_1, concept_name_1, predicted_concept_code_2 (SNOMED SCTID string), and attribute_category.
    output_dir (Union[str, Path], required): Directory where the Delta/ folder and ZIP are written.
    date (str | None, default None): Effective date string in YYYYMMDD format. Defaults to today.
    module_id (int, default _MODULE_ID_DEFAULT): SNOMED module concept ID (default: SNOMED CT core module).
    stated_parent (Union[int, dict[int, list[str]]], default _CLINICAL_FINDING): Either a single SCTID int applied to all concepts, or a dict[omop_concept_id, list[sctid_str]] for per-concept parents (output of build_stated_parents_map). Defaults to 404684003 (Clinical finding).
    rel_group (int, default 1): Relationship group number for all predicted attributes (default: 1, grouped). Pass 0 for ungrouped.
    concept_id_start (int, default _CONCEPT_ID_START): First synthetic concept ID (default: 1 000 000 001).
    rel_id_start (int, default _REL_ID_START): Starting integer for generated relationship IDs.
    zip_it (bool, default True): When True (default), produce a .zip archive in output_dir.

Returns:

    tuple[Path, DataFrame]: (zip_path, id_map_df) where id_map_df is a DataFrame with columns omop_concept_id, synthetic_sctid, concept_name mapping original OMOP IDs to synthetic delta IDs.
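
A call sketch with illustrative paths, assuming the function is importable from ariadne.hierarchy.rf2_exporter (the module referenced below) and that the CSV follows the documented attribute_results.csv layout; the returned ID map links OMOP IDs to the synthetic SCTIDs written into the delta.

from ariadne.hierarchy.rf2_exporter import export_to_rf2

zip_path, id_map = export_to_rf2(
    "./data/notebook_results/attribute_results.csv",  # illustrative input path
    "./data/notebook_results/rf2",                    # illustrative output directory
    rel_group=0,                                      # ungrouped attributes
)
print(zip_path)
print(id_map.head())  # omop_concept_id, synthetic_sctid, concept_name

The resulting ZIP is what classify_delta above accepts as its delta_zip argument.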

Source code in src/ariadne/hierarchy/rf2_exporter.py
def export_to_rf2(
    source: Union[str, Path, pd.DataFrame],
    output_dir: Union[str, Path],
    *,
    date: str | None = None,
    module_id: int = _MODULE_ID_DEFAULT,
    stated_parent: Union[int, dict[int, list[str]]] = _CLINICAL_FINDING,
    rel_group: int = 1,
    concept_id_start: int = _CONCEPT_ID_START,
    rel_id_start: int = _REL_ID_START,
    zip_it: bool = True,
) -> tuple[Path, pd.DataFrame]:
    """Export Step 2 attribute predictions as an RF2 delta ZIP.

    Source OMOP ``concept_id_1`` values are remapped to sequential
    synthetic IDs starting at *concept_id_start* to avoid collisions
    with real SNOMED SCTIDs in the base release.

    Args:
        source: Path to ``attribute_results.csv`` **or** a DataFrame with
            columns ``concept_id_1``, ``concept_name_1``,
            ``predicted_concept_code_2`` (SNOMED SCTID string), and
            ``attribute_category``.
        output_dir: Directory where the ``Delta/`` folder and ZIP are written.
        date: Effective date string in ``YYYYMMDD`` format.  Defaults to today.
        module_id: SNOMED module concept ID (default: SNOMED CT core module).
        stated_parent: Either a single SCTID ``int`` applied to all concepts,
            or a ``dict[omop_concept_id, list[sctid_str]]`` for per-concept
            parents (output of ``build_stated_parents_map``).  Defaults to
            404684003 (Clinical finding).
        rel_group: Relationship group number for all predicted attributes
            (default: 1 — grouped).  Pass 0 for ungrouped.
        concept_id_start: First synthetic concept ID (default: 1 000 000 001).
        rel_id_start: Starting integer for generated relationship IDs.
        zip_it: When True (default), produce a ``.zip`` archive in
            *output_dir*.

    Returns:
        ``(zip_path, id_map_df)`` where *id_map_df* is a DataFrame with
        columns ``omop_concept_id``, ``synthetic_sctid``, ``concept_name``
        mapping original OMOP IDs to synthetic delta IDs.
    """
    date = date or datetime.today().strftime("%Y%m%d")
    output_dir = Path(output_dir)
    term_dir = output_dir / "Delta" / "Terminology"
    term_dir.mkdir(parents=True, exist_ok=True)
    refset_content_dir = output_dir / "Delta" / "Refset" / "Content"
    refset_content_dir.mkdir(parents=True, exist_ok=True)
    refset_meta_dir = output_dir / "Delta" / "Refset" / "Metadata"
    refset_meta_dir.mkdir(parents=True, exist_ok=True)

    rows = _rows_from_source(source)
    if not rows:
        raise ValueError("No rows to export — source is empty or all SCTIDs missing.")

    logger.info("Exporting %d attribute rows to RF2 delta (date=%s)…", len(rows), date)

    # ── Build OMOP → synthetic ID mapping ─────────────────────────────────
    unique_sources = dict.fromkeys(r[0] for r in rows)  # ordered dedup
    omop_to_synth: dict[int, int] = {}
    synth_to_omop: dict[int, int] = {}
    for i, omop_id in enumerate(unique_sources):
        synth_id = concept_id_start + i
        omop_to_synth[omop_id] = synth_id
        synth_to_omop[synth_id] = omop_id

    # Save mapping CSV
    id_map_rows = []
    for omop_id in unique_sources:
        name = next((r[1] for r in rows if r[0] == omop_id), "")
        id_map_rows.append({
            "omop_concept_id": omop_id,
            "synthetic_sctid": omop_to_synth[omop_id],
            "concept_name": name,
        })
    id_map_df = pd.DataFrame(id_map_rows)
    id_map_path = output_dir / f"id_mapping_{date}.csv"
    id_map_df.to_csv(id_map_path, index=False)
    logger.info("  ID mapping: %d concepts → %s", len(id_map_df), id_map_path.name)

    # ── Concept file ──────────────────────────────────────────────────────────
    concept_path = term_dir / f"sct2_Concept_Delta_INT_{date}.txt"
    with concept_path.open("w", encoding="utf-8") as fh:
        fh.write(_CONCEPT_HEADER)
        for omop_id in unique_sources:
            fh.write(
                f"{omop_to_synth[omop_id]}\t{date}\t1\t{module_id}\t{_DEFN_STATUS_SD}\n"
            )
    logger.info("  Concept file: %d concepts → %s", len(unique_sources), concept_path.name)

    # ── Group attributes by source concept ──────────────────────────────────
    concept_attrs: dict[int, list[tuple[int, str]]] = defaultdict(list)
    unknown_types: set[str] = set()
    for src_id, _src_name, dest_sctid, attr_type in rows:
        type_id = GS_CATEGORY_TO_TYPE_ID.get(attr_type)
        if type_id is None:
            unknown_types.add(attr_type)
            continue
        concept_attrs[src_id].append((type_id, dest_sctid))

    if unknown_types:
        logger.warning(
            "Skipped rows with unrecognised attribute_category values: %s",
            unknown_types,
        )

    # ── StatedRelationship file (header-only — definitions use OWL axioms) ──
    stated_path = term_dir / f"sct2_StatedRelationship_Delta_INT_{date}.txt"
    with stated_path.open("w", encoding="utf-8") as fh:
        fh.write(_REL_HEADER)
    logger.info("  StatedRelationship file (header-only): %s", stated_path.name)

    # ── Empty inferred Relationship file (required by snomed-owl-toolkit) ──
    rel_path = term_dir / f"sct2_Relationship_Delta_INT_{date}.txt"
    with rel_path.open("w", encoding="utf-8") as fh:
        fh.write(_REL_HEADER)
    logger.info("  Relationship file (header-only placeholder): %s", rel_path.name)

    # ── OWL Axiom refset (one row per source concept) ─────────────────────
    owl_path = refset_content_dir / f"der2_sRefset_OWLAxiomDelta_INT_{date}.txt"
    with owl_path.open("w", encoding="utf-8") as fh:
        fh.write(_OWL_AXIOM_HEADER)
        for omop_id in unique_sources:
            synth_id = omop_to_synth[omop_id]
            attrs = concept_attrs.get(omop_id, [])
            if isinstance(stated_parent, dict):
                raw_parents = stated_parent.get(omop_id, [str(_CLINICAL_FINDING)])
                parent_ids = [int(p) for p in raw_parents]
            else:
                parent_ids = [int(stated_parent)]
            owl_expr = _build_owl_expression(synth_id, parent_ids, attrs)
            axiom_uuid = str(uuid.uuid4())
            fh.write(
                f"{axiom_uuid}\t{date}\t1\t{module_id}\t"
                f"{_OWL_AXIOM_REFSET_ID}\t{synth_id}\t{owl_expr}\n"
            )
    n_with_attrs = sum(1 for omop_id in unique_sources if concept_attrs.get(omop_id))
    logger.info(
        "  OWL Axiom refset: %d concepts (%d with attributes, %d attr-only Is a) → %s",
        len(unique_sources), n_with_attrs,
        len(unique_sources) - n_with_attrs,
        owl_path.name,
    )

    # ── Module Dependency refset ──────────────────────────────────────────
    mod_dep_path = refset_meta_dir / f"der2_ssRefset_ModuleDependencyDelta_INT_{date}.txt"
    with mod_dep_path.open("w", encoding="utf-8") as fh:
        fh.write(_MODULE_DEP_HEADER)
        dep_uuid = str(uuid.uuid4())
        fh.write(
            f"{dep_uuid}\t{date}\t1\t{module_id}\t{_MODULE_DEP_REFSET}\t"
            f"{_MODEL_COMPONENT_MOD}\t\t{date}\n"
        )
    logger.info("  Module Dependency refset: %s", mod_dep_path.name)

    if not zip_it:
        delta_dir = output_dir / "Delta"
        logger.info("RF2 delta written (no ZIP): %s", delta_dir)
        return delta_dir, id_map_df

    # ── ZIP ───────────────────────────────────────────────────────────────────
    zip_path = output_dir / f"snomed_delta_{date}.zip"
    delta_root = output_dir / "Delta"
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for fpath in delta_root.rglob("*"):
            if fpath.is_file():
                zf.write(fpath, fpath.relative_to(output_dir))
    logger.info("RF2 delta ZIP: %s", zip_path)
    return zip_path, id_map_df

find_attributes_two_stage(medical_term, attribute_index, reference_index=None, cfg=None, verbose=True, precomputed_embedding=None)

Run the 4-step SNOMED CT attribute extraction pipeline.

Steps:
  1. Retrieve reference examples (pgvector or in-memory).
  2. LLM infers applicable attributes.
  3. Retrieve SNOMED candidate values per attribute.
  4. LLM selects exact SNOMED concepts from candidates.

Parameters:

Name Type Description Default
medical_term str

The clinical term to decompose.

required
attribute_index AttributeIndex

Attribute searcher (pgvector or legacy dict).

required
reference_index ReferenceIndex | None

Reference searcher (pgvector, legacy dict, or None).

None
cfg HierarchySettings | None

Pipeline configuration.

None
verbose bool

Whether to log progress.

True
precomputed_embedding

Optional np.ndarray (shape [dim]) — the embedding of medical_term computed upstream (e.g. by PgvectorConceptSearcher.search_terms). When supplied, Step 1's reference-retrieval embedding API call is skipped, saving cost.

None

Returns:

Type Description
dict

Dict with keys attributes, extracted_components, retrieved_candidates, reference_examples, cost.
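
A call sketch, assuming attribute_index and reference_index are already-constructed searcher instances as described in the parameter table (the clinical term is illustrative):

    result = find_attributes_two_stage(
        "Acute bacterial pneumonia",         # illustrative clinical term
        attribute_index,
        reference_index=reference_index,
        verbose=False,
    )
    print(result["attributes"])              # selected SNOMED attribute concepts
    print(result["extracted_components"])    # raw Step 2 LLM output
    print(result["cost"]["total_cost"])      # USD across embedding + LLM calls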

Source code in src/ariadne/hierarchy/pipeline.py
def find_attributes_two_stage(
    medical_term: str,
    attribute_index: AttributeIndex,
    reference_index: ReferenceIndex | None = None,
    cfg: HierarchySettings | None = None,
    verbose: bool = True,
    precomputed_embedding=None,
) -> dict:
    """Run the 4-step SNOMED CT attribute extraction pipeline.

    Steps:
        1. Retrieve reference examples (pgvector or in-memory).
        2. LLM infers applicable attributes.
        3. Retrieve SNOMED candidate values per attribute.
        4. LLM selects exact SNOMED concepts from candidates.

    Args:
        medical_term: The clinical term to decompose.
        attribute_index: Attribute searcher (pgvector or legacy dict).
        reference_index: Reference searcher (pgvector, legacy dict, or None).
        cfg: Pipeline configuration.
        verbose: Whether to log progress.
        precomputed_embedding: Optional ``np.ndarray`` (shape ``[dim]``) — the
            embedding of *medical_term* computed upstream (e.g. by
            ``PgvectorConceptSearcher.search_terms``).  When supplied, Step 1's
            reference-retrieval embedding API call is skipped, saving cost.

    Returns:
        Dict with keys ``attributes``, ``extracted_components``,
        ``retrieved_candidates``, ``reference_examples``, ``cost``.
    """
    cfg_local: HierarchySettings = cfg if cfg is not None else load_hierarchy_settings()

    # Step 1
    similar_terms, reference_text, ref_cost = _retrieve_reference_examples(
        medical_term, reference_index, cfg_local, verbose,
        precomputed_embedding=precomputed_embedding,
    )

    # Step 2
    if verbose:
        logger.info("Step 2: Inferring attributes...")
    components, extraction_cost = extract_components(medical_term, reference_text=reference_text,
                                                     cfg=cfg_local)
    if verbose:
        logger.info("  Inferred: %s", json.dumps({k: v for k, v in components.items() if v}, indent=2))

    # Step 3
    if verbose:
        logger.info("Step 3: Retrieving candidates...")
    candidates_df, embedding_cost = _retrieve_candidates(
        components, attribute_index, similar_terms,
        verbose, cfg=cfg_local
    )

    # Step 4
    if verbose:
        logger.info("Step 4: Selecting best matches...")
    candidates_text = _build_selection_prompt(candidates_df)
    user_prompt = f"Medical term: {medical_term}\n\n{reference_text}\n\nCandidates:\n{candidates_text}"
    response, selection_cost = call_llm(cfg_local.prompts.selection, user_prompt, model=cfg_local.models.selection)

    total_cost = ref_cost + extraction_cost + embedding_cost + selection_cost

    result = parse_json_response(response)

    # --- Enforce interprets ↔ interpretation pairing ---
    if "attributes" in result:
        _enforce_interprets_pairing(result["attributes"], verbose=verbose)

    # Inject concept_code into each selected attribute concept dict
    if len(candidates_df) > 0 and "concept_code" in candidates_df.columns:
        code_lookup: dict[int, str] = (
            candidates_df.dropna(subset=["concept_code"])
            .drop_duplicates(subset=["concept_id"])
            .set_index("concept_id")["concept_code"]
            .to_dict()
        )
        def _inject_code(obj):
            if isinstance(obj, dict) and "concept_id" in obj:
                cid = obj.get("concept_id")
                if cid is not None and "concept_code" not in obj:
                    obj["concept_code"] = code_lookup.get(int(cid))
        if "attributes" in result and isinstance(result["attributes"], dict):
            for attr_key, attr_val in result["attributes"].items():
                if attr_val is None:
                    continue
                if attr_key == "interprets_interpretation" and isinstance(attr_val, list):
                    for pair in attr_val:
                        if isinstance(pair, dict):
                            for v in pair.values():
                                _inject_code(v)
                elif isinstance(attr_val, list):
                    for item in attr_val:
                        _inject_code(item)
                else:
                    _inject_code(attr_val)

    result['extracted_components'] = components
    result['retrieved_candidates'] = candidates_df.to_dict('records') if len(candidates_df) > 0 else []
    if similar_terms:
        result['reference_examples'] = similar_terms
    result['cost'] = {
        'extraction_cost': extraction_cost,
        'embedding_cost': embedding_cost,
        'selection_cost': selection_cost,
        'total_cost': total_cost,
    }
    if verbose:
        logger.info("Total cost: $%.4f", total_cost)
    return result

parse_classification_results(results_zip)

Parse the classification results ZIP into DataFrames.

Parameters:

Name Type Description Default
results_zip str | Path

Path to the classification-results-*.zip produced by classify_delta.

required

Returns:

Type Description
tuple[DataFrame, DataFrame, DataFrame]

(new_is_a, removed_redundant, equiv_df) where:

  • new_is_a — DataFrame of newly inferred Is a relationships (active == 1, typeId == 116680003). Columns: sourceId, destinationId, plus all original RF2 columns.
  • removed_redundant — DataFrame of relationships marked inactive (active == 0). These are stated relationships that became redundant after classification.
  • equiv_df — DataFrame of equivalent concept pairs from the EquivalentConceptSimpleMap refset. Columns include referencedComponentId and mapTarget (group UUID).
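
A usage sketch (the results ZIP path is illustrative; it would normally come from classify_delta):

    new_is_a, removed, equiv = parse_classification_results(
        "out/classification-results-20240101.zip"
    )
    print(len(new_is_a), "new 'Is a' relationships inferred")
    if not equiv.empty:
        print("Equivalent concepts found; review the modelling before release.")
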
Source code in src/ariadne/hierarchy/classifier.py
def parse_classification_results(
    results_zip: str | Path,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Parse the classification results ZIP into DataFrames.

    Args:
        results_zip: Path to the ``classification-results-*.zip`` produced by
            :func:`classify_delta`.

    Returns:
        ``(new_is_a, removed_redundant, equiv_df)`` where:

        - **new_is_a** — DataFrame of newly inferred *Is a* relationships
          (``active == 1``, ``typeId == 116680003``).  Columns:
          ``sourceId``, ``destinationId``, plus all original RF2 columns.
        - **removed_redundant** — DataFrame of relationships marked inactive
          (``active == 0``).  These are stated relationships that became
          redundant after classification.
        - **equiv_df** — DataFrame of equivalent concept pairs from the
          EquivalentConceptSimpleMap refset.  Columns include
          ``referencedComponentId`` and ``mapTarget`` (group UUID).
    """
    results_zip = Path(results_zip)
    rel_dfs: list[pd.DataFrame] = []
    equiv_df = pd.DataFrame()

    with zipfile.ZipFile(results_zip, "r") as zf:
        for name in zf.namelist():
            if "Relationship" in name and name.endswith(".txt"):
                with zf.open(name) as f:
                    df = pd.read_csv(f, sep="\t", dtype=str)
                    rel_dfs.append(df)

            # Parse equivalent concepts refset
            if "equivalent" in name.lower() or "equiv" in name.lower():
                with zf.open(name) as f:
                    equiv_df = pd.read_csv(f, sep="\t", dtype=str)
                    if len(equiv_df) > 0:
                        logger.warning(
                            "EQUIVALENT CONCEPTS FOUND (%d rows)!  "
                            "This usually indicates a modelling error — "
                            "two concepts have identical defining attributes.",
                            len(equiv_df),
                        )

    if not rel_dfs:
        logger.warning("No Relationship files found in %s", results_zip)
        empty = pd.DataFrame(columns=[
            "id", "effectiveTime", "active", "moduleId", "sourceId",
            "destinationId", "relationshipGroup", "typeId",
            "characteristicTypeId", "modifierId",
        ])
        return empty, empty, equiv_df

    all_rels = pd.concat(rel_dfs, ignore_index=True)

    new_is_a = all_rels[
        (all_rels["typeId"] == "116680003") & (all_rels["active"] == "1")
    ].copy()

    removed = all_rels[all_rels["active"] == "0"].copy()

    logger.info(
        "Classification results: %d new 'Is a' relationships, "
        "%d redundant removals, %d equivalent concept rows.",
        len(new_is_a), len(removed), len(equiv_df),
    )

    return new_is_a, removed, equiv_df

pre_classification_checks(delta_zip)

Run lightweight structural checks on an RF2 delta ZIP.

Validates:

- The ZIP contains the expected Terminology files
- StatedRelationship file has correct TSV headers
- All destinationId values look like valid SCTIDs (6+ digit integers)
- All typeId values are in the known set of SNOMED attribute types

Parameters:

Name Type Description Default
delta_zip str | Path

Path to the RF2 delta ZIP.

required

Returns:

Type Description
list[str]

List of issue descriptions. Empty list means all checks passed.
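
A sketch of the intended gating pattern (the ZIP path is illustrative):

    issues = pre_classification_checks("rf2_out/snomed_delta_20240101.zip")
    if issues:
        for issue in issues:
            print("pre-check:", issue)
    else:
        print("Delta ZIP passed all structural checks; safe to classify.")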

Source code in src/ariadne/hierarchy/classifier.py
def pre_classification_checks(delta_zip: str | Path) -> list[str]:
    """Run lightweight structural checks on an RF2 delta ZIP.

    Validates:
    - The ZIP contains the expected Terminology files
    - StatedRelationship file has correct TSV headers
    - All ``destinationId`` values look like valid SCTIDs (6+ digit integers)
    - All ``typeId`` values are in the known set of SNOMED attribute types

    Args:
        delta_zip: Path to the RF2 delta ZIP.

    Returns:
        List of issue descriptions.  Empty list means all checks passed.
    """
    issues: list[str] = []
    delta_zip = Path(delta_zip)

    if not delta_zip.is_file():
        issues.append(f"Delta ZIP not found: {delta_zip}")
        return issues

    stated_found = False
    stated_row_count = 0
    owl_axiom_found = False
    owl_axiom_row_count = 0
    concept_found = False

    with zipfile.ZipFile(delta_zip, "r") as zf:
        for name in zf.namelist():
            if "StatedRelationship" in name and name.endswith(".txt"):
                stated_found = True
                with zf.open(name) as f:
                    df = pd.read_csv(f, sep="\t", dtype=str)

                stated_row_count = len(df)

                # Only validate content if file is non-empty
                if stated_row_count > 0:
                    expected_cols = {
                        "id", "effectiveTime", "active", "moduleId",
                        "sourceId", "destinationId", "relationshipGroup",
                        "typeId", "characteristicTypeId", "modifierId",
                    }
                    missing_cols = expected_cols - set(df.columns)
                    if missing_cols:
                        issues.append(f"StatedRelationship missing columns: {missing_cols}")

                    if "destinationId" in df.columns:
                        bad_dests = df[
                            ~df["destinationId"].str.match(r"^\d{6,18}$", na=False)
                        ]
                        if len(bad_dests) > 0:
                            samples = bad_dests["destinationId"].head(5).tolist()
                            issues.append(
                                f"{len(bad_dests)} destinationId values look invalid "
                                f"(expected 6-18 digit SCTIDs). Samples: {samples}"
                            )

                    if "typeId" in df.columns:
                        unknown_types = set(df["typeId"]) - _VALID_TYPE_IDS
                        if unknown_types:
                            issues.append(
                                f"Unknown typeId values (not standard SNOMED attribute types): "
                                f"{unknown_types}"
                            )

            elif "OWLAxiom" in name and name.endswith(".txt"):
                owl_axiom_found = True
                with zf.open(name) as f:
                    owl_df = pd.read_csv(f, sep="\t", dtype=str)

                # Filter to active rows only
                active_owl = owl_df[owl_df.get("active", pd.Series(dtype=str)) == "1"] if "active" in owl_df.columns else owl_df
                owl_axiom_row_count = len(active_owl)

                if owl_axiom_row_count > 0:
                    expected_owl_cols = {
                        "id", "active", "moduleId", "refsetId",
                        "referencedComponentId", "owlExpression",
                    }
                    missing_owl_cols = expected_owl_cols - set(owl_df.columns)
                    if missing_owl_cols:
                        issues.append(f"OWL Axiom refset missing columns: {missing_owl_cols}")
                    elif "owlExpression" in owl_df.columns:
                        # Validate each active expression starts with a known axiom type
                        bad_exprs = active_owl[
                            ~active_owl["owlExpression"].str.match(
                                r"^(EquivalentClasses|SubClassOf|TransitiveObjectProperty|ReflexiveObjectProperty)\(",
                                na=False,
                            )
                        ]
                        if len(bad_exprs) > 0:
                            samples = bad_exprs["owlExpression"].head(3).str[:80].tolist()
                            issues.append(
                                f"{len(bad_exprs)} OWL expressions do not start with a known "
                                f"axiom type (EquivalentClasses/SubClassOf/...). "
                                f"Samples: {samples}"
                            )

                        # Validate referencedComponentId looks like SCTIDs
                        if "referencedComponentId" in owl_df.columns:
                            bad_ids = active_owl[
                                ~active_owl["referencedComponentId"].str.match(
                                    r"^\d{6,18}$", na=False
                                )
                            ]
                            if len(bad_ids) > 0:
                                samples = bad_ids["referencedComponentId"].head(5).tolist()
                                issues.append(
                                    f"{len(bad_ids)} OWL axiom referencedComponentId values "
                                    f"look invalid. Samples: {samples}"
                                )

            if "Concept" in name and name.endswith(".txt") and "OWLAxiom" not in name:
                concept_found = True

    if not stated_found and not owl_axiom_found:
        issues.append(
            "No StatedRelationship or OWL Axiom refset file found in the delta ZIP."
        )
    elif stated_row_count == 0 and owl_axiom_row_count == 0:
        issues.append(
            "No concept definitions found: StatedRelationship is empty and OWL Axiom "
            "refset has no active rows.  At least one must contain data."
        )
    if not concept_found:
        issues.append("No Concept file found in the delta ZIP.")

    return issues

process_gold_standard(gs_path, attribute_index, reference_index=None, cfg=None, checkpoint_every=5, max_workers=1)

Run the pipeline over every unique term in a gold-standard CSV.

Supports checkpointing and optional parallel execution.

When max_workers > 1, each worker thread creates its own database connections (psycopg is not thread-safe).

Parameters:

Name Type Description Default
gs_path str

Path to the gold-standard CSV (must have concept_id_1, concept_name_1 columns).

required
attribute_index AttributeIndex

Attribute searcher.

required
reference_index ReferenceIndex | None

Reference searcher (or None).

None
cfg HierarchySettings | None

Pipeline configuration.

None
checkpoint_every int

Save a checkpoint every N terms (default 5).

5
max_workers int

Number of parallel worker threads (default 1 = sequential).

1

Returns:

Type Description
list[dict]

List of result dicts (one per term).
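
A sketch of a parallel run, assuming attribute_index and reference_index have been constructed as for find_attributes_two_stage (the CSV path is illustrative):

    results = process_gold_standard(
        "data/gold_standard.csv",      # needs concept_id_1 and concept_name_1 columns
        attribute_index,
        reference_index=reference_index,
        checkpoint_every=10,           # checkpoint written every 10 completed terms
        max_workers=4,                 # each worker opens its own DB connections
    )
    print(len(results), "terms processed")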

Source code in src/ariadne/hierarchy/evaluator.py
def process_gold_standard(
    gs_path: str,
    attribute_index: AttributeIndex,
    reference_index: ReferenceIndex | None = None,
    cfg: HierarchySettings | None = None,
    checkpoint_every: int = 5,
    max_workers: int = 1,
) -> list[dict]:
    """Run the pipeline over every unique term in a gold-standard CSV.

    Supports checkpointing and optional parallel execution.

    When *max_workers* > 1, each worker thread creates its own database
    connections (psycopg is not thread-safe).

    Args:
        gs_path: Path to the gold-standard CSV (must have ``concept_id_1``,
            ``concept_name_1`` columns).
        attribute_index: Attribute searcher.
        reference_index: Reference searcher (or None).
        cfg: Pipeline configuration.
        checkpoint_every: Save a checkpoint every N terms (default 5).
        max_workers: Number of parallel worker threads (default 1 = sequential).

    Returns:
        List of result dicts (one per term).
    """
    cfg_local: HierarchySettings = cfg if cfg is not None else load_hierarchy_settings()
    checkpoint_file = Path(cfg_local.evaluation.output_dir) / "hierarchy_checkpoint.pkl"

    gs_df = pd.read_csv(gs_path)
    unique_terms = gs_df[["concept_id_1", "concept_name_1"]].drop_duplicates()
    logger.info("Processing %d terms from %s", len(unique_terms), gs_path)
    logger.info(
        "Models: extraction=%s, selection=%s | workers=%d",
        cfg_local.models.extraction, cfg_local.models.selection, max_workers,
    )

    # --- resume from checkpoint if available ---
    if checkpoint_file.exists():
        try:
            with open(checkpoint_file, "rb") as f:
                checkpoint = pickle.load(f)
            all_results: list[dict] = checkpoint["results"]
            processed_ids: set = checkpoint["processed_ids"]
            logger.info("Resuming from checkpoint: %d terms already done", len(all_results))
        except (pickle.UnpicklingError, EOFError, KeyError) as exc:
            logger.warning("Corrupted checkpoint %s — starting fresh: %s", checkpoint_file, exc)
            checkpoint_file.unlink(missing_ok=True)
            all_results = []
            processed_ids = set()
    else:
        all_results = []
        processed_ids = set()

    pending = [
        row for row in unique_terms.itertuples(index=False)
        if row.concept_id_1 not in processed_ids
    ]

    if not pending:
        logger.info("All terms already processed.")
        return all_results

    if max_workers <= 1:
        # ── Sequential path ────────────────────────────────────────────────
        total_cost = sum(r.get("cost", {}).get("total_cost", 0.0) for r in all_results)
        for row in pending:
            logger.info(
                "\n%s\n[%d/%d] %s",
                "=" * 60, len(all_results) + 1, len(unique_terms), row.concept_name_1,
            )
            result = _process_term(
                row.concept_id_1, row.concept_name_1,
                attribute_index, reference_index, cfg_local,
            )
            all_results.append(result)
            processed_ids.add(row.concept_id_1)
            if "cost" in result:
                total_cost += result["cost"]["total_cost"]
                logger.info("  cost: $%.4f | running total: $%.4f",
                            result["cost"]["total_cost"], total_cost)

            if len(all_results) % checkpoint_every == 0:
                _save_checkpoint(checkpoint_file, all_results, processed_ids, cfg_local)

    else:
        # ── Parallel path ──────────────────────────────────────────────────
        # psycopg connections are not thread-safe — each worker creates its
        # own searcher instances. In-memory (legacy) indexes are read-only
        # and can be shared directly.
        attr_needs_conn = _has_db_connection(attribute_index)
        ref_needs_conn = reference_index is not None and _has_db_connection(reference_index)
        attr_cls = type(attribute_index)
        ref_cls = type(reference_index) if reference_index is not None else None

        lock = threading.Lock()
        done_count = [len(all_results)]  # mutable counter shared across threads

        def _worker(concept_id: int, concept_name: str) -> dict:
            # Create thread-local DB connections if needed
            local_attr = attr_cls(cfg=cfg_local) if attr_needs_conn else attribute_index
            local_ref = None
            if reference_index is not None:
                local_ref = ref_cls(cfg=cfg_local) if ref_needs_conn else reference_index
            try:
                return _process_term(concept_id, concept_name, local_attr, local_ref, cfg_local)
            finally:
                if attr_needs_conn:
                    local_attr.close()
                if ref_needs_conn and local_ref is not None:
                    local_ref.close()

        total_cost = sum(r.get("cost", {}).get("total_cost", 0.0) for r in all_results)
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(_worker, row.concept_id_1, row.concept_name_1): row
                for row in pending
            }
            for future in as_completed(futures):
                row = futures[future]
                result = future.result()
                with lock:
                    all_results.append(result)
                    processed_ids.add(row.concept_id_1)
                    done_count[0] += 1
                    n_done = done_count[0]
                    if "cost" in result:
                        total_cost += result["cost"]["total_cost"]
                    logger.info(
                        "[%d/%d] %s — cost: $%.4f | total: $%.4f",
                        n_done, len(unique_terms), row.concept_name_1,
                        result.get("cost", {}).get("total_cost", 0.0),
                        total_cost,
                    )
                    if n_done % checkpoint_every == 0:
                        _save_checkpoint(checkpoint_file, all_results, processed_ids, cfg_local)

    logger.info(
        "\n%s\nCompleted: %d terms, Total cost: $%.4f",
        "=" * 60, len(all_results), total_cost,
    )

    # --- clean up checkpoint on success ---
    if checkpoint_file.exists():
        checkpoint_file.unlink()
        logger.info("Checkpoint file cleaned up")

    return all_results

resolve_parent_names(is_a_df, source_names=None, id_mapping=None)

Add human-readable names to inferred Is a relationships.

Resolves destinationId (SNOMED SCTIDs) to concept names via the vocabulary database, and sourceId (synthetic IDs) back to OMOP IDs using the ID mapping from export_to_rf2.

Parameters:

Name Type Description Default
is_a_df DataFrame

DataFrame with sourceId and destinationId columns (from parse_classification_results).

required
source_names dict[int, str] | DataFrame | None

Mapping of source concept ID → name. Can be:

- dict[int, str] — direct mapping
- pd.DataFrame with concept_id_1 and concept_name_1 columns
- None — source names will be left as IDs

None
id_mapping DataFrame | None

DataFrame with synthetic_sctid and omop_concept_id columns (from export_to_rf2). Used to translate synthetic sourceIds back to OMOP IDs. If None, sourceIds are used as-is.

None

Returns:

Type Description
DataFrame

DataFrame with columns: source_id, source_name, parent_sctid, parent_name.
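
A sketch chaining the functions above, assuming results_zip is a classification results ZIP, id_map is the mapping DataFrame returned by export_to_rf2, and gs_df is a gold-standard DataFrame with concept_id_1 / concept_name_1 columns:

    new_is_a, removed, equiv = parse_classification_results(results_zip)
    resolved = resolve_parent_names(new_is_a, source_names=gs_df, id_mapping=id_map)
    print(resolved[["source_name", "parent_name"]].head())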

Source code in src/ariadne/hierarchy/classifier.py
def resolve_parent_names(
    is_a_df: pd.DataFrame,
    source_names: dict[int, str] | pd.DataFrame | None = None,
    id_mapping: pd.DataFrame | None = None,
) -> pd.DataFrame:
    """Add human-readable names to inferred *Is a* relationships.

    Resolves ``destinationId`` (SNOMED SCTIDs) to concept names via the
    vocabulary database, and ``sourceId`` (synthetic IDs) back to OMOP IDs
    using the ID mapping from ``export_to_rf2``.

    Args:
        is_a_df: DataFrame with ``sourceId`` and ``destinationId`` columns
            (from :func:`parse_classification_results`).
        source_names: Mapping of source concept ID → name.  Can be:
            - ``dict[int, str]`` — direct mapping
            - ``pd.DataFrame`` with ``concept_id_1`` and ``concept_name_1`` columns
            - ``None`` — source names will be left as IDs
        id_mapping: DataFrame with ``synthetic_sctid`` and ``omop_concept_id``
            columns (from ``export_to_rf2``).  Used to translate synthetic
            sourceIds back to OMOP IDs.  If ``None``, sourceIds are used as-is.

    Returns:
        DataFrame with columns: ``source_id``, ``source_name``,
        ``parent_sctid``, ``parent_name``.
    """
    import psycopg
    from pgvector.psycopg import register_vector

    from ariadne.utils.utils import get_environment_variable

    if is_a_df.empty:
        return pd.DataFrame(
            columns=["source_id", "source_name", "parent_sctid", "parent_name"]
        )

    # Build synthetic → OMOP mapping if provided
    synth_to_omop: dict[str, str] = {}
    if id_mapping is not None:
        synth_to_omop = dict(
            zip(
                id_mapping["synthetic_sctid"].astype(str),
                id_mapping["omop_concept_id"].astype(str),
            )
        )

    # Resolve source names (keyed by OMOP concept_id)
    if isinstance(source_names, pd.DataFrame):
        src_map: dict[str, str] = dict(
            zip(
                source_names["concept_id_1"].astype(str),
                source_names["concept_name_1"],
            )
        )
    elif isinstance(source_names, dict):
        src_map = {str(k): v for k, v in source_names.items()}
    else:
        src_map = {}

    # Resolve parent names from DB using concept_code (SCTID)
    parent_sctids = is_a_df["destinationId"].unique().tolist()

    # Some parents may be synthetic delta concepts (inter-delta Is a);
    # resolve those from the id_mapping instead of querying the DB.
    parent_name_map: dict[str, str] = {}
    real_parent_sctids: list[str] = []
    if id_mapping is not None:
        synth_set = set(id_mapping["synthetic_sctid"].astype(str))
        synth_name_lookup = dict(
            zip(
                id_mapping["synthetic_sctid"].astype(str),
                id_mapping["concept_name"].astype(str),
            )
        )
        for sctid in parent_sctids:
            if sctid in synth_set:
                parent_name_map[sctid] = synth_name_lookup.get(sctid, sctid)
            else:
                real_parent_sctids.append(sctid)
    else:
        real_parent_sctids = parent_sctids

    conn_str = get_environment_variable("VOCAB_CONNECTION_STRING")
    conn_str = conn_str.replace("+psycopg", "").replace("+psycopg2", "")
    schema = get_environment_variable("VOCAB_SCHEMA")

    if real_parent_sctids:
        with psycopg.connect(conn_str) as conn:
            with conn.cursor() as cur:
                cur.execute(
                    f"SELECT concept_code, concept_name "
                    f"FROM {schema}.concept "
                    f"WHERE vocabulary_id = 'SNOMED' "
                    f"  AND concept_code = ANY(%s)",
                    (real_parent_sctids,),
                )
                for code, name in cur.fetchall():
                    parent_name_map[str(code)] = name

    resolved = len(parent_name_map)
    total = len(parent_sctids)
    if resolved < total:
        unresolved = set(parent_sctids) - set(parent_name_map.keys())
        logger.warning(
            "Could not resolve %d / %d parent SCTIDs to names: %s",
            total - resolved, total, list(unresolved)[:10],
        )

    # Also build a fallback name map from id_mapping for source concepts
    # whose OMOP IDs may not be in source_names (e.g. from attribute_results
    # but absent from the gold standard DataFrame).
    if id_mapping is not None:
        for _, m in id_mapping.iterrows():
            omop_str = str(m["omop_concept_id"])
            if omop_str not in src_map and str(m.get("concept_name", "")):
                src_map[omop_str] = str(m["concept_name"])

    rows = []
    for _, rel in is_a_df.iterrows():
        synth_id = str(rel["sourceId"])
        # Map synthetic ID back to OMOP ID if mapping is available
        omop_id = synth_to_omop.get(synth_id, synth_id)
        dest_id = str(rel["destinationId"])
        rows.append({
            "source_id": omop_id,
            "source_name": src_map.get(omop_id, omop_id),
            "parent_sctid": dest_id,
            "parent_name": parent_name_map.get(dest_id, dest_id),
        })

    return pd.DataFrame(rows)