165 lines
5.3 KiB
Python
165 lines
5.3 KiB
Python
import sys
|
|
import requests
|
|
import time
|
|
import json
|
|
from urllib.parse import quote
|
|
|
|
|
|
class RequestError(Exception):
    '''Raised when a request cannot be issued, e.g. for an unsupported HTTP method.'''
|
|
|
|
class Response:
    '''
    Response is a wrapper for reading + extracting the information we care about
    from an HTTP response.

    Primarily created by Requests that get `make`'d.
    '''

    def __init__(self, method: str, url: str, body: str, code: int, expected: int, out=sys.stdout, color=True,
                 truncate_long_body=True, p=None):
        '''
        @param method: str -> HTTP verb that was used for the request
        @param url: str -> full URL that was requested, query string included
        @param body: str -> response body, typically a string before parsing anything
        @param code: int -> status code that came back
        @param expected: int -> status code the caller hoped for
        @param out -> file handle `log` writes to, normally sys.stdout
        @param color: bool -> wrap the log headline in ANSI colors when True
        @param truncate_long_body: bool -> shorten bodies over 80 characters when logging
        @param p -> optional (raw query string, params dict) pair echoed by `log` on failure
        '''
        self.method = method
        self.url = url
        # partition() splits on the first '?' and leaves the URL untouched when
        # there is none (find() returned -1 there and chopped the last character;
        # strip() treated base_url as a character set rather than a prefix).
        self.base_url, _, self.query_string = url.partition('?')

        self.body = body  # typically a string before parsing anything
        self.code = code  # u16
        self.expected = expected  # u16

        self.out = out  # file handle to write to, normally sys.stdout
        self.color = color  # bool telling if log should color anything (on by default)
        self.truncate_long_body = truncate_long_body
        self.raw_params = p

    def _color(self, cc, string):
        '''Wrap `string` in the ANSI escape `cc` followed by the reset code.'''
        nc = '\033[0m'
        return f'{cc}{string}{nc}'

    def _color_failing(self, string):
        '''Return `string` in bold red.'''
        red = '\033[1;31m'
        return self._color(red, string)

    def _color_passing(self, string):
        '''Return `string` in bold green.'''
        green = '\033[1;32m'
        return self._color(green, string)

    def __write_msg(self, s):
        '''Print `s` to `self.out`; strings of length 0 or 1 are dropped to reduce visual noise.'''
        if len(s) > 1:
            print(s, file=self.out)

    def _log_body(self):
        '''
        Write the body to `self.out`, tab indented.

        When `truncate_long_body` is set and the body is longer than 80
        characters, only its first and last 40 characters are kept, joined by
        '.....'.
        '''
        text = str(self.body)
        if self.truncate_long_body and len(text) > 80:
            # Slice both ends directly instead of deleting middle characters one
            # at a time; the old join also kept a single char instead of the tail.
            text = f'{text[:40]}.....{text[-40:]}'
        self.__write_msg(f'\t{text}')

    def log(self, show_query: bool = True):
        '''
        Report whether the status code matched the expected one.

        On a mismatch the failure headline, the raw/parsed params and the body
        are written. On a match a single "Passing" line is written.

        @param show_query: bool -> on success, append the first 80 characters of the query string
        '''
        if self.code != self.expected:
            fail = f'Failed {self.method.upper()} {self.base_url}\t{self.code} expected {self.expected}\n'
            # `p` is optional, so do not index into it blindly.
            if self.raw_params is not None:
                raw, parsed = self.raw_params[0], self.raw_params[1]
            else:
                raw, parsed = None, None
            content = f'\tRaw params: {raw}\n\tParsed Params: {parsed}'
            if self.color:
                fail = self._color_failing(fail)

            self.__write_msg(fail)
            self.__write_msg(content)
            self._log_body()
        else:
            msg = f'Passing: {self.method} {self.base_url}'
            if self.color:
                msg = self._color_passing(msg)

            # no color on the query, ever (because it is typically massive)
            if show_query:
                msg += f'\n\t{self.query_string[:80]}'

            self.__write_msg(msg)

    def json(self):
        '''
        Force an interpretation of json from the body

        NOTE: this method is rather obtuse and a massive afterthought so its usage
        should be limited as much as possible

        @return the decoded body, or {} when the body is not valid JSON
        '''
        try:
            return json.loads(self.body)
        except (ValueError, TypeError):
            # ValueError covers json.JSONDecodeError; TypeError covers a body
            # that is not str/bytes (e.g. None).
            return {}

    def __str__(self):
        '''
        Returns: str(Response) -> `code => response.body`
        '''
        return f'{self.code} => {self.body}'
|
|
|
|
class Request:
    '''
    Description of a single HTTP request: a method, a base URL and a dict of
    parameters. Every parameter becomes part of the query string except
    'content', which is used as the request body.
    '''

    def __init__(self, method: str, url: str, params: dict):
        '''
        @param method: str -> one of 'get', 'post', 'delete'
        @param url: str -> URL without a query string
        @param params: dict -> query parameters; the 'content' key (if any) is the body
        '''
        # Kept as an assert so callers see the same AssertionError as before.
        # NOTE: asserts are stripped under `python -O`; _make_request still
        # rejects unknown methods with RequestError.
        assert method in ['get', 'post', 'delete']

        self.method = method
        self.url = url
        self.body = params.get('content')
        self.qs_dict = params

        # Percent encode every key and value before building the query string.
        # Values are looked up with the ORIGINAL key (not the quoted one) and
        # the encoded value is the one actually written out.
        pairs = [
            f'{quote(str(key))}={quote(str(value))}'
            for key, value in params.items()
            if key != 'content'
        ]
        self.query_string = '?' + '&'.join(pairs) if pairs else ''

    def _make_request(self, method: str, hope: int):
        '''
        Lower level driver that actually makes the request.

        @param method: str -> HTTP verb, case-insensitive
        @param hope: int -> status code we hope to get back
        @return Response built from the server's text and status code
        @raise RequestError when `method` is not get/post/delete
        '''
        method = method.lower()
        url = self.url + self.query_string

        if method == 'get':
            resp = requests.get(url)
        elif method == 'post':
            # only POST carries the 'content' payload
            resp = requests.post(url, data=self.body)
        elif method == 'delete':
            resp = requests.delete(url)
        else:
            raise RequestError('Invalid method passed')

        raw_params = (self.query_string, self.qs_dict)
        return Response(method, url, resp.text, resp.status_code, hope, p=raw_params)

    def make(self, hope: int) -> 'Response':
        '''
        @param hope: int -> status code we hope to get back
        @return Response -> Wrapper around server http response
        '''
        return self._make_request(self.method, hope)
|
|
|
|
|