Coverage for src/chainalysis/sql/analytical.py: 30%

89 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-08-20 16:53 -0400

1from time import sleep 

2 

3import pandas as pd 

4 

5from chainalysis.constants import ANALYTICAL_ENDPOINTS, BASE_URL 

6from chainalysis.util_functions.exceptions import ( 

7 BadRequest, 

8 DataSolutionsAPIException, 

9 DataSolutionsSDKException, 

10 UnhandledException, 

11) 

12from chainalysis.util_functions.requests import issue_request 

13 

14 

class Analytical:
    """
    Execute SQL queries on Data Solutions DataBricks tables.

    An instance is callable: calling it submits a query for asynchronous
    execution, polls the status endpoint until the query finishes, and
    stores the outcome on the instance. Results can then be read as JSON
    or as a pandas DataFrame, together with the query statistics.
    """

    def __init__(self, api_key: str):
        """
        Initialize the Analytical class with the provided API key.

        :param api_key: The API key for accessing the analytical service.
        :type api_key: str
        """

        self.api_key = api_key
        self._reset_state()

    def _reset_state(self):
        """
        Put every per-query attribute back to its initial value.

        Called on construction and at the start of each query so that a
        reused instance never reports the outcome of an earlier query.
        """

        self._status_code = 0
        self.results = {}
        self._stats = {}
        self.json_response = {}
        self.dataframe_data = None
        # Until a query succeeds the instance reports an error state.
        self._status = "error"
        self.next_url = ""
        self._total_pages = 0
        self.query_id = None
        self.exception = UnhandledException()
        # Defined up front so they can be read even when no error occurred.
        self.error_message = None
        self.error_details = None

    def _store_page(self, payload):
        """
        Copy one successful page of results from a response payload.

        :param payload: Response whose ``stats`` and ``results`` keys are read.
        """

        self._stats = payload["stats"]
        self.results = payload["results"]
        # A missing or null "next" is stored as "" so has_next() is False.
        self.next_url = payload.get("next") or ""
        # The cached DataFrame was built from the previous results.
        self.dataframe_data = None

    def _store_error(self, payload):
        """
        Copy the error message and optional details from a response payload.

        :param payload: Response whose ``message`` key is read.
        """

        self.error_message = payload["message"]
        self.error_details = payload.get("details")

    def __call__(
        self,
        query: str,
        parameters: "dict | None" = None,
        polling_interval_sec: int = 5,
    ):
        """
        Execute a SQL query asynchronously using the provided parameters
        and polling interval.

        Failures while polling are not raised here; they are stored on the
        instance and raised later by ``json()``, ``df()`` or ``stats()``.

        :param query: The SQL query to be executed.
        :type query: str
        :param parameters: A dictionary of parameters to be used in the
            query. ``None`` (the default) sends an empty dictionary.
        :type parameters: dict, optional
        :param polling_interval_sec: The interval in seconds between status checks.
        :type polling_interval_sec: int, optional
        :raises DataSolutionsAPIException: If the submission response does
            not contain a query ID.
        :return: An instance of the Analytical class with query results.
        :rtype: Analytical
        """

        self._reset_state()

        query_execution_url = (
            f"{BASE_URL['base_url']}/{ANALYTICAL_ENDPOINTS['async_query_execution']}"
        )

        body = {
            "sql": query,
            # A fresh dict per call instead of a shared mutable default.
            "parameters": {} if parameters is None else parameters,
        }

        async_response = issue_request(
            api_key=self.api_key,
            url=query_execution_url,
            body=body,
            method="POST",
        )

        self.query_id = async_response.get("query_id")
        if not self.query_id:
            raise DataSolutionsAPIException(
                "Unexpected response. Query ID was not returned."
            )
        async_query_status_url = f"{BASE_URL['base_url']}/{ANALYTICAL_ENDPOINTS['async_query_status']}?query_id={self.query_id}"

        try:
            while True:
                self.json_response = issue_request(
                    api_key=self.api_key,
                    url=async_query_status_url,
                    method="GET",
                )

                self._status = self.json_response["status"]

                if self._status == "success":
                    self._status_code = 200
                    self._store_page(self.json_response)
                    self._total_pages = self._stats.get("total_pages", 0)
                    break
                if self._status == "error":
                    self._store_error(self.json_response)
                    break
                # "running", "pending" or any other non-terminal status:
                # wait before polling again rather than spinning on the API.
                sleep(polling_interval_sec)
        except DataSolutionsSDKException as e:
            # Mark the failure so was_successful() and the accessors do not
            # report an interrupted query as a success.
            self._status = "error"
            self.exception = e.get_exception()
            self._status_code = e.status_code
        except Exception as e:
            self._status = "error"
            self.exception = UnhandledException(
                details=e,
            )
        return self

    def next_page(self):
        """
        Fetch the next page of DataBricks query results.

        :raises BadRequest: If there is no next page to fetch.
        :return: An instance of the Analytical class with the next page of results.
        :rtype: Analytical
        """

        if not self.next_url:
            raise BadRequest(
                "No next page available. Use the method has_next() to check if there is a next page that can be retrieved."
            )

        self.json_response = issue_request(
            api_key=self.api_key,
            url=self.next_url,
            method="GET",
        )
        self._status = self.json_response["status"]

        if self._status == "error":
            self._store_error(self.json_response)
        elif self._status == "success":
            self._store_page(self.json_response)
        return self

    def json(self):
        """
        Return results as a JSON.

        :raises Exception: Raises the stored exception if the query resulted in an error.
        :return: Results of the SQL query as returned by the API.
        """

        if self._status == "error":
            raise self.exception
        return self.results

    def df(self):
        """
        Convert query results into a pandas DataFrame.

        The DataFrame is built once per page of results and then reused.

        :raises Exception: Raises the stored exception if the query resulted in an error.
        :return: DataFrame containing the results of the SQL query.
        :rtype: pd.DataFrame
        """

        if self._status == "error":
            raise self.exception
        # Compare against None: the truth value of a DataFrame is ambiguous
        # and evaluating it raises ValueError.
        if self.dataframe_data is None:
            self.dataframe_data = pd.DataFrame(self.results)
        return self.dataframe_data

    def stats(self):
        """
        Get the statistics of the executed query.

        :raises Exception: Raises the stored exception if the query resulted in an error.
        :return: Statistics of the query execution.
        """

        if self._status == "error":
            raise self.exception
        return self._stats

    def status_code(self):
        """
        Get the status code recorded for the last query.

        This is 200 after a successful query, the status code carried by
        the SDK exception after a failed request, and 0 otherwise.

        :return: HTTP status code.
        :rtype: int
        """

        return self._status_code

    def was_successful(self):
        """
        Determine if the query executed successfully.

        :return: True if the query was successful, False otherwise.
        :rtype: bool
        """

        return self._status != "error"

    def total_pages(self) -> int:
        """
        Return total number of pages.

        :return: Number of pages.
        :rtype: int
        """

        return self._total_pages

    def has_next(self) -> bool:
        """
        Return if the next page exists.

        :return: Whether next page exists.
        :rtype: bool
        """

        return bool(self.next_url)

233 return True 

234 return False