"""
title: Elib Tool
description: Search the electronic library of the German Aerospace Center (DLR)
author: Jan-Timo Hesse
author_url: https://github.com/JTHesse/
funding_url: https://github.com/JTHesse/
version: 0.1.1
license: MIT
"""
import asyncio
import json
from typing import Callable, Any

import requests
from pydantic import BaseModel, Field
async def _get_elib_data(query: str, results_no: int, timeout: float = 30.0) -> list:
    """Query the DLR elib simple-search JSON export and return trimmed results.

    The blocking ``requests`` call is executed in the default executor so the
    event loop is not stalled while waiting on the network.

    :param query: Free-text search string; it is URL-encoded before sending.
    :param results_no: Maximum number of entries to return; values below 1
        yield an empty list.
    :param timeout: Seconds to wait for the HTTP request before giving up.
    :return: A list of dicts with the keys ``title``, ``url`` and ``content``.
    :raises requests.RequestException: On connection errors, timeouts or a
        non-success HTTP status.
    :raises ValueError: If the response body cannot be decoded as JSON.
    """

    def _fetch():
        # Passing the query via ``params`` lets requests escape characters
        # such as "&" or "#" that would otherwise corrupt the URL; spaces are
        # still sent as "+".
        response = requests.get(
            "https://elib.dlr.de/cgi/search/simple/export_dlr_JSON.js",
            params={"q": query, "output": "JSON"},
            timeout=timeout,
        )
        # Surface HTTP errors instead of trying to parse an error page.
        response.raise_for_status()
        return response.json()

    data = await asyncio.get_running_loop().run_in_executor(None, _fetch)

    # Clamp so a negative limit cannot turn into a "drop the last n" slice.
    limit = max(results_no, 0)

    # The response is sliced like a sequence of dict entries; optional or null
    # fields fall back to an empty string rather than raising.
    return [
        {
            "title": (entry.get("title") or "").strip(),
            "url": entry.get("uri", ""),
            "content": (entry.get("abstract") or "").strip(),
        }
        for entry in data[:limit]
    ]
def status_object(
    description="Unknown State", status="in_progress", done=False
):
    """Build a ``status`` event dict from the given description, status and done flag.

    :param description: Human-readable text describing the current state.
    :param status: Status label stored under ``data["status"]``.
    :param done: Flag stored under ``data["done"]``.
    :return: ``{"type": "status", "data": {...}}`` carrying the three values.
    """
    payload = dict(status=status, description=description, done=done)
    return {"type": "status", "data": payload}
def generate_excerpt(content, max_length=1000):
    """Return ``content`` cut to ``max_length`` characters.

    Content that already fits is returned unchanged; longer content is
    truncated and suffixed with ``"..."``.
    """
    if len(content) <= max_length:
        return content
    truncated = content[:max_length]
    return truncated + "..."
class Tools:
    """Tool collection exposing a search over the DLR electronic library (elib)."""

    class Valves(BaseModel):
        # Settings read from ``self.valves``.
        CITATION_LINKS: bool = Field(
            default=True,
            description="If True, send custom citations with links",
        )

    class UserValves(BaseModel):
        # Settings read per call from ``__user__["valves"]``
        # (presumably an instance of this model — confirm against the caller).
        ELIB_RESULT_NO: int = Field(
            default=3,
            description="The number of Elib Results",
        )

    def __init__(self):
        self.valves = self.Valves()

    async def search_elib(
        self,
        query: str,
        __user__: dict = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Search the elib literature database and get the content of the relevant articles.
        :param query: Web Query used in search engine.
        :return: The content of the pages in json format.
        """
        # ``__user__`` defaults to None rather than a shared mutable ``{}``.
        # On failure the method returns a JSON object with an "error" key, and
        # on an empty result set a JSON empty list, so the declared ``str``
        # return type always holds.

        async def emit(event: dict) -> None:
            # Events are optional: without an emitter the search still runs.
            if __event_emitter__ is not None:
                await __event_emitter__(event)

        # Prefer the per-call setting; fall back to the UserValves default
        # when no user dict or no "valves" entry is supplied.
        user_valves = (__user__ or {}).get("valves")
        results_no = getattr(user_valves, "ELIB_RESULT_NO", None)
        if results_no is None:
            results_no = self.UserValves().ELIB_RESULT_NO

        print(f"search elib with query: {query}")
        await emit(
            status_object(
                description="Retrieving Elib data",
                done=False,
            )
        )

        try:
            results_json = await _get_elib_data(query, results_no)
        except Exception as e:
            # Boundary handler: report any fetch/parse failure to the caller
            # instead of letting it propagate.
            await emit(
                status_object(
                    status="error",
                    description=f"Error: {e}",
                    done=True,
                )
            )
            return json.dumps({"error": str(e)}, ensure_ascii=False)

        if not results_json:
            await emit(
                status_object(
                    status="error",
                    description="No results found for query: " + query,
                    done=True,
                )
            )
            return json.dumps([], ensure_ascii=False)

        if self.valves.CITATION_LINKS and __event_emitter__:
            # One citation event per article, with a shortened abstract.
            for result in results_json:
                await emit(
                    {
                        "type": "citation",
                        "data": {
                            "document": [generate_excerpt(result["content"])],
                            "metadata": [{"source": result["url"]}],
                            "source": {"name": result["title"]},
                        },
                    }
                )

        await emit(
            status_object(
                status="complete",
                description=f"Elib search completed. Retrieved content from {len(results_json)} pages",
                done=True,
            )
        )
        return json.dumps(results_json, ensure_ascii=False)