Source code for perllib

# perllib module for pythonizer, generated by makelib.py on Wed Apr 19 09:51:29 2023
#
# WARNING: Do not edit this file - to change the functions or add new ones, edit them in the pyf directory,
#          then re-run makelib.py
#
__author__ = """Joe Cool"""
___email__ = 'snoopyjc@gmail.com'
__version__ = '1.030'

import sys,os,re,fileinput,subprocess,collections.abc,warnings,inspect,itertools,signal,traceback,io,tempfile,calendar,types,random,dataclasses,builtins,codecs,struct,pprint,functools,argparse,abc,copy
import time as tm_py
import stat as st_py
_str = lambda s: "" if s is None else str(s)
try:
    import fcntl as fc_py
except Exception:
    pass

[docs]class Die(Exception): def __init__(self, *args,suppress_traceback=None): super().__init__(*args) if TRACEBACK and not suppress_traceback: cluck()
TRACEBACK = 0 WARNING = 0 OUTPUT_AUTOFLUSH = 0 EXCEPTIONS_BEING_CAUGHT = '' CHILD_ERROR = 0 TRACE_RUN = 0 AUTODIE = 0 SIG_DIE_HANDLER = None OUTPUT_LAYERS = '' BASETIME = tm_py.time() OUTPUT_RECORD_SEPARATOR = '' _OPEN_MODE_MAP = {'<': 'r', '>': 'w', '+<': 'r+', '+>': 'w+', '>>': 'a', '+>>': 'a+', '|': '|-'} SIG_WARN_HANDLER = None INPUT_RECORD_SEPARATOR = "\n" OS_ERROR = '' LIST_SEPARATOR = ' ' EVAL_ERROR = '' INPUT_LAYERS = '' _TIE_MAP = {'DELETE': '__delitem__','STORE': '__setitem__','SCALAR': '__len__','FETCHSIZE': '__len__','UNTIE': '__untie__','FETCH': '__getitem__','DESTROY': '__del__','CLEAR': 'clear','PUSH': 'append','EXISTS': '__contains__'} _INPUT_FH_NAME = None OUTPUT_FIELD_SEPARATOR = '' _DUP_MAP = dict(STDIN=0, STDOUT=1, STDERR=2) _PYTHONIZER_KEYWORDS = {'pass','False','print','fcntl','complex','async','lambda','bytearray','oct','property','class','raise','iter','as','traceback','aiter','next','not','dict','get','sum','list','break','re','pow','object','or','min','compile','map','inspect','None','True','atexit','set','warnings','isinstance','return','float','continue','find','tempfile','str','import','int','update','open','upper','try','calendar','bool','hex','abs','repr','signal','builtins','any','dir','Hash','with','sys','getattr','eval','math','keys','exec','hasattr','dataclasses','setattr','subprocess','yield','insert','lower','if','frozenset','help','divmod','issubclass','round','fileinput','Array','bin','assert','memoryview','for','classmethod','all','ArrayHash','globals','id','callable','types','input','is','rfind','anext','tm_py','pdb','argparse','await','filter','bytes','len','max','finally','and','codecs','collections.abc','pop','vars','enumerate','ascii','io','rstrip','struct','abc','staticmethod','locals','del','except','type','reversed','functools','elif','glob','format','copy','slice','wantarray','close','while','random','super','values','os','def','breakpoint','hash','nonlocal','in','casefold','stat','itertools','ord','global','chr','sorted','zip','range','delattr','from','else','getopt','perllib','tuple','extend'} INPUT_LINE_NUMBER = 0
[docs]def init_package(name, is_class=False, isa=(), autovivification=True): """Initialize a package by creating a namespace for it""" global TRACEBACK, AUTODIE, TRACE_RUN, WARNING if name == 'main': if (v := os.environ.get('PERLLIB_TRACEBACK')) is not None: TRACEBACK = int(v) if v.isdecimal() else 1 if (v := os.environ.get('PERLLIB_AUTODIE')) is not None: AUTODIE = int(v) if v.isdecimal() else 1 if (v := os.environ.get('PERLLIB_TRACE_RUN')) is not None: TRACE_RUN = int(v) if v.isdecimal() else 1 if (v := os.environ.get('PERLLIB_WARNING')) is not None: WARNING = int(v) if v.isdecimal() else 1 pieces = name.split('.') parent = builtins parent_name = '' package_name = '' class perllibMeta(abc.ABCMeta): def __str__(self): # so str(class_v) works return self.__name__.replace('.', '::') if isinstance(self, type) else super().__str__() for i, piece in enumerate(pieces): prior_namespace = namespace = None if hasattr(parent, piece): namespace = getattr(parent, piece) if parent_name: package_name = parent_name + '.' + piece else: package_name = piece if (is_class or isa) and i == len(pieces)-1 and not isinstance(namespace, type): prior_namespace = namespace # we have the wrong type of namespace - copy it over below namespace = None if namespace is None: if (is_class or isa) and i == len(pieces)-1: class_parents = [] any_parent_is_class = False if not hasattr(builtins, 'UNIVERSAL'): if autovivification: builtins.UNIVERSAL = _ArrayHashClass else: builtins.UNIVERSAL = type('UNIVERSAL', tuple(), dict()) for p in isa: py = p.replace("'", '.').replace('::', '.') if hasattr(builtins, f"{py}_"): # handle names that need to be escaped py = f"{py}_" if hasattr(builtins, py): parent_namespace = getattr(builtins, py) if not isinstance(parent_namespace, type): # Promote the parent namespace to a class if it isn't one already if autovivification: new_parent_namespace = type(name, (_ArrayHashClass,), Hash()) else: new_parent_namespace = type(name, tuple(), dict()) new_parent_namespace.__class__ = perllibMeta new_parent_namespace.__eq__ = lambda self, other: self is other new_parent_namespace.__bool__ = lambda self: True for k, v in parent_namespace.__dict__.items(): setattr(new_parent_namespace, k, v) parent_package_name = parent_namespace.__PACKAGE__ setattr(builtins, parent_package_name, new_parent_namespace) setattr(builtins, f"main.{parent_package_name}", new_parent_namespace) if(parent_namespace.__PARENT__): ppn_pieces = parent_package_name.split('.') grandparent_package_name = parent_namespace.__PARENT__ grandparent_namespace = getattr(builtins, grandparent_package_name) setattr(grandparent_namespace, ppn_pieces[-1], new_parent_namespace) parent_namespace = new_parent_namespace if class_parents and type(class_parents[0]) != type(parent_namespace) and \ type(class_parents[0]).__name__ == type(parent_namespace).__name__: # Avoid: TypeError: metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases parent_namespace.__class__ = class_parents[0].__class__ class_parents.append(parent_namespace) if isinstance(parent_namespace, type): any_parent_is_class = True if autovivification: if is_class and not any_parent_is_class: class_parents.append(_ArrayHashClass) namespace = type(name, tuple(class_parents), Hash()) else: if is_class and not any_parent_is_class: class_parents.append(builtins.UNIVERSAL) namespace = type(name, tuple(class_parents), dict()) if is_class or any_parent_is_class: for key in dir(namespace): if isinstance(getattr(namespace, key), types.MethodType): setattr(namespace, key, types.MethodType(getattr(namespace, key).__func__, namespace)) if is_class or any_parent_is_class: namespace.__class__ = perllibMeta namespace.__eq__ = lambda self, other: self is other namespace.__bool__ = lambda self: True if prior_namespace is not None: for k, v in prior_namespace.__dict__.items(): setattr(namespace, k, v) else: namespace = types.SimpleNamespace() namespace.__autovivification__ = autovivification if parent_name: package_name = parent_name + '.' + piece else: package_name = piece namespace.__PARENT__ = parent_name namespace.__PACKAGE__ = package_name setattr(parent, piece, namespace) if parent != builtins: setattr(builtins, package_name, namespace) if pieces[0] != 'main': if not hasattr(builtins, 'main'): init_package('main') setattr(builtins.main, package_name, namespace) setattr(builtins, f"main.{package_name}", namespace) elif name != 'main': setattr(builtins, f"main.{piece}", namespace) if pieces[0] != 'main': if not hasattr(builtins, 'main'): init_package('main') setattr(builtins.main, piece, namespace) parent = namespace parent_name = package_name if not hasattr(builtins, '__packages__'): builtins.__packages__ = set() builtins.__packages__.add(name) return namespace
[docs]def Array(init=None): """Array with autovivification""" return ArrayHash(init, isHash=False)
class _ArrayHash(collections.defaultdict, collections.abc.Sequence): """Implements autovivification of array elements and hash keys""" def __init__(self, fcn, isHash=None): self.isHash = isHash # Can be None (not determined yet), False (is an Array), or True (is a Hash) super().__init__(fcn) def get(self, key, default=None): if self.isHash: return super().get(key, default) elif self.isHash is False: if key < 0: key += len(self) return self[key] if key >= 0 and key < len(self) else default else: return default def append(self, value): if self.isHash is None: self.isHash = False elif self.isHash: raise TypeError('Not an ARRAY reference') self[len(self)] = value def copy(self): if self.isHash: return Hash(self) elif self.isHash is None: return ArrayHash(self) else: return Array(self) def extend(self, lst): if self.isHash is None: self.isHash = False elif self.isHash and len(lst): raise TypeError('Not an ARRAY reference') ln = len(self) for item in lst: self[ln] = item ln += 1 def update(self, values): if self.isHash is None: self.isHash = True elif not self.isHash: raise TypeError('Not a HASH reference') for k, v in values.items(): self[k] = v def pop(self, key=-1, default=None): if self.isHash: if key in self: value = self[key] del self[key] return value return default else: ls = len(self) if not ls: return None if key < 0: key += ls if key < ls: value = self[key] for i in range(key, ls-1): self[i] = self[i+1] del self[ls-1] return value return None def remove(self, x): if self.isHash: raise TypeError('Not an ARRAY reference') for ndx in range(len(self)): if self[ndx] == x: return self.pop(ndx) raise ValueError(f"remove({x}): not found in Array") def __getitem__(self, index): if self.isHash: try: return super().__getitem__(index) except (TypeError, KeyError): return super().__getitem__(str(index)) elif self.isHash is None: if isinstance(index, int) or isinstance(index, slice): self.isHash = False else: self.isHash = True try: return super().__getitem__(index) except TypeError: return super().__getitem__(str(index)) if isinstance(index, int): if index < 0: index += len(self) return super().__getitem__(index) elif isinstance(index, slice): return Array([self[i] for i in range(*index.indices(len(self)))]) else: raise TypeError('Not a HASH reference') def __setitem__(self, index, value): if self.isHash: try: super().__setitem__(index, value) except TypeError: super().__setitem__(str(index), value) return elif self.isHash is None: if isinstance(index, int) or isinstance(index, slice): self.isHash = False else: self.isHash = True try: super().__setitem__(index, value) except TypeError: super().__setitem__(str(index), value) if isinstance(index, int): if index < 0: index += len(self) for i in range(len(self), index): super().__setitem__(i, None) super().__setitem__(index, value) return elif isinstance(index, slice): if index.start is not None: for i in range(len(self), index.start): super().__setitem__(i, None) value = iter(value) ndx = index.start if index.start is not None else 0 j = None for i in range(*index.indices(len(self))): try: super().__setitem__(i, next(value)) except StopIteration: if j is None: j = i self.pop(j) ndx += 1 rest = list(value) lr = len(rest) if lr: for i in range(len(self)-1,ndx-1,-1): # Move everything else up super().__setitem__(i+lr, super().__getitem__(i)) for i in range(lr): super().__setitem__(i+ndx, rest[i]) def __delitem__(self, index): if self.isHash: try: super().__delitem__(index) except (TypeError, KeyError): super().__delitem__(str(index)) elif isinstance(index, int): if self.isHash: raise TypeError('Not an ARRAY reference') ls = len(self) if not ls: return if index < 0: index += len(self) super().__delitem__(index) elif isinstance(index, slice): if self.isHash: raise TypeError('Not an ARRAY reference') for i in range(*index.indices(len(self))): super().__delitem__(i) def __iter__(self): if self.isHash: for i in self.keys(): yield i else: for i in range(len(self)): yield self[i] def __str__(self): if self.isHash: return str(dict(self)) elif self.isHash is None: return '' return str(list(self)) def __repr__(self): if self.isHash: return "Hash(" + self.__str__() + ")" elif self.isHash is None: return "ArrayHash(" + self.__str__() + ")" return "Array(" + self.__str__() + ")" def __add__(self, other): result = ArrayHash(self) if self.isHash or (isinstance(other, dict) and not isinstance(other, _ArrayHash)) or (hasattr(other, 'isHash') and other.isHash): result.update(other) elif self.isHash is None and isinstance(other, (int, float, str)): return other else: result.extend(other) return result def __iadd__(self, other): if self.isHash or (isinstance(other, dict) and not isinstance(other, _ArrayHash)) or (hasattr(other, 'isHash') and other.isHash): self.update(other) elif self.isHash is None and isinstance(other, (int, float, str)): return other else: self.extend(other) return self def __radd__(self, other): result = ArrayHash() if self.isHash or (isinstance(other, dict) and not isinstance(other, _ArrayHash)) or (hasattr(other, 'isHash') and other.isHash): result.update(other) result.update(self) elif self.isHash is None and isinstance(other, (int, float, str)): return other else: result.extend(other) result.extend(self) return result def __mul__(self, other): if isinstance(other, int): if self.isHash is False: # Only perform multiplication for arrays new_data = ArrayHash(isHash=False) for _ in range(other): new_data.extend(self) return new_data else: raise TypeError("Can only multiply Array instances by an integer") else: raise TypeError("Can only multiply Array by an integer") def __rmul__(self, other): return self.__mul__(other) def __eq__(self, other): if self.isHash is None: if hasattr(other, 'isHash') and other.isHash is None: return True try: return '' == other except Exception: pass try: return 0 == other except Exception: pass try: return 0 == len(other) except Exception: pass if other is None: return True return False try: if(len(self) != len(other)): return False i1 = iter(self) i2 = iter(other) while True: if next(i1) != next(i2): return False return True except StopIteration: return True except Exception: return False def __lt__(self, other): if self.isHash is None: if hasattr(other, 'isHash') and other.isHash is None: return False try: return '' < other except Exception: pass try: return 0 < other except Exception: pass try: return 0 < len(other) except Exception: pass if other is None: return False return False try: i1 = iter(self) i2 = iter(other) while True: ni1 = next(i1) try: ni2 = next(i2) except StopIteration: return False if ni1 < ni2: return True elif ni1 > ni2: return False except StopIteration: try: next(i2) except StopIteration: return False return True except Exception: return False def __ne__(self, other): return not self == other def __le__(self, other): return self < other or self == other def __ge__(self, other): return not self < other def __gt__(self, other): return not (self < other or self == other) def __contains__(self, key): if self.isHash: return key in self.keys() else: return any(key == self[i] for i in range(len(self)))
[docs]def ArrayHash(init=None,isHash=None): """Acts like an array or hash with autovivification""" result = _ArrayHash(ArrayHash,isHash=isHash) if init is not None: if isinstance(init, _ArrayHash) or '_ArrayHash' in str(type(init)): if init.isHash: result.update(init) else: result.extend(init) elif isinstance(init, collections.abc.Mapping): result.update(init) elif isinstance(init, collections.abc.Iterable) and not isinstance(init, str): result.extend(init) else: result.append(init) return result
def _partialclass(cls, *args, **kwds): class UNIVERSAL(cls): __init__ = functools.partialmethod(cls.__init__, *args, **kwds) return UNIVERSAL _ArrayHashClass = _partialclass(_ArrayHash, ArrayHash)
[docs]def Hash(init=None): """Hash with autovivification""" return ArrayHash(init, isHash=True)
[docs]def abspath(*args): """Implementation of perl Cwd::abs_path function""" if len(args) == 0: return os.path.abspath(os.getcwd()) return os.path.abspath(args[0])
[docs]def add_element(base, index, value): """Implementation of += on an array element""" try: base[index] += value except TypeError: if isinstance(value, int) or isinstance(value, float): base[index] = num(base[index]) + value elif value is None: base[index] = num(base[index]) else: base[index] = num(base[index]) + num(value) return base[index]
[docs]def add_path(path_list): """Add a path_list to the system path list""" sys.path[0:0] = path_list
[docs]def add_tie_call(func, package): """Add a call to _tie_call for functions defined in a tie package""" def tie_call_func(*args, **kwargs): __package__ = package # for caller() only return tie_call(func, args, kwargs) return tie_call_func
[docs]def add_tie_methods(obj): """Create a subclass for the object and add the methods to it to implement 'tie', like __getitem__ etc. The call to this functions is generated on any 'return' statement (or implicit return) in TIEHASH or TIEARRAY""" try: cls = obj.__class__ except Exception: # Not an object, so just ignore return obj is_scalar = is_hash = is_array = False if hasattr(cls, 'TIESCALAR'): is_scalar = True if hasattr(cls, 'TIEARRAY'): is_array = True if hasattr(cls, 'TIEHASH'): is_hash = True if not is_array and not is_hash and not is_scalar: return obj elif hasattr(cls, '__TIE_subclass__'): obj.__class__ = cls.__TIE_subclass__ return obj classname = cls.__name__ result = type(classname, (cls,), Hash() if hasattr(cls, 'isHash') else dict()) if is_scalar: def __get__(self, obj, objtype=None): return self.FETCH() result.__get__ = __get__ def __set__(self, obj, value): return self.STORE(value) result.__set__ = __set__ if hasattr(result, 'DELETE'): setattr(result, _TIE_MAP['DELETE'], getattr(result, 'DELETE')) if hasattr(result, 'UNTIE'): setattr(result, _TIE_MAP['UNTIE'], getattr(result, 'UNTIE')) else: setattr(result, _TIE_MAP['UNTIE'], lambda self: None) cls.__TIE_subclass__ = result obj.__class__ = result return obj for m, p in _TIE_MAP.items(): if hasattr(result, m): setattr(result, p, getattr(result, m)) elif p != '__del__' and p != '__len__': # Don't define __del__ unless they define DELETE, __len__ could be SCALAR or FETCHSIZE setattr(result, p, eval(f'lambda *_args: raise_(Die(\'Can\\\'t locate object method "{m}" via package "{classname}"\'))')) if not hasattr(result, 'SCALAR') and not hasattr(result, 'FETCHSIZE'): setattr(result, _TIE_MAP['SCALAR'], eval(f'lambda *_args: raise_(Die(\'Can\\\'t locate object method SCALAR or FETCHSIZE via package "{classname}"\'))')) # Always generate an __untie__ method unless we generated it above if not hasattr(result, 'UNTIE'): setattr(result, _TIE_MAP['UNTIE'], lambda self: None) result.__bool__ = lambda self: True if is_array: if hasattr(result, 'POP') and hasattr(result, 'SHIFT'): result.pop = lambda *_args: _args[0].POP() if len(_args) == 1 else _args[0].SHIFT() else: def pop(self, ndx=-1): result = self.FETCH(ndx) self.DELETE(ndx) return result setattr(result, 'pop', pop) result.extend = lambda self, lst: [self.PUSH(l) for l in lst] def __getitem__(self, index): if isinstance(index, int): if index < 0: index += len(self) return self.FETCH(index) elif isinstance(index, slice): return Array([self[i] for i in range(*index.indices(len(self)))]) else: return self.FETCH(index) result.__getitem__ = __getitem__ def __setitem__(self, index, value): if isinstance(index, int): if index < 0: index += len(self) return self.STORE(index, value) elif isinstance(index, slice): if index.start is not None: for i in range(len(self), index.start): self.STORE(i, None) value = iter(value) ndx = index.start if index.start is not None else 0 j = None for i in range(*index.indices(len(self))): try: self.STORE(i, next(value)) except StopIteration: if j is None: j = i self.pop(j) ndx += 1 rest = list(value) lr = len(rest) if lr: for i in range(len(self)-1,ndx-1,-1): # Move everything else up self.STORE(i+lr, self.FETCH(i)) for i in range(lr): self.STORE(i+ndx, rest[i]) else: return self.STORE(index, value) result.__setitem__ = __setitem__ def __delitem__(self, index): if isinstance(index, int): ls = len(self) if not ls: return if index < 0: index += len(self) self.DELETE(index) elif isinstance(index, slice): for i in range(*index.indices(len(self))): self.DELETE(i) else: try: self.DELETE(index) except (TypeError, KeyError): self.DELETE(str(index)) result.__delitem__ = __delitem__ def get(self, index, default=None): if index < 0: index += len(self) if index < 0 or index >= len(self): return default return self.FETCH(index) result.get = get def __iter__(self): for i in range(self.SCALAR() if hasattr(self, 'SCALAR') else self.FETCHSIZE()): yield self.FETCH(i) result.__iter__ = __iter__ cls.__TIE_subclass__ = result if is_hash: if is_array: def __iter__(self): if self.isHash: current_key = self.FIRSTKEY() # Handle FIRSTKEY being implemented using each if isinstance(current_key, collections.abc.Sequence) and not isinstance(current_key, str): if len(current_key) == 0: current_key = None else: current_key = current_key[0] while current_key is not None: yield current_key current_key = self.NEXTKEY() if isinstance(current_key, collections.abc.Sequence) and not isinstance(current_key, str): if len(current_key) == 0: current_key = None else: current_key = current_key[0] else: for i in range(self.SCALAR() if hasattr(self, "SCALAR") else self.FETCHSIZE()): yield self.FETCH(i) result.__iter__ = __iter__ else: def __iter__(self): current_key = self.FIRSTKEY() # Handle FIRSTKEY being implemented using each if isinstance(current_key, collections.abc.Sequence) and not isinstance(current_key, str): if len(current_key) == 0: current_key = None else: current_key = current_key[0] while current_key is not None: yield current_key current_key = self.NEXTKEY() if isinstance(current_key, collections.abc.Sequence) and not isinstance(current_key, str): if len(current_key) == 0: current_key = None else: current_key = current_key[0] result.__iter__ = __iter__ if is_array: def pop(*args): self = args[0] if len(args) == 1: key = -1 else: key = args[1] if isinstance(key, int): # Array style result = self.FETCH(key) self.DELETE(key) return result if not self.EXISTS(key): # Hash style if len(args) >= 2: return args[2] # default return None # default default return self.DELETE(key) result.pop = pop def get(self, key, default=None): if isinstance(key, int): # Array style if key < 0: key += len(self) if key < 0 or key >= len(self): return default else: # Hash style if not self.EXISTS(key): return default return self.FETCH(key) result.get = get else: def pop(self, key, default=None): if not self.EXISTS(key): return default return self.DELETE(key) result.pop = pop def get(self, key, default=None): if not self.EXISTS(key): return default return self.FETCH(key) result.get = get result.keys = lambda self: [k for k in self] result.values = lambda self: [self[k] for k in self] result.items = lambda self: [(k, self[k]) for k in self] result.update = lambda self, items: {self.STORE(i, items[i]) for i in items} cls.__TIE_subclass__ = result obj.__class__ = result return obj
[docs]def and_element(base, index, value): base[index] &= value return base[index]
[docs]def assign_global(packname, varname, value): """Assigns a value to a package global variable and returns the value""" namespace = getattr(builtins, packname) setattr(namespace, varname, value) return value
[docs]def assign_hash(h, keys, values): """Assign a hash with a list of hash keys and a list of values""" keys = list(keys) values = list(values) if len(keys) == len(values): for i in range(len(keys)): h[_str(keys[i])] = values[i] else: for i in range(len(keys)): h[_str(keys[i])] = values[i] if i < len(values) else None return h
[docs]def assign_meta(packname, varname, value): """Assigns a value in the metaclass of a package global variable and returns the value. Creates the metaclass if need be. This is use for tie $scalar""" namespace = getattr(builtins, packname) if not isinstance(namespace, type): init_package(packname, is_class=True, autovivification=namespace.__autovivification__) namespace = getattr(builtins, packname) meta = namespace.__class__ setattr(meta, varname, value) return value
[docs]def assign_sparse(lst, indexes, values): """Assign a list with a sparse list of indexes and a list of values""" if len(indexes) == len(values): for i in range(len(indexes)): lst[int_(indexes[i])] = values[i] else: for i in range(len(indexes)): lst[int_(indexes[i])] = values[i] if i < len(values) else None return lst
[docs]def autoflush(self, arg=1): """Method added to FH to support OO perl""" orig = self._autoflush if hasattr(self, '_autoflush') else 0 self._autoflush = arg if arg: self._orig_writelines = self.writelines def new_writelines(self, lines): self._orig_writelines(lines) self.flush() self.writelines = types.MethodType(new_writelines, self) if hasattr(self, 'write'): self._orig_write = self.write def new_write(self, b): result = self._orig_write(b) self.flush() return result self.write = types.MethodType(new_write, self) elif hasattr(self, '_orig_writelines'): self.writelines = self._orig_writelines if hasattr(self, '_orig_write'): self.write = self._orig_write return orig
[docs]def basename(path, *suffixes): """Implementation of perl basename function""" path = re.sub(r'(.)/*$', r'\1', path, flags=re.S) [basename, dirname, suffix] = fileparse(path, *map(re.escape, suffixes)) if len(suffix) and not len(basename): basename = suffix if not len(basename): basename = dirname return basename
[docs]def binmode(fh,mode='b',encoding=None,errors=None,newline=None): """Handle binmode""" global OS_ERROR, TRACEBACK, AUTODIE try: omode = '' fno = None try: fno = fh.fileno() fh.flush() # could be a closed file omode = fh.mode # could not have a mode except Exception: pass if mode is None: mode = omode.replace('b', '') else: mode = omode + mode if encoding is None and 'b' not in mode: encoding = fh.encoding if errors is None and 'b' not in mode: errors = fh.errors if fno is None: result = io.TextIOWrapper(io.BufferedIOBase(), encoding=encoding, errors=errors, newline=newline) else: result = os.fdopen(os.dup(fno), mode, encoding=encoding, errors=errors, newline=newline) if hasattr(fh, 'filename') and hasattr(fh, '_name'): # from tempfile result.filename = fh.filename result._name = fh._name if hasattr(fh, '_autoflush'): result._autoflush = fh._autoflush if hasattr(fh, 'autoflush'): result.autoflush = types.MethodType(autoflush, result) if hasattr(fh, 'say'): # from IO::File return _create_all_fh_methods(result) return result except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: cluck(f"binmode failed: {OS_ERROR}",skip=2) if AUTODIE: raise return None
[docs]def binmode_dynamic(fh, mode): """Handle binmode where the mode/layers are dynamic""" encoding = None errors = None newline = None mmode = mode ext = None if ':' in mode: mode, ext = mode.split(':') if mode in _OPEN_MODE_MAP: mode = _OPEN_MODE_MAP[mode] else: mode = 'r' if ext: if ext == 'raw' or ext == 'bytes': mode += 'b' elif ext.startswith('encoding('): encoding = ext.replace('encoding(','').replace(')','') errors = 'replace' elif ext == 'utf8': encoding = 'UTF-8' errors = 'ignore' return binmode(fh, mode, encoding=encoding, errors=errors, newline=newline)
[docs]def bless(obj, classname, isa=()): """Create an object for obj in classname""" if isinstance(classname, str): classname = classname.replace("'", '.').replace('::', '.') else: if hasattr(classname, '__name__'): # They sent us the class object classname = classname.__name__ elif hasattr(classname, '__class__'): # They sent us an instance classname = classname.__class__.__name__ if not hasattr(builtins, classname): init_package(classname, is_class=True, isa=isa) result_class = getattr(builtins, classname) result = result_class() self = result if hasattr(obj, 'isHash'): if obj.isHash: result.isHash = True for key, value in obj.items(): self[key] = value else: result.isHash = False for i, value in enumerate(obj): self[i] = value elif isinstance(obj, collections.abc.Mapping): for key, value in obj.items(): self[key] = value elif isinstance(obj, collections.abc.Iterable) and not isinstance(obj, str): for i, value in enumerate(obj): self[i] = value elif WARNING: carp(f"'bless' {classname} not implemented on {type(obj)} type object") return result
[docs]def blessed(r): """blessed function in perl""" _ref_map = {"<class 'int'>": 'SCALAR', "<class 'str'>": 'SCALAR', "<class 'float'>": 'SCALAR', "<class 'NoneType'>": 'SCALAR', "<class 'list'>": 'ARRAY', "<class 'tuple'>": 'ARRAY', "<class 'function'>": 'CODE', "<class 'dict'>": 'HASH'} tr = type(r) t = str(tr) if t in _ref_map: return None elif '_ArrayHash' in t: return None if hasattr(tr, '__name__'): return tr.__name__.replace('.', '::') return t.replace("<class '", '').replace("'>", '').replace('.', '::')
[docs]def caller(expr=None): """ Implementation of caller function in perl""" try: level = 2 if expr is None else (max(int(expr),0)+2) cur = 2 get_level = 2 last_level = 1 while True: fr = sys._getframe(get_level) if fr.f_code.co_name != '_tie_call' and \ fr.f_code.co_name != 'tie_call' and \ fr.f_code.co_name != '_tie_call_func' and \ fr.f_code.co_name != 'tie_call_func' and \ fr.f_code.co_name != '<lambda>' and \ not '__goto_sub__' in fr.f_locals and \ not re.match(r'^_f\d+[a-z]?$', fr.f_code.co_name): cur += 1 if(cur > level): break last_level = get_level get_level += 1 def get_package(fr, get_level): fr_tcf = None try: nfr = sys._getframe(get_level+2) if nfr.f_code.co_name == '_tie_call' or \ nfr.f_code.co_name == 'tie_call': nfr = sys._getframe(get_level+3) if nfr.f_code.co_name == '_tie_call_func' or \ nfr.f_code.co_name == 'tie_call_func' or \ nfr.f_code.co_name == '<lambda>': if '__package__' in nfr.f_locals: # We have a variable __package__ defined in _tie_call_func for this purpose # The reason we have that is that all _tie_call_func's code pointers are # the same and we can't tell them apart. if hasattr(nfr.f_locals['__package__'], '__PACKAGE__'): return nfr.f_locals['__package__'].__PACKAGE__ fr_tcf = nfr except Exception as e: pass package = None try: #callable_obj = fr.f_globals[fr.f_code.co_name] callable_obj = fr.f_code for pack in builtins.__packages__: namespace = getattr(builtins, pack) for key in namespace.__dict__: func = namespace.__dict__[key] if hasattr(func, '__code__'): code = func.__code__ if code == callable_obj: package = pack break if fr_tcf and code == fr_tcf.f_code: package = pack break if hasattr(func, '__func__'): # e.g. MethodType if func.__func__.__code__ == callable_obj: package = pack break if package is not None: break else: raise Exception(f"Couldn't find {callable_obj} in {builtins.__packages__}") except Exception as e: package = 'main' if '__PACKAGE__' in fr.f_builtins: package = fr.f_builtins['__PACKAGE__'] return package package = get_package(fr, get_level) filename = fr.f_code.co_filename if filename == '<string>': # Running with pdb raise ValueError if sys.platform == 'win32': if os.getcwd().lower() == os.path.dirname(filename).lower(): filename = os.path.basename(filename) else: if os.getcwd() == os.path.dirname(filename): filename = os.path.basename(filename) if expr is None: return [package, filename, fr.f_lineno] cfr = sys._getframe(last_level) while re.match(r'^_f\d+[a-z]?$', cfr.f_code.co_name): last_level += 1 cfr = sys._getframe(last_level) cpackage = get_package(cfr, last_level) wantarray = '' argvalues = inspect.formatargvalues(*inspect.getargvalues(cfr)) if re.search(r'wantarray=True', argvalues): wantarray = 1 return [package, filename, fr.f_lineno, f"{cpackage}.{cfr.f_code.co_name}", 1, wantarray, '', 0, 0, 0, 0] except ValueError: if expr is None: return [None, None, None] else: return [None, None, None, None, None, None, None, None, None, None, None]
[docs]def caller_s(expr=None): """ Implementation of caller function in scalar context""" result = caller(1 if expr is None else (max(int(expr),0)+1)) if result is None: return result return result[0]
[docs]def can(self, methodname): """Implementation of CLASS::can and $obj->can""" if self is None: return None if isinstance(self, str): if hasattr(builtins, self): self = getattr(builtins, self) if hasattr(self, methodname): method = getattr(self, methodname) if callable(method): return method if methodname == 'can': return can if methodname == 'isa': return isa return None
[docs]def carp(*args,skip=1): """Warn with no backtrace""" if TRACEBACK: print(longmess(*args, skip=skip), end='', file=sys.stderr) else: print(shortmess(*args, skip=skip), end='', file=sys.stderr) return 1
[docs]def cgtime(secs=None): """Replacement for perl built-in gmtime function in scalar context""" return tm_py.asctime(tm_py.gmtime(secs))
[docs]def chdir(d): """Implementation of perl chdir""" global AUTODIE, TRACEBACK, OS_ERROR try: os.chdir(d) return 1 except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: cluck(OS_ERROR,skip=2) if AUTODIE: raise return ''
[docs]def chmod(mode, *argv): """Implementation of perl chmod function""" result = 0 for arg in argv: try: os.chmod(arg, mode) result += 1 except Exception: pass return result
[docs]def chomp_element(base, index, value): """Implementation of perl = and chomp on an array element""" if value is None: value = '' if INPUT_RECORD_SEPARATOR is None or isinstance(INPUT_RECORD_SEPARATOR, int): base[index] = value return 0 if INPUT_RECORD_SEPARATOR == '': chomped_value = value.rstrip("\n") else: chomped_value = value.rstrip(INPUT_RECORD_SEPARATOR) base[index] = chomped_value return len(value) - len(base[index])
[docs]def chomp_global(packname, varname, value): """Assigns a value to a package global variable, does a chomp and returns the number of chars chopped""" namespace = getattr(builtins, packname) if value is None: value = '' if INPUT_RECORD_SEPARATOR is None or isinstance(INPUT_RECORD_SEPARATOR, int): setattr(namespace, varname, value) return 0 if INPUT_RECORD_SEPARATOR == '': chomped_value = value.rstrip("\n") else: chomped_value = value.rstrip(INPUT_RECORD_SEPARATOR) setattr(namespace, varname, chomped_value) return len(value) - len(chomped_value)
[docs]def chomp_with_result(var): """Implementation of chomp where the count of chars removed is needed. Returns a tuple of (result, count).""" count = 0 if var is None: var = '' if INPUT_RECORD_SEPARATOR is None or isinstance(INPUT_RECORD_SEPARATOR, int): return (var, 0) if (hasattr(var, 'isHash') and var.isHash) or (not hasattr(var, 'isHash') and isinstance(var, collections.abc.Mapping)): for k, v in var.items(): (var[k], cnt) = chomp_with_result(v) count += cnt return (var, count) if isinstance(var, collections.abc.Iterable) and not isinstance(var, str): for i, v in enumerate(var): (var[i], cnt) = chomp_with_result(v) count += cnt return (var, count) var = str(var) if INPUT_RECORD_SEPARATOR == '': result = var.rstrip("\n") elif var.endswith(INPUT_RECORD_SEPARATOR): result = var[0:-len(INPUT_RECORD_SEPARATOR)] else: result = var return (result, len(var) - len(result))
[docs]def chop_element(base, index, value): """Implementation of perl = and chop on an array element""" if value is None: value = '' result = value[-1:] base[index] = value[0:-1] return result
[docs]def chop_global(packname, varname, value): """Assigns a value to a package global variable, does a chop and returns the value chopped""" namespace = getattr(builtins, packname) if value is None: value = '' result = value[-1:] setattr(namespace, varname, value[0:-1]) return result
[docs]def chop_without_result(var): """Implementation of chop where the last char removed is not needed. Returns a tuple of (result, '').""" if var is None: var = '' if (hasattr(var, 'isHash') and var.isHash) or (not hasattr(var, 'isHash') and isinstance(var, collections.abc.Mapping)): for k, v in var.items(): (var[k], _) = chop_without_result(v) return (var, '') if isinstance(var, collections.abc.Iterable) and not isinstance(var, str): for i, v in enumerate(var): (var[i], _) = chop_without_result(v) return (var, '') var = str(var) result = var[0:-1] return (result, '')
[docs]def chop_with_result(var): """Implementation of chop where the last char removed is needed. Returns a tuple of (result, last_c).""" if var is None: var = '' if (hasattr(var, 'isHash') and var.isHash) or (not hasattr(var, 'isHash') and isinstance(var, collections.abc.Mapping)): for k, v in var.items(): (var[k], last_c) = chop_with_result(v) return (var, last_c) if isinstance(var, collections.abc.Iterable) and not isinstance(var, str): for i, v in enumerate(var): (var[i], last_c) = chop_with_result(v) return (var, last_c) var = str(var) last_c = var[-1:] result = var[0:-1] return (result, last_c)
[docs]def clone_encoding(encoding): """Implementation of Encoding::clone_encoding""" obj = find_encoding(encoding) if obj is None: return obj return copy.deepcopy(obj)
[docs]def closedir(DIR): """Implementation of perl closedir""" DIR[0] = None DIR[1] = None
[docs]def close_(fh): """Implementation of perl close""" global AUTODIE, TRACEBACK, OS_ERROR, TRACE_RUN try: if hasattr(fh, '_sp'): # issue 72: subprocess fh.flush() fh._sp.communicate() if TRACE_RUN: sp = subprocess.CompletedProcess(f"open({fh._file})", fh._sp.returncode) carp(f'trace close({fh._file}): {repr(sp)}', skip=2) fh.close() if fh._sp.returncode: raise IOError(f"close({fh._file}): failed with {fh._sp.returncode}") return 1 if fh is None: raise TypeError(f"close(None): failed") #if WARNING and fh.closed: #carp(f"close failed: Filehandle is already closed", skip=2) fh.close() return 1 except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: cluck(OS_ERROR,skip=2) if AUTODIE: raise return ''
[docs]def cluck(*args,skip=1): """Warn with stack backtrace""" print(longmess(*args, skip=skip), end='', file=sys.stderr) return 1
[docs]def cmp(a,b): """3-way comparison like the cmp operator in perl""" if a is None: a = '' elif hasattr(a, '__cmp__'): return a.__cmp__(b) if b is None: b = '' elif hasattr(b, '__rcmp__'): return b.__rcmp__(a) a = str(a) b = str(b) return (a > b) - (a < b)
[docs]def concat_element(base, index, value): """Implementation of perl .= on an array element""" try: base[index] += value except TypeError: if value is None: if base[index] is None: base[index] = '' else: base[index] = str(base[index]) else: if base[index] is None: base[index] = str(value) else: base[index] = str(base[index]) + str(value) return base[index]
[docs]def confess(*args,skip=1): """Error with stack backtrace""" if TRACEBACK: raise Die(longmess(*args, skip=skip),suppress_traceback=True) raise Die(longmess(*args, skip=skip))
[docs]def croak(*args,skip=1): """Error with no backtrace""" if TRACEBACK: raise Die(longmess(*args, skip=skip),suppress_traceback=True) raise Die(shortmess(*args, skip=skip))
[docs]def curdir(): """Implementation of File::Spec->curdir""" return '.'
init_package('Encode')
[docs]def FB_DEFAULT(): return 0
Encode.FB_DEFAULT = FB_DEFAULT
[docs]def FB_CROAK(): return 1
Encode.FB_CROAK = FB_CROAK
[docs]def FB_QUIET(): return 4
Encode.FB_QUIET = FB_QUIET
[docs]def FB_WARN(): return 6
Encode.FB_WARN = FB_WARN
[docs]def LEAVE_SRC(): return 8
Encode.LEAVE_SRC = LEAVE_SRC
[docs]def decode(encoding, octets, check=None): """Implementation of Encode::decode""" # NOTE: Changes to this function should also be make in decode_utf8.py if check is None: check = 0 elif callable(check): name = str(id(check)) try: codecs.lookup_error(name) except LookupError: def handler(uee): slc = uee.object[uee.start:uee.end] if not isinstance(slc, bytes): slc = bytes(slc,encoding='latin-1') return (check(*slc), uee.end) codecs.register_error(name, handler) return octets.encode('latin-1').decode(encoding, errors=name) else: check = int_(check) if check & Encode.FB_CROAK(): try: return octets.encode('latin-1').decode(encoding) except Exception as e: croak(str(e)) elif (check & 7) == Encode.FB_WARN(): try: s = octets.encode('latin-1') return s.decode(encoding) except UnicodeError as e: print(str(e), file=sys.stderr) return s[:e.start].decode(encoding) elif (check & 7) == Encode.FB_QUIET(): # NOTE: To keep from having to change the user's string (which is difficult # in python, since they are immutable), we instead keep track of the status # of the current decode as an attribute of this function. We avoid using this status # in a completely different decode by checking the first 16-bytes of the # string being decoded, and also ensuring that the string being passed keeps growing. # This may not be 100% effective in all cases, but it does pass a fairly # comprehensive test (test_Encode.py) try: s = octets.encode('latin-1') orig_s = s if hasattr(decode, 'start'): # Quick sanity check that we're still decoding the same data verify_chars = 16 string = decode.string check_len = min(verify_chars, decode.start) ln = len(s) if(ln == 1 or ln < decode.start or s[:check_len] != string[:check_len]): delattr(decode, 'start') else: s = s[decode.start:] result = s.decode(encoding) decode.start = len(orig_s) decode.string = orig_s return result except UnicodeError as e: prior_start = 0 if e.reason.startswith('unexpected end'): if hasattr(decode, 'start'): prior_start = decode.start decode.start += e.start else: decode.start = e.start decode.string = orig_s elif hasattr(decode, 'start'): delattr(decode, 'start') return orig_s[prior_start:decode.start].decode(encoding) else: return octets.encode('latin-1').decode(encoding, errors='replace')
[docs]def decode_utf8(octets, check=None): """Implementation of Encode::decode_utf8""" # Note: The code here is mostly a copy of decode.py if check is None: check = 0 elif callable(check): name = str(id(check)) try: codecs.lookup_error(name) except LookupError: def handler(uee): slc = uee.object[uee.start:uee.end] if not isinstance(slc, bytes): slc = bytes(slc,encoding='latin-1') return (check(*slc), uee.end) codecs.register_error(name, handler) return octets.encode('latin-1').decode(errors=name) else: check = int_(check) if check & Encode.FB_CROAK(): try: return octets.encode('latin-1').decode() except Exception as e: croak(str(e)) elif (check & 7) == Encode.FB_WARN(): try: s = octets.encode('latin-1') return s.decode() except UnicodeError as e: print(str(e), file=sys.stderr) return s[:e.start].decode() elif (check & 7) == Encode.FB_QUIET(): # NOTE: To keep from having to change the user's string (which is difficult # in python, since they are immutable), we instead keep track of the status # of the current decode as an attribute of this function. We avoid using this status # in a completely different decode_utf8 by checking the first 16-bytes of the # string being decoded, and also ensuring that the string being passed keeps growing. # This may not be 100% effective in all cases, but it does pass a fairly # comprehensive test (test_Encode.py) try: s = octets.encode('latin-1') orig_s = s if hasattr(decode_utf8, 'start'): # Quick sanity check that we're still decoding the same data verify_chars = 16 string = decode_utf8.string check_len = min(verify_chars, decode_utf8.start) ln = len(s) if(ln == 1 or ln < decode_utf8.start or s[:check_len] != string[:check_len]): delattr(decode_utf8, 'start') else: s = s[decode_utf8.start:] result = s.decode() decode_utf8.start = len(orig_s) decode_utf8.string = orig_s return result except UnicodeError as e: prior_start = 0 if e.reason.startswith('unexpected end'): if hasattr(decode_utf8, 'start'): prior_start = decode_utf8.start decode_utf8.start += e.start else: decode_utf8.start = e.start decode_utf8.string = orig_s elif hasattr(decode_utf8, 'start'): delattr(decode_utf8, 'start') return orig_s[prior_start:decode_utf8.start].decode() else: return octets.encode('latin-1').decode(errors='replace')
[docs]def define_alias(alias, name): """Implementation of Encode::define_alias""" def norm(n): return re.sub(r'[-\s]+', '_', n).lower() if alias == norm(name): return try: info = codecs.lookup(name) result = codecs.CodecInfo( name=alias, encode=info.encode, decode=info.decode) if hasattr(info, '_obj'): result._obj = info._obj except LookupError: result = None alias = norm(alias) codecs.register(lambda a: result if a == alias else None)
[docs]def define_encoding(obj, name, aliases): """Implementation of Encode::define_encoding""" def norm(n): return re.sub(r'[-\s]+', '_', n).lower() def encode(s, errors='strict'): l = len(s) s = obj.encode(s) return (bytes(s, encoding='latin1'), l) def decode(b, errors='strict'): l = len(b) s = str(b, encoding='latin1') return (obj.decode(s), l) result = codecs.CodecInfo(name=name, encode=encode, decode=decode) result._obj = obj name = norm(name) codecs.register(lambda n: result if n == name else None) for alias in aliases: define_alias(alias, obj.name()) return obj
[docs]def die(*args, skip=None): """Handle die in perl""" global INPUT_LINE_NUMBER, _INPUT_FH_NAME, EVAL_ERROR def is_func_in_call_stack(func): # Die handlers are turned off inside themselves frame = sys._getframe(2) while frame is not None: if func.__code__ == frame.f_code: return True frame = frame.f_back return False if hasattr(builtins, 'CORE') and hasattr(builtins.CORE, 'GLOBAL') and \ hasattr(builtins.CORE.GLOBAL, 'die') and callable(builtins.CORE.GLOBAL.die) and not \ is_func_in_call_stack(builtins.CORE.GLOBAL.die): return builtins.CORE.GLOBAL.die(*args) args = list(map(_str, args)) if len(args) == 0 or len(''.join(args)) == 0: args = ["Died"] try: if EVAL_ERROR or hasattr(EVAL_ERROR, 'PROPAGATE'): if hasattr(EVAL_ERROR, 'PROPAGATE') and callable(EVAL_ERROR.PROPAGATE): (_, fn, lno) = caller() try: EVAL_ERROR = EVAL_ERROR.PROPAGATE(fn, lno) args = [EVAL_ERROR] except Exception: args = [EVAL_ERROR, "\t...propagated"] else: args = [EVAL_ERROR, "\t...propagated"] except Exception: pass if "\n" not in args[-1]: (_, fn, lno, *_) = caller() if skip is None else caller(skip) iln = None ifn = None try: iln = fileinput.lineno() ifn = '<fileinput>' except RuntimeError: iln = INPUT_LINE_NUMBER if _INPUT_FH_NAME: ifn = f"<{_INPUT_FH_NAME}>" if iln and ifn: args.append(f" at {fn} line {lno}, {ifn} line {iln}.\n") else: args.append(f" at {fn} line {lno}.\n") arg = ''.join(args) if callable(SIG_DIE_HANDLER) and not is_func_in_call_stack(SIG_DIE_HANDLER): SIG_DIE_HANDLER(arg) orig_excepthook = sys.excepthook def excepthook(typ, value, traceback): if TRACEBACK: orig_excepthook(typ, value, traceback) else: print(value, end='', file=sys.stderr) if (m := re.search(r'\[Errno (\d+)\]', str(value))): sys.exit(int(m.group(1))) if CHILD_ERROR>>8: sys.exit(CHILD_ERROR>>8) sys.exit(255) sys.excepthook = excepthook raise Die(arg)
[docs]def dirname(fullname): """Emulation of File::Basename qw(dirname) for unix""" def fileparse(fullname): [dirpath,basename] = (_m:=re.search(re.compile(r'^(.*/)?(.*)',re.S),fullname),_m.groups() if _m else [None,None])[1] if not (dirpath): dirpath = './' return (basename, dirpath) [basename, dirname] = fileparse(fullname) dirname = re.sub(r'(.)/*$', r'\1', dirname, flags=re.S) if not len(basename): [basename, dirname] = fileparse(dirname) dirname = re.sub(r'(.)/*$', r'\1', dirname, flags=re.S) return dirname
[docs]def divide_element(base, index, value): base[index] /= value return base[index]
init_package('Data.Dumper') Data.Dumper.Indent_v = 2 # InIt Data.Dumper.Trailingcomma_v = False # InIt Data.Dumper.Purity_v = 0 # InIt Data.Dumper.Pad_v = '' # InIt Data.Dumper.Varname_v = "VAR" # InIt Data.Dumper.Useqq_v = 0 # InIt Data.Dumper.Terse_v = False # InIt Data.Dumper.Freezer_v = '' # InIt Data.Dumper.Toaster_v = '' # InIt Data.Dumper.Deepcopy_v = 0 # InIt Data.Dumper.Quotekeys_v = 1 # InIt Data.Dumper.Bless_v = 'bless' # InIt Data.Dumper.Pair_v = ':' # InIt Data.Dumper.Maxdepth_v = 0 # InIt Data.Dumper.Maxrecurse_v = 1000 # InIt Data.Dumper.Useperl_v = 0 # InIt Data.Dumper.Sortkeys_v = 0 # InIt Data.Dumper.Deparse_v = False # InIt Data.Dumper.Sparseseen_v = False # InIt
[docs]def Dumper(*args): """Implementation of Data::Dumper""" result = [] pp = pprint.PrettyPrinter(indent=Data.Dumper.Indent_v, depth=None if Data.Dumper.Maxdepth_v==0 else Data.Dumper.Maxdepth_v, compact=Data.Dumper.Terse_v, sort_dicts=Data.Dumper.Sortkeys_v) for i, arg in enumerate(args, start=1): if Data.Dumper.Terse_v: result.append(f"{Data.Dumper.Pad_v}" + pp.pformat(arg)) else: result.append(f"{Data.Dumper.Pad_v}{Data.Dumper.Varname_v}{i} = " + pp.pformat(arg)) spacer = " " if Data.Dumper.Indent_v == 0 else "\n" return spacer.join(result)
[docs]def dup(file,mode,checked=True,equals=False,encoding=None,errors=None): """Replacement for perl built-in open function when the mode contains '&'. Keyword arg 'checked' means the result will be checked. Keyword arg 'equals' means that '&=' was specified, so skip the os.dup operation.""" global OS_ERROR, TRACEBACK, AUTODIE try: if isinstance(file, io.IOBase): # file handle file.flush() if encoding is None: encoding = file.encoding if errors is None: errors = file.errors if equals: return os.fdopen(file.fileno(), mode, encoding=encoding, errors=errors) return os.fdopen(os.dup(file.fileno()), mode, encoding=encoding, errors=errors) if isinstance(file, int): pass elif (_m:=re.match(r'=?(\d+)', file)): file = int(_m.group(1)) elif file in _DUP_MAP: file = _DUP_MAP[file] if equals: return _create_fh_methods(os.fdopen(file, mode, encoding=encoding, errors=errors)) return _create_fh_methods(os.fdopen(os.dup(file), mode, encoding=encoding, errors=errors)) except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: cluck(f"dup failed: {OS_ERROR}",skip=2) if AUTODIE: raise if checked: return None fh = io.StringIO() fh.close() return _create_fh_methods(fh)
[docs]def each(h_a): """See https://perldoc.perl.org/functions/each""" key = str(id(h_a)) # Unique memory address of object if not hasattr(each, key): setattr(each, key, iter(h_a)) it = getattr(each, key) try: v = next(it) except StopIteration: setattr(each, key, iter(h_a)) return [] if hasattr(h_a, 'TIEHASH') or \ ((hasattr(h_a, 'keys') and not hasattr(h_a, 'isHash')) or (hasattr(h_a, 'isHash') and h_a.isHash)): return [v, h_a[v]] ndx_key = key + 'i' i = 0; if hasattr(each, ndx_key): i = getattr(each, ndx_key) setattr(each, ndx_key, i+1) return [i, v]
[docs]def encode(encoding, string, check=None): """Implementation of Encode::encode""" if check is None: check = 0 elif callable(check): name = str(id(check)) try: codecs.lookup_error(name) except LookupError: def handler(uee): slc = uee.object[uee.start:uee.end] if not isinstance(slc, bytes): slc = bytes(slc,encoding='latin-1') return (check(*slc), uee.end) codecs.register_error(name, handler) return string.encode(encoding, errors=name).decode('latin-1') else: check = int_(check) if check & Encode.FB_CROAK(): try: return string.encode(encoding).decode('latin-1') except Exception as e: croak(str(e)) elif (check & 7) == Encode.FB_WARN(): try: return string.encode(encoding).decode('latin-1') except UnicodeError as e: print(str(e), file=sys.stderr) return string[:e.start].encode(encoding).decode('latin-1') elif (check & 7) == Encode.FB_QUIET(): try: return string.encode(encoding).decode('latin-1') except UnicodeError as e: return string[:e.start].encode(encoding).decode('latin-1') else: return string.encode(encoding, errors='replace').decode('latin-1')
[docs]def encode_utf8(string): """Implementation of Encode::encode_utf8""" return string.encode(errors='replace').decode('latin-1')
[docs]def encodings(get=None): """Implementation of Encode::encodings""" # list from https://stackoverflow.com/questions/1728376/get-a-list-of-all_e-the-encodings-python-can-encode-to all_e = ['ascii', 'big5', 'big5hkscs', 'cp037', 'cp273', 'cp424', 'cp437', 'cp500', 'cp720', 'cp737', 'cp775', 'cp850', 'cp852', 'cp855', 'cp856', 'cp857', 'cp858', 'cp860', 'cp861', 'cp862', 'cp863', 'cp864', 'cp865', 'cp866', 'cp869', 'cp874', 'cp875', 'cp932', 'cp949', 'cp950', 'cp1006', 'cp1026', 'cp1125', 'cp1140', 'cp1250', 'cp1251', 'cp1252', 'cp1253', 'cp1254', 'cp1255', 'cp1256', 'cp1257', 'cp1258', 'euc_jp', 'euc_jis_2004', 'euc_jisx0213', 'euc_kr', 'gb2312', 'gbk', 'gb18030', 'hz', 'iso2022_jp', 'iso2022_jp_1', 'iso2022_jp_2', 'iso2022_jp_2004', 'iso2022_jp_3', 'iso2022_jp_ext', 'iso2022_kr', 'latin_1', 'iso8859_2', 'iso8859_3', 'iso8859_4', 'iso8859_5', 'iso8859_6', 'iso8859_7', 'iso8859_8', 'iso8859_9', 'iso8859_10', 'iso8859_11', 'iso8859_13', 'iso8859_14', 'iso8859_15', 'iso8859_16', 'johab', 'koi8_r', 'koi8_t', 'koi8_u', 'kz1048', 'mac_cyrillic', 'mac_greek', 'mac_iceland', 'mac_latin2', 'mac_roman', 'mac_turkish', 'ptcp154', 'shift_jis', 'shift_jis_2004', 'shift_jisx0213', 'utf_32', 'utf_32_be', 'utf_32_le', 'utf_16', 'utf_16_be', 'utf_16_le', 'utf_7', 'utf_8', 'utf_8_sig'] if not get or get == ':all': return all_e else: if get.startswith('Encode::'): get = get[8:] get = get.lower().replace('-', '_') list = [] for e in all_e: if get in e: list.append(e) return list
[docs]def eof(fh): global AUTODIE, TRACEBACK """Implementation of perl eof""" try: pos = fh.tell() return (pos == os.path.getsize(fh)) except Exception as e: if TRACEBACK: cluck(f"eof failed: {OS_ERROR}",skip=2) if AUTODIE: raise return 1
[docs]def exc(e): """Exception information like perl, e.g. message at issue_42.pl line 21.""" try: m = str(e) if m.endswith('\n'): return m return f"{m} at {os.path.basename(sys.exc_info()[2].tb_frame.f_code.co_filename)} line {sys.exc_info()[2].tb_lineno}.\n" except Exception: return str(e)
[docs]def exec_(lst): """Implementation of perl exec with a list""" global OS_ERROR, TRACEBACK try: if isinstance(lst, str): lst = lst.split() program = lst[0] program = (program.split())[0] execp(program, lst) except TypeError: OS_ERROR = "Undefined list on exec" if TRACEBACK: cluck(f"exec({lst}) failed: {OS_ERROR}", skip=2) except IndexError: OS_ERROR = "Empty list on exec" if TRACEBACK: cluck(f"exec({lst}) failed: {OS_ERROR}", skip=2) except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: cluck(f"exec({lst}) failed: {OS_ERROR}", skip=2)
[docs]def execp(program, lst): """Implementation of perl exec with a program and a list""" global OS_ERROR, TRACEBACK try: sys.stdout.flush() sys.stderr.flush() except Exception: pass try: os.execvp(program, list(lst)) except OSError: # checkif we're trying to run a perl or python script on Windows if isinstance(program, str): program_split = program.split()[0] if program_split.endswith('.py'): lst = [program] + lst program = sys.executable elif program_split.endswith('.pl'): lst = [program] + lst program = 'perl' else: raise os.execvp(program, list(lst)) else: raise except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: cluck(f"exec({program, lst}) failed: {OS_ERROR}", skip=2)
[docs]def exponentiate_element(base, index, value): base[index] **= value return base[index]
[docs]def extract_bracketed(text, delimiters='{}()[]<>', prefix_pattern='^\s*'): """Implementation of Text::Bracketed::extract_bracketed in list context. Returns a list with (extracted_substring, updated_text, skipped_prefix)""" open_to_close=dict() close_to_open=dict() if '{' in delimiters: open_to_close['{'] = '}' close_to_open['}'] = '{' if '(' in delimiters: open_to_close['('] = ')' close_to_open[')'] = '(' if '[' in delimiters: open_to_close['['] = ']' close_to_open[']'] = '[' if '<' in delimiters: open_to_close['<'] = '>' close_to_open['>'] = '<' stack = [] prefix = '' orig_text = text if (_m := re.match(prefix_pattern, text)): prefix = _m.group(0) text = text[len(prefix):] if not text or text[0] not in open_to_close: return [None, orig_text, None] for i, c in enumerate(text): if c in open_to_close: stack.append(c) elif c in close_to_open: try: top = stack.pop() if top != close_to_open[c]: return [None, orig_text, None] if not stack: return [text[:i+1], text[i+1:], prefix] except IndexError: return [None, orig_text, None] return [None, orig_text, None]
[docs]def extract_bracketed_s(text, delimiters='{}()[]<>', prefix_pattern='^\s*'): """Implementation of Text::Bracketed::extract_bracketed in scalar context. Returns a tuple with (updated_text, extracted_substring)""" open_to_close=dict() close_to_open=dict() if '{' in delimiters: open_to_close['{'] = '}' close_to_open['}'] = '{' if '(' in delimiters: open_to_close['('] = ')' close_to_open[')'] = '(' if '[' in delimiters: open_to_close['['] = ']' close_to_open[']'] = '[' if '<' in delimiters: open_to_close['<'] = '>' close_to_open['>'] = '<' stack = [] if (_m := re.match(prefix_pattern, text)): text = text[len(_m.group(0)):] if not text or text[0] not in open_to_close: return (text, None) for i, c in enumerate(text): if c in open_to_close: stack.append(c) elif c in close_to_open: try: top = stack.pop() if top != close_to_open[c]: return (text, None) if not stack: return (text[i+1:], text[:i+1]) except IndexError: return (text, None) return (text, None)
[docs]def fcntl(fh, func, scalar): global AUTODIE, TRACEBACK, OS_ERROR """Implementation of perl fcntl""" try: result = fc_py.fcntl(fh, func, scalar) if result == 0: return "0 but true" if result == -1: return None return result except Exception as e: OS_ERROR = str(e) if TRACEBACK: cluck(f"fcntl failed: {OS_ERROR}",skip=2) if AUTODIE: raise return None
[docs]def fdopen(fh, fd, mode): """Implementation of $fh->fdopen(fd, mode)""" if isinstance(fd, str) and re.match(r'^\d+$', fd): fd = int(fd) if isinstance(fd, int): fd = f'={fd}' if fh and not fh.closed: fh.close() return _create_all_fh_methods(open_dynamic(_open_mode_string(mode) + '&' + fd))
[docs]def fetch_out_parameter(arg): """Fetch the value of a sub out parameter from the location where _store_out_parameter saved it. This is called after the sub returns. arg is the argument index, starting at 0. Returns the value we saved.""" try: result = getattr(builtins, f"__outp{arg}__") delattr(builtins, f"__outp{arg}__") return result except Exception: return None
[docs]def fetch_out_parameters(var, start=0): """Fetch the values of all sub out parameters from the location where _store_out_parameter saved them. This is called after the sub returns. var is the array or hash to store them in. start is the argument starting index, defaulting to 0. Returns the array or hash we saved.""" if (hasattr(var, 'isHash') and var.isHash) or (not hasattr(var, 'isHash') and isinstance(var, collections.abc.Mapping)): ln = len(var.keys()) var_copy = var.copy() var.clear() missing = 0; for arg in range(0, ln*2, 2): try: key = getattr(builtins, f"__outp{arg+start}__") delattr(builtins, f"__outp{arg+start}__") var[key] = getattr(builtins, f"__outp{arg+start+1}__") delattr(builtins, f"__outp{arg+start+1}__") except Exception: missing += 1 if missing: for k,v in var_copy.items(): if k not in var: var[k] = v missing -= 1 if missing == 0: break else: ln = len(var) var_copy = var.copy() var.clear() for arg in range(0, ln): try: var.append(getattr(builtins, f"__outp{arg+start}__")) delattr(builtins, f"__outp{arg+start}__") except Exception: var.append(var_copy[arg]) return var
[docs]def fetch_perl_global(perlname): """Fetch the value of a package global variable specified by it's perl name""" (packname, varname) = perlname.rsplit('::', maxsplit=1) packname = packname.replace('::', '.') if packname == '': packname = 'main' if not hasattr(builtins, packname): init_package(packname) namespace = getattr(builtins, packname) if varname in _PYTHONIZER_KEYWORDS: varname += '_' if hasattr(namespace, varname): return getattr(namespace, varname) if varname == '' or varname == '_h': # They want the namespace dictionary return namespace.__dict__ return None
_fileinput_iter = None
[docs]def fileinput_next(files=None, inplace=False, backup='',*, mode='r', openhook=None, encoding=None, errors=None): """Implementation of fileinput.input() where it can be called multiple times for the same <> operator""" global _fileinput_iter if _fileinput_iter is None: try: (mode, encoding, errors, newline) = handle_open_pragma(mode, encoding, errors) except NameError: pass try: _fileinput_iter = fileinput.input(files=files, inplace=inplace, backup=backup, mode=mode, openhook=openhook, encoding=encoding, errors=errors) except TypeError: # pythons older than 3.10 don't have encoding and errors _fileinput_iter = fileinput.input(files=files, inplace=inplace, backup=backup, mode=mode, openhook=openhook) result = next(_fileinput_iter, None) if result is None: _fileinput_iter = None return result
[docs]def fileno(fh): global OS_ERROR, TRACEBACK, AUTODIE try: if isinstance(fh, list) and len(fh) == 2 and isinstance(fh[0], list) and isinstance(fh[1], int): # DIRHANDLE raise TypeError("Directories have no associated fileno"); return fh.fileno() except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: cluck(f"fileno failed: {OS_ERROR}",skip=2) if AUTODIE: raise return None
[docs]def fileparse(*args): """Split a path into basename, dirpath, and (optional) suffixes. Translated from perl File::Basename for unix, plus annotations""" fullname = args[0] suffixes = args[1:] if fullname is None: raise Die("fileparse(): need a valid pathname") fullname = str(fullname) [dirpath,basename] = (_m:=re.search(re.compile(r'^(.*/)?(.*)',re.S),fullname),_m.groups() if _m else [None,None])[1] if not (dirpath): dirpath = './' tail='' suffix='' if suffixes: for suffix in suffixes: if(isinstance(suffix, re.Pattern)): # in case they use qr suffix = suffix.pattern pat=f"({suffix})$" def sub(_m): nonlocal tail tail = _m.group(1) + tail return '' basename = re.sub(re.compile(pat,re.S),sub,basename,count=1) return (basename, dirpath, tail)
[docs]def file_exists(path): # -e if not path: return '' # False if hasattr(path, 'cando'): return 1 # True return 1 if os.path.exists(path) else ''
[docs]def file_size(path): # -s if not path: return None if hasattr(path, '_size'): return path._size return os.path.getsize(path)
[docs]def filter_map(f, i): """Given a function f that returns a tuple of (new_val, include) and an iterable i, return an iterable of new_vals where include is True""" for v in i: (new_val, include) = f(v) if include: yield new_val
_finditer_pattern = None _finditer_string = None _finditer_iter = None
[docs]def finditer_next(pattern, string, flags=0): """Implementation of re.finditer() where it can be called multiple times for the same pattern and string""" global _finditer_iter, _finditer_pattern, _finditer_string if _finditer_pattern != pattern or _finditer_string != string: _finditer_iter = None if _finditer_iter is None: _finditer_iter = re.finditer(pattern, string, flags) _finditer_pattern = pattern _finditer_string = string result = next(_finditer_iter, None) if result is None: _finditer_iter = None return result
[docs]def find_encoding(encoding): """Implementation of Encode::find_encoding""" try: info = codecs.lookup(encoding) if hasattr(info, '_obj'): # We defined it return info._obj decod = functools.partial(decode, encoding) encod = functools.partial(encode, encoding) name = lambda: encoding mime_name = lambda: info.name return type('Encode.Encoding', tuple(), dict(decode=decod, encode=encod, name=name, mime_name=mime_name)) except LookupError: return None
[docs]def find_mime_encoding(encoding): """Implementation of Encode::find_mime_encoding""" obj = find_encoding(encoding) if hasattr(obj, 'mime_name'): mime_name = obj.mime_name() def normalize(enc): # Make sure 'ISO-8859-1' matches 'iso8859-1' return enc.lower().replace('-', '').replace('_', '') if normalize(mime_name) != normalize(encoding): return None return obj
[docs]def flatten(lst): """Flatten a list down to 1 level""" result = [] if (not isinstance(lst, collections.abc.Iterable)) or isinstance(lst, str): return [lst] for elem in lst: if hasattr(elem, 'isHash'): # Array or Hash result = Array(result) if elem.isHash: for e in itertools.chain.from_iterable(elem.items()): result.extend(flatten(e)) else: for e in elem: result.extend(flatten(e)) elif isinstance(elem, collections.abc.Mapping): for e in itertools.chain.from_iterable(elem.items()): result.extend(flatten(e)) elif isinstance(elem, collections.abc.Iterable) and not isinstance(elem, str): for e in elem: result.extend(flatten(e)) else: result.append(elem) return result
[docs]def flock(fd, operation): """ Replacement for perl Fcntl flock function""" global OS_ERROR, TRACEBACK, AUTODIE try: # To avoid the possibility of miscoordination, Perl now flushes FILEHANDLE before locking or unlocking it. fd.flush() fc_py.flock(fd, operation) return 1 except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: cluck(f"flock failed: {OS_ERROR}",skip=2) if AUTODIE: raise return ''
[docs]def flt(expr): """Convert expr to a float number Ref: https://squareperl.com/en/how-perl-convert-string-to-number""" if not expr: return 0 try: return +expr # Unary plus: The fastest way to test for numeric except Exception: pass for _ in range(2): try: f = float(expr) return f except Exception: pass if isinstance(expr, str): if not (m:=re.match(r'^\s*([+-]?(?:\d+(?:[.]\d*)?(?:[eE][+-]?\d+)?|[.]\d+(?:[eE][+-]?\d+)?))', expr)): break expr = m.group(1); elif isinstance(expr, bytes): if not (m:=re.match(br'^\s*([+-]?(?:\d+(?:[.]\d*)?(?:[eE][+-]?\d+)?|[.]\d+(?:[eE][+-]?\d+)?))', expr)): break expr = m.group(1); else: return expr if WARNING == 2: die(f"Argument \"{expr}\" isn't numeric in numeric context", skip=1) if WARNING: #caller = inspect.getframeinfo(inspect.stack()[1][0]) warn(f"Argument \"{expr}\" isn't numeric in numeric context", skip=1) return 0
[docs]def format_(fmt, args=None): """Like % formatter in python, but auto-converts the args to the proper types""" fmt = str(fmt) if args is None: args = [] if isinstance(args, collections.abc.Iterable) and not isinstance(args, str): args = list(args) else: args = [args] fmt_regex = re.compile(r'%(?:[#0+ -])*(\*|\d+)?([.](?:\*|\d+))?[hlL]?([diouxXeEfFgGcrsa])') num_fmts = set('diouxXeEfFgG') i = 0 for m in re.finditer(fmt_regex, fmt.replace('%%', '')): if i >= len(args): args.append('') if m.group(1) == '*': args[i] = int_(args[i]) i += 1 if m.group(2) == '.*': args[i] = int_(args[i]) i += 1 if m.group(3) in num_fmts: args[i] = num(args[i]) i += 1 return fmt % tuple(args)
[docs]def format_write(fh): """Implementation of perl format_write""" raise NotImplementedError
[docs]def from_to(octets, from_enc, to_enc, check=None): """Implementation of Encode::from_to""" result = encode(to_enc, decode(from_enc, octets), check) return (result, len(result))
[docs]def getc(fh): """Implementation of perl getc""" fh._last_pos = fh.tell() # for ungetc return fh.read(1)
[docs]def getpos(fh): """Implementation of perl $fh->getpos""" return fh.tell()
[docs]def getsignal(signum): """Handle references to %SIG not on the LHS of expression""" result = signal.getsignal(signum) if result == signal.SIG_IGN: return 'IGNORE' elif result == signal.SIG_DFL: return 'DEFAULT' return result
[docs]def get_access_age_days(path): # -A """Implementation of perl -A""" global OS_ERROR, TRACEBACK, AUTODIE if not path: return None if hasattr(path, '_atime'): t = path._atime else: try: if hasattr(path, 'fileno') and os.stat in os.supports_fd: path = path.fileno() elif hasattr(path, 'name'): path = path.name t = os.path.getatime(path) except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: cluck(f"-A {path} failed: {OS_ERROR}",skip=2) if AUTODIE: raise return 0 return (BASETIME - t) / 86400.0
[docs]def get_creation_age_days(path): # -C """Implementation of perl -C""" global OS_ERROR, TRACEBACK, AUTODIE if not path: return None if hasattr(path, '_ctime'): t = path._ctime else: try: if hasattr(path, 'fileno') and os.stat in os.supports_fd: path = path.fileno() elif hasattr(path, 'name'): path = path.name t = os.path.getctime(path) except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: cluck(f"-C {path} failed: {OS_ERROR}",skip=2) if AUTODIE: raise return 0 return (BASETIME - t) / 86400.0
[docs]def get_element(base, index): """Safe element getter from a list, tuple, or Array - returns None if the element doesn't exist""" if index < 0: index += len(base) if index >= 0 and index < len(base): return base[index] return None
[docs]def get_layers(fh): """Implementation of PerlIO::get_layers""" result = ['unix', 'perlio'] if fh.encoding == 'UTF-8': if fh.errors == 'strict': result.append('encoding(utf-8-strict)') else: result.append('encoding(utf-8)') result.append('utf8') if fh.newlines == "\n": result.append('lf') elif fh.newlines == "\r\n": result.append('crlf') return result
[docs]def get_mod_age_days(path): # -M """Implementation of perl -M""" global OS_ERROR, TRACEBACK, AUTODIE if not path: return None if hasattr(path, '_mtime'): t = path._mtime else: try: if hasattr(path, 'fileno') and os.stat in os.supports_fd: path = path.fileno() elif hasattr(path, 'name'): path = path.name t = os.path.getmtime(path) except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: cluck(f"-M {path} failed: {OS_ERROR}",skip=2) if AUTODIE: raise return 0 return (BASETIME - t) / 86400.0
[docs]def get_subref(ref): """Convert a sub reference to a callable sub. 'ref' may already be callable or it could be the name of the sub. Returns None if the sub isn't callable""" if callable(ref): return ref if isinstance(ref, str): ref = ref.replace('::', '.').replace("'", '.') ld = ref.rfind('.') if ld == -1: def caller_globals(): frame = inspect.currentframe() try: caller_frame = frame.f_back return caller_frame.f_globals finally: del frame glb = caller_globals() if ref in _PYTHONIZER_KEYWORDS: ref += '_' if ref in glb: result = glb[ref] if callable(result): return result packname = builtins.__PACKAGE__ sub = ref else: packname = ref[0:ld] if packname == '': packname = 'main' sub = ref[ld+1:] if sub in _PYTHONIZER_KEYWORDS: sub += '_' if hasattr(builtins, packname): namespace = getattr(builtins, packname) if hasattr(namespace, sub): result = getattr(namespace, sub) if callable(result): return result return None
[docs]def gmtime(secs=None): """Replacement for perl built-in gmtime function""" try: gmt = tm_py.gmtime(secs) except Exception: try: import datetime dt = datetime.datetime.utcfromtimestamp(0) + datetime.timedelta(seconds=secs) gmt = dt.timetuple() except Exception: return (9, 9, 9, 9, 9, 99999, 0, 9, 0) return (gmt.tm_sec, gmt.tm_min, gmt.tm_hour, gmt.tm_mday, gmt.tm_mon-1, gmt.tm_year-1900, (gmt.tm_wday+1)%7, gmt.tm_yday-1, 0)
[docs]def handle_open_pragma(mode, encoding, errors, newline="\n"): """Handle any "use open" pragma that may be in effect""" if encoding is not None: return (mode, encoding, errors, newline) layers = None if ('r' in mode or mode == '-|') and INPUT_LAYERS: layers = INPUT_LAYERS elif OUTPUT_LAYERS: layers = OUTPUT_LAYERS else: return (mode, encoding, errors, newline) layers = layers.replace(':', '') if layers == 'raw' or layers == 'bytes': if 'b' not in mode: mode += 'b' newline = None elif layers.startswith('encoding('): encoding = layers.replace('encoding(','').replace(')','') errors = 'replace' elif layers == 'utf8': encoding = 'UTF-8' errors = 'ignore' elif layers == 'crlf': newline = None return (mode, encoding, errors, newline)
[docs]def has_setgid(path): # -g if not path: return '' # False if hasattr(path, '_mode'): return 1 if (path._mode & st_py.S_ISGID) != 0 else '' return 1 if (os.stat(path).st_mode & st_py.S_ISGID) != 0 else ''
[docs]def has_setuid(path): # -u if not path: return '' # False if hasattr(path, '_mode'): return 1 if (path._mode & st_py.S_ISUID) != 0 else '' return 1 if (os.stat(path).st_mode & st_py.S_ISUID) != 0 else ''
[docs]def has_sticky(path): # -k if not path: return '' # False if hasattr(path, '_mode'): return 1 if (path._mode & st_py.S_ISVTX) != 0 else '' return 1 if (os.stat(path).st_mode & st_py.S_ISVTX) != 0 else ''
[docs]def hires_alarm(floating_seconds, interval_floating_seconds=0): """Implementation of Time::HiRes::alarm""" if interval_floating_seconds == 0: signal.setitimer(signal.ITIMER_REAL, floating_seconds) return floating_seconds else: import threading def send_sigalrm(start_time, interval): signal.raise_signal(signal.SIGALRM) current_time = tm_py.time() elapsed_time = current_time - start_time next_interval = interval - (elapsed_time % interval) t = threading.Timer(next_interval, send_sigalrm, [start_time, interval]) t.start() start_time = tm_py.time() + floating_seconds t = threading.Timer(floating_seconds, send_sigalrm, [start_time, interval_floating_seconds]) t.start() return floating_seconds
[docs]def hires_clock(): """Implementation of Time::HiRes::clock""" return tm_py.process_time()
[docs]def hires_clock_getres(which): """Implementation of Time::HiRes::clock_getres""" if not which or not hasattr(tm_py, 'clock"getres'): return tm_py.get_clock_info('time').resolution return tm_py.clock_getres(which)
[docs]def hires_clock_gettime(which): """Implementation of Time::HiRes::clock_gettime""" if not which or not hasattr(tm_py, 'clock_gettime'): return tm_py.time() return tm_py.clock_gettime(which)
[docs]def hires_clock_nanosleep(which, nanoseconds, flags=0): if flags: nanoseconds = (nanoseconds / 1_000_000_000 - tm_py.time()) * 1_000_000_000 start_time = tm_py.time() if nanoseconds > 0: tm_py.sleep(nanoseconds / 1_000_000_000) if flags: return tm_py.time() * 1_000_000_000 return (tm_py.time() - start_time) * 1_000_000_000
[docs]def hires_getitimer_s(which): """Implementation of Time::HiRes::getitimer in scalar context""" return signal.getitimer(which)[0]
[docs]def hires_gettimeofday(): """Implementation of Time::HiRes::gettimeofday in list context""" current_time = tm_py.time() seconds, fraction = divmod(current_time, 1) microseconds = int(fraction * 1_000_000) return (seconds, microseconds)
[docs]def hires_lstat(path): """Implementation of Time::HiRes::lstat""" try: if hasattr(path, 'fileno') and os.lstat in os.supports_fd: path = path.fileno() elif hasattr(path, 'name'): path = path.name s = os.lstat(path) except Exception: return () result = (s.st_dev, s.st_ino, s.st_mode, s.st_nlink, s.st_uid, s.st_gid, s.st_rdev if hasattr(s, 'st_rdev') else 0, s.st_size, s.st_atime_ns / 1_000_000_000, s.st_mtime_ns / 1_000_000_000, s.st_ctime_ns / 1_000_000_000, s.st_blksize if hasattr(s, 'st_blksize') else 512, s.st_blocks if hasattr(s, 'st_blocks') else s.st_size // 512) return result
[docs]def hires_nanosleep(nanoseconds): """Implementation of Time::HiRes::nanosleep""" tm_py.sleep(nanoseconds / 1_000_000_000)
[docs]def hires_setitimer_s(which, floating_seconds, interval_floating_seconds=0): """Implementation of Time::HiRes::setitimer in scalar context""" return signal.setitimer(which, floating_seconds, interval_floating_seconds)[0]
[docs]def hires_stat(path): """Implementation of Time::HiRes::stat""" try: if hasattr(path, 'fileno') and os.stat in os.supports_fd: path = path.fileno() elif hasattr(path, 'name'): path = path.name s = os.stat(path) except Exception: return () result = (s.st_dev, s.st_ino, s.st_mode, s.st_nlink, s.st_uid, s.st_gid, s.st_rdev if hasattr(s, 'st_rdev') else 0, s.st_size, s.st_atime_ns / 1_000_000_000, s.st_mtime_ns / 1_000_000_000, s.st_ctime_ns / 1_000_000_000, s.st_blksize if hasattr(s, 'st_blksize') else 512, s.st_blocks if hasattr(s, 'st_blocks') else s.st_size // 512) return result
[docs]def hires_tv_interval(t0, t1=None): """Implementation of Time::HiRes::tv_interval""" if t1 is None: t1 = _gettimeofday() return (t1[0] - t0[0]) + (t1[1] - t0[1]) / 1_000_000
[docs]def hires_ualarm(useconds, interval_useconds=0): """Implementation of Time::HiRes::ualarm""" return hires_alarm(useconds / 1_000_000, interval_useconds / 1_000_000)
[docs]def hires_usleep(useconds): """Implementation of Time::HiRes::usleep""" tm_py.sleep(useconds / 1_000_000)
[docs]def hires_utime(atime, mtime, *args): """Implementation of Time::HiRes::utime function""" global TRACEBACK, AUTODIE, OS_ERROR result = 0 OS_ERROR = '' ntimes = None if atime is None and mtime is None: pass elif atime is None: atime = 0 elif mtime is None: mtime = 0 ntimes = (int(atime*1_000_000_000), int(mtime*1_000_000_000)) for fd in args: try: if hasattr(fd, 'fileno') and os.utime in os.supports_fd: fd = fd.fileno() elif hasattr(fd, 'name'): fd = fd.name os.utime(fd, ns=ntimes) result += 1 except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: cluck(f"Time::HiRes::utime({atime}, {mtime}, {fd}) failed: {OS_ERROR}",skip=2) if AUTODIE: raise return result
[docs]def import_(globals, path, module=None, fromlist=None, version=None, is_do=False): """Handle use/require statement from perl. 'path' is the relative or absolute path to the .py file of the module (the extension is ignored if specified). If 'module' is specified, then that is effectively added to the path, 'fromlist' is the list of desired functions to import. 'version' will perform a version check. 'is_do' handles a 'do EXPR;' statement.""" global OS_ERROR, EVAL_ERROR if not hasattr(builtins, '__PACKAGE__'): caller_package = 'main' else: caller_package = builtins.__PACKAGE__ pathname = None path = path.replace('::', '/') if module is not None: path = f'{path}/{module}' module = None if not os.path.isabs(path): path = os.path.splitext(path)[0] pathname = path.replace('.', '').replace('/', '.') if pathname[0] == '.': pathname = pathname[1:] if path[0] == '.': pass else: for pa in sys.path: if os.path.isfile(os.path.join(pa, path, '__init__.py')): path = os.path.join(pa, path) break elif os.path.isfile(os.path.join(pa, f'{path}.py')): path = os.path.join(pa, path) break else: if not is_do: msg = f"Can't locate {path}.py in sys.path (sys.path contains: {' '.join(sys.path)})" raise ImportError(msg) [path, module] = os.path.split(os.path.splitext(os.path.abspath(path))[0]) if is_do: sys.modules.pop(module, None) if module in sys.modules and \ hasattr((mod:=sys.modules[module]), '__file__') and \ os.path.join(path, module) + '.py' == mod.__file__: pass else: try: sys.path.insert(0, path) mod = __import__(module, globals=globals, fromlist=['*']) sys.modules[module] = mod except ImportError as _i: if is_do: OS_ERROR = str(_i) return None else: raise except Exception as _e: if is_do: EVAL_ERROR = str(_e) return None else: raise finally: sys.path.pop(0) if hasattr(mod, 'VERSION') and version is not None: if isinstance(version, str) and version[0] == 'v': version = version[1:] try: version = float(version) except Exception: version = 0.0 mod_version = None try: mod_version = float(mod.VERSION) except Exception: pass if mod_version is not None and version > mod_version: raise ValueError(f"For import {module}, desired version {version} > actual version {mod_version} at {path}") # globals[module] = mod if fromlist is None: return 1 # use X (); if not isinstance(fromlist, (list, tuple)): fromlist = [fromlist] actual_imports = set() export = () export_ok = () export_tags = dict() for pn in (pathname, builtins.__PACKAGE__): # builtins.__PACKAGE__ is now the module's package, not ours if pn is not None and hasattr(builtins, pn): module_namespace = getattr(builtins, pn) if hasattr(module_namespace, 'EXPORT_a'): export = getattr(module_namespace, 'EXPORT_a') if hasattr(module_namespace, 'EXPORT_OK_a'): export_ok = getattr(module_namespace, 'EXPORT_OK_a') if hasattr(module_namespace, 'EXPORT_TAGS_h'): export_tags = getattr(module_namespace, 'EXPORT_TAGS_h') builtins.__PACKAGE__ = caller_package if (fromlist[0] == '*' or fromlist[0] == ':all') and hasattr(mod, '__all__'): actual_imports = set(mod.__all__) elif fromlist[0] == '*' and not export: for key in mod.__dict__.keys(): if callable(mod.__dict__[key]) and key[0] != '_': actual_imports.add(key) else: # This should mirror the code in pythonizer expand_extras: for desired in fromlist: if (ch:=desired[0]) == '!': if desired == fromlist[0]: actual_imports = set(export) ch2 = desired[1] if ch2 == ':': tag = desired[2:] if tag in export_tags: for e in export_tags[tag][0]: actual_imports.discard(e) elif ch2 == '/': pat = re.compile(desired[2:-1]) for e in (export + export_ok): if re.search(pat, e): actual_imports.discard(e) else: actual_imports.discard(desired[1:]) elif ch == ':': tag = desired[1:] if tag in export_tags: for e in export_tags[tag][0]: actual_imports.add(e) elif tag == 'DEFAULT': actual_imports.update(set(export)) elif ch == '/': pat = re.compile(desired[1:-1]) for e in (export + export_ok): if re.search(pat, e): actual_imports.add(e) elif desired == '*': actual_imports.update(set(export)) elif ch == '-' or not re.match(r'^[A-Za-z_][A-Za-z0-9_]*$', desired): pass else: actual_imports.add(desired) actual_imports = list(actual_imports) sig_map = {'$': '_v', '@': '_a', '%': '_h'} for i in range(len(actual_imports)): perl_name = actual_imports[i] sig = perl_name[0] if sig == '&': perl_name = perl_name[1:] if hasattr(mod, perl_name+'_'): actual_imports[i] = perl_name+'_' elif sig in ('$', '@', '%'): perl_name = perl_name[1:] sm = sig_map[sig] if hasattr(mod, perl_name+sm): actual_imports[i] = perl_name+sm elif hasattr(mod, perl_name+'_'): actual_imports[i] = perl_name+'_' elif hasattr(mod, perl_name+'_'): actual_imports[i] = perl_name+'_' namespace = None if not hasattr(builtins, caller_package): init_package(caller_package) namespace = getattr(builtins, caller_package) for imp in actual_imports: if hasattr(mod, imp): mi = getattr(mod, imp) globals[imp] = mi if namespace: setattr(namespace, imp, mi) return 1
[docs]def init_global(packname, varname, value): """Return the proper value to initialize a package global variable only once""" namespace = getattr(builtins, packname) if hasattr(namespace, varname): return getattr(namespace, varname) setattr(namespace, varname, value) return value
[docs]def init_out_parameters(arglist, *_args): """Initialize sub's out parameters. Pass the arglist of the sub and a list of the sub's out parameters, counting from 0. If no list is passed, then all args are assumed to be out parameters""" if len(_args) == 0: for i in range(len(arglist)): try: setattr(builtins, f"__outp{i}__", arglist[i]) except Exception: pass return for i in _args: try: setattr(builtins, f"__outp{i}__", arglist[i]) except Exception: pass
[docs]def input_line_number(fh, value=None): """Implementation of perl input_line_number""" global INPUT_LINE_NUMBER if value is None: try: return fileinput.lineno() except RuntimeError: return INPUT_LINE_NUMBER else: prev = input_line_number(fh) INPUT_LINE_NUMBER = value return prev
[docs]def int_(expr): """Convert expr to an integer""" if not expr: return 0 if isinstance(expr, int): return +expr # The Unary plus will convert True to 1 try: return int(expr) except Exception: pass if not isinstance(expr, (str, bytes)): if isinstance(expr, complex): return int_(expr.real) return expr if (m:=re.match(r'^\s*([+-]?(?:\d+))', expr)): return int(m.group(1)) if WARNING == 2: die(f"Argument \"{expr}\" isn't numeric in integer context", skip=1) if WARNING: #caller = inspect.getframeinfo(inspect.stack()[1][0]) warn(f"Argument \"{expr}\" isn't numeric in integer context", skip=1) return 0
[docs]def ioctl(fh, func, scalar): global AUTODIE, TRACEBACK, OS_ERROR """Implementation of perl ioctl""" try: result = fc_py.ioctl(fh, func, scalar) if result == 0: return "0 but true" if result == -1: return None return result except Exception as e: OS_ERROR = str(e) if TRACEBACK: cluck(f"ioctl failed: {OS_ERROR}",skip=2) if AUTODIE: raise return None
def _create_all_fh_methods(fh): """Create all special methods for OO filehandles""" methods=dict(autoflush=autoflush,binmode=binmode, close_=close_, eof=eof, fcntl=fcntl, format_write=format_write, getc=getc, getpos=getpos, ioctl=ioctl, input_line_number=input_line_number, open=IOFile_open, print_=print_, printf=printf, say=say, setpos=setpos, # READ is handled specially because of the output scalar: read=read, stat=stat, # SYSREAD needs to be handled like READ sysread=sysread, sysseek=sysseek, syswrite=syswrite, truncate=truncate, ungetc=ungetc, write_=write_, ) for method, func in methods.items(): setattr(fh, method, types.MethodType(func, fh)) fh.getline = fh.readline fh.getlines = fh.readlines return fh
[docs]def IOFile(path=None, mode=None, perms=None): """Implementation of IO::File->new()""" global TRACEBACK, AUTODIE try: if path is None: fh = io.TextIOWrapper(io.BufferedIOBase()) fh.close() return _create_all_fh_methods(fh) if perms is None: perms = 0o777 #fh = os.fdopen(os.open(path, mode, perms)) fh = IOFile_open(path, mode, perms) return _create_all_fh_methods(fh) except Exception as e: if TRACEBACK: if perms is None: if mode is None: cluck(f"IO::File->new({path}) failed: {OS_ERROR}",skip=2) else: cluck(f"IO::File->new({path}, {mode}) failed: {OS_ERROR}",skip=2) else: cluck(f"IO::File->new({path}, {mode}, {perms}) failed: {OS_ERROR}",skip=2) if AUTODIE: raise fh = io.TextIOWrapper(io.BufferedIOBase()) fh.close() return _create_all_fh_methods(fh)
[docs]def IOFile_from_fd(fd, mode): """Implementation of IO::File::new_from_fd()""" global TRACEBACK, AUTODIE try: return fdopen(None, fd, mode) except Exception as e: if TRACEBACK: cluck(f"IO::File::new_from_fd({fd}, {mode}) failed: {OS_ERROR}",skip=2) if AUTODIE: raise return None
def _open_mode_string(mode): if not ((_m:=re.search(r'^\+?(<|>>?)$',mode))): if not (mode:=re.sub(r'^r(\+?)$',r'\g<1><',mode, count=1)): if not (mode:=re.sub(r'^w(\+?)$',r'\g<1>>',mode, count=1)): if not (mode:=re.sub(r'^a(\+?)$',r'\g<1>>>',mode, count=1)): croak(f"IO::Handle: bad open mode: {mode}") return mode
[docs]def IOFile_open(fh, filename, mode=None, perms=None): """Implementation of perl $fh->open method""" if mode is not None: if isinstance(mode, str) and re.match(r'^\d+$', mode): mode = int(mode) if perms is None: perms = 0o666 result = os.fdopen(os.open(filename, mode, perms)) elif ':' in mode: result = open_dynamic(filename, mode, checked=False) else: result = open_dynamic(filename, _open_mode_string(mode), checked=False) else: encoding = errors = None if hasattr(fh, 'encoding'): encoding = fh.encoding errors = fh.errors result = open_dynamic(filename,encoding=encoding,errors=errors) if not fh.closed: fh.close() return _create_all_fh_methods(result)
[docs]def IOFile_tmpfile(): """Implementation of IO::File->new_tmpfile""" fh = tempfile.NamedTemporaryFile() return _create_all_fh_methods(fh)
[docs]def isa(self, classname): """Implementation of UNIVERSAL::isa and $obj->isa and $cls->isa""" if hasattr(self, 'isa'): return self.isa(classname) _ref_map = {"<class 'int'>": 'SCALAR', "<class 'str'>": 'SCALAR', "<class 'float'>": 'SCALAR', "<class 'NoneType'>": 'SCALAR', "<class 'list'>": 'ARRAY', "<class 'tuple'>": 'ARRAY', "<class 'dict'>": 'HASH'} t = str(type(self)) if t in _ref_map: return 1 if _ref_map[t] == classname else '' elif '_ArrayHash' in t: if self.isHash: return 1 if 'HASH' == classname else '' return 1 if 'ARRAY' == classname else '' elif classname == 'IO::Handle': return 1 if isinstance(self, io.IOBase) else '' elif classname == 'UNIVERSAL': return 1 elif classname == 'GLOB': # Assume all file handles and subs are globs return 1 if isinstance(self, io.IOBase) or callable(self) else '' elif classname == 'CODE': return 1 if callable(self) else '' classname = classname.replace("'", '.').replace('::', '.') if hasattr(builtins, classname): the_class = getattr(builtins, classname) if isinstance(the_class, type): # make sure it's a class and not a namespace if isinstance(self, the_class): return 1 if isinstance(self, type) and issubclass(self, the_class): return 1 elif self == the_class: return 1 else: if hasattr(self, 'ISA_a'): for parent_name in getattr(self, 'ISA_a'): parent_name = parent_name.replace("'", '.').replace('::', '.') if hasattr(builtins, parent_name): return isa(getattr(builtins, parent_name), classname) return '' return '' # False
[docs]def isa_op(self, classname): """Implementation of isa operator""" if hasattr(classname, '__name__'): classname = classname.__name__ elif not isinstance(classname, str) and hasattr(classname, '__class__'): classname = classname.__class__.__name__ if hasattr(self, 'isa'): return self.isa(classname) _ref_map = {"<class 'int'>": 'SCALAR', "<class 'str'>": 'SCALAR', "<class 'float'>": 'SCALAR', "<class 'NoneType'>": 'SCALAR', "<class 'list'>": 'ARRAY', "<class 'tuple'>": 'ARRAY', "<class 'dict'>": 'HASH'} if isinstance(self, type): return '' # isa operator needs an object on the LHS, not a class t = str(type(self)) if t in _ref_map: return 1 if _ref_map[t] == classname else '' elif '_ArrayHash' in t: if self.isHash: return 1 if 'HASH' == classname else '' return 1 if 'ARRAY' == classname else '' elif classname == 'IO::Handle' or classname == 'IO.Handle': return 1 if isinstance(self, io.IOBase) else '' elif classname == 'UNIVERSAL': return 1 elif classname == 'GLOB': # Assume all file handles and subs are globs return 1 if isinstance(self, io.IOBase) or callable(self) else '' elif classname == 'CODE': return 1 if callable(self) else '' classname = classname.replace("'", '.').replace('::', '.') if hasattr(builtins, classname): the_class = getattr(builtins, classname) if isinstance(the_class, type): # make sure it's a class and not a namespace if isinstance(self, the_class): return 1 if isinstance(self, type) and issubclass(self, the_class): return 1 elif self == the_class: return '' # Not an object else: if hasattr(self, 'ISA_a'): for parent_name in getattr(self, 'ISA_a'): parent_name = parent_name.replace("'", '.').replace('::', '.') if hasattr(builtins, parent_name): return isa(getattr(builtins, parent_name), classname) return '' return '' # False
[docs]def is_block_special(path): # -b if not path: return '' # False if hasattr(path, '_mode'): return 1 if st_py.S_ISBLK(path._mode) else '' if hasattr(path, 'fileno') and os.stat in os.supports_fd: path = path.fileno() elif hasattr(path, 'name'): path = path.name return 1 if st_py.S_ISBLK(os.stat(path).st_mode) else ''
[docs]def is_char_special(path): # -c if not path: return '' # False if hasattr(path, '_mode'): return 1 if st_py.S_ISCHR(path._mode) else '' if hasattr(path, 'fileno') and os.stat in os.supports_fd: path = path.fileno() elif hasattr(path, 'name'): path = path.name return 1 if st_py.S_ISCHR(os.stat(path).st_mode) else ''
[docs]def is_dir(path): # -d if not path: return '' # False if hasattr(path, '_mode'): return 1 if st_py.S_ISDIR(path._mode) else '' if hasattr(path, 'fileno') and os.stat in os.supports_fd: path = path.fileno() elif hasattr(path, 'name'): path = path.name return 1 if os.path.isdir(path) else ''
[docs]def is_empty_file(path): # -z if not path: return None if hasattr(path, '_size'): return 1 if path._size == 0 else '' if hasattr(path, 'fileno') and os.stat in os.supports_fd: path = path.fileno() elif hasattr(path, 'name'): path = path.name return 1 if not os.path.getsize(path) else ''
[docs]def is_executable(path): # -x if not path: return '' # False if hasattr(path, 'cando'): return 1 if path.cando(st_py.S_IXUSR, 1) else '' if hasattr(path, 'fileno') and os.access in os.supports_fd: path = path.fileno() elif hasattr(path, 'name'): path = path.name return 1 if os.access(path, os.X_OK, effective_ids=(os.access in os.supports_effective_ids)) else ''
[docs]def is_file(path): # -f if not path: return '' # False if hasattr(path, '_mode'): return 1 if st_py.S_ISREG(path._mode) else '' if hasattr(path, 'fileno') and os.stat in os.supports_fd: path = path.fileno() elif hasattr(path, 'name'): path = path.name return 1 if os.path.isfile(path) else ''
[docs]def is_owned(path): # -o if not path: return '' # False if hasattr(path, '_uid'): return 1 if path._uid == os.geteuid() else '' if hasattr(path, 'fileno') and os.stat in os.supports_fd: path = path.fileno() elif hasattr(path, 'name'): path = path.name return 1 if os.stat(path).st_uid == os.geteuid() else ''
[docs]def is_pipe(path): # -p if not path: return '' # False if hasattr(path, '_mode'): return 1 if st_py.S_ISFIFO(path._mode) else '' if hasattr(path, 'fileno') and os.stat in os.supports_fd: path = path.fileno() elif hasattr(path, 'name'): path = path.name return 1 if st_py.S_ISFIFO(os.stat(path).st_mode) else ''
[docs]def is_readable(path): # -r if not path: return '' # False if hasattr(path, 'cando'): return 1 if path.cando(st_py.S_IRUSR, 1) else '' if hasattr(path, 'fileno') and os.access in os.supports_fd: path = path.fileno() elif hasattr(path, 'name'): path = path.name return 1 if os.access(path, os.R_OK, effective_ids=(os.access in os.supports_effective_ids)) else ''
[docs]def is_real_executable(path): # -X if not path: return '' # False if hasattr(path, 'cando'): return 1 if path.cando(st_py.S_IXUSR, 0) else '' if hasattr(path, 'fileno') and os.access in os.supports_fd: path = path.fileno() elif hasattr(path, 'name'): path = path.name return 1 if os.access(path, os.X_OK) else ''
[docs]def is_real_owned(path): # -O if not path: return '' # False if hasattr(path, '_uid'): return 1 if path._uid == os.getuid() else '' if hasattr(path, 'fileno') and os.stat in os.supports_fd: path = path.fileno() elif hasattr(path, 'name'): path = path.name return 1 if os.stat(path).st_uid == os.getuid() else ''
[docs]def is_real_readable(path): # -R if not path: return '' # False if hasattr(path, 'cando'): return 1 if path.cando(st_py.S_IRUSR, 0) else '' if hasattr(path, 'fileno') and os.access in os.supports_fd: path = path.fileno() elif hasattr(path, 'name'): path = path.name return 1 if os.access(path, os.R_OK) else ''
[docs]def is_real_writable(path): if hasattr(path, 'cando'): return 1 if path.cando(st_py.S_IRUSR, 0) else '' if hasattr(path, 'fileno') and os.access in os.supports_fd: path = path.fileno() elif hasattr(path, 'name'): path = path.name return 1 if os.access(path, os.W_OK) else ''
[docs]def is_socket(path): # -S if not path: return '' # False if hasattr(path, '_mode'): return 1 if st_py.S_ISSOCK(path._mode) else '' if hasattr(path, 'fileno') and os.stat in os.supports_fd: path = path.fileno() elif hasattr(path, 'name'): path = path.name return 1 if st_py.S_ISSOCK(os.stat(path).st_mode) else ''
[docs]def is_tty(path): # -t if not path: return '' # False if hasattr(path, 'isatty'): return 1 if path.isatty() else '' if isinstance(path, tuple): raise ValueError('-t not supported on File_stat') if hasattr(path, 'name'): path = path.name try: with open(path, 'r') as t: return 1 if t.isatty() else '' except Exception: return '' # False
[docs]def is_utf8(s, check=False): """Implementation of Encode::is_utf8""" if check: return utf8_is_utf8(s) try: s = str(s) if s.isascii(): return '' s.encode() except Exception: return '' return 1
[docs]def is_writable(path): # -w if not path: return '' # False if hasattr(path, 'cando'): return 1 if path.cando(st_py.S_IWUSR, 1) else '' if hasattr(path, 'fileno') and os.access in os.supports_fd: path = path.fileno() elif hasattr(path, 'name'): path = path.name return 1 if os.access(path, os.W_OK, effective_ids=(os.access in os.supports_effective_ids)) else ''
[docs]def kill(sig, *args): """Implementation of perl kill function""" global AUTODIE, TRACEBACK, OS_ERROR if isinstance(sig, str): neg = 1 if sig.startswith('-'): neg = -1 if not sig.startswith('SIG'): sig = f"SIG{sig}" if not sig in signal.Signals.__members__: carp(f'Unrecognized signal name "{sig}"') return 0 sig = signal.Signals.__members__[sig] * neg result = 0 for pid in args: try: os.kill(pid, sig) result += 1 except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: cluck(f"kill({sig}, {pid}) failed: {OS_ERROR}", skip=2) if AUTODIE: raise return result
[docs]def lcfirst(string): """Implementation of lcfirst and \l in interpolated strings: lowercase the first char of the given string""" return string[0:1].lower() + string[1:]
[docs]def list_of_at_least_n(lst, n): """For assignment to (list, ..., *last) - make this list at least the right size.""" if lst is None or (hasattr(lst, 'isHash') and lst.isHash) or not (isinstance(lst, collections.abc.Sequence) and not isinstance(lst, str)): lst = [lst] la = len(lst) if la >= n: return lst return list(lst) + [None for _ in range(n-la)]
[docs]def list_of_n(lst, n): """For assignment to (list, ...) - make this list the right size""" if isinstance(lst, itertools.chain): lst = list(lst) if lst is None or (hasattr(lst, 'isHash') and lst.isHash) or not (isinstance(lst, collections.abc.Sequence) and not isinstance(lst, str)): lst = [lst] la = len(lst) if la == n: return lst if la > n: return lst[:n] return list(lst) + [None for _ in range(n-la)]
[docs]def list_to_hash(lst): """Convert a flat list of key value pairs to a hash""" return {lst[i]: lst[i+1] for i in range(0, len(lst), 2)};
[docs]def localtime(secs=None): """Replacement for perl built-in localtime function""" try: lct = tm_py.localtime(secs) except Exception: try: import datetime dt = datetime.datetime.fromtimestamp(0) + datetime.timedelta(seconds=secs) lct = dt.timetuple() except Exception: return (9, 9, 9, 9, 9, 99999, 0, 9, 0) return (lct.tm_sec, lct.tm_min, lct.tm_hour, lct.tm_mday, lct.tm_mon-1, lct.tm_year-1900, (lct.tm_wday+1)%7, lct.tm_yday-1, lct.tm_isdst)
[docs]def logical_xor(a, b): """Implementation of perl's xor operator""" return 1 if (a or b) and not (a and b) else ''
[docs]def longmess(*args, skip=0): """Message with stack backtrace""" def ff(fn): fn = os.path.relpath(fn) if fn.startswith('./'): return fn[2:] return fn def fa(a): result = re.sub(r'^\(\*_args=(.*)\)$', r'\1',a).replace(',)', ')') if result == '[]': return '()' return result stack = inspect.stack() stack = stack[skip:] m = ''.join(map(str, args)) m += ' at ' + ff(stack[1].filename) + ' line ' + str(stack[1].lineno) + ".\n" for i in range(1, len(stack)-1): s = stack[i] s2 = stack[i+1] m += ' ' + s.function+fa(inspect.formatargvalues(*inspect.getargvalues(s.frame))) + ' called at ' + ff(s2.filename) + ' line ' + str(s2.lineno) + "\n" return m
[docs]def looks_like_binary(path): # -B """Implementation of perl -B""" if isinstance(path, tuple): return ValueError('-B not supported on File_stat') return 1 if not looks_like_text(path) else ''
[docs]def looks_like_text(path): # -T """Implementation of perl -T""" global TRACE_RUN if not isinstance(path, str): return ValueError('-T is only supported on paths') rtn = subprocess.run(f'file "{path}"',capture_output=True,text=True,shell=(os.name!='nt')) if TRACE_RUN: carp(f'trace -T {path}: {repr(rtn)}', skip=2) if rtn.returncode: return None rtn = rtn.stdout return 1 if 'text' in rtn else ''
[docs]def lstat(path): """Handle lstat call with or without "use File::stat;" """ if isinstance(path, File_stat): return path # for '_' special variable try: if hasattr(path, 'fileno') and os.lstat in os.supports_fd: path = path.fileno() elif hasattr(path, 'name'): path = path.name s = os.lstat(path) except Exception: return () result = File_stat(_dev=s.st_dev, _ino=s.st_ino, _mode=s.st_mode, _nlink=s.st_nlink, _uid=s.st_uid, _gid=s.st_gid, _rdev=s.st_rdev if hasattr(s, 'st_rdev') else 0, _size=s.st_size, _atime=s.st_atime, _mtime=s.st_mtime, _ctime=s.st_ctime, _blksize=s.st_blksize if hasattr(s, 'st_blksize') else 512, _blocks=s.st_blocks if hasattr(s, 'st_blocks') else s.st_size // 512) return result
[docs]def maketrans_c(arg1, arg2, delete=False): """Make a complement tr table for the 'c' flag. If the 'd' flag is passed, then delete=True. Ranges are expanded in arg1 and arg2 but arg2 is not otherwise normalized""" t = str.maketrans(arg1, arg1) d = dict() for i in range(257): if i not in t: if not arg2: if delete: d[i] = None else: d[i] = i elif i < len(arg2): d[i] = arg2[i] elif delete: d[i] = None else: d[i] = arg2[-1] return str.maketrans(d)
[docs]def make_list(*args): """For push/unshift @arr, expr; We use extend/[0:0] so make sure expr is iterable""" if len(args) == 1 and isinstance(args[0], collections.abc.Iterable) and not isinstance(args[0], str) and ( not hasattr(args[0], 'isHash') or not args[0].isHash): return args[0] return args
[docs]def map_int(*args): """Convert each element to an int""" return list(map(int_, flatten(args)))
[docs]def map_num(*args): """Convert each element to a num""" return list(map(num, flatten(args)))
[docs]def map_str(*args): """Convert each element to a str""" return list(map(_str, flatten(args)))
[docs]def method_call(cls_or_obj, methodname, *args, **kwargs): """Call a method by name in a class that can also be specified by name""" try: if methodname in _PYTHONIZER_KEYWORDS: methodname += '_' method = getattr(cls_or_obj, methodname) if hasattr(method, '__func__'): method = method.__func__ return method(cls_or_obj, *args, **kwargs) except AttributeError: if isinstance(cls_or_obj, str): cls_or_obj = cls_or_obj.replace('::', '.') if cls_or_obj in _PYTHONIZER_KEYWORDS: cls_or_obj += '_' if hasattr(builtins, cls_or_obj): cls_or_obj = getattr(builtins, cls_or_obj) method = getattr(cls_or_obj, methodname) if hasattr(method, '__func__'): method = method.__func__ return method(cls_or_obj, *args, **kwargs) except TypeError: if callable(methodname): method = methodname if isinstance(cls_or_obj, str): cls_or_obj = cls_or_obj.replace('::', '.') if cls_or_obj in _PYTHONIZER_KEYWORDS: cls_or_obj += '_' if hasattr(method, '__func__'): method = method.__func__ return method(cls_or_obj, *args, **kwargs) cluck(f"Can't locate object method \"{methodname}\" via package \"{_str(cls_or_obj)}\"", skip=2)
[docs]def mkdir(path, mode=0o777): global TRACEBACK, AUTODIE, OS_ERROR """Implementation of perl mkdir function""" try: os.mkdir(path, mode) return 1 except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: if mode == 0o777: cluck(f"mkdir({path}) failed: {OS_ERROR}",skip=2) else: cluck(f"mkdir({path}, 0o{mode:o}) failed: {OS_ERROR}",skip=2) if AUTODIE: raise return ''
[docs]def mkdtemp(template): """Implementation of File::Temp::mkdtemp()""" template = template.replace('X', '') (base, dirn, tail) = fileparse(template) return tempfile.mkdtemp(prefix=base, dir=dirn)
[docs]def mkstemp(template): """Implementation of File::Temp::mkstemp()""" template = template.replace('X', '') (base, dirn, tail) = fileparse(template) fh = tempfile.NamedTemporaryFile(prefix=base, dir=dirn, delete=False) return (fh, fh.name)
[docs]def mkstemps(template, suffix): """Implementation of File::Temp::mkstemps()""" template = template.replace('X', '') (base, dirn, tail) = fileparse(template) fh = tempfile.NamedTemporaryFile(prefix=base, dir=dirn, suffix=suffix, delete=False) return(fh, fh.name)
[docs]def mktemp(template): """Implementation of File::Temp::mktemp()""" template = template.replace('X', '') (base, dirn, tail) = fileparse(template) ntf = tempfile.NamedTemporaryFile(prefix=base, dir=dirn, delete=False) result = ntf.name ntf.close() return result
[docs]def mod_element(base, index, value): base[index] %= value return base[index]
[docs]def multiply_element(base, index, value): base[index] *= value return base[index]
[docs]def need_sh(cmd): """Does this command need a shell to run it?""" if os.name == 'nt': # windows if isinstance(cmd, (tuple, list)): for e in cmd: if need_sh(e): return True return False if re.search(r'[<>|&*]', cmd) or re.match(r'(?:copy|echo|dir|type|cd) ', cmd): return True return False return True
[docs]def nr(): """Get the current INPUT_LINE_NUMBER""" global INPUT_LINE_NUMBER try: return fileinput.lineno() except RuntimeError: return INPUT_LINE_NUMBER
[docs]def num(expr): """Convert expr to a number Ref: https://squareperl.com/en/how-perl-convert-string-to-number""" if expr is None: return 0 try: return +expr # Unary plus: The fastest way to test for numeric except Exception: pass #if isinstance(expr, (int, float)): #return expr #try: #return int(expr) #except Exception: #pass for _ in range(2): try: f = float(expr) if f.is_integer(): return int(f) return f except Exception: pass if isinstance(expr, str): if not (m:=re.match(r'^\s*([+-]?(?:\d+(?:[.]\d*)?(?:[eE][+-]?\d+)?|[.]\d+(?:[eE][+-]?\d+)?))', expr)): break expr = m.group(1); elif isinstance(expr, bytes): if not (m:=re.match(br'^\s*([+-]?(?:\d+(?:[.]\d*)?(?:[eE][+-]?\d+)?|[.]\d+(?:[eE][+-]?\d+)?))', expr)): break expr = m.group(1); elif hasattr(expr, 'isHash') and expr.isHash is None: return 0 elif isinstance(expr, object) and hasattr(expr, '__class__') and isinstance(expr.__class__, type): # a perl object if hasattr(expr, '_num_') and callable(expr._num_): return expr._num_() # use overload "0+" # Breaks Math::Complex operations! return id(expr) # Objects in == are compared by address return expr else: return expr if WARNING == 2: die(f"Argument \"{expr}\" isn't numeric in numeric context", skip=1) if WARNING: # caller = inspect.getframeinfo(inspect.stack()[1][0]) # warnings.warn(f"Argument \"{expr}\" isn't numeric in numeric context at {caller.filename}:{caller.lineno}") warn(f"Argument \"{expr}\" isn't numeric in numeric context", skip=1) return 0
def _create_fh_methods(fh): """Create special methods for filehandles""" try: fh.autoflush = types.MethodType(autoflush, fh) except NameError: # _autoflush is only brought in if we reference it pass return fh
[docs]def open_(file,mode,encoding=None,errors=None,checked=True,newline="\n"): """Replacement for perl built-in open function when the mode is known.""" global OS_ERROR, TRACEBACK, AUTODIE try: (mode, encoding, errors, newline) = handle_open_pragma(mode, encoding, errors, newline) except NameError: pass if 'b' not in mode and encoding is None: encoding = 'latin1' if errors is None: errors = 'ignore' file = str(file) # handle numeric filename try: if mode == '|-' or mode == '|-b': # pipe to text = True if mode == '|-' else False sp = subprocess.Popen(file, stdin=subprocess.PIPE, shell=need_sh(file), text=text, encoding=encoding, errors=errors) if sp.returncode: raise Die(f"open(|{file}): failed with {sp.returncode}") sp.stdin._sp = sp # issue 72 sp.stdin._file = f"|{file}" # issue 72 return sp.stdin elif mode == '-|' or mode == '-|b': # pipe from text = True if mode == '-|' else False sp = subprocess.Popen(file, stdout=subprocess.PIPE, shell=need_sh(file), text=text, encoding=encoding, errors=errors) if sp.returncode: raise Die(f"open({file}|): failed with {sp.returncode}") sp.stdout._sp = sp # issue 72 sp.stdout._file = f"|{file}" # issue 72 return sp.stdout if file is None: return tempfile.TemporaryFile(mode=mode, encoding=encoding) if os.name == 'nt' and file.startswith('/tmp/'): file = tempfile.gettempdir() + file[4:] if 'b' in mode: newline = None file = file.rstrip("\n\r") return _create_fh_methods(open(file,mode,encoding=encoding,errors=errors,newline=newline)) except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: cluck(f"open({file}, {mode}) failed: {OS_ERROR}",skip=2) if AUTODIE: raise if checked: # e.g. used in if(...) return None fh = io.TextIOWrapper(io.BufferedIOBase()) fh.close() return _create_fh_methods(fh)
[docs]def opendir(DIR): """Implementation of perl opendir""" global OS_ERROR, TRACEBACK, AUTODIE class DirHandle(list): pass try: result = DirHandle([list(os.listdir(DIR)), 0]) result.name = DIR # for stat and friends return result except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: cluck(f"opendir({DIR}) failed: {OS_ERROR}",skip=2) if AUTODIE: raise return None
[docs]def openhandle(fh): """Return the file handle if this is an opened file handle, else return None""" if hasattr(fh, 'closed'): if not fh.closed: return fh return None
[docs]def open_dynamic(file,mode=None,encoding=None,errors=None,checked=True): """Replacement for perl built-in open function when the mode is unknown.""" dup_it = None pipe = None if mode is None: m = re.match(r'^\s*([<>+|-]*)([&]?=?)\s*(.*?)\s*([|]?)\s*$', file) mode = m.group(1) dup_it = m.group(2) file = m.group(3) pipe = m.group(4) elif '&' in mode: # dup dup_it = '&' mode = mode.replace('&', '') if '=' in mode: dup_it = '&=' mode = mode.replace('=', '') if mode == '<-' or mode == '-' or mode == '-<': return sys.stdin if mode == '>-' or mode == '->': return sys.stdout ext = None if ':' in mode: mode, ext = mode.split(':') if mode in _OPEN_MODE_MAP: mode = _OPEN_MODE_MAP[mode] if ext: if ext == 'raw' or ext == 'bytes': mode += 'b' elif ext.startswith('encoding('): encoding = ext.replace('encoding(','').replace(')','') errors = 'replace' elif ext == 'utf8': encoding = 'UTF-8' errors = 'ignore' if dup_it: if '=' in dup_it: return dup(file, mode,encoding=encoding,errors=errors,checked=checked,equals=True) return dup(file, mode,encoding=encoding,errors=errors,checked=checked) return open_(file, mode,encoding=encoding,errors=errors,checked=checked) if pipe: return open_(file, '-|',encoding=encoding,errors=errors,checked=checked) return open_(file, 'r',encoding=encoding,errors=errors,checked=checked)
[docs]def or_element(base, index, value): base[index] |= value return base[index]
[docs]def os_name(): """Implementation of $OSNAME / $^O""" result = sys.platform return 'MSWin32' if result == 'win32' else result
[docs]def overload_Method(obj, op): """Given an object and an operation string, return a reference to the code if it's overloaded, else return None""" key = f"({op}" if hasattr(obj, key): return getattr(obj, key) return None
[docs]def overload_Overloaded(obj): """Given an object, return 1 if it has any overloads defined, else return ''""" for a in dir(obj): if a.startswith('('): # special attribute for overloaded method return 1 return ''
[docs]def overload_StrVal(obj): """Implementation of overload::StrVal($obj)""" cls = ref_scalar(obj) if not cls: return obj if cls == 'ARRAY' or cls == 'HASH' or cls == 'CODE': return f"{cls}(0x{id(obj):x})" cls_type = 'HASH' if hasattr(obj, 'isHash') and not obj.isHash: cls_type = 'ARRAY' return f"{cls}={cls_type}(0x{id(obj):x})"
_PACK_TO_STRUCT = dict(a='s', c='b', C='B', s='h', S='H', l='l', L='L', q='q', Q='Q', i='i', I='I', n='!H', N='!L', v='<H', V='<L', j='i', J='I', f='f', d='d', F='d', x='x') _TEMPLATE_LENGTH = dict(a=1, c=1, C=1, s=2, S=2, l=4, L=4, q=8, Q=8, i=4, I=4, n=2, N=4, v=2, V=4, j=4, J=4, f=4, d=8, F=8, x=1) _decoding_map = codecs.make_identity_dict(range(256)) _encoding_map = codecs.make_encoding_map(_decoding_map) def _str_to_bytes(by): return codecs.charmap_encode(by, 'ignore', _encoding_map)[0] def _bytes_to_str(by): return codecs.charmap_decode(by, 'ignore', _decoding_map)[0] def _get_pack_unpack_format_and_counts(template, args, is_unpack=False): # FIXME: Handle more cases using a custom translator. format_and_counts = [] prefix = '' format = '' i = 0 ndx = 0 typ = 'unpack' if is_unpack else 'pack' len_so_far = 0 prev = 0 while i < len(template): if template[i].isspace(): i += 1 continue if template[i] in _PACK_TO_STRUCT: fmt = _PACK_TO_STRUCT[template[i]] else: raise Die(f'{typ} format {template[i]} is not currently supported') mod = '' cnt = 1 if (_m:=re.match(r'^([!<>]?)((?:(?:\[?(?:(?:\d+)|[*]))\]?)|(?:\[[A-Za-z]\]))?', template[i+1:])): i += _m.end() mod = _m.group(1) if mod is None: mod = '' cnt = _m.group(2) if not cnt: cnt = '1' elif cnt[0] == '[': cnt = cnt[1:-1] if cnt.isdigit(): cnt = int(cnt) elif cnt in _TEMPLATE_LENGTH: cnt = _TEMPLATE_LENGTH[cnt] elif cnt != '*': raise Die(f'{typ} cannot get length of {cnt} template') if mod == '!': if len(fmt) != 1: fmt = fmt.lower() mod = fmt[0] fmt = fmt[1] else: mod = '@' # Native elif len(fmt) != 1: mod = fmt[0] fmt = fmt[1] if cnt == '*': if is_unpack and hasattr(args[0], '__len__'): cnt = len(args[0]) - len_so_far - (struct.calcsize(format) if format else 0) if ndx < len(args) and hasattr(args[ndx], '__len__'): cnt = len(args[ndx]) else: cnt = 1 fmt = f"{cnt}{fmt}" elif len(fmt) != 1: mod = fmt[0] fmt = fmt[1] fmt_code = fmt[-1] if mod == prefix: format += fmt mod = '' fmt = '' elif mod and not prefix: prefix = mod format += fmt mod = '' fmt = '' else: format = f'{prefix}{format}' len_so_far += struct.calcsize(format) format_and_counts.append((format, prev, ndx)) prefix = '' format = '' prev = ndx if fmt_code == 's': if not is_unpack and isinstance(args[ndx], str): args[ndx] = _str_to_bytes(args[ndx]) ndx += 1 else: ndx += cnt i += 1 format = prefix + mod + format + fmt if format: format_and_counts.append((format, prev, ndx)) return format_and_counts
[docs]def pack(template, *args): """pack items into a str via a given format template""" # Look here to handle many cases: https://docs.python.org/3/library/struct.html args = list(args) result = '' format_and_counts = _get_pack_unpack_format_and_counts(template, args) for format, start, end in format_and_counts: result += _bytes_to_str(struct.pack(format, *args[start:end])) return result
[docs]def package_call(package, function, *args, **kwargs): """Call a function in a different package""" cur_package = builtins.__PACKAGE__ try: builtins.__PACKAGE__ = package.__PACKAGE__ return function(*args, **kwargs) finally: builtins.__PACKAGE = cur_package
[docs]def perlio_ok(encoding): """Implementation of Encoding::perl_io_ok - returns 1""" return 1
[docs]def perl_print(*args, **kwargs): """Replacement for perl built-in print/say/warn functions. Note that by default this acts like 'say' in that it appends a newline. To prevent the newline, pass the end='' keyword argument. To write to a different file, pass the file=... keyword argument. To flush the output after writing, pass flush=True. To replace the OUTPUT_FIELD_SEPARATOR, pass sep='...'. It returns 1 if successful""" global OS_ERROR, TRACEBACK, AUTODIE try: file = sys.stdout if 'file' in kwargs: file = kwargs['file'] if file is None: raise Die('print() on unopened filehandle') if 'sep' not in kwargs: kwargs['sep'] = OUTPUT_FIELD_SEPARATOR if 'end' in kwargs: kwargs['end'] += OUTPUT_RECORD_SEPARATOR else: kwargs['end'] = "\n" + OUTPUT_RECORD_SEPARATOR if 'flush' not in kwargs and hasattr(file, '_autoflush'): kwargs['flush'] = file._autoflush try: print(*args, **kwargs) except TypeError as _t: if 'bytes-like' in str(_t): for k in ('sep', 'end'): if k in kwargs: kwargs[k] = bytes(kwargs[k], encoding="latin1", errors="ignore") for i in range(len(args)): a = args[i] file.write(bytes(a, encoding="latin1", errors="ignore")) if i == len(args)-1: if 'end' in kwargs: file.write(kwargs['end']) elif 'sep' in kwargs: file.write(kwargs['sep']) if 'flush' in kwargs and kwargs['flush'] and hasattr(file, 'flush'): file.flush() return 1 # True except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: cluck(f"print failed: {OS_ERROR}",skip=2) if AUTODIE: raise return '' # False
[docs]def postprocess_arguments(parser, parser_rem): """After argument parsing, see if we have any leftover arguments and flag those as errors""" errors = '' for arg in parser_rem: if len(arg) != 0 and arg[0] == '-': errors += f"Unknown option: {re.sub(r'^-*', '', arg)}\n" if errors: if parser.exit_on_error: print(errors, file=sys.stderr, end='') sys.exit(1) raise argparse.ArgumentError(None, errors)
init_package('Getopt.Long')
[docs]def preprocess_arguments(): """Pre-process the command line arguments, changing -option to --option""" for i in range(1, len(sys.argv)): if len(sys.argv[i]) > 2 and sys.argv[i][0] == '-' and sys.argv[i][1] != '-': sys.argv[i] = '-' + sys.argv[i]
[docs]def printf(fh, fmt, *args): """Implementation of perl $fh->printf method""" global OS_ERROR, TRACEBACK, AUTODIE try: print(format_(fmt, *args), end='', file=fh) return 1 # True except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: if isinstance(fmt, str): fmt = fmt.replace("\n", '\\n') cluck(f"printf({fmt},...) failed: {OS_ERROR}",skip=2) if AUTODIE: raise return '' # False
[docs]def quotemeta(string): """Implementation of perl quotemeta - all chars not matching /[A-Za-z_0-9]/ will be preceded by a backslash""" return re.sub(r'([^A-Za-z_0-9])', r'\\\g<1>', string, count=0)
[docs]def raise_(exception): """To raise an exception in a lambda function or expression""" raise exception
[docs]def rand(expr=0): """Implementation of perl rand function""" if expr == 0: expr = 1 return random.random() * expr
[docs]def range_(var, pat1, flags1, pat2, flags2, key): """The line-range operator. See https://perldoc.perl.org/perlop#Range-Operators""" if not hasattr(range_, key): setattr(range_, key, 0) seq = getattr(range_, key) if isinstance(seq, str): # e.g. nnE0 setattr(range_, key, 0) return '' # False if seq == 0: # Waiting for left to become True if isinstance(pat1, str): val = re.search(pat1, var, flags=flags1) else: val = bool(pat1) if not val: return '' # False seq += 1 # once left becomes True, then the seq starts counting, and we check right setattr(range_, key, seq) if isinstance(pat2, str): val = re.search(pat2, var, flags=flags2) else: val = bool(pat2) if val: seq = str(seq)+'E0' # end marker setattr(range_, key, seq) return seq
[docs]def read(fh, var, length, offset=0, need_len=False): """Read length bytes from the fh, and return the result to store in var if need_len is False, else return a tuple with the result and the length read""" global OS_ERROR, TRACEBACK, AUTODIE if var is None: var = '' try: s = fh.read(length) if isinstance(s, bytes): s = str(s, encoding='latin1', errors='ignore') except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: cluck(f"read of {length} byte(s) failed: {OS_ERROR}",skip=2) if AUTODIE: raise if need_len: return (var, None) return var ls = len(s) var = _str(var) lv = len(var) if offset < 0: offset += lv if offset: if isinstance(var, bytes): var = var.decode(encoding='latin1', errors='ignore') if need_len: return (var[:offset] + ('\0' * (offset-lv)) + s, ls) else: return var[:offset] + ('\0' * (offset-lv)) + s if need_len: return (s, ls) return s
[docs]def readdir(DIR): """Implementation of perl readdir in scalar context""" try: result = (DIR[0])[DIR[1]] DIR[1] += 1 return result except IndexError: return None
[docs]def readdirs(DIR): """Implementation of perl readdir in list context""" result = (DIR[0])[DIR[1]:] DIR[1] = len(DIR[0]) return result
[docs]def readline(fh): """Reads a line from a file. (instead use _readline_full if you need support for perl $/ or $.)""" result = fh.readline() if not result: return None return result
[docs]def readline_full(fh, fhname=''): """Reads a line from a file, handles perl $/ and sets $. """ global INPUT_RECORD_SEPARATOR, INPUT_LINE_NUMBER global _INPUT_FH_NAME if INPUT_RECORD_SEPARATOR == "\n": result = fh.readline() if not result: return None elif INPUT_RECORD_SEPARATOR is None: result = fh.read() else: if not hasattr(fh, '_data'): fh._data = fh.read() fh._pos = 0 irs = INPUT_RECORD_SEPARATOR if irs == '': # paragraph mode pos = fh._pos while(fh._data[pos] == "\n"): pos += 1 fh._pos = pos irs = "\n\n" pos = fh._pos ndx = fh._data.index(irs, pos) if ndx < 0: fh._pos = len(fh._data) fh._pos = ndx + len(irs) result = fh._data[pos:fh._pos] if not result: if hasattr(fh, '_data'): del fh._data if hasattr(fh, '_at_eof') and fh._at_eof: return None else: fh._at_eof = True else: fh._at_eof = False if not hasattr(fh, '_lno'): INPUT_LINE_NUMBER = fh._lno = 1 _INPUT_FH_NAME = fhname else: fh._lno += 1 INPUT_LINE_NUMBER = fh._lno return result
[docs]def ref(r): """ref function in perl - called when NOT followed by a backslash""" _ref_map = {"<class 'int'>": 'SCALAR', "<class 'str'>": 'SCALAR', "<class 'float'>": 'SCALAR', "<class 'NoneType'>": 'SCALAR', "<class 'list'>": 'ARRAY', "<class 'tuple'>": 'ARRAY', "<class 'function'>": 'CODE', "<class 'dict'>": 'HASH'} tr = type(r) t = str(tr) if t in _ref_map: return '' elif '_ArrayHash' in t: return '' if isinstance(r, type): # return '' for a class (not a class instance) return '' if hasattr(tr, '__name__'): return tr.__name__.replace('.', '::') return t.replace("<class '", '').replace("'>", '').replace('.', '::')
[docs]def refs(r): """ref function in perl - called when followed by a backslash""" _ref_map = {"<class 'int'>": 'SCALAR', "<class 'str'>": 'SCALAR', "<class 'float'>": 'SCALAR', "<class 'NoneType'>": 'SCALAR', "<class 'list'>": 'ARRAY', "<class 'tuple'>": 'ARRAY', "<class 'function'>": 'CODE', "<class 'dict'>": 'HASH'} t = str(type(r)) if t in _ref_map: return _ref_map[t] elif '_ArrayHash' in t: if r.isHash: return 'HASH' return 'ARRAY' elif hasattr(r, 'TIEARRAY'): return 'ARRAY' elif hasattr(r, 'TIEHASH'): return 'HASH' return ''
[docs]def ref_scalar(r): """ref function in perl - called when being passed a scalar without a backslash""" _ref_map = {"<class 'int'>": '', "<class 'str'>": '', "<class 'float'>": '', "<class 'NoneType'>": '', "<class 'list'>": 'ARRAY', "<class 'tuple'>": 'ARRAY', "<class 'function'>": 'CODE', "<class 'dict'>": 'HASH'} tr = type(r) t = str(tr) if t in _ref_map: return _ref_map[t] elif '_ArrayHash' in t: if r.isHash: return 'HASH' return 'ARRAY' if isinstance(r, type): # return '' for a class (not a class instance) return '' if hasattr(tr, '__name__'): return tr.__name__.replace('.', '::') elif hasattr(r, 'TIEARRAY'): return 'ARRAY' elif hasattr(r, 'TIEHASH'): return 'HASH' return t.replace("<class '", '').replace("'>", '').replace('.', '::')
[docs]def reset_each(h_a): """Reset the 'each' iterator on keys/values calls""" key = str(id(h_a)) # Unique memory address of object if hasattr(each, key): delattr(each, key)
[docs]def resolve_alias(encoding): """Implementation of Encode::resolve_alias""" try: info = codecs.lookup(encoding) return info.name except LookupError: return None
[docs]def reverse_scalar(expr): """reverse function implementation in scalar context""" if expr is None: return '' if hasattr(expr, 'isHash'): if expr.isHash: expr = [_item for _k in expr for _item in (_k, expr[_k])] else: return ''.join(expr)[::-1] elif isinstance(expr, collections.abc.Mapping): # flatten hash (dict) expr = [_item for _k in expr for _item in (_k, expr[_k])] if isinstance(expr, collections.abc.Iterable) and not isinstance(expr, str): return ''.join(expr)[::-1] return expr[::-1]
[docs]def rewinddir(DIR): DIR[1] = 0
[docs]def rmdir(d): """Implementation of perl rmdir""" global AUTODIE, TRACEBACK, OS_ERROR try: os.rmdir(d) return 1 except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: cluck(OS_ERROR,skip=2) if AUTODIE: raise return ''
[docs]def run(*args): """Execute a command and return the stdout in list context""" global CHILD_ERROR, AUTODIE, TRACEBACK, INPUT_RECORD_SEPARATOR, TRACE_RUN if len(args) == 1: args = args[0] try: if os.name == 'nt': if isinstance(args, str): args = [args] arg_split = args[0].split()[0] if arg_split.endswith('.py'): args = [sys.executable] + args elif arg_split.endswith('.pl'): args = ['perl'] + args newargs = [] for arg in args: if '"' in arg or "'" in arg: newargs.append(arg) else: arg_split = arg.split() newargs.extend(arg_split) args = newargs if len(args) == 1: args = args[0] sp = subprocess.run(args,stdin=sys.stdin,capture_output=True,text=True,shell=need_sh(args)) except FileNotFoundError: # can happen on windows if shell=False sp = subprocess.CompletedProcess(args, -1) if TRACE_RUN: carp(f'trace run({args}): {repr(sp)}', skip=2) CHILD_ERROR = -1 if sp.returncode == -1 else ((sp.returncode<<8) if sp.returncode >= 0 else -sp.returncode) if CHILD_ERROR: if AUTODIE: raise Die(f'run({args}): failed with rc {CHILD_ERROR}') if TRACEBACK: cluck(f'run({args}): failed with rc {CHILD_ERROR}',skip=2) if INPUT_RECORD_SEPARATOR is None: return sp.stdout irs = INPUT_RECORD_SEPARATOR pos = 0 if irs == '': # paragraph mode while(sp.stdout[pos] == "\n"): pos += 1; irs = "\n\n" arr = sp.stdout[pos:].split(irs) if arr[-1] == '': arr = arr[:-1] return [line + irs for line in arr]
[docs]def run_s(*args): """Execute a command and return the stdout in scalar context""" global CHILD_ERROR, AUTODIE, TRACEBACK, TRACE_RUN if len(args) == 1: args = args[0] try: if os.name == 'nt': if isinstance(args, str): args = [args] arg_split = args[0].split()[0] if arg_split.endswith('.py'): args = [sys.executable] + args elif arg_split.endswith('.pl'): args = ['perl'] + args newargs = [] for arg in args: if '"' in arg or "'" in arg: newargs.append(arg) else: arg_split = arg.split() newargs.extend(arg_split) args = newargs if len(args) == 1: args = args[0] sp = subprocess.run(args,stdin=sys.stdin,capture_output=True,text=True,shell=need_sh(args)) except FileNotFoundError: # can happen on windows if shell=False sp = subprocess.CompletedProcess(args, -1) if TRACE_RUN: carp(f'trace run({args}): {repr(sp)}', skip=2) CHILD_ERROR = -1 if sp.returncode == -1 else ((sp.returncode<<8) if sp.returncode >= 0 else -sp.returncode) if CHILD_ERROR: if AUTODIE: raise Die(f'run({args}): failed with rc {CHILD_ERROR}') if TRACEBACK: cluck(f'run({args}): failed with rc {CHILD_ERROR}',skip=2) return sp.stdout
[docs]def say(fh, *args): """Implementation of perl $fh->say method""" global OS_ERROR, TRACEBACK, AUTODIE try: print(*args, file=fh) return 1 # True except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: cluck(f"say failed: {OS_ERROR}",skip=2) if AUTODIE: raise return '' # False
[docs]def seek(fh, pos, whence): """Implementation of perl seek""" global OS_ERROR, TRACEBACK, AUTODIE try: return fh.seek(pos, whence) return 1 # True except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: cluck(f"seek({pos},{whence}) failed: {OS_ERROR}",skip=2) if AUTODIE: raise return None
[docs]def seekdir(DIR, pos): DIR[1] = pos
[docs]def select(fh): """Implementation of perl select function""" result = sys.stdout sys.stdout = fh return result
[docs]def setpos(fh, off): """Implementation of perl $fh->setpos method""" global OS_ERROR, TRACEBACK, AUTODIE try: fh.seek(off, os.SEEK_SET) return 1 # True except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: cluck(f"setpos({off}) failed: {OS_ERROR}",skip=2) if AUTODIE: raise return '' # False
[docs]def set_breakpoint(): """Sets a debugger breakpoint, but only if pdb is active, mimicking $DB::single""" if 'pdb' in sys.modules: import pdb pdb.set_trace()
[docs]def set_element(base, index, value): """Implementation of perl = on an array element or hash key""" base[index] = value return value
[docs]def set_last_ndx(arr, ndx): """Implementation of assignment to perl array last index $#array""" del arr[ndx+1:] for _ in range((ndx+1)-len(arr)): arr.append(None)
[docs]def set_signal(sig, handler): """Set a signal handler to either a code reference or a string containing a perl code name or IGNORE or DEFAULT""" sig = num(sig) if callable(handler): signal.signal(sig, handler) elif handler == 'IGNORE': signal.signal(sig, signal.SIG_IGN) elif handler == 'DEFAULT': signal.signal(sig, signal.SIG_DFL) elif isinstance(handler, str): handler = handler.replace('::', '.').replace("'", '.') rdot = handler.rfind('.') if rdot == -1: pkg = getattr(builtins, '__PACKAGE__') fun = handler else: pkg = handler[0:rdot] fun = handler[rdot+1:] if hasattr(builtins, pkg): namespace = getattr(builtins, pkg) if hasattr(namespace, fun): signal.signal(sig, getattr(namespace, fun)) return def error_handler(sno, frm): warn(f'{signal.Signals(sno).name} handler "{handler}" not defined.\n') signal.signal(sig, error_handler) else: def bad_handler(sno, frm): warn(f"{signal.Signals(sno).name} handler invalid: {handler}.\n") signal.signal(sig, bad_handler)
[docs]def shift_left_element(base, index, value): base[index] <<= value return base[index]
[docs]def shift_right_element(base, index, value): base[index] >>= value return base[index]
[docs]def shortmess(*args, skip=0): """Message with no backtrace""" def ff(fn): fn = os.path.relpath(fn) if fn.startswith('./'): return fn[2:] return fn stack = inspect.stack() stack = stack[skip:] m = ''.join(map(str, args)) m += ' at ' + ff(stack[1].filename) + ' line ' + str(stack[1].lineno) + ".\n" return m
[docs]def smartmatch(left, right): """Implement the perl smartmatch (~~) operator""" if right is None: return 1 if left is None else '' if hasattr(left, '__smartmatch__'): return left.__smartmatch__(right) if hasattr(right, '__rsmartmatch__'): return right.__rsmartmatch__(left) if (hasattr(right, 'isHash') and right.isHash) or (not hasattr(right, 'isHash') and isinstance(right, collections.abc.Mapping)): if (hasattr(left, 'isHash') and left.isHash) or (not hasattr(left, 'isHash') and isinstance(left, collections.abc.Mapping)): return smartmatch(sorted(left.keys()), sorted(right.keys())) elif isinstance(left, collections.abc.Iterable) and not isinstance(left, str): return 1 if any(i in right for i in left) else '' elif isinstance(left, re.Pattern): return 1 if any(re.search(left, _str(i)) for i in right.keys()) else '' else: return 1 if left in right.keys() else '' elif isinstance(right, collections.abc.Iterable) and not isinstance(right, str): if (hasattr(left, 'isHash') and left.isHash) or (not hasattr(left, 'isHash') and isinstance(left, collections.abc.Mapping)): return 1 if any(i in left for i in right) else '' elif isinstance(left, collections.abc.Iterable) and not isinstance(left, str): ll = list(left) lll = len(ll) lr = list(right) if lll != len(lr): return '' for i in range(lll): if not smartmatch(ll[i], lr[i]): return '' return 1 elif isinstance(left, re.Pattern): return 1 if any(re.search(left, _str(i)) for i in right) else '' elif all(isinstance(i, (int, float, str)) for i in right): return 1 if left in right else '' else: return 1 if any(left == i for i in right) else '' elif callable(right): if (hasattr(left, 'isHash') and left.isHash) or (not hasattr(left, 'isHash') and isinstance(left, collections.abc.Mapping)): return 1 if all(right(i) for i in left.keys()) else '' elif isinstance(left, collections.abc.Iterable) and not isinstance(left, str): return 1 if all(right(i) for i in left) else '' return 1 if right(left) else '' elif isinstance(right, re.Pattern): if (hasattr(left, 'isHash') and left.isHash) or (not hasattr(left, 'isHash') and isinstance(left, collections.abc.Mapping)): return 1 if any(re.search(right, i) for i in left.keys()) else '' elif isinstance(left, collections.abc.Iterable) and not isinstance(left, str): return 1 if any(re.search(right, _str(i)) for i in left) else '' else: return 1 if re.search(right, _str(left)) else '' elif isinstance(right, (int, float)): return 1 if num(left) == right else '' elif isinstance(left, (int, float)): return 1 if left == num(right) else '' elif isinstance(right, str): return 1 if left == right else '' elif left is None: return 1 if right is None else '' else: return 1 if left == right else ''
[docs]def spaceship(a,b): """3-way comparison like the <=> operator in perl""" if hasattr(a, '__spaceship__'): return a.__spaceship__(b) if hasattr(b, '__rspaceship__'): return b.__rspaceship__(a) return (a > b) - (a < b)
[docs]def splice(array, *args): """Implementation of splice function""" offset = 0; if len(args) >= 1: offset = args[0] length = len(array) if len(args) >= 2: length = args[1] if offset < 0: offset += len(array) total = offset + length if length < 0: total = length removed = array[offset:total] array[offset:total] = args[2:] return removed
[docs]def splice_s(array, *args): """Implementation of splice function in scalar context""" offset = 0; if len(args) >= 1: offset = args[0] length = len(array) if len(args) >= 2: length = args[1] if offset < 0: offset += len(array) total = offset + length if length < 0: total = length removed = array[offset:total] array[offset:total] = args[2:] if not removed: return None return removed[-1]
[docs]def split(pattern, string, maxsplit=0, flags=0): """Split function in perl is similar to re.split but not quite the same - this function makes it the same""" result = re.split(pattern, string, max(0, maxsplit), flags) if len(result) >= 1 and result[0] == '' and (m:=re.match(pattern, string, flags)) and len(m.group(0)) == 0: result = result[1:] # A zero-width match at the beginning of EXPR never produces an empty field if maxsplit >= -1: # We subtracted one from what the user specifies limit = len(result) # Empty results at the end are eliminated for i in range(limit-1, -1, -1): if result[i] == '': limit -= 1 else: break return result[:limit] return result
[docs]def splitdir(*_args): """Implementation of File::Spec->splitdir""" return split(r"/", _str(_args[0]), -1 - 1) # Preserve trailing fields
[docs]def splitpath(*_args): """Implementation of File::Spec->splitpath""" [path, nofile] = list_of_n(_args, 2) [volume, directory, file] = ("", "", "") if nofile: directory = path else: _m = re.search( re.compile(r"^ ( (?: .* / (?: \.\.?\Z )? )? ) ([^/]*) ", re.X | re.S), _str(path) ) directory = _m.group(1) file = _m.group(2) return [volume, directory, file]
[docs]def split_s(pattern, string, maxsplit=0, flags=0): """Split function in perl is similar to re.split but not quite the same - this is the version used in scalar context""" result = re.split(pattern, string, max(0, maxsplit), flags) if len(result) >= 1 and result[0] == '' and (m:=re.match(pattern, string, flags)) and len(m.group(0)) == 0: result = result[1:] # A zero-width match at the beginning of EXPR never produces an empty field if maxsplit >= -1: # We subtracted one from what the user specifies limit = len(result) # Empty results at the end are eliminated for i in range(limit-1, -1, -1): if result[i] == '': limit -= 1 else: break return limit return len(result)
[docs]def stat_cando(self, mode, eff): """Implementation of File::Stat::stat_cando. This takes an arrayref containing the return values of stat or lstat as its first argument, and interprets it for you""" if os.name == 'nt': if (self._mode & mode): return 1 # True return '' # False uid = os.geteuid() if eff else os.getuid() def _ingroup(gid, eff): rgid = os.getgid() egid = os.getegid() supp = os.getgroups() if gid == (egid if eff else rgid): return 1 # True if gid in supp: return 1 # True return '' # False if uid == 0 or (sys.platform == 'cygwin' and _ingroup(544, eff)): # Root if not (mode & 0o111): return 1 # Not testing for executable: all file tests are true if (self._mode & 0o111) or st_py.S_ISDIR(self._mode): return 1 # True return '' # False if self._uid == uid: if (self._mode & mode): return 1 # True elif _ingroup(self._gid, eff): if (self._mode & (mode >> 3)): return 1 # True else: if (self._mode & (mode >> 6)): return 1 # True return '' # False
[docs]@dataclasses.dataclass class File_stat(collections.abc.Sequence): _dev: int _ino: int _mode: int _nlink: int _uid: int _gid: int _rdev: int _size: int _atime: int _mtime: int _ctime: int _blksize: int _blocks: int _item_map = {0:'_dev', 1:'_ino', 2:'_mode', 3:'_nlink', 4:'_uid', 5:'_gid', 6:'_rdev', 7:'_size', 8:'_atime', 9:'_mtime', 10:'_ctime', 11:'_blksize', 12:'_blocks'}
[docs] def dev(self): return self._dev
[docs] def ino(self): return self._ino
[docs] def mode(self): return self._mode
[docs] def uid(self): return self._uid
[docs] def gid(self): return self._gid
[docs] def rdev(self): return self._rdev
[docs] def size(self): return self._size
[docs] def atime(self): return self._atime
[docs] def mtime(self): return self._mtime
[docs] def ctime(self): return self._ctime
[docs] def blksize(self): return self._blksize
[docs] def blocks(self): return self._blocks
def __len__(self): return len(self._item_map) def __getitem__(self, index): if isinstance(index, slice): return [self[i] for i in range(*index.indices(len(self)))] if index < 0: index += len(self) try: return getattr(self, self._item_map[index]) except KeyError: raise IndexError('File_stat index out of range') def __contains__(self, item): return item in self._item_map
[docs] def cando(self, mode, eff): return stat_cando(self, mode, eff)
[docs] def get(self, index, default=None): try: return self[index] except IndexError: return default
[docs]def stat(path): """Handle stat call with or without "use File::stat;" """ if isinstance(path, File_stat): return path # for '_' special variable try: if hasattr(path, 'fileno') and os.stat in os.supports_fd: path = path.fileno() elif hasattr(path, 'name'): path = path.name s = os.stat(path) except Exception: return () result = File_stat(_dev=s.st_dev, _ino=s.st_ino, _mode=s.st_mode, _nlink=s.st_nlink, _uid=s.st_uid, _gid=s.st_gid, _rdev=s.st_rdev if hasattr(s, 'st_rdev') else 0, _size=s.st_size, _atime=s.st_atime, _mtime=s.st_mtime, _ctime=s.st_ctime, _blksize=s.st_blksize if hasattr(s, 'st_blksize') else 512, _blocks=s.st_blocks if hasattr(s, 'st_blocks') else s.st_size // 512) return result
[docs]def store_out_parameter(arglist, arg, value, shifts=0): """Store the value of a sub out parameter both in the arglist and in a location where _fetch_out_parameter can retrieve it after the sub returns. arg is the argument index, starting at 0, and value is the value to be stored. shifts specifies the number of shift operations that have been performed on the arglist. Returns the value.""" if arglist is not None: arglist[arg] = value setattr(builtins, f"__outp{arg+shifts}__", value) return value
[docs]def store_perl_global(perlname, value, infer_suffix=False, method_type=False): """Assigns a value to a package global variable specified by it's perl name and returns the value. Optional keyword argument infer_suffix will map the variable's suffix based on the type of the value, e.g. it will add _h for a hash. Optional keyword argument method_type will set this to a MethodType if True, or will check if the name is 'new' or 'make' and set this to a MethodType if None""" (packname, varname) = perlname.rsplit('::', maxsplit=1) packname = packname.replace('::', '.') if packname == '': packname = 'main' if callable(value) and (method_type or (method_type is None and (varname == 'new' or varname == 'make'))) and \ ((not hasattr(builtins, packname)) or (not isinstance(getattr(builtins, packname), type))): init_package(packname, is_class=True) elif not hasattr(builtins, packname): init_package(packname) namespace = getattr(builtins, packname) if infer_suffix: if isinstance(value, (str, float, int, bytes)): varname += '_v' elif hasattr(value, 'isHash'): if value.isHash: varname += '_h' else: varname += '_a' elif isinstance(value, collections.abc.Mapping): varname += '_h' elif isinstance(value, collections.abc.Iterable): varname += '_a' if callable(value) and (method_type or (method_type is None and (varname == 'new' or varname == 'make'))): value = types.MethodType(value, namespace) if varname == '' or varname == '_h': # namespace dictionary namespace.__dict__ = value else: setattr(namespace, varname, value) if varname in _PYTHONIZER_KEYWORDS: varname += '_' setattr(namespace, varname, value) return value
[docs]def store_perl_meta(perlname, value, infer_suffix=False): """Assigns a value to a package meta variable specified by it's perl name and returns the value. Optional keyword argument infer_suffix will map the variable's suffix based on the type of the value, e.g. it will add _h for a hash. This is used for tie ${"${pkg}::$scalarname"}""" (packname, varname) = perlname.rsplit('::', maxsplit=1) packname = packname.replace('::', '.') if packname == '': packname = 'main' if infer_suffix: if isinstance(value, (str, float, int, bytes)): varname += '_v' elif hasattr(value, 'isHash'): if value.isHash: varname += '_h' else: varname += '_a' elif isinstance(value, collections.abc.Mapping): varname += '_h' elif isinstance(value, collections.abc.Iterable): varname += '_a' assign_meta(packname, varname, value) if varname in _PYTHONIZER_KEYWORDS: varname += '_' assign_meta(packname, varname, value) return value
[docs]def strftime(fmt, sec, min=None, hour=None, mday=None, mon=None, year=None, wday=0, yday=0, isdst=0): """Implementation of perl strftime""" if min is None: min = sec[1] hour = sec[2] mday = sec[3] mon = sec[4] year = sec[5] sec = sec[0] tl = timelocal(sec, min, hour, mday, mon, year) return tm_py.strftime(fmt, tm_py.localtime(tl))
[docs]def sub(): """Implementation of __SUB__ in perl""" try: frame = sys._getframe(1) name = frame.f_code.co_name return frame.f_globals[name] except Exception: return None
[docs]def subname(code): """Implementation of Sub::Util::subname""" def unescape(name): ns = name.split('.') result = [] for n in ns: if n[-1] == '_' and n[:-1] in _PYTHONIZER_KEYWORDS: result.append(n[:-1]) else: result.append(n) return '.'.join(result) name = unescape(code.__name__) if hasattr(builtins.main, name) and getattr(builtins.main, name) == code: return f"main::{name}" if hasattr(code, '__self__') and hasattr(code.__self__, '__PACKAGE__'): package = unescape(code.__self__.__PACKAGE__) return f"{package.replace('.', '::')}::{name}" for packagename in vars(builtins): namespace = getattr(builtins, packagename) if isinstance(namespace, type) or isinstance(namespace, types.SimpleNamespace): if hasattr(namespace, name): cd = getattr(namespace, name) if cd == code: return f"{unescape(packagename).replace('.', '::')}::{name}" if hasattr(cd, '__func__'): cd = cd.__func__ if cd == code or (hasattr(code, '__func__') and cd == code.__func__): return f"{unescape(packagename).replace('.', '::')}::{name}" return f"main::{name}"
[docs]def substitute_and_count(this, that, var, replace=True, count=0): """Perform a re substitute, but also count the # of matches""" (result, ctr) = re.subn(this, that, var, count=count) if not replace: return (var, result) return (result, ctr)
[docs]def substitute_element(base, index, this, that, count=0, replace=True): """Perform a re substitution on an array element or hash value, and also count the # of matches""" (result, ctr) = re.subn(this, that, _str(base[index]), count=count) if replace: base[index] = result return ctr return result
[docs]def substitute_global(packname, varname, this, that, replace=True, count=0): """Perform a re substitute on a global, and also count the # of matches""" namespace = getattr(builtins, packname) var = _str(getattr(namespace, varname)) (result, ctr) = re.subn(this, that, var, count=count) if replace: setattr(namespace, varname, result) return ctr return result
[docs]def substr(this, start, length, replacement): """Handle substr with replacement - returns a tuple with (new_this, chars_removed)""" chars_removed = this[start:start+length] new_this = this[:start] + replacement + this[start+length:] return (new_this, chars_removed)
[docs]def subtract_element(base, index, value): """Implementation of -= on an array element""" try: base[index] -= value except TypeError: if isinstance(value, int) or isinstance(value, float): base[index] = num(base[index]) - value elif value is not None: raise return base[index]
[docs]def switch(s_val): """Implementation of switch/given statement in perl. This returns a function that is called for each case.""" def iter_to_bool(i): try: v = next(i) return True except StopIteration: return False if hasattr(s_val, '__rsmartmatch__'): def o_switch(c_val): return s_val.__rsmartmatch__(c_val) return o_switch elif callable(s_val): def f_switch(c_val): if callable(c_val): return s_val == c_val if isinstance(c_val, collections.abc.Iterable) and not isinstance(c_val, str): return s_val(*c_val) return s_val(c_val) return f_switch elif isinstance(s_val, int) or isinstance(s_val, float): def n_switch(c_val): if hasattr(c_val, '__smartmatch__'): return c_val.__smartmatch__(s_val) if isinstance(c_val, int) or isinstance(c_val, float): return s_val == c_val if callable(c_val): return c_val(s_val) if isinstance(c_val, collections.abc.Iterable) and not isinstance(c_val, str): if hasattr(c_val, 'isHash') and c_val.isHash: return str(s_val) in c_val return s_val in c_val if isinstance(c_val, re.Pattern): return re.search(c_val, str(s_val)) if isinstance(c_val, dict): return str(s_val) in c_val return str(s_val) == str(c_val) return n_switch elif isinstance(s_val, str): def s_switch(c_val): if hasattr(c_val, '__smartmatch__'): return c_val.__smartmatch__(s_val) if (isinstance(c_val, collections.abc.Iterable) and not isinstance(c_val, str)) or isinstance(c_val, dict): return s_val in c_val # list, Array, or Hash if callable(c_val): return c_val(s_val) if isinstance(c_val, re.Pattern): return re.search(c_val, s_val) return s_val == str(c_val) return s_switch elif isinstance(s_val, dict) and (not hasattr(s_val, 'isHash') or s_val.isHash): def h_switch(c_val): if hasattr(c_val, '__smartmatch__'): return c_val.__smartmatch__(s_val) if isinstance(c_val, dict) and (not hasattr(c_val, 'isHash') or c_val.isHash): return s_val == c_val if isinstance(c_val, collections.abc.Iterable) and not isinstance(c_val, str): return iter_to_bool(filter(lambda _d: _d in s_val and s_val[_d], c_val)) if callable(c_val): return c_val(s_val) if isinstance(c_val, re.Pattern): return iter_to_bool(filter(lambda _d: re.search(c_val, _d) and _d in s_val and s_val[_d], s_val.keys())) return str(c_val) in s_val and s_val[str(c_val)] return h_switch elif isinstance(s_val, collections.abc.Iterable) and (not hasattr(s_val, 'isHash') or not s_val.isHash): def a_switch(c_val): if hasattr(c_val, '__smartmatch__'): return c_val.__smartmatch__(s_val) if isinstance(c_val, dict) and (not hasattr(c_val, 'isHash') or c_val.isHash): return iter_to_bool(filter(lambda _d: str(_d) in c_val and c_val[str(_d)], s_val)) if isinstance(c_val, collections.abc.Iterable) and not isinstance(c_val, str): for item in s_val: if item not in c_val: return False return True if callable(c_val): return c_val(*(map(str, s_val))) if isinstance(c_val, re.Pattern): return iter_to_bool(filter(lambda _d: re.search(c_val, _d), map(str, s_val))) return c_val in s_val return a_switch elif isinstance(s_val, re.Pattern): def r_switch(c_val): if hasattr(c_val, '__smartmatch__'): return c_val.__smartmatch__(s_val) if isinstance(c_val, dict) and (not hasattr(c_val, 'isHash') or c_val.isHash): return iter_to_bool(filter(lambda _d: re.search(s_val, _d) and c_val[_d], c_val.keys())) if isinstance(c_val, collections.abc.Iterable) and not isinstance(c_val, str): return iter_to_bool(filter(lambda _d: re.search(s_val, _d), map(str, c_val))) if callable(c_val): return c_val(s_val) if isinstance(c_val, re.Pattern): return s_val.pattern == c_val.pattern return re.search(s_val, str(c_val)) return r_switch else: def n_switch(c_val): if hasattr(c_val, '__smartmatch__'): return c_val.__smartmatch__(s_val) return False return n_switch
[docs]def sysread(fh, var, length, offset=0, need_len=False): """Read length bytes from the fh, and return the result to store in var if need_len is False, else return a tuple with the result and the length read. For compatability with perl, the result is returned as a str, not bytes.""" global OS_ERROR, TRACEBACK, AUTODIE if var is None: var = '' try: s = str(os.read(fh.fileno(), length), encoding='latin1', errors='ignore') except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: cluck(f"sysread of {length} bytes failed: {OS_ERROR}",skip=2) if AUTODIE: raise if need_len: return (var, None) return var ls = len(s) lv = len(var) if offset < 0: offset += lv if offset: if need_len: return (var[:offset] + ('\0' * (offset-lv)) + s, ls) else: return var[:offset] + ('\0' * (offset-lv)) + s if need_len: return (s, ls) return s
[docs]def sysseek(fh, pos, how=os.SEEK_SET): """Implementation of perl sysseek""" return os.lseek(fh.fileno(), pos, how)
[docs]def system(*args): """Execute a command and return the return code""" global CHILD_ERROR, AUTODIE, TRACEBACK, TRACE_RUN if len(args) == 1: args = args[0] try: sp = subprocess.run(args,text=True,stdin=sys.stdin,stdout=sys.stdout,stderr=sys.stderr,shell=need_sh(args)) except FileNotFoundError: # can happen on windows if shell=False sp = subprocess.CompletedProcess(args, -1) except OSError: # check if we're trying to run a perl or python script on Windows if isinstance(args, str): args = [args] arg_split = args[0].split()[0] if arg_split.endswith('.py'): args = [sys.executable] + args elif arg_split.endswith('.pl'): args = ['perl'] + args else: raise sp = subprocess.run(args,text=True,stdin=sys.stdin,stdout=sys.stdout,stderr=sys.stderr,shell=need_sh(args)) if TRACE_RUN: carp(f'trace system({args}): {repr(sp)}', skip=2) CHILD_ERROR = -1 if sp.returncode == -1 else ((sp.returncode<<8) if sp.returncode >= 0 else -sp.returncode) if CHILD_ERROR: if AUTODIE: raise Die(f'system({args}): failed with rc {CHILD_ERROR}') if TRACEBACK: cluck(f'system({args}): failed with rc {CHILD_ERROR}',skip=2) return CHILD_ERROR
[docs]def syswrite(fh, scalar, length=None, offset=0): """Implementation of perl syswrite""" if length is None and hasattr(scalar, 'len'): length = len(scalar)-offset if isinstance(scalar, str): return os.write(fh.fileno(), scalar[offset:length+offset].encode()) elif isinstance(scalar, bytes): return os.write(fh.fileno(), scalar[offset:length+offset]) else: return os.write(fh.fileno(), str(scalar).encode())
[docs]def tell(fh): """Implementation of perl tell""" global OS_ERROR, TRACEBACK, AUTODIE try: return fh.tell() except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: cluck(f"tell failed: {OS_ERROR}",skip=2) if AUTODIE: raise return -1
[docs]def telldir(DIR): return DIR[1]
[docs]def tempdir(*args): """Implementation of File::Temp::tempdir()""" template=None options={} start = 0 if len(args) % 2 == 1: template = args[0] start = 1 for i in range(start, len(args), 2): options[args[i]] = args[i+1] dirn=None base=None if template: template = template.replace('X', '') (base, dirn, tail) = fileparse(template) if 'DIR' in options: dirn = options['DIR'] return tempfile.mkdtemp(prefix=base, dir=dirn)
[docs]def tempfile_(*args): """Implementation of File::Temp::tempfile() in list context""" template=None options={} start = 0 if len(args) % 2 == 1: template = args[0] start = 1 for i in range(start, len(args), 2): options[args[i]] = args[i+1] dirn=None base=None suffix=None unlink=True if 'TEMPLATE' in options: template = options['TEMPLATE'] if template: template = template.replace('X', '') (base, dirn, tail) = fileparse(template) if 'SUFFIX' in options: suffix = options['SUFFIX'] if 'DIR' in options: dirn = options['DIR'] if 'UNLINK' in options: unlink = options['UNLINK'] fh = tempfile.NamedTemporaryFile(prefix=base, dir=dirn, suffix=suffix, delete=unlink) def filename(fh): return fh._name fh._name = fh.name fh.filename = types.MethodType(filename, fh) return (fh, fh.name)
[docs]def tempfile_s(*args): """Implementation of File::Temp::tempfile() in scalar context""" (fh, _) = tempfile_(*args) return fh
[docs]def tempnam(template, suffix): """Implementation of File::Temp::tempnam()""" template = template.replace('X', '') (base, dirn, tail) = fileparse(template) (fh, name) = tempfile.mkstemp(prefix=base, dir=dirn, suffix=suffix) fh.close() return name
[docs]def tie_call(func, _args, _kwargs=None): """Call a function in a package that uses TIEARRAY or TIEHASH. This is an internal routine whose call is automatically generated.""" # This is needed to "undo" what _add_tie_methods does to a class that has TIEARRAY or TIEHASH. # That function creates a subclass which is used as the object's class returned from TIEARRAY or # TIEHASH which defines the python special methods like __getitem__ and __setitem__ to call # FETCH and STORE respectively. However, inside the class that defines TIEARRAY or TIEHASH, # we don't want indexing and other basic operations to call these special methods, so we temporarily # change the type of the object to it's base type, then do the call, then restore it. if _kwargs is None: _kwargs = dict() try: self = _args[0] tie_class = self.__class__ orig_class = tie_class.__bases__[0] if(hasattr(orig_class, '__TIE_subclass__')): self.__class__ = orig_class except Exception: # e.g. we have no args, or self is a string or something return func(*_args, **_kwargs) try: # Call the function with the class of 'self' reset to the parent class # which doesn't have __getitem__ etc defined return func(*_args, **_kwargs) finally: try: self.__class__ = tie_class except Exception: # e.g. self is a string or something pass
[docs]def time(): """ Replacement for perl built-in time function""" return (tm_py.time_ns() // 1000000000)
[docs]def timegm(sec, min, hour, mday, mon, year, wday=0, yday=0, isdst=0): """Replacement for perl built-in timegm function""" if year < 1900: year += 1900 return calendar.timegm((year, mon+1, mday, hour, min, sec, 0, 1, -1))
[docs]def timelocal(sec, min, hour, mday, mon, year, wday=0, yday=0, isdst=0): """Replacement for perl built-in timelocal function""" if year < 1900: year += 1900 try: return tm_py.mktime((year, mon+1, mday, hour, min, sec, 0, 1, -1)) except Exception: try: import datetime diff = datetime.datetime(year, mon+1, mday, hour, min, sec) - datetime.datetime.fromtimestamp(0) return diff.total_seconds() except Exception: return 999999999999
[docs]def tmpfile(): """Implementation of File::Temp tmpfile()""" return tempfile.TemporaryFile()
[docs]def tmpnam(): """Implementation of POSIX tmpnam() in list context""" ntf = tempfile.NamedTemporaryFile(delete=False) return (ntf, ntf.name)
[docs]def tmpnam_s(): """Implementation of POSIX tmpnam() in scalar context""" ntf = tempfile.NamedTemporaryFile(delete=False) result = ntf.name ntf.close() return result
[docs]def translate(table, var, replace=True, complement=False, delete=False, squash=False): """Perform a tr translate operation""" result = [] pv = None for ch in var: if ord(ch) > 256 and complement: ch = chr(256) try: v = table[ord(ch)] except LookupError: v = ch pv = None if v is not None: if isinstance(v, int): v = chr(v) if pv != v or not squash: result.append(v) pv = v return ''.join(result)
[docs]def translate_and_count(table, var, replace=True, complement=False, delete=False, squash=False): """Perform a tr translate, but also count the # of matches""" result = [] ctr = 0; pv = None for ch in var: if ord(ch) > 256 and complement: ch = chr(256) try: v = table[ord(ch)] ctr += 1 except LookupError: v = ch pv = None if v is not None: if isinstance(v, int): v = chr(v) if pv != v or not squash: result.append(v) pv = v if not replace: return (var, ''.join(result)) return (''.join(result), ctr)
[docs]def translate_element(base, index, table, replace=True, complement=False, delete=False, squash=False): """Perform a tr translate on an array element, and also count the # of matches""" result = [] ctr = 0; var = _str(base[index]) pv = None for ch in var: if ord(ch) > 256 and complement: ch = chr(256) try: v = table[ord(ch)] ctr += 1 except LookupError: v = ch pv = None if v is not None: if isinstance(v, int): v = chr(v) if pv != v or not squash: result.append(v) pv = v if replace: base[index] = ''.join(result) return ctr return ''.join(result)
[docs]def translate_global(packname, varname, table, replace=True, complement=False, delete=False, squash=False): """Perform a tr translate on a global, and also count the # of matches""" result = [] ctr = 0; pv = None namespace = getattr(builtins, packname) var = _str(getattr(namespace, varname)) for ch in var: if ord(ch) > 256 and complement: ch = chr(256) try: v = table[ord(ch)] ctr += 1 except LookupError: v = ch pv = None if v is not None: if isinstance(v, int): v = chr(v) if pv != v or not squash: result.append(v) pv = v if replace: setattr(namespace, varname, ''.join(result)) return ctr return ''.join(result)
[docs]def truncate(fh, length): """Implementation of perl $fh->truncate method""" global OS_ERROR, TRACEBACK, AUTODIE try: if hasattr(fh, 'flush'): fh.flush() if hasattr(fh, 'truncate'): fh.truncate(length) else: if hasattr(fh, 'fileno'): fh = fh.fileno() os.truncate(fh, length) return 1 # True except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: if isinstance(fh, str): cluck(f"truncate({fh}, {length}) failed: {OS_ERROR}",skip=2) else: cluck(f"truncate to {length} failed: {OS_ERROR}",skip=2) if AUTODIE: raise return None
[docs]def ucfirst(string): """Implementation of ucfirst and \ u in interpolated strings: uppercase the first char of the given string""" return string[0:1].upper() + string[1:]
[docs]def unassign_meta(packname, varname): """Unassigns a variable in the metaclass of a package global variable. This is use for untie $scalar""" namespace = getattr(builtins, packname) if not isinstance(namespace, type): return meta = namespace.__class__ try: delattr(meta, varname) except AttributeError: pass
[docs]def ungetc(fh, ordinal): """Implementation of perl $fh->ungetc method""" # We only support putting back what was there after a getc if hasattr(fh, "_last_pos"): # Set by _getc fh.seek(fh._last_pos, 0) ch = fh.read(1) if ch == chr(ordinal): fh.seek(fh._last_pos, 0) delattr(fh, "_last_pos") return else: fh.seek(0, 2) raise NotImplementedError
[docs]def unpack(template, bytestr): """Unpack bytestr using the template and return a list of values""" if isinstance(bytestr, str): bytestr = _str_to_bytes(bytestr) result = [] format_and_counts = _get_pack_unpack_format_and_counts(template, (bytestr,), is_unpack=True) start = 0 for format, _, _ in format_and_counts: size = struct.calcsize(format) result.extend(struct.unpack(format, bytestr[start:start+size])) start += size for i, r in enumerate(result): if isinstance(r, bytes): result[i] = _bytes_to_str(r) return result
[docs]def updir(): """Implementation of File::Spec->updir""" return '..'
[docs]def utf8_decode(s): """Implementation of utf8::decode""" try: return (str(s).encode('latin-1').decode(), 1) except Exception: return (str(s).encode('latin-1').decode(errors='ignore'), '')
[docs]def utf8_downgrade(s, fail_ok=False): """Implementation of utf8::downgrade. Returns a tuple of string and success""" if fail_ok: try: result = str(s) return (result, 1) except Exception: return (s, 0) return (str(s), 1)
[docs]def utf8_encode(s): """Implementation of utf8::encode""" return (str(s).encode().decode('latin-1'), None)
[docs]def utf8_is_utf8(s): """Implementation of utf8::is_utf8""" try: s = str(s) if s.isascii(): return '' s.encode() except Exception: return '' # if it looks like raw utf8, then it's not utf8 encoded i = 0; in_byte = 0 while i < len(s): c = ord(s[i]) if in_byte: if c < 0x80 or c > 0xbf: return 1 in_byte -= 1 else: if c < 0x80: pass elif c >= 0xc0 and c <= 0xdf: # 2-byte in_byte = 1 elif c >= 0xe0 and c <= 0xef: # 3-byte in_byte = 2 elif c >= 0xf0 and c <= 0xff: # 4-byte in_byte = 3 else: return 1 i += 1 if in_byte: return 1 return ''
[docs]def utf8_native_to_unicode(c): """Implementation of utf8::native_to_unicode. Returns it's argument""" return c
[docs]def utf8_unicode_to_native(c): """Implementation of utf8::unicode_to_native. Returns it's argument""" return c
[docs]def utf8_upgrade(s): """Implementation of utf8::upgrade. Returns a tuple with the result and the length""" result = str(s) return (result, len(result))
[docs]def utf8_valid(s): """Implementation of utf8::valid. Returns 1""" return 1
[docs]def utime(atime, mtime, *args): """Implementation of perl utime function""" global TRACEBACK, AUTODIE, OS_ERROR result = 0 OS_ERROR = '' times = None if atime is None and mtime is None: pass elif atime is None: atime = 0 elif mtime is None: mtime = 0 times = (atime, mtime) for fd in args: try: if hasattr(fd, 'fileno') and os.utime in os.supports_fd: fd = fd.fileno() elif hasattr(fd, 'name'): fd = fd.name os.utime(fd, times) result += 1 except Exception as _e: OS_ERROR = str(_e) if TRACEBACK: cluck(f"utime({atime}, {mtime}, {fd}) failed: {OS_ERROR}",skip=2) if AUTODIE: raise return result
[docs]def wait(): """Replacement for perl wait() call""" global CHILD_ERROR try: (pid, stat) = os.wait() CHILD_ERROR = stat return pid except Exception: return -1
[docs]def waitpid(pid, flags): """Replacement for perl waitpid() call""" global CHILD_ERROR try: (rpid, stat) = os.waitpid(pid, options) CHILD_ERROR = stat return rpid except Exception: return -1
[docs]def warn(*args, skip=None): """Handle warn in perl""" global INPUT_LINE_NUMBER, _INPUT_FH_NAME def is_func_in_call_stack(func): # Warn handlers are turned off inside themselves frame = sys._getframe(2) while frame is not None: if func.__code__ == frame.f_code: return True frame = frame.f_back return False if hasattr(builtins, 'CORE') and hasattr(builtins.CORE, 'GLOBAL') and \ hasattr(builtins.CORE.GLOBAL, 'warn') and callable(builtins.CORE.GLOBAL.warn) and not \ is_func_in_call_stack(builtins.CORE.GLOBAL.warn): return builtins.CORE.GLOBAL.warn(*args) args = list(map(_str, args)) if len(args) == 0 or len(''.join(args)) == 0: args = ["Warning: something's wrong"] try: if EVAL_ERROR: args = [EVAL_ERROR, "\t...caught"] except Exception: pass if "\n" not in args[-1]: (_, fn, lno, *_) = caller() if skip is None else caller(skip) iln = None ifn = None try: iln = fileinput.lineno() ifn = '<fileinput>' except RuntimeError: iln = INPUT_LINE_NUMBER if _INPUT_FH_NAME: ifn = f"<{_INPUT_FH_NAME}>" if iln and ifn: args.append(f" at {fn} line {lno}, {ifn} line {iln}.\n") else: args.append(f" at {fn} line {lno}.\n") if callable(SIG_WARN_HANDLER) and not is_func_in_call_stack(SIG_WARN_HANDLER): arg = ''.join(args) SIG_WARN_HANDLER(arg) else: print(*args, sep='', end='', file=sys.stderr) return 1
[docs]def write_(fh, scalar, length=None, offset=0): """Implementation of perl $fh->write""" if length is None and hasattr(scalar, 'len'): length = len(scalar)-offset if 'b' in fh.mode: if isinstance(scalar, str): return fh.write(scalar[offset:length+offset].encode()) elif isinstance(scalar, bytes): return fh.write(scalar[offset:length+offset]) return fh.write(str(scalar).encode()) else: if isinstance(scalar, str): return fh.write(scalar[offset:length+offset]) elif isinstance(scalar, bytes): return fh.write(scalar[offset:length+offset].decode()) return fh.write(str(scalar))
[docs]def xor_element(base, index, value): base[index] ^= value return base[index]