# perllib module for pythonizer, generated by makelib.py on Wed Apr 19 09:51:29 2023
#
# WARNING: Do not edit this file - to change the functions or add new ones, edit them in the pyf directory,
# then re-run makelib.py
#
__author__ = """Joe Cool"""
___email__ = 'snoopyjc@gmail.com'
__version__ = '1.030'
import sys,os,re,fileinput,subprocess,collections.abc,warnings,inspect,itertools,signal,traceback,io,tempfile,calendar,types,random,dataclasses,builtins,codecs,struct,pprint,functools,argparse,abc,copy
import time as tm_py
import stat as st_py
_str = lambda s: "" if s is None else str(s)
try:
import fcntl as fc_py
except Exception:
pass
[docs]class Die(Exception):
def __init__(self, *args,suppress_traceback=None):
super().__init__(*args)
if TRACEBACK and not suppress_traceback:
cluck()
TRACEBACK = 0
WARNING = 0
OUTPUT_AUTOFLUSH = 0
EXCEPTIONS_BEING_CAUGHT = ''
CHILD_ERROR = 0
TRACE_RUN = 0
AUTODIE = 0
SIG_DIE_HANDLER = None
OUTPUT_LAYERS = ''
BASETIME = tm_py.time()
OUTPUT_RECORD_SEPARATOR = ''
_OPEN_MODE_MAP = {'<': 'r', '>': 'w', '+<': 'r+', '+>': 'w+', '>>': 'a', '+>>': 'a+', '|': '|-'}
SIG_WARN_HANDLER = None
INPUT_RECORD_SEPARATOR = "\n"
OS_ERROR = ''
LIST_SEPARATOR = ' '
EVAL_ERROR = ''
INPUT_LAYERS = ''
_TIE_MAP = {'DELETE': '__delitem__','STORE': '__setitem__','SCALAR': '__len__','FETCHSIZE': '__len__','UNTIE': '__untie__','FETCH': '__getitem__','DESTROY': '__del__','CLEAR': 'clear','PUSH': 'append','EXISTS': '__contains__'}
_INPUT_FH_NAME = None
OUTPUT_FIELD_SEPARATOR = ''
_DUP_MAP = dict(STDIN=0, STDOUT=1, STDERR=2)
_PYTHONIZER_KEYWORDS = {'pass','False','print','fcntl','complex','async','lambda','bytearray','oct','property','class','raise','iter','as','traceback','aiter','next','not','dict','get','sum','list','break','re','pow','object','or','min','compile','map','inspect','None','True','atexit','set','warnings','isinstance','return','float','continue','find','tempfile','str','import','int','update','open','upper','try','calendar','bool','hex','abs','repr','signal','builtins','any','dir','Hash','with','sys','getattr','eval','math','keys','exec','hasattr','dataclasses','setattr','subprocess','yield','insert','lower','if','frozenset','help','divmod','issubclass','round','fileinput','Array','bin','assert','memoryview','for','classmethod','all','ArrayHash','globals','id','callable','types','input','is','rfind','anext','tm_py','pdb','argparse','await','filter','bytes','len','max','finally','and','codecs','collections.abc','pop','vars','enumerate','ascii','io','rstrip','struct','abc','staticmethod','locals','del','except','type','reversed','functools','elif','glob','format','copy','slice','wantarray','close','while','random','super','values','os','def','breakpoint','hash','nonlocal','in','casefold','stat','itertools','ord','global','chr','sorted','zip','range','delattr','from','else','getopt','perllib','tuple','extend'}
INPUT_LINE_NUMBER = 0
[docs]def init_package(name, is_class=False, isa=(), autovivification=True):
"""Initialize a package by creating a namespace for it"""
global TRACEBACK, AUTODIE, TRACE_RUN, WARNING
if name == 'main':
if (v := os.environ.get('PERLLIB_TRACEBACK')) is not None:
TRACEBACK = int(v) if v.isdecimal() else 1
if (v := os.environ.get('PERLLIB_AUTODIE')) is not None:
AUTODIE = int(v) if v.isdecimal() else 1
if (v := os.environ.get('PERLLIB_TRACE_RUN')) is not None:
TRACE_RUN = int(v) if v.isdecimal() else 1
if (v := os.environ.get('PERLLIB_WARNING')) is not None:
WARNING = int(v) if v.isdecimal() else 1
pieces = name.split('.')
parent = builtins
parent_name = ''
package_name = ''
class perllibMeta(abc.ABCMeta):
def __str__(self): # so str(class_v) works
return self.__name__.replace('.', '::') if isinstance(self, type) else super().__str__()
for i, piece in enumerate(pieces):
prior_namespace = namespace = None
if hasattr(parent, piece):
namespace = getattr(parent, piece)
if parent_name:
package_name = parent_name + '.' + piece
else:
package_name = piece
if (is_class or isa) and i == len(pieces)-1 and not isinstance(namespace, type):
prior_namespace = namespace # we have the wrong type of namespace - copy it over below
namespace = None
if namespace is None:
if (is_class or isa) and i == len(pieces)-1:
class_parents = []
any_parent_is_class = False
if not hasattr(builtins, 'UNIVERSAL'):
if autovivification:
builtins.UNIVERSAL = _ArrayHashClass
else:
builtins.UNIVERSAL = type('UNIVERSAL', tuple(), dict())
for p in isa:
py = p.replace("'", '.').replace('::', '.')
if hasattr(builtins, f"{py}_"): # handle names that need to be escaped
py = f"{py}_"
if hasattr(builtins, py):
parent_namespace = getattr(builtins, py)
if not isinstance(parent_namespace, type):
# Promote the parent namespace to a class if it isn't one already
if autovivification:
new_parent_namespace = type(name, (_ArrayHashClass,), Hash())
else:
new_parent_namespace = type(name, tuple(), dict())
new_parent_namespace.__class__ = perllibMeta
new_parent_namespace.__eq__ = lambda self, other: self is other
new_parent_namespace.__bool__ = lambda self: True
for k, v in parent_namespace.__dict__.items():
setattr(new_parent_namespace, k, v)
parent_package_name = parent_namespace.__PACKAGE__
setattr(builtins, parent_package_name, new_parent_namespace)
setattr(builtins, f"main.{parent_package_name}", new_parent_namespace)
if(parent_namespace.__PARENT__):
ppn_pieces = parent_package_name.split('.')
grandparent_package_name = parent_namespace.__PARENT__
grandparent_namespace = getattr(builtins, grandparent_package_name)
setattr(grandparent_namespace, ppn_pieces[-1], new_parent_namespace)
parent_namespace = new_parent_namespace
if class_parents and type(class_parents[0]) != type(parent_namespace) and \
type(class_parents[0]).__name__ == type(parent_namespace).__name__:
# Avoid: TypeError: metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases
parent_namespace.__class__ = class_parents[0].__class__
class_parents.append(parent_namespace)
if isinstance(parent_namespace, type):
any_parent_is_class = True
if autovivification:
if is_class and not any_parent_is_class:
class_parents.append(_ArrayHashClass)
namespace = type(name, tuple(class_parents), Hash())
else:
if is_class and not any_parent_is_class:
class_parents.append(builtins.UNIVERSAL)
namespace = type(name, tuple(class_parents), dict())
if is_class or any_parent_is_class:
for key in dir(namespace):
if isinstance(getattr(namespace, key), types.MethodType):
setattr(namespace, key, types.MethodType(getattr(namespace, key).__func__, namespace))
if is_class or any_parent_is_class:
namespace.__class__ = perllibMeta
namespace.__eq__ = lambda self, other: self is other
namespace.__bool__ = lambda self: True
if prior_namespace is not None:
for k, v in prior_namespace.__dict__.items():
setattr(namespace, k, v)
else:
namespace = types.SimpleNamespace()
namespace.__autovivification__ = autovivification
if parent_name:
package_name = parent_name + '.' + piece
else:
package_name = piece
namespace.__PARENT__ = parent_name
namespace.__PACKAGE__ = package_name
setattr(parent, piece, namespace)
if parent != builtins:
setattr(builtins, package_name, namespace)
if pieces[0] != 'main':
if not hasattr(builtins, 'main'):
init_package('main')
setattr(builtins.main, package_name, namespace)
setattr(builtins, f"main.{package_name}", namespace)
elif name != 'main':
setattr(builtins, f"main.{piece}", namespace)
if pieces[0] != 'main':
if not hasattr(builtins, 'main'):
init_package('main')
setattr(builtins.main, piece, namespace)
parent = namespace
parent_name = package_name
if not hasattr(builtins, '__packages__'):
builtins.__packages__ = set()
builtins.__packages__.add(name)
return namespace
[docs]def Array(init=None):
"""Array with autovivification"""
return ArrayHash(init, isHash=False)
class _ArrayHash(collections.defaultdict, collections.abc.Sequence):
"""Implements autovivification of array elements and hash keys"""
def __init__(self, fcn, isHash=None):
self.isHash = isHash # Can be None (not determined yet), False (is an Array), or True (is a Hash)
super().__init__(fcn)
def get(self, key, default=None):
if self.isHash:
return super().get(key, default)
elif self.isHash is False:
if key < 0:
key += len(self)
return self[key] if key >= 0 and key < len(self) else default
else:
return default
def append(self, value):
if self.isHash is None:
self.isHash = False
elif self.isHash:
raise TypeError('Not an ARRAY reference')
self[len(self)] = value
def copy(self):
if self.isHash:
return Hash(self)
elif self.isHash is None:
return ArrayHash(self)
else:
return Array(self)
def extend(self, lst):
if self.isHash is None:
self.isHash = False
elif self.isHash and len(lst):
raise TypeError('Not an ARRAY reference')
ln = len(self)
for item in lst:
self[ln] = item
ln += 1
def update(self, values):
if self.isHash is None:
self.isHash = True
elif not self.isHash:
raise TypeError('Not a HASH reference')
for k, v in values.items():
self[k] = v
def pop(self, key=-1, default=None):
if self.isHash:
if key in self:
value = self[key]
del self[key]
return value
return default
else:
ls = len(self)
if not ls:
return None
if key < 0:
key += ls
if key < ls:
value = self[key]
for i in range(key, ls-1):
self[i] = self[i+1]
del self[ls-1]
return value
return None
def remove(self, x):
if self.isHash:
raise TypeError('Not an ARRAY reference')
for ndx in range(len(self)):
if self[ndx] == x:
return self.pop(ndx)
raise ValueError(f"remove({x}): not found in Array")
def __getitem__(self, index):
if self.isHash:
try:
return super().__getitem__(index)
except (TypeError, KeyError):
return super().__getitem__(str(index))
elif self.isHash is None:
if isinstance(index, int) or isinstance(index, slice):
self.isHash = False
else:
self.isHash = True
try:
return super().__getitem__(index)
except TypeError:
return super().__getitem__(str(index))
if isinstance(index, int):
if index < 0:
index += len(self)
return super().__getitem__(index)
elif isinstance(index, slice):
return Array([self[i] for i in range(*index.indices(len(self)))])
else:
raise TypeError('Not a HASH reference')
def __setitem__(self, index, value):
if self.isHash:
try:
super().__setitem__(index, value)
except TypeError:
super().__setitem__(str(index), value)
return
elif self.isHash is None:
if isinstance(index, int) or isinstance(index, slice):
self.isHash = False
else:
self.isHash = True
try:
super().__setitem__(index, value)
except TypeError:
super().__setitem__(str(index), value)
if isinstance(index, int):
if index < 0:
index += len(self)
for i in range(len(self), index):
super().__setitem__(i, None)
super().__setitem__(index, value)
return
elif isinstance(index, slice):
if index.start is not None:
for i in range(len(self), index.start):
super().__setitem__(i, None)
value = iter(value)
ndx = index.start if index.start is not None else 0
j = None
for i in range(*index.indices(len(self))):
try:
super().__setitem__(i, next(value))
except StopIteration:
if j is None:
j = i
self.pop(j)
ndx += 1
rest = list(value)
lr = len(rest)
if lr:
for i in range(len(self)-1,ndx-1,-1): # Move everything else up
super().__setitem__(i+lr, super().__getitem__(i))
for i in range(lr):
super().__setitem__(i+ndx, rest[i])
def __delitem__(self, index):
if self.isHash:
try:
super().__delitem__(index)
except (TypeError, KeyError):
super().__delitem__(str(index))
elif isinstance(index, int):
if self.isHash:
raise TypeError('Not an ARRAY reference')
ls = len(self)
if not ls:
return
if index < 0:
index += len(self)
super().__delitem__(index)
elif isinstance(index, slice):
if self.isHash:
raise TypeError('Not an ARRAY reference')
for i in range(*index.indices(len(self))):
super().__delitem__(i)
def __iter__(self):
if self.isHash:
for i in self.keys():
yield i
else:
for i in range(len(self)):
yield self[i]
def __str__(self):
if self.isHash:
return str(dict(self))
elif self.isHash is None:
return ''
return str(list(self))
def __repr__(self):
if self.isHash:
return "Hash(" + self.__str__() + ")"
elif self.isHash is None:
return "ArrayHash(" + self.__str__() + ")"
return "Array(" + self.__str__() + ")"
def __add__(self, other):
result = ArrayHash(self)
if self.isHash or (isinstance(other, dict) and not isinstance(other, _ArrayHash)) or (hasattr(other, 'isHash') and other.isHash):
result.update(other)
elif self.isHash is None and isinstance(other, (int, float, str)):
return other
else:
result.extend(other)
return result
def __iadd__(self, other):
if self.isHash or (isinstance(other, dict) and not isinstance(other, _ArrayHash)) or (hasattr(other, 'isHash') and other.isHash):
self.update(other)
elif self.isHash is None and isinstance(other, (int, float, str)):
return other
else:
self.extend(other)
return self
def __radd__(self, other):
result = ArrayHash()
if self.isHash or (isinstance(other, dict) and not isinstance(other, _ArrayHash)) or (hasattr(other, 'isHash') and other.isHash):
result.update(other)
result.update(self)
elif self.isHash is None and isinstance(other, (int, float, str)):
return other
else:
result.extend(other)
result.extend(self)
return result
def __mul__(self, other):
if isinstance(other, int):
if self.isHash is False: # Only perform multiplication for arrays
new_data = ArrayHash(isHash=False)
for _ in range(other):
new_data.extend(self)
return new_data
else:
raise TypeError("Can only multiply Array instances by an integer")
else:
raise TypeError("Can only multiply Array by an integer")
def __rmul__(self, other):
return self.__mul__(other)
def __eq__(self, other):
if self.isHash is None:
if hasattr(other, 'isHash') and other.isHash is None:
return True
try:
return '' == other
except Exception:
pass
try:
return 0 == other
except Exception:
pass
try:
return 0 == len(other)
except Exception:
pass
if other is None:
return True
return False
try:
if(len(self) != len(other)):
return False
i1 = iter(self)
i2 = iter(other)
while True:
if next(i1) != next(i2):
return False
return True
except StopIteration:
return True
except Exception:
return False
def __lt__(self, other):
if self.isHash is None:
if hasattr(other, 'isHash') and other.isHash is None:
return False
try:
return '' < other
except Exception:
pass
try:
return 0 < other
except Exception:
pass
try:
return 0 < len(other)
except Exception:
pass
if other is None:
return False
return False
try:
i1 = iter(self)
i2 = iter(other)
while True:
ni1 = next(i1)
try:
ni2 = next(i2)
except StopIteration:
return False
if ni1 < ni2:
return True
elif ni1 > ni2:
return False
except StopIteration:
try:
next(i2)
except StopIteration:
return False
return True
except Exception:
return False
def __ne__(self, other):
return not self == other
def __le__(self, other):
return self < other or self == other
def __ge__(self, other):
return not self < other
def __gt__(self, other):
return not (self < other or self == other)
def __contains__(self, key):
if self.isHash:
return key in self.keys()
else:
return any(key == self[i] for i in range(len(self)))
[docs]def ArrayHash(init=None,isHash=None):
"""Acts like an array or hash with autovivification"""
result = _ArrayHash(ArrayHash,isHash=isHash)
if init is not None:
if isinstance(init, _ArrayHash) or '_ArrayHash' in str(type(init)):
if init.isHash:
result.update(init)
else:
result.extend(init)
elif isinstance(init, collections.abc.Mapping):
result.update(init)
elif isinstance(init, collections.abc.Iterable) and not isinstance(init, str):
result.extend(init)
else:
result.append(init)
return result
def _partialclass(cls, *args, **kwds):
class UNIVERSAL(cls):
__init__ = functools.partialmethod(cls.__init__, *args, **kwds)
return UNIVERSAL
_ArrayHashClass = _partialclass(_ArrayHash, ArrayHash)
[docs]def Hash(init=None):
"""Hash with autovivification"""
return ArrayHash(init, isHash=True)
[docs]def abspath(*args):
"""Implementation of perl Cwd::abs_path function"""
if len(args) == 0:
return os.path.abspath(os.getcwd())
return os.path.abspath(args[0])
[docs]def add_element(base, index, value):
"""Implementation of += on an array element"""
try:
base[index] += value
except TypeError:
if isinstance(value, int) or isinstance(value, float):
base[index] = num(base[index]) + value
elif value is None:
base[index] = num(base[index])
else:
base[index] = num(base[index]) + num(value)
return base[index]
[docs]def add_path(path_list):
"""Add a path_list to the system path list"""
sys.path[0:0] = path_list
[docs]def add_tie_call(func, package):
"""Add a call to _tie_call for functions defined in a tie package"""
def tie_call_func(*args, **kwargs):
__package__ = package # for caller() only
return tie_call(func, args, kwargs)
return tie_call_func
[docs]def add_tie_methods(obj):
"""Create a subclass for the object and add the methods to it to implement 'tie', like __getitem__ etc. The call to this functions is generated on any 'return' statement (or implicit return) in TIEHASH or TIEARRAY"""
try:
cls = obj.__class__
except Exception: # Not an object, so just ignore
return obj
is_scalar = is_hash = is_array = False
if hasattr(cls, 'TIESCALAR'):
is_scalar = True
if hasattr(cls, 'TIEARRAY'):
is_array = True
if hasattr(cls, 'TIEHASH'):
is_hash = True
if not is_array and not is_hash and not is_scalar:
return obj
elif hasattr(cls, '__TIE_subclass__'):
obj.__class__ = cls.__TIE_subclass__
return obj
classname = cls.__name__
result = type(classname, (cls,), Hash() if hasattr(cls, 'isHash') else dict())
if is_scalar:
def __get__(self, obj, objtype=None):
return self.FETCH()
result.__get__ = __get__
def __set__(self, obj, value):
return self.STORE(value)
result.__set__ = __set__
if hasattr(result, 'DELETE'):
setattr(result, _TIE_MAP['DELETE'], getattr(result, 'DELETE'))
if hasattr(result, 'UNTIE'):
setattr(result, _TIE_MAP['UNTIE'], getattr(result, 'UNTIE'))
else:
setattr(result, _TIE_MAP['UNTIE'], lambda self: None)
cls.__TIE_subclass__ = result
obj.__class__ = result
return obj
for m, p in _TIE_MAP.items():
if hasattr(result, m):
setattr(result, p, getattr(result, m))
elif p != '__del__' and p != '__len__': # Don't define __del__ unless they define DELETE, __len__ could be SCALAR or FETCHSIZE
setattr(result, p, eval(f'lambda *_args: raise_(Die(\'Can\\\'t locate object method "{m}" via package "{classname}"\'))'))
if not hasattr(result, 'SCALAR') and not hasattr(result, 'FETCHSIZE'):
setattr(result, _TIE_MAP['SCALAR'], eval(f'lambda *_args: raise_(Die(\'Can\\\'t locate object method SCALAR or FETCHSIZE via package "{classname}"\'))'))
# Always generate an __untie__ method unless we generated it above
if not hasattr(result, 'UNTIE'):
setattr(result, _TIE_MAP['UNTIE'], lambda self: None)
result.__bool__ = lambda self: True
if is_array:
if hasattr(result, 'POP') and hasattr(result, 'SHIFT'):
result.pop = lambda *_args: _args[0].POP() if len(_args) == 1 else _args[0].SHIFT()
else:
def pop(self, ndx=-1):
result = self.FETCH(ndx)
self.DELETE(ndx)
return result
setattr(result, 'pop', pop)
result.extend = lambda self, lst: [self.PUSH(l) for l in lst]
def __getitem__(self, index):
if isinstance(index, int):
if index < 0:
index += len(self)
return self.FETCH(index)
elif isinstance(index, slice):
return Array([self[i] for i in range(*index.indices(len(self)))])
else:
return self.FETCH(index)
result.__getitem__ = __getitem__
def __setitem__(self, index, value):
if isinstance(index, int):
if index < 0:
index += len(self)
return self.STORE(index, value)
elif isinstance(index, slice):
if index.start is not None:
for i in range(len(self), index.start):
self.STORE(i, None)
value = iter(value)
ndx = index.start if index.start is not None else 0
j = None
for i in range(*index.indices(len(self))):
try:
self.STORE(i, next(value))
except StopIteration:
if j is None:
j = i
self.pop(j)
ndx += 1
rest = list(value)
lr = len(rest)
if lr:
for i in range(len(self)-1,ndx-1,-1): # Move everything else up
self.STORE(i+lr, self.FETCH(i))
for i in range(lr):
self.STORE(i+ndx, rest[i])
else:
return self.STORE(index, value)
result.__setitem__ = __setitem__
def __delitem__(self, index):
if isinstance(index, int):
ls = len(self)
if not ls:
return
if index < 0:
index += len(self)
self.DELETE(index)
elif isinstance(index, slice):
for i in range(*index.indices(len(self))):
self.DELETE(i)
else:
try:
self.DELETE(index)
except (TypeError, KeyError):
self.DELETE(str(index))
result.__delitem__ = __delitem__
def get(self, index, default=None):
if index < 0:
index += len(self)
if index < 0 or index >= len(self):
return default
return self.FETCH(index)
result.get = get
def __iter__(self):
for i in range(self.SCALAR() if hasattr(self, 'SCALAR') else self.FETCHSIZE()):
yield self.FETCH(i)
result.__iter__ = __iter__
cls.__TIE_subclass__ = result
if is_hash:
if is_array:
def __iter__(self):
if self.isHash:
current_key = self.FIRSTKEY()
# Handle FIRSTKEY being implemented using each
if isinstance(current_key, collections.abc.Sequence) and not isinstance(current_key, str):
if len(current_key) == 0:
current_key = None
else:
current_key = current_key[0]
while current_key is not None:
yield current_key
current_key = self.NEXTKEY()
if isinstance(current_key, collections.abc.Sequence) and not isinstance(current_key, str):
if len(current_key) == 0:
current_key = None
else:
current_key = current_key[0]
else:
for i in range(self.SCALAR() if hasattr(self, "SCALAR") else self.FETCHSIZE()):
yield self.FETCH(i)
result.__iter__ = __iter__
else:
def __iter__(self):
current_key = self.FIRSTKEY()
# Handle FIRSTKEY being implemented using each
if isinstance(current_key, collections.abc.Sequence) and not isinstance(current_key, str):
if len(current_key) == 0:
current_key = None
else:
current_key = current_key[0]
while current_key is not None:
yield current_key
current_key = self.NEXTKEY()
if isinstance(current_key, collections.abc.Sequence) and not isinstance(current_key, str):
if len(current_key) == 0:
current_key = None
else:
current_key = current_key[0]
result.__iter__ = __iter__
if is_array:
def pop(*args):
self = args[0]
if len(args) == 1:
key = -1
else:
key = args[1]
if isinstance(key, int): # Array style
result = self.FETCH(key)
self.DELETE(key)
return result
if not self.EXISTS(key): # Hash style
if len(args) >= 2:
return args[2] # default
return None # default default
return self.DELETE(key)
result.pop = pop
def get(self, key, default=None):
if isinstance(key, int): # Array style
if key < 0:
key += len(self)
if key < 0 or key >= len(self):
return default
else: # Hash style
if not self.EXISTS(key):
return default
return self.FETCH(key)
result.get = get
else:
def pop(self, key, default=None):
if not self.EXISTS(key):
return default
return self.DELETE(key)
result.pop = pop
def get(self, key, default=None):
if not self.EXISTS(key):
return default
return self.FETCH(key)
result.get = get
result.keys = lambda self: [k for k in self]
result.values = lambda self: [self[k] for k in self]
result.items = lambda self: [(k, self[k]) for k in self]
result.update = lambda self, items: {self.STORE(i, items[i]) for i in items}
cls.__TIE_subclass__ = result
obj.__class__ = result
return obj
[docs]def and_element(base, index, value):
base[index] &= value
return base[index]
[docs]def assign_global(packname, varname, value):
"""Assigns a value to a package global variable and returns the value"""
namespace = getattr(builtins, packname)
setattr(namespace, varname, value)
return value
[docs]def assign_hash(h, keys, values):
"""Assign a hash with a list of hash keys and a list of values"""
keys = list(keys)
values = list(values)
if len(keys) == len(values):
for i in range(len(keys)):
h[_str(keys[i])] = values[i]
else:
for i in range(len(keys)):
h[_str(keys[i])] = values[i] if i < len(values) else None
return h
[docs]def assign_sparse(lst, indexes, values):
"""Assign a list with a sparse list of indexes and a list of values"""
if len(indexes) == len(values):
for i in range(len(indexes)):
lst[int_(indexes[i])] = values[i]
else:
for i in range(len(indexes)):
lst[int_(indexes[i])] = values[i] if i < len(values) else None
return lst
[docs]def autoflush(self, arg=1):
"""Method added to FH to support OO perl"""
orig = self._autoflush if hasattr(self, '_autoflush') else 0
self._autoflush = arg
if arg:
self._orig_writelines = self.writelines
def new_writelines(self, lines):
self._orig_writelines(lines)
self.flush()
self.writelines = types.MethodType(new_writelines, self)
if hasattr(self, 'write'):
self._orig_write = self.write
def new_write(self, b):
result = self._orig_write(b)
self.flush()
return result
self.write = types.MethodType(new_write, self)
elif hasattr(self, '_orig_writelines'):
self.writelines = self._orig_writelines
if hasattr(self, '_orig_write'):
self.write = self._orig_write
return orig
[docs]def basename(path, *suffixes):
"""Implementation of perl basename function"""
path = re.sub(r'(.)/*$', r'\1', path, flags=re.S)
[basename, dirname, suffix] = fileparse(path, *map(re.escape, suffixes))
if len(suffix) and not len(basename):
basename = suffix
if not len(basename):
basename = dirname
return basename
[docs]def binmode(fh,mode='b',encoding=None,errors=None,newline=None):
"""Handle binmode"""
global OS_ERROR, TRACEBACK, AUTODIE
try:
omode = ''
fno = None
try:
fno = fh.fileno()
fh.flush() # could be a closed file
omode = fh.mode # could not have a mode
except Exception:
pass
if mode is None:
mode = omode.replace('b', '')
else:
mode = omode + mode
if encoding is None and 'b' not in mode:
encoding = fh.encoding
if errors is None and 'b' not in mode:
errors = fh.errors
if fno is None:
result = io.TextIOWrapper(io.BufferedIOBase(), encoding=encoding, errors=errors, newline=newline)
else:
result = os.fdopen(os.dup(fno), mode, encoding=encoding, errors=errors, newline=newline)
if hasattr(fh, 'filename') and hasattr(fh, '_name'): # from tempfile
result.filename = fh.filename
result._name = fh._name
if hasattr(fh, '_autoflush'):
result._autoflush = fh._autoflush
if hasattr(fh, 'autoflush'):
result.autoflush = types.MethodType(autoflush, result)
if hasattr(fh, 'say'): # from IO::File
return _create_all_fh_methods(result)
return result
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
cluck(f"binmode failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
return None
[docs]def binmode_dynamic(fh, mode):
"""Handle binmode where the mode/layers are dynamic"""
encoding = None
errors = None
newline = None
mmode = mode
ext = None
if ':' in mode:
mode, ext = mode.split(':')
if mode in _OPEN_MODE_MAP:
mode = _OPEN_MODE_MAP[mode]
else:
mode = 'r'
if ext:
if ext == 'raw' or ext == 'bytes':
mode += 'b'
elif ext.startswith('encoding('):
encoding = ext.replace('encoding(','').replace(')','')
errors = 'replace'
elif ext == 'utf8':
encoding = 'UTF-8'
errors = 'ignore'
return binmode(fh, mode, encoding=encoding, errors=errors, newline=newline)
[docs]def bless(obj, classname, isa=()):
"""Create an object for obj in classname"""
if isinstance(classname, str):
classname = classname.replace("'", '.').replace('::', '.')
else:
if hasattr(classname, '__name__'): # They sent us the class object
classname = classname.__name__
elif hasattr(classname, '__class__'): # They sent us an instance
classname = classname.__class__.__name__
if not hasattr(builtins, classname):
init_package(classname, is_class=True, isa=isa)
result_class = getattr(builtins, classname)
result = result_class()
self = result
if hasattr(obj, 'isHash'):
if obj.isHash:
result.isHash = True
for key, value in obj.items():
self[key] = value
else:
result.isHash = False
for i, value in enumerate(obj):
self[i] = value
elif isinstance(obj, collections.abc.Mapping):
for key, value in obj.items():
self[key] = value
elif isinstance(obj, collections.abc.Iterable) and not isinstance(obj, str):
for i, value in enumerate(obj):
self[i] = value
elif WARNING:
carp(f"'bless' {classname} not implemented on {type(obj)} type object")
return result
[docs]def blessed(r):
"""blessed function in perl"""
_ref_map = {"<class 'int'>": 'SCALAR', "<class 'str'>": 'SCALAR',
"<class 'float'>": 'SCALAR', "<class 'NoneType'>": 'SCALAR',
"<class 'list'>": 'ARRAY', "<class 'tuple'>": 'ARRAY',
"<class 'function'>": 'CODE', "<class 'dict'>": 'HASH'}
tr = type(r)
t = str(tr)
if t in _ref_map:
return None
elif '_ArrayHash' in t:
return None
if hasattr(tr, '__name__'):
return tr.__name__.replace('.', '::')
return t.replace("<class '", '').replace("'>", '').replace('.', '::')
[docs]def caller(expr=None):
""" Implementation of caller function in perl"""
try:
level = 2 if expr is None else (max(int(expr),0)+2)
cur = 2
get_level = 2
last_level = 1
while True:
fr = sys._getframe(get_level)
if fr.f_code.co_name != '_tie_call' and \
fr.f_code.co_name != 'tie_call' and \
fr.f_code.co_name != '_tie_call_func' and \
fr.f_code.co_name != 'tie_call_func' and \
fr.f_code.co_name != '<lambda>' and \
not '__goto_sub__' in fr.f_locals and \
not re.match(r'^_f\d+[a-z]?$', fr.f_code.co_name):
cur += 1
if(cur > level):
break
last_level = get_level
get_level += 1
def get_package(fr, get_level):
fr_tcf = None
try:
nfr = sys._getframe(get_level+2)
if nfr.f_code.co_name == '_tie_call' or \
nfr.f_code.co_name == 'tie_call':
nfr = sys._getframe(get_level+3)
if nfr.f_code.co_name == '_tie_call_func' or \
nfr.f_code.co_name == 'tie_call_func' or \
nfr.f_code.co_name == '<lambda>':
if '__package__' in nfr.f_locals:
# We have a variable __package__ defined in _tie_call_func for this purpose
# The reason we have that is that all _tie_call_func's code pointers are
# the same and we can't tell them apart.
if hasattr(nfr.f_locals['__package__'], '__PACKAGE__'):
return nfr.f_locals['__package__'].__PACKAGE__
fr_tcf = nfr
except Exception as e:
pass
package = None
try:
#callable_obj = fr.f_globals[fr.f_code.co_name]
callable_obj = fr.f_code
for pack in builtins.__packages__:
namespace = getattr(builtins, pack)
for key in namespace.__dict__:
func = namespace.__dict__[key]
if hasattr(func, '__code__'):
code = func.__code__
if code == callable_obj:
package = pack
break
if fr_tcf and code == fr_tcf.f_code:
package = pack
break
if hasattr(func, '__func__'): # e.g. MethodType
if func.__func__.__code__ == callable_obj:
package = pack
break
if package is not None:
break
else:
raise Exception(f"Couldn't find {callable_obj} in {builtins.__packages__}")
except Exception as e:
package = 'main'
if '__PACKAGE__' in fr.f_builtins:
package = fr.f_builtins['__PACKAGE__']
return package
package = get_package(fr, get_level)
filename = fr.f_code.co_filename
if filename == '<string>': # Running with pdb
raise ValueError
if sys.platform == 'win32':
if os.getcwd().lower() == os.path.dirname(filename).lower():
filename = os.path.basename(filename)
else:
if os.getcwd() == os.path.dirname(filename):
filename = os.path.basename(filename)
if expr is None:
return [package, filename, fr.f_lineno]
cfr = sys._getframe(last_level)
while re.match(r'^_f\d+[a-z]?$', cfr.f_code.co_name):
last_level += 1
cfr = sys._getframe(last_level)
cpackage = get_package(cfr, last_level)
wantarray = ''
argvalues = inspect.formatargvalues(*inspect.getargvalues(cfr))
if re.search(r'wantarray=True', argvalues):
wantarray = 1
return [package, filename, fr.f_lineno,
f"{cpackage}.{cfr.f_code.co_name}", 1, wantarray,
'', 0, 0, 0, 0]
except ValueError:
if expr is None:
return [None, None, None]
else:
return [None, None, None, None, None, None, None, None, None, None, None]
[docs]def caller_s(expr=None):
""" Implementation of caller function in scalar context"""
result = caller(1 if expr is None else (max(int(expr),0)+1))
if result is None:
return result
return result[0]
[docs]def can(self, methodname):
"""Implementation of CLASS::can and $obj->can"""
if self is None:
return None
if isinstance(self, str):
if hasattr(builtins, self):
self = getattr(builtins, self)
if hasattr(self, methodname):
method = getattr(self, methodname)
if callable(method):
return method
if methodname == 'can':
return can
if methodname == 'isa':
return isa
return None
[docs]def carp(*args,skip=1):
"""Warn with no backtrace"""
if TRACEBACK:
print(longmess(*args, skip=skip), end='', file=sys.stderr)
else:
print(shortmess(*args, skip=skip), end='', file=sys.stderr)
return 1
[docs]def cgtime(secs=None):
"""Replacement for perl built-in gmtime function in scalar context"""
return tm_py.asctime(tm_py.gmtime(secs))
[docs]def chdir(d):
"""Implementation of perl chdir"""
global AUTODIE, TRACEBACK, OS_ERROR
try:
os.chdir(d)
return 1
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
cluck(OS_ERROR,skip=2)
if AUTODIE:
raise
return ''
[docs]def chmod(mode, *argv):
"""Implementation of perl chmod function"""
result = 0
for arg in argv:
try:
os.chmod(arg, mode)
result += 1
except Exception:
pass
return result
[docs]def chomp_element(base, index, value):
"""Implementation of perl = and chomp on an array element"""
if value is None:
value = ''
if INPUT_RECORD_SEPARATOR is None or isinstance(INPUT_RECORD_SEPARATOR, int):
base[index] = value
return 0
if INPUT_RECORD_SEPARATOR == '':
chomped_value = value.rstrip("\n")
else:
chomped_value = value.rstrip(INPUT_RECORD_SEPARATOR)
base[index] = chomped_value
return len(value) - len(base[index])
[docs]def chomp_global(packname, varname, value):
"""Assigns a value to a package global variable, does a chomp and returns the number of chars chopped"""
namespace = getattr(builtins, packname)
if value is None:
value = ''
if INPUT_RECORD_SEPARATOR is None or isinstance(INPUT_RECORD_SEPARATOR, int):
setattr(namespace, varname, value)
return 0
if INPUT_RECORD_SEPARATOR == '':
chomped_value = value.rstrip("\n")
else:
chomped_value = value.rstrip(INPUT_RECORD_SEPARATOR)
setattr(namespace, varname, chomped_value)
return len(value) - len(chomped_value)
[docs]def chomp_with_result(var):
"""Implementation of chomp where the count of chars removed is needed. Returns a tuple
of (result, count)."""
count = 0
if var is None:
var = ''
if INPUT_RECORD_SEPARATOR is None or isinstance(INPUT_RECORD_SEPARATOR, int):
return (var, 0)
if (hasattr(var, 'isHash') and var.isHash) or (not hasattr(var, 'isHash') and isinstance(var, collections.abc.Mapping)):
for k, v in var.items():
(var[k], cnt) = chomp_with_result(v)
count += cnt
return (var, count)
if isinstance(var, collections.abc.Iterable) and not isinstance(var, str):
for i, v in enumerate(var):
(var[i], cnt) = chomp_with_result(v)
count += cnt
return (var, count)
var = str(var)
if INPUT_RECORD_SEPARATOR == '':
result = var.rstrip("\n")
elif var.endswith(INPUT_RECORD_SEPARATOR):
result = var[0:-len(INPUT_RECORD_SEPARATOR)]
else:
result = var
return (result, len(var) - len(result))
[docs]def chop_element(base, index, value):
"""Implementation of perl = and chop on an array element"""
if value is None:
value = ''
result = value[-1:]
base[index] = value[0:-1]
return result
[docs]def chop_global(packname, varname, value):
"""Assigns a value to a package global variable, does a chop and returns the value chopped"""
namespace = getattr(builtins, packname)
if value is None:
value = ''
result = value[-1:]
setattr(namespace, varname, value[0:-1])
return result
[docs]def chop_without_result(var):
"""Implementation of chop where the last char removed is not needed. Returns a tuple
of (result, '')."""
if var is None:
var = ''
if (hasattr(var, 'isHash') and var.isHash) or (not hasattr(var, 'isHash') and isinstance(var, collections.abc.Mapping)):
for k, v in var.items():
(var[k], _) = chop_without_result(v)
return (var, '')
if isinstance(var, collections.abc.Iterable) and not isinstance(var, str):
for i, v in enumerate(var):
(var[i], _) = chop_without_result(v)
return (var, '')
var = str(var)
result = var[0:-1]
return (result, '')
[docs]def chop_with_result(var):
"""Implementation of chop where the last char removed is needed. Returns a tuple
of (result, last_c)."""
if var is None:
var = ''
if (hasattr(var, 'isHash') and var.isHash) or (not hasattr(var, 'isHash') and isinstance(var, collections.abc.Mapping)):
for k, v in var.items():
(var[k], last_c) = chop_with_result(v)
return (var, last_c)
if isinstance(var, collections.abc.Iterable) and not isinstance(var, str):
for i, v in enumerate(var):
(var[i], last_c) = chop_with_result(v)
return (var, last_c)
var = str(var)
last_c = var[-1:]
result = var[0:-1]
return (result, last_c)
[docs]def clone_encoding(encoding):
"""Implementation of Encoding::clone_encoding"""
obj = find_encoding(encoding)
if obj is None:
return obj
return copy.deepcopy(obj)
[docs]def closedir(DIR):
"""Implementation of perl closedir"""
DIR[0] = None
DIR[1] = None
[docs]def close_(fh):
"""Implementation of perl close"""
global AUTODIE, TRACEBACK, OS_ERROR, TRACE_RUN
try:
if hasattr(fh, '_sp'): # issue 72: subprocess
fh.flush()
fh._sp.communicate()
if TRACE_RUN:
sp = subprocess.CompletedProcess(f"open({fh._file})", fh._sp.returncode)
carp(f'trace close({fh._file}): {repr(sp)}', skip=2)
fh.close()
if fh._sp.returncode:
raise IOError(f"close({fh._file}): failed with {fh._sp.returncode}")
return 1
if fh is None:
raise TypeError(f"close(None): failed")
#if WARNING and fh.closed:
#carp(f"close failed: Filehandle is already closed", skip=2)
fh.close()
return 1
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
cluck(OS_ERROR,skip=2)
if AUTODIE:
raise
return ''
[docs]def cluck(*args,skip=1):
"""Warn with stack backtrace"""
print(longmess(*args, skip=skip), end='', file=sys.stderr)
return 1
[docs]def cmp(a,b):
"""3-way comparison like the cmp operator in perl"""
if a is None:
a = ''
elif hasattr(a, '__cmp__'):
return a.__cmp__(b)
if b is None:
b = ''
elif hasattr(b, '__rcmp__'):
return b.__rcmp__(a)
a = str(a)
b = str(b)
return (a > b) - (a < b)
[docs]def concat_element(base, index, value):
"""Implementation of perl .= on an array element"""
try:
base[index] += value
except TypeError:
if value is None:
if base[index] is None:
base[index] = ''
else:
base[index] = str(base[index])
else:
if base[index] is None:
base[index] = str(value)
else:
base[index] = str(base[index]) + str(value)
return base[index]
[docs]def confess(*args,skip=1):
"""Error with stack backtrace"""
if TRACEBACK:
raise Die(longmess(*args, skip=skip),suppress_traceback=True)
raise Die(longmess(*args, skip=skip))
[docs]def croak(*args,skip=1):
"""Error with no backtrace"""
if TRACEBACK:
raise Die(longmess(*args, skip=skip),suppress_traceback=True)
raise Die(shortmess(*args, skip=skip))
[docs]def curdir():
"""Implementation of File::Spec->curdir"""
return '.'
init_package('Encode')
[docs]def FB_DEFAULT():
return 0
Encode.FB_DEFAULT = FB_DEFAULT
[docs]def FB_CROAK():
return 1
Encode.FB_CROAK = FB_CROAK
[docs]def FB_QUIET():
return 4
Encode.FB_QUIET = FB_QUIET
Encode.FB_WARN = FB_WARN
[docs]def LEAVE_SRC():
return 8
Encode.LEAVE_SRC = LEAVE_SRC
[docs]def decode(encoding, octets, check=None):
"""Implementation of Encode::decode"""
# NOTE: Changes to this function should also be make in decode_utf8.py
if check is None:
check = 0
elif callable(check):
name = str(id(check))
try:
codecs.lookup_error(name)
except LookupError:
def handler(uee):
slc = uee.object[uee.start:uee.end]
if not isinstance(slc, bytes):
slc = bytes(slc,encoding='latin-1')
return (check(*slc), uee.end)
codecs.register_error(name, handler)
return octets.encode('latin-1').decode(encoding, errors=name)
else:
check = int_(check)
if check & Encode.FB_CROAK():
try:
return octets.encode('latin-1').decode(encoding)
except Exception as e:
croak(str(e))
elif (check & 7) == Encode.FB_WARN():
try:
s = octets.encode('latin-1')
return s.decode(encoding)
except UnicodeError as e:
print(str(e), file=sys.stderr)
return s[:e.start].decode(encoding)
elif (check & 7) == Encode.FB_QUIET():
# NOTE: To keep from having to change the user's string (which is difficult
# in python, since they are immutable), we instead keep track of the status
# of the current decode as an attribute of this function. We avoid using this status
# in a completely different decode by checking the first 16-bytes of the
# string being decoded, and also ensuring that the string being passed keeps growing.
# This may not be 100% effective in all cases, but it does pass a fairly
# comprehensive test (test_Encode.py)
try:
s = octets.encode('latin-1')
orig_s = s
if hasattr(decode, 'start'):
# Quick sanity check that we're still decoding the same data
verify_chars = 16
string = decode.string
check_len = min(verify_chars, decode.start)
ln = len(s)
if(ln == 1 or ln < decode.start or s[:check_len] != string[:check_len]):
delattr(decode, 'start')
else:
s = s[decode.start:]
result = s.decode(encoding)
decode.start = len(orig_s)
decode.string = orig_s
return result
except UnicodeError as e:
prior_start = 0
if e.reason.startswith('unexpected end'):
if hasattr(decode, 'start'):
prior_start = decode.start
decode.start += e.start
else:
decode.start = e.start
decode.string = orig_s
elif hasattr(decode, 'start'):
delattr(decode, 'start')
return orig_s[prior_start:decode.start].decode(encoding)
else:
return octets.encode('latin-1').decode(encoding, errors='replace')
[docs]def decode_utf8(octets, check=None):
"""Implementation of Encode::decode_utf8"""
# Note: The code here is mostly a copy of decode.py
if check is None:
check = 0
elif callable(check):
name = str(id(check))
try:
codecs.lookup_error(name)
except LookupError:
def handler(uee):
slc = uee.object[uee.start:uee.end]
if not isinstance(slc, bytes):
slc = bytes(slc,encoding='latin-1')
return (check(*slc), uee.end)
codecs.register_error(name, handler)
return octets.encode('latin-1').decode(errors=name)
else:
check = int_(check)
if check & Encode.FB_CROAK():
try:
return octets.encode('latin-1').decode()
except Exception as e:
croak(str(e))
elif (check & 7) == Encode.FB_WARN():
try:
s = octets.encode('latin-1')
return s.decode()
except UnicodeError as e:
print(str(e), file=sys.stderr)
return s[:e.start].decode()
elif (check & 7) == Encode.FB_QUIET():
# NOTE: To keep from having to change the user's string (which is difficult
# in python, since they are immutable), we instead keep track of the status
# of the current decode as an attribute of this function. We avoid using this status
# in a completely different decode_utf8 by checking the first 16-bytes of the
# string being decoded, and also ensuring that the string being passed keeps growing.
# This may not be 100% effective in all cases, but it does pass a fairly
# comprehensive test (test_Encode.py)
try:
s = octets.encode('latin-1')
orig_s = s
if hasattr(decode_utf8, 'start'):
# Quick sanity check that we're still decoding the same data
verify_chars = 16
string = decode_utf8.string
check_len = min(verify_chars, decode_utf8.start)
ln = len(s)
if(ln == 1 or ln < decode_utf8.start or s[:check_len] != string[:check_len]):
delattr(decode_utf8, 'start')
else:
s = s[decode_utf8.start:]
result = s.decode()
decode_utf8.start = len(orig_s)
decode_utf8.string = orig_s
return result
except UnicodeError as e:
prior_start = 0
if e.reason.startswith('unexpected end'):
if hasattr(decode_utf8, 'start'):
prior_start = decode_utf8.start
decode_utf8.start += e.start
else:
decode_utf8.start = e.start
decode_utf8.string = orig_s
elif hasattr(decode_utf8, 'start'):
delattr(decode_utf8, 'start')
return orig_s[prior_start:decode_utf8.start].decode()
else:
return octets.encode('latin-1').decode(errors='replace')
[docs]def define_alias(alias, name):
"""Implementation of Encode::define_alias"""
def norm(n):
return re.sub(r'[-\s]+', '_', n).lower()
if alias == norm(name):
return
try:
info = codecs.lookup(name)
result = codecs.CodecInfo(
name=alias,
encode=info.encode,
decode=info.decode)
if hasattr(info, '_obj'):
result._obj = info._obj
except LookupError:
result = None
alias = norm(alias)
codecs.register(lambda a: result if a == alias else None)
[docs]def define_encoding(obj, name, aliases):
"""Implementation of Encode::define_encoding"""
def norm(n):
return re.sub(r'[-\s]+', '_', n).lower()
def encode(s, errors='strict'):
l = len(s)
s = obj.encode(s)
return (bytes(s, encoding='latin1'), l)
def decode(b, errors='strict'):
l = len(b)
s = str(b, encoding='latin1')
return (obj.decode(s), l)
result = codecs.CodecInfo(name=name, encode=encode, decode=decode)
result._obj = obj
name = norm(name)
codecs.register(lambda n: result if n == name else None)
for alias in aliases:
define_alias(alias, obj.name())
return obj
[docs]def die(*args, skip=None):
"""Handle die in perl"""
global INPUT_LINE_NUMBER, _INPUT_FH_NAME, EVAL_ERROR
def is_func_in_call_stack(func): # Die handlers are turned off inside themselves
frame = sys._getframe(2)
while frame is not None:
if func.__code__ == frame.f_code:
return True
frame = frame.f_back
return False
if hasattr(builtins, 'CORE') and hasattr(builtins.CORE, 'GLOBAL') and \
hasattr(builtins.CORE.GLOBAL, 'die') and callable(builtins.CORE.GLOBAL.die) and not \
is_func_in_call_stack(builtins.CORE.GLOBAL.die):
return builtins.CORE.GLOBAL.die(*args)
args = list(map(_str, args))
if len(args) == 0 or len(''.join(args)) == 0:
args = ["Died"]
try:
if EVAL_ERROR or hasattr(EVAL_ERROR, 'PROPAGATE'):
if hasattr(EVAL_ERROR, 'PROPAGATE') and callable(EVAL_ERROR.PROPAGATE):
(_, fn, lno) = caller()
try:
EVAL_ERROR = EVAL_ERROR.PROPAGATE(fn, lno)
args = [EVAL_ERROR]
except Exception:
args = [EVAL_ERROR, "\t...propagated"]
else:
args = [EVAL_ERROR, "\t...propagated"]
except Exception:
pass
if "\n" not in args[-1]:
(_, fn, lno, *_) = caller() if skip is None else caller(skip)
iln = None
ifn = None
try:
iln = fileinput.lineno()
ifn = '<fileinput>'
except RuntimeError:
iln = INPUT_LINE_NUMBER
if _INPUT_FH_NAME:
ifn = f"<{_INPUT_FH_NAME}>"
if iln and ifn:
args.append(f" at {fn} line {lno}, {ifn} line {iln}.\n")
else:
args.append(f" at {fn} line {lno}.\n")
arg = ''.join(args)
if callable(SIG_DIE_HANDLER) and not is_func_in_call_stack(SIG_DIE_HANDLER):
SIG_DIE_HANDLER(arg)
orig_excepthook = sys.excepthook
def excepthook(typ, value, traceback):
if TRACEBACK:
orig_excepthook(typ, value, traceback)
else:
print(value, end='', file=sys.stderr)
if (m := re.search(r'\[Errno (\d+)\]', str(value))):
sys.exit(int(m.group(1)))
if CHILD_ERROR>>8:
sys.exit(CHILD_ERROR>>8)
sys.exit(255)
sys.excepthook = excepthook
raise Die(arg)
[docs]def dirname(fullname):
"""Emulation of File::Basename qw(dirname) for unix"""
def fileparse(fullname):
[dirpath,basename] = (_m:=re.search(re.compile(r'^(.*/)?(.*)',re.S),fullname),_m.groups() if _m else [None,None])[1]
if not (dirpath):
dirpath = './'
return (basename, dirpath)
[basename, dirname] = fileparse(fullname)
dirname = re.sub(r'(.)/*$', r'\1', dirname, flags=re.S)
if not len(basename):
[basename, dirname] = fileparse(dirname)
dirname = re.sub(r'(.)/*$', r'\1', dirname, flags=re.S)
return dirname
[docs]def divide_element(base, index, value):
base[index] /= value
return base[index]
init_package('Data.Dumper')
Data.Dumper.Indent_v = 2 # InIt
Data.Dumper.Trailingcomma_v = False # InIt
Data.Dumper.Purity_v = 0 # InIt
Data.Dumper.Pad_v = '' # InIt
Data.Dumper.Varname_v = "VAR" # InIt
Data.Dumper.Useqq_v = 0 # InIt
Data.Dumper.Terse_v = False # InIt
Data.Dumper.Freezer_v = '' # InIt
Data.Dumper.Toaster_v = '' # InIt
Data.Dumper.Deepcopy_v = 0 # InIt
Data.Dumper.Quotekeys_v = 1 # InIt
Data.Dumper.Bless_v = 'bless' # InIt
Data.Dumper.Pair_v = ':' # InIt
Data.Dumper.Maxdepth_v = 0 # InIt
Data.Dumper.Maxrecurse_v = 1000 # InIt
Data.Dumper.Useperl_v = 0 # InIt
Data.Dumper.Sortkeys_v = 0 # InIt
Data.Dumper.Deparse_v = False # InIt
Data.Dumper.Sparseseen_v = False # InIt
[docs]def Dumper(*args):
"""Implementation of Data::Dumper"""
result = []
pp = pprint.PrettyPrinter(indent=Data.Dumper.Indent_v,
depth=None if Data.Dumper.Maxdepth_v==0 else Data.Dumper.Maxdepth_v,
compact=Data.Dumper.Terse_v,
sort_dicts=Data.Dumper.Sortkeys_v)
for i, arg in enumerate(args, start=1):
if Data.Dumper.Terse_v:
result.append(f"{Data.Dumper.Pad_v}" + pp.pformat(arg))
else:
result.append(f"{Data.Dumper.Pad_v}{Data.Dumper.Varname_v}{i} = " + pp.pformat(arg))
spacer = " " if Data.Dumper.Indent_v == 0 else "\n"
return spacer.join(result)
[docs]def dup(file,mode,checked=True,equals=False,encoding=None,errors=None):
"""Replacement for perl built-in open function when the mode contains '&'. Keyword arg
'checked' means the result will be checked. Keyword arg 'equals' means that '&=' was specified,
so skip the os.dup operation."""
global OS_ERROR, TRACEBACK, AUTODIE
try:
if isinstance(file, io.IOBase): # file handle
file.flush()
if encoding is None:
encoding = file.encoding
if errors is None:
errors = file.errors
if equals:
return os.fdopen(file.fileno(), mode, encoding=encoding, errors=errors)
return os.fdopen(os.dup(file.fileno()), mode, encoding=encoding, errors=errors)
if isinstance(file, int):
pass
elif (_m:=re.match(r'=?(\d+)', file)):
file = int(_m.group(1))
elif file in _DUP_MAP:
file = _DUP_MAP[file]
if equals:
return _create_fh_methods(os.fdopen(file, mode, encoding=encoding, errors=errors))
return _create_fh_methods(os.fdopen(os.dup(file), mode, encoding=encoding, errors=errors))
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
cluck(f"dup failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
if checked:
return None
fh = io.StringIO()
fh.close()
return _create_fh_methods(fh)
[docs]def each(h_a):
"""See https://perldoc.perl.org/functions/each"""
key = str(id(h_a)) # Unique memory address of object
if not hasattr(each, key):
setattr(each, key, iter(h_a))
it = getattr(each, key)
try:
v = next(it)
except StopIteration:
setattr(each, key, iter(h_a))
return []
if hasattr(h_a, 'TIEHASH') or \
((hasattr(h_a, 'keys') and not hasattr(h_a, 'isHash')) or
(hasattr(h_a, 'isHash') and h_a.isHash)):
return [v, h_a[v]]
ndx_key = key + 'i'
i = 0;
if hasattr(each, ndx_key):
i = getattr(each, ndx_key)
setattr(each, ndx_key, i+1)
return [i, v]
[docs]def encode(encoding, string, check=None):
"""Implementation of Encode::encode"""
if check is None:
check = 0
elif callable(check):
name = str(id(check))
try:
codecs.lookup_error(name)
except LookupError:
def handler(uee):
slc = uee.object[uee.start:uee.end]
if not isinstance(slc, bytes):
slc = bytes(slc,encoding='latin-1')
return (check(*slc), uee.end)
codecs.register_error(name, handler)
return string.encode(encoding, errors=name).decode('latin-1')
else:
check = int_(check)
if check & Encode.FB_CROAK():
try:
return string.encode(encoding).decode('latin-1')
except Exception as e:
croak(str(e))
elif (check & 7) == Encode.FB_WARN():
try:
return string.encode(encoding).decode('latin-1')
except UnicodeError as e:
print(str(e), file=sys.stderr)
return string[:e.start].encode(encoding).decode('latin-1')
elif (check & 7) == Encode.FB_QUIET():
try:
return string.encode(encoding).decode('latin-1')
except UnicodeError as e:
return string[:e.start].encode(encoding).decode('latin-1')
else:
return string.encode(encoding, errors='replace').decode('latin-1')
[docs]def encode_utf8(string):
"""Implementation of Encode::encode_utf8"""
return string.encode(errors='replace').decode('latin-1')
[docs]def encodings(get=None):
"""Implementation of Encode::encodings"""
# list from https://stackoverflow.com/questions/1728376/get-a-list-of-all_e-the-encodings-python-can-encode-to
all_e = ['ascii', 'big5', 'big5hkscs', 'cp037', 'cp273', 'cp424', 'cp437', 'cp500', 'cp720', 'cp737', 'cp775', 'cp850', 'cp852', 'cp855', 'cp856', 'cp857', 'cp858', 'cp860', 'cp861', 'cp862', 'cp863', 'cp864', 'cp865', 'cp866', 'cp869', 'cp874', 'cp875', 'cp932', 'cp949', 'cp950', 'cp1006', 'cp1026', 'cp1125', 'cp1140', 'cp1250', 'cp1251', 'cp1252', 'cp1253', 'cp1254', 'cp1255', 'cp1256', 'cp1257', 'cp1258', 'euc_jp', 'euc_jis_2004', 'euc_jisx0213', 'euc_kr', 'gb2312', 'gbk', 'gb18030', 'hz', 'iso2022_jp', 'iso2022_jp_1', 'iso2022_jp_2', 'iso2022_jp_2004', 'iso2022_jp_3', 'iso2022_jp_ext', 'iso2022_kr', 'latin_1', 'iso8859_2', 'iso8859_3', 'iso8859_4', 'iso8859_5', 'iso8859_6', 'iso8859_7', 'iso8859_8', 'iso8859_9', 'iso8859_10', 'iso8859_11', 'iso8859_13', 'iso8859_14', 'iso8859_15', 'iso8859_16', 'johab', 'koi8_r', 'koi8_t', 'koi8_u', 'kz1048', 'mac_cyrillic', 'mac_greek', 'mac_iceland', 'mac_latin2', 'mac_roman', 'mac_turkish', 'ptcp154', 'shift_jis', 'shift_jis_2004', 'shift_jisx0213', 'utf_32', 'utf_32_be', 'utf_32_le', 'utf_16', 'utf_16_be', 'utf_16_le', 'utf_7', 'utf_8', 'utf_8_sig']
if not get or get == ':all':
return all_e
else:
if get.startswith('Encode::'):
get = get[8:]
get = get.lower().replace('-', '_')
list = []
for e in all_e:
if get in e:
list.append(e)
return list
[docs]def eof(fh):
global AUTODIE, TRACEBACK
"""Implementation of perl eof"""
try:
pos = fh.tell()
return (pos == os.path.getsize(fh))
except Exception as e:
if TRACEBACK:
cluck(f"eof failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
return 1
[docs]def exc(e):
"""Exception information like perl, e.g. message at issue_42.pl line 21."""
try:
m = str(e)
if m.endswith('\n'):
return m
return f"{m} at {os.path.basename(sys.exc_info()[2].tb_frame.f_code.co_filename)} line {sys.exc_info()[2].tb_lineno}.\n"
except Exception:
return str(e)
[docs]def exec_(lst):
"""Implementation of perl exec with a list"""
global OS_ERROR, TRACEBACK
try:
if isinstance(lst, str):
lst = lst.split()
program = lst[0]
program = (program.split())[0]
execp(program, lst)
except TypeError:
OS_ERROR = "Undefined list on exec"
if TRACEBACK:
cluck(f"exec({lst}) failed: {OS_ERROR}", skip=2)
except IndexError:
OS_ERROR = "Empty list on exec"
if TRACEBACK:
cluck(f"exec({lst}) failed: {OS_ERROR}", skip=2)
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
cluck(f"exec({lst}) failed: {OS_ERROR}", skip=2)
[docs]def execp(program, lst):
"""Implementation of perl exec with a program and a list"""
global OS_ERROR, TRACEBACK
try:
sys.stdout.flush()
sys.stderr.flush()
except Exception:
pass
try:
os.execvp(program, list(lst))
except OSError: # checkif we're trying to run a perl or python script on Windows
if isinstance(program, str):
program_split = program.split()[0]
if program_split.endswith('.py'):
lst = [program] + lst
program = sys.executable
elif program_split.endswith('.pl'):
lst = [program] + lst
program = 'perl'
else:
raise
os.execvp(program, list(lst))
else:
raise
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
cluck(f"exec({program, lst}) failed: {OS_ERROR}", skip=2)
[docs]def exponentiate_element(base, index, value):
base[index] **= value
return base[index]
[docs]def fcntl(fh, func, scalar):
global AUTODIE, TRACEBACK, OS_ERROR
"""Implementation of perl fcntl"""
try:
result = fc_py.fcntl(fh, func, scalar)
if result == 0:
return "0 but true"
if result == -1:
return None
return result
except Exception as e:
OS_ERROR = str(e)
if TRACEBACK:
cluck(f"fcntl failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
return None
[docs]def fdopen(fh, fd, mode):
"""Implementation of $fh->fdopen(fd, mode)"""
if isinstance(fd, str) and re.match(r'^\d+$', fd):
fd = int(fd)
if isinstance(fd, int):
fd = f'={fd}'
if fh and not fh.closed:
fh.close()
return _create_all_fh_methods(open_dynamic(_open_mode_string(mode) + '&' + fd))
[docs]def fetch_out_parameter(arg):
"""Fetch the value of a sub out parameter from the
location where _store_out_parameter saved it. This is called after
the sub returns. arg is the argument index, starting at 0.
Returns the value we saved."""
try:
result = getattr(builtins, f"__outp{arg}__")
delattr(builtins, f"__outp{arg}__")
return result
except Exception:
return None
[docs]def fetch_out_parameters(var, start=0):
"""Fetch the values of all sub out parameters from the
location where _store_out_parameter saved them. This is called after
the sub returns. var is the array or hash to store them in. start is
the argument starting index, defaulting to 0.
Returns the array or hash we saved."""
if (hasattr(var, 'isHash') and var.isHash) or (not hasattr(var, 'isHash') and isinstance(var, collections.abc.Mapping)):
ln = len(var.keys())
var_copy = var.copy()
var.clear()
missing = 0;
for arg in range(0, ln*2, 2):
try:
key = getattr(builtins, f"__outp{arg+start}__")
delattr(builtins, f"__outp{arg+start}__")
var[key] = getattr(builtins, f"__outp{arg+start+1}__")
delattr(builtins, f"__outp{arg+start+1}__")
except Exception:
missing += 1
if missing:
for k,v in var_copy.items():
if k not in var:
var[k] = v
missing -= 1
if missing == 0:
break
else:
ln = len(var)
var_copy = var.copy()
var.clear()
for arg in range(0, ln):
try:
var.append(getattr(builtins, f"__outp{arg+start}__"))
delattr(builtins, f"__outp{arg+start}__")
except Exception:
var.append(var_copy[arg])
return var
[docs]def fetch_perl_global(perlname):
"""Fetch the value of a package global variable specified by it's perl name"""
(packname, varname) = perlname.rsplit('::', maxsplit=1)
packname = packname.replace('::', '.')
if packname == '':
packname = 'main'
if not hasattr(builtins, packname):
init_package(packname)
namespace = getattr(builtins, packname)
if varname in _PYTHONIZER_KEYWORDS:
varname += '_'
if hasattr(namespace, varname):
return getattr(namespace, varname)
if varname == '' or varname == '_h': # They want the namespace dictionary
return namespace.__dict__
return None
_fileinput_iter = None
[docs]def fileno(fh):
global OS_ERROR, TRACEBACK, AUTODIE
try:
if isinstance(fh, list) and len(fh) == 2 and isinstance(fh[0], list) and isinstance(fh[1], int): # DIRHANDLE
raise TypeError("Directories have no associated fileno");
return fh.fileno()
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
cluck(f"fileno failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
return None
[docs]def fileparse(*args):
"""Split a path into basename, dirpath, and (optional) suffixes.
Translated from perl File::Basename for unix, plus annotations"""
fullname = args[0]
suffixes = args[1:]
if fullname is None:
raise Die("fileparse(): need a valid pathname")
fullname = str(fullname)
[dirpath,basename] = (_m:=re.search(re.compile(r'^(.*/)?(.*)',re.S),fullname),_m.groups() if _m else [None,None])[1]
if not (dirpath):
dirpath = './'
tail=''
suffix=''
if suffixes:
for suffix in suffixes:
if(isinstance(suffix, re.Pattern)): # in case they use qr
suffix = suffix.pattern
pat=f"({suffix})$"
def sub(_m):
nonlocal tail
tail = _m.group(1) + tail
return ''
basename = re.sub(re.compile(pat,re.S),sub,basename,count=1)
return (basename, dirpath, tail)
[docs]def file_exists(path): # -e
if not path:
return '' # False
if hasattr(path, 'cando'):
return 1 # True
return 1 if os.path.exists(path) else ''
[docs]def file_size(path): # -s
if not path:
return None
if hasattr(path, '_size'):
return path._size
return os.path.getsize(path)
[docs]def filter_map(f, i):
"""Given a function f that returns a tuple of (new_val, include) and
an iterable i, return an iterable of new_vals where include is True"""
for v in i:
(new_val, include) = f(v)
if include:
yield new_val
_finditer_pattern = None
_finditer_string = None
_finditer_iter = None
[docs]def finditer_next(pattern, string, flags=0):
"""Implementation of re.finditer() where it can be called multiple times for the same pattern and string"""
global _finditer_iter, _finditer_pattern, _finditer_string
if _finditer_pattern != pattern or _finditer_string != string:
_finditer_iter = None
if _finditer_iter is None:
_finditer_iter = re.finditer(pattern, string, flags)
_finditer_pattern = pattern
_finditer_string = string
result = next(_finditer_iter, None)
if result is None:
_finditer_iter = None
return result
[docs]def find_encoding(encoding):
"""Implementation of Encode::find_encoding"""
try:
info = codecs.lookup(encoding)
if hasattr(info, '_obj'): # We defined it
return info._obj
decod = functools.partial(decode, encoding)
encod = functools.partial(encode, encoding)
name = lambda: encoding
mime_name = lambda: info.name
return type('Encode.Encoding',
tuple(),
dict(decode=decod, encode=encod, name=name, mime_name=mime_name))
except LookupError:
return None
[docs]def find_mime_encoding(encoding):
"""Implementation of Encode::find_mime_encoding"""
obj = find_encoding(encoding)
if hasattr(obj, 'mime_name'):
mime_name = obj.mime_name()
def normalize(enc):
# Make sure 'ISO-8859-1' matches 'iso8859-1'
return enc.lower().replace('-', '').replace('_', '')
if normalize(mime_name) != normalize(encoding):
return None
return obj
[docs]def flatten(lst):
"""Flatten a list down to 1 level"""
result = []
if (not isinstance(lst, collections.abc.Iterable)) or isinstance(lst, str):
return [lst]
for elem in lst:
if hasattr(elem, 'isHash'): # Array or Hash
result = Array(result)
if elem.isHash:
for e in itertools.chain.from_iterable(elem.items()):
result.extend(flatten(e))
else:
for e in elem:
result.extend(flatten(e))
elif isinstance(elem, collections.abc.Mapping):
for e in itertools.chain.from_iterable(elem.items()):
result.extend(flatten(e))
elif isinstance(elem, collections.abc.Iterable) and not isinstance(elem, str):
for e in elem:
result.extend(flatten(e))
else:
result.append(elem)
return result
[docs]def flock(fd, operation):
""" Replacement for perl Fcntl flock function"""
global OS_ERROR, TRACEBACK, AUTODIE
try:
# To avoid the possibility of miscoordination, Perl now flushes FILEHANDLE before locking or unlocking it.
fd.flush()
fc_py.flock(fd, operation)
return 1
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
cluck(f"flock failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
return ''
[docs]def flt(expr):
"""Convert expr to a float number
Ref: https://squareperl.com/en/how-perl-convert-string-to-number"""
if not expr:
return 0
try:
return +expr # Unary plus: The fastest way to test for numeric
except Exception:
pass
for _ in range(2):
try:
f = float(expr)
return f
except Exception:
pass
if isinstance(expr, str):
if not (m:=re.match(r'^\s*([+-]?(?:\d+(?:[.]\d*)?(?:[eE][+-]?\d+)?|[.]\d+(?:[eE][+-]?\d+)?))', expr)):
break
expr = m.group(1);
elif isinstance(expr, bytes):
if not (m:=re.match(br'^\s*([+-]?(?:\d+(?:[.]\d*)?(?:[eE][+-]?\d+)?|[.]\d+(?:[eE][+-]?\d+)?))', expr)):
break
expr = m.group(1);
else:
return expr
if WARNING == 2:
die(f"Argument \"{expr}\" isn't numeric in numeric context", skip=1)
if WARNING:
#caller = inspect.getframeinfo(inspect.stack()[1][0])
warn(f"Argument \"{expr}\" isn't numeric in numeric context", skip=1)
return 0
[docs]def from_to(octets, from_enc, to_enc, check=None):
"""Implementation of Encode::from_to"""
result = encode(to_enc, decode(from_enc, octets), check)
return (result, len(result))
[docs]def getc(fh):
"""Implementation of perl getc"""
fh._last_pos = fh.tell() # for ungetc
return fh.read(1)
[docs]def getpos(fh):
"""Implementation of perl $fh->getpos"""
return fh.tell()
[docs]def getsignal(signum):
"""Handle references to %SIG not on the LHS of expression"""
result = signal.getsignal(signum)
if result == signal.SIG_IGN:
return 'IGNORE'
elif result == signal.SIG_DFL:
return 'DEFAULT'
return result
[docs]def get_access_age_days(path): # -A
"""Implementation of perl -A"""
global OS_ERROR, TRACEBACK, AUTODIE
if not path:
return None
if hasattr(path, '_atime'):
t = path._atime
else:
try:
if hasattr(path, 'fileno') and os.stat in os.supports_fd:
path = path.fileno()
elif hasattr(path, 'name'):
path = path.name
t = os.path.getatime(path)
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
cluck(f"-A {path} failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
return 0
return (BASETIME - t) / 86400.0
[docs]def get_creation_age_days(path): # -C
"""Implementation of perl -C"""
global OS_ERROR, TRACEBACK, AUTODIE
if not path:
return None
if hasattr(path, '_ctime'):
t = path._ctime
else:
try:
if hasattr(path, 'fileno') and os.stat in os.supports_fd:
path = path.fileno()
elif hasattr(path, 'name'):
path = path.name
t = os.path.getctime(path)
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
cluck(f"-C {path} failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
return 0
return (BASETIME - t) / 86400.0
[docs]def get_element(base, index):
"""Safe element getter from a list, tuple, or Array - returns None if the element doesn't exist"""
if index < 0:
index += len(base)
if index >= 0 and index < len(base):
return base[index]
return None
[docs]def get_layers(fh):
"""Implementation of PerlIO::get_layers"""
result = ['unix', 'perlio']
if fh.encoding == 'UTF-8':
if fh.errors == 'strict':
result.append('encoding(utf-8-strict)')
else:
result.append('encoding(utf-8)')
result.append('utf8')
if fh.newlines == "\n":
result.append('lf')
elif fh.newlines == "\r\n":
result.append('crlf')
return result
[docs]def get_mod_age_days(path): # -M
"""Implementation of perl -M"""
global OS_ERROR, TRACEBACK, AUTODIE
if not path:
return None
if hasattr(path, '_mtime'):
t = path._mtime
else:
try:
if hasattr(path, 'fileno') and os.stat in os.supports_fd:
path = path.fileno()
elif hasattr(path, 'name'):
path = path.name
t = os.path.getmtime(path)
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
cluck(f"-M {path} failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
return 0
return (BASETIME - t) / 86400.0
[docs]def get_subref(ref):
"""Convert a sub reference to a callable sub. 'ref' may already
be callable or it could be the name of the sub. Returns None if
the sub isn't callable"""
if callable(ref):
return ref
if isinstance(ref, str):
ref = ref.replace('::', '.').replace("'", '.')
ld = ref.rfind('.')
if ld == -1:
def caller_globals():
frame = inspect.currentframe()
try:
caller_frame = frame.f_back
return caller_frame.f_globals
finally:
del frame
glb = caller_globals()
if ref in _PYTHONIZER_KEYWORDS:
ref += '_'
if ref in glb:
result = glb[ref]
if callable(result):
return result
packname = builtins.__PACKAGE__
sub = ref
else:
packname = ref[0:ld]
if packname == '':
packname = 'main'
sub = ref[ld+1:]
if sub in _PYTHONIZER_KEYWORDS:
sub += '_'
if hasattr(builtins, packname):
namespace = getattr(builtins, packname)
if hasattr(namespace, sub):
result = getattr(namespace, sub)
if callable(result):
return result
return None
[docs]def gmtime(secs=None):
"""Replacement for perl built-in gmtime function"""
try:
gmt = tm_py.gmtime(secs)
except Exception:
try:
import datetime
dt = datetime.datetime.utcfromtimestamp(0) + datetime.timedelta(seconds=secs)
gmt = dt.timetuple()
except Exception:
return (9, 9, 9, 9, 9, 99999, 0, 9, 0)
return (gmt.tm_sec, gmt.tm_min, gmt.tm_hour, gmt.tm_mday,
gmt.tm_mon-1, gmt.tm_year-1900, (gmt.tm_wday+1)%7,
gmt.tm_yday-1, 0)
[docs]def handle_open_pragma(mode, encoding, errors, newline="\n"):
"""Handle any "use open" pragma that may be in effect"""
if encoding is not None:
return (mode, encoding, errors, newline)
layers = None
if ('r' in mode or mode == '-|') and INPUT_LAYERS:
layers = INPUT_LAYERS
elif OUTPUT_LAYERS:
layers = OUTPUT_LAYERS
else:
return (mode, encoding, errors, newline)
layers = layers.replace(':', '')
if layers == 'raw' or layers == 'bytes':
if 'b' not in mode:
mode += 'b'
newline = None
elif layers.startswith('encoding('):
encoding = layers.replace('encoding(','').replace(')','')
errors = 'replace'
elif layers == 'utf8':
encoding = 'UTF-8'
errors = 'ignore'
elif layers == 'crlf':
newline = None
return (mode, encoding, errors, newline)
[docs]def has_setgid(path): # -g
if not path:
return '' # False
if hasattr(path, '_mode'):
return 1 if (path._mode & st_py.S_ISGID) != 0 else ''
return 1 if (os.stat(path).st_mode & st_py.S_ISGID) != 0 else ''
[docs]def has_setuid(path): # -u
if not path:
return '' # False
if hasattr(path, '_mode'):
return 1 if (path._mode & st_py.S_ISUID) != 0 else ''
return 1 if (os.stat(path).st_mode & st_py.S_ISUID) != 0 else ''
[docs]def has_sticky(path): # -k
if not path:
return '' # False
if hasattr(path, '_mode'):
return 1 if (path._mode & st_py.S_ISVTX) != 0 else ''
return 1 if (os.stat(path).st_mode & st_py.S_ISVTX) != 0 else ''
[docs]def hires_alarm(floating_seconds, interval_floating_seconds=0):
"""Implementation of Time::HiRes::alarm"""
if interval_floating_seconds == 0:
signal.setitimer(signal.ITIMER_REAL, floating_seconds)
return floating_seconds
else:
import threading
def send_sigalrm(start_time, interval):
signal.raise_signal(signal.SIGALRM)
current_time = tm_py.time()
elapsed_time = current_time - start_time
next_interval = interval - (elapsed_time % interval)
t = threading.Timer(next_interval, send_sigalrm, [start_time, interval])
t.start()
start_time = tm_py.time() + floating_seconds
t = threading.Timer(floating_seconds, send_sigalrm, [start_time, interval_floating_seconds])
t.start()
return floating_seconds
[docs]def hires_clock():
"""Implementation of Time::HiRes::clock"""
return tm_py.process_time()
[docs]def hires_clock_getres(which):
"""Implementation of Time::HiRes::clock_getres"""
if not which or not hasattr(tm_py, 'clock"getres'):
return tm_py.get_clock_info('time').resolution
return tm_py.clock_getres(which)
[docs]def hires_clock_gettime(which):
"""Implementation of Time::HiRes::clock_gettime"""
if not which or not hasattr(tm_py, 'clock_gettime'):
return tm_py.time()
return tm_py.clock_gettime(which)
[docs]def hires_clock_nanosleep(which, nanoseconds, flags=0):
if flags:
nanoseconds = (nanoseconds / 1_000_000_000 - tm_py.time()) * 1_000_000_000
start_time = tm_py.time()
if nanoseconds > 0:
tm_py.sleep(nanoseconds / 1_000_000_000)
if flags:
return tm_py.time() * 1_000_000_000
return (tm_py.time() - start_time) * 1_000_000_000
[docs]def hires_getitimer_s(which):
"""Implementation of Time::HiRes::getitimer in scalar context"""
return signal.getitimer(which)[0]
[docs]def hires_gettimeofday():
"""Implementation of Time::HiRes::gettimeofday in list context"""
current_time = tm_py.time()
seconds, fraction = divmod(current_time, 1)
microseconds = int(fraction * 1_000_000)
return (seconds, microseconds)
[docs]def hires_lstat(path):
"""Implementation of Time::HiRes::lstat"""
try:
if hasattr(path, 'fileno') and os.lstat in os.supports_fd:
path = path.fileno()
elif hasattr(path, 'name'):
path = path.name
s = os.lstat(path)
except Exception:
return ()
result = (s.st_dev, s.st_ino, s.st_mode,
s.st_nlink, s.st_uid, s.st_gid,
s.st_rdev if hasattr(s, 'st_rdev') else 0,
s.st_size,
s.st_atime_ns / 1_000_000_000,
s.st_mtime_ns / 1_000_000_000,
s.st_ctime_ns / 1_000_000_000,
s.st_blksize if hasattr(s, 'st_blksize') else 512,
s.st_blocks if hasattr(s, 'st_blocks') else s.st_size // 512)
return result
[docs]def hires_nanosleep(nanoseconds):
"""Implementation of Time::HiRes::nanosleep"""
tm_py.sleep(nanoseconds / 1_000_000_000)
[docs]def hires_setitimer_s(which, floating_seconds, interval_floating_seconds=0):
"""Implementation of Time::HiRes::setitimer in scalar context"""
return signal.setitimer(which, floating_seconds, interval_floating_seconds)[0]
[docs]def hires_stat(path):
"""Implementation of Time::HiRes::stat"""
try:
if hasattr(path, 'fileno') and os.stat in os.supports_fd:
path = path.fileno()
elif hasattr(path, 'name'):
path = path.name
s = os.stat(path)
except Exception:
return ()
result = (s.st_dev, s.st_ino, s.st_mode,
s.st_nlink, s.st_uid, s.st_gid,
s.st_rdev if hasattr(s, 'st_rdev') else 0,
s.st_size,
s.st_atime_ns / 1_000_000_000,
s.st_mtime_ns / 1_000_000_000,
s.st_ctime_ns / 1_000_000_000,
s.st_blksize if hasattr(s, 'st_blksize') else 512,
s.st_blocks if hasattr(s, 'st_blocks') else s.st_size // 512)
return result
[docs]def hires_tv_interval(t0, t1=None):
"""Implementation of Time::HiRes::tv_interval"""
if t1 is None:
t1 = _gettimeofday()
return (t1[0] - t0[0]) + (t1[1] - t0[1]) / 1_000_000
[docs]def hires_ualarm(useconds, interval_useconds=0):
"""Implementation of Time::HiRes::ualarm"""
return hires_alarm(useconds / 1_000_000, interval_useconds / 1_000_000)
[docs]def hires_usleep(useconds):
"""Implementation of Time::HiRes::usleep"""
tm_py.sleep(useconds / 1_000_000)
[docs]def hires_utime(atime, mtime, *args):
"""Implementation of Time::HiRes::utime function"""
global TRACEBACK, AUTODIE, OS_ERROR
result = 0
OS_ERROR = ''
ntimes = None
if atime is None and mtime is None:
pass
elif atime is None:
atime = 0
elif mtime is None:
mtime = 0
ntimes = (int(atime*1_000_000_000), int(mtime*1_000_000_000))
for fd in args:
try:
if hasattr(fd, 'fileno') and os.utime in os.supports_fd:
fd = fd.fileno()
elif hasattr(fd, 'name'):
fd = fd.name
os.utime(fd, ns=ntimes)
result += 1
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
cluck(f"Time::HiRes::utime({atime}, {mtime}, {fd}) failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
return result
[docs]def import_(globals, path, module=None, fromlist=None, version=None, is_do=False):
"""Handle use/require statement from perl. 'path' is the relative or absolute path to the .py file
of the module (the extension is ignored if specified). If 'module' is specified, then that is
effectively added to the path, 'fromlist' is the list of desired functions to import. 'version'
will perform a version check. 'is_do' handles a 'do EXPR;' statement."""
global OS_ERROR, EVAL_ERROR
if not hasattr(builtins, '__PACKAGE__'):
caller_package = 'main'
else:
caller_package = builtins.__PACKAGE__
pathname = None
path = path.replace('::', '/')
if module is not None:
path = f'{path}/{module}'
module = None
if not os.path.isabs(path):
path = os.path.splitext(path)[0]
pathname = path.replace('.', '').replace('/', '.')
if pathname[0] == '.':
pathname = pathname[1:]
if path[0] == '.':
pass
else:
for pa in sys.path:
if os.path.isfile(os.path.join(pa, path, '__init__.py')):
path = os.path.join(pa, path)
break
elif os.path.isfile(os.path.join(pa, f'{path}.py')):
path = os.path.join(pa, path)
break
else:
if not is_do:
msg = f"Can't locate {path}.py in sys.path (sys.path contains: {' '.join(sys.path)})"
raise ImportError(msg)
[path, module] = os.path.split(os.path.splitext(os.path.abspath(path))[0])
if is_do:
sys.modules.pop(module, None)
if module in sys.modules and \
hasattr((mod:=sys.modules[module]), '__file__') and \
os.path.join(path, module) + '.py' == mod.__file__:
pass
else:
try:
sys.path.insert(0, path)
mod = __import__(module, globals=globals, fromlist=['*'])
sys.modules[module] = mod
except ImportError as _i:
if is_do:
OS_ERROR = str(_i)
return None
else:
raise
except Exception as _e:
if is_do:
EVAL_ERROR = str(_e)
return None
else:
raise
finally:
sys.path.pop(0)
if hasattr(mod, 'VERSION') and version is not None:
if isinstance(version, str) and version[0] == 'v':
version = version[1:]
try:
version = float(version)
except Exception:
version = 0.0
mod_version = None
try:
mod_version = float(mod.VERSION)
except Exception:
pass
if mod_version is not None and version > mod_version:
raise ValueError(f"For import {module}, desired version {version} > actual version {mod_version} at {path}")
# globals[module] = mod
if fromlist is None:
return 1 # use X ();
if not isinstance(fromlist, (list, tuple)):
fromlist = [fromlist]
actual_imports = set()
export = ()
export_ok = ()
export_tags = dict()
for pn in (pathname, builtins.__PACKAGE__): # builtins.__PACKAGE__ is now the module's package, not ours
if pn is not None and hasattr(builtins, pn):
module_namespace = getattr(builtins, pn)
if hasattr(module_namespace, 'EXPORT_a'):
export = getattr(module_namespace, 'EXPORT_a')
if hasattr(module_namespace, 'EXPORT_OK_a'):
export_ok = getattr(module_namespace, 'EXPORT_OK_a')
if hasattr(module_namespace, 'EXPORT_TAGS_h'):
export_tags = getattr(module_namespace, 'EXPORT_TAGS_h')
builtins.__PACKAGE__ = caller_package
if (fromlist[0] == '*' or fromlist[0] == ':all') and hasattr(mod, '__all__'):
actual_imports = set(mod.__all__)
elif fromlist[0] == '*' and not export:
for key in mod.__dict__.keys():
if callable(mod.__dict__[key]) and key[0] != '_':
actual_imports.add(key)
else:
# This should mirror the code in pythonizer expand_extras:
for desired in fromlist:
if (ch:=desired[0]) == '!':
if desired == fromlist[0]:
actual_imports = set(export)
ch2 = desired[1]
if ch2 == ':':
tag = desired[2:]
if tag in export_tags:
for e in export_tags[tag][0]:
actual_imports.discard(e)
elif ch2 == '/':
pat = re.compile(desired[2:-1])
for e in (export + export_ok):
if re.search(pat, e):
actual_imports.discard(e)
else:
actual_imports.discard(desired[1:])
elif ch == ':':
tag = desired[1:]
if tag in export_tags:
for e in export_tags[tag][0]:
actual_imports.add(e)
elif tag == 'DEFAULT':
actual_imports.update(set(export))
elif ch == '/':
pat = re.compile(desired[1:-1])
for e in (export + export_ok):
if re.search(pat, e):
actual_imports.add(e)
elif desired == '*':
actual_imports.update(set(export))
elif ch == '-' or not re.match(r'^[A-Za-z_][A-Za-z0-9_]*$', desired):
pass
else:
actual_imports.add(desired)
actual_imports = list(actual_imports)
sig_map = {'$': '_v', '@': '_a', '%': '_h'}
for i in range(len(actual_imports)):
perl_name = actual_imports[i]
sig = perl_name[0]
if sig == '&':
perl_name = perl_name[1:]
if hasattr(mod, perl_name+'_'):
actual_imports[i] = perl_name+'_'
elif sig in ('$', '@', '%'):
perl_name = perl_name[1:]
sm = sig_map[sig]
if hasattr(mod, perl_name+sm):
actual_imports[i] = perl_name+sm
elif hasattr(mod, perl_name+'_'):
actual_imports[i] = perl_name+'_'
elif hasattr(mod, perl_name+'_'):
actual_imports[i] = perl_name+'_'
namespace = None
if not hasattr(builtins, caller_package):
init_package(caller_package)
namespace = getattr(builtins, caller_package)
for imp in actual_imports:
if hasattr(mod, imp):
mi = getattr(mod, imp)
globals[imp] = mi
if namespace:
setattr(namespace, imp, mi)
return 1
[docs]def init_global(packname, varname, value):
"""Return the proper value to initialize a package global variable only once"""
namespace = getattr(builtins, packname)
if hasattr(namespace, varname):
return getattr(namespace, varname)
setattr(namespace, varname, value)
return value
[docs]def init_out_parameters(arglist, *_args):
"""Initialize sub's out parameters. Pass the arglist of the sub
and a list of the sub's out parameters, counting from 0. If no
list is passed, then all args are assumed to be out parameters"""
if len(_args) == 0:
for i in range(len(arglist)):
try:
setattr(builtins, f"__outp{i}__", arglist[i])
except Exception:
pass
return
for i in _args:
try:
setattr(builtins, f"__outp{i}__", arglist[i])
except Exception:
pass
[docs]def int_(expr):
"""Convert expr to an integer"""
if not expr:
return 0
if isinstance(expr, int):
return +expr # The Unary plus will convert True to 1
try:
return int(expr)
except Exception:
pass
if not isinstance(expr, (str, bytes)):
if isinstance(expr, complex):
return int_(expr.real)
return expr
if (m:=re.match(r'^\s*([+-]?(?:\d+))', expr)):
return int(m.group(1))
if WARNING == 2:
die(f"Argument \"{expr}\" isn't numeric in integer context", skip=1)
if WARNING:
#caller = inspect.getframeinfo(inspect.stack()[1][0])
warn(f"Argument \"{expr}\" isn't numeric in integer context", skip=1)
return 0
[docs]def ioctl(fh, func, scalar):
global AUTODIE, TRACEBACK, OS_ERROR
"""Implementation of perl ioctl"""
try:
result = fc_py.ioctl(fh, func, scalar)
if result == 0:
return "0 but true"
if result == -1:
return None
return result
except Exception as e:
OS_ERROR = str(e)
if TRACEBACK:
cluck(f"ioctl failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
return None
def _create_all_fh_methods(fh):
"""Create all special methods for OO filehandles"""
methods=dict(autoflush=autoflush,binmode=binmode, close_=close_, eof=eof,
fcntl=fcntl, format_write=format_write, getc=getc,
getpos=getpos, ioctl=ioctl, input_line_number=input_line_number,
open=IOFile_open, print_=print_, printf=printf, say=say, setpos=setpos,
# READ is handled specially because of the output scalar: read=read,
stat=stat,
# SYSREAD needs to be handled like READ sysread=sysread,
sysseek=sysseek, syswrite=syswrite,
truncate=truncate, ungetc=ungetc, write_=write_,
)
for method, func in methods.items():
setattr(fh, method, types.MethodType(func, fh))
fh.getline = fh.readline
fh.getlines = fh.readlines
return fh
[docs]def IOFile(path=None, mode=None, perms=None):
"""Implementation of IO::File->new()"""
global TRACEBACK, AUTODIE
try:
if path is None:
fh = io.TextIOWrapper(io.BufferedIOBase())
fh.close()
return _create_all_fh_methods(fh)
if perms is None:
perms = 0o777
#fh = os.fdopen(os.open(path, mode, perms))
fh = IOFile_open(path, mode, perms)
return _create_all_fh_methods(fh)
except Exception as e:
if TRACEBACK:
if perms is None:
if mode is None:
cluck(f"IO::File->new({path}) failed: {OS_ERROR}",skip=2)
else:
cluck(f"IO::File->new({path}, {mode}) failed: {OS_ERROR}",skip=2)
else:
cluck(f"IO::File->new({path}, {mode}, {perms}) failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
fh = io.TextIOWrapper(io.BufferedIOBase())
fh.close()
return _create_all_fh_methods(fh)
[docs]def IOFile_from_fd(fd, mode):
"""Implementation of IO::File::new_from_fd()"""
global TRACEBACK, AUTODIE
try:
return fdopen(None, fd, mode)
except Exception as e:
if TRACEBACK:
cluck(f"IO::File::new_from_fd({fd}, {mode}) failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
return None
def _open_mode_string(mode):
if not ((_m:=re.search(r'^\+?(<|>>?)$',mode))):
if not (mode:=re.sub(r'^r(\+?)$',r'\g<1><',mode, count=1)):
if not (mode:=re.sub(r'^w(\+?)$',r'\g<1>>',mode, count=1)):
if not (mode:=re.sub(r'^a(\+?)$',r'\g<1>>>',mode, count=1)):
croak(f"IO::Handle: bad open mode: {mode}")
return mode
[docs]def IOFile_open(fh, filename, mode=None, perms=None):
"""Implementation of perl $fh->open method"""
if mode is not None:
if isinstance(mode, str) and re.match(r'^\d+$', mode):
mode = int(mode)
if perms is None:
perms = 0o666
result = os.fdopen(os.open(filename, mode, perms))
elif ':' in mode:
result = open_dynamic(filename, mode, checked=False)
else:
result = open_dynamic(filename, _open_mode_string(mode), checked=False)
else:
encoding = errors = None
if hasattr(fh, 'encoding'):
encoding = fh.encoding
errors = fh.errors
result = open_dynamic(filename,encoding=encoding,errors=errors)
if not fh.closed:
fh.close()
return _create_all_fh_methods(result)
[docs]def IOFile_tmpfile():
"""Implementation of IO::File->new_tmpfile"""
fh = tempfile.NamedTemporaryFile()
return _create_all_fh_methods(fh)
[docs]def isa(self, classname):
"""Implementation of UNIVERSAL::isa and $obj->isa and $cls->isa"""
if hasattr(self, 'isa'):
return self.isa(classname)
_ref_map = {"<class 'int'>": 'SCALAR', "<class 'str'>": 'SCALAR',
"<class 'float'>": 'SCALAR', "<class 'NoneType'>": 'SCALAR',
"<class 'list'>": 'ARRAY', "<class 'tuple'>": 'ARRAY',
"<class 'dict'>": 'HASH'}
t = str(type(self))
if t in _ref_map:
return 1 if _ref_map[t] == classname else ''
elif '_ArrayHash' in t:
if self.isHash:
return 1 if 'HASH' == classname else ''
return 1 if 'ARRAY' == classname else ''
elif classname == 'IO::Handle':
return 1 if isinstance(self, io.IOBase) else ''
elif classname == 'UNIVERSAL':
return 1
elif classname == 'GLOB':
# Assume all file handles and subs are globs
return 1 if isinstance(self, io.IOBase) or callable(self) else ''
elif classname == 'CODE':
return 1 if callable(self) else ''
classname = classname.replace("'", '.').replace('::', '.')
if hasattr(builtins, classname):
the_class = getattr(builtins, classname)
if isinstance(the_class, type): # make sure it's a class and not a namespace
if isinstance(self, the_class):
return 1
if isinstance(self, type) and issubclass(self, the_class):
return 1
elif self == the_class:
return 1
else:
if hasattr(self, 'ISA_a'):
for parent_name in getattr(self, 'ISA_a'):
parent_name = parent_name.replace("'", '.').replace('::', '.')
if hasattr(builtins, parent_name):
return isa(getattr(builtins, parent_name), classname)
return ''
return '' # False
[docs]def isa_op(self, classname):
"""Implementation of isa operator"""
if hasattr(classname, '__name__'):
classname = classname.__name__
elif not isinstance(classname, str) and hasattr(classname, '__class__'):
classname = classname.__class__.__name__
if hasattr(self, 'isa'):
return self.isa(classname)
_ref_map = {"<class 'int'>": 'SCALAR', "<class 'str'>": 'SCALAR',
"<class 'float'>": 'SCALAR', "<class 'NoneType'>": 'SCALAR',
"<class 'list'>": 'ARRAY', "<class 'tuple'>": 'ARRAY',
"<class 'dict'>": 'HASH'}
if isinstance(self, type):
return '' # isa operator needs an object on the LHS, not a class
t = str(type(self))
if t in _ref_map:
return 1 if _ref_map[t] == classname else ''
elif '_ArrayHash' in t:
if self.isHash:
return 1 if 'HASH' == classname else ''
return 1 if 'ARRAY' == classname else ''
elif classname == 'IO::Handle' or classname == 'IO.Handle':
return 1 if isinstance(self, io.IOBase) else ''
elif classname == 'UNIVERSAL':
return 1
elif classname == 'GLOB':
# Assume all file handles and subs are globs
return 1 if isinstance(self, io.IOBase) or callable(self) else ''
elif classname == 'CODE':
return 1 if callable(self) else ''
classname = classname.replace("'", '.').replace('::', '.')
if hasattr(builtins, classname):
the_class = getattr(builtins, classname)
if isinstance(the_class, type): # make sure it's a class and not a namespace
if isinstance(self, the_class):
return 1
if isinstance(self, type) and issubclass(self, the_class):
return 1
elif self == the_class:
return '' # Not an object
else:
if hasattr(self, 'ISA_a'):
for parent_name in getattr(self, 'ISA_a'):
parent_name = parent_name.replace("'", '.').replace('::', '.')
if hasattr(builtins, parent_name):
return isa(getattr(builtins, parent_name), classname)
return ''
return '' # False
[docs]def is_block_special(path): # -b
if not path:
return '' # False
if hasattr(path, '_mode'):
return 1 if st_py.S_ISBLK(path._mode) else ''
if hasattr(path, 'fileno') and os.stat in os.supports_fd:
path = path.fileno()
elif hasattr(path, 'name'):
path = path.name
return 1 if st_py.S_ISBLK(os.stat(path).st_mode) else ''
[docs]def is_char_special(path): # -c
if not path:
return '' # False
if hasattr(path, '_mode'):
return 1 if st_py.S_ISCHR(path._mode) else ''
if hasattr(path, 'fileno') and os.stat in os.supports_fd:
path = path.fileno()
elif hasattr(path, 'name'):
path = path.name
return 1 if st_py.S_ISCHR(os.stat(path).st_mode) else ''
[docs]def is_dir(path): # -d
if not path:
return '' # False
if hasattr(path, '_mode'):
return 1 if st_py.S_ISDIR(path._mode) else ''
if hasattr(path, 'fileno') and os.stat in os.supports_fd:
path = path.fileno()
elif hasattr(path, 'name'):
path = path.name
return 1 if os.path.isdir(path) else ''
[docs]def is_empty_file(path): # -z
if not path:
return None
if hasattr(path, '_size'):
return 1 if path._size == 0 else ''
if hasattr(path, 'fileno') and os.stat in os.supports_fd:
path = path.fileno()
elif hasattr(path, 'name'):
path = path.name
return 1 if not os.path.getsize(path) else ''
[docs]def is_executable(path): # -x
if not path:
return '' # False
if hasattr(path, 'cando'):
return 1 if path.cando(st_py.S_IXUSR, 1) else ''
if hasattr(path, 'fileno') and os.access in os.supports_fd:
path = path.fileno()
elif hasattr(path, 'name'):
path = path.name
return 1 if os.access(path, os.X_OK, effective_ids=(os.access in os.supports_effective_ids)) else ''
[docs]def is_file(path): # -f
if not path:
return '' # False
if hasattr(path, '_mode'):
return 1 if st_py.S_ISREG(path._mode) else ''
if hasattr(path, 'fileno') and os.stat in os.supports_fd:
path = path.fileno()
elif hasattr(path, 'name'):
path = path.name
return 1 if os.path.isfile(path) else ''
[docs]def is_link(path): # -l
if not path:
return '' # False
if hasattr(path, '_mode'):
return 1 if st_py.S_ISLNK(path._mode) else ''
if hasattr(path, 'fileno') and os.lstat in os.supports_fd:
path = path.fileno()
elif hasattr(path, 'name'):
path = path.name
return 1 if os.path.islink(path) else ''
[docs]def is_owned(path): # -o
if not path:
return '' # False
if hasattr(path, '_uid'):
return 1 if path._uid == os.geteuid() else ''
if hasattr(path, 'fileno') and os.stat in os.supports_fd:
path = path.fileno()
elif hasattr(path, 'name'):
path = path.name
return 1 if os.stat(path).st_uid == os.geteuid() else ''
[docs]def is_pipe(path): # -p
if not path:
return '' # False
if hasattr(path, '_mode'):
return 1 if st_py.S_ISFIFO(path._mode) else ''
if hasattr(path, 'fileno') and os.stat in os.supports_fd:
path = path.fileno()
elif hasattr(path, 'name'):
path = path.name
return 1 if st_py.S_ISFIFO(os.stat(path).st_mode) else ''
[docs]def is_readable(path): # -r
if not path:
return '' # False
if hasattr(path, 'cando'):
return 1 if path.cando(st_py.S_IRUSR, 1) else ''
if hasattr(path, 'fileno') and os.access in os.supports_fd:
path = path.fileno()
elif hasattr(path, 'name'):
path = path.name
return 1 if os.access(path, os.R_OK, effective_ids=(os.access in os.supports_effective_ids)) else ''
[docs]def is_real_executable(path): # -X
if not path:
return '' # False
if hasattr(path, 'cando'):
return 1 if path.cando(st_py.S_IXUSR, 0) else ''
if hasattr(path, 'fileno') and os.access in os.supports_fd:
path = path.fileno()
elif hasattr(path, 'name'):
path = path.name
return 1 if os.access(path, os.X_OK) else ''
[docs]def is_real_owned(path): # -O
if not path:
return '' # False
if hasattr(path, '_uid'):
return 1 if path._uid == os.getuid() else ''
if hasattr(path, 'fileno') and os.stat in os.supports_fd:
path = path.fileno()
elif hasattr(path, 'name'):
path = path.name
return 1 if os.stat(path).st_uid == os.getuid() else ''
[docs]def is_real_readable(path): # -R
if not path:
return '' # False
if hasattr(path, 'cando'):
return 1 if path.cando(st_py.S_IRUSR, 0) else ''
if hasattr(path, 'fileno') and os.access in os.supports_fd:
path = path.fileno()
elif hasattr(path, 'name'):
path = path.name
return 1 if os.access(path, os.R_OK) else ''
[docs]def is_real_writable(path):
if hasattr(path, 'cando'):
return 1 if path.cando(st_py.S_IRUSR, 0) else ''
if hasattr(path, 'fileno') and os.access in os.supports_fd:
path = path.fileno()
elif hasattr(path, 'name'):
path = path.name
return 1 if os.access(path, os.W_OK) else ''
[docs]def is_socket(path): # -S
if not path:
return '' # False
if hasattr(path, '_mode'):
return 1 if st_py.S_ISSOCK(path._mode) else ''
if hasattr(path, 'fileno') and os.stat in os.supports_fd:
path = path.fileno()
elif hasattr(path, 'name'):
path = path.name
return 1 if st_py.S_ISSOCK(os.stat(path).st_mode) else ''
[docs]def is_tty(path): # -t
if not path:
return '' # False
if hasattr(path, 'isatty'):
return 1 if path.isatty() else ''
if isinstance(path, tuple):
raise ValueError('-t not supported on File_stat')
if hasattr(path, 'name'):
path = path.name
try:
with open(path, 'r') as t:
return 1 if t.isatty() else ''
except Exception:
return '' # False
[docs]def is_utf8(s, check=False):
"""Implementation of Encode::is_utf8"""
if check:
return utf8_is_utf8(s)
try:
s = str(s)
if s.isascii():
return ''
s.encode()
except Exception:
return ''
return 1
[docs]def is_writable(path): # -w
if not path:
return '' # False
if hasattr(path, 'cando'):
return 1 if path.cando(st_py.S_IWUSR, 1) else ''
if hasattr(path, 'fileno') and os.access in os.supports_fd:
path = path.fileno()
elif hasattr(path, 'name'):
path = path.name
return 1 if os.access(path, os.W_OK, effective_ids=(os.access in os.supports_effective_ids)) else ''
[docs]def kill(sig, *args):
"""Implementation of perl kill function"""
global AUTODIE, TRACEBACK, OS_ERROR
if isinstance(sig, str):
neg = 1
if sig.startswith('-'):
neg = -1
if not sig.startswith('SIG'):
sig = f"SIG{sig}"
if not sig in signal.Signals.__members__:
carp(f'Unrecognized signal name "{sig}"')
return 0
sig = signal.Signals.__members__[sig] * neg
result = 0
for pid in args:
try:
os.kill(pid, sig)
result += 1
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
cluck(f"kill({sig}, {pid}) failed: {OS_ERROR}", skip=2)
if AUTODIE:
raise
return result
[docs]def lcfirst(string):
"""Implementation of lcfirst and \l in interpolated strings: lowercase the first char of the given string"""
return string[0:1].lower() + string[1:]
[docs]def list_of_at_least_n(lst, n):
"""For assignment to (list, ..., *last) - make this list at least the right size."""
if lst is None or (hasattr(lst, 'isHash') and lst.isHash) or not (isinstance(lst, collections.abc.Sequence) and not isinstance(lst, str)):
lst = [lst]
la = len(lst)
if la >= n:
return lst
return list(lst) + [None for _ in range(n-la)]
[docs]def list_of_n(lst, n):
"""For assignment to (list, ...) - make this list the right size"""
if isinstance(lst, itertools.chain):
lst = list(lst)
if lst is None or (hasattr(lst, 'isHash') and lst.isHash) or not (isinstance(lst, collections.abc.Sequence) and not isinstance(lst, str)):
lst = [lst]
la = len(lst)
if la == n:
return lst
if la > n:
return lst[:n]
return list(lst) + [None for _ in range(n-la)]
[docs]def list_to_hash(lst):
"""Convert a flat list of key value pairs to a hash"""
return {lst[i]: lst[i+1] for i in range(0, len(lst), 2)};
[docs]def localtime(secs=None):
"""Replacement for perl built-in localtime function"""
try:
lct = tm_py.localtime(secs)
except Exception:
try:
import datetime
dt = datetime.datetime.fromtimestamp(0) + datetime.timedelta(seconds=secs)
lct = dt.timetuple()
except Exception:
return (9, 9, 9, 9, 9, 99999, 0, 9, 0)
return (lct.tm_sec, lct.tm_min, lct.tm_hour, lct.tm_mday,
lct.tm_mon-1, lct.tm_year-1900, (lct.tm_wday+1)%7,
lct.tm_yday-1, lct.tm_isdst)
[docs]def logical_xor(a, b):
"""Implementation of perl's xor operator"""
return 1 if (a or b) and not (a and b) else ''
[docs]def longmess(*args, skip=0):
"""Message with stack backtrace"""
def ff(fn):
fn = os.path.relpath(fn)
if fn.startswith('./'):
return fn[2:]
return fn
def fa(a):
result = re.sub(r'^\(\*_args=(.*)\)$', r'\1',a).replace(',)', ')')
if result == '[]':
return '()'
return result
stack = inspect.stack()
stack = stack[skip:]
m = ''.join(map(str, args))
m += ' at ' + ff(stack[1].filename) + ' line ' + str(stack[1].lineno) + ".\n"
for i in range(1, len(stack)-1):
s = stack[i]
s2 = stack[i+1]
m += ' ' + s.function+fa(inspect.formatargvalues(*inspect.getargvalues(s.frame))) + ' called at ' + ff(s2.filename) + ' line ' + str(s2.lineno) + "\n"
return m
[docs]def looks_like_binary(path): # -B
"""Implementation of perl -B"""
if isinstance(path, tuple):
return ValueError('-B not supported on File_stat')
return 1 if not looks_like_text(path) else ''
[docs]def looks_like_text(path): # -T
"""Implementation of perl -T"""
global TRACE_RUN
if not isinstance(path, str):
return ValueError('-T is only supported on paths')
rtn = subprocess.run(f'file "{path}"',capture_output=True,text=True,shell=(os.name!='nt'))
if TRACE_RUN:
carp(f'trace -T {path}: {repr(rtn)}', skip=2)
if rtn.returncode:
return None
rtn = rtn.stdout
return 1 if 'text' in rtn else ''
[docs]def lstat(path):
"""Handle lstat call with or without "use File::stat;" """
if isinstance(path, File_stat):
return path # for '_' special variable
try:
if hasattr(path, 'fileno') and os.lstat in os.supports_fd:
path = path.fileno()
elif hasattr(path, 'name'):
path = path.name
s = os.lstat(path)
except Exception:
return ()
result = File_stat(_dev=s.st_dev, _ino=s.st_ino, _mode=s.st_mode,
_nlink=s.st_nlink, _uid=s.st_uid, _gid=s.st_gid,
_rdev=s.st_rdev if hasattr(s, 'st_rdev') else 0,
_size=s.st_size, _atime=s.st_atime, _mtime=s.st_mtime, _ctime=s.st_ctime,
_blksize=s.st_blksize if hasattr(s, 'st_blksize') else 512,
_blocks=s.st_blocks if hasattr(s, 'st_blocks') else s.st_size // 512)
return result
[docs]def maketrans_c(arg1, arg2, delete=False):
"""Make a complement tr table for the 'c' flag. If the 'd' flag is passed, then delete=True. Ranges are expanded in arg1 and arg2 but arg2 is not otherwise normalized"""
t = str.maketrans(arg1, arg1)
d = dict()
for i in range(257):
if i not in t:
if not arg2:
if delete:
d[i] = None
else:
d[i] = i
elif i < len(arg2):
d[i] = arg2[i]
elif delete:
d[i] = None
else:
d[i] = arg2[-1]
return str.maketrans(d)
[docs]def make_list(*args):
"""For push/unshift @arr, expr; We use extend/[0:0] so make sure expr is iterable"""
if len(args) == 1 and isinstance(args[0], collections.abc.Iterable) and not isinstance(args[0], str) and (
not hasattr(args[0], 'isHash') or not args[0].isHash):
return args[0]
return args
[docs]def map_int(*args):
"""Convert each element to an int"""
return list(map(int_, flatten(args)))
[docs]def map_num(*args):
"""Convert each element to a num"""
return list(map(num, flatten(args)))
[docs]def map_str(*args):
"""Convert each element to a str"""
return list(map(_str, flatten(args)))
[docs]def method_call(cls_or_obj, methodname, *args, **kwargs):
"""Call a method by name in a class that can also be specified by name"""
try:
if methodname in _PYTHONIZER_KEYWORDS:
methodname += '_'
method = getattr(cls_or_obj, methodname)
if hasattr(method, '__func__'):
method = method.__func__
return method(cls_or_obj, *args, **kwargs)
except AttributeError:
if isinstance(cls_or_obj, str):
cls_or_obj = cls_or_obj.replace('::', '.')
if cls_or_obj in _PYTHONIZER_KEYWORDS:
cls_or_obj += '_'
if hasattr(builtins, cls_or_obj):
cls_or_obj = getattr(builtins, cls_or_obj)
method = getattr(cls_or_obj, methodname)
if hasattr(method, '__func__'):
method = method.__func__
return method(cls_or_obj, *args, **kwargs)
except TypeError:
if callable(methodname):
method = methodname
if isinstance(cls_or_obj, str):
cls_or_obj = cls_or_obj.replace('::', '.')
if cls_or_obj in _PYTHONIZER_KEYWORDS:
cls_or_obj += '_'
if hasattr(method, '__func__'):
method = method.__func__
return method(cls_or_obj, *args, **kwargs)
cluck(f"Can't locate object method \"{methodname}\" via package \"{_str(cls_or_obj)}\"", skip=2)
[docs]def mkdir(path, mode=0o777):
global TRACEBACK, AUTODIE, OS_ERROR
"""Implementation of perl mkdir function"""
try:
os.mkdir(path, mode)
return 1
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
if mode == 0o777:
cluck(f"mkdir({path}) failed: {OS_ERROR}",skip=2)
else:
cluck(f"mkdir({path}, 0o{mode:o}) failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
return ''
[docs]def mkdtemp(template):
"""Implementation of File::Temp::mkdtemp()"""
template = template.replace('X', '')
(base, dirn, tail) = fileparse(template)
return tempfile.mkdtemp(prefix=base, dir=dirn)
[docs]def mkstemp(template):
"""Implementation of File::Temp::mkstemp()"""
template = template.replace('X', '')
(base, dirn, tail) = fileparse(template)
fh = tempfile.NamedTemporaryFile(prefix=base, dir=dirn, delete=False)
return (fh, fh.name)
[docs]def mkstemps(template, suffix):
"""Implementation of File::Temp::mkstemps()"""
template = template.replace('X', '')
(base, dirn, tail) = fileparse(template)
fh = tempfile.NamedTemporaryFile(prefix=base, dir=dirn, suffix=suffix, delete=False)
return(fh, fh.name)
[docs]def mktemp(template):
"""Implementation of File::Temp::mktemp()"""
template = template.replace('X', '')
(base, dirn, tail) = fileparse(template)
ntf = tempfile.NamedTemporaryFile(prefix=base, dir=dirn, delete=False)
result = ntf.name
ntf.close()
return result
[docs]def mod_element(base, index, value):
base[index] %= value
return base[index]
[docs]def multiply_element(base, index, value):
base[index] *= value
return base[index]
[docs]def need_sh(cmd):
"""Does this command need a shell to run it?"""
if os.name == 'nt': # windows
if isinstance(cmd, (tuple, list)):
for e in cmd:
if need_sh(e):
return True
return False
if re.search(r'[<>|&*]', cmd) or re.match(r'(?:copy|echo|dir|type|cd) ', cmd):
return True
return False
return True
[docs]def nr():
"""Get the current INPUT_LINE_NUMBER"""
global INPUT_LINE_NUMBER
try:
return fileinput.lineno()
except RuntimeError:
return INPUT_LINE_NUMBER
[docs]def num(expr):
"""Convert expr to a number
Ref: https://squareperl.com/en/how-perl-convert-string-to-number"""
if expr is None:
return 0
try:
return +expr # Unary plus: The fastest way to test for numeric
except Exception:
pass
#if isinstance(expr, (int, float)):
#return expr
#try:
#return int(expr)
#except Exception:
#pass
for _ in range(2):
try:
f = float(expr)
if f.is_integer():
return int(f)
return f
except Exception:
pass
if isinstance(expr, str):
if not (m:=re.match(r'^\s*([+-]?(?:\d+(?:[.]\d*)?(?:[eE][+-]?\d+)?|[.]\d+(?:[eE][+-]?\d+)?))', expr)):
break
expr = m.group(1);
elif isinstance(expr, bytes):
if not (m:=re.match(br'^\s*([+-]?(?:\d+(?:[.]\d*)?(?:[eE][+-]?\d+)?|[.]\d+(?:[eE][+-]?\d+)?))', expr)):
break
expr = m.group(1);
elif hasattr(expr, 'isHash') and expr.isHash is None:
return 0
elif isinstance(expr, object) and hasattr(expr, '__class__') and isinstance(expr.__class__, type): # a perl object
if hasattr(expr, '_num_') and callable(expr._num_):
return expr._num_() # use overload "0+"
# Breaks Math::Complex operations! return id(expr) # Objects in == are compared by address
return expr
else:
return expr
if WARNING == 2:
die(f"Argument \"{expr}\" isn't numeric in numeric context", skip=1)
if WARNING:
# caller = inspect.getframeinfo(inspect.stack()[1][0])
# warnings.warn(f"Argument \"{expr}\" isn't numeric in numeric context at {caller.filename}:{caller.lineno}")
warn(f"Argument \"{expr}\" isn't numeric in numeric context", skip=1)
return 0
def _create_fh_methods(fh):
"""Create special methods for filehandles"""
try:
fh.autoflush = types.MethodType(autoflush, fh)
except NameError: # _autoflush is only brought in if we reference it
pass
return fh
[docs]def open_(file,mode,encoding=None,errors=None,checked=True,newline="\n"):
"""Replacement for perl built-in open function when the mode is known."""
global OS_ERROR, TRACEBACK, AUTODIE
try:
(mode, encoding, errors, newline) = handle_open_pragma(mode, encoding, errors, newline)
except NameError:
pass
if 'b' not in mode and encoding is None:
encoding = 'latin1'
if errors is None:
errors = 'ignore'
file = str(file) # handle numeric filename
try:
if mode == '|-' or mode == '|-b': # pipe to
text = True if mode == '|-' else False
sp = subprocess.Popen(file, stdin=subprocess.PIPE, shell=need_sh(file), text=text, encoding=encoding, errors=errors)
if sp.returncode:
raise Die(f"open(|{file}): failed with {sp.returncode}")
sp.stdin._sp = sp # issue 72
sp.stdin._file = f"|{file}" # issue 72
return sp.stdin
elif mode == '-|' or mode == '-|b': # pipe from
text = True if mode == '-|' else False
sp = subprocess.Popen(file, stdout=subprocess.PIPE, shell=need_sh(file), text=text, encoding=encoding, errors=errors)
if sp.returncode:
raise Die(f"open({file}|): failed with {sp.returncode}")
sp.stdout._sp = sp # issue 72
sp.stdout._file = f"|{file}" # issue 72
return sp.stdout
if file is None:
return tempfile.TemporaryFile(mode=mode, encoding=encoding)
if os.name == 'nt' and file.startswith('/tmp/'):
file = tempfile.gettempdir() + file[4:]
if 'b' in mode:
newline = None
file = file.rstrip("\n\r")
return _create_fh_methods(open(file,mode,encoding=encoding,errors=errors,newline=newline))
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
cluck(f"open({file}, {mode}) failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
if checked: # e.g. used in if(...)
return None
fh = io.TextIOWrapper(io.BufferedIOBase())
fh.close()
return _create_fh_methods(fh)
[docs]def opendir(DIR):
"""Implementation of perl opendir"""
global OS_ERROR, TRACEBACK, AUTODIE
class DirHandle(list):
pass
try:
result = DirHandle([list(os.listdir(DIR)), 0])
result.name = DIR # for stat and friends
return result
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
cluck(f"opendir({DIR}) failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
return None
[docs]def openhandle(fh):
"""Return the file handle if this is an opened file handle, else return None"""
if hasattr(fh, 'closed'):
if not fh.closed:
return fh
return None
[docs]def open_dynamic(file,mode=None,encoding=None,errors=None,checked=True):
"""Replacement for perl built-in open function when the mode is unknown."""
dup_it = None
pipe = None
if mode is None:
m = re.match(r'^\s*([<>+|-]*)([&]?=?)\s*(.*?)\s*([|]?)\s*$', file)
mode = m.group(1)
dup_it = m.group(2)
file = m.group(3)
pipe = m.group(4)
elif '&' in mode: # dup
dup_it = '&'
mode = mode.replace('&', '')
if '=' in mode:
dup_it = '&='
mode = mode.replace('=', '')
if mode == '<-' or mode == '-' or mode == '-<':
return sys.stdin
if mode == '>-' or mode == '->':
return sys.stdout
ext = None
if ':' in mode:
mode, ext = mode.split(':')
if mode in _OPEN_MODE_MAP:
mode = _OPEN_MODE_MAP[mode]
if ext:
if ext == 'raw' or ext == 'bytes':
mode += 'b'
elif ext.startswith('encoding('):
encoding = ext.replace('encoding(','').replace(')','')
errors = 'replace'
elif ext == 'utf8':
encoding = 'UTF-8'
errors = 'ignore'
if dup_it:
if '=' in dup_it:
return dup(file, mode,encoding=encoding,errors=errors,checked=checked,equals=True)
return dup(file, mode,encoding=encoding,errors=errors,checked=checked)
return open_(file, mode,encoding=encoding,errors=errors,checked=checked)
if pipe:
return open_(file, '-|',encoding=encoding,errors=errors,checked=checked)
return open_(file, 'r',encoding=encoding,errors=errors,checked=checked)
[docs]def or_element(base, index, value):
base[index] |= value
return base[index]
[docs]def os_name():
"""Implementation of $OSNAME / $^O"""
result = sys.platform
return 'MSWin32' if result == 'win32' else result
[docs]def overload_Method(obj, op):
"""Given an object and an operation string, return a reference to
the code if it's overloaded, else return None"""
key = f"({op}"
if hasattr(obj, key):
return getattr(obj, key)
return None
[docs]def overload_Overloaded(obj):
"""Given an object, return 1 if it has any overloads defined,
else return ''"""
for a in dir(obj):
if a.startswith('('): # special attribute for overloaded method
return 1
return ''
[docs]def overload_StrVal(obj):
"""Implementation of overload::StrVal($obj)"""
cls = ref_scalar(obj)
if not cls:
return obj
if cls == 'ARRAY' or cls == 'HASH' or cls == 'CODE':
return f"{cls}(0x{id(obj):x})"
cls_type = 'HASH'
if hasattr(obj, 'isHash') and not obj.isHash:
cls_type = 'ARRAY'
return f"{cls}={cls_type}(0x{id(obj):x})"
_PACK_TO_STRUCT = dict(a='s', c='b', C='B', s='h', S='H', l='l', L='L', q='q', Q='Q', i='i', I='I', n='!H', N='!L', v='<H', V='<L', j='i', J='I', f='f', d='d', F='d', x='x')
_TEMPLATE_LENGTH = dict(a=1, c=1, C=1, s=2, S=2, l=4, L=4, q=8, Q=8, i=4, I=4, n=2, N=4, v=2, V=4, j=4, J=4, f=4, d=8, F=8, x=1)
_decoding_map = codecs.make_identity_dict(range(256))
_encoding_map = codecs.make_encoding_map(_decoding_map)
def _str_to_bytes(by):
return codecs.charmap_encode(by, 'ignore', _encoding_map)[0]
def _bytes_to_str(by):
return codecs.charmap_decode(by, 'ignore', _decoding_map)[0]
def _get_pack_unpack_format_and_counts(template, args, is_unpack=False):
# FIXME: Handle more cases using a custom translator.
format_and_counts = []
prefix = ''
format = ''
i = 0
ndx = 0
typ = 'unpack' if is_unpack else 'pack'
len_so_far = 0
prev = 0
while i < len(template):
if template[i].isspace():
i += 1
continue
if template[i] in _PACK_TO_STRUCT:
fmt = _PACK_TO_STRUCT[template[i]]
else:
raise Die(f'{typ} format {template[i]} is not currently supported')
mod = ''
cnt = 1
if (_m:=re.match(r'^([!<>]?)((?:(?:\[?(?:(?:\d+)|[*]))\]?)|(?:\[[A-Za-z]\]))?', template[i+1:])):
i += _m.end()
mod = _m.group(1)
if mod is None:
mod = ''
cnt = _m.group(2)
if not cnt:
cnt = '1'
elif cnt[0] == '[':
cnt = cnt[1:-1]
if cnt.isdigit():
cnt = int(cnt)
elif cnt in _TEMPLATE_LENGTH:
cnt = _TEMPLATE_LENGTH[cnt]
elif cnt != '*':
raise Die(f'{typ} cannot get length of {cnt} template')
if mod == '!':
if len(fmt) != 1:
fmt = fmt.lower()
mod = fmt[0]
fmt = fmt[1]
else:
mod = '@' # Native
elif len(fmt) != 1:
mod = fmt[0]
fmt = fmt[1]
if cnt == '*':
if is_unpack and hasattr(args[0], '__len__'):
cnt = len(args[0]) - len_so_far - (struct.calcsize(format) if format else 0)
if ndx < len(args) and hasattr(args[ndx], '__len__'):
cnt = len(args[ndx])
else:
cnt = 1
fmt = f"{cnt}{fmt}"
elif len(fmt) != 1:
mod = fmt[0]
fmt = fmt[1]
fmt_code = fmt[-1]
if mod == prefix:
format += fmt
mod = ''
fmt = ''
elif mod and not prefix:
prefix = mod
format += fmt
mod = ''
fmt = ''
else:
format = f'{prefix}{format}'
len_so_far += struct.calcsize(format)
format_and_counts.append((format, prev, ndx))
prefix = ''
format = ''
prev = ndx
if fmt_code == 's':
if not is_unpack and isinstance(args[ndx], str):
args[ndx] = _str_to_bytes(args[ndx])
ndx += 1
else:
ndx += cnt
i += 1
format = prefix + mod + format + fmt
if format:
format_and_counts.append((format, prev, ndx))
return format_and_counts
[docs]def pack(template, *args):
"""pack items into a str via a given format template"""
# Look here to handle many cases: https://docs.python.org/3/library/struct.html
args = list(args)
result = ''
format_and_counts = _get_pack_unpack_format_and_counts(template, args)
for format, start, end in format_and_counts:
result += _bytes_to_str(struct.pack(format, *args[start:end]))
return result
[docs]def package_call(package, function, *args, **kwargs):
"""Call a function in a different package"""
cur_package = builtins.__PACKAGE__
try:
builtins.__PACKAGE__ = package.__PACKAGE__
return function(*args, **kwargs)
finally:
builtins.__PACKAGE = cur_package
[docs]def perlio_ok(encoding):
"""Implementation of Encoding::perl_io_ok - returns 1"""
return 1
[docs]def perl_print(*args, **kwargs):
"""Replacement for perl built-in print/say/warn functions.
Note that by default this acts like 'say' in that it appends a newline.
To prevent the newline, pass the end='' keyword argument. To write
to a different file, pass the file=... keyword argument. To flush the output
after writing, pass flush=True. To replace the OUTPUT_FIELD_SEPARATOR, pass sep='...'.
It returns 1 if successful"""
global OS_ERROR, TRACEBACK, AUTODIE
try:
file = sys.stdout
if 'file' in kwargs:
file = kwargs['file']
if file is None:
raise Die('print() on unopened filehandle')
if 'sep' not in kwargs:
kwargs['sep'] = OUTPUT_FIELD_SEPARATOR
if 'end' in kwargs:
kwargs['end'] += OUTPUT_RECORD_SEPARATOR
else:
kwargs['end'] = "\n" + OUTPUT_RECORD_SEPARATOR
if 'flush' not in kwargs and hasattr(file, '_autoflush'):
kwargs['flush'] = file._autoflush
try:
print(*args, **kwargs)
except TypeError as _t:
if 'bytes-like' in str(_t):
for k in ('sep', 'end'):
if k in kwargs:
kwargs[k] = bytes(kwargs[k], encoding="latin1", errors="ignore")
for i in range(len(args)):
a = args[i]
file.write(bytes(a, encoding="latin1", errors="ignore"))
if i == len(args)-1:
if 'end' in kwargs:
file.write(kwargs['end'])
elif 'sep' in kwargs:
file.write(kwargs['sep'])
if 'flush' in kwargs and kwargs['flush'] and hasattr(file, 'flush'):
file.flush()
return 1 # True
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
cluck(f"print failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
return '' # False
[docs]def postprocess_arguments(parser, parser_rem):
"""After argument parsing, see if we have any leftover arguments and
flag those as errors"""
errors = ''
for arg in parser_rem:
if len(arg) != 0 and arg[0] == '-':
errors += f"Unknown option: {re.sub(r'^-*', '', arg)}\n"
if errors:
if parser.exit_on_error:
print(errors, file=sys.stderr, end='')
sys.exit(1)
raise argparse.ArgumentError(None, errors)
init_package('Getopt.Long')
[docs]def preprocess_arguments():
"""Pre-process the command line arguments, changing -option to --option"""
for i in range(1, len(sys.argv)):
if len(sys.argv[i]) > 2 and sys.argv[i][0] == '-' and sys.argv[i][1] != '-':
sys.argv[i] = '-' + sys.argv[i]
[docs]def print_(fh, *args):
"""Implementation of perl $fh->print method"""
global OS_ERROR, TRACEBACK, AUTODIE
try:
print(*args, end='', file=fh)
return 1 # True
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
cluck(f"print failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
return '' # False
[docs]def printf(fh, fmt, *args):
"""Implementation of perl $fh->printf method"""
global OS_ERROR, TRACEBACK, AUTODIE
try:
print(format_(fmt, *args), end='', file=fh)
return 1 # True
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
if isinstance(fmt, str):
fmt = fmt.replace("\n", '\\n')
cluck(f"printf({fmt},...) failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
return '' # False
[docs]def raise_(exception):
"""To raise an exception in a lambda function or expression"""
raise exception
[docs]def rand(expr=0):
"""Implementation of perl rand function"""
if expr == 0:
expr = 1
return random.random() * expr
[docs]def range_(var, pat1, flags1, pat2, flags2, key):
"""The line-range operator. See https://perldoc.perl.org/perlop#Range-Operators"""
if not hasattr(range_, key):
setattr(range_, key, 0)
seq = getattr(range_, key)
if isinstance(seq, str): # e.g. nnE0
setattr(range_, key, 0)
return '' # False
if seq == 0: # Waiting for left to become True
if isinstance(pat1, str):
val = re.search(pat1, var, flags=flags1)
else:
val = bool(pat1)
if not val:
return '' # False
seq += 1 # once left becomes True, then the seq starts counting, and we check right
setattr(range_, key, seq)
if isinstance(pat2, str):
val = re.search(pat2, var, flags=flags2)
else:
val = bool(pat2)
if val:
seq = str(seq)+'E0' # end marker
setattr(range_, key, seq)
return seq
[docs]def read(fh, var, length, offset=0, need_len=False):
"""Read length bytes from the fh, and return the result to store in var
if need_len is False, else return a tuple with the result
and the length read"""
global OS_ERROR, TRACEBACK, AUTODIE
if var is None:
var = ''
try:
s = fh.read(length)
if isinstance(s, bytes):
s = str(s, encoding='latin1', errors='ignore')
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
cluck(f"read of {length} byte(s) failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
if need_len:
return (var, None)
return var
ls = len(s)
var = _str(var)
lv = len(var)
if offset < 0:
offset += lv
if offset:
if isinstance(var, bytes):
var = var.decode(encoding='latin1', errors='ignore')
if need_len:
return (var[:offset] + ('\0' * (offset-lv)) + s, ls)
else:
return var[:offset] + ('\0' * (offset-lv)) + s
if need_len:
return (s, ls)
return s
[docs]def readdir(DIR):
"""Implementation of perl readdir in scalar context"""
try:
result = (DIR[0])[DIR[1]]
DIR[1] += 1
return result
except IndexError:
return None
[docs]def readdirs(DIR):
"""Implementation of perl readdir in list context"""
result = (DIR[0])[DIR[1]:]
DIR[1] = len(DIR[0])
return result
[docs]def readline(fh):
"""Reads a line from a file.
(instead use _readline_full if you need support for perl $/ or $.)"""
result = fh.readline()
if not result:
return None
return result
[docs]def readline_full(fh, fhname=''):
"""Reads a line from a file, handles perl $/ and sets $. """
global INPUT_RECORD_SEPARATOR, INPUT_LINE_NUMBER
global _INPUT_FH_NAME
if INPUT_RECORD_SEPARATOR == "\n":
result = fh.readline()
if not result:
return None
elif INPUT_RECORD_SEPARATOR is None:
result = fh.read()
else:
if not hasattr(fh, '_data'):
fh._data = fh.read()
fh._pos = 0
irs = INPUT_RECORD_SEPARATOR
if irs == '': # paragraph mode
pos = fh._pos
while(fh._data[pos] == "\n"):
pos += 1
fh._pos = pos
irs = "\n\n"
pos = fh._pos
ndx = fh._data.index(irs, pos)
if ndx < 0:
fh._pos = len(fh._data)
fh._pos = ndx + len(irs)
result = fh._data[pos:fh._pos]
if not result:
if hasattr(fh, '_data'):
del fh._data
if hasattr(fh, '_at_eof') and fh._at_eof:
return None
else:
fh._at_eof = True
else:
fh._at_eof = False
if not hasattr(fh, '_lno'):
INPUT_LINE_NUMBER = fh._lno = 1
_INPUT_FH_NAME = fhname
else:
fh._lno += 1
INPUT_LINE_NUMBER = fh._lno
return result
[docs]def readlink(path):
"""Returns the value of a symbolic link. If there is a system error, returns the undefined value and sets OS_ERROR (errno)."""
global OS_ERROR, TRACEBACK, AUTODIE
try:
return os.readlink(path)
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
cluck(f"readlink({path}) failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
return None
[docs]def ref(r):
"""ref function in perl - called when NOT followed by a backslash"""
_ref_map = {"<class 'int'>": 'SCALAR', "<class 'str'>": 'SCALAR',
"<class 'float'>": 'SCALAR', "<class 'NoneType'>": 'SCALAR',
"<class 'list'>": 'ARRAY', "<class 'tuple'>": 'ARRAY',
"<class 'function'>": 'CODE', "<class 'dict'>": 'HASH'}
tr = type(r)
t = str(tr)
if t in _ref_map:
return ''
elif '_ArrayHash' in t:
return ''
if isinstance(r, type): # return '' for a class (not a class instance)
return ''
if hasattr(tr, '__name__'):
return tr.__name__.replace('.', '::')
return t.replace("<class '", '').replace("'>", '').replace('.', '::')
[docs]def refs(r):
"""ref function in perl - called when followed by a backslash"""
_ref_map = {"<class 'int'>": 'SCALAR', "<class 'str'>": 'SCALAR',
"<class 'float'>": 'SCALAR', "<class 'NoneType'>": 'SCALAR',
"<class 'list'>": 'ARRAY', "<class 'tuple'>": 'ARRAY',
"<class 'function'>": 'CODE', "<class 'dict'>": 'HASH'}
t = str(type(r))
if t in _ref_map:
return _ref_map[t]
elif '_ArrayHash' in t:
if r.isHash:
return 'HASH'
return 'ARRAY'
elif hasattr(r, 'TIEARRAY'):
return 'ARRAY'
elif hasattr(r, 'TIEHASH'):
return 'HASH'
return ''
[docs]def ref_scalar(r):
"""ref function in perl - called when being passed a scalar without a backslash"""
_ref_map = {"<class 'int'>": '', "<class 'str'>": '',
"<class 'float'>": '', "<class 'NoneType'>": '',
"<class 'list'>": 'ARRAY', "<class 'tuple'>": 'ARRAY',
"<class 'function'>": 'CODE', "<class 'dict'>": 'HASH'}
tr = type(r)
t = str(tr)
if t in _ref_map:
return _ref_map[t]
elif '_ArrayHash' in t:
if r.isHash:
return 'HASH'
return 'ARRAY'
if isinstance(r, type): # return '' for a class (not a class instance)
return ''
if hasattr(tr, '__name__'):
return tr.__name__.replace('.', '::')
elif hasattr(r, 'TIEARRAY'):
return 'ARRAY'
elif hasattr(r, 'TIEHASH'):
return 'HASH'
return t.replace("<class '", '').replace("'>", '').replace('.', '::')
[docs]def reset_each(h_a):
"""Reset the 'each' iterator on keys/values calls"""
key = str(id(h_a)) # Unique memory address of object
if hasattr(each, key):
delattr(each, key)
[docs]def resolve_alias(encoding):
"""Implementation of Encode::resolve_alias"""
try:
info = codecs.lookup(encoding)
return info.name
except LookupError:
return None
[docs]def reverse_scalar(expr):
"""reverse function implementation in scalar context"""
if expr is None:
return ''
if hasattr(expr, 'isHash'):
if expr.isHash:
expr = [_item for _k in expr for _item in (_k, expr[_k])]
else:
return ''.join(expr)[::-1]
elif isinstance(expr, collections.abc.Mapping): # flatten hash (dict)
expr = [_item for _k in expr for _item in (_k, expr[_k])]
if isinstance(expr, collections.abc.Iterable) and not isinstance(expr, str):
return ''.join(expr)[::-1]
return expr[::-1]
[docs]def rewinddir(DIR):
DIR[1] = 0
[docs]def rmdir(d):
"""Implementation of perl rmdir"""
global AUTODIE, TRACEBACK, OS_ERROR
try:
os.rmdir(d)
return 1
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
cluck(OS_ERROR,skip=2)
if AUTODIE:
raise
return ''
[docs]def run(*args):
"""Execute a command and return the stdout in list context"""
global CHILD_ERROR, AUTODIE, TRACEBACK, INPUT_RECORD_SEPARATOR, TRACE_RUN
if len(args) == 1:
args = args[0]
try:
if os.name == 'nt':
if isinstance(args, str):
args = [args]
arg_split = args[0].split()[0]
if arg_split.endswith('.py'):
args = [sys.executable] + args
elif arg_split.endswith('.pl'):
args = ['perl'] + args
newargs = []
for arg in args:
if '"' in arg or "'" in arg:
newargs.append(arg)
else:
arg_split = arg.split()
newargs.extend(arg_split)
args = newargs
if len(args) == 1:
args = args[0]
sp = subprocess.run(args,stdin=sys.stdin,capture_output=True,text=True,shell=need_sh(args))
except FileNotFoundError: # can happen on windows if shell=False
sp = subprocess.CompletedProcess(args, -1)
if TRACE_RUN:
carp(f'trace run({args}): {repr(sp)}', skip=2)
CHILD_ERROR = -1 if sp.returncode == -1 else ((sp.returncode<<8) if sp.returncode >= 0 else -sp.returncode)
if CHILD_ERROR:
if AUTODIE:
raise Die(f'run({args}): failed with rc {CHILD_ERROR}')
if TRACEBACK:
cluck(f'run({args}): failed with rc {CHILD_ERROR}',skip=2)
if INPUT_RECORD_SEPARATOR is None:
return sp.stdout
irs = INPUT_RECORD_SEPARATOR
pos = 0
if irs == '': # paragraph mode
while(sp.stdout[pos] == "\n"):
pos += 1;
irs = "\n\n"
arr = sp.stdout[pos:].split(irs)
if arr[-1] == '':
arr = arr[:-1]
return [line + irs for line in arr]
[docs]def run_s(*args):
"""Execute a command and return the stdout in scalar context"""
global CHILD_ERROR, AUTODIE, TRACEBACK, TRACE_RUN
if len(args) == 1:
args = args[0]
try:
if os.name == 'nt':
if isinstance(args, str):
args = [args]
arg_split = args[0].split()[0]
if arg_split.endswith('.py'):
args = [sys.executable] + args
elif arg_split.endswith('.pl'):
args = ['perl'] + args
newargs = []
for arg in args:
if '"' in arg or "'" in arg:
newargs.append(arg)
else:
arg_split = arg.split()
newargs.extend(arg_split)
args = newargs
if len(args) == 1:
args = args[0]
sp = subprocess.run(args,stdin=sys.stdin,capture_output=True,text=True,shell=need_sh(args))
except FileNotFoundError: # can happen on windows if shell=False
sp = subprocess.CompletedProcess(args, -1)
if TRACE_RUN:
carp(f'trace run({args}): {repr(sp)}', skip=2)
CHILD_ERROR = -1 if sp.returncode == -1 else ((sp.returncode<<8) if sp.returncode >= 0 else -sp.returncode)
if CHILD_ERROR:
if AUTODIE:
raise Die(f'run({args}): failed with rc {CHILD_ERROR}')
if TRACEBACK:
cluck(f'run({args}): failed with rc {CHILD_ERROR}',skip=2)
return sp.stdout
[docs]def say(fh, *args):
"""Implementation of perl $fh->say method"""
global OS_ERROR, TRACEBACK, AUTODIE
try:
print(*args, file=fh)
return 1 # True
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
cluck(f"say failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
return '' # False
[docs]def seek(fh, pos, whence):
"""Implementation of perl seek"""
global OS_ERROR, TRACEBACK, AUTODIE
try:
return fh.seek(pos, whence)
return 1 # True
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
cluck(f"seek({pos},{whence}) failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
return None
[docs]def seekdir(DIR, pos):
DIR[1] = pos
[docs]def select(fh):
"""Implementation of perl select function"""
result = sys.stdout
sys.stdout = fh
return result
[docs]def setpos(fh, off):
"""Implementation of perl $fh->setpos method"""
global OS_ERROR, TRACEBACK, AUTODIE
try:
fh.seek(off, os.SEEK_SET)
return 1 # True
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
cluck(f"setpos({off}) failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
return '' # False
[docs]def set_breakpoint():
"""Sets a debugger breakpoint, but only if pdb is active, mimicking $DB::single"""
if 'pdb' in sys.modules:
import pdb
pdb.set_trace()
[docs]def set_element(base, index, value):
"""Implementation of perl = on an array element or hash key"""
base[index] = value
return value
[docs]def set_last_ndx(arr, ndx):
"""Implementation of assignment to perl array last index $#array"""
del arr[ndx+1:]
for _ in range((ndx+1)-len(arr)):
arr.append(None)
[docs]def set_signal(sig, handler):
"""Set a signal handler to either a code reference or a string containing a perl code name or IGNORE or DEFAULT"""
sig = num(sig)
if callable(handler):
signal.signal(sig, handler)
elif handler == 'IGNORE':
signal.signal(sig, signal.SIG_IGN)
elif handler == 'DEFAULT':
signal.signal(sig, signal.SIG_DFL)
elif isinstance(handler, str):
handler = handler.replace('::', '.').replace("'", '.')
rdot = handler.rfind('.')
if rdot == -1:
pkg = getattr(builtins, '__PACKAGE__')
fun = handler
else:
pkg = handler[0:rdot]
fun = handler[rdot+1:]
if hasattr(builtins, pkg):
namespace = getattr(builtins, pkg)
if hasattr(namespace, fun):
signal.signal(sig, getattr(namespace, fun))
return
def error_handler(sno, frm):
warn(f'{signal.Signals(sno).name} handler "{handler}" not defined.\n')
signal.signal(sig, error_handler)
else:
def bad_handler(sno, frm):
warn(f"{signal.Signals(sno).name} handler invalid: {handler}.\n")
signal.signal(sig, bad_handler)
[docs]def shift_left_element(base, index, value):
base[index] <<= value
return base[index]
[docs]def shift_right_element(base, index, value):
base[index] >>= value
return base[index]
[docs]def shortmess(*args, skip=0):
"""Message with no backtrace"""
def ff(fn):
fn = os.path.relpath(fn)
if fn.startswith('./'):
return fn[2:]
return fn
stack = inspect.stack()
stack = stack[skip:]
m = ''.join(map(str, args))
m += ' at ' + ff(stack[1].filename) + ' line ' + str(stack[1].lineno) + ".\n"
return m
[docs]def smartmatch(left, right):
"""Implement the perl smartmatch (~~) operator"""
if right is None:
return 1 if left is None else ''
if hasattr(left, '__smartmatch__'):
return left.__smartmatch__(right)
if hasattr(right, '__rsmartmatch__'):
return right.__rsmartmatch__(left)
if (hasattr(right, 'isHash') and right.isHash) or (not hasattr(right, 'isHash') and isinstance(right, collections.abc.Mapping)):
if (hasattr(left, 'isHash') and left.isHash) or (not hasattr(left, 'isHash') and isinstance(left, collections.abc.Mapping)):
return smartmatch(sorted(left.keys()), sorted(right.keys()))
elif isinstance(left, collections.abc.Iterable) and not isinstance(left, str):
return 1 if any(i in right for i in left) else ''
elif isinstance(left, re.Pattern):
return 1 if any(re.search(left, _str(i)) for i in right.keys()) else ''
else:
return 1 if left in right.keys() else ''
elif isinstance(right, collections.abc.Iterable) and not isinstance(right, str):
if (hasattr(left, 'isHash') and left.isHash) or (not hasattr(left, 'isHash') and isinstance(left, collections.abc.Mapping)):
return 1 if any(i in left for i in right) else ''
elif isinstance(left, collections.abc.Iterable) and not isinstance(left, str):
ll = list(left)
lll = len(ll)
lr = list(right)
if lll != len(lr):
return ''
for i in range(lll):
if not smartmatch(ll[i], lr[i]):
return ''
return 1
elif isinstance(left, re.Pattern):
return 1 if any(re.search(left, _str(i)) for i in right) else ''
elif all(isinstance(i, (int, float, str)) for i in right):
return 1 if left in right else ''
else:
return 1 if any(left == i for i in right) else ''
elif callable(right):
if (hasattr(left, 'isHash') and left.isHash) or (not hasattr(left, 'isHash') and isinstance(left, collections.abc.Mapping)):
return 1 if all(right(i) for i in left.keys()) else ''
elif isinstance(left, collections.abc.Iterable) and not isinstance(left, str):
return 1 if all(right(i) for i in left) else ''
return 1 if right(left) else ''
elif isinstance(right, re.Pattern):
if (hasattr(left, 'isHash') and left.isHash) or (not hasattr(left, 'isHash') and isinstance(left, collections.abc.Mapping)):
return 1 if any(re.search(right, i) for i in left.keys()) else ''
elif isinstance(left, collections.abc.Iterable) and not isinstance(left, str):
return 1 if any(re.search(right, _str(i)) for i in left) else ''
else:
return 1 if re.search(right, _str(left)) else ''
elif isinstance(right, (int, float)):
return 1 if num(left) == right else ''
elif isinstance(left, (int, float)):
return 1 if left == num(right) else ''
elif isinstance(right, str):
return 1 if left == right else ''
elif left is None:
return 1 if right is None else ''
else:
return 1 if left == right else ''
[docs]def spaceship(a,b):
"""3-way comparison like the <=> operator in perl"""
if hasattr(a, '__spaceship__'):
return a.__spaceship__(b)
if hasattr(b, '__rspaceship__'):
return b.__rspaceship__(a)
return (a > b) - (a < b)
[docs]def splice(array, *args):
"""Implementation of splice function"""
offset = 0;
if len(args) >= 1:
offset = args[0]
length = len(array)
if len(args) >= 2:
length = args[1]
if offset < 0:
offset += len(array)
total = offset + length
if length < 0:
total = length
removed = array[offset:total]
array[offset:total] = args[2:]
return removed
[docs]def splice_s(array, *args):
"""Implementation of splice function in scalar context"""
offset = 0;
if len(args) >= 1:
offset = args[0]
length = len(array)
if len(args) >= 2:
length = args[1]
if offset < 0:
offset += len(array)
total = offset + length
if length < 0:
total = length
removed = array[offset:total]
array[offset:total] = args[2:]
if not removed:
return None
return removed[-1]
[docs]def split(pattern, string, maxsplit=0, flags=0):
"""Split function in perl is similar to re.split but not quite
the same - this function makes it the same"""
result = re.split(pattern, string, max(0, maxsplit), flags)
if len(result) >= 1 and result[0] == '' and (m:=re.match(pattern, string, flags)) and len(m.group(0)) == 0:
result = result[1:] # A zero-width match at the beginning of EXPR never produces an empty field
if maxsplit >= -1: # We subtracted one from what the user specifies
limit = len(result)
# Empty results at the end are eliminated
for i in range(limit-1, -1, -1):
if result[i] == '':
limit -= 1
else:
break
return result[:limit]
return result
[docs]def splitdir(*_args):
"""Implementation of File::Spec->splitdir"""
return split(r"/", _str(_args[0]), -1 - 1) # Preserve trailing fields
[docs]def splitpath(*_args):
"""Implementation of File::Spec->splitpath"""
[path, nofile] = list_of_n(_args, 2)
[volume, directory, file] = ("", "", "")
if nofile:
directory = path
else:
_m = re.search(
re.compile(r"^ ( (?: .* / (?: \.\.?\Z )? )? ) ([^/]*) ", re.X | re.S), _str(path)
)
directory = _m.group(1)
file = _m.group(2)
return [volume, directory, file]
[docs]def split_s(pattern, string, maxsplit=0, flags=0):
"""Split function in perl is similar to re.split but not quite
the same - this is the version used in scalar context"""
result = re.split(pattern, string, max(0, maxsplit), flags)
if len(result) >= 1 and result[0] == '' and (m:=re.match(pattern, string, flags)) and len(m.group(0)) == 0:
result = result[1:] # A zero-width match at the beginning of EXPR never produces an empty field
if maxsplit >= -1: # We subtracted one from what the user specifies
limit = len(result)
# Empty results at the end are eliminated
for i in range(limit-1, -1, -1):
if result[i] == '':
limit -= 1
else:
break
return limit
return len(result)
[docs]def stat_cando(self, mode, eff):
"""Implementation of File::Stat::stat_cando. This takes an arrayref containing the return values of stat or lstat as its first argument, and interprets it for you"""
if os.name == 'nt':
if (self._mode & mode):
return 1 # True
return '' # False
uid = os.geteuid() if eff else os.getuid()
def _ingroup(gid, eff):
rgid = os.getgid()
egid = os.getegid()
supp = os.getgroups()
if gid == (egid if eff else rgid):
return 1 # True
if gid in supp:
return 1 # True
return '' # False
if uid == 0 or (sys.platform == 'cygwin' and _ingroup(544, eff)): # Root
if not (mode & 0o111):
return 1 # Not testing for executable: all file tests are true
if (self._mode & 0o111) or st_py.S_ISDIR(self._mode):
return 1 # True
return '' # False
if self._uid == uid:
if (self._mode & mode):
return 1 # True
elif _ingroup(self._gid, eff):
if (self._mode & (mode >> 3)):
return 1 # True
else:
if (self._mode & (mode >> 6)):
return 1 # True
return '' # False
[docs]@dataclasses.dataclass
class File_stat(collections.abc.Sequence):
_dev: int
_ino: int
_mode: int
_nlink: int
_uid: int
_gid: int
_rdev: int
_size: int
_atime: int
_mtime: int
_ctime: int
_blksize: int
_blocks: int
_item_map = {0:'_dev', 1:'_ino', 2:'_mode', 3:'_nlink', 4:'_uid', 5:'_gid',
6:'_rdev', 7:'_size', 8:'_atime', 9:'_mtime', 10:'_ctime', 11:'_blksize', 12:'_blocks'}
[docs] def dev(self):
return self._dev
[docs] def ino(self):
return self._ino
[docs] def mode(self):
return self._mode
[docs] def nlink(self):
return self._nlink
[docs] def uid(self):
return self._uid
[docs] def gid(self):
return self._gid
[docs] def rdev(self):
return self._rdev
[docs] def size(self):
return self._size
[docs] def atime(self):
return self._atime
[docs] def mtime(self):
return self._mtime
[docs] def ctime(self):
return self._ctime
[docs] def blksize(self):
return self._blksize
[docs] def blocks(self):
return self._blocks
def __len__(self):
return len(self._item_map)
def __getitem__(self, index):
if isinstance(index, slice):
return [self[i] for i in range(*index.indices(len(self)))]
if index < 0:
index += len(self)
try:
return getattr(self, self._item_map[index])
except KeyError:
raise IndexError('File_stat index out of range')
def __contains__(self, item):
return item in self._item_map
[docs] def cando(self, mode, eff):
return stat_cando(self, mode, eff)
[docs] def get(self, index, default=None):
try:
return self[index]
except IndexError:
return default
[docs]def stat(path):
"""Handle stat call with or without "use File::stat;" """
if isinstance(path, File_stat):
return path # for '_' special variable
try:
if hasattr(path, 'fileno') and os.stat in os.supports_fd:
path = path.fileno()
elif hasattr(path, 'name'):
path = path.name
s = os.stat(path)
except Exception:
return ()
result = File_stat(_dev=s.st_dev, _ino=s.st_ino, _mode=s.st_mode,
_nlink=s.st_nlink, _uid=s.st_uid, _gid=s.st_gid,
_rdev=s.st_rdev if hasattr(s, 'st_rdev') else 0,
_size=s.st_size, _atime=s.st_atime, _mtime=s.st_mtime, _ctime=s.st_ctime,
_blksize=s.st_blksize if hasattr(s, 'st_blksize') else 512,
_blocks=s.st_blocks if hasattr(s, 'st_blocks') else s.st_size // 512)
return result
[docs]def store_out_parameter(arglist, arg, value, shifts=0):
"""Store the value of a sub out parameter both in the arglist and in a
location where _fetch_out_parameter can retrieve it after the sub returns. arg
is the argument index, starting at 0, and value is the value to be stored.
shifts specifies the number of shift operations that have been performed
on the arglist. Returns the value."""
if arglist is not None:
arglist[arg] = value
setattr(builtins, f"__outp{arg+shifts}__", value)
return value
[docs]def store_perl_global(perlname, value, infer_suffix=False, method_type=False):
"""Assigns a value to a package global variable specified by it's perl name
and returns the value. Optional keyword argument infer_suffix
will map the variable's suffix based on the type of the value,
e.g. it will add _h for a hash. Optional keyword argument method_type will
set this to a MethodType if True, or will check if the name is 'new' or 'make'
and set this to a MethodType if None"""
(packname, varname) = perlname.rsplit('::', maxsplit=1)
packname = packname.replace('::', '.')
if packname == '':
packname = 'main'
if callable(value) and (method_type or (method_type is None and (varname == 'new' or varname == 'make'))) and \
((not hasattr(builtins, packname)) or (not isinstance(getattr(builtins, packname), type))):
init_package(packname, is_class=True)
elif not hasattr(builtins, packname):
init_package(packname)
namespace = getattr(builtins, packname)
if infer_suffix:
if isinstance(value, (str, float, int, bytes)):
varname += '_v'
elif hasattr(value, 'isHash'):
if value.isHash:
varname += '_h'
else:
varname += '_a'
elif isinstance(value, collections.abc.Mapping):
varname += '_h'
elif isinstance(value, collections.abc.Iterable):
varname += '_a'
if callable(value) and (method_type or (method_type is None and (varname == 'new' or varname == 'make'))):
value = types.MethodType(value, namespace)
if varname == '' or varname == '_h': # namespace dictionary
namespace.__dict__ = value
else:
setattr(namespace, varname, value)
if varname in _PYTHONIZER_KEYWORDS:
varname += '_'
setattr(namespace, varname, value)
return value
[docs]def strftime(fmt, sec, min=None, hour=None, mday=None, mon=None, year=None, wday=0, yday=0, isdst=0):
"""Implementation of perl strftime"""
if min is None:
min = sec[1]
hour = sec[2]
mday = sec[3]
mon = sec[4]
year = sec[5]
sec = sec[0]
tl = timelocal(sec, min, hour, mday, mon, year)
return tm_py.strftime(fmt, tm_py.localtime(tl))
[docs]def sub():
"""Implementation of __SUB__ in perl"""
try:
frame = sys._getframe(1)
name = frame.f_code.co_name
return frame.f_globals[name]
except Exception:
return None
[docs]def subname(code):
"""Implementation of Sub::Util::subname"""
def unescape(name):
ns = name.split('.')
result = []
for n in ns:
if n[-1] == '_' and n[:-1] in _PYTHONIZER_KEYWORDS:
result.append(n[:-1])
else:
result.append(n)
return '.'.join(result)
name = unescape(code.__name__)
if hasattr(builtins.main, name) and getattr(builtins.main, name) == code:
return f"main::{name}"
if hasattr(code, '__self__') and hasattr(code.__self__, '__PACKAGE__'):
package = unescape(code.__self__.__PACKAGE__)
return f"{package.replace('.', '::')}::{name}"
for packagename in vars(builtins):
namespace = getattr(builtins, packagename)
if isinstance(namespace, type) or isinstance(namespace, types.SimpleNamespace):
if hasattr(namespace, name):
cd = getattr(namespace, name)
if cd == code:
return f"{unescape(packagename).replace('.', '::')}::{name}"
if hasattr(cd, '__func__'):
cd = cd.__func__
if cd == code or (hasattr(code, '__func__') and cd == code.__func__):
return f"{unescape(packagename).replace('.', '::')}::{name}"
return f"main::{name}"
[docs]def substitute_and_count(this, that, var, replace=True, count=0):
"""Perform a re substitute, but also count the # of matches"""
(result, ctr) = re.subn(this, that, var, count=count)
if not replace:
return (var, result)
return (result, ctr)
[docs]def substitute_element(base, index, this, that, count=0, replace=True):
"""Perform a re substitution on an array element or hash value, and also count the # of matches"""
(result, ctr) = re.subn(this, that, _str(base[index]), count=count)
if replace:
base[index] = result
return ctr
return result
[docs]def substitute_global(packname, varname, this, that, replace=True, count=0):
"""Perform a re substitute on a global, and also count the # of matches"""
namespace = getattr(builtins, packname)
var = _str(getattr(namespace, varname))
(result, ctr) = re.subn(this, that, var, count=count)
if replace:
setattr(namespace, varname, result)
return ctr
return result
[docs]def substr(this, start, length, replacement):
"""Handle substr with replacement - returns a tuple
with (new_this, chars_removed)"""
chars_removed = this[start:start+length]
new_this = this[:start] + replacement + this[start+length:]
return (new_this, chars_removed)
[docs]def subtract_element(base, index, value):
"""Implementation of -= on an array element"""
try:
base[index] -= value
except TypeError:
if isinstance(value, int) or isinstance(value, float):
base[index] = num(base[index]) - value
elif value is not None:
raise
return base[index]
[docs]def switch(s_val):
"""Implementation of switch/given statement in perl. This
returns a function that is called for each case."""
def iter_to_bool(i):
try:
v = next(i)
return True
except StopIteration:
return False
if hasattr(s_val, '__rsmartmatch__'):
def o_switch(c_val):
return s_val.__rsmartmatch__(c_val)
return o_switch
elif callable(s_val):
def f_switch(c_val):
if callable(c_val):
return s_val == c_val
if isinstance(c_val, collections.abc.Iterable) and not isinstance(c_val, str):
return s_val(*c_val)
return s_val(c_val)
return f_switch
elif isinstance(s_val, int) or isinstance(s_val, float):
def n_switch(c_val):
if hasattr(c_val, '__smartmatch__'):
return c_val.__smartmatch__(s_val)
if isinstance(c_val, int) or isinstance(c_val, float):
return s_val == c_val
if callable(c_val):
return c_val(s_val)
if isinstance(c_val, collections.abc.Iterable) and not isinstance(c_val, str):
if hasattr(c_val, 'isHash') and c_val.isHash:
return str(s_val) in c_val
return s_val in c_val
if isinstance(c_val, re.Pattern):
return re.search(c_val, str(s_val))
if isinstance(c_val, dict):
return str(s_val) in c_val
return str(s_val) == str(c_val)
return n_switch
elif isinstance(s_val, str):
def s_switch(c_val):
if hasattr(c_val, '__smartmatch__'):
return c_val.__smartmatch__(s_val)
if (isinstance(c_val, collections.abc.Iterable) and not isinstance(c_val, str)) or isinstance(c_val, dict):
return s_val in c_val # list, Array, or Hash
if callable(c_val):
return c_val(s_val)
if isinstance(c_val, re.Pattern):
return re.search(c_val, s_val)
return s_val == str(c_val)
return s_switch
elif isinstance(s_val, dict) and (not hasattr(s_val, 'isHash') or s_val.isHash):
def h_switch(c_val):
if hasattr(c_val, '__smartmatch__'):
return c_val.__smartmatch__(s_val)
if isinstance(c_val, dict) and (not hasattr(c_val, 'isHash') or c_val.isHash):
return s_val == c_val
if isinstance(c_val, collections.abc.Iterable) and not isinstance(c_val, str):
return iter_to_bool(filter(lambda _d: _d in s_val and s_val[_d], c_val))
if callable(c_val):
return c_val(s_val)
if isinstance(c_val, re.Pattern):
return iter_to_bool(filter(lambda _d: re.search(c_val, _d) and _d in s_val and s_val[_d], s_val.keys()))
return str(c_val) in s_val and s_val[str(c_val)]
return h_switch
elif isinstance(s_val, collections.abc.Iterable) and (not hasattr(s_val, 'isHash') or not s_val.isHash):
def a_switch(c_val):
if hasattr(c_val, '__smartmatch__'):
return c_val.__smartmatch__(s_val)
if isinstance(c_val, dict) and (not hasattr(c_val, 'isHash') or c_val.isHash):
return iter_to_bool(filter(lambda _d: str(_d) in c_val and c_val[str(_d)], s_val))
if isinstance(c_val, collections.abc.Iterable) and not isinstance(c_val, str):
for item in s_val:
if item not in c_val:
return False
return True
if callable(c_val):
return c_val(*(map(str, s_val)))
if isinstance(c_val, re.Pattern):
return iter_to_bool(filter(lambda _d: re.search(c_val, _d), map(str, s_val)))
return c_val in s_val
return a_switch
elif isinstance(s_val, re.Pattern):
def r_switch(c_val):
if hasattr(c_val, '__smartmatch__'):
return c_val.__smartmatch__(s_val)
if isinstance(c_val, dict) and (not hasattr(c_val, 'isHash') or c_val.isHash):
return iter_to_bool(filter(lambda _d: re.search(s_val, _d) and c_val[_d], c_val.keys()))
if isinstance(c_val, collections.abc.Iterable) and not isinstance(c_val, str):
return iter_to_bool(filter(lambda _d: re.search(s_val, _d), map(str, c_val)))
if callable(c_val):
return c_val(s_val)
if isinstance(c_val, re.Pattern):
return s_val.pattern == c_val.pattern
return re.search(s_val, str(c_val))
return r_switch
else:
def n_switch(c_val):
if hasattr(c_val, '__smartmatch__'):
return c_val.__smartmatch__(s_val)
return False
return n_switch
[docs]def sysread(fh, var, length, offset=0, need_len=False):
"""Read length bytes from the fh, and return the result to store in var
if need_len is False, else return a tuple with the result
and the length read. For compatability with perl, the
result is returned as a str, not bytes."""
global OS_ERROR, TRACEBACK, AUTODIE
if var is None:
var = ''
try:
s = str(os.read(fh.fileno(), length), encoding='latin1', errors='ignore')
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
cluck(f"sysread of {length} bytes failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
if need_len:
return (var, None)
return var
ls = len(s)
lv = len(var)
if offset < 0:
offset += lv
if offset:
if need_len:
return (var[:offset] + ('\0' * (offset-lv)) + s, ls)
else:
return var[:offset] + ('\0' * (offset-lv)) + s
if need_len:
return (s, ls)
return s
[docs]def sysseek(fh, pos, how=os.SEEK_SET):
"""Implementation of perl sysseek"""
return os.lseek(fh.fileno(), pos, how)
[docs]def system(*args):
"""Execute a command and return the return code"""
global CHILD_ERROR, AUTODIE, TRACEBACK, TRACE_RUN
if len(args) == 1:
args = args[0]
try:
sp = subprocess.run(args,text=True,stdin=sys.stdin,stdout=sys.stdout,stderr=sys.stderr,shell=need_sh(args))
except FileNotFoundError: # can happen on windows if shell=False
sp = subprocess.CompletedProcess(args, -1)
except OSError: # check if we're trying to run a perl or python script on Windows
if isinstance(args, str):
args = [args]
arg_split = args[0].split()[0]
if arg_split.endswith('.py'):
args = [sys.executable] + args
elif arg_split.endswith('.pl'):
args = ['perl'] + args
else:
raise
sp = subprocess.run(args,text=True,stdin=sys.stdin,stdout=sys.stdout,stderr=sys.stderr,shell=need_sh(args))
if TRACE_RUN:
carp(f'trace system({args}): {repr(sp)}', skip=2)
CHILD_ERROR = -1 if sp.returncode == -1 else ((sp.returncode<<8) if sp.returncode >= 0 else -sp.returncode)
if CHILD_ERROR:
if AUTODIE:
raise Die(f'system({args}): failed with rc {CHILD_ERROR}')
if TRACEBACK:
cluck(f'system({args}): failed with rc {CHILD_ERROR}',skip=2)
return CHILD_ERROR
[docs]def syswrite(fh, scalar, length=None, offset=0):
"""Implementation of perl syswrite"""
if length is None and hasattr(scalar, 'len'):
length = len(scalar)-offset
if isinstance(scalar, str):
return os.write(fh.fileno(), scalar[offset:length+offset].encode())
elif isinstance(scalar, bytes):
return os.write(fh.fileno(), scalar[offset:length+offset])
else:
return os.write(fh.fileno(), str(scalar).encode())
[docs]def tell(fh):
"""Implementation of perl tell"""
global OS_ERROR, TRACEBACK, AUTODIE
try:
return fh.tell()
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
cluck(f"tell failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
return -1
[docs]def telldir(DIR):
return DIR[1]
[docs]def tempdir(*args):
"""Implementation of File::Temp::tempdir()"""
template=None
options={}
start = 0
if len(args) % 2 == 1:
template = args[0]
start = 1
for i in range(start, len(args), 2):
options[args[i]] = args[i+1]
dirn=None
base=None
if template:
template = template.replace('X', '')
(base, dirn, tail) = fileparse(template)
if 'DIR' in options:
dirn = options['DIR']
return tempfile.mkdtemp(prefix=base, dir=dirn)
[docs]def tempfile_(*args):
"""Implementation of File::Temp::tempfile() in list context"""
template=None
options={}
start = 0
if len(args) % 2 == 1:
template = args[0]
start = 1
for i in range(start, len(args), 2):
options[args[i]] = args[i+1]
dirn=None
base=None
suffix=None
unlink=True
if 'TEMPLATE' in options:
template = options['TEMPLATE']
if template:
template = template.replace('X', '')
(base, dirn, tail) = fileparse(template)
if 'SUFFIX' in options:
suffix = options['SUFFIX']
if 'DIR' in options:
dirn = options['DIR']
if 'UNLINK' in options:
unlink = options['UNLINK']
fh = tempfile.NamedTemporaryFile(prefix=base, dir=dirn, suffix=suffix, delete=unlink)
def filename(fh):
return fh._name
fh._name = fh.name
fh.filename = types.MethodType(filename, fh)
return (fh, fh.name)
[docs]def tempfile_s(*args):
"""Implementation of File::Temp::tempfile() in scalar context"""
(fh, _) = tempfile_(*args)
return fh
[docs]def tempnam(template, suffix):
"""Implementation of File::Temp::tempnam()"""
template = template.replace('X', '')
(base, dirn, tail) = fileparse(template)
(fh, name) = tempfile.mkstemp(prefix=base, dir=dirn, suffix=suffix)
fh.close()
return name
[docs]def tie_call(func, _args, _kwargs=None):
"""Call a function in a package that uses TIEARRAY or TIEHASH. This is an
internal routine whose call is automatically generated."""
# This is needed to "undo" what _add_tie_methods does to a class that has TIEARRAY or TIEHASH.
# That function creates a subclass which is used as the object's class returned from TIEARRAY or
# TIEHASH which defines the python special methods like __getitem__ and __setitem__ to call
# FETCH and STORE respectively. However, inside the class that defines TIEARRAY or TIEHASH,
# we don't want indexing and other basic operations to call these special methods, so we temporarily
# change the type of the object to it's base type, then do the call, then restore it.
if _kwargs is None:
_kwargs = dict()
try:
self = _args[0]
tie_class = self.__class__
orig_class = tie_class.__bases__[0]
if(hasattr(orig_class, '__TIE_subclass__')):
self.__class__ = orig_class
except Exception: # e.g. we have no args, or self is a string or something
return func(*_args, **_kwargs)
try:
# Call the function with the class of 'self' reset to the parent class
# which doesn't have __getitem__ etc defined
return func(*_args, **_kwargs)
finally:
try:
self.__class__ = tie_class
except Exception: # e.g. self is a string or something
pass
[docs]def time():
""" Replacement for perl built-in time function"""
return (tm_py.time_ns() // 1000000000)
[docs]def timegm(sec, min, hour, mday, mon, year, wday=0, yday=0, isdst=0):
"""Replacement for perl built-in timegm function"""
if year < 1900:
year += 1900
return calendar.timegm((year, mon+1, mday, hour, min, sec, 0, 1, -1))
[docs]def timelocal(sec, min, hour, mday, mon, year, wday=0, yday=0, isdst=0):
"""Replacement for perl built-in timelocal function"""
if year < 1900:
year += 1900
try:
return tm_py.mktime((year, mon+1, mday, hour, min, sec, 0, 1, -1))
except Exception:
try:
import datetime
diff = datetime.datetime(year, mon+1, mday, hour, min, sec) - datetime.datetime.fromtimestamp(0)
return diff.total_seconds()
except Exception:
return 999999999999
[docs]def tmpfile():
"""Implementation of File::Temp tmpfile()"""
return tempfile.TemporaryFile()
[docs]def tmpnam():
"""Implementation of POSIX tmpnam() in list context"""
ntf = tempfile.NamedTemporaryFile(delete=False)
return (ntf, ntf.name)
[docs]def tmpnam_s():
"""Implementation of POSIX tmpnam() in scalar context"""
ntf = tempfile.NamedTemporaryFile(delete=False)
result = ntf.name
ntf.close()
return result
[docs]def translate(table, var, replace=True, complement=False, delete=False, squash=False):
"""Perform a tr translate operation"""
result = []
pv = None
for ch in var:
if ord(ch) > 256 and complement:
ch = chr(256)
try:
v = table[ord(ch)]
except LookupError:
v = ch
pv = None
if v is not None:
if isinstance(v, int):
v = chr(v)
if pv != v or not squash:
result.append(v)
pv = v
return ''.join(result)
[docs]def translate_and_count(table, var, replace=True, complement=False, delete=False, squash=False):
"""Perform a tr translate, but also count the # of matches"""
result = []
ctr = 0;
pv = None
for ch in var:
if ord(ch) > 256 and complement:
ch = chr(256)
try:
v = table[ord(ch)]
ctr += 1
except LookupError:
v = ch
pv = None
if v is not None:
if isinstance(v, int):
v = chr(v)
if pv != v or not squash:
result.append(v)
pv = v
if not replace:
return (var, ''.join(result))
return (''.join(result), ctr)
[docs]def translate_element(base, index, table, replace=True, complement=False, delete=False, squash=False):
"""Perform a tr translate on an array element, and also count the # of matches"""
result = []
ctr = 0;
var = _str(base[index])
pv = None
for ch in var:
if ord(ch) > 256 and complement:
ch = chr(256)
try:
v = table[ord(ch)]
ctr += 1
except LookupError:
v = ch
pv = None
if v is not None:
if isinstance(v, int):
v = chr(v)
if pv != v or not squash:
result.append(v)
pv = v
if replace:
base[index] = ''.join(result)
return ctr
return ''.join(result)
[docs]def translate_global(packname, varname, table, replace=True, complement=False, delete=False, squash=False):
"""Perform a tr translate on a global, and also count the # of matches"""
result = []
ctr = 0;
pv = None
namespace = getattr(builtins, packname)
var = _str(getattr(namespace, varname))
for ch in var:
if ord(ch) > 256 and complement:
ch = chr(256)
try:
v = table[ord(ch)]
ctr += 1
except LookupError:
v = ch
pv = None
if v is not None:
if isinstance(v, int):
v = chr(v)
if pv != v or not squash:
result.append(v)
pv = v
if replace:
setattr(namespace, varname, ''.join(result))
return ctr
return ''.join(result)
[docs]def truncate(fh, length):
"""Implementation of perl $fh->truncate method"""
global OS_ERROR, TRACEBACK, AUTODIE
try:
if hasattr(fh, 'flush'):
fh.flush()
if hasattr(fh, 'truncate'):
fh.truncate(length)
else:
if hasattr(fh, 'fileno'):
fh = fh.fileno()
os.truncate(fh, length)
return 1 # True
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
if isinstance(fh, str):
cluck(f"truncate({fh}, {length}) failed: {OS_ERROR}",skip=2)
else:
cluck(f"truncate to {length} failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
return None
[docs]def ucfirst(string):
"""Implementation of ucfirst and \ u in interpolated strings: uppercase the first char of the given string"""
return string[0:1].upper() + string[1:]
[docs]def ungetc(fh, ordinal):
"""Implementation of perl $fh->ungetc method"""
# We only support putting back what was there after a getc
if hasattr(fh, "_last_pos"): # Set by _getc
fh.seek(fh._last_pos, 0)
ch = fh.read(1)
if ch == chr(ordinal):
fh.seek(fh._last_pos, 0)
delattr(fh, "_last_pos")
return
else:
fh.seek(0, 2)
raise NotImplementedError
[docs]def unlink(*args):
"""Implementation of perl unlink"""
global OS_ERROR
cnt = 0
for f in args:
try:
os.unlink(f)
cnt += 1
except Exception as e:
OS_ERROR = str(e)
return cnt
[docs]def unpack(template, bytestr):
"""Unpack bytestr using the template and return a list of values"""
if isinstance(bytestr, str):
bytestr = _str_to_bytes(bytestr)
result = []
format_and_counts = _get_pack_unpack_format_and_counts(template, (bytestr,), is_unpack=True)
start = 0
for format, _, _ in format_and_counts:
size = struct.calcsize(format)
result.extend(struct.unpack(format, bytestr[start:start+size]))
start += size
for i, r in enumerate(result):
if isinstance(r, bytes):
result[i] = _bytes_to_str(r)
return result
[docs]def updir():
"""Implementation of File::Spec->updir"""
return '..'
[docs]def utf8_decode(s):
"""Implementation of utf8::decode"""
try:
return (str(s).encode('latin-1').decode(), 1)
except Exception:
return (str(s).encode('latin-1').decode(errors='ignore'), '')
[docs]def utf8_downgrade(s, fail_ok=False):
"""Implementation of utf8::downgrade. Returns a tuple of string and success"""
if fail_ok:
try:
result = str(s)
return (result, 1)
except Exception:
return (s, 0)
return (str(s), 1)
[docs]def utf8_encode(s):
"""Implementation of utf8::encode"""
return (str(s).encode().decode('latin-1'), None)
[docs]def utf8_is_utf8(s):
"""Implementation of utf8::is_utf8"""
try:
s = str(s)
if s.isascii():
return ''
s.encode()
except Exception:
return ''
# if it looks like raw utf8, then it's not utf8 encoded
i = 0;
in_byte = 0
while i < len(s):
c = ord(s[i])
if in_byte:
if c < 0x80 or c > 0xbf:
return 1
in_byte -= 1
else:
if c < 0x80:
pass
elif c >= 0xc0 and c <= 0xdf: # 2-byte
in_byte = 1
elif c >= 0xe0 and c <= 0xef: # 3-byte
in_byte = 2
elif c >= 0xf0 and c <= 0xff: # 4-byte
in_byte = 3
else:
return 1
i += 1
if in_byte:
return 1
return ''
[docs]def utf8_native_to_unicode(c):
"""Implementation of utf8::native_to_unicode. Returns it's argument"""
return c
[docs]def utf8_unicode_to_native(c):
"""Implementation of utf8::unicode_to_native. Returns it's argument"""
return c
[docs]def utf8_upgrade(s):
"""Implementation of utf8::upgrade. Returns a tuple with the result and the length"""
result = str(s)
return (result, len(result))
[docs]def utf8_valid(s):
"""Implementation of utf8::valid. Returns 1"""
return 1
[docs]def utime(atime, mtime, *args):
"""Implementation of perl utime function"""
global TRACEBACK, AUTODIE, OS_ERROR
result = 0
OS_ERROR = ''
times = None
if atime is None and mtime is None:
pass
elif atime is None:
atime = 0
elif mtime is None:
mtime = 0
times = (atime, mtime)
for fd in args:
try:
if hasattr(fd, 'fileno') and os.utime in os.supports_fd:
fd = fd.fileno()
elif hasattr(fd, 'name'):
fd = fd.name
os.utime(fd, times)
result += 1
except Exception as _e:
OS_ERROR = str(_e)
if TRACEBACK:
cluck(f"utime({atime}, {mtime}, {fd}) failed: {OS_ERROR}",skip=2)
if AUTODIE:
raise
return result
[docs]def wait():
"""Replacement for perl wait() call"""
global CHILD_ERROR
try:
(pid, stat) = os.wait()
CHILD_ERROR = stat
return pid
except Exception:
return -1
[docs]def waitpid(pid, flags):
"""Replacement for perl waitpid() call"""
global CHILD_ERROR
try:
(rpid, stat) = os.waitpid(pid, options)
CHILD_ERROR = stat
return rpid
except Exception:
return -1
[docs]def warn(*args, skip=None):
"""Handle warn in perl"""
global INPUT_LINE_NUMBER, _INPUT_FH_NAME
def is_func_in_call_stack(func): # Warn handlers are turned off inside themselves
frame = sys._getframe(2)
while frame is not None:
if func.__code__ == frame.f_code:
return True
frame = frame.f_back
return False
if hasattr(builtins, 'CORE') and hasattr(builtins.CORE, 'GLOBAL') and \
hasattr(builtins.CORE.GLOBAL, 'warn') and callable(builtins.CORE.GLOBAL.warn) and not \
is_func_in_call_stack(builtins.CORE.GLOBAL.warn):
return builtins.CORE.GLOBAL.warn(*args)
args = list(map(_str, args))
if len(args) == 0 or len(''.join(args)) == 0:
args = ["Warning: something's wrong"]
try:
if EVAL_ERROR:
args = [EVAL_ERROR, "\t...caught"]
except Exception:
pass
if "\n" not in args[-1]:
(_, fn, lno, *_) = caller() if skip is None else caller(skip)
iln = None
ifn = None
try:
iln = fileinput.lineno()
ifn = '<fileinput>'
except RuntimeError:
iln = INPUT_LINE_NUMBER
if _INPUT_FH_NAME:
ifn = f"<{_INPUT_FH_NAME}>"
if iln and ifn:
args.append(f" at {fn} line {lno}, {ifn} line {iln}.\n")
else:
args.append(f" at {fn} line {lno}.\n")
if callable(SIG_WARN_HANDLER) and not is_func_in_call_stack(SIG_WARN_HANDLER):
arg = ''.join(args)
SIG_WARN_HANDLER(arg)
else:
print(*args, sep='', end='', file=sys.stderr)
return 1
[docs]def write_(fh, scalar, length=None, offset=0):
"""Implementation of perl $fh->write"""
if length is None and hasattr(scalar, 'len'):
length = len(scalar)-offset
if 'b' in fh.mode:
if isinstance(scalar, str):
return fh.write(scalar[offset:length+offset].encode())
elif isinstance(scalar, bytes):
return fh.write(scalar[offset:length+offset])
return fh.write(str(scalar).encode())
else:
if isinstance(scalar, str):
return fh.write(scalar[offset:length+offset])
elif isinstance(scalar, bytes):
return fh.write(scalar[offset:length+offset].decode())
return fh.write(str(scalar))
[docs]def xor_element(base, index, value):
base[index] ^= value
return base[index]