Level 4: Connect components into a full stack AI app

Audience: Users who want to build apps with multiple components.

Prereqs: You know how to build a component.


What is a full stack AI app?

In the ML world, a workflow coordinates multiple pieces of code working together. In Lightning, when we coordinate 2 or more Lightning components, we call it a Lightning App instead. The difference will become more obvious when we introduce reactive workflows in the advanced section.

For the time being, we’ll go over how to coordinate 2 components in a traditional workflow setting and explain how it works.

Note

If you’ve used workflow tools for Python, this page describes conventional DAGs. In level 6, we introduce reactive workflows that generalize beyond DAGs so you can build complex systems without much effort.


The toy app

In this app, we define two components that run across 2 separate machines: one to train a model on a GPU machine and one to analyze the model on a separate CPU machine. We save money by stopping the GPU machine when the work is done.

Import Lightning
First, import Lightning
Define Component 1
This component trains a model on a GPU machine
Define Component 2
This component analyzes the model on a CPU machine
Orchestrator
Define the LightningFlow that orchestrates the components
Connect component 1
Component 1 will run on an accelerated GPU machine
Connect component 2
Component 2 will run on a CPU machine
Implement run
Describe the workflow in the run method
Train
Training runs first and completes
Analyze
Analysis runs after training completes
Define app placeholder
This allows the app to be runnable
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp, CloudCompute


class TrainComponent(LightningWork):
    def run(self, x):
        print(f'train a model on {x}')

class AnalyzeComponent(LightningWork):
    def run(self, x):
        print(f'analyze model on {x}')

class WorkflowOrchestrator(LightningFlow):
    def __init__(self) -> None:
        super().__init__()
        self.train = TrainComponent(cloud_compute=CloudCompute('gpu'))
        self.analyze = AnalyzeComponent(cloud_compute=CloudCompute('cpu'))

    def run(self):
        self.train.run("GPU machine 1")
        self.analyze.run("CPU machine 2")

app = LightningApp(WorkflowOrchestrator())
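
The cost-saving step mentioned above (stopping the GPU machine once training finishes) isn’t shown in the code. As a minimal sketch of one way to express it, assuming LightningWork’s stop() method and has_succeeded property, the flow’s run method could become:

    def run(self):
        self.train.run("GPU machine 1")

        # assumption: has_succeeded flips once the work finishes,
        # and stop() shuts the GPU machine down to save money
        if self.train.has_succeeded:
            self.train.stop()

        self.analyze.run("CPU machine 2")

Because the flow’s run method is called repeatedly, the stop branch fires on a later pass, once training has actually finished.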


Now run the app:

Run on Lightning cloud
Run to see these 2 components execute on separate machines 🤯:

lightning run app app.py --cloud

Your own hardware
Run it locally without code changes 🤯🤯:

lightning run app app.py


Now you can develop distributed cloud apps on your laptop 🤯🤯🤯🤯!
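
Ignoring any log formatting the platform adds, the two works simply execute their print statements, so you should see lines like:

train a model on GPU machine 1
analyze model on CPU machine 2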


Now you know:

Without going out of your way, you’re now doing the following:

Orchestration
Define orchestration in Python with full control-flow (see the sketch after this list)
Distributed cloud computing
The two pieces of independent Python code ran on separate machines 🤯🤯
Multi-machine communication
The text "GPU machine 1" was sent from the flow machine to the machine running the TrainComponent
Multi-machine communication
The text "CPU machine 2" was sent from the flow machine to the machine running the AnalyzeComponent


Maintain full control

Although we’ve abstracted the infrastructure, you still have full control when you need it:

Scheduler
Although you can use Python timers, we have a scheduler shorthand
Crontab syntax
You can also use full cron syntax (see the sketch after the code below)
Auto-scaling
Code your own auto-scaling logic (Lightning plays well with Kubernetes)
Organized Python
Remember: components organize ANY Python code, which can even call external non-Python scripts such as optimized C++ model servers
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp, CloudCompute


class TrainComponent(LightningWork):
    def run(self, x):
        print(f'train a model on {x}')

class AnalyzeComponent(LightningWork):
    def run(self, x):
        print(f'analyze model on {x}')

class WorkflowOrchestrator(LightningFlow):
    def __init__(self) -> None:
        super().__init__()
        self.train = TrainComponent(cloud_compute=CloudCompute('gpu'))
        self.analyze = AnalyzeComponent(cloud_compute=CloudCompute('cpu'))

    def run(self):
        # run training once
        self.train.run("GPU machine 1")

        # run analysis once, then every hour again...
        if self.schedule("hourly"):
            self.analyze.run("CPU machine 2")

app = LightningApp(WorkflowOrchestrator())
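
The "hourly" shorthand above can also be written as a crontab expression, per the crontab bullet earlier. A sketch of the same schedule, assuming schedule() accepts the standard 5-field cron format:

    def run(self):
        self.train.run("GPU machine 1")

        # "0 * * * *" = minute 0 of every hour (assumes schedule()
        # accepts cron strings as well as the "hourly" shorthand)
        if self.schedule("0 * * * *"):
            self.analyze.run("CPU machine 2")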


Next: Review how to debug

The next level does a 2-minute review to make sure you know how to debug a Lightning app.