TWS and Akka actors

Interactive Broker’s TWS, Scala and Akka

Recently, I was writing a small Portfolio Optimizer that could automatically create and issue trades through Interactive Broker’s TWS API. At first, I was simply following the guide and used the provided Wrapper to implement the callbacks for gathering any information. For example, if one wants to know the current positions in the portfolio, one has to implement the position function
public void position(String account, Contract contract, double pos, double avgCost)
and gather the information for each position separately. The stream of information is over, once the callback function positionEnd is called
public void positionEnd()

This pattern is repeated for all other concerns like trades, market data, contract details and so on. As this becomes quite cumbersome quickly, I wanted to abstract away the caching and retrieval of these kind of results.

As I was using Scala and had some prior experience with akka actors in a similar situation, I decided to use the same setup. Below I will quickly summarize the solution so anybody else who will come across the same problem has a good first starting point.

Prerequisites

I was using Scala 2.13.5, akka-actor-typed 2.6.16 and tws-api 9.73.01. I have downloaded the source for the TWS client from here. For some reason the download page says version 9.76 but in the code the version is 9.73.01. Go to the directory

twsapi_macunix/IBJts/source/JavaClient/

and adjust the version in the pom file by stripping away the “-SNAPSHOT” part. Then, build and install the client locally with maven.

Basic Idea

To use the TWS API, one usually sends a request ID, which is also sent along in the callback calls. So we want a cache that uses the request ID as the key. We also need a way to signal that streaming of information has stopped. So we use the following classes to model a linked list with the results.

sealed trait ResultList[T] object ResultList { case class Head[T]() extends ResultList[T] case class Item[T](item: T, list: ResultList[T]) extends ResultList[T] case class End[T](list: ResultList[T]) extends ResultList[T] { def toSeq: Either[String, Seq[T]] = { @tailrec def walk(resultList: ResultList[T], result: Seq[T] = List.empty): Either[String, Seq[T]] = resultList match { case Head() => Right(result) case Item(item, rest) => walk(rest, item +: result) case Error(msg) => Left(msg) case End(_) => Left(s"End can only have Item or Head but got another End") } walk(list) } } case class Error[T](msg: String) extends ResultList[T] def add[T](item: T, list: ResultList[T]): Either[String, ResultList[T]] = list match { case End(_) => Left("Can only add an item to a non-End result list") case _ => Right(Item(item, list)) } }

We have it here specific per type but we will actually store them all in the same map no matter the type (one could maybe make this smarter).

The Head is used for starting a new list, the Item contains an item and the End gives the end of the list and indicates that no more data is expected. If there is an error it can be signalled as well with Error.

If you want to obtain the results as a Seq, you check if the value for a certain key is End (if not wait). Then, calling toSeq, you walk along the list, picking up the items and putting them into a Seq until you hit Head. If there is an Error, return it.

There is also a small add function to add a new item onto a list.

Next, we define the messages api.

object api { trait Message case class RegisterResult[T](reqId: Int, result: T) extends Message { type Result = T } case class RegisterEnd(reqId: Int) extends Message case class RegisterError(reqId: Int, message: String) extends Message case class ResultRequest(reqId: Int, replyTo: ActorRef[Message], counter: Int = 0) extends Message case class ResultMessage[T](results: Either[String, Seq[T]]) extends Message case class RelayMessage[T <: Message](message: T, replyTo: ActorRef[Message]) extends Message case class Position(ticker: String, assetClass: String, price: Double, size: Double) case class PortfolioRequest(replyTo: ActorRef[Message]) extends Message case class PortfolioResponse(response: Either[Error, Seq[Position]]) extends Message // other requests/responses for trades/contracts etc. would come here }

In my code there are some more request/responses for Orders and Contracts but they are very similar.

Caching Actor

The caching actor maintains as his state the results so far as a Map with the request ID as the key and checks if all the results are present in case somebody asks for them. It uses several interaction patterns and looks something like this

object TwsCache { import api._ def apply(): Behavior[Message] = Behaviors.withTimers { timers => def state(cache: Map[Int, ResultList[_]] = Map.empty) = Behaviors.receiveMessagePartial { case reg@RegisterResult(reqId, result) => val resultList = cache .getOrElse(reqId, ResultList.Head[reg.Result]) .asInstanceOf[ResultList[reg.Result]] val newResultList = ResultList.add[reg.Result](result, resultList) match { case Right(r) => r case Left(msg) => ResultList.Error(msg) } state(cache + (reqId -> newResultList)) case RegisterEnd(reqId) => val resultList = cache.getOrElse(reqId, ResultList.head) state(cache + (reqId -> ResultList.End(resultList))) case RegisterError(reqId, message) => state(cache + (reqId -> ResultList.Error(message))) case msg@ResultRequest(reqId, replyTo, counter) => cache.get(reqId).collect { case e@ResultList.End(_) => e.toSeq case ResultList.Error(message) => Left(message) } match { case Some(result) => // send it back replyTo ! ResultMessage(result) state(cache - reqId) case None if counter >= 5 => replyTo ! ResultMessage(Left(s"Result is not coming in fast enough for $reqId. Bailing out!")) state(cache - reqId) case None => timers.startSingleTimer(msg.copy(counter = counter + 1), 1 seconds) Behaviors.same } } } }

So this actor saves incoming results in the cache. You register a new item, the end of the stream or an error. If you want to retrieve the results for a request ID, it checks if the data is there. If not, it resends the request to itself using a timer. If it is resent more than 5 times, we return an error

Putting it all together

To put everything together, we move everything into another actor. Let’s first see the code and then explain it.

object TwsTrader { val RealTrading = 7496 val PaperTrading = 7497 // The class with the call backs to be implemented class TwsWrapper(twsCache: ActorRef[Message]) extends DefaultEWrapper with LazyLogging { override def error(id: Int, errorCode: Int, errorMsg: Ticker): Unit = { if (id < 0) logger.info(s"$id - $errorCode: $errorMsg") else { logger.error(s"$id - $errorCode: $errorMsg") twsCache ! RegisterError(id, s"$errorCode: $errorMsg") } } private var is_ready: Boolean = false // TWS uses this call-back to signal that it is ready override def managedAccounts(accountsList: Ticker): Unit = { accounts = accountsList.split(",") is_ready = true } def isReady: Boolean = is_ready def positionReqId: Int = - 111 // can be fixed override def position(account: String, contract: Contract, pos: Double, avgCost: Double): Unit = { logger.info(s"Position came in: ${contract.symbol}") val assetClass = contract.secType() match { case SecType.STK => "Stock" case SecType.CASH => "Currency" case other => throw new IllegalArgumentException(s"Cannot handle contract of type $other") } val ticker = contract.symbol() val tradingPos = Position(ticker, assetClass, avgCost, pos) twsCache ! RegisterResult(positionReqId, tradingPos) } override def positionEnd(): Unit = { logger.info(s"END POSITION") twsCache ! RegisterEnd(positionReqId) } // more callbacks can be implemented here } // The actual actor holding everything together def apply(clientId: Int = 0, port: Int = PaperTrading): Behavior[Message] = Behaviors.setup { ctx => val cache = ctx.spawn(TwsCache(), "tws-cache") val wrapper = new TwsWrapper(cache) // some setting up of TWS API instances val signal = new EJavaSignal val socket = { val s = new EClientSocket(wrapper, signal) s.eConnect("localhost", port, clientId) s } val reader = new EReader(socket, signal) reader.start() val readerThread = new Thread(() => { while (socket.isConnected) { signal.waitForSignal() try { reader.processMsgs() } catch { case e: Exception => ctx.log.warn(e.getMessage) ctx.log.warn(e.getStackTrace.mkString("\n ")) } } }) readerThread.start() // we keep track of the current request id; this actually only needed for // all requests *except* positions requests def state(requestId: Int): Behavior[Message] = Behaviors .receiveMessagePartial[Message] { case RelayMessage(message, replyTo) => replyTo ! message Behaviors.same case PortfolioRequest(replyTo) => socket.reqPositions() ctx.ask(cache, (ref: ActorRef[Message]) => ResultRequest(wrapper.positionReqId, ref)) { case Success(res: ResultMessage[Seq[Position]]) => RelayMessage(PortfolioResponse(res.results), replyTo) case Failure(exception) => RelayMessage(PortfolioResponse(Left(exception.getMessage)), replyTo) case other => RelayMessage(PortfolioResponse(Left(s"Unexpected result: $other")), replyTo) } state(requestId + 1) } .receiveSignal { case (ctx, PostStop) => ctx.log.warn("Shutting down Tws Trader Actor...") socket.eDisconnect() readerThread.interrupt() ctx.log.warn("...Tws Trader Actor shutdown") Behaviors.same } ctx.log.info("Waiting for connection to be ready...") while(!wrapper.isReady) Thread.sleep(200) ctx.log.info("... connection is ready") state(0) } }

First, there is the wrapper class that implements all the callbacks. It simply gets the results and sends them further to the TwsCache actor.

Next, comes the actual actor holding everything together. We spawn the cache actor and create a wrapper instance. Then, we set up some needed classes like the socket/client and the reader.

The behavior of the actor is simple. For each specific request, we send off the request through the socket. Then, we ask the cache for the result. We use the looping feature of the cache to keep asking for the results until the End is registered. Then, it is send back (via the RelayMessage) to whoever was requesting the information.

And that is all you need to know! Let me know if you have any more questions or comments.

Previous
Previous

How digitalisation impacts the bottom line