Where the Care App and Care Portal are mentioned, the articles in the Support Center refer to both the professional Plan (Care Lab App and Care Lab Portal) and the enterprise Plan (Care App and Care Portal), unless indicated otherwise.
The availability of Care Portal access on enterprise is defined by the study configuration. For further information on your study configuration, talk to your Study team or account manager.
Python example to convert Empatica's Avro into CSV
The Avro data format is platform-independent, and it offers various official APIs, including one for Python:
Apache Avro™ 1.11.1 Getting Started (Python)
To gain familiarity with data provided in Avro format, we offer an example Python script that shows the basics of how to access data contained within Avro files.
This script takes one Empatica's Avro file as input and converts it into multiple CSV files, one for each sensor.
The code below shows how to load and access the fields of an Avro file (NB: this code is tested for the v6 of the Avro files). The script saves raw data on a CSV file for each sensor. Note that the availability of the different raw data depends on the study configuration: some output files could be empty.
The avro package needs to be installed in your python environment before running this script (e.g. prompt pip install avro in your terminal).
from avro.datafile import DataFileReader
from avro.io import DatumReader
import json
import csv
import os
## Define the location of the Avro file and output folder.
# macOS example:
# avro_file_path = "/Users/timmytommy/Data/Avros/1-1-100_1707311064.avro"
# output_dir = "/Users/timmytommy/Data/Output/"
#
# Windows example:
# avro_file_path = "C:/Data/Avros/1-1-100_1707311064.avro"
# output_dir = "C:/Data/Output/"
avro_file_path = "[insert the avro filename with the full path]"
output_dir = "[insert the path to a directory where to save csv files]"
## Read Avro file
reader = DataFileReader(open(avro_file_path, "rb"), DatumReader())
schema = json.loads(reader.meta.get('avro.schema').decode('utf-8'))
data= next(reader)
## Uncomment the below 2 lines to print the Avro schema
# print(schema)
# print(" ")
## Export sensors data to csv files
# Accelerometer
acc = data["rawData"]["accelerometer"]
timestamp = [round(acc["timestampStart"] + i * (1e6 / acc["samplingFrequency"]))
for i in range(len(acc["x"]))]
# Convert ADC counts in g
delta_physical = acc["imuParams"]["physicalMax"] - acc["imuParams"]["physicalMin"]
delta_digital = acc["imuParams"]["digitalMax"] - acc["imuParams"]["digitalMin"]
x_g = [val * delta_physical / delta_digital for val in acc["x"]]
y_g = [val * delta_physical / delta_digital for val in acc["y"]]
z_g = [val * delta_physical / delta_digital for val in acc["z"]]
with open(os.path.join(output_dir, 'accelerometer.csv'), 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(["unix_timestamp", "x", "y", "z"])
writer.writerows([[ts, x, y, z] for ts, x, y, z in zip(timestamp, x_g, y_g, z_g)])
# Gyroscope
gyro = data["rawData"]["gyroscope"]
timestamp = [round(gyro["timestampStart"] + i * (1e6 / gyro["samplingFrequency"]))
for i in range(len(gyro["x"]))]
# Convert ADC counts in dps (degree per second)
delta_physical = gyro["imuParams"]["physicalMax"] - gyro["imuParams"]["physicalMin"]
delta_digital = gyro["imuParams"]["digitalMax"] - gyro["imuParams"]["digitalMin"]
x_dps = [val * delta_physical / delta_digital for val in gyro["x"]]
y_dps = [val * delta_physical / delta_digital for val in gyro["y"]]
z_dps = [val * delta_physical / delta_digital for val in gyro["z"]]
with open(os.path.join(output_dir, 'gyroscope.csv'), 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(["unix_timestamp", "x", "y", "z"])
writer.writerows([[ts, x, y, z] for ts, x, y, z in zip(timestamp, x_dps, y_dps, z_dps)])
# Eda
eda = data["rawData"]["eda"]
timestamp = [round(eda["timestampStart"] + i * (1e6 / eda["samplingFrequency"]))
for i in range(len(eda["values"]))]
with open(os.path.join(output_dir, 'eda.csv'), 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(["unix_timestamp", "eda"])
writer.writerows([[ts, eda] for ts, eda in zip(timestamp, eda["values"])])
# Temperature
tmp = data["rawData"]["temperature"]
timestamp = [round(tmp["timestampStart"] + i * (1e6 / tmp["samplingFrequency"]))
for i in range(len(tmp["values"]))]
with open(os.path.join(output_dir, 'temperature.csv'), 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(["unix_timestamp", "temperature"])
writer.writerows([[ts, tmp] for ts, tmp in zip(timestamp, tmp["values"])])
# Tags
tags = data["rawData"]["tags"]
with open(os.path.join(output_dir, 'tags.csv'), 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(["tags_timestamp"])
writer.writerows([[tag] for tag in tags["tagsTimeMicros"]])
# BVP
bvp = data["rawData"]["bvp"]
timestamp = [round(bvp["timestampStart"] + i * (1e6 / bvp["samplingFrequency"]))
for i in range(len(bvp["values"]))]
with open(os.path.join(output_dir, 'bvp.csv'), 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(["unix_timestamp", "bvp"])
writer.writerows([[ts, bvp] for ts, bvp in zip(timestamp, bvp["values"])])
# Systolic peaks
sps = data["rawData"]["systolicPeaks"]
with open(os.path.join(output_dir, 'systolic_peaks.csv'), 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(["systolic_peak_timestamp"])
writer.writerows([[sp] for sp in sps["peaksTimeNanos"]])
# Steps
steps = data["rawData"]["steps"]
timestamp = [round(steps["timestampStart"] + i * (1e6 / steps["samplingFrequency"]))
for i in range(len(steps["values"]))]
with open(os.path.join(output_dir, 'steps.csv'), 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(["unix_timestamp", "steps"])
writer.writerows([[ts, step] for ts, step in zip(timestamp, steps["values"])])
How to run the example script
With the avro package installed, enter the path of one Avro file and the output directory path in the script:
avro_file_path = "[insert the avro filename with the full path]"
output_dir = "[insert the path to a directory where to save csv files]"
Save the script and run it.
- Install Python from Download Python
- Open a terminal and run
pip3 install avro
- Download this file: avro_to_csv_example_script.py
- Open the downloaded file in a text editor and replace the following:
-
avro_file_path with the path of one Avro file, for example:
avro_file_path = "/Users/your_ursename/RawData/Avros/1-1-001_1671668087.avro"
-
output_dir with the path of your chosen output folder, for example:
output_dir = "/Users/your_ursename/RawData/Output/"
- Save the file
-
avro_file_path with the path of one Avro file, for example:
- Run the script:
- On the terminal, write
python3
, press space on your keyboard, then drag and drop avro_to_csv_example_script.py on the terminal. For example:
python3 /Users/your_ursename/RawData/avro_to_csv_example_script.py
- Press enter to run the script.
- On the terminal, write
- Install Python from Download Python
- Make sure to add Python to PATH during installation
- Open a terminal and run
pip install avro
- Download this file: avro_to_csv_example_script.py
- Open the downloaded file in a text editor and replace the following:
-
avro_file_path with the path of one Avro file, for example:
avro_file_path = "C:/RawData/Avros/1-1-001_1671668087.avro"
-
output_dir with the path of your chosen output folder, for example:
output_dir = "C:/RawData/Output/"
- Save the file
-
avro_file_path with the path of one Avro file, for example:
- Run the script:
- On the terminal, write
python
, press space on your keyboard, then drag and drop avro_to_csv_example_script.py on the terminal. For example:
python C:/RawData/avro_to_csv_example_script.py
- Press enter to run the script.
- On the terminal, write
The output directory will now contain 8 CSV files with raw data extracted from the Avro file:
- accelerometer.csv
- bvp.csv
- eda.csv
- gyroscope.csv
- steps.csv
- systolic_peaks.csv
- tags.csv
- temperature.csv