# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
# This file (mne_export.py) is part of neurone_loader -
# (https://www.github.com/heilerich/neurone_loader) -
# Copyright © 2019 Felix Heilmeyer. -
# -
# This code is released under the MIT License -
# https://opensource.org/licenses/mit-license.php -
# Please see the file LICENSE for details. -
# ------------------------------------------------------------------------------
"""
Provides the metaclass `MneExportable` that allows subclasses implementing all the metaclass's properties
to be converted to a `mne.io.RawArray`.
"""
import numpy as np
import abc
from copy import deepcopy
from .util import logger
# compatible with Python 2 *and* 3:
ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()})
[docs]class UnknownChannelException(Exception):
"""
Raised if data contains a channel name that is neither in a list of well-known channels nor in an (optional) list
of user supplied channel name to channel type mappings.
"""
pass
[docs]class MneExportable(ABC):
"""
A metaclass that provides a function allowing objects that expose data, events, channels and sampling_rate
properties to be converted to an mne.io.RawArray.
"""
def _import_mne(self):
try:
# noinspection PyPackageRequirements
import mne
self._mne = mne
return True
except ImportError:
logger.error("To convert data to an MNE object you must install MNE "
"and all its dependencies.")
return False
[docs] def to_mne(self, substitute_zero_events_with=None, copy=False, channel_type_mappings=None):
"""
Convert loaded data to a mne.io.RawArray
:param substitute_zero_events_with: None. events with code = 0 are not supported by MNE, if this parameter is
set, the event code 0 will be substituted with this parameter
:param copy: False. If False (default), the original data will be removed from memory to save space while
creating the mne.io.RawArray. If the data is needed again it must be reloaded from disk
:param channel_type_mappings: Optional. You can provide a dictionary of channel name to type mappings. If the
data contains any channel not in the list of well-known channel names and not in
this mapping the conversion will raise UnknownChannelException. You can choose
to map any unknown channel to one specific type e.g. {'#unknown': 'eeg'}. For a
list of available types see the documentation of :func:`mne.pick_types`. This
setting takes precedence over the built-in list of common channel names.
:type substitute_zero_events_with: None or int
:type copy: bool
:type channel_type_mappings: None or dict
:return: the converted data
:rtype: mne.io.RawArray
:raises ImportError: if the mne package is not installed
:raises UnknownChannelException: if a unknown channel name is encountered (see channel_type_mappings parameter)
"""
if not hasattr(self, '_mne'):
if not self._import_mne():
raise ImportError
mne = self._mne
allowed_channel_types = ['meg', 'eeg', 'stim', 'eog', 'ecg', 'emg', 'misc', 'resp', 'chpi', 'exci', 'ias',
'syst', 'seeg', 'dipole', 'gof', 'bio', 'ecog', 'fnirs']
if channel_type_mappings is not None:
for value in channel_type_mappings.values():
assert value in allowed_channel_types, '{type} is not a recognized mne channel type'.format(type=value)
events = self.events
def _channel_type(name):
mappings = [
(['emg'], 'emg'),
(['eog'], 'eog'),
(['ecg'], 'ecg'),
(['Microphone', 'GSR'], 'bio'),
(_default_eeg_channel_names, 'eeg')
]
def _get_common_type(ch_name):
for starts, ch_type in mappings:
for start in starts:
if ch_name.lower().startswith(start.lower()):
return ch_type
return None
if channel_type_mappings is not None:
if name in channel_type_mappings:
assert channel_type_mappings[name] in allowed_channel_types, '{type} is not a recognized mne' \
' channel type'.format(type=value)
return channel_type_mappings[name]
elif _get_common_type(name) is not None:
return _get_common_type(name)
elif '#unknown' in channel_type_mappings:
unknown_channel_type = channel_type_mappings['#unknown']
logger.warning('Could not properly classify {channel}. It will be classified as {default} channel '
'because {default} was set as default for unknown channels.'
.format(channel=name, default=unknown_channel_type))
return unknown_channel_type
else:
if _get_common_type(name) is not None:
return _get_common_type(name)
logger.error('Encountered channel ({channel}) which is not in the list of well-known channels'
'and no user defined mapping was supplied.'.format(channel=name))
raise UnknownChannelException
channel_types = list(map(_channel_type, self.channels))
assert len(channel_types) == len(self.channels)
data_length = self.data.T.shape[1]
data_dtype = self.data.dtype
ssc = [(start, stop, code) for (start, stop, code)
in events[['StartSampleIndex', 'StopSampleIndex', 'Code']].values]
if substitute_zero_events_with is not None:
event_codes = np.unique(events['Code'].values)
assert type(substitute_zero_events_with) is int, 'substitute_zero_events_with must be int or None'
assert substitute_zero_events_with not in event_codes, \
"""the original data can't contain event with code
substitute_zero_events_with ({})""".format(substitute_zero_events_with)
stim_channel_names = ['STI 014']
stim_channels = [np.zeros(data_length, dtype=data_dtype)]
def _add_stim_channel():
index = len(stim_channel_names)
stim_channel_names.append('STI {:03g}'.format(index + 14))
stim_channels.append(np.zeros(data_length, dtype=data_dtype))
for start, stop, code in ssc:
channel_index = 0
if substitute_zero_events_with is not None:
if code == 0:
code = substitute_zero_events_with
else:
assert code != 0, "events with event code 0 are not supported by MNE, use the " \
"substitute_zero_events_with parameter of this method to substitute with an " \
"alternative code"
corrected_start = start - 1 if start != 0 else 0 # sample before event start needs to be 0
while not (stim_channels[channel_index][corrected_start:stop + 1] == 0).all():
channel_index += 1
if channel_index >= len(stim_channels):
_add_stim_channel()
if channel_index != 0:
logger.warning('Event (code {current_code}) has a concurrent event between samples ({start}) and '
'({stop}). An additional stim channel will be used/created: ({channel}).'
.format(current_code=code, start=start, stop=stop,
channel=stim_channel_names[channel_index]))
stim_channels[channel_index][start:stop + 1] += code # event must be at least one sample long
stim_info = mne.create_info(ch_names=stim_channel_names,
sfreq=self.sampling_rate,
ch_types='stim')
stim_cnt = mne.io.RawArray(stim_channels, stim_info, verbose='WARNING')
channel_names = deepcopy(self.channels)
channel_names.extend(stim_channel_names)
all_channel_types = deepcopy(channel_types)
all_channel_types.extend(['stim' for _ in stim_channel_names])
info = mne.create_info(ch_names=self.channels,
sfreq=self.sampling_rate,
ch_types=channel_types)
# data is µV samples x channels, mne needs V channels x samples
data = self.data.T / (1000 * 1000)
cnt = mne.io.RawArray(data, info)
if not copy:
self.clear_data()
cnt = cnt.add_channels([stim_cnt])
return cnt
@property
@abc.abstractmethod
def data(self):
"""
Abstract Property
:return: should contain data in (n_samples, n_channels) shape
:rtype: numpy.ndarray
"""
[docs] @abc.abstractmethod
def clear_data(self):
"""
Abstract Method
Should delete loaded data from memory
"""
@property
@abc.abstractmethod
def events(self):
"""
Abstract Property
:return: should contain the events as a DataFrame, required fields are `StartSampleIndex`, `StopSampleIndex`
and `Code`. Additional fields are ignored.
:rtype: pandas.DataFrame
"""
@property
@abc.abstractmethod
def channels(self):
"""
Abstract Property
:return: should contain the names of channels, matching the sequence used in the data property
:rtype: list[str]
"""
@property
@abc.abstractmethod
def sampling_rate(self):
"""
Abstract Property
:return: should contain the used sampling rate
:rtype: int
"""
_default_eeg_channel_names = ['Fp1', 'Fpz', 'Fp2', 'F7', 'F3', 'Fz', 'F4', 'F8', 'FC5', 'FC1', 'FC2', 'FC6', 'M1', 'T7',
'C3', 'Cz', 'C4', 'T8', 'M2', 'CP5', 'CP1', 'CP2', 'CP6', 'P7', 'P3', 'Pz', 'P4', 'P8',
'POz', 'O1', 'Oz', 'O2', 'AF7', 'AF3', 'AF4', 'AF8', 'F5', 'F1', 'F2', 'F6', 'FC3', 'FCz',
'FC4', 'C5', 'C1', 'C2', 'C6', 'CP3', 'CPz', 'CP4', 'P5', 'P1', 'P2', 'P6', 'PO5', 'PO3',
'PO4', 'PO6', 'FT7', 'FT8', 'TP7', 'TP8', 'PO7', 'PO8', 'FT9', 'FT10', 'TPP9h', 'TPP10h',
'PO9', 'PO10', 'P9', 'P10', 'AFF1', 'AFz', 'AFF2', 'FFC5h', 'FFC3h', 'FFC4h', 'FFC6h',
'FCC5h', 'FCC3h', 'FCC4h', 'FCC6h', 'CCP5h', 'CCP3h', 'CCP4h', 'CCP6h', 'CPP5h', 'CPP3h',
'CPP4h', 'CPP6h', 'PPO1', 'PPO2', 'I1', 'Iz', 'I2', 'AFp3h', 'AFp4h', 'AFF5h', 'AFF6h',
'FFT7h', 'FFC1h', 'FFC2h', 'FFT8h', 'FTT9h', 'FTT7h', 'FCC1h', 'FCC2h', 'FTT8h', 'FTT10h',
'TTP7h', 'CCP1h', 'CCP2h', 'TTP8h', 'TPP7h', 'CPP1h', 'CPP2h', 'TPP8h', 'PPO9h', 'PPO5h',
'PPO6h', 'PPO10h', 'POO9h', 'POO3h', 'POO4h', 'POO10h', 'OI1h', 'OI2h']