Coverage for src/chainalysis/sql/analytical.py: 30%
89 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-20 16:53 -0400
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-20 16:53 -0400
1from time import sleep
3import pandas as pd
5from chainalysis.constants import ANALYTICAL_ENDPOINTS, BASE_URL
6from chainalysis.util_functions.exceptions import (
7 BadRequest,
8 DataSolutionsAPIException,
9 DataSolutionsSDKException,
10 UnhandledException,
11)
12from chainalysis.util_functions.requests import issue_request
class Analytical:
    """
    Execute SQL queries on Data Solutions DataBricks tables.

    Calling an instance submits a query, polls its status until it reaches
    a terminal state, and stores the outcome on the instance. Results can
    then be read as JSON (:meth:`json`) or as a pandas DataFrame
    (:meth:`df`), together with the query execution statistics
    (:meth:`stats`). Paginated results are walked with :meth:`has_next`
    and :meth:`next_page`.
    """

    def __init__(self, api_key: str):
        """
        Initialize the Analytical class with the provided API key.

        :param api_key: The API key for accessing the analytical service.
        :type api_key: str
        """

        self.api_key = api_key
        self._status_code = 0
        self.results = {}
        self._stats = {}
        self.json_response = {}
        # Cached DataFrame built lazily by df(); None means "not built yet".
        self.dataframe_data = None
        # Starts as "error" so the accessors raise until a query succeeds.
        self._status = "error"
        self.next_url = ""
        self._total_pages = 0
        self.query_id = None
        # Populated only when the API reports an "error" status; defined
        # here so reading them after a successful query does not raise
        # AttributeError.
        self.error_message = None
        self.error_details = None
        self.exception = UnhandledException()

    def __call__(
        self,
        query: str,
        parameters: "dict[str, object] | None" = None,
        polling_interval_sec: int = 5,
    ):
        """
        Execute a SQL query asynchronously using the provided parameters
        and polling interval.

        The query is submitted with a POST request and its status endpoint
        is then polled until the status is ``"success"`` or ``"error"``.
        Exceptions raised while polling are not propagated: they are stored
        on the instance and re-raised by :meth:`json`, :meth:`df` and
        :meth:`stats`.

        :param query: The SQL query to be executed.
        :type query: str
        :param parameters: A dictionary of parameters to be used in the
            query. Defaults to an empty dictionary.
        :type parameters: dict, optional
        :param polling_interval_sec: The interval in seconds between status checks.
        :type polling_interval_sec: int, optional
        :raises DataSolutionsAPIException: If the submission response does
            not contain a query ID.
        :return: An instance of the Analytical class with query results.
        :rtype: Analytical
        """

        # None sentinel instead of a shared mutable default; the request
        # body still carries an empty dict when no parameters are given.
        if parameters is None:
            parameters = {}

        query_execution_url = (
            f"{BASE_URL['base_url']}/{ANALYTICAL_ENDPOINTS['async_query_execution']}"
        )

        body = {
            "sql": query,
            "parameters": parameters,
        }

        async_response = issue_request(
            api_key=self.api_key,
            url=query_execution_url,
            body=body,
            method="POST",
        )

        self.query_id = async_response.get("query_id")
        if not self.query_id:
            raise DataSolutionsAPIException(
                "Unexpected response. Query ID was not returned."
            )
        async_query_status_url = (
            f"{BASE_URL['base_url']}/{ANALYTICAL_ENDPOINTS['async_query_status']}"
            f"?query_id={self.query_id}"
        )

        try:
            while True:
                self.json_response = issue_request(
                    api_key=self.api_key,
                    url=async_query_status_url,
                    method="GET",
                )

                self._status = self.json_response["status"]

                if self._status == "success":
                    self._status_code = 200
                    self._stats = self.json_response["stats"]
                    self.results = self.json_response["results"]
                    # New results invalidate any DataFrame cached from a
                    # previous query or page.
                    self.dataframe_data = None
                    self.next_url = self.json_response["next"]
                    self._total_pages = self._stats["total_pages"]
                    break

                if self._status == "error":
                    # NOTE(review): self.exception is left untouched here,
                    # so the accessors raise whatever exception was stored
                    # previously (the default UnhandledException on a fresh
                    # instance) rather than one carrying error_message.
                    self.error_message = self.json_response["message"]
                    self.error_details = self.json_response.get("details")
                    break

                # "running", "pending", or any other non-terminal status:
                # wait before polling again so an unrecognised status can
                # never turn into a busy loop against the API.
                sleep(polling_interval_sec)
        except DataSolutionsSDKException as e:
            # Mark the query as failed so json()/df()/stats() raise instead
            # of returning empty or stale results.
            self._status = "error"
            self.exception = e.get_exception()
            self._status_code = e.status_code
        except Exception as e:
            self._status = "error"
            self.exception = UnhandledException(
                details=e,
            )
        return self

    def next_page(self):
        """
        Fetch the next page of DataBricks query results.

        :raises BadRequest: If there is no next page to fetch.
        :return: An instance of the Analytical class with the next page of results.
        :rtype: Analytical
        """

        if not self.next_url:
            raise BadRequest(
                "No next page available. Use the method has_next() to check if there is a next page that can be retrieved."
            )

        self.json_response = issue_request(
            api_key=self.api_key,
            url=self.next_url,
            method="GET",
        )
        self._status = self.json_response["status"]

        if self._status == "error":
            self.error_message = self.json_response["message"]
            self.error_details = self.json_response.get("details")
        elif self._status == "success":
            self._stats = self.json_response["stats"]
            self.results = self.json_response["results"]
            # Drop the DataFrame built from the previous page so df()
            # reflects the page that was just fetched.
            self.dataframe_data = None
            self.next_url = self.json_response["next"]
        return self

    def json(self):
        """
        Return results as a JSON.

        :raises Exception: Raises an exception if the query resulted in an error.
        :return: Results of the SQL query as a JSON.
        :rtype: dict
        """

        if self._status == "error":
            raise self.exception
        return self.results

    def df(self):
        """
        Convert query results into a pandas DataFrame.

        The DataFrame is built on first access and cached on the instance
        for subsequent calls.

        :raises Exception: Raises an exception if the query resulted in an error.
        :return: DataFrame containing the results of the SQL query.
        :rtype: pd.DataFrame
        """

        if self._status == "error":
            raise self.exception
        # Compare against None explicitly: evaluating a DataFrame in a
        # boolean context raises ValueError (ambiguous truth value).
        if self.dataframe_data is None:
            self.dataframe_data = pd.DataFrame(self.results)
        return self.dataframe_data

    def stats(self):
        """
        Get the statistics of the executed query.

        :raises Exception: Raises an exception if the query resulted in an error.
        :return: Statistics of the query execution.
        :rtype: dict
        """

        if self._status == "error":
            raise self.exception
        return self._stats

    def status_code(self):
        """
        Get the HTTP status code of the response.

        :return: HTTP status code.
        :rtype: int
        """

        return self._status_code

    def was_successful(self):
        """
        Determine if the query executed successfully.

        :return: True if the query was successful, False otherwise.
        :rtype: bool
        """

        return self._status != "error"

    def total_pages(self) -> int:
        """
        Return total number of pages.

        :return: Number of pages.
        :rtype: int
        """

        return self._total_pages

    def has_next(self) -> bool:
        """
        Return if the next page exists.

        :return: Whether next page exists.
        :rtype: bool
        """

        return bool(self.next_url)