"""
title: print_uploads
author: admin
version: 0.1
"""
import os
import requests
from datetime import datetime
from typing import Callable
from open_webui.apps.webui.models.users import Users
from langchain_community.document_loaders import FileSystemBlobLoader
from langchain.docstore.document import Document
class Tools:
    """Tool container exposing `print_if_exist`, which dumps files under a path."""

    def __init__(self):
        pass

    async def print_if_exist(
        self, prompt: str, __user__: dict, __event_emitter__=None
    ) -> str:
        """
        Print the text of every file found under the given path.

        Each file is read as UTF-8, printed to stdout and forwarded to the
        event emitter as a "message" event. Progress is reported through
        "status" events. A file that cannot be opened or decoded is reported
        and skipped so that it does not abort the remaining files.

        :param prompt: Path of the directory whose files should be printed.
        :param __user__: User information passed by the caller; not used here.
        :param __event_emitter__: Optional async callable that receives event
            dicts of the form {"type": ..., "data": ...}. When None, no events
            are emitted.
        :return: A short summary of the outcome, or a description of the error.
        """

        async def emit(event_type: str, data: dict) -> None:
            # The emitter defaults to None; awaiting it unguarded would raise
            # TypeError, including from inside the error handlers below.
            if __event_emitter__ is not None:
                await __event_emitter__({"type": event_type, "data": data})

        async def emit_completed() -> None:
            await emit("status", {"description": "Completed task.", "done": True})

        try:
            await emit(
                "status", {"description": "Reading directory...", "done": False}
            )
            print(prompt)
            # NOTE(review): `prompt` is used as a filesystem path with no
            # restriction, so any location readable by this process can be
            # dumped. Consider confining it to a known uploads directory.
            dir_path = prompt

            if not os.path.exists(dir_path):
                not_found = f"directory '{dir_path}' not found."
                await emit("message", {"content": not_found})
                await emit_completed()
                # Return the message so the declared `str` return type holds.
                return not_found

            try:
                # Enumerate every file below dir_path, recursively.
                loader = FileSystemBlobLoader(dir_path, glob="**/*")
                for blob in loader.yield_blobs():
                    file_path_str = str(blob.path)
                    try:
                        with open(file_path_str, "r", encoding="utf-8") as file:
                            text = file.read()
                    except (OSError, UnicodeDecodeError) as e:
                        # Binary or unreadable files must not stop the listing.
                        await emit(
                            "message",
                            {
                                "content": (
                                    f"--- {file_path_str} ---\n"
                                    f"Skipped (could not read as UTF-8 text): {e}\n"
                                )
                            },
                        )
                        continue
                    print(text)
                    await emit(
                        "message",
                        {"content": f"--- {file_path_str} ---\n{text}\n"},
                    )
            except Exception as e:
                # Report the real failure rather than a generic "not found".
                failure = f"An error occurred while loading the file: {e}"
                await emit("message", {"content": failure})
                await emit("status", {"description": f"Error: {e}", "done": True})
                return failure

            await emit_completed()
            return f"File '{dir_path}' done."
        except Exception as e:
            # Tool boundary: surface unexpected failures as a result string
            # instead of letting them propagate to the caller.
            await emit("status", {"description": f"Error: {e}", "done": True})
            return f"Error encountered: {e}"