import collections
import inspect
import logging
import pathlib
import re
import traceback
import typing
import warnings
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Type
from typing import TypeVar
import pygls.uris as Uri
from docutils.parsers.rst import Directive
from lsprotocol.converters import get_converter
from lsprotocol.types import ClientCapabilities
from lsprotocol.types import CodeAction
from lsprotocol.types import CodeActionParams
from lsprotocol.types import CompletionItem
from lsprotocol.types import CompletionItemTag
from lsprotocol.types import DeleteFilesParams
from lsprotocol.types import Diagnostic
from lsprotocol.types import DidSaveTextDocumentParams
from lsprotocol.types import DocumentLink
from lsprotocol.types import InitializedParams
from lsprotocol.types import InitializeParams
from lsprotocol.types import Location
from lsprotocol.types import MarkupKind
from lsprotocol.types import Position
from pygls import IS_WIN
from pygls.capabilities import get_capability
from pygls.server import LanguageServer
from pygls.workspace import Document
from esbonio.cli import setup_cli
from esbonio.lsp.log import setup_logging
from .config import InitializationOptions
from .config import ServerCompletionConfig
from .io import read_initial_doctree
converter = get_converter()
LF = TypeVar("LF", bound="LanguageFeature")
TRIPLE_QUOTE = re.compile("(\"\"\"|''')")
"""A regular expression matching the triple quotes used to delimit python docstrings."""
# fmt: off
# Order matters!
DEFAULT_MODULES = [
"esbonio.lsp.directives", # Generic directive support
"esbonio.lsp.roles", # Generic roles support
"esbonio.lsp.rst.directives", # Specialised support for docutils directives
"esbonio.lsp.rst.roles", # Specialised support for docutils roles
]
"""The modules to load in the default configuration of the server."""
# fmt: on
[docs]
class CompletionContext:
"""Captures the context within which a completion request has been made."""
def __init__(
self,
*,
doc: Document,
location: str,
match: "re.Match",
position: Position,
config: ServerCompletionConfig,
capabilities: ClientCapabilities,
):
self.doc: Document = doc
"""The document within which the completion request was made."""
self.location: str = location
"""The location type where the request was made.
See :meth:`~esbonio.lsp.rst.RstLanguageServer.get_location_type` for details."""
self.match: "re.Match" = match
"""The match object describing the site of the completion request."""
self.position: Position = position
"""The position at which the completion request was made."""
self.config: ServerCompletionConfig = config
"""User supplied configuration options."""
self._client_capabilities: ClientCapabilities = capabilities
def __repr__(self):
p = f"{self.position.line}:{self.position.character}"
return (
f"CompletionContext<{self.doc.uri}:{p} ({self.location}) -- {self.match}>"
)
@property
def commit_characters_support(self) -> bool:
"""Indicates if the client supports commit characters."""
return get_capability(
self._client_capabilities,
"text_document.completion.completion_item.commit_characters_support",
False,
)
@property
def deprecated_support(self) -> bool:
"""Indicates if the client supports the deprecated field on a
``CompletionItem``."""
return get_capability(
self._client_capabilities,
"text_document.completion.completion_item.deprecated_support",
False,
)
@property
def documentation_formats(self) -> List[MarkupKind]:
"""The list of documentation formats supported by the client."""
return get_capability(
self._client_capabilities,
"text_document.completion.completion_item.documentation_format",
[],
)
@property
def insert_replace_support(self) -> bool:
"""Indicates if the client supports ``InsertReplaceEdit``."""
return get_capability(
self._client_capabilities,
"text_document.completion.completion_item.insert_replace_support",
False,
)
@property
def preselect_support(self) -> bool:
"""Indicates if the client supports the preselect field on a
``CompletionItem``."""
return get_capability(
self._client_capabilities,
"text_document.completion.completion_item.preselect_support",
False,
)
@property
def snippet_support(self) -> bool:
"""Indicates if the client supports snippets"""
return get_capability(
self._client_capabilities,
"text_document.completion.completion_item.snippet_support",
False,
)
@property
def supported_tags(self) -> List[CompletionItemTag]:
"""The list of ``CompletionItemTags`` supported by the client."""
capabilities = get_capability(
self._client_capabilities,
"text_document.completion.completion_item.tag_support",
None,
)
if not capabilities:
return []
return capabilities.value_set
[docs]
class DocumentLinkContext:
"""Captures the context within which a document link request has been made."""
def __init__(self, *, doc: Document, capabilities: ClientCapabilities):
self.doc = doc
"""The document within which the document link request was made."""
self._client_capabilities = capabilities
@property
def tooltip_support(self) -> bool:
"""Indicates if the client supports tooltips."""
return get_capability(
self._client_capabilities,
"text_document.document_link.tooltip_support",
False,
)
[docs]
class DefinitionContext:
"""A class that captures the context within which a definition request has been
made."""
def __init__(
self, *, doc: Document, location: str, match: "re.Match", position: Position
):
self.doc = doc
"""The document within which the definition request was made."""
self.location = location
"""The location type where the request was made.
See :meth:`~esbonio.lsp.rst.RstLanguageServer.get_location_type` for details."""
self.match = match
"""The match object describing the site of the definition request."""
self.position = position
"""The position at which the definition request was made."""
def __repr__(self):
p = f"{self.position.line}:{self.position.character}"
return (
f"DefinitionContext<{self.doc.uri}:{p} ({self.location}) -- {self.match}>"
)
class ImplementationContext:
"""A class that captures the context within which an implementation request has been
made."""
def __init__(
self, *, doc: Document, location: str, match: "re.Match", position: Position
):
self.doc = doc
"""The document within which the implementation request was made."""
self.location = location
"""The location type where the request was made.
See :meth:`~esbonio.lsp.rst.RstLanguageServer.get_location_type` for details."""
self.match = match
"""The match object describing the site of the implementation request."""
self.position = position
"""The position at which the implementation request was made."""
def __repr__(self):
p = f"{self.position.line}:{self.position.character}"
return f"ImplementationContext<{self.doc.uri}:{p} ({self.location}) -- {self.match}>"
class HoverContext:
"""A class that captures the context within a hover request has been made."""
def __init__(
self,
*,
doc: Document,
location: str,
match: "re.Match",
position: Position,
capabilities: ClientCapabilities,
):
self.doc = doc
self.location = location
self.match = match
self.position = position
self._client_capabilities = capabilities
def __repr__(self):
p = f"{self.position.line}:{self.position.character}"
return f"HoverContext<{self.doc.uri}:{p} ({self.location}) -- {self.match}>"
@property
def content_formats(self) -> List[MarkupKind]:
"""The list of content formats supported by the client."""
return get_capability(
self._client_capabilities, "text_document.hover.content_format", []
)
[docs]
class LanguageFeature:
"""Base class for language features."""
def __init__(self, rst: "RstLanguageServer"):
self.rst = rst
self.logger = rst.logger.getChild(self.__class__.__name__)
[docs]
def initialize(self, options: InitializeParams) -> None:
"""Called once when the server is first initialized."""
[docs]
def initialized(self, params: InitializedParams) -> None:
"""Called once upon receipt of the `initialized` notification from the client."""
[docs]
def on_shutdown(self, *args):
"""Called as the server is shutting down."""
[docs]
def save(self, params: DidSaveTextDocumentParams) -> None:
"""Called each time a document is saved."""
[docs]
def delete_files(self, params: DeleteFilesParams) -> None:
"""Called each time files are deleted."""
[docs]
def code_action(self, params: CodeActionParams) -> List[CodeAction]:
"""Called when code actions should be computed."""
return []
hover_triggers: List["re.Pattern"] = []
[docs]
def hover(self, context: HoverContext) -> str:
"""Called when request textDocument/hover is sent.
This method shall return the contents value of textDocument/hover response.
"""
return ""
completion_triggers: List["re.Pattern"] = []
"""A list of regular expressions used to determine if the
:meth`~esbonio.lsp.rst.LanguageFeature.complete` method should be called on the
current line."""
[docs]
def complete(self, context: CompletionContext) -> List[CompletionItem]:
"""Called if any of the given ``completion_triggers`` match the current line.
This method should return a list of ``CompletionItem`` objects.
Parameters
----------
context:
The context of the completion request
"""
return []
[docs]
def completion_resolve(self, item: CompletionItem) -> CompletionItem:
"""Called with a completion item the user has selected in the UI,
allowing for additional details to be provided e.g. documentation.
Parameters
----------
item:
The completion item to provide additional details for.
"""
return item
definition_triggers: List["re.Pattern"] = []
"""A list of regular expressions used to determine if the
:meth:`~esbonio.lsp.rst.LanguageFeature.definition` method should be called."""
[docs]
def definition(self, context: DefinitionContext) -> List[Location]:
"""Called if any of the given ``definition_triggers`` match the current line.
This method should return a list of ``Location`` objects.
Parameters
----------
context:
The context of the definition request.
"""
return []
implementation_triggers: List["re.Pattern"] = []
"""A list of regular expressions used to determine if the
:meth:`~esbonio.lsp.rst.LanguageFeature.implementation` method should be called."""
[docs]
def implementation(self, context: ImplementationContext) -> List[Location]:
"""Called if any of the given ``implementation_triggers`` match the current line.
This method should return a list of ``Location`` objects.
Parameters
----------
context:
The context of the implementation request.
"""
return []
[docs]
def document_link(self, context: DocumentLinkContext) -> List[DocumentLink]:
"""Called whenever a ``textDocument/documentLink`` request is received."""
return []
class DiagnosticList(collections.UserList):
"""A list type dedicated to holding diagnostics.
This is mainly to ensure that only one instance of a diagnostic ever gets
reported.
"""
def append(self, item: Diagnostic):
if not isinstance(item, Diagnostic):
raise TypeError("Expected Diagnostic")
for existing in self.data:
fields = [
existing.range == item.range,
existing.message == item.message,
existing.severity == item.severity,
existing.code == item.code,
existing.source == item.source,
]
if all(fields):
# Item already added, nothing to do.
return
self.data.append(item)
[docs]
class RstLanguageServer(LanguageServer):
"""A generic reStructuredText language server."""
def __init__(self, logger: Optional[logging.Logger] = None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.logger = logger or logging.getLogger(__name__)
"""The base logger that should be used for all language sever log entries."""
self.converter = self.lsp._converter
"""The cattrs converter instance we should use."""
self.user_config: InitializationOptions = InitializationOptions()
"""The user's configuration."""
self._diagnostics: Dict[Tuple[str, str], List[Diagnostic]] = {}
"""Where we store and manage diagnostics."""
self._loaded_extensions: Dict[str, Any] = {}
"""Record of modules that have been loaded."""
self._features: Dict[str, LanguageFeature] = {}
"""The collection of language features registered with the server."""
@property
def configuration(self) -> Dict[str, Any]:
"""Return the server's actual configuration."""
return converter.unstructure(self.user_config)
def initialize(self, params: InitializeParams):
self.user_config = converter.structure(
params.initialization_options or {}, InitializationOptions
)
setup_logging(self, self.user_config.server)
def initialized(self, params: InitializedParams):
pass
def on_shutdown(self, *args):
pass
def save(self, params: DidSaveTextDocumentParams):
pass
def delete_files(self, params: DeleteFilesParams):
pass
def build(self, force_all: bool = False, filenames: Optional[List[str]] = None):
pass
[docs]
def load_extension(self, name: str, setup: Callable):
"""Load the given setup function as an extension.
If an extension with the given ``name`` already exists, the given setup function
will be ignored.
The ``setup`` function can declare dependencies in the form of type
annotations.
.. code-block:: python
from esbonio.lsp.roles import Roles
from esbonio.lsp.sphinx import SphinxLanguageServer
def esbonio_setup(rst: SphinxLanguageServer, roles: Roles):
...
In this example the setup function is requesting instances of the
:class:`~esbonio.lsp.sphinx.SphinxLanguageServer` and the
:class:`~esbonio.lsp.roles.Roles` language feature.
Parameters
----------
name
The name to give the extension
setup
The setup function to call
"""
if name in self._loaded_extensions:
self.logger.debug("Skipping extension '%s', already loaded", name)
return
arguments = _get_setup_arguments(self, setup, name)
if not arguments:
return
try:
setup(**arguments)
self.logger.debug("Loaded extension '%s'", name)
self._loaded_extensions[name] = setup
except Exception:
self.logger.error(
"Unable to load extension '%s'\n%s", name, traceback.format_exc()
)
[docs]
def add_feature(self, feature: "LanguageFeature"):
"""Register a language feature with the server.
Parameters
----------
feature
The language feature
"""
key = f"{feature.__module__}.{feature.__class__.__name__}"
self._features[key] = feature
@typing.overload
def get_feature(self, key: str) -> "Optional[LanguageFeature]":
...
@typing.overload
def get_feature(self, key: Type[LF]) -> Optional[LF]:
...
[docs]
def get_feature(self, key):
"""Returns the requested language feature if it exists, otherwise it returns
``None``.
Parameters
----------
key: str | Type[LanguageFeature]
A feature can be referenced either by its class definition (preferred) or by
a string representing the language feature's dotted name e.g.
``a.b.c.ClassName``.
.. deprecated:: 0.14.0
Passing a string ``key`` to this method is deprecated and will become an
error in ``v1.0``.
"""
if isinstance(key, str):
warnings.warn(
"Language features should be referenced by their class definition, "
"this will become an error in v1.0.",
DeprecationWarning,
stacklevel=2,
)
elif issubclass(key, LanguageFeature):
key = f"{key.__module__}.{key.__name__}"
else:
raise TypeError("Expected language feature definition")
return self._features.get(key, None)
def get_doctree(
self, *, docname: Optional[str] = None, uri: Optional[str] = None
) -> Optional[Any]:
# Not currently implemented for vanilla docutils projects.
return None
[docs]
def get_initial_doctree(self, uri: str) -> Optional[Any]:
"""Return the initial doctree corresponding to the specified document.
An "initial" doctree can be thought of as the abstract syntax tree of a
reStructuredText document. This method disables all role and directives
from being executed, instead they are replaced with nodes that simply
represent that they exist.
Parameters
----------
uri
Returns the doctree that corresponds with the given uri.
"""
doc = self.workspace.get_document(uri)
loctype = self.get_location_type(doc, Position(line=1, character=0))
if loctype != "rst":
return None
self.logger.debug("Getting initial doctree for: '%s'", Uri.to_fs_path(uri))
try:
return read_initial_doctree(doc, self.logger)
except Exception:
self.logger.error(traceback.format_exc())
return None
[docs]
def get_directives(self) -> Dict[str, Directive]:
"""Return a dictionary of the known directives
.. deprecated:: 0.14.2
This will be removed in ``v1.0``.
Use the :meth:`~esbonio.lsp.directives.Directives.get_directives` method
instead.
"""
clsname = self.__class__.__name__
warnings.warn(
f"{clsname}.get_directives() is deprecated and will be removed in v1.0."
" Instead call the get_directives() method on the Directives language "
"feature.",
DeprecationWarning,
stacklevel=2,
)
feature = self.get_feature("esbonio.lsp.directives.Directives")
if feature is None:
return {}
return feature.get_directives() # type: ignore
[docs]
def get_directive_options(self, name: str) -> Dict[str, Any]:
"""Return the options specification for the given directive.
.. deprecated:: 0.14.2
This will be removed in ``v1.0``
"""
clsname = self.__class__.__name__
warnings.warn(
f"{clsname}.get_directive_options() is deprecated and will be removed in "
"v1.0.",
DeprecationWarning,
stacklevel=2,
)
directive = self.get_directives().get(name, None)
if directive is None:
return {}
return directive.option_spec or {}
[docs]
def get_roles(self) -> Dict[str, Any]:
"""Return a dictionary of known roles.
.. deprecated:: 0.15.0
This will be removed in ``v1.0``.
Use the :meth:`~esbonio.lsp.roles.Roles.get_roles` method instead.
"""
clsname = self.__class__.__name__
warnings.warn(
f"{clsname}.get_roles() is deprecated and will be removed in v1.0. "
"Instead call the get_roles() method on the Roles language "
"feature.",
DeprecationWarning,
stacklevel=2,
)
feature = self.get_feature("esbonio.lsp.roles.Roles")
if feature is None:
return {}
return feature.get_roles() # type: ignore
[docs]
def get_default_role(self) -> Tuple[Optional[str], Optional[str]]:
"""Return the default role for the project."""
return None, None
[docs]
def clear_diagnostics(self, source: str, uri: Optional[str] = None) -> None:
"""Clear diagnostics from the given source.
Parameters
----------
source:
The source from which to clear diagnostics.
uri:
If given, clear diagnostics from within just this uri. Otherwise, all
diagnostics from the given source are cleared.
"""
if uri:
uri = normalise_uri(uri)
for key in self._diagnostics.keys():
clear_source = source == key[0]
clear_uri = uri == key[1] or uri is None
if clear_source and clear_uri:
self._diagnostics[key] = []
[docs]
def add_diagnostics(self, source: str, uri, diagnostic: Diagnostic):
"""Add a diagnostic to the given source and uri.
Parameters
----------
source
The source the diagnostics are from
uri
The uri the diagnostics are associated with
diagnostic
The diagnostic to add
"""
key = (source, normalise_uri(uri))
self._diagnostics.setdefault(key, []).append(diagnostic)
[docs]
def set_diagnostics(
self, source: str, uri: str, diagnostics: List[Diagnostic]
) -> None:
"""Set the diagnostics for the given source and uri.
Parameters
----------
source:
The source the diagnostics are from
uri:
The uri the diagnostics are associated with
diagnostics:
The diagnostics themselves
"""
uri = normalise_uri(uri)
self._diagnostics[(source, uri)] = diagnostics
[docs]
def sync_diagnostics(self) -> None:
"""Update the client with the currently stored diagnostics."""
uris = {uri for _, uri in self._diagnostics.keys()}
diagnostics = {uri: DiagnosticList() for uri in uris}
for (source, uri), diags in self._diagnostics.items():
for diag in diags:
diag.source = source
diagnostics[uri].append(diag)
for uri, diag_list in diagnostics.items():
self.logger.debug("Publishing %d diagnostics for: %s", len(diag_list), uri)
self.publish_diagnostics(uri, diag_list.data)
[docs]
def get_location_type(self, document: Document, position: Position) -> str:
"""Given a document and a position, return the type of location.
Returns one of the following values:
- ``rst``: Indicates that the position is within an reStructuredText document
- ``py``: Indicates that the position is within code in a Python file
- ``docstring``: Indicates that the position is within a docstring in a
Python file.
If the location type cannot be determined, this function will fall back to
``rst``.
Parameters
----------
doc
The document associated with the given position
position
The position to determine the type of
"""
doc = self.workspace.get_document(document.uri)
fpath = pathlib.Path(Uri.to_fs_path(doc.uri))
# Prefer the document's language_id, but fallback to file extensions
loctype = getattr(doc, "language_id", None) or fpath.suffix
if loctype in {".rst", "rst", "restructuredtext"}:
return "rst"
if loctype in {".py", "py", "python"}:
# Let's count how many pairs of triple quotes are above us in the file
# even => we're outside a docstring
# odd => we're within a docstring
source = self.text_to_position(doc, position)
count = len(TRIPLE_QUOTE.findall(source))
return "py" if count % 2 == 0 else "docstring"
# Fallback to rst
self.logger.debug("Unable to determine location type for uri: %s", doc.uri)
return "rst"
[docs]
def line_at_position(self, doc: Document, position: Position) -> str:
"""Return the contents of the line corresponding to the given position.
Parameters
----------
doc:
The document associated with the given position
position:
The position representing the line to retrieve
"""
try:
return doc.lines[position.line]
except IndexError:
return ""
[docs]
def line_to_position(self, doc: Document, position: Position) -> str:
"""Return the contents of the line up until the given position.
Parameters
----------
doc:
The document associated with the given position.
position:
The position representing the line to retrieve.
"""
line = self.line_at_position(doc, position)
return line[: position.character]
[docs]
def preview(self, options: Dict[str, Any]) -> Dict[str, Any]:
"""Generate a preview of the documentation."""
name = self.__class__.__name__
self.show_message(
f"Previews are not currently supported by {name} based servers"
)
return {}
[docs]
def text_to_position(self, doc: Document, position: Position) -> str:
"""Return the contents of the document up until the given position.
Parameters
----------
doc:
The document associated with the given position
position:
The position representing the point at which to stop gathering text.
"""
idx = doc.offset_at_position(position)
return doc.source[:idx]
def normalise_uri(uri: str) -> str:
uri = Uri.from_fs_path(Uri.to_fs_path(uri))
# Paths on windows are case insensitive.
if IS_WIN:
uri = uri.lower()
return uri
def _get_setup_arguments(
server: RstLanguageServer, setup: Callable, modname: str
) -> Optional[Dict[str, Any]]:
"""Given a setup function, try to construct the collection of arguments to pass to
it.
"""
annotations = typing.get_type_hints(setup)
parameters = {
p.name: annotations[p.name]
for p in inspect.signature(setup).parameters.values()
}
args = {}
for name, type_ in parameters.items():
if issubclass(server.__class__, type_):
args[name] = server
continue
if issubclass(type_, LanguageFeature):
# Try and obtain an instance of the requested language feature.
feature = server.get_feature(type_)
if feature is not None:
args[name] = feature
continue
server.logger.debug(
"Skipping extension '%s', server missing requested feature: '%s'",
modname,
type_,
)
return None
server.logger.error(
"Skipping extension '%s', parameter '%s' has unsupported type: '%s'",
modname,
name,
type_,
)
return None
return args
cli = setup_cli("esbonio.lsp.rst", "Esbonio's reStructuredText language server.")
cli.set_defaults(modules=DEFAULT_MODULES)
cli.set_defaults(server_cls=RstLanguageServer)