Skip to content

Instantly share code, notes, and snippets.

@arubdesu
Last active June 21, 2018 14:43
Show Gist options
  • Save arubdesu/c4f60e653cbe851827bd6e6023a1b396 to your computer and use it in GitHub Desktop.
Save arubdesu/c4f60e653cbe851827bd6e6023a1b396 to your computer and use it in GitHub Desktop.
The auditing project I discussed at Philly MacAdmins, April 21 2016 (unfortunately crashed and burned on older python...)
#!/usr/bin/python
"""Collect inventory via osquery, etc"""
import CoreFoundation
import csv
import datetime
import os
import platform
import plistlib
import subprocess
import sys
from xml.etree import ElementTree as et
# Directory holding this script; the bundled osquery and gmacpyutil
# directories are located relative to it.
CWD = os.path.dirname(__file__)
sys.path.append(CWD + '/osquery')
try:
    import osquery
except ImportError:
    # Ignored on purpose at import time. run_osquery() still references the
    # osquery name, so it will fail later if this import did not succeed.
    pass
sys.path.append(CWD + '/gmacpyutil')
# Paths into the CocoaDialog.app bundle inside the gmacpyutil directory.
# NOTE(review): neither name is referenced again in the visible source -
# presumably kept for reference; confirm before removing.
_CD_APP = os.path.join(CWD, "gmacpyutil/gmacpyutil/CocoaDialog.app")
_CD = '%s/Contents/MacOS/CocoaDialog' % _CD_APP
# Must come after the sys.path.append above so the package can be found.
from gmacpyutil import cocoadialog
#pylint: disable=no-name-in-module
from SystemConfiguration import SCNetworkInterfaceCopyAll, \
SCNetworkInterfaceGetLocalizedDisplayName, SCNetworkInterfaceGetBSDName, \
SCNetworkInterfaceGetHardwareAddressString, SCDynamicStoreCreate, \
SCDynamicStoreCopyValue, SCDynamicStoreCopyConsoleUser
from Foundation import NSKeyedUnarchiver
from struct import unpack
def extract_share(bookmark_data):
    '''Pull the mount URL out of raw bookmark data - enter the frogor.

    Follows the offset stored at bytes 12-16 to the first table of
    contents (TOC), indexes the records listed there, and returns the
    payload of record 0x2050 decoded as UTF-8.

    bookmark_data -- the bookmark as a byte string

    Raises KeyError when the first TOC lists no 0x2050 record, and
    struct.error when the data is too short for the fields being read.
    '''
    # Every field is read little-endian. The earlier code mixed native-order
    # ('I') and explicit little-endian ('<IQ') formats, which only agree on
    # little-endian hosts.
    content_offset, = unpack('<I', bookmark_data[12:16])
    toc_start, = unpack('<I', bookmark_data[content_offset:content_offset + 4])
    toc_start += content_offset
    # The TOC header is five 32-bit fields; only the last one (the number of
    # records that follow) is needed here.
    record_count = unpack('<IIIII', bookmark_data[toc_start:toc_start + 20])[4]
    cursor = toc_start + 20
    record_offsets = {}
    for _ in range(record_count):
        # Each TOC entry: 32-bit record id + 64-bit offset relative to the
        # content area.
        record_id, offset = unpack('<IQ', bookmark_data[cursor:cursor + 12])
        record_offsets[record_id] = offset + content_offset
        cursor += 12
    mount_record = record_offsets[0x2050]
    # Record header: payload length, then a type field that is not used.
    mount_length = unpack('<II', bookmark_data[mount_record:mount_record + 8])[0]
    payload_start = mount_record + 8
    return bookmark_data[payload_start:payload_start + mount_length].decode('utf-8')
def get_recentservers(sfl_file_path):
    '''Read the com.apple.LSSharedFileList.RecentServers.sfl file.

    sfl_file_path -- path of the .sfl archive to read

    Returns a list with one mount URL per archived item, sorted by each
    item's order() value. IOError from opening the file is not caught
    here; it propagates to the caller.
    '''
    with open(sfl_file_path, 'rb') as sfl_file:
        raw_data = sfl_file.read()
    recent_servers = NSKeyedUnarchiver.unarchiveObjectWithData_(buffer(raw_data))
    # Sort with key=. The previous call passed a lambda as the positional
    # cmp argument and returned one item's order instead of a comparison,
    # so the resulting order was not well defined.
    ordered_items = sorted(recent_servers['items'],
                           key=lambda item: int(item.order()))
    return [extract_share(item.bookmark()[:].tobytes())
            for item in ordered_items]
def pull_plistval(path, val):
    '''Return the value stored under key `val` for the preferences `path`.

    Thin wrapper around CoreFoundation.CFPreferencesCopyAppValue. Note the
    argument order flips: the CoreFoundation call receives `val` first and
    `path` second.
    '''
    lookup_key, preferences_path = val, path
    return CoreFoundation.CFPreferencesCopyAppValue(lookup_key, preferences_path)
def get_adinfo(net_config):
    '''Return the ActiveDirectory entry from the dynamic store.

    Looks up "com.apple.opendirectoryd.ActiveDirectory" through the given
    store session; an empty dict is returned when the lookup yields
    nothing (or anything falsy).
    '''
    store_key = "com.apple.opendirectoryd.ActiveDirectory"
    return SCDynamicStoreCopyValue(net_config, store_key) or {}
def get_adadmins():
    '''Return the ad-connector-configured admin groups.

    Runs `dsconfigad -show -xml` and returns the value found under
    ['Administrative']['Allowed admin groups'] in the parsed plist.
    This is best effort: if the command fails, the output cannot be parsed,
    the plist is empty, or the keys are missing, an empty dict is returned.
    '''
    try:
        raw_plist = subprocess.check_output(
            ['/usr/sbin/dsconfigad', '-show', '-xml'])
        nested_admins = plistlib.readPlistFromString(raw_plist)
        if nested_admins:
            return nested_admins['Administrative']['Allowed admin groups']
    except Exception:
        # Still deliberately broad (missing tool, non-zero exit, unparsable
        # output, missing keys), but no longer a bare except, so
        # KeyboardInterrupt and SystemExit are not swallowed.
        return {}
    # An empty plist used to fall through and return None; use the same
    # "nothing found" value as the failure path.
    return {}
def get_currentuser():
    '''Unimplemented placeholder; does nothing and returns None.'''
    return None
def get_computername(net_config):
    '''Return the ComputerName of this Mac.

    Reads "Setup:/System" through the given dynamic store session and
    returns its 'ComputerName' entry. Falls back to the placeholder
    'compnamenotset' when the lookup returns nothing or has no such key.
    '''
    sys_info = SCDynamicStoreCopyValue(net_config, "Setup:/System")
    try:
        return sys_info['ComputerName']
    except (TypeError, KeyError):
        # TypeError: the store returned None; KeyError: the name is not set.
        # Anything else is unexpected and is no longer silently hidden.
        return 'compnamenotset'
def get_printers():
    '''Return a dict mapping printer name to [ppd, uri].

    The data comes from `system_profiler SPPrintersDataType -xml`; each
    value falls back to None when the corresponding key is absent from a
    printer entry.
    '''
    profiler_cmd = ['/usr/sbin/system_profiler', 'SPPrintersDataType', '-xml']
    report = plistlib.readPlistFromString(subprocess.check_output(profiler_cmd))
    return {
        entry.get('_name'): [entry.get('ppd'), entry.get('uri')]
        for entry in report[0]['_items']
    }
def get_dnsinfo(net_config):
    '''Return the "State:/Network/Global/DNS" entry as a plain dict.

    net_config -- dynamic store session used for the lookup

    Returns an empty dict when the store has no such entry, instead of
    failing on dict(None).
    '''
    dns_info = SCDynamicStoreCopyValue(net_config, "State:/Network/Global/DNS")
    if not dns_info:
        return {}
    return dict(dns_info)
def get_primaryinterface(net_config):
    '''Return the 'PrimaryInterface' value of the global IPv4 state.

    None is returned when "State:/Network/Global/IPv4" yields nothing.
    '''
    ipv4_state = SCDynamicStoreCopyValue(net_config,
                                         "State:/Network/Global/IPv4")
    if not ipv4_state:
        return None
    return ipv4_state['PrimaryInterface']
def lookup_guids(net_config, guid):
    '''Return the 'UserDefinedName' of the network service with this guid.

    None is returned when "Setup:/Network/Service/<guid>" yields nothing.
    '''
    service_key = "Setup:/Network/Service/%s" % guid
    service = SCDynamicStoreCopyValue(net_config, service_key)
    if not service:
        return None
    return service['UserDefinedName']
def get_is_dhcp(net_config, guid):
    '''Return True when the service with this guid is configured for DHCP.

    Reads "Setup:/Network/Service/<guid>/IPv4" and checks its
    'ConfigMethod' entry. Returns False when the entry is missing or
    names any other method.
    '''
    config = SCDynamicStoreCopyValue(net_config,
                                     "Setup:/Network/Service/%s/IPv4" %
                                     guid)
    if not config:
        return False
    # The earlier test compared the list literal ['ConfigMethod'] with
    # 'DHCP', which can never be equal; look the value up in the config.
    return config.get('ConfigMethod') == 'DHCP'
def get_ip_address(net_config, iface):
    '''Return the first IPv4 address of the given interface, or None.

    Reads "State:/Network/Interface/<iface>/IPv4" and returns the first
    element of its 'Addresses' list. None is returned when the interface
    has no IPv4 state, no 'Addresses' entry, or an empty address list.
    '''
    try:
        addresses = SCDynamicStoreCopyValue(net_config,
                                            "State:/Network/Interface/%s/IPv4" %
                                            iface)
        return addresses['Addresses'][0]
    except (TypeError, KeyError, IndexError):
        # TypeError: no state for this interface (None); KeyError/IndexError:
        # state present but without any address.
        return None
def get_networkinterfacelist(net_config):
    '''Return (display name, details) pairs for all network interfaces.

    details is a list [ip address or None, MAC address, BSD name], with
    the string 'DHCP' appended when the matching network service is
    configured for DHCP. Interfaces whose display name is 'Bluetooth PAN'
    are left out.
    '''
    all_media = {}
    for interface in SCNetworkInterfaceCopyAll():
        disp_name = SCNetworkInterfaceGetLocalizedDisplayName(interface)
        if disp_name == 'Bluetooth PAN':
            continue
        bsdname = SCNetworkInterfaceGetBSDName(interface)
        mac_addy = SCNetworkInterfaceGetHardwareAddressString(interface)
        all_media[disp_name] = [get_ip_address(net_config, bsdname),
                                mac_addy, bsdname]

    # Map each service's user-visible name to its guid so interfaces can be
    # matched to services by display name.
    global_ipv4 = SCDynamicStoreCopyValue(net_config,
                                          "Setup:/Network/Global/IPv4")
    service_order = []
    if global_ipv4:
        # Guard against a missing entry/key instead of crashing on None.
        service_order = global_ipv4.get('ServiceOrder') or []
    guids = {}
    for guid in service_order:
        if guid:
            guids[lookup_guids(net_config, guid)] = guid

    for disp_name, details in all_media.items():
        associated_guid = guids.get(disp_name)
        # Only query services that exist; a missing guid used to be
        # formatted into the store key as the text "None".
        if associated_guid and get_is_dhcp(net_config, associated_guid):
            details.append('DHCP')
    return all_media.items()
def parse_groups(group, result_dict):
    '''Take a group to parse and the current output; return a filtered list.

    group -- key in result_dict holding the group's rows (dicts with 'uid')
    result_dict -- query results; 'users' holds dicts with 'uid'/'username'

    Returns a list with one {username: uid} dict for every user whose uid
    appears in the group's rows. A missing 'users' or group entry yields
    an empty list instead of an error.
    '''
    group_rows = result_dict.get(group) or []
    user_rows = result_dict.get('users') or []
    # Collect the member uids once so each user needs a single lookup
    # instead of a scan over every group row.
    member_uids = set(row.get('uid') for row in group_rows)
    return [{user.get('username'): user.get('uid')}
            for user in user_rows
            if user.get('uid') in member_uids]
def manual_sysinfo(net_config):
    '''Collect basic system info by hand - for my 10.7 and 10.8 brethren.

    Returns a dict with the keys 'hostname', 'hardware_model',
    'physical_memory', 'cpu_brand' (from sysctl) and 'cpu_serial' (from
    system_profiler). Failures of the external commands are not caught.
    '''
    sysinfo_dict = {}
    # NOTE(review): this stores the whole "Setup:/System" entry rather than
    # a single name string - confirm whether that is intended.
    sysinfo_dict['hostname'] = SCDynamicStoreCopyValue(net_config, "Setup:/System")
    sysctl_keys = (('hardware_model', 'hw.model'),
                   ('physical_memory', 'hw.memsize'),
                   ('cpu_brand', 'machdep.cpu.brand_string'))
    for field, sysctl_name in sysctl_keys:
        output = subprocess.check_output(['/usr/sbin/sysctl', '-n', sysctl_name])
        # Strip surrounding whitespace from the command output, as
        # manual_admins() does for dscl output.
        sysinfo_dict[field] = output.strip()
    serial_cmd = ['/usr/sbin/system_profiler', 'SPHardwareDataType', '-xml']
    hardware = plistlib.readPlistFromString(subprocess.check_output(serial_cmd))
    sysinfo_dict['cpu_serial'] = hardware[0]['_items'][0]['serial_number']
    return sysinfo_dict
def manual_fw():
    '''Return the firewall state dict.

    The single key 'globalstate' holds the value of the 'globalstate'
    preference read from /Library/Preferences/com.apple.alf.plist.
    '''
    # pull_plistval takes (path, val). The arguments used to be passed the
    # other way round, so the plist path was looked up as a key.
    return {
        'globalstate': pull_plistval('/Library/Preferences/com.apple.alf.plist',
                                     'globalstate'),
    }
def manual_fvtwo():
    '''Return the FileVault status.

    Returns a list holding the raw output of `fdesetup status`, or an
    empty list when the tool cannot be run or exits with an error.
    '''
    try:
        return [subprocess.check_output(['/usr/bin/fdesetup', 'status'])]
    except (OSError, subprocess.CalledProcessError):
        # OSError: the binary is not present; CalledProcessError: non-zero
        # exit. Both mean "no status available" for this best-effort check.
        return []
def _dscl_read(record, *attributes):
    '''Return the stripped output of `dscl . read <record> <attributes...>`.'''
    command = ['/usr/bin/dscl', '.', 'read', record] + list(attributes)
    return subprocess.check_output(command).strip()


def manual_admins():
    '''Get admin, ssh and screen-sharing group membership via dscl.

    Returns a tuple (admins, ssh, ard) of lists. admins always holds the
    GroupMembership output for /Groups/admin (a failure there propagates).
    ssh and ard each hold the GroupMembership/NestedGroups output for the
    access group, or stay empty when that lookup fails.
    '''
    admins = [_dscl_read('/Groups/admin', 'GroupMembership')]
    ssh, ard = [], []
    optional_groups = ((ssh, '/Groups/com.apple.access_ssh'),
                       (ard, '/Groups/com.apple.access_screensharing'))
    for target, record in optional_groups:
        try:
            target.append(_dscl_read(record, 'GroupMembership', 'NestedGroups'))
        except (OSError, subprocess.CalledProcessError):
            # Presumably the access group does not exist on this machine;
            # leave its list empty rather than aborting the inventory.
            pass
    return admins, ssh, ard
def old_inventory(net_config):
    '''Assemble the inventory from the manual_* collectors.

    Returns a dict with the keys 'system_info', 'alf', 'disk_encryption',
    'ARD', 'ssh' and 'admins'.
    '''
    inventory = {
        'system_info': manual_sysinfo(net_config),
        'alf': manual_fw(),
        'disk_encryption': manual_fvtwo(),
    }
    # manual_admins() hands back its three lists as (admins, ssh, ard).
    inventory['admins'], inventory['ssh'], inventory['ARD'] = manual_admins()
    return inventory
def run_osquery(sql_dict):
    """Run each SQL statement through a spawned osquery instance.

    sql_dict maps a result name to a SQL string; the returned dict maps the
    same names to the `response` attribute of each query result. The
    osqueryd binary is expected next to this script (CWD).
    """
    spawned = osquery.SpawnInstance(path='%s/osqueryd' % CWD)
    spawned.open()
    responses = {}
    for label in sql_dict:
        responses[label] = spawned.client.query(sql_dict[label]).response
    return responses
def pokemon():
    """Build the osquery statements, run them, and return the tidied results.

    The returned dict is keyed by query name. The 'admins', 'ARD' and 'ssh'
    entries are passed through parse_groups(), and 'system_info' and 'alf'
    are reduced to their first row.
    """
    # Plain "select <columns> from <table>" queries, keyed by table name.
    simple_selects = (
        ('system_info',
         'hostname, cpu_serial, hardware_model, physical_memory, cpu_brand'),
        ('os_version', 'major, minor, patch, build'),
        ('alf', 'global_state'),
        ('apps',
         'name, path, bundle_identifier, bundle_name, '
         'bundle_short_version, bundle_version, copyright, '
         'bundle_executable'),
    )
    statements = {}
    for table, columns in simple_selects:
        statements[table] = 'select %s from %s' % (columns, table)

    # Group membership lookups share one statement that differs only by gid.
    uid_by_gid = ("select uid from user_groups where gid like '%s' and uid "
                  "not like '0'")
    for label, gid in (('admins', '80'), ('ARD', '398'), ('ssh', '399')):
        statements[label] = uid_by_gid % gid

    statements['users'] = (
        "select uid, gid, username, description, directory from "
        "users where shell not like '/usr/%' and uid not like '0' "
        "and uid not like '248'")
    statements['disk_encryption'] = (
        "select * from disk_encryption where encrypted like '1'")
    statements['mounts'] = (
        "select device, path, blocks, blocks_size, blocks_free, "
        "blocks_available from mounts where device not like 'map%' "
        "and device not like 'dev%'")
    statements['block_devices'] = (
        "select name, label from block_devices where type not like "
        "'Virtual Interface' and label like '%Media'")

    results = run_osquery(statements)
    for group_name in ('admins', 'ARD', 'ssh'):
        results[group_name] = parse_groups(group_name, results)
    for single_row in ('system_info', 'alf'):
        results[single_row] = results.get(single_row)[0]
    return results
def post_approve(net_config):
    '''Gather the values that do not come from osquery.

    Returns a dict with the keys 'ad_info', 'ad_groups', 'current_user',
    'network_interfaces', 'primary_interface' and 'printers'.
    '''
    console_user = SCDynamicStoreCopyConsoleUser
    return {
        'ad_info': dict(get_adinfo(net_config)),
        'ad_groups': get_adadmins(),
        # First element of the console-user lookup result.
        'current_user': console_user(None, None, None)[0],
        'network_interfaces': get_networkinterfacelist(net_config),
        'primary_interface': get_primaryinterface(net_config),
        'printers': get_printers(),
    }
def curler(filepath, email, descript):
    '''curl our csv to Box.

    filepath -- path of the file to upload
    email -- value sent as the uploader_email form field
    descript -- value sent as the description form field

    Returns 'Completed successfully' when the XML response carries a
    <status> of 'upload_ok', and 'Something went wrong' in every other
    case: curl missing or failing, an unparsable response, or a missing or
    different status.
    '''
    url = 'https://upload.box.com/api/1.0/not/showing/therest'
    command = ['/usr/bin/curl', url,
               '-F', 'new_file_1=@' + filepath,
               '-F', 'check_name_conflict_folder_option=1',
               '-F', 'uploader_email=' + email,
               '-F', str('description=' + descript)]
    print(command)
    try:
        response = subprocess.check_output(command)
        # findtext() gives None instead of raising when <status> is absent.
        status = et.fromstring(response).findtext("status")
    except (subprocess.CalledProcessError, OSError, et.ParseError) as err:
        # A response that is not XML, or a missing curl binary, used to
        # escape as an uncaught exception; report it like a failed upload.
        print(err)
        return 'Something went wrong'
    if status == 'upload_ok':
        return 'Completed successfully'
    return 'Something went wrong'
def cleanup(result):
    '''Show an error dialog and quit when the submission failed.

    Any result other than 'Something went wrong' is treated as success and
    the function simply returns. A "Thank you" dialog for the success case
    existed only as disabled code and is not shown.
    '''
    if result != 'Something went wrong':
        return
    failure_box = cocoadialog.MsgBox(title="Sorry!")
    failure_box.SetText("There was an error submitting your inventory. Quitting")
    failure_box.Show()
    sys.exit(0)
def how_proceed(this_os, filename_now, net_config):
    '''Ask for an email address and open the registration web page.

    this_os -- OS version number computed by the caller (compared with 7)
    filename_now -- timestamp string used in the CSV file name
    net_config -- dynamic store session used for the network lookups

    Exits the script when the user presses Cancel. When this_os is 7 or
    higher the entered email is returned. Below 7 only the serial number
    and console user are written to a CSV in /tmp and uploaded, and None is
    returned.
    '''
    cd_title = "Org Device Registration"
    email_prompt = cocoadialog.Standard_InputBox(title=cd_title)
    email_prompt.SetInformativeText("Please enter your email address, you'll then be taken to a webpage for more information.")
    email_prompt.SetText("@college.edu")
    email_entered = email_prompt.Show()
    # First line of the reply is the button, second the entered text; do not
    # fail on a reply that carries only the button.
    reply_lines = email_entered.split('\n')
    button = reply_lines[0]
    email = reply_lines[1] if len(reply_lines) > 1 else ''
    print(button)
    if button == 'Cancel':
        sys.exit(0)

    machine_primary = get_primaryinterface(net_config)
    das_url = 'http://registration.url'
    macproc = subprocess.Popen(['/usr/sbin/networksetup', '-getmacaddress',
                                machine_primary], stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    stdout, _ = macproc.communicate()
    # Characters 18-35 of the output are taken as the MAC address.
    mac_addy = (stdout[18:35]).replace(':', '-')
    utf8_compyname = get_computername(net_config)
    # Drop any non-ASCII characters from the name before using it in the URL.
    compyname = utf8_compyname.encode('ascii', 'ignore').decode('ascii')
    userid = os.getlogin()
    subprocess.Popen(['/usr/bin/open', das_url + mac_addy + '&email=' + email
                      + '&userid=' + userid + '&computername=' +
                      compyname])

    if this_os >= 7:
        return email

    result_dict = {}
    cmd = ['/usr/sbin/system_profiler', '-xml', 'SPHardwareDataType']
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        output = proc.communicate()[0]
        plist = plistlib.readPlistFromString(output)
        result_dict['serial'] = plist[0]['_items'][0]['serial_number']
        result_dict['current_user'] = SCDynamicStoreCopyConsoleUser(None, None, None)[0]
        our_filepath = os.path.join('/tmp/', result_dict.get('serial') + '_'
                                    + filename_now + '.csv')
        with open(our_filepath, 'w') as target:
            writer = csv.writer(target)
            writer.writerows(result_dict.items())
        response = curler(our_filepath, email, result_dict.get('current_user'))
    except Exception:
        # The bare except used here before also caught the SystemExit raised
        # by cleanup(), which showed the error dialog a second time.
        response = 'Something went wrong'
    # Called once, outside the try, so its sys.exit() is never intercepted.
    cleanup(response)
    # NOTE(review): main() uses the return value as the email address, so
    # returning None on this path looks unintended - confirm.
    return None
def main():
    '''gimme some main: collect the inventory, write it to a CSV, upload it.

    Systems below 10.9 use the manual collectors; newer ones use osquery.
    The combined results are written to /tmp/<serial>_<timestamp>.csv and
    handed to curler(); cleanup() reports a failed upload.
    '''
    version_parts = platform.mac_ver()[0].split('.')
    major = int(version_parts[0])
    minor = int(version_parts[1]) if len(version_parts) > 1 else 0
    # The checks below compare against the "x" of 10.x. Versions that no
    # longer start with "10." would look older than 10.7 if only the second
    # field were used, so map them past 10.15 (11 -> 16, 12 -> 17, ...).
    this_os = minor if major == 10 else major + 5

    now = str(datetime.datetime.now())[:-7]  # drop the microseconds
    filename_now = (now.replace(':', '-')).replace(' ', '_')
    net_config = SCDynamicStoreCreate(None, "net", None, None)
    email = how_proceed(this_os, filename_now, net_config)
    extend = post_approve(net_config)
    extend['DateCollected'] = now
    extend['captured_email'] = email
    descript = extend.get('current_user')

    if this_os < 9:
        result_dict = old_inventory(net_config)
        result_dict['recent_servers'] = pull_plistval(
            '/Users/' + descript +
            '/Library/Preferences/com.apple.sidebarlists.plist',
            'favoriteservers')
    else:
        result_dict = pokemon()
        stanza = result_dict.get('os_version')
        if stanza[0]['minor'] == '11':
            #pylint: disable=C0103
            path_str = 'Library/Application Support/com.apple.sharedfilelist/com.apple.LSSharedFileList.RecentServers.sfl'
            try:
                result_dict['recent_servers'] = list(get_recentservers(
                    str(os.path.join('/Users', descript, path_str))))
            except IOError:
                # The .sfl file could not be read: leave recent_servers out.
                pass
        else:
            quear = ("select subkey, value from preferences where key = "
                     "'favoriteservers' and subkey = 'CustomListItems//URL' and "
                     "path = '/Users/" + descript +
                     "/Library/Preferences/com.apple.sidebarlists.plist';")
            new_dict = {'recent_servers': str(quear)}
            result_dict['recent_servers'] = run_osquery(new_dict)

    pars_serial = result_dict.get('system_info')
    serial = pars_serial.get('cpu_serial')
    result_dict.update(extend)
    print(result_dict)
    our_filepath = os.path.join('/tmp/', serial + '_' + filename_now + '.csv')
    print(our_filepath)
    with open(our_filepath, 'w') as target:
        writer = csv.writer(target)
        # One (key, value) row per inventory entry.
        writer.writerows(result_dict.items())
    response = curler(our_filepath, email, descript)
    cleanup(response)
# Run the inventory only when this file is executed as a script, not when it
# is imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment