Skip to content

trestle.common.model_utils

trestle.common.model_utils

Common utilities for the OSCAL models and directories.

Attributes

logger = logging.getLogger(__name__) (module attribute)

Classes

ModelUtils

Utilities for the OSCAL models input and output.

Source code in trestle/common/model_utils.py
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
class ModelUtils:
    """Utilities for the OSCAL models input and output."""

    @staticmethod
    def load_distributed(
        abs_path: Path, abs_trestle_root: Path, collection_type: Optional[Type[Any]] = None
    ) -> Tuple[
        Type[OscalBaseModel], str, Optional[Union[OscalBaseModel, List[OscalBaseModel], Dict[str, OscalBaseModel]]]
    ]:
        """
        Given path to a model, load the model.

        If the model is decomposed/split/distributed, the decomposed models are loaded recursively.

        Args:
            abs_path: The path to the file/directory to be loaded.
            abs_trestle_root: The trestle workspace root directory.
            collection_type: The type of collection model, if it is a collection model.
                typing.List is the only collection type handled or expected.
                Defaults to None.

        Returns:
            Return a tuple of Model Type (e.g. class 'trestle.oscal.catalog.Catalog'),
            Model Alias (e.g. 'catalog.metadata') and Instance of the Model.
            If the model is decomposed/split/distributed, the instance of the model contains
                the decomposed models loaded recursively.

        Raises:
            TrestleNotFoundError: If abs_path does not exist and neither does the same path with its
                final suffix removed.
            TrestleError: If collection_type is given and is anything other than list.

        Note:
            This does not validate the model.  You must either validate the model separately or use the load_validate
            utilities.
        """
        # if trying to load file that does not exist, load path instead
        # (i.e. fall back to the sibling path that has the same name minus the final suffix)
        if not abs_path.exists():
            abs_path = abs_path.with_name(abs_path.stem)

        if not abs_path.exists():
            raise TrestleNotFoundError(f'File {abs_path} not found for load.')

        if collection_type:
            # If the path contains a list type model
            if collection_type is list:
                return ModelUtils._load_list(abs_path, abs_trestle_root)
            # the only other collection type in OSCAL is dict, and it only applies to include_all,
            # which is too granular ever to be loaded by this routine
            else:
                raise TrestleError(f'Collection type {collection_type} not recognized for distributed load.')

        # Get current model
        primary_model_type, primary_model_alias = ModelUtils.get_stripped_model_type(abs_path, abs_trestle_root)
        primary_model_instance: Optional[Union[OscalBaseModel, List[OscalBaseModel], Dict[str, OscalBaseModel]]] = None

        # is this an attempt to load an actual json or yaml file?
        content_type = FileContentType.path_to_content_type(abs_path)
        # if file is sought but it doesn't exist, ignore and load as decomposed model
        # NOTE(review): abs_path.exists() is always True at this point because a missing path raised above.
        if FileContentType.is_readable_file(content_type) and abs_path.exists():
            # Use the model type as-is (may be wrapped Union) for reading
            # The smart validators in generated models will choose the correct variant
            primary_model_instance = primary_model_type.oscal_read(abs_path)
            # If the instance has __root__, unwrap it to get the actual model
            if hasattr(primary_model_instance, '__root__'):
                root_val = primary_model_instance.__root__
                # Only unwrap if it's a single OscalBaseModel, not a list
                if isinstance(root_val, OscalBaseModel):
                    primary_model_instance = root_val
        # Is model decomposed?
        # The decomposed parts, if any, live in a sibling directory named after the path minus its final suffix.
        decomposed_dir = abs_path.with_name(abs_path.stem)

        if decomposed_dir.exists():
            # These two lists are appended to in lockstep: entry i of one belongs to entry i of the other.
            aliases_not_to_be_stripped = []
            instances_to_be_merged: List[OscalBaseModel] = []

            # sorted() makes the order in which the parts are visited independent of the directory listing order
            for local_path in sorted(trestle.common.file_utils.iterdir_without_hidden_files(decomposed_dir)):
                if local_path.is_file():
                    model_type, model_alias, model_instance = ModelUtils.load_distributed(local_path, abs_trestle_root)
                    # only the last element of the dotted alias is used as the field key for the merge
                    aliases_not_to_be_stripped.append(model_alias.split('.')[-1])
                    instances_to_be_merged.append(model_instance)

                elif local_path.is_dir():
                    model_type, model_alias = ModelUtils.get_stripped_model_type(local_path, abs_trestle_root)
                    # Only load the directory if it is a collection model. Otherwise do nothing - it gets loaded when
                    # iterating over the model file

                    # If a model is just a container for a list e.g.
                    # class Foo(OscalBaseModel):  noqa: E800
                    #      __root__: List[Bar]    noqa: E800
                    # You need to test whether first a root key exists
                    # then whether the outer_type of root is a collection.
                    # Alternative is to do a try except to avoid the error for an unknown key.

                    if model_type.is_collection_container():
                        # This directory is a decomposed List or Dict
                        collection_type = model_type.get_collection_type()
                        model_type, model_alias, model_instance = ModelUtils.load_distributed(
                            local_path, abs_trestle_root, collection_type
                        )
                        aliases_not_to_be_stripped.append(model_alias.split('.')[-1])
                        instances_to_be_merged.append(model_instance)
            primary_model_dict = {}
            if primary_model_instance is not None:
                # NOTE: this is the instance's own __dict__, not a copy, so the merge loop below writes into it
                primary_model_dict = primary_model_instance.__dict__

            # Recompute the model type, this time keeping the fields that were loaded from the decomposed parts
            merged_model_type, merged_model_alias = ModelUtils.get_stripped_model_type(
                abs_path, abs_trestle_root, aliases_not_to_be_stripped
            )

            # The following use of top_level is to allow loading of a top level model by name only, e.g. MyCatalog
            # There may be a better overall way to approach this.
            top_level = len(merged_model_alias.split('.')) == 1

            for i in range(len(aliases_not_to_be_stripped)):
                alias = aliases_not_to_be_stripped[i]
                instance = instances_to_be_merged[i]
                # A model that only wraps a value under __root__ is replaced by the wrapped value before merging
                if (
                    hasattr(instance, '__dict__')
                    and '__root__' in instance.__dict__
                    and isinstance(instance, OscalBaseModel)
                ):
                    instance = instance.__dict__['__root__']
                if top_level and not primary_model_dict:
                    # Nothing loaded yet for a top level model: this part supplies the whole field dictionary
                    primary_model_dict = instance.__dict__
                else:
                    primary_model_dict[alias] = instance

            # If merged_model_type is a wrapped Union (has __root__), we need to unwrap it
            # to get the actual model type for instantiation
            actual_model_type = merged_model_type
            if hasattr(merged_model_type, '__fields__') and '__root__' in merged_model_type.__fields__:
                # This is a wrapped Union model - extract the Union type from __root__
                root_field = merged_model_type.__fields__['__root__']
                root_type = root_field.outer_type_ if hasattr(root_field, 'outer_type_') else root_field.type_
                # Inspect primary_model_dict to determine which Union variant to use
                # Look for distinctive fields that indicate which variant
                # For Group1|Group2: 'groups' -> Group1, 'controls' -> Group2
                # For Parameter1|Parameter2: 'values' -> Parameter1, 'select' -> Parameter2
                field_hint = None
                distinctive_fields = ['controls', 'groups', 'values', 'select', 'insert-controls']
                # The first key of primary_model_dict (in its iteration order) that is distinctive becomes the hint
                for key in primary_model_dict.keys():
                    if key in distinctive_fields:
                        field_hint = key
                        break
                # If no distinctive field found, use any field
                if field_hint is None and primary_model_dict:
                    field_hint = next(iter(primary_model_dict.keys()))
                # Resolve the Union to get an actual model type based on the data
                actual_model_type = _get_model_type_from_union(root_type, field_hint)

            # The instance is built from actual_model_type, but the type returned to the caller is merged_model_type
            merged_model_instance = actual_model_type(**primary_model_dict)
            return merged_model_type, merged_model_alias, merged_model_instance
        # Not decomposed: return what was read from the file (None if the path was not a readable file)
        return primary_model_type, primary_model_alias, primary_model_instance

    @staticmethod
    def load_model_for_class(
        trestle_root: pathlib.Path,
        model_name: str,
        model_class: TG,
        file_content_type: Optional[FileContentType] = None,
    ) -> Tuple[TG, pathlib.Path]:
        """Load a top level model from its name and class, inferring the file content type when it is not given.

        Use this when an existing model must be loaded but its content type may be unknown.
        Pass file_content_type whenever it is known.

        Returns:
            Tuple of the loaded model and the full path of the file it was loaded from.

        Raises:
            TrestleError: If the content type (given or inferred) is not a readable file type.

        Note:
            This does not validate the model.  If you want to validate the model use the load_validate utilities.
        """
        base_path = ModelUtils._root_path_for_top_level_model(trestle_root, model_name, model_class)  # type: ignore
        # fall back to inspecting the path only when the caller gave no content type
        content_type = file_content_type
        if content_type is None:
            content_type = FileContentType.path_to_content_type(base_path)
        if not FileContentType.is_readable_file(content_type):
            raise TrestleError(f'Unable to load model {model_name} without specifying json or yaml.')
        extension = FileContentType.to_file_extension(content_type)
        model_file = base_path.with_suffix(extension)
        _loaded_type, _loaded_alias, loaded_model = ModelUtils.load_distributed(model_file, trestle_root)
        return loaded_model, model_file  # type: ignore

    @staticmethod
    def load_model_for_type(
        trestle_root: pathlib.Path, model_type: str, model_name: str
    ) -> Tuple[TopLevelOscalModel, pathlib.Path]:
        """Load the named model of the given model type.

        Returns:
            Tuple of the loaded model and the path it was loaded from.

        Raises:
            TrestleError: If nothing exists at the path built from the model type directory and the model name.
        """
        model_path = trestle_root / ModelUtils.model_type_to_model_dir(model_type) / model_name
        if model_path.exists():
            _loaded_type, _loaded_alias, oscal_object = ModelUtils.load_distributed(model_path, trestle_root)
            return oscal_object, model_path  # type: ignore
        raise TrestleError(f'No model is found at path: {model_path}.')

    @staticmethod
    def save_top_level_model(
        model: TopLevelOscalModel, trestle_root: pathlib.Path, model_name: str, file_content_type: FileContentType
    ) -> None:
        """Write a top level model to its place in the workspace, deriving the model type from the model itself.

        The model type (catalog, profile, etc.) is not needed, but the file content type must be given.
        The directory that will hold the file is created when it is missing.
        """
        target_file = ModelUtils._root_path_for_top_level_model(trestle_root, model_name, model).with_suffix(
            FileContentType.to_file_extension(file_content_type)
        )
        target_dir = target_file.parent
        if not target_dir.exists():
            target_dir.mkdir(parents=True, exist_ok=True)
        model.oscal_write(target_file)

    @staticmethod
    def get_relative_model_type(relative_path: pathlib.Path) -> Tuple[Type[OscalBaseModel], str]:
        """
        Return the oscal model type and alias for a path given relative to 'trestle_root'.

        Args:
            relative_path: Relative path of the model with respect to the root directory of the trestle workspace.

        Returns:
            Type of Oscal Model for the provided model
            Full dotted alias of that oscal model.

        Raises:
            TrestleError: If the path has fewer than two parts or its first part is not a model directory.
        """
        path_parts = relative_path.parts
        if len(path_parts) < 2:
            raise TrestleError(
                'Insufficient path length to be a valid relative path w.r.t trestle workspace root directory.'
            )
        model_dir = path_parts[0]  # catalogs, profiles, etc
        # everything after the first two parts of the path
        model_relative_path = pathlib.Path(*path_parts[2:])

        if model_dir not in const.MODEL_DIR_LIST:
            raise TrestleError(f'No valid trestle model type directory (e.g. catalogs) found for {model_dir}.')
        module_name = const.MODEL_DIR_TO_MODEL_MODULE[model_dir]

        model_type, model_alias = ModelUtils.get_root_model(module_name)
        full_alias = model_alias

        for index, part in enumerate(model_relative_path.parts):
            alias = ModelUtils._extract_alias(part)
            # a leading part that just repeats the root alias adds nothing to the alias chain
            if index == 0 and alias == model_alias:
                continue
            model_alias = alias
            full_alias = f'{full_alias}.{model_alias}'
            if utils.is_collection_field_type(model_type):
                # step from the collection to the type of its items
                model_type = utils.get_inner_type(model_type)
                continue
            resolved_type = _get_model_type_from_union(model_type, alias)
            model_type = resolved_type.alias_to_field_map()[alias].outer_type_

        return model_type, full_alias

    @staticmethod
    def get_stripped_model_type(
        absolute_path: pathlib.Path, absolute_trestle_root: pathlib.Path, aliases_not_to_be_stripped: Optional[List[str]] = None
    ) -> Tuple[Type[OscalBaseModel], str]:
        """
        Get the stripped contextual model class and alias based on the contextual path.

        This function relies on the directory structure of the trestle model being edited to determine, based on the
        existing files and folder, which fields should be stripped from the model type represented by the
        path passed in as a parameter.

        Args:
            absolute_path: Path of the model file or directory; it must lie under absolute_trestle_root.
            absolute_trestle_root: The trestle workspace root directory.
            aliases_not_to_be_stripped: Aliases to keep even when a matching entry exists in the split
                directory.  None is treated as an empty list.

        Returns:
            Tuple of the model type and the full dotted model alias.  For a collection type, and for a Union
            type with no directory hint, the type is a newly created model holding the original type
            under __root__.
        """
        if aliases_not_to_be_stripped is None:
            aliases_not_to_be_stripped = []
        singular_model_type, model_alias = ModelUtils.get_relative_model_type(
            absolute_path.relative_to(absolute_trestle_root)
        )
        logger.debug(f'singular model type {singular_model_type} model alias {model_alias}')

        # Stripped models do not apply to collection types such as List[] and Dict{}
        # if model type is a list or dict, generate a new wrapping model for it
        if utils.is_collection_field_type(singular_model_type):
            malias = model_alias.split('.')[-1]
            class_name = alias_to_classname(malias, AliasMode.JSON)
            logger.debug(f'collection field type class name {class_name} and alias {malias}')
            model_type = create_model(class_name, __base__=OscalBaseModel, __root__=(singular_model_type, ...))
            logger.debug(f'model_type created: {model_type}')
            return model_type, model_alias

        malias = model_alias.split('.')[-1]
        logger.debug(f'not collection field type, malias: {malias}')

        # Check if this is a Union type FIRST, before stripping logic
        # (types.UnionType is looked up with hasattr because it may not exist on the running Python)
        origin = get_origin(singular_model_type)
        is_union = origin is Union or (hasattr(types, 'UnionType') and origin is types.UnionType)

        # Locate the directory that would hold the split-out fields of this model
        if absolute_path.is_dir() and malias != ModelUtils._extract_alias(absolute_path.name):
            split_subdir = absolute_path / malias
        else:
            split_subdir = absolute_path.parent / absolute_path.with_suffix('').name

        # Every entry in the split directory names a field to strip, unless the caller asked to keep it
        aliases_to_be_stripped = set()
        if split_subdir.exists():
            for f in iterdir_without_hidden_files(split_subdir):
                alias = ModelUtils._extract_alias(f.name)
                if alias not in aliases_not_to_be_stripped:
                    aliases_to_be_stripped.add(alias)

        logger.debug(f'aliases to be stripped: {aliases_to_be_stripped}')

        # For Union types, use subdirectories to SELECT variant, not strip fields
        if is_union and len(aliases_to_be_stripped) > 0:
            # Use the first subdirectory name as a hint for which Union variant to use
            # NOTE(review): aliases_to_be_stripped is a set, so when it holds several aliases the one
            # picked here is not ordered - confirm the selected variant cannot depend on that choice.
            field_hint = next(iter(aliases_to_be_stripped))
            logger.debug(f'Union type: using field hint "{field_hint}" to select variant')
            singular_model_type = _get_model_type_from_union(singular_model_type, field_hint)
            # Now proceed with stripping for the selected variant
            model_type = singular_model_type.create_stripped_model_type(
                stripped_fields_aliases=list(aliases_to_be_stripped)
            )
            logger.debug(f'model_type: {model_type}')
            return model_type, model_alias
        elif len(aliases_to_be_stripped) > 0:
            # Non-Union type: normal stripping logic
            model_type = singular_model_type.create_stripped_model_type(
                stripped_fields_aliases=list(aliases_to_be_stripped)
            )
            logger.debug(f'model_type: {model_type}')
            return model_type, model_alias
        # Handle Union types even when no stripping is needed
        # (singular_model_type has not been reassigned on this path, so this repeats the is_union test above)
        origin = get_origin(singular_model_type)
        if origin is Union or (hasattr(types, 'UnionType') and origin is types.UnionType):
            # Check if there are subdirectories that indicate which variant to use
            # If absolute_path is a directory, look inside it; otherwise look in parent
            if absolute_path.is_dir():
                split_subdir = absolute_path
            else:
                split_subdir = absolute_path.parent / absolute_path.with_suffix('').name
            field_hint = None
            if split_subdir.exists() and split_subdir.is_dir():
                # Check what subdirectories exist to determine which Union variant
                # NOTE(review): this uses plain iterdir(), so hidden directories are not filtered out and
                # the first directory found depends on the listing order.
                for item in split_subdir.iterdir():
                    if item.is_dir():
                        # Use the subdirectory name as a hint for which field exists
                        # controls -> catalog Group2, insert-controls -> profile Group2, groups -> Group1
                        field_hint = item.name
                        logger.debug(f'Using subdirectory {field_hint} to select Union variant')
                        break

            # If we have a field hint from subdirectories, resolve to specific variant
            # Otherwise, wrap the Union for file reading (smart validators will choose)
            if field_hint:
                singular_model_type = _get_model_type_from_union(singular_model_type, field_hint)
            else:
                # No subdirectory hint - wrap Union for reading
                # This allows smart validators to choose the correct variant at deserialization time
                malias = model_alias.split('.')[-1]
                class_name = alias_to_classname(malias, AliasMode.JSON)
                logger.debug(f'Wrapping Union type {singular_model_type} in __root__ model')
                model_type = create_model(class_name, __base__=OscalBaseModel, __root__=(singular_model_type, ...))
                return model_type, model_alias
        else:
            # Not a Union: the helper is still called, with no field hint
            # (presumably it returns a non-Union type unchanged - TODO confirm against the helper)
            singular_model_type = _get_model_type_from_union(singular_model_type)
        return singular_model_type, model_alias

    @staticmethod
    def model_type_to_model_dir(model_type: str) -> str:
        """Map a model type to the name of its plural model directory.

        Raises:
            TrestleError: If model_type is not one of the known model types.
        """
        if model_type in const.MODEL_TYPE_LIST:
            return const.MODEL_TYPE_TO_MODEL_DIR[model_type]
        raise err.TrestleError(f'Not a valid model type: {model_type}.')

    @staticmethod
    def get_models_of_type(model_type: str, root: pathlib.Path) -> List[str]:
        """Return the names of the models of the requested type found in the trestle project containing root.

        Entries whose name starts with '.' or '_' are skipped; entries that are not directories are
        reported with a warning and left out.

        Raises:
            TrestleError: If the model type is not supported or root is not within a trestle project.
        """
        if model_type not in const.MODEL_TYPE_LIST:
            raise err.TrestleError(f'Model type {model_type} is not supported')
        # the search is done relative to the project root, not to the directory passed in
        trestle_root = extract_trestle_project_root(root)
        if not trestle_root:
            logger.error(f'Given directory {root} is not within a trestle project.')
            raise err.TrestleError('Given directory is not within a trestle project.')

        # construct the path of the directory that holds all models of this type
        model_dir_name = ModelUtils.model_type_to_model_dir(model_type)
        model_list = []
        for entry in (trestle_root / model_dir_name).glob('*/'):
            # .name keeps the whole directory name, dots included; .stem would cut it at the last dot
            dir_name = entry.name
            if ModelUtils._should_ignore(dir_name):
                continue
            if entry.is_dir():
                model_list.append(dir_name)
                continue
            logger.warning(
                f'Ignoring validation of misplaced file {dir_name} '
                + f'found in the model directory, {model_dir_name}.'
            )
        return model_list

    @staticmethod
    def get_all_models(root: pathlib.Path) -> List[Tuple[str, str]]:
        """Collect every model in the trestle directory as (model_type, model_name) tuples, grouped by model type."""
        return [
            (model_type, model_name)
            for model_type in const.MODEL_TYPE_LIST
            for model_name in ModelUtils.get_models_of_type(model_type, root)
        ]

    @staticmethod
    def get_model_path_for_name_and_class(
        trestle_root: pathlib.Path,
        model_name: str,
        model_class: Type[TopLevelOscalModel],
        file_content_type: Optional[FileContentType] = None,
    ) -> Optional[pathlib.Path]:
        """
        Build the full path of a model from its name, model class and file content type.

        When file_content_type is given, the path is built without inspecting the file system, so nothing
        confirms that the path or file exists.  When it is not given, the content type is inferred from the
        model's root path, and None is returned if that does not yield a readable file type.
        """
        root_model_path = ModelUtils._root_path_for_top_level_model(trestle_root, model_name, model_class)
        content_type = file_content_type
        if content_type is None:
            content_type = FileContentType.path_to_content_type(root_model_path)
            # only an inferred content type is checked for readability
            if not FileContentType.is_readable_file(content_type):
                return None
        return root_model_path.with_suffix(FileContentType.to_file_extension(content_type))

    @staticmethod
    def get_singular_alias(alias_path: str, relative_path: Optional[pathlib.Path] = None) -> str:
        """
        Get the alias in the singular form from a jsonpath.

        If relative_path is given, alias_path is taken to be relative to the model found at that path and
        is expanded to a full alias path (starting at the root model alias) before it is resolved.

        Args:
            alias_path: The current alias element path as a string
            relative_path: Optional relative path (w.r.t. trestle_root) to cater for relative element paths.

        Returns:
            Alias as a string

        Raises:
            TrestleError: If alias_path is blank, if its first element is not a root model alias, or if the
                singular alias of the final collection element cannot be determined.
        """
        if len(alias_path.strip()) == 0:
            raise err.TrestleError(f'Invalid jsonpath {alias_path}')

        full_alias_path = alias_path
        if relative_path:
            # log the path that triggered contextual mode (this used to log the builtin `str` by mistake)
            logger.debug(f'get_singular_alias contextual mode: {relative_path}')
            _, full_model_alias = ModelUtils.get_relative_model_type(relative_path)
            first_alias_a = full_model_alias.split('.')[-1]
            first_alias_b = alias_path.split('.')[0]
            # avoid repeating the alias when the model alias ends with the element alias_path starts with
            if first_alias_a == first_alias_b:
                full_model_alias = '.'.join(full_model_alias.split('.')[:-1])
            full_alias_path = '.'.join([full_model_alias, alias_path]).strip('.')

        path_parts = full_alias_path.split(const.ALIAS_PATH_SEPARATOR)
        logger.debug(f'path parts: {path_parts}')

        # the first path element must name one of the root models
        root_model_alias = path_parts[0]
        for module_name in const.MODEL_TYPE_TO_MODEL_MODULE.values():
            model_type, model_alias = ModelUtils.get_root_model(module_name)
            if root_model_alias == model_alias:
                break
        else:
            raise err.TrestleError(f'{root_model_alias} is an invalid root model alias.')

        if len(path_parts) == 1:
            return root_model_alias

        # model_types records the type reached after each resolved path element
        model_types = [model_type]
        last_index = len(path_parts) - 1
        # go through path parts skipping first one
        for index, path_part in enumerate(path_parts[1:], start=1):
            if utils.is_collection_field_type(model_type):
                # if it is a collection type and last part is * then stop at the collection itself
                if index == last_index and path_part == '*':
                    break
                # otherwise this path part addresses the items, so step to the inner type of the collection
                model_type = utils.get_inner_type(model_type)
            else:
                model_type = _get_model_type_from_union(model_type, path_part)
                field_map = model_type.alias_to_field_map()
                if path_part not in field_map:
                    continue
                model_type = field_map[path_part].outer_type_
            model_types.append(model_type)

        last_alias = path_parts[-1]
        if last_alias == '*':
            last_alias = path_parts[-2]

        # the final element is not a collection, so its alias is already singular
        if not utils.is_collection_field_type(model_type):
            return last_alias

        # the final element is a collection: name it after the class of its items
        parent_model_type = model_types[-2]
        try:
            parent_model_type = _get_model_type_from_union(parent_model_type, last_alias)
            field_map = parent_model_type.alias_to_field_map()
            field = field_map[last_alias]
            outer_type = field.outer_type_
            inner_type = utils.get_inner_type(outer_type)

            # Handle Union types - if inner_type is a Union, get the first non-None type
            origin = utils.get_origin(inner_type)
            if origin is Union or (hasattr(types, 'UnionType') and origin is types.UnionType):
                for arg in get_args(inner_type):
                    if arg is not type(None):
                        inner_type = arg
                        break

            singular_alias = str_utils.classname_to_alias(inner_type.__name__, AliasMode.JSON)
        except Exception as e:
            # keep the original exception as the cause so the underlying failure stays visible
            raise err.TrestleError(f'Error in json path {alias_path}: {e}') from e

        return singular_alias

    @staticmethod
    def get_root_model(module_name: str) -> Tuple[Type[Any], str]:
        """Return the type and alias of the first field of the named module's `Model` class.

        Raises:
            TrestleError: If the module cannot be found or has no `Model` attribute.
        """
        try:
            module = importlib.import_module(module_name)
        except ModuleNotFoundError as e:
            raise err.TrestleError(str(e))

        if not hasattr(module, 'Model'):
            raise err.TrestleError('Invalid module')
        # the root model is described by the first field declared on Model
        first_field = next(iter(module.Model.__fields__.values()))
        return first_field.type_, first_field.alias

    @staticmethod
    def _root_path_for_top_level_model(
        trestle_root: pathlib.Path, model_name: str, model_class: Union[TopLevelOscalModel, Type[TopLevelOscalModel]]
    ) -> pathlib.Path:
        """
        Build the suffix-less root path of a model from its name and class.

        This is a private helper that only assembles the path: it neither checks that anything exists
        on disk nor creates any directory.

        Raises:
            TrestleError: If the module of model_class is not one of the known model modules
        """
        module_is_known = hasattr(model_class, '__module__') and model_class.__module__ in const.MODEL_MODULE_LIST
        if not module_is_known:
            raise TrestleError(f'Unable to determine model type for model {model_name} with class {model_class}')
        model_alias = const.MODEL_MODULE_TO_MODEL_TYPE[model_class.__module__]
        type_dir = const.MODEL_TYPE_TO_MODEL_DIR[model_alias]
        # <trestle_root>/<type dir>/<model name>/<model alias>
        return trestle_root / f'{type_dir}/{model_name}' / model_alias

    @staticmethod
    def _extract_alias(string_dir: str) -> str:
        """
        Reduce a file or directory name to its alias by dropping extensions and list/dict prefixes.

        Working on plain strings keeps this simple since it is applied to several parts of a path.
        """
        # everything before the first '.' - i.e. the name without any extension
        stem = string_dir.split('.')[0]
        # the text after the last index separator is the alias of the list or dict item
        return stem.split(const.IDX_SEP)[-1]

    @staticmethod
    def _should_ignore(name: str) -> bool:
        """Check if the file or directory should be ignored or not."""
        return name[0] == '.' or name[0] == '_'

    @staticmethod
    def _load_list(abs_path: Path, abs_trestle_root: Path) -> Tuple[Type[OscalBaseModel], str, List[OscalBaseModel]]:
        """
        Given path to a directory of list(array) models, load the distributed models.

        Args:
            abs_path: Path to the directory holding the list item files
            abs_trestle_root: Passed through to get_stripped_model_type and load_distributed

        Returns:
            The collection model type, the collection model alias, and the loaded instances
            in sorted path order
        """
        collection_model_type, collection_model_alias = ModelUtils.get_stripped_model_type(abs_path, abs_trestle_root)
        instances_to_be_merged: List[OscalBaseModel] = []
        for path in sorted(trestle.common.file_utils.iterdir_without_hidden_files(abs_path)):
            # ASSUMPTION HERE: if it is a directory, there's a file that can not be decomposed further.
            if path.is_dir():
                continue
            # only the loaded instance is needed; the per-item type and alias are not used here
            _, _, model_instance = ModelUtils.load_distributed(path, abs_trestle_root)
            instances_to_be_merged.append(model_instance)

        return collection_model_type, collection_model_alias, instances_to_be_merged

    @staticmethod
    def _parameter_to_dict_recurse(obj: Union[OscalBaseModel, str], partial: bool) -> Union[str, Dict[str, Any]]:
        """
        Recursively convert obj to a dict containing only string values.

        Args:
            obj: The parameter or its constituent parts in recursive calls
            partial: Whether to convert the entire param or just the parts needed for markdown header

        Returns:
            The converted parameter as dictionary
        """
        header_fields = ['id', 'label', 'values', 'select', 'choice', 'how_many', 'guidelines', 'prose']
        # wrapper types collapse directly to the value they carry
        if isinstance(obj, common.Remarks):
            return obj.__root__
        if isinstance(obj, common.HowMany):
            return obj.value
        # no fields-set attribute: it is already a string or gets cast to one
        if not hasattr(obj, const.FIELDS_SET):
            return str(obj)
        # an oscal object: walk the fields that were explicitly set
        converted = {}
        for name in obj.__fields_set__:
            if partial and name not in header_fields:
                continue
            value = getattr(obj, name)
            if not value:
                # skip None and empty content
                continue
            if isinstance(value, list):
                converted[name] = [ModelUtils._parameter_to_dict_recurse(item, partial) for item in value]
            elif isinstance(value, str):
                converted[name] = value
            else:
                converted[name] = ModelUtils._parameter_to_dict_recurse(value, partial)
        return converted

    @staticmethod
    def parameter_to_dict(obj: Union[OscalBaseModel, str], partial: bool) -> Union[str, Dict[str, Any]]:
        """
        Convert obj to dict containing only string values, storing only the fields that have values set.

        Args:
            obj: The parameter or its constituent parts in recursive calls
            partial: Whether to convert the entire param or just the parts needed for markdown header

        Returns:
            The converted parameter as dictionary, with values as None if not present for Parameter1.
            If the conversion yields something other than a dict (e.g. obj was a plain string) it is
            returned unchanged.
        """
        res = ModelUtils._parameter_to_dict_recurse(obj, partial)
        # A non-dict result has no fields to complete.  On a str the `in` tests below would be
        # substring checks and the item assignment would raise TypeError.
        if not isinstance(res, dict):
            return res
        # Only add values: None for Parameter1 (which doesn't have select)
        # Parameter2 has select and should not have values field
        if 'values' not in res and 'select' not in res:
            res['values'] = None  # type: ignore
        return res

    @staticmethod
    def _string_to_howmany(count_str: str) -> Optional[str]:
        """
        Map a loosely formatted count string onto a HowMany value.

        The string is lower-cased and stripped, and '-' and '_' are treated as spaces before matching.

        Returns:
            The matching HowMany member, or None if the string is not recognized
        """
        normalized = count_str.lower().strip().replace('-', ' ').replace('_', ' ')
        how_many = None
        if normalized == const.ONE:
            how_many = common.HowMany.one  # type: ignore
        elif normalized == const.ONE_OR_MORE_SPACED:
            how_many = common.HowMany.one_or_more  # type: ignore
        return how_many

    @staticmethod
    def dict_to_parameter(param_dict: Dict[str, Any]) -> common.Parameter:
        """
        Convert dict with only string values to Parameter with handling for HowMany and with validity checks.

        Args:
            param_dict: Dictionary of pure string values representing Parameter contents.
                The dictionary is modified in place: informational keys are popped and
                values / how_many are normalized.

        Returns:
            A valid OSCAL Parameter

        Raises:
            TrestleError: If select.how_many is not a recognized HowMany string, or if the parameter
                value origin key is present with a value of None

        Notes:
            This handles both partial and full parameter dictionaries.
            It checks for validity of the values if a select and HowMany is specified.
            There is special handling for values: If it is a single string it is converted to list of one ParameterValue.
            But if it is a list of strings it is regarded as a list of values and is converted to a list of
            ParameterValues.
        """
        values = param_dict.get('values', [])
        # special handling when only one value present - convert to list of 1
        if isinstance(values, str):
            values = [values]
            param_dict['values'] = values
        elif values is None:
            # an explicit None means no values were given; use an empty list for the checks below
            values = []
        select = param_dict.get('select')
        if select is not None and 'how_many' in select:
            count_str = select['how_many']
            how_many = ModelUtils._string_to_howmany(count_str)
            if how_many is None:
                # report the offending input string, not the None result of the failed conversion
                raise TrestleError(
                    f'Unrecognized HowMany value {count_str} in Parameter: should be one-or-more or one.'
                )
            select['how_many'] = how_many
            # how_many is a HowMany member at this point, so compare against the member and not the raw string
            if how_many == common.HowMany.one and len(values) > 1:
                logger.warning(f'Parameter specifies HowMany=1 but has {len(values)} values given.')
            choices = select.get('choice', [])
            if choices and values:
                for value in values:
                    if value not in choices:
                        logger.warning(f'Parameter {param_dict["id"]} has value "{value}" not in choices: {choices}.')
        props = param_dict.get('props', [])
        if const.DISPLAY_NAME in param_dict:
            display_name = param_dict.pop(const.DISPLAY_NAME)
            props.append(common.Property(name=const.DISPLAY_NAME, value=display_name, ns=const.TRESTLE_GENERIC_NS))  # type: ignore[call-arg]
        # removing aggregates as this is prop just informative in markdown
        param_dict.pop(const.AGGREGATES, None)
        if const.PARAM_VALUE_ORIGIN in param_dict:
            param_value_origin = param_dict.pop(const.PARAM_VALUE_ORIGIN)
            if param_value_origin is None:
                raise TrestleError(
                    f'Parameter value origin property for parameter {param_dict["id"]} '
                    'is None and it should have a value'
                )
            props.append(common.Property(name=const.PARAM_VALUE_ORIGIN, value=param_value_origin))  # type:ignore[call-arg]
        # removing alt-identifier as this is prop just informative in markdown
        param_dict.pop(const.ALT_IDENTIFIER, None)

        param_dict.pop('ns', None)

        # Choose Parameter1 (with values) or Parameter2 (with select) based on which field is present
        # Remove the field that doesn't belong to the chosen variant
        if param_dict.get('select') is not None:
            # Creating Parameter2 - remove values if present
            param_dict.pop('values', None)
            param = common.Parameter2(**param_dict)
        else:
            # Creating Parameter1 - remove select if present
            param_dict.pop('select', None)
            param = common.Parameter1(**param_dict)

        param.props = none_if_empty(props)
        return param

    @staticmethod
    def last_modified_at_time(timestamp: Optional[datetime] = None) -> datetime:
        """Return the given timestamp, or the current local time with timezone attached if none is given."""
        if timestamp:
            return timestamp
        return datetime.now().astimezone()

    @staticmethod
    def update_last_modified(model: TopLevelOscalModel, timestamp: Optional[datetime] = None) -> None:
        """Set the LastModified timestamp in the top level model's metadata to timestamp, or to now if none given."""
        model.metadata.last_modified = timestamp or datetime.now().astimezone()

    @staticmethod
    def model_age(model: TopLevelOscalModel) -> int:
        """Return the whole seconds elapsed since the model's LastModified timestamp (one year if it is not set)."""
        last_modified = model.metadata.last_modified
        if not last_modified:
            # default to one year if no last_modified
            return const.DAY_SECONDS * 365
        elapsed = datetime.now().astimezone() - last_modified
        return int(elapsed.total_seconds())

    @staticmethod
    def find_values_by_name(object_of_interest: Any, name_of_interest: str) -> List[Any]:
        """
        Walk the object and collect every value stored under the given name.

        Pydantic models, lists and dicts are traversed recursively; any other object contributes nothing.
        """
        found: List[Any] = []
        if isinstance(object_of_interest, BaseModel):
            direct_value = getattr(object_of_interest, name_of_interest, None)
            if direct_value is not None:
                found.append(direct_value)
            # descend into each field listed in the model's fields-set attribute, if it has one
            for field_name in getattr(object_of_interest, const.FIELDS_SET, None) or ():
                child = getattr(object_of_interest, field_name, None)
                found += ModelUtils.find_values_by_name(child, name_of_interest)
        elif type(object_of_interest) is list:
            for element in object_of_interest:
                found += ModelUtils.find_values_by_name(element, name_of_interest)
        elif type(object_of_interest) is dict:
            if name_of_interest in object_of_interest:
                found.append(object_of_interest[name_of_interest])
            for child in object_of_interest.values():
                found += ModelUtils.find_values_by_name(child, name_of_interest)
        return found

    @staticmethod
    def has_no_duplicate_values_by_name(object_of_interest: BaseModel, name_of_interest: str) -> bool:
        """
        Report whether all values found under the given name are distinct.

        Returns:
            True if there are no duplicates; otherwise False, after logging a warning per duplicated value
        """
        found = ModelUtils.find_values_by_name(object_of_interest, name_of_interest)
        if len(set(found)) == len(found):
            return True
        # tally the occurrences of each value
        tally: Dict[str, Any] = {}
        for value in found:
            tally[value] = tally.get(value, 0) + 1
        # now report every value seen more than once
        repeated = {value: count for value, count in tally.items() if count > 1}
        for item, instances in repeated.items():
            logger.warning(f'Duplicate detected of item {item} with {instances} instances.')
        return False

    @staticmethod
    def find_uuid_refs(object_of_interest: BaseModel) -> Set[str]:
        """
        Collect the uuids referenced from href values and from markdown links in prose.

        Returns:
            The set of uuid strings found in references that start with '#'
        """
        # hrefs have form #foo or #uuid
        candidates = ModelUtils.find_values_by_name(object_of_interest, 'href')

        # prose has uuid refs in markdown form: [foo](#bar) or [foo](#uuid)
        for prose in ModelUtils.find_values_by_name(object_of_interest, 'prose'):
            # the [1] is to extract the inner of 3 capture patterns
            candidates.extend(match[1] for match in re.findall(const.MARKDOWN_URL_REGEX, prose))

        uuid_set: Set[str] = set()
        for candidate in candidates:
            # only non-empty strings starting with # are potential uuid references
            if candidate and candidate[0] == '#':
                # keep whatever in the remainder matches the uuid pattern
                uuid_set.update(re.findall(const.UUID_REGEX, candidate[1:]))
        return uuid_set

    @staticmethod
    def _regenerate_uuids_in_place(object_of_interest: Any, uuid_lut: Dict[str, str]) -> Tuple[Any, Dict[str, str]]:
        """Replace the uuids in the model with fresh ones and record the changes.

        Every 'uuid' field of a model and every 'uuid' key of a dict gets a new uuid4 value, and the
        old:new pair is stored in the lookup table.  References to the old uuids are not touched here -
        that is done by a separate pass.  The first call must pass an empty dict as uuid_lut; the
        recursion then grows it.

        Args:
            object_of_interest: pydantic.BaseModel, list, dict or str will be updated
            uuid_lut: dict of the growing lut of old:new uuid's.  First call must be made with value {}

        Returns:
            The updated object_of_interest with new uuid's (but refs to them are not updated)
            The final lookup table of old:new uuid's

        """
        uuid_key = 'uuid'
        # Resources are identified by uuid and are referenced by hrefs with # in front of that uuid,
        # so they are returned untouched.  Other types that must keep their uuid would need the same treatment.
        if isinstance(object_of_interest, common.Resource):
            return object_of_interest, uuid_lut

        if isinstance(object_of_interest, BaseModel):
            # iterate over the names held in the model's fields-set attribute
            for field in getattr(object_of_interest, const.FIELDS_SET, None):
                if field != uuid_key:
                    replacement, uuid_lut = ModelUtils._regenerate_uuids_in_place(
                        object_of_interest.__dict__[field], uuid_lut
                    )
                else:
                    replacement = None
                    old_uuid = getattr(object_of_interest, field)
                    if old_uuid:
                        replacement = str(uuid.uuid4())
                        uuid_lut[old_uuid] = replacement
                object_of_interest.__dict__[field] = replacement
            return object_of_interest, uuid_lut

        if type(object_of_interest) is list:
            regenerated_list = []
            for element in object_of_interest:
                new_element, uuid_lut = ModelUtils._regenerate_uuids_in_place(element, uuid_lut)
                regenerated_list.append(new_element)
            return regenerated_list, uuid_lut

        if type(object_of_interest) is dict:
            regenerated_dict = {}
            for key, value in object_of_interest.items():
                if key != uuid_key:
                    regenerated_dict[key], uuid_lut = ModelUtils._regenerate_uuids_in_place(value, uuid_lut)
                else:
                    fresh_uuid = str(uuid.uuid4())
                    regenerated_dict[uuid_key] = fresh_uuid
                    uuid_lut[value] = fresh_uuid
            return regenerated_dict, uuid_lut

        # anything else is returned as is
        return object_of_interest, uuid_lut

    @staticmethod
    def _update_new_uuid_refs(object_of_interest: Any, uuid_lut: Dict[str, str]) -> Tuple[Any, int]:
        """
        Update all refs to uuids that were changed.

        Args:
            object_of_interest: pydantic.BaseModel, list, dict or str to scan
            uuid_lut: lookup table of old:new uuid's

        Returns:
            The updated object and the number of strings that were replaced
        """
        if isinstance(object_of_interest, BaseModel):
            n_refs_updated = 0
            for field in getattr(object_of_interest, const.FIELDS_SET, None):
                updated_value, n_new_updates = ModelUtils._update_new_uuid_refs(
                    object_of_interest.__dict__[field], uuid_lut
                )
                n_refs_updated += n_new_updates
                object_of_interest.__dict__[field] = updated_value
            return object_of_interest, n_refs_updated

        if type(object_of_interest) is list:
            n_refs_updated = 0
            rebuilt_list = []
            for element in object_of_interest:
                updated_element, n_new_updates = ModelUtils._update_new_uuid_refs(element, uuid_lut)
                n_refs_updated += n_new_updates
                rebuilt_list.append(updated_element)
            return rebuilt_list, n_refs_updated

        if type(object_of_interest) is dict:
            n_refs_updated = 0
            rebuilt_dict = {}
            for key, value in object_of_interest.items():
                # string values are handled by the str case of the recursive call
                rebuilt_dict[key], n_new_updates = ModelUtils._update_new_uuid_refs(value, uuid_lut)
                n_refs_updated += n_new_updates
            return rebuilt_dict, n_refs_updated

        # a string that is a key of the lookup table is replaced by its new value
        if isinstance(object_of_interest, str) and object_of_interest in uuid_lut:
            return uuid_lut[object_of_interest], 1

        return object_of_interest, 0

    @staticmethod
    def regenerate_uuids(object_of_interest: Any) -> Tuple[Any, Dict[str, str], int]:
        """Give the object new uuids and update the references to them.

        This works in two passes.  The first replaces each value stored under 'uuid' with a new uuid4
        and builds a lookup table of old:new values.  The second goes through the object again and
        replaces all string values present in the lookup table with the new value.

        Args:
            object_of_interest: pydantic.BaseModel, list, dict or str will be updated

        Returns:
            The updated object with new uuid's and refs
            The final lookup table of old:new uuid's
            A count of the number of refs that were updated
        """
        # pass 1: assign new uuids, starting from an empty lookup table
        regenerated, uuid_lut = ModelUtils._regenerate_uuids_in_place(object_of_interest, {})
        # pass 2: point the references at the new uuids
        updated, n_refs_updated = ModelUtils._update_new_uuid_refs(regenerated, uuid_lut)
        return updated, uuid_lut, n_refs_updated

    @staticmethod
    def fields_set_non_none(obj: BaseModel) -> Set[str]:
        """Return the names in the fields set after filtering out those whose value is None or empty."""

        def _field_value(field_name: str) -> Any:
            # the attribute value itself serves as the filter condition
            return getattr(obj, field_name)

        candidate_names = list(obj.__fields_set__)
        return set(as_filtered_list(candidate_names, _field_value))

    @staticmethod
    def _objects_differ(
        obj_a: Any, obj_b: Any, ignore_type_list: List[Any], ignore_name_list: List[str], ignore_all_uuid: bool
    ) -> bool:
        """
        Compare two objects with option to ignore given types.

        This does not check for tuples or other structures that won't be found in JSON.

        Args:
            obj_a: First object to compare
            obj_b: Second object to compare
            ignore_type_list: Objects whose exact type (taken from obj_a) is in this list are treated as equal
            ignore_name_list: Model field names and dict keys whose values are not compared
            ignore_all_uuid: If True, model fields and dict keys containing 'uuid' are not compared

        Returns:
            True if a difference is found, False otherwise
        """
        from enum import Enum

        obj_a_type = type(obj_a)
        obj_b_type = type(obj_b)

        # If exactly one of the two is falsy (None, empty, ...) they differ
        if bool(obj_a) != bool(obj_b):
            return True

        # For dynamically created wrapper classes (like Components, Props), compare by class name
        # These are created on-the-fly and may have different type identities but same structure
        if obj_a_type != obj_b_type:
            # Handle enum vs string comparison (enums get converted to strings after JSON round-trip)
            if isinstance(obj_a, Enum) and obj_b_type is str:
                # Compare enum value with string
                return obj_a.value != obj_b
            elif isinstance(obj_b, Enum) and obj_a_type is str:
                # Compare string with enum value
                return obj_a != obj_b.value
            # Handle __root__ wrapper vs enum comparison
            elif hasattr(obj_a, '__root__') and isinstance(obj_b, Enum):
                return obj_a.__root__ != obj_b.value
            elif hasattr(obj_b, '__root__') and isinstance(obj_a, Enum):
                return obj_a.value != obj_b.__root__
            # If both are BaseModel instances with same class name, treat as equivalent
            elif isinstance(obj_a, BaseModel) and isinstance(obj_b, BaseModel):
                if obj_a_type.__name__ == obj_b_type.__name__:
                    # Same class name, continue with field comparison below
                    pass
                else:
                    return True
            else:
                # any other type mismatch counts as a difference
                return True
        # truthiness was already found equal above, so here both objects are falsy and count as the same
        if not bool(obj_a):
            return False
        # ignored types are never reported as different, whatever their content
        if obj_a_type in ignore_type_list:
            return False
        if obj_a_type is str:
            return obj_a != obj_b
        elif isinstance(obj_a, BaseModel):
            # the sets of populated fields must match - this check is made before ignore_name_list is applied
            fields_a = ModelUtils.fields_set_non_none(obj_a)
            fields_b = ModelUtils.fields_set_non_none(obj_b)
            if fields_a != fields_b:
                return True
            for field in list_utils.as_filtered_list(fields_a, lambda f: f not in ignore_name_list):  # type: ignore
                if ignore_all_uuid and 'uuid' in field:
                    continue
                if ModelUtils._objects_differ(
                    getattr(obj_a, field), getattr(obj_b, field), ignore_type_list, ignore_name_list, ignore_all_uuid
                ):
                    return True
        elif obj_a_type is list:
            # lists are compared element by element in order
            if len(obj_a) != len(obj_b):
                return True
            for item_a, item_b in zip(obj_a, obj_b, strict=True):
                if ModelUtils._objects_differ(item_a, item_b, ignore_type_list, ignore_name_list, ignore_all_uuid):
                    return True
        elif obj_a_type is dict:
            # key sets must match exactly - this check is made before ignored keys are skipped
            if obj_a.keys() != obj_b.keys():
                return True
            for key, val in obj_a.items():
                if ignore_all_uuid and 'uuid' in key:
                    continue
                if key not in ignore_name_list and ModelUtils._objects_differ(
                    val, obj_b[key], ignore_type_list, ignore_name_list, ignore_all_uuid
                ):
                    return True
        # everything else falls back to plain inequality
        elif obj_a != obj_b:
            return True
        return False

    @staticmethod
    def models_are_equivalent(
        model_a: Optional[TopLevelOscalModel], model_b: Optional[TopLevelOscalModel], ignore_all_uuid: bool = False
    ) -> bool:
        """
        Check whether two models are the same apart from last modified and, optionally, uuids.

        Regenerating the uuids of a model changes all uuids *and the references to them*, so comparing
        such a model with its original needs the special handling enabled by ignore_all_uuid.
        """
        # types skipped in the comparison when uuids are to be ignored
        uuid_type_list = [
            common.LastModified,
            common.LocationUuid,
            common.PartyUuid,
            assessment_plan.RelatedObservation,
            assessment_results.RelatedObservation,
            poam.RelatedObservation,
            poam.RelatedObservation1,
        ]
        if ignore_all_uuid:
            ignored_types = uuid_type_list
        else:
            ignored_types = [common.LastModified]
        ignored_names = ['last_modified']
        models_differ = ModelUtils._objects_differ(model_a, model_b, ignored_types, ignored_names, ignore_all_uuid)
        return not models_differ

    @staticmethod
    def get_title_from_model_uri(trestle_root: pathlib.Path, uri: str) -> str:
        """
        Get title from model at uri.

        Raises:
            TrestleError: Re-raised after logging a warning if fetching the model fails
        """
        try:
            model, _ = cache.FetcherFactory.get_fetcher(trestle_root, uri).get_oscal()
            title = model.metadata.title
            return title
        except TrestleError as e:
            logger.warning(f'Error finding title for model at uri {uri}: {e}')
            raise
Functions¤
dict_to_parameter(param_dict) staticmethod ¤

Convert dict with only string values to Parameter with handling for HowMany and with validity checks.

Parameters:

Name Type Description Default
param_dict Dict[str, Any]

Dictionary of pure string values representing Parameter contents

required

Returns:

Type Description
Parameter

A valid OSCAL Parameter

Notes

This handles both partial and full parameter dictionaries. It checks for validity of the values if a select and HowMany is specified. There is special handling for values: if it is a single string, it is converted to a list of one ParameterValue; but if it is a list of strings, it is regarded as a list of values and is converted to a list of ParameterValues.

Source code in trestle/common/model_utils.py
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
@staticmethod
def dict_to_parameter(param_dict: Dict[str, Any]) -> common.Parameter:
    """
    Convert dict with only string values to Parameter with handling for HowMany and with validity checks.

    Args:
        param_dict: Dictionary of pure string values representing Parameter contents.
            The dictionary is modified in place: informational keys are popped and
            values / how_many are normalized.

    Returns:
        A valid OSCAL Parameter

    Raises:
        TrestleError: If select.how_many is not a recognized HowMany string, or if the parameter
            value origin key is present with a value of None

    Notes:
        This handles both partial and full parameter dictionaries.
        It checks for validity of the values if a select and HowMany is specified.
        There is special handling for values: If it is a single string it is converted to list of one ParameterValue.
        But if it is a list of strings it is regarded as a list of values and is converted to a list of
        ParameterValues.
    """
    values = param_dict.get('values', [])
    # special handling when only one value present - convert to list of 1
    if isinstance(values, str):
        values = [values]
        param_dict['values'] = values
    elif values is None:
        # an explicit None means no values were given; use an empty list for the checks below
        values = []
    select = param_dict.get('select')
    if select is not None and 'how_many' in select:
        count_str = select['how_many']
        how_many = ModelUtils._string_to_howmany(count_str)
        if how_many is None:
            # report the offending input string, not the None result of the failed conversion
            raise TrestleError(
                f'Unrecognized HowMany value {count_str} in Parameter: should be one-or-more or one.'
            )
        select['how_many'] = how_many
        # how_many is a HowMany member at this point, so compare against the member and not the raw string
        if how_many == common.HowMany.one and len(values) > 1:
            logger.warning(f'Parameter specifies HowMany=1 but has {len(values)} values given.')
        choices = select.get('choice', [])
        if choices and values:
            for value in values:
                if value not in choices:
                    logger.warning(f'Parameter {param_dict["id"]} has value "{value}" not in choices: {choices}.')
    props = param_dict.get('props', [])
    if const.DISPLAY_NAME in param_dict:
        display_name = param_dict.pop(const.DISPLAY_NAME)
        props.append(common.Property(name=const.DISPLAY_NAME, value=display_name, ns=const.TRESTLE_GENERIC_NS))  # type: ignore[call-arg]
    # removing aggregates as this is prop just informative in markdown
    param_dict.pop(const.AGGREGATES, None)
    if const.PARAM_VALUE_ORIGIN in param_dict:
        param_value_origin = param_dict.pop(const.PARAM_VALUE_ORIGIN)
        if param_value_origin is None:
            raise TrestleError(
                f'Parameter value origin property for parameter {param_dict["id"]} '
                'is None and it should have a value'
            )
        props.append(common.Property(name=const.PARAM_VALUE_ORIGIN, value=param_value_origin))  # type:ignore[call-arg]
    # removing alt-identifier as this is prop just informative in markdown
    param_dict.pop(const.ALT_IDENTIFIER, None)

    param_dict.pop('ns', None)

    # Choose Parameter1 (with values) or Parameter2 (with select) based on which field is present
    # Remove the field that doesn't belong to the chosen variant
    if param_dict.get('select') is not None:
        # Creating Parameter2 - remove values if present
        param_dict.pop('values', None)
        param = common.Parameter2(**param_dict)
    else:
        # Creating Parameter1 - remove select if present
        param_dict.pop('select', None)
        param = common.Parameter1(**param_dict)

    param.props = none_if_empty(props)
    return param
fields_set_non_none(obj) staticmethod ¤

Find the fields set with Nones and empty items removed.

Source code in trestle/common/model_utils.py
1023
1024
1025
1026
@staticmethod
def fields_set_non_none(obj: BaseModel) -> Set[str]:
    """Return the names in the fields set after filtering out those whose value is None or empty."""

    def _field_value(field_name: str) -> Any:
        # the attribute value itself serves as the filter condition
        return getattr(obj, field_name)

    candidate_names = list(obj.__fields_set__)
    return set(as_filtered_list(candidate_names, _field_value))
find_uuid_refs(object_of_interest) staticmethod ¤

Find uuid references made in prose and links.

Source code in trestle/common/model_utils.py
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
@staticmethod
def find_uuid_refs(object_of_interest: BaseModel) -> Set[str]:
    """
    Collect the uuids referenced from href values and from markdown links in prose.

    Returns:
        The set of uuid strings found in references that start with '#'
    """
    # hrefs have form #foo or #uuid
    candidates = ModelUtils.find_values_by_name(object_of_interest, 'href')

    # prose has uuid refs in markdown form: [foo](#bar) or [foo](#uuid)
    for prose in ModelUtils.find_values_by_name(object_of_interest, 'prose'):
        # the [1] is to extract the inner of 3 capture patterns
        candidates.extend(match[1] for match in re.findall(const.MARKDOWN_URL_REGEX, prose))

    uuid_set: Set[str] = set()
    for candidate in candidates:
        # only non-empty strings starting with # are potential uuid references
        if candidate and candidate[0] == '#':
            # keep whatever in the remainder matches the uuid pattern
            uuid_set.update(re.findall(const.UUID_REGEX, candidate[1:]))
    return uuid_set
find_values_by_name(object_of_interest, name_of_interest) staticmethod ¤

Traverse object and return list of values of specified name.

Source code in trestle/common/model_utils.py
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
@staticmethod
def find_values_by_name(object_of_interest: Any, name_of_interest: str) -> List[Any]:
    """
    Traverse object and return list of values of specified name.

    Models, lists and dicts are searched recursively; any other kind of object
    contributes nothing.  Values are returned in the order they are encountered.
    """
    collected: List[Any] = []

    def _descend(child: Any) -> None:
        # gather matches from a nested object
        collected.extend(ModelUtils.find_values_by_name(child, name_of_interest))

    if isinstance(object_of_interest, BaseModel):
        direct_value = getattr(object_of_interest, name_of_interest, None)
        if direct_value is not None:
            collected.append(direct_value)
        # only descend into the fields recorded under const.FIELDS_SET
        set_fields = getattr(object_of_interest, const.FIELDS_SET, None)
        if set_fields is not None:
            for field_name in set_fields:
                _descend(getattr(object_of_interest, field_name, None))
    elif type(object_of_interest) is list:
        for element in object_of_interest:
            _descend(element)
    elif type(object_of_interest) is dict:
        if name_of_interest in object_of_interest:
            collected.append(object_of_interest[name_of_interest])
        for element in object_of_interest.values():
            _descend(element)
    return collected
get_all_models(root) staticmethod ¤

Get list of all models in trestle directory as tuples (model_type, model_name).

Source code in trestle/common/model_utils.py
481
482
483
484
485
486
487
488
489
@staticmethod
def get_all_models(root: pathlib.Path) -> List[Tuple[str, str]]:
    """
    Get list of all models in trestle directory as tuples (model_type, model_name).

    Model types are visited in the order of const.MODEL_TYPE_LIST, and the names for
    each type come from get_models_of_type.
    """
    return [
        (model_type, model_name)
        for model_type in const.MODEL_TYPE_LIST
        for model_name in ModelUtils.get_models_of_type(model_type, root)
    ]
get_model_path_for_name_and_class(trestle_root, model_name, model_class, file_content_type=None) staticmethod ¤

Find the full path of a model given its name, model type and file content type.

If file_content_type is given it will not inspect the file system or confirm the needed path and file exists.

Source code in trestle/common/model_utils.py
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
@staticmethod
def get_model_path_for_name_and_class(
    trestle_root: pathlib.Path,
    model_name: str,
    model_class: Type[TopLevelOscalModel],
    file_content_type: Optional[FileContentType] = None,
) -> Optional[pathlib.Path]:
    """
    Find the full path of a model given its name, model type and file content type.

    When file_content_type is omitted it is inferred from the model's root path, and
    None is returned if the inferred type is not a readable file type.  When it is
    given, the path is built directly without checking that anything exists there.
    """
    model_root = ModelUtils._root_path_for_top_level_model(trestle_root, model_name, model_class)

    if file_content_type is None:
        # no type supplied: infer it and give up if it is not readable
        file_content_type = FileContentType.path_to_content_type(model_root)
        if not FileContentType.is_readable_file(file_content_type):
            return None

    extension = FileContentType.to_file_extension(file_content_type)
    return model_root.with_suffix(extension)
get_models_of_type(model_type, root) staticmethod ¤

Get list of model names for requested type in trestle directory.

Source code in trestle/common/model_utils.py
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
@staticmethod
def get_models_of_type(model_type: str, root: pathlib.Path) -> List[str]:
    """
    Get list of model names for requested type in trestle directory.

    Raises:
        TrestleError: if model_type is not supported or root is not inside a trestle project.
    """
    if model_type not in const.MODEL_TYPE_LIST:
        raise err.TrestleError(f'Model type {model_type} is not supported')

    # search relative to project root
    project_root = extract_trestle_project_root(root)
    if not project_root:
        logger.error(f'Given directory {root} is not within a trestle project.')
        raise err.TrestleError('Given directory is not within a trestle project.')

    # construct path to the directory holding models of this type
    model_dir_name = ModelUtils.model_type_to_model_dir(model_type)
    names: List[str] = []
    for entry in (project_root / model_dir_name).glob('*/'):
        # Use entry.name for directories to preserve full directory name including dots;
        # entry.stem is for files and incorrectly treats dots as file extensions
        entry_name = entry.name
        if ModelUtils._should_ignore(entry_name):
            continue
        if entry.is_dir():
            names.append(entry_name)
        else:
            # plain files do not belong directly in a model type directory
            logger.warning(
                f'Ignoring validation of misplaced file {entry_name} '
                f'found in the model directory, {model_dir_name}.'
            )
    return names
get_relative_model_type(relative_path) staticmethod ¤

Given the relative path of a file with respect to 'trestle_root' return the oscal model type.

Parameters:

Name Type Description Default
relative_path Path

Relative path of the model with respect to the root directory of the trestle workspace.

required

Returns: The type of the OSCAL model for the provided path, and the alias of that OSCAL model.

Source code in trestle/common/model_utils.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
@staticmethod
def get_relative_model_type(relative_path: pathlib.Path) -> Tuple[Type[OscalBaseModel], str]:
    """
    Given the relative path of a file with respect to 'trestle_root' return the oscal model type.

    Args:
        relative_path: Relative path of the model with respect to the root directory of the trestle workspace.
    Returns:
        Type of Oscal Model for the provided model
        Alias of that oscal model.
    Raises:
        TrestleError: if the path has fewer than two parts or does not start with a model type directory.
    """
    path_parts = relative_path.parts
    if len(path_parts) < 2:
        raise TrestleError(
            'Insufficient path length to be a valid relative path w.r.t trestle workspace root directory.'
        )

    model_dir = path_parts[0]  # catalogs, profiles, etc
    if model_dir not in const.MODEL_DIR_LIST:
        raise TrestleError(f'No valid trestle model type directory (e.g. catalogs) found for {model_dir}.')

    # everything after the first two parts (model type directory and the part following it)
    inner_path = pathlib.Path(*path_parts[2:])

    current_type, current_alias = ModelUtils.get_root_model(const.MODEL_DIR_TO_MODEL_MODULE[model_dir])
    dotted_alias = current_alias

    for position, part in enumerate(inner_path.parts):
        part_alias = ModelUtils._extract_alias(part)
        # a first part that merely repeats the root alias adds nothing to the path
        if position == 0 and current_alias == part_alias:
            continue
        current_alias = part_alias
        dotted_alias = f'{dotted_alias}.{current_alias}'
        if utils.is_collection_field_type(current_type):
            # step from the collection to the type of its items
            current_type = utils.get_inner_type(current_type)
        else:
            # look the alias up as a field of the current model
            current_type = _get_model_type_from_union(current_type, part_alias)
            current_type = current_type.alias_to_field_map()[part_alias].outer_type_

    return current_type, dotted_alias
get_root_model(module_name) staticmethod ¤

Get the root model class and alias based on the module.

Source code in trestle/common/model_utils.py
617
618
619
620
621
622
623
624
625
626
627
628
@staticmethod
def get_root_model(module_name: str) -> Tuple[Type[Any], str]:
    """Get the root model class and alias based on the module."""
    try:
        module = importlib.import_module(module_name)
    except ModuleNotFoundError as e:
        raise err.TrestleError(str(e))

    if hasattr(module, 'Model'):
        model_metadata = next(iter(module.Model.__fields__.values()))
        return model_metadata.type_, model_metadata.alias
    raise err.TrestleError('Invalid module')
get_singular_alias(alias_path, relative_path=None) staticmethod ¤

Get the alias in the singular form from a jsonpath.

If relative_path is given, alias_path is treated as relative to the model found at that path (relative to the trestle root), and the alias of that model is prepended to it.

Parameters:

Name Type Description Default
alias_path str

The current alias element path as a string

required
relative_path Optional[Path]

Optional relative path (w.r.t. trestle_root) to cater for relative element paths.

None

Returns: Alias as a string

Source code in trestle/common/model_utils.py
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
@staticmethod
def get_singular_alias(alias_path: str, relative_path: Optional[pathlib.Path] = None) -> str:
    """
    Get the alias in the singular form from a jsonpath.

    If relative_path is given, alias_path is treated as relative to the model found at
    that path: the full alias of that model is prepended to alias_path (without
    repeating an alias the two have in common at the join).

    Args:
        alias_path: The current alias element path as a string
        relative_path: Optional relative path (w.r.t. trestle_root) to cater for relative element paths.
    Returns:
        Alias as a string
    Raises:
        TrestleError: if alias_path is blank, its root alias matches no known root model,
            or the singular type of the final collection cannot be resolved.
    """
    if len(alias_path.strip()) == 0:
        raise err.TrestleError(f'Invalid jsonpath {alias_path}')

    singular_alias: str = ''

    full_alias_path = alias_path
    if relative_path:
        logger.debug(f'get_singular_alias contextual mode: {relative_path}')
        _, full_model_alias = ModelUtils.get_relative_model_type(relative_path)
        first_alias_a = full_model_alias.split('.')[-1]
        first_alias_b = alias_path.split('.')[0]
        # avoid duplicating the alias where the model alias ends and alias_path begins
        if first_alias_a == first_alias_b:
            full_model_alias = '.'.join(full_model_alias.split('.')[:-1])
        full_alias_path = '.'.join([full_model_alias, alias_path]).strip('.')

    path_parts = full_alias_path.split(const.ALIAS_PATH_SEPARATOR)
    logger.debug(f'path parts: {path_parts}')

    # types visited while walking the path, starting with the root model type
    model_types = []

    root_model_alias = path_parts[0]
    for module_name in const.MODEL_TYPE_TO_MODEL_MODULE.values():
        model_type, model_alias = ModelUtils.get_root_model(module_name)
        if root_model_alias == model_alias:
            model_types.append(model_type)
            break
    else:
        raise err.TrestleError(f'{root_model_alias} is an invalid root model alias.')

    if len(path_parts) == 1:
        return root_model_alias

    model_type = model_types[0]
    last_index = len(path_parts) - 1
    # go through path parts skipping first one
    for i in range(1, len(path_parts)):
        if utils.is_collection_field_type(model_type):
            # if it is a collection type and last part is * then break
            if i == last_index and path_parts[i] == '*':
                break
            # otherwise this path part addresses the items, so move to their type
            model_type = utils.get_inner_type(model_type)
        else:
            path_part = path_parts[i]
            model_type = _get_model_type_from_union(model_type, path_part)
            field_map = model_type.alias_to_field_map()
            if path_part not in field_map:
                continue
            field = field_map[path_part]
            model_type = field.outer_type_
        model_types.append(model_type)

    last_alias = path_parts[-1]
    if last_alias == '*':
        last_alias = path_parts[-2]

    # the path ends on a non-collection model, so its own alias is already singular
    if not utils.is_collection_field_type(model_type):
        return last_alias

    parent_model_type = model_types[-2]
    try:
        parent_model_type = _get_model_type_from_union(parent_model_type, last_alias)
        field_map = parent_model_type.alias_to_field_map()
        field = field_map[last_alias]
        outer_type = field.outer_type_
        inner_type = utils.get_inner_type(outer_type)

        # Handle Union types - if inner_type is a Union, get the first non-None type
        origin = utils.get_origin(inner_type)
        if origin == Union or str(origin) == "<class 'types.UnionType'>":
            union_args = get_args(inner_type)
            # Find first non-None type in the union
            for arg in union_args:
                if arg is not type(None):
                    inner_type = arg
                    break

        inner_type_name = inner_type.__name__
        singular_alias = str_utils.classname_to_alias(inner_type_name, AliasMode.JSON)
    except Exception as e:
        # keep the underlying failure attached as the cause
        raise err.TrestleError(f'Error in json path {alias_path}: {e}') from e

    return singular_alias
get_stripped_model_type(absolute_path, absolute_trestle_root, aliases_not_to_be_stripped=None) staticmethod ¤

Get the stripped contextual model class and alias based on the contextual path.

This function relies on the directory structure of the trestle model being edited to determine, based on the existing files and folder, which fields should be stripped from the model type represented by the path passed in as a parameter.

Source code in trestle/common/model_utils.py
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
@staticmethod
def get_stripped_model_type(
    absolute_path: pathlib.Path, absolute_trestle_root: pathlib.Path, aliases_not_to_be_stripped: List[str] = None
) -> Tuple[Type[OscalBaseModel], str]:
    """
    Get the stripped contextual model class and alias based on the contextual path.

    This function relies on the directory structure of the trestle model being edited to determine, based on the
    existing files and folder, which fields should be stripped from the model type represented by the
    path passed in as a parameter.

    Args:
        absolute_path: Absolute path of the model file or directory; it must lie under absolute_trestle_root.
        absolute_trestle_root: Absolute path of the trestle workspace root.
        aliases_not_to_be_stripped: Aliases to keep even if a matching entry exists in the split
            directory.  None is treated as an empty list.
    Returns:
        The model type (stripped, wrapped in a __root__ model, or unchanged - see the body) and the
        full dotted alias reported by get_relative_model_type.
    """
    if aliases_not_to_be_stripped is None:
        aliases_not_to_be_stripped = []
    singular_model_type, model_alias = ModelUtils.get_relative_model_type(
        absolute_path.relative_to(absolute_trestle_root)
    )
    logger.debug(f'singular model type {singular_model_type} model alias {model_alias}')

    # Stripped models do not apply to collection types such as List[] and Dict{}
    # if model type is a list or dict, generate a new wrapping model for it
    if utils.is_collection_field_type(singular_model_type):
        malias = model_alias.split('.')[-1]
        class_name = alias_to_classname(malias, AliasMode.JSON)
        logger.debug(f'collection field type class name {class_name} and alias {malias}')
        model_type = create_model(class_name, __base__=OscalBaseModel, __root__=(singular_model_type, ...))
        logger.debug(f'model_type created: {model_type}')
        return model_type, model_alias

    malias = model_alias.split('.')[-1]
    logger.debug(f'not collection field type, malias: {malias}')

    # Check if this is a Union type FIRST, before stripping logic
    origin = get_origin(singular_model_type)
    is_union = origin is Union or (hasattr(types, 'UnionType') and origin is types.UnionType)

    # Locate the directory that would hold the split-out parts of this model:
    # a child named after the alias when absolute_path is a directory with a different alias,
    # otherwise the sibling directory named like absolute_path without its suffix
    if absolute_path.is_dir() and malias != ModelUtils._extract_alias(absolute_path.name):
        split_subdir = absolute_path / malias
    else:
        split_subdir = absolute_path.parent / absolute_path.with_suffix('').name

    # every non-hidden entry in the split directory names a field that lives outside the
    # main file, unless the caller asked for that alias to be kept
    aliases_to_be_stripped = set()
    if split_subdir.exists():
        for f in iterdir_without_hidden_files(split_subdir):
            alias = ModelUtils._extract_alias(f.name)
            if alias not in aliases_not_to_be_stripped:
                aliases_to_be_stripped.add(alias)

    logger.debug(f'aliases to be stripped: {aliases_to_be_stripped}')

    # For Union types, use subdirectories to SELECT variant, not strip fields
    if is_union and len(aliases_to_be_stripped) > 0:
        # Use the first subdirectory name as a hint for which Union variant to use
        # NOTE(review): aliases_to_be_stripped is a set, so which alias comes "first" is not ordered
        field_hint = next(iter(aliases_to_be_stripped))
        logger.debug(f'Union type: using field hint "{field_hint}" to select variant')
        singular_model_type = _get_model_type_from_union(singular_model_type, field_hint)
        # Now proceed with stripping for the selected variant
        model_type = singular_model_type.create_stripped_model_type(
            stripped_fields_aliases=list(aliases_to_be_stripped)
        )
        logger.debug(f'model_type: {model_type}')
        return model_type, model_alias
    elif len(aliases_to_be_stripped) > 0:
        # Non-Union type: normal stripping logic
        model_type = singular_model_type.create_stripped_model_type(
            stripped_fields_aliases=list(aliases_to_be_stripped)
        )
        logger.debug(f'model_type: {model_type}')
        return model_type, model_alias
    # From here on nothing is to be stripped (both branches above return).
    # Handle Union types even when no stripping is needed
    # singular_model_type has not been reassigned on this path, so this repeats the check made above
    origin = get_origin(singular_model_type)
    if origin is Union or (hasattr(types, 'UnionType') and origin is types.UnionType):
        # Check if there are subdirectories that indicate which variant to use
        # If absolute_path is a directory, look inside it; otherwise look in parent
        if absolute_path.is_dir():
            split_subdir = absolute_path
        else:
            split_subdir = absolute_path.parent / absolute_path.with_suffix('').name
        field_hint = None
        if split_subdir.exists() and split_subdir.is_dir():
            # Check what subdirectories exist to determine which Union variant
            # (unlike the scan above, this one uses iterdir() directly and takes the first directory it yields)
            for item in split_subdir.iterdir():
                if item.is_dir():
                    # Use the subdirectory name as a hint for which field exists
                    # controls -> catalog Group2, insert-controls -> profile Group2, groups -> Group1
                    field_hint = item.name
                    logger.debug(f'Using subdirectory {field_hint} to select Union variant')
                    break

        # If we have a field hint from subdirectories, resolve to specific variant
        # Otherwise, wrap the Union for file reading (smart validators will choose)
        if field_hint:
            singular_model_type = _get_model_type_from_union(singular_model_type, field_hint)
        else:
            # No subdirectory hint - wrap Union for reading
            # This allows smart validators to choose the correct variant at deserialization time
            malias = model_alias.split('.')[-1]
            class_name = alias_to_classname(malias, AliasMode.JSON)
            logger.debug(f'Wrapping Union type {singular_model_type} in __root__ model')
            model_type = create_model(class_name, __base__=OscalBaseModel, __root__=(singular_model_type, ...))
            return model_type, model_alias
    else:
        # NOTE(review): reached only for non-Union types; the helper is called without a hint here -
        # presumably it returns such types unchanged, confirm against _get_model_type_from_union
        singular_model_type = _get_model_type_from_union(singular_model_type)
    return singular_model_type, model_alias
get_title_from_model_uri(trestle_root, uri) staticmethod ¤

Get title from model at uri.

Source code in trestle/common/model_utils.py
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
@staticmethod
def get_title_from_model_uri(trestle_root: pathlib.Path, uri: str) -> str:
    """
    Get title from model at uri.

    A TrestleError raised while fetching or reading the model is logged as a warning
    and then re-raised unchanged.
    """
    try:
        oscal_model, _ = cache.FetcherFactory.get_fetcher(trestle_root, uri).get_oscal()
        title = oscal_model.metadata.title
    except TrestleError as exc:
        logger.warning(f'Error finding title for model at uri {uri}: {exc}')
        raise
    return title
has_no_duplicate_values_by_name(object_of_interest, name_of_interest) staticmethod ¤

Determine if duplicate values of type exist in object.

Source code in trestle/common/model_utils.py
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
@staticmethod
def has_no_duplicate_values_by_name(object_of_interest: BaseModel, name_of_interest: str) -> bool:
    """
    Determine if duplicate values of type exist in object.

    Returns True when every value found under name_of_interest is distinct.  Otherwise
    a warning is logged for each repeated value and False is returned.
    """
    values = ModelUtils.find_values_by_name(object_of_interest, name_of_interest)
    if len(set(values)) == len(values):
        return True

    # tally occurrences, keeping first-seen order for the warnings below
    occurrences: Dict[Any, int] = {}
    for value in values:
        if value in occurrences:
            occurrences[value] += 1
        else:
            occurrences[value] = 1

    repeated = ((value, count) for value, count in occurrences.items() if count > 1)
    for value, count in repeated:
        logger.warning(f'Duplicate detected of item {value} with {count} instances.')
    return False
last_modified_at_time(timestamp=None) staticmethod ¤

Generate a LastModified set to timestamp or now.

Source code in trestle/common/model_utils.py
820
821
822
823
824
@staticmethod
def last_modified_at_time(timestamp: Optional[datetime] = None) -> datetime:
    """Generate a LastModified set to timestamp or now."""
    timestamp = timestamp if timestamp else datetime.now().astimezone()
    return timestamp
load_distributed(abs_path, abs_trestle_root, collection_type=None) staticmethod ¤

Given path to a model, load the model.

If the model is decomposed/split/distributed, the decomposed models are loaded recursively.

Parameters:

Name Type Description Default
abs_path Path

The path to the file/directory to be loaded.

required
abs_trestle_root Path

The trestle workspace root directory.

required
collection_type Optional[Type[Any]]

The type of collection model, if it is a collection model. typing.List is the only collection type handled or expected. Defaults to None.

None

Returns:

Type Description
Type[OscalBaseModel]

Return a tuple of Model Type (e.g. class 'trestle.oscal.catalog.Catalog'),

str

Model Alias (e.g. 'catalog.metadata') and Instance of the Model.

Optional[Union[OscalBaseModel, List[OscalBaseModel], Dict[str, OscalBaseModel]]]

If the model is decomposed/split/distributed, the instance of the model contains the decomposed models loaded recursively.

Note

This does not validate the model. You must either validate the model separately or use the load_validate utilities.

Source code in trestle/common/model_utils.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
@staticmethod
def load_distributed(
    abs_path: Path, abs_trestle_root: Path, collection_type: Optional[Type[Any]] = None
) -> Tuple[
    Type[OscalBaseModel], str, Optional[Union[OscalBaseModel, List[OscalBaseModel], Dict[str, OscalBaseModel]]]
]:
    """
    Given path to a model, load the model.

    If the model is decomposed/split/distributed, the decomposed models are loaded recursively.

    Args:
        abs_path: The path to the file/directory to be loaded.
        abs_trestle_root: The trestle workspace root directory.
        collection_type: The type of collection model, if it is a collection model.
            typing.List is the only collection type handled or expected.
            Defaults to None.

    Returns:
        A tuple of three items:
        the Model Type (e.g. class 'trestle.oscal.catalog.Catalog'),
        the Model Alias (e.g. 'catalog.metadata'),
        and the Instance of the Model.
        If the model is decomposed/split/distributed, the instance of the model contains
            the decomposed models loaded recursively.

    Raises:
        TrestleNotFoundError: if neither abs_path nor the same path without its suffix exists.
        TrestleError: if collection_type is given and is not list.

    Note:
        This does not validate the model.  You must either validate the model separately or use the load_validate
        utilities.
    """
    # if trying to load file that does not exist, load path instead
    # (i.e. fall back to the same name without its suffix)
    if not abs_path.exists():
        abs_path = abs_path.with_name(abs_path.stem)

    if not abs_path.exists():
        raise TrestleNotFoundError(f'File {abs_path} not found for load.')

    if collection_type:
        # If the path contains a list type model
        if collection_type is list:
            return ModelUtils._load_list(abs_path, abs_trestle_root)
        # the only other collection type in OSCAL is dict, and it only applies to include_all,
        # which is too granular ever to be loaded by this routine
        else:
            raise TrestleError(f'Collection type {collection_type} not recognized for distributed load.')

    # Get current model
    primary_model_type, primary_model_alias = ModelUtils.get_stripped_model_type(abs_path, abs_trestle_root)
    primary_model_instance: Optional[Union[OscalBaseModel, List[OscalBaseModel], Dict[str, OscalBaseModel]]] = None

    # is this an attempt to load an actual json or yaml file?
    content_type = FileContentType.path_to_content_type(abs_path)
    # if file is sought but it doesn't exist, ignore and load as decomposed model
    if FileContentType.is_readable_file(content_type) and abs_path.exists():
        # Use the model type as-is (may be wrapped Union) for reading
        # The smart validators in generated models will choose the correct variant
        primary_model_instance = primary_model_type.oscal_read(abs_path)
        # If the instance has __root__, unwrap it to get the actual model
        if hasattr(primary_model_instance, '__root__'):
            root_val = primary_model_instance.__root__
            # Only unwrap if it's a single OscalBaseModel, not a list
            if isinstance(root_val, OscalBaseModel):
                primary_model_instance = root_val
    # Is model decomposed?
    # The decomposed parts are looked for in the directory named like abs_path without its suffix.
    # NOTE(review): stem drops everything after the last dot, so a directory path whose name
    # contains a dot would be shortened here - confirm such names cannot reach this point
    decomposed_dir = abs_path.with_name(abs_path.stem)

    if decomposed_dir.exists():
        # parallel lists: the alias of each loaded part and the corresponding loaded instance
        aliases_not_to_be_stripped = []
        instances_to_be_merged: List[OscalBaseModel] = []

        # sorted() makes the order in which parts are loaded and merged deterministic
        for local_path in sorted(trestle.common.file_utils.iterdir_without_hidden_files(decomposed_dir)):
            if local_path.is_file():
                model_type, model_alias, model_instance = ModelUtils.load_distributed(local_path, abs_trestle_root)
                aliases_not_to_be_stripped.append(model_alias.split('.')[-1])
                instances_to_be_merged.append(model_instance)

            elif local_path.is_dir():
                model_type, model_alias = ModelUtils.get_stripped_model_type(local_path, abs_trestle_root)
                # Only load the directory if it is a collection model. Otherwise do nothing - it gets loaded when
                # iterating over the model file

                # If a model is just a container for a list e.g.
                # class Foo(OscalBaseModel):  noqa: E800
                #      __root__: List[Bar]    noqa: E800
                # You need to test whether first a root key exists
                # then whether the outer_type of root is a collection.
                # Alternative is to do a try except to avoid the error for an unknown key.

                if model_type.is_collection_container():
                    # This directory is a decomposed List or Dict
                    collection_type = model_type.get_collection_type()
                    model_type, model_alias, model_instance = ModelUtils.load_distributed(
                        local_path, abs_trestle_root, collection_type
                    )
                    aliases_not_to_be_stripped.append(model_alias.split('.')[-1])
                    instances_to_be_merged.append(model_instance)
        primary_model_dict = {}
        if primary_model_instance is not None:
            # this is the instance's own attribute dict, not a copy: the assignments in the
            # merge loop below therefore also modify primary_model_instance
            primary_model_dict = primary_model_instance.__dict__

        # Recompute the type now that the aliases of the loaded parts are known, so those fields are kept
        merged_model_type, merged_model_alias = ModelUtils.get_stripped_model_type(
            abs_path, abs_trestle_root, aliases_not_to_be_stripped
        )

        # The following use of top_level is to allow loading of a top level model by name only, e.g. MyCatalog
        # There may be a better overall way to approach this.
        top_level = len(merged_model_alias.split('.')) == 1

        for i in range(len(aliases_not_to_be_stripped)):
            alias = aliases_not_to_be_stripped[i]
            instance = instances_to_be_merged[i]
            # unwrap __root__ container models so the contained value itself is merged
            if (
                hasattr(instance, '__dict__')
                and '__root__' in instance.__dict__
                and isinstance(instance, OscalBaseModel)
            ):
                instance = instance.__dict__['__root__']
            if top_level and not primary_model_dict:
                # top level model with nothing loaded so far: the part supplies the whole model's fields
                primary_model_dict = instance.__dict__
            else:
                primary_model_dict[alias] = instance

        # If merged_model_type is a wrapped Union (has __root__), we need to unwrap it
        # to get the actual model type for instantiation
        actual_model_type = merged_model_type
        if hasattr(merged_model_type, '__fields__') and '__root__' in merged_model_type.__fields__:
            # This is a wrapped Union model - extract the Union type from __root__
            root_field = merged_model_type.__fields__['__root__']
            root_type = root_field.outer_type_ if hasattr(root_field, 'outer_type_') else root_field.type_
            # Inspect primary_model_dict to determine which Union variant to use
            # Look for distinctive fields that indicate which variant
            # For Group1|Group2: 'groups' -> Group1, 'controls' -> Group2
            # For Parameter1|Parameter2: 'values' -> Parameter1, 'select' -> Parameter2
            field_hint = None
            distinctive_fields = ['controls', 'groups', 'values', 'select', 'insert-controls']
            # the first key of primary_model_dict (in dict order) that is distinctive wins
            for key in primary_model_dict.keys():
                if key in distinctive_fields:
                    field_hint = key
                    break
            # If no distinctive field found, use any field
            if field_hint is None and primary_model_dict:
                field_hint = next(iter(primary_model_dict.keys()))
            # Resolve the Union to get an actual model type based on the data
            actual_model_type = _get_model_type_from_union(root_type, field_hint)

        merged_model_instance = actual_model_type(**primary_model_dict)
        # the returned type is merged_model_type (possibly the __root__ wrapper), while the
        # instance was built from actual_model_type
        return merged_model_type, merged_model_alias, merged_model_instance
    return primary_model_type, primary_model_alias, primary_model_instance
load_model_for_class(trestle_root, model_name, model_class, file_content_type=None) staticmethod ¤

Load a model by name and model class and infer file content type if not specified.

If you need to load an existing model but its content type may not be known, use this method. But the file content type should be specified if it is somehow known.

Note

This does not validate the model. If you want to validate the model use the load_validate utilities.

Source code in trestle/common/model_utils.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
@staticmethod
def load_model_for_class(
    trestle_root: pathlib.Path,
    model_name: str,
    model_class: TG,
    file_content_type: Optional[FileContentType] = None,
) -> Tuple[TG, pathlib.Path]:
    """Load a model by name and model class and infer file content type if not specified.

    Use this when an existing model must be loaded but its content type may be unknown.
    If the content type is known it should be passed in.

    Returns:
        The loaded model and the full path, including file extension, it was loaded from.

    Raises:
        TrestleError: if the content type, given or inferred, is not a readable file type.

    Note:
        This does not validate the model.  If you want to validate the model use the load_validate utilities.
    """
    model_root = ModelUtils._root_path_for_top_level_model(trestle_root, model_name, model_class)  # type: ignore

    content_type = file_content_type
    if content_type is None:
        content_type = FileContentType.path_to_content_type(model_root)
    if not FileContentType.is_readable_file(content_type):
        raise TrestleError(f'Unable to load model {model_name} without specifying json or yaml.')

    extension = FileContentType.to_file_extension(content_type)
    full_model_path = model_root.with_suffix(extension)
    # only the instance is needed; the reported type and alias are discarded
    _, _, loaded_model = ModelUtils.load_distributed(full_model_path, trestle_root)
    return loaded_model, full_model_path  # type: ignore
load_model_for_type(trestle_root, model_type, model_name) staticmethod ¤

Load model for the given type and name.

Source code in trestle/common/model_utils.py
272
273
274
275
276
277
278
279
280
281
282
283
284
285
@staticmethod
def load_model_for_type(
    trestle_root: pathlib.Path, model_type: str, model_name: str
) -> Tuple[TopLevelOscalModel, pathlib.Path]:
    """Load the model of the given type and name from the trestle workspace.

    Args:
        trestle_root: Root directory of the trestle workspace
        model_type: Model type, converted to its directory name via model_type_to_model_dir
        model_name: Name of the model within that directory

    Returns:
        The loaded model and the path it was loaded from

    Raises:
        TrestleError: If nothing exists at the resolved model path
    """
    model_path = trestle_root / ModelUtils.model_type_to_model_dir(model_type) / model_name

    if model_path.exists():
        _model_type, _model_alias, oscal_object = ModelUtils.load_distributed(model_path, trestle_root)
        return oscal_object, model_path  # type: ignore

    raise TrestleError(f'No model is found at path: {model_path}.')
model_age(model) staticmethod ¤

Find time in seconds since LastModified timestamp.

Source code in trestle/common/model_utils.py
832
833
834
835
836
837
838
839
840
@staticmethod
def model_age(model: TopLevelOscalModel) -> int:
    """Return the number of whole seconds elapsed since the model's last_modified timestamp.

    Args:
        model: Top level model whose metadata.last_modified is inspected

    Returns:
        Seconds since last_modified, or 365 days worth of seconds if last_modified is not set
    """
    last_modified = model.metadata.last_modified
    if not last_modified:
        # no timestamp to measure from, so fall back to an age of one year
        return const.DAY_SECONDS * 365
    elapsed = datetime.now().astimezone() - last_modified
    return int(elapsed.total_seconds())
model_type_to_model_dir(model_type) staticmethod ¤

Get plural model directory from model type.

Source code in trestle/common/model_utils.py
445
446
447
448
449
450
@staticmethod
def model_type_to_model_dir(model_type: str) -> str:
    """Return the plural model directory name that corresponds to a model type.

    Args:
        model_type: Model type that must be listed in const.MODEL_TYPE_LIST

    Returns:
        The directory name mapped to the model type in const.MODEL_TYPE_TO_MODEL_DIR

    Raises:
        TrestleError: If the model type is not in const.MODEL_TYPE_LIST
    """
    if model_type in const.MODEL_TYPE_LIST:
        return const.MODEL_TYPE_TO_MODEL_DIR[model_type]
    raise err.TrestleError(f'Not a valid model type: {model_type}.')
models_are_equivalent(model_a, model_b, ignore_all_uuid=False) staticmethod ¤

Test if models are equivalent except for last modified and possibly uuid.

If a model has had uuids regenerated, then all uuids and references to them are updated. This means that special handling is required if a model has had uuids regenerated - when checking equivalence.

Source code in trestle/common/model_utils.py
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
@staticmethod
def models_are_equivalent(
    model_a: Optional[TopLevelOscalModel], model_b: Optional[TopLevelOscalModel], ignore_all_uuid: bool = False
) -> bool:
    """
    Check whether two models are equivalent, disregarding last modified and optionally uuids.

    When uuids are regenerated in a model, every uuid *and every reference to one* changes.  A model that has
    had its uuids regenerated therefore needs special handling when it is compared for equivalence.

    Args:
        model_a: First model to compare
        model_b: Second model to compare
        ignore_all_uuid: Whether uuid-bearing types should also be excluded from the comparison

    Returns:
        True if _objects_differ reports no difference between the models, otherwise False
    """
    last_modified_only = [common.LastModified]
    # LastModified stays first so the list order matches what is handed to _objects_differ in both modes
    last_modified_and_uuid_types = last_modified_only + [
        common.LocationUuid,
        common.PartyUuid,
        assessment_plan.RelatedObservation,
        assessment_results.RelatedObservation,
        poam.RelatedObservation,
        poam.RelatedObservation1,
    ]
    ignored_types = last_modified_and_uuid_types if ignore_all_uuid else last_modified_only
    models_differ = ModelUtils._objects_differ(model_a, model_b, ignored_types, ['last_modified'], ignore_all_uuid)
    return not models_differ
parameter_to_dict(obj, partial) staticmethod ¤

Convert obj to dict containing only string values, storing only the fields that have values set.

Parameters:

Name Type Description Default
obj Union[OscalBaseModel, str]

The parameter or its constituent parts in recursive calls

required
partial bool

Whether to convert the entire param or just the parts needed for markdown header

required

Returns:

Type Description
Union[str, Dict[str, Any]]

The converted parameter as dictionary, with values as None if not present for Parameter1

Source code in trestle/common/model_utils.py
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
@staticmethod
def parameter_to_dict(obj: Union[OscalBaseModel, str], partial: bool) -> Union[str, Dict[str, Any]]:
    """
    Convert obj to dict containing only string values, storing only the fields that have values set.

    Args:
        obj: The parameter or its constituent parts in recursive calls
        partial: Whether to convert the entire param or just the parts needed for markdown header

    Returns:
        The converted parameter as dictionary, with values as None if not present for Parameter1.
        If the conversion does not produce a dict (the declared return type also allows a plain str),
        the result is returned unchanged.
    """
    res = ModelUtils._parameter_to_dict_recurse(obj, partial)
    # Only a dict can take the 'values' placeholder.  On a str, `in` would be a substring test and
    # the item assignment below would raise TypeError, so pass any non-dict result straight through.
    if not isinstance(res, dict):
        return res
    # Only add values: None for Parameter1 (which doesn't have select)
    # Parameter2 has select and should not have values field
    if 'values' not in res and 'select' not in res:
        res['values'] = None
    return res
regenerate_uuids(object_of_interest) staticmethod ¤

Regenerate all uuids in object and update corresponding references.

Find all dicts with key == 'uuid' and replace the value with a new uuid4. Build a corresponding lookup table as you go, of old:new uuid values. Then make a second pass through the object and replace all string values present in the lookup table with the new value.

Parameters:

Name Type Description Default
object_of_interest Any

pydantic.BaseModel, list, dict or str will be updated

required

Returns:

Type Description
Any

The updated object with new uuid's and refs

Dict[str, str]

The final lookup table of old:new uuid's

int

A count of the number of refs that were updated

Source code in trestle/common/model_utils.py
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
@staticmethod
def regenerate_uuids(object_of_interest: Any) -> Tuple[Any, Dict[str, str], int]:
    """Give the object fresh uuids and repoint every reference to the old ones.

    This runs in two passes.  The first pass replaces each value stored under
    a 'uuid' key with a new uuid4 and records the old:new pairs in a lookup table.
    The second pass replaces every string value found in that lookup table
    with its new value.

    Args:
        object_of_interest: pydantic.BaseModel, list, dict or str will be updated

    Returns:
        The updated object with new uuid's and refs
        The final lookup table of old:new uuid's
        A count of the number of refs that were updated
    """
    # first pass: start from an empty lookup table and let the helper fill it
    regenerated, uuid_lut = ModelUtils._regenerate_uuids_in_place(object_of_interest, {})
    # second pass: rewrite references using the completed lookup table
    updated, n_refs_updated = ModelUtils._update_new_uuid_refs(regenerated, uuid_lut)
    return updated, uuid_lut, n_refs_updated
save_top_level_model(model, trestle_root, model_name, file_content_type) staticmethod ¤

Save a model by name and infer model type by inspection.

You don't need to specify the model type (catalog, profile, etc.) but you must specify the file content type. If the model directory does not exist, it is created.

Source code in trestle/common/model_utils.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
@staticmethod
def save_top_level_model(
    model: TopLevelOscalModel, trestle_root: pathlib.Path, model_name: str, file_content_type: FileContentType
) -> None:
    """Write a top level model to the workspace under the given name.

    The model type (catalog, profile, etc.) is not passed in; the destination is resolved from the model itself.
    The file content type is required, and the model directory is created if it does not exist.

    Args:
        model: Top level model to write
        trestle_root: Root directory of the trestle workspace
        model_name: Name under which the model is saved
        file_content_type: Content type that determines the suffix of the written file
    """
    base_path = ModelUtils._root_path_for_top_level_model(trestle_root, model_name, model)
    extension = FileContentType.to_file_extension(file_content_type)
    target_path = base_path.with_suffix(extension)
    model_dir = target_path.parent
    if not model_dir.exists():
        model_dir.mkdir(parents=True, exist_ok=True)
    model.oscal_write(target_path)
update_last_modified(model, timestamp=None) staticmethod ¤

Update the LastModified timestamp in the top level model to the given timestamp, or to now if none is given.

Source code in trestle/common/model_utils.py
826
827
828
829
830
@staticmethod
def update_last_modified(model: TopLevelOscalModel, timestamp: Optional[datetime] = None) -> None:
    """Set the LastModified timestamp in the top level model.

    Args:
        model: Top level model whose metadata.last_modified is overwritten
        timestamp: Timestamp to store; if not provided, the current local time with timezone is used
    """
    model.metadata.last_modified = timestamp or datetime.now().astimezone()

Functions¤

handler: python