• Using Testcontainers for Java Testing


    In a Java project we need both unit tests and integration tests, and the tests often depend on external systems such as Kafka or a database. The usual approaches are to mock those services or to set up a dedicated test environment. A mock can never fully reproduce the behavior of the real system, while building a test environment takes considerable effort; moreover, a test environment shared by several projects leads to interference between them.

    Testcontainers is an open-source project that provides the environment a test needs by launching services from container images. Its website lists support for many common services; below is my experiment using Testcontainers for unit testing.
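    As a quick taste of the API, here is a minimal sketch (not from the original post; the image, port, and class name are arbitrary examples) that starts a container and reads back the endpoint Testcontainers mapped for it:

    import org.testcontainers.containers.GenericContainer;
    import org.testcontainers.utility.DockerImageName;

    // Minimal sketch: start an arbitrary service container and print the
    // host/port that Testcontainers mapped to the container's exposed port.
    public class GenericContainerDemo {
        public static void main(String[] args) {
            try (GenericContainer<?> redis =
                     new GenericContainer<>(DockerImageName.parse("redis:6-alpine"))
                         .withExposedPorts(6379)) {
                redis.start();
                // The exposed container port is bound to a random free host port.
                System.out.println("Service available at "
                    + redis.getHost() + ":" + redis.getMappedPort(6379));
            }
        }
    }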

    Suppose we have the following class. It exposes one method that, given a message topic, a start timestamp, and a stop timestamp, determines the relevant Kafka partitions and the effective cut-off timestamp, so that the messages within that time range can then be fetched. The code is as follows:

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class CheckKafkaMsgTimestamp {
        private static final Logger LOG = LoggerFactory.getLogger(CheckKafkaMsgTimestamp.class);

        public static KafkaResult getTimestamp(String bootstrapServer, String topic, long startTimestamp, long stopTimestamp) {
            long max_timestamp = stopTimestamp;
            long max_records = 5L;
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", bootstrapServer);
            props.setProperty("group.id", "test");
            props.setProperty("enable.auto.commit", "true");
            props.setProperty("auto.commit.interval.ms", "1000");
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // Get all the partitions of the topic
            int partition_num = consumer.partitionsFor(topic).size();
            HashMap<TopicPartition, Long> search_map = new HashMap<>();
            ArrayList<TopicPartition> tp = new ArrayList<>();
            for (int i = 0; i < partition_num; i++) {
                search_map.put(new TopicPartition(topic, i), stopTimestamp);
                tp.add(new TopicPartition(topic, i));
            }
            // Check whether every partition has a message with timestamp >= stopTimestamp
            Boolean flag = true;
            ArrayList<TopicPartition> selected_tp = new ArrayList<>();
            Map<TopicPartition, OffsetAndTimestamp> results = consumer.offsetsForTimes(search_map);
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : results.entrySet()) {
                OffsetAndTimestamp value = entry.getValue();
                if (value == null) { // at least one partition has no timestamp >= stopTimestamp
                    flag = false;
                    break;
                }
            }
            // If the check failed, compute each partition's latest timestamp.
            // Note the result is the earliest of those per-partition maxima.
            if (!flag) {
                max_timestamp = 0L;
                consumer.assign(tp);
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tp);
                for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
                    long temp_timestamp = 0L;
                    int record_count = 0;
                    TopicPartition t = entry.getKey();
                    long offset = entry.getValue();
                    if (offset < 1) {
                        LOG.warn("Can not get max_timestamp as partition has no record!");
                        continue;
                    }
                    // Read the last few records of this partition to find its latest timestamp
                    consumer.assign(Arrays.asList(t));
                    consumer.seek(t, offset > max_records ? offset - max_records : 0);
                    Iterator<ConsumerRecord<String, String>> records = consumer.poll(Duration.ofSeconds(2)).iterator();
                    while (records.hasNext()) {
                        record_count++;
                        ConsumerRecord<String, String> record = records.next();
                        LOG.info("Topic: {}, Record Timestamp: {}, recordcount: {}", t, record.timestamp(), record_count);
                        if (temp_timestamp == 0L || record.timestamp() > temp_timestamp) {
                            temp_timestamp = record.timestamp();
                        }
                    }
                    if (temp_timestamp > 0L && temp_timestamp > startTimestamp) {
                        if (max_timestamp == 0L || max_timestamp > temp_timestamp) {
                            max_timestamp = temp_timestamp;
                        }
                        selected_tp.add(t);
                    }
                }
            } else {
                selected_tp = tp;
            }
            consumer.close();
            LOG.info("Max Timestamp: {}", max_timestamp);
            return new KafkaResult(max_timestamp, selected_tp);
        }
    }
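    The method returns a KafkaResult, whose definition the post does not show. Below is a minimal sketch of such a holder class, inferred from the fields the test code reads (max_timestamp and selected_tp); the original definition may differ:

    import java.util.List;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical result holder; the field names follow the test assertions below.
    public class KafkaResult {
        public final long max_timestamp;               // effective cut-off timestamp
        public final List<TopicPartition> selected_tp; // partitions with messages in range

        public KafkaResult(long max_timestamp, List<TopicPartition> selected_tp) {
            this.max_timestamp = max_timestamp;
            this.selected_tp = selected_tp;
        }
    }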

    To unit-test this logic we can let Testcontainers start a Kafka environment. Note that Testcontainers needs a Docker (or Docker-compatible) daemon on the machine that runs the tests. First, add the following dependencies to pom.xml:

    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>testcontainers</artifactId>
        <version>1.19.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>kafka</artifactId>
        <version>1.19.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
    </dependency>
    <dependency>
        <groupId>org.hamcrest</groupId>
        <artifactId>hamcrest-all</artifactId>
        <version>1.3</version>
        <scope>test</scope>
    </dependency>

    Then write the test code as follows:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.CreateTopicsResult;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaFuture;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.FixMethodOrder;
    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.JUnit4;
    import org.junit.runners.MethodSorters;
    import org.testcontainers.containers.KafkaContainer;
    import org.testcontainers.utility.DockerImageName;

    import static org.junit.Assert.assertEquals;

    @RunWith(JUnit4.class)
    @FixMethodOrder(MethodSorters.NAME_ASCENDING)
    public class CheckKafkaMsgTimestampTest {
        private String bootstrapServer;
        private Producer<String, String> producer;

        @Rule
        public KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

        @Before
        public void setUp() {
            kafka.start();
            bootstrapServer = kafka.getBootstrapServers();
            // Create a test topic with 3 partitions
            Properties adminProps = new Properties();
            adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
            Admin admin = Admin.create(adminProps);
            int partitions = 3;
            short replicationFactor = 1;
            NewTopic newTopic = new NewTopic("test", partitions, replicationFactor);
            CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic));
            try {
                KafkaFuture<Void> future = result.values().get("test");
                future.get();
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
            // Create a producer
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServer);
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("linger.ms", 1);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<>(props);
        }

        @After
        public void tearDown() {
            producer.close();
            kafka.stop();
        }

        @Test
        public void testGetTimestamp() {
            // Prepare 6 messages for the test topic; each partition receives 2 messages
            long ts = System.currentTimeMillis() - 5000L;
            producer.send(new ProducerRecord<>("test", 0, ts + 100, "", "test message"));
            producer.send(new ProducerRecord<>("test", 0, ts + 300, "", "test message"));
            producer.send(new ProducerRecord<>("test", 1, ts + 110, "", "test message"));
            producer.send(new ProducerRecord<>("test", 1, ts + 200, "", "test message"));
            producer.send(new ProducerRecord<>("test", 2, ts + 105, "", "test message"));
            producer.send(new ProducerRecord<>("test", 2, ts + 250, "", "test message"));
            producer.flush(); // make sure all messages are delivered before querying
            KafkaResult result = CheckKafkaMsgTimestamp.getTimestamp(bootstrapServer, "test", ts, System.currentTimeMillis());
            assertEquals(ts + 200, result.max_timestamp);
            assertEquals(3, result.selected_tp.size());
            result = CheckKafkaMsgTimestamp.getTimestamp(bootstrapServer, "test", ts + 500, System.currentTimeMillis());
            assertEquals(0, result.max_timestamp);
            assertEquals(0, result.selected_tp.size());
            result = CheckKafkaMsgTimestamp.getTimestamp(bootstrapServer, "test", ts + 205, System.currentTimeMillis());
            assertEquals(ts + 250, result.max_timestamp);
            assertEquals(2, result.selected_tp.size());
        }
    }

    A quick walk-through: the @Rule loads a Kafka image through Testcontainers; in the @Before method we start the Kafka service, obtain the bootstrap-server address, and then create a topic with 3 partitions in Kafka.

    In the testGetTimestamp method we first send a few messages to different partitions and then call CheckKafkaMsgTimestamp to test and verify. For the first call the expected max_timestamp is ts+200: no partition contains a message at or after the stop timestamp, so the method collects each partition's latest timestamp (ts+300, ts+200, ts+250) and returns the earliest of them.

    If several tests need the same Kafka instance, we can define an abstract class that owns the container and let the other test classes extend it. For example:

    import org.testcontainers.containers.KafkaContainer;
    import org.testcontainers.utility.DockerImageName;

    public abstract class AbstractContainerBaseTest {
        public static final KafkaContainer KAFKA_CONTAINER;

        static {
            KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
            KAFKA_CONTAINER.start();
        }
    }

    Then rewrite the test class from before:

    public class CheckKafkaMsgTimestampTest extends AbstractContainerBaseTest {
        // The @Rule KafkaContainer field is gone; the other fields are unchanged.

        @Before
        public void setUp() {
            // No kafka.start() call needed: the shared container is already running.
            bootstrapServer = KAFKA_CONTAINER.getBootstrapServers();
            // ... the rest of setUp (topic creation, producer) is unchanged
        }
        // ... tearDown (without kafka.stop()) and the test methods are unchanged
    }
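    With the container held in the base class, any number of test classes can share the same running broker. A hypothetical second test class (name and body are illustrative, not from the original post) could look like this:

    import org.junit.Test;
    import static org.junit.Assert.assertNotNull;

    // Hypothetical second test class reusing the same Kafka container.
    public class AnotherKafkaTest extends AbstractContainerBaseTest {
        @Test
        public void testBrokerIsReachable() {
            // The static container was started once by AbstractContainerBaseTest.
            assertNotNull(KAFKA_CONTAINER.getBootstrapServers());
        }
    }

    Because the container lives in a static initializer, it is started once per JVM and shared by every class that extends the base; when the JVM exits, Testcontainers' Ryuk sidecar cleans the container up.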

     

  • Original article: https://blog.csdn.net/gzroy/article/details/133961391