From e1ff4ba5851cf88ae5f253bb8dd36a8e4c99db92 Mon Sep 17 00:00:00 2001
From: tonysy
Date: Fri, 17 Nov 2023 23:49:05 +0800
Subject: [PATCH] Update API

---
 configs/{eval_360.py => eval_api_360.py} |   1 -
 configs/eval_api_baichuan.py             |  42 +++++
 configs/eval_api_pangu.py                |  45 +++++
 opencompass/models/__init__.py           |   2 +
 opencompass/models/ai360_api.py          |  22 ++-
 opencompass/models/baichuan_api.py       | 196 +++++++++++++++++++++
 opencompass/models/minimax_api.py        |   9 +-
 opencompass/models/pangu_api.py          | 215 +++++++++++++++++++++++
 opencompass/models/xunfei_api.py         |   9 +-
 opencompass/models/zhipuai_api.py        |   9 +-
 requirements/api.txt                     |   1 +
 11 files changed, 531 insertions(+), 20 deletions(-)
 rename configs/{eval_360.py => eval_api_360.py} (96%)
 create mode 100644 configs/eval_api_baichuan.py
 create mode 100644 configs/eval_api_pangu.py
 create mode 100644 opencompass/models/baichuan_api.py
 create mode 100644 opencompass/models/pangu_api.py

diff --git a/configs/eval_360.py b/configs/eval_api_360.py
similarity index 96%
rename from configs/eval_360.py
rename to configs/eval_api_360.py
index d1507df29..195d461b4 100644
--- a/configs/eval_360.py
+++ b/configs/eval_api_360.py
@@ -1,7 +1,6 @@
 from mmengine.config import read_base
 from opencompass.models import AI360GPT
 from opencompass.partitioners import NaivePartitioner
-from opencompass.runners import LocalRunner
 from opencompass.runners.local_api import LocalAPIRunner
 from opencompass.tasks import OpenICLInferTask
 
diff --git a/configs/eval_api_baichuan.py b/configs/eval_api_baichuan.py
new file mode 100644
index 000000000..9cef341e2
--- /dev/null
+++ b/configs/eval_api_baichuan.py
@@ -0,0 +1,42 @@
+from mmengine.config import read_base
+from opencompass.models import (
+    BaiChuan,
+)
+
+from opencompass.partitioners import NaivePartitioner
+from opencompass.runners import LocalRunner
+from opencompass.runners.local_api import LocalAPIRunner
+from opencompass.tasks import OpenICLInferTask
+
+with read_base():
+    # from .datasets.collections.chat_medium import datasets
+    from .summarizers.medium import summarizer
+    from .datasets.ceval.ceval_gen import ceval_datasets
+
+datasets = [
+    *ceval_datasets,
+]
+
+models = [
+    dict(
+        abbr='Baichuan2-53B',
+        type=BaiChuan,
+        path='Baichuan2-53B',
+        api_key='xxxxxx',
+        secret_key='xxxxx',
+        query_per_second=1,
+        max_out_len=2048,
+        max_seq_len=2048,
+        batch_size=8),
+]
+
+infer = dict(
+    partitioner=dict(type=NaivePartitioner),
+    runner=dict(
+        type=LocalAPIRunner,
+        max_num_workers=2,
+        concurrent_users=2,
+        task=dict(type=OpenICLInferTask)),
+)
+
+work_dir = "outputs/api_baichuan53b/"
\ No newline at end of file
diff --git a/configs/eval_api_pangu.py b/configs/eval_api_pangu.py
new file mode 100644
index 000000000..04523a128
--- /dev/null
+++ b/configs/eval_api_pangu.py
@@ -0,0 +1,45 @@
+from mmengine.config import read_base
+from opencompass.models import PanGu
+
+from opencompass.partitioners import NaivePartitioner
+from opencompass.runners import LocalRunner
+from opencompass.runners.local_api import LocalAPIRunner
+from opencompass.tasks import OpenICLInferTask
+
+with read_base():
+    # from .datasets.collections.chat_medium import datasets
+    from .summarizers.medium import summarizer
+    from .datasets.ceval.ceval_gen import ceval_datasets
+
+datasets = [
+    *ceval_datasets,
+]
+
+models = [
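+    # replace the "xxxxxx" placeholders below with your own credentials
+    # and endpoints before running this config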
+    dict(
+        abbr='pangu',
+        type=PanGu,
+        path='pangu',
+        access_key='xxxxxx',
+        secret_key='xxxxxx',
+        url='xxxxxx',
+        # URL of the token server, used to generate the token,
+        # e.g. "https://xxxxxx.myhuaweicloud.com/v3/auth/tokens"
+        token_url='xxxxxx',
+        # scope project name, used to generate the token
+        project_name='xxxxxx',
+        query_per_second=1,
+        max_out_len=2048,
+        max_seq_len=2048,
+        batch_size=8),
+]
+
+infer = dict(
+    partitioner=dict(type=NaivePartitioner),
+    runner=dict(
+        type=LocalAPIRunner,
+        max_num_workers=2,
+        concurrent_users=2,
+        task=dict(type=OpenICLInferTask)),
+)
+
+work_dir = "outputs/api_pangu/"
\ No newline at end of file
diff --git a/opencompass/models/__init__.py b/opencompass/models/__init__.py
index 1790a1daa..5709d53e2 100644
--- a/opencompass/models/__init__.py
+++ b/opencompass/models/__init__.py
@@ -1,4 +1,5 @@
 from .ai360_api import AI360GPT  # noqa: F401
+from .baichuan_api import BaiChuan  # noqa: F401
 from .base import BaseModel, LMTemplateParser  # noqa
 from .base_api import APITemplateParser, BaseAPIModel  # noqa
 from .claude_api import Claude  # noqa: F401
@@ -10,5 +11,6 @@ from .llama2 import Llama2, Llama2Chat  # noqa: F401, F403
 from .minimax_api import MiniMax  # noqa: F401
 from .openai_api import OpenAI  # noqa: F401
+from .pangu_api import PanGu  # noqa: F401
 from .xunfei_api import XunFei  # noqa: F401
 from .zhipuai_api import ZhiPuAI  # noqa: F401
diff --git a/opencompass/models/ai360_api.py b/opencompass/models/ai360_api.py
index 2f823502c..676155f3f 100644
--- a/opencompass/models/ai360_api.py
+++ b/opencompass/models/ai360_api.py
@@ -18,6 +18,16 @@ class AI360GPT(BaseAPIModel):
     Documentations: https://ai.360.com/platform/docs/overview
 
     Args:
+        path (str): Model name.
+        key (str): Provided API key.
+        url (str): Provided URL.
+        query_per_second (int): The maximum queries allowed per second
+            between two consecutive calls of the API. Defaults to 2.
+        max_seq_len (int): Unused here.
+        meta_template (Dict, optional): The model's meta prompt
+            template if needed, in case you want to inject or wrap any
+            meta instructions.
+        retry (int): Number of retries if the API call fails. Defaults to 2.
     """
 
     def __init__(
@@ -66,11 +76,12 @@ def generate(
         return results
 
     def flush(self):
-        """Flush stdout and stderr when concurrent resources exists.
+        """Ensure simultaneous emptying of stdout and stderr when concurrent
+        resources are available.
 
-        When use multiproessing with standard io rediected to files, need to
-        flush internal information for examination or log loss when system
-        breaks.
+        When employing multiprocessing with standard I/O redirected to files,
+        it is crucial to flush internal data so that output can be examined
+        and logs are not lost in case of system failures.
         """
         if hasattr(self, 'tokens'):
             sys.stdout.flush()
@@ -160,9 +171,6 @@ def _generate(
             self.wait()
             continue
         if raw_response.status_code == 200:
-            # msg = json.load(response.text)
-            # response
-            # msg = response['text']
             try:
                 msg = response['choices'][0]['message']['content'].strip()
                 return msg
diff --git a/opencompass/models/baichuan_api.py b/opencompass/models/baichuan_api.py
new file mode 100644
index 000000000..f14309df2
--- /dev/null
+++ b/opencompass/models/baichuan_api.py
@@ -0,0 +1,196 @@
+import hashlib
+import json
+import sys
+import time
+from concurrent.futures import ThreadPoolExecutor
+from typing import Dict, List, Optional, Union
+
+import requests
+
+from opencompass.utils.prompt import PromptList
+
+from .base_api import BaseAPIModel
+
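+# prompts may arrive either as plain strings or as structured PromptLists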
+PromptType = Union[PromptList, str]
+
+
+class BaiChuan(BaseAPIModel):
+    """Model wrapper around Baichuan.
+
+    Documentation: https://platform.baichuan-ai.com/docs/api
+
+    Args:
+        path (str): The name of the Baichuan model, e.g. `Baichuan2-53B`.
+        api_key (str): Provided API key.
+        secret_key (str): Secret key used to obtain the access token.
+        query_per_second (int): The maximum queries allowed per second
+            between two consecutive calls of the API. Defaults to 2.
+        max_seq_len (int): Unused here.
+        meta_template (Dict, optional): The model's meta prompt
+            template if needed, in case you want to inject or wrap any
+            meta instructions.
+        retry (int): Number of retries if the API call fails. Defaults to 2.
+    """
+
+    def __init__(
+        self,
+        path: str,
+        api_key: str,
+        secret_key: str,
+        url: str = 'https://api.baichuan-ai.com/v1/chat',
+        query_per_second: int = 2,
+        max_seq_len: int = 2048,
+        meta_template: Optional[Dict] = None,
+        retry: int = 2,
+    ):
+        super().__init__(path=path,
+                         max_seq_len=max_seq_len,
+                         query_per_second=query_per_second,
+                         meta_template=meta_template,
+                         retry=retry)
+
+        self.api_key = api_key
+        self.secret_key = secret_key
+        self.url = url
+        self.model = path
+
+    def generate(
+        self,
+        inputs: List[PromptType],
+        max_out_len: int = 512,
+    ) -> List[str]:
+        """Generate results given a list of inputs.
+
+        Args:
+            inputs (List[PromptType]): A list of strings or PromptDicts.
+                The PromptDict should be organized in OpenCompass'
+                API format.
+            max_out_len (int): The maximum length of the output.
+
+        Returns:
+            List[str]: A list of generated strings.
+        """
+        with ThreadPoolExecutor() as executor:
+            results = list(
+                executor.map(self._generate, inputs,
+                             [max_out_len] * len(inputs)))
+        self.flush()
+        return results
+
+    def flush(self):
+        """Ensure simultaneous emptying of stdout and stderr when concurrent
+        resources are available.
+
+        When employing multiprocessing with standard I/O redirected to files,
+        it is crucial to flush internal data so that output can be examined
+        and logs are not lost in case of system failures.
+        """
+        if hasattr(self, 'tokens'):
+            sys.stdout.flush()
+            sys.stderr.flush()
+
+    def acquire(self):
+        """Acquire concurrent resources if they exist.
+
+        This behavior will fall back to wait with query_per_second if there
+        are no concurrent resources.
+        """
+        if hasattr(self, 'tokens'):
+            self.tokens.acquire()
+        else:
+            self.wait()
+
+    def release(self):
+        """Release concurrent resources if acquired.
+
+        This behavior will fall back to do nothing if there are no concurrent
+        resources.
+        """
+        if hasattr(self, 'tokens'):
+            self.tokens.release()
+
+    def _generate(
+        self,
+        input: PromptType,
+        max_out_len: int = 512,
+    ) -> str:
+        """Generate results given an input.
+
+        Args:
+            input (PromptType): A string or PromptDict.
+                The PromptDict should be organized in OpenCompass'
+                API format.
+            max_out_len (int): The maximum length of the output.
+
+        Returns:
+            str: The generated string.
+        """
+        assert isinstance(input, (str, PromptList))
+
+        if isinstance(input, str):
+            messages = [{'role': 'user', 'content': input}]
+        else:
+            messages = []
+            for item in input:
+                msg = {'content': item['prompt']}
+                if item['role'] == 'HUMAN':
+                    msg['role'] = 'user'
+                elif item['role'] == 'BOT':
+                    msg['role'] = 'assistant'
+
+                messages.append(msg)
+
+        data = {'model': self.model, 'messages': messages}
+
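+        # Baichuan authenticates each call with an MD5 signature computed
+        # over secret_key + serialized request body + timestamp, sent via
+        # the X-BC-* headers below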
+ """ + + assert isinstance(input, (str, PromptList)) + + if isinstance(input, str): + messages = [{'role': 'user', 'content': input}] + else: + messages = [] + for item in input: + msg = {'content': item['prompt']} + if item['role'] == 'HUMAN': + msg['role'] = 'user' + elif item['role'] == 'BOT': + msg['role'] = 'assistant' + + messages.append(msg) + + data = {'model': self.model, 'messages': messages} + + def calculate_md5(input_string): + md5 = hashlib.md5() + md5.update(input_string.encode('utf-8')) + encrypted = md5.hexdigest() + return encrypted + + json_data = json.dumps(data) + time_stamp = int(time.time()) + signature = calculate_md5(self.secret_key + json_data + + str(time_stamp)) + + headers = { + 'Content-Type': 'application/json', + 'Authorization': 'Bearer ' + self.api_key, + 'X-BC-Request-Id': 'your requestId', + 'X-BC-Timestamp': str(time_stamp), + 'X-BC-Signature': signature, + 'X-BC-Sign-Algo': 'MD5', + } + + max_num_retries = 0 + while max_num_retries < self.retry: + self.acquire() + raw_response = requests.request('POST', + url=self.url, + headers=headers, + json=data) + response = raw_response.json() + self.release() + + if response is None: + print('Connection error, reconnect.') + # if connect error, frequent requests will casuse + # continuous unstable network, therefore wait here + # to slow down the request + self.wait() + continue + if raw_response.status_code == 200 and response['code'] == 0: + # msg = json.load(response.text) + # response + msg = response['data']['messages'][0]['content'] + return msg + + if response['code'] != 0: + print(response) + return '' + print(response) + max_num_retries += 1 + + raise RuntimeError(response) diff --git a/opencompass/models/minimax_api.py b/opencompass/models/minimax_api.py index 813d91624..cc9cd9a83 100644 --- a/opencompass/models/minimax_api.py +++ b/opencompass/models/minimax_api.py @@ -82,11 +82,12 @@ def generate( return results def flush(self): - """Flush stdout and stderr when concurrent resources exists. + """Ensure simultaneous emptying of stdout and stderr when concurrent + resources are available. - When use multiproessing with standard io rediected to files, need to - flush internal information for examination or log loss when system - breaks. + When employing multiprocessing with standard I/O redirected to files, + it is crucial to clear internal data for examination or prevent log + loss in case of system failures." """ if hasattr(self, 'tokens'): sys.stdout.flush() diff --git a/opencompass/models/pangu_api.py b/opencompass/models/pangu_api.py new file mode 100644 index 000000000..991fa9d48 --- /dev/null +++ b/opencompass/models/pangu_api.py @@ -0,0 +1,215 @@ +import sys +from concurrent.futures import ThreadPoolExecutor +from typing import Dict, List, Optional, Union + +import requests + +from opencompass.utils.prompt import PromptList + +from .base_api import BaseAPIModel + +PromptType = Union[PromptList, str] + + +class PanGu(BaseAPIModel): + """Model wrapper around PanGu. + + Args: + path (str): The name of Pangu model. + e.g. `pangu` + access_key (str): provided access_key + secret_key (str): secretkey in order to obtain access_token + url (str): provide url for requests + token_url (str): url of token server + project_name (str): project name for generate the token + query_per_second (int): The maximum queries allowed per second + between two consecutive calls of the API. Defaults to 1. + max_seq_len (int): Unused here. 
+        url = self.token_url
+        payload = {
+            'auth': {
+                'identity': {
+                    'methods': ['hw_ak_sk'],
+                    'hw_ak_sk': {
+                        'access': {
+                            'key': self.access_key
+                        },
+                        'secret': {
+                            'key': self.secret_key
+                        }
+                    }
+                },
+                'scope': {
+                    'project': {
+                        'name': self.project_name
+                    }
+                }
+            }
+        }
+        headers = {'Content-Type': 'application/json'}
+
+        response = requests.request('POST', url, headers=headers,
+                                    json=payload)
+        return response
+
+    def _generate(
+        self,
+        input: PromptType,
+        max_out_len: int = 512,
+    ) -> str:
+        """Generate results given an input.
+
+        Args:
+            input (PromptType): A string or PromptDict.
+                The PromptDict should be organized in OpenCompass'
+                API format.
+            max_out_len (int): The maximum length of the output.
+
+        Returns:
+            str: The generated string.
+        """
+        assert isinstance(input, (str, PromptList))
+
+        if isinstance(input, str):
+            messages = [{'role': 'user', 'content': input}]
+        else:
+            messages = []
+            for item in input:
+                msg = {'content': item['prompt']}
+                if item['role'] == 'HUMAN':
+                    msg['role'] = 'user'
+                elif item['role'] == 'BOT':
+                    msg['role'] = 'system'
+
+                messages.append(msg)
+
+        data = {'messages': messages, 'stream': False}
+
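+        # note: a fresh IAM token is requested for every generation call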
+ """ + assert isinstance(input, (str, PromptList)) + + if isinstance(input, str): + messages = [{'role': 'user', 'content': input}] + else: + messages = [] + for item in input: + msg = {'content': item['prompt']} + if item['role'] == 'HUMAN': + msg['role'] = 'user' + elif item['role'] == 'BOT': + msg['role'] = 'system' + + messages.append(msg) + + data = {'messages': messages, 'stream': False} + + token_response = self._get_token() + if token_response.status_code == 201: + token = token_response.headers['X-Subject-Token'] + print('请求成功!') + else: + msg = 'token生成失败' + print(msg) + return '' + + headers = {'Content-Type': 'application/json', 'X-Auth-Token': token} + + max_num_retries = 0 + while max_num_retries < self.retry: + self.acquire() + raw_response = requests.request('POST', + url=self.url, + headers=headers, + json=data) + response = raw_response.json() + self.release() + + if response is None: + print('Connection error, reconnect.') + # if connect error, frequent requests will casuse + # continuous unstable network, therefore wait here + # to slow down the request + self.wait() + continue + if raw_response.status_code == 200: + # msg = json.load(response.text) + # response + msg = response['choices'][0]['message']['content'] + return msg + + if (raw_response.status_code != 200): + print(response['error_msg']) + return '' + print(response) + max_num_retries += 1 + + raise RuntimeError(response['error_msg']) diff --git a/opencompass/models/xunfei_api.py b/opencompass/models/xunfei_api.py index 72f3815f6..8eb232ca7 100644 --- a/opencompass/models/xunfei_api.py +++ b/opencompass/models/xunfei_api.py @@ -121,11 +121,12 @@ def generate( return results def flush(self): - """Flush stdout and stderr when concurrent resources exists. + """Ensure simultaneous emptying of stdout and stderr when concurrent + resources are available. - When use multiproessing with standard io rediected to files, need to - flush internal information for examination or log loss when system - breaks. + When employing multiprocessing with standard I/O redirected to files, + it is crucial to clear internal data for examination or prevent log + loss in case of system failures." """ if hasattr(self, 'tokens'): sys.stdout.flush() diff --git a/opencompass/models/zhipuai_api.py b/opencompass/models/zhipuai_api.py index 7360b12fc..f475608ff 100644 --- a/opencompass/models/zhipuai_api.py +++ b/opencompass/models/zhipuai_api.py @@ -67,11 +67,12 @@ def generate( return results def flush(self): - """Flush stdout and stderr when concurrent resources exists. + """Ensure simultaneous emptying of stdout and stderr when concurrent + resources are available. - When use multiproessing with standard io rediected to files, need to - flush internal information for examination or log loss when system - breaks. + When employing multiprocessing with standard I/O redirected to files, + it is crucial to clear internal data for examination or prevent log + loss in case of system failures." """ if hasattr(self, 'tokens'): sys.stdout.flush() diff --git a/requirements/api.txt b/requirements/api.txt index a9a20933b..c42d253db 100644 --- a/requirements/api.txt +++ b/requirements/api.txt @@ -1,2 +1,3 @@ +sseclient-py==1.7.2 websocket-client zhipuai