之前写过一篇 python 自动化部署项目,实现了Tomcat项目的初级自动化部署功能,但是它有一个不足,只支持单个项目部署,也就是说,项目被高度定制化了,所有的配置信息都被固化在了程序里,无法扩展。所以今天,我们给它来个小小的升级,让这个工具以界面图形化的方式运行,在这个界面上,支持对要部署的Tomcat项目进行管理。该工具主要解决以下问题(痛点):
系统作了负载均衡处理,项目被分开存放在了多个不同的机器上,同时每个机器上也有多个项目,每次部署的时候,都需要工程师们手动在不同的目录之间来回切换,手动复制、粘贴、备份系统,手动删除缓存文件,手动启停web服务,步骤繁琐且容易出错。此时,如果能有一款工具,能够帮助我们自动化完成这些操作,解放工程师们的双手,应该算是大功一件了!
so,先来张效果图,让我们直观感受下。

下面正式开始!
1. 安装pysimplegui
pip install pysimplegui
2. 编写核心代码
- import os
- import json
- import time
- import shutil
- import threading
- import webbrowser
- from gui_helper import GuiHelper
- from auto_release import AutoRelease
- from PIL import Image as pil_image
- import PySimpleGUI as sg
-
- class ProjectDeploy():
-
- # 配置文件
- project_file = 'project.json'
-
- left_layout_key = 'left_layout'
-
- right_layout_key = 'right_layout'
-
- default_right_layout_key = 'default_right_layout'
-
- background_image = 'background_image.png'
-
- # 窗口元素的背景色
- element_background_color = '#FCFCFC'
-
- def run(self):
- self.load_win()
-
- '''
- 加载窗口
- '''
- def load_win(self, old_window=None):
- # 获取屏幕宽高
- (width, height) = GuiHelper.get_window_size()
- win_height = int(height)
- left_win_width = 150
- right_win_width = width - left_win_width
-
- default_right_column_layout = [
- GuiHelper.text_input('点击左侧按钮,新建一个项目'),
- ]
- image = pil_image.open(self.background_image)
- src_image_size = image.size
- background_image_size = (min(src_image_size[0], int(right_win_width*0.7)), min(src_image_size[1], int(win_height*0.7)))
- default_right_column_layout.append([sg.Image(filename=self.background_image, size=background_image_size)])
-
- left_columns = [
- sg.Column(self.load_left(), key=self.left_layout_key, size=(left_win_width, win_height), background_color=self.element_background_color)
- ]
- right_columns = [
- sg.Column(default_right_column_layout, key=self.default_right_layout_key, size=(right_win_width, win_height), background_color=self.element_background_color),
- sg.Column(self.load_right(), key=self.right_layout_key, visible=False, size=(right_win_width, win_height), background_color=self.element_background_color, scrollable=True, vertical_scroll_only=True)
- ]
- left_frame = sg.Frame('', [left_columns], background_color=self.element_background_color, border_width=1)
- right_frame = sg.Frame('', [right_columns], background_color=self.element_background_color, border_width=0)
-
- layout = [[
- sg.Column([[left_frame]], pad=(0,0), background_color=self.element_background_color),
- sg.Column([[right_frame]], pad=(30,0), background_color=self.element_background_color)
- ]]
- if old_window is not None:
- old_window.close()
- window = GuiHelper.create_window('Tomcat项目部署工具', layout = layout)
- event_list = self.load_events()
- if event_list is not None:
- for event_item in event_list:
- window[event_item[0]].set_cursor(cursor='hand2')
- GuiHelper.listen_window(window, event_list)
- GuiHelper.close_window(window)
-
- '''
- 加载左侧布局
- '''
- def load_left(self):
- left_column_layout = [
- [GuiHelper.button('主页', None, True, 10)],
- [GuiHelper.button('新增项目', '#337AB7', True, 10)],
- ]
- project_list = self.get_project_list()
- project_names = list(project_list.keys())
- for index, name in enumerate(project_names):
- left_column_layout.append(GuiHelper.text_input('{}. {}'.format(index+1, name)))
- return left_column_layout
-
- '''
- 加载右侧布局
- '''
- def load_right(self):
- button_layout = [
- sg.Column([[]], size=(150, 30), background_color=self.element_background_color),
- sg.Column([[
- GuiHelper.button('保存'),
- GuiHelper.button('删除', '#FF5722', False),
- GuiHelper.button('开始部署', '#337AB7', False, 10),
- ]], key='btn_group', justification='left', pad=(0, 30), background_color=self.element_background_color)
- ]
- right_column_layout = [
- GuiHelper.text_input('请输入基本信息'),
- GuiHelper.text_input('项目标记', 'project_name'),
- GuiHelper.text_input('Tomcat服务名称', 'service_name'),
- GuiHelper.text_input('Tomcat服务端口', 'service_port'),
- GuiHelper.text_input('Tomcat工作目录', 'work_dir', None, None, 'folder'),
- GuiHelper.text_input('Tomcat缓存目录', 'cache_dir', None, None, 'folder'),
- GuiHelper.text_input('war包文件', 'war_file_name', None, None, 'files'),
- GuiHelper.text_input('备份目录', 'backup_dir', None, None, 'folder'),
- GuiHelper.text_input('应用访问URL', 'url'),
- button_layout,
- GuiHelper.text_output('process_info', 5, 60, False),
- ]
- return right_column_layout
-
- '''
- 切换右侧布局
- @show_right_layout True 显示主布局,False 显示默认布局
- '''
- def swap_right_layout(self, window, show_right_layout=True):
- show_default_right_layout_key = False
- if not show_right_layout:
- show_default_right_layout_key = True
- window[self.default_right_layout_key].update(visible=show_default_right_layout_key)
- window[self.right_layout_key].update(visible=show_right_layout)
-
- '''
- 事件列表
- '''
- def load_events(self):
- event_list = [
- ('主页', self.home),
- ('新增项目', self.add),
- ('删除', self.delete),
- ('保存', self.save),
- ('开始部署', self.deploy)
- ]
- project_list = self.get_project_list()
- project_names = list(project_list.keys())
- for index, name in enumerate(project_names):
- event_list.append(('{}. {}'.format(index+1, name), self.show_detail))
- return event_list
-
- '''
- 获取项目名称列表
- '''
- def get_project_list(self):
- result = self.read_txt(self.project_file)
- if result is None:
- result = "{}"
- project_list = json.loads(result)
- return project_list
-
- '''
- 返回主页
- '''
- def home(self, values, window, event):
- self.swap_right_layout(window, False)
-
- '''
- 新增项目
- '''
- def add(self, values, window, event):
- self.swap_right_layout(window)
- for key in values.keys():
- # 以_c结尾表示该控件为文件选择按钮
- if key.endswith('_c'):
- window[key].update("选择...")
- else:
- window[key].update("")
- window['保存'].update(visible=True)
- window['删除'].update(visible=False)
- window['开始部署'].update(visible=False)
- window['btn_group'].update(visible=True)
- window['process_info_col'].hide_row()
-
- '''
- 查看项目信息
- '''
- def show_detail(self, values, window, event):
- project_list = self.get_project_list()
- project_names = list(project_list.keys())
- for index, name in enumerate(project_names):
- if event == '{}. {}'.format(index+1, name):
- self.update_right_text(project_list[name], window)
- window['保存'].update(visible=False)
- window['process_info_col'].hide_row()
- window['删除'].update(visible=True)
- window['开始部署'].update(visible=True)
- window['btn_group'].update(visible=True)
-
- '''
- 更新元素内容
- '''
- def update_right_text(self, values, window):
- self.swap_right_layout(window)
- keys = list(values.keys())
- for key in keys:
- # 以_c结尾表示该控件为文件选择按钮
- if key.endswith('_c'):
- window[key].update("选择...")
- else:
- window[key].update(values[key])
-
- '''
- 保存项目信息
- '''
- def save(self, values, window, event):
- if not self.verify(values):
- return False
- if(not GuiHelper.confirm('确定保存吗')):
- return False
- result = self.read_txt(self.project_file)
- if result is None:
- result = "{}"
- result = json.loads(result)
- if values['project_name'] in result:
- GuiHelper.alert('该项目标记已存在')
- return False
- result[values['project_name']] = values
- project.write_txt('project.json', json.dumps(result, ensure_ascii=False))
- GuiHelper.alert('保存成功')
- self.load_win(window)
- return True
-
- '''
- 删除项目
- '''
- def delete(self, values, window, event):
- if(not GuiHelper.confirm('确定删除吗')):
- return False
- result = self.read_txt(self.project_file)
- if result is None:
- result = "{}"
- result = json.loads(result)
- if values['project_name'] in result:
- del result[values['project_name']]
- project.write_txt('project.json', json.dumps(result, ensure_ascii=False))
- GuiHelper.alert('删除成功')
- self.load_win(window)
- return True
-
- '''
- 开始部署
- '''
- def deploy(self, values, window, event):
- if not self.verify(values):
- return False
- if(not GuiHelper.confirm('确定开始部署吗')):
- return False
- window['删除'].update(visible=False)
- window['开始部署'].update(visible=False)
- window['btn_group'].update(visible=False)
- window['process_info'].update(visible=True)
- window['process_info_col'].update(visible=True)
- window['process_info_col'].unhide_row()
- threading.Thread(target=self._deploy, args=(values, window,), daemon=True).start()
-
-
- def _deploy(self, values, window):
- # 应用端口号
- port = values['service_port']
- # 服务名称
- service_name = values['service_name']
- # war包文件列表,多个文件用英文封号隔开
- war_file_names = values['war_file_name']
- # 项目所在目录
- work_dir = values['work_dir']
- # 备份根目录
- backup_dir = values['backup_dir']
- # 缓存目录
- cache_dir = values['cache_dir']
- # 检测url
- url = values['url']
-
- try:
- start_time = int(time.time())
- war_file_list = war_file_names.split(';');
-
- # 停止服务
- hour_time = time.strftime('%H:%M:%S', time.localtime(time.time()))
- GuiHelper.print_info(window, "{}【1/5】正在停止服务 {},请稍候...".format(hour_time, service_name))
- AutoRelease.stop_service(service_name)
- during_time_stop = 0;
- while True:
- pid = AutoRelease.get_pid(port)
- if pid == "" or pid == "0":
- hour_time = time.strftime('%H:%M:%S', time.localtime(time.time()))
- GuiHelper.print_info(window, "{}【1/5】服务 {} 已停止".format(hour_time, service_name))
- break
- time.sleep(1)
- during_time_stop += 1
- if during_time_stop == 60:
- hour_time = time.strftime('%H:%M:%S', time.localtime(time.time()))
- GuiHelper.print_info(window, "{}【1/5】服务 {} 停止超时,请手动停止".format(hour_time, service_name))
- window.refresh()
- break
-
- # 备份原文件
- for index, war_file in enumerate(war_file_list):
- if not os.path.isfile(war_file):
- hour_time = time.strftime('%H:%M:%S', time.localtime(time.time()))
- GuiHelper.print_info(window, "{}【2/5】当前目录未找到文件 {}".format(hour_time, war_file))
- continue
- # 分离路径和文件名
- (src_file_path, src_file_name) = os.path.split(war_file)
- # 分离文件名和后缀
- (base_file_name, file_suffix) = os.path.splitext(src_file_name)
- hour_time = time.strftime('%H:%M:%S', time.localtime(time.time()))
- GuiHelper.print_info(window, "{}【2/5】正在备份原文件 {},请稍候...".format(hour_time, base_file_name))
- zip_file_name = base_file_name+'.zip'
-
- date_str = time.strftime('%Y%m%d', time.localtime(time.time()))
- project_dir = work_dir+"/"+base_file_name
- out_zip_dir = "{}/{}".format(backup_dir, date_str)
- if not os.path.exists(out_zip_dir):
- os.makedirs(out_zip_dir)
-
- if AutoRelease.archive_file(out_zip_dir+"/"+zip_file_name, project_dir) :
- hour_time = time.strftime('%H:%M:%S', time.localtime(time.time()))
- GuiHelper.print_info(window, "{}【2/5】文件 {} 备份完成,路径:{}".format(hour_time, base_file_name, out_zip_dir+"/"+zip_file_name))
-
- # 部署新文件
- for index, war_file in enumerate(war_file_list):
- if not os.path.isfile(war_file):
- hour_time = time.strftime('%H:%M:%S', time.localtime(time.time()))
- GuiHelper.print_info(window, "{}【3/5】当前目录未找到文件 {}".format(hour_time, war_file))
- continue
- # 分离路径和文件名
- (src_file_path, src_file_name) = os.path.split(war_file)
- # 分离文件名和后缀
- (base_file_name, file_suffix) = os.path.splitext(src_file_name)
- hour_time = time.strftime('%H:%M:%S', time.localtime(time.time()))
- GuiHelper.print_info(window, "{}【3/5】部署新文件 {},请稍候...".format(hour_time, src_file_name))
-
- date_str = time.strftime('%Y%m%d', time.localtime(time.time()))
- project_dir = work_dir+"/"+base_file_name
- out_zip_dir = "{}/{}".format(backup_dir, date_str)
- AutoRelease.copy_file(war_file, out_zip_dir)
- if os.path.isfile(work_dir+"/"+src_file_name):
- os.remove(work_dir+"/"+src_file_name)
- if os.path.exists(project_dir):
- shutil.rmtree(project_dir, True)
- if os.path.exists(cache_dir):
- shutil.rmtree(cache_dir, True)
- AutoRelease.copy_file(war_file, work_dir)
- hour_time = time.strftime('%H:%M:%S', time.localtime(time.time()))
- GuiHelper.print_info(window, "{}【3/5】新文件 {} 部署完成".format(hour_time, src_file_name))
-
- # 启动服务
- hour_time = time.strftime('%H:%M:%S', time.localtime(time.time()))
- GuiHelper.print_info(window, "{}【4/5】正在启动服务 {},请稍候...".format(hour_time, service_name))
- AutoRelease.start_service(service_name)
- during_time_start = 0
- while True:
- if AutoRelease.get_pid(port) != "":
- hour_time = time.strftime('%H:%M:%S', time.localtime(time.time()))
- GuiHelper.print_info(window, "{}【4/5】服务 {} 已启动".format(hour_time, service_name))
- break
- time.sleep(1)
- during_time_start += 1
- if during_time_start == 60:
- hour_time = time.strftime('%H:%M:%S', time.localtime(time.time()))
- GuiHelper.print_info(window, "{}【4/5】服务 {} 启动超时,请手动启动".format(hour_time, service_name))
- break
-
- # 检查应用
- hour_time = time.strftime('%H:%M:%S', time.localtime(time.time()))
- GuiHelper.print_info(window, "{}【5/5】正在检测应用是否可以正常访问,请稍候...".format(hour_time))
- during_time_access = 0
- while True:
- if AutoRelease.get_http_status(url):
- hour_time = time.strftime('%H:%M:%S', time.localtime(time.time()))
- GuiHelper.print_info(window, "{}【5/5】应用已可以正常访问".format(hour_time))
- break
- time.sleep(5)
- during_time_access += 5
- if during_time_access == 60:
- hour_time = time.strftime('%H:%M:%S', time.localtime(time.time()))
- GuiHelper.print_info(window, "{}【5/5】应用访问超时".format(hour_time))
- break
- except Exception as e:
- hour_time = time.strftime('%H:%M:%S', time.localtime(time.time()))
- GuiHelper.print_info(window, "{}部署异常:{}".format(hour_time, str(e)))
- else:
- GuiHelper.print_info(window, "{0},用时 {1} 秒".format("部署完毕", int(time.time()) - start_time))
- webbrowser.open(url)
- window['删除'].update(visible=True)
- window['开始部署'].update(visible=True)
- window['btn_group'].update(visible=True)
-
- def verify(self, values):
- keys = {
- 'project_name': '项目标记名称不能为空',
- 'service_name': '服务名称不能为空',
- 'service_port': '端口号不能为空',
- 'work_dir': '请选择工作目录',
- 'cache_dir': '请选择缓存目录',
- 'war_file_name': '请选择war包文件',
- 'backup_dir': '请选择项目备份目录',
- 'url': '访问URL不能为空',
- }
- for key, text in keys.items():
- if GuiHelper.is_empty(values, key):
- GuiHelper.alert(text)
- return False
- return True
-
- def read_txt(self, file, encoding="utf-8"):
- if not os.path.isfile(file):
- return None
- with open(file, "r", encoding=encoding) as f:
- result = f.read()
- return result.rstrip()
-
- def write_txt(self, file, content, overwrite=True, encoding="utf-8"):
- model = "w+" if overwrite else "a+"
- with open(file, model, encoding=encoding) as f:
- f.write(content)
-
-
- if __name__ == "__main__":
- project = ProjectDeploy()
- project.run()
3. 打包运行
工具虽然以界面方式运行,但是部分操作需要与命令行窗口进行交互,所以打包的时候请不要添加 -w 参数,否则会报 “句柄无效” 错误,直接用如下命令打包即可。
pyinstaller -F project_deploy.py
最终的成品如下:

