Skip to content

trestle.tasks.csv_to_oscal_mc_utilities

trestle.tasks.csv_to_oscal_mc_utilities ¤

OSCAL utilities.

Attributes¤

logger = logging.getLogger(__name__) module-attribute ¤

Classes¤

HrefManager ¤

Href manager.

Source code in trestle/tasks/csv_to_oscal_mc_utilities.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
class HrefManager:
    """Href manager."""

    def __init__(self, root: str = '.') -> None:
        """Initialize."""
        self._root = root
        self._map_href_ci = {}
        self._map_href_type = {}

    def add(self, href: str) -> None:
        """Add."""
        if href not in self._map_href_ci.keys():
            if self.add_catalog(href):
                return
            if self.add_resolved_profile(href):
                return
            raise RuntimeError(f'Error loading href: {href}')

    def add_catalog(self, href: str) -> bool:
        """Add catalog."""
        try:
            catalog_path = Path(href)
            catalog = Catalog.oscal_read(catalog_path)
            catalog_interface = CatalogInterface(catalog)
            self._map_href_ci[href] = catalog_interface
            self._map_href_type[href] = 'catalog'
            logger.info(f'add catalog href: {href}')
            rval = True
        except Exception as e:
            logger.info(f'{e}')
            rval = False
        return rval

    def add_resolved_profile(self, href: str) -> bool:
        """Add resolved profile."""
        try:
            catalog, i_props = ProfileResolver().get_resolved_profile_catalog_and_inherited_props(
                Path(self._root), href
            )
            catalog_interface = CatalogInterface(catalog)
            self._map_href_ci[href] = catalog_interface
            self._map_href_type[href] = 'profile'
            logger.info(f'add resolved profile href: {href}')
            rval = True
        except Exception as e:
            logger.info(f'{e}')
            rval = False
        return rval

    def _get_subpart(self, part: Part, id_name: str) -> Optional[Part]:
        """Get subpart."""
        rval = None
        if part.parts:
            for subpart in part.parts:
                if 'smt' not in subpart.id:
                    continue
                if subpart.id == id_name:
                    rval = subpart
                    break
        if rval:
            logger.debug(f'id_name: {id_name} part.id: {rval.id}')
        return rval

    def _get_part(self, control: Control, id_name: str) -> Optional[Part]:
        """Get part."""
        rval = None
        if control.parts:
            for part in control.parts:
                if 'smt' not in part.id:
                    continue
                if part.id == id_name:
                    rval = part
                    break
                subpart = self._get_subpart(part, id_name)
                if subpart is not None:
                    rval = subpart
                    break
        if rval:
            logger.debug(f'id_name: {id_name} part.id: {rval.id}')
        return rval

    def get_type(self, href: str) -> Optional[str]:
        """Get type."""
        return self._map_href_type[href]

    def get_id(self, href: str, id_name: str) -> Optional[str]:
        """Get id."""
        rval = None
        try:
            ctl_name = id_name.split('_smt')[0]
            catalog_interface = self._map_href_ci[href]
            control = catalog_interface.get_control(ctl_name)
            # insure control exists
            if control is None:
                text = f'control {ctl_name} not found in {href}'
                raise RuntimeError(text)
            if id_name == ctl_name:
                rval = control.id
            else:
                part = self._get_part(control, id_name)
                if part:
                    rval = part.id
        except Exception as e:
            logger.debug(f'{e}')
        return rval

    def get_id_list(self, href: str, id_name_list: List[str]) -> List[str]:
        """Get id list."""
        id_list = []
        for id_name in id_name_list:
            id_ = self.get_id(href, id_name)
            if id_:
                id_list.append(id_)
        return id_list
Functions¤
__init__(root='.') ¤

Initialize.

Source code in trestle/tasks/csv_to_oscal_mc_utilities.py
32
33
34
35
36
def __init__(self, root: str = '.') -> None:
    """Initialize."""
    self._root = root
    self._map_href_ci = {}
    self._map_href_type = {}
add(href) ¤

Add.

Source code in trestle/tasks/csv_to_oscal_mc_utilities.py
38
39
40
41
42
43
44
45
def add(self, href: str) -> None:
    """Add."""
    if href not in self._map_href_ci.keys():
        if self.add_catalog(href):
            return
        if self.add_resolved_profile(href):
            return
        raise RuntimeError(f'Error loading href: {href}')
add_catalog(href) ¤

Add catalog.

Source code in trestle/tasks/csv_to_oscal_mc_utilities.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def add_catalog(self, href: str) -> bool:
    """Add catalog."""
    try:
        catalog_path = Path(href)
        catalog = Catalog.oscal_read(catalog_path)
        catalog_interface = CatalogInterface(catalog)
        self._map_href_ci[href] = catalog_interface
        self._map_href_type[href] = 'catalog'
        logger.info(f'add catalog href: {href}')
        rval = True
    except Exception as e:
        logger.info(f'{e}')
        rval = False
    return rval
add_resolved_profile(href) ¤

Add resolved profile.

Source code in trestle/tasks/csv_to_oscal_mc_utilities.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def add_resolved_profile(self, href: str) -> bool:
    """Add resolved profile."""
    try:
        catalog, i_props = ProfileResolver().get_resolved_profile_catalog_and_inherited_props(
            Path(self._root), href
        )
        catalog_interface = CatalogInterface(catalog)
        self._map_href_ci[href] = catalog_interface
        self._map_href_type[href] = 'profile'
        logger.info(f'add resolved profile href: {href}')
        rval = True
    except Exception as e:
        logger.info(f'{e}')
        rval = False
    return rval
get_id(href, id_name) ¤

Get id.

Source code in trestle/tasks/csv_to_oscal_mc_utilities.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def get_id(self, href: str, id_name: str) -> Optional[str]:
    """Get id."""
    rval = None
    try:
        ctl_name = id_name.split('_smt')[0]
        catalog_interface = self._map_href_ci[href]
        control = catalog_interface.get_control(ctl_name)
        # insure control exists
        if control is None:
            text = f'control {ctl_name} not found in {href}'
            raise RuntimeError(text)
        if id_name == ctl_name:
            rval = control.id
        else:
            part = self._get_part(control, id_name)
            if part:
                rval = part.id
    except Exception as e:
        logger.debug(f'{e}')
    return rval
get_id_list(href, id_name_list) ¤

Get id list.

Source code in trestle/tasks/csv_to_oscal_mc_utilities.py
135
136
137
138
139
140
141
142
def get_id_list(self, href: str, id_name_list: List[str]) -> List[str]:
    """Get id list."""
    id_list = []
    for id_name in id_name_list:
        id_ = self.get_id(href, id_name)
        if id_:
            id_list.append(id_)
    return id_list
get_type(href) ¤

Get type.

Source code in trestle/tasks/csv_to_oscal_mc_utilities.py
110
111
112
def get_type(self, href: str) -> Optional[str]:
    """Get type."""
    return self._map_href_type[href]

handler: python