Add Job Queue and Status Tracking for Segmentation Tasks in PlanktoScope #46

Open
wants to merge 1 commit into base: main
86 changes: 86 additions & 0 deletions processing/segmenter/planktoscope/segmenter/__init__.py
@@ -130,6 +130,11 @@ def __init__(self, event, data_path):
        if not os.path.exists(path):
            # create the path!
            os.makedirs(path)

        # Initialize the job queue, per-job status tracking and the MQTT client used
        # to publish status updates (queue, json, threading and time must be imported
        # at the top of this module)
        self.job_queue = queue.Queue()
        self.job_status = {}  # maps job_id -> {"status": ..., "error": ...}
        self.segmenter_client = self.initialize_mqtt_client()

        logger.success("planktoscope.segmenter is initialised and ready to go!")

@@ -740,6 +745,65 @@ def _pipe(self, ecotaxa_export):
        # we're done free some mem
        self.__flat = None

    def initialize_mqtt_client(self):
        # Initialize the MQTT client (placeholder, see the MQTT_Client stub below)
        return MQTT_Client("segmenter/#", "segmenter_client")

    def add_job(self, job_id, job_data):
        # Add a new job to the queue, record its initial status and publish it
        self.job_queue.put((job_id, job_data))
        self.job_status[job_id] = {"status": "queued"}
        self.segmenter_client.client.publish(
            "status/segmenter",
            json.dumps({"job_id": job_id, "status": "queued"}),
        )

    def update_job_status(self, job_id, status):
        # Update the status of a known job and publish the change
        if job_id in self.job_status:
            self.job_status[job_id]["status"] = status
            self.segmenter_client.client.publish(
                "status/segmenter",
                json.dumps({"job_id": job_id, "status": status}),
            )

    def process_jobs(self):
        # Process jobs from the queue, updating their status as they move through
        # the pipeline; runs forever in a daemon thread (see run() below)
        while True:
            if not self.job_queue.empty():
                job_id, job_data = self.job_queue.get()
                try:
                    self.update_job_status(job_id, "processing")
                    self.process_job(job_data)  # Actual job processing logic
                    self.update_job_status(job_id, "completed")
                except Exception as e:
                    self.update_job_status(job_id, "failed")
                    self.job_status[job_id]["error"] = str(e)
            time.sleep(0.5)  # Poll the queue twice per second
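
    # Reviewer sketch (assumption, not part of this PR): a blocking variant of the
    # polling loop above. queue.Queue.get() with no arguments parks the worker
    # thread until a job arrives, so the 0.5 s polling interval is not needed.
    def process_jobs_blocking(self):
        while True:
            job_id, job_data = self.job_queue.get()  # blocks until a job is queued
            try:
                self.update_job_status(job_id, "processing")
                self.process_job(job_data)
                self.update_job_status(job_id, "completed")
            except Exception as e:
                self.update_job_status(job_id, "failed")
                self.job_status[job_id]["error"] = str(e)
            finally:
                self.job_queue.task_done()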

    def process_job(self, job_data):
        # Placeholder for the actual job processing logic
        time.sleep(2)  # Simulate processing time

    def run(self):
        # Start the job processing thread
        threading.Thread(target=self.process_jobs, daemon=True).start()

    def handle_mqtt_message(self, topic, payload):
        # Handle incoming MQTT messages that query the status of a job
        try:
            request = json.loads(payload)
            if "job_id" in request:
                job_id = request["job_id"]
                if job_id in self.job_status:
                    response = self.job_status[job_id]
                else:
                    response = {"status": "not_found"}
                self.segmenter_client.client.publish(
                    f"status/segmenter/{job_id}", json.dumps(response)
                )
        except Exception as e:
            logger.error(f"Failed to handle message: {e}")


    def segment_all(self, paths: list, force=False, ecotaxa_export=True):
        """Starts the segmentation in all the folders given recursively

@@ -998,6 +1062,19 @@ def treat_message(self):
f"We did not understand the received request {last_message}"
)

class MQTT_Client:
    def __init__(self, topic, name):
        # Placeholder for MQTT client initialization; topic and name are kept for
        # parity with a real client's constructor but are not used by the stub
        self.topic = topic
        self.name = name
        self.client = self.initialize_mqtt_client()

    def initialize_mqtt_client(self):
        # Placeholder for setting up the MQTT client; the stub acts as its own client
        return self

    def publish(self, topic, message):
        # Placeholder for MQTT publish functionality; just log what would be sent
        print(f"Published to {topic}: {message}")

################################################################################
# While loop for capturing commands from Node-RED
################################################################################
@@ -1053,5 +1130,14 @@ def run(self):

# This is called if this script is launched directly
if __name__ == "__main__":
    segmenter = Segmenter()
    segmenter.run()

    # Simulate adding jobs
    segmenter.add_job("job1", {"data": "example_data1"})
    segmenter.add_job("job2", {"data": "example_data2"})

    # Simulate an MQTT message querying the status of job1
    segmenter.handle_mqtt_message(
        "status/segmenter/request", json.dumps({"job_id": "job1"})
    )

    # Give the daemon worker thread time to finish the simulated jobs before the
    # interpreter exits (daemon threads die with the main process)
    time.sleep(5)

    # TODO This should be a test suite for this library
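
As a starting point for the TODO above, a minimal test sketch (a suggestion, not part of this PR) that only exercises the queue and status bookkeeping. It reuses the no-argument Segmenter() construction from the __main__ block; since the __init__ shown at the top of this diff takes (event, data_path), a stub event and a temporary data path may be needed instead.

import unittest

class TestJobQueueBookkeeping(unittest.TestCase):
    def test_add_job_records_queued_status(self):
        segmenter = Segmenter()  # assumed constructible as in the __main__ block
        segmenter.add_job("job1", {"data": "example"})
        self.assertEqual(segmenter.job_status["job1"]["status"], "queued")

    def test_update_job_status_changes_known_job(self):
        segmenter = Segmenter()
        segmenter.add_job("job1", {"data": "example"})
        segmenter.update_job_status("job1", "processing")
        self.assertEqual(segmenter.job_status["job1"]["status"], "processing")

    def test_update_unknown_job_is_ignored(self):
        segmenter = Segmenter()
        segmenter.update_job_status("missing", "processing")
        self.assertNotIn("missing", segmenter.job_status)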