Python API
RAGFlow Python API 的完整参考。在继续之前,请确保您已准备好 RAGFlow API 密钥以进行身份验证。
运行以下命令以下载 Python SDK
pip install ragflow-sdk
错误代码
代码 | 消息 | 描述 |
---|---|---|
400 | 错误的请求 | 无效的请求参数 |
401 | 未授权 | 未授权的访问 |
403 | 禁止 | 访问被拒绝 |
404 | 未找到 | 资源未找到 |
500 | 内部服务器错误 | 服务器内部错误 |
1001 | 无效的 Chunk ID | 无效的 Chunk ID |
1002 | Chunk 更新失败 | Chunk 更新失败 |
OpenAI 兼容 API
创建聊天补全
通过 OpenAI 的 API 为给定的历史聊天对话创建模型响应。
参数
model: str
, 必需
用于生成响应的模型。服务器会自动解析此参数,因此您现在可以将其设置为任何值。
messages: list[object]
, 必需
用于生成响应的历史聊天消息列表。此列表必须至少包含一条角色为 user
的消息。
stream: boolean
是否以流式接收响应。如果您希望一次性接收整个响应而不是以流式接收,请将此项明确设置为 false
。
返回
- 成功:响应消息类似 OpenAI
- 失败:
Exception
示例
from openai import OpenAI
model = "model"
client = OpenAI(api_key="ragflow-api-key", base_url=f"http://ragflow_address/api/v1/chats_openai/<chat_id>")
stream = True
reference = True
completion = client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Who are you?"},
{"role": "assistant", "content": "I am an AI assistant named..."},
{"role": "user", "content": "Can you tell me how to install neovim"},
],
stream=stream,
extra_body={"reference": reference}
)
if stream:
for chunk in completion:
print(chunk)
if reference and chunk.choices[0].finish_reason == "stop":
print(f"Reference:\n{chunk.choices[0].delta.reference}")
print(f"Final content:\n{chunk.choices[0].delta.final_content}")
else:
print(completion.choices[0].message.content)
if reference:
print(completion.choices[0].message.reference)
数据集管理
创建数据集
RAGFlow.create_dataset(
name: str,
avatar: Optional[str] = None,
description: Optional[str] = None,
embedding_model: Optional[str] = "BAAI/bge-large-zh-v1.5@BAAI",
permission: str = "me",
chunk_method: str = "naive",
parser_config: DataSet.ParserConfig = None
) -> DataSet
创建一个数据集。
参数
name: str
, 必需
要创建的数据集的唯一名称。它必须遵循以下要求
- 最多 128 个字符。
- 不区分大小写。
avatar: str
头像的 Base64 编码。默认为 None
description: str
要创建的数据集的简要描述。默认为 None
。
permission
指定谁可以访问要创建的数据集。可用选项
"me"
:(默认)只有您可以管理数据集。"team"
:所有团队成员都可以管理数据集。
chunk_method, str
要创建的数据集的分块方法。可用选项
"naive"
:通用(默认)"manual
:手动"qa"
:问答"table"
:表格"paper"
:论文"book"
:书籍"laws"
:法律"presentation"
:演示文稿"picture"
:图片"one"
:整体"email"
:邮件
parser_config
数据集的解析器配置。ParserConfig
对象的属性根据所选的 chunk_method
而变化
chunk_method
="naive"
{"chunk_token_num":512,"delimiter":"\\n","html4excel":False,"layout_recognize":True,"raptor":{"use_raptor":False}}
.chunk_method
="qa"
{"raptor": {"use_raptor": False}}
chunk_method
="manuel"
{"raptor": {"use_raptor": False}}
chunk_method
="table"
无
chunk_method
="paper"
{"raptor": {"use_raptor": False}}
chunk_method
="book"
{"raptor": {"use_raptor": False}}
chunk_method
="laws"
{"raptor": {"use_raptor": False}}
chunk_method
="picture"
无
chunk_method
="presentation"
{"raptor": {"use_raptor": False}}
chunk_method
="one"
无
chunk_method
="knowledge-graph"
{"chunk_token_num":128,"delimiter":"\\n","entity_types":["organization","person","location","event","time"]}
chunk_method
="email"
无
返回
- 成功:一个
dataset
对象。 - 失败:
Exception
示例
from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.create_dataset(name="kb_1")
删除数据集
RAGFlow.delete_datasets(ids: list[str] | None = None)
通过 ID 删除数据集。
参数
ids: list[str]
或 None
, 必需
要删除的数据集的 ID。默认为 None
。
- 如果为
None
,将删除所有数据集。 - 如果是一个 ID 数组,将只删除指定的数据集。
- 如果是一个空数组,将不会删除任何数据集。
返回
- 成功:不返回任何值。
- 失败:
Exception
示例
rag_object.delete_datasets(ids=["d94a8dc02c9711f0930f7fbc369eab6d","e94a8dc02c9711f0930f7fbc369eab6e"])
列出数据集
RAGFlow.list_datasets(
page: int = 1,
page_size: int = 30,
orderby: str = "create_time",
desc: bool = True,
id: str = None,
name: str = None
) -> list[DataSet]
列出数据集。
参数
page: int
指定显示数据集的页面。默认为 1
。
page_size: int
每页的数据集数量。默认为 30
。
orderby: str
数据集排序的字段。可用选项
"create_time"
(默认)"update_time"
desc: bool
指示检索到的数据集是否应按降序排序。默认为 True
。
id: str
要检索的数据集的 ID。默认为 None
。
name: str
要检索的数据集的名称。默认为 None
。
返回
- 成功:
DataSet
对象列表。 - 失败:
Exception
。
示例
列出所有数据集
for dataset in rag_object.list_datasets():
print(dataset)
通过 ID 检索数据集
dataset = rag_object.list_datasets(id = "id_1")
print(dataset[0])
更新数据集
DataSet.update(update_message: dict)
更新当前数据集的配置。
参数
update_message: dict[str, str|int]
, 必需
表示要更新的属性的字典,包含以下键
"name"
:str
数据集的修订名称。- 仅限基本多文种平面(BMP)
- 最多 128 个字符
- 不区分大小写
"avatar"
:(正文参数),string
更新后的头像 Base64 编码。- 最多 65535 个字符
"embedding_model"
:(正文参数),string
更新后的嵌入模型名称。- 在更新
"embedding_model"
之前,请确保"chunk_count"
为0
。 - 最多 255 个字符
- 必须遵循
model_name@model_factory
格式
- 在更新
"permission"
:(正文参数),string
更新后的数据集权限。可用选项"me"
:(默认)只有您可以管理数据集。"team"
:所有团队成员都可以管理数据集。
"pagerank"
:(正文参数),int
请参阅设置页面排名- 默认:
0
- 最小:
0
- 最大:
100
- 默认:
"chunk_method"
:(正文参数),enum<string>
数据集的分块方法。可用选项"naive"
:通用(默认)"book"
:书籍"email"
:邮件"laws"
:法律"manual"
:手动"one"
:整体"paper"
:论文"picture"
:图片"presentation"
:演示文稿"qa"
:问答"table"
:表格"tag"
:标签
返回
- 成功:不返回任何值。
- 失败:
Exception
示例
from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.list_datasets(name="kb_name")
dataset = dataset[0]
dataset.update({"embedding_model":"BAAI/bge-zh-v1.5", "chunk_method":"manual"})
数据集内的文件管理
上传文档
DataSet.upload_documents(document_list: list[dict])
将文档上传到当前数据集。
参数
document_list: list[dict]
, 必需
表示要上传的文档的字典列表,每个字典包含以下键
"display_name"
:(可选)数据集中显示的文件名。"blob"
:(可选)要上传文件的二进制内容。
返回
- 成功:不返回任何值。
- 失败:
Exception
示例
dataset = rag_object.create_dataset(name="kb_name")
dataset.upload_documents([{"display_name": "1.txt", "blob": "<BINARY_CONTENT_OF_THE_DOC>"}, {"display_name": "2.pdf", "blob": "<BINARY_CONTENT_OF_THE_DOC>"}])
更新文档
Document.update(update_message:dict)
更新当前文档的配置。
参数
update_message: dict[str, str|dict[]]
, 必需
表示要更新的属性的字典,包含以下键
"display_name"
:str
要更新的文档名称。"meta_fields"
:dict[str, Any]
文档的元字段。"chunk_method"
:str
应用于文档的解析方法。"naive"
:通用"manual
:手动"qa"
:问答"table"
:表格"paper"
:论文"book"
:书籍"laws"
:法律"presentation"
:演示文稿"picture"
:图片"one"
:整体"email"
:邮件
"parser_config"
:dict[str, Any]
文档的解析配置。其属性根据所选的"chunk_method"
而变化"chunk_method"
="naive"
{"chunk_token_num":128,"delimiter":"\\n","html4excel":False,"layout_recognize":True,"raptor":{"use_raptor":False}}
.chunk_method
="qa"
{"raptor": {"use_raptor": False}}
chunk_method
="manuel"
{"raptor": {"use_raptor": False}}
chunk_method
="table"
无
chunk_method
="paper"
{"raptor": {"use_raptor": False}}
chunk_method
="book"
{"raptor": {"use_raptor": False}}
chunk_method
="laws"
{"raptor": {"use_raptor": False}}
chunk_method
="presentation"
{"raptor": {"use_raptor": False}}
chunk_method
="picture"
无
chunk_method
="one"
无
chunk_method
="knowledge-graph"
{"chunk_token_num":128,"delimiter":"\\n","entity_types":["organization","person","location","event","time"]}
chunk_method
="email"
无
返回
- 成功:不返回任何值。
- 失败:
Exception
示例
from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.list_datasets(id='id')
dataset = dataset[0]
doc = dataset.list_documents(id="wdfxb5t547d")
doc = doc[0]
doc.update([{"parser_config": {"chunk_token_num": 256}}, {"chunk_method": "manual"}])
下载文档
Document.download() -> bytes
下载当前文档。
返回
下载的文档(字节)。
示例
from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.list_datasets(id="id")
dataset = dataset[0]
doc = dataset.list_documents(id="wdfxb5t547d")
doc = doc[0]
open("~/ragflow.txt", "wb+").write(doc.download())
print(doc)
列出文档
Dataset.list_documents(
id: str = None,
keywords: str = None,
page: int = 1,
page_size: int = 30,
order_by: str = "create_time",
desc: bool = True,
create_time_from: int = 0,
create_time_to: int = 0
) -> list[Document]
列出当前数据集中的文档。
参数
id: str
要检索的文档的 ID。默认为 None
。
keywords: str
用于匹配文档标题的关键字。默认为 None
。
page: int
指定显示文档的页面。默认为 1
。
page_size: int
每页的最大文档数。默认为 30
。
orderby: str
文档排序的字段。可用选项
"create_time"
(默认)"update_time"
desc: bool
指示检索到的文档是否应按降序排序。默认为 True
。
create_time_from: int
用于筛选此时间之后创建的文档的 Unix 时间戳。0 表示不过滤。默认为 0。
create_time_to: int
用于筛选此时间之前创建的文档的 Unix 时间戳。0 表示不过滤。默认为 0。
返回
- 成功:
Document
对象列表。 - 失败:
Exception
。
一个 Document
对象包含以下属性
id
:文档 ID。默认为""
。name
:文档名称。默认为""
。thumbnail
:文档的缩略图。默认为None
。dataset_id
:与文档关联的数据集 ID。默认为None
。chunk_method
分块方法名称。默认为"naive"
。source_type
:文档的源类型。默认为"local"
。type
:文档的类型或类别。默认为""
。保留供将来使用。created_by
:str
文档的创建者。默认为""
。size
:int
文档大小(字节)。默认为0
。token_count
:int
文档中的令牌数。默认为0
。chunk_count
:int
文档中的块数。默认为0
。progress
:float
当前处理进度的百分比。默认为0.0
。progress_msg
:str
指示当前进度状态的消息。默认为""
。process_begin_at
:datetime
文档处理的开始时间。默认为None
。process_duration
:float
处理持续时间(秒)。默认为0.0
。run
:str
文档的处理状态"UNSTART"
(默认)"RUNNING"
"CANCEL"
"DONE"
"FAIL"
status
:str
保留供将来使用。parser_config
:ParserConfig
解析器的配置对象。其属性根据所选的chunk_method
而变化chunk_method
="naive"
{"chunk_token_num":128,"delimiter":"\\n","html4excel":False,"layout_recognize":True,"raptor":{"use_raptor":False}}
.chunk_method
="qa"
{"raptor": {"use_raptor": False}}
chunk_method
="manuel"
{"raptor": {"use_raptor": False}}
chunk_method
="table"
无
chunk_method
="paper"
{"raptor": {"use_raptor": False}}
chunk_method
="book"
{"raptor": {"use_raptor": False}}
chunk_method
="laws"
{"raptor": {"use_raptor": False}}
chunk_method
="presentation"
{"raptor": {"use_raptor": False}}
chunk_method
="picure"
无
chunk_method
="one"
无
chunk_method
="email"
无
示例
from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.create_dataset(name="kb_1")
filename1 = "~/ragflow.txt"
blob = open(filename1 , "rb").read()
dataset.upload_documents([{"name":filename1,"blob":blob}])
for doc in dataset.list_documents(keywords="rag", page=0, page_size=12):
print(doc)
删除文档
DataSet.delete_documents(ids: list[str] = None)
通过 ID 删除文档。
参数
ids: list[list]
要删除的文档的 ID。默认为 None
。如果未指定,将删除数据集中的所有文档。
返回
- 成功:不返回任何值。
- 失败:
Exception
示例
from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.list_datasets(name="kb_1")
dataset = dataset[0]
dataset.delete_documents(ids=["id_1","id_2"])
解析文档
DataSet.async_parse_documents(document_ids:list[str]) -> None
解析当前数据集中的文档。
参数
document_ids: list[str]
, 必需
要解析的文档的 ID。
返回
- 成功:不返回任何值。
- 失败:
Exception
示例
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.create_dataset(name="dataset_name")
documents = [
{'display_name': 'test1.txt', 'blob': open('./test_data/test1.txt',"rb").read()},
{'display_name': 'test2.txt', 'blob': open('./test_data/test2.txt',"rb").read()},
{'display_name': 'test3.txt', 'blob': open('./test_data/test3.txt',"rb").read()}
]
dataset.upload_documents(documents)
documents = dataset.list_documents(keywords="test")
ids = []
for document in documents:
ids.append(document.id)
dataset.async_parse_documents(ids)
print("Async bulk parsing initiated.")
停止解析文档
DataSet.async_cancel_parse_documents(document_ids:list[str])-> None
停止解析指定的文档。
参数
document_ids: list[str]
, 必需
应停止解析的文档的 ID。
返回
- 成功:不返回任何值。
- 失败:
Exception
示例
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.create_dataset(name="dataset_name")
documents = [
{'display_name': 'test1.txt', 'blob': open('./test_data/test1.txt',"rb").read()},
{'display_name': 'test2.txt', 'blob': open('./test_data/test2.txt',"rb").read()},
{'display_name': 'test3.txt', 'blob': open('./test_data/test3.txt',"rb").read()}
]
dataset.upload_documents(documents)
documents = dataset.list_documents(keywords="test")
ids = []
for document in documents:
ids.append(document.id)
dataset.async_parse_documents(ids)
print("Async bulk parsing initiated.")
dataset.async_cancel_parse_documents(ids)
print("Async bulk parsing cancelled.")
数据集内的块管理
添加块
Document.add_chunk(content:str, important_keywords:list[str] = []) -> Chunk
向当前文档添加一个块。
参数
content: str
, 必需
块的文本内容。
important_keywords: list[str]
要用块标记的关键字或短语。
返回
- 成功:一个
Chunk
对象。 - 失败:
Exception
。
一个 Chunk
对象包含以下属性
id
:str
:块 ID。content
:str
块的文本内容。important_keywords
:list[str]
用块标记的关键字或短语列表。create_time
:str
块创建的时间(添加到文档的时间)。create_timestamp
:float
表示块创建时间的时间戳,以自 1970 年 1 月 1 日以来的秒数表示。dataset_id
:str
关联数据集的 ID。document_name
:str
关联文档的名称。document_id
:str
关联文档的 ID。available
:bool
块在数据集中的可用性状态。值选项False
:不可用True
:可用(默认)
示例
from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
datasets = rag_object.list_datasets(id="123")
dataset = datasets[0]
doc = dataset.list_documents(id="wdfxb5t547d")
doc = doc[0]
chunk = doc.add_chunk(content="xxxxxxx")
列出块
Document.list_chunks(keywords: str = None, page: int = 1, page_size: int = 30, id : str = None) -> list[Chunk]
列出当前文档中的块。
参数
keywords: str
用于匹配块内容的关键字。默认为 None
page: int
指定显示块的页面。默认为 1
。
page_size: int
每页的最大块数。默认为 30
。
id: str
要检索的块的 ID。默认:None
返回
- 成功:
Chunk
对象列表。 - 失败:
Exception
。
示例
from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.list_datasets("123")
dataset = dataset[0]
docs = dataset.list_documents(keywords="test", page=1, page_size=12)
for chunk in docs[0].list_chunks(keywords="rag", page=0, page_size=12):
print(chunk)
删除块
Document.delete_chunks(chunk_ids: list[str])
通过 ID 删除块。
参数
chunk_ids: list[str]
要删除的块的 ID。默认为 None
。如果未指定,将删除当前文档的所有块。
返回
- 成功:不返回任何值。
- 失败:
Exception
示例
from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.list_datasets(id="123")
dataset = dataset[0]
doc = dataset.list_documents(id="wdfxb5t547d")
doc = doc[0]
chunk = doc.add_chunk(content="xxxxxxx")
doc.delete_chunks(["id_1","id_2"])
更新块
Chunk.update(update_message: dict)
更新当前块的内容或配置。
参数
update_message: dict[str, str|list[str]|int]
必需
表示要更新的属性的字典,包含以下键
"content"
:str
块的文本内容。"important_keywords"
:list[str]
要用块标记的关键字或短语列表。"available"
:bool
块在数据集中的可用性状态。值选项False
:不可用True
:可用(默认)
返回
- 成功:不返回任何值。
- 失败:
Exception
示例
from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.list_datasets(id="123")
dataset = dataset[0]
doc = dataset.list_documents(id="wdfxb5t547d")
doc = doc[0]
chunk = doc.add_chunk(content="xxxxxxx")
chunk.update({"content":"sdfx..."})
检索块
RAGFlow.retrieve(question:str="", dataset_ids:list[str]=None, document_ids=list[str]=None, page:int=1, page_size:int=30, similarity_threshold:float=0.2, vector_similarity_weight:float=0.3, top_k:int=1024,rerank_id:str=None,keyword:bool=False,highlight:bool=False) -> list[Chunk]
从指定的数据集中检索块。
参数
question: str
, 必需
用户查询或查询关键字。默认为 ""
。
dataset_ids: list[str]
, 必需
要搜索的数据集的 ID。默认为 None
。
document_ids: list[str]
要搜索的文档的 ID。默认为 None
。您必须确保所有选定的文档都使用相同的嵌入模型。否则,将会发生错误。
page: int
要检索的文档的起始索引。默认为 1
。
page_size: int
要检索的最大块数。默认为 30
。
Similarity_threshold: float
最小相似度分数。默认为 0.2
。
vector_similarity_weight: float
向量余弦相似度的权重。默认为 0.3
。如果 x 表示向量余弦相似度,则 (1 - x) 是词项相似度的权重。
top_k: int
参与向量余弦计算的块数。默认为 1024
。
rerank_id: str
重排模型的 ID。默认为 None
。
keyword: bool
指示是否启用基于关键字的匹配
True
:启用基于关键字的匹配。False
:禁用基于关键字的匹配(默认)。
highlight: bool
指定是否在结果中启用匹配词项的高亮显示
True
:启用匹配词项的高亮显示。False
:禁用匹配词项的高亮显示(默认)。
cross_languages: list[string]
应翻译成的语言,以实现不同语言的关键字检索。
返回
- 成功:表示文档块的
Chunk
对象列表。 - 失败:
Exception
示例
from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.list_datasets(name="ragflow")
dataset = dataset[0]
name = 'ragflow_test.txt'
path = './test_data/ragflow_test.txt'
documents =[{"display_name":"test_retrieve_chunks.txt","blob":open(path, "rb").read()}]
docs = dataset.upload_documents(documents)
doc = docs[0]
doc.add_chunk(content="This is a chunk addition test")
for c in rag_object.retrieve(dataset_ids=[dataset.id],document_ids=[doc.id]):
print(c)
聊天助手管理
创建聊天助手
RAGFlow.create_chat(
name: str,
avatar: str = "",
dataset_ids: list[str] = [],
llm: Chat.LLM = None,
prompt: Chat.Prompt = None
) -> Chat
创建一个聊天助手。
参数
name: str
, 必需
聊天助手的名称。
avatar: str
头像的 Base64 编码。默认为 ""
。
dataset_ids: list[str]
关联数据集的 ID。默认为 [""]
。
llm: Chat.LLM
要创建的聊天助手的 LLM 设置。默认为 None
。当值为 None
时,将生成一个具有以下值的字典作为默认值。一个 LLM
对象包含以下属性
model_name
:str
聊天模型的名称。如果为None
,将使用用户的默认聊天模型。temperature
:float
控制模型预测的随机性。较低的温度会产生更保守的响应,而较高的温度会产生更具创造性和多样性的响应。默认为0.1
。top_p
:float
也称为“核心采样”,此参数设置一个阈值,以从中选择一个较小的词集进行采样。它专注于最可能的词,并ตัด掉不太可能的词。默认为0.3
presence_penalty
:float
通过惩罚对话中已出现的词,阻止模型重复相同的信息。默认为0.2
。frequency penalty
:float
与存在惩罚类似,这会降低模型频繁重复相同词的倾向。默认为0.7
。
prompt: Chat.Prompt
LLM 要遵循的指令。一个 Prompt
对象包含以下属性
similarity_threshold
:float
RAGFlow 在检索期间采用加权关键字相似度和加权向量余弦相似度的组合,或加权关键字相似度和加权重排分数的组合。如果相似度分数低于此阈值,相应的块将从结果中排除。默认值为0.2
。keywords_similarity_weight
:float
此参数设置混合相似度分数中关键字相似度的权重,该分数与向量余弦相似度或重排模型相似度相关。通过调整此权重,您可以控制关键字相似度相对于其他相似度度量的影响。默认值为0.7
。top_n
:int
此参数指定馈送到 LLM 的相似度分数高于similarity_threshold
的顶部块的数量。LLM 将仅访问这些“top N”块。默认值为8
。variables
:list[dict[]]
此参数列出了在聊天配置的“系统”字段中使用的变量。请注意knowledge
是一个保留变量,表示检索到的块。- “系统”中的所有变量都应用花括号括起来。
- 默认值为
[{"key": "knowledge", "optional": True}]
。
rerank_model
:str
如果未指定,将使用向量余弦相似度;否则,将使用重排分数。默认为""
。top_k
:int
指的是根据特定的排名标准对列表或集合中的项目进行重新排序或选择 top-k 项的过程。默认为 1024。empty_response
:str
如果数据集中未检索到用户问题的任何内容,这将用作响应。要允许 LLM 在未找到任何内容时即兴发挥,请将此项留空。默认为None
。opener
:str
对用户的开场白。默认为"Hi! I am your assistant, can I help you?"
。show_quote
:bool
指示是否应显示文本来源。默认为True
。prompt
:str
提示内容。
返回
- 成功:表示聊天助手的
Chat
对象。 - 失败:
Exception
示例
from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
datasets = rag_object.list_datasets(name="kb_1")
dataset_ids = []
for dataset in datasets:
dataset_ids.append(dataset.id)
assistant = rag_object.create_chat("Miss R", dataset_ids=dataset_ids)
更新聊天助手
Chat.update(update_message: dict)
更新当前聊天助手的配置。
参数
update_message: dict[str, str|list[str]|dict[]]
, 必需
表示要更新的属性的字典,包含以下键
"name"
:str
聊天助手的修订名称。"avatar"
:str
头像的 Base64 编码。默认为""
"dataset_ids"
:list[str]
要更新的数据集。"llm"
:dict
LLM 设置"model_name"
,str
聊天模型名称。"temperature"
,float
控制模型预测的随机性。较低的温度会产生更保守的响应,而较高的温度会产生更具创造性和多样性的响应。"top_p"
,float
也称为“核心采样”,此参数设置一个阈值,以从中选择一个较小的词集进行采样。"presence_penalty"
,float
通过惩罚对话中已出现的词,阻止模型重复相同的信息。"frequency penalty"
,float
与存在惩罚类似,这会降低模型重复相同词的倾向。
"prompt"
: LLM 要遵循的指令。"similarity_threshold"
:float
RAGFlow 在检索期间采用加权关键字相似度和加权向量余弦相似度的组合,或加权关键字相似度和加权重排分数的组合。此参数设置用户查询和块之间相似度的阈值。如果相似度分数低于此阈值,相应的块将从结果中排除。默认值为0.2
。"keywords_similarity_weight"
:float
此参数设置混合相似度分数中关键字相似度的权重,该分数与向量余弦相似度或重排模型相似度相关。通过调整此权重,您可以控制关键字相似度相对于其他相似度度量的影响。默认值为0.7
。"top_n"
:int
此参数指定馈送到 LLM 的相似度分数高于similarity_threshold
的顶部块的数量。LLM 将仅访问这些“top N”块。默认值为8
。"variables"
:list[dict[]]
此参数列出了在聊天配置的“系统”字段中使用的变量。请注意knowledge
是一个保留变量,表示检索到的块。- “系统”中的所有变量都应用花括号括起来。
- 默认值为
[{"key": "knowledge", "optional": True}]
。
"rerank_model"
:str
如果未指定,将使用向量余弦相似度;否则,将使用重排分数。默认为""
。"empty_response"
:str
如果数据集中未检索到用户问题的任何内容,这将用作响应。要允许 LLM 在未检索到任何内容时即兴发挥,请将此项留空。默认为None
。"opener"
:str
对用户的开场白。默认为"Hi! I am your assistant, can I help you?"
。"show_quote
:bool
指示是否应显示文本来源。默认为True
。"prompt"
:str
提示内容。
返回
- 成功:不返回任何值。
- 失败:
Exception
示例
from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
datasets = rag_object.list_datasets(name="kb_1")
dataset_id = datasets[0].id
assistant = rag_object.create_chat("Miss R", dataset_ids=[dataset_id])
assistant.update({"name": "Stefan", "llm": {"temperature": 0.8}, "prompt": {"top_n": 8}})
删除聊天助手
RAGFlow.delete_chats(ids: list[str] = None)
通过 ID 删除聊天助手。
参数
ids: list[str]
要删除的聊天助手的 ID。默认为 None
。如果为空或未指定,将删除系统中的所有聊天助手。
返回
- 成功:不返回任何值。
- 失败:
Exception
示例
from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
rag_object.delete_chats(ids=["id_1","id_2"])
列出聊天助手
RAGFlow.list_chats(
page: int = 1,
page_size: int = 30,
orderby: str = "create_time",
desc: bool = True,
id: str = None,
name: str = None
) -> list[Chat]
列出聊天助手。
参数
page: int
指定显示聊天助手的页面。默认为 1
。
page_size: int
每页的聊天助手数量。默认为 30
。
orderby: str
对结果进行排序的属性。可用选项
"create_time"
(默认)"update_time"
desc: bool
指示检索到的聊天助手是否应按降序排序。默认为 True
。
id: str
要检索的聊天助手的 ID。默认为 None
。
name: str
要检索的聊天助手的名称。默认为 None
。
返回
- 成功:
Chat
对象列表。 - 失败:
Exception
。
示例
from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
for assistant in rag_object.list_chats():
print(assistant)
会话管理
与聊天助手创建会话
Chat.create_session(name: str = "New session") -> Session
与当前聊天助手创建一个会话。
参数
name: str
要创建的聊天会话的名称。
返回
- 成功:包含以下属性的
Session
对象id
:str
创建的会话的自动生成的唯一标识符。name
:str
创建的会话的名称。message
:list[Message]
创建的会话的开场消息。默认:[{"role": "assistant", "content": "Hi! I am your assistant, can I help you?"}]
chat_id
:str
关联聊天助手的 ID。
- 失败:
Exception
示例
from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
assistant = rag_object.list_chats(name="Miss R")
assistant = assistant[0]
session = assistant.create_session()
更新聊天助手的会话
Session.update(update_message: dict)
更新当前聊天助手的当前会话。
参数
update_message: dict[str, Any]
, 必需
表示要更新的属性的字典,只有一个键
"name"
:str
会话的修订名称。
返回
- 成功:不返回任何值。
- 失败:
Exception
示例
from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
assistant = rag_object.list_chats(name="Miss R")
assistant = assistant[0]
session = assistant.create_session("session_name")
session.update({"name": "updated_name"})
列出聊天助手的会话
Chat.list_sessions(
page: int = 1,
page_size: int = 30,
orderby: str = "create_time",
desc: bool = True,
id: str = None,
name: str = None
) -> list[Session]
列出与当前聊天助手关联的会话。
参数
page: int
指定显示会话的页面。默认为 1
。
page_size: int
每页的会话数量。默认为 30
。
orderby: str
会话排序的字段。可用选项
"create_time"
(默认)"update_time"
desc: bool
指示检索到的会话是否应按降序排序。默认为 True
。
id: str
要检索的聊天会话的 ID。默认为 None
。
name: str
要检索的聊天会话的名称。默认为 None
。
返回
- 成功:与当前聊天助手关联的
Session
对象列表。 - 失败:
Exception
。
示例
from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
assistant = rag_object.list_chats(name="Miss R")
assistant = assistant[0]
for session in assistant.list_sessions():
print(session)
删除聊天助手的会话
Chat.delete_sessions(ids:list[str] = None)
通过 ID 删除当前聊天助手的会话。
参数
ids: list[str]
要删除的会话的 ID。默认为 None
。如果未指定,将删除与当前聊天助手关联的所有会话。
返回
- 成功:不返回任何值。
- 失败:
Exception
示例
from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
assistant = rag_object.list_chats(name="Miss R")
assistant = assistant[0]
assistant.delete_sessions(ids=["id_1","id_2"])
与聊天助手对话
Session.ask(question: str = "", stream: bool = False, **kwargs) -> Optional[Message, iter[Message]]
向指定的聊天助手提问,以开始一次由人工智能驱动的对话。
在流模式下,并非所有响应都包含引用,这取决于系统的判断。
参数
question: str
, 必需
开始由人工智能驱动的对话的问题。默认为 ""
stream: bool
指示是否以流方式输出响应
True
:启用流(默认)。False
:禁用流。
**kwargs
提示(系统)中的参数。
返回
- 如果
stream
设置为False
,则为包含问题响应的Message
对象。 - 如果
stream
设置为True
,则为包含多个message
对象的迭代器(iter[Message]
)
以下显示了 Message
对象的属性
id: str
自动生成的消息 ID。
content: str
消息的内容。默认为 "Hi! I am your assistant, can I help you?"
。
reference: list[Chunk]
表示对消息引用的 Chunk
对象列表,每个对象包含以下属性
id
str
块 ID。content
str
块的内容。img_id
str
块快照的 ID。仅当块的来源是图像、PPT、PPTX 或 PDF 文件时适用。document_id
str
引用的文档 ID。document_name
str
引用的文档名称。position
list[str]
块在引用文档中的位置信息。dataset_id
str
引用文档所属的数据集 ID。similarity
float
块的复合相似度分数,范围从0
到1
,值越高表示相似度越大。它是vector_similarity
和term_similarity
的加权和。vector_similarity
float
块的向量相似度分数,范围从0
到1
,值越高表示向量嵌入之间的相似度越大。term_similarity
float
块的关键字相似度分数,范围从0
到1
,值越高表示关键字之间的相似度越大。
示例
from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
assistant = rag_object.list_chats(name="Miss R")
assistant = assistant[0]
session = assistant.create_session()
print("\n==================== Miss R =====================\n")
print("Hello. What can I do for you?")
while True:
question = input("\n==================== User =====================\n> ")
print("\n==================== Miss R =====================\n")
cont = ""
for ans in session.ask(question, stream=True):
print(ans.content[len(cont):], end='', flush=True)
cont = ans.content
与智能体创建会话
Agent.create_session(**kwargs) -> Session
与当前智能体创建一个会话。
参数
**kwargs
begin
组件中的参数。
返回
- 成功:包含以下属性的
Session
对象id
:str
创建的会话的自动生成的唯一标识符。message
:list[Message]
创建的会话助手的消息。默认:[{"role": "assistant", "content": "Hi! I am your assistant, can I help you?"}]
agent_id
:str
关联智能体的 ID。
- 失败:
Exception
示例
from ragflow_sdk import RAGFlow, Agent
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
agent_id = "AGENT_ID"
agent = rag_object.list_agents(id = agent_id)[0]
session = agent.create_session()
与智能体对话
Session.ask(question: str="", stream: bool = False) -> Optional[Message, iter[Message]]
向指定的智能体提问,以开始一次由人工智能驱动的对话。
在流模式下,并非所有响应都包含引用,这取决于系统的判断。
参数
question: str
开始由人工智能驱动的对话的问题。如果开始组件接受参数,则不需要问题。
stream: bool
指示是否以流方式输出响应
True
:启用流(默认)。False
:禁用流。
返回
- 如果
stream
设置为False
,则为包含问题响应的Message
对象 - 如果
stream
设置为True
,则为包含多个message
对象的迭代器(iter[Message]
)
以下显示了 Message
对象的属性
id: str
自动生成的消息 ID。
content: str
消息的内容。默认为 "Hi! I am your assistant, can I help you?"
。
reference: list[Chunk]
表示对消息引用的 Chunk
对象列表,每个对象包含以下属性
id
str
块 ID。content
str
块的内容。image_id
str
块快照的 ID。仅当块的来源是图像、PPT、PPTX 或 PDF 文件时适用。document_id
str
引用的文档 ID。document_name
str
引用的文档名称。position
list[str]
块在引用文档中的位置信息。dataset_id
str
引用文档所属的数据集 ID。similarity
float
块的复合相似度分数,范围从0
到1
,值越高表示相似度越大。它是vector_similarity
和term_similarity
的加权和。vector_similarity
float
块的向量相似度分数,范围从0
到1
,值越高表示向量嵌入之间的相似度越大。term_similarity
float
块的关键字相似度分数,范围从0
到1
,值越高表示关键字之间的相似度越大。
示例
from ragflow_sdk import RAGFlow, Agent
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
AGENT_id = "AGENT_ID"
agent = rag_object.list_agents(id = AGENT_id)[0]
session = agent.create_session()
print("\n===== Miss R ====\n")
print("Hello. What can I do for you?")
while True:
question = input("\n===== User ====\n> ")
print("\n==== Miss R ====\n")
cont = ""
for ans in session.ask(question, stream=True):
print(ans.content[len(cont):], end='', flush=True)
cont = ans.content
列出智能体会话
Agent.list_sessions(
page: int = 1,
page_size: int = 30,
orderby: str = "update_time",
desc: bool = True,
id: str = None
) -> List[Session]
列出与当前智能体关联的会话。
参数
page: int
指定显示会话的页面。默认为 1
。
page_size: int
每页的会话数量。默认为 30
。
orderby: str
会话排序的字段。可用选项
"create_time"
"update_time"
(默认)
desc: bool
指示检索到的会话是否应按降序排序。默认为 True
。
id: str
要检索的智能体会话的 ID。默认为 None
。
返回
- 成功:与当前智能体关联的
Session
对象列表。 - 失败:
Exception
。
示例
from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
AGENT_id = "AGENT_ID"
agent = rag_object.list_agents(id = AGENT_id)[0]
sessons = agent.list_sessions()
for session in sessions:
print(session)
删除智能体的会话
Agent.delete_sessions(ids: list[str] = None)
通过 ID 删除智能体的会话。
参数
ids: list[str]
要删除的会话的 ID。默认为 None
。如果未指定,将删除与智能体关联的所有会话。
返回
- 成功:不返回任何值。
- 失败:
Exception
示例
from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
AGENT_id = "AGENT_ID"
agent = rag_object.list_agents(id = AGENT_id)[0]
agent.delete_sessions(ids=["id_1","id_2"])
智能体管理
列出智能体
RAGFlow.list_agents(
page: int = 1,
page_size: int = 30,
orderby: str = "create_time",
desc: bool = True,
id: str = None,
title: str = None
) -> List[Agent]
列出智能体。
参数
page: int
指定显示智能体的页面。默认为 1
。
page_size: int
每页的智能体数量。默认为 30
。
orderby: str
对结果进行排序的属性。可用选项
"create_time"
(默认)"update_time"
desc: bool
指示检索到的智能体是否应按降序排序。默认为 True
。
id: str
要检索的智能体的 ID。默认为 None
。
name: str
要检索的智能体的名称。默认为 None
。
返回
- 成功:
Agent
对象列表。 - 失败:
Exception
。
示例
from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
for agent in rag_object.list_agents():
print(agent)
创建智能体
RAGFlow.create_agent(
title: str,
dsl: dict,
description: str | None = None
) -> None
创建一个智能体。
参数
title: str
指定智能体的标题。
dsl: dict
指定智能体的画布 DSL。
description: str
智能体的描述。默认为 None
。
返回
- 成功:无。
- 失败:
Exception
。
示例
from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
rag_object.create_agent(
title="Test Agent",
description="A test agent",
dsl={
# ... canvas DSL here ...
}
)
更新智能体
RAGFlow.update_agent(
agent_id: str,
title: str | None = None,
description: str | None = None,
dsl: dict | None = None
) -> None
更新一个智能体。
参数
agent_id: str
指定要更新的智能体的 ID。
title: str
指定智能体的新标题。如果您不想更新此项,则为 None
。
dsl: dict
指定智能体的新画布 DSL。如果您不想更新此项,则为 None
。
description: str
智能体的新描述。如果您不想更新此项,则为 None
。
返回
- 成功:无。
- 失败:
Exception
。
示例
from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
rag_object.update_agent(
agent_id="58af890a2a8911f0a71a11b922ed82d6",
title="Test Agent",
description="A test agent",
dsl={
# ... canvas DSL here ...
}
)
删除智能体
RAGFlow.delete_agent(
agent_id: str
) -> None
删除一个智能体。
参数
agent_id: str
指定要删除的智能体的 ID。
返回
- 成功:无。
- 失败:
Exception
。
示例
from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
rag_object.delete_agent("58af890a2a8911f0a71a11b922ed82d6")