Skip to content

sgnligo.transforms.bit_mask

An element to apply a bit mask on the incoming data.

BitMask dataclass

Bases: TSTransform

Apply the bit mask on the incoming data.

Parameters:

Name Type Description Default
bit_mask Optional[int]

int, the bit mask to apply on the data. Data is passed through if data_bit & bit_mask == bit_mask. The data not matching the bit mask will be turned into gap buffers.

None
Source code in sgnligo/transforms/bit_mask.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
@dataclass
class BitMask(TSTransform):
    """Apply the bit mask on the incoming data.

    Args:
        bit_mask:
            int, the bit mask to apply on the data. Data is passed through if
            data_bit & bit_mask == bit_mask. The data not matching the bit mask will
            be turned into gap buffers.
    """

    bit_mask: Optional[int] = None

    def __post_init__(self):
        super().__post_init__()

        assert self.bit_mask is not None

        # set state vector on bits
        self.bit_mask = state_vector_on_off_bits(self.bit_mask)

    def new(self, pad: SourcePad) -> TSFrame:
        """Produce non-gap buffers if the bit of the data passes the bit mask,
        otherwise, produce gap buffers.

        Args:
            pad:
                SourcePad, the source pad to produce TSFrames

        Returns:
            TSFrame, the TSFrame carrying buffers with bit mask applied
        """
        frame = self.preparedframes[self.sink_pads[0]]
        metadata = frame.metadata

        outbufs = []
        for buf in frame:
            if not buf.is_gap:
                buf_offset = buf.offset
                sample_rate = buf.sample_rate

                state_flags = [
                    state_vector_on_off_bits(b) & self.bit_mask == self.bit_mask
                    for b in buf.data
                ]

                if all(state_flags):
                    # Pass through
                    outbufs.append(buf)
                elif not any(state_flags):
                    # Full gap
                    buf.set_data(None)
                    outbufs.append(buf)
                else:
                    for b in buf.data:
                        bit = state_vector_on_off_bits(b)
                        if bit & self.bit_mask == self.bit_mask:
                            outbuf = SeriesBuffer(
                                offset=buf_offset,
                                sample_rate=sample_rate,
                                data=np.array([b]),
                                shape=(1,),
                            )
                        else:
                            outbuf = SeriesBuffer(
                                offset=buf_offset,
                                sample_rate=sample_rate,
                                data=None,
                                shape=(1,),
                            )
                        outbufs.append(outbuf)
                        buf_offset = outbuf.end_offset
            else:
                outbufs.append(buf)

        return TSFrame(
            buffers=outbufs,
            metadata=metadata,
            EOS=frame.EOS,
        )

new(pad)

Produce non-gap buffers if the bit of the data passes the bit mask, otherwise, produce gap buffers.

Parameters:

Name Type Description Default
pad SourcePad

SourcePad, the source pad to produce TSFrames

required

Returns:

Type Description
TSFrame

TSFrame, the TSFrame carrying buffers with bit mask applied

Source code in sgnligo/transforms/bit_mask.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def new(self, pad: SourcePad) -> TSFrame:
    """Produce non-gap buffers if the bit of the data passes the bit mask,
    otherwise, produce gap buffers.

    Args:
        pad:
            SourcePad, the source pad to produce TSFrames

    Returns:
        TSFrame, the TSFrame carrying buffers with bit mask applied
    """
    frame = self.preparedframes[self.sink_pads[0]]
    metadata = frame.metadata

    outbufs = []
    for buf in frame:
        if not buf.is_gap:
            buf_offset = buf.offset
            sample_rate = buf.sample_rate

            state_flags = [
                state_vector_on_off_bits(b) & self.bit_mask == self.bit_mask
                for b in buf.data
            ]

            if all(state_flags):
                # Pass through
                outbufs.append(buf)
            elif not any(state_flags):
                # Full gap
                buf.set_data(None)
                outbufs.append(buf)
            else:
                for b in buf.data:
                    bit = state_vector_on_off_bits(b)
                    if bit & self.bit_mask == self.bit_mask:
                        outbuf = SeriesBuffer(
                            offset=buf_offset,
                            sample_rate=sample_rate,
                            data=np.array([b]),
                            shape=(1,),
                        )
                    else:
                        outbuf = SeriesBuffer(
                            offset=buf_offset,
                            sample_rate=sample_rate,
                            data=None,
                            shape=(1,),
                        )
                    outbufs.append(outbuf)
                    buf_offset = outbuf.end_offset
        else:
            outbufs.append(buf)

    return TSFrame(
        buffers=outbufs,
        metadata=metadata,
        EOS=frame.EOS,
    )