diff --git a/README.md b/README.md index 54cd4f2..816d0a4 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,32 @@ handle.get_captures() # returns list of 'captures' dictionaries handle.get_annotations() # returns list of all annotations ``` +### Load a SigMF archive with multiple recordings +There are different ways to read an archive using `SigMFArchiveReader` +class, the `sigmffile.fromarchive()` method, and the `sigmffile.fromfile()` +method. + +```python +import numpy as np +from sigmf.archivereader import SigMFArchiveReader + +from sigmf.sigmffile import (fromarchive, + fromfile) +# read multirecording archives using fromarchive +sigmffiles = fromarchive("multi_recording_archive1.sigmf") +print(len(sigmffiles)) + +# read multirecording archives using fromfile +sigmffiles = fromfile("multi_recording_archive1.sigmf") +print(len(sigmffiles)) + +# read multirecording archives using archive reader with collection +reader = SigMFArchiveReader("multi_recording_archive2.sigmf") +print(len(reader)) +print(reader.collection) +print(len(reader.collection.sigmffiles)) # get SigMFFiles from collection +``` + ### Verify SigMF dataset integrity & compliance ```bash @@ -180,6 +206,93 @@ ci16_sigmffile = collection.get_SigMFFile(stream_name='example_ci16') cf32_sigmffile = collection.get_SigMFFile(stream_name='example_cf32') ``` +### Create a SigMF Archive +The `SigMFArchive` class, the `SigMFFile.archive()` method, and the +`SigMFFile.tofile()` method can all be used to create an archive. + +```python +import numpy as np + +from sigmf.sigmffile import (SigMFFile, + SigMFArchive) + + +# create data file +random_data1 = np.random.rand(128) +data1_path = "recording1.sigmf-data" +random_data1.tofile(data1_path) + +# create metadata +sigmf_file_1 = SigMFFile(name='recording1') +sigmf_file_1.set_global_field("core:datatype", "rf32_le") +sigmf_file_1.add_annotation(start_index=0, length=len(random_data1)) +sigmf_file_1.add_capture(start_index=0) +sigmf_file_1.set_data_file(data1_path) + +# create archive using SigMFArchive +archive1 = SigMFArchive(sigmffiles=sigmf_file_1, + path="single_recording_archive1.sigmf") + +# create archive using SigMFFile archive() +archive1_path = sigmf_file_1.archive(file_path="single_recording_archive2.sigmf") + +# create archive using tofile +sigmf_file_1.tofile(file_path="single_recording_archive3.sigmf", + toarchive=True) +``` + +### Create SigMF Archives with Multiple Recordings +Archives with collections can be created using `SigMFArchive` class, +`SigMFCollection.archive()` method, and the `SigMFCollection.tofile()` method. + +```python +import numpy as np + +from sigmf.sigmffile import (SigMFFile, + SigMFArchive, + SigMFCollection) + + +# create data files +random_data1 = np.random.rand(128) +data1_path = "recording1.sigmf-data" +random_data1.tofile(data1_path) + +random_data2 = np.random.rand(128) +data2_path = "recording2.sigmf-data" +random_data2.tofile(data2_path) + +# create metadata +sigmf_file_1 = SigMFFile(name='recording1') +sigmf_file_1.set_global_field("core:datatype", "rf32_le") +sigmf_file_1.add_annotation(start_index=0, length=len(random_data1)) +sigmf_file_1.add_capture(start_index=0) +sigmf_file_1.set_data_file(data1_path) + +sigmf_file_2 = SigMFFile(name='recording2') +sigmf_file_2.set_global_field("core:datatype", "rf32_le") +sigmf_file_2.add_annotation(start_index=0, length=len(random_data2)) +sigmf_file_2.add_capture(start_index=0) +sigmf_file_2.set_data_file(data2_path) + +# create collection +sigmf_file_1.tofile("recording1.sigmf-meta") +sigmf_file_2.tofile("recording2.sigmf-meta") +metafiles = ["recording1.sigmf-meta", "recording2.sigmf-meta"] +collection = SigMFCollection(metafiles=metafiles) + +# create archive using SigMFArchive without collection +sigmffiles = [sigmf_file_1, sigmf_file_2] +archive3 = SigMFArchive(sigmffiles=sigmffiles, + path="multi_recording_archive1.sigmf") + +# create archive using collection archive +archive3_path = collection.archive(file_path="multi_recording_archive2.sigmf") + +# create archive using collection tofile +collection.tofile(file_path="multi_recording_archive3.sigmf", toarchive=True) +``` + ### Load a SigMF Archive and slice its data without untaring it Since an *archive* is merely a tarball (uncompressed), and since there any many diff --git a/sigmf/archive.py b/sigmf/archive.py index de6bd50..5176c9d 100644 --- a/sigmf/archive.py +++ b/sigmf/archive.py @@ -6,12 +6,18 @@ """Create and extract SigMF archives.""" +import collections +from io import BytesIO import os -import shutil import tarfile import tempfile +import time +from typing import BinaryIO, Iterable, Union -from .error import SigMFFileError +import sigmf + + +from .error import SigMFFileError, SigMFValidationError SIGMF_ARCHIVE_EXT = ".sigmf" @@ -21,7 +27,8 @@ class SigMFArchive(): - """Archive a SigMFFile. + """Archive one or more `SigMFFile`s. A collection file can + optionally be included. A `.sigmf` file must include both valid metadata and data. If `self.data_file` is not set or the requested output file @@ -29,51 +36,70 @@ class SigMFArchive(): Parameters: - sigmffile -- A SigMFFile object with valid metadata and data_file - - name -- path to archive file to create. If file exists, overwrite. - If `name` doesn't end in .sigmf, it will be appended. - For example: if `name` == "/tmp/archive1", then the - following archive will be created: - /tmp/archive1.sigmf - - archive1/ - - archive1.sigmf-meta - - archive1.sigmf-data - - fileobj -- If `fileobj` is specified, it is used as an alternative to - a file object opened in binary mode for `name`. It is - supposed to be at position 0. `name` is not required, but - if specified will be used to determine the directory and - file names within the archive. `fileobj` won't be closed. - For example: if `name` == "archive1" and fileobj is given, - a tar archive will be written to fileobj with the - following structure: - - archive1/ - - archive1.sigmf-meta - - archive1.sigmf-data + sigmffiles -- A single SigMFFile or an iterable of SigMFFile objects with + valid metadata and data_files + + collection -- An optional SigMFCollection. + + path -- Path to archive file to create. If file exists, overwrite. + If `path` doesn't end in .sigmf, it will be appended. The + `self.path` instance variable will be updated upon + successful writing of the archive to point to the final + archive path. + + + fileobj -- If `fileobj` is specified, it is used as an alternative to + a file object opened in binary mode for `path`. If + `fileobj` is an open tarfile, it will be appended to. It is + supposed to be at position 0. `fileobj` won't be closed. If + `fileobj` is given, `path` has no effect. + + pretty -- If True, pretty print JSON when creating the metadata and + collection files in the archive. Defaults to True. """ - def __init__(self, sigmffile, name=None, fileobj=None): - self.sigmffile = sigmffile - self.name = name + def __init__(self, + sigmffiles: Union["sigmf.sigmffile.SigMFFile", + Iterable["sigmf.sigmffile.SigMFFile"]], + collection: "sigmf.sigmffile.SigMFCollection" = None, + path: Union[str, os.PathLike] = None, + fileobj: BinaryIO = None, + pretty=True): + + if (not path) and (not fileobj): + raise SigMFFileError("'path' or 'fileobj' required for creating " + "SigMF archive!") + + if isinstance(sigmffiles, sigmf.sigmffile.SigMFFile): + self.sigmffiles = [sigmffiles] + elif (hasattr(collections, "Iterable") and + isinstance(sigmffiles, collections.Iterable)): + self.sigmffiles = sigmffiles + elif isinstance(sigmffiles, collections.abc.Iterable): # python 3.10 + self.sigmffiles = sigmffiles + else: + raise SigMFFileError("Unknown type for sigmffiles argument!") + + if path: + self.path = str(path) + else: + self.path = None self.fileobj = fileobj + self.collection = collection self._check_input() archive_name = self._get_archive_name() + mode = "a" if fileobj is not None else "w" sigmf_fileobj = self._get_output_fileobj() - sigmf_archive = tarfile.TarFile(mode="w", - fileobj=sigmf_fileobj, - format=tarfile.PAX_FORMAT) - tmpdir = tempfile.mkdtemp() - sigmf_md_filename = archive_name + SIGMF_METADATA_EXT - sigmf_md_path = os.path.join(tmpdir, sigmf_md_filename) - sigmf_data_filename = archive_name + SIGMF_DATASET_EXT - sigmf_data_path = os.path.join(tmpdir, sigmf_data_filename) - - with open(sigmf_md_path, "w") as mdfile: - self.sigmffile.dump(mdfile, pretty=True) - - shutil.copy(self.sigmffile.data_file, sigmf_data_path) + try: + sigmf_archive = tarfile.TarFile(mode=mode, + fileobj=sigmf_fileobj, + format=tarfile.PAX_FORMAT) + except tarfile.ReadError: + # fileobj doesn't contain any archives yet, so reopen in 'w' mode + sigmf_archive = tarfile.TarFile(mode='w', + fileobj=sigmf_fileobj, + format=tarfile.PAX_FORMAT) def chmod(tarinfo): if tarinfo.isdir(): @@ -82,47 +108,124 @@ def chmod(tarinfo): tarinfo.mode = 0o644 # -wr-r--r-- return tarinfo - sigmf_archive.add(tmpdir, arcname=archive_name, filter=chmod) + if collection: + with tempfile.NamedTemporaryFile(mode="w") as tmpfile: + collection.dump(tmpfile, pretty=pretty) + tmpfile.flush() + collection_filename = archive_name + SIGMF_COLLECTION_EXT + sigmf_archive.add(tmpfile.name, + arcname=collection_filename, + filter=chmod) + + for sigmffile in self.sigmffiles: + if os.path.sep in sigmffile.name: + parent, _ = os.path.split(sigmffile.name) + self._create_parent_dirs(sigmf_archive, parent, chmod) + sf_md_filename = sigmffile.name + SIGMF_METADATA_EXT + sf_data_filename = sigmffile.name + SIGMF_DATASET_EXT + metadata = sigmffile.dumps(pretty=pretty) + metadata_tarinfo = tarfile.TarInfo(sf_md_filename) + metadata_tarinfo.size = len(metadata) + metadata_tarinfo.mtime = time.time() + metadata_tarinfo = chmod(metadata_tarinfo) + metadata_buffer = BytesIO(metadata.encode("utf-8")) + sigmf_archive.addfile(metadata_tarinfo, fileobj=metadata_buffer) + data_tarinfo = sigmf_archive.gettarinfo(name=sigmffile.data_file, + arcname=sf_data_filename) + data_tarinfo = chmod(data_tarinfo) + with open(sigmffile.data_file, "rb") as data_file: + sigmf_archive.addfile(data_tarinfo, fileobj=data_file) + sigmf_archive.close() if not fileobj: sigmf_fileobj.close() - - shutil.rmtree(tmpdir) + else: + sigmf_fileobj.seek(0) # ensure next open can read this as a tar self.path = sigmf_archive.name - def _check_input(self): - self._ensure_name_has_correct_extension() - self._ensure_data_file_set() - self._validate_sigmffile_metadata() + def _create_parent_dirs(self, _tarfile, sigmffile_name, set_permission): + path_components = sigmffile_name.split(os.path.sep) + current_path = "" + for path in path_components: + current_path = os.path.join(current_path, path) + path_found = False + for member in _tarfile.getmembers(): + if member.name == current_path: + path_found = True + break + if not path_found: + tarinfo = tarfile.TarInfo(current_path) + tarinfo.type = tarfile.DIRTYPE + tarinfo = set_permission(tarinfo) + _tarfile.addfile(tarinfo) - def _ensure_name_has_correct_extension(self): - name = self.name - if name is None: + def _check_input(self): + self._ensure_path_has_correct_extension() + for sigmffile in self.sigmffiles: + self._ensure_sigmffile_name_set(sigmffile) + self._ensure_data_file_set(sigmffile) + self._validate_sigmffile_metadata(sigmffile) + if self.collection: + self._validate_sigmffile_collection(self.collection, + self.sigmffiles) + + def _ensure_path_has_correct_extension(self): + path = self.path + if path is None: return - has_extension = "." in name - has_correct_extension = name.endswith(SIGMF_ARCHIVE_EXT) + has_extension = "." in path + has_correct_extension = path.endswith(SIGMF_ARCHIVE_EXT) if has_extension and not has_correct_extension: - apparent_ext = os.path.splitext(name)[-1] + apparent_ext = os.path.splitext(path)[-1] err = "extension {} != {}".format(apparent_ext, SIGMF_ARCHIVE_EXT) raise SigMFFileError(err) - self.name = name if has_correct_extension else name + SIGMF_ARCHIVE_EXT + self.path = path if has_correct_extension else path + SIGMF_ARCHIVE_EXT + + @staticmethod + def _ensure_sigmffile_name_set(sigmffile): + if not sigmffile.name: + err = "the `name` attribute must be set to pass to `SigMFArchive`" + raise SigMFFileError(err) - def _ensure_data_file_set(self): - if not self.sigmffile.data_file: + @staticmethod + def _ensure_data_file_set(sigmffile): + if not sigmffile.data_file: err = "no data file - use `set_data_file`" raise SigMFFileError(err) - def _validate_sigmffile_metadata(self): - self.sigmffile.validate() + @staticmethod + def _validate_sigmffile_metadata(sigmffile): + sigmffile.validate() + + @staticmethod + def _validate_sigmffile_collection(collectionfile, sigmffiles): + if len(collectionfile) != len(sigmffiles): + raise SigMFValidationError("Mismatched number of recordings " + "between sigmffiles and collection " + "file!") + streams_key = collectionfile.STREAMS_KEY + streams = collectionfile.get_collection_field(streams_key) + sigmf_meta_hashes = [s["hash"] for s in streams] + if not streams: + raise SigMFValidationError("No recordings in collection file!") + for sigmffile in sigmffiles: + with tempfile.NamedTemporaryFile(mode="w") as tmpfile: + sigmffile.dump(tmpfile, pretty=True) + tmpfile.flush() + meta_path = tmpfile.name + sigmf_meta_hash = sigmf.sigmf_hash.calculate_sha512(meta_path) + if sigmf_meta_hash not in sigmf_meta_hashes: + raise SigMFValidationError("SigMFFile given that " + "is not in collection file!") def _get_archive_name(self): - if self.fileobj and not self.name: + if self.fileobj and not self.path: pathname = self.fileobj.name else: - pathname = self.name + pathname = self.path filename = os.path.split(pathname)[-1] archive_name, archive_ext = os.path.splitext(filename) @@ -135,7 +238,7 @@ def _get_output_fileobj(self): if self.fileobj: err = "fileobj {!r} is not byte-writable".format(self.fileobj) else: - err = "can't open {!r} for writing".format(self.name) + err = "can't open {!r} for writing".format(self.path) raise SigMFFileError(err) @@ -146,6 +249,6 @@ def _get_open_fileobj(self): fileobj = self.fileobj fileobj.write(bytes()) # force exception if not byte-writable else: - fileobj = open(self.name, "wb") + fileobj = open(self.path, "wb") return fileobj diff --git a/sigmf/archivereader.py b/sigmf/archivereader.py index 5759b74..8356fe2 100644 --- a/sigmf/archivereader.py +++ b/sigmf/archivereader.py @@ -7,86 +7,115 @@ """Access SigMF archives without extracting them.""" import os -import shutil import tarfile -import tempfile -from . import __version__ #, schema, sigmf_hash, validate -from .sigmffile import SigMFFile -from .archive import SigMFArchive, SIGMF_DATASET_EXT, SIGMF_METADATA_EXT, SIGMF_ARCHIVE_EXT -from .utils import dict_merge +from .sigmffile import SigMFCollection, SigMFFile +from .archive import (SIGMF_COLLECTION_EXT, + SIGMF_DATASET_EXT, + SIGMF_METADATA_EXT, + SIGMF_ARCHIVE_EXT) from .error import SigMFFileError class SigMFArchiveReader(): - """Access data within SigMF archive `tar` in-place without extracting. + """Access data within SigMF archive `tar` in-place without extracting. This + class can be used to iterate through multiple SigMFFiles in the archive. Parameters: - name -- path to archive file to access. If file does not exist, - or if `name` doesn't end in .sigmf, SigMFFileError is raised. - """ - def __init__(self, name=None, skip_checksum=False, map_readonly=True, archive_buffer=None): - self.name = name - if self.name is not None: - if not name.endswith(SIGMF_ARCHIVE_EXT): - err = "archive extension != {}".format(SIGMF_ARCHIVE_EXT) - raise SigMFFileError(err) - - tar_obj = tarfile.open(self.name) - - elif archive_buffer is not None: - tar_obj = tarfile.open(fileobj=archive_buffer, mode='r:') - - else: - raise ValueError('In sigmf.archivereader.__init__(), either `name` or `archive_buffer` must be not None') - - json_contents = None - data_offset_size = None - - for memb in tar_obj.getmembers(): - if memb.isdir(): # memb.type == tarfile.DIRTYPE: - # the directory structure will be reflected in the member name - continue - - elif memb.isfile(): # memb.type == tarfile.REGTYPE: - if memb.name.endswith(SIGMF_METADATA_EXT): - json_contents = memb.name - if data_offset_size is None: - # consider a warnings.warn() here; the datafile should be earlier in the - # archive than the metadata, so that updating it (like, adding an annotation) - # is fast. - pass - with tar_obj.extractfile(memb) as memb_fid: - json_contents = memb_fid.read() - - elif memb.name.endswith(SIGMF_DATASET_EXT): - data_offset_size = memb.offset_data, memb.size + path -- path to archive file to access. If file does not exist, + or if `path` doesn't end in .sigmf, SigMFFileError is raised. - else: - print('A regular file', memb.name, 'was found but ignored in the archive') - else: - print('A member of type', memb.type, 'and name', memb.name, 'was found but not handled, just FYI.') + self.sigmffiles will contain the SigMFFile(s) (metadata/data) found in the + archive. - if data_offset_size is None: - raise SigMFFileError('No .sigmf-data file found in archive!') + self.collection will contain the SigMFCollection if found in the archive. + """ + def __init__(self, path=None, skip_checksum=False, map_readonly=True, archive_buffer=None): + self.path = path + tar_obj = None + try: + if self.path is not None: + if not self.path.endswith(SIGMF_ARCHIVE_EXT): + err = "archive extension != {}".format(SIGMF_ARCHIVE_EXT) + raise SigMFFileError(err) - self.sigmffile = SigMFFile(metadata=json_contents) - valid_md = self.sigmffile.validate() + tar_obj = tarfile.open(self.path) - self.sigmffile.set_data_file(self.name, data_buffer=archive_buffer, skip_checksum=skip_checksum, offset=data_offset_size[0], - size_bytes=data_offset_size[1], map_readonly=map_readonly) + elif archive_buffer is not None: + tar_obj = tarfile.open(fileobj=archive_buffer, mode='r:') - self.ndim = self.sigmffile.ndim - self.shape = self.sigmffile.shape + else: + raise ValueError('In sigmf.archivereader.__init__(), either `path` or `archive_buffer` must be not None') + + json_contents = None + data_offset_size = None + sigmffile_name = None + self.sigmffiles = [] + data_found = False + collection_metadata = {} + + for memb in tar_obj.getmembers(): + if memb.isdir(): # memb.type == tarfile.DIRTYPE: + # the directory structure will be reflected in the member name + continue + + elif memb.isfile(): # memb.type == tarfile.REGTYPE: + if memb.name.endswith(SIGMF_METADATA_EXT): + json_contents = memb.name + if data_offset_size is None: + # consider a warnings.warn() here; the datafile should be earlier in the + # archive than the metadata, so that updating it (like, adding an annotation) + # is fast. + pass + with tar_obj.extractfile(memb) as memb_fid: + json_contents = memb_fid.read() + + sigmffile_name, _ = os.path.splitext(memb.name) + elif memb.name.endswith(SIGMF_DATASET_EXT): + data_offset_size = memb.offset_data, memb.size + data_found = True + elif memb.name.endswith(SIGMF_COLLECTION_EXT): + with tar_obj.extractfile(memb) as collection_f: + collection_metadata = collection_f.read() + else: + print('A regular file', memb.name, 'was found but ignored in the archive') + else: + print('A member of type', memb.type, 'and name', memb.name, 'was found but not handled, just FYI.') + + if data_offset_size is not None and json_contents is not None: + sigmffile = SigMFFile(sigmffile_name, + metadata=json_contents) + sigmffile.validate() + + sigmffile.set_data_file(self.path, + data_buffer=archive_buffer, + skip_checksum=skip_checksum, + offset=data_offset_size[0], + size_bytes=data_offset_size[1], + map_readonly=map_readonly) + + self.sigmffiles.append(sigmffile) + data_offset_size = None + json_contents = None + sigmffile_name = None + if collection_metadata: + self.collection = SigMFCollection(metafiles=self.sigmffiles, + metadata=collection_metadata) + else: + self.collection = None - tar_obj.close() + if not data_found: + raise SigMFFileError('No .sigmf-data file found in archive!') + finally: + if tar_obj: + tar_obj.close() def __len__(self): - return self.sigmffile.__len__() + return len(self.sigmffiles) def __iter__(self): - return self.sigmffile.__iter__() + return self.sigmffiles.__iter__() def __getitem__(self, sli): - return self.sigmffile.__getitem__(sli) + return self.sigmffiles.__getitem__(sli) diff --git a/sigmf/sigmffile.py b/sigmf/sigmffile.py index 423ab49..a4a894b 100644 --- a/sigmf/sigmffile.py +++ b/sigmf/sigmffile.py @@ -8,7 +8,9 @@ from collections import OrderedDict import codecs +from io import BytesIO import json +import os import tarfile import tempfile from os import path @@ -18,7 +20,7 @@ from . import __version__, schema, sigmf_hash, validate from .archive import SigMFArchive, SIGMF_DATASET_EXT, SIGMF_METADATA_EXT, SIGMF_ARCHIVE_EXT, SIGMF_COLLECTION_EXT from .utils import dict_merge -from .error import SigMFFileError, SigMFAccessError +from .error import SigMFError, SigMFFileError, SigMFAccessError class SigMFMetafile(): VALID_KEYS = {} @@ -78,6 +80,7 @@ def dump(self, filep, pretty=True): indent=4 if pretty else None, separators=(',', ': ') if pretty else None, ) + filep.write("\n") def dumps(self, pretty=True): ''' @@ -97,7 +100,7 @@ def dumps(self, pretty=True): self.ordered_metadata(), indent=4 if pretty else None, separators=(',', ': ') if pretty else None, - ) + ) + "\n" class SigMFFile(SigMFMetafile): START_INDEX_KEY = "core:sample_start" @@ -148,12 +151,25 @@ class SigMFFile(SigMFMetafile): ] VALID_KEYS = {GLOBAL_KEY: VALID_GLOBAL_KEYS, CAPTURE_KEY: VALID_CAPTURE_KEYS, ANNOTATION_KEY: VALID_ANNOTATION_KEYS} - def __init__(self, metadata=None, data_file=None, global_info=None, skip_checksum=False, map_readonly=True): + def __init__(self, + name, + metadata=None, + data_file=None, + global_info=None, + skip_checksum=False, + map_readonly=True): ''' API for SigMF I/O Parameters ---------- + name: Name used for directory and filenames if archived. + For example, given `name=archive1`, then passing this + sigmffile to SigMFArchive will add the following files + to the archive: + - archive1/ + - archive1.sigmf-meta + - archive1.sigmf-data metadata: str or dict, optional Metadata for associated dataset. data_file: str, optional @@ -183,6 +199,7 @@ def __init__(self, metadata=None, data_file=None, global_info=None, skip_checksu self.set_global_info(global_info) if data_file is not None: self.set_data_file(data_file, skip_checksum=skip_checksum, map_readonly=map_readonly) + self.name = name def __len__(self): return self._memmap.shape[0] @@ -213,6 +230,20 @@ def __getitem__(self, sli): raise ValueError("unhandled ndim in SigMFFile.__getitem__(); this shouldn't happen") return a + def __eq__(self, other): + """Define equality between two `SigMFFile`s. + + Rely on the `core:sha512` value in the metadata to decide whether + `data_file` is the same since the same sigmf archive could be extracted + twice to two different temp directories and the SigMFFiles should still + be equivalent. + + """ + if isinstance(other, SigMFFile): + return self._metadata == other._metadata + + return False + def _get_start_offset(self): """ Return the offset of the first sample. @@ -511,13 +542,33 @@ def validate(self): version = self.get_global_field(self.VERSION_KEY) validate.validate(self._metadata, self.get_schema()) - def archive(self, name=None, fileobj=None): + def archive(self, file_path=None, fileobj=None, pretty=True): """Dump contents to SigMF archive format. - `name` and `fileobj` are passed to SigMFArchive and are defined there. - + Keyword arguments: + file_path -- passed to SigMFArchive`path`. Path to archive file to + create. If file exists, overwrite. If `path` doesn't end + in .sigmf, it will be appended. If not given, `file_path` + will be set to self.name. (default None) + fileobj -- passed to SigMFArchive `fileobj`. If `fileobj` is + specified, it is used as an alternative to a file object + opened in binary mode for `file_path`. If `fileobj` is an + open tarfile, it will be appended to. It is supposed to + be at position 0. `fileobj` won't be closed. If `fileobj` + is given, `file_path` has no effect. (default None) + pretty -- passed to SigMFArchive `pretty`. If True, pretty print + JSON when creating the metadata and collection files in + the archive. (default True). + + Returns the path to the created archive. """ - archive = SigMFArchive(self, name, fileobj) + if file_path is None: + file_path = self.name + + archive = SigMFArchive(self, + path=file_path, + fileobj=fileobj, + pretty=pretty) return archive.path def tofile(self, file_path, pretty=True, toarchive=False, skip_validate=False): @@ -538,11 +589,10 @@ def tofile(self, file_path, pretty=True, toarchive=False, skip_validate=False): self.validate() fns = get_sigmf_filenames(file_path) if toarchive: - self.archive(fns['archive_fn']) + self.archive(fns['archive_fn'], pretty=pretty) else: with open(fns['meta_fn'], 'w') as fp: self.dump(fp, pretty=pretty) - fp.write('\n') # text files should end in carriage return def read_samples_in_capture(self, index=0, autoscale=True): ''' @@ -602,7 +652,10 @@ def read_samples(self, start_index=0, count=-1, autoscale=True, raw_components=F if not self._is_conforming_dataset(): warnings.warn(f'Recording dataset appears non-compliant, resulting data may be erroneous') - return self._read_datafile(first_byte, count * self.get_num_channels(), autoscale, False) + return self._read_datafile(first_byte, + count * self.get_num_channels(), + autoscale, + raw_components) def _read_datafile(self, first_byte, nitems, autoscale, raw_components): ''' @@ -662,23 +715,27 @@ def __init__(self, metafiles=None, metadata=None, skip_checksums=False): Parameters: - metafiles -- A list of SigMF metadata filenames objects comprising the Collection, - there must be at least one file. If the files do not exist, this will - raise a SigMFFileError. + metafiles -- A list of SigMF metadata filenames or SigMFFile + objects comprising the Collection, there must be at least + one file. If the files do not exist, this will raise a + SigMFFileError. metadata -- collection metadata to use, if not provided this will populate a minimal set of default metadata. The core:streams field will be - regenerated automatically + regenerated automatically. Can be str or dict. """ super(SigMFCollection, self).__init__() self.skip_checksums = skip_checksums + self.sigmffiles = [] if metadata is None: self._metadata = {self.COLLECTION_KEY:{}} self._metadata[self.COLLECTION_KEY][self.VERSION_KEY] = __version__ self._metadata[self.COLLECTION_KEY][self.STREAMS_KEY] = [] - else: + elif isinstance(metadata, dict): self._metadata = metadata + else: + self._metadata = json.loads(metadata) if metafiles is None: self.metafiles = [] @@ -694,6 +751,15 @@ def __len__(self): ''' return len(self.get_stream_names()) + def __eq__(self, other): + """Define equality between two `SigMFCollections's by comparing + metadata. + """ + if isinstance(other, SigMFCollection): + return self._metadata == other._metadata + + return False + def verify_stream_hashes(self): ''' compares the stream hashes in the collection metadata to the metadata files @@ -705,23 +771,66 @@ def verify_stream_hashes(self): if path.isfile(metafile_name): new_hash = sigmf_hash.calculate_sha512(filename=metafile_name) if old_hash != new_hash: - raise SigMFFileError(f'Calculated file hash for {metafile_name} does not match collection metadata.') + raise SigMFFileError('Calculated file hash for metadata ' + f'file {metafile_name} does not ' + 'match collection metadata.') + if self.sigmffiles: + sigmffile = [x for x in self.sigmffiles + if x.name == stream.get('name')][0] + sigmffile_meta = sigmffile.dumps() + sigmffile_bytes = sigmffile_meta.encode('utf-8') + size_of_meta = len(sigmffile_bytes) + sigmffile_hash = sigmf_hash.calculate_sha512( + fileobj=BytesIO(sigmffile_bytes), + offset_and_size=(0, size_of_meta) + ) + if old_hash != sigmffile_hash: + raise SigMFFileError('Calculated file hash for SigMFFile ' + f'{sigmffile.name} does not match ' + 'collection metadata.') def set_streams(self, metafiles): ''' - configures the collection `core:streams` field from the specified list of metafiles + configures the collection `core:streams` field from the specified list + of metafiles or SigMFFiles ''' - self.metafiles = metafiles streams = [] - for metafile in self.metafiles: - if metafile.endswith('.sigmf-meta') and path.isfile(metafile): - stream = { - "name": get_sigmf_filenames(metafile)['base_fn'], - "hash": sigmf_hash.calculate_sha512(filename=metafile) - } - streams.append(stream) + sigmffile_names = [] + self.sigmffiles = [] + + for sigmffile in metafiles: + if isinstance(sigmffile, SigMFFile): + sigmffile_names.append(sigmffile.name + SIGMF_METADATA_EXT) + sigmffile_meta = sigmffile.dumps() + sigmffile_bytes = sigmffile_meta.encode('utf-8') + size_of_meta = len(sigmffile_bytes) + streams.append({ + "name": sigmffile.name, + "hash": sigmf_hash.calculate_sha512( + fileobj=BytesIO(sigmffile_bytes), + offset_and_size=(0, size_of_meta)) + }) + self.sigmffiles.append(sigmffile) + elif (isinstance(sigmffile, str) or + isinstance(sigmffile, os.PathLike)): + sigmffile_names.append(str(sigmffile)) + if (str(sigmffile).endswith(SIGMF_METADATA_EXT) and + path.isfile(sigmffile)): + stream = { + "name": get_sigmf_filenames(sigmffile)['base_fn'], + "hash": sigmf_hash.calculate_sha512(filename=sigmffile) + } + streams.append(stream) + else: + raise SigMFFileError(f'Specifed stream file {sigmffile} is' + ' not a valid SigMF Metadata file') + self.sigmffiles.append( + fromfile(sigmffile, skip_checksum=self.skip_checksums) + ) else: - raise SigMFFileError(f'Specifed stream file {metafile} is not a valid SigMF Metadata file') + raise SigMFError("Unknown type, set_streams() input must be" + " list of metafiles or SigMFFiles") + self.metafiles = sigmffile_names self.set_collection_field(self.STREAMS_KEY, streams) def get_stream_names(self): @@ -757,9 +866,40 @@ def get_collection_field(self, key, default=None): """ return self._metadata[self.COLLECTION_KEY].get(key, default) - def tofile(self, file_path, pretty=True): + def archive(self, file_path=None, fileobj=None, pretty=True): + """Dump contents to SigMF archive format. + + Keyword arguments: + file_path -- passed to SigMFArchive`path`. Path to archive file to + create. If file exists, overwrite. If `path` doesn't end + in .sigmf, it will be appended. (default None) + fileobj -- passed to SigMFArchive `fileobj`. If `fileobj` is + specified, it is used as an alternative to a file object + opened in binary mode for `file_path`. If `fileobj` is an + open tarfile, it will be appended to. It is supposed to + be at position 0. `fileobj` won't be closed. If `fileobj` + is given, `file_path` has no effect. (default None) + pretty -- passed to SigMFArchive `pretty`. If True, pretty print + JSON when creating the metadata and collection files in + the archive. (default True). + + Returns the path to the created archive. + """ + + sigmffiles = [] + for name in self.get_stream_names(): + sigmffile = self.get_SigMFFile(name) + sigmffiles.append(sigmffile) + archive = SigMFArchive(sigmffiles, + self, + file_path, + fileobj, + pretty=pretty) + return archive.path + + def tofile(self, file_path, pretty=True, toarchive=False): ''' - Write metadata file + Write metadata file or create archive. Parameters ---------- @@ -767,25 +907,30 @@ def tofile(self, file_path, pretty=True): Location to save. pretty : bool, default True When True will write more human-readable output, otherwise will be flat JSON. + toarchive : bool, default False + If True, create an archive from the collection file and recordings + instead of creating collection metadata file. ''' fns = get_sigmf_filenames(file_path) - with open(fns['collection_fn'], 'w') as fp: - self.dump(fp, pretty=pretty) - fp.write('\n') # text files should end in carriage return + if toarchive: + self.archive(fns['archive_fn'], pretty=pretty) + else: + with open(fns['collection_fn'], 'w') as fp: + self.dump(fp, pretty=pretty) def get_SigMFFile(self, stream_name=None, stream_index=None): ''' Returns the SigMFFile instance of the specified stream if it exists ''' - metafile = None - if stream_name is not None: - if stream_name in self.get_stream_names(): - metafile = stream_name + '.sigmf_meta' - if stream_index is not None and stream_index < self.__len__(): - metafile = self.get_stream_names()[stream_index] + '.sigmf_meta' + sigmffile = None + if self.sigmffiles: + if stream_name is not None: + sigmffile = [x for x in self.sigmffiles + if x.name == stream_name][0] + if stream_index is not None and stream_index < self.__len__(): + sigmffile = self.sigmffiles[stream_index] + return sigmffile - if metafile is not None: - return fromfile(metafile, skip_checksum=self.skip_checksums) def dtype_info(datatype): """ @@ -891,13 +1036,28 @@ def get_dataset_filename_from_metadata(meta_fn, metadata=None): def fromarchive(archive_path, dir=None): - """Extract an archive and return a SigMFFile. + """Extract an archive and return containing SigMFFiles and SigMFCollection. The `dir` parameter is no longer used as this function has been changed to access SigMF archives without extracting them. + + If the archive contains a single recording, a single SigMFFile object will + be returned. If the archive contains multiple recordings a list of + SigMFFile objects will be returned. If the archive contains a collection, + a tuple (SigMFFile(s), SigMFCollection) will be returned. """ from .archivereader import SigMFArchiveReader - return SigMFArchiveReader(archive_path).sigmffile + reader = SigMFArchiveReader(archive_path) + sigmffiles = reader.sigmffiles + sigmffile_ret = None + if len(sigmffiles) == 1: + sigmffile_ret = sigmffiles[0] + else: + sigmffile_ret = sigmffiles + if reader.collection: + return sigmffile_ret, reader.collection + else: + return sigmffile_ret def fromfile(filename, skip_checksum=False): @@ -916,8 +1076,8 @@ def fromfile(filename, skip_checksum=False): Returns ------- - object - SigMFFile object with dataset & metadata or a SigMFCollection depending on the type of file + SigMFFile object(s) with dataset & metadata and/or a SigMFCollection + depending on the type of file or contents of archive. ''' fns = get_sigmf_filenames(filename) meta_fn = fns['meta_fn'] @@ -944,7 +1104,10 @@ def fromfile(filename, skip_checksum=False): meta_fp.close() data_fn = get_dataset_filename_from_metadata(meta_fn, metadata) - return SigMFFile(metadata=metadata, data_file=data_fn, skip_checksum=skip_checksum) + return SigMFFile(name=fns['base_fn'], + metadata=metadata, + data_file=data_fn, + skip_checksum=skip_checksum) def get_sigmf_filenames(filename): diff --git a/tests/conftest.py b/tests/conftest.py index 9a8aa64..60f0be4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -24,22 +24,63 @@ from sigmf.sigmffile import SigMFFile -from .testdata import TEST_FLOAT32_DATA, TEST_METADATA +from .testdata import (TEST_FLOAT32_DATA_1, + TEST_METADATA_1, + TEST_FLOAT32_DATA_2, + TEST_METADATA_2, + TEST_FLOAT32_DATA_3, + TEST_METADATA_3) @pytest.fixture -def test_data_file(): +def test_data_file_1(): with tempfile.NamedTemporaryFile() as temp: - TEST_FLOAT32_DATA.tofile(temp.name) + TEST_FLOAT32_DATA_1.tofile(temp.name) yield temp @pytest.fixture -def test_sigmffile(test_data_file): - sigf = SigMFFile() - sigf.set_global_field("core:datatype", "rf32_le") - sigf.add_annotation(start_index=0, length=len(TEST_FLOAT32_DATA)) - sigf.add_capture(start_index=0) - sigf.set_data_file(test_data_file.name) - assert sigf._metadata == TEST_METADATA - return sigf +def test_data_file_2(): + with tempfile.NamedTemporaryFile() as t: + TEST_FLOAT32_DATA_2.tofile(t.name) + yield t + + +@pytest.fixture +def test_data_file_3(): + with tempfile.NamedTemporaryFile() as t: + TEST_FLOAT32_DATA_3.tofile(t.name) + yield t + + +@pytest.fixture +def test_sigmffile(test_data_file_1): + f = SigMFFile(name='test1') + f.set_global_field("core:datatype", "rf32_le") + f.add_annotation(start_index=0, length=len(TEST_FLOAT32_DATA_1)) + f.add_capture(start_index=0) + f.set_data_file(test_data_file_1.name) + assert f._metadata == TEST_METADATA_1 + return f + + +@pytest.fixture +def test_alternate_sigmffile(test_data_file_2): + f = SigMFFile(name='test2') + f.set_global_field("core:datatype", "rf32_le") + f.add_annotation(start_index=0, length=len(TEST_FLOAT32_DATA_2)) + f.add_capture(start_index=0) + f.set_data_file(test_data_file_2.name) + assert f._metadata == TEST_METADATA_2 + return f + + +@pytest.fixture +def test_alternate_sigmffile_2(test_data_file_3): + f = SigMFFile(name='test3') + f.set_global_field("core:datatype", "rf32_le") + f.add_annotation(start_index=0, length=len(TEST_FLOAT32_DATA_3)) + f.add_capture(start_index=0) + f.set_data_file(test_data_file_3.name) + assert f._metadata == TEST_METADATA_3 + return f diff --git a/tests/test_archive.py b/tests/test_archive.py index 5c3d67b..0aaa6e6 100644 --- a/tests/test_archive.py +++ b/tests/test_archive.py @@ -1,5 +1,7 @@ import codecs import json +import os +from pathlib import Path import tarfile import tempfile from os import path @@ -8,10 +10,13 @@ import pytest import jsonschema -from sigmf import error -from sigmf.archive import SIGMF_DATASET_EXT, SIGMF_METADATA_EXT +from sigmf import error, sigmffile +from sigmf.archive import (SIGMF_COLLECTION_EXT, + SIGMF_DATASET_EXT, + SIGMF_METADATA_EXT, + SigMFArchive) -from .testdata import TEST_FLOAT32_DATA, TEST_METADATA +from .testdata import TEST_FLOAT32_DATA_1, TEST_METADATA_1 def create_test_archive(test_sigmffile, tmpfile): @@ -24,20 +29,20 @@ def test_without_data_file_throws_fileerror(test_sigmffile): test_sigmffile.data_file = None with tempfile.NamedTemporaryFile() as temp: with pytest.raises(error.SigMFFileError): - test_sigmffile.archive(name=temp.name) + test_sigmffile.archive(file_path=temp.name) def test_invalid_md_throws_validationerror(test_sigmffile): del test_sigmffile._metadata["global"]["core:datatype"] # required field with tempfile.NamedTemporaryFile() as temp: with pytest.raises(jsonschema.exceptions.ValidationError): - test_sigmffile.archive(name=temp.name) + test_sigmffile.archive(file_path=temp.name) def test_name_wrong_extension_throws_fileerror(test_sigmffile): with tempfile.NamedTemporaryFile() as temp: with pytest.raises(error.SigMFFileError): - test_sigmffile.archive(name=temp.name + ".zip") + test_sigmffile.archive(file_path=temp.name + ".zip") def test_fileobj_extension_ignored(test_sigmffile): @@ -47,17 +52,18 @@ def test_fileobj_extension_ignored(test_sigmffile): def test_name_used_in_fileobj(test_sigmffile): with tempfile.NamedTemporaryFile() as temp: - sigmf_archive = test_sigmffile.archive(name="testarchive", fileobj=temp) + sigmf_archive = test_sigmffile.archive(file_path="testarchive", + fileobj=temp) sigmf_tarfile = tarfile.open(sigmf_archive, mode="r") - basedir, file1, file2 = sigmf_tarfile.getmembers() - assert basedir.name == "testarchive" + file1, file2 = sigmf_tarfile.getmembers() + assert sigmf_tarfile.name == temp.name def filename(tarinfo): path_root, _ = path.splitext(tarinfo.name) return path.split(path_root)[-1] - assert filename(file1) == "testarchive" - assert filename(file2) == "testarchive" + assert filename(file1) == test_sigmffile.name + assert filename(file2) == test_sigmffile.name def test_fileobj_not_closed(test_sigmffile): @@ -77,39 +83,74 @@ def test_unwritable_name_throws_fileerror(test_sigmffile): # so use invalid filename unwritable_file = '/bad_name/' with pytest.raises(error.SigMFFileError): - test_sigmffile.archive(name=unwritable_file) + test_sigmffile.archive(file_path=unwritable_file) def test_tarfile_layout(test_sigmffile): with tempfile.NamedTemporaryFile() as temp: sigmf_tarfile = create_test_archive(test_sigmffile, temp) - basedir, file1, file2 = sigmf_tarfile.getmembers() - assert tarfile.TarInfo.isdir(basedir) + file1, file2 = sigmf_tarfile.getmembers() assert tarfile.TarInfo.isfile(file1) assert tarfile.TarInfo.isfile(file2) def test_tarfile_names_and_extensions(test_sigmffile): with tempfile.NamedTemporaryFile() as temp: + sigmf_tarfile = create_test_archive(test_sigmffile, temp) + file1, file2 = sigmf_tarfile.getmembers() + archive_name = sigmf_tarfile.name + assert archive_name == temp.name + path.split(temp.name)[-1] + file_extensions = {SIGMF_DATASET_EXT, SIGMF_METADATA_EXT} + + file1_name, file1_ext = path.splitext(file1.name) + assert file1_ext in file_extensions + assert path.split(file1_name)[-1] == test_sigmffile.name + + file_extensions.remove(file1_ext) + + file2_name, file2_ext = path.splitext(file2.name) + assert path.split(file2_name)[-1] == test_sigmffile.name + assert file2_ext in file_extensions + + +def test_tarfile_names_and_extensions_with_paths(test_sigmffile): + with tempfile.NamedTemporaryFile() as temp: + test_sigmffile.name = os.path.join("test_folder", "test") sigmf_tarfile = create_test_archive(test_sigmffile, temp) basedir, file1, file2 = sigmf_tarfile.getmembers() - archive_name = basedir.name - assert archive_name == path.split(temp.name)[-1] + assert "test_folder" == basedir.name + archive_name = sigmf_tarfile.name + assert archive_name == temp.name + path.split(temp.name)[-1] file_extensions = {SIGMF_DATASET_EXT, SIGMF_METADATA_EXT} file1_name, file1_ext = path.splitext(file1.name) assert file1_ext in file_extensions - assert path.split(file1_name)[-1] == archive_name + assert file1_name == test_sigmffile.name file_extensions.remove(file1_ext) file2_name, file2_ext = path.splitext(file2.name) - assert path.split(file2_name)[-1] == archive_name + assert file2_name == test_sigmffile.name assert file2_ext in file_extensions +def test_multirec_archive_into_fileobj(test_sigmffile, + test_alternate_sigmffile): + with tempfile.NamedTemporaryFile() as t: + # add first sigmffile to the fileobj t + create_test_archive(test_sigmffile, t) + # add a second one to the same fileobj + multirec_tar = create_test_archive(test_alternate_sigmffile, t) + members = multirec_tar.getmembers() + assert len(members) == 4 # 2 metadata files + 2 data files + + def test_tarfile_persmissions(test_sigmffile): with tempfile.NamedTemporaryFile() as temp: + # add 'test1' to name to create 'test1' folder + test_sigmffile.name = "test1/test1" sigmf_tarfile = create_test_archive(test_sigmffile, temp) basedir, file1, file2 = sigmf_tarfile.getmembers() assert basedir.mode == 0o755 @@ -120,7 +161,7 @@ def test_tarfile_persmissions(test_sigmffile): def test_contents(test_sigmffile): with tempfile.NamedTemporaryFile() as temp: sigmf_tarfile = create_test_archive(test_sigmffile, temp) - basedir, file1, file2 = sigmf_tarfile.getmembers() + file1, file2 = sigmf_tarfile.getmembers() if file1.name.endswith(SIGMF_METADATA_EXT): mdfile = file1 datfile = file2 @@ -130,17 +171,201 @@ def test_contents(test_sigmffile): bytestream_reader = codecs.getreader("utf-8") # bytes -> str mdfile_reader = bytestream_reader(sigmf_tarfile.extractfile(mdfile)) - assert json.load(mdfile_reader) == TEST_METADATA + assert json.load(mdfile_reader) == TEST_METADATA_1 datfile_reader = sigmf_tarfile.extractfile(datfile) # calling `fileno` on `tarfile.ExFileObject` throws error (?), but # np.fromfile requires it, so we need this extra step data = np.frombuffer(datfile_reader.read(), dtype=np.float32) - assert np.array_equal(data, TEST_FLOAT32_DATA) + assert np.array_equal(data, TEST_FLOAT32_DATA_1) def test_tarfile_type(test_sigmffile): with tempfile.NamedTemporaryFile() as temp: sigmf_tarfile = create_test_archive(test_sigmffile, temp) assert sigmf_tarfile.format == tarfile.PAX_FORMAT + + +def test_create_archive_pathlike(test_sigmffile, test_alternate_sigmffile): + with tempfile.NamedTemporaryFile() as t: + input_sigmffiles = [test_sigmffile, test_alternate_sigmffile] + arch = SigMFArchive(input_sigmffiles, path=Path(t.name)) + output_sigmf_files = sigmffile.fromarchive(archive_path=arch.path) + assert len(output_sigmf_files) == 2 + assert input_sigmffiles == output_sigmf_files + + +def test_archive_names(test_sigmffile): + with tempfile.NamedTemporaryFile(suffix=".sigmf") as t: + a = SigMFArchive(sigmffiles=test_sigmffile, path=t.name) + assert a.path == t.name + observed_sigmffile = sigmffile.fromarchive(t.name) + assert observed_sigmffile.name == test_sigmffile.name + + with tempfile.NamedTemporaryFile(suffix=".sigmf") as t: + archive_path = test_sigmffile.archive(t.name) + assert archive_path == t.name + observed_sigmffile = sigmffile.fromarchive(t.name) + assert observed_sigmffile.name == test_sigmffile.name + + with tempfile.NamedTemporaryFile(suffix=".sigmf") as t: + test_sigmffile.tofile(t.name, toarchive=True) + observed_sigmffile = sigmffile.fromarchive(t.name) + assert observed_sigmffile.name == test_sigmffile.name + + +def test_single_recording_with_collection(test_sigmffile): + sigmf_meta_file = test_sigmffile.name + SIGMF_METADATA_EXT + try: + with open(sigmf_meta_file, mode="w") as sigmf_meta_fd: + test_sigmffile.dump(sigmf_meta_fd) + test_collection = sigmffile.SigMFCollection([sigmf_meta_file]) + input_collection_json = test_collection.dumps(pretty=True) + with tempfile.NamedTemporaryFile(suffix=".sigmf") as tmpfile: + archive = SigMFArchive(test_sigmffile, + test_collection, + fileobj=tmpfile) + with tarfile.open(archive.path) as tar: + # 1 collection_file + 1 meta file + 1 data file + assert len(tar.getmembers()) == 3 + for member in tar.getmembers(): + if member.isfile(): + if member.name.endswith(SIGMF_COLLECTION_EXT): + collection_file = tar.extractfile(member) + output_collection_json = json.load(collection_file) + assert (json.loads(input_collection_json) == + output_collection_json) + finally: + if os.path.exists(sigmf_meta_file): + os.remove(sigmf_meta_file) + + +def test_multiple_recordings_with_collection(test_sigmffile, + test_alternate_sigmffile, + test_alternate_sigmffile_2): + sigmf_meta_files = [ + test_sigmffile.name + SIGMF_METADATA_EXT, + test_alternate_sigmffile.name + SIGMF_METADATA_EXT, + test_alternate_sigmffile_2.name + SIGMF_METADATA_EXT + ] + input_sigmf_files = [test_sigmffile, + test_alternate_sigmffile, + test_alternate_sigmffile_2] + try: + for sigmf_meta_file, sigmf_file in zip(sigmf_meta_files, + input_sigmf_files): + with open(sigmf_meta_file, mode="w") as sigmf_meta_fd: + sigmf_file.dump(sigmf_meta_fd) + test_collection = sigmffile.SigMFCollection(sigmf_meta_files) + input_collection_json = test_collection.dumps(pretty=True) + with tempfile.NamedTemporaryFile(suffix=".sigmf") as tmpfile: + archive = SigMFArchive(input_sigmf_files, + test_collection, + fileobj=tmpfile) + with tarfile.open(archive.path) as tar: + # 1 collection_file + 3 meta file + 3 data file + assert len(tar.getmembers()) == 7 + for member in tar.getmembers(): + if member.isfile(): + if member.name.endswith(SIGMF_COLLECTION_EXT): + collection_file = tar.extractfile(member) + output_collection_json = json.load(collection_file) + assert (json.loads(input_collection_json) == + output_collection_json) + finally: + for sigmf_meta_file in sigmf_meta_files: + if os.path.exists(sigmf_meta_file): + os.remove(sigmf_meta_file) + + +def test_extra_sigmf_file_not_in_collection(test_sigmffile, + test_alternate_sigmffile, + test_alternate_sigmffile_2): + sigmf_meta_files = [ + test_sigmffile.name + SIGMF_METADATA_EXT, + test_alternate_sigmffile.name + SIGMF_METADATA_EXT, + test_alternate_sigmffile_2.name + SIGMF_METADATA_EXT + ] + input_sigmf_files = [test_sigmffile, + test_alternate_sigmffile, + test_alternate_sigmffile_2] + try: + for sigmf_meta_file, sigmf_file in zip(sigmf_meta_files, + input_sigmf_files): + with open(sigmf_meta_file, mode="w") as sigmf_meta_fd: + sigmf_file.dump(sigmf_meta_fd) + test_collection = sigmffile.SigMFCollection(sigmf_meta_files[:2]) + with tempfile.NamedTemporaryFile(suffix=".sigmf") as tmpfile: + with pytest.raises(error.SigMFValidationError): + SigMFArchive(input_sigmf_files, + test_collection, + fileobj=tmpfile) + finally: + for sigmf_meta_file in sigmf_meta_files: + if os.path.exists(sigmf_meta_file): + os.remove(sigmf_meta_file) + + +def test_extra_recording_not_in_sigmffiles(test_sigmffile, + test_alternate_sigmffile, + test_alternate_sigmffile_2): + sigmf_meta_files = [ + test_sigmffile.name + SIGMF_METADATA_EXT, + test_alternate_sigmffile.name + SIGMF_METADATA_EXT, + test_alternate_sigmffile_2.name + SIGMF_METADATA_EXT + ] + input_sigmf_files = [test_sigmffile, + test_alternate_sigmffile, + test_alternate_sigmffile_2] + try: + for sigmf_meta_file, sigmf_file in zip(sigmf_meta_files, + input_sigmf_files): + with open(sigmf_meta_file, mode="w") as sigmf_meta_fd: + sigmf_file.dump(sigmf_meta_fd) + test_collection = sigmffile.SigMFCollection(sigmf_meta_files) + + with tempfile.NamedTemporaryFile(suffix=".sigmf") as tmpfile: + with pytest.raises(error.SigMFValidationError): + SigMFArchive(input_sigmf_files[:2], + test_collection, + fileobj=tmpfile) + finally: + for sigmf_meta_file in sigmf_meta_files: + if os.path.exists(sigmf_meta_file): + os.remove(sigmf_meta_file) + + +def test_mismatched_sigmffiles_collection(test_sigmffile, + test_alternate_sigmffile, + test_alternate_sigmffile_2): + sigmf_meta_files = [ + test_sigmffile.name + SIGMF_METADATA_EXT, + test_alternate_sigmffile.name + SIGMF_METADATA_EXT, + test_alternate_sigmffile_2.name + SIGMF_METADATA_EXT + ] + input_sigmf_files = [test_sigmffile, + test_alternate_sigmffile, + test_alternate_sigmffile_2] + try: + for sigmf_meta_file, sigmf_file in zip(sigmf_meta_files, + input_sigmf_files): + with open(sigmf_meta_file, mode="w") as sigmf_meta_fd: + sigmf_file.dump(sigmf_meta_fd) + test_collection = sigmffile.SigMFCollection(sigmf_meta_files[:2]) + + with tempfile.NamedTemporaryFile(suffix=".sigmf") as tmpfile: + with pytest.raises(error.SigMFValidationError): + SigMFArchive(input_sigmf_files[1:3], + test_collection, + fileobj=tmpfile) + finally: + for sigmf_meta_file in sigmf_meta_files: + if os.path.exists(sigmf_meta_file): + os.remove(sigmf_meta_file) + + +def test_archive_no_path_or_fileobj(test_sigmffile): + """Error should be raised when no path or fileobj given.""" + with pytest.raises(error.SigMFFileError): + SigMFArchive(test_sigmffile) diff --git a/tests/test_archivereader.py b/tests/test_archivereader.py index 2b5b449..04d682b 100644 --- a/tests/test_archivereader.py +++ b/tests/test_archivereader.py @@ -1,15 +1,13 @@ -import codecs -import json -import tarfile +import os +import shutil import tempfile -from os import path import numpy as np -import pytest -from sigmf import error from sigmf import SigMFFile, SigMFArchiveReader -from sigmf.archive import SIGMF_DATASET_EXT, SIGMF_METADATA_EXT +from sigmf.archive import SIGMF_METADATA_EXT, SigMFArchive +from sigmf.sigmffile import SigMFCollection + def test_access_data_without_untar(test_sigmffile): global_info = { @@ -24,7 +22,7 @@ def test_access_data_without_untar(test_sigmffile): "core:datetime": "2021-06-18T23:17:51.163959Z", "core:sample_start": 0 } - + NUM_ROWS = 5 for dt in "ri16_le", "ci16_le", "rf32_le", "rf64_le", "cf32_le", "cf64_le": @@ -33,7 +31,7 @@ def test_access_data_without_untar(test_sigmffile): global_info["core:num_channels"] = num_chan base_filename = dt + '_' + str(num_chan) archive_filename = base_filename + '.sigmf' - + a = np.arange(NUM_ROWS * num_chan * (2 if 'c' in dt else 1)) if 'i16' in dt: b = a.astype(np.int16) @@ -43,12 +41,153 @@ def test_access_data_without_untar(test_sigmffile): b = a.astype(np.float64) else: raise ValueError('whoops') - + test_sigmffile.data_file = None with tempfile.NamedTemporaryFile() as temp: b.tofile(temp.name) - meta = SigMFFile(data_file=temp.name, global_info=global_info) + meta = SigMFFile("test", data_file=temp.name, global_info=global_info) meta.add_capture(0, metadata=capture_info) meta.tofile(archive_filename, toarchive=True) archi = SigMFArchiveReader(archive_filename, skip_checksum=True) + + +def test_extract_single_recording(test_sigmffile): + with tempfile.NamedTemporaryFile() as tf: + expected_sigmffile = test_sigmffile + arch = SigMFArchive(expected_sigmffile, path=tf.name) + reader = SigMFArchiveReader(arch.path) + assert len(reader) == 1 + actual_sigmffile = reader[0] + assert expected_sigmffile == actual_sigmffile + + +def test_extract_multi_recording(test_sigmffile, test_alternate_sigmffile): + with tempfile.NamedTemporaryFile() as tf: + # Create a multi-recording archive + expected_sigmffiles = [test_sigmffile, test_alternate_sigmffile] + arch = SigMFArchive(expected_sigmffiles, path=tf.name) + reader = SigMFArchiveReader(arch.path) + assert len(reader) == 2 + for expected in expected_sigmffiles: + assert expected in reader.sigmffiles + + +def test_extract_single_recording_with_collection(test_sigmffile): + try: + meta_filepath = test_sigmffile.name + SIGMF_METADATA_EXT + with open(meta_filepath, "w") as meta_fd: + test_sigmffile.dump(meta_fd) + collection = SigMFCollection(metafiles=[meta_filepath]) + archive_path = "test_archive.sigmf" + arch = SigMFArchive(test_sigmffile, collection, path=archive_path) + reader = SigMFArchiveReader(arch.path) + assert len(reader) == 1 + actual_sigmffile = reader[0] + assert test_sigmffile == actual_sigmffile + assert collection == reader.collection + finally: + if os.path.exists(meta_filepath): + os.remove(meta_filepath) + if os.path.exists(archive_path): + os.remove(archive_path) + + +def test_extract_multi_recording_with_collection(test_sigmffile, + test_alternate_sigmffile): + try: + meta1_filepath = test_sigmffile.name + SIGMF_METADATA_EXT + with open(meta1_filepath, "w") as meta_fd: + test_sigmffile.dump(meta_fd) + meta2_filepath = test_alternate_sigmffile.name + SIGMF_METADATA_EXT + with open(meta2_filepath, "w") as meta_fd: + test_alternate_sigmffile.dump(meta_fd) + collection = SigMFCollection(metafiles=[meta1_filepath, + meta2_filepath]) + archive_path = "test_archive.sigmf" + input_sigmffiles = [test_sigmffile, test_alternate_sigmffile] + arch = SigMFArchive(input_sigmffiles, collection, path=archive_path) + reader = SigMFArchiveReader(arch.path) + assert len(reader) == 2 # number of SigMFFiles + for actual_sigmffile in reader: + assert actual_sigmffile in input_sigmffiles + assert collection == reader.collection + finally: + if os.path.exists(meta1_filepath): + os.remove(meta1_filepath) + if os.path.exists(meta2_filepath): + os.remove(meta2_filepath) + if os.path.exists(archive_path): + os.remove(archive_path) + + +def test_archivereader_different_folder(test_sigmffile, + test_alternate_sigmffile): + try: + os.makedirs("folder1", exist_ok=True) + test_sigmffile.name = os.path.join("folder1", "test1") + os.makedirs("folder2", exist_ok=True) + test_alternate_sigmffile.name = os.path.join("folder2", "test2") + meta1_filepath = test_sigmffile.name + SIGMF_METADATA_EXT + with open(meta1_filepath, "w") as meta_fd: + test_sigmffile.dump(meta_fd) + meta2_filepath = test_alternate_sigmffile.name + SIGMF_METADATA_EXT + with open(meta2_filepath, "w") as meta_fd: + test_alternate_sigmffile.dump(meta_fd) + collection = SigMFCollection(metafiles=[meta1_filepath, + meta2_filepath]) + os.makedirs("archive_folder", exist_ok=True) + archive_path = os.path.join("archive_folder", "test_archive.sigmf") + input_sigmffiles = [test_sigmffile, test_alternate_sigmffile] + arch = SigMFArchive(input_sigmffiles, collection, path=archive_path) + reader = SigMFArchiveReader(arch.path) + assert len(reader) == 2 # number of SigMFFiles + for actual_sigmffile in reader: + assert actual_sigmffile in input_sigmffiles + assert collection == reader.collection + finally: + if os.path.exists(meta1_filepath): + os.remove(meta1_filepath) + if os.path.exists(meta2_filepath): + os.remove(meta2_filepath) + if os.path.exists(archive_path): + os.remove(archive_path) + if os.path.exists("folder1"): + shutil.rmtree("folder1") + if os.path.exists("folder2"): + shutil.rmtree("folder2") + if os.path.exists("archive_folder"): + shutil.rmtree("archive_folder") + + +def test_archivereader_same_folder(test_sigmffile, + test_alternate_sigmffile): + try: + os.makedirs("folder1", exist_ok=True) + test_sigmffile.name = os.path.join("folder1", "test1") + test_alternate_sigmffile.name = os.path.join("folder1", "test2") + meta1_filepath = test_sigmffile.name + SIGMF_METADATA_EXT + with open(meta1_filepath, "w") as meta_fd: + test_sigmffile.dump(meta_fd) + meta2_filepath = test_alternate_sigmffile.name + SIGMF_METADATA_EXT + with open(meta2_filepath, "w") as meta_fd: + test_alternate_sigmffile.dump(meta_fd) + collection = SigMFCollection(metafiles=[meta1_filepath, + meta2_filepath]) + archive_path = os.path.join("folder1", "test_archive.sigmf") + input_sigmffiles = [test_sigmffile, test_alternate_sigmffile] + arch = SigMFArchive(input_sigmffiles, collection, path=archive_path) + reader = SigMFArchiveReader(arch.path) + assert len(reader) == 2 # number of SigMFFiles + for actual_sigmffile in reader: + assert actual_sigmffile in input_sigmffiles + assert collection == reader.collection + finally: + if os.path.exists(meta1_filepath): + os.remove(meta1_filepath) + if os.path.exists(meta2_filepath): + os.remove(meta2_filepath) + if os.path.exists(archive_path): + os.remove(archive_path) + if os.path.exists("folder1"): + shutil.rmtree("folder1") diff --git a/tests/test_sigmffile.py b/tests/test_sigmffile.py index e371964..491a1fa 100644 --- a/tests/test_sigmffile.py +++ b/tests/test_sigmffile.py @@ -26,7 +26,9 @@ import unittest from sigmf import sigmffile, utils -from sigmf.sigmffile import SigMFFile +from sigmf.archivereader import SigMFArchiveReader +from sigmf.sigmffile import SigMFCollection, SigMFFile, fromarchive +from sigmf.archive import SIGMF_DATASET_EXT, SIGMF_METADATA_EXT, SigMFArchive from .testdata import * @@ -35,8 +37,10 @@ class TestClassMethods(unittest.TestCase): def setUp(self): '''assure tests have a valid SigMF object to work with''' _, temp_path = tempfile.mkstemp() - TEST_FLOAT32_DATA.tofile(temp_path) - self.sigmf_object = SigMFFile(TEST_METADATA, data_file=temp_path) + TEST_FLOAT32_DATA_1.tofile(temp_path) + self.sigmf_object = SigMFFile("test", + TEST_METADATA_1, + data_file=temp_path) def test_iterator_basic(self): '''make sure default batch_size works''' @@ -64,39 +68,103 @@ def simulate_capture(sigmf_md, n, capture_len): def test_default_constructor(): - SigMFFile() + SigMFFile(name="test") def test_set_non_required_global_field(): - sigf = SigMFFile() + sigf = SigMFFile(name="test") sigf.set_global_field('this_is:not_in_the_schema', None) def test_add_capture(): - sigf = SigMFFile() + sigf = SigMFFile(name="test") sigf.add_capture(start_index=0, metadata={}) def test_add_annotation(): - sigf = SigMFFile() + sigf = SigMFFile(name="test") sigf.add_capture(start_index=0) meta = {"latitude": 40.0, "longitude": -105.0} sigf.add_annotation(start_index=0, length=128, metadata=meta) +def test_add_annotation_with_duplicate_key(): + f = SigMFFile(name="test") + f.add_capture(start_index=0) + m1 = {"latitude": 40.0, "longitude": -105.0} + f.add_annotation(start_index=0, length=128, metadata=m1) + m2 = {"latitude": 50.0, "longitude": -115.0} + f.add_annotation(start_index=0, length=128, metadata=m2) + assert len(f.get_annotations(64)) == 2 + + def test_fromarchive(test_sigmffile): print("test_sigmffile is:\n", test_sigmffile) tf = tempfile.mkstemp()[1] td = tempfile.mkdtemp() - archive_path = test_sigmffile.archive(name=tf) + archive_path = test_sigmffile.archive(file_path=tf) result = sigmffile.fromarchive(archive_path=archive_path, dir=td) - assert result._metadata == test_sigmffile._metadata == TEST_METADATA + assert result == test_sigmffile os.remove(tf) shutil.rmtree(td) +def test_fromarchive_multi_recording(test_sigmffile, + test_alternate_sigmffile, + test_alternate_sigmffile_2): + # single recording + with tempfile.NamedTemporaryFile(suffix=".sigmf") as t_file: + path = t_file.name + test_sigmffile.archive(fileobj=t_file) + single_sigmffile = fromarchive(path) + assert isinstance(single_sigmffile, SigMFFile) + assert single_sigmffile == test_sigmffile + + # 2 recordings + with tempfile.NamedTemporaryFile(suffix=".sigmf") as t_file: + path = t_file.name + input_sigmffiles = [test_sigmffile, test_alternate_sigmffile] + SigMFArchive(input_sigmffiles, fileobj=t_file) + sigmffile_one, sigmffile_two = fromarchive(path) + assert isinstance(sigmffile_one, SigMFFile) + assert sigmffile_one == test_sigmffile + assert isinstance(sigmffile_two, SigMFFile) + assert sigmffile_two == test_alternate_sigmffile + + # 3 recordings + with tempfile.NamedTemporaryFile(suffix=".sigmf") as t_file: + path = t_file.name + input_sigmffiles = [test_sigmffile, + test_alternate_sigmffile, + test_alternate_sigmffile_2] + SigMFArchive(input_sigmffiles, fileobj=t_file) + list_of_sigmffiles = fromarchive(path) + assert len(list_of_sigmffiles) == 3 + assert isinstance(list_of_sigmffiles[0], SigMFFile) + assert list_of_sigmffiles[0] == test_sigmffile + assert isinstance(list_of_sigmffiles[1], SigMFFile) + assert list_of_sigmffiles[1] == test_alternate_sigmffile + assert isinstance(list_of_sigmffiles[2], SigMFFile) + assert list_of_sigmffiles[2] == test_alternate_sigmffile_2 + + +def test_fromarchive_multirec_with_collection(test_sigmffile, + test_alternate_sigmffile): + with tempfile.NamedTemporaryFile(delete=True) as tf: + # Create a multi-recording archive with collection + in_sigmffiles = [test_sigmffile, test_alternate_sigmffile] + in_collection = SigMFCollection(in_sigmffiles) + arch = SigMFArchive(in_sigmffiles, + collection=in_collection, + path=tf.name) + out_sigmffiles, out_collection = fromarchive(archive_path=arch.path) + assert len(out_sigmffiles) == 2 + assert in_sigmffiles == out_sigmffiles + assert in_collection == out_collection + + def test_add_multiple_captures_and_annotations(): - sigf = SigMFFile() + sigf = SigMFFile(name="test") for idx in range(3): simulate_capture(sigf, idx, 1024) @@ -124,6 +192,7 @@ def test_multichannel_types(): # for real or complex check_count = raw_count * 1 # deepcopy temp_signal = SigMFFile( + name="test", data_file=temp_path, global_info={ SigMFFile.DATATYPE_KEY: f'{complex_prefix}{key}_le', @@ -149,6 +218,7 @@ def test_multichannel_seek(): # write some dummy data and read back np.arange(18, dtype=np.uint16).tofile(temp_path) temp_signal = SigMFFile( + name="test", data_file=temp_path, global_info={ SigMFFile.DATATYPE_KEY: 'cu16_le', @@ -163,7 +233,7 @@ def test_multichannel_seek(): def test_key_validity(): '''assure the keys in test metadata are valid''' - for top_key, top_val in TEST_METADATA.items(): + for top_key, top_val in TEST_METADATA_1.items(): if type(top_val) is dict: for core_key in top_val.keys(): assert core_key in vars(SigMFFile)[f'VALID_{top_key.upper()}_KEYS'] @@ -178,7 +248,7 @@ def test_key_validity(): def test_ordered_metadata(): '''check to make sure the metadata is sorted as expected''' - sigf = SigMFFile() + sigf = SigMFFile(name="test") top_sort_order = ['global', 'captures', 'annotations'] for kdx, key in enumerate(sigf.ordered_metadata()): assert kdx == top_sort_order.index(key) @@ -242,3 +312,104 @@ def test_captures_checking(): assert (160,224) == sigmf4.get_capture_byte_boundarys(1) assert np.array_equal(np.array(range(64)), sigmf4.read_samples_in_capture(0,autoscale=False)[:,0]) assert np.array_equal(np.array(range(64,96)), sigmf4.read_samples_in_capture(1,autoscale=False)[:,1]) + + +def test_archive_collection(test_sigmffile, + test_alternate_sigmffile, + test_alternate_sigmffile_2): + sigmf_meta_files = [ + test_sigmffile.name + SIGMF_METADATA_EXT, + test_alternate_sigmffile.name + SIGMF_METADATA_EXT, + test_alternate_sigmffile_2.name + SIGMF_METADATA_EXT + ] + input_sigmf_files = [test_sigmffile, + test_alternate_sigmffile, + test_alternate_sigmffile_2] + data = [TEST_FLOAT32_DATA_1, TEST_FLOAT32_DATA_2, TEST_FLOAT32_DATA_3] + try: + for sigmf_meta_file, sigmf_file, _data in zip(sigmf_meta_files, + input_sigmf_files, + data): + with open(sigmf_meta_file, mode="w") as sigmf_meta_fd: + sigmf_file.dump(sigmf_meta_fd) + sample_data = sigmf_file.read_samples(autoscale=False, + raw_components=True) + assert np.array_equal(sample_data, _data) + sample_data.tofile(sigmf_file.name + SIGMF_DATASET_EXT) + test_collection = sigmffile.SigMFCollection(sigmf_meta_files) + with tempfile.NamedTemporaryFile(suffix=".sigmf") as tmpfile: + archive_path = test_collection.archive(fileobj=tmpfile) + archive_reader = SigMFArchiveReader(path=archive_path) + for input_sigmf_file in input_sigmf_files: + assert input_sigmf_file in archive_reader.sigmffiles + assert test_collection == archive_reader.collection + with tempfile.NamedTemporaryFile(suffix=".sigmf") as tmpfile: + test_collection.tofile(tmpfile.name, toarchive=True) + archive_reader = SigMFArchiveReader(path=tmpfile.name) + for input_sigmf_file in input_sigmf_files: + assert input_sigmf_file in archive_reader.sigmffiles + assert test_collection == archive_reader.collection + for input_sigmf_file in input_sigmf_files: + assert input_sigmf_file in test_collection.sigmffiles + finally: + for sigmf_meta_file in sigmf_meta_files: + if os.path.exists(sigmf_meta_file): + os.remove(sigmf_meta_file) + for sigmf_file in input_sigmf_files: + filename = sigmf_file.name + SIGMF_DATASET_EXT + if os.path.exists(filename): + os.remove(filename) + + +def test_create_collection_with_sigmffiles(test_sigmffile, + test_alternate_sigmffile, + test_alternate_sigmffile_2): + input_sigmf_files = [test_sigmffile, + test_alternate_sigmffile, + test_alternate_sigmffile_2] + collection = SigMFCollection(metafiles=input_sigmf_files) + output_stream_names = collection.get_stream_names() + output_sigmf_files_by_name = [] + for stream_name in output_stream_names: + output_sigmf_file = collection.get_SigMFFile(stream_name=stream_name) + output_sigmf_files_by_name.append(output_sigmf_file) + output_sigmf_files_by_index = [] + for i in range(len(collection)): + output_sigmf_file = collection.get_SigMFFile(stream_index=i) + output_sigmf_files_by_index.append(output_sigmf_file) + for input_sigmf in input_sigmf_files: + assert input_sigmf.name in output_stream_names + assert input_sigmf in output_sigmf_files_by_name + assert input_sigmf in output_sigmf_files_by_index + + +def test_collection_set_sigmffiles(test_sigmffile, + test_alternate_sigmffile, + test_alternate_sigmffile_2): + try: + input_sigmf_files = [test_sigmffile, + test_alternate_sigmffile, + test_alternate_sigmffile_2] + third_sigmf_meta_filename = test_alternate_sigmffile_2.name + SIGMF_METADATA_EXT + streams_input = [test_sigmffile, test_alternate_sigmffile, third_sigmf_meta_filename] + with open(third_sigmf_meta_filename, "w") as out_f: + test_alternate_sigmffile_2.dump(out_f) + + collection = SigMFCollection(metafiles=[test_sigmffile]) + collection.set_streams(streams_input) + output_stream_names = collection.get_stream_names() + output_sigmf_files_by_name = [] + for stream_name in output_stream_names: + output_sigmf_file = collection.get_SigMFFile(stream_name=stream_name) + output_sigmf_files_by_name.append(output_sigmf_file) + output_sigmf_files_by_index = [] + for i in range(len(collection)): + output_sigmf_file = collection.get_SigMFFile(stream_index=i) + output_sigmf_files_by_index.append(output_sigmf_file) + for input_sigmf in input_sigmf_files: + assert input_sigmf.name in output_stream_names + assert input_sigmf in output_sigmf_files_by_name + assert input_sigmf in output_sigmf_files_by_index + finally: + if os.path.exists(third_sigmf_meta_filename): + os.remove(third_sigmf_meta_filename) diff --git a/tests/test_validation.py b/tests/test_validation.py index 75cf048..57a186c 100644 --- a/tests/test_validation.py +++ b/tests/test_validation.py @@ -26,18 +26,19 @@ from jsonschema.exceptions import ValidationError -from .testdata import TEST_FLOAT32_DATA, TEST_METADATA +from .testdata import TEST_FLOAT32_DATA_1, TEST_METADATA_1 def test_valid_data(): '''assure the supplied metadata is OK''' - invalid_metadata = dict(TEST_METADATA) - SigMFFile(TEST_METADATA).validate() + invalid_metadata = dict(TEST_METADATA_1) + SigMFFile("test", TEST_METADATA_1).validate() + class FailingCases(unittest.TestCase): '''Cases where the validator should throw an exception.''' def setUp(self): - self.metadata = dict(TEST_METADATA) + self.metadata = dict(TEST_METADATA_1) def test_extra_top_level_key(self): '''no extra keys allowed on the top level''' @@ -45,7 +46,7 @@ def test_extra_top_level_key(self): with self.assertRaises(ValidationError): SigMFFile(self.metadata).validate() - def test_extra_top_level_key(self): + def test_invalid_label(self): '''label must be less than 20 chars''' self.metadata[SigMFFile.ANNOTATION_KEY][0][SigMFFile.LABEL_KEY] = 'a' * 21 with self.assertRaises(ValidationError): @@ -83,7 +84,7 @@ def test_invalid_annotation_order(self): def test_invalid_hash(self): _, temp_path = tempfile.mkstemp() - TEST_FLOAT32_DATA.tofile(temp_path) + TEST_FLOAT32_DATA_1.tofile(temp_path) self.metadata[SigMFFile.GLOBAL_KEY][SigMFFile.HASH_KEY] = 'derp' with self.assertRaises(sigmf.error.SigMFFileError): - SigMFFile(metadata=self.metadata, data_file=temp_path) + SigMFFile(name="test", metadata=self.metadata, data_file=temp_path) diff --git a/tests/testdata.py b/tests/testdata.py index 0a0d5ed..db55c56 100644 --- a/tests/testdata.py +++ b/tests/testdata.py @@ -25,9 +25,9 @@ from sigmf import __version__ from sigmf import SigMFFile -TEST_FLOAT32_DATA = np.arange(16, dtype=np.float32) +TEST_FLOAT32_DATA_1 = np.arange(16, dtype=np.float32) -TEST_METADATA = { +TEST_METADATA_1 = { SigMFFile.ANNOTATION_KEY: [{SigMFFile.LENGTH_INDEX_KEY: 16, SigMFFile.START_INDEX_KEY: 0}], SigMFFile.CAPTURE_KEY: [{SigMFFile.START_INDEX_KEY: 0}], SigMFFile.GLOBAL_KEY: { @@ -38,6 +38,32 @@ } } +TEST_FLOAT32_DATA_2 = np.arange(16, 32, dtype=np.float32) + +TEST_METADATA_2 = { + SigMFFile.ANNOTATION_KEY: [{SigMFFile.LENGTH_INDEX_KEY: 16, SigMFFile.START_INDEX_KEY: 0}], + SigMFFile.CAPTURE_KEY: [{SigMFFile.START_INDEX_KEY: 0}], + SigMFFile.GLOBAL_KEY: { + SigMFFile.DATATYPE_KEY: 'rf32_le', + SigMFFile.HASH_KEY: 'a85018cf117a4704596c0f360dbc3fce2d0d561966d865b9b8a356634161bde6a528c5181837890a9f4d54243e2e8eaf7e19bd535e54e3e34aabf76793723d03', + SigMFFile.NUM_CHANNELS_KEY: 1, + SigMFFile.VERSION_KEY: __version__ + } +} + +TEST_FLOAT32_DATA_3 = np.arange(32, 48, dtype=np.float32) + +TEST_METADATA_3 = { + SigMFFile.ANNOTATION_KEY: [{SigMFFile.LENGTH_INDEX_KEY: 16, SigMFFile.START_INDEX_KEY: 0}], + SigMFFile.CAPTURE_KEY: [{SigMFFile.START_INDEX_KEY: 0}], + SigMFFile.GLOBAL_KEY: { + SigMFFile.DATATYPE_KEY: 'rf32_le', + SigMFFile.HASH_KEY: '089753bd48a1724c485e822eaf4d510491e4e54faa83cc3e7b3f18a9f651813190862aa97c922278454c66f20a741050762e008cbe4f96f3bd0dcdb7d720179d', + SigMFFile.NUM_CHANNELS_KEY: 1, + SigMFFile.VERSION_KEY: __version__ + } +} + # Data0 is a test of a compliant two capture recording TEST_U8_DATA0 = list(range(256)) TEST_U8_META0 = {