skaiuijing

前言

在之前,我们使用eBPF为Linux内核网络子系统编写程序,实现了流量在ip协议、进程、进程组、网卡等级别的观察及流量控制,为了实现分布式观察及控制,我们需要采集内核网络信息并推送。

在内核中,采集的信息会通过ringbuf_poll等操作递交给应用层,我们可以将数据转化为json格式并进行推送。

这样就实现了解耦合,并且可以实现分布式应用。

我们要用到的消息队列框架就是Kafka。

其实也可以使用Prometheus这些框架,但是笔者还要重新编写http端口,而且也并不是很适配,毕竟笔者的重点在于网络流量控制及分析,所以还是重新写一个前端比较好。

还有一个原因是,笔者最近有点烦躁,找点事情做分散一下注意力。

代码

进入到wirefisher文件夹目录,

在config.yml文件中更改配置:

1
2
3
kafka:
brokers: "你的ip地址:9092"
topic: "wirefisher.flow"

在事件中,将我们采集的信息转为json,然后调用send方法,发送给对应的topic即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
nlohmann::json j = {
{"src_ip", ip_to_string(e->tuple.src_ip)},
{"dst_ip", ip_to_string(e->tuple.dst_ip)},
{"src_port", ntohs(e->tuple.src_port)},
{"dst_port", ntohs(e->tuple.dst_port)},
{"protocol", protocol_to_string(e->tuple.protocol)},
{"instant_rate_bps", e->instance_rate_bps / (1024.0 * 1024.0)},
{"rate_bps", e->rate_bps / (1024.0 * 1024.0)},
{"peak_rate_bps", e->peak_rate_bps / (1024.0 * 1024.0)},
{"smoothed_rate_bps", e->smoothed_rate_bps / (1024.0 * 1024.0)},
{"timestamp", format_elapsed_ns(e->timestamp)}
};

if (g_producer) {
g_producer->send("", j.dump());
}

使用版本

Apache Kafka

选择合适的版本,在官网下载后,我们可以直接使用,不过有一些组件我们需要了解。

ZooKeeper(默认端口 2181)

  • Kafka 早期版本依赖 ZooKeeper 来做集群的协调者,可以理解为:ZooKeeper 是 Kafka 的记录与选举者。
  • 它负责:
    • 记录有哪些 broker 存活。
    • 哪个 broker 是某个 topic 分区的 leader。
    • 保存一些元数据(比如 ACL、配置信息)。

Kafka Broker(默认端口 9092)

  • 作用:真正存储和转发消息的服务进程。
  • 每个 broker 启动时会去 ZooKeeper 注册自己,例如:“我是 broker-0,我监听在 192.168.253.112:9092”。
  • 客户端(Producer/Consumer)只要连上任意一个 broker,就能拿到整个集群的元数据。

Kafka Producer

  • 作用:生产消息,写入 Kafka,把采集到的流量 JSON 推送到 Kafka 的topic

Kafka Consumer

  • 作用:消费消息,从 Kafka 拉取数据。
  • Windows 上的 kafka-console-consumer.bat 或 Spring Boot 应用都是Consumer。

数据处理

  1. wirefisher → Kafka Broker
    • wirefisher 在 Ubuntu 上运行,作为 Producer,把 JSON 消息写入 Kafka。
    • 它连接到 192.168.253.112:9092,Broker 接收并存储消息。
  2. Broker → ZooKeeper
    • Broker 启动时会向 ZooKeeper 注册自己。
    • ZooKeeper 记录“broker-0 在 192.168.253.112:9092”。
    • 如果有多个 broker,ZooKeeper 还会帮忙选举 leader。
  3. Windows Consumer → Kafka Broker
    • Windows 上的 Consumer 用 --bootstrap-server 192.168.253.112:9092 连接 Kafka。
    • Broker 返回元数据:“wirefisher.flow 的 leader 在我这里”。
    • Consumer 就直接从 Broker 拉取消息。
  4. 数据到达 Windows
    • 消息通过 TCP 9092 从 Ubuntu 虚拟机传输到 Windows 主机。
    • Windows Consumer 打印出 JSON。

步骤

这里的192.168.253.112是笔者虚拟机的ip,读者可以修改为自己的。

进入kafka文件夹,修改server.properties文件配置:

1
2
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.253.112:9092

在这里,可以启动wirefisher:

1
sudo ./wirefisher

在启动kafka之前,我们可以停止之前的进程(如果你不是第一次执行的话):

1
2
bin/kafka-server-stop.sh
ps aux | grep kafka

然后启动zookeeper:

1
bin/zookeeper-server-start.sh config/zookeeper.properties

image-20250930210513286

接着启动服务:

1
bin/kafka-server-start.sh config/server.properties

image-20250930210746626

在Windows终端执行:

1
.\bin\windows\kafka-console-consumer.bat --topic wirefisher.flow --bootstrap-server 192.168.253.112:9092 --from-beginning

我们可以看见,我们的数据已经成功被推送到了Windows:

image-20250930210358133

结语

可以看到,我们的数据已经被推送到Windows主机,现在,我们可以编写web前后端了。

打开idea,开始编写java代码。

我们使用springboot框架,直接与kafka无缝衔接,在这里,我们需要使用数据库存储数据、分析并处理不同数据(毕竟是面向分布式场景),然后将信息推送到web前端。

不过为了简洁起见,笔者目前仅仅是将数据推送到前端。

其实用Prometheus这些也挺好的,不过笔者懒得重写应用层并编写对应的http接口了。