Welcome to the Data Engineering category of DZone, where you will find all the information you need for AI/ML, big data, data, databases, and IoT. As you determine the first steps for new systems or reevaluate existing ones, you're going to require tools and resources to gather, store, and analyze data. The Zones within our Data Engineering category contain resources that will help you expertly navigate through the SDLC Analysis stage.
Artificial intelligence (AI) and machine learning (ML) are two fields that work together to create computer systems capable of perception, recognition, decision-making, and translation. Separately, AI is the ability for a computer system to mimic human intelligence through math and logic, and ML builds off AI by developing methods that "learn" through experience and do not require instruction. In the AI/ML Zone, you'll find resources ranging from tutorials to use cases that will help you navigate this rapidly growing field.
Big data comprises datasets that are massive, varied, complex, and can't be handled traditionally. Big data can include both structured and unstructured data, and it is often stored in data lakes or data warehouses. As organizations grow, big data becomes increasingly more crucial for gathering business insights and analytics. The Big Data Zone contains the resources you need for understanding data storage, data modeling, ELT, ETL, and more.
Data is at the core of software development. Think of it as information stored in anything from text documents and images to entire software programs, and these bits of information need to be processed, read, analyzed, stored, and transported throughout systems. In this Zone, you'll find resources covering the tools and strategies you need to handle data properly.
A database is a collection of structured data that is stored in a computer system, and it can be hosted on-premises or in the cloud. As databases are designed to enable easy access to data, our resources are compiled here for smooth browsing of everything you need to know from database management systems to database languages.
IoT, or the Internet of Things, is a technological field that makes it possible for users to connect devices and systems and exchange data over the internet. Through DZone's IoT resources, you'll learn about smart devices, sensors, networks, edge computing, and many other technologies — including those that are now part of the average person's daily life.
Enterprise AI
In recent years, artificial intelligence has become less of a buzzword and more of an adopted process across the enterprise. With that adoption comes a growing need to increase operational efficiency as customer demands rise. AI platforms have become increasingly sophisticated, creating a need to establish clear guidelines and ownership. In DZone’s 2022 Enterprise AI Trend Report, we explore MLOps, explainability, and how to select the best AI platform for your business. We also share a tutorial on how to create a machine learning service using Spring Boot, and how to deploy AI with an event-driven platform. The goal of this Trend Report is to better inform the developer audience about practical tools and design paradigms, new technologies, and the overall operational impact of AI within the business. This is a technology space that's constantly shifting and evolving. As part of our December 2022 re-launch, we've added new articles pertaining to knowledge graphs, a solutions directory for popular AI tools, and more.
In the dynamic landscape of modern application development, the synthesis of Streamlit, OpenAI, and Elasticsearch presents an exciting opportunity to craft intelligent chatbot applications that transcend conventional interactions. This article guides developers through the process of building a sophisticated chatbot that seamlessly integrates the simplicity of Streamlit, the natural language processing prowess of OpenAI, and the robust search capabilities of Elasticsearch. As we navigate through each component, from setting up the development environment to optimizing performance and deployment, readers will gain invaluable insights into harnessing the power of these technologies. Join us in exploring how this potent trio can elevate user engagement, foster more intuitive conversations, and redefine the possibilities of interactive, AI-driven applications. What Is Streamlit? Streamlit is a powerful and user-friendly Python library designed to simplify the creation of web applications, particularly for data science and machine learning projects. It stands out for its ability to transform data scripts into interactive and shareable web apps with minimal code, making it accessible to both beginners and experienced developers. Streamlit's emphasis on simplicity and rapid prototyping significantly reduces the learning curve associated with web development, allowing developers to focus on the functionality and user experience of their applications. Why Choose Streamlit for Building Chatbot Applications When it comes to constructing chatbot applications, Streamlit offers a compelling set of advantages. Its simplicity enables developers to create dynamic chat interfaces with ease, streamlining the development process. The library's real-time feedback feature allows for instant adjustments, facilitating quick iterations during the development of conversational interfaces. Streamlit's integration capabilities with data processing libraries and machine learning models make it well-suited for chatbots that require data interaction and AI-driven functionalities. Additionally, the platform's commitment to rapid prototyping aligns seamlessly with the iterative nature of refining chatbot interactions based on user feedback. Overview of Streamlit’s Features and Benefits Streamlit boasts a rich set of features that enhance the development of chatbot applications. Its diverse widgets, including sliders, buttons, and text inputs, empower developers to create interactive interfaces without delving into complex front-end coding. The platform supports easy integration of data visualization tools, making it convenient for chatbots to present information graphically. Streamlit's customization options allow developers to tailor the look and feel of their applications, ensuring a polished and brand-aligned user experience. Furthermore, Streamlit simplifies the deployment process, enabling developers to share their chatbot applications effortlessly through URLs, contributing to wider accessibility and user engagement. In essence, Streamlit offers a potent combination of simplicity, flexibility, and deployment convenience, making it an optimal choice for developers seeking an efficient framework for building intelligent chatbot applications. Overview of Chatbots Chatbots, driven by advancements in natural language processing (NLP) and artificial intelligence, have become integral components of digital interactions across various industries. 
These intelligent conversational agents are designed to simulate human-like interactions, providing users with a seamless and responsive experience. Deployed on websites, messaging platforms, and mobile apps, chatbots serve diverse purposes, from customer support and information retrieval to transaction processing and entertainment. One key driver behind the rise of chatbots is their ability to enhance customer engagement and satisfaction. By leveraging NLP algorithms, chatbots can understand and interpret user queries, allowing for more natural and context-aware conversations. This capability not only improves the efficiency of customer interactions but also provides a personalized touch, creating a more engaging user experience. Chatbots are particularly valuable in scenarios where instant responses and round-the-clock availability are essential, such as in customer service applications. Beyond customer-facing interactions, chatbots also find utility in streamlining business processes. They can automate repetitive tasks, answer frequently asked questions, and assist users in navigating through services or products. Moreover, chatbots contribute to data collection and analysis, as they can gather valuable insights from user interactions, helping organizations refine their products and services. As technology continues to evolve, chatbots are poised to play an increasingly pivotal role in shaping the future of human-computer interactions, offering a versatile and efficient means of communication across a wide array of domains. Introduction to OpenAI OpenAI stands as a trailblazer in the realm of artificial intelligence, known for pushing the boundaries of what machines can achieve in terms of understanding and generating human-like language. Established with a mission to ensure that artificial general intelligence (AGI) benefits all of humanity, OpenAI has been at the forefront of cutting-edge research and development. The organization's commitment to openness and responsible AI practices is reflected in its pioneering work, which includes the creation of advanced language models like GPT (Generative Pre-trained Transformer). OpenAI's contributions have reshaped the landscape of natural language processing, empowering applications ranging from chatbots and language translation to content generation. As a driving force in the AI community, OpenAI continues to pave the way for innovations that not only enhance machine capabilities but also address ethical considerations and the broader societal impact of artificial intelligence. Setting up the Development Environment Below are key steps to set up the development environment for building a Streamlit Chatbot Application with OpenAI and Elasticsearch: Install Streamlit: Begin by installing Streamlit using pip install streamlit in your Python environment. Streamlit simplifies the creation of interactive web applications and serves as the foundation for your chatbot interface. OpenAI API Access: Obtain access to the OpenAI API by signing up on the OpenAI platform. Retrieve your API key, which will enable your application to leverage OpenAI's natural language processing capabilities for intelligent chatbot responses. Set up Elasticsearch: Install and configure Elasticsearch, a powerful search engine, to enhance your chatbot's capabilities. You can download Elasticsearch from the official website and follow the setup instructions to get it running locally. 
Dependencies: Ensure you have the necessary Python libraries installed, including those required for interfacing with OpenAI (e.g., the openai library) and connecting to Elasticsearch (e.g., the elasticsearch library).

How To Build a Chatbot

Building a chatbot that integrates Elasticsearch for information retrieval and OpenAI for advanced natural language processing involves several steps. Below is a simplified example using Python, Streamlit for the interface, and the elasticsearch and openai libraries.

Step 1: Install Required Libraries

Shell
pip install streamlit openai elasticsearch

Step 2: Set Up the Elasticsearch Connection

Make sure the local Elasticsearch instance from the setup section above is running. The elasticsearch client library installed in Step 1 creates the connection in the app code below (es = Elasticsearch()).

Step 3: Update the OpenAI API Key

Replace your_openai_api_key in the code with the API key from the OpenAI platform.

Step 4: Create a Streamlit App

Python
import streamlit as st
import openai
from elasticsearch import Elasticsearch

# Set up OpenAI API key
openai.api_key = 'your_openai_api_key'

# Set up Elasticsearch connection
es = Elasticsearch()

# Streamlit App
def main():
    st.title("Chatbot using OpenAI and Elasticsearch")

    # User input
    user_input = st.text_input("Question:")

    if st.button("Answer"):
        # Call OpenAI API for generating response
        response = get_openai_response(user_input)

        # Display response
        st.text("Response: " + response)

        # Store the conversation in Elasticsearch
        index_conversation(user_input, response)

# OpenAI API call function
def get_openai_response(user_input):
    prompt = f"User: {user_input}\nChatbot:"
    response = openai.Completion.create(
        engine="text-davinci-003",
        prompt=prompt,
        temperature=0.7,
        max_tokens=150,
        n=1,
    )
    return response['choices'][0]['text'].strip()

# Store conversation in Elasticsearch
def index_conversation(user_input, chatbot_response):
    doc = {
        'user_input': user_input,
        'chatbot_response': chatbot_response
    }
    es.index(index='chat_data', body=doc)

if __name__ == "__main__":
    main()

Step 5: Run the Streamlit App

Shell
streamlit run your_script_name.py

Enhancements and Efficiency Suggestions

When integrating OpenAI with Elasticsearch using Streamlit, there are several enhancements and optimization techniques you can implement to improve the performance, user experience, and overall functionality of your chatbot application. Here are some suggestions:

Context tracking for multi-turn conversations: Enhance the chatbot to handle multi-turn conversations by maintaining context between user interactions.
Error handling: Implement robust error handling to gracefully manage situations where Elasticsearch queries return no results or when there are issues with the OpenAI API.
User authentication and personalization: Consider implementing user authentication to personalize the chatbot experience.
Optimize Elasticsearch queries: Fine-tune your Elasticsearch queries for optimal performance.
Caching responses: Implement a caching mechanism to store and retrieve frequently used responses from both Elasticsearch and OpenAI (a minimal sketch of this idea follows at the end of this article).
Implement throttling and rate limiting: To prevent abuse and control costs, consider implementing throttling and rate limiting for both Elasticsearch and OpenAI API requests.
Integration with additional data sources: Expand the chatbot's capabilities by integrating it with other data sources or APIs.
Natural Language Understanding (NLU) enhancements: Improve the natural language understanding of your chatbot by incorporating NLU models or techniques.
User interface enhancements: Enhance the Streamlit user interface by incorporating features like interactive buttons, sliders, or dropdowns for user input. Monitoring and analytics: Implement monitoring and analytics tools to track user interactions, performance metrics, and potential issues. A/B testing: Conduct A/B testing to experiment with different variations of your chatbot's responses, Elasticsearch queries, or user interface elements. Security considerations: Ensure that your application follows best practices for security, especially when handling user data or sensitive information. Documentation and user guidance: Provide clear documentation and user guidance within the application to help users understand the capabilities of the chatbot. By incorporating these enhancements and optimization techniques, you can create a more robust, efficient, and user-friendly OpenAI and Elasticsearch integration using Streamlit. Use Cases Integrating OpenAI with Elasticsearch using Streamlit can offer a versatile solution for various use cases where natural language understanding, information retrieval, and user interaction are crucial. Here are a few use cases for such an integration: Customer support chatbots: Deploy an OpenAI-powered chatbot integrated with Elasticsearch for quick and accurate responses to customer queries. Knowledge base access: Enable users to access and search through a knowledge base using natural language queries. Interactive educational platforms: Develop interactive educational platforms where students can engage in natural language conversations with an OpenAI-based tutor. Technical troubleshooting: Build a technical support chatbot that assists users in troubleshooting issues. Interactive data exploration: Develop a chatbot that assists users in exploring and analyzing data stored in Elasticsearch indices. Personalized content recommendations: Implement a content recommendation chatbot that uses OpenAI to understand user preferences. Legal document assistance: Build a chatbot to assist legal professionals in retrieving information from legal documents stored in Elasticsearch. These use cases highlight the versatility of integrating OpenAI with Elasticsearch using Streamlit, offering solutions across various domains where natural language understanding and effective information retrieval are paramount. Conclusion Integration of OpenAI with Elasticsearch through the Streamlit framework offers a dynamic and sophisticated solution for building intelligent chatbot applications. This synergy harnesses the natural language processing capabilities of OpenAI, the efficient data retrieval of Elasticsearch, and the streamlined interface of Streamlit to create a responsive and user-friendly conversational experience. The outlined enhancements, from context tracking and error handling to user authentication and personalized responses, contribute to a versatile chatbot capable of addressing diverse user needs. This guide provides a comprehensive blueprint, emphasizing optimization techniques, security considerations, and the importance of continuous improvement through monitoring and A/B testing. Ultimately, the resulting application not only interprets user queries accurately but also delivers a seamless, engaging, and efficient interaction, marking a significant stride in the evolution of intelligent chatbot development.
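As a concrete illustration of the caching enhancement listed above, the sketch below first looks in the chat_data index (populated by index_conversation() in Step 4) for a sufficiently similar question that has already been answered, and only falls back to the OpenAI API when no good match is found. It is a minimal, hypothetical example that assumes it is added to the Step 4 script so it can reuse its es client, get_openai_response(), and index_conversation() functions; the relevance threshold is an arbitrary placeholder to tune for your data.

Python
# Hypothetical caching helper, assumed to live in the Step 4 script above so it can
# reuse the es client, get_openai_response(), and index_conversation().
CACHE_SCORE_THRESHOLD = 5.0  # arbitrary relevance cutoff; tune for your data

def get_cached_or_generate(question):
    # Look for a previously answered, similar question in the chat_data index
    result = es.search(
        index="chat_data",
        body={"query": {"match": {"user_input": question}}, "size": 1},
    )
    hits = result["hits"]["hits"]
    if hits and hits[0]["_score"] >= CACHE_SCORE_THRESHOLD:
        # Close enough match: reuse the stored answer instead of calling OpenAI again
        return hits[0]["_source"]["chatbot_response"]
    # No good match: generate a fresh answer and cache the new question/answer pair
    response = get_openai_response(question)
    index_conversation(question, response)
    return response

In main(), calling get_cached_or_generate(user_input) in place of get_openai_response(user_input) would enable the cache without changing the rest of the app.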
AI holds significant promise for the IoT, but running these models on IoT semiconductors is challenging. These devices’ limited hardware makes running intelligent software locally difficult. Recent breakthroughs in neuromorphic computing (NC) could change that.

Even outside the IoT, AI faces a scalability problem. Running larger, more complex algorithms with conventional computing consumes a lot of energy. The strain on power management semiconductors aside, this energy usage leads to sustainability and cost complications. For AI to sustain its current growth, tech companies must rethink their approach to computing itself.

What Is Neuromorphic Computing?

Neuromorphic computing models computer systems after the human brain. As neural networks teach software to think like humans, NC designs circuits to imitate human synapses and neurons. These biological systems are far more versatile and efficient than artificial “thinking” machines, so taking inspiration from them could lead to significant computing advancements.

NC has been around as a concept for decades but has struggled to come to fruition. That may not be the case for long. Leading computing companies have come out with and refined several neuromorphic chips over the past few years. Another breakthrough came in August 2022, when researchers revealed a neuromorphic chip twice as energy-efficient as previous models.

These circuits typically store memory on the chip — or neuron — instead of connecting separate systems. Many also utilize analog memory to store more data in less space. NC is also parallel by design, letting all components operate simultaneously instead of processes moving from one point to another.

How Neuromorphic Computing Could Change AI and IoT

As this technology becomes more reliable and accessible, it could forever change the IoT semiconductor. This increased functionality would enable further improvements in AI, too. Here are a few of the most significant of these benefits.

More Powerful AI

Neuromorphic computing’s most obvious advantage is that it can handle much more complex tasks on smaller hardware. Conventional computing struggles to overcome the Von Neumann bottleneck — moving data between memory and processing locations slows it down. Since NC collocates memory and processing, it avoids this bottleneck.

Recent neuromorphic chips are 4,000 times faster than the previous generation and have lower latencies than any conventional system. Consequently, they enable much more responsive AI. Near-real-time decision-making in applications like driverless vehicles and industrial robots would become viable.

These AI systems could be as responsive and versatile as the human brain. The same hardware could process real-time responses in power management semiconductors and monitor for cyber threats in a connected energy grid. Robots could fill multiple roles as needed instead of being highly specialized.

Lower Power Consumption

NC also poses a solution to AI’s power problem. Like the human brain, NC is event-driven. Each specific neuron wakes in response to signals from others and can function independently. As a result, the only components using energy at any given point are those actually processing data.

This segmentation, alongside the removal of the Von Neumann bottleneck, means NCs use far less energy while accomplishing more. On a large scale, that means computing giants can minimize their greenhouse gas emissions. On a smaller scale, it makes local AI computation possible on IoT semiconductors.
Extensive Edge Networks The combination of higher processing power and lower power consumption is particularly beneficial for edge computing applications. Experts predict 75% of enterprise data processing will occur at the edge by 2025, but edge computing still faces several roadblocks. Neuromorphic computing promises a solution. Conventional IoT devices lack the processing capacity to run advanced applications in near-real-time locally. Network constraints further restrain that functionality. By making AI more accessible on smaller, less energy-hungry devices, NC overcomes that barrier. NC also supports the scalability the edge needs. Adding more neuromorphic chips increases these systems’ computing capacity without introducing energy or speed bottlenecks. As a result, it’s easier to implement a wider, more complex device network that can effectively function as a cohesive system. Increased Reliability NC could also make AI and IoT systems more reliable. These systems store information in multiple places instead of a centralized memory unit. If one neuron fails, the rest of the system can still function normally. This resilience complements other IoT hardware innovations to enable hardier edge computing networks. Thermoset composite plastics could prevent corrosion in the semiconductor, protecting the hardware, while NC ensures the software runs smoothly even if one component fails. These combined benefits expand the IoT’s potential use cases, bringing complex AI processes to even the most extreme environments. Edge computing systems in heavy industrial settings like construction sites or mines would become viable. Remaining Challenges in NC NC’s potential for IoT semiconductors and AI applications is impressive, but several obstacles remain. High costs and complexity are the most obvious. These brain-mimicking semiconductors are only effective with more recent, expensive memory and processing components. On top of introducing higher costs, these technologies’ newness means limited data on their efficacy in real-world applications. Additional testing and research will inevitably lead to breakthroughs past these obstacles, but that will take time. Most AI models today are also designed with conventional computing architectures in mind. Converting them for optimized use on a neuromorphic system could lower model accuracy and introduce additional costs. AI companies must develop NC-specific models to use this technology to its full potential. As with any AI application, neuromorphic computing may heighten ethical concerns. AI poses serious ethical challenges regarding bias, employment, cybersecurity, and privacy. If NC makes IoT semiconductors capable of running much more advanced AI, those risks become all the more threatening. Regulators and tech leaders must learn to navigate this moral landscape before deploying this new technology. Neuromorphic Computing Will Change the IoT Semiconductor Neuromorphic computing could alter the future of technology, from power management semiconductors to large-scale cloud data centers. It’d spur a wave of more accurate, versatile, reliable, and accessible AI, but those benefits come with equal challenges. NC will take more research and development before it’s ready for viable real-world use. However, its potential is undeniable. This technology will define the future of AI and the IoT. The question is when that will happen and how positive that impact will be.
Have you ever wondered how data warehouses are different from Databases? And what are Data Lakes and Data Lake Houses? Let’s understand these with a hypothetical example. Bookster.biz is the new sensation in selling books worldwide. The business is flourishing, and they need to keep track of a lot of data: a large catalog of millions of books, millions of customers worldwide placing billions of orders to buy books. How do they keep track of all this data? How do they ensure their website and apps don’t grind to a halt because of all this load? Databases to the Rescue Databases are the workhorses of websites and mobile apps, handling all the data and millions of transactions. These databases come in many flavors (we will cover all different types of databases in a separate post). Still, the most popular ones are called Relational Databases (aka RDBMS), like MySQL, Postgres, Oracle, etc. Bookster would possibly have the following tables and schema (not exhaustive for brevity): BookCatalog: book ID, ISBN, title, authors, description, publisher, … BookInventory: book ID, number of books available for sale, ... Users: user ID, user name, email, … Orders: Order ID, book ID, user ID, payment information, order status, … When a user orders a book, Bookster will update two records simultaneously: reducing book inventory and inserting a new order entry in the Orders table. RDBMSs support transactions that enable such atomic operations where either all such operations succeed or all fail. Imagine if two or more users could order the last copy of a popular book. Without transaction support, all customers will place orders, and Bookster will have many pissed-off customers except one. Similarly, if the Database host crashes during the processing, the data may be inconsistent without transactions. This database interaction type is called Online Transaction Processing (aka OLTP), where the read and write operations happen very fast on a small amount of data, i.e., precisely two rows in the previous example. This is great. The customers are now happy, and they can order books fast. But the management wants to know what’s going on with the business. Which books are the best-sellers in different categories? Which authors are trending, and which are not selling much? How many orders are coming from which geographies or demographics? These kinds of answers are not accessible with just the databases. Data Warehouses Shine for Analytical Queries Data Warehouses (DWs) can handle large amounts of data, e.g., billions of orders, millions of book entries, etc. Bookster can load the data from the Database to the DW to answer the management questions. The analytical queries read a lot of data and summarise it in some form, like listing the total number of orders for a particular book broken down by geography and demographics. Examples of popular DWs are AWS Redshift, GCP BigQuery, etc. This database interaction type is called Online Analytical Processing (aka OLAP), where most reads happen on a large amount of data. The data is uploaded to the DWs in batches or can be streamed. The loading process is also known as ETL (Extract, Transform, and Load), which is done regularly to keep the DW in sync with the Database updates. DWs typically don't allow updating data but only add a newer version. Like RDBMS, DWs also have a notion of schema where tables and schema are well defined, and the ETL process converts the data into appropriate schema for loading. 
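To make the OLTP example above concrete before moving on, here is a minimal, hypothetical sketch in Python (using psycopg2) of Bookster's two-row update wrapped in a single transaction. The table and column names (BookInventory.copies_available, Orders.order_status, and so on) are illustrative rather than an actual schema; the point is that either both statements commit together or the transaction rolls back and neither change is applied.

Python
# A minimal sketch (not Bookster's actual code) of the atomic order flow described
# above, against a hypothetical Postgres schema with BookInventory and Orders tables.
import psycopg2

def place_order(conn, order_id, book_id, user_id):
    try:
        with conn:  # commits on success, rolls back on any exception
            with conn.cursor() as cur:
                # Take one copy out of stock, but only if a copy is still available
                cur.execute(
                    "UPDATE BookInventory SET copies_available = copies_available - 1 "
                    "WHERE book_id = %s AND copies_available > 0",
                    (book_id,),
                )
                if cur.rowcount == 0:
                    raise RuntimeError("Book is out of stock")
                # Record the order in the same transaction
                cur.execute(
                    "INSERT INTO Orders (order_id, book_id, user_id, order_status) "
                    "VALUES (%s, %s, %s, %s)",
                    (order_id, book_id, user_id, "PLACED"),
                )
        return True
    except RuntimeError:
        return False

conn = psycopg2.connect("dbname=bookster user=app password=secret host=localhost")
print(place_order(conn, order_id=1, book_id=42, user_id=7))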
Some data doesn’t fit the schema easily but can be used by Machine Learning (ML) processes. For example, customers review different books as text or video reviews, and some rockstar ML engineers want to generate popular books by training an LLM on all books. So, the data can’t be structured as a strict schema anymore. Data Lakes help here by storing even larger amounts of data in different formats and allowing efficient processing.

Data Lakes and Data Lake Houses Are the Relatively New Kids on the Block

Data Lakes (DLs) overcome the friction of converting the data into a specific format irrespective of if and when it will be used. Vast amounts of data in different native formats like JSON, text, binary, images, videos, etc., can be stored in a DL and converted to a specific schema at read time, only when there is a need to process the data. The processing is flexible and scalable, as DLs can support big data processing frameworks like Apache Spark. On the flip side, such flexibility could become a drawback if most of the data ingested is low quality due to the lack of data quality checks or governance, making the DL a ‘Data Swamp’ instead.

That’s where the clever people of Databricks combined the goodness of DWs with DLs to create Data Lake Houses (DLHs). DLHs are more flexible than DWs, allowing schema either at the time of writing or reading, as needed, but with stricter mechanisms for data quality checks and metadata management, aka Data Governance. Also, DLHs allow flexibility in big data processing like DLs.

The following table summarises the differences between these technologies:

| | Key Characteristics | Suitable For | Drawbacks | Examples |
| --- | --- | --- | --- | --- |
| Database | Fast, small queries, transaction support | Online use cases (OLTP) | Not ideal for large analytical queries | RDBMS: MySQL |
| Data Warehouse | Slow, large queries, no updates after write | Analytics (OLAP) | Less flexible due to strict schema and lack of support for big data processing frameworks | AWS Redshift, Google BigQuery, *Snowflake |
| Data Lake | Unstructured data, schema on read, flexible and big data processing | Analytics (OLAP) | Data quality issues due to lack of Data Governance | *Snowflake, **AWS Lake Formation, **Databricks Delta Lake |
| Data Lake House | Structured or unstructured data, flexible with better Data Governance, and supports big data processing | Analytics (OLAP) | More complex, lower performance, and more expensive compared to a DW | *Snowflake, **AWS Lake Formation, **Databricks Delta Lake |

*Snowflake can be configured as a Data Warehouse, Data Lake, or Data Lake House.
**AWS Lake Formation and Databricks Delta Lake can be configured as either a Data Lake or a Data Lake House.
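As a small illustration of the schema-on-read idea, here is a hypothetical PySpark sketch (Spark being one of the big data frameworks mentioned above). The bucket path and review fields are made up for the example; raw JSON reviews land in the lake unchanged, and a schema is applied only when the data is read for a particular analysis.

Python
# Minimal schema-on-read sketch with PySpark: the raw JSON review files are stored
# in the data lake as-is, and a schema is applied only when the data is read.
# Paths and field names are illustrative.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("review-analysis").getOrCreate()

# Schema defined at read time, not at ingestion time
review_schema = StructType([
    StructField("book_id", IntegerType()),
    StructField("user_id", IntegerType()),
    StructField("review_text", StringType()),
])

reviews = (
    spark.read
    .schema(review_schema)  # fields not in the schema are simply ignored
    .json("s3://bookster-data-lake/raw/reviews/")  # hypothetical lake location
)

# The same raw files could be re-read later with a different schema for another use case
reviews.groupBy("book_id").count().show()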
Improving an organization's overall data capabilities enables teams to operate more efficiently. Emerging technologies have brought real-time data closer to business users, which plays a critical role in effective decision-making. In data analytics, the "hot path" and "cold path" refer to two distinct processing routes for handling data. The hot path involves real-time or near-real-time processing of data, where information is analyzed and acted upon immediately as it arrives. This path is crucial for time-sensitive applications, enabling quick responses to emerging trends or events. On the other hand, the cold path involves the batch processing of historical or less time-sensitive data, allowing for in-depth analysis, long-term trends identification, and comprehensive reporting, making it ideal for strategic planning and retrospective insights in data analytics workflows. In typical analytics solutions, the integration of incoming telemetry data with corresponding meta-data related to entities such as devices, users, or applications is a prerequisite on the server side before effective visualization in an application can occur. In this article, we will explore innovative methodologies for seamlessly combining data from diverse sources so that an effective dashboard can be built. The Event-Driven Architecture for Real-Time Anomalies Let's explore a real-time dashboard wherein administrators meticulously monitor network usage. In this scenario, live data on network usage from each device is transmitted in real-time, undergoing aggregation on the server side, inclusive of associating the data with respective client names before refreshing the user's table. In such use cases, the implementation of Event-Driven architecture patterns emerges as the optimal approach for ensuring seamless data processing and real-time insights. Event-driven design seamlessly orchestrates data flow between disparate microservices, enabling the aggregation of critical data points. Through clearly defined events, information from two distinct microservices is aggregated, ensuring real-time updates. The culmination of this event-driven approach provides a comprehensive and up-to-date representation of key metrics and insights for informed decision-making. In the depicted scenario, the telemetry data is seamlessly transmitted to the service bus for integration into the Dashboard service. Conversely, device metadata exhibits infrequent changes. Upon receipt of new telemetry events, the Dashboard service dynamically augments each record with all relevant metadata, presenting a comprehensive dataset for consumption by APIs. This entire process unfolds in real-time, empowering administrators to promptly identify network anomalies and initiate timely corrective measures. This methodology proves effective for those real-time scenarios, characterized by frequent incremental data ingestion to the server and a resilient system for processing those events. The Materialized View Architecture for Historical Reports For a historical report dashboard, adopting an event-driven approach might entail unnecessary effort, given that real-time updates are not imperative. A more efficient strategy would involve leveraging PostgreSQL Materialized Views, which is particularly suitable for handling bursty data updates. This approach allows for scheduled data crunching at predefined intervals, such as daily, weekly, or monthly, aligning with the periodic nature of the reporting requirements. 
PostgreSQL Materialized Views provide a robust mechanism for persistently storing the results of complex joins between disparate tables as physical tables. One of the standout advantages of materialized views is their ability to significantly improve the efficiency of data retrieval operations in APIs, as a considerable portion of the data is pre-computed. The incorporation of materialized views within PostgreSQL represents a substantial performance boost for read queries, particularly beneficial when the application can tolerate older, stale data. This feature serves to reduce disk access and streamline complex query computations by transforming the result set of a view into a tangible physical table. Let’s look at the above example with Device telemetry and metadata tables. The mat view can be created by the command below in SQL. SQL CREATE MATERIALIZED VIEW device_health_mat AS SELECT t.bsod_count, t.storage_used, t.date FROM device_telemetry t INNER JOIN device d ON t.ID = d.ID WITH DATA; Materialized views are beneficial in data warehousing and business intelligence applications where complex queries, data transformation, and aggregations are the norms. You can leverage materialized views when you have complex queries powering user-facing visualizations that need to load quickly to provide a great user experience. The only bottleneck with them is that the refresh needs to be explicitly done when the underlying tables have new data and can be scheduled with the command below. SQL REFRESH MATERIALIZED VIEW device_health_mat; (or) REFRESH MATERIALIZED VIEW CONCURRENTLY device_health_mat; In conclusion, while both aforementioned use cases share a dashboard requirement, the selection of tools and design must be meticulously tailored to the specific usage patterns to ensure the effectiveness of the solution.
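As a concrete illustration of the explicit refresh described above, a small, hypothetical Python job like the following could be run on a schedule (for example, daily via cron or an orchestrator) to refresh the materialized view; the connection string is a placeholder.

Python
# Minimal sketch of scheduling the refresh outside the database: a small job that
# connects and refreshes the materialized view defined earlier. Connection details
# are placeholders.
import psycopg2

def refresh_device_health_view():
    conn = psycopg2.connect("dbname=telemetry user=report password=secret host=localhost")
    try:
        with conn:  # commit the refresh on success
            with conn.cursor() as cur:
                cur.execute("REFRESH MATERIALIZED VIEW device_health_mat;")
    finally:
        conn.close()

if __name__ == "__main__":
    refresh_device_health_view()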
In today's world of distributed systems and microservices, it is crucial to maintain consistency. Microservice architecture is considered almost a standard for building modern, flexible, and reliable high-loaded systems, but at the same time it introduces additional complexities.

Monolith vs Microservices

In monolithic applications, consistency can be achieved using transactions. Within a transaction, we can modify data in multiple tables. If an error occurs during the modification process, the transaction rolls back and the data remains consistent. Thus, consistency is achieved by the database tools. In a microservice architecture, things get much more complicated. At some point, we will have to change data not only in the current microservice but also in other microservices.

Imagine a scenario where a user interacts with a web application and creates an order on the website. When the order is created, it is necessary to reduce the number of items in stock. In a monolithic application, this could be handled with a single transaction that updates both tables. In a microservice architecture, such tables can change within different microservices. When creating an order, we need to call another service using, for example, REST or Kafka. But there are many problems here: the request may fail, the network or the microservice may be temporarily unavailable, the microservice may stop immediately after creating a record in the orders table so the message is never sent, etc.

Transactional Outbox

One solution to this problem is to use the transactional outbox pattern. We can create an order and a record in the outbox table within one transaction, where we add all the necessary data for a future event. A specific handler will read this record and send the event to another microservice. This way, we ensure that the event will be sent if we have successfully created an order. If the network or the microservice is unavailable, the handler will keep trying to send the message until it receives a successful response. This results in eventual consistency. It is worth noting that it is necessary to support idempotency because, in such architectures, request processing may be duplicated.

Implementation

Let's consider an example implementation in a Spring Boot application. We will use a ready-made solution, transaction-outbox.

First, let's start PostgreSQL in Docker:

Shell
docker run -d -p 5432:5432 --name db \
  -e POSTGRES_USER=admin \
  -e POSTGRES_PASSWORD=password \
  -e POSTGRES_DB=demo \
  postgres:12-alpine

Add a dependency to build.gradle:

Groovy
implementation 'com.gruelbox:transactionoutbox-spring:5.3.370'

Declare the configuration:

Java
@Configuration
@EnableScheduling
@Import({ SpringTransactionOutboxConfiguration.class })
public class TransactionOutboxConfig {

  @Bean
  public TransactionOutbox transactionOutbox(SpringTransactionManager springTransactionManager,
      SpringInstantiator springInstantiator) {
    return TransactionOutbox.builder()
        .instantiator(springInstantiator)
        .initializeImmediately(true)
        .retentionThreshold(Duration.ofMinutes(5))
        .attemptFrequency(Duration.ofSeconds(30))
        .blockAfterAttempts(5)
        .transactionManager(springTransactionManager)
        .persistor(Persistor.forDialect(Dialect.POSTGRESQL_9))
        .build();
  }
}

Here we specify how many attempts should be made in case of unsuccessful request sending, the interval between attempts, etc. For a separate thread to process records from the outbox table, we need to call outbox.flush() periodically.
For this purpose, let's declare a component:

Java
@Component
@AllArgsConstructor
public class TransactionOutboxWorker {

  private final TransactionOutbox transactionOutbox;

  @Scheduled(fixedDelay = 5000)
  public void flushTransactionOutbox() {
    transactionOutbox.flush();
  }
}

The execution interval of flush should be chosen according to your requirements. Now we can implement the method with the business logic. We need to create an Order in the database and send the event to another microservice. For demonstration purposes, I will not implement the actual call but will simulate a failure to send the event by throwing an exception. The method itself should be marked @Transactional, and the event sending should be done not directly, but using the TransactionOutbox object:

Java
@Service
@AllArgsConstructor
@Slf4j
public class OrderService {

  private OrderRepository repository;
  private TransactionOutbox outbox;

  @Transactional
  public String createOrderAndSendEvent(Integer productId, Integer quantity) {
    String uuid = UUID.randomUUID().toString();
    repository.save(new OrderEntity(uuid, productId, quantity));
    outbox.schedule(getClass()).sendOrderEvent(uuid, productId, quantity);
    return uuid;
  }

  void sendOrderEvent(String uuid, Integer productId, Integer quantity) {
    log.info(String.format("Sending event for %s...", uuid));
    if (ThreadLocalRandom.current().nextBoolean()) throw new RuntimeException();
    log.info(String.format("Event sent for %s", uuid));
  }
}

Here, the method may randomly throw an exception. The key point, however, is that this method is not called directly; the call information is stored in the outbox table within a single transaction. Let's start the service and execute a request:

Shell
curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"productId":"10","quantity":"2"}' \
  http://localhost:8080/order

{"id":"6a8e2960-8e94-463b-90cb-26ce8b46e96c"}

If the method succeeds, the record is removed from the table, but if there is a problem, we can see the record in the table:

Shell
docker exec -ti <CONTAINER ID> bash
psql -U admin demo
psql (12.16)
Type "help" for help.

demo=# \x
Expanded display is on.
demo=# SELECT * FROM txno_outbox;
-[ RECORD 1 ]---+-------------------------------------------------------------------------------
id              | d0b69f7b-943a-44c9-9e71-27f738161c8e
invocation      | {"c":"orderService","m":"sendOrderEvent","p":["String","Integer","Integer"],"a":[{"t":"String","v":"6a8e2960-8e94-463b-90cb-26ce8b46e96c"},{"t":"Integer","v":10},{"t":"Integer","v":2}]}
nextattempttime | 2023-11-19 17:59:12.999
attempts        | 1
blocked         | f
version         | 1
uniquerequestid |
processed       | f
lastattempttime | 2023-11-19 17:58:42.999515

Here we can see the parameters of the method call, the time of the next attempt, the number of attempts, etc. According to your settings, the handler will try to execute the request until it succeeds or until it reaches the limit of attempts. This way, even if our service restarts (which is considered normal for cloud-native applications), we will not lose important data about the external service call, and eventually the message will be delivered to the recipient.

Conclusion

The transactional outbox is a powerful solution for addressing data consistency issues in distributed systems. It provides a reliable and organized approach to managing transactions between microservices. This greatly reduces the risks associated with data inconsistency.
We have examined the fundamental principles of the transactional outbox pattern, its implementation, and its benefits in maintaining a coherent and synchronized data state. The project code is available on GitHub.
Apache Airflow is an open-source platform that allows you to programmatically author, schedule, and monitor workflows. It uses Python as its programming language and offers a flexible architecture suited for both small-scale and large-scale data processing. The platform supports the concept of Directed Acyclic Graphs to define workflows, making it easy to visualize complex data pipelines. One of the key features of Apache Airflow is its ability to schedule and trigger batch jobs, making it a popular choice for processing large volumes of data. It provides excellent support for integrating with various data processing technologies and frameworks such as Apache Hadoop and Apache Spark. By using Apache Airflow for batch processing, you can easily define and schedule your data processing tasks, ensuring that they are executed in the desired order and within the specified time constraints. Batch processing is a common approach in big data processing that involves the processing of data in large volumes, typically at regular time intervals. This approach is well-suited for scenarios where data can be collected over a period and processed together as a batch. Within the fintech sector, batch processing caters to a wide range of applications, including but not limited to authorization and settlement processes, management of recurring payments, enabling reconciliation operations, performing fraud detection and analytic tasks, adhering to regulatory mandates, and overseeing changes to customer relationship management systems. Let's explore a simple use case of processing an input file and writing back to the output file using Apache Airflow. To get started with Apache Airflow, you can follow the official documentation for installation and setup. Overview diagram illustrating the basic flow of a batch processing scenario Setting the Stage Python from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta # Default arguments for the DAG default_args = { 'owner': 'airflow', 'start_date': datetime(2021, 1, 1), 'retries': 1, 'retry_delay': timedelta(minutes=5), } The script begins by importing necessary modules and defining default arguments for the DAG. These default parameters include the DAG owner, start date, and retry settings. Reading Function: Extracting Data Python def read_function(**kwargs): ti = kwargs["ti"] # Read from a file (example: input.txt) with open("path/to/file/input_file.txt", "r") as file: # Read the remaining lines lines = file.readlines() # Push each line to XCom storage for i, line in enumerate(lines): ti.xcom_push(key=f"line_{i}", value=line.strip()) # Push the total number of lines to XCom storage ti.xcom_push(key="num_lines", value=len(lines)) The read_function simulates the extraction of data by reading lines from a file (`input.txt`). It then uses Airflow's XCom feature to push each line and the total number of lines into storage, making it accessible to subsequent tasks. Sample Input File Plain Text CardNumber,TransactionId,Amount,TxnType,Recurring,Date 1,123456789,100.00,Debit,Monthly,2023-12-31 2,987654321,50.00,Credit,Weekly,2023-10-15 3,456789012,75.50,Debit,Monthly,2023-11-30 4,555111222,120.75,Credit,Daily,2023-09-30 In the given input file, we can see the handling of a recurring transactions file. 
Processing Function: Transforming Data Python def process_function(**kwargs): ti = kwargs["ti"] # Pull all lines from XCom storage lines = [ti.xcom_pull(task_ids="read", key=f"line_{i}") for i in range(ti.xcom_pull(task_ids="read", key="num_lines"))] # Process and print all lines for i, line in enumerate(lines): logging.info(f"Make Payment Transaction {i + 1}: {line}") The process_function pulls all lines from XCom storage and simulates the transformation process by printing each line to the console. This task demonstrates the flexibility of Airflow in handling data flow between tasks. The process_function can have multiple implementations, allowing it to either invoke a web service call to execute the transaction or call another DAG to follow a different flow. Logs Plain Text [2023-11-28, 03:49:06 UTC] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='batch_processing_dag' AIRFLOW_CTX_TASK_ID='process' AIRFLOW_CTX_EXECUTION_DATE='2023-11-28T03:48:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-11-28T03:48:00+00:00' [2023-11-28, 03:49:06 UTC] {batch_processing_dag.py:38} INFO - Make Payment Transaction 1: 1,123456789,100.00,Debit,Monthly,2023-12-31 [2023-11-28, 03:49:06 UTC] {batch_processing_dag.py:38} INFO - Make Payment Transaction 2: 2,987654321,50.00,Credit,Weekly,2023-10-15 [2023-11-28, 03:49:06 UTC] {batch_processing_dag.py:38} INFO - Make Payment Transaction 3: 3,456789012,75.50,Debit,Monthly,2023-11-30 [2023-11-28, 03:49:06 UTC] {batch_processing_dag.py:38} INFO - Make Payment Transaction 4: 4,555111222,120.75,Credit,Daily,2023-09-30 [2023-11-28, 03:49:06 UTC] {python.py:194} INFO - Done. Returned value was: None Writing Function: Loading Data Python def write_function(**kwargs): ti = kwargs["ti"] # Pull all lines from XCom storage lines = [ti.xcom_pull(task_ids="read", key=f"line_{i}") for i in range(ti.xcom_pull(task_ids="read", key="num_lines"))] # Write all lines to an output file (example: output.txt) with open("path/to/file/processed.txt", "a") as file: for i, line in enumerate(lines): processed_line = f"{line.strip()} PROCESSED" file.write(f"{processed_line}\n") The write_function pulls all lines from XCom storage and writes them to an output file (`processed.txt`). Sample Output File After Transaction Is Processed Plain Text 1,123456789,100.00,Debit,Monthly,2023-12-31 PROCESSED 2,987654321,50.00,Credit,Weekly,2023-10-15 PROCESSED 3,456789012,75.50,Debit,Monthly,2023-11-30 PROCESSED 4,555111222,120.75,Credit,Daily,2023-09-30 PROCESSED DAG Definition: Orchestrating the Workflow Python dag = DAG( 'batch_processing_dag', default_args=default_args, description='DAG with Read, Process, and Write functions', schedule_interval='*/1 * * * *', # Set the schedule interval according to your needs catchup=False, ) The DAG is instantiated with the name batch_processing_dag, the previously defined default arguments, a description, a schedule interval (running every 1 minute), and the catchup parameter set to False. 
Task Definitions: Executing the Functions Python # Task to read from a file and push to XCom storage read_task = PythonOperator( task_id='read', python_callable=read_function, provide_context=True, dag=dag, ) # Task to process the data from XCom storage (print to console) process_task = PythonOperator( task_id='process', python_callable=process_function, provide_context=True, dag=dag, ) # Task to write the data back to an output file write_task = PythonOperator( task_id='write', python_callable=write_function, provide_context=True, dag=dag, ) Three tasks (read_task, process_task, and write_task) are defined using the PythonOperator. Each task is associated with one of the Python functions (read_function, process_function, and write_function). The provide_context=True parameter allows the functions to access the task instance and context information. Defining Task Dependencies Python # Define task dependencies read_task >> process_task >> write_task The task dependencies are specified using the >> operator, indicating the order in which the tasks should be executed. Conclusion In conclusion, Apache Airflow proves to be a flexible open-source tool that is great at managing workflows, especially when it comes to batch processing. It is the best choice for organizations of all sizes because it has features like dynamic workflow definition, support for Directed Acyclic Graphs (DAGs), careful task dependency management, full monitoring and logging, efficient parallel execution, and strong error handling. Illustrated by a straightforward batch processing scenario, the example emphasizes Apache Airflow's user-friendly interface and its adaptability to a range of data processing needs, showcasing its ease of use and versatility.
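Returning to the note above that process_function could invoke a web service instead of only logging each record, here is a minimal, hypothetical variant. The payment endpoint URL and payload fields are illustrative and not part of the original example; a non-2xx response fails the task so that the DAG's retry settings take over.

Python
# Hypothetical variant of process_function that posts each transaction to a payment
# web service instead of only logging it. The endpoint and payload are illustrative.
import logging
import requests

PAYMENT_SERVICE_URL = "https://payments.example.com/api/v1/transactions"  # placeholder

def process_function(**kwargs):
    ti = kwargs["ti"]
    num_lines = ti.xcom_pull(task_ids="read", key="num_lines")
    lines = [ti.xcom_pull(task_ids="read", key=f"line_{i}") for i in range(num_lines)]

    for i, line in enumerate(lines):
        if line.startswith("CardNumber"):  # skip the CSV header row
            continue
        card, txn_id, amount, txn_type, recurring, date = line.split(",")
        payload = {"transactionId": txn_id, "amount": amount, "type": txn_type}
        # Fail the task (and trigger the DAG's retry settings) on a non-2xx response
        response = requests.post(PAYMENT_SERVICE_URL, json=payload, timeout=10)
        response.raise_for_status()
        logging.info(f"Make Payment Transaction {i + 1}: {line} -> {response.status_code}")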
The AIDocumentLibraryChat project uses the Spring AI project with OpenAI to search in a document library for answers to questions. To do that, Retrieval Augmented Generation is used on the documents.

Retrieval Augmented Generation

The process looks like this:

Upload Document
Store Document in Postgresql DB.
Split Document to create Embeddings.
Create Embeddings with a call to the OpenAI Embedding Model.
Store the Document Embeddings in the Postgresql Vector DB.

Search Documents:
Create Search Prompt
Create Embedding of the Search Prompt with a call to the OpenAI Embedding Model.
Query the Postgresql Vector DB for documents with the nearest Embedding distances.
Query Postgresql DB for the Document.
Create Prompt with the Search Prompt and the Document text chunk.
Request an answer from the GPT Model and show the answer based on the search prompt and the Document text chunk.

Document Upload

The uploaded document is stored in the database to have the source document of the answer. The document text has to be split into chunks to create embeddings per chunk. The embeddings are created by an OpenAI embedding model and are vectors with more than 1500 dimensions that represent the text chunk. The embedding is stored in an AIDocument with the chunk text and the id of the source document in the vector database.

Document Search

The document search takes the search prompt and uses the OpenAI embedding model to turn it into an embedding. The embedding is used to search the vector database for the nearest neighbor vectors, that is, the text chunks whose embeddings have the greatest similarity to the search prompt embedding. The id in the AIDocument is used to read the document from the relational database. With the Search Prompt and the text chunk of the AIDocument, the Document Prompt is created. Then, the OpenAI GPT model is called with the prompt to create an answer based on the Search Prompt and the document context. That causes the model to create answers that are closely based on the documents provided and improves the accuracy. The answer of the GPT model is returned and displayed with a link to the document to provide the source of the answer.

Architecture

The architecture of the project is built around Spring Boot with Spring AI. The Angular UI provides the user interface to show the document list, upload the documents, and provide the Search Prompt with the answer and the source document. It communicates with the Spring Boot backend via the rest interface. The Spring Boot backend provides the rest controllers for the frontend and uses Spring AI to communicate with the OpenAI models and the Postgresql Vector database. The documents are stored with Jpa in the Postgresql relational database. The Postgresql database is used because it combines the relational database and the vector database in a Docker image.

Implementation

Frontend

The frontend is based on lazy loaded standalone components built with Angular. The lazy loaded standalone components are configured in the app.config.ts:

TypeScript
export const appConfig: ApplicationConfig = {
  providers: [provideRouter(routes), provideAnimations(), provideHttpClient()]
};

The configuration sets the routes and enables the http client and the animations.
The lazy loaded routes are defined in app.routes.ts:

TypeScript
export const routes: Routes = [
  {
    path: "doclist",
    loadChildren: () => import("./doc-list").then((mod) => mod.DOCLIST),
  },
  {
    path: "docsearch",
    loadChildren: () => import("./doc-search").then((mod) => mod.DOCSEARCH),
  },
  { path: "**", redirectTo: "doclist" },
];

In 'loadChildren' the 'import("...").then((mod) => mod.XXX)' loads the route lazily from the provided path and sets the exported routes defined in the 'mod.XXX' constant. The lazy loaded route 'docsearch' has an index.ts to export the constant:

TypeScript
export * from "./doc-search.routes";

That exports the doc-search.routes.ts:

TypeScript
export const DOCSEARCH: Routes = [
  {
    path: "",
    component: DocSearchComponent,
  },
  { path: "**", redirectTo: "" },
];

It defines the routing to the 'DocSearchComponent'. The fileupload can be found in the DocImportComponent with the template doc-import.component.html:

HTML
<h1 mat-dialog-title i18n="@@docimportImportFile">Import file</h1>
<div mat-dialog-content>
  <p i18n="@@docimportFileToImport">File to import</p>
  @if(uploading) {
    <div class="upload-spinner"><mat-spinner></mat-spinner></div>
  } @else {
    <input type="file" (change)="onFileInputChange($event)">
  }
  @if(!!file) {
    <div>
      <ul>
        <li>Name: {{file.name}}</li>
        <li>Type: {{file.type}}</li>
        <li>Size: {{file.size}} bytes</li>
      </ul>
    </div>
  }
</div>
<div mat-dialog-actions>
  <button mat-button (click)="cancel()" i18n="@@cancel">Cancel</button>
  <button mat-flat-button color="primary" [disabled]="!file || uploading" (click)="upload()" i18n="@@docimportUpload">Upload</button>
</div>

The fileupload is done with the '<input type="file" (change)="onFileInputChange($event)">' tag. It provides the upload feature and calls the 'onFileInputChange(...)' method after each upload. The 'Upload' button calls the 'upload()' method to send the file to the server on click. The doc-import.component.ts has methods for the template:

TypeScript
@Component({
  selector: 'app-docimport',
  standalone: true,
  imports: [CommonModule, MatFormFieldModule, MatDialogModule, MatButtonModule,
    MatInputModule, FormsModule, MatProgressSpinnerModule],
  templateUrl: './doc-import.component.html',
  styleUrls: ['./doc-import.component.scss']
})
export class DocImportComponent {
  protected file: File | null = null;
  protected uploading = false;
  private destroyRef = inject(DestroyRef);

  constructor(private dialogRef: MatDialogRef<DocImportComponent>,
    @Inject(MAT_DIALOG_DATA) public data: DocImportComponent,
    private documentService: DocumentService) { }

  protected onFileInputChange($event: Event): void {
    const files = !$event.target ? null : ($event.target as HTMLInputElement).files;
    this.file = !!files && files.length > 0 ? files[0] : null;
  }

  protected upload(): void {
    if(!!this.file) {
      const formData = new FormData();
      formData.append('file', this.file as Blob, this.file.name as string);
      this.documentService.postDocumentForm(formData)
        .pipe(tap(() => {this.uploading = true;}),
          takeUntilDestroyed(this.destroyRef))
        .subscribe(result => {this.uploading = false; this.dialogRef.close();});
    }
  }

  protected cancel(): void {
    this.dialogRef.close();
  }
}

This is the standalone component with its module imports and the injected 'DestroyRef'. The 'onFileInputChange(...)' method takes the event parameter and stores its 'files' property in the 'files' constant. Then it checks for the first file and stores it in the 'file' component property. The 'upload()' method checks for the 'file' property and creates the 'FormData()' for the file upload.
The 'formData' constant gets the form field name ('file'), the content ('this.file'), and the filename ('this.file.name') appended. Then the 'documentService' is used to post the 'FormData()' object to the server. The 'takeUntilDestroyed(this.destroyRef)' function unsubscribes the RxJS pipeline after the component is destroyed. That makes unsubscribing pipelines very convenient in Angular. Backend The backend is a Spring Boot application with the Spring AI framework. Spring AI manages the requests to the OpenAI models and to the vector database. Liquibase Database Setup The database setup is done with Liquibase, and the script can be found in the db.changelog-1.xml: XML <databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.8.xsd"> <changeSet id="1" author="angular2guy"> <sql>CREATE EXTENSION if not exists hstore;</sql> </changeSet> <changeSet id="2" author="angular2guy"> <sql>CREATE EXTENSION if not exists vector;</sql> </changeSet> <changeSet id="3" author="angular2guy"> <sql>CREATE EXTENSION if not exists "uuid-ossp";</sql> </changeSet> <changeSet author="angular2guy" id="4"> <createTable tableName="document"> <column name="id" type="bigint"> <constraints primaryKey="true"/> </column> <column name="document_name" type="varchar(255)"> <constraints notNullConstraintName="document_document_name_notnull" nullable="false"/> </column> <column name="document_type" type="varchar(25)"> <constraints notNullConstraintName="document_document_type_notnull" nullable="false"/> </column> <column name="document_content" type="blob"/> </createTable> </changeSet> <changeSet author="angular2guy" id="5"> <createSequence sequenceName="document_seq" incrementBy="50" startValue="1000" /> </changeSet> <changeSet id="6" author="angular2guy"> <createTable tableName="vector_store"> <column name="id" type="uuid" defaultValueComputed="uuid_generate_v4 ()"> <constraints primaryKey="true"/> </column> <column name="content" type="text"/> <column name="metadata" type="json"/> <column name="embedding" type="vector(1536)"> <constraints notNullConstraintName= "vectorstore_embedding_type_notnull" nullable="false"/> </column> </createTable> </changeSet> <changeSet id="7" author="angular2guy"> <sql>CREATE INDEX vectorstore_embedding_index ON vector_store USING HNSW (embedding vector_cosine_ops);</sql> </changeSet> </databaseChangeLog> In changeset 4, the table for the JPA document entity is created with the primary key 'id'. The content type/size is unknown, and because of that, the content column is set to 'blob'. In changeset 5, the sequence for the JPA entity is created with the default properties of the Hibernate 6 sequences that are used by Spring Boot 3.x. In changeset 6, the table 'vector_store' is created with a primary key 'id' of type 'uuid' that is generated by the 'uuid-ossp' extension. The column 'content' is of type 'text' ('clob' in other databases) to have a flexible size. The 'metadata' column stores the metadata of the AIDocuments in a 'json' type. The 'embedding' column stores the embedding vector with the number of OpenAI dimensions. In changeset 7, the index for the fast search of the 'embedding' column is created. Because the Liquibase '<createIndex ...>' tag supports only limited parameters, '<sql>' is used directly to create it.
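For orientation, a minimal JPA entity matching the 'document' table and the 'document_seq' sequence from the changelog above might look roughly like this. This is a sketch based only on the schema shown here; the field names, the 'DocumentType' enum, and the accessors in the actual project may differ: Java
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.EnumType;
import jakarta.persistence.Enumerated;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Lob;
import jakarta.persistence.SequenceGenerator;
import jakarta.persistence.Table;

@Entity
@Table(name = "document")
public class Document {

    // matches the Liquibase sequence: startValue 1000, incrementBy 50
    @Id
    @SequenceGenerator(name = "document_seq", sequenceName = "document_seq", allocationSize = 50)
    @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "document_seq")
    private Long id;

    @Column(name = "document_name", nullable = false)
    private String documentName;

    // stored as varchar(25); PDF, HTML, TEXT, and XML are the values used by the controller later on
    @Enumerated(EnumType.STRING)
    @Column(name = "document_type", nullable = false)
    private DocumentType documentType;

    // the raw file bytes, mapped to the 'blob' column
    @Lob
    @Column(name = "document_content")
    private byte[] documentContent;

    // getters and setters omitted for brevity
}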
Spring Boot / Spring AI implementation The DocumentController for the frontend looks like this: Java @RestController @RequestMapping("rest/document") public class DocumentController { private final DocumentMapper documentMapper; private final DocumentService documentService; public DocumentController(DocumentMapper documentMapper, DocumentService documentService) { this.documentMapper = documentMapper; this.documentService = documentService; } @PostMapping("/upload") public long handleDocumentUpload( @RequestParam("file") MultipartFile document) { var docSize = this.documentService .storeDocument(this.documentMapper.toEntity(document)); return docSize; } @GetMapping("/list") public List<DocumentDto> getDocumentList() { return this.documentService.getDocumentList().stream() .flatMap(myDocument ->Stream.of(this.documentMapper.toDto(myDocument))) .flatMap(myDocument -> { myDocument.setDocumentContent(null); return Stream.of(myDocument); }).toList(); } @GetMapping("/doc/{id}") public ResponseEntity<DocumentDto> getDocument( @PathVariable("id") Long id) { return ResponseEntity.ofNullable(this.documentService .getDocumentById(id).stream().map(this.documentMapper::toDto) .findFirst().orElse(null)); } @GetMapping("/content/{id}") public ResponseEntity<byte[]> getDocumentContent( @PathVariable("id") Long id) { var resultOpt = this.documentService.getDocumentById(id).stream() .map(this.documentMapper::toDto).findFirst(); var result = resultOpt.stream().map(this::toResultEntity) .findFirst().orElse(ResponseEntity.notFound().build()); return result; } private ResponseEntity<byte[]> toResultEntity(DocumentDto documentDto) { var contentType = switch (documentDto.getDocumentType()) { case DocumentType.PDF -> MediaType.APPLICATION_PDF; case DocumentType.HTML -> MediaType.TEXT_HTML; case DocumentType.TEXT -> MediaType.TEXT_PLAIN; case DocumentType.XML -> MediaType.APPLICATION_XML; default -> MediaType.ALL; }; return ResponseEntity.ok().contentType(contentType) .body(documentDto.getDocumentContent()); } @PostMapping("/search") public DocumentSearchDto postDocumentSearch(@RequestBody SearchDto searchDto) { var result = this.documentMapper .toDto(this.documentService.queryDocuments(searchDto)); return result; } } The 'handleDocumentUpload(...)' handles the uploaded file with the 'documentService' at the '/rest/document/upload' path. The 'getDocumentList()' handles the get requests for the document lists and removes the document content to save on the response size. The 'getDocumentContent(...)' handles the get requests for the document content. It loads the document with the 'documentService' and maps the 'DocumentType' to the 'MediaType'. Then it returns the content and the content type, and the browser opens the content based on the content type. The 'postDocumentSearch(...)' method puts the request content in the 'SearchDto' object and returns the AI generated result of the 'documentService.queryDocuments(...)' call. 
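The 'DocumentMapper' used by the controller is not shown in the listings. A plausible hand-written shape, assuming simple setters on the 'Document' entity and the 'DocumentDto', could be the following sketch; the project's real mapper may be generated or structured differently: Java
import java.io.IOException;
import java.io.UncheckedIOException;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;

@Component
public class DocumentMapper {

    // map the uploaded multipart file to the JPA entity
    public Document toEntity(MultipartFile file) {
        try {
            var entity = new Document();
            entity.setDocumentName(file.getOriginalFilename());
            // real code would derive the DocumentType from the content type of the upload
            entity.setDocumentType(DocumentType.PDF);
            entity.setDocumentContent(file.getBytes());
            return entity;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // map the entity to the DTO returned by the REST controller
    public DocumentDto toDto(Document entity) {
        var dto = new DocumentDto();
        dto.setId(entity.getId());
        dto.setDocumentName(entity.getDocumentName());
        dto.setDocumentType(entity.getDocumentType());
        dto.setDocumentContent(entity.getDocumentContent());
        return dto;
    }
}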
The method 'storeDocument(...)' of the DocumentService looks like this: Java public Long storeDocument(Document document) { var myDocument = this.documentRepository.save(document); Resource resource = new ByteArrayResource(document.getDocumentContent()); var tikaDocuments = new TikaDocumentReader(resource).get(); record TikaDocumentAndContent(org.springframework.ai.document.Document document, String content) { } var aiDocuments = tikaDocuments.stream() .flatMap(myDocument1 -> this.splitStringToTokenLimit( myDocument1.getContent(), CHUNK_TOKEN_LIMIT) .stream().map(myStr -> new TikaDocumentAndContent(myDocument1, myStr))) .map(myTikaRecord -> new org.springframework.ai.document.Document( myTikaRecord.content(), myTikaRecord.document().getMetadata())) .peek(myDocument1 -> myDocument1.getMetadata() .put(ID, myDocument.getId().toString())).toList(); LOGGER.info("Name: {}, size: {}, chunks: {}", document.getDocumentName(), document.getDocumentContent().length, aiDocuments.size()); this.documentVsRepository.add(aiDocuments); return Optional.ofNullable(myDocument.getDocumentContent()).stream() .map(myContent -> Integer.valueOf(myContent.length).longValue()) .findFirst().orElse(0L); } private List<String> splitStringToTokenLimit(String documentStr, int tokenLimit) { List<String> splitStrings = new ArrayList<>(); var tokens = new StringTokenizer(documentStr).countTokens(); var chunks = Math.ceilDiv(tokens, tokenLimit); if (chunks == 0) { return splitStrings; } var chunkSize = Math.ceilDiv(documentStr.length(), chunks); var myDocumentStr = new String(documentStr); while (!myDocumentStr.isBlank()) { splitStrings.add(myDocumentStr.length() > chunkSize ? myDocumentStr.substring(0, chunkSize) : myDocumentStr); myDocumentStr = myDocumentStr.length() > chunkSize ? myDocumentStr.substring(chunkSize) : ""; } return splitStrings; } The 'storeDocument(...)' method saves the document to the relational database. Then, the document is converted into a 'ByteArrayResource' and read with the 'TikaDocumentReader' of Spring AI to turn it into an AIDocument list. Then the AIDocument list is flat-mapped to split the documents into chunks with the 'splitStringToTokenLimit(...)' method, and the chunks are turned into new AIDocuments with the 'id' of the stored document in the metadata map. The 'id' in the metadata enables loading the matching document entity for the AIDocuments. Then the embeddings for the AIDocuments are created implicitly with calls to the 'documentVsRepository.add(...)' method that calls the OpenAI embedding model and stores the AIDocuments with the embeddings in the vector database. Then the result is returned. The method 'queryDocuments(...)' looks like this: Java public AiResult queryDocuments(SearchDto searchDto) { var similarDocuments = this.documentVsRepository .retrieve(searchDto.getSearchString()); var mostSimilar = similarDocuments.stream() .sorted((myDocA, myDocB) -> ((Float) myDocA.getMetadata().get(DISTANCE)) .compareTo(((Float) myDocB.getMetadata().get(DISTANCE)))).findFirst(); var documentChunks = mostSimilar.stream().flatMap(mySimilar -> similarDocuments.stream().filter(mySimilar1 -> mySimilar1.getMetadata().get(ID).equals( mySimilar.getMetadata().get(ID)))).toList(); Message systemMessage = switch (searchDto.getSearchType()) { case SearchDto.SearchType.DOCUMENT -> this.getSystemMessage( documentChunks, (documentChunks.size() <= 0 ?
2000 : Math.floorDiv(2000, documentChunks.size()))); case SearchDto.SearchType.PARAGRAPH -> this.getSystemMessage(mostSimilar.stream().toList(), 2000); }; UserMessage userMessage = new UserMessage(searchDto.getSearchString()); Prompt prompt = new Prompt(List.of(systemMessage, userMessage)); LocalDateTime start = LocalDateTime.now(); AiResponse response = aiClient.generate(prompt); LOGGER.info("AI response time: {}ms", ZonedDateTime.of(LocalDateTime.now(), ZoneId.systemDefault()).toInstant().toEpochMilli() - ZonedDateTime.of(start, ZoneId.systemDefault()).toInstant() .toEpochMilli()); var documents = mostSimilar.stream().map(myGen -> myGen.getMetadata().get(ID)).filter(myId -> Optional.ofNullable(myId).stream().allMatch(myId1 -> (myId1 instanceof String))).map(myId -> Long.parseLong(((String) myId))) .map(this.documentRepository::findById) .filter(Optional::isPresent) .map(Optional::get).toList(); return new AiResult(searchDto.getSearchString(), response.getGenerations(), documents); } private Message getSystemMessage( List<org.springframework.ai.document.Document> similarDocuments, int tokenLimit) { String documents = similarDocuments.stream() .map(entry -> entry.getContent()) .filter(myStr -> myStr != null && !myStr.isBlank()) .map(myStr -> this.cutStringToTokenLimit(myStr, tokenLimit)) .collect(Collectors.joining("\n")); SystemPromptTemplate systemPromptTemplate = new SystemPromptTemplate(this.systemPrompt); Message systemMessage = systemPromptTemplate .createMessage(Map.of("documents", documents)); return systemMessage; } private String cutStringToTokenLimit(String documentStr, int tokenLimit) { String cutString = new String(documentStr); while (tokenLimit < new StringTokenizer(cutString, " -.;,").countTokens()){ cutString = cutString.length() > 1000 ? cutString.substring(0, cutString.length() - 1000) : ""; } return cutString; } The method first loads the documents best matching the 'searchDto.getSearchString()' from the vector database. To do that, the OpenAI embedding model is called to turn the search string into an embedding, and with that embedding the vector database is queried for the AIDocuments with the lowest distance (the distance between the vectors of the search embedding and the database embeddings). Then the AIDocument with the lowest distance is stored in the 'mostSimilar' variable. Then all the AIDocument chunks that belong to the same document are collected by matching the document entity id in their metadata. The 'systemMessage' is created with the 'documentChunks' or the 'mostSimilar' content. The 'getSystemMessage(...)' method takes them, cuts the content chunks to a size that the OpenAI GPT models can handle, and returns the 'Message'. Then the 'systemMessage' and the 'userMessage' are turned into a 'prompt' that is sent with 'aiClient.generate(prompt)' to the OpenAI GPT model. After that, the AI answer is available, and the document entity is loaded with the id from the metadata of the 'mostSimilar' AIDocument. The 'AiResult' is created with the search string, the GPT answer, and the document entity, and is returned.
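The 'systemPrompt' text that the 'SystemPromptTemplate' fills with the 'documents' placeholder is injected from configuration and is not shown above. A typical RAG-style template could look like the following; this is an illustrative example, not the project's exact prompt text: Java
// illustrative system prompt; SystemPromptTemplate replaces {documents} with the joined text chunks
public static final String SYSTEM_PROMPT = """
        You are a helpful assistant answering questions about the provided documents.
        Use only the information in the DOCUMENTS section for your answer.
        If the answer is not contained in the documents, say that you do not know.

        DOCUMENTS:
        {documents}
        """;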
The vector database repository DocumentVSRepositoryBean with the Spring AI 'VectorStore' looks like this: Java @Repository public class DocumentVSRepositoryBean implements DocumentVsRepository { private final VectorStore vectorStore; public DocumentVSRepositoryBean(JdbcTemplate jdbcTemplate, EmbeddingClient embeddingClient) { this.vectorStore = new PgVectorStore(jdbcTemplate, embeddingClient); } public void add(List<Document> documents) { this.vectorStore.add(documents); } public List<Document> retrieve(String query, int k, double threshold) { return new VectorStoreRetriever(vectorStore, k, threshold).retrieve(query); } public List<Document> retrieve(String query) { return new VectorStoreRetriever(vectorStore).retrieve(query); } } The repository has the 'vectorStore' property that is used to access the vector database. It is created in the constructor from the injected parameters with the 'new PgVectorStore(...)' call. The PgVectorStore class is the Spring AI support for the Postgresql vector database extension. It has the 'embeddingClient' to use the OpenAI embedding model and the 'jdbcTemplate' to access the database. The method 'add(...)' calls the OpenAI embedding model and adds the AIDocuments to the vector database. The methods 'retrieve(...)' query the vector database for the embeddings with the lowest distances. Conclusion Angular made the creation of the frontend easy. The standalone components with lazy loading have made the initial load small. The Angular Material components have helped a lot with the implementation and are easy to use. Spring Boot with Spring AI has made the use of Large Language Models easy. Spring AI provides the framework to hide the creation of embeddings and provides an easy-to-use interface to store the AIDocuments in a vector database (several are supported). The creation of the embedding for the search prompt to load the nearest AIDocuments is also done for you, and the interface of the vector database is simple. The Spring AI prompt classes also make the creation of the prompt for the OpenAI GPT models easy. Calling the model is done with the injected 'aiClient', and the results are returned. Spring AI is a very good framework from the Spring team. There have been no problems with the experimental version. With Spring AI, the Large Language Models are now easy to use on our own documents.
In this article, I will show you how to use Cloudera DataFlow, powered by Apache NiFi, to interact with IBM WatsonX.AI foundation large language models in real time. We can work with any of the foundation models, such as Google FLAN T5 XXL or the IBM Granite models. I'll show you how easy it is to build a real-time data pipeline that feeds questions from your Slack-like and mobile applications directly to secure WatsonX.AI models running in IBM Cloud. We will handle all the security, management, lineage, and governance with Cloudera DataFlow. As part of decision-making, we can choose different WatsonX.AI models on the fly based on the type of prompt. For example, continuing a sentence and answering a question call for different models. For question answering, Google FLAN T5 XXL works well. If I want to continue sentences, I would use one of the IBM Granite models. You will notice how amazingly fast the WatsonX.AI models return the results we need. I do some quick enrichment and transformation and then send the results on their way to Cloudera Apache Kafka to be used for continuous analytics and distribution to many other applications, systems, platforms, and downstream consumers. We will also output our answers to the original requester, which could be someone in a Slack channel or someone in an application. All of this happens in real time, with no code, full governance, lineage, data management, and security at any scale and on any platform. The power of IBM and Cloudera together in private, public, and hybrid cloud environments for real-time data and AI is just getting started. Try it today. Step By Step Real-Time Flow First, in Slack, I type a question: "Q: What is a good way to integrate Generative AI and Apache NiFi?" NiFi Flow Top Once that question is typed, the Slack server sends these events to our registered service, which can be hosted anywhere publicly facing (see the Slack API documentation for details). Slack API Once enabled, your server will start receiving JSON events for each Slack post. This is easy to receive and parse in NiFi. Cloudera DataFlow enables receiving secure HTTPS REST calls in the public cloud-hosted edition with ease, even in Designer mode. NiFi Top Flow 2 In the first part of the flow, we receive the REST JSON post, which is as follows. Slackbot 1.0 (+https://api.slack.com/robots) application/json POST HTTP/1.1 { "token" : "qHvJe59yetAp1bao6wmQzH0C", "team_id" : "T1SD6MZMF", "context_team_id" : "T1SD6MZMF", "context_enterprise_id" : null, "api_app_id" : "A04U64MN9HS", "event" : { "type" : "message", "subtype" : "bot_message", "text" : "==== NiFi to IBM <http://WatsonX.AI|WatsonX.AI> LLM Answers\n\nOn Date: Wed, 15 Nov 20 This is a very rich, detailed JSON file that we could push immediately raw to an Apache Iceberg open cloud lakehouse, a Kafka topic, or an object store as a JSON document (an enhancement option). I am just going to parse what I need. EvaluateJSONPath We parse out the channel ID and the plain text of the post. I only want messages from general ("C1SD6N197"). Then I copy the text to an inputs field, as is required for Hugging Face. We check our input: if it's stocks or weather (more to come), we avoid calling the LLM.
SELECT * FROM FLOWFILE WHERE upper(inputs) like '%WEATHER%' AND not upper(inputs) like '%LLM SKIPPED%' SELECT * FROM FLOWFILE WHERE upper(inputs) like '%STOCK%' AND not upper(inputs) like '%LLM SKIPPED%' SELECT * FROM FLOWFILE WHERE (upper(inputs) like 'QUESTION:%' OR upper(inputs) like 'Q:%') and not upper(inputs) like '%WEATHER%' and not upper(inputs) like '%STOCK%' For stocks processing: To parse out which stock we need, I am using my Apache OpenNLP processor, so you will need to download the processor and the entity extraction models. GitHub - tspannhw/nifi-nlp-processor: Apache NiFi NLP Processor Open NLP Example Apache NiFi Processor Then we pass that company name to an HTTP REST endpoint from AlphaVantage that converts the company name to stock symbols. With free accounts, you only get a few calls a day, so if the call fails, we bypass this step and try to use whatever was passed in. Using RouteOnContent, we filter error messages out. Then we use a QueryRecord processor to convert from CSV to JSON and filter. SELECT name as companyName, symbol FROM FLOWFILE ORDER BY matchScore DESC LIMIT 1 We do a SplitRecord to ensure we have only one record. We then run EvaluateJsonPath to get our fields as attributes. In an UpdateAttribute, we trim the symbol just in case. ${stockSymbol:trim()} We then pass that stock symbol to Twelve Data via InvokeHTTP to get our stock data. We get a lot of stock data back. { "meta" : { "symbol" : "IBM", "interval" : "1min", "currency" : "USD", "exchange_timezone" : "America/New_York", "exchange" : "NYSE", "mic_code" : "XNYS", "type" : "Common Stock" }, "values" : [ { "datetime" : "2023-11-15 10:37:00", "open" : "152.07001", "high" : "152.08000", "low" : "151.99500", "close" : "152.00999", "volume" : "8525" }, { "datetime" : "2023-11-15 10:36:00", "open" : "152.08501", "high" : "152.12250", "low" : "152.08000", "close" : "152.08501", "volume" : "15204" } ... We then run EvaluateJSONPath to grab the exchange information. We fork the record to get just one record, as this is only returned to Slack. We use UpdateRecord calls to enrich the stock data with other values. We then run a QueryRecord to limit us to 1 record to send to Slack. SELECT * FROM FLOWFILE ORDER BY 'datetime' DESC LIMIT 1 We run an EvaluateJsonPath to get the most valuable fields to display. We then run a PutSlack with our message. LLM Skipped. Stock Value for ${companyName} [${nlp_org_1}/${stockSymbol}] on ${date} is ${closeStockValue}. stock date ${stockdateTime}. stock exchange ${exchange} We also have a separate flow that is split off from the company name. In the first step, we call Yahoo Finance to get RSS headlines for that stock. https://feeds.finance.yahoo.com/rss/2.0/headline?s=${stockSymbol:trim()}&region=US&lang=en-US We use QueryRecord to convert RSS/XML records to JSON. We then run a SplitJSON to break out the news items. We run a SplitRecord to limit to 1 record. We use EvaluateJSONPath to get the fields we need for our Slack message. We then run UpdateRecord to finalize our JSON. We then send this message to Slack. LLM Skipped. Stock News Information for ${companyName} [${nlp_org_1}/${stockSymbol}] on ${date} ${title} : ${description}. ${guid} article date ${pubdate} For those who selected weather, we follow a route similar to stocks (we should add caching, for example with Redis @ Aiven). We use my OpenNLP processor to extract the locations you might want weather for. The next step is taking the output of the processor and building a value to send to our geocoder.
weatherlocation = ${nlp_location_1:notNull():ifElse(${nlp_location_1}, "New York City")} If we can’t find a valid location, I am going to say “New York City." We could use some other lookup. I am doing some work on loading all locations and could do some advanced PostgreSQL searches on that - or perhaps OpenSearch or a vectorized datastore. I pass that location to Open Meteo to find the geo via InvokeHTTP. https://geocoding-api.open-meteo.com/v1/search?name=${weatherlocation:trim():urlEncode()}&count=1&language=en&format=json We then parse the values we need from the results. { "results" : [ { "id" : 5128581, "name" : "New York", "latitude" : 40.71427, "longitude" : -74.00597, "elevation" : 10.0, "feature_code" : "PPL", "country_code" : "US", "admin1_id" : 5128638, "timezone" : "America/New_York", "population" : 8175133, "postcodes" : [ "10001", "10002", "10003", "10004", "10005", "10006", "10007", "10008", "10009", "10010", "10011", "10012", "10013", "10014", "10016", "10017", "10018", "10019", "10020", "10021", "10022", "10023", "10024", "10025", "10026", "10027", "10028", "10029", "10030", "10031", "10032", "10033", "10034", "10035", "10036", "10037", "10038", "10039", "10040", "10041", "10043", "10044", "10045", "10055", "10060", "10065", "10069", "10080", "10081", "10087", "10090", "10101", "10102", "10103", "10104", "10105", "10106", "10107", "10108", "10109", "10110", "10111", "10112", "10113", "10114", "10115", "10116", "10117", "10118", "10119", "10120", "10121", "10122", "10123", "10124", "10125", "10126", "10128", "10129", "10130", "10131", "10132", "10133", "10138", "10150", "10151", "10152", "10153", "10154", "10155", "10156", "10157", "10158", "10159", "10160", "10161", "10162", "10163", "10164", "10165", "10166", "10167", "10168", "10169", "10170", "10171", "10172", "10173", "10174", "10175", "10176", "10177", "10178", "10179", "10185", "10199", "10203", "10211", "10212", "10213", "10242", "10249", "10256", "10258", "10259", "10260", "10261", "10265", "10268", "10269", "10270", "10271", "10272", "10273", "10274", "10275", "10276", "10277", "10278", "10279", "10280", "10281", "10282", "10285", "10286" ], "country_id" : 6252001, "country" : "United States", "admin1" : "New York" } ], "generationtime_ms" : 0.92196465 } We then parse the results so we can call another API to get the current weather for that latitude and longitude via InvokeHTTP. https://api.weather.gov/points/${latitude:trim()},${longitude:trim()} The results are geo-json. 
{ "@context": [ "https://geojson.org/geojson-ld/geojson-context.jsonld", { "@version": "1.1", "wx": "https://api.weather.gov/ontology#", "s": "https://schema.org/", "geo": "http://www.opengis.net/ont/geosparql#", "unit": "http://codes.wmo.int/common/unit/", "@vocab": "https://api.weather.gov/ontology#", "geometry": { "@id": "s:GeoCoordinates", "@type": "geo:wktLiteral" }, "city": "s:addressLocality", "state": "s:addressRegion", "distance": { "@id": "s:Distance", "@type": "s:QuantitativeValue" }, "bearing": { "@type": "s:QuantitativeValue" }, "value": { "@id": "s:value" }, "unitCode": { "@id": "s:unitCode", "@type": "@id" }, "forecastOffice": { "@type": "@id" }, "forecastGridData": { "@type": "@id" }, "publicZone": { "@type": "@id" }, "county": { "@type": "@id" } } ], "id": "https://api.weather.gov/points/40.7143,-74.006", "type": "Feature", "geometry": { "type": "Point", "coordinates": [ -74.006, 40.714300000000001 ] }, "properties": { "@id": "https://api.weather.gov/points/40.7143,-74.006", "@type": "wx:Point", "cwa": "OKX", "forecastOffice": "https://api.weather.gov/offices/OKX", "gridId": "OKX", "gridX": 33, "gridY": 35, "forecast": "https://api.weather.gov/gridpoints/OKX/33,35/forecast", "forecastHourly": "https://api.weather.gov/gridpoints/OKX/33,35/forecast/hourly", "forecastGridData": "https://api.weather.gov/gridpoints/OKX/33,35", "observationStations": "https://api.weather.gov/gridpoints/OKX/33,35/stations", "relativeLocation": { "type": "Feature", "geometry": { "type": "Point", "coordinates": [ -74.0279259, 40.745251000000003 ] }, "properties": { "city": "Hoboken", "state": "NJ", "distance": { "unitCode": "wmoUnit:m", "value": 3906.1522008034999 }, "bearing": { "unitCode": "wmoUnit:degree_(angle)", "value": 151 } } }, "forecastZone": "https://api.weather.gov/zones/forecast/NYZ072", "county": "https://api.weather.gov/zones/county/NYC061", "fireWeatherZone": "https://api.weather.gov/zones/fire/NYZ212", "timeZone": "America/New_York", "radarStation": "KDIX" } } We use EvaluateJSONPath to grab a forecast URL. Then we call that forecast URL via invokeHTTP. That produces a larger JSON output that we will parse for the results we want to return to Slack. 
{ "@context": [ "https://geojson.org/geojson-ld/geojson-context.jsonld", { "@version": "1.1", "wx": "https://api.weather.gov/ontology#", "geo": "http://www.opengis.net/ont/geosparql#", "unit": "http://codes.wmo.int/common/unit/", "@vocab": "https://api.weather.gov/ontology#" } ], "type": "Feature", "geometry": { "type": "Polygon", "coordinates": [ [ [ -74.025095199999996, 40.727052399999998 ], [ -74.0295579, 40.705361699999997 ], [ -74.000948300000005, 40.701977499999998 ], [ -73.996479800000003, 40.723667899999995 ], [ -74.025095199999996, 40.727052399999998 ] ] ] }, "properties": { "updated": "2023-11-15T14:34:46+00:00", "units": "us", "forecastGenerator": "BaselineForecastGenerator", "generatedAt": "2023-11-15T15:11:39+00:00", "updateTime": "2023-11-15T14:34:46+00:00", "validTimes": "2023-11-15T08:00:00+00:00/P7DT17H", "elevation": { "unitCode": "wmoUnit:m", "value": 2.1335999999999999 }, "periods": [ { "number": 1, "name": "Today", "startTime": "2023-11-15T10:00:00-05:00", "endTime": "2023-11-15T18:00:00-05:00", "isDaytime": true, "temperature": 51, "temperatureUnit": "F", "temperatureTrend": null, "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": null }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": 2.2222222222222223 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 68 }, "windSpeed": "1 to 7 mph", "windDirection": "SW", "icon": "https://api.weather.gov/icons/land/day/bkn?size=medium", "shortForecast": "Partly Sunny", "detailedForecast": "Partly sunny, with a high near 51. Southwest wind 1 to 7 mph." }, { "number": 2, "name": "Tonight", "startTime": "2023-11-15T18:00:00-05:00", "endTime": "2023-11-16T06:00:00-05:00", "isDaytime": false, "temperature": 44, "temperatureUnit": "F", "temperatureTrend": null, "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": null }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": 3.8888888888888888 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 82 }, "windSpeed": "8 mph", "windDirection": "SW", "icon": "https://api.weather.gov/icons/land/night/sct?size=medium", "shortForecast": "Partly Cloudy", "detailedForecast": "Partly cloudy, with a low around 44. Southwest wind around 8 mph." }, { "number": 3, "name": "Thursday", "startTime": "2023-11-16T06:00:00-05:00", "endTime": "2023-11-16T18:00:00-05:00", "isDaytime": true, "temperature": 60, "temperatureUnit": "F", "temperatureTrend": "falling", "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": null }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": 5.5555555555555554 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 82 }, "windSpeed": "6 mph", "windDirection": "SW", "icon": "https://api.weather.gov/icons/land/day/few?size=medium", "shortForecast": "Sunny", "detailedForecast": "Sunny. High near 60, with temperatures falling to around 58 in the afternoon. Southwest wind around 6 mph." 
}, { "number": 4, "name": "Thursday Night", "startTime": "2023-11-16T18:00:00-05:00", "endTime": "2023-11-17T06:00:00-05:00", "isDaytime": false, "temperature": 47, "temperatureUnit": "F", "temperatureTrend": null, "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": null }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": 6.1111111111111107 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 80 }, "windSpeed": "3 mph", "windDirection": "SW", "icon": "https://api.weather.gov/icons/land/night/few?size=medium", "shortForecast": "Mostly Clear", "detailedForecast": "Mostly clear, with a low around 47. Southwest wind around 3 mph." }, { "number": 5, "name": "Friday", "startTime": "2023-11-17T06:00:00-05:00", "endTime": "2023-11-17T18:00:00-05:00", "isDaytime": true, "temperature": 63, "temperatureUnit": "F", "temperatureTrend": "falling", "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": 20 }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": 12.222222222222221 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 86 }, "windSpeed": "2 to 10 mph", "windDirection": "S", "icon": "https://api.weather.gov/icons/land/day/bkn/rain,20?size=medium", "shortForecast": "Partly Sunny then Slight Chance Light Rain", "detailedForecast": "A slight chance of rain after 1pm. Partly sunny. High near 63, with temperatures falling to around 61 in the afternoon. South wind 2 to 10 mph. Chance of precipitation is 20%." }, { "number": 6, "name": "Friday Night", "startTime": "2023-11-17T18:00:00-05:00", "endTime": "2023-11-18T06:00:00-05:00", "isDaytime": false, "temperature": 51, "temperatureUnit": "F", "temperatureTrend": null, "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": 70 }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": 12.777777777777779 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 100 }, "windSpeed": "6 to 10 mph", "windDirection": "SW", "icon": "https://api.weather.gov/icons/land/night/rain,60/rain,70?size=medium", "shortForecast": "Light Rain Likely", "detailedForecast": "Rain likely. Cloudy, with a low around 51. Chance of precipitation is 70%. New rainfall amounts between a quarter and half of an inch possible." }, { "number": 7, "name": "Saturday", "startTime": "2023-11-18T06:00:00-05:00", "endTime": "2023-11-18T18:00:00-05:00", "isDaytime": true, "temperature": 55, "temperatureUnit": "F", "temperatureTrend": null, "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": 70 }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": 11.111111111111111 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 100 }, "windSpeed": "8 to 18 mph", "windDirection": "NW", "icon": "https://api.weather.gov/icons/land/day/rain,70/rain,30?size=medium", "shortForecast": "Light Rain Likely", "detailedForecast": "Rain likely before 1pm. Partly sunny, with a high near 55. Chance of precipitation is 70%." 
}, { "number": 8, "name": "Saturday Night", "startTime": "2023-11-18T18:00:00-05:00", "endTime": "2023-11-19T06:00:00-05:00", "isDaytime": false, "temperature": 40, "temperatureUnit": "F", "temperatureTrend": null, "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": null }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": 1.1111111111111112 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 65 }, "windSpeed": "12 to 17 mph", "windDirection": "NW", "icon": "https://api.weather.gov/icons/land/night/few?size=medium", "shortForecast": "Mostly Clear", "detailedForecast": "Mostly clear, with a low around 40." }, { "number": 9, "name": "Sunday", "startTime": "2023-11-19T06:00:00-05:00", "endTime": "2023-11-19T18:00:00-05:00", "isDaytime": true, "temperature": 50, "temperatureUnit": "F", "temperatureTrend": null, "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": null }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": -0.55555555555555558 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 65 }, "windSpeed": "10 to 14 mph", "windDirection": "W", "icon": "https://api.weather.gov/icons/land/day/few?size=medium", "shortForecast": "Sunny", "detailedForecast": "Sunny, with a high near 50." }, { "number": 10, "name": "Sunday Night", "startTime": "2023-11-19T18:00:00-05:00", "endTime": "2023-11-20T06:00:00-05:00", "isDaytime": false, "temperature": 38, "temperatureUnit": "F", "temperatureTrend": null, "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": null }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": -0.55555555555555558 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 67 }, "windSpeed": "13 mph", "windDirection": "NW", "icon": "https://api.weather.gov/icons/land/night/few?size=medium", "shortForecast": "Mostly Clear", "detailedForecast": "Mostly clear, with a low around 38." }, { "number": 11, "name": "Monday", "startTime": "2023-11-20T06:00:00-05:00", "endTime": "2023-11-20T18:00:00-05:00", "isDaytime": true, "temperature": 46, "temperatureUnit": "F", "temperatureTrend": null, "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": null }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": -1.6666666666666667 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 70 }, "windSpeed": "13 mph", "windDirection": "NW", "icon": "https://api.weather.gov/icons/land/day/sct?size=medium", "shortForecast": "Mostly Sunny", "detailedForecast": "Mostly sunny, with a high near 46." }, { "number": 12, "name": "Monday Night", "startTime": "2023-11-20T18:00:00-05:00", "endTime": "2023-11-21T06:00:00-05:00", "isDaytime": false, "temperature": 38, "temperatureUnit": "F", "temperatureTrend": null, "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": null }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": -1.1111111111111112 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 70 }, "windSpeed": "10 mph", "windDirection": "N", "icon": "https://api.weather.gov/icons/land/night/sct?size=medium", "shortForecast": "Partly Cloudy", "detailedForecast": "Partly cloudy, with a low around 38." 
}, { "number": 13, "name": "Tuesday", "startTime": "2023-11-21T06:00:00-05:00", "endTime": "2023-11-21T18:00:00-05:00", "isDaytime": true, "temperature": 49, "temperatureUnit": "F", "temperatureTrend": null, "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": 30 }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": 2.7777777777777777 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 73 }, "windSpeed": "9 to 13 mph", "windDirection": "E", "icon": "https://api.weather.gov/icons/land/day/bkn/rain,30?size=medium", "shortForecast": "Partly Sunny then Chance Light Rain", "detailedForecast": "A chance of rain after 1pm. Partly sunny, with a high near 49. Chance of precipitation is 30%." }, { "number": 14, "name": "Tuesday Night", "startTime": "2023-11-21T18:00:00-05:00", "endTime": "2023-11-22T06:00:00-05:00", "isDaytime": false, "temperature": 46, "temperatureUnit": "F", "temperatureTrend": null, "probabilityOfPrecipitation": { "unitCode": "wmoUnit:percent", "value": 50 }, "dewpoint": { "unitCode": "wmoUnit:degC", "value": 7.7777777777777777 }, "relativeHumidity": { "unitCode": "wmoUnit:percent", "value": 86 }, "windSpeed": "13 to 18 mph", "windDirection": "S", "icon": "https://api.weather.gov/icons/land/night/rain,50?size=medium", "shortForecast": "Chance Light Rain", "detailedForecast": "A chance of rain. Mostly cloudy, with a low around 46. Chance of precipitation is 50%." } ] } } We parse the data with EvaluateJSONPath to get the primary fields for the weather. We then format those fields and send the message with PutSlack. LLM Skipped. Read forecast on ${date} for ${weatherlocation} @ ${latitude},${longitude} Used ${forecasturl} ${icon} Temp: ${temperature} ${temperatureunit} - ${temperaturetrend} There is a wind ${winddirection} at ${windspeed}. ${detailedforecast} Slack Output If we do have an LLM question, let's make sure it's just one record. We use a few different models that are available in IBM WatsonX.AI on IBM Cloud and can be accessed quickly by our REST prompts. I tested and built the prompts initially in IBM's Prompt Lab and then copied the initial curl statement from there. See the IBM watsonx.ai documentation for the supported foundation models; the ones used here are ibm/mpt-7b-instruct2, meta-llama/llama-2-70b-chat, and ibm/granite-13b-chat-v1. We have to send our unique secure key to IBM, and they will give us a token to use in our next call. We parse out the question and then send it to WatsonX via the REST API. We build a prompt to send to IBM as follows (a plain-Java sketch of this REST call appears after the Flink and DB2 DDL at the end of this walkthrough). { "model_id": "meta-llama/llama-2-70b-chat", "input": "${inputs:urlEncode()}", "parameters": { "decoding_method": "greedy", "max_new_tokens": 200, "min_new_tokens": 50, "stop_sequences": [], "repetition_penalty": 1 }, "project_id": "0ead8ec4-d137-4f9c-8956-50b0da4a7068" } We parse the generated text, which is our generative AI result, plus some helpful metadata on timings. The result posted to Slack is as follows: "You can use Apache NiFi to integrate Generative AI models in several ways: Data Preprocessing: Use NiFi to preprocess data before feeding it into your Generative AI model. This can include data cleaning, transformation, and feature engineering. Model Training: Use NiFi to automate the training process of your Generative AI model. You can use NiFi's PutFile and PutFile_SFTP processors to write the training data to a file, and then use a processor like ExecuteScript to run the training script. Model Deployment: Once your Generative AI model is trained, you can use NiFi to deploy it.
You can create a NiFi flow that takes in input data, runs it through the Generative AI model, and then outputs the generated data. Real-time Inference: You can use NiFi’s StreamingJobs” After the Slackbot posted the results, it posted metrics and debugging information to the chat channel. All of the metadata is posted to another Slack channel for administrator monitoring. ==== NiFi to IBM WatsonX.AI LLM Answers On Date: Wed, 15 Nov 2023 15:43:29 GMT Created: 2023-11-15T15:43:29.248Z Prompt: Q: What is a good way to integrate Generative AI and Apache NiFi? Response: ) You can use Apache NiFi to integrate Generative AI models in several ways: 1. Data Preprocessing: Use NiFi to preprocess data before feeding it into your Generative AI model. This can include data cleaning, transformation, and feature engineering. 2. Model Training: Use NiFi to automate the training process of your Generative AI model. You can use NiFi's PutFile and PutFile_SFTP processors to write the training data to a file, and then use a processor like ExecuteScript to run the training script. 3. Model Deployment: Once your Generative AI model is trained, you can use NiFi to deploy it. You can create a NiFi flow that takes in input data, runs it through the Generative AI model, and then outputs the generated data. 4. Real-time Inference: You can use NiFi's StreamingJobs Token: 200 Req Duration: 8153 HTTP TX ID: 89d71099-da23-4e7e-89f9-4e8f5620c0fb IBM Msg: This model is a Non-IBM Product governed by a third-party license that may impose use restrictions and other obligations. By using this model you agree to its terms as identified in the following URL. URL: https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/fm-models.html?context=wx IBM Msg ID: disclaimer_warning Model ID: meta-llama/llama-2-70b-chat Stop Reason: max_tokens Token Count: 38 TX ID: NGp0djg-c05f740f84f84b7c80f93f9da05aa756 UUID: da0806cb-6133-4bf4-808e-1fbf419c09e3 Corr ID: NGp0djg-c05f740f84f84b7c80f93f9da05aa756 Global TX ID: 20c3a9cf276c38bcdaf26e3c27d0479b Service Time: 478 Request ID: 03c2726a-dcb6-407f-96f1-f83f20fe9c9c File Name: 1a3c4386-86d2-4969-805b-37649c16addb Request Duration: 8153 Request URL: https://us-south.ml.cloud.ibm.com/ml/v1-beta/generation/text?version=2023-05-29 cf-ray: 82689bfd28e48ce2-EWR ===== Make Your Own Slackbot Slack Output Kafka Distribute Apache Flink SQL Table Creation DDL CREATE TABLE `ssb`.`Meetups`.`watsonairesults` ( `date` VARCHAR(2147483647), `x_global_transaction_id` VARCHAR(2147483647), `x_request_id` VARCHAR(2147483647), `cf_ray` VARCHAR(2147483647), `inputs` VARCHAR(2147483647), `created_at` VARCHAR(2147483647), `stop_reason` VARCHAR(2147483647), `x_correlation_id` VARCHAR(2147483647), `x_proxy_upstream_service_time` VARCHAR(2147483647), `message_id` VARCHAR(2147483647), `model_id` VARCHAR(2147483647), `invokehttp_request_duration` VARCHAR(2147483647), `message` VARCHAR(2147483647), `uuid` VARCHAR(2147483647), `generated_text` VARCHAR(2147483647), `transaction_id` VARCHAR(2147483647), `tokencount` VARCHAR(2147483647), `generated_token` VARCHAR(2147483647), `ts` VARCHAR(2147483647), `advisoryId` VARCHAR(2147483647), `eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp', WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND ) WITH ( 'deserialization.failure.policy' = 'ignore_and_log', 'properties.request.timeout.ms' = '120000', 'format' = 'json', 'properties.bootstrap.servers' = 'kafka:9092', 'connector' = 'kafka', 'properties.transaction.timeout.ms' = 
'900000', 'topic' = 'watsonxaillmanswers', 'scan.startup.mode' = 'group-offsets', 'properties.auto.offset.reset' = 'earliest', 'properties.group.id' = 'watsonxaillmconsumer' ) CREATE TABLE `ssb`.`Meetups`.`watsonxresults` ( `date` VARCHAR(2147483647), `x_global_transaction_id` VARCHAR(2147483647), `x_request_id` VARCHAR(2147483647), `cf_ray` VARCHAR(2147483647), `inputs` VARCHAR(2147483647), `created_at` VARCHAR(2147483647), `stop_reason` VARCHAR(2147483647), `x_correlation_id` VARCHAR(2147483647), `x_proxy_upstream_service_time` VARCHAR(2147483647), `message_id` VARCHAR(2147483647), `model_id` VARCHAR(2147483647), `invokehttp_request_duration` VARCHAR(2147483647), `message` VARCHAR(2147483647), `uuid` VARCHAR(2147483647), `generated_text` VARCHAR(2147483647), `transaction_id` VARCHAR(2147483647), `tokencount` VARCHAR(2147483647), `generated_token` VARCHAR(2147483647), `ts` VARCHAR(2147483647), `eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp', WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND ) WITH ( 'deserialization.failure.policy' = 'ignore_and_log', 'properties.request.timeout.ms' = '120000', 'format' = 'json', 'properties.bootstrap.servers' = 'kafka:9092', 'connector' = 'kafka', 'properties.transaction.timeout.ms' = '900000', 'topic' = 'watsonxaillm', 'scan.startup.mode' = 'group-offsets', 'properties.auto.offset.reset' = 'earliest', 'properties.group.id' = 'allwatsonx1' ) Example Prompt {"inputs":"Please answer to the following question. What is the capital of the United States?"} IBM DB2 SQL alter table "DB2INST1"."TRAVELADVISORY" add column "summary" VARCHAR(2048); -- DB2INST1.TRAVELADVISORY definition CREATE TABLE "DB2INST1"."TRAVELADVISORY" ( "TITLE" VARCHAR(250 OCTETS) , "PUBDATE" VARCHAR(250 OCTETS) , "LINK" VARCHAR(250 OCTETS) , "GUID" VARCHAR(250 OCTETS) , "ADVISORYID" VARCHAR(250 OCTETS) , "DOMAIN" VARCHAR(250 OCTETS) , "CATEGORY" VARCHAR(4096 OCTETS) , "DESCRIPTION" VARCHAR(4096 OCTETS) , "UUID" VARCHAR(250 OCTETS) NOT NULL , "TS" BIGINT NOT NULL , "summary" VARCHAR(2048 OCTETS) ) IN "IBMDB2SAMPLEREL" ORGANIZE BY ROW; ALTER TABLE "DB2INST1"."TRAVELADVISORY" ADD PRIMARY KEY ("UUID") ENFORCED; GRANT CONTROL ON TABLE "DB2INST1"."TRAVELADVISORY" TO USER "DB2INST1"; GRANT CONTROL ON INDEX "SYSIBM "."SQL230620142604860" TO USER "DB2INST1"; SELECT "summary", TITLE , ADVISORYID , TS, PUBDATE FROM DB2INST1.TRAVELADVISORY t WHERE "summary" IS NOT NULL ORDER BY ts DESC Example Output Email GitHub README GitHub repo Video Source Code Source Code
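As referenced earlier, here is a rough, plain-Java sketch of the watsonx.ai text generation call that the NiFi flow performs with InvokeHTTP. The endpoint URL and the JSON body mirror the flow above; the bearer token and project id are placeholders you would have to supply from your own IBM Cloud account, and error handling is omitted: Java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class WatsonxTextGeneration {

    public static void main(String[] args) throws Exception {
        // same endpoint the NiFi InvokeHTTP processor calls
        String url = "https://us-south.ml.cloud.ibm.com/ml/v1-beta/generation/text?version=2023-05-29";
        String bearerToken = System.getenv("WATSONX_IAM_TOKEN"); // token obtained from the IAM call described above
        String projectId = System.getenv("WATSONX_PROJECT_ID");  // your own watsonx.ai project id

        // same prompt body the flow builds; the question replaces the NiFi ${inputs} attribute
        String body = """
                {
                  "model_id": "meta-llama/llama-2-70b-chat",
                  "input": "Q: What is a good way to integrate Generative AI and Apache NiFi?",
                  "parameters": {
                    "decoding_method": "greedy",
                    "max_new_tokens": 200,
                    "min_new_tokens": 50,
                    "stop_sequences": [],
                    "repetition_penalty": 1
                  },
                  "project_id": "%s"
                }
                """.formatted(projectId);

        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/json")
                .header("Authorization", "Bearer " + bearerToken)
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // the JSON response contains the generated_text field that the flow parses and posts back to Slack
        System.out.println(response.body());
    }
}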
In the vast landscape of big data processing, Apache Spark stands out as a powerful and versatile framework. While developing Spark applications is crucial, deploying and executing them efficiently is equally vital. One key aspect of deploying Spark applications is the use of "spark-submit," a command-line interface that facilitates the submission of Spark applications to a cluster. Understanding Spark Submit At its core, spark-submit is the entry point for submitting Spark applications. Whether you are dealing with a standalone cluster, Apache Mesos, Hadoop YARN, or Kubernetes, spark-submit acts as the bridge between your developed Spark code and the cluster where it will be executed. Configuring Spark Submit Configuring spark-submit is a crucial aspect of deploying Apache Spark applications, allowing developers to optimize performance, allocate resources efficiently, and tailor the execution environment to specific requirements. Here's a guide on configuring spark-submit for various scenarios: 1. Specifying the Application JAR Use the --class option to specify the main class of a Java/Scala application; for a Python/R application, you pass the script file itself instead of a JAR and no --class option is needed. spark-submit --class com.example.MainClass mysparkapp.jar 2. Setting Master and Deploy Mode Specify the Spark master URL using the --master option. Choose the deploy mode with --deploy-mode (client or cluster). spark-submit --master spark://<master-url> --deploy-mode client mysparkapp.jar 3. Configuring Executor and Driver Memory Allocate memory for executors using --executor-memory. Set driver memory using --driver-memory. spark-submit --executor-memory 4G --driver-memory 2G mysparkapp.jar 4. Adjusting Executor Cores Use --executor-cores to specify the number of cores for each executor. spark-submit --executor-cores 4 mysparkapp.jar 5. Dynamic Allocation Enable dynamic allocation to dynamically adjust the number of executors based on workload. spark-submit --conf spark.dynamicAllocation.enabled=true mysparkapp.jar 6. Setting Configuration Properties Pass additional Spark configurations using --conf. spark-submit --conf spark.shuffle.compress=true mysparkapp.jar 7. External Dependencies Include external JARs using --jars. For Python dependencies, use --py-files. spark-submit --jars /path/to/dependency.jar mysparkapp.jar 8. Cluster Manager Integration For YARN, set the YARN queue using --queue. For Kubernetes, use --master k8s://<k8s-apiserver>. spark-submit --master yarn --deploy-mode cluster --queue myQueue mysparkapp.jar 9. Debugging and Logging Increase logging verbosity for debugging with --verbose. Redirect logs to a file using --conf spark.logFile=spark.log. spark-submit --verbose --conf spark.logFile=spark.log mysparkapp.jar 10. Application Arguments Pass arguments to your application after specifying the JAR file. spark-submit mysparkapp.jar arg1 arg2 (A minimal example application that these commands could launch is shown right after the conclusion below.) Conclusion In this article, we delve into the nuances of spark-submit to empower developers with the knowledge needed for effective Spark application deployment. By mastering this command-line interface, developers can unlock the true potential of Apache Spark, ensuring that their big data applications run efficiently and seamlessly across diverse clusters. Stay tuned as we explore each facet of spark-submit to elevate your Spark deployment skills.
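As referenced above, here is a minimal Spark application in Java that the example spark-submit commands could launch. The class name com.example.MainClass and the input path are placeholders taken from the examples, not part of any real project: Java
package com.example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MainClass {

    public static void main(String[] args) {
        // master, deploy mode, and memory settings come from spark-submit, not from the code
        SparkSession spark = SparkSession.builder()
                .appName("my-spark-app")
                .getOrCreate();

        // application arguments arrive after the JAR on the spark-submit command line
        String inputPath = args.length > 0 ? args[0] : "data/input.txt";

        // read the text file and print a simple count as a stand-in for real processing
        Dataset<Row> lines = spark.read().text(inputPath);
        System.out.println("Line count: " + lines.count());

        spark.stop();
    }
}
Packaged into mysparkapp.jar, it could be launched with, for example: spark-submit --class com.example.MainClass --master yarn --deploy-mode cluster mysparkapp.jar /data/input.txt.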
This is an article from DZone's 2023 Observability and Application Performance Trend Report.For more: Read the Report AIOps applies AI to IT operations, enabling agility, early issue detection, and proactive resolution to maintain service quality. AIOps integrates DataOps and MLOps, enhancing efficiency, collaboration, and transparency. It aligns with DevOps for application lifecycle management and automation, optimizing decisions throughout DataOps, MLOps, and DevOps. Observability for IT operations is a transformative approach that provides real-time insights, proactive issue detection, and comprehensive performance analysis, ensuring the reliability and availability of modern IT systems. Why AIOps Is Fundamental to Modern IT Operations AIOps streamlines operations by automating problem detection and resolution, leading to increased IT staff efficiency, outage prevention, improved user experiences, and optimized utilization of cloud technologies. The major contributions of AIOps are shared in Table 1: CONTRIBUTIONS OF AIOPS Key Functions Function Explanations Event correlation Uses rules and logic to filter and group event data, prioritizing service issues based on KPIs and business metrics. Anomaly detection Identifies normal and abnormal behavior patterns, monitoring multiple services to predict and mitigate potential issues. Automated incident management Aims to automate all standardized, high-volume, error-sensitive, audit-critical, repetitive, multi-person, and time-sensitive tasks. Meanwhile, it preserves human involvement in low ROI and customer support-related activities. Performance optimization Analyzes large datasets employing AI and ML, proactively ensuring service levels and identifying issue root causes. Enhanced collaboration Fosters collaboration between IT teams, such as DevOps, by providing a unified platform for monitoring, analysis, and incident response. Table 1 How Does AIOps Work? AIOps involves the collection and analysis of vast volumes of data generated within IT environments, such as network performance metrics, application logs, and system alerts. AIOps uses these insights to detect patterns and anomalies, providing early warnings for potential issues. By integrating with other DevOps practices, such as DataOps and MLOps, it streamlines processes, enhances efficiency, and ensures a proactive approach to problem resolution. AIOps is a crucial tool for modern IT operations, offering the agility and intelligence required to maintain service quality in complex and dynamic digital environments. Figure 1: How AIOps works Popular AIOps Platforms and Key Features Leading AIOps platforms are revolutionizing IT operations by seamlessly combining AI and observability, enhancing system reliability, and optimizing performance across diverse industries. The following tools are just a few of many options: Prometheus acts as an efficient AIOps platform by capturing time-series data, monitoring IT environments, and providing anomaly alerts. OpenNMS automatically discovers, maps, and monitors complex IT environments, including networks, applications, and systems. Shinken enables users to monitor and troubleshoot complex IT environments, including networks and applications. The key features of the platforms and the role they play in AIOps are shared in Table 2: KEY FEATURES OF AIOPS PLATFORMS AND THE CORRESPONDING TASKS Features Tasks Visibility Provides insight into the entire IT environment, allowing for comprehensive monitoring and analysis. 
Monitoring and management Monitors the performance of IT systems and manages alerts and incidents. Performance Measures and analyzes system performance metrics to ensure optimal operation. Functionality Ensures that the AIOps platform offers a range of functionalities to meet various IT needs. Issue resolution Utilizes AI-driven insights to address and resolve IT issues more effectively. Analysis Analyzes data and events to identify patterns, anomalies, and trends, aiding in proactive decision-making. Table 2 Observability's Role in IT Operations Observability plays a pivotal role in IT operations by offering the means to monitor, analyze, and understand the intricacies of complex IT systems. It enables continuous tracking of system performance, early issue detection, and root cause analysis. Observability data empowers IT teams to optimize performance, allocate resources efficiently, and ensure a reliable user experience. It supports proactive incident management, compliance monitoring, and data-driven decision-making. In a collaborative DevOps environment, observability fosters transparency and enables teams to work cohesively toward system reliability and efficiency. Data sources like logs, metrics, and traces play a crucial role in observability by providing diverse and comprehensive insights into the behavior and performance of IT systems. ROLES OF DATA SOURCES Logs Metrics Traces Event tracking Root cause analysis Anomaly detection Compliance and auditing Performance monitoring Threshold alerts Capacity planning End-to-end visibility Latency analysis Dependency mapping Table 3 Challenges of Observability Observability is fraught with multiple technical challenges. Accidental invisibility takes place where critical system components or behaviors are not being monitored, leading to blind spots in observability. The challenge of insufficient source data can result in incomplete or inadequate observability, limiting the ability to gain insights into system performance. Dealing with multiple information formats poses difficulties in aggregating and analyzing data from various sources, making it harder to maintain a unified view of the system. Popular Observability Platforms and Key Features Observability platforms offer a set of key capabilities essential for monitoring, analyzing, and optimizing complex IT systems. OpenObserve provides scheduled and real-time alerts and reduces operational costs. Vector allows users to collect and transform logs, metrics, and traces. The Elastic Stack — comprising Elasticsearch, Kibana, Beats, and Logstash — can search, analyze, and visualize data in real time. The capabilities of observability platforms include real-time data collection from various sources such as logs, metrics, and traces, providing a comprehensive view of system behavior. They enable proactive issue detection, incident management, root cause analysis, system reliability aid, and performance optimization. Observability platforms often incorporate machine learning for anomaly detection and predictive analysis. They offer customizable dashboards and reporting for in-depth insights and data-driven decision-making. These platforms foster collaboration among IT teams by providing a unified space for developers and operations to work together, fostering a culture of transparency and accountability. 
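To make the anomaly detection function described above a bit more tangible, here is a deliberately simple, self-contained sketch of a rolling z-score check on a metric stream. It is not taken from any of the platforms mentioned in this report, which use far more sophisticated models, but it illustrates the basic idea of flagging values that deviate strongly from a learned baseline: Java
import java.util.ArrayDeque;
import java.util.Deque;

/** Flags metric samples that deviate more than a threshold of standard deviations from a rolling baseline. */
public class RollingZScoreDetector {

    private final Deque<Double> window = new ArrayDeque<>();
    private final int windowSize;
    private final double threshold;

    public RollingZScoreDetector(int windowSize, double threshold) {
        this.windowSize = windowSize;
        this.threshold = threshold;
    }

    /** Returns true if the sample looks anomalous compared to the current window of recent samples. */
    public boolean isAnomaly(double sample) {
        boolean anomalous = false;
        if (window.size() >= windowSize) {
            double mean = window.stream().mapToDouble(Double::doubleValue).average().orElse(sample);
            double variance = window.stream()
                    .mapToDouble(v -> (v - mean) * (v - mean)).average().orElse(0.0);
            double stdDev = Math.sqrt(variance);
            anomalous = stdDev > 0 && Math.abs(sample - mean) / stdDev > threshold;
            window.removeFirst();
        }
        window.addLast(sample);
        return anomalous;
    }

    public static void main(String[] args) {
        var detector = new RollingZScoreDetector(30, 3.0);
        // normal latency samples around 100 ms build the baseline
        for (int i = 0; i < 60; i++) {
            detector.isAnomaly(100 + Math.sin(i) * 5);
        }
        // a sudden spike is flagged against the learned baseline
        System.out.println("Spike flagged: " + detector.isAnomaly(400)); // prints true
    }
}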
Leveraging AIOps and Observability for Enhanced Performance Analytics Synergizing AIOps and observability represents a cutting-edge strategy to elevate performance analytics in IT operations, enabling data-driven insights, proactive issue resolution, and optimized system performance. Observability Use Cases Best Supported by AIOps Elevating cloud-native and hybrid cloud observability with AIOps: AIOps transcends the boundaries between cloud-native and hybrid cloud environments, offering comprehensive monitoring, anomaly detection, and seamless incident automation. It adapts to the dynamic nature of cloud-native systems while optimizing on-premises and hybrid cloud operations. This duality makes AIOps a versatile tool for modern enterprises, ensuring a consistent and data-driven approach to observability, regardless of the infrastructure's intricacies. Seamless collaboration of dev and ops teams with AIOps: AIOps facilitates the convergence of dev and ops teams in observability efforts. By offering a unified space for data analysis, real-time monitoring, and incident management, AIOps fosters transparency and collaboration. It enables dev and ops teams to work cohesively, ensuring the reliability and performance of IT systems. Challenges To Adopting AIOps and Observability The three major challenges to adopting AIOps and observability are data complexity, integration complexity, and data security. Handling the vast and diverse data generated by modern IT environments can be overwhelming. Organizations need to manage, store, and analyze this data efficiently. Integrating AIOps and observability tools with existing systems and processes can be complex and time-consuming, potentially causing disruptions if not executed properly. The increased visibility into IT systems also raises concerns about data security and privacy. Ensuring the protection of sensitive information is crucial. Impacts and Benefits of Combining AIOps and Observability Across Sectors The impacts and benefits of integrating AIOps and observability transcend industries, enhancing reliability, efficiency, and performance across diverse sectors. It helps in improved incident response by using machine learning to detect patterns and trends, enabling proactive issue resolution, and minimizing downtime. Predictive analytics anticipates capacity needs and optimizes resource allocation in advance, which ensures uninterrupted operations. Full-stack observability leverages data from various sources — including metrics, events, logs, and traces (MELT) — to gain comprehensive insights into system performance, supporting timely issue identification and resolution. MELT capabilities are the key drivers where metrics help pinpoint issues, events automate alert prioritization, logs aid in root cause analysis, and traces assist in locating problems within the system. All contribute to improved operational efficiency. APPLICATION SCENARIOS OF COMBINING AIOPS AND OBSERVABILITY Industry Sectors Key Contributions Finance Enhance fraud detection, minimize downtime, and ensure compliance with regulatory requirements, thus safeguarding financial operations. Healthcare Improve patient outcomes by guaranteeing the availability and performance of critical healthcare systems and applications, contributing to better patient care. Retail Optimize supply chain operations, boost customer experiences, and maintain online and in-store operational efficiency. 
Manufacturing Enhance the reliability and efficiency of manufacturing processes through predictive maintenance and performance optimization. Telecommunications Support network performance to ensure reliable connectivity and minimal service disruptions. E-commerce Real-time insights into website performance, leading to seamless shopping experiences and improved conversion rates. Table 4 The application scenarios of combining AIOps and observability span diverse industries, showcasing their transformative potential in improving system reliability, availability, and performance across the board. Operational Guidance for AIOps Implementation Operational guidance for AIOps implementation offers a strategic roadmap to navigate the complexities of integrating AI into IT operations, ensuring successful deployment and optimization. Figure 2: Steps for implementing AIOps The Future of AIOps in Observability: The Road Ahead AIOps' future in observability promises to be transformative. As IT environments become more complex and dynamic, AIOps will play an increasingly vital role in ensuring system reliability and performance and will continue to evolve, integrating with advanced technologies like cognitive automation, natural language understanding (NLU), large language models (LLMs), and generative AI. APPLICATION SCENARIOS OF COMBINING AIOPS AND OBSERVABILITY Impact Area Role of AIOps Synergy With Cognitive Automation LLM and Generative AI Integration Data collection and analysis Collects and analyzes a wide range of IT data, including performance metrics, logs, and incidents Process unstructured data, such as emails, documents, and images Predict potential issues based on historical data patterns and generate reports Incident management Automatically detects, prioritizes, and responds to IT incidents Extract relevant information from incident reports and suggest or implement appropriate actions Understand its context and generate appropriate responses Root cause analysis Identifies root causes of incidents Access historical documentation and knowledge bases to offer detailed explanations and solutions Provide recommendations by analyzing historical data for resolving issues NLU Uses NLU to process user queries and understand context Engage in natural language conversations with IT staff or end-users, improving user experiences Power chatbots and virtual IT assistants, offering user-friendly interaction and support to answer queries and provide guidance Table 5 Conclusion The fusion of AI/ML with AIOps has ushered in a new era of observability. IT operations are constantly evolving, and so is the capability to monitor, analyze, and optimize performance. In the age of AI/ML-driven observability, our IT operations won't merely survive, but will thrive, underpinned by data-driven insights, predictive analytics, and an unwavering commitment to excellence. References: OpenNMS repositories, GitHub OpenObserve repositories, GitHub OpsPAI/awesome-AIOps, GitHub Precompiled binaries and Docker images for Prometheus components Shinken documentation This is an article from DZone's 2023 Observability and Application Performance Trend Report.For more: Read the Report