examples: gpt oss browser tool (#588)

---------

Co-authored-by: nicole pardal <nicolepardall@gmail.com>
This commit is contained in:
Parth Sareen 2025-09-24 15:40:53 -07:00 committed by GitHub
parent ab49a669cd
commit d967f048d9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 615 additions and 376 deletions

View File

@ -36,8 +36,6 @@ See [ollama/docs/api.md](https://github.com/ollama/ollama/blob/main/docs/api.md)
- [gpt-oss-tools.py](gpt-oss-tools.py)
- [gpt-oss-tools-stream.py](gpt-oss-tools-stream.py)
- [gpt-oss-tools-browser.py](gpt-oss-tools-browser.py) - Using browser research tools with gpt-oss
- [gpt-oss-tools-browser-stream.py](gpt-oss-tools-browser-stream.py) - Using browser research tools with gpt-oss, with streaming enabled
### Web search
@ -48,6 +46,7 @@ export OLLAMA_API_KEY="your_api_key_here"
```
- [web-search.py](web-search.py)
- [web-search-gpt-oss.py](web-search-gpt-oss.py) - Using browser research tools with gpt-oss
#### MCP server

View File

@ -1,198 +0,0 @@
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "gpt-oss",
# "ollama",
# "rich",
# ]
# ///
import asyncio
import json
from typing import Iterator, Optional
from gpt_oss.tools.simple_browser import ExaBackend, SimpleBrowserTool
from openai_harmony import Author, Role, TextContent
from openai_harmony import Message as HarmonyMessage
from rich import print
from ollama import Client
from ollama._types import ChatResponse
_backend = ExaBackend(source='web')
_browser_tool = SimpleBrowserTool(backend=_backend)
def heading(text):
print(text)
print('=' * (len(text) + 3))
async def _browser_search_async(query: str, topn: int = 10, source: str | None = None) -> str:
# map Ollama message to Harmony format
harmony_message = HarmonyMessage(
author=Author(role=Role.USER),
content=[TextContent(text=json.dumps({'query': query, 'topn': topn}))],
recipient='browser.search',
)
result_text: str = ''
async for response in _browser_tool._process(harmony_message):
if response.content:
for content in response.content:
if isinstance(content, TextContent):
result_text += content.text
return result_text or f'No results for query: {query}'
async def _browser_open_async(id: int | str = -1, cursor: int = -1, loc: int = -1, num_lines: int = -1, *, view_source: bool = False, source: str | None = None) -> str:
payload = {'id': id, 'cursor': cursor, 'loc': loc, 'num_lines': num_lines, 'view_source': view_source, 'source': source}
harmony_message = HarmonyMessage(
author=Author(role=Role.USER),
content=[TextContent(text=json.dumps(payload))],
recipient='browser.open',
)
result_text: str = ''
async for response in _browser_tool._process(harmony_message):
if response.content:
for content in response.content:
if isinstance(content, TextContent):
result_text += content.text
return result_text or f'Could not open: {id}'
async def _browser_find_async(pattern: str, cursor: int = -1) -> str:
payload = {'pattern': pattern, 'cursor': cursor}
harmony_message = HarmonyMessage(
author=Author(role=Role.USER),
content=[TextContent(text=json.dumps(payload))],
recipient='browser.find',
)
result_text: str = ''
async for response in _browser_tool._process(harmony_message):
if response.content:
for content in response.content:
if isinstance(content, TextContent):
result_text += content.text
return result_text or f'Pattern not found: {pattern}'
def browser_search(query: str, topn: int = 10, source: Optional[str] = None) -> str:
return asyncio.run(_browser_search_async(query=query, topn=topn, source=source))
def browser_open(id: int | str = -1, cursor: int = -1, loc: int = -1, num_lines: int = -1, *, view_source: bool = False, source: Optional[str] = None) -> str:
return asyncio.run(_browser_open_async(id=id, cursor=cursor, loc=loc, num_lines=num_lines, view_source=view_source, source=source))
def browser_find(pattern: str, cursor: int = -1) -> str:
return asyncio.run(_browser_find_async(pattern=pattern, cursor=cursor))
# Schema definitions for each browser tool
browser_search_schema = {
'type': 'function',
'function': {
'name': 'browser.search',
},
}
browser_open_schema = {
'type': 'function',
'function': {
'name': 'browser.open',
},
}
browser_find_schema = {
'type': 'function',
'function': {
'name': 'browser.find',
},
}
available_tools = {
'browser.search': browser_search,
'browser.open': browser_open,
'browser.find': browser_find,
}
model = 'gpt-oss:20b'
print('Model: ', model, '\n')
prompt = 'What is Ollama?'
print('You: ', prompt, '\n')
messages = [{'role': 'user', 'content': prompt}]
client = Client()
# gpt-oss can call tools while "thinking"
# a loop is needed to call the tools and get the results
final = True
while True:
response_stream: Iterator[ChatResponse] = client.chat(
model=model,
messages=messages,
tools=[browser_search_schema, browser_open_schema, browser_find_schema],
options={'num_ctx': 8192}, # 8192 is the recommended lower limit for the context window
stream=True,
)
tool_calls = []
thinking = ''
content = ''
for chunk in response_stream:
if chunk.message.tool_calls:
tool_calls.extend(chunk.message.tool_calls)
if chunk.message.content:
if not (chunk.message.thinking or chunk.message.thinking == '') and final:
heading('\n\nFinal result: ')
final = False
print(chunk.message.content, end='', flush=True)
if chunk.message.thinking:
thinking += chunk.message.thinking
print(chunk.message.thinking, end='', flush=True)
if thinking != '':
messages.append({'role': 'assistant', 'content': thinking, 'tool_calls': tool_calls})
print()
if tool_calls:
for tool_call in tool_calls:
tool_name = tool_call.function.name
args = tool_call.function.arguments or {}
function_to_call = available_tools.get(tool_name)
if function_to_call:
heading(f'\nCalling tool: {tool_name}')
if args:
print(f'Arguments: {args}')
try:
result = function_to_call(**args)
print(f'Tool result: {result[:200]}')
if len(result) > 200:
heading('... [truncated]')
print()
result_message = {'role': 'tool', 'content': result, 'tool_name': tool_name}
messages.append(result_message)
except Exception as e:
err = f'Error from {tool_name}: {e}'
print(err)
messages.append({'role': 'tool', 'content': err, 'tool_name': tool_name})
else:
print(f'Tool {tool_name} not found')
else:
# no more tool calls, we can stop the loop
break

View File

@ -1,175 +0,0 @@
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "gpt-oss",
# "ollama",
# "rich",
# ]
# ///
import asyncio
import json
from typing import Optional
from gpt_oss.tools.simple_browser import ExaBackend, SimpleBrowserTool
from openai_harmony import Author, Role, TextContent
from openai_harmony import Message as HarmonyMessage
from ollama import Client
_backend = ExaBackend(source='web')
_browser_tool = SimpleBrowserTool(backend=_backend)
def heading(text):
print(text)
print('=' * (len(text) + 3))
async def _browser_search_async(query: str, topn: int = 10, source: str | None = None) -> str:
# map Ollama message to Harmony format
harmony_message = HarmonyMessage(
author=Author(role=Role.USER),
content=[TextContent(text=json.dumps({'query': query, 'topn': topn}))],
recipient='browser.search',
)
result_text: str = ''
async for response in _browser_tool._process(harmony_message):
if response.content:
for content in response.content:
if isinstance(content, TextContent):
result_text += content.text
return result_text or f'No results for query: {query}'
async def _browser_open_async(id: int | str = -1, cursor: int = -1, loc: int = -1, num_lines: int = -1, *, view_source: bool = False, source: str | None = None) -> str:
payload = {'id': id, 'cursor': cursor, 'loc': loc, 'num_lines': num_lines, 'view_source': view_source, 'source': source}
harmony_message = HarmonyMessage(
author=Author(role=Role.USER),
content=[TextContent(text=json.dumps(payload))],
recipient='browser.open',
)
result_text: str = ''
async for response in _browser_tool._process(harmony_message):
if response.content:
for content in response.content:
if isinstance(content, TextContent):
result_text += content.text
return result_text or f'Could not open: {id}'
async def _browser_find_async(pattern: str, cursor: int = -1) -> str:
payload = {'pattern': pattern, 'cursor': cursor}
harmony_message = HarmonyMessage(
author=Author(role=Role.USER),
content=[TextContent(text=json.dumps(payload))],
recipient='browser.find',
)
result_text: str = ''
async for response in _browser_tool._process(harmony_message):
if response.content:
for content in response.content:
if isinstance(content, TextContent):
result_text += content.text
return result_text or f'Pattern not found: {pattern}'
def browser_search(query: str, topn: int = 10, source: Optional[str] = None) -> str:
return asyncio.run(_browser_search_async(query=query, topn=topn, source=source))
def browser_open(id: int | str = -1, cursor: int = -1, loc: int = -1, num_lines: int = -1, *, view_source: bool = False, source: Optional[str] = None) -> str:
return asyncio.run(_browser_open_async(id=id, cursor=cursor, loc=loc, num_lines=num_lines, view_source=view_source, source=source))
def browser_find(pattern: str, cursor: int = -1) -> str:
return asyncio.run(_browser_find_async(pattern=pattern, cursor=cursor))
# Schema definitions for each browser tool
browser_search_schema = {
'type': 'function',
'function': {
'name': 'browser.search',
},
}
browser_open_schema = {
'type': 'function',
'function': {
'name': 'browser.open',
},
}
browser_find_schema = {
'type': 'function',
'function': {
'name': 'browser.find',
},
}
available_tools = {
'browser.search': browser_search,
'browser.open': browser_open,
'browser.find': browser_find,
}
model = 'gpt-oss:20b'
print('Model: ', model, '\n')
prompt = 'What is Ollama?'
print('You: ', prompt, '\n')
messages = [{'role': 'user', 'content': prompt}]
client = Client()
while True:
response = client.chat(
model=model,
messages=messages,
tools=[browser_search_schema, browser_open_schema, browser_find_schema],
options={'num_ctx': 8192}, # 8192 is the recommended lower limit for the context window
)
if hasattr(response.message, 'thinking') and response.message.thinking:
heading('Thinking')
print(response.message.thinking.strip() + '\n')
if hasattr(response.message, 'content') and response.message.content:
heading('Assistant')
print(response.message.content.strip() + '\n')
# add message to chat history
messages.append(response.message)
if response.message.tool_calls:
for tool_call in response.message.tool_calls:
tool_name = tool_call.function.name
args = tool_call.function.arguments or {}
function_to_call = available_tools.get(tool_name)
if not function_to_call:
print(f'Unknown tool: {tool_name}')
continue
try:
result = function_to_call(**args)
heading(f'Tool: {tool_name}')
if args:
print(f'Arguments: {args}')
print(result[:200])
if len(result) > 200:
print('... [truncated]')
print()
messages.append({'role': 'tool', 'content': result, 'tool_name': tool_name})
except Exception as e:
err = f'Error from {tool_name}: {e}'
print(err)
messages.append({'role': 'tool', 'content': err, 'tool_name': tool_name})
else:
# break on no more tool calls
break

View File

@ -0,0 +1,99 @@
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "ollama",
# ]
# ///
from typing import Any, Dict, List
from web_search_gpt_oss_helper import Browser
from ollama import Client
def main() -> None:
client = Client()
browser = Browser(initial_state=None, client=client)
def browser_search(query: str, topn: int = 10) -> str:
return browser.search(query=query, topn=topn)['pageText']
def browser_open(id: int | str | None = None, cursor: int = -1, loc: int = -1, num_lines: int = -1) -> str:
return browser.open(id=id, cursor=cursor, loc=loc, num_lines=num_lines)['pageText']
def browser_find(pattern: str, cursor: int = -1, **_: Any) -> str:
return browser.find(pattern=pattern, cursor=cursor)['pageText']
browser_search_schema = {
'type': 'function',
'function': {
'name': 'browser.search',
},
}
browser_open_schema = {
'type': 'function',
'function': {
'name': 'browser.open',
},
}
browser_find_schema = {
'type': 'function',
'function': {
'name': 'browser.find',
},
}
available_tools = {
'browser.search': browser_search,
'browser.open': browser_open,
'browser.find': browser_find,
}
query = "what is ollama's new engine"
print('Prompt:', query, '\n')
messages: List[Dict[str, Any]] = [{'role': 'user', 'content': query}]
while True:
resp = client.chat(
model='gpt-oss:120b-cloud',
messages=messages,
tools=[browser_search_schema, browser_open_schema, browser_find_schema],
think=True,
)
if resp.message.thinking:
print('Thinking:\n========\n')
print(resp.message.thinking + '\n')
if resp.message.content:
print('Response:\n========\n')
print(resp.message.content + '\n')
messages.append(resp.message)
if not resp.message.tool_calls:
break
for tc in resp.message.tool_calls:
tool_name = tc.function.name
args = tc.function.arguments or {}
print(f'Tool name: {tool_name}, args: {args}')
fn = available_tools.get(tool_name)
if not fn:
messages.append({'role': 'tool', 'content': f'Tool {tool_name} not found', 'tool_name': tool_name})
continue
try:
result_text = fn(**args)
print('Result: ', result_text[:200] + '...')
except Exception as e:
result_text = f'Error from {tool_name}: {e}'
messages.append({'role': 'tool', 'content': result_text, 'tool_name': tool_name})
if __name__ == '__main__':
main()

View File

@ -49,7 +49,7 @@ print('Query: ', query)
messages = [{'role': 'user', 'content': query}]
while True:
response = chat(model='qwen3', messages=messages, tools=[web_search, web_fetch], think=True)
response = chat(model='deepseek-v3.1:671b-cloud', messages=messages, tools=[web_search, web_fetch], think=True)
if response.message.thinking:
print('Thinking: ')
print(response.message.thinking + '\n\n')

View File

@ -0,0 +1,514 @@
from __future__ import annotations
import re
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional, Protocol, Tuple
from urllib.parse import urlparse
from ollama import Client
@dataclass
class Page:
url: str
title: str
text: str
lines: List[str]
links: Dict[int, str]
fetched_at: datetime
@dataclass
class BrowserStateData:
page_stack: List[str] = field(default_factory=list)
view_tokens: int = 1024
url_to_page: Dict[str, Page] = field(default_factory=dict)
@dataclass
class WebSearchResult:
title: str
url: str
content: Dict[str, str]
class SearchClient(Protocol):
def search(self, queries: List[str], max_results: Optional[int] = None): ...
class CrawlClient(Protocol):
def crawl(self, urls: List[str]): ...
# ---- Constants ---------------------------------------------------------------
DEFAULT_VIEW_TOKENS = 1024
CAPPED_TOOL_CONTENT_LEN = 8000
# ---- Helpers ----------------------------------------------------------------
def cap_tool_content(text: str) -> str:
if not text:
return text
if len(text) <= CAPPED_TOOL_CONTENT_LEN:
return text
if CAPPED_TOOL_CONTENT_LEN <= 1:
return text[:CAPPED_TOOL_CONTENT_LEN]
return text[: CAPPED_TOOL_CONTENT_LEN - 1] + ''
def _safe_domain(u: str) -> str:
try:
parsed = urlparse(u)
host = parsed.netloc or u
return host.replace('www.', '') if host else u
except Exception:
return u
# ---- BrowserState ------------------------------------------------------------
class BrowserState:
def __init__(self, initial_state: Optional[BrowserStateData] = None):
self._data = initial_state or BrowserStateData(view_tokens=DEFAULT_VIEW_TOKENS)
def get_data(self) -> BrowserStateData:
return self._data
def set_data(self, data: BrowserStateData) -> None:
self._data = data
# ---- Browser ----------------------------------------------------------------
class Browser:
def __init__(
self,
initial_state: Optional[BrowserStateData] = None,
client: Optional[Client] = None,
):
self.state = BrowserState(initial_state)
self._client: Optional[Client] = client
def set_client(self, client: Client) -> None:
self._client = client
def get_state(self) -> BrowserStateData:
return self.state.get_data()
# ---- internal utils ----
def _save_page(self, page: Page) -> None:
data = self.state.get_data()
data.url_to_page[page.url] = page
data.page_stack.append(page.url)
self.state.set_data(data)
def _page_from_stack(self, url: str) -> Page:
data = self.state.get_data()
page = data.url_to_page.get(url)
if not page:
raise ValueError(f'Page not found for url {url}')
return page
def _join_lines_with_numbers(self, lines: List[str]) -> str:
result = []
for i, line in enumerate(lines):
result.append(f'L{i}: {line}')
return '\n'.join(result)
def _wrap_lines(self, text: str, width: int = 80) -> List[str]:
if width <= 0:
width = 80
src_lines = text.split('\n')
wrapped: List[str] = []
for line in src_lines:
if line == '':
wrapped.append('')
elif len(line) <= width:
wrapped.append(line)
else:
words = re.split(r'\s+', line)
if not words:
wrapped.append(line)
continue
curr = ''
for w in words:
test = (curr + ' ' + w) if curr else w
if len(test) > width and curr:
wrapped.append(curr)
curr = w
else:
curr = test
if curr:
wrapped.append(curr)
return wrapped
def _process_markdown_links(self, text: str) -> Tuple[str, Dict[int, str]]:
links: Dict[int, str] = {}
link_id = 0
multiline_pattern = re.compile(r'\[([^\]]+)\]\s*\n\s*\(([^)]+)\)')
text = multiline_pattern.sub(lambda m: f'[{m.group(1)}]({m.group(2)})', text)
text = re.sub(r'\s+', ' ', text)
link_pattern = re.compile(r'\[([^\]]+)\]\(([^)]+)\)')
def _repl(m: re.Match) -> str:
nonlocal link_id
link_text = m.group(1).strip()
link_url = m.group(2).strip()
domain = _safe_domain(link_url)
formatted = f'{link_id}{link_text}{domain}'
links[link_id] = link_url
link_id += 1
return formatted
processed = link_pattern.sub(_repl, text)
return processed, links
def _get_end_loc(self, loc: int, num_lines: int, total_lines: int, lines: List[str]) -> int:
if num_lines <= 0:
txt = self._join_lines_with_numbers(lines[loc:])
data = self.state.get_data()
chars_per_token = 4
max_chars = min(data.view_tokens * chars_per_token, len(txt))
num_lines = txt[:max_chars].count('\n') + 1
return min(loc + num_lines, total_lines)
def _display_page(self, page: Page, cursor: int, loc: int, num_lines: int) -> str:
total_lines = len(page.lines) or 0
if total_lines == 0:
page.lines = ['']
total_lines = 1
if loc != loc or loc < 0:
loc = 0
elif loc >= total_lines:
loc = max(0, total_lines - 1)
end_loc = self._get_end_loc(loc, num_lines, total_lines, page.lines)
header = f'[{cursor}] {page.title}'
header += f'({page.url})\n' if page.url else '\n'
header += f'**viewing lines [{loc} - {end_loc - 1}] of {total_lines - 1}**\n\n'
body_lines = []
for i in range(loc, end_loc):
body_lines.append(f'L{i}: {page.lines[i]}')
return header + '\n'.join(body_lines)
# ---- page builders ----
def _build_search_results_page_collection(self, query: str, results: Dict[str, Any]) -> Page:
page = Page(
url=f'search_results_{query}',
title=query,
text='',
lines=[],
links={},
fetched_at=datetime.utcnow(),
)
tb = []
tb.append('')
tb.append('# Search Results')
tb.append('')
link_idx = 0
for query_results in results.get('results', {}).values():
for result in query_results:
domain = _safe_domain(result.get('url', ''))
link_fmt = f'* 【{link_idx}{result.get("title", "")}{domain}'
tb.append(link_fmt)
raw_snip = result.get('content') or ''
capped = (raw_snip[:400] + '') if len(raw_snip) > 400 else raw_snip
cleaned = re.sub(r'\d{40,}', lambda m: m.group(0)[:40] + '', capped)
cleaned = re.sub(r'\s{3,}', ' ', cleaned)
tb.append(cleaned)
page.links[link_idx] = result.get('url', '')
link_idx += 1
page.text = '\n'.join(tb)
page.lines = self._wrap_lines(page.text, 80)
return page
def _build_search_result_page(self, result: WebSearchResult, link_idx: int) -> Page:
page = Page(
url=result.url,
title=result.title,
text='',
lines=[],
links={},
fetched_at=datetime.utcnow(),
)
link_fmt = f'{link_idx}{result.title}\n'
preview = link_fmt + f'URL: {result.url}\n'
full_text = result.content.get('fullText', '') if result.content else ''
preview += full_text[:300] + '\n\n'
if not full_text:
page.links[link_idx] = result.url
if full_text:
raw = f'URL: {result.url}\n{full_text}'
processed, links = self._process_markdown_links(raw)
page.text = processed
page.links = links
else:
page.text = preview
page.lines = self._wrap_lines(page.text, 80)
return page
def _build_page_from_fetch(self, requested_url: str, fetch_response: Dict[str, Any]) -> Page:
page = Page(
url=requested_url,
title=requested_url,
text='',
lines=[],
links={},
fetched_at=datetime.utcnow(),
)
for url, url_results in fetch_response.get('results', {}).items():
if url_results:
r0 = url_results[0]
if r0.get('content'):
page.text = r0['content']
if r0.get('title'):
page.title = r0['title']
page.url = url
break
if not page.text:
page.text = 'No content could be extracted from this page.'
else:
page.text = f'URL: {page.url}\n{page.text}'
processed, links = self._process_markdown_links(page.text)
page.text = processed
page.links = links
page.lines = self._wrap_lines(page.text, 80)
return page
def _build_find_results_page(self, pattern: str, page: Page) -> Page:
find_page = Page(
url=f'find_results_{pattern}',
title=f'Find results for text: `{pattern}` in `{page.title}`',
text='',
lines=[],
links={},
fetched_at=datetime.utcnow(),
)
max_results = 50
num_show_lines = 4
pattern_lower = pattern.lower()
result_chunks: List[str] = []
line_idx = 0
while line_idx < len(page.lines):
line = page.lines[line_idx]
if pattern_lower not in line.lower():
line_idx += 1
continue
end_line = min(line_idx + num_show_lines, len(page.lines))
snippet = '\n'.join(page.lines[line_idx:end_line])
link_fmt = f'{len(result_chunks)}†match at L{line_idx}'
result_chunks.append(f'{link_fmt}\n{snippet}')
if len(result_chunks) >= max_results:
break
line_idx += num_show_lines
if not result_chunks:
find_page.text = f'No `find` results for pattern: `{pattern}`'
else:
find_page.text = '\n\n'.join(result_chunks)
find_page.lines = self._wrap_lines(find_page.text, 80)
return find_page
# ---- public API: search / open / find ------------------------------------
def search(self, *, query: str, topn: int = 5) -> Dict[str, Any]:
if not self._client:
raise RuntimeError('Client not provided')
resp = self._client.web_search(query, max_results=topn)
normalized: Dict[str, Any] = {'results': {}}
rows: List[Dict[str, str]] = []
for item in resp.results:
content = item.content or ''
rows.append(
{
'title': item.title,
'url': item.url,
'content': content,
}
)
normalized['results'][query] = rows
search_page = self._build_search_results_page_collection(query, normalized)
self._save_page(search_page)
cursor = len(self.get_state().page_stack) - 1
for query_results in normalized.get('results', {}).values():
for i, r in enumerate(query_results):
ws = WebSearchResult(
title=r.get('title', ''),
url=r.get('url', ''),
content={'fullText': r.get('content', '') or ''},
)
result_page = self._build_search_result_page(ws, i + 1)
data = self.get_state()
data.url_to_page[result_page.url] = result_page
self.state.set_data(data)
page_text = self._display_page(search_page, cursor, loc=0, num_lines=-1)
return {'state': self.get_state(), 'pageText': cap_tool_content(page_text)}
def open(
self,
*,
id: Optional[str | int] = None,
cursor: int = -1,
loc: int = 0,
num_lines: int = -1,
) -> Dict[str, Any]:
if not self._client:
raise RuntimeError('Client not provided')
state = self.get_state()
if isinstance(id, str):
url = id
if url in state.url_to_page:
self._save_page(state.url_to_page[url])
cursor = len(self.get_state().page_stack) - 1
page_text = self._display_page(state.url_to_page[url], cursor, loc, num_lines)
return {'state': self.get_state(), 'pageText': cap_tool_content(page_text)}
fetch_response = self._client.web_fetch(url)
normalized: Dict[str, Any] = {
'results': {
url: [
{
'title': fetch_response.title or url,
'url': url,
'content': fetch_response.content or '',
}
]
}
}
new_page = self._build_page_from_fetch(url, normalized)
self._save_page(new_page)
cursor = len(self.get_state().page_stack) - 1
page_text = self._display_page(new_page, cursor, loc, num_lines)
return {'state': self.get_state(), 'pageText': cap_tool_content(page_text)}
# Resolve current page from stack only if needed (int id or no id)
page: Optional[Page] = None
if cursor >= 0:
if state.page_stack:
if cursor >= len(state.page_stack):
cursor = max(0, len(state.page_stack) - 1)
page = self._page_from_stack(state.page_stack[cursor])
else:
page = None
else:
if state.page_stack:
page = self._page_from_stack(state.page_stack[-1])
if isinstance(id, int):
if not page:
raise RuntimeError('No current page to resolve link from')
link_url = page.links.get(id)
if not link_url:
err = Page(
url=f'invalid_link_{id}',
title=f'No link with id {id} on `{page.title}`',
text='',
lines=[],
links={},
fetched_at=datetime.utcnow(),
)
available = sorted(page.links.keys())
available_list = ', '.join(map(str, available)) if available else '(none)'
err.text = '\n'.join(
[
f'Requested link id: {id}',
f'Current page: {page.title}',
f'Available link ids on this page: {available_list}',
'',
'Tips:',
'- To scroll this page, call browser_open with { loc, num_lines } (no id).',
'- To open a result from a search results page, pass the correct { cursor, id }.',
]
)
err.lines = self._wrap_lines(err.text, 80)
self._save_page(err)
cursor = len(self.get_state().page_stack) - 1
page_text = self._display_page(err, cursor, 0, -1)
return {'state': self.get_state(), 'pageText': cap_tool_content(page_text)}
new_page = state.url_to_page.get(link_url)
if not new_page:
fetch_response = self._client.web_fetch(link_url)
normalized: Dict[str, Any] = {
'results': {
link_url: [
{
'title': fetch_response.title or link_url,
'url': link_url,
'content': fetch_response.content or '',
}
]
}
}
new_page = self._build_page_from_fetch(link_url, normalized)
self._save_page(new_page)
cursor = len(self.get_state().page_stack) - 1
page_text = self._display_page(new_page, cursor, loc, num_lines)
return {'state': self.get_state(), 'pageText': cap_tool_content(page_text)}
if not page:
raise RuntimeError('No current page to display')
cur = self.get_state()
cur.page_stack.append(page.url)
self.state.set_data(cur)
cursor = len(cur.page_stack) - 1
page_text = self._display_page(page, cursor, loc, num_lines)
return {'state': self.get_state(), 'pageText': cap_tool_content(page_text)}
def find(self, *, pattern: str, cursor: int = -1) -> Dict[str, Any]:
state = self.get_state()
if cursor == -1:
if not state.page_stack:
raise RuntimeError('No pages to search in')
page = self._page_from_stack(state.page_stack[-1])
cursor = len(state.page_stack) - 1
else:
if cursor < 0 or cursor >= len(state.page_stack):
cursor = max(0, min(cursor, len(state.page_stack) - 1))
page = self._page_from_stack(state.page_stack[cursor])
find_page = self._build_find_results_page(pattern, page)
self._save_page(find_page)
new_cursor = len(self.get_state().page_stack) - 1
page_text = self._display_page(find_page, new_cursor, 0, -1)
return {'state': self.get_state(), 'pageText': cap_tool_content(page_text)}