# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
# This file (neurone.py) is part of neurone_loader -
# (https://www.github.com/heilerich/neurone_loader) -
# Copyright © 2019 Felix Heilmeyer. -
# -
# This code is released under the MIT License -
# https://opensource.org/licenses/mit-license.php -
# Please see the file LICENSE for details. -
# ------------------------------------------------------------------------------
# ------------------------------------------------------------------------------
# This file contains code originally from export2hdf5 -
# (https://github.com/bwrc/export2hdf5) -
# Created by Andreas Henelius <andreas.henelius@ttl.fi>, -
# Finnish Institute of Occupational Health -
# ------------------------------------------------------------------------------
"""
Contains functions for reading data recorded with a
Bittium NeurOne device. This module currently supports
reading of data and events.
"""
import numpy as np
import xml.etree.ElementTree
from os import path
from construct import Struct, Int32sl, Int64ul
from datetime import datetime
from collections import namedtuple
[docs]def read_neurone_protocol(fpath):
"""
Read the measurement protocol from an XML file.
Arguments:
- fpath: the path to the directory holding the
NeurOne measurement (i.e., the
directory Protocol.xml and Session.xml
files.
Returns:
- a dictionary containing (i) the names of the channels
in the recording and (ii) meta information
(recording start/stop times, sampling rate).
{"meta" : <dict with metadata>,
"channels" : <array with channel names>}
"""
# Define filename
fname_protocol = path.join(fpath, "Protocol.xml")
fname_session = path.join(fpath, "Session.xml")
# --------------------------------------------------
# Read the protocol data
# --------------------------------------------------
# Define the XML namespace as a shorthand
ns = {'xmlns': 'http://www.megaemg.com/DataSetGeneralProtocol.xsd'}
# Get channel names and organise them according to their
# physical order (InputNumber), which is the order
# in which the channels are being sampled.
doc_root = xml.etree.ElementTree.parse(fname_protocol).getroot()
channels = doc_root.findall("xmlns:TableInput", namespaces=ns)
channel_names = [(0, 0)] * len(channels)
for i, ch in enumerate(channels):
channel_names[i] = (int(ch.findall("xmlns:PhysicalInputNumber", namespaces=ns)[0].text),
ch.findall("xmlns:Name", namespaces=ns)[0].text)
channel_names = [i for _, i in sorted(channel_names)]
# Get the sampling rate
sampling_rate = int(doc_root.findall("xmlns:TableProtocol", namespaces=ns)[0]
.findall("xmlns:ActualSamplingFrequency", namespaces=ns)[0].text)
# --------------------------------------------------
# Read the session data
# --------------------------------------------------
# Define the XML namespace as a shorthand
ns2 = {'xmlns': 'http://www.megaemg.com/DataSetGeneralSession.xsd'}
# Get channel names and organise them according to their
# physical order (InputNumber), which is the order
# in which the channels are being sampled.
doc_root = xml.etree.ElementTree.parse(fname_session).getroot()
session = doc_root.findall("xmlns:TableSession", namespaces=ns2)
time_start = session[0].findall("xmlns:StartDateTime", namespaces=ns2)[0].text
time_stop = session[0].findall("xmlns:StopDateTime", namespaces=ns2)[0].text
phases = [{'number': phase.findall("xmlns:Folder", namespaces=ns2)[0].text.split("\\")[-1],
'time_start': phase.findall("xmlns:StartDateTime", namespaces=ns2)[0].text,
'time_stop': phase.findall("xmlns:StopDateTime", namespaces=ns2)[0].text}
for phase in doc_root.findall("xmlns:TableSessionPhase", namespaces=ns2)]
# --------------------------------------------------
# Package the information
# --------------------------------------------------
meta = {}
def _convert_time(time_str):
time_str = time_str[0:time_str.index('+')]
if len(time_str) > 26:
time_str = time_str[0:26]
return datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%f")
meta["time_start"] = _convert_time(time_start)
meta["time_stop"] = _convert_time(time_stop)
meta["sampling_rate"] = sampling_rate
for phase in phases:
phase['time_start'] = _convert_time(phase['time_start'])
phase['time_stop'] = _convert_time(phase['time_stop'])
return {'channels': channel_names, 'meta': meta, 'phases': phases}
[docs]def read_neurone_data(fpath, session_phase=1, protocol=None):
"""
Read the NeurOne signal data from a binary file.
Arguments:
- fpath: the path to the directory holding the
NeurOne measurement (i.e., the
directory Protocol.xml and Session.xml
files.
- session_phase:
The phase of the measurement. Currently
only reading of the first phase (1) is
supported.
- protocol:
The dictionary obtained using the function
read_neurone_protocol. This argument is optional
and if not given, the protocol is automatically read.
Returns:
- A numpy ndarray with the data, where each columns stores
the data for one channel.
"""
fname = path.join(fpath, str(session_phase), '1.bin')
# Read the protocol unless provided
if protocol is None:
protocol = read_neurone_protocol(fpath)
# Determine number of samples to read
n_samples, n_channels = read_neurone_data_info(fpath, session_phase, protocol)
# Read the data and store the data
# in an ndarray
data = np.fromfile(fname, dtype='<i4')
data.shape = (n_samples, n_channels)
return data
[docs]def read_neurone_data_info(fpath, session_phase=1, protocol=None):
"""
Read the sample and channel count from a NeurOne signal binary file.
Arguments:
- fpath: the path to the directory holding the
NeurOne measurement (i.e., the
directory Protocol.xml and Session.xml
files.
- session_phase:
The phase of the measurement. Currently
only reading of the first phase (1) is
supported.
- protocol:
The dictionary obtained using the function
read_neurone_protocol. This argument is optional
and if not given, the protocol is automatically read.
Returns:
Returns:
- a named tuple containing (i) the number of channels
and (ii) the number of samples in the recording.
( n_samples, n_channels )
"""
fname = path.join(fpath, str(session_phase), '1.bin')
# Read the protocol unless provided
if protocol is None:
protocol = read_neurone_protocol(fpath)
# Determine number of samples and channels
f_info = path.getsize(fname)
n_channels = len(protocol['channels'])
n_samples = int(f_info / 4 / n_channels)
DataInfo = namedtuple('DataInfo', ['n_samples', 'n_channels'])
return DataInfo(n_samples, n_channels)
[docs]def read_neurone_events(fpath, session_phase=1, sampling_rate=None):
"""
Read the NeurOne events from a binary file.
Arguments:
- fpath: the path to the directory holding the
NeurOne measurement (i.e., the
directory Protocol.xml and Session.xml
files.
- sampling_rate:
The sampling rate of the recording.
This argument is optional and if not given,
the protocol is automatically read.
- session_phase:
The phase of the measurement. Currently
only reading of the first phase (1) is
supported.
Returns:
- A dict containing the events and the data type for the events.
{"events" : <numpy structured array with the events>,
"events_dtype" : <array with the numpy dtype for the events>}
"""
fname = path.join(fpath, str(session_phase), "events.bin")
# Get the sampling rate unless provided
if sampling_rate is None:
protocol = read_neurone_protocol(fpath)
sampling_rate = protocol['meta']['sampling_rate']
# Determine number of events
f_info = path.getsize(fname)
n_events = int(f_info / 88)
events = [{}] * n_events
# Read events in chunks of 88 bytes and unpack
# also add start / stop time for each event
# and remove 'reserved for future use' (RFU) fields
event_format = get_n1_event_format()
with open(fname, mode='rb') as file:
for i in range(n_events):
events[i] = event_format.parse(file.read(88))
events[i]['StartTime'] = events[i]['StartSampleIndex'] / sampling_rate
events[i]['StopTime'] = events[i]['StopSampleIndex'] / sampling_rate
for j in range(5):
del events[i]['RFU' + str(j+1)]
del events[i]['_io']
# Create a numpy structured array from the events
events_dtype = np.dtype([("Revision", np.int32),
("Type", np.int32),
("SourcePort", np.int32),
("ChannelNumber", np.int32),
("Code", np.int32),
("StartSampleIndex", np.int64),
("StopSampleIndex", np.int64),
("DescriptionLength", np.int64),
("DescriptionOffset", np.int64),
("DataLength", np.int64),
("DataOffset", np.int64),
("StartTime", np.int64),
("StopTime", np.int64)])
# convert array of event dicts to an array of tuples
if len(events) == 0:
return {'events': np.array([], dtype=events_dtype), 'dtype': events_dtype}
key_list = [k for k, v in events[0].items()]
tmp = [tuple([e[k] for k in key_list]) for e in events]
events = np.array(tmp, dtype=events_dtype)
return {'events': events, 'dtype': events_dtype}