fix issue with room msg logic; add client keepalive watchdog

This commit is contained in:
David Westgate 2023-11-30 14:11:56 -08:00
parent ac1eecd9e5
commit a5df7865a1
3 changed files with 87 additions and 41 deletions

View File

@ -2,9 +2,9 @@ use prompted::input;
use rust_irc::{clear, codes}; use rust_irc::{clear, codes};
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::net::TcpStream; use std::net::TcpStream;
use std::sync::{Arc, Mutex};
use std::thread; use std::thread;
use std::time::{Duration, Instant};
use crate::client;
fn no_param_op(opcode: u8, stream: &mut TcpStream) { fn no_param_op(opcode: u8, stream: &mut TcpStream) {
stream.write(&[opcode]).unwrap(); stream.write(&[opcode]).unwrap();
@ -40,7 +40,7 @@ fn two_param_op(opcode: u8, stream: &mut TcpStream, param0: &str, param1: &str)
stream.write(&out_buf).unwrap(); stream.write(&out_buf).unwrap();
} }
fn read_messages(mut stream: TcpStream, nick: &str) { fn read_messages(mut stream: TcpStream, nick: &str, timestamp: &mut Arc<Mutex<Instant>>) {
let mut buffer: [u8; 1024] = [0; 1024]; let mut buffer: [u8; 1024] = [0; 1024];
loop { loop {
match stream.read(&mut buffer) { match stream.read(&mut buffer) {
@ -49,6 +49,8 @@ fn read_messages(mut stream: TcpStream, nick: &str) {
std::process::exit(0); std::process::exit(0);
} }
Ok(size) => { Ok(size) => {
let mut lock = timestamp.lock().unwrap();
*lock = Instant::now();
let msg_bytes: &[u8] = &buffer[..size]; let msg_bytes: &[u8] = &buffer[..size];
process_message(msg_bytes, nick); process_message(msg_bytes, nick);
} }
@ -60,7 +62,6 @@ fn read_messages(mut stream: TcpStream, nick: &str) {
} }
fn process_message(msg_bytes: &[u8], nick: &str) { fn process_message(msg_bytes: &[u8], nick: &str) {
println!();
match msg_bytes[0] { match msg_bytes[0] {
codes::ERROR => match msg_bytes[1] { codes::ERROR => match msg_bytes[1] {
codes::error::INVALID_ROOM => { codes::error::INVALID_ROOM => {
@ -75,6 +76,9 @@ fn process_message(msg_bytes: &[u8], nick: &str) {
codes::error::NOT_IN_ROOM => { codes::error::NOT_IN_ROOM => {
eprintln!("Cannot send a message before joining room. Use /join [room].") eprintln!("Cannot send a message before joining room. Use /join [room].")
} }
codes::error::EMPTY_ROOM => {
eprintln!("Room is Empty");
}
_ => { _ => {
eprintln!("Error code: {:x?}", msg_bytes[1]); eprintln!("Error code: {:x?}", msg_bytes[1]);
} }
@ -158,23 +162,42 @@ pub fn start() {
} }
// let host: String = input!("Enter the server host: "); // let host: String = input!("Enter the server host: ");
let host: &str = "localhost"; let host: &str = "fab04.cecs.pdx.edu";
if let Ok(mut stream) = TcpStream::connect(host.to_owned() + ":6667") { if let Ok(mut stream) = TcpStream::connect(host.to_owned() + ":6667") {
println!("Connected to {}", host); println!("Connected to {}", host);
//another stream for reading messages //another stream for reading messages
let stream_clone: TcpStream = stream.try_clone().expect("Failed to clone stream"); let reader_clone: TcpStream = stream.try_clone().expect("Failed to clone stream");
let mut keepalive_clone = stream.try_clone().expect("failed to clone stream");
let nick_clone: String = nick.clone(); let nick_clone: String = nick.clone();
//timestamp for detecting unresponsive server
let timestamp = Arc::new(Mutex::new(Instant::now()));
let mut timestamp_clone: Arc<Mutex<Instant>> = Arc::clone(&timestamp);
thread::spawn(move || { thread::spawn(move || {
read_messages(stream_clone, &nick_clone); read_messages(reader_clone, &nick_clone, &mut timestamp_clone);
});
//watchdog to send keep_alive and stop client if server fails to respond
thread::spawn(move || loop {
thread::sleep(Duration::from_secs(5));
let lock = timestamp.lock().unwrap();
let now = Instant::now();
if now.duration_since(*lock) > Duration::from_secs(30) {
eprintln!("Server is unresponsive. Stopping client");
std::process::exit(1);
} else if now.duration_since(*lock) > Duration::from_secs(5) {
keepalive_clone.write_all(&[codes::KEEP_ALIVE]).unwrap();
}
}); });
//try to register the nickname //try to register the nickname
one_param_op(codes::client::REGISTER_NICK, &mut stream, &nick); one_param_op(codes::client::REGISTER_NICK, &mut stream, &nick);
loop { loop {
let mut inp = String::new(); let inp;
if active_room.is_empty() { if active_room.is_empty() {
inp = input!(""); inp = input!("");
} else { } else {

View File

@ -26,6 +26,7 @@ pub mod codes {
pub const MALFORMED: u8 = 0x15; pub const MALFORMED: u8 = 0x15;
pub const ALREADY_IN_ROOM: u8 = 0x16; pub const ALREADY_IN_ROOM: u8 = 0x16;
pub const NOT_IN_ROOM: u8 = 0x17; pub const NOT_IN_ROOM: u8 = 0x17;
pub const EMPTY_ROOM: u8 = 0x8;
} }
} }

View File

@ -60,25 +60,57 @@ fn message(room: &str, msg: &str, sender: &str, server: &Arc<Mutex<Server>>) {
let mut guard: std::sync::MutexGuard<'_, Server> = server.lock().unwrap(); let mut guard: std::sync::MutexGuard<'_, Server> = server.lock().unwrap();
let server: &mut Server = guard.deref_mut(); let server: &mut Server = guard.deref_mut();
//1: Make sure specified rooms exists, ifn error
//2: Make sure sender is a member of the room, ifn error
//3: Message all non-sender users in the room the message, ifnone error empty room
//4: Message the sender RESPONSE_OK
let room_users: Option<&Vec<String>> = server.rooms.get(room); let room_users: Option<&Vec<String>> = server.rooms.get(room);
let mut sender_stream = server
.users
.get_mut(sender)
.unwrap()
.try_clone()
.expect("Clone issue");
//1
match room_users { match room_users {
Some(users) => { Some(users) => {
//2
let mut is_member = false;
for user in users { for user in users {
let stream: Option<&mut TcpStream> = server.users.get_mut(user); if user.eq(sender) {
match stream { is_member = true;
Some(str) => { }
str.write_all(&out_buf).unwrap(); }
}
None => { if is_member {
//TODO send error msg to sender for user in users {
eprintln!("Error: Invalid message from client"); if user.eq(sender) {
//4
sender_stream.write_all(&[codes::RESPONSE_OK]).unwrap();
} else {
//3
let recipient_stream: Option<&mut TcpStream> = server.users.get_mut(user);
match recipient_stream {
Some(str) => {
println!("Sending msg {:?}", out_buf.to_ascii_lowercase());
str.write_all(&out_buf).unwrap();
}
None => {
eprintln!("Server error: could not find user");
}
}
} }
} }
} else {
sender_stream
.write_all(&[codes::ERROR, codes::error::NOT_IN_ROOM])
.unwrap();
} }
} }
None => { None => {
//TODO send error msg to sender sender_stream
eprintln!("Error: Invalid message from client"); .write_all(&[codes::ERROR, codes::error::EMPTY_ROOM])
.unwrap();
} }
} }
} }
@ -184,36 +216,21 @@ fn handle_client(
codes::client::MESSAGE => { codes::client::MESSAGE => {
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
println!("MESSAGE"); println!("MESSAGE");
stream.write_all(&[codes::RESPONSE_OK]).unwrap();
}
codes::KEEP_ALIVE => {
println!("kEEP alive");
stream.write_all(&[codes::RESPONSE_OK]).unwrap();
} }
codes::client::MESSAGE_ROOM => { codes::client::MESSAGE_ROOM => {
let p: String = String::from_utf8_lossy(param_bytes).to_string(); let p: String = String::from_utf8_lossy(param_bytes).to_string();
let params: Option<(&str, &str)> = p.split_once(" "); let params: Option<(&str, &str)> = p.split_once(" ");
match params { match params {
Some((room, msg)) => { Some((room, msg)) => {
let unlocked_server: std::sync::MutexGuard<'_, Server> = server.lock().unwrap(); message(room, msg, nickname, server);
let users_in_room: Option<&Vec<String>> = unlocked_server.rooms.get(room);
match users_in_room {
Some(users) => {
let is_user_in_room: Option<&String> =
users.iter().find(|&u| u.eq(nickname));
match is_user_in_room {
Some(_) => {
message(room, msg, nickname, server);
}
None => {
stream
.write_all(&[codes::ERROR, codes::error::NOT_IN_ROOM])
.unwrap();
}
}
}
None => {
stream
.write_all(&[codes::ERROR, codes::error::INVALID_ROOM])
.unwrap();
}
}
} }
_ => { _ => {
stream stream
@ -402,6 +419,7 @@ pub fn start() {
println!("1: list connected users"); println!("1: list connected users");
println!("2: list rooms"); println!("2: list rooms");
println!("3: Broadcast message to all"); println!("3: Broadcast message to all");
println!("4: Freeze server via double lock (for testing)");
let inp: String = input!(":"); let inp: String = input!(":");
match inp.parse::<u8>() { match inp.parse::<u8>() {
Ok(num) => match num { Ok(num) => match num {
@ -416,6 +434,10 @@ pub fn start() {
let inp2 = input!("Enter message: "); let inp2 = input!("Enter message: ");
broadcast(codes::client::MESSAGE, &server, &inp2); broadcast(codes::client::MESSAGE, &server, &inp2);
} }
4 => {
let s1 = server.lock().unwrap();
let s2 = server.lock().unwrap();
}
_ => println!("Invalid Input"), _ => println!("Invalid Input"),
}, },
Err(_) => { Err(_) => {