
How to Ingest Data into Elasticsearch: Logstash, REST API & Estuary
Learn 3 proven methods to ingest data into Elasticsearch — Logstash, REST Bulk API, and Estuary for real-time CDC. Step-by-step guide with configs, code examples, and a side-by-side comparison to help you choose the right approach.

Introduction
Elasticsearch has become one of the most widely adopted search and analytics engines in the industry, powering everything from e-commerce product search to security log analysis, observability dashboards, and real-time AI applications. At the heart of every Elasticsearch deployment is a fundamental question: how do you get your data in?
The answer is not one-size-fits-all. Whether you are streaming CDC events from a production PostgreSQL database, shipping server logs from a Kubernetes cluster, or indexing documents directly from your application, the right ingestion method depends on your latency requirements, data volume, transformation complexity, and operational constraints.
This guide covers the three most widely used methods for ingesting data into Elasticsearch:
- Method 1: Logstash - The battle-tested ETL pipeline from the Elastic Stack, ideal for log processing and complex multi-source transformations.
- Method 2: Elasticsearch REST API and Bulk API - Direct, application-level indexing with no additional infrastructure.
- Method 3: Estuary - a right-time data platform that unifies batch and streaming into a single managed pipeline, delivering sub-second CDC with no infrastructure management.
Each method section includes setup steps, configuration examples, and a pros-and-cons breakdown. A side-by-side comparison table and a use-case guide at the end help you choose the right approach.
Prerequisites (Applies to All Methods)
Before starting with any of the methods below, ensure you have:
- A running Elasticsearch cluster — self-hosted, Elastic Cloud, or Elastic Cloud Enterprise (ECE).
- Your cluster endpoint URL (e.g., https://CLUSTER_ID.REGION.cloud.es.io:9243).
- Authentication credentials: generally an API key (id:api_key), though some integration methods accept username/password auth.
- An Elasticsearch role with at minimum: cluster privilege monitor, and per-index privileges read, write, view_index_metadata, and create_index. You can use a wildcard * to grant these across all indices.
- Basic familiarity with JSON documents and command-line tooling.
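On clusters with security enabled, you can create that role with the Elasticsearch security API. The sketch below only builds the JSON body for PUT /_security/role/<role_name> from the privileges listed above. The role name ingest_writer is a hypothetical placeholder; you would paste the printed request into Kibana Dev Tools or send it with any HTTP client.

```python
import json

# Privileges from the prerequisites list above.
CLUSTER_PRIVILEGES = ["monitor"]
INDEX_PRIVILEGES = ["read", "write", "view_index_metadata", "create_index"]


def build_role_body(index_patterns=("*",)):
    """Return the JSON body for PUT /_security/role/<role_name>."""
    return {
        "cluster": list(CLUSTER_PRIVILEGES),
        "indices": [
            {"names": list(index_patterns), "privileges": list(INDEX_PRIVILEGES)}
        ],
    }


if __name__ == "__main__":
    # "ingest_writer" is a hypothetical role name; "*" covers all indices.
    print("PUT /_security/role/ingest_writer")
    print(json.dumps(build_role_body(), indent=2))
```

Passing a narrower pattern such as ["orders-*"] limits the role to matching indices instead of granting the privileges everywhere.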
💡 If your Elasticsearch cluster is on a private network, you will also need to configure IP allowlisting or SSH tunneling, depending on the method you choose. Each method section covers the specifics.
Method 1: Ingesting Data into Elasticsearch Using Logstash
Best for: Complex ETL pipelines, log and metric processing, multi-source aggregation, and scenarios requiring powerful transformation logic.
What is Logstash?
Logstash is a free and open-source server-side data processing pipeline that is part of the Elastic Stack. It can ingest data from virtually any source — files, databases, message queues, HTTP endpoints, and more — apply powerful filters and transformations, and route the output to one or more destinations, with Elasticsearch being the most common.
Logstash operates on a simple but powerful three-stage pipeline model:
- Input: Defines where data comes from. Logstash has 200+ plugins including input plugins for file, jdbc, kafka, syslog, http, beats, and tcp sources.
- Filter: Applies transformations: parsing unstructured text with grok, adding geolocation data via geoip, renaming or removing fields with mutate, parsing JSON, converting dates, and more.
- Output: Sends the processed events to a destination. The Elasticsearch output plugin handles authentication, index routing, and retry logic.
💡 A common and recommended pattern is to combine Beats + Logstash: use lightweight Beats agents to collect data close to the source, then pass it through Logstash for enrichment and transformation before indexing into Elasticsearch.
Step-by-Step: Ingesting Data into Elasticsearch with Logstash
Step 1: Install Logstash
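Download Logstash from the official Elastic website and install it with your platform's package manager or by extracting the archive. The archives include a bundled JDK, so a separate Java install is optional; if you run Logstash on your own JDK (via LS_JAVA_HOME), the latest versions require Java 17 or 21.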
```shell
# Linux archive shown; macOS archives use a darwin-* file name
tar -xzf logstash-9.x.x-linux-x86_64.tar.gz
cd logstash-9.x.x

# Verify installation
bin/logstash --version
```
Step 2: Create a Pipeline Configuration File
Logstash pipelines are defined in .conf files. Create a file named elasticsearch-pipeline.conf with the following structure:
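```plaintext
input {
  jdbc {
    jdbc_driver_library => "/path/to/mysql-connector.jar"
    # Connector/J 8+ class name (com.mysql.jdbc.Driver is the deprecated legacy name)
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "db_user"
    jdbc_password => "db_password"
    statement => "SELECT * FROM orders WHERE updated_at > :sql_last_value"
    use_column_value => true
    tracking_column => "updated_at"
    # Required for a timestamp column; the default tracking type is numeric
    tracking_column_type => "timestamp"
    schedule => "*/5 * * * *" # poll every 5 minutes
  }
}

filter {
  mutate {
    rename => { "order_total" => "total_amount" }
    convert => { "total_amount" => "float" }
    remove_field => ["@version"]
  }
  date {
    match => ["created_at", "ISO8601"]
    target => "@timestamp"
  }
}

output {
  elasticsearch {
    hosts => ["https://CLUSTER_ID.REGION.cloud.es.io:9243"]
    index => "orders-%{+YYYY.MM.dd}"
    # Assumes the orders table has an "id" primary key column: reusing it as the
    # document ID makes re-polled (updated) rows overwrite instead of duplicating
    document_id => "%{id}"
    user => "elastic"
    password => "your-password"
    # OR authenticate with an API key in "id:api_key" form (not Base64-encoded):
    # api_key => "your_id:your_api_key"
    # Replaces the deprecated ssl_certificate_verification => true
    ssl_verification_mode => "full"
  }
}
```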
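The input block above relies on the jdbc plugin's incremental mode: after each scheduled run, Logstash stores the last updated_at value it saw and substitutes it for :sql_last_value on the next run. The following Python sketch imitates that loop against an in-memory SQLite table so you can see which rows each poll picks up. The table layout, the EPOCH starting value, and the poll_new_rows and demo helpers are hypothetical stand-ins for your MySQL orders table; this is not how the plugin is implemented.

```python
import sqlite3

# Starting value earlier than any row, similar in spirit to the plugin's initial sql_last_value.
EPOCH = "1970-01-01T00:00:00Z"


def poll_new_rows(conn, last_value):
    """One scheduled run: fetch rows changed after last_value and return (rows, new_last_value)."""
    rows = conn.execute(
        "SELECT id, order_total, updated_at FROM orders "
        "WHERE updated_at > ? ORDER BY updated_at",
        (last_value,),
    ).fetchall()
    # Remember the highest updated_at seen, like tracking_column => "updated_at".
    new_last_value = rows[-1][2] if rows else last_value
    return rows, new_last_value


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, order_total TEXT, updated_at TEXT)")
    conn.executemany(
        "INSERT INTO orders VALUES (?, ?, ?)",
        [(1, "149.99", "2025-03-01T10:30:00Z"), (2, "89.50", "2025-03-01T10:31:00Z")],
    )
    first, last = poll_new_rows(conn, EPOCH)  # picks up both rows
    second, last = poll_new_rows(conn, last)  # nothing changed since the last poll
    conn.execute("UPDATE orders SET updated_at = '2025-03-01T10:40:00Z' WHERE id = 1")
    third, last = poll_new_rows(conn, last)  # only the updated row
    return first, second, third, last


if __name__ == "__main__":
    first, second, third, last = demo()
    for label, rows in (("first", first), ("second", second), ("third", third)):
        print(label, rows)
```

One consequence of the strict > comparison: a row committed later with exactly the same updated_at as the stored value is skipped, so a tracking column with fine-grained, steadily increasing values works best.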
Step 3: Run the Pipeline
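```shell
bin/logstash -f elasticsearch-pipeline.conf
```

Logstash will start, run the SQL statement on the configured schedule, apply the filter transformations, and index the resulting events into Elasticsearch. The console log shows the pipeline starting and each scheduled query; to print every processed event as well while testing, add a stdout { codec => rubydebug } output next to the elasticsearch output.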
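To see what a single row looks like by the time it reaches the output, here is a rough Python approximation of the mutate and date filters plus the orders-%{+YYYY.MM.dd} index pattern. It is an illustration, not Logstash's implementation, and the sample values are hypothetical. Logstash renders the date in the index name from @timestamp in UTC, so an order created late in the evening at a negative UTC offset lands in the next day's index.

```python
from datetime import datetime, timezone


def apply_filters(row):
    """Approximate the filter block: mutate (rename, convert, remove_field), then date."""
    event = dict(row)
    # mutate: rename runs before convert inside a single mutate block
    event["total_amount"] = float(event.pop("order_total"))
    event.pop("@version", None)
    # date: parse the ISO8601 created_at string into @timestamp, normalized to UTC
    created = datetime.fromisoformat(event["created_at"].replace("Z", "+00:00"))
    event["@timestamp"] = created.astimezone(timezone.utc)
    return event


def target_index(event):
    """Render index => "orders-%{+YYYY.MM.dd}" from the event's UTC @timestamp."""
    return "orders-" + event["@timestamp"].strftime("%Y.%m.%d")


if __name__ == "__main__":
    row = {"order_total": "149.99", "created_at": "2025-03-01T23:30:00-02:00", "@version": "1"}
    event = apply_filters(row)
    print(target_index(event), event["total_amount"])
```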
Step 4: Verify in Elasticsearch
```plaintext
GET /orders-*/_search
{
  "query": { "match_all": {} },
  "size": 5
}
```
| Pros | Cons |
|---|---|
| 200+ input, filter, and output plugins | Operationally complex — requires JVM, configuration tuning |
| Powerful transformation capabilities (grok, geoip, mutate) | Higher resource consumption than Beats alone |
| Native Elastic Stack integration | No built-in real-time CDC for relational databases |
| Dead letter queue for error handling | Self-managed infrastructure — no managed option |
| Supports persistent queues for durability | Polling-based JDBC connector introduces latency |
Method 2: Elasticsearch REST API and Bulk API
Best for: Application-level direct indexing, one-off data loads, development and debugging, and scenarios where you want to index documents from within your application code.
What is the Elasticsearch REST API?
Elasticsearch exposes a comprehensive RESTful HTTP API that allows you to index, update, delete, and search documents directly. For production workloads with high throughput, the Bulk API is the preferred approach — it allows you to send multiple operations in a single HTTP request, dramatically reducing overhead.
Elasticsearch also provides an Ingest Pipeline feature — a built-in preprocessing capability that lets you apply processors (similar to Logstash filters) directly within Elasticsearch nodes, without any external tooling. This is ideal for lightweight transformations on data coming in through the REST API.
Single Document Indexing
Use a POST request to index a single document into an Elasticsearch index:
```plaintext
POST /my-index/_doc
{
  "order_id": "ORD-1001",
  "customer": "Jane Smith",
  "total_amount": 149.99,
  "status": "shipped",
  "created_at": "2025-03-01T10:30:00Z"
}
```
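To index with a specific document ID, use PUT. This creates the document or, if one with that ID already exists, replaces it in full rather than merging, so fields you omit are dropped. Send the complete document (or use POST /my-index/_update/ORD-1001 with a "doc" object for a partial update):
```plaintext
PUT /my-index/_doc/ORD-1001
{
  "order_id": "ORD-1001",
  "customer": "Jane Smith",
  "total_amount": 149.99,
  "status": "delivered",
  "created_at": "2025-03-01T10:30:00Z"
}
```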
Bulk Indexing for High-Volume Loads
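The Bulk API lets you send thousands of operations in a single request. The body is newline-delimited JSON (NDJSON): index, create, and update operations take two lines (an action/metadata line followed by the document or partial-update body), while delete takes only the action line. The body must end with a newline.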
```plaintext
POST _bulk
{ "index": { "_index": "my-orders", "_id": "1" } }
{ "order_id": "ORD-1001", "customer": "Jane", "total": 149.99, "status": "shipped" }
{ "index": { "_index": "my-orders", "_id": "2" } }
{ "order_id": "ORD-1002", "customer": "Bob", "total": 89.50, "status": "pending" }
{ "update": { "_index": "my-orders", "_id": "1" } }
{ "doc": { "status": "delivered" } }
{ "delete": { "_index": "my-orders", "_id": "2" } }
```
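If you build bulk bodies yourself instead of using a client helper, the main pitfalls are the one-line delete action and the mandatory trailing newline. This minimal standard-library Python sketch (the to_bulk_ndjson name is hypothetical) serializes the same operations as the request above.

```python
import json


def to_bulk_ndjson(operations):
    """Serialize (action, metadata, body) tuples into a _bulk request body.

    index, create, and update are followed by a body line; delete is not.
    """
    lines = []
    for action, metadata, body in operations:
        lines.append(json.dumps({action: metadata}))
        if action != "delete":
            lines.append(json.dumps(body))
    # The Bulk API rejects bodies that do not end with a newline.
    return "\n".join(lines) + "\n"


operations = [
    ("index", {"_index": "my-orders", "_id": "1"},
     {"order_id": "ORD-1001", "customer": "Jane", "total": 149.99, "status": "shipped"}),
    ("index", {"_index": "my-orders", "_id": "2"},
     {"order_id": "ORD-1002", "customer": "Bob", "total": 89.50, "status": "pending"}),
    ("update", {"_index": "my-orders", "_id": "1"}, {"doc": {"status": "delivered"}}),
    ("delete", {"_index": "my-orders", "_id": "2"}, None),
]

if __name__ == "__main__":
    print(to_bulk_ndjson(operations), end="")
```

Send the resulting string to POST _bulk with the header Content-Type: application/x-ndjson.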
💡 Best practice: keep bulk request sizes between 5–15 MB per batch. Monitor indexing performance and adjust request size accordingly. Use multiple threads or async clients to maximize throughput.
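One way to apply that guideline is to cap each request by serialized size rather than by document count. The sketch below is a hypothetical helper (not part of any Elasticsearch client) that groups index operations into NDJSON bodies under a byte limit; the 10 MB default is an assumption taken from the middle of the 5-15 MB range.

```python
import json

DEFAULT_MAX_BYTES = 10 * 1024 * 1024  # assumed midpoint of the 5-15 MB guideline


def chunk_by_bytes(docs, index, max_bytes=DEFAULT_MAX_BYTES):
    """Yield _bulk NDJSON bodies from (doc_id, doc) pairs, each under max_bytes.

    A single operation larger than max_bytes is still emitted, alone in its own batch.
    """
    batch, batch_bytes = [], 0
    for doc_id, doc in docs:
        op = json.dumps({"index": {"_index": index, "_id": doc_id}}) + "\n" + json.dumps(doc) + "\n"
        op_bytes = len(op.encode("utf-8"))
        if batch and batch_bytes + op_bytes > max_bytes:
            yield "".join(batch)
            batch, batch_bytes = [], 0
        batch.append(op)
        batch_bytes += op_bytes
    if batch:
        yield "".join(batch)


if __name__ == "__main__":
    docs = ((str(i), {"order_id": f"ORD-{i}"}) for i in range(1000))
    for n, body in enumerate(chunk_by_bytes(docs, "my-orders", max_bytes=8 * 1024)):
        print(n, len(body.encode("utf-8")))
```

Each yielded body can be posted to _bulk from a pool of worker threads. If you use elasticsearch-py, its bulk helpers already split requests through their chunk_size and max_chunk_bytes parameters, so this helper is only needed for raw HTTP clients.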
Using Ingest Pipelines for Preprocessing
Define an ingest pipeline to apply transformations before indexing. This removes the need for Logstash for simple enrichment tasks:
```plaintext
PUT _ingest/pipeline/orders_pipeline
{
  "description": "Enrich and normalize order documents",
  "processors": [
    { "set": { "field": "ingested_at", "value": "{{_ingest.timestamp}}" } },
    { "uppercase": { "field": "status" } },
    { "remove": { "field": "internal_id", "ignore_missing": true } }
  ]
}
```
Then reference the pipeline when indexing:
```plaintext
POST /my-orders/_doc?pipeline=orders_pipeline
{
  "order_id": "ORD-1003",
  "status": "shipped",
  "internal_id": "X9921"
}
```
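To preview what orders_pipeline does to a document on a real cluster, Elasticsearch offers POST _ingest/pipeline/orders_pipeline/_simulate with a docs array of _source objects. For offline reasoning, the Python function below approximates the three processors locally. It is an illustration with a hypothetical name, and its ingested_at string format may differ from what {{_ingest.timestamp}} produces on a node.

```python
from datetime import datetime, timezone


def emulate_orders_pipeline(doc):
    """Local approximation of orders_pipeline; the real processors run inside Elasticsearch."""
    out = dict(doc)  # leave the caller's document untouched
    # set: ingested_at = {{_ingest.timestamp}} (the time the document was ingested)
    out["ingested_at"] = datetime.now(timezone.utc).isoformat()
    # uppercase: status (without ignore_missing, the real processor errors if status is absent)
    out["status"] = out["status"].upper()
    # remove: internal_id, with ignore_missing: true
    out.pop("internal_id", None)
    return out


if __name__ == "__main__":
    print(emulate_orders_pipeline({"order_id": "ORD-1003", "status": "shipped", "internal_id": "X9921"}))
```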
Using an Official Elasticsearch Client
For application-level ingestion, Elastic provides official clients for Python, Node.js, Java, Go, Ruby, and .NET. Using these clients is strongly recommended over raw HTTP calls — they handle connection pooling, retry logic, and serialization automatically.
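```python
# Python example using the elasticsearch-py client
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(
    "https://CLUSTER_ID.REGION.cloud.es.io:9243",
    api_key="base64encodedIdAndKey==",  # Base64 of "id:api_key"
)

# Bulk index using helpers.bulk(). your_data_source() is a placeholder for your
# own iterable of dicts that each carry an "id" field; a generator keeps memory flat.
actions = (
    {"_index": "my-orders", "_id": doc["id"], **doc}
    for doc in your_data_source()
)
# Raises helpers.BulkIndexError if any document fails (raise_on_error=True by default)
success_count, _ = helpers.bulk(es, actions)
print(f"Indexed {success_count} documents")
```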
| Pros | Cons |
|---|---|
| No additional infrastructure required | No continuous CDC — application must push data explicitly |
| Lowest latency for application-level writes | Developers must implement retry, deduplication, and error logic |
| Official clients for all major languages | Bulk API batching adds engineering complexity at scale |
| Ingest pipelines handle lightweight transformations | Relies on dynamic mapping unless you define index mappings up front |
| Perfect for dev, debugging, and ad-hoc loads | Deletes/updates must be handled manually in application code |
Method 3: Ingesting Data into Elasticsearch in Real Time Using Estuary
Best for: Real-time CDC from databases and APIs, no-code pipeline setup, continuous sync with automatic index creation, and enterprise workloads requiring dependable right-time data delivery.
What is Estuary?
Estuary is the right-time data platform — a fully managed platform that replaces fragmented data stacks by consolidating CDC, streaming, batch, and pipeline management into a single system. Its core philosophy, the Right-Time Data Manifesto, is that data should move when your business needs it, not when your architecture allows it.
Unlike Logstash (which polls) or the REST API (which requires the application to push), Estuary captures changes from source systems in real time using Change Data Capture (CDC) and continuously materializes them into destinations like Elasticsearch — with sub-second latency, exactly-once semantics, and automatic schema handling.
Estuary's architecture is built around three core primitives:
- Capture: A connector that ingests data from a source (PostgreSQL, MySQL, MongoDB, Salesforce, S3, Kafka, and 120+ others) into an Estuary Collection.
- Collection: A real-time, schema-enforced data lake backed by cloud object storage (S3, GCS, or Azure Blob). Collections act as the durable source of truth for all downstream materializations.
- Materialization: A connector that continuously writes Collection data to a destination. The Elasticsearch connector materializes data into Elasticsearch indices with automatic mapping, upsert/delete propagation, and configurable field types.
Step-by-Step: Ingesting Data into Elasticsearch with Estuary
Step 1: Create an Estuary Account
Register for a free account at dashboard.estuary.dev/register. No credit card required. The free tier is generous enough for most development and proof-of-concept workloads.
Step 2: Configure Network Access
You must allow Estuary's egress IPs to connect to your Elasticsearch cluster. Choose one of the following methods:
- IP Allowlist (recommended for Elastic Cloud): Add Estuary's published IP addresses to your cluster's network firewall.
- SSH Tunnel (for private or on-premises clusters): Set up an SSH server on your cloud platform and add a networkTunnel stanza to your connector configuration.
⚠️ A common cause of connection failures is an incomplete IP allowlist. If you see dial timeout errors, double-check that all Estuary IPs are allowlisted — not just a subset.
Step 3: Set Up a Capture (Source Connector)
A Capture pulls data from your source system into an Estuary Collection in real time.
- In the Estuary dashboard, click Sources in the left sidebar.
- Click + New Capture.
- Search for your source connector (e.g., PostgreSQL, MySQL, MongoDB, Salesforce, Amazon S3).
- Enter your connection credentials and click Next.
- Estuary auto-discovers your source schema and generates a Collection schema with inferred field types.
- Review the schema on the Collection tab to verify the inferred structure.
- Click Save and Publish.
💡 If you haven't set up a source yet, start from the Estuary quickstart guide to walk through creating your first Capture. You can pick back up here once the Capture is running.
Step 4: Create the Elasticsearch Materialization
- In the dashboard, click Destinations in the left sidebar.
- Click + New Materialization.
- Search for Elastic and select the Elasticsearch connector tile.
- Give your materialization a unique name (e.g., abc/mat-elasticsearch).
Step 5: Configure the Connector Endpoint
Fill in the following connection details:
Endpoint: https://CLUSTER_ID.REGION.cloud.es.io:9243
Authentication — choose one method:
- Username + password authentication
- API key authentication
💡 For API Key auth, the key must be Base64-encoded as id:api_key (colon-separated). A common auth error is providing the raw API key ID without encoding. Encode it with: echo -n 'your_id:your_api_key' | base64
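If you prefer to script the encoding rather than use the shell, this small Python function does the same thing as the echo | base64 one-liner; the key ID and secret in the example are made-up placeholders. The same encoded value is what Elasticsearch expects after ApiKey in an Authorization header when you call the REST API directly.

```python
import base64


def encode_api_key(key_id: str, api_key: str) -> str:
    """Base64-encode "id:api_key", matching: echo -n 'id:api_key' | base64"""
    raw = f"{key_id}:{api_key}".encode("utf-8")
    return base64.b64encode(raw).decode("ascii")


def authorization_header(encoded_key: str) -> dict:
    """Header for direct REST calls that authenticate with an API key."""
    return {"Authorization": f"ApiKey {encoded_key}"}


if __name__ == "__main__":
    # Placeholder values; use the id and api_key returned when you created the key.
    print(encode_api_key("your_id", "your_api_key"))
```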
Step 6: Link Source Collections and Configure Index Bindings
- Click Source from Capture and select the Capture you created in Step 3.
- Estuary lists all available Collections from that Capture.
- Select the Collections to materialize into Elasticsearch.
Each Collection becomes a binding — a mapping from a Collection to an Elasticsearch index. For each binding, configure:
- index — The Elasticsearch index name. Defaults to the last segment of the Collection name. Customize as needed.
- delta_updates — false (default) for standard upsert mode; true for append-only / event stream mode.
- number_of_shards — Number of shards for the index. Default is 1.
💡 Estuary automatically creates the Elasticsearch index with appropriate field mappings derived from your Collection schema. You do not need to pre-create the index manually.
Step 7: Advanced Field Configuration (Optional)
For fine-grained control over index mappings and routing, use the Advanced Specification Editor or flowctl to update the field configuration on your binding. Options include:
- routing: Route documents to specific shards using a key field. This improves read performance for searches that pass the same routing value, because Elasticsearch then queries only the shard holding those documents.
- mapping: Set the type of a field in the index mapping, optionally with a format. Common use cases include:
  - "type": "keyword": Force string fields to keyword type for exact-match filtering and aggregations.
  - "type": "date" with "format": "epoch_second": Map Unix timestamps (in seconds) correctly for time-series queries and Kibana visualizations.
For example, your specification might look like this in the connector's Advanced Specification Editor:
```json
{
  "bindings": [
    {
      "resource": {
        "index": "my-orders-index"
      },
      "source": "PREFIX/orders_collection",
      "fields": {
        "require": {
          "id": {
            "routing": true
          },
          "status": {
            "mapping": {
              "type": "keyword"
            }
          },
          "created_at": {
            "mapping": {
              "type": "date",
              "format": "epoch_second"
            }
          }
        },
        "recommended": true
      }
    }
  ]
}
```
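To see why an explicit date mapping matters for Unix timestamps, consider a created_at value stored as an integer number of seconds. Without an explicit mapping, Elasticsearch's dynamic mapping indexes a JSON integer as a plain number (long); with type date and Elasticsearch's built-in epoch_second format, the same value is read as a point in time. The Python sketch below shows that conversion and the Elasticsearch-side mapping the keyword and date settings correspond to. It illustrates the target mapping rather than reproducing what the connector generates, and the sample timestamp is hypothetical.

```python
import json
from datetime import datetime, timezone

# Elasticsearch mapping equivalent to the "status" and "created_at" settings
# (illustrative; a generated index may contain other fields and properties).
MAPPING = {
    "properties": {
        "status": {"type": "keyword"},
        "created_at": {"type": "date", "format": "epoch_second"},
    }
}


def epoch_second_to_utc(value: int) -> str:
    """Interpret an integer as epoch_second does: seconds since 1970-01-01T00:00:00Z."""
    return datetime.fromtimestamp(value, tz=timezone.utc).isoformat()


if __name__ == "__main__":
    print(json.dumps(MAPPING, indent=2))
    print(epoch_second_to_utc(1740825000))
```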
Step 8: Full YAML Specification with flowctl (Optional)
If you prefer to manage your pipeline as code using the flowctl CLI, create a materialization specification following this format:
```yaml
materializations:
  PREFIX/mat_elasticsearch:
    endpoint:
      connector:
        image: ghcr.io/estuary/materialize-elasticsearch:v3
        config:
          endpoint: https://ec47fc4d.us-east-1.aws.found.io:9243
          credentials:
            username: flow_user
            password: secret
          advanced:
            number_of_replicas: 1
    bindings:
      # Standard upsert mode
      - resource:
          index: my-orders-index
          delta_updates: false
          number_of_shards: 3
        source: PREFIX/orders_collection
      # Delta / append-only mode for event streams
      - resource:
          index: my-events-index
          delta_updates: true
        source: PREFIX/events_collection
```
Step 9: Save, Publish, and Monitor
- Click Next, then Save and Publish in the top-right corner.
- Estuary validates the configuration and deploys the pipeline.
- Monitor pipeline health and data throughput from the Estuary dashboard.
Step 10: Verify Data in Elasticsearch
```plaintext
GET /my-orders-index/_search
{
  "query": { "match_all": {} },
  "size": 5
}

# Check generated field mappings
GET /my-orders-index/_mapping
```
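The same check can be scripted from outside Kibana. This standard-library Python sketch builds the match_all search as a POST request (Elasticsearch accepts both GET and POST for _search), authenticated with an encoded API key. The endpoint, index, and key values are placeholders, and build_search_request and total_hits are hypothetical helper names.

```python
import json
import urllib.request


def build_search_request(endpoint, index, encoded_api_key, size=5):
    """Build (without sending) the match_all search used to verify the index."""
    body = json.dumps({"query": {"match_all": {}}, "size": size}).encode("utf-8")
    return urllib.request.Request(
        url=f"{endpoint.rstrip('/')}/{index}/_search",
        data=body,
        method="POST",
        headers={
            "Authorization": f"ApiKey {encoded_api_key}",
            "Content-Type": "application/json",
        },
    )


def total_hits(response):
    """Read hits.total.value from a parsed _search response."""
    return response["hits"]["total"]["value"]


if __name__ == "__main__":
    req = build_search_request(
        "https://CLUSTER_ID.REGION.cloud.es.io:9243", "my-orders-index", "ENCODED_KEY"
    )
    print(req.get_method(), req.full_url)
    # Send it only once the placeholders are replaced with real values:
    # with urllib.request.urlopen(req) as resp:
    #     print(total_hits(json.load(resp)))
```

By default hits.total.value stops counting at 10,000 (with relation set to gte), so use GET /my-orders-index/_count when you need an exact document count.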
Understanding Update Modes
Estuary supports two update strategies per binding. This is one of its most powerful and frequently misunderstood features.
- Standard Updates (delta_updates: false — default) behave like an upsert: inserts create new documents, updates modify existing documents by key, and deletes from CDC-enabled sources (PostgreSQL, MySQL) are propagated to Elasticsearch. Use this mode for entity records, dimension tables, or any data where you need the current state.
- Delta Updates (delta_updates: true) are append-only: every change event is inserted as a new Elasticsearch document. Ideal for event streams, audit logs, or analytics workloads where you want complete history. Deletes from the source are not propagated to Elasticsearch in this mode.
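The difference between the two modes is easiest to see on a short stream of change events. The toy Python model below (a hypothetical function, not Estuary code) replays create, update, and delete events for two orders and returns what the index would contain under each mode, following the descriptions above: keyed upserts with delete propagation for standard updates, and one appended document per event for delta updates.

```python
def materialize(events, delta_updates):
    """Return the documents an index would hold after replaying events.

    events: list of (op, key, doc) where op is "c" (create), "u" (update) or "d" (delete).
    """
    if delta_updates:
        # Append-only: each event becomes a new document, and nothing is ever removed.
        return [{"op": op, **doc} for op, key, doc in events]
    index = {}
    for op, key, doc in events:
        if op == "d":
            index.pop(key, None)  # deletes are propagated
        else:
            index[key] = doc  # insert or replace the current state by key
    return list(index.values())


events = [
    ("c", 1, {"id": 1, "status": "new"}),
    ("u", 1, {"id": 1, "status": "shipped"}),
    ("c", 2, {"id": 2, "status": "new"}),
    ("d", 2, {"id": 2}),
]

if __name__ == "__main__":
    print(materialize(events, delta_updates=False))  # current state only
    print(len(materialize(events, delta_updates=True)))  # one document per event
```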
⚠️ If you switch an existing materialization between standard and delta update modes on a live index, backfilling your data is strongly recommended. Changing modes can cause duplicate key errors or unexpected document behavior. Always test mode changes in a non-production index first.
| Pros | Cons |
|---|---|
| Sub-second real-time CDC with exactly-once semantics | Custom Elasticsearch analyzers not configurable via connector (pre-create index as workaround) |
| Automatic index creation with schema-derived field mappings | geo_point field type not natively supported (use lat/lng floats) |
| No infrastructure to manage — fully managed SaaS | Managed SaaS pricing model (vs. self-hosted open source) |
| 120+ source connectors (DBs, SaaS APIs, files, streams) | 20-day data retention on managed cloud storage (configure own bucket for production) |
| Handles inserts, updates, AND deletes automatically | |
| Supports BYOC and private deployment for enterprise security | |
Method Comparison at a Glance
| Feature | Logstash | REST / Bulk API | Estuary |
|---|---|---|---|
| Setup complexity | High | Low–Medium | Low (no-code UI) |
| Real-time CDC | Limited | No | Yes (sub-second) |
| Transform support | Extensive (200+ plugins) | Via ingest pipelines | Via derivations |
| Managed infra | Self-managed | Self-managed | Fully managed |
| Handles deletes (CDC) | Plugin-dependent | Manual | Yes (standard mode) |
| Best latency | Seconds | On-demand | Sub-second |
| Best for | Log pipelines, ETL | App-level indexing, dev/debug | DB/API real-time sync |
| Cost model | Open source + infra | Open source + infra | Managed SaaS pricing |
Troubleshooting Common Issues
Connection and Network Errors
| Symptom | Likely Cause | Fix |
|---|---|---|
| dial timeout or connection refused | Estuary IPs not allowlisted | Re-check all Estuary IP addresses (docs.estuary.dev/reference/allow-ip-addresses/) are in your cluster firewall rules. |
| Logstash Connection refused to Elasticsearch | Wrong host or port in output config | Verify the full URL including port (9200 for local, 9243 for Elastic Cloud). Ensure SSL settings match. |
| Auth error on startup | Wrong credential format for Estuary API key | For API key auth, encode as Base64 of id:api_key. Run: echo -n 'id:key' \| base64 |
Schema and Type Mapping Errors
| Symptom | Likely Cause | Fix |
|---|---|---|
| Field mapped as wrong type | Elasticsearch dynamic mapping mismatch | For Estuary: use fields.require.<field>.mapping to explicitly set the type. For REST API: define explicit mappings with PUT /index before indexing. |
| String field not aggregatable in Kibana | Field mapped as text instead of keyword | Estuary: set mapping type to keyword. REST API: use .keyword sub-field or explicit keyword mapping. |
| Timestamps not recognized as dates | Unix epoch not mapped as date type | Estuary: set mapping type: date with format: epoch_second. Ingest pipeline: use the date processor. |
| geo_point not supported (Estuary) | Connector limitation | Use separate latitude and longitude float fields as a workaround. |
Index and Write Errors
| Symptom | Likely Cause | Fix |
|---|---|---|
| Index not created automatically | Insufficient role privileges | Verify create_index privilege is granted. Use a wildcard * to cover all indices. |
| Logstash bulk indexing errors | Documents too large or malformed | Check the Logstash dead letter queue (enable it with dead_letter_queue.enable: true in logstash.yml). If requests are too large, lower pipeline.batch.size in logstash.yml or with the -b flag. |
| Bulk API returns partial success (errors: true) | Some documents failed validation | Inspect the items array in the bulk response. Each failed item includes a status code and error message. |
| Custom analyzer not applied (Estuary) | Analyzer config not supported via connector | Pre-create the index in Elasticsearch with your desired settings.analysis config before connecting Estuary. This requires an advanced feature flag setup. |
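For the partial-success case in the table above, a small helper can pull the failed operations out of a parsed _bulk response. This Python sketch assumes the standard response shape, where each entry in items has a single key naming the action; the function names and the sample response are hypothetical.

```python
def failed_items(bulk_response):
    """Return (action, _id, status, error_type, reason) for each failed bulk operation."""
    if not bulk_response.get("errors"):
        return []
    failures = []
    for item in bulk_response["items"]:
        # Each item looks like {"index": {...}}, {"update": {...}}, etc.
        (action, result), = item.items()
        if "error" in result:
            error = result["error"]
            failures.append(
                (action, result.get("_id"), result["status"], error.get("type"), error.get("reason"))
            )
    return failures


def retryable(failures):
    """429 means the cluster rejected work under load; those are worth retrying with backoff."""
    return [f for f in failures if f[2] == 429]


sample = {
    "errors": True,
    "items": [
        {"index": {"_index": "my-orders", "_id": "1", "status": 201, "result": "created"}},
        {"index": {"_index": "my-orders", "_id": "2", "status": 400,
                   "error": {"type": "mapper_parsing_exception", "reason": "failed to parse field [total]"}}},
        {"index": {"_index": "my-orders", "_id": "3", "status": 429,
                   "error": {"type": "es_rejected_execution_exception", "reason": "rejected execution"}}},
    ],
}

if __name__ == "__main__":
    for failure in failed_items(sample):
        print(failure)
```

A delete of a document that does not exist comes back with status 404 and result not_found but no error object, so this helper does not count it as a failure.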
Which Method Should You Choose?
Use the table below to match your use case to the right ingestion method. In practice, production environments can combine two or more methods — for example, using Estuary to ingest continuous database CDC alongside Logstash for low-level application or system logs.
| Use Case | Recommended Method | Why |
|---|---|---|
| Real-time CDC from a database (Postgres, MySQL, MongoDB) | Estuary | Sub-second latency, handles inserts/updates/deletes automatically |
| Log or metric data from servers / Kubernetes / cloud | Logstash + Beats | Beats modules natively understand these formats, 70+ pre-built integrations |
| Indexing documents from your application at write time | REST / Bulk API | Direct, low-overhead, no extra infrastructure required |
| Large one-off historical data migration | Bulk API | Efficient batch indexing with full control over request sizing |
| Multi-source ETL pipeline with complex transformations | Logstash | 200+ input/filter/output plugins, rich transformation capabilities |
| Syncing SaaS APIs (Salesforce, HubSpot, Stripe, etc.) to Elasticsearch | Estuary | Pre-built connectors, no-code setup, continuous sync |
| Prototyping or dev/debug | REST API (Kibana Dev Tools) | Fastest feedback loop; no infrastructure needed |
| Enterprise with strict security / private deployment needs | Estuary | Supports BYOC and private deployment with fine-grained control |
Conclusion
There is no single best way to ingest data into Elasticsearch — the right method depends entirely on your data sources, latency requirements, and operational preferences.
Logstash remains the right choice when you need flexible, plugin-rich ETL for logs and metrics, particularly within a full Elastic Stack deployment.
The REST and Bulk API is the simplest path for application developers who need to index documents directly from their code, or for one-off data loads and prototyping.
Estuary stands out when you need a dependable, right-time data platform — one that delivers sub-second CDC from your operational databases and SaaS tools into Elasticsearch with zero infrastructure overhead and exactly-once guarantees. As Estuary's philosophy states: latency should be a dial you control, not a limitation you accept.
The best-performing Elasticsearch deployments tend to combine methods thoughtfully: real-time CDC via Estuary for operational data, Beats + Logstash for observability and log pipelines, and the Bulk API for historical migrations and application writes. Understanding each method deeply — as this guide has aimed to do — is the foundation for making that choice well.
Start Ingesting Data into Elasticsearch with Estuary Today
Stop settling for stale data. Estuary is the right-time data platform that delivers sub-second CDC from your databases and APIs into Elasticsearch: fully managed, exactly-once, and built for the way your business actually moves.
- Use the official connector docs for the exact setup in this article

About the author
Dani is a data professional with a rich background in data engineering and real-time data platforms. At Estuary, Daniel focuses on promoting cutting-edge streaming solutions, helping to bridge the gap between technical innovation and developer adoption. With deep expertise in cloud-native and streaming technologies, Dani has successfully supported startups and enterprises in building robust data solutions.