-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathhtml_extractor.py
69 lines (54 loc) · 1.98 KB
/
html_extractor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import io
import sys
from bs4 import BeautifulSoup
from bs4.dammit import EncodingDetector
from warcio.archiveiterator import WARCIterator
# Raise the interpreter's recursion limit: `soup.decode_contents` (used by
# HtmlExtractor.get_html_from_warc) can hit the default limit.
sys.setrecursionlimit(10000)
class HtmlExtractor:
    """Callable that fills an example's ``html`` / ``html_error`` fields from its WARC bytes.

    The example is a mapping with the keys ``html``, ``html_error``, ``warc``
    and ``warc_error``. It is updated in place and returned.
    """

    def __call__(self, example):
        """Populate ``example["html"]`` and ``example["html_error"]``.

        Args:
            example: Mapping holding ``html``, ``html_error``, ``warc`` and
                ``warc_error``.

        Returns:
            The same ``example`` object. It is returned untouched when it
            already carries HTML without an error; otherwise ``html`` and
            ``html_error`` are overwritten.
        """
        # Already extracted successfully: nothing to redo.
        if example["html"] and not example["html_error"]:
            return example

        # The WARC itself could not be obtained, so there is nothing to parse.
        if example["warc_error"]:
            example["html"] = ""
            example["html_error"] = "no WARC"
            return example

        html, html_error = self.get_html_from_warc(warc=example["warc"])
        example["html"] = html
        example["html_error"] = html_error
        return example

    @staticmethod
    def _error_text(error):
        """Return a non-empty description of ``error``.

        ``str(error)`` is empty for exceptions raised without a message, which
        would make a failure look like "no error"; fall back to the class name.
        """
        return str(error) or type(error).__name__

    def get_html_from_warc(self, warc):
        """Decode the first ``response`` record of an in-memory WARC into an HTML string.

        Args:
            warc: Bytes of a WARC file.

        Returns:
            A ``(html, error)`` tuple. On success ``error`` is ``""``. On any
            failure ``html`` is ``""`` and ``error`` is a non-empty message.
        """
        # An empty payload cannot contain a response record.
        if not warc:
            return "", "Not page or encoding"

        page, encoding = None, None
        try:
            # BytesIO is built inside the try so that a non-bytes payload is
            # reported through the error slot instead of raising to the caller.
            with io.BytesIO(warc) as stream:
                for record in WARCIterator(stream):
                    if record.rec_type == "response":
                        page = record.content_stream().read()
                        encoding = record.rec_headers["WARC-Identified-Content-Charset"]
                        # Only the first response record is used.
                        break
        except Exception as e:
            return "", self._error_text(e)

        # Check the page *before* encoding detection: the detector raises on
        # ``None``, which would otherwise hide this intended error message.
        if not page:
            return "", "Not page or encoding"

        if not encoding:
            try:
                # Take the first encoding the detector proposes.
                encoding = next(iter(EncodingDetector(page, is_html=True).encodings), None)
            except Exception as e:
                return "", self._error_text(e)
            if not encoding:
                return "", "Not page or encoding"

        try:
            soup = BeautifulSoup(page, "html.parser", from_encoding=encoding)
            html_str = soup.decode_contents(formatter="html")
            # Make sure the result can be encoded (UTF-8 by default); the
            # encoded bytes themselves are not needed.
            html_str.encode()
        except Exception as e:
            return "", self._error_text(e)

        return html_str, ""