1.添加依赖
<dependency>
<groupId>org.redissongroupId>
<artifactId>redissonartifactId>
<version>3.9.1version>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-data-redisartifactId>
<version>2.2.6.RELEASEversion>
dependency>
2.编写配置文件redis.properties
#*****************jedis连接参数设置*********************#
#是否启用redis
redis.enable=true
#运行模式:单机standalone、分片sharded、集群cluster三种模式
# 哨兵模式
#redis.runMode=sentinel
redis.runMode=standalone
#redis服务器配置,如果是集群,则使用英文逗号分隔(如:192.168.8.167:6379,192.168.8.167:6380) #
redis.hostports=192.168.5.210:6379
#redis.hostports=redis-app-0.redis-service.gtms-preprod.svc.cluster.local:6379,redis-app-1.redis-service.gtms-preprod.svc.cluster.local:6379,redis-app-2.redis-service.gtms-preprod.svc.cluster.local:6379,redis-app-3.redis-service.gtms-preprod.svc.cluster.local:6379,redis-app-4.redis-service.gtms-preprod.svc.cluster.local:6379,redis-app-5.redis-service.gtms-preprod.svc.cluster.local:6379
#redis.hostports=redis-ha.gtms-preprod.svc.cluster.local:26379
#*************JedisCluster构造函数的参数*****************#
#服务器密码,需要则设置登录密码
redis.password=Pansoft@uat1
#redis.masterName=name
#连接超时时间
redis.connectionTimeout=3000
#返回值的超时时间
redis.soTimeout=3000
#出现异常最大重试次数
redis.maxAttempts=10
#************************jedis池参数设置*******************#
#jedis的最大分配对象#
redis.maxTotal=100
#jedis最大保存idel状态对象数 #
redis.maxIdle=10
#jedis最小保存idel状态对象数 #
redis.minIdle=1
#jedis池没有对象返回时,最大等待时间 #
redis.maxWaitMillis=1500
#jedis调用borrowObject方法时,是否进行有效检查#
redis.testOnBorrow=true
#jedis调用returnObject方法时,是否进行有效检查 #
redis.testOnReturn=true
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
3.编写配置类
@Slf4j
public class OldRedissonConfig {
private static final String redisConfigFile = "Config/redis.properties";
public static RedissonClient runEntry() {
Map<String, String> map=getHostAndPwd();
if(EmptyUtil.isNotEmpty(map)) {
String runMode = map.get("runMode");
RedissonClient redisson=null;
if("standalone".equals(runMode)){
redisson = initialSingleRedissons();
}else if ("cluster".equals(runMode)){
redisson = initialClusterRedisson();
}
return redisson;
}else {
return null;
}
}
public static RedissonClient initialSingleRedissons(){
try {
Map<String, String> map=getHostAndPwd();
if(EmptyUtil.isNotEmpty(map)) {
String hostPort = map.get("hostports");
String redisPwd = map.get("pwd");
Config config = new Config();
config.useSingleServer().setAddress("redis://" + hostPort).setPassword(redisPwd);
RedissonClient redisson = Redisson.create(config);
log.info("「SingleRedisson」初始化成功!!");
return redisson;
}else {
return null;
}
}catch (Exception e){
log.info("「SingleRedisson」初始化异常!!message:{}",e.getMessage());
e.printStackTrace();
}
return null;
}
public static RedissonClient initialClusterRedisson(){
try {
Map<String, String> map=getHostAndPwd();
if(EmptyUtil.isNotEmpty(map)) {
String hostPort = map.get("hostports");
String redisPwd = map.get("pwd");
Config config = new Config();
String[] hostPostArr = hostPort.split(",");
for (String str : hostPostArr) {
config.useClusterServers()
.setScanInterval(2000)
.setPassword(redisPwd)
.addNodeAddress(str);
}
RedissonClient redisson = Redisson.create(config);
return redisson;
}else {
return null;
}
}catch (Exception e) {
log.info("「ClusterRedisson」初始化异常!!message:{}",e.getMessage());
e.printStackTrace();
}
return null;
}
public static Map<String,String> getHostAndPwd(){
try {
Properties props = new Properties();
props.load(RedisPoolConfig.class.getClassLoader().getResourceAsStream(redisConfigFile));
String runMode = props.getProperty("redis.runMode");
String hostPort = props.getProperty("redis.hostports");
String redisPwd = props.getProperty("redis.password");
Map<String, String> map = new HashMap<String, String>();
map.put("runMode",runMode);
map.put("hostports",hostPort);
map.put("pwd",redisPwd);
return map;
}catch (Exception e) {
log.info("redisson初始化获取结点和密码失败");
e.printStackTrace();
}
return null;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
- 145
- 146
- 147
- 148
- 149
4.service中使用线程池调用子线程
@Slf4j
@Service
public class AutoSendAddOrgInfo {
private String url;
private String requestUrl;
@Autowired
private PathUrlConfig pathUrlConfig;
@Autowired
private GetTokenService getTokenService;
ThreadPoolExecutor pool = new ThreadPoolExecutor(10,20,60,
TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
public OrgResultDto sendAddOrgInfos() throws Exception {
RedissonClient redissonClient= RedissonConfig.initialRedissonClient();
url=pathUrlConfig.getOrgHost();
requestUrl=pathUrlConfig.getSendAddOrginInfoRequestPath();
PreparedStatement preparedStatement=null;
JConnection connection=null;
ResultSet rs=null;
OrgResultDto userResultDto=null;
try {
JParamObject paramObject = JParamObject.getInstance();
connection = DBUtils.getConnection(paramObject);
String sql = "select a.DWZD_BH as organizationBH,a.DWZD_BZBM as orgCode,a.DWZD_MC as organizationName from zj_dwzd a left join dwzd_push b on a.DWZD_BH=b.ORGANIZATION_BH where b.ORGANIZATION_BH is null";
log.warn("查询新增待推送sql:"+sql);
preparedStatement = connection.prepareStatement(sql);
rs = preparedStatement.executeQuery();
List<OrgPushReqDto> orgList = new ArrayList<>();
while (rs.next()) {
OrgPushReqDto orgPushReqDto=new OrgPushReqDto();
orgPushReqDto.setOrganizationBH(rs.getString("organizationBH"));
orgPushReqDto.setOrganizationName(rs.getString("organizationName"));
orgPushReqDto.setExternalId(orgPushReqDto.getOrganizationBH());
orgPushReqDto.setExtendFields(new ExtendFieldDto());
orgPushReqDto.setType("0");
orgPushReqDto.getExtendFields().setTest1(rs.getString("orgCode"));
orgList.add(orgPushReqDto);
}
log.info("总共"+orgList.size()+"条新增待推送的数据");
for (OrgPushReqDto orgPushReqDto : orgList) {
pool.execute(new AutoSendAddOrgInfoTaskThread(orgPushReqDto,url,requestUrl,getTokenService,redissonClient));
}
userResultDto= new OrgResultDto(true, "200", "推送组织执行成功", null, null);
}catch (Exception e){
e.printStackTrace();
userResultDto= new OrgResultDto(false, "-1", "推送组织执行失败", null, null);
}finally {
if(rs!=null) {
rs.close();
}
if (preparedStatement!=null){
preparedStatement.close();
}
if (connection!=null){
connection.close();
}
}
return userResultDto;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
5.在子线程中使用
@Slf4j
public class AutoSendAddOrgInfoTaskThread implements Runnable{
private String ID_URL;
private String requestPath;
private GetTokenService getTokenService;
private OrgPushReqDto orgPushReqDto;
private RedissonClient redissonClient;
public AutoSendAddOrgInfoTaskThread(OrgPushReqDto orgPushReqDto,String ID_URL,String requestPath,GetTokenService getTokenService,RedissonClient redissonClient){
this.orgPushReqDto=orgPushReqDto;
this.ID_URL=ID_URL;
this.requestPath = requestPath;
this.getTokenService=getTokenService;
this.redissonClient =redissonClient;
}
@Override
public void run() {
PreparedStatement preparedStatement = null;
JConnection connection = null;
ResultSet rs = null;
String redisToken="";
Lock lock=redissonClient.getLock("aaa");
try {
JParamObject paramObject = JParamObject.getInstance();
connection = DBUtils.getConnection(paramObject);
lock.lock();
RBucket<String> bucket = redissonClient.getBucket("iamToken");
redisToken =bucket.get();
if (EmptyUtil.isEmpty(bucket.get())) {
IDasTokenResultDto tokenFromIDas = getTokenService.getTokenFromIDas();
redisToken = tokenFromIDas.getAccess_token();
log.info("系统返回的token过期时间是:" + tokenFromIDas.getExpires_in());
bucket.set(redisToken,Integer.parseInt(tokenFromIDas.getExpires_in()) / 1000 - 5, TimeUnit.SECONDS);
}
log.info("调用添加组织接口:");
GetRestTemplate getRestTemplate=new GetRestTemplate();
RestTemplate restTemplate = getRestTemplate.instanceRestTemplate();
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.setContentType(MediaType.valueOf(MediaType.APPLICATION_JSON_VALUE));
httpHeaders.add("Authorization", "Bearer " + redisToken);
orgPushReqDto.setParentExternalId("main");
orgPushReqDto.setDescription("测试批量推送新增组织");
String jsonString=JSONObject.toJSONString(orgPushReqDto);
HttpEntity<String> requestEntity = new HttpEntity<>(jsonString, httpHeaders);
log.info("调用推送新增组织,参数是:"+requestEntity.toString());
ResponseEntity<OrgResultDto> result = restTemplate.exchange(ID_URL + requestPath, HttpMethod.POST, requestEntity, OrgResultDto.class);
OrgResultDto body = result.getBody();
log.info("调用推送新增组织,返回结果是:"+JSONObject.toJSONString(body));
jsonString=JSONObject.toJSONString(body);
OrgResultDto systemResUserResultDto = JSONObject.parseObject(jsonString, OrgResultDto.class);
int isSuccessSend=1;
if (!systemResUserResultDto.isSuccess()) {
String uuid = SnowFlakeUtil.generateKey().toString();
String sql = "insert into ZJ_RETRY(F_GUID,F_DATA_ID,F_ORGANZATION_NAME,F_ORG_CODE,F_RETRY_TYPE,F_DATA_TYPE,F_RETRY_STATUS,F_RETRY_NUM,F_RETRY_TIME,F_FAIL_CODE,F_FAIL_MSG,F_START_TIME) " +
"values(?,?,?,?,?,?,?,?,?,?,?,?)";
log.info("插入重试表sql:" + sql);
preparedStatement=connection.prepareStatement(sql);
preparedStatement.setString(1, uuid);
preparedStatement.setString(2,orgPushReqDto.getOrganizationBH());
preparedStatement.setString(3,orgPushReqDto.getOrganizationName());
preparedStatement.setString(4,orgPushReqDto.getExtendFields().getTest1());
preparedStatement.setString(5,"0");
preparedStatement.setString(6,"2");
preparedStatement.setString(7,"0");
preparedStatement.setInt(8,0);
preparedStatement.setString(9,"");
preparedStatement.setString(10,systemResUserResultDto.getCode());
preparedStatement.setString(11,systemResUserResultDto.getMessage());
preparedStatement.setObject(12,DateUtil.getCurrentDateStr());
int count = preparedStatement.executeUpdate();
if (count < 1) {
log.info("新增数据插入重试表失败");
}
isSuccessSend=0;
}
orgPushReqDto.setCreateTime(DateUtil.getCurrentDateStr());
orgPushReqDto.setModifyTime(DateUtil.getCurrentDateStr());
orgPushReqDto.setModifyTime("0");
String insertSql = "insert into dwzd_push(ORGANIZATION_BH,ORGANIZATION_NAME,ORG_CODE,EXTERNAL_ID,PARENT_EXTERNAL_ID,TYPE,SORT_NUMBER,ENABLED,DESCRIPTION,TEST1,CREATE_TIME,MODIFY_TIME,GUID,SEND_STATUS) " +
"values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
preparedStatement=connection.prepareStatement(insertSql);
preparedStatement.setString(1, orgPushReqDto.getOrganizationBH());
preparedStatement.setString(2, orgPushReqDto.getOrganizationName());
preparedStatement.setString(3,orgPushReqDto.getExtendFields().getTest1());
preparedStatement.setString(4, orgPushReqDto.getExternalId());
preparedStatement.setString(5, orgPushReqDto.getParentExternalId());
preparedStatement.setString(6, orgPushReqDto.getType());
preparedStatement.setString(7, orgPushReqDto.getSortNumber());
if ( orgPushReqDto.getEnabled()!=null) {
preparedStatement.setString(8, orgPushReqDto.getEnabled()+"");
}else {
preparedStatement.setString(8, "");
}
preparedStatement.setString(9, orgPushReqDto.getDescription());
if (orgPushReqDto.getExtendFields()!=null) {
preparedStatement.setString(10, orgPushReqDto.getExtendFields().getTest1());
}else {
preparedStatement.setString(10,"");
}
preparedStatement.setString(11, DateUtil.getCurrentDateStr());
preparedStatement.setString(12, DateUtil.getCurrentDateStr());
preparedStatement.setString(13,SnowFlakeUtil.generateKey().toString());
preparedStatement.setString(14,isSuccessSend+"");
int count = preparedStatement.executeUpdate();
if (count > 0) {
log.info("发送新增数据成功,添加推送表成功");
}
}catch (Exception e){
e.printStackTrace();
log.info("线程执行异常");
}finally {
lock.unlock();
try{
if (rs!=null){
rs.close();
}
if (preparedStatement!=null){
preparedStatement.close();
}
if (connection!=null){
connection.close();
}
}catch (Exception e){
log.info("关闭链接异常");
}
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142