众所周知,币圈一天,人间一年。我们进行数字货币交易时,在交易所 APP 或者网站 盯盘并手动下单非常耗时,当币价波动非常大的时候往往会错失良机。这时我们可以创建一个简单的 telegram 交易机器人,来帮助我们进行做空和做多交易。
该机器人可以实现以下功能:
- 做空交易 - 以指定的价格卖出持有货币并在价格下跌时回购
- 做多交易 - 指定的价格购买货币并在价格上涨时卖出
- 列出交易订单
- 显示可用余额
设置 Telegram 机器人
首先需要一个 Telegram 账号,如果没有的话请自己注册一个。然后与BotFather
进行对话,通过输入/newbot
来新建一个telegram机器人,根据指示一步步创建并记住你的token
。
获取交易所的 API keys
查找你的交易所API
文档,看看如何获取对订单和账户余额的访问权限和步骤,记住你的密码和API keys
。本例中我们以bitfinex
为例,Bitmex交易所是目前市面上交易量最大的比特币期货交易所,交易量和交易深度非常大。
安装依赖包
我们这边用的是Python 3.6
版本,同时我们还需要利用CCXT框架获取Bitmex交易所数据,CCXT
是一个JavaScript / Python / PHP
开发库,用于数字货币的交易,支持众多的比特币/以太币/山寨币交易市场和交易所API
。
CCXT
库用于连接数字货币交易所并在世界范围内进行交易和支付处理。使用 ccxt
可以快速访问数字货币市场数据,可以用于存储、分析、可视化、指标开发、 量化交易、策略回溯测试、交易机器人程序以及相关的软件工程。
然后我们将使用python-telegram-bot
与Telegram
进行通讯,对聊天消息做出反应并进行交易。
只需要用下面方法安装以上两个依赖包:
pip install python-telegram-bot ccxt
我们需要交易机器人实现的基本类功能:
1、获取交易所概况,允许创建订单,列出订单详情并获取余额。这将是以 ccxt
实现的装饰器。
2、交易执行者,因为我们希望自动执行做空和做多交易。
3、即时响应的telegram
机器人。
编写机器人
项目结构如下:
main.py
\config
\core
\model
\util
我们将从一个简单的模型开始。因为多空交易两者有很多共同点,可以在\ model
中创建一个基类TradeDetails
:
import abc
class TradeDetails(metaclass=abc.ABCMeta):
def \_\_init\_\_(self, start\_price: float, symbol: str, amount: float, currency: str = "USD"):
self.start_price = start_price
self.symbol = symbol.upper()
self.amount = amount
self.currency = currency
@property
def exchange\_symbol(self):
return f"{self.symbol.upper()}/{self.currency}"
@property
@abc.abstractmethod
def exit\_price(self):
pass
def \_\_str\_\_(self) -> str:
return f"order for {self.amount} {self.exchange\_symbol} with enter price: {self.start\_price:.5}, " \
f"exit\_price: {self.exit\_price:.5}"
具体的为:
LongTrade
from fasttrade.model.trade import TradeDetails
class LongTrade(TradeDetails):
def \_\_init\_\_(self, start\_price: float, symbol: str, amount: float, percent\_change: float = 0.5,
currency: str = "USD") -> None:
super().__init__(start_price, symbol, amount, currency)
self.end_price = start_price * (1 + percent_change / 100)
@property
def exit\_price(self):
return self.end_price
def \_\_str\_\_(self) -> str:
return "Long " + super().__str__()
ShortTrade
from fasttrade.model.trade import TradeDetails
class ShortTrade(TradeDetails):
def \_\_init\_\_(self, start\_price: float, symbol: str, amount: float, percent\_change: float = 0.5,
currency: str = "USD") -> None:
super().__init__(start_price, symbol, amount, currency)
self.end_price = start_price * (1 - percent_change / 100)
@property
def exit\_price(self):
return self.end_price
def \_\_str\_\_(self) -> str:
return "Short " + super().__str__()
接下来是获取交易所数据:
from ccxt import Exchange, OrderNotFound
class CryptoExchange:
def \_\_init\_\_(self, exchange: Exchange):
self.exchange = exchange
self.exchange.load_markets()
@property
def free\_balance(self):
balance = self.exchange.fetch_free_balance()
# surprisingly there are balances with 0, so we need to filter these out
return {k: v for k, v in balance.items() if v > 0}
def fetch\_open\_orders(self, symbol: str = None):
return self.exchange.fetch_open_orders(symbol=symbol)
def fetch\_order(self, order\_id: int):
return self.exchange.fetch_order(order_id)
def cancel\_order(self, order\_id: int):
try:
self.exchange.cancel_order(order_id)
except OrderNotFound:
# treat as success
pass
def create\_sell\_order(self, symbol: str, amount: float, price: float):
return self.exchange.create_order(symbol=symbol, type="limit", side="sell", amount=amount, price=price)
def create\_buy\_order(self, symbol: str, amount: float, price: float):
return self.exchange.create_order(symbol=symbol, type="limit", side="buy", amount=amount, price=price)
然后,我们将执行交易程序。程序将接受交易所数据和超时情况以检查订单是否完成。当做空时,我们以设定的价格卖出,当价格下降到一定水平时回购。我们使用asyncio
协程进行编码,以使等待不会阻塞:
import asyncio
import logging
from ccxt import ExchangeError
from model.longtrade import LongTrade
from model.shorttrade import ShortTrade
class TradeExecutor:
def \_\_init\_\_(self, exchange, check\_timeout: int = 15):
self.check_timeout = check_timeout
self.exchange = exchange
async def execute\_trade(self, trade):
if isinstance(trade, ShortTrade):
await self.execute_short_trade(trade)
elif isinstance(trade, LongTrade):
await self.execute_long_trade(trade)
async def execute\_short\_trade(self, trade: ShortTrade):
sell_price = trade.start_price
buy_price = trade.exit_price
symbol = trade.exchange_symbol
amount = trade.amount
order = self.exchange.create_sell_order(symbol, amount, sell_price)
logging.info(f'Opened sell order: {amount} of {symbol}. Target sell {sell\_price}, buy price {buy\_price}')
await self._wait_order_complete(order['id'])
# post buy order
order = self.exchange.create_buy_order(symbol, amount, buy_price)
await self._wait_order_complete(order['id'])
logging.info(f'Completed short trade: {amount} of {symbol}. Sold at {sell\_price} and bought at {buy\_price}')
async def execute\_long\_trade(self, trade: LongTrade):
buy_price = trade.start_price
sell_price = trade.exit_price
symbol = trade.exchange_symbol
amount = trade.amount
order = self.exchange.create_buy_order(symbol, amount, buy_price)
logging.info(f'Opened long trade: {amount} of {symbol}. Target buy {buy\_price}, sell price {sell\_price}')
await self._wait_order_complete(order.id)
# post sell order
order = self.exchange.create_sell_order(symbol, amount, sell_price)
await self._wait_order_complete(order.id)
logging.info(f'Completed long trade: {amount} of {symbol}. Bought at {buy\_price} and sold at {sell\_price}')
async def \_wait\_order\_complete(self, order\_id):
status = 'open'
while status is 'open':
await asyncio.sleep(self.check_timeout)
order = self.exchange.fetch_order(order_id)
status = order['status']
logging.info(f'Finished order {order\_id} with {status} status')
# do not proceed further if we canceled order
if status == 'canceled':
raise ExchangeError('Trade has been canceled')
ccxt
使用REST API
进行数据传输。它不如某些交易所支持的WebSockets
快,但是对于这个简单的机器人来说,速度或许差别。
async def \_wait\_order\_complete(self, order\_id):
status = 'open'
order = None
while status is 'open':
await asyncio.sleep(self.check_timeout)
order = self.exchange.fetch_order(order_id)
status = order['status']
logging.info(f'Finished order {order\_id} with {status} status')
# do not proceed further if we canceled order
if status == 'canceled':
raise ExchangeError('Trade has been canceled')
return order
接下来将创建Telegram
机器人,这是最有难度的部分,我们将使其拥有以下指令:
1、列出/取消有效订单
2、显示可用余额
3、建立做多或做空交易
我们还需要对机器人做一些安全限制,使其仅对你的消息做出响应,而其他人则无法使用你的帐户进行交易。
主要是进行做多和做空交易的部分:
1、选择做空或者做多
2、输入数字货币品种
3、输入交易数量
4、所占百分比
5、每个价格
6、显示确认信息
7、显示最终交易信息
我们来创建telegrambot.py
并添加以下常量:
SELECTION = "selection"
SHORT_TRADE = "short\_trade"
LONG_TRADE = "long\_trade"
OPEN_ORDERS = "open\_orders"
FREE_BALANCE = "free\_balance"
CANCEL_ORD = "cancel\_order"
PROCESS_ORD_CANCEL = "process\_ord\_cancel"
COIN_NAME = "coin\_select"
PERCENT_CHANGE = "percent\_select"
AMOUNT = "amount"
PRICE = "price"
PROCESS_TRADE = "process\_trade"
CONFIRM = "confirm"
CANCEL = "cancel"
END_CONVERSATION = ConversationHandler.END
我们可以通过扩展BaseFilter
来实现对user_id
的限制。这样机器人必须接受被允许用户的token
、id
才能执行操作。
class TelegramBot:
class PrivateUserFiler(BaseFilter):
def \_\_init\_\_(self, user\_id):
self.user_id = int(user_id)
def filter(self, message):
return message.from_user.id == self.user_id
def \_\_init\_\_(self, token: str, allowed\_user\_id, trade\_executor: TradeExecutor):
self.updater = Updater(token=token)
self.dispatcher = self.updater.dispatcher
self.trade_executor = trade_executor
self.exchange = self.trade_executor.exchange
self.private_filter = self.PrivateUserFiler(allowed_user_id)
self._prepare()
在_prepare()
函数中,我们将创建所有处理函数并将其附加到调度程序。我们开始与机器人聊天时希望显示的基本选项:
def \_prepare(self):
# Create our handlers
def show\_help(bot, update):
update.effective_message.reply_text('Type /trade to show options ')
def show\_options(bot, update):
button_list = [
[InlineKeyboardButton("Short trade", callback_data=SHORT_TRADE),
InlineKeyboardButton("Long trade", callback_data=LONG_TRADE), ],
[InlineKeyboardButton("Open orders", callback_data=OPEN_ORDERS),
InlineKeyboardButton("Available balance", callback_data=FREE_BALANCE)],
]
update.message.reply_text("Trade options:", reply_markup=InlineKeyboardMarkup(button_list))
return TRADE_SELECT
InlineKeyboardButton
允许我们将文本选项显示为键盘。这比键入所有命令更为直观。callback_data
允许在按下按钮时传递其他数据。show_options
返回下一个继续进行对话的处理函数的名称。其他处理函数将使用类似的方法。然后我们执行用户选择的处理程序。在这里,我们主要从一个问题转到另一个问题:
def process\_trade\_selection(bot, update, user\_data):
query = update.callback_query
selection = query.data
if selection == OPEN_ORDERS:
orders = self.exchange.fetch_open_orders()
if len(orders) == 0:
bot.edit_message_text(text="You don't have open orders",
chat_id=query.message.chat_id,
message_id=query.message.message_id)
return END_CONVERSATION
# show the option to cancel active orders
keyboard = [
[InlineKeyboardButton("Ok", callback_data=CONFIRM),
InlineKeyboardButton("Cancel order", callback_data=CANCEL)]
]
bot.edit_message_text(text=formatter.format_open_orders(orders),
chat_id=query.message.chat_id,
message_id=query.message.message_id,
reply_markup=InlineKeyboardMarkup(keyboard))
# attach opened orders, so that we can cancel by index
user_data[OPEN_ORDERS] = orders
return CANCEL_ORD
elif selection == FREE_BALANCE:
balance = self.exchange.free_balance
msg = "You don't have any available balance" if len(balance) == 0 \
else f"Your available balance:\n{formatter.format\_balance(balance)}"
bot.edit_message_text(text=msg,
chat_id=query.message.chat_id,
message_id=query.message.message_id)
return END_CONVERSATION
user_data[TRADE_SELECT] = selection
bot.edit_message_text(text=f'Enter coin name for {selection}',
chat_id=query.message.chat_id,
message_id=query.message.message_id)
return COIN_NAME
def cancel\_order(bot, update):
query = update.callback_query
if query.data == CANCEL:
query.message.reply_text('Enter order index to cancel: ')
return PROCESS_ORD_CANCEL
show_help(bot, update)
return END_CONVERSATION
def process\_order\_cancel(bot, update, user\_data):
idx = int(update.message.text)
order = user_data[OPEN_ORDERS][idx]
self.exchange.cancel_order(order['id'])
update.message.reply_text(f'Canceled order: {formatter.format\_order(order)}')
return END_CONVERSATION
def process\_coin\_name(bot, update, user\_data):
user_data[COIN_NAME] = update.message.text.upper()
update.message.reply_text(f'What amount of {user\_data[COIN\_NAME]}')
return AMOUNT
def process\_amount(bot, update, user\_data):
user_data[AMOUNT] = float(update.message.text)
update.message.reply_text(f'What % change for {user\_data[AMOUNT]} {user\_data[COIN\_NAME]}')
return PERCENT_CHANGE
def process\_percent(bot, update, user\_data):
user_data[PERCENT_CHANGE] = float(update.message.text)
update.message.reply_text(f'What price for 1 unit of {user\_data[COIN\_NAME]}')
return PRICE
def process\_price(bot, update, user\_data):
user_data[PRICE] = float(update.message.text)
keyboard = [
[InlineKeyboardButton("Confirm", callback_data=CONFIRM),
InlineKeyboardButton("Cancel", callback_data=CANCEL)]
]
update.message.reply_text(f"Confirm the trade: '{TelegramBot.build\_trade(user\_data)}'",
reply_markup=InlineKeyboardMarkup(keyboard))
return PROCESS_TRADE
最后,我们构建会话处理程序,设置错误处理程序,并将所有处理程序添加到调度程序中。
def process\_trade(bot, update, user\_data):
query = update.callback_query
if query.data == CONFIRM:
trade = TelegramBot.build_trade(user_data)
self._execute_trade(trade)
update.callback_query.message.reply_text(f'Scheduled: {trade}')
else:
show_help(bot, update)
return END_CONVERSATION
def handle\_error(bot, update, error):
logging.warning('Update "%s" caused error "%s"', update, error)
update.message.reply_text(f'Unexpected error:\n{error}')
# configure our handlers
def build\_conversation\_handler():
entry_handler = CommandHandler('trade', filters=self.private_filter, callback=show_options)
conversation_handler = ConversationHandler(
entry_points=[entry_handler],
fallbacks=[entry_handler],
states={
TRADE_SELECT: [CallbackQueryHandler(process_trade_selection, pass_user_data=True)],
CANCEL_ORD: [CallbackQueryHandler(cancel_order)],
PROCESS_ORD_CANCEL: [MessageHandler(filters=Filters.text, callback=process_order_cancel, pass_user_data=True)],
COIN_NAME: [MessageHandler(filters=Filters.text, callback=process_coin_name, pass_user_data=True)],
AMOUNT: [MessageHandler(Filters.text, callback=process_amount, pass_user_data=True)],
PERCENT_CHANGE: [MessageHandler(Filters.text, callback=process_percent, pass_user_data=True)],
PRICE: [MessageHandler(Filters.text, callback=process_price, pass_user_data=True)],
PROCESS_TRADE: [CallbackQueryHandler(process_trade, pass_user_data=True)],
},
)
return conversation_handler
self.dispatcher.add_handler(CommandHandler('start', filters=self.private_filter, callback=show_help))
self.dispatcher.add_handler(build_conversation_handler())
self.dispatcher.add_error_handler(handle_error)
传递用户数据时允许我们向处理程序提供其他user_data
参数。这样可以确保机器人从一个处理程序传递到另一个处理程序时,保持所有答复的对话状态。我们需要run_async
装饰器在后台执行交易,而又不会阻止机器人对新消息进行响应:
def start\_bot(self):
self.updater.start_polling()
@run\_async
def \_execute\_trade(self, trade):
loop = asyncio.new_event_loop()
task = loop.create_task(self.trade_executor.execute_trade(trade))
loop.run_until_complete(task)
@staticmethod
def build\_trade(user\_data):
current_trade = user_data[TRADE_SELECT]
price = user_data[PRICE]
coin_name = user_data[COIN_NAME]
amount = user_data[AMOUNT]
percent_change = user_data[PERCENT_CHANGE]
if current_trade == LONG_TRADE:
return LongTrade(price, coin_name, amount, percent_change)
elif current_trade == SHORT_TRADE:
return ShortTrade(price, coin_name, amount, percent_change)
else:
raise NotImplementedError
这是用于订单和余额显示的格式化程序:
TITLES = ['idx', 'type', 'remaining', 'symbol', 'price']
SPACING = [4, 6, 8, 10, 8]
def format\_open\_orders(orders) -> str:
def join\_line(ln):
return ' | '.join(str(item).center(SPACING[i]) for i, item in enumerate(ln))
title_line = join_line(TITLES)
lines = [title_line]
for idx, order in enumerate(orders):
line = [idx, order['side'], order['remaining'], order['symbol'], order['price']]
lines.append(join_line(line))
separator_line = '-' * len(title_line)
return f"\n{separator\_line}\n".join(lines)
def format\_order(order):
return f"{order['amount']} {order['symbol']} priced at {order['price']}"
def format\_balance(balance) -> str:
coin_balance_as_list = list(f"{coin}: {val}" for coin, val in balance.items())
return "\n".join(coin_balance_as_list)
最后,我们创建main.py
并将所有内容归结在一起:
import logging
import os
import ccxt
from core.exchange import CryptoExchange
from core.telegrambot import TelegramBot
from core.tradeexcutor import TradeExecutor
if __name__ == '\_\_main\_\_':
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)
c_dir = os.path.dirname(__file__)
with open(os.path.join(c_dir, "config/secrets.txt")) as key_file:
api_key, secret, telegram_tkn, user_id = key_file.read().splitlines()
ccxt_ex = ccxt.bitfinex()
ccxt_ex.apiKey = api_key
ccxt_ex.secret = secret
exchange = CryptoExchange(ccxt_ex)
trade_executor = TradeExecutor(exchange)
telegram_bot = TelegramBot(telegram_tkn, user_id, trade_executor)
telegram_bot.start_bot()
我们从secrets.txt
文件中获取交易所密钥,telegram
的token
和用户ID
,构造核心类并启动机器人。使用以下内容在config
文件夹中创建secrets.txt
:
# YOUR\_API\_KEY
# YOUR\_SECRET
# YOUR\_TELEGRAM\_TOKEN
# YOUR\_TELEGRAM\_USER\_ID
总结
对于想要简化交易并拥有更好使用体验的人来说,该机器人更像是一个辅助工具。它不是最先进的算法交易机器人。后面可以进行以下改进:
- 当进行做空交易时获取可用余额并显示用户可以根据余额做空的最大值
- 要求在交易所执行之前验证创建的订单
- 添加TA指标、信号以通知最佳交易时间
- 止盈/止损操作和其他统计数据
- 有策略地根据超时等原因取消订单
Author:MaksimAvdyushkin
本文 源码 获取方式:
公众号底部 回复“ telegram ”