Skip to content

onset

Onset

https://www.onsetcomp.com/

csv(path, convert_units_to_si=True, read_csv_kwargs=None, standardize_variable_names=True)

Parses the Onset CSV format generated by HOBOware into an xarray object

Inputs

path: The path to the CSV file. convert_units_to_si: Whether to standardize data units to SI units. read_csv_kwargs: Dictionary of keyword arguments to be passed to pd.read_csv. standardize_variable_names: Rename the variables to a standardized naming convention.

Returns:

Type Description
xarray.Dataset

xarray.Dataset

Source code in ocean_data_parser/parsers/onset.py
def csv(
    path: str,
    convert_units_to_si: bool = True,
    read_csv_kwargs: dict = None,
    standardize_variable_names: bool = True,
) -> xarray.Dataset:
    """Parse the Onset CSV format generated by HOBOware into an xarray object.

    Inputs:
        path: The path to the CSV file
        convert_units_to_si: Whether to standardize data units to SI units
        read_csv_kwargs: dictionary of keyword arguments to be passed to
            pd.read_csv
        standardize_variable_names: Rename the variables to a standardized
            naming convention
    Returns:
        xarray.Dataset
    """
    if read_csv_kwargs is None:
        read_csv_kwargs = {}

    # Read the Onset/HOBOware header lines preceding the data table.
    raw_header = []
    with open(
        path,
        encoding=read_csv_kwargs.get("encoding", "UTF-8"),
        errors=read_csv_kwargs.get("encoding_errors"),
    ) as f:
        raw_header += [f.readline().replace("\n", "")]
        header_lines = 1
        if "Serial Number:" in raw_header[0]:
            # skip second empty line
            header_lines += 1
            f.readline()
        # Read csv columns
        raw_header += [f.readline()]

    # Parse onset header
    header, variables = _parse_onset_csv_header(raw_header)

    # Inputs to pd.read_csv: keep only the named columns, in order.
    column_names = [var for var in variables if var]
    df = pd.read_csv(
        path,
        na_values=[" "],
        sep=",",
        engine="python",
        header=header_lines,
        memory_map=True,
        names=column_names,
        usecols=list(range(len(column_names))),
        **read_csv_kwargs,
    )
    # Convert time columns with the timezone given by the Onset header.
    df[header["time_variables"]] = df[header["time_variables"]].applymap(
        lambda x: _parse_onset_time(x, header["timezone"])
    )

    # Convert to dataset
    ds = df.to_xarray()
    ds.attrs = {**global_attributes, **header}
    for var in ds:
        ds[var].attrs = variables[var]

    if standardize_variable_names:
        ds = ds.rename_vars(_standardized_variable_mapping(ds))
        # Detect instrument type based on variables available
        ds.attrs["instrument_type"] = _detect_instrument_type(ds)

    # Review units and convert to the SI system
    if convert_units_to_si:
        if standardize_variable_names:
            if "temperature" in ds and ("C" not in ds["temperature"].attrs["units"]):
                logger.warning(
                    "Temperature in Fahrenheit will be converted to Celsius"
                )
                # Capture the original units BEFORE overwriting them so the
                # history record reflects the units actually converted from
                # (previously it always recorded the new "degC" value).
                original_units = ds["temperature"].attrs["units"]
                ds["temperature"] = _farenheit_to_celsius(ds["temperature"])
                ds["temperature"].attrs["units"] = "degC"
                # NOTE: a trailing space was added after "to" — the previous
                # implicit string concatenation produced "todegree Celsius".
                ds.attrs["history"] += " ".join(
                    [
                        f"{datetime.now()}",
                        f"Convert temperature ({original_units}) to "
                        "degree Celsius [(degF-32)/1.8000]",
                    ]
                )
            if (
                "conductivity" in ds
                and "uS/cm" not in ds["conductivity"].attrs["units"]
            ):
                logger.warning(
                    "Unknown conductivity units (%s)", ds["conductivity"].attrs["units"]
                )
        else:
            logger.warning(
                "Unit conversion is not supported if standardize_variable_names=False"
            )

    # Test daylight saving issue
    # TODO move this daylight saving detection test elsewhere
    dt = ds["time"].diff("index")
    sampling_interval = dt.median().values
    dst_fall = -pd.Timedelta("1h") + sampling_interval
    dst_spring = pd.Timedelta("1h") + sampling_interval
    if any(dt == dst_fall):
        logger.warning(
            (
                "Time gaps (=%s) for sampling interval of %s "
                "suggest a Fall daylight saving issue is present"
            ),
            dst_fall,
            sampling_interval,
        )
    if any(dt == dst_spring):
        logger.warning(
            (
                "Time gaps (=%s) for sampling interval of %s "
                "suggest a Spring daylight saving issue is present"
            ),
            # BUG FIX: previously logged dst_fall in the Spring warning.
            dst_spring,
            sampling_interval,
        )

    ds = standardize_dataset(ds)
    return ds