# Source code for bitfinex.websockets.client

# coding=utf-8
import threading
import json
import hmac
import hashlib
from autobahn.twisted.websocket import WebSocketClientFactory, \
    WebSocketClientProtocol, \
    connectWS
from twisted.internet import reactor, ssl
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.internet.error import ReactorAlreadyRunning
from bitfinex import utils
from . import abbreviations

# Example used to make send logic
# https://stackoverflow.com/questions/18899515/writing-an-interactive-client-with-twisted-autobahn-websockets


class BitfinexClientProtocol(WebSocketClientProtocol):
    """Websocket protocol that forwards decoded JSON messages to its factory.

    Parameters
    ----------
    factory : object
        The factory that built this protocol. Its ``callback`` receives
        every decoded message, its ``resetDelay`` is called on connect and
        its ``protocol_instance`` attribute is pointed at this protocol
        once the connection is open.
    payload : bytes, optional
        Message sent as a text frame from ``onConnect``. Nothing is sent
        when it is falsy.
    """

    def __init__(self, factory, payload=None):
        super().__init__()
        self.payload = payload
        self.factory = factory

    def onOpen(self):
        """Publish this protocol on the factory so others can send with it."""
        self.factory.protocol_instance = self

    def onConnect(self, response):
        """Send the stored payload, if any, and reset the reconnect delay."""
        initial_message = self.payload
        if initial_message:
            self.sendMessage(initial_message, isBinary=False)
        # a successful (re)connect starts the back-off over again
        self.factory.resetDelay()

    def onMessage(self, payload, isBinary):
        """Decode a text frame as JSON and hand it to the factory callback.

        Binary frames and frames that fail to decode are dropped silently.
        """
        if isBinary:
            return
        try:
            message = json.loads(payload.decode('utf8'))
        except ValueError:
            # not valid UTF-8 / JSON: ignore the frame
            return
        # Called outside the try block so errors raised by the callback
        # itself are never mistaken for a decoding failure.
        self.factory.callback(message)


class BitfinexReconnectingClientFactory(ReconnectingClientFactory):
    """Reconnect settings used by the Bitfinex websocket factories.

    Only overrides class attributes that ``ReconnectingClientFactory``
    reads; no behaviour is added here.
    """

    # upper bound on the number of reconnect attempts
    maxRetries = 30

    # upper bound for the delay between attempts
    maxDelay = 20

    # keep the first delay short
    initialDelay = 0.1


class BitfinexClientFactory(WebSocketClientFactory, BitfinexReconnectingClientFactory):
    """Websocket client factory that reconnects and reports exhausted retries.

    Parameters
    ----------
    *args, **kwargs
        Forwarded unchanged to ``WebSocketClientFactory.__init__``.
    payload : bytes, optional
        Message every protocol built by this factory sends on connect.

    Attributes
    ----------
    protocol_instance
        Set by the protocol in its ``onOpen``; ``None`` until then.
    base_client
        Free slot for the owner of the factory; ``None`` by default.
    callback
        Callable receiving decoded messages and the reconnect error
        payload. ``None`` until the owner assigns one.
    """

    protocol = BitfinexClientProtocol
    _reconnect_error_payload = {
        'e': 'error',
        'm': 'Max reconnect retries reached'
    }

    def __init__(self, *args, payload=None, **kwargs):
        WebSocketClientFactory.__init__(self, *args, **kwargs)
        self.protocol_instance = None
        self.base_client = None
        self.payload = payload
        # Declared here so the attribute always exists, even when the owner
        # has not assigned a callback yet.
        self.callback = None

    def _retry_and_report(self, connector):
        """Schedule a reconnect and notify the callback once retries run out."""
        self.retry(connector)
        retries_exhausted = (
            self.maxRetries is not None and self.retries > self.maxRetries
        )
        if retries_exhausted and self.callback is not None:
            # Hand out a copy: the template dict is shared by every factory
            # and must not be mutated by a user callback.
            self.callback(dict(self._reconnect_error_payload))

    def clientConnectionFailed(self, connector, reason):
        """Retry after a failed connection attempt."""
        self._retry_and_report(connector)

    def clientConnectionLost(self, connector, reason):
        """Retry after an established connection was lost."""
        self._retry_and_report(connector)

    def buildProtocol(self, addr):
        """Return a new protocol bound to this factory and its payload."""
        return BitfinexClientProtocol(self, payload=self.payload)


class BitfinexSocketManager(threading.Thread):
    """Thread that runs the Twisted reactor and tracks websocket connections.

    ``factories`` maps a socket identifier to its client factory and
    ``_conns`` maps the same identifier to the connector returned by
    ``connectWS``.
    """

    STREAM_URL = 'wss://api.bitfinex.com/ws/2'

    def __init__(self):  # client
        """Initialise the BitfinexSocketManager"""
        threading.Thread.__init__(self)
        self.factories = {}
        self._connected_event = threading.Event()
        self._conns = {}
        self._user_timer = None
        self._user_listen_key = None
        self._user_callback = None

    def _start_socket(self, id_, payload, callback):
        """Register a factory for ``id_`` and schedule its connection.

        Parameters
        ----------
        id_ : str
            Socket identifier used as key in ``factories`` and ``_conns``.
        payload : bytes
            Message the protocol sends once connected.
        callback : func
            Function receiving every decoded message of this socket.

        Returns
        -------
        str, bool
            ``id_`` when the connection was scheduled, ``False`` when a
            connection with that identifier already exists.
        """
        if id_ in self._conns:
            return False

        factory = BitfinexClientFactory(self.STREAM_URL, payload=payload)
        factory.base_client = self
        factory.protocol = BitfinexClientProtocol
        factory.callback = callback
        factory.reconnect = True
        self.factories[id_] = factory
        # The connection itself is created on the reactor thread.
        reactor.callFromThread(self.add_connection, id_)
        return id_

    def add_connection(self, id_):
        """
        Convenience function to connect and store the resulting
        connector.
        """
        factory = self.factories[id_]
        # NOTE(review): ssl.ClientContextFactory is Twisted's legacy context
        # factory - confirm it gives the certificate verification you expect
        # (ssl.optionsForClientTLS is the documented replacement).
        context_factory = ssl.ClientContextFactory()
        self._conns[id_] = connectWS(factory, context_factory)

    def stop_socket(self, conn_key):
        """Stop a websocket given the connection key

        Parameters
        ----------
        conn_key : str
            Socket connection key

        Returns
        -------
        str, bool
            connection key string if successful, False otherwise
        """
        if conn_key not in self._conns:
            return False

        connector = self._conns[conn_key]
        # disable reconnecting if we are closing: the replacement factory
        # has no retry logic, so losing the connection ends it for good
        connector.factory = WebSocketClientFactory(self.STREAM_URL)
        connector.disconnect()
        del self._conns[conn_key]
        return conn_key

    def run(self):
        """Run the Twisted reactor in this thread."""
        try:
            reactor.run(installSignalHandlers=False)
        except ReactorAlreadyRunning:
            # Ignore error about reactor already running
            pass

    def close(self):
        """Close all connections
        """
        # iterate over a snapshot: stop_socket deletes from self._conns
        for key in list(self._conns):
            self.stop_socket(key)
        self._conns = {}


class WssClient(BitfinexSocketManager):
    """Websocket client for bitfinex.

    Parameters
    ----------
    key : str
        Your API key.
    secret : str
        Your API secret
    nonce_multiplier : float
        Multiplier passed to ``utils.get_nonce`` when the authentication
        nonce is created. Defaults to 1.0.


    .. Hint::

        Do not store your key or secret directly in the code.
        Use environment variables. and fetch them with
        ``os.environ.get("BITFINEX_KEY")``

    """

    ###########################################################################
    # Bitfinex commands
    ###########################################################################

    def __init__(self, key=None, secret=None, nonce_multiplier=1.0):  # client
        super().__init__()
        self.key = key
        self.secret = secret
        self.nonce_multiplier = nonce_multiplier

    @staticmethod
    def _dump_payload(data):
        """Return ``data`` serialised as UTF-8 encoded JSON bytes."""
        return json.dumps(data, ensure_ascii=False).encode('utf8')

    def _send_payload(self, data, channel="auth"):
        """Serialise ``data`` and send it as a text frame on ``channel``.

        ``channel`` is a key of ``self.factories``; the message goes out on
        the protocol instance stored on that factory.
        """
        payload = self._dump_payload(data)
        self.factories[channel].protocol_instance.sendMessage(payload, isBinary=False)

    def stop(self):
        """Tries to close all connections and finally stops the reactor.
        Properly stops the program."""
        try:
            self.close()
        finally:
            reactor.stop()

    def _nonce(self):
        """Returns a nonce used in authentication.
        Nonce must be an increasing number, if the API key has been used
        earlier or other frameworks that have used higher numbers you might
        need to increase the nonce_multiplier."""
        return str(utils.get_nonce(self.nonce_multiplier))

    def authenticate(self, callback, filters=None):
        """Method used to create an authenticated channel that both receives
        account specific messages and is used to send account specific
        messages. So in order to be able to use the new_order method, you
        have to create an authenticated channel before starting the
        connection.

        Parameters
        ----------
        callback : func
            A function to use to handle incoming messages. This channel
            will be handling all messages returned from operations like
            new_order or cancel_order, so make sure you handle all these
            messages.
        filters : List[str]
            A list of filter strings. See more information here:
            https://docs.bitfinex.com/v2/docs/ws-auth#section-channel-filters

        Returns
        -------
        The value returned by ``_start_socket`` for the ``"auth"`` socket
        (``False`` when that socket is already registered).

        Example
        -------
         ::

            def handle_account_messages(message):
                print(message)

            # You should only need to create and authenticate a client once.
            # Then simply reuse it later
            my_client = WssClient(key, secret)
            my_client.authenticate(
                callback=handle_account_messages
            )
            my_client.start()

        """
        nonce = self._nonce()
        auth_payload = 'AUTH{}'.format(nonce)
        # HMAC-SHA384 of the payload, keyed with the API secret
        signature = hmac.new(
            self.secret.encode(),
            msg=auth_payload.encode('utf8'),
            digestmod=hashlib.sha384
        ).hexdigest()
        data = {
            # docs: http://bit.ly/2CEx9bM
            'event': 'auth',
            'apiKey': self.key,
            'authSig': signature,
            'authPayload': auth_payload,
            'authNonce': nonce,
            'calc': 1
        }
        if filters:
            data['filter'] = filters
        return self._start_socket("auth", self._dump_payload(data), callback)

    def subscribe_to_ticker(self, symbol, callback):
        """Subscribe to the passed symbol ticks data channel.

        Parameters
        ----------
        symbol : str
            Symbol to request data for.
        callback : func
            A function to use to handle incoming messages

        Returns
        -------
        The value returned by ``_start_socket`` for this subscription
        (``False`` when the same subscription is already registered).

        Example
        -------
         ::

            def my_handler(message):
                # Here you can do stuff with the messages
                print(message)

            # You should only need to create and authenticate a client once.
            # Then simply reuse it later
            my_client = WssClient(key, secret)
            my_client.authenticate(print)
            my_client.subscribe_to_ticker(
                symbol="BTCUSD",
                callback=my_handler
            )
            my_client.start()

        """
        symbol = utils.order_symbol(symbol)
        id_ = "_".join(["ticker", symbol])
        data = {
            'event': 'subscribe',
            'channel': 'ticker',
            'symbol': symbol,
        }
        return self._start_socket(id_, self._dump_payload(data), callback)

    def subscribe_to_trades(self, symbol, callback):
        """Subscribe to the passed symbol trades data channel.

        Parameters
        ----------
        symbol : str
            Symbol to request data for.
        callback : func
            A function to use to handle incoming messages

        Returns
        -------
        The value returned by ``_start_socket`` for this subscription
        (``False`` when the same subscription is already registered).

        Example
        -------
         ::

            def my_handler(message):
                # Here you can do stuff with the messages
                print(message)

            # You should only need to create and authenticate a client once.
            # Then simply reuse it later
            my_client = WssClient(key, secret)
            my_client.authenticate(print)
            my_client.subscribe_to_trades(
                symbol="BTCUSD",
                callback=my_handler
            )
            my_client.start()

        """
        symbol = utils.order_symbol(symbol)
        id_ = "_".join(["trades", symbol])
        data = {
            'event': 'subscribe',
            'channel': 'trades',
            'symbol': symbol,
        }
        return self._start_socket(id_, self._dump_payload(data), callback)

    # Precision: R0, P0, P1, P2, P3
    def subscribe_to_orderbook(self, symbol, precision, callback):
        """Subscribe to the orderbook of a given symbol.

        Parameters
        ----------
        symbol : str
            Symbol to request data for.
        precision : str
            Accepted values as strings {R0, P0, P1, P2, P3}
        callback : func
            A function to use to handle incoming messages

        Returns
        -------
        The value returned by ``_start_socket`` for this subscription
        (``False`` when the same subscription is already registered).
        The socket identifier does not contain the precision.

        Example
        -------
         ::

            def my_handler(message):
                # Here you can do stuff with the messages
                print(message)

            # You should only need to create and authenticate a client once.
            # Then simply reuse it later
            my_client = WssClient(key, secret)

            my_client.subscribe_to_orderbook(
                symbol="BTCUSD",
                precision="P1",
                callback=my_handler
            )
            my_client.start()

        """
        symbol = utils.order_symbol(symbol)
        id_ = "_".join(["order", symbol])
        data = {
            'event': 'subscribe',
            "channel": "book",
            "prec": precision,
            'symbol': symbol,
        }
        return self._start_socket(id_, self._dump_payload(data), callback)

    def subscribe_to_candles(self, symbol, timeframe, callback):
        """Subscribe to the passed symbol's OHLC data channel.

        Parameters
        ----------
        symbol : str
            Symbol to request data for
        timeframe : str
            Accepted values as strings {1m, 5m, 15m, 30m, 1h, 3h, 6h, 12h,
            1D, 7D, 14D, 1M}. A falsy value selects ``1m``.
        callback : func
            A function to use to handle incoming messages

        Returns
        -------
        The value returned by ``_start_socket`` for this subscription
        (``False`` when the same subscription is already registered).

        Raises
        ------
        ValueError
            If ``timeframe`` is given but is not one of the accepted values.

        Example
        -------
         ::

            def my_candle_handler(message):
                # Here you can do stuff with the candle bar messages
                print(message)

            # You should only need to create and authenticate a client once.
            # Then simply reuse it later
            my_client = WssClient(key, secret)

            my_client.subscribe_to_candles(
                symbol="BTCUSD",
                timeframe="1m",
                callback=my_candle_handler
            )
            my_client.subscribe_to_candles(
                symbol="ETHUSD",
                timeframe="5m",
                callback=my_candle_handler
            )
            my_client.start()

        """
        valid_tfs = ['1m', '5m', '15m', '30m', '1h', '3h', '6h', '12h', '1D',
                     '7D', '14D', '1M']
        if timeframe:
            if timeframe not in valid_tfs:
                raise ValueError("timeframe must be any of %s" % valid_tfs)
        else:
            timeframe = '1m'
        # The socket identifier is built from the symbol exactly as passed
        # in, before it goes through utils.order_symbol.
        id_ = "_".join(('candles', symbol, timeframe))
        symbol = utils.order_symbol(symbol)
        data = {
            'event': 'subscribe',
            'channel': 'candles',
            'key': 'trade:' + timeframe + ':' + symbol,
        }
        return self._start_socket(id_, self._dump_payload(data), callback)

    def ping(self, channel="auth"):
        """Ping bitfinex.

        Parameters
        ----------
        channel : str
            What channel id that should be pinged. Default "auth".
            To get channel ids use ``client._conns.keys()``.

        Returns
        -------
        The client id (``cid``) sent with the ping, as created by
        ``utils.create_cid``.
        """
        client_cid = utils.create_cid()
        data = {
            'event': 'ping',
            'cid': client_cid
        }
        self._send_payload(data, channel=channel)
        return client_cid

    def new_order_op(self, order_type, symbol, amount, price, price_trailing=None,
                     price_aux_limit=None, price_oco_stop=None, hidden=0,
                     flags=None, tif=None, set_cid=True):
        """Create new order operation

        Parameters
        ----------
        order_type : str
            Order type. Must be one of: "LIMIT", "STOP", "MARKET",
            "TRAILING STOP", "FOK", "STOP LIMIT" or equivalent with "EXCHANGE"
            prepended to it. All orders starting with EXCHANGE are made on the
            exchange wallet. Orders without it are made on the margin wallet
            and will start or change a position.
        symbol : str
            The currency symbol to be traded. e.g. BTCUSD
        amount : decimal str
            The amount to be traded.
        price : decimal str
            The price to buy at. Will be ignored for market orders.
        price_trailing : decimal string
            The trailing price
        price_aux_limit : decimal string
            Auxiliary Limit price (for STOP LIMIT)
        price_oco_stop : decimal string
            OCO stop price
        hidden : bool
            Whether or not to use the hidden order type.
        flags : list
            A list of integers for the different flags. Will be added
            together into a unique integer.
        tif : datetime string
        set_cid : bool
            whether or not to set a cid.

        Returns
        -------
        list
            A two item list ``[notification code, order dict]``. The dict
            contains the order details. Used in new_order and for creating
            multiorders.

        Example
        -------
        Note if you only want to create a new order, use the ``new_order``
        method below. However if you want to submit multiple orders and
        cancel orders at the same time use this method to create order
        operations and send them with the ``multi_order`` method::

            # You should only need to create and authenticate a client once.
            # Then simply reuse it later
            my_client = WssClient(key, secret)
            my_client.authenticate(print)
            my_client.start()

            order_operation = my_client.new_order_op(
                order_type="LIMIT",
                symbol="BTCUSD",
                amount=0.004,
                price=1000.0
            )

            # Useful to keep track of an order by its client id, for later
            # operations (like cancel order).
            client_id = order_operation[1]["cid"]

            my_client.multi_order(
                operations=[order_operation]
            )

        """
        flags = flags or []
        order_op = {
            'type': order_type,
            'symbol': utils.order_symbol(symbol),
            'amount': amount,
            'price': price,
            'hidden': hidden,
            'flags': sum(flags),
            'meta': {
                'aff_code': 'b2UR2iQr'
            }
        }
        # optional fields are only sent when they carry a truthy value
        if price_trailing:
            order_op['price_trailing'] = price_trailing
        if price_aux_limit:
            order_op['price_aux_limit'] = price_aux_limit
        if price_oco_stop:
            order_op['price_oco_stop'] = price_oco_stop
        if tif:
            order_op['tif'] = tif
        if set_cid:
            order_op['cid'] = utils.create_cid()
        return [
            abbreviations.get_notification_code('order new'),
            order_op
        ]

    def new_order(self, order_type, symbol, amount, price, price_trailing=None,
                  price_aux_limit=None, price_oco_stop=None, hidden=0,
                  flags=None, tif=None, set_cid=True):
        """
        Create new order.

        Parameters
        ----------
        order_type : str
            Order type. Must be one of: "LIMIT", "STOP", "MARKET",
            "TRAILING STOP", "FOK", "STOP LIMIT" or equivalent with "EXCHANGE"
            prepended to it. All orders starting with EXCHANGE are made on the
            exchange wallet. Orders without it are made on the margin wallet
            and will start or change a position.
        symbol : str
            The currency symbol to be traded. e.g. BTCUSD
        amount : decimal string
            The amount to be traded.
        price : decimal string
            The price to buy at. Will be ignored for market orders.
        price_trailing : decimal string
            The trailing price
        price_aux_limit : decimal string
            Auxiliary Limit price (for STOP LIMIT)
        price_oco_stop : decimal string
            OCO stop price
        hidden : bool
            Whether or not to use the hidden order type.
        flags : list
            A list of integers for the different flags. Will be added
            together into a unique integer.
        tif : datetime string
        set_cid : bool
            whether or not to set a cid.

        Returns
        -------
        int
            Order client id (cid). The CID is also a mts date stamp of when
            the order was created. ``None`` when no cid was set.

        Example
        -------
         ::

            # You should only need to create and authenticate a client once.
            # Then simply reuse it later
            my_client = WssClient(key, secret)
            my_client.authenticate(print)
            my_client.start()

            order_client_id = my_client.new_order(
                order_type="LIMIT",
                symbol="BTCUSD",
                amount=0.004,
                price=1000.0
            )

        """
        operation = self.new_order_op(
            order_type=order_type,
            symbol=symbol,
            amount=amount,
            price=price,
            price_trailing=price_trailing,
            price_aux_limit=price_aux_limit,
            price_oco_stop=price_oco_stop,
            hidden=hidden,
            flags=flags,
            tif=tif,
            set_cid=set_cid
        )
        data = [0, operation[0], None, operation[1]]
        self._send_payload(data)
        # new_order_op adds a cid for any truthy set_cid, so report whatever
        # was actually put into the order instead of testing `is True`.
        return operation[1].get("cid")

    def multi_order(self, operations):
        """Multi order operation.

        Parameters
        ----------
        operations : list
            a list of operations. Every operation is a two item sequence
            ``[notification code, details dict]``. Read more here:
            https://bitfinex.readme.io/v2/reference#ws-input-order-multi-op
            Hint. you can use the self.new_order_op() for easy new order
            operation creation.

        Returns
        -------
        list
            A list of all the client ids created for each order. Returned in
            the order they are given to the method. Operations without a
            ``cid`` yield ``None``.

        Example
        -------
         ::

            # You should only need to create and authenticate a client once.
            # Then simply reuse it later
            from bitfinex import utils
            from bitfinex.websockets import abbreviations
            my_client = WssClient(key, secret)
            my_client.authenticate(print)
            my_client.start()

            example_order_cid_to_cancel = 153925861909296

            # docs: http://bit.ly/2BVqwW6
            cancel_order_operation = [
                abbreviations.get_notification_code('order cancel'),
                {
                    'cid': example_order_cid_to_cancel,
                    'cid_date': utils.cid_to_date(example_order_cid_to_cancel)
                }
            ]

            new_order_operation = my_client.new_order_op(
                order_type="LIMIT",
                symbol="BTCUSD",
                amount="0.004",
                price="1000.0"
            )

            order_client_id = my_client.multi_order([
                cancel_order_operation,
                new_order_operation
            ])

        """
        # Collect the client ids first: a malformed operation then fails
        # before anything has been sent to the exchange.
        client_ids = [operation[1].get("cid", None) for operation in operations]
        data = [
            0,
            abbreviations.get_notification_code('order multi-op'),
            None,
            operations
        ]
        self._send_payload(data)
        return client_ids

    def cancel_order(self, order_id):
        """Cancel order

        Parameters
        ----------
        order_id : int, str
            Order id created by Bitfinex

        Example
        -------
         ::

            # You should only need to create and authenticate a client once.
            # Then simply reuse it later
            my_client = WssClient(key, secret)
            my_client.authenticate(print)
            my_client.start()

            my_client.cancel_order(
                order_id=1234
            )

        """
        data = [
            0,
            abbreviations.get_notification_code('order cancel'),
            None,
            {
                # docs: http://bit.ly/2BVqwW6
                'id': order_id
            }
        ]
        self._send_payload(data)

    def cancel_order_cid(self, order_cid, order_date):
        """Cancel order using the client id and the date of the cid. Both are
        returned from the new_order command from this library.

        Parameters
        ----------
        order_cid : str
            cid string. e.g. "1234154"
        order_date : str
            Iso formatted order date. e.g. "2012-01-23"

        Example
        -------
         ::

            # You should only need to create and authenticate a client once.
            # Then simply reuse it later
            my_client = WssClient(key, secret)
            my_client.authenticate(print)
            my_client.start()

            # order_cid created by this library is always a milliseconds
            # time stamp. So you can just divide it by 1000 to get the
            # timestamp.
            my_client.cancel_order_cid(
                order_cid=1538911910035,
                order_date=(
                    datetime.utcfromtimestamp(
                        1538911910035/1000.0
                    ).strftime("%Y-%m-%d")
                )
            )

        """
        data = [
            0,
            abbreviations.get_notification_code('order cancel'),
            None,
            {
                # docs: http://bit.ly/2BVqwW6
                'cid': order_cid,
                'cid_date': order_date
            }
        ]
        self._send_payload(data)

    def update_order(self, **order_settings):
        """Update order using the order id

        Parameters
        ----------
        id : int64
            Order ID
        gid : int32
            Group Order ID
        price : decimal string
            Price
        amount : decimal string
            Amount
        delta : decimal string
            Change of amount
        price_aux_limit : decimal string
            Auxiliary limit price
        price_trailing : decimal string
            Trailing price delta
        tif : datetime string
            Time-In-Force: datetime for automatic order cancellation
            (ie. 2020-01-01 10:45:23)

        """
        data = [
            0,
            abbreviations.get_notification_code('order update'),
            None,
            order_settings
        ]
        self._send_payload(data)

    def calc(self, *calculations):
        """
        This message will be used by clients to trigger specific
        calculations, so we don't end up in calculating data that is not
        usually needed.

        You can request calculations to the websocket server that sends you
        the same message, with the required fields.

        List items must be one of the following:

            - margin_sym_SYMBOL (e.g. margin_sym_tBTCUSD)
            - funding_sym_SYMBOL
            - position_SYMBOL
            - wallet_WALLET-TYPE_CURRENCY

        Parameters
        ----------
        *calculations : list
            list of calculations wanted

        Returns
        -------
        None
            Data is returned over the auth channel. See the abbreviation
            glossary: https://docs.bitfinex.com/v2/docs/abbreviations-glossary

        Examples
        --------
         ::

            # You should only need to create and authenticate a client once.
            # Then simply reuse it later
            my_client = WssClient(key, secret)
            my_client.authenticate(print)
            my_client.start()

            my_client.calc(["margin_sym_tBTCUSD", "funding_sym_fUSD"])
            my_client.calc(["margin_sym_tBTCUSD"])
            my_client.calc(["position_tBTCUSD"])
            my_client.calc(["wallet_exchange_USD"])

        .. Note::

            Calculations are on demand, so no more streaming of unnecessary
            data. Websocket server allows up to 30 calculations per batch.
            If the client sends too many concurrent requests (or tries to
            spam) requests, it will receive an error and potentially a
            disconnection. The Websocket server performs a maximum of 8
            calculations per second per client.

        """
        data = [
            0,
            'calc',
            None,
            calculations
        ]
        self._send_payload(data)