?
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.common import MarketDataTypeEnum
from ibapi.errors import *
?
# Error objects (star-imported from ibapi.errors) that are treated as
# connection-level failures by this module.
# NOTE: BAD_LENGTH and BAD_MESSAGE are intentionally not part of this set.
connection_errors = (
    CONNECT_FAIL,
    UPDATE_TWS,
    NOT_CONNECTED,
    UNKNOWN_ID,
    UNSUPPORTED_VERSION,
    SOCKET_EXCEPTION,
    FAIL_CREATE_SOCK,
    SSL_FAIL,
)

# The code() of each entry above, in the same order, for matching against
# the errorCode delivered to the error callback.
connection_error_codes = list(map(lambda err: err.code(), connection_errors))
?
?
import threading
import time
import logging
?
?
class IBapi(EWrapper, EClient):
    """Combined EWrapper/EClient that connects on construction and reconnects.

    Constructing an instance immediately attempts a connection (starting a
    thread that executes ``run``) and starts a second thread executing
    ``keepAlive``, which polls the connection state every ``ka_interval``
    seconds and reconnects when the client is neither connected nor
    connecting.
    """

    # Last order id received through nextValidId(); None until then.
    nextorderId = None
    # Seconds between keep-alive polls; a falsy value stops the loop.
    ka_interval = None

    # Connection parameters handed to EClient.connect() by host_connect().
    clientid = None
    hostname = None
    portno = None

    # Market data type requested in host_connected().
    MarketDataType = None
    # Stored from the constructor; not invoked anywhere in this class.
    onConnected = None

    def __init__(
            self, clientid, hostname='127.0.0.1', portno=7497,
            MarketDataType=None, onConnected=None, ka_interval=3):
        """Store the parameters, connect, and start the keep-alive thread.

        Args:
            clientid: client id passed to ``connect``.
            hostname: host passed to ``connect``.
            portno: port passed to ``connect``.
            MarketDataType: value for ``reqMarketDataType``; any falsy value
                is replaced by ``MarketDataTypeEnum.DELAYED``.
            onConnected: stored on the instance as ``onConnected``.
            ka_interval: seconds between keep-alive polls; a falsy value
                makes the keep-alive thread exit immediately.
        """
        self.clientid = clientid
        self.hostname = hostname
        self.portno = portno
        self.ka_interval = ka_interval

        # The instance acts as its own wrapper (callback receiver).
        EClient.__init__(self, self)

        self.onConnected = onConnected
        self.MarketDataType = MarketDataType or MarketDataTypeEnum.DELAYED

        self.host_connect()

        # Initialise the threads for various components
        self._thread_ka = threading.Thread(target=self.keepAlive)
        self._thread_ka.start()

    def host_connect(self):
        """Connects to TWS with the appropriate connection parameters.

        Logs an error and returns without connecting when ``hostname`` or
        ``portno`` is falsy. Otherwise calls ``connect`` and starts a new
        thread executing ``run``, kept in ``self._thread``.
        """
        if not self.hostname or not self.portno:
            logging.error(
                f'hostname {self.hostname} or portno {self.portno} '
                f'not [yet] defined')
            return
        super().connect(self.hostname, self.portno, self.clientid)
        self._thread = threading.Thread(target=self.run)
        self._thread.start()

    def error(self, reqId, errorCode, errorString, *args):
        """Disconnect on connection-level errors, then defer to the base class.

        When ``errorCode`` is one of ``connection_error_codes`` and the client
        is not currently in the CONNECTING state, the connection is torn
        down: ``disconnect`` is called, the ``run`` thread is given up to
        5 seconds to finish, and the caller is then paused for 5 seconds.

        Args:
            reqId: request id associated with the error.
            errorCode: numeric error code.
            errorString: human-readable error text.
            *args: any further positional arguments supplied by the installed
                ibapi version; forwarded unchanged to the base implementation.

        Returns:
            Whatever the base-class ``error`` returns.
        """
        # clean the connection status
        is_connection_error = errorCode in connection_error_codes
        if is_connection_error and self.connState not in (EClient.CONNECTING,):
            logging.error(
                f'disconnect on connection_error {errorCode} "{errorString}"')
            self.disconnect()
            worker = getattr(self, "_thread", None)
            # This callback may be running on the run() thread itself;
            # Thread.join() raises RuntimeError when asked to join the
            # current thread, so only wait when called from another thread.
            if worker is not None and worker is not threading.current_thread():
                worker.join(5)
            time.sleep(5)
        return super().error(reqId, errorCode, errorString, *args)

    def keepAlive(self):
        """Poll the connection state and reconnect or ping accordingly.

        Loops for as long as ``ka_interval`` is truthy, sleeping
        ``ka_interval`` seconds per iteration. When connected, issues
        ``reqCurrentTime`` and ``reqIds(1)``; when neither connected nor
        connecting, calls ``host_connect``.
        """
        while self.ka_interval:
            time.sleep(self.ka_interval)
            # Take one snapshot so both comparisons below see the same value.
            connState = self.connState
            isConnected = connState == EClient.CONNECTED
            logging.error(f'is connected: {isConnected}')
            if isConnected:
                logging.error('requesting CurrentTime for keepAlive')
                self.reqCurrentTime()
                # NOTE(review): the reply presumably arrives via nextValidId(),
                # which calls host_connected() again on every poll - confirm
                # that re-requesting positions each interval is intended.
                self.reqIds(1)
            elif connState == EClient.CONNECTING:
                logging.error('already connecting')
            else:
                logging.error("let's connect")
                self.host_connect()

    def host_connected(self):
        """Issue the initial requests: market data type, then positions."""
        self.reqMarketDataType(self.MarketDataType)
        self.reqPositions()

    def nextValidId(self, orderId):
        """Record the order id and run the post-connect requests.

        Prints and logs the id, forwards it to the base implementation,
        stores it in ``nextorderId`` and calls ``host_connected``.
        """
        print('====================================================')
        logging.error(f'The next valid order id is: {orderId}')
        print('====================================================')
        super().nextValidId(orderId)
        self.nextorderId = orderId
        self.host_connected()
?
# Port numbers selectable for the ``portno`` argument, grouped by application
# (TWS, IB Gateway) with the live port first and the simulated port second.
port_TWS_Live, port_TWS_Simulated = 7496, 7497
port_IBGateway_Live, port_IBGateway_Simulated = 4001, 4002
?
def main():
    """Set up logging and create an IBapi client on the simulated TWS port."""
    # The threshold is WARN, so the two info() messages below are only
    # emitted if the level is lowered.
    logging.basicConfig(
        level=logging.WARN,
        format='%(levelname)s:%(asctime)s:%(message)s',
    )

    logging.info('Started')

    # Pass portno=port_TWS_Live instead to target the live TWS port.
    client_id = 1234
    app = IBapi(client_id, portno=port_TWS_Simulated)

    logging.info('Finished')
?
# Run main() only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
?