Level 7: Share variables between components
Audience: Users who want to share variables and files across Lightning components.
Prereqs: You must have finished intermediate level 5+.
Send a variable from a Flow to a Component
When a variable is defined on the LightningFlow (the orchestrator) and then passed into the run methods of the work components, Lightning automatically sends the variable across machines for you under the hood.
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp


class Component(LightningWork):
    """Work that prints a value handed to it by the flow."""

    def run(self, x):
        # ``x`` was created in the flow and forwarded to this work by Lightning.
        received = f'MACHINE 1: this string came from machine 0: "{x}"'
        local = 'MACHINE 1: this string is on machine 1'
        for line in (received, local):
            print(line)


class WorkflowOrchestrator(LightningFlow):
    """Flow that builds a string and passes it to ``Component``."""

    def __init__(self) -> None:
        super().__init__()
        self.component = Component()

    def run(self):
        greeting = 'hello from machine 0'
        self.component.run(greeting)


app = LightningApp(WorkflowOrchestrator())
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp


class Component(LightningWork):
    """Work that prints a value handed to it by the flow."""

    def run(self, x):
        # ``x`` was created in the flow and forwarded to this work by Lightning.
        received = f'MACHINE 1: this string came from machine 0: "{x}"'
        local = 'MACHINE 1: this string is on machine 1'
        for line in (received, local):
            print(line)


class WorkflowOrchestrator(LightningFlow):
    """Flow that builds a string and passes it to ``Component``."""

    def __init__(self) -> None:
        super().__init__()
        self.component = Component()

    def run(self):
        greeting = 'hello from machine 0'
        self.component.run(greeting)


app = LightningApp(WorkflowOrchestrator())
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp


class Component(LightningWork):
    """Work that prints a value handed to it by the flow."""

    def run(self, x):
        # ``x`` was created in the flow and forwarded to this work by Lightning.
        received = f'MACHINE 1: this string came from machine 0: "{x}"'
        local = 'MACHINE 1: this string is on machine 1'
        for line in (received, local):
            print(line)


class WorkflowOrchestrator(LightningFlow):
    """Flow that builds a string and passes it to ``Component``."""

    def __init__(self) -> None:
        super().__init__()
        self.component = Component()

    def run(self):
        greeting = 'hello from machine 0'
        self.component.run(greeting)


app = LightningApp(WorkflowOrchestrator())
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp


class Component(LightningWork):
    """Work that prints a value handed to it by the flow."""

    def run(self, x):
        # ``x`` was created in the flow and forwarded to this work by Lightning.
        received = f'MACHINE 1: this string came from machine 0: "{x}"'
        local = 'MACHINE 1: this string is on machine 1'
        for line in (received, local):
            print(line)


class WorkflowOrchestrator(LightningFlow):
    """Flow that builds a string and passes it to ``Component``."""

    def __init__(self) -> None:
        super().__init__()
        self.component = Component()

    def run(self):
        greeting = 'hello from machine 0'
        self.component.run(greeting)


app = LightningApp(WorkflowOrchestrator())
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp


class Component(LightningWork):
    """Work that prints a value handed to it by the flow."""

    def run(self, x):
        # ``x`` was created in the flow and forwarded to this work by Lightning.
        received = f'MACHINE 1: this string came from machine 0: "{x}"'
        local = 'MACHINE 1: this string is on machine 1'
        for line in (received, local):
            print(line)


class WorkflowOrchestrator(LightningFlow):
    """Flow that builds a string and passes it to ``Component``."""

    def __init__(self) -> None:
        super().__init__()
        self.component = Component()

    def run(self):
        greeting = 'hello from machine 0'
        self.component.run(greeting)


app = LightningApp(WorkflowOrchestrator())
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp


class Component(LightningWork):
    """Work that prints a value handed to it by the flow."""

    def run(self, x):
        # ``x`` was created in the flow and forwarded to this work by Lightning.
        received = f'MACHINE 1: this string came from machine 0: "{x}"'
        local = 'MACHINE 1: this string is on machine 1'
        for line in (received, local):
            print(line)


class WorkflowOrchestrator(LightningFlow):
    """Flow that builds a string and passes it to ``Component``."""

    def __init__(self) -> None:
        super().__init__()
        self.component = Component()

    def run(self):
        greeting = 'hello from machine 0'
        self.component.run(greeting)


app = LightningApp(WorkflowOrchestrator())
CLI output
$ lightning run app app.py --open-ui=false
Your Lightning App is starting. This won't take long.
INFO: Your app has started. View it in your browser: http://127.0.0.1:7501/view
MACHINE 1: this string came from machine 0: "hello from machine 0"
MACHINE 1: this string is on machine 1
In this example, we learned that we can send variables to components just like in regular Python. On a local machine, it behaves exactly like Python. When the workflow is distributed on the cloud, Lightning makes network calls under the hood, but to you it still works like regular Python.
Send a variable between two components
Most workflows (especially in ML) require components to respond to a change in another component that is likely running on a separate machine or even a separate cluster.
Example — continuous deployment: every time the training component saves a checkpoint, we redeploy the model:
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp
import time


class TrainComponent(LightningWork):
    """Fake trainer that publishes a new checkpoint path every 10 steps."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Read by the flow to decide when to redeploy; None until the first
        # checkpoint (step 0) is recorded.
        self.last_checkpoint_path = None

    def run(self):
        # Pretend to train for 1000 steps of one second each.
        for step in range(1000):
            time.sleep(1.0)
            # The tiny offset keeps step 0 from dividing by zero.
            fake_loss = round(1 / (step + 0.00001), 4)
            # NOTE: the f-string "=" specifier prints the variable names, so
            # ``step`` and ``fake_loss`` must keep these exact names.
            print(f'{step=}: {fake_loss=} ')
            if step % 10 != 0:
                continue
            # Every 10th step, "save" a checkpoint by publishing its path.
            self.last_checkpoint_path = f'/some/path/{step=}_{fake_loss=}'
            print(f'TRAIN COMPONENT: saved new checkpoint: {self.last_checkpoint_path}')


class ModelDeploymentComponent(LightningWork):
    """Fake deployment that reports which checkpoint it would load."""

    def run(self, new_checkpoint):
        message = f'DEPLOY COMPONENT: load new model from checkpoint: {new_checkpoint}'
        print(message)


class ContinuousDeployment(LightningFlow):
    """Redeploy the model each time the trainer reports a checkpoint."""

    def __init__(self) -> None:
        super().__init__()
        self.train = TrainComponent(parallel=True)
        self.model_deployment = ModelDeploymentComponent(parallel=True)

    def run(self):
        self.train.run()
        checkpoint = self.train.last_checkpoint_path
        # Nothing to deploy until the trainer has published a checkpoint.
        if not checkpoint:
            return
        self.model_deployment.run(checkpoint)


app = LightningApp(ContinuousDeployment())
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp
import time


class TrainComponent(LightningWork):
    """Fake trainer that publishes a new checkpoint path every 10 steps."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Read by the flow to decide when to redeploy; None until the first
        # checkpoint (step 0) is recorded.
        self.last_checkpoint_path = None

    def run(self):
        # Pretend to train for 1000 steps of one second each.
        for step in range(1000):
            time.sleep(1.0)
            # The tiny offset keeps step 0 from dividing by zero.
            fake_loss = round(1 / (step + 0.00001), 4)
            # NOTE: the f-string "=" specifier prints the variable names, so
            # ``step`` and ``fake_loss`` must keep these exact names.
            print(f'{step=}: {fake_loss=} ')
            if step % 10 != 0:
                continue
            # Every 10th step, "save" a checkpoint by publishing its path.
            self.last_checkpoint_path = f'/some/path/{step=}_{fake_loss=}'
            print(f'TRAIN COMPONENT: saved new checkpoint: {self.last_checkpoint_path}')


class ModelDeploymentComponent(LightningWork):
    """Fake deployment that reports which checkpoint it would load."""

    def run(self, new_checkpoint):
        message = f'DEPLOY COMPONENT: load new model from checkpoint: {new_checkpoint}'
        print(message)


class ContinuousDeployment(LightningFlow):
    """Redeploy the model each time the trainer reports a checkpoint."""

    def __init__(self) -> None:
        super().__init__()
        self.train = TrainComponent(parallel=True)
        self.model_deployment = ModelDeploymentComponent(parallel=True)

    def run(self):
        self.train.run()
        checkpoint = self.train.last_checkpoint_path
        # Nothing to deploy until the trainer has published a checkpoint.
        if not checkpoint:
            return
        self.model_deployment.run(checkpoint)


app = LightningApp(ContinuousDeployment())
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp
import time


class TrainComponent(LightningWork):
    """Fake trainer that publishes a new checkpoint path every 10 steps."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Read by the flow to decide when to redeploy; None until the first
        # checkpoint (step 0) is recorded.
        self.last_checkpoint_path = None

    def run(self):
        # Pretend to train for 1000 steps of one second each.
        for step in range(1000):
            time.sleep(1.0)
            # The tiny offset keeps step 0 from dividing by zero.
            fake_loss = round(1 / (step + 0.00001), 4)
            # NOTE: the f-string "=" specifier prints the variable names, so
            # ``step`` and ``fake_loss`` must keep these exact names.
            print(f'{step=}: {fake_loss=} ')
            if step % 10 != 0:
                continue
            # Every 10th step, "save" a checkpoint by publishing its path.
            self.last_checkpoint_path = f'/some/path/{step=}_{fake_loss=}'
            print(f'TRAIN COMPONENT: saved new checkpoint: {self.last_checkpoint_path}')


class ModelDeploymentComponent(LightningWork):
    """Fake deployment that reports which checkpoint it would load."""

    def run(self, new_checkpoint):
        message = f'DEPLOY COMPONENT: load new model from checkpoint: {new_checkpoint}'
        print(message)


class ContinuousDeployment(LightningFlow):
    """Redeploy the model each time the trainer reports a checkpoint."""

    def __init__(self) -> None:
        super().__init__()
        self.train = TrainComponent(parallel=True)
        self.model_deployment = ModelDeploymentComponent(parallel=True)

    def run(self):
        self.train.run()
        checkpoint = self.train.last_checkpoint_path
        # Nothing to deploy until the trainer has published a checkpoint.
        if not checkpoint:
            return
        self.model_deployment.run(checkpoint)


app = LightningApp(ContinuousDeployment())
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp
import time


class TrainComponent(LightningWork):
    """Fake trainer that publishes a new checkpoint path every 10 steps."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Read by the flow to decide when to redeploy; None until the first
        # checkpoint (step 0) is recorded.
        self.last_checkpoint_path = None

    def run(self):
        # Pretend to train for 1000 steps of one second each.
        for step in range(1000):
            time.sleep(1.0)
            # The tiny offset keeps step 0 from dividing by zero.
            fake_loss = round(1 / (step + 0.00001), 4)
            # NOTE: the f-string "=" specifier prints the variable names, so
            # ``step`` and ``fake_loss`` must keep these exact names.
            print(f'{step=}: {fake_loss=} ')
            if step % 10 != 0:
                continue
            # Every 10th step, "save" a checkpoint by publishing its path.
            self.last_checkpoint_path = f'/some/path/{step=}_{fake_loss=}'
            print(f'TRAIN COMPONENT: saved new checkpoint: {self.last_checkpoint_path}')


class ModelDeploymentComponent(LightningWork):
    """Fake deployment that reports which checkpoint it would load."""

    def run(self, new_checkpoint):
        message = f'DEPLOY COMPONENT: load new model from checkpoint: {new_checkpoint}'
        print(message)


class ContinuousDeployment(LightningFlow):
    """Redeploy the model each time the trainer reports a checkpoint."""

    def __init__(self) -> None:
        super().__init__()
        self.train = TrainComponent(parallel=True)
        self.model_deployment = ModelDeploymentComponent(parallel=True)

    def run(self):
        self.train.run()
        checkpoint = self.train.last_checkpoint_path
        # Nothing to deploy until the trainer has published a checkpoint.
        if not checkpoint:
            return
        self.model_deployment.run(checkpoint)


app = LightningApp(ContinuousDeployment())
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp
import time


class TrainComponent(LightningWork):
    """Fake trainer that publishes a new checkpoint path every 10 steps."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Read by the flow to decide when to redeploy; None until the first
        # checkpoint (step 0) is recorded.
        self.last_checkpoint_path = None

    def run(self):
        # Pretend to train for 1000 steps of one second each.
        for step in range(1000):
            time.sleep(1.0)
            # The tiny offset keeps step 0 from dividing by zero.
            fake_loss = round(1 / (step + 0.00001), 4)
            # NOTE: the f-string "=" specifier prints the variable names, so
            # ``step`` and ``fake_loss`` must keep these exact names.
            print(f'{step=}: {fake_loss=} ')
            if step % 10 != 0:
                continue
            # Every 10th step, "save" a checkpoint by publishing its path.
            self.last_checkpoint_path = f'/some/path/{step=}_{fake_loss=}'
            print(f'TRAIN COMPONENT: saved new checkpoint: {self.last_checkpoint_path}')


class ModelDeploymentComponent(LightningWork):
    """Fake deployment that reports which checkpoint it would load."""

    def run(self, new_checkpoint):
        message = f'DEPLOY COMPONENT: load new model from checkpoint: {new_checkpoint}'
        print(message)


class ContinuousDeployment(LightningFlow):
    """Redeploy the model each time the trainer reports a checkpoint."""

    def __init__(self) -> None:
        super().__init__()
        self.train = TrainComponent(parallel=True)
        self.model_deployment = ModelDeploymentComponent(parallel=True)

    def run(self):
        self.train.run()
        checkpoint = self.train.last_checkpoint_path
        # Nothing to deploy until the trainer has published a checkpoint.
        if not checkpoint:
            return
        self.model_deployment.run(checkpoint)


app = LightningApp(ContinuousDeployment())
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp
import time


class TrainComponent(LightningWork):
    """Fake trainer that publishes a new checkpoint path every 10 steps."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Read by the flow to decide when to redeploy; None until the first
        # checkpoint (step 0) is recorded.
        self.last_checkpoint_path = None

    def run(self):
        # Pretend to train for 1000 steps of one second each.
        for step in range(1000):
            time.sleep(1.0)
            # The tiny offset keeps step 0 from dividing by zero.
            fake_loss = round(1 / (step + 0.00001), 4)
            # NOTE: the f-string "=" specifier prints the variable names, so
            # ``step`` and ``fake_loss`` must keep these exact names.
            print(f'{step=}: {fake_loss=} ')
            if step % 10 != 0:
                continue
            # Every 10th step, "save" a checkpoint by publishing its path.
            self.last_checkpoint_path = f'/some/path/{step=}_{fake_loss=}'
            print(f'TRAIN COMPONENT: saved new checkpoint: {self.last_checkpoint_path}')


class ModelDeploymentComponent(LightningWork):
    """Fake deployment that reports which checkpoint it would load."""

    def run(self, new_checkpoint):
        message = f'DEPLOY COMPONENT: load new model from checkpoint: {new_checkpoint}'
        print(message)


class ContinuousDeployment(LightningFlow):
    """Redeploy the model each time the trainer reports a checkpoint."""

    def __init__(self) -> None:
        super().__init__()
        self.train = TrainComponent(parallel=True)
        self.model_deployment = ModelDeploymentComponent(parallel=True)

    def run(self):
        self.train.run()
        checkpoint = self.train.last_checkpoint_path
        # Nothing to deploy until the trainer has published a checkpoint.
        if not checkpoint:
            return
        self.model_deployment.run(checkpoint)


app = LightningApp(ContinuousDeployment())
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp
import time


class TrainComponent(LightningWork):
    """Fake trainer that publishes a new checkpoint path every 10 steps."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Read by the flow to decide when to redeploy; None until the first
        # checkpoint (step 0) is recorded.
        self.last_checkpoint_path = None

    def run(self):
        # Pretend to train for 1000 steps of one second each.
        for step in range(1000):
            time.sleep(1.0)
            # The tiny offset keeps step 0 from dividing by zero.
            fake_loss = round(1 / (step + 0.00001), 4)
            # NOTE: the f-string "=" specifier prints the variable names, so
            # ``step`` and ``fake_loss`` must keep these exact names.
            print(f'{step=}: {fake_loss=} ')
            if step % 10 != 0:
                continue
            # Every 10th step, "save" a checkpoint by publishing its path.
            self.last_checkpoint_path = f'/some/path/{step=}_{fake_loss=}'
            print(f'TRAIN COMPONENT: saved new checkpoint: {self.last_checkpoint_path}')


class ModelDeploymentComponent(LightningWork):
    """Fake deployment that reports which checkpoint it would load."""

    def run(self, new_checkpoint):
        message = f'DEPLOY COMPONENT: load new model from checkpoint: {new_checkpoint}'
        print(message)


class ContinuousDeployment(LightningFlow):
    """Redeploy the model each time the trainer reports a checkpoint."""

    def __init__(self) -> None:
        super().__init__()
        self.train = TrainComponent(parallel=True)
        self.model_deployment = ModelDeploymentComponent(parallel=True)

    def run(self):
        self.train.run()
        checkpoint = self.train.last_checkpoint_path
        # Nothing to deploy until the trainer has published a checkpoint.
        if not checkpoint:
            return
        self.model_deployment.run(checkpoint)


app = LightningApp(ContinuousDeployment())
CLI output:
$ lightning run app app.py --open-ui=false
Your Lightning App is starting. This won't take long.
INFO: Your app has started. View it in your browser: http://127.0.0.1:7501/view
step=0: fake_loss=100000.0
TRAIN COMPONENT: saved new checkpoint: /some/path/step=0_fake_loss=100000.0
step=1: fake_loss=1.0
DEPLOY COMPONENT: load new model from checkpoint: /some/path/step=0_fake_loss=100000.0
step=2: fake_loss=0.5
step=3: fake_loss=0.3333
step=4: fake_loss=0.25
step=5: fake_loss=0.2
step=6: fake_loss=0.1667
step=7: fake_loss=0.1429
step=8: fake_loss=0.125
step=9: fake_loss=0.1111
step=10: fake_loss=0.1
TRAIN COMPONENT: saved new checkpoint: /some/path/step=10_fake_loss=0.1
DEPLOY COMPONENT: load new model from checkpoint: /some/path/step=10_fake_loss=0.1
step=11: fake_loss=0.0909
step=12: fake_loss=0.0833
step=13: fake_loss=0.0769
step=14: fake_loss=0.0714
step=15: fake_loss=0.0667
step=16: fake_loss=0.0625
step=17: fake_loss=0.0588
step=18: fake_loss=0.0556
step=19: fake_loss=0.0526
step=20: fake_loss=0.05
TRAIN COMPONENT: saved new checkpoint: /some/path/step=20_fake_loss=0.05
DEPLOY COMPONENT: load new model from checkpoint: /some/path/step=20_fake_loss=0.05
step=21: fake_loss=0.0476
Send a large variable between two components
For large variables such as arrays, tensors, and embeddings, use a Payload to transfer them across components.
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp
# ``storage`` provides ``Payload``; without this import ``storage.Payload``
# below raises NameError.
from lightning.app import storage


class EmbeddingProcessor(LightningWork):
    """Work that generates embeddings and exposes them as a ``Payload``."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Filled in by run(); the flow forwards it to EmbeddingServer.
        self.embeddings = None

    def run(self):
        print('PROCESSOR: Generating embeddings...')
        fake_embeddings = [[1, 2, 3], [2, 3, 4]]
        # Wrapping large data in a Payload lets the app pass a reference around
        # instead of the data itself until a component needs it.
        self.embeddings = storage.Payload(fake_embeddings)


class EmbeddingServer(LightningWork):
    """Work that consumes the embeddings produced by EmbeddingProcessor."""

    def run(self, payload):
        print('SERVER: Using embeddings from processor', payload)
        # ``.value`` retrieves the wrapped data on this component.
        embeddings = payload.value
        print('serving embeddings sent from EmbeddingProcessor: ', embeddings)


class WorkflowOrchestrator(LightningFlow):
    """Flow that runs the processor, then hands its Payload to the server."""

    def __init__(self) -> None:
        super().__init__()
        self.processor = EmbeddingProcessor()
        self.server = EmbeddingServer()

    def run(self):
        self.processor.run()
        self.server.run(self.processor.embeddings)


app = LightningApp(WorkflowOrchestrator())
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp
# ``storage`` provides ``Payload``; without this import ``storage.Payload``
# below raises NameError.
from lightning.app import storage


class EmbeddingProcessor(LightningWork):
    """Work that generates embeddings and exposes them as a ``Payload``."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Filled in by run(); the flow forwards it to EmbeddingServer.
        self.embeddings = None

    def run(self):
        print('PROCESSOR: Generating embeddings...')
        fake_embeddings = [[1, 2, 3], [2, 3, 4]]
        # Wrapping large data in a Payload lets the app pass a reference around
        # instead of the data itself until a component needs it.
        self.embeddings = storage.Payload(fake_embeddings)


class EmbeddingServer(LightningWork):
    """Work that consumes the embeddings produced by EmbeddingProcessor."""

    def run(self, payload):
        print('SERVER: Using embeddings from processor', payload)
        # ``.value`` retrieves the wrapped data on this component.
        embeddings = payload.value
        print('serving embeddings sent from EmbeddingProcessor: ', embeddings)


class WorkflowOrchestrator(LightningFlow):
    """Flow that runs the processor, then hands its Payload to the server."""

    def __init__(self) -> None:
        super().__init__()
        self.processor = EmbeddingProcessor()
        self.server = EmbeddingServer()

    def run(self):
        self.processor.run()
        self.server.run(self.processor.embeddings)


app = LightningApp(WorkflowOrchestrator())
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp
# ``storage`` provides ``Payload``; without this import ``storage.Payload``
# below raises NameError.
from lightning.app import storage


class EmbeddingProcessor(LightningWork):
    """Work that generates embeddings and exposes them as a ``Payload``."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Filled in by run(); the flow forwards it to EmbeddingServer.
        self.embeddings = None

    def run(self):
        print('PROCESSOR: Generating embeddings...')
        fake_embeddings = [[1, 2, 3], [2, 3, 4]]
        # Wrapping large data in a Payload lets the app pass a reference around
        # instead of the data itself until a component needs it.
        self.embeddings = storage.Payload(fake_embeddings)


class EmbeddingServer(LightningWork):
    """Work that consumes the embeddings produced by EmbeddingProcessor."""

    def run(self, payload):
        print('SERVER: Using embeddings from processor', payload)
        # ``.value`` retrieves the wrapped data on this component.
        embeddings = payload.value
        print('serving embeddings sent from EmbeddingProcessor: ', embeddings)


class WorkflowOrchestrator(LightningFlow):
    """Flow that runs the processor, then hands its Payload to the server."""

    def __init__(self) -> None:
        super().__init__()
        self.processor = EmbeddingProcessor()
        self.server = EmbeddingServer()

    def run(self):
        self.processor.run()
        self.server.run(self.processor.embeddings)


app = LightningApp(WorkflowOrchestrator())
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp
# ``storage`` provides ``Payload``; without this import ``storage.Payload``
# below raises NameError.
from lightning.app import storage


class EmbeddingProcessor(LightningWork):
    """Work that generates embeddings and exposes them as a ``Payload``."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Filled in by run(); the flow forwards it to EmbeddingServer.
        self.embeddings = None

    def run(self):
        print('PROCESSOR: Generating embeddings...')
        fake_embeddings = [[1, 2, 3], [2, 3, 4]]
        # Wrapping large data in a Payload lets the app pass a reference around
        # instead of the data itself until a component needs it.
        self.embeddings = storage.Payload(fake_embeddings)


class EmbeddingServer(LightningWork):
    """Work that consumes the embeddings produced by EmbeddingProcessor."""

    def run(self, payload):
        print('SERVER: Using embeddings from processor', payload)
        # ``.value`` retrieves the wrapped data on this component.
        embeddings = payload.value
        print('serving embeddings sent from EmbeddingProcessor: ', embeddings)


class WorkflowOrchestrator(LightningFlow):
    """Flow that runs the processor, then hands its Payload to the server."""

    def __init__(self) -> None:
        super().__init__()
        self.processor = EmbeddingProcessor()
        self.server = EmbeddingServer()

    def run(self):
        self.processor.run()
        self.server.run(self.processor.embeddings)


app = LightningApp(WorkflowOrchestrator())
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp
# ``storage`` provides ``Payload``; without this import ``storage.Payload``
# below raises NameError.
from lightning.app import storage


class EmbeddingProcessor(LightningWork):
    """Work that generates embeddings and exposes them as a ``Payload``."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Filled in by run(); the flow forwards it to EmbeddingServer.
        self.embeddings = None

    def run(self):
        print('PROCESSOR: Generating embeddings...')
        fake_embeddings = [[1, 2, 3], [2, 3, 4]]
        # Wrapping large data in a Payload lets the app pass a reference around
        # instead of the data itself until a component needs it.
        self.embeddings = storage.Payload(fake_embeddings)


class EmbeddingServer(LightningWork):
    """Work that consumes the embeddings produced by EmbeddingProcessor."""

    def run(self, payload):
        print('SERVER: Using embeddings from processor', payload)
        # ``.value`` retrieves the wrapped data on this component.
        embeddings = payload.value
        print('serving embeddings sent from EmbeddingProcessor: ', embeddings)


class WorkflowOrchestrator(LightningFlow):
    """Flow that runs the processor, then hands its Payload to the server."""

    def __init__(self) -> None:
        super().__init__()
        self.processor = EmbeddingProcessor()
        self.server = EmbeddingServer()

    def run(self):
        self.processor.run()
        self.server.run(self.processor.embeddings)


app = LightningApp(WorkflowOrchestrator())
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp
# ``storage`` provides ``Payload``; without this import ``storage.Payload``
# below raises NameError.
from lightning.app import storage


class EmbeddingProcessor(LightningWork):
    """Work that generates embeddings and exposes them as a ``Payload``."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Filled in by run(); the flow forwards it to EmbeddingServer.
        self.embeddings = None

    def run(self):
        print('PROCESSOR: Generating embeddings...')
        fake_embeddings = [[1, 2, 3], [2, 3, 4]]
        # Wrapping large data in a Payload lets the app pass a reference around
        # instead of the data itself until a component needs it.
        self.embeddings = storage.Payload(fake_embeddings)


class EmbeddingServer(LightningWork):
    """Work that consumes the embeddings produced by EmbeddingProcessor."""

    def run(self, payload):
        print('SERVER: Using embeddings from processor', payload)
        # ``.value`` retrieves the wrapped data on this component.
        embeddings = payload.value
        print('serving embeddings sent from EmbeddingProcessor: ', embeddings)


class WorkflowOrchestrator(LightningFlow):
    """Flow that runs the processor, then hands its Payload to the server."""

    def __init__(self) -> None:
        super().__init__()
        self.processor = EmbeddingProcessor()
        self.server = EmbeddingServer()

    def run(self):
        self.processor.run()
        self.server.run(self.processor.embeddings)


app = LightningApp(WorkflowOrchestrator())
# app.py
from lightning.app import LightningWork, LightningFlow, LightningApp
# ``storage`` provides ``Payload``; without this import ``storage.Payload``
# below raises NameError.
from lightning.app import storage


class EmbeddingProcessor(LightningWork):
    """Work that generates embeddings and exposes them as a ``Payload``."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Filled in by run(); the flow forwards it to EmbeddingServer.
        self.embeddings = None

    def run(self):
        print('PROCESSOR: Generating embeddings...')
        fake_embeddings = [[1, 2, 3], [2, 3, 4]]
        # Wrapping large data in a Payload lets the app pass a reference around
        # instead of the data itself until a component needs it.
        self.embeddings = storage.Payload(fake_embeddings)


class EmbeddingServer(LightningWork):
    """Work that consumes the embeddings produced by EmbeddingProcessor."""

    def run(self, payload):
        print('SERVER: Using embeddings from processor', payload)
        # ``.value`` retrieves the wrapped data on this component.
        embeddings = payload.value
        print('serving embeddings sent from EmbeddingProcessor: ', embeddings)


class WorkflowOrchestrator(LightningFlow):
    """Flow that runs the processor, then hands its Payload to the server."""

    def __init__(self) -> None:
        super().__init__()
        self.processor = EmbeddingProcessor()
        self.server = EmbeddingServer()

    def run(self):
        self.processor.run()
        self.server.run(self.processor.embeddings)


app = LightningApp(WorkflowOrchestrator())
CLI output
$ lightning run app app.py --open-ui=false
Your Lightning App is starting. This won't take long.
INFO: Your app has started. View it in your browser: http://127.0.0.1:7501/view
PROCESSOR: Generating embeddings...
SERVER: Using embeddings from processor <lightning.app.storage.payload.Payload object at 0x123383d60>
serving embeddings sent from EmbeddingProcessor: [[1, 2, 3], [2, 3, 4]]
The payload object keeps the data on the machine and passes a pointer to the data around the app until the data is needed by a component.
Next steps: Share files between components
Now that you know how to share variables across components, we’ll learn to share files across components to simplify complex workflows.