Skip to content

term_downloader

download_terms(settings)

Download terms from vocabulary database and store them in parquet files for use in verbatim mapping.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `settings` | `VerbatimMappingSettings` | A `VerbatimMappingSettings` object containing configuration parameters. This specifies the vocabularies, domains, etc. used to filter the terms to be downloaded. | required |

Returns:

| Type | Description |
|------|-------------|
| `None` | `None` |

Source code in src/ariadne/verbatim_mapping/term_downloader.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def download_terms(settings: VerbatimMappingSettings) -> None:
    """
    Download terms from the vocabulary database and store them in Parquet files for use
    in verbatim mapping.

    Args:
        settings: A VerbatimMappingSettings object containing configuration parameters.
            This specifies the vocabularies, domains, etc. to filter the terms to be
            downloaded.

    Returns:
        None
    """
    # Skip the (potentially expensive) download when Parquet files already exist.
    if os.path.exists(settings.terms_folder) and os.listdir(settings.terms_folder):
        # print rather than logging: the log file has not been opened yet at this point.
        print(f"Parquet files already exist in folder {settings.terms_folder}. Skipping download.")
        return

    os.makedirs(settings.log_folder, exist_ok=True)
    os.makedirs(settings.terms_folder, exist_ok=True)
    open_log(os.path.join(settings.log_folder, "logDownloadTerms.txt"))

    logging.info("Starting downloading terms")

    engine = create_engine(get_environment_variable("VOCAB_CONNECTION_STRING"))
    query = _create_query(engine=engine, settings=settings)

    with engine.connect() as connection:
        # stream_results avoids materializing the full result set in memory;
        # rows are fetched from the server in batches via fetchmany below.
        terms_result_set = connection.execution_options(stream_results=True).execute(query)
        total_inserted = 0
        while True:
            chunk = terms_result_set.fetchmany(settings.download_batch_size)
            if not chunk:
                break
            # Each file name encodes the 1-based, inclusive row range stored in it,
            # e.g. Terms_1_10000.parquet.
            _store_in_parquet(
                concept_ids=[row.concept_id for row in chunk],
                terms=[row.term for row in chunk],
                concept_names=[row.concept_name for row in chunk],
                vocabulary_ids=[row.vocabulary_id for row in chunk],
                file_name=os.path.join(
                    settings.terms_folder,
                    f"Terms_{total_inserted + 1}_{total_inserted + len(chunk)}.parquet",
                ),
            )
            total_inserted += len(chunk)
            # Lazy %-args: the message is only formatted if the log level is enabled.
            logging.info("Downloaded %d rows, total downloaded: %d", len(chunk), total_inserted)
    logging.info("Finished downloading terms")