diff --git a/rust/ble_module/src/ble/ble_service.rs b/rust/ble_module/src/ble/ble_service.rs index 0733419a1..4b2366d4b 100644 --- a/rust/ble_module/src/ble/ble_service.rs +++ b/rust/ble_module/src/ble/ble_service.rs @@ -1,12 +1,10 @@ use super::super::BleRpc; use super::utils::find_device_by_mac; -use crate::ble::ble_uuids::{main_service_uuid, msg_char, msg_service_uuid, read_char}; -// use crate::rpc::SysRpcReceiver; use crate::{ + ble::ble_uuids::{main_service_uuid, msg_char, read_char}, ble::utils, rpc::{process_received_message, proto_sys::ble::Message::*, proto_sys::*, utils::*}, }; -// use async_std::task; use async_std::{channel::Sender, prelude::*, task::JoinHandle}; use bluer::{ adv::{Advertisement, AdvertisementHandle}, @@ -17,9 +15,6 @@ use bytes::Bytes; use futures::FutureExt; use futures_concurrency::stream::Merge; use lazy_static::lazy_static; -// use serde_json::{self, de}; -// use std::cmp::max_by_key; -// use std::string; use std::{ cell::RefCell, collections::{HashMap, HashSet, VecDeque}, error::Error, sync::Mutex, @@ -31,7 +26,6 @@ lazy_static! { Mutex::new(HashMap::new()); } - pub enum QaulBleService { Idle(IdleBleService), Started(StartedBleService), @@ -56,13 +50,7 @@ pub struct IdleBleService { } enum BleMainLoopEvent { - // Stop, - // MessageReceived((String, Address)), - // MainCharEvent(CharacteristicControlEvent), - // MsgCharEvent(CharacteristicControlEvent), DeviceDiscovered(Device), - // SendMessage((Vec, Vec, Vec, Vec)), - // RpcEvent(Option), RpcEvent(Vec), } @@ -105,9 +93,6 @@ impl IdleBleService { advertisement_type: bluer::adv::Type::Peripheral, service_uuids: vec![ main_service_uuid(), - // msg_service_uuid(), - // msg_char(), - // read_char(), ] .into_iter() .collect(), @@ -124,26 +109,28 @@ impl IdleBleService { log::error!("{:#?}", err); return QaulBleService::Idle(self); } - }; + }; + + // ================================================================================== + // ------------------------- SET UP APPLICATION ------------------------------------- + // ================================================================================== + + let (_, main_service_handle) = service_control(); + let (_, main_chara_handle) = characteristic_control(); + let (msg_chara_ctrl, msg_chara_handle) = characteristic_control(); + let (cmd_tx, cmd_rx) = async_std::channel::bounded::(8); let (adp_send, adp_recv) = async_std::channel::unbounded::(); + match adp_send.try_send(self.adapter.clone()) { Ok(_) => { - log::debug!("Sent adapter to channel"); + log::debug!("Adapter sent to channel"); } Err(err) => { log::error!("{:#?}", err); } - } - - // ================================================================================== - // ------------------------- SET UP APPLICATION ------------------------------------- - // ================================================================================== + } - let (_, main_service_handle) = service_control(); - let (_, main_chara_handle) = characteristic_control(); - // let (_, msg_service_handle) = service_control(); - let (msg_chara_ctrl, msg_chara_handle) = characteristic_control(); let msg_characterstic = Characteristic { uuid: msg_char(), write: Some(CharacteristicWrite { @@ -155,6 +142,7 @@ impl IdleBleService { control_handle: msg_chara_handle, ..Default::default() }; + let adp = self.adapter.clone(); let cmd_tx2 = cmd_tx.clone(); let main_characterstic = Characteristic { @@ -166,6 +154,10 @@ impl IdleBleService { "Read request received from device: {:?}", &(req.device_address) ); + + // Below snippet checks for device presence in ignore list(discovered devices lsit) + // If device is present, it updates the last found time. + // Else, It triggers an event to try to discover and connect to device. match utils::find_ignore_device_by_mac(req.device_address) { Some(_) => { // utils::update_last_found(req.device_address); @@ -178,7 +170,6 @@ impl IdleBleService { Ok(device) => { match cmd_tx2.send(BleMainLoopEvent::DeviceDiscovered(device.clone())).await { Ok(_) => { - // Add a null ble_device to stop event from being sent again // Will be update in func: on_device_discovered. let ble_device = utils::BleScanDevice { @@ -186,7 +177,7 @@ impl IdleBleService { rssi: 0, mac_address: device.address(), name: device.name().await.unwrap().unwrap_or_default(), - device: device, + device, last_found_time: utils::current_time_millis(), is_connected: false, }; @@ -197,7 +188,6 @@ impl IdleBleService { log::error!("Error sending device discovered event: {:#?}", err); } }; - // self.on_device_discovered( &device, &mut internal_sender); }, Err(e) => { log::error!("Error: {:#?}", e); @@ -214,6 +204,7 @@ impl IdleBleService { control_handle: main_chara_handle, ..Default::default() }; + let main_service = Service { uuid: main_service_uuid(), primary: true, @@ -222,24 +213,6 @@ impl IdleBleService { ..Default::default() }; - // let msg_service = Service { - // uuid: msg_service_uuid(), - // primary: true, - // characteristics: vec![Characteristic { - // uuid: msg_char(), - // write: Some(CharacteristicWrite { - // write: true, - // write_without_response: true, - // method: CharacteristicWriteMethod::Io, - // ..Default::default() - // }), - // control_handle: msg_chara_handle, - // ..Default::default() - // }], - // control_handle: msg_service_handle, - // ..Default::default() - // }; - let app = Application { services: vec![main_service], ..Default::default() @@ -253,21 +226,18 @@ impl IdleBleService { } }; - // let cmd_sender = cmd_tx.clone(); + // ================================================================================== + // ------------------------- MAIN BLE LOOP ------------------------------------------ + // ================================================================================== + let join_handle = async_std::task::Builder::new() .name("main-ble-loop".into()) .local(async move { - log::info!("Starting BLE main loop..."); - - let adapter: Adapter; - match adp_recv.recv().await { - Ok(adp) => { - adapter = adp; - } - Err(_) => { - adapter = self.adapter.clone(); - } - }; + + let adapter: Adapter = adp_recv.recv().await.unwrap_or_else(|err| { + log::error!("{:#?}", err); + self.adapter.clone() + }); // Set up discovery filter and start streaming the discovered devices adn out of range checker. let _ = adapter.set_discovery_filter(get_filter()).await; @@ -286,28 +256,6 @@ impl IdleBleService { Err(_) => None, } }, - - // match utils::find_device_by_mac(addr) { - // Some(_) => { - // log::warn!("Device mac_addres {:?}", addr); - // utils::update_last_found(addr); - // // None - // let device = self.adapter.device(addr)?; - // Some(BleMainLoopEvent::DeviceDiscovered(device)) - // } - // None => { - // log::warn!("Device mac_addres {:?}", addr); - // if self.device_block_list.contains(&addr) { - // return None; - // } - // match adapter.device(addr) { - // Ok(device) => { - // Some(BleMainLoopEvent::DeviceDiscovered(device)) - // } - // Err(_) => None, - // } - // } - // }, AdapterEvent::DeviceRemoved(addr) => { utils::find_device_by_mac(addr).map(|device| { device_result_sender.send_device_unavailable( @@ -327,131 +275,26 @@ impl IdleBleService { }; + let _ = self + .spawn_msg_listener(internal_sender.clone(), msg_chara_ctrl) + .await; + // TODO: Setup out of range checker. + // utils::out_of_range_checker(adapter.clone(), internal_sender.clone()); - // ================================================================================== - // --------------------------------- MAIN BLE LOOP ---------------------------------- - // ================================================================================== - // let (ble_msg_sender, ble_msg_reciever) = async_std::channel::unbounded::<(Address, Vec)>(); - // let (message_tx, message_rx) = async_std::channel::bounded::(32); - // let main_evt_stream = main_chara_ctrl.map(BleMainLoopEvent::MainCharEvent); - // let msg_evt_stream = msg_chara_ctrl.map(BleMainLoopEvent::MsgCharEvent); let rpc_reciever_stream = rpc_receiver.receiver.map(BleMainLoopEvent::RpcEvent); - // .map(BleMainLoopEvent::RpcEvent); - let mut merged_ble_streams = ( cmd_rx, - // main_evt_stream, - // msg_evt_stream, device_stream, - // message_rx, rpc_reciever_stream, ) .merge(); - - let _ = self - .spawn_msg_listener(internal_sender.clone(), msg_chara_ctrl) - .await; - // utils::out_of_range_checker(adapter.clone(), internal_sender.clone()); - 'outer: loop { - log::warn!("Waiting for event... "); match merged_ble_streams.next().await { Some(evt) => { - log::info!("Received event:"); + log::debug!("Received event:"); match evt { - // BleMainLoopEvent::Stop => { - // log::info!( - // "Received stop signal, stopping advertising, scanning, and listening." - // ); - // break; - // } - // BleMainLoopEvent::SendMessage(( - // message_id, - // mut receiver_id, - // sender_id, - // data, - // // )) => match utils::bytes_to_str(&message_id) { - // )) => { - // match std::str::from_utf8(&message_id) { - // Ok(msg_id) => { - - - - // let msg_id = String::from_utf8_lossy(&message_id); - // log::info!("Sending message with ID: {:?}", &msg_id); - // match self - // .send_direct_message( - // msg_id.to_string(), - // &mut receiver_id, - // sender_id, - // data, - // ) - // .await - // { - // Ok(_) => { - // internal_sender.send_direct_send_success(receiver_id); - // } - // Err(err) => { - // internal_sender.send_direct_send_error(receiver_id, err.to_string()); - // log::error!("Error sending direct BLE message: {:#?}", err) - // } - // } - - - // } - // Err(err) => { - // log::error!("Message Id {:#?}", message_id); - // log::error!("Error converting message ID to string: {:#?}", err); - // } - // } - // }, - // BleMainLoopEvent::MessageReceived(_e) => { - // let json_message = String::from_utf8_lossy(&e.0.as_bytes()); - // log::error!("Received messages: {:?} ", json_message); - // let msg_object: Message = serde_json::from_str(&json_message).unwrap(); - // let message = msg_object.message.unwrap(); - // log::info!( - // "Received {} bytes of data from {}", - // message.len(), - // utils::mac_to_string(&e.1) - // ); - // internal_sender.send_direct_received(e.1.0.to_vec(), message) - // } - // BleMainLoopEvent::MainCharEvent(_e) => { - // TODO: should main character events be sent to the UI? - // } - // BleMainLoopEvent::MsgCharEvent(_e) => { - // CharacteristicControlEvent::Write(write) => { - // log::info!("Accepting write event with MTU {} from {}", write.mtu(), write.device_address()); - // let mac_address = write.device_address(); - // let mut read_buf = vec![0; 20]; - // if let Ok(mut reader) = write.accept() { - // let k = reader.read(&mut read_buf).await; - // match k { - // Ok(0) => { - // log::info!("Write stream from device {:?} has ended", &mac_address); - // continue; - // } - // Ok(n) => { - // log::error!("req {:?} = {:?}", &read_buf, &n); - // // let message_tx = message_tx.clone(); - // let _ = ble_msg_sender.send((mac_address, read_buf)).await; - // }, - // Err(err) => { - // log::error!("Write stream error: {}", &err); - // } - // } - // // let _ = self - // // .spawn_msg_listener(Some(reader), message_tx, mac_address) - // // .await; - // } else { - // log::error!("Error accepting write request"); - // } - // } - // CharacteristicControlEvent::Notify(_) => (), - // }, BleMainLoopEvent::DeviceDiscovered(device) => { match self .on_device_discovered(&device, &mut internal_sender) @@ -479,7 +322,7 @@ impl IdleBleService { break; }, Some(msg) => { - log::info!("Received rpc event: "); + log::debug!("Received rpc event: "); if msg.message.is_none() { continue; } @@ -495,68 +338,37 @@ impl IdleBleService { break 'outer; }, DirectSend(mut req) => { - // QaulBleService::Started(ref mut svc) => { - log::info!("Received Direct Send Request: "); - // let receiver_id = req.receiver_id.clone(); - // match svc.direct_send(req).await { - // Ok(_) => local_sender_handle.send_direct_send_success(receiver_id), - // Err(err) => local_sender_handle - // .send_direct_send_error(receiver_i d, err.to_string()), - // } - - let msg_id = String::from_utf8_lossy(&req.message_id); - log::info!("Sending message with ID: {:?}", &msg_id); - match self - .send_direct_message( - msg_id.to_string(), - &mut req.receiver_id, - req.sender_id, - req.data, - ) - .await - { - Ok(_) => { - internal_sender.send_direct_send_success(req.receiver_id); - } - Err(err) => { - log::error!("Error sending direct BLE message: {:#?}", err); - internal_sender.send_direct_send_error(req.receiver_id, err.to_string()); - } + log::info!("Received Direct Send Request: "); + let msg_id = String::from_utf8_lossy(&req.message_id); + log::info!("Sending message with ID: {:?}", &msg_id); + match self + .send_direct_message( + msg_id.to_string(), + &mut req.receiver_id, + req.sender_id, + req.data, + ) + .await + { + Ok(_) => { + internal_sender.send_direct_send_success(req.receiver_id); } - // match cmd_sender - // .send(BleMainLoopEvent::SendMessage(( - // req.message_id, - // req.receiver_id.clone(), - // req.sender_id, - // req.data, - // ))) - // .await { - // Ok(()) => { - // log::info!("Message sent to main loop"); - // } - // Err(err) => { - // log::error!("Failed to send message to main loop: {:#?}", &err); - // internal_sender.send_direct_send_error(req.receiver_id, err.to_string()); - // } - // } - // } - // QaulBleService::Idle(_) => { - // log::info!("Received Direct Send Request, but bluetooth service is not running!"); - // local_sender_handle.send_result_not_running() - // } + Err(err) => { + log::error!("Error sending direct BLE message: {:#?}", err); + internal_sender.send_direct_send_error(req.receiver_id, err.to_string()); + } + } }, InfoRequest(_) => { let mut sender_handle_clone = internal_sender.clone(); - // spawn(async move { - match get_device_info().await { - Ok(info) => { - sender_handle_clone.send_ble_sys_msg(InfoResponse(info)) - } - Err(err) => { - log::error!("Error getting device info: {:#?}", &err) - } + match get_device_info().await { + Ok(info) => { + sender_handle_clone.send_ble_sys_msg(InfoResponse(info)) + } + Err(err) => { + log::error!("Error getting device info: {:#?}", &err) } - // }); + } } _ => { log::info!("Received unknown rpc event"); @@ -579,14 +391,13 @@ impl IdleBleService { }) .expect("Unable to spawn BLE main loop!"); - log::info!("BLE main loop start completed successfully!"); QaulBleService::Started(StartedBleService { join_handle: Some(join_handle), cmd_handle: cmd_tx, }) } - /// Handles device discovery and validates the qaul uuids. + /// Handles device discovery and validates the Qaul Services and Characterstic. async fn on_device_discovered( &self, device: &Device, @@ -605,6 +416,7 @@ impl IdleBleService { &stringified_addr, &device_name, ); + let mut retries = 1; if !device.is_connected().await? { log::warn!("Device not connected. Trying to connect..."); @@ -623,6 +435,7 @@ impl IdleBleService { } } } + let services_discovered = device.services().await?; for service in services_discovered { if read_char_uuid_found { @@ -642,12 +455,14 @@ impl IdleBleService { let remote_char_uuid = char.uuid().await?; log::info!( - "Characteristic flags for device {} are: {:?} and char {}", + "Characteristic flags for device {} are: {:?} and service {} and char {}", &stringified_addr, &flags.read, + &service_uuid, &remote_char_uuid, ); if flags.notify || flags.indicate { + // Was present in Kotlin code but never used. msg_receivers.push(char.notify_io().await?); log::info!( "Setting up notification for characteristic {} of device {}", @@ -668,7 +483,6 @@ impl IdleBleService { utils::add_ignore_device(ble_device.clone()); utils::add_device(ble_device); read_char_uuid_found = true; - log::info!( "Read characteristic found for device {} with qaul ID {:?}", &stringified_addr, @@ -712,108 +526,91 @@ impl IdleBleService { log::info!("Write stream from device {:?} has ended", &mac_address); continue; } - let mut hex_msg = utils::bytes_to_hex(&buffer); - if hex_msg.contains("24") { - // Remove trailing zeros - let trimmed = hex_msg.trim_end_matches('0'); - hex_msg = trimmed.to_string(); - } - log::info!("Received message: {:?} from {:?}", &hex_msg, &mac_address); - let stringified_addr = utils::mac_to_string(&mac_address); - - match utils::find_msg_map_by_mac(stringified_addr.clone()) { - Some(mut old_value) => { - if hex_msg.ends_with("2424") - || (old_value.ends_with("24") && hex_msg == "24") - { - old_value = old_value + &hex_msg; - let trim_old_value = &old_value[4..old_value.len() - 4]; - log::error!("Received message: {:?} ", trim_old_value); - if !trim_old_value.contains("2424") { - hex_msg = trim_old_value.to_string(); - - utils::message_received((hex_msg, mac_address), internal_sender.clone()); - utils::remove_msg_map_by_mac(stringified_addr); - } - } else { - old_value += &hex_msg; - utils::add_msg_map(stringified_addr, old_value); + let mut hex_msg = utils::bytes_to_hex(&buffer); + if hex_msg.contains("24") { + // Remove trailing zeros + let trimmed = hex_msg.trim_end_matches('0'); + hex_msg = trimmed.to_string(); + } + log::info!("Received message: {:?} from {:?}", &hex_msg, &mac_address); + let stringified_addr = utils::mac_to_string(&mac_address); + + match utils::find_msg_map_by_mac(stringified_addr.clone()) { + Some(mut old_value) => { + if hex_msg.ends_with("2424") + || (old_value.ends_with("24") && hex_msg == "24") + { + old_value = old_value + &hex_msg; + let trim_old_value = &old_value[4..old_value.len() - 4]; + log::info!("Received message: {:?} ", trim_old_value); + if !trim_old_value.contains("2424") { + hex_msg = trim_old_value.to_string(); + + utils::message_received((hex_msg, mac_address), internal_sender.clone()); + utils::remove_msg_map_by_mac(stringified_addr); } + } else { + old_value += &hex_msg; + utils::add_msg_map(stringified_addr, old_value); } - None => { - if hex_msg.starts_with("2424") && hex_msg.ends_with("2424") { - let trim_hex_msg = &hex_msg[4..&hex_msg.len() - 4]; - if !trim_hex_msg.contains("2424") { - hex_msg = trim_hex_msg.to_string(); - - // log::error!("Received message: {:?} ", hex_msg); - utils::message_received((hex_msg, mac_address), internal_sender.clone()); - // let _ = message_tx - // .send(BleMainLoopEvent::MessageReceived((hex_msg, mac_address))) - // .await - // .map_err(|err| log::error!("{:#?}", err)); - } - } else if hex_msg.starts_with("2424") { - utils::add_msg_map(stringified_addr, hex_msg.clone()); - } else { - // Error handling + } + None => { + if hex_msg.starts_with("2424") && hex_msg.ends_with("2424") { + let trim_hex_msg = &hex_msg[4..&hex_msg.len() - 4]; + if !trim_hex_msg.contains("2424") { + hex_msg = trim_hex_msg.to_string(); + utils::message_received((hex_msg, mac_address), internal_sender.clone()); } + } else if hex_msg.starts_with("2424") { + utils::add_msg_map(stringified_addr, hex_msg.clone()); + } else { + // Error handling } } + } } } }); async_std::task::spawn(async move { - loop { - match msg_chara_ctrl.next().await { - Some(CharacteristicControlEvent::Write(write)) => { - let mut device_known: bool = true; - // log::info!("Accepting write event with MTU {} from {}", write.mtu(), write.device_address()); - let mac_address = write.device_address(); - match find_device_by_mac(mac_address) { - Some(_) => { - // utils::update_last_found(mac_address); - } - None => { - device_known = false; - log::warn!("Device not found in known devices"); - // match self.adapter.device(mac_address) { - // Ok(device) => { - // self. - // self.on_device_discovered(&device, &mut internal_sender.clone()).await.unwrap(); - // }, - // Err(_) => { - // // log::error!("Error:"); - // } - // }; - - } - } - let mut read_buf = vec![0; 20 ]; - if let Ok(mut reader) = write.accept() { - let k = reader.read(&mut read_buf).await; - match k { - Ok(0) => { - log::debug!("Write stream from device {:?} has ended", &mac_address); - continue; + loop { + match msg_chara_ctrl.next().await { + Some(CharacteristicControlEvent::Write(write)) => { + let mut device_known: bool = true; + let mac_address = write.device_address(); + match find_device_by_mac(mac_address) { + Some(_) => { + // utils::update_last_found(mac_address); } - Ok(_) => { - if device_known { - let _ = ble_msg_sender.send((mac_address, read_buf)).await; - } - }, - Err(err) => { - log::error!("Write stream error: {}", &err); + None => { + device_known = false; + log::warn!("Device not found in known devices"); } - } - } else { - log::error!("Error accepting write request"); + } + let mut read_buf = vec![0; 20]; + if let Ok(mut reader) = write.accept() { + let k = reader.read(&mut read_buf).await; + match k { + Ok(0) => { + log::debug!("Write stream from device {:?} has ended", &mac_address); + continue; + } + Ok(_) => { + if device_known { + let _ = ble_msg_sender.send((mac_address, read_buf)).await; + } + }, + Err(err) => { + log::error!("Write stream error: {}", &err); + } + } + } else { + log::error!("Error accepting write request"); + } } + _ => continue, } - _ => continue, } - } }); } @@ -832,175 +629,124 @@ impl IdleBleService { .ok_or("Could not find a device address for the given qaul ID!")?; let stringified_addr = utils::mac_to_string(mac_address); - // match utils::find_ignore_device_by_mac(mac_address.clone()) { - // Some(_) => { - // let device = ble_device.device.clone(); - let device = match self.adapter.device(*mac_address) { - Ok(device) => device, - Err(err) => { - log::error!("Error:: {:#?}", err); - return Err("Error getting device".into()); - } + let device = match self.adapter.device(*mac_address) { + Ok(device) => device, + Err(err) => { + log::error!("Error:: {:#?}", err); + return Err("Error getting device".into()); + } + }; + let mut hash_map = HASH_MAP.lock().unwrap(); + match hash_map.get(&stringified_addr) { + Some(queue) => { + let mut queue = queue.clone(); + if queue.len() < 2 { + queue.push_back((message_id, sender_id, data.clone())); + } else { + queue.clear(); + } + hash_map.insert(stringified_addr.clone(), queue); + } + None => { + let mut queue: VecDeque<(String, Vec, Vec)> = VecDeque::new(); + queue.push_back((message_id, sender_id, data.clone())); + hash_map.insert(stringified_addr.clone(), queue); + } + } + + // Messages from queue are read and broken down into packets of 40 bytes and streamed to the remote device. + let extracted_queue = hash_map.get(&stringified_addr).clone(); + let mut message_id: String = "".to_string(); + let mut send_queue: VecDeque = VecDeque::new(); + if let Some(queue) = extracted_queue { + let mut queue = queue.clone(); + if !queue.is_empty() { + let data = queue.pop_front().unwrap(); + message_id = data.0; + let i8_qaul_id: Vec = data.1.into_iter().map(|x| x as i8).collect(); + let i8_message: Vec = data.2.into_iter().map(|x| x as i8).collect(); + let msg = utils::Message { + qaul_id: Some(i8_qaul_id), + message: Some(i8_message), }; - // if(ble_device.device device) { - // log::info!("Device found in ignore list"); - // } - // let mainQueue : HashMap, Vec)>> = HashMap::new(); - let mut hash_map = HASH_MAP.lock().unwrap(); - match hash_map.get(&stringified_addr) { - Some(queue) => { - let mut queue = queue.clone(); - if queue.len() < 2 { - queue.push_back((message_id, sender_id, data.clone())); - } else { - queue.clear(); - } - hash_map.insert(stringified_addr.clone(), queue); - } - None => { - let mut queue: VecDeque<(String, Vec, Vec)> = VecDeque::new(); - queue.push_back((message_id, sender_id, data.clone())); - hash_map.insert(stringified_addr.clone(), queue); - } + let json_str = serde_json::to_string(&msg).unwrap(); + let bt_array = json_str.as_bytes(); + let delimiter = vec![0x24, 0x24]; + let temp = [delimiter.clone(), bt_array.to_vec(), delimiter].concat(); + let mut final_data = utils::bytes_to_hex(&temp); + + while final_data.len() > 40 { + send_queue.push_back(final_data[..40].to_string()); + final_data = final_data[40..].to_string(); } - log::info!("Message added to queue -- > {:?}", &data); - let extracted_queue = hash_map.get(&stringified_addr).clone(); - let mut message_id: String = "".to_string(); - let mut send_queue: VecDeque = VecDeque::new(); - if let Some(queue) = extracted_queue { - let mut queue = queue.clone(); - if !queue.is_empty() { - // log::error!("Here am I"); - let data = queue.pop_front().unwrap(); - message_id = data.0; - let i8_qaul_id: Vec = data.1.into_iter().map(|x| x as i8).collect(); - let i8_message: Vec = data.2.into_iter().map(|x| x as i8).collect(); - let msg = utils::Message { - qaul_id: Some(i8_qaul_id), - message: Some(i8_message), - }; - let json_str = serde_json::to_string(&msg).unwrap(); - // log::error!("json_str = {:?}", json_str); - let bt_array = json_str.as_bytes(); - let delimiter = vec![0x24, 0x24]; - let temp = [delimiter.clone(), bt_array.to_vec(), delimiter].concat(); - let mut final_data = utils::bytes_to_hex(&temp); + if !final_data.is_empty() { + send_queue.push_back(final_data); + } + } + } - while final_data.len() > 40 { - send_queue.push_back(final_data[..40].to_string()); - final_data = final_data[40..].to_string(); - } - if !final_data.is_empty() { - send_queue.push_back(final_data); - } - // log::error!("queue length {}", send_queue.len()); - } + // Connect to device and write into characterstic of other device + if !device.is_connected().await? { + match device.connect().await { + Ok(()) => { + log::error!("Device connected"); } - log::error!("{:}", device.is_connected().await?); - if !device.is_connected().await? { - // let mut retries = 1; - // loop { - match device.connect().await { - Ok(()) => { - log::error!("Device connected"); - // break; - } - Err(err) => { - // if retries > 0 { - log::error!(" Connect error: {}", &err); - return Err("Connection error".into()); - // retries -= 1; - // } else { - // self.adapter.remove_device(device.address()).await?; - // return Err("Connection retries timeout.".into()); - // } - } - // } - } + Err(err) => { + log::error!(" Connect error: {}", &err); + return Err("Connection error".into()); } - log::info!("Connected to device {}", &stringified_addr); - let mut read_char_found = false; - for service in device.services().await? { - if read_char_found { - break; - } - log::info!("Service UUID: {:?}", service.uuid().await?); - if service.uuid().await? == main_service_uuid() { - for chara in service.characteristics().await? { - log::info!("chara = {}", chara.uuid().await?); - if chara.uuid().await? == msg_char() { - utils::update_last_found(*mac_address); - // chara.write(&data).await?; - read_char_found = true; - log::error!("queue length = {}", send_queue.len()); - // let write_io = chara.write_io().await?; - while send_queue.len() > 0 { - // let data = send_queue.pop_front().unwrap(); - let data: String; - match send_queue.pop_front() { - Some(queue_top) => data = queue_top, - None => { - log::error!("No data found in queue"); - break; - } - } - log::info!( - "Sending data to device {} : {:?}", + } + } + let mut read_char_found = false; + for service in device.services().await? { + if read_char_found { + break; + } + if service.uuid().await? == main_service_uuid() { + for chara in service.characteristics().await? { + if chara.uuid().await? == msg_char() { + utils::update_last_found(*mac_address); + read_char_found = true; + while send_queue.len() > 0 { + let data: String; + match send_queue.pop_front() { + Some(queue_top) => data = queue_top, + None => { + log::error!("No data found in queue"); + break; + } + } + log::info!( + "Sending data to device {} : {:?}", + &stringified_addr, + &data + ); + let data = utils::hex_to_bytes(&data); + match chara.write(&data).await { + Ok(()) => { + log::debug!( + "Data sent to device {}", + &stringified_addr + ); + } + Err(err) => { + log::error!( + "Error1 sending data to device {}: {:#?}", &stringified_addr, - &data + &err ); - let data = utils::hex_to_bytes(&data); - match chara.write(&data).await { - Ok(()) => { - log::info!( - " Data1 sent to device {}", - &stringified_addr - ); - } - Err(err) => { - log::error!( - " Error1 sending data to device {}: {:#?}", - &stringified_addr, - &err - ); - } - }; - // async_std::task::sleep(std::time::Duration::from_secs(2)).await; - - // match write_io.send(&data).await { - // Ok(()) => { - // log::info!( - // " Data sent to device {}", - // &stringified_addr - // ); - // } - // Err(err) => { - // log::error!( - // " Error sending data to device {}: {:#?}", - // &stringified_addr, - // &err - // ); - // } - // }; - // log::info!(" {written} bytes written"); } - } + }; } } } - - log::info!("Message sent to device {}", &stringified_addr); - // device.disconnect().await?; - if message_id != "" { - Ok(message_id) - } else { - Err("Message ID not found".into()) - } - // } - // None => { - // return Err("Device not found".into()); - // } - // } + } + } + if message_id != "" { + Ok(message_id) + } else { + Err("Message ID not found".into()) + } } /// Check if bluetooth is powered on by the device. @@ -1011,7 +757,6 @@ impl IdleBleService { drop(session); return ble_enabled; } - } impl StartedBleService { @@ -1066,21 +811,9 @@ pub async fn get_device_info() -> Result> { fn get_filter() -> bluer::DiscoveryFilter { let mut qaul_uuids = HashSet::new(); qaul_uuids.insert(main_service_uuid()); - // qaul_uuids.insert(read_char()); - // qaul_uuids.insert(msg_char()); - bluer::DiscoveryFilter { uuids: qaul_uuids, transport: bluer::DiscoveryTransport::Le, ..Default::default() } -} - -// pub fn disconnect_device(mac_address: Vec) -> Result<(), Box> { -// let addr = Address::from_bytes(mac_address); -// let session = bluer::Session::new()?; -// let adapter = session.default_adapter()?; -// let device = adapter.device(addr)?; -// device.disconnect()?; -// Ok(()) -// } +} \ No newline at end of file diff --git a/rust/ble_module/src/ble/utils.rs b/rust/ble_module/src/ble/utils.rs index 5aeae62f2..bbcd4c91b 100644 --- a/rust/ble_module/src/ble/utils.rs +++ b/rust/ble_module/src/ble/utils.rs @@ -55,8 +55,10 @@ mod tests { pub fn add_device(device: BleScanDevice) { let mut devices = DEVICE_LIST.lock().unwrap(); if devices.contains_key(&device.mac_address) { - devices.get_mut(&device.mac_address).map(|val| { *val = device; }); - }else { + devices.get_mut(&device.mac_address).map(|val| { + *val = device; + }); + } else { devices.insert(device.mac_address, device); } } @@ -69,13 +71,6 @@ pub fn find_device_by_mac(mac_address: Address) -> Option { } else { None } - // match devices - // .iter() - // .find(|device| device.mac_address == mac_address) - // { - // Some(device) => Some(device.clone()), - // None => None, - // } } #[allow(dead_code)] @@ -89,8 +84,10 @@ pub fn remove_device_by_mac(mac_address: Address) { pub fn add_msg_map(stringified_addr: String, hex_msg: String) { let mut msg_map = MSG_MAP.lock().unwrap(); if msg_map.contains_key(stringified_addr.as_str()) { - msg_map.get_mut(&stringified_addr).map(|val| { *val = hex_msg; }); - }else { + msg_map.get_mut(&stringified_addr).map(|val| { + *val = hex_msg; + }); + } else { msg_map.insert(stringified_addr, hex_msg); } } @@ -111,8 +108,10 @@ pub fn remove_msg_map_by_mac(stringified_addr: String) { pub fn add_ignore_device(device: BleScanDevice) { let mut ignore_devices = IGNORE_LIST.lock().unwrap(); if ignore_devices.contains_key(&device.mac_address) { - ignore_devices.get_mut(&device.mac_address).map(|val| { *val = device; }); - }else { + ignore_devices.get_mut(&device.mac_address).map(|val| { + *val = device; + }); + } else { ignore_devices.insert(device.mac_address, device); } } @@ -134,7 +133,6 @@ pub fn update_last_found(mac_address: Address) { match k { Some(device) => { device.last_found_time = current_time_millis(); - log::info!("Time updated"); } None => log::warn!("Device not discovered"), }; @@ -143,9 +141,7 @@ pub fn update_last_found(mac_address: Address) { /// Remove a device from the list of devices present nearby. pub fn remove_ignore_device_by_mac(mac_address: Address) { let mut devices = IGNORE_LIST.lock().unwrap(); - // devices.retain(|device| device.mac_address != mac_address); devices.remove(&mac_address); - log::info!("Device removed from ignore list"); } /// Get the current time in milliseconds since UNIX_EPOCH. @@ -187,7 +183,6 @@ pub fn out_of_range_checker(adapter: Adapter, mut internal_sender: BleResultSend mac_address, ); remove_ignore_device_by_mac(mac_address); - // remove_device_by_mac(mac_address); drop(ignore_list); break; } else { @@ -198,9 +193,9 @@ pub fn out_of_range_checker(adapter: Adapter, mut internal_sender: BleResultSend }); } +// Proccess the message recieved and send it to libqaul. pub fn message_received(e: (String, Address), mut internal_sender: BleResultSender) { let byte_encoded_message = hex_to_bytes(&e.0); - // log::error!("Byte array: {:?}", byte_encoded_message); let json_message: String = match String::from_utf8(byte_encoded_message) { Ok(v) => v, Err(e) => { @@ -208,9 +203,7 @@ pub fn message_received(e: (String, Address), mut internal_sender: BleResultSend return; } }; - // let json_message = String::from_utf8_lossy(&byte_encoded_message); - // log::error!("Received messages: {:?} ", json_message); - let msg_object: Message = match serde_json::from_str(&json_message) { + let msg_object: Message = match serde_json::from_str(&json_message) { Ok(v) => v, Err(e) => { println!("Failed to parse JSON: {}", e); @@ -232,15 +225,9 @@ pub fn message_received(e: (String, Address), mut internal_sender: BleResultSend /// Byte to Hex conversion. pub fn bytes_to_hex(bytes: &[u8]) -> String { let hex_chars: Vec = bytes.iter().map(|byte| format!("{:02x}", byte)).collect(); - hex_chars.join("") } -// /// Bytes to String conversion. -// pub fn bytes_to_str(bytes: &[u8]) -> Result<&str, std::str::Utf8Error> { -// std::str::from_utf8(bytes) -// } - /// Hex to Byte conversion. pub fn hex_to_bytes(hex: &str) -> Vec { if hex.len() % 2 != 0 { diff --git a/rust/ble_module/src/lib.rs b/rust/ble_module/src/lib.rs index f5d2ffa67..f6afeee43 100644 --- a/rust/ble_module/src/lib.rs +++ b/rust/ble_module/src/lib.rs @@ -19,7 +19,6 @@ pub mod rpc; pub fn init(sys_rpc_callback: Box) + Send>) { let rpc_receiver = rpc::init(); - // Spawn new thread thread::spawn(move || { let rt = runtime::Builder::new_current_thread() .enable_all() @@ -27,7 +26,6 @@ pub fn init(sys_rpc_callback: Box) + Send>) { .expect("Failed to create BLE module tokio runtime!"); rt.block_on(async move { - // start BLE module main loop main_loop(sys_rpc_callback, rpc_receiver).await; }); diff --git a/rust/ble_module/src/rpc/msg_loop.rs b/rust/ble_module/src/rpc/msg_loop.rs index 13e83de29..0961f2d2d 100644 --- a/rust/ble_module/src/rpc/msg_loop.rs +++ b/rust/ble_module/src/rpc/msg_loop.rs @@ -17,9 +17,7 @@ pub async fn listen_for_sys_msgs( ) -> Result<(), Box> { let mut local_sender_handle = internal_sender.clone(); loop { - // async_std::task::spawn(async move { let evt = rpc_receiver.recv().await; - // log::info!("Received event: ",); match evt { None => { log::info!("Qaul 'sys' message channel closed. Shutting down gracefully."); @@ -32,36 +30,30 @@ pub async fn listen_for_sys_msgs( match msg.message.unwrap() { StartRequest(req) => match ble_service { QaulBleService::Idle(svc) => { - let mut internal_sender_1 = local_sender_handle.clone(); - // let mut internal_sender_2 = local_sender_handle.clone(); - // let + let internal_sender_1 = local_sender_handle.clone(); let qaul_id = Bytes::from(req.qaul_id); - let handle =async_std::task::spawn(async move { - // let mut - let ble_service = svc - .advertise_scan_listen(qaul_id, None, internal_sender_1.clone(), rpc_receiver.clone()) - .await; - log::info!("BLE Service started successfully"); + let handle = async_std::task::spawn(async move { + let ble_service = svc + .advertise_scan_listen( + qaul_id, + None, + internal_sender_1, + rpc_receiver.clone(), + ) + .await; + log::info!("BLE Service started successfully"); - match ble_service { - QaulBleService::Idle(_) => { - log::error!("Error occured in configuring BLE module"); + match ble_service { + QaulBleService::Idle(_) => { + log::error!("Error occured in configuring BLE module"); + } + QaulBleService::Started(svc) => { + svc.spawn_handles().await; + } } - QaulBleService::Started(svc) => { - // ble_service = QaulBleService::Started(svc); - // async_std::task::spawn(async move { - // ble_service = - svc.spawn_handles().await; - // svc.join_handles.await; - // }); - } - } - internal_sender_1.send_start_successful(); - // ble_service + local_sender_handle.send_start_successful(); }); handle.await; - log::info!("BLE Service started successfully"); - // continue; break; } QaulBleService::Started(_) => { @@ -69,26 +61,12 @@ pub async fn listen_for_sys_msgs( "Received Start Request, but bluetooth service is already running!" ); local_sender_handle.send_result_already_running() - // continue; } }, // This streams were mearged into IdleBleService stream. - // The events are recieved by the main loop and handled there. - StopRequest(_) => {}, - DirectSend(_) => {}, - // log::info!("Received Direct Send Request: {:#?}", req); - // let receiver_id = req.receiver_id.clone(); - // match svc.direct_send(req).await { - // Ok(_) => local_sender_handle.send_direct_send_success(receiver_id), - // Err(err) => local_sender_handle - // .send_direct_send_error(receiver_id, err.to_string()), - // } - - // QaulBleService::Idle(_) => { - // log::info!("Received Direct Send Request, but bluetooth service is not running!"); - // local_sender_handle.send_result_not_running() - // } - // }, + // The events are recieved by the main loop and handled there. + StopRequest(_) => {} + DirectSend(_) => {} InfoRequest(_) => { let mut sender_handle_clone = internal_sender.clone(); spawn(async move { @@ -106,7 +84,6 @@ pub async fn listen_for_sys_msgs( } } } - // }); } Ok(()) } diff --git a/rust/libqaul/src/connections/ble/mod.rs b/rust/libqaul/src/connections/ble/mod.rs index eccde310f..7c60b2c4d 100644 --- a/rust/libqaul/src/connections/ble/mod.rs +++ b/rust/libqaul/src/connections/ble/mod.rs @@ -119,7 +119,7 @@ impl Ble { async_std::task::sleep(std::time::Duration::from_secs(5)).await; } ble_module::init(Box::new(|sys_msg| Sys::send_to_libqaul(sys_msg))); - + // initialize local state { // create node states @@ -608,14 +608,12 @@ impl Ble { .expect("Vec provides capacity as needed"); // send the message - log::info!("BLE send feed message to {:?}", receiver_small_id.clone()); Self::message_send(receiver_small_id, sender_id, buf); } /// BLE message received fn message_received(message: proto::BleDirectReceived) { log::info!("BLE message received"); - log::info!("qaul: Data recieved {:?}", &message.data); // get node ID of sender let node_id: PeerId; if let Some(node) = Neighbours::node_from_small_id(message.from.clone()) { diff --git a/rust/libqaul/src/router/neighbours.rs b/rust/libqaul/src/router/neighbours.rs index c6263b9c4..71df6527f 100644 --- a/rust/libqaul/src/router/neighbours.rs +++ b/rust/libqaul/src/router/neighbours.rs @@ -92,7 +92,6 @@ impl Neighbours { /// If the node does not yet exist, it creates it. pub fn update_node(module: ConnectionModule, node_id: PeerId, rtt: u32) { log::trace!("update_node node {:?}", node_id); - log::debug!("update_node rtt {:?} at connection module {:#?}", rtt, module); // get table let mut neighbours; match module { @@ -110,7 +109,6 @@ impl Neighbours { node.updated_at = Timestamp::get_timestamp(); } else { log::trace!("add node {:?} to neighbours table", node_id); - log::debug!("add node {:?} to neighbours table", node_id); neighbours.nodes.insert( node_id, Neighbour {