
Commit

Merge branch 'main' into feat-rdb_summary-wide_table
dongzhancai1 committed Dec 12, 2024
2 parents 2149e2a + a7f4ac6 commit d817230
Showing 13 changed files with 158 additions and 38 deletions.
2 changes: 2 additions & 0 deletions .env.template
@@ -77,6 +77,8 @@ KNOWLEDGE_GRAPH_SEARCH_TOP_SIZE=200
 ## you can set this value to a higher value for better performance.
 ## if out of memory when load large document, you can set this value to a lower value.
 # KNOWLEDGE_MAX_CHUNKS_ONCE_LOAD=10
+## Maximum number of threads to use when loading chunks, please make sure your vector db can support multi-threading.
+# KNOWLEDGE_MAX_THREADS=1
 #KNOWLEDGE_CHUNK_OVERLAP=50
 # Control whether to display the source document of knowledge on the front end.
 KNOWLEDGE_CHAT_SHOW_RELATIONS=False
17 changes: 9 additions & 8 deletions dbgpt/_private/config.py
@@ -140,16 +140,16 @@ def __init__(self) -> None:
             os.environ["claude_proxyllm_api_base"] = os.getenv(
                 "ANTHROPIC_BASE_URL", "https://api.anthropic.com"
             )
-        self.silicon_flow_proxy_api_key = os.getenv("SILICON_FLOW_API_KEY")
-        if self.silicon_flow_proxy_api_key:
+        self.siliconflow_proxy_api_key = os.getenv("SILICONFLOW_API_KEY")
+        if self.siliconflow_proxy_api_key:
             os.environ[
-                "silicon_flow_proxyllm_proxy_api_key"
-            ] = self.silicon_flow_proxy_api_key
-            os.environ["silicon_flow_proxyllm_proxyllm_backend"] = os.getenv(
-                "SILICON_FLOW_MODEL_VERSION", "Qwen/Qwen2.5-Coder-32B-Instruct"
+                "siliconflow_proxyllm_proxy_api_key"
+            ] = self.siliconflow_proxy_api_key
+            os.environ["siliconflow_proxyllm_proxyllm_backend"] = os.getenv(
+                "SILICONFLOW_MODEL_VERSION", "Qwen/Qwen2.5-Coder-32B-Instruct"
             )
-            os.environ["silicon_flow_proxyllm_api_base"] = os.getenv(
-                "SILICON_FLOW_API_BASE", "https://api.siliconflow.cn/v1"
+            os.environ["siliconflow_proxyllm_api_base"] = os.getenv(
+                "SILICONFLOW_API_BASE", "https://api.siliconflow.cn/v1"
             )

         self.proxy_server_url = os.getenv("PROXY_SERVER_URL")

@@ -280,6 +280,7 @@ def __init__(self) -> None:
         self.KNOWLEDGE_MAX_CHUNKS_ONCE_LOAD = int(
             os.getenv("KNOWLEDGE_MAX_CHUNKS_ONCE_LOAD", 10)
         )
+        self.KNOWLEDGE_MAX_THREADS = int(os.getenv("KNOWLEDGE_MAX_THREADS", 1))
         # default recall similarity score, between 0 and 1
         self.KNOWLEDGE_SEARCH_RECALL_SCORE = float(
             os.getenv("KNOWLEDGE_SEARCH_RECALL_SCORE", 0.3)
3 changes: 2 additions & 1 deletion dbgpt/configs/model_config.py
@@ -80,7 +80,7 @@ def get_device() -> str:
     # https://platform.deepseek.com/api-docs/
     "deepseek_proxyllm": "deepseek_proxyllm",
     # https://docs.siliconflow.cn/quickstart
-    "silicon_flow_proxyllm": "silicon_flow_proxyllm",
+    "siliconflow_proxyllm": "siliconflow_proxyllm",
     "llama-2-7b": os.path.join(MODEL_PATH, "Llama-2-7b-chat-hf"),
     "llama-2-13b": os.path.join(MODEL_PATH, "Llama-2-13b-chat-hf"),
     "llama-2-70b": os.path.join(MODEL_PATH, "Llama-2-70b-chat-hf"),

@@ -322,6 +322,7 @@ def get_device() -> str:
     "bge-reranker-large": os.path.join(MODEL_PATH, "bge-reranker-large"),
     # Proxy rerank model
     "rerank_proxy_http_openapi": "rerank_proxy_http_openapi",
+    "rerank_proxy_siliconflow": "rerank_proxy_siliconflow",
 }
12 changes: 12 additions & 0 deletions dbgpt/model/adapter/embeddings_loader.py
@@ -109,6 +109,18 @@ def load_rerank_model(
             if proxy_param.proxy_backend:
                 openapi_param["model_name"] = proxy_param.proxy_backend
             return OpenAPIRerankEmbeddings(**openapi_param)
+        elif model_name in ["rerank_proxy_siliconflow"]:
+            from dbgpt.rag.embedding.rerank import SiliconFlowRerankEmbeddings
+
+            proxy_param = cast(ProxyEmbeddingParameters, param)
+            openapi_param = {}
+            if proxy_param.proxy_server_url:
+                openapi_param["api_url"] = proxy_param.proxy_server_url
+            if proxy_param.proxy_api_key:
+                openapi_param["api_key"] = proxy_param.proxy_api_key
+            if proxy_param.proxy_backend:
+                openapi_param["model_name"] = proxy_param.proxy_backend
+            return SiliconFlowRerankEmbeddings(**openapi_param)
         else:
             from dbgpt.rag.embedding.rerank import CrossEncoderRerankEmbeddings

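
For reference, a hedged sketch of proxy parameters that would route into the new SiliconFlow branch above; the field names follow the proxy_param attributes read in the diff, the key and URL values are hypothetical placeholders, and it assumes ProxyEmbeddingParameters accepts these fields as constructor arguments:

from dbgpt.model.parameter import ProxyEmbeddingParameters

params = ProxyEmbeddingParameters(
    model_name="rerank_proxy_siliconflow",  # routes to the elif branch above
    proxy_server_url="https://api.siliconflow.cn/v1/rerank",
    proxy_api_key="sk-...",  # hypothetical placeholder
    proxy_backend="BAAI/bge-reranker-v2-m3",
)
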
6 changes: 3 additions & 3 deletions dbgpt/model/adapter/proxy_adapter.py
@@ -349,7 +349,7 @@ def support_async(self) -> bool:
         return True

     def do_match(self, lower_model_name_or_path: Optional[str] = None):
-        return lower_model_name_or_path == "silicon_flow_proxyllm"
+        return lower_model_name_or_path == "siliconflow_proxyllm"

     def get_llm_client_class(
         self, params: ProxyModelParameters

@@ -359,9 +359,9 @@ def get_llm_client_class(
         return SiliconFlowLLMClient

     def get_async_generate_stream_function(self, model, model_path: str):
-        from dbgpt.model.proxy.llms.siliconflow import silicon_flow_generate_stream
+        from dbgpt.model.proxy.llms.siliconflow import siliconflow_generate_stream

-        return silicon_flow_generate_stream
+        return siliconflow_generate_stream


 register_model_adapter(OpenAIProxyLLMModelAdapter)
12 changes: 10 additions & 2 deletions dbgpt/model/parameter.py
@@ -613,7 +613,16 @@ def is_rerank_model(self) -> bool:


 _EMBEDDING_PARAMETER_CLASS_TO_NAME_CONFIG = {
-    ProxyEmbeddingParameters: "proxy_openai,proxy_azure,proxy_http_openapi,proxy_ollama,proxy_tongyi,proxy_qianfan,rerank_proxy_http_openapi",
+    ProxyEmbeddingParameters: [
+        "proxy_openai",
+        "proxy_azure",
+        "proxy_http_openapi",
+        "proxy_ollama",
+        "proxy_tongyi",
+        "proxy_qianfan",
+        "rerank_proxy_http_openapi",
+        "rerank_proxy_siliconflow",
+    ]
 }

 EMBEDDING_NAME_TO_PARAMETER_CLASS_CONFIG = {}

@@ -622,7 +631,6 @@ def is_rerank_model(self) -> bool:
 def _update_embedding_config():
     global EMBEDDING_NAME_TO_PARAMETER_CLASS_CONFIG
     for param_cls, models in _EMBEDDING_PARAMETER_CLASS_TO_NAME_CONFIG.items():
-        models = [m.strip() for m in models.split(",")]
         for model in models:
             if model not in EMBEDDING_NAME_TO_PARAMETER_CLASS_CONFIG:
                 EMBEDDING_NAME_TO_PARAMETER_CLASS_CONFIG[model] = param_cls
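
The refactor above replaces a comma-separated string with an explicit list, so the split-and-strip step in _update_embedding_config is no longer needed. A minimal standalone sketch of the resulting lookup construction (string keys stand in for the real parameter dataclasses):

_CLASS_TO_NAMES = {
    "ProxyEmbeddingParameters": [
        "proxy_openai",
        "rerank_proxy_http_openapi",
        "rerank_proxy_siliconflow",
    ]
}

NAME_TO_CLASS = {}
for param_cls, models in _CLASS_TO_NAMES.items():
    for model in models:
        # First registration wins, mirroring the `if model not in ...` guard.
        NAME_TO_CLASS.setdefault(model, param_cls)

assert NAME_TO_CLASS["rerank_proxy_siliconflow"] == "ProxyEmbeddingParameters"
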
16 changes: 8 additions & 8 deletions dbgpt/model/proxy/llms/siliconflow.py
@@ -12,10 +12,10 @@
 ClientType = Union[AsyncAzureOpenAI, AsyncOpenAI]


-_SILICON_FLOW_DEFAULT_MODEL = "Qwen/Qwen2.5-Coder-32B-Instruct"
+_SILICONFLOW_DEFAULT_MODEL = "Qwen/Qwen2.5-Coder-32B-Instruct"


-async def silicon_flow_generate_stream(
+async def siliconflow_generate_stream(
     model: ProxyModel, tokenizer, params, device, context_len=2048
 ):
     client: SiliconFlowLLMClient = model.proxy_llm_client

@@ -39,19 +39,19 @@ def __init__(
         model: Optional[str] = None,
         proxies: Optional["ProxiesTypes"] = None,
         timeout: Optional[int] = 240,
-        model_alias: Optional[str] = "silicon_flow_proxyllm",
+        model_alias: Optional[str] = "siliconflow_proxyllm",
         context_length: Optional[int] = None,
         openai_client: Optional["ClientType"] = None,
         openai_kwargs: Optional[Dict[str, Any]] = None,
         **kwargs
     ):
         api_base = (
             api_base
-            or os.getenv("SILICON_FLOW_API_BASE")
+            or os.getenv("SILICONFLOW_API_BASE")
             or "https://api.siliconflow.cn/v1"
         )
-        api_key = api_key or os.getenv("SILICON_FLOW_API_KEY")
-        model = model or _SILICON_FLOW_DEFAULT_MODEL
+        api_key = api_key or os.getenv("SILICONFLOW_API_KEY")
+        model = model or _SILICONFLOW_DEFAULT_MODEL
         if not context_length:
             if "200k" in model:
                 context_length = 200 * 1024

@@ -60,7 +60,7 @@ def __init__(
         if not api_key:
             raise ValueError(
-                "SiliconFlow API key is required, please set 'SILICON_FLOW_API_KEY' in environment "
+                "SiliconFlow API key is required, please set 'SILICONFLOW_API_KEY' in environment "
                 "or pass it as an argument."
             )

@@ -83,5 +83,5 @@ def __init__(
     def default_model(self) -> str:
         model = self._model
         if not model:
-            model = _SILICON_FLOW_DEFAULT_MODEL
+            model = _SILICONFLOW_DEFAULT_MODEL
         return model
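
A hedged usage sketch of the renamed client; per the __init__ above it falls back to SILICONFLOW_API_KEY and SILICONFLOW_API_BASE from the environment, and the key below is a hypothetical placeholder:

from dbgpt.model.proxy.llms.siliconflow import SiliconFlowLLMClient

client = SiliconFlowLLMClient(
    api_key="sk-...",  # hypothetical placeholder
    model="Qwen/Qwen2.5-Coder-32B-Instruct",
)
print(client.default_model)  # Qwen/Qwen2.5-Coder-32B-Instruct
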
20 changes: 15 additions & 5 deletions dbgpt/model/utils/chatgpt_utils.py
@@ -48,6 +48,7 @@ class OpenAIParameters:
     api_azure_deployment: Optional[str] = None
     full_url: Optional[str] = None
     proxies: Optional["ProxiesTypes"] = None
+    proxy: Optional["ProxyTypes"] = None


 def _initialize_openai_v1(init_params: OpenAIParameters):

@@ -142,19 +143,28 @@ def _build_openai_client(init_params: OpenAIParameters) -> Tuple[str, ClientType]:
     if api_type == "azure":
         from openai import AsyncAzureOpenAI

-        return api_type, AsyncAzureOpenAI(
+        async_client = AsyncAzureOpenAI(
             api_key=openai_params["api_key"],
             api_version=api_version,
             azure_deployment=api_azure_deployment,
             azure_endpoint=openai_params["base_url"],
             http_client=httpx.AsyncClient(proxies=init_params.proxies),
         )
     else:
         from openai import AsyncOpenAI

-        return api_type, AsyncOpenAI(
-            **openai_params, http_client=httpx.AsyncClient(proxies=init_params.proxies)
-        )
+        # Remove proxies for httpx AsyncClient when httpx version >= 0.28.0
+        httpx_version = metadata.version("httpx")
+        if httpx_version >= "0.28.0":
+            if init_params.proxy:
+                http_client = httpx.AsyncClient(proxy=init_params.proxy)
+            else:
+                http_client = httpx.AsyncClient()
+        elif init_params.proxies:
+            http_client = httpx.AsyncClient(proxies=init_params.proxies)
+        else:
+            http_client = httpx.AsyncClient()
+        async_client = AsyncOpenAI(**openai_params, http_client=http_client)
+    return api_type, async_client


 class OpenAIStreamingOutputOperator(TransformStreamAbsOperator[ModelOutput, str]):
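
Background for the branch above: httpx 0.28.0 removed the deprecated plural proxies argument in favor of the singular proxy, hence the version gate. Note that comparing version strings lexically can misorder releases (as strings, "0.9.0" >= "0.28.0" is True). A hedged alternative sketch, not the committed code, using packaging for numeric comparison:

from importlib import metadata
from typing import Optional

import httpx
from packaging.version import Version


def build_async_http_client(proxy: Optional[str] = None) -> httpx.AsyncClient:
    """Create an AsyncClient across the httpx 0.28.0 `proxies` removal."""
    if Version(metadata.version("httpx")) >= Version("0.28.0"):
        # Newer httpx only accepts the singular `proxy` argument.
        return httpx.AsyncClient(proxy=proxy) if proxy else httpx.AsyncClient()
    # Older httpx still accepts the plural `proxies` argument.
    return httpx.AsyncClient(proxies=proxy) if proxy else httpx.AsyncClient()
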
13 changes: 11 additions & 2 deletions dbgpt/rag/assembler/embedding.py
@@ -1,4 +1,5 @@
 """Embedding Assembler."""

+from concurrent.futures import ThreadPoolExecutor
 from typing import Any, List, Optional

@@ -130,7 +131,11 @@ def persist(self, **kwargs) -> List[str]:
         Returns:
             List[str]: List of chunk ids.
         """
-        return self._index_store.load_document(self._chunks)
+        max_chunks_once_load = kwargs.get("max_chunks_once_load", 10)
+        max_threads = kwargs.get("max_threads", 1)
+        return self._index_store.load_document_with_limit(
+            self._chunks, max_chunks_once_load, max_threads
+        )

     async def apersist(self, **kwargs) -> List[str]:
         """Persist chunks into store.

@@ -139,7 +144,11 @@ async def apersist(self, **kwargs) -> List[str]:
             List[str]: List of chunk ids.
         """
         # persist chunks into vector store
-        return await self._index_store.aload_document(self._chunks)
+        max_chunks_once_load = kwargs.get("max_chunks_once_load", 10)
+        max_threads = kwargs.get("max_threads", 1)
+        return await self._index_store.aload_document_with_limit(
+            self._chunks, max_chunks_once_load, max_threads
+        )

     def _extract_info(self, chunks) -> List[Chunk]:
         """Extract info from chunks."""
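
The assembler now delegates batching and concurrency to the index store. A minimal sketch, assuming load_document_with_limit splits the chunks into batches of max_chunks_once_load and loads batches across max_threads workers; the real IndexStoreBase implementation may differ:

from concurrent.futures import ThreadPoolExecutor
from typing import Any, List


def load_document_with_limit(
    store: Any,
    chunks: List[Any],
    max_chunks_once_load: int = 10,
    max_threads: int = 1,
) -> List[str]:
    # Split the chunks into batches so a large document is never loaded at once.
    batches = [
        chunks[i : i + max_chunks_once_load]
        for i in range(0, len(chunks), max_chunks_once_load)
    ]
    ids: List[str] = []
    # Load batches concurrently; keep max_threads at 1 unless the vector db
    # is known to tolerate concurrent writes.
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        for batch_ids in executor.map(store.load_document, batches):
            ids.extend(batch_ids)
    return ids
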
7 changes: 6 additions & 1 deletion dbgpt/rag/embedding/__init__.py
@@ -17,7 +17,11 @@
     QianFanEmbeddings,
     TongYiEmbeddings,
 )
-from .rerank import CrossEncoderRerankEmbeddings, OpenAPIRerankEmbeddings  # noqa: F401
+from .rerank import (  # noqa: F401
+    CrossEncoderRerankEmbeddings,
+    OpenAPIRerankEmbeddings,
+    SiliconFlowRerankEmbeddings,
+)

 __ALL__ = [
     "CrossEncoderRerankEmbeddings",

@@ -32,6 +36,7 @@
     "OllamaEmbeddings",
     "OpenAPIEmbeddings",
     "OpenAPIRerankEmbeddings",
+    "SiliconFlowRerankEmbeddings",
     "QianFanEmbeddings",
     "TongYiEmbeddings",
     "WrappedEmbeddingFactory",
71 changes: 67 additions & 4 deletions dbgpt/rag/embedding/rerank.py
@@ -1,5 +1,6 @@
 """Re-rank embeddings."""

+import os
 from typing import Any, Dict, List, Optional, cast

 import aiohttp

@@ -104,6 +105,24 @@ def __init__(self, **kwargs):
             kwargs["session"] = session
         super().__init__(**kwargs)

+    def _parse_results(self, response: Dict[str, Any]) -> List[float]:
+        """Parse the response from the API.
+
+        Args:
+            response: The response from the API.
+
+        Returns:
+            List[float]: The rank scores of the candidates.
+        """
+        data = response.get("data")
+        if not data:
+            if "detail" in response:
+                raise RuntimeError(response["detail"])
+            raise RuntimeError("Cannot find results in the response")
+        if not isinstance(data, list):
+            raise RuntimeError("Results should be a list")
+        return data
+
     def predict(self, query: str, candidates: List[str]) -> List[float]:
         """Predict the rank scores of the candidates.

@@ -126,7 +145,7 @@ def predict(self, query: str, candidates: List[str]) -> List[float]:
             self.api_url, json=data, timeout=self.timeout, headers=headers
         )
         response.raise_for_status()
-        return response.json()["data"]
+        return self._parse_results(response.json())

     async def apredict(self, query: str, candidates: List[str]) -> List[float]:
         """Predict the rank scores of the candidates asynchronously."""

@@ -142,6 +161,50 @@ async def apredict(self, query: str, candidates: List[str]) -> List[float]:
             async with session.post(self.api_url, json=data) as resp:
                 resp.raise_for_status()
                 response_data = await resp.json()
-                if "data" not in response_data:
-                    raise RuntimeError(response_data["detail"])
-                return response_data["data"]
+                return self._parse_results(response_data)
+
+
+class SiliconFlowRerankEmbeddings(OpenAPIRerankEmbeddings):
+    """SiliconFlow Rerank Model.
+
+    See `SiliconFlow API
+    <https://docs.siliconflow.cn/api-reference/rerank/create-rerank>`_ for more details.
+    """
+
+    def __init__(self, **kwargs: Any):
+        """Initialize the SiliconFlowRerankEmbeddings."""
+        # If the API key is not provided, try to get it from the environment
+        if "api_key" not in kwargs:
+            kwargs["api_key"] = os.getenv("SILICONFLOW_API_KEY")
+
+        if "api_url" not in kwargs:
+            env_api_url = os.getenv("SILICONFLOW_API_BASE")
+            if env_api_url:
+                env_api_url = env_api_url.rstrip("/")
+                kwargs["api_url"] = env_api_url + "/rerank"
+            else:
+                kwargs["api_url"] = "https://api.siliconflow.cn/v1/rerank"
+
+        if "model_name" not in kwargs:
+            kwargs["model_name"] = "BAAI/bge-reranker-v2-m3"
+
+        super().__init__(**kwargs)
+
+    def _parse_results(self, response: Dict[str, Any]) -> List[float]:
+        """Parse the response from the API.
+
+        Args:
+            response: The response from the API.
+
+        Returns:
+            List[float]: The rank scores of the candidates.
+        """
+        results = response.get("results")
+        if not results:
+            raise RuntimeError("Cannot find results in the response")
+        if not isinstance(results, list):
+            raise RuntimeError("Results should be a list")
+        # Sort by index, 0 in the first element
+        results = sorted(results, key=lambda x: x.get("index", 0))
+        scores = [float(result.get("relevance_score")) for result in results]
+        return scores
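
A usage sketch for the new class (assumes SILICONFLOW_API_KEY is set in the environment and the endpoint is reachable); with no arguments it uses the BAAI/bge-reranker-v2-m3 default shown above:

from dbgpt.rag.embedding.rerank import SiliconFlowRerankEmbeddings

reranker = SiliconFlowRerankEmbeddings()
scores = reranker.predict(
    query="What is DB-GPT?",
    candidates=[
        "DB-GPT is an AI native data app development framework.",
        "SiliconFlow provides hosted model inference APIs.",
    ],
)
print(scores)  # one relevance score per candidate, in input order
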
7 changes: 4 additions & 3 deletions dbgpt/rag/operators/knowledge.py
@@ -16,7 +16,7 @@
 from dbgpt.util.i18n_utils import _


-class KnowledgeOperator(MapOperator[str, Knowledge]):
+class KnowledgeOperator(MapOperator[dict, Knowledge]):
     """Knowledge Factory Operator."""

     metadata = ViewMetadata(

@@ -91,10 +91,11 @@ def __init__(

     async def map(self, datasource: dict) -> Knowledge:
         """Create knowledge from datasource."""
+        source = datasource.get("source")
         if self._datasource:
-            datasource = self._datasource
+            source = self._datasource
         return await self.blocking_func_to_async(
-            KnowledgeFactory.create, datasource, self._knowledge_type
+            KnowledgeFactory.create, source, self._knowledge_type
         )
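
After this change the operator reads the datasource from the "source" key of the incoming dict, and a datasource fixed at construction time takes precedence. A minimal sketch of that resolution logic, using a hypothetical helper that is not part of the module:

from typing import Optional


def resolve_source(datasource: dict, fixed_datasource: Optional[str] = None):
    """Mirror the precedence above: a fixed datasource wins over the input."""
    source = datasource.get("source")
    if fixed_datasource:
        source = fixed_datasource
    return source


assert resolve_source({"source": "./docs/overview.md"}) == "./docs/overview.md"
assert resolve_source({"source": "./docs/a.md"}, "./docs/b.md") == "./docs/b.md"
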