h_nosonの日記

競プロ、CTFなど

Amazon Dash ButtonでAC数をカウントする

巷で流行りのAmazon Dash Buttonで何かしようということで,競プロのAC数を数えてみたいと思います.

環境

ボタンが押されたことを検知

ボタンが押されたことを検知できないと何も始まらないのでまず検知を行う.
参考:Amazon Dash ButtonをただのIoTボタンとして使う - Qiita

  • Amazon Dash Buttonはボタンが押される度に,IPアドレスを取得するためDHCPサーバにリクエストを行っている
  • DHCPリクエストはブロードキャストで行われる
  • ほとんどのデバイスは,IPアドレスを受け取った後IPアドレスが重複していないか確認するため,ARPプローブを行う(これもブロードキャスト)

DHCPリクエストかARPプローブを監視すればボタンが押されたことを検知できる.
ほとんどの方がARPプローブの監視をしていたけど,自分の場合はARPプローブが行われずARPリクエストが2回行われていたため2回検知することを避けてDHCPリクエストを監視することにした.
scapyに関しては
Scapyで作る・解析するパケットUsage — Scapy v2.1.1-dev documentation
を参考にした.

DHCPリクエストはUDP port 67なので

sniff(filter="udp port 67", prn=lambda x: x.show())

を実行した状態でボタンを押すと以下のようにMACアドレスがわかる.

###[ Ethernet ]###
  dst= ff:ff:ff:ff:ff:ff
  src= xx:xx:xx:xx:xx:xx
  type= 0x800
###[ IP ]###
....

これでMACアドレスによるfilterも行える.

sniff(filter="ether src xx:xx:xx:xx:xx:xx and udp port 67", prn=count)

パケットを受け取った際にprnに設定した関数が呼ばれるので,そこでカウントが行えるようにした.このプログラムではjsonを読み込んでその日のAC数をインクリメントしている.

from scapy.all import sniff
import json
import time
from datetime import date,timedelta,datetime

def count(_):
    try:
        with open("ACcount.json", "r") as fp:
            data = json.load(fp)
    except IOError:
        data = json.loads('{}');

    d = date.today()
    year = str(d.year)
    month = str(d.month)
    day = str(d.day)
    if not year in data:
        data.update({year:{month:{day:0}}})
    elif not month in data[year]:
        data[year].update({month:{day:0}})
    elif not day in data[year][month]:
        data[year][month].update({day:0})
    data[year][month][day] += 1

    print "AC!" + " (" + str(time.ctime()) + ")"
    print "today's AC: " + str(data[year][month][day])
    with open("ACcount.json", "w") as fp:
        json.dump(data, fp)

sniff(filter="ether src xx:xx:xx:xx:xx:xx and udp port 67", prn=count)

これでAC数を数えることができるようになった.

数えただけだとつまらないので一日のAC数をtweetさせてみようと思う.先ほど作ったjsonファイルを読み込んでその内容を23:59にtweetするように調整した.

from requests_oauthlib import OAuth1Session
from datetime import date,timedelta,datetime
import json
import time
import sched

CK = 'XXXXXXXXXXXXXXXXXXXXXXXXX'                          # Consumer Key
CS = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX' # Consumer Secret
AT = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX' # Access Token
AS = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'      # Access Token Secret
twitter = OAuth1Session(CK,CS,AT,AS)

def get_msg_for_day(data):
    d = date.today()
    year = str(d.year)
    month = str(d.month)
    day = str(d.day)
    if year in data and month in data[year] and day in data[year][month]:
        return u"今日のAC数: " + str(data[year][month][day])
    else:
        return ""

def get_msg_for_month(data):
    d = date.today()
    nd = date.today() + timedelta(days=1)
    if d.month == nd.month: return ""
    year = str(d.year)
    month = str(d.month)
    if year in data and month in data[year]:
        sm = 0
        for day in data[year][month]:
            sm += data[year][month][day]
        return u"今月のAC数: " + str(sm)
    else:
        return ""

def get_msg_for_year(data):
    d = date.today()
    nd = date.today() + timedelta(days=1)
    if d.year == nd.year: return ""
    year = str(d.year)
    if year in data:
        sm = 0
        for month in data[year]:
            for day in data[year][month]:
                sm += data[year][month][day]
        return u"今年のAC数: " + str(sm)
    else:
        return ""

def get_msg():
    try:
        with open("ACcount.json","r") as fp:
            data = json.load(fp)
    except IOError:
        data = json.loads("{}")
    msgs = []
    msgs.append(get_msg_for_day(data))
    msgs.append(get_msg_for_month(data))
    msgs.append(get_msg_for_year(data))
    return '\n'.join(filter(lambda s: s != "",msgs))

def tweet():
    message = get_msg()
    if message == "":
        print("no AC")
    else:
        print("Sending message...")
        print(message)
        url = "https://api.twitter.com/1.1/statuses/update.json"
        params = {'status': message}
        res = twitter.post(url, params = params)
        if res.status_code == 200:
            print("OK")
        else:
            print("Error: %d" % res.status_code)
    add_event()

def add_event():
    d = datetime.today()
    nextd = datetime(d.year,d.month,d.day) + timedelta(days=1) - timedelta(seconds=30)
    print("supposed to tweet at " + str(nextd))
    s.enter((nextd-d).total_seconds(),1,tweet,())

print("Twitter client is running")
s = sched.scheduler(time.time,time.sleep)
add_event()
s.run()

面倒な点

  • ACする度にボタンを押さないといけない
  • tweetするときにノートPCを開いておく必要がある(サーバがほしい)

AC数をカウントすることで今後のモチベーションにつながることを期待してる