Skip to content

sgnligo.sinks.kafka_sink

A sink element to send data to kafka topics.

KafkaSink dataclass

Bases: SinkElement

Send data to kafka topics

Parameters:

Name Type Description Default
output_kafka_server Optional[str]

str, The kafka server to write data to

None
time_series_topics Optional[list[str]]

list[str], The kafka topics to write time-series data to

None
trigger_topics Optional[list[str]]

list[str], The kafka topics to write trigger data to

None
tag Optional[list[str]]

list[str], The tags to write the kafka data with

None
prefix str

str, The prefix of the kafka topic

''
interval Optional[float]

float, The interval at which to write the data to kafka

None
Source code in sgnligo/sinks/kafka_sink.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
@dataclass
class KafkaSink(SinkElement):
    """Send data to kafka topics.

    Buffers incoming time-series and trigger events per topic and
    periodically flushes them to the configured Kafka server.

    Args:
        output_kafka_server:
            str, The kafka server to write data to (required)
        time_series_topics:
            list[str], The kafka topics to write time-series data to
        trigger_topics:
            list[str], The kafka topics to write trigger data to
        tag:
            list[str], The tags to write the kafka data with
        prefix:
            str, The prefix of the kafka topic
        interval:
            float, The interval at which to write the data to kafka;
            if None, buffered data is written on every internal() cycle
    """

    output_kafka_server: Optional[str] = None
    time_series_topics: Optional[list[str]] = None
    trigger_topics: Optional[list[str]] = None
    tag: Optional[list[str]] = None
    prefix: str = ""
    interval: Optional[float] = None

    def __post_init__(self):
        # Validate explicitly rather than with `assert`, which is stripped
        # when Python runs with -O and would silently skip this check.
        if not isinstance(self.output_kafka_server, str):
            raise ValueError("output_kafka_server must be provided as a string")
        super().__post_init__()

        self.client = kafka.Client("kafka://{}".format(self.output_kafka_server))
        if self.tag is None:
            self.tag = []

        # Per-topic accumulation buffers; None when the corresponding
        # topic list is not configured.
        if self.time_series_topics is not None:
            self.time_series_data = {
                topic: {"time": [], "data": []} for topic in self.time_series_topics
            }
        else:
            self.time_series_data = None

        if self.trigger_topics is not None:
            self.trigger_data = {topic: [] for topic in self.trigger_topics}
        else:
            self.trigger_data = None

        # Timestamp of the last flush, used for interval-based throttling.
        self.last_sent = now()

    def write(self):
        """Flush every non-empty per-topic buffer to Kafka and reset it."""
        if self.time_series_data is not None:
            for topic, data in self.time_series_data.items():
                if len(data["time"]) > 0:
                    self.client.write(self.prefix + topic, data, tags=self.tag)
                    self.time_series_data[topic] = {"time": [], "data": []}

        if self.trigger_data is not None:
            for topic, data in self.trigger_data.items():
                if len(data) > 0:
                    self.client.write(self.prefix + topic, data, tags=self.tag)
                    self.trigger_data[topic] = []

    def pull(self, pad, frame):
        """Incoming frames are expected to be an EventFrame containing {"kafka":
        EventBuffer}. The data in the EventBuffer are expected to be in the
        format of {topic: {"time": [t1, t2, ...], "data": [d1, d2, ...]}}
        """
        events = frame["kafka"].data
        if events is not None:
            for topic, data in events.items():
                if (
                    self.time_series_topics is not None
                    and topic in self.time_series_topics
                ):
                    self.time_series_data[topic]["time"].extend(data["time"])
                    self.time_series_data[topic]["data"].extend(data["data"])
                elif self.trigger_topics is not None and topic in self.trigger_topics:
                    self.trigger_data[topic].extend(data)
                # Topics not present in either configured list are ignored.

        if frame.EOS:
            self.mark_eos(pad)

    def internal(self):
        """Flush buffered data (honoring `interval`); on EOS, flush and close."""
        if self.interval is None:
            # No throttling configured: write on every cycle.
            self.write()
        else:
            time_now = now()
            if time_now - self.last_sent > self.interval:
                self.write()
                self.last_sent = time_now

        if self.at_eos:
            # Flush any remaining buffered data before closing; otherwise
            # events received since the last interval tick would be lost.
            self.write()
            print("shutdown: KafkaSink: close")
            # NOTE(review): internal() may run again after EOS is reached;
            # this assumes client.close() is safe to call repeatedly — confirm.
            self.client.close()

pull(pad, frame)

Incoming frames are expected to be an EventFrame containing {"kafka": EventBuffer}. The data in the EventBuffer are expected to be in the format of {topic: {"time": [t1, t2, ...], "data": [d1, d2, ...]}}

Source code in sgnligo/sinks/kafka_sink.py
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def pull(self, pad, frame):
    """Accumulate incoming kafka events into the per-topic buffers.

    Incoming frames are expected to be an EventFrame containing
    {"kafka": EventBuffer}, whose data has the shape
    {topic: {"time": [t1, t2, ...], "data": [d1, d2, ...]}}.
    Marks this pad EOS when the frame signals end-of-stream.
    """
    incoming = frame["kafka"].data
    if incoming is not None:
        ts_topics = self.time_series_topics
        trig_topics = self.trigger_topics
        for name, payload in incoming.items():
            if ts_topics is not None and name in ts_topics:
                bucket = self.time_series_data[name]
                bucket["time"].extend(payload["time"])
                bucket["data"].extend(payload["data"])
            elif trig_topics is not None and name in trig_topics:
                self.trigger_data[name].extend(payload)

    if frame.EOS:
        self.mark_eos(pad)