Skip to content

Commit

Permalink
refactor: Clean the ble_module and remove unused functions
Browse files Browse the repository at this point in the history
	modified:   ble_module/src/ble/ble_service.rs
	modified:   ble_module/src/ble/utils.rs
	modified:   ble_module/src/lib.rs
	modified:   ble_module/src/rpc/msg_loop.rs
	modified:   libqaul/src/connections/ble/mod.rs
	modified:   libqaul/src/router/neighbours.rs
  • Loading branch information
sukhman-sukh committed Aug 26, 2024
1 parent 7994789 commit 1bf958b
Show file tree
Hide file tree
Showing 6 changed files with 284 additions and 593 deletions.
761 changes: 247 additions & 514 deletions rust/ble_module/src/ble/ble_service.rs

Large diffs are not rendered by default.

41 changes: 14 additions & 27 deletions rust/ble_module/src/ble/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ mod tests {
pub fn add_device(device: BleScanDevice) {
let mut devices = DEVICE_LIST.lock().unwrap();
if devices.contains_key(&device.mac_address) {
devices.get_mut(&device.mac_address).map(|val| { *val = device; });
}else {
devices.get_mut(&device.mac_address).map(|val| {
*val = device;
});
} else {
devices.insert(device.mac_address, device);
}
}
Expand All @@ -69,13 +71,6 @@ pub fn find_device_by_mac(mac_address: Address) -> Option<BleScanDevice> {
} else {
None
}
// match devices
// .iter()
// .find(|device| device.mac_address == mac_address)
// {
// Some(device) => Some(device.clone()),
// None => None,
// }
}

#[allow(dead_code)]
Expand All @@ -89,8 +84,10 @@ pub fn remove_device_by_mac(mac_address: Address) {
pub fn add_msg_map(stringified_addr: String, hex_msg: String) {
let mut msg_map = MSG_MAP.lock().unwrap();
if msg_map.contains_key(stringified_addr.as_str()) {
msg_map.get_mut(&stringified_addr).map(|val| { *val = hex_msg; });
}else {
msg_map.get_mut(&stringified_addr).map(|val| {
*val = hex_msg;
});
} else {
msg_map.insert(stringified_addr, hex_msg);
}
}
Expand All @@ -111,8 +108,10 @@ pub fn remove_msg_map_by_mac(stringified_addr: String) {
pub fn add_ignore_device(device: BleScanDevice) {
let mut ignore_devices = IGNORE_LIST.lock().unwrap();
if ignore_devices.contains_key(&device.mac_address) {
ignore_devices.get_mut(&device.mac_address).map(|val| { *val = device; });
}else {
ignore_devices.get_mut(&device.mac_address).map(|val| {
*val = device;
});
} else {
ignore_devices.insert(device.mac_address, device);
}
}
Expand All @@ -134,7 +133,6 @@ pub fn update_last_found(mac_address: Address) {
match k {
Some(device) => {
device.last_found_time = current_time_millis();
log::info!("Time updated");
}
None => log::warn!("Device not discovered"),
};
Expand All @@ -143,9 +141,7 @@ pub fn update_last_found(mac_address: Address) {
/// Remove a device from the list of devices present nearby.
pub fn remove_ignore_device_by_mac(mac_address: Address) {
let mut devices = IGNORE_LIST.lock().unwrap();
// devices.retain(|device| device.mac_address != mac_address);
devices.remove(&mac_address);
log::info!("Device removed from ignore list");
}

/// Get the current time in milliseconds since UNIX_EPOCH.
Expand Down Expand Up @@ -187,7 +183,6 @@ pub fn out_of_range_checker(adapter: Adapter, mut internal_sender: BleResultSend
mac_address,
);
remove_ignore_device_by_mac(mac_address);
// remove_device_by_mac(mac_address);
drop(ignore_list);
break;
} else {
Expand All @@ -198,19 +193,17 @@ pub fn out_of_range_checker(adapter: Adapter, mut internal_sender: BleResultSend
});
}

// Proccess the message recieved and send it to libqaul.
pub fn message_received(e: (String, Address), mut internal_sender: BleResultSender) {
let byte_encoded_message = hex_to_bytes(&e.0);
// log::error!("Byte array: {:?}", byte_encoded_message);
let json_message: String = match String::from_utf8(byte_encoded_message) {
Ok(v) => v,
Err(e) => {
println!("Failed to parse JSON: {}", e);
return;
}
};
// let json_message = String::from_utf8_lossy(&byte_encoded_message);
// log::error!("Received messages: {:?} ", json_message);
let msg_object: Message = match serde_json::from_str(&json_message) {
let msg_object: Message = match serde_json::from_str(&json_message) {
Ok(v) => v,
Err(e) => {
println!("Failed to parse JSON: {}", e);
Expand All @@ -232,15 +225,9 @@ pub fn message_received(e: (String, Address), mut internal_sender: BleResultSend
/// Byte to Hex conversion.
pub fn bytes_to_hex(bytes: &[u8]) -> String {
let hex_chars: Vec<String> = bytes.iter().map(|byte| format!("{:02x}", byte)).collect();

hex_chars.join("")
}

// /// Bytes to String conversion.
// pub fn bytes_to_str(bytes: &[u8]) -> Result<&str, std::str::Utf8Error> {
// std::str::from_utf8(bytes)
// }

/// Hex to Byte conversion.
pub fn hex_to_bytes(hex: &str) -> Vec<u8> {
if hex.len() % 2 != 0 {
Expand Down
2 changes: 0 additions & 2 deletions rust/ble_module/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@ pub mod rpc;
pub fn init(sys_rpc_callback: Box<dyn FnMut(Vec<u8>) + Send>) {
let rpc_receiver = rpc::init();

// Spawn new thread
thread::spawn(move || {
let rt = runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create BLE module tokio runtime!");

rt.block_on(async move {
// start BLE module main loop
main_loop(sys_rpc_callback, rpc_receiver).await;
});

Expand Down
67 changes: 22 additions & 45 deletions rust/ble_module/src/rpc/msg_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ pub async fn listen_for_sys_msgs(
) -> Result<(), Box<dyn Error>> {
let mut local_sender_handle = internal_sender.clone();
loop {
// async_std::task::spawn(async move {
let evt = rpc_receiver.recv().await;
// log::info!("Received event: ",);
match evt {
None => {
log::info!("Qaul 'sys' message channel closed. Shutting down gracefully.");
Expand All @@ -32,63 +30,43 @@ pub async fn listen_for_sys_msgs(
match msg.message.unwrap() {
StartRequest(req) => match ble_service {
QaulBleService::Idle(svc) => {
let mut internal_sender_1 = local_sender_handle.clone();
// let mut internal_sender_2 = local_sender_handle.clone();
// let
let internal_sender_1 = local_sender_handle.clone();
let qaul_id = Bytes::from(req.qaul_id);
let handle =async_std::task::spawn(async move {
// let mut
let ble_service = svc
.advertise_scan_listen(qaul_id, None, internal_sender_1.clone(), rpc_receiver.clone())
.await;
log::info!("BLE Service started successfully");
let handle = async_std::task::spawn(async move {
let ble_service = svc
.advertise_scan_listen(
qaul_id,
None,
internal_sender_1,
rpc_receiver.clone(),
)
.await;
log::info!("BLE Service started successfully");

match ble_service {
QaulBleService::Idle(_) => {
log::error!("Error occured in configuring BLE module");
match ble_service {
QaulBleService::Idle(_) => {
log::error!("Error occured in configuring BLE module");
}
QaulBleService::Started(svc) => {
svc.spawn_handles().await;
}
}
QaulBleService::Started(svc) => {
// ble_service = QaulBleService::Started(svc);
// async_std::task::spawn(async move {
// ble_service =
svc.spawn_handles().await;
// svc.join_handles.await;
// });
}
}
internal_sender_1.send_start_successful();
// ble_service
local_sender_handle.send_start_successful();
});
handle.await;
log::info!("BLE Service started successfully");
// continue;
break;
}
QaulBleService::Started(_) => {
log::warn!(
"Received Start Request, but bluetooth service is already running!"
);
local_sender_handle.send_result_already_running()
// continue;
}
},
// This streams were mearged into IdleBleService stream.
// The events are recieved by the main loop and handled there.
StopRequest(_) => {},
DirectSend(_) => {},
// log::info!("Received Direct Send Request: {:#?}", req);
// let receiver_id = req.receiver_id.clone();
// match svc.direct_send(req).await {
// Ok(_) => local_sender_handle.send_direct_send_success(receiver_id),
// Err(err) => local_sender_handle
// .send_direct_send_error(receiver_id, err.to_string()),
// }

// QaulBleService::Idle(_) => {
// log::info!("Received Direct Send Request, but bluetooth service is not running!");
// local_sender_handle.send_result_not_running()
// }
// },
// The events are recieved by the main loop and handled there.
StopRequest(_) => {}
DirectSend(_) => {}
InfoRequest(_) => {
let mut sender_handle_clone = internal_sender.clone();
spawn(async move {
Expand All @@ -106,7 +84,6 @@ pub async fn listen_for_sys_msgs(
}
}
}
// });
}
Ok(())
}
4 changes: 1 addition & 3 deletions rust/libqaul/src/connections/ble/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl Ble {
async_std::task::sleep(std::time::Duration::from_secs(5)).await;
}
ble_module::init(Box::new(|sys_msg| Sys::send_to_libqaul(sys_msg)));

// initialize local state
{
// create node states
Expand Down Expand Up @@ -608,14 +608,12 @@ impl Ble {
.expect("Vec<u8> provides capacity as needed");

// send the message
log::info!("BLE send feed message to {:?}", receiver_small_id.clone());
Self::message_send(receiver_small_id, sender_id, buf);
}

/// BLE message received
fn message_received(message: proto::BleDirectReceived) {
log::info!("BLE message received");
log::info!("qaul: Data recieved {:?}", &message.data);
// get node ID of sender
let node_id: PeerId;
if let Some(node) = Neighbours::node_from_small_id(message.from.clone()) {
Expand Down
2 changes: 0 additions & 2 deletions rust/libqaul/src/router/neighbours.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ impl Neighbours {
/// If the node does not yet exist, it creates it.
pub fn update_node(module: ConnectionModule, node_id: PeerId, rtt: u32) {
log::trace!("update_node node {:?}", node_id);
log::debug!("update_node rtt {:?} at connection module {:#?}", rtt, module);
// get table
let mut neighbours;
match module {
Expand All @@ -110,7 +109,6 @@ impl Neighbours {
node.updated_at = Timestamp::get_timestamp();
} else {
log::trace!("add node {:?} to neighbours table", node_id);
log::debug!("add node {:?} to neighbours table", node_id);
neighbours.nodes.insert(
node_id,
Neighbour {
Expand Down

0 comments on commit 1bf958b

Please sign in to comment.