from collections.abc import Callable from dataclasses import dataclass, field from logging import DEBUG, basicConfig, getLogger from pathlib import Path from typing import Any from cattrs import Converter from lsprotocol.types import ( TEXT_DOCUMENT_DID_CHANGE, TEXT_DOCUMENT_DID_CLOSE, TEXT_DOCUMENT_DID_OPEN, INITIALIZE, TEXT_DOCUMENT_DOCUMENT_SYMBOL, TEXT_DOCUMENT_INLAY_HINT, Diagnostic, DiagnosticSeverity, DidChangeTextDocumentParams, DidCloseTextDocumentParams, DidOpenTextDocumentParams, DocumentSymbol, DocumentSymbolParams, InitializeParams, InlayHint, InlayHintKind, InlayHintParams, MessageType, NotebookDocumentSyncOptions, PublishDiagnosticsNotification, PublishDiagnosticsParams, ShowMessageParams, TextDocumentSyncKind, ) from pygls.lsp.server import LanguageServer from pygls.protocol import LanguageServerProtocol, default_converter from skillls.checker import ParenMismatchError, ParenMismatchErrorKind from skillls.helpers import parse_file from skillls.types import URI, Node basicConfig( filename="skillls.log", filemode="w", level=DEBUG, format="%(asctime)s [%(levelname)s]: %(message)s", ) logger = getLogger(__name__) class SkillLanguageServer(LanguageServer): ws_files: set[URI] opened_files: set[URI] scopes: dict[URI, list[Node]] errs: dict[URI, ExceptionGroup] def __init__( self, name: str, version: str, text_document_sync_kind: TextDocumentSyncKind = TextDocumentSyncKind.Incremental, notebook_document_sync: NotebookDocumentSyncOptions | None = None, ): super().__init__(name, version, text_document_sync_kind, notebook_document_sync) self.ws_files = set() self.opened_files = set() self.scopes = {} self.errs = {} def update_diagnostics(self) -> None: for uri in self.opened_files: diags: list[Diagnostic] = [] if eg := self.errs.get(uri): for exc in eg.exceptions: match exc: case ParenMismatchError(): diags.append( Diagnostic( message=f"[skill_ls] {Path.from_uri(uri).name}:{exc.loc.start.line} {exc.kind.value}", severity=DiagnosticSeverity.Error, range=exc.loc, ) ) # if diags: self.text_document_publish_diagnostics( PublishDiagnosticsParams( uri=uri, version=self.workspace.get_text_document(uri).version, diagnostics=diags, ) ) server = SkillLanguageServer("SkillLS", "0.2.0") @server.feature(INITIALIZE) def lsp_initialize(server: SkillLanguageServer, params: InitializeParams) -> None: init_options: dict[str, Any] = params.initialization_options or {} logger.info("done init") logger.debug(init_options) ws_dir = server.workspace.root_path logger.debug(ws_dir) if ws_dir: root_dir = Path(ws_dir) for file in (*root_dir.rglob("*.il"), *root_dir.rglob("*.ocn")): uri = file.as_uri() logger.debug(uri) server.ws_files.add(uri) try: server.scopes[uri] = parse_file(server.workspace.get_text_document(uri)) if server.errs.get(uri): del server.errs[uri] except ExceptionGroup as eg: server.errs[uri] = eg @server.feature(TEXT_DOCUMENT_DID_OPEN) def on_open(server: SkillLanguageServer, params: DidOpenTextDocumentParams) -> None: server.opened_files.add(params.text_document.uri) server.update_diagnostics() @server.feature(TEXT_DOCUMENT_DID_CLOSE) def on_close(server: SkillLanguageServer, params: DidCloseTextDocumentParams) -> None: server.opened_files.remove(params.text_document.uri) @server.feature(TEXT_DOCUMENT_DID_CHANGE) def on_change(server: SkillLanguageServer, params: DidChangeTextDocumentParams) -> None: try: server.scopes[params.text_document.uri] = parse_file( server.workspace.get_text_document(params.text_document.uri) ) if server.errs.get(params.text_document.uri): del server.errs[params.text_document.uri] except ExceptionGroup as eg: server.errs[params.text_document.uri] = eg server.update_diagnostics() @server.feature(TEXT_DOCUMENT_INLAY_HINT) def on_inlay(server: SkillLanguageServer, params: InlayHintParams) -> list[InlayHint]: hints: list[InlayHint] = [] for uri in server.opened_files: for node in server.scopes.get(uri, []): hints.append( InlayHint( label=node.node, kind=InlayHintKind.Type, padding_left=True, position=node.location.end, ) ) return hints @server.feature(TEXT_DOCUMENT_DOCUMENT_SYMBOL) def on_symbols( server: SkillLanguageServer, params: DocumentSymbolParams ) -> list[DocumentSymbol] | None: return [node.as_doc_symbol() for node in server.scopes[params.text_document.uri]] def main(): server.start_io()