import argparse
import datetime
import logging
import time
import threading

# NOTE: import order is kept as in the original file; the star imports below
# provide TickerId, TickType, TickAttrib, Contract, TickTypeEnum,
# decimalMaxString and MarketDataTypeEnum used further down.
from ibapi.wrapper import EWrapper
from ibapi.client import EClient
from ibapi.common import *
from ibapi.contract import *
from ibapi.ticktype import *
from decimal import Decimal


class TestApp(EWrapper, EClient):
    """Market-data test client: requests ticks for a list of symbols.

    For every symbol loaded from the input file a ``reqMktData`` request is
    sent.  Tick callbacks collect the received values per request id in
    ``reqIdDict``; once all ``required_fields`` are present (or an error is
    reported) the request is marked done and cancelled.  Requests that are
    not done within the timeout are cancelled as well.
    """

    # File used to persist the next request id between runs.
    _ORDER_ID_FILE = 'next_valid_order_id.txt'
    # Starting id used when the id file is missing or unreadable.
    _DEFAULT_START_ORDER_ID = 1000000
    # Seconds after which a request that is still not done is cancelled.
    _REQUEST_TIMEOUT_SECONDS = 20
    # Minimum number of seconds between two progress messages.
    _PROGRESS_INTERVAL_SECONDS = 5
    # Sleep between two polls in monitor_not_done (avoids busy waiting).
    _POLL_INTERVAL_SECONDS = 0.1

    def __init__(self, file_name, market_data_type, data_lines=65, loops=1):
        """Initialise the client state and load the symbol list.

        Args:
            file_name: CSV file with a header row followed by
                ``symbol,exchange`` rows; falsy means "no symbols".
            market_data_type: Value passed to ``reqMarketDataType``.
            data_lines: Maximum number of requests that may be in work
                (sent and not yet cancelled) at the same time.
            loops: Number of times the whole symbol list is requested.

        Raises:
            OSError: If ``file_name`` cannot be opened.
            ValueError: If a non-blank row does not have exactly two fields.
        """
        EWrapper.__init__(self)
        EClient.__init__(self, wrapper=self)
        self.started = False  # start the test only once
        self.nextValidOrderId = None
        self.reqIdDict = {}  # keeps track of requests and received data
        # Allowed number of subscriptions, i.e. entries that are not yet
        # cancelled.
        self.data_lines = data_lines
        self.market_data_type = market_data_type
        self.loops = loops  # number of loops to run reqMktData_test_work
        # Field names that must be present in the dictionary of a reqId
        # for it to be considered done.
        self.required_fields = ["bid", "ask", "last", "volume", "last_timestamp", "high", "low"]
        # Sending requests and receiving data (callbacks) happen on
        # different threads, so a lock is needed to avoid conflicts.
        self.lock = threading.Lock()
        # Tracks when progress was last printed, so we do not spam; start
        # in the past so the first progress message is printed immediately.
        self.last_print_time = time.time() - self._PROGRESS_INTERVAL_SECONDS
        # List of symbols and exchanges we want data for.
        self.data = self._load_symbols(file_name) if file_name else []

    @staticmethod
    def _load_symbols(file_name):
        """Read ``symbol,exchange`` rows from file_name, skipping the header."""
        rows = []
        with open(file_name, 'r') as file:
            for i, line in enumerate(file):
                if i == 0:
                    continue  # header row
                line = line.strip()
                if not line:
                    # A blank (e.g. trailing) line is not a symbol row.
                    continue
                symbol, exchange = (part.strip() for part in line.split(','))
                rows.append({"symbol": symbol, "primaryExchange": exchange})
        return rows

    def connectAck(self):
        """Start the API when the connection is acknowledged in async mode."""
        if self.asynchronous:
            self.startApi()

    # Wrapper function: once we have the number, we can start the test.
    def nextValidId(self, orderId: int):
        """Record the first usable request id and start the test thread.

        The id stored in the id file is used when it is larger than the one
        received, so that ids keep increasing across runs and can be
        correlated with the gateway log.
        """
        print("NextValidId received:", orderId)
        try:
            with open(self._ORDER_ID_FILE, 'r') as file:
                self.nextValidOrderId = int(file.read().strip())
        except (OSError, ValueError):
            # Missing/unreadable file or non-numeric content: use default.
            self.nextValidOrderId = self._DEFAULT_START_ORDER_ID
        if orderId >= self.nextValidOrderId:
            self.nextValidOrderId = orderId
        print("NextValidId set :", self.nextValidOrderId)
        super().nextValidId(self.nextValidOrderId)
        if self.started:
            return
        self.started = True
        print("Executing requests")
        threading.Thread(target=self.reqMktData_test).start()

    # Helper: get the next valid order id, increase the counter and save it
    # to a file.
    def nextOrderId(self):
        """Return the current id, then advance and persist the counter."""
        oid = self.nextValidOrderId
        self.nextValidOrderId += 1
        with open(self._ORDER_ID_FILE, 'w') as file:
            file.write(str(self.nextValidOrderId))
        return oid

    # Main function to test reqMktData.
    def reqMktData_test(self):
        """Set the market data type and run the configured number of loops."""
        self.reqMarketDataType(self.market_data_type)
        for i in range(self.loops):
            print(f"LOOP {i} START")
            self.reqMktData_test_work()
        print(" ALL Tests DONE Ctrl+C to exit ")

    def reqMktData_test_work(self):
        """Request data for every symbol once and print the failures."""
        # Reset all the data.
        with self.lock:
            self.reqIdDict = {}
        begin_loop = datetime.datetime.now()  # to report how long the work takes
        # Send the requests.
        for item in self.data:
            contract = Contract()
            contract.symbol = item["symbol"]
            contract.secType = "STK"
            contract.currency = "USD"
            contract.exchange = item["primaryExchange"]
            orderId = self.nextOrderId()
            with self.lock:
                # Keep track of the request we sent out.  Defaults are
                # "done": False and "sentCancelMktData": False; the wrapper
                # callbacks fill in data and eventually set both to True
                # and cancel the request.
                self.reqIdDict[orderId] = {"reqId": orderId, "symbol": item["symbol"], "done": False, "sentCancelMktData": False, "sent_time": datetime.datetime.now().isoformat()}
            self.reqMktData(orderId, contract, "", False, False, [])
            # Wait if we already sent the maximum number of requests,
            # i.e. all data lines are used.
            self.monitor_not_done()
        # Everything is sent out; wait for all data to be received or
        # cancelled due to timeout.
        self.monitor_not_done(finalize=True)
        # Print the table of failed contracts.
        with self.lock:
            not_done = [v for v in self.reqIdDict.values() if not v["done"]]
            print(f" Number of not done symbols: {len(not_done)}")
            # Header
            print(f"{'reqId':<9} {'symbol':<6} {'done':<5} {'bid':<7} {'ask':<7} {'last':<7} {'volume':<7} {'high':<7} {'low':<7} {'last_timestamp':<20}")
            for item in not_done:
                # "!s" is required for the bool: a format spec on a bool
                # uses int formatting and would print 0/1.
                print(f"{item['reqId']:<9} {item['symbol']:<6} {item['done']!s:<5} {item.get('bid', 'N/A'):<7} {item.get('ask', 'N/A'):<7} {item.get('last', 'N/A'):<7} {item.get('volume', 'N/A'):<7} {item.get('high', 'N/A'):<7} {item.get('low', 'N/A'):<7} {item.get('last_timestamp', 'N/A'):<20}")
        print(f"DONE reqMktData_test {round((datetime.datetime.now() - begin_loop).total_seconds(), 1)} seconds")

    def monitor_not_done(self, finalize=False):
        """Block until a data line is free (or, if finalize, all are free).

        While waiting, requests that are not done within the timeout are
        cancelled and flagged with ``"timeout": True``.

        Args:
            finalize: When True, wait until no request is in work any more;
                otherwise wait until fewer than ``data_lines`` are in work.
        """
        # At least one line must be allowed, otherwise "work_count < limit"
        # could never become true and this loop would never end.
        limit = 1 if finalize else max(1, self.data_lines)
        while True:
            # Count the used data lines (= work_count).
            with self.lock:
                work_count = sum(1 for v in self.reqIdDict.values() if not v["sentCancelMktData"])
                done_count = sum(1 for v in self.reqIdDict.values() if v["done"])
            if work_count < limit:
                break
            # Print the progress at most every few seconds.
            if time.time() - self.last_print_time >= self._PROGRESS_INTERVAL_SECONDS:
                print(f"{self.nextValidOrderId} Waiting... Symbols in work : {work_count}, done symbols: {done_count}")
                self.last_print_time = time.time()
            with self.lock:
                now = datetime.datetime.now()
                for k, v in self.reqIdDict.items():
                    if v["done"] or v["sentCancelMktData"]:
                        continue  # nothing left to time out
                    time_delta = now - datetime.datetime.fromisoformat(v["sent_time"])
                    # Time out contracts that are not done for a long time
                    # to free up the data line.
                    if time_delta.total_seconds() > self._REQUEST_TIMEOUT_SECONDS:
                        self.cancelMktData(k)
                        v["sentCancelMktData"] = True
                        v["timeout"] = True
                        v["done_time"] = datetime.datetime.now().isoformat()
                        print(f"TIMEOUT reqId: {k}, symbol: {v['symbol']} CANCEL TIMEOUT!!!!")
            time.sleep(self._POLL_INTERVAL_SECONDS)  # avoid busy waiting

    # Helper function for wrapper callbacks.
    def legal_to_process(self, reqId):
        """Return True if reqId is tracked and has not been cancelled."""
        entry = self.reqIdDict.get(reqId)
        if entry is None:
            return False
        return not entry.get("sentCancelMktData", False)

    # Helper function for wrapper callbacks.
    def mark_as_done(self, reqId):
        """Mark reqId done when all required fields arrived, then cancel it.

        The cancel is sent at most once per request; callers hold
        ``self.lock`` while calling this.
        """
        entry = self.reqIdDict[reqId]
        if all(field in entry for field in self.required_fields):
            # No missing fields, mark as done.
            entry["done"] = True
        # Cancel if not cancelled yet.
        if entry["done"] and not entry["sentCancelMktData"]:
            entry["sentCancelMktData"] = True
            self.cancelMktData(reqId)
            entry["done_time"] = datetime.datetime.now().isoformat()

    # The rest are wrapper functions; the wrapper calls them when data is
    # received.
    def error(self, reqId: TickerId, errorCode: int, errorString: str, advancedOrderRejectJson=""):
        """Print the error and mark the affected request as done.

        Only the first error of a request is stored in its dictionary.
        """
        super().error(reqId, errorCode, errorString, advancedOrderRejectJson)
        if advancedOrderRejectJson:
            print("Error. Id:", reqId, "Code:", errorCode, "Msg:", errorString, "AdvancedOrderRejectJson:", advancedOrderRejectJson)
        else:
            print("Error. Id:", reqId, "Code:", errorCode, "Msg:", errorString)
        with self.lock:
            if self.legal_to_process(reqId):
                if "errorCode" not in self.reqIdDict[reqId]:
                    self.reqIdDict[reqId]["errorCode"] = errorCode
                    self.reqIdDict[reqId]["errorString"] = errorString
                self.reqIdDict[reqId]["done"] = True  # mark as done, can cancel
                self.mark_as_done(reqId)

    def tickPrice(self, reqId: TickerId, tickType: TickType, price: float, attrib: TickAttrib):
        """Store a price tick under its lower-cased tick type name."""
        with self.lock:
            if self.legal_to_process(reqId):
                self.reqIdDict[reqId][TickTypeEnum.toStr(tickType).lower()] = price
                self.mark_as_done(reqId)

    def tickSize(self, reqId: TickerId, tickType: TickType, size: Decimal):
        """Store a size tick (as a string) under its tick type name."""
        with self.lock:
            if self.legal_to_process(reqId):
                self.reqIdDict[reqId][TickTypeEnum.toStr(tickType).lower()] = decimalMaxString(size)
                self.mark_as_done(reqId)

    def tickGeneric(self, reqId: TickerId, tickType: TickType, value: float):
        """Store a generic tick under its lower-cased tick type name."""
        with self.lock:
            if self.legal_to_process(reqId):
                self.reqIdDict[reqId][TickTypeEnum.toStr(tickType).lower()] = value
                self.mark_as_done(reqId)

    def tickString(self, reqId: TickerId, tickType: TickType, value: str):
        """Store a string tick; ``last_timestamp`` is made human readable.

        If the timestamp cannot be converted, the raw value is stored
        instead of letting the exception escape from the callback.
        """
        with self.lock:
            if self.legal_to_process(reqId):
                tickType_str = TickTypeEnum.toStr(tickType).lower()
                value_tmp = value
                if tickType_str == "last_timestamp":
                    try:
                        value_tmp = datetime.datetime.fromtimestamp(int(value)).strftime('%Y-%m-%d %H:%M:%S')
                    except (ValueError, OverflowError, OSError):
                        # Empty/non-numeric or out-of-range timestamp:
                        # keep the raw string.
                        value_tmp = value
                self.reqIdDict[reqId][tickType_str] = value_tmp
                self.mark_as_done(reqId)


def main():
    """Parse the command line, connect to the gateway and run the test."""
    logging.getLogger().setLevel(logging.ERROR)
    cmdLineParser = argparse.ArgumentParser("api tests")
    cmdLineParser.add_argument("-p", "--port", action="store", type=int, dest="port", default=4001, help="The TCP port to use")
    # NOTE(review): --global-cancel is parsed but not acted upon anywhere in
    # this file; kept for command-line compatibility.
    cmdLineParser.add_argument("-C", "--global-cancel", action="store_true", dest="global_cancel", default=False, help="whether to trigger a globalCancel req")
    cmdLineParser.add_argument("-f", "--file", action="store", type=str, dest="file", default="", required=False, help="The name of the file to work with, defualt is empty = no file")
    cmdLineParser.add_argument("-m", "--market-data-type", action="store", type=str, dest="market_data_type", default="REALTIME", help="The market data type to use (REALTIME, FROZEN, DELAYED, DELAYED_FROZEN)")
    cmdLineParser.add_argument("-d", "--data-lines", action="store", type=int, dest="data_lines", default=65, required=False, help="The number of data lines to process")
    cmdLineParser.add_argument("-l", "--loops", action="store", type=int, dest="loops", default=1, help="The number of loops to run")
    args = cmdLineParser.parse_args()
    print("Using args", args)
    logging.debug("Using args %s", args)

    # The API expects a number, so convert the name; unknown names fall
    # back to REALTIME.
    market_data_type = getattr(MarketDataTypeEnum, args.market_data_type.upper(), MarketDataTypeEnum.REALTIME)

    app = TestApp(args.file, market_data_type, data_lines=args.data_lines, loops=args.loops)
    app.setConnectOptions("+PACEAPI")
    app.connect("127.0.0.1", args.port, clientId=0)
    print("serverVersion:%s connectionTime:%s" % (app.serverVersion(), app.twsConnectionTime()))
    app.run()


if __name__ == "__main__":
    main()