Generating production-level streaming microservices using AI

admin


[faststream-gen](https://github.com/airtai/faststream-gen/) is a Python library that uses generative AI to automatically generate [FastStream](https://github.com/airtai/faststream) applications. Simply describe your microservice in plain English, and [faststream-gen](https://github.com/airtai/faststream-gen/) will generate a production-level FastStream application, ready to deploy, in a few minutes and for under $1.

Documentation: [https://faststream-gen.airt.ai](https://faststream-gen.airt.ai)

Source Code: [https://github.com/airtai/faststream-gen](https://github.com/airtai/faststream-gen)

The code generator for FastStream is a Python library that automates the process of creating streaming microservices for Apache Kafka, RabbitMQ and other supported protocols. It works by taking your application descriptions and swiftly turning them into ready-to-deploy FastStream applications.

The key features are:

- Automatic code generation: faststream-gen lets you outline your application requirements and quickly transforms them into a complete FastStream project.
- Automatic test generation: faststream-gen generates integration tests alongside the application code, which helps catch common bugs early and saves development time.
- Script Templates: Streamline the deployment of your FastStream application using faststream-gen's built-in scripts for starting the local Kafka broker, subscribing to a Kafka topic and shutting the broker down.
- Automatic CI/CD integration: faststream-gen integrates with your version control and continuous integration pipeline through its GitHub workflow files. These predefined configuration files are tailored to FastStream projects and GitHub Actions, so you can automate tasks such as code validation, testing, and deployment throughout the development lifecycle.

In this tutorial, we will walk through the process of using the faststream-gen Python library to generate two FastStream applications.

The first application will demonstrate how to retrieve current cryptocurrency prices from various web sources. We will then use FastStream to publish the retrieved data as messages to a Kafka topic.

The second application will showcase how to consume messages from the Kafka topic using FastStream's consumer capabilities. It will process these messages to extract information about cryptocurrencies, calculate the price mean for each cryptocurrency within a certain time window, and publish the price mean to another topic.

A short version of this tutorial (covering roughly half of it) is also available as a video.

## Cryptocurrency analysis with FastStream

In this tutorial, we will walk you through the process of using the faststream-gen Python library to retrieve cryptocurrency prices in real time and calculate their moving average. To accomplish that, we will generate the following two [FastStream](https://faststream.airt.ai) applications:

- A microservice retrieving current cryptocurrency prices from an external [web service](https://docs.cloud.coinbase.com/sign-in-with-coinbase/docs/api-prices#get-spot-price)
- A microservice consuming such messages, calculating the moving average price for each cryptocurrency and publishing it to another Kafka topic.

Let's get started!

## Installation

To complete this tutorial, you will need the following software and Python library:

- [Python](https://www.python.org/) (version 3.8 and upward)
- a valid [OpenAI API key](https://platform.openai.com/account/api-keys)
- [optional] a [GitHub account](https://github.com/) and an installed [git command](https://git-scm.com/)

It is recommended to use a virtual environment for your Python projects. In this tutorial, we will be using Python's venv module to create a virtual environment.

First, create a root directory for this tutorial. Navigate to the desired location, create a new directory called `faststream_gen_tutorial` and enter it:

    mkdir faststream_gen_tutorial
    cd faststream_gen_tutorial

### Creating and activating a new Python virtual environment

Create a new virtual environment using [venv](https://docs.python.org/3/library/venv.html):

    python3 -m venv venv

Next, activate your new virtual environment:

    source venv/bin/activate

### Installing the packages

Upgrade pip if needed and install the faststream-gen package:

    pip install --upgrade pip && pip install faststream-gen

Check that the installation was successful by running the following command:

    faststream_gen --help

You should see the full list of options of the command in the output.

Now you have successfully set up the environment and installed the faststream-gen package.
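The two checks above (an active virtual environment and an installed command) can also be scripted. The following is a minimal sketch using only the Python standard library; the helper names `in_virtualenv` and `cli_available` are hypothetical and are not part of faststream-gen.

```python
import shutil
import sys


def in_virtualenv() -> bool:
    """Return True when the interpreter runs inside a venv-style environment."""
    # Inside a venv, sys.prefix points at the environment, while
    # sys.base_prefix still points at the base Python installation.
    return sys.prefix != getattr(sys, "base_prefix", sys.prefix)


def cli_available(command: str) -> bool:
    """Return True when `command` can be found on the current PATH."""
    return shutil.which(command) is not None


if __name__ == "__main__":
    print("virtual environment active:", in_virtualenv())
    print("faststream_gen on PATH:", cli_available("faststream_gen"))
```

If the second line reports `False` after installation, the most likely cause is that the virtual environment is not activated in the current shell.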

### Setting up OpenAI API key

faststream-gen uses the OpenAI API, so you need to export your API key in the `OPENAI_API_KEY` environment variable. If you use bash or a compatible shell, you can do that with the following command:

    export OPENAI_API_KEY="sk-your-openai-api-key"

If you don't already have an OpenAI API key, you can create one [here](https://platform.openai.com/account/api-keys).

## Generating FastStream applications

### Retrieving and publishing crypto prices

Now, we will create an application which retrieves information about current cryptocurrency prices from an external [web service](https://docs.cloud.coinbase.com/sign-in-with-coinbase/docs/api-prices#get-spot-price) and publishes messages to a Kafka topic. To achieve this, we need to provide a high-level description of the application in plain English, containing only the information that a knowledgeable Python developer familiar with the FastStream framework would need to implement it. This should include details such as the message schema, instructions on external APIs and web services, and guidance on selecting the appropriate topic and partition keys.

Below is an example of such a description used in this particular case. Notice that we specified what the service should do, but not the steps needed to implement it.

    Create a FastStream application which will retrieve the current cryptocurrency
    price and publish it to the new_crypto_price topic.

    The application should retrieve the data every 2 seconds.

    A message which will be produced is JSON with the two attributes:
    - price: non-negative float (current price of cryptocurrency in USD)
    - crypto_currency: string (the cryptocurrency e.g. BTC, ETH...)

    The current price of Bitcoin can be retrieved by a simple GET request to:
        - https://api.coinbase.com/v2/prices/BTC-USD/spot

    The current price of Ethereum can be retrieved by a simple GET request to:
        - https://api.coinbase.com/v2/prices/ETH-USD/spot

    The response of this GET request is a JSON and you can get information about
    the crypto_currency in:
        response['data']['base']

    and the information about the price in:
        response['data']['amount']

    Use utf-8 encoded crypto_currency attribute as a partition key when publishing
    the message to new_crypto_price topic.

With the description written, let's generate a new FastStream project inside the `retrieve-publish-crypto` directory.
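As an aside, the response handling that the description asks for can be sketched in a few lines of plain Python. This is only an illustration of the mapping from the JSON response to the message and its partition key, not the code that faststream-gen produces. The function name `to_message` and the sample response body are hypothetical; the only fields used are the two named in the description.

```python
from typing import Any, Dict, Tuple


def to_message(response: Dict[str, Any]) -> Tuple[Dict[str, Any], bytes]:
    """Build the new_crypto_price payload and its partition key.

    `response` is the decoded JSON body of the spot-price GET request.
    """
    crypto_currency = response["data"]["base"]
    # float() accepts both numbers and numeric strings, so this works
    # whichever of the two the service returns for the amount.
    price = float(response["data"]["amount"])
    if price < 0:
        raise ValueError("price must be non-negative")
    message = {"price": price, "crypto_currency": crypto_currency}
    # Partition key: the utf-8 encoded crypto_currency attribute.
    return message, crypto_currency.encode("utf-8")


# Hypothetical sample body, shaped as the description states.
sample = {"data": {"base": "BTC", "amount": "26543.21"}}
message, key = to_message(sample)
print(message, key)
```

Using the currency symbol as the partition key means that all prices for one cryptocurrency are published under the same key, which is what a per-currency consumer relies on.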

First, copy the previous description and paste it into a file called `description_retrieve_publish.txt`.

Next, run the following command (the `-i` parameter specifies the path to the app description file, while the `-o` parameter specifies the directory where the generated project files will be saved):

    faststream_gen -i description_retrieve_publish.txt -o retrieve-publish-crypto
    ✨  Generating a new FastStream application!
     ✔ Application description validated.
     ✔ FastStream app skeleton code generated.
     ✔ The app and the tests are generated.
     ✔ New FastStream project created.
     ✔ Integration tests were successfully completed.
     Tokens used: 36938
     Total Cost (USD): $0.11436
    ✨  All files were successfully generated!

#### Failed generation

The generation process is not bullet-proof and it could fail with a message like this:

    ✨  Generating a new FastStream application!
     ✔ Application description validated.
     ✔ New FastStream project created.
     ✔ FastStream app skeleton code generated.
     ✘ Error: Failed to generate a valid application and test code.
     ✘ Error: Integration tests failed.
     Tokens used: 79384
     Total Cost (USD): $0.24567

    Apologies, we couldn't generate a working application and test code from your application description.

    Please run the following command to start manual debugging:

    cd retrieve_without_logs && pytest

    For in-depth debugging, check the retrieve-publish-crypto/_faststream_gen_logs directory for complete logs, including individual step information.

There can be a number of reasons for that, the most common being:

- The specification you provided was not sufficiently detailed to generate the application. In such cases, you could add more detailed instructions and try again.
- The task is too difficult for the default GPT-3.5 model to handle. In that case, you could try GPT-4 instead: `faststream_gen --model gpt-4 -i description_retrieve_publish.txt -o retrieve-publish-crypto`
- You were unlucky and you just need to execute the command again. Large language models are stochastic by nature and can give different answers to the same question. There are retry mechanisms in the engine, but sometimes they are not enough and simply rerunning the command can remedy the problem.

If none of the above strategies work, check the already generated files and see what the potential problem could be. You can also finish the implementation or tests yourself.
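The "rerun, then fall back to GPT-4" advice above can be automated with a small wrapper. The sketch below is hedged: it assumes that `faststream_gen` exits with a non-zero status when generation fails, which this tutorial does not state, and the helper names are hypothetical. It only uses the `-i`, `-o` and `--model` options shown above.

```python
import subprocess
from typing import Callable, List, Optional, Sequence


def build_command(
    description: str, output_dir: str, model: Optional[str] = None
) -> List[str]:
    """Assemble the faststream_gen command line from the options shown above."""
    cmd = ["faststream_gen"]
    if model is not None:
        cmd += ["--model", model]
    cmd += ["-i", description, "-o", output_dir]
    return cmd


def run_cli(cmd: Sequence[str]) -> int:
    """Run the command and return its exit status."""
    return subprocess.run(list(cmd)).returncode


def generate_with_retries(
    description: str,
    output_dir: str,
    models: Sequence[Optional[str]] = (None, None, "gpt-4"),
    runner: Callable[[Sequence[str]], int] = run_cli,
) -> bool:
    """Try each model in turn (None means the default model).

    ASSUMPTION: exit status 0 means the generation succeeded.
    """
    for model in models:
        if runner(build_command(description, output_dir, model)) == 0:
            return True
    return False


# Example call (not executed here):
# generate_with_retries("description_retrieve_publish.txt", "retrieve-publish-crypto")
```

Keep in mind that each attempt consumes tokens, so a short list of attempts is a sensible default.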

#### Successful generation

If successful, the command will generate a FastStream project with the following structure:

    retrieve-publish-crypto
    ├── .github
    │   └── workflows
    │       ├── build_docker.yml
    │       ├── deploy_docs.yml
    │       └── test.yml
    ├── .gitignore
    ├── Dockerfile
    ├── LICENSE
    ├── README.md
    ├── app
    │   └── application.py
    ├── pyproject.toml
    ├── scripts
    │   ├── build_docker.sh
    │   ├── lint.sh
    │   ├── services.yml
    │   ├── start_kafka_broker_locally.sh
    │   ├── static-analysis.sh
    │   ├── stop_kafka_broker_locally.sh
    │   └── subscribe_to_kafka_broker_locally.sh
    └── tests
        ├── __init__.py
        └── test_application.py

The generated application is located in the `app/` directory, while the tests are located in the `tests/` directory. It is important to keep in mind that these files are generated by an LLM and may vary with each generation.

`app/application.py`:

    import asyncio
    import json
    from datetime import datetime

    import aiohttp
    from pydantic import BaseModel, Field, NonNegativeFloat

    from faststream import ContextRepo, FastStream, Logger
    from faststream.kafka import KafkaBroker

    broker = KafkaBroker("localhost:9092")
    app = FastStream(broker)


    class CryptoPrice(BaseModel):
        price: NonNegativeFloat = Field(
            ..., examples=[50000.0], description="Current price of cryptocurrency in USD"
        )
        crypto_currency: str = Field(
            ..., examples=["BTC"], description="The cryptocurrency"
        )


    publisher = broker.publisher("new_crypto_price")


    async def fetch_crypto_price(
        url: str,
        crypto_currency: str,
        logger: Logger,
        context: ContextRepo,
        time_interval: int = 2,
    ) -> None:
        # Always use context: ContextRepo for storing app_is_running variable
        while context.get("app_is_running"):
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        data = await response.json()
                        price = data["data"]["amount"]
                        new_crypto_price = CryptoPrice(
                            price=price, crypto_currency=crypto_currency
                        )
                        await publisher.publish(
                            new_crypto_price,
                            key=crypto_currency.encode("utf-8"),
                        )
                    else:
                        logger.warning(
                            f"Failed API request {url} at time {datetime.now()}"
                        )

            await asyncio.sleep(time_interval)


    @app.on_startup
    async def app_setup(context: ContextRepo):
        context.set_global("app_is_running", True)


    @app.on_shutdown
    async def shutdown(context: ContextRepo):
        context.set_global("app_is_running", False)

        # Get all the running tasks and wait them to finish
        fetch_tasks = context.get("fetch_tasks")
        await asyncio.gather(*fetch_tasks)


    @app.after_startup
    async def publish_crypto_price(logger: Logger, context: ContextRepo):
        logger.info("Starting publishing:")

        cryptocurrencies = [("Bitcoin", "BTC"), ("Ethereum", "ETH")]
        fetch_tasks = [
            asyncio.create_task(
                fetch_crypto_price(
                    f"https://api.coinbase.com/v2/prices/{crypto_currency}-USD/spot",
                    crypto_currency,
                    logger,
                    context,
                )
            )
            for _, crypto_currency in cryptocurrencies
        ]
        # you need to save asyncio tasks so you can wait them to finish at app shutdown (the function with @app.on_shutdown function)
        context.set_global("fetch_tasks", fetch_tasks)

`tests/test_application.py`:

    import pytest

    from faststream import Context, TestApp
    from faststream.kafka import TestKafkaBroker

    from app.application import CryptoPrice, app, broker


    @broker.subscriber("new_crypto_price")
    async def on_new_crypto_price(
        msg: CryptoPrice, key: bytes = Context("message.raw_message.key")
    ):
        pass


    @pytest.mark.asyncio
    async def test_fetch_crypto_price():
        async with TestKafkaBroker(broker):
            async with TestApp(app):
                await on_new_crypto_price.wait_call(2)
                on_new_crypto_price.mock.assert_called()

#### Creating a new Python virtual environment

All the required dependencies to run the newly generated FastStream project are located within the `pyproject.toml` file.

Create and activate a new virtual environment inside the `retrieve-publish-crypto` directory by using [venv](https://docs.python.org/3/library/venv.html):

    cd retrieve-publish-crypto
    python3 -m venv venv
    source venv/bin/activate

Upgrade pip if needed and install the development dependencies:

    pip install --upgrade pip && pip install -e .[dev]

#### Testing the application

In order to verify the functional correctness of the application, it is recommended to execute the generated unit and integration tests by running the `pytest` command.

    pytest
    test session starts
    platform linux -- Python 3.11.4, pytest-7.4.2, pluggy-1.3.0
    rootdir: /work/fastkafka-gen/docs_src/tutorial/retrieve-publish-crypto
    configfile: pyproject.toml
    plugins: anyio-3.7.1, asyncio-0.21.1
    asyncio: mode=Mode.STRICT
    collected 1 item

    tests/test_application.py .    [100%]

    1 passed in 2.65s

#### Previewing AsyncAPI Docs

To preview AsyncAPI Docs for the application, execute the following command:

    faststream docs serve app.application:app
    INFO:     Started server process [3575270]
    INFO:     Waiting for application startup.
    INFO:     Application startup complete.
    INFO:     Uvicorn running on http://localhost:8000 (Press CTRL+C to quit)

You can now access the AsyncAPI Docs by opening [localhost:8000](http://localhost:8000) in your browser.

#### Starting localhost Kafka broker

To run the FastStream application locally, ensure that you have a running Kafka broker. You can start a Kafka Docker container by executing the `start_kafka_broker_locally.sh` shell script:

    ./scripts/start_kafka_broker_locally.sh
    [+] Running 2/2
     ⠿ Network scripts_default  Created  0.1s
     ⠿ Container bitnami_kafka  Started

#### Starting the application

To start the application, execute the following command:

    faststream run app.application:app
    2023-09-15 13:41:21,948 INFO - FastStream app starting...
    2023-09-15 13:41:22,144 INFO - | - Starting publishing:
    2023-09-15 13:41:22,144 INFO - FastStream app started successfully! To exit press CTRL+C
    Topic new_crypto_price not found in cluster metadata

Keep the app running; it is needed for the subsequent steps.

### Calculating the moving average

Let's develop an application that calculates the average price of the three most recent messages received from the `new_crypto_price` topic for each cryptocurrency. Subsequently, we will publish the computed average price to the `price_mean` topic.

Here is the full description of the desired application:

    Create a FastStream application for consuming messages from the new_crypto_price topic.
    This topic needs to use a partition key.

    new_crypto_price messages use JSON with two attributes (create class CryptoPrice with these attributes):
    - price: non-negative float (it represents the current price of the crypto)
    - crypto_currency: string (it represents the cryptocurrency e.g. BTC, ETH...)

    The application should save each message to a dictionary (global variable) - partition key should be used as a dictionary key and value should be a List of prices.

    Keep only the last 100 messages in the dictionary.

    If there are fewer than 3 messages for a given partition key, do not publish any messages.

    Otherwise, Calculate the price mean of the last 3 messages for the given partition key.

    Publish the price mean to the price_mean topic and use the same partition key that the new_crypto_price topic is using.
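The averaging rule in the description above can be checked with a few lines of plain Python, independent of Kafka and FastStream. This is a minimal sketch: the names `record_price`, `HISTORY_SIZE` and `WINDOW` are hypothetical, and a `deque` with `maxlen` is used here as a convenient stand-in for the "dictionary of lists" that the description asks for.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, Optional

HISTORY_SIZE = 100  # keep only the last 100 prices per partition key
WINDOW = 3          # the mean is taken over the last 3 prices

history: Dict[str, Deque[float]] = defaultdict(lambda: deque(maxlen=HISTORY_SIZE))


def record_price(key: str, price: float) -> Optional[float]:
    """Store a price and return the mean to publish, or None if there is none yet."""
    prices = history[key]
    prices.append(price)  # a full deque drops its oldest entry automatically
    if len(prices) < WINDOW:
        return None  # fewer than 3 messages: do not publish anything
    return sum(list(prices)[-WINDOW:]) / WINDOW


for price in (50000, 60000, 70000, 80000):
    print(record_price("BTC", price))
# prints None, None, 60000.0, 70000.0
```

The first two prices produce no output message; from the third price onward, each new price yields the mean of the three most recent ones for that key.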

Please open a new terminal and navigate to the root directory of this tutorial, which is called `faststream_gen_tutorial`. Once you are inside the `faststream_gen_tutorial` folder, please activate the virtual environment.

    cd path_to/faststream_gen_tutorial
    source venv/bin/activate

To create a FastStream application inside the `calculate-mean-app` directory, first copy the previous description and paste it into the `description_calculate_mean.txt` file.

Next, run the following command:

    faststream_gen -i description_calculate_mean.txt -o calculate-mean-app
    ✨  Generating a new FastStream application!
     ✔ Application description validated.
     ✔ FastStream app skeleton code generated.
     ✔ The app and the tests are generated.
     ✔ New FastStream project created.
     ✔ Integration tests were successfully completed.
     Tokens used: 13367
     Total Cost (USD): $0.04147
    ✨  All files were successfully generated!

If successful, the command will generate the `calculate-mean-app` directory with `app/application.py` and `tests/test_application.py` inside. If not, rerun the command until it succeeds.

`app/application.py`:

    from typing import Dict, List

    from pydantic import BaseModel, Field, NonNegativeFloat

    from faststream import Context, ContextRepo, FastStream, Logger
    from faststream.kafka import KafkaBroker


    class CryptoPrice(BaseModel):
        price: NonNegativeFloat = Field(
            ..., examples=[50000], description="Current price of the cryptocurrency"
        )
        crypto_currency: str = Field(
            ..., examples=["BTC"], description="Cryptocurrency symbol"
        )


    broker = KafkaBroker("localhost:9092")
    app = FastStream(broker)

    publisher = broker.publisher("price_mean")


    @app.on_startup
    async def app_setup(context: ContextRepo):
        message_history: Dict[str, List[float]] = {}
        context.set_global("message_history", message_history)


    @broker.subscriber("new_crypto_price")
    async def on_new_crypto_price(
        msg: CryptoPrice,
        logger: Logger,
        message_history: Dict[str, List[float]] = Context(),
        key: bytes = Context("message.raw_message.key"),
    ) -> None:
        logger.info(f"New crypto price {msg=}")

        crypto_key = key.decode("utf-8")
        if crypto_key not in message_history:
            message_history[crypto_key] = []

        message_history[crypto_key].append(msg.price)

        if len(message_history[crypto_key]) > 100:
            message_history[crypto_key] = message_history[crypto_key][-100:]

        if len(message_history[crypto_key]) >= 3:
            price_mean = sum(message_history[crypto_key][-3:]) / 3
            await publisher.publish(price_mean, key=key)

`tests/test_application.py`:

    import pytest

    from faststream import Context, TestApp
    from faststream.kafka import TestKafkaBroker

    from app.application import CryptoPrice, app, broker, on_new_crypto_price


    @broker.subscriber("price_mean")
    async def on_price_mean(msg: float, key: bytes = Context("message.raw_message.key")):
        pass


    @pytest.mark.asyncio
    async def test_app():
        async with TestKafkaBroker(broker):
            async with TestApp(app):
                await broker.publish(
                    CryptoPrice(price=50000, crypto_currency="BTC"),
                    "new_crypto_price",
                    key=b"BTC",
                )
                on_new_crypto_price.mock.assert_called_with(
                    dict(CryptoPrice(price=50000, crypto_currency="BTC"))
                )
                on_price_mean.mock.assert_not_called()

                await broker.publish(
                    CryptoPrice(price=60000, crypto_currency="BTC"),
                    "new_crypto_price",
                    key=b"BTC",
                )
                on_new_crypto_price.mock.assert_called_with(
                    dict(CryptoPrice(price=60000, crypto_currency="BTC"))
                )
                on_price_mean.mock.assert_not_called()

                await broker.publish(
                    CryptoPrice(price=70000, crypto_currency="BTC"),
                    "new_crypto_price",
                    key=b"BTC",
                )
                on_new_crypto_price.mock.assert_called_with(
                    dict(CryptoPrice(price=70000, crypto_currency="BTC"))
                )
                on_price_mean.mock.assert_called_with(60000.0)

#### Creating a new Python virtual environment

All the required dependencies to run the newly generated FastStream project are located within the `pyproject.toml` file.

Create and activate a new virtual environment inside the `calculate-mean-app` directory by using [venv](https://docs.python.org/3/library/venv.html):

    cd calculate-mean-app
    python3 -m venv venv
    source venv/bin/activate

Upgrade pip if needed and install the development dependencies:

    pip install --upgrade pip && pip install -e .[dev]

#### Testing the application

In order to verify the functional correctness of the application, it is recommended to execute the generated unit and integration tests by running the `pytest` command.

    pytest
    test session starts
    platform linux -- Python 3.11.4, pytest-7.4.2, pluggy-1.3.0
    rootdir: /work/fastkafka-gen/docs_src/tutorial/calculate-mean-app
    configfile: pyproject.toml
    plugins: anyio-3.7.1, asyncio-0.21.1
    asyncio: mode=Mode.STRICT
    collected 1 item

    tests/test_application.py .    [100%]

    1 passed in 0.44s
