要求:
读取服务器上某个目录的文件下的txt的内容,读取里面的数据,进行处理,保存入库。保存后,把文件移到该目录下的bak目录。
处理:
要操作服务器文件,需要使用 ChannelSftp,文档
put(): 文件上传
get(): 文件下载
cd(): 进入指定目录
ls(): 得到指定目录下的文件列表
rm(): 删除指定文件
rename() 移动文件
- gradle:
- // 添加sftp的工具包
- implementation group: 'com.jcraft', name: 'jsch', version: '0.1.55'
-
- maven:
- <dependency>
- <groupId>com.jcraft</groupId>
- <artifactId>jsch</artifactId>
- <version>0.1.55</version>
- </dependency>
- @Slf4j
- @Service
- public class CustomerService {
-
-
- public void readFile() throws SftpException {
- SftpConfig sftpConfig = SftpUtils.getSftpConfigVal();
- ChannelSftp sftp = SftpUtils.connect(sftpConfig);
- String path = sftpConfig.getPath();
- String newpath = path + "/bak";
- // 判断文件路径是否存在
- SftpUtils.isExist(sftp, newpath);
- sftp.cd(path);
- // 读取txt文件
- Vector<ChannelSftp.LsEntry> list = sftp.ls("*.txt");
- List<String> data;
- String pathFileName, newpathFileName ="";
- for (ChannelSftp.LsEntry entry : list) {
- String fileName = entry.getFilename();
- log.info(fileName);
- pathFileName = path +"/"+ fileName;
- data = SftpUtils.getList(sftp, pathFileName);
-
- if(CollectionUtils.isNotEmpty(data)){
- Boolean moveFile = dealFileData(data);
- if(moveFile){
- try {
- newpathFileName = newpath + "/"+ fileName;
- log.info("path:{}, newPath: {} ", pathFileName, newpathFileName);
- sftp.rename(pathFileName, newpathFileName);
- }catch (Exception e){
- log.error("路径{},文件{}移动文件失败!", pathFileName, newpathFileName);
- e.printStackTrace();
- }
-
- }
- }
-
- }
- SftpUtils.disConnect(sftpConfig)
-
- }
-
- private Boolean dealFileData(List<String> data){
- Boolean moveFile = false;
- List<String> filterData = ListUtils.emptyIfNull(data).stream().filter(e -> e.contains("_")).collect(Collectors.toList());
- if(filterData.size() > 0){// 有符合要求的数据
- moveFile = true;
- // 对数据进行处理
- }
- return moveFile;
-
- }
-
-
- }
- /**
- * sftp 配置
- */
- @Data
- public class SftpConfig {
- /** 密钥地址 */
- private String privateKeyPath;
- /** 口令 */
- private String passphrase;
-
- private String ip;
-
- private Integer port;
-
- private String username;
-
- private String pwd;
-
- private String path;
-
- private String baseDir;
- }
- @Slf4j
- public class SftpChannel {
- Session session = null;
- ChannelSftp sftp = null;
-
- //端口默认为22
- public static final int SFTP_DEFAULT_PORT = 22;
-
- /** 利用JSch包实现SFTP下载、上传文件(秘钥方式登陆)*/
- public ChannelSftp connectByIdentity(SftpConfig sftpConfig) throws JSchException {
- JSch jsch = new JSch();
- int port = SFTP_DEFAULT_PORT;
- //设置密钥和密码
- //支持密钥的方式登陆,只需在jsch.getSession之前设置一下密钥的相关信息就可以了
- if (StringUtils.isNotBlank(sftpConfig.getPrivateKeyPath())) {
- if (StringUtils.isNotBlank(sftpConfig.getPassphrase())) {
- //设置带口令的密钥
- jsch.addIdentity(sftpConfig.getPrivateKeyPath(), sftpConfig.getPassphrase());
- } else {
- //设置不带口令的密钥
- jsch.addIdentity(sftpConfig.getPrivateKeyPath());
- }
- }
- if (sftpConfig.getPort() != null) {
- port = sftpConfig.getPort();
- }
- if (port > 0) {
- //采用指定的端口连接服务器
- session = jsch.getSession(sftpConfig.getUsername(), sftpConfig.getIp(), port);
- } else {
- //连接服务器,采用默认端口
- session = jsch.getSession(sftpConfig.getUsername(), sftpConfig.getIp());
- }
- if (session == null) {
- throw new JSchException("session is null,connect fail");
- }
- log.info("Session created ... UserName={};ip={};port={}", sftpConfig.getUsername(), sftpConfig.getIp(), sftpConfig.getPort());
- Properties sshConfig = new Properties();
- sshConfig.put("StrictHostKeyChecking", "no");
- session.setConfig(sshConfig);
- session.setTimeout(30000);
- session.connect();
- //创建sftp通信通道
- Channel channel = session.openChannel("sftp");
- channel.connect();
- sftp = (ChannelSftp) channel;
- log.info("login success...");
- return sftp;
- }
-
- /** 利用JSch包实现SFTP下载、上传文件(用户名密码方式登陆) */
- public ChannelSftp connectByPwd(SftpConfig sftpConfig) throws JSchException {
- JSch jsch = new JSch();
- int port = SFTP_DEFAULT_PORT;
- if (sftpConfig.getPort() != null) {
- port = sftpConfig.getPort();
- }
- if (port > 0) {
- //采用指定的端口连接服务器
- session = jsch.getSession(sftpConfig.getUsername(), sftpConfig.getIp(), port);
- } else {
- //连接服务器,采用默认端口
- session = jsch.getSession(sftpConfig.getUsername(), sftpConfig.getIp());
- }
- if (session == null) {
- throw new JSchException("session is null,connect fail");
- }
- log.info("Session created ... UserName={};ip={};port={}", sftpConfig.getUsername(), sftpConfig.getIp(), sftpConfig.getPort());
- //设置登陆主机的密码
- session.setPassword(sftpConfig.getPwd());//设置密码
- Properties sshConfig = new Properties();
- sshConfig.put("StrictHostKeyChecking", "no");
- session.setConfig(sshConfig);
- session.setTimeout(30000);
- session.connect();
- //创建sftp通信通道
- Channel channel = session.openChannel("sftp");
- channel.connect();
- sftp = (ChannelSftp) channel;
- log.info("login success...");
- return sftp;
- }
-
- public void closeChannel() {
- log.info("sftp object closing...");
- if (sftp != null) {
- if (sftp.isConnected()) {
- sftp.disconnect();
- }
- }
- if (session != null) {
- if (session.isConnected()) {
- session.disconnect();
- }
- }
- }
-
- }
-
- /**
- * sftp 上传下载工具类
- */
- public class SftpUtils {
-
- private static Logger logger = LoggerFactory.getLogger(SftpUtils.class);
- private static long count = 3;
- private static long count1 = 0;
- private static long sleepTime;
-
- /**
- * 连接sftp服务器
- *
- * @return
- */
- public static ChannelSftp connect(SftpConfig sftpConfig) {
- ChannelSftp sftp = null;
- try {
- JSch jsch = new JSch();
- jsch.getSession(sftpConfig.getUsername(), sftpConfig.getIp(), sftpConfig.getPort());
- Session sshSession = jsch.getSession(sftpConfig.getUsername(), sftpConfig.getIp(),
- sftpConfig.getPort());
- logger.info("Session created ... UserName=" + sftpConfig.getUsername() + ";host=" + sftpConfig.getIp()
- + ";port=" + sftpConfig.getIp());
- sshSession.setPassword(sftpConfig.getPwd());
- Properties sshConfig = new Properties();
- sshConfig.put("StrictHostKeyChecking", "no");
- sshSession.setConfig(sshConfig);
- sshSession.connect();
- logger.info("Session connected ...");
- logger.info("Opening Channel ...");
- Channel channel = sshSession.openChannel("sftp");
- channel.connect();
- sftp = (ChannelSftp) channel;
- logger.info("登录成功");
- } catch (Exception e) {
- try {
- count1 += 1;
- if (count == count1) {
- throw new RuntimeException(e);
- }
- if (count1 != count) {
- Thread.sleep(sleepTime);
- logger.info("connect again....");
- connect(sftpConfig);
- }
- } catch (InterruptedException e1) {
- e1.printStackTrace();
- throw new RuntimeException(e1);
-
- }
- }
- return sftp;
- }
- /**
- * 获取Sftp对象
- * @param param json sftp对象
- * @return SftpConfig
- */
- public static SftpConfig getSftpObj(String param) {
- SftpConfig sftpConfig = new SftpConfig();
- if (StringUtils.isNotBlank(param)) {
- sftpConfig = JSONObject.parseObject(param, SftpConfig.class);
- }
- return sftpConfig;
- }
-
- /** sftp 上传*/
- public static boolean upload(SftpConfig config, String baseDir, String fileName, String filePath) {
- logger.info("路径:baseDir=" + baseDir);
- SftpChannel sftpChannel = new SftpChannel();
- ChannelSftp sftp;
- try {
- if (StringUtils.isNotBlank(config.getPrivateKeyPath())) {
- sftp = sftpChannel.connectByIdentity(config);
- } else {
- sftp = sftpChannel.connectByPwd(config);
- }
- if (sftp.isConnected()) {
- logger.info("connect server success");
- } else {
- logger.error("connect server fail");
- return false;
- }
- //检查路径
- if (!isExist(sftp, baseDir)) {
- logger.error("创建sftp服务器路径失败:" + baseDir);
- return false;
- }
- String dst = baseDir + "/" + fileName;
- String src = filePath + "/" + fileName;
- logger.info("开始上传,本地服务器路径:[" + src + "]目标服务器路径:[" + dst + "]");
- sftp.put(src, dst);
- sftp.put(src, dst);
- logger.info("上传成功");
- return true;
- } catch (Exception e) {
- logger.error("上传失败", e);
- return false;
- } finally {
- sftpChannel.closeChannel();
- }
- }
-
- /** sftp 上传*/
- public boolean uploadNew(SftpConfig config, String baseDir, String fileName, ByteArrayOutputStream out) {
- logger.info("sftpconfig==>{}", config);
- ChannelSftp sftp = connect(config);
- boolean flag = true;
- try {
- sftp.cd(baseDir);
- logger.info("first step=========>{}");
- } catch (SftpException e) {
- try {
- sftp.mkdir(baseDir);
- sftp.cd(baseDir);
- } catch (SftpException e1) {
- flag = false;
- throw new RuntimeException("ftp create filepath fail" + baseDir);
- }
- }
-
- logger.info("the third step =========>{},uploadFile==>{}", flag, fileName);
- try (InputStream in = new ByteArrayInputStream(out.toByteArray())) {
- sftp.put(in, fileName+".ing");
- logger.info("the fourth step =========>{}", "transfer success");
- sftp.rename(baseDir + "/" + fileName+".ing", baseDir + "/" + fileName);
- logger.info("the fifth step =========>{}", "rename success");
- } catch (Exception e) {
- flag = false;
- throw new RuntimeException("sftp excetion" + e);
- } finally {
- disConnect(sftp);
- }
- return flag;
- }
-
- /**
- * 复制某路径下的文件到另外一个路径
- * 直接移动文件,用 ChannelSftp sftp.rename(String oldpath, String newpath)
- * @param config SftpConfig
- * @param sourceBaseDir 源文件路径
- * @param sourceFileName 源文件名称
- * @param destFilePath 目标文件路径
- * @param destFileName 目标文件名称
- * @return boolean
- */
- public static boolean copyFile(SftpConfig config, String sourceBaseDir, String sourceFileName, String destFilePath, String destFileName) {
- SftpChannel sftpChannel = new SftpChannel();
- ChannelSftp sftp;
- try {
- if (StringUtils.isNotBlank(config.getPrivateKeyPath())) {
- sftp = sftpChannel.connectByIdentity(config);
- } else {
- sftp = sftpChannel.connectByPwd(config);
- }
- if (sftp.isConnected()) {
- logger.info("connect server success");
- } else {
- logger.error("connect server fail");
- return false;
- }
- String dest;
- if (StringUtils.isBlank(destFileName)) {
- dest = destFilePath + sourceFileName;
- } else {
- dest = destFilePath + destFileName;
- }
- String src = sourceBaseDir + "/" + sourceFileName;
- logger.info("start downing,sftp server path:[" + src + "] dest server path:[" + dest + "]");
- //获取文件的大小
- SftpATTRS attr = sftp.stat(src);
- long fileSize = attr.getSize();
- logger.info("downing file size :" + fileSize);
- sftp.get(src, dest);
- logger.info("download success");
- return true;
- } catch (Exception e) {
- logger.error("download fail", e);
- return false;
- } finally {
- sftpChannel.closeChannel();
- }
- }
-
- /**
- *
- * @param config SftpConfig
- * @param baseDir sftp 路径
- * @param sourceFileName sftp 文件名称
- * @param savePath 本地路径
- * @param destFileName 本地文件名称
- * @return 返回文件路径
- * @throws SftpException
- * @throws JSchException
- * @throws IOException
- */
- public static String downToLocal(SftpConfig config, String baseDir, String sourceFileName, String savePath, String destFileName) throws SftpException, JSchException, IOException {
- SftpChannel sftpChannel = new SftpChannel();
- ChannelSftp sftp = null;
- String localPath = "";
- try {
- if (StringUtils.isNotBlank(config.getPrivateKeyPath())) {
- sftp = sftpChannel.connectByIdentity(config);
- } else {
- sftp = sftpChannel.connectByPwd(config);
- }
- if (sftp.isConnected()) {
- logger.info("connect server success");
- } else {
- logger.error("connect server fail");
- }
- String src = baseDir + "/" + sourceFileName;
- logger.info("start downing,sftp server path:[" + src + "]");
- //获取文件的大小
- SftpATTRS attr = sftp.stat(src);
- long fileSize = attr.getSize();
- logger.info("downing file size :" + fileSize);
- InputStream is = sftp.get(src);
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- int len;
- byte[] bytes = new byte[1024];
- while ((len = is.read(bytes)) != -1) {
- out.write(bytes, 0, len);
- }
- File saveDir = new File(savePath);
- if(!saveDir.exists()){ // 没有就创建该文件
- saveDir.mkdir();
- }
- File file = new File(saveDir+File.separator+(System.currentTimeMillis()/10000)+destFileName);
- FileOutputStream fos = new FileOutputStream(file);
- fos.write(bytes);
-
- fos.close();
- is.close();
- return file.getPath();
- } finally {
- sftpChannel.closeChannel();
- }
- }
-
-
- /**
- * 判断文件夹是否存在
- * true 目录创建成功,false 目录创建失败
- * @param sftp ChannelSftp
- * @param filePath 文件夹路径
- * @return boolean
- */
- public static boolean isExist(ChannelSftp sftp, String filePath) {
- String paths[] = filePath.split("/");
- String dir = paths[0];
- for (int i = 0; i < paths.length - 1; i++) {
- dir = dir + "/" + paths[i + 1];
- try {
- sftp.cd(dir);
- } catch (SftpException sException) {
- if (ChannelSftp.SSH_FX_NO_SUCH_FILE == sException.id) {
- try {
- sftp.mkdir(dir);
- } catch (SftpException e) {
- e.printStackTrace();
- return false;
- }
- }
- }
- }
- return true;
- }
-
- /**
- * 获取文件数据
- * @param sftp ChannelSftp
- * @param ftpRootDir 文件路径及名称, 比如/home/ddzxsftp/ivr/33.txt
- * @return List
- */
- public static List<String> getList(ChannelSftp sftp, String ftpRootDir) {
- List<String> list = new ArrayList<>();
- try {
- InputStream is = sftp.get(ftpRootDir);
- String str;
- BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
- while ((str = br.readLine()) != null) {
- list.add(str);
- }
- br.close();
- } catch (Exception e) {
- logger.error("get file data excetion {}", e);
- }
- return list;
- }
- /**
- * 断掉连接
- */
- public void disConnect(ChannelSftp sftp) {
- try {
- sftp.disconnect();
- sftp.getSession().disconnect();
- } catch (Exception e) {
- logger.error("disConnect errorMessage",e);
- }
- }
-
-
-
- /**
- * 获取配置表里面配置sftp连接的内容
- * 把连接信息配置在数据库
- */
- public static SftpConfig getSftpConfigVal() {
- String configVal = "{\"ftpHost\":\"132.65.205.86\",\"ftpPort\":\"22\",\"ftpUserName\":\"yan\",\"ftpPassword\":\"KG@055\",\"ftpBaseDir\":\"/home/sftp\",\"path\":\"/home/sftp/ivr\",\"privateKeyPath\":\"\",\"passphrase\":\"\"}";
- JSONObject configObj = JSON.parseObject(configVal);
- String ftpHost = MapUtils.getString(configObj, "ftpHost");
- int ftpPort = MapUtils.getInteger(configObj, "ftpPort");
- String ftpUserName = MapUtils.getString(configObj, "ftpUserName");
- String ftpPassword = MapUtils.getString(configObj, "ftpPassword");
- String ftpBaseDir = MapUtils.getString(configObj, "ftpBaseDir");
- String path = MapUtils.getString(configObj, "path");
- String privateKeyPath = MapUtils.getString(configObj, "privateKeyPath");
- String passphrase = MapUtils.getString(configObj, "passphrase");
- SftpConfig config = new SftpConfig();
- config.setIp(ftpHost);
- config.setPort(ftpPort);
- config.setUsername(ftpUserName);
- config.setPwd(ftpPassword);
- config.setBaseDir(ftpBaseDir);
- config.setPath(path);
- config.setPrivateKeyPath(privateKeyPath);
- config.setPassphrase(passphrase);
- return config;
- }
-
-
- }
用cd(): 进入指定目录,然后用ls(): 得到指定目录下的文件列表,可以指定具体的文件类型。读取文件后,再用 rename(): 移动文件。了解ChannelSftp命令,要操作服务器上的文件就方便很多