Skip to content

Commit

Permalink
mostly working ACMEv2, except for letsencrypt/boulder#3367
Browse files Browse the repository at this point in the history
  • Loading branch information
diafygi committed Jan 15, 2018
1 parent 4ed1395 commit bb248e0
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 116 deletions.
226 changes: 111 additions & 115 deletions acme_tiny.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,67 +5,86 @@
except ImportError:
from urllib2 import urlopen # Python 2

#DEFAULT_CA = "https://acme-staging.api.letsencrypt.org"
DEFAULT_CA = "https://acme-v01.api.letsencrypt.org"
#DEFAULT_CA = "https://acme-staging-v02.api.letsencrypt.org"
DEFAULT_CA = "https://acme-v02.api.letsencrypt.org"

LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.StreamHandler())
LOGGER.setLevel(logging.INFO)

def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA):
# helper function base64 encode for jose spec
def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check=False):
directory, acct_headers, alg, jwk = None, None, None, None # global variables

# helper functions - base64 encode for jose spec
def _b64(b):
return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "")

# helper function - run external commands
def _cmd(cmd_list, cmd_input=None):
stdin = subprocess.PIPE if cmd_input is not None else None
proc = subprocess.Popen(cmd_list, stdin=stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate(cmd_input)
return proc, out, err

# helper function - make request and automatically parse json response
def _do_request(url, data=None, err_msg="Error"):
try:
resp = urlopen(url, data)
resp_data, code, headers = resp.read().decode("utf8"), resp.getcode(), resp.headers
resp_data = json.loads(resp_data) # try to parse json results
except ValueError:
pass # ignore json parsing errors
except IOError as e:
resp_data = e.read().decode("utf8") if hasattr(e, "read") else str(e)
code, headers = getattr(e, "code", None), {}
if code not in [200, 201, 204]:
raise ValueError("{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format(err_msg, url, data, code, resp_data))
return resp_data, code, headers

# helper function - make signed requests
def _send_signed_request(url, payload, err_msg):
payload64 = _b64(json.dumps(payload).encode('utf8'))
new_nonce = _do_request(directory['newNonce'])[2]['Replay-Nonce']
protected = {"url": url, "alg": alg, "nonce": new_nonce}
protected.update({"jwk": jwk} if acct_headers is None else {"kid": acct_headers['Location']})
protected64 = _b64(json.dumps(protected).encode('utf8'))
protected_input = "{0}.{1}".format(protected64, payload64).encode('utf8')
proc, out, err = _cmd(["openssl", "dgst", "-sha256", "-sign", account_key], cmd_input=protected_input)
if proc.returncode != 0:
raise IOError("OpenSSL Error: {0}".format(err))
data = json.dumps({"protected": protected64, "payload": payload64, "signature": _b64(out)})
return _do_request(url, data=data.encode('utf8'), err_msg=err_msg)

# helper function - poll until complete
def _poll_until_not(url, pending_statuses, err_msg):
while True:
result, _, _ = _do_request(url, err_msg=err_msg)
if result['status'] in pending_statuses:
time.sleep(2)
continue
return result

# parse account key to get public key
log.info("Parsing account key...")
proc = subprocess.Popen(["openssl", "rsa", "-in", account_key, "-noout", "-text"],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate()
proc, out, err = _cmd(["openssl", "rsa", "-in", account_key, "-noout", "-text"])
if proc.returncode != 0:
raise IOError("OpenSSL Error: {0}".format(err))
pub_hex, pub_exp = re.search(
r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)",
out.decode('utf8'), re.MULTILINE|re.DOTALL).groups()
pub_pattern = r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)"
pub_hex, pub_exp = re.search(pub_pattern, out.decode('utf8'), re.MULTILINE|re.DOTALL).groups()
pub_exp = "{0:x}".format(int(pub_exp))
pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
header = {
"alg": "RS256",
"jwk": {
"e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
"kty": "RSA",
"n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
},
alg = "RS256"
jwk = {
"e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
"kty": "RSA",
"n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
}
accountkey_json = json.dumps(header['jwk'], sort_keys=True, separators=(',', ':'))
accountkey_json = json.dumps(jwk, sort_keys=True, separators=(',', ':'))
thumbprint = _b64(hashlib.sha256(accountkey_json.encode('utf8')).digest())

# helper function make signed requests
def _send_signed_request(url, payload):
payload64 = _b64(json.dumps(payload).encode('utf8'))
protected = copy.deepcopy(header)
protected["nonce"] = urlopen(CA + "/directory").headers['Replay-Nonce']
protected64 = _b64(json.dumps(protected).encode('utf8'))
proc = subprocess.Popen(["openssl", "dgst", "-sha256", "-sign", account_key],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate("{0}.{1}".format(protected64, payload64).encode('utf8'))
if proc.returncode != 0:
raise IOError("OpenSSL Error: {0}".format(err))
data = json.dumps({
"header": header, "protected": protected64,
"payload": payload64, "signature": _b64(out),
})
try:
resp = urlopen(url, data.encode('utf8'))
return resp.getcode(), resp.read()
except IOError as e:
return getattr(e, "code", None), getattr(e, "read", e.__str__)()

# find domains
log.info("Parsing CSR...")
proc = subprocess.Popen(["openssl", "req", "-in", csr, "-noout", "-text"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate()
proc, out, err = _cmd(["openssl", "req", "-in", csr, "-noout", "-text"])
if proc.returncode != 0:
raise IOError("Error loading {0}: {1}".format(csr, err))
domains = set([])
Expand All @@ -77,93 +96,69 @@ def _send_signed_request(url, payload):
for san in subject_alt_names.group(1).split(", "):
if san.startswith("DNS:"):
domains.add(san[4:])
log.info("Found domains: {}".format(", ".join(domains)))

# get the ACME directory of urls
log.info("Getting directory...")
directory, _, _ = _do_request(CA + "/directory", err_msg="Error getting directory")
log.info("Directory found!")

# get the certificate domains and expiration
# create account and set the global key identifier
log.info("Registering account...")
code, result = _send_signed_request(CA + "/acme/new-reg", {
"resource": "new-reg",
"agreement": json.loads(urlopen(CA + "/directory").read().decode('utf8'))['meta']['terms-of-service'],
})
if code == 201:
log.info("Registered!")
elif code == 409:
log.info("Already registered!")
else:
raise ValueError("Error registering: {0} {1}".format(code, result))

# verify each domain
for domain in domains:
reg_payload = {"termsOfServiceAgreed": True}
account, code, acct_headers = _send_signed_request(directory['newAccount'], reg_payload, "Error registering")
log.info("Registered!" if code == 201 else "Already registered!")

# create a new order
log.info("Creating new order...")
order_payload = {"identifiers": [{"type": "dns", "value": d} for d in domains]}
order, _, order_headers = _send_signed_request(directory['newOrder'], order_payload, "Error creating new order")
log.info("Order created!")

# get the authorizations that need to be completed
for auth_url in order['authorizations']:
authorization, _, _ = _do_request(auth_url, err_msg="Error getting challenges")
domain = authorization['identifier']['value']
log.info("Verifying {0}...".format(domain))

# get new challenge
code, result = _send_signed_request(CA + "/acme/new-authz", {
"resource": "new-authz",
"identifier": {"type": "dns", "value": domain},
})
if code != 201:
raise ValueError("Error requesting challenges: {0} {1}".format(code, result))

# make the challenge file
challenge = [c for c in json.loads(result.decode('utf8'))['challenges'] if c['type'] == "http-01"][0]
# find the http-01 challenge and write the challenge file
challenge = [c for c in authorization['challenges'] if c['type'] == "http-01"][0]
token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token'])
keyauthorization = "{0}.{1}".format(token, thumbprint)
wellknown_path = os.path.join(acme_dir, token)
with open(wellknown_path, "w") as wellknown_file:
wellknown_file.write(keyauthorization)

# check that the file is in place
wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token)
try:
resp = urlopen(wellknown_url)
resp_data = resp.read().decode('utf8').strip()
assert resp_data == keyauthorization
except (IOError, AssertionError):
wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token)
assert(disable_check or _do_request(wellknown_url)[0] == keyauthorization)
except (AssertionError, ValueError) as e:
os.remove(wellknown_path)
raise ValueError("Wrote file to {0}, but couldn't download {1}".format(
wellknown_path, wellknown_url))

# notify challenge are met
code, result = _send_signed_request(challenge['uri'], {
"resource": "challenge",
"keyAuthorization": keyauthorization,
})
if code != 202:
raise ValueError("Error triggering challenge: {0} {1}".format(code, result))

# wait for challenge to be verified
while True:
try:
resp = urlopen(challenge['uri'])
challenge_status = json.loads(resp.read().decode('utf8'))
except IOError as e:
raise ValueError("Error checking challenge: {0} {1}".format(
e.code, json.loads(e.read().decode('utf8'))))
if challenge_status['status'] == "pending":
time.sleep(2)
elif challenge_status['status'] == "valid":
log.info("{0} verified!".format(domain))
os.remove(wellknown_path)
break
else:
raise ValueError("{0} challenge did not pass: {1}".format(
domain, challenge_status))

# get the new certificate
raise ValueError("Wrote file to {0}, but couldn't download {1}: {2}".format(wellknown_path, wellknown_url, e))

# say the challenge is done
challenge_payload = {"keyAuthorization": keyauthorization}
_send_signed_request(challenge['url'], challenge_payload, "Error submitting challenges: {0}".format(domain))
authorization = _poll_until_not(auth_url, ["pending"], "Error checking challenge status for {0}".format(domain))
if authorization['status'] != "valid":
raise ValueError("Challenge did not pass for {0}: {1}".format(domain, authorization))
log.info("{0} verified!".format(domain))

# finalize the order with the csr
log.info("Signing certificate...")
proc = subprocess.Popen(["openssl", "req", "-in", csr, "-outform", "DER"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
csr_der, err = proc.communicate()
code, result = _send_signed_request(CA + "/acme/new-cert", {
"resource": "new-cert",
"csr": _b64(csr_der),
})
if code != 201:
raise ValueError("Error signing certificate: {0} {1}".format(code, result))

# return signed certificate!
proc, csr_der, err = _cmd(["openssl", "req", "-in", csr, "-outform", "DER"])
_send_signed_request(order['finalize'], {"csr": _b64(csr_der)}, "Error finalizing order")

# poll the order to monitor when it's done
order = _poll_until_not(order_headers['Location'], ["pending", "processing"], "Error checking order status")
if order['status'] != "valid":
raise ValueError("Order failed: {0}".format(order))

# download the certificate
certificate_pem, _, _ = _do_request(order['certificate'], err_msg="Certificate download failed")
log.info("Certificate signed!")
return """-----BEGIN CERTIFICATE-----\n{0}\n-----END CERTIFICATE-----\n""".format(
"\n".join(textwrap.wrap(base64.b64encode(result).decode('utf8'), 64)))
return certificate_pem

def main(argv):
parser = argparse.ArgumentParser(
Expand All @@ -188,10 +183,11 @@ def main(argv):
parser.add_argument("--acme-dir", required=True, help="path to the .well-known/acme-challenge/ directory")
parser.add_argument("--quiet", action="store_const", const=logging.ERROR, help="suppress output except for errors")
parser.add_argument("--ca", default=DEFAULT_CA, help="certificate authority, default is Let's Encrypt")
parser.add_argument("--disable-check", default=False, action="store_true", help="disable checking if the challenge file is hosted correctly before telling the CA")

args = parser.parse_args(argv)
LOGGER.setLevel(args.quiet or LOGGER.level)
signed_crt = get_crt(args.account_key, args.csr, args.acme_dir, log=LOGGER, CA=args.ca)
signed_crt = get_crt(args.account_key, args.csr, args.acme_dir, log=LOGGER, CA=args.ca, disable_check=args.disable_check)
sys.stdout.write(signed_crt)

if __name__ == "__main__": # pragma: no cover
Expand Down
2 changes: 1 addition & 1 deletion tests/test_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class TestModule(unittest.TestCase):
"Tests for acme_tiny.get_crt()"

def setUp(self):
self.CA = "https://acme-staging.api.letsencrypt.org"
self.CA = "https://acme-staging-v02.api.letsencrypt.org"
self.tempdir = tempfile.mkdtemp()
self.fuse_proc = Popen(["python", "tests/monkey.py", self.tempdir])

Expand Down

0 comments on commit bb248e0

Please sign in to comment.