"""Test bot that exercises a server's HTTP API.

Endpoints touched by the scenario in ``run``: /login, /channels/list,
/channels/create, /channels/delete, /message/send, /message/get_range,
/message/from_id and /meta.
"""
import json
import os
import subprocess
import sys
import time

import requests

from web import http


class Worker:
    """Holds the credentials and request/response history for one test run."""

    def __init__(self, domain: str, create_admin: bool):
        """Set up credentials for talking to the server at *domain*.

        Args:
            domain: Base URL string; request paths are appended to it.
            create_admin: When true, create an admin account by running the
                server binary through cargo with ``-c`` and use the
                credentials it prints. When false, fall back to the
                hard-coded development account below.
        """
        self.domain = domain
        self.requests = {}
        self.responses = {}
        # Destination stream for log(); previously never assigned, which made
        # every log() call raise AttributeError.
        self.out = sys.stdout
        self.jwt = None  # never gets assigned until /login is hit

        if create_admin:
            self.basic_creds = self.__create_admin()
            # The two most important details for basically everything.
            # (The original read these from `self.body`, which does not exist.)
            self.id = self.basic_creds['id']
            self.secret = self.basic_creds['secret']
        else:
            # Throwaway account that exists on the author's dev server.
            # NOTE(review): hard-coded credential checked into source; only
            # acceptable if it is never valid outside a local dev instance.
            # A trailing comma used to turn this into a 1-tuple instead of a str.
            self.secret = 'mH7DfAfSLHhVbo9OW4dmqTbzzLnFaZQLxoZFIlKvKHm4NfpLyZsZrYdf12aACZXyEQxELOlBTe2rc36vTPVd8A=='
            self.id = 2
            # This is never to be overwritten.
            self.basic_creds = {
                'name': 'owner sama uwu',
                'joindate': 69,
                'status': 0,
                'permissions': 18446744073709551615,
            }

    def __create_admin(self):
        """Run the server with ``-c dev-test`` and return its parsed JSON output.

        The cargo executable is taken from the ``CARGO_BIN`` environment
        variable (normally ``/home/$user/.cargo/bin/cargo``; CI may use a
        different path). If the variable is unset, ``cargo`` is looked up on
        PATH instead of building a command that starts with the string 'None'.

        Exits the process with status 1 when stdout is not valid JSON.
        """
        cargo_bin = os.getenv('CARGO_BIN') or 'cargo'
        # Argument list rather than str.split() so a path containing spaces
        # is still passed as a single argv entry.
        proc = subprocess.run(
            [cargo_bin, 'run', '--release', '--', '-c', 'dev-test'],
            text=True,
            capture_output=True,
        )
        try:
            return json.loads(proc.stdout)
        except json.JSONDecodeError:
            print('TESTBOT UNABLE TO LOAD JSON DATA FROM -c flag', file=sys.stderr)
            sys.exit(1)

    def update_jwt(self, jwt: str):
        """Store *jwt* as the current token."""
        self.jwt = jwt

    def log(self, params: dict, url: str, method: str, request: requests.Response, body=True):
        """Write a summary of one request/response pair to ``self.out``.

        Args:
            params: Parameters that were sent with the request.
            url: Full URL that was requested.
            method: HTTP method name, printed as given.
            request: The response object (despite the name); its
                ``status_code`` and ``text`` are printed.
            body: When true, also print the response text.
        """
        print(f'TESTBOT {method} {url} {request.status_code}', file=self.out)
        print(f'TESTBOT\t\t[Parameters]: {params}', file=self.out)
        if body:
            print(f'TESTBOT\t\t[Body]: {request.text}\n', file=self.out)

    def request(self, method: str, path: str, opts: dict, expectation: int):
        """Build a request for ``domain + path``, run it and record both sides.

        The request object and its response are stored in ``self.requests``
        and ``self.responses`` under the same timestamp key.

        Args:
            method: HTTP method name passed through to ``Request``.
            path: Path beginning with '/'.
            opts: Options passed through to ``Request``.
            expectation: Passed to ``Request.make``; presumably the expected
                HTTP status code -- confirm against ``Request``.

        Returns:
            ``req.id`` of the created request object.
        """
        # startswith() also rejects an empty path instead of raising IndexError.
        assert path.startswith('/'), 'path must start with "/"'

        # Build the request and store it in our structure
        url = self.domain + path
        # NOTE(review): `Request` is not defined or imported in this file;
        # presumably it is meant to come from `web.http` -- confirm.
        req = Request(method, url, opts)
        r_id = time.time()
        self.requests[r_id] = req

        resp = req.make(expectation)
        self.responses[r_id] = resp

        # NOTE(review): history is keyed by r_id but req.id is returned;
        # confirm callers do not expect the dictionary key here.
        return req.id


def spam_messages(channel, jwt, worker):
    """Post 15 identical dummy messages to *channel* through *worker*."""
    for _ in range(15):
        worker.post('/message/send', jwt=jwt, channel=channel, content='dummy post')


def run(worker: Worker):
    """Drive the end-to-end scenario against the server behind *worker*.

    Expected status codes are noted next to each call; they are the author's
    expectations and are not asserted here.

    NOTE(review): this uses ``worker.get`` / ``worker.post`` /
    ``worker.delete``, which the ``Worker`` class in this file does not
    define; they must be supplied before this scenario can run.

    NOTE(review): a half-written table of ``{'init': [...], 'hope': status}``
    entries used to sit here. It was never read and its last entry
    (``[..., **creds]``) was a syntax error referencing an undefined name, so
    it was dropped; reintroduce it once a table-driven runner exists.
    """
    VOICE_CHAN = 1
    TEXT_CHAN = 2

    # The first two are sanity checks and should not fail.
    # Should 400 or something since we're not sending the right keys.
    worker.get('/channels/list')
    jwt_response = worker.post('/login', jwt=False)
    jwt = json.loads(jwt_response)['jwt']
    worker.get('/channels/list', jwt=jwt)  # maybe now we'll 200?

    # Now for some things that may or may not fail.
    cname = str(time.time())

    # pass 200
    newchannel = worker.post('/channels/create', jwt=jwt, name=cname,
                             kind=TEXT_CHAN, description='some description')
    newchannel = json.loads(newchannel)

    # fail 400 or something
    worker.post('/channels/create', jwt=jwt, name=cname,
                kind=VOICE_CHAN, description='some description')

    # easy pass
    worker.get('/channels/list', jwt=jwt)

    # pass 200
    worker.post('/message/send', jwt=jwt, channel=newchannel['id'],
                content='some bullshit message')

    # pass 200
    worker.delete('/channels/delete', jwt=jwt, channel_id=newchannel['id'])

    # fail 400
    worker.post('/channels/create', jwt=False, name='succ', kind=TEXT_CHAN)

    # pass 200
    worker.get('/meta', jwt=jwt)

    # Getting messages now
    # pass 200
    send_chan = worker.post('/channels/create', jwt=jwt, name=cname,
                            kind=TEXT_CHAN, description='some description')
    send_chan = json.loads(send_chan)

    # pass 200 (all)
    spam_messages(send_chan['id'], jwt, worker)

    now = time.time()
    # The parameter names contain '-', so they cannot be written as keyword
    # arguments and are passed through an unpacked dict instead.
    messages = json.loads(worker.get('/message/get_range', jwt=jwt, **{
        'start-time': int(now - 15),
        'end-time': int(now + 1),
        'channel': send_chan['id'],
    }))

    # 200 pass
    worker.get('/message/from_id', jwt=jwt, **{
        'channel': send_chan['id'],
        'start': messages['messages'][0]['id'],
    })

    print('done')


if __name__ == '__main__':
    worker = Worker('http://localhost:8888', create_admin=False)
    run(worker)