science_live.core.endpoints

Nanopub Endpoint Management

Manages connections to nanopublication servers and provides a unified interface for SPARQL queries and nanopub retrieval.

Module Contents

Classes

NanopubEndpoint

Abstract base class for nanopub data sources

StandardNanopubEndpoint

Standard nanopub network endpoint implementation

MockNanopubEndpoint

Test nanopub endpoint with mock data

EndpointManager

Manages multiple nanopub endpoints with failover support

API

class science_live.core.endpoints.NanopubEndpoint [source]

Bases: abc.ABC

Abstract base class for nanopub data sources

abstractmethod async execute_sparql(query: str) -> Dict[str, Any] [source]

Execute a SPARQL query against the endpoint

abstractmethod async fetch_nanopub(uri: str) -> Dict[str, Any] [source]

Fetch a specific nanopublication by URI

abstractmethod async search_text(text: str, limit: int = 10) -> List[Dict] [source]

Search nanopubs by text

class science_live.core.endpoints.StandardNanopubEndpoint(base_url: str, timeout: int = 30) [source]

Bases: science_live.core.endpoints.NanopubEndpoint

Standard nanopub network endpoint implementation

Initialization

async _get_session() -> aiohttp.ClientSession [source]

Get or create HTTP session

async close() [source]

Close HTTP session

async execute_sparql(query: str) -> Dict[str, Any] [source]

Execute SPARQL query against nanopub endpoint

async fetch_nanopub(uri: str) -> Dict[str, Any] [source]

Fetch nanopub by URI

async search_text(text: str, limit: int = 10) -> List[Dict] [source]

Search nanopubs by text using SPARQL

class science_live.core.endpoints.MockNanopubEndpoint(base_url: str = 'https://test.nanopub.org', simulate_delay: bool = True, delay_range: Tuple[float, float] = (0.01, 0.05)) [source]

Bases: science_live.core.endpoints.NanopubEndpoint

Test nanopub endpoint with mock data

Initialization

async _simulate_network_delay() [source]

Simulate realistic network delay for testing

async execute_sparql(query: str) -> Dict[str, Any] [source]

Return mock SPARQL results with simulated delay

async fetch_nanopub(uri: str) -> Dict[str, Any] [source]

Return mock nanopub data with simulated delay

async search_text(text: str, limit: int = 10) -> List[Dict] [source]

Return mock search results with simulated delay

class science_live.core.endpoints.EndpointManager [source]

Manages multiple nanopub endpoints with failover support

Initialization

register_endpoint(name: str, endpoint: science_live.core.endpoints.NanopubEndpoint, is_default: bool = False) [source]

Register a nanopub endpoint

get_endpoint(name: Optional[str] = None) -> science_live.core.endpoints.NanopubEndpoint [source]

Get endpoint by name or return default

list_endpoints() -> List[str] [source]

List all registered endpoints

async close_all() [source]

Close all endpoints