Download terms from vocabulary database and store them in parquet files for use in verbatim mapping.
Parameters:
| Name | Type | Description | Default |
|------|------|-------------|---------|
| `config` | `Config` | A Config object containing configuration parameters. This function uses the verbatim_mapping section of the config, which specifies the vocabularies, domains, etc. to filter the terms to be downloaded. | `Config()` |
Returns:
Source code in src/ariadne/verbatim_mapping/term_downloader.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def download_terms(config: Config | None = None) -> None:
    """
    Download terms from the vocabulary database and store them in Parquet files
    for use in verbatim mapping.

    Args:
        config: A Config object containing configuration parameters. This function
            uses the verbatim_mapping section of the config, which specifies the
            vocabularies, domains, etc. to filter the terms to be downloaded.
            Defaults to a freshly constructed ``Config()``.

    Returns:
        None
    """
    # Build the default lazily: `config: Config = Config()` evaluates Config()
    # once at definition time and shares that single instance across all calls
    # (mutable-default-argument pitfall).
    if config is None:
        config = Config()

    # Check if Parquet files already exist. Skip download if they do.
    if os.path.exists(config.system.terms_folder) and os.listdir(config.system.terms_folder):
        print(f"Parquet files already exist in folder {config.system.terms_folder}. Skipping download.")
        return

    os.makedirs(config.system.log_folder, exist_ok=True)
    os.makedirs(config.system.terms_folder, exist_ok=True)
    open_log(os.path.join(config.system.log_folder, "logDownloadTerms.txt"))
    logging.info("Starting downloading terms")

    engine = create_engine(get_environment_variable("VOCAB_CONNECTION_STRING"))
    query = _create_query(engine=engine, config=config)
    with engine.connect() as connection:
        # stream_results=True keeps the server-side cursor open so the full
        # result set is never materialized in memory at once.
        terms_result_set = connection.execution_options(stream_results=True).execute(query)
        total_inserted = 0
        # Fetch batches until the cursor is exhausted (empty list is falsy).
        while chunk := terms_result_set.fetchmany(config.system.download_batch_size):
            _store_in_parquet(
                concept_ids=[row.concept_id for row in chunk],
                terms=[row.term for row in chunk],
                concept_names=[row.concept_name for row in chunk],
                # NOTE(review): these columns are intentionally disabled upstream;
                # kept for reference.
                # vocabulary_ids=[row.vocabulary_id for row in chunk],
                # domain_ids=[row.domain_id for row in chunk],
                # standard_concepts=[row.standard_concept for row in chunk],
                # sources=[row.source for row in chunk],
                file_name=os.path.join(
                    config.system.terms_folder,
                    f"Terms_{total_inserted + 1}_{total_inserted + len(chunk)}.parquet",
                ),
            )
            total_inserted += len(chunk)
            # Lazy %-args: the message is only formatted if the record is emitted.
            logging.info("Downloaded %d rows, total downloaded: %d", len(chunk), total_inserted)
    logging.info("Finished downloading terms")
|