Hi,
this is a data ingestion problem. You need to create your own custom bundle and, if you are interested, extend the basic OHLC model with bid/ask data.
Basically, in zipline you can define the data you want to ingest by tweaking the internals quite a bit. Whether it is OHLC, fundamentals or anything else, you can create a custom data structure which you can later query via pipeline and apply any factors and calculations you wish.
First you need your data in the form of CSV files, which you may store at any location you like. Just be sure you pass that path to the csvdir parameter of the 'start_bundle_with_data' function below.
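For reference, in the zipline versions I have used, the stock csvdir loader expects one subfolder per timeframe and one CSV file per symbol, with at least date/OHLCV columns (plus dividend/split for adjustments); bid/ask would be the extra columns you add for this use case. A sketch with hypothetical Spanish tickers:

C:/csvdir/
    daily/
        SAN.csv
        BBVA.csv

# SAN.csv (header plus one example row; bid/ask are the custom additions)
date,open,high,low,close,volume,dividend,split,bid,ask
2019-02-25,4.12,4.20,4.08,4.15,1000000,0.0,1.0,4.14,4.16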
I will try to summarize the steps below. If you have any doubts you can contact me at carloscz25@gmail.com and I will try to assist further.
Steps 1 and 2 are only needed if you alter the set of columns your bundle will contain. If you are only ingesting OHLC data, you don't need to do anything there and can skip to step 3. If you are adding or removing columns, you will need to tell zipline which columns and datatypes you are using, and implement the writer and reader classes.
Step 1: Create a Dataset class. You basically create a class that extends zipline.pipeline.data.dataset.DataSet. In it you define the columns of your data, so instead of open, high, low, close, … you can define whatever you like, in this form (this example comes from a fundamental-data ingestion I did in the past):
CustomDataset.py

# float64_dtype lives in zipline.utils.numpy_utils in the zipline versions I have used.
from zipline.pipeline.data import Column, DataSet
from zipline.utils.numpy_utils import float64_dtype


class CustomDataset(DataSet):
    """
    Define the columns you wish to ingest.
    """
    current = Column(float64_dtype, currency_aware=True)
    ebitda = Column(float64_dtype, currency_aware=True)
    # (....) further columns as needed
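Once such a dataset has a loader wired to it in the pipeline engine (see the reading section at the end), its columns are queried like any built-in dataset. A minimal sketch, using the hypothetical columns from above:

from zipline.pipeline import Pipeline

# 'ebitda' is one of the custom columns defined in CustomDataset above
pipe = Pipeline(columns={'ebitda': CustomDataset.ebitda.latest})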
REMINDER: if you stick to the OHLC model you don't need this step. It is only needed if you alter the data being stored.
Step 2: Define BcolzReader and BcolzWriter classes for your data. These are the classes the zipline library uses in the background to read/write data in the bcolz database (the storage framework used for asset data). Later I show how to link those classes to the bundle so zipline uses them when accessing it. The implementations are quite long, but you only need to copy them over to your desired location. The default implementations you can base your classes on are in 'zipline/data/bcolz_daily_bars.py': select the full implementation of BcolzDailyBarReader/Writer, copy it into a new file, and then adjust those classes to the set of columns you want in your bundle. The classes are long compared with the adjustments you need to make; the adjustments are best worked out while debugging and depend heavily on the type of data you want to store.
IMPORTANT: again, if you stick to OHLC data you don't need to implement anything; just use the default implementations.
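To give an idea of what the step 2 adjustment looks like: in the copies of bcolz_daily_bars.py I have worked with, the stored columns are driven by a module-level tuple (US_EQUITY_PRICING_BCOLZ_COLUMNS). In your copied file you would extend it with your extra fields and then follow that constant through the writer/reader methods while debugging. A sketch, assuming bid/ask as the extra columns:

# In your copy of bcolz_daily_bars.py (names and layout may vary by zipline version)
CUSTOM_PRICING_BCOLZ_COLUMNS = (
    "open", "high", "low", "close", "volume",
    "bid", "ask",   # the extra columns for this example
    "day", "id",    # zipline bookkeeping columns, keep these
)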
Step 3: Define an ingest function. The default implementation is in 'zipline/data/bundles/csvdir.py', in the function 'csvdir_bundle'. You basically need to adapt it to your needs, so paste its content into a file at your desired location and modify it. When doing this step there is an internal function called '_pricing_iter' that you will most likely need to adapt as well, especially if your calendar requires filling data into any existing date gaps. It is pretty self-explanatory while debugging.
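To make step 4 easier to follow, here is a skeleton of what that adapted ingest function looks like from the outside. The signature mirrors the stock csvdir_bundle plus the two extra parameters (exchange_name, country_code) used in the call in step 4; the body is only an outline of the responsibilities, not a working implementation:

def csvdir_bundle_register(environ, asset_db_writer, minute_bar_writer,
                           daily_bar_writer, adjustment_writer, calendar,
                           start_session, end_session, cache, show_progress,
                           output_dir, tframes, csvdir,
                           exchange_name, country_code):
    # 1. Walk <csvdir>/<tframe>/*.csv and build the equities metadata
    #    (symbol, start_date, end_date, exchange) for asset_db_writer.write().
    # 2. Feed daily_bar_writer.write() / minute_bar_writer.write() with an
    #    iterator of (sid, dataframe) pairs, normally produced by _pricing_iter,
    #    which also reindexes each dataframe onto the calendar sessions so
    #    date gaps get filled.
    # 3. Write splits/dividends (or empty frames) via adjustment_writer.write().
    ...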
Step 4: Put it all together. This is a custom implementation for an OHLC model; take it as a guide for your own. If you skipped steps 1 and 2, you only need to paste this function and the one from step 3 into a file and call 'start_bundle_with_data'. The good thing is that you can run this purely from your IDE.
# Imports below assume the zipline-reloaded package layout; older zipline
# versions may expose some of these from slightly different modules.
import os
import shutil
from contextlib import ExitStack

import pandas as pd

import zipline.utils.paths as pth
from zipline.assets import AssetDBWriter
from zipline.assets.asset_db_migrations import downgrade
from zipline.data.adjustments import SQLiteAdjustmentWriter
from zipline.data.bcolz_daily_bars import BcolzDailyBarReader, BcolzDailyBarWriter
from zipline.data.minute_bars import BcolzMinuteBarWriter
from zipline.data.bundles import UnknownBundle, bundles, register
from zipline.data.bundles.core import (
    adjustment_db_relative,
    asset_db_relative,
    cache_path,
    daily_equity_relative,
    minute_equity_relative,
    to_bundle_ingest_dirname,
)
from zipline.utils.cache import dataframe_cache, working_dir, working_file
from zipline.utils.calendar_utils import get_calendar


def start_bundle_with_data(
        yyyy_mm_dd_from,
        yyyy_mm_dd_to,
        bundle_name,
        calendar_name='XMAD',
        bundle_timeframe_name='daily',
        csvdir='C:/csvdir',
        exchange_name='CSVDIR',
        country_code='ES'):
    start_session = pd.Timestamp(yyyy_mm_dd_from, tz='utc')
    end_session = pd.Timestamp(yyyy_mm_dd_to, tz='utc')

    # Register the bundle without an ingest function; the writers are created
    # manually below, which is what lets you run the whole thing from the IDE.
    register(
        bundle_name,
        None,
        calendar_name=calendar_name,
        start_session=start_session,
        end_session=end_session,
        create_writers=False,
    )

    environ = os.environ
    # timestr is the name of the ingestion directory under .zipline/data/<bundle_name>/.
    # By default it is a timestamp; you can hardcode it (e.g. timestr = "bundle")
    # so the reading code further below can find it at a fixed path.
    # timestr = "bundle"
    timestamp = pd.Timestamp.utcnow()
    timestamp = timestamp.tz_convert("utc").tz_localize(None)
    timestr = to_bundle_ingest_dirname(timestamp)
    cachepath = cache_path(bundle_name, environ=environ)
    try:
        bundle = bundles[bundle_name]
    except KeyError:
        raise UnknownBundle(bundle_name)
    calendar = get_calendar(bundle.calendar_name)

    with dataframe_cache(
        cachepath, clean_on_failure=False
    ) as cache, ExitStack() as stack:
        wd = stack.enter_context(working_dir(pth.data_path([], environ=environ)))

        # Create the writers (daily/minute bars, assets db, adjustments db).
        daily_bars_path = wd.ensure_dir(*daily_equity_relative(bundle_name, timestr))
        daily_bar_writer = BcolzDailyBarWriter(
            daily_bars_path, calendar, start_session, end_session,
        )
        # Empty write so the daily bcolz ctables exist before the adjustment
        # writer's reader below opens them.
        daily_bar_writer.write(())
        minute_bars_path = wd.ensure_dir(*minute_equity_relative(bundle_name, timestr))
        minute_bar_writer = BcolzMinuteBarWriter(
            minute_bars_path,
            calendar,
            start_session,
            end_session,
            minutes_per_day=bundle.minutes_per_day,
        )
        assets_db_path = wd.getpath(*asset_db_relative(bundle_name, timestr))
        asset_db_writer = AssetDBWriter(assets_db_path)
        adjustment_db_writer = stack.enter_context(
            SQLiteAdjustmentWriter(
                wd.getpath(*adjustment_db_relative(bundle_name, timestr)),
                BcolzDailyBarReader(daily_bars_path),
                overwrite=True,
            )
        )

        # Run the ingest function from step 3 against those writers.
        o = pth.data_path([bundle_name, timestr], environ=environ)
        csvdir_bundle_register(
            environ, asset_db_writer, minute_bar_writer, daily_bar_writer,
            adjustment_db_writer, calendar,
            start_session, end_session, cache, True, o,
            [bundle_timeframe_name],
            csvdir,
            exchange_name, country_code,
        )

        # Write the versioned copies of the assets db that zipline expects.
        assets_versions = [7]
        for version in sorted(set(assets_versions), reverse=True):
            version_path = wd.getpath(
                *asset_db_relative(
                    bundle_name,
                    timestr,
                    db_version=version,
                )
            )
            with working_file(version_path) as wf:
                shutil.copy2(assets_db_path, wf.path)
                downgrade(wf.path, version)
The ingest function in this example is the one you adapted in step 3 (called 'csvdir_bundle_register' in the listing above). 'start_bundle_with_data' is the function that runs the whole thing. If you skip steps 1 and 2, you only need to implement that ingest function; if not, you also need to define your dataset and the writer/reader classes.
This is only a guide, and since in this step we are storing data (not reading it), you will only see references to the writer classes.
Again, a nice point of this approach is that you can run it directly from your IDE, with no need to run it from the command line (which is how many tutorials out there explain this process), so it makes debugging much easier!
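For completeness, a call to kick off the whole thing could look like this (the dates, bundle name and CSV directory are placeholders to replace with your own):

start_bundle_with_data(
    '2019-01-01',            # yyyy_mm_dd_from
    '2019-12-31',            # yyyy_mm_dd_to
    'my_es_bundle',          # hypothetical bundle name
    calendar_name='XMAD',
    csvdir='C:/csvdir',      # folder holding the <timeframe>/<symbol>.csv files
)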
Once the data is stored in the .zipline directory (by default), reading it back is also a bit tricky, since you basically need to set things up manually. In this example, reading the data was doable with the following code:
# Imports below assume the zipline-reloaded layout; adjust for older versions.
import os

import pandas as pd

import zipline.utils.paths as pth
from zipline.assets import AssetFinder
from zipline.data import bundles
from zipline.data.adjustments import SQLiteAdjustmentReader
from zipline.data.bcolz_daily_bars import BcolzDailyBarReader
from zipline.data.bundles.core import BundleData
from zipline.data.data_portal import DataPortal
from zipline.data.minute_bars import BcolzMinuteBarReader
from zipline.pipeline import Pipeline
from zipline.pipeline.domain import ES_EQUITIES
from zipline.pipeline.engine import SimplePipelineEngine
from zipline.pipeline.loaders import EquityPricingLoader
from zipline.utils.calendar_utils import get_calendar

# Registering the ingest_func (None here) and the bundle name the engine will read from
environ = os.environ
bundles.register('your_bundle_name', None)
bundle_name = "your_bundle_name"

# Loading and setting the calendar
trading_calendar = get_calendar('XMAD')

# "bundle" in the paths below is the ingestion directory name under
# .zipline/data/<bundle_name>/ (a timestamp by default; see timestr in step 4).
bundle_data = BundleData(
    asset_finder=AssetFinder(
        pth.data_path([]) + "\\" + bundle_name + "\\bundle\\assets-7.sqlite",
    ),
    equity_minute_bar_reader=BcolzMinuteBarReader(
        pth.data_path([]) + "\\" + bundle_name + "\\bundle\\minute_equities.bcolz",
    ),
    equity_daily_bar_reader=BcolzDailyBarReader(
        pth.data_path([]) + "\\" + bundle_name + "\\bundle\\daily_equities.bcolz",
    ),
    adjustment_reader=SQLiteAdjustmentReader(
        pth.data_path([]) + "\\" + bundle_name + "\\bundle\\adjustments.sqlite",
    ),
)

loader = EquityPricingLoader(
    bundle_data.equity_daily_bar_reader, bundle_data.adjustment_reader, None,
)
engine = SimplePipelineEngine(
    get_loader=lambda column: loader,
    asset_finder=bundle_data.asset_finder,
)

universe_start_date = pd.Timestamp('2019-02-25', tz='UTC')
universe_end_date = pd.Timestamp('2019-06-28', tz='UTC')

# Run an empty pipeline just to get the list of assets in the bundle
universe_tickers = engine.run_pipeline(
    Pipeline(domain=ES_EQUITIES),
    universe_end_date,
    universe_end_date,
).index.get_level_values(1).values.tolist()

data_portal = DataPortal(
    bundle_data.asset_finder,
    trading_calendar=trading_calendar,
    first_trading_day=bundle_data.equity_daily_bar_reader.first_trading_day,
    last_available_session=universe_end_date,
    equity_minute_reader=None,
    equity_daily_reader=bundle_data.equity_daily_bar_reader,
    adjustment_reader=bundle_data.adjustment_reader,
)

start_date = universe_start_date
end_date = universe_end_date
end_loc = trading_calendar.closes.index.get_loc(end_date)
start_loc = trading_calendar.closes.index.get_loc(start_date)

data = data_portal.get_history_window(
    assets=universe_tickers,
    end_dt=end_date,
    bar_count=end_loc - start_loc,
    frequency='1d',
    field='close',
    data_frequency='daily',
)
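If everything is wired correctly, 'data' comes back as a pandas DataFrame indexed by session, with one column per asset, holding the close prices, so a quick sanity check could be:

print(data.tail())   # last few sessions, one column per asset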
Note that if you defined your own writer/reader classes, you need to point to them (rather than the default bcolz readers) when setting up the BundleData object.
And that's it. Bear in mind that a full ingest/read cycle takes a bit of time, and since you need to understand what happens in the background, I strongly suggest you start by ingesting plain OHLC data from CSV files and debugging that. It is the fastest way to understand what is going on and how to adapt it to your needs.
After that you may use the function defined in step 4.
Good luck.
Regards,
Carlos.