Skip to content

pipeline

Four-step SNOMED CT attribute extraction pipeline.

Public API

find_attributes_two_stage(medical_term, attribute_index, ...) → dict

Helper functions handle the individual steps: _retrieve_reference_examples — Step 1; extract_components — Step 2 (public); _retrieve_candidates — Step 3; _build_selection_prompt — Step 4a. The underscore-prefixed helpers are private.

ContentFilterError

Bases: Exception

Raised when the LLM content filter blocks a response.

Source code in src/ariadne/hierarchy/pipeline.py
45
46
class ContentFilterError(Exception):
    """Raised when the LLM provider's content filter blocks a response."""

call_llm(system_prompt, user_prompt, model)

Call the LLM and return LlmResult(content, cost_usd).

Parameters:

Name Type Description Default
system_prompt str

System-level prompt text.

required
user_prompt str

User-level prompt text.

required
model str

Model identifier (from cfg.models).

required

Returns:

Type Description
LlmResult

LlmResult(content, cost).

Raises:

Type Description
ContentFilterError

If the content filter blocks the response.

Source code in src/ariadne/hierarchy/pipeline.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def call_llm(system_prompt: str, user_prompt: str, model: str) -> LlmResult:
    """Call the LLM and return ``LlmResult(content, cost_usd)``.

    Args:
        system_prompt: System-level prompt text.
        user_prompt: User-level prompt text.
        model: Model identifier (from ``cfg.models``).

    Returns:
        LlmResult(content, cost).

    Raises:
        ContentFilterError: If the content filter blocks the response.
    """
    # NOTE(review): ``model`` is accepted but never forwarded to
    # ``get_llm_response`` — confirm whether the backend selects the model
    # elsewhere or whether it should be passed through here.
    raw = get_llm_response(user_prompt, system_prompt=system_prompt)
    content = raw["content"]
    # The backend signals a content-filter block by returning ``None`` content.
    if content is None:
        raise ContentFilterError(
            f"Content filter triggered for prompt: {user_prompt[:100]}..."
        )
    return LlmResult(content, raw["usage"]["total_cost_usd"])

extract_components(medical_term, reference_text, cfg)

Step 2: Use the LLM to infer applicable SNOMED attributes.

Parameters:

Name Type Description Default
medical_term str

Term to decompose.

required
reference_text str

Formatted reference examples block.

required
cfg HierarchySettings

Pipeline configuration.

required

Returns:

Type Description
ExtractionResult

ExtractionResult(components, cost).

Source code in src/ariadne/hierarchy/pipeline.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
def extract_components(
    medical_term: str,
    reference_text: str,
    cfg: HierarchySettings,
) -> ExtractionResult:
    """Step 2: Use the LLM to infer applicable SNOMED attributes.

    Args:
        medical_term: Term to decompose.
        reference_text: Formatted reference examples block.
        cfg: Pipeline configuration.

    Returns:
        ExtractionResult(components, cost).
    """
    # Only include the reference block when Step 1 actually produced examples.
    reference_section = ""
    if reference_text:
        reference_section = (
            "=== REFERENCE EXAMPLES ===\nStudy these carefully. "
            "They show how SNOMED assigns attributes to similar terms:\n\n" + reference_text
        )
    system_prompt = cfg.prompts.extraction.format(reference_section=reference_section)
    user_prompt = f'Determine the attributes for: "{medical_term}"'
    llm_text, llm_cost = call_llm(system_prompt, user_prompt, model=cfg.models.extraction)
    return ExtractionResult(parse_json_response(llm_text), llm_cost)

find_attributes_two_stage(medical_term, attribute_index, reference_index=None, cfg=None, verbose=True, precomputed_embedding=None)

Run the 4-step SNOMED CT attribute extraction pipeline.

Steps
  1. Retrieve reference examples (pgvector or in-memory).
  2. LLM infers applicable attributes.
  3. Retrieve SNOMED candidate values per attribute.
  4. LLM selects exact SNOMED concepts from candidates.

Parameters:

Name Type Description Default
medical_term str

The clinical term to decompose.

required
attribute_index AttributeIndex

Attribute searcher (pgvector or legacy dict).

required
reference_index ReferenceIndex | None

Reference searcher (pgvector, legacy dict, or None).

None
cfg HierarchySettings | None

Pipeline configuration.

None
verbose bool

Whether to log progress.

True
precomputed_embedding

Optional np.ndarray (shape [dim]) — the embedding of medical_term computed upstream (e.g. by PgvectorConceptSearcher.search_terms). When supplied, Step 1's reference-retrieval embedding API call is skipped, saving cost.

None

Returns:

Type Description
dict

Dict with keys attributes, extracted_components, retrieved_candidates, reference_examples, cost.

Source code in src/ariadne/hierarchy/pipeline.py
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
def find_attributes_two_stage(
    medical_term: str,
    attribute_index: AttributeIndex,
    reference_index: ReferenceIndex | None = None,
    cfg: HierarchySettings | None = None,
    verbose: bool = True,
    precomputed_embedding=None,
) -> dict:
    """Run the 4-step SNOMED CT attribute extraction pipeline.

    Steps:
        1. Retrieve reference examples (pgvector or in-memory).
        2. LLM infers applicable attributes.
        3. Retrieve SNOMED candidate values per attribute.
        4. LLM selects exact SNOMED concepts from candidates.

    Args:
        medical_term: The clinical term to decompose.
        attribute_index: Attribute searcher (pgvector or legacy dict).
        reference_index: Reference searcher (pgvector, legacy dict, or None).
        cfg: Pipeline configuration; loaded via ``load_hierarchy_settings()``
            when ``None``.
        verbose: Whether to log progress.
        precomputed_embedding: Optional ``np.ndarray`` (shape ``[dim]``) — the
            embedding of *medical_term* computed upstream (e.g. by
            ``PgvectorConceptSearcher.search_terms``).  When supplied, Step 1's
            reference-retrieval embedding API call is skipped, saving cost.

    Returns:
        Dict with keys ``attributes``, ``extracted_components``,
        ``retrieved_candidates``, ``reference_examples`` (present only when
        Step 1 found examples), ``cost``.
    """
    cfg_local: HierarchySettings = cfg if cfg is not None else load_hierarchy_settings()

    # Step 1: fetch few-shot reference examples. ref_cost is the retrieval
    # embedding spend (avoided when precomputed_embedding is passed through).
    similar_terms, reference_text, ref_cost = _retrieve_reference_examples(
        medical_term, reference_index, cfg_local, verbose,
        precomputed_embedding=precomputed_embedding,
    )

    # Step 2: LLM proposes which SNOMED attributes apply to the term.
    if verbose:
        logger.info("Step 2: Inferring attributes...")
    components, extraction_cost = extract_components(medical_term, reference_text=reference_text,
                                                     cfg=cfg_local)
    if verbose:
        logger.info("  Inferred: %s", json.dumps({k: v for k, v in components.items() if v}, indent=2))

    # Step 3: retrieve concrete SNOMED candidate concepts per inferred attribute.
    if verbose:
        logger.info("Step 3: Retrieving candidates...")
    candidates_df, embedding_cost = _retrieve_candidates(
        components, attribute_index, similar_terms,
        verbose, cfg=cfg_local
    )

    # Step 4: a second LLM call selects the exact concept per attribute
    # from the retrieved candidates.
    if verbose:
        logger.info("Step 4: Selecting best matches...")
    candidates_text = _build_selection_prompt(candidates_df)
    user_prompt = f"Medical term: {medical_term}\n\n{reference_text}\n\nCandidates:\n{candidates_text}"
    response, selection_cost = call_llm(cfg_local.prompts.selection, user_prompt, model=cfg_local.models.selection)

    total_cost = ref_cost + extraction_cost + embedding_cost + selection_cost

    result = parse_json_response(response)

    # --- Enforce interprets ↔ interpretation pairing ---
    # (helper mutates result["attributes"] in place)
    if "attributes" in result:
        _enforce_interprets_pairing(result["attributes"], verbose=verbose)

    # Inject concept_code into each selected attribute concept dict
    if len(candidates_df) > 0 and "concept_code" in candidates_df.columns:
        # Build a concept_id -> concept_code map: rows without a code are
        # dropped and duplicate concept_ids collapse to the first occurrence.
        code_lookup: dict[int, str] = (
            candidates_df.dropna(subset=["concept_code"])
            .drop_duplicates(subset=["concept_id"])
            .set_index("concept_id")["concept_code"]
            .to_dict()
        )
        # Mutates obj in place; an already-present concept_code is left alone.
        # NOTE(review): int(cid) raises ValueError for non-numeric ids the LLM
        # might emit — confirm upstream guarantees numeric concept_id values.
        def _inject_code(obj) -> None:
            if isinstance(obj, dict) and "concept_id" in obj:
                cid = obj.get("concept_id")
                if cid is not None and "concept_code" not in obj:
                    obj["concept_code"] = code_lookup.get(int(cid))
        if "attributes" in result and isinstance(result["attributes"], dict):
            for attr_key, attr_val in result["attributes"].items():
                if attr_val is None:
                    continue
                # interprets_interpretation is a list of {role: concept} pairs,
                # so the concept dicts sit one level deeper than other attrs.
                if attr_key == "interprets_interpretation" and isinstance(attr_val, list):
                    for pair in attr_val:
                        if isinstance(pair, dict):
                            for v in pair.values():
                                _inject_code(v)
                elif isinstance(attr_val, list):
                    for item in attr_val:
                        _inject_code(item)
                else:
                    _inject_code(attr_val)

    result['extracted_components'] = components
    result['retrieved_candidates'] = candidates_df.to_dict('records') if len(candidates_df) > 0 else []
    if similar_terms:
        result['reference_examples'] = similar_terms
    # NOTE(review): ref_cost is folded into total_cost but not itemized in
    # the breakdown below — confirm this is intentional.
    result['cost'] = {
        'extraction_cost': extraction_cost,
        'embedding_cost': embedding_cost,
        'selection_cost': selection_cost,
        'total_cost': total_cost,
    }
    if verbose:
        logger.info("Total cost: $%.4f", total_cost)
    return result

find_similar_reference_terms(query, reference_index, top_k, precomputed_embedding=None)

Find similar reference terms for few-shot examples.

Parameters:

Name Type Description Default
query str

Medical term to search for.

required
reference_index ReferenceIndex

Reference searcher (pgvector or legacy wrapper).

required
top_k int

Number of reference examples (from cfg.retrieval.num_reference_examples).

required
precomputed_embedding

Optional np.ndarray (shape [dim]). When supplied, passed straight through to SnomedReferenceSearcher.search so the embedding API call is skipped entirely.

None

Returns:

Type Description
ReferenceSearchResult

ReferenceSearchResult(examples, cost).

Source code in src/ariadne/hierarchy/pipeline.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def find_similar_reference_terms(
    query: str,
    reference_index: ReferenceIndex,
    top_k: int,
    precomputed_embedding=None,
) -> ReferenceSearchResult:
    """Find similar reference terms for few-shot examples.

    Args:
        query: Medical term to search for.
        reference_index: Reference searcher (pgvector or legacy wrapper).
        top_k: Number of reference examples (from ``cfg.retrieval.num_reference_examples``).
        precomputed_embedding: Optional ``np.ndarray`` (shape ``[dim]``).  When
            supplied, passed straight through to
            ``SnomedReferenceSearcher.search`` so the embedding API call is
            skipped entirely.

    Returns:
        ReferenceSearchResult(examples, cost).
    """
    # Only forward ``embedding`` when the caller actually supplied one, so
    # legacy searchers without that keyword keep working.
    search_kwargs = {"top_k": top_k}
    if precomputed_embedding is not None:
        search_kwargs["embedding"] = precomputed_embedding
    return reference_index.search(query, **search_kwargs)

format_reference_examples(similar_terms)

Format reference examples into a human-readable block for the prompt.

Parameters:

Name Type Description Default
similar_terms list[dict]

List of reference dicts from find_similar_reference_terms.

required

Returns:

Type Description
str

Formatted string, or empty string if no terms.

Source code in src/ariadne/hierarchy/pipeline.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def format_reference_examples(similar_terms: list[dict]) -> str:
    """Format reference examples into a human-readable block for the prompt.

    Args:
        similar_terms: List of reference dicts from ``find_similar_reference_terms``.

    Returns:
        Formatted string, or empty string if no terms.
    """
    if not similar_terms:
        return ""

    def _render(term: dict) -> str:
        # One indented bullet per attribute; placeholder when there are none.
        bullets = [
            f"  - {a['attribute_category']}: {a['concept_name_2']} ({a['concept_id_2']})"
            for a in term['attributes']
        ]
        body = "\n".join(bullets) if bullets else "  (no attributes)"
        return f"Term: {term['concept_name']} ({term['concept_id']})\nAttributes:\n{body}"

    rendered = [_render(term) for term in similar_terms]
    return "Similar SNOMED terms for reference:\n\n" + "\n\n".join(rendered)

parse_json_response(response)

Parse a JSON response from an LLM, stripping markdown fences if present.

Parameters:

Name Type Description Default
response str

Raw LLM response string.

required

Returns:

Type Description
dict

Parsed dict.

Raises:

Type Description
ValueError

If the response cannot be parsed as JSON.

Source code in src/ariadne/hierarchy/pipeline.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def parse_json_response(response: str) -> dict:
    """Parse a JSON response from an LLM, stripping markdown fences if present.

    Args:
        response: Raw LLM response string.

    Returns:
        Parsed dict.

    Raises:
        ValueError: If the response cannot be parsed as JSON.
    """
    raw = response  # untouched copy kept for error reporting
    cleaned = response.strip()
    if cleaned.startswith("```"):
        # Keep only the text between the opening and closing fence, then
        # drop an optional "json" language tag after the opening fence.
        cleaned = cleaned.split("```")[1].removeprefix("json")
    try:
        parsed = json.loads(cleaned)
    except json.JSONDecodeError as exc:
        logger.error(
            "Failed to parse LLM response as JSON. Raw response:\n%s", raw
        )
        raise ValueError(
            f"LLM returned malformed JSON: {exc}. "
            f"First 200 chars of response: {raw[:200]!r}"
        ) from exc
    return parsed