跳转到主内容
版本:开发版

Python API

RAGFlow Python API 的完整参考。在继续之前,请确保您已准备好 RAGFlow API 密钥以进行身份验证

注意

运行以下命令以下载 Python SDK

pip install ragflow-sdk

错误代码


代码消息描述
400错误的请求无效的请求参数
401未授权未授权的访问
403禁止访问被拒绝
404未找到资源未找到
500内部服务器错误服务器内部错误
1001无效的 Chunk ID无效的 Chunk ID
1002Chunk 更新失败Chunk 更新失败

OpenAI 兼容 API


创建聊天补全

通过 OpenAI 的 API 为给定的历史聊天对话创建模型响应。

参数

model: str, 必需

用于生成响应的模型。服务器会自动解析此参数,因此您现在可以将其设置为任何值。

messages: list[object], 必需

用于生成响应的历史聊天消息列表。此列表必须至少包含一条角色为 user 的消息。

stream: boolean

是否以流式接收响应。如果您希望一次性接收整个响应而不是以流式接收,请将此项明确设置为 false

返回

  • 成功:响应消息类似 OpenAI
  • 失败:Exception

示例

from openai import OpenAI

model = "model"
client = OpenAI(api_key="ragflow-api-key", base_url=f"http://ragflow_address/api/v1/chats_openai/<chat_id>")

stream = True
reference = True

completion = client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Who are you?"},
{"role": "assistant", "content": "I am an AI assistant named..."},
{"role": "user", "content": "Can you tell me how to install neovim"},
],
stream=stream,
extra_body={"reference": reference}
)

if stream:
for chunk in completion:
print(chunk)
if reference and chunk.choices[0].finish_reason == "stop":
print(f"Reference:\n{chunk.choices[0].delta.reference}")
print(f"Final content:\n{chunk.choices[0].delta.final_content}")
else:
print(completion.choices[0].message.content)
if reference:
print(completion.choices[0].message.reference)

数据集管理


创建数据集

RAGFlow.create_dataset(
name: str,
avatar: Optional[str] = None,
description: Optional[str] = None,
embedding_model: Optional[str] = "BAAI/bge-large-zh-v1.5@BAAI",
permission: str = "me",
chunk_method: str = "naive",
parser_config: DataSet.ParserConfig = None
) -> DataSet

创建一个数据集。

参数

name: str, 必需

要创建的数据集的唯一名称。它必须遵循以下要求

  • 最多 128 个字符。
  • 不区分大小写。
avatar: str

头像的 Base64 编码。默认为 None

description: str

要创建的数据集的简要描述。默认为 None

permission

指定谁可以访问要创建的数据集。可用选项

  • "me":(默认)只有您可以管理数据集。
  • "team":所有团队成员都可以管理数据集。
chunk_method, str

要创建的数据集的分块方法。可用选项

  • "naive":通用(默认)
  • "manual:手动
  • "qa":问答
  • "table":表格
  • "paper":论文
  • "book":书籍
  • "laws":法律
  • "presentation":演示文稿
  • "picture":图片
  • "one":整体
  • "email":邮件
parser_config

数据集的解析器配置。ParserConfig 对象的属性根据所选的 chunk_method 而变化

  • chunk_method="naive"
    {"chunk_token_num":512,"delimiter":"\\n","html4excel":False,"layout_recognize":True,"raptor":{"use_raptor":False}}.
  • chunk_method="qa"
    {"raptor": {"use_raptor": False}}
  • chunk_method="manuel"
    {"raptor": {"use_raptor": False}}
  • chunk_method="table"
  • chunk_method="paper"
    {"raptor": {"use_raptor": False}}
  • chunk_method="book"
    {"raptor": {"use_raptor": False}}
  • chunk_method="laws"
    {"raptor": {"use_raptor": False}}
  • chunk_method="picture"
  • chunk_method="presentation"
    {"raptor": {"use_raptor": False}}
  • chunk_method="one"
  • chunk_method="knowledge-graph"
    {"chunk_token_num":128,"delimiter":"\\n","entity_types":["organization","person","location","event","time"]}
  • chunk_method="email"

返回

  • 成功:一个 dataset 对象。
  • 失败:Exception

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.create_dataset(name="kb_1")

删除数据集

RAGFlow.delete_datasets(ids: list[str] | None = None)

通过 ID 删除数据集。

参数

ids: list[str]None, 必需

要删除的数据集的 ID。默认为 None

  • 如果为 None,将删除所有数据集。
  • 如果是一个 ID 数组,将只删除指定的数据集。
  • 如果是一个空数组,将不会删除任何数据集。

返回

  • 成功:不返回任何值。
  • 失败:Exception

示例

rag_object.delete_datasets(ids=["d94a8dc02c9711f0930f7fbc369eab6d","e94a8dc02c9711f0930f7fbc369eab6e"])

列出数据集

RAGFlow.list_datasets(
page: int = 1,
page_size: int = 30,
orderby: str = "create_time",
desc: bool = True,
id: str = None,
name: str = None
) -> list[DataSet]

列出数据集。

参数

page: int

指定显示数据集的页面。默认为 1

page_size: int

每页的数据集数量。默认为 30

orderby: str

数据集排序的字段。可用选项

  • "create_time" (默认)
  • "update_time"
desc: bool

指示检索到的数据集是否应按降序排序。默认为 True

id: str

要检索的数据集的 ID。默认为 None

name: str

要检索的数据集的名称。默认为 None

返回

  • 成功:DataSet 对象列表。
  • 失败:Exception

示例

列出所有数据集
for dataset in rag_object.list_datasets():
print(dataset)
通过 ID 检索数据集
dataset = rag_object.list_datasets(id = "id_1")
print(dataset[0])

更新数据集

DataSet.update(update_message: dict)

更新当前数据集的配置。

参数

update_message: dict[str, str|int], 必需

表示要更新的属性的字典,包含以下键

  • "name": str 数据集的修订名称。
    • 仅限基本多文种平面(BMP)
    • 最多 128 个字符
    • 不区分大小写
  • "avatar":(正文参数),string
    更新后的头像 Base64 编码。
    • 最多 65535 个字符
  • "embedding_model":(正文参数),string
    更新后的嵌入模型名称。
    • 在更新 "embedding_model" 之前,请确保 "chunk_count"0
    • 最多 255 个字符
    • 必须遵循 model_name@model_factory 格式
  • "permission":(正文参数),string
    更新后的数据集权限。可用选项
    • "me":(默认)只有您可以管理数据集。
    • "team":所有团队成员都可以管理数据集。
  • "pagerank":(正文参数),int
    请参阅设置页面排名
    • 默认:0
    • 最小:0
    • 最大:100
  • "chunk_method":(正文参数),enum<string>
    数据集的分块方法。可用选项
    • "naive":通用(默认)
    • "book":书籍
    • "email":邮件
    • "laws":法律
    • "manual":手动
    • "one":整体
    • "paper":论文
    • "picture":图片
    • "presentation":演示文稿
    • "qa":问答
    • "table":表格
    • "tag":标签

返回

  • 成功:不返回任何值。
  • 失败:Exception

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.list_datasets(name="kb_name")
dataset = dataset[0]
dataset.update({"embedding_model":"BAAI/bge-zh-v1.5", "chunk_method":"manual"})

数据集内的文件管理


上传文档

DataSet.upload_documents(document_list: list[dict])

将文档上传到当前数据集。

参数

document_list: list[dict], 必需

表示要上传的文档的字典列表,每个字典包含以下键

  • "display_name":(可选)数据集中显示的文件名。
  • "blob":(可选)要上传文件的二进制内容。

返回

  • 成功:不返回任何值。
  • 失败:Exception

示例

dataset = rag_object.create_dataset(name="kb_name")
dataset.upload_documents([{"display_name": "1.txt", "blob": "<BINARY_CONTENT_OF_THE_DOC>"}, {"display_name": "2.pdf", "blob": "<BINARY_CONTENT_OF_THE_DOC>"}])

更新文档

Document.update(update_message:dict)

更新当前文档的配置。

参数

update_message: dict[str, str|dict[]], 必需

表示要更新的属性的字典,包含以下键

  • "display_name": str 要更新的文档名称。
  • "meta_fields": dict[str, Any] 文档的元字段。
  • "chunk_method": str 应用于文档的解析方法。
    • "naive":通用
    • "manual:手动
    • "qa":问答
    • "table":表格
    • "paper":论文
    • "book":书籍
    • "laws":法律
    • "presentation":演示文稿
    • "picture":图片
    • "one":整体
    • "email":邮件
  • "parser_config": dict[str, Any] 文档的解析配置。其属性根据所选的 "chunk_method" 而变化
    • "chunk_method"="naive"
      {"chunk_token_num":128,"delimiter":"\\n","html4excel":False,"layout_recognize":True,"raptor":{"use_raptor":False}}.
    • chunk_method="qa"
      {"raptor": {"use_raptor": False}}
    • chunk_method="manuel"
      {"raptor": {"use_raptor": False}}
    • chunk_method="table"
    • chunk_method="paper"
      {"raptor": {"use_raptor": False}}
    • chunk_method="book"
      {"raptor": {"use_raptor": False}}
    • chunk_method="laws"
      {"raptor": {"use_raptor": False}}
    • chunk_method="presentation"
      {"raptor": {"use_raptor": False}}
    • chunk_method="picture"
    • chunk_method="one"
    • chunk_method="knowledge-graph"
      {"chunk_token_num":128,"delimiter":"\\n","entity_types":["organization","person","location","event","time"]}
    • chunk_method="email"

返回

  • 成功:不返回任何值。
  • 失败:Exception

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.list_datasets(id='id')
dataset = dataset[0]
doc = dataset.list_documents(id="wdfxb5t547d")
doc = doc[0]
doc.update([{"parser_config": {"chunk_token_num": 256}}, {"chunk_method": "manual"}])

下载文档

Document.download() -> bytes

下载当前文档。

返回

下载的文档(字节)。

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.list_datasets(id="id")
dataset = dataset[0]
doc = dataset.list_documents(id="wdfxb5t547d")
doc = doc[0]
open("~/ragflow.txt", "wb+").write(doc.download())
print(doc)

列出文档

Dataset.list_documents(
id: str = None,
keywords: str = None,
page: int = 1,
page_size: int = 30,
order_by: str = "create_time",
desc: bool = True,
create_time_from: int = 0,
create_time_to: int = 0
) -> list[Document]

列出当前数据集中的文档。

参数

id: str

要检索的文档的 ID。默认为 None

keywords: str

用于匹配文档标题的关键字。默认为 None

page: int

指定显示文档的页面。默认为 1

page_size: int

每页的最大文档数。默认为 30

orderby: str

文档排序的字段。可用选项

  • "create_time" (默认)
  • "update_time"
desc: bool

指示检索到的文档是否应按降序排序。默认为 True

create_time_from: int

用于筛选此时间之后创建的文档的 Unix 时间戳。0 表示不过滤。默认为 0。

create_time_to: int

用于筛选此时间之前创建的文档的 Unix 时间戳。0 表示不过滤。默认为 0。

返回

  • 成功:Document 对象列表。
  • 失败:Exception

一个 Document 对象包含以下属性

  • id:文档 ID。默认为 ""
  • name:文档名称。默认为 ""
  • thumbnail:文档的缩略图。默认为 None
  • dataset_id:与文档关联的数据集 ID。默认为 None
  • chunk_method 分块方法名称。默认为 "naive"
  • source_type:文档的源类型。默认为 "local"
  • type:文档的类型或类别。默认为 ""。保留供将来使用。
  • created_by: str 文档的创建者。默认为 ""
  • size: int 文档大小(字节)。默认为 0
  • token_count: int 文档中的令牌数。默认为 0
  • chunk_count: int 文档中的块数。默认为 0
  • progress: float 当前处理进度的百分比。默认为 0.0
  • progress_msg: str 指示当前进度状态的消息。默认为 ""
  • process_begin_at: datetime 文档处理的开始时间。默认为 None
  • process_duration: float 处理持续时间(秒)。默认为 0.0
  • run: str 文档的处理状态
    • "UNSTART" (默认)
    • "RUNNING"
    • "CANCEL"
    • "DONE"
    • "FAIL"
  • status: str 保留供将来使用。
  • parser_config: ParserConfig 解析器的配置对象。其属性根据所选的 chunk_method 而变化
    • chunk_method="naive"
      {"chunk_token_num":128,"delimiter":"\\n","html4excel":False,"layout_recognize":True,"raptor":{"use_raptor":False}}.
    • chunk_method="qa"
      {"raptor": {"use_raptor": False}}
    • chunk_method="manuel"
      {"raptor": {"use_raptor": False}}
    • chunk_method="table"
    • chunk_method="paper"
      {"raptor": {"use_raptor": False}}
    • chunk_method="book"
      {"raptor": {"use_raptor": False}}
    • chunk_method="laws"
      {"raptor": {"use_raptor": False}}
    • chunk_method="presentation"
      {"raptor": {"use_raptor": False}}
    • chunk_method="picure"
    • chunk_method="one"
    • chunk_method="email"

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.create_dataset(name="kb_1")

filename1 = "~/ragflow.txt"
blob = open(filename1 , "rb").read()
dataset.upload_documents([{"name":filename1,"blob":blob}])
for doc in dataset.list_documents(keywords="rag", page=0, page_size=12):
print(doc)

删除文档

DataSet.delete_documents(ids: list[str] = None)

通过 ID 删除文档。

参数

ids: list[list]

要删除的文档的 ID。默认为 None。如果未指定,将删除数据集中的所有文档。

返回

  • 成功:不返回任何值。
  • 失败:Exception

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.list_datasets(name="kb_1")
dataset = dataset[0]
dataset.delete_documents(ids=["id_1","id_2"])

解析文档

DataSet.async_parse_documents(document_ids:list[str]) -> None

解析当前数据集中的文档。

参数

document_ids: list[str], 必需

要解析的文档的 ID。

返回

  • 成功:不返回任何值。
  • 失败:Exception

示例

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.create_dataset(name="dataset_name")
documents = [
{'display_name': 'test1.txt', 'blob': open('./test_data/test1.txt',"rb").read()},
{'display_name': 'test2.txt', 'blob': open('./test_data/test2.txt',"rb").read()},
{'display_name': 'test3.txt', 'blob': open('./test_data/test3.txt',"rb").read()}
]
dataset.upload_documents(documents)
documents = dataset.list_documents(keywords="test")
ids = []
for document in documents:
ids.append(document.id)
dataset.async_parse_documents(ids)
print("Async bulk parsing initiated.")

停止解析文档

DataSet.async_cancel_parse_documents(document_ids:list[str])-> None

停止解析指定的文档。

参数

document_ids: list[str], 必需

应停止解析的文档的 ID。

返回

  • 成功:不返回任何值。
  • 失败:Exception

示例

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.create_dataset(name="dataset_name")
documents = [
{'display_name': 'test1.txt', 'blob': open('./test_data/test1.txt',"rb").read()},
{'display_name': 'test2.txt', 'blob': open('./test_data/test2.txt',"rb").read()},
{'display_name': 'test3.txt', 'blob': open('./test_data/test3.txt',"rb").read()}
]
dataset.upload_documents(documents)
documents = dataset.list_documents(keywords="test")
ids = []
for document in documents:
ids.append(document.id)
dataset.async_parse_documents(ids)
print("Async bulk parsing initiated.")
dataset.async_cancel_parse_documents(ids)
print("Async bulk parsing cancelled.")

数据集内的块管理


添加块

Document.add_chunk(content:str, important_keywords:list[str] = []) -> Chunk

向当前文档添加一个块。

参数

content: str, 必需

块的文本内容。

important_keywords: list[str]

要用块标记的关键字或短语。

返回

  • 成功:一个 Chunk 对象。
  • 失败:Exception

一个 Chunk 对象包含以下属性

  • id: str:块 ID。
  • content: str 块的文本内容。
  • important_keywords: list[str] 用块标记的关键字或短语列表。
  • create_time: str 块创建的时间(添加到文档的时间)。
  • create_timestamp: float 表示块创建时间的时间戳,以自 1970 年 1 月 1 日以来的秒数表示。
  • dataset_id: str 关联数据集的 ID。
  • document_name: str 关联文档的名称。
  • document_id: str 关联文档的 ID。
  • available: bool 块在数据集中的可用性状态。值选项
    • False:不可用
    • True:可用(默认)

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
datasets = rag_object.list_datasets(id="123")
dataset = datasets[0]
doc = dataset.list_documents(id="wdfxb5t547d")
doc = doc[0]
chunk = doc.add_chunk(content="xxxxxxx")

列出块

Document.list_chunks(keywords: str = None, page: int = 1, page_size: int = 30, id : str = None) -> list[Chunk]

列出当前文档中的块。

参数

keywords: str

用于匹配块内容的关键字。默认为 None

page: int

指定显示块的页面。默认为 1

page_size: int

每页的最大块数。默认为 30

id: str

要检索的块的 ID。默认:None

返回

  • 成功:Chunk 对象列表。
  • 失败:Exception

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.list_datasets("123")
dataset = dataset[0]
docs = dataset.list_documents(keywords="test", page=1, page_size=12)
for chunk in docs[0].list_chunks(keywords="rag", page=0, page_size=12):
print(chunk)

删除块

Document.delete_chunks(chunk_ids: list[str])

通过 ID 删除块。

参数

chunk_ids: list[str]

要删除的块的 ID。默认为 None。如果未指定,将删除当前文档的所有块。

返回

  • 成功:不返回任何值。
  • 失败:Exception

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.list_datasets(id="123")
dataset = dataset[0]
doc = dataset.list_documents(id="wdfxb5t547d")
doc = doc[0]
chunk = doc.add_chunk(content="xxxxxxx")
doc.delete_chunks(["id_1","id_2"])

更新块

Chunk.update(update_message: dict)

更新当前块的内容或配置。

参数

update_message: dict[str, str|list[str]|int] 必需

表示要更新的属性的字典,包含以下键

  • "content": str 块的文本内容。
  • "important_keywords": list[str] 要用块标记的关键字或短语列表。
  • "available": bool 块在数据集中的可用性状态。值选项
    • False:不可用
    • True:可用(默认)

返回

  • 成功:不返回任何值。
  • 失败:Exception

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.list_datasets(id="123")
dataset = dataset[0]
doc = dataset.list_documents(id="wdfxb5t547d")
doc = doc[0]
chunk = doc.add_chunk(content="xxxxxxx")
chunk.update({"content":"sdfx..."})

检索块

RAGFlow.retrieve(question:str="", dataset_ids:list[str]=None, document_ids=list[str]=None, page:int=1, page_size:int=30, similarity_threshold:float=0.2, vector_similarity_weight:float=0.3, top_k:int=1024,rerank_id:str=None,keyword:bool=False,highlight:bool=False) -> list[Chunk]

从指定的数据集中检索块。

参数

question: str, 必需

用户查询或查询关键字。默认为 ""

dataset_ids: list[str], 必需

要搜索的数据集的 ID。默认为 None

document_ids: list[str]

要搜索的文档的 ID。默认为 None。您必须确保所有选定的文档都使用相同的嵌入模型。否则,将会发生错误。

page: int

要检索的文档的起始索引。默认为 1

page_size: int

要检索的最大块数。默认为 30

Similarity_threshold: float

最小相似度分数。默认为 0.2

vector_similarity_weight: float

向量余弦相似度的权重。默认为 0.3。如果 x 表示向量余弦相似度,则 (1 - x) 是词项相似度的权重。

top_k: int

参与向量余弦计算的块数。默认为 1024

rerank_id: str

重排模型的 ID。默认为 None

keyword: bool

指示是否启用基于关键字的匹配

  • True:启用基于关键字的匹配。
  • False:禁用基于关键字的匹配(默认)。
highlight: bool

指定是否在结果中启用匹配词项的高亮显示

  • True:启用匹配词项的高亮显示。
  • False:禁用匹配词项的高亮显示(默认)。
cross_languages: list[string]

应翻译成的语言,以实现不同语言的关键字检索。

返回

  • 成功:表示文档块的 Chunk 对象列表。
  • 失败:Exception

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.list_datasets(name="ragflow")
dataset = dataset[0]
name = 'ragflow_test.txt'
path = './test_data/ragflow_test.txt'
documents =[{"display_name":"test_retrieve_chunks.txt","blob":open(path, "rb").read()}]
docs = dataset.upload_documents(documents)
doc = docs[0]
doc.add_chunk(content="This is a chunk addition test")
for c in rag_object.retrieve(dataset_ids=[dataset.id],document_ids=[doc.id]):
print(c)

聊天助手管理


创建聊天助手

RAGFlow.create_chat(
name: str,
avatar: str = "",
dataset_ids: list[str] = [],
llm: Chat.LLM = None,
prompt: Chat.Prompt = None
) -> Chat

创建一个聊天助手。

参数

name: str, 必需

聊天助手的名称。

avatar: str

头像的 Base64 编码。默认为 ""

dataset_ids: list[str]

关联数据集的 ID。默认为 [""]

llm: Chat.LLM

要创建的聊天助手的 LLM 设置。默认为 None。当值为 None 时,将生成一个具有以下值的字典作为默认值。一个 LLM 对象包含以下属性

  • model_name: str
    聊天模型的名称。如果为 None,将使用用户的默认聊天模型。
  • temperature: float
    控制模型预测的随机性。较低的温度会产生更保守的响应,而较高的温度会产生更具创造性和多样性的响应。默认为 0.1
  • top_p: float
    也称为“核心采样”,此参数设置一个阈值,以从中选择一个较小的词集进行采样。它专注于最可能的词,并ตัด掉不太可能的词。默认为 0.3
  • presence_penalty: float
    通过惩罚对话中已出现的词,阻止模型重复相同的信息。默认为 0.2
  • frequency penalty: float
    与存在惩罚类似,这会降低模型频繁重复相同词的倾向。默认为 0.7
prompt: Chat.Prompt

LLM 要遵循的指令。一个 Prompt 对象包含以下属性

  • similarity_threshold: float RAGFlow 在检索期间采用加权关键字相似度和加权向量余弦相似度的组合,或加权关键字相似度和加权重排分数的组合。如果相似度分数低于此阈值,相应的块将从结果中排除。默认值为 0.2
  • keywords_similarity_weight: float 此参数设置混合相似度分数中关键字相似度的权重,该分数与向量余弦相似度或重排模型相似度相关。通过调整此权重,您可以控制关键字相似度相对于其他相似度度量的影响。默认值为 0.7
  • top_n: int 此参数指定馈送到 LLM 的相似度分数高于 similarity_threshold 的顶部块的数量。LLM 将访问这些“top N”块。默认值为 8
  • variables: list[dict[]] 此参数列出了在聊天配置的“系统”字段中使用的变量。请注意
    • knowledge 是一个保留变量,表示检索到的块。
    • “系统”中的所有变量都应用花括号括起来。
    • 默认值为 [{"key": "knowledge", "optional": True}]
  • rerank_model: str 如果未指定,将使用向量余弦相似度;否则,将使用重排分数。默认为 ""
  • top_k: int 指的是根据特定的排名标准对列表或集合中的项目进行重新排序或选择 top-k 项的过程。默认为 1024。
  • empty_response: str 如果数据集中未检索到用户问题的任何内容,这将用作响应。要允许 LLM 在未找到任何内容时即兴发挥,请将此项留空。默认为 None
  • opener: str 对用户的开场白。默认为 "Hi! I am your assistant, can I help you?"
  • show_quote: bool 指示是否应显示文本来源。默认为 True
  • prompt: str 提示内容。

返回

  • 成功:表示聊天助手的 Chat 对象。
  • 失败:Exception

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
datasets = rag_object.list_datasets(name="kb_1")
dataset_ids = []
for dataset in datasets:
dataset_ids.append(dataset.id)
assistant = rag_object.create_chat("Miss R", dataset_ids=dataset_ids)

更新聊天助手

Chat.update(update_message: dict)

更新当前聊天助手的配置。

参数

update_message: dict[str, str|list[str]|dict[]], 必需

表示要更新的属性的字典,包含以下键

  • "name": str 聊天助手的修订名称。
  • "avatar": str 头像的 Base64 编码。默认为 ""
  • "dataset_ids": list[str] 要更新的数据集。
  • "llm": dict LLM 设置
    • "model_name", str 聊天模型名称。
    • "temperature", float 控制模型预测的随机性。较低的温度会产生更保守的响应,而较高的温度会产生更具创造性和多样性的响应。
    • "top_p", float 也称为“核心采样”,此参数设置一个阈值,以从中选择一个较小的词集进行采样。
    • "presence_penalty", float 通过惩罚对话中已出现的词,阻止模型重复相同的信息。
    • "frequency penalty", float 与存在惩罚类似,这会降低模型重复相同词的倾向。
  • "prompt" : LLM 要遵循的指令。
    • "similarity_threshold": float RAGFlow 在检索期间采用加权关键字相似度和加权向量余弦相似度的组合,或加权关键字相似度和加权重排分数的组合。此参数设置用户查询和块之间相似度的阈值。如果相似度分数低于此阈值,相应的块将从结果中排除。默认值为 0.2
    • "keywords_similarity_weight": float 此参数设置混合相似度分数中关键字相似度的权重,该分数与向量余弦相似度或重排模型相似度相关。通过调整此权重,您可以控制关键字相似度相对于其他相似度度量的影响。默认值为 0.7
    • "top_n": int 此参数指定馈送到 LLM 的相似度分数高于 similarity_threshold 的顶部块的数量。LLM 将访问这些“top N”块。默认值为 8
    • "variables": list[dict[]] 此参数列出了在聊天配置的“系统”字段中使用的变量。请注意
      • knowledge 是一个保留变量,表示检索到的块。
      • “系统”中的所有变量都应用花括号括起来。
      • 默认值为 [{"key": "knowledge", "optional": True}]
    • "rerank_model": str 如果未指定,将使用向量余弦相似度;否则,将使用重排分数。默认为 ""
    • "empty_response": str 如果数据集中未检索到用户问题的任何内容,这将用作响应。要允许 LLM 在未检索到任何内容时即兴发挥,请将此项留空。默认为 None
    • "opener": str 对用户的开场白。默认为 "Hi! I am your assistant, can I help you?"
    • "show_quote: bool 指示是否应显示文本来源。默认为 True
    • "prompt": str 提示内容。

返回

  • 成功:不返回任何值。
  • 失败:Exception

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
datasets = rag_object.list_datasets(name="kb_1")
dataset_id = datasets[0].id
assistant = rag_object.create_chat("Miss R", dataset_ids=[dataset_id])
assistant.update({"name": "Stefan", "llm": {"temperature": 0.8}, "prompt": {"top_n": 8}})

删除聊天助手

RAGFlow.delete_chats(ids: list[str] = None)

通过 ID 删除聊天助手。

参数

ids: list[str]

要删除的聊天助手的 ID。默认为 None。如果为空或未指定,将删除系统中的所有聊天助手。

返回

  • 成功:不返回任何值。
  • 失败:Exception

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
rag_object.delete_chats(ids=["id_1","id_2"])

列出聊天助手

RAGFlow.list_chats(
page: int = 1,
page_size: int = 30,
orderby: str = "create_time",
desc: bool = True,
id: str = None,
name: str = None
) -> list[Chat]

列出聊天助手。

参数

page: int

指定显示聊天助手的页面。默认为 1

page_size: int

每页的聊天助手数量。默认为 30

orderby: str

对结果进行排序的属性。可用选项

  • "create_time" (默认)
  • "update_time"
desc: bool

指示检索到的聊天助手是否应按降序排序。默认为 True

id: str

要检索的聊天助手的 ID。默认为 None

name: str

要检索的聊天助手的名称。默认为 None

返回

  • 成功:Chat 对象列表。
  • 失败:Exception

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
for assistant in rag_object.list_chats():
print(assistant)

会话管理


与聊天助手创建会话

Chat.create_session(name: str = "New session") -> Session

与当前聊天助手创建一个会话。

参数

name: str

要创建的聊天会话的名称。

返回

  • 成功:包含以下属性的 Session 对象
    • id: str 创建的会话的自动生成的唯一标识符。
    • name: str 创建的会话的名称。
    • message: list[Message] 创建的会话的开场消息。默认:[{"role": "assistant", "content": "Hi! I am your assistant, can I help you?"}]
    • chat_id: str 关联聊天助手的 ID。
  • 失败:Exception

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
assistant = rag_object.list_chats(name="Miss R")
assistant = assistant[0]
session = assistant.create_session()

更新聊天助手的会话

Session.update(update_message: dict)

更新当前聊天助手的当前会话。

参数

update_message: dict[str, Any], 必需

表示要更新的属性的字典,只有一个键

  • "name": str 会话的修订名称。

返回

  • 成功:不返回任何值。
  • 失败:Exception

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
assistant = rag_object.list_chats(name="Miss R")
assistant = assistant[0]
session = assistant.create_session("session_name")
session.update({"name": "updated_name"})

列出聊天助手的会话

Chat.list_sessions(
page: int = 1,
page_size: int = 30,
orderby: str = "create_time",
desc: bool = True,
id: str = None,
name: str = None
) -> list[Session]

列出与当前聊天助手关联的会话。

参数

page: int

指定显示会话的页面。默认为 1

page_size: int

每页的会话数量。默认为 30

orderby: str

会话排序的字段。可用选项

  • "create_time" (默认)
  • "update_time"
desc: bool

指示检索到的会话是否应按降序排序。默认为 True

id: str

要检索的聊天会话的 ID。默认为 None

name: str

要检索的聊天会话的名称。默认为 None

返回

  • 成功:与当前聊天助手关联的 Session 对象列表。
  • 失败:Exception

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
assistant = rag_object.list_chats(name="Miss R")
assistant = assistant[0]
for session in assistant.list_sessions():
print(session)

删除聊天助手的会话

Chat.delete_sessions(ids:list[str] = None)

通过 ID 删除当前聊天助手的会话。

参数

ids: list[str]

要删除的会话的 ID。默认为 None。如果未指定,将删除与当前聊天助手关联的所有会话。

返回

  • 成功:不返回任何值。
  • 失败:Exception

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
assistant = rag_object.list_chats(name="Miss R")
assistant = assistant[0]
assistant.delete_sessions(ids=["id_1","id_2"])

与聊天助手对话

Session.ask(question: str = "", stream: bool = False, **kwargs) -> Optional[Message, iter[Message]]

向指定的聊天助手提问,以开始一次由人工智能驱动的对话。

注意

在流模式下,并非所有响应都包含引用,这取决于系统的判断。

参数

question: str, 必需

开始由人工智能驱动的对话的问题。默认为 ""

stream: bool

指示是否以流方式输出响应

  • True:启用流(默认)。
  • False:禁用流。
**kwargs

提示(系统)中的参数。

返回

  • 如果 stream 设置为 False,则为包含问题响应的 Message 对象。
  • 如果 stream 设置为 True,则为包含多个 message 对象的迭代器(iter[Message]

以下显示了 Message 对象的属性

id: str

自动生成的消息 ID。

content: str

消息的内容。默认为 "Hi! I am your assistant, can I help you?"

reference: list[Chunk]

表示对消息引用的 Chunk 对象列表,每个对象包含以下属性

  • id str
    块 ID。
  • content str
    块的内容。
  • img_id str
    块快照的 ID。仅当块的来源是图像、PPT、PPTX 或 PDF 文件时适用。
  • document_id str
    引用的文档 ID。
  • document_name str
    引用的文档名称。
  • position list[str]
    块在引用文档中的位置信息。
  • dataset_id str
    引用文档所属的数据集 ID。
  • similarity float
    块的复合相似度分数,范围从 01,值越高表示相似度越大。它是 vector_similarityterm_similarity 的加权和。
  • vector_similarity float
    块的向量相似度分数,范围从 01,值越高表示向量嵌入之间的相似度越大。
  • term_similarity float
    块的关键字相似度分数,范围从 01,值越高表示关键字之间的相似度越大。

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
assistant = rag_object.list_chats(name="Miss R")
assistant = assistant[0]
session = assistant.create_session()

print("\n==================== Miss R =====================\n")
print("Hello. What can I do for you?")

while True:
question = input("\n==================== User =====================\n> ")
print("\n==================== Miss R =====================\n")

cont = ""
for ans in session.ask(question, stream=True):
print(ans.content[len(cont):], end='', flush=True)
cont = ans.content

与智能体创建会话

Agent.create_session(**kwargs) -> Session

与当前智能体创建一个会话。

参数

**kwargs

begin 组件中的参数。

返回

  • 成功:包含以下属性的 Session 对象
    • id: str 创建的会话的自动生成的唯一标识符。
    • message: list[Message] 创建的会话助手的消息。默认:[{"role": "assistant", "content": "Hi! I am your assistant, can I help you?"}]
    • agent_id: str 关联智能体的 ID。
  • 失败:Exception

示例

from ragflow_sdk import RAGFlow, Agent

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
agent_id = "AGENT_ID"
agent = rag_object.list_agents(id = agent_id)[0]
session = agent.create_session()

与智能体对话

Session.ask(question: str="", stream: bool = False) -> Optional[Message, iter[Message]]

向指定的智能体提问,以开始一次由人工智能驱动的对话。

注意

在流模式下,并非所有响应都包含引用,这取决于系统的判断。

参数

question: str

开始由人工智能驱动的对话的问题。如果开始组件接受参数,则不需要问题。

stream: bool

指示是否以流方式输出响应

  • True:启用流(默认)。
  • False:禁用流。

返回

  • 如果 stream 设置为 False,则为包含问题响应的 Message 对象
  • 如果 stream 设置为 True,则为包含多个 message 对象的迭代器(iter[Message]

以下显示了 Message 对象的属性

id: str

自动生成的消息 ID。

content: str

消息的内容。默认为 "Hi! I am your assistant, can I help you?"

reference: list[Chunk]

表示对消息引用的 Chunk 对象列表,每个对象包含以下属性

  • id str
    块 ID。
  • content str
    块的内容。
  • image_id str
    块快照的 ID。仅当块的来源是图像、PPT、PPTX 或 PDF 文件时适用。
  • document_id str
    引用的文档 ID。
  • document_name str
    引用的文档名称。
  • position list[str]
    块在引用文档中的位置信息。
  • dataset_id str
    引用文档所属的数据集 ID。
  • similarity float
    块的复合相似度分数,范围从 01,值越高表示相似度越大。它是 vector_similarityterm_similarity 的加权和。
  • vector_similarity float
    块的向量相似度分数,范围从 01,值越高表示向量嵌入之间的相似度越大。
  • term_similarity float
    块的关键字相似度分数,范围从 01,值越高表示关键字之间的相似度越大。

示例

from ragflow_sdk import RAGFlow, Agent

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
AGENT_id = "AGENT_ID"
agent = rag_object.list_agents(id = AGENT_id)[0]
session = agent.create_session()

print("\n===== Miss R ====\n")
print("Hello. What can I do for you?")

while True:
question = input("\n===== User ====\n> ")
print("\n==== Miss R ====\n")

cont = ""
for ans in session.ask(question, stream=True):
print(ans.content[len(cont):], end='', flush=True)
cont = ans.content

列出智能体会话

Agent.list_sessions(
page: int = 1,
page_size: int = 30,
orderby: str = "update_time",
desc: bool = True,
id: str = None
) -> List[Session]

列出与当前智能体关联的会话。

参数

page: int

指定显示会话的页面。默认为 1

page_size: int

每页的会话数量。默认为 30

orderby: str

会话排序的字段。可用选项

  • "create_time"
  • "update_time"(默认)
desc: bool

指示检索到的会话是否应按降序排序。默认为 True

id: str

要检索的智能体会话的 ID。默认为 None

返回

  • 成功:与当前智能体关联的 Session 对象列表。
  • 失败:Exception

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
AGENT_id = "AGENT_ID"
agent = rag_object.list_agents(id = AGENT_id)[0]
sessons = agent.list_sessions()
for session in sessions:
print(session)

删除智能体的会话

Agent.delete_sessions(ids: list[str] = None)

通过 ID 删除智能体的会话。

参数

ids: list[str]

要删除的会话的 ID。默认为 None。如果未指定,将删除与智能体关联的所有会话。

返回

  • 成功:不返回任何值。
  • 失败:Exception

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
AGENT_id = "AGENT_ID"
agent = rag_object.list_agents(id = AGENT_id)[0]
agent.delete_sessions(ids=["id_1","id_2"])

智能体管理


列出智能体

RAGFlow.list_agents(
page: int = 1,
page_size: int = 30,
orderby: str = "create_time",
desc: bool = True,
id: str = None,
title: str = None
) -> List[Agent]

列出智能体。

参数

page: int

指定显示智能体的页面。默认为 1

page_size: int

每页的智能体数量。默认为 30

orderby: str

对结果进行排序的属性。可用选项

  • "create_time" (默认)
  • "update_time"
desc: bool

指示检索到的智能体是否应按降序排序。默认为 True

id: str

要检索的智能体的 ID。默认为 None

name: str

要检索的智能体的名称。默认为 None

返回

  • 成功:Agent 对象列表。
  • 失败:Exception

示例

from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
for agent in rag_object.list_agents():
print(agent)

创建智能体

RAGFlow.create_agent(
title: str,
dsl: dict,
description: str | None = None
) -> None

创建一个智能体。

参数

title: str

指定智能体的标题。

dsl: dict

指定智能体的画布 DSL。

description: str

智能体的描述。默认为 None

返回

  • 成功:无。
  • 失败:Exception

示例

from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
rag_object.create_agent(
title="Test Agent",
description="A test agent",
dsl={
# ... canvas DSL here ...
}
)

更新智能体

RAGFlow.update_agent(
agent_id: str,
title: str | None = None,
description: str | None = None,
dsl: dict | None = None
) -> None

更新一个智能体。

参数

agent_id: str

指定要更新的智能体的 ID。

title: str

指定智能体的新标题。如果您不想更新此项,则为 None

dsl: dict

指定智能体的新画布 DSL。如果您不想更新此项,则为 None

description: str

智能体的新描述。如果您不想更新此项,则为 None

返回

  • 成功:无。
  • 失败:Exception

示例

from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
rag_object.update_agent(
agent_id="58af890a2a8911f0a71a11b922ed82d6",
title="Test Agent",
description="A test agent",
dsl={
# ... canvas DSL here ...
}
)

删除智能体

RAGFlow.delete_agent(
agent_id: str
) -> None

删除一个智能体。

参数

agent_id: str

指定要删除的智能体的 ID。

返回

  • 成功:无。
  • 失败:Exception

示例

from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
rag_object.delete_agent("58af890a2a8911f0a71a11b922ed82d6")