Skip to content

sgnligo.transforms.latency

An element to calculate latency of buffers.

Latency dataclass

Bases: TransformElement

Calculate latency and prepare data into the format expected by the KafkaSink

Parameters:

Name Type Description Default
route Optional[str]

str, the kafka route to send the latency data to

None
interval Optional[float]

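float, the interval, in seconds, over which latency is aggregated: the maximum latency observed is reported once per interval. If None, a latency value is reported for every frame.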

None
Source code in sgnligo/transforms/latency.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
@dataclass
class Latency(TransformElement):
    """Calculate latency and prepare data into the format expected by the KafkaSink

    Latency is the current time (from ``now()``) minus the start time of the
    most recently pulled frame, expressed in seconds.

    Args:
        route:
            str, the kafka route to send the latency data to
        interval:
            float, the interval to calculate latency, in seconds. If None, a
            latency value is emitted for every frame. Otherwise latencies are
            accumulated and the maximum is emitted once at least ``interval``
            seconds have passed since the previous emission.
    """

    route: Optional[str] = None
    interval: Optional[float] = None

    def __post_init__(self):
        """Validate the configuration and initialise per-instance state."""
        super().__post_init__()
        assert len(self.sink_pads) == 1
        assert isinstance(self.route, str)
        # Most recently pulled frame; None until pull() has been called.
        self.frame = None

        if self.interval is not None:
            # Stored as float seconds, the same representation new() writes
            # back after each emission, so the elapsed-time arithmetic never
            # mixes a time object with a float.
            self.last_time = now().ns() / 1e9
            self.latencies = []

    def pull(self, pad, frame):
        """Remember the incoming frame so new() can measure its latency."""
        self.frame = frame

    def new(self, pad):
        """Calculate buffer latency. Latency is defined as the current time minus
        the buffer start time.

        Returns:
            EventFrame holding a single "kafka" EventBuffer spanning the input
            frame. Its data is ``{route: {"time": [...], "data": [...]}}``, or
            None when an interval is set and has not yet elapsed.

        Raises:
            TypeError: if no frame has been pulled yet, or the pulled frame is
                neither a TSFrame nor an EventFrame.
        """

        frame = self.frame
        time_ns = now().ns()
        if isinstance(frame, TSFrame):
            framets = frame.buffers[0].t0
            framete = frame.buffers[-1].end
        elif isinstance(frame, EventFrame):
            # The first event buffer defines the span of the frame.
            first_event = next(iter(frame.events.values()))
            framets = first_event.ts
            framete = first_event.te
        else:
            # Without this guard framets/framete would be unbound and the
            # failure would surface later as an opaque UnboundLocalError.
            raise TypeError(
                "Latency expects a TSFrame or EventFrame, got "
                f"{type(frame).__name__}"
            )

        # Timestamps are in nanoseconds; report seconds.
        latency = (time_ns - framets) / 1_000_000_000

        if self.interval is None:
            reported = latency
        else:
            self.latencies.append(latency)
            time_s = time_ns / 1e9
            if time_s - self.last_time >= self.interval:
                # Report the worst latency seen in this interval, then reset.
                reported = max(self.latencies)
                self.latencies = []
                self.last_time = time_s
            else:
                reported = None

        if reported is None:
            event_data = None
        else:
            event_data = {
                self.route: {
                    "time": [
                        framets / 1_000_000_000,
                    ],
                    "data": [
                        reported,
                    ],
                }
            }

        return EventFrame(
            events={"kafka": EventBuffer(ts=framets, te=framete, data=event_data)},
            EOS=frame.EOS,
        )

new(pad)

Calculate buffer latency. Latency is defined as the current time minus the buffer start time.

Source code in sgnligo/transforms/latency.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def new(self, pad):
    """Calculate buffer latency. Latency is defined as the current time minus
    the buffer start time.

    Returns:
        EventFrame holding a single "kafka" EventBuffer spanning the input
        frame. Its data is ``{route: {"time": [...], "data": [...]}}``, or
        None when an interval is set and has not yet elapsed.

    Raises:
        TypeError: if the stored frame is neither a TSFrame nor an EventFrame
            (for example, still unset).
    """

    frame = self.frame
    time_ns = now().ns()
    if isinstance(frame, TSFrame):
        framets = frame.buffers[0].t0
        framete = frame.buffers[-1].end
    elif isinstance(frame, EventFrame):
        # The first event buffer defines the span of the frame.
        first_event = next(iter(frame.events.values()))
        framets = first_event.ts
        framete = first_event.te
    else:
        # Without this guard framets/framete would be unbound and the
        # failure would surface later as an opaque UnboundLocalError.
        raise TypeError(
            "Latency expects a TSFrame or EventFrame, got "
            f"{type(frame).__name__}"
        )

    # Timestamps are in nanoseconds; report seconds.
    latency = (time_ns - framets) / 1_000_000_000

    if self.interval is None:
        reported = latency
    else:
        self.latencies.append(latency)
        time_s = time_ns / 1e9
        if time_s - self.last_time >= self.interval:
            # Report the worst latency seen in this interval, then reset.
            reported = max(self.latencies)
            self.latencies = []
            self.last_time = time_s
        else:
            reported = None

    if reported is None:
        event_data = None
    else:
        event_data = {
            self.route: {
                "time": [
                    framets / 1_000_000_000,
                ],
                "data": [
                    reported,
                ],
            }
        }

    return EventFrame(
        events={"kafka": EventBuffer(ts=framets, te=framete, data=event_data)},
        EOS=frame.EOS,
    )