Skip to content
75 changes: 75 additions & 0 deletions tests/component/_analog_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""Shared utilities for analog component tests."""

from __future__ import annotations

import pytest


# Simulated DAQ voltage data is a noisy sinewave within the range of the minimum and maximum values
# of the virtual channel. We can leverage this behavior to validate we get the correct data from
# the Python bindings.
def _get_voltage_offset_for_chan(chan_index: int) -> float:
return float(chan_index + 1)


def _get_voltage_setpoint_for_chan(chan_index: int) -> float:
return float(chan_index + 1)


def _get_current_setpoint_for_chan(chan_index: int) -> float:
return float(chan_index + 1)


def _get_expected_voltage_for_chan(chan_index: int) -> float:
return float(chan_index + 1)


def _volts_to_codes(volts: float, max_code: int = 32767, max_voltage: float = 10.0) -> int:
return int(volts * max_code / max_voltage)


def _pwr_volts_to_codes(volts: float, codes_per_volt: int = 4096) -> int:
return int(volts * codes_per_volt)


def _pwr_current_to_codes(current: float, codes_per_amp: int = 8192) -> int:
return int(current * codes_per_amp)


def _get_voltage_code_setpoint_for_chan(chan_index: int) -> int:
return _pwr_volts_to_codes(_get_voltage_setpoint_for_chan(chan_index))


def _get_current_code_setpoint_for_chan(chan_index: int) -> int:
return _pwr_current_to_codes(_get_current_setpoint_for_chan(chan_index))


# Note: Since we only use positive voltages, this works fine for both signed and unsigned reads.
def _get_voltage_code_offset_for_chan(chan_index: int) -> int:
voltage_limits = _get_voltage_offset_for_chan(chan_index)
return _volts_to_codes(voltage_limits)


def _assert_equal_2d(data: list[list[float]], expected: list[list[float]], abs: float) -> None:
assert len(data) == len(expected)
for i in range(len(data)):
assert data[i] == pytest.approx(expected[i], abs=abs)


# NOTE: We use simulated signals for AI validation, so we can be fairly strict here.
AI_VOLTAGE_EPSILON = 1e-3

# NOTE: We must use real signals for AO validation, but we aren't validating hardware accuracy here.
# This should be wide enough tolerance to allow for uncalibrated boards while still ensuring we are
# correctly configuring hardware.
AO_VOLTAGE_EPSILON = 1e-2

# NOTE: You can't scale from volts to codes correctly without knowing the internal calibration
# constants. The internal reference has a healthy amount of overrange to ensure we can calibrate to
# device specifications. I've used 10.1 volts above to approximate that, but 100mv of accuracy is
# also fine since the expected output of each channel value will be 1 volt apart.
RAW_VOLTAGE_EPSILON = 1e-1

VOLTAGE_CODE_EPSILON = round(_volts_to_codes(AI_VOLTAGE_EPSILON))
POWER_EPSILON = 1e-3
POWER_BINARY_EPSILON = 1
127 changes: 127 additions & 0 deletions tests/component/_digital_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""Shared utilities for digital component tests."""

from __future__ import annotations

from typing import Callable, TypeVar

import numpy
from nitypes.waveform import DigitalWaveform

import nidaqmx

_D = TypeVar("_D", bound=numpy.generic)


def _start_di_task(task: nidaqmx.Task) -> None:
# Don't reserve the lines, so we can read what DO is writing.
task.di_channels.all.di_tristate = False
task.start()


def _start_do_task(task: nidaqmx.Task, is_port: bool = False, num_chans: int = 1) -> None:
# We'll be doing on-demand, so start the task and drive all lines low
task.start()
if is_port:
if num_chans == 8:
task.write(0)
else:
task.write([0] * num_chans)
else:
if num_chans == 1:
task.write(False)
else:
task.write([False] * num_chans)


def _get_num_di_lines_in_task(task: nidaqmx.Task) -> int:
return sum([chan.di_num_lines for chan in task.channels])


def _get_num_do_lines_in_task(task: nidaqmx.Task) -> int:
return sum([chan.do_num_lines for chan in task.channels])


def _get_digital_data_for_sample(num_lines: int, sample_number: int) -> int:
result = 0
# Simulated digital signals "count" from 0 in binary within each group of 8 lines.
for _ in range((num_lines + 7) // 8):
result = (result << 8) | sample_number

line_mask = (2**num_lines) - 1
return result & line_mask


def _get_expected_data_for_line(num_samples: int, line_number: int) -> list[int]:
data = []
# Simulated digital signals "count" from 0 in binary within each group of 8 lines.
# Each line represents a bit in the binary representation of the sample number.
# - line 0 represents bit 0 (LSB) - alternates every sample: 0,1,0,1,0,1,0,1...
# - line 1 represents bit 1 - alternates every 2 samples: 0,0,1,1,0,0,1,1...
# - line 2 represents bit 2 - alternates every 4 samples: 0,0,0,0,1,1,1,1...
line_number %= 8
for sample_num in range(num_samples):
bit_value = (sample_num >> line_number) & 1
data.append(bit_value)
return data


def _get_digital_data(num_lines: int, num_samples: int) -> list[int]:
return [
_get_digital_data_for_sample(num_lines, sample_number)
for sample_number in range(num_samples)
]


def _get_expected_digital_port_data_port_major(
task: nidaqmx.Task, num_samples: int
) -> list[list[int]]:
return [_get_digital_data(chan.di_num_lines, num_samples) for chan in task.channels]


def _get_expected_digital_port_data_sample_major(
task: nidaqmx.Task, num_samples: int
) -> list[list[int]]:
result = _get_expected_digital_port_data_port_major(task, num_samples)
return numpy.transpose(result).tolist()


def _get_digital_port_data_for_sample(task: nidaqmx.Task, sample_number: int) -> list[int]:
return [
_get_digital_data_for_sample(chan.do_num_lines, sample_number) for chan in task.channels
]


def _get_digital_port_data_port_major(task: nidaqmx.Task, num_samples: int) -> list[list[int]]:
return [_get_digital_data(chan.do_num_lines, num_samples) for chan in task.channels]


def _get_digital_port_data_sample_major(task: nidaqmx.Task, num_samples: int) -> list[list[int]]:
result = _get_digital_port_data_port_major(task, num_samples)
return numpy.transpose(result).tolist()


def _bool_array_to_int(bool_array: numpy.typing.NDArray[numpy.bool_]) -> int:
result = 0
# Simulated data is little-endian
for bit in bool_array[::-1]:
result = (result << 1) | int(bit)
return result


def _int_to_bool_array(num_lines: int, input: int) -> numpy.typing.NDArray[numpy.bool_]:
result = numpy.full(num_lines, True, dtype=numpy.bool_)
for bit in range(num_lines):
result[bit] = (input & (1 << bit)) != 0
return result


def _get_waveform_data(waveform: DigitalWaveform) -> list[int]:
assert isinstance(waveform, DigitalWaveform)
return [_bool_array_to_int(sample) for sample in waveform.data]


def _read_and_copy(
read_func: Callable[[numpy.typing.NDArray[_D]], None], array: numpy.typing.NDArray[_D]
) -> numpy.typing.NDArray[_D]:
read_func(array)
return array.copy()
13 changes: 13 additions & 0 deletions tests/component/_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""Shared utilities for component tests."""

from __future__ import annotations

from datetime import timezone

from hightime import datetime as ht_datetime


def _is_timestamp_close_to_now(timestamp: ht_datetime, tolerance_seconds: float = 1.0) -> bool:
current_time = ht_datetime.now(timezone.utc)
time_diff = abs((timestamp - current_time).total_seconds())
return time_diff <= tolerance_seconds
Loading
Loading