22 changes: 22 additions & 0 deletions CONTRIBUTING.md
@@ -75,6 +75,28 @@ $ poetry run tox
This requires you to have all the Python interpreters supported by **nidaqmx** installed on your
machine.

# Benchmarks

Benchmark tests are not run by default when you run pytest. To run the benchmarks, use this command:

```sh
# Run the benchmarks
$ poetry run pytest -v tests/benchmark --device Dev1

# To compare benchmark results before/after a change, save a baseline on the
# first run and compare later runs against it
# (see https://pytest-benchmark.readthedocs.io/en/latest/comparing.html):
# Run 1: add --benchmark-save=some-name
# Run N: add --benchmark-compare=0001
```

Or you can use tox (which skips the gRPC variants):

```sh
$ poetry run -- tox -e py39-base-benchmark -- --device Dev1
```

The benchmarks are designed to run on a 6363 device. If you don't specify a device
using `--device`, the tests will automatically use any real or simulated 6363 that can
be found.
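
A new benchmark can use the fixtures defined in `tests/benchmark/conftest.py` together with
pytest-benchmark's `benchmark` fixture. Below is a minimal sketch; the test name, module
placement, and parameter values are illustrative only, while `AnalogMultiChannelReader` and
`read_many_sample` are the existing **nidaqmx** stream-reader API:

```python
import numpy
import pytest

from nidaqmx import Task
from nidaqmx.stream_readers import AnalogMultiChannelReader


# num_channels/num_samples parametrization is read by the conftest fixtures
# via request.node.callspec.params.
@pytest.mark.parametrize("num_channels", [2])
@pytest.mark.parametrize("num_samples", [1000])
def test_ai_read_benchmark(
    benchmark, ai_benchmark_task: Task, num_channels: int, num_samples: int
) -> None:
    """Benchmark reading a finite AI acquisition with a stream reader."""
    reader = AnalogMultiChannelReader(ai_benchmark_task.in_stream)
    data = numpy.zeros((num_channels, num_samples), dtype=numpy.float64)

    # The fixture rewinds reads to FIRST_SAMPLE, so the same acquisition
    # can be read repeatedly by the benchmark loop.
    benchmark(reader.read_many_sample, data, number_of_samples_per_channel=num_samples)
```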

# Building Documentation

To build the documentation, install the optional docs packages and run Sphinx. For example:
35 changes: 34 additions & 1 deletion poetry.lock


6 changes: 5 additions & 1 deletion pyproject.toml
@@ -107,6 +107,7 @@ types-grpcio = ">=1.0"

[tool.poetry.group.test.dependencies]
pytest = ">=7.2"
pytest-benchmark = ">=5.1"
pytest-cov = ">=4.0"
pytest-mock = ">=3.0"
pykka = ">=3.0"
@@ -135,7 +136,7 @@ application-import-names = "nidaqmx"
[tool.pytest.ini_options]
addopts = "--doctest-modules --strict-markers"
filterwarnings = ["always::ImportWarning", "always::ResourceWarning"]
testpaths = ["tests"]
testpaths = ["tests/acceptance", "tests/component", "tests/legacy", "tests/unit"]
markers = [
# Defines custom markers used by nidaqmx tests. Prevents PytestUnknownMarkWarning.
"library_only(reason=...): run the test with only the library interpreter implementation.",
@@ -184,6 +185,8 @@ module = [
"importlib_metadata",
"mako.*",
"nidaqmx.*",
# https://github.com/ionelmc/pytest-benchmark/issues/212 - Add type annotations
"pytest_benchmark.*",
]
ignore_missing_imports = true

@@ -196,6 +199,7 @@ warn_unused_ignores = false
typeCheckingMode = "basic"
reportArgumentType = false
reportAttributeAccessIssue = false
reportGeneralTypeIssues = false
reportInvalidTypeForm = false
reportOperatorIssue = false
reportOptionalIterable = false
1 change: 1 addition & 0 deletions tests/benchmark/__init__.py
@@ -0,0 +1 @@
"""Benchmarks for the nidaqmx package."""
202 changes: 202 additions & 0 deletions tests/benchmark/conftest.py
@@ -0,0 +1,202 @@
"""Fixtures for benchmark tests."""

from __future__ import annotations

import pytest

from nidaqmx import Task
from nidaqmx.constants import (
    AcquisitionType,
    Edge,
    LineGrouping,
    ReadRelativeTo,
    TaskMode,
    WaveformAttributeMode,
)
from nidaqmx.system import Device, System
from tests.conftest import DeviceType, _device_by_product_type


_WAVEFORM_BENCHMARK_MODES = [
    WaveformAttributeMode.NONE,
    WaveformAttributeMode.TIMING,
    WaveformAttributeMode.TIMING | WaveformAttributeMode.EXTENDED_PROPERTIES,
]

_WAVEFORM_BENCHMARK_MODE_IDS = ["NONE", "TIMING", "ALL"]


def _configure_timing(task: Task, num_channels: int, num_samples: int) -> None:
    task.timing.cfg_samp_clk_timing(
        rate=25000.0,
        active_edge=Edge.RISING,
        sample_mode=AcquisitionType.FINITE,
        samps_per_chan=num_channels * num_samples * 2,
    )


def _start_input_task(task: Task) -> None:
    task.start()
    task.wait_until_done(timeout=10.0)
    task.in_stream.relative_to = ReadRelativeTo.FIRST_SAMPLE


def _commit_output_task(task: Task, num_channels: int, num_samples: int) -> None:
    task.out_stream.output_buf_size = num_channels * num_samples * 2
    task.control(TaskMode.TASK_COMMIT)
    task.out_stream.relative_to = ReadRelativeTo.FIRST_SAMPLE


def pytest_addoption(parser: pytest.Parser) -> None:
    """Add command line options to pytest."""
    parser.addoption("--device", action="store", default=None, help="Device name for benchmarks")


@pytest.fixture
def benchmark_device(system: System, request: pytest.FixtureRequest) -> Device:
    """Get device for benchmarking."""
    device: str | None = request.config.getoption("--device")
    if device is not None:
        return system.devices[device]

    return _device_by_product_type("PCIe-6363", DeviceType.ANY, system)


@pytest.fixture
def ai_benchmark_task(
    task: Task,
    benchmark_device: Device,
    request: pytest.FixtureRequest,
) -> Task:
    """Configure an AI task for benchmarking."""
    num_channels = request.node.callspec.params.get("num_channels", 1)
    num_samples = request.node.callspec.params.get("num_samples", 1)

    for chan in range(num_channels):
        task.ai_channels.add_ai_voltage_chan(
            benchmark_device.ai_physical_chans[chan].name,
            min_val=-5.0,
            max_val=5.0,
        )

    _configure_timing(task, num_channels, num_samples)
    _start_input_task(task)

    return task


@pytest.fixture
def ao_benchmark_task(
    task: Task,
    real_x_series_multiplexed_device: Device,
    request: pytest.FixtureRequest,
) -> Task:
    """Configure a hardware-timed buffered AO task for benchmarking."""
    num_channels = request.node.callspec.params.get("num_channels", 1)
    num_samples = request.node.callspec.params.get("num_samples", 1)

    for chan in range(num_channels):
        task.ao_channels.add_ao_voltage_chan(
            real_x_series_multiplexed_device.ao_physical_chans[chan].name,
            min_val=-10.0,
            max_val=10.0,
        )

    _configure_timing(task, num_channels, num_samples)
    _commit_output_task(task, num_channels, num_samples)

    return task


@pytest.fixture
def di_lines_benchmark_task(
    task: Task,
    benchmark_device: Device,
    request: pytest.FixtureRequest,
) -> Task:
    """Configure a hardware-timed buffered DI task for benchmarking."""
    num_channels = request.node.callspec.params.get("num_channels", 1)
    num_samples = request.node.callspec.params.get("num_samples", 1)
    num_lines = request.node.callspec.params.get("num_lines", 1)

    for chan in range(num_channels):
        line_names = [
            chan.name
            for chan in benchmark_device.di_lines[chan * num_lines : (chan + 1) * num_lines]
        ]
        physical_channel_string = ",".join(line_names)
        task.di_channels.add_di_chan(
            physical_channel_string, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES
        )

    _configure_timing(task, num_channels, num_samples)
    _start_input_task(task)

    return task


@pytest.fixture
def di_port32_benchmark_task(
    task: Task,
    benchmark_device: Device,
    request: pytest.FixtureRequest,
) -> Task:
    """Configure a hardware-timed buffered DI task for benchmarking."""
    num_samples = request.node.callspec.params.get("num_samples", 1)

    # port 0 is the only port that supports buffered operations
    task.di_channels.add_di_chan(
        benchmark_device.di_ports[0].name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES
    )

    _configure_timing(task, 1, num_samples)
    _start_input_task(task)

    return task


@pytest.fixture
def do_lines_benchmark_task(
    task: Task,
    benchmark_device: Device,
    request: pytest.FixtureRequest,
) -> Task:
    """Configure a hardware-timed buffered DO task for benchmarking."""
    num_channels = request.node.callspec.params.get("num_channels", 1)
    num_samples = request.node.callspec.params.get("num_samples", 1)
    num_lines = request.node.callspec.params.get("num_lines", 1)

    for chan in range(num_channels):
        line_names = [
            chan.name
            for chan in benchmark_device.do_lines[chan * num_lines : (chan + 1) * num_lines]
        ]
        physical_channel_string = ",".join(line_names)
        task.do_channels.add_do_chan(
            physical_channel_string, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES
        )

    _configure_timing(task, num_channels, num_samples)
    _commit_output_task(task, num_channels, num_samples)

    return task


@pytest.fixture
def do_port32_benchmark_task(
    task: Task,
    benchmark_device: Device,
    request: pytest.FixtureRequest,
) -> Task:
    """Configure a hardware-timed buffered DO task for benchmarking."""
    num_samples = request.node.callspec.params.get("num_samples", 1)

    # port 0 is the only port that supports buffered operations
    task.do_channels.add_do_chan(
        benchmark_device.do_ports[0].name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES
    )

    _configure_timing(task, 1, num_samples)
    _commit_output_task(task, 1, num_samples)

    return task
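
These fixtures pull `num_channels`, `num_samples`, and `num_lines` from `request.node.callspec.params`, so benchmark tests are expected to supply those values via `@pytest.mark.parametrize`. The PR's actual benchmark tests are not shown in this excerpt; the sketch below is a hypothetical example of how `di_port32_benchmark_task` might be exercised (the test name and values are illustrative, while `DigitalSingleChannelReader.read_many_sample_port_uint32` is the existing **nidaqmx** stream-reader API):

```python
"""Hypothetical benchmark showing how di_port32_benchmark_task could be used."""
import numpy
import pytest

from nidaqmx import Task
from nidaqmx.stream_readers import DigitalSingleChannelReader


@pytest.mark.parametrize("num_samples", [1000])
def test_di_port_uint32_read_benchmark(
    benchmark, di_port32_benchmark_task: Task, num_samples: int
) -> None:
    """Benchmark reading a 32-bit DI port as uint32 samples."""
    reader = DigitalSingleChannelReader(di_port32_benchmark_task.in_stream)
    data = numpy.zeros(num_samples, dtype=numpy.uint32)

    # relative_to is FIRST_SAMPLE (set by _start_input_task), so each benchmark
    # iteration re-reads the same finite acquisition.
    benchmark(
        reader.read_many_sample_port_uint32, data, number_of_samples_per_channel=num_samples
    )
```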