diff --git a/app/models/solid_cable/message.rb b/app/models/solid_cable/message.rb index 193d45a..6c60a18 100644 --- a/app/models/solid_cable/message.rb +++ b/app/models/solid_cable/message.rb @@ -6,7 +6,12 @@ class Message < SolidCable::Record where(created_at: ...::SolidCable.message_retention.ago) } scope :broadcastable, lambda { |channels, last_id| - where(channel_hash: channel_hashes_for(channels)). + where(broadcast_to_list: false). + where(channel_hash: channel_hashes_for(channels)). + where(id: (last_id + 1)..).order(:id) + } + scope :broadcastable_to_list, lambda { |channels, last_id| + where(broadcast_to_list: true). where(id: (last_id + 1)..).order(:id) } @@ -16,6 +21,11 @@ def broadcast(channel, payload) channel_hash: channel_hash_for(channel) }) end + def broadcast_list(channel, payload) + insert({ created_at: Time.current, channel:, payload:, + channel_hash: channel_hash_for(channel), broadcast_to_list: true }) + end + def channel_hashes_for(channels) channels.map { |channel| channel_hash_for(channel) } end diff --git a/bench/db/migrate/20241008221317_add_broadcast_to_list.solid_cable.rb b/bench/db/migrate/20241008221317_add_broadcast_to_list.solid_cable.rb new file mode 100644 index 0000000..8539e43 --- /dev/null +++ b/bench/db/migrate/20241008221317_add_broadcast_to_list.solid_cable.rb @@ -0,0 +1,7 @@ +# frozen_string_literal: true + +class AddBroadcastToList < ActiveRecord::Migration[7.2] + def change + add_column :solid_cable_messages, :broadcast_to_list, :boolean, null: false, default: false + end +end diff --git a/bench/db/schema.rb b/bench/db/schema.rb index cbb4611..3bb8190 100644 --- a/bench/db/schema.rb +++ b/bench/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[8.0].define(version: 2024_09_12_235943) do +ActiveRecord::Schema[8.0].define(version: 2024_10_08_221317) do create_table "active_error_faults", force: :cascade do |t| t.integer "cause_id" t.binary "backtrace", limit: 536870912 @@ -48,6 +48,7 @@ t.binary "payload", limit: 536870912, null: false t.datetime "created_at", null: false t.integer "channel_hash", limit: 8, null: false + t.boolean "broadcast_to_list", default: false, null: false t.index ["channel"], name: "index_solid_cable_messages_on_channel" t.index ["channel_hash"], name: "index_solid_cable_messages_on_channel_hash" t.index ["created_at"], name: "index_solid_cable_messages_on_created_at" diff --git a/lib/action_cable/subscription_adapter/solid_cable.rb b/lib/action_cable/subscription_adapter/solid_cable.rb index 401c273..22b2f99 100644 --- a/lib/action_cable/subscription_adapter/solid_cable.rb +++ b/lib/action_cable/subscription_adapter/solid_cable.rb @@ -20,6 +20,12 @@ def broadcast(channel, payload) ::SolidCable::TrimJob.perform_now if ::SolidCable.autotrim? end + def broadcast_list(channel, payload) + ::SolidCable::Message.broadcast_list(channel, payload) + + ::SolidCable::TrimJob.perform_now if ::SolidCable.autotrim? + end + def subscribe(channel, callback, success_callback = nil) listener.add_subscriber(channel, callback, success_callback) end @@ -78,6 +84,7 @@ def invoke_callback(*) private attr_reader :event_loop, :thread attr_writer :running, :last_id + attr_writer :running, :last_list_id def running? if defined?(@running) @@ -88,7 +95,11 @@ def running? end def last_id - @last_id ||= ::SolidCable::Message.maximum(:id) || 0 + @last_id ||= ::SolidCable::Message.where(broadcast_to_list: false).maximum(:id) || 0 + end + + def last_list_id + @last_list_id ||= ::SolidCable::Message.where(broadcast_to_list: true).maximum(:id) || 0 end def channels @@ -101,6 +112,14 @@ def broadcast_messages broadcast(message.channel, message.payload) self.last_id = message.id end + + ::SolidCable::Message.broadcastable_to_list(channels, last_list_id). + each do |message| + find_matching_channels(message.channel, channels).each do |channel| + broadcast(channel, message.payload) + end + self.last_list_id = message.id + end end def with_polling_volume @@ -110,6 +129,27 @@ def with_polling_volume yield end end + + # Returns a list of channels that match the broadcast_to_list nomenclature + # For example, if channel is "posts:1", and channels is ["posts:1-2", "posts:2-3", "posts:1-3"], + # this method will return ["posts:1-2", "post:1-3"]. + # channel attr must have parts separated by ":" and must have at least 2 parts + # channels must have parts separated by ":" and must have at least 2 parts, and in the last part + # which represents the identifiers, they must be separated by "-" + def find_matching_channels(channel, channels) + parts = channel.split(":") + return [] if parts.length == 1 + + id = parts.pop + base_channel = parts.join(":") + + channels.filter_map do |ch| + if ch.start_with?(base_channel) + ids = ch.split(":").last + ch if ids != ch && ids.split("-").include?(id) + end + end + end end end end diff --git a/lib/generators/solid_cable/install/templates/db/cable_schema.rb b/lib/generators/solid_cable/install/templates/db/cable_schema.rb index 2366660..d0f1d8b 100644 --- a/lib/generators/solid_cable/install/templates/db/cable_schema.rb +++ b/lib/generators/solid_cable/install/templates/db/cable_schema.rb @@ -4,6 +4,7 @@ t.binary "payload", limit: 536870912, null: false t.datetime "created_at", null: false t.integer "channel_hash", limit: 8, null: false + t.boolean "broadcast_to_list", default: false, null: false t.index ["channel"], name: "index_solid_cable_messages_on_channel" t.index ["channel_hash"], name: "index_solid_cable_messages_on_channel_hash" t.index ["created_at"], name: "index_solid_cable_messages_on_created_at" diff --git a/test/dummy/db/schema.rb b/test/dummy/db/schema.rb index 229f6a9..d6f80cd 100644 --- a/test/dummy/db/schema.rb +++ b/test/dummy/db/schema.rb @@ -16,6 +16,7 @@ t.binary "payload", limit: 536870912, null: false t.datetime "created_at", null: false t.integer "channel_hash", limit: 8, null: false + t.boolean "broadcast_to_list", default: false, null: false t.index ["channel"], name: "index_solid_cable_messages_on_channel" t.index ["channel_hash"], name: "index_solid_cable_messages_on_channel_hash" t.index ["created_at"], name: "index_solid_cable_messages_on_created_at" diff --git a/test/lib/action_cable/subscription_adapter/solid_cable_test.rb b/test/lib/action_cable/subscription_adapter/solid_cable_test.rb index 6dedad8..7e2920a 100644 --- a/test/lib/action_cable/subscription_adapter/solid_cable_test.rb +++ b/test/lib/action_cable/subscription_adapter/solid_cable_test.rb @@ -148,6 +148,16 @@ class ActionCable::SubscriptionAdapter::SolidCableTest < ActionCable::TestCase end end + test "broadcast_list" do + subscribe_as_queue("channel:2-3") do |queue| + subscribe_as_queue("channel:3-4") do |queue2| + @tx_adapter.broadcast_list("channel:2", "hello world") + assert_empty queue2 + end + assert_equal "hello world", queue.pop + end + end + private def cable_config { adapter: "solid_cable", message_retention: "1.second",