
Collaborative use case

Here we will describe a collaborative use case where three employees are working together toward a common goal.

Introduction

AcmeFinTech Inc., a fictitious company, has three (also fictitious) all-star employees: Chris, Lukas, and Shawn. They want to investigate how much Twitter data can influence the predictive power of AcmeFinTech's leading deep reinforcement learning model. They use Weights & Biases to easily pass data back and forth and to work concurrently.

Objective: Multi-stock trading

Our overall objective is to earn larger returns each day by trading multiple stocks. On any given day, each of our chosen stocks contributes indicators to the portfolio such as the daily high, daily low, and number of trades that day. Combined, these indicators make up the state of the world that our model lives in.
Our model outputs actions: buying, selling, or holding any number of shares of each company's stock. The model uses a policy to produce daily actions given the daily portfolio state. We will train the model on one year of historical data, letting it update its policy after evaluating the success of its actions each day. Then, we will test the model on data from the following year.
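To make the state/action framing concrete, here is a minimal sketch; the field names and the trivial placeholder policy are illustrative, not the actual FinRL environment schema:
from dataclasses import dataclass
from enum import Enum

class Action(Enum):
    # Per-ticker actions the model can take on a given day
    BUY = 1
    HOLD = 0
    SELL = -1

@dataclass
class DailyStockState:
    # Hypothetical per-ticker indicators that make up one day's state
    high: float
    low: float
    close: float
    num_trades: int

def policy(state: dict) -> dict:
    # A policy maps the day's portfolio state (one entry per ticker)
    # to an action per ticker; this placeholder simply holds everything
    return {tic: Action.HOLD for tic in state}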

Asynchronous workflows

Chris: Data analytics

Chris is a data pro, so he takes the lead on setting up the dataset. His first step is to load the raw data and save it as a W&B Artifact, so that he can easily access it later.
# Log the original dataset as an artifact
import wandb

with wandb.init(
    entity=ENTITY_NAME,
    project=PROJECT_NAME,
    config=wandb_config,
    job_type='load_data',
    save_code=True,  # upload code to wandb
) as run:
    # YahooDownloader comes from the FinRL library
    dataset = YahooDownloader(
        start_date=config.START_DATE,
        end_date=config.END_DATE,
        ticker_list=config.DOW_30_TICKER).fetch_data()
    dataset_artifact = wandb.Artifact(
        'stock-data',
        type='dataset',
        description='Daily stock information',
    )
    dataset_artifact.add(wandb.Table(dataframe=dataset), 'data-table')
    run.log_artifact(dataset_artifact, aliases=['latest', 'raw'])
Take note of the last line: Chris included 'raw' in the list of aliases. This means that someone can request stock-data:raw at a later time to get this version.
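For example, a teammate could later fetch exactly this version by alias (a minimal sketch; ENTITY_NAME and PROJECT_NAME are the same placeholders used above):
import wandb

# Request the 'raw' version of the dataset artifact in a later run
with wandb.init(entity=ENTITY_NAME, project=PROJECT_NAME, job_type='explore-data') as run:
    raw_table = run.use_artifact('stock-data:raw').get('data-table')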
Of course, with most datasets one must do some preprocessing before applying any machine learning model. Chris can do this and log the result to the same artifact as a new, labeled version. This makes it easy for collaborators to control which dataset iteration they work with later. The versions are also deduplicated in storage, which reduces total storage costs by only saving the differences between the 'preprocessed' and 'raw' versions.
# Log the preprocessed dataset in the same artifact
import pandas as pd

with wandb.init(
    entity=ENTITY_NAME,
    project=PROJECT_NAME,
    config=wandb_config,
    job_type='preprocess-data',
    save_code=True,  # upload code to wandb
) as run:
    # Load the raw data from the existing artifact; we want to start with the raw dataset
    dataset_table = run.use_artifact('stock-data:raw').get('data-table')
    # Preprocess the data
    dataset, ticker_list = preprocess_data(
        pd.DataFrame(data=dataset_table.data, columns=dataset_table.columns))
    # Log a new version of the same artifact
    dataset_artifact = wandb.Artifact(
        'stock-data', type='dataset',
        description='Daily stock information',
    )
    dataset_artifact.add(wandb.Table(dataframe=dataset), 'data-table')
    run.log_artifact(dataset_artifact, aliases=['no-tweets'])
Finally, Chris needs to prepare a version of the dataset that includes Tweet information. To make the data easier to explore, he is going to log two artifacts: one holds just the tweet data, and the other has the tweet data merged onto the rest of the stock dataset.
with wandb.init(
    entity=ENTITY_NAME,
    project=PROJECT_NAME,
    config=wandb_config,
    job_type='append-tweets',
    save_code=True,  # upload code to wandb
) as run:
    # Run sentiment analysis over the tweets for each ticker
    df_columns = ['date', 'tic', 'tweet', 'polarity', 'subjectivity', 'sentiment', 'url', 'id']
    sentiment_df, tweet_table = tweet_sentiment_analysis(df_columns, ticker_list)
    sentiment_df = sentiment_df.fillna(0)[
        ['date', 'tic', 'id', 'polarity', 'subjectivity', 'sentiment']].sort_values(['date', 'tic'])
    # Merge the tweet sentiment onto the preprocessed stock data
    dataset_artifact = run.use_artifact('stock-data:no-tweets')
    dataset_table = dataset_artifact.get('data-table')
    dataset = pd.DataFrame(data=dataset_table.data, columns=dataset_table.columns).merge(
        sentiment_df, how='left', on=['date', 'tic']).fillna(0)
    # Log the combined dataset as a new version of 'stock-data'
    dataset_artifact = wandb.Artifact(
        'stock-data', type='dataset',
        description='Daily stock information',
    )
    dataset_artifact.add(wandb.Table(dataframe=dataset), 'data-table')
    run.log_artifact(dataset_artifact, aliases=['with-tweets'])
    # Log the tweet data as its own artifact
    tweet_artifact = wandb.Artifact(
        'tweet-data', type='dataset',
        description='Daily stock tweets',
    )
    tweet_artifact.add(tweet_table, 'data-table')
    run.log_artifact(tweet_artifact)
Now we can see the three versions in the W&B App by navigating to the project page and clicking on the Artifacts icon.
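The same versions can also be inspected programmatically with the public API (a minimal sketch, run outside of any wandb run):
import wandb

api = wandb.Api()
# Look up each aliased version of the dataset artifact
for alias in ['raw', 'no-tweets', 'with-tweets']:
    artifact = api.artifact(f'{ENTITY_NAME}/{PROJECT_NAME}/stock-data:{alias}')
    print(alias, '->', artifact.version)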

We can then click through to view the Tweet table:

[Interactive table: tweet data with columns date, tic, tweet, polarity, subjectivity, and url]
and a version of the preprocessed dataset that has tweet sentiment data:

[Interactive table: preprocessed stock dataset with columns date, tic, open, high, low, close, volume, day, macd, boll_ub, boll_lb, rsi_30, cci_30, dx_30, close_30_sma, close_60_sma, vix, turbulence, id, polarity, subjectivity, and sentiment]

Lukas: Parameter selection

Lukas has tasked himself with setting up the parameter sweep while Chris iterates on his preprocessing function. Ultimately, their goal is to figure out whether Tweet data helps with stock trend prediction, so it is important to run a full sweep of parameter values for each case to give both models a fighting chance.
Lukas is going to use W&B Sweeps to find the optimal set of hyperparameters for the models trained with and without Tweet data. To save on compute time, he will train for a limited number of time steps during the sweep. The team can run a longer training session once they have a good set of parameters for each condition. The model they are using has three parameters:
  1. lambda: This sets the trade-off for how much historical success influences updates to the current action policy. A value of 0 means the agent only uses the success of its most recent trade to update its policy; a value of 1 means it averages the success of all of its previous trades, including the most recent, to update its policy.
  2. ent_coef: This influences the "randomness" of the agent's actions on any given day.
  3. learning_rate: This influences how much any given day of trading can influence the agent's general trading policy.
Lukas is going to use the Sharpe ratio as the metric for success; a quick sketch of that computation appears below, followed by the code that runs the sweeps using a pre-defined objective function.
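As a refresher, the Sharpe ratio divides mean excess return by return volatility. A minimal sketch, assuming a zero risk-free rate and the conventional 252-trading-day annualization:
import numpy as np

def sharpe_ratio(daily_returns: np.ndarray, risk_free_rate: float = 0.0) -> float:
    # Annualized Sharpe ratio: mean excess daily return over its standard deviation
    excess = daily_returns - risk_free_rate
    return float(np.sqrt(252) * excess.mean() / excess.std())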
# Hyperparameter options
sweep_total_timesteps = 10000
sweep_config_base_name = 'finrl-sweep'
sweep_config = {
    'name': sweep_config_base_name,
    'method': 'grid',
    'metric': {
        'name': 'sharpe_ratio',
        'goal': 'maximize'
    },
    'parameters': {
        'ent_coef': {
            'distribution': 'categorical',
            'values': [0.0001, 0.001, 0.01, 0.1]
        },
        'n_steps': {
            'distribution': 'categorical',
            'values': [5, 10, 15]
        },
        'learning_rate': {
            'distribution': 'categorical',
            'values': [0.00009, 0.0001, 0.001, 0.01, 0.1]
        }
    }
}

# Run the sweep for each dataset
all_prefix = ['no_tweets', 'tweets']
sweep_ids = []
for prefix, train_trade_gyms in zip(all_prefix, all_gyms):
    # Use a different sweep name for each experimental condition
    sweep_config['name'] = f'{prefix}-{sweep_config_base_name}'
    # Initialize the sweep
    sweep_id = wandb.sweep(sweep_config, project=PROJECT_NAME, entity=ENTITY_NAME)
    sweep_ids.append(sweep_id)
    # Run it
    train_gym, trade_gym = train_trade_gyms
    env_train, _ = train_gym.get_sb_env()
    agent = wandb_DRLAgent(env=env_train)
    count = 20  # number of runs to execute
    wandb.agent(sweep_id, function=objective, count=count)
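The objective function itself lives elsewhere in the team's code. As a hypothetical skeleton of what a sweep objective can look like (evaluate_sharpe is an assumed helper; the sweep-assigned parameters arrive via run.config):
def objective():
    # Each agent invocation starts a run whose config holds the sampled parameters
    with wandb.init() as run:
        model = agent.get_model('a2c', model_kwargs=dict(run.config))
        trained = agent.train_model(
            model=model,
            tb_log_name='a2c',
            total_timesteps=sweep_total_timesteps,
        )
        # Log the sweep metric so W&B can compare runs
        run.log({'sharpe_ratio': evaluate_sharpe(trained, trade_gym)})  # hypothetical evaluator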
After the sweep finishes, Lukas can look through the results for trends in the data. Here are the parameter importance plots that are produced:

[Parameter importance panels: no_tweets-finrl-sweep (20 runs) and tweets-finrl-sweep (19 runs)]
Over time, Lukas might adjust the ranges available for parameter selection, or switch the search method from a grid search to a Bayesian search, as sketched below.
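Switching over is a small config change: the method flips to 'bayes', and fixed grids can become continuous ranges (a sketch; the learning-rate bounds are illustrative):
sweep_config['method'] = 'bayes'
# Replace the fixed grid with a log-uniform range for the learning rate
sweep_config['parameters']['learning_rate'] = {
    'distribution': 'log_uniform_values',
    'min': 1e-5,
    'max': 1e-1,
}
While Lukas iterates on the sweep, Shawn prepares the final stage of the pipeline: training and testing.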

Shawn: Training & testing the model

With preprocessed data and carefully selected hyperparameters, Shawn can begin the larger training process and generate the evaluation plots. Since he wants to get started before his two colleagues are finished, he writes code that loads the latest versions of the hyperparameter search and the preprocessing pipeline; he will not need to change this code as his colleagues iterate on their sections.
api = wandb.Api()
# Get the latest dataset versions as gyms
all_gyms = get_latest_gym_objects(all_prefix, all_indicators, all_columns)
for sweep_id, prefix, train_trade_gyms in zip(sweep_ids, all_prefix, all_gyms):
    # Get the best parameters from the earlier sweeps
    path_to_sweep = f'{ENTITY_NAME}/{PROJECT_NAME}/{sweep_id}'
    sweep = api.sweep(path_to_sweep)
    best_run = sweep.best_run()
    print(f'Identified the best sweep run: {best_run.name}\nwith parameters: {best_run.config}')
    # Update config dictionaries and files from sweep data
    update_configs(best_run.config)
    # Get separate gyms for training & trading scenarios
    train_gym, trade_gym = train_trade_gyms
    env_train, _ = train_gym.get_sb_env()
    # Set up training run
    with wandb.init(
        entity=ENTITY_NAME,
        project=PROJECT_NAME,
        config=wandb_config,
        sync_tensorboard=True,  # auto-upload sb3's tensorboard metrics
        monitor_gym=True,       # auto-upload videos of the agents
        save_code=True,         # upload code to wandb
    ) as run:
        # Get and train the model
        agent = wandb_DRLAgent(env=env_train)
        model_a2c = agent.get_model(
            wandb_config['policy_type'],
            verbose=1,
            tensorboard_log=f'runs/{run.id}',
        )
        callback_list = get_callbacks(run.id, wandb_config, env_train)
        trained_a2c = agent.train_model(
            model=model_a2c,
            tb_log_name=wandb_config['policy_type'],
            total_timesteps=wandb_config['total_timesteps'],
            callback=callback_list,
        )
        # Save the trained model to an Artifact
        trained_model_artifact = wandb.Artifact('a2c', type='model')
        trained_a2c.save(config.TRAINED_MODEL_DIR)
        trained_model_artifact.add_dir(config.TRAINED_MODEL_DIR)
        run.log_artifact(trained_model_artifact)
He is using the import and export API, starting with api = wandb.Api(). This lets him load a read-only version of each sweep and pull out the best parameters, and it does not have to be done in the context of a wandb run.
Once the training is finished, he can generate plots to compare the two conditions. It looks like the tweet information is helpful after all!

[Comparison panel over the run set of two training runs: with and without tweets]
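These comparison panels are built from metrics logged during the test pass. As a hypothetical sketch of how the daily account value could be logged for one condition (DRL_prediction and the column name are assumptions based on typical FinRL usage):
with wandb.init(entity=ENTITY_NAME, project=PROJECT_NAME, job_type='test') as run:
    # Run the trained agent through the held-out trading year
    df_account_value, df_actions = DRLAgent.DRL_prediction(
        model=trained_a2c, environment=trade_gym)  # assumed FinRL helper
    for day, value in enumerate(df_account_value['account_value']):
        run.log({'day': day, 'account_value': value})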


Wrap-up

This report demonstrated how colleagues can use W&B Artifact aliases to pass data back and forth while working concurrently on a project. Because we used aliases on versions of a single artifact, the different variations of the dataset are stored efficiently with deduplication.