
This is really only for checking whether a particular channel exists for the /messages/recent backend * Flattened db-lib::Message::last_n, which should make the code more readable as it now has a more linear flow * api::Messages::recent_messages now 404s when a channel that doesn't exist is requested
245 lines
8.8 KiB
Rust
245 lines
8.8 KiB
Rust
use std::time::{SystemTime, UNIX_EPOCH};
|
|
|
|
use tokio::fs;
|
|
use tokio::io::AsyncWriteExt;
|
|
|
|
use mysql_async::params;
|
|
use mysql_async::Pool;
|
|
use mysql_async::prelude::Queryable;
|
|
use mysql_async::Error as SqlError;
|
|
|
|
use crate::Response;
|
|
use crate::{UBigInt, BigInt};
|
|
use crate::{Channel, Message, UserMessage};
|
|
use crate::common;
|
|
|
|
use rand::RngCore;
|
|
|
|
/// Upper bound on the `LIMIT` used by `get_time_range` and `get_from_id`.
/// It is also the value those functions fall back to when the caller passes
/// no limit or one outside `1..=MAX_MESSAGES`.
const MAX_MESSAGES: u64 = 1000;
|
|
|
|
|
|
impl Message {
|
|
fn new(id: u64, time: i64, content: &str, content_type: &str, uid: u64, cid: u64) -> Self {
|
|
// Here for convenience pretty straight forward, copying at the last second
|
|
Message {
|
|
id,
|
|
time,
|
|
content: content.into(),
|
|
content_type: content_type.into(),
|
|
author_id: uid,
|
|
channel_id: cid
|
|
}
|
|
}
|
|
|
|
|
|
async fn insert_text(p: &Pool, content: &str, cid: UBigInt, uid: UBigInt) -> Result<Response<Self>, SqlError> {
|
|
// TODO: make this check not compare against something so hardcoded and
|
|
if content.len() > 4_000 {
|
|
return Ok(Response::RestrictedInput("Text larger than 4000 bytes".into()))
|
|
} else {
|
|
let mut conn = p.get_conn().await?;
|
|
let q = "INSERT INTO messages (id, time, content, content_type, author_id, channel_id)
|
|
VALUES(:id, :time, :content, 'text/plain', :uid, :chan_id)";
|
|
|
|
let id: u64 = rand::rngs::OsRng.next_u64();
|
|
let now: i64 = SystemTime::now()
|
|
.duration_since(UNIX_EPOCH)
|
|
.expect("System time `NOW` failed")
|
|
.as_millis() as i64;
|
|
let p = params!{
|
|
"id" => id,
|
|
"time" => now,
|
|
"content" => content,
|
|
"uid" => uid,
|
|
"chan_id" => cid
|
|
};
|
|
conn.exec_drop(q, p).await?;
|
|
let msg = Message::new(id, now, content, "text/plain", uid, cid);
|
|
return Ok(Response::Row(msg))
|
|
}
|
|
}
|
|
|
|
async fn save_file(filename: String, data: &[u8], msg: Message) -> Response<Message> {
|
|
match fs::File::create(filename).await {
|
|
Ok(mut file) => {
|
|
if let Ok(_) = file.write_all(data).await {
|
|
Response::Row(msg)
|
|
} else {
|
|
Response::Other(format!("Couldn't write data to disk"))
|
|
}
|
|
}
|
|
Err(e) => {
|
|
let msg = format!("db::Message::save_file {}", e);
|
|
Response::Other(msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
pub async fn send(p: &Pool, content: &str, content_type: &str, cid: UBigInt, uid: UBigInt) -> Result<Response<Self>, SqlError> {
|
|
//! @returns on_sucess -> Ok(Response::Row<Message>)
|
|
//! @returns on_user_fail -> Ok(Response::RestrictedInput)
|
|
//! @returns on_caught_server_fail -> Ok(Response::Other)
|
|
//! @returns on_failure Err(SqlErr)
|
|
if content_type == "text/plain" {
|
|
match Self::insert_text(p, content, cid, uid).await {
|
|
Ok(pass) => Ok(pass),
|
|
Err(e) => common::try_error_passthrough(e)
|
|
}
|
|
} else {
|
|
if content.len() > 10_000_000 {
|
|
return Ok(Response::RestrictedInput("Large data not allowed".into()))
|
|
}
|
|
let extension = match content_type {
|
|
"image/png" => "png",
|
|
"image/jpeg" | "image/jpg" => "jpg",
|
|
"application/webm" => "webm",
|
|
"application/mp4" => "mp4",
|
|
"application/mp3" => "mp3",
|
|
_ => panic!("Bad file type sent to db layer {}", content_type)
|
|
};
|
|
|
|
let q = "INSERT INTO messages (id, time, content, content_type, author_id, channel_id)
|
|
VALUES(:id, :time, :fname, :ctype, :author, :channel)";
|
|
|
|
let id: u64 = rand::rngs::OsRng.next_u64();
|
|
let filename = format!("{}.{}", id, extension);
|
|
let now: i64 = SystemTime::now()
|
|
.duration_since(UNIX_EPOCH)
|
|
.expect("System time `NOW` failed")
|
|
.as_millis() as i64;
|
|
|
|
let mut conn = p.get_conn().await?;
|
|
let insert_res = conn.exec_drop(q, params!{
|
|
"id" => id,
|
|
"time" => now,
|
|
"ctype" => content_type,
|
|
"fname" => &filename,
|
|
"author" => uid,
|
|
"channel" => cid
|
|
}).await;
|
|
match insert_res {
|
|
Ok(_) => {
|
|
let msg = Message::new(id, now, &filename, content_type, uid, cid);
|
|
Ok(Message::save_file(filename, content.as_bytes(), msg).await)
|
|
},
|
|
Err(e) => common::try_error_passthrough(e)
|
|
}
|
|
}
|
|
}
|
|
|
|
pub async fn get_time_range(p: &Pool, channel_id: UBigInt, start: BigInt, end: BigInt, limit: Option<u64>) -> Result<Response<UserMessage>, SqlError> {
|
|
//! @returns on success : Set(Vec<Messages>)
|
|
//! @returns on userfail: RestrictedInput(message)
|
|
//! @returns on error : Err(SqlError)
|
|
|
|
if start >= end {
|
|
Ok(Response::RestrictedInput("Invalid start/end parameters".into()))
|
|
}
|
|
else {
|
|
let mut conn = p.get_conn().await?;
|
|
let limit = if let Some(limit) = limit {
|
|
match limit {
|
|
1 ..= MAX_MESSAGES => limit,
|
|
_ => MAX_MESSAGES
|
|
}
|
|
} else {
|
|
MAX_MESSAGES
|
|
};
|
|
|
|
let q = " SELECT mem.name, msg.id, msg.time, msg.content, msg.content_type, msg.author_id
|
|
FROM messages as msg
|
|
JOIN members as mem ON mem.id = msg.author_id
|
|
WHERE channel_id = :channel AND time >= :start AND time < :end
|
|
LIMIT :limit";
|
|
|
|
let params = params!{
|
|
"start" => start,
|
|
"end" => end,
|
|
"channel" => channel_id,
|
|
"limit" => limit
|
|
};
|
|
let messages = conn.exec_map(q, params, |(name, id, time, content, content_type, author_id)| {
|
|
UserMessage {
|
|
id,
|
|
time,
|
|
content,
|
|
content_type,
|
|
author_id,
|
|
name,
|
|
channel_id
|
|
}
|
|
}).await?;
|
|
|
|
Ok(Response::Set(messages))
|
|
}
|
|
}
|
|
|
|
pub async fn get_from_id(p: &Pool, channel_id: UBigInt, start: UBigInt, limit: Option<UBigInt>) -> Result<Response<Self>, SqlError> {
|
|
//! @returns on success : Set(Vec<Messages>)
|
|
//! @returns on user failure : RestrictedInput(String)
|
|
//! @returns on failure : Err(SqlError)
|
|
let mut conn = p.get_conn().await?;
|
|
let limit = if let Some(limit) = limit{
|
|
match limit {
|
|
1 ..= MAX_MESSAGES => limit,
|
|
_ => MAX_MESSAGES
|
|
}
|
|
} else {
|
|
MAX_MESSAGES // messages at a time
|
|
};
|
|
|
|
let q = "SELECT id, time, content, content_type, author_id FROM messages WHERE
|
|
channel_id = :channel AND id >= :start LIMIT :limit";
|
|
|
|
let params = params!{
|
|
"channel" => channel_id,
|
|
"start" => start,
|
|
"limit" => limit
|
|
};
|
|
|
|
let messages = conn.exec_map(q, params,
|
|
|(id, time, content, content_type, author_id)| {
|
|
Self {
|
|
id,
|
|
time,
|
|
content,
|
|
content_type,
|
|
author_id,
|
|
channel_id
|
|
}
|
|
}).await?;
|
|
|
|
Ok(Response::Set(messages))
|
|
}
|
|
|
|
/// On success Response::Set<UserMessage>
|
|
/// On user failure Response::RestrictedInput<String>
|
|
/// On error Err<SqlError>
|
|
pub async fn last_n(p: &Pool, limit: i32, channel_id: u64) -> Result<Response<UserMessage>, SqlError> {
|
|
if limit <= 0 || limit > 100 {
|
|
return Ok(Response::RestrictedInput("Invalid \"limit\" value".into()))
|
|
}
|
|
|
|
// Explicitly return empty if the channel requested does not exist
|
|
if let None = Channel::get(p, channel_id).await? {
|
|
return Ok(Response::Empty)
|
|
}
|
|
|
|
// Reminder: if content_type is not text/plain then content will be empty
|
|
let q = " SELECT mem.name, msg.id, msg.time, msg.content, msg.content_type, msg.author_id
|
|
FROM messages as msg
|
|
JOIN members as mem ON mem.id = msg.author_id
|
|
WHERE channel_id = :channel
|
|
ORDER BY id DESC LIMIT :limit";
|
|
let params = params!{"limit" => limit, "channel" => channel_id};
|
|
let mut conn = p.get_conn().await?;
|
|
let messages = conn.exec_map(q, params, |(name, id, time, content, content_type, author_id)| {
|
|
UserMessage {
|
|
name, id, time, content, content_type, author_id, channel_id
|
|
}
|
|
}).await?;
|
|
return Ok(Response::Set(messages))
|
|
}
|
|
}
|
|
|