class Irc::Bot::Journal::JournalBroker
Attributes
Public Class Methods
# File lib/rbot/journal.rb, line 294
# Sets up the broker state and starts the background consumer thread.
#
# Every message popped from the internal queue is handed to #consume on
# that thread; popping a nil/false value ends the loop.
#
# @param opts [Hash] broker options
# @option opts [#call] :consumer optional callable stored as the consumer
# @option opts [Object] :storage optional storage backend; when it is
#   missing a warning is emitted
def initialize(opts={})
  # optional callable that overrides the internal consumer
  @consumer = opts[:consumer]
  # storage backend (may be nil)
  @storage = opts[:storage]
  warning 'journal broker: no storage set up, won\'t persist messages' unless @storage

  @subscriptions = []
  # lookup-table for subscriptions by their topic
  @topic_subs = {}
  @queue = Queue.new

  # consumer thread: spawned after all state it reads has been assigned
  @thread = Thread.new do
    while (msg = @queue.pop)
      begin
        consume msg
      rescue Exception
        # Intentionally broad so a failing consumer/subscriber cannot
        # terminate the consumer loop; the failure is only logged.
        error 'journal broker: exception in consumer thread'
        error $!
      end
    end
  end
end
Public Instance Methods
# File lib/rbot/journal.rb, line 320
# Processes a single message: passes it to the optional consumer, then to
# every subscription registered for the message's topic, and finally
# inserts it into the storage backend when one is configured.
#
# @param message [#topic] the message to process; nil/false is ignored
# @return [Object, nil] the result of the storage insert, or nil when the
#   message is nil/false or no storage is configured
def consume(message)
  return unless message
  @consumer.call(message) if @consumer

  # notify subscribers
  # Look the list up once: a separate key check followed by a second
  # lookup can observe nil if the table is cleared in between.
  subscribers = @topic_subs[message.topic]
  if subscribers
    # Iterate over a snapshot so that a subscriber removing itself (or
    # another subscription) from the list while being notified does not
    # cause the following subscriber to be skipped.
    subscribers.dup.each do |s|
      s.block.call(message)
    end
  end

  @storage.insert(message) if @storage
end
# File lib/rbot/journal.rb, line 407
# Delegates to the storage backend's +count+ with a normalized query.
#
# @param query [Query, Object, nil] a Query instance, or anything else,
#   which is first passed through Query.define
# @return [Object] whatever the storage backend's +count+ returns
def count(query=nil)
  criteria = query.is_a?(Query) ? query : Query.define(query)
  @storage.count(criteria)
end
# File lib/rbot/journal.rb, line 421
# Forwards the request to the storage backend's +ensure_payload_index+.
#
# @param key [Object] passed through unchanged to the storage backend
# @return [Object] whatever the storage backend returns
def ensure_payload_index(key)
  backend = @storage
  backend.ensure_payload_index(key)
end
Find and return persisted messages by a query.
This method will either return all messages or call the provided block for each message. It will filter the messages by the provided Query
instance. Limit and offset might be used to constrain the result. The query might also be a hash or proc that is passed to Query.define
first.
@param query [Query]
@param limit [Integer] how many items to return
@param offset [Integer] relative offset in results
# File lib/rbot/journal.rb, line 396
# Forwards a normalized query, the limit, the offset and the optional
# block to the storage backend's +find+.
#
# @param query [Query, Object] a Query instance, or anything else, which
#   is first passed through Query.define
# @param limit [Integer] forwarded to the storage backend
# @param offset [Integer] forwarded to the storage backend
# @return [Object] whatever the storage backend's +find+ returns
def find(query, limit=100, offset=0, &block)
  criteria = query.is_a?(Query) ? query : Query.define(query)
  # Passing a nil block with & is the same as passing no block at all, so
  # a single call covers both the block and the block-less form.
  @storage.find(criteria, limit, offset, &block)
end
# File lib/rbot/journal.rb, line 334
# Tells whether a storage backend is configured.
#
# Always answers with a strict boolean, so the falsy case is +false+
# rather than +nil+.
#
# @return [Boolean] true when @storage is set, false otherwise
def persists?
  @storage ? true : false
end
# File lib/rbot/journal.rb, line 347
# Wraps topic and payload in a message via JournalMessage.create and
# appends it to the internal queue; a debug line is emitted first.
#
# @param topic [Object] topic handed to JournalMessage.create
# @param payload [Object] payload handed to JournalMessage.create
# @return [nil]
def publish(topic, payload)
  note = 'journal publish message in %s: %s' % [topic, payload.inspect]
  debug note
  entry = JournalMessage.create(topic, payload)
  @queue << entry
  nil
end
# File lib/rbot/journal.rb, line 414
# Delegates to the storage backend's +remove+ with a normalized query.
#
# @param query [Query, Object, nil] a Query instance, or anything else,
#   which is first passed through Query.define
# @return [Object] whatever the storage backend's +remove+ returns
def remove(query=nil)
  criteria = query.is_a?(Query) ? query : Query.define(query)
  @storage.remove(criteria)
end
# File lib/rbot/journal.rb, line 338
# Stops the broker: drops all subscriptions, pushes a nil sentinel onto
# the queue so the consumer loop ends, and waits for the consumer thread.
#
# Safe to call more than once; once the thread reference has been
# cleared, later calls only log and clear the (already empty) tables.
#
# @return [nil]
def shutdown
  log 'journal shutdown'
  @subscriptions.clear
  @topic_subs.clear
  # Already shut down: there is no thread to join, and pushing another
  # sentinel would only leave a stray nil in the queue.
  return unless @thread
  @queue << nil
  @thread.join
  @thread = nil
end
Subscribe to receive messages from a topic.
You can use this method to subscribe to messages that are published within a specified topic. You must provide a receiving block to receive messages one-by-one. The method returns an instance of Subscription
that can be used to cancel the subscription by invoking cancel on it.
journal.subscribe('irclog') do |message|
  # received irclog messages...
end
# File lib/rbot/journal.rb, line 366
# Registers a block as a subscription for a topic.
#
# The new Subscription is recorded both in the flat subscription list and
# in the per-topic lookup table.
#
# @param topic [Object, nil] topic key the subscription is filed under
# @yield [message] the receiving block stored in the subscription
# @return [Subscription] the newly created subscription
# @raise [ArgumentError] when no block is given
def subscribe(topic=nil, &block)
  raise ArgumentError unless block_given?

  subscription = Subscription.new(self, topic, block)
  @subscriptions << subscription
  @topic_subs[topic] = [] unless @topic_subs.key?(topic)
  @topic_subs[topic] << subscription
  subscription
end
# File lib/rbot/journal.rb, line 377
# Removes a subscription from the per-topic lookup table (when its topic
# is known) and from the flat subscription list.
#
# @param s [#topic] the subscription to remove
# @return [Object, nil] the entry deleted from the subscription list, or
#   nil when it was not present
def unsubscribe(s)
  @topic_subs[s.topic].delete(s) if @topic_subs.key?(s.topic)
  @subscriptions.delete(s)
end