diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..45e0b8f --- /dev/null +++ b/.flake8 @@ -0,0 +1,2 @@ +[flake8] +ignore = E501 W504 \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..68ec8d9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,11 @@ +__pycache__ +.envrc +.zipline +.pyversion +.vscode +.coverage +.python-version +.pytest_cache +algo-state.pkl +/data +console.log diff --git a/Pipfile b/Pipfile new file mode 100644 index 0000000..2dc8575 --- /dev/null +++ b/Pipfile @@ -0,0 +1,15 @@ +[[source]] +url = "https://pypi.org/simple" +verify_ssl = true +name = "pypi" + +[packages] +alpaca-trade-api = ">=0.39" +websockets = "*" + +[dev-packages] +pytest = "*" +pytest-cov = "*" + +[requires] +python_version = "3.6" diff --git a/Pipfile.lock b/Pipfile.lock new file mode 100644 index 0000000..921d6ee --- /dev/null +++ b/Pipfile.lock @@ -0,0 +1,288 @@ +{ + "_meta": { + "hash": { + "sha256": "518072c72e75613128f752d2f49bab346ab47f3a88ea4c9ce488b6022b0c7460" + }, + "pipfile-spec": 6, + "requires": { + "python_version": "3.6" + }, + "sources": [ + { + "name": "pypi", + "url": "https://pypi.org/simple", + "verify_ssl": true + } + ] + }, + "default": { + "alpaca-trade-api": { + "hashes": [ + "sha256:78ad750210f8fd8872a2e387e02bb82261152df0cf27156e65c8bb7be7421048", + "sha256:901f90bb90fceec1e8027a804dfe76d9080dfdcc97ac1891aa6eb6d47b727754" + ], + "index": "pypi", + "version": "==0.39" + }, + "asyncio-nats-client": { + "hashes": [ + "sha256:c36e464a33e2d1bb59437b68ad74051f9f3113969108e4f8008b1e3fb5a2969f" + ], + "version": "==0.9.2" + }, + "certifi": { + "hashes": [ + "sha256:e4f3620cfea4f83eedc95b24abd9cd56f3c4b146dd0177e83a21b4eb49e21e50", + "sha256:fd7c7c74727ddcf00e9acd26bba8da604ffec95bf1c2144e67aff7a8b50e6cef" + ], + "version": "==2019.9.11" + }, + "chardet": { + "hashes": [ + "sha256:84ab92ed1c4d4f16916e05906b6b75a6c0fb5db821cc65e70cbd64a3e2a5eaae", + "sha256:fc323ffcaeaed0e0a02bf4d117757b98aed530d9ed4531e3e15460124c106691" + ], + "version": "==3.0.4" + }, + "idna": { + "hashes": [ + "sha256:c357b3f628cf53ae2c4c05627ecc484553142ca23264e593d327bcde5e9c3407", + "sha256:ea8b7f6188e6fa117537c3df7da9fc686d485087abf6ac197f9c46432f7e4a3c" + ], + "version": "==2.8" + }, + "numpy": { + "hashes": [ + "sha256:05dbfe72684cc14b92568de1bc1f41e5f62b00f714afc9adee42f6311738091f", + "sha256:0d82cb7271a577529d07bbb05cb58675f2deb09772175fab96dc8de025d8ac05", + "sha256:10132aa1fef99adc85a905d82e8497a580f83739837d7cbd234649f2e9b9dc58", + "sha256:12322df2e21f033a60c80319c25011194cd2a21294cc66fee0908aeae2c27832", + "sha256:16f19b3aa775dddc9814e02a46b8e6ae6a54ed8cf143962b4e53f0471dbd7b16", + "sha256:3d0b0989dd2d066db006158de7220802899a1e5c8cf622abe2d0bd158fd01c2c", + "sha256:438a3f0e7b681642898fd7993d38e2bf140a2d1eafaf3e89bb626db7f50db355", + "sha256:5fd214f482ab53f2cea57414c5fb3e58895b17df6e6f5bca5be6a0bb6aea23bb", + "sha256:73615d3edc84dd7c4aeb212fa3748fb83217e00d201875a47327f55363cef2df", + "sha256:7bd355ad7496f4ce1d235e9814ec81ee3d28308d591c067ce92e49f745ba2c2f", + "sha256:7d077f2976b8f3de08a0dcf5d72083f4af5411e8fddacd662aae27baa2601196", + "sha256:a4092682778dc48093e8bda8d26ee8360153e2047826f95a3f5eae09f0ae3abf", + "sha256:b458de8624c9f6034af492372eb2fee41a8e605f03f4732f43fc099e227858b2", + "sha256:e70fc8ff03a961f13363c2c95ef8285e0cf6a720f8271836f852cc0fa64e97c8", + "sha256:ee8e9d7cad5fe6dde50ede0d2e978d81eafeaa6233fb0b8719f60214cf226578", + "sha256:f4a4f6aba148858a5a5d546a99280f71f5ee6ec8182a7d195af1a914195b21a2" + ], + "version": "==1.17.2" + }, + "pandas": { + "hashes": [ + "sha256:18d91a9199d1dfaa01ad645f7540370ba630bdcef09daaf9edf45b4b1bca0232", + "sha256:3f26e5da310a0c0b83ea50da1fd397de2640b02b424aa69be7e0784228f656c9", + "sha256:4182e32f4456d2c64619e97c58571fa5ca0993d1e8c2d9ca44916185e1726e15", + "sha256:426e590e2eb0e60f765271d668a30cf38b582eaae5ec9b31229c8c3c10c5bc21", + "sha256:5eb934a8f0dc358f0e0cdf314072286bbac74e4c124b64371395e94644d5d919", + "sha256:717928808043d3ea55b9bcde636d4a52d2236c246f6df464163a66ff59980ad8", + "sha256:8145f97c5ed71827a6ec98ceaef35afed1377e2d19c4078f324d209ff253ecb5", + "sha256:8744c84c914dcc59cbbb2943b32b7664df1039d99e834e1034a3372acb89ea4d", + "sha256:c1ac1d9590d0c9314ebf01591bd40d4c03d710bfc84a3889e5263c97d7891dee", + "sha256:cb2e197b7b0687becb026b84d3c242482f20cbb29a9981e43604eb67576da9f6", + "sha256:d4001b71ad2c9b84ff18b182cea22b7b6cbf624216da3ea06fb7af28d1f93165", + "sha256:d8930772adccb2882989ab1493fa74bd87d47c8ac7417f5dd3dd834ba8c24dc9", + "sha256:dfbb0173ee2399bc4ed3caf2d236e5c0092f948aafd0a15fbe4a0e77ee61a958", + "sha256:eebfbba048f4fa8ac711b22c78516e16ff8117d05a580e7eeef6b0c2be554c18", + "sha256:f1b21bc5cf3dbea53d33615d1ead892dfdae9d7052fa8898083bec88be20dcd2" + ], + "version": "==0.25.1" + }, + "python-dateutil": { + "hashes": [ + "sha256:7e6584c74aeed623791615e26efd690f29817a27c73085b78e4bad02493df2fb", + "sha256:c89805f6f4d64db21ed966fda138f8a5ed7a4fdbc1a8ee329ce1b74e3c74da9e" + ], + "version": "==2.8.0" + }, + "pytz": { + "hashes": [ + "sha256:26c0b32e437e54a18161324a2fca3c4b9846b74a8dccddd843113109e1116b32", + "sha256:c894d57500a4cd2d5c71114aaab77dbab5eabd9022308ce5ac9bb93a60a6f0c7" + ], + "version": "==2019.2" + }, + "requests": { + "hashes": [ + "sha256:11e007a8a2aa0323f5a921e9e6a2d7e4e67d9877e85773fba9ba6419025cbeb4", + "sha256:9cf5292fcd0f598c671cfc1e0d7d1a7f13bb8085e9a590f48c010551dc6c4b31" + ], + "version": "==2.22.0" + }, + "six": { + "hashes": [ + "sha256:3350809f0555b11f552448330d0b52d5f24c91a322ea4a15ef22629740f3761c", + "sha256:d16a0141ec1a18405cd4ce8b4613101da75da0e9a7aec5bdd4fa804d0e0eba73" + ], + "version": "==1.12.0" + }, + "urllib3": { + "hashes": [ + "sha256:2393a695cd12afedd0dcb26fe5d50d0cf248e5a66f75dbd89a3d4eb333a61af4", + "sha256:a637e5fae88995b256e3409dc4d52c2e2e0ba32c42a6365fee8bbd2238de3cfb" + ], + "version": "==1.24.3" + }, + "websocket-client": { + "hashes": [ + "sha256:1151d5fb3a62dc129164292e1227655e4bbc5dd5340a5165dfae61128ec50aa9", + "sha256:1fd5520878b68b84b5748bb30e592b10d0a91529d5383f74f4964e72b297fd3a" + ], + "version": "==0.56.0" + }, + "websockets": { + "hashes": [ + "sha256:049e694abe33f8a1d99969fee7bfc0ae6761f7fd5f297c58ea933b27dd6805f2", + "sha256:73ce69217e4655783ec72ce11c151053fcbd5b837cc39de7999e19605182e28a", + "sha256:83e63aa73331b9ca21af61df8f115fb5fbcba3f281bee650a4ad16a40cd1ef15", + "sha256:882a7266fa867a2ebb2c0baaa0f9159cabf131cf18c1b4270d79ad42f9208dc5", + "sha256:8c77f7d182a6ea2a9d09c2612059f3ad859a90243e899617137ee3f6b7f2b584", + "sha256:8d7a20a2f97f1e98c765651d9fb9437201a9ccc2c70e94b0270f1c5ef29667a3", + "sha256:a7affaeffbc5d55681934c16bb6b8fc82bb75b175e7fd4dcca798c938bde8dda", + "sha256:c82e286555f839846ef4f0fdd6910769a577952e1e26aa8ee7a6f45f040e3c2b", + "sha256:e906128532a14b9d264a43eb48f9b3080d53a9bda819ab45bf56b8039dc606ac", + "sha256:e9102043a81cdc8b7c8032ff4bce39f6229e4ac39cb2010946c912eeb84e2cb6", + "sha256:f5cb2683367e32da6a256b60929a3af9c29c212b5091cf5bace9358d03011bf5" + ], + "index": "pypi", + "version": "==8.0.2" + } + }, + "develop": { + "atomicwrites": { + "hashes": [ + "sha256:03472c30eb2c5d1ba9227e4c2ca66ab8287fbfbbda3888aa93dc2e28fc6811b4", + "sha256:75a9445bac02d8d058d5e1fe689654ba5a6556a1dfd8ce6ec55a0ed79866cfa6" + ], + "version": "==1.3.0" + }, + "attrs": { + "hashes": [ + "sha256:69c0dbf2ed392de1cb5ec704444b08a5ef81680a61cb899dc08127123af36a79", + "sha256:f0b870f674851ecbfbbbd364d6b5cbdff9dcedbc7f3f5e18a6891057f21fe399" + ], + "version": "==19.1.0" + }, + "coverage": { + "hashes": [ + "sha256:08907593569fe59baca0bf152c43f3863201efb6113ecb38ce7e97ce339805a6", + "sha256:0be0f1ed45fc0c185cfd4ecc19a1d6532d72f86a2bac9de7e24541febad72650", + "sha256:141f08ed3c4b1847015e2cd62ec06d35e67a3ac185c26f7635f4406b90afa9c5", + "sha256:19e4df788a0581238e9390c85a7a09af39c7b539b29f25c89209e6c3e371270d", + "sha256:23cc09ed395b03424d1ae30dcc292615c1372bfba7141eb85e11e50efaa6b351", + "sha256:245388cda02af78276b479f299bbf3783ef0a6a6273037d7c60dc73b8d8d7755", + "sha256:331cb5115673a20fb131dadd22f5bcaf7677ef758741312bee4937d71a14b2ef", + "sha256:386e2e4090f0bc5df274e720105c342263423e77ee8826002dcffe0c9533dbca", + "sha256:3a794ce50daee01c74a494919d5ebdc23d58873747fa0e288318728533a3e1ca", + "sha256:60851187677b24c6085248f0a0b9b98d49cba7ecc7ec60ba6b9d2e5574ac1ee9", + "sha256:63a9a5fc43b58735f65ed63d2cf43508f462dc49857da70b8980ad78d41d52fc", + "sha256:6b62544bb68106e3f00b21c8930e83e584fdca005d4fffd29bb39fb3ffa03cb5", + "sha256:6ba744056423ef8d450cf627289166da65903885272055fb4b5e113137cfa14f", + "sha256:7494b0b0274c5072bddbfd5b4a6c6f18fbbe1ab1d22a41e99cd2d00c8f96ecfe", + "sha256:826f32b9547c8091679ff292a82aca9c7b9650f9fda3e2ca6bf2ac905b7ce888", + "sha256:93715dffbcd0678057f947f496484e906bf9509f5c1c38fc9ba3922893cda5f5", + "sha256:9a334d6c83dfeadae576b4d633a71620d40d1c379129d587faa42ee3e2a85cce", + "sha256:af7ed8a8aa6957aac47b4268631fa1df984643f07ef00acd374e456364b373f5", + "sha256:bf0a7aed7f5521c7ca67febd57db473af4762b9622254291fbcbb8cd0ba5e33e", + "sha256:bf1ef9eb901113a9805287e090452c05547578eaab1b62e4ad456fcc049a9b7e", + "sha256:c0afd27bc0e307a1ffc04ca5ec010a290e49e3afbe841c5cafc5c5a80ecd81c9", + "sha256:dd579709a87092c6dbee09d1b7cfa81831040705ffa12a1b248935274aee0437", + "sha256:df6712284b2e44a065097846488f66840445eb987eb81b3cc6e4149e7b6982e1", + "sha256:e07d9f1a23e9e93ab5c62902833bf3e4b1f65502927379148b6622686223125c", + "sha256:e2ede7c1d45e65e209d6093b762e98e8318ddeff95317d07a27a2140b80cfd24", + "sha256:e4ef9c164eb55123c62411f5936b5c2e521b12356037b6e1c2617cef45523d47", + "sha256:eca2b7343524e7ba246cab8ff00cab47a2d6d54ada3b02772e908a45675722e2", + "sha256:eee64c616adeff7db37cc37da4180a3a5b6177f5c46b187894e633f088fb5b28", + "sha256:ef824cad1f980d27f26166f86856efe11eff9912c4fed97d3804820d43fa550c", + "sha256:efc89291bd5a08855829a3c522df16d856455297cf35ae827a37edac45f466a7", + "sha256:fa964bae817babece5aa2e8c1af841bebb6d0b9add8e637548809d040443fee0", + "sha256:ff37757e068ae606659c28c3bd0d923f9d29a85de79bf25b2b34b148473b5025" + ], + "version": "==4.5.4" + }, + "importlib-metadata": { + "hashes": [ + "sha256:aa18d7378b00b40847790e7c27e11673d7fed219354109d0e7b9e5b25dc3ad26", + "sha256:d5f18a79777f3aa179c145737780282e27b508fc8fd688cb17c7a813e8bd39af" + ], + "markers": "python_version < '3.8'", + "version": "==0.23" + }, + "more-itertools": { + "hashes": [ + "sha256:409cd48d4db7052af495b09dec721011634af3753ae1ef92d2b32f73a745f832", + "sha256:92b8c4b06dac4f0611c0729b2f2ede52b2e1bac1ab48f089c7ddc12e26bb60c4" + ], + "version": "==7.2.0" + }, + "packaging": { + "hashes": [ + "sha256:28b924174df7a2fa32c1953825ff29c61e2f5e082343165438812f00d3a7fc47", + "sha256:d9551545c6d761f3def1677baf08ab2a3ca17c56879e70fecba2fc4dde4ed108" + ], + "version": "==19.2" + }, + "pluggy": { + "hashes": [ + "sha256:0db4b7601aae1d35b4a033282da476845aa19185c1e6964b25cf324b5e4ec3e6", + "sha256:fa5fa1622fa6dd5c030e9cad086fa19ef6a0cf6d7a2d12318e10cb49d6d68f34" + ], + "version": "==0.13.0" + }, + "py": { + "hashes": [ + "sha256:64f65755aee5b381cea27766a3a147c3f15b9b6b9ac88676de66ba2ae36793fa", + "sha256:dc639b046a6e2cff5bbe40194ad65936d6ba360b52b3c3fe1d08a82dd50b5e53" + ], + "version": "==1.8.0" + }, + "pyparsing": { + "hashes": [ + "sha256:6f98a7b9397e206d78cc01df10131398f1c8b8510a2f4d97d9abd82e1aacdd80", + "sha256:d9338df12903bbf5d65a0e4e87c2161968b10d2e489652bb47001d82a9b028b4" + ], + "version": "==2.4.2" + }, + "pytest": { + "hashes": [ + "sha256:13c1c9b22127a77fc684eee24791efafcef343335d855e3573791c68588fe1a5", + "sha256:d8ba7be9466f55ef96ba203fc0f90d0cf212f2f927e69186e1353e30bc7f62e5" + ], + "index": "pypi", + "version": "==5.2.0" + }, + "pytest-cov": { + "hashes": [ + "sha256:2b097cde81a302e1047331b48cadacf23577e431b61e9c6f49a1170bbe3d3da6", + "sha256:e00ea4fdde970725482f1f35630d12f074e121a23801aabf2ae154ec6bdd343a" + ], + "index": "pypi", + "version": "==2.7.1" + }, + "six": { + "hashes": [ + "sha256:3350809f0555b11f552448330d0b52d5f24c91a322ea4a15ef22629740f3761c", + "sha256:d16a0141ec1a18405cd4ce8b4613101da75da0e9a7aec5bdd4fa804d0e0eba73" + ], + "version": "==1.12.0" + }, + "wcwidth": { + "hashes": [ + "sha256:3df37372226d6e63e1b1e1eda15c594bca98a22d33a23832a90998faa96bc65e", + "sha256:f4ebe71925af7b40a864553f761ed559b43544f8f71746c2d756c7fe788ade7c" + ], + "version": "==0.1.7" + }, + "zipp": { + "hashes": [ + "sha256:3718b1cbcd963c7d4c5511a8240812904164b7f381b647143a89d3b98f9bcd8e", + "sha256:f06903e9f1f43b12d371004b4ac7b06ab39a44adc747266928ae6debfa7b3335" + ], + "version": "==0.6.0" + } + } +} diff --git a/README.md b/README.md new file mode 100644 index 0000000..ccf0b20 --- /dev/null +++ b/README.md @@ -0,0 +1,138 @@ +# Concurrent Scalp Algo + +This python script is a working example to execute scalping trading algorithm for +[Alpaca API](https://alpaca.markets). This algorithm uses real time order updates +as well as minute level bar streaming from Polygon via Websockets (see +[document](https://docs.alpaca.markets/market-data/#consolidated-market-data) for +Polygon data access). +One of the contributions of this example is to demonstrate how to handle +multiple stocks concurrently as independent routine using Python's asyncio. + +The strategy holds positions for very short period and exits positions quickly, so +you have to have more than $25k equity in your account due to the Pattern Day Trader rule, +to run this example. For more information about PDT rule, please read the +[document](https://docs.alpaca.markets/user-protections/#the-rule). + +## Dependency +This script needs latest [Alpaca Python SDK](https://github.com/alpacahq/alpaca-trade-api-python). +Please install it using pip + +```sh +$ pip3 install alpaca-trade-api +``` + +or use pipenv using `Pipfile` in this directory. + +```sh +$ pipenv install +``` + +## Usage + +```sh +$ python main.py --lot=2000 TSLA FB AAPL +``` + +You can specify as many symbols as you want. The script is designd to kick off while market +is open. Nothing would happen until 21 minutes from the market open as it relies on the +simple moving average as the buy signal. + + +## Strategy +The algorithm idea is to buy the stock upon the buy signal (MA20/price cross over in 1-minute bar) as +much as `lot` amount of dollar, then immediately sell the position at or above the entry price. +The buy signal is expremely simple, but what this strategy achieves is the quick reaction to +exit the position as soon as the buy order fills. There are reasonable chances that you can sell +the positions at the better prices than your entry within the short period of time. We send +limit order at the last trade or entry price whichever the higher to avoid unnecessary slippage. + +The buy order is canceled after 2 minutes if it does not fill, assuming the signal is not +effective anymore. This could happen in a fast-moving market situation. Sells are left +indifinitely until it fills, but this may cause loss more than the accumulated profit depending +on the market situation. This is where you can improve the risk control beyond this example. + +The buy signal is calculated as soon as a minute bar arrives, which typically happen about 4 seconds +after the top of every minute (this is Polygon's behavior for minute bar streaming). + +This example liquidates all watching positions with market order at the end of market hours (03:55pm ET). + + +## Implementation +This example heavily relies on Python's asyncio. Although the thread is single, we handle +multiple symbols concurrently using this async loop. + +We keep track of each symbol state in a separate `ScalpAlgo` class instance. That way, +everything stays simple without complex data struture and easy to read. The `main()` +function creates the algo instance for each symbol and creates streaming object +to listen the bar events. As soon as we receive a minute bar, we invoke event handler +for each symbol. + +The main routine also starts a period check routine to do some work in background every 30 seconds. +In this background task, we check market state with the clock API and liquidate positions +before the market closes. + +### Algo Instance and State Management +Each algo instance initializes its state by fetching day's bar data so far and position/order +from Alpaca API to synchronize, in case the script restarts after some trades. There are +four internal states and transitions as events happen. + +- `TO_BUY`: no position, no order. Can transition to `BUY_SUBMITTED` +- `BUY_SUBMITTED`: buy order has been submitted. Can transition to `TO_BUY` or `TO_SELL` +- `TO_SELL`: buy is filled and holding position. Can transition to `SELL_SUBMITTED` +- `SELL_SUBMITTED`: sell order has been submitted. Can transition to `TO_SELL` or `TO_BUY` + +### Event Handlers +`on_bar()` is an event handler for the bar data. Here we calculate signal that triggers +a buy order in the `TO_BUY` state. Once order is submitted, it goes to the `BUY_SUBMITTED` +state. + +If order is filled, `on_order_update()` handler is called with `event=fill`. The state +transitions to `TO_SELL` and immediately submits a sell order, to transition to the +`SELL_SUBMITTED` state. + +Orders may be canceled or rejected (caused by this script or you manually cancel them +from the dashboard). In these cases, the state transitions to `TO_BUY` (if not holding +a position) or `TO_SELL` (if holding a position) and wait for the next events. + +`checkup()` method is the background periodic job to check several conditions, where +we cancel open orders and sends market sell order if there is an open position. + +It exits once the market closes. + +### Note +Each algo instance owns its child logger, prefixed by the symbol name. The console +log is also emitted to a file `console.log` under the same directory for your later review. + +Again, the beautify of this code is that there is no multithread code but each +algo instance can focus on the bar/order/position data only for its own. It still +handles multiple symbols concurrently plus runs background periodic job in the +same async loop. + +The trick to run additional async routine is as follows. + +```py + loop = stream.loop + loop.run_until_complete(asyncio.gather( + stream.subscribe(channels), + periodic(), + )) + loop.close() +``` + +We use `asyncio.gather()` to run all bar handler, order update handler and periodic job +in one async loop indifinitely. You can kill it by `Ctrl+C`. + +### Customization +Instead of using this buy signal of 20 minute simple moving average cross over, you can +use your own buy signal. To do so, extend the `ScalpAlgo` class and write your own +`_calc_buy_signal()` method. + +```py + class MyScalpAlgo(ScalpAlgo): + def _calculate_buy_signal(self): + '''self._bars has all minute bars in the session so far. Return True to + trigger buy order''' + pass +``` + +And use it instead of the original class. diff --git a/main.py b/main.py new file mode 100644 index 0000000..6bb3f11 --- /dev/null +++ b/main.py @@ -0,0 +1,262 @@ +import alpaca_trade_api as alpaca +import asyncio +import pandas as pd +import sys + +import logging + +logger = logging.getLogger() + + +class ScalpAlgo: + + def __init__(self, api, symbol, lot): + self._api = api + self._symbol = symbol + self._lot = lot + self._bars = [] + self._l = logger.getChild(self._symbol) + + now = pd.Timestamp.now(tz='America/New_York').floor('1min') + market_open = now.replace(hour=9, minute=30) + today = now.strftime('%Y-%m-%d') + tomorrow = (now + pd.Timedelta('1day')).strftime('%Y-%m-%d') + data = api.polygon.historic_agg_v2( + symbol, 1, 'minute', today, tomorrow, unadjusted=False).df + bars = data[market_open:] + self._bars = bars + + self._init_state() + + def _init_state(self): + symbol = self._symbol + order = [o for o in self._api.list_orders() if o.symbol == symbol] + position = [p for p in self._api.list_positions() + if p.symbol == symbol] + self._order = order[0] if len(order) > 0 else None + self._position = position[0] if len(position) > 0 else None + if self._position is not None: + if self._order is None: + self._state = 'TO_SELL' + else: + self._state = 'SELL_SUBMITTED' + if self._order.side != 'sell': + self._l.warn( + f'state {self._state} mismatch order {self._order}') + else: + if self._order is None: + self._state = 'TO_BUY' + else: + self._state = 'BUY_SUBMITTED' + if self._order.side != 'buy': + self._l.warn( + f'state {self._state} mismatch order {self._order}') + + def _now(self): + return pd.Timestamp.now(tz='America/New_York') + + def _outofmarket(self): + return self._now().time() >= pd.Timestamp('15:55').time() + + def checkup(self, position): + # self._l.info('periodic task') + + now = self._now() + order = self._order + if (order is not None and + order.side == 'buy' and now - + order.submitted_at > pd.Timedelta('2 min')): + last_price = self._api.polygon.last_trade(self._symbol).price + self._l.info( + f'canceling missed buy order {order.id} at {order.limit_price} ' + f'(current price = {last_price})') + self._cancel_order() + + if self._position is not None and self._outofmarket(): + self._submit_sell(bailout=True) + + def _cancel_order(self): + if self._order is not None: + self._api.cancel_order(self._order.id) + + def _calc_buy_signal(self): + mavg = self._bars.rolling(20).mean().close.values + closes = self._bars.close.values + if closes[-2] < mavg[-2] and closes[-1] > mavg[-1]: + self._l.info( + f'buy signal: closes[-2] {closes[-2]} < mavg[-2] {mavg[-2]} ' + f'closes[-1] {closes[-1]} > mavg[-1] {mavg[-1]}') + return True + else: + self._l.info( + f'closes[-2:] = {closes[-2:]}, mavg[-2:] = {mavg[-2:]}') + return False + + def on_bar(self, bar): + self._bars = self._bars.append(pd.DataFrame({ + 'open': bar.open, + 'high': bar.high, + 'low': bar.low, + 'close': bar.close, + 'volume': bar.volume, + }, index=[bar.start])) + + self._l.info( + f'received bar start = {bar.start}, close = {bar.close}, len(bars) = {len(self._bars)}') + if len(self._bars) < 21: + return + if self._outofmarket(): + return + if self._state == 'TO_BUY': + signal = self._calc_buy_signal() + if signal: + self._submit_buy() + + def on_order_update(self, event, order): + self._l.info(f'order update: {event} = {order}') + if event == 'fill': + self._order = None + if self._state == 'BUY_SUBMITTED': + self._position = self._api.get_position(self._symbol) + self._transition('TO_SELL') + self._submit_sell() + return + elif self._state == 'SELL_SUBMITTED': + self._position = None + self._transition('TO_BUY') + return + elif event == 'partial_fill': + self._position = self._api.get_position(self._symbol) + self._order = self._api.get_order(order['id']) + return + elif event in ('canceled', 'rejected'): + if event == 'rejected': + self._l.warn(f'order rejected: current order = {self._order}') + self._order = None + if self._state == 'BUY_SUBMITTED': + if self._position is not None: + self._transition('TO_SELL') + self._submit_sell() + else: + self._transition('TO_BUY') + elif self._state == 'SELL_SUBMITTED': + self._transition('TO_SELL') + self._submit_sell(bailout=True) + else: + self._l.warn(f'unexpected state for {event}: {self._state}') + + def _submit_buy(self): + trade = self._api.polygon.last_trade(self._symbol) + amount = int(self._lot / trade.price) + try: + order = self._api.submit_order( + symbol=self._symbol, + side='buy', + type='limit', + qty=amount, + time_in_force='day', + limit_price=trade.price, + ) + except Exception as e: + self._l.info(e) + self._transition('TO_BUY') + return + + self._order = order + self._l.info(f'submitted buy {order}') + self._transition('BUY_SUBMITTED') + + def _submit_sell(self, bailout=False): + params = dict( + symbol=self._symbol, + side='sell', + qty=self._position.qty, + time_in_force='day', + ) + if bailout: + params['type'] = 'market' + else: + current_price = float( + self._api.polygon.last_trade( + self._symbol).price) + cost_basis = float(self._position.avg_entry_price) + limit_price = max(cost_basis + 0.01, current_price) + params.update(dict( + type='limit', + limit_price=limit_price, + )) + try: + order = self._api.submit_order(**params) + except Exception as e: + self._l.error(e) + self._transition('TO_SELL') + return + + self._order = order + self._l.info(f'submitted sell {order}') + self._transition('SELL_SUBMITTED') + + def _transition(self, new_state): + self._l.info(f'transition from {self._state} to {new_state}') + self._state = new_state + + +def main(args): + api = alpaca.REST() + stream = alpaca.StreamConn() + + fleet = {} + symbols = args.symbols + for symbol in symbols: + algo = ScalpAlgo(api, symbol, lot=args.lot) + fleet[symbol] = algo + + @stream.on(r'^AM') + async def on_bars(conn, channel, data): + if data.symbol in fleet: + fleet[data.symbol].on_bar(data) + + @stream.on(r'trade_updates') + async def on_trade_updates(conn, channel, data): + logger.info(f'trade_updates {data}') + symbol = data.order['symbol'] + if symbol in fleet: + fleet[symbol].on_order_update(data.event, data.order) + + async def periodic(): + while True: + if not api.get_clock().is_open: + logger.info('exit as market is not open') + sys.exit(0) + await asyncio.sleep(30) + positions = api.list_positions() + for symbol, algo in fleet.items(): + pos = [p for p in positions if p.symbol == symbol] + algo.checkup(pos[0] if len(pos) > 0 else None) + channels = ['trade_updates'] + [ + 'AM.' + symbol for symbol in symbols + ] + + loop = stream.loop + loop.run_until_complete(asyncio.gather( + stream.subscribe(channels), + periodic(), + )) + loop.close() + + +if __name__ == '__main__': + import argparse + + fmt = '%(asctime)s:%(filename)s:%(lineno)d:%(levelname)s:%(name)s:%(message)s' + logging.basicConfig(level=logging.INFO, format=fmt) + fh = logging.FileHandler('console.log') + fh.setLevel(logging.INFO) + fh.setFormatter(logging.Formatter(fmt)) + logger.addHandler(fh) + + parser = argparse.ArgumentParser() + parser.add_argument('symbols', nargs='+') + parser.add_argument('--lot', type=float, default=2000) + + main(parser.parse_args())