128 lines
3.8 KiB
Python
128 lines
3.8 KiB
Python
import time
|
|
import subprocess, os
|
|
import json, requests
|
|
|
|
class Response:
    '''
    Holds a body together with a code and renders the pair as text.
    '''

    def __init__(self, body, code):
        '''
        @param:body = value stored as-is on the instance
        @param:code = value stored as-is on the instance
        '''
        self.body, self.code = body, code

    def __str__(self):
        '''
        @returns "<code> => <body>"
        '''
        code = self.code
        body = self.body
        return f'{code} => {body}'
|
|
|
|
class Test:
    '''
    Small HTTP helper that sends requests to a server using a fixed set of
    credentials.

    Every request carries the credential dictionary (`self.body`), merged with
    the per-request keyword options, JSON-encoded as the request body.
    '''

    def __init__(self, base='http://localhost:8888', create_admin=False, admin=None):
        '''
        @opt:base = base url string
        @opt:create_admin = creates admin account directly off cargo -c <NAME>
        @opt:admin = optionally pass in dictionary with some admin credentials
                     potentially from another instance to run multiple tests at once

        `create_admin` takes priority over `admin`; when neither is given a
        hard-coded development account is used.
        '''
        self.test_count = 0
        if create_admin:
            self.body = Test.__create_admin()
        elif admin is not None:
            # Use the credentials supplied by the caller (the original code
            # referenced an undefined name here and raised NameError).
            self.body = admin
        else:
            # For now we use this because the dev server has this fake account on it.
            # NOTE(review): hard-coded credential; only meaningful against that
            # dev server and should not be reused anywhere else.
            self.body = {
                'secret': 'JfW_Icct2P1WEo6PQlGb7l1IMd2QsRXAVarPoPZHZnj7NOMWBMdirnH9JqAKgrO5z3fb54QYsWlPPlRGowwFSA==',
                'id': 1,
                'name': 'owner sama uwu',
                'joindate': 69,
                'status': 0,
                'permissions': 18446744073709551615
            }

        self.base = base

    @staticmethod
    def __create_admin():
        '''
        Runs `<CARGO_BIN> run --release -- -c dev-test` and parses its stdout
        as JSON.

        The cargo executable is read from the CARGO_BIN environment variable
        (normally /home/$user/.cargo/bin/cargo).

        @returns the decoded JSON value printed by the command
        Exits the process with status 1 when CARGO_BIN is unset or the output
        is not valid JSON.
        '''
        import sys

        CARGO_BIN = os.getenv('CARGO_BIN')
        if not CARGO_BIN:
            # Without this check the literal program name "None" would be executed.
            print('CARGO_BIN environment variable is not set', file=sys.stderr)
            sys.exit(1)

        # Build the argv list directly so a path containing spaces stays one argument.
        proc = subprocess.run(
            [CARGO_BIN, 'run', '--release', '--', '-c', 'dev-test'],
            text=True,
            capture_output=True,
        )
        try:
            return json.loads(proc.stdout)
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass.
            print('UNABLE TO LOAD JSON DATA FROM -c flag', file=sys.stderr)
            if proc.stderr:
                # Surface the command's own diagnostics to explain the failure.
                print(proc.stderr, file=sys.stderr)
            sys.exit(1)

    @staticmethod
    def log(url: str, method: str, request: 'requests.Response'):
        '''
        Prints the method, url, status code and body text of a finished request.

        @param:request = the response object returned by `requests`
                         (the parameter name is historical)
        '''
        print(f'{method} {url}')
        print(f'\t[Status Code]: {request.status_code}')
        print(f'\t[Body]: {request.text}')

    def _build_req_body(self, **options):
        '''
        @returns JSON string of the credentials merged with `options`
                 (options win on key collisions)
        '''
        # Merge into a fresh dict so per-request options never leak into the
        # stored credentials or into later requests.
        return json.dumps({**self.body, **options})

    def _send(self, method, url, **opts):
        '''
        Shared implementation of post/get/delete: sends the request, logs it
        and hands back the response text.

        @returns text of response
        '''
        full_url = self.base + url
        r = requests.request(method, full_url, data=self._build_req_body(**opts))
        self.log(full_url, method, r)
        return r.text

    def post(self, url, **opts):
        '''
        @returns text of response
        '''
        return self._send('POST', url, **opts)

    def get(self, url, **opts):
        '''
        @returns text of response
        '''
        return self._send('GET', url, **opts)

    def delete(self, url, **opts):
        '''
        @returns text of response
        '''
        return self._send('DELETE', url, **opts)

    def creds(self):
        '''
        @returns the credential dictionary used for every request
        '''
        return self.body
|
|
|
|
|
|
if __name__ == '__main__':
    # Smoke-test run against the server at the default base url, using an
    # admin account created through cargo.
    worker = Test(create_admin=True)

    # First the invites api gets some basic tests
    worker.get('/invite/create')

    # Channels things
    # Values passed as `kind` when creating a channel.
    VOICE_CHANNEL = 1
    TEXT_CHANNEL = 2

    new_channel_response = worker.post('/channels/create', name=f'{time.time()}', kind=TEXT_CHANNEL)
    new_channel = json.loads(new_channel_response)
    try:
        # Parsing is the check here: json.loads raises if the listing is not JSON.
        json.loads(worker.get('/channels/list'))
    finally:
        # Delete the channel even when the listing step fails, so failed runs
        # do not leave stale channels on the server.
        worker.delete('/channels/delete', channel_id=new_channel['id'])
    worker.get('/channels/list')

    # Messaging
    msg_chan_id = time.time()
    msg_chan_raw = worker.post('/channels/create', name=f'{msg_chan_id}', kind=TEXT_CHANNEL)
    msg_chan = json.loads(msg_chan_raw)
    print(f'Channel id to be used: {msg_chan["id"]}')

    try:
        worker.post('/message/send', channel=msg_chan['id'], content="some random content")
    finally:
        # finally clean up the channel we created
        worker.delete('/channels/delete', channel_id=msg_chan['id'])
|