1. 配置文件
spring. kafka. bootstrap- servers= CentOSA : 9092 , CentOSB : 9092 , CentOSC : 9092
spring. kafka. producer. retries= 5
spring. kafka. producer. acks= all
spring. kafka. producer. batch- size= 16384
spring. kafka. producer. buffer- memory= 33554432
spring. kafka. producer. key- serializer= org. apache. kafka. common. serialization. StringSerializer
spring. kafka. producer. value- serializer= org. apache. kafka. common. serialization. StringSerializer
spring. kafka. producer. properties. enable. idempotence= true
spring. kafka. producer. transaction- id- prefix= transaction- id-
spring. kafka. consumer. group- id= group1
spring. kafka. consumer. auto- offset- reset= earliest
spring. kafka. consumer. enable- auto- commit= true
spring. kafka. consumer. auto- commit- interval= 100
spring. kafka. consumer. properties. isolation. level= read_committed
spring. kafka. consumer. key- deserializer= org. apache. kafka. common. serialization. StringDeserializer
spring. kafka. consumer. value- deserializer= org. apache. kafka. common. serialization. StringDeserializer
spring. kafka. streams. application- id= wordcount_id
spring. kafka. streams. client- id= app1
spring. kafka. streams. auto- startup= true
spring. kafka. streams. state- dir= / Users / admin/ Desktop / checkpoint
spring. kafka. streams. replication- factor= 1
spring. kafka. streams. properties. processing. guarantee= exactly_once
2. logback.xml
< configuration>
< appender name = " STDOUT" class = " ch.qos.logback.core.ConsoleAppender" >
< encoder>
< pattern> %p %d{yyyy-MM-dd HH:mm:ss} - %m%n pattern>
< charset> UTF-8 charset>
< root level = " ERROR" >
< appender-ref ref = " STDOUT" />
< logger name = " org.springframework.kafka" level = " INFO" additivity = " false" >
< appender-ref ref = " STDOUT" />
< logger name = " org.springframework.kafka.transaction" level = " debug" additivity = " false" >
< appender-ref ref = " STDOUT" />
3. application
package com. baizhi ;
import org. apache. kafka. clients. consumer. ConsumerRecord ;
import org. apache. kafka. common. serialization. Serdes ;
import org. apache. kafka. common. utils. Bytes ;
import org. apache. kafka. streams. StreamsBuilder ;
import org. apache. kafka. streams. kstream. * ;
import org. apache. kafka. streams. state. KeyValueStore ;
import org. springframework. boot. SpringApplication ;
import org. springframework. boot. autoconfigure. SpringBootApplication ;
import org. springframework. context. annotation. Bean ;
import org. springframework. kafka. annotation. EnableKafka ;
import org. springframework. kafka. annotation. EnableKafkaStreams ;
import org. springframework. kafka. annotation. KafkaListener ;
import org. springframework. kafka. annotation. KafkaListeners ;
import org. springframework. messaging. handler. annotation. SendTo ;
import java. io. IOException ;
import java. util. Arrays ;
import java. util. stream. Collectors ;
public class KafkaSpringBootApplication {
public static void main ( String [ ] args) throws IOException {
SpringApplication . run ( KafkaSpringBootApplication . class , args) ;
System . in. read ( ) ;
@KafkaListeners ( value = { @KafkaListener ( topics = { "topic04" } ) } )
@SendTo ( value = { "topic05" } )
public String listenner ( ConsumerRecord < ? , ? > cr) {
return cr. value ( ) + " mashibing edu" ;
public KStream < String , String > kStream ( StreamsBuilder kStreamBuilder) {
KStream < String , String > stream = kStreamBuilder. stream (
"topic02" ,
Consumed . with ( Serdes. String ( ) ,
Serdes. String ( ) ) ) ;
stream. flatMapValues ( new ValueMapper < String , Iterable < String > > ( ) {
public Iterable < String > apply ( String s) {
return Arrays . stream ( s. split ( " " ) ) . collect ( Collectors . toList ( ) ) ;
} )
. selectKey ( ( k, v) -> v)
. groupByKey ( Serialized . with ( Serdes. String ( ) , Serdes. String ( ) ) )
. count ( Materialized . <String , Long , KeyValueStore < Bytes , byte [ ] >> as ( "wordcount" ) )
. toStream ( )
. print ( Printed . toSysOut ( ) ) ;
return stream;
package com. baizhi. service ;
public interface IOrderService {
public void saveOrder ( String id, Object message) ;
package com. baizhi. service. impl ;
import com. baizhi. service. IOrderService ;
import org. apache. kafka. clients. producer. ProducerRecord ;
import org. springframework. beans. factory. annotation. Autowired ;
import org. springframework. kafka. core. KafkaTemplate ;
import org. springframework. stereotype. Service ;
import org. springframework. transaction. annotation. Transactional ;
public class OrderService implements IOrderService {
private KafkaTemplate kafkaTemplate;
public void saveOrder ( String id, Object message) {
kafkaTemplate. send ( new ProducerRecord ( "topic04" , id, message) ) ;
package com. baizhi. tests ;
import com. baizhi. KafkaSpringBootApplication ;
import com. baizhi. service. IOrderService ;
import org. apache. kafka. clients. producer. ProducerRecord ;
import org. junit. Test ;
import org. junit. runner. RunWith ;
import org. springframework. beans. factory. annotation. Autowired ;
import org. springframework. boot. test. context. SpringBootTest ;
import org. springframework. kafka. core. KafkaOperations ;
import org. springframework. kafka. core. KafkaTemplate ;
import org. springframework. test. context. junit4. SpringRunner ;
import org. springframework. transaction. annotation. Propagation ;
import org. springframework. transaction. annotation. Transactional ;
import java. util. Date ;
import java. util. zip. DataFormatException ;
@SpringBootTest ( classes = { KafkaSpringBootApplication . class } )
@RunWith ( SpringRunner . class )
public class KafkaTempolateTests {
private KafkaTemplate kafkaTemplate;
private IOrderService orderService;
public void testOrderService ( ) {
orderService. saveOrder ( "001" , "baizhi edu " ) ;
public void testKafkaTemplate ( ) {
kafkaTemplate. executeInTransaction ( new KafkaOperations. OperationsCallback ( ) {
public Object doInOperations ( KafkaOperations kafkaOperations) {
return kafkaOperations. send ( new ProducerRecord ( "topic01" , "002" , "this is a demo" ) ) ;
} ) ;
