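In Java projects we need both unit tests and integration tests. These tests often depend on external systems such as Kafka or a database. The usual approaches are to mock these services or to set up a dedicated test environment. Mocks cannot fully reproduce the behavior of the real system, and building a test environment takes considerable effort. A test environment may also be shared by several test projects, which can then interfere with one another.
Testcontainers is an open-source project that provides the services a test needs by starting them from Docker images in throwaway containers. Its official website lists support for many common services. Here I tried using Testcontainers to write a unit test.
Suppose we have the following class. Its method takes a topic, a start timestamp and a stop timestamp. It determines which partitions of the topic are relevant and what the actual cut-off timestamp is, so that the messages within the requested time range can then be fetched. The code is as follows: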
```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CheckKafkaMsgTimestamp {
    private static final Logger LOG = LoggerFactory.getLogger(CheckKafkaMsgTimestamp.class);

    public static KafkaResult getTimestamp(String bootstrapServer, String topic, long startTimestamp, long stopTimestamp) {
        long max_timestamp = stopTimestamp;
        long max_records = 5L; // how many of the most recent records to inspect per partition
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServer);
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        ArrayList<TopicPartition> selected_tp = new ArrayList<>();
        // try-with-resources closes the consumer even if an exception is thrown
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Get all the partitions of the topic
            int partition_num = consumer.partitionsFor(topic).size();
            HashMap<TopicPartition, Long> search_map = new HashMap<>();
            ArrayList<TopicPartition> tp = new ArrayList<>();
            for (int i = 0; i < partition_num; i++) {
                search_map.put(new TopicPartition(topic, i), stopTimestamp);
                tp.add(new TopicPartition(topic, i));
            }
            // Check whether every partition has a message with timestamp >= stopTimestamp
            boolean flag = true;
            Map<TopicPartition, OffsetAndTimestamp> results = consumer.offsetsForTimes(search_map);
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : results.entrySet()) {
                if (entry.getValue() == null) { // at least one partition has no message at or after stopTimestamp
                    flag = false;
                    break;
                }
            }
            // If the check above fails, look up the latest timestamp of each partition.
            // The returned timestamp is the earliest of these per-partition latest timestamps.
            if (!flag) {
                max_timestamp = 0L;
                Map<TopicPartition, Long> endoffsets = consumer.endOffsets(tp);
                for (Map.Entry<TopicPartition, Long> entry : endoffsets.entrySet()) {
                    long temp_timestamp = 0L;
                    int record_count = 0;
                    TopicPartition t = entry.getKey();
                    long offset = entry.getValue(); // end offset: the offset the next message would get
                    if (offset < 1) {
                        LOG.warn("Can not get max_timestamp as partition {} has no record!", t);
                        continue;
                    }
                    consumer.assign(Collections.singletonList(t));
                    // Rewind to the last max_records records, or to the start of a shorter partition
                    consumer.seek(t, offset > max_records ? offset - max_records : 0);

                    Iterator<ConsumerRecord<String, String>> records = consumer.poll(Duration.ofSeconds(2)).iterator();
                    while (records.hasNext()) {
                        record_count++;
                        ConsumerRecord<String, String> record = records.next();
                        LOG.info("Topic: {}, Record Timestamp: {}, recordcount: {}", t, record.timestamp(), record_count);
                        if (record.timestamp() > temp_timestamp) {
                            temp_timestamp = record.timestamp();
                        }
                    }
                    // Keep the partition only if its latest message is after startTimestamp
                    if (temp_timestamp > 0L && temp_timestamp > startTimestamp) {
                        if (max_timestamp == 0L || max_timestamp > temp_timestamp) {
                            max_timestamp = temp_timestamp;
                        }
                        selected_tp.add(t);
                    }
                }
            } else {
                selected_tp = tp;
            }
        }
        LOG.info("Max Timestamp: {}", max_timestamp);
        return new KafkaResult(max_timestamp, selected_tp);
    }
}
```
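The selection rule is easier to follow without Kafka in the way. Below is a minimal sketch of the same decision logic. It is an illustration under stated assumptions, not the original code. Each partition is modeled as an in-memory list of record timestamps whose list index is the offset, which assumes contiguous offsets. The names TimestampSelectionSketch, Result, windowStart and select are hypothetical. Step 1 mirrors offsetsForTimes, which returns null for a partition with no record at or after the given timestamp. Step 2 looks only at the last n records of each partition, as the method does. windowStart mirrors the seek arithmetic: the end offset is the offset the next record would get, so the last n records occupy offsets [end - n, end).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, Kafka-free model of CheckKafkaMsgTimestamp's decision logic.
// Each partition is a list of record timestamps; list index == offset (assumes contiguous offsets).
final class TimestampSelectionSketch {

    // Hypothetical holder mirroring the max_timestamp / selected_tp fields of KafkaResult
    static final class Result {
        final long maxTimestamp;
        final List<Integer> selectedPartitions;

        Result(long maxTimestamp, List<Integer> selectedPartitions) {
            this.maxTimestamp = maxTimestamp;
            this.selectedPartitions = selectedPartitions;
        }
    }

    // First offset of the window holding the last n records, given the end offset
    static long windowStart(long endOffset, long n) {
        return endOffset > n ? endOffset - n : 0L;
    }

    static Result select(Map<Integer, List<Long>> partitions, long start, long stop, long n) {
        // Step 1: does every partition contain at least one record with timestamp >= stop?
        boolean allReachStop = true;
        for (List<Long> timestamps : partitions.values()) {
            boolean reaches = false;
            for (long ts : timestamps) {
                if (ts >= stop) {
                    reaches = true;
                    break;
                }
            }
            if (!reaches) {
                allReachStop = false;
                break;
            }
        }
        if (allReachStop) {
            return new Result(stop, new ArrayList<>(partitions.keySet()));
        }
        // Step 2: latest timestamp among the last n records of each partition;
        // keep partitions whose latest timestamp is after start, report the earliest of those.
        long max = 0L;
        List<Integer> selected = new ArrayList<>();
        for (Map.Entry<Integer, List<Long>> e : partitions.entrySet()) {
            List<Long> timestamps = e.getValue();
            int from = (int) windowStart(timestamps.size(), n);
            long latest = 0L;
            for (long ts : timestamps.subList(from, timestamps.size())) {
                latest = Math.max(latest, ts);
            }
            if (latest > 0L && latest > start) {
                if (max == 0L || latest < max) {
                    max = latest;
                }
                selected.add(e.getKey());
            }
        }
        return new Result(max, selected);
    }
}
```

Example: use base ts = 1000 and partitions holding {1100, 1300}, {1110, 1200} and {1105, 1250}, with a stop time later than every record. A start of 1000 selects all three partitions with 1200. A start of 1500 selects none and gives 0. A start of 1205 selects partitions 0 and 2 with 1250.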
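Now we want to unit-test this method, and Testcontainers can start a Kafka environment for it. First, add the following dependencies to the dependencies section of pom.xml: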
```xml
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>testcontainers</artifactId>
    <version>1.19.1</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.19.1</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13.2</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.hamcrest</groupId>
    <artifactId>hamcrest-all</artifactId>
    <version>1.3</version>
    <scope>test</scope>
</dependency>
```
Then write the following test code:
```java
import static org.junit.Assert.assertEquals;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.junit.After;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.junit.runners.MethodSorters;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@RunWith(JUnit4.class)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class CheckKafkaMsgTimestampTest {
    private String bootstrapServer;
    private Producer<String, String> producer;

    // As a JUnit 4 rule, the container is started before and stopped after each test method
    @Rule
    public KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

    @Before
    public void setUp() throws Exception {
        kafka.start(); // already started by the @Rule; calling start() again has no effect
        bootstrapServer = kafka.getBootstrapServers();
        // Create a test topic with 3 partitions
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        try (Admin admin = Admin.create(adminProps)) {
            int partitions = 3;
            short replicationFactor = 1;
            NewTopic newTopic = new NewTopic("test", partitions, replicationFactor);
            CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic));
            KafkaFuture<Void> future = result.values().get("test");
            future.get(); // block until the topic exists; a failure here fails the test
        }

        // Create a producer
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServer);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("linger.ms", 1);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    @After
    public void tearDown() {
        producer.close();
        kafka.stop();
    }

    @Test
    public void testGetTimestamp() {
        // Prepare 6 messages for the test topic; each partition receives 2 messages.
        // All timestamps lie in the past, so none reaches a stop timestamp of "now".
        long ts = System.currentTimeMillis() - 5000L;
        producer.send(new ProducerRecord<>("test", 0, ts + 100, "", "test message"));
        producer.send(new ProducerRecord<>("test", 0, ts + 300, "", "test message"));
        producer.send(new ProducerRecord<>("test", 1, ts + 110, "", "test message"));
        producer.send(new ProducerRecord<>("test", 1, ts + 200, "", "test message"));
        producer.send(new ProducerRecord<>("test", 2, ts + 105, "", "test message"));
        producer.send(new ProducerRecord<>("test", 2, ts + 250, "", "test message"));
        producer.flush(); // send() is asynchronous: make sure all records are written before querying

        KafkaResult result = CheckKafkaMsgTimestamp.getTimestamp(bootstrapServer, "test", ts, System.currentTimeMillis());
        assertEquals(ts + 200, result.max_timestamp);
        assertEquals(3, result.selected_tp.size());

        result = CheckKafkaMsgTimestamp.getTimestamp(bootstrapServer, "test", ts + 500, System.currentTimeMillis());
        assertEquals(0, result.max_timestamp);
        assertEquals(0, result.selected_tp.size());

        result = CheckKafkaMsgTimestamp.getTimestamp(bootstrapServer, "test", ts + 205, System.currentTimeMillis());
        assertEquals(ts + 250, result.max_timestamp);
        assertEquals(2, result.selected_tp.size());
    }
}
```
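A quick explanation of the code. The @Rule field uses Testcontainers to declare a KafkaContainer based on the confluentinc/cp-kafka:6.2.1 image. Because it is a JUnit 4 rule, the container is started before each test method (before @Before runs) and stopped after it. The explicit kafka.start() in setUp and kafka.stop() in tearDown are therefore redundant but harmless. In setUp we obtain the bootstrap server address and use the Kafka Admin client to create a topic named test with 3 partitions.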
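In testGetTimestamp we first send messages with explicit timestamps to specific partitions. We then call CheckKafkaMsgTimestamp.getTimestamp and verify the result. All message timestamps lie in the past, because ts is 5 seconds before the current time. When the stop timestamp is the current time, no partition contains a message at or after it, so the method falls back to each partition's latest timestamp: ts+300, ts+200 and ts+250 for partitions 0, 1 and 2. With a start timestamp of ts, all three partitions qualify and the earliest of these is ts+200. With ts+500, none qualify, so the result is 0 with no partitions. With ts+205, only partitions 0 and 2 qualify, giving ts+250.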
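If several test classes need the same Kafka instance, we can create an abstract base class that defines and starts the Kafka container, and have the other test classes extend it. For example: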
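```java
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public abstract class AbstractContainerBaseTest {
    // Created and started once in a static initializer, then shared by all subclasses
    public static final KafkaContainer KAFKA_CONTAINER;

    static {
        KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
        KAFKA_CONTAINER.start();
    }
}
```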
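This sharing relies on ordinary Java class initialization. A static initializer runs once per class loader, the first time the class is initialized, so every subclass sees the same KAFKA_CONTAINER. Nothing in this pattern stops the container explicitly. Testcontainers' Ryuk helper container removes it when the JVM exits. If the build tool forks several JVMs for tests, each fork would presumably start its own container. The Kafka-free sketch below counts how often such a static block runs. All class names in it are hypothetical, and the static block merely stands in for "create and start the container".

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for AbstractContainerBaseTest: the static block plays the role of
// "create and start the container", and START_COUNT records how many times it ran.
abstract class SharedResourceBase {
    static final AtomicInteger START_COUNT = new AtomicInteger();
    static final String RESOURCE;

    static {
        // Runs once, when SharedResourceBase is first initialized in this class loader
        RESOURCE = "resource-" + START_COUNT.incrementAndGet();
    }
}

// Two hypothetical test classes that share the base class
class FirstSharedUser extends SharedResourceBase {
    String resource() {
        return RESOURCE;
    }
}

class SecondSharedUser extends SharedResourceBase {
    String resource() {
        return RESOURCE;
    }
}
```

Instantiating both subclasses in one JVM returns the same RESOURCE value, and START_COUNT stays at 1.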
Then modify the earlier test class accordingly:
- public class CheckKafkaMsgTimestampTest extends AbstractContainerBaseTest {
- //@Rule
- //public KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
-
- @Before
- public void setUp() {
- //kafka.start();
- //bootstrapServer = kafka.getBootstrapServers();
- bootstrapServer = KAFKA_CONTAINER.getBootstrapServers();