Bases: CommandPlusDocs
flowchart TD
trestle.core.commands.replicate.ReplicateCmd[ReplicateCmd]
trestle.core.commands.command_docs.CommandPlusDocs[CommandPlusDocs]
trestle.core.commands.command_docs.CommandBase[CommandBase]
trestle.core.commands.command_docs.CommandPlusDocs --> trestle.core.commands.replicate.ReplicateCmd
trestle.core.commands.command_docs.CommandBase --> trestle.core.commands.command_docs.CommandPlusDocs
click trestle.core.commands.replicate.ReplicateCmd href "" "trestle.core.commands.replicate.ReplicateCmd"
click trestle.core.commands.command_docs.CommandPlusDocs href "" "trestle.core.commands.command_docs.CommandPlusDocs"
click trestle.core.commands.command_docs.CommandBase href "" "trestle.core.commands.command_docs.CommandBase"
Replicate a top level model within the trestle directory structure.
Source code in trestle/core/commands/replicate.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112 | class ReplicateCmd(CommandPlusDocs):
"""Replicate a top level model within the trestle directory structure."""
name = 'replicate'
def _init_arguments(self) -> None:
logger.debug('Init arguments')
self.add_argument('model', help='Choose OSCAL model', choices=const.MODEL_TYPE_LIST)
self.add_argument('-n', '--name', help='Name of model to replicate.', type=str, required=True)
self.add_argument('-o', '--output', help='Name of replicated model.', type=str, required=True)
self.add_argument('-r', '--regenerate', action='store_true', help=const.HELP_REGENERATE)
def _run(self, args: argparse.Namespace) -> int:
"""Execute and process the args."""
try:
log.set_log_level_from_args(args)
return self.replicate_object(args.model, args)
except Exception as e: # pragma: no cover
return handle_generic_command_exception(e, logger, 'Error while replicating model')
@classmethod
def replicate_object(cls, model_alias: str, args: argparse.Namespace) -> int:
"""
Core replicate routine invoked by subcommands.
Args:
model_alias: Name of the top level model in the trestle directory.
args: CLI arguments
Returns:
A return code that can be used as standard posix codes. 0 is success.
"""
logger.debug('Entering replicate_object.')
# 1 Bad working directory if not running from current working directory
trestle_root = args.trestle_root # trestle root is set via command line in args. Default is cwd.
if not trestle_root or not file_utils.is_valid_project_root(trestle_root):
raise TrestleError(f'Given directory: {trestle_root} is not a trestle project.')
plural_path = ModelUtils.model_type_to_model_dir(model_alias)
# 2 Check that input file given exists.
input_file_stem = trestle_root / plural_path / args.name / model_alias
content_type = FileContentType.path_to_content_type(input_file_stem)
if content_type == FileContentType.UNKNOWN:
raise TrestleError(
f'Input file {args.name} has no json or yaml file at expected location {input_file_stem}.'
)
input_file = input_file_stem.with_suffix(FileContentType.to_file_extension(content_type))
# 3 Distributed load from file
_, model_alias, model_instance = ModelUtils.load_distributed(input_file, trestle_root)
rep_model_path = trestle_root / plural_path / args.output / (
model_alias + FileContentType.to_file_extension(content_type)
)
if rep_model_path.exists():
raise TrestleError(f'OSCAL file to be replicated here: {rep_model_path} exists.')
if args.regenerate:
logger.debug(f'regenerating uuids for model {input_file}')
model_instance, uuid_lut, n_refs_updated = ModelUtils.regenerate_uuids(model_instance)
logger.debug(f'{len(uuid_lut)} uuids generated and {n_refs_updated} references updated')
# 4 Prepare actions and plan
top_element = Element(model_instance)
create_action = CreatePathAction(rep_model_path, True)
write_action = WriteFileAction(rep_model_path, top_element, content_type)
# create a plan to create the directory and imported file.
replicate_plan = Plan()
replicate_plan.add_action(create_action)
replicate_plan.add_action(write_action)
replicate_plan.execute()
return CmdReturnCodes.SUCCESS.value
|
Attributes
name = 'replicate' class-attribute instance-attribute
Functions
replicate_object(model_alias, args) classmethod
Core replicate routine invoked by subcommands.
Parameters:
| Name | Type | Description | Default |
model_alias | str | Name of the top level model in the trestle directory. | required |
args | Namespace | | required |
Returns: A return code that can be used as standard posix codes. 0 is success.
Source code in trestle/core/commands/replicate.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112 | @classmethod
def replicate_object(cls, model_alias: str, args: argparse.Namespace) -> int:
"""
Core replicate routine invoked by subcommands.
Args:
model_alias: Name of the top level model in the trestle directory.
args: CLI arguments
Returns:
A return code that can be used as standard posix codes. 0 is success.
"""
logger.debug('Entering replicate_object.')
# 1 Bad working directory if not running from current working directory
trestle_root = args.trestle_root # trestle root is set via command line in args. Default is cwd.
if not trestle_root or not file_utils.is_valid_project_root(trestle_root):
raise TrestleError(f'Given directory: {trestle_root} is not a trestle project.')
plural_path = ModelUtils.model_type_to_model_dir(model_alias)
# 2 Check that input file given exists.
input_file_stem = trestle_root / plural_path / args.name / model_alias
content_type = FileContentType.path_to_content_type(input_file_stem)
if content_type == FileContentType.UNKNOWN:
raise TrestleError(
f'Input file {args.name} has no json or yaml file at expected location {input_file_stem}.'
)
input_file = input_file_stem.with_suffix(FileContentType.to_file_extension(content_type))
# 3 Distributed load from file
_, model_alias, model_instance = ModelUtils.load_distributed(input_file, trestle_root)
rep_model_path = trestle_root / plural_path / args.output / (
model_alias + FileContentType.to_file_extension(content_type)
)
if rep_model_path.exists():
raise TrestleError(f'OSCAL file to be replicated here: {rep_model_path} exists.')
if args.regenerate:
logger.debug(f'regenerating uuids for model {input_file}')
model_instance, uuid_lut, n_refs_updated = ModelUtils.regenerate_uuids(model_instance)
logger.debug(f'{len(uuid_lut)} uuids generated and {n_refs_updated} references updated')
# 4 Prepare actions and plan
top_element = Element(model_instance)
create_action = CreatePathAction(rep_model_path, True)
write_action = WriteFileAction(rep_model_path, top_element, content_type)
# create a plan to create the directory and imported file.
replicate_plan = Plan()
replicate_plan.add_action(create_action)
replicate_plan.add_action(write_action)
replicate_plan.execute()
return CmdReturnCodes.SUCCESS.value
|