|
| 1 | +import base64 |
| 2 | +import io |
| 3 | +import time |
| 4 | + |
| 5 | +from PIL import UnidentifiedImageError |
| 6 | +from reportlab.lib.utils import ImageReader |
| 7 | +from reportlab.pdfgen import canvas |
| 8 | + |
| 9 | +from odoo import _, models, Command |
| 10 | +from odoo.tools import format_date |
| 11 | +from odoo.exceptions import UserError, ValidationError |
| 12 | +from odoo.tools.pdf import PdfFileReader, PdfFileWriter |
| 13 | + |
| 14 | +try: |
| 15 | + from PyPDF2.errors import PdfReadError # type: ignore |
| 16 | +except ImportError: |
| 17 | + from PyPDF2.utils import PdfReadError |
| 18 | + |
| 19 | + |
| 20 | +def _fix_image_transparency(image): |
| 21 | + pixels = image.load() |
| 22 | + for x in range(image.size[0]): |
| 23 | + for y in range(image.size[1]): |
| 24 | + if pixels[x, y] == (0, 0, 0, 0): |
| 25 | + pixels[x, y] = (255, 255, 255, 0) |
| 26 | + |
| 27 | + |
| 28 | +class SignRequest(models.Model): |
| 29 | + _inherit = "sign.request" |
| 30 | + |
| 31 | + def _generate_completed_document(self, password=""): |
| 32 | + self.ensure_one() |
| 33 | + self._validate_document_state() |
| 34 | + |
| 35 | + if not self.template_id.sign_item_ids: |
| 36 | + self._copy_template_to_completed_document() |
| 37 | + else: |
| 38 | + old_pdf = self._load_template_pdf(password) |
| 39 | + new_pdf_data = self._create_signed_overlay(old_pdf) |
| 40 | + self._merge_pdfs_and_store(old_pdf, new_pdf_data, password) |
| 41 | + |
| 42 | + attachment = self._create_attachment_from_completed_doc() |
| 43 | + log_attachment = self._create_completion_certificate() |
| 44 | + self._attach_completed_documents(attachment, log_attachment) |
| 45 | + |
| 46 | + def _validate_document_state(self): |
| 47 | + if self.state != "signed": |
| 48 | + raise UserError( |
| 49 | + _( |
| 50 | + "The completed document cannot be created because the sign request is not fully signed" |
| 51 | + ) |
| 52 | + ) |
| 53 | + |
| 54 | + def _copy_template_to_completed_document(self): |
| 55 | + self.completed_document = self.template_id.attachment_id.datas |
| 56 | + |
| 57 | + def _load_template_pdf(self, password): |
| 58 | + try: |
| 59 | + pdf_reader = PdfFileReader( |
| 60 | + io.BytesIO(base64.b64decode(self.template_id.attachment_id.datas)), |
| 61 | + strict=False, |
| 62 | + overwriteWarnings=False, |
| 63 | + ) |
| 64 | + pdf_reader.getNumPages() |
| 65 | + except PdfReadError: |
| 66 | + raise ValidationError(_("ERROR: Invalid PDF file!")) |
| 67 | + |
| 68 | + if pdf_reader.isEncrypted and not pdf_reader.decrypt(password): |
| 69 | + return |
| 70 | + |
| 71 | + return pdf_reader |
| 72 | + |
| 73 | + def _create_signed_overlay(self, old_pdf): |
| 74 | + font = self._get_font() |
| 75 | + normalFontSize = self._get_normal_font_size() |
| 76 | + packet = io.BytesIO() |
| 77 | + can = canvas.Canvas(packet, pagesize=self.get_page_size(old_pdf)) |
| 78 | + items_by_page, values = self._collect_items_and_values() |
| 79 | + |
| 80 | + for p in range(0, old_pdf.getNumPages()): |
| 81 | + page = old_pdf.getPage(p) |
| 82 | + width, height = self._get_page_dimensions(page) |
| 83 | + self._apply_page_rotation(can, page, width, height) |
| 84 | + |
| 85 | + for item in items_by_page.get(p + 1, []): |
| 86 | + self._draw_item( |
| 87 | + can, item, values.get(item.id), width, height, font, normalFontSize |
| 88 | + ) |
| 89 | + can.showPage() |
| 90 | + |
| 91 | + can.save() |
| 92 | + return PdfFileReader(packet, overwriteWarnings=False) |
| 93 | + |
| 94 | + def _collect_items_and_values(self): |
| 95 | + items_by_page = self.template_id._get_sign_items_by_page() |
| 96 | + item_ids = [id for items in items_by_page.values() for id in items.ids] |
| 97 | + values_dict = self.env["sign.request.item.value"]._read_group( |
| 98 | + [("sign_item_id", "in", item_ids), ("sign_request_id", "=", self.id)], |
| 99 | + groupby=["sign_item_id"], |
| 100 | + aggregates=[ |
| 101 | + "value:array_agg", |
| 102 | + "frame_value:array_agg", |
| 103 | + "frame_has_hash:array_agg", |
| 104 | + ], |
| 105 | + ) |
| 106 | + values = { |
| 107 | + item: {"value": vals[0], "frame": frames[0], "frame_has_hash": hashes[0]} |
| 108 | + for item, vals, frames, hashes in values_dict |
| 109 | + } |
| 110 | + return items_by_page, values |
| 111 | + |
| 112 | + def _get_page_dimensions(self, page): |
| 113 | + width = float(abs(page.mediaBox.getWidth())) |
| 114 | + height = float(abs(page.mediaBox.getHeight())) |
| 115 | + return width, height |
| 116 | + |
| 117 | + def _apply_page_rotation(self, can, page, width, height): |
| 118 | + rotation = page.get("/Rotate", 0) |
| 119 | + if isinstance(rotation, int): |
| 120 | + can.rotate(rotation) |
| 121 | + if rotation == 90: |
| 122 | + width, height = height, width |
| 123 | + can.translate(0, -height) |
| 124 | + elif rotation == 180: |
| 125 | + can.translate(-width, -height) |
| 126 | + elif rotation == 270: |
| 127 | + width, height = height, width |
| 128 | + can.translate(-width, 0) |
| 129 | + |
| 130 | + def _draw_item(self, can, item, value_dict, width, height, font, normalFontSize): |
| 131 | + if not value_dict: |
| 132 | + return |
| 133 | + |
| 134 | + value, frame = value_dict["value"], value_dict["frame"] |
| 135 | + if frame: |
| 136 | + self._draw_image(can, frame, item, width, height) |
| 137 | + |
| 138 | + draw_method = getattr(self, f"_draw_{item.type_id.item_type}", None) |
| 139 | + if draw_method: |
| 140 | + draw_method(can, item, value, width, height, font, normalFontSize) |
| 141 | + |
| 142 | + def _draw_image(self, can, frame_data, item, width, height): |
| 143 | + try: |
| 144 | + image_reader = ImageReader( |
| 145 | + io.BytesIO(base64.b64decode(frame_data.split(",")[1])) |
| 146 | + ) |
| 147 | + except UnidentifiedImageError: |
| 148 | + raise ValidationError( |
| 149 | + _( |
| 150 | + "There was an issue downloading your document. Please contact an administrator." |
| 151 | + ) |
| 152 | + ) |
| 153 | + |
| 154 | + _fix_image_transparency(image_reader._image) |
| 155 | + can.drawImage( |
| 156 | + image_reader, |
| 157 | + width * item.posX, |
| 158 | + height * (1 - item.posY - item.height), |
| 159 | + width * item.width, |
| 160 | + height * item.height, |
| 161 | + "auto", |
| 162 | + True, |
| 163 | + ) |
| 164 | + |
| 165 | + def _draw_signature(self, can, item, value, width, height, *_): |
| 166 | + self._draw_image(can, value, item, width, height) |
| 167 | + |
| 168 | + _draw_initial = _draw_signature |
| 169 | + _draw_stamp = _draw_signature |
| 170 | + |
| 171 | + def _merge_pdfs_and_store(self, old_pdf, overlay_pdf, password): |
| 172 | + new_pdf = PdfFileWriter() |
| 173 | + for i in range(old_pdf.getNumPages()): |
| 174 | + page = old_pdf.getPage(i) |
| 175 | + page.mergePage(overlay_pdf.getPage(i)) |
| 176 | + new_pdf.addPage(page) |
| 177 | + if old_pdf.isEncrypted: |
| 178 | + new_pdf.encrypt(password) |
| 179 | + |
| 180 | + output = io.BytesIO() |
| 181 | + try: |
| 182 | + new_pdf.write(output) |
| 183 | + except PdfReadError: |
| 184 | + raise ValidationError( |
| 185 | + _( |
| 186 | + "There was an issue downloading your document. Please contact an administrator." |
| 187 | + ) |
| 188 | + ) |
| 189 | + self.completed_document = base64.b64encode(output.getvalue()) |
| 190 | + output.close() |
| 191 | + |
| 192 | + def _create_attachment_from_completed_doc(self): |
| 193 | + filename = ( |
| 194 | + self.reference |
| 195 | + if self.reference.endswith(".pdf") |
| 196 | + else f"{self.reference}.pdf" |
| 197 | + ) |
| 198 | + return self.env["ir.attachment"].create( |
| 199 | + { |
| 200 | + "name": filename, |
| 201 | + "datas": self.completed_document, |
| 202 | + "type": "binary", |
| 203 | + "res_model": self._name, |
| 204 | + "res_id": self.id, |
| 205 | + } |
| 206 | + ) |
| 207 | + |
| 208 | + def _create_completion_certificate(self): |
| 209 | + public_user = ( |
| 210 | + self.env.ref("base.public_user", raise_if_not_found=False) or self.env.user |
| 211 | + ) |
| 212 | + pdf_content, _ = ( |
| 213 | + self.env["ir.actions.report"] |
| 214 | + .with_user(public_user) |
| 215 | + .sudo() |
| 216 | + ._render_qweb_pdf( |
| 217 | + "sign.action_sign_request_print_logs", |
| 218 | + self.ids, |
| 219 | + data={ |
| 220 | + "format_date": format_date, |
| 221 | + "company_id": self.communication_company_id, |
| 222 | + }, |
| 223 | + ) |
| 224 | + ) |
| 225 | + return self.env["ir.attachment"].create( |
| 226 | + { |
| 227 | + "name": f"Certificate of completion - {time.strftime('%Y-%m-%d - %H:%M:%S')}.pdf", |
| 228 | + "raw": pdf_content, |
| 229 | + "type": "binary", |
| 230 | + "res_model": self._name, |
| 231 | + "res_id": self.id, |
| 232 | + } |
| 233 | + ) |
| 234 | + |
| 235 | + def _attach_completed_documents(self, doc_attachment, log_attachment): |
| 236 | + self.completed_document_attachment_ids = [ |
| 237 | + Command.set([doc_attachment.id, log_attachment.id]) |
| 238 | + ] |
| 239 | + |
| 240 | + |
| 241 | +class SignRequestItem(models.Model): |
| 242 | + _inherit = "sign.request.item" |
| 243 | + |
| 244 | + def _get_user_signature_asset(self, asset_type): |
| 245 | + self.ensure_one() |
| 246 | + sign_user = self.partner_id.user_ids[:1] |
| 247 | + if sign_user and asset_type in ["stamp_sign_stamp", "stamp_sign_stamp_frame"]: |
| 248 | + return sign_user[asset_type] |
| 249 | + return False |
0 commit comments