API使って為替レートを取得して5ティック毎にランダムな注文をしてくれるPythonコードを書いてみた
こんにちは、アルゴンです。今日はOANDAのAPIを使って、為替レートを取得して5ティック毎に買い・売りの注文をランダムに選んで注文するPythonのコードを書いてみました。ってことで、自分用メモのためにも記事を書きます。
【人気記事】
当然ですが、これでお金儲けができるわけではなく、機械学習とは程遠い頭の悪いトレードシステムですが、まずはAPIに触ってみるのが目的です。
概要と実行環境について
簡単に今回書いたコードの概要と実行環境についてです。
概要
- OANDAのAPIを使ってみる練習のコード
- デモ口座でやるので脇が甘くても良い
- 今後拡張しやすいような記述を心がける
- API経由で価格のストリーミングを取得
- 5回ティックを取得したら買いor売りをランダムで決定
- コードが選んだ注文をAPIに流す
- それをひたすら繰り返す
実行環境
- Python 3.6
- Mac Book Air
- 外部ライブラリなど使っていません
ファイル構成
これから、色々と改良を加えたいので、それぞれの機能に応じてファイルが分かれています。全部で6つのpythonファイルを作成。全部、同じ階層のディレクトリに入れて実行しました。
- event.py – イベントをまとめる
- execution.py – オーダーの実行
- settings.py – API接続で必要な設定関連
- strategy.py – トレードルール
- streaming.py – 価格ストリーミング
- trading.py – 全部のクラスをまとめて実行する
コード書く前の下準備
OANDAのデモ口座アカウントが必要です。(なぜOANDAのAPIにしたのかはこちらの記事参照)デモ口座解説申請をすると、すぐにログインが可能になります。
デモ口座の管理画面へログインをすると、トップページにてAPIアクセスの管理のメニューがあります。こちらで、APIトークンの発行が必要です。
このトークンを使ってAPIに接続するので、無くしたら困ります。加えて、ホームページ上で注釈も書いてありますが、
personal access tokenはパスワードと同様大切な機密情報です。第三者に譲渡、貸与、売買したり、使用させたりしないよう厳重に管理をお願い致します。このトークンはOANDAアカウント内で一意であり、OANDA内で厳重に管理しております。
ってことなので、気をつけて管理しましょう。
APIとの接続の設定
下準備も終わったので、実際にコードを書いていきましょう。上の実行環境でも書きましたが、Python3を使っています。もし記事を見て、コードを動かしてみようと考えている方は、Pythonのバージョンだけ注意ください。Python2だと動きません。(エラーが出るはずです)
まずは今回の要とも言える、APIの接続の設定をPythonで書いてあげましょう。APIの仕様書を見て見たら、どうやら接続先はoanda.comのようでした。日本OANDAはドメインがoanda.jpなので、これで本番APIも大丈夫なのかしらというちょっとした疑問。(とりあえず今回はデモなのでスルーしますが)
APIの接続先として、 real は本番用、 practice はデモ用、 sandbox はサンドボックスのAPIです。今後の拡張のために、3つの異なるAPI接続先を事前に入れておく。
settings.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
ENVIRONMENTS = { "streaming": { "real": "stream-fxtrade.oanda.com", "practice": "stream-fxpractice.oanda.com", "sandbox": "stream-sandbox.oanda.com" }, "api": { "real": "api-fxtrade.oanda.com", "practice": "api-fxpractice.oanda.com", "sandbox": "api-sandbox.oanda.com" } } DOMAIN = "practice" STREAM_DOMAIN = ENVIRONMENTS["streaming"][DOMAIN] API_DOMAIN = ENVIRONMENTS["api"][DOMAIN] ACCESS_TOKEN = '' #APIのアクセストークンを入れます ACCOUNT_ID = '' #デモ口座アカウントIDを入れます |
コメントも入れていますが、 ACCESS_TOKEN (APIトークン)と ACCOUNT_ID (口座のID)はご自身のものを入力する必要があります。
APIトークンは前述した通り、OANDAの口座管理画面の「APIアクセスの管理」のリンクから取得が可能です。下のはダミーですが、こんな感じの文字列です。
123456789abcderg123456789abcdefg-123456789abcderg123456789abcdefg
同様に口座のIDも管理画面から取得が可能です。トップページの「口座情報」の欄に名前とアカウントのIDの記載があるので、そちらをコピペして上記のコードに入力してあげましょう。(アカウントIDは私のは7桁の数字でした)
イベントをまとめるクラス
次はイベントをまとめるコードを書いてあげましょう。とりあえず最低限必要なティックのイベントとオーダー(注文)のイベントの2つのみ。
event.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
class Event(object): pass class TickEvent(Event): def __init__(self, instrument, time, bid, ask): self.type = 'TICK' self.instrument = instrument self.time = time self.bid = bid self.ask = ask class OrderEvent(Event): def __init__(self, instrument, units, order_type, side): self.type = 'ORDER' self.instrument = instrument self.units = units self.order_type = order_type self.side = side |
トレードルールのクラス
次はトレードのルールを書きましょう。本来であれば、ここが一番重要なコードですが…今回はひとまずランダムに注文を流すルールを書いてあげます。
Pythonの random をインポートして、ティックが5回流れてきたら「買い」か「売り」をランダムに決めて、注文処理を流します。
strategy.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
import random from event import OrderEvent class TestRandomStrategy(object): def __init__(self, instrument, units, events): self.instrument = instrument self.units = units self.events = events self.ticks = 0 def calculate_signals(self, event): if event.type == 'TICK': self.ticks += 1 if self.ticks % 5 == 0: side = random.choice(["buy", "sell"]) order = OrderEvent( self.instrument, self.units, "market", side ) self.events.put(order) |
注文を実行するクラス
トレードのルールが出来たので、次は実際にAPIに接続をして注文を流す処理です。setting.pyで設定したAPIトークンや口座IDを引っ張ってきて、APIへ接続して処理を投げてあげます。
execution.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
import http.client import urllib.request, urllib.parse, urllib.error class Execution(object): def __init__(self, domain, access_token, account_id): self.domain = domain self.access_token = access_token self.account_id = account_id self.conn = self.obtain_connection() def obtain_connection(self): return http.client.HTTPSConnection(self.domain) def execute_order(self, event): headers = { "Content-Type": "application/x-www-form-urlencoded", "Authorization": "Bearer " + self.access_token } params = urllib.parse.urlencode({ "instrument" : event.instrument, "units" : event.units, "type" : event.order_type, "side" : event.side }) self.conn.request( "POST", "/v1/accounts/%s/orders" % str(self.account_id), params, headers ) response = self.conn.getresponse().read() print(response) |
FXの価格をストリーミングするクラス
さて、次は為替の価格をAPIから引っ張ってきてストリーミングする処理を書きましょう。2つのメソッドに分けて、APIから価格を引っ張って処理します。s
connect_to_stream で、Pythonの requests のライブラリを使って、APIのストリーミングソケットにパラメーターとヘッダーでアクセスします。API側のURLを叩いてレスポンスを受け取ります。次に stream_to_queue でAPIから戻ってきたレスポンスを読み込んで、内容を確認した上で、プリントしてあげます。
streaming.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
import requests import json from event import TickEvent class StreamingForexPrices(object): def __init__( self, domain, access_token, account_id, instruments, events_queue ): self.domain = domain self.access_token = access_token self.account_id = account_id self.instruments = instruments self.events_queue = events_queue def connect_to_stream(self): try: s = requests.Session() url = "https://" + self.domain + "/v1/prices" headers = {'Authorization' : 'Bearer ' + self.access_token} params = {'instruments' : self.instruments, 'accountId' : self.account_id} req = requests.Request('GET', url, headers=headers, params=params) pre = req.prepare() resp = s.send(pre, stream=True, verify=False) return resp except Exception as e: s.close() print(("Caught exception when connecting to stream\n") + str(e)) def stream_to_queue(self): response = self.connect_to_stream() if response.status_code != 200: return for line in response.iter_lines(1): if line: try: msg = json.loads(line) except Exception as e: print(("Caught exception when converting message into json\n") + str(e)) return if "instrument" in msg or "tick" in msg: print(msg) instrument = msg["tick"]["instrument"] time = msg["tick"]["time"] bid = msg["tick"]["bid"] ask = msg["tick"]["ask"] tev = TickEvent(instrument, time, bid, ask) self.events_queue.put(tev) |
並行処理で価格取得と注文をする
今回の超絶シンプルなトレードシステムの最低限の要素が整ったので、最後に trading.py として、今までの処理をまとめて実行するコードを書きます。
こちらのtrading.pyですが、Pythonの threading を使って、プライスストリーミングと注文の2つの処理を並列処理してあげます。プライスの取得/ストリーミングと注文の二つの処理を同時に走らせる必要があるので、 threading は重要です。
コード内に詳細のコメントを記載しましたので、詳細はコードを見た方が早いかと思います。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
import queue import threading import time from execution import Execution from settings import STREAM_DOMAIN, API_DOMAIN, ACCESS_TOKEN, ACCOUNT_ID from strategy import TestRandomStrategy from streaming import StreamingForexPrices def trade(events, strategy, execution): while True: try: event = events.get(False) except queue.Empty: pass else: if event is not None: if event.type == 'TICK': strategy.calculate_signals(event) elif event.type == 'ORDER': print("Executing order!") execution.execute_order(event) time.sleep(heartbeat) if __name__ == "__main__": heartbeat = 0.5 # 0.5秒間隔 events = queue.Queue() # ドル円の1万通貨を取引 instrument = "USD_JPY" units = 10000 # OANDAの為替価格ストリーミングを取得 # アカウント情報などをsetting.pyからインポートしてます prices = StreamingForexPrices( STREAM_DOMAIN, ACCESS_TOKEN, ACCOUNT_ID, instrument, events ) # 価格ストリーミングの実行。execution.pyから読み込み execution = Execution(API_DOMAIN, ACCESS_TOKEN, ACCOUNT_ID) # strategy.pyから今回の取引ルールの読み込み # 5ティックごとにランダムで決めてます strategy = TestRandomStrategy(instrument, units, events) # スレッドベースの並列処理で取引処理と価格ストリーミング処理を作成 trade_thread = threading.Thread(target=trade, args=(events, strategy, execution)) price_thread = threading.Thread(target=prices.stream_to_queue, args=[]) # 両方のスレッドを開始! trade_thread.start() price_thread.start() |
まとめと次のステップ
気づいたら、もうジムに行かなくてはいけない時間なので、今日はここまでです(笑)。ひとまず今日の記事では、APIに接続設定から始まり、為替価格のストリーミングの取得、さらにオーダー注文のルールを決めて処理を行うコードを書きました。
本記事の続編として、明日に実行編と今後の改良についてまとめたいと思います。
ブログを読んでいただいてありがとうございます!毎度ですが、Twitterでも絶賛配信しているので、フォローをお願いします!
本記事の続編となる「【実行編】為替レートをAPIで取得して売り買い注文をだすPythonコード」を公開しました。
ディスカッション
コメント一覧
わかりやすい記事をありがとうございます。
oanda v20のデモ口座でこのトレードシステムを動かしてみたのですが、queue.Emptyの例外処理になってしまいます、どうしてでしょうか…
コメントありがとうございます!私の環境では動作しているのですが、、ライブラリなどのバージョン違いなどで動作しないのかもしれないです・・時間がある時に一度検証してみます!
6:from settings import STREAM_DOMAIN, API_DOMAIN, ACCESS_TOKEN, ACCOUNT_ID
settings モジュールの タイトル表記が setting.py になっています
こちらご指摘ありがとうございます!おっしゃる通りモジュール名が間違っていますね。。修正しました!