Commit 6e284cb2ba3037bba5a19b98ab712dee6fc54631

Authored by Marius Hanne
1 parent c4a2b594b1

CommandClient to connect to and receive data from CommandHandler

Showing 5 changed files with 134 additions and 26 deletions Side-by-side Diff

... ... @@ -43,29 +43,62 @@
43 43  
44 44 EM.run do
45 45  
46   - EM.connect(host, port) do |connection|
  46 + Bitcoin::Network::CommandClient.connect(host, port) do
  47 + on_response do |cmd, data|
  48 + EM.stop unless cmd == "monitor"
  49 + end
  50 + on_info do |info|
  51 + puts JSON.pretty_generate(info)
  52 + end
  53 + on_connections do |connections|
  54 + puts *connections
  55 + end
  56 + [:connect, :disconnect, :getblocks, :getaddr, :relay_tx, :stop].each do |req|
  57 + send("on_#{req}") {|res| p res}
  58 + end
47 59  
48   - connection.send_data([ARGV[0], ARGV[1..-1].join(" ")].to_json + "\x00")
49   -
50   - def connection.receive_data(data)
51   - (@buf ||= BufferedTokenizer.new("\x00")).extract(data).each do |packet|
52   - cmd, result = JSON.load(packet)
53   - next unless cmd == ARGV[0]
54   - if cmd == "monitor"
55   - type, obj, depth = result
56   - puts "#{type}: #{obj['hash'] rescue 'none'} #{depth ? "(#{depth})" : ""}"
57   - else
58   - puts JSON.pretty_generate(result)
59   - EM.stop
60   - end
  60 + on_block do |block, depth|
  61 + puts "block: #{block['hash']} (#{depth})"
  62 + end
  63 + on_tx do |tx|
  64 + puts "tx: #{tx['hash']}"
  65 + end
  66 + on_connection do |type, host|
  67 + if type == "connected"
  68 + puts "Connected: #{host['host']}:#{host['port']}"
  69 + else
  70 + puts "Disconnected: #{host.inspect}"
61 71 end
62 72 end
63 73  
64   - def connection.unbind
65   - puts "Disconnected."
66   - EM.stop
  74 + on_connected do
  75 + request(ARGV[0], *(ARGV[1] || "").split(" "))
67 76 end
68   -
69 77 end
  78 + # client.request("monitor", "connection")
  79 + # EM.connect(host, port) do |connection|
  80 +
  81 + # connection.send_data([ARGV[0], ARGV[1..-1].join(" ")].to_json + "\x00")
  82 +
  83 + # def connection.receive_data(data)
  84 + # (@buf ||= BufferedTokenizer.new("\x00")).extract(data).each do |packet|
  85 + # cmd, result = JSON.load(packet)
  86 + # next unless cmd == ARGV[0]
  87 + # if cmd == "monitor"
  88 + # type, obj, depth = result
  89 + # puts "#{type}: #{obj['hash'] rescue 'none'} #{depth ? "(#{depth})" : ""}"
  90 + # else
  91 + # puts JSON.pretty_generate(result)
  92 + # EM.stop
  93 + # end
  94 + # end
  95 + # end
  96 +
  97 + # def connection.unbind
  98 + # puts "Disconnected."
  99 + # EM.stop
  100 + # end
  101 +
  102 + # end
70 103 end
... ... @@ -21,6 +21,7 @@
21 21 module Network
22 22 autoload :ConnectionHandler, 'bitcoin/network/connection_handler'
23 23 autoload :CommandHandler, 'bitcoin/network/command_handler'
  24 + autoload :CommandClient, 'bitcoin/network/command_client'
24 25 autoload :Node, 'bitcoin/network/node'
25 26 end
26 27  
lib/bitcoin/network/command_client.rb
  1 +class Bitcoin::Network::CommandClient < EM::Connection
  2 +
  3 + def initialize host, port, block, *args
  4 + @host, @port = host, port
  5 + @args = args
  6 + @callbacks = {}
  7 + @block = block
  8 + instance_eval &block if block
  9 + @buffer = BufferedTokenizer.new("\x00")
  10 + end
  11 +
  12 + def log
  13 + return @log if @log
  14 + @log = Bitcoin::Logger.create("command_client")
  15 + @log.level = :info
  16 + @log
  17 + end
  18 +
  19 + def self.connect host, port, *args, &block
  20 + client = EM.connect(host, port, self, host, port, block, *args)
  21 + end
  22 +
  23 + def post_init
  24 + log.info { "Connected" }
  25 + callback :connected
  26 + end
  27 +
  28 + def unbind
  29 + log.info { "Disconnected" }
  30 + sleep 1
  31 + EM.connect(@host, @port, self.class, @host, @port, @block, *@args)
  32 + end
  33 +
  34 + def request cmd, *args
  35 + log.info { "request: #{cmd} #{args.inspect}" }
  36 + register_monitor_callbacks if cmd.to_sym == :monitor
  37 + send_data([cmd, args].to_json + "\x00")
  38 + end
  39 +
  40 + def receive_data data
  41 + @buffer.extract(data).each do |packet|
  42 + cmd, *data = *JSON.load(packet)
  43 + log.info { d = data.inspect
  44 + "response: #{cmd} #{d[0...50]}#{d.size > 50 ? '...' : ''}" }
  45 + callback(:response, cmd, *data)
  46 + callback(cmd.to_sym, *data)
  47 + end
  48 + end
  49 +
  50 + def callback name, *args
  51 + cb = @callbacks[name.to_sym]
  52 + return unless cb
  53 + log.debug { "callback: #{name}" }
  54 + cb.call(*args)
  55 + end
  56 +
  57 + def method_missing(name, *args, &block)
  58 + if name =~ /^on_/
  59 + @callbacks[name.to_s.split("on_")[1].to_sym] = block
  60 + log.debug { "callback #{name} registered" }
  61 + else
  62 + super(name, *args)
  63 + end
  64 + end
  65 +
  66 + def register_monitor_callbacks
  67 + on_monitor do |type, data|
  68 + callback(type, *data)
  69 + end
  70 + end
  71 +
  72 +end
lib/bitcoin/network/command_handler.rb
... ... @@ -15,6 +15,7 @@
15 15 end
16 16  
17 17 def respond(cmd, data)
  18 + return unless data
18 19 @lock.synchronize do
19 20 send_data([cmd, data].to_json + "\x00")
20 21 end
21 22  
... ... @@ -22,10 +23,8 @@
22 23  
23 24 def receive_data data
24 25 @buf.extract(data).each do |packet|
25   - p packet
26 26 cmd, args = JSON::parse(packet)
27   - *args = args.split(" ")
28   - log.debug { line.chomp }
  27 + log.debug { [cmd, args] }
29 28 if respond_to?("handle_#{cmd}")
30 29 respond(cmd, send("handle_#{cmd}", *args))
31 30 else
32 31  
... ... @@ -44,13 +43,14 @@
44 43 case channel.to_sym
45 44 when :block
46 45 head = Bitcoin::P::Block.new(@node.store.get_head.to_payload) rescue nil
47   - respond("monitor", ["block", head, @node.store.get_depth.to_s])
  46 + respond("monitor", ["block", [head, @node.store.get_depth.to_s]])
48 47 when :connection
49 48 @node.connections.select {|c| c.connected?}.each do |conn|
50 49 respond("monitor", [:connection, [:connected, conn.info]])
51 50 end
52 51 end
53 52 end
  53 + nil
54 54 end
55 55  
56 56 def handle_info
... ... @@ -118,6 +118,8 @@
118 118 tx = Bitcoin::Protocol::Tx.new([data].pack("H*"))
119 119 @node.relay_tx(tx)
120 120 tx.to_hash
  121 + rescue
  122 + {:error => $!}
121 123 end
122 124  
123 125 def handle_stop
lib/bitcoin/network/node.rb
... ... @@ -243,9 +243,9 @@
243 243 if @store.send("store_#{obj[0]}", obj[1])
244 244 if obj[0].to_sym == :block
245 245 block = @store.get_block(obj[1].hash)
246   - @notifiers[:block].push([obj[0], obj[1], block.depth])
  246 + @notifiers[:block].push([obj[1], block.depth]) if block.chain == 0
247 247 else
248   - @notifiers[:tx].push([obj[0], obj[1]])
  248 + @notifiers[:tx].push([obj[1]])
249 249 end
250 250 end
251 251 rescue
252 252  
... ... @@ -259,9 +259,9 @@
259 259 # check for new items in the inv queue and process them,
260 260 # unless the queue is already full
261 261 def work_inv_queue
262   - @log.debug { "inv queue worker running" }
263 262 EM.defer(nil, proc { work_inv_queue }) do
264 263 sleep @config[:intervals][:inv_queue] if @inv_queue.size == 0
  264 + @log.debug { "inv queue worker running" }
265 265 next if @queue.size >= @config[:max][:queue]
266 266 while inv = @inv_queue.shift
267 267 # next if @store.send("has_#{inv[0]}", inv[1])