Building stateless tests for now as a proof of concept that this more spammy testing approach can work

This commit is contained in:
shockrah 2021-01-20 21:28:33 -08:00
parent e39179da78
commit 68099af7ba

View File

@ -49,8 +49,12 @@ class Worker:
print('TESTBOT UNABLE TO LOAD JSON DATA FROM -c flag', file=sys.stderr) print('TESTBOT UNABLE TO LOAD JSON DATA FROM -c flag', file=sys.stderr)
exit(1) exit(1)
def update_jwt(self, jwt: str): def auth(self, auth_opt: str):
self.jwt = jwt if auth_opt == 'basic':
return self.basic_creds
else:
return {'id': self.id, 'jwt': auth_opt}
def log(self, params: dict, url: str, method: str, request: requests.Response, body=True): def log(self, params: dict, url: str, method: str, request: requests.Response, body=True):
print(f'TESTBOT {method} {url} {request.status_code}', file=self.out) print(f'TESTBOT {method} {url} {request.status_code}', file=self.out)
@ -58,9 +62,32 @@ class Worker:
if body: if body:
print(f'TESTBOT\t\t[Body]: {request.text}\n', file=self.out) print(f'TESTBOT\t\t[Body]: {request.text}\n', file=self.out)
def request(self, method: str, path: str, opts: dict, expectation: int): def _append_auth(self, opts: dict, auth: str):
'''
Default auth fallback type is jwt because that's they only other type of auth
FC cares about really
@param opts: Dictionary of parameters to pass to the endpoint
@auth: Denotes if we use basic auth or jwt if its not 'basic'
'''
if type(auth) == str:
opts['id'] = self.id
if auth == 'basic':
opts['secret'] = self.secret
else:
opts['jwt'] = auth
return opts
else:
# if its not a string we don't add anything in
return opts
def request(self, method: str, path: str, auth: str, opts: dict, expectation: int):
assert(path[0] == '/') assert(path[0] == '/')
# First make sure we add in the correct auth params that are requested
opts = self._append_auth(opts, auth)
# Build the request and store it in our structure # Build the request and store it in our structure
url = self.domain + path url = self.domain + path
req = Request(method, url, opts) req = Request(method, url, opts)
@ -73,69 +100,82 @@ class Worker:
return req.id return req.id
def spam_messages(channel, jwt, worker):
for _ in range(15):
worker.post('/message/send', jwt=jwt, channel=channel, content='dummy post')
def run(worker: Worker): def run(worker: Worker):
VOICE_CHAN = 1 VOICE_CHAN = 1
TEXT_CHAN = 2 TEXT_CHAN = 2
tests = [ # preliminary test
{'init': ['get', '/channels/list', {}], 'hope': 400}, # sanity check req_login = worker.request('post', '/login', 'basic',{}, 200)
{'init': ['post', '/login', {}], 'hope': 200}, jwt = worker.responses[req_login].json()
{'init': ['post', '/channels/list', **creds], 'hope': 200},
new_channel_name = time.time()
channel_tests = [
# init field provides args for Response object generation
# hop field gives us the ideal statuscode we want when we actually send the request
# NOTE: Grouping by status code
# sanity check
{'init': ['get', '/channels/list', {}], 'auth': None, 'hope': 400},
{'init': ['post', '/login', {}], 'auth': 'basic', 'hope': 200},
{'init': ['post', '/channels/list', {}], 'auth': jwt, 'hope': 200},
# somehow save this garbino
{'init': ['post', '/channels/create', {name: str(new_channel_name), kind: 2, description: 'asdf'}], 'auth': jwt, 'hope': 200},
# Just a regular test no saving for this one
{'init': ['post', '/channels/create', {name: str(new_channel_name+1), kind: 2,}], 'auth': jwt, 'hope': 200},
{'init': ['post', '/channels/create', {}], 'auth': jwt, 'hope': 400},
{'init': ['post', '/channels/create', {name: 123, kind: 'adsf'}], 'auth': jwt, 'hope': 400},
# save this and compare its results to the previous
{'init': ['get', '/channels/list', {}], 'auth': jwt, 'hope': 200},
{'init': ['get', '/channels/list', {'random-param': 123}], 'auth': jwt, 'hope': 200},
{'init': ['post'], 'auth': jwt, 'hope': 200}
] ]
# the first two are sanity checks and should not faill for test in channel_tests:
worker.get('/channels/list') # Should 400 or something since we're not sending the right keys method = test['init'][0]
jwt_response = worker.post('/login', jwt=False) path = test['init'][1]
jwt = json.loads(jwt_response)['jwt'] opts = test['init'][2]
worker.get('/channels/list', jwt=jwt) # maybe now we'll 200?
# now for some things that may/may not fail or something idk worker.request(method, path, test['auth'], opts, test['hope'])
cname = time.time()
# pass 200
newchannel = worker.post('/channels/create', jwt=jwt, name=f'{cname}', kind=2, description='some description')
newchannel = json.loads(newchannel)
# fail 400 or something
worker.post('/channels/create', jwt=jwt, name=f'{cname}', kind=VOICE_CHAN, description='some description')
# ez pass
worker.get('/channels/list', jwt=jwt)
# pass 200
worker.post('/message/send', jwt=jwt, channel=newchannel['id'], content='some bullshit message')
# pass 200
worker.delete('/channels/delete', jwt=jwt, channel_id=newchannel['id'])
# fail 400
worker.post('/channels/create', jwt=False, name=f'succ', kind=2)
# pass 200
worker.get('/meta', jwt=jwt)
# Getting messages now # # pass 200
# pass 200 # worker.post('/message/send', jwt=jwt, channel=newchannel['id'], content='some bullshit message')
send_chan = worker.post('/channels/create', jwt=jwt, name=f'{cname}', kind=2, description='some description') #
send_chan = json.loads(send_chan) # # pass 200
# pass 200(all) # worker.delete('/channels/delete', jwt=jwt, channel_id=newchannel['id'])
spam_messages(send_chan['id'], jwt, worker) #
now = time.time() # # fail 400
messages = json.loads(worker.get('/message/get_range', jwt=jwt, **{ # worker.post('/channels/create', jwt=False, name=f'succ', kind=2)
'start-time': int(now - 15), #
'end-time': int(now + 1), # # pass 200
'channel': send_chan['id'] # worker.get('/meta', jwt=jwt)
})) #
#
# 200 pass # # Getting messages now
from_id = worker.get('/message/from_id', jwt=jwt, **{ # # pass 200
'channel': send_chan['id'], # send_chan = worker.post('/channels/create', jwt=jwt, name=f'{cname}', kind=2, description='some description')
'start': messages['messages'][0]['id'] # send_chan = json.loads(send_chan)
}) # # pass 200(all)
print('done') # spam_messages(send_chan['id'], jwt, worker)
# now = time.time()
# messages = json.loads(worker.get('/message/get_range', jwt=jwt, **{
# 'start-time': int(now - 15),
# 'end-time': int(now + 1),
# 'channel': send_chan['id']
# }))
#
# # 200 pass
# from_id = worker.get('/message/from_id', jwt=jwt, **{
# 'channel': send_chan['id'],
# 'start': messages['messages'][0]['id']
# })
# print('done')