New universal API with streaming/blocking endpoints (#990)
Previous title: Add api_streaming extension and update api-example-stream to use it * Merge with latest main * Add parameter capturing encoder_repetition_penalty * Change some defaults, minor fixes * Add --api, --public-api flags * remove unneeded/broken comment from blocking API startup. The comment is already correctly emitted in try_start_cloudflared by calling the lambda we pass in. * Update on_start message for blocking_api, it should say 'non-streaming' and not 'streaming' * Update the API examples * Change a comment * Update README * Remove the gradio API * Remove unused import * Minor change * Remove unused import --------- Co-authored-by: oobabooga <112222186+oobabooga@users.noreply.github.com>
This commit is contained in:
parent
459e725af9
commit
654933c634
12 changed files with 346 additions and 286 deletions
90
extensions/api/blocking_api.py
Normal file
90
extensions/api/blocking_api.py
Normal file
|
@ -0,0 +1,90 @@
|
|||
import json
|
||||
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
||||
from threading import Thread
|
||||
|
||||
from modules import shared
|
||||
from modules.text_generation import encode, generate_reply
|
||||
|
||||
from extensions.api.util import build_parameters, try_start_cloudflared
|
||||
|
||||
|
||||
class Handler(BaseHTTPRequestHandler):
|
||||
def do_GET(self):
|
||||
if self.path == '/api/v1/model':
|
||||
self.send_response(200)
|
||||
self.end_headers()
|
||||
response = json.dumps({
|
||||
'result': shared.model_name
|
||||
})
|
||||
|
||||
self.wfile.write(response.encode('utf-8'))
|
||||
else:
|
||||
self.send_error(404)
|
||||
|
||||
def do_POST(self):
|
||||
content_length = int(self.headers['Content-Length'])
|
||||
body = json.loads(self.rfile.read(content_length).decode('utf-8'))
|
||||
|
||||
if self.path == '/api/v1/generate':
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Type', 'application/json')
|
||||
self.end_headers()
|
||||
|
||||
prompt = body['prompt']
|
||||
generate_params = build_parameters(body)
|
||||
stopping_strings = generate_params.pop('stopping_strings')
|
||||
|
||||
generator = generate_reply(
|
||||
prompt, generate_params, stopping_strings=stopping_strings)
|
||||
|
||||
answer = ''
|
||||
for a in generator:
|
||||
if isinstance(a, str):
|
||||
answer = a
|
||||
else:
|
||||
answer = a[0]
|
||||
|
||||
response = json.dumps({
|
||||
'results': [{
|
||||
'text': answer if shared.is_chat() else answer[len(prompt):]
|
||||
}]
|
||||
})
|
||||
self.wfile.write(response.encode('utf-8'))
|
||||
elif self.path == '/api/v1/token-count':
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Type', 'application/json')
|
||||
self.end_headers()
|
||||
|
||||
tokens = encode(body['prompt'])[0]
|
||||
response = json.dumps({
|
||||
'results': [{
|
||||
'tokens': len(tokens)
|
||||
}]
|
||||
})
|
||||
self.wfile.write(response.encode('utf-8'))
|
||||
else:
|
||||
self.send_error(404)
|
||||
|
||||
|
||||
def _run_server(port: int, share: bool=False):
|
||||
address = '0.0.0.0' if shared.args.listen else '127.0.0.1'
|
||||
|
||||
server = ThreadingHTTPServer((address, port), Handler)
|
||||
|
||||
def on_start(public_url: str):
|
||||
print(f'Starting non-streaming server at public url {public_url}/api')
|
||||
|
||||
if share:
|
||||
try:
|
||||
try_start_cloudflared(port, max_attempts=3, on_start=on_start)
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
print(
|
||||
f'Starting API at http://{address}:{port}/api')
|
||||
|
||||
server.serve_forever()
|
||||
|
||||
|
||||
def start_server(port: int, share: bool = False):
|
||||
Thread(target=_run_server, args=[port, share], daemon=True).start()
|
|
@ -1 +1,2 @@
|
|||
flask_cloudflared==0.0.12
|
||||
flask_cloudflared==0.0.12
|
||||
websockets==11.0.2
|
|
@ -1,115 +1,10 @@
|
|||
import json
|
||||
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
||||
from threading import Thread
|
||||
|
||||
import extensions.api.blocking_api as blocking_api
|
||||
import extensions.api.streaming_api as streaming_api
|
||||
from modules import shared
|
||||
from modules.text_generation import encode, generate_reply
|
||||
|
||||
params = {
|
||||
'port': 5000,
|
||||
}
|
||||
|
||||
|
||||
class Handler(BaseHTTPRequestHandler):
|
||||
def do_GET(self):
|
||||
if self.path == '/api/v1/model':
|
||||
self.send_response(200)
|
||||
self.end_headers()
|
||||
response = json.dumps({
|
||||
'result': shared.model_name
|
||||
})
|
||||
|
||||
self.wfile.write(response.encode('utf-8'))
|
||||
else:
|
||||
self.send_error(404)
|
||||
|
||||
def do_POST(self):
|
||||
content_length = int(self.headers['Content-Length'])
|
||||
body = json.loads(self.rfile.read(content_length).decode('utf-8'))
|
||||
|
||||
if self.path == '/api/v1/generate':
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Type', 'application/json')
|
||||
self.end_headers()
|
||||
|
||||
prompt = body['prompt']
|
||||
prompt_lines = [k.strip() for k in prompt.split('\n')]
|
||||
max_context = body.get('max_context_length', 2048)
|
||||
while len(prompt_lines) >= 0 and len(encode('\n'.join(prompt_lines))) > max_context:
|
||||
prompt_lines.pop(0)
|
||||
|
||||
prompt = '\n'.join(prompt_lines)
|
||||
generate_params = {
|
||||
'max_new_tokens': int(body.get('max_length', 200)),
|
||||
'do_sample': bool(body.get('do_sample', True)),
|
||||
'temperature': float(body.get('temperature', 0.5)),
|
||||
'top_p': float(body.get('top_p', 1)),
|
||||
'typical_p': float(body.get('typical', 1)),
|
||||
'repetition_penalty': float(body.get('rep_pen', 1.1)),
|
||||
'encoder_repetition_penalty': 1,
|
||||
'top_k': int(body.get('top_k', 0)),
|
||||
'min_length': int(body.get('min_length', 0)),
|
||||
'no_repeat_ngram_size': int(body.get('no_repeat_ngram_size', 0)),
|
||||
'num_beams': int(body.get('num_beams', 1)),
|
||||
'penalty_alpha': float(body.get('penalty_alpha', 0)),
|
||||
'length_penalty': float(body.get('length_penalty', 1)),
|
||||
'early_stopping': bool(body.get('early_stopping', False)),
|
||||
'seed': int(body.get('seed', -1)),
|
||||
'add_bos_token': int(body.get('add_bos_token', True)),
|
||||
'truncation_length': int(body.get('truncation_length', 2048)),
|
||||
'ban_eos_token': bool(body.get('ban_eos_token', False)),
|
||||
'skip_special_tokens': bool(body.get('skip_special_tokens', True)),
|
||||
'custom_stopping_strings': '', # leave this blank
|
||||
'stopping_strings': body.get('stopping_strings', []),
|
||||
}
|
||||
stopping_strings = generate_params.pop('stopping_strings')
|
||||
generator = generate_reply(prompt, generate_params, stopping_strings=stopping_strings)
|
||||
answer = ''
|
||||
for a in generator:
|
||||
if isinstance(a, str):
|
||||
answer = a
|
||||
else:
|
||||
answer = a[0]
|
||||
|
||||
response = json.dumps({
|
||||
'results': [{
|
||||
'text': answer if shared.is_chat() else answer[len(prompt):]
|
||||
}]
|
||||
})
|
||||
self.wfile.write(response.encode('utf-8'))
|
||||
|
||||
elif self.path == '/api/v1/token-count':
|
||||
# Not compatible with KoboldAI api
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Type', 'application/json')
|
||||
self.end_headers()
|
||||
|
||||
tokens = encode(body['prompt'])[0]
|
||||
response = json.dumps({
|
||||
'results': [{
|
||||
'tokens': len(tokens)
|
||||
}]
|
||||
})
|
||||
self.wfile.write(response.encode('utf-8'))
|
||||
|
||||
else:
|
||||
self.send_error(404)
|
||||
|
||||
|
||||
def run_server():
|
||||
server_addr = ('0.0.0.0' if shared.args.listen else '127.0.0.1', params['port'])
|
||||
server = ThreadingHTTPServer(server_addr, Handler)
|
||||
if shared.args.share:
|
||||
try:
|
||||
from flask_cloudflared import _run_cloudflared
|
||||
public_url = _run_cloudflared(params['port'], params['port'] + 1)
|
||||
print(f'Starting KoboldAI compatible api at {public_url}/api')
|
||||
except ImportError:
|
||||
print('You should install flask_cloudflared manually')
|
||||
else:
|
||||
print(f'Starting KoboldAI compatible api at http://{server_addr[0]}:{server_addr[1]}/api')
|
||||
server.serve_forever()
|
||||
|
||||
BLOCKING_PORT = 5000
|
||||
STREAMING_PORT = 5005
|
||||
|
||||
def setup():
|
||||
Thread(target=run_server, daemon=True).start()
|
||||
blocking_api.start_server(BLOCKING_PORT, share=shared.args.public_api)
|
||||
streaming_api.start_server(STREAMING_PORT, share=shared.args.public_api)
|
||||
|
|
80
extensions/api/streaming_api.py
Normal file
80
extensions/api/streaming_api.py
Normal file
|
@ -0,0 +1,80 @@
|
|||
import json
|
||||
import asyncio
|
||||
from websockets.server import serve
|
||||
from threading import Thread
|
||||
|
||||
from modules import shared
|
||||
from modules.text_generation import generate_reply
|
||||
|
||||
from extensions.api.util import build_parameters, try_start_cloudflared
|
||||
|
||||
PATH = '/api/v1/stream'
|
||||
|
||||
|
||||
async def _handle_connection(websocket, path):
|
||||
|
||||
if path != PATH:
|
||||
print(f'Streaming api: unknown path: {path}')
|
||||
return
|
||||
|
||||
async for message in websocket:
|
||||
message = json.loads(message)
|
||||
|
||||
prompt = message['prompt']
|
||||
generate_params = build_parameters(message)
|
||||
stopping_strings = generate_params.pop('stopping_strings')
|
||||
|
||||
generator = generate_reply(
|
||||
prompt, generate_params, stopping_strings=stopping_strings)
|
||||
|
||||
# As we stream, only send the new bytes.
|
||||
skip_index = len(prompt) if not shared.is_chat() else 0
|
||||
message_num = 0
|
||||
|
||||
for a in generator:
|
||||
to_send = ''
|
||||
if isinstance(a, str):
|
||||
to_send = a[skip_index:]
|
||||
else:
|
||||
to_send = a[0][skip_index:]
|
||||
|
||||
await websocket.send(json.dumps({
|
||||
'event': 'text_stream',
|
||||
'message_num': message_num,
|
||||
'text': to_send
|
||||
}))
|
||||
|
||||
skip_index += len(to_send)
|
||||
message_num += 1
|
||||
|
||||
await websocket.send(json.dumps({
|
||||
'event': 'stream_end',
|
||||
'message_num': message_num
|
||||
}))
|
||||
|
||||
|
||||
async def _run(host: str, port: int):
|
||||
async with serve(_handle_connection, host, port):
|
||||
await asyncio.Future() # run forever
|
||||
|
||||
|
||||
def _run_server(port: int, share: bool = False):
|
||||
address = '0.0.0.0' if shared.args.listen else '127.0.0.1'
|
||||
|
||||
def on_start(public_url: str):
|
||||
public_url = public_url.replace('https://', 'wss://')
|
||||
print(f'Starting streaming server at public url {public_url}{PATH}')
|
||||
|
||||
if share:
|
||||
try:
|
||||
try_start_cloudflared(port, max_attempts=3, on_start=on_start)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
else:
|
||||
print(f'Starting streaming server at ws://{address}:{port}{PATH}')
|
||||
|
||||
asyncio.run(_run(host=address, port=port))
|
||||
|
||||
|
||||
def start_server(port: int, share: bool = False):
|
||||
Thread(target=_run_server, args=[port, share], daemon=True).start()
|
69
extensions/api/util.py
Normal file
69
extensions/api/util.py
Normal file
|
@ -0,0 +1,69 @@
|
|||
|
||||
from threading import Thread
|
||||
import time
|
||||
from typing import Callable, Optional
|
||||
from modules.text_generation import encode
|
||||
|
||||
|
||||
def build_parameters(body):
|
||||
prompt = body['prompt']
|
||||
|
||||
prompt_lines = [k.strip() for k in prompt.split('\n')]
|
||||
max_context = body.get('max_context_length', 2048)
|
||||
while len(prompt_lines) >= 0 and len(encode('\n'.join(prompt_lines))) > max_context:
|
||||
prompt_lines.pop(0)
|
||||
|
||||
prompt = '\n'.join(prompt_lines)
|
||||
|
||||
generate_params = {
|
||||
'max_new_tokens': int(body.get('max_new_tokens', body.get('max_length', 200))),
|
||||
'do_sample': bool(body.get('do_sample', True)),
|
||||
'temperature': float(body.get('temperature', 0.5)),
|
||||
'top_p': float(body.get('top_p', 1)),
|
||||
'typical_p': float(body.get('typical_p', body.get('typical', 1))),
|
||||
'repetition_penalty': float(body.get('repetition_penalty', body.get('rep_pen', 1.1))),
|
||||
'encoder_repetition_penalty': float(body.get('encoder_repetition_penalty', 1.0)),
|
||||
'top_k': int(body.get('top_k', 0)),
|
||||
'min_length': int(body.get('min_length', 0)),
|
||||
'no_repeat_ngram_size': int(body.get('no_repeat_ngram_size', 0)),
|
||||
'num_beams': int(body.get('num_beams', 1)),
|
||||
'penalty_alpha': float(body.get('penalty_alpha', 0)),
|
||||
'length_penalty': float(body.get('length_penalty', 1)),
|
||||
'early_stopping': bool(body.get('early_stopping', False)),
|
||||
'seed': int(body.get('seed', -1)),
|
||||
'add_bos_token': int(body.get('add_bos_token', True)),
|
||||
'truncation_length': int(body.get('truncation_length', 2048)),
|
||||
'ban_eos_token': bool(body.get('ban_eos_token', False)),
|
||||
'skip_special_tokens': bool(body.get('skip_special_tokens', True)),
|
||||
'custom_stopping_strings': '', # leave this blank
|
||||
'stopping_strings': body.get('stopping_strings', []),
|
||||
}
|
||||
|
||||
return generate_params
|
||||
|
||||
|
||||
def try_start_cloudflared(port: int, max_attempts: int = 3, on_start: Optional[Callable[[str], None]] = None):
|
||||
Thread(target=_start_cloudflared, args=[
|
||||
port, max_attempts, on_start], daemon=True).start()
|
||||
|
||||
|
||||
def _start_cloudflared(port: int, max_attempts: int = 3, on_start: Optional[Callable[[str], None]] = None):
|
||||
try:
|
||||
from flask_cloudflared import _run_cloudflared
|
||||
except ImportError:
|
||||
print('You should install flask_cloudflared manually')
|
||||
raise Exception(
|
||||
'flask_cloudflared not installed. Make sure you installed the requirements.txt for this extension.')
|
||||
|
||||
for _ in range(max_attempts):
|
||||
try:
|
||||
public_url = _run_cloudflared(port, port + 1)
|
||||
|
||||
if on_start:
|
||||
on_start(public_url)
|
||||
|
||||
return
|
||||
except Exception:
|
||||
time.sleep(3)
|
||||
|
||||
raise Exception('Could not start cloudflared.')
|
Loading…
Add table
Add a link
Reference in a new issue