1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213
|
# 在远程机执行python脚本命令 # stdin, stdout, stderr = client.exec_command("python /home/test.py") import paramiko # 发起ssh请求 import wx # python界面 from threading import Thread # python多线程 from pubsub import pub # python消息通知 import time # 用于延时操作 from threading import Timer
import pickle # 保存本地数据 import signal import os.path
import sys
# 初始化ssh实例,发起ssh请求 ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
pid = 0
# 界面绘制 app = wx.App() frame = wx.Frame(parent=None, title='远程打包程序', size=(800, 600))
frame.Show()
canOutput = True
resultStdout = '' # 接受到pub的通知 异步处理
def handler(signalnum): # 查找打包等相关进程 并杀掉 command = "ps -ef | grep -E 'xcodebuild|git|pod|fir|curl' | grep -v grep | awk '{print $2}'" stdin, stdout, stderr = ssh.exec_command(command) out = stdout.read().decode() print(out) out = out.split() if(len(out) > 0): command = '' for p in out: command += "kill -9 "+p+"\n" ssh.exec_command(command) print('exit is done!') ssh.close() app.ExitMainLoop()
frame.Bind(wx.EVT_CLOSE, handler, frame)
# 定时循环调用给子线程发送消息 def prinTime(inc, l): global canOutput canOutput = bool(1-canOutput) if canOutput == False: t = Timer(inc, prinTime, (inc, l,)) t.start() else: wx.CallAfter(pub.sendMessage, "update", msg=l)
# 从缓存中读取ssh执行返回 def line_buffered(f): line_buf = ""
while not f.channel.exit_status_ready(): try: line_buf += f.read(5).decode('utf-8')
except: if line_buf.endswith('\n'): yield line_buf line_buf = '' else: if line_buf.endswith('\n'): yield line_buf line_buf = '' else: lab.AppendText("上传完毕")
# 子线程
class TestThread(Thread):
def __init__(self): # 线程实例化时立即启动 Thread.__init__(self) self.start() pub.subscribe(self.updateDisplay, "update")
def run(self): # 线程执行的代码
for l in line_buffered(resultStdout): # print(l) if canOutput == True: # 隔一秒发送一次消息,绘制一次太快了容易卡住 prinTime(1, l)
# 接受到消息 把终端输出绘制到label def updateDisplay(self, msg): # if (lab.GetNumberOfLines() > 100): # lab.Clear() # else: # lab.AppendText(msg) lab.AppendText(msg) lab.ShowPosition(lab.GetLastPosition())
# 弹框
class TextEntryDialog(wx.Dialog): def __init__(self, parent=None, title='Title', caption='Caption', size=(500, 200)):
style = wx.DEFAULT_DIALOG_STYLE | wx.RESIZE_BORDER super(TextEntryDialog, self).__init__( parent, -1, title=title, style=style) # self.text = wx.StaticText(self, -1, caption) # self.input = wx.TextCtrl( # self, -1, pos=(100, 100)) # self.text1 = wx.StaticText(self, 2, caption) # self.input1 = wx.TextCtrl( # self, 2) # # self.input.SetInitialSize(size=(500, 60)) # self.buttons = self.CreateButtonSizer(wx.OK | wx.CANCEL) # self.sizer = wx.BoxSizer(wx.VERTICAL) # self.sizer.Add(self.text, 0, wx.ALL, 5) # self.sizer.Add(self.input, 1, wx.EXPAND | wx.ALL, 5) # self.sizer.Add(self.text1, 2, wx.ALL, 6) # self.sizer.Add(self.input1, 3, wx.EXPAND | wx.ALL, 6) # self.sizer.Add(self.buttons, 0, wx.EXPAND | wx.ALL, 5) # self.SetSizerAndFit(self.sizer) # self.Center()
def SetValue(self, value): self.input.SetValue(value)
def GetValue(self): return self.input.GetValue()
def GetText(self): return self.input.LabelText
ip = ''
button = wx.Button(frame, 0, "连接", pos=(100, 150))
def startssh(self): lab.SetEditable(False) global resultStdout ssh.connect(ip, username=str(usernameinput.GetValue()), password=str(passwordinput.GetValue())) stdin, resultStdout, stderr = ssh.exec_command( 'echo $$; exec '+'~/Documents/work/xcodebuild.sh') # 接受返回值pid 方便关闭应用 pid = resultStdout.readline() TestThread() lab.Show() ulab.Hide() plab.Hide() usernameinput.Hide() passwordinput.Hide() button.Hide()
lab = wx.TextCtrl(frame, 0, "正在连接\n", pos=(10, 10), size=(780, 580), style=wx.BORDER_SIMPLE | wx.TE_MULTILINE | wx.HSCROLL | wx.TE_WORDWRAP) lab.Hide()
ulab = wx.StaticText(frame, -1, "用户名", pos=(10, 10)) plab = wx.StaticText(frame, -1, "密码", pos=(10, 80))
usernameinput = wx.TextCtrl(frame, 0, "xxx", pos=(100, 10), size=(200, 50)) passwordinput = wx.TextCtrl(frame, 0, "123456", pos=(100, 80), size=(200, 50))
button.Bind(wx.EVT_BUTTON, startssh, button)
dialog = wx.TextEntryDialog(None, 'IP', '连接ssh', style=wx.TE_MULTILINE | wx.OK | wx.CANCEL) dialog.SetInitialSize(size=(500, 200)) dialog.SetValue("192.168.7.16") if dialog.ShowModal() == wx.ID_OK: ip = dialog.GetValue() print("ok") dialog.Destroy()
else: print(dialog.Destroy()) app.ExitMainLoop() exit(0)
# 启动子线程 dialog.Destroy()
app.MainLoop()
|