Download terms from vocabulary database and store them in parquet files for use in verbatim mapping.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| settings | VerbatimMappingSettings | A VerbatimMappingSettings object containing configuration parameters. This specifies the vocabularies, domains, etc. to filter the terms to be downloaded. | required |

Returns:
Source code in src/ariadne/verbatim_mapping/term_downloader.py
(source listing lines 136-184)
def download_terms(settings: VerbatimMappingSettings) -> None:
    """
    Download terms from the vocabulary database and store them in Parquet files for use in
    verbatim mapping.

    Args:
        settings: A VerbatimMappingSettings object containing configuration parameters. This
            specifies the vocabularies, domains, etc. to filter the terms to be downloaded.

    Returns:
        None
    """
    # Skip the download entirely if Parquet files already exist in the target folder.
    if os.path.exists(settings.terms_folder) and os.listdir(settings.terms_folder):
        # print() rather than logging: the log file is only opened further down, so a log
        # call here would go nowhere.
        print(f"Parquet files already exist in folder {settings.terms_folder}. Skipping download.")
        return

    os.makedirs(settings.log_folder, exist_ok=True)
    os.makedirs(settings.terms_folder, exist_ok=True)
    open_log(os.path.join(settings.log_folder, "logDownloadTerms.txt"))
    logging.info("Starting downloading terms")

    # Connection string comes from the environment; presumably points at an OMOP-style
    # vocabulary database -- confirm against deployment config.
    engine = create_engine(get_environment_variable("VOCAB_CONNECTION_STRING"))
    query = _create_query(engine=engine, settings=settings)
    with engine.connect() as connection:
        # stream_results keeps the server-side cursor open so the full term set is never
        # materialized in memory at once.
        terms_result_set = connection.execution_options(stream_results=True).execute(query)
        total_inserted = 0
        # fetchmany() returns an empty (falsy) list once the result set is exhausted.
        while chunk := terms_result_set.fetchmany(settings.download_batch_size):
            _store_in_parquet(
                concept_ids=[row.concept_id for row in chunk],
                terms=[row.term for row in chunk],
                concept_names=[row.concept_name for row in chunk],
                vocabulary_ids=[row.vocabulary_id for row in chunk],
                # File name encodes the 1-based row range contained in this batch.
                file_name=os.path.join(
                    settings.terms_folder,
                    f"Terms_{total_inserted + 1}_{total_inserted + len(chunk)}.parquet",
                ),
            )
            total_inserted += len(chunk)
            # Lazy %-style args: the message is only formatted if this level is enabled.
            logging.info("Downloaded %d rows, total downloaded: %d", len(chunk), total_inserted)
    logging.info("Finished downloading terms")