博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
用多线程处理FTP上传
阅读量:5312 次
发布时间:2019-06-14

本文共 14998 字,大约阅读时间需要 49 分钟。

  在开发中遇到总站发送命令请求分站将某资源通过FTP上传过来,也就是总站提取分站的资源问题。并且总站实时可以获取已经提取了文件的大小的比例。

  思路:1.首先分站要将文件大小告知总站

               2.总站收到文件大小后,根据指定路径去判断指定路径文件夹(分站的文件存储的位置)下的文件大小,然后和当前文件大小/总大小,就获取了已经上传的所占比。

  总站发送请求就不再赘述了,直接说分站接收到请求后的处理,为了防止恶意请求,我们这对每次请求都加了“盐”,那么分站接收时需要判断是否带“盐”了,如果没有,则不予处理,返回错误。

  分站接收请求contorller

@RequestMapping("/fileSize.htm")    @ResponseBody    public String getFileSize(HttpServletRequest request,HttpServletResponse response,String json) {        log.info(this.getClass().getSimpleName() + "."+ Thread.currentThread().getStackTrace()[1].getMethodName()+ "获取资源文件大小(字节)------start");        /*获取参数--start*/        String checkCode = "see-P2C";//校验码        JSONObject jsonObject = new JSONObject(json);        String tranno =jsonObject.getString("tranno"); //交易流水号        String isCheckcode = jsonObject.getString("checkCode"); //校验码        checkCode = tranno.substring(tranno.length()-6,tranno.length())+checkCode;        String mD5CheckCode = MD5Util.encrypt(checkCode);           Long ids =jsonObject.getLong("sid"); //资源ID        Map
map = new HashMap
(); map.put("ids", ids); map.put("tranno", tranno); map.put("isCheckcode", isCheckcode); map.put("mD5CheckCode", mD5CheckCode); log.info(this.getClass().getSimpleName() + "."+ Thread.currentThread().getStackTrace()[1].getMethodName()+ "获取资源文件大小(字节)------end"); return this.transRecordService.fileSizeHandle(map); }

  TransRecordServiceImpl.java中我们处理这些请求,首先是总站第一遍请求时,分站会判断有没有这个文件,如果没有返回错误,如果有资源分站会告诉总站我的文件有多大、我会把文件放到你的FTP的哪个文件夹下(路径)。

/**     * @desc 获取文件总大小     * @author zp     */    @Override    public String fileSizeHandle(Map
map) { // TODO Auto-generated method stub log.info(this.getClass().getSimpleName() + "." + Thread.currentThread().getStackTrace()[1].getMethodName() + "(获取视频总大小)----start"); String tranno = map.get("tranno").toString(); Long ids = (Long)map.get("ids"); String mD5CheckCode = map.get("mD5CheckCode").toString(); String isCheckcode = map.get("isCheckcode").toString(); /*交易记录表插入信息--start*/ TranCodeModel tranCodeModel = new TranCodeModel(); tranCodeModel.setTrandesc("总站提取分站视频资源"); tranCodeModel.setUrl("parentMsg/fileSizeHandle.htm"); tranCodeModel.setProjectphase("2"); tranCodeModel.setRspcode("00"); tranCodeModel.setRspinfo("交易成功"); tranCodeModel.setTranconent("提取资源(resourceid="+ids+")"); tranCodeModel.setTranno(tranno); tranCodeModel.setCreatetime(DateUtil.getCurrentDateString("YYYY-MM-dd HH:mm:ss")); /*交易记录表插入信息--end*/ if(!mD5CheckCode.equals(isCheckcode)){ // 身份验证失败 tranCodeModel.setRspcode("01"); tranCodeModel.setRspinfo("身份验证失败"); this.insertdata(tranCodeModel); return "{\"rspcode\":\"01\",\"rspinfo\":\"identifyError\"}"; } if(this.vertifyTranNo(tranno)>0){ // 流水号重复 tranCodeModel.setRspcode("01"); tranCodeModel.setRspinfo("交易流水号重复"); this.insertdata(tranCodeModel); return "{\"rspcode\":\"01\",\"rspinfo\":\"trannoRepeat\"}"; } Map
resourceMap = this.resourceService.findById(ids); if (Validator.isNull(resourceMap)||resourceMap==null) { // 资源被删除 tranCodeModel.setRspcode("01"); tranCodeModel.setRspinfo("资源被删除"); this.insertdata(tranCodeModel); return "{\"rspcode\":\"01\",\"rspinfo\":\"delresource\"}"; } if ("-1".equals(resourceMap.get("isextract").toString())) { // 提取异常 tranCodeModel.setRspcode("01"); tranCodeModel.setRspinfo("提取异常"); this.insertdata(tranCodeModel); return "{\"rspcode\":\"01\",\"rspinfo\":\"errvideo\"}"; } List
rFiles = this.rFileService.findGroupByResourceId(ids); if (rFiles.size()==0) { // 视频资源不存在 tranCodeModel.setRspcode("01"); tranCodeModel.setRspinfo("nofiles"); this.insertdata(tranCodeModel); return "{\"rspcode\":\"01\",\"rspinfo\":\"novideo\"}"; } Long filesize = (long)0; for (ResourceFileModel resourceFileModel : rFiles) { filesize += Long.parseLong(resourceFileModel.getSize()); } String uuid = UUID.randomUUID().toString().replace("-", ""); log.info(this.getClass().getSimpleName() + "." + Thread.currentThread().getStackTrace()[1].getMethodName() + "(获取视频总大小)----end"); return "{\"rspcode\":\"00\",\"rspinfo\":\"success\",\"filesize\":\""+filesize+"\",\"uuid\":\""+uuid+"\"}"; }

  总站接收到分站第一次成功返回信息之后,然后通知分站,你可以开始上传了,总站也会将文件总大小进行保存,然后开始根据分站提供的文件路径进行实时查询文件大小,然后进行比对。

  分站接收到总站回信后,开始上传文件(总站的回信也是加盐的哦)。

  分站接收总站的第二次请求contorller

/**     * @desc 提取资源     * @author zp     * @throws IOException      * @date 2018-3-16     */    @RequestMapping("/pickup.htm")    @ResponseBody    public String pickup(HttpServletRequest request,HttpServletResponse response,String json) throws IOException{        log.info(this.getClass().getSimpleName() + "."+ Thread.currentThread().getStackTrace()[1].getMethodName()+ "提取资源------start");                 /*获取参数--start*/        String checkCode = "see-P2C";//校验码        JSONObject jsonObject = new JSONObject(json);        String tranno =jsonObject.getString("tranno"); //交易流水号        String isCheckcode = jsonObject.getString("checkCode"); //校验码        checkCode = tranno.substring(tranno.length()-6,tranno.length())+checkCode;        String mD5CheckCode = MD5Util.encrypt(checkCode);           Long ids =jsonObject.getLong("sid"); //资源ID        String ftpuser = jsonObject.getString("ftpuser");// ftp帐号        String ftppwd = jsonObject.getString("ftppwd");// ftp密码        String ftpport = jsonObject.getString("ftpport");// ftp端口        String ftpIP = jsonObject.getString("ftpIP");// ftpIP        String uuid = jsonObject.getString("uuid");// uuid        Map
map = new HashMap
(); map.put("ids", ids); map.put("tranno", tranno); map.put("isCheckcode", isCheckcode); map.put("mD5CheckCode", mD5CheckCode); map.put("ftpuser", ftpuser); map.put("ftppwd", ftppwd); map.put("ftpIP", ftpIP); map.put("ftpport", ftpport); map.put("uuid", uuid); return this.transRecordService.pickHandle(map); }

  TransRecordServiceImpl.java中,总站会提供自己的FTP的相关信息。然后开始上传。里边也会处理盐是否正确,是否有该文件等操作。 

/**     * @desc 处理总站提取请求     * @author zp     * @throws IOException      * @date 2018-3-19     */    @Override    public String pickHandle(Map
map) throws IOException { String tranno = map.get("tranno").toString(); Long ids = (Long)map.get("ids"); String mD5CheckCode = map.get("mD5CheckCode").toString(); String isCheckcode = map.get("isCheckcode").toString(); String ftpuser = map.get("ftpuser").toString(); String ftppwd = map.get("ftppwd").toString(); String ftpIP = map.get("ftpIP").toString(); String ftpport = map.get("ftpport").toString(); /*交易记录表插入信息--start*/ TranCodeModel tranCodeModel = new TranCodeModel(); tranCodeModel.setTrandesc("总站提取分站视频资源"); tranCodeModel.setUrl("parentMsg/pickup.htm"); tranCodeModel.setProjectphase("2"); tranCodeModel.setRspcode("00"); tranCodeModel.setRspinfo("交易成功"); tranCodeModel.setTranconent("提取资源(resourceid="+ids+")"); tranCodeModel.setTranno(tranno); tranCodeModel.setCreatetime(DateUtil.getCurrentDateString("YYYY-MM-dd HH:mm:ss")); /*交易记录表插入信息--end*/ if(!mD5CheckCode.equals(isCheckcode)){ // 身份验证失败 tranCodeModel.setRspcode("01"); tranCodeModel.setRspinfo("身份验证失败"); this.insertdata(tranCodeModel); return "{\"rspcode\":\"01\",\"rspinfo\":\"identifyError\"}"; } if(this.vertifyTranNo(tranno)>0){ // 流水号重复 tranCodeModel.setRspcode("01"); tranCodeModel.setRspinfo("交易流水号重复"); this.insertdata(tranCodeModel); return "{\"rspcode\":\"01\",\"rspinfo\":\"trannoRepeat\"}"; } // 开始上传资源 // 1.判断该任务是否录制完成 Map
reMap = this.resourceService.findById(ids); if (Validator.isNull(reMap)) { // 不存在 tranCodeModel.setRspcode("01"); tranCodeModel.setRspinfo("资源已被删除"); this.insertdata(tranCodeModel); return "{\"rspcode\":\"01\",\"rspinfo\":\"delResouces\"}"; } if (!"3".equals(reMap.get("isextract").toString())&&!"2".equals(reMap.get("isextract").toString())) { // 资源未被提取或已被删除 tranCodeModel.setRspcode("01"); tranCodeModel.setRspinfo("资源还未被提取或已被删除"); this.insertdata(tranCodeModel); return "{\"rspcode\":\"01\",\"rspinfo\":\"nopick\"}"; } // 2.通过资源ID查找所有的资源信息 List
rFiles = this.rFileService.findGroupByResourceId(ids); if (rFiles.size()==0) { // 视频资源不存在 tranCodeModel.setRspcode("01"); tranCodeModel.setRspinfo("nofiles"); this.insertdata(tranCodeModel); return "{\"rspcode\":\"01\",\"rspinfo\":\"novideo\"}"; } // 3.开始上传 String filePath = ""; String arr = ""; String picktemp = PropertyUtil.getProperty("picktemp"); // 临时路径 String uuid = map.get("uuid").toString(); picktemp = picktemp+uuid+"\\"; picktemp = picktemp.replaceAll("\\\\", "/"); for (ResourceFileModel resourceFileModel : rFiles) { filePath = resourceFileModel.getPath(); filePath = filePath.replaceAll("\\\\", "/"); String path = filePath.replaceAll("\\\\", "/"); String temp = filePath.substring(0,filePath.lastIndexOf("resources/")+10); String temp2 = filePath.substring(filePath.lastIndexOf("resources/")+10,filePath.length()); temp2 = temp2.substring(0,temp2.indexOf("/")+1); filePath = temp + temp2; path = path .substring(filePath.length(),path.length()); path = picktemp + path; path = path.replace("/picktemp/", "/resources/"); resourceFileModel.setPath(path); String url = path.substring(path.indexOf("ResourcesManager/"),path.length()); resourceFileModel.setUrl(url); arr += JSONObject.fromObject(resourceFileModel)+","; } if (arr.length()>0) { arr = arr.substring(0, arr.length()-1); } // 将文件拷贝到指定的临时目录 CopyDirectory copyfile = new CopyDirectory(); copyfile.copyDirectiory(filePath, picktemp); // filePath 源文件 picktemp 目标地址 log.info(this.getClass().getSimpleName() + "."+ Thread.currentThread().getStackTrace()[1].getMethodName()+ "提取资源------end"); try { String[] ip = ftpIP.split(":"); ftpIP = ip[0]; this.starFtp(picktemp, ftpIP, ftpuser, ftppwd, ftpport); return "{\"rspcode\":\"00\",\"rspinfo\":\"success\",\"resource\":["+arr+"]}"; } catch (Exception e) { // TODO: handle exception e.printStackTrace(); return "{\"rspcode\":\"01\",\"rspinfo\":\"ftperror\"}"; } } /** * @desc 开启线程进行FTP上传 * @author zp * @date 2018-3-23 */ public void starFtp(String picktemp, String ftpIP,String ftpuser,String ftppwd,String ftpport) { log.info(this.getClass().getSimpleName() + "."+ Thread.currentThread().getStackTrace()[1].getMethodName()+ "准备开启FTP上传线程------start"); //创建一个可重用固定线程数的线程池 ExecutorService pool = Executors.newFixedThreadPool(5); //创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口 CreateTreadPool t1 = new CreateTreadPool(picktemp, ftpIP, ftpuser, ftppwd, ftpport); //将线程放入池中进行执行 pool.execute(t1); System.out.println(Thread.currentThread().getName()+"进入线程"); //关闭线程池 pool.shutdown(); log.info(this.getClass().getSimpleName() + "."+ Thread.currentThread().getStackTrace()[1].getMethodName()+ "准备开启FTP上传线程------end"); }

  开启线程对FTP上传进行操作,因为考虑到本次操作总站可能不关心上传过程,所以成功与否就不实时通知总站了~~~~~

public class CreateTreadPool implements Runnable {    private static Logger log = Logger.getLogger(CreateTreadPool.class);    // public static ReentrantLock lock=new ReentrantLock();    //定义信号量,只能5个线程同时访问      final Semaphore semaphore = new Semaphore(5);     public static int c=0;    public String picktemp; //路径    public String ftpIP; // ftpip    public String ftpuser ; // ftp帐号    public String ftppwd ; // ftp密码    public String ftpport; // ftp端口    // 通过构造方法 获取FTP相关信息。    public CreateTreadPool(String picktemp, String ftpIP,String ftpuser,String ftppwd,String ftpport){        this.picktemp = picktemp;        this.ftpIP = ftpIP;        this.ftpuser = ftpuser;        this.ftppwd = ftppwd;        this.ftpport = ftpport;    }    @Override    public void run() {        // TODO Auto-generated method stub        System.out.println(Thread.currentThread().getName()+"---------上传准备中---------");         try {            //获取许可              semaphore.acquire();            System.out.println(Thread.currentThread().getName()+"获得锁");            System.out.println(Thread.currentThread().getName()+"====>"+c);             boolean isSuccess = this.FTPUpload(picktemp, ftpIP, ftpuser, ftppwd, ftpport);            if (isSuccess) {                System.out.println(Thread.currentThread().getName()+"---------上传成功---------");              }else {                System.out.println(Thread.currentThread().getName()+"---------上传失败---------");             }             c++;        } catch (Exception e) {             System.out.println(Thread.currentThread().getName()+"---------上传失败---------");            e.printStackTrace();        }finally{             System.out.println(Thread.currentThread().getName()+"------------释放锁");             semaphore.release();        }    }    /**     * @desc FTP上传     * @author zp     * @date 2018-3-23     */    public boolean FTPUpload(String filePath, String serverIp,String ftpuser,String ftppwd,String ftpport) {        log.info(this.getClass().getSimpleName() + "." + Thread.currentThread().getStackTrace()[1].getMethodName() + "(给总站FTP上传视频)----start");        FtpUtil ftpTool = new FtpUtil(ftpuser, ftppwd, serverIp, ftpport); // 创建链接        boolean linkStatus = ftpTool.connectServer(ftpuser, ftppwd, serverIp, ftpport);        File file = new File(filePath); // 实例化        UploadListener listener = new UploadListener(ftpTool.client); // 创建监听        ftpTool.ftpUploadFolder(file, listener); // 上传        ftpTool.disconnect(); // 关闭        RemoveFilesUtil removeFilesUtil = new RemoveFilesUtil();        removeFilesUtil.DeleteFolder(filePath); // 移除本地临时上传的文件        log.info(this.getClass().getSimpleName() + "." + Thread.currentThread().getStackTrace()[1].getMethodName() + "(给总站FTP上传视频)----end");        return linkStatus;    }

 

 

 

 

 

 

  

转载于:https://www.cnblogs.com/pengpengzhang/p/8629491.html

你可能感兴趣的文章
atom编辑器安装说明
查看>>
团队组员得分分配工作(改动)——PM(李忠)
查看>>
我的开源项目
查看>>
Display BLOBs and CLOBs (DB2可视化工具AQT )
查看>>
adb的使用介绍(转载)
查看>>
linux下打开windows txt文件中文乱码问题 (转载)
查看>>
JVM菜鸟进阶高手之路六(JVM每隔一小时执行一次Full GC)
查看>>
Spring Boot中使用Swagger2构建强大的RESTful API文档
查看>>
怎么看吉他简谱
查看>>
java_流程控制
查看>>
解决Azure中COULD NOT LOAD FILE OR ASSEMBLY问题
查看>>
工厂模式小结
查看>>
storm+Calcite
查看>>
四、抽象类
查看>>
[第九章]设计模式
查看>>
OpenCV——LBP(Local Binary Patterns)特征检测
查看>>
模糊控制——(4)Sugeno模糊模型
查看>>
树莓派.安装Redis环境
查看>>
小程序加载图片的坑
查看>>
jquery二维码
查看>>