So far we've implemented our DataOps (ELT, validation, etc.) and MLOps (optimization, training, evaluation, etc.) workflows as Python function calls. This has worked well since our dataset is static and small. But what happens when we need to:
We'll need to break down our end-to-end ML pipeline into individual workflows that can be orchestrated as needed. There are several tools that can help us do this, such as Airflow, Prefect, Dagster, Luigi and Orchest, and even some ML-focused options such as Metaflow, Flyte, Kubeflow Pipelines, Vertex Pipelines, etc. We'll be creating our workflows using Airflow for its:
We'll be running Airflow locally but we can easily scale it by running on a managed cluster platform where we can run Python, Hadoop, Spark, etc. on large batch processing jobs (AWS EMR, Google Cloud's Dataproc, on-prem hardware, etc.).
Before we create our specific pipelines, let's understand and implement Airflow's overarching concepts that will allow us to "author, schedule, and monitor workflows".
Separate repository
Our work in this lesson will live in a separate repository so create a new directory (outside our mlops-course repository) called data-engineering. All the work in this lesson can be found in our data-engineering repository.
To install and run Airflow, we can either do so locally or with Docker. If using docker-compose to run Airflow inside Docker containers, we'll want to allocate at least 4 GB of memory.
This will create an airflow directory with the following components:
We're going to edit the airflow.cfg file to best fit our needs:
And we'll perform a reset to implement these configuration changes.
Now we're ready to initialize our database with an admin user, which we'll use to log in and access our workflows in the webserver.
Once we've created a user, we're ready to launch the webserver and log in using our credentials.
The webserver allows us to run and inspect workflows, establish connections to external data storage, manage users, etc. through a UI. We could also use Airflow's REST API or command-line interface (CLI) to perform the same operations. However, we'll be using the webserver because it's convenient for visually inspecting our workflows.
We'll explore the different components of the webserver as we learn about Airflow and implement our workflows.
Next, we need to launch our scheduler, which will execute and monitor the tasks in our workflows. The scheduler executes tasks by reading from the metadata database and ensures each task has what it needs to finish running. We'll go ahead and execute the following commands in a separate terminal window:
As our scheduler reads from the metadata database, the executor determines what worker processes are necessary for each task to run to completion. Since our default database is SQLite, which can't support multiple connections, our default executor is the SequentialExecutor, which runs one task at a time. However, if we choose a more production-grade database such as PostgreSQL or MySQL, we can choose scalable executor backends such as Celery, Kubernetes, etc. For example, running Airflow with Docker uses PostgreSQL as the database and the Celery executor to run tasks in parallel.
Workflows are defined by directed acyclic graphs (DAGs), whose nodes represent tasks and whose edges represent the data flow relationships between those tasks. Directed and acyclic implies that workflows can only execute in one direction: there is no path from a task back to itself, so a previous, upstream task cannot run again once a downstream task has started.
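To make "directed" and "acyclic" concrete, here is a small sketch that uses Python's standard-library graphlib rather than Airflow itself. It groups tasks into "waves" where every task's upstream tasks sit in earlier waves, so tasks in the same wave could run in parallel (a SequentialExecutor would still run them one at a time). The `execution_waves` helper and the task names are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter


def execution_waves(upstream: dict[str, set[str]]) -> list[list[str]]:
    """Group a DAG's tasks into waves that respect every dependency.

    `upstream` maps each task to the set of tasks it depends on. Every task in a
    wave has all of its upstream tasks in earlier waves, so tasks within the same
    wave are independent of each other and could run in parallel.
    """
    sorter = TopologicalSorter(upstream)
    sorter.prepare()  # raises CycleError if the graph is not acyclic
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose upstream tasks are all done
        waves.append(ready)
        sorter.done(*ready)  # mark them finished, unlocking their downstream tasks
    return waves


# Hypothetical ELT-style workflow: two extracts feed a transform, which feeds training.
workflow = {
    "transform": {"extract_projects", "extract_tags"},
    "train": {"transform"},
}
print(execution_waves(workflow))
# [['extract_projects', 'extract_tags'], ['transform'], ['train']]

# Making "extract_projects" depend on "train" closes a loop, which a DAG forbids.
try:
    execution_waves({**workflow, "extract_projects": {"train"}})
except CycleError:
    print("cycle detected: not a valid DAG")
```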
DAGs can be defined inside Python workflow scripts inside the airflow/dags directory and they'll automatically appear (and continuously be updated) on the webserver. Before we start creating our DataOps and MLOps workflows, we'll learn about Airflow's concepts via an example DAG outlined in airflow/dags/example.py. Execute the following commands in a new (3rd) terminal window:
Inside each workflow script, we can define some default arguments that will apply to all DAGs within that workflow.
```python
# Default DAG args
default_args = {
    "owner": "airflow",
}
```
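Airflow passes default_args on to every operator (task) in the DAG, and any argument given explicitly to an operator takes precedence over the default. A minimal sketch of that precedence as a plain dict merge (`resolve_task_args` is a hypothetical helper, not an Airflow function, and `retries` is just an example key):

```python
def resolve_task_args(default_args: dict, **task_kwargs) -> dict:
    """Hypothetical helper: combine DAG-level default_args with one task's kwargs.

    Keys passed explicitly to the task win over default_args; the shared
    default_args dict itself is left untouched.
    """
    return {**default_args, **task_kwargs}


default_args = {"owner": "airflow", "retries": 1}

# This task keeps the defaults...
print(resolve_task_args(default_args, task_id="task_1"))
# {'owner': 'airflow', 'retries': 1, 'task_id': 'task_1'}

# ...while this one overrides `retries` for itself only.
print(resolve_task_args(default_args, task_id="task_2", retries=3))
# {'owner': 'airflow', 'retries': 3, 'task_id': 'task_2'}
```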
Typically, our DAGs are not the only ones running in an Airflow cluster. However, it can be messy and sometimes impossible to execute different workflows when they require different resources, package versions, etc. For teams with multiple projects, it's a good idea to use something like the KubernetesPodOperator to execute each job using an isolated Docker image.
We can initialize DAGs with many parameters (which will override the same parameters in default_args) and in several different ways:
using a with statement
```python
from airflow import DAG
from airflow.utils.dates import days_ago

with DAG(
    dag_id="example",
    description="Example DAG",
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["example"],
) as example:
    # Define tasks
    pass
```
using the dag decorator
```python
from airflow.decorators import dag
from airflow.utils.dates import days_ago

@dag(
    dag_id="example",
    description="Example DAG",
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["example"],
)
def example():
    # Define tasks
    pass
```
There are many parameters that we can initialize our DAGs with, including a start_date and a schedule_interval. While we could have our workflows execute on a temporal cadence, many ML workflows are initiated by events, which we can map using sensors and hooks to external databases, file systems, etc.
Tasks are the operations that are executed in a workflow and are represented by nodes in a DAG. Each task should be a clearly defined single operation and it should be idempotent, which means we can execute it multiple times and expect the same result and system state. This is important in the event we need to retry a failed task and don't have to worry about resetting the state of our system. Like DAGs, there are several different ways to implement tasks:
using the task decorator
```python
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

@dag(
    dag_id="example",
    description="Example DAG with task decorators",
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["example"],
)
def example():
    @task
    def task_1():
        return 1

    @task
    def task_2(x):
        return x + 1
```
using Operators
```python
from airflow.decorators import dag
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

@dag(
    dag_id="example",
    description="Example DAG with Operators",
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["example"],
)
def example():
    # Define tasks
    task_1 = BashOperator(task_id="task_1", bash_command="echo 1")
    task_2 = BashOperator(task_id="task_2", bash_command="echo 2")
```
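To make the idempotency requirement for tasks concrete, here is a stdlib-only sketch (not Airflow code) of two ways to write a "load" step. Both hypothetical helpers are run twice, as they would be if a task were retried. Appending duplicates rows on every retry, while overwriting leaves the same final state no matter how many times it runs.

```python
import tempfile
from pathlib import Path


def load_append(path: Path, rows: list[str]) -> None:
    """NOT idempotent: each retry appends the same rows again."""
    with path.open("a") as f:
        f.writelines(f"{row}\n" for row in rows)


def load_overwrite(path: Path, rows: list[str]) -> None:
    """Idempotent: each retry rewrites the file to the same final state."""
    path.write_text("".join(f"{row}\n" for row in rows))


rows = ["project_1", "project_2"]
with tempfile.TemporaryDirectory() as tmp:
    appended, overwritten = Path(tmp, "appended.txt"), Path(tmp, "overwritten.txt")
    for _attempt in range(2):  # the same task runs twice, e.g. original run + retry
        load_append(appended, rows)
        load_overwrite(overwritten, rows)
    appended_lines = appended.read_text().splitlines()
    overwritten_lines = overwritten.read_text().splitlines()

print(len(appended_lines))     # 4: the retry duplicated every row
print(len(overwritten_lines))  # 2: same final state regardless of retries
```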
Though the graphs are directed, we can establish certain trigger rules for each task to execute on conditional successes or failures of the parent tasks.
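In Airflow, a trigger rule is set per task, e.g. `trigger_rule="one_failed"` on an alerting task or `"all_done"` on a cleanup task (the default is `"all_success"`). The toy sketch below models a few of these rule names as predicates over the final states of a task's parents. It is a simplification: it ignores states such as `upstream_failed`, and the fact that rules like `one_failed` can fire before every parent has finished.

```python
# Simplified model: every upstream task has already finished in one of these states.
TRIGGER_RULES = {
    "all_success": lambda states: all(s == "success" for s in states),  # Airflow's default
    "all_failed": lambda states: all(s == "failed" for s in states),
    "all_done": lambda states: True,  # run once upstream tasks finish, whatever the outcome
    "one_success": lambda states: any(s == "success" for s in states),
    "one_failed": lambda states: any(s == "failed" for s in states),
    "none_failed": lambda states: all(s in ("success", "skipped") for s in states),
}


def should_run(trigger_rule: str, upstream_states: list[str]) -> bool:
    """Decide whether a task fires, given its rule and its parents' final states."""
    return TRIGGER_RULES[trigger_rule](upstream_states)


upstream_states = ["success", "failed"]  # e.g. one validation passed, one failed
for rule in ["all_success", "one_failed", "all_done"]:
    print(rule, should_run(rule, upstream_states))
# all_success False
# one_failed True
# all_done True
```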
One way of creating tasks is with Operators, which define exactly what the task will be doing. Airflow has many built-in Operators, such as the BashOperator and the PythonOperator, which allow us to execute bash commands and Python functions respectively.
```python
# BashOperator
from airflow.operators.bash import BashOperator
task_1 = BashOperator(task_id="task_1", bash_command="echo 1")

# PythonOperator
from airflow.operators.python import PythonOperator
task_2 = PythonOperator(
    task_id="task_2",
    python_callable=foo,  # foo: the Python function this task should run
    op_kwargs={"arg1": ...},  # keyword arguments passed to foo
)
```
There are also many other Airflow native Operators (email, S3, MySQL, Hive, etc.), as well as community maintained provider packages (Kubernetes, Snowflake, Azure, AWS, Salesforce, Tableau, etc.), to execute tasks specific to certain platforms or tools.
We can also create our own custom Operators by extending the BaseOperator class.
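In Airflow, a custom operator subclasses `BaseOperator` and implements `execute(self, context)`, which is what runs when the task executes. The sketch below mimics that pattern with toy, stdlib-only stand-ins (`ToyBaseOperator`, `ToyPythonOperator` and `GreetOperator` are hypothetical, not Airflow classes). It also shows how a PythonOperator-style task simply calls `python_callable(**op_kwargs)`.

```python
from typing import Any, Callable, Optional


class ToyBaseOperator:
    """Toy stand-in for an operator base class: a task_id plus an execute() hook."""

    def __init__(self, task_id: str):
        self.task_id = task_id

    def execute(self, context: dict) -> Any:
        raise NotImplementedError("subclasses define what the task actually does")


class ToyPythonOperator(ToyBaseOperator):
    """Runs python_callable(**op_kwargs), mirroring the PythonOperator usage above."""

    def __init__(self, task_id: str, python_callable: Callable, op_kwargs: Optional[dict] = None):
        super().__init__(task_id)
        self.python_callable = python_callable
        self.op_kwargs = op_kwargs or {}

    def execute(self, context: dict) -> Any:
        return self.python_callable(**self.op_kwargs)


class GreetOperator(ToyBaseOperator):
    """Hypothetical custom operator: subclass the base class and override execute()."""

    def __init__(self, task_id: str, name: str):
        super().__init__(task_id)
        self.name = name

    def execute(self, context: dict) -> str:
        return f"Hello {self.name} from {self.task_id}"


def foo(arg1):  # hypothetical callable
    return arg1 + 1


print(ToyPythonOperator("task_2", python_callable=foo, op_kwargs={"arg1": 41}).execute(context={}))  # 42
print(GreetOperator("greet", name="Airflow").execute(context={}))  # Hello Airflow from greet
```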
Once we've defined our tasks using Operators or as decorated functions, we need to define the relationships between them (edges). The way we define the relationships depends on how our tasks were defined:
using decorated functions
```python
# Task relationships
x = task_1()
y = task_2(x=x)
```
using Operators
```python
# Task relationships
task_1 >> task_2  # same as task_1.set_downstream(task_2) or
                  # task_2.set_upstream(task_1)
```
In both scenarios, we're setting task_2 as the downstream task of task_1.
Note
We can even create intricate DAGs by using these notations to define the relationships.
```python
task_1 >> [task_2_1, task_2_2] >> task_3
task_2_2 >> task_4
[task_3, task_4] >> task_5
```
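The `>>` notation is ordinary Python operator overloading. Here is a toy `Task` class (a hypothetical stand-in, not Airflow's implementation) that records edges the same way. `__rshift__` handles `task >> other`, and because Python lists don't define `>>`, an expression like `[a, b] >> task` falls back to `task.__rrshift__`.

```python
class Task:
    """Toy task that records the task_ids of its downstream tasks."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.downstream: set[str] = set()

    def set_downstream(self, other):
        for task in other if isinstance(other, list) else [other]:
            self.downstream.add(task.task_id)

    def __rshift__(self, other):
        # task >> other (other may be a task or a list of tasks)
        self.set_downstream(other)
        return other  # returning `other` is what allows chains like a >> b >> c

    def __rrshift__(self, other):
        # [t1, t2] >> task: lists don't define >>, so Python calls this method instead
        for task in other:
            task.set_downstream(self)
        return self


task_1, task_2_1, task_2_2, task_3, task_4, task_5 = (
    Task(t) for t in ["task_1", "task_2_1", "task_2_2", "task_3", "task_4", "task_5"]
)
task_1 >> [task_2_1, task_2_2] >> task_3
task_2_2 >> task_4
[task_3, task_4] >> task_5

print(sorted(task_1.downstream))    # ['task_2_1', 'task_2_2']
print(sorted(task_2_2.downstream))  # ['task_3', 'task_4']
```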
When we use task decorators, we can see how values can be passed between tasks. But how can we pass values when using Operators? Airflow uses XCom (cross-communication) objects, defined with a key, value, timestamp and task_id, to push and pull values between tasks. When we use decorated functions, XComs are used under the hood but abstracted away, allowing us to pass values amongst Python functions seamlessly. But when using Operators, we'll need to explicitly push and pull values as needed.
```python
from airflow.decorators import dag
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

def _task_1(ti):
    x = 2
    ti.xcom_push(key="x", value=x)

def _task_2(ti):
    x = ti.xcom_pull(key="x", task_ids=["task_1"])[0]
    y = x + 3
    ti.xcom_push(key="y", value=y)

@dag(
    dag_id="example2",  # must differ from the other example DAG's dag_id
    description="Example DAG",
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["example"],
)
def example2():
    # Tasks
    task_1 = PythonOperator(task_id="task_1", python_callable=_task_1)
    task_2 = PythonOperator(task_id="task_2", python_callable=_task_2)
    task_1 >> task_2
```
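Conceptually, XComs are a small key-value table keyed by the task that pushed the value and the key it used. The toy classes below (hypothetical, stdlib-only) replay `_task_1` and `_task_2` from above against an in-memory store. Real Airflow persists XComs in its metadata database, along with timestamps and run information, which is why the values should stay small.

```python
class ToyXComStore:
    """Toy stand-in for Airflow's XCom table: small values keyed by (task_id, key)."""

    def __init__(self):
        self._values = {}

    def push(self, task_id, key, value):
        self._values[(task_id, key)] = value

    def pull(self, task_ids, key):
        # Pulling with a list of task_ids returns one value per task (None if missing).
        return [self._values.get((task_id, key)) for task_id in task_ids]


class ToyTaskInstance:
    """Toy `ti` object exposing xcom_push/xcom_pull for a single task."""

    def __init__(self, task_id, store):
        self.task_id = task_id
        self.store = store

    def xcom_push(self, key, value):
        self.store.push(self.task_id, key, value)

    def xcom_pull(self, key, task_ids):
        return self.store.pull(task_ids, key)


# Same callables as in the DAG above.
def _task_1(ti):
    x = 2
    ti.xcom_push(key="x", value=x)


def _task_2(ti):
    x = ti.xcom_pull(key="x", task_ids=["task_1"])[0]
    y = x + 3
    ti.xcom_push(key="y", value=y)


store = ToyXComStore()
_task_1(ToyTaskInstance("task_1", store))  # the upstream task runs first...
_task_2(ToyTaskInstance("task_2", store))  # ...so its pushed value is available here
print(store.pull(["task_2"], key="y"))  # [5]
```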
We can also view our XComs on the webserver by going to Admin > XComs:
Warning
The data we pass between tasks should be small (metadata, metrics, etc.) because Airflow's metadata database is not equipped to hold large artifacts. If we do need to store and use large results from our tasks, it's best to use external data storage (blob storage, model registry, etc.) and perform heavy processing using Spark or inside data systems like a data warehouse.
Once we've defined the tasks and their relationships, we're ready to run our DAGs. With the dag decorator, we create each DAG by calling its decorated function:
```python
# Run DAGs
example1_dag = example()
example2_dag = example2()
```
The new DAGs will appear when we refresh our Airflow webserver.
Our DAG is initially paused since we specified dags_are_paused_at_creation = True inside our airflow.cfg configuration, so we'll have to manually execute this DAG by clicking on it > unpausing it (toggle) > triggering it (button). To view the logs for any of the tasks in our DAG run, we can click on the task > Log.
Note
We could also use Airflow's REST API (with authorization configured) or command-line interface (CLI) to inspect and trigger workflows (and a whole lot more). Or we could even use the TriggerDagRunOperator to trigger DAGs from within another workflow.
Had we specified a start_date and schedule_interval when defining the DAG, it would have automatically executed at the appropriate times. For example, the DAG below has a start_date of two days ago and will be triggered at the start of every day.
```python
from datetime import timedelta

from airflow.decorators import dag
from airflow.utils.dates import days_ago

@dag(
    dag_id="example",
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=["example"],
    catchup=False,
)
def example():
    # Define tasks
    pass
```
Warning
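Depending on the start_date and schedule_interval, several scheduled runs may already be overdue, and Airflow will try to catch up to the current time by executing each of them. We can avoid this by setting catchup=False when defining the DAG, as above. Note that catchup is a DAG-level argument that Airflow doesn't read from default_args (those are mostly passed on to the DAG's tasks), so a "catch_up" key inside default_args won't disable catch-up. To change the default for every DAG instead, we can set catchup_by_default in the [scheduler] section of our airflow.cfg:

```ini
[scheduler]
catchup_by_default = False
```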
However, if we do want to execute particular runs from the past, we can manually backfill the ones we need.
We could also specify a cron expression for our schedule_interval parameter or even use cron presets.
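Each cron preset is shorthand for a five-field cron expression (minute, hour, day of month, month, day of week). Below are a few of the presets Airflow documents, plus a deliberately simplified, hypothetical `matches` helper. It only understands `*` and single numbers per field, with no ranges, steps or lists, and it ignores real cron's rule of OR-ing day-of-month and day-of-week when both are restricted.

```python
from datetime import datetime

# A few cron presets and the cron expressions they stand for.
CRON_PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}


def matches(cron: str, dt: datetime) -> bool:
    """Simplified cron matcher: supports only '*' and single integers per field."""
    cron = CRON_PRESETS.get(cron, cron)
    minute, hour, day, month, weekday = cron.split()
    cron_weekday = (dt.weekday() + 1) % 7  # cron counts Sunday as 0, Python counts Monday as 0
    fields = [(minute, dt.minute), (hour, dt.hour), (day, dt.day), (month, dt.month), (weekday, cron_weekday)]
    return all(spec == "*" or int(spec) == value for spec, value in fields)


print(matches("@daily", datetime(2023, 1, 1, 0, 0)))       # True
print(matches("@daily", datetime(2023, 1, 1, 12, 0)))      # False
print(matches("@weekly", datetime(2023, 1, 1)))            # True (a Sunday)
print(matches("30 6 * * *", datetime(2023, 1, 2, 6, 30)))  # True
```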
Airflow's scheduler triggers each run at the end of the interval it covers, so the first run happens one schedule_interval after the start_date. For example, if we want our workflow to start on 1983-01-01 and run @daily, then the first run will be triggered immediately after 1983-01-01T23:59 (i.e. at midnight on 1983-01-02).
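The scheduling arithmetic above can be sketched with the standard library. A run for logical date `d` covers `[d, d + interval)` and is only triggered once that interval has ended. The hypothetical `logical_dates_due` helper lists every interval that has closed by a given time. With catchup enabled, the scheduler would create a run for each of these overdue intervals, while with catchup=False it schedules only the most recent one.

```python
from datetime import datetime, timedelta


def logical_dates_due(start_date: datetime, interval: timedelta, now: datetime) -> list[datetime]:
    """Logical dates of every run whose interval has fully elapsed by `now`."""
    dates, d = [], start_date
    while d + interval <= now:  # the run for `d` is triggered at d + interval
        dates.append(d)
        d += interval
    return dates


start = datetime(1983, 1, 1)
daily = timedelta(days=1)
print(start + daily)  # 1983-01-02 00:00:00, when the first (1983-01-01) run is triggered

due = logical_dates_due(start, daily, now=datetime(1983, 1, 4, 12))
print([d.date().isoformat() for d in due])  # ['1983-01-01', '1983-01-02', '1983-01-03']
```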
While it may make sense to execute many data processing workflows on a scheduled interval, machine learning workflows may require more nuanced triggers. We shouldn't be wasting compute by executing our workflows just in case we have new data. Instead, we can use sensors to trigger workflows when some external condition is met. For example, we can initiate data processing when a new batch of annotated data appears in a database or when a specific file appears in a file system, etc.
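Airflow sensors (e.g. a sensor waiting for a file) repeatedly check a condition every `poke_interval` seconds until it holds or a `timeout` is reached. The toy, stdlib-only `poke_until` function below (hypothetical, not Airflow's implementation) captures that loop. The `sleep` function is injectable so the example doesn't actually wait.

```python
import time
from typing import Callable


def poke_until(condition: Callable[[], bool], poke_interval: float, timeout: float,
               sleep: Callable[[float], None] = time.sleep) -> int:
    """Toy 'poke'-style sensor: re-check `condition` every `poke_interval` seconds.

    Returns how many checks it took; raises TimeoutError once waiting any longer
    would exceed `timeout` seconds.
    """
    waited, pokes = 0.0, 0
    while True:
        pokes += 1
        if condition():
            return pokes
        if waited + poke_interval > timeout:
            raise TimeoutError(f"condition not met within {timeout}s")
        sleep(poke_interval)
        waited += poke_interval


# Hypothetical: a new batch of annotated data lands just before the 3rd check.
checks = iter([False, False, True])
print(poke_until(lambda: next(checks), poke_interval=60, timeout=600, sleep=lambda seconds: None))  # 3
```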
There's so much more to Airflow (monitoring, task groups, smart sensors, etc.), so be sure to explore these features as you need them using the official documentation.
Now that we've reviewed Airflow's major concepts, we're ready to create the DataOps workflows. It's the exact same workflow we defined in our data stack lesson -- extract, load and transform -- but this time we'll be doing everything programmatically and orchestrating it with Airflow.
We'll start by creating the script where we'll define our workflows:
```python
from pathlib import Path

from airflow.decorators import dag
from airflow.utils.dates import days_ago

# Default DAG args
default_args = {
    "owner": "airflow",
}
BASE_DIR = Path(__file__).parent.parent.parent.absolute()

@dag(
    dag_id="dataops",
    description="DataOps workflows.",
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    catchup=False,
    tags=["dataops"],
)
def dataops():
    """DataOps workflows."""
    pass

# Run DAG
do = dataops()
```
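`BASE_DIR` climbs three levels from the workflow script to reach the repository root, so other paths (like the Great Expectations and dbt directories) can be built relative to it. A quick illustration with a hypothetical script location (the script's actual filename isn't given here). `PurePosixPath` keeps the output identical on any OS, and `.absolute()` in the original simply anchors the result if `__file__` happens to be relative.

```python
from pathlib import PurePosixPath

# Hypothetical absolute path of a workflow script inside the data-engineering repo.
script = PurePosixPath("/home/user/data-engineering/airflow/dags/workflows.py")

# .parent -> airflow/dags, .parent.parent -> airflow, .parent.parent.parent -> repo root
BASE_DIR = script.parent.parent.parent
print(BASE_DIR)                         # /home/user/data-engineering
print(BASE_DIR == script.parents[2])    # True
print(BASE_DIR / "great_expectations")  # /home/user/data-engineering/great_expectations
```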
In two separate terminals, activate the virtual environment and spin up the Airflow webserver and scheduler:
We're going to use the Airbyte connections we set up in our data-stack lesson, but this time we're going to programmatically trigger the data syncs with Airflow. First, let's ensure that Airbyte is running in a separate terminal in its repository:
Next, let's install the required packages and establish the connection between Airbyte and Airflow:
We could also establish connections programmatically but it’s good to use the UI to understand what’s happening under the hood.
In order to execute our extract and load data syncs, we can use the AirbyteTriggerSyncOperator:
```python
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator

@dag(...)
def dataops():
    """Production DataOps workflows."""
    # Extract + Load
    extract_and_load_projects = AirbyteTriggerSyncOperator(
        task_id="extract_and_load_projects",
        airbyte_conn_id="airbyte",
        connection_id="XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",  # REPLACE
        asynchronous=False,  # wait for the sync to finish before completing the task
        timeout=3600,  # seconds to wait for the sync
        wait_seconds=3,  # seconds between sync status checks
    )
    extract_and_load_tags = AirbyteTriggerSyncOperator(
        task_id="extract_and_load_tags",
        airbyte_conn_id="airbyte",
        connection_id="XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",  # REPLACE
        asynchronous=False,
        timeout=3600,
        wait_seconds=3,
    )

    # Define DAG
    extract_and_load_projects
    extract_and_load_tags
```
We can find the connection_id for each Airbyte connection by:
We could trigger our DAG right now and watch the extracted data get loaded into our BigQuery data warehouse, but we'll continue developing and execute our DAG once the entire DataOps workflow has been defined.
The specific process of where and how we extract our data can be bespoke but what's important is that we have validation at every step of the way. We'll once again use Great Expectations, as we did in our testing lesson, to validate our extracted and loaded data before transforming it.
With the Airflow concepts we've learned so far, there are many ways to use our data validation library to validate our data. Regardless of what data validation tool we use (ex. Great Expectations, TFX, AWS Deequ, etc.), we could use the BashOperator, PythonOperator, etc. to run our tests. However, Great Expectations has an Airflow provider package to make it even easier to validate our data. This package contains a GreatExpectationsOperator, which we can use to execute specific checkpoints as tasks.
This will create the following directory within our data-engineering repository:
But first, before we can create our tests, we need to define a new datasource within Great Expectations for our Google BigQuery data warehouse. This will require several packages and exports:
This will open up an interactive notebook where we can fill in the following details:
Next, we can create a suite of expectations for our data assets:
This will open up an interactive notebook where we can define our expectations. Repeat the same for creating a suite for our tags data asset as well.
Expectations for mlops_course.projects
Table expectations:
```python
# data leak
validator.expect_compound_columns_to_be_unique(column_list=["title", "description"])
```
Column expectations:
```python
# id
validator.expect_column_values_to_be_unique(column="id")

# created_on
validator.expect_column_values_to_not_be_null(column="created_on")

# title
validator.expect_column_values_to_not_be_null(column="title")
validator.expect_column_values_to_be_of_type(column="title", type_="STRING")

# description
validator.expect_column_values_to_not_be_null(column="description")
validator.expect_column_values_to_be_of_type(column="description", type_="STRING")
```
Expectations for mlops_course.tags
Column expectations:
```python
# id
validator.expect_column_values_to_be_unique(column="id")

# tag
validator.expect_column_values_to_not_be_null(column="tag")
validator.expect_column_values_to_be_of_type(column="tag", type_="STRING")
```
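To illustrate what these expectations guard against, here is a plain-Python sketch (not Great Expectations) of the uniqueness and not-null checks on hypothetical rows. It also shows why a duplicate (title, description) pair is flagged as a data leak: the same text under two different ids could end up in both the training and test splits, even though the `id` column on its own is unique.

```python
from collections import Counter


def duplicated_values(rows: list[dict], columns: list[str]) -> list[tuple]:
    """Value combinations across `columns` that appear in more than one row."""
    counts = Counter(tuple(row[column] for column in columns) for row in rows)
    return [values for values, count in counts.items() if count > 1]


def null_count(rows: list[dict], column: str) -> int:
    """Number of rows where `column` is missing or None."""
    return sum(row.get(column) is None for row in rows)


# Hypothetical rows: project 2 repeats project 1's text under a new id.
projects = [
    {"id": 1, "title": "Transformers", "description": "Attention-based models"},
    {"id": 2, "title": "Transformers", "description": "Attention-based models"},
    {"id": 3, "title": "GANs", "description": None},
]
print(duplicated_values(projects, ["title", "description"]))  # [('Transformers', 'Attention-based models')]
print(duplicated_values(projects, ["id"]))                    # []
print(null_count(projects, "description"))                    # 1
```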
Once we have our suite of expectations, we're ready to create checkpoints to execute these expectations:
This will, of course, open up an interactive notebook. Just ensure that the following information is correct (the default values may not be):
And repeat the same for creating a checkpoint for our tags suite.
With our checkpoints defined, we're ready to apply them to our data assets in our warehouse.
```python
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

GE_ROOT_DIR = Path(BASE_DIR, "great_expectations")

@dag(...)
def dataops():
    ...
    validate_projects = GreatExpectationsOperator(
        task_id="validate_projects",
        checkpoint_name="projects",
        data_context_root_dir=GE_ROOT_DIR,
        fail_task_on_validation_failure=True,
    )
    validate_tags = GreatExpectationsOperator(
        task_id="validate_tags",
        checkpoint_name="tags",
        data_context_root_dir=GE_ROOT_DIR,
        fail_task_on_validation_failure=True,
    )

    # Define DAG
    extract_and_load_projects >> validate_projects
    extract_and_load_tags >> validate_tags
```
Once we've validated our extracted and loaded data, we're ready to transform it. Our DataOps workflows are not specific to any particular downstream application so the transformation must be globally relevant (ex. cleaning missing data, aggregation, etc.). Just like in our data stack lesson, we're going to use dbt to transform our data. However, this time, we're going to do everything programmatically using the open-source dbt-core package.
In the root of our data-engineering repository, initialize our dbt directory with the following command:
We'll prepare our dbt models as we did using the dbt Cloud IDE in the previous lesson.
and add the following code to our model files:
```sql
-- models/labeled_projects/labeled_projects.sql
SELECT p.id, created_on, title, description, tag
FROM `made-with-ml-XXXXXX.mlops_course.projects` p -- REPLACE
LEFT JOIN `made-with-ml-XXXXXX.mlops_course.tags` t -- REPLACE
ON p.id = t.id
```
```yaml
# models/labeled_projects/schema.yml
version: 2

models:
  - name: labeled_projects
    description: "Tags for all projects"
    columns:
      - name: id
        description: "Unique ID of the project."
        tests:
          - unique
          - not_null
      - name: title
        description: "Title of the project."
        tests:
          - not_null
      - name: description
        description: "Description of the project."
        tests:
          - not_null
      - name: tag
        description: "Labeled tag for the project."
        tests:
          - not_null
```
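To see why the `not_null` test on `tag` matters, here is a local sketch using Python's built-in sqlite3 with hypothetical rows instead of BigQuery. The LEFT JOIN keeps every project, even one without a matching tag, and leaves its `tag` as NULL. The two test queries are roughly what dbt's generic `not_null` and `unique` tests boil down to: a query that returns the failing rows, where the test passes only when no rows come back.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE projects (id INTEGER, created_on TEXT, title TEXT, description TEXT);
    CREATE TABLE tags (id INTEGER, tag TEXT);
    INSERT INTO projects VALUES
        (1, '2020-02-20', 'Transformers', 'Attention-based models'),
        (2, '2020-02-21', 'GANs', 'Generative adversarial networks');
    INSERT INTO tags VALUES (1, 'natural-language-processing');  -- project 2 isn't tagged yet

    -- Same query as labeled_projects.sql, with local table names instead of BigQuery ones.
    CREATE VIEW labeled_projects AS
    SELECT p.id, created_on, title, description, tag
    FROM projects p
    LEFT JOIN tags t
    ON p.id = t.id;
""")

not_null_failures = con.execute("SELECT id FROM labeled_projects WHERE tag IS NULL").fetchall()
unique_failures = con.execute(
    "SELECT id FROM labeled_projects GROUP BY id HAVING COUNT(*) > 1"
).fetchall()

print(con.execute("SELECT COUNT(*) FROM labeled_projects").fetchone()[0])  # 2
print(not_null_failures)  # [(2,)]
print(unique_failures)    # []
```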
And we can use the BashOperator to execute our dbt commands like so:
```python
from airflow.operators.bash import BashOperator

DBT_ROOT_DIR = Path(BASE_DIR, "dbt_transforms")

@dag(...)
def dataops():
    ...
    # Transform
    transform = BashOperator(task_id="transform", bash_command=f"cd {DBT_ROOT_DIR} && dbt run && dbt test")

    # Define DAG
    extract_and_load_projects >> validate_projects
    extract_and_load_tags >> validate_tags
    [validate_projects, validate_tags] >> transform
```
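Two details of the `bash_command` above are worth making explicit. This stdlib sketch assumes a POSIX shell. `&&` only runs the next command if the previous one exited with status 0, so a failing `dbt run` skips `dbt test` and the whole command exits non-zero, which makes the BashOperator task fail. Quoting the directory (here with a hypothetical `dbt_command` helper) also keeps the `cd` working if the path contains spaces.

```python
import shlex
import subprocess


def dbt_command(dbt_root) -> str:
    """Build the transform command, quoting the directory in case it contains spaces."""
    return f"cd {shlex.quote(str(dbt_root))} && dbt run && dbt test"


print(dbt_command("/home/user/my projects/dbt_transforms"))
# cd '/home/user/my projects/dbt_transforms' && dbt run && dbt test

# `false` fails, so the echo after `&&` never runs and the command exits with status 1.
result = subprocess.run("false && echo 'tests ran'", shell=True, capture_output=True, text=True)
print(result.returncode, repr(result.stdout))  # 1 ''
```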
Programmatically using dbt Cloud
While we developed locally, we could just as easily use Airflow's dbt Cloud provider to connect to our dbt Cloud account and use its operators to schedule jobs. This is recommended for production because we can design jobs with the proper environment, authentication, schemas, etc.
Go to Admin > Connections > +
```python
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

transform = DbtCloudRunJobOperator(
    task_id="transform",
    job_id=118680,  # Go to dbt UI > click left menu > Jobs > Transform > job_id in URL
    wait_for_termination=True,  # wait for job to finish running
    check_interval=10,  # seconds between job status checks
    timeout=300,  # max seconds to wait for the job to finish
)
```
And of course, we'll want to validate our transformations beyond dbt's built-in tests, using Great Expectations. We'll create a suite and checkpoint for our labeled_projects data asset as we did above for our projects and tags data assets.
Table expectations
```python
# data leak
validator.expect_compound_columns_to_be_unique(column_list=["title", "description"])
```
Column expectations:
```python
# id
validator.expect_column_values_to_be_unique(column="id")

# created_on
validator.expect_column_values_to_not_be_null(column="created_on")

# title
validator.expect_column_values_to_not_be_null(column="title")
validator.expect_column_values_to_be_of_type(column="title", type_="STRING")

# description
validator.expect_column_values_to_not_be_null(column="description")
validator.expect_column_values_to_be_of_type(column="description", type_="STRING")

# tag
validator.expect_column_values_to_not_be_null(column="tag")
validator.expect_column_values_to_be_of_type(column="tag", type_="STRING")
```
And just like how we added the validation tasks for our extracted and loaded data, we can do the same for our transformed data in Airflow:
```python
@dag(...)
def dataops():
    ...
    # Transform
    transform = BashOperator(task_id="transform", bash_command=f"cd {DBT_ROOT_DIR} && dbt run && dbt test")
    validate_transforms = GreatExpectationsOperator(
        task_id="validate_transforms",
        checkpoint_name="labeled_projects",
        data_context_root_dir=GE_ROOT_DIR,
        fail_task_on_validation_failure=True,
    )

    # Define DAG
    extract_and_load_projects >> validate_projects
    extract_and_load_tags >> validate_tags
    [validate_projects, validate_tags] >> transform >> validate_transforms
```
Now we have our entire DataOps DAG defined, and executing it will prepare our data from extraction to loading to transformation (with validation at every step of the way) for downstream applications.
Typically we'll use sensors to trigger workflows when a condition is met or trigger them directly from the external source via API calls, etc. For our ML use cases, this could be at regular intervals or when labeling or monitoring workflows trigger retraining, etc.
Once we have our data prepared, we're ready to create one of the many potential downstream applications that will depend on it. Let's head back to our mlops-course project and follow the same setup instructions for Airflow (you can stop the Airflow webserver and scheduler from our data-engineering project since we'll reuse port 8000).
```python
# airflow/dags/workflows.py
from pathlib import Path

from airflow.decorators import dag
from airflow.utils.dates import days_ago

# Default DAG args
default_args = {
    "owner": "airflow",
}

@dag(
    dag_id="mlops",
    description="MLOps tasks.",
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    catchup=False,
    tags=["mlops"],
)
def mlops():
    """MLOps workflows."""
    pass

# Run DAG
ml = mlops()
```
We already had a tagifai.elt_data function defined to prepare our data, but if we want to leverage the data inside our data warehouse, we'll want to connect to it.
```python
# airflow/dags/workflows.py
from pathlib import Path

from airflow.decorators import dag
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from google.cloud import bigquery
from google.oauth2 import service_account

from config import config

PROJECT_ID = "made-with-ml-XXXXX"  # REPLACE
SERVICE_ACCOUNT_KEY_JSON = "/Users/goku/Downloads/made-with-ml-XXXXXX-XXXXXXXXXXXX.json"  # REPLACE

def _extract_from_dwh():
    """Extract labeled data from
    our BigQuery data warehouse and
    save it locally."""
    # Establish connection to DWH
    credentials = service_account.Credentials.from_service_account_file(SERVICE_ACCOUNT_KEY_JSON)
    client = bigquery.Client(credentials=credentials, project=PROJECT_ID)

    # Query data
    query_job = client.query("""
        SELECT *
        FROM mlops_course.labeled_projects""")
    results = query_job.result()
    results.to_dataframe().to_csv(Path(config.DATA_DIR, "labeled_projects.csv"), index=False)

@dag(
    dag_id="mlops",
    description="MLOps tasks.",
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["mlops"],
)
def mlops():
    """MLOps workflows."""
    extract_from_dwh = PythonOperator(
        task_id="extract_data",
        python_callable=_extract_from_dwh,
    )

    # Define DAG
    extract_from_dwh
```
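A design note on `_extract_from_dwh`: because it builds its own BigQuery client, its logic can only be exercised with real credentials. One option, sketched below under the assumption that we're free to restructure the task, is to pass the query function in. The DAG would supply a wrapper around the BigQuery client, while tests supply a fake. `extract_to_csv` and `fake_query` are hypothetical, and raising on an empty result is a design choice so the task fails loudly instead of writing an empty file.

```python
import csv
import tempfile
from pathlib import Path
from typing import Callable, Iterable


def extract_to_csv(query_fn: Callable[[str], Iterable[dict]], query: str, out_fp: Path) -> int:
    """Run `query` via `query_fn`, save the rows to `out_fp` as CSV and return the row count.

    `query_fn` stands in for the warehouse client call, so the task's logic can be
    tested without credentials. An empty result raises, failing the task loudly.
    """
    rows = list(query_fn(query))
    if not rows:
        raise ValueError(f"query returned no rows: {query!r}")
    with open(out_fp, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0]))
        writer.writeheader()
        writer.writerows(rows)
    return len(rows)


def fake_query(sql: str) -> list[dict]:
    """Hypothetical stand-in for querying BigQuery."""
    return [{"id": 1, "title": "Transformers", "tag": "natural-language-processing"}]


with tempfile.TemporaryDirectory() as tmp:
    out_fp = Path(tmp, "labeled_projects.csv")
    num_rows = extract_to_csv(fake_query, "SELECT * FROM mlops_course.labeled_projects", out_fp)
    header = out_fp.read_text().splitlines()[0]

print(num_rows)  # 1
print(header)    # id,title,tag
```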
Next, we'll use Great Expectations to validate our data. Even though we've already validated our data, it's a best practice to test for data quality whenever there is a hand-off of data from one place to another. We've already created a checkpoint for our labeled_projects in our testing lesson so we'll just leverage that inside our MLOps DAG.
```python
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator
from config import config

GE_ROOT_DIR = Path(config.BASE_DIR, "tests", "great_expectations")

@dag(...)
def mlops():
    """MLOps workflows."""
    extract_from_dwh = PythonOperator(
        task_id="extract_data",
        python_callable=_extract_from_dwh,
    )
    validate = GreatExpectationsOperator(
        task_id="validate",
        checkpoint_name="labeled_projects",
        data_context_root_dir=GE_ROOT_DIR,
        fail_task_on_validation_failure=True,
    )

    # Define DAG
    extract_from_dwh >> validate
```
Finally, we'll optimize and train a model using our validated data.
```python
from pathlib import Path

from airflow.operators.python import PythonOperator

from config import config
from tagifai import main

@dag(...)
def mlops():
    """MLOps workflows."""
    ...
    optimize = PythonOperator(
        task_id="optimize",
        python_callable=main.optimize,
        op_kwargs={
            "args_fp": Path(config.CONFIG_DIR, "args.json"),
            "study_name": "optimization",
            "num_trials": 1,
        },
    )
    train = PythonOperator(
        task_id="train",
        python_callable=main.train_model,
        op_kwargs={
            "args_fp": Path(config.CONFIG_DIR, "args.json"),
            "experiment_name": "baselines",
            "run_name": "sgd",
        },
    )

    # Define DAG
    extract_from_dwh >> validate >> optimize >> train
```
And with that we have our MLOps workflow defined that uses the prepared data from our DataOps workflow. At this point, we can add additional tasks for offline/online evaluation, deployment, etc. with the same process as above.
The DataOps and MLOps workflows connect to create an ML system that's capable of continually learning. Such a system will guide us on when to update, what exactly to update and how to update it (easily).
We use the word continual (repeat with breaks) instead of continuous (repeat without interruption / intervention) because we're not trying to create a system that will automatically update with new incoming data without human intervention.
Our production system is live and monitored. When an event of interest occurs (ex. drift), one of several events needs to be triggered:
If we need to improve on the existing version of the model, it's not just a matter of rerunning the model creation workflow on the new dataset. We need to carefully compose the training data in order to avoid issues such as catastrophic forgetting (forgetting previously learned patterns when presented with new data).
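One common way to compose such a dataset, sometimes called rehearsal or replay, is to mix the new examples with a sample of previously used ones so the model keeps seeing older patterns. The sketch below is a hypothetical, stdlib-only illustration of that idea, not a prescribed recipe. `replay_fraction` is the share of the final set drawn from old data, and the seeded RNG makes the composed dataset reproducible.

```python
import random


def compose_training_set(old: list, new: list, replay_fraction: float, seed: int = 42) -> list:
    """Combine all new examples with a random sample of previously seen ones.

    `replay_fraction` is the share of the final training set drawn from `old`
    (capped by how much old data exists).
    """
    if not 0 <= replay_fraction < 1:
        raise ValueError("replay_fraction must be in [0, 1)")
    # Solve n_old / (n_old + len(new)) = replay_fraction for n_old.
    n_old = min(len(old), round(len(new) * replay_fraction / (1 - replay_fraction)))
    rng = random.Random(seed)  # seeded so the composed dataset is reproducible
    mixed = new + rng.sample(old, n_old)
    rng.shuffle(mixed)
    return mixed


old = [f"old_{i}" for i in range(100)]  # hypothetical previously used examples
new = [f"new_{i}" for i in range(30)]   # hypothetical newly labeled examples
training_set = compose_training_set(old, new, replay_fraction=0.25)
print(len(training_set))                                # 40
print(sum(x.startswith("old_") for x in training_set))  # 10
```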
Once we have the proper dataset for retraining, we can kick off the workflows to update our system!
To cite this content, please use:
```bibtex
@article{madewithml,
    author       = {Goku Mohandas},
    title        = { Orchestration for Machine Learning - Made With ML },
    howpublished = {\url{https://madewithml.com/}},
    year         = {2023}
}
```