Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Version 0.6.7

* Issue #12167 - Fixed ZPLSC-B cabled data processing in the playback tool
* Added a flag to indicate the ZPLSC-B driver has completed the processing of a raw data file.
* Used above flag to break out of an infinite loop calling publish.
* Added a 1 second delay in the publishing loop, to allow the ZPLSC-B driver to process.

* Issue #13568 - Added capability of processing a 1-hour data file
* Added option --file - Generate an echogram from a single 1-hour file from the command line
* Added option --all - Generate all echograms for all ZPLSC instruments
Expand Down
7 changes: 4 additions & 3 deletions mi/common/zpls_plot.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,11 @@ def _get_power_range(power_dict):
return min_db, max_db

@staticmethod
def _transpose_and_flip(power_dict):
for channel in power_dict:
def _transpose_and_flip(original_power_dict):
power_dict = {}
for channel in original_power_dict:
# Transpose array data so we have time on the x-axis and depth on the y-axis
power_dict[channel] = power_dict[channel].transpose()
power_dict[channel] = original_power_dict[channel].transpose()
# reverse the Y axis (so depth is measured from the surface (at the top) to the ZPLS (at the bottom)
power_dict[channel] = power_dict[channel][::-1]
return power_dict
Expand Down
16 changes: 8 additions & 8 deletions mi/core/instrument/playback.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,13 @@ def zplsc_playback(self):
if filename:
self.set_header_filename(filename)
log.info("filename is: %s", filename)
if hasattr(self.protocol, 'got_filename'):
self.protocol.got_filename(filename)

pub_index = 0
while True:
while not self.protocol.is_processing_completed():
self.publish()
pub_index = pub_index + 1
log.info("publish index is: %d", pub_index)
time.sleep(1)
self.publish()

def got_data(self, packet):
try:
Expand Down Expand Up @@ -191,8 +190,9 @@ def construct_protocol(self, proto_module):
def publish(self):
    """
    Drain both the event and particle publishers.

    Keeps calling publish() while a publisher reports a backlog at or
    above its max-events batch size, then issues one final publish to
    flush any remaining items.
    """
    for pub in (self.event_publisher, self.particle_publisher):
        remaining = pub.publish()
        while remaining >= pub.get_max_events():
            remaining = pub.publish()
        # one last call to flush anything queued after the loop exited
        pub.publish()

def handle_event(self, event_type, val=None):
"""
Expand Down Expand Up @@ -366,7 +366,8 @@ def main():
files = [files]

zplsc_reader = False

reader = None

if options['datalog']:
reader = DatalogReader
elif options['ascii']:
Expand All @@ -376,14 +377,13 @@ def main():
elif options['zplsc']:
reader = ZplscReader
zplsc_reader = True
else:
reader = None

wrapper = PlaybackWrapper(module, refdes, event_url, particle_url, reader, allowed, files, max_events)
if zplsc_reader:
wrapper.zplsc_playback()
else:
wrapper.playback()


if __name__ == '__main__':
main()
4 changes: 4 additions & 0 deletions mi/core/instrument/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ def _merge_headers(self, headers):
def set_source(self, source):
    """Record *source* under the SOURCE key of the outgoing headers."""
    self._headers[self.SOURCE] = source

def get_max_events(self):
    """Return the publisher's max-events batch size (accessor for _max_events)."""
    return self._max_events

def start(self):
t = Thread(target=self._run)
t.setDaemon(True)
Expand Down Expand Up @@ -209,6 +212,7 @@ def _publish(self, events, headers):
self.total += count
log.info('Publish %d events (%d total)', count, self.total)


class IngestEnginePublisher(Publisher):
""" Publisher used to send particle data to Ingest Engine via a ParticleDataHandler """
def __init__(self, handler, *args, **kwargs):
Expand Down
6 changes: 6 additions & 0 deletions mi/dataset/dataset_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ def setParticleDataCaptureFailure(self):
log.debug("Particle data capture failed")
self._failure = True

def is_particle_data_capture_failure(self):
    """Return True when particle data capture has been flagged as failed."""
    return self._failure

def get_particle_samples(self):
    """Return the collected particle samples (accessor for _samples)."""
    return self._samples


class DataSetDriver(object):
"""
Expand Down
2 changes: 1 addition & 1 deletion mi/dataset/parser/zplsc_c.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class AzfpProfileHeader(BigEndianStructure):
('hour', c_ushort), # 018 - Hour
('minute', c_ushort), # 020 - Minute
('second', c_ushort), # 022 - Second
('hundredths', c_ushort), # 024 - Hundreths of a second
('hundredths', c_ushort), # 024 - Hundredths of a second
('digitization_rate', c_ushort*4), # 026 - Digitization Rate (channels 1-4) (64000, 40000 or 20000)
('lockout_index', c_ushort*4), # 034 - The sample number of samples skipped at start of ping (channels 1-4)
('num_bins', c_ushort*4), # 042 - Number of bins (channels 1-4)
Expand Down
56 changes: 39 additions & 17 deletions mi/instrument/kut/ek60/ooicore/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,9 @@ def __init__(self, prompts, newline, driver_event):

self._chunker = StringChunker(self.sieve_function)

# Used by invoking methods (e.g, the playback tool) to know when the processing has completed.
self._processing_completed = False

log.info('processing particles with %d workers', POOL_SIZE)
self._process_particles = True
self._pending_particles = deque()
Expand All @@ -568,7 +571,7 @@ def particles_thread(self):
while self._process_particles or futures:
# Pull all processing requests from our request deque
# Unless we have been instructed to terminate
while True and self._process_particles:
while self._process_particles:
try:
filepath, timestamp = self._pending_particles.popleft()
log.info('Received RAW file to process: %r %r', filepath, timestamp)
Expand All @@ -577,6 +580,8 @@ def particles_thread(self):
# tuple containing the metadata and timestamp for creation
# of the particle
futures[(filepath, timestamp)] = processing_pool.apply_async(parse_particles_file, (filepath,))
# After receiving a file to process, reset the process completed flag.
self._processing_completed = False
except IndexError:
break

Expand Down Expand Up @@ -615,31 +620,38 @@ def particles_thread(self):
if self._driver_event:
self._driver_event(DriverAsyncEvent.SAMPLE, parsed_sample)

for counter, data_timestamp in enumerate(data_times):
zp_data = {
ZplscBParticleKey.FREQ_CHAN_1: frequencies[1],
ZplscBParticleKey.VALS_CHAN_1: list(power_data_dict[1][counter]),
ZplscBParticleKey.FREQ_CHAN_2: frequencies[2],
ZplscBParticleKey.VALS_CHAN_2: list(power_data_dict[2][counter]),
ZplscBParticleKey.FREQ_CHAN_3: frequencies[3],
ZplscBParticleKey.VALS_CHAN_3: list(power_data_dict[3][counter]),
}
try:
for counter, data_timestamp in enumerate(data_times):
zp_data = {
ZplscBParticleKey.FREQ_CHAN_1: frequencies[1],
ZplscBParticleKey.VALS_CHAN_1: list(power_data_dict[1][counter]),
ZplscBParticleKey.FREQ_CHAN_2: frequencies[2],
ZplscBParticleKey.VALS_CHAN_2: list(power_data_dict[2][counter]),
ZplscBParticleKey.FREQ_CHAN_3: frequencies[3],
ZplscBParticleKey.VALS_CHAN_3: list(power_data_dict[3][counter]),
}

sample_particle = ZplscBSampleDataParticle(zp_data, port_timestamp=timestamp,
internal_timestamp=data_timestamp,
preferred_timestamp=DataParticleKey.INTERNAL_TIMESTAMP)

sample_particle = ZplscBSampleDataParticle(zp_data, port_timestamp=timestamp,
internal_timestamp=data_timestamp,
preferred_timestamp=DataParticleKey.INTERNAL_TIMESTAMP)
parsed_sample_particles = sample_particle.generate()

parsed_sample_particles = sample_particle.generate()
if self._driver_event:
self._driver_event(DriverAsyncEvent.SAMPLE, parsed_sample_particles)
except Exception as ex:
log.error('Error creating sample data particle for %r: %r', filepath, ex)

if self._driver_event:
self._driver_event(DriverAsyncEvent.SAMPLE, parsed_sample_particles)
# Indicate for invoking methods that the file processing has completed.
self._processing_completed = True

time.sleep(1)

finally:
if processing_pool:
processing_pool.terminate()
processing_pool.join()
self._processing_completed = True

def shutdown(self):
log.info('Shutting down ZPLSC protocol')
Expand All @@ -651,6 +663,15 @@ def shutdown(self):
self._particles_thread.join(timeout=600)
log.info('Completed ZPLSC protocol shutdown')

def is_processing_completed(self):
    """
    Report whether the particles thread has finished processing the
    current raw data file.  The playback tool polls this flag while
    ingesting ZPLSC B-series data to know when it may stop publishing.
    :return: True when file processing has completed, otherwise False.
    """
    return self._processing_completed

def _build_param_dict(self):
"""
Populate the parameter dictionary with parameters.
Expand Down Expand Up @@ -1093,5 +1114,6 @@ def _got_chunk(self, chunk, timestamp):
# Queue up this file for processing
self._pending_particles.append((match.group('Filepath'), timestamp))


def create_playback_protocol(callback):
    """
    Build a Protocol instance for playback use.

    Prompts and newline are not needed in playback mode, so None is
    passed for both.

    :param callback: driver event callback handed to the Protocol
    :return: a new Protocol instance
    """
    # Span contained a duplicated return line (diff artifact); a single
    # return is the intended behavior.
    return Protocol(None, None, callback)
17 changes: 13 additions & 4 deletions mi/instrument/kut/ek60/ooicore/zplsc_b.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ def parse_particles_file(input_file_path, output_file_path=None):
# Read binary file a block at a time
raw = input_file.read(BLOCK_SIZE)

datagrams_processed = 0
while len(raw) > 4:
# We only care for the Sample datagrams, skip over all the other datagrams
match = SAMPLE_MATCHER.search(raw)
Expand Down Expand Up @@ -454,6 +455,11 @@ def parse_particles_file(input_file_path, output_file_path=None):
except InvalidTransducer:
pass

# Log progress so the user has knowledge of the progress.
if (datagrams_processed % 5000) == 0:
log.info('Completed processing %r datagrams from: %r', datagrams_processed, input_file_path)
datagrams_processed += 1

else:
input_file.seek(position + BLOCK_SIZE - 4)

Expand All @@ -477,10 +483,13 @@ def parse_particles_file(input_file_path, output_file_path=None):

log.info('Begin generating echogram: %r', image_path)

plot = ZPLSPlot(data_times, power_data_dict, frequencies, 0, max_depth * bin_size)
plot.generate_plots()
plot.write_image(image_path)
try:
plot = ZPLSPlot(data_times, power_data_dict, frequencies, 0, max_depth * bin_size)
plot.generate_plots()
plot.write_image(image_path)

log.info('Completed generating echogram: %r', image_path)
log.info('Completed generating echogram: %r', image_path)
except IOError:
log.error('Error generating echogram: %r', image_path)

return meta_data, timestamp, data_times, power_data_dict, frequencies