Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ All notable changes to this project will be documented in this file.

* ### Major Changes
* Added support for mioDAQ configurable digital voltage.
* Added support for mioDAQ ID Pin.
* Removed support for Python 3.8.

* ### Known Issues
Expand Down
8 changes: 8 additions & 0 deletions generated/nidaqmx/_base_interpreter.py
Original file line number Diff line number Diff line change
Expand Up @@ -1310,6 +1310,10 @@ def read_digital_u8(
self, task, num_samps_per_chan, timeout, fill_mode, read_array):
raise NotImplementedError

@abc.abstractmethod
def read_id_pin_memory(self, device_name, id_pin_name):
raise NotImplementedError

@abc.abstractmethod
def read_power_binary_i16(
self, task, num_samps_per_chan, timeout, fill_mode,
Expand Down Expand Up @@ -1817,6 +1821,10 @@ def write_digital_u8(
write_array):
raise NotImplementedError

@abc.abstractmethod
def write_id_pin_memory(self, device_name, id_pin_name, data, format_code):
raise NotImplementedError

@abc.abstractmethod
def write_raw(self, task, num_samps, auto_start, timeout, write_array):
raise NotImplementedError
Expand Down
23 changes: 21 additions & 2 deletions generated/nidaqmx/_grpc_interpreter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3533,6 +3533,13 @@ def write_digital_u8(
write_array=write_array.tobytes()))
return response.samps_per_chan_written

def write_id_pin_memory(self, device_name, id_pin_name, data, format_code):
response = self._invoke(
self._client.WriteIDPinMemory,
grpc_types.WriteIDPinMemoryRequest(
device_name=device_name, id_pin_name=id_pin_name,
data=data.tobytes(), format_code=format_code))

def write_raw(self, task, num_samps, auto_start, timeout, write_array):
_validate_array_dtype(write_array, numpy.generic)
response = self._invoke(
Expand Down Expand Up @@ -3574,6 +3581,18 @@ def get_error_string(self, error_code):
_logger.exception('Failed to get error string for error code %d.', error_code)
return 'Failed to retrieve error description.'

def read_id_pin_memory(self, device_name, id_pin_name):
response = self._invoke(
self._client.ReadIDPinMemory,
grpc_types.ReadIDPinMemoryRequest(device_name=device_name, id_pin_name=id_pin_name, array_size=0))
if response.status <= 0:
self._check_for_error_from_response(response.status)
response = self._invoke(
self._client.ReadIDPinMemory,
grpc_types.ReadIDPinMemoryRequest(device_name=device_name, id_pin_name=id_pin_name, array_size=response.status))
self._check_for_error_from_response(response.status)
return list(response.data), response.data_length_read, response.format_code

def set_runtime_environment(
self, environment, environment_version, reserved_1, reserved_2):
raise NotImplementedError
Expand Down Expand Up @@ -3607,5 +3626,5 @@ def _is_cancelled(ex: Exception) -> bool:
)

_ERROR_MESSAGES = {
DAQmxErrors.SAMPLES_NOT_YET_AVAILABLE: 'Some or all of the samples requested have not yet been acquired.\nTo wait for the samples to become available use a longer read timeout or read later in your program. To make the samples available sooner, increase the sample rate. If your task uses a start trigger, make sure that your start trigger is configured correctly. It is also possible that you configured the task for external timing, and no clock was supplied. If this is the case, supply an external clock.'
}
DAQmxErrors.SAMPLES_NOT_YET_AVAILABLE: 'Some or all of the samples requested have not yet been acquired.\n\nTo wait for the samples to become available use a longer read timeout or read later in your program. To make the samples available sooner, increase the sample rate. If your task uses a start trigger, make sure that your start trigger is configured correctly. It is also possible that you configured the task for external timing, and no clock was supplied. If this is the case, supply an external clock.'
}
47 changes: 45 additions & 2 deletions generated/nidaqmx/_library_interpreter.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class LibraryInterpreter(BaseInterpreter):
__slots__ = ()

def __init__(self):
global _was_runtime_environment_set
global _was_runtime_environment_set
if _was_runtime_environment_set is None:
try:
runtime_env = platform.python_implementation()
Expand Down Expand Up @@ -6242,6 +6242,20 @@ def write_digital_u8(
self.check_for_error(error_code, samps_per_chan_written=samps_per_chan_written.value)
return samps_per_chan_written.value

def write_id_pin_memory(self, device_name, id_pin_name, data, format_code):
cfunc = lib_importer.windll.DAQmxWriteIDPinMemory
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
ctypes_byte_str, ctypes_byte_str,
wrapped_ndpointer(dtype=numpy.uint8, flags=('C')),
ctypes.c_uint, ctypes.c_uint]

error_code = cfunc(
device_name, id_pin_name, data, len(data), format_code)
self.check_for_error(error_code)

def write_to_teds_from_array(
self, physical_channel, bit_stream, basic_teds_options):
cfunc = lib_importer.windll.DAQmxWriteToTEDSFromArray
Expand Down Expand Up @@ -6300,6 +6314,35 @@ def get_extended_error_info(self):
return 'Failed to retrieve error description.'
return error_buffer.value.decode(lib_importer.encoding)

def read_id_pin_memory(self, device_name, id_pin_name):
data_length_read = ctypes.c_uint()
format_code = ctypes.c_uint()

cfunc = lib_importer.windll.DAQmxReadIDPinMemory
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
ctypes_byte_str, ctypes_byte_str,
wrapped_ndpointer(dtype=numpy.uint8, flags=('C','W')),
ctypes.c_uint, ctypes.POINTER(ctypes.c_uint),
ctypes.POINTER(ctypes.c_uint)]

array_size = cfunc(
device_name, id_pin_name, None, 0,
ctypes.byref(data_length_read), ctypes.byref(format_code))

if array_size < 0:
self.check_for_error(array_size)

data = numpy.zeros(array_size, dtype=numpy.uint8)

error_code = cfunc(
device_name, id_pin_name, data, array_size,
ctypes.byref(data_length_read), ctypes.byref(format_code))
self.check_for_error(error_code)
return data.tolist(), data_length_read.value, format_code.value

def read_power_binary_i16(
self, task, num_samps_per_chan, timeout, fill_mode,
read_voltage_array, read_current_array):
Expand Down Expand Up @@ -6424,4 +6467,4 @@ def is_string_buffer_too_small(error_code):


def is_array_buffer_too_small(error_code):
return error_code == DAQmxErrors.WRITE_BUFFER_TOO_SMALL
return error_code == DAQmxErrors.WRITE_BUFFER_TOO_SMALL
49 changes: 49 additions & 0 deletions generated/nidaqmx/_stubs/data_moniker_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading