Base version of the rtc server

Authorization is required for implementation
So is a proper messaging model to make implementation clean and scalable
For now this serves as a good starting point for the rest of this project
This commit is contained in:
shockrah
2021-03-22 02:45:54 -07:00
parent 3ee22ff932
commit 185b64d9db
10 changed files with 869 additions and 784 deletions

View File

@@ -0,0 +1,6 @@
// The concept of channels here is a bit different from
struct VoiceCollection {
}
struct TextCollection {
}

105
rtc-server/src/main.rs Normal file
View File

@@ -0,0 +1,105 @@
use std::sync::{Arc, Mutex};
use std::net::SocketAddr;
use std::collections::HashMap;
use futures::StreamExt;
use futures::channel::mpsc::UnboundedSender;
use futures::channel::mpsc::unbounded;
use futures::future;
use futures::pin_mut;
use futures::stream::TryStreamExt;
use tokio_tungstenite as tokio_ws;
use tokio_ws::tungstenite::Message;
use tokio_ws::tungstenite::handshake::server::ErrorResponse;
use tokio_ws::tungstenite::http::{Response, Request};
use tokio::net::{TcpListener, TcpStream};
use clap::{Arg, App};
type Tx = UnboundedSender<Message>;
type Peers = Arc<Mutex<HashMap<SocketAddr, Tx>>>;
#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
let matches = App::new("Freechat RTC Websocket Server")
.version("1.0")
.author("shockrah")
.about("Enables RTC communication to authorized client applications")
.arg(Arg::with_name("port")
.short("p")
.long("port")
.value_name("PORT")
.takes_value(true))
.get_matches();
let addr = match matches.value_of("port") {
Some(pval) => {
let port = pval.parse::<u16>().unwrap();
format!("127.0.0.1:{}", port)
},
None => format!("127.0.0.1:5648")
};
let socket = TcpListener::bind(&addr).await?;
println!("[INFO] Listening on {}", addr);
let peers = Peers::new(Mutex::new(HashMap::new()));
while let Ok((stream, _)) = socket.accept().await {
tokio::spawn(handle_connections(stream, peers.clone()));
}
Ok(())
}
fn header_validation(request: &Request<()>, response: Response<()>) -> Result<Response<()>, ErrorResponse> {
// validate that the required headers are presetn
// Required headers: Subscribe-Channel & Jwt-Token
let valid_channels = ["/text", "/voice"];
let path = request.uri();
for (hdr, val) in request.headers().iter() {
println!("{:?} -> {:?}", hdr, val);
}
Ok(response)
}
async fn handle_connections(stream: TcpStream, peers: Peers) {
let addr = stream.peer_addr().expect("[ERROR] Peer address not found");
// NOTE: this call underneath actually blocks which blows but it will do for now
// NOTE: find some kind of way of doing async callbacks here so that we can scale
let ws_stream = tokio_ws::accept_hdr_async(stream, header_validation)
.await.expect("[ERROR] Could not finish handshake");
println!("[INFO] New websocket connection: {}", addr.ip());
let (tx, rx) = unbounded(); // split the peer's write and read streams
peers.lock().unwrap().insert(addr, tx);
let (write, read) = ws_stream.split();
let broadcast_incoming = read.try_for_each(|msg| {
println!("Got message from {}: {}", addr, msg.to_text().unwrap());
// hold a ref to the peers map so that we can iterate through them
// OPTI NOTE: because this runs next to the REST API it makes sense to avoid
// doing anything if the user-level peers try do anything but listen
let peers = peers.lock().unwrap();
let recipients = peers
.iter().filter(|(p_addr, _)| p_addr != &&addr ) // avoid echo back to sender
.map(|(_, ws_sink)| ws_sink);
for rec in recipients {
rec.unbounded_send(msg.clone()).expect("[WARN] Unable to send message");
}
future::ok(())
});
// magic
let forward = rx.map(Ok).forward(write);
pin_mut!(broadcast_incoming, forward);
future::select(broadcast_incoming, forward).await;
println!("{} dc'd", &addr);
peers.lock().unwrap().remove(&addr);
}