重点来啦，直接上干货，走起！
package cn.xhh.monitor.task;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.CallableStatementCallback;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
public class AutoCancelSlowSql {
private JdbcTemplate redshiftJdbcTemplate;
private JdbcTemplate mysqlJdbcTemplate;
log.info("定时取消慢查询任务开始执行,周期30秒钟" );
Runnable runnable = new Runnable () {
int hour = LocalTime.now().getHour();
log.info("当前小时:{}。" , hour);
String querySlowSql = "select pid, (now() - query_start) as exec, query from pg_stat_activity where pg_stat_activity.query <> ''::text AND pg_stat_activity.state <> 'idle'::text and (now() - query_start) > '00:03:00';" ;
cancelActiveSql(querySlowSql);
String querySessionSql = "select pid, (now() - query_start) as exec, query from pg_stat_activity where usename not in ('gpadmin', 'gpmon') and (now() - query_start) > '00:05:00' order by now() - query_start desc limit 10" ;
cancelActiveSql(querySessionSql);
String queryVacuumSql = "select pid, (now() - query_start) as exec, query from pg_stat_activity where usename = 'gpadmin' and (now() - query_start) > '01:00:00' and query like 'vacuum full%';" ;
cancelActiveSql(queryVacuumSql);
log.error("定时取消慢查询任务出错:" + e.getMessage());
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
service.scheduleAtFixedRate(runnable, 30 , 30 , TimeUnit.SECONDS);
private void cancelActiveSql (String querySql) {
List> slowSqlList = redshiftJdbcTemplate.queryForList(querySql);
if (slowSqlList.size() > 0 ) {
slowSqlList.forEach(result -> {
String pid = result.get("pid" ).toString();
String query = result.get("query" ).toString();
Object exec = result.get("exec" );
if (exec != null && !exec.toString().equals("" )) {
log.info("canal sql pid : {}, exec {}." , pid, exec);
if (!Strings.isNullOrEmpty(query) && query.toLowerCase().trim().startsWith("select" )) {
sql = "select pg_cancel_backend( ? )" ;
sql = "select pg_terminate_backend( ? )" ;
redshiftJdbcTemplate.execute(sql, (CallableStatementCallback) cs -> {
cs.setInt(1 , Integer.parseInt(pid));
log.info("取消慢查询:" + pid + ":" + query);
log.error("error:{}" , e);
public void executeProduction () {
Runnable runnable = new Runnable () {
log.info("调度productionLevel" );
String queryProductionLevel =
"select company_id,level,source from production_level where level = 31" ;
mysqlJdbcTemplate.queryForList(queryProductionLevel);
redshiftJdbcTemplate.execute("drop table if exists productionLevel" );
.execute("create table productionLevel(company_id int4,level text,source text)" );
queryForList.forEach(i -> {
.update("insert into productionLevel values(?,?,?)" , new Object []{i.get("company_id" ),
i.get("level" ), i.get("source" )});
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
service.scheduleAtFixedRate(runnable, 1 , 60 , TimeUnit.MINUTES);