import json
import os
import subprocess
import sys
import time

import requests


class Response:
    '''
    Minimal pairing of a response body with its status code.
    '''

    def __init__(self, body, code):
        self.body = body
        self.code = code

    def __str__(self):
        return f'{self.code} => {self.body}'


class Test:
    '''
    Small HTTP test driver: every request carries the admin credentials
    (plus any per-request options) as a JSON document in the request body.
    '''

    def __init__(self, base='http://localhost:8888', create_admin=False, admin=None):
        '''
        @opt:base = base url string
        @opt:create_admin = creates admin account directly off cargo -c
        @opt:admin = optionally pass in dictionary with some admin credentials
            potentially from another instance to run multiple tests at once
        '''
        self.test_count = 0
        if create_admin:
            self.body = Test.__create_admin()
        elif admin is not None:
            # Previously assigned the undefined name `body`, which raised
            # NameError whenever credentials were passed in.
            self.body = admin
        else:
            # Fallback credentials for an account that exists on the
            # author's dev server.
            # NOTE(review): hard-coded secret in source; presumably dev-only,
            # confirm it is never valid against a real deployment.
            self.body = {
                'secret': 'JfW_Icct2P1WEo6PQlGb7l1IMd2QsRXAVarPoPZHZnj7NOMWBMdirnH9JqAKgrO5z3fb54QYsWlPPlRGowwFSA==',
                'id': 1,
                'name': 'owner sama uwu',
                'joindate': 69,
                'status': 0,
                'permissions': 18446744073709551615
            }

        self.base = base

    @staticmethod
    def __create_admin():
        '''
        Runs the server binary through cargo with `-c dev-test` and parses
        its stdout as JSON.

        The cargo executable is taken from the CARGO_BIN environment variable
        (normally /home/$user/.cargo/bin/cargo); when it is unset, `cargo` is
        looked up on PATH.

        @returns decoded JSON printed by the -c flag
        Exits the process with status 1 when stdout is not valid JSON.
        '''
        cargo_bin = os.getenv('CARGO_BIN', 'cargo')
        # An argv list (rather than splitting a formatted string) keeps a
        # cargo path that contains spaces intact.
        command = [cargo_bin, 'run', '--release', '--', '-c', 'dev-test']
        proc = subprocess.run(command, text=True, capture_output=True)
        try:
            return json.loads(proc.stdout)
        except json.JSONDecodeError:
            print('UNABLE TO LOAD JSON DATA FROM -c flag', file=sys.stderr)
            if proc.stderr:
                # Surface whatever cargo/the server reported to aid debugging.
                print(proc.stderr, file=sys.stderr)
            sys.exit(1)

    @staticmethod
    def log(url: str, method: str, request: requests.Response):
        '''
        Prints the method, url, status code and body text of a completed
        request. `request` is the requests.Response that came back.
        '''
        print(f'{method} {url}')
        print(f'\t[Status Code]: {request.status_code}')
        print(f'\t[Body]: {request.text}')

    def _build_req_body(self, **options):
        '''
        @returns JSON string of the credentials merged with `options`

        Builds a fresh dict on every call so per-request options never leak
        into self.body (and therefore into later requests or creds()).
        '''
        return json.dumps({**self.body, **options})

    def _send(self, method, url, **opts):
        '''
        Shared implementation of post/get/delete: sends the JSON body to
        base + url with the given HTTP method and logs the outcome.

        @returns text of response
        '''
        target = self.base + url
        r = requests.request(method, target, data=self._build_req_body(**opts))
        self.log(target, method, r)
        return r.text

    def post(self, url, **opts):
        '''
        @returns text of response
        '''
        return self._send('POST', url, **opts)

    def get(self, url, **opts):
        '''
        @returns text of response
        '''
        return self._send('GET', url, **opts)

    def delete(self, url, **opts):
        '''
        @returns text of response
        '''
        return self._send('DELETE', url, **opts)

    def creds(self):
        '''
        @returns the credentials dictionary sent with every request
        '''
        return self.body


if __name__ == '__main__':
    worker = Test(create_admin=True)

    # First the invites api gets some basic tests
    worker.get('/invite/create')

    # Channels things
    VOICE_CHANNEL = 1
    TEXT_CHANNEL = 2

    # Channel names are timestamps so repeated runs do not collide
    new_channel_response = worker.post('/channels/create', name=f'{time.time()}', kind=TEXT_CHANNEL)
    new_channel = json.loads(new_channel_response)

    channel_list = json.loads(worker.get('/channels/list'))
    worker.delete('/channels/delete', channel_id=new_channel['id'])
    worker.get('/channels/list')

    # Messaging
    msg_chan_id = time.time()
    msg_chan_raw = worker.post('/channels/create', name=f'{msg_chan_id}', kind=TEXT_CHANNEL)
    msg_chan = json.loads(msg_chan_raw)
    worker.post('/message/send', channel=msg_chan['id'], content="some random content")

    # finally clean up the channel we created
    worker.delete('/channels/delete', channel_id=msg_chan['id'])