Level 4: Connect components into a full stack AI app
Audience: Users who want to build apps with multiple components.
Prereqs: You know how to build a component.
What is a full stack AI app?
In the ML world, a workflow coordinates multiple pieces of code so they run together. In Lightning, when we coordinate two or more Lightning components this way, we call the result a Lightning App. The difference will become more obvious when we introduce reactive workflows in the advanced section.
For now, we'll coordinate two components in a traditional workflow setting and explain how it works.
Note
If you’ve used workflow tools for Python, this page describes conventional DAGs. In level 6, we introduce reactive workflows that generalize beyond DAGs so you can build complex systems without much effort.
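For intuition, here is that kind of conventional workflow written as plain Python (hypothetical stand-in functions, no Lightning): the step order is fixed up front, which is what makes it a DAG. The Lightning app below looks similar, except each component can run on its own cloud machine.

# dag_sketch.py - a conventional workflow (DAG) in plain Python, for comparison only
def train(machine: str) -> None:
    print(f"train a model on {machine}")


def analyze(machine: str) -> None:
    print(f"analyze model on {machine}")


def workflow() -> None:
    train("GPU machine 1")     # step 1
    analyze("CPU machine 2")   # step 2, always runs after step 1


if __name__ == "__main__":
    workflow()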
The toy app
In this app, we define two components that run across 2 separate machines: one trains a model on a GPU machine and the other analyzes the model on a separate CPU machine. We save money by stopping the GPU machine when its work is done (shown in the Maintain full control section below).
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp, CloudCompute


class TrainComponent(LightningWork):
    def run(self, x):
        print(f'train a model on {x}')


class AnalyzeComponent(LightningWork):
    def run(self, x):
        print(f'analyze model on {x}')


class WorkflowOrchestrator(LightningFlow):
    def __init__(self) -> None:
        super().__init__()
        # pick the hardware each component runs on
        self.train = TrainComponent(cloud_compute=CloudCompute('gpu'))
        self.analyze = AnalyzeComponent(cloud_compute=CloudCompute('cpu'))

    def run(self):
        self.train.run("GPU machine 1")
        self.analyze.run("CPU machine 2")


app = LightningApp(WorkflowOrchestrator())
Now run the app. To run it on the cloud:
lightning run app app.py --cloud
To run it locally on your laptop:
lightning run app app.py
Now you can develop distributed cloud apps on your laptop 🤯🤯🤯🤯!
Now you know:
Without going out of your way, the short app above already does the following:
- Orchestrates a workflow of two components from a single LightningFlow.
- Chooses the hardware each component runs on via CloudCompute.
- Runs each component on its own machine, triggered from the flow's run() method.
- Runs the same code locally on your laptop or on the cloud without changes.
Maintain full control
Although we’ve abstracted the infrastructure, you still have full control when you need it:
Run components on a schedule, for example every hour:

# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp, CloudCompute


class TrainComponent(LightningWork):
    def run(self, x):
        print(f'train a model on {x}')


class AnalyzeComponent(LightningWork):
    def run(self, x):
        print(f'analyze model on {x}')


class WorkflowOrchestrator(LightningFlow):
    def __init__(self) -> None:
        super().__init__()
        self.train = TrainComponent(cloud_compute=CloudCompute('gpu'))
        self.analyze = AnalyzeComponent(cloud_compute=CloudCompute('cpu'))

    def run(self):
        # run training once
        self.train.run("GPU machine 1")

        # run analysis once, then again every hour
        if self.schedule("hourly"):
            self.analyze.run("CPU machine 2")


app = LightningApp(WorkflowOrchestrator())
Or schedule with a cron expression (here, every day at 04:05):

# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp, CloudCompute


class TrainComponent(LightningWork):
    def run(self, x):
        print(f'train a model on {x}')


class AnalyzeComponent(LightningWork):
    def run(self, x):
        print(f'analyze model on {x}')


class WorkflowOrchestrator(LightningFlow):
    def __init__(self) -> None:
        super().__init__()
        self.train = TrainComponent(cloud_compute=CloudCompute('gpu'))
        self.analyze = AnalyzeComponent(cloud_compute=CloudCompute('cpu'))

    def run(self):
        # run training once
        self.train.run("GPU machine 1")

        # run analysis every day at 04:05 (cron: minute hour day month weekday)
        if self.schedule("5 4 * * *"):
            self.analyze.run("CPU machine 2")


app = LightningApp(WorkflowOrchestrator())
Start and stop machines explicitly to save money, for example stop the GPU machine as soon as training is done:

# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp, CloudCompute


class TrainComponent(LightningWork):
    def run(self, x):
        print(f'train a model on {x}')


class AnalyzeComponent(LightningWork):
    def run(self, x):
        print(f'analyze model on {x}')


class WorkflowOrchestrator(LightningFlow):
    def __init__(self) -> None:
        super().__init__()
        self.train = TrainComponent(cloud_compute=CloudCompute('gpu'))
        self.analyze = AnalyzeComponent(cloud_compute=CloudCompute('cpu'))

    def run(self):
        # run() starts the machine
        self.train.run("GPU machine 1")

        # stop() stops the machine
        self.train.stop()

        # run analysis ONLY when machine 1 stopped
        if self.train.status.STOPPED:
            self.analyze.run("CPU machine 2")


app = LightningApp(WorkflowOrchestrator())
Integrate anything, even non-Python programs. Here one component compiles and runs a C++ model server, and a second component load tests it with Locust once the server is up:

# app.py
import subprocess

from lightning.app import LightningWork, LightningFlow, LightningApp, CloudCompute


class ExternalModelServer(LightningWork):
    def run(self):
        # compile the server, then start it
        process = subprocess.Popen(['g++', 'model_server.cpp', '-o', 'model_server'])
        process.wait()
        process = subprocess.Popen(['./model_server'])
        process.wait()


class LocustLoadTester(LightningWork):
    def run(self):
        cmd = f'locust --master-host {self.host} --master-port {self.port}'
        process = subprocess.Popen(cmd.split())
        process.wait()


class WorkflowOrchestrator(LightningFlow):
    def __init__(self) -> None:
        super().__init__()
        self.serve = ExternalModelServer(
            cloud_compute=CloudCompute('cpu'), parallel=True
        )
        self.load_test = LocustLoadTester(cloud_compute=CloudCompute('cpu'))

    def run(self):
        # start the server (on CPU machine 1)
        self.serve.run()

        # load test when the server is up (on a separate CPU machine 2)
        if self.serve.state.RUNNING:
            self.load_test.run()


app = LightningApp(WorkflowOrchestrator())
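In the last example, parallel=True is what lets the flow move past self.serve.run() while the server keeps running; by default a work's run() call blocks the flow until it finishes. The same flag works on any LightningWork. As a minimal sketch (reusing the toy components from this page, and assuming the two steps were actually independent of each other), this would start both machines at the same time:

# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp, CloudCompute


class TrainComponent(LightningWork):
    def run(self, x):
        print(f'train a model on {x}')


class AnalyzeComponent(LightningWork):
    def run(self, x):
        print(f'analyze model on {x}')


class WorkflowOrchestrator(LightningFlow):
    def __init__(self) -> None:
        super().__init__()
        # parallel=True makes run() non-blocking, so both components start together
        self.train = TrainComponent(cloud_compute=CloudCompute('gpu'), parallel=True)
        self.analyze = AnalyzeComponent(cloud_compute=CloudCompute('cpu'), parallel=True)

    def run(self):
        self.train.run("GPU machine 1")
        self.analyze.run("CPU machine 2")


app = LightningApp(WorkflowOrchestrator())

In the real toy app the analysis depends on the trained model, so the default blocking behavior, or an explicit status check as in the auto-stop example above, is what you want.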
Next: Review how to debug
The next level is a 2-minute review to make sure you know how to debug a Lightning App.