Package x2go :: Module utils
[frames | no frames]

Source Code for Module x2go.utils

  1  # -*- coding: utf-8 -*- 
  2   
  3  # Copyright (C) 2010-2011 by Mike Gabriel <mike.gabriel@das-netzwerkteam.de> 
  4  # 
  5  # Python X2go is free software; you can redistribute it and/or modify 
  6  # it under the terms of the GNU General Public License as published by 
  7  # the Free Software Foundation; either version 3 of the License, or 
  8  # (at your option) any later version. 
  9  # 
 10  # Python X2go is distributed in the hope that it will be useful, 
 11  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 12  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 13  # GNU General Public License for more details. 
 14  # 
 15  # You should have received a copy of the GNU General Public License 
 16  # along with this program; if not, write to the 
 17  # Free Software Foundation, Inc., 
 18  # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. 
 19   
 20  """\ 
 21  Python X2go helper functions, constants etc. 
 22   
 23  """ 
 24  __NAME__ = 'x2goutils-pylib' 
 25   
 26  import sys 
 27  import os 
 28  import locale 
 29  import re 
 30  import types 
 31  import copy 
 32  import paramiko 
 33  import socket 
 34  import gevent 
 35  import string 
 36  import re 
 37  import subprocess 
 38   
 39  # Python X2go modules 
 40  from defaults import X2GOCLIENT_OS as _X2GOCLIENT_OS 
 41  from defaults import X2GO_SESSIONPROFILE_DEFAULTS as _X2GO_SESSIONPROFILE_DEFAULTS 
 42  from defaults import X2GO_MIMEBOX_ACTIONS as _X2GO_MIMEBOX_ACTIONS 
 43  from defaults import pack_methods_nx3 
 44   
 45  if _X2GOCLIENT_OS == 'Windows': 
 46      import win32api 
 47   
def is_in_nx3packmethods(method):
    """\
    Check whether a compression (pack) method is known to NX3 Proxy.

    @param method: the pack method to look up (presumably its name as a string)

    @return: C{True} if C{method} is contained in C{pack_methods_nx3}
    @rtype: C{bool}

    """
    is_known_method = method in pack_methods_nx3
    return is_known_method
55 56
def find_session_line_in_x2golistsessions(session_name, x2go_stdout):
    """\
    Return the X2go session meta information as returned by the
    C{x2golistsessions} server command for session C{session_name}.

    The session name is compared against the second C{|}-separated field
    of every output line.

    @param session_name: name of the session to look up
    @type session_name: C{str}
    @param x2go_stdout: file-like object (anything with a C{read()} method)
        that delivers the output of C{x2golistsessions}
    @type x2go_stdout: C{file}

    @return: the first line that describes C{session_name}, or C{None} if
        no line matches
    @rtype: C{str}

    """
    # read from the file object handed in by the caller
    for line in x2go_stdout.read().split("\n"):
        fields = line.split("|")
        # skip empty lines and lines without a session name field
        if len(fields) < 2:
            continue
        if session_name == fields[1]:
            return line
    return None
71 72
def slugify(value):
    """\
    Reduce a value to a lower-case, ASCII-only slug.

    The value is converted to text and NFKD-normalized, characters that have
    no ASCII representation are dropped, everything except word characters,
    whitespace and hyphens is removed and the result is stripped of
    surrounding whitespace and lower-cased.

    Note that inner whitespace is kept as is and that round brackets are
    removed together with all other punctuation.

    @param value: the value to build the slug from
    @type value: C{str}

    @return: the slug
    @rtype: C{str}

    """
    import unicodedata
    try:
        _text_type = unicode
    except NameError:
        # interpreters without a separate unicode type
        _text_type = str
    _ascii = unicodedata.normalize('NFKD', _text_type(value)).encode('ascii', 'ignore')
    if not isinstance(_ascii, str):
        # encode() delivered a bytes object, continue with native text
        _ascii = _ascii.decode('ascii')
    return re.sub(r'[^\w\s-]', '', _ascii).strip().lower()
85
86 -def _genSessionProfileId():
87 """\ 88 Generate a session profile ID as used in x2goclient's sessions config file. 89 90 """ 91 import datetime 92 return datetime.datetime.utcnow().strftime('%Y%m%d%H%m%S%f')
93 94
95 -def _checkIniFileDefaults(defaults):
96 """\ 97 Check an ini file data structure passed on by a user app or class. 98 99 """ 100 if defaults is None: 101 return False 102 if type(defaults) is not types.DictType: 103 return False 104 for sub_dict in defaults.values(): 105 if type(sub_dict) is not types.DictType: 106 return False 107 return True
108 109
110 -def _checkSessionProfileDefaults(defaults):
111 """\ 112 Check the data structure of a default session profile passed by a user app. 113 114 """ 115 if defaults is None: 116 return False 117 if type(defaults) is not types.DictType: 118 return False 119 return True
120 121
def _convert_SessionProfileOptions_2_SessionParams(_options):
    """\
    Convert session profile options as used in x2goclient's sessions file to
    Python X2go session parameters.

    The passed-in dictionary is not modified, the conversion works on a deep
    copy. Options that are not listed in C{_X2GO_SESSIONPROFILE_DEFAULTS} are
    dropped, the remaining ones are renamed and/or translated.

    A complete session profile is expected: options that are evaluated by
    the conversion (e.g. C{quality}, C{fullscreen}, C{width}, C{height},
    C{sound}, C{rootless}, C{usekbd}, C{mimeboxaction}) have to be present.

    @param _options: session profile options
    @type _options: C{dict}

    @return: session parameters derived from the profile options
    @rtype: C{dict}

    @raise KeyError: if one of the evaluated options is missing

    """
    _params = copy.deepcopy(_options)

    # get rid of unknown session profile options (loop over a snapshot of
    # the keys as the dictionary shrinks while we walk through it)
    for _opt in list(_params.keys()):
        if _opt not in _X2GO_SESSIONPROFILE_DEFAULTS:
            del _params[_opt]

    _rename_dict = {
        'host': 'server',
        'user': 'username',
        'soundsystem': 'snd_system',
        'sndport': 'snd_port',
        'type': 'kbtype',
        'layout': 'kblayout',
        'speed': 'link',
        'sshport': 'port',
        'useexports': 'allow_share_local_folders',
        'usemimebox': 'allow_mimebox',
        'mimeboxextensions': 'mimebox_extensions',
        'mimeboxaction': 'mimebox_action',
        'print': 'printing',
        'name': 'profile_name',
        'key': 'key_filename',
        'command': 'cmd',
        'rdpserver': 'rdp_server',
        'rdpoptions': 'rdp_options',
        'xdmcpserver': 'xdmcp_server',
        'useiconv': 'convert_encoding',
        'iconvto': 'server_encoding',
        'iconvfrom': 'client_encoding',
        'usesshproxy': 'use_sshproxy',
        'sshproxyhost': 'sshproxy_host',
        'sshproxyuser': 'sshproxy_user',
        'sshproxykeyfile': 'sshproxy_key_filename',
        'sshproxytunnel': 'sshproxy_tunnel',
    }
    _speed_dict = {
        '0': 'modem',
        '1': 'isdn',
        '2': 'adsl',
        '3': 'wan',
        '4': 'lan',
    }

    for opt, val in _options.items():

        # rename options if necessary
        if opt in _rename_dict:
            del _params[opt]
            opt = _rename_dict[opt]
            _params[opt] = val

        # translate integer values for connection speed to readable strings
        if opt == 'link':
            val = str(val).lower()
            val = _speed_dict.get(val, val).lower()
            _params['link'] = val

        # share_local_folders is a list
        if opt in ('share_local_folders', 'mimebox_extensions'):
            if isinstance(val, str):
                if val:
                    _params[opt] = val.split(',')
                else:
                    _params[opt] = []

    # append value for quality to value for pack method
    if _params['quality']:
        _params['pack'] = '%s-%s' % (_params['pack'], _params['quality'])
    # delete quality in any case...
    del _params['quality']

    # fstunnel is only discarded, so its absence is not an error
    _params.pop('fstunnel', None)

    if 'export' in _params:

        _export = _params.pop('export')
        # fix for wrong export field usage in PyHoca-GUI/CLI and python-x2go before 20110923
        _export = _export.replace(",", ";")

        _export = _export.strip().strip('"').strip().strip(';').strip()
        _export_list = [ f for f in _export.split(';') if f ]

        _params['share_local_folders'] = []
        for _shared_folder in _export_list:
            # fix for wrong export field usage in PyHoca-GUI/CLI and python-x2go before 20110923
            if ":" not in _shared_folder:
                _shared_folder = "%s:1" % _shared_folder
            # the flag is the part after the LAST colon, the folder itself
            # may contain colons (e.g. Windows drive letters like C:\...)
            _folder, _flag = _shared_folder.rsplit(":", 1)
            if _flag == "1":
                _params['share_local_folders'].append(_folder)

    if not _options['fullscreen']:
        _params['geometry'] = '%sx%s' % (_options['width'], _options['height'])
    else:
        _params['geometry'] = 'fullscreen'
    del _params['width']
    del _params['height']
    del _params['fullscreen']

    if not _options['sound']:
        _params['snd_system'] = 'none'
    del _params['sound']

    if _options['rootless']:
        _params['session_type'] = 'application'
    else:
        _params['session_type'] = 'desktop'
    del _params['rootless']

    # fall back to OPEN for unknown MIME box actions
    if _params['mimebox_action'] not in _X2GO_MIMEBOX_ACTIONS:
        _params['mimebox_action'] = 'OPEN'

    if not _options['usekbd']:
        _params['kbtype'] = 'null/null'
        _params['kblayout'] = 'null'
    del _params['usekbd']

    # currently known but ignored in Python X2go
    _ignored_options = [
        'dpi',
        'setdpi',
        'startsoundsystem',
        'soundtunnel',
        'defsndport',
        'icon',
        'applications',
    ]
    for i in _ignored_options:
        # these options are merely thrown away, tolerate their absence
        _params.pop(i, None)

    return _params
263 264
def session_names_by_timestamp(session_infos):
    """\
    Sorts session profile names by their timestamp (as used in the file format's section name).

    The timestamp is taken from the third C{-}-separated field of a session
    name (everything in front of the first C{_} of that field).

    @param session_infos: mapping that has session names as its keys
    @type session_infos: C{dict}

    @return: the session names, ordered by their timestamp strings
    @rtype: C{list}

    """
    def _decorate(name):
        # prefix the name with its timestamp so that plain string sorting
        # orders by timestamp first
        timestamp = name.split('-')[2].split('_')[0]
        return '%s|%s' % (timestamp, name)

    decorated_names = sorted(_decorate(name) for name in session_infos.keys())
    # strip the timestamp prefix again
    return [ entry.split('|')[1] for entry in decorated_names ]
274 275
def touch_file(filename, mode='a'):
    """\
    Make sure a file exists, roughly like the GNU/touch command does.

    Missing parent directories are created (with permissions C{0700}), then
    the file is opened with C{mode} and closed again right away. Time stamps
    of an already existing file are not touched explicitly.

    @param filename: name of the file to touch
    @type filename: C{str}
    @param mode: the file mode (as used for Python file objects)
    @type mode: C{str}
    """
    _dirname = os.path.dirname(filename)
    # a bare file name has no directory part, there is nothing to create then
    if _dirname and not os.path.isdir(_dirname):
        os.makedirs(_dirname, mode=0o700)
    with open(filename, mode=mode):
        pass
289 290
def unique(seq):
    """\
    Remove duplicates from a sequence while preserving the order of first
    occurrence.

    Unlike the GNU/uniq command this drops B{all} repeated items, not only
    consecutive ones. Items are compared by equality, so unhashable items
    (e.g. lists) are supported as well.

    @param seq: a list/sequence that may contain duplicates
    @type seq: C{list}

    @return: list that has been cleaned up from duplicates
    @rtype: C{list}
    """
    # order preserving
    noDupes = []
    for item in seq:
        if item not in noDupes:
            noDupes.append(item)
    return noDupes
305 306
def known_encodings():
    """\
    Render a list of all-known-to-Python character encodings (including
    all known aliases).

    The names are taken from the keys and values of
    C{encodings.aliases.aliases}, converted to upper case and spelled with
    hyphens instead of underscores.

    @return: alphabetically sorted encoding names without duplicates
    @rtype: C{list}

    """
    from encodings.aliases import aliases
    _all_names = list(aliases.keys()) + list(aliases.values())
    # a set removes the duplicates, sorted() restores a stable order
    _normalized = set(_name.upper().replace('_', '-') for _name in _all_names)
    return sorted(_normalized)
326 327
def patiently_remove_file(dirname, filename):
    """\
    Try to remove a file, wait for unlocking, remove it once removing is possible...

    A failed removal is retried every five seconds. If the file does not
    exist (anymore) the function returns at once instead of retrying.

    @param dirname: directory name the file is in
    @type dirname: C{str}
    @param filename: name of the file to be removed
    @type filename: C{str}
    """
    import errno
    _path = os.path.join(dirname, filename)
    while True:
        try:
            os.remove(_path)
            return
        except OSError as e:
            if e.errno == errno.ENOENT:
                # nothing (left) to remove, waiting would never end
                return
            # file is probably locked
            gevent.sleep(5)
345
def detect_unused_port(bind_address='', preferred_port=None):
    """\
    Detect an unused IP socket.

    If C{preferred_port} is given, binding C{bind_address}:C{preferred_port}
    is tried first. If that fails (or no preference is given) the operating
    system is asked for any free port on all interfaces. The probing socket
    is closed again before the function returns.

    @param bind_address: IP address to bind to (only used in combination
        with C{preferred_port})
    @type bind_address: C{str}
    @param preferred_port: IP socket port that shall be tried first for
        availability (numeric strings are accepted as well)
    @type preferred_port: C{int}

    @return: free local IP socket port that can be used for binding
    @rtype: C{int}
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    try:
        _bound = False
        if preferred_port:
            try:
                sock.bind((bind_address, int(preferred_port)))
                _bound = True
            except (socket.error, TypeError, ValueError, OverflowError):
                # port is in use or not a usable port number, fall through
                # to the port the operating system picks for us
                pass
        if not _bound:
            sock.bind(('', 0))
        return sock.getsockname()[1]
    finally:
        # release the port, the caller wants to bind to it next
        sock.close()
370
def get_encoding():
    """\
    Detect systems default character encoding.

    The encoding of the default locale is preferred. If it cannot be
    determined, Python's default encoding is used, with C{ascii} as the
    last resort.

    @return: The system's local character encoding.
    @rtype: C{str}
    """
    try:
        encoding = locale.getdefaultlocale()[1]
    except Exception:
        # best effort: a broken locale setup must not break the detection
        encoding = None
    if encoding is None:
        encoding = sys.getdefaultencoding() or 'ascii'
    return encoding
388
def is_abs_path(path):
    """\
    Test if a given path is an absolute path name.

    A path counts as absolute if it starts with a slash (POSIX style) or
    with a drive letter followed by a colon and a backslash (Windows style,
    e.g. C{C:\\\\}).

    @param path: test this path for absolutism...
    @type path: C{str}

    @return: Returns C{True} if path is an absolute path name
    @rtype: C{bool}
    """
    if path.startswith('/'):
        return True
    # raw string: a single ASCII letter, a colon and a literal backslash
    return re.match(r'^[a-zA-Z]:\\', path) is not None
400
def xkb_rules_names():
    """\
    Wrapper for: xprop -root _XKB_RULES_NAMES

    The output of C{xprop} is split at double quotes, the first five quoted
    values are taken as rules, model, layout, variant and options.

    @return: A Python dictionary that contains the current X11 keyboard rules.
    @rtype: C{dict}

    @raise OSError: if C{xprop} cannot be executed
    @raise IndexError: if the output contains less than five quoted values

    """
    p = subprocess.Popen(['xprop', '-root', '_XKB_RULES_NAMES'], stdout=subprocess.PIPE)
    # communicate() reads everything, closes the pipe and waits for the
    # child so that no defunct process is left behind
    _output = p.communicate()[0]
    if not isinstance(_output, str):
        # the pipe delivered a bytes object, continue with native text
        _output = _output.decode('utf-8', 'replace')
    _rn_list = _output.split('"')
    _rn_dict = {
        'rules': _rn_list[1],
        'model': _rn_list[3],
        'layout': _rn_list[5],
        'variant': _rn_list[7],
        'options': _rn_list[9],
    }
    return _rn_dict
419
def local_color_depth():
    """\
    Detect the current local screen's color depth.

    On non-Windows systems the depth is parsed from the C{Depth:} line of
    C{xwininfo -root}; if the tool is missing or its output cannot be
    evaluated, a default of 24 is returned. On Windows the value is obtained
    from C{win32api.GetSystemMetrics}.

    @return: the detected color depth
    @rtype: C{int}

    """
    if _X2GOCLIENT_OS != 'Windows':
        try:
            p = subprocess.Popen(['xwininfo', '-root'], stdout=subprocess.PIPE)
            # communicate() reads everything, closes the pipe and waits for
            # the child so that no defunct process is left behind
            _output = p.communicate()[0]
            if not isinstance(_output, str):
                # the pipe delivered a bytes object, continue with native text
                _output = _output.decode('utf-8', 'replace')
            _depth_line = [ _info.strip() for _info in _output.split('\n') if 'Depth:' in _info ][0]
            _depth = _depth_line.split(' ')[1]
            return int(_depth)
        except (IndexError, ValueError):
            # no (usable) depth information found: a sensible default value
            return 24
        except OSError:
            # for building your package...
            return 24

    else:
        # NOTE(review): index 2 is SM_CXVSCROLL (width of a vertical scroll
        # bar) in the Win32 API, which does not look like a color depth --
        # confirm and switch to a device caps (BITSPIXEL) query if needed
        return win32api.GetSystemMetrics(2)
440
def is_color_depth_ok(depth_session, depth_local):
    """\
    Check whether a session's color depth can be shown on the local screen.

    A session depth of 0 is always accepted, as are identical depths. In
    addition, the depths 24 and 32 are regarded as compatible with each
    other.

    @param depth_session: color depth of the session
    @type depth_session: C{int}
    @param depth_local: color depth of local screen
    @type depth_local: C{int}

    @return: Does the session color depth work with the local display?
    @rtype: C{bool}

    """
    if depth_session == 0 or depth_session == depth_local:
        return True
    # 24 and 32 bit can be mixed in either direction
    _interchangeable = (24, 32)
    return depth_session in _interchangeable and depth_local in _interchangeable
462