Skip to content

Ontology Handling

Ontology Base Class

A class that represents the ontological "backbone" of a KG.

The ontology can be built from a single resource, or hybridised from a combination of resources, with one resource being the "head" ontology, while an arbitrary number of other resources can become "tail" ontologies at arbitrary fusion points inside the "head" ontology.

Source code in biocypher/_ontology.py
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
class Ontology:
    """A class that represents the ontological "backbone" of a KG.

    The ontology can be built from a single resource, or hybridised from a
    combination of resources, with one resource being the "head" ontology, while
    an arbitrary number of other resources can become "tail" ontologies at
    arbitrary fusion points inside the "head" ontology.
    """

    def __init__(
        self,
        head_ontology: dict,
        ontology_mapping: Optional["OntologyMapping"] = None,
        tail_ontologies: dict | None = None,
    ):
        """Initialize the Ontology class.

        Stores the configuration and immediately builds the ontology graph.

        Args:
        ----
            head_ontology (dict): Configuration of the head ontology. The keys
                "url" and "root_node" are required; "format" and
                "switch_label_and_id" are optional.

            ontology_mapping (OntologyMapping): Mapping whose
                `extended_schema` provides user extensions and properties.
                Defaults to None.

            tail_ontologies (dict): Configurations of the tail ontologies,
                keyed by name. Each value requires "url", "tail_join_node" and
                "head_join_node"; "format", "merge_nodes" and
                "switch_label_and_id" are optional. Defaults to None.

        """
        self._head_ontology_meta = head_ontology
        self.mapping = ontology_mapping
        self._tail_ontology_meta = tail_ontologies

        self._tail_ontologies = None
        self._nx_graph = None

        # keep track of nodes that have been extended
        self._extended_nodes = set()

        self._main()

    def _main(self) -> None:
        """Instantiate the ontology.

        Loads the ontologies, joins them, and stores the hybrid ontology in
        `self._nx_graph`. Loads only the head ontology if nothing else is
        given. Adds user extensions and properties from the mapping.
        """
        self._load_ontologies()

        if self._tail_ontologies:
            for adapter in self._tail_ontologies.values():
                head_join_node = self._get_head_join_node(adapter)
                self._join_ontologies(adapter, head_join_node)
        else:
            # work on a copy, as every other code path does, so that user
            # extensions do not modify the graph held by the head adapter
            self._nx_graph = self._head_ontology.get_nx_graph().copy()

        if self.mapping:
            self._extend_ontology()

            # experimental: add connections of disjoint classes to entity
            # self._connect_biolink_classes()

            self._add_properties()

    def _load_ontologies(self) -> None:
        """For each ontology, load the OntologyAdapter object.

        Store it as an instance variable (head) or in an instance dictionary
        (tail).
        """
        logger.info("Loading ontologies...")

        self._head_ontology = OntologyAdapter(
            ontology_file=self._head_ontology_meta["url"],
            root_label=self._head_ontology_meta["root_node"],
            ontology_file_format=self._head_ontology_meta.get("format", None),
            switch_label_and_id=self._head_ontology_meta.get("switch_label_and_id", True),
        )

        if self._tail_ontology_meta:
            self._tail_ontologies = {}
            for key, value in self._tail_ontology_meta.items():
                self._tail_ontologies[key] = OntologyAdapter(
                    ontology_file=value["url"],
                    root_label=value["tail_join_node"],
                    head_join_node_label=value["head_join_node"],
                    ontology_file_format=value.get("format", None),
                    merge_nodes=value.get("merge_nodes", True),
                    switch_label_and_id=value.get("switch_label_and_id", True),
                )

    def _get_head_join_node(self, adapter: OntologyAdapter) -> str:
        """Try to find the head join node of the given ontology adapter.

        Find the node in the head ontology that is the head join node. If the
        join node is not found, the method will raise an error.

        Args:
        ----
            adapter (OntologyAdapter): The ontology adapter of which to find the
                join node in the head ontology.

        Returns:
        -------
            str: The head join node in the head ontology.

        Raises:
        ------
            ValueError: If the head join node is not found in the head ontology.

        """
        head_join_node = None
        user_defined_head_join_node_label = adapter.get_head_join_node()
        head_join_node_label_in_bc_format = to_lower_sentence_case(user_defined_head_join_node_label.replace("_", " "))

        if self._head_ontology._switch_label_and_id:
            # node ids are the labels, so the formatted label is the node id
            head_join_node = head_join_node_label_in_bc_format
        else:
            # node ids differ from labels: search for the node by its label
            for node_id, data in self._head_ontology.get_nx_graph().nodes(data=True):
                if "label" in data and data["label"] == head_join_node_label_in_bc_format:
                    head_join_node = node_id
                    break

        if head_join_node not in self._head_ontology.get_nx_graph().nodes:
            head_ontology = self._head_ontology._rdf_to_nx(
                self._head_ontology.get_rdf_graph(),
                self._head_ontology._root_label,
                self._head_ontology._switch_label_and_id,
                rename_nodes=False,
            )
            # when the label search found nothing, report the requested label
            # rather than the uninformative 'None'
            missing = head_join_node if head_join_node is not None else head_join_node_label_in_bc_format
            msg = (
                f"Head join node '{missing}' not found in head ontology. "
                f"The head ontology contains the following nodes: {head_ontology.nodes}."
            )
            logger.error(msg)
            raise ValueError(msg)
        return head_join_node

    def _join_ontologies(self, adapter: OntologyAdapter, head_join_node) -> None:
        """Join the present ontologies.

        Join two ontologies by adding the tail ontology as a subgraph to the
        head ontology at the specified join nodes.

        Args:
        ----
            adapter (OntologyAdapter): The ontology adapter of the tail ontology
                to be added to the head ontology.

            head_join_node: The node of the head ontology at which the tail
                ontology is attached.

        """
        if not self._nx_graph:
            self._nx_graph = self._head_ontology.get_nx_graph().copy()

        tail_join_node = adapter.get_root_node()
        tail_ontology = adapter.get_nx_graph()

        # subtree of tail ontology at join node
        tail_ontology_subtree = nx.dfs_tree(tail_ontology.reverse(), tail_join_node).reverse()

        # transfer node attributes from tail ontology to subtree
        for node in tail_ontology_subtree.nodes:
            tail_ontology_subtree.nodes[node].update(tail_ontology.nodes[node])

        # if merge_nodes is False, create parent of tail join node from head
        # join node
        if not adapter._merge_nodes:
            # add head join node from head ontology to tail ontology subtree
            # as parent of tail join node
            tail_ontology_subtree.add_node(
                head_join_node,
                **self._head_ontology.get_nx_graph().nodes[head_join_node],
            )
            tail_ontology_subtree.add_edge(tail_join_node, head_join_node)

        # else rename tail join node to match head join node if necessary
        elif tail_join_node != head_join_node:
            tail_ontology_subtree = nx.relabel_nodes(tail_ontology_subtree, {tail_join_node: head_join_node})

        # combine head ontology and tail subtree
        self._nx_graph = nx.compose(self._nx_graph, tail_ontology_subtree)

    def _extend_ontology(self) -> None:
        """Add the user extensions to the ontology.

        Tries to find the parent in the ontology, adds it if necessary, and adds
        the child and a directed edge from child to parent. When several
        parents are given, they are chained: each one becomes the child of the
        next.

        Raises
        ------
            ValueError: If a schema class without `is_a` is neither a synonym
                of an existing node nor present in the ontology.

        """
        if not self._nx_graph:
            self._nx_graph = self._head_ontology.get_nx_graph().copy()

        for key, value in self.mapping.extended_schema.items():
            # If this class is either a root or a synonym.
            if not value.get("is_a"):
                # If it is a synonym.
                if self._nx_graph.has_node(value.get("synonym_for")):
                    continue

                # If this class is in the schema, but not in the loaded vocabulary.
                if not self._nx_graph.has_node(key):
                    msg = (
                        f"Node {key} not found in ontology, but also has no inheritance definition. Please check your "
                        "schema for spelling errors, first letter not in lower case, use of underscores, a missing "
                        "`is_a` definition (SubClassOf a root node), or missing labels in class or super-classes."
                    )
                    logger.error(msg)
                    raise ValueError(msg)

                # It is a root and it is in the loaded vocabulary.
                continue

            # It is not a root. Iterate instead of popping so that the list
            # of parents is never consumed.
            child = key

            for parent in to_list(value.get("is_a")):
                if parent not in self._nx_graph.nodes:
                    self._nx_graph.add_node(parent)
                    self._nx_graph.nodes[parent]["label"] = sentencecase_to_pascalcase(parent)

                    # mark parent as user extension
                    self._nx_graph.nodes[parent]["user_extension"] = True
                    self._extended_nodes.add(parent)

                if child not in self._nx_graph.nodes:
                    self._nx_graph.add_node(child)
                    self._nx_graph.nodes[child]["label"] = sentencecase_to_pascalcase(child)

                    # mark child as user extension
                    self._nx_graph.nodes[child]["user_extension"] = True
                    self._extended_nodes.add(child)

                self._nx_graph.add_edge(child, parent)

                # the current parent is the child of the next listed parent
                child = parent

    def _connect_biolink_classes(self) -> None:
        """Experimental: Adds edges from disjoint classes to the entity node."""
        if not self._nx_graph:
            self._nx_graph = self._head_ontology.get_nx_graph().copy()

        if "entity" not in self._nx_graph.nodes:
            return

        # biolink classes that are disjoint from entity
        disjoint_classes = [
            "frequency qualifier mixin",
            "chemical entity to entity association mixin",
            "ontology class",
            "relationship quantifier",
            "physical essence or occurrent",
            "gene or gene product",
            "subject of investigation",
        ]

        for node in disjoint_classes:
            # a node that is missing or carries no attributes gets a label
            if not self._nx_graph.nodes.get(node):
                self._nx_graph.add_node(node)
                self._nx_graph.nodes[node]["label"] = sentencecase_to_pascalcase(node)

            self._nx_graph.add_edge(node, "entity")

    def _add_properties(self) -> None:
        """Add properties to the ontology.

        For each entity in the mapping, update the ontology with the properties
        specified in the mapping. Updates synonym information in the graph,
        setting the synonym as the primary node label.

        Raises
        ------
            ValueError: If the target of a `synonym_for` entry is not a node of
                the ontology.

        """
        for key, value in self.mapping.extended_schema.items():
            if key in self._nx_graph.nodes:
                self._nx_graph.nodes[key].update(value)

            if value.get("synonym_for"):
                # change node label to synonym
                if value["synonym_for"] not in self._nx_graph.nodes:
                    msg = f"Node {value['synonym_for']} not found in ontology."
                    logger.error(msg)
                    raise ValueError(msg)

                self._nx_graph = nx.relabel_nodes(self._nx_graph, {value["synonym_for"]: key})

    def get_ancestors(self, node_label: str) -> "nx.DiGraph":
        """Get the ancestors of a node in the ontology.

        Edges in the ontology graph point from child to parent, so a
        depth-first search starting at the node walks towards its ancestors.

        Args:
        ----
            node_label (str): The label of the node in the ontology.

        Returns:
        -------
            nx.DiGraph: The depth-first search tree rooted at the node. Its
                nodes are the node itself and its ancestors.

        """
        return nx.dfs_tree(self._nx_graph, node_label)

    def show_ontology_structure(self, to_disk: str | None = None, full: bool = False):
        """Show the ontology structure as a tree or write to GRAPHML file.

        Args:
        ----
            to_disk (str): If specified, the ontology structure will be saved
                to disk as a GRAPHML file at the location (directory) specified
                by the `to_disk` string, to be opened in your favourite graph
                visualisation tool.

            full (bool): If True, the full ontology structure will be shown,
                including all nodes and edges. If False, only the nodes and
                edges that are relevant to the extended schema will be shown.

        Returns:
        -------
            The tree built by `create_tree_visualisation` if `to_disk` is not
            given, otherwise True once the GRAPHML file has been written.

        Raises:
        ------
            ValueError: If a partial view is requested without a schema
                configuration, or if no ontology graph is loaded.

        """
        # the mapping is optional in the constructor, so tolerate its absence
        schema = self.mapping.extended_schema if self.mapping else None

        if not full and not schema:
            msg = (
                "You are attempting to visualise a subset of the loaded "
                "ontology, but have not provided a schema configuration. "
                "To display a partial ontology graph, please provide a schema "
                "configuration file; to visualise the full graph, please use "
                "the parameter `full = True`."
            )
            logger.error(msg)
            raise ValueError(msg)

        if not self._nx_graph:
            msg = "Ontology not loaded."
            logger.error(msg)
            raise ValueError(msg)

        if not self._tail_ontologies:
            msg = f"Showing ontology structure based on {self._head_ontology._ontology_file}"

        else:
            msg = f"Showing ontology structure based on {len(self._tail_ontology_meta) + 1} ontologies: "

        logger.info(msg)

        if not full:
            # set of leaves and their intermediate parents up to the root
            filter_nodes = set(schema)

            for node in schema:
                filter_nodes.update(self.get_ancestors(node).nodes)

            # filter graph
            G = self._nx_graph.subgraph(filter_nodes)

        else:
            G = self._nx_graph

        if not to_disk:
            # create tree
            tree = create_tree_visualisation(G)

            # add synonym information
            for node, spec in (schema or {}).items():
                if not isinstance(spec, dict):
                    continue
                synonym = spec.get("synonym_for")
                if synonym:
                    tree.nodes[node].tag = f"{node} = {synonym}"

            logger.info(f"\n{tree}")

            return tree

        else:
            # rename every node to its label (falling back to the node id) in
            # a single pass; relabelling returns a copy, so the ontology graph
            # itself is left untouched
            relabelling = {node: G.nodes[node].get("label") or node for node in G.nodes}
            G = nx.relabel_nodes(G, relabelling)

            for node, label in relabelling.items():
                attributes = G.nodes[label]

                # use former id as label
                attributes["label"] = node

                # convert lists/dicts to strings for vis only
                for attrib, attrib_value in attributes.items():
                    if type(attrib_value) in [list, dict]:
                        attributes[attrib] = str(attrib_value)

            path = os.path.join(to_disk, "ontology_structure.graphml")

            logger.info(f"Writing ontology structure to {path}.")

            nx.write_graphml(G, path)

            return True

    def get_dict(self) -> dict:
        """Return a dictionary representation of the ontology.

        The dictionary is compatible with a BioCypher node for compatibility
        with the Neo4j driver.

        Returns
        -------
            dict: A dictionary with the keys "node_id" (the current version
                ID), "node_label" and "properties".

        """
        # NOTE(review): the "schema" property is a literal placeholder string,
        # not the schema of the mapping - confirm that this is intended.
        d = {
            "node_id": self._get_current_id(),
            "node_label": "BioCypher",
            "properties": {
                "schema": "self.ontology_mapping.extended_schema",
            },
        }

        return d

    def _get_current_id(self):
        """Instantiate a version ID for the current session.

        For now does simple versioning using datetime.

        Can later implement incremental versioning, versioning from
        config file, or manual specification via argument.

        Returns
        -------
            str: The current local time formatted as "vYYYYMMDD-HHMMSS".

        """
        now = datetime.now()
        return now.strftime("v%Y%m%d-%H%M%S")

    def get_rdf_graph(self):
        """Return the merged RDF graph.

        Return the merged graph of all loaded ontologies (head and tails).
        """
        graph = self._head_ontology.get_rdf_graph()
        if self._tail_ontologies:
            for onto in self._tail_ontologies.values():
                assert isinstance(onto, OntologyAdapter)
                # RDFlib uses the + operator for merging.
                # NOTE(review): `+=` presumably merges in place into the graph
                # returned by the head adapter - confirm this is acceptable.
                graph += onto.get_rdf_graph()
        return graph

__init__(head_ontology, ontology_mapping=None, tail_ontologies=None)

Initialize the Ontology class.


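head_ontology (dict): Configuration of the head ontology (keys "url" and
    "root_node"; optionally "format" and "switch_label_and_id").

ontology_mapping (OntologyMapping): Mapping whose extended schema provides
    user extensions and properties. Defaults to None.

tail_ontologies (dict): Configurations of the tail ontologies that will be
    added to the head ontology, keyed by name. Defaults to None.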
Source code in biocypher/_ontology.py
def __init__(
    self,
    head_ontology: dict,
    ontology_mapping: Optional["OntologyMapping"] = None,
    tail_ontologies: dict | None = None,
):
    """Store the ontology configuration and build the ontology.

    Args:
    ----
        head_ontology (dict): Configuration of the head ontology.

        ontology_mapping (OntologyMapping): Mapping used for user extensions
            and properties. Defaults to None.

        tail_ontologies (dict): Configurations of the tail ontologies that
            will be added to the head ontology. Defaults to None.

    """
    # nodes added as user extensions are recorded in this set
    self._extended_nodes = set()

    # filled in while the ontology is being built
    self._nx_graph = None
    self._tail_ontologies = None

    # configuration exactly as passed in by the caller
    self._tail_ontology_meta = tail_ontologies
    self.mapping = ontology_mapping
    self._head_ontology_meta = head_ontology

    # build the ontology only after all state is in place
    self._main()

_add_properties()

Add properties to the ontology.

For each entity in the mapping, update the ontology with the properties specified in the mapping. Updates synonym information in the graph, setting the synonym as the primary node label.

Source code in biocypher/_ontology.py
def _add_properties(self) -> None:
    """Copy the schema properties onto the ontology nodes.

    Every schema entry that is a node of the graph has its attributes updated
    with the entry. An entry with a `synonym_for` value additionally renames
    the referenced node to the name of the entry.

    Raises
    ------
        ValueError: If the node referenced by `synonym_for` is not part of the
            ontology.

    """
    for name, spec in self.mapping.extended_schema.items():
        if name in self._nx_graph.nodes:
            self._nx_graph.nodes[name].update(spec)

        target = spec.get("synonym_for")
        if not target:
            continue

        if target not in self._nx_graph.nodes:
            msg = f"Node {target} not found in ontology."
            logger.error(msg)
            raise ValueError(msg)

        # the synonym replaces the original node name
        self._nx_graph = nx.relabel_nodes(self._nx_graph, {target: name})

_connect_biolink_classes()

Experimental: Adds edges from disjoint classes to the entity node.

Source code in biocypher/_ontology.py
def _connect_biolink_classes(self) -> None:
    """Experimental: link the disjoint biolink classes to the entity node.

    Does nothing beyond initialising the graph when the graph has no "entity"
    node.
    """
    if not self._nx_graph:
        self._nx_graph = self._head_ontology.get_nx_graph().copy()

    graph = self._nx_graph

    if "entity" in graph.nodes:
        # biolink classes that are disjoint from entity
        for name in (
            "frequency qualifier mixin",
            "chemical entity to entity association mixin",
            "ontology class",
            "relationship quantifier",
            "physical essence or occurrent",
            "gene or gene product",
            "subject of investigation",
        ):
            # a node that is missing or has no attributes gets a label
            if not graph.nodes.get(name):
                graph.add_node(name)
                graph.nodes[name]["label"] = sentencecase_to_pascalcase(name)

            graph.add_edge(name, "entity")

_extend_ontology()

Add the user extensions to the ontology.

Tries to find the parent in the ontology, adds it if necessary, and adds the child and a directed edge from child to parent. Can handle multiple parents.

Source code in biocypher/_ontology.py
def _extend_ontology(self) -> None:
    """Add the user extensions to the ontology.

    Tries to find the parent in the ontology, adds it if necessary, and adds
    the child and a directed edge from child to parent. When several parents
    are given, they are chained: each one becomes the child of the next.

    Raises
    ------
        ValueError: If a schema class without `is_a` is neither a synonym of
            an existing node nor present in the ontology.

    """
    if not self._nx_graph:
        self._nx_graph = self._head_ontology.get_nx_graph().copy()

    for key, value in self.mapping.extended_schema.items():
        # If this class is either a root or a synonym.
        if not value.get("is_a"):
            # If it is a synonym.
            if self._nx_graph.has_node(value.get("synonym_for")):
                continue

            # If this class is in the schema, but not in the loaded vocabulary.
            if not self._nx_graph.has_node(key):
                msg = (
                    f"Node {key} not found in ontology, but also has no inheritance definition. Please check your "
                    "schema for spelling errors, first letter not in lower case, use of underscores, a missing "
                    "`is_a` definition (SubClassOf a root node), or missing labels in class or super-classes."
                )
                logger.error(msg)
                raise ValueError(msg)

            # It is a root and it is in the loaded vocabulary.
            continue

        # It is not a root. Iterate instead of popping so that the list of
        # parents is never consumed.
        child = key

        for parent in to_list(value.get("is_a")):
            if parent not in self._nx_graph.nodes:
                self._nx_graph.add_node(parent)
                self._nx_graph.nodes[parent]["label"] = sentencecase_to_pascalcase(parent)

                # mark parent as user extension
                self._nx_graph.nodes[parent]["user_extension"] = True
                self._extended_nodes.add(parent)

            if child not in self._nx_graph.nodes:
                self._nx_graph.add_node(child)
                self._nx_graph.nodes[child]["label"] = sentencecase_to_pascalcase(child)

                # mark child as user extension
                self._nx_graph.nodes[child]["user_extension"] = True
                self._extended_nodes.add(child)

            self._nx_graph.add_edge(child, parent)

            # the current parent is the child of the next listed parent
            child = parent

_get_current_id()

Instantiate a version ID for the current session.

For now does simple versioning using datetime.

Can later implement incremental versioning, versioning from config file, or manual specification via argument.

Source code in biocypher/_ontology.py
def _get_current_id(self):
    """Create a version ID for the current session.

    The ID is the current local time formatted as "vYYYYMMDD-HHMMSS".

    Incremental versioning, versioning from a config file, or manual
    specification via argument can be implemented later.
    """
    return datetime.now().strftime("v%Y%m%d-%H%M%S")

_get_head_join_node(adapter)

Try to find the head join node of the given ontology adapter.

Find the node in the head ontology that is the head join node. If the join node is not found, the method will raise an error.


adapter (OntologyAdapter): The ontology adapter of which to find the
    join node in the head ontology.

str: The head join node in the head ontology.

ValueError: If the head join node is not found in the head ontology.
Source code in biocypher/_ontology.py
def _get_head_join_node(self, adapter: OntologyAdapter) -> str:
    """Try to find the head join node of the given ontology adapter.

    Find the node in the head ontology that is the head join node. If the
    join node is not found, the method will raise an error.

    Args:
    ----
        adapter (OntologyAdapter): The ontology adapter of which to find the
            join node in the head ontology.

    Returns:
    -------
        str: The head join node in the head ontology.

    Raises:
    ------
        ValueError: If the head join node is not found in the head ontology.

    """
    head_join_node = None
    user_defined_head_join_node_label = adapter.get_head_join_node()
    head_join_node_label_in_bc_format = to_lower_sentence_case(user_defined_head_join_node_label.replace("_", " "))

    if self._head_ontology._switch_label_and_id:
        # node ids are the labels, so the formatted label is the node id
        head_join_node = head_join_node_label_in_bc_format
    else:
        # node ids differ from labels: search for the node by its label
        for node_id, data in self._head_ontology.get_nx_graph().nodes(data=True):
            if "label" in data and data["label"] == head_join_node_label_in_bc_format:
                head_join_node = node_id
                break

    if head_join_node not in self._head_ontology.get_nx_graph().nodes:
        head_ontology = self._head_ontology._rdf_to_nx(
            self._head_ontology.get_rdf_graph(),
            self._head_ontology._root_label,
            self._head_ontology._switch_label_and_id,
            rename_nodes=False,
        )
        # when the label search found nothing, report the requested label
        # rather than the uninformative 'None'
        missing = head_join_node if head_join_node is not None else head_join_node_label_in_bc_format
        msg = (
            f"Head join node '{missing}' not found in head ontology. "
            f"The head ontology contains the following nodes: {head_ontology.nodes}."
        )
        logger.error(msg)
        raise ValueError(msg)
    return head_join_node

_join_ontologies(adapter, head_join_node)

Join the present ontologies.

Join two ontologies by adding the tail ontology as a subgraph to the head ontology at the specified join nodes.


adapter (OntologyAdapter): The ontology adapter of the tail ontology
    to be added to the head ontology.
Source code in biocypher/_ontology.py
def _join_ontologies(self, adapter: OntologyAdapter, head_join_node) -> None:
    """Attach a tail ontology to the head ontology.

    The part of the tail ontology below its root node is added to the hybrid
    graph at the given node of the head ontology.

    Args:
    ----
        adapter (OntologyAdapter): The ontology adapter of the tail ontology
            to be added to the head ontology.

        head_join_node: The node of the head ontology at which the tail
            ontology is attached.

    """
    if not self._nx_graph:
        self._nx_graph = self._head_ontology.get_nx_graph().copy()

    tail_root = adapter.get_root_node()
    full_tail = adapter.get_nx_graph()

    # search the reversed graph from the tail root, then restore the
    # original edge direction
    subtree = nx.dfs_tree(full_tail.reverse(), tail_root).reverse()

    # the search tree carries no attributes; copy them from the tail ontology
    for name, attributes in subtree.nodes(data=True):
        attributes.update(full_tail.nodes[name])

    if adapter._merge_nodes:
        # merging: the tail root takes the name of the head join node
        if tail_root != head_join_node:
            subtree = nx.relabel_nodes(subtree, {tail_root: head_join_node})
    else:
        # not merging: the head join node becomes the parent of the tail root
        head_attributes = self._head_ontology.get_nx_graph().nodes[head_join_node]
        subtree.add_node(head_join_node, **head_attributes)
        subtree.add_edge(tail_root, head_join_node)

    # combine head ontology and tail subtree
    self._nx_graph = nx.compose(self._nx_graph, subtree)

_load_ontologies()

For each ontology, load the OntologyAdapter object.

Store it as an instance variable (head) or in an instance dictionary (tail).

Source code in biocypher/_ontology.py
def _load_ontologies(self) -> None:
    """Create the OntologyAdapter objects for all configured ontologies.

    The head adapter is stored in `self._head_ontology`. If tail ontologies
    are configured, their adapters are stored in the `self._tail_ontologies`
    dictionary under the keys of the configuration.
    """
    logger.info("Loading ontologies...")

    head_meta = self._head_ontology_meta
    self._head_ontology = OntologyAdapter(
        ontology_file=head_meta["url"],
        root_label=head_meta["root_node"],
        ontology_file_format=head_meta.get("format", None),
        switch_label_and_id=head_meta.get("switch_label_and_id", True),
    )

    # without tail configuration the tail dictionary is left untouched
    if not self._tail_ontology_meta:
        return

    self._tail_ontologies = {}
    for name, tail_meta in self._tail_ontology_meta.items():
        tail_adapter = OntologyAdapter(
            ontology_file=tail_meta["url"],
            root_label=tail_meta["tail_join_node"],
            head_join_node_label=tail_meta["head_join_node"],
            ontology_file_format=tail_meta.get("format", None),
            merge_nodes=tail_meta.get("merge_nodes", True),
            switch_label_and_id=tail_meta.get("switch_label_and_id", True),
        )
        self._tail_ontologies[name] = tail_adapter

_main()

Instantiate the ontology.

Loads the ontologies, joins them, and returns the hybrid ontology. Loads only the head ontology if nothing else is given. Adds user extensions and properties from the mapping.

Source code in biocypher/_ontology.py
def _main(self) -> None:
    """Instantiate the ontology.

    Loads the ontologies, joins them, and stores the hybrid ontology in
    `self._nx_graph`. Loads only the head ontology if nothing else is given.
    Adds user extensions and properties from the mapping.
    """
    self._load_ontologies()

    if self._tail_ontologies:
        for adapter in self._tail_ontologies.values():
            head_join_node = self._get_head_join_node(adapter)
            self._join_ontologies(adapter, head_join_node)
    else:
        # work on a copy, as every other code path does, so that user
        # extensions do not modify the graph held by the head adapter
        self._nx_graph = self._head_ontology.get_nx_graph().copy()

    if self.mapping:
        self._extend_ontology()

        # experimental: add connections of disjoint classes to entity
        # self._connect_biolink_classes()

        self._add_properties()

get_ancestors(node_label)

Get the ancestors of a node in the ontology.


node_label (str): The label of the node in the ontology.

networkx.DiGraph: The depth-first search tree rooted at the node; its nodes are the node itself and its ancestors.
Source code in biocypher/_ontology.py
def get_ancestors(self, node_label: str) -> "nx.DiGraph":
    """Get the ancestors of a node in the ontology.

    Runs a depth-first search on the ontology graph, starting at the given
    node.

    Args:
    ----
        node_label (str): The label of the node in the ontology.

    Returns:
    -------
        nx.DiGraph: The depth-first search tree rooted at the node. The tree
            contains the node itself; use its `nodes` to obtain the labels.

    """
    return nx.dfs_tree(self._nx_graph, node_label)

get_dict()

Return a dictionary representation of the ontology.

The dictionary is compatible with a BioCypher node for compatibility with the Neo4j driver.

Source code in biocypher/_ontology.py
def get_dict(self) -> dict:
    """Return a dictionary representation of the ontology.

    The dictionary has the keys of a BioCypher node (`node_id`, `node_label`
    and `properties`) for compatibility with the Neo4j driver.
    """
    # NOTE(review): the "schema" property is a literal string, not the schema
    # object it names - looks like a placeholder; confirm before relying on it.
    return {
        "node_id": self._get_current_id(),
        "node_label": "BioCypher",
        "properties": {"schema": "self.ontology_mapping.extended_schema"},
    }

get_rdf_graph()

Return the merged RDF graph.

Return the merged graph of all loaded ontologies (head and tails).

Source code in biocypher/_ontology.py
def get_rdf_graph(self):
    """Return the merged RDF graph.

    Return the merged graph of all loaded ontologies (head and tails). If no
    tail ontologies are loaded, the RDF graph of the head ontology is
    returned as is. Otherwise a new union graph is built, so that the head
    adapter's own RDF graph is not extended with the tail triples.
    """
    graph = self._head_ontology.get_rdf_graph()
    if not self._tail_ontologies:
        return graph

    for onto in self._tail_ontologies.values():
        assert isinstance(onto, OntologyAdapter)
        # RDFlib uses the + operator for merging; unlike `+=`, it returns a
        # new graph instead of adding the triples to the left operand.
        graph = graph + onto.get_rdf_graph()
    return graph

show_ontology_structure(to_disk=None, full=False)

Show the ontology structure using treelib or write to GRAPHML file.


to_disk (str): If specified, the ontology structure will be saved
    to disk as a GRAPHML file at the location (directory) specified
    by the `to_disk` string, to be opened in your favourite graph
    visualisation tool.

full (bool): If True, the full ontology structure will be shown,
    including all nodes and edges. If False, only the nodes and
    edges that are relevant to the extended schema will be shown.
Source code in biocypher/_ontology.py
def show_ontology_structure(self, to_disk: str | None = None, full: bool = False):
    """Show the ontology structure using treelib or write to GRAPHML file.

    Args:
    ----
        to_disk (str): If specified, the ontology structure will be saved
            to disk as a GRAPHML file at the location (directory) specified
            by the `to_disk` string, to be opened in your favourite graph
            visualisation tool.

        full (bool): If True, the full ontology structure will be shown,
            including all nodes and edges. If False, only the nodes and
            edges that are relevant to the extended schema will be shown.

    Returns:
    -------
        The tree created by `create_tree_visualisation` if `to_disk` is not
        given; `True` once the GRAPHML file has been written otherwise.

    Raises:
    ------
        ValueError: If a partial view is requested without an extended
            schema, or if no ontology graph is loaded.

    """
    # The mapping may be unset (`_main` guards on its truthiness as well), so
    # read the schema defensively instead of failing with an AttributeError.
    schema = getattr(self.mapping, "extended_schema", None)

    if not full and not schema:
        # a plain string: a trailing comma here would turn it into a tuple
        msg = (
            "You are attempting to visualise a subset of the loaded "
            "ontology, but have not provided a schema configuration. "
            "To display a partial ontology graph, please provide a schema "
            "configuration file; to visualise the full graph, please use "
            "the parameter `full = True`."
        )
        logger.error(msg)
        raise ValueError(msg)

    if not self._nx_graph:
        msg = "Ontology not loaded."
        logger.error(msg)
        raise ValueError(msg)

    if not self._tail_ontologies:
        msg = f"Showing ontology structure based on {self._head_ontology._ontology_file}"

    else:
        msg = f"Showing ontology structure based on {len(self._tail_ontology_meta) + 1} ontologies: "

    logger.info(msg)

    if not full:
        # set of leaves and their intermediate parents up to the root
        filter_nodes = set(schema)

        for node in schema:
            filter_nodes.update(self.get_ancestors(node).nodes)

        # filter graph
        G = self._nx_graph.subgraph(filter_nodes)

    else:
        G = self._nx_graph

    if not to_disk:
        # create tree
        tree = create_tree_visualisation(G)

        # add synonym information
        for node in schema or ():
            entry = schema[node]
            if not isinstance(entry, dict):
                continue
            if entry.get("synonym_for"):
                tree.nodes[node].tag = f"{node} = {entry.get('synonym_for')}"

        logger.info(f"\n{tree}")

        return tree

    else:
        # convert lists/dicts to strings for vis only
        # The loop walks the node view of the graph it started with, while
        # `G` is rebound to a relabelled copy in every iteration.
        for node in G.nodes:
            # rename node and use former id as label
            label = G.nodes[node].get("label")

            if not label:
                label = node

            G = nx.relabel_nodes(G, {node: label})
            G.nodes[label]["label"] = node

            for attrib in G.nodes[label]:
                if isinstance(G.nodes[label][attrib], (list, dict)):
                    G.nodes[label][attrib] = str(G.nodes[label][attrib])

        path = os.path.join(to_disk, "ontology_structure.graphml")

        logger.info(f"Writing ontology structure to {path}.")

        nx.write_graphml(G, path)

        return True

Ontology Adapter

Class that represents an ontology to be used in the Biocypher framework.

Can read ontologies serialised as OWL or RDF/XML (both parsed as "application/rdf+xml") and Turtle; OBO is recognised but not yet supported. The ontology is represented by a networkx.DiGraph object; an RDFlib graph is also kept. By default, the DiGraph reverses the label and identifier of the nodes, such that the node name in the graph is the human-readable label. The edges are oriented from child to parent. Labels are formatted in lower sentence case and underscores are replaced by spaces. Identifiers are taken as defined and the prefixes are removed by default.

Source code in biocypher/_ontology.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
class OntologyAdapter:
    """Class that represents an ontology to be used in the Biocypher framework.

    Can read ontologies serialised as OWL or RDF/XML (both parsed as
    "application/rdf+xml") and Turtle; OBO is recognised but not yet
    supported. The ontology is represented by a networkx.DiGraph object; an
    RDFlib graph is also kept. By default, the DiGraph reverses the label and
    identifier of the nodes, such that the node name in the graph is the
    human-readable label. The edges are oriented from child to parent. Labels
    are formatted in lower sentence case and underscores are replaced by
    spaces. Identifiers are taken as defined and the prefixes are removed by
    default.
    """

    def __init__(
        self,
        ontology_file: str,
        root_label: str,
        ontology_file_format: str | None = None,
        head_join_node_label: str | None = None,
        merge_nodes: bool | None = True,
        switch_label_and_id: bool = True,
        remove_prefixes: bool = True,
    ):
        """Initialize the OntologyAdapter class.

        Args:
        ----
            ontology_file (str): Path to the ontology file. Can be local or
                remote.

            root_label (str): The label of the root node in the ontology. In
                case of a tail ontology, this is the tail join node.

            ontology_file_format (str): The format of the ontology file. One
                of "owl", "rdf", "ttl" or "application/rdf+xml". If format is
                not passed, it is determined from the file extension.

            head_join_node_label (str): Optional variable to store the label of the
                node in the head ontology that should be used to join to the
                root node of the tail ontology. Defaults to None.

            merge_nodes (bool): If True, head and tail join nodes will be
                merged, using the label of the head join node. If False, the
                tail join node will be attached as a child of the head join
                node.

            switch_label_and_id (bool): If True, the node names in the graph will be
                the human-readable labels. If False, the node names will be the
                identifiers. Defaults to True.

            remove_prefixes (bool): If True, the prefixes of the identifiers will
                be removed. Defaults to True.

        """
        logger.info(f"Instantiating OntologyAdapter class for {ontology_file}.")

        self._ontology_file = ontology_file
        self._root_label = root_label
        self._format = ontology_file_format
        self._merge_nodes = merge_nodes
        self._head_join_node = head_join_node_label
        self._switch_label_and_id = switch_label_and_id
        self._remove_prefixes = remove_prefixes

        # The RDF graph must exist before the conversion: several helpers
        # read labels and list nodes from `self._rdf_graph`.
        self._rdf_graph = self._load_rdf_graph(ontology_file)

        self._nx_graph = self._rdf_to_nx(self._rdf_graph, root_label, switch_label_and_id)

    def _rdf_to_nx(
        self,
        _rdf_graph: rdflib.Graph,
        root_label: str,
        switch_label_and_id: bool,
        rename_nodes: bool = True,
    ) -> nx.DiGraph:
        """Convert the RDF graph to the networkx graph of the ontology.

        Collects the inheritance triples, builds a networkx graph from them,
        adds labels, renames the nodes to BioCypher format and finally keeps
        only the root node and the nodes that have a path to it.

        Args:
        ----
            _rdf_graph (rdflib.Graph): The RDF graph
            root_label (str): The label of the root node in the ontology
            switch_label_and_id (bool): If True, id and label are switched
            rename_nodes (bool): If True, the nodes are renamed

        Returns:
        -------
            nx.DiGraph: A new networkx graph built from the filtered graph

        """
        one_to_one_triples, one_to_many_dict = self._get_relevant_rdf_triples(_rdf_graph)
        nx_graph = self._convert_to_nx(one_to_one_triples, one_to_many_dict)
        nx_graph = self._add_labels_to_nodes(nx_graph, switch_label_and_id)
        nx_graph = self._change_nodes_to_biocypher_format(nx_graph, switch_label_and_id, rename_nodes)
        nx_graph = self._get_all_ancestors(nx_graph, root_label, switch_label_and_id, rename_nodes)
        return nx.DiGraph(nx_graph)

    def _get_relevant_rdf_triples(self, g: rdflib.Graph) -> tuple:
        """Get the inheritance information from the RDF graph.

        Args:
        ----
            g (rdflib.Graph): The RDF graph

        Returns:
        -------
            tuple: The one to one inheritance graph and the multiple
                inheritance dictionary

        """
        one_to_one_inheritance_graph = self._get_one_to_one_inheritance_triples(g)
        intersection = self._get_multiple_inheritance_dict(g)
        return one_to_one_inheritance_graph, intersection

    def _get_one_to_one_inheritance_triples(self, g: rdflib.Graph) -> rdflib.Graph:
        """Get the one to one inheritance triples from the RDF graph.

        Only triples whose subject has a label are kept.

        Args:
        ----
            g (rdflib.Graph): The RDF graph

        Returns:
        -------
            rdflib.Graph: The one to one inheritance graph

        """
        one_to_one_inheritance_graph = Graph()
        for s, p, o in chain(
            g.triples((None, rdflib.RDFS.subClassOf, None)),  # Node classes
            g.triples((None, rdflib.RDF.type, rdflib.RDFS.Class)),  # Root classes
            g.triples((None, rdflib.RDFS.subPropertyOf, None)),  # OWL "edges" classes
            g.triples((None, rdflib.RDF.type, rdflib.OWL.ObjectProperty)),  # OWL "edges" root classes
        ):
            if self.has_label(s, g):
                one_to_one_inheritance_graph.add((s, p, o))
        return one_to_one_inheritance_graph

    def _get_multiple_inheritance_dict(self, g: rdflib.Graph) -> dict:
        """Get the multiple inheritance dictionary from the RDF graph.

        Each `owl:intersectionOf` node is mapped to the child that refers to
        it and to the members of the intersection list, which become the
        parents of that child. Intersection nodes without a child are left
        out.

        Args:
        ----
            g (rdflib.Graph): The RDF graph

        Returns:
        -------
            dict: The multiple inheritance dictionary

        """
        intersection = {}
        for node, _, first_node_of_intersection_list in g.triples((None, rdflib.OWL.intersectionOf, None)):
            parents = self._retrieve_rdf_linked_list(first_node_of_intersection_list)

            # if several subjects point at the node, the last one found wins
            child_name = None
            for s_, _, _ in chain(
                g.triples((None, rdflib.RDFS.subClassOf, node)),
                g.triples((None, rdflib.RDFS.subPropertyOf, node)),
            ):
                child_name = s_

            # Handle Snomed CT post coordinated expressions
            if not child_name:
                for s_, _, _ in g.triples((None, rdflib.OWL.equivalentClass, node)):
                    child_name = s_

            if child_name:
                intersection[node] = {
                    "child_name": child_name,
                    "parent_node_names": parents,
                }
        return intersection

    def has_label(self, node: rdflib.URIRef, g: rdflib.Graph) -> bool:
        """Check if the node has a label in the graph.

        Args:
        ----
            node (rdflib.URIRef): The node to check
            g (rdflib.Graph): The graph to check in

        Returns:
        -------
            bool: True if the node has a label, False otherwise

        """
        return (node, rdflib.RDFS.label, None) in g

    def _retrieve_rdf_linked_list(self, subject: rdflib.URIRef) -> list:
        """Recursively retrieve a linked list from RDF.

        Example RDF list with the items [item1, item2]:
        list_node - first -> item1
        list_node - rest -> list_node2
        list_node2 - first -> item2
        list_node2 - rest -> nil

        The list is read from the RDF graph stored on the adapter.

        Args:
        ----
            subject (rdflib.URIRef): One list_node of the RDF list

        Returns:
        -------
            list: The items of the RDF list

        """
        g = self._rdf_graph
        rdf_list = []
        for _, _, o in g.triples((subject, rdflib.RDF.first, None)):
            rdf_list.append(o)
        for _, _, o in g.triples((subject, rdflib.RDF.rest, None)):
            if o != rdflib.RDF.nil:
                rdf_list.extend(self._retrieve_rdf_linked_list(o))
        return rdf_list

    def _convert_to_nx(self, one_to_one: rdflib.Graph, one_to_many: dict) -> nx.DiGraph:
        """Convert the one to one and one to many inheritance graphs to networkx.

        Args:
        ----
            one_to_one (rdflib.Graph): The one to one inheritance graph
            one_to_many (dict): The one to many inheritance dictionary

        Returns:
        -------
            nx.DiGraph: The networkx graph

        """
        nx_graph = rdflib_to_networkx_digraph(one_to_one, edge_attrs=lambda s, p, o: {}, calc_weights=False)
        for key, value in one_to_many.items():
            # connect the child to every parent directly and drop the
            # intersection node that stood between them
            nx_graph.add_edges_from([(value["child_name"], parent) for parent in value["parent_node_names"]])
            if key in nx_graph.nodes:
                nx_graph.remove_node(key)
        return nx_graph

    def _add_labels_to_nodes(self, nx_graph: nx.DiGraph, switch_label_and_id: bool) -> nx.DiGraph:
        """Add labels to the nodes in the networkx graph.

        Nodes whose new name would be "none" are removed from the graph.

        Args:
        ----
            nx_graph (nx.DiGraph): The networkx graph
            switch_label_and_id (bool): If True, id and label are switched

        Returns:
        -------
            nx.DiGraph: The networkx graph with labels

        """
        # iterate over a copy of the node list, as nodes may be removed
        for node in list(nx_graph.nodes):
            nx_id, nx_label = self._get_nx_id_and_label(node, switch_label_and_id)
            if nx_id == "none":
                # presumably a node without an RDFS label, whose label
                # lookup yields None - TODO confirm
                nx_graph.remove_node(node)
                continue

            nx_graph.nodes[node]["label"] = nx_label
        return nx_graph

    def _change_nodes_to_biocypher_format(
        self,
        nx_graph: nx.DiGraph,
        switch_label_and_id: bool,
        rename_nodes: bool = True,
    ) -> nx.DiGraph:
        """Change the nodes in the networkx graph to BioCypher format.

        This involves:
            - removing the prefix of the identifier
            - switching the id and label if requested
            - adapting the labels (replace _ with space and convert to lower
                sentence case)

        Args:
        ----
            nx_graph (nx.DiGraph): The networkx graph
            switch_label_and_id (bool): If True, id and label are switched
            rename_nodes (bool): If True, the nodes are renamed

        Returns:
        -------
            nx.DiGraph: The networkx ontology graph in BioCypher format

        """
        mapping = {
            node: self._get_nx_id_and_label(node, switch_label_and_id, rename_nodes)[0] for node in nx_graph.nodes
        }
        # copy=False: the given graph is relabelled in place
        renamed = nx.relabel_nodes(nx_graph, mapping, copy=False)
        return renamed

    def _get_all_ancestors(
        self,
        renamed: nx.DiGraph,
        root_label: str,
        switch_label_and_id: bool,
        rename_nodes: bool = True,
    ) -> nx.DiGraph:
        """Get all ancestors of the root node in the networkx graph.

        "Ancestors" is meant in the networkx sense: all nodes that have a
        path to the root node. The root node itself is kept as well.

        Args:
        ----
            renamed (nx.DiGraph): The renamed networkx graph
            root_label (str): The label of the root node in the ontology
            switch_label_and_id (bool): If True, id and label are switched
            rename_nodes (bool): If True, the nodes are renamed

        Returns:
        -------
            nx.DiGraph: The filtered networkx graph

        """
        root = self._get_nx_id_and_label(
            self._find_root_label(self._rdf_graph, root_label),
            switch_label_and_id,
            rename_nodes,
        )[0]
        ancestors = nx.ancestors(renamed, root)
        ancestors.add(root)
        filtered_graph = renamed.subgraph(ancestors)
        return filtered_graph

    def _get_nx_id_and_label(self, node, switch_id_and_label: bool, rename_nodes: bool = True) -> tuple[str, str]:
        """Rename node id and label for nx graph.

        Args:
        ----
            node (str): The node to rename
            switch_id_and_label (bool): If True, switch id and label
            rename_nodes (bool): If True, underscores in the label are
                replaced by spaces and the label is converted to lower
                sentence case

        Returns:
        -------
            tuple[str, str]: The renamed node id and label

        """
        node_id_str = self._remove_prefix(str(node))
        node_label_str = str(self._rdf_graph.value(node, rdflib.RDFS.label))
        if rename_nodes:
            node_label_str = node_label_str.replace("_", " ")
            node_label_str = to_lower_sentence_case(node_label_str)
        nx_id = node_label_str if switch_id_and_label else node_id_str
        nx_label = node_id_str if switch_id_and_label else node_label_str
        return nx_id, nx_label

    def _find_root_label(self, g, root_label):
        """Find the subject that carries the root label.

        Args:
        ----
            g (rdflib.Graph): The RDF graph
            root_label (str): The label of the root node in the ontology

        Returns:
        -------
            The subject of the first label triple whose label equals
            `root_label`

        Raises:
        ------
            ValueError: If no label in the graph equals `root_label`

        """
        # Loop through all labels in the ontology
        for label_subject, _, label_in_ontology in g.triples((None, rdflib.RDFS.label, None)):
            # If the label is the root label, set the root node to the label's subject
            if str(label_in_ontology) == root_label:
                root = label_subject
                break
        else:
            # no match: list the available labels to help the user
            labels_in_ontology = [
                str(label_in_ontology) for _, _, label_in_ontology in g.triples((None, rdflib.RDFS.label, None))
            ]
            msg = (
                f"Could not find root node with label '{root_label}'. "
                f"The ontology contains the following labels: {labels_in_ontology}"
            )
            logger.error(msg)
            raise ValueError(msg)
        return root

    def _remove_prefix(self, uri: str) -> str:
        """Remove the prefix of a URI.

        URIs can contain either "#" or "/" as a separator between the prefix
        and the local name. The prefix is everything before the last separator.

        Args:
        ----
            uri (str): The URI to remove the prefix from

        Returns:
        -------
            str: The URI without the prefix, or the unchanged URI if prefix
                removal is switched off

        """
        if self._remove_prefixes:
            return uri.rsplit("#", 1)[-1].rsplit("/", 1)[-1]
        else:
            return uri

    def _load_rdf_graph(self, ontology_file):
        """Load the ontology into an RDFlib graph.

        The ontology file can be in OWL, RDF/XML or Turtle format; OBO is not
        yet supported.

        Args:
        ----
            ontology_file (str): The path to the ontology file

        Returns:
        -------
            rdflib.Graph: The RDFlib graph

        """
        g = rdflib.Graph()
        g.parse(ontology_file, format=self._get_format(ontology_file))
        return g

    def _get_format(self, ontology_file):
        """Get the format of the ontology file.

        An explicitly configured format takes precedence over the file
        extension. Besides the short names "owl", "rdf" and "ttl", the MIME
        type "application/rdf+xml" is accepted as an explicit format.

        Args:
        ----
            ontology_file (str): The path to the ontology file

        Returns:
        -------
            str: The format name to pass to the RDFlib parser

        Raises:
        ------
            NotImplementedError: If the format is OBO
            ValueError: If the format is unknown

        """
        rdf_xml = "application/rdf+xml"

        if self._format:
            explicit_formats = {
                "owl": rdf_xml,
                "rdf": rdf_xml,
                "ttl": "ttl",
                rdf_xml: rdf_xml,
            }
            if self._format == "obo":
                msg = "OBO format not yet supported"
                logger.error(msg)
                raise NotImplementedError(msg)
            if self._format in explicit_formats:
                return explicit_formats[self._format]
            msg = f"Could not determine format of ontology file {ontology_file}"
            logger.error(msg)
            raise ValueError(msg)

        # no explicit format: decide by file extension
        if ontology_file.endswith((".owl", ".rdf")):
            return rdf_xml
        if ontology_file.endswith(".ttl"):
            return "ttl"
        if ontology_file.endswith(".obo"):
            msg = "OBO format not yet supported"
            logger.error(msg)
            raise NotImplementedError(msg)

        msg = f"Could not determine format of ontology file {ontology_file}"
        logger.error(msg)
        raise ValueError(msg)

    def get_nx_graph(self):
        """Get the networkx graph representing the ontology."""
        return self._nx_graph

    def get_rdf_graph(self):
        """Get the RDFlib graph representing the ontology."""
        return self._rdf_graph

    def get_root_node(self):
        """Get root node in the ontology.

        Returns
        -------
            root_node: If _switch_label_and_id is True, the root node label is
                returned, otherwise the root node id is returned. None if no
                node with a matching label is found in the latter case.

        """
        expected_label = to_lower_sentence_case(self._root_label.replace("_", " "))

        if self._switch_label_and_id:
            # node names are the formatted labels already
            return expected_label

        for node, data in self.get_nx_graph().nodes(data=True):
            if "label" in data and data["label"] == expected_label:
                return node

        return None

    def get_ancestors(self, node_label):
        """Get the ancestors of a node in the ontology.

        Returns the iterator produced by `nx.dfs_preorder_nodes` on the
        ontology graph with `node_label` as the source.
        """
        return nx.dfs_preorder_nodes(self._nx_graph, node_label)

    def get_head_join_node(self):
        """Get the head join node of the ontology."""
        return self._head_join_node

__init__(ontology_file, root_label, ontology_file_format=None, head_join_node_label=None, merge_nodes=True, switch_label_and_id=True, remove_prefixes=True)

Initialize the OntologyAdapter class.


ontology_file (str): Path to the ontology file. Can be local or
    remote.

root_label (str): The label of the root node in the ontology. In
    case of a tail ontology, this is the tail join node.

ontology_file_format (str): The format of the ontology file, given as
    "owl", "rdf" or "ttl". If format is not passed, it is determined from the file extension.

head_join_node_label (str): Optional variable to store the label of the
    node in the head ontology that should be used to join to the
    root node of the tail ontology. Defaults to None.

merge_nodes (bool): If True, head and tail join nodes will be
    merged, using the label of the head join node. If False, the
    tail join node will be attached as a child of the head join
    node.

switch_label_and_id (bool): If True, the node names in the graph will be
    the human-readable labels. If False, the node names will be the
    identifiers. Defaults to True.

remove_prefixes (bool): If True, the prefixes of the identifiers will
    be removed. Defaults to True.
Source code in biocypher/_ontology.py
def __init__(
    self,
    ontology_file: str,
    root_label: str,
    ontology_file_format: str | None = None,
    head_join_node_label: str | None = None,
    merge_nodes: bool | None = True,
    switch_label_and_id: bool = True,
    remove_prefixes: bool = True,
):
    """Set up the adapter and load the ontology.

    Stores the configuration, parses the ontology file into an RDFlib graph
    and derives the networkx graph from it.

    Args:
    ----
        ontology_file (str): Location of the ontology file; local or
            remote.

        root_label (str): Label of the root node of the ontology. For a
            tail ontology, this is the tail join node.

        ontology_file_format (str): Format of the ontology file. If not
            given, the format is determined automatically.

        head_join_node_label (str): Label of the node in the head ontology
            to which the root node of the tail ontology should be joined.
            Optional, defaults to None.

        merge_nodes (bool): If True, head and tail join nodes will be
            merged, using the label of the head join node. If False, the
            tail join node will be attached as a child of the head join
            node.

        switch_label_and_id (bool): If True, the graph uses the
            human-readable labels as node names; if False, the identifiers.
            Defaults to True.

        remove_prefixes (bool): If True, identifier prefixes are removed.
            Defaults to True.

    """
    logger.info(f"Instantiating OntologyAdapter class for {ontology_file}.")

    # plain configuration first; loading and conversion below read it
    self._remove_prefixes = remove_prefixes
    self._switch_label_and_id = switch_label_and_id
    self._head_join_node = head_join_node_label
    self._merge_nodes = merge_nodes
    self._format = ontology_file_format
    self._root_label = root_label
    self._ontology_file = ontology_file

    rdf_graph = self._load_rdf_graph(ontology_file)
    self._rdf_graph = rdf_graph

    self._nx_graph = self._rdf_to_nx(rdf_graph, root_label, switch_label_and_id)

_add_labels_to_nodes(nx_graph, switch_label_and_id)

Add labels to the nodes in the networkx graph.


nx_graph (nx.DiGraph): The networkx graph
switch_label_and_id (bool): If True, id and label are switched

nx.DiGraph: The networkx graph with labels
Source code in biocypher/_ontology.py
def _add_labels_to_nodes(self, nx_graph: nx.DiGraph, switch_label_and_id: bool) -> nx.DiGraph:
    """Attach a `label` attribute to every node of the networkx graph.

    Nodes whose new name would be "none" are dropped from the graph instead
    of being labelled.

    Args:
    ----
        nx_graph (nx.DiGraph): The networkx graph
        switch_label_and_id (bool): If True, id and label are switched

    Returns:
    -------
        nx.DiGraph: The same graph object, with labels added

    """
    # snapshot of the nodes, since the graph may shrink while we walk it
    for current in list(nx_graph.nodes):
        new_id, new_label = self._get_nx_id_and_label(current, switch_label_and_id)
        if new_id != "none":
            nx_graph.nodes[current]["label"] = new_label
        else:
            nx_graph.remove_node(current)
    return nx_graph

_change_nodes_to_biocypher_format(nx_graph, switch_label_and_id, rename_nodes=True)

Change the nodes in the networkx graph to BioCypher format.

This involves
  • removing the prefix of the identifier
  • switching the id and label if requested
  • adapting the labels (replace _ with space and convert to lower sentence case)
Args:
nx_graph (nx.DiGraph): The networkx graph
switch_label_and_id (bool): If True, id and label are switched
rename_nodes (bool): If True, the nodes are renamed

nx.DiGraph: The networkx ontology graph in BioCypher format
Source code in biocypher/_ontology.py
def _change_nodes_to_biocypher_format(
    self,
    nx_graph: nx.DiGraph,
    switch_label_and_id: bool,
    rename_nodes: bool = True,
) -> nx.DiGraph:
    """Rename the nodes of the networkx graph to BioCypher format.

    The new name of each node is the id returned by
    `_get_nx_id_and_label`, which covers:
        - removing the prefix of the identifier
        - switching the id and label if requested
        - adapting the labels (replace _ with space and convert to lower
            sentence case)

    Args:
    ----
        nx_graph (nx.DiGraph): The networkx graph
        switch_label_and_id (bool): If True, id and label are switched
        rename_nodes (bool): If True, the nodes are renamed

    Returns:
    -------
        nx.DiGraph: The networkx ontology graph in BioCypher format

    """
    new_names = {}
    for old_name in nx_graph.nodes:
        new_id, _ = self._get_nx_id_and_label(old_name, switch_label_and_id, rename_nodes)
        new_names[old_name] = new_id

    # relabel the given graph in place rather than building a copy
    return nx.relabel_nodes(nx_graph, new_names, copy=False)

_convert_to_nx(one_to_one, one_to_many)

Convert the one to one and one to many inheritance graphs to networkx.


one_to_one (rdflib.Graph): The one to one inheritance graph
one_to_many (dict): The one to many inheritance dictionary

nx.DiGraph: The networkx graph
Source code in biocypher/_ontology.py
def _convert_to_nx(self, one_to_one: rdflib.Graph, one_to_many: dict) -> nx.DiGraph:
    """Build a networkx graph from the inheritance information.

    The one to one inheritance graph is converted first. For every entry of
    the one to many dictionary, the child is then linked to each of its
    parents, and the dictionary key is removed from the graph if present.

    Args:
    ----
        one_to_one (rdflib.Graph): The one to one inheritance graph
        one_to_many (dict): The one to many inheritance dictionary

    Returns:
    -------
        nx.DiGraph: The networkx graph

    """
    graph = rdflib_to_networkx_digraph(one_to_one, edge_attrs=lambda s, p, o: {}, calc_weights=False)

    for intersection_node, entry in one_to_many.items():
        for parent in entry["parent_node_names"]:
            graph.add_edge(entry["child_name"], parent)

        if graph.has_node(intersection_node):
            graph.remove_node(intersection_node)

    return graph

_get_all_ancestors(renamed, root_label, switch_label_and_id, rename_nodes=True)

Get all ancestors of the root node in the networkx graph.


renamed (nx.DiGraph): The renamed networkx graph
root_label (str): The label of the root node in the ontology
switch_label_and_id (bool): If True, id and label are switched
rename_nodes (bool): If True, the nodes are renamed

nx.DiGraph: The filtered networkx graph
Source code in biocypher/_ontology.py
def _get_all_ancestors(
    self,
    renamed: nx.DiGraph,
    root_label: str,
    switch_label_and_id: bool,
    rename_nodes: bool = True,
) -> nx.DiGraph:
    """Restrict the networkx graph to the root node and its ancestors.

    The root is looked up in the RDF graph by its label and translated to
    its node name in the renamed graph. "Ancestors" are the nodes returned
    by `nx.ancestors` for that root.

    Args:
    ----
        renamed (nx.DiGraph): The renamed networkx graph
        root_label (str): The label of the root node in the ontology
        switch_label_and_id (bool): If True, id and label are switched
        rename_nodes (bool): If True, the nodes are renamed

    Returns:
    -------
        nx.DiGraph: The filtered networkx graph

    """
    root_subject = self._find_root_label(self._rdf_graph, root_label)
    root, _ = self._get_nx_id_and_label(root_subject, switch_label_and_id, rename_nodes)

    nodes_to_keep = nx.ancestors(renamed, root) | {root}
    return renamed.subgraph(nodes_to_keep)

_get_format(ontology_file)

Get the format of the ontology file.

Source code in biocypher/_ontology.py
def _get_format(self, ontology_file):
    """Get the format of the ontology file."""
    if self._format:
        if self._format == "owl":
            return "application/rdf+xml"
        elif self._format == "obo":
            raise NotImplementedError("OBO format not yet supported")
        elif self._format == "rdf":
            return "application/rdf+xml"
        elif self._format == "ttl":
            return self._format
        else:
            msg = f"Could not determine format of ontology file {ontology_file}"
            logger.error(msg)
            raise ValueError(msg)

    if ontology_file.endswith(".owl"):
        return "application/rdf+xml"
    elif ontology_file.endswith(".obo"):
        msg = "OBO format not yet supported"
        logger.error(msg)
        raise NotImplementedError(msg)
    elif ontology_file.endswith(".rdf"):
        return "application/rdf+xml"
    elif ontology_file.endswith(".ttl"):
        return "ttl"
    else:
        msg = f"Could not determine format of ontology file {ontology_file}"
        logger.error(msg)
        raise ValueError(msg)

_get_multiple_inheritance_dict(g)

Get the multiple inheritance dictionary from the RDF graph.


g (rdflib.Graph): The RDF graph

dict: The multiple inheritance dictionary
Source code in biocypher/_ontology.py
def _get_multiple_inheritance_dict(self, g: rdflib.Graph) -> dict:
    """Collect the ``owl:intersectionOf`` nodes and their child and parents.

    Args:
    ----
        g (rdflib.Graph): The RDF graph

    Returns:
    -------
        dict: Maps each intersection node that has a child to a dict with
            the keys ``"child_name"`` and ``"parent_node_names"``

    """
    result = {}
    for intersection_node, _, list_head in g.triples((None, rdflib.OWL.intersectionOf, None)):
        parent_nodes = self._retrieve_rdf_linked_list(list_head)

        # The child is the subject pointing at the intersection node; if
        # several subjects match, the last one encountered is kept.
        child = None
        for predicate in (rdflib.RDFS.subClassOf, rdflib.RDFS.subPropertyOf):
            for subject, _, _ in g.triples((None, predicate, intersection_node)):
                child = subject

        # Handle Snomed CT post coordinated expressions
        if not child:
            for subject, _, _ in g.triples((None, rdflib.OWL.equivalentClass, intersection_node)):
                child = subject

        if not child:
            continue

        result[intersection_node] = {
            "child_name": child,
            "parent_node_names": parent_nodes,
        }
    return result

_get_nx_id_and_label(node, switch_id_and_label, rename_nodes=True)

Rename node id and label for nx graph.


node (str): The node to rename
switch_id_and_label (bool): If True, switch id and label

tuple[str, str]: The renamed node id and label
Source code in biocypher/_ontology.py
def _get_nx_id_and_label(self, node, switch_id_and_label: bool, rename_nodes: bool = True) -> tuple[str, str]:
    """Derive the networkx id and label of an RDF node.

    Args:
    ----
        node (str): The node to rename
        switch_id_and_label (bool): If True, switch id and label
        rename_nodes (bool): If True, underscores in the label are replaced
            by spaces and the label is converted to lower sentence case

    Returns:
    -------
        tuple[str, str]: The renamed node id and label

    """
    identifier = self._remove_prefix(str(node))
    # The lookup result is passed through str() unconditionally.
    label = str(self._rdf_graph.value(node, rdflib.RDFS.label))
    if rename_nodes:
        label = to_lower_sentence_case(label.replace("_", " "))

    if switch_id_and_label:
        return label, identifier
    return identifier, label

_get_one_to_one_inheritance_triples(g)

Get the one to one inheritance triples from the RDF graph.


g (rdflib.Graph): The RDF graph

rdflib.Graph: The one to one inheritance graph
Source code in biocypher/_ontology.py
def _get_one_to_one_inheritance_triples(self, g: rdflib.Graph) -> rdflib.Graph:
    """Extract the labelled one to one inheritance triples of the RDF graph.

    Args:
    ----
        g (rdflib.Graph): The RDF graph

    Returns:
    -------
        rdflib.Graph: The one to one inheritance graph, containing only
            triples whose subject has a label in ``g``

    """
    patterns = (
        (None, rdflib.RDFS.subClassOf, None),  # Node classes
        (None, rdflib.RDF.type, rdflib.RDFS.Class),  # Root classes
        (None, rdflib.RDFS.subPropertyOf, None),  # OWL "edges" classes
        (None, rdflib.RDF.type, rdflib.OWL.ObjectProperty),  # OWL "edges" root classes
    )

    labelled = Graph()
    for pattern in patterns:
        for subject, predicate, obj in g.triples(pattern):
            if not self.has_label(subject, g):
                continue
            labelled.add((subject, predicate, obj))
    return labelled

_load_rdf_graph(ontology_file)

Load the ontology into an RDFlib graph.

The ontology file can be in OWL, RDF/XML, or Turtle (`.ttl`) format; OBO is recognised but not yet supported.


ontology_file (str): The path to the ontology file

rdflib.Graph: The RDFlib graph
Source code in biocypher/_ontology.py
def _load_rdf_graph(self, ontology_file):
    """Parse the ontology file into a new RDFlib graph.

    The parser format is resolved by ``_get_format``, which accepts OWL,
    RDF/XML and Turtle and raises for OBO or unknown formats.

    Args:
    ----
        ontology_file (str): The path to the ontology file

    Returns:
    -------
        rdflib.Graph: The RDFlib graph

    """
    graph = rdflib.Graph()
    parser_format = self._get_format(ontology_file)
    graph.parse(ontology_file, format=parser_format)
    return graph

_remove_prefix(uri)

Remove the prefix of a URI.

URIs can contain either "#" or "/" as a separator between the prefix and the local name. The prefix is everything before the last separator.


uri (str): The URI to remove the prefix from

str: The URI without the prefix
Source code in biocypher/_ontology.py
def _remove_prefix(self, uri: str) -> str:
    """Remove the prefix of a URI.

    URIs can contain either "#" or "/" as a separator between the prefix
    and the local name. The prefix is everything before the last separator.

    Args:
    ----
        uri (str): The URI to remove the prefix from

    Returns:
    -------
        str: The URI without the prefix

    """
    if self._remove_prefixes:
        return uri.rsplit("#", 1)[-1].rsplit("/", 1)[-1]
    else:
        return uri

_retrieve_rdf_linked_list(subject)

Recursively retrieve a linked list from RDF.

Example RDF list with the items [item1, item2]: `list_node - first -> item1`; `list_node - rest -> list_node2`; `list_node2 - first -> item2`; `list_node2 - rest -> nil`.


subject (rdflib.URIRef): One list_node of the RDF list

list: The items of the RDF list
Source code in biocypher/_ontology.py
def _retrieve_rdf_linked_list(self, subject: rdflib.URIRef) -> list:
    """Collect the items of an RDF linked list, starting at ``subject``.

    Example RDF list with the items [item1, item2]:
    list_node - first -> item1
    list_node - rest -> list_node2
    list_node2 - first -> item2
    list_node2 - rest -> nil

    Args:
    ----
        subject (rdflib.URIRef): One list_node of the RDF list

    Returns:
    -------
        list: The items of the RDF list

    """
    graph = self._rdf_graph

    # Items stored directly on this list node.
    items = [item for _, _, item in graph.triples((subject, rdflib.RDF.first, None))]

    # Follow the "rest" links recursively until the nil terminator.
    for _, _, tail in graph.triples((subject, rdflib.RDF.rest, None)):
        if tail == rdflib.RDF.nil:
            continue
        items += self._retrieve_rdf_linked_list(tail)
    return items

get_ancestors(node_label)

Get the ancestors of a node in the ontology.

Source code in biocypher/_ontology.py
def get_ancestors(self, node_label):
    """Get the ancestors of a node in the ontology.

    Returns the result of a depth-first pre-order traversal of the
    networkx graph with ``node_label`` as the source.
    """
    graph = self._nx_graph
    return nx.dfs_preorder_nodes(graph, node_label)

get_head_join_node()

Get the head join node of the ontology.

Source code in biocypher/_ontology.py
def get_head_join_node(self):
    """Return the stored head join node of the ontology."""
    head_join_node = self._head_join_node
    return head_join_node

get_nx_graph()

Get the networkx graph representing the ontology.

Source code in biocypher/_ontology.py
def get_nx_graph(self):
    """Return the stored networkx graph representing the ontology."""
    nx_graph = self._nx_graph
    return nx_graph

get_rdf_graph()

Get the RDFlib graph representing the ontology.

Source code in biocypher/_ontology.py
def get_rdf_graph(self):
    """Return the stored RDFlib graph representing the ontology."""
    rdf_graph = self._rdf_graph
    return rdf_graph

get_root_node()

Get root node in the ontology.

Returns
root_node: If _switch_label_and_id is True, the root node label is
    returned, otherwise the root node id is returned.
Source code in biocypher/_ontology.py
def get_root_node(self):
    """Get root node in the ontology.

    The root label has its underscores replaced by spaces and is converted
    to lower sentence case before it is used.

    Returns
    -------
        root_node: If _switch_label_and_id is True, the root node label is
            returned, otherwise the root node id is returned. None if no
            node in the networkx graph carries the root label.

    """
    # Normalise once; the value does not change while scanning the nodes.
    target_label = to_lower_sentence_case(self._root_label.replace("_", " "))

    if self._switch_label_and_id:
        # Labels are used as node ids, so the label is the root node.
        return target_label

    for node, data in self.get_nx_graph().nodes(data=True):
        if "label" in data and data["label"] == target_label:
            return node

    return None

has_label(node, g)

Check if the node has a label in the graph.


node (rdflib.URIRef): The node to check
g (rdflib.Graph): The graph to check in

Returns: bool: True if the node has a label, False otherwise

Source code in biocypher/_ontology.py
def has_label(self, node: rdflib.URIRef, g: rdflib.Graph) -> bool:
    """Check if the node has a label in the graph.

    Args:
    ----
        node (rdflib.URIRef): The node to check
        g (rdflib.Graph): The graph to check in

    Returns:
    -------
        bool: True if the node has a label, False otherwise

    """
    label_pattern = (node, rdflib.RDFS.label, None)
    return label_pattern in g

Mapping of data inputs to KG ontology

Class to store the ontology mapping and extensions.

Source code in biocypher/_mapping.py
class OntologyMapping:
    """
    Class to store the ontology mapping and extensions.

    On construction, the schema configuration is read into `schema` and the
    extended version (including virtual leaves) is stored in
    `extended_schema`.
    """

    def __init__(self, config_file: Optional[str] = None):
        # raw schema configuration as read from the YAML source
        self.schema = self._read_config(config_file)

        # schema plus inherited properties and virtual leaves
        self.extended_schema = self._extend_schema()

    def _read_config(self, config_file: Optional[str] = None):
        """
        Read the configuration file and return the ontology mapping.

        Args:
            config_file:
                Path or URL (anything starting with "http") of the YAML
                schema configuration. If None, an empty schema is used.

        Returns:
            The parsed schema configuration; an empty dict if no file is
            given or the document is empty.
        """
        if config_file is None:
            return {}

        # load yaml file from web
        if config_file.startswith("http"):
            with urlopen(config_file) as f:
                schema_config = yaml.safe_load(f)

        # get graph state from config (assume file is local)
        else:
            with open(config_file, "r") as f:
                schema_config = yaml.safe_load(f)

        # An empty YAML document parses to None; normalise it so that the
        # schema can always be iterated like a dict.
        if schema_config is None:
            schema_config = {}

        return schema_config

    def _extend_schema(self, d: Optional[dict] = None) -> dict:
        """
        Get leaves of the tree hierarchy from the data structure dict
        contained in the `schema_config.yaml`. Creates virtual leaves
        (as children) from entries that provide more than one preferred
        id type (and corresponding inputs).

        Args:
            d:
                Data structure dict from yaml file. Falls back to
                `self.schema` if not given (or empty).

        Returns:
            The extended schema dict. Entries are the same dict objects as
            in `d`, so defaults filled in here are visible in `d` as well.
        """

        d = d or self.schema

        extended_schema = {}

        # first pass: get parent leaves with direct representation in ontology
        for k, v in d.items():
            # k is not an entity
            if "represented_as" not in v:
                continue

            # preferred_id optional: if not provided, use `id`
            if not v.get("preferred_id"):
                v["preferred_id"] = "id"

            # k is an entity that is present in the ontology
            if "is_a" not in v:
                extended_schema[k] = v

        # second pass: "vertical" inheritance
        d = self._vertical_property_inheritance(d)
        for k, v in d.items():
            if "is_a" in v:
                # prevent loops
                if k == v["is_a"]:
                    logger.warning(
                        f"Loop detected in ontology mapping: {k} -> {v}. "
                        "Removing item. Please fix the inheritance if you want "
                        "to use this item."
                    )
                    continue

                extended_schema[k] = v

        # "horizontal" inheritance: create siblings for multiple identifiers or
        # sources -> virtual leaves or implicit children
        for k, v in d.items():
            # k is not an entity
            if "represented_as" not in v:
                continue

            # multiple preferred ids take precedence over multiple sources
            if isinstance(v.get("preferred_id"), list):
                extended_schema.update(self._horizontal_inheritance_pid(k, v))

            elif isinstance(v.get("source"), list):
                extended_schema.update(self._horizontal_inheritance_source(k, v))

        return extended_schema

    def _vertical_property_inheritance(self, d):
        """
        Inherit properties from parents to children and update `d` accordingly.

        Only entities that have `represented_as`, `is_a`, and a truthy
        `inherit_properties` are touched. The parent (the first `is_a`
        entry) is looked up in `self.schema`.
        """
        for k, v in d.items():
            # k is not an entity
            if "represented_as" not in v:
                continue

            # k is an entity that is present in the ontology
            if "is_a" not in v:
                continue

            # "vertical" inheritance: inherit properties from parent
            if v.get("inherit_properties", False):
                # get direct ancestor
                if isinstance(v["is_a"], list):
                    parent = v["is_a"][0]
                else:
                    parent = v["is_a"]

                # ensure child has properties and exclude_properties
                if "properties" not in v:
                    v["properties"] = {}
                if "exclude_properties" not in v:
                    v["exclude_properties"] = {}

                # update properties of child
                parent_props = self.schema[parent].get("properties", {})
                if parent_props:
                    v["properties"].update(parent_props)

                parent_excl_props = self.schema[parent].get("exclude_properties", {})
                if parent_excl_props:
                    v["exclude_properties"].update(parent_excl_props)

                # update schema (d)
                d[k] = v

        return d

    @staticmethod
    def _broadcast(item, length):
        """Repeat a single string `length` times; return anything else as is."""
        if isinstance(item, str):
            return [item] * length
        return item

    @staticmethod
    def _finish_virtual_leaf(leaf, key, value, core_attributes):
        """Add `is_a` and all non-core attributes of the parent to `leaf`."""
        # inherit is_a if exists
        if "is_a" in value:
            # treat as multiple inheritance; a fresh list per leaf
            if isinstance(value["is_a"], list):
                leaf["is_a"] = [key, *value["is_a"]]
            else:
                leaf["is_a"] = [key, value["is_a"]]
        else:
            # set parent as is_a
            leaf["is_a"] = key

        # inherit everything except core attributes
        for k, v in value.items():
            if k not in core_attributes:
                leaf[k] = v

        return leaf

    def _horizontal_inheritance_pid(self, key, value):
        """
        Create virtual leaves for multiple preferred id types.

        One leaf named `<preferred_id>.<key>` is created per preferred id.
        A single string for `preferred_id`, `input_label`/`label_in_input`
        or `represented_as` is repeated for every leaf; lists are paired up
        positionally.
        """

        leaves = {}

        preferred_id = value["preferred_id"]
        input_label = value.get("input_label") or value["label_in_input"]
        represented_as = value["represented_as"]

        # adjust lengths
        max_l = max(
            [
                len(_misc.to_list(preferred_id)),
                len(_misc.to_list(input_label)),
                len(_misc.to_list(represented_as)),
            ],
        )

        pids = self._broadcast(preferred_id, max_l)
        # a plain string label must be repeated, otherwise zip would pair
        # the ids with its individual characters
        labels = self._broadcast(input_label, max_l)
        reps = self._broadcast(represented_as, max_l)

        core_attributes = (
            "is_a",
            "preferred_id",
            "input_label",
            "label_in_input",
            "represented_as",
        )

        for pid, lab, rep in zip(pids, labels, reps):
            skey = pid + "." + key
            svalue = {
                "preferred_id": pid,
                "input_label": lab,
                "represented_as": rep,
                # mark as virtual
                "virtual": True,
            }
            leaves[skey] = self._finish_virtual_leaf(svalue, key, value, core_attributes)

        return leaves

    def _horizontal_inheritance_source(self, key, value):
        """
        Create virtual leaves for multiple sources.

        One leaf named `<source>.<key>` is created per source. A single
        string for `input_label`/`label_in_input` or `represented_as` is
        repeated for every source; lists are paired up positionally.
        """

        leaves = {}

        source = value["source"]
        input_label = value.get("input_label") or value["label_in_input"]
        represented_as = value["represented_as"]

        # adjust lengths
        src_l = len(source)

        labels = self._broadcast(input_label, src_l)
        reps = self._broadcast(represented_as, src_l)

        core_attributes = (
            "is_a",
            "source",
            "input_label",
            "label_in_input",
            "represented_as",
        )

        for src, lab, rep in zip(source, labels, reps):
            skey = src + "." + key
            svalue = {
                "source": src,
                "input_label": lab,
                "represented_as": rep,
                # mark as virtual
                "virtual": True,
            }
            leaves[skey] = self._finish_virtual_leaf(svalue, key, value, core_attributes)

        return leaves

_extend_schema(d=None)

Get leaves of the tree hierarchy from the data structure dict contained in the schema_config.yaml. Creates virtual leaves (as children) from entries that provide more than one preferred id type (and corresponding inputs).

Parameters:

Name Type Description Default
d Optional[dict]

Data structure dict from yaml file.

None
Source code in biocypher/_mapping.py
def _extend_schema(self, d: Optional[dict] = None) -> dict:
    """
    Build the extended schema from the data structure dict contained in
    the `schema_config.yaml`: entities with a direct representation,
    entities inheriting via `is_a`, and virtual leaves (as children) for
    entries that provide more than one preferred id type or source.

    Args:
        d:
            Data structure dict from yaml file. Falls back to
            `self.schema` if not given (or empty).

    Returns:
        The extended schema dict.
    """

    d = d or self.schema
    extended_schema = {}

    # first pass: entities directly represented in the ontology
    for k, v in d.items():
        if "represented_as" in v:
            # preferred_id is optional and defaults to `id`
            if not v.get("preferred_id"):
                v["preferred_id"] = "id"

            if "is_a" not in v:
                extended_schema[k] = v

    # second pass: "vertical" inheritance
    d = self._vertical_property_inheritance(d)
    for k, v in d.items():
        if "is_a" not in v:
            continue

        # an entity that names itself as parent is dropped
        if k == v["is_a"]:
            logger.warning(
                f"Loop detected in ontology mapping: {k} -> {v}. "
                "Removing item. Please fix the inheritance if you want "
                "to use this item."
            )
            continue

        extended_schema[k] = v

    # third pass: "horizontal" inheritance, i.e. virtual leaves for
    # multiple identifiers or sources (identifiers take precedence)
    for k, v in d.items():
        if "represented_as" not in v:
            continue

        if isinstance(v.get("preferred_id"), list):
            extended_schema.update(self._horizontal_inheritance_pid(k, v))
        elif isinstance(v.get("source"), list):
            extended_schema.update(self._horizontal_inheritance_source(k, v))

    return extended_schema

_horizontal_inheritance_pid(key, value)

Create virtual leaves for multiple preferred id types.

If we create virtual leaves, input_label/label_in_input always has to be a list.

Source code in biocypher/_mapping.py
def _horizontal_inheritance_pid(self, key, value):
    """
    Create virtual leaves for multiple preferred id types.

    One leaf named `<preferred_id>.<key>` is created per preferred id.
    A single string for `preferred_id`, `input_label`/`label_in_input` or
    `represented_as` is repeated for every leaf; lists are paired up
    positionally.
    """

    leaves = {}

    preferred_id = value["preferred_id"]
    input_label = value.get("input_label") or value["label_in_input"]
    represented_as = value["represented_as"]

    # adjust lengths
    max_l = max(
        [
            len(_misc.to_list(preferred_id)),
            len(_misc.to_list(input_label)),
            len(_misc.to_list(represented_as)),
        ],
    )

    # adjust pid length if necessary
    if isinstance(preferred_id, str):
        pids = [preferred_id] * max_l
    else:
        pids = preferred_id

    # adjust label length if necessary: a plain string must be repeated,
    # otherwise zip would pair the ids with its individual characters
    if isinstance(input_label, str):
        labels = [input_label] * max_l
    else:
        labels = input_label

    # adjust rep length if necessary
    if isinstance(represented_as, str):
        reps = [represented_as] * max_l
    else:
        reps = represented_as

    for pid, lab, rep in zip(pids, labels, reps):
        skey = pid + "." + key
        svalue = {
            "preferred_id": pid,
            "input_label": lab,
            "represented_as": rep,
            # mark as virtual
            "virtual": True,
        }

        # inherit is_a if exists
        if "is_a" in value:
            # treat as multiple inheritance; a fresh list per leaf
            if isinstance(value["is_a"], list):
                svalue["is_a"] = [key, *value["is_a"]]
            else:
                svalue["is_a"] = [key, value["is_a"]]
        else:
            # set parent as is_a
            svalue["is_a"] = key

        # inherit everything except core attributes
        for k, v in value.items():
            if k not in (
                "is_a",
                "preferred_id",
                "input_label",
                "label_in_input",
                "represented_as",
            ):
                svalue[k] = v

        leaves[skey] = svalue

    return leaves

_horizontal_inheritance_source(key, value)

Create virtual leaves for multiple sources.

If we create virtual leaves, input_label/label_in_input always has to be a list.

Source code in biocypher/_mapping.py
def _horizontal_inheritance_source(self, key, value):
    """
    Create virtual leaves for multiple sources.

    If we create virtual leaves, input_label/label_in_input always has to be
    a list.
    """

    leaves = {}

    source = value["source"]
    input_label = value.get("input_label") or value["label_in_input"]
    represented_as = value["represented_as"]

    # adjust lengths
    src_l = len(source)

    # adjust label length if necessary
    if isinstance(input_label, str):
        labels = [input_label] * src_l
    else:
        labels = input_label

    # adjust rep length if necessary
    if isinstance(represented_as, str):
        reps = [represented_as] * src_l
    else:
        reps = represented_as

    for src, lab, rep in zip(source, labels, reps):
        skey = src + "." + key
        svalue = {
            "source": src,
            "input_label": lab,
            "represented_as": rep,
            # mark as virtual
            "virtual": True,
        }

        # inherit is_a if exists
        if "is_a" in value.keys():
            # treat as multiple inheritance
            if isinstance(value["is_a"], list):
                v = list(value["is_a"])
                v.insert(0, key)
                svalue["is_a"] = v

            else:
                svalue["is_a"] = [key, value["is_a"]]

        else:
            # set parent as is_a
            svalue["is_a"] = key

        # inherit everything except core attributes
        for k, v in value.items():
            if k not in [
                "is_a",
                "source",
                "input_label",
                "label_in_input",
                "represented_as",
            ]:
                svalue[k] = v

        leaves[skey] = svalue

    return leaves

_read_config(config_file=None)

Read the configuration file and store the ontology mapping and extensions.

Source code in biocypher/_mapping.py
def _read_config(self, config_file: str = None):
    """
    Read the configuration file and store the ontology mapping and extensions.
    """
    if config_file is None:
        schema_config = {}

    # load yaml file from web
    elif config_file.startswith("http"):
        with urlopen(config_file) as f:
            schema_config = yaml.safe_load(f)

    # get graph state from config (assume file is local)
    else:
        with open(config_file, "r") as f:
            schema_config = yaml.safe_load(f)

    return schema_config

_vertical_property_inheritance(d)

Inherit properties from parents to children and update d accordingly.

Source code in biocypher/_mapping.py
def _vertical_property_inheritance(self, d):
    """
    Inherit properties from parents to children and update `d` accordingly.
    """
    for k, v in d.items():
        # k is not an entity
        if "represented_as" not in v:
            continue

        # k is an entity that is present in the ontology
        if "is_a" not in v:
            continue

        # "vertical" inheritance: inherit properties from parent
        if v.get("inherit_properties", False):
            # get direct ancestor
            if isinstance(v["is_a"], list):
                parent = v["is_a"][0]
            else:
                parent = v["is_a"]

            # ensure child has properties and exclude_properties
            if "properties" not in v:
                v["properties"] = {}
            if "exclude_properties" not in v:
                v["exclude_properties"] = {}

            # update properties of child
            parent_props = self.schema[parent].get("properties", {})
            if parent_props:
                v["properties"].update(parent_props)

            parent_excl_props = self.schema[parent].get("exclude_properties", {})
            if parent_excl_props:
                v["exclude_properties"].update(parent_excl_props)

            # update schema (d)
            d[k] = v

    return d