Source code for neurone_loader.loader

# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#  This file (loader.py) is part of neurone_loader                             -
#  (https://www.github.com/heilerich/neurone_loader)                           -
#  Copyright © 2019 Felix Heilmeyer.                                           -
#                                                                              -
#  This code is released under the MIT License                                 -
#  https://opensource.org/licenses/mit-license.php                             -
#  Please see the file LICENSE for details.                                    -
# ------------------------------------------------------------------------------
"""
Provides classes to load, represent and export data recorded with the Bittium NeurOne device.
"""

import os
import pandas as pd
import numpy as np
from operator import indexOf

from . import neurone as nr
from .lazy import Lazy, preloadable
from .mne_export import MneExportable
from .util import logger, doc_inherit


# noinspection PyAbstractClass
class BaseContainer(MneExportable):
    """
    A base class that provides properties for accessing data shared between all subclasses.
    It cannot be used by itself, as it does not implement all required methods of its
    abstract superclass.
    """

    def __init__(self):
        self._dropped_channels = set()

    @property
    def sampling_rate(self):
        """
        :return: the sampling rate, read from the session protocol
        :rtype: int
        """
        return self._protocol['meta']['sampling_rate'] if hasattr(self, '_protocol') else None

    def _protocol_channels(self):
        return self._protocol['channels'] if hasattr(self, '_protocol') else []

    def _drop_indexes(self):
        return sorted([indexOf(self._protocol_channels(), channel)
                       for channel in self._dropped_channels], reverse=True)

    @Lazy
    def channels(self):
        """
        :return: ordered list of all channel names, read from the session protocol
        :rtype: list[str]
        """
        return [channel for channel in self._protocol_channels()
                if channel not in self._dropped_channels]

    def _has_data(self):
        private_attribute_name = getattr(type(self), 'data').private_name
        return hasattr(self, private_attribute_name)

    def _extend_droplist(self, channels_to_drop):
        self._dropped_channels |= set(channels_to_drop)
        if type(getattr(type(self), 'channels')) is Lazy:
            if hasattr(self, getattr(type(self), 'channels').private_name):
                self.channels = [channel for channel in self.channels
                                 if channel not in self._dropped_channels]

    def drop_channels(self, channels_to_drop):
        """
        Remove the specified channels from loaded data. Dropped channels are remembered: when
        data is cleared from memory and reloaded from disk, the channels are removed again. To
        get them back, create a new object of this type to reload from disk.

        :param channels_to_drop: names of channels to drop
        :type channels_to_drop: list[str]
        """
        drop_set = (set(channels_to_drop) - self._dropped_channels).intersection(set(self.channels))
        not_dropping = set(channels_to_drop) - drop_set
        if len(not_dropping) > 0:
            logger.warning('Not dropping channels {channels} since they don\'t exist '
                           'or have already been dropped'.format(channels=', '.join(not_dropping)))
        logger.debug('Dropping channels {channels}'.format(channels=', '.join(drop_set)))
        self._extend_droplist(drop_set)
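
# A minimal usage sketch of the dropping behaviour above; the path and channel names
# are hypothetical placeholders, not part of the library:
#
#     >>> from neurone_loader import Session
#     >>> session = Session('./my_recording/Session1')  # hypothetical session folder
#     >>> session.drop_channels(['EMG1', 'EMG2'])
#     >>> 'EMG1' in session.channels
#     False
#
# Because dropped channels are kept in _dropped_channels, they stay removed even after
# clear_data() and a lazy reload from disk.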

@preloadable
class Phase(BaseContainer):
    """
    Represents one recording phase of one NeurOne session in one NeurOne Recording.

    :param path: path to the recording *session* folder
    :param phase: phase object from a session protocol
    :type path: str
    :type phase: dict
    """

    def __init__(self, path, phase, protocol=None):
        BaseContainer.__init__(self)
        self.path = path
        self.number = phase['number']
        if protocol is None:
            self._protocol = nr.read_neurone_protocol(self.path)
        else:
            self._protocol = protocol
        self.time_start = phase['time_start']
        self.time_stop = phase['time_stop']

    @Lazy
    def events(self):
        """
        :return: recorded events with Revision, Type, SourcePort, ChannelNumber, Code,
                 StartSampleIndex, StopSampleIndex, DescriptionLength, DescriptionOffset,
                 DataLength, DataOffset, StartTime, StopTime
        :rtype: pandas.DataFrame
        """
        return pd.DataFrame(nr.read_neurone_events(self.path, self.number, self.sampling_rate)['events'])

    @property
    def event_codes(self):
        """
        :return: all event codes used in the data as int32 in a numpy.ndarray
        :rtype: numpy.ndarray
        """
        return np.unique(self.events['Code'].values) if 'Code' in self.events else []

    @Lazy
    def data(self):
        """
        :return: recorded data with shape (samples, channels) in µV
        :rtype: numpy.ndarray
        """
        data = nr.read_neurone_data(self.path, self.number, self._protocol) / 1000  # raw data is in nanovolts
        return np.delete(data, self._drop_indexes(), axis=1)

    @property
    def n_samples(self):
        """
        :return: the number of samples, inferred from the binary recording's file size
        :rtype: int
        """
        return nr.read_neurone_data_info(self.path, self.number, self._protocol).n_samples

    @property
    def n_channels(self):
        """
        :return: the number of channels, read from the session protocol
        :rtype: int
        """
        return (nr.read_neurone_data_info(self.path, self.number, self._protocol).n_channels
                - len(self._dropped_channels))

    def clear_data(self):
        """
        Remove loaded data from memory.
        """
        del self.data

    # noinspection PyMissingOrEmptyDocstring
    @doc_inherit
    def drop_channels(self, channels_to_drop):
        if self._has_data():
            drop_indexes = sorted([indexOf(self.channels, channel) for channel in channels_to_drop
                                   if channel in self.channels],
                                  reverse=True)  # ignore channels that were dropped before
            # noinspection PyAttributeOutsideInit
            self.data = np.delete(self.data, drop_indexes, axis=1)
        BaseContainer.drop_channels(self, channels_to_drop)
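
# Phase objects are normally created by a Session, but they can be built directly from a
# session protocol. A sketch, with a hypothetical session folder:
#
#     >>> protocol = nr.read_neurone_protocol('./my_recording/Session1')  # hypothetical path
#     >>> phase = Phase('./my_recording/Session1', protocol['phases'][0], protocol)
#     >>> phase.data.shape      # (samples, channels); loaded lazily on first access
#     >>> phase.events['Code']  # event codes as a pandas column
#     >>> phase.clear_data()    # free the samples; the next access reloads from disk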

@preloadable
class Session(BaseContainer):
    """
    Represents one session in one NeurOne Recording and contains all of the session's phases.

    :param path: path to the recording *session* folder
    :type path: str
    """

    def __init__(self, path):
        BaseContainer.__init__(self)
        self.path = path
        self._protocol = nr.read_neurone_protocol(self.path)
        self._get_meta()

    def _get_meta(self):
        self.time_start = self._protocol['meta']['time_start']
        self.time_stop = self._protocol['meta']['time_stop']
        assert len(self._protocol['phases']) > 0, \
            "Session at {} has no phases".format(self.path)
        self.phases = [Phase(self.path, p, self._protocol) for p in self._protocol['phases']]

    @property
    def event_codes(self):
        """
        :return: all event codes used in the data as int32 in a :class:`numpy.ndarray`
        :rtype: numpy.ndarray
        """
        return np.unique(np.concatenate([phase.event_codes for phase in self.phases]))

    @Lazy
    def data(self):
        """
        .. warning:: Calling this replaces the data attribute of the contained phases with a view
           on the concatenated data to save memory. Keep this in mind when manipulating the
           contained phases.

        :return: concatenated data of all phases with shape (samples, channels) in µV
        :rtype: numpy.ndarray
        """
        phases = sorted(self.phases, key=lambda phase: phase.number)
        new_array = None
        slices = []
        for p in phases:
            p.drop_channels(list(self._dropped_channels))
            if new_array is None:
                new_array = np.copy(p.data, order='C')
                slices.append((0, len(p.data)))
            else:
                old_length = len(new_array)
                shape = list(new_array.shape)
                shape[0] += len(p.data)
                # data is explicitly copied above and the following slices are appended,
                # hence also copied, so resizing in place is safe here
                new_array.resize(shape, refcheck=False)
                new_array[-len(p.data):] = p.data
                slices.append((old_length, old_length + len(p.data)))
            del p.data
        for index, p in enumerate(phases):
            start, stop = slices[index]
            p.data = new_array[start:stop]
        return new_array
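
    # Note on the concatenation above (descriptive, not additional API): ndarray.resize
    # with refcheck=False enlarges new_array without checking for outstanding references,
    # and each phase's data attribute is afterwards rebound to a slice view of new_array,
    # so phase.data shares memory with the session-level array instead of keeping a copy.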

    def clear_data(self):
        """
        Remove loaded data in all phases from memory.
        """
        for p in self.phases:
            p.clear_data()
        del self.data

    @property
    def events(self):
        """
        :return: concatenated events of all phases with Revision, Type, SourcePort, ChannelNumber,
                 Code, StartSampleIndex, StopSampleIndex, DescriptionLength, DescriptionOffset,
                 DataLength, DataOffset, StartTime, StopTime
        :rtype: pandas.DataFrame
        """
        phases = sorted(self.phases, key=lambda p: p.number)
        all_events = [phases[0].events]
        current_samples = phases[0].n_samples
        for i in range(1, len(phases)):
            if len(phases[i].events) > 0:
                cur_events = phases[i].events.copy()
                cur_events['StartSampleIndex'] += current_samples
                cur_events['StopSampleIndex'] += current_samples
                cur_time = int(current_samples / self.sampling_rate)
                cur_events['StartTime'] += cur_time
                cur_events['StopTime'] += cur_time
                all_events.append(cur_events)
            # advance the offset even for phases without events, otherwise the sample
            # indexes of all following phases would be shifted
            current_samples += phases[i].n_samples
        return pd.concat(all_events)

    @property
    def n_samples(self):
        """
        :return: sum of the number of samples, inferred from the binary recording's file size,
                 of all phases
        :rtype: int
        """
        return sum([p.n_samples for p in self.phases])

    @property
    def n_channels(self):
        """
        Returns the number of channels used in all phases and makes sure they're equal.

        :return: the number of channels, read from the session protocol
        :rtype: int
        """
        assert len(set([p.n_channels for p in self.phases])) <= 1, \
            "The number of channels shouldn't change between phases"
        return self.phases[0].n_channels if len(self.phases) > 0 else 0

    # noinspection PyMissingOrEmptyDocstring
    @doc_inherit
    def drop_channels(self, channels_to_drop):
        if self._has_data():
            drop_indexes = sorted([indexOf(self.channels, channel) for channel in channels_to_drop
                                   if channel in self.channels],
                                  reverse=True)  # ignore channels that were dropped before
            # noinspection PyAttributeOutsideInit
            self.data = np.delete(self.data, drop_indexes, axis=1)
            p_offset = 0
            for phase in self.phases:
                phase._extend_droplist(channels_to_drop)
                data_length = phase.data.shape[0]
                phase.data = self.data[p_offset:p_offset + data_length]
                p_offset += data_length
        else:
            for phase in self.phases:
                phase.drop_channels(channels_to_drop)
        BaseContainer.drop_channels(self, channels_to_drop)
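
# A sketch of the concatenation semantics, with hypothetical numbers: if phase 1 holds
# 1000 samples and the first event of phase 2 starts at its local sample 50, that event
# appears in session.events with StartSampleIndex 1050, matching its row in the
# concatenated session.data array.
#
#     >>> session = Session('./my_recording/Session1')  # hypothetical path
#     >>> session.data.shape[0] == sum(p.n_samples for p in session.phases)
#     True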

@preloadable
class Recording(BaseContainer):
    """
    Represents one NeurOne Recording and contains all of the recording's sessions.

    :param path: path to the *recording* folder
    :type path: str
    """

    def __init__(self, path):
        BaseContainer.__init__(self)
        self.path = path
        self._find_sessions()

    def _find_sessions(self):
        session_dirs = [os.path.join(self.path, dirname) for dirname in os.listdir(self.path)
                        if os.path.isdir(os.path.join(self.path, dirname))
                        and 'Protocol.xml' in os.listdir(os.path.join(self.path, dirname))]
        assert len(session_dirs) > 0, "No sessions found in {}".format(self.path)
        self.sessions = list(sorted([Session(path) for path in session_dirs],
                                    key=lambda s: s.time_start))

    @property
    def event_codes(self):
        """
        :return: all event codes used in the data as int32 in a numpy.ndarray
        :rtype: numpy.ndarray
        """
        return np.unique(np.concatenate([session.event_codes for session in self.sessions]))

    @Lazy
    def data(self):
        """
        :return: concatenated data of all phases of all sessions with shape (samples, channels) in µV
        :rtype: numpy.ndarray

        .. warning:: Calling this replaces the data attribute of the contained phases and sessions
           with a view on the concatenated data to save memory. Keep this in mind when manipulating
           the contained sessions or phases.
        """
        sessions = sorted(self.sessions, key=lambda x: x.time_start)
        new_array = None
        slices = []
        all_phase_slices = []
        for s in sessions:
            old_length = len(new_array) if new_array is not None else 0
            if s._has_data():
                new_length = old_length + len(s.data)
            else:
                new_length = old_length + s.n_samples
            del s.data
            phases = sorted(s.phases, key=lambda phase: phase.number)
            phase_slices = []
            for p in phases:
                p.drop_channels(list(self._dropped_channels))
                if new_array is None:
                    new_array = np.copy(p.data, order='C')
                    phase_slices.append((0, len(p.data)))
                else:
                    old_phase_length = len(new_array)
                    shape = list(new_array.shape)
                    shape[0] += len(p.data)
                    # data is explicitly copied above and the following slices are appended,
                    # hence also copied, so resizing in place is safe here
                    new_array.resize(shape, refcheck=False)
                    new_array[-len(p.data):] = p.data
                    phase_slices.append((old_phase_length, old_phase_length + len(p.data)))
                del p.data
            all_phase_slices.append(phase_slices)
            slices.append((old_length, new_length))
        for s_index, s in enumerate(sessions):
            for p_index, p in enumerate(s.phases):
                start, stop = all_phase_slices[s_index][p_index]
                p.data = new_array[start:stop]
            start, stop = slices[s_index]
            s.data = new_array[start:stop]
        return new_array

    def clear_data(self):
        """
        Remove loaded data in all phases of all sessions from memory.
        """
        for s in self.sessions:
            s.clear_data()
        del self.data

    @property
    def events(self):
        """
        :return: concatenated events of all phases of all sessions with Revision, Type, SourcePort,
                 ChannelNumber, Code, StartSampleIndex, StopSampleIndex, DescriptionLength,
                 DescriptionOffset, DataLength, DataOffset, StartTime, StopTime
        :rtype: pandas.DataFrame
        """
        sessions = sorted(self.sessions, key=lambda x: x.time_start)
        assert len(set([s.sampling_rate for s in sessions])) <= 1, \
            'Loading Sessions with different sampling rates is not supported at this time'
        sampling_rate = sessions[0].sampling_rate
        all_events = [sessions[0].events]
        current_samples = sessions[0].n_samples
        for i in range(1, len(sessions)):
            if len(sessions[i].events) > 0:
                cur_events = sessions[i].events.copy()
                cur_events['StartSampleIndex'] += current_samples
                cur_events['StopSampleIndex'] += current_samples
                cur_time = int(current_samples / sampling_rate)
                cur_events['StartTime'] += cur_time
                cur_events['StopTime'] += cur_time
                all_events.append(cur_events)
            # advance the offset even for sessions without events, otherwise the sample
            # indexes of all following sessions would be shifted
            current_samples += sessions[i].n_samples
        return pd.concat(all_events)

    @property
    def n_samples(self):
        """
        :return: sum of the number of samples, inferred from the binary recording's file size,
                 of all phases of all sessions
        :rtype: int
        """
        return sum([s.n_samples for s in self.sessions])

    @property
    def n_channels(self):
        """
        Returns the number of channels used in all sessions and makes sure they're equal.

        :return: the number of channels, read from the session protocol
        :rtype: int
        """
        assert len(set([s.n_channels for s in self.sessions])) <= 1, \
            "The number of channels shouldn't change between sessions"
        return self.sessions[0].n_channels if len(self.sessions) > 0 else 0

    @property
    def sampling_rate(self):
        """
        Returns the sampling rate used in all sessions and makes sure they're all equal.

        :return: the sampling rate, read from the session protocols
        :rtype: int
        """
        assert len(set([s.sampling_rate for s in self.sessions])) <= 1, \
            "The sampling rate shouldn't change between sessions"
        return self.sessions[0].sampling_rate if len(self.sessions) > 0 else 0

    @property
    def channels(self):
        """
        Returns the channels used in all sessions and makes sure they're equal.

        :return: ordered list of all channel names, read from the session protocols
        :rtype: list[str]
        """
        assert len(set([''.join(s.channels) for s in self.sessions])) <= 1, \
            "Channel names shouldn't change between sessions"
        return [channel for channel in self.sessions[0].channels
                if channel not in self._dropped_channels] if len(self.sessions) > 0 else []

    # noinspection PyMissingOrEmptyDocstring
    @doc_inherit
    def drop_channels(self, channels_to_drop):
        if self._has_data():
            drop_indexes = sorted([indexOf(self.channels, channel) for channel in channels_to_drop
                                   if channel in self.channels],
                                  reverse=True)  # ignore channels that were dropped before
            # noinspection PyAttributeOutsideInit
            self.data = np.delete(self.data, drop_indexes, axis=1)
            s_offset = 0
            p_offset = 0
            # update views and droplists
            for session in self.sessions:
                session._extend_droplist(channels_to_drop)
                data_length = session.data.shape[0]
                session.data = self.data[s_offset:s_offset + data_length]
                s_offset += data_length
                for phase in session.phases:
                    phase._extend_droplist(channels_to_drop)
                    data_length = phase.data.shape[0]
                    phase.data = self.data[p_offset:p_offset + data_length]
                    p_offset += data_length
        else:
            for session in self.sessions:
                session.drop_channels(channels_to_drop)
        BaseContainer.drop_channels(self, channels_to_drop)
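
# An end-to-end sketch: Recording, like Session and Phase, is decorated with @preloadable,
# so it accepts a preload keyword to load all data eagerly, and via MneExportable it can
# be converted for use with MNE. The path is a hypothetical placeholder:
#
#     >>> from neurone_loader import Recording
#     >>> rec = Recording('./my_recording', preload=True)
#     >>> rec.n_samples, rec.n_channels, rec.sampling_rate
#     >>> raw = rec.to_mne()  # export via mne_export.MneExportable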