데이터/복기

데이터 처리 파이프라인 프로젝트(1)

mini'scloud 2025. 11. 10. 13:29

팀원과 함께 클라우드상에서 데이터 파이프라인 프로젝트를 진행하면서 발생한 문제에 대해 정리를 해볼려고 함

해당 그림은 아키텍처의 일부임

 

해당 아키텍처에 대해 먼저 알아보면, 공공데이터에 있는 데이터를 가져와야 할거임

공공데이터는 실시간으로 업데이트 되는 것이 아니라 가끔식 업데이트를 진행하기에 Cron job을 수행하는 EventBridge를 통해 배치 작업을 수행하도록 설계를 하였음

 

S3를 스토리지로 사용하면, Raw 영역과, Glue를 통해 필터링을 수행한 영역으로 구분하였음

Glue를 통해 필터링을 수행하였을때 오류가 발생하거나 잘못 처리한 데이터들이 저장되었을때 이를 복구하기 위해서 Raw 데이터 영역이 필요하게 됨

 

RAW 버킷에는 API에서 가져온 원본(예: 복잡한 JSON, XML)이 그대로 저장되는데, Athena가 이를 직접 접근하는 것은 비효율적임

즉, AWS Glue는 이  Raw 데이터를 분석에 최적화된 포맷으로 변환해야 함

 

  • 포맷 변환: JSON/XML/CSV → parquet 또는 ORC
  • 파티셔닝: 데이터를 '날짜별', '지역별' 등 작은 폴더로 나누어 저장함
  • Athena는 이렇게 필터링된 버킷의 Parquet 데이터를 읽기 때문에, RAW의 JSON을 직접 읽을 때보다 빠르고 cos가 낮은 쿼리 수행이 가능해짐

아래는 배치 처리를 수행하는 lambda의 설정값이다.

cost를 최대한 적고 효율적으로 사용하기 위해 이렇게 세팅을 적용했음

처음에는 페이지네이션 기법으로 데이터 수집을 진행했지만, Runtime.OutOfMemory 문제가 발생함

 

초기 코드를 보면...

const allRows = [];
let startIndex = 1;

//API 호출당 1,000개의 데이터를 가져오도로 만들음
while (startIndex <= totalCount) {
    const batchData = await fetchBatch(startIndex);
    //응답 데이터를 메모리의 'allRows' 배열에 누적
    allRows.push(...batchData.row); // <- 이 지점에서 메모리 사용량 폭증
    startIndex += 1000;
}

//누적된 모든 데이터를 한 번에 S3에 업로드 시도
await s3Client.send(new PutObjectCommand({
    Bucket: S3_RAW_BUCKET,
    Key: 'single-large-file.json',
    Body: JSON.stringify(allRows)
}));

메모리(RAM)가 512MB였고, 데이터는 1GB 이상이었으니깐 당연히 데이터를 계속 축적하면 문제가 발생하게 됨

만약 데이터가 512MB를 넘지 않았더라도, S3작업에서는 추가적인 메모리가 요구됨

직렬화(Serialization)핵심 원리 때문임

S3(혹은 모든 네트워크)로 데이터를 전송할 때는 '자바스크립트 객체(Object/Array)' 자체를 보낼 수 없고, 오직 텍스트(string)나 바이트 형태로만 보낼 수 있음

JSON.stringify는 데이터를 S3로 전송하기 위해, 원본(Array)을 전송용 복사본(String)으로 새로 만드는 작업이 필요하기에 추가적인 메모리가 요구됨

 

이를 해결하기 위해 Streaming 방식을 사용해보았음

Streaming

데이터를 메모리에 축적해서 한번에 보내는게 아니라, 바로 S3에 보내도록 하였음

아래와 같이 코드를 수정하였음

let startIndex = 1;
let partIndex = 1;

while (startIndex <= totalCount) {
    const response = await fetch(apiUrl); //API 호출

    const partS3Key = `part_${partIndex}.json`;
    console.log(`Streaming to S3: ${partS3Key}`);

    const uploadParams = {
        Bucket: S3_RAW_BUCKET,
        Key: partS3Key,
        // Node.js 스트림(ReadableStream)을 Body로 직접 전달
        Body: response.body,
        ContentType: 'application/json'
    };

    // 저수준(low-level) API로 스트림 업로드 시도
    await s3Client.send(new PutObjectCommand(uploadParams));

    startIndex += 1000;
    partIndex++;
}

API 호출을 통해 데이터를 불러오면, 바로 S3에 분할 적재를 하도록 코드를 수정하였음

 

그런데 아래 문제가 또 발생하였음

즉, 스트림의 hash계산을 수행할 수 없는 오류가 발생하였음

data integrity 문제가 발생한거임

  • S3의 요구: S3(혹은 AWS SDK)는 기본적으로 데이터가 전송 중에 손상되지 않았는지 검증하기 위해 'Content-MD5 hash' 값을 계산하거난 파일 전체 크기를 알도록 함
  • hash의 한계: 계산을 위해서는 파일 전체를 다 읽어야만 계산할 수 있음
  • 즉, 스트림 중에 계산을 시도할려다가 수신이 완료되기전 계산을 할 수 없는 문제가 발생한거임

이를 해결하기 위해 High-level API를 사용해서 해결할 수 있음

이 방식은 데이터 스트림을 받으면, 메모리에 다 올리는 대신 조각을 내서 병렬로 전송을 하게 됨

 

 

1. new Upload(...)가 실행되면, SDK는 S3에 조각을 보낼거라 알리고, 멀티 파트 업로드를 시작함

2. API에서 response.body 스트림이 흘러들어옴

  • Upload 라이브러리는 이 스트림을 메모리에 쌓아두는 대신, 임시 버퍼에 데이터를 모으기 시작함
  • 이 버퍼가 정해진 크기(예: 5MB)에 도달할 때까지 데이터를 모음

 

3. 버퍼가 5MB로 꽉 차면, SDK는 이 5MB 조각 에 대해서만 해시값(ETag)을 계산함

  • 그리고 이 '5MB 조각'을 S3에 "이건 1번 조각"이라고 알리고 전송함
  • S3는 이 조각을 받아 저장하고 ETag 일치하는지 확인하고 응답함

 

4. Upload는 1번 조각을 전송하는 동안, 동시에 스트림의 다음 데이터를 다른 버퍼에 모음

  • 5MB가 또 차면, "이건 2번 조각이야"라며 병렬로 전송함
  • 이 과정을 스트림이 끝날 때까지 3번, 4번... 조각을 계속 만들어 보냅니다

 

5. 업로드 완료

  • 완료 요청을 보내면, s3내부에서 chunk들을 합쳐 파일을 완성함
// @aws-sdk/lib-storage는 Lambda 런타임에 기본 내장
import { Upload } from "@aws-sdk/lib-storage";
import { S3Client } from "@aws-sdk/client-s3";

const s3Client = new S3Client({});

// ... while 반복문 내부 ...
const response = await fetch(apiUrl); // API 호출

// 1. 고수준 API 'Upload' 인스턴스 생성
const parallelUploads3 = new Upload({
    client: s3Client,
    params: {
        Bucket: S3_RAW_BUCKET,
        Key: partS3Key,
        Body: response.body, // 스트림을 그대로 전달
        ContentType: 'application/json'
    }
});

// 2. 스트림이 완료될 때까지 멀티파트 업로드 실행
await parallelUploads3.done();

console.log(`Part ${partIndex} uploaded successfully.`);

즉, 순차적인 작업(저수전)에서 조각들을 여러 곳에서 처리할 수 있도록 병렬 수행이 가능하도록 변경되었음

 

S3 적재 결과를 확인해보자

저장된 파일들이 내부에 API가 분할 호출된 횟수만큼 여러 파일들이 생성되었음을 알 수 있음

이러한 형식은 Glue에서 매우 효율적으로 데이터를 처리할 수 있도록 해줌

즉, glue에서는 폴더 자체를 인식해 안에 여러개의 파일들이 있더라도 병렬로 여러개의 파일들을 작업할 수 있도록 하고 있음

 

'데이터 > 복기' 카테고리의 다른 글

Linux 파일 시스템 loop 문제 & 병목 현상  (1) 2026.01.28
미니PC 우분투 와이파이 연결  (0) 2025.12.04
Neo4j 그래프 DB 속도 향상  (0) 2025.06.27