开云体育

ctrl + shift + ? for shortcuts
© 2025 开云体育

Re: ibapi nextValidId not always invoked


 

thank you @ScottBrian,
this is indeed my first time to deal with threads.
my program is not intended to stop receiving data, but to handle as much of it.

about your solution:
i see the run() method of EReader is abruptly ending when the connection falls,
thus i think it may not snatch responses post disconnection.
to my satisfaction, data isn't being lost, as client.reader is pushing it's input to client.msg_queue, q that survives disconnection.

a friend (also brian) suggested on stackoverflow to join and stop after disconnection the data consumption loop, the EClient.run().
without very specific explanation, this solution actually works.
probably, some later handling of the messages in the queue, does need the now closed connection.

here is the working code. i'd like to set the syntax highlighting too:

?
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.common import MarketDataTypeEnum
from ibapi.errors import *
?
connection_errors = (
CONNECT_FAIL, UPDATE_TWS, NOT_CONNECTED, UNKNOWN_ID, UNSUPPORTED_VERSION,?
#BAD_LENGTH, BAD_MESSAGE,?
SOCKET_EXCEPTION, FAIL_CREATE_SOCK, SSL_FAIL,?
)
connection_error_codes = [error.code() for error in connection_errors]
?
?
import threading
import time
import logging
?
?
class IBapi(EWrapper, EClient):
?
nextorderId = None
ka_interval = None
?
clientid = None
hostname = None
portno = None
?
MarketDataType = None
onConnected = None
?
def __init__(
self, clientid, hostname='127.0.0.1', portno=7497,?
MarketDataType=None, onConnected=None, ka_interval=3):
?
self.clientid = clientid
self.hostname = hostname
self.portno = portno
self.ka_interval = ka_interval
?
EClient.__init__(self, self)
?
self.onConnected = onConnected
self.MarketDataType = MarketDataType or MarketDataTypeEnum.DELAYED
?
self.host_connect()
?
# Initialise the threads for various components
self._thread_ka = threading.Thread(target=self.keepAlive)
self._thread_ka.start()
?
def host_connect(self):
"""Connects to TWS with the appropriate connection parameters"""
if not self.hostname or not self.portno:
logging.error(f'hostname {self.hostname} or portno {self.portno} not [yet] defined')
return
super().connect(self.hostname, self.portno, self.clientid)
self._thread = threading.Thread(target=self.run)
self._thread.start()
?
def error(self, reqId, errorCode, errorString):
"""disconnect to handle communications errors"""
# clean the connection status
if errorCode in connection_error_codes and \
self.connState not in (EClient.CONNECTING,):
logging.error(
f'disconnect on connection_error {errorCode} "{errorString}"')
self.disconnect()
if hasattr(self, "_thread"):
self._thread.join(5)
time.sleep(5)
return super().error(reqId, errorCode, errorString)
?
def keepAlive(self):
data_lock = threading.Lock()
while self.ka_interval:
time.sleep(self.ka_interval)
connState = None
with data_lock:
connState = self.connState
isConnected = connState == EClient.CONNECTED
logging.error(f'is connected: {isConnected}')
if not isConnected:
isConnecting = connState == EClient.CONNECTING
if not isConnecting:
logging.error(f"let's connect")
self.host_connect()
else:
logging.error(f'already connecting')
else:
logging.error(f'requesting CurrentTime for keepAlive')
self.reqCurrentTime()
self.reqIds(1)
?
def host_connected(self):
self.reqMarketDataType(self.MarketDataType)
self.reqPositions()
?
def nextValidId(self, orderId):
print('====================================================')
logging.error(f'The next valid order id is: {orderId}')
print('====================================================')
super().nextValidId(orderId)
self.nextorderId = orderId
self.host_connected()
?
port_TWS_Live = 7496
port_IBGateway_Live = 4001
port_TWS_Simulated = 7497
port_IBGateway_Simulated = 4002
?
def main():
?
logging.basicConfig(
format='%(levelname)s:%(asctime)s:%(message)s', level=logging.WARN)
?
logging.info('Started')
?
app = IBapi(
1234,?
#portno=port_TWS_Live,?
portno=port_TWS_Simulated,?
)
?
logging.info('Finished')
?
if __name__ == '__main__':
main()
?

Join [email protected] to automatically receive all group messages.