-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
173 lines (130 loc) · 6.38 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# Importing libraries for communicating with Arduino
import serial
ser = serial.Serial('/dev/tty.usbmodem2101', 115200) # open serial port
print("Connected to ", ser.name) # check which port was really used
state = 0
"""
Estimate Relaxation from Band Powers
This example shows how to buffer, epoch, and transform EEG data from a single
electrode into values for each of the classic frequencies (e.g. alpha, beta, theta)
Furthermore, it shows how ratios of the band powers can be used to estimate
mental state for neurofeedback.
The neurofeedback protocols described here are inspired by
*Neurofeedback: A Comprehensive Review on System Design, Methodology and Clinical Applications* by Marzbani et. al
Adapted from https://github.com/NeuroTechX/bci-workshop
"""
import numpy as np # Module that simplifies computations on matrices
import matplotlib.pyplot as plt # Module used for plotting
from pylsl import StreamInlet, resolve_byprop # Module to receive EEG data
import utils # Our own utility functions
# Handy little enum to make code more readable
class Band:
Delta = 0
Theta = 1
Alpha = 2
Beta = 3
""" EXPERIMENTAL PARAMETERS """
# Modify these to change aspects of the signal processing
# Length of the EEG data buffer (in seconds)
# This buffer will hold last n seconds of data and be used for calculations
BUFFER_LENGTH = 5
# Length of the epochs used to compute the FFT (in seconds)
EPOCH_LENGTH = 1
# Amount of overlap between two consecutive epochs (in seconds)
OVERLAP_LENGTH = 0.8
# Amount to 'shift' the start of each next consecutive epoch
SHIFT_LENGTH = EPOCH_LENGTH - OVERLAP_LENGTH
# Index of the channel(s) (electrodes) to be used
# 0 = left ear, 1 = left forehead, 2 = right forehead, 3 = right ear
INDEX_CHANNEL = [0]
if __name__ == "__main__":
""" 1. CONNECT TO EEG STREAM """
# Search for active LSL streams
print('Looking for an EEG stream...')
streams = resolve_byprop('type', 'EEG', timeout=2)
if len(streams) == 0:
raise RuntimeError('Can\'t find EEG stream.')
# Set active EEG stream to inlet and apply time correction
print("Start acquiring data")
inlet = StreamInlet(streams[0], max_chunklen=12)
eeg_time_correction = inlet.time_correction()
# Get the stream info and description
info = inlet.info()
description = info.desc()
# Get the sampling frequency
# This is an important value that represents how many EEG data points are
# collected in a second. This influences our frequency band calculation.
# for the Muse 2016, this should always be 256
fs = int(info.nominal_srate())
""" 2. INITIALIZE BUFFERS """
# Initialize raw EEG data buffer
eeg_buffer = np.zeros((int(fs * BUFFER_LENGTH), 1))
filter_state = None # for use with the notch filter
# Compute the number of epochs in "buffer_length"
n_win_test = int(np.floor((BUFFER_LENGTH - EPOCH_LENGTH) /
SHIFT_LENGTH + 1))
# Initialize the band power buffer (for plotting)
# bands will be ordered: [delta, theta, alpha, beta]
band_buffer = np.zeros((n_win_test, 4))
""" 3. GET DATA """
# The try/except structure allows to quit the while loop by aborting the
# script with <Ctrl-C>
print('Press Ctrl-C in the console to break the while loop.')
try:
# The following loop acquires data, computes band powers, and calculates neurofeedback metrics based on those band powers
while True:
""" 3.1 ACQUIRE DATA """
# Obtain EEG data from the LSL stream
eeg_data, timestamp = inlet.pull_chunk(
timeout=1, max_samples=int(SHIFT_LENGTH * fs))
# Only keep the channel we're interested in
ch_data = np.array(eeg_data)[:, INDEX_CHANNEL]
# Update EEG buffer with the new data
eeg_buffer, filter_state = utils.update_buffer(
eeg_buffer, ch_data, notch=True,
filter_state=filter_state)
""" 3.2 COMPUTE BAND POWERS """
# Get newest samples from the buffer
data_epoch = utils.get_last_data(eeg_buffer,
EPOCH_LENGTH * fs)
# Compute band powers
band_powers = utils.compute_band_powers(data_epoch, fs)
band_buffer, _ = utils.update_buffer(band_buffer,
np.asarray([band_powers]))
# Compute the average band powers for all epochs in buffer
# This helps to smooth out noise
smooth_band_powers = np.mean(band_buffer, axis=0)
# print('Delta: ', band_powers[Band.Delta], ' Theta: ', band_powers[Band.Theta],
# ' Alpha: ', band_powers[Band.Alpha], ' Beta: ', band_powers[Band.Beta])
""" 3.3 COMPUTE NEUROFEEDBACK METRICS """
# These metrics could also be used to drive brain-computer interfaces
# Alpha Protocol:
# Simple redout of alpha power, divided by delta waves in order to rule out noise
# alpha_metric = smooth_band_powers[Band.Alpha] / \
# smooth_band_powers[Band.Delta]
# print('Alpha Relaxation: ', alpha_metric)
# Beta Protocol:
# Beta waves have been used as a measure of mental activity and concentration
# This beta over theta ratio is commonly used as neurofeedback for ADHD
beta_metric = smooth_band_powers[Band.Beta] / \
smooth_band_powers[Band.Theta]
print('Beta Concentration: ', beta_metric)
# Alpha/Theta Protocol:
# This is another popular neurofeedback metric for stress reduction
# Higher theta over alpha is supposedly associated with reduced anxiety
# theta_metric = smooth_band_powers[Band.Theta] / \
# smooth_band_powers[Band.Alpha]
# print('Theta Relaxation: ', theta_metric)
if (beta_metric > 0.5) :
new_state = 2 # Excited
elif (beta_metric > 0.3) :
new_state = 1 # Neutral
else :
new_state = 0 # Relaxed
if (new_state != state) :
state = new_state
ser.write((f"{new_state}").encode())
print('State: ', state)
except KeyboardInterrupt:
print('Closing!')
ser.close() # close port