
Output Writing

Writer Retrieval

Return the writer class based on the selection in the config file.


Args:
    dbms: the database management system; for options, see DBMS_TO_CLASS.
    translator: the Translator object.
    deduplicator: the Deduplicator object.
    output_directory: the directory to write the output files to.
    strict_mode: whether to use strict mode.

Returns:
    instance: an instance of the selected writer class.
Source code in biocypher/output/write/_get_writer.py
def get_writer(
    dbms: str,
    translator: "Translator",
    deduplicator: "Deduplicator",
    output_directory: str,
    strict_mode: bool,
) -> _BatchWriter | None:
    """Return the writer class based on the selection in the config file.

    Args:
    ----
        dbms: the database management system; for options, see DBMS_TO_CLASS.
        translator: the Translator object.
        deduplicator: the Deduplicator object.
        output_directory: the directory to write the output files to.
        strict_mode: whether to use strict mode.

    Returns:
    -------
        instance: an instance of the selected writer class.

    """
    dbms_config = _config(dbms) or {}

    writer = DBMS_TO_CLASS[dbms]

    if "rdf_format" in dbms_config:
        logger.warning("The 'rdf_format' config option is deprecated, use 'file_format' instead.")
        if "file_format" not in dbms_config:
            format = dbms_config["rdf_format"]
            logger.warning(f"I will set 'file_format: {format}' for you.")
            dbms_config["file_format"] = format
            dbms_config.pop("rdf_format")
        logger.warning("NOTE: this warning will become an error in next versions.")

    if not writer:
        msg = f"Unknown dbms: {dbms}"
        raise ValueError(msg)

    if writer is not None:
        return writer(
            translator=translator,
            deduplicator=deduplicator,
            delimiter=dbms_config.get("delimiter"),
            array_delimiter=dbms_config.get("array_delimiter"),
            quote=dbms_config.get("quote_character"),
            output_directory=output_directory,
            db_name=dbms_config.get("database_name"),
            import_call_bin_prefix=dbms_config.get("import_call_bin_prefix"),
            import_call_file_prefix=dbms_config.get("import_call_file_prefix"),
            wipe=dbms_config.get("wipe"),
            strict_mode=strict_mode,
            skip_bad_relationships=dbms_config.get("skip_bad_relationships"),  # neo4j
            skip_duplicate_nodes=dbms_config.get("skip_duplicate_nodes"),  # neo4j
            db_user=dbms_config.get("user"),  # psql
            db_password=dbms_config.get("password"),  # psql
            db_port=dbms_config.get("port"),  # psql
            file_format=dbms_config.get("file_format"),  # rdf, owl
            rdf_namespaces=dbms_config.get("rdf_namespaces"),  # rdf, owl
            edge_model=dbms_config.get("edge_model"),  # owl
        )
    return None
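
A minimal usage sketch (hypothetical; assumes `translator` and `deduplicator` are already-constructed Translator and Deduplicator instances, and that the selected dbms section of the BioCypher config supplies the delimiter and related options):

from biocypher.output.write._get_writer import get_writer

# Hypothetical setup: translator and deduplicator come from the running
# BioCypher instance; "neo4j" selects the Neo4j batch writer.
writer = get_writer(
    dbms="neo4j",
    translator=translator,
    deduplicator=deduplicator,
    output_directory="biocypher-out/run-01",
    strict_mode=False,
)
writer.write_nodes(nodes)  # nodes/edges: iterables of BioCypher entities
writer.write_edges(edges)
print(writer.write_import_call())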

Writer Base Class

Bases: ABC

Abstract class for writing node and edge representations to disk. Specifics of the different writers (e.g. neo4j, postgresql, csv, etc.) are implemented in the child classes. Any concrete writer needs to implement at least:

- _write_node_data
- _write_edge_data
- _construct_import_call
- _get_import_script_name


Args:
    translator (Translator): Instance of :py:class:`Translator` to enable translation of
        nodes and manipulation of properties.
    deduplicator (Deduplicator): Instance of :py:class:`Deduplicator` to enable deduplication
        of nodes and edges.
    output_directory (str, optional): Path for exporting CSV files. Defaults to None.
    strict_mode (bool, optional): Whether to enforce source, version, and license properties. Defaults to False.

Raises:
    NotImplementedError: Writer implementation must override '_write_node_data'
    NotImplementedError: Writer implementation must override '_write_edge_data'
    NotImplementedError: Writer implementation must override '_construct_import_call'
    NotImplementedError: Writer implementation must override '_get_import_script_name'
Source code in biocypher/output/write/_writer.py
class _Writer(ABC):
    """Abstract class for writing node and edge representations to disk.
    Specifics of the different writers (e.g. neo4j, postgresql, csv, etc.)
    are implemented in the child classes. Any concrete writer needs to
    implement at least:
    - _write_node_data
    - _write_edge_data
    - _construct_import_call
    - _get_import_script_name

    Args:
    ----
        translator (Translator): Instance of :py:class:`Translator` to enable translation of
            nodes and manipulation of properties.
        deduplicator (Deduplicator): Instance of :py:class:`Deduplicator` to enable deduplication
            of nodes and edges.
        output_directory (str, optional): Path for exporting CSV files. Defaults to None.
        strict_mode (bool, optional): Whether to enforce source, version, and license properties. Defaults to False.

    Raises:
    ------
        NotImplementedError: Writer implementation must override '_write_node_data'
        NotImplementedError: Writer implementation must override '_write_edge_data'
        NotImplementedError: Writer implementation must override '_construct_import_call'
        NotImplementedError: Writer implementation must override '_get_import_script_name'

    """

    def __init__(
        self,
        translator: Translator,
        deduplicator: Deduplicator,
        output_directory: str | None = None,
        strict_mode: bool = False,
        *args,
        **kwargs,
    ):
        """Abstract class for writing node and edge representations to disk.

        Args:
        ----
            translator (Translator): Instance of :py:class:`Translator` to enable translation of
                nodes and manipulation of properties.
            deduplicator (Deduplicator): Instance of :py:class:`Deduplicator` to enable deduplication
                of nodes and edges.
            output_directory (str, optional): Path for exporting CSV files. Defaults to None.
            strict_mode (bool, optional): Whether to enforce source, version, and license properties. Defaults to False.

        """
        self.translator = translator
        self.deduplicator = deduplicator
        self.strict_mode = strict_mode
        self.output_directory = output_directory

        if os.path.exists(self.output_directory):
            if kwargs.get("write_to_file", True):
                logger.warning(
                    f"Output directory `{self.output_directory}` already exists. "
                    "If this is not planned, file consistency may be compromised.",
                )
        else:
            logger.info(f"Creating output directory `{self.output_directory}`.")
            os.makedirs(self.output_directory)

    @abstractmethod
    def _write_node_data(
        self,
        nodes: Iterable[BioCypherNode | BioCypherEdge | BioCypherRelAsNode],
    ) -> bool:
        """Implement how to output.write nodes to disk.

        Args:
        ----
            nodes (Iterable): An iterable of BioCypherNode / BioCypherEdge / BioCypherRelAsNode objects.

        Returns:
        -------
            bool: The return value. True for success, False otherwise.

        """
        raise NotImplementedError("Writer implementation must override 'write_nodes'")

    @abstractmethod
    def _write_edge_data(
        self,
        edges: Iterable[BioCypherNode | BioCypherEdge | BioCypherRelAsNode],
    ) -> bool:
        """Implement how to output.write edges to disk.

        Args:
        ----
            edges (Iterable): An iterable of BioCypherNode / BioCypherEdge / BioCypherRelAsNode objects.

        Returns:
        -------
            bool: The return value. True for success, False otherwise.

        """
        raise NotImplementedError("Writer implementation must override 'write_edges'")

    @abstractmethod
    def _construct_import_call(self) -> str:
        """Function to construct the import call detailing folder and
        individual node and edge headers and data files, as well as
        delimiters and database name. Built after all data has been
        processed to ensure that nodes are called before any edges.

        Returns
        -------
            str: command for importing the output files into a DBMS.

        """
        raise NotImplementedError("Writer implementation must override '_construct_import_call'")

    @abstractmethod
    def _get_import_script_name(self) -> str:
        """Returns the name of the import script.

        Returns
        -------
            str: The name of the import script (ending in .sh)

        """
        raise NotImplementedError("Writer implementation must override '_get_import_script_name'")

    def write_nodes(self, nodes, batch_size: int = int(1e6), force: bool = False):
        """Wrapper for writing nodes.

        Args:
        ----
            nodes (BioCypherNode): a list or generator of nodes in
                :py:class:`BioCypherNode` format
            batch_size (int): The batch size for writing nodes.
            force (bool): Whether to force writing nodes even if their type is
                not present in the schema.

        Returns:
        -------
            bool: The return value. True for success, False otherwise.

        """
        passed = self._write_node_data(nodes)
        if not passed:
            logger.error("Error while writing node data.")
            return False
        return True

    def write_edges(self, edges, batch_size: int = int(1e6), force: bool = False):
        """Wrapper for writing edges.

        Args:
        ----
            edges (BioCypherEdge): a list or generator of edges in
                :py:class:`BioCypherEdge` format
            batch_size (int): The batch size for writing edges.
            force (bool): Whether to force writing edges even if their type is
                not present in the schema.

        Returns:
        -------
            bool: The return value. True for success, False otherwise.

        """
        passed = self._write_edge_data(edges)
        if not passed:
            logger.error("Error while writing edge data.")
            return False
        return True

    def write_import_call(self):
        """Function to output.write the import call detailing folder and
        individual node and edge headers and data files, as well as
        delimiters and database name, to the export folder as txt.

        Returns
        -------
            str: The path of the file holding the import call.

        """
        file_path = os.path.join(self.output_directory, self._get_import_script_name())
        logger.info(f"Writing {self.__class__.__name__} import call to `{file_path}`.")

        with open(file_path, "w", encoding="utf-8") as f:
            f.write(self._construct_import_call())

        return file_path

__init__(translator, deduplicator, output_directory=None, strict_mode=False, *args, **kwargs)

Abstract class for writing node and edge representations to disk.


Args:
    translator (Translator): Instance of :py:class:`Translator` to enable translation of
        nodes and manipulation of properties.
    deduplicator (Deduplicator): Instance of :py:class:`Deduplicator` to enable deduplication
        of nodes and edges.
    output_directory (str, optional): Path for exporting CSV files. Defaults to None.
    strict_mode (bool, optional): Whether to enforce source, version, and license properties. Defaults to False.
Source code in biocypher/output/write/_writer.py
def __init__(
    self,
    translator: Translator,
    deduplicator: Deduplicator,
    output_directory: str | None = None,
    strict_mode: bool = False,
    *args,
    **kwargs,
):
    """Abstract class for writing node and edge representations to disk.

    Args:
    ----
        translator (Translator): Instance of :py:class:`Translator` to enable translation of
            nodes and manipulation of properties.
        deduplicator (Deduplicator): Instance of :py:class:`Deduplicator` to enable deduplication
            of nodes and edges.
        output_directory (str, optional): Path for exporting CSV files. Defaults to None.
        strict_mode (bool, optional): Whether to enforce source, version, and license properties. Defaults to False.

    """
    self.translator = translator
    self.deduplicator = deduplicator
    self.strict_mode = strict_mode
    self.output_directory = output_directory

    if os.path.exists(self.output_directory):
        if kwargs.get("write_to_file", True):
            logger.warning(
                f"Output directory `{self.output_directory}` already exists. "
                "If this is not planned, file consistency may be compromised.",
            )
    else:
        logger.info(f"Creating output directory `{self.output_directory}`.")
        os.makedirs(self.output_directory)

_construct_import_call() abstractmethod

Function to construct the import call detailing folder and individual node and edge headers and data files, as well as delimiters and database name. Built after all data has been processed to ensure that nodes are called before any edges.

Returns
str: command for importing the output files into a DBMS.
Source code in biocypher/output/write/_writer.py
@abstractmethod
def _construct_import_call(self) -> str:
    """Function to construct the import call detailing folder and
    individual node and edge headers and data files, as well as
    delimiters and database name. Built after all data has been
    processed to ensure that nodes are called before any edges.

    Returns
    -------
        str: command for importing the output files into a DBMS.

    """
    raise NotImplementedError("Writer implementation must override '_construct_import_call'")

_get_import_script_name() abstractmethod

Returns the name of the import script.

Returns
str: The name of the import script (ending in .sh)
Source code in biocypher/output/write/_writer.py
@abstractmethod
def _get_import_script_name(self) -> str:
    """Returns the name of the import script.

    Returns
    -------
        str: The name of the import script (ending in .sh)

    """
    raise NotImplementedError("Writer implementation must override '_get_import_script_name'")

_write_edge_data(edges) abstractmethod

Implement how to write edges to disk.

Args:
    edges (Iterable): An iterable of BioCypherNode / BioCypherEdge / BioCypherRelAsNode objects.

Returns:
    bool: The return value. True for success, False otherwise.
Source code in biocypher/output/write/_writer.py
@abstractmethod
def _write_edge_data(
    self,
    edges: Iterable[BioCypherNode | BioCypherEdge | BioCypherRelAsNode],
) -> bool:
    """Implement how to output.write edges to disk.

    Args:
    ----
        edges (Iterable): An iterable of BioCypherNode / BioCypherEdge / BioCypherRelAsNode objects.

    Returns:
    -------
        bool: The return value. True for success, False otherwise.

    """
    raise NotImplementedError("Writer implementation must override 'write_edges'")

_write_node_data(nodes) abstractmethod

Implement how to write nodes to disk.

Args:
    nodes (Iterable): An iterable of BioCypherNode / BioCypherEdge / BioCypherRelAsNode objects.

Returns:
    bool: The return value. True for success, False otherwise.
Source code in biocypher/output/write/_writer.py
@abstractmethod
def _write_node_data(
    self,
    nodes: Iterable[BioCypherNode | BioCypherEdge | BioCypherRelAsNode],
) -> bool:
    """Implement how to output.write nodes to disk.

    Args:
    ----
        nodes (Iterable): An iterable of BioCypherNode / BioCypherEdge / BioCypherRelAsNode objects.

    Returns:
    -------
        bool: The return value. True for success, False otherwise.

    """
    raise NotImplementedError("Writer implementation must override 'write_nodes'")

write_edges(edges, batch_size=int(1e6), force=False)

Wrapper for writing edges.


Args:
    edges (BioCypherEdge): a list or generator of edges in
        :py:class:`BioCypherEdge` format
    batch_size (int): The batch size for writing edges.
    force (bool): Whether to force writing edges even if their type is
        not present in the schema.

Returns:
    bool: The return value. True for success, False otherwise.
Source code in biocypher/output/write/_writer.py
def write_edges(self, edges, batch_size: int = int(1e6), force: bool = False):
    """Wrapper for writing edges.

    Args:
    ----
        edges (BioCypherEdge): a list or generator of edges in
            :py:class:`BioCypherEdge` format
        batch_size (int): The batch size for writing edges.
        force (bool): Whether to force writing edges even if their type is
            not present in the schema.

    Returns:
    -------
        bool: The return value. True for success, False otherwise.

    """
    passed = self._write_edge_data(edges)
    if not passed:
        logger.error("Error while writing edge data.")
        return False
    return True

write_import_call()

Function to write the import call, detailing folder and individual node and edge headers and data files, as well as delimiters and database name, to the export folder as txt.

Returns
str: The path of the file holding the import call.
Source code in biocypher/output/write/_writer.py
def write_import_call(self):
    """Function to output.write the import call detailing folder and
    individual node and edge headers and data files, as well as
    delimiters and database name, to the export folder as txt.

    Returns
    -------
        str: The path of the file holding the import call.

    """
    file_path = os.path.join(self.output_directory, self._get_import_script_name())
    logger.info(f"Writing {self.__class__.__name__} import call to `{file_path}`.")

    with open(file_path, "w", encoding="utf-8") as f:
        f.write(self._construct_import_call())

    return file_path
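
Usage sketch (hypothetical `writer` instance): the returned path points at the generated script inside the output directory, named by `_get_import_script_name()`.

script_path = writer.write_import_call()
# script_path can then be executed, e.g. via `bash <script_path>`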

write_nodes(nodes, batch_size=int(1e6), force=False)

Wrapper for writing nodes.


Args:
    nodes (BioCypherNode): a list or generator of nodes in
        :py:class:`BioCypherNode` format
    batch_size (int): The batch size for writing nodes.
    force (bool): Whether to force writing nodes even if their type is
        not present in the schema.

Returns:
    bool: The return value. True for success, False otherwise.
Source code in biocypher/output/write/_writer.py
def write_nodes(self, nodes, batch_size: int = int(1e6), force: bool = False):
    """Wrapper for writing nodes.

    Args:
    ----
        nodes (BioCypherNode): a list or generator of nodes in
            :py:class:`BioCypherNode` format
        batch_size (int): The batch size for writing nodes.
        force (bool): Whether to force writing nodes even if their type is
            not present in the schema.

    Returns:
    -------
        bool: The return value. True for success, False otherwise.

    """
    passed = self._write_node_data(nodes)
    if not passed:
        logger.error("Error while writing node data.")
        return False
    return True

Batch Writer Base Class

Bases: _Writer, ABC

Abstract batch writer class.

Source code in biocypher/output/write/_batch_writer.py
class _BatchWriter(_Writer, ABC):
    """Abstract batch writer class."""

    @abstractmethod
    def _quote_string(self, value: str) -> str:
        """Quote a string.

        Escaping is handled by the database-specific writer.
        """
        msg = "Database writer must override '_quote_string'"
        logger.error(msg)
        raise NotImplementedError(msg)

    @abstractmethod
    def _get_default_import_call_bin_prefix(self):
        """Provide the default string for the import call bin prefix.

        Returns
        -------
            str: The database-specific string for the path to the import call bin prefix

        """
        msg = "Database writer must override '_get_default_import_call_bin_prefix'"
        logger.error(msg)
        raise NotImplementedError(msg)

    @abstractmethod
    def _write_array_string(self, string_list):
        """Write the string representation of an array into a .csv file.

        Different databases require different formats of array to optimize
        import speed.

        Args:
        ----
            string_list (list): list of ontology strings

        Returns:
        -------
            str: The database-specific string representation of an array

        """
        msg = "Database writer must override '_write_array_string'"
        logger.error(msg)
        raise NotImplementedError(msg)

    @abstractmethod
    def _write_node_headers(self):
        """Write header files for nodes.

        Write header files (node properties) for nodes as per the
        definition in the `schema_config.yaml`.

        Returns
        -------
            bool: The return value. True for success, False otherwise.

        """
        msg = "Database writer must override '_write_node_headers'"
        logger.error(msg)
        raise NotImplementedError(msg)

    @abstractmethod
    def _write_edge_headers(self):
        """Write a database import-file for an edge.

        Write a database import-file for an edge as per the definition in
        the `schema_config.yaml`, containing only the header for this type
        of edge.

        Returns
        -------
            bool: The return value. True for success, False otherwise.

        """
        msg = "Database writer must override '_write_edge_headers'"
        logger.error(msg)
        raise NotImplementedError(msg)

    @abstractmethod
    def _construct_import_call(self) -> str:
        """Construct the import call.

        Construct the import call detailing folder and individual node and
        edge headers and data files, as well as delimiters and database name.
        Built after all data has been processed to ensure that nodes are
        called before any edges.

        Returns
        -------
            str: A bash command for csv import.

        """
        msg = "Database writer must override '_construct_import_call'"
        logger.error(msg)
        raise NotImplementedError(msg)

    @abstractmethod
    def _get_import_script_name(self) -> str:
        """Return the name of the import script.

        The name will be chosen based on the used database.

        Returns
        -------
            str: The name of the import script (ending in .sh)

        """
        msg = "Database writer must override '_get_import_script_name'"
        logger.error(msg)
        raise NotImplementedError(msg)

    def __init__(
        self,
        translator: "Translator",
        deduplicator: "Deduplicator",
        delimiter: str,
        array_delimiter: str = ",",
        quote: str = '"',
        output_directory: str | None = None,
        db_name: str = "neo4j",
        import_call_bin_prefix: str | None = None,
        import_call_file_prefix: str | None = None,
        wipe: bool = True,
        strict_mode: bool = False,
        skip_bad_relationships: bool = False,
        skip_duplicate_nodes: bool = False,
        db_user: str = None,
        db_password: str = None,
        db_host: str = None,
        db_port: str = None,
        file_format: str = None,
        rdf_namespaces: dict = {},
        labels_order: str = "Ascending",
        **kwargs,
    ):
        """Write node and edge representations to disk.

        Abstract parent class for writing node and edge representations to disk
        using the format specified by each database type. The database-specific
        functions are implemented by the respective child-classes. This abstract
        class contains all methods expected by a batch writer instance, some of
        which need to be overwritten by the child classes.

        Each batch writer instance has a fixed representation that needs to be
        passed at instantiation via the :py:attr:`schema` argument. The instance
        also expects an ontology adapter via :py:attr:`ontology_adapter` to be
        able to convert and extend the hierarchy.

        Requires the following methods to be overwritten by database-specific
        writer classes:

            - _write_node_headers
            - _write_edge_headers
            - _construct_import_call
            - _write_array_string
            - _get_import_script_name

        Args:
        ----
            translator:
                Instance of :py:class:`Translator` to enable translation of
                nodes and manipulation of properties.

            deduplicator:
                Instance of :py:class:`Deduplicator` to enable deduplication
                of nodes and edges.

            delimiter:
                The delimiter to use for the CSV files.

            array_delimiter:
                The delimiter to use for array properties.

            quote:
                The quote character to use for the CSV files.

            output_directory:
                Path for exporting CSV files.

            db_name:
                Name of the database that will be used in the generated
                commands.

            import_call_bin_prefix:
                Path prefix for the admin import call binary.

            import_call_file_prefix:
                Path prefix for the data files (headers and parts) in the import
                call.

            wipe:
                Whether to force import (removing existing DB content).
                    (Specific to Neo4j.)

            strict_mode:
                Whether to enforce source, version, and license properties.

            skip_bad_relationships:
                Whether to skip relationships that do not have a valid
                start and end node. (Specific to Neo4j.)

            skip_duplicate_nodes:
                Whether to skip duplicate nodes. (Specific to Neo4j.)

            db_user:
                The database user.

            db_password:
                The database password.

            db_host:
                The database host. Defaults to localhost.

            db_port:
                The database port.

            file_format:
                The format of RDF.

            rdf_namespaces:
                The namespaces for RDF.

            labels_order:
                The order of labels, to reflect the hierarchy (or not).
                Default: "Ascending" (from more specific to more generic).

        """
        super().__init__(
            translator=translator,
            deduplicator=deduplicator,
            output_directory=output_directory,
            strict_mode=strict_mode,
        )
        self.db_name = db_name
        self.db_user = db_user
        self.db_password = db_password
        self.db_host = db_host or "localhost"
        self.db_port = db_port
        self.file_format = file_format
        self.rdf_namespaces = rdf_namespaces

        self.delim, self.escaped_delim = self._process_delimiter(delimiter)
        self.adelim, self.escaped_adelim = self._process_delimiter(array_delimiter)
        self.quote = quote
        self.skip_bad_relationships = skip_bad_relationships
        self.skip_duplicate_nodes = skip_duplicate_nodes

        if import_call_bin_prefix is None:
            self.import_call_bin_prefix = self._get_default_import_call_bin_prefix()
        else:
            self.import_call_bin_prefix = import_call_bin_prefix

        self.wipe = wipe
        self.strict_mode = strict_mode

        self.translator = translator
        self.deduplicator = deduplicator
        self.node_property_dict = {}
        self.edge_property_dict = {}
        self.import_call_nodes = set()
        self.import_call_edges = set()

        self.outdir = output_directory

        self._import_call_file_prefix = import_call_file_prefix

        self.parts = {}  # dict to store the paths of part files for each label

        self._labels_orders = ["Alphabetical", "Ascending", "Descending", "Leaves"]
        if labels_order not in self._labels_orders:
            msg = (
                f"neo4j's 'labels_order' parameter cannot be '{labels_order}', "
                f"must be one of: {', '.join(self._labels_orders)}"
            )
            raise ValueError(msg)
        self.labels_order = labels_order

        # TODO not memory efficient, but should be fine for most cases; is
        # there a more elegant solution?

    @property
    def import_call_file_prefix(self):
        """Property for output directory path."""
        if self._import_call_file_prefix is None:
            return self.outdir
        else:
            return self._import_call_file_prefix

    def _process_delimiter(self, delimiter: str) -> tuple[str, str]:
        """Process a delimiter so it is escaped correctly.

        Args:
        ----
            delimiter (str): The delimiter to process.

        Returns:
        -------
            tuple: The delimiter and its escaped representation.

        """
        if delimiter == "\\t":
            return "\t", "\\t"

        else:
            return delimiter, delimiter
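
    # Example behaviour of the method above: a literal "\t" from the config
    # becomes a real tab, with the escaped form kept for generated import
    # commands; any other delimiter is returned unchanged in both positions:
    #   _process_delimiter("\\t") -> ("\t", "\\t")
    #   _process_delimiter(";")   -> (";", ";")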

    def write_nodes(self, nodes, batch_size: int = int(1e6), force: bool = False):
        """Write nodes and their headers.

        Args:
        ----
            nodes (BioCypherNode): a list or generator of nodes in
                :py:class:`BioCypherNode` format

            batch_size (int): The batch size for writing nodes.

            force (bool): Whether to force writing nodes even if their type is
                not present in the schema.


        Returns:
        -------
            bool: The return value. True for success, False otherwise.

        """
        # TODO check represented_as

        # write node data
        passed = self._write_node_data(nodes, batch_size, force)
        if not passed:
            logger.error("Error while writing node data.")
            return False
        # pass property data to header writer per node type written
        passed = self._write_node_headers()
        if not passed:
            logger.error("Error while writing node headers.")
            return False

        return True

    def write_edges(
        self,
        edges: list | GeneratorType,
        batch_size: int = int(1e6),
    ) -> bool:
        """Write edges and their headers.

        Args:
        ----
            edges (BioCypherEdge): a list or generator of edges in
                :py:class:`BioCypherEdge` or :py:class:`BioCypherRelAsNode`
                format

        Returns:
        -------
            bool: The return value. True for success, False otherwise.

        """
        passed = False
        edges = list(edges)  # force evaluation to handle empty generator
        if edges:
            nodes_flat = []
            edges_flat = []
            for edge in edges:
                if isinstance(edge, BioCypherRelAsNode):
                    # check if relationship has already been written, if so skip
                    if self.deduplicator.rel_as_node_seen(edge):
                        continue

                    nodes_flat.append(edge.get_node())
                    edges_flat.append(edge.get_source_edge())
                    edges_flat.append(edge.get_target_edge())

                else:
                    # check if relationship has already been written, if so skip
                    if self.deduplicator.edge_seen(edge):
                        continue

                    edges_flat.append(edge)

            if nodes_flat and edges_flat:
                passed = self.write_nodes(nodes_flat) and self._write_edge_data(
                    edges_flat,
                    batch_size,
                )
            else:
                passed = self._write_edge_data(edges_flat, batch_size)

        else:
            # is this a problem? if the generator or list is empty, we
            # don't write anything.
            logger.debug(
                "No edges to write, possibly due to no matched Biolink classes.",
            )

        if not passed:
            logger.error("Error while writing edge data.")
            return False
        # pass property data to header writer per edge type written
        passed = self._write_edge_headers()
        if not passed:
            logger.error("Error while writing edge headers.")
            return False

        return True

    def _write_node_data(self, nodes, batch_size, force: bool = False):
        """Write biocypher nodes to CSV.

        Conforms to the headers created with `_write_node_headers()`, and
        is actually required to be run before calling `_write_node_headers()`
        to set the :py:attr:`self.node_property_dict` for passing the node
        properties to the instance. Expects list or generator of nodes from
        the :py:class:`BioCypherNode` class.

        Args:
        ----
            nodes (BioCypherNode): a list or generator of nodes in
                :py:class:`BioCypherNode` format

        Returns:
        -------
            bool: The return value. True for success, False otherwise.

        """
        if isinstance(nodes, GeneratorType | peekable):
            logger.debug("Writing node CSV from generator.")

            bins = defaultdict(list)  # dict to store a list for each
            # label that is passed in
            bin_l = {}  # dict to store the length of each list for
            # batching cutoff
            reference_props = defaultdict(
                dict,
            )  # dict to store a dict of properties
            # for each label to check for consistency and their type
            # for now, relevant for `int`
            labels = {}  # dict to store the additional labels for each
            # primary graph constituent from biolink hierarchy
            for node in nodes:
                # check if node has already been written, if so skip
                if self.deduplicator.node_seen(node):
                    continue

                _id = node.get_id()
                label = node.get_label()

                # check for non-id
                if not _id:
                    logger.warning(f"Node {label} has no id; skipping.")
                    continue

                if label not in bins.keys():
                    # start new list
                    all_labels = None
                    bins[label].append(node)
                    bin_l[label] = 1

                    # get properties from config if present
                    if label in self.translator.ontology.mapping.extended_schema:
                        cprops = self.translator.ontology.mapping.extended_schema.get(label).get(
                            "properties",
                        )
                    else:
                        cprops = None
                    if cprops:
                        d = dict(cprops)

                        # add id and preferred id to properties; these are
                        # created in node creation (`_create.BioCypherNode`)
                        d["id"] = "str"
                        d["preferred_id"] = "str"

                        # add strict mode properties
                        if self.strict_mode:
                            d["source"] = "str"
                            d["version"] = "str"
                            d["licence"] = "str"

                    else:
                        d = dict(node.get_properties())
                        # encode property type
                        for k, v in d.items():
                            if d[k] is not None:
                                d[k] = type(v).__name__
                    # else use first encountered node to define properties for
                    # checking; could later be by checking all nodes but much
                    # more complicated, particularly involving batch writing
                    # (would require "do-overs"). for now, we output a warning
                    # if node properties diverge from reference properties (in
                    # write_single_node_list_to_file) TODO if it occurs, ask
                    # user to select desired properties and restart the process

                    reference_props[label] = d

                    # get label hierarchy
                    # multiple labels:
                    if not force:
                        all_labels = self.translator.ontology.get_ancestors(label)
                    else:
                        all_labels = None

                    if all_labels:
                        # convert to pascal case
                        all_labels = [self.translator.name_sentence_to_pascal(label) for label in all_labels]
                        # remove duplicates
                        all_labels = list(OrderedDict.fromkeys(all_labels))
                        match self.labels_order:
                            case "Ascending":
                                pass  # Default from get_ancestors.
                            case "Alphabetical":
                                all_labels.sort()
                            case "Descending":
                                all_labels.reverse()
                            case "Leaves":
                                if len(all_labels) < 1:
                                    msg = "Labels list cannot be empty when using 'Leaves' order."
                                    raise ValueError(msg)
                                all_labels = [all_labels[0]]
                            case _:
                                # In case someone touched _label_orders after constructor.
                                if self.labels_order not in self._labels_orders:
                                    msg = (
                                        f"Invalid labels_order: {self.labels_order}. "
                                        f"Must be one of {self._labels_orders}"
                                    )
                                    raise ValueError(msg)
                        # concatenate with array delimiter
                        all_labels = self._write_array_string(all_labels)
                    else:
                        all_labels = self.translator.name_sentence_to_pascal(label)

                    labels[label] = all_labels

                else:
                    # add to list
                    bins[label].append(node)
                    bin_l[label] += 1
                    if not bin_l[label] < batch_size:
                        # batch size controlled here
                        passed = self._write_single_node_list_to_file(
                            bins[label],
                            label,
                            reference_props[label],
                            labels[label],
                        )

                        if not passed:
                            return False

                        bins[label] = []
                        bin_l[label] = 0

            # after generator depleted, write remainder of bins
            for label, nl in bins.items():
                passed = self._write_single_node_list_to_file(
                    nl,
                    label,
                    reference_props[label],
                    labels[label],
                )

                if not passed:
                    return False

            # use complete bin list to write header files
            # TODO if a node type has varying properties
            # (ie missingness), we'd need to collect all possible
            # properties in the generator pass

            # save config or first-node properties to instance attribute
            for label in reference_props.keys():
                self.node_property_dict[label] = reference_props[label]

            return True
        elif not isinstance(nodes, list):
            logger.error("Nodes must be passed as list or generator.")
            return False
        else:

            def gen(nodes):
                yield from nodes

            return self._write_node_data(gen(nodes), batch_size=batch_size)

    def _write_single_node_list_to_file(
        self,
        node_list: list,
        label: str,
        prop_dict: dict,
        labels: str,
    ):
        """Write a list of biocypher nodes to a CSV file.

        This function takes one list of biocypher nodes and writes them
        to a Neo4j admin import compatible CSV file.

        Args:
        ----
            node_list (list): list of BioCypherNodes to be written
            label (str): the primary label of the node
            prop_dict (dict): properties of node class passed from parsing
                function and their types
            labels (str): string of one or several concatenated labels
                for the node class

        Returns:
        -------
            bool: The return value. True for success, False otherwise.

        """
        if not all(isinstance(n, BioCypherNode) for n in node_list):
            logger.error("Nodes must be passed as type BioCypherNode.")
            return False

        # from list of nodes to list of strings
        lines = []

        for n in node_list:
            # check for deviations in properties
            # node properties
            n_props = n.get_properties()
            n_keys = list(n_props.keys())
            # reference properties
            ref_props = list(prop_dict.keys())

            # compare lists order invariant
            if set(ref_props) != set(n_keys):
                onode = n.get_id()
                oprop1 = set(ref_props).difference(n_keys)
                oprop2 = set(n_keys).difference(ref_props)
                logger.error(
                    f"At least one node of the class {n.get_label()} "
                    f"has more or fewer properties than another. "
                    f"Offending node: {onode!r}, offending property: "
                    f"{max([oprop1, oprop2])}. "
                    f"All reference properties: {ref_props}, "
                    f"All node properties: {n_keys}.",
                )
                return False

            line = [n.get_id()]

            if ref_props:
                plist = []
                # make all into strings, put actual strings in quotes
                for k, v in prop_dict.items():
                    p = n_props.get(k)
                    if p is None:  # TODO make field empty instead of ""?
                        plist.append("")
                    elif v in [
                        "int",
                        "integer",
                        "long",
                        "float",
                        "double",
                        "dbl",
                        "bool",
                        "boolean",
                    ]:
                        plist.append(str(p))
                    elif isinstance(p, list):
                        plist.append(self._write_array_string(p))
                    else:
                        plist.append(f"{self.quote}{p!s}{self.quote}")

                line.append(self.delim.join(plist))
            line.append(labels)

            lines.append(self.delim.join(line) + "\n")

        # avoid writing empty files
        if lines:
            self._write_next_part(label, lines)

        return True
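
    # Line shape written above (hypothetical values; delimiter ";", array
    # delimiter "|", quote '"'): <id>;<properties...>;<labels>, e.g.
    #   uniprot:P12345;"TP53";1;Protein|Polypeptide|NamedThing
    # where the exact label concatenation depends on _write_array_string.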

    def _write_edge_data(self, edges, batch_size):
        """Write biocypher edges to CSV.

        Writes biocypher edges to CSV conforming to the headers created
        with `_write_edge_headers()`, and is actually required to be run
        before calling `_write_node_headers()` to set the
        :py:attr:`self.edge_property_dict` for passing the edge
        properties to the instance. Expects list or generator of edges
        from the :py:class:`BioCypherEdge` class.

        Args:
        ----
            edges (BioCypherEdge): a list or generator of edges in
                :py:class:`BioCypherEdge` format

        Returns:
        -------
            bool: The return value. True for success, False otherwise.

        Todo:
        ----
            - currently works for mixed edges but in practice often is
              called on one iterable containing one type of edge only

        """
        if isinstance(edges, GeneratorType):
            logger.debug("Writing edge CSV from generator.")

            bins = defaultdict(list)  # dict to store a list for each
            # label that is passed in
            bin_l = {}  # dict to store the length of each list for
            # batching cutoff
            reference_props = defaultdict(
                dict,
            )  # dict to store a dict of properties
            # for each label to check for consistency and their type
            # for now, relevant for `int`
            for edge in edges:
                if not (edge.get_source_id() and edge.get_target_id()):
                    logger.error(
                        f"Edge must have source and target node. Caused by: {edge}",
                    )
                    continue

                label = edge.get_label()

                if label not in bins.keys():
                    # start new list
                    bins[label].append(edge)
                    bin_l[label] = 1

                    # get properties from config if present

                    # check whether label is in ontology_adapter.leaves
                    # (may not be if it is an edge that carries the
                    # "label_as_edge" property)
                    cprops = None
                    if label in self.translator.ontology.mapping.extended_schema:
                        cprops = self.translator.ontology.mapping.extended_schema.get(label).get(
                            "properties",
                        )
                    else:
                        # try via "label_as_edge"
                        for (
                            k,
                            v,
                        ) in self.translator.ontology.mapping.extended_schema.items():
                            if isinstance(v, dict):
                                if v.get("label_as_edge") == label:
                                    cprops = v.get("properties")
                                    break
                    if cprops:
                        d = cprops

                        # add strict mode properties
                        if self.strict_mode:
                            d["source"] = "str"
                            d["version"] = "str"
                            d["licence"] = "str"

                    else:
                        d = dict(edge.get_properties())
                        # encode property type
                        for k, v in d.items():
                            if d[k] is not None:
                                d[k] = type(v).__name__
                    # else use first encountered edge to define
                    # properties for checking; could later be by
                    # checking all edges but much more complicated,
                    # particularly involving batch writing (would
                    # require "do-overs"). for now, we output a warning
                    # if edge properties diverge from reference
                    # properties (in write_single_edge_list_to_file)
                    # TODO

                    reference_props[label] = d

                else:
                    # add to list
                    bins[label].append(edge)
                    bin_l[label] += 1
                    if not bin_l[label] < batch_size:
                        # batch size controlled here
                        passed = self._write_single_edge_list_to_file(
                            bins[label],
                            label,
                            reference_props[label],
                        )

                        if not passed:
                            return False

                        bins[label] = []
                        bin_l[label] = 0

            # after generator depleted, write remainder of bins
            for label, nl in bins.items():
                passed = self._write_single_edge_list_to_file(
                    nl,
                    label,
                    reference_props[label],
                )

                if not passed:
                    return False

            # use complete bin list to write header files
            # TODO if a edge type has varying properties
            # (ie missingness), we'd need to collect all possible
            # properties in the generator pass

            # save first-edge properties to instance attribute
            for label in reference_props.keys():
                self.edge_property_dict[label] = reference_props[label]

            return True
        elif not isinstance(edges, list):
            logger.error("Edges must be passed as list or generator.")
            return False
        else:

            def gen(edges):
                yield from edges

            return self._write_edge_data(gen(edges), batch_size=batch_size)

    def _write_single_edge_list_to_file(
        self,
        edge_list: list,
        label: str,
        prop_dict: dict,
    ):
        """Write a list of biocypher edges to a CSV file.

        This function takes one list of biocypher edges and writes them
        to a Neo4j admin import compatible CSV file.

        Args:
        ----
            edge_list (list): list of BioCypherEdges to be written

            label (str): the label (type) of the edge

            prop_dict (dict): properties of node class passed from parsing
                function and their types

        Returns:
        -------
            bool: The return value. True for success, False otherwise.

        """
        if not all(isinstance(n, BioCypherEdge) for n in edge_list):
            logger.error("Edges must be passed as type BioCypherEdge.")
            return False

        # from list of edges to list of strings
        lines = []
        for e in edge_list:
            # check for deviations in properties
            # edge properties
            e_props = e.get_properties()
            e_keys = list(e_props.keys())
            ref_props = list(prop_dict.keys())

            # compare list order invariant
            if set(ref_props) != set(e_keys):
                oedge = f"{e.get_source_id()}-{e.get_target_id()}"
                oprop1 = set(ref_props).difference(e_keys)
                oprop2 = set(e_keys).difference(ref_props)
                logger.error(
                    f"At least one edge of the class {e.get_label()} "
                    f"has more or fewer properties than another. "
                    f"Offending edge: {oedge!r}, offending property: "
                    f"{max([oprop1, oprop2])}. "
                    f"All reference properties: {ref_props}, "
                    f"All edge properties: {e_keys}.",
                )
                return False

            plist = []
            # make all into strings, put actual strings in quotes
            for k, v in prop_dict.items():
                p = e_props.get(k)
                if p is None:  # TODO make field empty instead of ""?
                    plist.append("")
                elif v in [
                    "int",
                    "integer",
                    "long",
                    "float",
                    "double",
                    "dbl",
                    "bool",
                    "boolean",
                ]:
                    plist.append(str(p))
                elif isinstance(p, list):
                    plist.append(self._write_array_string(p))
                else:
                    plist.append(self.quote + str(p) + self.quote)

            entries = [e.get_source_id()]

            skip_id = False
            schema_label = None

            if label in ["IS_SOURCE_OF", "IS_TARGET_OF", "IS_PART_OF"]:
                skip_id = True
            elif not self.translator.ontology.mapping.extended_schema.get(label):
                # find label in schema by label_as_edge
                for (
                    k,
                    v,
                ) in self.translator.ontology.mapping.extended_schema.items():
                    if v.get("label_as_edge") == label:
                        schema_label = k
                        break
            else:
                schema_label = label

            if schema_label:
                if (
                    self.translator.ontology.mapping.extended_schema.get(
                        schema_label,
                    ).get("use_id")
                    == False  # noqa: E712 (seems to not work with 'not')
                ):
                    skip_id = True

            if not skip_id:
                entries.append(e.get_id() or "")

            if ref_props:
                entries.append(self.delim.join(plist))

            entries.append(e.get_target_id())
            entries.append(
                self.translator.name_sentence_to_pascal(
                    e.get_label(),
                ),
            )

            lines.append(
                self.delim.join(entries) + "\n",
            )

        # avoid writing empty files
        if lines:
            self._write_next_part(label, lines)

        return True

    def _write_next_part(self, label: str, lines: list):
        """Write a list of strings to a new part file.

        Args:
        ----
            label (str): the label (type) of the edge; internal
            representation sentence case -> needs to become PascalCase
            for disk representation

            lines (list): list of strings to be written

        Returns:
        -------
            bool: The return value. True for success, False otherwise.

        """
        # translate label to PascalCase
        label_pascal = self.translator.name_sentence_to_pascal(parse_label(label))

        # list files in self.outdir
        files = glob.glob(os.path.join(self.outdir, f"{label_pascal}-part*.csv"))
        # find file with highest part number
        if not files:
            next_part = 0

        else:
            next_part = (
                max(
                    [int(f.split(".")[-2].split("-")[-1].replace("part", "")) for f in files],
                )
                + 1
            )

        # write to file
        padded_part = str(next_part).zfill(3)
        logger.info(
            f"Writing {len(lines)} entries to {label_pascal}-part{padded_part}.csv",
        )

        # store name only in case import_call_file_prefix is set
        part = f"{label_pascal}-part{padded_part}.csv"
        file_path = os.path.join(self.outdir, part)

        with open(file_path, "w", encoding="utf-8") as f:
            # concatenate with delimiter
            f.writelines(lines)

        if not self.parts.get(label):
            self.parts[label] = [part]
        else:
            self.parts[label].append(part)

    def get_import_call(self) -> str:
        """Eeturn the import call.

        Return the import call detailing folder and individual node and
        edge headers and data files, as well as delimiters and database name.

        Returns
        -------
            str: a bash command for the database import

        """
        return self._construct_import_call()

    def write_import_call(self) -> str:
        """Write the import call.

        Function to write the import call detailing folder and
        individual node and edge headers and data files, as well as
        delimiters and database name, to the export folder as txt.

        Returns
        -------
            str: The path of the file holding the import call.

        """
        file_path = os.path.join(self.outdir, self._get_import_script_name())
        logger.info(f"Writing {self.db_name + ' ' if self.db_name else ''}import call to `{file_path}`.")

        with open(file_path, "w", encoding="utf-8") as f:
            f.write(self._construct_import_call())

        return file_path

import_call_file_prefix property

Path prefix for the data files (headers and parts) referenced in the import call; falls back to the output directory path if no prefix was set.
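
A minimal sketch of the assumed property logic (illustrative, not the verbatim source):

@property
def import_call_file_prefix(self):
    # Assumption: fall back to the output directory if no explicit
    # prefix was passed at instantiation.
    if self._import_call_file_prefix is None:
        return self.outdir
    return self._import_call_file_prefix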

__init__(translator, deduplicator, delimiter, array_delimiter=',', quote='"', output_directory=None, db_name='neo4j', import_call_bin_prefix=None, import_call_file_prefix=None, wipe=True, strict_mode=False, skip_bad_relationships=False, skip_duplicate_nodes=False, db_user=None, db_password=None, db_host=None, db_port=None, file_format=None, rdf_namespaces={}, labels_order='Ascending', **kwargs)

Write node and edge representations to disk.

Abstract parent class for writing node and edge representations to disk using the format specified by each database type. The database-specific functions are implemented by the respective child classes. This abstract class contains all methods expected by a batch writer instance, some of which need to be overwritten by the child classes.

Each batch writer instance has a fixed representation that needs to be passed at instantiation via the :py:attr:`schema` argument. The instance also expects an ontology adapter via :py:attr:`ontology_adapter` to be able to convert and extend the hierarchy.

Requires the following methods to be overwritten by database-specific writer classes:

- _write_node_headers
- _write_edge_headers
- _construct_import_call
- _write_array_string
- _get_import_script_name

translator:
    Instance of :py:class:`Translator` to enable translation of
    nodes and manipulation of properties.

deduplicator:
    Instance of :py:class:`Deduplicator` to enable deduplication
    of nodes and edges.

delimiter:
    The delimiter to use for the CSV files.

array_delimiter:
    The delimiter to use for array properties.

quote:
    The quote character to use for the CSV files.

output_directory:
    Path for exporting CSV files.

db_name:
    Name of the database that will be used in the generated
    commands.

import_call_bin_prefix:
    Path prefix for the admin import call binary.

import_call_file_prefix:
    Path prefix for the data files (headers and parts) in the import
    call.

wipe:
    Whether to force import (removing existing DB content).
        (Specific to Neo4j.)

strict_mode:
    Whether to enforce source, version, and license properties.

skip_bad_relationships:
    Whether to skip relationships that do not have a valid
    start and end node. (Specific to Neo4j.)

skip_duplicate_nodes:
    Whether to skip duplicate nodes. (Specific to Neo4j.)

db_user:
    The database user.

db_password:
    The database password.

db_host:
    The database host. Defaults to localhost.

db_port:
    The database port.

file_format:
    The format of RDF.

rdf_namespaces:
    The namespaces for RDF.

labels_order:
    The order of labels, to reflect the hierarchy (or not).
    Default: "Ascending" (from more specific to more generic).
Source code in biocypher/output/write/_batch_writer.py
def __init__(
    self,
    translator: "Translator",
    deduplicator: "Deduplicator",
    delimiter: str,
    array_delimiter: str = ",",
    quote: str = '"',
    output_directory: str | None = None,
    db_name: str = "neo4j",
    import_call_bin_prefix: str | None = None,
    import_call_file_prefix: str | None = None,
    wipe: bool = True,
    strict_mode: bool = False,
    skip_bad_relationships: bool = False,
    skip_duplicate_nodes: bool = False,
    db_user: str = None,
    db_password: str = None,
    db_host: str = None,
    db_port: str = None,
    file_format: str = None,
    rdf_namespaces: dict = {},
    labels_order: str = "Ascending",
    **kwargs,
):
    """Write node and edge representations to disk.

    Abstract parent class for writing node and edge representations to disk
    using the format specified by each database type. The database-specific
    functions are implemented by the respective child classes. This abstract
    class contains all methods expected by a batch writer instance, some of
    which need to be overwritten by the child classes.

    Each batch writer instance has a fixed representation that needs to be
    passed at instantiation via the :py:attr:`schema` argument. The instance
    also expects an ontology adapter via :py:attr:`ontology_adapter` to be
    able to convert and extend the hierarchy.

    Requires the following methods to be overwritten by database-specific
    writer classes:

        - _write_node_headers
        - _write_edge_headers
        - _construct_import_call
        - _write_array_string
        - _get_import_script_name

    Args:
    ----
        translator:
            Instance of :py:class:`Translator` to enable translation of
            nodes and manipulation of properties.

        deduplicator:
            Instance of :py:class:`Deduplicator` to enable deduplication
            of nodes and edges.

        delimiter:
            The delimiter to use for the CSV files.

        array_delimiter:
            The delimiter to use for array properties.

        quote:
            The quote character to use for the CSV files.

        output_directory:
            Path for exporting CSV files.

        db_name:
            Name of the database that will be used in the generated
            commands.

        import_call_bin_prefix:
            Path prefix for the admin import call binary.

        import_call_file_prefix:
            Path prefix for the data files (headers and parts) in the import
            call.

        wipe:
            Whether to force import (removing existing DB content).
                (Specific to Neo4j.)

        strict_mode:
            Whether to enforce source, version, and license properties.

        skip_bad_relationships:
            Whether to skip relationships that do not have a valid
            start and end node. (Specific to Neo4j.)

        skip_duplicate_nodes:
            Whether to skip duplicate nodes. (Specific to Neo4j.)

        db_user:
            The database user.

        db_password:
            The database password.

        db_host:
            The database host. Defaults to localhost.

        db_port:
            The database port.

        file_format:
            The format of RDF.

        rdf_namespaces:
            The namespaces for RDF.

        labels_order:
            The order of labels, to reflect the hierarchy (or not).
            Default: "Ascending" (from more specific to more generic).

    """
    super().__init__(
        translator=translator,
        deduplicator=deduplicator,
        output_directory=output_directory,
        strict_mode=strict_mode,
    )
    self.db_name = db_name
    self.db_user = db_user
    self.db_password = db_password
    self.db_host = db_host or "localhost"
    self.db_port = db_port
    self.file_format = file_format
    self.rdf_namespaces = rdf_namespaces

    self.delim, self.escaped_delim = self._process_delimiter(delimiter)
    self.adelim, self.escaped_adelim = self._process_delimiter(array_delimiter)
    self.quote = quote
    self.skip_bad_relationships = skip_bad_relationships
    self.skip_duplicate_nodes = skip_duplicate_nodes

    if import_call_bin_prefix is None:
        self.import_call_bin_prefix = self._get_default_import_call_bin_prefix()
    else:
        self.import_call_bin_prefix = import_call_bin_prefix

    self.wipe = wipe
    self.strict_mode = strict_mode

    self.translator = translator
    self.deduplicator = deduplicator
    self.node_property_dict = {}
    self.edge_property_dict = {}
    self.import_call_nodes = set()
    self.import_call_edges = set()

    self.outdir = output_directory

    self._import_call_file_prefix = import_call_file_prefix

    self.parts = {}  # dict to store the paths of part files for each label

    self._labels_orders = ["Alphabetical", "Ascending", "Descending", "Leaves"]
    if labels_order not in self._labels_orders:
        msg = (
            f"neo4j's 'labels_order' parameter cannot be '{labels_order}', "
            f"must be one of: {', '.join(self._labels_orders)}"
        )
        raise ValueError(msg)
    self.labels_order = labels_order
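
A minimal usage sketch: since _BatchWriter is abstract, a concrete child class such as the Neo4j writer is instantiated. The translator and deduplicator objects are assumed to be pre-configured.

from biocypher.output.write.graph._neo4j import _Neo4jBatchWriter

writer = _Neo4jBatchWriter(
    translator=translator,      # assumed pre-configured Translator
    deduplicator=deduplicator,  # assumed pre-configured Deduplicator
    delimiter=";",
    array_delimiter="|",
    quote='"',
    output_directory="biocypher-out/example",
    db_name="neo4j",
    labels_order="Ascending",
)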

_construct_import_call() abstractmethod

Construct the import call.

Construct the import call detailing folder and individual node and edge headers and data files, as well as delimiters and database name. Built after all data has been processed to ensure that nodes are called before any edges.

Returns
str: A bash command for csv import.
Source code in biocypher/output/write/_batch_writer.py
@abstractmethod
def _construct_import_call(self) -> str:
    """Construct the import call.

    Construct the import call detailing folder and individual node and
    edge headers and data files, as well as delimiters and database name.
    Built after all data has been processed to ensure that nodes are
    called before any edges.

    Returns
    -------
        str: A bash command for csv import.

    """
    msg = "Database writer must override '_construct_import_call'"
    logger.error(msg)
    raise NotImplementedError(msg)
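
Since this is the first of several abstract hooks, a skeletal custom writer may help orient; this is a hypothetical sketch assuming only the documented abstract methods need concrete implementations.

class _PlainCsvBatchWriter(_BatchWriter):
    """Hypothetical minimal writer illustrating the required overrides."""

    def _get_default_import_call_bin_prefix(self):
        return ""  # no admin binary needed for plain CSV

    def _quote_string(self, value: str) -> str:
        return f"{self.quote}{value}{self.quote}"

    def _write_array_string(self, string_list):
        return self._quote_string(self.adelim.join(string_list))

    def _write_node_headers(self):
        return True  # write one header file per node label here

    def _write_edge_headers(self):
        return True  # write one header file per edge label here

    def _construct_import_call(self) -> str:
        return ""  # nothing to call for plain CSV

    def _get_import_script_name(self) -> str:
        return "import.sh"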

_get_default_import_call_bin_prefix() abstractmethod

Provide the default string for the import call bin prefix.

Returns
str: The database-specific string for the path to the import call bin prefix
Source code in biocypher/output/write/_batch_writer.py
@abstractmethod
def _get_default_import_call_bin_prefix(self):
    """Provide the default string for the import call bin prefix.

    Returns
    -------
        str: The database-specific string for the path to the import call bin prefix

    """
    msg = "Database writer must override '_get_default_import_call_bin_prefix'"
    logger.error(msg)
    raise NotImplementedError(msg)

_get_import_script_name() abstractmethod

Return the name of the import script.

The name will be chosen based on the used database.

Returns
str: The name of the import script (ending in .sh)
Source code in biocypher/output/write/_batch_writer.py
@abstractmethod
def _get_import_script_name(self) -> str:
    """Return the name of the import script.

    The name will be chosen based on the used database.

    Returns
    -------
        str: The name of the import script (ending in .sh)

    """
    msg = "Database writer must override '_get_import_script_name'"
    logger.error(msg)
    raise NotImplementedError(msg)

_process_delimiter(delimiter)

Process a delimiter to escape it correctly.


delimiter (str): The delimiter to process.

tuple: The delimiter and its escaped representation.
Source code in biocypher/output/write/_batch_writer.py
def _process_delimiter(self, delimiter: str) -> tuple[str, str]:
    """Process a delimiter to escape it correctly.

    Args:
    ----
        delimiter (str): The delimiter to process.

    Returns:
    -------
        tuple: The delimiter and its escaped representation.

    """
    if delimiter == "\\t":
        return "\t", "\\t"

    else:
        return delimiter, delimiter
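
For example, a tab delimiter configured as the escaped two-character string is unescaped for writing, while other delimiters pass through unchanged:

delim, escaped_delim = self._process_delimiter("\\t")
# delim == "\t" (actual tab, used to join fields in the CSV files)
# escaped_delim == "\\t" (used verbatim in the generated import command)

delim, escaped_delim = self._process_delimiter(";")
# delim == escaped_delim == ";"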

_quote_string(value) abstractmethod

Quote a string.

Escaping is handled by the database-specific writer.

Source code in biocypher/output/write/_batch_writer.py
@abstractmethod
def _quote_string(self, value: str) -> str:
    """Quote a string.

    Escaping is handled by the database-specific writer.
    """
    msg = "Database writer must override '_quote_string'"
    logger.error(msg)
    raise NotImplementedError(msg)

_write_array_string(string_list) abstractmethod

Write the string representation of an array into a .csv file.

Different databases require different formats of array to optimize import speed.


string_list (list): list of ontology strings

str: The database-specific string representation of an array
Source code in biocypher/output/write/_batch_writer.py
@abstractmethod
def _write_array_string(self, string_list):
    """Write the string representation of an array into a .csv file.

    Different databases require different formats of array to optimize
    import speed.

    Args:
    ----
        string_list (list): list of ontology strings

    Returns:
    -------
        str: The database-specific string representation of an array

    """
    msg = "Database writer must override '_write_array_string'"
    logger.error(msg)
    raise NotImplementedError(msg)

_write_edge_data(edges, batch_size)

Write biocypher edges to CSV.

Writes biocypher edges to CSV conforming to the headers created with _write_edge_headers(), and is required to be run before calling _write_edge_headers() to set the :py:attr:`self.edge_property_dict` for passing the edge properties to the instance. Expects a list or generator of edges from the :py:class:`BioCypherEdge` class.


edges (BioCypherEdge): a list or generator of edges in
    :py:class:`BioCypherEdge` format

bool: The return value. True for success, False otherwise.
Todo:
- currently works for mixed edges but in practice often is
  called on one iterable containing one type of edge only
Source code in biocypher/output/write/_batch_writer.py
def _write_edge_data(self, edges, batch_size):
    """Write biocypher edges to CSV.

    Writes biocypher edges to CSV conforming to the headers created
    with `_write_edge_headers()`, and is required to be run before
    calling `_write_edge_headers()` to set the
    :py:attr:`self.edge_property_dict` for passing the edge
    properties to the instance. Expects a list or generator of edges
    from the :py:class:`BioCypherEdge` class.

    Args:
    ----
        edges (BioCypherEdge): a list or generator of edges in
            :py:class:`BioCypherEdge` format

    Returns:
    -------
        bool: The return value. True for success, False otherwise.

    Todo:
    ----
        - currently works for mixed edges but in practice often is
          called on one iterable containing one type of edge only

    """
    if isinstance(edges, GeneratorType):
        logger.debug("Writing edge CSV from generator.")

        bins = defaultdict(list)  # dict to store a list for each
        # label that is passed in
        bin_l = {}  # dict to store the length of each list for
        # batching cutoff
        reference_props = defaultdict(
            dict,
        )  # dict to store a dict of properties
        # for each label to check for consistency and their type
        # for now, relevant for `int`
        for edge in edges:
            if not (edge.get_source_id() and edge.get_target_id()):
                logger.error(
                    f"Edge must have source and target node. Caused by: {edge}",
                )
                continue

            label = edge.get_label()

            if label not in bins.keys():
                # start new list
                bins[label].append(edge)
                bin_l[label] = 1

                # get properties from config if present

                # check whether label is in ontology_adapter.leaves
                # (may not be if it is an edge that carries the
                # "label_as_edge" property)
                cprops = None
                if label in self.translator.ontology.mapping.extended_schema:
                    cprops = self.translator.ontology.mapping.extended_schema.get(label).get(
                        "properties",
                    )
                else:
                    # try via "label_as_edge"
                    for (
                        k,
                        v,
                    ) in self.translator.ontology.mapping.extended_schema.items():
                        if isinstance(v, dict):
                            if v.get("label_as_edge") == label:
                                cprops = v.get("properties")
                                break
                if cprops:
                    d = cprops

                    # add strict mode properties
                    if self.strict_mode:
                        d["source"] = "str"
                        d["version"] = "str"
                        d["licence"] = "str"

                else:
                    d = dict(edge.get_properties())
                    # encode property type
                    for k, v in d.items():
                        if d[k] is not None:
                            d[k] = type(v).__name__
                # else use first encountered edge to define
                # properties for checking; could later be by
                # checking all edges but much more complicated,
                # particularly involving batch writing (would
                # require "do-overs"). for now, we output a warning
                # if edge properties diverge from reference
                # properties (in write_single_edge_list_to_file)
                # TODO

                reference_props[label] = d

            else:
                # add to list
                bins[label].append(edge)
                bin_l[label] += 1
                if not bin_l[label] < batch_size:
                    # batch size controlled here
                    passed = self._write_single_edge_list_to_file(
                        bins[label],
                        label,
                        reference_props[label],
                    )

                    if not passed:
                        return False

                    bins[label] = []
                    bin_l[label] = 0

        # after generator depleted, write remainder of bins
        for label, nl in bins.items():
            passed = self._write_single_edge_list_to_file(
                nl,
                label,
                reference_props[label],
            )

            if not passed:
                return False

        # use complete bin list to write header files
    # TODO if an edge type has varying properties
    # (i.e. missingness), we'd need to collect all possible
        # properties in the generator pass

        # save first-edge properties to instance attribute
        for label in reference_props.keys():
            self.edge_property_dict[label] = reference_props[label]

        return True
    elif not isinstance(edges, list):
        logger.error("Edges must be passed as list or generator.")
        return False
    else:

        def gen(edges):
            yield from edges

        return self._write_edge_data(gen(edges), batch_size=batch_size)

_write_edge_headers() abstractmethod

Write a database import-file for an edge.

Write a database import-file for an edge as per the definition in the schema_config.yaml, containing only the header for this type of edge.

Returns
bool: The return value. True for success, False otherwise.
Source code in biocypher/output/write/_batch_writer.py
@abstractmethod
def _write_edge_headers(self):
    """Write a database import-file for an edge.

    Write a database import-file for an edge as per the definition in
    the `schema_config.yaml`, containing only the header for this type
    of edge.

    Returns
    -------
        bool: The return value. True for success, False otherwise.

    """
    msg = "Database writer must override '_write_edge_headers'"
    logger.error(msg)
    raise NotImplementedError(msg)

_write_next_part(label, lines)

Write a list of strings to a new part file.


label (str): the label (type) of the edge; internal
representation sentence case -> needs to become PascalCase
for disk representation

lines (list): list of strings to be written

bool: The return value. True for success, False otherwise.
Source code in biocypher/output/write/_batch_writer.py
def _write_next_part(self, label: str, lines: list):
    """Write a list of strings to a new part file.

    Args:
    ----
        label (str): the label (type) of the edge; internal
        representation sentence case -> needs to become PascalCase
        for disk representation

        lines (list): list of strings to be written

    Returns:
    -------
        bool: The return value. True for success, False otherwise.

    """
    # translate label to PascalCase
    label_pascal = self.translator.name_sentence_to_pascal(parse_label(label))

    # list files in self.outdir
    files = glob.glob(os.path.join(self.outdir, f"{label_pascal}-part*.csv"))
    # find file with highest part number
    if not files:
        next_part = 0

    else:
        next_part = (
            max(
                [int(f.split(".")[-2].split("-")[-1].replace("part", "")) for f in files],
            )
            + 1
        )

    # write to file
    padded_part = str(next_part).zfill(3)
    logger.info(
        f"Writing {len(lines)} entries to {label_pascal}-part{padded_part}.csv",
    )

    # store name only in case import_call_file_prefix is set
    part = f"{label_pascal}-part{padded_part}.csv"
    file_path = os.path.join(self.outdir, part)

    with open(file_path, "w", encoding="utf-8") as f:
        # concatenate with delimiter
        f.writelines(lines)

    if not self.parts.get(label):
        self.parts[label] = [part]
    else:
        self.parts[label].append(part)
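
For example, two consecutive calls for the internal label "protein" would produce consecutively numbered PascalCase part files and record them for the import call (illustrative values):

self._write_next_part("protein", lines_a)  # writes Protein-part000.csv
self._write_next_part("protein", lines_b)  # writes Protein-part001.csv
# self.parts == {"protein": ["Protein-part000.csv", "Protein-part001.csv"]}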

_write_node_data(nodes, batch_size, force=False)

Write biocypher nodes to CSV.

Conforms to the headers created with _write_node_headers(), and is required to be run before calling _write_node_headers() to set the :py:attr:`self.node_property_dict` for passing the node properties to the instance. Expects a list or generator of nodes from the :py:class:`BioCypherNode` class.


nodes (BioCypherNode): a list or generator of nodes in
    :py:class:`BioCypherNode` format

bool: The return value. True for success, False otherwise.
Source code in biocypher/output/write/_batch_writer.py
def _write_node_data(self, nodes, batch_size, force: bool = False):
    """Write biocypher nodes to CSV.

    Conforms to the headers created with `_write_node_headers()`, and
    is actually required to be run before calling `_write_node_headers()`
    to set the :py:attr:`self.node_property_dict` for passing the node
    properties to the instance. Expects list or generator of nodes from
    the :py:class:`BioCypherNode` class.

    Args:
    ----
        nodes (BioCypherNode): a list or generator of nodes in
            :py:class:`BioCypherNode` format

    Returns:
    -------
        bool: The return value. True for success, False otherwise.

    """
    if isinstance(nodes, GeneratorType | peekable):
        logger.debug("Writing node CSV from generator.")

        bins = defaultdict(list)  # dict to store a list for each
        # label that is passed in
        bin_l = {}  # dict to store the length of each list for
        # batching cutoff
        reference_props = defaultdict(
            dict,
        )  # dict to store a dict of properties
        # for each label to check for consistency and their type
        # for now, relevant for `int`
        labels = {}  # dict to store the additional labels for each
        # primary graph constituent from biolink hierarchy
        for node in nodes:
            # check if node has already been written, if so skip
            if self.deduplicator.node_seen(node):
                continue

            _id = node.get_id()
            label = node.get_label()

            # check for non-id
            if not _id:
                logger.warning(f"Node {label} has no id; skipping.")
                continue

            if label not in bins.keys():
                # start new list
                all_labels = None
                bins[label].append(node)
                bin_l[label] = 1

                # get properties from config if present
                if label in self.translator.ontology.mapping.extended_schema:
                    cprops = self.translator.ontology.mapping.extended_schema.get(label).get(
                        "properties",
                    )
                else:
                    cprops = None
                if cprops:
                    d = dict(cprops)

                    # add id and preferred id to properties; these are
                    # created in node creation (`_create.BioCypherNode`)
                    d["id"] = "str"
                    d["preferred_id"] = "str"

                    # add strict mode properties
                    if self.strict_mode:
                        d["source"] = "str"
                        d["version"] = "str"
                        d["licence"] = "str"

                else:
                    d = dict(node.get_properties())
                    # encode property type
                    for k, v in d.items():
                        if d[k] is not None:
                            d[k] = type(v).__name__
                # else use first encountered node to define properties for
                # checking; could later be by checking all nodes but much
                # more complicated, particularly involving batch writing
                # (would require "do-overs"). for now, we output a warning
                # if node properties diverge from reference properties (in
                # write_single_node_list_to_file) TODO if it occurs, ask
                # user to select desired properties and restart the process

                reference_props[label] = d

                # get label hierarchy
                # multiple labels:
                if not force:
                    all_labels = self.translator.ontology.get_ancestors(label)
                else:
                    all_labels = None

                if all_labels:
                    # convert to pascal case
                    all_labels = [self.translator.name_sentence_to_pascal(label) for label in all_labels]
                    # remove duplicates
                    all_labels = list(OrderedDict.fromkeys(all_labels))
                    match self.labels_order:
                        case "Ascending":
                            pass  # Default from get_ancestors.
                        case "Alphabetical":
                            all_labels.sort()
                        case "Descending":
                            all_labels.reverse()
                        case "Leaves":
                            if len(all_labels) < 1:
                                msg = "Labels list cannot be empty when using 'Leaves' order."
                                raise ValueError(msg)
                            all_labels = [all_labels[0]]
                        case _:
                            # In case someone touched _labels_orders after the constructor.
                            if self.labels_order not in self._labels_orders:
                                msg = (
                                    f"Invalid labels_order: {self.labels_order}. "
                                    f"Must be one of {self._labels_orders}"
                                )
                                raise ValueError(msg)
                    # concatenate with array delimiter
                    all_labels = self._write_array_string(all_labels)
                else:
                    all_labels = self.translator.name_sentence_to_pascal(label)

                labels[label] = all_labels

            else:
                # add to list
                bins[label].append(node)
                bin_l[label] += 1
                if not bin_l[label] < batch_size:
                    # batch size controlled here
                    passed = self._write_single_node_list_to_file(
                        bins[label],
                        label,
                        reference_props[label],
                        labels[label],
                    )

                    if not passed:
                        return False

                    bins[label] = []
                    bin_l[label] = 0

        # after generator depleted, write remainder of bins
        for label, nl in bins.items():
            passed = self._write_single_node_list_to_file(
                nl,
                label,
                reference_props[label],
                labels[label],
            )

            if not passed:
                return False

        # use complete bin list to write header files
        # TODO if a node type has varying properties
        # (ie missingness), we'd need to collect all possible
        # properties in the generator pass

        # save config or first-node properties to instance attribute
        for label in reference_props.keys():
            self.node_property_dict[label] = reference_props[label]

        return True
    elif not isinstance(nodes, list):
        logger.error("Nodes must be passed as list or generator.")
        return False
    else:

        def gen(nodes):
            yield from nodes

        return self._write_node_data(gen(nodes), batch_size=batch_size)
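
To illustrate the labels_order options, assume get_ancestors yields ["protein", "polypeptide", "biological entity"] for the label "protein" and the array delimiter is "|"; the :LABEL column would then contain (hedged sketch, quoting omitted):

# Ascending (default): Protein|Polypeptide|BiologicalEntity
# Alphabetical:        BiologicalEntity|Polypeptide|Protein
# Descending:          BiologicalEntity|Polypeptide|Protein  (reversed Ascending)
# Leaves:              Protein  (only the most specific label)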

_write_node_headers() abstractmethod

Write header files for nodes.

Write header files (node properties) for nodes as per the definition in the schema_config.yaml.

Returns
bool: The return value. True for success, False otherwise.
Source code in biocypher/output/write/_batch_writer.py
@abstractmethod
def _write_node_headers(self):
    """Write header files for nodes.

    Write header files (node properties) for nodes as per the
    definition in the `schema_config.yaml`.

    Returns
    -------
        bool: The return value. True for success, False otherwise.

    """
    msg = "Database writer must override '_write_node_headers'"
    logger.error(msg)
    raise NotImplementedError(msg)

_write_single_edge_list_to_file(edge_list, label, prop_dict)

Write a list of biocypher edges to a CSV file.

This function takes one list of biocypher edges and writes them to a Neo4j admin import compatible CSV file.


edge_list (list): list of BioCypherEdges to be written

label (str): the label (type) of the edge

prop_dict (dict): properties of the edge class passed from the
    parsing function and their types

bool: The return value. True for success, False otherwise.
Source code in biocypher/output/write/_batch_writer.py
def _write_single_edge_list_to_file(
    self,
    edge_list: list,
    label: str,
    prop_dict: dict,
):
    """Write a list of biocypher edges to a CSV file.

    This function takes one list of biocypher edges and writes them
    to a Neo4j admin import compatible CSV file.

    Args:
    ----
        edge_list (list): list of BioCypherEdges to be written

        label (str): the label (type) of the edge

        prop_dict (dict): properties of the edge class passed from the
            parsing function and their types

    Returns:
    -------
        bool: The return value. True for success, False otherwise.

    """
    if not all(isinstance(n, BioCypherEdge) for n in edge_list):
        logger.error("Edges must be passed as type BioCypherEdge.")
        return False

    # from list of edges to list of strings
    lines = []
    for e in edge_list:
        # check for deviations in properties
        # edge properties
        e_props = e.get_properties()
        e_keys = list(e_props.keys())
        ref_props = list(prop_dict.keys())

        # compare list order invariant
        if set(ref_props) != set(e_keys):
            oedge = f"{e.get_source_id()}-{e.get_target_id()}"
            oprop1 = set(ref_props).difference(e_keys)
            oprop2 = set(e_keys).difference(ref_props)
            logger.error(
                f"At least one edge of the class {e.get_label()} "
                f"has more or fewer properties than another. "
                f"Offending edge: {oedge!r}, offending property: "
                f"{max([oprop1, oprop2])}. "
                f"All reference properties: {ref_props}, "
                f"All edge properties: {e_keys}.",
            )
            return False

        plist = []
        # make all into strings, put actual strings in quotes
        for k, v in prop_dict.items():
            p = e_props.get(k)
            if p is None:  # TODO make field empty instead of ""?
                plist.append("")
            elif v in [
                "int",
                "integer",
                "long",
                "float",
                "double",
                "dbl",
                "bool",
                "boolean",
            ]:
                plist.append(str(p))
            elif isinstance(p, list):
                plist.append(self._write_array_string(p))
            else:
                plist.append(self.quote + str(p) + self.quote)

        entries = [e.get_source_id()]

        skip_id = False
        schema_label = None

        if label in ["IS_SOURCE_OF", "IS_TARGET_OF", "IS_PART_OF"]:
            skip_id = True
        elif not self.translator.ontology.mapping.extended_schema.get(label):
            # find label in schema by label_as_edge
            for (
                k,
                v,
            ) in self.translator.ontology.mapping.extended_schema.items():
                if v.get("label_as_edge") == label:
                    schema_label = k
                    break
        else:
            schema_label = label

        if schema_label:
            if (
                self.translator.ontology.mapping.extended_schema.get(
                    schema_label,
                ).get("use_id")
                == False  # noqa: E712 (seems to not work with 'not')
            ):
                skip_id = True

        if not skip_id:
            entries.append(e.get_id() or "")

        if ref_props:
            entries.append(self.delim.join(plist))

        entries.append(e.get_target_id())
        entries.append(
            self.translator.name_sentence_to_pascal(
                e.get_label(),
            ),
        )

        lines.append(
            self.delim.join(entries) + "\n",
        )

    # avoid writing empty files
    if lines:
        self._write_next_part(label, lines)

    return True
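
Assuming the delimiter ";", the quote character '"', and a single string-typed property, one written edge line might look like this (illustrative IDs and values):

# columns: :START_ID ; id ; <properties> ; :END_ID ; :TYPE
p1;intact:123;"two hybrid";p2;ProteinProteinInteraction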

_write_single_node_list_to_file(node_list, label, prop_dict, labels)

Write a list of biocypher nodes to a CSV file.

This function takes one list of biocypher nodes and writes them to a Neo4j admin import compatible CSV file.


node_list (list): list of BioCypherNodes to be written
label (str): the primary label of the node
prop_dict (dict): properties of node class passed from parsing
    function and their types
labels (str): string of one or several concatenated labels
    for the node class

bool: The return value. True for success, False otherwise.
Source code in biocypher/output/write/_batch_writer.py
def _write_single_node_list_to_file(
    self,
    node_list: list,
    label: str,
    prop_dict: dict,
    labels: str,
):
    """Write a list of biocypher nodes to a CSV file.

    This function takes one list of biocypher nodes and writes them
    to a Neo4j admin import compatible CSV file.

    Args:
    ----
        node_list (list): list of BioCypherNodes to be written
        label (str): the primary label of the node
        prop_dict (dict): properties of node class passed from parsing
            function and their types
        labels (str): string of one or several concatenated labels
            for the node class

    Returns:
    -------
        bool: The return value. True for success, False otherwise.

    """
    if not all(isinstance(n, BioCypherNode) for n in node_list):
        logger.error("Nodes must be passed as type BioCypherNode.")
        return False

    # from list of nodes to list of strings
    lines = []

    for n in node_list:
        # check for deviations in properties
        # node properties
        n_props = n.get_properties()
        n_keys = list(n_props.keys())
        # reference properties
        ref_props = list(prop_dict.keys())

        # compare lists order invariant
        if set(ref_props) != set(n_keys):
            onode = n.get_id()
            oprop1 = set(ref_props).difference(n_keys)
            oprop2 = set(n_keys).difference(ref_props)
            logger.error(
                f"At least one node of the class {n.get_label()} "
                f"has more or fewer properties than another. "
                f"Offending node: {onode!r}, offending property: "
                f"{max([oprop1, oprop2])}. "
                f"All reference properties: {ref_props}, "
                f"All node properties: {n_keys}.",
            )
            return False

        line = [n.get_id()]

        if ref_props:
            plist = []
            # make all into strings, put actual strings in quotes
            for k, v in prop_dict.items():
                p = n_props.get(k)
                if p is None:  # TODO make field empty instead of ""?
                    plist.append("")
                elif v in [
                    "int",
                    "integer",
                    "long",
                    "float",
                    "double",
                    "dbl",
                    "bool",
                    "boolean",
                ]:
                    plist.append(str(p))
                elif isinstance(p, list):
                    plist.append(self._write_array_string(p))
                else:
                    plist.append(f"{self.quote}{p!s}{self.quote}")

            line.append(self.delim.join(plist))
        line.append(labels)

        lines.append(self.delim.join(line) + "\n")

    # avoid writing empty files
    if lines:
        self._write_next_part(label, lines)

    return True
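
Assuming the delimiter ";" and a schema-typed int property, one written node line might look like this (illustrative values; the last field is the concatenated label string):

# columns: :ID ; name ; taxon ; :LABEL
uniprot:P00533;"EGFR";9606;"Protein|Polypeptide|BiologicalEntity"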

get_import_call()

Return the import call.

Return the import call detailing folder and individual node and edge headers and data files, as well as delimiters and database name.

Returns
str: a bash command for the database import
Source code in biocypher/output/write/_batch_writer.py
def get_import_call(self) -> str:
    """Eeturn the import call.

    Return the import call detailing folder and individual node and
    edge headers and data files, as well as delimiters and database name.

    Returns
    -------
        str: a bash command for the database import

    """
    return self._construct_import_call()
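
A usage sketch, assuming nodes and edges have already been written by this writer:

call = writer.get_import_call()    # bash command as a string
print(call)

path = writer.write_import_call()  # writes the same command to a script
# for the Neo4j writer, the script name ends in neo4j-admin-import-call.sh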

write_edges(edges, batch_size=int(1000000.0))

Write edges and their headers.


edges (BioCypherEdge): a list or generator of edges in
    :py:class:`BioCypherEdge` or :py:class:`BioCypherRelAsNode`
    format

bool: The return value. True for success, False otherwise.
Source code in biocypher/output/write/_batch_writer.py
def write_edges(
    self,
    edges: list | GeneratorType,
    batch_size: int = int(1e6),
) -> bool:
    """Write edges and their headers.

    Args:
    ----
        edges (BioCypherEdge): a list or generator of edges in
            :py:class:`BioCypherEdge` or :py:class:`BioCypherRelAsNode`
            format

    Returns:
    -------
        bool: The return value. True for success, False otherwise.

    """
    passed = False
    edges = list(edges)  # force evaluation to handle empty generator
    if edges:
        nodes_flat = []
        edges_flat = []
        for edge in edges:
            if isinstance(edge, BioCypherRelAsNode):
                # check if relationship has already been written, if so skip
                if self.deduplicator.rel_as_node_seen(edge):
                    continue

                nodes_flat.append(edge.get_node())
                edges_flat.append(edge.get_source_edge())
                edges_flat.append(edge.get_target_edge())

            else:
                # check if relationship has already been written, if so skip
                if self.deduplicator.edge_seen(edge):
                    continue

                edges_flat.append(edge)

        if nodes_flat and edges_flat:
            passed = self.write_nodes(nodes_flat) and self._write_edge_data(
                edges_flat,
                batch_size,
            )
        else:
            passed = self._write_edge_data(edges_flat, batch_size)

    else:
        # is this a problem? if the generator or list is empty, we
        # don't write anything.
        logger.debug(
            "No edges to write, possibly due to no matched Biolink classes.",
        )

    if not passed:
        logger.error("Error while writing edge data.")
        return False
    # pass property data to header writer per edge type written
    passed = self._write_edge_headers()
    if not passed:
        logger.error("Error while writing edge headers.")
        return False

    return True
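
A usage sketch with a generator of BioCypherEdge objects (IDs, label, and properties are assumed for illustration):

from biocypher._create import BioCypherEdge

def edge_generator():
    yield BioCypherEdge(
        source_id="uniprot:P00533",
        target_id="uniprot:P04626",
        relationship_label="protein protein interaction",
        properties={"method": "two hybrid"},
    )

success = writer.write_edges(edge_generator())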

write_import_call()

Write the import call.

Function to write the import call detailing folder and individual node and edge headers and data files, as well as delimiters and database name, to the export folder as txt.

Returns
str: The path of the file holding the import call.
Source code in biocypher/output/write/_batch_writer.py
def write_import_call(self) -> str:
    """Write the import call.

    Function to write the import call detailing folder and
    individual node and edge headers and data files, as well as
    delimiters and database name, to the export folder as txt.

    Returns
    -------
        str: The path of the file holding the import call.

    """
    file_path = os.path.join(self.outdir, self._get_import_script_name())
    logger.info(f"Writing {self.db_name + ' ' if self.db_name else ''}import call to `{file_path}`.")

    with open(file_path, "w", encoding="utf-8") as f:
        f.write(self._construct_import_call())

    return file_path

write_nodes(nodes, batch_size=int(1000000.0), force=False)

Write nodes and their headers.


nodes (BioCypherNode): a list or generator of nodes in
    :py:class:`BioCypherNode` format

batch_size (int): The batch size for writing nodes.

force (bool): Whether to force writing nodes even if their type is
    not present in the schema.

bool: The return value. True for success, False otherwise.
Source code in biocypher/output/write/_batch_writer.py
def write_nodes(self, nodes, batch_size: int = int(1e6), force: bool = False):
    """Write nodes and their headers.

    Args:
    ----
        nodes (BioCypherNode): a list or generator of nodes in
            :py:class:`BioCypherNode` format

        batch_size (int): The batch size for writing nodes.

        force (bool): Whether to force writing nodes even if their type is
            not present in the schema.


    Returns:
    -------
        bool: The return value. True for success, False otherwise.

    """
    # TODO check represented_as

    # write node data
    passed = self._write_node_data(nodes, batch_size, force)
    if not passed:
        logger.error("Error while writing node data.")
        return False
    # pass property data to header writer per node type written
    passed = self._write_node_headers()
    if not passed:
        logger.error("Error while writing node headers.")
        return False

    return True
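
A usage sketch with a list of BioCypherNode objects (IDs, label, and properties are assumed for illustration):

from biocypher._create import BioCypherNode

nodes = [
    BioCypherNode(
        node_id="uniprot:P00533",
        node_label="protein",
        properties={"name": "EGFR", "taxon": 9606},
    ),
]

success = writer.write_nodes(nodes, batch_size=int(1e6))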

Neo4j Batch Writer

Bases: _BatchWriter

Class for writing node and edge representations to disk using the format specified by Neo4j for the use of admin import. Each batch writer instance has a fixed representation that needs to be passed at instantiation via the :py:attr:`schema` argument. The instance also expects an ontology adapter via :py:attr:`ontology_adapter` to be able to convert and extend the hierarchy.

This class inherits from the abstract class "_BatchWriter" and implements the Neo4j-specific methods:

- _write_node_headers
- _write_edge_headers
- _construct_import_call
- _write_array_string
Source code in biocypher/output/write/graph/_neo4j.py
class _Neo4jBatchWriter(_BatchWriter):
    """Class for writing node and edge representations to disk using the
    format specified by Neo4j for the use of admin import. Each batch
    writer instance has a fixed representation that needs to be passed
    at instantiation via the :py:attr:`schema` argument. The instance
    also expects an ontology adapter via :py:attr:`ontology_adapter` to be able
    to convert and extend the hierarchy.

    This class inherits from the abstract class "_BatchWriter" and implements the
    Neo4j-specific methods:

        - _write_node_headers
        - _write_edge_headers
        - _construct_import_call
        - _write_array_string
    """

    def __init__(self, *args, **kwargs):
        """Constructor.

        Check the version of Neo4j and add a command scope if version >= 5.

        Returns
        -------
            _Neo4jBatchWriter: An instance of the writer.

        """
        # Should read the configuration and setup import_call_bin_prefix.
        super().__init__(*args, **kwargs)

    def _get_default_import_call_bin_prefix(self):
        """Method to provide the default string for the import call bin prefix.

        Returns
        -------
            str: The default location for the neo4j admin import location

        """
        return "bin/"

    def _quote_string(self, value: str) -> str:
        """Quote a string. Quote character is escaped by doubling it."""
        return f"{self.quote}{value.replace(self.quote, self.quote * 2)}{self.quote}"

    def _write_array_string(self, string_list):
        """Abstract method to output.write the string representation of an array into a .csv file
        as required by the neo4j admin-import.

        Args:
        ----
            string_list (list): list of ontology strings

        Returns:
        -------
            str: The string representation of an array for the neo4j admin import

        """
        string = self.adelim.join(string_list)
        return self._quote_string(string)

    def _write_node_headers(self):
        """Writes single CSV file for a graph entity that is represented
        as a node as per the definition in the `schema_config.yaml`,
        containing only the header for this type of node.

        Returns
        -------
            bool: The return value. True for success, False otherwise.

        """
        # load headers from data parse
        if not self.node_property_dict:
            logger.error(
                "Header information not found. Was the data parsed first?",
            )
            return False

        for label, props in self.node_property_dict.items():
            _id = ":ID"

            # translate label to PascalCase
            pascal_label = self.translator.name_sentence_to_pascal(parse_label(label))

            header = f"{pascal_label}-header.csv"
            header_path = os.path.join(
                self.outdir,
                header,
            )
            parts = f"{pascal_label}-part.*"

            # check if file already exists
            if os.path.exists(header_path):
                logger.warning(
                    f"Header file `{header_path}` already exists. Overwriting.",
                )

            # concatenate key:value in props
            props_list = []
            for k, v in props.items():
                if v in ["int", "long", "integer"]:
                    props_list.append(f"{k}:long")
                elif v in ["int[]", "long[]", "integer[]"]:
                    props_list.append(f"{k}:long[]")
                elif v in ["float", "double", "dbl"]:
                    props_list.append(f"{k}:double")
                elif v in ["float[]", "double[]"]:
                    props_list.append(f"{k}:double[]")
                elif v in ["bool", "boolean"]:
                    # TODO Neo4j boolean support / spelling?
                    props_list.append(f"{k}:boolean")
                elif v in ["bool[]", "boolean[]"]:
                    props_list.append(f"{k}:boolean[]")
                elif v in ["str[]", "string[]"]:
                    props_list.append(f"{k}:string[]")
                else:
                    props_list.append(f"{k}")

            # create list of lists and flatten
            out_list = [[_id], props_list, [":LABEL"]]
            out_list = [val for sublist in out_list for val in sublist]

            with open(header_path, "w", encoding="utf-8") as f:
                # concatenate with delimiter
                row = self.delim.join(out_list)
                f.write(row)

            # add file path to neo4j admin import statement (import call file
            # path may be different from actual file path)
            import_call_header_path = os.path.join(
                self.import_call_file_prefix,
                header,
            )
            import_call_parts_path = os.path.join(
                self.import_call_file_prefix,
                parts,
            )
            self.import_call_nodes.add((import_call_header_path, import_call_parts_path))

        return True

    def _write_edge_headers(self):
        """Writes single CSV file for a graph entity that is represented
        as an edge as per the definition in the `schema_config.yaml`,
        containing only the header for this type of edge.

        Returns
        -------
            bool: The return value. True for success, False otherwise.

        """
        # load headers from data parse
        if not self.edge_property_dict:
            logger.error(
                "Header information not found. Was the data parsed first?",
            )
            return False

        for label, props in self.edge_property_dict.items():
            # translate label to PascalCase
            pascal_label = self.translator.name_sentence_to_pascal(parse_label(label))

            # paths
            header = f"{pascal_label}-header.csv"
            header_path = os.path.join(
                self.outdir,
                header,
            )
            parts = f"{pascal_label}-part.*"

            # check for file exists
            if os.path.exists(header_path):
                logger.warning(f"File {header_path} already exists. Overwriting.")

            # concatenate key:value in props
            props_list = []
            for k, v in props.items():
                if v in ["int", "long", "integer"]:
                    props_list.append(f"{k}:long")
                elif v in ["int[]", "long[]", "integer[]"]:
                    props_list.append(f"{k}:long[]")
                elif v in ["float", "double"]:
                    props_list.append(f"{k}:double")
                elif v in ["float[]", "double[]"]:
                    props_list.append(f"{k}:double[]")
                elif v in [
                    "bool",
                    "boolean",
                ]:  # TODO does Neo4j support bool?
                    props_list.append(f"{k}:boolean")
                elif v in ["bool[]", "boolean[]"]:
                    props_list.append(f"{k}:boolean[]")
                elif v in ["str[]", "string[]"]:
                    props_list.append(f"{k}:string[]")
                else:
                    props_list.append(f"{k}")

            skip_id = False
            schema_label = None

            if label in ["IS_SOURCE_OF", "IS_TARGET_OF", "IS_PART_OF"]:
                skip_id = True
            elif not self.translator.ontology.mapping.extended_schema.get(label):
                # find label in schema by label_as_edge
                for (
                    k,
                    v,
                ) in self.translator.ontology.mapping.extended_schema.items():
                    if v.get("label_as_edge") == label:
                        schema_label = k
                        break
            else:
                schema_label = label

            out_list = [":START_ID"]

            if schema_label:
                if (
                    self.translator.ontology.mapping.extended_schema.get(
                        schema_label,
                    ).get("use_id")
                    == False  # noqa: E712 (seems to not work with 'not')
                ):
                    skip_id = True

            if not skip_id:
                out_list.append("id")

            out_list.extend(props_list)
            out_list.extend([":END_ID", ":TYPE"])

            with open(header_path, "w", encoding="utf-8") as f:
                # concatenate with delimiter
                row = self.delim.join(out_list)
                f.write(row)

            # add file path to neo4j admin import statement (import call file
            # path may be different from actual file path)
            import_call_header_path = os.path.join(
                self.import_call_file_prefix,
                header,
            )
            import_call_parts_path = os.path.join(
                self.import_call_file_prefix,
                parts,
            )
            self.import_call_edges.add((import_call_header_path, import_call_parts_path))

        return True

    def _get_import_script_name(self) -> str:
        """Returns the name of the neo4j admin import script

        Returns
        -------
            str: The name of the import script (ending in .sh)

        """
        return "neo4j-admin-import-call.sh"

    def _construct_import_call(self) -> str:
        """Function to construct the import call detailing folder and
        individual node and edge headers and data files, as well as
        delimiters and database name. Built after all data has been
        processed to ensure that nodes are called before any edges.

        Returns
        -------
            str: a bash command for neo4j-admin import

        """
        import_call_neo4j_v4 = self._get_import_call("import", "--database=", "--force=")
        import_call_neo4j_v5 = self._get_import_call("database import full", "", "--overwrite-destination=")
        neo4j_version_check = (
            f"version=$({self._get_default_import_call_bin_prefix()}neo4j-admin --version | cut -d '.' -f 1)"
        )

        import_script = (
            f"#!/bin/bash\n{neo4j_version_check}\nif [[ $version -ge 5 ]]; "
            f"then\n\t{import_call_neo4j_v5}\nelse\n\t{import_call_neo4j_v4}\nfi"
        )
        return import_script

    def _get_import_call(self, import_cmd: str, database_cmd: str, wipe_cmd: str) -> str:
        """Get parametrized import call for Neo4j 4 or 5+.

        Args:
        ----
            import_cmd (str): The import command to use.
            database_cmd (str): The database command to use.
            wipe_cmd (str): The wipe command to use.

        Returns:
        -------
            str: The import call.

        """
        import_call = f"{self.import_call_bin_prefix}neo4j-admin {import_cmd} "

        import_call += f"{database_cmd}{self.db_name} "

        import_call += f'--delimiter="{self.escaped_delim}" '

        import_call += f'--array-delimiter="{self.escaped_adelim}" '

        if self.quote == "'":
            import_call += f'--quote="{self.quote}" '
        else:
            import_call += f"--quote='{self.quote}' "

        if self.wipe:
            import_call += f"{wipe_cmd}true "
        if self.skip_bad_relationships:
            import_call += "--skip-bad-relationships=true "
        if self.skip_duplicate_nodes:
            import_call += "--skip-duplicate-nodes=true "

        # append node import calls
        for header_path, parts_path in self.import_call_nodes:
            import_call += f'--nodes="{header_path},{parts_path}" '

        # append edge import calls
        for header_path, parts_path in self.import_call_edges:
            import_call += f'--relationships="{header_path},{parts_path}" '

        return import_call

__init__(*args, **kwargs)

Constructor.

Check the version of Neo4j and add a command scope if version >= 5.

Returns
_Neo4jBatchWriter: An instance of the writer.
Source code in biocypher/output/write/graph/_neo4j.py
def __init__(self, *args, **kwargs):
    """Constructor.

    Check the version of Neo4j and add a command scope if version >= 5.

    Returns
    -------
        _Neo4jBatchWriter: An instance of the writer.

    """
    # Should read the configuration and set up import_call_bin_prefix.
    super().__init__(*args, **kwargs)

_construct_import_call()

Function to construct the import call detailing folder and individual node and edge headers and data files, as well as delimiters and database name. Built after all data has been processed to ensure that nodes are called before any edges.

Returns
str: a bash command for neo4j-admin import
Source code in biocypher/output/write/graph/_neo4j.py
def _construct_import_call(self) -> str:
    """Function to construct the import call detailing folder and
    individual node and edge headers and data files, as well as
    delimiters and database name. Built after all data has been
    processed to ensure that nodes are called before any edges.

    Returns
    -------
        str: a bash command for neo4j-admin import

    """
    import_call_neo4j_v4 = self._get_import_call("import", "--database=", "--force=")
    import_call_neo4j_v5 = self._get_import_call("database import full", "", "--overwrite-destination=")
    neo4j_version_check = (
        f"version=$({self._get_default_import_call_bin_prefix()}neo4j-admin --version | cut -d '.' -f 1)"
    )

    import_script = (
        f"#!/bin/bash\n{neo4j_version_check}\nif [[ $version -ge 5 ]]; "
        f"then\n\t{import_call_neo4j_v5}\nelse\n\t{import_call_neo4j_v4}\nfi"
    )
    return import_script
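
To make the version branching concrete, here is a minimal sketch of the wrapper script this method assembles. The two call strings are made-up stand-ins for what _get_import_call() returns; only the structure of the wrapper is taken from the code above.

v4_call = "bin/neo4j-admin import --database=neo4j ..."     # placeholder
v5_call = "bin/neo4j-admin database import full neo4j ..."  # placeholder
version_check = "version=$(bin/neo4j-admin --version | cut -d '.' -f 1)"

script = (
    f"#!/bin/bash\n{version_check}\nif [[ $version -ge 5 ]]; "
    f"then\n\t{v5_call}\nelse\n\t{v4_call}\nfi"
)
print(script)  # the version is resolved when the script runs, not when it is built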

_get_default_import_call_bin_prefix()

Method to provide the default string for the import call bin prefix.

Returns
str: The default location prefix for the neo4j-admin binary ("bin/")
Source code in biocypher/output/write/graph/_neo4j.py
def _get_default_import_call_bin_prefix(self):
    """Method to provide the default string for the import call bin prefix.

    Returns
    -------
        str: The default location prefix for the neo4j-admin binary ("bin/")

    """
    return "bin/"

_get_import_call(import_cmd, database_cmd, wipe_cmd)

Get parametrized import call for Neo4j 4 or 5+.


import_cmd (str): The import command to use.
database_cmd (str): The database command to use.
wipe_cmd (str): The wipe command to use.

str: The import call.
Source code in biocypher/output/write/graph/_neo4j.py
def _get_import_call(self, import_cmd: str, database_cmd: str, wipe_cmd: str) -> str:
    """Get parametrized import call for Neo4j 4 or 5+.

    Args:
    ----
        import_cmd (str): The import command to use.
        database_cmd (str): The database command to use.
        wipe_cmd (str): The wipe command to use.

    Returns:
    -------
        str: The import call.

    """
    import_call = f"{self.import_call_bin_prefix}neo4j-admin {import_cmd} "

    import_call += f"{database_cmd}{self.db_name} "

    import_call += f'--delimiter="{self.escaped_delim}" '

    import_call += f'--array-delimiter="{self.escaped_adelim}" '

    if self.quote == "'":
        import_call += f'--quote="{self.quote}" '
    else:
        import_call += f"--quote='{self.quote}' "

    if self.wipe:
        import_call += f"{wipe_cmd}true "
    if self.skip_bad_relationships:
        import_call += "--skip-bad-relationships=true "
    if self.skip_duplicate_nodes:
        import_call += "--skip-duplicate-nodes=true "

    # append node import calls
    for header_path, parts_path in self.import_call_nodes:
        import_call += f'--nodes="{header_path},{parts_path}" '

    # append edge import calls
    for header_path, parts_path in self.import_call_edges:
        import_call += f'--relationships="{header_path},{parts_path}" '

    return import_call
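
For orientation, a hedged example of an assembled Neo4j 5+ call, assuming delimiter ";", array delimiter "|", quote "'", database "neo4j", and a single node and edge type; the paths are illustrative, as the real ones depend on import_call_file_prefix.

example_call = (
    "bin/neo4j-admin database import full neo4j "
    '--delimiter=";" --array-delimiter="|" --quote="\'" '
    '--nodes="/import/Protein-header.csv,/import/Protein-part.*" '
    '--relationships="/import/Interacts-header.csv,/import/Interacts-part.*" '
)
print(example_call)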

_get_import_script_name()

Return the name of the neo4j-admin import script.

Returns
str: The name of the import script (ending in .sh)
Source code in biocypher/output/write/graph/_neo4j.py
def _get_import_script_name(self) -> str:
    """Returns the name of the neo4j admin import script

    Returns
    -------
        str: The name of the import script (ending in .sh)

    """
    return "neo4j-admin-import-call.sh"

_quote_string(value)

Quote a string. Quote character is escaped by doubling it.

Source code in biocypher/output/write/graph/_neo4j.py
def _quote_string(self, value: str) -> str:
    """Quote a string. Quote character is escaped by doubling it."""
    return f"{self.quote}{value.replace(self.quote, self.quote * 2)}{self.quote}"

_write_array_string(string_list)

Write the string representation of an array into a .csv file, as required by the neo4j-admin import.


string_list (list): list of ontology strings

str: The string representation of an array for the neo4j admin import
Source code in biocypher/output/write/graph/_neo4j.py
def _write_array_string(self, string_list):
    """Abstract method to output.write the string representation of an array into a .csv file
    as required by the neo4j admin-import.

    Args:
    ----
        string_list (list): list of ontology strings

    Returns:
    -------
        str: The string representation of an array for the neo4j admin import

    """
    string = self.adelim.join(string_list)
    return self._quote_string(string)
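
For example, with an array delimiter "|" and the default double quote, a two-element list becomes a single quoted field:

adelim, quote = "|", '"'
terms = ["GO:0005634", "GO:0005737"]
joined = adelim.join(terms)       # GO:0005634|GO:0005737
print(f"{quote}{joined}{quote}")  # "GO:0005634|GO:0005737"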

_write_edge_headers()

Writes a single CSV file for a graph entity that is represented as an edge, as per the definition in the schema_config.yaml, containing only the header for this type of edge.

Returns
bool: The return value. True for success, False otherwise.
Source code in biocypher/output/write/graph/_neo4j.py
def _write_edge_headers(self):
    """Writes single CSV file for a graph entity that is represented
    as an edge as per the definition in the `schema_config.yaml`,
    containing only the header for this type of edge.

    Returns
    -------
        bool: The return value. True for success, False otherwise.

    """
    # load headers from data parse
    if not self.edge_property_dict:
        logger.error(
            "Header information not found. Was the data parsed first?",
        )
        return False

    for label, props in self.edge_property_dict.items():
        # translate label to PascalCase
        pascal_label = self.translator.name_sentence_to_pascal(parse_label(label))

        # paths
        header = f"{pascal_label}-header.csv"
        header_path = os.path.join(
            self.outdir,
            header,
        )
        parts = f"{pascal_label}-part.*"

        # check if the file already exists
        if os.path.exists(header_path):
            logger.warning(f"File {header_path} already exists. Overwriting.")

        # concatenate key:value in props
        props_list = []
        for k, v in props.items():
            if v in ["int", "long", "integer"]:
                props_list.append(f"{k}:long")
            elif v in ["int[]", "long[]", "integer[]"]:
                props_list.append(f"{k}:long[]")
            elif v in ["float", "double"]:
                props_list.append(f"{k}:double")
            elif v in ["float[]", "double[]"]:
                props_list.append(f"{k}:double[]")
            elif v in [
                "bool",
                "boolean",
            ]:  # TODO does Neo4j support bool?
                props_list.append(f"{k}:boolean")
            elif v in ["bool[]", "boolean[]"]:
                props_list.append(f"{k}:boolean[]")
            elif v in ["str[]", "string[]"]:
                props_list.append(f"{k}:string[]")
            else:
                props_list.append(f"{k}")

        skip_id = False
        schema_label = None

        if label in ["IS_SOURCE_OF", "IS_TARGET_OF", "IS_PART_OF"]:
            skip_id = True
        elif not self.translator.ontology.mapping.extended_schema.get(label):
            # find label in schema by label_as_edge
            for (
                k,
                v,
            ) in self.translator.ontology.mapping.extended_schema.items():
                if v.get("label_as_edge") == label:
                    schema_label = k
                    break
        else:
            schema_label = label

        out_list = [":START_ID"]

        if schema_label:
            if (
                self.translator.ontology.mapping.extended_schema.get(
                    schema_label,
                ).get("use_id")
                == False  # noqa: E712 (seems to not work with 'not')
            ):
                skip_id = True

        if not skip_id:
            out_list.append("id")

        out_list.extend(props_list)
        out_list.extend([":END_ID", ":TYPE"])

        with open(header_path, "w", encoding="utf-8") as f:
            # concatenate with delimiter
            row = self.delim.join(out_list)
            f.write(row)

        # add file path to the neo4j-admin import statement (import call
        # file path may be different from the actual file path)
        import_call_header_path = os.path.join(
            self.import_call_file_prefix,
            header,
        )
        import_call_parts_path = os.path.join(
            self.import_call_file_prefix,
            parts,
        )
        self.import_call_edges.add((import_call_header_path, import_call_parts_path))

    return True
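
Putting the pieces together, a hedged sketch of one resulting edge header row, assuming a ";" delimiter, an id column that is not skipped, and one double and one untyped (string) property; the property names are illustrative:

out_list = [":START_ID", "id", "score:double", "source", ":END_ID", ":TYPE"]
print(";".join(out_list))
# :START_ID;id;score:double;source;:END_ID;:TYPE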

_write_node_headers()

Writes a single CSV file for a graph entity that is represented as a node, as per the definition in the schema_config.yaml, containing only the header for this type of node.

Returns
bool: The return value. True for success, False otherwise.
Source code in biocypher/output/write/graph/_neo4j.py
def _write_node_headers(self):
    """Writes single CSV file for a graph entity that is represented
    as a node as per the definition in the `schema_config.yaml`,
    containing only the header for this type of node.

    Returns
    -------
        bool: The return value. True for success, False otherwise.

    """
    # load headers from data parse
    if not self.node_property_dict:
        logger.error(
            "Header information not found. Was the data parsed first?",
        )
        return False

    for label, props in self.node_property_dict.items():
        _id = ":ID"

        # translate label to PascalCase
        pascal_label = self.translator.name_sentence_to_pascal(parse_label(label))

        header = f"{pascal_label}-header.csv"
        header_path = os.path.join(
            self.outdir,
            header,
        )
        parts = f"{pascal_label}-part.*"

        # check if file already exists
        if os.path.exists(header_path):
            logger.warning(
                f"Header file `{header_path}` already exists. Overwriting.",
            )

        # concatenate key:value in props
        props_list = []
        for k, v in props.items():
            if v in ["int", "long", "integer"]:
                props_list.append(f"{k}:long")
            elif v in ["int[]", "long[]", "integer[]"]:
                props_list.append(f"{k}:long[]")
            elif v in ["float", "double", "dbl"]:
                props_list.append(f"{k}:double")
            elif v in ["float[]", "double[]"]:
                props_list.append(f"{k}:double[]")
            elif v in ["bool", "boolean"]:
                # TODO Neo4j boolean support / spelling?
                props_list.append(f"{k}:boolean")
            elif v in ["bool[]", "boolean[]"]:
                props_list.append(f"{k}:boolean[]")
            elif v in ["str[]", "string[]"]:
                props_list.append(f"{k}:string[]")
            else:
                props_list.append(f"{k}")

        # create list of lists and flatten
        out_list = [[_id], props_list, [":LABEL"]]
        out_list = [val for sublist in out_list for val in sublist]

        with open(header_path, "w", encoding="utf-8") as f:
            # concatenate with delimiter
            row = self.delim.join(out_list)
            f.write(row)

        # add file path to the neo4j-admin import statement (import call
        # file path may be different from the actual file path)
        import_call_header_path = os.path.join(
            self.import_call_file_prefix,
            header,
        )
        import_call_parts_path = os.path.join(
            self.import_call_file_prefix,
            parts,
        )
        self.import_call_nodes.add((import_call_header_path, import_call_parts_path))

    return True
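
Under the same assumptions, a node header row would look like this (property names again illustrative):

out_list = [":ID", "name", "taxon:long", ":LABEL"]
print(";".join(out_list))  # :ID;name;taxon:long;:LABEL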

ArangoDB Batch Writer

Bases: _Neo4jBatchWriter

Class for writing node and edge representations to disk.

Uses the format specified by ArangoDB for use with "arangoimport". Output files are similar to Neo4j's, but with a different header format.

Source code in biocypher/output/write/graph/_arangodb.py
class _ArangoDBBatchWriter(_Neo4jBatchWriter):
    """Class for writing node and edge representations to disk.

    Uses the format specified by ArangoDB for use with "arangoimport".
    Output files are similar to Neo4j's, but with a different header format.
    """

    def _get_default_import_call_bin_prefix(self):
        """Provide the default string for the import call bin prefix.

        Returns
        -------
            str: The default prefix for the arangoimport call (an empty string)

        """
        return ""

    def _get_import_script_name(self) -> str:
        """Return the name of the neo4j admin import script.

        Returns
        -------
            str: The name of the import script (ending in .sh)

        """
        return "arangodb-import-call.sh"

    def _write_node_headers(self):
        """Write single CSV file for a graph entity.

        The graph entity is represented as a node as per the definition
        in the `schema_config.yaml`, containing only the header for this type
        of node.

        Returns
        -------
            bool: The return value. True for success, False otherwise.

        """
        # load headers from data parse
        if not self.node_property_dict:
            logger.error(
                "Header information not found. Was the data parsed first?",
            )
            return False

        for label, props in self.node_property_dict.items():
            # create header CSV with ID, properties, labels

            _id = "_key"

            # translate label to PascalCase
            pascal_label = self.translator.name_sentence_to_pascal(label)

            header = f"{pascal_label}-header.csv"
            header_path = os.path.join(
                self.outdir,
                header,
            )

            # check if file already exists
            if os.path.exists(header_path):
                logger.warning(f"File {header_path} already exists. Overwriting.")

            # concatenate key:value in props
            props_list = []
            for k in props.keys():
                props_list.append(f"{k}")

            # create list of lists and flatten
            # removes need for empty check of property list
            out_list = [[_id], props_list]
            out_list = [val for sublist in out_list for val in sublist]

            with open(header_path, "w", encoding="utf-8") as f:
                # concatenate with delimiter
                row = self.delim.join(out_list)
                f.write(row)

            # add collection from schema config
            collection = self.translator.ontology.mapping.extended_schema[label].get("db_collection_name", None)

            # add file path to the arangoimport statement
            # do once for each part file
            parts = self.parts.get(label, [])

            if not parts:
                msg = f"No parts found for node label {label}. Check that the data was parsed first."
                logger.error(msg)
                raise ValueError(msg)

            for part in parts:
                import_call_header_path = os.path.join(
                    self.import_call_file_prefix,
                    header,
                )
                import_call_parts_path = os.path.join(
                    self.import_call_file_prefix,
                    part,
                )

                self.import_call_nodes.add(
                    (
                        import_call_header_path,
                        import_call_parts_path,
                        collection,
                    ),
                )

        return True

    def _write_edge_headers(self):
        """Write single CSV file for a graph entity.

        The graph entity is represented as an edge as per the definition
        in the `schema_config.yaml`, containing only the header for this type
        of edge.

        Returns
        -------
            bool: The return value. True for success, False otherwise.

        """
        # load headers from data parse
        if not self.edge_property_dict:
            logger.error(
                "Header information not found. Was the data parsed first?",
            )
            return False

        for label, props in self.edge_property_dict.items():
            # translate label to PascalCase
            pascal_label = self.translator.name_sentence_to_pascal(label)

            # paths
            header = f"{pascal_label}-header.csv"
            header_path = os.path.join(
                self.outdir,
                header,
            )
            parts = f"{pascal_label}-part.*"

            # check if the file already exists
            if os.path.exists(header_path):
                logger.warning(f"Header file {header_path} already exists. Overwriting.")

            # concatenate key:value in props
            props_list = []
            for k in props.keys():
                props_list.append(f"{k}")

            out_list = ["_from", "_key", *props_list, "_to"]

            with open(header_path, "w", encoding="utf-8") as f:
                # concatenate with delimiter
                row = self.delim.join(out_list)
                f.write(row)

            # add collection from schema config
            if not self.translator.ontology.mapping.extended_schema.get(label):
                for (
                    _,
                    v,
                ) in self.translator.ontology.mapping.extended_schema.items():
                    if v.get("label_as_edge") == label:
                        collection = v.get("db_collection_name", None)
                        break

            else:
                collection = self.translator.ontology.mapping.extended_schema[label].get("db_collection_name", None)

            # add file path to the arangoimport statement (import call path
            # may be different from the actual output path)
            header_import_call_path = os.path.join(
                self.import_call_file_prefix,
                header,
            )
            parts_import_call_path = os.path.join(
                self.import_call_file_prefix,
                parts,
            )
            self.import_call_edges.add(
                (
                    header_import_call_path,
                    parts_import_call_path,
                    collection,
                ),
            )

        return True

    def _construct_import_call(self) -> str:
        """Construct the import call.

        Details folder and individual node and edge headers and data files,
        as well as delimiters and database name. Built after all data has been
        processed to ensure that nodes are called before any edges.

        Returns
        -------
            str: a bash command for arangoimport

        """
        import_call = f"{self.import_call_bin_prefix}arangoimp --type csv " f'--separator="{self.escaped_delim}" '

        if self.quote == "'":
            import_call += f'--quote="{self.quote}" '
        else:
            import_call += f"--quote='{self.quote}' "

        node_lines = ""

        # node import calls: one line per node type
        for header_path, parts_path, collection in self.import_call_nodes:
            line = f"{import_call} --headers-file {header_path} --file= {parts_path} "

            if collection:
                line += f"--create-collection --collection {collection} "

            node_lines += f"{line}\n"

        edge_lines = ""

        # edge import calls: one line per edge type
        for header_path, parts_path, collection in self.import_call_edges:
            line = f"{import_call} --headers-file {header_path} --file {parts_path} "

            if collection:
                # edge collections must be created with type "edge"
                line += f"--create-collection --collection {collection} --create-collection-type edge "

            edge_lines += f"{line}\n"

        return node_lines + edge_lines

_construct_import_call()

Construct the import call.

Details folder and individual node and edge headers and data files, as well as delimiters and database name. Built after all data has been processed to ensure that nodes are called before any edges.

Returns
str: a bash command for arangoimport
Source code in biocypher/output/write/graph/_arangodb.py
def _construct_import_call(self) -> str:
    """Construct the import call.

    Details folder and individual node and edge headers and data files,
    as well as delimiters and database name. Built after all data has been
    processed to ensure that nodes are called before any edges.

    Returns
    -------
        str: a bash command for arangoimport

    """
    import_call = f"{self.import_call_bin_prefix}arangoimp --type csv " f'--separator="{self.escaped_delim}" '

    if self.quote == "'":
        import_call += f'--quote="{self.quote}" '
    else:
        import_call += f"--quote='{self.quote}' "

    node_lines = ""

    # node import calls: one line per node type
    for header_path, parts_path, collection in self.import_call_nodes:
        line = f"{import_call} --headers-file {header_path} --file= {parts_path} "

        if collection:
            line += f"--create-collection --collection {collection} "

        node_lines += f"{line}\n"

    edge_lines = ""

    # edge import calls: one line per edge type
    for header_path, parts_path, collection in self.import_call_edges:
        line = f"{import_call} --headers-file {header_path} --file {parts_path} "

        if collection:
            # edge collections must be created with type "edge"
            line += f"--create-collection --collection {collection} --create-collection-type edge "

        edge_lines += f"{line}\n"

    return node_lines + edge_lines
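
As a hedged illustration, one generated node-import line might look as follows, reusing only the flags emitted above; the paths and collection name are made up:

line = (
    'arangoimp --type csv --separator=";" --quote="\'" '
    "--headers-file /import/Protein-header.csv --file /import/Protein-part.000.csv "
    "--create-collection --collection proteins"
)
print(line)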

_get_default_import_call_bin_prefix()

Provide the default string for the import call bin prefix.

Returns
str: The default prefix for the arangoimport call (an empty string)
Source code in biocypher/output/write/graph/_arangodb.py
def _get_default_import_call_bin_prefix(self):
    """Provide the default string for the import call bin prefix.

    Returns
    -------
        str: The default prefix for the arangoimport call (an empty string)

    """
    return ""

_get_import_script_name()

Return the name of the ArangoDB import script.

Returns
str: The name of the import script (ending in .sh)
Source code in biocypher/output/write/graph/_arangodb.py
def _get_import_script_name(self) -> str:
    """Return the name of the neo4j admin import script.

    Returns
    -------
        str: The name of the import script (ending in .sh)

    """
    return "arangodb-import-call.sh"

_write_edge_headers()

Write a single CSV file for a graph entity.

The graph entity is represented as an edge as per the definition in the schema_config.yaml, containing only the header for this type of edge.

Returns
bool: The return value. True for success, False otherwise.
Source code in biocypher/output/write/graph/_arangodb.py
def _write_edge_headers(self):
    """Write single CSV file for a graph entity.

    The graph entity is represented as an edge as per the definition
    in the `schema_config.yaml`, containing only the header for this type
    of edge.

    Returns
    -------
        bool: The return value. True for success, False otherwise.

    """
    # load headers from data parse
    if not self.edge_property_dict:
        logger.error(
            "Header information not found. Was the data parsed first?",
        )
        return False

    for label, props in self.edge_property_dict.items():
        # translate label to PascalCase
        pascal_label = self.translator.name_sentence_to_pascal(label)

        # paths
        header = f"{pascal_label}-header.csv"
        header_path = os.path.join(
            self.outdir,
            header,
        )
        parts = f"{pascal_label}-part.*"

        # check if the file already exists
        if os.path.exists(header_path):
            logger.warning(f"Header file {header_path} already exists. Overwriting.")

        # concatenate key:value in props
        props_list = []
        for k in props.keys():
            props_list.append(f"{k}")

        out_list = ["_from", "_key", *props_list, "_to"]

        with open(header_path, "w", encoding="utf-8") as f:
            # concatenate with delimiter
            row = self.delim.join(out_list)
            f.write(row)

        # add collection from schema config
        if not self.translator.ontology.mapping.extended_schema.get(label):
            for (
                _,
                v,
            ) in self.translator.ontology.mapping.extended_schema.items():
                if v.get("label_as_edge") == label:
                    collection = v.get("db_collection_name", None)
                    break

        else:
            collection = self.translator.ontology.mapping.extended_schema[label].get("db_collection_name", None)

        # add file path to the arangoimport statement (import call path
        # may be different from the actual output path)
        header_import_call_path = os.path.join(
            self.import_call_file_prefix,
            header,
        )
        parts_import_call_path = os.path.join(
            self.import_call_file_prefix,
            parts,
        )
        self.import_call_edges.add(
            (
                header_import_call_path,
                parts_import_call_path,
                collection,
            ),
        )

    return True
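
For example, with a "," delimiter and a single property, the edge header reads:

print(",".join(["_from", "_key", "score", "_to"]))  # _from,_key,score,_to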

_write_node_headers()

Write a single CSV file for a graph entity.

The graph entity is represented as a node as per the definition in the schema_config.yaml, containing only the header for this type of node.

Returns
bool: The return value. True for success, False otherwise.
Source code in biocypher/output/write/graph/_arangodb.py
def _write_node_headers(self):
    """Write single CSV file for a graph entity.

    The graph entity is represented as a node as per the definition
    in the `schema_config.yaml`, containing only the header for this type
    of node.

    Returns
    -------
        bool: The return value. True for success, False otherwise.

    """
    # load headers from data parse
    if not self.node_property_dict:
        logger.error(
            "Header information not found. Was the data parsed first?",
        )
        return False

    for label, props in self.node_property_dict.items():
        # create header CSV with ID, properties, labels

        _id = "_key"

        # translate label to PascalCase
        pascal_label = self.translator.name_sentence_to_pascal(label)

        header = f"{pascal_label}-header.csv"
        header_path = os.path.join(
            self.outdir,
            header,
        )

        # check if file already exists
        if os.path.exists(header_path):
            logger.warning(f"File {header_path} already exists. Overwriting.")

        # concatenate key:value in props
        props_list = []
        for k in props.keys():
            props_list.append(f"{k}")

        # create list of lists and flatten
        # removes need for empty check of property list
        out_list = [[_id], props_list]
        out_list = [val for sublist in out_list for val in sublist]

        with open(header_path, "w", encoding="utf-8") as f:
            # concatenate with delimiter
            row = self.delim.join(out_list)
            f.write(row)

        # add collection from schema config
        collection = self.translator.ontology.mapping.extended_schema[label].get("db_collection_name", None)

        # add file path to the arangoimport statement
        # do once for each part file
        parts = self.parts.get(label, [])

        if not parts:
            msg = f"No parts found for node label {label}. Check that the data was parsed first."
            logger.error(msg)
            raise ValueError(msg)

        for part in parts:
            import_call_header_path = os.path.join(
                self.import_call_file_prefix,
                header,
            )
            import_call_parts_path = os.path.join(
                self.import_call_file_prefix,
                part,
            )

            self.import_call_nodes.add(
                (
                    import_call_header_path,
                    import_call_parts_path,
                    collection,
                ),
            )

    return True
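
And the corresponding node header, with illustrative property names:

print(",".join(["_key", "name", "taxon"]))  # _key,name,taxon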

RDF Writer

Bases: _BatchWriter

Write BioCypher's property graph into an RDF format.

Uses rdflib and all the extensions it supports (RDF/XML, N3, NTriples, N-Quads, Turtle, TriX, Trig and JSON-LD). By default, the conversion is done keeping only the minimum information about nodes and edges, skipping all properties.

Source code in biocypher/output/write/graph/_rdf.py
class _RDFWriter(_BatchWriter):
    """Write BioCypher's property graph into an RDF format.

    Uses `rdflib` and all the extensions it supports (RDF/XML, N3, NTriples,
    N-Quads, Turtle, TriX, Trig and JSON-LD). By default, the conversion
    is done keeping only the minimum information about nodes and edges,
    skipping all properties.
    """

    def __init__(
        self,
        translator: Translator,
        deduplicator: Deduplicator,
        delimiter: str,
        array_delimiter: str = ",",
        quote: str = '"',
        output_directory: str | None = None,
        db_name: str = "neo4j",
        import_call_bin_prefix: str | None = None,
        import_call_file_prefix: str | None = None,
        wipe: bool = True,
        strict_mode: bool = False,
        skip_bad_relationships: bool = False,
        skip_duplicate_nodes: bool = False,
        db_user: str = None,
        db_password: str = None,
        db_host: str = None,
        db_port: str = None,
        file_format: str = None,
        rdf_namespaces: dict = {},
        labels_order: str = "Ascending",
        **kwargs,
    ):
        super().__init__(
            translator=translator,
            deduplicator=deduplicator,
            delimiter=delimiter,
            array_delimiter=array_delimiter,
            quote=quote,
            output_directory=output_directory,
            db_name=db_name,
            import_call_bin_prefix=import_call_bin_prefix,
            import_call_file_prefix=import_call_file_prefix,
            wipe=wipe,
            strict_mode=strict_mode,
            skip_bad_relationships=skip_bad_relationships,
            skip_duplicate_nodes=skip_duplicate_nodes,
            db_user=db_user,
            db_password=db_password,
            db_host=db_host,
            db_port=db_port,
            file_format=file_format,
            rdf_namespaces=rdf_namespaces,
            labels_order=labels_order,
            **kwargs,
        )
        if not self.rdf_namespaces:
            # For some reason, the config can pass
            # the None object.
            self.rdf_namespaces = {}

        if "rdf_format" in kwargs:
            logger.warning("The 'rdf_format' config option is deprecated, use 'file_format' instead.")
            if not file_format:
                format = kwargs["rdf_format"]
                logger.warning(f"I will set 'file_format: {format}' for you.")
                self.file_format = format
                kwargs.pop("rdf_format")
            logger.warning("NOTE: this warning will become an error in next versions.")

        # check the attribute, so that the deprecated 'rdf_format' fallback
        # above is taken into account
        if not self.file_format:
            msg = "You need to indicate a 'file_format'."
            logger.error(msg)
            raise RuntimeError(msg)

        self.namespaces = {}

    def _get_import_script_name(self) -> str:
        """Return the name of the RDF admin import script.

        This function is used for RDF export.

        Returns
        -------
            str: The name of the import script (ending in .sh)

        """
        return "rdf-import-call.sh"

    def _get_default_import_call_bin_prefix(self):
        """Provide the default string for the import call bin prefix.

        Returns
        -------
            str: The default location prefix for the import binary ("bin/")

        """
        return "bin/"

    def _is_rdf_format_supported(self, file_format: str) -> bool:
        """Check if the specified RDF format is supported.

        Args:
        ----
            file_format (str): The RDF format to check.

        Returns:
        -------
            bool: Returns True if rdf format supported, False otherwise.

        """
        supported_formats = [
            "xml",
            "n3",
            "turtle",
            "ttl",
            "nt",
            "pretty-xml",
            "trix",
            "trig",
            "nquads",
            "json-ld",
        ]
        if file_format not in supported_formats:
            logger.error(
                f"Incorrect or unsupported RDF format: '{file_format}',"
                f"use one of the following: {', '.join(supported_formats)}.",
            )
            return False
        else:
            # Set the file extension to match the format
            if self.file_format == "turtle":
                self.extension = "ttl"
            else:
                self.extension = self.file_format
            return True

    def _write_single_edge_list_to_file(
        self,
        edge_list: list,
        label: str,
        prop_dict: dict,
    ):
        """Write a list of BioCypherEdges to an RDF file.

        Args:
        ----
            edge_list (list): list of BioCypherEdges to be written

            label (str): the label (type) of the edge

            prop_dict (dict): properties of node class passed from parsing
                function and their types

        Returns:
        -------
            bool: The return value. True for success, False otherwise.

        """
        # NOTE: prop_dict is not used. Remove in next refactor.

        if not all(isinstance(n, BioCypherEdge) for n in edge_list):
            logger.error("Edges must be passed as type BioCypherEdge.")
            return False

        # translate label to PascalCase
        label_pascal = self.translator.name_sentence_to_pascal(label)

        # create file name
        file_name = os.path.join(self.outdir, f"{label_pascal}.{self.extension}")

        # write data in graph
        graph = Graph()
        self._init_namespaces(graph)

        for edge in edge_list:
            rdf_subject = edge.get_source_id()
            rdf_object = edge.get_target_id()
            rdf_predicate = edge.get_id()
            rdf_properties = edge.get_properties()
            if rdf_predicate is None:
                rdf_predicate = rdf_subject + rdf_object

            edge_label = self.translator.name_sentence_to_pascal(edge.get_label())
            edge_uri = self.as_uri(edge_label, "biocypher")
            graph.add((edge_uri, RDF.type, RDFS.Class))
            graph.add(
                (
                    self.as_uri(rdf_predicate, "biocypher"),
                    RDF.type,
                    edge_uri,
                ),
            )
            graph.add(
                (
                    self.as_uri(rdf_predicate, "biocypher"),
                    self.as_uri("subject", "biocypher"),
                    self.to_uri(rdf_subject),
                ),
            )
            graph.add(
                (
                    self.as_uri(rdf_predicate, "biocypher"),
                    self.as_uri("object", "biocypher"),
                    self.to_uri(rdf_object),
                ),
            )

            # add properties to the transformed edge --> node
            for key, value in rdf_properties.items():
                # only write value if it exists.
                if value:
                    self.add_property_to_graph(graph, rdf_predicate, value, key)

        graph.serialize(destination=file_name, format=self.file_format)

        logger.info(
            f"Writing {len(edge_list)} entries to {label_pascal}.{self.file_format}",
        )

        return True

    def add_property_to_graph(
        self,
        graph: Graph,
        rdf_subject: str,
        rdf_object: str,
        rdf_predicate: str,
    ):
        """Add the properties to an RDF node.

        It takes the graph, the subject, object, and predicate of the RDF
        triple. It checks if the property is a list and adds it to the graph
        accordingly. Otherwise it checks if the string represents a list. If it
        does, it transforms it to a list and adds it to the graph. If not, it
        adds the property to the graph as a literal. If the property is neither
        a list or string, it will also be added as a literal.

        Args:
        ----
            graph (RDFLib.Graph): The RDF graph to add the nodes to.

            rdf_subject (str): The subject of the RDF triple.

            rdf_object (str): The object of the RDF triple.

            rdf_predicate (str): The predicate of the RDF triple.

        Returns:
        -------
            None

        """
        if isinstance(rdf_object, list):
            for obj in rdf_object:
                graph.add(
                    (
                        self.to_uri(rdf_subject),
                        self.property_to_uri(rdf_predicate),
                        Literal(obj),
                    ),
                )
        elif isinstance(rdf_object, str):
            if rdf_object.startswith("[") and rdf_object.endswith("]"):
                self.add_property_to_graph(
                    graph,
                    rdf_subject,
                    self.transform_string_to_list(rdf_object),
                    rdf_predicate,
                )
            else:
                graph.add(
                    (
                        self.to_uri(rdf_subject),
                        self.property_to_uri(rdf_predicate),
                        Literal(rdf_object),
                    ),
                )
        else:
            graph.add(
                (
                    self.to_uri(rdf_subject),
                    self.property_to_uri(rdf_predicate),
                    Literal(rdf_object),
                ),
            )

    def transform_string_to_list(self, string_list: str) -> list:
        """Transform a string representation of a list into a list.

        Args:
        ----
            string_list (str): The string representation of the list.

        Returns:
        -------
            list: The list representation of the input string.

        """
        return string_list.replace("[", "").replace("]", "").replace("'", "").split(", ")

    def _write_single_node_list_to_file(
        self,
        node_list: list,
        label: str,
        prop_dict: dict,
        labels: str,
    ):
        """Write a list of BioCypherNodes to an RDF file.

        Args:
        ----
            node_list (list): A list of BioCypherNodes to be written.

            label (str): The label (type) of the nodes.

            prop_dict (dict): A dictionary of properties and their types for the node class.

            labels (str): string of one or several concatenated labels

        Returns:
        -------
            bool: True if the writing is successful, False otherwise.

        """
        # NOTE: labels and prop_dict are not used.

        if not all(isinstance(n, BioCypherNode) for n in node_list):
            logger.error("Nodes must be passed as type BioCypherNode.")
            return False

        # translate label to PascalCase
        label_pascal = self.translator.name_sentence_to_pascal(label)

        # create file name
        file_name = os.path.join(self.outdir, f"{label_pascal}.{self.extension}")

        # write data in graph
        graph = Graph()
        self._init_namespaces(graph)

        for n in node_list:
            rdf_subject = n.get_id()
            rdf_object = n.get_label()
            properties = n.get_properties()
            class_name = self.translator.name_sentence_to_pascal(rdf_object)
            graph.add(
                (
                    self.as_uri(class_name, "biocypher"),
                    RDF.type,
                    RDFS.Class,
                ),
            )
            graph.add(
                (
                    self.to_uri(rdf_subject),
                    RDF.type,
                    self.as_uri(class_name, "biocypher"),
                ),
            )
            for key, value in properties.items():
                # only write value if it exists.
                if value:
                    self.add_property_to_graph(graph, rdf_subject, value, key)

        graph.serialize(destination=file_name, format=self.file_format)

        logger.info(
            f"Writing {len(node_list)} entries to {label_pascal}.{self.file_format}",
        )

        return True

    def write_nodes(self, nodes, batch_size: int = int(1e6), force: bool = False) -> bool:
        """Write nodes in RDF format.

        Args:
        ----
            nodes (list or generator): A list or generator of nodes in
                BioCypherNode format.
            batch_size (int): The number of nodes to write in each batch.
            force (bool): Flag to force the writing even if the output file
                already exists.

        Returns:
        -------
            bool: True if the writing is successful, False otherwise.

        """
        # check if specified output format is correct
        passed = self._is_rdf_format_supported(self.file_format)
        if not passed:
            logger.error("Error while writing node data, wrong RDF format")
            return False
        # write node data using _write_node_data method
        passed = self._write_node_data(nodes, batch_size, force)
        if not passed:
            logger.error("Error while writing node data.")
            return False
        return True

    def write_edges(
        self,
        edges: list | GeneratorType,
        batch_size: int = int(1e6),
    ) -> bool:
        """Write edges in RDF format.

        Args:
        ----
            edges (BioCypherEdge): a list or generator of edges in
                :py:class:`BioCypherEdge` format
            batch_size (int): The number of edges to write in each batch.

        Returns:
        -------
            bool: The return value. True for success, False otherwise.

        """
        # check if specified output format is correct
        passed = self._is_rdf_format_supported(self.file_format)
        if not passed:
            logger.error("Error while writing edge data, wrong RDF format")
            return False
        # write edge data using _write_edge_data method
        passed = self._write_edge_data(edges, batch_size=batch_size)
        if not passed:
            logger.error("Error while writing edge data.")
            return False

        return True

    def _construct_import_call(self) -> str:
        """Write the import call.

        This function is not applicable for RDF.

        Returns
        -------
            str: An empty string, as no import call is needed for RDF.

        """
        return ""

    def _quote_string(self, value: str) -> str:
        """Quote a string."""
        return f"{self.quote}{value}{self.quote}"

    def _write_array_string(self, string_list):
        """Write the string representation of an array into a .csv file.

        This function is not applicable for RDF.

        Args:
        ----
            string_list (list): list of ontology strings

        Returns:
        -------
            bool: Always True, as arrays need no special handling for RDF.

        """
        return True

    def _write_node_headers(self):
        """Import properties of a graph entity.

        This function is not applicable for RDF.

        Returns
        -------
            bool: The return value. True for success, False otherwise.

        """
        return True

    def _write_edge_headers(self):
        """Write a database import-file for a graph entity.

        This function is not applicable for RDF.

        Returns
        -------
            bool: The return value. True for success, False otherwise.

        """
        return True

    def as_uri(self, name: str, namespace: str = "") -> str:
        """Return an RDFlib object with the given namespace as a URI.

        There is often a default for empty namespaces, which would have been
        loaded with the ontology, and put in `self.namespaces` by
        `self._init_namespaces`.

        Args:
        ----
            name (str): The name to be transformed.
            namespace (str): The namespace to be used.

        Returns:
        -------
            str: The URI for the given name and namespace.

        """
        if namespace in self.namespaces:
            return URIRef(self.namespaces[namespace][name])
        else:
            assert "biocypher" in self.namespaces
            # If no default empty NS, use the biocypher one,
            # which is always there.
            logger.debug(f"I'll consider '{name}' as part of 'biocypher' namespace.")
            return URIRef(self.namespaces["biocypher"][name])

    def to_uri(self, subject: str) -> str:
        """Extract the namespace from the given subject.

        Split the subject's string on ":". Then convert the subject to a
        proper URI, if the namespace is known. If namespace is unknown,
        defaults to the default prefix of the ontology.

        Args:
        ----
            subject (str): The subject to be converted to a URI.

        Returns:
        -------
            str: The corresponding URI for the subject.

        """
        pref_id = subject.split(":")
        if len(pref_id) == 2:
            pref, id = pref_id
            return self.as_uri(id, pref)
        else:
            return self.as_uri(subject)

    def find_uri(self, regexp: str) -> str | None:
        """Return the first subject URI in the graph matching the given regexp, or None."""
        query = f'SELECT DISTINCT ?s WHERE {{ ?s ?p ?o . FILTER regex(str(?s), "{regexp}")}}'
        gen = self.graph.query(query)
        uris = list(gen)
        if len(uris) > 1:
            logger.warning(
                f"Found several terms matching `{regexp}`, I will consider only the first one: `{uris[0][0]}`",
            )
            logger.debug("\tothers:")
            for u in uris[1:]:
                logger.debug(f"\t{u[0]}")
        if uris:
            logger.debug(f"Found {len(uris)} terms, returning: `{uris[0][0]}`")
            return uris[0][0]
        else:
            logger.debug(f"Found no term matching: `{query}`")
            return None

    def property_to_uri(self, property_name: str) -> dict[str, str]:
        """Convert a property name to its corresponding URI.

        This function takes a property name and searches for its corresponding
        URI in various namespaces. It first checks the core namespaces for
        rdflib, including owl, rdf, rdfs, xsd, and xml.

        Args:
        ----
            property_name (str): The property name to be converted to a URI.

        Returns:
        -------
            str: The corresponding URI for the input property name.

        """
        # These namespaces are core for rdflib; owl, rdf, rdfs, xsd and xml
        for namespace in _NAMESPACE_PREFIXES_CORE.values():
            if property_name in namespace:
                return namespace[property_name]

        # If the property name is not found in the core namespaces, search in
        # the SKOS, DC, and DCTERMS namespaces
        for namespace in [SKOS, DC, DCTERMS]:
            if property_name in namespace:
                return namespace[property_name]

        # If the property name is still not found, try other namespaces from
        # rdflib.
        for namespace in _NAMESPACE_PREFIXES_RDFLIB.values():
            if property_name in namespace:
                return namespace[property_name]

        # If the property name is "licence", it recursively calls the function
        # with "license" as the input.
        if property_name == "licence":
            return self.property_to_uri("license")

        # TODO: add an option to search trough manually implemented namespaces

        # If the input is not found in any of the namespaces, it returns
        # the corresponding URI from the biocypher namespace.
        # TODO: give a warning and try to prevent this option altogether
        return self.as_uri(property_name, "biocypher")

    def _init_namespaces(self, graph: Graph):
        """Initialise the namespaces for the RDF graph.

        This function adds the biocypher standard namespace to the `namespaces`
        attribute of the class. If `namespaces` is empty, it sets it to the
        biocypher standard namespace. Otherwise, it merges the biocypher
        standard namespace with the namespaces defined in the
        biocypher_config.yaml.

        Args:
        ----
            graph (RDFLib.Graph): The RDF graph to bind the namespaces to.

        Returns:
        -------
            None

        """
        # Bind and keep the biocypher namespace.
        bcns = Namespace("https://biocypher.org/biocypher#")
        bck = "biocypher"
        self.namespaces = {bck: bcns}
        graph.bind(bck, bcns)

        # Keep track of namespaces loaded with the ontologies in the given graph.
        logger.debug("Bind namespaces:")
        for prefix, ns in graph.namespaces():
            if prefix in self.namespaces and str(ns) != str(self.namespaces[prefix]):
                logger.warning(
                    f"Namespace '{prefix}' was already loaded"
                    f"as '{self.namespaces[prefix]}',"
                    f"I will overwrite it with '{ns}'.",
                )
            logger.debug(f"\t'{prefix}'\t=>\t'{ns}'")
            self.namespaces[prefix] = Namespace(ns)

        # Bind and keep the namespaces given in the config.
        for prefix, ns in self.rdf_namespaces.items():
            assert prefix not in self.namespaces
            self.namespaces[prefix] = Namespace(ns)
            logger.debug(f"\t'{prefix}'\t->\t{ns}")
            graph.bind(prefix, self.namespaces[prefix])

_construct_import_call()

Write the import call.

This function is not applicable for RDF.

Returns
str: An empty string, as no import call is needed for RDF.
Source code in biocypher/output/write/graph/_rdf.py
def _construct_import_call(self) -> str:
    """Write the import call.

    This function is not applicable for RDF.

    Returns
    -------
        str: An empty string, as no import call is needed for RDF.

    """
    return ""

_get_default_import_call_bin_prefix()

Provide the default string for the import call bin prefix.

Returns
str: The default location prefix for the import binary ("bin/")
Source code in biocypher/output/write/graph/_rdf.py
def _get_default_import_call_bin_prefix(self):
    """Provide the default string for the import call bin prefix.

    Returns
    -------
        str: The default location prefix for the import binary ("bin/")

    """
    return "bin/"

_get_import_script_name()

Return the name of the RDF import script.

This function is used for RDF export.

Returns
str: The name of the import script (ending in .sh)
Source code in biocypher/output/write/graph/_rdf.py
def _get_import_script_name(self) -> str:
    """Return the name of the RDF admin import script.

    This function is used for RDF export.

    Returns
    -------
        str: The name of the import script (ending in .sh)

    """
    return "rdf-import-call.sh"

_init_namespaces(graph)

Initialise the namespaces for the RDF graph.

This function adds the biocypher standard namespace to the namespaces attribute of the class. If namespaces is empty, it sets it to the biocypher standard namespace. Otherwise, it merges the biocypher standard namespace with the namespaces defined in the biocypher_config.yaml.


graph (RDFLib.Graph): The RDF graph to bind the namespaces to.

None
Source code in biocypher/output/write/graph/_rdf.py
def _init_namespaces(self, graph: Graph):
    """Initialise the namespaces for the RDF graph.

    This function adds the biocypher standard namespace to the `namespaces`
    attribute of the class. If `namespaces` is empty, it sets it to the
    biocypher standard namespace. Otherwise, it merges the biocypher
    standard namespace with the namespaces defined in the
    biocypher_config.yaml.

    Args:
    ----
        graph (RDFLib.Graph): The RDF graph to bind the namespaces to.

    Returns:
    -------
        None

    """
    # Bind and keep the biocypher namespace.
    bcns = Namespace("https://biocypher.org/biocypher#")
    bck = "biocypher"
    self.namespaces = {bck: bcns}
    graph.bind(bck, bcns)

    # Keep track of namespaces loaded with the ontologies in the given graph.
    logger.debug("Bind namespaces:")
    for prefix, ns in graph.namespaces():
        if prefix in self.namespaces and str(ns) != str(self.namespaces[prefix]):
            logger.warning(
                f"Namespace '{prefix}' was already loaded "
                f"as '{self.namespaces[prefix]}', "
                f"I will overwrite it with '{ns}'.",
            )
        logger.debug(f"\t'{prefix}'\t=>\t'{ns}'")
        self.namespaces[prefix] = Namespace(ns)

    # Bind and keep the namespaces given in the config.
    for prefix, ns in self.rdf_namespaces.items():
        assert prefix not in self.namespaces
        self.namespaces[prefix] = Namespace(ns)
        logger.debug(f"\t'{prefix}'\t->\t{ns}")
        graph.bind(prefix, self.namespaces[prefix])
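
A minimal sketch (not part of the source) of how the config-driven namespaces become usable after initialisation; the "go" prefix and its IRI are illustrative assumptions:

from rdflib import Graph

# assume `writer` was configured with rdf_namespaces: {go: "http://purl.obolibrary.org/obo/GO_"}
graph = Graph()
writer._init_namespaces(graph)  # binds "biocypher" plus the configured prefixes
writer.as_uri("0005634", "go")  # -> URIRef("http://purl.obolibrary.org/obo/GO_0005634")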

_is_rdf_format_supported(file_format)

Check if the specified RDF format is supported.


file_format (str): The RDF format to check.

bool: True if the RDF format is supported, False otherwise.
Source code in biocypher/output/write/graph/_rdf.py
def _is_rdf_format_supported(self, file_format: str) -> bool:
    """Check if the specified RDF format is supported.

    Args:
    ----
        file_format (str): The RDF format to check.

    Returns:
    -------
        bool: True if the RDF format is supported, False otherwise.

    """
    supported_formats = [
        "xml",
        "n3",
        "turtle",
        "ttl",
        "nt",
        "pretty-xml",
        "trix",
        "trig",
        "nquads",
        "json-ld",
    ]
    if file_format not in supported_formats:
        logger.error(
            f"Incorrect or unsupported RDF format: '{file_format}',"
            f"use one of the following: {', '.join(supported_formats)}.",
        )
        return False
    else:
        # Set the file extension to match the format
        if self.file_format == "turtle":
            self.extension = "ttl"
        else:
            self.extension = self.file_format
        return True
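
A short usage sketch (illustrative, with writer an initialised RDF writer): the check validates the requested format and, as a side effect, derives the file extension from the writer's own file_format attribute:

writer.file_format = "turtle"
writer._is_rdf_format_supported("turtle")  # True; sets writer.extension to "ttl"
writer._is_rdf_format_supported("csv")     # False; logs an error, extension unchanged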

_quote_string(value)

Quote a string.

Source code in biocypher/output/write/graph/_rdf.py
def _quote_string(self, value: str) -> str:
    """Quote a string."""
    return f"{self.quote}{value}{self.quote}"

_write_array_string(string_list)

Write the string representation of an array into a .csv file.

This function is not applicable for RDF.


string_list (list): list of ontology strings

bool: Always True; array serialization is not needed for RDF.
Source code in biocypher/output/write/graph/_rdf.py
def _write_array_string(self, string_list):
    """Write the string representation of an array into a .csv file.

    This function is not applicable for RDF.

    Args:
    ----
        string_list (list): list of ontology strings

    Returns:
    -------
        bool: Always True; array serialization is not needed for RDF.

    """
    return True

_write_edge_headers()

Write a database import-file for a graph entity.

This function is not applicable for RDF.

Returns
bool: The return value. True for success, False otherwise.
Source code in biocypher/output/write/graph/_rdf.py
def _write_edge_headers(self):
    """Write a database import-file for a graph entity.

    This function is not applicable for RDF.

    Returns
    -------
        bool: The return value. True for success, False otherwise.

    """
    return True

_write_node_headers()

Import properties of a graph entity.

This function is not applicable for RDF.

Returns
bool: The return value. True for success, False otherwise.
Source code in biocypher/output/write/graph/_rdf.py
def _write_node_headers(self):
    """Import properties of a graph entity.

    This function is not applicable for RDF.

    Returns
    -------
        bool: The return value. True for success, False otherwise.

    """
    return True

_write_single_edge_list_to_file(edge_list, label, prop_dict)

Write a list of BioCypherEdges to an RDF file.


edge_list (list): list of BioCypherEdges to be written

label (str): the label (type) of the edge

prop_dict (dict): properties of node class passed from parsing
    function and their types

bool: The return value. True for success, False otherwise.
Source code in biocypher/output/write/graph/_rdf.py
def _write_single_edge_list_to_file(
    self,
    edge_list: list,
    label: str,
    prop_dict: dict,
):
    """Write a list of BioCypherEdges to an RDF file.

    Args:
    ----
        edge_list (list): list of BioCypherEdges to be written

        label (str): the label (type) of the edge

        prop_dict (dict): properties of node class passed from parsing
            function and their types

    Returns:
    -------
        bool: The return value. True for success, False otherwise.

    """
    # NOTE: prop_dict is not used. Remove in next refactor.

    if not all(isinstance(n, BioCypherEdge) for n in edge_list):
        logger.error("Edges must be passed as type BioCypherEdge.")
        return False

    # translate label to PascalCase
    label_pascal = self.translator.name_sentence_to_pascal(label)

    # create file name
    file_name = os.path.join(self.outdir, f"{label_pascal}.{self.extension}")

    # write data in graph
    graph = Graph()
    self._init_namespaces(graph)

    for edge in edge_list:
        rdf_subject = edge.get_source_id()
        rdf_object = edge.get_target_id()
        rdf_predicate = edge.get_id()
        rdf_properties = edge.get_properties()
        if rdf_predicate is None:
            rdf_predicate = rdf_subject + rdf_object

        edge_label = self.translator.name_sentence_to_pascal(edge.get_label())
        edge_uri = self.as_uri(edge_label, "biocypher")
        graph.add((edge_uri, RDF.type, RDFS.Class))
        graph.add(
            (
                self.as_uri(rdf_predicate, "biocypher"),
                RDF.type,
                edge_uri,
            ),
        )
        graph.add(
            (
                self.as_uri(rdf_predicate, "biocypher"),
                self.as_uri("subject", "biocypher"),
                self.to_uri(rdf_subject),
            ),
        )
        graph.add(
            (
                self.as_uri(rdf_predicate, "biocypher"),
                self.as_uri("object", "biocypher"),
                self.to_uri(rdf_object),
            ),
        )

        # add properties to the transformed edge --> node
        for key, value in rdf_properties.items():
            # only write value if it exists.
            if value:
                self.add_property_to_graph(graph, rdf_predicate, value, key)

    graph.serialize(destination=file_name, format=self.file_format)

    logger.info(
        f"Writing {len(edge_list)} entries to {label_pascal}.{self.extension}",
    )

    return True

_write_single_node_list_to_file(node_list, label, prop_dict, labels)

Write a list of BioCypherNodes to an RDF file.


node_list (list): A list of BioCypherNodes to be written.

label (str): The label (type) of the nodes.

prop_dict (dict): A dictionary of properties and their types for the node class.

labels (str): string of one or several concatenated labels

bool: True if the writing is successful, False otherwise.
Source code in biocypher/output/write/graph/_rdf.py
def _write_single_node_list_to_file(
    self,
    node_list: list,
    label: str,
    prop_dict: dict,
    labels: str,
):
    """Write a list of BioCypherNodes to an RDF file.

    Args:
    ----
        node_list (list): A list of BioCypherNodes to be written.

        label (str): The label (type) of the nodes.

        prop_dict (dict): A dictionary of properties and their types for the node class.

        labels (str): string of one or several concatenated labels

    Returns:
    -------
        bool: True if the writing is successful, False otherwise.

    """
    # NOTE: labels and prop_dict are not used.

    if not all(isinstance(n, BioCypherNode) for n in node_list):
        logger.error("Nodes must be passed as type BioCypherNode.")
        return False

    # translate label to PascalCase
    label_pascal = self.translator.name_sentence_to_pascal(label)

    # create file name
    file_name = os.path.join(self.outdir, f"{label_pascal}.{self.extension}")

    # write data in graph
    graph = Graph()
    self._init_namespaces(graph)

    for n in node_list:
        rdf_subject = n.get_id()
        rdf_object = n.get_label()
        properties = n.get_properties()
        class_name = self.translator.name_sentence_to_pascal(rdf_object)
        graph.add(
            (
                self.as_uri(class_name, "biocypher"),
                RDF.type,
                RDFS.Class,
            ),
        )
        graph.add(
            (
                self.to_uri(rdf_subject),
                RDF.type,
                self.as_uri(class_name, "biocypher"),
            ),
        )
        for key, value in properties.items():
            # only write value if it exists.
            if value:
                self.add_property_to_graph(graph, rdf_subject, value, key)

    graph.serialize(destination=file_name, format=self.file_format)

    logger.info(
        f"Writing {len(node_list)} entries to {label_pascal}.{self.extension}",
    )

    return True

add_property_to_graph(graph, rdf_subject, rdf_object, rdf_predicate)

Add the properties to an RDF node.

It takes the graph and the subject, object, and predicate of the RDF triple. If the property value is a list, each element is added to the graph. Otherwise, if the value is a string that represents a list, it is first transformed into a list and then added. Any other value is added to the graph as a single literal.


graph (RDFLib.Graph): The RDF graph to add the nodes to.

rdf_subject (str): The subject of the RDF triple.

rdf_object (str): The object of the RDF triple.

rdf_predicate (str): The predicate of the RDF triple.

None
Source code in biocypher/output/write/graph/_rdf.py
def add_property_to_graph(
    self,
    graph: Graph,
    rdf_subject: str,
    rdf_object: str,
    rdf_predicate: str,
):
    """Add the properties to an RDF node.

    It takes the graph and the subject, object, and predicate of the RDF
    triple. If the property value is a list, each element is added to the
    graph. Otherwise, if the value is a string that represents a list, it
    is first transformed into a list and then added. Any other value is
    added to the graph as a single literal.

    Args:
    ----
        graph (RDFLib.Graph): The RDF graph to add the nodes to.

        rdf_subject (str): The subject of the RDF triple.

        rdf_object (str): The object of the RDF triple.

        rdf_predicate (str): The predicate of the RDF triple.

    Returns:
    -------
        None

    """
    if isinstance(rdf_object, list):
        for obj in rdf_object:
            graph.add(
                (
                    self.to_uri(rdf_subject),
                    self.property_to_uri(rdf_predicate),
                    Literal(obj),
                ),
            )
    elif isinstance(rdf_object, str):
        if rdf_object.startswith("[") and rdf_object.endswith("]"):
            self.add_property_to_graph(
                graph,
                rdf_subject,
                self.transform_string_to_list(rdf_object),
                rdf_predicate,
            )
        else:
            graph.add(
                (
                    self.to_uri(rdf_subject),
                    self.property_to_uri(rdf_predicate),
                    Literal(rdf_object),
                ),
            )
    else:
        graph.add(
            (
                self.to_uri(rdf_subject),
                self.property_to_uri(rdf_predicate),
                Literal(rdf_object),
            ),
        )
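
A hedged sketch of the three code paths (writer, graph, and all values are illustrative):

# list value: one triple per element
writer.add_property_to_graph(graph, "uniprot:P12345", ["kinase", "enzyme"], "keywords")
# string that looks like a list: parsed first, then handled as a list
writer.add_property_to_graph(graph, "uniprot:P12345", "['kinase', 'enzyme']", "keywords")
# any other value: a single literal triple
writer.add_property_to_graph(graph, "uniprot:P12345", 42, "length")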

as_uri(name, namespace='')

Return an RDFlib object with the given namespace as a URI.

There is often a default for empty namespaces, which would have been loaded with the ontology, and put in self.namespaces by self._init_namespaces.


name (str): The name to be transformed.
namespace (str): The namespace to be used.

str: The URI for the given name and namespace.
Source code in biocypher/output/write/graph/_rdf.py
def as_uri(self, name: str, namespace: str = "") -> str:
    """Return an RDFlib object with the given namespace as a URI.

    There is often a default for empty namespaces, which would have been
    loaded with the ontology, and put in `self.namespaces` by
    `self._init_namespaces`.

    Args:
    ----
        name (str): The name to be transformed.
        namespace (str): The namespace to be used.

    Returns:
    -------
        str: The URI for the given name and namespace.

    """
    if namespace in self.namespaces:
        return URIRef(self.namespaces[namespace][name])
    else:
        assert "biocypher" in self.namespaces
        # If no default empty NS, use the biocypher one,
        # which is always there.
        logger.debug(f"I'll consider '{name}' as part of 'biocypher' namespace.")
        return URIRef(self.namespaces["biocypher"][name])
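
Illustrative behaviour, assuming only a "go" namespace was registered besides the built-in biocypher one:

writer.as_uri("0005634", "go")  # URIRef within the registered "go" namespace
writer.as_uri("protein")        # unknown/empty namespace: biocypher fallback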

property_to_uri(property_name)

Convert a property name to its corresponding URI.

This function takes a property name and searches for its corresponding URI in various namespaces. It first checks the core namespaces for rdflib, including owl, rdf, rdfs, xsd, and xml.


property_name (str): The property name to be converted to a URI.

str: The corresponding URI for the input property name.
Source code in biocypher/output/write/graph/_rdf.py
def property_to_uri(self, property_name: str) -> str:
    """Convert a property name to its corresponding URI.

    This function takes a property name and searches for its corresponding
    URI in various namespaces. It first checks the core namespaces for
    rdflib, including owl, rdf, rdfs, xsd, and xml.

    Args:
    ----
        property_name (str): The property name to be converted to a URI.

    Returns:
    -------
        str: The corresponding URI for the input property name.

    """
    # These namespaces are core for rdflib; owl, rdf, rdfs, xsd and xml
    for namespace in _NAMESPACE_PREFIXES_CORE.values():
        if property_name in namespace:
            return namespace[property_name]

    # If the property name is not found in the core namespaces, search in
    # the SKOS, DC, and DCTERMS namespaces
    for namespace in [SKOS, DC, DCTERMS]:
        if property_name in namespace:
            return namespace[property_name]

    # If the property name is still not found, try other namespaces from
    # rdflib.
    for namespace in _NAMESPACE_PREFIXES_RDFLIB.values():
        if property_name in namespace:
            return namespace[property_name]

    # If the property name is "licence", it recursively calls the function
    # with "license" as the input.
    if property_name == "licence":
        return self.property_to_uri("license")

    # TODO: add an option to search through manually implemented namespaces

    # If the input is not found in any of the namespaces, it returns
    # the corresponding URI from the biocypher namespace.
    # TODO: give a warning and try to prevent this option altogether
    return self.as_uri(property_name, "biocypher")
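
A hedged example of the lookup order (which names the rdflib prefix maps contain is an assumption):

writer.property_to_uri("label")    # found in the core namespaces (rdfs:label)
writer.property_to_uri("licence")  # recurses to "license", typically dcterms:license
writer.property_to_uri("my_prop")  # found nowhere: falls back to the biocypher namespace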

to_uri(subject)

Extract the namespace from the given subject.

Split the subject's string on ":". Then convert the subject to a proper URI, if the namespace is known. If namespace is unknown, defaults to the default prefix of the ontology.


subject (str): The subject to be converted to a URI.

str: The corresponding URI for the subject.
Source code in biocypher/output/write/graph/_rdf.py
def to_uri(self, subject: str) -> str:
    """Extract the namespace from the given subject.

    Split the subject's string on ":". Then convert the subject to a
    proper URI, if the namespace is known. If namespace is unknown,
    defaults to the default prefix of the ontology.

    Args:
    ----
        subject (str): The subject to be converted to a URI.

    Returns:
    -------
        str: The corresponding URI for the subject.

    """
    pref_id = subject.split(":")
    if len(pref_id) == 2:
        pref, id = pref_id
        return self.as_uri(id, pref)
    else:
        return self.as_uri(subject)
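
Illustrative behaviour, assuming a registered "go" prefix:

writer.to_uri("go:0005634")  # known prefix: equivalent to as_uri("0005634", "go")
writer.to_uri("orphan_id")   # no prefix: as_uri("orphan_id"), i.e. the default namespace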

transform_string_to_list(string_list)

Transform a string representation of a list into a list.


string_list (str): The string representation of the list.

list: The list representation of the input string.
Source code in biocypher/output/write/graph/_rdf.py
def transform_string_to_list(self, string_list: str) -> list:
    """Transform a string representation of a list into a list.

    Args:
    ----
        string_list (str): The string representation of the list.

    Returns:
    -------
        list: The list representation of the input string.

    """
    return string_list.replace("[", "").replace("]", "").replace("'", "").split(", ")
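
Example of the purely string-based conversion; note that it assumes ", " as the element separator and does not handle nested lists:

writer.transform_string_to_list("['kinase', 'enzyme']")  # -> ["kinase", "enzyme"]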

write_edges(edges, batch_size=int(1e6))

Write edges in RDF format.


edges (BioCypherEdge): a list or generator of edges in
    :py:class:`BioCypherEdge` format
batch_size (int): The number of edges to write in each batch.

bool: The return value. True for success, False otherwise.
Source code in biocypher/output/write/graph/_rdf.py
def write_edges(
    self,
    edges: list | GeneratorType,
    batch_size: int = int(1e6),
) -> bool:
    """Write edges in RDF format.

    Args:
    ----
        edges (BioCypherEdge): a list or generator of edges in
            :py:class:`BioCypherEdge` format
        batch_size (int): The number of edges to write in each batch.

    Returns:
    -------
        bool: The return value. True for success, False otherwise.

    """
    # check if specified output format is correct
    passed = self._is_rdf_format_supported(self.file_format)
    if not passed:
        logger.error("Error while writing edge data, wrong RDF format")
        return False
    # write edge data using _write_edge_data method
    passed = self._write_edge_data(edges, batch_size=batch_size)
    if not passed:
        logger.error("Error while writing edge data.")
        return False

    return True

write_nodes(nodes, batch_size=int(1e6), force=False)

Write nodes in RDF format.


nodes (list or generator): A list or generator of nodes in
    BioCypherNode format.
batch_size (int): The number of nodes to write in each batch.
force (bool): Flag to force the writing even if the output file
    already exists.

bool: True if the writing is successful, False otherwise.
Source code in biocypher/output/write/graph/_rdf.py
def write_nodes(self, nodes, batch_size: int = int(1e6), force: bool = False) -> bool:
    """Write nodes in RDF format.

    Args:
    ----
        nodes (list or generator): A list or generator of nodes in
            BioCypherNode format.
        batch_size (int): The number of nodes to write in each batch.
        force (bool): Flag to force the writing even if the output file
            already exists.

    Returns:
    -------
        bool: True if the writing is successful, False otherwise.

    """
    # check if specified output format is correct
    passed = self._is_rdf_format_supported(self.file_format)
    if not passed:
        logger.error("Error while writing node data, wrong RDF format")
        return False
    # write node data using _write_node_data method
    passed = self._write_node_data(nodes, batch_size, force)
    if not passed:
        logger.error("Error while writing node data.")
        return False
    return True
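
A minimal usage sketch; the BioCypherNode constructor arguments and import path are assumptions based on the core BioCypher API, and writer is assumed to be a fully initialised RDF writer:

from biocypher._create import BioCypherNode  # assumed import path

nodes = [
    BioCypherNode(node_id="uniprot:P12345", node_label="protein"),
    BioCypherNode(node_id="uniprot:P67890", node_label="protein"),
]
writer.write_nodes(nodes)  # validates file_format, then delegates to _write_node_data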

NetworkX Writer

Bases: _Writer

Class for writing the in-memory networkx DiGraph to file.

Call _construct_import_call to write the networkx DiGraph to a pickle file and return the Python call to load it.

TODO: this is a non-intuitive name, should be adjusted.

Source code in biocypher/output/write/graph/_networkx.py
class _NetworkXWriter(_Writer):
    """
    Class for writing the in-memory networkx DiGraph to file.

    Call `_construct_import_call` to write the networkx DiGraph to a pickle
    file and return the Python call to load it.

    TODO: this is a non-intuitive name, should be adjusted.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.in_memory_networkx_kg = NetworkxKG(
            deduplicator=self.deduplicator,
        )

    def _construct_import_call(self) -> str:
        """Dump networkx graph to a pickle file and return Python call.

        Returns:
            str: Python code to load the networkx graph from a pickle file.
        """
        self.G = self.in_memory_networkx_kg._create_networkx_kg()
        logger.info(f"Writing networkx {self.G} to pickle file networkx_graph.pkl.")
        with open(f"{self.output_directory}/networkx_graph.pkl", "wb") as f:
            pickle.dump(self.G, f)

        import_call = "import pickle\n"
        import_call += "with open('./networkx_graph.pkl', 'rb') as f:\n\tG_loaded = pickle.load(f)"
        return import_call

    def _get_import_script_name(self) -> str:
        """Function to return the name of the import script."""
        return "import_networkx.py"

    def _write_node_data(self, nodes) -> bool:
        """Add nodes to the networkx graph.

        TODO: this is not strictly writing, should be refactored.

        Args:
            nodes (list): List of nodes to add to the networkx graph.

        Returns:
            bool: True if the nodes were added successfully, False otherwise.
        """
        passed = self.in_memory_networkx_kg.add_nodes(nodes)
        return passed

    def _write_edge_data(self, edges) -> bool:
        """Add edges to the networkx graph.

        TODO: this is not strictly writing, should be refactored.

        Args:
            edges (list): List of edges to add to the networkx graph.

        Returns:
            bool: True if the edges were added successfully, False otherwise.
        """
        passed = self.in_memory_networkx_kg.add_edges(edges)
        return passed

_construct_import_call()

Dump networkx graph to a pickle file and return Python call.

Returns
str: Python code to load the networkx graph from a pickle file.

Source code in biocypher/output/write/graph/_networkx.py
def _construct_import_call(self) -> str:
    """Dump networkx graph to a pickle file and return Python call.

    Returns:
        str: Python code to load the networkx graph from a pickle file.
    """
    self.G = self.in_memory_networkx_kg._create_networkx_kg()
    logger.info(f"Writing networkx {self.G} to pickle file networkx_graph.pkl.")
    with open(f"{self.output_directory}/networkx_graph.pkl", "wb") as f:
        pickle.dump(self.G, f)

    import_call = "import pickle\n"
    import_call += "with open('./networkx_graph.pkl', 'rb') as f:\n\tG_loaded = pickle.load(f)"
    return import_call
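
The returned string is itself a small Python program; written to the import script named below, it restores the graph:

# contents of import_networkx.py as generated above
import pickle

with open('./networkx_graph.pkl', 'rb') as f:
    G_loaded = pickle.load(f)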

_get_import_script_name()

Function to return the name of the import script.

Source code in biocypher/output/write/graph/_networkx.py
def _get_import_script_name(self) -> str:
    """Function to return the name of the import script."""
    return "import_networkx.py"

_write_edge_data(edges)

Add edges to the networkx graph.

TODO: this is not strictly writing, should be refactored.

edges (list): List of edges to add to the networkx graph.

Returns
bool: True if the edges were added successfully, False otherwise.

Source code in biocypher/output/write/graph/_networkx.py
def _write_edge_data(self, edges) -> bool:
    """Add edges to the networkx graph.

    TODO: this is not strictly writing, should be refactored.

    Args:
        edges (list): List of edges to add to the networkx graph.

    Returns:
        bool: True if the edges were added successfully, False otherwise.
    """
    passed = self.in_memory_networkx_kg.add_edges(edges)
    return passed

_write_node_data(nodes)

Add nodes to the networkx graph.

TODO: this is not strictly writing, should be refactored.

nodes (list): List of nodes to add to the networkx graph.

Returns
bool: True if the nodes were added successfully, False otherwise.

Source code in biocypher/output/write/graph/_networkx.py
def _write_node_data(self, nodes) -> bool:
    """Add nodes to the networkx graph.

    TODO: this is not strictly writing, should be refactored.

    Args:
        nodes (list): List of nodes to add to the networkx graph.

    Returns:
        bool: True if the nodes were added successfully, False otherwise.
    """
    passed = self.in_memory_networkx_kg.add_nodes(nodes)
    return passed

PostgreSQL Batch Writer

Bases: _BatchWriter

Write node and edge representations for PostgreSQL.

Class for writing node and edge representations to disk using the format specified by PostgreSQL for the use of "COPY FROM...". Each batch writer instance has a fixed representation that needs to be passed at instantiation via the schema argument. The instance also expects an ontology adapter via the ontology_adapter attribute to be able to convert and extend the hierarchy.

This class inherits from the abstract class "_BatchWriter" and implements the PostgreSQL-specific methods:

- _write_node_headers
- _write_edge_headers
- _construct_import_call
- _write_array_string
Source code in biocypher/output/write/relational/_postgresql.py
class _PostgreSQLBatchWriter(_BatchWriter):
    """Write node and edge representations for PostgreSQL.

    Class for writing node and edge representations to disk using the
    format specified by PostgreSQL for the use of "COPY FROM...". Each batch
    writer instance has a fixed representation that needs to be passed
    at instantiation via the :py:attr:`schema` argument. The instance
    also expects an ontology adapter via :py:attr:`ontology_adapter` to be able
    to convert and extend the hierarchy.

    This class inherits from the abstract class "_BatchWriter" and implements the
    PostgreSQL-specific methods:

        - _write_node_headers
        - _write_edge_headers
        - _construct_import_call
        - _write_array_string
    """

    DATA_TYPE_LOOKUP = {
        "str": "VARCHAR",  # VARCHAR needs limit
        "int": "INTEGER",
        "long": "BIGINT",
        "float": "NUMERIC",
        "double": "NUMERIC",
        "dbl": "NUMERIC",
        "boolean": "BOOLEAN",
        "str[]": "VARCHAR[]",
        "string[]": "VARCHAR[]",
    }

    def __init__(self, *args, **kwargs):
        self._copy_from_csv_commands = set()
        super().__init__(*args, **kwargs)

    def _get_default_import_call_bin_prefix(self) -> str:
        """Provide the default string for the import call bin prefix.

        Returns
        -------
            str: The default location for the psql command

        """
        return ""

    def _get_data_type(self, string) -> str:
        try:
            return self.DATA_TYPE_LOOKUP[string]
        except KeyError:
            logger.info(f'Could not determine data type {string}. Using default "VARCHAR".')
            return "VARCHAR"

    def _quote_string(self, value: str) -> str:
        """Quote a string."""
        return f"{self.quote}{value}{self.quote}"

    def _write_array_string(self, string_list) -> str:
        """Write the string representation of an array into a .csv file.

        Abstract method to output.write the string representation of an array
        into a .csv file as required by the postgresql COPY command, with
        '{','}' brackets and ',' separation.

        Args:
        ----
            string_list (list): list of ontology strings

        Returns:
        -------
            str: The string representation of an array for postgres COPY

        """
        string = ",".join(string_list)
        string = f'"{{{string}}}"'
        return string

    def _get_import_script_name(self) -> str:
        """Return the name of the psql import script.

        Returns
        -------
            str: The name of the import script (ending in .sh)

        """
        return f"{self.db_name}-import-call.sh"

    def _adjust_pascal_to_psql(self, string):
        string = string.replace(".", "_")
        string = string.lower()
        return string

    def _write_node_headers(self) -> bool:
        """Write node header files for PostgreSQL.

        Writes a single CSV file for a graph entity that is represented
        as a node as per the definition in the `schema_config.yaml`,
        containing only the header for this type of node.

        Returns
        -------
            bool: The return value. True for success, False otherwise.

        """
        # load headers from data parse
        if not self.node_property_dict:
            logger.error(
                "Header information not found. Was the data parsed first?",
            )
            return False

        for label, props in self.node_property_dict.items():
            # create header CSV with ID, properties, labels

            # translate label to PascalCase
            pascal_label = self.translator.name_sentence_to_pascal(label)

            parts = f"{pascal_label}-part*.csv"
            parts_paths = os.path.join(self.outdir, parts)
            parts_paths = glob.glob(parts_paths)
            parts_paths.sort()

            # adjust label for import to psql
            pascal_label = self._adjust_pascal_to_psql(pascal_label)
            table_create_command_path = os.path.join(
                self.outdir,
                f"{pascal_label}-create_table.sql",
            )

            # check if file already exists
            if os.path.exists(table_create_command_path):
                logger.warning(
                    f"File {table_create_command_path} already exists. Overwriting.",
                )

            # concatenate key:value in props
            columns = ["_ID VARCHAR"]
            for col_name, col_type in props.items():
                col_type = self._get_data_type(col_type)
                col_name = self._adjust_pascal_to_psql(col_name)
                columns.append(f"{col_name} {col_type}")
            columns.append("_LABEL VARCHAR[]")

            with open(table_create_command_path, "w", encoding="utf-8") as f:
                command = ""
                if self.wipe:
                    command += f"DROP TABLE IF EXISTS {pascal_label};\n"

                # table creation requires comma separation
                command += f"CREATE TABLE {pascal_label}({','.join(columns)});\n"
                f.write(command)

                for parts_path in parts_paths:
                    # if import_call_file_prefix is set, replace actual path
                    # with prefix
                    if self.import_call_file_prefix != self.outdir:
                        parts_path = parts_path.replace(
                            self.outdir,
                            self.import_call_file_prefix,
                        )

                    self._copy_from_csv_commands.add(
                        f"\\copy {pascal_label} FROM '{parts_path}' DELIMITER E'{self.delim}' CSV;",
                    )

            # add file path to import statement
            # if import_call_file_prefix is set, replace actual path
            # with prefix
            if self.import_call_file_prefix != self.outdir:
                table_create_command_path = table_create_command_path.replace(
                    self.outdir,
                    self.import_call_file_prefix,
                )

            self.import_call_nodes.add(table_create_command_path)

        return True

    def _write_edge_headers(self):
        """Writes single CSV file for a graph entity that is represented
        as an edge as per the definition in the `schema_config.yaml`,
        containing only the header for this type of edge.

        Returns
        -------
            bool: The return value. True for success, False otherwise.

        """
        # load headers from data parse
        if not self.edge_property_dict:
            logger.error(
                "Header information not found. Was the data parsed first?",
            )
            return False

        for label, props in self.edge_property_dict.items():
            # translate label to PascalCase
            pascal_label = self.translator.name_sentence_to_pascal(label)

            parts_paths = os.path.join(self.outdir, f"{pascal_label}-part*.csv")
            parts_paths = glob.glob(parts_paths)
            parts_paths.sort()

            # adjust label for import to psql
            pascal_label = self._adjust_pascal_to_psql(pascal_label)
            table_create_command_path = os.path.join(
                self.outdir,
                f"{pascal_label}-create_table.sql",
            )

            # check for file exists
            if os.path.exists(table_create_command_path):
                logger.warning(
                    f"File {table_create_command_path} already exists. Overwriting.",
                )

            # concatenate key:value in props
            columns = []
            for col_name, col_type in props.items():
                col_type = self._get_data_type(col_type)
                col_name = self._adjust_pascal_to_psql(col_name)
                if col_name == "_ID":
                    # should ideally never happen
                    raise ValueError(
                        "Column name '_ID' is reserved for internal use, "
                        "denoting the relationship ID. Please choose a "
                        "different name for your column.",
                    )

                columns.append(f"{col_name} {col_type}")

            # create list of lists and flatten
            # removes need for empty check of property list
            out_list = [
                "_START_ID VARCHAR",
                "_ID VARCHAR",
                *columns,
                "_END_ID VARCHAR",
                "_TYPE VARCHAR",
            ]

            with open(table_create_command_path, "w", encoding="utf-8") as f:
                command = ""
                if self.wipe:
                    command += f"DROP TABLE IF EXISTS {pascal_label};\n"

                # table creation requires comma separation
                command += f"CREATE TABLE {pascal_label}({','.join(out_list)});\n"
                f.write(command)

                for parts_path in parts_paths:
                    # if import_call_file_prefix is set, replace actual path
                    # with prefix
                    if self.import_call_file_prefix != self.outdir:
                        parts_path = parts_path.replace(
                            self.outdir,
                            self.import_call_file_prefix,
                        )

                    self._copy_from_csv_commands.add(
                        f"\\copy {pascal_label} FROM '{parts_path}' DELIMITER E'{self.delim}' CSV;",
                    )

            # add file path to import statement
            # if import_call_file_prefix is set, replace actual path
            # with prefix
            if self.import_call_file_prefix != self.outdir:
                table_create_command_path = table_create_command_path.replace(
                    self.outdir,
                    self.import_call_file_prefix,
                )

            self.import_call_edges.add(table_create_command_path)

        return True

    def _construct_import_call(self) -> str:
        """Function to construct the import call detailing folder and
        individual node and edge headers and data files, as well as
        delimiters and database name. Built after all data has been
        processed to ensure that nodes are called before any edges.

        Returns
        -------
            str: a bash command for postgresql import

        """
        import_call = ""

        # create tables
        # At this point, csv files of nodes and edges do not require differentiation
        for import_file_path in [
            *self.import_call_nodes,
            *self.import_call_edges,
        ]:
            import_call += f'echo "Setup {import_file_path}..."\n'
            if self.db_password:
                # set password variable inline
                import_call += f"PGPASSWORD={self.db_password} "
            import_call += f"{self.import_call_bin_prefix}psql -f {import_file_path}"
            import_call += f" --dbname {self.db_name}"
            import_call += f" --host {self.db_host}"
            import_call += f" --port {self.db_port}"
            import_call += f" --user {self.db_user}"
            import_call += '\necho "Done!"\n'
            import_call += "\n"

        # copy data to tables
        for command in self._copy_from_csv_commands:
            table_part = command.split(" ")[3]
            import_call += f'echo "Importing {table_part}..."\n'
            if self.db_password:
                # set password variable inline
                import_call += f"PGPASSWORD={self.db_password} "
            import_call += f'{self.import_call_bin_prefix}psql -c "{command}"'
            import_call += f" --dbname {self.db_name}"
            import_call += f" --host {self.db_host}"
            import_call += f" --port {self.db_port}"
            import_call += f" --user {self.db_user}"
            import_call += '\necho "Done!"\n'
            import_call += "\n"

        return import_call

_construct_import_call()

Function to construct the import call detailing folder and individual node and edge headers and data files, as well as delimiters and database name. Built after all data has been processed to ensure that nodes are called before any edges.

Returns
str: a bash command for postgresql import
Source code in biocypher/output/write/relational/_postgresql.py
def _construct_import_call(self) -> str:
    """Function to construct the import call detailing folder and
    individual node and edge headers and data files, as well as
    delimiters and database name. Built after all data has been
    processed to ensure that nodes are called before any edges.

    Returns
    -------
        str: a bash command for postgresql import

    """
    import_call = ""

    # create tables
    # At this point, csv files of nodes and edges do not require differentiation
    for import_file_path in [
        *self.import_call_nodes,
        *self.import_call_edges,
    ]:
        import_call += f'echo "Setup {import_file_path}..."\n'
        if self.db_password:
            # set password variable inline
            import_call += f"PGPASSWORD={self.db_password} "
        import_call += f"{self.import_call_bin_prefix}psql -f {import_file_path}"
        import_call += f" --dbname {self.db_name}"
        import_call += f" --host {self.db_host}"
        import_call += f" --port {self.db_port}"
        import_call += f" --user {self.db_user}"
        import_call += '\necho "Done!"\n'
        import_call += "\n"

    # copy data to tables
    for command in self._copy_from_csv_commands:
        table_part = command.split(" ")[3]
        import_call += f'echo "Importing {table_part}..."\n'
        if self.db_password:
            # set password variable inline
            import_call += f"PGPASSWORD={self.db_password} "
        import_call += f'{self.import_call_bin_prefix}psql -c "{command}"'
        import_call += f" --dbname {self.db_name}"
        import_call += f" --host {self.db_host}"
        import_call += f" --port {self.db_port}"
        import_call += f" --user {self.db_user}"
        import_call += '\necho "Done!"\n'
        import_call += "\n"

    return import_call
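
For orientation, a sketch of the shape of one generated block; all values here are illustrative placeholders, not writer defaults:

expected_block = (
    'echo "Setup ./biocypher-out/protein-create_table.sql..."\n'
    "PGPASSWORD=secret psql -f ./biocypher-out/protein-create_table.sql"
    " --dbname biocypher --host localhost --port 5432 --user postgres"
    '\necho "Done!"\n\n'
)
print(expected_block)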

_get_default_import_call_bin_prefix()

Provide the default string for the import call bin prefix.

Returns
str: The default location for the psql command
Source code in biocypher/output/write/relational/_postgresql.py
def _get_default_import_call_bin_prefix(self) -> str:
    """Provide the default string for the import call bin prefix.

    Returns
    -------
        str: The default location for the psql command

    """
    return ""

_get_import_script_name()

Return the name of the psql import script.

Returns
str: The name of the import script (ending in .sh)
Source code in biocypher/output/write/relational/_postgresql.py
def _get_import_script_name(self) -> str:
    """Return the name of the psql import script.

    Returns
    -------
        str: The name of the import script (ending in .sh)

    """
    return f"{self.db_name}-import-call.sh"

_quote_string(value)

Quote a string.

Source code in biocypher/output/write/relational/_postgresql.py
def _quote_string(self, value: str) -> str:
    """Quote a string."""
    return f"{self.quote}{value}{self.quote}"

_write_array_string(string_list)

Write the string representation of an array into a .csv file.

Abstract method to output.write the string representation of an array into a .csv file as required by the postgresql COPY command, with '{','}' brackets and ',' separation.


string_list (list): list of ontology strings

str: The string representation of an array for postgres COPY
Source code in biocypher/output/write/relational/_postgresql.py
def _write_array_string(self, string_list) -> str:
    """Write the string representation of an array into a .csv file.

    Abstract method to output.write the string representation of an array
    into a .csv file as required by the postgresql COPY command, with
    '{','}' brackets and ',' separation.

    Args:
    ----
        string_list (list): list of ontology strings

    Returns:
    -------
        str: The string representation of an array for postgres COPY

    """
    string = ",".join(string_list)
    string = f'"{{{string}}}"'
    return string
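
Example of the resulting COPY-compatible array literal:

writer._write_array_string(["protein", "polypeptide"])  # -> '"{protein,polypeptide}"'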

_write_edge_headers()

Writes a single CSV file for a graph entity that is represented as an edge as per the definition in the schema_config.yaml, containing only the header for this type of edge.

Returns
bool: The return value. True for success, False otherwise.
Source code in biocypher/output/write/relational/_postgresql.py
def _write_edge_headers(self):
    """Writes single CSV file for a graph entity that is represented
    as an edge as per the definition in the `schema_config.yaml`,
    containing only the header for this type of edge.

    Returns
    -------
        bool: The return value. True for success, False otherwise.

    """
    # load headers from data parse
    if not self.edge_property_dict:
        logger.error(
            "Header information not found. Was the data parsed first?",
        )
        return False

    for label, props in self.edge_property_dict.items():
        # translate label to PascalCase
        pascal_label = self.translator.name_sentence_to_pascal(label)

        parts_paths = os.path.join(self.outdir, f"{pascal_label}-part*.csv")
        parts_paths = glob.glob(parts_paths)
        parts_paths.sort()

        # adjust label for import to psql
        pascal_label = self._adjust_pascal_to_psql(pascal_label)
        table_create_command_path = os.path.join(
            self.outdir,
            f"{pascal_label}-create_table.sql",
        )

        # check for file exists
        if os.path.exists(table_create_command_path):
            logger.warning(
                f"File {table_create_command_path} already exists. Overwriting.",
            )

        # concatenate key:value in props
        columns = []
        for col_name, col_type in props.items():
            col_type = self._get_data_type(col_type)
            col_name = self._adjust_pascal_to_psql(col_name)
            if col_name == "_ID":
                # should ideally never happen
                raise ValueError(
                    "Column name '_ID' is reserved for internal use, "
                    "denoting the relationship ID. Please choose a "
                    "different name for your column.",
                )

            columns.append(f"{col_name} {col_type}")

        # create list of lists and flatten
        # removes need for empty check of property list
        out_list = [
            "_START_ID VARCHAR",
            "_ID VARCHAR",
            *columns,
            "_END_ID VARCHAR",
            "_TYPE VARCHAR",
        ]

        with open(table_create_command_path, "w", encoding="utf-8") as f:
            command = ""
            if self.wipe:
                command += f"DROP TABLE IF EXISTS {pascal_label};\n"

            # table creation requires comma separation
            command += f"CREATE TABLE {pascal_label}({','.join(out_list)});\n"
            f.write(command)

            for parts_path in parts_paths:
                # if import_call_file_prefix is set, replace actual path
                # with prefix
                if self.import_call_file_prefix != self.outdir:
                    parts_path = parts_path.replace(
                        self.outdir,
                        self.import_call_file_prefix,
                    )

                self._copy_from_csv_commands.add(
                    f"\\copy {pascal_label} FROM '{parts_path}' DELIMITER E'{self.delim}' CSV;",
                )

        # add file path to import statement
        # if import_call_file_prefix is set, replace actual path
        # with prefix
        if self.import_call_file_prefix != self.outdir:
            table_create_command_path = table_create_command_path.replace(
                self.outdir,
                self.import_call_file_prefix,
            )

        self.import_call_edges.add(table_create_command_path)

    return True

_write_node_headers()

Write node header files for PostgreSQL.

Writes a single CSV file for a graph entity that is represented as a node as per the definition in the schema_config.yaml, containing only the header for this type of node.

Returns
bool: The return value. True for success, False otherwise.
Source code in biocypher/output/write/relational/_postgresql.py
def _write_node_headers(self) -> bool:
    """Write node header files for PostgreSQL.

    Writes a single CSV file for a graph entity that is represented
    as a node as per the definition in the `schema_config.yaml`,
    containing only the header for this type of node.

    Returns
    -------
        bool: The return value. True for success, False otherwise.

    """
    # load headers from data parse
    if not self.node_property_dict:
        logger.error(
            "Header information not found. Was the data parsed first?",
        )
        return False

    for label, props in self.node_property_dict.items():
        # create header CSV with ID, properties, labels

        # translate label to PascalCase
        pascal_label = self.translator.name_sentence_to_pascal(label)

        parts = f"{pascal_label}-part*.csv"
        parts_paths = os.path.join(self.outdir, parts)
        parts_paths = glob.glob(parts_paths)
        parts_paths.sort()

        # adjust label for import to psql
        pascal_label = self._adjust_pascal_to_psql(pascal_label)
        table_create_command_path = os.path.join(
            self.outdir,
            f"{pascal_label}-create_table.sql",
        )

        # check if file already exists
        if os.path.exists(table_create_command_path):
            logger.warning(
                f"File {table_create_command_path} already exists. Overwriting.",
            )

        # concatenate key:value in props
        columns = ["_ID VARCHAR"]
        for col_name, col_type in props.items():
            col_type = self._get_data_type(col_type)
            col_name = self._adjust_pascal_to_psql(col_name)
            columns.append(f"{col_name} {col_type}")
        columns.append("_LABEL VARCHAR[]")

        with open(table_create_command_path, "w", encoding="utf-8") as f:
            command = ""
            if self.wipe:
                command += f"DROP TABLE IF EXISTS {pascal_label};\n"

            # table creation requires comma separation
            command += f"CREATE TABLE {pascal_label}({','.join(columns)});\n"
            f.write(command)

            for parts_path in parts_paths:
                # if import_call_file_prefix is set, replace actual path
                # with prefix
                if self.import_call_file_prefix != self.outdir:
                    parts_path = parts_path.replace(
                        self.outdir,
                        self.import_call_file_prefix,
                    )

                self._copy_from_csv_commands.add(
                    f"\\copy {pascal_label} FROM '{parts_path}' DELIMITER E'{self.delim}' CSV;",
                )

        # add file path to import statement
        # if import_call_file_prefix is set, replace actual path
        # with prefix
        if self.import_call_file_prefix != self.outdir:
            table_create_command_path = table_create_command_path.replace(
                self.outdir,
                self.import_call_file_prefix,
            )

        self.import_call_nodes.add(table_create_command_path)

    return True

SQLite Batch Writer

Bases: _PostgreSQLBatchWriter

Class for writing node and edge representations to a SQLite database. It uses the _PostgreSQLBatchWriter class under the hood, which already implements the logic to write the nodes/edges to a relational DBMS. Only the import bash script differs between PostgreSQL and SQLite and is therefore implemented in this class.

- _construct_import_call
Source code in biocypher/output/write/relational/_sqlite.py
class _SQLiteBatchWriter(_PostgreSQLBatchWriter):
    """
    Class for writing node and edge representations to a SQLite database.
    It uses the _PostgreSQLBatchWriter class under the hood, which already
    implements the logic to write the nodes/edges to a relational DBMS.
    Only the import bash script differs between PostgreSQL and SQLite
    and is therefore implemented in this class.

    - _construct_import_call
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _construct_import_call(self) -> str:
        """
        Function to construct the import call detailing folder and
        individual node and edge headers and data files, as well as
        delimiters and database name. Built after all data has been
        processed to ensure that nodes are called before any edges.

        Returns:
            str: a bash command for sqlite import
        """
        import_call = ""

        # create tables
        # At this point, csv files of nodes and edges do not require differentiation
        for import_file_path in [
            *self.import_call_nodes,
            *self.import_call_edges,
        ]:
            import_call += f'echo "Setup {import_file_path}..."\n'
            import_call += f"{self.import_call_bin_prefix}sqlite3 {self.db_name} < {import_file_path}"
            import_call += '\necho "Done!"\n'
            import_call += "\n"

        for command in self._copy_from_csv_commands:
            table_name = command.split(" ")[1]
            table_part = command.split(" ")[3].replace("'", "")
            import_call += f'echo "Importing {table_part}..."\n'
            separator = self.delim
            import_part = f".import {table_part} {table_name}"
            import_call += (
                f"{self.import_call_bin_prefix}sqlite3 -separator $'{separator}' {self.db_name} \"{import_part}\""
            )
            import_call += '\necho "Done!"\n'
            import_call += "\n"

        return import_call

_construct_import_call()

Function to construct the import call detailing folder and individual node and edge headers and data files, as well as delimiters and database name. Built after all data has been processed to ensure that nodes are called before any edges.

Returns
str: a bash command for sqlite import

Source code in biocypher/output/write/relational/_sqlite.py
def _construct_import_call(self) -> str:
    """
    Function to construct the import call detailing folder and
    individual node and edge headers and data files, as well as
    delimiters and database name. Built after all data has been
    processed to ensure that nodes are called before any edges.

    Returns:
        str: a bash command for sqlite import
    """
    import_call = ""

    # create tables
    # At this point, csv files of nodes and edges do not require differentiation
    for import_file_path in [
        *self.import_call_nodes,
        *self.import_call_edges,
    ]:
        import_call += f'echo "Setup {import_file_path}..."\n'
        import_call += f"{self.import_call_bin_prefix}sqlite3 {self.db_name} < {import_file_path}"
        import_call += '\necho "Done!"\n'
        import_call += "\n"

    for command in self._copy_from_csv_commands:
        table_name = command.split(" ")[1]
        table_part = command.split(" ")[3].replace("'", "")
        import_call += f'echo "Importing {table_part}..."\n'
        separator = self.delim
        import_part = f".import {table_part} {table_name}"
        import_call += (
            f"{self.import_call_bin_prefix}sqlite3 -separator $'{separator}' {self.db_name} \"{import_part}\""
        )
        import_call += '\necho "Done!"\n'
        import_call += "\n"

    return import_call
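
A sketch of the shape of the generated commands; file names, the database name, and the separator are illustrative:

expected_block = (
    'echo "Setup ./biocypher-out/protein-create_table.sql..."\n'
    "sqlite3 biocypher.db < ./biocypher-out/protein-create_table.sql"
    '\necho "Done!"\n\n'
    'echo "Importing ./biocypher-out/Protein-part000.csv..."\n'
    "sqlite3 -separator $';' biocypher.db "
    '".import ./biocypher-out/Protein-part000.csv protein"'
    '\necho "Done!"\n'
)
print(expected_block)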

Pandas CSV Writer

Bases: _Writer

Class for writing node and edge representations to CSV files.

Source code in biocypher/output/write/relational/_csv.py
class _PandasCSVWriter(_Writer):
    """
    Class for writing node and edge representations to CSV files.
    """

    def __init__(self, *args, write_to_file: bool = True, **kwargs):
        kwargs["write_to_file"] = write_to_file
        super().__init__(*args, **kwargs)
        self.in_memory_dfs = {}
        self.stored_dfs = {}
        self.pandas_in_memory = PandasKG(
            deduplicator=self.deduplicator,
        )
        self.delimiter = kwargs.get("delimiter")
        if not self.delimiter:
            self.delimiter = ","
        self.write_to_file = write_to_file

    def _construct_import_call(self) -> str:
        """Function to construct the Python code to load all node and edge csv files again into Pandas dfs.

        Returns:
            str: Python code to load the csv files into Pandas dfs.
        """
        import_call = "import pandas as pd\n\n"
        for df_name in self.stored_dfs.keys():
            import_call += f"{df_name} = pd.read_csv('./{df_name}.csv', header=0, index_col=0)\n"
        return import_call

    def _get_import_script_name(self) -> str:
        """Function to return the name of the import script."""
        return "import_pandas_csv.py"

    def _write_node_data(self, nodes) -> bool:
        passed = self._write_entities_to_file(nodes)
        return passed

    def _write_edge_data(self, edges) -> bool:
        passed = self._write_entities_to_file(edges)
        return passed

    def _write_entities_to_file(self, entities: iter) -> bool:
        """Function to write the entities to a CSV file.

        Args:
            entities (iterable): An iterable of BioCypherNode / BioCypherEdge / BioCypherRelAsNode objects.
        """
        entities = peekable(entities)
        entity_list = self.pandas_in_memory._separate_entity_types(entities)
        for entity_type, entities in entity_list.items():
            self.in_memory_dfs[entity_type] = self.pandas_in_memory._add_entity_df(entity_type, entities)
        for entity_type in self.in_memory_dfs.keys():
            entity_df = self.in_memory_dfs[entity_type]
            if " " in entity_type or "." in entity_type:
                entity_type = entity_type.replace(" ", "_").replace(".", "_")
            if self.write_to_file:
                logger.info(f"Writing {entity_df.shape[0]} entries to {entity_type}.csv.")
                entity_df.to_csv(
                    f"{self.output_directory}/{entity_type}.csv",
                    sep=self.delimiter,
                )
            self.stored_dfs[entity_type] = entity_df
        self.in_memory_dfs = {}
        return True

_construct_import_call()

Function to construct the Python code to load all node and edge csv files again into Pandas dfs.

Returns
str: Python code to load the csv files into Pandas dfs.

Source code in biocypher/output/write/relational/_csv.py
def _construct_import_call(self) -> str:
    """Function to construct the Python code to load all node and edge csv files again into Pandas dfs.

    Returns:
        str: Python code to load the csv files into Pandas dfs.
    """
    import_call = "import pandas as pd\n\n"
    for df_name in self.stored_dfs.keys():
        import_call += f"{df_name} = pd.read_csv('./{df_name}.csv', header=0, index_col=0)\n"
    return import_call
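
Illustrative output of the generated script for two stored dataframes named protein and interacts_with:

import pandas as pd

protein = pd.read_csv('./protein.csv', header=0, index_col=0)
interacts_with = pd.read_csv('./interacts_with.csv', header=0, index_col=0)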

_get_import_script_name()

Function to return the name of the import script.

Source code in biocypher/output/write/relational/_csv.py
def _get_import_script_name(self) -> str:
    """Function to return the name of the import script."""
    return "import_pandas_csv.py"

_write_entities_to_file(entities)

Function to write the entities to a CSV file.

entities (iterable): An iterable of BioCypherNode / BioCypherEdge / BioCypherRelAsNode objects.
Source code in biocypher/output/write/relational/_csv.py
def _write_entities_to_file(self, entities: iter) -> bool:
    """Function to write the entities to a CSV file.

    Args:
        entities (iterable): An iterable of BioCypherNode / BioCypherEdge / BioCypherRelAsNode objects.
    """
    entities = peekable(entities)
    entity_list = self.pandas_in_memory._separate_entity_types(entities)
    for entity_type, entities in entity_list.items():
        self.in_memory_dfs[entity_type] = self.pandas_in_memory._add_entity_df(entity_type, entities)
    for entity_type in self.in_memory_dfs.keys():
        entity_df = self.in_memory_dfs[entity_type]
        if " " in entity_type or "." in entity_type:
            entity_type = entity_type.replace(" ", "_").replace(".", "_")
        if self.write_to_file:
            logger.info(f"Writing {entity_df.shape[0]} entries to {entity_type}.csv.")
            entity_df.to_csv(
                f"{self.output_directory}/{entity_type}.csv",
                sep=self.delimiter,
            )
        self.stored_dfs[entity_type] = entity_df
    self.in_memory_dfs = {}
    return True