import argparse
import datetime
import logging
import time
import threading
from ibapi.wrapper import EWrapper
from ibapi.client import EClient
from ibapi.common import *
from ibapi.contract import *
from ibapi.ticktype import *
from decimal import Decimal
class TestApp(EWrapper, EClient):
def __init__(self, file_name, market_data_type, data_lines=65, loops=1):
EWrapper.__init__(self)
EClient.__init__(self, wrapper=self)
self.started = False #start only once
self.nextValidOrderId = None
self.reqIdDict = {} #keep track of requests and recieved data
self.data_lines = data_lines # Set the allowed number of subscribtions = "not done" and not cancelled dictionaries
self.market_data_type = market_data_type
self.loops = loops # Number of loops to run the reqMktData_test function
#field names that are required to be present in the dictionary for each reqId to be considered done
self.required_fields = ["bid", "ask", "last", "volume", "last_timestamp", "high", "low"]
#as the sending data and receiving data (callbacks) are different thread, we need a lock to avoid conflicts
self.lock = threading.Lock() # Initialize a lock
self.last_print_time = time.time() - 5 # help to track how often we print the progress, not to spam
#array of symbols and exchanges we want data for
self.data = []
if file_name:
with open(file_name, 'r') as file:
for i, line in enumerate(file):
if i == 0:
continue
symbol, exchange = line.strip().split(',')
self.data.append({"symbol": symbol, "primaryExchange": exchange})
def connectAck(self):
if self.asynchronous:
self.startApi()
#wrapper function, once we have the number, we can start the test
def nextValidId(self, orderId: int):
print("NextValidId received:", orderId)
#make sure the next valid order ID is set to a value greater than the current order ID
#so we can correlate those IDs to the gateway log
try:
with open('next_valid_order_id.txt', 'r') as file:
self.nextValidOrderId = int(file.read().strip())
except Exception:
self.nextValidOrderId = 1000000 # Default value if any error occurs
if orderId >= self.nextValidOrderId:
self.nextValidOrderId = orderId
print("NextValidId set :", self.nextValidOrderId)
super().nextValidId(self.nextValidOrderId)
if self.started:
return
self.started = True
print("Executing requests")
threading.Thread(target=self.reqMktData_test).start()
#herlper, get a next valid order ID, increase the counter and save it to a file
def nextOrderId(self):
oid = self.nextValidOrderId
self.nextValidOrderId += 1
with open('next_valid_order_id.txt', 'w') as file:
file.write(str(self.nextValidOrderId ))
return oid
#main function to test the reqMktData
def reqMktData_test(self):
self.reqMarketDataType(self.market_data_type)
#run the test loops
for i in range(self.loops):
print(f"LOOP {i} START")
self.reqMktData_test_work()
print(f" ALL Tests DONE Ctrl+C to exit ")
def reqMktData_test_work(self):
#reset all the data
with self.lock:
self.reqIdDict = {}
begin_loop = datetime.datetime.now() #help to track how long the work takes
#sent the requests
for item in self.data:
contract = Contract()
contract.symbol = item["symbol"]
contract.secType = "STK"
contract.currency = "USD"
contract.exchange = item["primaryExchange"]
orderId = self.nextOrderId()
with self.lock:
#keep track of the request we sent out, Note: default is "done" : False, "sentCancelMktData" : False
#the wrapper callbacks will update the dictionary with data and eventually set "done" : True and "sentCancelMktData" : True and cancel the request
self.reqIdDict[orderId] = {"reqId" : orderId, "symbol": item["symbol"], "done" : False, "sentCancelMktData" : False, "sent_time" : datetime.datetime.now().isoformat()} # Update the dictionary
self.reqMktData(orderId, contract, "", False, False, [])
#wait if we already sent the maximum number of requests. i.e. all data lines are used
self.monitor_not_done()
#now all sent out. wait for all data to be received or cancelled due to timeout
self.monitor_not_done(finalize=True)
#now all data received or cancelled due to timeout print the results
with self.lock:
# Print the table of failed contracts
not_done_count = sum(1 for v in self.reqIdDict.values() if not v["done"])
print(f" Number of not done symbols: {not_done_count}")
# Print header
print(f"{'reqId':<9} {'symbol':<6} {'done':<5} {'bid':<7} {'ask':<7} {'last':<7} {'volume':<7} {'high':<7} {'low':<7} {'last_timestamp':<20}")
if not_done_count > 0:
for item in self.reqIdDict.values():
if not item["done"]:
print(f"{item['reqId']:<9} {item['symbol']:<6} {item['done']:<5} {item.get('bid', 'N/A'):<7} {item.get('ask', 'N/A'):<7} {item.get('last', 'N/A'):<7} {item.get('volume', 'N/A'):<7} {item.get('high', 'N/A'):<7} {item.get('low', 'N/A'):<7} {item.get('last_timestamp', 'N/A'):<20}")
print(f"DONE reqMktData_test {round((datetime.datetime.now() - begin_loop).total_seconds(), 1)} seconds")
def monitor_not_done(self, finalize=False):
while True:
#count the used data lines (=work_count)
with self.lock:
work_count = sum(1 for v in self.reqIdDict.values() if not v["sentCancelMktData"])
done_count = sum(1 for v in self.reqIdDict.values() if v["done"])
if work_count < (1 if finalize else self.data_lines):
break
#print the progress every 5 seconds
if time.time() - self.last_print_time >= 5:
print(f"{self.nextValidOrderId} Waiting... Symbols in work : {work_count}, done symbols: {done_count}")
self.last_print_time = time.time()
with self.lock:
for k, v in self.reqIdDict.items():
#cancel done contracts to free up resources
time_delta = datetime.datetime.now() - datetime.datetime.fromisoformat(v["sent_time"])
#timeout for contracts that are not done for a long time
if not v["done"] and not v["sentCancelMktData"] and time_delta.total_seconds() > 20:
self.cancelMktData(k)
v["sentCancelMktData"] = True
v["timeout"] = True
v["done_time"] = datetime.datetime.now().isoformat()
print(f"TIMEOUT reqId: {k}, symbol: {v['symbol']} CANCEL TIMEOUT!!!!")
time.sleep(0.1) #sleep a bit to avoid busy waiting
#helper function for wrapper callbacks
def legal_to_process(self, reqId):
#can only process the input which has this reqId AND has not been cancelled
if reqId not in self.reqIdDict:
return False
if self.reqIdDict[reqId].get("sentCancelMktData", False):
return False
return True
#helper function for wrapper callbacks
def mark_as_done(self, reqId):
if not [field for field in self.required_fields if field not in self.reqIdDict[reqId]]:
#no missing fields, mark as done
self.reqIdDict[reqId]["done"] = True
#cancel if not cancelled yet
if self.reqIdDict[reqId]["done"] and not self.reqIdDict[reqId]["sentCancelMktData"]:
self.reqIdDict[reqId]["sentCancelMktData"] = True
self.cancelMktData(reqId)
self.reqIdDict[reqId]["done_time"] = datetime.datetime.now().isoformat()
# reat are wrapper functions, wrapper will call these functions when data is received
def error(self, reqId: TickerId, errorCode: int, errorString: str, advancedOrderRejectJson = ""):
super().error(reqId, errorCode, errorString, advancedOrderRejectJson)
if advancedOrderRejectJson:
print("Error. Id:", reqId, "Code:", errorCode, "Msg:", errorString, "AdvancedOrderRejectJson:", advancedOrderRejectJson)
else:
print("Error. Id:", reqId, "Code:", errorCode, "Msg:", errorString)
with self.lock:
if self.legal_to_process(reqId):
if "errorCode" not in self.reqIdDict[reqId]:
self.reqIdDict[reqId]["errorCode"] = errorCode
self.reqIdDict[reqId]["errorString"] = errorString
self.reqIdDict[reqId]["done"] = True #mark as done, can cancel
self.mark_as_done(reqId)
def tickPrice(self, reqId: TickerId, tickType: TickType, price: float,
attrib: TickAttrib):
with self.lock:
if self.legal_to_process(reqId):
# Update the dictionary with the tickType and price
self.reqIdDict[reqId][TickTypeEnum.toStr(tickType).lower()] = price
self.mark_as_done(reqId)
def tickSize(self, reqId: TickerId, tickType: TickType, size: Decimal):
with self.lock:
if self.legal_to_process(reqId):
self.reqIdDict[reqId][TickTypeEnum.toStr(tickType).lower()] = decimalMaxString(size)
self.mark_as_done(reqId)
def tickGeneric(self, reqId: TickerId, tickType: TickType, value: float):
with self.lock:
if self.legal_to_process(reqId):
self.reqIdDict[reqId][TickTypeEnum.toStr(tickType).lower()] = value
self.mark_as_done(reqId)
def tickString(self, reqId: TickerId, tickType: TickType, value: str):
with self.lock:
if self.legal_to_process(reqId):
tickType_str = TickTypeEnum.toStr(tickType).lower()
value_tmp = value
if tickType_str == "last_timestamp":
value_tmp = datetime.datetime.fromtimestamp(int(value)).strftime('%Y-%m-%d %H:%M:%S')
self.reqIdDict[reqId][tickType_str] = value_tmp
self.mark_as_done(reqId)
def main():
logging.getLogger().setLevel(logging.ERROR)
cmdLineParser = argparse.ArgumentParser("api tests")
cmdLineParser.add_argument("-p", "--port", action="store", type=int,
dest="port", default=4001, help="The TCP port to use")
cmdLineParser.add_argument("-C", "--global-cancel", action="store_true",
dest="global_cancel", default=False,
help="whether to trigger a globalCancel req")
cmdLineParser.add_argument("-f", "--file", action="store", type=str,
dest="file", default="", required=False, help="The name of the file to work with, defualt is empty = no file")
cmdLineParser.add_argument("-m", "--market-data-type", action="store", type=str,
dest="market_data_type", default="REALTIME",
help="The market data type to use (REALTIME, FROZEN, DELAYED, DELAYED_FROZEN)")
cmdLineParser.add_argument("-d", "--data-lines", action="store", type=int,
dest="data_lines", default=65, required=False,
help="The number of data lines to process")
cmdLineParser.add_argument("-l", "--loops", action="store", type=int,
dest="loops", default=1, help="The number of loops to run")
args = cmdLineParser.parse_args()
print("Using args", args)
logging.debug("Using args %s", args)
#need to convert the string to number, as API expects number
market_data_type = getattr(MarketDataTypeEnum, args.market_data_type.upper(), MarketDataTypeEnum.REALTIME)
app = TestApp(args.file, market_data_type, data_lines=args.data_lines, loops=args.loops)
app.setConnectOptions("+PACEAPI")
app.connect("127.0.0.1", args.port, clientId=0)
print("serverVersion:%s connectionTime:%s" % (app.serverVersion(),
app.twsConnectionTime()))
app.run()
if __name__ == "__main__":
main()