client - OPTIMADE Python tools

client

This module implements OPTIMADE client functionality for:

  • making web requests to filter and harvest resources from OPTIMADE APIs,
  • querying multiple providers simultaneously.
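To make the first point concrete, here is a minimal sketch of how a filtered request URL might be assembled. The helper `build_query_url` and the `https://example.org/optimade` base URL are hypothetical and are not part of this module; the sketch assumes the versioned `/v1/` URL layout and the `filter` and `page_limit` query parameters.

```python
from urllib.parse import parse_qs, quote, urlencode, urlsplit


def build_query_url(base_url, endpoint, filter="", page_limit=None):
    """Hypothetical helper: assemble a query URL for one OPTIMADE endpoint."""
    params = {}
    if filter:
        params["filter"] = filter
    if page_limit is not None:
        params["page_limit"] = page_limit
    # Assumption: the API is served under a versioned `/v1/` prefix.
    url = f"{base_url.rstrip('/')}/v1/{endpoint}"
    if params:
        # Percent-encode spaces and quotes inside the filter string.
        url += "?" + urlencode(params, quote_via=quote)
    return url


url = build_query_url(
    "https://example.org/optimade",
    "structures",
    filter='elements HAS "Si"',
    page_limit=5,
)
print(url)
```

The filter is passed through unchanged apart from percent-encoding, so decoding the query string recovers the original filter text.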

OptimadeClient

This class implements a client for executing the same queries across multiple OPTIMADE APIs simultaneously, paging and caching the results.
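The paging behaviour can be pictured with a small self-contained sketch. It follows `next` links until none remain or a result cap is reached. The `FAKE_PAGES` table and the `harvest` function are hypothetical stand-ins for real HTTP responses and are not the client's own implementation.

```python
from typing import Optional

# Hypothetical paginated responses: URL -> (entries on this page, next URL or None).
FAKE_PAGES = {
    "https://example.org/v1/structures?page_offset=0": (
        ["s1", "s2"],
        "https://example.org/v1/structures?page_offset=2",
    ),
    "https://example.org/v1/structures?page_offset=2": (["s3"], None),
}


def harvest(first_url: str, max_results: Optional[int] = None) -> list:
    """Follow `next` links, stopping early once `max_results` entries are held."""
    data = []
    next_url: Optional[str] = first_url
    while next_url:
        entries, next_url = FAKE_PAGES[next_url]
        data.extend(entries)
        if max_results and len(data) >= max_results:
            break
    return data


everything = harvest("https://example.org/v1/structures?page_offset=0")
capped = harvest("https://example.org/v1/structures?page_offset=0", max_results=2)
```

The cap is checked only after a whole page has been appended, so the number of entries returned can exceed `max_results` when a page is larger than the remaining allowance.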

By default, all registered OPTIMADE providers will be queried simultaneously and asynchronously, with the results collected into the all_results attribute, keyed by endpoint, filter and provider.
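As a rough illustration of this layout, the sketch below sends one query to several providers concurrently with `asyncio.gather` and stores the merged results under endpoint, then filter, then base URL. The `fake_query` coroutine and the `example.org` URLs are hypothetical; no network access takes place.

```python
import asyncio
import functools
from collections import defaultdict


async def fake_query(base_url: str, filter: str) -> dict:
    """Hypothetical stand-in for one HTTP request to one provider."""
    await asyncio.sleep(0)  # yield control, as a real network call would
    return {base_url: [f"{filter} @ {base_url}"]}


async def query_all(base_urls: list, filter: str) -> dict:
    """Run the same query against every base URL concurrently."""
    per_provider = await asyncio.gather(
        *[fake_query(url, filter) for url in base_urls]
    )
    # Merge the single-key dictionaries into one mapping keyed by base URL.
    return functools.reduce(lambda r1, r2: {**r1, **r2}, per_provider, {})


# Nested cache: endpoint -> filter -> base URL -> results.
all_results: dict = defaultdict(dict)
endpoint, query_filter = "structures", 'elements HAS "Si"'
all_results[endpoint][query_filter] = asyncio.run(
    query_all(["https://example.org/a", "https://example.org/b"], query_filter)
)
```

A later lookup such as `all_results["structures"][query_filter]["https://example.org/a"]` then retrieves one provider's results for that exact query.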

Source code in optimade/client/client.py
class OptimadeClient:
    """This class implements a client for executing the same queries
    across multiple OPTIMADE APIs simultaneously, paging and caching the results.

    By default, all registered OPTIMADE providers will be queried
    simultaneously and asynchronously, with the results collected
    into the `all_results` attribute, keyed by endpoint, filter
    and provider.

    """

    base_urls: str | Iterable[str]
    """A list (or any iterable) of OPTIMADE base URLs to query."""

    all_results: dict[str, dict[str, dict[str, QueryResults]]] = defaultdict(dict)
    """A nested dictionary keyed by endpoint and OPTIMADE filter string that contains
    the results from each base URL for that particular filter.
    """

    count_results: dict[str, dict[str, dict[str, int | None]]] = defaultdict(dict)
    """A nested dictionary keyed by endpoint and OPTIMADE filter string that contains
    the number of results from each base URL for that particular filter.
    """

    max_results_per_provider: int | None = None
    """Maximum number of results to download per provider. If None, will
    download all.
    """

    property_lists: dict[str, dict[str, list[str]]] = defaultdict(dict)
    """A dictionary containing list of properties served by each database,
    broken down by entry type, then database.
    """

    headers: dict = {"User-Agent": f"optimade-python-tools/{__version__}"}
    """Additional HTTP headers."""

    http_timeout: httpx.Timeout = httpx.Timeout(10.0, read=1000.0)
    """The timeout to use for each HTTP request."""

    max_attempts: int
    """The maximum number of times to repeat a failed query before giving up."""

    max_requests_per_provider: int = 2_000_000
    """An upper limit guard rail to avoid infinite hanging of the client on malformed APIs.
    If available, a better value will be estimated for each API based on the total number of entries.

    """

    use_async: bool
    """Whether or not to make all requests asynchronously using asyncio."""

    callbacks: list[Callable[[str, dict], None | dict]] | None = None
    """A list of callbacks to execute after each successful request, used
    to e.g., write to a file, add results to a database or perform additional
    filtering.

    The callbacks will receive the request URL and the results extracted
    from the JSON response, with keys 'data', 'meta', 'links', 'errors'
    and 'included'.

    Each callback can return a dictionary that can modify the `next_url` with the
    key `next` and the progress bar with the key `advance_results`.
    In the case of multiple provided callbacks, only the value returned by the final
    callback in the stack will be used.

    """

    count_binary_search: bool = True
    """Enable binary search count for databases that do not support `meta->data_returned`."""

    silent: bool
    """Whether to disable progress bar printing."""

    skip_ssl: bool = False
    """Whether to skip SSL verification."""

    _excluded_providers: set[str] | None = None
    """A set of provider IDs excluded from future queries."""

    _included_providers: set[str] | None = None
    """A set of provider IDs included in future queries."""

    _excluded_databases: set[str] | None = None
    """A set of child database URLs excluded from future queries."""

    __current_endpoint: str | None = None
    """Used internally when querying via `client.structures.get()` to set the
    chosen endpoint. Should be reset to `None` outside of all `get()` calls."""

    _http_client: type[httpx.AsyncClient] | type[requests.Session] | None = None
    """Override the HTTP client class, primarily used for testing."""

    __strict_async: bool = False
    """Whether or not to fall over if `use_async` is true yet asynchronous mode
    is impossible due to, e.g., a running event loop.
    """

    _force_binary_search: bool = False
    """Setting to test binary searches in cases where servers do return the count.
    """

    def __init__(
        self,
        base_urls: str | Iterable[str] | None = None,
        max_results_per_provider: int = 1000,
        headers: dict | None = None,
        http_timeout: httpx.Timeout | float | None = None,
        max_attempts: int = 5,
        use_async: bool = True,
        silent: bool = False,
        exclude_providers: list[str] | None = None,
        include_providers: list[str] | None = None,
        exclude_databases: list[str] | None = None,
        http_client: None | (type[httpx.AsyncClient] | type[requests.Session]) = None,
        verbosity: int = 0,
        callbacks: list[Callable[[str, dict], None | dict]] | None = None,
        skip_ssl: bool = False,
    ):
        """Create the OPTIMADE client object.

        Parameters:
            base_urls: A list of OPTIMADE base URLs to query.
            max_results_per_provider: The maximum number of results to download
                from each provider (-1 or 0 indicate unlimited).
            headers: Any additional HTTP headers to use for the queries.
            http_timeout: The timeout to use per request. Defaults to 10
                seconds with 1000 seconds for reads specifically. Overriding this value
                will replace all timeouts (connect, read, write and pool) with this value.
            max_attempts: The maximum number of times to repeat a failing query.
            use_async: Whether or not to make all requests asynchronously.
            exclude_providers: A set or collection of provider IDs to exclude from queries.
            include_providers: A set or collection of provider IDs to include in queries.
            exclude_databases: A set or collection of child database URLs to exclude from queries.
            http_client: An override for the underlying HTTP client, primarily used for testing.
            callbacks: A list of functions to call after each successful response, see the
                attribute [`OptimadeClient.callbacks`][optimade.client.client.OptimadeClient.callbacks]
                docstring for more details.
            verbosity: The verbosity level of the client.

        """

        self.max_results_per_provider = max_results_per_provider
        if self.max_results_per_provider in (-1, 0):
            self.max_results_per_provider = None

        self._excluded_providers = set(exclude_providers) if exclude_providers else None
        self._included_providers = set(include_providers) if include_providers else None
        self._excluded_databases = set(exclude_databases) if exclude_databases else None

        self.max_attempts = max_attempts
        self.silent = silent
        self.verbosity = verbosity
        self.skip_ssl = skip_ssl

        self._progress = OptimadeClientProgress()
        if self.silent:
            self._progress.disable = True

        if headers:
            self.headers.update(headers)

        if not base_urls:
            progress = None
            if not self.silent:
                progress = OptimadeClientProgress()
            self.base_urls = list(
                get_all_databases(
                    exclude_providers=self._excluded_providers,
                    include_providers=self._included_providers,
                    exclude_databases=self._excluded_databases,
                    progress=progress,
                    skip_ssl=self.skip_ssl,
                )
            )
        else:
            if exclude_providers or include_providers or exclude_databases:
                raise RuntimeError(
                    "Cannot provide both a list of base URLs and included/excluded databases."
                )

            self.base_urls = base_urls

        if isinstance(self.base_urls, str):
            self.base_urls = [self.base_urls]
        self.base_urls = list(self.base_urls)

        if not self.base_urls:
            raise SystemExit(
                "Unable to access any OPTIMADE base URLs. If you believe this is an error, try manually specifying some base URLs."
            )

        if http_timeout:
            if isinstance(http_timeout, httpx.Timeout):
                self.http_timeout = http_timeout
            else:
                self.http_timeout = httpx.Timeout(http_timeout)

        self.use_async = use_async

        if http_client:
            self._http_client = http_client
            if issubclass(self._http_client, httpx.AsyncClient):
                if not self.use_async and self.__strict_async:
                    raise RuntimeError(
                        "Cannot use synchronous mode with an asynchronous HTTP client, please set `use_async=True` or pass an asynchronous HTTP client."
                    )
                self.use_async = True
            elif issubclass(self._http_client, requests.Session):
                if self.use_async and self.__strict_async:
                    raise RuntimeError(
                        "Cannot use async mode with a synchronous HTTP client, please set `use_async=False` or pass an synchronous HTTP client."
                    )
                self.use_async = False
        else:
            if use_async:
                self._http_client = httpx.AsyncClient
            else:
                self._http_client = requests.Session

        self.callbacks = callbacks

    def __getattribute__(self, name):
        """Allows entry endpoints to be queried via attribute access, using the
        allowed list for this module.

        Should also pass through any `extensions/<example>` endpoints.

        Any non-entry-endpoint name requested will be passed to the
        original `__getattribute__`.

        !!! example
            ```python
            from optimade.client import OptimadeClient
            cli = OptimadeClient()
            structures = cli.structures.get()
            references = cli.references.get()
            info_structures = cli.info.structures.get()
            ```

        """
        if name in ENDPOINTS:
            if self.__current_endpoint == "info":
                self.__current_endpoint = f"info/{name}"
            elif self.__current_endpoint == "extensions":
                self.__current_endpoint = f"extensions/{name}"
            else:
                self.__current_endpoint = name
            return self

        return super().__getattribute__(name)

    def get(
        self,
        filter: str | None = None,
        endpoint: str | None = None,
        response_fields: list[str] | None = None,
        sort: str | None = None,
    ) -> dict[str, dict[str, dict[str, dict]]]:
        """Gets the results from the endpoint and filter across the
        defined OPTIMADE APIs.

        Parameters:
            filter: The OPTIMADE filter string for the query.
            endpoint: The endpoint to query.
            response_fields: A list of response fields to request
                from the server.
            sort: The field by which to sort the results.

        Raises:
            RuntimeError: If the query could not be executed.

        Returns:
            A nested mapping from endpoint, filter and base URL to the query results.

        """

        if endpoint is None:
            if self.__current_endpoint is not None:
                endpoint = self.__current_endpoint
                self.__current_endpoint = None
            else:
                endpoint = "structures"

        if filter is None:
            filter = ""

        self._check_filter(filter, endpoint)

        with self._progress:
            if not self.silent:
                self._progress.print(
                    Panel(
                        f"Performing query [bold yellow]{endpoint}[/bold yellow]/?filter=[bold magenta][i]{filter}[/i][/bold magenta]",
                        expand=False,
                    )
                )
            results = self._execute_queries(
                filter,
                endpoint,
                response_fields=response_fields,
                page_limit=None,
                paginate=True,
                sort=sort,
            )
            self.all_results[endpoint][filter] = results
            return {endpoint: {filter: {k: results[k].asdict() for k in results}}}

    def count(
        self, filter: str | None = None, endpoint: str | None = None
    ) -> dict[str, dict[str, dict[str, int | None]]]:
        """Counts the number of results for the filter, requiring
        only 1 request per provider by making use of the `meta->data_returned`
        key. If missing, attempts will be made to perform an exponential/binary
        search over pagination to count the results.

        Raises:
            RuntimeError: If the query could not be executed.

        Returns:
            A nested mapping from endpoint, filter and base URL to the number of query results.

        """

        if endpoint is None:
            if self.__current_endpoint is not None:
                endpoint = self.__current_endpoint
                self.__current_endpoint = None
            else:
                endpoint = "structures"

        if filter is None:
            filter = ""

        self._progress = OptimadeClientProgress()
        if self.silent:
            self._progress.disable = True

        self._check_filter(filter, endpoint)

        with self._progress:
            if not self.silent:
                self._progress.print(
                    Panel(
                        f"Counting results for [bold yellow]{endpoint}[/bold yellow]/?filter=[bold magenta][i]{filter}[/i][/bold magenta]",
                        expand=False,
                    )
                )
            results = self._execute_queries(
                filter,
                endpoint,
                page_limit=1,
                paginate=False,
                response_fields=[],
                sort=None,
            )
            count_results = {}

            for base_url in results:
                count_results[base_url] = results[base_url].meta.get(
                    "data_returned", None
                )

                if count_results[base_url] is None or self._force_binary_search:
                    if self.count_binary_search:
                        count_results[base_url] = self.binary_search_count(
                            filter, endpoint, base_url, results
                        )
                    else:
                        self._progress.print(
                            f"Warning: {base_url} did not return a value for `meta->data_returned`, unable to count results. Full response: {results[base_url]}"
                        )

            self.count_results[endpoint][filter] = count_results
            return {endpoint: {filter: count_results}}

    def binary_search_count(
        self, filter: str, endpoint: str, base_url: str, results: dict | None = None
    ) -> int:
        """In cases where `data_returned` is not available (due to database limitations or
        otherwise), iteratively probe the final page of results available for a filter using
        binary search.

        Note: These queries always happen synchronously across APIs, but can be executed
        asynchronously within a single API.

        Parameters:
            filter: The OPTIMADE filter string for the query.
            endpoint: The endpoint to query.
            base_url: The base URL to query.
            results: The results from a previous query for the first page of results.

        Returns:
            The number of results for the filter.

        """
        if self.verbosity:
            self._progress.print(f"Performing binary search count for {base_url}")
        if self.use_async:
            return self._binary_search_count_async(filter, endpoint, base_url, results)

        else:
            raise NotImplementedError(
                "Binary search count is not yet implemented for synchronous queries."
            )

    def _binary_search_count_async(
        self, filter: str, endpoint: str, base_url: str, result: dict | None = None
    ) -> int:
        """Run a series of asynchronous queries on a given API to
        find the number of results for a filter.

        Starting with logarithmically spaced page offsets, iteratively probe
        the final page of results available for a filter.

        Parameters:
            filter: The OPTIMADE filter string for the query.
            endpoint: The endpoint to query.
            base_url: The base URL to query.
            result: The results from a previous query for the first page of results.

        Returns:
            The number of results for the filter.

        """
        if result is None:
            # first a check that there are any results at all
            result = asyncio.run(
                self.get_one_async(
                    endpoint,
                    filter,
                    base_url,
                    page_limit=1,
                    response_fields=[],
                    paginate=False,
                )
            )
        if self.verbosity:
            self._progress.print("Definitely found results")

        if not result[base_url].data:
            return 0

        attempts = 0
        max_attempts = 100

        window, probe = self._update_probe_and_window()

        while attempts < max_attempts:
            self._progress.disable = True

            result = asyncio.run(
                self.get_one_async(
                    endpoint,
                    filter,
                    base_url,
                    page_limit=1,
                    response_fields=[],
                    paginate=False,
                    other_params={"page_offset": probe},
                )
            )

            # if we got any data, we are below the target value
            below = bool(result[base_url].data)
            self._progress.disable = self.silent

            window, probe = self._update_probe_and_window(window, probe, below)

            if window[0] == window[1] and window[0] == probe:
                return probe

            attempts += 1

            if self.verbosity > 2:
                self._progress.print(f"Binary search debug info: {window=}, {probe=}")

        else:
            message = f"Exceeded maximum number of attempts for binary search on {base_url}, {filter=}"
            self._progress.print(message)
            raise RuntimeError(message)

    @staticmethod
    def _update_probe_and_window(
        window: tuple[int, int | None] | None = None,
        last_probe: int | None = None,
        below: bool | None = None,
    ) -> tuple[tuple[int, int | None], int]:
        """Sets the new range, trial value and exit condition for exponential/binary search.
        When converged, returns the same value three times.

        Parameters:
            window: The current window of results.
            last_probe: The last probe value.
            below: Whether the last probe was below the target value.

        Returns:
            A tuple of the new window and probe value,
            or the count three times if converged.

        """

        if window is None and last_probe is None:
            return (1, None), 1_000_000

        if window is None or last_probe is None:
            raise RuntimeError(
                "Invalid arguments: must provide all or none of window, last_probe and below parameters"
            )

        probe: int = last_probe

        # Exit condition: find a range of (count, count+1) values
        # and determine whether the probe was above or below in the last guess
        if window[1] is not None and window[1] - window[0] == 1:
            if below:
                return (window[1], window[1]), window[1]
            else:
                return (window[0], window[0]), window[0]

        # Enclose the real value in the window, with `None` indicating an open boundary
        if below:
            window = (last_probe, window[1])
        else:
            window = (window[0], last_probe)

        # If we've not reached the upper bound yet, try 10x
        if window[1] is None:
            probe *= 10

        # Otherwise, if we're in the window and the ends of the window now have the same power of 10 (or within +-1),
        # take the average (102 => 108) => 105
        elif abs(math.log10(window[1]) - math.log10(window[0])) <= 1:
            probe = (window[1] + window[0]) // 2
        # otherwise use logarithmic average (10, 1000) => 100
        else:
            probe = int(10 ** ((math.log10(window[1]) + math.log10(window[0])) / 2))

        return window, probe

    def list_properties(
        self,
        entry_type: str,
    ) -> dict[str, list[str]]:
        """Returns the list of properties reported at `/info/<entry_type>`
        for the given entry type, for each database.

        """
        self._progress = OptimadeClientProgress()
        if self.silent:
            self._progress.disable = True

        with self._progress:
            if not self.silent:
                self._progress.print(
                    Panel(
                        f"Listing properties for [bold yellow]{entry_type}[/bold yellow]",
                        expand=False,
                    )
                )
            results = self._execute_queries(
                "",
                f"info/{entry_type}",
                paginate=False,
                page_limit=1,
                response_fields=[],
                sort=None,
            )
            self.property_lists = {entry_type: {}}
            for database in results:
                self.property_lists[entry_type][database] = list(
                    results[database].data.get("properties", {}).keys()  # type: ignore
                )
            return self.property_lists[entry_type]

    def search_property(self, query: str, entry_type: str) -> dict[str, list[str]]:
        """Searches for the query substring within the listed properties
        served by each database.

        Parameters:
            query: The substring to search for.
            entry_type: The entry type to query.

        Returns:
            A nested dictionary of matching property lists, arranged by
            entry type and database.

        """
        if not self.property_lists:
            self.list_properties(entry_type=entry_type)

        matching_properties: dict[str, dict[str, list[str]]] = {
            entry_type: defaultdict(list)
        }
        if entry_type in self.property_lists:
            for database in self.property_lists[entry_type]:
                for property in self.property_lists[entry_type][database]:
                    if query in property:
                        matching_properties[entry_type][database].append(property)
        return matching_properties[entry_type]

    def _execute_queries(
        self,
        filter: str,
        endpoint: str,
        page_limit: int | None,
        paginate: bool,
        response_fields: list[str] | None,
        sort: str | None,
    ) -> dict[str, QueryResults]:
        """Executes the queries over the base URLs either asynchronously or
        serially, depending on the `self.use_async` setting.

        Parameters:
            filter: The OPTIMADE filter string.
            endpoint: The OPTIMADE endpoint to query.
            page_limit: A page limit to enforce for each query (used in
                conjunction with `paginate`).
            paginate: Whether to pull all pages of results (up to the
                value of `max_results_per_provider`) or whether to return
                after one page.
            response_fields: A list of response fields to request from the server.
            sort: The field by which to sort the results.

        Returns:
            A mapping from base URL to `QueryResults` for each queried API.

        """
        if self.use_async:
            # Check for a pre-existing event loop (e.g. within a Jupyter notebook)
            # and use it if present
            try:
                event_loop = asyncio.get_running_loop()
                if event_loop:
                    if self.__strict_async:
                        raise RuntimeError(
                            "Detected a running event loop, cannot run in async mode."
                        )
                    self._progress.print(
                        "Detected a running event loop (e.g., Jupyter). Attempting to switch to synchronous mode."
                    )
                    self.use_async = False
                    self._http_client = requests.Session
            except RuntimeError:
                event_loop = None

        if self.use_async and not event_loop:
            return asyncio.run(
                self._get_all_async(
                    endpoint,
                    filter,
                    page_limit=page_limit,
                    paginate=paginate,
                    response_fields=response_fields,
                    sort=sort,
                )
            )

        return self._get_all(
            endpoint,
            filter,
            page_limit=page_limit,
            paginate=paginate,
            response_fields=response_fields,
            sort=sort,
        )

    def get_one(
        self,
        endpoint: str,
        filter: str,
        base_url: str,
        response_fields: list[str] | None = None,
        sort: str | None = None,
        page_limit: int | None = None,
        paginate: bool = True,
        other_params: dict[str, Any] | None = None,
        override_url: str | None = None,
    ) -> dict[str, QueryResults]:
        """Executes the query synchronously on one API.

        Parameters:
            endpoint: The OPTIMADE endpoint to query.
            filter: The OPTIMADE filter string.
            response_fields: A list of response fields to request from the server.
            sort: The field by which to sort the results.
            page_limit: A page limit to enforce for each query (used in
                conjunction with `paginate`).
            paginate: Whether to pull all pages of results (up to the
                value of `max_results_per_provider`) or whether to return
                after one page.
            other_params: Any other parameters to pass to the server.
            override_url: Allow overriding the URL for the request, e.g.,
                when doing pagination externally.

        Returns:
            A dictionary mapping from base URL to the results of the query.

        """
        try:
            return self._get_one(
                endpoint,
                filter,
                base_url,
                page_limit=page_limit,
                paginate=paginate,
                response_fields=response_fields,
                sort=sort,
                other_params=other_params,
                override_url=override_url,
            )
        except Exception as exc:
            error_query_results = QueryResults()
            error_query_results.errors = [
                f"{exc.__class__.__name__}: {str(exc.args[0])}"
            ]
            self._progress.print(
                f"[red]Error[/red]: Provider {str(base_url)!r} returned: [red i]{exc}[/red i]"
            )
            return {base_url: error_query_results}

    async def _get_all_async(
        self,
        endpoint: str,
        filter: str,
        response_fields: list[str] | None = None,
        sort: str | None = None,
        page_limit: int | None = None,
        paginate: bool = True,
        base_urls: Iterable[str] | None = None,
        other_params: dict[str, Any] | None = None,
    ) -> dict[str, QueryResults]:
        """Executes the query asynchronously across all defined APIs.

        Parameters:
            endpoint: The OPTIMADE endpoint to query.
            filter: The OPTIMADE filter string.
            response_fields: A list of response fields to request from the server.
            sort: The field by which to sort the results.
            page_limit: A page limit to enforce for each query (used in
                conjunction with `paginate`).
            paginate: Whether to pull all pages of results (up to the
                value of `max_results_per_provider`) or whether to return
                after one page.
            base_urls: A list of base URLs to query (defaults to `self.base_urls`).
            other_params: Any other parameters to pass to the server.

        Returns:
            A dictionary mapping from base URL to the results of the query.

        """
        if not base_urls:
            base_urls = self.base_urls

        results = await asyncio.gather(
            *[
                self.get_one_async(
                    endpoint,
                    filter,
                    base_url,
                    page_limit=page_limit,
                    paginate=paginate,
                    response_fields=response_fields,
                    sort=sort,
                    other_params=other_params,
                )
                for base_url in base_urls
            ]
        )
        return functools.reduce(lambda r1, r2: {**r1, **r2}, results)

    def _get_all(
        self,
        endpoint: str,
        filter: str,
        page_limit: int | None = None,
        response_fields: list[str] | None = None,
        sort: str | None = None,
        paginate: bool = True,
        base_urls: Iterable[str] | None = None,
        other_params: dict[str, Any] | None = None,
    ) -> dict[str, QueryResults]:
        """Executes the query synchronously across all defined APIs.

        Parameters:
            endpoint: The OPTIMADE endpoint to query.
            filter: The OPTIMADE filter string.
            response_fields: A list of response fields to request from the server.
            sort: The field by which to sort the results.
            page_limit: A page limit to enforce for each query (used in
                conjunction with `paginate`).
            paginate: Whether to pull all pages of results (up to the
                value of `max_results_per_provider`) or whether to return
                after one page.
            base_urls: A list of base URLs to query (defaults to `self.base_urls`).
            other_params: Any other parameters to pass to the server.

        Returns:
            A dictionary mapping from base URL to the results of the query.

        """
        if not base_urls:
            base_urls = self.base_urls

        results = [
            self.get_one(
                endpoint,
                filter,
                base_url,
                page_limit=page_limit,
                paginate=paginate,
                response_fields=response_fields,
                sort=sort,
                other_params=other_params,
            )
            for base_url in base_urls
        ]
        if results:
            return functools.reduce(lambda r1, r2: {**r1, **r2}, results)

        return {}

    async def get_one_async(
        self,
        endpoint: str,
        filter: str,
        base_url: str,
        response_fields: list[str] | None = None,
        sort: str | None = None,
        page_limit: int | None = None,
        paginate: bool = True,
        other_params: dict[str, Any] | None = None,
        override_url: str | None = None,
    ) -> dict[str, QueryResults]:
        """Executes the query asynchronously on one API.

        !!! note
            This method currently makes non-blocking requests
            to a single API, but these requests are executed
            serially on that API, i.e., results are pulled one
            page at a time, but requests will not block other
            async requests to other APIs.

        Parameters:
            endpoint: The OPTIMADE endpoint to query.
            filter: The OPTIMADE filter string.
            response_fields: A list of response fields to request from the server.
            sort: The field by which to sort the results.
            page_limit: A page limit to enforce for each query (used in
                conjunction with `paginate`).
            paginate: Whether to pull all pages of results (up to the
                value of `max_results_per_provider`) or whether to return
                after one page.
            other_params: Any other parameters to pass to the server.
            override_url: Allow overriding the URL for the request, e.g.,
                when doing pagination externally.

        Returns:
            A dictionary mapping from base URL to the results of the query.

        """
        try:
            return await self._get_one_async(
                endpoint,
                filter,
                base_url,
                page_limit=page_limit,
                paginate=paginate,
                response_fields=response_fields,
                sort=sort,
                other_params=other_params,
                override_url=override_url,
            )
        except Exception as exc:
            error_query_results = QueryResults()
            error_query_results.errors = [
                f"{exc.__class__.__name__}: {str(exc.args[0])}"
            ]
            self._progress.print(
                f"[red]Error[/red]: Provider {str(base_url)!r} returned: [red i]{error_query_results.errors}[/red i]"
            )
            return {base_url: error_query_results}

    async def _get_one_async(
        self,
        endpoint: str,
        filter: str,
        base_url: str,
        response_fields: list[str] | None = None,
        sort: str | None = None,
        page_limit: int | None = None,
        paginate: bool = True,
        other_params: dict[str, Any] | None = None,
        override_url: str | None = None,
    ) -> dict[str, QueryResults]:
        """See [`OptimadeClient.get_one_async`][optimade.client.OptimadeClient.get_one_async]."""
        next_url, _task = self._setup(
            endpoint=endpoint,
            base_url=base_url,
            filter=filter,
            page_limit=page_limit,
            response_fields=response_fields,
            sort=sort,
            other_params=other_params,
        )

        if override_url:
            next_url = override_url

        request_delay: float | None = None

        results = QueryResults()
        number_of_requests = 0
        total_data_available: int | None = None
        try:
            async with self._http_client(headers=self.headers) as client:  # type: ignore[union-attr,call-arg,misc]
                while next_url:
                    number_of_requests += 1
                    attempts = 0
                    try:
                        if self.verbosity:
                            self._progress.print(
                                f"Making request to {next_url!r} {attempts=}"
                            )
                        r = await client.get(
                            next_url, follow_redirects=True, timeout=self.http_timeout
                        )
                        page_results, next_url = self._handle_response(r, _task)

                        request_delay = page_results["meta"].get("request_delay", None)
                        # Don't wait any longer than 5 seconds
                        if request_delay:
                            request_delay = min(request_delay, 5)

                        # Compute the upper limit guard rail on pagination requests based on the number of entries in the entire db
                        # and the chosen page limit
                        if total_data_available is None:
                            total_data_available = page_results["meta"].get(
                                "data_available", 0
                            )
                            page_limit = len(page_results["data"])
                            if page_limit == 0:
                                page_limit = 1
                            if total_data_available and total_data_available > 0:
                                stopping_criteria = min(
                                    math.ceil(total_data_available / page_limit),
                                    self.max_requests_per_provider,
                                )
                            else:
                                stopping_criteria = self.max_results_per_provider

                    except RecoverableHTTPError:
                        attempts += 1
                        if attempts > self.max_attempts:
                            raise RuntimeError(
                                f"Exceeded maximum number of retries for {next_url}"
                            )
                        await asyncio.sleep(request_delay or 1)
                        continue

                    results.update(page_results)

                    if not paginate:
                        break

                    if len(results.data) == 0 or number_of_requests > stopping_criteria:
                        if next_url:
                            message = f"Detected potential infinite loop for {base_url} (more than {stopping_criteria=} requests made). Stopping download."
                            results.errors.append(message)
                            if not self.silent:
                                self._progress.print(message)
                        break

                    if (
                        self.max_results_per_provider
                        and len(results.data) >= self.max_results_per_provider
                    ):
                        if not self.silent:
                            self._progress.print(
                                f"Reached {len(results.data)} results for {base_url}, exceeding `max_results_per_provider` parameter ({self.max_results_per_provider}). Stopping download."
                            )
                        break

            return {str(base_url): results}

        finally:
            self._teardown(_task, len(results.data))

    def _get_one(
        self,
        endpoint: str,
        filter: str,
        base_url: str,
        sort: str | None = None,
        page_limit: int | None = None,
        response_fields: list[str] | None = None,
        paginate: bool = True,
        other_params: dict[str, Any] | None = None,
        override_url: str | None = None,
    ) -> dict[str, QueryResults]:
        """See [`OptimadeClient.get_one`][optimade.client.OptimadeClient.get_one]."""
        next_url, _task = self._setup(
            endpoint=endpoint,
            base_url=base_url,
            filter=filter,
            page_limit=page_limit,
            response_fields=response_fields,
            sort=sort,
            other_params=other_params,
        )

        if override_url:
            next_url = override_url

        request_delay: float | None = None

        results = QueryResults()
        number_of_requests: int = 0
        total_data_available: int | None = None
        try:
            with self._http_client() as client:  # type: ignore[misc]
                client.headers.update(self.headers)

                if isinstance(client, requests.Session):
                    # Convert configured httpx timeout to requests-style tuple
                    timeout = (self.http_timeout.connect, self.http_timeout.read)

                while next_url:
                    number_of_requests += 1
                    attempts = 0
                    try:
                        if self.verbosity:
                            self._progress.print(
                                f"Making request to {next_url!r} {attempts=}"
                            )
                        r = client.get(next_url, timeout=timeout)
                        page_results, next_url = self._handle_response(r, _task)

                        # Compute the upper limit guard rail on pagination requests based on the number of entries in the entire db
                        # and the chosen page limit
                        if total_data_available is None:
                            total_data_available = page_results["meta"].get(
                                "data_available", 0
                            )
                            page_limit = len(page_results["data"])
                            if page_limit == 0:
                                page_limit = 1
                            if total_data_available and total_data_available > 0:
                                stopping_criteria = min(
                                    math.ceil(total_data_available / page_limit),
                                    self.max_requests_per_provider,
                                )
                            else:
                                stopping_criteria = self.max_results_per_provider

                        request_delay = page_results["meta"].get("request_delay", None)
                        # Don't wait any longer than 5 seconds
                        if request_delay:
                            request_delay = min(request_delay, 5)

                    except RecoverableHTTPError:
                        attempts += 1
                        if attempts > self.max_attempts:
                            raise RuntimeError(
                                f"Exceeded maximum number of retries for {next_url}"
                            )
                        time.sleep(request_delay or 1)
                        continue

                    results.update(page_results)

                    if len(results.data) == 0 or number_of_requests > stopping_criteria:
                        if next_url:
                            message = f"Detected potential infinite loop for {base_url} (more than {stopping_criteria=} requests made). Stopping download."
                            results.errors.append(message)
                            if not self.silent:
                                self._progress.print(message)
                        break

                    if (
                        self.max_results_per_provider
                        and len(results.data) >= self.max_results_per_provider
                    ):
                        if not self.silent:
                            self._progress.print(
                                f"Reached {len(results.data)} results for {base_url}, exceeding `max_results_per_provider` parameter ({self.max_results_per_provider}). Stopping download."
                            )
                        break

                    if not paginate:
                        break

            return {str(base_url): results}

        finally:
            self._teardown(_task, len(results.data))

    def _setup(
        self,
        endpoint: str,
        base_url: str,
        filter: str,
        page_limit: int | None,
        response_fields: list[str] | None,
        sort: str | None,
        other_params: dict[str, Any] | None = None,
    ) -> tuple[str, TaskID]:
        """Constructs the first query URL and creates the progress bar task.

        Returns:
            The URL for the first query and the Rich TaskID for progress logging.
""" url = self._build_url( base_url=base_url, endpoint=endpoint, filter=filter, page_limit=page_limit, response_fields=response_fields, sort=sort, other_params=other_params, ) parsed_url = urlparse(url) _task = self._progress.add_task( description=parsed_url.netloc + parsed_url.path, total=None, ) return url, _task def _build_url( self, base_url: str, endpoint: str | None = "structures", version: str | None = None, filter: str | None = None, response_fields: list[str] | None = None, sort: str | None = None, page_limit: int | None = None, other_params: dict[str, Any] | None = None, ) -> str: """Builds the URL to query based on the passed parameters. Parameters: base_url: The server's base URL. endpoint: The endpoint to query. version: The OPTIMADE version string. filter: The filter to apply to the endpoint. response_fields: A list of response fields to request from the server. sort: The field by which to sort the results. page_limit: The page limit for an individual request. other_params: Any other parameters to pass to the server. Returns: The overall query URL, including parameters. 
""" if not version: version = f"v{__api_version__.split('.')[0]}" while base_url.endswith("/"): base_url = base_url[:-1] url = f"{base_url}/{version}/{endpoint}" params_dict: dict[str, str] = {} if filter: params_dict["filter"] = f"filter={filter}" if response_fields is not None: # If we have requested no response fields (e.g., in the case of --count) then just ask for IDs if len(response_fields) == 0: params_dict["response_fields"] = "response_fields=id" else: params_dict["response_fields"] = ( f"response_fields={','.join(response_fields)}" ) if page_limit: params_dict["page_limit"] = f"page_limit={page_limit}" if sort: params_dict["sort"] = f"sort={sort}" if other_params: for p in other_params: params_dict[p] = f"{p}={other_params[p]}" params = "&".join(p for p in params_dict.values() if p) if params: url += f"?{params}" return url def _check_filter(self, filter: str, endpoint: str) -> None: """Passes the filter through [`LarkParser`][optimade.filterparser.LarkParser] from the optimade-python-tools reference server implementation. Parameters: filter: The filter string. endpoint: The endpoint being queried. If this endpoint is not "known" to OPTIMADE, the filter will automatically pass. Raises: RuntimeError: If the filter cannot be parsed. """ try: if endpoint in ENDPOINTS: parser = LarkParser() parser.parse(filter) except BadRequest as exc: self._progress.print( f"[bold red]Filter [blue i]{filter!r}[/blue i] could not be parsed as an OPTIMADE filter.[/bold red]", Panel(f"[magenta]{exc}[/magenta]"), ) with silent_raise(): raise RuntimeError(exc) from None def _handle_response( self, response: httpx.Response | requests.Response, _task: TaskID ) -> tuple[dict[str, Any], str]: """Handle the response from the server. Parameters: response: The response from the server. _task: The Rich TaskID for this task's progressbar. Returns: A dictionary containing the results, and a link to the next page, if it exists. 
""" # Handle error statuses if response.status_code == 429: raise TooManyRequestsException(response.content) if response.status_code != 200: try: errors = response.json().get("errors") error_message = "\n".join( [f"{error['title']}: {error['detail']}" for error in errors] ) except Exception: error_message = str(response.content) raise RuntimeError( f"{response.status_code} - {response.url}: {error_message}" ) try: r = response.json() except json.JSONDecodeError as exc: raise RuntimeError( f"Could not decode response as JSON: {response.content!r}" ) from exc # Accumulate results with correct empty containers if missing results = { "data": r.get("data", []), "meta": r.get("meta", {}), "links": r.get("links", {}), "included": r.get("included", []), "errors": r.get("errors", []), } callback_response = None if self.callbacks: callback_response = self._execute_callbacks(results, response) callback_response = callback_response or {} # Advance the progress bar for this provider self._progress.update( _task, advance=callback_response.get("advance_results", len(results["data"])), total=results["meta"].get("data_returned", None), ) next_url = callback_response.get("next") or results["links"].get("next", None) if isinstance(next_url, dict): next_url = next_url["href"] return results, next_url def _teardown(self, _task: TaskID, num_results: int) -> None: """Update the finished status of the progress bar depending on the number of results. Parameters: _task: The Rich TaskID for this task's progressbar. num_results: The number of data entries returned. """ if num_results == 0: self._progress.update(_task, total=None, finished=False, complete=True) else: self._progress.update( _task, total=num_results, finished=True, complete=True ) def _execute_callbacks( self, results: dict, response: httpx.Response | requests.Response ) -> None | dict: """Execute any callbacks registered with the client. Parameters: results: The results from the query. 
response: The full response from the server. Returns: Either `None` or the string value returned from the *final* callback. """ request_url = str(response.request.url) if not self.callbacks: return None for callback in self.callbacks: cb_response = callback(request_url, results) return cb_response

__current_endpoint = None class-attribute instance-attribute

Used internally when querying via client.structures.get() to set the chosen endpoint. Should be reset to None outside of all get() calls.

__strict_async = False class-attribute instance-attribute

Whether or not to raise an error if use_async is true yet asynchronous mode is impossible due to, e.g., a running event loop.

all_results = defaultdict(dict) class-attribute instance-attribute

A nested dictionary keyed by endpoint and OPTIMADE filter string that contains the results from each base URL for that particular filter.

base_urls instance-attribute

A list (or any iterable) of OPTIMADE base URLs to query.

callbacks = callbacks class-attribute instance-attribute

A list of callbacks to execute after each successful request, used, e.g., to write to a file, add results to a database, or perform additional filtering.

The callbacks will receive the request URL and the results extracted from the JSON response, with keys 'data', 'meta', 'links', 'errors' and 'included'.

Each callback can return a dictionary that can modify the next_url with the key next and the progress bar with the key advance_results. In the case of multiple provided callbacks, only the value returned by the final callback in the stack will be used.
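As a rough illustration of the callback contract described above, the sketch below records entry IDs and reports how far to advance the progress bar. The names `collect_ids` and `seen_ids` are hypothetical; only the `(request_url, results)` signature and the `next` / `advance_results` return keys are taken from the description.

```python
from __future__ import annotations

from typing import Any

# Hypothetical store for the entry IDs seen so far (not part of the client).
seen_ids: list[str] = []


def collect_ids(request_url: str, results: dict[str, Any]) -> dict[str, Any] | None:
    """Record the ID of every entry in this page of results."""
    entries = results.get("data", [])
    # Some endpoints return a single object under `data`; this sketch
    # only handles the list case and otherwise changes nothing.
    if not isinstance(entries, list):
        return None
    seen_ids.extend(entry["id"] for entry in entries if "id" in entry)
    # Report how many entries this page contributed to the progress bar.
    # Omitting the `next` key leaves the pagination link untouched.
    return {"advance_results": len(entries)}
```

Such a function could then be passed to the client as `OptimadeClient(callbacks=[collect_ids])`.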

count_binary_search

Enable binary search count for databases that do not support meta->data_returned.

count_results = defaultdict(dict) class-attribute instance-attribute

A nested dictionary keyed by endpoint and OPTIMADE filter string that contains the number of results from each base URL for that particular filter.

headers = {'User-Agent': f'optimade-python-tools/{__version__}'} class-attribute instance-attribute

Additional HTTP headers.

http_timeout = httpx.Timeout(10.0, read=1000.0) class-attribute instance-attribute

The timeout to use for each HTTP request.

max_attempts = max_attempts instance-attribute

The maximum number of times to repeat a failed query before giving up.
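The retry behaviour can be pictured with a small standalone loop. This is a minimal sketch, assuming a hypothetical `RecoverableError` exception and a hypothetical `call_with_retries` helper; it is not the client's own code, although the 5-second cap on the wait mirrors the cap applied to `request_delay` in the source.

```python
import time


class RecoverableError(Exception):
    """Hypothetical stand-in for a retryable failure (e.g. HTTP 429)."""


def call_with_retries(request, max_attempts=5, delay=1.0, sleep=time.sleep):
    """Call `request()` until it succeeds or `max_attempts` retries are used up."""
    attempts = 0
    while True:
        try:
            return request()
        except RecoverableError:
            attempts += 1
            if attempts > max_attempts:
                raise RuntimeError("Exceeded maximum number of retries")
            # Cap the wait so that a misbehaving server cannot stall the caller.
            sleep(min(delay, 5))
```

Passing `sleep` as a parameter is a design choice made here so that the loop can be exercised without actually waiting.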

max_requests_per_provider = 2000000 class-attribute instance-attribute

An upper limit guard rail to avoid infinite hanging of the client on malformed APIs. If available, a better value will be estimated for each API based on the total number of entries.
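The per-API estimate can be sketched as follows. This is a simplified illustration rather than the library's exact logic: the function name `pagination_request_limit` is hypothetical, and the fallback used here when no total is reported (the fixed limit itself) is an assumption of this sketch.

```python
from __future__ import annotations

import math


def pagination_request_limit(
    data_available: int | None,
    page_size: int,
    max_requests_per_provider: int = 2_000_000,
) -> int:
    """Estimate how many page requests a well-behaved API should need."""
    # Guard against division by zero when a page comes back empty.
    page_size = max(page_size, 1)
    if data_available and data_available > 0:
        # Enough requests to page through every entry in the database,
        # but never more than the fixed guard rail.
        return min(math.ceil(data_available / page_size), max_requests_per_provider)
    # No usable total reported: fall back to the fixed upper limit.
    return max_requests_per_provider
```

For instance, a database reporting 1001 entries and serving 20 per page would be limited to 51 requests.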

max_results_per_provider = max_results_per_provider class-attribute instance-attribute

Maximum number of results to download per provider. If None, will download all.

property_lists = defaultdict(dict) class-attribute instance-attribute

A dictionary containing the list of properties served by each database, broken down by entry type, then database.

silent = silent instance-attribute

Whether to disable progress bar printing.

skip_ssl = skip_ssl class-attribute instance-attribute

Whether to skip SSL verification.

use_async = use_async instance-attribute

Whether or not to make all requests asynchronously using asyncio.

__getattribute__(name)

Allows entry endpoints to be queried via attribute access, using the allowed list for this module.

Should also pass through any extensions/<example> endpoints.

Any non-entry-endpoint name requested will be passed to the original __getattribute__.

Example

    from optimade.client import OptimadeClient
    cli = OptimadeClient()
    structures = cli.structures.get()
    references = cli.references.get()
    info_structures = cli.info.structures.get()

Source code in optimade/client/client.py

    def __getattribute__(self, name):
        """Allows entry endpoints to be queried via attribute access, using the
        allowed list for this module.

        Should also pass through any `extensions/<example>` endpoints.

        Any non-entry-endpoint name requested will be passed to the
        original `__getattribute__`.

        !!! example
            ```python
            from optimade.client import OptimadeClient
            cli = OptimadeClient()
            structures = cli.structures.get()
            references = cli.references.get()
            info_structures = cli.info.structures.get()
            ```

        """
        if name in ENDPOINTS:
            if self.__current_endpoint == "info":
                self.__current_endpoint = f"info/{name}"
            elif self.__current_endpoint == "extensions":
                self.__current_endpoint = f"extensions/{name}"
            else:
                self.__current_endpoint = name
            return self

        return super().__getattribute__(name)

__init__(base_urls=None, max_results_per_provider=1000, headers=None, http_timeout=None, max_attempts=5, use_async=True, silent=False, exclude_providers=None, include_providers=None, exclude_databases=None, http_client=None, verbosity=0, callbacks=None, skip_ssl=False)

Create the OPTIMADE client object.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| base_urls | str \| Iterable[str] \| None | A list of OPTIMADE base URLs to query. | None |
| max_results_per_provider | int | The maximum number of results to download from each provider (-1 or 0 indicate unlimited). | 1000 |
| headers | dict \| None | Any additional HTTP headers to use for the queries. | None |
| http_timeout | Timeout \| float \| None | The timeout to use per request. Defaults to 10 seconds with 1000 seconds for reads specifically. Overriding this value will replace all timeouts (connect, read, write and pool) with this value. | None |
| max_attempts | int | The maximum number of times to repeat a failing query. | 5 |
| use_async | bool | Whether or not to make all requests asynchronously. | True |
| exclude_providers | list[str] \| None | A set or collection of provider IDs to exclude from queries. | None |
| include_providers | list[str] \| None | A set or collection of provider IDs to include in queries. | None |
| exclude_databases | list[str] \| None | A set or collection of child database URLs to exclude from queries. | None |
| http_client | None \| (type[AsyncClient] \| type[Session]) | An override for the underlying HTTP client, primarily used for testing. | None |
| callbacks | list[Callable[[str, dict], None \| dict]] \| None | A list of functions to call after each successful response, see the attribute OptimadeClient.callbacks docstring for more details. | None |
| verbosity | int | The verbosity level of the client. | 0 |

Source code in optimade/client/client.py

    def __init__(
        self,
        base_urls: str | Iterable[str] | None = None,
        max_results_per_provider: int = 1000,
        headers: dict | None = None,
        http_timeout: httpx.Timeout | float | None = None,
        max_attempts: int = 5,
        use_async: bool = True,
        silent: bool = False,
        exclude_providers: list[str] | None = None,
        include_providers: list[str] | None = None,
        exclude_databases: list[str] | None = None,
        http_client: None | (type[httpx.AsyncClient] | type[requests.Session]) = None,
        verbosity: int = 0,
        callbacks: list[Callable[[str, dict], None | dict]] | None = None,
        skip_ssl: bool = False,
    ):
        """Create the OPTIMADE client object.

        Parameters:
            base_urls: A list of OPTIMADE base URLs to query.
            max_results_per_provider: The maximum number of results to download
                from each provider (-1 or 0 indicate unlimited).
            headers: Any additional HTTP headers to use for the queries.
            http_timeout: The timeout to use per request. Defaults to 10
                seconds with 1000 seconds for reads specifically. Overriding this value
                will replace all timeouts (connect, read, write and pool) with this value.
            max_attempts: The maximum number of times to repeat a failing query.
            use_async: Whether or not to make all requests asynchronously.
            exclude_providers: A set or collection of provider IDs to exclude from queries.
            include_providers: A set or collection of provider IDs to include in queries.
            exclude_databases: A set or collection of child database URLs to exclude from queries.
            http_client: An override for the underlying HTTP client, primarily used for testing.
            callbacks: A list of functions to call after each successful response, see the
                attribute [`OptimadeClient.callbacks`][optimade.client.client.OptimadeClient.callbacks]
                docstring for more details.
            verbosity: The verbosity level of the client.

        """

        self.max_results_per_provider = max_results_per_provider
        if self.max_results_per_provider in (-1, 0):
            self.max_results_per_provider = None

        self._excluded_providers = set(exclude_providers) if exclude_providers else None
        self._included_providers = set(include_providers) if include_providers else None
        self._excluded_databases = set(exclude_databases) if exclude_databases else None

        self.max_attempts = max_attempts
        self.silent = silent
        self.verbosity = verbosity
        self.skip_ssl = skip_ssl

        self._progress = OptimadeClientProgress()
        if self.silent:
            self._progress.disable = True

        if headers:
            self.headers.update(headers)

        if not base_urls:
            progress = None
            if not self.silent:
                progress = OptimadeClientProgress()
            self.base_urls = list(
                get_all_databases(
                    exclude_providers=self._excluded_providers,
                    include_providers=self._included_providers,
                    exclude_databases=self._excluded_databases,
                    progress=progress,
                    skip_ssl=self.skip_ssl,
                )
            )
        else:
            if exclude_providers or include_providers or exclude_databases:
                raise RuntimeError(
                    "Cannot provide both a list of base URLs and included/excluded databases."
                )

            self.base_urls = base_urls

        if isinstance(self.base_urls, str):
            self.base_urls = [self.base_urls]
        self.base_urls = list(self.base_urls)

        if not self.base_urls:
            raise SystemExit(
                "Unable to access any OPTIMADE base URLs. If you believe this is an error, try manually specifying some base URLs."
            )

        if http_timeout:
            if isinstance(http_timeout, httpx.Timeout):
                self.http_timeout = http_timeout
            else:
                self.http_timeout = httpx.Timeout(http_timeout)

        self.use_async = use_async

        if http_client:
            self._http_client = http_client
            if issubclass(self._http_client, httpx.AsyncClient):
                if not self.use_async and self.__strict_async:
                    raise RuntimeError(
                        "Cannot use synchronous mode with an asynchronous HTTP client, please set `use_async=True` or pass an asynchronous HTTP client."
                    )
                self.use_async = True
            elif issubclass(self._http_client, requests.Session):
                if self.use_async and self.__strict_async:
                    raise RuntimeError(
                        "Cannot use async mode with a synchronous HTTP client, please set `use_async=False` or pass an synchronous HTTP client."
                    )
                self.use_async = False
        else:
            if use_async:
                self._http_client = httpx.AsyncClient
            else:
                self._http_client = requests.Session

        self.callbacks = callbacks

binary_search_count(filter, endpoint, base_url, results=None)

In cases where data_returned is not available (due to database limitations or otherwise), iteratively probe the final page of results available for a filter using binary search.

Note: These queries always happen synchronously across APIs, but can be executed asynchronously within a single API.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| filter | str | The OPTIMADE filter string for the query. | required |
| endpoint | str | The endpoint to query. | required |
| base_url | str | The base URL to query. | required |
| results | dict \| None | The results from a previous query for the first page of results. | None |

Returns:

| Type | Description |
| --- | --- |
| int | The number of results for the filter. |
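The general idea of counting by probing can be sketched independently of any HTTP client. The example below is a minimal sketch, not the client's internal implementation: it assumes a hypothetical `entry_exists(offset)` probe that reports whether an entry exists at a zero-based offset (for example, by requesting a single-entry page at that offset), and the function name `count_by_probing` is likewise made up for illustration.

```python
from collections.abc import Callable


def count_by_probing(entry_exists: Callable[[int], bool]) -> int:
    """Count entries using O(log n) calls to `entry_exists(offset)`."""
    if not entry_exists(0):
        return 0
    # Exponential phase: double the offset until it overshoots the last entry.
    low, high = 0, 1
    while entry_exists(high):
        low, high = high, high * 2
    # Binary phase: an entry exists at `low` and none exists at `high`.
    while high - low > 1:
        mid = (low + high) // 2
        if entry_exists(mid):
            low = mid
        else:
            high = mid
    # `low` is the offset of the last entry, so the count is low + 1.
    return low + 1
```

With a real API, each probe costs one request, so the exponential phase keeps the number of requests logarithmic in the size of the result set.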

Source code in optimade/client/client.py

    def binary_search_count(
        self, filter: str, endpoint: str, base_url: str, results: dict | None = None
    ) -> int:
        """In cases where `data_returned` is not available (due to database limitations or
        otherwise), iteratively probe the final page of results available for a filter using
        binary search.

        Note: These queries always happen synchronously across APIs, but can be executed
        asynchronously within a single API.

        Parameters:
            filter: The OPTIMADE filter string for the query.
            endpoint: The endpoint to query.
            base_url: The base URL to query.
            results: The results from a previous query for the first page of results.

        Returns:
            The number of results for the filter.

        """
        if self.verbosity:
            self._progress.print(f"Performing binary search count for {base_url}")
        if self.use_async:
            return self._binary_search_count_async(filter, endpoint, base_url, results)

        else:
            raise NotImplementedError(
                "Binary search count is not yet implemented for synchronous queries."
            )

count(filter=None, endpoint=None)

Counts the number of results for the filter, requiring only 1 request per provider by making use of the meta->data_returned key. If missing, attempts will be made to perform an exponential/binary search over pagination to count the results.

Raises:

| Type | Description |
| --- | --- |
| RuntimeError | If the query could not be executed. |

Returns:

| Type | Description |
| --- | --- |
| dict[str, dict[str, dict[str, int \| None]]] | A nested mapping from endpoint, filter and base URL to the number of query results. |
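To make the shape of the returned mapping concrete, here is a small sketch that totals the counts while skipping providers whose count is unknown (`None`). The helper `total_count`, the sample filter and the example URLs are all hypothetical.

```python
from __future__ import annotations


def total_count(counts: dict[str, dict[str, dict[str, int | None]]]) -> int:
    """Sum the counts over all endpoints, filters and base URLs."""
    return sum(
        n
        for by_filter in counts.values()
        for by_url in by_filter.values()
        for n in by_url.values()
        if n is not None  # providers that could not be counted
    )


# Made-up data in the endpoint -> filter -> base URL layout.
example_counts = {
    "structures": {
        'elements HAS "Si"': {
            "https://example.org/a": 12,
            "https://example.org/b": None,
        }
    }
}
```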

Source code in optimade/client/client.py

    def count(
        self, filter: str | None = None, endpoint: str | None = None
    ) -> dict[str, dict[str, dict[str, int | None]]]:
        """Counts the number of results for the filter, requiring
        only 1 request per provider by making use of the `meta->data_returned`
        key. If missing, attempts will be made to perform an exponential/binary
        search over pagination to count the results.

        Raises:
            RuntimeError: If the query could not be executed.

        Returns:
            A nested mapping from endpoint, filter and base URL to the number of query results.

        """

        if endpoint is None:
            if self.__current_endpoint is not None:
                endpoint = self.__current_endpoint
                self.__current_endpoint = None
            else:
                endpoint = "structures"

        if filter is None:
            filter = ""

        self._progress = OptimadeClientProgress()
        if self.silent:
            self._progress.disable = True

        self._check_filter(filter, endpoint)

        with self._progress:
            if not self.silent:
                self._progress.print(
                    Panel(
                        f"Counting results for [bold yellow]{endpoint}[/bold yellow]/?filter=[bold magenta][i]{filter}[/i][/bold magenta]",
                        expand=False,
                    )
                )
            results = self._execute_queries(
                filter,
                endpoint,
                page_limit=1,
                paginate=False,
                response_fields=[],
                sort=None,
            )
            count_results = {}

            for base_url in results:
                count_results[base_url] = results[base_url].meta.get(
                    "data_returned", None
                )

                if count_results[base_url] is None or self._force_binary_search:
                    if self.count_binary_search:
                        count_results[base_url] = self.binary_search_count(
                            filter, endpoint, base_url, results
                        )
                    else:
                        self._progress.print(
                            f"Warning: {base_url} did not return a value for `meta->data_returned`, unable to count results. Full response: {results[base_url]}"
                        )

            self.count_results[endpoint][filter] = count_results
            return {endpoint: {filter: count_results}}

get(filter=None, endpoint=None, response_fields=None, sort=None)

Gets the results from the endpoint and filter across the defined OPTIMADE APIs.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| filter | str \| None | The OPTIMADE filter string for the query. | None |
| endpoint | str \| None | The endpoint to query. | None |
| response_fields | list[str] \| None | A list of response fields to request from the server. | None |
| sort | str \| None | The field by which to sort the results. | None |

Raises:

| Type | Description |
| --- | --- |
| RuntimeError | If the query could not be executed. |

Returns:

| Type | Description |
| --- | --- |
| dict[str, dict[str, dict[str, dict]]] | A nested mapping from endpoint, filter and base URL to the query results. |
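Because a failing provider yields a result with errors rather than raising, it can be useful to separate the two cases after a query. The sketch below walks the nested mapping; it assumes that each per-provider dictionary carries `data` and `errors` keys (the keys named for callback results), and the helper name `split_by_outcome` is hypothetical.

```python
from typing import Any


def split_by_outcome(
    response: dict[str, dict[str, dict[str, dict[str, Any]]]],
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Separate providers that returned data from those that reported errors."""
    succeeded: dict[str, Any] = {}
    failed: dict[str, Any] = {}
    for by_filter in response.values():
        for by_url in by_filter.values():
            for base_url, result in by_url.items():
                if result.get("errors"):
                    failed[base_url] = result["errors"]
                else:
                    succeeded[base_url] = result.get("data", [])
    return succeeded, failed
```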

Source code in optimade/client/client.py

    def get(
        self,
        filter: str | None = None,
        endpoint: str | None = None,
        response_fields: list[str] | None = None,
        sort: str | None = None,
    ) -> dict[str, dict[str, dict[str, dict]]]:
        """Gets the results from the endpoint and filter across the
        defined OPTIMADE APIs.

        Parameters:
            filter: The OPTIMADE filter string for the query.
            endpoint: The endpoint to query.
            response_fields: A list of response fields to request
                from the server.
            sort: The field by which to sort the results.

        Raises:
            RuntimeError: If the query could not be executed.

        Returns:
            A nested mapping from endpoint, filter and base URL to the query results.

        """

        if endpoint is None:
            if self.__current_endpoint is not None:
                endpoint = self.__current_endpoint
                self.__current_endpoint = None
            else:
                endpoint = "structures"

        if filter is None:
            filter = ""

        self._check_filter(filter, endpoint)

        with self._progress:
            if not self.silent:
                self._progress.print(
                    Panel(
                        f"Performing query [bold yellow]{endpoint}[/bold yellow]/?filter=[bold magenta][i]{filter}[/i][/bold magenta]",
                        expand=False,
                    )
                )
            results = self._execute_queries(
                filter,
                endpoint,
                response_fields=response_fields,
                page_limit=None,
                paginate=True,
                sort=sort,
            )
            self.all_results[endpoint][filter] = results
            return {endpoint: {filter: {k: results[k].asdict() for k in results}}}

get_one(endpoint, filter, base_url, response_fields=None, sort=None, page_limit=None, paginate=True, other_params=None, override_url=None)

Executes the query synchronously on one API.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| endpoint | str | The OPTIMADE endpoint to query. | required |
| filter | str | The OPTIMADE filter string. | required |
| response_fields | list[str] \| None | A list of response fields to request from the server. | None |
| sort | str \| None | The field by which to sort the results. | None |
| page_limit | int \| None | A page limit to enforce for each query (used in conjunction with paginate). | None |
| paginate | bool | Whether to pull all pages of results (up to the value of max_results_per_provider) or whether to return after one page. | True |
| other_params | dict[str, Any] \| None | Any other parameters to pass to the server. | None |
| override_url | str \| None | Allow overriding the URL for the request, e.g., when doing pagination externally. | None |

Returns:

| Type | Description |
| --- | --- |
| dict[str, QueryResults] | A dictionary mapping from base URL to the results of the query. |

Source code in optimade/client/client.py

    def get_one(
        self,
        endpoint: str,
        filter: str,
        base_url: str,
        response_fields: list[str] | None = None,
        sort: str | None = None,
        page_limit: int | None = None,
        paginate: bool = True,
        other_params: dict[str, Any] | None = None,
        override_url: str | None = None,
    ) -> dict[str, QueryResults]:
        """Executes the query synchronously on one API.

        Parameters:
            endpoint: The OPTIMADE endpoint to query.
            filter: The OPTIMADE filter string.
            response_fields: A list of response fields to request
                from the server.
            sort: The field by which to sort the results.
            page_limit: A page limit to enforce for each query (used in
                conjunction with `paginate`).
            paginate: Whether to pull all pages of results (up to the
                value of `max_results_per_provider`) or whether to return
                after one page.
            other_params: Any other parameters to pass to the server.
            override_url: Allow overriding the URL for the request, e.g., when doing pagination externally.

        Returns:
            A dictionary mapping from base URL to the results of the query.

        """
        try:
            return self._get_one(
                endpoint,
                filter,
                base_url,
                page_limit=page_limit,
                paginate=paginate,
                response_fields=response_fields,
                sort=sort,
                other_params=other_params,
                override_url=override_url,
            )
        except Exception as exc:
            error_query_results = QueryResults()
            error_query_results.errors = [
                f"{exc.__class__.__name__}: {str(exc.args[0])}"
            ]
            self._progress.print(
                f"[red]Error[/red]: Provider {str(base_url)!r} returned: [red i]{exc}[/red i]"
            )
            return {base_url: error_query_results}

get_one_async(endpoint, filter, base_url, response_fields=None, sort=None, page_limit=None, paginate=True, other_params=None, override_url=None) async

Executes the query asynchronously on one API.

Note

This method currently makes non-blocking requests to a single API, but these requests are executed serially on that API, i.e., results are pulled one page at a time, but requests will not block other async requests to other APIs.
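The behaviour described in the note (pages pulled serially within one API, while different APIs proceed concurrently) can be illustrated with plain asyncio. This is a self-contained sketch using made-up in-memory "APIs"; `FAKE_APIS`, `fetch_page`, `harvest_one` and `harvest_all` are hypothetical names and no HTTP requests are made.

```python
import asyncio

# Hypothetical in-memory "APIs": each base URL maps to its pages of entries.
FAKE_APIS = {
    "https://example.org/a": [["a1", "a2"], ["a3"]],
    "https://example.org/b": [["b1"], ["b2"], ["b3"]],
}


async def fetch_page(base_url: str, page: int) -> list[str]:
    """Stand-in for an HTTP request; yields control like real network I/O."""
    await asyncio.sleep(0)
    return FAKE_APIS[base_url][page]


async def harvest_one(base_url: str) -> dict[str, list[str]]:
    """Pull pages one at a time from a single API."""
    entries: list[str] = []
    for page in range(len(FAKE_APIS[base_url])):
        # Each await lets the harvests of *other* APIs make progress.
        entries.extend(await fetch_page(base_url, page))
    return {base_url: entries}


async def harvest_all() -> dict[str, list[str]]:
    """Run one serial harvest per API, with all APIs running concurrently."""
    merged: dict[str, list[str]] = {}
    for result in await asyncio.gather(*(harvest_one(url) for url in FAKE_APIS)):
        merged.update(result)
    return merged
```

Calling `asyncio.run(harvest_all())` returns one list of entries per base URL, in page order for each API.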

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| endpoint | str | The OPTIMADE endpoint to query. | required |
| filter | str | The OPTIMADE filter string. | required |
| response_fields | list[str] \| None | A list of response fields to request from the server. | None |
| sort | str \| None | The field by which to sort the results. | None |
| page_limit | int \| None | A page limit to enforce for each query (used in conjunction with paginate). | None |
| paginate | bool | Whether to pull all pages of results (up to the value of max_results_per_provider) or whether to return after one page. | True |
| other_params | dict[str, Any] \| None | Any other parameters to pass to the server. | None |
| override_url | str \| None | Allow overriding the URL for the request, e.g., when doing pagination externally. | None |

Returns:

| Type | Description |
| --- | --- |
| dict[str, QueryResults] | A dictionary mapping from base URL to the results of the query. |

Source code in optimade/client/client.py

    async def get_one_async(
        self,
        endpoint: str,
        filter: str,
        base_url: str,
        response_fields: list[str] | None = None,
        sort: str | None = None,
        page_limit: int | None = None,
        paginate: bool = True,
        other_params: dict[str, Any] | None = None,
        override_url: str | None = None,
    ) -> dict[str, QueryResults]:
        """Executes the query asynchronously on one API.

        !!! note
            This method currently makes non-blocking requests
            to a single API, but these requests are executed
            serially on that API, i.e., results are pulled one page
            at a time, but requests will not block other async requests
            to other APIs.

        Parameters:
            endpoint: The OPTIMADE endpoint to query.
            filter: The OPTIMADE filter string.
            response_fields: A list of response fields to request
                from the server.
            sort: The field by which to sort the results.
            page_limit: A page limit to enforce for each query (used in
                conjunction with `paginate`).
            paginate: Whether to pull all pages of results (up to the
                value of `max_results_per_provider`) or whether to return
                after one page.
            other_params: Any other parameters to pass to the server.
            override_url: Allow overriding the URL for the request, e.g., when doing pagination externally.

        Returns:
            A dictionary mapping from base URL to the results of the query.

        """
        try:
            return await self._get_one_async(
                endpoint,
                filter,
                base_url,
                page_limit=page_limit,
                paginate=paginate,
                response_fields=response_fields,
                sort=sort,
                other_params=other_params,
                override_url=override_url,
            )
        except Exception as exc:
            error_query_results = QueryResults()
            error_query_results.errors = [
                f"{exc.__class__.__name__}: {str(exc.args[0])}"
            ]
            self._progress.print(
                f"[red]Error[/red]: Provider {str(base_url)!r} returned: [red i]{error_query_results.errors}[/red i]"
            )
            return {base_url: error_query_results}

list_properties(entry_type)

Returns the list of properties reported at /info/<entry_type> for the given entry type, for each database.

Source code in optimade/client/client.py
def list_properties(
    self,
    entry_type: str,
) -> dict[str, list[str]]:
    """Returns the list of properties reported at `/info/<entry_type>`
    for the given entry type, for each database.

    """
    self._progress = OptimadeClientProgress()
    if self.silent:
        self._progress.disable = True

    with self._progress:
        if not self.silent:
            self._progress.print(
                Panel(
                    f"Listing properties for [bold yellow]{entry_type}[/bold yellow]",
                    expand=False,
                )
            )
        results = self._execute_queries(
            "",
            f"info/{entry_type}",
            paginate=False,
            page_limit=1,
            response_fields=[],
            sort=None,
        )
        self.property_lists = {entry_type: {}}
        for database in results:
            self.property_lists[entry_type][database] = list(
                results[database].data.get("properties", {}).keys()  # type: ignore
            )
        return self.property_lists[entry_type]
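The final loop reduces each provider's `/info/<entry_type>` payload to a list of property names, falling back to an empty list when no `properties` key is present. A minimal sketch of just that step, using made-up payloads and URLs (the `info_data` dictionary is an assumption for illustration, not real provider output):

```python
# Hypothetical, trimmed `data` payloads from two providers' /info/structures endpoints.
info_data = {
    "https://a.example.org": {"properties": {"elements": {}, "nelements": {}}},
    "https://b.example.org": {},  # a provider that reports no properties
}

entry_type = "structures"
property_lists: dict[str, dict[str, list[str]]] = {entry_type: {}}
for database, data in info_data.items():
    # Keep only the property names; a missing "properties" key yields an empty list.
    property_lists[entry_type][database] = list(data.get("properties", {}).keys())
```

The result is keyed first by entry type and then by database, which is the shape `search_property` later iterates over.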

search_property(query, entry_type)

Searches for the query substring within the listed properties served by each database.

Parameters:

Name        Type  Description                   Default
query       str   The substring to search for.  required
entry_type  str   The entry type to query.      required

Returns:

Type                  Description
dict[str, list[str]]  A nested dictionary of matching property lists, arranged by entry type and database.

Source code in optimade/client/client.py
def search_property(self, query: str, entry_type: str) -> dict[str, list[str]]:
    """Searches for the query substring within the listed properties
    served by each database.

    Parameters:
        query: The substring to search for.
        entry_type: The entry type to query.

    Returns:
        A nested dictionary of matching property lists, arranged by
        entry type and database.

    """
    if not self.property_lists:
        self.list_properties(entry_type=entry_type)

    matching_properties: dict[str, dict[str, list[str]]] = {
        entry_type: defaultdict(list)
    }
    if entry_type in self.property_lists:
        for database in self.property_lists[entry_type]:
            for property in self.property_lists[entry_type][database]:
                if query in property:
                    matching_properties[entry_type][database].append(property)
    return matching_properties[entry_type]
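The matching step is a plain substring test over the cached property names. The sketch below reproduces it outside the class; the `search` function, the `property_lists` contents and the URLs are hypothetical examples, not real provider data.

```python
from collections import defaultdict

# Hypothetical cache in the shape built by list_properties: entry type -> database -> names.
property_lists = {
    "structures": {
        "https://a.example.org": ["elements", "nelements", "_exmpl_band_gap"],
        "https://b.example.org": ["chemical_formula_reduced"],
    }
}


def search(query: str, entry_type: str) -> dict[str, list[str]]:
    """Return, per database, the property names that contain `query`."""
    matches: dict[str, list[str]] = defaultdict(list)
    for database, names in property_lists.get(entry_type, {}).items():
        for name in names:
            if query in name:  # case-sensitive substring match
                matches[database].append(name)
    return dict(matches)


element_matches = search("elements", "structures")
missing_type = search("elements", "references")
```

Databases with no matching property are absent from the result rather than mapped to an empty list, and an entry type that is not in the cache gives an empty dictionary. Reading the source above, the same holds for `search_property`: it only calls `list_properties` when `property_lists` is empty, so an entry type that has not been listed yet returns no matches once another type is cached.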