编写一个简单的 Flink 测试作业,尝试从 Kafka 中读取数据或将数据写入 Kafka。这可以帮助验证 Flink 是否能够与 Kafka 进行基本的通信。

示例代码:读取 Kafka 数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class KafkaReadTest {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("192.168.23.60:9092")
.setTopics("test_topic")
.setValueOnlyDeserializer(new SimpleStringSchema())
.setStartingOffsets(OffsetsInitializer.latest())
.build();

DataStream<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");

stream.print(); // Print the data to the console for debugging

env.execute("Kafka Read Test");
}
}

示例代码:写入 Kafka 数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class KafkaWriteTest {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

DataStream<String> stream = env.fromElements("test message 1", "test message 2");

KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("192.168.23.60:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.<String>builder()
.setTopic("test_topic")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.build();

stream.sinkTo(kafkaSink);

env.execute("Kafka Write Test");
}
}

2. 使用 Kafka 工具检查

使用 Kafka 的工具验证数据是否到达了 Kafka 主题。

  • 生产者测试:使用 Kafka 的 kafka-console-producer 工具向主题写入测试数据,确保数据能够成功写入 Kafka。

    1
    kafka-console-producer.sh --broker-list 192.168.23.60:9092 --topic test_topic
  • 消费者测试:使用 Kafka 的 kafka-console-consumer 工具从主题读取数据,确保数据能够成功读取。

    1
    kafka-console-consumer.sh --bootstrap-server 192.168.23.60:9092 --topic test_topic --from-beginning
  • 在 Flink 的 Web UI 中查看作业的日志,检查是否有任何错误或警告消息。
  • 查看 Flink 作业的详细日志以了解连接 Kafka 的详细情况。

4. 检查网络连接

确保 Flink 集群能够访问 Kafka 服务器:

  • Ping Kafka 服务器:确认网络连接正常。
  • 端口检查:确保 Kafka 服务器的端口(默认是 9092)没有被防火墙阻挡。

5. 调试 Kafka 配置

  • Bootstrap Servers:确保 Kafka 的 bootstrap.servers 配置正确。
  • 主题名称:确保使用的主题名称正确且存在。
  • 消费者组:如果使用消费者组,确保配置正确且没有冲突。

Flink 提供了示例和测试作业,你可以参考官方文档和示例代码来确认是否正确配置。

通过以上步骤,你可以确认 Flink 是否能够成功连接到 Kafka。如果遇到问题,可以提供具体的错误信息和日志,以便进一步分析和解决。