95 lines
3.0 KiB
Python
95 lines
3.0 KiB
Python
import subprocess, os
|
|
import json, requests
|
|
|
|
class Test:
|
|
def __init__(self, base='http://localhost:8888', create_admin=False, admin=None):
|
|
'''
|
|
@opt:base = base url string
|
|
@opt:create_admin = creates admin account directly off cargo -c <NAME>
|
|
@opt:admin = optionally pass in dictionary with some admin credentials
|
|
potentially from another instance to run multiple tests at once
|
|
'''
|
|
|
|
self.test_count = 0
|
|
if create_admin:
|
|
self.body = Test.__create_admin()
|
|
elif admin is not None:
|
|
self.body = body
|
|
else:
|
|
# for now we use this because my dev server has this fake ass acc on it
|
|
self.body = {
|
|
'secret': 'utO088fltYrTZg323NZfAGrAkArMkDfwjPmt0ooAYta2oJOYDWcAd1FnrpVVMqZtMeUX4_Hu57-LHCkXy8gedg==',
|
|
'id': 23,
|
|
'name': 'admin',
|
|
'joindate':1602385239,
|
|
'status': 0,
|
|
'permissions': 18446744073709551615
|
|
}
|
|
|
|
self.base = base
|
|
|
|
@staticmethod
|
|
def __create_admin(cargo_path: str):
|
|
# /home/$user/.cargo/bin/cargo <- normally
|
|
# prolly some awful shit on pipes
|
|
CARGO_BIN = os.getenv(cargo_path)
|
|
raw_json = subprocess.run(f'{CARGO_BIN} run --release -- -c dev-test'.split(), text=True, capture_output=True)
|
|
#raw_json = raw_json_b[2:-1].replace('\\n', ' ').strip()
|
|
return json.loads(raw_json.stdout)
|
|
|
|
|
|
@staticmethod
|
|
def log(url: str, method: str, request: requests.Response):
|
|
print(f'{method} {url}')
|
|
print(f'\t[Status Code]: {request.status_code}')
|
|
print(f'\t[Body]: {request.text}')
|
|
|
|
def _build_req_body(self, **options):
|
|
_map = self.body
|
|
for key in options:
|
|
_map[key] = options[key]
|
|
|
|
return json.dumps(_map)
|
|
|
|
|
|
|
|
def post(self, url, **opts):
|
|
body = self._build_req_body(**opts)
|
|
r = requests.post(self.base + url, data=body)
|
|
self.log(self.base + url, 'POST', r)
|
|
|
|
def get(self, url, **opts):
|
|
body = self._build_req_body(**opts)
|
|
r = requests.get(self.base + url, data=body)
|
|
self.log(self.base + url, 'GET', r)
|
|
|
|
def delete(self, url, **opts):
|
|
body = self._build_req_body(**opts)
|
|
r = requests.delete(self.base + url, data=body)
|
|
self.log(self.base + url, 'DELETE', r)
|
|
|
|
def creds(self):
|
|
return self.body
|
|
|
|
|
|
if __name__ == '__main__':
|
|
worker = Test(create_admin=False)
|
|
|
|
# First the invites api gets some basic tests
|
|
worker.get('/invite/create')
|
|
|
|
# Channels things
|
|
VOICE_CHANNEL = 1
|
|
TEXT_CHANNEL = 2
|
|
|
|
worker.post('/channels/create', name='something random', kind=TEXT_CHANNEL)
|
|
worker.get('/channels/list')
|
|
worker.delete('/channels/delete', name='something random')
|
|
worker.get('/channels/list')
|
|
|
|
# Messaging
|
|
|
|
worker.post('/channels/create', name='send-channel', kind=TEXT_CHANNEL)
|
|
worker.post('/message/send', channel='send-channel', content="some random content")
|
|
worker.delete('/channels/delete', name='send-channel')
|