Skip to content

Commit

Permalink
Revert "make em track in place"
Browse files Browse the repository at this point in the history
This reverts commit 55f5579.
  • Loading branch information
caila-marashaj committed Jan 22, 2025
1 parent 55f5579 commit 5e62dd1
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 148 deletions.
194 changes: 115 additions & 79 deletions api/src/opentrons/protocol_engine/commands/aspirate_while_tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
FlowRateMixin,
BaseLiquidHandlingResult,
aspirate_while_tracking,
prepare_for_aspirate,
)
from .movement_common import (
LiquidHandlingWellLocationMixin,
DestinationPositionResult,
StallOrCollisionError,
move_to_well,
)
from .command import (
AbstractCommandImpl,
Expand All @@ -24,13 +26,18 @@
DefinedErrorData,
SuccessData,
)
from ..errors.exceptions import PipetteNotReadyToAspirateError

from opentrons.hardware_control import HardwareControlAPI
from ..state.update_types import CLEAR
from ..types import CurrentWell, DeckPoint

from ..state.update_types import StateUpdate, CLEAR
from ..types import (
WellLocation,
WellOrigin,
CurrentWell,
)

if TYPE_CHECKING:
from ..execution import MovementHandler, PipettingHandler, GantryMover
from ..execution import MovementHandler, PipettingHandler
from ..resources import ModelUtils
from ..state.state import StateView
from ..notes import CommandNoteAdder
Expand Down Expand Up @@ -75,7 +82,6 @@ def __init__(
movement: MovementHandler,
command_note_adder: CommandNoteAdder,
model_utils: ModelUtils,
gantry_mover: GantryMover,
**kwargs: object,
) -> None:
self._pipetting = pipetting
Expand All @@ -84,106 +90,136 @@ def __init__(
self._movement = movement
self._command_note_adder = command_note_adder
self._model_utils = model_utils
self._gantry_mover = gantry_mover

async def execute(self, params: AspirateWhileTrackingParams) -> _ExecuteReturn:
"""Move to and aspirate from the requested well.
Raises:
TipNotAttachedError: if no tip is attached to the pipette.
PipetteNotReadyToAspirateError: pipette plunger is not ready.
"""
pipette_id = params.pipetteId
labware_id = params.labwareId
well_name = params.wellName
well_location = params.wellLocation

state_update = StateUpdate()

final_location = self._state_view.geometry.get_well_position(
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
operation_volume=-params.volume,
pipette_id=pipette_id,
)

ready_to_aspirate = self._pipetting.get_is_ready_to_aspirate(
pipette_id=params.pipetteId,
pipette_id=pipette_id
)

current_well = None

if not ready_to_aspirate:
raise PipetteNotReadyToAspirateError(
"Pipette cannot aspirate in place because of a previous blow out."
" The first aspirate following a blow-out must be from a specific well"
" so the plunger can be reset in a known safe position."
move_result = await move_to_well(
movement=self._movement,
model_utils=self._model_utils,
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=WellLocation(origin=WellOrigin.TOP),
)
state_update.append(move_result.state_update)
if isinstance(move_result, DefinedErrorData):
return DefinedErrorData(move_result.public, state_update=state_update)

prepare_result = await prepare_for_aspirate(
pipette_id=pipette_id,
pipetting=self._pipetting,
model_utils=self._model_utils,
# Note that the retryLocation is the final location, inside the liquid,
# because that's where we'd want the client to try re-aspirating if this
# command fails and the run enters error recovery.
location_if_error={"retryLocation": final_location},
)
state_update.append(prepare_result.state_update)
if isinstance(prepare_result, DefinedErrorData):
return DefinedErrorData(
public=prepare_result.public, state_update=state_update
)

current_position = await self._gantry_mover.get_position(params.pipetteId)
current_location = self._state_view.pipettes.get_current_location()
# set our current deck location to the well now that we've made
# an intermediate move for the "prepare for aspirate" step
current_well = CurrentWell(
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
)
move_result = await move_to_well(
movement=self._movement,
model_utils=self._model_utils,
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
current_well=current_well,
)
state_update.append(move_result.state_update)
if isinstance(move_result, DefinedErrorData):
return DefinedErrorData(
public=move_result.public, state_update=state_update
)

aspirate_result = await aspirate_while_tracking(
pipette_id=params.pipetteId,
labware_id=params.labwareId,
well_name=params.wellName,
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
volume=params.volume,
flow_rate=params.flowRate,
location_if_error={
"retryLocation": (
current_position.x,
current_position.y,
current_position.z,
move_result.public.position.x,
move_result.public.position.y,
move_result.public.position.z,
)
},
command_note_adder=self._command_note_adder,
pipetting=self._pipetting,
model_utils=self._model_utils,
)
position_after_aspirate = await self._gantry_mover.get_position(
params.pipetteId
state_update.append(aspirate_result.state_update)
if isinstance(aspirate_result, DefinedErrorData):
state_update.set_liquid_operated(
labware_id=labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
labware_id,
well_name,
params.pipetteId,
),
volume_added=CLEAR,
)
return DefinedErrorData(
public=aspirate_result.public, state_update=state_update
)

state_update.set_liquid_operated(
labware_id=labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
labware_id, well_name, pipette_id
),
volume_added=-aspirate_result.public.volume
* self._state_view.geometry.get_nozzles_per_well(
labware_id,
well_name,
params.pipetteId,
),
)
result_deck_point = DeckPoint.model_construct(
x=position_after_aspirate.x,
y=position_after_aspirate.y,
z=position_after_aspirate.z,

return SuccessData(
public=AspirateWhileTrackingResult(
volume=aspirate_result.public.volume,
position=move_result.public.position,
),
state_update=state_update,
)
if isinstance(aspirate_result, DefinedErrorData):
if (
isinstance(current_location, CurrentWell)
and current_location.pipette_id == params.pipetteId
):
return DefinedErrorData(
public=aspirate_result.public,
state_update=aspirate_result.state_update.set_liquid_operated(
labware_id=current_location.labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
current_location.labware_id,
current_location.well_name,
params.pipetteId,
),
volume_added=CLEAR,
),
state_update_if_false_positive=aspirate_result.state_update_if_false_positive,
)
else:
return aspirate_result
else:
if (
isinstance(current_location, CurrentWell)
and current_location.pipette_id == params.pipetteId
):
return SuccessData(
public=AspirateWhileTrackingResult(
volume=aspirate_result.public.volume,
position=result_deck_point,
),
state_update=aspirate_result.state_update.set_liquid_operated(
labware_id=current_location.labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
current_location.labware_id,
current_location.well_name,
params.pipetteId,
),
volume_added=-aspirate_result.public.volume
* self._state_view.geometry.get_nozzles_per_well(
current_location.labware_id,
current_location.well_name,
params.pipetteId,
),
),
)
else:
return SuccessData(
public=AspirateWhileTrackingResult(
volume=aspirate_result.public.volume,
position=result_deck_point,
),
state_update=aspirate_result.state_update,
)


class AspirateWhileTracking(
Expand Down
Loading

0 comments on commit 5e62dd1

Please sign in to comment.