Add Retry Mechanism #78

Open
harshraj22 opened this issue May 12, 2023 · 0 comments
Labels
enhancement New feature or request

Comments

@harshraj22
Owner

Read this answer from Bard:

You are right, I didn't add anything for Kafka and Celery. If they are down, the submit() method will fail and the circuit breaker will open. To prevent this, you can add a retry mechanism inside the submit() method. The retry mechanism retries the failing call a few times before giving up.

Here is an example of how to add a retry mechanism to the submit() method:

import asyncio
import base64
import json

from fastapi import Depends, HTTPException, UploadFile

# circuit_breaker, auth_wrapper, is_allowed, logger, producer, KAFKA_TOPIC
# and celery_app are defined elsewhere in the project.


@circuit_breaker(name="my_circuit_breaker")
async def submit(img1: UploadFile = UploadFile(filename='img1'), img2: UploadFile = UploadFile(filename='img2'), user_id = Depends(auth_wrapper)) -> str:
    """
    Submit two images to be compared. The images are put into the message
    queue, and the id corresponding to the task is returned.

    Note: One can only send JSON objects from FastAPI. So, the images are
    encoded as base64 strings. On the Celery side, the images are decoded
    back to bytes.
    """
    logger.info(f'user_id {user_id} submitted a post request.')

    if not is_allowed(user_id):
        raise HTTPException(status_code=403, detail='Rate Limit Exceeded')
    # shutil.copyfileobj(img1.file, open(img1.filename, 'wb'))
    img1_contents = await img1.read()
    img2_contents = await img2.read()
    data = {
        "img1": base64.b64encode(img1_contents).decode("utf-8"),
        "img2": base64.b64encode(img2_contents).decode("utf-8"),
    }

    logger.info(f'Sending data to kafka: {data.keys()}')
    producer.send(KAFKA_TOPIC, json.dumps(data).encode("utf-8"))

    # Retry only the Celery call, up to 3 times.
    for _ in range(3):
        try:
            return celery_app.send_task(
                "models.similarity",
                kwargs=data,
            ).id
        except Exception as e:
            logger.error(e)
            # asyncio.sleep yields to the event loop; time.sleep would block it.
            await asyncio.sleep(1)

    raise HTTPException(status_code=502, detail='Kafka or Celery is down.')

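The submit() method will now try to send the Celery task up to 3 times, waiting 1 second after each failed attempt. If all 3 attempts fail, submit() raises an HTTPException with status code 502. Note that only the celery_app.send_task() call is inside the retry loop; the producer.send() call to Kafka is made once, before the loop.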

By adding a retry mechanism to the submit() method, you can help your FastAPI microservice tolerate brief outages instead of failing on the first error.
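The same idea can be pulled out of submit() into a small reusable helper. The following is only a sketch, not code from this repository: retry_async, its parameters, and make_flaky are hypothetical names introduced for illustration, and the exponential backoff (doubling the delay after each failure) is a swapped-in variation on the fixed 1-second wait described above.

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


async def retry_async(
    func: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 1.0,
) -> T:
    """Await func() up to `attempts` times, doubling the delay after each failure.

    Hypothetical helper for illustration; re-raises the last error if every
    attempt fails.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error: Optional[Exception] = None
    for attempt in range(attempts):
        try:
            return await func()
        except Exception as error:
            last_error = error
            # Sleep only if another attempt will follow.
            if attempt < attempts - 1:
                await asyncio.sleep(base_delay * 2 ** attempt)
    assert last_error is not None
    raise last_error


def make_flaky(failures: int) -> Callable[[], Awaitable[str]]:
    """Hypothetical stand-in for a broker call: fails `failures` times, then succeeds."""
    state = {"calls": 0}

    async def flaky() -> str:
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ConnectionError("broker unavailable")
        return "task-id"

    return flaky


if __name__ == "__main__":
    # Two failures followed by a success fit within three attempts.
    print(asyncio.run(retry_async(make_flaky(2), attempts=3, base_delay=0.01)))
```

Because celery_app.send_task() is a synchronous call, using a helper like this inside submit() would presumably mean wrapping it first, for example with asyncio.to_thread (Python 3.9+), and translating the final exception into the 502 HTTPException.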

@harshraj22 harshraj22 added the enhancement New feature or request label May 12, 2023