Skip to content

term_cleaner

TermCleaner

A class to clean clinical terms by removing non-essential modifiers and information using a Large Language Model (LLM).

Source code in src/ariadne/term_cleanup/term_cleaner.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
class TermCleaner:
    """
    A class to clean clinical terms by removing non-essential modifiers and information using a Large Language Model (LLM).

    Cleaning runs in two passes: an optional rule-based ' and ' → ' and/or '
    rewrite (``rewrite_and``) followed by batched LLM cleanup
    (``_clean_terms_batch``).  The accumulated LLM cost is tracked in
    ``self.cost`` and exposed through ``get_total_cost``.
    """

    def __init__(self, settings: TermCleanerSettings):
        """
        Args:
            settings: Settings object; only ``settings.system_prompt`` is read here.
        """
        self.system_prompt = settings.system_prompt
        self.cost = 0.0
        # Guards the read-modify-write on ``self.cost``.
        self._cost_lock = threading.Lock()

    def rewrite_and(
        self, term: str, vocabulary_id: str = "", concept_code: str = ""
    ) -> str:
        """
        Rewrite ' and ' → ' and/or ' for ICD-family terms where appropriate.

        Args:
            term: The clinical term to be cleaned.
            vocabulary_id: Source vocabulary (e.g. 'ICD10CM'). Used for
                           the and/or rewrite; safe to omit for non-ICD terms.
            concept_code: Source concept code. Used for and/or rewrite
                          exclusions; safe to omit.

        Returns:
            The term with every ' and ' (matched case-insensitively) replaced
            by ' and/or ' when ``_should_replace_and`` approves the rewrite;
            otherwise the term unchanged.
        """
        if _should_replace_and(term, vocabulary_id, concept_code):
            term = re.sub(r" and ", " and/or ", term, flags=re.IGNORECASE)
        return term

    def _clean_terms_batch(self, terms: list[str]) -> list[str]:
        """
        Cleans one batch of terms in a single LLM request.

        Callers are expected to keep batches small (``clean_terms`` chunks by
        ``_BATCH_SIZE``).  Rows the LLM does not return keep their input term.

        Args:
            terms: The terms to clean, in order.

        Returns:
            A list of the same length and order as *terms*.

        Raises:
            ValueError: If the parsed response does not have the expected
                ``{"results": [{"row_number": int, "cleaned_term": str}, ...]}``
                shape, or a row number is out of range.
        """
        if not terms:
            return []

        rows = [{"row_number": i, "source_term": term} for i, term in enumerate(terms)]
        prompt = (
            "Input JSON:\n"
            f"{json.dumps({'terms': rows}, ensure_ascii=False)}"
        )
        response = get_llm_response(
            prompt=prompt,
            system_prompt=self.system_prompt,
            json_schema=_TERM_CLEANING_BATCH_SCHEMA,
            json_schema_name="term_cleaning_batch",
        )
        # ``+=`` on an attribute is not atomic; take the lock so concurrent
        # batches cannot lose cost updates.
        with self._cost_lock:
            self.cost += response["usage"]["total_cost_usd"]

        # Start from the inputs so rows missing from the response fall back
        # to their original term.
        cleaned_terms = list(terms)
        parsed = response["parsed_json"]
        if not isinstance(parsed, dict) or "results" not in parsed:
            raise ValueError("Term cleaning response must be a JSON object with a 'results' array.")
        results = parsed["results"]
        if not isinstance(results, list):
            raise ValueError("Term cleaning response 'results' must be a list.")
        for item in results:
            if not isinstance(item, dict):
                raise ValueError("Each term cleaning result must be an object.")
            if "row_number" not in item:
                raise ValueError("Each term cleaning result must include integer 'row_number'.")
            if "cleaned_term" not in item:
                raise ValueError("Each term cleaning result must include string 'cleaned_term'.")
            row_number = item["row_number"]
            cleaned_term = item["cleaned_term"]
            # bool is a subclass of int; reject it so True/False are not
            # silently treated as rows 1/0.
            if isinstance(row_number, bool) or not isinstance(row_number, int):
                raise ValueError("Each term cleaning result must include integer 'row_number'.")
            if row_number < 0 or row_number >= len(cleaned_terms):
                raise ValueError("Row number out of range in term cleaning result.")
            if not isinstance(cleaned_term, str):
                raise ValueError("Each term cleaning result must include string 'cleaned_term'.")
            cleaned_terms[row_number] = cleaned_term.strip()
        return cleaned_terms

    def clean_terms(
        self,
        df: pd.DataFrame,
        term_column: str = "source_term",
        output_column: str = "cleaned_term",
        vocabulary_column: str = "vocabulary_id",
        code_column: str = "concept_code",
    ) -> pd.DataFrame:
        """
        Cleans clinical terms in a DataFrame column.

        When *vocabulary_column* and *code_column* are present in *df*, they are
        used for the and/or rewrite (pass 1).  If absent the rewrite is skipped
        for all rows and only LLM cleanup (pass 2) runs — preserving full
        backward compatibility with callers that don't supply those columns.

        The input DataFrame is modified in place (the output column is added
        or overwritten) and also returned.

        Args:
            df: DataFrame containing the terms to be cleaned.
            term_column: Column with source terms.
            output_column: Column to write cleaned terms to.
            vocabulary_column: Column with vocabulary_id (optional).
            code_column: Column with concept_code (optional).

        Returns:
            DataFrame with cleaned terms in *output_column*.
        """
        has_vocab = vocabulary_column in df.columns
        has_code = code_column in df.columns

        terms = df[term_column].astype(str).tolist()
        if has_vocab and has_code:
            vocab_values = df[vocabulary_column].fillna("").astype(str).tolist()
            code_values = df[code_column].fillna("").astype(str).tolist()
            terms = [
                self.rewrite_and(term, vocabulary_id, concept_code)
                for term, vocabulary_id, concept_code in zip(terms, vocab_values, code_values)
            ]

        cleaned_terms: list[str] = []
        for start in range(0, len(terms), _BATCH_SIZE):
            cleaned_terms.extend(self._clean_terms_batch(terms[start : start + _BATCH_SIZE]))

        # Assign by position in one step.  Label-based ``df.loc[index_slice]``
        # selects the wrong number of rows when the index has duplicate labels.
        df[output_column] = cleaned_terms

        return df

    def get_total_cost(self) -> float:
        """
        Returns the total cost incurred for LLM calls during term cleaning.

        Returns:
            Total cost in USD.
        """

        return self.cost

clean_terms(df, term_column='source_term', output_column='cleaned_term', vocabulary_column='vocabulary_id', code_column='concept_code')

Cleans clinical terms in a DataFrame column.

When vocabulary_column and code_column are present in df, they are used for the and/or rewrite (pass 1). If absent the rewrite is skipped for all rows and only LLM cleanup (pass 2) runs — preserving full backward compatibility with callers that don't supply those columns.

Parameters:

Name Type Description Default
df DataFrame

DataFrame containing the terms to be cleaned.

required
term_column str

Column with source terms.

'source_term'
output_column str

Column to write cleaned terms to.

'cleaned_term'
vocabulary_column str

Column with vocabulary_id (optional).

'vocabulary_id'
code_column str

Column with concept_code (optional).

'concept_code'

Returns:

Type Description
DataFrame

DataFrame with cleaned terms in output_column.

Source code in src/ariadne/term_cleanup/term_cleaner.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def clean_terms(
    self,
    df: pd.DataFrame,
    term_column: str = "source_term",
    output_column: str = "cleaned_term",
    vocabulary_column: str = "vocabulary_id",
    code_column: str = "concept_code",
) -> pd.DataFrame:
    """
    Cleans clinical terms in a DataFrame column.

    When *vocabulary_column* and *code_column* are present in *df*, they are
    used for the and/or rewrite (pass 1).  If absent the rewrite is skipped
    for all rows and only LLM cleanup (pass 2) runs — preserving full
    backward compatibility with callers that don't supply those columns.

    The input DataFrame is modified in place (the output column is added
    or overwritten) and also returned.

    Args:
        df: DataFrame containing the terms to be cleaned.
        term_column: Column with source terms.
        output_column: Column to write cleaned terms to.
        vocabulary_column: Column with vocabulary_id (optional).
        code_column: Column with concept_code (optional).

    Returns:
        DataFrame with cleaned terms in *output_column*.
    """
    has_vocab = vocabulary_column in df.columns
    has_code = code_column in df.columns

    terms = df[term_column].astype(str).tolist()
    if has_vocab and has_code:
        vocab_values = df[vocabulary_column].fillna("").astype(str).tolist()
        code_values = df[code_column].fillna("").astype(str).tolist()
        terms = [
            self.rewrite_and(term, vocabulary_id, concept_code)
            for term, vocabulary_id, concept_code in zip(terms, vocab_values, code_values)
        ]

    cleaned_terms: list[str] = []
    for start in range(0, len(terms), _BATCH_SIZE):
        cleaned_terms.extend(self._clean_terms_batch(terms[start : start + _BATCH_SIZE]))

    # Assign by position in one step.  Label-based ``df.loc[index_slice]``
    # selects the wrong number of rows when the index has duplicate labels.
    df[output_column] = cleaned_terms

    return df

get_total_cost()

Returns the total cost incurred for LLM calls during term cleaning.

Returns:

Type Description
float

Total cost in USD.

Source code in src/ariadne/term_cleanup/term_cleaner.py
232
233
234
235
236
237
238
239
240
def get_total_cost(self) -> float:
    """
    Reports the LLM spend accumulated by this cleaner so far.

    Returns:
        The current value of ``self.cost``, in USD.
    """
    accumulated_usd = self.cost
    return accumulated_usd

rewrite_and(term, vocabulary_id='', concept_code='')

Rewrite ' and ' → ' and/or ' for ICD-family terms where appropriate.

Parameters:

Name Type Description Default
term str

The clinical term to be cleaned.

required
vocabulary_id str

Source vocabulary (e.g. 'ICD10CM'). Used for the and/or rewrite; safe to omit for non-ICD terms.

''
concept_code str

Source concept code. Used for and/or rewrite exclusions; safe to omit.

''

Returns:

Type Description
str

The cleaned clinical term.

Source code in src/ariadne/term_cleanup/term_cleaner.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def rewrite_and(
    self, term: str, vocabulary_id: str = "", concept_code: str = ""
) -> str:
    """
    Expand ' and ' to ' and/or ' in a term when the rewrite applies.

    ``_should_replace_and`` decides whether the rewrite applies; when it
    declines, the term is handed back untouched.

    Args:
        term: The clinical term to rewrite.
        vocabulary_id: Source vocabulary (e.g. 'ICD10CM'); may be omitted
                       for non-ICD terms.
        concept_code: Source concept code, consulted for rewrite
                      exclusions; may be omitted.

    Returns:
        The term, with each ' and ' (any letter case) replaced by
        ' and/or ' if the rewrite was approved.
    """
    if not _should_replace_and(term, vocabulary_id, concept_code):
        return term
    return re.sub(r" and ", " and/or ", term, flags=re.IGNORECASE)