1075 lines
36 KiB
Python
1075 lines
36 KiB
Python
import itertools
|
|
import logging
|
|
import re
|
|
import struct
|
|
from hashlib import sha256, md5, sha384, sha512
|
|
from typing import (
|
|
Any,
|
|
Callable,
|
|
Dict,
|
|
Iterable,
|
|
Iterator,
|
|
KeysView,
|
|
List,
|
|
Optional,
|
|
Sequence,
|
|
Tuple,
|
|
Type,
|
|
Union,
|
|
cast,
|
|
)
|
|
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
|
|
|
from . import settings
|
|
from .arcfour import Arcfour
|
|
from .data_structures import NumberTree
|
|
from .pdfparser import PDFSyntaxError, PDFParser, PDFStreamParser
|
|
from .pdftypes import (
|
|
DecipherCallable,
|
|
PDFException,
|
|
PDFTypeError,
|
|
PDFStream,
|
|
PDFObjectNotFound,
|
|
decipher_all,
|
|
int_value,
|
|
str_value,
|
|
list_value,
|
|
uint_value,
|
|
dict_value,
|
|
stream_value,
|
|
)
|
|
from .psparser import PSEOF, literal_name, LIT, KWD
|
|
from .utils import choplist, decode_text, nunpack, format_int_roman, format_int_alpha
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class PDFNoValidXRef(PDFSyntaxError):
    """Raised when a cross-reference section cannot be located or parsed."""
|
|
|
|
|
|
class PDFNoValidXRefWarning(SyntaxWarning):
    """Legacy warning category for a missing xref.

    Kept only for backward compatibility: warnings.warn was replaced by
    logger.Logger.warn, so this category is no longer emitted.
    """
|
|
|
|
|
|
class PDFNoOutlines(PDFException):
    """Raised when the document catalog has no /Outlines entry."""
|
|
|
|
|
|
class PDFNoPageLabels(PDFException):
    """Raised when the document catalog has no usable /PageLabels entry."""
|
|
|
|
|
|
class PDFDestinationNotFound(PDFException):
    """Raised when a named destination cannot be resolved."""
|
|
|
|
|
|
class PDFEncryptionError(PDFException):
    """Raised for unsupported or inconsistent encryption parameters."""
|
|
|
|
|
|
class PDFPasswordIncorrect(PDFEncryptionError):
    """Raised when the supplied password does not yield a valid key."""
|
|
|
|
|
|
class PDFEncryptionWarning(UserWarning):
    """Legacy warning category for failed decryption.

    Kept only for backward compatibility: warnings.warn was replaced by
    logger.Logger.warn, so this category is no longer emitted.
    """
|
|
|
|
|
|
class PDFTextExtractionNotAllowedWarning(UserWarning):
    """Legacy warning category for a PDF that does not allow extraction.

    Kept only for backward compatibility: warnings.warn was replaced by
    logger.Logger.warn, so this category is no longer emitted.
    """
|
|
|
|
|
|
class PDFTextExtractionNotAllowed(PDFEncryptionError):
    """Signals that a document forbids text extraction.

    NOTE(review): not raised anywhere in this module; presumably raised by
    callers that check ``PDFDocument.is_extractable`` -- confirm.
    """
|
|
|
|
|
|
class PDFTextExtractionNotAllowedError(PDFTextExtractionNotAllowed):
    """Deprecated alias of :class:`PDFTextExtractionNotAllowed`.

    Instantiating it emits a ``DeprecationWarning``.
    """

    def __init__(self, *args: object) -> None:
        from warnings import warn

        warn(
            "PDFTextExtractionNotAllowedError will be removed in the future. "
            "Use PDFTextExtractionNotAllowed instead.",
            DeprecationWarning,
            # Attribute the warning to the code that instantiates the
            # exception rather than to this constructor.
            stacklevel=2,
        )
        super().__init__(*args)
|
|
|
|
|
|
# Predefined name literals; this module compares them by identity against
# the /Type entry of streams and of the document catalog.
LITERAL_OBJSTM = LIT("ObjStm")
LITERAL_XREF = LIT("XRef")
LITERAL_CATALOG = LIT("Catalog")
|
|
|
|
|
|
class PDFBaseXRef:
    """Interface shared by the cross-reference implementations below."""

    def get_trailer(self) -> Dict[str, Any]:
        """Return the trailer dictionary; subclasses must override."""
        raise NotImplementedError()

    def get_objids(self) -> Iterable[int]:
        """Return the known object ids (none for the base class)."""
        return []

    def get_pos(self, objid: int) -> Tuple[Optional[int], int, int]:
        """Locate ``objid``.

        Implementations must return either ``(strmid, index, genno)`` or
        ``(None, pos, genno)``. The base class knows no objects and always
        raises ``KeyError``.
        """
        raise KeyError(objid)

    def load(self, parser: PDFParser) -> None:
        """Populate the xref from ``parser``; subclasses must override."""
        raise NotImplementedError()
|
|
|
|
|
|
class PDFXRef(PDFBaseXRef):
    """Cross-reference table in the classic textual format, plus trailer."""

    def __init__(self) -> None:
        # objid -> (None, byte offset, generation number). The first slot is
        # always None because table entries never point into a stream.
        self.offsets: Dict[int, Tuple[Optional[int], int, int]] = {}
        self.trailer: Dict[str, Any] = {}

    def __repr__(self) -> str:
        return "<PDFXRef: offsets=%r>" % (self.offsets.keys())

    def load(self, parser: PDFParser) -> None:
        """Read xref subsections until ``trailer``, then load the trailer.

        Each subsection starts with a ``start count`` header followed by
        ``count`` entries of the form ``offset generation n|f``. Only
        in-use (``n``) entries are recorded.

        :raises PDFNoValidXRef: on EOF or on any malformed header/entry.
        """
        while True:
            try:
                (pos, line) = parser.nextline()
            except PSEOF:
                raise PDFNoValidXRef("Unexpected EOF - file corrupted?")
            line = line.strip()
            if not line:
                continue
            if line.startswith(b"trailer"):
                # Rewind so load_trailer() sees the keyword itself.
                parser.seek(pos)
                break
            # split() with no separator tolerates runs of spaces or tabs.
            f = line.split()
            if len(f) != 2:
                error_msg = "Trailer not found: {!r}: line={!r}".format(parser, line)
                raise PDFNoValidXRef(error_msg)
            try:
                (start, nobjs) = map(int, f)
            except ValueError:
                error_msg = "Invalid line: {!r}: line={!r}".format(parser, line)
                raise PDFNoValidXRef(error_msg)
            for objid in range(start, start + nobjs):
                try:
                    (_, line) = parser.nextline()
                except PSEOF:
                    raise PDFNoValidXRef("Unexpected EOF - file corrupted?")
                line = line.strip()
                f = line.split()
                if len(f) != 3:
                    error_msg = "Invalid XRef format: {!r}, line={!r}".format(
                        parser, line
                    )
                    raise PDFNoValidXRef(error_msg)
                (pos_b, genno_b, use_b) = f
                if use_b != b"n":
                    # Free (or otherwise unused) entry: nothing to record.
                    continue
                try:
                    entry = (None, int(pos_b), int(genno_b))
                except ValueError:
                    # Report garbage numbers as an xref error so that the
                    # caller's PDFNoValidXRef fallback can take over.
                    error_msg = "Invalid XRef format: {!r}, line={!r}".format(
                        parser, line
                    )
                    raise PDFNoValidXRef(error_msg)
                self.offsets[objid] = entry
        log.debug("xref objects: %r", self.offsets)
        self.load_trailer(parser)

    def load_trailer(self, parser: PDFParser) -> None:
        """Parse the ``trailer`` keyword and merge its dictionary.

        :raises PDFNoValidXRef: if EOF is hit and the parser holds no
            pending object to use as the trailer dictionary.
        """
        try:
            (_, kwd) = parser.nexttoken()
            assert kwd is KWD(b"trailer"), str(kwd)
            (_, dic) = parser.nextobject()
        except PSEOF:
            # On EOF, fall back to whatever object the parser last built.
            x = parser.pop(1)
            if not x:
                raise PDFNoValidXRef("Unexpected EOF - file corrupted")
            (_, dic) = x[0]
        self.trailer.update(dict_value(dic))
        log.debug("trailer=%r", self.trailer)

    def get_trailer(self) -> Dict[str, Any]:
        return self.trailer

    def get_objids(self) -> KeysView[int]:
        return self.offsets.keys()

    def get_pos(self, objid: int) -> Tuple[Optional[int], int, int]:
        """Return ``(None, offset, genno)``; raises ``KeyError`` if unknown."""
        return self.offsets[objid]
|
|
|
|
|
|
class PDFXRefFallback(PDFXRef):
    """Xref rebuilt by scanning the whole file for ``N G obj`` headers."""

    def __repr__(self) -> str:
        return "<PDFXRefFallback: offsets=%r>" % (self.offsets.keys())

    # Matches an object header at the start of a line.
    PDFOBJ_CUE = re.compile(r"^(\d+)\s+(\d+)\s+obj\b")

    def load(self, parser: PDFParser) -> None:
        """Scan from offset 0, recording every object header found.

        Scanning ends at EOF or at the first ``trailer`` line, which is
        loaded as the trailer. Object streams met on the way are expanded
        so that the objects they contain are indexed too.
        """
        parser.seek(0)
        while True:
            try:
                (pos, raw_line) = parser.nextline()
            except PSEOF:
                break
            if raw_line.startswith(b"trailer"):
                parser.seek(pos)
                self.load_trailer(parser)
                log.debug("trailer: %r", self.trailer)
                break
            text = raw_line.decode("latin-1")  # default pdf encoding
            match = self.PDFOBJ_CUE.match(text)
            if match is None:
                continue
            objid = int(match.group(1))
            genno = int(match.group(2))
            self.offsets[objid] = (None, pos, genno)
            # expand ObjStm.
            parser.seek(pos)
            (_, obj) = parser.nextobject()
            if isinstance(obj, PDFStream) and obj.get("Type") is LITERAL_OBJSTM:
                self._expand_objstm(objid, obj)

    def _expand_objstm(self, objid: int, obj: PDFStream) -> None:
        """Index the objects stored inside object stream ``objid``."""
        stream = stream_value(obj)
        try:
            n = stream["N"]
        except KeyError:
            if settings.STRICT:
                raise PDFSyntaxError("N is not defined: %r" % stream)
            n = 0
        stream_parser = PDFStreamParser(stream.get_data())
        objs: List[int] = []
        try:
            while True:
                (_, item) = stream_parser.nextobject()
                objs.append(cast(int, item))
        except PSEOF:
            pass
        # The even positions of ``objs`` are read as object ids; never read
        # more pairs than were actually parsed.
        n = min(n, len(objs) // 2)
        for index in range(n):
            self.offsets[objs[index * 2]] = (objid, index, 0)
|
|
|
|
|
|
class PDFXRefStream(PDFBaseXRef):
    """Cross-reference data stored in an XRef stream object."""

    def __init__(self) -> None:
        self.data: Optional[bytes] = None
        # Size in bytes of one entry (fl1 + fl2 + fl3).
        self.entlen: Optional[int] = None
        # Widths in bytes of the three fields of an entry (the /W array).
        self.fl1: Optional[int] = None
        self.fl2: Optional[int] = None
        self.fl3: Optional[int] = None
        # (first object id, object count) for each /Index subsection.
        self.ranges: List[Tuple[int, int]] = []
        # Replaced by the stream attributes in load().
        self.trailer: Dict[str, Any] = {}

    def __repr__(self) -> str:
        return "<PDFXRefStream: ranges=%r>" % (self.ranges)

    def load(self, parser: PDFParser) -> None:
        """Read the XRef stream object at the parser's current position.

        :raises PDFNoValidXRef: if the object is not a stream whose /Type
            is XRef.
        :raises PDFSyntaxError: if /Index has an odd number of items.
        """
        (_, objid) = parser.nexttoken()  # ignored
        (_, genno) = parser.nexttoken()  # ignored
        (_, kwd) = parser.nexttoken()
        (_, stream) = parser.nextobject()
        if not isinstance(stream, PDFStream) or stream.get("Type") is not LITERAL_XREF:
            raise PDFNoValidXRef("Invalid PDF stream spec.")
        size = stream["Size"]
        # Without /Index a single subsection (0, Size) is used.
        index_array = stream.get("Index", (0, size))
        if len(index_array) % 2 != 0:
            raise PDFSyntaxError("Invalid index number")
        self.ranges.extend(cast(Iterator[Tuple[int, int]], choplist(2, index_array)))
        (self.fl1, self.fl2, self.fl3) = stream["W"]
        assert self.fl1 is not None and self.fl2 is not None and self.fl3 is not None
        self.data = stream.get_data()
        self.entlen = self.fl1 + self.fl2 + self.fl3
        self.trailer = stream.attrs
        log.debug(
            "xref stream: objid=%s, fields=%d,%d,%d",
            ", ".join(map(repr, self.ranges)),
            self.fl1,
            self.fl2,
            self.fl3,
        )

    def get_trailer(self) -> Dict[str, Any]:
        return self.trailer

    def get_objids(self) -> Iterator[int]:
        """Yield the ids of all entries whose type field is 1 or 2."""
        # Entries of all subsections are stored back to back, so the entry
        # index keeps counting across subsections (as get_pos() does).
        index = 0
        for (start, nobjs) in self.ranges:
            for i in range(nobjs):
                assert self.entlen is not None
                assert self.data is not None
                offset = self.entlen * index
                ent = self.data[offset : offset + self.entlen]
                f1 = nunpack(ent[: self.fl1], 1)
                if f1 == 1 or f1 == 2:
                    yield start + i
                index += 1

    def get_pos(self, objid: int) -> Tuple[Optional[int], int, int]:
        """Return the location of ``objid``.

        Type-1 entries give ``(None, f2, f3)`` and type-2 entries give
        ``(f2, f3, 0)``, where f2 and f3 are the second and third fields.

        :raises KeyError: if ``objid`` is outside every subsection or its
            entry has any other type (a free object).
        """
        index = 0
        for (start, nobjs) in self.ranges:
            if start <= objid < start + nobjs:
                index += objid - start
                break
            index += nobjs
        else:
            raise KeyError(objid)
        assert self.entlen is not None
        assert self.data is not None
        assert self.fl1 is not None and self.fl2 is not None and self.fl3 is not None
        offset = self.entlen * index
        ent = self.data[offset : offset + self.entlen]
        f1 = nunpack(ent[: self.fl1], 1)
        f2 = nunpack(ent[self.fl1 : self.fl1 + self.fl2])
        f3 = nunpack(ent[self.fl1 + self.fl2 :])
        if f1 == 1:
            return (None, f2, f3)
        elif f1 == 2:
            return (f2, f3, 0)
        else:
            # this is a free object
            raise KeyError(objid)
|
|
|
|
|
|
class PDFStandardSecurityHandler:
    """Standard security handler for revisions 2 and 3 (RC4 based)."""

    PASSWORD_PADDING = (
        b"(\xbfN^Nu\x8aAd\x00NV\xff\xfa\x01\x08"
        b"..\x00\xb6\xd0h>\x80/\x0c\xa9\xfedSiz"
    )
    supported_revisions: Tuple[int, ...] = (2, 3)

    def __init__(
        self, docid: Sequence[bytes], param: Dict[str, Any], password: str = ""
    ) -> None:
        self.docid = docid
        self.param = param
        self.password = password
        self.init()

    def init(self) -> None:
        """Read the parameters, check the revision and derive the key.

        :raises PDFEncryptionError: if the revision is not supported.
        :raises PDFPasswordIncorrect: if the password cannot be verified.
        """
        self.init_params()
        if self.r not in self.supported_revisions:
            raise PDFEncryptionError("Unsupported revision: param=%r" % self.param)
        self.init_key()

    def init_params(self) -> None:
        """Copy the entries of the encryption dictionary onto ``self``."""
        params = self.param
        self.v = int_value(params.get("V", 0))
        self.r = int_value(params["R"])
        self.p = uint_value(params["P"], 32)
        self.o = str_value(params["O"])
        self.u = str_value(params["U"])
        self.length = int_value(params.get("Length", 40))

    def init_key(self) -> None:
        """Authenticate ``self.password`` and store the resulting key."""
        self.key = self.authenticate(self.password)
        if self.key is None:
            raise PDFPasswordIncorrect

    def is_printable(self) -> bool:
        """Whether bit value 4 is set in the permission flags ``p``."""
        return (self.p & 4) != 0

    def is_modifiable(self) -> bool:
        """Whether bit value 8 is set in the permission flags ``p``."""
        return (self.p & 8) != 0

    def is_extractable(self) -> bool:
        """Whether bit value 16 is set in the permission flags ``p``."""
        return (self.p & 16) != 0

    def compute_u(self, key: bytes) -> bytes:
        """Compute the value to compare with /U for ``key``."""
        if self.r == 2:
            # Algorithm 3.4
            return Arcfour(key).encrypt(self.PASSWORD_PADDING)  # 2
        # Algorithm 3.5
        digest = md5(self.PASSWORD_PADDING)  # 2
        digest.update(self.docid[0])  # 3
        result = Arcfour(key).encrypt(digest.digest())  # 4
        for i in range(1, 20):  # 5
            round_key = bytes(c ^ i for c in key)
            result = Arcfour(round_key).encrypt(result)
        # 6: doubled to 32 bytes; only the first 16 are ever compared.
        return result + result

    def compute_encryption_key(self, password: bytes) -> bytes:
        """Derive the encryption key from a user password (Algorithm 3.2)."""
        padded = (password + self.PASSWORD_PADDING)[:32]  # 1
        digest = md5(padded)  # 2
        digest.update(self.o)  # 3
        # See https://github.com/pdfminer/pdfminer.six/issues/186
        digest.update(struct.pack("<L", self.p))  # 4
        digest.update(self.docid[0])  # 5
        if self.r >= 4:
            if not cast(PDFStandardSecurityHandlerV4, self).encrypt_metadata:
                digest.update(b"\xff\xff\xff\xff")
        result = digest.digest()
        if self.r < 3:
            return result[:5]
        n = self.length // 8
        for _ in range(50):
            result = md5(result[:n]).digest()
        return result[:n]

    def authenticate(self, password: str) -> Optional[bytes]:
        """Try ``password`` as user password, then as owner password.

        :returns: the encryption key, or None when both attempts fail.
        """
        password_bytes = password.encode("latin1")
        key = self.authenticate_user_password(password_bytes)
        if key is not None:
            return key
        return self.authenticate_owner_password(password_bytes)

    def authenticate_user_password(self, password: bytes) -> Optional[bytes]:
        """Return the key derived from ``password`` if it verifies."""
        key = self.compute_encryption_key(password)
        return key if self.verify_encryption_key(key) else None

    def verify_encryption_key(self, key: bytes) -> bool:
        """Check ``key`` against /U (Algorithm 3.6)."""
        u = self.compute_u(key)
        if self.r == 2:
            return u == self.u
        return u[:16] == self.u[:16]

    def authenticate_owner_password(self, password: bytes) -> Optional[bytes]:
        """Recover the user password from /O and authenticate with it."""
        # Algorithm 3.7
        padded = (password + self.PASSWORD_PADDING)[:32]
        digest = md5(padded).digest()
        if self.r >= 3:
            for _ in range(50):
                digest = md5(digest).digest()
        n = self.length // 8 if self.r >= 3 else 5
        key = digest[:n]
        if self.r == 2:
            user_password = Arcfour(key).decrypt(self.o)
        else:
            user_password = self.o
            for i in range(19, -1, -1):
                round_key = bytes(c ^ i for c in key)
                user_password = Arcfour(round_key).decrypt(user_password)
        return self.authenticate_user_password(user_password)

    def decrypt(
        self,
        objid: int,
        genno: int,
        data: bytes,
        attrs: Optional[Dict[str, Any]] = None,
    ) -> bytes:
        """Decrypt ``data`` belonging to object ``objid``/``genno``."""
        return self.decrypt_rc4(objid, genno, data)

    def decrypt_rc4(self, objid: int, genno: int, data: bytes) -> bytes:
        """RC4-decrypt ``data`` with the per-object key."""
        assert self.key is not None
        # The key is extended with the low 3 bytes of the object id and the
        # low 2 bytes of the generation number, then hashed.
        salted = self.key + struct.pack("<L", objid)[:3] + struct.pack("<L", genno)[:2]
        object_key = md5(salted).digest()[: min(len(salted), 16)]
        return Arcfour(object_key).decrypt(data)
|
|
|
|
|
|
class PDFStandardSecurityHandlerV4(PDFStandardSecurityHandler):
    """Security handler for revision 4 (crypt filters, RC4 or AES-128)."""

    supported_revisions: Tuple[int, ...] = (4,)

    def init_params(self) -> None:
        """Read the crypt-filter parameters on top of the base ones.

        :raises PDFEncryptionError: if /StmF and /StrF differ, if a filter
            uses an unknown method, or if /StrF names no known filter.
        """
        super().init_params()
        self.length = 128
        self.cf = dict_value(self.param.get("CF"))
        self.stmf = literal_name(self.param["StmF"])
        self.strf = literal_name(self.param["StrF"])
        self.encrypt_metadata = bool(self.param.get("EncryptMetadata", True))
        if self.stmf != self.strf:
            raise PDFEncryptionError(
                "Unsupported crypt filter: param=%r" % self.param
            )
        # Crypt filter name -> decryption callable.
        self.cfm = {}
        for filter_name, filter_dict in self.cf.items():
            method = self.get_cfm(literal_name(filter_dict["CFM"]))
            if method is None:
                raise PDFEncryptionError(
                    "Unknown crypt filter method: param=%r" % self.param
                )
            self.cfm[filter_name] = method
        self.cfm["Identity"] = self.decrypt_identity
        if self.strf not in self.cfm:
            raise PDFEncryptionError(
                "Undefined crypt filter: param=%r" % self.param
            )

    def get_cfm(self, name: str) -> Optional[Callable[[int, int, bytes], bytes]]:
        """Map a /CFM name to its decryption method, or None if unknown."""
        methods = {"V2": self.decrypt_rc4, "AESV2": self.decrypt_aes128}
        return methods.get(name)

    def decrypt(
        self,
        objid: int,
        genno: int,
        data: bytes,
        attrs: Optional[Dict[str, Any]] = None,
        name: Optional[str] = None,
    ) -> bytes:
        """Decrypt ``data`` with crypt filter ``name`` (default: /StrF).

        Data is returned unchanged when metadata is not encrypted and
        ``attrs`` has /Type Metadata.
        """
        if attrs is not None and not self.encrypt_metadata:
            obj_type = attrs.get("Type")
            if obj_type is not None and literal_name(obj_type) == "Metadata":
                return data
        filter_name = self.strf if name is None else name
        return self.cfm[filter_name](objid, genno, data)

    def decrypt_identity(self, objid: int, genno: int, data: bytes) -> bytes:
        """Identity filter: return ``data`` untouched."""
        return data

    def decrypt_aes128(self, objid: int, genno: int, data: bytes) -> bytes:
        """AES-CBC decrypt ``data``; its first 16 bytes are the IV."""
        assert self.key is not None
        salted = (
            self.key
            + struct.pack("<L", objid)[:3]
            + struct.pack("<L", genno)[:2]
            + b"sAlT"
        )
        object_key = md5(salted).digest()[: min(len(salted), 16)]
        initialization_vector, ciphertext = data[:16], data[16:]
        cipher = Cipher(
            algorithms.AES(object_key),
            modes.CBC(initialization_vector),
            backend=default_backend(),
        )  # type: ignore
        return cipher.decryptor().update(ciphertext)  # type: ignore
|
|
|
|
|
|
class PDFStandardSecurityHandlerV5(PDFStandardSecurityHandlerV4):
    """Security handler for revisions 5 and 6 (AES-256)."""

    supported_revisions = (5, 6)

    def init_params(self) -> None:
        """Read /OE and /UE and split /O and /U into hash and salts."""
        super().init_params()
        self.length = 256
        self.oe = str_value(self.param["OE"])
        self.ue = str_value(self.param["UE"])
        # /O and /U: 32-byte hash, 8-byte validation salt, then key salt.
        self.o_hash = self.o[:32]
        self.o_validation_salt = self.o[32:40]
        self.o_key_salt = self.o[40:]
        self.u_hash = self.u[:32]
        self.u_validation_salt = self.u[32:40]
        self.u_key_salt = self.u[40:]

    def get_cfm(self, name: str) -> Optional[Callable[[int, int, bytes], bytes]]:
        """Only the AESV3 crypt filter method is recognised here."""
        if name == "AESV3":
            return self.decrypt_aes256
        return None

    def authenticate(self, password: str) -> Optional[bytes]:
        """Try ``password`` as owner password, then as user password.

        :returns: the key decrypted from /OE or /UE, or None when the
            password matches neither hash.
        """
        password_b = self._normalize_password(password)
        # The owner hashes also cover the complete /U string.
        hash = self._password_hash(password_b, self.o_validation_salt, self.u)
        if hash == self.o_hash:
            hash = self._password_hash(password_b, self.o_key_salt, self.u)
            cipher = Cipher(
                algorithms.AES(hash), modes.CBC(b"\0" * 16), backend=default_backend()
            )  # type: ignore
            return cipher.decryptor().update(self.oe)  # type: ignore
        hash = self._password_hash(password_b, self.u_validation_salt)
        if hash == self.u_hash:
            hash = self._password_hash(password_b, self.u_key_salt)
            cipher = Cipher(
                algorithms.AES(hash), modes.CBC(b"\0" * 16), backend=default_backend()
            )  # type: ignore
            return cipher.decryptor().update(self.ue)  # type: ignore
        return None

    def _normalize_password(self, password: str) -> bytes:
        """UTF-8 encode ``password`` (saslprep'ed for revision 6).

        The result is truncated to 127 bytes.
        """
        if self.r == 6:
            # saslprep expects non-empty strings, apparently
            if not password:
                return b""
            from ._saslprep import saslprep

            password = saslprep(password)
        return password.encode("utf-8")[:127]

    def _password_hash(
        self, password: bytes, salt: bytes, vector: Optional[bytes] = None
    ) -> bytes:
        """
        Compute password hash depending on revision number
        """
        if self.r == 5:
            return self._r5_password(password, salt, vector)
        return self._r6_password(password, salt[0:8], vector)

    def _r5_password(
        self, password: bytes, salt: bytes, vector: Optional[bytes] = None
    ) -> bytes:
        """
        Compute the password for revision 5
        """
        hash = sha256(password)
        hash.update(salt)
        if vector is not None:
            hash.update(vector)
        return hash.digest()

    def _r6_password(
        self, password: bytes, salt: bytes, vector: Optional[bytes] = None
    ) -> bytes:
        """
        Compute the password for revision 6
        """
        initial_hash = sha256(password)
        initial_hash.update(salt)
        if vector is not None:
            initial_hash.update(vector)
        k = initial_hash.digest()
        hashes = (sha256, sha384, sha512)
        round_no = last_byte_val = 0
        # At least 64 rounds; afterwards continue while the last byte of
        # the previous round's ciphertext exceeds round_no - 32.
        while round_no < 64 or last_byte_val > round_no - 32:
            k1 = (password + k + (vector or b"")) * 64
            e = self._aes_cbc_encrypt(key=k[:16], iv=k[16:32], data=k1)
            # compute the first 16 bytes of e,
            # interpreted as an unsigned integer mod 3
            next_hash = hashes[self._bytes_mod_3(e[:16])]
            k = next_hash(e).digest()
            last_byte_val = e[len(e) - 1]
            round_no += 1
        return k[:32]

    @staticmethod
    def _bytes_mod_3(input_bytes: bytes) -> int:
        """Return ``input_bytes``, read as a big integer, modulo 3."""
        # 256 is 1 mod 3, so we can just sum 'em
        return sum(b % 3 for b in input_bytes) % 3

    def _aes_cbc_encrypt(self, key: bytes, iv: bytes, data: bytes) -> bytes:
        """AES-CBC encrypt ``data`` without padding."""
        # The backend is passed explicitly, like every other Cipher built
        # in this module.
        cipher = Cipher(
            algorithms.AES(key), modes.CBC(iv), backend=default_backend()
        )  # type: ignore
        encryptor = cipher.encryptor()  # type: ignore
        return encryptor.update(data) + encryptor.finalize()  # type: ignore

    def decrypt_aes256(self, objid: int, genno: int, data: bytes) -> bytes:
        """AES-CBC decrypt ``data`` with the document key.

        The first 16 bytes of ``data`` are the IV; ``objid`` and ``genno``
        are not used.
        """
        initialization_vector = data[:16]
        ciphertext = data[16:]
        assert self.key is not None
        cipher = Cipher(
            algorithms.AES(self.key),
            modes.CBC(initialization_vector),
            backend=default_backend(),
        )  # type: ignore
        return cipher.decryptor().update(ciphertext)  # type: ignore
|
|
|
|
|
|
class PDFDocument:
    """PDFDocument object represents a PDF document.

    Since a PDF file can be very big, normally it is not loaded at
    once. So PDF document has to cooperate with a PDF parser in order to
    dynamically import the data as processing goes.

    Typical usage:
      doc = PDFDocument(parser, password)
      obj = doc.getobj(objid)

    """

    # Maps the /V entry of the encryption dictionary to a handler class.
    security_handler_registry: Dict[int, Type[PDFStandardSecurityHandler]] = {
        1: PDFStandardSecurityHandler,
        2: PDFStandardSecurityHandler,
        4: PDFStandardSecurityHandlerV4,
        5: PDFStandardSecurityHandlerV5,
    }

    def __init__(
        self,
        parser: PDFParser,
        password: str = "",
        caching: bool = True,
        fallback: bool = True,
    ) -> None:
        """Set the document to use a given PDFParser object.

        :param parser: parser positioned on the PDF data.
        :param password: password used if the document is encrypted.
        :param caching: keep parsed objects and object streams in memory.
        :param fallback: if no valid xref is found, rebuild one by scanning
            the file with PDFXRefFallback.
        :raises PDFSyntaxError: if no trailer provides a /Root entry.
        """
        self.caching = caching
        self.xrefs: List[PDFBaseXRef] = []
        self.info: List[Dict[str, Any]] = []
        self.catalog: Dict[str, Any] = {}
        self.encryption: Optional[Tuple[Any, Any]] = None
        self.decipher: Optional[DecipherCallable] = None
        # objid -> (object, genno)
        self._cached_objs: Dict[int, Tuple[object, int]] = {}
        # object stream id -> (objects parsed from it, its /N value)
        self._parsed_objs: Dict[int, Tuple[List[object], int]] = {}
        self._parser: Optional[PDFParser] = parser
        self._parser.set_document(self)
        self.is_printable = self.is_modifiable = self.is_extractable = True
        # Retrieve the information of each header that was appended
        # (maybe multiple times) at the end of the document.
        try:
            pos = self.find_xref(parser)
            self.read_xref_from(parser, pos, self.xrefs)
        except PDFNoValidXRef:
            if fallback:
                parser.fallback = True
                newxref = PDFXRefFallback()
                newxref.load(parser)
                self.xrefs.append(newxref)

        for xref in self.xrefs:
            trailer = xref.get_trailer()
            if not trailer:
                continue
            # If there's an encryption info, remember it.
            if "Encrypt" in trailer:
                if "ID" in trailer:
                    id_value = list_value(trailer["ID"])
                else:
                    # Some documents may not have a /ID, use two empty
                    # byte strings instead. Solves
                    # https://github.com/pdfminer/pdfminer.six/issues/594
                    id_value = (b"", b"")
                self.encryption = (id_value, dict_value(trailer["Encrypt"]))
                self._initialize_password(password)
            if "Info" in trailer:
                self.info.append(dict_value(trailer["Info"]))
            if "Root" in trailer:
                # Every PDF file must have exactly one /Root dictionary.
                self.catalog = dict_value(trailer["Root"])
                break
        else:
            raise PDFSyntaxError("No /Root object! - Is this really a PDF?")
        if self.catalog.get("Type") is not LITERAL_CATALOG:
            if settings.STRICT:
                raise PDFSyntaxError("Catalog not found!")

    KEYWORD_OBJ = KWD(b"obj")

    def _initialize_password(self, password: str = "") -> None:
        """Set up decryption from ``self.encryption`` and ``password``.

        :raises PDFEncryptionError: if the filter is not Standard or the
            /V value has no registered handler.
        """
        assert self.encryption is not None
        (docid, param) = self.encryption
        if literal_name(param.get("Filter")) != "Standard":
            raise PDFEncryptionError("Unknown filter: param=%r" % param)
        v = int_value(param.get("V", 0))
        factory = self.security_handler_registry.get(v)
        if factory is None:
            raise PDFEncryptionError("Unknown algorithm: param=%r" % param)
        handler = factory(docid, param, password)
        self.decipher = handler.decrypt
        self.is_printable = handler.is_printable()
        self.is_modifiable = handler.is_modifiable()
        self.is_extractable = handler.is_extractable()
        assert self._parser is not None
        self._parser.fallback = False  # need to read streams with exact length

    def _getobj_objstm(self, stream: PDFStream, index: int, objid: int) -> object:
        """Return the ``index``-th object stored in object stream ``stream``."""
        if stream.objid in self._parsed_objs:
            (objs, n) = self._parsed_objs[stream.objid]
        else:
            (objs, n) = self._get_objects(stream)
            if self.caching:
                assert stream.objid is not None
                self._parsed_objs[stream.objid] = (objs, n)
        # The first n * 2 parsed items are skipped; objects follow them.
        i = n * 2 + index
        try:
            obj = objs[i]
        except IndexError:
            raise PDFSyntaxError("index too big: %r" % index)
        return obj

    def _get_objects(self, stream: PDFStream) -> Tuple[List[object], int]:
        """Parse everything in ``stream``; return ``(items, /N value)``."""
        if stream.get("Type") is not LITERAL_OBJSTM:
            if settings.STRICT:
                raise PDFSyntaxError("Not a stream object: %r" % stream)
        try:
            n = cast(int, stream["N"])
        except KeyError:
            if settings.STRICT:
                raise PDFSyntaxError("N is not defined: %r" % stream)
            n = 0
        parser = PDFStreamParser(stream.get_data())
        parser.set_document(self)
        objs: List[object] = []
        try:
            while True:
                (_, obj) = parser.nextobject()
                objs.append(obj)
        except PSEOF:
            pass
        return (objs, n)

    def _getobj_parse(self, pos: int, objid: int) -> object:
        """Parse the object expected at byte offset ``pos``.

        :raises PDFSyntaxError: if the object id found does not match
            ``objid`` or the ``obj`` keyword is missing.
        """
        assert self._parser is not None
        self._parser.seek(pos)
        (_, objid1) = self._parser.nexttoken()  # objid
        (_, genno) = self._parser.nexttoken()  # genno
        (_, kwd) = self._parser.nexttoken()
        # hack around malformed pdf files
        # copied from https://github.com/jaepil/pdfminer3k/blob/master/
        # pdfminer/pdfparser.py#L399
        # to solve https://github.com/pdfminer/pdfminer.six/issues/56
        # assert objid1 == objid, str((objid1, objid))
        if objid1 != objid:
            x = []
            while kwd is not self.KEYWORD_OBJ:
                (_, kwd) = self._parser.nexttoken()
                x.append(kwd)
            if len(x) >= 2:
                objid1 = x[-2]
        # #### end hack around malformed pdf files
        if objid1 != objid:
            raise PDFSyntaxError("objid mismatch: {!r}={!r}".format(objid1, objid))

        if kwd != self.KEYWORD_OBJ:
            raise PDFSyntaxError("Invalid object spec: offset=%r" % pos)
        (_, obj) = self._parser.nextobject()
        return obj

    # can raise PDFObjectNotFound
    def getobj(self, objid: int) -> object:
        """Get object from PDF

        :raises PDFException if PDFDocument is not initialized
        :raises PDFObjectNotFound if objid does not exist in PDF
        """
        if not self.xrefs:
            raise PDFException("PDFDocument is not initialized")
        log.debug("getobj: objid=%r", objid)
        if objid in self._cached_objs:
            (obj, genno) = self._cached_objs[objid]
        else:
            # Try each xref in order; move on when one does not know the
            # object or points at something that cannot be parsed.
            for xref in self.xrefs:
                try:
                    (strmid, index, genno) = xref.get_pos(objid)
                except KeyError:
                    continue
                try:
                    if strmid is not None:
                        stream = stream_value(self.getobj(strmid))
                        obj = self._getobj_objstm(stream, index, objid)
                    else:
                        obj = self._getobj_parse(index, objid)
                        if self.decipher:
                            obj = decipher_all(self.decipher, objid, genno, obj)

                    if isinstance(obj, PDFStream):
                        obj.set_objid(objid, genno)
                    break
                except (PSEOF, PDFSyntaxError):
                    continue
            else:
                raise PDFObjectNotFound(objid)
            log.debug("register: objid=%r: %r", objid, obj)
            if self.caching:
                self._cached_objs[objid] = (obj, genno)
        return obj

    OutlineType = Tuple[Any, Any, Any, Any, Any]

    def get_outlines(self) -> Iterator[OutlineType]:
        """Iterate over outline entries as ``(level, title, dest, a, se)``.

        :raises PDFNoOutlines: if the catalog has no /Outlines entry.
        """
        if "Outlines" not in self.catalog:
            raise PDFNoOutlines

        def search(entry: object, level: int) -> Iterator[PDFDocument.OutlineType]:
            entry = dict_value(entry)
            if "Title" in entry:
                if "A" in entry or "Dest" in entry:
                    title = decode_text(str_value(entry["Title"]))
                    dest = entry.get("Dest")
                    action = entry.get("A")
                    se = entry.get("SE")
                    yield (level, title, dest, action, se)
            # Children first (one level deeper), then siblings.
            if "First" in entry and "Last" in entry:
                yield from search(entry["First"], level + 1)
            if "Next" in entry:
                yield from search(entry["Next"], level)
            return

        return search(self.catalog["Outlines"], 0)

    def get_page_labels(self) -> Iterator[str]:
        """
        Generate page label strings for the PDF document.

        If the document includes page labels, generates strings, one per page.
        If not, raises PDFNoPageLabels.

        The resulting iteration is unbounded.
        """
        assert self.catalog is not None

        try:
            page_labels = PageLabels(self.catalog["PageLabels"])
        except (PDFTypeError, KeyError):
            raise PDFNoPageLabels

        return page_labels.labels

    def lookup_name(self, cat: str, key: Union[str, bytes]) -> Any:
        """Look up ``key`` in the name tree ``cat`` of the catalog's /Names.

        :raises KeyError: if /Names, the category or the key is missing.
        """
        try:
            names = dict_value(self.catalog["Names"])
        except (PDFTypeError, KeyError):
            raise KeyError((cat, key))
        # may raise KeyError
        d0 = dict_value(names[cat])

        def lookup(d: Dict[str, Any]) -> Any:
            if "Limits" in d:
                # Skip subtrees whose key range cannot contain ``key``.
                (k1, k2) = list_value(d["Limits"])
                if key < k1 or k2 < key:
                    return None
            if "Names" in d:
                objs = list_value(d["Names"])
                names = dict(
                    cast(Iterator[Tuple[Union[str, bytes], Any]], choplist(2, objs))
                )
                return names[key]
            if "Kids" in d:
                for c in list_value(d["Kids"]):
                    v = lookup(dict_value(c))
                    if v:
                        return v
            raise KeyError((cat, key))

        return lookup(d0)

    def get_dest(self, name: Union[str, bytes]) -> Any:
        """Resolve the named destination ``name``.

        :raises PDFDestinationNotFound: if it is in neither the name tree
            nor the catalog's /Dests dictionary.
        """
        try:
            # PDF-1.2 or later
            obj = self.lookup_name("Dests", name)
        except KeyError:
            # PDF-1.1 or prior
            if "Dests" not in self.catalog:
                raise PDFDestinationNotFound(name)
            d0 = dict_value(self.catalog["Dests"])
            if name not in d0:
                raise PDFDestinationNotFound(name)
            obj = d0[name]
        return obj

    # find_xref
    def find_xref(self, parser: PDFParser) -> int:
        """Internal function used to locate the first XRef.

        :returns: the byte offset given after the last ``startxref``.
        :raises PDFNoValidXRef: if ``startxref`` is missing or is not
            followed by a non-negative integer.
        """
        # search the last xref table by scanning the file backwards.
        prev = b""
        for line in parser.revreadlines():
            line = line.strip()
            log.debug("find_xref: %r", line)
            if line == b"startxref":
                break
            if line:
                prev = line
        else:
            raise PDFNoValidXRef("Unexpected EOF")
        log.debug("xref found: pos=%r", prev)
        # Report a missing or garbage offset as PDFNoValidXRef so that the
        # caller's fallback scan can take over.
        try:
            pos = int(prev)
        except ValueError:
            raise PDFNoValidXRef("Invalid xref position: %r" % prev)
        if pos < 0:
            raise PDFNoValidXRef("Invalid negative xref position: %r" % pos)
        return pos

    # read xref table
    def read_xref_from(
        self, parser: PDFParser, start: int, xrefs: List[PDFBaseXRef]
    ) -> None:
        """Reads XRefs from the given location.

        The xref found at ``start`` is appended to ``xrefs``, then the
        sections named by /XRefStm and /Prev are read recursively.
        """
        # NOTE(review): nothing here guards against a /Prev chain that
        # loops back on itself -- confirm whether callers need that.
        parser.seek(start)
        parser.reset()
        try:
            (pos, token) = parser.nexttoken()
        except PSEOF:
            raise PDFNoValidXRef("Unexpected EOF")
        log.debug("read_xref_from: start=%d, token=%r", start, token)
        if isinstance(token, int):
            # XRefStream: PDF-1.5
            parser.seek(pos)
            parser.reset()
            xref: PDFBaseXRef = PDFXRefStream()
            xref.load(parser)
        else:
            if token is parser.KEYWORD_XREF:
                parser.nextline()
            xref = PDFXRef()
            xref.load(parser)
        xrefs.append(xref)
        trailer = xref.get_trailer()
        log.debug("trailer: %r", trailer)
        if "XRefStm" in trailer:
            pos = int_value(trailer["XRefStm"])
            self.read_xref_from(parser, pos, xrefs)
        if "Prev" in trailer:
            # find previous xref
            pos = int_value(trailer["Prev"])
            self.read_xref_from(parser, pos, xrefs)
|
|
|
|
|
|
class PageLabels(NumberTree):
    """PageLabels from the document catalog.

    See Section 8.3.1 in the PDF Reference.
    """

    @property
    def labels(self) -> Iterator[str]:
        """Generate one label per page; the last range never ends."""
        ranges = self.values

        # The tree must begin with page index 0
        if not ranges or ranges[0][0] != 0:
            if settings.STRICT:
                raise PDFSyntaxError("PageLabels is missing page index 0")
            # Try to cope, by assuming empty labels for the initial pages
            ranges.insert(0, (0, {}))

        range_count = len(ranges)
        for next_index, (start, raw_label_dict) in enumerate(ranges, 1):
            label_dict = dict_value(raw_label_dict)
            style = label_dict.get("S")
            prefix = decode_text(str_value(label_dict.get("P", b"")))
            first_value = int_value(label_dict.get("St", 1))

            values: Iterable[int]
            if next_index == range_count:
                # This is the last specified range. It continues until the
                # end of the document.
                values = itertools.count(first_value)
            else:
                # The range stops where the following one starts.
                end, _ = ranges[next_index]
                values = range(first_value, first_value + (end - start))

            for value in values:
                yield prefix + self._format_page_label(value, style)

    @staticmethod
    def _format_page_label(value: int, style: Any) -> str:
        """Format page label value in a specific style"""
        if style is None:
            return ""
        if style is LIT("D"):  # Decimal arabic numerals
            return str(value)
        if style is LIT("R"):  # Uppercase roman numerals
            return format_int_roman(value).upper()
        if style is LIT("r"):  # Lowercase roman numerals
            return format_int_roman(value)
        if style is LIT("A"):  # Uppercase letters A-Z, AA-ZZ...
            return format_int_alpha(value).upper()
        if style is LIT("a"):  # Lowercase letters a-z, aa-zz...
            return format_int_alpha(value)
        log.warning("Unknown page label style: %r", style)
        return ""
|