Data Stream

Um fluxo de dados (data stream) é uma sequência ordenada, podendo ser ilimitada, de dados que chegam ao longo do tempo. Os intervalos de tempo entre a chegada de cada item podem variar. Os dados podem ser um atributo simples, como tuplas de banco de dados relacional ou estruturas mais complexas, como imagens [Krawczyk et al, 2017].

A ideia será escutar o stream por um certo período de tempo. Ficando na seguinte forma:

EasyStream.listen(stream, time = 1000)

Contudo, nessa primeira versão os dados estão em memória. Não necessitando por enquanto o tempo. Essa característica de fluxo de dados apresenta a necessidade de desenvolver sequenciamento em lote, não importando o tempo de escuta. Desta forma, podemos ter diversos tipos de Stream e todos irão herdar:

AbstractStream

A maior dificuldade é na parte na captura de dados pois pode variar. Alguns exemplos: CSV, API, memória, geração de dados, ... Para tal, foi imaginado um tipo genérico chamado Connector que será explorado na próxima seção.

Connectors

O Connector será responsável em capturar os dados na fonte e irá passar para o stream já processado. O stream possuirá um AbstractConnector e ele precisará possuir algumas funções implementadas e a principal é next que irá fornecer o próximo dado processado. Foi implementado dois Connectors:

  • GeneratorConnector: um conector para geração de dados.
  • TablesConnector: um conector para dados que respeitam a interface Tables.jl

GeneratorConnector

O GeneratorConnector tem como objetivo utilizar funções para gerar dados. A ideia é que ele possa ser utilizado com o SyntheticDatasets.jl. O construtor precisará receber a função geradora e poderá ser passado os argumentos adicionais dessa função como parâmetro

using EasyStream
using SyntheticDatasets

conn_gen = EasyStream.GeneratorConnector(SyntheticDatasets.generate_blobs, 
						centers = [-1 1;-0.5 0.75], 
                                        	cluster_std = 0.225, 
                                        	center_box = (-1.5, 1.5));

O DataFrames implementa a interface Tables.jl, mas existem outros pacotes como:

  • CSV
  • MLJ
  • SQLite

Exemplo:

using DataFrames
df = DataFrames.DataFrame(x = [1, 2, 3, 4, 5, 6], y = [6, 5, 4, 3, 2, 1]);

Para criar um TablesConnector é só passar o dado diretamente para ele.

conn_df = EasyStream.TablesConnector(df);

Existem outras funções auxiliaries como:

conn_df_suffle = EasyStream.TablesConnector(df, shuffle = true); # Suffle
conn_df_orderby = EasyStream.TablesConnector(df, :x); # Ordernação

Streams

BatchStream

Foi implementado o BatchStream, que é um AbstractStream, e ele abstrai o fluxo de dados. Ele receberá por parâmetro um AbstractConnector e opcionalmente o tamanho do batch. Exemplo:

stream = EasyStream.BatchStream(conn_gen; batch = 5);

Ou fazer uma interação através de um for.

for values in stream
	@show values
	break # é um loop infinito pois utiliza um gerador de dados
end

Dependendo do caso, o stream pode possuir um tamanho infinito e, assim, foi criado a função range para auxiliar definindo um limite na interação.

for values in EasyStream.range(5, stream)
	EasyStream.listen(stream)
end

Referências

Krawczyk, Bartosz, et al. 'Ensemble learning for data stream analysis: A survey.' Information Fusion 37 (2017): 132-156.