-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathazuremetadata
executable file
·146 lines (127 loc) · 5.24 KB
/
azuremetadata
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#!/usr/bin/python3
import argparse
import io
import os
import random
import sys
import urllib.error
import urllib.request
from azuremetadata import azuremetadatautils, azuremetadata
class PreserveArgumentOrder(argparse.Action):
    """Argparse action that remembers the order options were given in.

    Each time the action fires, a ``(dest, value)`` tuple is appended to
    the ``ordered_args`` list kept on the namespace. The list is created
    the first time any option using this action is seen.
    """

    def __call__(self, parser, namespace, value, option_string=None):
        # argparse.Namespace keeps its attributes in the instance dict, so
        # setdefault creates the list on first use and reuses it afterwards.
        seen = vars(namespace).setdefault('ordered_args', [])
        seen.append((self.dest, value))
# Pre-parse only the options that are needed before the metadata is fetched
# (API version and device); every other command line argument is ignored
# here and left for the main parser.
api_version_parser = argparse.ArgumentParser(add_help=False)
for option_strings in (('-a', '--api'), ('--device',)):
    # Both options may be given without a value, in which case they are None
    api_version_parser.add_argument(*option_strings, nargs='?', const=None)
api_args = api_version_parser.parse_known_args()[0]
# Main command line parser. The built-in help option is switched off and
# replaced by a plain -h/--help flag.
parser = argparse.ArgumentParser(add_help=False)
for short_opt, long_opt, description in (
    ('-h', '--help', "Display help"),
    ('-x', '--xml', "Output as XML"),
    ('-j', '--json', "Output as JSON"),
):
    parser.add_argument(
        short_opt, long_opt, action="store_true", help=description
    )
parser.add_argument('-o', '--output', help="Output file path")
parser.add_argument(
    '-a', '--api', help="API version or 'latest' for newest API version"
)
# Only root can read the tag, thus hide the argument
if os.geteuid() == 0:
    parser.add_argument(
        '--device',
        help="Device to read disk tag from (default: root device)",
        nargs='?'
    )
parser.add_argument(
    '--listapis', action="store_true", help="List available API versions"
)
# Capture the help text of the options registered so far
help_header = parser.format_help()
def _read_classic_vm_id(kvp_pool_path, key_size=512, value_size=2048):
    """Return the lower-cased VirtualMachineId stored in a KVP pool file.

    The file is read as a sequence of fixed-size records, each made of a
    *key_size* byte key followed by a *value_size* byte value; both fields
    are cut at their first NUL byte.

    Returns None when the file does not exist or when the end of the file
    is reached without finding a 'VirtualMachineId' record.
    """
    if not os.path.exists(kvp_pool_path):
        return None
    with open(kvp_pool_path, 'rb') as kvp_pool:
        while True:
            key = kvp_pool.read(key_size)
            value = kvp_pool.read(value_size)
            if not (key and value):
                # End of file (or a truncated record): stop scanning
                # instead of looping forever on empty reads.
                return None
            if key.split(b"\x00")[0] == b'VirtualMachineId':
                return value.split(b"\x00")[0].lower().decode('utf-8')


# IMDS is not intended to be used behind a proxy and IMDS does not support it
os.environ['no_proxy'] = '169.254.169.254'

try:
    metadata = azuremetadata.AzureMetadata(api_args.api)
    data = {}

    # Handle instances in ASM, aka Classic
    # Heuristic: the probe below requests instance/compute with a recent
    # API version and an HTTP 404 answer is taken to mean we are in ASM.
    # NOTE(review): the previous comment described the signal as an
    # internal server error on attestedData, while the code checks for
    # a 404 on instance/compute - confirm which one is intended.
    # ASM gets retired in 2023, rip this code out, it's ugly!
    instance_compute_url = 'http://169.254.169.254/metadata/instance/compute'
    req = urllib.request.Request(
        '%s?api-version=2019-11-01' % instance_compute_url,
        headers={'Metadata': 'true'}
    )
    try:
        # Only the status of the probe matters, so release the response
        # right away instead of leaving it open.
        with urllib.request.urlopen(req, timeout=2):
            pass
    except urllib.error.HTTPError as e:
        if e.getcode() == 404:
            # Set the api version to the first implementation, it works in ASM
            metadata.set_api_version('2017-04-02')
            # Special code for SUSE: because we know what we are looking
            # for there is unfortunately no better way.
            vm_id = _read_classic_vm_id('/var/lib/hyperv/.kvp_pool_3')
            if vm_id is None:
                # No usable VM id: fall back to a random identifier. The
                # bound must be an int, random.randint() rejects floats
                # such as 1e9 on Python 3.12 and later.
                vm_id = random.randint(0, 10 ** 9)
            data['subscriptionId'] = 'classic-{}'.format(vm_id)
            # ensuring that --attestedData --signature and
            # --signature are available
            data['attestedData'] = {}
            data['attestedData']['signature'] = ''
            data['signature'] = ''
    # End code removal in 2023

    # Only root can read the tag, only add the value if we are root
    if os.geteuid() == 0:
        data['billingTag'] = metadata.get_disk_tag(api_args.device)

    data.update(metadata.get_all())

    util = azuremetadatautils.AzureMetadataUtils(data)

    # Every available parameter becomes a query option; the custom action
    # records the order in which the options were given.
    for key in util.available_params.keys():
        parser.add_argument(
            '--{}'.format(key), nargs='?', type=int, const=True,
            action=PreserveArgumentOrder
        )

    args = parser.parse_args()
    ordered_args = getattr(args, 'ordered_args', [])

    if args.help:
        print(help_header)
        print("\nquery arguments:")
        util.print_help()
        sys.exit()

    if args.listapis:
        api_versions = metadata.list_api_versions()
        print('Available API versions:')
        for api in api_versions:
            print('  {}'.format(api))
        sys.exit()

    # fh stays None when no output file was requested
    fh = None
    if args.output:
        fh = open(args.output, 'w')

    try:
        if not ordered_args:
            # No query options given: print everything
            util.print_pretty(
                print_xml=args.xml, print_json=args.json, file=fh
            )
        else:
            result = util.query(ordered_args)
            util.print_pretty(
                print_xml=args.xml, print_json=args.json, data=result, file=fh
            )
    finally:
        if fh:
            fh.close()
except azuremetadatautils.QueryException as e:
    print(e, file=sys.stderr)
    sys.exit(1)