aboutsummaryrefslogtreecommitdiffstats
path: root/python/vhdl_langserver/workspace.py
diff options
context:
space:
mode:
authortgingold <tgingold@users.noreply.github.com>2020-12-29 17:19:09 +0100
committerGitHub <noreply@github.com>2020-12-29 17:19:09 +0100
commit7ca54117b8f757396ba5ef04c83ff1228ca94384 (patch)
treecc5f7f4166cc0570bda5cf2047d3ef7abbc36e37 /python/vhdl_langserver/workspace.py
parent340fc792bba2ffdb4f930bc427a39ea3a1b659b2 (diff)
parent50adcf884c3cfa4e33ca45769295f163baa63a3e (diff)
downloadghdl-7ca54117b8f757396ba5ef04c83ff1228ca94384.tar.gz
ghdl-7ca54117b8f757396ba5ef04c83ff1228ca94384.tar.bz2
ghdl-7ca54117b8f757396ba5ef04c83ff1228ca94384.zip
Merge pull request #1556 from Paebbels/paebbels/pyGHDL
Cleanup and Restructuring of pyGHDL
Diffstat (limited to 'python/vhdl_langserver/workspace.py')
-rw-r--r--python/vhdl_langserver/workspace.py499
1 files changed, 0 insertions, 499 deletions
diff --git a/python/vhdl_langserver/workspace.py b/python/vhdl_langserver/workspace.py
deleted file mode 100644
index 9d61225ee..000000000
--- a/python/vhdl_langserver/workspace.py
+++ /dev/null
@@ -1,499 +0,0 @@
-import logging
-import os
-import json
-from ctypes import byref
-import libghdl
-import libghdl.thin.errorout_memory as errorout_memory
-import libghdl.thin.flags
-import libghdl.thin.errorout as errorout
-import libghdl.thin.files_map as files_map
-import libghdl.thin.libraries as libraries
-import libghdl.thin.name_table as name_table
-import libghdl.thin.vhdl.nodes as nodes
-import libghdl.thin.vhdl.lists as lists
-import libghdl.thin.vhdl.std_package as std_package
-import libghdl.thin.vhdl.parse
-import libghdl.thin.vhdl.pyutils as pyutils
-import libghdl.thin.vhdl.sem_lib as sem_lib
-
-from . import lsp
-from . import document, symbols
-
-log = logging.getLogger(__name__)
-
-
-class ProjectError(Exception):
- "Exception raised in case of unrecoverable error in the project file."
-
- def __init__(self, msg):
- super().__init__()
- self.msg = msg
-
-
-class Workspace(object):
- def __init__(self, root_uri, server):
- self._root_uri = root_uri
- self._server = server
- self._root_path = lsp.path_from_uri(self._root_uri)
- self._docs = {} # uri -> doc
- self._fe_map = {} # fe -> doc
- self._prj = {}
- self._last_linted_doc = None
- errorout_memory.Install_Handler()
- libghdl.thin.flags.Flag_Elocations.value = True
- # thin.Flags.Verbose.value = True
- # We do analysis even in case of errors.
- libghdl.thin.vhdl.parse.Flag_Parse_Parenthesis.value = True
- # Force analysis to get more feedback + navigation even in case
- # of errors.
- libghdl.thin.flags.Flag_Force_Analysis.value = True
- # Do not consider analysis order issues.
- libghdl.thin.flags.Flag_Elaborate_With_Outdated.value = True
- libghdl.thin.errorout.Enable_Warning(errorout.Msgid.Warnid_Unused, True)
- self.read_project()
- self.set_options_from_project()
- libghdl.analyze_init()
- self._diags_set = set() # Documents with at least one diagnostic.
- self.read_files_from_project()
- self.gather_diagnostics(None)
-
- @property
- def documents(self):
- return self._docs
-
- @property
- def root_path(self):
- return self._root_path
-
- @property
- def root_uri(self):
- return self._root_uri
-
- def _create_document(self, doc_uri, sfe, version=None):
- """Create a document and put it in this workspace."""
- doc = document.Document(doc_uri, sfe, version)
- self._docs[doc_uri] = doc
- self._fe_map[sfe] = doc
- return doc
-
- def create_document_from_sfe(self, sfe, abspath):
- # A filename has been given without a corresponding document.
- # Create the document.
- # Common case: an error message was reported in a non-open document.
- # Create a document so that it could be reported to the client.
- doc_uri = lsp.path_to_uri(os.path.normpath(abspath))
- return self._create_document(doc_uri, sfe)
-
- def create_document_from_uri(self, doc_uri, source=None, version=None):
- # A document is referenced by an uri but not known. Load it.
- # We assume the path is correct.
- path = lsp.path_from_uri(doc_uri)
- if source is None:
- source = open(path).read()
- sfe = document.Document.load(
- source, os.path.dirname(path), os.path.basename(path)
- )
- return self._create_document(doc_uri, sfe)
-
- def get_or_create_document(self, doc_uri):
- res = self.get_document(doc_uri)
- if res is not None:
- return res
- res = self.create_document_from_uri(doc_uri)
- res.parse_document()
- return res
-
- def get_document(self, doc_uri):
- """Get a document from :param doc_uri: Note that the document may not exist,
- and this function may return None."""
- return self._docs.get(doc_uri)
-
- def put_document(self, doc_uri, source, version=None):
- doc = self.get_document(doc_uri)
- if doc is None:
- doc = self.create_document_from_uri(doc_uri, source=source, version=version)
- else:
- # The document may already be present (loaded from a project)
- # In that case, overwrite it as the client may have a more
- # recent version.
- doc.reload(source)
- return doc
-
- def sfe_to_document(self, sfe):
- """Get the document correspond to :param sfe: source file.
- Can create the document if needed."""
- assert sfe != 0
- doc = self._fe_map.get(sfe, None)
- if doc is None:
- # Could be a document from outside...
- filename = pyutils.name_image(files_map.Get_File_Name(sfe))
- if not os.path.isabs(filename):
- dirname = pyutils.name_image(files_map.Get_Directory_Name(sfe))
- filename = os.path.join(dirname, filename)
- doc = self.create_document_from_sfe(sfe, filename)
- return doc
-
- def add_vhdl_file(self, name):
- log.info("loading %s", name)
- if os.path.isabs(name):
- absname = name
- else:
- absname = os.path.join(self._root_path, name)
- # Create a document for this file.
- try:
- fd = open(absname)
- sfe = document.Document.load(fd.read(), self._root_path, name)
- fd.close()
- except OSError as err:
- self._server.show_message(
- lsp.MessageType.Error, "cannot load {}: {}".format(name, err.strerror)
- )
- return
- doc = self.create_document_from_sfe(sfe, absname)
- doc.parse_document()
-
- def read_project(self):
- prj_file = os.path.join(self.root_path, "hdl-prj.json")
- if not os.path.exists(prj_file):
- log.info("project file %s does not exist", prj_file)
- return
- try:
- f = open(prj_file)
- except OSError as err:
- self._server.show_message(
- lsp.MessageType.Error,
- "cannot open project file {}: {}".format(prj_file, err.strerror),
- )
- return
- log.info("reading project file %s", prj_file)
- try:
- self._prj = json.load(f)
- except json.decoder.JSONDecodeError as e:
- log.info("error in project file")
- self._server.show_message(
- lsp.MessageType.Error,
- "json error in project file {}:{}:{}".format(
- prj_file, e.lineno, e.colno
- ),
- )
- f.close()
-
- def set_options_from_project(self):
- try:
- if self._prj is None:
- return
- if not isinstance(self._prj, dict):
- raise ProjectError("project file is not a dictionnary")
- opts = self._prj.get("options", None)
- if opts is None:
- return
- if not isinstance(opts, dict):
- raise ProjectError("'options' is not a dictionnary")
- ghdl_opts = opts.get("ghdl_analysis", None)
- if ghdl_opts is None:
- return
- log.info("Using options: %s", ghdl_opts)
- for opt in ghdl_opts:
- if not libghdl.set_option(opt.encode("utf-8")):
- self._server.show_message(
- lsp.MessageType.Error, "error with option: {}".format(opt)
- )
- except ProjectError as e:
- self._server.show_message(
- lsp.MessageType.Error, "error in project file: {}".format(e.msg)
- )
-
- def read_files_from_project(self):
- try:
- files = self._prj.get("files", [])
- if not isinstance(files, list):
- raise ProjectError("'files' is not a list")
- for f in files:
- if not isinstance(f, dict):
- raise ProjectError("an element of 'files' is not a dict")
- name = f.get("file")
- if not isinstance(name, str):
- raise ProjectError("a 'file' is not a string")
- lang = f.get("language", "vhdl")
- if lang == "vhdl":
- self.add_vhdl_file(name)
- except ProjectError as e:
- self._server.show_message(
- lsp.MessageType.Error, "error in project file: {}".format(e.msg)
- )
-
- def get_configuration(self):
- self._server.configuration(
- [{"scopeUri": "", "section": "vhdl.maxNumberOfProblems"}]
- )
-
- def gather_diagnostics(self, doc):
- # Gather messages (per file)
- nbr_msgs = errorout_memory.Get_Nbr_Messages()
- diags = {}
- diag = {}
- for i in range(nbr_msgs):
- hdr = errorout_memory.Get_Error_Record(i + 1)
- msg = errorout_memory.Get_Error_Message(i + 1).decode("utf-8")
- if hdr.file == 0:
- # Possible for error limit reached.
- continue
- err_range = {
- "start": {"line": hdr.line - 1, "character": hdr.offset},
- "end": {"line": hdr.line - 1, "character": hdr.offset + hdr.length},
- }
- if hdr.group <= errorout_memory.Msg_Main:
- if hdr.id <= errorout.Msgid.Msgid_Note:
- severity = lsp.DiagnosticSeverity.Information
- elif hdr.id <= errorout.Msgid.Msgid_Warning:
- severity = lsp.DiagnosticSeverity.Warning
- else:
- severity = lsp.DiagnosticSeverity.Error
- diag = {
- "source": "ghdl",
- "range": err_range,
- "message": msg,
- "severity": severity,
- }
- if hdr.group == errorout_memory.Msg_Main:
- diag["relatedInformation"] = []
- fdiag = diags.get(hdr.file, None)
- if fdiag is None:
- diags[hdr.file] = [diag]
- else:
- fdiag.append(diag)
- else:
- assert diag
- if True:
- doc = self.sfe_to_document(hdr.file)
- diag["relatedInformation"].append(
- {
- "location": {"uri": doc.uri, "range": err_range},
- "message": msg,
- }
- )
- errorout_memory.Clear_Errors()
- # Publish diagnostics
- for sfe, diag in diags.items():
- doc = self.sfe_to_document(sfe)
- self.publish_diagnostics(doc.uri, diag)
- if doc is not None and doc._fe not in diags:
- # Clear previous diagnostics for the doc.
- self.publish_diagnostics(doc.uri, [])
-
- def obsolete_dependent_units(self, unit, antideps):
- """Obsolete units that depends of :param unit:"""
- udeps = antideps.get(unit, None)
- if udeps is None:
- # There are no units.
- return
- # Avoid infinite recursion
- antideps[unit] = None
- for un in udeps:
- log.debug(
- "obsolete %d %s", un, pyutils.name_image(nodes.Get_Identifier(un))
- )
- # Recurse
- self.obsolete_dependent_units(un, antideps)
- if nodes.Set_Date_State(un) == nodes.Date_State.Disk:
- # Already obsolete!
- continue
- # FIXME: just de-analyze ?
- nodes.Set_Date_State(un, nodes.Date_State.Disk)
- sem_lib.Free_Dependence_List(un)
- loc = nodes.Get_Location(un)
- fil = files_map.Location_To_File(loc)
- pos = files_map.Location_File_To_Pos(loc, fil)
- line = files_map.Location_File_To_Line(loc, fil)
- col = files_map.Location_File_Line_To_Offset(loc, fil, line)
- nodes.Set_Design_Unit_Source_Pos(un, pos)
- nodes.Set_Design_Unit_Source_Line(un, line)
- nodes.Set_Design_Unit_Source_Col(un, col)
-
- def obsolete_doc(self, doc):
- if doc._tree == nodes.Null_Iir:
- return
- # Free old tree
- assert nodes.Get_Kind(doc._tree) == nodes.Iir_Kind.Design_File
- if self._last_linted_doc == doc:
- antideps = None
- else:
- antideps = self.compute_anti_dependences()
- unit = nodes.Get_First_Design_Unit(doc._tree)
- while unit != nodes.Null_Iir:
- if antideps is not None:
- self.obsolete_dependent_units(unit, antideps)
- # FIXME: free unit; it is not referenced.
- unit = nodes.Get_Chain(unit)
- libraries.Purge_Design_File(doc._tree)
- doc._tree = nodes.Null_Iir
-
- def lint(self, doc_uri):
- doc = self.get_document(doc_uri)
- self.obsolete_doc(doc)
- doc.compute_diags()
- self.gather_diagnostics(doc)
-
- def apply_changes(self, doc_uri, contentChanges, new_version):
- doc = self.get_document(doc_uri)
- assert doc is not None, "try to modify a non-loaded document"
- self.obsolete_doc(doc)
- prev_sfe = doc._fe
- for change in contentChanges:
- doc.apply_change(change)
- if doc._fe != prev_sfe:
- del self._fe_map[prev_sfe]
- self._fe_map[doc._fe] = doc
- # Like lint
- doc.compute_diags()
- self.gather_diagnostics(doc)
-
- def check_document(self, doc_uri, source):
- self._docs[doc_uri].check_document(source)
-
- def rm_document(self, doc_uri):
- pass
-
- def apply_edit(self, edit):
- return self._server.request("workspace/applyEdit", {"edit": edit})
-
- def publish_diagnostics(self, doc_uri, diagnostics):
- self._server.notify(
- "textDocument/publishDiagnostics",
- params={"uri": doc_uri, "diagnostics": diagnostics},
- )
-
- def show_message(self, message, msg_type=lsp.MessageType.Info):
- self._server.notify(
- "window/showMessage", params={"type": msg_type, "message": message}
- )
-
- def declaration_to_location(self, decl):
- "Convert declaration :param decl: to an LSP Location"
- decl_loc = nodes.Get_Location(decl)
- if decl_loc == std_package.Std_Location.value:
- # There is no real file for the std.standard package.
- return None
- if decl_loc == libraries.Library_Location.value:
- # Libraries declaration are virtual.
- return None
- fe = files_map.Location_To_File(decl_loc)
- doc = self.sfe_to_document(fe)
- res = {"uri": doc.uri}
- nid = nodes.Get_Identifier(decl)
- res["range"] = {
- "start": symbols.location_to_position(fe, decl_loc),
- "end": symbols.location_to_position(
- fe, decl_loc + name_table.Get_Name_Length(nid)
- ),
- }
- return res
-
- def goto_definition(self, doc_uri, position):
- decl = self._docs[doc_uri].goto_definition(position)
- if decl is None:
- return None
- decl_loc = self.declaration_to_location(decl)
- if decl_loc is None:
- return None
- res = [decl_loc]
- if nodes.Get_Kind(decl) == nodes.Iir_Kind.Component_Declaration:
- ent = libraries.Find_Entity_For_Component(nodes.Get_Identifier(decl))
- if ent != nodes.Null_Iir:
- res.append(self.declaration_to_location(nodes.Get_Library_Unit(ent)))
- return res
-
- def x_show_all_files(self):
- res = []
- for fe in range(1, files_map.Get_Last_Source_File_Entry() + 1):
- doc = self._fe_map.get(fe, None)
- res.append(
- {
- "fe": fe,
- "uri": doc.uri if doc is not None else None,
- "name": pyutils.name_image(files_map.Get_File_Name(fe)),
- "dir": pyutils.name_image(files_map.Get_Directory_Name(fe)),
- }
- )
- return res
-
- def x_get_all_entities(self):
- res = []
- lib = libraries.Get_Libraries_Chain()
- while lib != nodes.Null_Iir:
- files = nodes.Get_Design_File_Chain(lib)
- ents = []
- while files != nodes.Null_Iir:
- units = nodes.Get_First_Design_Unit(files)
- while units != nodes.Null_Iir:
- unitlib = nodes.Get_Library_Unit(units)
- if nodes.Get_Kind(unitlib) == nodes.Iir_Kind.Entity_Declaration:
- ents.append(unitlib)
- units = nodes.Get_Chain(units)
- files = nodes.Get_Chain(files)
- ents = [pyutils.name_image(nodes.Get_Identifier(e)) for e in ents]
- lib_name = pyutils.name_image(nodes.Get_Identifier(lib))
- res.extend([{"name": n, "library": lib_name} for n in ents])
- lib = nodes.Get_Chain(lib)
- return res
-
- def x_get_entity_interface(self, library, name):
- def create_interfaces(inters):
- res = []
- while inters != nodes.Null_Iir:
- res.append(
- {
- "name": name_table.Get_Name_Ptr(
- nodes.Get_Identifier(inters)
- ).decode("latin-1")
- }
- )
- inters = nodes.Get_Chain(inters)
- return res
-
- # Find library
- lib_id = name_table.Get_Identifier(library.encode("utf-8"))
- lib = libraries.Get_Library_No_Create(lib_id)
- if lib == name_table.Null_Identifier:
- return None
- # Find entity
- ent_id = name_table.Get_Identifier(name.encode("utf-8"))
- unit = libraries.Find_Primary_Unit(lib, ent_id)
- if unit == nodes.Null_Iir:
- return None
- ent = nodes.Get_Library_Unit(unit)
- return {
- "library": library,
- "entity": name,
- "generics": create_interfaces(nodes.Get_Generic_Chain(ent)),
- "ports": create_interfaces(nodes.Get_Port_Chain(ent)),
- }
-
- def compute_anti_dependences(self):
- """Return a dictionnary of anti dependencies for design unit"""
- res = {}
- lib = libraries.Get_Libraries_Chain()
- while lib != nodes.Null_Iir:
- files = nodes.Get_Design_File_Chain(lib)
- while files != nodes.Null_Iir:
- units = nodes.Get_First_Design_Unit(files)
- while units != nodes.Null_Iir:
- if nodes.Get_Date_State(units) == nodes.Date_State.Analyze:
- # The unit has been analyzed, so the dependencies are know.
- deps = nodes.Get_Dependence_List(units)
- assert deps != nodes.Null_Iir_List
- deps_it = lists.Iterate(deps)
- while lists.Is_Valid(byref(deps_it)):
- el = lists.Get_Element(byref(deps_it))
- if nodes.Get_Kind(el) == nodes.Iir_Kind.Design_Unit:
- if res.get(el, None):
- res[el].append(units)
- else:
- res[el] = [units]
- else:
- assert False
- lists.Next(byref(deps_it))
- units = nodes.Get_Chain(units)
- files = nodes.Get_Chain(files)
- lib = nodes.Get_Chain(lib)
- return res