diff --git a/photon-lib/py/photonlibpy/photonCamera.py b/photon-lib/py/photonlibpy/photonCamera.py index c7635dca58..03ab2cafbe 100644 --- a/photon-lib/py/photonlibpy/photonCamera.py +++ b/photon-lib/py/photonlibpy/photonCamera.py @@ -48,6 +48,10 @@ def setVersionCheckEnabled(enabled: bool): class PhotonCamera: def __init__(self, cameraName: str): + """Constructs a PhotonCamera from the name of the camera. + + :param cameraName: The nickname of the camera (found in the PhotonVision UI). + """ instance = ntcore.NetworkTableInstance.getDefault() self._name = cameraName self._tableName = "photonvision" @@ -132,6 +136,14 @@ def getAllUnreadResults(self) -> List[PhotonPipelineResult]: return ret def getLatestResult(self) -> PhotonPipelineResult: + """Returns the latest pipeline result. This is simply the most recent result Received via NT. + Calling this multiple times will always return the most recent result. + + Replaced by :meth:`.getAllUnreadResults` over getLatestResult, as this function can miss + results, or provide duplicate ones! + TODO implement the thing that will take this ones place... + """ + self._versionCheck() now = RobotController.getFPGATime() @@ -149,34 +161,85 @@ def getLatestResult(self) -> PhotonPipelineResult: return retVal def getDriverMode(self) -> bool: + """Returns whether the camera is in driver mode. + + :returns: Whether the camera is in driver mode. + """ + return self._driverModeSubscriber.get() def setDriverMode(self, driverMode: bool) -> None: + """Toggles driver mode. + + :param driverMode: Whether to set driver mode. + """ + self._driverModePublisher.set(driverMode) def takeInputSnapshot(self) -> None: + """Request the camera to save a new image file from the input camera stream with overlays. Images + take up space in the filesystem of the PhotonCamera. Calling it frequently will fill up disk + space and eventually cause the system to stop working. Clear out images in + /opt/photonvision/photonvision_config/imgSaves frequently to prevent issues. + """ + self._inputSaveImgEntry.set(self._inputSaveImgEntry.get() + 1) def takeOutputSnapshot(self) -> None: + """Request the camera to save a new image file from the output stream with overlays. Images take + up space in the filesystem of the PhotonCamera. Calling it frequently will fill up disk space + and eventually cause the system to stop working. Clear out images in + /opt/photonvision/photonvision_config/imgSaves frequently to prevent issues. + """ self._outputSaveImgEntry.set(self._outputSaveImgEntry.get() + 1) def getPipelineIndex(self) -> int: + """Returns the active pipeline index. + + :returns: The active pipeline index. + """ + return self._pipelineIndexState.get(0) def setPipelineIndex(self, index: int) -> None: + """Allows the user to select the active pipeline index. + + :param index: The active pipeline index. + """ self._pipelineIndexRequest.set(index) def getLEDMode(self) -> VisionLEDMode: + """Returns the current LED mode. + + :returns: The current LED mode. + """ + mode = self._ledModeState.get() return VisionLEDMode(mode) def setLEDMode(self, led: VisionLEDMode) -> None: + """Sets the LED mode. + + :param led: The mode to set to. + """ + self._ledModeRequest.set(led.value) def getName(self) -> str: + """Returns the name of the camera. This will return the same value that was given to the + constructor as cameraName. + + :returns: The name of the camera. + """ return self._name def isConnected(self) -> bool: + """Returns whether the camera is connected and actively returning new data. Connection status is + debounced. + + :returns: True if the camera is actively sending frame data, false otherwise. + """ + curHeartbeat = self._heartbeatEntry.get() now = Timer.getFPGATimestamp() @@ -197,6 +260,8 @@ def _versionCheck(self) -> None: _lastVersionTimeCheck = Timer.getFPGATimestamp() + # Heartbeat entry is assumed to always be present. If it's not present, we + # assume that a camera with that name was never connected in the first place. if not self._heartbeatEntry.exists(): cameraNames = ( self._cameraTable.getInstance().getTable(self._tableName).getSubTables() @@ -222,6 +287,7 @@ def _versionCheck(self) -> None: True, ) + # Check for connection status. Warn if disconnected. elif not self.isConnected(): wpilib.reportWarning( f"PhotonVision coprocessor at path {self._path} is not sending new data.", @@ -229,8 +295,9 @@ def _versionCheck(self) -> None: ) versionString = self.versionEntry.get(defaultValue="") - localUUID = PhotonPipelineResult.photonStruct.MESSAGE_VERSION + # Check mdef UUID + localUUID = PhotonPipelineResult.photonStruct.MESSAGE_VERSION remoteUUID = str(self._rawBytesEntry.getTopic().getProperty("message_uuid")) if not remoteUUID: