
Data Ingestion

Heet Sankesara edited this page Jul 18, 2023 · 5 revisions

Data ingestion is one of the main components of the pipeline. We use PySpark, the Python API for the Apache Spark big data processing framework, to read data using Avro or CSV schemas. This has proved much more efficient than reading the data with a standard (non-Spark) reader. The module currently supports reading data from local storage and from an SFTP server. Our next aim is to add S3 support to data ingestion as well.
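One way an Avro schema can drive a CSV read is to flatten it into the dotted column names (key.userId, value.batteryLevel, ...) that appear in the example output on this page, paired with Spark SQL types. The following is a minimal sketch under stated assumptions, not radarpipeline's implementation: the helper names (spark_type, flatten_fields, to_ddl), the Avro-to-Spark type mapping, and the cut-down battery schema are all hypothetical.

```python
# Hypothetical helper, not part of radarpipeline: flattens a RADAR-style Avro
# record schema (nested "key" and "value" records) into dotted CSV column
# names paired with Spark SQL type names.
from typing import Any, Dict, List, Tuple

# Assumed mapping from Avro primitive types to Spark SQL DDL types.
AVRO_TO_SPARK = {
    "string": "STRING",
    "int": "INT",
    "long": "BIGINT",
    "float": "FLOAT",
    "double": "DOUBLE",
    "boolean": "BOOLEAN",
    "bytes": "BINARY",
}


def spark_type(avro_type: Any) -> str:
    """Map a non-record Avro field type to a Spark SQL type name."""
    if isinstance(avro_type, list):
        # Union such as ["null", "string"]: use the first non-null branch.
        return spark_type(next(t for t in avro_type if t != "null"))
    if isinstance(avro_type, dict):
        # Enums are stored as their symbol names, so read them as strings.
        if avro_type["type"] == "enum":
            return "STRING"
        return spark_type(avro_type["type"])
    return AVRO_TO_SPARK[avro_type]


def flatten_fields(schema: Dict[str, Any], prefix: str = "") -> List[Tuple[str, str]]:
    """Return (column_name, spark_type) pairs, recursing into nested records."""
    columns: List[Tuple[str, str]] = []
    for field in schema["fields"]:
        name = prefix + field["name"]
        ftype = field["type"]
        if isinstance(ftype, dict) and ftype["type"] == "record":
            columns.extend(flatten_fields(ftype, prefix=name + "."))
        else:
            columns.append((name, spark_type(ftype)))
    return columns


def to_ddl(columns: List[Tuple[str, str]]) -> str:
    """Join columns into a DDL string, backtick-quoting the dotted names."""
    return ", ".join(f"`{name}` {stype}" for name, stype in columns)


# Cut-down schema modeled on the battery-level columns; the types are assumed.
battery_schema = {
    "type": "record",
    "name": "BatteryLevelRecord",
    "fields": [
        {"name": "key", "type": {"type": "record", "name": "Key", "fields": [
            {"name": "projectId", "type": ["null", "string"]},
            {"name": "userId", "type": "string"},
            {"name": "sourceId", "type": "string"},
        ]}},
        {"name": "value", "type": {"type": "record", "name": "Value", "fields": [
            {"name": "time", "type": "double"},
            {"name": "timeReceived", "type": "double"},
            {"name": "batteryLevel", "type": "float"},
            {"name": "isPlugged", "type": "boolean"},
            {"name": "status", "type": {"type": "enum", "name": "Status",
                                        "symbols": ["CHARGING", "DISCHARGING"]}},
        ]}},
    ],
}

print(to_ddl(flatten_fields(battery_schema)))
```

In PySpark, a DDL string like the printed one can be passed to `spark.read.schema(...)` before `.csv(...)`, so Spark does not have to infer the column types itself. Array, map, and union-of-record types are not handled by this sketch.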

We have also created a custom data reader that can be used outside the pipeline. It lets researchers read data quickly and with minimal effort.

Custom data reading module

After installing radarpipeline as a Python library, you can import SparkCSVDataReader, a custom data reader module that helps users read RADAR data more quickly and efficiently. Here's an example of how it works.

Module documentation

The SparkCSVDataReader class is initialized with the following arguments: config: Dict, required_data: List[str], df_type: str = "pandas", and spark_config: Dict = {}.

config: the location of the data, in the same format as the input section of config.yaml

required_data: list of the variables to read

df_type: type of the data frame to return; the same options as described in Configuration

spark_config: dict of Spark configuration options; the same variables as described in the spark_config section of Configuration
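To make the link between these arguments and config.yaml concrete, here is a minimal sketch that gathers them from a config file already parsed into a dict. It assumes the data location lives under an input section, df_type under a configurations section, and Spark options under spark_config; those section names, the helper reader_args_from_config, the set of valid df_type values, and the example Spark option are all assumptions, so check them against Configuration.

```python
# Hypothetical helper, not part of radarpipeline: collects the four
# SparkCSVDataReader arguments from a config.yaml that has already been
# parsed into a dict. The "input", "configurations" and "spark_config"
# section names are assumptions; check them against Configuration.
from typing import Any, Dict, List, Tuple

# Assumed set of supported data frame types.
VALID_DF_TYPES = {"pandas", "spark"}


def reader_args_from_config(
    parsed_config: Dict[str, Any], required_data: List[str]
) -> Tuple[Dict[str, Any], List[str], str, Dict[str, Any]]:
    """Return (config, required_data, df_type, spark_config) in constructor order."""
    if not required_data:
        raise ValueError("required_data must name at least one variable")
    # config: the input section, e.g. {"config": {"source_path": ...}}.
    config = parsed_config["input"]
    # df_type: fall back to the documented default, "pandas".
    df_type = parsed_config.get("configurations", {}).get("df_type", "pandas")
    if df_type not in VALID_DF_TYPES:
        raise ValueError(f"unsupported df_type: {df_type!r}")
    # spark_config: fall back to the documented default, an empty dict.
    spark_config = parsed_config.get("spark_config", {})
    return config, required_data, df_type, spark_config


# Example parsed config; the spark_config entry is illustrative only.
parsed = {
    "input": {"config": {"source_path": "../mockdata/mockdata"}},
    "configurations": {"df_type": "pandas"},
    "spark_config": {"spark.driver.memory": "2g"},
}

args = reader_args_from_config(parsed, ["android_phone_battery_level"])
# With radarpipeline installed, these could be passed straight through:
# data_reader = SparkCSVDataReader(*args)
```

The constructor call is left as a comment so the sketch runs without radarpipeline installed; the tuple follows the argument order listed above.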

Example

from radarpipeline import radarpipeline
from radarpipeline.io import SparkCSVDataReader

# The source path is the mock data location and the only required variable is android_phone_battery_level
data_reader = SparkCSVDataReader({"config": {"source_path": "../mockdata/mockdata"}}, ["android_phone_battery_level"])
data = data_reader.read_data()

Logs:

2023-07-18 10:32:50,446 [INFO]: Reading data for user: 2a02e53a-951e-4fd0-b47f-195a87096bd0
2023-07-18 10:32:50,449 [INFO]: Reading data for variable: android_phone_battery_level
2023-07-18 10:32:50,452 [INFO]: Schema found
2023-07-18 10:32:55,040 [INFO]: Reading data for user: 07a69f47-1923-4cfc-b89b-0eefad483f43
2023-07-18 10:32:55,041 [INFO]: Reading data for variable: android_phone_battery_level
2023-07-18 10:32:55,045 [INFO]: Schema found
2023-07-18 10:32:58,568 [INFO]: Reading data for user: 5c0e2ec7-6f85-4041-9669-7145075d1754
2023-07-18 10:32:58,570 [INFO]: Reading data for variable: android_phone_battery_level
2023-07-18 10:32:58,571 [INFO]: Schema found
2023-07-18 10:32:59,113 [INFO]: Reading data for user: 072ddb22-82ef-4b81-8460-41ab096b54bb
2023-07-18 10:32:59,114 [INFO]: Reading data for variable: android_phone_battery_level
2023-07-18 10:32:59,116 [INFO]: Schema found
# Returns all the data keys, i.e. the user IDs
print(data.get_data_keys())

Output:

['2a02e53a-951e-4fd0-b47f-195a87096bd0',
 '07a69f47-1923-4cfc-b89b-0eefad483f43',
 '5c0e2ec7-6f85-4041-9669-7145075d1754',
 '072ddb22-82ef-4b81-8460-41ab096b54bb']
# Returns a dict mapping each user ID to its RadarUserData object
print(data.get_data())

Output:

{'2a02e53a-951e-4fd0-b47f-195a87096bd0': <radarpipeline.datalib.radar_user_data.RadarUserData at 0x7f89e706a6a0>,
 '07a69f47-1923-4cfc-b89b-0eefad483f43': <radarpipeline.datalib.radar_user_data.RadarUserData at 0x7f89e71c5760>,
 '5c0e2ec7-6f85-4041-9669-7145075d1754': <radarpipeline.datalib.radar_user_data.RadarUserData at 0x7f89e7200d90>,
 '072ddb22-82ef-4b81-8460-41ab096b54bb': <radarpipeline.datalib.radar_user_data.RadarUserData at 0x7f89e71cdaf0>}
# Returns the data for the given variable, combined across all users into a single data frame
data.get_variable_data(variables='android_phone_battery_level')

Output:

	key.projectId	key.userId	key.sourceId	value.time	value.timeReceived	value.batteryLevel	value.isPlugged	value.status
0	STAGING_PROJECT	2a02e53a-951e-4fd0-b47f-195a87096bd0	f8ba147e-0c18-4169-a66a-9f6f8fb6e7c6	2018-11-24 09:00:52.672000000	2018-11-24 09:00:52.672000000	0.97	False	DISCHARGING
1	STAGING_PROJECT	2a02e53a-951e-4fd0-b47f-195a87096bd0	f8ba147e-0c18-4169-a66a-9f6f8fb6e7c6	2018-11-24 09:05:52.667000064	2018-11-24 09:05:52.667000064	0.96	False	DISCHARGING
2	STAGING_PROJECT	2a02e53a-951e-4fd0-b47f-195a87096bd0	f8ba147e-0c18-4169-a66a-9f6f8fb6e7c6	2018-11-24 09:10:52.664999936	2018-11-24 09:10:52.664999936	0.95	False	DISCHARGING
3	STAGING_PROJECT	2a02e53a-951e-4fd0-b47f-195a87096bd0	f8ba147e-0c18-4169-a66a-9f6f8fb6e7c6	2018-11-24 09:20:52.676000000	2018-11-24 09:20:52.676000000	0.93	False	DISCHARGING
4	STAGING_PROJECT	2a02e53a-951e-4fd0-b47f-195a87096bd0	f8ba147e-0c18-4169-a66a-9f6f8fb6e7c6	2018-11-24 09:30:52.776999936	2018-11-24 09:30:52.776999936	0.92	False	DISCHARGING
...	...	...	...	...	...	...	...	...
4560	STAGING_PROJECT	072ddb22-82ef-4b81-8460-41ab096b54bb	fc184ba0-677f-4aae-a4ea-b2b8c7530b18	2019-10-25 15:45:26.290999808	2019-10-25 15:45:26.290999808	0.64	False	DISCHARGING
4561	STAGING_PROJECT	072ddb22-82ef-4b81-8460-41ab096b54bb	fc184ba0-677f-4aae-a4ea-b2b8c7530b18	2019-10-25 15:55:25.657000192	2019-10-25 15:55:25.657000192	0.63	False	DISCHARGING
4562	STAGING_PROJECT	072ddb22-82ef-4b81-8460-41ab096b54bb	fbbed041-f570-4c6c-8a7f-a866ac11fec0	2019-10-27 10:54:34.744999936	2019-10-27 10:54:34.744999936	0.98	False	DISCHARGING
4563	STAGING_PROJECT	072ddb22-82ef-4b81-8460-41ab096b54bb	fbbed041-f570-4c6c-8a7f-a866ac11fec0	2019-10-27 10:57:26.454999808	2019-10-27 10:57:26.454999808	0.97	False	DISCHARGING
4564	STAGING_PROJECT	072ddb22-82ef-4b81-8460-41ab096b54bb	fc184ba0-677f-4aae-a4ea-b2b8c7530b18	2019-10-25 23:00:25.706000128	2019-10-25 23:00:25.706000128	0.01	False	DISCHARGING
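With the default df_type of "pandas", the combined result can be post-processed with ordinary pandas operations. The sketch below builds a four-row stand-in from rows 0, 1, 4562 and 4564 of the output above (timestamps rounded to milliseconds), sorts each user's samples by time, since the output shows rows are not always in time order, and summarises battery level per user. The variable names and the choice of summary statistics are illustrative, not something radarpipeline provides.

```python
# Illustrative post-processing of the combined pandas data frame returned by
# get_variable_data(). The four rows are copied from the output above; with
# real data, use the returned data frame instead of building one by hand.
import pandas as pd

battery = pd.DataFrame(
    {
        "key.userId": [
            "2a02e53a-951e-4fd0-b47f-195a87096bd0",
            "2a02e53a-951e-4fd0-b47f-195a87096bd0",
            "072ddb22-82ef-4b81-8460-41ab096b54bb",
            "072ddb22-82ef-4b81-8460-41ab096b54bb",
        ],
        "value.time": pd.to_datetime(
            [
                "2018-11-24 09:00:52.672",
                "2018-11-24 09:05:52.667",
                "2019-10-27 10:54:34.745",
                "2019-10-25 23:00:25.706",
            ]
        ),
        "value.batteryLevel": [0.97, 0.96, 0.98, 0.01],
    }
)

# Rows are not always in time order (row 4564 above predates rows 4562 and
# 4563), so sort within each user before any time-based analysis.
battery = battery.sort_values(["key.userId", "value.time"]).reset_index(drop=True)

# Per-user summary of battery level: number of samples, min, max and mean.
summary = battery.groupby("key.userId")["value.batteryLevel"].agg(["count", "min", "max", "mean"])
print(summary)
```

Sorting on both the user ID and the timestamp keeps each user's samples together and in chronological order, which matters for anything that looks at consecutive readings, such as charge and discharge rates.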