
How to Ingest Data into Elasticsearch: Logstash, REST API & Estuary

Learn 3 proven methods to ingest data into Elasticsearch — Logstash, REST Bulk API, and Estuary for real-time CDC. Step-by-step guide with configs, code examples, and a side-by-side comparison to help you choose the right approach.


Introduction

Elasticsearch has become one of the most widely adopted search and analytics engines in the industry, powering everything from e-commerce product search to security log analysis, observability dashboards, and real-time AI applications. At the heart of every Elasticsearch deployment is a fundamental question: how do you get your data in?

The answer is not one-size-fits-all. Whether you are streaming CDC events from a production PostgreSQL database, shipping server logs from a Kubernetes cluster, or indexing documents directly from your application, the right ingestion method depends on your latency requirements, data volume, transformation complexity, and operational constraints.

This guide covers the three most widely used methods for ingesting data into Elasticsearch:

  • Method 1: Logstash - The battle-tested ETL pipeline from the Elastic Stack, ideal for log processing and complex multi-source transformations.
  • Method 2: Elasticsearch REST API and Bulk API - Direct, application-level indexing with no additional infrastructure.
  • Method 3: Estuary - The right-time data platform that unifies batch and streaming into a single managed pipeline, delivering sub-second CDC with no infrastructure management.

Each section includes a step-by-step setup guide, configuration examples, a pros-and-cons breakdown, and a comparison table to help you make the right choice for your use case.

Prerequisites (Applies to All Methods)

Before starting with any of the methods below, ensure you have:

  • A running Elasticsearch cluster — self-hosted, Elastic Cloud, or Elastic Cloud Enterprise (ECE).
  • Your cluster endpoint URL (e.g., https://CLUSTER_ID.REGION.cloud.es.io:9243).
  • Authentication credentials: generally an API key (id:api_key), though some integration methods accept username/password auth.
  • An Elasticsearch role with at minimum: cluster privilege monitor, and per-index privileges read, write, view_index_metadata, and create_index. You can use a wildcard * to grant these across all indices.
  • Basic familiarity with JSON documents and command-line tooling.
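For example, such a role could be created through the Elasticsearch security API. This is a sketch; the role name flow_ingest_role is just an example:

```
PUT /_security/role/flow_ingest_role
{
  "cluster": ["monitor"],
  "indices": [
    {
      "names": ["*"],
      "privileges": ["read", "write", "view_index_metadata", "create_index"]
    }
  ]
}
```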

💡 If your Elasticsearch cluster is on a private network, you will also need to configure IP allowlisting or SSH tunneling, depending on the method you choose. Each method section covers the specifics.

Method 1: Ingesting Data into Elasticsearch Using Logstash

Best for: Complex ETL pipelines, log and metric processing, multi-source aggregation, and scenarios requiring powerful transformation logic.

What is Logstash?

Logstash is a free and open-source server-side data processing pipeline that is part of the Elastic Stack. It can ingest data from virtually any source — files, databases, message queues, HTTP endpoints, and more — apply powerful filters and transformations, and route the output to one or more destinations, with Elasticsearch being the most common.

Logstash operates on a simple but powerful three-stage pipeline model:

  • Input: Defines where data comes from. Logstash has 200+ plugins including input plugins for file, jdbc, kafka, syslog, http, beats, and tcp sources.
  • Filter: Applies transformations such as parsing unstructured text with grok, adding geolocation data via geoip, renaming or removing fields with mutate, parsing JSON, converting dates, and more.
  • Output: Sends the processed events to a destination. The Elasticsearch output plugin handles authentication, index routing, and retry logic.

💡 A common and recommended pattern is to combine Beats + Logstash: use lightweight Beats agents to collect data close to the source, then pass it through Logstash for enrichment and transformation before indexing into Elasticsearch.

Step-by-Step: Ingesting Data into Elasticsearch with Logstash

Step 1: Install Logstash

Download Logstash from the official Elastic website and install it using your platform's package manager or by extracting the archive. The latest versions of Logstash require Java 17 or 21.

```shell
# macOS / Linux via archive
tar -xzf logstash-9.x.x-linux-x86_64.tar.gz
cd logstash-9.x.x

# Verify installation
bin/logstash --version
```

Step 2: Create a Pipeline Configuration File

Logstash pipelines are defined in .conf files. Create a file named elasticsearch-pipeline.conf with the following structure:

```conf
input {
  jdbc {
    jdbc_driver_library => "/path/to/mysql-connector.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "db_user"
    jdbc_password => "db_password"
    statement => "SELECT * FROM orders WHERE updated_at > :sql_last_value"
    use_column_value => true
    tracking_column => "updated_at"
    schedule => "*/5 * * * *"  # poll every 5 minutes
  }
}

filter {
  mutate {
    rename => { "order_total" => "total_amount" }
    convert => { "total_amount" => "float" }
    remove_field => ["@version"]
  }
  date {
    match => ["created_at", "ISO8601"]
    target => "@timestamp"
  }
}

output {
  elasticsearch {
    hosts => ["https://CLUSTER_ID.REGION.cloud.es.io:9243"]
    index => "orders-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "your-password"
    # OR use api_key:
    # api_key => "base64encodedIdAndKey=="
    ssl_certificate_verification => true
  }
}
```

Step 3: Run the Pipeline

```shell
bin/logstash -f elasticsearch-pipeline.conf
```

Logstash will start, connect to the database, execute the SQL statement, apply the filter transformations, and begin indexing documents into Elasticsearch. You will see index confirmation messages in the console output.

Step 4: Verify in Elasticsearch

```
GET /orders-*/_search
{
  "query": { "match_all": {} },
  "size": 5
}
```
| Pros | Cons |
| --- | --- |
| 200+ input, filter, and output plugins | Operationally complex — requires JVM, configuration tuning |
| Powerful transformation capabilities (grok, geoip, mutate) | Higher resource consumption than Beats alone |
| Native Elastic Stack integration | No built-in real-time CDC for relational databases |
| Dead letter queue for error handling | Self-managed infrastructure — no managed option |
| Supports persistent queues for durability | Polling-based JDBC connector introduces latency |

Method 2: Elasticsearch REST API and Bulk API

Best for: Application-level direct indexing, one-off data loads, development and debugging, and scenarios where you want to index documents from within your application code.

What is the Elasticsearch REST API?

Elasticsearch exposes a comprehensive RESTful HTTP API that allows you to index, update, delete, and search documents directly. For production workloads with high throughput, the Bulk API is the preferred approach — it allows you to send multiple operations in a single HTTP request, dramatically reducing overhead.

Elasticsearch also provides an Ingest Pipeline feature — a built-in preprocessing capability that lets you apply processors (similar to Logstash filters) directly within Elasticsearch nodes, without any external tooling. This is ideal for lightweight transformations on data coming in through the REST API.

Single Document Indexing

Use a POST request to index a single document into an Elasticsearch index:

```
POST /my-index/_doc
{
  "order_id": "ORD-1001",
  "customer": "Jane Smith",
  "total_amount": 149.99,
  "status": "shipped",
  "created_at": "2025-03-01T10:30:00Z"
}
```

To index with a specific document ID, which creates the document if it is new or replaces it if it already exists, use PUT:

```
PUT /my-index/_doc/ORD-1001
{
  "order_id": "ORD-1001",
  "status": "delivered"
}
```

Bulk Indexing for High-Volume Loads

The Bulk API allows you to send thousands of documents in a single request. Each operation requires two lines: an action/metadata line followed by the document body.

```
POST _bulk
{ "index": { "_index": "my-orders", "_id": "1" } }
{ "order_id": "ORD-1001", "customer": "Jane", "total": 149.99, "status": "shipped" }
{ "index": { "_index": "my-orders", "_id": "2" } }
{ "order_id": "ORD-1002", "customer": "Bob", "total": 89.50, "status": "pending" }
{ "update": { "_index": "my-orders", "_id": "1" } }
{ "doc": { "status": "delivered" } }
{ "delete": { "_index": "my-orders", "_id": "2" } }
```

💡 Best practice: keep bulk request sizes between 5–15 MB per batch. Monitor indexing performance and adjust request size accordingly. Use multiple threads or async clients to maximize throughput.
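As an illustration of that sizing guidance, a client could split documents into size-bounded batches before issuing each _bulk request. This is a hedged sketch, not part of any official client; the 10 MB default is an assumption within the recommended range:

```python
import json

def batch_by_size(docs, max_bytes=10 * 1024 * 1024):
    """Yield lists of documents whose serialized size stays under max_bytes."""
    batch, batch_bytes = [], 0
    for doc in docs:
        doc_bytes = len(json.dumps(doc).encode("utf-8")) + 1  # +1 for the NDJSON newline
        if batch and batch_bytes + doc_bytes > max_bytes:
            yield batch
            batch, batch_bytes = [], 0
        batch.append(doc)
        batch_bytes += doc_bytes
    if batch:
        yield batch

# Each yielded batch would then be sent as one _bulk request, e.g.:
# for batch in batch_by_size(all_docs):
#     send_bulk(batch)  # hypothetical helper wrapping POST _bulk
```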

Using Ingest Pipelines for Preprocessing

Define an ingest pipeline to apply transformations before indexing. This removes the need for Logstash for simple enrichment tasks:

```
PUT _ingest/pipeline/orders_pipeline
{
  "description": "Enrich and normalize order documents",
  "processors": [
    { "set": { "field": "ingested_at", "value": "{{_ingest.timestamp}}" } },
    { "uppercase": { "field": "status" } },
    { "remove": { "field": "internal_id", "ignore_missing": true } }
  ]
}
```

Then reference the pipeline when indexing:

```
POST /my-orders/_doc?pipeline=orders_pipeline
{
  "order_id": "ORD-1003",
  "status": "shipped",
  "internal_id": "X9921"
}
```

Using an Official Elasticsearch Client

For application-level ingestion, Elastic provides official clients for Python, Node.js, Java, Go, Ruby, and .NET. Using these clients is strongly recommended over raw HTTP calls — they handle connection pooling, retry logic, and serialization automatically.

```python
# Python example using the elasticsearch-py client
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(
    "https://CLUSTER_ID.REGION.cloud.es.io:9243",
    api_key="base64encodedIdAndKey==",
)

# Bulk index using helpers.bulk(); your_data_source() is a placeholder
# for whatever iterable yields your records
docs = [
    {"_index": "my-orders", "_id": doc["id"], **doc}
    for doc in your_data_source()
]
helpers.bulk(es, docs)
```
| Pros | Cons |
| --- | --- |
| No additional infrastructure required | No continuous CDC — application must push data explicitly |
| Lowest latency for application-level writes | Developers must implement retry, deduplication, and error logic |
| Official clients for all major languages | Bulk API batching adds engineering complexity at scale |
| Ingest pipelines handle lightweight transformations | No automatic schema discovery or index creation |
| Perfect for dev, debugging, and ad-hoc loads | Deletes/updates must be handled manually in application code |

Method 3: Ingesting Data into Elasticsearch in Real Time Using Estuary

Best for: Real-time CDC from databases and APIs, no-code pipeline setup, continuous sync with automatic index creation, and enterprise workloads requiring dependable right-time data delivery.

What is Estuary?

Estuary is the right-time data platform — a fully managed platform that replaces fragmented data stacks by consolidating CDC, streaming, batch, and pipeline management into a single system. Its core philosophy, the Right-Time Data Manifesto, is that data should move when your business needs it, not when your architecture allows it.

Unlike Logstash (which polls) or the REST API (which requires the application to push), Estuary captures changes from source systems in real time using Change Data Capture (CDC) and continuously materializes them into destinations like Elasticsearch — with sub-second latency, exactly-once semantics, and automatic schema handling.

Estuary's architecture is built around three core primitives:

  • Capture: A connector that ingests data from a source (PostgreSQL, MySQL, MongoDB, Salesforce, S3, Kafka, and 120+ others) into an Estuary Collection.
  • Collection: A real-time, schema-enforced data lake backed by cloud object storage (S3, GCS, or Azure Blob). Collections act as the durable source of truth for all downstream materializations.
  • Materialization: A connector that continuously writes Collection data to a destination. The Elasticsearch connector materializes data into Elasticsearch indices with automatic mapping, upsert/delete propagation, and configurable field types.

Step-by-Step: Ingesting Data into Elasticsearch with Estuary

Step 1: Create an Estuary Account

Register for a free account at dashboard.estuary.dev/register. No credit card required. The free tier is generous enough for most development and proof-of-concept workloads.

Step 2: Configure Network Access

You must allow Estuary's egress IPs to connect to your Elasticsearch cluster. Choose one of the following methods:

  • IP Allowlist (recommended for Elastic Cloud): Add Estuary's published IP addresses to your cluster's network firewall.
  • SSH Tunnel (for private or on-premises clusters): Set up an SSH server on your cloud platform and add a networkTunnel stanza to your connector configuration.

⚠️ A common cause of connection failures is an incomplete IP allowlist. If you see dial timeout errors, double-check that all Estuary IPs are allowlisted — not just a subset.

Step 3: Set Up a Capture (Source Connector)

A Capture pulls data from your source system into an Estuary Collection in real time.

  1. In the Estuary dashboard, click Sources in the left sidebar.
  2. Click + New Capture.
  3. Search for your source connector (e.g., PostgreSQL, MySQL, MongoDB, Salesforce, Amazon S3).
  4. Enter your connection credentials and click Next.
  5. Estuary auto-discovers your source schema and generates a Collection schema with inferred field types.
  6. Review the schema on the Collection tab to verify the inferred structure.
  7. Click Save and Publish.

💡 If you haven't set up a source yet, start from the Estuary quickstart guide to walk through creating your first Capture. You can pick back up here once the Capture is running.

Step 4: Create the Elasticsearch Materialization

  1. In the dashboard, click Destinations in the left sidebar.
  2. Click + New Materialization.
  3. Search for Elastic and select the Elasticsearch connector tile.
  4. Give your materialization a unique name (e.g., abc/mat-elasticsearch).

Step 5: Configure the Connector Endpoint


Fill in the following connection details:

Endpoint: https://CLUSTER_ID.REGION.cloud.es.io:9243

Authentication — choose one method:

  • Username + password authentication
  • API key authentication

💡 For API Key auth, the key must be Base64-encoded as id:api_key (colon-separated). A common auth error is providing the raw API key ID without encoding. Encode it with: echo -n 'your_id:your_api_key' | base64
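The same encoding can be produced in Python if you prefer; the id and key values below are placeholders:

```python
import base64

api_key_id = "your_id"           # placeholder: your API key ID
api_key_secret = "your_api_key"  # placeholder: your API key secret

# Estuary expects the Base64 encoding of "id:api_key" (colon-separated)
encoded = base64.b64encode(f"{api_key_id}:{api_key_secret}".encode()).decode()
print(encoded)
```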

Step 6: Link the Capture and Select Collections

  1. Click Source from Capture and select the Capture you created in Step 3.
  2. Estuary lists all available Collections from that Capture.
  3. Select the Collections to materialize into Elasticsearch.

Each Collection becomes a binding — a mapping from a Collection to an Elasticsearch index. For each binding, configure:

  • index — The Elasticsearch index name. Defaults to the last segment of the Collection name. Customize as needed.
  • delta_updates — false (default) for standard upsert mode; true for append-only / event stream mode.
  • number_of_shards — Number of shards for the index. Default is 1.

💡 Estuary automatically creates the Elasticsearch index with appropriate field mappings derived from your Collection schema. You do not need to pre-create the index manually.

Step 7: Advanced Field Configuration (Optional)

For fine-grained control over index mappings and routing, use the Advanced Specification Editor or flowctl to update the field configuration on your binding. Options include:

  • routing: Route documents to specific shards using a key field. Improves read performance for queries that filter on that field.
  • mapping: Set the type of a field for the index mapping. Optionally specify a format as well. Common use cases include:
    • "type": "keyword": Force string fields to keyword type for exact-match filtering and aggregations.
    • "type": "date" with "format": "epoch_seconds": Map Unix timestamps correctly for time-series queries and Kibana visualizations.

For example, your specification might look like this in the connector’s Advanced Specification Editor:

```json
{
  "bindings": [
    {
      "resource": { "index": "my-orders-index" },
      "source": "PREFIX/orders_collection",
      "fields": {
        "require": {
          "id": { "routing": true },
          "status": { "mapping": { "type": "keyword" } },
          "created_at": { "mapping": { "type": "date", "format": "epoch_seconds" } }
        },
        "recommended": true
      }
    }
  ]
}
```

Step 8: Full YAML Specification with flowctl (Optional)

If you prefer to manage your pipeline as code using the flowctl CLI, create a materialization specification following this format:

```yaml
materializations:
  PREFIX/mat_elasticsearch:
    endpoint:
      connector:
        image: ghcr.io/estuary/materialize-elasticsearch:v3
        config:
          endpoint: https://ec47fc4d.us-east-1.aws.found.io:9243
          credentials:
            username: flow_user
            password: secret
          advanced:
            number_of_replicas: 1
    bindings:
      # Standard upsert mode
      - resource:
          index: my-orders-index
          delta_updates: false
          number_of_shards: 3
        source: PREFIX/orders_collection
      # Delta / append-only mode for event streams
      - resource:
          index: my-events-index
          delta_updates: true
        source: PREFIX/events_collection
```

Step 9: Save, Publish, and Monitor

  1. Click Next, then Save and Publish in the top-right corner.
  2. Estuary validates the configuration and deploys the pipeline.
  3. Monitor pipeline health and data throughput from the Estuary dashboard.

Step 10: Verify Data in Elasticsearch

```
GET /my-orders-index/_search
{
  "query": { "match_all": {} },
  "size": 5
}
```

```
# Check generated field mappings
GET /my-orders-index/_mapping
```


Understanding Update Modes

Estuary supports two update strategies per binding. This is one of its most powerful and frequently misunderstood features.

  • Standard Updates (delta_updates: false — default) behave like an upsert: inserts create new documents, updates modify existing documents by key, and deletes from CDC-enabled sources (PostgreSQL, MySQL) are propagated to Elasticsearch. Use this mode for entity records, dimension tables, or any data where you need the current state.
  • Delta Updates (delta_updates: true) are append-only: every change event is inserted as a new Elasticsearch document. Ideal for event streams, audit logs, or analytics workloads where you want complete history. Deletes from the source are not propagated to Elasticsearch in this mode.
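To make the distinction concrete, here is a toy sketch of how the same change stream lands in the index under each mode (illustrative Python only, not Estuary code):

```python
# A toy CDC change stream: (operation, key, document)
events = [
    ("insert", "ORD-1", {"status": "pending"}),
    ("update", "ORD-1", {"status": "shipped"}),
    ("delete", "ORD-1", None),
]

# Standard updates (delta_updates: false): the index holds current state per key.
state = {}
for op, key, doc in events:
    if op == "delete":
        state.pop(key, None)   # deletes from the source are propagated
    else:
        state[key] = doc       # inserts and updates upsert by key

# Delta updates (delta_updates: true): every non-delete event is appended
# as a new document, preserving full history.
history = []
for op, key, doc in events:
    if op != "delete":         # deletes are not propagated in delta mode
        history.append({"key": key, "op": op, **doc})

print(state)    # standard mode: {} because the document was deleted
print(history)  # delta mode: both the insert and the update survive
```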

⚠️ If you switch an existing materialization between standard and delta update modes on a live index, backfilling your data is strongly recommended. Changing modes can cause duplicate key errors or unexpected document behavior. Always test mode changes in a non-production index first.

| Pros | Cons |
| --- | --- |
| Sub-second real-time CDC with exactly-once semantics | Custom Elasticsearch analyzers not configurable via connector (pre-create index as workaround) |
| Automatic index creation with schema-derived field mappings | geo_point field type not natively supported (use lat/lng floats) |
| No infrastructure to manage — fully managed SaaS | Managed SaaS pricing model (vs. self-hosted open source) |
| 120+ source connectors (DBs, SaaS APIs, files, streams) | 20-day data retention on managed cloud storage (configure own bucket for production) |
| Handles inserts, updates, AND deletes automatically | |
| Supports BYOC and private deployment for enterprise security | |

Method Comparison at a Glance

| Feature | Logstash | REST / Bulk API | Estuary |
| --- | --- | --- | --- |
| Setup complexity | High | Low–Medium | Low (no-code UI) |
| Real-time CDC | Limited | No | Yes (sub-second) |
| Transform support | Extensive (200+ plugins) | Via ingest pipelines | Via derivations |
| Managed infra | Self-managed | Self-managed | Fully managed |
| Handles deletes (CDC) | Plugin-dependent | Manual | Yes (standard mode) |
| Best latency | Seconds | On-demand | Sub-second |
| Best for | Log pipelines, ETL | App-level indexing, dev/debug | DB/API real-time sync |
| Cost model | Open source + infra | Open source + infra | Managed SaaS pricing |

Troubleshooting Common Issues

Connection and Network Errors

| Symptom | Likely Cause | Fix |
| --- | --- | --- |
| dial timeout or connection refused | Estuary IPs not allowlisted | Re-check that all Estuary IP addresses (docs.estuary.dev/reference/allow-ip-addresses/) are in your cluster firewall rules. |
| Logstash Connection refused to Elasticsearch | Wrong host or port in output config | Verify the full URL including port (9200 for local, 9243 for Elastic Cloud). Ensure SSL settings match. |
| Auth error on startup | Wrong credential format for Estuary API key | For API key auth, encode as Base64 of id:api_key. Run: echo -n 'id:key' \| base64 |

Schema and Type Mapping Errors

| Symptom | Likely Cause | Fix |
| --- | --- | --- |
| Field mapped as wrong type | Elasticsearch dynamic mapping mismatch | For Estuary: use fields.require.<field>.mapping to explicitly set the type. For REST API: define explicit mappings with PUT /index before indexing. |
| String field not aggregatable in Kibana | Field mapped as text instead of keyword | Estuary: set mapping type to keyword. REST API: use the .keyword sub-field or an explicit keyword mapping. |
| Timestamps not recognized as dates | Unix epoch not mapped as date type | Estuary: set mapping type: date with format: epoch_seconds. Ingest pipeline: use the date processor. |
| geo_point not supported (Estuary) | Connector limitation | Use separate latitude and longitude float fields as a workaround. |

Index and Write Errors

| Symptom | Likely Cause | Fix |
| --- | --- | --- |
| Index not created automatically | Insufficient role privileges | Verify the create_index privilege is granted. Use a wildcard * to cover all indices. |
| Logstash bulk indexing errors | Documents too large or malformed | Check the Logstash dead letter queue. Reduce max_batch_size in the Elasticsearch output plugin. |
| Bulk API returns partial success (errors: true) | Some documents failed validation | Inspect the items array in the bulk response. Each failed item includes a status code and error message. |
| Custom analyzer not applied (Estuary) | Analyzer config not supported via connector | Pre-create the index in Elasticsearch with your desired settings.analysis config before connecting Estuary. This requires an advanced feature flag setup. |
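On the Bulk API partial-success case: when a _bulk response comes back with errors: true, the failed operations can be pulled out of its items array. A minimal Python sketch, using a synthetic response for illustration:

```python
def failed_bulk_items(bulk_response):
    """Return (op_type, doc_id, status, error) for each failed item in a _bulk response."""
    failures = []
    for item in bulk_response.get("items", []):
        # Each item is keyed by its operation type: index, create, update, or delete.
        op_type, result = next(iter(item.items()))
        if "error" in result:
            failures.append((op_type, result.get("_id"), result["status"], result["error"]))
    return failures

# Synthetic example of a partial-success _bulk response:
response = {
    "errors": True,
    "items": [
        {"index": {"_id": "1", "status": 201}},
        {"index": {"_id": "2", "status": 400,
                   "error": {"type": "mapper_parsing_exception",
                             "reason": "failed to parse field [total]"}}},
    ],
}

for op, doc_id, status, error in failed_bulk_items(response):
    print(op, doc_id, status, error["type"])
```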

Which Method Should You Choose?

Use the table below to match your use case to the right ingestion method. In practice, production environments can combine two or more methods — for example, using Estuary to ingest continuous database CDC alongside Logstash for low-level application or system logs.

| Use Case | Recommended Method | Why |
| --- | --- | --- |
| Real-time CDC from a database (Postgres, MySQL, MongoDB) | Estuary | Sub-second latency, handles inserts/updates/deletes automatically |
| Log or metric data from servers / Kubernetes / cloud | Logstash + Beats | Beats modules natively understand these formats, 70+ pre-built integrations |
| Indexing documents from your application at write time | REST / Bulk API | Direct, low-overhead, no extra infrastructure required |
| Large one-off historical data migration | Bulk API | Efficient batch indexing with full control over request sizing |
| Multi-source ETL pipeline with complex transformations | Logstash | 200+ input/filter/output plugins, rich transformation capabilities |
| Syncing SaaS APIs (Salesforce, HubSpot, Stripe, etc.) to Elasticsearch | Estuary | Pre-built connectors, no-code setup, continuous sync |
| Prototyping or dev/debug | REST API (Kibana Dev Tools) | Fastest feedback loop; no infrastructure needed |
| Enterprise with strict security / private deployment needs | Estuary | Supports BYOC and private deployment with fine-grained control |

Conclusion

There is no single best way to ingest data into Elasticsearch — the right method depends entirely on your data sources, latency requirements, and operational preferences.

Logstash remains the right choice when you need flexible, plugin-rich ETL for logs and metrics, particularly within a full Elastic Stack deployment.

The REST and Bulk API is the simplest path for application developers who need to index documents directly from their code, or for one-off data loads and prototyping.

Estuary stands out when you need a dependable, right-time data platform — one that delivers sub-second CDC from your operational databases and SaaS tools into Elasticsearch with zero infrastructure overhead and exactly-once guarantees. As Estuary's philosophy states: latency should be a dial you control, not a limitation you accept.

The best-performing Elasticsearch deployments tend to combine methods thoughtfully: real-time CDC via Estuary for operational data, Beats + Logstash for observability and log pipelines, and the Bulk API for historical migrations and application writes. Understanding each method deeply — as this guide has aimed to do — is the foundation for making that choice well.

Start Ingesting Data into Elasticsearch with Estuary Today

Stop settling for stale data. Estuary is the right-time data platform that delivers sub-second CDC from your databases and APIs into Elasticsearch — fully managed, exactly-once, and built for the way your business actually moves.


About the author

Dani Pálma, Head of Data & Marketing

Dani is a data professional with a rich background in data engineering and real-time data platforms. At Estuary, Daniel focuses on promoting cutting-edge streaming solutions, helping to bridge the gap between technical innovation and developer adoption. With deep expertise in cloud-native and streaming technologies, Dani has successfully supported startups and enterprises in building robust data solutions.
