wirefisher:kafka推送数据
skaiuijing
前言
在之前,我们使用eBPF为Linux内核网络子系统编写程序,实现了流量在ip协议、进程、进程组、网卡等级别的观察及流量控制,为了实现分布式观察及控制,我们需要采集内核网络信息并推送。
在内核中,采集的信息会通过ringbuf_poll等操作递交给应用层,我们可以将数据转化为json格式并进行推送。
这样就实现了解耦合,并且可以实现分布式应用。
我们要用到的消息队列框架就是Kafka。
其实也可以使用Prometheus这些框架,但是笔者还要重新编写http端口,而且也并不是很适配,毕竟笔者的重点在于网络流量控制及分析,所以还是重新写一个前端比较好。
还有一个原因是,笔者最近有点烦躁,找点事情做分散一下注意力。
代码
进入到wirefisher文件夹目录,
在config.yml文件中更改配置:
1 | kafka: |
在事件中,将我们采集的信息转为json,然后调用send方法,发送给对应的topic即可:
1 | nlohmann::json j = { |
使用版本
选择合适的版本,在官网下载后,我们可以直接使用,不过有一些组件我们需要了解。
ZooKeeper(默认端口 2181)
- Kafka 早期版本依赖 ZooKeeper 来做集群的协调者,可以理解为:ZooKeeper 是 Kafka 的记录与选举者。
- 它负责:
- 记录有哪些 broker 存活。
- 哪个 broker 是某个 topic 分区的 leader。
- 保存一些元数据(比如 ACL、配置信息)。
Kafka Broker(默认端口 9092)
- 作用:真正存储和转发消息的服务进程。
- 每个 broker 启动时会去 ZooKeeper 注册自己,例如:“我是 broker-0,我监听在 192.168.253.112:9092”。
- 客户端(Producer/Consumer)只要连上任意一个 broker,就能拿到整个集群的元数据。
Kafka Producer
- 作用:生产消息,写入 Kafka,把采集到的流量 JSON 推送到 Kafka 的topic
Kafka Consumer
- 作用:消费消息,从 Kafka 拉取数据。
- Windows 上的
kafka-console-consumer.bat或 Spring Boot 应用都是Consumer。
数据处理
- wirefisher → Kafka Broker
- wirefisher 在 Ubuntu 上运行,作为 Producer,把 JSON 消息写入 Kafka。
- 它连接到
192.168.253.112:9092,Broker 接收并存储消息。
- Broker → ZooKeeper
- Broker 启动时会向 ZooKeeper 注册自己。
- ZooKeeper 记录“broker-0 在 192.168.253.112:9092”。
- 如果有多个 broker,ZooKeeper 还会帮忙选举 leader。
- Windows Consumer → Kafka Broker
- Windows 上的 Consumer 用
--bootstrap-server 192.168.253.112:9092连接 Kafka。 - Broker 返回元数据:“
wirefisher.flow的 leader 在我这里”。 - Consumer 就直接从 Broker 拉取消息。
- Windows 上的 Consumer 用
- 数据到达 Windows
- 消息通过 TCP 9092 从 Ubuntu 虚拟机传输到 Windows 主机。
- Windows Consumer 打印出 JSON。
步骤
这里的192.168.253.112是笔者虚拟机的ip,读者可以修改为自己的。
进入kafka文件夹,修改server.properties文件配置:
1 | listeners=PLAINTEXT://0.0.0.0:9092 |
在这里,可以启动wirefisher:
1 | sudo ./wirefisher |
在启动kafka之前,我们可以停止之前的进程(如果你不是第一次执行的话):
1 | bin/kafka-server-stop.sh |
然后启动zookeeper:
1 | bin/zookeeper-server-start.sh config/zookeeper.properties |

接着启动服务:
1 | bin/kafka-server-start.sh config/server.properties |

在Windows终端执行:
1 | .\bin\windows\kafka-console-consumer.bat --topic wirefisher.flow --bootstrap-server 192.168.253.112:9092 --from-beginning |
我们可以看见,我们的数据已经成功被推送到了Windows:

结语
可以看到,我们的数据已经被推送到Windows主机,现在,我们可以编写web前后端了。
打开idea,开始编写java代码。
我们使用springboot框架,直接与kafka无缝衔接,在这里,我们需要使用数据库存储数据、分析并处理不同数据(毕竟是面向分布式场景),然后将信息推送到web前端。
不过为了简洁起见,笔者目前仅仅是将数据推送到前端。
其实用Prometheus这些也挺好的,不过笔者懒得重写应用层并编写对应的http接口了。