- Removing old client test code in favor of new structure

+ Moving tests to the new client
- Removing web/ module
! Currently all tests are passing 17/17 but the real trickery comes with doing this
on CI which should will likely take some magic somewhere
Or we'll just extend the freechat docker image to finally have all the required
dependancies and just test on that with diesel and what not
This commit is contained in:
shockrah 2021-04-21 21:38:25 -07:00
parent 9ab9cdb176
commit e2e68c51ba
5 changed files with 47 additions and 452 deletions

View File

@ -1,257 +0,0 @@
import time
import subprocess, os, sys
import json, requests
from json import JSONDecodeError
from web import http
class Worker:
def __init__(self, domain: str, create_admin: bool):
'''
@opt:base = base url string
@opt:create_admin = creates admin account directly off cargo -c <NAME>
@opt:admin = optionally pass in dictionary with some admin credentials
potentially from another instance to run multiple tests at once
'''
self.domain = domain
self.requests = {}
self.responses = {}
self.body_logs = {}
self.jwt = None # never gets assigned until /login is hit
self.basic_creds = self.__create_admin()
self.id = self.basic_creds['user']['id']
self.secret = self.basic_creds['user']['secret']
def __create_admin(self):
# /home/$user/.cargo/bin/cargo <- normally
# NOTE: the above path is prolly some awful shit on ci pipeline
CARGO_BIN = os.getenv('CARGO_BIN')
proc = subprocess.run(f'cargo run --release -- -c dev-test'.split(), text=True, capture_output=True)
try:
return json.loads(proc.stdout)
except JSONDecodeError as err:
import sys
print('TESTBOT UNABLE TO LOAD JSON DATA FROM -c flag', file=sys.stderr)
print(err)
exit(1)
def auth(self, auth_opt: str):
if auth_opt == 'basic':
return self.basic_creds
else:
return {'id': self.id, 'jwt': auth_opt}
def _append_auth(self, opts: dict, auth: str):
'''
Default auth fallback type is jwt because that's they only other type of auth
FC cares about really
@param opts: Dictionary of parameters to pass to the endpoint
@auth: Denotes if we use basic auth or jwt if its not 'basic'
'''
# id is basically always required by the api so we mindlessly add it here
opts['id'] = self.id
if auth == 'basic':
opts['secret'] = self.secret
else:
opts['jwt'] = auth
return opts
def run_test(self, test):
# {
# 'init': [method, uri, query_string_dict],
# 'auth':jwt|none,
# 'hope': <status-code>
# 'body': <bool> # show response body or not (optional
# }
# not 'try'ing these as misconfigurations are completely fatal anyway
method, path, opts = test['init']
auth = test['auth']
hope = test['hope']
if 'body' in test:
# Only checking for headers if a body is present as we really only use
# headers with the body
head = test['headers'] if 'headers' in test else None
self.request(method, path, auth, opts, hope, show_body=test['body'], headers=head)
else:
self.request(method, path, auth, opts, hope)
def logs(self):
ids = sorted(self.requests.keys()) # shared keys in requests/responses
for key in ids:
self.responses[key].log()
# if body is request to be shown then dump it tabbed out by 1 tab
if self.body_logs[key] is True:
print(f'\tBody: {self.responses[key].body}')
def test_stats(self):
passc = 0
failc = 0
total = len(self.responses)
for key in self.responses:
hope = self.responses[key].expected
real = self.responses[key].code
if hope == real:
passc += 1
else:
failc += 1
print('=======')
print(f'\033[1;32mPassing\033[0m {passc}/{total}')
print(f'\033[1;31mFailing\033[0m {failc}/{total}')
print('=======')
def request(self, method: str, path: str, auth: str, opts: dict, expectation: int, show_body=False, headers={}):
assert(path[0] == '/')
# First make sure we add in the correct auth params that are requested
opts = self._append_auth(opts, auth)
# Build the request and store it in our structure
url = self.domain + path
req = http.Request(method, url, opts, headers=headers)
r_id = time.time()
resp = req.make(expectation)
# update log trackers
self.requests[r_id] = req
self.responses[r_id] = resp
self.body_logs[r_id] = show_body
return r_id
def run(worker: Worker):
VOICE_CHAN = 1
TEXT_CHAN = 2
# Basically every test requires a jwt to be passed in so we grab that here
# Should this fail so should nearly every other test from this point
req_login = worker.request('post', '/login', 'basic',{}, 200)
jwt = worker.responses[req_login].json()['jwt']
new_channel_name = time.time()
channel_tests = [
# sanity check
{'init': ['get', '/channels/list', {}], 'auth': None, 'hope': 401},
{'init': ['post', '/channels/list', {}], 'auth': jwt, 'hope': 404},
{'init': ['post', '/channels/create', {'name': str(new_channel_name), 'kind': TEXT_CHAN, 'description': 'asdf'}], 'auth': jwt, 'hope': 200},
{'init': ['post', '/channels/create', {'name': str(new_channel_name+1), 'kind': TEXT_CHAN,}], 'auth': jwt, 'hope': 200},
{'init': ['post', '/channels/create', {}], 'auth': jwt, 'hope': 400},
{'init': ['post', '/channels/create', {'name': 123, 'kind': 'adsf'}], 'auth': jwt, 'hope': 400},
{'init': ['get', '/channels/list', {'kind': TEXT_CHAN}], 'auth': jwt, 'hope': 200},
]
for test in channel_tests:
worker.run_test(test)
msg_chan_name = time.time()
_id = worker.request('post', '/channels/create', jwt, {
'name': str(msg_chan_name),
'kind': TEXT_CHAN,
'description': 'asdf'
}, 200)
chan_d = worker.responses[_id].json()
message_tests = [
# bs message spam
{
'init': ['post', '/message/send', {'type': 'text', 'channel_id': chan_d['id'], 'content': 'bs content'}],
'headers': {'content-type': 'text/plain'},
'auth': jwt, 'hope': 200, 'body': False
},
{
'init': ['post', '/message/send', {'type': 'text', 'channel_id': chan_d['id'], 'content': 'bs content'}],
'headers': {'content-type': 'text/plain'},
'auth': jwt, 'hope': 200
},
{
'init': ['post', '/message/send', {'type': 'text', 'channel_id': chan_d['id'], 'content': 'bs content'}],
'headers': {'content-type': 'text/plain'},
'auth': jwt, 'hope': 200
},
{
'init': ['post', '/message/send', {'type': 'text', 'channel_id': chan_d['id'], 'content': 'bs content'}],
'headers': {'content-type': 'text/plain'},
'auth': jwt, 'hope': 200
},
# can we get them back tho?
{
'init': [
'get', '/message/get_range', {'channel_id': chan_d['id'], 'start_time': int(msg_chan_name-10), 'end_time': int(msg_chan_name + 10)}
],
'auth': jwt, 'hope': 200
},
{
'init': [
'get', '/message/get_range', {'channel_id': chan_d['id'], 'end_time': int(msg_chan_name)}
],
'auth': jwt, 'hope': 400
},
{
'init': [
'get', '/message/get_range', {'channel_id': chan_d['id'], 'start_time': int(msg_chan_name), 'end_time': int(msg_chan_name)}
],
'auth': jwt, 'hope': 400
},
{
'init': [
'get', '/message/get_range', {'channel_id': chan_d['id'], 'end_time': int(msg_chan_name), 'start_time': int(msg_chan_name)}
],
'auth': jwt, 'hope': 400
},
]
for test in message_tests:
worker.run_test(test)
member_tests = [
{ 'init': ['get', '/members/me', {}], 'auth': jwt, 'hope': 200, 'body': True},
{ 'init': ['get', '/members/get_online', {}], 'auth': jwt, 'hope': 200, 'body': True},
{ 'init': ['post', '/members/me/nickname', {'nick': f'New name: {time.ctime()}'}], 'auth': jwt, 'hope': 200 },
{ 'init': ['get', '/members/me', {}], 'auth': jwt, 'hope': 200, 'body': True},
]
for test in member_tests:
worker.run_test(test)
invite_tests = [
{'init': ['post', '/invite/create', {}], 'auth': jwt, 'hope': 200, 'body': True},
{'init': ['get', '/invite/create', {}], 'auth': jwt, 'hope': 404, 'body': True},
]
for test in invite_tests:
worker.run_test(test)
# ad-hoc test for joining now
invite_req_id = worker.request('post', '/invite/create', jwt, {}, 200, True)
code = worker.responses[invite_req_id].json()['id']
worker.request('get', '/join', None, {'code': code}, 200, True)
worker.logs()
worker.test_stats()
if __name__ == '__main__':
worker = Worker('http://localhost:4536', create_admin=True)
run(worker)

View File

@ -48,14 +48,14 @@ def bs_admin(server: Server) -> Admin:
return Admin(user_, server_)
def std_request(admin: Admin, method: str, path: str, qs: dict, hope: int, headers: dict={}, body=None) -> Request:
def req(admin: Admin, method: str, path: str, qs: dict, hope: int, headers: dict={}, body=None, verbose=False) -> Request:
url = admin.server.url + path
# Copy over the additional params into the query string
params = {'id': admin.id, 'jwt': admin.jwt}
for key in qs:
params[key] = qs[key]
return Request(url, method, params, headers, hope, body)
return Request(url, method, params, headers, hope, body, verbose)
if __name__ == '__main__':
admin = create_admin()
@ -81,14 +81,20 @@ if __name__ == '__main__':
# Container for most/all the generic requests we want to fire off
requests.extend([
std_request(fake_user, 'get', '/channels/list', {}, 401),
std_request(admin, 'post', '/channels/list', {'kind': TEXT_CHAN}, 404),
std_request(admin, 'get' , '/channels/list', {'kind': TEXT_CHAN}, 200),
std_request(admin , 'delete', '/channels/delete', {'channel_id':del_chan_id}, 200),
std_request(admin, 'post', '/message/send', {'channel_id': chan_id},200,{'content-type':'text/plain'}, 'asdf'),
std_request(admin, 'post', '/message/send', {'channel_id': 123}, 400, {'content-type': 'text/plain'}, 'asdf'),
std_request(admin , 'post', '/message/send', {'channel_id': chan_id}, 200, {'content-type': 'image/png'}, 'asdf'),
std_request(admin , 'post', '/message/send', {'channel_id': 123}, 400, {'content-type': 'image/png'}, 'asdf'),
req(fake_user, 'get', '/channels/list', {}, 401),
req(admin, 'post', '/channels/list', {'kind': TEXT_CHAN}, 404),
req(admin, 'get' , '/channels/list', {'kind': TEXT_CHAN}, 200),
req(admin , 'delete', '/channels/delete', {'channel_id':del_chan_id}, 200),
req(admin, 'post', '/message/send', {'channel_id': chan_id},200,{'content-type':'text/plain'}, 'asdf'),
req(admin, 'post', '/message/send', {'channel_id': 123}, 400, {'content-type': 'text/plain'}, 'asdf'),
req(admin , 'post', '/message/send', {'channel_id': chan_id}, 200, {'content-type': 'image/png'}, 'asdf'),
req(admin , 'post', '/message/send', {'channel_id': 123}, 400, {'content-type': 'image/png'}, 'asdf'),
req(admin, 'get', '/message/recent', {'channel_id': chan_id, 'limit': 20}, 200, verbose=True),
req(admin, 'get', '/message/recent', {'channel_id': 123, 'limit': 20}, 404),
req(admin, 'get', '/members/me', {}, 200),
req(admin, 'get', '/members/get_online', {}, 200),
req(admin, 'post', '/members/me/nickname', {'nick': f'randy-{time()}'}, 200),
req(admin , 'post', '/invite/create', {}, 200, verbose=True)
])
for req in requests:
@ -98,7 +104,13 @@ if __name__ == '__main__':
requests.insert(0, login_req)
requests.insert(0, mk_chan_sanity)
requests.insert(0, mk_chan_to_delete)
for req in requests:
req.show_response()
pass_count = 0
for r in requests:
r.show_response()
if r.passing:
pass_count += 1
req_count = len(requests)
print(f'Passing {pass_count}/{req_count}')

View File

@ -7,7 +7,7 @@ GREEN = '\033[1;32m'
class Request:
def __init__(self, url: str, method: str, qs: dict, headers: dict, hope: int, body=None):
def __init__(self, url: str, method: str, qs: dict, headers: dict, hope: int, body=None, verbose=False):
self.method = method
self.url = url
# query string parameters are appended to the url which is why we do this
@ -16,10 +16,18 @@ class Request:
self.body = body
self.response = None
self.hope = hope
# This flag lets us control how much output we want to see
self.verbose = verbose
@property
def passing(self):
if self.response is None:
return False
else:
return self.response.status_code == self.hope
def fire(self) -> requests.Response:
try:
if self.method == 'get':
@ -47,13 +55,23 @@ class Request:
print(abstract)
print('\t', self.method, ' ', self.url)
if len(self.headers) != 0:
print('\tHeaders ', self.headers)
print('\tRequest-Headers ', self.headers)
if len(self.qs) != 0:
print('\tQuery Dictionary ', self.qs)
print('\tQuery-Dictionary ', self.qs)
if self.body is not None:
print('\tBody ', str(self.body))
print('\tRequest-Body ', str(self.body))
if self.verbose:
print(f'\tResponse-Status {self.response.status_code}')
print(f'\tResponse-Headers {self.response.headers}')
print(f'\tResponse-Text {self.response.text}')
else:
print(f'{GREEN}Pass{NC} {self.method} {self.url}')
if self.verbose:
print(f'\tResponse-Status {self.response.status_code}')
print(f'\tResponse-Headers {self.response.headers}')
print(f'\tResponse-Text {self.response.text}')

View File

@ -1 +0,0 @@
from . import http

View File

@ -1,177 +0,0 @@
import sys
import requests
import time
import json
from urllib.parse import quote
# TODO/NOTE: this isn't necessary for literally any server that doesn't self-sign
# There's prolly a less retared way of doing this "safely" in testing pipelines
# but im in no rush to figure that out right now
import urllib3
urllib3.disable_warnings()
class RequestError(Exception):
pass
class Response:
'''
Response is wrapper for reading + extracting information we care about
Primarily created by Requests that get `make`'d.
'''
def __init__(self, method: str, url: str, body: str, code: int, expected: int, headers=None,
out=sys.stdout, color=True, truncate_long_body=True, p=None):
self.headers = headers
self.method = method
self.url = url
self.base_url = self.url[:self.url.find('?')]
try:
self.query_string = self.url.strip(self.base_url)[1:]
except: # do this for empty query strings
self.query_string = self.url.strip(self.base_url)
self.body = body #typically a string before parsing anything
self.code = code #u16
self.expected = expected #u16
self.out = out # file handle to write to normally sys.stdout
self.color = color # bool telling if log should color anything (on by default)
self.truncate_long_body = truncate_long_body
self.raw_params = p
def _color(self, cc, string):
nc = '\033[0m'
return f'{cc}{string}{nc}'
def _color_failing(self, string):
red = '\033[1;31m'
return self._color(red, string)
def _color_passing(self, string):
green = '\033[1;32m'
return self._color(green, string)
def __write_msg(self, s):
# mega dumb wrapper to reduce visual noise i think
if (len(s) == 1 or len(s) == 0) is False: print(s, file=self.out)
def _log_body(self):
# TODO: refactor this func to be more flexible as its hella useful
# for truncating ugly long as hell strings
if self.truncate_long_body:
if len(self.body) > 80:
msg = self.body
while len(msg) > 80:
msg = msg[:len(msg)//2] + msg[len(msg)//2+1:]
msg = '.....'.join([msg[:40], msg[44]])
self.__write_msg(msg)
else:
self.__write_msg(f'\t{self.body}')
else:
self.__write_msg(f'\t{self.body}')
def log(self, show_query: bool = True):
if self.code != self.expected:
fail = f'Failed {self.method.upper()} {self.base_url}\t{self.code} expected {self.expected}\n'
content = f'\tRaw params: {self.raw_params[0]}\n\tParsed Params: {self.raw_params[1]}'
headers = f'\tHeaders: {self.headers}'
if self.color:
fail = self._color_failing(fail)
self.__write_msg(fail)
self.__write_msg(content)
print(headers)
self._log_body()
else:
msg = f'Passing: {self.method} {self.base_url}'
if self.color:
msg = self._color_passing(msg)
# no color on the query like ever(because its typically massive
if show_query:
msg += f'\n\t{self.query_string[:77]}...'
self.__write_msg(msg)
def json(self):
'''
Force an interpretation of json from the body
NOTE: this method is rather obtuse and a massive afterthough so its usage
should be limited as much as possible
'''
try:
return json.loads(self.body)
except:
return {}
def __str__(self):
'''
Returns: str(Response) -> `code => response.bdoy`
'''
return f'{self.code} => {self.body}'
class Request:
def __init__(self, method: str, url: str, params: dict, headers: dict):
assert(method in ['get', 'post', 'delete'])
self.method = method
self.url = url
self.headers = headers
if 'content' in params:
self.body = params['content']
else:
self.body = None
self.query_string = ''
self.qs_dict = params
for key in params:
# percent encode all of our values before we construct the query string
key = quote(key)
value = quote(str(params[key]))
if key == 'content':
self.body = params['content']
else:
value = params[key]
if len(self.query_string) == 0:
self.query_string += f'?{key}={value}'
else:
self.query_string += f'&{key}={value}'
def _make_request(self, method: str, hope: int):
# Lower driver for actuall making the request we are looking for
method = method.lower()
url = self.url + self.query_string
raw_params = (self.query_string, self.qs_dict)
if method == 'get':
resp = requests.get(url, verify=False, headers=self.headers)
return Response('get', url, resp.text, resp.status_code, hope, p=raw_params, headers=self.headers)
elif method == 'post':
print(self.headers)
resp = requests.post(url, data=self.body, verify=False, headers=self.headers)
return Response('post', url, resp.text, resp.status_code, hope, p=raw_params, headers=self.headers)
elif method == 'delete':
resp = requests.delete(url, verify=False, headers=self.headers)
return Response('delete', url, resp.text, resp.status_code, hope, p=raw_params, headers=self.headers)
else:
raise RequestError('Invalid method passed')
return resp
def make(self, hope: int) -> Response:
'''
@param hope: int -> status code we hope to get back
@return Response -> Wrapper around server http response
'''
return self._make_request(self.method, hope)