Support KServe 0.13.1 #2

Merged · 7 commits · Oct 7, 2024
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
test:
PYTHONPATH=. pytest tests/

.PHONY: test
70 changes: 62 additions & 8 deletions README.md
@@ -1,11 +1,11 @@
# kserve-helper

This is a helper for building docker images for ML models.
*kserve-helper* is a toolkit for building docker images for ML models built on KServe. It supports
model input validation, uploading generated files to S3 or GCS, building model images, etc.
[Here](https://github.com/HyperGAI/kserve-helper/tree/main/examples) are some basic examples.
For more examples, please visit this [repo](https://github.com/HyperGAI/model-zoo).

## Implement a Model Class for Serving
To build a docker image for serving, we only need to implement one class with `load` and `predict`
To build a docker image for serving, we only need to implement a model class with `load` and `predict`
methods:
```python
class Model:
@@ -34,17 +34,71 @@ class Model:
output_image.save(output_path)
return Path(output_path)
```
The `load` function will be called during the initialization step, which will be only called once.
The `predict` function will be called for each request. The input parameter info is specified by
the `Input` class. This `Input` class allows us to set parameter descriptions, default value and
The `load` function is called once, during model initialization.
The `predict` function is called for each request. The input parameter information is specified by
the `Input` class, which allows us to set parameter descriptions, default values and
constraints (e.g., 0 <= input value <= 1).
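
Conceptually, the server validates the JSON payload against these `Input` definitions and then passes it to `predict` as keyword arguments (`self.model.predict(**payload)` in `kservehelper/model.py`). A minimal dependency-free sketch of that flow, not the library code itself:

```python
# A request payload maps directly onto predict's keyword arguments.
payload = {"repeat": 3}

def predict(repeat: int = 5):
    # The default (5) applies when "repeat" is absent from the payload.
    return {"output": " ".join(["Hello World!"] * repeat)}

result = predict(**payload)
print(result["output"])  # Hello World! Hello World! Hello World!
```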

The output typing of the `predict` function is important. If the output type is `Path` or
`List[Path]`, the webhook for uploading will be called after `predict` is finished. In this case,
`List[Path]`, the webhook for uploading files will be called after `predict` is finished. In this case,
the input request should also contain an additional key "upload_webhook" to specify the webhook server
address (an [example](https://github.com/HyperGAI/kserve-helper/tree/main/examples/rotate-image)).
If the output type is not `Path`, the results will be returned directly without calling the webhook.
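
For instance, a request body for a model whose `predict` returns a `Path` might look like the following sketch; the parameter name and the webhook URL are placeholders, not values from this repo:

```python
import json

# "upload_webhook" tells the server where to upload generated files;
# the other keys are the model's own predict() parameters (assumed here).
payload = {
    "image": "https://example.com/input.png",
    "upload_webhook": "http://webhook-server:9090/upload",
}

print(json.dumps(payload))
```

This JSON would be POSTed to `http://<host>/v1/models/<name>:predict`, as in the rotate-image example scripts.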

If streaming outputs are required, the output of `predict` should be an iterator:
```python
class Model:

def load(self):
pass

def predict(
self,
repeat: int = Input(
description="The number of repeats",
default=5
)
):
def _generator():
for i in range(repeat):
yield "Hello World!"
time.sleep(1)

return KServeModel.wrap_generator(_generator)
```
Note that with KServe >= 0.13.1, streaming and non-streaming requests share a single `predict` method.
For KServe <= 0.10.2, the streaming and non-streaming APIs are separate, i.e.,
```python
class Model:

def load(self):
pass

def predict(
self,
repeat: int = Input(
description="The number of repeats",
default=5
)
):
time.sleep(repeat)
return {"output": " ".join(["Hello World!"] * repeat)}

def generate(
self,
repeat: int = Input(
description="The number of repeats",
default=5
)
):
def _generator():
for i in range(repeat):
yield "Hello World!"
time.sleep(1)

return KServeModel.wrap_generator(_generator)
```
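
Under either API shape, `KServeModel.wrap_generator` frames each yielded chunk as a newline-delimited JSON object of the form `{"id": ..., "data": ...}` (see the `kservehelper/model.py` diff). A stand-alone sketch of that framing:

```python
import json

def wrap(items):
    # Mirrors the framing wrap_generator applies to each yielded chunk.
    for i, data in enumerate(items):
        yield json.dumps({"id": i, "data": data}, ensure_ascii=False) + "\n"

chunks = list(wrap(["Hello World!", "Hello World!"]))
print(chunks[0])  # {"id": 0, "data": "Hello World!"}
```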

## Write a Config for Building Docker Image

To build the corresponding docker image for serving, we only need to write a config file:
@@ -98,4 +152,4 @@ To push the docker image, run this command:
kservehelper push .
```

For more details, please check the implementations in the [repo](https://github.com/HyperGAI/model-zoo).
For more details, please check the implementations in the [repo](https://github.com/yangwenz/Hyperbooth).
1 change: 0 additions & 1 deletion examples/rotate-image/kserve_local_test.sh
@@ -5,4 +5,3 @@ export INGRESS_PORT=8080

SERVICE_HOSTNAME=$(kubectl get inferenceservice test-rotate-image -o jsonpath='{.status.url}' | cut -d "/" -f 3)
curl -v -H "Host: ${SERVICE_HOSTNAME}" "http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/test-rotate-image:predict" -d @./input.json
curl -v -H "Host: ${SERVICE_HOSTNAME}" "http://${INGRESS_HOST}:${INGRESS_PORT}/v1/docs/test-rotate-image"
3 changes: 1 addition & 2 deletions examples/rotate-image/test.sh
@@ -1,4 +1,3 @@
#!/bin/bash

curl localhost:8080/v1/models/test-rotate-image:predict -d @./input.json
curl -X GET localhost:8080/v1/docs/test-rotate-image
curl localhost:8080/v1/models/test-rotate-image:predict -d @./input.json
File renamed without changes.
File renamed without changes.
27 changes: 27 additions & 0 deletions examples/streaming/kserve-0.13.1/model.py
@@ -0,0 +1,27 @@
import time
from kservehelper.model import KServeModel
from kservehelper.types import Input


class Model:

def load(self):
pass

def predict(
self,
repeat: int = Input(
description="The number of repeats",
default=5
)
):
def _generator():
for i in range(repeat):
yield "Hello World!"
time.sleep(1)

return KServeModel.wrap_generator(_generator)


if __name__ == "__main__":
KServeModel.serve("streaming", Model)
6 changes: 6 additions & 0 deletions examples/streaming/kserve-0.13.1/test.py
@@ -0,0 +1,6 @@
import requests

url = "http://localhost:8080/v1/models/streaming:predict"
with requests.post(url, stream=True, json={"repeat": 5}) as r:
for chunk in r.iter_lines():
print(chunk)
4 changes: 2 additions & 2 deletions kservehelper/kserve/model_server.py
@@ -23,7 +23,7 @@
from typing import Dict, List, Optional, Union, Tuple, AsyncIterator, Any

from ray import serve as rayserve
from ray.serve.api import Deployment, RayServeHandle
from ray.serve.api import Deployment

from kserve.logging import KSERVE_LOG_CONFIG, logger
from kserve.model import Model
@@ -230,7 +230,7 @@ async def stop(self, sig: Optional[int] = None):
logger.info("Stopping the grpc server")
await self._grpc_server.stop(sig)

def register_model_handle(self, name: str, model_handle: RayServeHandle):
def register_model_handle(self, name: str, model_handle):
self.registered_models.update_handle(name, model_handle)
logger.info("Registering model handle: %s", name)

56 changes: 42 additions & 14 deletions kservehelper/model.py
@@ -8,14 +8,20 @@
import typing
import pydantic
import inspect
from importlib.metadata import version

from collections import OrderedDict
from typing import Any, Dict, Callable
from typing import Any, Dict, Callable, Union, Iterator, AsyncIterator
from inspect import signature
from datetime import datetime

from kserve import Model
from kservehelper.kserve import ModelServer

if version("kserve") <= "0.10.2":
from kservehelper.kserve import ModelServer
else:
from kserve import ModelServer

from kservehelper.types import Path, validate
from kservehelper.utils import upload_files

@@ -222,27 +228,41 @@ def _process_payload(payload: Dict):
return payload

@staticmethod
def _predict(self, payload: Dict, headers: Dict[str, str] = None) -> Dict:
def _predict(
self,
payload: Union[Dict, bytes],
headers: Dict[str, str] = None
) -> Union[Dict, Iterator, AsyncIterator]:
start_time = time.time()
if isinstance(payload, bytes):
payload = json.loads(payload.decode("utf-8"))
upload_webhook = payload.pop("upload_webhook", None)

payload = KServeModel._process_payload(payload)
outputs = self.model.predict(**payload)
results = KServeModel._upload(upload_webhook, outputs)
if getattr(self.model, "after_predict", None) is not None:
results = self.model.after_predict(results)
results["running_time"] = f"{time.time() - start_time}s"
return results
if not isinstance(outputs, (Iterator, AsyncIterator)):
results = KServeModel._upload(upload_webhook, outputs)
if getattr(self.model, "after_predict", None) is not None:
results = self.model.after_predict(results)
results["running_time"] = f"{time.time() - start_time}s"
return results
else:
return outputs

@staticmethod
def _generate(self, payload: Dict, headers: Dict[str, str] = None):
def _generate(self, payload: Union[Dict, bytes], headers: Dict[str, str] = None):
if isinstance(payload, bytes):
payload = json.loads(payload.decode("utf-8"))
payload.pop("upload_webhook", None)
payload = KServeModel._process_payload(payload)
generator = self.model.generate(**payload)
return generator

@staticmethod
def _preprocess(self, payload: Dict, headers: Dict[str, str] = None) -> Dict:
def _preprocess(self, payload: Union[Dict, bytes], headers: Dict[str, str] = None) -> Dict:
self.predict_start_time = time.time()
if isinstance(payload, bytes):
payload = json.loads(payload.decode("utf-8"))
self.upload_webhook = payload.pop("upload_webhook", None)
payload = KServeModel._process_payload(payload)
return self.model.preprocess(**payload)
@@ -308,11 +328,19 @@ def generate_filepath(filename: str) -> str:

@staticmethod
def wrap_generator(g):
def _g():
for i, data in enumerate(g()):
yield json.dumps({"id": i, "data": data}, ensure_ascii=False) + "\n"

return _g
if version("kserve") <= "0.10.2":
def _g():
for i, data in enumerate(g()):
yield json.dumps({"id": i, "data": data}, ensure_ascii=False) + "\n"

return _g
else:
async def _g():
for i, data in enumerate(g()):
yield json.dumps({"id": i, "data": data}, ensure_ascii=False) + "\n"

return _g()
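
The two branches above return different shapes: for older KServe, `wrap_generator` returns the generator function itself (`return _g`), which the server later invokes, while for newer KServe it returns an already-started async generator (`return _g()`). A stand-alone sketch of the two shapes, using nothing beyond the stdlib:

```python
import asyncio
import json

def make_sync(items):
    # Older-KServe shape: return the generator function; the caller invokes it.
    def _g():
        for i, data in enumerate(items):
            yield json.dumps({"id": i, "data": data}) + "\n"
    return _g

def make_async(items):
    # Newer-KServe shape: return the async generator object directly.
    async def _g():
        for i, data in enumerate(items):
            yield json.dumps({"id": i, "data": data}) + "\n"
    return _g()

sync_chunks = list(make_sync(["a", "b"])())

async def collect(agen):
    return [chunk async for chunk in agen]

async_chunks = asyncio.run(collect(make_async(["a", "b"])))
assert sync_chunks == async_chunks  # same framing, different delivery
```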

@staticmethod
def serve(name: str, model_class: Any, num_replicas: int = 1, **kwargs):
54 changes: 51 additions & 3 deletions kservehelper/types.py
@@ -8,11 +8,17 @@
import tempfile
import urllib
from typing import Any, Dict, Iterator, List, Optional, TypeVar, Union
from packaging.version import Version

import requests
import pydantic
from pydantic import Field

try:
from annotated_types import MinLen, MaxLen, Ge, Le
except ImportError:  # annotated_types is only available alongside pydantic v2
pass

FILENAME_ILLEGAL_CHARS = set("\u0000/")

# Linux allows files up to 255 bytes long. We enforce a slightly shorter
@@ -24,8 +30,8 @@
def Input(
default: Any = ...,
description: str = None,
ge: float = None,
le: float = None,
ge: Union[float, str] = None,
le: Union[float, str] = None,
min_length: int = None,
max_length: int = None,
min_items: int = None,
@@ -48,7 +54,7 @@ def Input(
)


def validate(value, name: str, field: pydantic.fields.FieldInfo):
def validate_old(value, name: str, field: pydantic.fields.FieldInfo):
if isinstance(value, str):
if field.min_length:
assert len(value) >= field.min_length, \
@@ -74,6 +80,48 @@ def validate(value, name: str, field: pydantic.fields.FieldInfo):
f"the value of {name} should be <= {field.le}"


def validate_new(value, name: str, field: pydantic.fields.FieldInfo):
if isinstance(value, str):
for constraint in field.metadata:
if isinstance(constraint, MinLen):
assert len(value) >= constraint.min_length, \
f"the length of {name} should be >= {constraint.min_length}"
elif isinstance(constraint, MaxLen):
assert len(value) <= constraint.max_length, \
f"the length of {name} should be <= {constraint.max_length}"
elif isinstance(constraint, Ge):
assert value >= constraint.ge, \
f"the value of {name} should be >= {constraint.ge}"
elif isinstance(constraint, Le):
assert value <= constraint.le, \
f"the value of {name} should be <= {constraint.le}"

elif isinstance(value, (list, tuple)):
for constraint in field.metadata:
if isinstance(constraint, MinLen):
assert len(value) >= constraint.min_length, \
f"the number of items in {name} should be >= {constraint.min_length}"
elif isinstance(constraint, MaxLen):
assert len(value) <= constraint.max_length, \
f"the number of items in {name} should be <= {constraint.max_length}"

elif isinstance(value, (int, float)):
for constraint in field.metadata:
if isinstance(constraint, Ge):
assert value >= constraint.ge, \
f"the value of {name} should be >= {constraint.ge}"
elif isinstance(constraint, Le):
assert value <= constraint.le, \
f"the value of {name} should be <= {constraint.le}"


def validate(value, name: str, field: pydantic.fields.FieldInfo):
if Version(pydantic.version.VERSION) >= Version("2.0"):
validate_new(value, name, field)
else:
validate_old(value, name, field)
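
In pydantic v2, constraints live in `field.metadata` as `annotated_types` markers rather than as attributes on the field, which is what forces the dispatch above. The pattern in `validate_new` can be sketched with stdlib stand-ins for `Ge`/`Le`:

```python
from dataclasses import dataclass

# Stand-ins for annotated_types.Ge / annotated_types.Le.
@dataclass
class Ge:
    ge: float

@dataclass
class Le:
    le: float

def validate_metadata(value, name, metadata):
    # Walk the constraint markers attached to a field, as validate_new does.
    for constraint in metadata:
        if isinstance(constraint, Ge):
            assert value >= constraint.ge, f"the value of {name} should be >= {constraint.ge}"
        elif isinstance(constraint, Le):
            assert value <= constraint.le, f"the value of {name} should be <= {constraint.le}"

validate_metadata(0.5, "strength", [Ge(0.0), Le(1.0)])  # within range: passes
```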


class File(io.IOBase):
validate_always = True

4 changes: 2 additions & 2 deletions requirements.txt
@@ -1,5 +1,5 @@
kserve==0.10.2
pydantic==1.10.12
pydantic==2.8.2
kserve==0.13.1
requests==2.29.0
aiohttp==3.8.3
aiofiles==23.2.1
6 changes: 3 additions & 3 deletions setup.py
@@ -2,7 +2,7 @@

setup(
name="kservehelper",
version="1.2.6",
version="2.0.0",
author="Wenzhuo Yang",
description="A KServe Model Wrapper",
long_description=open("README.md", "r", encoding="utf-8").read(),
@@ -17,8 +17,8 @@
]
},
install_requires=[
"kserve==0.10.2",
"pydantic==1.10.12",
"pydantic==2.8.2",
"kserve==0.13.1",
"requests==2.29.0",
"aiohttp==3.8.3",
"aiofiles==23.2.1",