Skip to content

Commit

Permalink
Security improvements (#17)
Browse files Browse the repository at this point in the history
* Fixed is_url

* Secure `check_against_profile` function

* Renamed `helpers.data->dict`

* Renamed to `is_url_path`

* Implemented `assert_safe_path`

* Ensure `resource.dereference` is secure
  • Loading branch information
roll committed Apr 25, 2024
1 parent f00e814 commit 13ee96d
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 39 deletions.
8 changes: 4 additions & 4 deletions dplib/actions/metadata/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@

from ... import settings, types
from ...errors.metadata import MetadataError
from ...helpers.data import read_data
from ...helpers.profile import check_profile
from ...helpers.dict import read_dict
from ...helpers.profile import check_against_profile


def check_metadata(
metadata: Union[str, types.IDict], *, type: types.IMetadataType
) -> List[MetadataError]:
if isinstance(metadata, str):
metadata = read_data(metadata)
metadata = read_dict(metadata)

# Get default profile
if type == "dialect":
Expand All @@ -28,6 +28,6 @@ def check_metadata(

# Validate metadata
profile = metadata.get("$schema", default_profile)
errors = check_profile(metadata=metadata, profile=profile)
errors = check_against_profile(metadata=metadata, profile=profile)

return errors
9 changes: 5 additions & 4 deletions dplib/actions/package/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

from ... import types
from ...errors.metadata import MetadataError
from ...helpers.data import read_data
from ...helpers.path import infer_basepath
from ...helpers.dict import read_dict
from ...helpers.path import assert_safe_path, infer_basepath
from ...models import Package
from ..metadata.check import check_metadata

Expand All @@ -25,7 +25,7 @@ def check_package(package: Union[str, types.IDict, Package]) -> List[MetadataErr
basepath = None
if isinstance(package, str):
basepath = infer_basepath(package)
package = read_data(package)
package = read_dict(package)
if isinstance(package, Package):
basepath = package.basepath
package = package.to_dict()
Expand All @@ -38,7 +38,8 @@ def check_package(package: Union[str, types.IDict, Package]) -> List[MetadataErr
for type in ["dialect", "schema"]:
value = resource.get(type) # type: ignore
if isinstance(value, str):
metadata = read_data(value, basepath=basepath)
assert_safe_path(value, basepath=basepath)
metadata = read_dict(value, basepath=basepath)
errors.extend(check_metadata(metadata, type=type)) # type: ignore

return errors
9 changes: 5 additions & 4 deletions dplib/actions/resource/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

from ... import types
from ...errors.metadata import MetadataError
from ...helpers.data import read_data
from ...helpers.path import infer_basepath
from ...helpers.dict import read_dict
from ...helpers.path import assert_safe_path, infer_basepath
from ...models import Resource
from ..metadata.check import check_metadata

Expand All @@ -25,7 +25,7 @@ def check_resource(resource: Union[str, types.IDict, Resource]) -> List[Metadata
basepath = None
if isinstance(resource, str):
basepath = infer_basepath(resource)
resource = read_data(resource)
resource = read_dict(resource)
if isinstance(resource, Resource):
basepath = resource.basepath
resource = resource.to_dict()
Expand All @@ -35,7 +35,8 @@ def check_resource(resource: Union[str, types.IDict, Resource]) -> List[Metadata
for type in ["dialect", "schema"]:
value = resource.get(type)
if isinstance(value, str):
metadata = read_data(value, basepath=basepath)
assert_safe_path(value, basepath=basepath)
metadata = read_dict(value, basepath=basepath)
errors.extend(check_metadata(metadata, type=type)) # type: ignore

return errors
18 changes: 9 additions & 9 deletions dplib/helpers/data.py → dplib/helpers/dict.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,24 @@
from .path import infer_format


def read_data(
def read_dict(
path: str, *, format: Optional[str] = None, basepath: Optional[str] = None
) -> types.IDict:
if not format:
format = infer_format(path, raise_missing=True)
text = read_file(path, basepath=basepath)
data = load_data(text, format=format)
data = load_dict(text, format=format)
return data


def write_data(path: str, data: types.IDict, *, format: Optional[str] = None):
def write_dict(path: str, data: types.IDict, *, format: Optional[str] = None):
if not format:
format = infer_format(path, raise_missing=True)
text = dump_data(data, format=format)
text = dump_dict(data, format=format)
write_file(path, text)


def load_data(text: str, *, format: str) -> types.IDict:
def load_dict(text: str, *, format: str) -> types.IDict:
try:
if format == "json":
return json.loads(text)
Expand All @@ -39,7 +39,7 @@ def load_data(text: str, *, format: str) -> types.IDict:
raise Error(f"Cannot load data from text with format: {format}")


def dump_data(data: types.IDict, *, format: str) -> str:
def dump_dict(data: types.IDict, *, format: str) -> str:
try:
if format == "json":
return json.dumps(data, indent=2)
Expand All @@ -51,13 +51,13 @@ def dump_data(data: types.IDict, *, format: str) -> str:
raise Error(f"Cannot dump data to text with format: {format}")


def clean_data(data: types.IDict):
def clean_dict(data: types.IDict):
for key, value in list(data.items()):
if isinstance(value, dict):
clean_data(value) # type: ignore
clean_dict(value) # type: ignore
elif isinstance(value, list):
for item in value: # type: ignore
if isinstance(item, dict):
clean_data(item) # type: ignore
clean_dict(item) # type: ignore
if value is None or value == [] or value == {}:
data.pop(key)
25 changes: 15 additions & 10 deletions dplib/helpers/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def infer_format(path: str, *, raise_missing: bool = False):

def infer_basepath(path: str):
basepath = os.path.dirname(path)
if basepath and not is_remote_path(basepath):
if basepath and not is_url_path(basepath):
if not os.path.abspath(basepath):
basepath = os.path.relpath(basepath, start=os.getcwd())
return basepath
Expand All @@ -38,18 +38,23 @@ def ensure_basepath(path: str, basepath: Optional[str] = None) -> Tuple[str, str
def join_basepath(path: str, basepath: Optional[str] = None) -> str:
if not basepath:
return path
if is_remote_path(path):
if is_url_path(path):
return path
if is_remote_path(basepath):
if is_url_path(basepath):
return f"{basepath}/{path}"
return os.path.join(basepath, path)


def is_remote_path(path: str) -> bool:
path = path[0] if path and isinstance(path, list) else path
def is_url_path(path: str) -> bool:
scheme = urlparse(path).scheme
if not scheme:
return False
if path.lower().startswith(scheme + ":\\"):
return False
return True
return scheme in ["http", "https"]


def assert_safe_path(path: str, *, basepath: Optional[str] = None):
"""Assert that the path (untrusted) is not outside the basepath (trusted)"""
try:
root = Path(basepath or os.getcwd()).resolve()
item = root.joinpath(path).resolve()
item.relative_to(root)
except Exception:
raise Error(f"Path is not safe: {path}")
20 changes: 16 additions & 4 deletions dplib/helpers/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@
from .. import settings, types
from ..error import Error
from ..errors.metadata import MetadataError
from .data import load_data
from .dict import load_dict
from .file import read_file
from .path import is_url_path

# TODO: implement additional user-side profile caching


def check_profile(*, metadata: types.IDict, profile: str) -> List[MetadataError]:
def check_against_profile(*, metadata: types.IDict, profile: str) -> List[MetadataError]:
# Prepare validator
jsonSchema = read_profile(profile=profile)
Validator = validator_for(jsonSchema) # type: ignore
Expand All @@ -38,12 +39,23 @@ def read_profile(*, profile: str) -> types.IDict:
version, filename = parts
profile = os.path.join(settings.PROFILE_BASEDIR, version, filename)

# Ensure profile is URL
if not is_url_path(profile):
raise Error(f'Profile MUST be a URL: "{profile}"')

# Read jsonSchema
try:
text = read_file(profile)
data = load_data(text, format="json")
data = load_dict(text, format="json")
except Exception:
raise Error(f'Profile MUST be resolvable: "{profile}"')

# Validate jsonSchema
try:
Validator = validator_for(data) # type: ignore
Validator.check_schema(data) # type: ignore
except Exception:
raise Error(f'Cannot read profile: "{profile}"')
raise Error(f'Profile MUST resolve to a valid JSON Schema: "{profile}"')

return data

Expand Down
3 changes: 3 additions & 0 deletions dplib/models/resource/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from ... import settings, types
from ...helpers.file import join_basepath
from ...helpers.path import assert_safe_path
from ...system import Model
from ..contributor import Contributor
from ..dialect import Dialect
Expand Down Expand Up @@ -175,8 +176,10 @@ def dereference(self):
It will dereference all the resource's dialects and schemas
"""
if isinstance(self.dialect, str):
assert_safe_path(self.dialect, basepath=self.basepath)
self.dialect = Dialect.from_path(self.dialect, basepath=self.basepath)
if isinstance(self.schema, str):
assert_safe_path(self.schema, basepath=self.basepath)
self.schema = Schema.from_path(self.schema, basepath=self.basepath) # type: ignore

# Converters
Expand Down
8 changes: 4 additions & 4 deletions dplib/system/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from .. import types
from ..error import Error
from ..helpers.data import clean_data, dump_data, load_data
from ..helpers.dict import clean_dict, dump_dict, load_dict
from ..helpers.file import read_file, write_file
from ..helpers.path import ensure_basepath, infer_format

Expand Down Expand Up @@ -48,19 +48,19 @@ def from_path(

def to_text(self, *, format: str) -> str:
data = self.to_dict()
text = dump_data(data, format=format)
text = dump_dict(data, format=format)
return text

@classmethod
def from_text(cls, text: str, *, format: str, basepath: Optional[str] = None) -> Self:
data = load_data(text, format=format)
data = load_dict(text, format=format)
return cls.from_dict(data, basepath=basepath)

def to_dict(self):
data = self.model_dump(
mode="json", by_alias=True, exclude_none=True, exclude_defaults=True
)
clean_data(data)
clean_dict(data)
return data

@classmethod
Expand Down

0 comments on commit 13ee96d

Please sign in to comment.