updateMktDepthL2 missing data points
Hello,
Trying to source market depth data on FX pairs, USD.JPY in the case below. In the code below, self.series[reqId] is just a Python list that I append new ticks to:

    def updateMktDepthL2(self, reqId, position, marketMaker, operation, side, price, size, isSmartDepth):
        ...

Then separately I have a running thread in charge of saving once in a while to a parquet file and resetting the list:

    ...
    df.to_parquet(parquet_file_name, compression='gzip')

The above code works fine most of the time; however, once in a while I get an exception:

    Length of values (112071) does not match length of index (112105)

Trying to debug the exception, and putting each column into a separate frame, I get the below row count per column.

How is that even possible? Note the numbers differ between the exception (112071) and the investigation (2151751+), I believe because data kept arriving between the exception catch and the debugging I performed.
Can you add 'operation' to your CSV?
Thanks Gordon for your reply.
Yes, "bid" is side; "side" doesn't mean anything to me, but "bid" == 1 clearly means we are talking about the bid side. "quantity" is size, because "size" is used in many other things in Python and I would rather avoid any name overlap. Essentially, if price == 0 and size == 0 it implies operation == 2, and as regards insert vs. update, it makes no difference to the analysis I am performing. Considering the amount of data to be stored for multiple FX pairs, I would rather not have a redundant operation column, even if only uint8. The other thing is, if it were related to the operation field, then we would not have any discrepancy between "bid" and "position", but we have bid/size at 2151825 and position at 2151830, which is rather confusing. Also, how can price and quantity/size have a higher count (2151876) than time (2151751)?
Looking at Ewald's ib_insync implementation below, it does essentially the same thing: put all the info in a tuple and append that tuple to a list. So I am surprised that no one has encountered this issue, but I am even more surprised that this issue even exists in the first place, as it should work, always, no matter what.

    def updateMktDepthL2(
            self, reqId: int, position: int, marketMaker: str, operation: int,
            side: int, price: float, size: float, isSmartDepth: bool = False):
        # operation: 0 = insert, 1 = update, 2 = delete
        # side: 0 = ask, 1 = bid
        ticker = self.reqId2Ticker[reqId]

        dom = ticker.domBids if side else ticker.domAsks
        if operation == 0:
            dom.insert(position, DOMLevel(price, size, marketMaker))
        elif operation == 1:
            dom[position] = DOMLevel(price, size, marketMaker)
        elif operation == 2:
            if position < len(dom):
                level = dom.pop(position)
                price = level.price
                size = 0

        tick = MktDepthData(
            self.lastTime, position, marketMaker, operation, side, price, size)
        ticker.domTicks.append(tick)
        self.pendingTickers.add(ticker)

    class MktDepthData(NamedTuple):
        time: datetime
        position: int
        marketMaker: str
        operation: int
        side: int
        price: float
        size: float

    domTicks: List[MktDepthData] = field(default_factory=list)
On Sat, Oct 28, 2023 at 08:20 AM, John wrote:
I won't comment as to whether or not your deduction here is correct. But I will say that it seems you may be relying on too many assumptions. After all, it's been said that "premature optimization is the root of all evil (or at least most of it) in programming". Therefore, I would suggest you try two things: 1) make your implementation as "correct" as possible according to the API specification and documentation, without taking any shortcuts based on assumptions, and 2) continue to use ib_insync as a comparison to see if you can reproduce the problem there.

Finally, since code interactions can get quite complicated, copying around snippets that are surrounded by ellipses will only go so far. So I'd try to make the smallest possible (but complete) standalone unit test in order to really isolate the issue. Having that will also make it easy to present an argument to support people; they can then confirm the existence of the bug(s) for themselves. Unlike the justice system, I think any application that exhibits "strange behavior" is presumed guilty until proven innocent (instead of the other way around). Once this is done it should become very clear where the problem lies, and you'll be able to go back and optimize more systematically.
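As a starting point for such a standalone test, here is a minimal, deterministic toy (not the threaded race itself, and not TWS API code) showing where pandas' "Length of values ... does not match length of index" message comes from: pandas raises it whenever a column being assigned has a different length than the frame's index. A list that grows mid-conversion produces exactly this kind of mismatch.

```python
# Deterministic illustration of the exception from the original post:
# assigning a column whose length differs from the index raises ValueError.
import pandas as pd

df = pd.DataFrame({"price": [1.0, 2.0, 3.0]})  # index has 3 rows

try:
    df["size"] = [100, 200]  # only 2 values for a 3-row index
except ValueError as exc:
    print(exc)  # -> Length of values (2) does not match length of index (3)
```

In the threaded case the mismatch is not injected deliberately like this; it appears because the source list changes length while pandas is assembling the columns.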
when i implemented support for market depth myself, i recall i also had some issues with processing the incoming data. i kept just the book and updated it with the incoming data, but iirc the issue was that sometimes i had to delete an item at an index that did not exist. i wasn't sure back then whether the issue was in my app or the sequence was really broken... anyway, in this case, i would even wonder what the real value of market depth data is on the forex market.
Thank you both.
"So, I'd try to make the smallest possible (but complete) standalone unit test in order to really isolate the issue."

@buddy, these code snippets are pretty self-explanatory and standalone, really: put the depth bars in a tuple, append the tuple to a list, put the list in a dataframe, and save it to a parquet file. Even using a small toy example below with the same logic, there's clearly no way this code flow can fail on its own, even if the input is trash. However, you might be correct that it could come from unfortunate thread concurrency, i.e. a huge amount of data being unevenly added to the tuple/list at the exact time the dataframe is created, so I might simply need to hard-copy the list and convert that copy to a dataframe instead of the list itself. I'll try that next week and keep the group posted.

    from time import time
    ...

output:

               time  col1  col2  col3
    0  1.698522e+09     1     2     2
    1  1.698522e+09     1     0     3
    2  1.698522e+09     1     1  None
    3  0.000000e+00     0  None  None

"anyway, in this case, i would even wonder what is the real value of market depth data on forex market."

@fordfrog, well, that's precisely what I am trying to find out! :) But for this I first need to extract and save a substantial amount of data in order to study it.
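A sketch of that "hard copy before converting" idea, with hypothetical class and column names, made safe with a lock: the callback appends under the lock, and the saver atomically swaps the buffer for a fresh list before building the DataFrame, so pandas never iterates a list that is still growing.

```python
import threading
import pandas as pd

COLUMNS = ["time", "position", "side", "price", "size"]  # illustrative schema

class DepthRecorder:
    def __init__(self):
        self._lock = threading.Lock()
        self._buffer = []

    def on_tick(self, time_, position, side, price, size):
        # called from the API/reader thread
        with self._lock:
            self._buffer.append((time_, position, side, price, size))

    def drain(self):
        # called from the saver thread: swap the buffer under the lock,
        # then convert the now-private batch outside any contention
        with self._lock:
            batch, self._buffer = self._buffer, []
        return pd.DataFrame(batch, columns=COLUMNS)
```

drain() could then call df.to_parquet(...) exactly as in the original snippet; the lock is held only for the pointer swap, not for the (slow) DataFrame conversion or file write.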
On Sat, Oct 28, 2023 at 08:05 PM, John wrote:
Yes, I'm not sure if a deep copy alone will suffice. But... this is what I meant by "make the smallest possible (but complete) standalone unit test". The test should focus on experimentally confirming your uncertain suspicions; it's a judgement call. Naturally, there's no need to confirm what you already know.
When I read your initial post, my first thought was "data corruption due to multi-threading", and it looks like that is where the discussion with Gordon Eldest, buddy, and fordfrog is converging. Level II data for active instruments can generate a lot of data and, more importantly, bursts of hundreds or even thousands of callbacks in peak seconds. Take a look at this post where I shared some stats for our setup: we have days with 2,000,000 callbacks within five-minute windows, which is a sustained rate of 6,000 callbacks per second for the entire period. I am not sure how busy USD.JPY is and whether you subscribe to more than one pair, but it is safe to assume that you will have peak seconds with tens or hundreds of callbacks. On the other side, your code seems to have no synchronization between threads at all. So it is just a matter of time until corruption happens.

You pointed to some code from Ewald's ib_insync. If you look a little closer at the entire library you will find carefully placed synchronization statements that make sure no data corruption takes place, and a design that eliminates globally shared state as much as possible. You will find several posts in the archive about multi-threading, how to do it, and that it's hard. And there is a lot of truth to the various contributions, but let me share some thoughts on architecture and design for (massively) multi-threaded TWS API clients that have worked very well for us and yielded rock-stable applications with little or no data-access synchronization. We even have certain applications with 100+ threads that connect to several TWS/IBGW instances simultaneously and process the individual "data firehoses" flawlessly on a multi-processor machine that executes many of them truly in parallel. Granted, we develop in Java and the language has a rich set of concurrency features, but all programming languages (including Python) have add-ons or libraries with similar or identical functionality.
The following should, therefore, apply equally to all TWS API implementations (and multi-processing applications in general). We had the following goals for our applications:
So here is what we do ...

Ruthless separation of concerns, strict need-to-know-only, and only immutable objects

At the architecture level we break classes and modules into the smallest possible pieces so that all functions are related to just one topic/domain/subject. Instead of having an EWrapper object accessible to all functions in the application, we have a thin layer of code that wraps TWS API into a "controller" paradigm and groups the various functions into approximately 50 small, task-oriented interfaces such as: Account, Position, Contract, Order, TickByTick, MarketDepth, ...

Also, each interface defines classes for the data returned by TWS API callbacks, so that each request returns a single immutable object that carries all parameters from the callback. In Java, these classes cannot be extended and instantiated objects cannot be modified once created. This way, objects can be shared side-effect free with many different modules in many parallel threads.

The controller completely hides the existence and details of TWS API (such as requestIds), so that the application code only deals with high-level objects along the lines of the domain interfaces. The signature of request calls no longer has a requestId parameter; instead, callers provide a handler object that receives all callbacks and errors for just that request. In other words, and closer to your problem, even multiple MarketData subscriptions are completely separate from each other in that the application provides a unique handler object for each of them. The Java implementation of TWS API ships with an ApiController class that shows how this can be implemented. And the good news is that you can develop your controller over time, since it only needs to support the requests your applications actually use.
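The "handler object per request" idea translates directly to Python. Here is a hypothetical sketch (names and signatures are illustrative, not the actual ApiController API): the controller owns the reqId bookkeeping, so application code never sees requestIds, and each subscription is isolated in its own handler.

```python
import itertools
from typing import Protocol

class DepthHandler(Protocol):
    """What a per-subscription handler must provide."""
    def on_update(self, position: int, operation: int, side: int,
                  price: float, size: float) -> None: ...
    def on_error(self, code: int, message: str) -> None: ...

class DepthController:
    def __init__(self):
        self._next_id = itertools.count(1)
        self._handlers = {}  # reqId -> handler, private to the controller

    def subscribe(self, contract, handler: DepthHandler) -> int:
        req_id = next(self._next_id)
        self._handlers[req_id] = handler
        # a real implementation would call client.reqMktDepth(req_id, ...) here
        return req_id

    # EWrapper-style callback: route to the one handler that owns this reqId
    def updateMktDepthL2(self, reqId, position, marketMaker, operation,
                         side, price, size, isSmartDepth=False):
        self._handlers[reqId].on_update(position, operation, side, price, size)
```

Each pair's handler can then keep its own private list, so two subscriptions never touch the same data structure.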
No global state and data streams

The main reason why a multi-threaded application needs locks/semaphores is to prevent data corruption when multiple threads simultaneously modify some global state or data. In your case, you have a shared global object that is not only modified by the updateMktDepthL2 callback [self.series[reqId].append( ... ] and by the thread that saves the data [app.series[reqId] = list()], but also by several independent market data streams for different instruments [e.g. indexed by reqId]. If you try to solve your issue with locks and semaphores, that object will become the application's bottleneck, since it will be locked all the time, effectively reducing parallelism to just about 1. One way to eliminate the need for synchronization is to eliminate the global series object entirely:
This became a little longer than initially intended, but hopefully it gives you some food for thought for a more powerful and scalable way to solve the data corruption and deal with MarketDepth firehoses. Jürgen

On Sat, Oct 28, 2023 at 02:44 PM, John wrote:
@buddy, ... However, you might be correct that it could come from unfortunate thread concurrency, i.e. a huge amount of data being unevenly added to the tuple/list at the exact time the dataframe is created, so I might simply need to hard-copy the list and convert that copy to a dataframe instead of the list itself. I'll try that next week and keep the group posted.
my design is pretty much the same, except much less feature-rich than what Jürgen and his buddies implemented.

if i should write it as simply as i can, the design is this:
IDK how much more of this you want to read about. But I'll just try to bolster the crux of Jürgen's reply by referring you to Wikipedia regarding the . I'll also add that the "first approach" they describe (re-entrancy, local store, immutable objects) is often the approach people tend to overlook. This may happen because one doesn't need extra tools for the first approach; it's basically a "style". And I suspect that if you show someone a "thing" (mutex, semaphore, etc.) they remember it more than if you describe a "way" (don't share data, put everything on the stack, etc.). Well, I'm not an educator, so that's all conjecture.

The second approach, otoh, can't be avoided in some cases... often when dealing with physical resources. Anyway, you should choose the approach which suits your given circumstances. Only experience and judgement will help you know which. I'd err toward the first approach if there are any doubts. But again, the first approach doesn't often come naturally until it's practiced a bit... and sometimes it's not worth the extra mental gymnastics. As usual, YMMV.
Thank you Jurgen, fordfrog, buddy for your replies, they are brilliant.
I realize now that I wasted this forum's time on a data corruption issue due to multithreading; I hope these answers will be useful to others nevertheless. This FX depth script is only for research purposes: the idea is just to collect a few days' worth of data and analyze it to see what alpha can be found, if any. So for now I used a bool flag that turns off writing for 0.1 seconds every hour during file saving; that's all, and it works nicely for this humble purpose of sample data collection. If/once I establish that this is something worth trading in production, I will definitely take a more robust approach to data collection and apply this feedback. I have a larger app already in place for live trading which does have some of the suggested points, but it is incorrect on many of the levels raised here. The below was especially useful for understanding the right design approach, so thanks for that.

"One way to eliminate the need for synchronization is to eliminate the global series object entirely:"
I would not characterize your post as wasting the forum's time, John. I think it produced a nice collection of valuable information that has already helped you and will likely help others when they interface TWS API with a multi-threaded application or attempt to implement completely asynchronous and event-driven applications (as they should). This is not that hard, but also not intuitively obvious. And as @buddy pointed out and @fordfrog reiterated, it is mostly about style and structure (architecture, design) and disciplined coding.

@fordfrog, your description does not sound any less feature-rich than what I am using. Pushing multi-threading to a lower level with topic queues is the right way to go and one of my roadmap items. And I fully agree with the use of provider, consumer, and connector interface abstractions to hide the TWS API details, for improved overall modularity, and to manage information flow even in applications that are not (yet?) multi-threaded.

Jürgen

On Tue, Oct 31, 2023 at 02:10 PM, John wrote:

Thank you Jurgen, fordfrog, buddy for your replies, they are brilliant.
I don't know enough about Python writing speed, but I am not sure you need multi-threading, because IB does buffer answers for you (to a limited extent, but it should be enough even with L2). So you might make your life simpler by doing your experiment with the save (append) in the same thread, as IB hides its own receiving thread from you. Also, you ought to save 'operation' in your CSV; it is the only safe way to handle cases 1 and 2 at a later stage.
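A small sketch of why keeping 'operation' matters: replaying recorded ticks to rebuild the book needs to distinguish insert (0) from update (1), which price and size alone cannot tell apart. The function below is illustrative, not code from the thread, and follows the operation semantics shown in the ib_insync snippet earlier.

```python
def replay(ticks):
    """Rebuild one side of the book from recorded ticks.

    ticks: iterable of (position, operation, price, size) tuples,
    with operation 0 = insert, 1 = update, 2 = delete.
    """
    book = []
    for position, operation, price, size in ticks:
        if operation == 0:                          # insert a new level
            book.insert(position, (price, size))
        elif operation == 1:                        # update a level in place
            book[position] = (price, size)
        elif operation == 2 and position < len(book):
            book.pop(position)                      # delete the level
    return book
```

Without the operation column, the third tick below (an update at position 0) would be indistinguishable from an insert, and the replayed book would be wrong.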
@fordfrog your description does not sound any less feature rich than what I an using. Pushing multi-threading to a lower level with topic queues is the right way to go and one of my roadmap items. And I fully agree with the use of provider, consumer, and connector interface abstractions to hide the TWS API details, for improved overall modularity, and to manage information flow even in applications that are not (yet ?) multi-threaded.@´³¨¹°ù²µ±ð²Ô what i had on my mind is that your level of professionalism (which manifest through features like the extensive logging, possibility of replay, data analysis, overall debugability and many more) is much higher, the level that i would like to once achieve too, you inspire me :-) |
today i was writing some tests for my own tws api client implementation and i encountered a new message that i did not notice before (i haven't worked with market depth data for quite a long time, though), and it is not mentioned anywhere in this thread. the message is (copied from my log):

    INFO: Received message: ERROR_MESSAGE[2023-11-14 12:57:13.422]: ---n4-2-1-317-Market depth data has been RESET. Please empty deep book contents before applying any new entries.--

so if i get it right from the message text, immediately after this message is received, one should empty the market depth book and start fresh with an empty book. without respecting this message and cleaning the book, the book becomes broken.
with the new message 317 (book reset), which is related to market depth, i found a "minor" flaw in my design. currently, i put all the info messages in a general queue, but that is not correct. the messages should be put into the topic queue based on the topic they are related to (like the market depth topic). it is even more important in this case, where one receives market depth items, then the reset message arrives, and then new market depth items arrive. if the book reset message is processed in a different thread, it is not guaranteed that the reset will occur in the correct order. hence, error messages should be processed in the topic thread they are related to, to preserve the order in which the messages should be processed.
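A sketch of that routing fix (hypothetical names, not fordfrog's actual code): error 317 is pushed into the same per-topic queue as the depth updates, so the worker that owns the book processes the reset in arrival order instead of out-of-band.

```python
import queue

DEPTH_RESET = 317  # "Market depth data has been RESET" error code

class DepthBookWorker:
    def __init__(self):
        self.q = queue.Queue()   # one queue per depth topic
        self.book = []

    def on_update(self, position, operation, price, size):
        self.q.put(("update", position, operation, price, size))

    def on_error(self, code, message):
        if code == DEPTH_RESET:
            # ordered with the updates, not handled in a separate thread
            self.q.put(("reset",))

    def drain(self):
        # the single owner of self.book; preserves message arrival order
        while not self.q.empty():
            msg = self.q.get()
            if msg[0] == "reset":
                self.book.clear()
            else:
                _, position, operation, price, size = msg
                if operation == 0:
                    self.book.insert(position, (price, size))
                elif operation == 1:
                    self.book[position] = (price, size)
                elif operation == 2 and position < len(self.book):
                    self.book.pop(position)
```

Because reset and updates travel through the one queue, updates received after the reset can never be applied to the stale book.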