Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 149 additions & 83 deletions modules/backup/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,8 @@
log injection via forged line breaks or terminal control sequences.
- Truncates overly long values to avoid log flooding.
"""
# Ensure we are working with a string representation
if not isinstance(value, str):
value = str(value)
# Strip all ASCII control characters (U+0000–U+001F and U+007F), including CR/LF
control_chars = ''.join(chr(i) for i in range(32)) + chr(127)
translation_table = str.maketrans('', '', control_chars)
cleaned = value.translate(translation_table)
Expand Down Expand Up @@ -82,7 +80,6 @@
use_external_storage = db.Column(db.Boolean, default=True)
updated_at = db.Column(db.DateTime, default=datetime.utcnow)

# We only need Settings reference for auto_backup_enabled check
class Settings(db.Model):
__tablename__ = 'settings'
__table_args__ = {'extend_existing': True}
Expand Down Expand Up @@ -110,7 +107,7 @@
if encrypt:
cfg = module._get_config()
if cfg.encrypt_method == 'none':
password = None # no encryption even if requested
password = None
elif cfg.encrypt_method == 'custom' and cfg.custom_password:
password = cfg.custom_password
else:
Expand Down Expand Up @@ -217,12 +214,10 @@
'ADD COLUMN use_external_storage BOOLEAN DEFAULT 1'))
conn.commit()
except Exception as e:
logger.debug('backup_config migration: %s', e) # table may not exist yet
logger.debug('backup_config migration: %s', e)

# Run startup backup immediately (first launch of the day)
self._perform_startup_backup()

# Register daily backup job with the scheduler
self.core.scheduler.add_job(
job_id='backup.daily',
func=self._scheduled_backup,
Expand Down Expand Up @@ -270,7 +265,6 @@
enc_none = 'checked' if cfg.encrypt_method == 'none' else ''
enc_app = 'checked' if cfg.encrypt_method == 'app_password' else ''
enc_cust = 'checked' if cfg.encrypt_method == 'custom' else ''
# Default to app_password if not set to any known value
if not enc_none and not enc_cust:
enc_app = 'checked'
cp = cfg.custom_password or ''
Expand All @@ -293,15 +287,13 @@
'Full backups include database (JSON) and all uploaded files '
'in a ZIP archive.</p>')

# auto backup checkbox
a('<div style="margin-bottom:15px;">')
a('<label style="display:block;font-weight:normal;cursor:pointer;">')
a(f'<input type="checkbox" name="auto_backup_enabled" {auto_ck}'
f' style="{RS}"> Enable automatic daily backup</label>')
a('<small style="display:block;margin-top:4px;margin-left:24px;'
'color:#666;">Runs once per day on startup.</small></div>')

# custom backup path
a('<div class="form-group">')
a('<label for="bk_backup_path">Backup Directory (optional)</label>')
a(f'<input type="text" id="bk_backup_path" name="bk_backup_path"'
Expand All @@ -310,7 +302,6 @@
a('<small style="display:block;margin-top:4px;color:#666;">'
'Absolute path or relative to app root.</small></div>')

# external storage option
if ext_ok:
a('<div style="margin-bottom:15px;">')
a('<label style="display:block;font-weight:normal;cursor:pointer;">')
Expand All @@ -325,7 +316,6 @@
'\u2139\ufe0f Enable the <strong>External Storage</strong> '
'module to send backups to S3 or other remote storage.</div>')

# encryption
LS = 'display:block;margin-bottom:8px;font-weight:normal;cursor:pointer;'
a('<div style="margin-bottom:20px;">')
a('<label style="display:block;margin-bottom:8px;font-weight:bold;'
Expand Down Expand Up @@ -354,22 +344,18 @@

a('<hr style="margin:20px 0;border:none;border-top:1px solid #e0e0e0;">')

# create backup buttons
a('<h4 style="margin-bottom:10px;">Create New Backup</h4>')
a('<div style="margin-bottom:20px;">')
a(self._btn_backup(encrypt=True))
a(self._btn_backup(encrypt=False))
a('</div>')

# restore / demo data
a('<h4 style="margin-bottom:10px;">Restore Data</h4>')
a('<div style="margin-bottom:20px;">')
# Upload JSON backup
js_upload = "window.location.href='/backup/upload-restore';"
a(f'<button type="button" class="btn btn-info"'
f' style="margin-right:10px;" onclick="{js_upload}">'
f'\U0001f4e4 Upload JSON Backup</button>')
# Load demo data
demo_path = Path(self.core.app_path) / 'demo_data.json'
if demo_path.exists():
js_demo = (
Expand All @@ -386,7 +372,6 @@
f'\U0001f9ea Load Demo Data</button>')
a('</div>')

# backup list
a('<h4>Saved Backups</h4>')
if backups:
a(self._render_backup_table(backups))
Expand Down Expand Up @@ -475,8 +460,6 @@
return '\n'.join(h)

def save_settings(self, settings, form):
# Only update auto_backup if the field was actually in the form
# (i.e. submitted from the Security tab, not General Settings)
if 'bk_encrypt_method' in form:
settings.auto_backup_enabled = form.get('auto_backup_enabled') == 'on'
try:
Expand Down Expand Up @@ -539,7 +522,7 @@
data = {'version': '2.1', 'created_at': datetime.now().isoformat(),
'tables': {}}
inspector = sa_inspect(self._db.engine)
skip = {'backup_config'} # don't backup our own config
skip = {'backup_config'}
for table_name in inspector.get_table_names():
if table_name in skip:
continue
Expand Down Expand Up @@ -573,29 +556,64 @@
raw = self._decrypt_bytes(raw, password)
except Exception:
return False, 'Decryption failed. Wrong password?'
db_path = Path('instance/invoices.db')

# Validate it's actually a ZIP before proceeding
if not zipfile.is_zipfile(BytesIO(raw)):
return False, 'Invalid backup file (not a valid ZIP). If encrypted, check your password.'

# Pre-restore safety copy of the SQLite DB — use app_path, not CWD
app_root = Path(self.core.app_path)
db_path = app_root / 'instance' / 'invoices.db'
if db_path.exists():
ts = datetime.now().strftime('%Y%m%d_%H%M%S')
shutil.copy2(db_path, f'instance/invoices.db.backup_{ts}')
app_root = Path(self.core.app_path)
try:
shutil.copy2(db_path, db_path.parent / f'invoices.db.backup_{ts}')
except Exception as e:
logger.warning('Could not create pre-restore DB backup: %s', e)

with zipfile.ZipFile(BytesIO(raw), 'r') as zf:
if 'db_backup.json' in zf.namelist():
jd = json.loads(zf.read('db_backup.json').decode('utf-8'))
names = zf.namelist()

# Restore database
if 'db_backup.json' in names:
try:
jd = json.loads(zf.read('db_backup.json').decode('utf-8'))
except (json.JSONDecodeError, UnicodeDecodeError) as e:
return False, f'db_backup.json is corrupt or unreadable: {e}'
ok, msg = self._restore_db_from_json(jd)
if not ok:
return False, msg
for name in zf.namelist():
else:
logger.warning('No db_backup.json found in archive — skipping DB restore')

# Restore uploaded files
files_restored = 0
files_skipped = 0
for name in names:
if name == 'db_backup.json':
continue
parts = name.split('/')
if parts[0] in FILE_FOLDERS:
target = app_root / name
if name.endswith('/'): # skip directory entries
continue
parts = name.replace('\\', '/').split('/')
if not parts or parts[0] not in FILE_FOLDERS:
files_skipped += 1
continue
target = app_root / name
try:
target.parent.mkdir(parents=True, exist_ok=True)
target.write_bytes(zf.read(name))
return True, 'Backup restored! Please restart the application.'
files_restored += 1
except Exception as e:
logger.warning('Could not restore file %s: %s', name, e)
files_skipped += 1

logger.info('Restore complete: %d files restored, %d skipped',
files_restored, files_skipped)
return True, f'Backup restored ({files_restored} files). Please restart the application.'
except zipfile.BadZipFile:
return False, 'Invalid backup file (not a valid ZIP)'
except Exception as e:
logger.error('Restore failed: %s', e, exc_info=True)
return False, f'Error restoring backup: {e}'

def _restore_db_from_json(self, json_data):
Expand All @@ -605,7 +623,6 @@
_SAFE_NAME = re.compile(r'^[A-Za-z_][A-Za-z0-9_]*$')

def _check_name(name):
"""Validate identifier against schema and allowed characters."""
if not _SAFE_NAME.match(name):
raise ValueError(f'Invalid identifier: {name!r}')
return name
Expand All @@ -617,7 +634,7 @@
existing = set(inspector.get_table_names())
skip = {'backup_config', 'module_enabled'}

# Build dependency order: tables with FKs come after referenced tables
# Build dependency graph
fk_deps = {}
for tname in existing:
if tname in skip:
Expand All @@ -629,76 +646,125 @@
refs.add(ref)
fk_deps[tname] = refs

# Topological sort for delete (reverse) and insert (forward)
# Topological sort with cycle detection
ordered = []
visited = set()
in_progress = set()

def visit(t):
if t in visited or t not in fk_deps:
if t in visited:
return
visited.add(t)
if t in in_progress:
logger.warning(
'Circular FK dependency at table %s — skipping cycle', t)
return
if t not in fk_deps:
return
in_progress.add(t)
for dep in fk_deps.get(t, set()):
visit(dep)
in_progress.discard(t)
visited.add(t)
ordered.append(t)

for t in fk_deps:
for t in list(fk_deps.keys()):
visit(t)

# Delete in reverse order (children first)
for tname in reversed(ordered):
if tname in tables or tname in existing:
# Delete in reverse order (children first), all in one transaction
with db.engine.begin() as conn:
for tname in reversed(ordered):
try:
safe_t = _check_name(tname)
db.session.execute(text(f'DELETE FROM "{safe_t}"'))
conn.execute(text(f'DELETE FROM "{safe_t}"'))
except Exception as e:
logger.debug('Could not clear table %s: %s', tname, e)

# Insert in forward order (parents first)
date_fields = {'invoice_date', 'due_date', 'expense_date'}
dt_fields = {'created_at', 'updated_at'}
for tname in ordered:
if tname not in tables:
continue
safe_t = _check_name(tname)
cols = [c['name'] for c in inspector.get_columns(tname)]
for rd in tables[tname]:
# Normalize date and datetime fields if present
for k, v in list(rd.items()):
if v and k in date_fields:
try:
rd[k] = datetime.fromisoformat(v).date()
except (ValueError, TypeError) as exc:
safe_k = _sanitize_for_log(k)
safe_v = _sanitize_for_log(repr(v))
logger.debug(
"Skipping invalid date value for key '%s': %r (%s)",
safe_k, safe_v, exc
)
elif v and k in dt_fields:
try:
rd[k] = datetime.fromisoformat(v)
except (ValueError, TypeError) as exc:
safe_k = _sanitize_for_log(k)
safe_v = _sanitize_for_log(repr(v))
logger.debug(
"Skipping invalid datetime value for user key [%s]: %r (%s)",
safe_k, safe_v, exc
)
# Only insert columns that exist in current schema
row_cols = [_check_name(c) for c in rd if c in cols]
if not row_cols:
# Known date and datetime field names for coercion
date_fields = {
'invoice_date', 'due_date', 'expense_date',
'payment_date', 'document_date', 'expiry_date',
}
dt_fields = {
'created_at', 'updated_at', 'uploaded_at', 'signed_at',
}

def _coerce(v, field):
"""Coerce a JSON value to the right Python type for date/datetime fields."""
if v is None:
return None
# Already the right type
if field in date_fields and hasattr(v, 'year'):
return v
if field in dt_fields and isinstance(v, datetime):
return v
s = str(v)
try:
dt = datetime.fromisoformat(s)
return dt.date() if field in date_fields else dt
except (ValueError, TypeError) as exc:
logger.debug('Skipping bad date for %s=%r: %s', field, v, exc)

Check warning

Code scanning / CodeQL

Log Injection Medium

This log entry depends on a
user-provided value
.

Check warning

Code scanning / CodeQL

Log Injection Medium

This log entry depends on a
user-provided value
.
return None

# Insert in forward order (parents first), all in one transaction
with db.engine.begin() as conn:
for tname in ordered:
if tname not in tables:
continue
placeholders = ', '.join(f':{c}' for c in row_cols)
col_names = ', '.join(f'"{c}"' for c in row_cols)
vals = {c: rd[c] for c in row_cols}
db.session.execute(
text(f'INSERT INTO "{safe_t}" ({col_names}) '
f'VALUES ({placeholders})'), vals)

db.session.commit()
safe_t = _check_name(tname)
cols = {c['name'] for c in inspector.get_columns(tname)}

for rd in tables[tname]:
# Coerce date/datetime fields
coerced = {}
for k, v in rd.items():
if k in date_fields and v is not None:
coerced[k] = _coerce(v, k)
elif k in dt_fields and v is not None:
coerced[k] = _coerce(v, k)
else:
coerced[k] = v

# Only insert columns that exist in current schema
row_cols = [_check_name(c) for c in coerced if c in cols]
if not row_cols:
continue
placeholders = ', '.join(f':{c}' for c in row_cols)
col_names = ', '.join(f'"{c}"' for c in row_cols)
vals = {c: coerced[c] for c in row_cols}
try:
conn.execute(
text(f'INSERT INTO "{safe_t}" ({col_names}) '
f'VALUES ({placeholders})'),
vals)
except Exception as e:
logger.warning(
'Skipping row in %s due to error: %s | row keys: %s',
tname, e, list(rd.keys())[:5])

Check warning

Code scanning / CodeQL

Log Injection Medium

This log entry depends on a
user-provided value
.

# Reset SQLite autoincrement sequences so subsequent inserts get
# correct IDs and don't collide with restored data.
try:
rows = conn.execute(
text("SELECT name FROM sqlite_sequence")).fetchall()
seq_tables = {r[0] for r in rows}
for tname in ordered:
if tname not in seq_tables:
continue
try:
safe_t = _check_name(tname)
conn.execute(text(
'UPDATE sqlite_sequence '
'SET seq = (SELECT COALESCE(MAX(id), 0) FROM "'
+ safe_t + '") '
'WHERE name = :t'), {'t': tname})
except Exception:

Check notice

Code scanning / CodeQL

Empty except Note

'except' clause does nothing but pass and there is no explanatory comment.
pass
except Exception:
pass # sqlite_sequence may not exist if no autoincrement tables

return True, 'OK'
except Exception as e:
db.session.rollback()
logger.error('DB restore error: %s', e, exc_info=True)
return False, f'DB restore error: {e}'

# ── encryption ──────────────────────────────────────────────────
Expand Down
Loading