• flink测试redis sink报错


    具体报错

    1    [Source: Custom Source -> Sink: Unnamed (1/1)#0] ERROR org.apache.flink.streaming.connectors.redis.RedisSink  - Redis has not been properly initialized: 
    redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
    	at redis.clients.util.Pool.getResource(Pool.java:50)
    	at redis.clients.jedis.JedisPool.getResource(JedisPool.java:99)
    	at org.apache.flink.streaming.connectors.redis.common.container.RedisContainer.getInstance(RedisContainer.java:250)
    	at org.apache.flink.streaming.connectors.redis.common.container.RedisContainer.open(RedisContainer.java:85)
    	at org.apache.flink.streaming.connectors.redis.RedisSink.open(RedisSink.java:174)
    	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
    	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
    	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:574)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:554)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    	at java.lang.Thread.run(Thread.java:750)
    Caused by: redis.clients.jedis.exceptions.JedisConnectionException: java.net.ConnectException: Connection refused: connect
    	at redis.clients.jedis.Connection.connect(Connection.java:164)
    	at redis.clients.jedis.BinaryClient.connect(BinaryClient.java:80)
    	at redis.clients.jedis.BinaryJedis.connect(BinaryJedis.java:1676)
    	at redis.clients.jedis.JedisFactory.makeObject(JedisFactory.java:87)
    	at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:861)
    	at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:435)
    	at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363)
    	at redis.clients.util.Pool.getResource(Pool.java:48)
    	... 14 more
    Caused by: java.net.ConnectException: Connection refused: connect
    	at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
    	at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
    	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    	at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    	at java.net.Socket.connect(Socket.java:607)
    	at redis.clients.jedis.Connection.connect(Connection.java:158)
    	... 21 more
    Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
    	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
    	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081)
    	at akka.dispatch.OnComplete.internal(Future.scala:264)
    	at akka.dispatch.OnComplete.internal(Future.scala:261)
    	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
    	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
    	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
    	at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
    	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
    	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
    	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
    	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
    	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
    	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
    	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
    	at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
    	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
    	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
    	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
    	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    	at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
    	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
    	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
    	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
    	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
    	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
    	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
    	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
    	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    	at akka.actor.Actor.aroundReceive(Actor.scala:517)
    	at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    	... 4 more
    Caused by: redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
    	at redis.clients.util.Pool.getResource(Pool.java:50)
    	at redis.clients.jedis.JedisPool.getResource(JedisPool.java:99)
    	at org.apache.flink.streaming.connectors.redis.common.container.RedisContainer.getInstance(RedisContainer.java:250)
    	at org.apache.flink.streaming.connectors.redis.common.container.RedisContainer.open(RedisContainer.java:85)
    	at org.apache.flink.streaming.connectors.redis.RedisSink.open(RedisSink.java:174)
    	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
    	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
    	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:574)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:554)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    	at java.lang.Thread.run(Thread.java:750)
    Caused by: redis.clients.jedis.exceptions.JedisConnectionException: java.net.ConnectException: Connection refused: connect
    	at redis.clients.jedis.Connection.connect(Connection.java:164)
    	at redis.clients.jedis.BinaryClient.connect(BinaryClient.java:80)
    	at redis.clients.jedis.BinaryJedis.connect(BinaryJedis.java:1676)
    	at redis.clients.jedis.JedisFactory.makeObject(JedisFactory.java:87)
    	at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:861)
    	at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:435)
    	at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363)
    	at redis.clients.util.Pool.getResource(Pool.java:48)
    	... 14 more
    Caused by: java.net.ConnectException: Connection refused: connect
    	at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
    	at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
    	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    	at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    	at java.net.Socket.connect(Socket.java:607)
    	at redis.clients.jedis.Connection.connect(Connection.java:158)
    	... 21 more
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150

    根据报错定位到一个错误栈:Caused by: redis.clients.jedis.exceptions.JedisConnectionException: java.net.ConnectException: Connection refused: connect

    应该是redis连接不上,确认配置(host、password、port、timeout等)没有问题之后检查逐步排查其他问题

    1、先检查了redis状态

    [root@master src]# ps -aux | grep redis
    root      11455  0.0  0.0 162396  7796 ?        Ssl  20:42   0:01 redis-server 127.0.0.1:6379
    root      58166  0.0  0.0 112828   988 pts/0    S+   21:19   0:00 grep --color=auto redis
    [root@master src]# netstat -nltp | grep redis
    tcp        0      0 127.0.0.1:6379          0.0.0.0:*               LISTEN      11455/redis-server
    [root@master src]# redis-cli
    127.0.0.1:6379>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    redis正常状态且能正常使用

    2、检查防火墙是否关闭或者是否放行redis端口(redis默认端口6379)

    [root@master src]# systemctl status firewalld
    ● firewalld.service - firewalld - dynamic firewall daemon
       Loaded: loaded (/usr/lib/systemd/system/firewalld.service; disabled; vendor preset: enabled)
       Active: inactive (dead)
         Docs: man:firewalld(1)
    [root@master src]#
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    防火墙为关闭状态
    3、查看redis配置里的保护模式是否关闭(redis3.2版本以上有这个属性)
    查看redis配置里的bind是否关闭

    
    [root@master redis-6.0.8]# cat redis.conf | grep protect
    # Protected mode is a layer of security protection, in order to avoid that
    # When protected mode is on and if:
    # By default protected mode is enabled. You should disable it only if
    protected-mode yes
    # If the master is password protected (using the "requirepass" configuration
    # on the internet. It's just a protection layer against misuse of the instance.
    # So use the 'requirepass' option to protect your instance.
    [root@master redis-6.0.8] # cat redis.conf |grep bind
    # By default, if no "bind" configuration directive is specified, Redis listens
    # the "bind" configuration directive, followed by one or more IP addresses.
    # bind 192.168.1.100 10.0.0.1
    # bind 127.0.0.1 ::1
    # internet, binding to all the interfaces is dangerous and will expose the
    # following bind directive, that will force Redis to listen only into
    bind 127.0.0.1
    # 1) The server is not binding explicitly to a set of addresses using the
    #    "bind" directive.
    # are explicitly listed using the "bind" directive.
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    可以看到redis保护模式合bind 参数是开启的

    • 将配置中的yes改为no
    • 将bind哪一行注释掉
    • 之后重新启动即可解决问题
  • 相关阅读:
    EasyCVR及智能分析网关在校园视频融合及明厨亮灶项目中的应用方案设计
    spring boot logback.xml文件配置,info、error隔离
    一文了解 Java 中的构造器
    单臂路由 - 实验配置
    Hadoop简明教程
    jQuery实现动画登录页面+表单验证+数据采用localStorage本地储存
    ECharts数据可视化(案例)
    Options Error: invalid boolean value
    2023 10月8日 至 10 月16日学习总结
    2022年最新最全的Java零基础入门,零基础入门springboot,MySQL的学习
  • 原文地址:https://blog.csdn.net/asd1358355022/article/details/128008080