# Post-patch view of app/database_query_facade.py, reconstructed from a
# whitespace-collapsed unified diff.  Only the methods that the visible hunks
# rewrite are reproduced; methods that appear merely as hunk context are not.

from datetime import datetime, timedelta
# NOTE(review): `U` is added by this patch but is not used in any visible
# hunk - it looks like an accidental auto-import; confirm and drop.
from re import U
import sqlite3
import json

# NOTE(review): a query facade importing from the routes layer may create a
# circular import; KeywordGroup is not used in any visible hunk - confirm.
from app.routes.keyword_monitor import KeywordGroup

# The two parenthesised import statements start outside the visible hunks, so
# only the names this patch ADDS to them are listed here:
#   statement-construct import (visible tail: desc, or_, and_, literal, func):
#       not_, distinct, exists, literal_column, inspect, case
#   database_models import (each imported as `t_<name> as <name>`):
#       podcasts, model_bias_arena_runs, model_bias_arena_results,
#       model_bias_arena_articles, mediabias, mediabias_settings, feed_items,
#       feed_keyword_groups, feed_group_sources, user_feed_subscriptions,
#       keyword_monitor_checks, raw_articles, paper_search_results,
#       news_search_results, keyword_alert_articles


class DatabaseQueryFacade:
    """Query helpers that execute statements on ``self.connection``.

    Only the methods rewritten by this patch are listed; the rest of the class
    lies outside the visible hunks.
    """

    def create_article(self, article_exists, article_url, article, topic, keyword_id):
        """Record a keyword hit: article row, keyword alert and group match.

        Args:
            article_exists: truthy when the article row is already stored.
            article_url: URI of the article.
            article: dict with ``title``, ``source``, ``published_date`` and
                optionally ``summary``.
            topic: topic stored on a newly inserted article.
            keyword_id: id of the monitored keyword that matched.

        Returns:
            Tuple ``(inserted_new_article, alert_inserted, match_updated)``.

        Raises:
            Any exception from the statements, after rolling back.
        """
        # NOTE(review): if the connection auto-begins transactions, an explicit
        # begin() may raise when one is already open - confirm.
        self.connection.begin()
        try:
            inserted_new_article = False
            if not article_exists:
                self.connection.execute(insert(articles).values(
                    uri=article_url,
                    title=article['title'],
                    news_source=article['source'],
                    publication_date=article['published_date'],
                    summary=article.get('summary', ''),
                    topic=topic,
                    analyzed=False,
                ))
                inserted_new_article = True
                self.logger.info(f"Inserted new article: {article_url}")

            # Check-then-insert stands in for the old "ON CONFLICT DO NOTHING".
            alert_exists = self.connection.execute(
                select(keyword_alerts).where(
                    keyword_alerts.c.article_uri == article_url,
                    keyword_alerts.c.keyword_id == keyword_id,
                )
            ).fetchone()

            alert_inserted = False
            if not alert_exists:
                result = self.connection.execute(insert(keyword_alerts).values(
                    keyword_id=keyword_id,
                    article_uri=article_url,
                ))
                alert_inserted = result.rowcount > 0

            # Fix: a fetched row has no .scalar(); index it like the pre-patch
            # code did (still raises if the keyword does not exist).
            group_id = self.connection.execute(
                select(monitored_keywords.c.group_id).where(
                    monitored_keywords.c.id == keyword_id
                )
            ).fetchone()[0]

            existing_match = self.connection.execute(
                select(
                    keyword_article_matches.c.id,
                    keyword_article_matches.c.keyword_ids,
                ).where(
                    keyword_article_matches.c.article_uri == article_url,
                    keyword_article_matches.c.group_id == group_id,
                )
            ).fetchone()

            match_updated = False
            if existing_match:
                # keyword_ids is a comma-separated string of keyword ids.
                match_id, keyword_ids = existing_match
                keyword_id_list = keyword_ids.split(',')
                if str(keyword_id) not in keyword_id_list:
                    keyword_id_list.append(str(keyword_id))
                    self.connection.execute(
                        update(keyword_article_matches)
                        .where(keyword_article_matches.c.id == match_id)
                        .values(keyword_ids=','.join(keyword_id_list))
                    )
                    match_updated = True
            else:
                self.connection.execute(insert(keyword_article_matches).values(
                    article_uri=article_url,
                    keyword_ids=str(keyword_id),
                    group_id=group_id,
                ))
                match_updated = True

            self.connection.commit()
            return inserted_new_article, alert_inserted, match_updated
        except Exception:
            self.connection.rollback()
            raise

    def create_keyword_monitor_log_entry(self, params):
        """Insert or update the single ``keyword_monitor_status`` row (id 1).

        Args:
            params: ``(last_check_time, last_error, requests_today)``.
        """
        values = {
            'last_check_time': params[0],
            'last_error': params[1],
            'requests_today': params[2],
        }
        existing = self.connection.execute(
            select(keyword_monitor_status).where(keyword_monitor_status.c.id == 1)
        ).fetchone()
        if existing:
            statement = update(keyword_monitor_status).where(
                keyword_monitor_status.c.id == 1
            ).values(**values)
        else:
            statement = insert(keyword_monitor_status).values(id=1, **values)
        self.connection.execute(statement)
        self.connection.commit()

    def create_article_with_extracted_content(self, params):
        """Insert an article row stamped with the current database time.

        Args:
            params: ``(uri, title, news_source, topic, analyzed, summary)``.
        """
        statement = insert(articles).values(
            uri=params[0],
            title=params[1],
            news_source=params[2],
            submission_date=func.current_timestamp(),
            topic=params[3],
            analyzed=params[4],
            summary=params[5],
        )
        self.connection.execute(statement)
        self.connection.commit()

    async def move_alert_to_articles(self, url: str) -> None:
        """Copy a not-yet-moved alert article into ``articles`` and flag it moved.

        Does nothing when no unmoved alert exists for ``url``.
        """
        # Fix: read the row as a mapping so it can be addressed by column name.
        alert = self.connection.execute(
            select(keyword_alert_articles).where(
                keyword_alert_articles.c.url == url,
                keyword_alert_articles.c.moved_to_articles == False,  # noqa: E712 (SQL comparison)
            )
        ).mappings().fetchone()
        if not alert:
            return

        # NOTE(review): the patch wrote `source=` here, while every other
        # insert into `articles` in this file uses `news_source` - confirm
        # against the table definition.
        self.connection.execute(insert(articles).values(
            uri=alert['url'],
            title=alert['title'],
            summary=alert['summary'],
            news_source=alert['source'],
            topic=alert['topic'],
            analyzed=False,
        ))
        self.connection.execute(
            update(keyword_alert_articles)
            .where(keyword_alert_articles.c.url == url)
            .values(moved_to_articles=True)
        )
        self.connection.commit()

    #### REINDEX CHROMA DB QUERIES ####
    def get_iter_articles(self, limit: int | None = None):
        """Return articles left-joined with their raw markdown (key ``raw``).

        Rows are returned as plain dicts, matching what the pre-patch generator
        yielded; a falsy ``limit`` means no limit.
        """
        # NOTE(review): assumes the `articles` table object exposes a `rowid`
        # column - confirm against database_models.
        statement = select(
            articles, raw_articles.c.raw_markdown.label('raw')
        ).select_from(
            articles.outerjoin(raw_articles, articles.c.uri == raw_articles.c.uri)
        ).order_by(articles.c.rowid)

        if limit:
            statement = statement.limit(limit)

        rows = self.connection.execute(statement).mappings().fetchall()
        return [dict(row) for row in rows]

    def save_analysis_version(self, params):
        """Insert an ``analysis_versions`` row.

        Args:
            params: ``(topic, version_data, model_used, analysis_depth)``.
        """
        statement = insert(analysis_versions).values(
            topic=params[0],
            version_data=params[1],
            model_used=params[2],
            analysis_depth=params[3],
        )
        self.connection.execute(statement)
        self.connection.commit()

    def create_organisational_profile(self, params):
        """Insert an organizational profile and return the execution result.

        Args:
            params: 14 values in the column order listed below.
        """
        columns = (
            'name', 'description', 'industry', 'organization_type', 'region',
            'key_concerns', 'strategic_priorities', 'risk_tolerance',
            'innovation_appetite', 'decision_making_style', 'stakeholder_focus',
            'competitive_landscape', 'regulatory_environment', 'custom_context',
        )
        statement = insert(organizational_profiles).values(
            **{column: params[index] for index, column in enumerate(columns)}
        )
        result = self.connection.execute(statement)
        # Fix: commit like every other write in this class does.
        self.connection.commit()
        return result

    def get_topic_filtered_future_signals_with_counts_for_market_signal_analysis(self, topic_name):
        """Count analyzed articles per non-empty future signal for a topic.

        Returns rows of ``(future_signal, count)``, highest count first.
        """
        # We need actual counts, not just the config list
        statement = select(
            articles.c.future_signal,
            func.count().label('count'),
        ).where(
            and_(
                articles.c.topic == topic_name,
                articles.c.future_signal.isnot(None),
                articles.c.future_signal != '',
                articles.c.analyzed == 1,
            )
        ).group_by(
            articles.c.future_signal
        ).order_by(
            desc(func.count())
        )
        return self.connection.execute(statement).fetchall()

    #### TOPIC MAP ROUTES ####

    def add_oauth_user_to_allowlist(self, email, added_by):
        """Add ``email`` to the OAuth allowlist, or update who added it."""
        # Fix: the pre-patch statement stored email.lower(); keep normalising.
        email = email.lower()
        # NOTE(review): if is_oauth_user_allowed ignores inactive rows, re-adding
        # a deactivated address takes the insert path - confirm.
        if self.is_oauth_user_allowed(email):
            statement = update(oauth_allowlist).where(
                oauth_allowlist.c.email == email
            ).values(added_by=added_by)
        else:
            statement = insert(oauth_allowlist).values(email=email, added_by=added_by)
        self.connection.execute(statement)
        self.connection.commit()

    def create_oauth_allowlist_user(self, params):
        """Insert an ``oauth_users`` row and return its primary key.

        Args:
            params: ``(email, name, provider, provider_id, avatar_url)``.
        """
        statement = insert(oauth_users).values(
            email=params[0],
            name=params[1],
            provider=params[2],
            provider_id=params[3],
            avatar_url=params[4],
        )
        result = self.connection.execute(statement)
        self.connection.commit()
        return result.inserted_primary_key[0]

    #### ENDPOINT QUERIES ####

    def get_oauth_system_status_and_settings(self):
        """Return ``(allowlist_count, oauth_users_count, provider_stats)``.

        All three figures count active rows only; ``provider_stats`` maps each
        provider to its number of active users.
        """
        # Fix (both counts): index the fetched row instead of calling .scalar().
        allowlist_count = self.connection.execute(
            select(func.count())
            .select_from(oauth_allowlist)
            .where(oauth_allowlist.c.is_active == 1)
        ).fetchone()[0]

        oauth_users_count = self.connection.execute(
            select(func.count())
            .select_from(oauth_users)
            .where(oauth_users.c.is_active == 1)
        ).fetchone()[0]

        per_provider = select(
            oauth_users.c.provider,
            func.count().label('count'),
        ).where(
            oauth_users.c.is_active == 1
        ).group_by(
            oauth_users.c.provider
        )
        provider_stats = {
            row[0]: row[1]
            for row in self.connection.execute(per_provider).fetchall()
        }

        return allowlist_count, oauth_users_count, provider_stats

    def get_statistics_for_specific_feed_group(self, group_id):
        """Return ``(total_items, source_counts, recent_items)`` for a feed group.

        ``source_counts`` maps source type to item count; ``recent_items``
        counts items published within the last 7 days.
        """
        in_group = feed_items.c.group_id == group_id

        total_items = self.connection.execute(
            select(func.count()).select_from(feed_items).where(in_group)
        ).fetchone()[0]

        by_source = select(
            feed_items.c.source_type,
            func.count().label('count'),
        ).where(in_group).group_by(feed_items.c.source_type)
        source_counts = {
            row[0]: row[1]
            for row in self.connection.execute(by_source).fetchall()
        }

        # Cutoff is computed in Python (portable across dialects).
        seven_days_ago = datetime.utcnow() - timedelta(days=7)
        recent_items = self.connection.execute(
            select(func.count()).select_from(feed_items).where(
                in_group,
                feed_items.c.publication_date >= seven_days_ago,
            )
        ).fetchone()[0]

        return total_items, source_counts, recent_items

    def get_articles_count_from_topic_and_categories(self, placeholders, params):
        """Count articles of a topic (case-insensitive) within given categories.

        Args:
            placeholders: pre-patch callers pass the ``"?,?,?"`` placeholder
                string; a list of category names is accepted as well.
            params: ``[topic, *categories]``.
        """
        # Fix: a placeholder string is not a category list. In that case the
        # categories are the values after the topic in `params`.
        if isinstance(placeholders, str):
            categories = list(params[1:])
        else:
            categories = list(placeholders)

        statement = select(func.count()).select_from(articles).where(
            func.lower(articles.c.topic) == func.lower(params[0]),
            articles.c.category.in_(categories),
        )
        return self.connection.execute(statement).fetchone()[0]

    def get_recent_articles_for_topic_and_category(self, params):
        """Return up to 5 newest articles for a topic/category within N days.

        Args:
            params: ``(topic, category, days)``; topic and category are matched
                case-insensitively.
        """
        # Fix: the pre-patch SQL spliced the day count into a string, so it may
        # arrive as text; coerce it before building the timedelta.
        cutoff = datetime.utcnow() - timedelta(days=float(params[2]))
        statement = select(
            articles.c.title,
            articles.c.news_source,
            articles.c.uri,
            articles.c.sentiment,
            articles.c.future_signal,
            articles.c.time_to_impact,
            articles.c.publication_date,
        ).where(
            and_(
                func.lower(articles.c.topic) == func.lower(params[0]),
                func.lower(articles.c.category) == func.lower(params[1]),
                articles.c.publication_date >= cutoff,
            )
        ).order_by(
            articles.c.publication_date.desc()
        ).limit(5)

        return self.connection.execute(statement).fetchall()

    def get_podcasts_columns(self):
        """Return the column names declared on the ``podcasts`` table object."""
        return [col.name for col in podcasts.columns]

    def generate_latest_podcasts(self, topic, column_names, has_transcript, has_topic, has_audio_url):
        """Return the most recent podcast row, selecting only available columns.

        Args:
            topic: topic to match when ``has_topic`` is true (rows with a NULL
                or ``'General'`` topic also match).
            column_names: names of the columns the podcasts table has.
            has_transcript / has_topic / has_audio_url: whether the respective
                column is available.
        """
        select_columns = [podcasts.c.id]

        if 'title' in column_names:
            select_columns.append(podcasts.c.title)
        else:
            select_columns.append(literal("Untitled Podcast").label("title"))

        has_created_at = 'created_at' in column_names
        if has_created_at:
            select_columns.append(podcasts.c.created_at)
        else:
            select_columns.append(literal(None).label("created_at"))

        if has_audio_url:
            select_columns.append(podcasts.c.audio_url)

        if has_transcript:
            # Fix: the pre-patch SQL selected `transcript_text`; prefer that
            # column and fall back to the name the patch used.
            if 'transcript_text' in podcasts.c:
                select_columns.append(podcasts.c.transcript_text)
            else:
                select_columns.append(podcasts.c.transcript)

        statement = select(*select_columns).select_from(podcasts)

        if has_topic:
            statement = statement.where(
                or_(
                    podcasts.c.topic == topic,
                    podcasts.c.topic.is_(None),
                    podcasts.c.topic == "General",
                )
            )

        # Fix: only order by created_at when the column exists. The old SQL
        # ordered by its NULL alias in that case, i.e. applied no ordering.
        if has_created_at:
            statement = statement.order_by(podcasts.c.created_at.desc())
        statement = statement.limit(1)

        return self.connection.execute(statement).fetchone()

    def enriched_articles(self, limit):
        """Return the newest categorised articles as dicts with ``tags`` as a list.

        Only articles with a non-null, non-empty category are returned, newest
        ``submission_date`` first, at most ``limit`` rows.
        """
        statement = select(articles).where(
            articles.c.category.isnot(None),
            articles.c.category != '',
        ).order_by(
            articles.c.submission_date.desc()
        ).limit(limit)

        rows = self.connection.execute(statement).mappings().fetchall()

        # Fix: the result list must not be named `articles` (that made the
        # table name a local and broke the select above), and mapping rows are
        # read-only, so each is copied into a dict before `tags` is rewritten.
        enriched = []
        for row in rows:
            article = dict(row)
            tags = article.get('tags')
            article['tags'] = tags.split(',') if tags else []
            enriched.append(article)

        return enriched

    def create_model_bias_arena_runs(self, params):
        """Insert a run with status ``'running'`` and return its primary key.

        Args:
            params: ``(name, description, benchmark_model, selected_models,
                article_count, rounds, current_round)``.
        """
        statement = insert(model_bias_arena_runs).values(
            name=params[0],
            description=params[1],
            benchmark_model=params[2],
            selected_models=params[3],
            article_count=params[4],
            rounds=params[5],
            current_round=params[6],
            status='running',
        )
        result = self.connection.execute(statement)
        self.connection.commit()
        return result.inserted_primary_key[0]

    def store_evaluation_results(self, params):
        """Insert one bias-evaluation result row.

        Args:
            params: ``(run_id, article_uri, model_name, response_text,
                bias_score, confidence_score, response_time_ms, error_message)``.
        """
        statement = insert(model_bias_arena_results).values(
            run_id=params[0],
            article_uri=params[1],
            model_name=params[2],
            response_text=params[3],
            bias_score=params[4],
            confidence_score=params[5],
            response_time_ms=params[6],
            error_message=params[7],
        )
        self.connection.execute(statement)
        self.connection.commit()

    def store_ontological_results(self, params):
        """Insert one ontological result row.

        Args:
            params: 20 values in the column order listed below.
        """
        columns = (
            'run_id', 'article_uri', 'model_name', 'response_text',
            'response_time_ms', 'sentiment', 'sentiment_explanation',
            'future_signal', 'future_signal_explanation', 'time_to_impact',
            'time_to_impact_explanation', 'driver_type',
            'driver_type_explanation', 'category', 'category_explanation',
            'political_bias', 'political_bias_explanation', 'factuality',
            'factuality_explanation', 'round_number',
        )
        statement = insert(model_bias_arena_results).values(
            **{column: params[index] for index, column in enumerate(columns)}
        )
        self.connection.execute(statement)
        self.connection.commit()

    def update_run_status(self, params):
        """Set a run's status and stamp ``completed_at`` with the current time.

        Args:
            params: ``(status, run_id)``.
        """
        statement = update(model_bias_arena_runs).where(
            model_bias_arena_runs.c.id == params[1]
        ).values(
            status=params[0],
            completed_at=func.current_timestamp(),
        )
        self.connection.execute(statement)
        self.connection.commit()

    def get_run_details(self, run_id):
        """Return the run row for ``run_id``, or ``None`` when absent."""
        runs = model_bias_arena_runs
        statement = select(
            runs.c.id,
            runs.c.name,
            runs.c.description,
            runs.c.benchmark_model,
            runs.c.selected_models,
            runs.c.article_count,
            runs.c.rounds,
            runs.c.current_round,
            runs.c.created_at,
            runs.c.completed_at,
            runs.c.status,
        ).where(runs.c.id == run_id)

        return self.connection.execute(statement).fetchone()

    def get_ontological_results_with_article_info(self, run_id):
        """Return a run's results joined with each article's title and summary.

        Rows are ordered by article URI, model name and round number.
        """
        results = model_bias_arena_results
        run_articles = model_bias_arena_articles
        statement = select(
            results.c.model_name,
            results.c.article_uri,
            results.c.sentiment,
            results.c.sentiment_explanation,
            results.c.future_signal,
            results.c.future_signal_explanation,
            results.c.time_to_impact,
            results.c.time_to_impact_explanation,
            results.c.driver_type,
            results.c.driver_type_explanation,
            results.c.category,
            results.c.category_explanation,
            results.c.political_bias,
            results.c.political_bias_explanation,
            results.c.factuality,
            results.c.factuality_explanation,
            results.c.confidence_score,
            results.c.response_time_ms,
            results.c.error_message,
            results.c.response_text,
            results.c.round_number,
            run_articles.c.article_title,
            run_articles.c.article_summary,
        ).select_from(
            results.join(
                run_articles,
                and_(
                    results.c.article_uri == run_articles.c.article_uri,
                    results.c.run_id == run_articles.c.run_id,
                ),
            )
        ).where(
            results.c.run_id == run_id
        ).order_by(
            results.c.article_uri,
            results.c.model_name,
            results.c.round_number,
        )

        return self.connection.execute(statement).fetchall()

    def get_benchmark_data_including_media_bias_info(self, run_id):
        """Return stored analysis and media-bias fields for a run's articles.

        Rows are ordered by article URI.
        """
        statement = select(
            articles.c.uri,
            articles.c.title,
            articles.c.sentiment,
            articles.c.future_signal,
            articles.c.time_to_impact,
            articles.c.driver_type,
            articles.c.category,
            articles.c.sentiment_explanation,
            articles.c.future_signal_explanation,
            articles.c.time_to_impact_explanation,
            articles.c.driver_type_explanation,
            articles.c.bias,
            articles.c.factual_reporting,
            articles.c.mbfc_credibility_rating,
            articles.c.bias_country,
            articles.c.press_freedom,
            articles.c.media_type,
            articles.c.popularity,
            articles.c.news_source,
        ).select_from(
            articles.join(
                model_bias_arena_articles,
                model_bias_arena_articles.c.article_uri == articles.c.uri,
            )
        ).where(
            model_bias_arena_articles.c.run_id == run_id
        ).order_by(
            articles.c.uri
        )
        return self.connection.execute(statement).fetchall()

    def delete_run(self, run_id):
        """Delete a run and return the number of rows removed."""
        statement = delete(model_bias_arena_runs).where(
            model_bias_arena_runs.c.id == run_id
        )
        result = self.connection.execute(statement)
        self.connection.commit()
        return result.rowcount

    def get_source_bias_validation_data(self, url):
        """Return the media-bias fields stored on the article with URI ``url``."""
        statement = select(
            articles.c.bias,
            articles.c.factual_reporting,
            articles.c.mbfc_credibility_rating,
            articles.c.bias_country,
            articles.c.press_freedom,
            articles.c.media_type,
            articles.c.popularity,
        ).where(articles.c.uri == url)

        return self.connection.execute(statement).fetchone()

    def get_run_articles(self, run_id):
        """Return ``(article_uri, article_title, article_summary)`` rows of a run."""
        statement = select(
            model_bias_arena_articles.c.article_uri,
            model_bias_arena_articles.c.article_title,
            model_bias_arena_articles.c.article_summary,
        ).where(
            model_bias_arena_articles.c.run_id == run_id
        ).order_by(
            # Fix: the pre-patch SQL had "ORDER BY id"; the patch dropped it.
            model_bias_arena_articles.c.id
        )

        return self.connection.execute(statement).fetchall()

    def get_all_bias_evaluation_runs(self):
        """Return every bias-evaluation run, newest ``created_at`` first."""
        # Fix: these are columns of the runs table; the patch selected them
        # from model_bias_arena_articles, which holds article rows.
        runs = model_bias_arena_runs
        statement = select(
            runs.c.id,
            runs.c.name,
            runs.c.description,
            runs.c.benchmark_model,
            runs.c.selected_models,
            runs.c.article_count,
            runs.c.rounds,
            runs.c.current_round,
            runs.c.created_at,
            runs.c.completed_at,
            runs.c.status,
        ).order_by(
            runs.c.created_at.desc()
        )

        return self.connection.execute(statement).fetchall()

    def update_run(self, params):
        """Set ``current_round`` on a run.

        Args:
            params: ``(current_round, run_id)``.
        """
        statement = update(model_bias_arena_runs).where(
            model_bias_arena_runs.c.id == params[1]
        ).values(
            current_round=params[0]
        )
        self.connection.execute(statement)
        self.connection.commit()

    def get_topics_from_article(self, article_url):
        """Return the first distinct ``topic`` row for the article, or ``None``."""
        statement = select(
            articles.c.topic
        ).where(
            articles.c.uri == article_url
        ).distinct()

        return self.connection.execute(statement).fetchone()

    def get_run_info(self, run_id):
        """Return the ``(rounds, current_round)`` row of a run, or ``None``."""
        statement = select(
            model_bias_arena_runs.c.rounds,
            model_bias_arena_runs.c.current_round,
        ).where(
            model_bias_arena_runs.c.id == run_id
        )

        return self.connection.execute(statement).fetchone()

    # The patch's rewrite of add_articles_to_run(self, params) starts here but
    # continues beyond the visible part of the diff, so it is not reproduced.
- """, params) - - conn.commit() + statement = insert(model_bias_arena_articles).values( + run_id=params[0], + article_uri=params[1], + article_title=params[2], + article_summary=params[3] + ) + self.connection.execute(statement) + self.connection.commit() def sample_articles(self, count, topic): - with self.db.get_connection() as conn: - cursor = conn.cursor() + statement = select( + articles.c.uri, + articles.c.title, + articles.c.summary, + articles.c.news_source, + articles.c.topic, + articles.c.category, + articles.c.sentiment, + articles.c.future_signal, + articles.c.time_to_impact, + articles.c.driver_type, + articles.c.bias, + articles.c.factual_reporting, + articles.c.mbfc_credibility_rating, + articles.c.bias_country + ).where( + articles.c.summary.isnot(None), + func.length(articles.c.summary) > 100, + articles.c.analyzed == 1, + articles.c.sentiment.isnot(None), + articles.c.sentiment != '', + articles.c.future_signal.isnot(None), + articles.c.future_signal != '', + articles.c.time_to_impact.isnot(None), + articles.c.time_to_impact != '', + articles.c.driver_type.isnot(None), + articles.c.driver_type != '', + articles.c.category.isnot(None), + articles.c.category != '', + articles.c.news_source.isnot(None), + articles.c.news_source != '' + ) + + if topic: + statement = statement.where( + articles.c.topic == topic + ) - # Only select articles that have all required ontological fields populated (benchmark data) - query = """ - SELECT uri, \ - title, \ - summary, \ - news_source, \ - topic, \ - category, - sentiment, \ - future_signal, \ - time_to_impact, \ - driver_type, - bias, \ - factual_reporting, \ - mbfc_credibility_rating, \ - bias_country - FROM articles - WHERE summary IS NOT NULL - AND LENGTH(summary) > 100 - AND analyzed = 1 - AND sentiment IS NOT NULL \ - AND sentiment != '' - AND future_signal IS NOT NULL AND future_signal != '' - AND time_to_impact IS NOT NULL AND time_to_impact != '' - AND driver_type IS NOT NULL AND driver_type != '' - AND 
category IS NOT NULL AND category != '' - AND (news_source IS NOT NULL AND news_source != '') \ - """ - params = [] - - if topic: - query += " AND topic = ?" - params.append(topic) - - query += " ORDER BY RANDOM() LIMIT ?" - params.append(count) - - cursor.execute(query, params) + statement = statement.order_by( + func.random() + ).limit(count) - return cursor.fetchall() + return self.connection.execute(statement).fetchall() def get_topics_with_article_counts(self): - with self.db.get_connection() as conn: - cursor = conn.cursor() - cursor.execute(""" - SELECT topic, - COUNT(DISTINCT uri) as article_count, - MAX(publication_date) as last_article_date - FROM articles - WHERE topic IS NOT NULL - AND topic != '' - GROUP BY topic - """) - db_topics = {row[0]: {"article_count": row[1], "last_article_date": row[2]} - for row in cursor.fetchall()} - return db_topics + statement = select( + articles.c.topic, + func.count(distinct(articles.c.uri)).label('article_count'), + func.max(articles.c.publication_date).label('last_article_date') + ).where( + articles.c.topic.isnot(None), + articles.c.topic != '', + ) + statement = statement.group_by( + articles.c.topic + ) + + db_topics = {row[0]: {"article_count": row[1], "last_article_date": row[2]} + for row in self.connection.execute(statement).fetchall()} + return db_topics def debug_articles(self): - with self.db.get_connection() as conn: - cursor = conn.cursor() - cursor.execute("SELECT * FROM articles") - articles = cursor.fetchall() - return articles + statement = select(articles) + articles = self.connection.execute(statement).fetchall() + return articles def get_rate_limit_status(self): - with self.db.get_connection() as conn: - cursor = conn.cursor() - cursor.execute(""" - SELECT requests_today, last_error - FROM keyword_monitor_status - WHERE id = 1 - """) - row = cursor.fetchone() - - return row + statement = select( + keyword_monitor_status.c.requests_today, + keyword_monitor_status.c.last_error + ).where( + 
keyword_monitor_status.c.id == 1 + ) + return self.connection.execute(statement).fetchone() def get_monitor_page_keywords(self): - with self.db.get_connection() as conn: - conn.row_factory = sqlite3.Row - cursor = conn.cursor() + statement = select( + keyword_groups.c.id, + keyword_groups.c.name, + keyword_groups.c.topic, + monitored_keywords.c.id.label('keyword_id'), + monitored_keywords.c.keyword + ).select_from( + keyword_groups.join(monitored_keywords, keyword_groups.c.id == monitored_keywords.c.group_id, isouter=True) + ).order_by( + keyword_groups.c.name, + monitored_keywords.c.keyword + ) - # Get keyword groups and their keywords - cursor.execute(""" - SELECT kg.id, - kg.name, - kg.topic, - mk.id as keyword_id, - mk.keyword - FROM keyword_groups kg - LEFT JOIN monitored_keywords mk ON kg.id = mk.group_id - ORDER BY kg.name, mk.keyword - """) - return cursor.fetchall() + return self.connection.execute(statement).fetchall() def get_monitored_keywords_for_keyword_alerts_page(self): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - SELECT MAX(last_checked) as last_check_time, - (SELECT check_interval FROM keyword_monitor_settings WHERE id = 1) as check_interval, - (SELECT interval_unit FROM keyword_monitor_settings WHERE id = 1) as interval_unit, - (SELECT last_error FROM keyword_monitor_status WHERE id = 1) as last_error, - (SELECT is_enabled FROM keyword_monitor_settings WHERE id = 1) as is_enabled - FROM monitored_keywords - """) + statement = select( + func.max(monitored_keywords.c.last_checked).label('last_check_time'), + select(keyword_monitor_settings.c.check_interval).where(keyword_monitor_settings.c.id == 1).scalar_subquery().label('check_interval'), + select(keyword_monitor_settings.c.interval_unit).where(keyword_monitor_settings.c.id == 1).scalar_subquery().label('interval_unit'), + select(keyword_monitor_status.c.last_error).where(keyword_monitor_status.c.id == 1).scalar_subquery().label('last_error'), + 
select(keyword_monitor_settings.c.is_enabled).where(keyword_monitor_settings.c.id == 1).scalar_subquery().label('is_enabled') + ).select_from( + monitored_keywords + ) - return cursor.fetchone() + return self.connection.execute(statement).fetchone() def get_all_groups_with_their_alerts_and_status(self): - with self.db.get_connection() as conn: - cursor = conn.cursor() - cursor.execute(""" - WITH alert_counts AS (SELECT kg.id as group_id, - COUNT(DISTINCT ka.id) as unread_count - FROM keyword_groups kg - LEFT JOIN monitored_keywords mk ON kg.id = mk.group_id - LEFT JOIN keyword_alerts ka ON mk.id = ka.keyword_id AND ka.is_read = 0 - GROUP BY kg.id) - SELECT kg.id, - kg.name, - kg.topic, - ac.unread_count, - (SELECT GROUP_CONCAT(keyword, '||') - FROM monitored_keywords - WHERE group_id = kg.id) as keywords - FROM keyword_groups kg - LEFT JOIN alert_counts ac ON kg.id = ac.group_id - ORDER BY ac.unread_count DESC, kg.name - """) + # CTE for alert counts + alert_counts_cte = select( + keyword_groups.c.id.label('group_id'), + func.count(distinct(keyword_alerts.c.id)).label('unread_count') + ).select_from( + keyword_groups + .join(monitored_keywords, keyword_groups.c.id == monitored_keywords.c.group_id, isouter=True) + .join(keyword_alerts, (monitored_keywords.c.id == keyword_alerts.c.keyword_id) & (keyword_alerts.c.is_read == 0), isouter=True) + ).group_by( + keyword_groups.c.id + ).cte('alert_counts') - return cursor.fetchall() + + # Subquery for keywords + keywords_subq = select( + func.group_concat(monitored_keywords.c.keyword, literal_column('||')) + ).where( + monitored_keywords.c.group_id == keyword_groups.c.id + ).scalar_subquery() - def check_if_keyword_article_matches_table_exists(self): - with self.db.get_connection() as conn: - cursor = conn.cursor() + # Main query + statement = select( + keyword_groups.c.id, + keyword_groups.c.name, + keyword_groups.c.topic, + alert_counts_cte.c.unread_count, + keywords_subq.label('keywords') + ).select_from( + 
keyword_groups + .join(alert_counts_cte, keyword_groups.c.id == alert_counts_cte.c.group_id, isouter=True) + ).order_by( + alert_counts_cte.c.unread_count.desc(), + keyword_groups.c.name + ) - cursor.execute(""" - SELECT name - FROM sqlite_master - WHERE type = 'table' - AND name = 'keyword_article_matches' - """) + return self.connection.execute(statement).fetchall() - return cursor.fetchone() is not None + def check_if_keyword_article_matches_table_exists(self): + inspector = inspect(self.connection) + return inspector.has_table('keyword_article_matches') def get_keywords_and_articles_for_keywords_alert_page_using_new_structure(self, group_id): with self.db.get_connection() as conn: @@ -2170,329 +2184,294 @@ def get_keywords_and_articles_for_keywords_alert_page_using_new_structure(self, return cursor.fetchall() def get_keywords_and_articles_for_keywords_alert_page_using_old_structure(self, group_id): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - SELECT ka.id, - ka.detected_at, - ka.article_uri, - a.title, - a.uri as url, - a.news_source, - a.publication_date, - a.summary, - mk.keyword as matched_keyword - FROM keyword_alerts ka - JOIN monitored_keywords mk ON ka.keyword_id = mk.id - JOIN articles a ON ka.article_uri = a.uri - WHERE mk.group_id = ? 
- AND ka.is_read = 0 - ORDER BY ka.detected_at DESC - """, (group_id,)) + statement = select( + keyword_alerts.c.id, + keyword_alerts.c.detected_at, + keyword_alerts.c.article_uri, + articles.c.title, + articles.c.uri.label('url'), + articles.c.news_source, + articles.c.publication_date, + articles.c.summary, + monitored_keywords.c.keyword.label('matched_keyword') + ).select_from( + keyword_alerts.join(monitored_keywords, keyword_alerts.c.keyword_id == monitored_keywords.c.id) + .join(articles, keyword_alerts.c.article_uri == articles.c.uri) + ).where( + monitored_keywords.c.group_id == group_id, + keyword_alerts.c.is_read == 0 + ).order_by( + keyword_alerts.c.detected_at.desc() + ) - return cursor.fetchall() + return self.connection.execute(statement).fetchall() def get_all_completed_podcasts(self): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - SELECT id, title, created_at, audio_url, transcript - FROM podcasts - WHERE status = 'completed' - ORDER BY created_at DESC LIMIT 50 - """) + statement = select( + podcasts.c.id, + podcasts.c.title, + podcasts.c.created_at, + podcasts.c.audio_url, + podcasts.c.transcript + ).where( + podcasts.c.status == 'completed' + ).order_by( + podcasts.c.created_at.desc() + ).limit(50) - return cursor.fetchall() + return self.connection.execute(statement).fetchall() def create_podcast(self, params): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - INSERT INTO podcasts (id, title, created_at, status, config, article_uris) - VALUES (?, ?, CURRENT_TIMESTAMP, 'processing', ?, ?) 
- """, params) - conn.commit() + statement = insert(podcasts).values( + id=params[0], + title=params[1], + created_at=func.current_timestamp(), + status= 'processing', + config=params[2], + article_uris=params[3] + ) + self.connection.execute(statement) + self.connection.commit() def update_podcast_status(self, params): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - UPDATE podcasts - SET status = ?, - audio_url = ?, - transcript = ? - WHERE id = ? - """, params) - conn.commit() + statement = update(podcasts).where(podcasts.c.id == params[3]).values( + status=params[0], + audio_url=params[1], + transcript=params[2] + ) + self.connection.execute(statement) + self.connection.commit() def get_flow_data(self, topic, timeframe, limit): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - query = ( - "SELECT COALESCE(news_source, 'Unknown') AS source, " - "COALESCE(category, 'Unknown') AS category, " - "COALESCE(sentiment, 'Unknown') AS sentiment, " - "COALESCE(driver_type, 'Unknown') AS driver_type, " - "submission_date " - "FROM articles WHERE 1=1" + statement = select( + func.coalesce(articles.c.news_source, 'Unknown').label('source'), + func.coalesce(articles.c.category, 'Unknown').label('category'), + func.coalesce(articles.c.sentiment, 'Unknown').label('sentiment'), + func.coalesce(articles.c.driver_type, 'Unknown').label('driver_type'), + articles.c.submission_date + ).select_from( + articles + ) + if topic: + statement = statement.where( + articles.c.topic == topic ) - params = [] - - if topic: - query += " AND topic = ?" 
- params.append(topic) - - if timeframe != "all": - try: - days = int(timeframe) - query += " AND submission_date >= date('now', ?)" - params.append(f'-{days} days') - except ValueError: - self.logger.warning("Invalid timeframe value provided: %s", timeframe) + if timeframe != "all": + try: + days = int(timeframe) + statement = statement.where( + articles.c.submission_date >= datetime.utcnow() - timedelta(days=days) + ) + except ValueError: + self.logger.warning("Invalid timeframe value provided: %s", timeframe) - query += " ORDER BY submission_date DESC LIMIT ?" - params.append(limit) + statement = statement.order_by( + articles.c.submission_date.desc() + ).limit(limit) - cursor.execute(query, params) - return cursor.fetchall() + return self.connection.execute(statement).fetchall() def create_keyword_monitor_group(self, params): - with self.db.get_connection() as conn: - cursor = conn.cursor() - cursor.execute( - "INSERT INTO keyword_groups (name, topic) VALUES (?, ?)", - params - ) - conn.commit() + statement = insert(keyword_groups).values( + name=params[0], + topic=params[1] + ) + result =self.connection.execute(statement) + self.connection.commit() - return cursor.lastrowid + return result.inserted_primary_key[0] def create_keyword(self, params): - with self.db.get_connection() as conn: - cursor = conn.cursor() - cursor.execute( - "INSERT INTO monitored_keywords (group_id, keyword) VALUES (?, ?)", - params - ) - conn.commit() + statement = insert(monitored_keywords).values( + group_id=params[0], + keyword=params[1] + ) + self.connection.execute(statement) + self.connection.commit() def delete_keyword(self, keyword_id): - with self.db.get_connection() as conn: - cursor = conn.cursor() - cursor.execute("DELETE FROM monitored_keywords WHERE id = ?", (keyword_id,)) - conn.commit() + statement = delete(monitored_keywords).where(monitored_keywords.c.id == keyword_id) + self.connection.execute(statement) + self.connection.commit() def delete_keyword_group(self, 
group_id): - with self.db.get_connection() as conn: - cursor = conn.cursor() - cursor.execute("DELETE FROM keyword_groups WHERE id = ?", (group_id,)) - conn.commit() + statement = delete(keyword_groups).where(keyword_groups.c.id == group_id) + self.connection.execute(statement) + self.connection.commit() def delete_group_keywords(self, group_id): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute( - "DELETE FROM monitored_keywords WHERE group_id = ?", - (group_id,) - ) - - conn.commit() + statement = delete(monitored_keywords).where(monitored_keywords.c.group_id == group_id) + self.connection.execute(statement) + self.connection.commit() def create_group(self, group_name, group_topic): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute( - "INSERT INTO keyword_groups (name, topic) VALUES (?, ?)", - (group_name, group_topic) - ) - - conn.commit() + statement = insert(keyword_groups).values( + name=group_name, + topic=group_topic + ) + result = self.connection.execute(statement) + self.connection.commit() - return cursor.lastrowid + return result.inserted_primary_key[0] def add_keywords_to_group(self, group_id, keyword): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute( - "INSERT INTO monitored_keywords (group_id, keyword) " - "VALUES (?, ?)", - (group_id, keyword) - ) - - conn.commit() + statement = insert(monitored_keywords).values( + group_id=group_id, + keyword=keyword + ) + self.connection.execute(statement) + self.connection.commit() def get_all_group_ids_associated_to_topic(self, topic_name): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - # First, get all group IDs associated with this topic - cursor.execute("SELECT id FROM keyword_groups WHERE topic = ?", (topic_name,)) - groups = cursor.fetchall() - return groups + statement = select( + keyword_groups.c.id + ).where( + keyword_groups.c.topic == topic_name + ) + return 
self.connection.execute(statement).fetchall() def get_keyword_ids_associated_to_group(self, group_id): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute("SELECT id FROM monitored_keywords WHERE group_id = ?", (group_id,)) - return cursor.fetchall() + statement = select( + monitored_keywords.c.id + ).where( + monitored_keywords.c.group_id == group_id + ) + return self.connection.execute(statement).fetchall() def get_keywords_associated_to_group(self, group_id): - with self.db.get_connection() as conn: - cursor = conn.cursor() + statement = select( + monitored_keywords.c.keyword + ).where( + monitored_keywords.c.group_id == group_id + ) - cursor.execute( - "SELECT keyword FROM monitored_keywords WHERE group_id = ?", - (group_id,) - ) - return [keyword[0] for keyword in cursor.fetchall()] + return [row[0] for row in self.connection.execute(statement).fetchall()] def get_keywords_associated_to_group_ordered_by_keyword(self, group_id): - with self.db.get_connection() as conn: - cursor = conn.cursor() + statement = select( + monitored_keywords.c.keyword + ).where( + monitored_keywords.c.group_id == group_id + ).order_by( + monitored_keywords.c.keyword + ) - cursor.execute(""" - SELECT keyword - FROM monitored_keywords - WHERE group_id = ? 
- ORDER BY keyword - """, (group_id,)) - return [keyword[0] for keyword in cursor.fetchall()] + return [row[0] for row in self.connection.execute(statement).fetchall()] def delete_keyword_article_matches_from_new_table_structure(self, group_id): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute("DELETE FROM keyword_article_matches WHERE group_id = ?", (group_id,)) - conn.commit() + statement = delete(keyword_article_matches).where(keyword_article_matches.c.group_id == group_id) + result = self.connection.execute(statement) + self.connection.commit() - return cursor.rowcount + return result.rowcount def delete_keyword_article_matches_from_old_table_structure(self, ids_str, keyword_ids): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(f"DELETE FROM keyword_alerts WHERE keyword_id IN ({ids_str})", keyword_ids) - conn.commit() + statement = delete(keyword_alerts).where(keyword_alerts.c.keyword_id.in_(keyword_ids)) + result = self.connection.execute(statement) + self.connection.commit() - return cursor.rowcount + return result.rowcount def delete_groups_keywords(self, ids_str, group_ids): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(f"DELETE FROM monitored_keywords WHERE group_id IN ({ids_str})", group_ids) - conn.commit() + statement = delete(monitored_keywords).where(monitored_keywords.c.group_id.in_(group_ids)) + result = self.connection.execute(statement) + self.connection.commit() - return cursor.rowcount + return result.rowcount def delete_all_keyword_groups(self, topic_name): - with self.db.get_connection() as conn: - cursor = conn.cursor() - cursor.execute("DELETE FROM keyword_groups WHERE topic = ?", (topic_name,)) - conn.commit() + statement = delete(keyword_groups).where(keyword_groups.c.topic == topic_name) + result = self.connection.execute(statement) + self.connection.commit() - return cursor.rowcount + return result.rowcount def 
check_if_alert_id_exists_in_new_table_structure(self, alert_id): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute("SELECT 1 FROM keyword_article_matches WHERE id = ?", (alert_id,)) + statement = select( + keyword_article_matches.c.id + ).where( + keyword_article_matches.c.id == alert_id + ) - return cursor.fetchone() + return self.connection.execute(statement).fetchone() def mark_alert_as_read_or_unread_in_new_table(self, alert_id, read_or_unread): - with self.db.get_connection() as conn: - cursor = conn.cursor() + statement = update(keyword_article_matches).where(keyword_article_matches.c.id == alert_id).values(is_read = read_or_unread) - cursor.execute( - "UPDATE keyword_article_matches SET is_read = ? WHERE id = ?", - (read_or_unread, alert_id,) - ) - - conn.commit() + self.connection.execute(statement) + self.connection.commit() def mark_alert_as_read_or_unread_in_old_table(self, alert_id, read_or_unread): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute( - "UPDATE keyword_alerts SET is_read = ? 
WHERE id = ?", - (read_or_unread, alert_id,) - ) + statement = update(keyword_alerts).where(keyword_alerts.c.id == alert_id).values(is_read = read_or_unread) - conn.commit() + self.connection.execute(statement) + self.connection.commit() def get_number_of_monitored_keywords_by_group_id(self, group_id): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute("SELECT COUNT(*) FROM monitored_keywords WHERE group_id = ?", (group_id,)) + statement = select( + func.count() + ).select_from( + monitored_keywords + ).where( + monitored_keywords.c.group_id == group_id + ) - return cursor.fetchone()[0] + return self.connection.execute(statement).fetchone().scalar() def get_total_number_of_keywords(self): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute("SELECT COUNT(*) FROM monitored_keywords") + statement = select( + func.count() + ).select_from( + monitored_keywords + ) - return cursor.fetchone()[0] + return self.connection.execute(statement).fetchone().scalar() def get_alerts(self, show_read): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - # Modify the query to optionally include read articles - read_condition = "" if show_read else "AND ka.is_read = 0" + statement = select( + keyword_alerts, + articles, + monitored_keywords.c.keyword.label('matched_keyword') + ).select_from( + keyword_alerts.join( + articles, + keyword_alerts.c.article_uri == articles.c.uri + ).join( + monitored_keywords, + keyword_alerts.c.keyword_id == monitored_keywords.c.id + ) + ).order_by( + keyword_alerts.c.detected_at.desc() + ).limit(100) - cursor.execute(f""" - SELECT ka.*, a.*, mk.keyword as matched_keyword - FROM keyword_alerts ka - JOIN articles a ON ka.article_uri = a.uri - JOIN monitored_keywords mk ON ka.keyword_id = mk.id - WHERE 1=1 {read_condition} - ORDER BY ka.detected_at DESC - LIMIT 100 - """) + if not show_read: + statement = statement.where( + keyword_alerts.c.is_read == 0 + ) - columns = 
[column[0] for column in cursor.description] + columns = [column.name for column in statement.columns] - return columns, cursor.fetchall() + return columns, self.connection.execute(statement).fetchall() def get_article_enrichment(self, article_data): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - SELECT category, - sentiment, - driver_type, - time_to_impact, - topic_alignment_score, - keyword_relevance_score, - confidence_score, - overall_match_explanation, - extracted_article_topics, - extracted_article_keywords, - auto_ingested, - ingest_status, - quality_score, - quality_issues - FROM articles - WHERE uri = ? - """, (article_data["uri"],)) + statement = select( + articles.c.category, + articles.c.sentiment, + articles.c.driver_type, + articles.c.time_to_impact, + articles.c.topic_alignment_score, + articles.c.keyword_relevance_score, + articles.c.confidence_score, + articles.c.overall_match_explanation, + articles.c.extracted_article_topics, + articles.c.extracted_article_keywords, + articles.c.auto_ingested, + articles.c.ingest_status, + articles.c.quality_score, + articles.c.quality_issues + ).where(articles.c.uri == article_data["uri"]) - return cursor.fetchone() + return self.connection.execute(statement).fetchone() def get_all_groups_with_alerts_and_status_new_table_structure(self): with self.db.get_connection() as conn: @@ -2583,230 +2562,258 @@ def get_all_groups_with_alerts_and_status_old_table_structure(self): return cursor.fetchall() def get_most_recent_unread_alerts_for_group_id_new_table_structure(self, group_id): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - SELECT ka.id, - ka.article_uri, - ka.keyword_ids, - NULL as matched_keyword, - ka.is_read, - ka.detected_at, - a.title, - a.summary, - a.uri, - a.news_source, - a.publication_date, - a.topic_alignment_score, - a.keyword_relevance_score, - a.confidence_score, - a.overall_match_explanation, - 
a.extracted_article_topics, - a.extracted_article_keywords - FROM keyword_article_matches ka - JOIN articles a ON ka.article_uri = a.uri - WHERE ka.group_id = ? - AND ka.is_read = 0 - ORDER BY ka.detected_at DESC LIMIT 25 - """, (group_id,)) + statement = select( + keyword_article_matches.c.id, + keyword_article_matches.c.article_uri, + keyword_article_matches.c.keyword_ids, + literal(None).label("matched_keyword"), + keyword_article_matches.c.is_read, + keyword_article_matches.c.detected_at, + articles.c.title, + articles.c.summary, + articles.c.uri, + articles.c.news_source, + articles.c.publication_date, + articles.c.topic_alignment_score, + articles.c.keyword_relevance_score, + articles.c.confidence_score, + articles.c.overall_match_explanation, + articles.c.extracted_article_topics, + articles.c.extracted_article_keywords + ).select_from( + keyword_article_matches.join( + articles, + keyword_article_matches.c.article_uri == articles.c.uri + ) + ).where( + keyword_article_matches.c.group_id == group_id, + keyword_article_matches.c.is_read == 0 + ).order_by( + keyword_article_matches.c.detected_at.desc() + ).limit(25) - return cursor.fetchall() + return self.connection.execute(statement).fetchall() def get_most_recent_unread_alerts_for_group_id_old_table_structure(self, group_id): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - SELECT ka.id, - ka.article_uri, - ka.keyword_id, - mk.keyword as matched_keyword, - ka.read as is_read, - ka.detected_at, - a.title, - a.summary, - a.uri, - a.source as news_source, - a.publication_date, - a.topic_alignment_score, - a.keyword_relevance_score, - a.confidence_score, - a.overall_match_explanation, - a.extracted_article_topics, - a.extracted_article_keywords - FROM keyword_alerts ka - JOIN articles a ON ka.article_uri = a.uri - JOIN monitored_keywords mk ON ka.keyword_id = mk.id - WHERE mk.group_id = ? 
- AND ka.read = 0 - ORDER BY ka.detected_at DESC LIMIT 25 - """, (group_id,)) + statement = select( + keyword_alerts.c.id, + keyword_alerts.c.article_uri, + keyword_alerts.c.keyword_id, + monitored_keywords.c.keyword.label("matched_keyword"), + keyword_alerts.c.is_read, + keyword_alerts.c.detected_at, + articles.c.title, + articles.c.summary, + articles.c.uri, + articles.c.news_source, + articles.c.publication_date, + articles.c.topic_alignment_score, + articles.c.keyword_relevance_score, + articles.c.confidence_score, + articles.c.overall_match_explanation, + articles.c.extracted_article_topics, + articles.c.extracted_article_keywords + ).select_from( + keyword_alerts.join( + articles, + keyword_alerts.c.article_uri == articles.c.uri + ).join( + monitored_keywords, + keyword_alerts.c.keyword_id == monitored_keywords.c.id + ) + ).where( + monitored_keywords.c.group_id == group_id, + keyword_alerts.c.is_read == 0 + ).order_by( + keyword_alerts.c.detected_at.desc() + ).limit(25) - return cursor.fetchall() + return self.connection.execute(statement).fetchall() def count_total_group_unread_articles_new_table_structure(self, group_id): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - SELECT COUNT(*) - FROM keyword_article_matches ka - JOIN articles a ON ka.article_uri = a.uri - WHERE ka.group_id = ? 
- AND ka.is_read = 0 - """, (group_id,)) + statement = select( + func.count() + ).select_from( + keyword_article_matches.join( + articles, + keyword_article_matches.c.article_uri == articles.c.uri + ) + ).where( + keyword_article_matches.c.group_id == group_id, + keyword_article_matches.c.is_read == 0 + ) - return cursor.fetchone()[0] + return self.connection.execute(statement).fetchone().scalar() def count_total_group_unread_articles_old_table_structure(self, group_id): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - SELECT COUNT(*) - FROM keyword_alerts ka - JOIN articles a ON ka.article_uri = a.uri - JOIN monitored_keywords mk ON ka.keyword_id = mk.id - WHERE mk.group_id = ? - AND ka.read = 0 - """, (group_id,)) - - return cursor.fetchone()[0] + statement = select( + func.count() + ).select_from( + keyword_alerts.join( + articles, + keyword_alerts.c.article_uri == articles.c.uri + ).join( + monitored_keywords, + keyword_alerts.c.keyword_id == monitored_keywords.c.id + ) + ).where( + monitored_keywords.c.group_id == group_id, + keyword_alerts.c.is_read == 0 + ) def get_all_matched_keywords_for_article_and_group(self, placeholders, keyword_id_list_and_group_id): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(f""" - SELECT DISTINCT keyword - FROM monitored_keywords - WHERE id IN ({placeholders}) AND group_id = ? 
def get_all_matched_keywords_for_article_and_group_by_article_url_and_group_id(self, article_url, group_id):
    """Return the distinct keywords that triggered alerts for an article in a group.

    Args:
        article_url: Value compared against ``keyword_alerts.article_uri``.
        group_id: Value compared against ``monitored_keywords.group_id``.

    Returns:
        A list with the first column (the keyword) of every result row.
    """
    alerts_with_keywords = keyword_alerts.join(
        monitored_keywords,
        keyword_alerts.c.keyword_id == monitored_keywords.c.id
    )
    query = (
        select(monitored_keywords.c.keyword)
        .select_from(alerts_with_keywords)
        .where(
            keyword_alerts.c.article_uri == article_url,
            monitored_keywords.c.group_id == group_id
        )
        .distinct()
    )

    matched = []
    for record in self.connection.execute(query).fetchall():
        matched.append(record[0])
    return matched
def get_count_of_monitored_keywords(self):
    """Count monitored keywords whose group still exists.

    A keyword is counted only when a ``keyword_groups`` row exists whose
    ``id`` equals the keyword's ``group_id`` (correlated ``EXISTS``).

    Returns:
        The ``COUNT(*)`` value produced by the query.
    """
    group_exists = exists(
        select(1).select_from(
            keyword_groups
        ).where(
            keyword_groups.c.id == monitored_keywords.c.group_id
        )
    )

    statement = select(func.count()).select_from(
        monitored_keywords
    ).where(group_exists)

    # Read the value from the result itself: fetchone() yields a Row, and a
    # Row has no scalar() method, so chaining the two raises AttributeError.
    return self.connection.execute(statement).scalar()
def update_or_insert_keyword_monitor_settings(self, params):
    """Write the keyword-monitor settings row with id 1, updating or inserting.

    Args:
        params: Indexable of 15 values, read positionally in the order of
            ``setting_columns`` below (index 0 is ``check_interval``,
            index 14 is ``llm_max_tokens``).
    """
    setting_columns = (
        "check_interval",
        "interval_unit",
        "search_fields",
        "language",
        "sort_by",
        "page_size",
        "daily_request_limit",
        "provider",
        "auto_ingest_enabled",
        "min_relevance_threshold",
        "quality_control_enabled",
        "auto_save_approved_only",
        "default_llm_model",
        "llm_temperature",
        "llm_max_tokens",
    )

    # Index explicitly so a too-short params sequence still raises IndexError.
    values_dict = {"id": 1}
    for position, column_name in enumerate(setting_columns):
        values_dict[column_name] = params[position]

    # Look up the existing row first, then choose UPDATE or INSERT.
    lookup = select(keyword_monitor_settings).where(keyword_monitor_settings.c.id == 1)
    current_row = self.connection.execute(lookup).fetchone()

    if current_row:
        stmt = update(keyword_monitor_settings).where(
            keyword_monitor_settings.c.id == 1
        ).values(**values_dict)
    else:
        stmt = insert(keyword_monitor_settings).values(**values_dict)

    self.connection.execute(stmt)
    self.connection.commit()
def toggle_polling(self, toggle):
    """Set ``is_enabled`` on the settings row with id 1, creating it if absent.

    Args:
        toggle: Object whose ``enabled`` attribute is stored as ``is_enabled``.
    """
    lookup = select(
        keyword_monitor_settings.c.id
    ).where(
        keyword_monitor_settings.c.id == 1
    )
    found_row = self.connection.execute(lookup).fetchone()

    if found_row is None:
        # No settings yet: insert the row with default values
        write_statement = insert(keyword_monitor_settings).values(
            id=1,
            check_interval=15,
            interval_unit=60,
            search_fields='title,description,content',
            language='en',
            sort_by='publishedAt',
            page_size=10,
            is_enabled=toggle.enabled
        )
    else:
        # Settings exist: only flip the is_enabled flag
        write_statement = update(keyword_monitor_settings).where(
            keyword_monitor_settings.c.id == 1
        ).values(is_enabled=toggle.enabled)

    self.connection.execute(write_statement)
    self.connection.commit()
articles, + keyword_alerts.c.article_uri == articles.c.uri + ) + ).order_by( + keyword_alerts.c.detected_at.desc() + ) + + return self.connection.execute(statement).fetchall() def get_all_group_and_topic_alerts_for_export_new_table_structure(self, group_id, topic): with self.db.get_connection() as conn: @@ -2978,50 +2977,49 @@ def get_all_group_and_topic_alerts_for_export_new_table_structure(self, group_id return cursor.fetchall() def get_all_group_and_topic_alerts_for_export_old_table_structure(self, group_id, topic): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - # Use the original table structure - cursor.execute(""" - SELECT kg.name as group_name, - kg.topic, - a.title, - a.news_source, - a.uri, - a.publication_date, - mk.keyword as matched_keyword, - ka.detected_at, - ka.is_read - FROM keyword_alerts ka - JOIN monitored_keywords mk ON ka.keyword_id = mk.id - JOIN keyword_groups kg ON mk.group_id = kg.id - JOIN articles a ON ka.article_uri = a.uri - WHERE kg.id = ? - AND kg.topic = ? 
def save_keyword_alert(self, article_data):
    """Insert an article into ``keyword_alert_articles`` unless its URL is already stored.

    Args:
        article_data: Mapping read for the keys ``url``, ``title``,
            ``summary``, ``source``, ``topic`` and ``matched_keywords``
            (the last is joined with commas into the ``keywords`` column).
    """
    lookup = select(keyword_alert_articles).where(
        keyword_alert_articles.c.url == article_data['url']
    )
    already_saved = self.connection.execute(lookup).fetchone()
    if already_saved:
        # Nothing to write (and nothing to commit) when the URL is present.
        return

    new_row = insert(keyword_alert_articles).values(
        url=article_data['url'],
        title=article_data['title'],
        summary=article_data['summary'],
        source=article_data['source'],
        topic=article_data['topic'],
        keywords=','.join(article_data['matched_keywords'])
    )
    self.connection.execute(new_row)
    self.connection.commit()
def get_alerts_by_group_id_from_old_table_structure(self, status, show_read, group_id, page_size, offset):
    """Fetch one page of alerts for a group from the legacy alert tables.

    Joins ``keyword_alerts`` with ``articles`` and ``monitored_keywords``,
    newest ``detected_at`` first.

    Args:
        status: ``"new"`` keeps articles whose category is NULL or empty;
            ``"added"`` keeps articles whose category is both non-NULL and
            non-empty; any other value applies no category filter.
        show_read: When falsy, only alerts with ``is_read == 0`` are returned.
        group_id: Value compared against ``monitored_keywords.group_id``.
        page_size: Passed to ``LIMIT``.
        offset: Passed to ``OFFSET``.

    Returns:
        The list of rows returned by ``fetchall()``.
    """
    statement = select(
        keyword_alerts.c.id,
        keyword_alerts.c.article_uri,
        keyword_alerts.c.keyword_id,
        monitored_keywords.c.keyword.label("matched_keyword"),
        keyword_alerts.c.is_read,
        keyword_alerts.c.detected_at,
        articles.c.title,
        articles.c.summary,
        articles.c.uri,
        articles.c.news_source,
        articles.c.publication_date,
        articles.c.topic_alignment_score,
        articles.c.keyword_relevance_score,
        articles.c.confidence_score,
        articles.c.overall_match_explanation,
        articles.c.extracted_article_topics,
        articles.c.extracted_article_keywords,
        articles.c.category,
        articles.c.sentiment,
        articles.c.driver_type,
        articles.c.time_to_impact,
        articles.c.future_signal,
        articles.c.bias,
        articles.c.factual_reporting,
        articles.c.mbfc_credibility_rating,
        articles.c.bias_country,
        articles.c.press_freedom,
        articles.c.media_type,
        articles.c.popularity,
        articles.c.auto_ingested,
        articles.c.ingest_status,
        articles.c.quality_score,
        articles.c.quality_issues
    ).select_from(
        keyword_alerts.join(
            articles,
            keyword_alerts.c.article_uri == articles.c.uri
        ).join(
            monitored_keywords,
            keyword_alerts.c.keyword_id == monitored_keywords.c.id
        )
    ).where(
        monitored_keywords.c.group_id == group_id
    ).order_by(
        keyword_alerts.c.detected_at.desc()
    )

    # Add read filter condition
    if not show_read:
        statement = statement.where(
            keyword_alerts.c.is_read == 0
        )

    # Add status filter condition
    if status == "new":
        statement = statement.where(
            or_(
                articles.c.category.is_(None),
                articles.c.category == ''
            )
        )
    elif status == "added":
        # Both conditions must hold (criteria passed to where() are ANDed).
        # OR-ing them would let empty-string categories through, because
        # '' satisfies "IS NOT NULL".
        statement = statement.where(
            articles.c.category.is_not(None),
            articles.c.category != ''
        )

    # Add pagination
    statement = statement.limit(page_size).offset(offset)

    return self.connection.execute(statement).fetchall()
def get_group_name(self, group_id):
    """Return the name of a keyword group, or a placeholder when unavailable.

    Args:
        group_id: Value compared against ``keyword_groups.id``.

    Returns:
        The group's ``name``; ``"Unknown Group"`` when no row matches or the
        stored name is empty/NULL.
    """
    statement = select(
        keyword_groups.c.name
    ).where(
        keyword_groups.c.id == group_id
    )

    # Result.scalar() yields the first column of the first row, or None when
    # there is no row. Chaining fetchone().scalar() instead raises
    # AttributeError both for a found Row and for a missing one (None), so
    # the fallback below could never be reached.
    group_name = self.connection.execute(statement).scalar()

    return group_name if group_name else "Unknown Group"
def delete_article_matches_by_url(self, url):
    """Delete all ``keyword_article_matches`` rows for an article and commit.

    Args:
        url: Value compared against ``keyword_article_matches.article_uri``.

    Returns:
        The ``rowcount`` reported for the DELETE.
    """
    matches_for_url = keyword_article_matches.c.article_uri == url
    removal = delete(keyword_article_matches).where(matches_for_url)

    outcome = self.connection.execute(removal)
    self.connection.commit()
    return outcome.rowcount
def delete_article_by_url(self, url):
    """Delete the ``articles`` rows whose URI equals ``url`` and commit.

    Args:
        url: Value compared against ``articles.uri``.

    Returns:
        The ``rowcount`` reported for the DELETE.
    """
    removal = delete(articles).where(articles.c.uri == url)

    outcome = self.connection.execute(removal)
    self.connection.commit()
    return outcome.rowcount
def check_if_articles_table_has_topic_column(self):
    """Report whether the ``articles`` table in the database has a ``topic`` column.

    The schema is inspected directly rather than selecting rows. Selecting
    from the table reads data needlessly and fails when the table is empty,
    because there is then no row to take column names from.

    Returns:
        True when a column named ``topic`` exists on ``articles``; False when
        it does not or when the table itself is missing.
    """
    inspector = inspect(self.connection)

    # get_columns() raises for a missing table; treat that case as "no column".
    if not inspector.has_table('articles'):
        return False

    return any(
        column_info['name'] == 'topic'
        for column_info in inspector.get_columns('articles')
    )
def delete_articles_by_article_urls(self, placeholders, batch):
    """Delete every ``articles`` row whose URI is in ``batch`` and commit.

    Args:
        placeholders: Not used by this implementation; it remains in the
            signature.
        batch: Collection of URIs passed to the ``IN`` clause.

    Returns:
        The ``rowcount`` reported for the DELETE.
    """
    uri_in_batch = articles.c.uri.in_(batch)
    removal = delete(articles).where(uri_in_batch)

    outcome = self.connection.execute(removal)
    self.connection.commit()
    return outcome.rowcount
self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - SELECT check_interval, - interval_unit, - is_enabled, - search_date_range, - daily_request_limit - FROM keyword_monitor_settings - WHERE id = 1 - """) + statement = select( + keyword_monitor_settings.c.check_interval, + keyword_monitor_settings.c.interval_unit, + keyword_monitor_settings.c.is_enabled, + keyword_monitor_settings.c.search_date_range, + keyword_monitor_settings.c.daily_request_limit + ).where( + keyword_monitor_settings.c.id == 1 + ) - return cursor.fetchone() + return self.connection.execute(statement).fetchone() def get_request_count_for_today(self): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - SELECT requests_today, last_reset_date - FROM keyword_monitor_status - WHERE id = 1 - """) + statement = select( + keyword_monitor_status.c.requests_today, + keyword_monitor_status.c.last_reset_date + ).where( + keyword_monitor_status.c.id == 1 + ) - return cursor.fetchone() + return self.connection.execute(statement).fetchone() def get_articles_by_url(self, url): - with self.db.get_connection() as conn: - conn.row_factory = sqlite3.Row - cursor = conn.cursor() - - cursor.execute("SELECT * FROM articles WHERE uri = ?", (url,)) + statement = select( + articles + ).where( + articles.c.uri == url + ) - return cursor.fetchone() + return self.connection.execute(statement).mappings().fetchone() def get_raw_articles_markdown_by_url(self, url): - with self.db.get_connection() as conn: - conn.row_factory = sqlite3.Row - cursor = conn.cursor() - - cursor.execute("SELECT raw_markdown FROM raw_articles WHERE uri = ?", (url,)) + statement = select( + raw_articles.c.raw_markdown + ).where( + raw_articles.c.uri == url + ) - return cursor.fetchone() + return self.connection.execute(statement).mappings().fetchone() def get_podcasts_for_newsletter_inclusion(self, column_names): - with self.db.get_connection() as conn: - cursor = conn.cursor() + # 
Build a query that works with the available columns + # Base columns we need + select_columns = ["id", "title", "created_at"] + if "audio_url" in column_names: + select_columns.append("audio_url") + if "topic" in column_names: + select_columns.append("topic") - # Build a query that works with the available columns - # Base columns we need - select_columns = ["id", "title", "created_at"] - if "audio_url" in column_names: - select_columns.append("audio_url") - if "topic" in column_names: - select_columns.append("topic") - - # Execute query to get recent podcasts - cursor.execute( - f""" - SELECT {', '.join(select_columns)} - FROM podcasts - ORDER BY created_at DESC - LIMIT 20 - """ - ) + # Execute query to get recent podcasts + statement = select(*select_columns).select_from(podcasts).order_by(podcasts.c.created_at.desc()).limit(20) - podcasts = cursor.fetchall() + podcasts = self.connection.execute(statement).fetchall() - # Format results - result = [] - for podcast in podcasts: - podcast_dict = {} - for i, col in enumerate(select_columns): - podcast_dict[col] = podcast[i] - result.append(podcast_dict) + # Format results + result = [] + for podcast in podcasts: + podcast_dict = {} + for i, col in enumerate(select_columns): + podcast_dict[col] = podcast[i] + result.append(podcast_dict) - return result + return result def generate_tts_podcast(self, params): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute( - """ - INSERT INTO podcasts (id, title, status, created_at, transcript, metadata) - VALUES (?, ?, 'processing', CURRENT_TIMESTAMP, ?, ?) 
- """, - params, - ) - conn.commit() + statement = insert(podcasts).values( + id=params[0], + title=params[1], + status='processing', + created_at=func.current_timestamp(), + transcript=params[2], + metadata=params[3] + ) + self.connection.execute(statement) + self.connection.commit() def mark_podcast_generation_as_complete(self, params): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute( - """ - UPDATE podcasts - SET status = 'completed', - audio_url = ?, - completed_at = CURRENT_TIMESTAMP, - error = NULL, - metadata = ? - WHERE id = ? - """, - params, - ) - conn.commit() + statement = update(podcasts).where(podcasts.c.id == params[2]).values( + status='completed', + audio_url=params[0], + completed_at=func.current_timestamp(), + error=None, + metadata=params[1] + ) + self.connection.execute(statement) + self.connection.commit() def log_error_generating_podcast(self, params): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute( - """ - UPDATE podcasts - SET status = 'error', - error = ?, - completed_at = CURRENT_TIMESTAMP - WHERE id = ? 
- """, - params, - ) - conn.commit() + statement = update(podcasts).where(podcasts.c.id == params[1]).values( + status='error', + error=params[0], + completed_at=func.current_timestamp() + ) + self.connection.execute(statement) + self.connection.commit() def test_data_select(self): with self.db.get_connection() as conn: cursor = conn.cursor() - cursor.execute("SELECT 1") - - def get_keyword_monitor_is_enabled_and_daily_request_limit(self): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - SELECT is_enabled, daily_request_limit - FROM keyword_monitor_settings - WHERE id = 1 - """) - - return cursor.fetchone() - - def get_topic_statistics(self): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - SELECT topic, - COUNT(*) as article_count, - strftime('%Y-%m-%dT%H:%M:%S.000Z', - MAX(COALESCE(submission_date, publication_date)) - ) as last_article_date - FROM articles - WHERE topic IS NOT NULL - AND topic != '' - GROUP BY topic - ORDER BY CASE - WHEN MAX (COALESCE (submission_date, publication_date)) IS NULL THEN 1 - ELSE 0 - END - , - last_article_date DESC - """) - - return cursor.fetchall() - - def get_last_check_time_using_timezone_format(self): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - SELECT strftime('%Y-%m-%dT%H:%M:%S.000Z', last_check_time) - FROM keyword_monitor_status - WHERE id = 1 - """) - result = cursor.fetchone() - - return result[0] if result else None - - def get_podcast_transcript(self, podcast_id): - with self.db.get_connection() as conn: - cursor = conn.cursor() - cursor.execute(""" - SELECT title, transcript, metadata - FROM podcasts - WHERE id = ? 
- """, (podcast_id,)) - - return cursor.fetchone() - - def get_all_podcasts(self): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - SELECT id, - title, - status, - audio_url, - created_at, - completed_at, - error, - transcript, - metadata - FROM podcasts - ORDER BY created_at DESC - """) - - return cursor.fetchall() - - def get_podcast_generation_status(self, podcast_id): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - SELECT id, - title, - status, - audio_url, - created_at, - completed_at, - error, - transcript, - metadata - FROM podcasts - WHERE id = ? - """, (podcast_id,)) + cursor.execute("SELECT 1") - return cursor.fetchone() + def get_keyword_monitor_is_enabled_and_daily_request_limit(self): + statement = select( + keyword_monitor_settings.c.is_enabled, + keyword_monitor_settings.c.daily_request_limit + ).where( + keyword_monitor_settings.c.id == 1 + ) - def get_podcast_audio_file(self, podcast_id): - with self.db.get_connection() as conn: - cursor = conn.cursor() + return self.connection.execute(statement).fetchone() - cursor.execute(""" - SELECT audio_url - FROM podcasts - WHERE id = ? - """, (podcast_id,)) + def get_topic_statistics(self): + last_date = func.max(func.coalesce(articles.c.submission_date, articles.c.publication_date)) - return cursor.fetchone() + stmt = ( + select( + articles.c.topic, + func.count().label("article_count"), + last_date.label("last_article_date"), + ) + .where( + articles.c.topic.isnot(None), + articles.c.topic != "" + ) + .group_by(articles.c.topic) + .order_by( + case((last_date.is_(None), 1), else_=0), + last_date.desc() + ) + ) - def delete_podcast(self, podcast_id): - with self.db.get_connection() as conn: - cursor = conn.cursor() + result = self.connection.execute(stmt).fetchall() - # Delete podcast record - cursor.execute(""" - DELETE - FROM podcasts - WHERE id = ? 
- """, (podcast_id,)) - conn.commit() + return [(row[0], row[1], row[2].strftime("%Y-%m-%dT%H:%M:%S.000Z") if row[2] else None) for row in result] - def search_for_articles_based_on_query_date_range_and_topic(self, query, topic, start_date, end_date, limit): - with self.db.get_connection() as conn: - cursor = conn.cursor() + def get_last_check_time_using_timezone_format(self): + statement = select(keyword_monitor_status.c.last_check_time).where(keyword_monitor_status.c.id == 1) + + result = self.connection.execute(statement).fetchone() + + return result[0].strftime("%Y-%m-%dT%H:%M:%S.000Z") if result else None + + def get_podcast_transcript(self, podcast_id): + statement = select( + podcasts.c.title, + podcasts.c.transcript, + podcasts.c.metadata + ).where( + podcasts.c.id == podcast_id + ) - # Build search query - search_conditions = [] - params = [] + return self.connection.execute(statement).fetchone() - if query: - # Add fuzzy search on title and summary - search_conditions.append("(title LIKE ? 
OR summary LIKE ?)") - params.extend([f"%{query}%", f"%{query}%"]) + def get_all_podcasts(self): + statement = select( + podcasts.c.id, + podcasts.c.title, + podcasts.c.status, + podcasts.c.audio_url, + podcasts.c.created_at, + podcasts.c.completed_at, + podcasts.c.error, + podcasts.c.transcript, + podcasts.c.metadata + ).order_by( + podcasts.c.created_at.desc() + ) - if topic: - search_conditions.append("topic = ?") - params.append(topic) + return self.connection.execute(statement).fetchall() - if start_date: - search_conditions.append("publication_date >= ?") - params.append(start_date) + def get_podcast_generation_status(self, podcast_id): + statement = select( + podcasts.c.id, + podcasts.c.title, + podcasts.c.status, + podcasts.c.audio_url, + podcasts.c.created_at, + podcasts.c.completed_at, + podcasts.c.error, + podcasts.c.transcript, + podcasts.c.metadata + ).where( + podcasts.c.id == podcast_id + ) - if end_date: - search_conditions.append("publication_date <= ?") - params.append(end_date) + return self.connection.execute(statement).fetchone() - # Construct the WHERE clause - where_clause = " AND ".join(search_conditions) if search_conditions else "1=1" + def get_podcast_audio_file(self, podcast_id): + statement = select( + podcasts.c.audio_url + ).where( + podcasts.c.id == podcast_id + ) - query = f""" - SELECT * FROM articles - WHERE {where_clause} - ORDER BY publication_date DESC - LIMIT {limit} - """ + return self.connection.execute(statement).fetchone() - cursor.execute(query, params) - articles = cursor.fetchall() + def delete_podcast(self, podcast_id): + statement = delete( + podcasts + ).where( + podcasts.c.id == podcast_id + ) + self.connection.execute(statement) + self.connection.commit() - # Convert to list of dictionaries with column names - column_names = [description[0] for description in cursor.description] - result = [] + def search_for_articles_based_on_query_date_range_and_topic(self, query, topic, start_date, end_date, limit): + statement 
= select(articles) - for row in articles: - article_dict = dict(zip(column_names, row)) - result.append(article_dict) + if query: + statement = statement.where( + or_( + articles.c.title.ilike(f"%{query}%"), + articles.c.summary.ilike(f"%{query}%") + ) + ) + if topic: + statement = statement.where( + articles.c.topic == topic + ) + if start_date: + statement = statement.where( + articles.c.publication_date >= start_date + ) + if end_date: + statement = statement.where( + articles.c.publication_date <= end_date + ) + statement = statement.order_by( + articles.c.publication_date.desc() + ).limit(limit) - return result + return self.connection.execute(statement).mappings().fetchall() def update_article_by_url(self, params): - with self.db.get_connection() as conn: - cursor = conn.cursor() + statement = update(articles).where(articles.c.uri == params[6]).values( + topic_alignment_score = params[0], + keyword_relevance_score = params[1], + confidence_score = params[2], + overall_match_explanation = params[3], + extracted_article_topics = params[4], + extracted_article_keywords = params[5] + ) + result = self.connection.execute(statement) + self.connection.commit() - cursor.execute(""" - UPDATE articles - SET topic_alignment_score = ?, - keyword_relevance_score = ?, - confidence_score = ?, - overall_match_explanation = ?, - extracted_article_topics = ?, - extracted_article_keywords = ? - WHERE uri = ? - """, params) - - conn.commit() - return cursor.rowcount + return result.rowcount def enable_or_disable_auto_ingest(self, enabled): - with self.db.get_connection() as conn: - cursor = conn.cursor() - cursor.execute(""" - UPDATE keyword_monitor_settings - SET auto_ingest_enabled = ? 
- WHERE id = 1 - """, (enabled,)) - - conn.commit() + statement = update(keyword_monitor_settings).where(keyword_monitor_settings.c.id == 1).values( + auto_ingest_enabled = enabled + ) + self.connection.execute(statement) + self.connection.commit() def get_auto_ingest_settings(self): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - SELECT auto_ingest_enabled, - min_relevance_threshold, - quality_control_enabled, - auto_save_approved_only, - default_llm_model, - llm_temperature, - llm_max_tokens - FROM keyword_monitor_settings - WHERE id = 1 - """) + statement = select( + keyword_monitor_settings.c.auto_ingest_enabled, + keyword_monitor_settings.c.min_relevance_threshold, + keyword_monitor_settings.c.quality_control_enabled, + keyword_monitor_settings.c.auto_save_approved_only, + keyword_monitor_settings.c.default_llm_model, + keyword_monitor_settings.c.llm_temperature, + keyword_monitor_settings.c.llm_max_tokens + ).where(keyword_monitor_settings.c.id == 1) - return cursor.fetchone() + return self.connection.execute(statement).fetchone() def get_processing_statistics(self): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - SELECT COUNT(*) as total_auto_ingested, - COUNT(CASE WHEN ingest_status = 'approved' THEN 1 END) as approved_count, - COUNT(CASE WHEN ingest_status = 'failed' THEN 1 END) as failed_count, - AVG(quality_score) as avg_quality_score - FROM articles - WHERE auto_ingested = 1 - """) + stmt = ( + select( + func.count().label("total_auto_ingested"), + func.count( + case((articles.c.ingest_status == "approved", 1)) + ).label("approved_count"), + func.count( + case((articles.c.ingest_status == "failed", 1)) + ).label("failed_count"), + func.avg(articles.c.quality_score).label("avg_quality_score"), + ) + .where(articles.c.auto_ingested == 1) + ) - return cursor.fetchone() + return self.connection.execute(stmt).fetchone() def 
stamp_keyword_monitor_status_table_with_todays_date(self, params): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - INSERT INTO keyword_monitor_status - (id, requests_today, last_check_time, last_reset_date) - VALUES (1, ?, datetime('now'), ?) ON CONFLICT(id) DO - UPDATE SET - requests_today = excluded.requests_today, - last_check_time = excluded.last_check_time, - last_reset_date = excluded.last_reset_date - """, params) - conn.commit() + # check if the keyword_monitor_status table has a row with id 1 + statement = select(keyword_monitor_status).where(keyword_monitor_status.c.id == 1) + result = self.connection.execute(statement).fetchone() + + if result: + update_statement = update(keyword_monitor_status).where(keyword_monitor_status.c.id == 1).values( + requests_today = params[0], + last_check_time = func.current_timestamp(), + last_reset_date = params[1] + ) + self.connection.execute(update_statement) + self.connection.commit() + else: + insert_statement = insert(keyword_monitor_status).values( + id = 1, + requests_today = params[0], + last_check_time = func.current_timestamp(), + last_reset_date = params[1] + ) + self.connection.execute(insert_statement) + self.connection.commit() def get_keyword_monitor_status_daily_request_limit(self): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute("SELECT daily_request_limit FROM keyword_monitor_settings WHERE id = 1") - - return cursor.fetchone() + statement = select(keyword_monitor_settings.c.daily_request_limit).where(keyword_monitor_settings.c.id == 1) + return self.connection.execute(statement).fetchone() #### AUTOMATED INGEST SERVICE #### #### MEDIA BIAS #### def check_if_media_bias_has_updated_at_column(self): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute("PRAGMA table_info(mediabias)") - - return [column[1] for column in cursor.fetchall()] + return [column.name for column in mediabias.columns] def 
insert_media_bias(self, params): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - INSERT INTO mediabias (source, country, bias, factual_reporting, - press_freedom, media_type, popularity, - mbfc_credibility_rating, updated_at, enabled) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, 1) ON CONFLICT(source) DO - UPDATE SET - country = excluded.country, - bias = excluded.bias, - factual_reporting = excluded.factual_reporting, - press_freedom = excluded.press_freedom, - media_type = excluded.media_type, - popularity = excluded.popularity, - mbfc_credibility_rating = excluded.mbfc_credibility_rating, - updated_at = CURRENT_TIMESTAMP, - enabled = ? - """, params) - - conn.commit() - - return cursor.lastrowid + # check if the source already exists in the mediabias table + statement = select(mediabias).where(mediabias.c.source == params[0]) + result = self.connection.execute(statement).fetchone() + if result: + statement = update(mediabias).where(mediabias.c.source == params[0]).values( + country = params[1], + bias = params[2], + factual_reporting = params[3], + press_freedom = params[4], + media_type = params[5], + popularity = params[6], + mbfc_credibility_rating = params[7], + updated_at = func.current_timestamp(), + enabled = params[8] + ) + result = self.connection.execute(statement) + self.connection.commit() + return result.rowcount + else: + statement = insert(mediabias).values( + source = params[0], + country = params[1], + bias = params[2], + factual_reporting = params[3], + press_freedom = params[4], + media_type = params[5], + popularity = params[6], + mbfc_credibility_rating = params[7], + updated_at = func.current_timestamp(), + enabled = 1 + ) + result = self.connection.execute(statement) + self.connection.commit() + return result.inserted_primary_key[0] def update_media_bias_source(self, params): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - UPDATE mediabias - SET 
source = ?, - country = ?, - bias = ?, - factual_reporting = ?, - press_freedom = ?, - media_type = ?, - popularity = ?, - mbfc_credibility_rating = ?, - updated_at = CURRENT_TIMESTAMP, - enabled = ? - WHERE id = ? - """, params) - - conn.commit() + statement = update(mediabias).where(mediabias.c.id == params[9]).values( + source = params[0], + country = params[1], + bias = params[2], + factual_reporting = params[3], + press_freedom = params[4], + media_type = params[5], + popularity = params[6], + mbfc_credibility_rating = params[7], + updated_at = func.current_timestamp(), + enabled = params[8] + ) + self.connection.execute(statement) + self.connection.commit() def drop_media_bias_table(self): - with self.db.get_connection() as conn: - cursor = conn.cursor() - cursor.execute("DROP TABLE IF EXISTS mediabias") + mediabias.drop(self.connection, checkfirst=True) + self.connection.commit() def update_media_bias_settings(self, file_path): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - UPDATE mediabias_settings - SET enabled = 1, - source_file = ?, - last_updated = CURRENT_TIMESTAMP - WHERE id = 1 - """, (file_path,)) - - conn.commit() + statement = update(mediabias_settings).where(mediabias_settings.c.id == 1).values( + enabled = 1, + source_file = file_path, + last_updated = func.current_timestamp() + ) + self.connection.execute(statement) + self.connection.commit() def get_all_media_bias_sources(self): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - SELECT source, - country, - bias, - factual_reporting, - press_freedom, - media_type, - popularity, - mbfc_credibility_rating - FROM mediabias - ORDER BY source ASC - """) - - return cursor.fetchall() + statement = select( + mediabias.c.source, + mediabias.c.country, + mediabias.c.bias, + mediabias.c.factual_reporting, + mediabias.c.press_freedom, + mediabias.c.media_type, + mediabias.c.popularity, + mediabias.c.mbfc_credibility_rating 
+ ).order_by( + mediabias.c.source.asc() + ) + return self.connection.execute(statement).fetchall() def get_media_bias_status(self): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - SELECT enabled, last_updated, source_file - FROM mediabias_settings - WHERE id = 1 - """) + statement = select( + mediabias_settings.c.enabled, + mediabias_settings.c.last_updated, + mediabias_settings.c.source_file + ).where(mediabias_settings.c.id == 1) - return cursor.fetchone() + return self.connection.execute(statement).fetchone() def get_media_bias_source(self, source_id): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute("SELECT id FROM mediabias WHERE id = ?", (source_id,)) + statement = select( + mediabias.c.id + ).where(mediabias.c.id == source_id) - return cursor.fetchone() + return self.connection.execute(statement).fetchone() def delete_media_bias_source(self, source_id): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute("DELETE FROM mediabias WHERE id = ?", (source_id,)) - - conn.commit() + statement = delete(mediabias).where(mediabias.c.id == source_id) + self.connection.execute(statement) + self.connection.commit() def get_total_media_bias_sources(self): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute("SELECT COUNT(*) FROM mediabias") - - return cursor.fetchone()[0] + statement = select( + func.count() + ).select_from( + mediabias + ) + return self.connection.execute(statement).fetchone().scalar() def enable_media_bias_sources(self, enabled): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - UPDATE mediabias_settings - SET enabled = ?, - last_updated = CURRENT_TIMESTAMP - WHERE id = 1 - """, (1 if enabled else 0,)) - - conn.commit() + statement = update(mediabias_settings).where(mediabias_settings.c.id == 1).values( + enabled = 1 if enabled else 0, + last_updated = 
func.current_timestamp() + ) + self.connection.execute(statement) + self.connection.commit() def update_media_bias_last_updated(self): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - UPDATE mediabias_settings - SET last_updated = CURRENT_TIMESTAMP - WHERE id = 1 - """) - - conn.commit() + statement = update(mediabias_settings).where(mediabias_settings.c.id == 1).values( + last_updated = func.current_timestamp() + ) + result = self.connection.execute(statement) + self.connection.commit() - return cursor.rowcount + return result.rowcount def reset_media_bias_sources(self): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - # Delete all media bias data - cursor.execute("DELETE FROM mediabias") - - # Reset settings but keep enabled state - cursor.execute(""" - UPDATE mediabias_settings - SET last_updated = NULL, - source_file = NULL - WHERE id = 1 - """) + # delete all media bias data + statement = delete(mediabias) + self.connection.execute(statement) - conn.commit() + # Reset settings but keep enabled state + statement = update(mediabias_settings).where(mediabias_settings.c.id == 1).values( + last_updated = None, + source_file = None + ) + self.connection.execute(statement) + self.connection.commit() def enable_media_source(self, source): - with self.db.get_connection() as conn: - cursor = conn.cursor() - cursor.execute( - "UPDATE mediabias SET enabled = 1 WHERE source = ?", - (source,) - ) - conn.commit() + statement = update(mediabias).where(mediabias.c.source == source).values( + enabled = 1 + ) + self.connection.execute(statement) + self.connection.commit() def search_media_bias_sources(self, query, bias_filter, factual_filter, country_filter, page, per_page): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - # Build base query - query_parts = ["SELECT * FROM mediabias WHERE 1=1"] - params = [] - - # Add filters - if query: - query_parts.append("AND source LIKE ?") - 
params.append(f"%{query}%") - - if bias_filter: - query_parts.append("AND bias LIKE ?") - params.append(f"%{bias_filter}%") - - if factual_filter: - query_parts.append("AND factual_reporting LIKE ?") - params.append(f"%{factual_filter}%") - - if country_filter: - query_parts.append("AND country LIKE ?") - params.append(f"%{country_filter}%") - - # Get total count first - count_query = f"SELECT COUNT(*) FROM ({' '.join(query_parts)})" - cursor.execute(count_query, params) - total_count = cursor.fetchone()[0] - - # Add pagination - query_parts.append("ORDER BY source ASC LIMIT ? OFFSET ?") - offset = (page - 1) * per_page - params.extend([per_page, offset]) - - # Get data - cursor.execute(' '.join(query_parts), params) - - return total_count, cursor.fetchall() + # Build base query + base_stmt = select(mediabias) + + # Apply filters + if query: + base_stmt = base_stmt.where(mediabias.c.source.ilike(f"%{query}%")) + if bias_filter: + base_stmt = base_stmt.where(mediabias.c.bias.ilike(f"%{bias_filter}%")) + if factual_filter: + base_stmt = base_stmt.where(mediabias.c.factual_reporting.ilike(f"%{factual_filter}%")) + if country_filter: + base_stmt = base_stmt.where(mediabias.c.country.ilike(f"%{country_filter}%")) + + # Get total count + count_stmt = select(func.count()).select_from(base_stmt.subquery()) + total_count = self.connection.execute(count_stmt).scalar() + + # Add pagination + offset_value = (page - 1) * per_page + paginated_stmt = ( + base_stmt + .order_by(asc(mediabias.c.source)) + .limit(per_page) + .offset(offset_value) + ) + + return total_count, self.connection.execute(paginated_stmt).fetchall() def delete_media_bias_source(self, source_id): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute("DELETE FROM mediabias WHERE id = ?", (source_id,)) - - conn.commit() + statement = delete(mediabias).where(mediabias.c.id == source_id) + self.connection.execute(statement) + self.connection.commit() def 
get_media_bias_source_by_id(self, source_id): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - cursor.execute(""" - SELECT id, - source, - country, - bias, - factual_reporting, - press_freedom, - media_type, - popularity, - mbfc_credibility_rating - FROM mediabias - WHERE id = ? - """, (source_id,)) - - return cursor.fetchone() + statement = select( + mediabias.c.id, + mediabias.c.source, + mediabias.c.country, + mediabias.c.bias, + mediabias.c.factual_reporting, + mediabias.c.press_freedom, + mediabias.c.media_type, + mediabias.c.popularity, + mediabias.c.mbfc_credibility_rating + ).where(mediabias.c.id == source_id) + + return self.connection.execute(statement).fetchone() def get_media_bias_filter_options(self): - with self.db.get_connection() as conn: - cursor = conn.cursor() - - # Get unique biases - cursor.execute("SELECT DISTINCT bias FROM mediabias WHERE bias IS NOT NULL AND bias != ''") - biases = [row[0] for row in cursor.fetchall()] + # Get unique biases + biases_statement = select( + mediabias.c.bias + ).where( + mediabias.c.bias.isnot(None), + mediabias.c.bias != '' + ).distinct() - # Get unique factual reporting levels - cursor.execute( - "SELECT DISTINCT factual_reporting FROM mediabias WHERE factual_reporting IS NOT NULL AND factual_reporting != ''") - factual_levels = [row[0] for row in cursor.fetchall()] + biases = [row[0] for row in self.connection.execute(biases_statement).fetchall()] - # Get unique countries - cursor.execute("SELECT DISTINCT country FROM mediabias WHERE country IS NOT NULL AND country != ''") - countries = [row[0] for row in cursor.fetchall()] + # Get unique factual reporting levels + factual_reporting_statement = select( + mediabias.c.factual_reporting + ).where( + mediabias.c.factual_reporting.isnot(None), + mediabias.c.factual_reporting != '' + ).distinct() - return biases, factual_levels, countries + factual_levels = [row[0] for row in self.connection.execute(factual_reporting_statement).fetchall()] - 
def load_media_bias_sources_from_database(self): - sources = [] - with self.db.get_connection() as conn: - cursor = conn.cursor() - cursor.execute("SELECT * FROM mediabias") - rows = cursor.fetchall() + # Get unique countries + countries_statement = select( + mediabias.c.country + ).where( + mediabias.c.country.isnot(None), + mediabias.c.country != '' + ).distinct() - # Convert to dictionaries with column names - cursor.execute("PRAGMA table_info(mediabias)") - columns = [col[1] for col in cursor.fetchall()] + countries = [row[0] for row in self.connection.execute(countries_statement).fetchall()] - for row in rows: - source = {} - for i, column in enumerate(columns): - source[column] = row[i] - sources.append(source) + return biases, factual_levels, countries - return sources + def load_media_bias_sources_from_database(self): + return self.connection.execute(select(mediabias)).mappings().fetchall()