# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""ctypes bindings for the kernel32 ``QueryInformationJobObject`` call.

The module declares the ctypes structures that the call fills in, a small
helper (``JobObjectInfo``) that pairs an information-class code with the
matching structure, and a Python-level ``QueryInformationJobObject`` wrapper
that returns the result as a read-only, subscriptable view.

This module can only be imported on Windows: it binds ``windll.kernel32``
at import time.
"""

from ctypes import (
    c_void_p,
    POINTER,
    sizeof,
    Structure,
    windll,
    WinError,
    WINFUNCTYPE,
    addressof,
    c_size_t,
    c_ulong,
)
from ctypes.wintypes import BOOL, BYTE, DWORD, HANDLE, LARGE_INTEGER

# ``basestring`` only exists on Python 2; fall back to ``str`` elsewhere so
# that class names given as strings are recognised on either version.
try:
    _string_types = basestring
except NameError:
    _string_types = str

LPVOID = c_void_p
LPDWORD = POINTER(DWORD)
SIZE_T = c_size_t
# NOTE(review): Win32 defines ULONG_PTR as a pointer-sized unsigned integer,
# not as a pointer.  POINTER(c_ulong) has the same size, so the structure
# layout below is unaffected, but the 'Affinity' field reads back as a ctypes
# pointer object.  Kept as-is so existing users of this name are not broken.
ULONG_PTR = POINTER(c_ulong)

# A ULONGLONG is a 64-bit unsigned integer.
# Thus there are 8 bytes in a ULONGLONG.
# XXX why not import c_ulonglong ?
ULONGLONG = BYTE * 8


class IO_COUNTERS(Structure):
    """Opaque placeholder for the Win32 IO_COUNTERS structure."""
    # The IO_COUNTERS struct is 6 ULONGLONGs.
    # TODO: Replace with non-dummy fields.
    _fields_ = [('dummy', ULONGLONG * 6)]


class JOBOBJECT_BASIC_ACCOUNTING_INFORMATION(Structure):
    """Basic accounting counters of a job object."""
    _fields_ = [('TotalUserTime', LARGE_INTEGER),
                ('TotalKernelTime', LARGE_INTEGER),
                ('ThisPeriodTotalUserTime', LARGE_INTEGER),
                ('ThisPeriodTotalKernelTime', LARGE_INTEGER),
                ('TotalPageFaultCount', DWORD),
                ('TotalProcesses', DWORD),
                ('ActiveProcesses', DWORD),
                ('TotalTerminatedProcesses', DWORD)]


class JOBOBJECT_BASIC_AND_IO_ACCOUNTING_INFORMATION(Structure):
    """Basic accounting counters followed by the I/O counters."""
    _fields_ = [('BasicInfo', JOBOBJECT_BASIC_ACCOUNTING_INFORMATION),
                ('IoInfo', IO_COUNTERS)]


# see http://msdn.microsoft.com/en-us/library/ms684147%28VS.85%29.aspx
class JOBOBJECT_BASIC_LIMIT_INFORMATION(Structure):
    """Basic limits of a job object."""
    _fields_ = [('PerProcessUserTimeLimit', LARGE_INTEGER),
                ('PerJobUserTimeLimit', LARGE_INTEGER),
                ('LimitFlags', DWORD),
                ('MinimumWorkingSetSize', SIZE_T),
                ('MaximumWorkingSetSize', SIZE_T),
                ('ActiveProcessLimit', DWORD),
                ('Affinity', ULONG_PTR),
                ('PriorityClass', DWORD),
                ('SchedulingClass', DWORD)
                ]


# see http://msdn.microsoft.com/en-us/library/ms684156%28VS.85%29.aspx
class JOBOBJECT_EXTENDED_LIMIT_INFORMATION(Structure):
    """Basic limits plus I/O counters and memory limits/peaks."""
    _fields_ = [('BasicLimitInformation', JOBOBJECT_BASIC_LIMIT_INFORMATION),
                ('IoInfo', IO_COUNTERS),
                ('ProcessMemoryLimit', SIZE_T),
                ('JobMemoryLimit', SIZE_T),
                ('PeakProcessMemoryUsed', SIZE_T),
                ('PeakJobMemoryUsed', SIZE_T)]


# The numeric codes below are the values of the corresponding members of the
# Win32 JOBOBJECTINFOCLASS enumeration (declared in winnt.h).
JobObjectBasicAndIoAccountingInformation = 8
JobObjectExtendedLimitInformation = 9


class JobObjectInfo(object):
    """Pair a job-object information class with its result structure.

    ``_class`` may be the numeric information-class code or its name as a
    string.  After construction ``code`` holds the numeric code and ``info``
    holds a fresh, zero-initialised instance of the matching structure.

    Raises NotImplementedError when the requested class is not one of the
    classes this module has a structure for.
    """
    mapping = {
        'JobObjectBasicAndIoAccountingInformation':
            JobObjectBasicAndIoAccountingInformation,
        'JobObjectExtendedLimitInformation':
            JobObjectExtendedLimitInformation,
    }
    structures = {
        JobObjectBasicAndIoAccountingInformation:
            JOBOBJECT_BASIC_AND_IO_ACCOUNTING_INFORMATION,
        JobObjectExtendedLimitInformation:
            JOBOBJECT_EXTENDED_LIMIT_INFORMATION,
    }

    def __init__(self, _class):
        # Explicit raises instead of ``assert``: asserts are stripped under
        # ``python -O``, and callers (see test_qijo) expect
        # NotImplementedError for unsupported classes.
        if isinstance(_class, _string_types):
            if _class not in self.mapping:
                raise NotImplementedError(
                    'Class should be one of %s; you gave %s'
                    % (self.mapping, _class))
            _class = self.mapping[_class]
        if _class not in self.structures:
            raise NotImplementedError(
                'Class should be one of %s; you gave %s'
                % (self.structures, _class))
        self.code = _class
        self.info = self.structures[_class]()


QueryInformationJobObjectProto = WINFUNCTYPE(
    BOOL,    # Return type
    HANDLE,  # hJob
    DWORD,   # JobObjectInfoClass
    LPVOID,  # lpJobObjectInfo
    DWORD,   # cbJobObjectInfoLength
    LPDWORD  # lpReturnLength
)

# ctypes paramflags: a leading 1 marks an input parameter; the trailing None
# on lpReturnLength is its default value, so callers may omit it.
QueryInformationJobObjectFlags = (
    (1, 'hJob'),
    (1, 'JobObjectInfoClass'),
    (1, 'lpJobObjectInfo'),
    (1, 'cbJobObjectInfoLength'),
    (1, 'lpReturnLength', None)
)

_QueryInformationJobObject = QueryInformationJobObjectProto(
    ('QueryInformationJobObject', windll.kernel32),
    QueryInformationJobObjectFlags
)


class SubscriptableReadOnlyStruct(object):
    """Expose a ctypes Structure through attribute and item lookup.

    Nested Structure fields are wrapped in the same class, so both
    ``view['BasicInfo']['ActiveProcesses']`` and
    ``view.BasicInfo.ActiveProcesses`` work.  The wrapper defines no setters
    for the structure's fields.
    """

    def __init__(self, struct):
        self._struct = struct

    def _delegate(self, name):
        """Fetch ``name`` from the wrapped struct, wrapping nested structs."""
        result = getattr(self._struct, name)
        if isinstance(result, Structure):
            return SubscriptableReadOnlyStruct(result)
        return result

    def __getitem__(self, name):
        """Return the field called ``name``; raise KeyError if not a field."""
        # Only declared fields are reachable by subscript.  Entries are
        # indexed rather than unpacked so three-element (bit-field)
        # declarations are handled too.
        if any(field[0] == name for field in self._struct._fields_):
            return self._delegate(name)
        raise KeyError(name)

    def __getattr__(self, name):
        # __getattr__ only runs when normal lookup fails.  If '_struct'
        # itself is missing (e.g. an instance created without __init__),
        # delegating would recurse forever, so fail cleanly instead.
        if name == '_struct':
            raise AttributeError(name)
        return self._delegate(name)


def QueryInformationJobObject(hJob, JobObjectInfoClass):
    """Query a job object and return the filled-in information structure.

    hJob -- handle of the job object to query.
    JobObjectInfoClass -- numeric information-class code or its name; see
        JobObjectInfo for the supported values.

    Returns a SubscriptableReadOnlyStruct wrapping the result structure.
    Raises NotImplementedError for an unsupported information class and the
    exception built by WinError() when the underlying call reports failure.
    """
    jobinfo = JobObjectInfo(JobObjectInfoClass)
    result = _QueryInformationJobObject(
        hJob=hJob,
        JobObjectInfoClass=jobinfo.code,
        lpJobObjectInfo=addressof(jobinfo.info),
        cbJobObjectInfoLength=sizeof(jobinfo.info)
    )
    if not result:
        raise WinError()
    return SubscriptableReadOnlyStruct(jobinfo.info)


def test_qijo():
    """Manual smoke test; spawns notepad through killableprocess.Popen."""
    from killableprocess import Popen

    popen = Popen('c:\\windows\\notepad.exe')
    killed = False
    try:
        # A null job handle is expected to make the call fail.
        try:
            QueryInformationJobObject(0, 8)
        except WindowsError:
            pass
        else:
            raise AssertionError('throw should occur')

        # Information class 1 has no structure defined in this module.
        try:
            QueryInformationJobObject(0, 1)
        except NotImplementedError:
            pass
        else:
            raise AssertionError('throw should occur')

        result = QueryInformationJobObject(popen._job, 8)
        if result['BasicInfo']['ActiveProcesses'] != 1:
            raise AssertionError('expected ActiveProcesses to be 1')
        popen.kill()
        killed = True

        result = QueryInformationJobObject(popen._job, 8)
        if result.BasicInfo.ActiveProcesses != 0:
            raise AssertionError('expected ActiveProcesses to be 0')
    finally:
        # Do not leave the spawned process running when a check fails
        # before the kill above was reached.
        if not killed:
            popen.kill()


if __name__ == '__main__':
    print("testing.")
    test_qijo()
    print("success!")